In [25]:
# Install PySpark (only once)
!pip install -q pyspark



In [26]:
from pyspark.sql import SparkSession

# Start a Spark session for this notebook, or reuse the one already running
# in the kernel (getOrCreate).
spark = (
    SparkSession.builder
    .appName("BankingCustomerChurn")
    .getOrCreate()
)

spark


In [None]:
import os

from google.colab import files

# Opens Colab's file picker: click "Choose Files" and pick the CSV
# (e.g. Churn_Modelling.csv).
uploaded = files.upload()

# List the working directory so the uploaded file can be confirmed.
print("Files in /content/:")
print(os.listdir("/content"))


In [None]:
# Location of the uploaded CSV; edit the filename if yours differs.
csv_path = "/content/Churn_Modelling.csv"

# Read with a header row and let Spark infer column types.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(csv_path)
)

# Quick sanity checks: row count, column names, inferred schema, sample rows.
print("Number of rows:", df.count())
print("Columns:", df.columns)
df.printSchema()
df.show(5, truncate=False)


In [None]:
from pyspark.sql.functions import col

# Row index, customer id and surname identify a customer rather than describe
# them, so they are removed before modelling.
cols_to_drop = ["RowNumber", "CustomerId", "Surname"]
df_clean = df.drop(*cols_to_drop)

# Check the remaining columns and the class balance of the `Exited` label.
print("Columns after dropping:", df_clean.columns)
(
    df_clean
    .groupBy("Exited")
    .count()
    .show()
)


In [None]:
# Drop down to the RDD of Row objects to demonstrate RDD-style operations.
rdd = df_clean.rdd

# 1. map + reduceByKey: number of customers per Geography.
geo_rdd = rdd.map(lambda r: (r.Geography, 1))
geo_counts = geo_rdd.reduceByKey(lambda x, y: x + y).collect()
print("Customer count by Geography (RDD):", geo_counts)

# 2. filter: customers flagged as active members.
active_count = rdd.filter(lambda r: r.IsActiveMember == 1).count()
print("Active Members (RDD):", active_count)

# 3. Distribution of the churn label (Exited == 1 means churned).
churn_dist = (
    rdd.map(lambda r: ("Not Churned" if r.Exited != 1 else "Churned", 1))
       .reduceByKey(lambda x, y: x + y)
       .collect()
)
print("Churn Distribution (RDD):", churn_dist)

# 4. Customers whose (non-null) balance exceeds 100,000.
high_balance_rdd = rdd.filter(lambda r: r.Balance is not None and r.Balance > 100000)
print("Customers with balance > 100000 (RDD):", high_balance_rdd.count())


In [None]:
# The same kinds of summaries, expressed with the DataFrame API.
print("Customer count by Geography (DataFrame):")
df_clean.groupBy("Geography").count().show()

# Mean balance for churned vs. retained customers.
print("Average balance by churn status:")
df_clean.groupBy("Exited").agg({"Balance": "avg"}).show()

# Mean balance per gender.
print("Average balance by gender:")
df_clean.groupBy("Gender").avg("Balance").show()

# Five customers with the highest credit score.
print("Top 5 highest credit score customers:")
df_clean.orderBy(df_clean["CreditScore"].desc()).show(5)


In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Model inputs: two categorical columns, eight numeric columns, binary label.
categorical_cols = ["Geography", "Gender"]
numeric_cols = [
    "CreditScore", "Age", "Tenure", "Balance",
    "NumOfProducts", "HasCrCard", "IsActiveMember", "EstimatedSalary",
]
label_col = "Exited"

# Categorical -> numeric index. handleInvalid="keep" puts labels unseen during
# fitting into an extra index instead of raising.
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index", handleInvalid="keep")
    for name in categorical_cols
]

# Index -> one-hot vector.
encoders = [
    OneHotEncoder(inputCols=[f"{name}_index"], outputCols=[f"{name}_ohe"])
    for name in categorical_cols
]

# Numeric columns plus the one-hot vectors, concatenated into "features".
assembler_inputs = [*numeric_cols, *(f"{name}_ohe" for name in categorical_cols)]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

print("Assembler inputs:", assembler_inputs)



In [None]:
# 80/20 train/test split with a fixed seed.
train_df, test_df = df_clean.randomSplit(weights=[0.8, 0.2], seed=42)

print("Train rows:", train_df.count())
print("Test rows:", test_df.count())


In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Logistic-regression baseline, limited to 20 optimizer iterations.
lr = LogisticRegression(featuresCol="features", labelCol=label_col, maxIter=20)

# Preprocessing stages (index -> one-hot -> assemble) followed by the classifier.
stages = [*indexers, *encoders, assembler, lr]

pipeline = Pipeline(stages=stages)
lr_model = pipeline.fit(train_df)
print("Logistic Regression training done ✅")

# Score the held-out split and show a few rows.
predictions = lr_model.transform(test_df)
predictions.select("features", "Exited", "probability", "prediction").show(5, truncate=False)


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import when

# Area under the ROC curve, computed from the model's rawPrediction column.
evaluator_auc = BinaryClassificationEvaluator(labelCol=label_col, rawPredictionCol="rawPrediction", metricName="areaUnderROC")
# Named `lr_auc` rather than `auc`: a later `from sklearn.metrics import auc`
# would otherwise silently replace this result with sklearn's function.
lr_auc = evaluator_auc.evaluate(predictions)
print("LR AUC:", lr_auc)

# Accuracy = mean of a 0/1 "prediction matches label" flag. It is computed on a
# projection so `predictions` is not rebound; re-running this cell is side-effect free.
is_correct = when(predictions[label_col] == predictions["prediction"], 1).otherwise(0)
accuracy = (
    predictions
    .select(is_correct.alias("correct"))
    .agg({"correct": "avg"})
    .collect()[0][0]
)
print("LR Accuracy:", accuracy)


In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest: 100 trees, max depth 6, fixed seed.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol=label_col,
    numTrees=100,
    maxDepth=6,
    seed=42,
)

# Same preprocessing stages, with the forest as the final estimator.
rf_stages = [*indexers, *encoders, assembler, rf]

rf_pipeline = Pipeline(stages=rf_stages)
rf_model = rf_pipeline.fit(train_df)
print("Random Forest model training done ✅")

# Score the held-out split and show a few rows.
rf_predictions = rf_model.transform(test_df)
rf_predictions.select("Exited", "probability", "prediction").show(5, truncate=False)


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import when

# Area under the ROC curve for the random forest, from rawPrediction.
rf_evaluator_auc = BinaryClassificationEvaluator(labelCol=label_col, rawPredictionCol="rawPrediction", metricName="areaUnderROC")
rf_auc = rf_evaluator_auc.evaluate(rf_predictions)
print("RF AUC:", rf_auc)

# Accuracy = mean of a 0/1 "prediction matches label" flag. It is computed on a
# projection so `rf_predictions` is not rebound (keeps the cell idempotent).
rf_is_correct = when(rf_predictions[label_col] == rf_predictions["prediction"], 1).otherwise(0)
rf_accuracy = (
    rf_predictions
    .select(rf_is_correct.alias("correct"))
    .agg({"correct": "avg"})
    .collect()[0][0]
)
print("RF Accuracy:", rf_accuracy)


In [None]:
# The fitted random-forest model is the last stage of the pipeline model.
rf_stage_model = rf_model.stages[-1]
importances = rf_stage_model.featureImportances
scores = importances.toArray()

# The importance vector has one entry per *slot* of the "features" vector, not
# per assembler input column: each one-hot column expands into several slots.
# Zipping against `assembler_inputs` would truncate and mislabel the scores.
# VectorAssembler records per-slot names in the column's ML attribute
# metadata, so read the names from there.
features_meta = rf_predictions.schema["features"].metadata.get("ml_attr", {})
slot_names = {
    attr["idx"]: attr.get("name", f"feature_{attr['idx']}")
    for group in features_meta.get("attrs", {}).values()
    for attr in group
}
# Fall back to a positional name for any slot missing from the metadata.
feature_names = [slot_names.get(i, f"feature_{i}") for i in range(len(scores))]

feature_importance = list(zip(feature_names, scores))
feature_importance_sorted = sorted(feature_importance, key=lambda x: x[1], reverse=True)

print("Feature Importances (sorted):")
for name, score in feature_importance_sorted:
    print(f"{name}: {score}")


In [None]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees: 50 boosting iterations, depth 5, step size 0.1, fixed seed.
gbt = GBTClassifier(
    featuresCol="features",
    labelCol=label_col,
    maxIter=50,
    maxDepth=5,
    stepSize=0.1,
    seed=42,
)

# Same preprocessing stages, with the boosted ensemble as the final estimator.
gbt_stages = [*indexers, *encoders, assembler, gbt]

gbt_pipeline = Pipeline(stages=gbt_stages)
gbt_model = gbt_pipeline.fit(train_df)
print("GBT model training done ✅")

# Score the held-out split and show a few rows.
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select("Exited", "probability", "prediction").show(5, truncate=False)


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import when

# Area under the ROC curve for the GBT model, from rawPrediction.
gbt_evaluator_auc = BinaryClassificationEvaluator(labelCol=label_col, rawPredictionCol="rawPrediction", metricName="areaUnderROC")
gbt_auc = gbt_evaluator_auc.evaluate(gbt_predictions)
print("GBT AUC:", gbt_auc)

# Accuracy = mean of a 0/1 "prediction matches label" flag. It is computed on a
# projection so `gbt_predictions` is not rebound: what gets saved to parquet
# later does not depend on whether this cell was run.
gbt_is_correct = when(gbt_predictions[label_col] == gbt_predictions["prediction"], 1).otherwise(0)
gbt_accuracy = (
    gbt_predictions
    .select(gbt_is_correct.alias("correct"))
    .agg({"correct": "avg"})
    .collect()[0][0]
)
print("GBT Accuracy:", gbt_accuracy)


In [None]:
predictions_path = "/content/churn_predictions_parquet"
model_path = "/content/gbt_churn_model"

# Write the GBT test-set predictions as Parquet, replacing any previous output.
gbt_predictions.write.mode("overwrite").parquet(predictions_path)
print(f"Predictions saved successfully at {predictions_path}")

# Save the whole fitted GBT pipeline (preprocessing stages + model), overwriting.
gbt_model.write().overwrite().save(model_path)
print(f"GBT Model saved successfully at {model_path}")


In [None]:
import pandas as pd

# Scores from the logistic-regression model (`predictions`); substitute
# `rf_predictions` or `gbt_predictions` to score another model.
pred_df = predictions.select("probability", "Exited").toPandas()

# Use the positive-class (churn) probability, element 1 of the probability
# vector, rather than rawPrediction, which is not a probability.
pred_df["score"] = pred_df["probability"].apply(lambda v: float(v[1]))
y_true = pred_df["Exited"]
y_scores = pred_df["score"]

In [None]:
# sklearn's `auc` is aliased so importing it does not overwrite any notebook
# variable that is already named `auc`.
from sklearn.metrics import roc_curve, auc as sk_auc
import matplotlib.pyplot as plt

# ROC for the logistic-regression scores prepared in the previous cell.
fpr, tpr, thresholds = roc_curve(y_true, y_scores)
roc_auc = sk_auc(fpr, tpr)

fig, ax = plt.subplots(figsize=(7, 6))
ax.plot(fpr, tpr, lw=2, label='Logistic Regression ROC (AUC = %0.4f)' % roc_auc)
ax.plot([0, 1], [0, 1], linestyle='--')  # chance diagonal

ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Logistic Regression ROC Curve')
ax.legend(loc="lower right")
plt.show()

In [None]:
import pandas as pd
# Alias sklearn's `auc` so it does not overwrite a notebook variable named `auc`.
from sklearn.metrics import roc_curve, auc as sk_auc
import matplotlib.pyplot as plt

# Bring the GBT label and class probabilities to the driver as pandas.
gbt_pd = gbt_predictions.select("probability", "Exited").toPandas()

# Positive-class (churn) probability is element 1 of the probability vector.
gbt_pd["score"] = gbt_pd["probability"].apply(lambda v: float(v[1]))

# GBT-prefixed names so this cell does not overwrite the shared
# y_true / y_scores / fpr / tpr / roc_auc read by the other ROC cell.
gbt_y_true = gbt_pd["Exited"]
gbt_y_scores = gbt_pd["score"]

gbt_fpr, gbt_tpr, gbt_thresholds = roc_curve(gbt_y_true, gbt_y_scores)
gbt_roc_auc = sk_auc(gbt_fpr, gbt_tpr)

fig, ax = plt.subplots(figsize=(7, 6))
ax.plot(gbt_fpr, gbt_tpr, lw=2, label='GBT ROC (AUC = %0.4f)' % gbt_roc_auc)
ax.plot([0, 1], [0, 1], linestyle='--')  # chance diagonal

ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('GBT ROC Curve')
ax.legend(loc="lower right")
plt.show()

In [None]:
import pandas as pd
# Alias sklearn's `auc` so it does not overwrite a notebook variable named `auc`.
from sklearn.metrics import roc_curve, auc as sk_auc
import matplotlib.pyplot as plt

# Bring the random-forest label and class probabilities to the driver as pandas.
rf_pd = rf_predictions.select("probability", "Exited").toPandas()

# Positive-class (churn) probability is element 1 of the probability vector.
rf_pd["score"] = rf_pd["probability"].apply(lambda v: float(v[1]))

# RF-prefixed names so this cell does not overwrite the shared
# y_true / y_scores / fpr / tpr / roc_auc read by the other ROC cell.
rf_y_true = rf_pd["Exited"]
rf_y_scores = rf_pd["score"]

rf_fpr, rf_tpr, rf_thresholds = roc_curve(rf_y_true, rf_y_scores)
rf_roc_auc = sk_auc(rf_fpr, rf_tpr)

fig, ax = plt.subplots(figsize=(7, 6))
ax.plot(rf_fpr, rf_tpr, lw=2, label='Random Forest ROC (AUC = %0.4f)' % rf_roc_auc)
ax.plot([0, 1], [0, 1], linestyle='--')  # chance diagonal

ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Random Forest ROC Curve')
ax.legend(loc="lower right")
plt.show()