In [1]:
# Source file and its format
file_location = "/FileStore/tables/telco_churn.csv"
file_type = "csv"

# CSV parsing settings (passed to the reader as strings)
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Reader options. They apply to CSV input; other file types ignore them.
# A single space is configured as both the null marker and the NaN marker.
csv_options = {
    "nanValue": " ",
    "nullValue": " ",
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
}

reader = spark.read.format(file_type).options(**csv_options)
df = reader.load(file_location)

display(df)

customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,No,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,No,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes
9305-CDSKC,Female,0,No,No,8,Yes,Yes,Fiber optic,No,No,Yes,No,Yes,Yes,Month-to-month,Yes,Electronic check,99.65,820.5,Yes
1452-KIOVK,Male,0,No,Yes,22,Yes,Yes,Fiber optic,No,Yes,No,No,Yes,No,Month-to-month,Yes,Credit card (automatic),89.1,1949.4,No
6713-OKOMC,Female,0,No,No,10,No,No phone service,DSL,Yes,No,No,No,No,No,Month-to-month,No,Mailed check,29.75,301.9,No
7892-POOKP,Female,0,Yes,No,28,Yes,Yes,Fiber optic,No,No,Yes,Yes,Yes,Yes,Month-to-month,Yes,Electronic check,104.8,3046.05,Yes
6388-TABGU,Male,0,No,Yes,62,Yes,No,DSL,Yes,Yes,No,No,No,No,One year,No,Bank transfer (automatic),56.15,3487.95,No


In [2]:
# Print the column names and data types of the loaded DataFrame.
df.printSchema()

In [3]:
# NOTE: this wildcard import shadows Python builtins such as `sum`;
# `sum` and `col` below are the pyspark.sql.functions versions.
from pyspark.sql.functions import *

# One aggregate per column: cast the "is null" flag to int and sum it,
# giving the number of nulls in that column.
null_counts = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in df.columns
]
df.select(*null_counts).show()

In [4]:
# Register the DataFrame as a temporary view so it can be queried by name from SQL cells

temp_table_name = "churn_analysis"

df.createOrReplaceTempView(temp_table_name)

In [5]:
%sql

-- Preview every column of the churn_analysis temp view
SELECT *
FROM churn_analysis

customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,No,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,No,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes
9305-CDSKC,Female,0,No,No,8,Yes,Yes,Fiber optic,No,No,Yes,No,Yes,Yes,Month-to-month,Yes,Electronic check,99.65,820.5,Yes
1452-KIOVK,Male,0,No,Yes,22,Yes,Yes,Fiber optic,No,Yes,No,No,Yes,No,Month-to-month,Yes,Credit card (automatic),89.1,1949.4,No
6713-OKOMC,Female,0,No,No,10,No,No phone service,DSL,Yes,No,No,No,No,No,Month-to-month,No,Mailed check,29.75,301.9,No
7892-POOKP,Female,0,Yes,No,28,Yes,Yes,Fiber optic,No,No,Yes,Yes,Yes,Yes,Month-to-month,Yes,Electronic check,104.8,3046.05,Yes
6388-TABGU,Male,0,No,Yes,62,Yes,No,DSL,Yes,Yes,No,No,No,No,One year,No,Bank transfer (automatic),56.15,3487.95,No


In [6]:
# Convert the whole Spark DataFrame to a pandas DataFrame on the driver.
# Only do this when the data is small enough to fit in driver memory.
pd_df = df.toPandas()

In [7]:
import matplotlib.pyplot as plt

# Scatter of TotalCharges against tenure, drawn from the pandas copy of the data
plt.clf()  # start from an empty figure
plt.plot(pd_df['tenure'], pd_df['TotalCharges'], '.')
plt.xlabel('tenure')
plt.ylabel('totalcharges')
display()

In [8]:
# Class balance: number of rows for each Churn value
df.groupBy('Churn').count().show()

#### The target classes are somewhat imbalanced: there are far more non-churned ("No") customers than churned ("Yes") ones

In [10]:
# Summary statistics for the three numeric columns
df.select('tenure','TotalCharges','MonthlyCharges').describe().show()

In [11]:
%sql

-- Customer counts for every (gender, churn) combination
SELECT gender, churn, count(*)
FROM churn_analysis
GROUP BY gender, churn

gender,churn,count(1)
Male,No,2625
Male,Yes,930
Female,No,2549
Female,Yes,939


In [12]:
%sql

-- Customer counts for every (seniorcitizen, churn) combination.
-- Senior citizens churn at a noticeably higher rate than non-seniors.
SELECT seniorcitizen, churn, count(*)
FROM churn_analysis
GROUP BY seniorcitizen, churn

seniorcitizen,churn,count(1)
1,No,666
0,No,4508
0,Yes,1393
1,Yes,476


In [13]:
%sql

-- Customer counts for every (tenure, churn) combination, ordered numerically by tenure
SELECT cast(tenure as int), churn, count(churn)
FROM churn_analysis
GROUP BY tenure, churn
ORDER BY cast(tenure as int)


tenure,churn,count(churn)
0,No,11
1,No,233
1,Yes,380
2,No,115
2,Yes,123
3,No,106
3,Yes,94
4,No,93
4,Yes,83
5,No,69


In [14]:
# Contingency table: SeniorCitizen values against InternetService values
df.stat.crosstab('SeniorCitizen','InternetService').show()

In [15]:
# Frequent items for every column with a minimum support of 0.6
# (freqItems is approximate and may return false positives).
df.stat.freqItems(df.columns,0.6).collect()

In [16]:
%sql

-- Customer counts for every (PaperlessBilling, churn) combination
SELECT PaperlessBilling, churn, count(*)
FROM churn_analysis
GROUP BY PaperlessBilling, churn

PaperlessBilling,churn,count(1)
Yes,Yes,1400
No,No,2403
Yes,No,2771
No,Yes,469


In [17]:
%sql

-- Customer counts for every (PaymentMethod, churn) combination
SELECT PaymentMethod, churn, count(*)
FROM churn_analysis
GROUP BY PaymentMethod, churn

PaymentMethod,churn,count(1)
Credit card (automatic),No,1290
Bank transfer (automatic),No,1286
Mailed check,Yes,308
Credit card (automatic),Yes,232
Electronic check,No,1294
Electronic check,Yes,1071
Bank transfer (automatic),Yes,258
Mailed check,No,1304


In [18]:
# Drop the customerID column before splitting
churn_df = df.drop('customerID')

# 70/30 train/test split with a fixed seed
train_data, test_data = churn_df.randomSplit([0.7, 0.3], seed=42)

print('Records for training: {}'.format(train_data.count()))
print('Records for evaluation: {}'.format(test_data.count()))

In [19]:
# Peek at one row of the training split
train_data.show(1)

In [20]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Spark 3.0 renamed OneHotEncoderEstimator to OneHotEncoder. Fall back to the new
# class under the old name so the cells that use OneHotEncoderEstimator run on
# both Spark 2.x and Spark 3.x.
try:
    from pyspark.ml.feature import OneHotEncoderEstimator
except ImportError:
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator

# StringIndexer          - maps each distinct string value of a column to a numeric index
# OneHotEncoderEstimator - turns those indices into one-hot vectors
# VectorAssembler        - combines the transformed columns into the single vector column the model consumes


In [21]:
# Two-row pandas sample of churn_df, with SeniorCitizen cast to the object dtype
pd_df = churn_df.limit(2).toPandas().astype({'SeniorCitizen': object})

In [22]:
# Show the pandas dtype of every column in the sample
pd_df.dtypes

In [23]:
# Categorical feature columns: every object-dtype column of the sample except the Churn target
catColumns = [
    name
    for name, dtype in pd_df.dtypes.items()
    if dtype == 'O' and name != 'Churn'
]

In [24]:
# Display the list of categorical column names
catColumns

In [25]:
# Pipeline stages, built up over the following cells
stages = []

# For each categorical column: index its string values, then one-hot encode the index
for catCol in catColumns:
  stringIndexer = StringIndexer(
    inputCol=catCol,
    outputCol=catCol + "Index",
  )
  encoder = OneHotEncoderEstimator(
    inputCols=[stringIndexer.getOutputCol()],
    outputCols=[catCol + "catVec"],
  )
  stages.extend([stringIndexer, encoder])

In [26]:
# Display the stages collected so far
stages

In [27]:
from pyspark.ml.feature import Imputer

# Fill missing TotalCharges values, writing the result to Out_TotalCharges
imputer = Imputer(
    inputCols=['TotalCharges'],
    outputCols=['Out_TotalCharges'],
)
stages.append(imputer)

In [28]:
# Index the Churn target column into the numeric "label" column
label_Idx = StringIndexer(outputCol="label", inputCol="Churn")
stages.append(label_Idx)

In [29]:
temp = label_Idx.fit(train_data).transform(train_data)
# StringIndexer orders labels by descending frequency by default: the most frequent label gets index 0, the next one 1, and so on.

In [30]:
# Peek at one row of the label-indexed training data
temp.show(1)

In [31]:
# Correlation between TotalCharges and MonthlyCharges (Pearson is the default method)
df.stat.corr('TotalCharges','MonthlyCharges')

In [32]:
from pyspark.ml.feature import QuantileDiscretizer

# Bucket tenure into 3 quantile-based bins, written to the tenure_bin column
tenure_bin = QuantileDiscretizer(
    inputCol='tenure',
    outputCol='tenure_bin',
    numBuckets=3,
)
stages.append(tenure_bin)

In [33]:
# Display the stages collected so far
stages

In [34]:
# Numeric inputs: the binned tenure, the monthly charge and the imputed total charge
numColumns = ['tenure_bin', 'MonthlyCharges', 'Out_TotalCharges']

# One-hot vector columns for every categorical column, followed by the numeric columns
assembleInputs = [c + 'catVec' for c in catColumns] + numColumns

# Combine all inputs into the single "features" vector column.
# (The original had a duplicated `assembler = assembler = ...` assignment.)
assembler = VectorAssembler(inputCols=assembleInputs, outputCol="features")

stages += [assembler]

In [35]:
# Display the list of numeric input columns
numColumns

In [36]:
# Chain all collected stages into one pipeline and fit it on the training split
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(train_data)

In [37]:
# Apply the fitted preprocessing pipeline to both splits
trainprepDF = pipelineModel.transform(train_data)
testprepDF = pipelineModel.transform(test_data)

In [38]:
# Inspect the first transformed training row
trainprepDF.head(1)

In [39]:
# Data preparation is done; build the estimator next.

from pyspark.ml.classification import LogisticRegression

# Initial logistic regression model, capped at 10 iterations
lr = LogisticRegression(
    maxIter=10,
    featuresCol='features',
    labelCol='label',
)

lrmodel = lr.fit(trainprepDF)

In [40]:
# Show the fitted coefficients and intercept
print('Coefficients: {}'.format(lrmodel.coefficients))
print('Intercept: {}'.format(lrmodel.intercept))

In [41]:
# Summary object exposed by the fitted logistic regression model
summary = lrmodel.summary

In [42]:
# Metrics read from the model summary
accuracy = summary.accuracy
falsePositiveRate = summary.weightedFalsePositiveRate
truePositiveRate = summary.weightedTruePositiveRate
fMeasure = summary.weightedFMeasure()
precision = summary.weightedPrecision
recall = summary.weightedRecall

# One metric per line, with the summary's area under ROC appended last
report_template = 'Acc: %s\nFPR: %s\nTPR : %s\nF-measure : %s\nPrecision : %s\nRecall : %s\nAUC ROC : %s'
report_values = (
    accuracy,
    falsePositiveRate,
    truePositiveRate,
    fMeasure,
    precision,
    recall,
    summary.areaUnderROC,
)
print(report_template % report_values)

In [43]:
# Notebook display of the model's ROC data (false positive rate, true positive rate, threshold) on the prepared training data
display(lrmodel,trainprepDF,"ROC")

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.8365104114005518
0.0,0.04,0.8365104114005518
0.0,0.08,0.8280370240546178
0.0,0.12,0.8087161469017103
0.0153846153846153,0.12,0.7816503346899251
0.0307692307692307,0.12,0.7649222704756213
0.0307692307692307,0.16,0.690987855969101
0.0307692307692307,0.2,0.6689519581259424
0.0461538461538461,0.2,0.6420208293273202
0.0461538461538461,0.24,0.6385845186034268


In [44]:
# Notebook display of fitted values against residuals for the model on the prepared training data
display(lrmodel, trainprepDF,"fittedVsResiduals")

fitted values,residuals
0.0726464348380802,-0.5181536255916214
0.147775594530783,-0.5368768145368098
0.8047416718425023,0.309012144030899
-1.5497471518857426,-0.1751227902871741
-0.1022561741057846,0.5255417912978873
1.4416895196874484,0.1912838530982896
-0.1475332118781847,-0.4631834520486895
-1.7887907504903109,-0.1432210449996921
-0.9027360962637055,-0.2884885534442123
0.5692258227431949,0.3614154813965732


In [45]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the held-out test split with the fitted logistic regression model
predictions = lrmodel.transform(testprepDF)

# Evaluate on the model's continuous "rawPrediction" output. Feeding the hard 0/1
# "prediction" column collapses the ROC curve to a single operating point and
# understates the area under it.
evaluatorLR = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
AUC = evaluatorLR.evaluate(predictions)

# The evaluator's default metric is area under the ROC curve
print("AUC ROC = %g" % AUC)

evaluatorLR.getMetricName()

In [46]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

results = predictions.select(['prediction', 'label'])

# Build the (score, label) pairs directly on the cluster instead of collecting
# every row to the driver and re-parallelizing it. The score is the predicted
# probability of the positive class (index 1 of the "probability" vector);
# using the hard 0/1 prediction as the score would reduce both curves to a
# single operating point.
predictionAndLabels = (
    predictions
    .select('probability', 'label')
    .rdd
    .map(lambda row: (float(row['probability'][1]), float(row['label'])))
)

metrics = BinaryClassificationMetrics(predictionAndLabels)

# Area under the precision-recall curve
print("Area under PR = %s" % metrics.areaUnderPR)

# Area under the ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

predictions.show(1)

##### The model appears to be overfit. Let's tune its regularization with cross-validation so that it generalizes better

In [48]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid for cross-validation: 3 x 3 x 3 = 27 combinations
grid_builder = ParamGridBuilder()
grid_builder.addGrid(lr.regParam, [0.01, 0.5, 2.0])
grid_builder.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
grid_builder.addGrid(lr.maxIter, [5, 10, 20])
paramGrid = grid_builder.build()

In [49]:
# 5-fold cross-validation over the parameter grid, scored with evaluatorLR.
# The seed fixes the fold assignment so the selected model is reproducible
# across runs (the original left it unset).
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluatorLR,
    numFolds=5,
    seed=42,
)

# Run cross-validation on the prepared training data
cvModel = cv.fit(trainprepDF)

In [50]:
# Score the test split with the best model found by cross-validation
predictions = cvModel.bestModel.transform(testprepDF)

In [51]:
# Evaluate the cross-validated model's test predictions with evaluatorLR
evaluatorLR.evaluate(predictions)

In [52]:
# Describe the parameters of cvModel itself.
# NOTE(review): if the goal is to see the winning hyper-parameters, inspecting cvModel.bestModel may be what is wanted — confirm intent.
cvModel.explainParams()

In [53]:
from pyspark.ml.classification import RandomForestClassifier 

In [54]:
# Random forest: 50 gini trees of depth at most 6, automatic feature subsetting, fixed seed
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol='features',
    impurity='gini',
    maxDepth=6,
    numTrees=50,
    featureSubsetStrategy='auto',
    seed=1010,
)

rfModel = rf.fit(trainprepDF)

In [55]:
# Score the test split with the fitted random forest
predictions = rfModel.transform(testprepDF)

In [56]:
# Evaluate on the model's continuous "rawPrediction" output. Feeding the hard 0/1
# "prediction" column collapses the ROC curve to a single operating point and
# understates the area under it.
evaluatorLR = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
AUC = evaluatorLR.evaluate(predictions)

# The evaluator's default metric is area under the ROC curve
print("AUC ROC = %g" % AUC)

evaluatorLR.getMetricName()

In [57]:
# Keep only the predicted and true labels for the confusion-matrix counts
results = predictions.select(['prediction','label'])

In [58]:
# Count every (prediction, label) combination in ONE aggregation instead of
# launching seven separate filter/count jobs. row['count'] is used because
# row.count would resolve to the tuple method.
confusion = {
    (row['prediction'], row['label']): row['count']
    for row in results.groupBy('prediction', 'label').count().collect()
}

# Combinations that never occur are absent from the aggregation, hence the 0 default
tp = confusion.get((1.0, 1.0), 0)
fp = confusion.get((1.0, 0.0), 0)
fn = confusion.get((0.0, 1.0), 0)
tn = confusion.get((0.0, 0.0), 0)

# With binary 0.0/1.0 labels these equal the original filter-based counts
correct = tp + tn
wrong = fp + fn
count = correct + wrong


def _ratio(numerator, denominator):
    """Return numerator / denominator as a float, or NaN when the denominator is 0.

    Guards against ZeroDivisionError (e.g. the model never predicts the positive
    class) and against integer division on Python 2.
    """
    return float(numerator) / denominator if denominator else float('nan')


accuracy = _ratio(tp + tn, count)

precision = _ratio(tp, tp + fp)

recall = _ratio(tp, tp + fn)

print('correct: %s\nwrong: %s\ntp: %s\nfp: %s\nfn: %s\ntn: %s\nacc: %s\nprecision: %s\nrecall: %s' %(correct,wrong,tp,fp,fn,tn,accuracy,precision,recall))