This project is targeted on the telecommunication industry aiming at attempting to predict customer churn using big data tool. What are the chances that a customer will leave and stop using our services or stay and continue consuming our services. What is the significance of predicting customer churn? Like in any other business predicting potential bad events happening is paramount in strategizing a solution before the problem occurring. This adds value to the business in that it prepares the business to fair its storms and continue providing customers with excellent service. It also creates opportunities to learn and understand consumer behavior. 
In predicting customer churn will utilize Spark Python DataFrame API and incorporate jupyter notebook for data exploration. We will also utilize a machine learning library such as random forest, Gradient Boosted Tree Classifier and potentially use other algorithms within MLLib library to achieve our objective of prediction.
We will use the famous data set used for telecommunication demonstration and the dataset was downloaded from https://raw.githubusercontent.com/navdeep-G/customer-churn/master/data/TelcoChurn.csv.



**Set up the enviroment**

In [3]:
# Installing Spark
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com (91.189.88.24)] [Connecting to security.ub0% [Connecting to archive.ubuntu.com (91.189.88.24)] [Connecting to security.ub0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.24)]                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.24)]                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Connecting to security.ubu                                                                               Hit:4 https://developer.download.nvidia.com/comp

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

**Uploading the dataset**

In [5]:
!wget https://raw.githubusercontent.com/navdeep-G/customer-churn/master/data/TelcoChurn.csv

--2020-03-09 10:15:32--  https://raw.githubusercontent.com/navdeep-G/customer-churn/master/data/TelcoChurn.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 970457 (948K) [text/plain]
Saving to: ‘TelcoChurn.csv’


2020-03-09 10:15:33 (12.8 MB/s) - ‘TelcoChurn.csv’ saved [970457/970457]



In [51]:
!ls


churn.csv    spark-2.3.1-bin-hadoop2.7	    spark-warehouse
sample_data  spark-2.3.1-bin-hadoop2.7.tgz


In [0]:
# Changing the dataset name
!mv TelcoChurn.csv churn.csv

In [52]:
#Making sure the file was downloaded correctly
churn_df = spark.read.csv('churn.csv',header=True,inferSchema=True)
churn_df.show(5)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|         Yes|              No|         No|    

In [53]:
churn_df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



In [0]:
# Converting a string to double
churn_df = churn_df.withColumn("TotalCharges",churn_df["TotalCharges"].cast("double"))
churn_df = churn_df.withColumn("SeniorCitizen",churn_df["SeniorCitizen"].cast("double"))
churn_df = churn_df.withColumn("tenure",churn_df["tenure"].cast("double"))

In [55]:
churn_df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: double (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: double (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- Churn: string (nullable = true)



In [56]:
# How large is the dataset
churn_df.count()

7043

We are working with a large dataset with 7043 rows. We will use a sample for analysis purpose but when we start training the model we will use the entire dataset.

In [0]:
# Sampling without no replacement(False)
sample_df = churn_df.sample(False,0.1)

In [58]:
sample_df.count()

711

**Data Exploration**

In [59]:
#Predictive variable
sample_df.select("Churn").show()

+-----+
|Churn|
+-----+
|   No|
|  Yes|
|   No|
|   No|
|   No|
|   No|
|   No|
|   No|
|  Yes|
|  Yes|
|  Yes|
|   No|
|   No|
|   No|
|   No|
|   No|
|   No|
|   No|
|   No|
|   No|
+-----+
only showing top 20 rows



In [60]:
sample_df.describe(['Churn']).show()

+-------+-----+
|summary|Churn|
+-------+-----+
|  count|  711|
|   mean| null|
| stddev| null|
|    min|   No|
|    max|  Yes|
+-------+-----+



In [61]:
sample_df.describe(['tenure','MonthlyCharges','TotalCharges']).show()

+-------+------------------+------------------+------------------+
|summary|            tenure|    MonthlyCharges|      TotalCharges|
+-------+------------------+------------------+------------------+
|  count|               711|               711|               711|
|   mean|31.860759493670887| 65.32925457102674|2260.6234177215188|
| stddev|24.939962139807548|29.729356229792774|2271.9783255009074|
|    min|               1.0|              18.7|              19.1|
|    max|              72.0|            117.15|            8529.5|
+-------+------------------+------------------+------------------+



In [0]:
import pyspark.sql.functions as F
def count_missings(churn_df,sort=True):
    churn_df = churn_df.select([F.count(F.when(F.isnan(c) | F.isnull(c), c)).alias(c) for (c,c_type) in churn_df.dtypes if c_type not in ('timestamp', 'string', 'date')]).toPandas()

    if len(churn_df) == 0:
        print("There are no any missing values!")
        return None

    if sort:
        return churn_df.rename(index={0: 'count'}).T.sort_values("count",ascending=False)

    return churn_df

In [63]:
count_missings(churn_df)

Unnamed: 0,count
TotalCharges,11
SeniorCitizen,0
tenure,0
MonthlyCharges,0


In [0]:
# MonthlyCharges will be probably highly correlated with TotalCharges and it has missing values. We will drop the variable TotalCharges.
churn = churn_df.drop(*['TotalCharges'])

In [65]:
churn.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
customerID,7043,,,0002-ORFBO,9995-HOTOH
gender,7043,,,Female,Male
SeniorCitizen,7043,0.1621468124378816,0.3686116056100135,0.0,1.0
Partner,7043,,,No,Yes
Dependents,7043,,,No,Yes
tenure,7043,32.37114865824223,24.559481023094442,0.0,72.0
PhoneService,7043,,,No,Yes
MultipleLines,7043,,,No,Yes
InternetService,7043,,,DSL,No


**Preparing Data for Machine Learning** 

In [0]:
# importing Libraries
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [0]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
catCols  = ["customerID","gender","Partner","Dependents","PhoneService","MultipleLines","InternetService","OnlineSecurity","OnlineBackup","DeviceProtection","TechSupport","StreamingTV","StreamingMovies","Contract","PaperlessBilling","PaymentMethod","Churn"]
stages = []
for categoricalCol in catCols:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
label_stringIdx = StringIndexer(inputCol = 'Churn', outputCol = 'label')
stages += [label_stringIdx]
numericCols = ['tenure', 'SeniorCitizen', 'MonthlyCharges']
assemblerInputs = [c + "classVec" for c in catCols] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

The code above is not my own work and full credit go to databricks'official site found here.[databricks'official](https://docs.databricks.com/applications/machine-learning/mllib/binary-classification-mllib-pipelines.html) The code indexes categorical features by utilizing StringIndexer and changes indexed categories into a one hot encoded features.



In [0]:
# Specifying machine learning pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(churn)
churn_df = pipelineModel.transform(churn)

In [71]:
pipelineModel

PipelineModel_4bf4a293bb49f6cc8873

In [72]:
churn

DataFrame[customerID: string, gender: string, SeniorCitizen: double, Partner: string, Dependents: string, tenure: double, PhoneService: string, MultipleLines: string, InternetService: string, OnlineSecurity: string, OnlineBackup: string, DeviceProtection: string, TechSupport: string, StreamingTV: string, StreamingMovies: string, Contract: string, PaperlessBilling: string, PaymentMethod: string, MonthlyCharges: double, Churn: string]

In [0]:
# Creating labels and features
selectedcols = ["label", "features"] 
churn = churn_df.select(selectedcols)

In [0]:
#Spit data into training and validation set
(trainingData, testData) = churn_df.randomSplit([0.7, 0.3],seed = 2020)

In [78]:
trainingData.count()

4951

In [79]:
testData.count()

2092

**Random Forest**

In [0]:
# Building a Random Forest Model
rf = RandomForestClassifier(labelCol="label",featuresCol="features", maxDepth=10, maxBins=32, numTrees=20)


In [0]:
#Training the model on training data
 rfModel = rf.fit(trainingData)

In [100]:
 rfModel

RandomForestClassificationModel (uid=RandomForestClassifier_4a03904b19cae2c84f69) with 20 trees

In [0]:
#Predicting on the test 
predictions = rfModel.transform(testData)


In [0]:
# Evaluating the model using Area under the curve
 evaluator = BinaryClassificationEvaluator()
 auc = evaluator.evaluate(predictions)



In [103]:
print("Area under the curve is : "+str(auc)+"s")

Area under the curve is : 0.8703076128802409s


In [106]:
# Model Evaluation using Area Under ROC
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))


Test Area Under ROC: 0.8149677416249168


**Gradient Boosted Tree Classifier**

In [112]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(trainingData)
predictions = gbtModel.transform(testData)
predictions.select('gender', 'tenure', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

+------+------+-----+--------------------+----------+--------------------+
|gender|tenure|label|       rawPrediction|prediction|         probability|
+------+------+-----+--------------------+----------+--------------------+
|  Male|   9.0|  0.0|[1.32590267922033...|       0.0|[0.93412217565278...|
|  Male|  13.0|  1.0|[-1.3259026792203...|       1.0|[0.06587782434721...|
|  Male|  63.0|  0.0|[1.32590267922033...|       0.0|[0.93412217565278...|
|Female|   7.0|  0.0|[1.32590267922033...|       0.0|[0.93412217565278...|
|Female|  65.0|  0.0|[1.32590267922033...|       0.0|[0.93412217565278...|
|  Male|  54.0|  0.0|[1.32590267922033...|       0.0|[0.93412217565278...|
|Female|  34.0|  0.0|[1.32590267922033...|       0.0|[0.93412217565278...|
|Female|  50.0|  0.0|[1.32590267922033...|       0.0|[0.93412217565278...|
|Female|   3.0|  0.0|[1.32590267922033...|       0.0|[0.93412217565278...|
|Female|   4.0|  1.0|[-1.3259026792203...|       1.0|[0.06587782434721...|
+------+------+-----+----

In [113]:
# Evaluating Gradient Boosted Tree Classifier
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 1.0


Gradient Boosting Produces the best results with 93 % Accuracy