# Sparkify EMR Cluster Model Training, Tuning and Evaluation

In [1]:
import datetime
import time

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.types import IntegerType, TimestampType
from pyspark.sql.functions import udf
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import desc, asc, sum as Fsum
from pyspark.sql.functions import month, dayofmonth, dayofweek, hour

from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler
from pyspark.ml.feature import StringIndexer, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
6,application_1593600233643_0007,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("Our first Python Spark SQL example") \
    .getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Load and clean the data

In [3]:
print('Loading the data ...')
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
event_log = spark.read.json(event_data)
print('Data loaded!\n')

print('Cleaning the data ...')

# removes the empty user ID
event_log = event_log.filter(event_log.userId != "")

# Flags if a user cancelled at any point
flag_cancel_event = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0
                        , IntegerType())
event_log = event_log.withColumn('cancelled'
                                 , flag_cancel_event(event_log.page))

# all rows after cancel event set churned to 1 else 0
windowval = Window.partitionBy('userId')\
                    .orderBy(desc('ts'))\
                    .rangeBetween(Window.unboundedPreceding, 0)
event_log = event_log.withColumn('churned', Fsum('cancelled').over(windowval))
print('Data cleaned!\n')


print('Extracting useable features from datetime timestamp ...')
get_timestamp = udf(lambda x:
                        datetime.datetime.fromtimestamp(int(int(x)/1000))
                    , TimestampType())
event_log = event_log.withColumn('ts', get_timestamp(event_log.ts)) \
                    .withColumn('hour', hour('ts')) \
                    .withColumn('day', dayofmonth('ts')) \
                    .withColumn('month', month('ts')) \
                    .withColumn('weekday', dayofweek('ts'))
print('Features extracted and added to table!\n')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Loading the data ...
Data loaded!

Cleaning the data ...
Data cleaned!

Extracting useable features from datetime timestamp ...
Features extracted and added to table!

# Feature Engineering

In [4]:
avg_daily_use = event_log.groupby(['userId','month','day']).count() \
                        .sort('userId','month','day') \
                        .groupby('userId').avg('count') \
                        .withColumnRenamed('avg(count)','avg_daily_songplays')
avg_daily_use.createOrReplaceTempView('avg_daily_use_table')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
event_log.createOrReplaceTempView('event_log_table')
build_user_table = spark.sql('''
SELECT
    distinct e.userId
    , sum(CASE WHEN page="NextSong" THEN 1 ELSE 0 END) as num_songplays
    , sum(CASE WHEN page="Thumbs Up" THEN 1 ELSE 0 END) as num_thumbs_up
    , sum(CASE WHEN page="Thumbs Down" THEN 1 ELSE 0 END) as num_thumbs_down
    ,sum(CASE WHEN page="Add to Playlist" THEN 1 ELSE 0 END) as num_plylst_add
    , sum(CASE WHEN page="Add Friend" THEN 1 ELSE 0 END) as num_frnd_add
    , sum(CASE WHEN page="Save Settings" THEN 1 ELSE 0 END) as num_sav_set
    , sum(CASE WHEN page="Roll Advert" THEN 1 ELSE 0 END) as num_ad_rolls
    , count(distinct sessionId) as num_sessions
    , max(CASE WHEN level="paid" THEN 1 else 0 END) as had_paid
    , max(adu.avg_daily_songplays) as avg_daily_songplays
    , max(churned) as churned
FROM
    event_log_table e
LEFT JOIN avg_daily_use_table adu
    ON e.userId = adu.userId
GROUP BY
    e.userId
''')
build_user_table.show(n=5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------------+-------------+---------------+--------------+------------+-----------+------------+------------+--------+-------------------+-------+
| userId|num_songplays|num_thumbs_up|num_thumbs_down|num_plylst_add|num_frnd_add|num_sav_set|num_ad_rolls|num_sessions|had_paid|avg_daily_songplays|churned|
+-------+-------------+-------------+---------------+--------------+------------+-----------+------------+------------+--------+-------------------+-------+
|1000280|         1022|           53|             33|            25|          14|          1|          74|          22|       1| 62.714285714285715|      1|
|1002185|         1778|           92|             14|            49|          25|          3|           1|          17|       1|  99.04761904761905|      0|
|1017805|          250|            7|              4|             5|          13|          1|           6|           3|       1|               80.0|      0|
|1030587|         1472|           66|             16|     

# Model Build

## Vectorise and scale the data

In [6]:
assembler = VectorAssembler(inputCols=['num_songplays','num_thumbs_up',
                                       'num_thumbs_down','num_plylst_add',
                                       'num_frnd_add','num_sav_set',
                                       'num_ad_rolls','num_sessions',
                                       'avg_daily_songplays']
                            , outputCol='NumFeatures')
build_user_table = assembler.transform(build_user_table)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
scaler = Normalizer(inputCol='NumFeatures', outputCol='ScaledNumFeatures')
build_user_table = scaler.transform(build_user_table)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
data = build_user_table.select(col('churned').alias('label')
                               , col('ScaledNumFeatures').alias('features'))
data.head(n=3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(label=1, features=DenseVector([0.9931, 0.0515, 0.0321, 0.0243, 0.0136, 0.001, 0.0719, 0.0214, 0.0609])), Row(label=0, features=DenseVector([0.9966, 0.0516, 0.0078, 0.0275, 0.014, 0.0017, 0.0006, 0.0095, 0.0555])), Row(label=0, features=DenseVector([0.9503, 0.0266, 0.0152, 0.019, 0.0494, 0.0038, 0.0228, 0.0114, 0.3041]))]

## Split the data into a train and test set

In [9]:
train, test = data.randomSplit([0.8,0.2], seed=42)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Logistic Regression

Instantiate the logistic regression model and define the evaluator. Then train and evaluate the data by the F1 score.

In [10]:
lr = LogisticRegression(maxIter=10, regParam=0.0)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
model = lr

# fit the dataset
print('Logistic Regression is training...\n')
model = model.fit(train)

# predict
print('Logistic Regression is predicting...\n')
pred_test = model.transform(test)

# metrics
print('Logistic Regression is evaluating...')    
f1_score = evaluator.evaluate(pred_test.select('label','prediction')
                              ,{evaluator.metricName: 'f1'})
print('Test F1-score: {}\n'.format(f1_score))   

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Logistic Regression is training...

Logistic Regression is predicting...

Logistic Regression is evaluating...
Test F1-score: 0.7015055350033608

## Decision Tree

Instantiate the decision tree classifier model and define the evaluator. Then train and evaluate the data by the F1 score.

In [13]:
dt = DecisionTreeClassifier(seed=7)
model = dt

# fit the dataset
print('Decision Tree is training...\n')
model = model.fit(train)

# predict
print('Decision Tree is predicting...\n')
pred_test = model.transform(test)

# metrics
print('Decision Tree is evaluating...')    
f1_score = evaluator.evaluate(pred_test.select('label','prediction')
                              ,{evaluator.metricName: 'f1'})
print('Test F1-score: {}\n'.format(f1_score))   

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Decision Tree is training...

Decision Tree is predicting...

Decision Tree is evaluating...
Test F1-score: 0.7071845030653348

# Tune the Logistic Regression model

- Define the parameter grid we want to optimse on
- Define the cross validator object with the optimising metric as the F1 score
- Train the model with the cross validator object
- Transform the test data using the trained model
- Print the accuracy and F1 score of the model

In [14]:
paramGrid = ParamGridBuilder(). \
            addGrid(lr.elasticNetParam,[0.1,0.5,1]). \
            addGrid(lr.regParam,[0.01,0.05,0.1]). \
            build()

crossval = CrossValidator(estimator=lr,
                         estimatorParamMaps=paramGrid,
                         evaluator=MulticlassClassificationEvaluator(metricName='f1'),
                         numFolds=3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
cv_lr = crossval.fit(train)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-15:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 1425



In [16]:
cv_lr_results = cv_lr.transform(test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
print('Accuracy: {}'.format(evaluator\
                            .evaluate(cv_lr_results.select('label'
                                                           ,'prediction')
                                      , {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator\
                            .evaluate(cv_lr_results.select('label'
                                                           ,'prediction')
                                      , {evaluator.metricName: "f1"})))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Accuracy: 0.7741077366437596
F-1 Score:0.6946604184991364

# Tune the Decision Tree Classifier model

- Define the parameter grid we want to optimse on
- Define the cross validator object with the optimising metric as the F1 score
- Train the model with the cross validator object
- Transform the test data using the trained model
- Print the accuracy and F1 score of the model

In [18]:
paramGrid = ParamGridBuilder(). \
            addGrid(dt.impurity, ['entropy','gini']). \
            addGrid(dt.maxDepth, [x for x in range(5,25,5)]). \
            build()

crossval = CrossValidator(estimator=lr,
                 estimatorParamMaps=paramGrid,
                 evaluator=MulticlassClassificationEvaluator(metricName='f1'),
                 numFolds=3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
cv_dt = crossval.fit(train)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-19:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 2992



In [20]:
cv_dt_results = cv_dt.transform(test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
print('Accuracy: {}'.format(evaluator\
                            .evaluate(cv_dt_results.select('label'
                                                           ,'prediction')
                                      , {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator\
                            .evaluate(cv_dt_results.select('label'
                                                           ,'prediction')
                                      , {evaluator.metricName: "f1"})))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Accuracy: 0.7714475726003104
F-1 Score:0.7015055350033608