# Sparkify Project Workspace (AWS EMR)
This notebook contains the analysis of the full Sparkify dataset for the capstone project of the Udacity Nanodegree. The analysis was first developed on the 'mini' dataset on a desktop PC, in a Jupyter PySpark container. Here it is repeated on the full dataset on an AWS EMR cluster, leaving out the ETL/EDA steps of the original analysis.

In [1]:
from pyspark.sql import SparkSession

# Spark session & context (in an EMR notebook `spark` already exists, so getOrCreate() returns that session)
spark = SparkSession.builder.appName("Sparkify").getOrCreate()
sc = spark.sparkContext

Starting Spark application

ID,YARN Application ID,Kind,State
1,application_1602345845742_0002,pyspark,idle

SparkSession available as 'spark'.

In [2]:
from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, PCA, RegexTokenizer, StandardScaler, StopWordsRemover, StringIndexer, VectorAssembler, VectorIndexer
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, DecisionTreeClassifier, GBTClassifier, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import udf, concat, col, lit, when, count, min as f_min, max as f_max
from pyspark.sql.types import IntegerType, StringType, FloatType, BooleanType
from pyspark.sql.window import Window
import re


# Load and Clean Dataset
In this notebook the full dataset, `sparkify_event_data.json`, is read from S3; the original analysis used the mini dataset, `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data, for example records without userIds or sessionIds.

In [3]:
# Set S3 path (this is being run from an AWS EMR cluster notebook)
s3_path='s3n://udacity-dsnd/sparkify/sparkify_event_data.json'


In [4]:
# Load to dataframe and persist
df = spark.read.json(s3_path)
df.persist()


DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

# Clean data
These cleaning steps were determined on the mini dataset and are repeated here, on the working assumption that the full dataset can be cleaned in the same way.

In [5]:
# Cleaning step determined on the mini dataset: drop rows with a null sessionId, userId or registration, then drop exact duplicate rows
df_clean=df.dropna(how='any',subset=['sessionId','userId','registration']).distinct()
df_clean.count()


25480720
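
A plain-Python sketch of what the cleaning rule above does to individual records; the records are hypothetical. It also shows a caveat: `dropna` only removes nulls, so an empty-string `userId` survives unless another subset column is null. It is assumed here (not checked in this notebook) that logged-out events in the Sparkify logs carry `userId == ""` and a null `registration`, which would explain why `registration` is part of the subset.

```python
from typing import Any, Dict, Iterable, List

# Subset used in df.dropna(how='any', subset=[...]) above
KEY_COLUMNS = ("sessionId", "userId", "registration")

def clean_events(events: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Mimic dropna(how='any', subset=KEY_COLUMNS) followed by distinct()."""
    seen = set()
    cleaned = []
    for event in events:
        # dropna: drop the record if any key column is null (None); "" is NOT null
        if any(event.get(c) is None for c in KEY_COLUMNS):
            continue
        # distinct: drop exact duplicates of a record already kept
        key = tuple(sorted(event.items()))
        if key in seen:
            continue
        seen.add(key)
        cleaned.append(event)
    return cleaned

# Hypothetical records
events = [
    {"userId": "10", "sessionId": 1, "registration": 1538, "page": "NextSong"},
    {"userId": "10", "sessionId": 1, "registration": 1538, "page": "NextSong"},  # duplicate
    {"userId": "", "sessionId": 2, "registration": None, "page": "Home"},  # dropped: null registration
    {"userId": "", "sessionId": 3, "registration": 1538, "page": "Home"},  # kept: "" is not null
]
cleaned = clean_events(events)
print(len(cleaned))  # 2
```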

### Define Churn
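This repeats the Churn1/Churn2 labelling done on the mini dataset: a user is labelled Churn1 if they ever reached the 'Cancellation Confirmation' page, and Churn2 if they ever visited the 'Downgrade' page.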

In [6]:
# Flag the churn-defining events with Spark column expressions (no Python UDF needed),
# so the labelling scales to the full dataset
# Churn1: 'Cancellation Confirmation' events
df_clean=df_clean.withColumn('Churn1',col('page')=='Cancellation Confirmation')

# Churn2: 'Downgrade' events
df_clean=df_clean.withColumn('Churn2',col('page')=='Downgrade')


In [7]:
# Get list of churned users
churned_users=[x.userId for x in df_clean.where(df_clean.Churn1==True).select('userId').distinct().collect()]


In [8]:
# Number of churned users
len(churned_users)


5003

In [9]:
# Get list of downgrade users
downgrade_users=[x.userId for x in df_clean.where(df_clean.Churn2==True).select('userId').distinct().collect()]


In [10]:
# Number of downgrade users
len(downgrade_users)


15209

In [11]:
# Propagate the labels to every row belonging to a user who cancelled / visited the Downgrade page
df_clean2=df_clean.withColumn('Churn1',when(col('userId').isin(churned_users),True).otherwise(False))
df_clean2=df_clean2.withColumn('Churn2',when(col('userId').isin(downgrade_users),True).otherwise(False))



# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow these steps:
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

In [12]:
# Create a US 'state' column from "City, ST" locations (null-safe)
udf_getstate=udf(lambda x:x.split(',')[-1].strip() if x else None,StringType())
df_clean2=df_clean2.withColumn('location_state',udf_getstate(col('location')))

# Generate a 'platform' column from the user agent string
def platform_filter(userAgentString):
    platform_list=['Windows','Linux','iPad','iPhone','Macintosh']

    if userAgentString is None:
        return None
    # The first matching platform in list order wins
    for platform in platform_list:
        if platform in userAgentString:
            return platform

    return None

udf_getplatform=udf(platform_filter,StringType())
df_clean2=df_clean2.withColumn('platform',udf_getplatform(col('userAgent')))

# Generate per-user aggregates...

# One row per user: labels and categorical columns
df_features=df_clean2.dropDuplicates(subset=['userId']).select(['userId','Churn1','Churn2','location_state','platform','gender'])

# Average number of songs played per session, per user
ave_songs_per_session=(df_clean2.filter(col('page')=='NextSong')
    .groupBy('userId','sessionId').count()
    .groupBy('userId').avg('count')
    .withColumnRenamed('avg(count)','ave_songs_per_session'))


In [13]:
# Average session duration in seconds, per user
ave_session_length=(df_clean2.groupBy('userId','sessionId')
    .agg(((f_max('ts')-f_min('ts'))/1000).alias('session_length'))
    .groupBy('userId').avg('session_length')
    .withColumnRenamed('avg(session_length)','ave_session_length'))


In [14]:
# Average number of 404 errors per session, per user
# (averaged over the sessions that had at least one error; sessions without errors are not counted)
error_rate_per_user=(df_clean2.filter(df_clean2.status==404)
    .groupBy('userId','sessionId').count()
    .groupBy('userId').avg('count')
    .withColumnRenamed('avg(count)','ave_errors_per_session'))


In [15]:
# Some users have zero error rate (the server never returned a 404 for them) and are not represented in error_rate_per_user...
error_rate_per_user.count()


11272

In [16]:
# Join aggregates to features table; the left join keeps users with no 404 errors (null ave_errors_per_session)
df_features=df_features.join(ave_songs_per_session,on='userId').join(ave_session_length,on='userId').join(error_rate_per_user,on='userId',how='left')


In [17]:
# Users with no 404 errors have a null ave_errors_per_session after the join: fill with 0
df_features=df_features.na.fill(0.,subset=["ave_errors_per_session"])


In [18]:
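# Churned users remaining in df_features (5003 before the joins; presumably one churned user
# without any NextSong events is dropped by the inner join with ave_songs_per_session)
df_features.where(df_features.Churn1==True).select('userId').distinct().count()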


5002

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize. In this notebook the users are split 80/20 into training and test sets, and 3-fold cross-validation on the training set takes the place of a separate validation set.
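
In Spark's `MulticlassClassificationEvaluator`, `metricName='f1'` is the weighted F1: the F1 of each class, weighted by how often that class occurs among the true labels. A plain-Python sketch of that definition (the labels below are hypothetical) shows why it is still generous to a model that never predicts churn:

```python
from collections import Counter
from typing import Sequence

def weighted_f1(y_true: Sequence[int], y_pred: Sequence[int]) -> float:
    """Per-class F1, weighted by each class's share of y_true."""
    n = len(y_true)
    total = 0.0
    for label, support in Counter(y_true).items():
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == label and p == label)
        predicted = sum(1 for p in y_pred if p == label)
        precision = tp / predicted if predicted else 0.0
        recall = tp / support
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += (support / n) * f1
    return total

# Hypothetical test set: 8 non-churned (0) and 2 churned (1) users
y_true = [0] * 8 + [1] * 2
never_churn = [0] * 10                 # accuracy 0.8
mixed = [0] * 7 + [1] + [1, 0]         # accuracy 0.8, finds one of the two churners

print(weighted_f1(y_true, never_churn))  # about 0.711
print(weighted_f1(y_true, mixed))        # about 0.8
```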

In [19]:
# Encode label: 1 = churned (Churn1), 0 = not churned
df_features = df_features.withColumn('label',col('Churn1').cast('int'))
df_features = StringIndexer(inputCol="label", outputCol="label_idx").fit(df_features).transform(df_features)
# Encode categorical feature columns
df_features = StringIndexer(inputCol="location_state", outputCol="location_state_idx").fit(df_features).transform(df_features)
df_features = StringIndexer(inputCol="platform", outputCol="platform_idx").fit(df_features).transform(df_features)     
df_features = StringIndexer(inputCol="gender", outputCol="gender_idx").fit(df_features).transform(df_features)
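
`StringIndexer` orders values by descending frequency by default (`stringOrderType='frequencyDesc'`), so `label_idx = 1.0` means "churned" only because churners are the minority. A plain-Python sketch of that ordering; breaking ties alphabetically is an assumption of this sketch:

```python
from collections import Counter
from typing import Dict, Iterable

def frequency_desc_index(values: Iterable[str]) -> Dict[str, float]:
    """Most frequent value -> 0.0, next -> 1.0, ... (ties broken alphabetically here)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}

# Hypothetical label column: 3 non-churned users ("0") and 1 churned user ("1")
print(frequency_desc_index(["0", "0", "1", "0"]))  # {'0': 0.0, '1': 1.0}
# If churners were the majority, the mapping would flip
print(frequency_desc_index(["1", "1", "0"]))       # {'1': 0.0, '0': 1.0}
```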


In [20]:
# Encode Churn2 (visited the Downgrade page) as an indexed categorical feature
df_features = df_features.withColumn('Churn2_int',col('Churn2').cast('int'))
df_features = StringIndexer(inputCol="Churn2_int", outputCol="Churn2_idx").fit(df_features).transform(df_features)


In [21]:
# Drop vector columns left over from a previous run of the cells below (no-op if they don't exist)
df_features=df_features.drop('catFeatures','features','indexedFeatures','numFeatures','normFeatures')


In [22]:
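# Assemble the categorical columns into one vector; VectorIndexer then treats features with
# at most maxCategories (default 20) distinct values as categorical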
categorical_features = ["platform_idx","gender_idx","Churn2_idx"]

assembler = VectorAssembler(inputCols=categorical_features, outputCol='catFeatures')
features_vec = assembler.transform(df_features)

indexer = VectorIndexer(inputCol="catFeatures", outputCol="indexedFeatures")
indexerModel= indexer.fit(features_vec)
df_features = indexerModel.transform(features_vec)


In [23]:
# Normalize the numerical features
# Note: Normalizer rescales each row (user) to unit L1 norm; it does not scale each feature column
numerical_features = ['ave_songs_per_session','ave_session_length','ave_errors_per_session']

assembler = VectorAssembler(inputCols=numerical_features, outputCol='numFeatures')
features_vec = assembler.transform(df_features)

normalizer = Normalizer(inputCol="numFeatures", outputCol="normFeatures", p=1.0)
df_features = normalizer.transform(features_vec)
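
`Normalizer` works row by row, whereas `StandardScaler` works column by column. A plain-Python sketch of the two operations on two hypothetical users makes the difference visible; the functions mimic `Normalizer(p=1.0)` and `StandardScaler()` with its defaults (`withMean=False`, `withStd=True`, sample standard deviation):

```python
from statistics import stdev
from typing import List

Matrix = List[List[float]]

def l1_normalize_rows(rows: Matrix) -> Matrix:
    """Like Normalizer(p=1.0): divide each row (user) by its own L1 norm."""
    out = []
    for row in rows:
        norm = sum(abs(v) for v in row)
        out.append([v / norm if norm else 0.0 for v in row])
    return out

def scale_columns_by_std(rows: Matrix) -> Matrix:
    """Like StandardScaler() defaults: divide each feature column by its sample std."""
    stds = [stdev(column) for column in zip(*rows)]
    return [[v / s if s else 0.0 for v, s in zip(row, stds)] for row in rows]

# Hypothetical users: [ave_songs_per_session, ave_session_length (s), ave_errors_per_session]
users = [[20.0, 600.0, 0.0],
         [10.0, 300.0, 2.0]]

print(l1_normalize_rows(users))
print(scale_columns_by_std(users))
```

With row-wise L1 normalization the two users' song features come out almost identical (about 0.032), although one plays twice as many songs per session, and the session-length column dominates every row. Column-wise scaling keeps those between-user differences.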


In [24]:
# Merge numerical and categorical feature vectors
assembler = VectorAssembler(inputCols=['normFeatures','indexedFeatures'], outputCol='features')
df_features = assembler.transform(df_features)


In [25]:
# 80/20 train/test split (fixed seed so the split is reproducible)
(training_data, test_data) = df_features.randomSplit([0.8,0.2], seed=42)


In [26]:
rf = RandomForestClassifier(labelCol='label_idx',featuresCol='features')
rf_paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [3,4,5]).addGrid(rf.maxDepth, [3,5]).build()

lr = LogisticRegression(labelCol='label_idx',featuresCol='features')
lr_paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.01,0.1,0.3]).addGrid(lr.maxIter, [20]).build()

dt = DecisionTreeClassifier(labelCol='label_idx',featuresCol='features',seed=5)
dt_paramGrid = ParamGridBuilder().addGrid(dt.maxBins, [32]).addGrid(dt.maxMemoryInMB,[256]).addGrid(dt.maxDepth,[3,4,5]).build()

gbt = GBTClassifier(labelCol='label_idx',featuresCol='features',maxIter = 10,seed = 38)
gbt_paramGrid = ParamGridBuilder().addGrid(gbt.maxDepth,[3,4,5]).build()

svm = LinearSVC(labelCol='label_idx',featuresCol='features',maxIter = 10)
svm_paramGrid = ParamGridBuilder().addGrid(svm.regParam,[0.01, 0.05, 0.5]).build()

clf_and_pg=[
    (rf,rf_paramGrid),
    (lr,lr_paramGrid),
    (dt,dt_paramGrid),
    (gbt,gbt_paramGrid),
    (svm,svm_paramGrid),
]



In [27]:
def train_evaluate_model(clf,paramGrid):
    print('** '+clf.__class__.__name__+" **")

    # metricName='f1' is the class-weighted F1 score
    f1_evaluator = MulticlassClassificationEvaluator(labelCol='label_idx',predictionCol='prediction',metricName='f1')
    crossval = CrossValidator(estimator=clf,estimatorParamMaps=paramGrid,evaluator=f1_evaluator,numFolds=3)

    # Run cross-validation on the training set and keep the best set of parameters
    model = crossval.fit(training_data)

    # Evaluate the best model on the held-out test set
    predictions = model.transform(test_data)

    acc_evaluator = MulticlassClassificationEvaluator(labelCol='label_idx',predictionCol='prediction',metricName='accuracy')
    print('Accuracy ', acc_evaluator.evaluate(predictions))
    print('f1-score ', f1_evaluator.evaluate(predictions))
    return model,predictions


In [28]:
rf_model,rf_preds=train_evaluate_model(rf,rf_paramGrid)


** RandomForestClassifier **
Accuracy  0.7736376339077783
f1-score  0.6751375174204469

In [29]:
lr_model,lr_preds=train_evaluate_model(lr,lr_paramGrid)

** LogisticRegression **
Accuracy  0.7734047508150909
f1-score  0.6745836316367476

In [30]:
dt_model,dt_preds=train_evaluate_model(dt,dt_paramGrid)

** DecisionTreeClassifier **
Accuracy  0.7724732184443409
f1-score  0.6741252288670756

In [31]:
gbt_model,gbt_preds=train_evaluate_model(gbt,gbt_paramGrid)

** GBTClassifier **
Accuracy  0.7724732184443409
f1-score  0.6745620452222737

In [32]:
svm_model,svm_preds=train_evaluate_model(svm,svm_paramGrid)

** LinearSVC **
Accuracy  0.7734047508150909
f1-score  0.6745836316367476
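
The test scores above can be compared with a trivial baseline that predicts "not churned" for every user. If a fraction p0 of the test users did not churn, that baseline has accuracy p0 and weighted F1 p0 · 2p0/(1 + p0) (class-0 precision p0 and recall 1; the churn class gets F1 0). A sketch that plugs in the printed numbers, under the assumption that the LogisticRegression accuracy equals p0: the baseline formula reproduces the reported LogisticRegression/LinearSVC F1 exactly, which is consistent with both models predicting "not churned" for every test user, and the other models land within about 0.001 of the same baseline.

```python
def majority_baseline(p0: float) -> tuple:
    """Accuracy and weighted F1 of a classifier that always predicts class 0,
    given that a fraction p0 of the evaluated rows belong to class 0."""
    accuracy = p0
    f1_class0 = 2 * p0 * 1.0 / (p0 + 1.0)  # precision = p0, recall = 1
    f1_class1 = 0.0                         # class 1 is never predicted
    weighted_f1 = p0 * f1_class0 + (1 - p0) * f1_class1
    return accuracy, weighted_f1

# Test-set scores printed above: (accuracy, f1)
reported = {
    "RandomForestClassifier": (0.7736376339077783, 0.6751375174204469),
    "LogisticRegression": (0.7734047508150909, 0.6745836316367476),
    "DecisionTreeClassifier": (0.7724732184443409, 0.6741252288670756),
    "GBTClassifier": (0.7724732184443409, 0.6745620452222737),
    "LinearSVC": (0.7734047508150909, 0.6745836316367476),
}

# Assumption: the LogisticRegression accuracy equals the non-churn share of the test set
p0 = reported["LogisticRegression"][0]
base_acc, base_f1 = majority_baseline(p0)
for name, (acc, f1) in reported.items():
    print(f"{name:<24} acc - baseline {acc - base_acc:+.4f}   f1 - baseline {f1 - base_f1:+.4f}")
```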

In [33]:
print(rf_model.bestModel.explainParams())
feature_importances=list(rf_model.bestModel.featureImportances.toArray())


bootstrap: Whether bootstrap samples are used when building trees. (default: True)
cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the featur

In [35]:
feature_names=['ave_songs_per_session','ave_session_length','ave_errors_per_session',"platform_idx","gender_idx","Churn2_idx"]


In [36]:
for fn,fi in zip(feature_names,feature_importances):
    print(fn,fi)


ave_songs_per_session 0.030989477582098978
ave_session_length 0.11121268380475458
ave_errors_per_session 0.3683195352568954
platform_idx 0.046437023166469714
gender_idx 0.009456187452293789
Churn2_idx 0.4335850927374876
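
Spark normalizes random-forest feature importances to sum to 1, so the values above can be read as shares. Sorting them (values copied from the output above) shows that `Churn2_idx` and `ave_errors_per_session` together account for about 80% of the total:

```python
# Importances printed above for the best RandomForestClassifier
importances = {
    "ave_songs_per_session": 0.030989477582098978,
    "ave_session_length": 0.11121268380475458,
    "ave_errors_per_session": 0.3683195352568954,
    "platform_idx": 0.046437023166469714,
    "gender_idx": 0.009456187452293789,
    "Churn2_idx": 0.4335850927374876,
}

# Sort from most to least important and show the running total
ranked = sorted(importances.items(), key=lambda kv: kv[1], reverse=True)
cumulative = 0.0
for name, value in ranked:
    cumulative += value
    print(f"{name:<24} {value:.3f}  cumulative {cumulative:.3f}")
```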