# 🛍️ Retail Sales Pipeline - Complete Tutorial

Bu notebook, **Medallion Architecture** kullanan modern bir veri pipeline'ının nasıl çalıştığını gösterir.

## 📋 İçindekiler
1. [Pipeline Mimarisi](#pipeline-mimarisi)
2. [Bronze Layer - Data Ingestion](#bronze-layer)
3. [Silver Layer - Data Cleaning](#silver-layer)
4. [Gold Layer - Business Analytics](#gold-layer)
5. [Advanced Analytics](#advanced-analytics)
6. [Dashboard & Visualization](#dashboard)
7. [Airflow Automation](#airflow)

---

## 🏗️ Pipeline Mimarisi {#pipeline-mimarisi}

**Medallion Architecture** 3 katmanlı veri mimarisidir:

```
📁 Raw Data (CSV/Excel)
    ↓
🥉 Bronze Layer (Ham Delta Tables)
    ↓
🥈 Silver Layer (Temizlenmiş Delta Tables)
    ↓
🥇 Gold Layer (İş Analitiği Tabloları)
    ↓
🔬 Advanced Analytics (RFM, CLTV, Forecasting)
    ↓
📊 Dashboard (Streamlit)
```

### 🎯 Her Katmanın Amacı:
- **Bronze:** Ham veriyi olduğu gibi sakla
- **Silver:** Veriyi temizle, validate et
- **Gold:** İş kararları için hazır tablolar oluştur

## 🛠️ Kurulum ve Hazırlık

In [None]:
# Gerekli kütüphaneleri import edelim
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
from pyspark.sql.functions import *
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

# Spark session oluştur
builder = SparkSession.builder \
    .appName("RetailPipelineTutorial") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

print("✅ Spark session başlatıldı!")
print(f"📋 Spark version: {spark.version}")

## 🥉 Bronze Layer - Data Ingestion {#bronze-layer}

Ham veriyi CSV'den okuyup Delta formatında saklıyoruz.

In [None]:
# 1️⃣ CSV verisini oku
print("📖 CSV verisini okuyoruz...")
df_raw = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("data/input/retail_data.csv")

print(f"📊 Okunan veri: {df_raw.count():,} satır, {len(df_raw.columns)} sütun")

# Schema'yı incele
print("\n📋 Veri Şeması:")
df_raw.printSchema()

# İlk 5 satırı göster
print("\n🔍 İlk 5 Satır:")
df_raw.show(5)

In [None]:
# 2️⃣ Bronze katmanına Delta formatında kaydet
print("💾 Bronze katmanına kaydediliyor...")
df_raw.write.format("delta").mode("overwrite").save("delta/bronze/online_retail")

print("✅ Bronze layer tamamlandı!")
print("📁 Lokasyon: delta/bronze/online_retail")

## 🥈 Silver Layer - Data Cleaning {#silver-layer}

Bronze'dan veriyi alıp temizliyoruz.

In [None]:
# 3️⃣ Bronze'dan veriyi oku
print("📖 Bronze katmanından veriyi okuyoruz...")
df_bronze = spark.read.format("delta").load("delta/bronze/online_retail")

print(f"📊 Bronze veri: {df_bronze.count():,} satır")

# Veri kalitesini incele
print("\n🔍 Veri Kalitesi Analizi:")
print("NULL değerler:")
for col_name in df_bronze.columns:
    null_count = df_bronze.filter(col(col_name).isNull()).count()
    if null_count > 0:
        print(f"  {col_name}: {null_count:,} NULL değer")

In [None]:
# 4️⃣ Veri temizleme işlemleri
print("🧹 Veri temizleme işlemleri...")

df_silver = df_bronze \
    .dropna(subset=["InvoiceNo", "StockCode", "Description", "InvoiceDate", "CustomerID"]) \
    .withColumn("InvoiceDate", to_timestamp(col("InvoiceDate"))) \
    .withColumn("Quantity", col("Quantity").cast("int")) \
    .withColumn("UnitPrice", col("UnitPrice").cast("double")) \
    .withColumn("CustomerID", col("CustomerID").cast("string")) \
    .withColumn("Description", trim(col("Description"))) \
    .filter(col("Quantity") > 0) \
    .filter(col("UnitPrice") > 0)

print(f"📊 Temizlenmiş veri: {df_silver.count():,} satır")
print(f"🗑️ Silinen satır: {df_bronze.count() - df_silver.count():,}")

# Silver katmanına kaydet
df_silver.write.format("delta").mode("overwrite").save("delta/silver/online_retail_cleaned")

print("✅ Silver layer tamamlandı!")
print("📁 Lokasyon: delta/silver/online_retail_cleaned")

## 🥇 Gold Layer - Business Analytics {#gold-layer}

Silver'dan iş analitiği tablolarını oluşturuyoruz.

In [None]:
# 5️⃣ Silver'dan veriyi oku
print("📖 Silver katmanından veriyi okuyoruz...")
df_silver = spark.read.format("delta").load("delta/silver/online_retail_cleaned")

print(f"📊 Silver veri: {df_silver.count():,} satır")

In [None]:
# 6️⃣ GOLD TABLE 1: Günlük Satış Özeti
print("📈 Günlük satış özeti oluşturuluyor...")

daily_sales = df_silver \
    .withColumn("SaleDate", to_date("InvoiceDate")) \
    .groupBy("SaleDate") \
    .agg(
        sum("Quantity").alias("TotalQuantity"),
        sum(df_silver["Quantity"] * df_silver["UnitPrice"]).alias("TotalRevenue"),
        countDistinct("CustomerID").alias("UniqueCustomers"),
        count("InvoiceNo").alias("TotalTransactions")
    ) \
    .orderBy("SaleDate")

daily_sales.write.format("delta").mode("overwrite").save("delta/gold/daily_sales")

print(f"✅ Günlük satış tablosu: {daily_sales.count()} gün verisi")
daily_sales.show(5)

In [None]:
# 7️⃣ GOLD TABLE 2: En Çok Satılan Ürünler
print("🏆 En çok satılan ürünler oluşturuluyor...")

top_products = df_silver \
    .groupBy("StockCode", "Description") \
    .agg(
        sum("Quantity").alias("TotalSold"),
        sum(df_silver["Quantity"] * df_silver["UnitPrice"]).alias("TotalRevenue"),
        avg("UnitPrice").alias("AvgPrice")
    ) \
    .orderBy(col("TotalSold").desc()) \
    .limit(20)

top_products.write.format("delta").mode("overwrite").save("delta/gold/top_products")

print("✅ En çok satılan ürünler tablosu tamamlandı")
top_products.show(5)

In [None]:
# 8️⃣ GOLD TABLE 3: Ülke Bazlı Satış
print("🌍 Ülke bazlı satış oluşturuluyor...")

country_sales = df_silver \
    .groupBy("Country") \
    .agg(
        sum(df_silver["Quantity"] * df_silver["UnitPrice"]).alias("CountryRevenue"),
        sum("Quantity").alias("TotalQuantity"),
        countDistinct("CustomerID").alias("UniqueCustomers"),
        countDistinct("StockCode").alias("UniqueProducts")
    ) \
    .orderBy(col("CountryRevenue").desc())

country_sales.write.format("delta").mode("overwrite").save("delta/gold/country_sales")

print("✅ Ülke bazlı satış tablosu tamamlandı")
country_sales.show(5)

## 🔬 Advanced Analytics {#advanced-analytics}

İleri düzey müşteri analitiği: RFM, CLTV ve Forecasting

In [None]:
# 9️⃣ RFM ANALİZİ (Recency, Frequency, Monetary)
print("🎯 RFM analizi başlatılıyor...")

# En son tarih
latest_date = df_silver.agg(max("InvoiceDate")).collect()[0][0]
print(f"📅 En son tarih: {latest_date}")

# RFM hesaplama
rfm = df_silver \
    .withColumn("InvoiceDateOnly", to_date("InvoiceDate")) \
    .groupBy("CustomerID") \
    .agg(
        datediff(lit(latest_date), max("InvoiceDateOnly")).alias("Recency"),
        count("InvoiceNo").alias("Frequency"),
        sum(col("Quantity") * col("UnitPrice")).alias("Monetary")
    )

rfm.write.format("delta").mode("overwrite").save("delta/gold/rfm_table")

print(f"✅ RFM analizi tamamlandı: {rfm.count():,} müşteri")
rfm.orderBy(col("Monetary").desc()).show(5)

In [None]:
# 🔟 CLTV ANALİZİ (Customer Lifetime Value)
print("💰 CLTV analizi başlatılıyor...")

# Basit CLTV formülü: (Frequency * Monetary) / (Recency + 1)
cltv = rfm.withColumn(
    "CLTV", 
    (col("Frequency") * col("Monetary")) / (col("Recency") + lit(1))
)

cltv.write.format("delta").mode("overwrite").save("delta/gold/cltv_table")

print("✅ CLTV analizi tamamlandı")
cltv.orderBy(col("CLTV").desc()).show(5)

In [None]:
# 1️⃣1️⃣ FORECASTİNG (12 Aylık Gelir Tahmini)
print("🔮 12 aylık gelir tahmini başlatılıyor...")

# Basit tahmin: CLTV * 12 ay
forecast = cltv.withColumn("ExpectedRevenue_12M", col("CLTV") * lit(12))

forecast.write.format("delta").mode("overwrite").save("delta/gold/forecast_table")

total_forecast = forecast.agg(sum("ExpectedRevenue_12M")).collect()[0][0]
print(f"✅ Forecasting tamamlandı")
print(f"🎯 Toplam 12 aylık tahmini gelir: ${total_forecast:,.2f}")

forecast.orderBy(col("ExpectedRevenue_12M").desc()).show(5)

## 📊 Data Visualization {#dashboard}

Oluşturduğumuz tabloları görselleştirelim.

In [None]:
# 1️⃣2️⃣ Günlük Satış Trendi
print("📈 Günlük satış trendi çiziliyor...")

# Pandas'a çevir
daily_df = daily_sales.toPandas()
daily_df['SaleDate'] = pd.to_datetime(daily_df['SaleDate'])

plt.figure(figsize=(12, 6))
plt.plot(daily_df['SaleDate'], daily_df['TotalRevenue'])
plt.title('📈 Günlük Satış Cirosu Trendi')
plt.xlabel('Tarih')
plt.ylabel('Toplam Ciro ($)')
plt.xticks(rotation=45)
plt.grid(True)
plt.tight_layout()
plt.show()

print(f"📊 Toplam satış günü: {len(daily_df)} gün")
print(f"💰 Toplam ciro: ${daily_df['TotalRevenue'].sum():,.2f}")

In [None]:
# 1️⃣3️⃣ En Çok Satılan Ürünler
print("🏆 En çok satılan ürünler çiziliyor...")

products_df = top_products.toPandas()

plt.figure(figsize=(12, 8))
plt.barh(products_df['Description'][:10], products_df['TotalSold'][:10])
plt.title('🏆 En Çok Satılan 10 Ürün')
plt.xlabel('Satış Adedi')
plt.gca().invert_yaxis()
plt.tight_layout()
plt.show()

print(f"📦 Toplam ürün çeşidi: {len(products_df)}")

In [None]:
# 1️⃣4️⃣ Ülke Bazlı Satış Dağılımı
print("🌍 Ülke bazlı satış dağılımı çiziliyor...")

country_df = country_sales.toPandas()

plt.figure(figsize=(12, 8))
plt.pie(country_df['CountryRevenue'][:5], 
        labels=country_df['Country'][:5], 
        autopct='%1.1f%%',
        startangle=90)
plt.title('🌍 Top 5 Ülke Satış Dağılımı')
plt.axis('equal')
plt.show()

print(f"🌎 Toplam ülke: {len(country_df)} ülke")
print(f"🇬🇧 En çok satış yapan ülke: {country_df.iloc[0]['Country']}")

In [None]:
# 1️⃣5️⃣ RFM Analizi Görselleştirme
print("🎯 RFM analizi görselleştiriliyor...")

rfm_df = rfm.toPandas()

fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# Recency dağılımı
axes[0].hist(rfm_df['Recency'], bins=50, alpha=0.7, color='red')
axes[0].set_title('📅 Recency Dağılımı')
axes[0].set_xlabel('Gün')

# Frequency dağılımı
axes[1].hist(rfm_df['Frequency'], bins=50, alpha=0.7, color='blue')
axes[1].set_title('🔄 Frequency Dağılımı')
axes[1].set_xlabel('Alışveriş Sayısı')

# Monetary dağılımı
axes[2].hist(rfm_df['Monetary'], bins=50, alpha=0.7, color='green')
axes[2].set_title('💰 Monetary Dağılımı')
axes[2].set_xlabel('Toplam Harcama ($)')

plt.tight_layout()
plt.show()

print(f"🎯 RFM istatistikleri:")
print(f"   📅 Ortalama Recency: {rfm_df['Recency'].mean():.1f} gün")
print(f"   🔄 Ortalama Frequency: {rfm_df['Frequency'].mean():.1f} alışveriş")
print(f"   💰 Ortalama Monetary: ${rfm_df['Monetary'].mean():.2f}")

## 🤖 Apache Airflow Automation {#airflow}

Pipeline'ınızı Airflow ile otomatize etmek için:

### 🚀 Airflow DAG'larını Çalıştırma

```bash
# Airflow home dizinini ayarla
export AIRFLOW_HOME=$(pwd)/airflow

# DAG'ları listele
airflow dags list

# Manuel DAG tetikleme
airflow dags trigger retail_sales_pipeline

# DAG durumunu kontrol et
airflow dags state retail_sales_pipeline

# Web interface başlat
airflow webserver --port 8080
```

### 📋 Kullanılabilir DAG'lar

1. **`retail_sales_pipeline`** - Tam pipeline (Bronze → Silver → Gold → Advanced)
2. **`bronze_ingestion_dag`** - Sadece Bronze layer
3. **`silver_cleaning_dag`** - Sadece Silver layer

### 🔧 Manuel Pipeline Çalıştırma

```bash
# Tek komutla tüm pipeline'ı çalıştır
./run_complete_pipeline.sh
```

## 📊 Dashboard Başlatma

Pipeline tamamlandıktan sonra Streamlit dashboard'unu başlatın:

```bash
# Conda environment aktif et
conda activate spark-delta-env

# Dashboard'u başlat
streamlit run dashboard/app.py --server.port 8504
```

Dashboard şunları içerir:
- 📈 Günlük satış cirosu grafiği
- 🏆 En çok satılan ürünler
- 🌍 Ülke bazlı satış dağılımı
- 🎯 RFM analizi tabloları
- 💰 CLTV analizi
- 🔮 12 aylık gelir tahminleri

## 🎯 Özet ve Sonuçlar

Bu tutorial'da öğrendikleriniz:

### ✅ Teknik Beceriler
- **Apache Spark** ile büyük veri işleme
- **Delta Lake** ile modern veri gölü mimarisi
- **Medallion Architecture** prensipleri
- **Apache Airflow** ile pipeline otomasyonu
- **Streamlit** ile dashboard geliştirme

### 📊 Analitik Beceriler
- **RFM Analizi** ile müşteri segmentasyonu
- **CLTV Hesaplama** ile müşteri değer analizi
- **Forecasting** ile gelir tahminleme
- **İş analitiği** tabloları oluşturma

### 🚀 Production Ready Features
- **Error handling** ve monitoring
- **Scalable** mimari
- **Automated** pipeline execution
- **Interactive** dashboard

---

**🎉 Tebrikler!** Modern bir veri mühendisliği projesi tamamladınız!

In [None]:
# Spark session'ı kapat
spark.stop()
print("✅ Spark session kapatıldı!")
print("🎉 Tutorial tamamlandı!")