# Sparkify project (add-ons to workspace notebook from aws emr)

## Instantiate notebook

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, rank, isnan, count, when, col, desc, udf, col, sort_array, asc, avg, lit, substring, from_unixtime
from pyspark.sql.types import Row, DateType, TimestampType
from pyspark.sql.window import Window
from pyspark.sql import functions as func
from pyspark.sql.types import IntegerType
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import pandas as pd

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1551622673720_0005,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
from platform import python_version
print(python_version())

VBox()

2.7.15

In [3]:
# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

VBox()

## Load dataset

In [4]:
# Read in full sparkify dataset
#event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json" # full dataset
event_data = "s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json" # mini-dataset
df = spark.read.json(event_data)
df.head()

VBox()

Row(artist=u'Martha Tilston', auth=u'Logged In', firstName=u'Colin', gender=u'M', itemInSession=50, lastName=u'Freeman', length=277.89016, level=u'paid', location=u'Bakersfield, CA', method=u'PUT', page=u'NextSong', registration=1538173362000, sessionId=29, song=u'Rockpools', status=200, ts=1538352117000, userAgent=u'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId=u'30')

## Data cleaning and preprocessing

In [5]:
w = Window().partitionBy("userId").orderBy(col("ts").desc())

df_cleaned = (df
              .filter(df.userId!='') # Remove rows without UserIds (typically in auth=Logged Out)
              .withColumn('ts_timestamp', from_unixtime((col('ts')/1000)).cast(TimestampType())) # Transform ts to readable format
              .withColumn('registration_timestamp', from_unixtime((col('registration')/1000)).cast(TimestampType())) # Transform registration to readable format
              .withColumn('overallDaysAsCustomer', ((func.max(df['ts']).over(w)-func.min(df['registration']).over(w))/(1000* (60*60*24)))) # Calculate overall days as a customer
              .withColumn("daysSinceRegistration", (df.ts - df.registration)/(1000* (60*60*24))) # Calculate days since registration
              .withColumn("daysToLastInteraction", (func.max(df['ts']).over(w) - df['ts'])/(1000* (60*60*24))) # Calculate days to last interaction
             )

VBox()

## Churn definition

In [6]:
# Identify df with all userId
df_all_ids = df_cleaned.select('userId').dropDuplicates()

# Create Df with all userIds containing cancellation confirmation
df_cancel_conf_ids = df_cleaned.filter(df_cleaned.page =='Cancellation Confirmation').select('userId').dropDuplicates().withColumn('label', lit(1))

# Left Join von df_all_ids on df_cancellation_confirmation with fillna with False
df_churn = df_all_ids.join(df_cancel_conf_ids,on='userID', how='left').fillna(0)

VBox()

In [7]:
df_churn.groupby('label').count().show()

VBox()

+-----+-----+
|label|count|
+-----+-----+
|    1|   52|
|    0|  173|
+-----+-----+

## Feature Engineering
Features implemented:
* artists
  * f01: number of artists listened to
* gender
  * f02: user gender
* level
  * f03: current user level
* page
  * f04: songs played overall
  * f05: (average number of songs played per day (last 7 days)) / (average number of songs played per day (overall))
  * f06: number of downgrades last 7 days
  * f07: number of upgrades last 7 days
  * f08: number of thumbs up last 7 days
  * f09: number of thumbs down last 7 days
  * f10: number of add friend last 7 days
  * f11: number of add to playlist last 7 days
  * f12: number of roll advert last 7 days
* length
  * f13: average length listened to a song
  * f14: length trend: (average length listened to a song (last 7 days) / average length listened to a song (overall)) * (7 / overall days)
* ts
  * f15: number of days as a customer
* sessionId
  * f16: (average number of sessions per day (last 7 days)) / (average number of sessions per day (overall))

##### f01: number of artists listened to

In [8]:
f01 = df_cleaned.select('userId','artist').dropDuplicates().groupBy('userId').count().selectExpr('userId as userId', "Count as n_artisted_listened")

VBox()

##### f02: Gender

In [9]:
f02 = (df_cleaned
 .select('userId','gender')
 .dropDuplicates()
 .replace(["M", "F"], ["0", "1"], "gender")
)

f02 =  f02.withColumn("gender", f02["gender"].cast(IntegerType()))

VBox()

##### f03: current user level

In [10]:
w = Window().partitionBy("userId").orderBy(col("ts").desc())

f03 = (df_cleaned
      .withColumn("rn", row_number().over(w))
      .where(col("rn") == 1)
      .select("userId", "level")
      .replace(["free", "paid"], ["0", "1"], "level"))

f03 =  f03.withColumn("level", f03["level"].cast(IntegerType()))

VBox()

##### f04: songs played overall

In [11]:
f04 = (df_cleaned
       .select('userId','page')
       .where(df_cleaned.page=='NextSong')
       .groupBy('userId')
       .count()
       .selectExpr('userId as userId', "Count as n_songs_played")
      )

VBox()

##### f05: Average number of songs played per day (last 7 days) / overall

In [12]:
avg_number_of_songs_overall = (df_cleaned
                                  .select('userId','overallDaysAsCustomer', 'page')
                                  .filter(df_cleaned.page=='NextSong')
                                  .groupBy('userId','overallDaysAsCustomer')
                                  .count()
                                  .sort(desc('count'))
                                 )
avg_number_of_songs_overall = (avg_number_of_songs_overall
                                  .withColumn('avg_n_songs_overall', avg_number_of_songs_overall['count']/avg_number_of_songs_overall['overallDaysAsCustomer'])
                                  .drop('count','overallDaysAsCustomer')
                                 )

VBox()

In [13]:
avg_number_of_songs_last7 = (df_cleaned
                                  .select('userId','daysToLastInteraction', 'page')
                                  .filter((df_cleaned.daysToLastInteraction<=7)&(df_cleaned.page=='NextSong'))
                                  .groupBy('userId')
                                  .count()
                                  .sort(desc('count'))
                                 )
avg_number_of_songs_last7 = (avg_number_of_songs_last7
                             .withColumn('avg_n_songs_last7', avg_number_of_songs_last7['count']/(7))
                             .drop('count')
                            )

VBox()

In [14]:
f05 = avg_number_of_songs_overall.join(avg_number_of_songs_last7,on='userId', how='inner')
f05 = (f05
       .withColumn('songRatio', f05['avg_n_songs_last7']/f05['avg_n_songs_overall'])
       .drop('avg_n_songs_last7', 'avg_n_songs_overall')
      )

VBox()

##### f06: number of downgrades last 7 days

In [15]:
def count_events_per_user(df, count_column, count_value, last_x_days, feature_name):
    """
        Counts occurences of one value in one column per user for a given timeframe back from the last event of the user
        Input:
            df (pyspark.sql.dataframe.DataFrame): dataframe to be inspected
            count_column (str): name of the column containing the values to be counted
            count_value: value which occurences should be counted
            last_x_days (float): number of days to be considered before last event of the user
            feature_name (str): name of the feature column in the returned dataframe
        Output:
            pyspark.sql.dataframe.DataFrame: Contains userId and feature column containing the counting results
    """
    fxx = (df
          .select('userId', count_column)
          .filter((df_cleaned.daysToLastInteraction<=last_x_days)&(df_cleaned.page==count_value))
          .groupBy('userId')
          .count()
          .selectExpr("userId as userId", "count as {}".format(feature_name))
          )
    return fxx

VBox()

In [16]:
f06 = count_events_per_user(df_cleaned, 'page', 'Downgrade',7,'n_downgrades_last7')

VBox()

##### f07: number of upgrades last 7 days

In [17]:
f07 = count_events_per_user(df_cleaned, 'page', 'Upgrade',7,'n_upgrades_last7')

VBox()

##### f08: number of thumbs up last 7 days

In [18]:
f08 = count_events_per_user(df_cleaned, 'page', 'Thumbs Up',7,'n_thumbs_up_last7')

VBox()

##### f09: number of thumbs down last 7 days

In [19]:
f09 = count_events_per_user(df_cleaned, 'page', 'Thumbs Down',7,'n_thumbs_down_last7')

VBox()

##### f10: number of add friend last 7 days

In [20]:
f10 = count_events_per_user(df_cleaned, 'page', 'Add Friend',7,'n_add_friend_last7')

VBox()

##### f11: number of add to playlist last 7 days

In [21]:
f11 = count_events_per_user(df_cleaned, 'page', 'Add to Playlist',7,'n_add_playlist_last7')

VBox()

##### f12: number of roll advert last 7 days

In [22]:
f12 = count_events_per_user(df_cleaned, 'page', 'Roll Advert',7,'n_roll_advert_last7')

VBox()

##### f13: average length listened to a song

In [23]:
f13 = (df_cleaned
       .select('userId','length', 'page')
       .filter(df_cleaned.page=='NextSong')
       .groupBy('userId')
       .mean()
       .select(col("userId"), col("avg(length)").alias("average_length"))
      )

VBox()

##### f14: length trend: (average length listened to a song last 7 days / average length all songs) (7 / overall days)

In [24]:
avg_length_overall = f13.select(col("userId"), col("average_length").alias("average_length_overall"))

VBox()

In [25]:
avg_length_last7 = (df_cleaned
       .select('userId','length', 'page','overallDaysAsCustomer')
       .filter((df_cleaned.daysToLastInteraction<=7) & (df_cleaned.page=='NextSong'))
       .groupBy('userId','overallDaysAsCustomer')
       .mean()
       .select(col("userId"), col('overallDaysAsCustomer'), col("avg(length)").alias("average_length_last7"))
      )

VBox()

In [26]:
f14 = avg_length_overall.join(avg_length_last7,on='userId', how='inner')
f14 = (f14
       .withColumn('lengthRatio', (f14['average_length_last7']/f14['average_length_overall'])*(7/f14['overallDaysAsCustomer']))
       .drop('average_length_overall', 'overallDaysAsCustomer', 'average_length_last7')
      )

VBox()

##### f15: number of days as a customer

In [27]:
f15 = (df_cleaned
       .select('userId','overallDaysAsCustomer')
       .dropDuplicates()
      )

VBox()

##### f16: average number of sessions per day past 7 days / average number of sessions per day overall

In [28]:
avg_number_of_sessions_overall = (df_cleaned
                                  .select('userId','overallDaysAsCustomer','sessionId')
                                  .dropDuplicates()
                                  .groupBy('userId','overallDaysAsCustomer')
                                  .count()
                                 )
avg_number_of_sessions_overall = (avg_number_of_sessions_overall
                                  .withColumn('avg_n_sessions_overall', avg_number_of_sessions_overall['count']/avg_number_of_sessions_overall['overallDaysAsCustomer'])
                                  .drop('count','overallDaysAsCustomer')
                                 )

VBox()

In [29]:
avg_number_of_sessions_past7 = (df_cleaned
                                .filter(df_cleaned.daysToLastInteraction<=7)
                                  .select('userId','overallDaysAsCustomer','sessionId')
                                  .dropDuplicates()
                                  .groupBy('userId','overallDaysAsCustomer')
                                  .count()
                                 )
avg_number_of_sessions_past7 = (avg_number_of_sessions_past7
                                  .withColumn('avg_n_sessions_past7', avg_number_of_sessions_past7['count']/7)
                                  .drop('count','overallDaysAsCustomer')
                                 )

VBox()

In [30]:
f16 = avg_number_of_sessions_overall.join(avg_number_of_sessions_past7,on='userId', how='inner')
f16 = (f16
       .withColumn('sessionRatio', (f16['avg_n_sessions_past7']/f16['avg_n_sessions_overall']))
       .drop('avg_n_sessions_past7', 'avg_n_sessions_overall', 'overallDaysAsCustomer')
      )

VBox()

##### Merge all features

In [31]:
feature_dataframes_to_merge = [f01,
                               f02,
                                  f03,
                                  f04,
                                  f05,
                                  f06,
                                  f07,
                                  f08,
                                  f09,
                                  f10,
                                  f11,
                                  f12,
                                  f13,
                                  f14,
                                  f15,
                                  f16
                                 ]

VBox()

In [32]:
df_features = df_churn
for feature_dataframe_to_merge in feature_dataframes_to_merge:
    df_features = df_features.join(feature_dataframe_to_merge,on='userId',how='left')

df_features = df_features.fillna(0)  # Required for counted values like upgrades. If no upgrade exists there is no row in the feature dataframe

VBox()

In [33]:
df_features.limit(5).toPandas().T

VBox()

                              0         1          2         3         4
userId                   100010    200002        125       124        51
label                         0         0          1         0         1
n_artisted_listened         253       340          9      2233      1386
gender                        1         0          0         1         0
level                         0         1          0         1         1
n_songs_played              275       387          8      4079      2111
songRatio              0.693739   3.10408    10.1881   1.04589   1.26265
n_downgrades_last7            0         2          0         4        11
n_upgrades_last7              1         0          0         0         0
n_thumbs_up_last7             2         6          0        14        47
n_thumbs_down_last7           1         0          0         1        12
n_add_friend_last7            0         0          0         2         4
n_add_playlist_last7          1         2          

## Modeling

### Preprocess for machine learning

In [34]:
input_columns = df_features.columns[2:]

VBox()

In [35]:
assembler = VectorAssembler(inputCols=input_columns, outputCol="features_tmp")
scaler = StandardScaler(inputCol="features_tmp", outputCol="features")

pipeline = Pipeline(stages=[assembler, scaler])

VBox()

In [36]:
data_model = pipeline.fit(df_features).transform(df_features)
data_model = data_model.select('userId','features','label')

VBox()

In [37]:
# Split data into training and test set
data_train, data_test = data_model.randomSplit([0.8, 0.2], seed=42)

VBox()

In [38]:
data_train.limit(5).toPandas().T

VBox()

                                                          0  ...                                                  4
userId                                               100010  ...                                                 54
features  [0.4189040110171376, 2.0012710519236623, 0.0, ...  ...  [2.889278653062866, 2.0012710519236623, 2.0844...
label                                                     0  ...                                                  1

[3 rows x 5 columns]

### Build and evaluate models with grid search and cross validation

In [39]:
def evaluate_model(model, data_test, model_name):
    """
        Evaluates model calculating metrics based on train and test data
        Input:
            model (classifier): Trained classifier
            data_test (pyspark.sql.dataframe.DataFrame): Testing data
            model_name (str): Name of the model
        Output:
            pyspark.sql.dataframe.DataFrame: test results
            float: test accuracy
            float: test f1-score
    """
    
    test_result = model.transform(data_test)
    
    # Calculate accuracy and F-1 score
    accuracy_evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
    accuracy = accuracy_evaluator.evaluate(test_result.select(col('label'), col('prediction')))

    f1_score_evaluator = MulticlassClassificationEvaluator(metricName='f1')
    f1_score = f1_score_evaluator.evaluate(test_result.select(col('label'), col('prediction')))

    print('{}\nAccuracy test:\t{:.5f}\nF-1 score:\t{:.5f}\n'.format(model_name, accuracy, f1_score))
    
    return test_result, accuracy, f1_score

VBox()

In [40]:
def perform_crossvalidation(classifier, param_grid, data_train, number_folds = 3, metric_name='f1'):
    """
        Creates and trains a model based on a classifier with grid search and cross validation
        Input:
            classifier (pyspark.ml.classification-object): classifier to be trained
            param_grid (pyspark.ml.tuning.ParamGridBuilder-object): parameter grid to be applied
            data_train (pyspark.sql.dataframe.DataFrame): data to train the models on
            number_folds (int): number of cross validation iterations
            metric_name (str): name of the optimization metric to be applied
        Output:
            trained pyspark.ml.tuning.CrossValidator-object
    """

    pipeline = Pipeline(stages=[classifier])

    crossval = CrossValidator(estimator = pipeline,
                             estimatorParamMaps = param_grid,
                             evaluator = MulticlassClassificationEvaluator(metricName=metric_name),
                             numFolds = number_folds)
    model = crossval.fit(data_train)
    return model

VBox()

#### Logistic regression

In [41]:
# Optimize a logistic model through Grid Search
classifier = LogisticRegression()
param_grid = (ParamGridBuilder()
    .addGrid(classifier.regParam,[0.0, 0.1])
                .build())
model_lr = perform_crossvalidation(classifier, param_grid, data_train)

VBox()

Exception in thread cell_monitor-41:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.6/site-packages/awseditorssparkmonitoringwidget-1.0-py3.6.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 844



In [42]:
model_lr.avgMetrics

VBox()

[0.757560587813493, 0.7615296518407464]

In [43]:
test_results_lr, accuracy_lr, f1_score_lr = evaluate_model(model_lr, data_test, 'LOGISTIC REGRESSION GRID')

VBox()

LOGISTIC REGRESSION GRID
Accuracy test:	0.82353
F-1 score:	0.77972

#### Random forest

In [44]:
classifier = RandomForestClassifier()
param_grid = (ParamGridBuilder()
                .addGrid(classifier.maxDepth,[2, 5, 8])
                            .build())
model_rf = perform_crossvalidation(classifier, param_grid, data_train)

VBox()

Exception in thread cell_monitor-44:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.6/site-packages/awseditorssparkmonitoringwidget-1.0-py3.6.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 154, in cell_monitor
    job_group_filtered_jobs = [job for job in jobs_data if job['jobGroup'] == str(statement_id)]
  File "/opt/conda/lib/python3.6/site-packages/awseditorssparkmonitoringwidget-1.0-py3.6.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 154, in <listcomp>
    job_group_filtered_jobs = [job for job in jobs_data if job['jobGroup'] == str(statement_id)]
KeyError: 'jobGroup'

Exception in thread cell_monitor-43:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/

In [45]:
model_rf.avgMetrics

VBox()

[0.743706107344416, 0.7656457466695343, 0.7765434887174016]

In [46]:
test_results_rf, accuracy_rf, f1_score_rf = evaluate_model(model_rf, data_test, 'RANDOM FOREST GRID')

VBox()

Exception in thread cell_monitor-45:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.6/site-packages/awseditorssparkmonitoringwidget-1.0-py3.6.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 154, in cell_monitor
    job_group_filtered_jobs = [job for job in jobs_data if job['jobGroup'] == str(statement_id)]
  File "/opt/conda/lib/python3.6/site-packages/awseditorssparkmonitoringwidget-1.0-py3.6.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 154, in <listcomp>
    job_group_filtered_jobs = [job for job in jobs_data if job['jobGroup'] == str(statement_id)]
KeyError: 'jobGroup'

Exception in thread cell_monitor-46:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/

RANDOM FOREST GRID
Accuracy test:	0.88235
F-1 score:	0.86695

#### Gradient boosting

In [41]:
classifier = GBTClassifier()
param_grid = (ParamGridBuilder()
                .addGrid(classifier.maxDepth,[2, 5, 8])
                            .build())
model_gb = perform_crossvalidation(classifier, param_grid, data_train)

VBox()

Exception in thread cell_monitor-41:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.6/site-packages/awseditorssparkmonitoringwidget-1.0-py3.6.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 2440



In [43]:
model_gb.avgMetrics

VBox()

[0.7633458631451194, 0.7272825137455322, 0.7374694451128305]

In [42]:
test_results_gb, accuracy_gb, f1_score_gb = evaluate_model(model_gb, data_test, 'GRADIENT BOOSTING GRID')

VBox()

GRADIENT BOOSTING GRID
Accuracy test:	0.82353
F-1 score:	0.81419