In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, count, when
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline

In [2]:
spark = SparkSession.builder.appName('ChurnPrediction').getOrCreate()

In [3]:
df = spark.read.option('header', 'true').csv('../data/raw/data.csv', inferSchema=True)

In [4]:
df.show(10)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|         Yes|              No|         No|    

In [5]:
#Drop Customer ID
df = df.drop('customerID')

In [6]:
num_rows = df.count()
num_columns = len(df.columns)

print("Number of rows:", num_rows)
print("Number of columns:", num_columns)

Number of rows: 7043
Number of columns: 20


In [7]:
df.printSchema()
def categorize_columns(df):
  numerical_cols = []
  categorical_cols = []

  for column, dtype in df.dtypes:
    if dtype in ['double', 'int']:
      numerical_cols.append(column)
    else:
      categorical_cols.append(column)

  return numerical_cols, categorical_cols


# Categorize columns
numerical_cols, categorical_cols = categorize_columns(df)

print("Numerical columns:", numerical_cols)
print("Categorical columns:", categorical_cols)

# Unique values on Categorical columns
for c in categorical_cols:
    unique_values = df.select(c).distinct()
    print(f"Unique values in {c}:")
    unique_values.show()

root
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)

Numerical columns: ['SeniorCitizen', 'tenure', 'MonthlyCharges']
Categorical columns: ['gender', 'Partner', 'Dependents', '

In [8]:
# Convert Total charges to numeric
df = df.withColumn("TotalCharges", col("TotalCharges").cast("double"))
df.printSchema()

# Categorize columns
numerical_cols, categorical_cols = categorize_columns(df)

print("Numerical columns:", numerical_cols)
print("Categorical columns:", categorical_cols)

root
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- Churn: string (nullable = true)

Numerical columns: ['SeniorCitizen', 'tenure', 'MonthlyCharges', 'TotalCharges']
Categorical columns: ['gender', 'Partner',

In [9]:
#Drop Duplicates
df = df.dropDuplicates()
#Drop Missing Values
df = df.na.drop()

In [10]:
# Combine numerical features into a single vector
numerical_features_df = VectorAssembler(
    inputCols=numerical_cols, 
    outputCol="features"
).transform(df)

# Scale numerical features
standardScaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scaled_numerical_df = standardScaler.fit(numerical_features_df).transform(numerical_features_df)

# Convert categorical columns to indexed numeric columns
inx_categorical_col = [c + "_inx" for c in categorical_cols]
indexer = StringIndexer(inputCols=categorical_cols, outputCols=inx_categorical_col)
inx_df = indexer.fit(scaled_numerical_df).transform(scaled_numerical_df)
inx_categorical_col = inx_categorical_col[:-1]  # Exclude Churn_inx
categorical_features_df = VectorAssembler(
    inputCols=inx_categorical_col, 
    outputCol="cat_features_inx"
).transform(inx_df)

# Combine scaled numerical features and indexed categorical features
final_df = VectorAssembler(
    inputCols=["scaledFeatures", "cat_features_inx"], 
    outputCol="final_feature_vector"
).transform(categorical_features_df)
final_df.show(10)

+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+--------------------+--------------------+----------+-----------+--------------+----------------+-----------------+-------------------+------------------+----------------+--------------------+---------------+---------------+-------------------+------------+--------------------+-----------------+---------+--------------------+--------------------+
|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|            features|      scaledFeatures|gender_

In [11]:
input_df = final_df.select("final_feature_vector","Churn_inx")
input_df.show(10)

+--------------------+---------+
|final_feature_vector|Churn_inx|
+--------------------+---------+
|[0.0,2.8955433114...|      0.0|
|[0.0,2.3245911092...|      0.0|
|(19,[1,2,3,8,9,10...|      0.0|
|[0.0,1.6312920064...|      0.0|
|[0.0,1.9575504077...|      0.0|
|(19,[1,2,3,4,5,13...|      0.0|
|[0.0,2.3653734093...|      0.0|
|(19,[0,1,2,3,6,9,...|      0.0|
|(19,[1,2,3,9,10,1...|      0.0|
|(19,[1,2,3,4,10,1...|      1.0|
+--------------------+---------+
only showing top 10 rows



In [12]:
# Handle imbalance data
class_counts = input_df.groupBy("Churn_inx").count()
class_counts.show()

major_df = input_df.filter(col('Churn_inx') == 0)
minor_df = input_df.filter(col('Churn_inx') == 1)


major_count = major_df.count()
minor_count = minor_df.count()
ratio = major_count / minor_count


oversampled_minor_df = minor_df.sample(withReplacement=True, fraction=ratio)
input_data = major_df.unionAll(oversampled_minor_df)

class_counts_balanced = input_data.groupBy("Churn_inx").count()
class_counts_balanced.show()


+---------+-----+
|Churn_inx|count|
+---------+-----+
|      0.0| 5153|
|      1.0| 1857|
+---------+-----+

+---------+-----+
|Churn_inx|count|
+---------+-----+
|      0.0| 5153|
|      1.0| 5157|
+---------+-----+



In [13]:
train, test = input_data.randomSplit([0.8, 0.2], seed=79)
train.count() , test.count()

(8166, 2144)

In [14]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Logistic regression
logistic_regression_model = LogisticRegression(
    featuresCol='final_feature_vector',
    labelCol='Churn_inx',
    regParam=0.001,
    elasticNetParam=0.95,
    maxIter=50
)

trained_lr_model = logistic_regression_model.fit(train)
test_predictions = trained_lr_model.transform(test)
auc_evaluator = BinaryClassificationEvaluator(
    labelCol="Churn_inx",
    metricName="areaUnderROC"
)

auc_score = auc_evaluator.evaluate(test_predictions)
print("AUC on the test dataset: ", auc_score)


AUC on the test dataset:  0.8482987483244262


In [15]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision Tree Classifier
decision_tree = DecisionTreeClassifier(
    featuresCol='final_feature_vector', 
    labelCol='Churn_inx',
    maxDepth=5,
    minInstancesPerNode=10
)

trained_dt_model = decision_tree.fit(train)

test_predictions = trained_dt_model.transform(test)

dt_auc_evaluator = BinaryClassificationEvaluator(
    labelCol="Churn_inx",
    metricName="areaUnderROC"
)

dt_auc_score = dt_auc_evaluator.evaluate(test_predictions)
print("AUC on test data: ", auc_score)


AUC on test data:  0.8482987483244262


In [16]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Random Forest
random_forest = RandomForestClassifier(
    featuresCol='final_feature_vector', 
    labelCol='Churn_inx', 
    numTrees=20, 
    maxDepth=5, 
    minInstancesPerNode=10
)

trained_rf_model = random_forest.fit(train)
test_predictions = trained_rf_model.transform(test)

rf_auc_evaluator = BinaryClassificationEvaluator(labelCol="Churn_inx", metricName="areaUnderROC")
rf_auc_score = rf_auc_evaluator.evaluate(test_predictions)
print("AUC on test data: ", rf_auc_score)


AUC on test data:  0.8488610448618624


In [17]:
from pyspark.ml.classification import LinearSVC

# SVM
svc = LinearSVC(featuresCol='final_feature_vector', labelCol='Churn_inx')
trained_svc_model = svc.fit(train)
predictions = trained_svc_model.transform(test)

evaluator = BinaryClassificationEvaluator(labelCol="Churn_inx", metricName="areaUnderROC")
svc_auc = evaluator.evaluate(predictions)
print("AUC on test data: ", svc_auc)


AUC on test data:  0.8462349633549786


In [18]:
from pyspark.ml.classification import GBTClassifier

# Gradient Boosting Machines
gbt = GBTClassifier(featuresCol='final_feature_vector', labelCol='Churn_inx', maxIter=10)
trained_gbt_model = gbt.fit(train)
predictions = trained_gbt_model.transform(test)

evaluator = BinaryClassificationEvaluator(labelCol="Churn_inx", metricName="areaUnderROC")
gbt_auc = evaluator.evaluate(predictions)
print("AUC on test data: ", gbt_auc)


AUC on test data:  0.8641518548822327
