In [0]:
# importing spark session
from pyspark.sql import SparkSession

# importing visualization modules
import matplotlib.pyplot as plt
import plotly.express as px

# pandas module
import pandas as pd

# pyspark SQL functions
from pyspark.sql.functions import col,when,count
from pyspark.sql.types import IntegerType,BooleanType,DateType,DoubleType

# pyspark data preprocessing modules
from pyspark.ml.feature import Imputer,StringIndexer,VectorAssembler,StandardScaler,OneHotEncoder

# pyspark data modelling and model evaluation modules
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator 

In [0]:
# Start a Spark session named "customer_churn", or reuse one that already exists.
session_builder = SparkSession.builder.appName("customer_churn")
spark = session_builder.getOrCreate()

In [0]:
# Read the churn CSV data: the first row is the header and column types are inferred.
csv_reader = spark.read.option("header", "true").option("inferschema", "true")
df_raw = csv_reader.csv("/mnt/adls/test/PrachiSingh/Try/CustomerChurn/")

In [0]:
# Print the column names and the data types Spark assigned while loading.
df_raw.printSchema()

In [0]:
# Number of rows in the raw DataFrame.
df_raw.count()

In [0]:
# df_raw.columns
# Gives the list of all the column names in the Pyspark dataframe

In [0]:
# Number of columns in the raw DataFrame.
len(df_raw.columns)

#### Exploratory Data Analysis
1. Distribution Analysis
2. Correlation Analysis
3. Univariate Analysis
4. Finding Missing Values

In [0]:
# Show the raw DataFrame. display() is not imported in this notebook;
# presumably it is provided by the notebook environment.
display(df_raw)

In [0]:
# Recode the SeniorCitizen flag as a label: 0 becomes "No", 1 becomes "Yes".
# There is no otherwise() branch, so any other value ends up as null.
senior_flag = col("SeniorCitizen")
senior_label = when(senior_flag == 0, "No").when(senior_flag == 1, "Yes")
df_raw = df_raw.withColumn("SeniorCitizen", senior_label)

In [0]:
# Convert TotalCharges to a double column so it is treated as numeric.
total_charges_as_double = col("TotalCharges").cast(DoubleType())
df_raw = df_raw.withColumn("TotalCharges", total_charges_as_double)

In [0]:
# List the (column name, data type) pairs of df_raw.
df_raw.dtypes

In [0]:
# Group the column names by data type.
# numerical_columns holds every column whose Spark type is "int" or "double".
numerical_columns = [
    column_name
    for column_name, dtype in df_raw.dtypes
    if dtype in ("double", "int")
]
numerical_columns

In [0]:
# categorical_columns holds every column whose Spark type is "string".
categorical_columns = [
    column_name
    for column_name, dtype in df_raw.dtypes
    if dtype == "string"
]
categorical_columns

In [0]:
# Copy only the numeric columns into a pandas DataFrame for local exploration.
numeric_sdf = df_raw.select(numerical_columns)
df_num = numeric_sdf.toPandas()
df_num.head()

In [0]:
# Draw a 20-bin histogram of each numeric column to inspect its distribution.
fig, ax = plt.subplots(figsize=(15, 10))
df_num.hist(bins=20, ax=ax)

In [0]:
# Summary statistics for the numeric columns.
df_num.describe()

In [0]:
# From df_num.describe(), we can see that the maximum value of TotalCharges is far greater than its mean value. So, this maximum value can be considered an outlier, and we will remove it from the dataset.

In [0]:
# When exploring data, the outliers are the extreme values within the dataset. That means the outlier data points vary greatly from the expected values—either being much larger or significantly smaller. For data that follows a normal distribution, the values that fall more than three standard deviations from the mean are typically considered outliers. 

In [0]:
# Correlation matrix- To understand the data and to see if we have any interesting patterns or not in the dataset.

In [0]:
# Pairwise correlation between the numeric columns.
df_num.corr()

In [0]:
# 1. The correlation between tenure and MonthlyCharges is 0.2479 which is very low. Thus, it is safe to say that there is no significant correlation between tenure and MonthlyCharges. 
# 2. The correlation between tenure and TotalCharges is 0.82588, which is quite high. Thus, there is a strong positive correlation between tenure and TotalCharges.
# 3. The correlation between MonthlyCharges and TotalCharges is 0.651065, which is moderate. Thus, there is a moderate positive correlation between MonthlyCharges and TotalCharges.

In [0]:
# Let's get the unique value count per each categorical column
# This helps us catch misspellings and missing values, if any, in the categorical columns.

In [0]:
# Select the categorical columns once and keep them both as a Spark DataFrame
# (df_cat_py) and as a pandas copy (df_cat).
df_cat_py = df_raw.select(categorical_columns)
df_cat = df_cat_py.toPandas()

In [0]:
# Print how often each distinct value occurs, one categorical column at a time.
for col_name in categorical_columns:
  value_counts = df_cat_py.groupby(col_name).count()
  value_counts.show()

In [0]:
# Count the null values of every column in one aggregation instead of running
# a separate Spark job (and a separate scan of the data) for each column.
null_count_exprs = [
  count(when(col(col_name).isNull(), col_name)).alias(col_name)
  for col_name in df_raw.columns
]
# vertical=True prints one "column: null count" line per column, which stays
# readable even though the result row has as many fields as df_raw has columns.
df_raw.select(null_count_exprs).show(vertical=True)

## Preprocessing and Cleaning Data
- Handling the missing values
- Removing the outliers

In [0]:
# Dealing with Missing values
# The filling of missing values in the columns depends on the type of column they are in.
# For categorical columns, a common technique is to fill the missing values with the most frequent value in the column, or to use a classification model to predict them.
# For numerical columns, we can impute the missing values with the average value of the column.
# Using Imputer, we can fill the missing value with the average, median or mode value.

In [0]:
# Names of the columns whose missing values the imputer will fill.
columns_with_missing_values = ["TotalCharges"]

In [0]:
# Configure a mean-strategy Imputer whose output columns are the same as its
# input columns, then fit it on df_raw.
# imputer_object ends up holding the fitted model, not the unfitted estimator.
mean_imputer = Imputer(
    inputCols=columns_with_missing_values,
    outputCols=columns_with_missing_values,
)
mean_imputer = mean_imputer.setStrategy("mean")
imputer_object = mean_imputer.fit(df_raw)

In [0]:
# Apply the fitted imputer to df_raw; the result is stored as df_new and
# df_raw itself is not reassigned.
df_new = imputer_object.transform(df_raw)

In [0]:
# Re-check the TotalCharges null count on the imputed DataFrame (df_new).
# The imputer's output was assigned to df_new, so counting on df_raw would only
# repeat the pre-imputation result and could not confirm the fill worked.
df_new.select(count(when(col("TotalCharges").isNull(),"TotalCharges")).alias("TotalCharges")).show()

In [0]:
# df_raw.select("*").where(df_raw.tenure>100).show()

In [0]:
# Numerical Features:- Vector Assembling and Numerical Scaling
# Categorical Features:- String Indexing and Vector Assembling
# Vector Assembling- To apply our ML model, we need to combine all of our numerical and categorical features into vectors. For now, let's create a feature vector for our numerical columns

In [0]:
# Pack the numeric columns into one vector column, "numerical_features_vectors".
numerical_vector_assembler = VectorAssembler(
    outputCol="numerical_features_vectors",
    inputCols=numerical_columns,
)
df_new = numerical_vector_assembler.transform(df_new)

In [0]:
# Configure standardization of the numeric feature vector: centre on the mean
# (withMean) and scale to unit standard deviation (withStd).
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="numerical_features_vectors",
    outputCol="numerical_features_scaled",
)

In [0]:
# Fit the scaler on df_new, then add the scaled vector column to df_new.
scaler_model = scaler.fit(df_new)
df_new = scaler_model.transform(df_new)

In [0]:
# String Indexing -> Vector Assembling
# String Indexing: Converting all the string columns to numerical columns

In [0]:
# Give every string column a numeric companion column named "<column>_Indexed".
categorical_columns_indexed = [f"{name}_Indexed" for name in categorical_columns]

indexer = StringIndexer(
    inputCols=categorical_columns,
    outputCols=categorical_columns_indexed,
)

# Fit the indexer on df_new, then append the indexed columns to it.
indexer_model = indexer.fit(df_new)
df_new = indexer_model.transform(df_new)

In [0]:
# Keep the customer id and the Churn label out of the feature list.
# Filtering (rather than list.remove) keeps this cell idempotent: running it a
# second time is a no-op instead of raising ValueError on the missing entry.
non_feature_columns = {"customerID_Indexed", "Churn_Indexed"}
categorical_columns_indexed = [
    name for name in categorical_columns_indexed if name not in non_feature_columns
]

In [0]:
# Pack the indexed categorical columns into one vector column.
categorical_vector_assembler = VectorAssembler(
    outputCol="categorical_features_vectors",
    inputCols=categorical_columns_indexed,
)
df_new = categorical_vector_assembler.transform(df_new)

In [0]:
# Join the categorical vector and the scaled numeric vector (in that order)
# into the single feature vector used for modelling.
feature_vector_inputs = ["categorical_features_vectors", "numerical_features_scaled"]
final_vector_assembler = VectorAssembler(
    inputCols=feature_vector_inputs,
    outputCol="final_feature_vector",
)
df_new = final_vector_assembler.transform(df_new)

In [0]:
# Show the assembled feature vector next to the indexed Churn label.
display(df_new.select("final_feature_vector","Churn_Indexed"))

In [0]:
# 1. Train and test data splitting
# 2. Creating our model
# 3. Training our model
# 4. Make initial predictions using our model

In [0]:
# Randomly split the rows into train and test sets using weights 0.7 and 0.3;
# the fixed seed keeps the split repeatable.
split_weights = [0.7, 0.3]
train, test = df_new.randomSplit(split_weights, seed=100)

# Number of rows that landed in the training set.
train.count()

In [0]:
# Configure a decision tree with maxDepth=3 that predicts Churn_Indexed from
# the final feature vector, then fit it on the training data.
dt = DecisionTreeClassifier(
    maxDepth=3,
    labelCol="Churn_Indexed",
    featuresCol="final_feature_vector",
)
model = dt.fit(train)

In [0]:
# Run the fitted model on the test set to get its predictions.
predictions_test = model.transform(test)

In [0]:
# Model Evaluation
# 1. Calculating area under the ROC curve for training set
# 2. Calculating area under the ROC curve for test set
# 3. Hyper Parameter Tuning

In [0]:
# AUC measures the ability of the classifier to distinguish between positive and negative classes. It ranges from 0 to 1, where 1 represents a perfect classifier, 0.5 for random classifier.

In [0]:
# Score the test-set predictions by area under the ROC curve, using
# Churn_Indexed as the true label.
evaluator = BinaryClassificationEvaluator(labelCol="Churn_Indexed")
roc_metric = {evaluator.metricName: "areaUnderROC"}
auc_test = evaluator.evaluate(predictions_test, roc_metric)
auc_test

In [0]:
def evaluate_dt(model_params):
  """Fit one decision tree per maxDepth value and score it on test and train data.

  Reads the notebook-level `train` and `test` DataFrames.

  Args:
    model_params: iterable of maxDepth values to try.

  Returns:
    A tuple (test_accuracies, train_accuracies). Each is a list holding one
    area-under-ROC score per entry of model_params, in the same order.
    Despite the names, the values are AUC scores, not accuracies.
  """
  test_accuracies = []
  train_accuracies = []

  # The evaluator does not depend on maxDepth, so build it once.
  evaluator = BinaryClassificationEvaluator(labelCol="Churn_Indexed")
  roc_metric = {evaluator.metricName: "areaUnderROC"}

  for maxD in model_params:
    # Train a tree limited to the current depth.
    decision_tree = DecisionTreeClassifier(featuresCol="final_feature_vector",labelCol="Churn_Indexed",maxDepth=maxD)
    dtModel = decision_tree.fit(train)

    # AUC on the held-out test data.
    predictions_test = dtModel.transform(test)
    auc_test = evaluator.evaluate(predictions_test, roc_metric)
    test_accuracies.append(auc_test)

    # AUC on the training data. This must score predictions_train: scoring
    # predictions_test here would make both returned lists identical.
    predictions_train = dtModel.transform(train)
    auc_train = evaluator.evaluate(predictions_train, roc_metric)
    train_accuracies.append(auc_train)

  return(test_accuracies,train_accuracies)