In [1]:
sc

In [2]:
spark

In [3]:
teleCus_df = spark.read.csv("file:///home/hadoop/Downloads/Telco_Customer_Churn.csv", inferSchema=True, header=True)

## 1. Data Exploration

In [4]:
teleCus_df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



In [5]:
# How many records are there in the dataset?
teleCus_df.select("customerID").count()

7043

In [6]:
# What is the distribution of gender among customers?
teleCus_df.select("gender").distinct().show()
teleCus_df.createOrReplaceTempView('telecus')

teleCus_df.select("gender").distinct().show()

+------+
|gender|
+------+
|Female|
|  Male|
+------+

+------+
|gender|
+------+
|Female|
|  Male|
+------+



In [7]:
teleCus_df.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|  

### 

In [8]:
teleCus_df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



In [9]:
from pyspark.sql.functions import *

In [10]:
teleCus_df.withColumn('TotalCharges',when(col('TotalCharges')==" ",None).otherwise(col('TotalCharges')))

DataFrame[customerID: string, gender: string, SeniorCitizen: int, Partner: string, Dependents: string, tenure: int, PhoneService: string, MultipleLines: string, InternetService: string, OnlineSecurity: string, OnlineBackup: string, DeviceProtection: string, TechSupport: string, StreamingTV: string, StreamingMovies: string, Contract: string, PaperlessBilling: string, PaymentMethod: string, MonthlyCharges: double, TotalCharges: string, Churn: string]

In [11]:
teleCus_df1 = teleCus_df.na.drop()

In [12]:
from pyspark.sql.types import FloatType
teleCus_df2 = teleCus_df1.withColumn('TotalCharges',col('TotalCharges').cast(FloatType()))

In [13]:
teleCus_df2.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: float (nullable = true)
 |-- Churn: string (nullable = true)



In [14]:
from pyspark.ml.feature import StringIndexer,OneHotEncoderEstimator, VectorAssembler
from pyspark.ml import Pipeline

In [15]:
print(teleCus_df.columns)

['customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents', 'tenure', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn']


In [16]:
categorical_cols = ['gender', 'SeniorCitizen', 'Partner', 'Dependents', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod']

stages1 = []
for catcols in categorical_cols:
    stringIndexer = StringIndexer(inputCol=catcols,outputCol=catcols+"Index")
    oneHotEncoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                          outputCols=[catcols+"classVec"])
    
    stages1+= [stringIndexer,oneHotEncoder]

In [18]:
teleCus_df2.select([count(when(isnull(col),col)).alias(col) for col in teleCus_df2.columns]).show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|         0|     0|            0|      0|         0|     0|           0|            0|              0|             0|           0|               0|          0|          0|              0|       0|               0| 

In [19]:
teleCus_df2 = teleCus_df2.na.drop()

In [20]:
teleCus_df2.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: float (nullable = true)
 |-- Churn: string (nullable = true)



In [21]:
numericCols = ['tenure','MonthlyCharges','TotalCharges']

assemblerInputs = [c+"classVec" for c in categorical_cols]+numericCols

assembler = VectorAssembler(inputCols=assemblerInputs,outputCol='features')

stages1 += [assembler]

In [22]:
pipeline = Pipeline(stages=stages1)
pipelinemodel = pipeline.fit(teleCus_df2)
teleCus_df2 = pipelinemodel.transform(teleCus_df2)
selectedcols = ["churn","features"]
teleCus_df2 = teleCus_df2.select(selectedcols)


+-----+--------------------+
|churn|            features|
+-----+--------------------+
|   No|(30,[1,3,8,9,12,1...|
|   No|(30,[0,1,2,3,4,5,...|
|  Yes|[1.0,1.0,1.0,1.0,...|
+-----+--------------------+
only showing top 3 rows



In [56]:
teleCus_df2 = teleCus_df2.withColumn("Churn",regexp_replace('Churn','Yes',"1"))
teleCus_df2 = teleCus_df2.withColumn("Churn",regexp_replace('Churn','No',"0"))

In [62]:
teleCus_df2.show()

+-----+--------------------+
|Churn|            features|
+-----+--------------------+
|    0|(30,[1,3,8,9,12,1...|
|    0|(30,[0,1,2,3,4,5,...|
|    1|[1.0,1.0,1.0,1.0,...|
|    0|(30,[0,1,2,3,8,10...|
|    1|(30,[1,2,3,4,5,7,...|
|    1|(30,[1,2,3,4,6,7,...|
|    0|(30,[0,1,2,4,6,7,...|
|    0|(30,[1,2,3,8,10,1...|
|    1|(30,[1,3,4,6,7,9,...|
|    0|(30,[0,1,2,4,5,8,...|
|    0|(30,[0,1,4,5,8,10...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,3,4,6,7,...|
|    1|[1.0,1.0,1.0,1.0,...|
|    0|[1.0,1.0,1.0,1.0,...|
|    0|(30,[1,4,6,7,10,1...|
|    0|(30,[1,2,3,4,5,25...|
|    0|(30,[0,1,2,4,6,7,...|
|    1|(30,[1,4,5,8,9,11...|
|    0|(30,[1,2,3,4,5,7,...|
+-----+--------------------+
only showing top 20 rows



### split and train

In [60]:
train.select(['churn','features'])

DataFrame[churn: string, features: vector]

In [63]:
from pyspark.sql.types import IntegerType
tele_cus3 = teleCus_df2.withColumn('churn',col('churn').cast(IntegerType()))

In [101]:
train,test = tele_cus3.randomSplit([0.8,0.2] ,seed=432)

In [102]:
train.select(['churn','features'])

DataFrame[churn: int, features: vector]

In [103]:
train.show()

+-----+--------------------+
|churn|            features|
+-----+--------------------+
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
|    0|(30,[0,1,2,3,4,5,...|
+-----+--------------------+
only showing top 20 rows



In [104]:
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression
tree = DecisionTreeClassifier(featuresCol = 'features',labelCol='churn')
model = tree.fit(train)

In [105]:
#predictions on test data
predict = model.transform(test)

In [83]:
predict.select(['probability','prediction']).show(truncate=False)

+----------------------------------------+----------+
|probability                             |prediction|
+----------------------------------------+----------+
|[0.365814696485623,0.634185303514377]   |1.0       |
|[0.365814696485623,0.634185303514377]   |1.0       |
|[0.6186770428015564,0.38132295719844356]|0.0       |
|[0.723404255319149,0.2765957446808511]  |0.0       |
|[0.365814696485623,0.634185303514377]   |1.0       |
|[0.365814696485623,0.634185303514377]   |1.0       |
|[0.365814696485623,0.634185303514377]   |1.0       |
|[0.7641509433962265,0.2358490566037736] |0.0       |
|[0.4764705882352941,0.5235294117647059] |1.0       |
|[0.6186770428015564,0.38132295719844356]|0.0       |
|[0.6186770428015564,0.38132295719844356]|0.0       |
|[0.7641509433962265,0.2358490566037736] |0.0       |
|[0.6186770428015564,0.38132295719844356]|0.0       |
|[0.9024390243902439,0.0975609756097561] |0.0       |
|[0.8157560355781448,0.18424396442185514]|0.0       |
|[0.5056603773584906,0.49433

In [106]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol='churn', predictionCol='prediction',
                                 metricName='accuracy')
accuracy1 = evaluator.evaluate(predict)

In [107]:
accuracy1

0.7718168812589413

Logistic regression

In [108]:
logit = LogisticRegression(featuresCol='features',labelCol='churn')
logit_model = logit.fit(train)

In [109]:
log_predictions = logit_model.transform(test)

In [110]:
evaluator = MulticlassClassificationEvaluator(labelCol='churn', predictionCol='prediction',
                                 metricName='accuracy')
accuracy2 = evaluator.evaluate(log_predictions)

In [111]:
accuracy2

0.7982832618025751

#### Random Forest

In [112]:
from pyspark.ml.classification import RandomForestClassifier

In [113]:
rf = RandomForestClassifier(featuresCol='features',labelCol='churn')
rf_model = logit.fit(train)

In [114]:
rf_predictions = rf_model.transform(test)

In [115]:
evaluator = MulticlassClassificationEvaluator(labelCol='churn', predictionCol='prediction',
                                 metricName='accuracy')
accuracy3 = evaluator.evaluate(rf_predictions)

In [116]:
accuracy3

0.7982832618025751