In [1]:
sc

In [2]:
spark

In [3]:
tele_data = spark.read.csv("file:///home/hadoop/Downloads/Telco_Customer_Churn.csv", inferSchema=True, header=True)
tele_data.cache()

DataFrame[customerID: string, gender: string, SeniorCitizen: int, Partner: string, Dependents: string, tenure: int, PhoneService: string, MultipleLines: string, InternetService: string, OnlineSecurity: string, OnlineBackup: string, DeviceProtection: string, TechSupport: string, StreamingTV: string, StreamingMovies: string, Contract: string, PaperlessBilling: string, PaymentMethod: string, MonthlyCharges: double, TotalCharges: string, Churn: string]

In [4]:
tele_data.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



In [5]:
tele_data.toPandas().head()

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,...,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
1,5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,...,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
2,3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,...,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
3,7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,...,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
4,9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes


### Data Exploration

a) How many records are there in the database

In [6]:
tele_data.count()

7043

b) What is the distribution of gender among customers?

In [7]:
tele_data.groupBy('gender').count().show()

+------+-----+
|gender|count|
+------+-----+
|Female| 3488|
|  Male| 3555|
+------+-----+



c) What is the distribution of contact types?

In [8]:
tele_data.groupBy('contract').count().show()

+--------------+-----+
|      contract|count|
+--------------+-----+
|Month-to-month| 3875|
|      One year| 1473|
|      Two year| 1695|
+--------------+-----+



d) What is the percentage of customers who chruned?

In [9]:
total_customer = tele_data.count()
churned_cust = tele_data.filter(tele_data.Churn == 'Yes').count()
churned_percentage = (churned_cust/total_customer)*100 
churned_percentage

26.536987079369588

### Data Processing

In [10]:
from pyspark.sql.functions import *

In [11]:
tele_data.select([count(when(isnull(col), col)).alias(col) for col in tele_data.columns]).show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|         0|     0|            0|      0|         0|     0|           0|            0|              0|             0|           0|               0|          0|          0|              0|       0|               0| 

In [12]:
tele_data = tele_data.withColumn('TotalCharges', when(col('TotalCharges') == " ", None).otherwise(col('TotalCharges')))

In [13]:
tele_data.select([count(when(isnull(col), col)).alias(col) for col in tele_data.columns]).show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|         0|     0|            0|      0|         0|     0|           0|            0|              0|             0|           0|               0|          0|          0|              0|       0|               0| 

In [14]:
tele_data1 = tele_data.na.drop()

In [25]:
tele_data1.count()

7032

In [18]:
from pyspark.sql.types import FloatType
tele_data1 = tele_data1.withColumn('TotalCharges', col('TotalCharges').cast(FloatType()))

In [19]:
tele_data1.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: float (nullable = true)
 |-- Churn: string (nullable = true)



f.Convert categorical variables into numberical format using one-hot encoder or label encoding

In [64]:
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml import Pipeline

In [65]:
print(tele_data1.columns)

['customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents', 'tenure', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn']


In [66]:
categorical_cols = ['gender', 'SeniorCitizen', 'Partner', 'Dependents', 'PhoneService', 'MultipleLines',\
                    'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', \
                    'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod']

#use StringIndexer to convert catergorical column to numerical indexes
stages = []
for catcols in categorical_cols:
    stringindexer = StringIndexer(inputCol=catcols, outputCol=catcols+"Index")
    onehotencoder = OneHotEncoderEstimator(inputCols=[stringindexer.getOutputCol()],
                                          outputCols=[catcols+"classVec"])
    stages += [stringindexer, onehotencoder]

In [67]:
numericalCols = ['tenure','MonthlyCharges', 'TotalCharges']
assemblerInput = [c + "classVec" for c in categorical_cols] + numericalCols
assembler = VectorAssembler(inputCols=assemblerInput, outputCol= 'features')
stages += [assembler]

In [69]:
label_String = StringIndexer(inputCol = 'Churn', outputCol='label')
stages += [label_String]

In [70]:
pipeline = Pipeline(stages=stages)
churn_df = pipeline.fit(tele_data1).transform(tele_data1)
selectedCols = ['Churn','features']
churn_df = churn_df.select(selectedCols)
churn_df.show()

+-----+--------------------+
|Churn|            features|
+-----+--------------------+
|   No|(30,[1,3,8,9,12,1...|
|   No|(30,[0,1,2,3,4,5,...|
|  Yes|[1.0,1.0,1.0,1.0,...|
|   No|(30,[0,1,2,3,8,10...|
|  Yes|(30,[1,2,3,4,5,7,...|
|  Yes|(30,[1,2,3,4,6,7,...|
|   No|(30,[0,1,2,4,6,7,...|
|   No|(30,[1,2,3,8,10,1...|
|  Yes|(30,[1,3,4,6,7,9,...|
|   No|(30,[0,1,2,4,5,8,...|
|   No|(30,[0,1,4,5,8,10...|
|   No|(30,[0,1,2,3,4,5,...|
|   No|(30,[0,1,3,4,6,7,...|
|  Yes|[1.0,1.0,1.0,1.0,...|
|   No|[1.0,1.0,1.0,1.0,...|
|   No|(30,[1,4,6,7,10,1...|
|   No|(30,[1,2,3,4,5,25...|
|   No|(30,[0,1,2,4,6,7,...|
|  Yes|(30,[1,4,5,8,9,11...|
|   No|(30,[1,2,3,4,5,7,...|
+-----+--------------------+
only showing top 20 rows



In [76]:
churn_df = churn_df.withColumn('Churn',regexp_replace('Churn','No',"0"))
churn_df = churn_df.withColumn('Churn',regexp_replace('Churn','Yes',"1"))

In [80]:
from pyspark.sql.types import IntegerType
churn_df = churn_df.withColumn('Churn',col('Churn').cast("integer"))

In [86]:
train, test = churn_df.randomSplit([0.8,0.2],seed=123)

In [85]:
train.show()

+-----+--------------------+-------+
|Churn|            features|Revenue|
+-----+--------------------+-------+
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
|    0|(30,[0,1,2,3,4,5,...|      0|
+-----+--------------------+-------+
only showing top 20 rows



In [84]:
train.select(['features','Churn'])

DataFrame[features: vector, Churn: int]

In [94]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, LogisticRegression
tree = DecisionTreeClassifier(featuresCol = 'features', labelCol= 'Churn')
decision_model= tree.fit(train)

In [88]:
predictions = decision_model.transform(test)

In [90]:
predictions.select(['features','Churn','prediction']).show()

+--------------------+-----+----------+
|            features|Churn|prediction|
+--------------------+-----+----------+
|(30,[0,1,2,3,4,5,...|    0|       0.0|
|(30,[0,1,2,3,4,5,...|    0|       0.0|
|(30,[0,1,2,3,4,5,...|    0|       0.0|
|(30,[0,1,2,3,4,5,...|    0|       2.0|
|(30,[0,1,2,3,4,5,...|    0|       0.0|
|(30,[0,1,2,3,4,5,...|    0|       0.0|
|(30,[0,1,2,3,4,5,...|    0|       2.0|
|(30,[0,1,2,3,4,5,...|    0|       2.0|
|(30,[0,1,2,3,4,5,...|    0|       0.0|
|(30,[0,1,2,3,4,5,...|    0|       2.0|
|(30,[0,1,2,3,4,5,...|    0|       2.0|
|(30,[0,1,2,3,4,5,...|    0|       2.0|
|(30,[0,1,2,3,4,5,...|    0|       2.0|
|(30,[0,1,2,3,4,5,...|    0|       2.0|
|(30,[0,1,2,3,4,5,...|    0|       2.0|
|(30,[0,1,2,3,4,5,...|    0|       2.0|
|(30,[0,1,2,3,4,5,...|    0|       2.0|
|(30,[0,1,2,3,4,5,...|    0|       2.0|
|(30,[0,1,2,3,4,5,...|    0|       2.0|
|(30,[0,1,2,3,4,5,...|    0|       2.0|
+--------------------+-----+----------+
only showing top 20 rows



In [91]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol='Churn', predictionCol='prediction',metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

In [92]:
accuracy

0.7924528301886793

### Logistic Regression

In [97]:
logit = LogisticRegression(featuresCol='features', labelCol='Churn')
logit_model = logit.fit(train)

In [98]:
predictions = logit_model.transform(test)

In [99]:
evaluator = MulticlassClassificationEvaluator(labelCol='Churn',predictionCol='prediction',metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
accuracy

0.8085255066387141

### RandomForest

In [100]:
rf_cls = RandomForestClassifier(featuresCol='features', labelCol='Churn')
rf_model = rf_cls.fit(train)

In [101]:
predictions = rf_model.transform(test)