# SparkML-Customer Churn Prediction with sparkify dataset

### import libraries and data

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, concat, desc, explode, lit, min, max, split, udf
from pyspark.sql.types import IntegerType, DoubleType, BooleanType
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, GBTClassifier, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import Imputer, CountVectorizer, IDF, Normalizer, PCA, RegexTokenizer, StandardScaler, StopWordsRemover, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import BinaryClassificationMetrics

import re

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20190321155918-0000
KERNEL_ID = 55dd955c-2ce1-4b4c-8376-ca7486cb47e5


## If running on IBM watson studio:

In [2]:
import ibmos2spark
# @hidden_cell
credentials = {
    'endpoint': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'service_id': 'iam-ServiceId-dcd80924-823c-4e98-a4a1-9008ddc1a9a8',
    'iam_service_endpoint': 'https://iam.bluemix.net/oidc/token',
    'api_key': 'kV730WOitMi206CFroom3MFI13vBRa2kD8MNECWTx1tQ'
}

configuration_name = 'os_522f6938d423453eb1317cb06aa5d727_configs'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

spark = SparkSession.builder.getOrCreate()
# Since JSON data can be semi-structured and contain additional metadata, it is possible that you might face issues with the DataFrame layout.
# Please read the documentation of 'SparkSession.read()' to learn more about the possibilities to adjust the data loading.
# PySpark documentation: http://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json

df = spark.read.json(cos.url('medium-sparkify-dataset.json', 'sparkify-donotdelete-pr-1hvb6rdsl1zaqx'))
df.take(2)


[Row(artist='Martin Orford', auth='Logged In', firstName='Joseph', gender='M', itemInSession=20, lastName='Morales', length=597.55057, level='free', location='Corpus Christi, TX', method='PUT', page='NextSong', registration=1532063507000, sessionId=292, song='Grand Designs', status=200, ts=1538352011000, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='293'),
 Row(artist="John Brown's Body", auth='Logged In', firstName='Sawyer', gender='M', itemInSession=74, lastName='Larson', length=380.21179, level='free', location='Houston-The Woodlands-Sugar Land, TX', method='PUT', page='NextSong', registration=1538069638000, sessionId=97, song='Bulls', status=200, ts=1538352025000, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='98')]

## If Running on local machine:

uncomment the following code block and input the data path to execute the notebook.

In [None]:

# PATH = #path to json data
# spark = SparkSession.builder \
#     .master("local") \
#     .appName("Sparkify project") \
#     .getOrCreate()
# # Since JSON data can be semi-structured and contain additional metadata, it is possible that you might face issues with the DataFrame layout.
# # Please read the documentation of 'SparkSession.read()' to learn more about the possibilities to adjust the data loading.
# # PySpark documentation: http://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json

# df = spark.read.json(PATH)
# df.persist()
# df.show(2)

## Data preprocessing

### Filter rows with len(userid) == 0

In [3]:
df = df.filter("length(userid) != 0")

In [4]:
df_valid = df.dropna(how = 'any', subset = ['userId', 'sessionId'])
df_valid.show(1)

+-------------+---------+---------+------+-------------+--------+---------+-----+------------------+------+--------+-------------+---------+-------------+------+-------------+--------------------+------+
|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|          location|method|    page| registration|sessionId|         song|status|           ts|           userAgent|userId|
+-------------+---------+---------+------+-------------+--------+---------+-----+------------------+------+--------+-------------+---------+-------------+------+-------------+--------------------+------+
|Martin Orford|Logged In|   Joseph|     M|           20| Morales|597.55057| free|Corpus Christi, TX|   PUT|NextSong|1532063507000|      292|Grand Designs|   200|1538352011000|"Mozilla/5.0 (Mac...|   293|
+-------------+---------+---------+------+-------------+--------+---------+-----+------------------+------+--------+-------------+---------+-------------+------+-------------+---------

### Fill empty entries in  columns:
StringIndexer in pyspark cannot deal with empty entries. The empty entries in columns with string type should be filled with "None" to avoid exception. The numeric columns are filled with mean.

In [5]:
string_columns = ['artist', 'gender', 'location', 'song', 'userAgent']
num_columns = ['itemInSession', 'length']

In [6]:
def fill_string_null(df, columns):
    '''
        input: 
                df : spark data frame.
                columns : columns in df that are of string type.
        
        return: transformed data frame.
    '''
    for column in columns:
        df = df.withColumn(column, F.when(F.isnull(F.col(column)),"NONE").otherwise(F.col(column)))
    return df

def fill_num_null(df, columns):
    #transform to double type
    for column in columns:
        df = df.withColumn(column, df[column].cast(DoubleType()))
        
    imputer = Imputer(inputCols = columns, outputCols = columns)
    df = imputer.fit(df).transform(df)
    return df
        

In [7]:
df_valid = fill_string_null(df_valid, string_columns)

In [8]:
df_valid = fill_num_null(df_valid, num_columns)

# Exploratory Data Analysis

### Define Churn

Create a column `Churn` to use as the label for your model. `Cancellation Confirmation`,`Cancel`,`Submit Downgrade` and `Downgrade` events are considered to define churn.


### Explory visualization:

In [9]:
df_valid.filter('page == "Cancellation Confirmation" or page == "Downgrade" or page == "Cancel" or page == "Submit Downgrade"').show(2)

+------+---------+---------+------+-------------+--------+------------------+-----+-------------+------+---------+-------------+---------+----+------+-------------+--------------------+------+
|artist|     auth|firstName|gender|itemInSession|lastName|            length|level|     location|method|     page| registration|sessionId|song|status|           ts|           userAgent|userId|
+------+---------+---------+------+-------------+--------+------------------+-----+-------------+------+---------+-------------+---------+----+------+-------------+--------------------+------+
|  NONE|Logged In|    Sofia|     F|        268.0|  Gordon|248.66459278007738| paid|Rochester, MN|   GET|Downgrade|1533175710000|      162|NONE|   200|1538352336000|"Mozilla/5.0 (Mac...|   163|
|  NONE|Logged In|    Sofia|     F|        296.0|  Gordon|248.66459278007738| paid|Rochester, MN|   GET|Downgrade|1533175710000|      162|NONE|   200|1538358258000|"Mozilla/5.0 (Mac...|   163|
+------+---------+---------+------+

In [10]:
df_valid.select('page').distinct().show()

+--------------------+
|                page|
+--------------------+
|              Cancel|
|    Submit Downgrade|
|         Thumbs Down|
|                Home|
|           Downgrade|
|         Roll Advert|
|              Logout|
|       Save Settings|
|Cancellation Conf...|
|               About|
|            Settings|
|     Add to Playlist|
|          Add Friend|
|            NextSong|
|           Thumbs Up|
|                Help|
|             Upgrade|
|               Error|
|      Submit Upgrade|
+--------------------+



### Add Churn for each sample

In [11]:
label_class = udf(lambda x : 1 if x in ["Cancellation Confirmation", "Downgrade", 'Submit Downgrade', 'Cancel'] else 0, IntegerType())
df_valid = df_valid.withColumn('Churn', label_class(df_valid.page))

In [12]:
df_valid.groupby(df_valid.Churn).count().show()

+-----+------+
|Churn| count|
+-----+------+
|    1|  4126|
|    0|523879|
+-----+------+



# Feature Engineering


We select the following fields as features to train our model:

* string_columns = ['artist', 'gender', 'location', 'song', 'userAgent']

* num_columns = ['itemInSession', 'length']

### Encode gender feature:

In [13]:
def encodeFeatures(df, columns):
    '''
        encode categorical features to numerical features.
        input:
            df: spark dataframe.
            columns: categorical feature columns.
    '''
    for column in columns:
        indexer = StringIndexer(inputCol = column, outputCol = column + "_encode")
        df = indexer.fit(df).transform(df)
    return df

In [14]:
df_valid = encodeFeatures(df_valid, string_columns)
df_valid.show(1)

+-------------+---------+---------+------+-------------+--------+---------+-----+------------------+------+--------+-------------+---------+-------------+------+-------------+--------------------+------+-----+-------------+-------------+---------------+-----------+----------------+
|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|          location|method|    page| registration|sessionId|         song|status|           ts|           userAgent|userId|Churn|artist_encode|gender_encode|location_encode|song_encode|userAgent_encode|
+-------------+---------+---------+------+-------------+--------+---------+-----+------------------+------+--------+-------------+---------+-------------+------+-------------+--------------------+------+-----+-------------+-------------+---------------+-----------+----------------+
|Martin Orford|Logged In|   Joseph|     M|         20.0| Morales|597.55057| free|Corpus Christi, TX|   PUT|NextSong|1532063507000|      292|Grand Desig

### Generate feature column:

* string_columns = ['artist', 'gender', 'location', 'song', 'userAgent']
* num_columns = ['itemInSession', 'length']

In [15]:
df_valid = df_valid.drop('features')
df_valid = df_valid.drop('rawFeatures')
assembler = VectorAssembler(inputCols = ["artist_encode","gender_encode",\
                                       "location_encode","song_encode",\
                                       "userAgent_encode", "itemInSession",\
                                       "length"], outputCol = "rawFeatures")
df_valid = assembler.transform(df_valid)
# normalize features
scaler = Normalizer(inputCol="rawFeatures", outputCol="features")
df_valid = scaler.transform(df_valid)

In [16]:
df_valid.show(3)

+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+------------------+------+-------------+--------------------+------+-----+-------------+-------------+---------------+-----------+----------------+--------------------+--------------------+
|           artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|              song|status|           ts|           userAgent|userId|Churn|artist_encode|gender_encode|location_encode|song_encode|userAgent_encode|         rawFeatures|            features|
+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+------------------+------+-------------+--------------------+------+-----+-------------+-------------+---------------+-----------+----------------+--------------------+--------------

# Modeling
Split the full dataset into train, test sets. Evaluate the metrics of the various models.

In [63]:
#only paid users are of interest.
rest, validation = df_valid.filter(df_valid.level == 'paid').randomSplit([0.75, 0.25], seed=42)

### Add weight column to solve imbalanced dataset problem:


In [45]:
# Only use the information of training dataset to calculate the balanced class weight. 
# Do not use information from test dataset to avoid fake score.
def addWeight(df_train, df_test, gammas = [1]):
    '''
        add weight column to solve imbalanced problem.
        The weightCol field is used in logistic regression model.
        input:
            df_train : training data frame.
            df_test : test data frame.
            
        return:
            training and test dataframe after transformation. 
    '''
    total_instances = df_train.count()
    negative_instances = df_train.filter(df_train.Churn == 0).count()
    for gamma in gammas:
        balance_ratio = (total_instances - negative_instances * gamma) / total_instances
        print("gamma=%.3f, balance_ratio = %.3f"%(gamma, balance_ratio))
        add_weightCol = udf(lambda x : balance_ratio if x == 0 else (1 - balance_ratio), DoubleType())
        df_train = df_train.withColumn('weightCol%d'%(gamma*1000), add_weightCol(df_train.Churn))
        df_test = df_test.withColumn('weightCol%d'%(gamma*1000), add_weightCol(df_test.Churn))
    
    return df_train, df_test

In [64]:
rest, validation = addWeight(rest, validation, gammas = [0.8, 0.9, 0.95, 0.99, 1, 1.001])

gamma=0.800, balance_ratio = 0.208
gamma=0.900, balance_ratio = 0.109
gamma=0.950, balance_ratio = 0.059
gamma=0.990, balance_ratio = 0.020
gamma=1.000, balance_ratio = 0.010
gamma=1.001, balance_ratio = 0.009


## Experiments

### Logistic regression model

In [53]:
# Train a LogisticRegression model.
lgr = LogisticRegression(labelCol="Churn", featuresCol="features", \
                         weightCol = 'weightCol800', maxIter = 50, regParam = 0)

#pipeline = Pipeline(stages=[gbt])
pipeline = Pipeline(stages=[lgr])

# Train model.  This also runs the indexers.
model = pipeline.fit(rest)

# Make predictions.
predictions = model.transform(validation)

# Select example rows to display.
predictions.select("prediction", "Churn", "features").show(5)


+----------+-----+--------------------+
|prediction|Churn|            features|
+----------+-----+--------------------+
|       0.0|    0|[0.33881679736696...|
|       0.0|    0|[0.33896164916210...|
|       0.0|    0|[0.16668946441272...|
|       0.0|    0|[0.55517912002349...|
|       0.0|    0|[0.49187204269535...|
+----------+-----+--------------------+
only showing top 5 rows



### Metric Evalulation

In [21]:
def precision(tp, fp):
    #add one smoothness
    return tp / (tp + fp)

def recall(tp, fn):
    #add one smoothness
    return tp / (tp + fn)

def f1_score(tp, tn, fp, fn):
    pre = precision(tp, fp)
    rec = recall(tp, fn)
    return 2 * pre * rec / (pre + rec)

def accuracy(tp, tn, fp, fn):
    return (tp + tn) / (tp + tn + fp + fn)

In [51]:
def print_metrics(predictions):
    tp = predictions.filter(predictions.prediction == predictions.Churn).\
                 filter(predictions.prediction == 1).count()
    tn = predictions.filter(predictions.prediction == predictions.Churn).\
                     filter(predictions.prediction == 0).count()
    fp = predictions.filter(predictions.prediction != predictions.Churn).\
                     filter(predictions.prediction == 1).count()
    fn = predictions.filter(predictions.prediction != predictions.Churn).\
                     filter(predictions.prediction == 0).count()
    print("true positive = ", tp)
    print("true negative = ", tn)
    print("false positive = ", fp)
    print("false negative = ", fn)
    print("precision = ",precision(tp, fp))
    print("recall = ",recall(tp, fn))
    print("f1_score = ",f1_score(tp, tn, fp, fn))
    print("accuracy = ", accuracy(tp, tn, fp, fn))

In [54]:
print_metrics(predictions)

true positive =  0
true negative =  103407
false positive =  0
false negative =  1008


ZeroDivisionError: division by zero

## Error Analysis

Running the above code block throws division by zero error to us. The reason is that the class is highly imbalanced. It will easily get very high metrics by predicting all samples to be negative. To resolve this problem, we should select 'weightCol' with higher **gamma** value in fitting the model. e.g. 'weightCol990', 'weightCol1000'


In [55]:
# Train a LogisticRegression model.
lgr = LogisticRegression(labelCol="Churn", featuresCol="features", \
                         weightCol = 'weightCol1000', maxIter = 50, regParam = 0)

#pipeline = Pipeline(stages=[gbt])
pipeline = Pipeline(stages=[lgr])

# Train model.  This also runs the indexers.
model = pipeline.fit(rest)

# Make predictions.
predictions = model.transform(validation)

In [56]:
print_metrics(predictions)

true positive =  559
true negative =  95173
false positive =  8234
false negative =  449
precision =  0.06357329694074833
recall =  0.5545634920634921
f1_score =  0.11406999285787167
accuracy =  0.9168414499832399


### Linear support vector machine:

support vector machine is time-consuming to train.

In [None]:
svc = LinearSVC(labelCol="Churn", featuresCol="features", \
                        weightCol = 'weightCol1000', maxIter = 50, regParam = 0)

# Chain indexers and GBT in a Pipeline
#pipeline = Pipeline(stages=[gbt])
pipeline = Pipeline(stages=[svc])

# Train model. This also runs the indexers.
model = pipeline.fit(rest)

# Make predictions.
predictions_svm = model.transform(validation)

# Select example rows to display.
predictions_svm.select("prediction", "Churn", "features").show(5)

In [187]:
#Calculate four important metrics
tp = predictions_svm.filter(predictions_svm.prediction == predictions_svm.Churn).\
                 filter(predictions_svm.prediction == 1).count()
tn = predictions_svm.filter(predictions_svm.prediction == predictions_svm.Churn).\
                 filter(predictions_svm.prediction == 0).count()
fp = predictions_svm.filter(predictions_svm.prediction != predictions_svm.Churn).\
                 filter(predictions_svm.prediction == 1).count()
fn = predictions_svm.filter(predictions_svm.prediction != predictions_svm.Churn).\
                 filter(predictions_svm.prediction == 0).count()

In [188]:
#Print for important metrics
print("True Positive Samples : ", tp)
print("True Negative Samples : ", tn)
print("False Positive Samples : ", fp)
print("False Negative Samples : ", fn)

True Positive Samples :  1008
True Negative Samples :  87468
False Positive Samples :  15939
False Negative Samples :  0


In [None]:
print_metrics(predictions_svm)

### Tunning models using k-fold cross-validation

In [67]:
lgr = LogisticRegression(labelCol="Churn", featuresCol="features", \
                        threshold = 0.6, maxIter = 5, regParam = 0)

pipeline = Pipeline(stages=[lgr])


In [68]:
# Use k-fold cross validation to select best
paramGrid = ParamGridBuilder() \
    .addGrid(lgr.weightCol, ['weightCol800','weightCol900','weightCol950',\
                             'weightCol990','weightCol1000'])\
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(rawPredictionCol = 'prediction', \
                                                                  labelCol = 'Churn', metricName = 'areaUnderPR'),
                          numFolds=3)

In [69]:
# Fit cross validation model on training dataset
cvModel_q1 = crossval.fit(rest)

In [98]:
# prediction on validation dataset
predictions = cvModel_q1.transform(validation)

In [70]:
# select the hyperparameter with highest metric
cvModel_q1.avgMetrics

[0.009737378859114671,
 0.009737378859114671,
 0.009737378859114671,
 0.05115955214467296,
 0.04692467914905578]

### Cross validation conclusin:

This suggests that gamma = 0.99 gives us best result.

### Use best hyper-parameter to train model:

In [67]:
# Train a LogisticRegression model.
lgr = LogisticRegression(labelCol="Churn", featuresCol="features", \
                        weightCol = 'weightCol990', threshold = 0.6, maxIter = 50, regParam = 0)

#pipeline = Pipeline(stages=[gbt])
pipeline = Pipeline(stages=[lgr])

# Train model.  This also runs the indexers.
model = pipeline.fit(rest)

# Make predictions.
predictions = model.transform(validation)

# Select example rows to display.
predictions.select("prediction", "Churn", "features").show(5)


+----------+-----+--------------------+
|prediction|Churn|            features|
+----------+-----+--------------------+
|       0.0|    0|[2916.0,0.0,7.0,8...|
|       0.0|    0|[2916.0,0.0,57.0,...|
|       0.0|    0|[2916.0,0.0,25.0,...|
|       0.0|    0|[6571.0,1.0,48.0,...|
|       0.0|    0|[7109.0,1.0,101.0...|
+----------+-----+--------------------+
only showing top 5 rows



In [154]:
print_metrics(predictions)

precision =  0.05947955390334572
recall =  1.0
f1_score =  0.11228070175438597
accuracy =  0.8473495187473065
