# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster is included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [2]:
# import libraries
import datetime
import seaborn as sns
import time
import matplotlib.pyplot as plt
import pandas as pd
import pyspark.sql.functions as F
import os

from lightgbm import LGBMClassifier
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StandardScaler, OneHotEncoderEstimator, StringIndexer, VectorAssembler # PCA, IDF, 
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col, udf
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import IntegerType, StringType, TimestampType

In [2]:
# create a Spark session
spark = SparkSession.builder \
    .master("local") \
    .appName("Sparkify") \
    .getOrCreate()

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [162]:
events = spark.read.json("data/mini_sparkify_event_data.json")

In [163]:
events.show()

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+---------------+-------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|           page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+---------------+-------------+---------+--------------------+------+-------------+--------------------+------+
|      Martha Tilston|Logged In|    Colin|     M|           50| Freeman|277.89016| paid|     Bakersfield, CA|   PUT|       NextSong|1538173362000|       29|           Rockpools|   200|1538352117000|Mozilla/5.0 (Wind...|    30|
|    Five Iron Frenzy|Logged In|    Micah|     M|           79|    Long|236.09424| free|Bost

The userId contains values with an empty string. These entries need to be removed.

In [11]:
# events where a userId is an empty string are not valid, remove these
valid_events = events.where(col("userId") != "")
print("Number of total events: {}; number of valid events {}".format(events.count(), valid_events.count()))
print("Number of users: {}".format(valid_events.select("userId").distinct().count()))

Number of total events: 286500; number of valid events 278154
Number of users: 225


In [37]:
# registration and ts can both be converted to timestamps
get_date = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0), TimestampType()) # udf to convert to timestamp/date

In [38]:
valid_events = valid_events.withColumn("log_date", get_date(col("ts"))) # date when the log entry was done

In [39]:
# add a column "churn" to the dataframe indicating that a cancellation was confirmed
find_churn = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())

In [40]:
valid_events = valid_events.withColumn("churn", find_churn(col("page")))

In [41]:
user_length = valid_events.groupBy("userId").agg({"length": "mean"}) \
    .withColumnRenamed("avg(length)", "avg_length")

In [42]:
# some values can be set into relation to a certain time period, e.g. the duration the user is active
# hence we need to create a df with all users and their active time period
# first find the first and last log entry for each user and how much log entries exist per user (all actions)
time_df = valid_events.groupBy(["userId"]) \
    .agg(F.sum("churn").alias("churned"), F.min("log_date").alias("first_log"),
         F.max("log_date").alias("last_log"), F.count("page").alias("log_counts"), F.max("ts").alias("last_ts"))

In [43]:
def get_time_difference(date_1, date_2):
    """TODO"""
    # difference between the dates
    delta = date_2 - date_1
    
    # minimum difference is one 1
    if delta.days == 0:
        return 1
    else:
        return delta.days

# create a udf for this function
get_time_difference_udf = udf(get_time_difference, IntegerType())

time_df = time_df.withColumn("duration", get_time_difference_udf(col("first_log"), col("last_log"))) \
            .drop("first_log", "last_log").withColumnRenamed("churned", "label")

In [45]:
# create a dummy dataframe where each action (About, Thumbs Up, ...) from page is a new column with the number
# how often this action appeared in the data for each user
dummy_df = valid_events.select("userId", "page").groupBy("userId").pivot("page") \
    .count().drop("Cancel", "Cancellation Confirmation")
# fill null values
dummy_df = dummy_df.na.fill(0)

In [46]:
user_level = valid_events.orderBy("log_date", ascending=False).groupBy("userId").agg(F.first("level").alias('valid_level'))

In [47]:
user_gender = valid_events.select(["userId", "gender"]).distinct()

In [None]:
# calculate the total amount of days the user listened to music
songs_per_date = valid_events.withColumn("date", F.to_date(col("log_date"))).where(col("page") == "NextSong") \
    .groupBy(["userId", "date"]).agg(F.lit(1).alias("played_music"))
songs_per_day = songs_per_date.groupBy("userId").agg(F.sum("played_music").alias("music_days"))

In [127]:
# join user_df (time_df, dummy_df) with user_level and gender_level and user_length
df = time_df.join(dummy_df, on="userId").join(user_level, on="userId") \
    .join(user_gender, on="userId").join(user_length, on="userId").join(songs_per_day, on="userId")

In [121]:
# divide the actions by the amount of logs or the overall duration of their registration
def divide_columns_by(df, columns, value, appendix):
    """TODO"""
    for name in columns:
        new_name = name+"_"+appendix
        df = df.withColumn(new_name, col(name) / col(value))
    return df

In [108]:
cols_to_divide = ['music_days', 'About', 'Add Friend', 'Add to Playlist', 'Downgrade', 'Error', 'Help', 'Home',
               'Logout', 'NextSong', 'Roll Advert', 'Save Settings', 'Settings', 'Submit Downgrade',
               'Submit Upgrade', 'Thumbs Down', 'Thumbs Up', 'Upgrade']

In [128]:
df_per = divide_columns_by(df, cols_to_divide, "duration", "per_day")
df_model = divide_columns_by(df_per, cols_to_divide, "log_counts", "per_log")

In [129]:
df_model.printSchema()

root
 |-- userId: string (nullable = true)
 |-- label: long (nullable = true)
 |-- log_counts: long (nullable = false)
 |-- last_ts: long (nullable = true)
 |-- duration: integer (nullable = true)
 |-- About: long (nullable = true)
 |-- Add Friend: long (nullable = true)
 |-- Add to Playlist: long (nullable = true)
 |-- Downgrade: long (nullable = true)
 |-- Error: long (nullable = true)
 |-- Help: long (nullable = true)
 |-- Home: long (nullable = true)
 |-- Logout: long (nullable = true)
 |-- NextSong: long (nullable = true)
 |-- Roll Advert: long (nullable = true)
 |-- Save Settings: long (nullable = true)
 |-- Settings: long (nullable = true)
 |-- Submit Downgrade: long (nullable = true)
 |-- Submit Upgrade: long (nullable = true)
 |-- Thumbs Down: long (nullable = true)
 |-- Thumbs Up: long (nullable = true)
 |-- Upgrade: long (nullable = true)
 |-- valid_level: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- avg_length: double (nullable = true)
 |-- music_days

In [None]:
# baseline data
numerical_baseline = ["About", "Add Friend", "Add to Playlist", "Downgrade", "Error",
                      "Help", "Home", "Login", "Logout", "NextSong", "Register", "Roll Advert", "Save Settings",
                      "Settings", "Submit Downgrade", "Submit Registration", "Submit Upgrade", "Thumbs Down",
                      "Thumbs Up", "Upgrade"]
columns_baseline = ["gender", "valid_level"] + numerical_baseline

df_baseline = df_model.select(*columns_baseline)

## Selecting Numerical Features

In [31]:
# load feature importance from csv
num_feature_df = pd.read_csv("data/feature_selection_df.csv").drop("Unnamed: 0", axis=1)
num_feature_df.head()

Unnamed: 0,Feature,Pearson,RFE,Logistics,Random Forest,LightGBM,Total
0,last_ts,True,True,True,True,True,5
1,duration,True,True,True,True,True,5
2,music_days,True,True,True,False,True,4
3,Thumbs Down_per_log,True,True,True,True,False,4
4,Submit Upgrade_per_day,True,True,True,True,False,4


In [33]:
# creating a list of features which do not contain any duplicates e.g. not both Thumbs Down_per_log and Thumbs Down
important_num_features = list(num_feature_df[num_feature_df["Total"] >= 2]["Feature"])
unique_num_features = []
final_num_features = []
for idx, f in enumerate(important_num_features):
    if f.find("_per_day") > -1:
        f_stripped = f.replace("_per_day", "")
    elif f.find("_per_log") > -1:
        f_stripped = f.replace("_per_log", "")
    else:
        f_stripped = f
    # check if feature is already in or not
    if f_stripped not in unique_num_features:
        unique_num_features.append(f_stripped)
        final_num_features.append(f)
        
print("Final {} numerical features selected by feature importance: \n{}".format(len(final_num_features), final_num_features))

Final 22 numerical features selected by feature importance: 
['last_ts', 'duration', 'music_days', 'Thumbs Down_per_log', 'Submit Upgrade_per_day', 'NextSong_per_log', 'Add Friend_per_day', 'About', 'Submit Downgrade', 'Roll Advert', 'Home_per_day', 'Help_per_day', 'Downgrade', 'Add to Playlist', 'log_counts', 'avg_length', 'Upgrade_per_log', 'Thumbs Up_per_day', 'Settings_per_day', 'Save Settings_per_day', 'Logout_per_log', 'Error']


In [130]:
categorical_columns = ["gender", "valid_level"]
columns_for_modeling = ["label"] +  categorical_columns + final_num_features
df_model = df_model.select(*columns_for_modeling)
df_model.printSchema()

root
 |-- label: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- valid_level: string (nullable = true)
 |-- last_ts: long (nullable = true)
 |-- duration: integer (nullable = true)
 |-- music_days: long (nullable = true)
 |-- Thumbs Down_per_log: double (nullable = true)
 |-- Submit Upgrade_per_day: double (nullable = true)
 |-- NextSong_per_log: double (nullable = true)
 |-- Add Friend_per_day: double (nullable = true)
 |-- About: long (nullable = true)
 |-- Submit Downgrade: long (nullable = true)
 |-- Roll Advert: long (nullable = true)
 |-- Home_per_day: double (nullable = true)
 |-- Help_per_day: double (nullable = true)
 |-- Downgrade: long (nullable = true)
 |-- Add to Playlist: long (nullable = true)
 |-- log_counts: long (nullable = false)
 |-- avg_length: double (nullable = true)
 |-- Upgrade_per_log: double (nullable = true)
 |-- Thumbs Up_per_day: double (nullable = true)
 |-- Settings_per_day: double (nullable = true)
 |-- Save Settings_per_day: double (n

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [131]:
# no userId should be in here!
train = df_model.sampleBy("label", fractions={0: 0.8, 1: 0.5}, seed=42) # randomSplit([0.8, 0.2], seed=42)
test = df_model.subtract(train) # for balancing

train_baseline = df_baseline.sampleBy("label", fractions={0: 0.8, 1: 0.5}, seed=42)
test_baseline = df_baseline.subtract(train_baseline)

In [139]:
# handle categorical columns in pipeline
indexers = []

for cat in categorical_columns:
    indexers.append(StringIndexer(inputCol = cat, outputCol = "{}_indexed".format(cat)))

encoder = OneHotEncoderEstimator(inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=["{}_encoded".format(indexer.getOutputCol()) for indexer in indexers])

In [140]:
# numerical_cols = [col.name for col in df.schema.fields if col.dataType != StringType()]
# numerical_cols.remove("label")
# numerical_cols

In [141]:
numeric_assembler = VectorAssembler(inputCols=final_num_features, outputCol="numeric_vectorized")
#Lets scale the data
scaler = StandardScaler(inputCol = "numeric_vectorized", outputCol = "numeric_scaled", withStd = True, withMean = True)


#create final VectorAssembler to push data to ML models
assembler = VectorAssembler(inputCols=["numeric_scaled"] + encoder.getOutputCols(), outputCol="features")

## Random Forst Model

In [142]:
model_rf = RandomForestClassifier(featuresCol="features", labelCol="label")
pipeline_rf = Pipeline(stages= indexers + [encoder, numeric_assembler, scaler, assembler, model_rf])

In [143]:
paramGrid_rf = ParamGridBuilder() \
        .addGrid(model_rf.numTrees, [20,75]) \
        .addGrid(model_rf.maxDepth, [10, 20]) \
        .build() 


crossval_rf = CrossValidator(estimator=pipeline_rf,
                          estimatorParamMaps=paramGrid_rf,
                          evaluator=MulticlassClassificationEvaluator(metricName="f1"),
                          numFolds=3) # here you can set parallelism parameter

In [227]:
#start_time = time.time()
#cvModel_rf = crossval_rf.fit(train)
#end_time = time.time()
#print("Fitting the model took {} s.".format(round(end_time - start_time,2)))

## Logistic Regression

In [144]:
# Initiate log regression model
model_lr = LogisticRegression()

# Make pipeline for lr
pipeline_lr = Pipeline(stages= indexers + [encoder, numeric_assembler, scaler, assembler, model_lr])

# Grid Search Params
paramGrid_lr = ParamGridBuilder() \
    .addGrid(model_lr.maxIter, [10, 20]) \
    .addGrid(model_lr.elasticNetParam, [0, 0.5]) \
    .addGrid(model_lr.regParam,[0.1, 1]) \
    .build()

crossval_lr = CrossValidator(estimator=pipeline_lr,
                          estimatorParamMaps=paramGrid_lr,
                          evaluator=MulticlassClassificationEvaluator(metricName="f1"),
                          numFolds=3) # here you can set parallelism parameter

## GBTC Classifier

In [145]:
# Initiate log regression model
model_gbtc = GBTClassifier()

# Make pipeline for lr
pipeline_gbtc = Pipeline(stages= indexers + [encoder, numeric_assembler, scaler, assembler, model_gbtc])

# Grid Search Params
paramGrid_gbtc = ParamGridBuilder() \
    .addGrid(model_gbtc.maxIter, [10, 12]) \
    .build()

crossval_gbtc = CrossValidator(estimator=pipeline_gbtc,
                          estimatorParamMaps=paramGrid_gbtc,
                          evaluator=MulticlassClassificationEvaluator(metricName="f1"),
                          numFolds=3) # here you can set parallelism parameter

## LightGBM: either use the one from mmlspark or LGBMClassifier


In [402]:
# Initiate 
# from mmlspark.lightgbm import LightGBMClassifier
#model_lgbm = LGBMClassifier()

# Make pipeline for lr
#pipeline_lgbm = Pipeline(stages= indexers + [encoder, numeric_assembler, scaler, assembler, model_lgbm])

# Grid Search Params
#paramGrid_lgbm = ParamGridBuilder() \
#    .addGrid(model_lgbm.learningRate, [0.1, 0.5]) \
#    .addGrid(model_lgbm.numLeaves, [31, 76]) \
#    .addGrid(model_lgbm.numIterations, [100]) \
#    .addGrid(model_lgbm.objective, ["binary"]) \
#    .build()

#crossval = CrossValidator(estimator=pipeline_lgbm,
#                          estimatorParamMaps=paramGrid_lgbm,
#                          evaluator=MulticlassClassificationEvaluator(metricName="f1"),
#                          numFolds=3)

## Baseline

In [None]:
numeric_assembler_baseline = VectorAssembler(inputCols=numerical_baseline, outputCol="numeric_vectorized")
#Lets scale the data
scaler_baseline = StandardScaler(inputCol = "numeric_vectorized", outputCol = "numeric_scaled", withStd = True, withMean = True)


#create final VectorAssembler to push data to ML models
assembler = VectorAssembler(inputCols=["numeric_scaled"] + encoder.getOutputCols(), outputCol="features")
model_baseline = RandomForestClassifier(featuresCol="features", labelCol="label")
pipeline_baseline = Pipeline(stages= indexers + [encoder, numeric_assembler_baseline, scaler_baseline, assembler, model_rf])

In [150]:
def fit_crossval(crossval, train):
    """TODO"""
    start_time = time.time() # start
    cv_model = crossval.fit(train) # fit
    end_time = time.time() # end
    
    print("Fitting the model took {} s.".format(round(end_time - start_time,2)))
    return cv_model

In [152]:
def evaluate_model(model, train, test, metric = 'f1'):
    """TODO"""
    # init evaluator
    evaluator = MulticlassClassificationEvaluator(metricName = metric)
    
    # make predictions
    prediction_result_train = model.transform(train)
    prediction_result_test = model.transform(test)
    
    # calcualte the scores
    score_train = evaluator.evaluate(prediction_result_train)
    score_test = evaluator.evaluate(prediction_result_test)
    print("{} score on training data is {}".format(metric, score_train))
    print("{} score on test data is {}".format(metric, score_test))
    
    # feature importance
    #importance = {}
    #for i in range(len(model.stages[-1].featureImportances)):
    #    importance[features_df.columns[i]] = model.stages[-1].featureImportances[i]
    # 
    #sorted_importance = {k: v for k, v in sorted(importance.items(), key=lambda item: item[1])}
    #print(sorted_importance)
    try:
        print(model.stages[-1].featureImportances)
        try:
            features = prediction_result_test.select("features").distinct().collect()
            print(features)
        except:
            pass
        try:
            numeric_metadata = prediction_result_test.select("features").schema[0].metadata.get('ml_attr').get('attrs').get('numeric')
            binary_metadata = prediction_result_test.select("features").schema[0].metadata.get('ml_attr').get('attrs').get('binary')

            merge_list = numeric_metadata + binary_metadata
            print(merge_list)
    except:
        pass
        
    return score_train, score_test

# Fit Models and Evaluate

In [151]:
crossval_dict = {}
crossval_dict["Baseline"] = crossval_baseline
crossval_dict["Random-Forst"] = crossval_rf
crossval_dict["Logistic Regression"] = crossval_lr
crossval_dict["GBTC"] = crossval_gbtc
result_dict = {}
if len(crossval_dict) > 1:
    # worth caching
    train.cache()
    test.cache()

# tuned models
for cv_key in crossval_dict:
    # fit the model
    print("Fitting {} model ...".format(cv_key))
    if cv_key == "Baseline":
        # fit model
        cv_model = fit_crossval(crossval_dict[cv_key], train_baseline)
        # evaluate model
        score_train, score_test = evaluate_model(cv_model.bestModel, train_baseline, test_baseline)
    else:
        # fit model
        cv_model = fit_crossval(crossval_dict[cv_key], train)
        # evaluate it
        score_train, score_test = evaluate_model(cv_model.bestModel, train, test)
        
    result_dict[cv_key] = [score_train, score_test]

print("-------------Result Summary--------------")
print(result_dict)

Fitting Random-Forst model ...


IllegalArgumentException: 'Field "Thumbs Down_per_log" does not exist.\nAvailable fields: userId, label, log_counts, last_ts, duration, About, Add Friend, Add to Playlist, Downgrade, Error, Help, Home, Logout, NextSong, Roll Advert, Save Settings, Settings, Submit Downgrade, Submit Upgrade, Thumbs Down, Thumbs Up, Upgrade, valid_level, gender, avg_length, music_days, music_days_per_day, About_per_day, Add Friend_per_day, Add to Playlist_per_day, Downgrade_per_day, Error_per_day, Help_per_day, Home_per_day, Logout_per_day, NextSong_per_day, Roll Advert_per_day, Save Settings_per_day, Settings_per_day, Submit Downgrade_per_day, Submit Upgrade_per_day, Thumbs Down_per_day, Thumbs Up_per_day, Upgrade_per_day, CrossValidator_ba8ed13f5ea0_rand, gender_indexed, valid_level_indexed, gender_indexed_encoded, valid_level_indexed_encoded'