In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col, when

# Impor library MLlib
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
)
from pyspark.ml.classification import (
    LogisticRegression, RandomForestClassifier, GBTClassifier
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
import pandas as pd

# Hentikan SparkSession jika ada yang aktif
try:
    spark.stop()
except:
    pass

# Buat SparkSession baru
spark = SparkSession.builder \
    .appName("ChurnModeling") \
    .config("spark.driver.memory", "12g") \
    .config("spark.driver.maxResultSize", "4g") \
    .getOrCreate()

print("SparkSession dan library MLlib siap.")

SparkSession dan library MLlib siap.


In [4]:
data_path = "data/processed_eda/train_members_grouped.parquet"
members_df = spark.read.parquet(data_path)

In [5]:
members_df.printSchema()

root
 |-- msno: string (nullable = true)
 |-- is_churn: integer (nullable = true)
 |-- city: integer (nullable = true)
 |-- bd: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- registered_via: integer (nullable = true)
 |-- registration_init_time: integer (nullable = true)
 |-- bd_clean: string (nullable = true)
 |-- bd_int: integer (nullable = true)
 |-- age_group: string (nullable = true)



In [6]:
data_path = "data/processed_eda_rev/full_analysis_transactions.parquet"
transactions_df = spark.read.parquet(data_path)

In [7]:
transactions_df.printSchema()

root
 |-- msno: string (nullable = true)
 |-- payment_method_id: integer (nullable = true)
 |-- payment_plan_days: integer (nullable = true)
 |-- plan_list_price: integer (nullable = true)
 |-- actual_amount_paid: integer (nullable = true)
 |-- is_auto_renew: integer (nullable = true)
 |-- transaction_date: integer (nullable = true)
 |-- membership_expire_date: integer (nullable = true)
 |-- is_cancel: integer (nullable = true)
 |-- is_churn: integer (nullable = true)
 |-- city: integer (nullable = true)
 |-- bd: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- registered_via: integer (nullable = true)
 |-- registration_init_time: integer (nullable = true)
 |-- bd_clean: string (nullable = true)
 |-- bd_int: integer (nullable = true)
 |-- age_group: string (nullable = true)
 |-- plan_days_group: string (nullable = true)
 |-- amount_paid_group: string (nullable = true)
 |-- list_price_group: string (nullable = true)
 |-- discount_status: string (nullable = true)



In [8]:
data_path = "data/processed_eda/plot_data_logs.parquet"
logs_df = spark.read.parquet(data_path)

In [9]:
logs_df.printSchema()

root
 |-- msno: string (nullable = true)
 |-- is_churn: integer (nullable = true)
 |-- city: integer (nullable = true)
 |-- bd: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- registered_via: integer (nullable = true)
 |-- registration_init_time: integer (nullable = true)
 |-- bd_clean: string (nullable = true)
 |-- bd_int: integer (nullable = true)
 |-- age_group: string (nullable = true)
 |-- total_secs_sum: double (nullable = true)
 |-- total_active_days: long (nullable = true)
 |-- total_songs_100: long (nullable = true)
 |-- total_songs_all: long (nullable = true)
 |-- completion_rate: double (nullable = true)
 |-- listening_time_group: string (nullable = true)
 |-- active_days_group: string (nullable = true)
 |-- completion_habit: string (nullable = true)



In [12]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# --- 1. LOAD & PREPARE DATA (Sama seperti sebelumnya) ---
print("Memuat dan Menyiapkan Data untuk Age Group 0-17...")

# Filter Age Group
target_age_group = "0-17 (Remaja)"
members_df_filtered = members_df.filter(F.col("age_group") == target_age_group)

# Agregasi Transaksi
trans_agg = transactions_df.groupBy("msno").agg(
    F.count("payment_method_id").alias("total_transactions"),
    F.sum("payment_plan_days").alias("total_plan_days"),
    F.sum("actual_amount_paid").alias("total_amount_paid"),
    F.avg("actual_amount_paid").alias("avg_amount_paid"),
    F.sum("is_auto_renew").alias("count_auto_renew"),
    F.sum("is_cancel").alias("count_cancel"),
    F.mode("payment_method_id").alias("most_frequent_payment_method")
)

# Seleksi Logs
logs_selected = logs_df.select(
    "msno", "total_secs_sum", "total_active_days", 
    "total_songs_100", "completion_rate"
)

# Join & Fillna
final_df = members_df_filtered.join(trans_agg, "msno", "left").join(logs_selected, "msno", "left")
final_df = final_df.fillna(0, subset=[
    "total_transactions", "total_plan_days", "total_amount_paid", 
    "avg_amount_paid", "count_auto_renew", "count_cancel",
    "total_secs_sum", "total_active_days", "total_songs_100", "completion_rate",
    "most_frequent_payment_method"
])
final_df = final_df.fillna("Unknown", subset=["city", "registered_via"])

# Drop Columns
cols_to_drop = ["msno", "bd", "bd_clean", "bd_int", "registration_init_time", "gender", "age_group"]
final_df_model = final_df.drop(*cols_to_drop)
final_df.printSchema()

# --- 2. BUILD PIPELINE STAGES (PREPROCESSING) ---

label_col = "is_churn"
feature_cols = [c for c in final_df_model.columns if c != label_col]
cat_cols = [c for c in feature_cols if c in ["city", "registered_via", "most_frequent_payment_method"]]
num_cols = [c for c in feature_cols if c not in cat_cols]

stages = []

# Categorical -> Index -> Vector
for col in cat_cols:
    indexer = StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid="keep")
    encoder = OneHotEncoder(inputCols=[f"{col}_idx"], outputCols=[f"{col}_vec"])
    stages += [indexer, encoder]

# Numeric -> Vector -> Scaled
num_assembler = VectorAssembler(inputCols=num_cols, outputCol="num_features_raw")
scaler = StandardScaler(inputCol="num_features_raw", outputCol="num_features_scaled")
stages += [num_assembler, scaler]

# Combine All -> Features
input_vecs = [f"{c}_vec" for c in cat_cols] + ["num_features_scaled"]
final_assembler = VectorAssembler(inputCols=input_vecs, outputCol="features")
stages += [final_assembler]

# --- 3. TRAINING DUA MODEL (GBT & RF) ---

# Split Data
train_data, test_data = final_df_model.randomSplit([0.8, 0.2], seed=42)
print(f"Data Latih: {train_data.count()}, Data Uji: {test_data.count()}")


# Definisi Model
gbt = GBTClassifier(labelCol=label_col, featuresCol="features", maxIter=20, seed=42)
rf = RandomForestClassifier(labelCol=label_col, featuresCol="features", numTrees=50, seed=42)

# Pipeline khusus tiap model
pipeline_gbt = Pipeline(stages=stages + [gbt])
pipeline_rf = Pipeline(stages=stages + [rf])

print("\n--- Melatih Model GBT ---")
model_gbt = pipeline_gbt.fit(train_data)
preds_gbt = model_gbt.transform(test_data)

print("\n--- Melatih Model Random Forest ---")
model_rf = pipeline_rf.fit(train_data)
preds_rf = model_rf.transform(test_data)

# --- 4. FUNGSI EVALUASI LENGKAP ---

def evaluate_model_complete(predictions, model_name):
    print(f"\n=== HASIL EVALUASI: {model_name} ===")
    
    # 1. Hitung AUC (ROC & PR)
    evaluator_roc = BinaryClassificationEvaluator(labelCol=label_col, metricName="areaUnderROC")
    evaluator_pr = BinaryClassificationEvaluator(labelCol=label_col, metricName="areaUnderPR")
    
    auc_roc = evaluator_roc.evaluate(predictions)
    auc_pr = evaluator_pr.evaluate(predictions)
    
    print(f"AUC-ROC : {auc_roc:.4f}")
    print(f"AUC-PR  : {auc_pr:.4f}")
    
    # 2. Hitung Confusion Matrix Manual untuk Metrik Detail
    # Kita convert ke Pandas (aman karena hasil agregasi kecil)
    cm = predictions.groupBy(label_col, "prediction").count().toPandas()
    
    # Ekstrak TP, TN, FP, FN
    try:
        tn = cm[(cm[label_col]==0) & (cm['prediction']==0)]['count'].values[0]
    except: tn = 0
    
    try:
        fp = cm[(cm[label_col]==0) & (cm['prediction']==1)]['count'].values[0]
    except: fp = 0
        
    try:
        fn = cm[(cm[label_col]==1) & (cm['prediction']==0)]['count'].values[0]
    except: fn = 0
        
    try:
        tp = cm[(cm[label_col]==1) & (cm['prediction']==1)]['count'].values[0]
    except: tp = 0
    
    # Hitung Rumus
    accuracy = (tp + tn) / (tp + tn + fp + fn)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    
    print("-" * 30)
    print(f"Accuracy  : {accuracy:.4f}")
    print(f"Precision : {precision:.4f} (Ketepatan prediksi Churn)")
    print(f"Recall    : {recall:.4f}    (Daya tangkap Churn)")
    print(f"F1-Score  : {f1_score:.4f}")
    print("-" * 30)
    print(f"Confusion Matrix:\n TP: {tp} | FP: {fp}")
    print(f" FN: {fn} | TN: {tn}")

# --- 5. PRINT HASIL ---
evaluate_model_complete(preds_gbt, "GBT Classifier")
evaluate_model_complete(preds_rf, "Random Forest")

Memuat dan Menyiapkan Data untuk Age Group 0-17...
root
 |-- msno: string (nullable = true)
 |-- is_churn: integer (nullable = true)
 |-- city: integer (nullable = true)
 |-- bd: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- registered_via: integer (nullable = true)
 |-- registration_init_time: integer (nullable = true)
 |-- bd_clean: string (nullable = true)
 |-- bd_int: integer (nullable = true)
 |-- age_group: string (nullable = true)
 |-- total_transactions: long (nullable = true)
 |-- total_plan_days: long (nullable = true)
 |-- total_amount_paid: long (nullable = true)
 |-- avg_amount_paid: double (nullable = false)
 |-- count_auto_renew: long (nullable = true)
 |-- count_cancel: long (nullable = true)
 |-- most_frequent_payment_method: integer (nullable = true)
 |-- total_secs_sum: double (nullable = false)
 |-- total_active_days: long (nullable = true)
 |-- total_songs_100: long (nullable = true)
 |-- completion_rate: double (nullable = false)

Data Latih

In [None]:
# --- 4. FUNGSI EVALUASI LENGKAP ---

def evaluate_model_complete(predictions, model_name):
    print(f"\n=== HASIL EVALUASI: {model_name} ===")
    
    # 1. Hitung AUC (ROC & PR)
    evaluator_roc = BinaryClassificationEvaluator(labelCol=label_col, metricName="areaUnderROC")
    evaluator_pr = BinaryClassificationEvaluator(labelCol=label_col, metricName="areaUnderPR")
    
    auc_roc = evaluator_roc.evaluate(predictions)
    auc_pr = evaluator_pr.evaluate(predictions)
    
    print(f"AUC-ROC : {auc_roc:.4f}")
    print(f"AUC-PR  : {auc_pr:.4f}")
    
    # 2. Hitung Confusion Matrix Manual untuk Metrik Detail
    # Kita convert ke Pandas (aman karena hasil agregasi kecil)
    cm = predictions.groupBy(label_col, "prediction").count().toPandas()
    
    # Ekstrak TP, TN, FP, FN
    try:
        tn = cm[(cm[label_col]==0) & (cm['prediction']==0)]['count'].values[0]
    except: tn = 0
    
    try:
        fp = cm[(cm[label_col]==0) & (cm['prediction']==1)]['count'].values[0]
    except: fp = 0
        
    try:
        fn = cm[(cm[label_col]==1) & (cm['prediction']==0)]['count'].values[0]
    except: fn = 0
        
    try:
        tp = cm[(cm[label_col]==1) & (cm['prediction']==1)]['count'].values[0]
    except: tp = 0
    
    # Hitung Rumus
    accuracy = (tp + tn) / (tp + tn + fp + fn)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    
    print("-" * 30)
    print(f"Accuracy  : {accuracy:.4f}")
    print(f"Precision : {precision:.4f} (Ketepatan prediksi Churn)")
    print(f"Recall    : {recall:.4f}    (Daya tangkap Churn)")
    print(f"F1-Score  : {f1_score:.4f}")
    print("-" * 30)
    print(f"Confusion Matrix:\n TP: {tp} | FP: {fp}")
    print(f" FN: {fn} | TN: {tn}")

# --- 5. PRINT HASIL ---
evaluate_model_complete(preds_gbt, "GBT Classifier")
evaluate_model_complete(preds_rf, "Random Forest")


=== HASIL EVALUASI: GBT Classifier ===
AUC-ROC : 0.8126
AUC-PR  : 0.6569
------------------------------
Accuracy  : 0.7835
Precision : 0.6757 (Ketepatan prediksi Churn)
Recall    : 0.4363    (Daya tangkap Churn)
F1-Score  : 0.5302
------------------------------
Confusion Matrix:
 TP: 298 | FP: 143
 FN: 385 | TN: 1613

=== HASIL EVALUASI: Random Forest ===
AUC-ROC : 0.8070
AUC-PR  : 0.6369
------------------------------
Accuracy  : 0.7819
Precision : 0.7127 (Ketepatan prediksi Churn)
Recall    : 0.3704    (Daya tangkap Churn)
F1-Score  : 0.4875
------------------------------
Confusion Matrix:
 TP: 253 | FP: 102
 FN: 430 | TN: 1654


In [None]:
# trans_agg = transactions_df.groupBy("msno").agg(
#     F.count("payment_method_id").alias("total_transactions"),
#     F.sum("payment_plan_days").alias("total_plan_days"),
#     F.sum("actual_amount_paid").alias("total_amount_paid"),
#     F.avg("actual_amount_paid").alias("avg_amount_paid"),
#     F.sum("is_auto_renew").alias("count_auto_renew"),
#     F.sum("is_cancel").alias("count_cancel"),
#     F.mode("payment_method_id").alias("most_frequent_payment_method")
# )

# # Logs (Sama)
# logs_selected = logs_df.select("msno", "total_secs_sum", "total_active_days", "total_songs_100", "completion_rate")

# # Join Semua
# final_df = members_df.join(trans_agg, "msno", "left").join(logs_selected, "msno", "left")

# # Fillna
# final_df = final_df.fillna(0, subset=[
#     "total_transactions", "total_plan_days", "total_amount_paid", 
#     "avg_amount_paid", "count_auto_renew", "count_cancel",
#     "total_secs_sum", "total_active_days", "total_songs_100", "completion_rate",
#     "most_frequent_payment_method"
# ])
# final_df = final_df.fillna("Unknown", subset=["city", "registered_via", "age_group"])

# # DROP Columns (TAPI 'age_group' DISIMPAN!)
# cols_to_drop = ["msno", "bd", "bd_clean", "bd_int", "registration_init_time", "gender"] 
# final_df_model = final_df.drop(*cols_to_drop)

# # --- 2. PIPELINE (DENGAN AGE GROUP) ---
# label_col = "is_churn"
# feature_cols = [c for c in final_df_model.columns if c != label_col]
# # Masukkan age_group ke kategorikal
# cat_cols = [c for c in feature_cols if c in ["city", "registered_via", "most_frequent_payment_method", "age_group"]]
# num_cols = [c for c in feature_cols if c not in cat_cols]

# print("Fitur Kategorikal:", cat_cols)

# stages = []
# for col in cat_cols:
#     indexer = StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid="keep")
#     encoder = OneHotEncoder(inputCols=[f"{col}_idx"], outputCols=[f"{col}_vec"])
#     stages += [indexer, encoder]

# num_assembler = VectorAssembler(inputCols=num_cols, outputCol="num_features_raw")
# scaler = StandardScaler(inputCol="num_features_raw", outputCol="num_features_scaled")
# stages += [num_assembler, scaler]

# input_vecs = [f"{c}_vec" for c in cat_cols] + ["num_features_scaled"]
# final_assembler = VectorAssembler(inputCols=input_vecs, outputCol="features")
# stages += [final_assembler]

# # --- 3. MODEL YANG LEBIH KUAT (TUNED HYPERPARAMETERS) ---
# # Kita naikkan maxIter dan maxDepth agar model lebih pintar
# gbt = GBTClassifier(
#     labelCol=label_col, 
#     featuresCol="features", 
#     maxIter=100,    # Naik dari 20 -> 100 (Belajar lebih lama)
#     maxDepth=5,     # Pohon lebih dalam (menangkap pola kompleks)
#     stepSize=0.1,   # Learning rate standar
#     seed=42
# )
# stages.append(gbt)
# pipeline = Pipeline(stages=stages)

# # --- 4. SPLIT & OVERSAMPLING (SAMA SEPERTI SEBELUMNYA) ---
# train_data, test_data = final_df_model.randomSplit([0.8, 0.2], seed=42)

# # Oversampling Logic (Wajib untuk Imbalance)
# major_df = train_data.filter(F.col(label_col) == 0)
# minor_df = train_data.filter(F.col(label_col) == 1)
# ratio = major_df.count() / minor_df.count()
# minor_oversampled = minor_df.sample(withReplacement=True, fraction=ratio, seed=42)
# train_data_balanced = major_df.unionAll(minor_oversampled)

# print("Melatih Model GBT (Tuned)...")
# model = pipeline.fit(train_data_balanced)

# print("Evaluasi...")
# predictions = model.transform(test_data)
# evaluate_model_complete(predictions, "GBT Classifier (Tuned + Full Data)")

Fitur Kategorikal: ['city', 'registered_via', 'age_group', 'most_frequent_payment_method']
Melatih Model GBT (Tuned)...
Evaluasi...

=== HASIL EVALUASI: GBT Classifier (Tuned + Full Data) ===
AUC-ROC : 0.9587
AUC-PR  : 0.7047
------------------------------
Accuracy  : 0.8497
Precision : 0.3788 (Ketepatan prediksi Churn)
Recall    : 0.9680    (Daya tangkap Churn)
F1-Score  : 0.5445
------------------------------
Confusion Matrix:
 TP: 19444 | FP: 31892
 FN: 642 | TN: 164547
