##### Load Data

In [1]:
import findspark

findspark.init()

import pyspark

from pyspark.sql import SparkSession
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()

spark = SparkSession.builder.config("spark.driver.memory", "32g").getOrCreate() #increasing Driver memory for huge size datasets. 

sparkDF = spark.read.format('csv').options(header='true', inferSchema='true').load("D:\\coventry\\Big Data\\CourseWork\\CustomerChurn\\TelcoCustomerChurn.csv")
sparkDF.cache()

DataFrame[customerID: string, gender: string, SeniorCitizen: int, Partner: string, Dependents: string, tenure: int, PhoneService: string, MultipleLines: string, InternetService: string, OnlineSecurity: string, OnlineBackup: string, DeviceProtection: string, TechSupport: string, StreamingTV: string, StreamingMovies: string, Contract: string, PaperlessBilling: string, PaymentMethod: string, MonthlyCharges: double, TotalCharges: string, Churn: string]

In [2]:
sparkDF.columns

['customerID',
 'gender',
 'SeniorCitizen',
 'Partner',
 'Dependents',
 'tenure',
 'PhoneService',
 'MultipleLines',
 'InternetService',
 'OnlineSecurity',
 'OnlineBackup',
 'DeviceProtection',
 'TechSupport',
 'StreamingTV',
 'StreamingMovies',
 'Contract',
 'PaperlessBilling',
 'PaymentMethod',
 'MonthlyCharges',
 'TotalCharges',
 'Churn']

In [3]:
sparkDF.count()

7043

In [5]:
sparkDF_pandas = sparkDF.toPandas()
sparkDF_pandas.head(5)
#sparkDF.show(5)

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,...,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
1,5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,...,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
2,3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,...,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
3,7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,...,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
4,9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes


# Data wrangling

In [6]:
#Replacing spaces with null values in total charges column
from pyspark.sql.functions import *
dfWithEmptyReplaced = sparkDF.withColumn('TotalCharges', when(col('TotalCharges') == ' ', None).otherwise(col('TotalCharges')).cast("float"))
dfWithEmptyReplaced = dfWithEmptyReplaced.na.drop()

In [10]:
#Replacing 'No internet service' to No for the following columns
replace_cols = [ 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection',
                'TechSupport','StreamingTV', 'StreamingMovies']
#replace values
for col_name in replace_cols:
    dfwithNo = dfWithEmptyReplaced.withColumn(col_name, when(col(col_name)== "No internet service","No").otherwise(col(col_name)))
    
dfwithNo.createOrReplaceTempView("datawrangling")

In [11]:
dfwithNo_pandas = dfwithNo.toPandas()
dfwithNo_pandas.head(5)

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,...,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
1,5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,...,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
2,3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,...,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.150002,Yes
3,7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,...,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
4,9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.649994,Yes


# SQL code

In [13]:
# Using Spark SQL to create categories 
df_wrangling = spark.sql("""
select distinct 
         customerID
        ,gender
        ,SeniorCitizen
        ,Partner
        ,Dependents
        ,tenure
        ,case when (tenure<=12) then "Tenure_0-12"
              when (tenure>12 and tenure <=24) then "Tenure_12-24"
              when (tenure>24 and tenure <=48) then "Tenure_24-48"
              when (tenure>48 and tenure <=60) then "Tenure_48-60"
              when (tenure>60) then "Tenure_gt_60"
        end as tenure_group
        ,PhoneService
        ,MultipleLines
        ,InternetService
        ,OnlineSecurity
        ,OnlineBackup
        ,DeviceProtection
        ,TechSupport
        ,StreamingTV
        ,StreamingMovies
        ,Contract
        ,PaperlessBilling
        ,PaymentMethod
        ,MonthlyCharges
        ,TotalCharges
        ,Churn
    from datawrangling
""")


In [14]:
df_wrangling_pandas = df_wrangling.toPandas()
df_wrangling_pandas.head(5)

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,tenure_group,PhoneService,MultipleLines,InternetService,...,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,6497-TILVL,Female,0,Yes,Yes,7,Tenure_0-12,Yes,No,DSL,...,No,No,No,No,Month-to-month,No,Mailed check,50.7,350.350006,No
1,0691-JVSYA,Female,0,Yes,No,53,Tenure_48-60,Yes,No,Fiber optic,...,Yes,No,Yes,Yes,One year,Yes,Bank transfer (automatic),94.85,5000.200195,Yes
2,8544-GOQSH,Female,0,No,No,14,Tenure_12-24,Yes,No,Fiber optic,...,No,Yes,No,No,Month-to-month,Yes,Credit card (automatic),80.05,1112.300049,No
3,5172-MIGPM,Male,0,No,No,4,Tenure_0-12,Yes,Yes,DSL,...,No,Yes,No,Yes,Month-to-month,No,Mailed check,65.55,237.199997,No
4,4312-KFRXN,Male,0,Yes,No,72,Tenure_gt_60,Yes,Yes,No,...,No internet service,No internet service,No internet service,No,Two year,No,Mailed check,25.4,1710.900024,No


###### Variables summary

In [15]:
df_wrangling.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
customerID,7032,,,0002-ORFBO,9995-HOTOH
gender,7032,,,Female,Male
SeniorCitizen,7032,0.16240045506257111,0.368843996757105,0,1
Partner,7032,,,No,Yes
Dependents,7032,,,No,Yes
tenure,7032,32.421786120591584,24.545259709263245,1,72
tenure_group,7032,,,Tenure_0-12,Tenure_gt_60
PhoneService,7032,,,No,Yes
MultipleLines,7032,,,No,Yes


###### Variables correlation

In [16]:
import numpy as np
import plotly.graph_objs as go#visualization
import plotly.offline as py#visualization

df_wrangling_pandas = df_wrangling.toPandas()
#correlation
correlation = df_wrangling_pandas.corr()

correlation

Unnamed: 0,SeniorCitizen,tenure,MonthlyCharges,TotalCharges
SeniorCitizen,1.0,0.015683,0.219874,0.102411
tenure,0.015683,1.0,0.246862,0.82588
MonthlyCharges,0.219874,0.246862,1.0,0.651065
TotalCharges,0.102411,0.82588,0.651065,1.0


#  Model Building
Churn prediction is a straightforward classification problem thus we will use methods such as the Logistic Regression, Decision Tree and Random Forest from Spark ML.

Before training the classification models we create Pipelines, as in scikit-learn, which consists of different stages (Transformers and Estimators) and are really useful when using Big Data tables.

## Pipelines

We encode categorical variables and assemble it with numerical variables.

### Models

In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
categoricalColumns = ['gender','SeniorCitizen','Partner','Dependents','PhoneService','MultipleLines','InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract','PaperlessBilling','PaymentMethod']

stages = [] # stages in Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Using OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Adding stages to be used later.
    stages += [stringIndexer, encoder]

In [18]:
# Converting label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="Churn", outputCol="label")
stages += [label_stringIdx]

In [19]:
# Transforming all features into a vector using VectorAssembler
numericCols = ['MonthlyCharges', 'TotalCharges'] 
assemblerInputs = numericCols + [c + "classVec" for c in categoricalColumns]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
IDcols = ['customerID']

In [20]:
# Creating a Pipeline.
pipeline = Pipeline(stages=stages)
# Running the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModel = pipeline.fit(df_wrangling)
dataset = pipelineModel.transform(df_wrangling)
# Keeping relevant columns
selectedcols= ["label", "features"] + IDcols
dataset = dataset.select(selectedcols)
dataset_pandas = dataset.toPandas()

In [21]:
 dataset_pandas.head(5)

Unnamed: 0,label,features,customerID
0,0.0,"(50.7, 350.3500061035156, 0.0, 1.0, 0.0, 0.0, ...",6497-TILVL
1,1.0,"(94.85, 5000.2001953125, 0.0, 1.0, 0.0, 1.0, 1...",0691-JVSYA
2,0.0,"(80.05, 1112.300048828125, 0.0, 1.0, 1.0, 1.0,...",8544-GOQSH
3,0.0,"(65.55, 237.1999969482422, 1.0, 1.0, 1.0, 1.0,...",5172-MIGPM
4,0.0,"(25.4, 1710.9000244140625, 1.0, 1.0, 0.0, 1.0,...",4312-KFRXN


##### Creating Training and Test Sets
Before fitting models we decide to divide the data set into 2 parts: 70% to train the model and 30% to test it.

The model is trained by making associations between the input features and the labeled output associated with these features.


In [22]:
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=200)
trainingData.createOrReplaceTempView("train")
print(trainingData.count())
testData.createOrReplaceTempView("test")
print(testData.count())

4889
2143


#### A- Decision Tree
Each model is evaluated with the AUC (Area Under the Curve) of the Receiver Operating Characteristic (ROC) curve on the Test set.


In [23]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Creating initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)

# Training model with Training Data
dtModel = dt.fit(trainingData)
# Evaluating model
evaluator = BinaryClassificationEvaluator()

# Making predictions on test data using the Transformer.transform() method.
predictions = dtModel.transform(testData)

# Evaluating best model
evaluator.evaluate(predictions)


0.7811525258333769

The AUC obtained is 0.7811.

In [24]:
print("numNodes = ", dtModel.numNodes)
print("depth = ", dtModel.depth)

numNodes =  7
depth =  3


##### Confusion Matrix

In [25]:
# Viewing Best model's predictions and probabilities of each prediction class
selecteddt = predictions.select("label", "prediction", "probability")
selecteddt.createOrReplaceTempView("selecteddt")

In [27]:
confusion_matrixdt = spark.sql (""" 
select count(*), label, prediction
from selecteddt
group by label, prediction 
""")

confusion_matrixdt_pandas = confusion_matrixdt.toPandas()
confusion_matrixdt_pandas

Unnamed: 0,count(1),label,prediction
0,256,1.0,1.0
1,115,0.0,1.0
2,336,1.0,0.0
3,1436,0.0,0.0


##### B- Logistic Regression

In [28]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression

# Creating initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Training model with Training Data
lrModel = lr.fit(trainingData)

# Making predictions on test data using the Transformer.transform() method.
predictions = lrModel.transform(testData)

# Evaluating best model
evaluator.evaluate(predictions)

0.8425487261923431

The AUC obtained is 0.8425

In [29]:
# View Best model's predictions and probabilities of each prediction class
selectedlr = predictions.select("label", "prediction", "probability")
selectedlr.createOrReplaceTempView("selectedlr")

In [30]:
confusion_matrixlr = spark.sql (""" 
select count(*), label, prediction
from selectedlr
group by label, prediction 
""")
confusion_matrixlr_pandas = confusion_matrixlr.toPandas()
confusion_matrixlr_pandas

Unnamed: 0,count(1),label,prediction
0,318,1.0,1.0
1,164,0.0,1.0
2,274,1.0,0.0
3,1387,0.0,0.0


##### C- Random Forest

In [31]:
from pyspark.ml.classification import RandomForestClassifier

# Creating an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
# Training model with Training Data
rfModel = rf.fit(trainingData)
rfModel.featureImportances

# Making predictions on test data using the Transformer.transform() method.
predictions = rfModel.transform(testData)

# Evaluating best model
evaluator.evaluate(predictions)

0.8434417855960408

The AUC obtained is 0.8420

### Features Importance


In [32]:
import pandas as pd
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

In [33]:
ExtractFeatureImp(rfModel.featureImportances, trainingData, "features").head(10)

Unnamed: 0,idx,name,score
22,22,ContractclassVec_Month-to-month,0.238101
17,17,TechSupportclassVec_No,0.198066
1,1,TotalCharges,0.166983
9,9,InternetServiceclassVec_Fiber optic,0.098778
11,11,OnlineSecurityclassVec_No,0.073433
0,0,MonthlyCharges,0.055682
23,23,ContractclassVec_Two year,0.046893
10,10,InternetServiceclassVec_DSL,0.025855
15,15,DeviceProtectionclassVec_No,0.024417
18,18,TechSupportclassVec_Yes,0.016461


We see that the Contract Tenure and the Technical Support has a huge impact on client resignation.

In [34]:
# View Best model's predictions and probabilities of each prediction class
selectedrf = predictions.select("label", "prediction", "probability")
selectedrf.createOrReplaceTempView("selectedrf")

In [35]:
confusion_matrixrf = spark.sql (""" 
select count(*), label, prediction
from selectedrf
group by label, prediction 
""")
confusion_matrixrf_pandas = confusion_matrixrf.toPandas()
confusion_matrixrf_pandas

Unnamed: 0,count(1),label,prediction
0,276,1.0,1.0
1,110,0.0,1.0
2,316,1.0,0.0
3,1441,0.0,0.0
