# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [1]:
# import libraries
import re
from datetime import datetime

import numpy as np
import pandas as pd
import seaborn as sns
%matplotlib inline
import matplotlib.pyplot as plt

import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import IntegerType, StringType, FloatType
from pyspark.sql.functions import udf, desc, asc
from pyspark.sql.functions import sum as Fsum
# Note: sum, min, max and round below shadow the Python built-ins of the same name
from pyspark.sql.functions import concat, lit, avg, split, isnan, isnull, when, count, col, \
    sum, mean, stddev, min, max, countDistinct, approx_count_distinct, size, collect_set, round

from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, CountVectorizer, IDF, StringIndexer, VectorAssembler, \
    Normalizer, StandardScaler, MinMaxScaler, OneHotEncoder, OneHotEncoderEstimator
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, LinearSVC, NaiveBayes, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics

In [2]:
# create a Spark session
spark = SparkSession\
    .builder\
    .appName("SmallSparkify")\
    .getOrCreate()

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, follow these steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.
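
One scalability detail worth checking when moving to the full dataset: a Python UDF that tests `x in some_list` scans the list for every row. Below is a minimal sketch of the same flagging logic with a `set`, in plain Python. The event rows and the helper name `flag_churn` are made up for illustration and are not part of the workspace.

```python
# Hypothetical event rows: (userId, page). Not taken from the dataset.
events = [
    ("1", "NextSong"),
    ("2", "Cancellation Confirmation"),
    ("1", "Thumbs Up"),
    ("2", "NextSong"),
    ("3", "Home"),
]

# Collect churned ids into a set so each membership test is O(1) on average
churn_ids = {user_id for user_id, page in events if page == "Cancellation Confirmation"}

def flag_churn(user_id, churned=churn_ids):
    """Return 1 if the user ever reached the cancellation page, else 0."""
    return 1 if user_id in churned else 0

# Every row of a churned user gets the flag, not only the cancellation row
flags = [(user_id, flag_churn(user_id)) for user_id, _ in events]
print(flags)
```

On a cluster, the same flag can usually be computed without collecting ids to the driver at all, for example with a window aggregate over `userId`; that variant is not shown here.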

## Create a function for feature extraction

In [3]:
def create_dataframe_forML(df):
    def subfunc_cleaning(df):
        # Remove no id rows
        df = df.filter(df["userId"] != "")
        
        return df
        
    def subfunc_featureCreation(df):
        # Make a list of ids who churned
        churn_id_list = df[df.page == "Cancellation Confirmation"].select("userId").distinct().collect()
        churn_id_list = [x['userId'] for x in churn_id_list]
        create_churn_udf = udf(lambda userid: 1 if userid in churn_id_list else 0, IntegerType())
        df = df.withColumn("churn", create_churn_udf(df.userId))

        # Create new userAgent column
        def create_new_agent(userAgent):
            if userAgent is None:
                computer_os = None
            else:
                computer_os = userAgent.split()[1]
                computer_os = re.sub(r'[\W\s]','' ,computer_os)

            return  computer_os
        create_new_agent_udf = udf(create_new_agent, StringType())
        df = df.withColumn("os_system", create_new_agent_udf(df.userAgent))

        # Create new location column
        def create_new_location(location):
            if location is None:
                location_first = None
            else:
                location_first = location.split(',')[-1].split('-')[0].strip()

            return location_first

        create_new_location_udf = udf(create_new_location, StringType())
        df = df.withColumn("location_first", create_new_location_udf(df.location))

        # Create total number of sessionId column
        w = Window.partitionBy(df.userId)
        df = df.withColumn('total_sessionId', size(collect_set('sessionId').over(w)))

        # Create total number of itemInSession column
        df = df.withColumn('total_itemInSession', count('itemInSession').over(w))


      

        # Create last_access_time,first_access_time columns
        df = df.withColumn('last_access_time', max('ts').over(w))
        df = df.withColumn('first_access_time', min('ts').over(w))

        # Create last_level column
        df = df.withColumn('last_level',when(df.last_access_time == df.ts, df.level))


        # Create time difference column
        def calculate_time_after_id_creation(last_time, first_time):
            last_access_datetime = datetime.utcfromtimestamp(last_time / 1000)
            first_access_datetime = datetime.utcfromtimestamp(first_time / 1000)
            time_after_id_creation = last_access_datetime - first_access_datetime
            result = time_after_id_creation.total_seconds()/3600

            return result

        calculate_time_after_id_creation_udf = udf(calculate_time_after_id_creation, FloatType())
        df = df.withColumn("time_after_id_creation(hour)", calculate_time_after_id_creation_udf(df.last_access_time, df.first_access_time))
        df = df.withColumn("time_after_id_creation(hour)", round(col('time_after_id_creation(hour)')/1, 2))


        # convert timestamp to date (string)
        def create_numWeek(ts):
            return datetime.utcfromtimestamp(ts / 1000).strftime("%V")
        def create_numMonth(ts):
            return datetime.utcfromtimestamp(ts / 1000).strftime("%m")
        def create_numYear(ts):
            return datetime.utcfromtimestamp(ts / 1000).strftime("%Y")

        create_numWeek_udf = udf(create_numWeek, StringType())
        create_numMonth_udf = udf(create_numMonth, StringType())
        create_numYear_udf = udf(create_numYear, StringType())
        df = df.withColumn('num_week', create_numWeek_udf(col('ts')))
        df = df.withColumn('num_month', create_numMonth_udf(col('ts')))
        df = df.withColumn('num_year', create_numYear_udf(col('ts')))

        # Make a top_100 alltime artist list
        tmp_list = df.where(df.artist != "").groupby("artist").count().sort(col("count").desc()).limit(100).collect()
        top_100_alltime_artist_list = [row["artist"] for row in tmp_list]

        # Flag each row whose artist is in the all-time top 100 (1) or not (0)
        def create_top100_alltime(artist):
            if artist in top_100_alltime_artist_list:
                return 1
            else:
                return 0

        create_top100_alltime_udf = udf(create_top100_alltime, IntegerType())
        df = df.withColumn("top100_artist_alltime", create_top100_alltime_udf(df.artist))
        # Count total number of top 100
        w = Window.partitionBy(df.userId)
        df = df.withColumn('total_Top100_artist_alltime', sum('top100_artist_alltime').over(w))

        # List the week numbers present in the data
        tmp_list = df.select("num_week").distinct().sort("num_week").collect()
        available_week_list = [row["num_week"] for row in tmp_list]

        # Make a dictionary of a best seller song list of each week
        def create_dict_top100_song_week(available_week_list):
            dict_top100_song_week = dict()
            for week in available_week_list:
                top_100_week_song_list = df.where((df.artist != "") & (df.num_week == week)).groupby("song","num_week").count()\
                .sort(col("num_week"), col("count").desc()).collect()[0:100]
                top_100_week_song_list = [row['song'] for row in top_100_week_song_list]
                dict_top100_song_week[week] = top_100_week_song_list

            return dict_top100_song_week

        dict_top100_song_week = create_dict_top100_song_week(available_week_list)

        # Make a top_100_song_week list
        def create_top100_song_week(song, num_week):
            if song in dict_top100_song_week[num_week]:
                return 1
            else:
                return 0

        create_top100_song_week_udf = udf(create_top100_song_week, IntegerType())
        df = df.withColumn("top100_song_week", create_top100_song_week_udf(df.song, df.num_week))
        # Count total number of top 100_song_week
        w = Window.partitionBy(df.userId)
        df = df.withColumn('total_Top100_song_week', sum('top100_song_week').over(w))
        
        
        return df
    


    def subfunc_joinDf_featureExtraction(df):
        # Build per-user count dataframes to join onto df:
        # thumbs up / thumbs down / advert / add friend / add to playlist / upgrade / downgrade / error / logout
        thumbsup_df = df.where(df.page == 'Thumbs Up').groupBy("userId").agg(count("page").alias("total_thumbsup"))
        thumbsdown_df = df.where(df.page == 'Thumbs Down').groupBy("userId").agg(count("page").alias("total_thumbsdown"))
        advert_df =  df.where(df.page == 'Roll Advert').groupBy("userId").agg(count("page").alias("total_rolladvert"))
        addfriend_df =  df.where(df.page == 'Add Friend').groupBy("userId").agg(count("page").alias("total_addfriend"))
        addplaylist_df =  df.where(df.page == 'Add to Playlist').groupBy("userId").agg(count("page").alias("total_addplaylist"))
        sub_upgrade_df = df.where(df.page == 'Submit Upgrade').groupBy("userId").agg(count("page").alias("total_sub_upgrade"))
        sub_downgrade_df = df.where(df.page == 'Submit Downgrade').groupBy("userId").agg(count("page").alias("total_sub_downgrade"))
        error_df = df.where(df.page == 'Error').groupBy("userId").agg(count("page").alias("total_error"))
        logout_df = df.where(df.page == 'Logout').groupBy("userId").agg(count("page").alias("total_logout"))
        # last_level is null on every row except the user's most recent event
        last_level_df = df.where(df.last_level.isNotNull()).select("userId","last_level")
        
        # Create total spent time
        spent_time_df = df.groupBy("userId", "sessionId").agg(max('ts').alias("max_ts_session"), 
                                                              min('ts').alias("min_ts_session")).orderBy("sessionId", ascending=True)
        def calculate_time_inSession(max_ts_session, min_ts_session):
            max_ts_session_datetime = datetime.utcfromtimestamp(max_ts_session / 1000)
            min_ts_session_datetime = datetime.utcfromtimestamp(min_ts_session / 1000)
            spent_time_session = max_ts_session_datetime - min_ts_session_datetime
            result = spent_time_session.total_seconds()/3600

            return result

        calculate_time_inSession_udf = udf(calculate_time_inSession, FloatType())
        spent_time_df = spent_time_df.withColumn("time_spent_Session_hour", 
                                                     calculate_time_inSession_udf(spent_time_df.max_ts_session, spent_time_df.min_ts_session))

        w = Window.partitionBy(spent_time_df.userId)
        spent_time_df = spent_time_df.withColumn("time_spent_Total_hour", sum(spent_time_df.time_spent_Session_hour).over(w))
        spent_time_df = spent_time_df.drop('max_ts_session','min_ts_session')
        spent_time_df = spent_time_df.withColumn('time_spent_Session_hour', round(col('time_spent_Session_hour')/1, 2))
        spent_time_df = spent_time_df.withColumn('time_spent_Total_hour', round(col('time_spent_Total_hour')/1, 2))
        spent_time_df_only_total = spent_time_df.drop('time_spent_Session_hour','sessionId').distinct()
        
        # Make df_new for machine learning
        df_new = df.select("userId",'gender', 'churn', 'os_system', 'location_first', 'total_sessionId', 'total_itemInSession',"time_after_id_creation(hour)", 
                           "total_Top100_artist_alltime", "total_Top100_song_week").dropna().drop_duplicates()
        df_new = df_new.join(thumbsup_df, 'userId', how='left').distinct()
        df_new = df_new.join(thumbsdown_df, 'userId', how='left').distinct()
        df_new = df_new.join(advert_df, 'userId', how='left').distinct()
        df_new = df_new.join(addfriend_df, 'userId', how='left').distinct()
        df_new = df_new.join(addplaylist_df, 'userId', how='left').distinct()
        df_new = df_new.join(sub_upgrade_df, 'userId', how='left').distinct()
        df_new = df_new.join(sub_downgrade_df, 'userId', how='left').distinct()
        df_new = df_new.join(error_df, 'userId', how='left').distinct()
        df_new = df_new.join(logout_df, 'userId', how='left').distinct()
        df_new = df_new.fillna(0, subset=['total_thumbsup','total_thumbsdown', 'total_rolladvert', 'total_addfriend', 'total_addplaylist', 'total_sub_upgrade', 
                                         'total_sub_downgrade', 'total_error', 'total_logout'])
        df_new = df_new.join(last_level_df, 'userId', how='left').distinct()
        df_new = df_new.join(spent_time_df_only_total, 'userId', how='left').distinct()
        
        df_new1 = df_new.withColumn("time_spent_Total_day", round(col('time_spent_Total_hour')/24, 2))
        df_new1 = df_new1.withColumn("time_after_id_creation(day)", round(col('time_after_id_creation(hour)')/24, 2))
        df_new1 = df_new1.withColumn("avg_total_sessionId_afterCreation", round(col('total_sessionId') / col('time_after_id_creation(day)'), 2))
        df_new1 = df_new1.withColumn("avg_itemInSession_afterCreation", round(col('total_itemInSession')/col('time_after_id_creation(day)'), 2))
        df_new1 = df_new1.withColumn("avg_thumbsup_afterCreation", round(col('total_thumbsup')/col('time_after_id_creation(day)'), 2))
        df_new1 = df_new1.withColumn("avg_thumbsdown_afterCreation", round((col('total_thumbsdown')/col('time_after_id_creation(day)')), 2))
        df_new1 = df_new1.withColumn("avg_rolladvert_afterCreation", round((col('total_rolladvert')/col('time_after_id_creation(day)')), 2))
        df_new1 = df_new1.withColumn("avg_addfriend_afterCreation", round((col('total_addfriend')/col('time_after_id_creation(day)')), 2))
        df_new1 = df_new1.withColumn("avg_addplaylist_afterCreation", round((col('total_addplaylist')/col('time_after_id_creation(day)')), 2))
        df_new1 = df_new1.withColumn("avg_error_afterCreation", round((col('total_error')/col('time_after_id_creation(day)')), 2))
        df_new1 = df_new1.withColumn("avg_logout_afterCreation", round((col('total_logout')/col('time_after_id_creation(day)')), 2))
        df_new1 = df_new1.withColumn("avg_total_Top100_artist_alltime", round((col('total_Top100_artist_alltime')/col('time_after_id_creation(day)')), 2))
        df_new1 = df_new1.withColumn("avg_total_Top100_song_week", round((col('total_Top100_song_week')/col('time_after_id_creation(day)')), 2))

        
        return df_new1
    
    df = subfunc_cleaning(df)
    print("Cleaning is done")
    df = subfunc_featureCreation(df)
    print("Feature creation is done")
    df_new1 = subfunc_joinDf_featureExtraction(df)
    print("Joining dataframes and feature extraction are done")
    
    return df_new1

df = spark.read.json("mini_sparkify_event_data.json")
df_ML = create_dataframe_forML(df)
df_ML.cache()
df_ML.take(1)

Cleaning is done
Feature creation is done
Joining dataframes and feature extraction are done


[Row(userId='100010', gender='F', churn=0, os_system='iPhone', location_first='CT', total_sessionId=7, total_itemInSession=381, time_after_id_creation(hour)=1061.23, total_Top100_artist_alltime=61, total_Top100_song_week=31, total_thumbsup=17, total_thumbsdown=5, total_rolladvert=52, total_addfriend=4, total_addplaylist=7, total_sub_upgrade=0, total_sub_downgrade=0, total_error=0, total_logout=5, last_level='free', time_spent_Total_hour=18.02, time_spent_Total_day=0.75, time_after_id_creation(day)=44.22, avg_total_sessionId_afterCreation=0.16, avg_itemInSession_afterCreation=8.62, avg_thumbsup_afterCreation=0.38, avg_thumbsdown_afterCreation=0.11, avg_rolladvert_afterCreation=1.18, avg_addfriend_afterCreation=0.09, avg_addplaylist_afterCreation=0.16, avg_error_afterCreation=0.0, avg_logout_afterCreation=0.11, avg_total_Top100_artist_alltime=1.38, avg_total_Top100_song_week=0.7)]
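
To see what the two string-parsing helpers inside `create_dataframe_forML` produce, here is a standalone sketch in plain Python. The sample `userAgent` and `location` strings are assumptions in the style of the dataset, chosen to be consistent with the `os_system='iPhone'` and `location_first='CT'` values in the row above.

```python
import re

def create_new_agent(user_agent):
    """Take the second whitespace-separated token and strip non-word characters."""
    if user_agent is None:
        return None
    return re.sub(r"[\W\s]", "", user_agent.split()[1])

def create_new_location(location):
    """Take the part after the last comma and keep the first state code."""
    if location is None:
        return None
    return location.split(",")[-1].split("-")[0].strip()

# Hypothetical inputs, not copied from the dataset
ua_iphone = "Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like Mac OS X) AppleWebKit/537.51.2"
ua_windows = "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36"

print(create_new_agent(ua_iphone))    # iPhone
print(create_new_agent(ua_windows))   # Windows
print(create_new_location("Bridgeport-Stamford-Norwalk, CT"))        # CT
print(create_new_location("New York-Newark-Jersey City, NY-NJ-PA"))  # NY
```

A user agent with fewer than two tokens would raise an `IndexError` in `split()[1]`, so the full dataset may need a guard for that case.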

In [4]:
df_ML.count()

225
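
The `avg_*_afterCreation` columns are per-day rates: each total is divided by the account age in days. Here is a quick arithmetic check against the sample row for `userId='100010'`, in plain Python. Python's `round` stands in for Spark's `round`, which is an approximation of the real pipeline.

```python
# Values copied from the sample row for userId 100010
age_hours = 1061.23
totals = {
    "total_sessionId": 7,
    "total_itemInSession": 381,
    "total_thumbsup": 17,
    "total_rolladvert": 52,
}

# Account age in days, rounded to two decimals as in the notebook
age_days = round(age_hours / 24, 2)

# Per-day rate for each total
rates = {name: round(value / age_days, 2) for name, value in totals.items()}

print(age_days)
print(rates)
```

The results agree with the row: 44.22 days, 0.16 sessions, 8.62 items, 0.38 thumbs up and 1.18 adverts per day. A user whose first and last events fall within a few minutes would have an age of 0.0 days, so the division deserves a check on the full dataset.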

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

### Make train, test and validation sets 

In [6]:
### Make train, test and validation sets 
train, test, validation = df_ML.randomSplit([0.7, 0.15, 0.15], seed = 44)

In [7]:
print(train.count())
print(test.count())
print(validation.count())

156
38
31


### Make Pipelines to feed data into classifiers

In [9]:
column_for_stringindexer_list = ["gender", 'last_level', 'location_first']
indexers = [StringIndexer(inputCol=column, outputCol="indexed_"+column) for column in column_for_stringindexer_list]
encoder = OneHotEncoderEstimator(inputCols=["indexed_gender", "indexed_last_level", "indexed_location_first"],
                                       outputCols=["gender_feat", "last_level_feat", "location_feat"],
                                handleInvalid = 'keep')
features = ['gender_feat', 'last_level_feat', 'location_feat', 'time_after_id_creation(day)','avg_total_sessionId_afterCreation','avg_itemInSession_afterCreation',
            'avg_thumbsup_afterCreation','avg_thumbsdown_afterCreation','avg_rolladvert_afterCreation','avg_addfriend_afterCreation',
            'avg_addplaylist_afterCreation','avg_error_afterCreation','avg_logout_afterCreation','avg_total_Top100_artist_alltime','avg_total_Top100_song_week']
assembler = VectorAssembler(inputCols=features, outputCol="scaled_features")


pipeline = Pipeline(stages=[indexers[0], indexers[1], indexers[2], encoder, assembler])
df_ML1 = pipeline.fit(df_ML).transform(df_ML)
df_ML1.head()

Row(userId='100010', gender='F', churn=0, os_system='iPhone', location_first='CT', total_sessionId=7, total_itemInSession=381, time_after_id_creation(hour)=1061.23, total_Top100_artist_alltime=61, total_Top100_song_week=31, total_thumbsup=17, total_thumbsdown=5, total_rolladvert=52, total_addfriend=4, total_addplaylist=7, total_sub_upgrade=0, total_sub_downgrade=0, total_error=0, total_logout=5, last_level='free', time_spent_Total_hour=18.02, time_spent_Total_day=0.75, time_after_id_creation(day)=44.22, avg_total_sessionId_afterCreation=0.16, avg_itemInSession_afterCreation=8.62, avg_thumbsup_afterCreation=0.38, avg_thumbsdown_afterCreation=0.11, avg_rolladvert_afterCreation=1.18, avg_addfriend_afterCreation=0.09, avg_addplaylist_afterCreation=0.16, avg_error_afterCreation=0.0, avg_logout_afterCreation=0.11, avg_total_Top100_artist_alltime=1.38, avg_total_Top100_song_week=0.7, indexed_gender=1.0, indexed_last_level=1.0, indexed_location_first=9.0, gender_feat=SparseVector(2, {1: 1.0}),

In [10]:
df_ML1_togo = df_ML1.select(col("churn").alias("label"), col("scaled_features").alias("features"))
df_ML1_togo.head(1)

[Row(label=0, features=SparseVector(57, {1: 1.0, 3: 1.0, 13: 1.0, 45: 44.22, 46: 0.16, 47: 8.62, 48: 0.38, 49: 0.11, 50: 1.18, 51: 0.09, 52: 0.16, 54: 0.11, 55: 1.38, 56: 0.7}))]
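
The `SparseVector(57, ...)` above can be read back into named features. The sketch below is plain Python and assumes the layout follows the order of the `features` list: 2 slots for gender, 2 for last level, and 12 numeric columns, which leaves 57 - 2 - 2 - 12 = 41 slots for location. The 41 is inferred from the vector size, not read from the fitted encoder.

```python
# Non-zero entries copied from the SparseVector printed above
sparse = {1: 1.0, 3: 1.0, 13: 1.0, 45: 44.22, 46: 0.16, 47: 8.62, 48: 0.38,
          49: 0.11, 50: 1.18, 51: 0.09, 52: 0.16, 54: 0.11, 55: 1.38, 56: 0.7}
vector_size = 57

numeric = ['time_after_id_creation(day)', 'avg_total_sessionId_afterCreation',
           'avg_itemInSession_afterCreation', 'avg_thumbsup_afterCreation',
           'avg_thumbsdown_afterCreation', 'avg_rolladvert_afterCreation',
           'avg_addfriend_afterCreation', 'avg_addplaylist_afterCreation',
           'avg_error_afterCreation', 'avg_logout_afterCreation',
           'avg_total_Top100_artist_alltime', 'avg_total_Top100_song_week']

# Assumed block widths for the one-hot encoded columns (location width is inferred)
widths = [("gender_feat", 2), ("last_level_feat", 2)]
widths.append(("location_feat", vector_size - 4 - len(numeric)))

# Build one name per vector slot, in assembler order
names = [f"{column}[{i}]" for column, width in widths for i in range(width)] + numeric

# Map each stored index to its feature name
decoded = {names[i]: v for i, v in sparse.items()}
for name, value in decoded.items():
    print(name, value)
```

Under this layout the three one-hot slots decode to `gender_feat[1]`, `last_level_feat[1]` and `location_feat[9]`, matching the `indexed_gender=1.0`, `indexed_last_level=1.0` and `indexed_location_first=9.0` values in the transformed row. Slot 53 (`avg_error_afterCreation`) is absent because its value is 0.0.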

In [11]:
### Make train, test and validation sets 
train, test, validation = df_ML1_togo.randomSplit([0.7, 0.15, 0.15], seed = 41)

In [12]:
# initialize random forest classifier
rf = RandomForestClassifier(featuresCol="features",labelCol="label")
# initialize logistic regression
lr = LogisticRegression(maxIter=5,threshold=0.3)
# initialize linear support vector classifier
l_SVC = LinearSVC(featuresCol="features",labelCol="label",threshold=0.4)
# initialize Naive Bayes
nb = NaiveBayes(featuresCol="features",labelCol="label")
# initialize gradient-boosted tree classifier
gbt = GBTClassifier(featuresCol="features",labelCol="label", maxIter=5)
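
The `threshold` passed to `LogisticRegression` moves the cut-off on the predicted churn probability: a row is labelled 1 when that probability exceeds the threshold. Here is a minimal plain-Python sketch of the effect, using made-up probabilities and labels (none of these numbers come from the notebook).

```python
# Hypothetical (probability of churn, true label) pairs
scored = [(0.10, 0), (0.20, 0), (0.35, 1), (0.40, 0), (0.45, 1), (0.55, 1), (0.80, 1)]

def precision_recall(pairs, threshold):
    """Label a row 1 when its probability exceeds the threshold, then score class 1."""
    tp = sum(1 for p, y in pairs if p > threshold and y == 1)
    fp = sum(1 for p, y in pairs if p > threshold and y == 0)
    fn = sum(1 for p, y in pairs if p <= threshold and y == 1)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall

# Compare the default cut-off with the lower one used for lr
for t in (0.5, 0.3):
    print(t, precision_recall(scored, t))
```

In this toy data, lowering the cut-off from 0.5 to 0.3 raises recall from 0.5 to 1.0 and lowers precision from 1.0 to 0.8. For `LinearSVC` the threshold applies to the raw margin rather than to a probability, so 0.4 there is not directly comparable to 0.3 here.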

In [13]:
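# random forest
# Note: fMeasure() without a label returns the overall F-measure, which equals
# accuracy; fMeasure(1.0) would give the F1 of the churn class.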
starttime = datetime.now()
model_rf = rf.fit(train)
pred_train_rf = model_rf.transform(train)
pred_test_rf = model_rf.transform(test)
pred_validation_rf = model_rf.transform(validation)

## train_set
predictionAndLabels_rf = pred_train_rf.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_rf = MulticlassMetrics(predictionAndLabels_rf )
# F1 score
print("F1 score of training set: ", metrics_rf.fMeasure())
print("Precision of training set: ", metrics_rf.precision(1))
print("Recall of training set: " , metrics_rf.recall(1))
print(metrics_rf.confusionMatrix().toArray())
print()

## test_set
predictionAndLabels_rf = pred_test_rf.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_rf = MulticlassMetrics(predictionAndLabels_rf)
# F1 score
print("F1 score of test set: ", metrics_rf.fMeasure())
print("Precision of test set: ", metrics_rf.precision(1))
print("Recall of test set: " , metrics_rf.recall(1))
print(metrics_rf.confusionMatrix().toArray())
print(datetime.now() - starttime)
print()

## validation_set
predictionAndLabels_rf = pred_validation_rf.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_rf = MulticlassMetrics(predictionAndLabels_rf)
# F1 score
print("F1 score of validation set: ", metrics_rf.fMeasure())
print("Precision of validation set: ", metrics_rf.precision(1))
print("Recall of validation set: ", metrics_rf.recall(1))
print(metrics_rf.confusionMatrix().toArray())
print(datetime.now() - starttime)

F1 score of training set:  0.9294871794871795
Precision of training set:  0.9666666666666667
Recall of training set:  0.7435897435897436
[[ 116.    1.]
 [  10.   29.]]

F1 score of test set:  0.9166666666666666
Precision of test set:  1.0
Recall of test set:  0.6666666666666666
[[ 27.   0.]
 [  3.   6.]]
0:03:16.522448

F1 score of validation set:  0.8484848484848485
Precision of validation set:  0.3333333333333333
Recall of validation set:  0.25
[[ 27.   2.]
 [  3.   1.]]
0:04:24.809489


In [14]:
# Logistic regression
# Note: fMeasure() without a label is the overall F-measure (accuracy), not the churn-class F1
starttime = datetime.now()
model_lr = lr.fit(train)
pred_train_lr = model_lr.transform(train)
pred_test_lr = model_lr.transform(test)
pred_validation_lr = model_lr.transform(validation)

## train_set
predictionAndLabels_lr = pred_train_lr.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_lr = MulticlassMetrics(predictionAndLabels_lr)

# F1 score
print("F1 score of training set: ", metrics_lr.fMeasure())
print("Precision of training set: ", metrics_lr.precision(1))
print("Recall of training set: " , metrics_lr.recall(1))
print(metrics_lr.confusionMatrix().toArray())
print()

## test_set
predictionAndLabels_lr = pred_test_lr.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_lr = MulticlassMetrics(predictionAndLabels_lr)
# F1 score
print("F1 score of test set: ", metrics_lr.fMeasure())
print("Precision of test set: ", metrics_lr.precision(1))
print("Recall of test set: ", metrics_lr.recall(1))
print(metrics_lr.confusionMatrix().toArray())
print(datetime.now() - starttime)
print()

## validation_set
predictionAndLabels_lr = pred_validation_lr.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_lr = MulticlassMetrics(predictionAndLabels_lr)
# F1 score
print("F1 score of validation set: " , metrics_lr.fMeasure())
print("Precision of validation set:  " , metrics_lr.precision(1))
print("Recall on validation set: ", metrics_lr.recall(1))
print(metrics_lr.confusionMatrix().toArray())
print(datetime.now() - starttime)

F1 score of training set:  0.8846153846153846
Precision of training set:  0.7333333333333333
Recall of training set:  0.8461538461538461
[[ 105.   12.]
 [   6.   33.]]

F1 score of test set:  0.6388888888888888
Precision of test set:  0.3333333333333333
Recall of test set:  0.4444444444444444
[[ 19.   8.]
 [  5.   4.]]
0:02:38.314477

F1 score of validation set:  0.7575757575757576
Precision of validation set:   0.16666666666666666
Recall on validation set:  0.25
[[ 24.   5.]
 [  3.   1.]]
0:03:43.606134


In [15]:
# Linear SVC
# Note: fMeasure() without a label is the overall F-measure (accuracy), not the churn-class F1
starttime = datetime.now()
model_l_SVC = l_SVC.fit(train)
pred_train_l_SVC = model_l_SVC.transform(train)
pred_test_l_SVC = model_l_SVC.transform(test)

## train_set
predictionAndLabels_l_SVC = pred_train_l_SVC.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_l_SVC = MulticlassMetrics(predictionAndLabels_l_SVC)

# F1 score
print("F1 score on training dataset is %s" % metrics_l_SVC.fMeasure())
print("Precision on training dataset is %s" % metrics_l_SVC.precision(1))
print("Recall on training dataset is %s" % metrics_l_SVC.recall(1))
print(metrics_l_SVC.confusionMatrix().toArray())
print()

## test_set
predictionAndLabels_l_SVC = pred_test_l_SVC.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_l_SVC = MulticlassMetrics(predictionAndLabels_l_SVC)
# F1 score
print("F1 score on test dataset is %s" % metrics_l_SVC.fMeasure())
print("Precision on test dataset is %s" % metrics_l_SVC.precision(1))
print("Recall on test dataset is %s" % metrics_l_SVC.recall(1))
print(metrics_l_SVC.confusionMatrix().toArray())
print(datetime.now() - starttime)

F1 score on training dataset is 0.8910256410256411
Precision on training dataset is 0.8928571428571429
Recall on training dataset is 0.6410256410256411
[[ 114.    3.]
 [  14.   25.]]

F1 score on test dataset is 0.75
Precision on test dataset is 0.5
Recall on test dataset is 0.3333333333333333
[[ 24.   3.]
 [  6.   3.]]
0:19:42.210861


In [16]:
# Naive Bayes
# Note: fMeasure() without a label is the overall F-measure (accuracy), not the churn-class F1
starttime = datetime.now()
model_nb = nb.fit(train)
pred_train_nb = model_nb.transform(train)
pred_test_nb = model_nb.transform(test)

## train_set
predictionAndLabels_nb = pred_train_nb.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_nb = MulticlassMetrics(predictionAndLabels_nb)

# F1 score
print("F1 score of training set: " , metrics_nb.fMeasure())
print("Precision of training set: " , metrics_nb.precision(1))
print("Recall of training set: " , metrics_nb.recall(1))
print(metrics_nb.confusionMatrix().toArray())
print()

## test_set
predictionAndLabels_nb = pred_test_nb.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_nb = MulticlassMetrics(predictionAndLabels_nb)
# F1 score
print("F1 score of test set: " , metrics_nb.fMeasure())
print("Precision of test set: " , metrics_nb.precision(1))
print("Recall of test set: " , metrics_nb.recall(1))
print(metrics_nb.confusionMatrix().toArray())
print(datetime.now() - starttime)

F1 score of training set:  0.8012820512820513
Precision of training set:  0.7
Recall of training set:  0.358974358974359
[[ 111.    6.]
 [  25.   14.]]

F1 score of test set:  0.7777777777777778
Precision of test set:  0.5714285714285714
Recall of test set:  0.4444444444444444
[[ 24.   3.]
 [  5.   4.]]
0:02:16.665492


In [17]:
# GBT
# Note: fMeasure() without a label is the overall F-measure (accuracy), not the churn-class F1
starttime = datetime.now()
model_gbt = gbt.fit(train)
pred_train_gbt = model_gbt.transform(train)
pred_test_gbt = model_gbt.transform(test)
pred_validation_gbt = model_gbt.transform(validation)

## train_set
predictionAndLabels_gbt = pred_train_gbt.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_gbt = MulticlassMetrics(predictionAndLabels_gbt)

# F1 score
print("F1 score of training set: ", metrics_gbt.fMeasure())
print("Precision of training set: ", metrics_gbt.precision(1))
print("Recall of training set: ", metrics_gbt.recall(1))
print(metrics_gbt.confusionMatrix().toArray())
print()

## test_set
predictionAndLabels_gbt = pred_test_gbt.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_gbt = MulticlassMetrics(predictionAndLabels_gbt)
# F1 score
print("F1 score of test set: ", metrics_gbt.fMeasure())
print("Precision of test set: " , metrics_gbt.precision(1))
print("Recall of test set: " , metrics_gbt.recall(1))
print(metrics_gbt.confusionMatrix().toArray())
print(datetime.now() - starttime)
print()

## valid_set
predictionAndLabels_gbt = pred_validation_gbt.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_gbt = MulticlassMetrics(predictionAndLabels_gbt)
# F1 score
print("F1 score of validation set: ", metrics_gbt.fMeasure())
print("Precision of validation set: ", metrics_gbt.precision(1))
print("Recall of validation set: ", metrics_gbt.recall(1))
print(metrics_gbt.confusionMatrix().toArray())
print(datetime.now() - starttime)

F1 score of training set:  1.0
Precision of training set:  1.0
Recall of training set:  1.0
[[ 117.    0.]
 [   0.   39.]]

F1 score of test set:  0.8055555555555556
Precision of test set:  0.5833333333333334
Recall of test set:  0.7777777777777778
[[ 22.   5.]
 [  2.   7.]]
0:04:22.122882

F1 score of validation set:  0.9090909090909091
Precision of validation set:  0.6666666666666666
Recall of validation set:  0.5
[[ 28.   1.]
 [  2.   2.]]
0:05:27.898781


According to the results above, only three classifiers (`Random Forest`, `Logistic Regression`, and `GBT classifier`) advance to the final steps. The other two were dropped because LinearSVC was too slow (about 20 minutes to train and score the test set, against 2 to 5 minutes for the other models) and Naive Bayes did not achieve a good F1 score. Logistic Regression did not show a good F1 score either, but I kept it because its threshold parameter might help increase precision and recall.
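
Since `fMeasure()` is called without a label in the cells above, it is worth recomputing the churn-class scores directly from the printed confusion matrices. The sketch below is plain Python and assumes the matrices have actual labels on rows and predicted labels on columns, which is consistent with the precision and recall values printed alongside them. The helper name `churn_scores` is illustrative.

```python
# Test-set confusion matrices copied from the outputs above: [[TN, FP], [FN, TP]]
test_matrices = {
    "Random Forest":       [[27, 0], [3, 6]],
    "Logistic Regression": [[19, 8], [5, 4]],
    "LinearSVC":           [[24, 3], [6, 3]],
    "Naive Bayes":         [[24, 3], [5, 4]],
    "GBT":                 [[22, 5], [2, 7]],
}

def churn_scores(matrix):
    """Return accuracy and the precision, recall and F1 of class 1 (churn)."""
    (tn, fp), (fn, tp) = matrix
    accuracy = (tn + tp) / (tn + fp + fn + tp)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * tp / (2 * tp + fp + fn) if tp else 0.0
    return accuracy, precision, recall, f1

for name, matrix in test_matrices.items():
    accuracy, precision, recall, f1 = churn_scores(matrix)
    print(f"{name:20s} accuracy={accuracy:.4f} precision={precision:.4f} "
          f"recall={recall:.4f} churn F1={f1:.4f}")
```

For every model the accuracy computed here equals the value printed as "F1 score" (for example 33/36 ≈ 0.9167 for Random Forest), while the churn-class F1 is lower: 0.80 for Random Forest and about 0.67 for GBT on the test set. In the notebook itself, `metrics.fMeasure(1.0)` would return the class-1 value.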


# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.

### Pipelines after MinMax Scaler

In [18]:
column_for_stringindexer_list = ["gender", 'last_level', 'location_first']
indexers = [StringIndexer(inputCol=column, outputCol="indexed_"+column) for column in column_for_stringindexer_list]
encoder = OneHotEncoderEstimator(inputCols=["indexed_gender", "indexed_last_level", "indexed_location_first"],
                                       outputCols=["gender_feat", "last_level_feat", "location_feat"],
                                handleInvalid = 'keep')
features = ['gender_feat', 'last_level_feat', 'location_feat', 'time_after_id_creation(day)','avg_total_sessionId_afterCreation','avg_itemInSession_afterCreation',
            'avg_thumbsup_afterCreation','avg_thumbsdown_afterCreation','avg_rolladvert_afterCreation','avg_addfriend_afterCreation',
            'avg_addplaylist_afterCreation','avg_error_afterCreation','avg_logout_afterCreation','avg_total_Top100_artist_alltime','avg_total_Top100_song_week']
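# NOTE: the assembler output is named "scaled_features" but is not yet scaled;
# the MinMax-scaled vector is written to "ScaledFeatures" by the scaler below.
assembler = VectorAssembler(inputCols=features, outputCol="scaled_features")
scaler = MinMaxScaler(inputCol="scaled_features" , outputCol="ScaledFeatures")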


pipeline = Pipeline(stages=[indexers[0], indexers[1], indexers[2], encoder, assembler, scaler])
df_ML2= pipeline.fit(df_ML).transform(df_ML)
df_ML2.head()

Row(userId='100010', gender='F', churn=0, os_system='iPhone', location_first='CT', total_sessionId=7, total_itemInSession=381, time_after_id_creation(hour)=1061.23, total_Top100_artist_alltime=61, total_Top100_song_week=31, total_thumbsup=17, total_thumbsdown=5, total_rolladvert=52, total_addfriend=4, total_addplaylist=7, total_sub_upgrade=0, total_sub_downgrade=0, total_error=0, total_logout=5, last_level='free', time_spent_Total_hour=18.02, time_spent_Total_day=0.75, time_after_id_creation(day)=44.22, avg_total_sessionId_afterCreation=0.16, avg_itemInSession_afterCreation=8.62, avg_thumbsup_afterCreation=0.38, avg_thumbsdown_afterCreation=0.11, avg_rolladvert_afterCreation=1.18, avg_addfriend_afterCreation=0.09, avg_addplaylist_afterCreation=0.16, avg_error_afterCreation=0.0, avg_logout_afterCreation=0.11, avg_total_Top100_artist_alltime=1.38, avg_total_Top100_song_week=0.7, indexed_gender=1.0, indexed_last_level=1.0, indexed_location_first=9.0, gender_feat=SparseVector(2, {1: 1.0}),

In [19]:
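# NOTE: "scaled_features" is the raw VectorAssembler output; select "ScaledFeatures"
# instead to train on the MinMax-scaled vector.
df_ML2_togo = df_ML2.select(col("churn").alias("label"), col("scaled_features").alias("features"))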
df_ML2_togo.head(1)

[Row(label=0, features=SparseVector(57, {1: 1.0, 3: 1.0, 13: 1.0, 45: 44.22, 46: 0.16, 47: 8.62, 48: 0.38, 49: 0.11, 50: 1.18, 51: 0.09, 52: 0.16, 54: 0.11, 55: 1.38, 56: 0.7}))]
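
The row above still shows raw values (for example 44.22 at index 45), which suggests that `scaled_features` holds the `VectorAssembler` output rather than the `MinMaxScaler` output stored in `ScaledFeatures`. As a minimal sketch of what min-max rescaling does to a single feature column, assuming the default [0, 1] range (the helper `min_max_scale` and every value in `days` other than 44.22 are hypothetical):

```python
def min_max_scale(values, lo=0.0, hi=1.0):
    """Rescale values linearly so the smallest maps to lo and the largest to hi."""
    v_min, v_max = min(values), max(values)
    if v_max == v_min:
        # Constant column: map everything to the midpoint of the target range
        # (a convention assumed for this sketch).
        return [0.5 * (lo + hi) for _ in values]
    return [(v - v_min) / (v_max - v_min) * (hi - lo) + lo for v in values]

# Hypothetical "days since account creation" column; only 44.22 is from the output above.
days = [10.0, 44.22, 90.0, 130.0]
scaled = min_max_scale(days)
print([round(s, 3) for s in scaled])
```

After scaling, every feature lies in the same range, so large-valued columns such as `time_after_id_creation(day)` no longer dominate the smaller per-day averages.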

In [20]:
### Make train, test and validation sets 
train, test, validation = df_ML2_togo.randomSplit([0.7, 0.15, 0.15], seed = 41)

In [21]:
# initialize random forest classifier
rf = RandomForestClassifier(featuresCol="features",labelCol="label")
# initialize logistic regression
lr = LogisticRegression(maxIter=5,threshold=0.3)
# initialize GBT classifier
gbt = GBTClassifier(featuresCol="features",labelCol="label", maxIter=5)

In [22]:
# random forest
##
starttime = datetime.now()
model_rf = rf.fit(train)
pred_train_rf = model_rf.transform(train)
pred_test_rf = model_rf.transform(test)
pred_validation_rf = model_rf.transform(validation)

## train_set
predictionAndLabels_rf = pred_train_rf.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_rf = MulticlassMetrics(predictionAndLabels_rf )
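# fMeasure() without a label is the overall (micro-averaged) value, which equals accuracy;
# fMeasure(1.0) would give the F1 score of the churn class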
print("F1 score of training set: ", metrics_rf.fMeasure())
print("Precision of training set: ", metrics_rf.precision(1))
print("Recall of training set: " , metrics_rf.recall(1))
print(metrics_rf.confusionMatrix().toArray())
print()

## test_set
predictionAndLabels_rf = pred_test_rf.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_rf = MulticlassMetrics(predictionAndLabels_rf)
# F1 score
print("F1 score of test set: ", metrics_rf.fMeasure())
print("Precision of test set: ", metrics_rf.precision(1))
print("Recall of test set: " , metrics_rf.recall(1))
print(metrics_rf.confusionMatrix().toArray())
print(datetime.now() - starttime)
print()

## validation_set
predictionAndLabels_rf = pred_validation_rf.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_rf = MulticlassMetrics(predictionAndLabels_rf)
# F1 score
print("F1 score of validation set: ", metrics_rf.fMeasure())
print("Precision of validation set: ", metrics_rf.precision(1))
print("Recall of validation set: ", metrics_rf.recall(1))
print(metrics_rf.confusionMatrix().toArray())
print(datetime.now() - starttime)

F1 score of training set:  0.9294871794871795
Precision of training set:  0.9666666666666667
Recall of training set:  0.7435897435897436
[[ 116.    1.]
 [  10.   29.]]

F1 score of test set:  0.9166666666666666
Precision of test set:  1.0
Recall of test set:  0.6666666666666666
[[ 27.   0.]
 [  3.   6.]]
0:03:01.402539

F1 score of validation set:  0.8484848484848485
Precision of validation set:  0.3333333333333333
Recall of validation set:  0.25
[[ 27.   2.]
 [  3.   1.]]
0:04:09.096067


In [23]:
# Logistic regression
##
starttime = datetime.now()
model_lr = lr.fit(train)
pred_train_lr = model_lr.transform(train)
pred_test_lr = model_lr.transform(test)
pred_validation_lr = model_lr.transform(validation)

## train_set
predictionAndLabels_lr = pred_train_lr.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_lr = MulticlassMetrics(predictionAndLabels_lr)

# F1 score
print("F1 score of training set: ", metrics_lr.fMeasure())
print("Precision of training set: ", metrics_lr.precision(1))
print("Recall of training set: " , metrics_lr.recall(1))
print(metrics_lr.confusionMatrix().toArray())
print()

## test_set
predictionAndLabels_lr = pred_test_lr.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_lr = MulticlassMetrics(predictionAndLabels_lr)
# F1 score
print("F1 score of test set: ", metrics_lr.fMeasure())
print("Precision of test set: ", metrics_lr.precision(1))
print("Recall of test set: ", metrics_lr.recall(1))
print(metrics_lr.confusionMatrix().toArray())
print(datetime.now() - starttime)
print()

## validation_set
predictionAndLabels_lr = pred_validation_lr.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_lr = MulticlassMetrics(predictionAndLabels_lr)
# F1 score
print("F1 score of validation set: " , metrics_lr.fMeasure())
print("Precision of validation set:  " , metrics_lr.precision(1))
print("Recall on validation set: ", metrics_lr.recall(1))
print(metrics_lr.confusionMatrix().toArray())
print(datetime.now() - starttime)

F1 score of training set:  0.8846153846153846
Precision of training set:  0.7333333333333333
Recall of training set:  0.8461538461538461
[[ 105.   12.]
 [   6.   33.]]

F1 score of test set:  0.6388888888888888
Precision of test set:  0.3333333333333333
Recall of test set:  0.4444444444444444
[[ 19.   8.]
 [  5.   4.]]
0:02:35.169486

F1 score of validation set:  0.7575757575757576
Precision of validation set:   0.16666666666666666
Recall on validation set:  0.25
[[ 24.   5.]
 [  3.   1.]]
0:03:40.341912


In [24]:
# GBT
##
starttime = datetime.now()
model_gbt = gbt.fit(train)
pred_train_gbt = model_gbt.transform(train)
pred_test_gbt = model_gbt.transform(test)
pred_validation_gbt = model_gbt.transform(validation)

## train_set
predictionAndLabels_gbt = pred_train_gbt.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_gbt = MulticlassMetrics(predictionAndLabels_gbt)

# F1 score
print("F1 score of training set: ", metrics_gbt.fMeasure())
print("Precision of training set: ", metrics_gbt.precision(1))
print("Recall of training set: ", metrics_gbt.recall(1))
print(metrics_gbt.confusionMatrix().toArray())
print()

## test_set
predictionAndLabels_gbt = pred_test_gbt.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_gbt = MulticlassMetrics(predictionAndLabels_gbt)
# F1 score
print("F1 score of test set: ", metrics_gbt.fMeasure())
print("Precision of test set: " , metrics_gbt.precision(1))
print("Recall of test set: " , metrics_gbt.recall(1))
print(metrics_gbt.confusionMatrix().toArray())
print(datetime.now() - starttime)
print()

## validation_set
predictionAndLabels_gbt = pred_validation_gbt.rdd.map(lambda x: (float(x.prediction), float(x.label)))
# Instantiate metrics object
metrics_gbt = MulticlassMetrics(predictionAndLabels_gbt)
# F1 score
print("F1 score of validation set: ", metrics_gbt.fMeasure())
print("Precision of validation set: ", metrics_gbt.precision(1))
print("Recall of validation set: ", metrics_gbt.recall(1))
print(metrics_gbt.confusionMatrix().toArray())
print(datetime.now() - starttime)

F1 score of training set:  1.0
Precision of training set:  1.0
Recall of training set:  1.0
[[ 117.    0.]
 [   0.   39.]]

F1 score of test set:  0.8055555555555556
Precision of test set:  0.5833333333333334
Recall of test set:  0.7777777777777778
[[ 22.   5.]
 [  2.   7.]]
0:04:19.272090

F1 score of validation set:  0.9090909090909091
Precision of validation set:  0.6666666666666666
Recall of validation set:  0.5
[[ 28.   1.]
 [  2.   2.]]
0:05:25.579408
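
As a cross-check on the numbers above, the sketch below recomputes the metrics from the GBT test-set confusion matrix (rows are actual labels, columns are predicted labels). It assumes that `fMeasure()` called without a label returns the overall, micro-averaged value, which coincides with accuracy; the helper name `binary_metrics` is hypothetical.

```python
def binary_metrics(cm):
    """Compute accuracy and churn-class (label 1) precision, recall and F1.

    cm is [[TN, FP], [FN, TP]]: rows are actual labels, columns are predictions.
    """
    (tn, fp), (fn, tp) = cm
    total = tn + fp + fn + tp
    accuracy = (tn + tp) / total
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if precision + recall else 0.0)
    return {"accuracy": accuracy, "precision": precision,
            "recall": recall, "f1": f1}

# GBT test-set confusion matrix copied from the output above.
gbt_test = binary_metrics([[22, 5], [2, 7]])
for name, value in gbt_test.items():
    print(f"{name}: {value:.4f}")
```

The accuracy of 0.8056 matches the value printed as "F1 score of test set", while the F1 score of the churn class works out to about 0.667.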


# Conclusion

I chose `GBTClassifier` as the final model for this Sparkify project.

To select the best model, I considered two metrics: F1 score and recall. `Logistic Regression` was ruled out because, across several runs with different thresholds, it never reached an F1 score comparable to the other two models. `Random Forest Classifier` showed a good F1 score and good precision but poor recall. The main goal of this project is to predict which Sparkify users are going to churn. With a list of expected churners, Sparkify can take action to prevent those users from churning.


**So it is worthwhile for Sparkify to sacrifice some F1 score in exchange for higher recall, because the company can then target promotions at the users who are really going to churn.** If I had selected `Random Forest Classifier` as my prediction model, more of the real churn users would have been missed: it caught 6 of 9 churners in the test set and only 1 of 4 in the validation set, against 7 of 9 and 2 of 4 for `GBTClassifier`.
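
The recall argument can be checked directly against the confusion matrices reported earlier. A minimal sketch (the `results` layout and the `missed_churners` helper are illustrative assumptions) that counts how many actual churners each model missed across the test and validation sets:

```python
# Confusion matrices copied from the outputs above: [[TN, FP], [FN, TP]].
results = {
    "Random Forest": {"test": [[27, 0], [3, 6]], "validation": [[27, 2], [3, 1]]},
    "GBT": {"test": [[22, 5], [2, 7]], "validation": [[28, 1], [2, 2]]},
}

def missed_churners(matrices):
    """Return (missed, total) actual churners summed over the given splits."""
    missed = sum(cm[1][0] for cm in matrices.values())
    total = sum(cm[1][0] + cm[1][1] for cm in matrices.values())
    return missed, total

for model, matrices in results.items():
    missed, total = missed_churners(matrices)
    print(f"{model}: missed {missed} of {total} churners "
          f"(recall {1 - missed / total:.2f})")
```

With only 13 held-out churners in total, each one shifts recall by several points, so the gap between the two models is suggestive rather than conclusive on this small subset.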