In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [2]:
sc = SparkContext(appName='teleco-customer-churn')
spark = SparkSession.builder.getOrCreate()

KeyboardInterrupt: 

This notebook will work more as a guide to help us develop the script that will run on GCP, and since we will run it localy, I'll just grab a sample of around 1000 records, so we can do it faster.

In [None]:
customers_table = spark.read.csv('../data/WA_Fn-UseC_-Telco-Customer-Churn.csv', header='true', inferSchema='true')
customers_table_sample = customers_table.sample(withReplacement=False, fraction=0.15, seed=42)

In [None]:
customers_table_sample.count()

1102

We can start by applying the same steps we did on the analysis notebook to treat missing values and standardize column names.

In [None]:
customers_table_sample = customers_table_sample.withColumnRenamed('gender', 'Gender').withColumnRenamed('tenure', 'Tenure').withColumnRenamed('customerId', 'CustomerId')
customers_table_sample = customers_table_sample.replace(subset='TotalCharges', to_replace=' ', value='0.00')
customers_table_sample = customers_table_sample.withColumn('TotalCharges', customers_table_sample.TotalCharges.cast('double'))

In [None]:
customers_table_sample.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|CustomerId|Gender|SeniorCitizen|Partner|Dependents|Tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|6713-OKOMC|Female|            0|     No|        No|    10|  

In [None]:
customers_table_sample.distinct()

DataFrame[CustomerId: string, Gender: string, SeniorCitizen: int, Partner: string, Dependents: string, Tenure: int, PhoneService: string, MultipleLines: string, InternetService: string, OnlineSecurity: string, OnlineBackup: string, DeviceProtection: string, TechSupport: string, StreamingTV: string, StreamingMovies: string, Contract: string, PaperlessBilling: string, PaymentMethod: string, MonthlyCharges: double, TotalCharges: double, Churn: string]

### Pre-processing
---
First, we'll drop de Id column, since it doesn't present any predictive value. Then we'll convert the categorical string variables into numeric variables.

In [None]:
customers_table_sample = customers_table_sample.drop('CustomerId')

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegression

In [None]:
string_variables = [variable[0] for variable in customers_table_sample.dtypes if variable[1] == 'string']
output_string_variables = [variable+'_numeric' for variable in string_variables]
rename_columns_dic = {output_string_variables[index]:string_variables[index] for index in range(len(string_variables))}

indexer_model = StringIndexer(inputCols=string_variables, outputCols=output_string_variables)
indexer_fitted = indexer_model.fit(customers_table_sample)
numeric_customers_table = indexer_fitted.transform(customers_table_sample)

numeric_customers_table = numeric_customers_table.drop(*string_variables)
numeric_customers_table = numeric_customers_table.withColumnsRenamed(rename_columns_dic)

numeric_customers_table.show()

+-------------+------+--------------+------------+------+-------+----------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+-----+
|SeniorCitizen|Tenure|MonthlyCharges|TotalCharges|Gender|Partner|Dependents|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|Churn|
+-------------+------+--------------+------------+------+-------+----------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+-----+
|            0|    10|         29.75|       301.9|   0.0|    0.0|       0.0|         1.0|          2.0|            1.0|           1.0|         0.0|             0.0|        0.0|        1.0|            0.0|     0.0|             1.0|          3.0|  0

In [None]:
numeric_customers_table.dtypes

[('SeniorCitizen', 'int'),
 ('Tenure', 'int'),
 ('MonthlyCharges', 'double'),
 ('TotalCharges', 'double'),
 ('Gender', 'double'),
 ('Partner', 'double'),
 ('Dependents', 'double'),
 ('PhoneService', 'double'),
 ('MultipleLines', 'double'),
 ('InternetService', 'double'),
 ('OnlineSecurity', 'double'),
 ('OnlineBackup', 'double'),
 ('DeviceProtection', 'double'),
 ('TechSupport', 'double'),
 ('StreamingTV', 'double'),
 ('StreamingMovies', 'double'),
 ('Contract', 'double'),
 ('PaperlessBilling', 'double'),
 ('PaymentMethod', 'double'),
 ('Churn', 'double')]

Cool, we got all the variables set as numeric values. We will now create our first model so we can use it as a baseline. I don't expect it to be the most accurate, but after that we can dig more into other pre-processing techniques that will later on improve the accuracy.

I'll just create a few functions that will help us pre-process and evaluate the models

In [None]:
from pyspark.sql.functions import rand
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
def train_test_splitter(dataframe, test_ratio = 0.7, seed=42):
    pre_split_dataframe = dataframe.withColumn('train_test_index', rand(seed=seed))
    
    train_dataframe = pre_split_dataframe.filter(pre_split_dataframe.train_test_index <= test_ratio)
    test_dataframe = pre_split_dataframe.filter(pre_split_dataframe.train_test_index > test_ratio)

    train_dataframe = train_dataframe.drop('train_test_index')
    test_dataframe = test_dataframe.drop('train_test_index')

    print(f'Rows on train dataframe: {train_dataframe.count()}\nRows on test dataframe: {test_dataframe.count()}')
    return train_dataframe, test_dataframe


def vectorize_dataframe(dataframe, label):
    features_cols = dataframe.drop(label).columns

    vecAssembler = VectorAssembler(inputCols=features_cols, outputCol='features')
    vectorized_df = vecAssembler.transform(dataframe)
    vectorized_df = vectorized_df.drop(*features_cols)

    return vectorized_df

def evaluate_model(model, dataframe):
    prediction = model.transform(dataframe)

    evaluator = MulticlassClassificationEvaluator(labelCol='Churn', metricName='f1', metricLabel=1.0)
    f1_score = evaluator.evaluate(prediction)
    accuracy_score = evaluator.evaluate(prediction, {evaluator.metricName:'accuracy'})
    recall_score = evaluator.evaluate(prediction, {evaluator.metricName:'recallByLabel'})

    confusion_matrix = prediction.groupBy('Churn', 'prediction').count()

    return f1_score, accuracy_score, recall_score, confusion_matrix

Before we can create our model, we need to do a train-test split. I won't be using RandomSplit() as it presents some unstable results. [You can read about it here](https://sergei-ivanov.medium.com/why-you-should-not-use-randomsplit-in-pyspark-to-split-data-into-train-and-test-58576d539a36). Instead, we'll create a column with random values and filter it, and then we'll vectorize those datasets so we can have them ready for training.

In [None]:
first_train_df, first_test_df = train_test_splitter(numeric_customers_table)
vectorized_train_df = vectorize_dataframe(first_train_df, label='Churn')
vectorized_test_df = vectorize_dataframe(first_test_df, label='Churn')

Rows on train dataframe: 815
Rows on test dataframe: 287


In [None]:
vectorized_train_df.show(truncate=False)

+-----+-------------------------------------------------------------------------------------+
|Churn|features                                                                             |
+-----+-------------------------------------------------------------------------------------+
|0.0  |(19,[1,2,3,7,8,9,10,14,17,18],[10.0,29.75,301.9,1.0,2.0,1.0,1.0,1.0,1.0,3.0])        |
|0.0  |[0.0,52.0,20.65,1022.95,0.0,0.0,0.0,0.0,0.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,1.0,3.0] |
|0.0  |(19,[1,2,3,4,8,9,10,13,15,16,18],[52.0,79.75,4217.8,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|0.0  |(19,[0,1,2,3,8,11],[1.0,43.0,90.25,3838.75,1.0,1.0])                                 |
|0.0  |[0.0,34.0,24.95,894.3,0.0,1.0,1.0,0.0,1.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,1.0,1.0,1.0]   |
|0.0  |(19,[1,2,3,4,5,8,11,14,16],[47.0,78.9,3650.35,1.0,1.0,1.0,1.0,1.0,2.0])              |
|0.0  |[0.0,46.0,19.95,927.1,0.0,1.0,1.0,0.0,0.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,1.0,0.0,3.0]   |
|0.0  |(19,[1,2,3,4,5,6,11,12,14,17,18],[30.0,82.05,2570.2,1

In [None]:
lr_base = LogisticRegression(labelCol='Churn')
lr_trained_model = lr_base.fit(vectorized_train_df)

In [None]:
train_f1, train_accuracy, train_recall, train_conf_mat = evaluate_model(lr_trained_model, vectorized_train_df)
train_conf_mat.show()
print(f'F1-Score: {round(train_f1, 4)}\nAccuracy: {round(train_accuracy, 4)}\nRecall: {round(train_recall, 4)}')

+-----+----------+-----+
|Churn|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  125|
|  0.0|       1.0|   63|
|  1.0|       0.0|   92|
|  0.0|       0.0|  535|
+-----+----------+-----+

F1-Score: 0.8053
Accuracy: 0.8098
Recall: 0.576


In [None]:
test_f1, test_accuracy, test_recall, test_conf_mat = evaluate_model(lr_trained_model, vectorized_test_df)
test_conf_mat.show()
print(f'F1-Score: {round(test_f1, 4)}\nAccuracy: {round(test_accuracy, 4)}\nRecall: {round(test_recall, 4)}')

+-----+----------+-----+
|Churn|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   39|
|  0.0|       1.0|   28|
|  1.0|       0.0|   33|
|  0.0|       0.0|  187|
+-----+----------+-----+

F1-Score: 0.7849
Accuracy: 0.7875
Recall: 0.5417


Great, we have our first model! Let's take a look at the training dataset metrics.
The first thing it tells us is that we have both a F1 score and accuracy of 78%. That's good for a first model.

When we look at the confusion matrix and the recall value, we can see that of all the customers that left the company, we could only predict 54%, which means that almost half the customers could churn just beneath our radars. It also tells us that our model learned way more about negative outcomes that about positive outcomes, that could be because of the labels unbalancing on the train data.

One thing that we need to point out is **how important the recall metric is** in this case. Since we are trying to predict customers close to leaving the company, a false negative means we couldn't anticipate a churn. So we need to get those false negatives as low as possible. And a higher recall means fewer FN's.

There are a couple ways we can deal with an unbalanced dataset, we can oversample it, undersample it, and we can also use cross-validation along with those two options. For now we'll just use undersample, but when we move to the cloud platform we can use a more robust approach.

In [None]:
vectorized_train_df.groupBy('Churn').count().show()

+-----+-----+
|Churn|count|
+-----+-----+
|  0.0|  598|
|  1.0|  217|
+-----+-----+



In [None]:
undersampled_0_label = vectorized_train_df.filter('Churn == 0').sample(0.4)
undersampled_train_df = undersampled_0_label.union(vectorized_train_df.filter('Churn == 1'))
undersampled_train_df.groupBy('Churn').count().show()

+-----+-----+
|Churn|count|
+-----+-----+
|  0.0|  225|
|  1.0|  217|
+-----+-----+



Not a perfect solution, but we got a more balanced dataset, let's see how that affects our model.

In [None]:
undersampled_lr_model = lr_base.fit(undersampled_train_df)

In [None]:
test_f1, test_accuracy, test_recall, test_conf_mat = evaluate_model(undersampled_lr_model, vectorized_test_df)
test_conf_mat.show()
print(f'F1-Score: {round(test_f1, 4)}\nAccuracy: {round(test_accuracy, 4)}\nRecall: {round(test_recall, 4)}')

+-----+----------+-----+
|Churn|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   57|
|  0.0|       1.0|   68|
|  1.0|       0.0|   15|
|  0.0|       0.0|  147|
+-----+----------+-----+

F1-Score: 0.7294
Accuracy: 0.7108
Recall: 0.7917


So we got a great improvement on the recall value! We went from predicting 54% of churn cases, to predicting 79%. We also had a little drop in the accuracy and F1 score. That might have happened due to the undersample, which affected the model's ability to predict negative churn cases. Although it's not ideal to have a higher FP rate, in our case, it's better to have a high FP rate, than a high FN rate.

Let's create a model using cross-validation and undersampling and compare the results.

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
def create_cross_validated_model(dataframe, estimator, params, evaluator):
    vectorized_data = vectorize_dataframe(dataframe, label='Churn')

    cv = CrossValidator(estimator=estimator, evaluator=evaluator, estimatorParamMaps=params,numFolds=3, parallelism=2)
    cvModel = cv.fit(vectorized_data)

    return cvModel

In [None]:
undersampled_0_label = numeric_customers_table.filter('Churn == 0').sample(0.4, seed=42)
undersampled_df = undersampled_0_label.union(numeric_customers_table.filter('Churn == 1'))
params_grid = ParamGridBuilder().addGrid(lr_base.maxIter, [75, 100, 150, 200, 250]).build()
evaluator = MulticlassClassificationEvaluator(labelCol='Churn', metricName='recallByLabel', metricLabel=1.0)

trained_cvmodel = create_cross_validated_model(undersampled_df, lr_base, params_grid,evaluator)

In [None]:
test_f1, test_accuracy, test_recall, test_conf_mat = evaluate_model(trained_cvmodel, vectorized_test_df)
test_conf_mat.show()
print(f'F1-Score: {round(test_f1, 4)}\nAccuracy: {round(test_accuracy, 4)}\nRecall: {round(test_recall, 4)}')

+-----+----------+-----+
|Churn|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   57|
|  0.0|       1.0|   55|
|  1.0|       0.0|   15|
|  0.0|       0.0|  160|
+-----+----------+-----+

F1-Score: 0.7701
Accuracy: 0.7561
Recall: 0.7917


Here we already got some interesting results! We could see the power of undersampling and cross-validating, and now we can start building our .py script that will be running on our cluster.

In [None]:
param = 'getMaxIter()'
trained_cvmodel.bestModel._java_obj.getMaxIter()

75

In [None]:
trained_cvmodel.bestModel.extractParamMap()

{Param(parent='LogisticRegression_521bdbf7db6a', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_521bdbf7db6a', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_521bdbf7db6a', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_521bdbf7db6a', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_521bdbf7db6a', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_521bdbf7db6a', name='labelCol', doc='label column name.'): 'Churn',
 Param(parent='LogisticRegression_521bdbf7db6a', name='maxBlockSizeInMB', doc='maximum memory in MB for s