# Sparkify Project Workspace - FE and Modelling Part
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster is included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [99]:
# import libraries
from pyspark.sql import SparkSession

In [100]:
# create a Spark session

from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import avg, stddev, split, udf, isnull, first, col, format_number, rand
from pyspark.sql.types import IntegerType, ArrayType, FloatType, DoubleType, Row, DateType, StringType, LongType,TimestampType
from pyspark.sql.functions import regexp_replace, col
import pyspark.sql.functions as sf
import pyspark.sql.types as st
import pyspark.sql.functions as F
from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, PCA, RegexTokenizer, StandardScaler, StopWordsRemover, StringIndexer, VectorAssembler
import datetime
from pyspark.sql.functions import from_utc_timestamp, from_unixtime


from pyspark.sql import Window
import pandas as pd
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt
%matplotlib inline

import re
from pyspark.sql import functions as sF
from pyspark.sql import types as sT

from functools import reduce

sns.set_style('whitegrid')
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler, VectorAssembler


In [101]:

from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import IntegerType, ArrayType, FloatType, DoubleType, Row, DateType
from pyspark.ml.linalg import DenseVector, SparseVector
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier,DecisionTreeClassifier,LinearSVC
from pyspark.ml.evaluation import  MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, PCA, RegexTokenizer, StandardScaler, StopWordsRemover, StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [102]:
pd.set_option('display.max_columns', None)  
pd.set_option('display.expand_frame_repr', False)
pd.set_option('max_colwidth', -1)

In [103]:
from IPython.display import display, HTML

display(HTML(data="""
<style>
    div#notebook-container    { width: 95%; }
    div#menubar-container     { width: 65%; }
    div#maintoolbar-container { width: 99%; }
</style>
"""))

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [104]:
# create a spark session
def get_spark_session(master,appName):
    spark = SparkSession.builder.master(master).appName(appName).getOrCreate()
    return spark

In [105]:
def load_dataset(filename):
    df_temp = spark.read.json(filename)
    return df_temp

In [106]:
def feature_engineering_phase_1(df_local):
    df_local = df_local.withColumn('transaction_timestamp', from_unixtime(col('ts').cast(LongType())/1000).cast(TimestampType()))
    df_local = df_local.withColumn('registration_timestamp', from_unixtime(col('registration').cast(LongType())/1000).cast(TimestampType()))
    df_local = df_local.withColumn('year', F.col('transaction_timestamp').cast('string').substr(1, 4).cast('int')) 
    df_local = df_local.withColumn('month', F.col('transaction_timestamp').cast('string').substr(6, 2).cast('int'))
    df_local = df_local.withColumn('day', F.col('transaction_timestamp').cast('string').substr(9, 2).cast('int'))
    df_local = df_local.withColumn('location', split(col('location'),',').getItem(1))
    df_local = df_local.withColumn('gender',F.when((col('gender')=='M'),1).otherwise(0))
    #states = set([state[1].strip() for state in [x.split(',') for x in df_local.location.unique()]])
    #states = set([state[1].strip() for state in [x.split(',') for x in np.array(df_local.select('location').distinct().toPandas()['location'].tolist())]])    
    # Define a user defined function
    #state = udf(lambda x: x.split(',')[1].strip())
    #df_local = df_local.withColumn("location", state(df.location))
    df_local = df_local.filter(df_local.userId != "")
    df_local = df_local.filter(col('userId').isNotNull())
    df_local = df_local.dropna(how = "any", subset = ["userId", "sessionId"])
    df_local = df_local.withColumn('churn_flag',F.when((col('page').isin(['Cancellation Confirmation','Cancel'])) |  (col('auth')=='Cancelled'),1 ).otherwise(0))
    return df_local



In [107]:
def feature_engineering_phase_2(df_local):
    ex = '\(([^\)]*)\)'
    userAgents = [x for x  in np.array(df_local.select('userAgent').distinct().toPandas()['userAgent'].tolist())] 
    mapping = {'Compatible': 1,  'Ipad': 2,  'Iphone': 3, 'Macintosh': 4,  'Windows nt 5.1': 5,  'Windows nt 6.0': 6,  'Windows nt 6.1': 7,  'Windows nt 6.2': 8,  'Windows nt 6.3': 9,  'X11': 10}
    os_specific = udf(lambda x: mapping[re.findall(ex, x)[0].split(';')[0].capitalize()])
    df_local = df_local.withColumn("os", os_specific(df_local.userAgent).cast('int'))
    df_local = df_local.withColumn('age', F.datediff(df.transaction_timestamp, df.registration_timestamp).alias('age').cast(IntegerType()))
    df_local = df_local.drop('userAgent')
    df_local = df_local.drop('registration')
    for field in df_local.schema.fields:
        if field.dataType==StringType():
            df_local = df_local.withColumn(field.name, regexp_replace(field.name, '[^a-zA-Z0-9\,\-]', ''))
    df_level = df_local.orderBy('ts', ascending=False).groupBy('userId').agg(first('level').alias('last_level'))
    df_status = df_local.orderBy('ts', ascending=False).groupBy('userId').agg(first('status').cast('int').alias('last_status'))
    df_local = df_local.join(df_level, on='userId') 
    df_local = df_local.join(df_status, on='userId') 
    df_local = df_local.drop('level')
    df_local = df_local.drop('status')
    df_local = df_local.drop('length')
    return df_local

In [108]:
def feature_engineering_phase_3(df_new):
    df_new = df_new.groupby('userId','year','month','day').agg(F.countDistinct(df_new.sessionId).alias('d_cnt_session'),
                                                               F.max(df_new.location).cast('string').alias('d_location'),
                                                               F.max(df_new.last_level).cast('string').alias('d_last_level'),
                                                               F.max(df_new.last_status).cast('int').alias('d_last_status'),
                                                               F.max(df_new.age).cast('int').alias('d_age'),
                                                               F.max(df_new.os).cast('string').alias('d_os'),
                                                               F.max(df_new.churn_flag).cast('int').alias('d_churn'),
                                                               F.count(df_new.song).cast('int').alias('d_cnt_song'),
                                                               F.max(df_new.itemInSession).alias('d_max_item_in_session'),
                                                               F.min(df_new.itemInSession).alias('d_min_item_in_session'),
                                                               F.avg(df_new.itemInSession).cast('int').alias('d_avg_item_in_session'),
                                                               ((F.max(df_new.transaction_timestamp).cast(LongType()) - F.min(df_new.transaction_timestamp).cast(LongType()))/24*60).cast('double').alias('d_max_session_duration'),
                                                               ((F.min(df_new.transaction_timestamp).cast(LongType()) - F.min(df_new.transaction_timestamp).cast(LongType()))/24*60).cast('double').alias('d_min_session_duration'),
                                                               F.sum(F.when((df_new.page=='About'),1 ).otherwise(0)).alias('d_cnt_about'),
                                                               F.sum(F.when((df_new.page=='AddFriend'),1 ).otherwise(0)).alias('d_cnt_addfriend'),
                                                               F.sum(F.when((df_new.page=='AddtoPlaylist'),1 ).otherwise(0)).alias('d_cnt_addtoplaylist'),
                                                               F.sum(F.when((df_new.page=='Downgrade'),1 ).otherwise(0)).alias('d_cnt_downgrade'),
                                                               F.sum(F.when((df_new.page=='Error'),1 ).otherwise(0)).alias('d_cnt_error'),
                                                               F.sum(F.when((df_new.page=='Help'),1 ).otherwise(0)).alias('d_cnt_help'),
                                                               F.sum(F.when((df_new.page=='Home'),1 ).otherwise(0)).alias('d_cnt_home'),
                                                               F.sum(F.when((df_new.page=='Logout'),1 ).otherwise(0)).alias('d_cnt_logout'),
                                                               F.sum(F.when((df_new.page=='NextSong'),1 ).otherwise(0)).alias('d_cnt_nextsong'),
                                                               F.sum(F.when((df_new.page=='RollAdvert'),1 ).otherwise(0)).alias('d_cnt_rolladvert'),
                                                               F.sum(F.when((df_new.page=='SaveSettings'),1 ).otherwise(0)).alias('d_cnt_savesettings'),
                                                               F.sum(F.when((df_new.page=='Settings'),1 ).otherwise(0)).alias('d_cnt_settings'),
                                                               F.sum(F.when((df_new.page=='SubmitDowngrade'),1 ).otherwise(0)).alias('d_cnt_submitdowngrade'),
                                                               F.sum(F.when((df_new.page=='SubmitUpgrade'),1 ).otherwise(0)).alias('d_cnt_submitupgrade'),
                                                               F.sum(F.when((df_new.page=='ThumbsDown'),1 ).otherwise(0)).alias('d_cnt_thumbsdown'),
                                                               F.sum(F.when((df_new.page=='ThumbsUp'),1 ).otherwise(0)).alias('d_cnt_thumbsup'),
                                                               F.sum(F.when((df_new.page=='Upgrade'),1 ).otherwise(0)).alias('d_cnt_upgrade'),
                                                               F.sum(F.when((df_new.song=='YoureTheOne'),1 ).otherwise(0)).alias('d_cnt_song_youretheone'),
                                                               F.sum(F.when((df_new.song=='Revelry'),1 ).otherwise(0)).alias('d_cnt_song_revelry'),
                                                               F.sum(F.when((df_new.song=='Undo'),1 ).otherwise(0)).alias('d_cnt_song_undo'),
                                                               F.sum(F.when((df_new.song=='Sehrkosmisch'),1 ).otherwise(0)).alias('d_cnt_song_sehrkosmisch'),
                                                               F.sum(F.when((df_new.song=='HornConcertoNo4inEflatK495IIRomanceAndantecantabile'),1 ).otherwise(0)).alias('d_cnt_song_hornconcerto'),
                                                               F.sum(F.when((df_new.song=='DogDaysAreOverRadioEdit'),1 ).otherwise(0)).alias('d_cnt_song_dogdaysareoverradio'),
                                                               F.sum(F.when((df_new.song=='UseSomebody'),1 ).otherwise(0)).alias('d_cnt_song_usesomebody'),
                                                               F.sum(F.when((df_new.song=='Secrets'),1 ).otherwise(0)).alias('d_cnt_song_secrets'),
                                                               F.sum(F.when((df_new.song=='Canada'),1 ).otherwise(0)).alias('d_cnt_song_canada'),
                                                               F.sum(F.when((df_new.song=='SinceritEtJalousie'),1 ).otherwise(0)).alias('d_cnt_song_sinceritetjalousie'),
                                                               F.sum(F.when((df_new.song=='AintMisbehavin'),1 ).otherwise(0)).alias('d_cnt_song_aintmisbehavin'),
                                                               F.sum(F.when((df_new.song=='Reprsente'),1 ).otherwise(0)).alias('d_cnt_song_reprsente'),
                                                               F.sum(F.when((df_new.song=='LoveStory'),1 ).otherwise(0)).alias('d_cnt_song_lovestory'),
                                                               F.sum(F.when((df_new.song=='Fireflies'),1 ).otherwise(0)).alias('d_cnt_song_fireflies'),
                                                               F.sum(F.when((df_new.song=='CatchYouBabyStevePitronMaxSannaRadioEdit'),1 ).otherwise(0)).alias('d_cnt_song_catchyoubabysteve'),
                                                               F.sum(F.when((df_new.song=='HeySoulSister'),1 ).otherwise(0)).alias('d_cnt_song_heysoulsister'),
                                                               F.sum(F.when((df_new.song=='TheGift'),1 ).otherwise(0)).alias('d_cnt_song_thegift'),
                                                               F.sum(F.when((df_new.song=='Invalid'),1 ).otherwise(0)).alias('d_cnt_song_invalid'),
                                                               F.sum(F.when((df_new.song=='SomebodyToLove'),1 ).otherwise(0)).alias('d_cnt_song_somebodytolove'),
                                                               F.sum(F.when((df_new.artist=='KingsOfLeon'),1 ).otherwise(0)).alias('d_cnt_kingsofleon'),
                                                               F.sum(F.when((df_new.artist=='Coldplay'),1 ).otherwise(0)).alias('d_cnt_coldplay'),
                                                               F.sum(F.when((df_new.artist=='DwightYoakam'),1 ).otherwise(0)).alias('d_cnt_dwightyoakam'),
                                                               F.sum(F.when((df_new.artist=='FlorenceTheMachine'),1 ).otherwise(0)).alias('d_cnt_florencethemachine'),
                                                               F.sum(F.when((df_new.artist=='TheBlackKeys'),1 ).otherwise(0)).alias('d_cnt_theblackkeys'),
                                                               F.sum(F.when((df_new.artist=='Bjrk'),1 ).otherwise(0)).alias('d_cnt_bjrk'),
                                                               F.sum(F.when((df_new.artist=='JustinBieber'),1 ).otherwise(0)).alias('d_cnt_justinbieber'),
                                                               F.sum(F.when((df_new.artist=='JackJohnson'),1 ).otherwise(0)).alias('d_cnt_jackjohnson'),
                                                               F.sum(F.when((df_new.artist=='TaylorSwift'),1 ).otherwise(0)).alias('d_cnt_taylorswift'),
                                                               F.sum(F.when((df_new.artist=='Harmonia'),1 ).otherwise(0)).alias('d_cnt_harmonia'),
                                                               F.sum(F.when((df_new.artist=='AllianceEthnik'),1 ).otherwise(0)).alias('d_cnt_allianceethnik'),
                                                               F.sum(F.when((df_new.artist=='GunsNRoses'),1 ).otherwise(0)).alias('d_cnt_gunsnroses'),
                                                               F.sum(F.when((df_new.artist=='Train'),1 ).otherwise(0)).alias('d_cnt_train'),
                                                               F.sum(F.when((df_new.artist=='Eminem'),1 ).otherwise(0)).alias('d_cnt_eminem'),
                                                               F.sum(F.when((df_new.artist=='OneRepublic'),1 ).otherwise(0)).alias('d_cnt_onerepublic')
                                                              )

    return df_new

In [109]:
def feature_engineering_phase_4(df_new_local):
    df_new_local = df_new_local.groupby('userId').agg(   F.max(df_new_local.d_churn).cast('int').alias('label'),                                                         
                                                         F.max(df_new_local.d_age).cast('int').alias('age'),
                                                         F.max(df_new_local.d_os).cast('int').alias('os'),
                                                         F.max(df_new_local.d_location).cast('string').alias('location'),       
                                                         F.max(df_new_local.d_last_level).cast('string').alias('2m_last_level'),
                                                         F.max(df_new_local.d_last_status).cast('int').alias('2m_last_status'),
                                                         F.sum(df_new_local.d_cnt_song).alias('2m_avg_song'),   
                                                         F.sum(df_new_local.d_cnt_session).alias('2m_sum_session'),
                                                         F.avg(df_new_local.d_cnt_session).alias('2m_avg_session'),
                                                         F.max(df_new_local.d_cnt_session).alias('2m_max_session'),
                                                         F.max(df_new_local.d_max_item_in_session).alias('2m_max_item_in_session'),
                                                         F.min(df_new_local.d_min_item_in_session).alias('2m_min_item_in_session'),
                                                         F.avg(df_new_local.d_avg_item_in_session).cast('int').alias('2m_avg_item_in_session'),
                                                         F.max(df_new_local.d_max_session_duration).cast('int').alias('2m_max_session_duration'),
                                                         F.max(df_new_local.d_min_session_duration).cast('int').alias('2m_min_session_duration'),
                                                         F.avg(df_new_local.d_cnt_rolladvert).alias('2m_avg_page_rolladvert'),
                                                         F.avg(df_new_local.d_cnt_settings).alias('2m_avg_page_settings'),
                                                         F.avg(df_new_local.d_cnt_downgrade).alias('2m_avg_page_downgrade'),
                                                         F.avg(df_new_local.d_cnt_nextsong).alias('2m_avg_page_nextsong'),
                                                         F.avg(df_new_local.d_cnt_error).alias('2m_avg_page_error'),
                                                         F.avg(df_new_local.d_cnt_about).alias('2m_avg_page_about'),
                                                         F.avg(df_new_local.d_cnt_upgrade).alias('2m_avg_page_upgrade'),
                                                         F.avg(df_new_local.d_cnt_home).alias('2m_avg_page_home'),
                                                         F.avg(df_new_local.d_cnt_logout).alias('2m_avg_page_logout'),
                                                         F.avg(df_new_local.d_cnt_addtoplaylist).alias('2m_avg_page_addtoplaylist'),
                                                         F.avg(df_new_local.d_cnt_thumbsdown).alias('2m_avg_page_thumbsdown'),
                                                         F.avg(df_new_local.d_cnt_thumbsup).alias('2m_avg_page_thumbsup'),
                                                         F.avg(df_new_local.d_cnt_savesettings).alias('2m_avg_page_savesettings'),
                                                         F.avg(df_new_local.d_cnt_addfriend).alias('2m_avg_page_addfriend'),
                                                         F.avg(df_new_local.d_cnt_submitupgrade).alias('2m_avg_page_submitupgrade'),
                                                         F.avg(df_new_local.d_cnt_help).alias('2m_avg_page_help'),
                                                         F.avg(df_new_local.d_cnt_submitdowngrade).alias('2m_avg_page_submitdowngrade'),
                                                         F.sum(df_new_local.d_cnt_rolladvert).alias('2m_sum_page_rolladvert'),
                                                         F.sum(df_new_local.d_cnt_settings).alias('2m_sum_page_settings'),
                                                         F.sum(df_new_local.d_cnt_downgrade).alias('2m_sum_page_downgrade'),
                                                         F.sum(df_new_local.d_cnt_nextsong).alias('2m_sum_page_nextsong'),
                                                         F.sum(df_new_local.d_cnt_error).alias('2m_sum_page_error'),
                                                         F.sum(df_new_local.d_cnt_about).alias('2m_sum_page_about'),
                                                         F.sum(df_new_local.d_cnt_upgrade).alias('2m_sum_page_upgrade'),
                                                         F.sum(df_new_local.d_cnt_home).alias('2m_sum_page_home'),
                                                         F.sum(df_new_local.d_cnt_logout).alias('2m_sum_page_logout'),
                                                         F.sum(df_new_local.d_cnt_addtoplaylist).alias('2m_sum_page_addtoplaylist'),
                                                         F.sum(df_new_local.d_cnt_thumbsdown).alias('2m_sum_page_thumbsdown'),
                                                         F.sum(df_new_local.d_cnt_thumbsup).alias('2m_sum_page_thumbsup'),
                                                         F.sum(df_new_local.d_cnt_savesettings).alias('2m_sum_page_savesettings'),
                                                         F.sum(df_new_local.d_cnt_addfriend).alias('2m_sum_page_addfriend'),
                                                         F.sum(df_new_local.d_cnt_submitupgrade).alias('2m_sum_page_submitupgrade'),
                                                         F.sum(df_new_local.d_cnt_help).alias('2m_sum_page_help'),
                                                         F.sum(df_new_local.d_cnt_submitdowngrade).alias('2m_sum_page_submitdowngrade'),
                                                         F.avg(df_new_local.d_cnt_song_youretheone).alias('2m_avg_song_youretheone'),
                                                         F.avg(df_new_local.d_cnt_song_revelry).alias('2m_avg_song_revelry'),
                                                         F.avg(df_new_local.d_cnt_song_undo).alias('2m_avg_song_undo'),
                                                         F.avg(df_new_local.d_cnt_song_sehrkosmisch).alias('2m_avg_song_sehrkosmisch'),
                                                         F.avg(df_new_local.d_cnt_song_hornconcerto).alias('2m_avg_song_hornconcerto'),
                                                         F.avg(df_new_local.d_cnt_song_dogdaysareoverradio).alias('2m_avg_song_dogdaysareoverradio'),
                                                         F.avg(df_new_local.d_cnt_song_usesomebody).alias('2m_avg_song_usesomebody'),
                                                         F.avg(df_new_local.d_cnt_song_secrets).alias('2m_avg_song_secrets'),
                                                         F.avg(df_new_local.d_cnt_song_canada).alias('2m_avg_song_canada'),
                                                         F.avg(df_new_local.d_cnt_song_sinceritetjalousie).alias('2m_avg_song_sinceritetjalousie'),
                                                         F.avg(df_new_local.d_cnt_song_aintmisbehavin).alias('2m_avg_song_aintmisbehavin'),
                                                         F.avg(df_new_local.d_cnt_song_reprsente).alias('2m_avg_song_reprsente'),
                                                         F.avg(df_new_local.d_cnt_song_lovestory).alias('2m_avg_song_lovestory'),
                                                         F.avg(df_new_local.d_cnt_song_fireflies).alias('2m_avg_song_fireflies'),
                                                         F.avg(df_new_local.d_cnt_song_catchyoubabysteve).alias('2m_avg_song_catchyoubabysteve'),
                                                         F.avg(df_new_local.d_cnt_song_heysoulsister).alias('2m_avg_song_heysoulsister'),
                                                         F.avg(df_new_local.d_cnt_song_thegift).alias('2m_avg_song_thegift'),
                                                         F.avg(df_new_local.d_cnt_song_invalid).alias('2m_avg_song_invalid'),
                                                         F.avg(df_new_local.d_cnt_song_somebodytolove).alias('2m_avg_song_somebodytolove'),
                                                         F.avg(df_new_local.d_cnt_kingsofleon).alias('2m_avg_artist_kingsofleon'),
                                                         F.avg(df_new_local.d_cnt_coldplay).alias('2m_avg_artist_coldplay'),
                                                         F.avg(df_new_local.d_cnt_dwightyoakam).alias('2m_avg_artist_dwightyoakam'),
                                                         F.avg(df_new_local.d_cnt_florencethemachine).alias('2m_avg_artist_florencethemachine'),
                                                         F.avg(df_new_local.d_cnt_theblackkeys).alias('2m_avg_artist_theblackkeys'),
                                                         F.avg(df_new_local.d_cnt_bjrk).alias('2m_avg_artist_bjrk'),
                                                         F.avg(df_new_local.d_cnt_justinbieber).alias('2m_avg_artist_justinbieber'),
                                                         F.avg(df_new_local.d_cnt_jackjohnson).alias('2m_avg_artist_jackjohnson'),
                                                         F.avg(df_new_local.d_cnt_taylorswift).alias('2m_avg_artist_taylorswift'),
                                                         F.avg(df_new_local.d_cnt_harmonia).alias('2m_avg_artist_harmonia'),
                                                         F.avg(df_new_local.d_cnt_allianceethnik).alias('2m_avg_artist_allianceethnik'),
                                                         F.avg(df_new_local.d_cnt_gunsnroses).alias('2m_avg_artist_gunsnroses'),
                                                         F.avg(df_new_local.d_cnt_train).alias('2m_avg_artist_train'),
                                                         F.avg(df_new_local.d_cnt_eminem).alias('2m_avg_artist_eminem'),
                                                         F.avg(df_new_local.d_cnt_onerepublic).alias('2m_avg_artist_onerepublic')
                                                                  )
    df_new_local = df_new_local.drop('userId')
    return df_new_local

In [111]:
spark = get_spark_session("local","Udacity - Sparkify")
df=load_dataset('mini_sparkify_event_data.json')

In [113]:
print('before cleaning {}'.format(df.count()))
df = feature_engineering_phase_1(df)
print('after cleaning {}'.format(df.count()))

before cleaning 286500
after cleaning 278154


First I cleaned the dataset, transformed some features to 

In [114]:
df = feature_engineering_phase_2(df)

In [115]:
df = feature_engineering_phase_3(df)
#df_dap = df.toPandas()
#df_dap.head(10)

In [116]:
df = feature_engineering_phase_4(df)
#df_map = df.toPandas()
#df_map.head(10)

In [117]:
df.printSchema()

root
 |-- label: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- os: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- 2m_last_level: string (nullable = true)
 |-- 2m_last_status: integer (nullable = true)
 |-- 2m_avg_song: long (nullable = true)
 |-- 2m_sum_session: long (nullable = true)
 |-- 2m_avg_session: double (nullable = true)
 |-- 2m_max_session: long (nullable = true)
 |-- 2m_max_item_in_session: long (nullable = true)
 |-- 2m_min_item_in_session: long (nullable = true)
 |-- 2m_avg_item_in_session: integer (nullable = true)
 |-- 2m_max_session_duration: integer (nullable = true)
 |-- 2m_min_session_duration: integer (nullable = true)
 |-- 2m_avg_page_rolladvert: double (nullable = true)
 |-- 2m_avg_page_settings: double (nullable = true)
 |-- 2m_avg_page_downgrade: double (nullable = true)
 |-- 2m_avg_page_nextsong: double (nullable = true)
 |-- 2m_avg_page_error: double (nullable = true)
 |-- 2m_avg_page_about: double (nullable = true)

In [118]:
#df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).toPandas()

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [121]:
numeric_features = ['age','os','2m_last_status','2m_avg_song','2m_sum_session','2m_avg_session','2m_max_session','2m_max_item_in_session',\
                '2m_min_item_in_session','2m_avg_item_in_session','2m_max_session_duration','2m_min_session_duration','2m_avg_page_rolladvert',\
                '2m_avg_page_settings','2m_avg_page_downgrade','2m_avg_page_nextsong','2m_avg_page_error','2m_avg_page_about','2m_avg_page_upgrade',\
                '2m_avg_page_home','2m_avg_page_logout','2m_avg_page_addtoplaylist','2m_avg_page_thumbsdown','2m_avg_page_thumbsup','2m_avg_page_savesettings',\
                '2m_avg_page_addfriend','2m_avg_page_submitupgrade','2m_avg_page_help','2m_avg_page_submitdowngrade','2m_sum_page_rolladvert','2m_sum_page_settings',\
                '2m_sum_page_downgrade','2m_sum_page_nextsong','2m_sum_page_error','2m_sum_page_about','2m_sum_page_upgrade','2m_sum_page_home','2m_sum_page_logout',\
                '2m_sum_page_addtoplaylist','2m_sum_page_thumbsdown','2m_sum_page_thumbsup','2m_sum_page_savesettings','2m_sum_page_addfriend','2m_sum_page_submitupgrade',\
                '2m_sum_page_help','2m_sum_page_submitdowngrade','2m_avg_song_youretheone','2m_avg_song_revelry','2m_avg_song_undo','2m_avg_song_sehrkosmisch',\
                '2m_avg_song_hornconcerto','2m_avg_song_dogdaysareoverradio','2m_avg_song_usesomebody','2m_avg_song_secrets','2m_avg_song_canada','2m_avg_song_sinceritetjalousie',\
                '2m_avg_song_aintmisbehavin','2m_avg_song_reprsente','2m_avg_song_lovestory','2m_avg_song_fireflies','2m_avg_song_catchyoubabysteve','2m_avg_song_heysoulsister',\
                '2m_avg_song_thegift','2m_avg_song_invalid','2m_avg_song_somebodytolove','2m_avg_artist_kingsofleon','2m_avg_artist_coldplay','2m_avg_artist_dwightyoakam',\
                '2m_avg_artist_florencethemachine','2m_avg_artist_theblackkeys','2m_avg_artist_bjrk','2m_avg_artist_justinbieber','2m_avg_artist_jackjohnson',\
                '2m_avg_artist_taylorswift','2m_avg_artist_harmonia','2m_avg_artist_allianceethnik','2m_avg_artist_gunsnroses','2m_avg_artist_train',\
                '2m_avg_artist_eminem','2m_avg_artist_onerepublic']

indexer_location = StringIndexer(inputCol='location', outputCol='location_index')
indexer_2m_last_level = StringIndexer(inputCol='2m_last_level', outputCol='level_index')
assembler = VectorAssembler(inputCols=numeric_features, outputCol='features')
process_pipeline = Pipeline(stages=[indexer_location, indexer_2m_last_level, assembler])
modelling_dataframe = process_pipeline.fit(df).transform(df)


In [122]:
train, test = modelling_dataframe.randomSplit([0.8, 0.2], seed=42)

In [123]:
def fit_the_model(classifier):
    """
    fit and predict with training and test data
    :param classifier : Classifier Algorithm class
    :return classifer and prediction results
    """
    clf = classifier.fit(train)
    result = clf.transform(test)
    return clf, result

In [124]:
def print_metrics(result):
    """
    :param result : prediction results which will be use to print metrics
    """
    evaluator= MulticlassClassificationEvaluator(predictionCol="prediction")
    print('Accuracy: {}'.format(evaluator.evaluate(result.select('label','prediction'), {evaluator.metricName: "accuracy"})))
    print('F1 Score:{}'.format(evaluator.evaluate(result.select('label','prediction'), {evaluator.metricName: "f1"})))

In [125]:
def print_feature_importance(clf, cols):
    a = {}
    feat_imp = clf.featureImportances
    for i in range(len(cols)):
        a.update({cols[i]:feat_imp[i]})
        
    feat_importance = pd.DataFrame.from_dict(a, orient="index").reset_index()
    feat_importance.columns = ['feature', 'importance']
    feat_importance_top_20 = feat_importance.sort_values(by="importance", ascending=False).head(20)
    return feat_importance_top_20

In [126]:
def run_model_pipeline(model_algorithm):
    """
    :param model_algorithm : classifier algorithm class
    :return classifier, prediction result
    """
    clf, result = fit_the_model(model_algorithm)
    print_metrics(result)    
    return clf,result

First model with randomforest with default params

In [127]:
x,y=run_model_pipeline(RandomForestClassifier())

Accuracy: 0.7058823529411765
F1 Score:0.6288515406162466
Test F1 Score : 62.89%


First model with GradientBoosting with default params

In [128]:
x,y=run_model_pipeline(GBTClassifier())

Accuracy: 0.7058823529411765
F1 Score:0.6591970121381886
Test F1 Score : 65.92%


First model with LogisticRegression with default params

In [129]:
x,y=run_model_pipeline(LogisticRegression())

Accuracy: 0.7058823529411765
F1 Score:0.6954248366013073
Test F1 Score : 69.54%


According to default parameters, LogisticRegression is more good than the others based on f1 score

## Tuning Model

In [132]:
indexer_location = StringIndexer(inputCol='location', outputCol='location_index')
indexer_2m_last_level = StringIndexer(inputCol='2m_last_level', outputCol='level_index')
assembler = VectorAssembler(inputCols=numeric_features, outputCol='features')

In [163]:
lr =  LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0)
indexer_location = StringIndexer(inputCol='location', outputCol='location_index')
indexer_2m_last_level = StringIndexer(inputCol='2m_last_level', outputCol='level_index')
assembler = VectorAssembler(inputCols=numeric_features, outputCol='features')
pipeline_tune_lr = Pipeline(stages=[indexer_location, indexer_2m_last_level, assembler])

In [167]:
numTrees=[20,60]
maxDepth=[10,30]

In [179]:
def tune_model(classifier_model_alg, numTrees_param, maxDepth_param, metricName_param, numFolds_param ):
    paramGrid = ParamGridBuilder().addGrid(classifier_model_alg.numTrees, numTrees_param).addGrid(classifier_model_alg.maxDepth, maxDepth_param).build() - 
    crossval = CrossValidator(estimator = Pipeline(stages=[classifier_model_alg]), estimatorParamMaps = paramGrid, evaluator = MulticlassClassificationEvaluator(metricName=metricName_param),numFolds = numFolds_param)
    cross_validation_model = crossval.fit(train)
    prediction_results = cross_validation_model.transform(test)
    return prediction_results

tuning the randomforest alg

In [168]:
predictions = tune_model(RandomForestClassifier(), [20,60], [10,30], 'f1', 3 )

In [169]:
print_metrics(predictions)

Accuracy: 0.7647058823529411
F1 Score:0.7030812324929971
Test F1 Score : 70.31%


tuning the randomforest alg

In [181]:
pred_results = tune_model(RandomForestClassifier(), [20,70], [5,30], 'f1', 4 )

In [182]:
print_metrics(pred_results)

Accuracy: 0.7352941176470589
F1 Score:0.6479031804109204
Test F1 Score : 64.79%


In [183]:
pred_results = tune_model(RandomForestClassifier(), [20,80], [10,25], 'f1', 5 )

In [184]:
print_metrics(pred_results)

Accuracy: 0.7058823529411765
F1 Score:0.5841784989858012
Test F1 Score : 58.42%


tuning the randomforest alg

In [188]:
predictions = tune_model(RandomForestClassifier(), [20,75], [10,30], 'f1', 3 )

In [189]:
print_metrics(predictions)

Accuracy: 0.7647058823529411
F1 Score:0.7030812324929971
Test F1 Score : 70.31%


In [204]:
indexer = StringIndexer(inputCol='label', outputCol='label')
assembler = VectorAssembler(inputCols=numeric_features, outputCol='features')

In [207]:
def tune_model_lr(classifier_model_alg, maxIter_param,regParam_param, elasticNetParam_param, numFolds_param ):
    pipeline_lr_tuned = Pipeline(stages=[assembler, indexer, lr])
    paramGrid_lr_tuned = ParamGridBuilder().addGrid(classifier_model_alg.maxIter, maxIter_param).addGrid(classifier_model_alg.regParam, regParam_param).addGrid(classifier_model_alg.elasticNetParam, elasticNetParam_param).build()

    # Cross validator for above grid parameters
    crossval_lr_tuned = CrossValidator(estimator=classifier_model_alg, estimatorParamMaps=paramGrid_lr_tuned,evaluator=BinaryClassificationEvaluator(),numFolds=3)
    crossval_lr_model_tuned = crossval_lr_tuned.fit(train)
    prediction_results_tuned = crossval_lr_model_tuned.transform(test)
    return prediction_results_tuned

tuning the LogisticRegression alg

In [208]:
predictions_lr = tune_model_lr(LogisticRegression(), [10, 20], [0.0, 0.1],[0.0, 0.5],3 )

In [209]:
print_metrics(predictions_lr)

Accuracy: 0.7058823529411765
F1 Score:0.5841784989858012
Test F1 Score : 58.42%


tuning the LogisticRegression alg

In [210]:
predictions_lr2 = tune_model_lr(LogisticRegression(), [5, 25], [0.1, 0.3],[0.0, 0.8],4 )

In [211]:
print_metrics(predictions_lr2)

Accuracy: 0.7647058823529411
F1 Score:0.7030812324929971
Test F1 Score : 70.31%


In [213]:
predictions_lr3 = tune_model_lr(LogisticRegression(), [5, 30], [0.1, 0.5],[0.0, 0.9],4)
print_metrics(predictions_lr3)

Accuracy: 0.7647058823529411
F1 Score:0.7030812324929971
Test F1 Score : 70.31%


tuning the LogisticRegression alg

In [214]:
predictions_lr4 = tune_model_lr(LogisticRegression(), [10, 60], [0.1, 0.5],[0.0, 1.0],3)
print_metrics(predictions_lr4)

Accuracy: 0.7647058823529411
F1 Score:0.7030812324929971
Test F1 Score : 70.31%


As you see, LogisticRegression was the best model for this work

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.