#### Predicting Churn for a Telecom Company

#### Customer Churn Overview

Companies invest significantly to acquire new customers, so after acquisition they want those customers to stay and remain loyal. They also keep investing in continuous engagement to make sure customers are satisfied with their offerings. Losing a customer means losing both that investment and possible future revenue, so it is important to detect early signs that a customer is about to churn and to engage them or offer incentives to stay. Because targeting every customer is not feasible, companies can use analytics to predict whether a customer has a high probability of churning and intervene only where retention is at risk.

For this exercise, we use the data from the link below:

https://www.kaggle.com/becksddf/churn-in-telecoms-dataset/version/1

#### Import Libraries

In [1]:
import os
import findspark 
findspark.init()
import pyspark
from time import time
from subprocess import check_output
from pyspark.sql import (SparkSession, Row)
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import (StringIndexer, StandardScaler)
from pyspark.ml.tuning import (CrossValidator, ParamGridBuilder)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.linalg import Vectors

#### Data Pre-processing

In [2]:
# load spark session
sc = SparkSession\
    .builder\
    .master("local[*]")\
    .appName('customer_churn')\
    .getOrCreate()

In [3]:
df = sc.read.load('C:\\Data Science\\Certifications\\AnalytixLabs\\Big Data-Hadoop-Spark\\Class 19-20 Files\\Spark Case Studies - Assignments\\Predicting Customer Churn\\churn.csv', 
                  format='com.databricks.spark.csv', 
                  header='true', 
                  inferSchema='true').cache()
df.printSchema()

root
 |-- state: string (nullable = true)
 |-- account length: integer (nullable = true)
 |-- area code: integer (nullable = true)
 |-- phone number: string (nullable = true)
 |-- international plan: string (nullable = true)
 |-- voice mail plan: string (nullable = true)
 |-- number vmail messages: integer (nullable = true)
 |-- total day minutes: double (nullable = true)
 |-- total day calls: integer (nullable = true)
 |-- total day charge: double (nullable = true)
 |-- total eve minutes: double (nullable = true)
 |-- total eve calls: integer (nullable = true)
 |-- total eve charge: double (nullable = true)
 |-- total night minutes: double (nullable = true)
 |-- total night calls: integer (nullable = true)
 |-- total night charge: double (nullable = true)
 |-- total intl minutes: double (nullable = true)
 |-- total intl calls: integer (nullable = true)
 |-- total intl charge: double (nullable = true)
 |-- customer service calls: integer (nullable = true)
 |-- churn: boolean (nullable = true)

In [4]:
# check sample data
df.toPandas().head(10).transpose()

| | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |
|---|---|---|---|---|---|---|---|---|---|---|
| state | KS | OH | NJ | OH | OK | AL | MA | MO | LA | WV |
| account length | 128 | 107 | 137 | 84 | 75 | 118 | 121 | 147 | 117 | 141 |
| area code | 415 | 415 | 415 | 408 | 415 | 510 | 510 | 415 | 408 | 415 |
| phone number | 382-4657 | 371-7191 | 358-1921 | 375-9999 | 330-6626 | 391-8027 | 355-9993 | 329-9001 | 335-4719 | 330-8173 |
| international plan | no | no | no | yes | yes | yes | no | yes | no | yes |
| voice mail plan | yes | yes | no | no | no | no | yes | no | no | yes |
| number vmail messages | 25 | 26 | 0 | 0 | 0 | 0 | 24 | 0 | 0 | 37 |
| total day minutes | 265.1 | 161.6 | 243.4 | 299.4 | 166.7 | 223.4 | 218.2 | 157 | 184.5 | 258.6 |
| total day calls | 110 | 123 | 114 | 71 | 113 | 98 | 88 | 79 | 97 | 84 |
| total day charge | 45.07 | 27.47 | 41.38 | 50.9 | 28.34 | 37.98 | 37.09 | 26.69 | 31.37 | 43.96 |


In [5]:
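# drop columns that are not needed for prediction
# (Spark resolves column names case-insensitively by default, so "State" matches "state";
#  "total eve charge" is not in the list, so it stays in as a feature)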
df = df.drop(
    "State", "Area code", "Total day charge",
    "Total night charge", "Total intl charge",
    "phone number")

# cast churn label from boolean to string
df = df.withColumn("Churn", df["Churn"].cast("string"))

In [6]:
# convert categorical columns to numeric indices
def toNumeric(df):
    cols = ["Churn", "international plan", "voice mail plan"]
    for col in cols:
        df = StringIndexer(
            inputCol=col,
            outputCol=col+"_indx")\
            .fit(df)\
            .transform(df)\
            .drop(col)\
            .withColumnRenamed(col+"_indx", col)
    return df

df = toNumeric(df).cache()
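
By default, `StringIndexer` orders labels by descending frequency, so the most frequent value receives index 0.0. The sketch below imitates that ordering in plain Python to show why the majority class ends up as `0.0`. The helper name `frequency_index` and the alphabetical tie-break are assumptions of this sketch, not Spark's implementation.

```python
from collections import Counter


def frequency_index(values):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(values)
    # Descending count; ties broken alphabetically (an assumption of this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Toy column with the same class counts as the churn label in this dataset,
# after the boolean has been cast to a string.
churn = ["false"] * 2850 + ["true"] * 483
mapping = frequency_index(churn)
print(mapping)
```

With these counts, `"false"` (no churn) maps to 0.0 and `"true"` (churn) maps to 1.0, which matches the label values used in the rest of the notebook.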

In [7]:
# check label proportions
df.groupBy("Churn").count().toPandas()

| | Churn | count |
|---|---|---|
| 0 | 0.0 | 2850 |
| 1 | 1.0 | 483 |


In [8]:
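# down-sample the majority class (Churn = 0) to roughly match the minority class;
# sampleBy decides row by row, so the resulting count is approximate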

df = df.sampleBy(
    "Churn", 
    fractions={
        0: 483./2850,
        1: 1.0
    }).cache()
df.groupBy("Churn").count().toPandas()

| | Churn | count |
|---|---|---|
| 0 | 0.0 | 503 |
| 1 | 1.0 | 483 |
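
The majority class comes out at 503 rows rather than exactly 483 because `sampleBy` keeps each row independently with the given probability. The arithmetic below is a sketch under that assumption (independent per-row draws, so the kept count is binomial). It shows the expected count and how far 503 is from it.

```python
import math

n_majority = 2850  # non-churners before sampling
n_minority = 483   # churners, all kept (fraction 1.0)

fraction = n_minority / n_majority   # per-row keep probability for class 0
expected = n_majority * fraction     # expected number of class-0 rows kept
# Standard deviation of a binomial count with n_majority trials.
std_dev = math.sqrt(n_majority * fraction * (1 - fraction))
z_score = (503 - expected) / std_dev  # how unusual the observed 503 is

print(f"fraction = {fraction:.4f}")
print(f"expected = {expected:.0f}")
print(f"std_dev  = {std_dev:.1f}")
print(f"z_score  = {z_score:.2f}")
```

The observed count sits about one standard deviation above the expected 483, which is ordinary sampling noise. Passing a `seed` to `sampleBy` would make the sample reproducible across runs.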


#### Feature processing

In [9]:
feature_cols = df.columns
feature_cols.remove("Churn")

# move the label to the last column
df = df[feature_cols + ["Churn"]]

# build (label, features) rows with a dense feature vector
row = Row("label", "features")
df_vec = df.rdd.map(
    lambda r: (row(r[-1], Vectors.dense(r[:-1])))
).toDF()
df_vec.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[137.0,0.0,243.4,...|
|  0.0|[117.0,0.0,184.5,...|
|  1.0|[65.0,0.0,129.1,1...|
|  1.0|[161.0,0.0,332.9,...|
|  1.0|[77.0,0.0,62.4,89...|
+-----+--------------------+
only showing top 5 rows



In [10]:
# standardize the features (zero mean, unit variance)

df_vec = StandardScaler(
    inputCol="features",
    outputCol="features_norm",
    withStd=True,
    withMean=True)\
    .fit(df_vec)\
    .transform(df_vec)\
    .drop("features")\
    .withColumnRenamed("features_norm", "features")
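
With `withMean=True` and `withStd=True`, each feature is centered and divided by its standard deviation. The sketch below applies the same transformation to a single column in plain Python, using the first five `account length` values from the sample shown earlier. It assumes the sample standard deviation (n - 1 denominator), and the helper name `standardize` is hypothetical.

```python
import statistics


def standardize(values):
    """Center values on their mean and scale them to unit sample std."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample standard deviation (n - 1)
    return [(v - mean) / std for v in values]


account_length = [128, 107, 137, 84, 75]
scaled = standardize(account_length)
print([round(v, 3) for v in scaled])
```

Two caveats about the step above: tree-based models such as random forests split on thresholds and are generally insensitive to this kind of rescaling, and fitting the scaler before the train/test split lets test rows influence the scaling statistics. Fitting on the training split only would avoid that.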

In [11]:
# split data into train/test

train, test = df_vec.randomSplit([0.8, 0.2])
print("Train values: '{}'".format(train.count()))
print("Test values: '{}'".format(test.count())) 

Train values: '780'
Test values: '206'
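
The split sizes (780 and 206) are close to, but not exactly, 80% and 20% of the 986 rows, because `randomSplit` assigns rows at random rather than cutting at a fixed position. The sketch below illustrates the idea in plain Python: each row gets one uniform draw and goes to the split whose cumulative-weight interval contains it. The function `random_split` and the seed value are assumptions for illustration, not Spark's code.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to a split by one uniform draw against cumulative weights."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total  # normalized cumulative upper bound of this split
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top end.
            if x < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_rows, test_rows = random_split(range(986), [0.8, 0.2], seed=42)
print(len(train_rows), len(test_rows))
```

In the notebook itself, passing a seed, for example `df_vec.randomSplit([0.8, 0.2], seed=42)`, would make the train/test counts and the downstream metrics repeatable.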


#### Build a model

In [12]:
start_time = time()
r_forest = RandomForestClassifier(
    numTrees = 100,
    labelCol = "label"
)
rf_model = r_forest.fit(train)

print("Training time taken: {0:.4f}(min)".format((time() - start_time)/60))

Training time taken: 0.1291(min)


#### Model evaluation and tuning

In [13]:
predictions = rf_model.transform(test)
# the evaluator reports area under the ROC curve (AUC), not classification accuracy
auc = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC")\
    .evaluate(predictions)
print("Area under ROC (test): '{0:.4f}%'".format(auc*100))

Area under ROC (test): '90.3208%'
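
The metric computed here is `areaUnderROC`, which measures how well the scores rank churners above non-churners; it is not the fraction of correct predictions. The sketch below uses the pairwise formulation of AUC (the probability that a random positive outscores a random negative) on made-up scores to show how the two quantities can differ. The function names and the toy data are hypothetical.

```python
def area_under_roc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


def accuracy(labels, scores, threshold=0.5):
    """Fraction of examples whose thresholded score matches the label."""
    predictions = [1 if s >= threshold else 0 for s in scores]
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


# Made-up example: every positive outscores every negative, but all scores exceed 0.5.
labels = [0, 0, 0, 1, 1]
scores = [0.60, 0.70, 0.80, 0.90, 0.95]
print(area_under_roc(labels, scores))  # 1.0
print(accuracy(labels, scores))        # 0.4
```

The ranking is perfect (AUC of 1.0) even though only two of five thresholded predictions are correct, so an AUC of about 0.90 should not be read as 90% accuracy.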


In [14]:
#tune the random forest with a grid search over tree depth and number of trees
paramGrid = ParamGridBuilder()\
    .addGrid(r_forest.maxDepth, [5,10,15,20,25,30])\
    .addGrid(r_forest.numTrees, [30, 60, 90, 120, 150, 180, 200])\
    .build()

#define evaluation metric
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", 
    metricName="areaUnderROC"
)

#start tuning
cv = CrossValidator(
    estimator=r_forest, 
    estimatorParamMaps=paramGrid, 
    evaluator=evaluator, 
    numFolds=5
)

#start timer
cv_start_time = time()

#fit tuned model
cvModel = cv.fit(train)

#calculate time taken to tune parameters
print("Hyper-param tuning time taken: '{0:.0f}' (min)".format((time() - cv_start_time)/60))

Hyper-param tuning time taken: '12' (min)
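
The tuning time follows from the size of the search: every grid point is trained once per fold, and the best setting is then refit on the full training set. The arithmetic below counts those fits for the grid used above; the variable names are illustrative.

```python
from itertools import product

max_depths = [5, 10, 15, 20, 25, 30]
num_trees = [30, 60, 90, 120, 150, 180, 200]
num_folds = 5

grid = list(product(max_depths, num_trees))  # every (maxDepth, numTrees) pair
cv_fits = len(grid) * num_folds              # one forest per grid point per fold
total_fits = cv_fits + 1                     # plus the final refit of the best setting
trees_in_cv = sum(trees for _, trees in grid) * num_folds

print(len(grid), cv_fits, total_fits, trees_in_cv)  # 42 210 211 24900
```

A coarser grid or fewer folds would shrink the run time roughly in proportion to the number of fits.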


In [15]:
# AUC after tuning (the evaluator metric is areaUnderROC, not accuracy)

train_pred = cvModel.transform(train)
test_pred = cvModel.transform(test)

print("Random forest AUC (train): {0:.4f}%".format((evaluator.evaluate(train_pred))*100))
print("Random forest AUC (test): {0:.4f}%".format((evaluator.evaluate(test_pred))*100))

Random forest AUC (train): 99.9829%
Random forest AUC (test): 90.8491%
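
The near-perfect training score next to a test score of about 90.8% suggests the tuned forest fits the training split much more closely than it generalizes, so it helps to check which grid point cross-validation actually selected. The sketch below picks the best entry from a list of parameter maps and their average cross-validation metrics. The helper `best_param_map` and all numbers in the example are hypothetical, not results from this notebook.

```python
def best_param_map(param_maps, avg_metrics, larger_is_better=True):
    """Return the (param_map, metric) pair with the best average CV metric."""
    if len(param_maps) != len(avg_metrics):
        raise ValueError("param_maps and avg_metrics must have the same length")
    pick = max if larger_is_better else min
    best_index = pick(range(len(avg_metrics)), key=lambda i: avg_metrics[i])
    return param_maps[best_index], avg_metrics[best_index]


# Hypothetical values for illustration only.
param_maps = [
    {"maxDepth": 5, "numTrees": 30},
    {"maxDepth": 10, "numTrees": 90},
    {"maxDepth": 30, "numTrees": 200},
]
avg_metrics = [0.88, 0.91, 0.90]

best_params, best_metric = best_param_map(param_maps, avg_metrics)
print(best_params, best_metric)
```

With the fitted model from the cell above, the inputs would come from `cv.getEstimatorParamMaps()` and `cvModel.avgMetrics`, and the selected model itself is available as `cvModel.bestModel`. The keys of Spark's parameter maps are `Param` objects rather than plain strings, so they would need to be converted (for example via their `name` attribute) before printing.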
