# 1. Read the Dataset

In [0]:
#Read file by using AWS
df_train = spark.read.csv(
    "s3a://churn-databricks/raw/customer_churn_dataset-training-master.csv",
    header=True,
    inferSchema=True
)

df_test = spark.read.csv(
    "s3a://churn-databricks/raw/customer_churn_dataset-testing-master.csv",
    header=True,
    inferSchema=True
)

In [0]:
%python
df_train = spark.read.csv(
    "/Volumes/workspace/default/data_customers/customer_churn_dataset-training-master.csv",
    header=True,
    inferSchema=True,
)

df_test = spark.read.csv(
    "/Volumes/workspace/default/data_customers/customer_churn_dataset-testing-master.csv",
    header=True,
    inferSchema=True,
)


# 2. EDA

In [0]:
df_train = df_train.drop("CustomerID")
df_test = df_test.drop("CustomerID")

In [0]:
%python
df_train.printSchema()

In [0]:
df_test.printSchema()

In [0]:
%python
display(df_train.describe())

In [0]:
%python
from pyspark.sql.functions import col, sum, when

missing_df_train= df_train.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df_train.columns
])

display(missing_df_train)

In [0]:
%python
df_train = df_train.dropna()
df_test = df_test.dropna()


df_train.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df_train.columns
]).show()


In [0]:
df_train = df_train.dropDuplicates()
df_test = df_test.dropDuplicates()

In [0]:
%python
for c in df_train.columns:
    print(c, df_train.select(c).distinct().count())

In [0]:
%python
import matplotlib.pyplot as plt
import seaborn as sns

churn_pd = df_train.select("Churn").toPandas()

plt.figure(figsize=(6,4))
ax = sns.countplot(x='Churn', data=churn_pd)

for p in ax.patches:
    ax.annotate(
        f'{int(p.get_height())}',
        (p.get_x() + p.get_width()/2., p.get_height()),
        ha='center',
        va='bottom'
    )

plt.show()

In [0]:
num_cols = [
    f.name for f in df_train.schema.fields
    if f.dataType.simpleString() in ["int", "double"]
    and f.name != "Churn"
]

print("Numeric columns:", num_cols)

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

for col in num_cols:
    
    # convert c·ªôt Spark -> Pandas
    data = df_train.select(col).toPandas()[col]
    
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))

    # Histogram
    axes[0].hist(data, bins=30, density=True, alpha=0.7)
    axes[0].set_title(f"Histogram of {col}")
    axes[0].set_xlabel(col)
    axes[0].set_ylabel("Density")
    axes[0].grid(alpha=0.3)

    # KDE
    sns.kdeplot(data, fill=True, ax=axes[1], color='orange')
    axes[1].set_title(f"KDE of {col}")
    axes[1].set_xlabel(col)
    axes[1].set_ylabel("Density")
    axes[1].grid(alpha=0.3)

    plt.tight_layout()
    plt.show()

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# N·∫øu data l·ªõn th√¨ sample tr∆∞·ªõc
sample_df = df_train.sample(fraction=0.2, seed=42)

for col in num_cols:
    
    # Spark -> Pandas
    data = sample_df.select(col).toPandas()[col]

    fig, axes = plt.subplots(1, 2, figsize=(12,4))

    # Boxplot
    axes[0].boxplot(
        data,
        vert=False,
        patch_artist=True,
        boxprops=dict(facecolor='skyblue', alpha=0.7),
        medianprops=dict(color='red')
    )
    axes[0].set_title(f"Boxplot of {col}")
    axes[0].set_xlabel(col)
    axes[0].grid(axis='x', alpha=0.3)

    # Violin Plot
    sns.violinplot(
        x=data,
        ax=axes[1],
        inner='quartile',
        color='orange'
    )
    axes[1].set_title(f"Violin Plot of {col}")
    axes[1].set_xlabel(col)
    axes[1].grid(axis='x', alpha=0.3)

    plt.tight_layout()
    plt.show()

In [0]:
from pyspark.sql.functions import avg

churn_rate = (
    df_train
    .groupBy("Support Calls")
    .agg(avg("Churn").alias("churn_rate"))
    .orderBy("Support Calls")
)

churn_pd = churn_rate.toPandas()

plt.figure(figsize=(8,5))
plt.bar(churn_pd['Support Calls'], churn_pd['churn_rate'])

plt.xlabel("Number of Support Calls")
plt.ylabel("Churn Rate")
plt.title("Churn Rate by Support Calls")
plt.ylim(0,1)

plt.show()

In [0]:
cols = num_cols + ["Churn"]
pdf = df_train.select(cols).toPandas()

corr = pdf.corr()

plt.figure(figsize=(10,8))
sns.heatmap(corr, annot=True, cmap='coolwarm')
plt.title("Correlation Matrix including Churn")
plt.show()

### Check Categorical Columns

In [0]:
categorical_cols = [f.name for f in df_train.schema.fields 
                    if f.dataType.simpleString() == 'string']
categorical_cols

In [0]:
for col in categorical_cols:
    print(f"\n===== {col} =====")
    df_train.groupBy(col).count().orderBy("count", ascending=False).show()

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# Convert to√†n b·ªô categorical columns sang pandas m·ªôt l·∫ßn
pdf = df_train.select(categorical_cols).toPandas()

for col in categorical_cols:
    plt.figure(figsize=(6,4))
    
    sns.countplot(
        x=pdf[col],
        order=pdf[col].value_counts().index
    )
    
    plt.title(f"{col} Distribution")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

### Data Engineering : New Column

In [0]:
from pyspark.sql.functions import when, col

# Train
df_train = df_train.withColumn(
    "Age_group",
    when(col("Age") < 30, "Young (<30)")
    .when((col("Age") >= 30) & (col("Age") < 50), "Adult (30-50)")
    .otherwise("Senior (50+)")
)

# Test
df_test = df_test.withColumn(
    "Age_group",
    when(col("Age") < 30, "Young (<30)")
    .when((col("Age") >= 30) & (col("Age") < 50), "Adult (30-50)")
    .otherwise("Senior (50+)")
)

In [0]:
import matplotlib.pyplot as plt

# Spark -> Pandas
pdf = df_train.select("Age_group", "Churn").toPandas()

# Group & pivot b·∫±ng pandas
pivot = (
    pdf.groupby(["Age_group", "Churn"])
       .size()
       .unstack(fill_value=0)
)

# Plot
plt.figure(figsize=(12,6))
pivot.plot(kind='bar')
plt.title('Age Group vs Churn')
plt.xlabel('Age Group')
plt.ylabel('Count')
plt.xticks(rotation=45)
plt.legend(title='Churn')
plt.tight_layout()
plt.show()

### Check Last Interaction with Churn

In [0]:
import matplotlib.pyplot as plt

# Spark -> Pandas
pdf = df_train.select("Last Interaction", "Churn").toPandas()

# Group + pivot
pivot = (
    pdf.groupby(["Last Interaction", "Churn"])
       .size()
       .unstack(fill_value=0)
)

# Chu·∫©n ho√° theo % t·ª´ng h√†ng
pivot_norm = pivot.div(pivot.sum(axis=1), axis=0)

# Plot
plt.figure(figsize=(12,6))
pivot_norm.plot(kind='bar', stacked=True)

plt.title("Percentage Distribution of Churn by Last Interaction")
plt.ylabel("Percentage")
plt.xlabel("Last Interaction")
plt.legend(title="Churn")
plt.tight_layout()
plt.show()

# 4.Feature Engineering

In [0]:
from pyspark.sql.functions import col

# ===== TRAIN =====
df_train = df_train \
    .withColumn("Usage_Per_Tenure",
                col("Usage Frequency") / (col("Tenure") + 1)) \
    .withColumn("Spend_Per_Usage",
                col("Total Spend") / (col("Usage Frequency") + 1)) \
    .withColumn("Spend_Per_Tenure",
                col("Total Spend") / (col("Tenure") + 1)) \
    .withColumn("Payment_Delay_Ratio",
                col("Payment Delay") / 30) \
    .withColumn("Engagement_Score",
                (col("Usage Frequency") * col("Total Spend")) / 1000)

# ===== TEST =====
df_test = df_test \
    .withColumn("Usage_Per_Tenure",
                col("Usage Frequency") / (col("Tenure") + 1)) \
    .withColumn("Spend_Per_Usage",
                col("Total Spend") / (col("Usage Frequency") + 1)) \
    .withColumn("Spend_Per_Tenure",
                col("Total Spend") / (col("Tenure") + 1)) \
    .withColumn("Payment_Delay_Ratio",
                col("Payment Delay") / 30) \
    .withColumn("Engagement_Score",
                (col("Usage Frequency") * col("Total Spend")) / 1000)

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline



indexers = [
    StringIndexer(
        inputCol=c,
        outputCol=c+"_index",
        handleInvalid="keep"
    )
    for c in categorical_cols
]

encoder = OneHotEncoder(
    inputCols=[c+"_index" for c in categorical_cols],
    outputCols=[c+"_vec" for c in categorical_cols]
)

numeric_cols = [f.name for f in df_train.schema.fields 
                if f.dataType.simpleString() in ['int', 'double']
                and f.name != "Churn"]

assembler = VectorAssembler(
    inputCols=numeric_cols + [c+"_vec" for c in categorical_cols],
    outputCol="features"
)

pipeline = Pipeline(stages=indexers + [encoder, assembler])

df_train_processed = pipeline.fit(df_train).transform(df_train)

# Fit tr√™n TRAIN
pipeline_model = pipeline.fit(df_train)

# Transform TRAIN
df_train_processed = pipeline_model.transform(df_train)

# Transform TEST (quan tr·ªçng!)
df_test_processed = pipeline_model.transform(df_test)

display(df_train_processed)

### Modeling

In [0]:
train_full = df_train_processed
test_final = df_test_processed

train_data, val_data = train_full.randomSplit([0.8, 0.2], seed=42)

In [0]:
from pyspark.ml.classification import (
    LogisticRegression,
    DecisionTreeClassifier,
    RandomForestClassifier,
    GBTClassifier,
    LinearSVC
)

models = {
    "Logistic Regression": LogisticRegression(
        featuresCol="features",
        labelCol="Churn",
        maxIter=100
    ),

    "Decision Tree": DecisionTreeClassifier(
        featuresCol="features",
        labelCol="Churn",
        maxDepth=5
    ),

    "Random Forest": RandomForestClassifier(
        featuresCol="features",
        labelCol="Churn",
        numTrees=100,
        maxDepth=5
    ),

    "Gradient Boosting": GBTClassifier(
        featuresCol="features",
        labelCol="Churn",
        maxIter=50
    ),

    "Linear SVM": LinearSVC(
        featuresCol="features",
        labelCol="Churn"
    )
}

In [0]:
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator
)

auc_eval = BinaryClassificationEvaluator(
    labelCol="Churn",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

acc_eval = MulticlassClassificationEvaluator(
    labelCol="Churn",
    predictionCol="prediction",
    metricName="accuracy"
)

recall_eval = MulticlassClassificationEvaluator(
    labelCol="Churn",
    predictionCol="prediction",
    metricName="recallByLabel"
)

results = {}

for name, model in models.items():

    fitted = model.fit(train_data)
    preds = fitted.transform(val_data)

    results[name] = {
        "AUC": auc_eval.evaluate(preds),
        "Accuracy": acc_eval.evaluate(preds),
        "Recall": recall_eval.evaluate(preds)
    }


results_df = pd.DataFrame(results).T.sort_values("AUC", ascending=False)
display(results_df)

In [0]:
best_model_name = results_df.index[0]
print("Best model:", best_model_name)

In [0]:
train_full = df_train_processed

best_model = models[best_model_name].fit(train_full)

final_preds = best_model.transform(df_test_processed)

In [0]:
from sklearn.metrics import classification_report

pdf = final_preds.select("Churn", "prediction").toPandas()

print(classification_report(pdf["Churn"], pdf["prediction"]))

In [0]:
import mlflow
import mlflow.spark
import os
os.environ["MLFLOW_DFS_TMP"] = "/Volumes/workspace/default/mlflow_tmp"

with mlflow.start_run(run_name=best_model_name):

    mlflow.log_metric("Test_AUC", auc_eval.evaluate(final_preds))
    mlflow.log_metric("Test_Accuracy", acc_eval.evaluate(final_preds))
    mlflow.log_metric("Test_Recall", recall_eval.evaluate(final_preds))

    mlflow.spark.log_model(best_model, "model")

print("Logged to MLflow üöÄ")