In [24]:
# import libraries
import matplotlib.pyplot as plt
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as Fsum
from pyspark.sql import SparkSession

from pyspark.sql.functions import avg, col, min, max, sum, udf, count, countDistinct,
from pyspark.sql.types import IntegerType, FloatType

from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [2]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("Spark on Sparkify") \
    .getOrCreate()

In [3]:
# read in the small dataset in json format
sparkify_data = 'mini_sparkify_event_data.json'
df = spark.read.json(sparkify_data)

In [4]:
# drop missing values
df = df.dropna(how = 'any', subset = ['registration'])

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

### features from `page`
we have seen that the following page labels are very different in churn/unchurn groups and at the same time there are relatively a large percentage of these labels:
Downgrade, Thumbs Down, Thumbs Up, Home, Add to Playlist, Add Friend, NextSong. We one hot encode these columns and then get the sum as feature in model.

In [5]:
# create new columns to take the pages names
# this is one hot encoding
features_page = ['Downgrade', 'Thumbs Down',' Thumbs Up', 'Home', 
                  'Add to Playlist',' Add Friend', 'NextSong']
for feat in features_page:
    df = df.withColumn(feat.replace(' ',''), (df.page == feat).cast(IntegerType()))

In [6]:
# sum up the counts for every page name
df_page = df.groupBy('userId') \
          .agg(sum('Downgrade'),
               sum('ThumbsDown'),
               sum('ThumbsUp'),
               sum('Home'),
               sum('AddtoPlaylist'),
               sum('AddFriend'),
               sum('NextSong'))

In [7]:
# df_page.head()

### features from `level`
The churn rate in free and paid groups are different by about 8%, we can use this one as our feature.

In [8]:
df = df.withColumn('hasPaid',(df.level == 'paid').cast(IntegerType()))
df_level = df.groupBy('userId') \
             .agg(max('haspaid')) \
             .withColumnRenamed('max(haspaid)','hasPaid')

In [9]:
# df_level.head()

### features from `gender`
The churn rate in free and paid groups are different by about 8%, we can use this one as our feature.

In [10]:
df = df.withColumn('isman',(df.gender == 'M').cast(IntegerType()))
df_gender = df.groupBy('userId') \
              .agg(max('isman')) \
              .withColumnRenamed('max(isman)','isMale')

In [11]:
# df_gender.head()

### features from `registration`
New users tend to churn.

In [12]:
ms2day = udf(lambda x: x/(1000*3600*24))

df_reg = df.groupBy('userId','registration') \
            .agg(max('ts')) \
            .withColumn('sinceRegDays',ms2day(col('max(ts)')-col('registration')).cast(FloatType())) \
            .select('userId','sinceRegDays')

In [13]:
# df_reg.head()

### features from `sessionId`

In [14]:
df_ses = df.groupBy('userId') \
           .agg(countDistinct('sessionId')) \
           .withColumnRenamed('count(DISTINCT sessionId)','numSessions')

In [15]:
# df_ses.head()

### features from `ts`

In [16]:
df_ts = df.groupBy('userId') \
          .agg(max('ts'),min('ts')) \
          .withColumn('activeDays', ms2day(col('max(ts)')-col('min(ts)')).cast(FloatType())) \
          .select('userId','activeDays')

In [17]:
# df_ts.head()

### Preparing for modeling
Some of the above features extracted can be divided by the activeDays column to get action frequency.

In [18]:
# define churn
# just to repeat what was done in the first notebook
user_window = Window.partitionBy('userId')
function = udf(lambda hascancelled : int(hascancelled=='Cancellation Confirmation'), IntegerType())
df = df.withColumn('reachedcancel',function(col('page'))) \
    .withColumn('Churn',Fsum('reachedcancel').over(user_window))

# join all these dataframes on userId
df_ml = df.select('userId','Churn').dropDuplicates()
df_ml = df_ml.join(df_page,on='userId',how='inner') \
             .join(df_level,on='userId',how='inner') \
             .join(df_gender,on='userId',how='inner') \
             .join(df_reg,on='userId',how='inner') \
             .join(df_ses,on='userId',how='inner') \
             .join(df_ts,on='userId',how='inner')

It is nice to try daily number of NextSong, but it is not that meaningful to say number of daily actions if the number is too small(<<1). So I keep the rest as they are.

In [19]:
# create new column DailyNumSongs
df_ml = df_ml.withColumn('DailyNumSongs', col('sum(NextSong)')/col('activeDays'))

In [20]:
# change names of columns
# change churn to label
df_ml = df_ml.withColumnRenamed('sum(Downgrade)','Downgrade') \
             .withColumnRenamed('sum(ThumbsDown)','ThumbsDown') \
             .withColumnRenamed('sum(ThumbsUp)','ThumbsUp') \
             .withColumnRenamed('sum(Home)','Home') \
             .withColumnRenamed('sum(AddtoPlaylist)','AddtoPlaylist') \
             .withColumnRenamed('sum(AddFriend)','AddFriend') \
             .withColumnRenamed('sum(NextSong)','NextSong') \
             .withColumnRenamed('Churn','label')

In [55]:
# df_ml.show(1,vertical=True)

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [25]:
# gender and level can also be made numeric by stringindexers
# and then use a pipeline to put together the indexers and assembler
# but it seems easier to get them ready before this step
# assemble the features
assembler = VectorAssembler(inputCols=['Downgrade', 
                                       'ThumbsDown',
                                       'ThumbsUp',
                                       'Home',
                                       'AddtoPlaylist',
                                       'AddFriend',
                                       'DailyNumSongs', 
                                       'hasPaid',
                                       'isMale',
                                       'sinceRegDays',
                                       'numSessions',
                                       'activeDays'],
                            outputCol='features')
data = assembler.transform(df_ml)

In [26]:
# minmax scale features
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(data)
data = scalerModel.transform(data)

In [27]:
# train test split
train, test = data.randomSplit([0.8, 0.2], seed=42)

### Logistic Regression

In [43]:
# fit model
classifier_lr = LogisticRegression(labelCol="label",
                                featuresCol="scaledFeatures",
                                maxIter=5, 
                                regParam=0.0, 
                                elasticNetParam=0)
classifierModel_lr = classifier_lr.fit(train)

In [44]:
# predict
preds_lr = classifierModel_lr.transform(test)

In [45]:
# evaluate model
def evalModel(preds):
    accEval = MulticlassClassificationEvaluator(metricName='accuracy')
    f1sEval = MulticlassClassificationEvaluator(metricName='f1')
    
    acc = accEval.evaluate(preds.select(col('label'), col('prediction')))
    f1s = f1sEval.evaluate(preds.select(col('label'), col('prediction')))
    
    print('accuracy:', acc)
    print('f1 score:', f1s)
    return

In [46]:
evalModel(preds_lr)

accuracy: 0.7941176470588235
f1 score: 0.7262656475019387


### Random Forest Classifier

In [47]:
# repeat the above steps using random forest classifier
classifier_rf = RandomForestClassifier(labelCol="label",
                                    featuresCol="scaledFeatures",
                                    seed=42)
classifierModel_rf = classifier_rf.fit(train)
preds_rf = classifierModel_rf.transform(test)
evalModel(preds_rf)

accuracy: 0.8823529411764706
f1 score: 0.8669467787114846


### GBT Classifier

In [48]:
# repeat the above steps using gradient boost tree classifier
classifier_gbt = GBTClassifier(labelCol="label",
                           featuresCol="scaledFeatures",
                           seed=42)
classifierModel_gbt = classifier_gbt.fit(train)
preds_gbt = classifierModel_gbt.transform(test)
evalModel(preds_gbt)

accuracy: 0.8529411764705882
f1 score: 0.8558246828143022


### Tuning the model
The randome forest model is the best, so let's tune this one.

In [57]:
RandomForestClassifier??
# MulticlassClassificationEvaluator??

In [52]:
# initiate the model
rf = RandomForestClassifier(labelCol="label",
                            featuresCol="scaledFeatures",
                            seed=42)
# build params
paramGrid = ParamGridBuilder() \
    .addGrid(rf.minInstancesPerNode,[1, 3, 5]) \
    .addGrid(rf.maxDepth,[2, 4, 8]) \
    .build()

# initiate crossvalidator
crossval_rf = CrossValidator(estimator=rf,
                             estimatorParamMaps=paramGrid,
                             evaluator=MulticlassClassificationEvaluator(),
                             numFolds=3)

In [53]:
cvModel_rf = crossval_rf.fit(train)
cvModel_rf.avgMetrics

[0.7421574948767932,
 0.8105860273482082,
 0.8036588602213759,
 0.7421574948767932,
 0.8117395770053698,
 0.8130240381339346,
 0.7421574948767932,
 0.788348089492855,
 0.8060755307071097]

In [56]:
cvModel_rf.bestModel

RandomForestClassificationModel (uid=RandomForestClassifier_419186432dbe) with 20 trees

It is funny that we had actually a better model before the tuning part. Anyway, we now know that the maxDepth=5 is actually the best parameter.

# Conclusion
In this project, I have gone through 6 main steps, every one is very important:
* **data wrangling**  
    In this step, it was important to understand that the missing values are due to users who are not logged in.
* **define the goal**  
    In this step, I define the goal as predicting the churn users.
* **EDA**  
    In this step, I searched for all the promising features that are related to churn by comparing different metrics for churn users and unchurn users.
* **feature engineering**  
    In this step, I extracted the good features to predict churn users.
* **fitting models**  
    In this step, I tried 3 different models, namely Logistic Regression, Random Forest, and Gradient Boost Tree. I found out that Random Forest performs the best.
* **tuning model**  
    In this step, I have tuned the best model, Random Forest, using a parameter grid of minInstancesPerNode and maxDepth.

The challenge of project is that we need to use spark to analyze data, since the dataset is too large that a single computer is not capable of manipulating it. In this project, however, due to very limited time, I have only tried to complete this procedure on a small subset of the full dataset. Hope that I will have chance to run my script on large clusters in the future.

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.