In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col, isnan, when, count
import pandas as pd
import numpy as np

In [2]:
import findspark
findspark.init()

In [3]:
### a) Data Preparation
## 1. Create Spark session & Load dataset

spark = SparkSession.builder.appName("CustomerChurn").getOrCreate()

data = spark.read.csv("Data/WA_Fn-UseC_-Telco-Customer-Churn.csv", header=True, inferSchema=True)

data.show(10)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|         Yes|              No|         No|    

In [4]:
## 2. EDA
# Basic info
data.printSchema()
data.describe().show()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)

+-------+----------+------+------------------+-------+----------+----------------

In [5]:
# Churn distribution
data.groupBy("Churn").count().orderBy("Churn").show()

+-----+-----+
|Churn|count|
+-----+-----+
|   No| 5174|
|  Yes| 1869|
+-----+-----+



In [6]:
## 3. Look at missing values and outliers
# Look at missing values
for col in data.columns:
    data.select(col).filter(data[col].isNull()).show() # Empty set, no missing value to handle

+----------+
|customerID|
+----------+
+----------+

+------+
|gender|
+------+
+------+

+-------------+
|SeniorCitizen|
+-------------+
+-------------+

+-------+
|Partner|
+-------+
+-------+

+----------+
|Dependents|
+----------+
+----------+

+------+
|tenure|
+------+
+------+

+------------+
|PhoneService|
+------------+
+------------+

+-------------+
|MultipleLines|
+-------------+
+-------------+

+---------------+
|InternetService|
+---------------+
+---------------+

+--------------+
|OnlineSecurity|
+--------------+
+--------------+

+------------+
|OnlineBackup|
+------------+
+------------+

+----------------+
|DeviceProtection|
+----------------+
+----------------+

+-----------+
|TechSupport|
+-----------+
+-----------+

+-----------+
|StreamingTV|
+-----------+
+-----------+

+---------------+
|StreamingMovies|
+---------------+
+---------------+

+--------+
|Contract|
+--------+
+--------+

+----------------+
|PaperlessBilling|
+----------------+
+----------------+


In [8]:
# According to the dataset structure, Churn should be only "Yes" or "No", otherwise will be outliers
data.filter((data["Churn"] != "Yes" ) & (data["Churn"] != "No")).show() # Empty set, no outlier to handle

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-

In [10]:
## 4. Convert step (using list way)

label_indexer = StringIndexer(inputCol="Churn", outputCol="label")

# Define categorical and numerical columns
categorical_cols = ['gender','Partner','Dependents','PhoneService','MultipleLines',
                    'InternetService','OnlineSecurity','OnlineBackup','DeviceProtection',
                    'TechSupport','StreamingTV','StreamingMovies','Contract','PaperlessBilling',
                    'PaymentMethod']

numerical_cols = ['SeniorCitizen','tenure','MonthlyCharges','TotalCharges']

# 2. Index categorical columns
indexers = [StringIndexer(inputCol=col, 
                          outputCol=col+"_index", 
                          handleInvalid="keep") for col in categorical_cols]

encoders = [OneHotEncoder(inputCol=col+"_index", 
                          outputCol=col+"_vec") for col in categorical_cols]


In [11]:
# Give feature a label(numerical):
for col in categorical_cols:
    indexer = StringIndexer(inputCol=col, outputCol=col+"_label")
    data = indexer.fit(data).transform(data)
data.show(10)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+------------+-------------+----------------+------------------+-------------------+---------------------+--------------------+------------------+----------------------+-----------------+-----------------+---------------------+--------------+----------------------+-------------------+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|gender_label|Partner_label|Dependents_label|PhoneService_label|MultipleLines_label|InternetService_label|OnlineSecurity_label|OnlineBackup_label|DeviceProtection

In [12]:
## 5. Split step
train_data, test_data = data.randomSplit([0.8, 0.2], seed=666)

In [13]:
### b) Feature Engineering
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

## 1. New label features
# Already get new _label features in above step

## 2. Select relevant features based on domain knowledge
key_cols=['Partner','Dependents','PhoneService','MultipleLines',
                    'InternetService','OnlineSecurity','OnlineBackup','DeviceProtection',
                    'TechSupport','StreamingTV','StreamingMovies','Contract','PaperlessBilling',
                    'PaymentMethod']

key_numerical_cols=['SeniorCitizen','tenure','MonthlyCharges']

## 3. Feature vector
assembler_inputs = [col + "_label" for col in key_cols] + key_numerical_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")


In [18]:
data_vec = assembler.transform(data)
data_vec.select("features").show(n=20, truncate=False)


+------------------------------------------------------------------------------+
|features                                                                      |
+------------------------------------------------------------------------------+
|(17,[0,2,3,4,6,15,16],[1.0,1.0,2.0,1.0,1.0,1.0,29.85])                        |
|(17,[4,5,7,11,12,13,15,16],[1.0,1.0,1.0,2.0,1.0,1.0,34.0,56.95])              |
|(17,[4,5,6,13,15,16],[1.0,1.0,1.0,1.0,2.0,53.85])                             |
|[0.0,0.0,1.0,2.0,1.0,1.0,0.0,1.0,1.0,0.0,0.0,2.0,1.0,2.0,0.0,45.0,42.3]       |
|(17,[15,16],[2.0,70.7])                                                       |
|(17,[3,7,9,10,15,16],[1.0,1.0,1.0,1.0,8.0,99.65])                             |
|(17,[1,3,6,9,13,15,16],[1.0,1.0,1.0,1.0,3.0,22.0,89.1])                       |
|(17,[2,3,4,5,12,13,15,16],[1.0,2.0,1.0,1.0,1.0,1.0,10.0,29.75])               |
|(17,[0,3,7,8,9,10,15,16],[1.0,1.0,1.0,1.0,1.0,1.0,28.0,104.8])                |
|(17,[1,4,5,6,11,12,13,15,16

The full feature vector has 17 dimensions.

Index 0 has value 1.0, index 2 has value 1.0, ..., index 15 = 29.85, and 16 = 56.95.

All other positions (e.g., 1, 5, 7...) are zero.

In [19]:
## 4. Normalize rating
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_data")
scaler_model = scaler.fit(data_vec)
data_scaled = scaler_model.transform(data_vec)


In [21]:
data_scaled.select("customerID", "scaled_data").show(n = 10, truncate=False)

+----------+---------------------------------------------------------------------------------------------------------------+
|customerID|scaled_data                                                                                                    |
+----------+---------------------------------------------------------------------------------------------------------------+
|7590-VHVEG|(17,[0,2,3,4,6,15,16],[1.0,1.0,1.0,0.5,0.5,0.013888888888888888,0.11542288557213931])                          |
|5575-GNVDE|(17,[4,5,7,11,12,13,15,16],[0.5,0.5,0.5,1.0,1.0,0.3333333333333333,0.4722222222222222,0.3850746268656717])     |
|3668-QPYBK|(17,[4,5,6,13,15,16],[0.5,0.5,0.5,0.3333333333333333,0.027777777777777776,0.35422885572139307])                |
|7795-CFOCW|[0.0,0.0,1.0,1.0,0.5,0.5,0.0,0.5,0.5,0.0,0.0,1.0,1.0,0.6666666666666666,0.0,0.625,0.23930348258706463]         |
|9237-HQITU|(17,[15,16],[0.027777777777777776,0.5218905472636816])                                                         |


In [23]:
## 5. PCA
from pyspark.ml.feature import PCA

pca = PCA(k=3, inputCol="features", outputCol="pca_features")
pca_model = pca.fit(data_vec)
data_pca = pca_model.transform(data_vec)
data_pca.select("customerID", "pca_features").show(10, truncate=False)

+----------+-----------------------------------------------------------+
|customerID|pca_features                                               |
+----------+-----------------------------------------------------------+
|7590-VHVEG|[-27.406691178016334,11.693790868578565,1.1969727088506252]|
|5575-GNVDE|[-65.92985815796929,-6.727223526642739,2.707176250522056]  |
|3668-QPYBK|[-49.520899117989934,20.940307086484992,3.1083642143072026]|
|7795-CFOCW|[-57.35010884651055,-22.929783459084042,1.7980282699980046]|
|9237-HQITU|[-64.80325489013778,28.14919500187816,2.824674190910207]   |
|9305-CDSKC|[-93.52016716553062,34.94429258733854,4.8852894966922795]  |
|1452-KIOVK|[-89.93973205142214,17.762948723375242,4.04167982216013]   |
|6713-OKOMC|[-31.133151982310405,3.494643572100136,1.0900020120929679] |
|7892-POOKP|[-106.67147667432033,19.014496930306297,4.950745626065711] |
|6388-TABGU|[-77.10861552968844,-32.41372180314005,2.061728739797676]  |
+----------+---------------------------------------

In [26]:
### c) Model Implementation & d) Evaluation and Discussion
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression

def evaluate_model(model_name, model, train_data, test_data, param_grid):
    print(f"\n{'='*20} {model_name} {'='*20}")
    pipeline = Pipeline(stages=indexers + encoders + [label_indexer, assembler, model])

    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

    cv = CrossValidator(estimator=pipeline,
                        estimatorParamMaps=param_grid,
                        evaluator=evaluator,
                        numFolds=3)

    cv_model = cv.fit(train_data)
    predictions = cv_model.transform(test_data)

    # Evaluation metrics
    for metric in ["accuracy", "weightedPrecision", "weightedRecall", "f1"]:
        evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName=metric)
        score = evaluator.evaluate(predictions)
        print(f"{metric.capitalize()}: {score:.4f}")

    # Show predictions
    predictions.select("customerID", "label", "prediction", "probability").show(5, truncate=False)


# ========== 1. Decision Tree ==========
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dt_param_grid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [3, 5, 10]) \
    .addGrid(dt.minInstancesPerNode, [1, 5]) \
    .build()

evaluate_model("Decision Tree", dt, train_data, test_data, dt_param_grid)


# ========== 2. Random Forest ==========
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
rf_param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

evaluate_model("Random Forest", rf, train_data, test_data, rf_param_grid)


# ========== 3. Logistic Regression ==========
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)
lr_param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

evaluate_model("Logistic Regression", lr, train_data, test_data, lr_param_grid)




Accuracy: 0.8025
Weightedprecision: 0.7979
Weightedrecall: 0.8025
F1: 0.7998
+----------+-----+----------+----------------------------------------+
|customerID|label|prediction|probability                             |
+----------+-----+----------+----------------------------------------+
|0023-XUOPT|1.0  |1.0       |[0.36261980830670926,0.6373801916932907]|
|0042-RLHYP|0.0  |0.0       |[0.9360857483128225,0.06391425168717745]|
|0048-PIHNL|0.0  |0.0       |[0.9360857483128225,0.06391425168717745]|
|0057-QBUQH|0.0  |0.0       |[0.9360857483128225,0.06391425168717745]|
|0058-EVZWM|0.0  |0.0       |[0.6818181818181818,0.3181818181818182] |
+----------+-----+----------+----------------------------------------+
only showing top 5 rows


Accuracy: 0.8105
Weightedprecision: 0.8007
Weightedrecall: 0.8105
F1: 0.8023
+----------+-----+----------+-----------------------------------------+
|customerID|label|prediction|probability                              |
+----------+-----+----------+-------

In [None]:
###### I think LR model is the best one because Accuracy, Weightedprecision, Weightedrecall and F1 are all the highest, and it use shortest time.
###### 'OnlineSecurity','OnlineBackup','DeviceProtection','TechSupport'is the most affecting features.
###### Limit: Some method may only useful in thin matrix
###### Suggestions: Try implicitPrefs=True if having data like clicks or watch counts. 
###### Use MAP or NDCG for ranked metrics (can be implemented manually).
###### Segment by user type (new vs. frequent) for deeper analysis.



In [None]:
# Stop Spark session
spark.stop()