In [36]:
import sys 
import os
import warnings
warnings.filterwarnings('ignore')
%load_ext autoreload
sys.path.insert(0, '../../Telcom-Customer-Churn-Prediction')
from src.config import TRAINING_FILE,MODELS,MODEL_TRAINING_FILE
from src.utility import get_spark

from pyspark.ml import Pipeline
from pyspark.ml.feature import  StringIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col, lit,when
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, NaiveBayes

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [37]:
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","
file_type = 'csv'
# The applied options are for CSV files. For other file types, these will be ignored.
df = get_spark().read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(TRAINING_FILE)

display(df.toPandas())

                                                                                

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,...,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
1,5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,...,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
2,3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,...,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
3,7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,...,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.30,1840.75,No
4,9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,70.70,151.65,Yes
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7038,6840-RESVB,Male,0,Yes,Yes,24,Yes,Yes,DSL,Yes,...,Yes,Yes,Yes,Yes,One year,Yes,Mailed check,84.80,1990.5,No
7039,2234-XADUH,Female,0,Yes,Yes,72,Yes,Yes,Fiber optic,No,...,Yes,No,Yes,Yes,One year,Yes,Credit card (automatic),103.20,7362.9,No
7040,4801-JZAZL,Female,0,Yes,Yes,11,No,No phone service,DSL,Yes,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.60,346.45,No
7041,8361-LTMKD,Male,1,Yes,No,4,Yes,Yes,Fiber optic,No,...,No,No,No,No,Month-to-month,Yes,Mailed check,74.40,306.6,Yes


In [38]:
stages = []
categoricalColumns = ['gender','Partner','Dependents','PhoneService','MultipleLines','InternetService','OnlineSecurity','OnlineBackup','DeviceProtection',
                     'TechSupport','StreamingTV','StreamingMovies','Contract','PaperlessBilling','PaymentMethod']
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    stages += [stringIndexer]
label_stringIdx = StringIndexer(inputCol = 'Churn', outputCol = 'label')
stages += [label_stringIdx]

In [39]:
numericCols = list(set(df.columns) - set(categoricalColumns) - set(['customerID','Churn','TotalCharges']))

In [40]:
assemblerInputs = [c + "Index" for c in categoricalColumns] + numericCols

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

stages += [assembler]


In [41]:
# Fitting Pipeline
  
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(df)

In [42]:
preppedDataDF = pipelineModel.transform(df)

In [43]:
selectedcols = ["label", "features"] + df.columns
dataset = preppedDataDF.select(selectedcols)

dataset.toPandas().head(5)

Unnamed: 0,label,features,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,...,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,0.0,"(1.0, 1.0, 0.0, 1.0, 2.0, 1.0, 0.0, 1.0, 0.0, ...",7590-VHVEG,Female,0,Yes,No,1,No,No phone service,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
1,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, ...",5575-GNVDE,Male,0,No,No,34,Yes,No,...,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
2,1.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, ...",3668-QPYBK,Male,0,No,No,2,Yes,No,...,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
3,0.0,"[0.0, 0.0, 0.0, 1.0, 2.0, 1.0, 1.0, 0.0, 1.0, ...",7795-CFOCW,Male,0,No,No,45,No,No phone service,...,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
4,1.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",9237-HQITU,Female,0,No,No,2,Yes,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes


In [44]:
train, test = dataset.randomSplit([0.7, 0.3], seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 4978
Test Dataset Count: 2065


In [45]:
print (train)

DataFrame[label: double, features: vector, customerID: string, gender: string, SeniorCitizen: int, Partner: string, Dependents: string, tenure: int, PhoneService: string, MultipleLines: string, InternetService: string, OnlineSecurity: string, OnlineBackup: string, DeviceProtection: string, TechSupport: string, StreamingTV: string, StreamingMovies: string, Contract: string, PaperlessBilling: string, PaymentMethod: string, MonthlyCharges: double, TotalCharges: string, Churn: string]


In [46]:
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data with Logistic Regression
lrModel = lr.fit(train)

In [47]:
rf = RandomForestClassifier(numTrees=50,labelCol="label", featuresCol="features")
rfModel = rf.fit(train)

In [48]:
nb = NaiveBayes(labelCol="label", featuresCol="features")
nb_model=nb.fit(train)

In [49]:
lrPreds = lrModel.transform(test)
lrPreds.select('label', 'rawPrediction', 'prediction', 'probability').show(10)

+-----+--------------------+----------+--------------------+
|label|       rawPrediction|prediction|         probability|
+-----+--------------------+----------+--------------------+
|  0.0|[1.49260902672313...|       0.0|[0.81646955031704...|
|  0.0|[0.74675652261104...|       0.0|[0.67847155134241...|
|  0.0|[2.14010937083949...|       0.0|[0.89474091142552...|
|  0.0|[0.47608243207585...|       0.0|[0.61682237241467...|
|  0.0|[0.32169904943477...|       0.0|[0.57973826761698...|
|  0.0|[0.42132266388738...|       0.0|[0.60379970835508...|
|  0.0|[1.12154076924831...|       0.0|[0.75427440146784...|
|  0.0|[-0.9512708137628...|       1.0|[0.27862932289240...|
|  0.0|[1.87354732869217...|       0.0|[0.86686819964147...|
|  0.0|[0.14270874716200...|       0.0|[0.53561676035323...|
+-----+--------------------+----------+--------------------+
only showing top 10 rows



In [50]:
rfPreds = rfModel.transform(test)
rfPreds.select('label', 'rawPrediction', 'prediction', 'probability').show(10)

+-----+--------------------+----------+--------------------+
|label|       rawPrediction|prediction|         probability|
+-----+--------------------+----------+--------------------+
|  0.0|[39.5305776660646...|       0.0|[0.79061155332129...|
|  0.0|[37.3316337962377...|       0.0|[0.74663267592475...|
|  0.0|[41.1909074896073...|       0.0|[0.82381814979214...|
|  0.0|[35.4273577526957...|       0.0|[0.70854715505391...|
|  0.0|[34.0080557844228...|       0.0|[0.68016111568845...|
|  0.0|[29.9990585340560...|       0.0|[0.59998117068112...|
|  0.0|[37.5168768750253...|       0.0|[0.75033753750050...|
|  0.0|[20.5471047296826...|       1.0|[0.41094209459365...|
|  0.0|[41.1280180526863...|       0.0|[0.82256036105372...|
|  0.0|[28.7652786706687...|       0.0|[0.57530557341337...|
+-----+--------------------+----------+--------------------+
only showing top 10 rows



In [51]:
nbPreds = nb_model.transform(test)
nbPreds.select('label', 'rawPrediction', 'prediction', 'probability').show(10)

+-----+--------------------+----------+--------------------+
|label|       rawPrediction|prediction|         probability|
+-----+--------------------+----------+--------------------+
|  0.0|[-103.66415518465...|       0.0|[0.99999822767701...|
|  0.0|[-78.301512209024...|       0.0|[0.99910409084695...|
|  0.0|[-121.80688887799...|       0.0|[0.99991709719558...|
|  0.0|[-79.342028729604...|       1.0|[1.97611882195788...|
|  0.0|[-77.997701982655...|       1.0|[4.56617845482170...|
|  0.0|[-68.424879094722...|       1.0|[4.62794428756894...|
|  0.0|[-144.37211843203...|       0.0|[0.83396941013991...|
|  0.0|[-102.27955142923...|       1.0|[6.53176875240345...|
|  0.0|[-169.43735994978...|       0.0|[0.99999997660592...|
|  0.0|[-131.94742151101...|       1.0|[0.07356273172467...|
+-----+--------------------+----------+--------------------+
only showing top 10 rows



In [52]:
correct = lrPreds.where("(label = prediction)").count()
incorrect = lrPreds.where("(label != prediction)").count()

resultDF = get_spark().createDataFrame([['correct', correct], ['incorrect', incorrect]], ['metric', 'value'])
display(resultDF)

DataFrame[metric: string, value: bigint]

In [53]:
# AUC 
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(lrPreds)

0.841397887954177

In [54]:
print("Classification report of Logistic")
lrPreds.groupBy('label', 'prediction').count().show()
# Calculate the elements of the confusion matrix
TN = lrPreds.filter('prediction = 0 AND label = prediction').count()
TP = lrPreds.filter('prediction = 1 AND label = prediction').count()
FN = lrPreds.filter('prediction = 0 AND label <> prediction').count()
FP = lrPreds.filter('prediction = 1 AND label <> prediction').count()
# calculate accuracy, precision, recall, and F1-score
accuracy = (TN + TP) / (TN + TP + FN + FP)
precision = TP / (TP + FP)
recall = TP / (TP + FN)
F =  2 * (precision*recall) / (precision + recall)
print('n precision: %0.3f' % precision)
print('n recall: %0.3f' % recall)
print('n accuracy: %0.3f' % accuracy)
print('n F1 score: %0.3f' % F)

Classification report of Logistic
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  293|
|  0.0|       1.0|  143|
|  1.0|       0.0|  262|
|  0.0|       0.0| 1367|
+-----+----------+-----+

n precision: 0.672
n recall: 0.528
n accuracy: 0.804
n F1 score: 0.591


In [55]:
correct = rfPreds.where("(label = prediction)").count()
incorrect = rfPreds.where("(label != prediction)").count()

resultDF_RF = get_spark().createDataFrame([['correct', correct], ['incorrect', incorrect]], ['metric', 'value'])
display(resultDF_RF)

DataFrame[metric: string, value: bigint]

In [56]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(rfPreds)

0.8422152616192327

In [57]:
print("Classification report of Random Forest")
rfPreds.groupBy('label', 'prediction').count().show()
# Calculate the elements of the confusion matrix
TN = rfPreds.filter('prediction = 0 AND label = prediction').count()
TP = rfPreds.filter('prediction = 1 AND label = prediction').count()
FN = rfPreds.filter('prediction = 0 AND label <> prediction').count()
FP = rfPreds.filter('prediction = 1 AND label <> prediction').count()
# calculate accuracy, precision, recall, and F1-score
accuracy = (TN + TP) / (TN + TP + FN + FP)
precision = TP / (TP + FP)
recall = TP / (TP + FN)
F =  2 * (precision*recall) / (precision + recall)
print('n precision: %0.3f' % precision)
print('n recall: %0.3f' % recall)
print('n accuracy: %0.3f' % accuracy)
print('n F1 score: %0.3f' % F)

Classification report of Random Forest
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  207|
|  0.0|       1.0|   90|
|  1.0|       0.0|  348|
|  0.0|       0.0| 1420|
+-----+----------+-----+

n precision: 0.697
n recall: 0.373
n accuracy: 0.788
n F1 score: 0.486


In [58]:
correct = nbPreds.where("(label = prediction)").count()
incorrect = nbPreds.where("(label != prediction)").count()

resultDF_NB = get_spark().createDataFrame([['correct', correct], ['incorrect', incorrect]], ['metric', 'value'])
display(resultDF_NB)

DataFrame[metric: string, value: bigint]

In [59]:
print("Classification report of Naive Bayes")
nbPreds.groupBy('label', 'prediction').count().show()
# Calculate the elements of the confusion matrix
TN = nbPreds.filter('prediction = 0 AND label = prediction').count()
TP = nbPreds.filter('prediction = 1 AND label = prediction').count()
FN = nbPreds.filter('prediction = 0 AND label <> prediction').count()
FP = nbPreds.filter('prediction = 1 AND label <> prediction').count()
# calculate accuracy, precision, recall, and F1-score
accuracy = (TN + TP) / (TN + TP + FN + FP)
precision = TP / (TP + FP)
recall = TP / (TP + FN)
F =  2 * (precision*recall) / (precision + recall)
print('n precision: %0.3f' % precision)
print('n recall: %0.3f' % recall)
print('n accuracy: %0.3f' % accuracy)
print('n F1 score: %0.3f' % F)

Classification report of Naive Bayes
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  411|
|  0.0|       1.0|  392|
|  1.0|       0.0|  144|
|  0.0|       0.0| 1118|
+-----+----------+-----+

n precision: 0.512
n recall: 0.741
n accuracy: 0.740
n F1 score: 0.605


In [60]:
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [61]:
# Running with Cross validation
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train)

In [62]:
predictions = cvModel.transform(test)

In [63]:
correct = predictions.where("(label = prediction)").count()
incorrect = predictions.where("(label != prediction)").count()

resultDF_cv = get_spark().createDataFrame([['correct', correct], ['incorrect', incorrect]], ['metric', 'value'])
display(resultDF_cv)

DataFrame[metric: string, value: bigint]

In [64]:
print("Classification report of Logistic with CV")
predictions.groupBy('label', 'prediction').count().show()
# Calculate the elements of the confusion matrix
TN = predictions.filter('prediction = 0 AND label = prediction').count()
TP = predictions.filter('prediction = 1 AND label = prediction').count()
FN = predictions.filter('prediction = 0 AND label <> prediction').count()
FP = predictions.filter('prediction = 1 AND label <> prediction').count()
# calculate accuracy, precision, recall, and F1-score
accuracy = (TN + TP) / (TN + TP + FN + FP)
precision = TP / (TP + FP)
recall = TP / (TP + FN)
F =  2 * (precision*recall) / (precision + recall)
print('n precision: %0.3f' % precision)
print('n recall: %0.3f' % recall)
print('n accuracy: %0.3f' % accuracy)
print('n F1 score: %0.3f' % F)

Classification report of Logistic with CV
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  295|
|  0.0|       1.0|  138|
|  1.0|       0.0|  260|
|  0.0|       0.0| 1372|
+-----+----------+-----+

n precision: 0.681
n recall: 0.532
n accuracy: 0.807
n F1 score: 0.597


In [65]:
paramGrid_RF = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [5, 10, 15])
             .build())

In [66]:
cv_rf = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid_RF, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel_rf = cv_rf.fit(train)

23/12/13 13:26:12 WARN DAGScheduler: Broadcasting large task binary with size 1050.2 KiB
23/12/13 13:26:12 WARN DAGScheduler: Broadcasting large task binary with size 1701.2 KiB
23/12/13 13:26:12 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
23/12/13 13:26:13 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB
23/12/13 13:26:14 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/12/13 13:26:15 WARN DAGScheduler: Broadcasting large task binary with size 1050.2 KiB
23/12/13 13:26:15 WARN DAGScheduler: Broadcasting large task binary with size 1701.2 KiB
23/12/13 13:26:16 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
23/12/13 13:26:16 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB
23/12/13 13:26:17 WARN DAGScheduler: Broadcasting large task binary with size 4.8 MiB
23/12/13 13:26:18 WARN DAGScheduler: Broadcasting large task binary with size 5.9 MiB
23/12/13 13:26:18 WARN DAGScheduler: Broad

In [67]:
predictions_rf = cvModel_rf.transform(test)

In [68]:
correct = predictions_rf.where("(label = prediction)").count()
incorrect = predictions_rf.where("(label != prediction)").count()

resultDF_cv = get_spark().createDataFrame([['correct', correct], ['incorrect', incorrect]], ['metric', 'value'])
display(resultDF_cv)

23/12/13 13:27:19 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
23/12/13 13:27:19 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB


DataFrame[metric: string, value: bigint]

In [69]:
evaluator.evaluate(predictions_rf)

23/12/13 13:27:19 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB


0.8409754787900419

In [70]:
print("Classification report of Random Forest with CV")
predictions_rf.groupBy('label', 'prediction').count().show()
# Calculate the elements of the confusion matrix
TN = predictions_rf.filter('prediction = 0 AND label = prediction').count()
TP = predictions_rf.filter('prediction = 1 AND label = prediction').count()
FN = predictions_rf.filter('prediction = 0 AND label <> prediction').count()
FP = predictions_rf.filter('prediction = 1 AND label <> prediction').count()
# calculate accuracy, precision, recall, and F1-score
accuracy = (TN + TP) / (TN + TP + FN + FP)
precision = TP / (TP + FP)
recall = TP / (TP + FN)
F =  2 * (precision*recall) / (precision + recall)
print('n precision: %0.3f' % precision)
print('n recall: %0.3f' % recall)
print('n accuracy: %0.3f' % accuracy)
print('n F1 score: %0.3f' % F)

Classification report of Random Forest with CV


23/12/13 13:27:20 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
23/12/13 13:27:21 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  276|
|  0.0|       1.0|  143|
|  1.0|       0.0|  279|
|  0.0|       0.0| 1367|
+-----+----------+-----+



23/12/13 13:27:21 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
23/12/13 13:27:21 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
23/12/13 13:27:22 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB


n precision: 0.659
n recall: 0.497
n accuracy: 0.796
n F1 score: 0.567


23/12/13 13:27:22 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB


In [72]:
#lrModel.save('../models/logistic_regression')
#rfModel.save('../models/random_forrest')
nb_model.save('../models/naive_bayes')