In [1]:

import ibmos2spark
# @hidden_cell
credentials = {
    'endpoint': 'YOUR_ENDPOINT_HERE',
    'service_id': 'YOUR_SERVICE_ID',
    'iam_service_endpoint': 'YOUR_IAM_SERVICE_ACCOUNT',
    'api_key': 'YOUR_API_KEY'
}

configuration_name = 'CONFIGURATION'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Since JSON data can be semi-structured and contain additional metadata, it is possible that you might face issues with the DataFrame layout.
# Please read the documentation of 'SparkSession.read()' to learn more about the possibilities to adjust the data loading.
# PySpark documentation: http://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json

# df_data_1 = spark.read.json(cos.url('medium-sparkify-event-data.json', 'sparkify-donotdelete-pr-dt38yc2ek9gkw7'))
# df_data_1.take(5)


Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200414144241-0002
KERNEL_ID = 42aa9e45-fc0c-4252-857b-c3067905a3bd


In [2]:
from pyspark.sql.functions import avg, col, concat, desc, explode, lit, min, max as Fmax, split, udf
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.types import IntegerType
from pyspark.sql import Window

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, GBTClassifier,LinearSVC, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import datetime

from time import time

In [3]:
df_data_1 = spark.read.json(cos.url('medium-sparkify-event-data.json', 'sparkify-donotdelete-pr-dt38yc2ek9gkw7'))
df_data_1.take(5)

[Row(artist='Martin Orford', auth='Logged In', firstName='Joseph', gender='M', itemInSession=20, lastName='Morales', length=597.55057, level='free', location='Corpus Christi, TX', method='PUT', page='NextSong', registration=1532063507000, sessionId=292, song='Grand Designs', status=200, ts=1538352011000, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='293'),
 Row(artist="John Brown's Body", auth='Logged In', firstName='Sawyer', gender='M', itemInSession=74, lastName='Larson', length=380.21179, level='free', location='Houston-The Woodlands-Sugar Land, TX', method='PUT', page='NextSong', registration=1538069638000, sessionId=97, song='Bulls', status=200, ts=1538352025000, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='98'),
 Row(artist='Afroman', auth='Logged In', firstName='Maverick', gender='M', 

In [4]:
df = df_data_1.dropna(how = 'any', subset = ['userId', 'sessionId'])
df = df[df.userId != ""]
df.count()

528005

In [5]:
new_ts = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).strftime("%m-%d-%Y %H:%M:%S"))
df = df.withColumn('updated_registration', new_ts('registration'))
df = df.withColumn('updated_ts', new_ts('ts'))

In [6]:
flag_downgrade_event = udf(lambda x: 1 if x == "Submit Downgrade" else 0, IntegerType())
df = df.withColumn("downgrade_event", flag_downgrade_event("page"))

windowval = Window.partitionBy('userId')
df = df.withColumn('downgrade', Fmax('downgrade_event').over(windowval))

In [7]:
# define a churn label
churn_value = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0, IntegerType())
df = df.withColumn("churn_value", churn_value("page"))

churn_window = Window.partitionBy('UserId')
df = df.withColumn("user_churn", Fmax('churn_value').over(churn_window))

### Feature Engineering

In [8]:
total_songs = df.select('userId', 'song').groupBy('userId').count().withColumnRenamed('count', 'total_songs')
total_length = df.select('userId', 'length').groupBy('userId').sum().withColumnRenamed('sum(length)', 'total_length')
thumbs_up = df.select('userId', 'page').where(df.page == 'Thumbs Up').groupBy('userId').count().withColumnRenamed('count', 'thumbs_up')
thumbs_down = df.select('userId', 'page').where(df.page == 'Thumbs Down').groupBy('userId').count().withColumnRenamed('count', 'thumbs_down')
user_lifetime = df.select('userId', 'registration', 'ts').withColumn('tot_lifetime', (df.ts - df.registration)).groupBy('userId').agg({'tot_lifetime' : 'max'})\
    .withColumnRenamed('max(tot_lifetime)', 'tot_lifetime').select('userId', (col('tot_lifetime')/1000/3600/24).alias('user_lifetime'))
friends_added = df.select('userId', 'page').where(df.page == 'Add Friend').groupBy('userId').count().withColumnRenamed('count', 'friends_added')
songs_per_session = df.where('page == "NextSong"').groupby(['userId', 'sessionId']).count().groupby('userId')\
          .agg({'count' : 'avg'}).withColumnRenamed('avg(count)', 'songs_per_session')
total_artists = df.filter(df.page == "NextSong").select("userId", "artist").dropDuplicates().groupby("userId").count()\
         .withColumnRenamed("count", "total_artists")
gender = df.select('userId', 'gender').dropDuplicates().replace(['F', 'M'], ['1', '0'], 'gender').select('userId', col('gender').cast('int'))
playlist_added = df.select('userID','page').where(df.page == 'Add to Playlist').groupBy('userID').count().withColumnRenamed('count', 'playlist_added')
downgrade = df.select('userId', 'downgrade').dropDuplicates().select('userId', col('downgrade'))
user_churn = df.select('userId', col('user_churn')).dropDuplicates()


In [9]:
final_df = total_songs.join(total_length, 'userId', 'inner').join(thumbs_up, 'userId', 'inner')\
                .join(thumbs_down, 'userId', 'inner').join(user_lifetime, 'userId', 'inner').join(songs_per_session, 'userId', 'inner')\
                .join(friends_added, 'userId', 'inner').join(playlist_added, 'userId', 'inner').join(gender, 'userId', 'inner')\
                .join(total_artists, 'userId', 'inner').join(downgrade, 'userId', 'inner')\
                .join(user_churn, 'userId', 'inner').drop('userId')

final_df.show(10)

+-----------+------------------+---------+-----------+------------------+------------------+-------------+--------------+------+-------------+---------+----------+
|total_songs|      total_length|thumbs_up|thumbs_down|     user_lifetime| songs_per_session|friends_added|playlist_added|gender|total_artists|downgrade|user_churn|
+-----------+------------------+---------+-----------+------------------+------------------+-------------+--------------+------+-------------+---------+----------+
|        137|25870.950739999997|        4|          3|14.328449074074074|              48.0|            3|             1|     1|           94|        0|         1|
|        395| 78695.82683999998|       15|          5| 53.32314814814814|              62.0|            2|             6|     0|          287|        0|         1|
|         84|15813.771070000003|        3|          1|105.42211805555554|20.666666666666668|            3|             2|     0|           61|        0|         0|
|       2156|   

### Modeling

In [10]:
assmblr = VectorAssembler(inputCols = ['total_songs', 'total_length', 'thumbs_up', 'thumbs_down', 'user_lifetime', 
                                       'songs_per_session', 'friends_added', 'playlist_added', 'gender', 'total_artists', 
                                       'downgrade'],
                          outputCol = "VectFeatures")
final_df = assmblr.transform(final_df)

In [11]:
scaler = StandardScaler(inputCol = 'VectFeatures', outputCol = "scaled_features", withStd = True)
scaler_model = scaler.fit(final_df)
final_df = scaler_model.transform(final_df)

In [12]:
data = final_df.select(final_df.user_churn.alias("label"), final_df.scaled_features.alias("features"))

In [13]:
train, rest = data.randomSplit([0.7, 0.3])
val, test = rest.randomSplit([0.5, 0.5])

In [14]:
train.select('label').agg({'label': 'sum'}).collect()

[Row(sum(label)=57)]

In [15]:
train.count()

250

In [16]:
dummy_classifier = val.withColumn('prediction', lit(0.0))

print('Dummy Classifier metrics:')
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print('Accuracy: {}'.format(evaluator.evaluate(dummy_classifier, {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator.evaluate(dummy_classifier, {evaluator.metricName: "f1"})))
print('Recall Score:{}'.format(evaluator.evaluate(dummy_classifier, {evaluator.metricName: "weightedRecall"})))

Dummy Classifier metrics:
Accuracy: 0.7755102040816326
F-1 Score:0.6774571897724607
Recall Score:0.7755102040816326


In [19]:
lr = LogisticRegression(maxIter=15)
rf = RandomForestClassifier()
svc = LinearSVC(maxIter=15)
gb = GBTClassifier(maxIter=15)

In [18]:
model_lr = lr.fit(train)
results_lr = model_lr.transform(val)

print('Logistic Regression metrics:')
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print('Accuracy: {}'.format(evaluator.evaluate(results_lr, {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator.evaluate(results_lr, {evaluator.metricName: "f1"})))
print('Recall Score:{}'.format(evaluator.evaluate(results_lr, {evaluator.metricName: "weightedRecall"})))

Logistic Regression metrics:
Accuracy: 0.8163265306122449
F-1 Score:0.7624711423030751
Recall Score:0.8163265306122449


In [20]:
f1_evaluation = MulticlassClassificationEvaluator(metricName = 'f1')
param_grid = ParamGridBuilder().build()

lrs = LogisticRegression(maxIter=15)
cv_lr = CrossValidator(estimator = lrs, estimatorParamMaps = param_grid, evaluator = f1_evaluation, numFolds = 3)
model_lr_cv = cv_lr.fit(train)
results_lr_cv = model_lr_cv.transform(val)

print('Logistic Regression Cross-Val metrics:')
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print('Accuracy: {}'.format(evaluator.evaluate(results_lr_cv, {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator.evaluate(results_lr_cv, {evaluator.metricName: "f1"})))

Logistic Regression Cross-Val metrics:
Accuracy: 0.8163265306122449
F-1 Score:0.7624711423030751


In [21]:
f1_evaluation = MulticlassClassificationEvaluator(metricName = 'weightedRecall')
param_grid = ParamGridBuilder().build()

lrs = LogisticRegression(maxIter=15)
cv_lr = CrossValidator(estimator = lrs, estimatorParamMaps = param_grid, evaluator = f1_evaluation, numFolds = 3)
model_lr_cv = cv_lr.fit(train)
results_lr_cv = model_lr_cv.transform(val)

print('Logistic Regression Cross-Val metrics:')
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print('Accuracy: {}'.format(evaluator.evaluate(results_lr_cv, {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator.evaluate(results_lr_cv, {evaluator.metricName: "f1"})))
print('Recall Score:{}'.format(evaluator.evaluate(results_lr_cv, {evaluator.metricName: "weightedRecall"})))

Logistic Regression Cross-Val metrics:
Accuracy: 0.8163265306122449
F-1 Score:0.7624711423030751
Recall Score:0.8163265306122449


In [22]:
model_rf = rf.fit(train)
results_rf = model_rf.transform(val)

print('Random Forest metrics:')
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print('Accuracy: {}'.format(evaluator.evaluate(results_rf, {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator.evaluate(results_rf, {evaluator.metricName: "f1"})))
print('Recall Score:{}'.format(evaluator.evaluate(results_rf, {evaluator.metricName: "weightedRecall"})))

Random Forest metrics:
Accuracy: 0.8367346938775511
F-1 Score:0.7978620019436345
Recall Score:0.836734693877551


In [23]:
model_svc = svc.fit(train)
results_svc = model_svc.transform(val)

print('Linear SVC metrics:')
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print('Accuracy: {}'.format(evaluator.evaluate(results_svc, {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator.evaluate(results_svc, {evaluator.metricName: "f1"})))

Linear SVC metrics:
Accuracy: 0.7755102040816326
F-1 Score:0.6774571897724607


In [24]:
model_gb = gb.fit(train)
results_gb = model_gb.transform(val)

print('GB Tree metrics:')
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print('Accuracy: {}'.format(evaluator.evaluate(results_gb, {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator.evaluate(results_gb, {evaluator.metricName: "f1"})))

GB Tree metrics:
Accuracy: 0.7551020408163265
F-1 Score:0.7181433549029368


### Tuning Random Forest Model

In [28]:
rf_tune = RandomForestClassifier()

param_grid_rf_tune = ParamGridBuilder().addGrid(rf_tune.maxDepth, [5, 10])\
                                   .addGrid(rf_tune.numTrees, [20, 30, 40, 50]).build()

f1_evaluation = MulticlassClassificationEvaluator(metricName = 'f1')

cv_rf_tune = CrossValidator(estimator = rf_tune, estimatorParamMaps = param_grid_rf_tune, evaluator = f1_evaluation, numFolds = 3)

model_rf_tune = cv_rf_tune.fit(train)
results_rf_tune = model_rf_tune.transform(val)

print('Tuned Random Forest metrics:')
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print('Accuracy: {}'.format(evaluator.evaluate(results_rf_tune, {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator.evaluate(results_rf_tune, {evaluator.metricName: "f1"})))

Tuned Random Forest metrics:
Accuracy: 0.8571428571428571
F-1 Score:0.8298336201950659


In [39]:
model_rf_tune.bestModel.explainParams()

"cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. (default: False)\ncheckpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext (default: 10)\nfeatureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n]. (default: auto)\nfeaturesCol: features column name (default: features)\nimpurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)\nlabelCol: label column name (default: label)\nmaxBins: Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for

In [41]:
results_test = model_rf_tune.transform(test)

### Testing model performance for Final Model

In [42]:
print('Tuned Random Forest metrics:')
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print('Accuracy: {}'.format(evaluator.evaluate(results_test, {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator.evaluate(results_test, {evaluator.metricName: "f1"})))

Tuned Random Forest metrics:
Accuracy: 0.7391304347826086
F-1 Score:0.6814774430966335


Features in model: |total_songs|      total_length|thumbs_up|thumbs_down|user_lifetime| songs_per_session|friends_added|playlist_added|gender|total_artists|downgrade

In [43]:
model_rf_tune.bestModel.featureImportances

SparseVector(11, {0: 0.0883, 1: 0.0539, 2: 0.082, 3: 0.0799, 4: 0.3667, 5: 0.1184, 6: 0.0834, 7: 0.0704, 8: 0.019, 9: 0.0296, 10: 0.0084})