# Sparkify Full dataset run

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DateType
from pyspark.sql.window import Window

from pyspark.ml.feature import CountVectorizer, IDF, CountVectorizerModel
from pyspark.ml.feature import OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.pipeline import PipelineModel

from datetime import datetime
import numpy as np
from itertools import chain
from typing import Dict
import pandas as pd

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1632506132459_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# rimestamp coefficient
TS_COEF = 1000*60*60*24

# today date
TODAY = str(datetime.today().date())

# S3 storage path
my_storage = 's3://sparkify-saved-models/'

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Load and Clean Dataset

In [4]:
# Read in full sparkify dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)
df.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042')

# Feature Engineering

### Compile the modelling dataset
1. Exclude records with empty *userId*.
2. Add label: 1 = Churn, 0 = Not churn. Condition: `page='Cancellation Confirmation'`
3. Remove records of `page='Cancellation Confirmation'` and `page='Cancel'`.
4. Sort dataframe by `userId` and `ts`
5. Aggregate features at user level:
    * create list of songs
    * create list of artists
    * list of page events (Cancellation Confirmation and Cancel events preliminary filtered out to remove the leak)
    * session frequency
    * average number of songs per session
    * binary feature: Male gender = 1/0
    * binary feature: paid acoount = 1/0
    * lifetime (days): time difference between last activity and registration date
    * derive OS/device features from agent values
6. Apply TF-IDF transformation to lists: songs, artists, page events. For songs and artists we keep only top-100 values, page events are all included (since only 22 values in total).  

**Step 1**: Aggregate user-level properties

In [5]:
def preprocess_data(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    """
    Aggregate App data at user level and collects
    dataframe for further feature engineering steps
    =================
    Args:
        df (pyspark Dataframe) : data extraction from Sparkify
        
    Return:
        preprocessed pyspark dataframe
    """
    w = Window.partitionBy(df.userId).orderBy(df.ts)
    w_uid = Window.partitionBy(df.userId)

    preprocessed_df = (df
                       .filter(F.col('userId')!='') #filter out guests
                       .withColumn('cancelled', (F.col('page')=='Cancellation Confirmation').cast(IntegerType())) 
                       .withColumn('churn', F.max('cancelled').over(w_uid)) # define churn label
                       .withColumn('current_level', F.last('level').over(w)) # sort levels of subscription by date
                       .withColumn('last_userAgent', F.last('userAgent').over(w)) # sort agents by date
                       .filter(~F.col('page').isin(['Cancellation Confirmation',
                                                    'Cancel'])) #remove cancellation page events from dataset
                       .groupby('userId') # aggregate features at user level
                       .agg(F.collect_list('artist').alias('artist_list'), # combine into list all artist
                            F.collect_list('song').alias('song_list'), # combine into list all songs
                            F.collect_list('page').alias('page_list'), # combine into list all page events
                            F.countDistinct('sessionId').alias('session_count'), # calculate total number of sessions
                            F.count('song').alias('song_count'), # calculate total number of songs
                            F.first('gender').alias('gender'), # gender data
                            F.last('current_level').alias('current_level'), # take last level value
                            F.max('churn').alias('churn'), 
                            F.min('ts').alias('min_ts'), # start timestamp 
                            F.max('ts').alias('max_ts'), # end timestamp
                            F.last('last_userAgent').alias('last_userAgent'), # recent agent
                            F.min('registration').alias('registration') # registration date
                           )
                       # frequency of sessions
                       .withColumn('session_freq', F.col('session_count')/((F.col('max_ts')-F.col('min_ts'))/TS_COEF))
                       # avg number of songs per session
                       .withColumn('song_per_session', F.col('song_count')/F.col('session_count'))
                       # binary feature: Male = 1/0
                       .withColumn('gender_Male', (F.col('gender')=='M').cast(IntegerType()))
                       # binary feature: paid = 1/0
                       .withColumn('is_paid', (F.col('current_level')=='paid').cast(IntegerType()))
                       # lifetime
                       .withColumn('lifetime', (F.col('max_ts')-F.col('registration'))/TS_COEF)
                       # extract device/OS pointers from agent
                       .withColumn('agent_Windows', F.col('last_userAgent').contains('Windows').cast(IntegerType()))
                       .withColumn('agent_Mac', F.col('last_userAgent').contains('Mac').cast(IntegerType()))
                       .withColumn('agent_iPhone', F.col('last_userAgent').contains('iPhone').cast(IntegerType()))
                       .withColumn('agent_iPad', F.col('last_userAgent').contains('iPad').cast(IntegerType()))
                       .withColumn('agent_Linux', F.col('last_userAgent').contains('Linix').cast(IntegerType()))
                      ).cache()
    
    return preprocessed_df

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Number of unique users in dataset
preprocessed_df = preprocess_data(df)
preprocessed_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

22278

In [7]:
# Check target balance
preprocessed_df.groupby('churn').count().show()

# churn <---> 22.5%

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----+
|churn|count|
+-----+-----+
|    1| 5003|
|    0|17275|
+-----+-----+

**Step 2**: Prepare transformers to collect feature vector

Used features:
* Apply TF-IDF to artist list, song list and page list. We limit vocabSize to 100 elements
* Beside TF-IDF generated features keep session frequency, avg number of songs per session, lifetime, gender, paid, agent based features

In [8]:
def tf_idf_transformer(list_name: str,
                       vocabSize: int=100):
    """
    Combines TF and IDF pyspark transformers
    ------------
    
    Args:
        list_name (string) : prefix of the feature with work list in the format
            prefix_list
        vocabSize (int)    : number of top-output words to keep
    
    Returns:
        tf transformer, idf transformer
    """
    tf = CountVectorizer(inputCol=f"{list_name}_list", outputCol=f"TF_{list_name}", vocabSize=vocabSize)
    tf_idf = IDF(inputCol=f"TF_{list_name}", outputCol=f"TFIDF_{list_name}")
    return tf, tf_idf


artist_tf, artist_tf_idf = tf_idf_transformer('artist')
song_tf, song_tf_idf = tf_idf_transformer('song')
page_tf, page_tf_idf = tf_idf_transformer('page')

assembler = VectorAssembler(inputCols=["TFIDF_artist", "TFIDF_song", "TFIDF_page",
                                       "session_freq", "song_per_session", 
                                       "lifetime", "gender_Male", 
                                       "is_paid", "agent_Windows",
                                       "agent_Mac", "agent_iPhone", "agent_iPad", 
                                       "agent_Linux"], 
                            outputCol="features", 
                            handleInvalid="skip")


feature_pipeline = Pipeline(stages=[artist_tf, artist_tf_idf, 
                                   song_tf, song_tf_idf,
                                   page_tf, page_tf_idf,
                                   assembler
                                   ])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# test the pipeline
test = feature_pipeline.fit(preprocessed_df)
test_df = test.transform(preprocessed_df)
test_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

22261

In [10]:
# extract vocabularies for future explanations
stages = test.stages
vectorizers = [s for s in stages if isinstance(s, CountVectorizerModel)]
vocab_dict = {}
vocab_dict['artist'] = vectorizers[0].vocabulary
vocab_dict['song'] = vectorizers[1].vocabulary
vocab_dict['page'] = vectorizers[2].vocabulary


# extract feature names after vector assembler
attrs = sorted(
    (attr["idx"], attr["name"]) for attr in (chain(*test_df
        .schema["features"]
        .metadata["ml_attr"]["attrs"].values())))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Modeling
We split the full dataset into train (70%) and test (30%). During cross-validation process train data is additionally split into train and validation subsets. Test data is used only to check the model (nexer seen during training).

We try 2 models:
* Random Forest Classifier
* Gradient Boosted Tree Classifier

Note: since we use tree-based models, we don't don't need to scale numerical features.
Our problem is imbalanced: 23% of positive cases (churn) and 67% of negative (stayed). Thus, we use F1-score to tune hyperparameters and check final quality of the model.

In [11]:
(train_data, test_data) = preprocessed_df.randomSplit([0.7, 0.3], seed=10)

# cache dataframes
train_data = train_data.cache()
test_data = test_data.cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
def score_the_model(test_data: pyspark.sql.DataFrame, 
                    model: Pipeline, 
                    metric_name: str='accuracy'):
    """
    Calculate model score by metric given in metric_name
    --------------------
    Args:
        test_data (pyspark DataFrame): dataframe with test samples
        model (Pipeline)             : pretrained model
        metric_name (string)         : one of the MulticlassClassificationEvaluator metrics
    Return:
        None
    """
    # Make predictions
    predictions = model.transform(test_data)

    # Set up evaluator and compute score
    evaluator = MulticlassClassificationEvaluator(
        labelCol="churn", 
        predictionCol="prediction", 
        metricName=metric_name)
    score = evaluator.evaluate(predictions)
    print("Score = ", score)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
def rename(x: str,
           vocab_dict: Dict) -> str:
    """
    Rename raw attribute names according to vocabularies
    ----------------
    Args:
        x (string) : original name
        vocab_dict (Dict) : dictionary containing all vocabularies
    Return:
        string : new name
    """
    if 'TFIDF' in x:
        components = x.split('_')
        new_components = components[:-1]
        new_components.append(vocab_dict[components[1]][int(components[-1])])
        new_x = '_'.join(new_components)
    else:
        new_x = x
    return new_x

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Random Forest Classifier

In [27]:
# Tune model
rf = RandomForestClassifier(labelCol="churn", featuresCol="features", 
                            seed = 10)
rf_pipeline = Pipeline(stages=[feature_pipeline, rf])

# set parameters grid
paramGrid = (ParamGridBuilder()
            .addGrid(rf.maxDepth, [5, 7])
            .addGrid(rf.numTrees, [50, 100])
            .build()
            )

# choose evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="churn", 
                                               predictionCol="prediction", 
                                               metricName="f1")

# define cross-validator
crossval = CrossValidator(estimator=rf_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=10)

# run cross-validation
cvModel = crossval.fit(train_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-27:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 2797



In [28]:
# check best combination of parameters
cvModel.getEstimatorParamMaps()[ np.argmax(cvModel.avgMetrics) ]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{Param(parent='RandomForestClassifier_454cc2d78dab', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 7, Param(parent='RandomForestClassifier_454cc2d78dab', name='numTrees', doc='Number of trees to train (>= 1).'): 50}

In [29]:
# let's test it
score_the_model(test_data, cvModel, metric_name='f1')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Score =  0.7805696127516474

In [30]:
# save model
cvModel.bestModel.write().overwrite().save(my_storage + "/saved_models/rf_model")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
# load model
persistedModel = PipelineModel.load(my_storage + "/saved_models/rf_model")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
feature_importance_tab = pd.DataFrame([(name, persistedModel.stages[-1].featureImportances[idx])
                                       for idx, name in attrs
                                       if persistedModel.stages[-1].featureImportances[idx]],
                                      columns=['feature_name_raw', 'importance'])
feature_importance_tab['feature_name'] = feature_importance_tab['feature_name_raw'].apply(lambda x: rename(x, vocab_dict))
feature_importance_tab.sort_values(by='importance', ascending=False)[:20]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

     feature_name_raw  importance                 feature_name
217      session_freq    0.320661                 session_freq
219          lifetime    0.250284                     lifetime
202      TFIDF_page_2    0.022534         TFIDF_page_Thumbs Up
204      TFIDF_page_5    0.021398        TFIDF_page_Add Friend
208      TFIDF_page_9    0.017630         TFIDF_page_Downgrade
218  song_per_session    0.013578             song_per_session
206      TFIDF_page_7    0.011557            TFIDF_page_Logout
203      TFIDF_page_3    0.011242   TFIDF_page_Add to Playlist
200      TFIDF_page_0    0.011202          TFIDF_page_NextSong
201      TFIDF_page_1    0.010849              TFIDF_page_Home
221           is_paid    0.009743                      is_paid
207      TFIDF_page_8    0.009247       TFIDF_page_Thumbs Down
205      TFIDF_page_6    0.008412             TFIDF_page_Login
3      TFIDF_artist_3    0.006292   TFIDF_artist_Dwight Yoakam
6      TFIDF_artist_6    0.005877  TFIDF_artist_The Bla

In [33]:
feature_importance_tab.to_csv(my_storage+'rf_feature_importance.csv', index=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Gradient Boosted Tree classifier

In [None]:
# Tune model
gbt = GBTClassifier(labelCol="churn", featuresCol="features", seed = 10)
gbt_pipeline = Pipeline(stages=[feature_pipeline, gbt])

# set parameters grid
paramGrid = (ParamGridBuilder()
            .addGrid(gbt.maxDepth, [3, 5])
            .addGrid(gbt.maxIter, [5, 10])
            .build()
            )

# choose evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="churn", 
                                               predictionCol="prediction", 
                                               metricName="f1")

# define cross-validator
crossval = CrossValidator(estimator=gbt_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=10)

# run cross-validation
cvModel = crossval.fit(train_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-18:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 1597



In [None]:
# check best combination of parameters
cvModel.getEstimatorParamMaps()[ np.argmax(cvModel.avgMetrics) ]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{Param(parent='GBTClassifier_d2bfce4b0817', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5, Param(parent='GBTClassifier_d2bfce4b0817', name='maxIter', doc='max number of iterations (>= 0).'): 10}

In [None]:
# let's test it
score_the_model(test_data, cvModel, metric_name='f1')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Score =  0.8593792207860763

In [None]:
# save model
cvModel.bestModel.write().overwrite().save(my_storage + "/saved_models/gbt_model")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# load model
persistedModel = PipelineModel.load(my_storage + "/saved_models/gbt_model")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
feature_importance_tab = pd.DataFrame([(name, persistedModel.stages[-1].featureImportances[idx])
                                       for idx, name in attrs
                                       if persistedModel.stages[-1].featureImportances[idx]],
                                      columns=['feature_name_raw', 'importance'])
feature_importance_tab['feature_name'] = feature_importance_tab['feature_name_raw'].apply(lambda x: rename(x, vocab_dict))
feature_importance_tab.sort_values(by='importance', ascending=False)[:20]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

    feature_name_raw  importance                         feature_name
76      session_freq    0.217340                         session_freq
64      TFIDF_page_2    0.106327                 TFIDF_page_Thumbs Up
66      TFIDF_page_5    0.092529                TFIDF_page_Add Friend
78          lifetime    0.088635                             lifetime
77  song_per_session    0.086767                     song_per_session
63      TFIDF_page_1    0.070272                      TFIDF_page_Home
70      TFIDF_page_9    0.050510                 TFIDF_page_Downgrade
68      TFIDF_page_7    0.026754                    TFIDF_page_Logout
69      TFIDF_page_8    0.025402               TFIDF_page_Thumbs Down
74     TFIDF_page_13    0.022753                   TFIDF_page_Upgrade
79           is_paid    0.017192                              is_paid
1     TFIDF_artist_1    0.013103                TFIDF_artist_Coldplay
40     TFIDF_song_13    0.011670          TFIDF_song_Somebody To Love
9    TFIDF_artist_13

In [26]:
feature_importance_tab.to_csv(my_storage+'gbt_feature_importance.csv', index=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Final check for pretrained models

In [34]:
# dictionary with paths
path_dict = {'rf': my_storage + "/saved_models/rf_model",
             'gbt': my_storage + "/saved_models/gbt_model"}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
# GBT F1-score
pretrainedModel = PipelineModel.load(path_dict['gbt'])
score_the_model(test_data, pretrainedModel, metric_name='f1')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Score =  0.8593792207860763

In [38]:
# GBT Recall
pretrainedModel = PipelineModel.load(path_dict['gbt'])
score_the_model(test_data, pretrainedModel, metric_name='weightedRecall')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Score =  0.8686732554657084

In [36]:
# RF F1-score
pretrainedModel = PipelineModel.load(path_dict['rf'])
score_the_model(test_data, pretrainedModel, metric_name='f1')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Score =  0.7805696127516474

In [39]:
# RF Recall
pretrainedModel = PipelineModel.load(path_dict['rf'])
score_the_model(test_data, pretrainedModel, metric_name='weightedRecall')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Score =  0.8250973345312967