In [None]:

from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import pandas as pd



In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .appName("ChurnModelTraining") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

In [3]:
df = spark.read.parquet("hdfs://namenode:8020/user/telco/cleaned/telco_cleaned.parquet")

In [45]:
df.printSchema()

root
 |-- customerid: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- seniorcitizen: string (nullable = true)
 |-- partner: string (nullable = true)
 |-- dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- phoneservice: string (nullable = true)
 |-- multiplelines: string (nullable = true)
 |-- internetservice: string (nullable = true)
 |-- onlinesecurity: string (nullable = true)
 |-- onlinebackup: string (nullable = true)
 |-- deviceprotection: string (nullable = true)
 |-- techsupport: string (nullable = true)
 |-- streamingtv: string (nullable = true)
 |-- streamingmovies: string (nullable = true)
 |-- contract: string (nullable = true)
 |-- paperlessbilling: string (nullable = true)
 |-- paymentmethod: string (nullable = true)
 |-- monthlycharges: double (nullable = true)
 |-- totalcharges: double (nullable = true)
 |-- churn: integer (nullable = true)



In [46]:
df_pandas = df.toPandas()

In [47]:
df_pandas[['churn']]

Unnamed: 0,churn
0,0
1,0
2,1
3,0
4,1
...,...
7038,0
7039,0
7040,0
7041,1


In [48]:
df_pandas.corr()

Unnamed: 0,tenure,monthlycharges,totalcharges,churn
tenure,1.0,0.2479,0.828025,-0.352229
monthlycharges,0.2479,1.0,0.651139,0.193356
totalcharges,0.828025,0.651139,1.0,-0.198342
churn,-0.352229,0.193356,-0.198342,1.0


In [49]:
df_pandas.isnull().sum()

customerid          0
gender              0
seniorcitizen       0
partner             0
dependents          0
tenure              0
phoneservice        0
multiplelines       0
internetservice     0
onlinesecurity      0
onlinebackup        0
deviceprotection    0
techsupport         0
streamingtv         0
streamingmovies     0
contract            0
paperlessbilling    0
paymentmethod       0
monthlycharges      0
totalcharges        0
churn               0
dtype: int64

In [50]:
# df = df.withColumn("churn", when(col("churn") == "Yes", 1).otherwise(0))

In [51]:
df_pandas = df.toPandas()

In [56]:
df_pandas["churn"].value_counts()

0    5174
1    1869
Name: churn, dtype: int64

## We Want to detect the outliers

In [57]:
def detect_outliers(df, column):
    quantiles = df.approxQuantile(column, [0.25, 0.75], 0.05)
    Q1 = quantiles[0]
    Q3 = quantiles[1]
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    outliers = df.filter((col(column) < lower_bound) | (col(column) > upper_bound)) 
    print(f"Number of outliers in {column}: {outliers.count()}")
    return lower_bound, upper_bound


In [58]:
t_lower_bound, t_upper_bound = detect_outliers(df , "totalcharges")

Number of outliers in totalcharges: 0


In [59]:
detect_outliers(df , "monthlycharges")

Number of outliers in monthlycharges: 0


(-31.250000000000007, 157.15)

In [60]:
detect_outliers(df , "tenure")

Number of outliers in tenure: 0


(-55.5, 116.5)

In [61]:
def cap_outliers(df, col_name, lower_bound, upper_bound):
    return df.withColumn(
        col_name,
        when(col(col_name) < lower_bound, lower_bound)
        .when(col(col_name) > upper_bound, upper_bound)
        .otherwise(col(col_name))
    )

In [62]:
df = cap_outliers(df, "totalcharges", t_lower_bound, t_upper_bound)

In [63]:
detect_outliers(df , "totalcharges")

Number of outliers in totalcharges: 0


(-3927.1499999999996, 7662.049999999999)

## Outliers removed (winsorizing)

## Split the data

In [64]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

In [65]:
trainCols = train_data.dtypes
train_data.dtypes

[('customerid', 'string'),
 ('gender', 'string'),
 ('seniorcitizen', 'string'),
 ('partner', 'string'),
 ('dependents', 'string'),
 ('tenure', 'int'),
 ('phoneservice', 'string'),
 ('multiplelines', 'string'),
 ('internetservice', 'string'),
 ('onlinesecurity', 'string'),
 ('onlinebackup', 'string'),
 ('deviceprotection', 'string'),
 ('techsupport', 'string'),
 ('streamingtv', 'string'),
 ('streamingmovies', 'string'),
 ('contract', 'string'),
 ('paperlessbilling', 'string'),
 ('paymentmethod', 'string'),
 ('monthlycharges', 'double'),
 ('totalcharges', 'double'),
 ('churn', 'int')]

In [66]:
stringCols = [f for (f , v) in trainCols if v == "string" and f not in  ["customerid" , "streamingmovies", "deviceprotection", "streamingtv", "paymentmethod"]]

In [67]:
numericCols = [f for (f, v) in trainCols if ((v == "double") & (f != "churn"))]

In [68]:
strIndexCols = [col+"_index" for col in stringCols]
strIndexCols

['gender_index',
 'seniorcitizen_index',
 'partner_index',
 'dependents_index',
 'phoneservice_index',
 'multiplelines_index',
 'internetservice_index',
 'onlinesecurity_index',
 'onlinebackup_index',
 'techsupport_index',
 'contract_index',
 'paperlessbilling_index']

In [69]:
oheCols = [col+"_ohe" for col in stringCols]
oheCols

['gender_ohe',
 'seniorcitizen_ohe',
 'partner_ohe',
 'dependents_ohe',
 'phoneservice_ohe',
 'multiplelines_ohe',
 'internetservice_ohe',
 'onlinesecurity_ohe',
 'onlinebackup_ohe',
 'techsupport_ohe',
 'contract_ohe',
 'paperlessbilling_ohe']

In [70]:
allDataCols = oheCols + numericCols 

In [71]:
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index", handleInvalid='keep') for col in stringCols]

ohe = OneHotEncoder(inputCols=strIndexCols, outputCols=oheCols)

assembler = VectorAssembler(inputCols=allDataCols, outputCol="features_unscaled")

scaler = StandardScaler(inputCol="features_unscaled", outputCol="features")

lr = LogisticRegression(featuresCol="features" , labelCol="churn",predictionCol="churnPrediction")

In [72]:
pipeline = Pipeline(stages = indexers +  [ohe ,  assembler  , scaler , lr])

pipeline_model = pipeline.fit(train_data)

In [73]:
predictions = pipeline_model.transform(test_data)

In [74]:
evaluator = BinaryClassificationEvaluator(
    labelCol="churn", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)

auc = evaluator.evaluate(predictions)
print(f"AUC-ROC: {auc:.4f}")

AUC-ROC: 0.8322


In [75]:
precision_evaluator = MulticlassClassificationEvaluator(
    labelCol="churn", predictionCol="churnPrediction", metricName="precisionByLabel"
)
recall_evaluator = MulticlassClassificationEvaluator(
    labelCol="churn", predictionCol="churnPrediction", metricName="recallByLabel"
)
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="churn", predictionCol="churnPrediction", metricName="f1"
)

precision = precision_evaluator.evaluate(predictions)
recall = recall_evaluator.evaluate(predictions)
f1 = f1_evaluator.evaluate(predictions)

print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1 Score:  {f1:.4f}")

Precision: 0.8381
Recall:    0.9068
F1 Score:  0.7972


In [76]:
lr_model = pipeline_model.stages[-1]  # last stage is RandomForestClassifier
importances = lr_model.coefficients.toArray()
features = pipeline_model.stages[-3].getInputCols()  # VectorAssembler input features

# Zip and sort
importance = sorted(zip(features, importances), key=lambda x: x[1], reverse=True)

# Print feature importance
for feature, score in importance:
    print(f"{feature:30s}: {score:.4f}")

totalcharges                  : 0.0244
techsupport_ohe               : 0.0117
monthlycharges                : 0.0117
internetservice_ohe           : -0.0166
dependents_ohe                : -0.0299
paperlessbilling_ohe          : -0.0337
phoneservice_ohe              : -0.0415
seniorcitizen_ohe             : -0.0532
gender_ohe                    : -0.0618
multiplelines_ohe             : -0.0736
contract_ohe                  : -0.0887
onlinesecurity_ohe            : -0.1089
partner_ohe                   : -0.1266
onlinebackup_ohe              : -0.2068


## Try a new pipeline with RandomForest

In [77]:
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index", handleInvalid='keep') for col in stringCols]

ohe = OneHotEncoder(inputCols=strIndexCols, outputCols=oheCols)

assembler = VectorAssembler(inputCols=allDataCols, outputCol="features_unscaled")

scaler = StandardScaler(inputCol="features_unscaled", outputCol="features")

rf = RandomForestClassifier(featuresCol="features" , labelCol="churn",predictionCol="churnPrediction")

In [78]:
pipeline = Pipeline(stages = indexers +  [ohe ,  assembler  , scaler , rf])

pipeline_model = pipeline.fit(train_data)

In [79]:
predictions = pipeline_model.transform(test_data)

In [80]:
evaluator = BinaryClassificationEvaluator(
    labelCol="churn", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)

auc = evaluator.evaluate(predictions)
print(f"AUC-ROC: {auc:.4f}")

AUC-ROC: 0.8299


In [81]:
precision_evaluator = MulticlassClassificationEvaluator(
    labelCol="churn", predictionCol="churnPrediction", metricName="precisionByLabel"
)
recall_evaluator = MulticlassClassificationEvaluator(
    labelCol="churn", predictionCol="churnPrediction", metricName="recallByLabel"
)
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="churn", predictionCol="churnPrediction", metricName="f1"
)

precision = precision_evaluator.evaluate(predictions)
recall = recall_evaluator.evaluate(predictions)
f1 = f1_evaluator.evaluate(predictions)

print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1 Score:  {f1:.4f}")

Precision: 0.7911
Recall:    0.9508
F1 Score:  0.7525


In [82]:
rf_model = pipeline_model.stages[-1]  # last stage is RandomForestClassifier
importances = rf_model.featureImportances.toArray()
features = pipeline_model.stages[-3].getInputCols()  # VectorAssembler input features

# Zip and sort
importance = sorted(zip(features, importances), key=lambda x: x[1], reverse=True)

# Print feature importance
for feature, score in importance:
    print(f"{feature:30s}: {score:.4f}")


totalcharges                  : 0.0926
partner_ohe                   : 0.0044
multiplelines_ohe             : 0.0038
phoneservice_ohe              : 0.0027
onlinesecurity_ohe            : 0.0025
dependents_ohe                : 0.0021
techsupport_ohe               : 0.0014
internetservice_ohe           : 0.0011
gender_ohe                    : 0.0008
seniorcitizen_ohe             : 0.0005
monthlycharges                : 0.0001
paperlessbilling_ohe          : 0.0000
onlinebackup_ohe              : 0.0000
contract_ohe                  : 0.0000


In [83]:
predictions.select("customerid", "churn", "churnPrediction").show(5)

+----------+-----+---------------+
|customerid|churn|churnPrediction|
+----------+-----+---------------+
|0004-TLHLJ|    1|            1.0|
|0013-SMEOE|    0|            0.0|
|0015-UOCOJ|    0|            0.0|
|0019-EFAEP|    0|            0.0|
|0023-HGHWL|    1|            0.0|
+----------+-----+---------------+
only showing top 5 rows



## The `logisiticRegression` model is more balanced than the `RandomForest`

In [4]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sqlalchemy import create_engine


In [5]:
df= df.toPandas()

In [6]:
df.head(10)

Unnamed: 0,customerid,gender,seniorcitizen,partner,dependents,tenure,phoneservice,multiplelines,internetservice,onlinesecurity,...,deviceprotection,techsupport,streamingtv,streamingmovies,contract,paperlessbilling,paymentmethod,monthlycharges,totalcharges,churn
0,7590-VHVEG,Female,No,Yes,No,1,No,No phone service,DSL,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,0
1,5575-GNVDE,Male,No,No,No,34,Yes,No,DSL,Yes,...,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,0
2,3668-QPYBK,Male,No,No,No,2,Yes,No,DSL,Yes,...,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,1
3,7795-CFOCW,Male,No,No,No,45,No,No phone service,DSL,Yes,...,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,0
4,9237-HQITU,Female,No,No,No,2,Yes,No,Fiber optic,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,1
5,9305-CDSKC,Female,No,No,No,8,Yes,Yes,Fiber optic,No,...,Yes,No,Yes,Yes,Month-to-month,Yes,Electronic check,99.65,820.5,1
6,1452-KIOVK,Male,No,No,Yes,22,Yes,Yes,Fiber optic,No,...,No,No,Yes,No,Month-to-month,Yes,Credit card (automatic),89.1,1949.4,0
7,6713-OKOMC,Female,No,No,No,10,No,No phone service,DSL,Yes,...,No,No,No,No,Month-to-month,No,Mailed check,29.75,301.9,0
8,7892-POOKP,Female,No,Yes,No,28,Yes,Yes,Fiber optic,No,...,Yes,Yes,Yes,Yes,Month-to-month,Yes,Electronic check,104.8,3046.05,1
9,6388-TABGU,Male,No,No,Yes,62,Yes,No,DSL,Yes,...,No,No,No,No,One year,No,Bank transfer (automatic),56.15,3487.95,0


In [7]:
def detect_outliers(df, column):
    q1 = df[column].quantile(0.25)
    q3 = df[column].quantile(0.75)
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    return lower_bound, upper_bound

def cap_outliers(df, col_name, lower_bound, upper_bound):
    df[col_name] = np.where(df[col_name] < lower_bound, lower_bound, df[col_name])
    df[col_name] = np.where(df[col_name] > upper_bound, upper_bound, df[col_name])
    return df

In [8]:
lower, upper = detect_outliers(df, "totalcharges")
df = cap_outliers(df, "totalcharges", lower, upper)

In [9]:
string_cols = [col for col in df.columns if df[col].dtype == "object"]
numeric_cols = [col for col in df.select_dtypes(include=np.number).columns if col != "churn"]
features = string_cols + numeric_cols

In [10]:
X = df[features]
y = df["churn"]

In [11]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [13]:
preprocessor = ColumnTransformer(
    [
        ("ohe", OneHotEncoder(handle_unknown="ignore", sparse=False), string_cols),
        ("sc", StandardScaler(), numeric_cols),
    ]
)

In [14]:
clf = Pipeline(
    [
        ("prep", preprocessor),
        ("rf", RandomForestClassifier(random_state= 42 , class_weight= "balanced"))
    ]
)

In [16]:
model = Pipeline([
    ("prep", preprocessor),
    ("rf", RandomForestClassifier(random_state=42, class_weight="balanced"))
])

In [17]:
model.fit(X_train, y_train)

Pipeline(steps=[('prep',
                 ColumnTransformer(transformers=[('ohe',
                                                  OneHotEncoder(handle_unknown='ignore',
                                                                sparse=False),
                                                  ['customerid', 'gender',
                                                   'seniorcitizen', 'partner',
                                                   'dependents', 'phoneservice',
                                                   'multiplelines',
                                                   'internetservice',
                                                   'onlinesecurity',
                                                   'onlinebackup',
                                                   'deviceprotection',
                                                   'techsupport', 'streamingtv',
                                                   'streamingmovies',
                   

In [18]:
y_pred = model.predict(X_test)

In [20]:
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
print("\n✅ Accuracy:", round(accuracy_score(y_test, y_pred), 4))
print("\n📊 Classification Report:\n", classification_report(y_test, y_pred, digits=4))
print("\n📉 Confusion Matrix:\n", confusion_matrix(y_test, y_pred))


✅ Accuracy: 0.7991

📊 Classification Report:
               precision    recall  f1-score   support

           0     0.8329    0.9093    0.8694      1036
           1     0.6619    0.4933    0.5653       373

    accuracy                         0.7991      1409
   macro avg     0.7474    0.7013    0.7173      1409
weighted avg     0.7876    0.7991    0.7889      1409


📉 Confusion Matrix:
 [[942  94]
 [189 184]]
