In [1]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, isnull, udf, when

from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import datetime

import numpy as np
#import pandas as pd
#import seaborn as sns

#%matplotlib inline
#import matplotlib.pyplot as plt

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1584446657739_0004,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

# Read in full sparkify dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)
df.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042')

In [3]:
# clean up: remove all rows where the userId is empty as these are from events where a user was 
# not logged in which is non-relevant for our churn analysis
df_clean = df.filter(df.userId != "")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
df_clean.describe("userId", "sessionId").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------------+------------------+
|summary|            userId|         sessionId|
+-------+------------------+------------------+
|  count|          26259199|          26259199|
|   mean|1488379.8347142653|100577.99253503505|
| stddev|286970.08894623973| 71909.21077875949|
|    min|           1000025|                 1|
|    max|           1999996|            240381|
+-------+------------------+------------------+

In [5]:
# Add the hour of day when the song was played to the data frame as a feature
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0). hour)
df_clean = df_clean.withColumn("hour", get_hour(df_clean.ts))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Filter the data frame to only contain churn events
df_churn = df_clean.where(df_clean.auth=="Cancelled")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# create a churn column in the dataframe that is 1 for all the events that are from users that have churned
list_churnusers = [(row['userId']) for row in df_churn.collect()]

df_clean_churn = df_clean.withColumn("churn", when(col("userId").isin(list_churnusers),1).otherwise(0))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Now its time create the features for each userId
# Feature 1: Gender
f1 = df_clean_churn.select('userId', 'gender').dropDuplicates().replace(['F', 'M'], ['0', '1'], 'gender').select('userId', col('gender').cast('int'))

f1.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------+
| userId|gender|
+-------+------+
|1679072|     1|
|1141231|     1|
|1163054|     1|
|1585825|     1|
|1367398|     1|
|1458511|     0|
|1048010|     0|
|1769374|     1|
|1996694|     0|
|1502623|     1|
|1766596|     0|
|1136642|     0|
|1076017|     0|
|1439734|     0|
|1859039|     1|
|1998774|     0|
|1377692|     1|
|1403205|     0|
|1226440|     0|
|1739620|     1|
+-------+------+
only showing top 20 rows

In [10]:
# Feature 2: Level
f2 = df_clean_churn.select('userId', 'level').dropDuplicates().replace(['free', 'paid'], ['0', '1'], 'level').select('userId', col('level').cast('int'))

f2.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----+
| userId|level|
+-------+-----+
|1057440|    1|
|1319705|    1|
|1513921|    0|
|1434232|    1|
|1147993|    1|
|1347514|    0|
|1094918|    0|
|1381135|    0|
|1539625|    1|
|1269740|    0|
|1029700|    1|
|1692537|    0|
|1717782|    0|
|1140592|    0|
|1805728|    1|
|1014253|    1|
|1495417|    1|
|1216358|    0|
|1990153|    0|
|1914558|    0|
+-------+-----+
only showing top 20 rows

In [11]:
# Feature 3: Number of songs listened per session
f3  = df_clean_churn.where(df_clean_churn.page == "NextSong").groupby(['userId', 'sessionId']).count().groupby('userId').agg({'count' : 'avg'}).withColumnRenamed('avg(count)', 'songs_per_session')

f3.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------------+
| userId| songs_per_session|
+-------+------------------+
|1690101| 66.16666666666667|
|1396135|            121.75|
|1057724| 98.64102564102564|
|1828442| 21.09090909090909|
|1994878| 77.18518518518519|
|1875484|             59.96|
|1624220|           27.6875|
|1983423| 97.85714285714286|
|1517899| 51.57142857142857|
|1114507| 65.23076923076923|
|1347224|              88.5|
|1017805| 83.33333333333333|
|1553683|              13.0|
|1178731|115.78260869565217|
|1578670|           113.375|
|1917123|54.666666666666664|
|1492713|              31.4|
|1829495|              66.4|
|1351489| 80.23076923076923|
|1658815| 71.06666666666666|
+-------+------------------+
only showing top 20 rows

In [12]:
# Feature 4 : Number of songs played / total number of events per user
f4 = df_clean_churn.select('userId', 'song').groupBy('userId').count().withColumnRenamed('count', 'total_songs')
f4.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----------+
| userId|total_songs|
+-------+-----------+
|1331962|        727|
|1178731|       3165|
|1528396|        154|
|1002185|       2080|
|1333041|        966|
|1567623|       1375|
|1190352|       2622|
|1983423|       1629|
|1271218|       5755|
|1396828|       1519|
|1142513|        618|
|1875484|       1834|
|1083324|       1784|
|1612069|        339|
|1734557|        588|
|1071308|       1693|
|1492713|        629|
|1658815|       1310|
|1057724|       4669|
|1718034|        875|
+-------+-----------+
only showing top 20 rows

In [13]:
# Feature 5: Number of songs added to playlists
f5 = df_clean_churn.select('userId', 'page').where(df_clean_churn.page == 'Add to Playlist').groupBy('userId').count().withColumnRenamed('count', 'playlist_adds')
f5.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------------+
| userId|playlist_adds|
+-------+-------------+
|1002185|           49|
|1178731|           84|
|1567623|           39|
|1331962|           17|
|1190352|           63|
|1271218|          137|
|1983423|           26|
|1396828|           50|
|1142513|           19|
|1083324|           43|
|1734557|           14|
|1071308|           26|
|1612069|            5|
|1492713|           20|
|1588738|           57|
|1069552|           11|
|1339632|          117|
|1965481|           83|
|1200956|          107|
|1803077|           18|
+-------+-------------+
only showing top 20 rows

In [14]:
# Feature 6: Number of friends added
f6 = df_clean_churn.select('userId', 'page').where(df_clean_churn.page == 'Add Friend').groupBy('userId').count().withColumnRenamed('count', 'friends_added')
f6.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------------+
| userId|friends_added|
+-------+-------------+
|1983423|           23|
|1002185|           25|
|1190352|           40|
|1142513|           16|
|1083324|           26|
|1071308|           31|
|1339632|           74|
|1965481|           48|
|1200956|           43|
|1718034|            9|
|1396828|           13|
|1396135|           13|
|1102913|           18|
|1633577|           55|
|1663036|           26|
|1994878|           51|
|1057724|           76|
|1794110|           15|
|1676292|           46|
|1178731|           35|
+-------+-------------+
only showing top 20 rows

In [15]:
# Feature 7: Number of errors
f7 = df_clean_churn.select('userId', 'page').where(df_clean_churn.page == 'Error').groupBy('userId').count().withColumnRenamed('count', 'errors')
f7.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------+
| userId|errors|
+-------+------+
|1331962|     1|
|1500901|     7|
|1663036|     1|
|1178731|     7|
|1339632|     5|
|1200956|     3|
|1619792|     3|
|1983423|     3|
|1633577|     3|
|1002185|     2|
|1658815|     1|
|1875484|     4|
|1271218|     6|
|1519090|     2|
|1151194|     2|
|1083324|     2|
|1734557|     1|
|1102913|     2|
|1190352|     3|
|1142513|     1|
+-------+------+
only showing top 20 rows

In [16]:
# Feature 8: Number of adverts
f8 = df_clean_churn.select('userId', 'page').where(df_clean_churn.page == 'Roll Advert').groupBy('userId').count().withColumnRenamed('count', 'adverts')
f8.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------+
| userId|adverts|
+-------+-------+
|1358765|      4|
|1519090|     33|
|1718034|     19|
|1384823|     23|
|1875484|     43|
|1492713|     36|
|1658815|     19|
|1633577|     51|
|1114507|     13|
|1338783|      9|
|1057724|     67|
|1612069|     19|
|1142513|     36|
|1394508|     50|
|1994878|     34|
|1578670|     10|
|1178731|      7|
|1396828|     10|
|1656082|      7|
|1567623|     20|
+-------+-------+
only showing top 20 rows

In [17]:
# Feature 9: Number of thumbs up
f9 = df_clean_churn.select('userId', 'page').where(df_clean_churn.page == 'Thumbs Up').groupBy('userId').count().withColumnRenamed('count', 'thumbs_up')
f9.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+---------+
| userId|thumbs_up|
+-------+---------+
|1331962|       30|
|1002185|       92|
|1178731|      147|
|1567623|       65|
|1528396|        3|
|1983423|       75|
|1190352|      113|
|1333041|       34|
|1271218|      250|
|1083324|       75|
|1875484|       81|
|1142513|       22|
|1612069|       10|
|1071308|       74|
|1734557|       18|
|1069552|       26|
|1658815|       53|
|1718034|       30|
|1588738|       98|
|1396828|       67|
+-------+---------+
only showing top 20 rows

In [18]:
# Feature 10: Number of thumbs down
f10 = df_clean_churn.select('userId', 'page').where(df_clean_churn.page == 'Thumbs Down').groupBy('userId').count().withColumnRenamed('count', 'thumbs_down')
f10.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----------+
| userId|thumbs_down|
+-------+-----------+
|1351489|          8|
|1390009|          2|
|1358765|          1|
|1178731|         27|
|1500901|         45|
|1384823|          9|
|1492713|          3|
|1588738|         22|
|1331962|          8|
|1875484|         14|
|1396135|          9|
|1394508|         16|
|1578670|         13|
|1718034|          9|
|1190352|         28|
|1339632|         39|
|1994878|         25|
|1002185|         14|
|1794110|          6|
|1612069|          4|
+-------+-----------+
only showing top 20 rows

In [19]:
churn = df_clean_churn.select('userId', col('churn').alias('churn')).dropDuplicates()
churn.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----+
| userId|churn|
+-------+-----+
|1041334|    0|
|1633090|    0|
|1426289|    1|
|1322128|    0|
|1613426|    1|
|1176340|    0|
|1892796|    0|
|1580477|    0|
|1310470|    1|
|1187013|    1|
|1612481|    1|
|1889806|    0|
|1082054|    0|
|1901007|    1|
|1391302|    1|
|1195956|    0|
|1789949|    0|
|1351526|    0|
|1432320|    1|
|1169629|    0|
+-------+-----+
only showing top 20 rows

In [20]:
# as a final step we combine all the features with the churn column into a feature_data dataframe

feature_data = f1.join(f2, 'userId', 'inner').join(f3, 'userId', 'inner').join(f4, 'userId', 'inner')\
             .join(f5, 'userId', 'inner').join(f6, 'userId', 'inner').join(f7, 'userId', 'inner')\
             .join(f8, 'userId', 'inner').join(f9, 'userId', 'inner').join(f10, 'userId', 'inner').join(churn, 'userId', 'inner').drop('userId')

feature_data.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-----+------------------+-----------+-------------+-------------+------+-------+---------+-----------+-----+
|gender|level| songs_per_session|total_songs|playlist_adds|friends_added|errors|adverts|thumbs_up|thumbs_down|churn|
+------+-----+------------------+-----------+-------------+-------------+------+-------+---------+-----------+-----+
|     1|    1|48.666666666666664|       1317|           25|           14|     3|     74|       53|         33|    1|
|     1|    0|48.666666666666664|       1317|           25|           14|     3|     74|       53|         33|    1|
|     0|    1|104.58823529411765|       2080|           49|           25|     2|      1|       92|         14|    0|
|     1|    0| 98.64102564102564|       4669|          135|           76|     1|     67|      200|         29|    0|
|     1|    1| 98.64102564102564|       4669|          135|           76|     1|     67|      200|         29|    0|
|     0|    0|37.916666666666664|        582|           11|     

In [21]:
# before we start the modelling we need to combine the features into a features vector
feature_columns = ['gender', 'level', 'songs_per_session', 'total_songs', 'playlist_adds', 'friends_added', 'errors', 'adverts', 'thumbs_up', 'thumbs_down']
assmbler = VectorAssembler(inputCols = feature_columns, outputCol = "vectorized_features")
feature_data = assmbler.transform(feature_data)
feature_data.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-----+------------------+-----------+-------------+-------------+------+-------+---------+-----------+-----+--------------------+
|gender|level| songs_per_session|total_songs|playlist_adds|friends_added|errors|adverts|thumbs_up|thumbs_down|churn| vectorized_features|
+------+-----+------------------+-----------+-------------+-------------+------+-------+---------+-----------+-----+--------------------+
|     1|    1|48.666666666666664|       1317|           25|           14|     3|     74|       53|         33|    1|[1.0,1.0,48.66666...|
|     1|    0|48.666666666666664|       1317|           25|           14|     3|     74|       53|         33|    1|[1.0,0.0,48.66666...|
|     0|    1|104.58823529411765|       2080|           49|           25|     2|      1|       92|         14|    0|[0.0,1.0,104.5882...|
|     1|    0| 98.64102564102564|       4669|          135|           76|     1|     67|      200|         29|    0|[1.0,0.0,98.64102...|
|     1|    1| 98.64102564102564| 

In [None]:
# in order to enable our machine learning models to work with our data without getting bias though different scales
# we need to scale the data
scaler = StandardScaler(inputCol = 'vectorized_features', outputCol = "scaled_features", withStd = True, withMean = False)
scaler_model = scaler.fit(feature_data)
feature_data = scaler_model.transform(feature_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# here we prepare the data into label and features vector as well was renaming these to use the standard 
# column names used by the used classifiers.
feature_data_final = feature_data.select(['churn', 'scaled_features'])
feature_data_final = feature_data_final.selectExpr("churn as label", "scaled_features as features")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# splitting the data in training and testing data sets.
train_data , test_data = feature_data_final.randomSplit([0.7,0.3], seed = 42)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Fitting a Random Forest Classifier
rfc = RandomForestClassifier()

mce_f1 = MulticlassClassificationEvaluator(metricName = 'f1')
parameter_grid = ParamGridBuilder().build()

cv_rf = CrossValidator(estimator = rfc, estimatorParamMaps = parameter_grid, evaluator = mce_f1, numFolds = 2)

cv_rf_model = cv_rf.fit(train_data)
cv_rf_model.avgMetrics

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.6816315447720964]

In [None]:
# Evaluating the results with regard to f1 score and accuracy
test_result_rf = cv_rf_model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(predictionCol = "prediction")
evaluator_ROC = BinaryClassificationEvaluator(rawPredictionCol='prediction', metricName='areaUnderROC')
print("f1 score: " + str(evaluator.evaluate(test_result_rf, {evaluator.metricName : "accuracy"})))
print("accuracy: " + str(evaluator.evaluate(test_result_rf, {evaluator.metricName : "f1"})))
print("area under ROC: " + str(evaluator_ROC.evaluate(test_result_rf, {evaluator.metricName : "areaUnderROC"})))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

f1 score: 0.7853231106243155
accuracy: 0.6962696408305166
area under ROC: 0.5102765088380845

In [None]:
# Fitting a Gradient Boost Tree Classifier
gbtc = GBTClassifier(maxIter = 5, maxDepth = 5, seed = 42)

mce_f1 = MulticlassClassificationEvaluator(metricName = 'f1')
parameter_grid = ParamGridBuilder().build()

cv_gbtc = CrossValidator(estimator = gbtc, estimatorParamMaps = parameter_grid, evaluator = mce_f1, numFolds = 2)

cv_gbtc_model = cv_gbtc.fit(train_data)
cv_gbtc_model.avgMetrics

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.6970628674152508]

In [None]:
# Evaluating the results with regard to f1 score and accuracy
test_result_gbtc = cv_gbtc_model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(predictionCol = "prediction")
evaluator_ROC = BinaryClassificationEvaluator(rawPredictionCol='prediction', metricName='areaUnderROC')
print("f1 score: " + str(evaluator.evaluate(test_result_gbtc, {evaluator.metricName : "accuracy"})))
print("accuracy: " + str(evaluator.evaluate(test_result_gbtc, {evaluator.metricName : "f1"})))
print("area under ROC: " + str(evaluator_ROC.evaluate(test_result_gbtc, {evaluator.metricName : "areaUnderROC"})))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

f1 score: 0.7882438846294268
accuracy: 0.7085762138808424
area under ROC: 0.5226772841938998

In [None]:
# Fitting a Gradient Boost Tree Classifier and using Grid Search to find the optimal hyperparameters
gbtc_final = GBTClassifier(maxIter = 5, maxDepth = 5, seed = 42)
parameter_grid_final = ParamGridBuilder().addGrid(gbtc_final.maxIter, [5, 10, 15, 20]).addGrid(gbtc_final.maxDepth, [5, 10]).build()

cv_gbtc_final = CrossValidator(estimator = gbtc_final, estimatorParamMaps = parameter_grid_final, evaluator = mce_f1, numFolds = 3)
cv_gbtc_model_final = cv_gbtc_final.fit(train_data)
cv_gbtc_model_final.avgMetrics

In [None]:
# Evaluating the results with regard to f1 score and accuracy
test_result_gbtc_final = cv_gbtc_model_final.transform(test_data)

evaluator = MulticlassClassificationEvaluator(predictionCol = "prediction")
evaluator_ROC = BinaryClassificationEvaluator(rawPredictionCol='prediction', metricName='areaUnderROC')

print("f1 score: " + str(evaluator.evaluate(test_result_gbtc_final, {evaluator.metricName : "accuracy"})))
print("accuracy: " + str(evaluator.evaluate(test_result_gbtc_final, {evaluator.metricName : "f1"})))
print("area under ROC: " + str(evaluator_ROC.evaluate(test_result_gbtc_final, {evaluator.metricName : "areaUnderROC"})))