# TDSP Stage 3 – Modeling

## Project: RFM-Based Segmentation of Online Retail Customers (Spark)

This stage focuses on **RFM feature engineering**, **data transformation**, and **customer clustering** with Apache Spark.
Its output is a set of customer segments that are meaningful and can be interpreted in business terms.

## 3.1 Modeling Objectives

The main objectives of the modeling stage are:

1. Turn transaction data into customer-level **RFM features**.
2. Prepare a dataset that is ready for a clustering algorithm.
3. Group customers by similarity in their transaction behavior.
4. Produce a segmentation that is **interpretable** and **actionable** for the business.

The approach is **unsupervised learning**, because the data has no target label.

In [7]:
# ==============================================================
# 3.1 Initialize the Spark session (must run first)
# ==============================================================
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import col

# Initialize Spark
spark = (
    SparkSession.builder
    .appName("RFM-Customer-Segmentation-Stage3")
    .getOrCreate()
)

## 3.2 Preparing the Modeling Data

The dataset is the filtered output of TDSP Stage 2.
Initial steps:
- Make sure the date column is a timestamp
- Compute the transaction value (Amount)
- Choose a *reference date* for the Recency calculation

In [8]:
# ==============================================================
# 3.2 Load data & preprocess (CSV bridge method, lazy evaluation handled)
# ==============================================================
import pandas as pd
import os
from pyspark.sql.functions import col, to_timestamp

# Data paths
PARQUET_PATH = "../datasets/online_retail_stage2.parquet"
TEMP_CSV_PATH = "../datasets/temp_online_retail.csv"

print("1. Loading Parquet via Pandas...")
pdf = pd.read_parquet(PARQUET_PATH)
# Cast to microsecond precision, which is the precision Spark timestamps use
pdf["InvoiceDate"] = pdf["InvoiceDate"].astype("datetime64[us]")

print("2. Saving to Temp CSV...")
pdf.to_csv(TEMP_CSV_PATH, index=False)

print("3. Loading CSV into Spark...")
df_temp = spark.read.csv(TEMP_CSV_PATH, header=True, inferSchema=True)

# Cast columns to their intended types
df_stage3 = df_temp.withColumn("InvoiceDate", to_timestamp(col("InvoiceDate"))) \
                   .withColumn("CustomerID", col("CustomerID").cast("integer")) \
                   .withColumn("Quantity", col("Quantity").cast("integer")) \
                   .withColumn("UnitPrice", col("UnitPrice").cast("double"))

# --- IMPORTANT ---
# Spark evaluates lazily, so cache the DataFrame and trigger an action (count)
# BEFORE deleting the temp file; otherwise later actions would try to re-read it.
print("4. Caching data to memory...")
df_stage3.cache()
row_count = df_stage3.count()  # Forces Spark to read the CSV now
print(f"Data loaded successfully: {row_count} rows")

# 5. With the data cached, the temp file can be deleted. Caveat: if a cached
#    partition is ever lost, Spark would need the CSV again to recompute it.
if os.path.exists(TEMP_CSV_PATH):
    os.remove(TEMP_CSV_PATH)
    print("   (Temp CSV deleted)")

df_stage3.printSchema()
df_stage3.show(5)

1. Loading Parquet via Pandas...
2. Saving to Temp CSV...
3. Loading CSV into Spark...
4. Caching data to memory...
Data loaded successfully: 361878 rows
   (Temp CSV deleted)
root
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

+--------------------+--------+-------------------+---------+----------+--------------+
|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+--------------------+--------+-------------------+---------+----------+--------------+
|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|
| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|
|KNITTED UNION FLA...|       6|2010-1

## 3.3 Feature Engineering – Monetary

**Monetary** is the total amount of money a customer has spent.

Steps:
- Compute the line amount: `TotalPrice = Quantity × UnitPrice`
- Sum `TotalPrice` per customer (`CustomerID`)
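The two steps can be traced by hand on a few rows. The sketch below uses plain Python instead of Spark, and the transactions are hypothetical toy values, not rows from the dataset. One row carries a negative `Quantity` (an assumed return or cancellation) to show how such rows pull a customer's total down.

```python
from collections import defaultdict

# Hypothetical toy transactions: (CustomerID, Quantity, UnitPrice).
transactions = [
    (17850, 6, 2.55),
    (17850, 8, 2.75),
    (13047, 2, 10.00),
    (13047, -1, 10.00),  # assumed return/cancellation row with negative Quantity
]

# Step 1: line amount = Quantity * UnitPrice; step 2: sum per customer.
monetary = defaultdict(float)
for customer_id, quantity, unit_price in transactions:
    monetary[customer_id] += quantity * unit_price

# Round to 2 decimals, as the Spark cell does with spark_round(..., 2).
monetary = {cid: round(total, 2) for cid, total in monetary.items()}
print(monetary)
```

If the Stage 2 data still contains such negative rows, a customer's Monetary value can end up below zero.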

In [9]:
# ==============================================================
# 3.3 Compute Monetary
# ==============================================================
from pyspark.sql.functions import sum as spark_sum, round as spark_round

# 1. Compute the value of each transaction line
df_stage3 = df_stage3.withColumn("TotalPrice", col("Quantity") * col("UnitPrice"))

# 2. Group by CustomerID to get the Monetary value
monetary_df = df_stage3.groupBy("CustomerID") \
    .agg(spark_round(spark_sum("TotalPrice"), 2).alias("Monetary"))

monetary_df.show(5)

+----------+--------+
|CustomerID|Monetary|
+----------+--------+
|     17420|  598.83|
|     16861|  151.65|
|     16503| 1421.43|
|     15727| 5178.96|
|     17389|31300.08|
+----------+--------+
only showing top 5 rows



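## 3.4 Feature Engineering – Recency & Frequency

- **Recency**: number of days between a customer's last transaction and the reference date
- **Frequency**: number of distinct transactions per customer. The data at this stage has no `InvoiceNo` column, so distinct `InvoiceDate` timestamps are counted as a proxy for invoices.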
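A small plain-Python sketch of both definitions follows. The rows are hypothetical, and it assumes that all lines of one invoice share a timestamp, which is what makes distinct timestamps usable as a stand-in for invoices. Recency is computed on calendar dates, mirroring how Spark's `datediff` ignores the time of day.

```python
from datetime import datetime

# Hypothetical toy rows: (CustomerID, InvoiceDate).
rows = [
    (17850, datetime(2011, 12, 1, 8, 26)),
    (17850, datetime(2011, 12, 1, 8, 26)),   # second line of the same invoice
    (17850, datetime(2011, 12, 9, 12, 49)),
    (13047, datetime(2011, 6, 12, 10, 0)),
]

# Reference date: the latest timestamp in the data.
reference_date = max(ts for _, ts in rows)

last_purchase = {}
invoice_times = {}
for customer_id, ts in rows:
    last_purchase[customer_id] = max(ts, last_purchase.get(customer_id, ts))
    invoice_times.setdefault(customer_id, set()).add(ts)

rf = {
    cid: {
        # Whole days between calendar dates, time of day ignored.
        "Recency": (reference_date.date() - last_purchase[cid].date()).days,
        # Distinct timestamps as a proxy for distinct invoices.
        "Frequency": len(invoice_times[cid]),
    }
    for cid in last_purchase
}
print(rf)
```

The proxy undercounts if two separate invoices of one customer ever share the same timestamp, so it is best read as an approximation.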


In [10]:
# ==============================================================
# 3.4 Compute Recency & Frequency (workaround for missing InvoiceNo)
# ==============================================================
from pyspark.sql.functions import (
    max as spark_max, sum as spark_sum, countDistinct, datediff, lit, col
)

# 1. Reference date: the latest InvoiceDate in the dataset.
#    No extra day is added, so the most recent customers get Recency = 0.
max_date_row = df_stage3.agg(spark_max("InvoiceDate")).collect()[0][0]
print(f"Last Transaction Date in Data: {max_date_row}")

# 2. Compute Recency & Frequency
#    NOTE: countDistinct("InvoiceDate") stands in for the missing "InvoiceNo"
rf_df = df_stage3.groupBy("CustomerID") \
    .agg(
        spark_max("InvoiceDate").alias("LastPurchaseDate"),
        countDistinct("InvoiceDate").alias("Frequency")
    ) \
    .withColumn("Recency", datediff(lit(max_date_row), col("LastPurchaseDate"))) \
    .drop("LastPurchaseDate")

# 3. Recompute Monetary so this cell can run on its own.
#    SUM (not MAX) is the correct aggregate: Monetary is total spend.
#    The result is left unrounded here, unlike in cell 3.3.
df_temp_m = df_stage3.withColumn("TotalPrice", col("Quantity") * col("UnitPrice"))
monetary_df = df_temp_m.groupBy("CustomerID").agg(spark_sum("TotalPrice").alias("Monetary"))


# 4. Join R, F, and M into one main DataFrame
rfm_df = rf_df.join(monetary_df, on="CustomerID", how="inner")

# Validate the result
print("Sample Data RFM:")
rfm_df.show(5)
print(f"Total Unique Customers: {rfm_df.count()}")

Last Transaction Date in Data: 2011-12-09 12:49:00
Sample Data RFM:
+----------+---------+-------+-----------------+
|CustomerID|Frequency|Recency|         Monetary|
+----------+---------+-------+-----------------+
|     14450|        3|    180|           483.25|
|     16861|        3|     59|           151.65|
|     17389|       43|      0|         31300.08|
|     17420|        3|     50|598.8299999999999|
|     16503|        5|    106|          1421.43|
+----------+---------+-------+-----------------+
only showing top 5 rows

Total Unique Customers: 3950


## 3.5 Checking the RFM Distributions

RFM distributions are usually **skewed**, Monetary in particular.
Reviewing descriptive statistics is important before scaling and clustering.

In [11]:
# ==============================================================
# 3.5 Descriptive statistics for RFM
# ==============================================================
# Check basic statistics to see the range and skewness
rfm_df.select("Recency", "Frequency", "Monetary").summary().show()

+-------+------------------+-----------------+------------------+
|summary|           Recency|        Frequency|          Monetary|
+-------+------------------+-----------------+------------------+
|  count|              3950|             3950|              3950|
|   mean| 91.32303797468354|4.989367088607595|1713.3856693670866|
| stddev|100.23684765413593|8.614673469817246| 6548.608224207452|
|    min|                 0|                1|          -4287.63|
|    25%|                16|                1|282.18999999999994|
|    50%|                50|                3|            626.99|
|    75%|               143|                5|           1521.79|
|    max|               373|              225|256438.48999999996|
+-------+------------------+-----------------+------------------+
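One quick way to read skewness off the summary above is to compare the mean with the median and the maximum with the 75th percentile. The sketch below does that with the values copied from the table. The two ratios are informal heuristics chosen for illustration, not a formal skewness statistic.

```python
# Values copied from the summary table above (50% row used as the median).
summary = {
    "Recency":   {"mean": 91.32303797468354, "median": 50, "p75": 143, "max": 373},
    "Frequency": {"mean": 4.989367088607595, "median": 3, "p75": 5, "max": 225},
    "Monetary":  {"mean": 1713.3856693670866, "median": 626.99,
                  "p75": 1521.79, "max": 256438.48999999996},
}

skew_hints = {}
for name, s in summary.items():
    skew_hints[name] = {
        # Above 1 means the mean is pulled to the right of the median.
        "mean_over_median": round(s["mean"] / s["median"], 2),
        # Large values point to a long right tail or extreme outliers.
        "max_over_p75": round(s["max"] / s["p75"], 1),
    }
    print(name, skew_hints[name])
```

All three features have a mean above the median, and Monetary has by far the longest tail relative to its 75th percentile. The table also shows a negative minimum for Monetary, which is worth keeping in mind before applying any transform that requires positive values.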



## 3.6 Feature Scaling

The RFM features are on very different scales,
so they need **normalization / scaling** before clustering.

Method used:
- `StandardScaler` (mean = 0, std = 1)
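Standardization subtracts each feature's mean and divides by its standard deviation. As a hand check, the sketch below applies that formula to CustomerID 14450 (Recency 180, Frequency 3, Monetary 483.25 in the section 3.4 sample), using the mean and stddev printed in section 3.5. It assumes the scaler uses the same sample standard deviation that `summary()` reports.

```python
# (mean, sample standard deviation) copied from the summary in section 3.5.
stats = {
    "Recency":   (91.32303797468354, 100.23684765413593),
    "Frequency": (4.989367088607595, 8.614673469817246),
    "Monetary":  (1713.3856693670866, 6548.608224207452),
}

# Raw RFM values of CustomerID 14450 from the sample output in section 3.4.
customer = {"Recency": 180, "Frequency": 3, "Monetary": 483.25}

# z = (x - mean) / std for each feature.
z_scores = {
    name: (customer[name] - mean) / std
    for name, (mean, std) in stats.items()
}
print(z_scores)
```

The three values can be compared with the `scaled_features` vector shown for CustomerID 14450 in the output of the scaling cell.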

In [12]:
# ==============================================================
# 3.6 Vector assembly & standard scaling
# ==============================================================
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# 1. Assemble the features into a single vector column
assembler = VectorAssembler(
    inputCols=["Recency", "Frequency", "Monetary"],
    outputCol="features_vec"
)

# 2. Standard scaler: center to mean 0 and scale to unit standard deviation
scaler = StandardScaler(
    inputCol="features_vec",
    outputCol="scaled_features",
    withStd=True,
    withMean=True
)

# 3. Build the preprocessing pipeline
pipeline_prep = Pipeline(stages=[assembler, scaler])
model_prep = pipeline_prep.fit(rfm_df)
rfm_scaled = model_prep.transform(rfm_df)

rfm_scaled.select("CustomerID", "scaled_features").show(5, truncate=False)

+----------+----------------------------------------------------------------+
|CustomerID|scaled_features                                                 |
+----------+----------------------------------------------------------------+
|14450     |[0.8846742899506721,-0.23092774155371382,-0.18784688704078964]  |
|16861     |[-0.32246662510989116,-0.23092774155371382,-0.2384836007739786] |
|17389     |[-0.9110725258419013,4.412312671463191,4.518012578804657]       |
|17420     |[-0.41225396589951985,-0.23092774155371382,-0.17019733525164013]|
|16503     |[0.1464228212359475,0.0012342790971314125,-0.04458285781822258] |
+----------+----------------------------------------------------------------+
only showing top 5 rows



## 3.7 Customer Clustering (K-Means)

The **K-Means** algorithm groups customers based on their RFM features.

The number of clusters is chosen based on:
- Business interpretability
- Initial experiments

This stage uses **k = 5**, in line with common practice in RFM segmentation.
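The notebook does not show what the initial experiments looked like. One common form is to compare the within-cluster sum of squares (WCSS) across several values of k and look for the point where the improvement flattens. The sketch below illustrates that idea in plain Python on hypothetical 2-D points. It is a simplified Lloyd's algorithm with a deterministic start, not Spark's `KMeans`, and the helper names are made up for this example.

```python
def squared_distance(p, q):
    """Squared Euclidean distance between two points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def kmeans_wcss(points, k, max_iter=50):
    """Run a basic Lloyd's k-means and return the final WCSS."""
    n = len(points)
    # Deterministic start (an assumption for reproducibility): evenly spaced points.
    centroids = [points[i * n // k] for i in range(k)]
    for _ in range(max_iter):
        # Assignment step: attach every point to its nearest centroid.
        clusters = [[] for _ in range(k)]
        for p in points:
            nearest = min(range(k), key=lambda j: squared_distance(p, centroids[j]))
            clusters[nearest].append(p)
        # Update step: move each centroid to the mean of its members.
        new_centroids = [
            tuple(sum(c) / len(members) for c in zip(*members)) if members else centroids[j]
            for j, members in enumerate(clusters)
        ]
        if new_centroids == centroids:
            break
        centroids = new_centroids
    return sum(min(squared_distance(p, c) for c in centroids) for p in points)


# Hypothetical toy data with three well-separated groups.
points = [
    (0, 0), (0, 1), (1, 0), (1, 1),
    (10, 10), (10, 11), (11, 10), (11, 11),
    (20, 0), (20, 1), (21, 0), (21, 1),
]

wcss = {k: kmeans_wcss(points, k) for k in range(1, 5)}
for k, value in wcss.items():
    print(k, round(value, 2))
```

On this toy data the WCSS falls from about 1072.67 (k=1) to 406.0 (k=2) to 6.0 (k=3), and only to about 5.33 for k=4, so the bend sits at k=3. On real RFM data the bend is usually less sharp, which is why business interpretability is listed as a criterion alongside experiments.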

In [13]:
# ==============================================================
# 3.7 Train K-Means
# ==============================================================
from pyspark.ml.clustering import KMeans

# Initialize K-Means
kmeans = KMeans(featuresCol="scaled_features", k=5, seed=42)

# Train the model
model_kmeans = kmeans.fit(rfm_scaled)

# Predict the cluster of each customer
predictions = model_kmeans.transform(rfm_scaled)

# Show the predictions (clusters 0 through 4)
predictions.select("CustomerID", "Recency", "Frequency", "Monetary", "prediction").show(10)

# Save the predictions for Stage 4 (deployment / further analysis)
# predictions.write.parquet("../datasets/rfm_clustered.parquet", mode="overwrite")

+----------+-------+---------+------------------+----------+
|CustomerID|Recency|Frequency|          Monetary|prediction|
+----------+-------+---------+------------------+----------+
|     14450|    180|        3|            483.25|         2|
|     16861|     59|        3|            151.65|         3|
|     17389|      0|       43|          31300.08|         0|
|     17420|     50|        3| 598.8299999999999|         3|
|     16503|    106|        5|           1421.43|         3|
|     15447|    330|        1|            155.17|         2|
|     15727|     16|        7| 5178.960000000001|         1|
|     13623|     30|        7|            672.44|         1|
|     13285|     23|        4|           2709.12|         1|
|     14570|    280|        2|218.05999999999992|         2|
+----------+-------+---------+------------------+----------+
only showing top 10 rows



## 3.8 Initial Cluster Interpretation

First steps in the interpretation:
- Compute RFM statistics per cluster
- Compare characteristics across clusters

This interpretation becomes the basis for the business recommendations in the next stage.

In [14]:
# ==============================================================
# 3.8 Cluster profiling (analysis of averages)
# ==============================================================
from pyspark.sql.functions import avg, count, round as spark_round

# Compute the average R, F, M and the number of members per cluster
cluster_analysis = predictions.groupBy("prediction") \
    .agg(
        count("CustomerID").alias("Num_Customers"),
        spark_round(avg("Recency"), 0).alias("Avg_Recency"),
        spark_round(avg("Frequency"), 1).alias("Avg_Frequency"),
        spark_round(avg("Monetary"), 2).alias("Avg_Monetary")
    ) \
    .orderBy("prediction")

cluster_analysis.show()

# Short explanation (manual inspection based on the output):
print("Interpretation (generic example):")
print("- Cluster with low Recency, high Frequency & Monetary = 'Champions'")
print("- Cluster with high Recency, low Frequency & Monetary = 'Lost/Churned'")

+----------+-------------+-----------+-------------+------------+
|prediction|Num_Customers|Avg_Recency|Avg_Frequency|Avg_Monetary|
+----------+-------------+-----------+-------------+------------+
|         0|           50|        5.0|         53.8|     29624.7|
|         1|         1219|       18.0|          9.2|      2724.9|
|         2|          841|      262.0|          1.8|      418.35|
|         3|         1838|       65.0|          2.3|      636.15|
|         4|            2|        4.0|         56.0|   221960.33|
+----------+-------------+-----------+-------------+------------+

Interpretation (generic example):
- Cluster with low Recency, high Frequency & Monetary = 'Champions'
- Cluster with high Recency, low Frequency & Monetary = 'Lost/Churned'
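The two generic rules can be turned into a small labeling function. In the sketch below the cluster averages come from the profiling table above, while "low" and "high" are defined relative to the overall means from section 3.5. That threshold choice is an assumption made for illustration, and `label_cluster` is a hypothetical helper, not part of the notebook.

```python
# Overall means from the summary in section 3.5, rounded (hypothetical thresholds).
overall = {"Recency": 91.32, "Frequency": 4.99, "Monetary": 1713.39}

# Cluster profiles from the profiling output:
# prediction -> (Avg_Recency, Avg_Frequency, Avg_Monetary)
profiles = {
    0: (5.0, 53.8, 29624.7),
    1: (18.0, 9.2, 2724.9),
    2: (262.0, 1.8, 418.35),
    3: (65.0, 2.3, 636.15),
    4: (4.0, 56.0, 221960.33),
}


def label_cluster(recency, frequency, monetary):
    """Apply the two generic rules; anything else stays unlabeled."""
    recent = recency < overall["Recency"]
    frequent = frequency > overall["Frequency"]
    high_value = monetary > overall["Monetary"]
    if recent and frequent and high_value:
        return "Champions"
    if not recent and not frequent and not high_value:
        return "Lost/Churned"
    return "Unlabeled"


labels = {cluster: label_cluster(*profile) for cluster, profile in profiles.items()}
for cluster, label in labels.items():
    print(cluster, label)
```

Under these thresholds clusters 0, 1, and 4 come out as 'Champions', cluster 2 as 'Lost/Churned', and cluster 3 matches neither rule. Cluster 4 has only 2 customers, so it may be better read as a pair of outliers than as a segment.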


## Stage 3 Summary

In TDSP Stage 3, we have:
- Engineered RFM features with Spark
- Scaled the features for clustering
- Grouped customers with K-Means
- Produced an initial customer segmentation

The next step is **TDSP Stage 4 – Deployment**,
which focuses on presenting the results, business insights, and outputs that stakeholders can consume directly.