In [68]:
# import libraries
import numpy as np
from time import time

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType

from pyspark.ml.stat import Correlation
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier 
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

import warnings
warnings.filterwarnings("ignore")


# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

# Read in full sparkify dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)

df.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042')

In [69]:
#check 
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

In [70]:
#No of sessions
print('The dataset has {} rows.'.format(df.count()))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The dataset has 26259199 rows.

In [71]:
#Start time
df.select(min(to_timestamp(col('ts')/1000)).alias('Start time')).show()
#End time
df.select(max(to_timestamp(col('ts')/1000)).alias('End time')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+
|         Start time|
+-------------------+
|2018-10-01 00:00:01|
+-------------------+

+-------------------+
|           End time|
+-------------------+
|2018-12-01 00:00:02|
+-------------------+

In [72]:
def clean_session_data(df):
    """Clean a Sparkify dataset 
        Args:
        df: (spark dataframe) a Sparkify dataset
         Returns:
         df: (spark dataframe) a preprocessed Sparkify dataset
     """
    #remove 
    #remove nulls and blanks
    df=df.filter(df['registration'].isNotNull())
    #& df['registration']!=''
    df=df.filter(df['userId'].isNotNull())
    #& df['userId']!=''
    
    
    #convert timestamps to date 
    df=df.withColumn('registrationTime',to_timestamp(col('registration')/1000))\
         .withColumn('time',to_timestamp(col('ts')/1000))
    
    
    return df  

df = clean_session_data(df)
df.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042', registrationTime=datetime.datetime(2018, 8, 8, 13, 22, 21), time=datetime.datetime(2018, 10, 1, 0, 0, 1))

In [73]:
def add_churn(df,label='Churn'):
    """Add Churn data
    Args:
    df: (spark dataframe) a Sparkify dataset
    Returns:
    df: (spark dataframe) a preprocessed Sparkify dataset with label 'churn'
    """
    userWindow = Window.partitionBy('userId').orderBy('ts').\
                  rangeBetween(Window.unboundedPreceding,Window.unboundedFollowing)
    df=df.withColumn(label,when(col('page')=='Cancellation Confirmation',1).\
                     otherwise(0)).withColumn(label,max(label).over(userWindow))
    return df
df = add_churn(df)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [74]:
df.dropDuplicates(['userId']).groupby('Churn').count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----+
|Churn|count|
+-----+-----+
|    1| 5003|
|    0|17274|
+-----+-----+

In [75]:
def build_features(df, label='Churn'):
    """Build features to be used for modeling
    
    Args:
    df: (spark dataframe) a cleaned and labeled Sparkify dataset
    label: (string) label name
    
    Returns:
    user_df: (spark dataframe) a labeled dataset with features of interest grouped by user ids
    """
    userWindow = Window.partitionBy('userId').orderBy('ts').rangeBetween(Window.unboundedPreceding,Window.unboundedFollowing)
    
    # calculate the duration between registration to last activity (in days)
    regist_duration_df = df.groupBy('userId') \
        .agg(((last(col('ts'))-last(col('registration')))/1000/3600/24).alias('registDuration'))

    # compute average session duration (in hours)
    avg_session_duration_df = df \
        .groupBy(['userId', 'sessionId']).agg(min(col('ts')).alias('session_start'), max(col('ts')).alias('session_end'))\
        .groupBy('userId').agg(avg((col('session_end') - col('session_start'))/1000/3600).alias('avgSessionDuration'))

    # define the default start and end of the observation period
    obs_start_default = df.select(min(col('ts'))).collect()[0][0]
    obs_end_default = df.select(max(col('ts'))).collect()[0][0]

    # compute the observation period
    df = df \
        .withColumn('obs_start', when(col('registration') > obs_start_default, first(col('ts')).over(userWindow)) \
                    .otherwise(obs_start_default)) \
        .withColumn('end_state', last(col('page')).over(userWindow)) \
        .withColumn('obs_end', when(col('end_state') == 'Cancellation Confirmation', last(col('ts')).over(userWindow)) \
                    .otherwise(obs_end_default)) \
        .withColumn('obsDays', (col('obs_end') - col('obs_start'))/1000/3600/24)

    # aggregate activity statistics
    user_df = df.groupBy('userId') \
        .agg(first(col(label)).alias(label), \
             first(col('obsDays')).alias('obsDays'), \
             sum(when(col('page') == 'NextSong', 1).otherwise(0)).alias('nSongs'), \
             sum(when(col('page') == 'Thumbs Up', 1).otherwise(0)).alias('nThumbsUp'), \
             sum(when(col('page') == 'Thumbs Down', 1).otherwise(0)).alias('nThumbsDown'), \
             sum(when((col('page') == 'Upgrade') | (col('page') == 'Submit Upgrade'), 1) \
                 .otherwise(0)).alias('nUpgrade'), \
             sum(when((col('page') == 'Downgrade') | (col('page') == 'Submit Downgrade'), 1) \
                 .otherwise(0)).alias('nDowngrade'), \
             sum(when(col('page') == 'Add Friend', 1).otherwise(0)).alias('nAddFriend'), \
             sum(when(col('page') == 'Add to Playlist', 1).otherwise(0)).alias("nAddPlaylist"), \
             sum(when(col('page') == 'Roll Advert', 1).otherwise(0)).alias('nAdvert'), \
             sum(when((col('page') == 'Help'), 1).otherwise(0)).alias('nHelp'), \
             sum(when((col('page') == 'Error'), 1).otherwise(0)).alias('nError')) \
        .join(regist_duration_df, on='userId') \
        .join(avg_session_duration_df, on='userId')

    user_df = user_df \
        .withColumn('avgDailySongs', col('nSongs') / col('obsDays')) \
        .withColumn('avgDailyThumbsUp', col('nThumbsUp') / col('obsDays')) \
        .withColumn('avgDailyThumbsDown', col('nThumbsDown') / col('obsDays')) \
        .withColumn('avgDailyUpgrade', col('nUpgrade') / col('obsDays')) \
        .withColumn('avgDailyDowngrade', col('nDowngrade') / col('obsDays')) \
        .withColumn('avgDailyAddFriend', col('nAddFriend') / col('obsDays')) \
        .withColumn('avgDailyAddPlaylist', col('nAddPlaylist') / col('obsDays')) \
        .withColumn('avgDailyAdvert', col('nAdvert') / col('obsDays')) \
        .withColumn('avgDailyHelp', col('nHelp') / col('obsDays')) \
        .withColumn('avgDailyError', col('nError') / col('obsDays')) \
        #.drop('userId', 'obsDays', 'nSongs', 'nThumbsUp', 'nThumbsDown', 'nUpgrade', 'nDowngrade', \
        #      'nAddFriend', 'nAddPlaylist', 'nAdvert', 'nHelp', 'nError')
    
    return user_df
user_df = build_features(df)
user_df.head(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(userId='1000280', Churn=1, obsDays=43.95814814814815, nSongs=1022, nThumbsUp=53, nThumbsDown=33, nUpgrade=10, nDowngrade=4, nAddFriend=14, nAddPlaylist=25, nAdvert=74, nHelp=8, nError=3, registDuration=77.30377314814815, avgSessionDuration=3.248434343434344, avgDailySongs=23.24938704323136, avgDailyThumbsUp=1.205692283063857, avgDailyThumbsDown=0.7507140630397601, avgDailyUpgrade=0.2274891100120485, avgDailyDowngrade=0.0909956440048194, avgDailyAddFriend=0.3184847540168679, avgDailyAddPlaylist=0.5687227750301213, avgDailyAdvert=1.683419414089159, avgDailyHelp=0.1819912880096388, avgDailyError=0.06824673300361456)]

In [76]:
user_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- userId: string (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- obsDays: double (nullable = true)
 |-- nSongs: long (nullable = true)
 |-- nThumbsUp: long (nullable = true)
 |-- nThumbsDown: long (nullable = true)
 |-- nUpgrade: long (nullable = true)
 |-- nDowngrade: long (nullable = true)
 |-- nAddFriend: long (nullable = true)
 |-- nAddPlaylist: long (nullable = true)
 |-- nAdvert: long (nullable = true)
 |-- nHelp: long (nullable = true)
 |-- nError: long (nullable = true)
 |-- registDuration: double (nullable = true)
 |-- avgSessionDuration: double (nullable = true)
 |-- avgDailySongs: double (nullable = true)
 |-- avgDailyThumbsUp: double (nullable = true)
 |-- avgDailyThumbsDown: double (nullable = true)
 |-- avgDailyUpgrade: double (nullable = true)
 |-- avgDailyDowngrade: double (nullable = true)
 |-- avgDailyAddFriend: double (nullable = true)
 |-- avgDailyAddPlaylist: double (nullable = true)
 |-- avgDailyAdvert: double (nullable = true)
 |-- avgDailyHe

In [77]:
user_df.groupby('Churn').count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----+
|Churn|count|
+-----+-----+
|    1| 5003|
|    0|17274|
+-----+-----+

In [78]:
user_df.head(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(userId='1000280', Churn=1, obsDays=43.95814814814815, nSongs=1022, nThumbsUp=53, nThumbsDown=33, nUpgrade=10, nDowngrade=4, nAddFriend=14, nAddPlaylist=25, nAdvert=74, nHelp=8, nError=3, registDuration=52.24008101851852, avgSessionDuration=3.2484343434343432, avgDailySongs=23.24938704323136, avgDailyThumbsUp=1.205692283063857, avgDailyThumbsDown=0.7507140630397601, avgDailyUpgrade=0.2274891100120485, avgDailyDowngrade=0.0909956440048194, avgDailyAddFriend=0.3184847540168679, avgDailyAddPlaylist=0.5687227750301213, avgDailyAdvert=1.683419414089159, avgDailyHelp=0.1819912880096388, avgDailyError=0.06824673300361456)]

In [79]:
#No of sessions
print('The dataset has {} rows.'.format(user_df.count()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The dataset has 22277 rows.

In [81]:
user_df.coalesce(1).write.csv("s3a://aws-emr-resources-693516134661-us-east-2/notebooks/sparkify_preprocessed.csv",mode="overwrite",header='true')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…