## Import data

In [2]:
# Where the Telco customer-churn export lives and which reader format to use.
file_location = "/FileStore/tables/WA_Fn_UseC__Telco_Customer_Churn-89c80.csv"
file_type = "csv"

# Reader settings. They only matter for CSV input; other formats ignore them.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Read the file. A single-space cell is registered both as the NaN marker and
# as the NULL marker, so blank-looking values do not end up as strings.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .option('nanValue', ' ')
    .option('nullValue', ' ')
    .load(file_location)
)

# Quick look at the first five rows.
df.show(5)

## Data cleaning and processing

In [4]:
# Print the column names and the types chosen by schema inference.
df.printSchema()

Check for NA/NULL values

In [6]:
from pyspark.sql.functions import isnan,when,count,col

# One aggregate expression per column: count the rows whose value is NaN or NULL,
# keeping the column's own name as the alias.
missing_counts = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in df.columns
]
df.select(missing_counts).show()

In [7]:
# Same per-column NaN/NULL tally, rendered through display() as a table.
null_or_nan_exprs = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df.columns
]
display(df.select(null_or_nan_exprs))

customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,11,0


Only one column, TotalCharges, has missing data: 11 Null/NA values.

## Exploratory Data analysis

In [10]:
# Register the DataFrame as a temporary view so that %sql cells can query it by name.

temp_table_name = "SQLtable"

df.createOrReplaceTempView(temp_table_name)

In [11]:
%sql

/* Preview the first five rows of the SQLtable temp view. */

select * from SQLtable limit 5

customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,No,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,No,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes


In [12]:
# Convert the Spark DataFrame to a pandas DataFrame (used for plotting in the next cell).
pd_df= df.toPandas()  # not suitable for very large datasets

In [13]:
import matplotlib.pyplot as plt
# Scatter plot of TotalCharges against tenure, one dot per row of pd_df.
plt.clf()  # clear the current figure before drawing
plt.plot(pd_df['tenure'],pd_df['TotalCharges'],'.')
plt.xlabel('tenure')
plt.ylabel('total charges')
# NOTE(review): presumably the Databricks display() helper rendering the current figure — confirm.
display()

In [14]:
# Class balance of the target column 'Churn' (values 'Yes' and 'No').
df.groupBy('Churn').count().show()

In [15]:
# Descriptive statistics for the numeric columns tenure, TotalCharges and MonthlyCharges.
df.select('tenure','TotalCharges','MonthlyCharges').describe().show()


In [16]:
%sql

select gender,churn,count(*) from SQLtable group by gender,churn

-- Churn is split almost identically for male and female customers, so gender shows little difference.

gender,churn,count(1)
Male,No,2625
Male,Yes,930
Female,No,2549
Female,Yes,939


In [17]:
%sql

select SeniorCitizen,churn,count(*) from SQLtable group by SeniorCitizen,churn

-- In both groups most customers stay, but senior citizens churn at a higher rate:
-- 476 of 1142 (~42%) for seniors versus 1393 of 5901 (~24%) for non-seniors.

SeniorCitizen,churn,count(1)
1,No,666
0,No,4508
0,Yes,1393
1,Yes,476


In [18]:
%sql
select cast(tenure as int),churn,count(churn) from SQLtable group by tenure,churn order by cast(tenure as int)
-- As tenure increases, the number of churned customers decreases, which suggests a strong relationship between churn and tenure.

tenure,churn,count(churn)
0,No,11
1,No,233
1,Yes,380
2,No,115
2,Yes,123
3,No,106
3,Yes,94
4,No,93
4,Yes,83
5,No,69


In [19]:
# Cross-tabulate SeniorCitizen against InternetService.
display(df.stat.crosstab('SeniorCitizen','InternetService'))
# Most senior customers use fiber optic. This is a correlation, not evidence of causation.

SeniorCitizen_InternetService,DSL,Fiber optic,No
1,259,831,52
0,2162,2265,1474


In [20]:
# For each service column, list the values that reach the 0.75 (75%) support threshold.
df.stat.freqItems(['PhoneService','MultipleLines','InternetService','OnlineSecurity','OnlineBackup','DeviceProtection','TechSupport','StreamingTV','StreamingMovies'],0.75).collect()

In [21]:
%sql
select PaperlessBilling,churn,count(*) from SQLtable group by PaperlessBilling,churn
-- Customers with paperless billing are more likely to churn.

PaperlessBilling,churn,count(1)
Yes,Yes,1400
No,No,2403
Yes,No,2771
No,Yes,469


In [22]:
%sql
select PaymentMethod,churn,count(*) from SQLtable group by PaymentMethod,churn
-- Customers paying by electronic check are more likely to churn than users of the other payment methods.

PaymentMethod,churn,count(1)
Credit card (automatic),No,1290
Bank transfer (automatic),No,1286
Mailed check,Yes,308
Credit card (automatic),Yes,232
Electronic check,No,1294
Electronic check,Yes,1071
Bank transfer (automatic),Yes,258
Mailed check,No,1304


## Model building process

In [24]:
# Divide the data set into training and testing sets (70/30) using a fixed seed.
model_df = df
train_data, test_data = model_df.randomSplit([0.7, 0.3], seed=999)

# Row counts of each split.
print(train_data.count())
print(test_data.count())

In [25]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator,StringIndexer,VectorAssembler

# All columns treated as categorical variables by the preprocessing pipeline.
categColumn = [
    'gender',
    'SeniorCitizen',
    'Partner',
    'Dependents',
    'PhoneService',
    'MultipleLines',
    'InternetService',
    'OnlineSecurity',
    'OnlineBackup',
    'DeviceProtection',
    'TechSupport',
    'StreamingTV',
    'StreamingMovies',
    'Contract',
    'PaperlessBilling',
    'PaymentMethod',
]

In [26]:
# Pipeline stage group 1: for every categorical column, index its values and
# then one-hot encode the index into a '<column>catVec' vector column.
stages = []

for catCol in categColumn:
  stringIndexer = StringIndexer(inputCol=catCol, outputCol=catCol + 'Index')
  encoder = OneHotEncoderEstimator(
      inputCols=[stringIndexer.getOutputCol()],
      outputCols=[catCol + 'catVec'])
  stages.extend([stringIndexer, encoder])

In [27]:
# Inspect the stage list built so far (indexer/encoder pairs).
stages

In [28]:
# 2nd pipeline stage: fill missing TotalCharges values, writing the result to Out_TotalCharges.
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=['TotalCharges'], outputCols=['Out_TotalCharges'])
stages.append(imputer)

In [29]:
# Preview only: fit the imputer on the training split and apply it to that same split.
temp2 = imputer.fit(train_data).transform(train_data)

In [30]:
# Show the first five rows, including the new Out_TotalCharges column.
temp2.show(5)

In [31]:
# Inspect the stage list after the imputer was appended.
stages

In [32]:
# 3rd pipeline stage: index the 'Churn' string column into a numeric 'label' column.
label_Idx = StringIndexer(inputCol='Churn', outputCol='label')
stages.append(label_Idx)

In [33]:
# Preview only: fit label_Idx on the training split and transform that same split.
temp = label_Idx.fit(train_data).transform(train_data)

In [34]:
temp.show(5)
# The new 'label' column holds the churn status as a numeric value.

In [35]:
# Correlation coefficient between TotalCharges and MonthlyCharges.
df.stat.corr('TotalCharges','MonthlyCharges')
# There is some positive correlation between TotalCharges and MonthlyCharges.

In [36]:
%sql
select cast(tenure as int),churn,count(*) as churned from SQLtable where churn = 'Yes' group by tenure,churn order by cast(tenure as int)
-- The number of churned customers drops as tenure increases; the drop is steepest
-- right after tenure 1, so the relationship is monotone rather than strictly linear.


tenure,churn,churned
1,Yes,380
2,Yes,123
3,Yes,94
4,Yes,83
5,Yes,64
6,Yes,40
7,Yes,51
8,Yes,42
9,Yes,46
10,Yes,45


In [37]:
# 4th pipeline stage: discretize tenure into 3 quantile buckets (output column 'tenure_bin').
from pyspark.ml.feature import QuantileDiscretizer
tenure_bin = QuantileDiscretizer(inputCol='tenure', outputCol='tenure_bin', numBuckets=3)
stages.append(tenure_bin)

In [38]:
# Inspect the stage list after the tenure discretizer was appended.
stages

In [39]:
# 5th pipeline stage: combine the one-hot vectors and the numeric columns
# into a single 'features' vector.
numericCols = ['tenure_bin','Out_TotalCharges','MonthlyCharges']

assembleInputs = [c + 'catVec' for c in categColumn]
assembleInputs.extend(numericCols)
assemblerInputs = assembleInputs  # second name bound to the same list object

assembler = VectorAssembler(inputCols=assembleInputs, outputCol='features')
stages.append(assembler)

In [40]:
# Inspect the complete stage list, now ending with the vector assembler.
stages

In [41]:
# Fit all preprocessing stages on the training split only.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(train_data)

In [42]:
# Apply the pipeline fitted on the training split to both the training and the test split.
trainprepDF = pipelineModel.transform(train_data)
testprepDF = pipelineModel.transform(test_data)

In [43]:
# Inspect the first preprocessed training row.
trainprepDF.head(1)

In [44]:
# Show the tenure_bin values produced by the discretizer stage.
trainprepDF.select('tenure_bin').show()

## Fit model

In [46]:
from pyspark.ml.classification import LogisticRegression

# Baseline logistic regression on the assembled 'features' vector, limited to 10 iterations.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    maxIter=10,
)

# Fit it on the preprocessed training data.
lrModel = lr.fit(trainprepDF)

In [47]:
# Report the fitted coefficient vector and the intercept.
print('Coefficients: {}'.format(lrModel.coefficients))
print('Intercept: {}'.format(lrModel.intercept))

In [48]:
# Summary object of the fitted logistic regression model; the next cell reads its metrics.
summary = lrModel.summary

In [49]:
# Pull the headline metrics off the model summary. These come from lrModel.summary,
# i.e. they describe the fit itself rather than the held-out test split.
accuracy = summary.accuracy
precision = summary.weightedPrecision
recall = summary.weightedRecall
fMeasure = summary.weightedFMeasure()
turePositiveRate = summary.weightedTruePositiveRate
falsePositiveRate = summary.weightedFalsePositiveRate
areaunderROC = summary.areaUnderROC

report_values = (
    accuracy,
    falsePositiveRate,
    turePositiveRate,
    fMeasure,
    precision,
    recall,
    areaunderROC,
)
print('Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s\nAreaUnderROC: %s' % report_values)

In [50]:
# ROC plot for lrModel on the preprocessed training data.
# NOTE(review): presumably the Databricks display(model, data, plotType) form — confirm.
display(lrModel,trainprepDF,'ROC')

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.8347161554445083
0.0,0.04,0.8347161554445083
0.0,0.08,0.767917251623131
0.0,0.12,0.7669676999142464
0.0,0.16,0.7352410974514162
0.0153846153846153,0.16,0.7337593855752097
0.0307692307692307,0.16,0.7094905721735388
0.0307692307692307,0.2,0.7089974406143809
0.0461538461538461,0.2,0.7077603306087087
0.0615384615384615,0.2,0.7016775170813181


In [51]:
# Fitted-values-versus-residuals plot for lrModel on the preprocessed training data.
# NOTE(review): presumably the Databricks display(model, data, plotType) form — confirm.
display(lrModel,trainprepDF,'fittedVsResiduals')

fitted values,residuals
-0.8012651431623505,-0.3097549575466016
-2.869715527341812,0.9463289013152796
-3.01138869714186,-0.0469140136559805
0.6391154686856171,0.3454465162665997
-3.3577735349944917,-0.0336415300552263
-2.6338847483020738,-0.0669892384805876
1.191267617308192,0.2330323000857536
-0.7107344691086965,0.6705634303170174
-4.884193924045399,-0.0075084166269284
-0.327567547489444,0.5811674062390477


## Evaluation

In [53]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the held-out test set with the baseline logistic regression model.
predictions = lrModel.transform(testprepDF)

# Area under ROC has to be computed from a continuous score. Feeding the hard
# 0/1 'prediction' column collapses the curve to a single operating point and
# understates the AUC, so the evaluator reads the 'rawPrediction' column that
# the model's transform() adds alongside 'prediction'.
evaluatorLR = BinaryClassificationEvaluator(rawPredictionCol = 'rawPrediction',labelCol = 'label')
area_under_curve = evaluatorLR.evaluate(predictions)

#default evaluation is areaunderROC
print('aresunderROC = %g' % area_under_curve)

# Confirm which metric the evaluator reports.
evaluatorLR.getMetricName()

In [54]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Hard predictions and labels; later cells use this frame for the confusion matrix.
results = predictions.select(['prediction','label'])

# Prepare the (score, label) pairs. The score must be continuous for the PR and
# ROC curves to be meaningful, so use the predicted probability of class 1
# rather than the thresholded 0/1 prediction. The pairs are built directly on
# the cluster instead of collecting to the driver and re-parallelizing.
predictionAndLabels = (
    predictions.select('probability', 'label')
    .rdd
    .map(lambda row: (float(row['probability'][1]), float(row['label'])))
)

metrics = BinaryClassificationMetrics(predictionAndLabels)

#Area under precision-recall curve
print('Area under PR = %s' % metrics.areaUnderPR)

#Area under ROC curve
print('Area under ROC = %s' % metrics.areaUnderROC)

# Show one scored row to see the columns added by the model.
predictions.show(1)

In [55]:
# Actual churn distribution of the test split, for comparison with the predictions.
test_data.groupBy('Churn').count().show()

In [56]:
#confusion matrix
# Derive the prediction/label frame from the current `predictions` so the cell
# always reports on the model that was scored last.
results = predictions.select(['prediction','label'])

# The row total is bound to `total_rows`, not `count`, so the `count` function
# imported from pyspark.sql.functions is not overwritten in the notebook namespace.
total_rows = results.count()
correct = results.filter(results.prediction == results.label).count()
wrong = results.filter(results.prediction != results.label).count()

tp = results.filter(results.prediction == 1.0).filter(results.prediction == results.label).count()
fp = results.filter(results.prediction == 1.0).filter(results.prediction != results.label).count()
fn = results.filter(results.prediction == 0.0).filter(results.prediction != results.label).count()
tn = results.filter(results.prediction == 0.0).filter(results.prediction == results.label).count()

# float() keeps true division on any Python version; the guards avoid a
# ZeroDivisionError when a model never predicts (or never sees) the positive class.
accuracy = float(tp + tn) / total_rows if total_rows else 0.0

precision = float(tp) / (tp + fp) if (tp + fp) else 0.0

recall = float(tp) / (tp + fn) if (tp + fn) else 0.0

print('Correct: %s\nWrong: %s\ntp: %s\nfp: %s\nfn: %s\ntn: %s\naccuracy: %s\nprecision: %s\nrecall: %s'% (correct,wrong,tp,fp,fn,tn,accuracy,precision,recall))


In [57]:
# Cross validation
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator

# Hyper-parameter grid for the logistic regression: 3 regularisation strengths
# x 3 elastic-net mixes x 3 iteration limits.
gridBuilder = ParamGridBuilder()
gridBuilder = gridBuilder.addGrid(lr.regParam, [0.01, 0.5, 2.0])
gridBuilder = gridBuilder.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
gridBuilder = gridBuilder.addGrid(lr.maxIter, [5, 10, 20])
paraGrid = gridBuilder.build()

In [58]:
# Model selection must rank candidates on a continuous score. An evaluator fed
# the hard 0/1 'prediction' column gives a degenerate area-under-ROC, so this
# cell builds its own evaluator on the 'rawPrediction' column.
cvEvaluator = BinaryClassificationEvaluator(rawPredictionCol = 'rawPrediction',labelCol = 'label')

# 5-fold cross validation over paraGrid; the seed makes the fold assignment reproducible.
cv = CrossValidator(estimator = lr,estimatorParamMaps = paraGrid,evaluator = cvEvaluator,numFolds = 5,seed = 999)

#Run cross validation
cvModel = cv.fit(trainprepDF)

In [59]:
# Re-score the test set with the best model found by cross validation (rebinds `predictions`).
predictions = cvModel.bestModel.transform(testprepDF)

In [60]:
# Evaluate the cross-validated model's test predictions with evaluatorLR.
evaluatorLR.evaluate(predictions)

In [61]:
#confusion matrix
# Rebuild the prediction/label frame from the cross-validated model's
# `predictions`. Reusing the `results` frame created for the baseline model
# would silently report the baseline's confusion matrix again.
results = predictions.select(['prediction','label'])

# The row total is bound to `total_rows`, not `count`, so the `count` function
# imported from pyspark.sql.functions is not overwritten in the notebook namespace.
total_rows = results.count()
correct = results.filter(results.prediction == results.label).count()
wrong = results.filter(results.prediction != results.label).count()

tp = results.filter(results.prediction == 1.0).filter(results.prediction == results.label).count()
fp = results.filter(results.prediction == 1.0).filter(results.prediction != results.label).count()
fn = results.filter(results.prediction == 0.0).filter(results.prediction != results.label).count()
tn = results.filter(results.prediction == 0.0).filter(results.prediction == results.label).count()

# float() keeps true division on any Python version; the guards avoid a
# ZeroDivisionError when a model never predicts (or never sees) the positive class.
accuracy = float(tp + tn) / total_rows if total_rows else 0.0

precision = float(tp) / (tp + fp) if (tp + fp) else 0.0

recall = float(tp) / (tp + fn) if (tp + fn) else 0.0

print('Correct: %s\nWrong: %s\ntp: %s\nfp: %s\nfn: %s\ntn: %s\naccuracy: %s\nprecision: %s\nrecall: %s'% (correct,wrong,tp,fp,fn,tn,accuracy,precision,recall))
# Compare these numbers with the baseline logistic regression results to judge
# whether cross validation improved the model.


In [62]:
# List the parameters of the fitted cross-validation model with their documentation and values.
cvModel.explainParams()

In [63]:
# Train a random forest model on the same preprocessed training data.
from pyspark.ml.classification import RandomForestClassifier

# 50 gini trees of depth at most 6, default feature-subset strategy, fixed seed.
rf = RandomForestClassifier(
    labelCol='label',
    featuresCol='features',
    impurity='gini',
    maxDepth=6,
    numTrees=50,
    featureSubsetStrategy='auto',
    seed=999,
)

rfModel = rf.fit(trainprepDF)

In [64]:
# Score the test set with the random forest model (rebinds `predictions`).
predictions = rfModel.transform(testprepDF)

In [65]:
# Confusion matrix for the random forest predictions.
results = predictions.select(['prediction','label'])

# The row total is bound to `total_rows`, not `count`, so the `count` function
# imported from pyspark.sql.functions is not overwritten in the notebook namespace.
total_rows = results.count()
correct = results.filter(results.prediction == results.label).count()
wrong = results.filter(results.prediction != results.label).count()

tp = results.filter(results.prediction == 1.0).filter(results.prediction == results.label).count()
fp = results.filter(results.prediction == 1.0).filter(results.prediction != results.label).count()
fn = results.filter(results.prediction == 0.0).filter(results.prediction != results.label).count()
tn = results.filter(results.prediction == 0.0).filter(results.prediction == results.label).count()

# float() keeps true division on any Python version; the guards avoid a
# ZeroDivisionError when a model never predicts (or never sees) the positive class.
accuracy = float(tp + tn) / total_rows if total_rows else 0.0

precision = float(tp) / (tp + fp) if (tp + fp) else 0.0

recall = float(tp) / (tp + fn) if (tp + fn) else 0.0

print('Correct: %s\nWrong: %s\ntp: %s\nfp: %s\nfn: %s\ntn: %s\naccuracy: %s\nprecision: %s\nrecall: %s'% (correct,wrong,tp,fp,fn,tn,accuracy,precision,recall))