##### Load Data

In [2]:
sparkDF = spark.read.format('csv').options(header='true', inferSchema='true').load('/FileStore/tables/churn_telcov2.csv')
sparkDF.cache()

In [3]:
sparkDF.columns

In [4]:
sparkDF.count()

In [5]:
display(sparkDF)

customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,No,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,No,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes
9305-CDSKC,Female,0,No,No,8,Yes,Yes,Fiber optic,No,No,Yes,No,Yes,Yes,Month-to-month,Yes,Electronic check,99.65,820.5,Yes
1452-KIOVK,Male,0,No,Yes,22,Yes,Yes,Fiber optic,No,Yes,No,No,Yes,No,Month-to-month,Yes,Credit card (automatic),89.1,1949.4,No
6713-OKOMC,Female,0,No,No,10,No,No phone service,DSL,Yes,No,No,No,No,No,Month-to-month,No,Mailed check,29.75,301.9,No
7892-POOKP,Female,0,Yes,No,28,Yes,Yes,Fiber optic,No,No,Yes,Yes,Yes,Yes,Month-to-month,Yes,Electronic check,104.8,3046.05,Yes
6388-TABGU,Male,0,No,Yes,62,Yes,No,DSL,Yes,Yes,No,No,No,No,One year,No,Bank transfer (automatic),56.15,3487.95,No


#### Data wrangling

In [7]:
#Replacing spaces with null values in total charges column
from pyspark.sql.functions import *
dfWithEmptyReplaced = sparkDF.withColumn('TotalCharges', when(col('TotalCharges') == ' ', None).otherwise(col('TotalCharges')).cast("float"))
dfWithEmptyReplaced = dfWithEmptyReplaced.na.drop()

In [8]:
#Replacing 'No internet service' to No for the following columns
replace_cols = [ 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection',
                'TechSupport','StreamingTV', 'StreamingMovies']
#replace values
for col_name in replace_cols:
    dfwithNo = dfWithEmptyReplaced.withColumn(col_name, when(col(col_name)== "No internet service","No").otherwise(col(col_name)))
    
display(dfwithNo)
dfwithNo.createOrReplaceTempView("datawrangling")

customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,No,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,No,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes
9305-CDSKC,Female,0,No,No,8,Yes,Yes,Fiber optic,No,No,Yes,No,Yes,Yes,Month-to-month,Yes,Electronic check,99.65,820.5,Yes
1452-KIOVK,Male,0,No,Yes,22,Yes,Yes,Fiber optic,No,Yes,No,No,Yes,No,Month-to-month,Yes,Credit card (automatic),89.1,1949.4,No
6713-OKOMC,Female,0,No,No,10,No,No phone service,DSL,Yes,No,No,No,No,No,Month-to-month,No,Mailed check,29.75,301.9,No
7892-POOKP,Female,0,Yes,No,28,Yes,Yes,Fiber optic,No,No,Yes,Yes,Yes,Yes,Month-to-month,Yes,Electronic check,104.8,3046.05,Yes
6388-TABGU,Male,0,No,Yes,62,Yes,No,DSL,Yes,Yes,No,No,No,No,One year,No,Bank transfer (automatic),56.15,3487.95,No


In [9]:
# Using Spark SQL to create categories 
df_wrangling = spark.sql("""
select distinct 
         customerID
        ,gender
        ,SeniorCitizen
        ,Partner
        ,Dependents
        ,tenure
        ,case when (tenure<=12) then "Tenure_0-12"
              when (tenure>12 and tenure <=24) then "Tenure_12-24"
              when (tenure>24 and tenure <=48) then "Tenure_24-48"
              when (tenure>48 and tenure <=60) then "Tenure_48-60"
              when (tenure>60) then "Tenure_gt_60"
        end as tenure_group
        ,PhoneService
        ,MultipleLines
        ,InternetService
        ,OnlineSecurity
        ,OnlineBackup
        ,DeviceProtection
        ,TechSupport
        ,StreamingTV
        ,StreamingMovies
        ,Contract
        ,PaperlessBilling
        ,PaymentMethod
        ,MonthlyCharges
        ,TotalCharges
        ,Churn
    from datawrangling
""")


#### Exploratory Analysis

In [11]:
display(df_wrangling)

customerID,gender,SeniorCitizen,Partner,Dependents,tenure,tenure_group,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
6497-TILVL,Female,0,Yes,Yes,7,Tenure_0-12,Yes,No,DSL,Yes,No,No,No,No,No,Month-to-month,No,Mailed check,50.7,350.35,No
0691-JVSYA,Female,0,Yes,No,53,Tenure_48-60,Yes,No,Fiber optic,No,No,Yes,No,Yes,Yes,One year,Yes,Bank transfer (automatic),94.85,5000.2,Yes
8544-GOQSH,Female,0,No,No,14,Tenure_12-24,Yes,No,Fiber optic,No,Yes,No,Yes,No,No,Month-to-month,Yes,Credit card (automatic),80.05,1112.3,No
5172-MIGPM,Male,0,No,No,4,Tenure_0-12,Yes,Yes,DSL,No,No,No,Yes,No,Yes,Month-to-month,No,Mailed check,65.55,237.2,No
4312-KFRXN,Male,0,Yes,No,72,Tenure_gt_60,Yes,Yes,No,No internet service,No internet service,No internet service,No internet service,No internet service,No,Two year,No,Mailed check,25.4,1710.9,No
5875-YPQFJ,Male,0,No,No,1,Tenure_0-12,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,69.9,69.9,Yes
6467-CHFZW,Male,0,Yes,Yes,47,Tenure_24-48,Yes,Yes,Fiber optic,No,Yes,No,No,Yes,Yes,Month-to-month,Yes,Electronic check,99.35,4749.15,Yes
3714-NTNFO,Female,0,No,No,49,Tenure_48-60,Yes,Yes,Fiber optic,No,No,No,No,No,Yes,Month-to-month,Yes,Electronic check,84.5,3906.7,No
9560-BBZXK,Female,0,No,No,36,Tenure_24-48,No,No phone service,DSL,Yes,No,No,No,No,No,Two year,No,Bank transfer (automatic),31.05,1126.35,No
4484-GLZOU,Female,0,Yes,No,52,Tenure_48-60,Yes,Yes,Fiber optic,No,Yes,Yes,No,Yes,Yes,Month-to-month,Yes,Electronic check,105.05,5624.85,Yes


###### Variables summary

In [13]:
df_wrangling.describe().toPandas().transpose()

###### Variables correlation

In [15]:
import numpy as np
import plotly.graph_objs as go#visualization
import plotly.offline as py#visualization

df_wrangling_pandas = df_wrangling.toPandas()
#correlation
correlation = df_wrangling_pandas.corr()

correlation

#### Models

In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
categoricalColumns = ['gender','SeniorCitizen','Partner','Dependents','PhoneService','MultipleLines','InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract','PaperlessBilling','PaymentMethod']

stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

In [18]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="Churn", outputCol="label")
stages += [label_stringIdx]

In [19]:
# Transform all features into a vector using VectorAssembler
numericCols = ['MonthlyCharges', 'TotalCharges']#'TotalRmbRCN1', 
assemblerInputs = numericCols + [c + "classVec" for c in categoricalColumns]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
IDcols = ['customerID']

In [20]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModel = pipeline.fit(df_wrangling)
dataset = pipelineModel.transform(df_wrangling)
# Keep relevant columns
selectedcols= ["label", "features"] + IDcols
dataset = dataset.select(selectedcols)
display(dataset)

label,features,customerID
0.0,"List(0, 28, List(0, 1, 3, 6, 7, 10, 12, 13, 15, 17, 19, 21, 22, 26), List(50.7, 350.3500061035156, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",6497-TILVL
1.0,"List(0, 28, List(0, 1, 3, 5, 6, 7, 9, 11, 13, 16, 17, 20, 24, 27), List(94.85, 5000.2001953125, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",0691-JVSYA
0.0,"List(0, 28, List(0, 1, 3, 4, 5, 6, 7, 9, 11, 14, 15, 18, 19, 21, 22, 24), List(80.05, 1112.300048828125, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",8544-GOQSH
0.0,"List(0, 28, List(0, 1, 2, 3, 4, 5, 6, 8, 10, 11, 13, 15, 18, 19, 22, 26), List(65.55, 237.1999969482422, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",5172-MIGPM
0.0,"List(0, 28, List(0, 1, 2, 3, 5, 6, 8, 21, 23, 26), List(25.4, 1710.9000244140625, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",4312-KFRXN
1.0,"List(1, 28, List(), List(69.9, 69.9000015258789, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0))",5875-YPQFJ
0.0,"List(0, 28, List(0, 1, 2, 3, 4, 5, 6, 7, 9, 11, 13, 16, 17, 19, 22), List(85.7, 256.75, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",3916-NRPAP
0.0,"List(0, 28, List(0, 1, 3, 4, 5, 10, 12, 14, 16, 18, 20, 21, 24, 25), List(53.6, 2879.199951171875, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",4598-ZADCK
0.0,"List(0, 28, List(0, 1, 2, 3, 4, 5, 6, 8, 9, 12, 13, 15, 17, 19, 22, 26), List(90.75, 174.75, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",4137-BTIKL
0.0,"List(0, 28, List(0, 1, 3, 6, 8, 10, 12, 14, 16, 18, 19, 24, 25), List(78.45, 5333.35009765625, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",1032-MAELW


##### Create Training and Test Set

In [22]:
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=200)
trainingData.createOrReplaceTempView("train")
print(trainingData.count())
testData.createOrReplaceTempView("test")
print(testData.count())

#### Decision Tree

In [24]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)

# Train model with Training Data
dtModel = dt.fit(trainingData)
# Evaluate model
evaluator = BinaryClassificationEvaluator()

# Make predictions on test data using the Transformer.transform() method.
predictions = dtModel.transform(testData)

# Evaluate best model
evaluator.evaluate(predictions)

In [25]:
print("numNodes = ", dtModel.numNodes)
print("depth = ", dtModel.depth)

##### Confusion Matrix

In [27]:
# View Best model's predictions and probabilities of each prediction class
selecteddt = predictions.select("label", "prediction", "probability")
selecteddt.createOrReplaceTempView("selecteddt")

In [28]:
confusion_matrixdt = spark.sql (""" 
select count(*), label, prediction
from selecteddt
group by label, prediction 
""")

display(confusion_matrixdt)

count(1),label,prediction
194,1.0,1.0
95,0.0,1.0
337,1.0,0.0
1450,0.0,0.0


##### Logistic Regression

In [30]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)

# Make predictions on test data using the Transformer.transform() method.
predictions = lrModel.transform(testData)

# Evaluate best model
evaluator.evaluate(predictions)

In [31]:
# View Best model's predictions and probabilities of each prediction class
selectedlr = predictions.select("label", "prediction", "probability")
selectedlr.createOrReplaceTempView("selectedlr")

In [32]:
confusion_matrixlr = spark.sql (""" 
select count(*), label, prediction
from selectedlr
group by label, prediction 
""")

display(confusion_matrixlr)

count(1),label,prediction
307,1.0,1.0
185,0.0,1.0
224,1.0,0.0
1360,0.0,0.0


##### Random Forest

In [34]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
# Train model with Training Data
rfModel = rf.fit(trainingData)
rfModel.featureImportances

# Make predictions on test data using the Transformer.transform() method.
predictions = rfModel.transform(testData)

# Evaluate best model
evaluator.evaluate(predictions)

In [35]:
import pandas as pd
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

In [36]:
ExtractFeatureImp(rfModel.featureImportances, trainingData, "features").head(10)

In [37]:
# View Best model's predictions and probabilities of each prediction class
selectedrf = predictions.select("label", "prediction", "probability")
selectedrf.createOrReplaceTempView("selectedrf")

In [38]:
confusion_matrixrf = spark.sql (""" 
select count(*), label, prediction
from selectedrf
group by label, prediction 
""")

display(confusion_matrixrf)

count(1),label,prediction
246,1.0,1.0
134,0.0,1.0
285,1.0,0.0
1411,0.0,0.0
