# Sparkify-model build
#### Dependencies: the Sparkify-analysis and preprocessing notebook must be run first.

This notebook contains the development of the Sparkify customer churn model.

In [1]:
#Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, asc, desc, lit, min, max, udf, isnull,when, row_number, floor,ceil, sum ,count, countDistinct
from pyspark.sql.types import IntegerType,DoubleType
from pyspark.sql import Window

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, NaiveBayes, GBTClassifier, RandomForestClassifier, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel

import datetime
import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import os
from time import time

import gc

In [2]:
# Report the Python, pandas and matplotlib versions in use, one per line.
from platform import python_version

for version in (python_version(), pd.__version__, matplotlib.__version__):
    print(version)

3.6.3
0.23.3
2.1.0


In [3]:
# Start the Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("Sparkify")
    .getOrCreate()
)


In [4]:
# NOTE(review): only the last expression of a cell is rendered, so the configuration
# list returned by getAll() is computed but never shown; wrap it in print() if it
# is meant to be inspected (and check it for sensitive settings before sharing).
spark.sparkContext.getConf().getAll()
spark

# Load dataset
Load the dataset `dfFinal.csv` created by the Sparkify-analysis and preprocessing notebook.

In [5]:
#Find the current working directory (the CSV is read relative to it)
current_path = os.getcwd()
current_path

'/home/workspace'

In [6]:
# Load the per-user feature table from the working directory.
# os.path.join builds the path portably instead of concatenating strings.
dfFinal = pd.read_csv(os.path.join(current_path, 'dfFinal.csv'))

# Preview the first rows only rather than rendering the whole frame.
dfFinal.head(10)

Unnamed: 0,userId,gender,churn,daysRegistered,weeksRegistered,latestLevel,totalSession,totalInteraction,totalLength,avgIntPerSession,...,totalDaysActiveFN0,totalWeeksActiveFN0,totalFortnightsActiveFN0,ActiveDaysRatioFN0,ActiveWeeksRatioFN0,totalDaysActiveFN1,totalWeeksActiveFN1,totalFortnightsActiveFN1,ActiveDaysRatioFN1,ActiveWeeksRatioFN1
0,100010,1,0,55.643657,7.949094,0,7,381,6.694090e+04,54.428571,...,2,2,1,0.142857,1.0,1,1,1,0.071429,0.5
1,200002,0,0,70.074630,10.010661,1,6,474,9.400888e+04,79.000000,...,1,1,1,0.071429,0.5,2,1,1,0.142857,0.5
2,125,0,1,71.316887,10.188127,0,1,11,2.089113e+03,11.000000,...,1,1,1,0.071429,0.5,0,0,0,0.000000,0.0
3,124,1,0,131.555914,18.793702,1,29,4825,1.012312e+06,166.379310,...,7,2,1,0.500000,1.0,11,2,1,0.785714,1.0
4,51,0,1,19.455845,2.779406,1,10,2464,5.232758e+05,246.400000,...,10,2,1,0.714286,1.0,2,1,1,0.142857,0.5
5,7,0,0,72.778183,10.396883,0,7,201,3.803409e+04,28.714286,...,3,2,1,0.214286,1.0,0,0,0,0.000000,0.0
6,15,0,0,56.513576,8.073368,1,15,2278,4.773076e+05,151.866667,...,5,2,1,0.357143,1.0,2,1,1,0.142857,0.5
7,54,1,1,110.751690,15.821670,1,37,3437,7.113449e+05,92.891892,...,11,2,1,0.785714,1.0,10,2,1,0.714286,1.0
8,155,1,0,23.556019,3.365146,1,6,1002,1.987793e+05,167.000000,...,3,2,1,0.214286,1.0,4,2,1,0.285714,1.0
9,100014,0,1,85.083403,12.154772,1,6,310,6.770347e+04,51.666667,...,2,2,1,0.142857,1.0,0,0,0,0.000000,0.0


In [7]:
# Convert the pandas frame into a Spark DataFrame under the same name.
# The isinstance guard keeps the cell re-runnable: once dfFinal has been rebound to a
# Spark DataFrame, running the cell again must not feed it back into createDataFrame.
if isinstance(dfFinal, pd.DataFrame):
    dfFinal = spark.createDataFrame(dfFinal)
dfFinal.printSchema()


root
 |-- userId: long (nullable = true)
 |-- gender: long (nullable = true)
 |-- churn: long (nullable = true)
 |-- daysRegistered: double (nullable = true)
 |-- weeksRegistered: double (nullable = true)
 |-- latestLevel: long (nullable = true)
 |-- totalSession: long (nullable = true)
 |-- totalInteraction: long (nullable = true)
 |-- totalLength: double (nullable = true)
 |-- avgIntPerSession: double (nullable = true)
 |-- avgSessionLength: double (nullable = true)
 |-- avgSessionsPerWeek: double (nullable = true)
 |-- totalSong: long (nullable = true)
 |-- avgSongPerSession: double (nullable = true)
 |-- avgSongPerWeek: double (nullable = true)
 |-- totalHome: long (nullable = true)
 |-- avgHomePerSession: double (nullable = true)
 |-- avgHomePerWeek: double (nullable = true)
 |-- totalAddFriend: long (nullable = true)
 |-- avgAddFriendPerSession: double (nullable = true)
 |-- avgAddFriendPerWeek: double (nullable = true)
 |-- totalAddToPlaylist: long (nullable = true)
 |-- avgAddT

In [8]:
# Cast every column to float in a single projection so all ML inputs share one numeric type.
featuresList = dfFinal.columns[:]
dfFinal = dfFinal.select([col(name).cast("float").alias(name) for name in featuresList])

# userId is an identifier, not a predictor; churn is the label, so it is kept in
# dfMLInput but excluded from the feature column list.
dfMLInput = dfFinal.drop("userId")
feature_cols = [name for name in dfMLInput.columns if name not in ("userId", "churn")]


In [9]:
#Display the modelling input's DataFrame repr (column names and types)
dfMLInput

DataFrame[gender: float, churn: float, daysRegistered: float, weeksRegistered: float, latestLevel: float, totalSession: float, totalInteraction: float, totalLength: float, avgIntPerSession: float, avgSessionLength: float, avgSessionsPerWeek: float, totalSong: float, avgSongPerSession: float, avgSongPerWeek: float, totalHome: float, avgHomePerSession: float, avgHomePerWeek: float, totalAddFriend: float, avgAddFriendPerSession: float, avgAddFriendPerWeek: float, totalAddToPlaylist: float, avgAddToPlaylistPerSession: float, avgAddToPlaylistPerWeek: float, totalNextSong: float, avgNextSongPerSession: float, avgNextSongPerWeek: float, totalThumbsUp: float, avgThumbsUpPerSession: float, avgThumbsUpPerWeek: float, totalLogout: float, avgLogoutPerSession: float, avgLogoutPerWeek: float, totalDaysActive: float, totalWeeksActive: float, totalFortnightsActive: float, ActiveDaysRatio: float, ActiveWeeksRatio: float, totalSessionWk0: float, totalInteractionWk0: float, totalLengthWk0: float, avgIntP

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [10]:
# Split the modelling input 70/30 into train and test sets using a seeded random split.
traindf, testdf = dfMLInput.randomSplit([0.7, 0.3], seed=42)

# Report (row count, column count) for each split.
print('Shape of train and test sets:')
for split in (traindf, testdf):
    print((split.count(), len(split.columns)))







Shape of train and test sets:
(167, 67)
(58, 67)


#### Model selection for classification problem

I have decided to train five models on the dataset:
* Naive Bayes
* Logistic regression
* Random forest
* Gradient-boosted trees (GBT)
* Linear SVM


#### Training and predicting pipeline for preliminary models

In [11]:
def train_predict(learner, train, test, feature_names=None, metrics_log=None):
    '''
    Fit `learner` inside an assemble -> min-max scale -> classify pipeline on `train`,
    score it on `test`, print the scores and record them.

    inputs:
       - learner: the learning algorithm to be trained and predicted on; it is expected
                  to read the 'scaledFeatures' column produced by the scaler and the
                  'churn' label column
       - train: training set (feature columns plus 'churn')
       - test: testing set (feature columns plus 'churn')
       - feature_names: columns to assemble into the feature vector; defaults to the
                        notebook-level `feature_cols`
       - metrics_log: list that receives one row per call; defaults to the
                      notebook-level `metrics` list, which must exist before the call
    outputs:
       - model: the fitted pipeline model
       - metrics: the list the new row
                  [learner class name, accuracy, F1, area under P-R, training time]
                  was appended to
    '''
    # Fall back to the notebook-level names so existing three-argument calls keep working.
    if feature_names is None:
        feature_names = feature_cols
    if metrics_log is None:
        metrics_log = metrics

    learnerName = learner.__class__.__name__

    #Transform features into features vector column
    assembler = VectorAssembler(inputCols=feature_names, outputCol="featuresVector")

    #Scale the features
    minmaxscaler = MinMaxScaler(inputCol="featuresVector", outputCol="scaledFeatures")

    #Performance metrics
    # NOTE: per the Spark docs the multiclass 'f1' metric is the weighted F1 over all
    # label values, not the F1 of the churn class alone.
    evaluatorAcc = MulticlassClassificationEvaluator(labelCol='churn', predictionCol='prediction', metricName='accuracy')
    evaluatorF1 = MulticlassClassificationEvaluator(labelCol='churn', predictionCol='prediction', metricName='f1')
    evaluatorAUPR = BinaryClassificationEvaluator(labelCol='churn', metricName='areaUnderPR')

    #Pipeline for classifier using default parameters
    pipeline = Pipeline(stages=[assembler, minmaxscaler, learner])

    #Train model; the timer stops before transform() so only the fit is counted as training time
    start = time()
    model = pipeline.fit(train)
    trainTime = time() - start

    preds = model.transform(test)
    print('Total training time for {}: {} seconds'.format(learnerName, trainTime))

    #Calculate, tabulate and print metrics
    Acc = evaluatorAcc.evaluate(preds)
    F1 = evaluatorF1.evaluate(preds)
    AUPR = evaluatorAUPR.evaluate(preds)

    metrics_log.append([learnerName, Acc, F1, AUPR, trainTime])

    print('Accuracy Score for {} on test set: {} '.format(learnerName, Acc))
    print('F1 Score for {} on test set: {} '.format(learnerName, F1))
    print('Area under P-R curve for {} on test set: {} '.format(learnerName, AUPR))

    return model, metrics_log


#### Train and evaluate initial models

In [12]:
metrics = []

# Baseline candidates; every classifier reads the scaled feature vector and the
# churn label, and the tree ensembles are seeded.
candidates = [
    NaiveBayes(featuresCol='scaledFeatures', labelCol='churn'),
    LogisticRegression(featuresCol='scaledFeatures', labelCol='churn'),
    RandomForestClassifier(featuresCol='scaledFeatures', labelCol='churn', seed=42),
    GBTClassifier(featuresCol='scaledFeatures', labelCol='churn', seed=42),
    LinearSVC(featuresCol='scaledFeatures', labelCol='churn'),
]

# Fit and score each candidate in turn; train_predict appends one row to metrics per call.
fitted = []
for candidate in candidates:
    fittedModel, metrics = train_predict(candidate, traindf, testdf)
    fitted.append(fittedModel)

# Bind the fitted pipeline models to the same names the estimators were created under.
modelNB, modelLR, modelRF, modelGBT, modelSVM = fitted

Total training time for NaiveBayes: 19.98766016960144 seconds
Accuracy Score for NaiveBayes on test set: 0.8448275862068966 
F1 Score for NaiveBayes on test set: 0.813255709807434 
Area under P-R curve for NaiveBayes on test set: 0.3162795555533192 
Total training time for LogisticRegression: 27.134162664413452 seconds
Accuracy Score for LogisticRegression on test set: 0.8103448275862069 
F1 Score for LogisticRegression on test set: 0.827931898712298 
Area under P-R curve for LogisticRegression on test set: 0.35334182967161687 
Total training time for RandomForestClassifier: 5.716929912567139 seconds
Accuracy Score for RandomForestClassifier on test set: 0.8793103448275862 
F1 Score for RandomForestClassifier on test set: 0.8905021173623715 
Area under P-R curve for RandomForestClassifier on test set: 0.6987998047549935 
Total training time for GBTClassifier: 18.816765546798706 seconds
Accuracy Score for GBTClassifier on test set: 0.7758620689655172 
F1 Score for GBTClassifier on test 

In [13]:
#Tabulate metrics: one row per model trained so far, columns named at construction

dfMetrics = pd.DataFrame(
    metrics,
    columns=['Model', 'Accuracy', 'F1 Score', 'AUC P-R', 'Training time'],
)
dfMetrics


Unnamed: 0,Model,Accuracy,F1 Score,AUC P-R,Training time
0,NaiveBayes,0.844828,0.813256,0.31628,19.98766
1,LogisticRegression,0.810345,0.827932,0.353342,27.134163
2,RandomForestClassifier,0.87931,0.890502,0.6988,5.71693
3,GBTClassifier,0.775862,0.807669,0.553506,18.816766
4,LinearSVC,0.827586,0.816961,0.306953,29.593498


In [14]:
#Tabulate metrics
#NOTE(review): this cell repeats the previous cell's tabulation unchanged and can be removed.

dfMetrics=pd.DataFrame(metrics)

dfMetrics.columns = ['Model','Accuracy','F1 Score','AUC P-R' ,'Training time']
dfMetrics



Unnamed: 0,Model,Accuracy,F1 Score,AUC P-R,Training time
0,NaiveBayes,0.844828,0.813256,0.31628,19.98766
1,LogisticRegression,0.810345,0.827932,0.353342,27.134163
2,RandomForestClassifier,0.87931,0.890502,0.6988,5.71693
3,GBTClassifier,0.775862,0.807669,0.553506,18.816766
4,LinearSVC,0.827586,0.816961,0.306953,29.593498


#### Hyperparameter tuning for final model and evaluation

In [15]:
#List RandomForestClassifier's parameters with their descriptions and defaults, to choose the tuning grid
RandomForestClassifier().explainParams()

"cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)\ncheckpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)\nfeatureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the features), 'sqrt' (use sqrt(number of features)), 'log2' (use log2(number of features)

In [16]:
RF = RandomForestClassifier(featuresCol='scaledFeatures', labelCol='churn', seed=42)

#Create assembler and scaler for our pipeline
#Transform features into features vector column
assembler = VectorAssembler(inputCols=feature_cols, outputCol="featuresVector")

#Scale the features
minmaxscaler = MinMaxScaler(inputCol="featuresVector", outputCol="scaledFeatures")

#Performance metrics
evaluatorAcc = MulticlassClassificationEvaluator(labelCol='churn', predictionCol='prediction', metricName='accuracy')
evaluatorF1 = MulticlassClassificationEvaluator(labelCol='churn', predictionCol='prediction', metricName='f1')
evaluatorAUPR = BinaryClassificationEvaluator(labelCol='churn', metricName='areaUnderPR')

#Pipeline wrapping the random forest that is tuned below
pipeline = Pipeline(stages=[assembler, minmaxscaler, RF])

#Search grid: 2 impurity measures x 3 depths x 3 bin counts = 18 candidate settings
# NOTE(review): the baseline forest was trained with default parameters; if the default
# maxBins (32 in the Spark docs - confirm for this version) is not in the grid, the
# search cannot reproduce the baseline configuration.
paramGrid = ParamGridBuilder().addGrid(RF.impurity, ['entropy', 'gini']) \
                              .addGrid(RF.maxDepth, [3, 4, 5]) \
                              .addGrid(RF.maxBins, [5, 10, 15]) \
                              .build()

#3-fold cross-validation; candidates are ranked by area under the P-R curve
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluatorAUPR,
                    numFolds=3, seed=97)

#Train model; the timer stops before transform() so only the search/fit is counted
start = time() # Get start time
modelFinal = cv.fit(traindf)
end = time() # Get end time
trainTime = end - start

#Get predictions on the held-out test set
preds = modelFinal.transform(testdf)

print('Total training time for {}: {} seconds'.format(RF.__class__.__name__, trainTime))

# Calculate, tabulate and print metrics
Acc = evaluatorAcc.evaluate(preds)
F1 = evaluatorF1.evaluate(preds)
AUPR = evaluatorAUPR.evaluate(preds)

# Replace any row left by an earlier run of this cell (in place, so the list object is
# unchanged); re-running then keeps exactly one tuned-model row in the same position.
tunedLabel = modelFinal.__class__.__name__
metrics[:] = [row for row in metrics if row[0] != tunedLabel]
metrics.append([tunedLabel, Acc, F1, AUPR, trainTime])

print('Accuracy Score for {} on test set: {} '.format(RF.__class__.__name__, Acc))
print('F1 Score for {} on test set: {} '.format(RF.__class__.__name__, F1))
print('Area under P-R curve for {} on test set: {} '.format(RF.__class__.__name__, AUPR))




Total training time for RandomForestClassifier: 114.22470283508301 seconds
Accuracy Score for RandomForestClassifier on test set: 0.9137931034482759 
F1 Score for RandomForestClassifier on test set: 0.9192658147323516 
Area under P-R curve for RandomForestClassifier on test set: 0.511199768368886 


In [26]:
#Tabulate metrics

dfMetrics = pd.DataFrame(
    metrics,
    columns=['Model', 'Accuracy', 'F1 Score', 'AUC P-R', 'Training time'],
)

# Relabel and select the two random-forest rows by recorded model name rather than by
# row position, so the comparison stays correct if the metrics list changes length.
# The tuned model is recorded under the class name of the object returned by cv.fit.
displayNames = {
    "RandomForestClassifier": "Baseline Random Forest",
    "CrossValidatorModel": "Optimised Random Forest",
}
dfMetrics["Model"] = dfMetrics["Model"].replace(displayNames)
dfMetrics[dfMetrics["Model"].isin(list(displayNames.values()))]



Unnamed: 0,Model,Accuracy,F1 Score,AUC P-R,Training time
2,Baseline Random Forest,0.87931,0.890502,0.6988,5.71693
5,Optimised Random Forest,0.913793,0.919266,0.5112,114.224703


In [27]:
def featureImportance(model, top_n=10, feature_names=None):
    '''
    Rank the feature importance of a tree-based model.
    Inputs:
    - model: fitted cross-validation result; the importances are read from the last
             stage of `model.bestModel`, which must expose `featureImportances`
    - top_n: number of rows to return (default 10)
    - feature_names: names matching the importance vector position by position;
                     defaults to the notebook-level `feature_cols`
    Outputs:
    - pandas DataFrame with columns "Feature" and "importanceScore" holding the
      `top_n` highest-scoring features, sorted by descending score.
    Raises:
    - ValueError if the number of names differs from the number of importances.
    '''
    # Fall back to the notebook-level list so existing single-argument calls keep working.
    if feature_names is None:
        feature_names = feature_cols

    featureImportanceList = list(model.bestModel.stages[-1].featureImportances)
    featuresList = list(feature_names)

    # Fail with a clear message instead of relying on pandas' length-mismatch error.
    if len(featuresList) != len(featureImportanceList):
        raise ValueError(
            'Expected {} feature names but got {}'.format(
                len(featureImportanceList), len(featuresList)))

    importanceDict = {"Feature": featuresList, "importanceScore": featureImportanceList}
    pdFeatureImportance = pd.DataFrame(importanceDict).sort_values(by="importanceScore", ascending=False)

    return pdFeatureImportance.head(top_n)


#Rank the features of the cross-validated model and show the ten most important
top10 = featureImportance(modelFinal)
top10

Unnamed: 0,Feature,importanceScore
32,totalWeeksActive,0.097631
35,ActiveWeeksRatio,0.092371
33,totalFortnightsActive,0.06363
2,weeksRegistered,0.057067
1,daysRegistered,0.05327
34,ActiveDaysRatio,0.051185
31,totalDaysActive,0.036815
26,avgThumbsUpPerSession,0.030207
54,avgIntPerSessionWk3,0.027732
41,totalSessionWk1,0.024994
