# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster is included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [None]:
# import libraries
from pyspark.sql import SparkSession

from pyspark.sql.functions import udf, desc, asc, sum, max, min, avg, countDistinct,\
            row_number, col, expr, round,first
from pyspark.sql.types import StringType, IntegerType, LongType
from pyspark.sql import Window

import datetime
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline

In [None]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Project") \
    .getOrCreate()

# 1. Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [None]:
data = 'mini_sparkify_event_data.json'
df = spark.read.json(data)
df.persist()

In [None]:
df.printSchema()

In [None]:
print(df.count())
df.head()


#### Deal with null userId and sessionId

In [None]:
df_valid = df.dropna(how = 'any',subset = ['userId','sessionId'])
df_valid.count()



In [None]:
df_valid.select('userId').dropDuplicates().sort('userId').show(5)

In [None]:
df_valid.filter(df_valid.userId == '').select('page').dropDuplicates().show()

In [None]:
df_valid = df_valid.filter(df_valid.userId != '' )
df_valid.count()

#### Time frame standardization

Within all features, most numerical features are related to time period. But every user has a different active period, before explore, I should standnize the time perion to make the numerical features comparable. I think there are two ways to deal with the problem:
- Divide numerical features by total active hours or days.
- Filter actions within a fix time period, such as the last and first 14 days or 30 days.

In [None]:
# built a new column, to save the lastest timestamp
windowval = Window.partitionBy('userId').orderBy(desc('ts')).\
        rangeBetween(Window.unboundedPreceding,0)

df_valid = df_valid.withColumn('last_ts', first('ts').over(windowval))

# add two columns, one is the duration between registration and the last active
# one is the time passed after registration(to get the first 14 days)
# then trans them into hours
df_valid = df_valid.withColumn('active_time',col('last_ts').cast('long') - col('registration').cast('long'))\
                   .withColumn('active_hour',round((col('active_time')/3600000),2))\
                   .withColumn('passed_time',col('ts').cast('long') - col('registration').cast('long'))\
                   .withColumn('passed_hour',round((col('passed_time')/3600000),2))\
                   .withColumn('time_till_last', col('last_ts').cast('long')- col('ts').cast('long'))\
                   .withColumn('hour_till_last',round((col('time_till_last')/3600000),2))\
                   .drop('active_time')\
                   .drop('passed_time')\
                   .drop('time_till_last')

df_valid.show(1)


In [None]:
# get a sub dataframe with the latest 336 hours
df_valid_14day = df_valid.where(df_valid.hour_till_last <= 336)
df_valid_14day.count()

In [None]:
# get another sub dataframe with the first 336 hours
df_valid_14day_lead = df_valid.where(df_valid.passed_hour <= 336)
df_valid_14day_lead.count()

There are 97388 records within the last 14days for all users.

There are 16989 records within the first 14 days for all users.

In [None]:
# users may change their level situation, I will find all user's latest level situation
windowval2 = Window.partitionBy('userId').orderBy(desc('ts'))

df_valid = df_valid.withColumn('latest_level', row_number().over(windowval2))

labeled_level_df = df_valid.select(['userId','level','latest_level','Churn']).where(df_valid.latest_level == 1)
labeled_level_df.collect()


# 2. Exploratory Data Analysis

## 2.1 preliminary analysis

In [None]:
# Define a function to plot value from a single column for future easy using
def single_column_plot(dataframe,title = '',x_label = '', y_label= '',ticks = None, rotation = 0, *args):
    plt.figure(figsize = (8,6))
    df_plot_pd = dataframe.toPandas()
    plt.bar(df_plot_pd.iloc[:,0],df_plot_pd.iloc[:,1])
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    if ticks == None:
        plt.xticks(np.array(df_plot_pd.iloc[:,0]),rotation = rotation,*args)
    else:
        plt.xticks(ticks,rotation = rotation, *args)
    plt.title(title)

    
# Define another function to plot with two columns
def two_column_plot(dataframe,title = '',x_label = '', y_label= ''):
    plt.figure(figsize = (8,6))
    df_plot_pd = dataframe.toPandas()
    sns.barplot(data = df_plot_pd, x = df_plot_pd.columns[0], y = df_plot_pd.columns[2],
            hue = df_plot_pd.columns[1])
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    
    plt.title(title)



In [None]:
get_hour = udf(lambda x : datetime.datetime.
               fromtimestamp(x / 1000.0).hour,IntegerType())
get_day = udf(lambda x : datetime.datetime.
               fromtimestamp(x / 1000.0).day,IntegerType())
get_month = udf(lambda x : datetime.datetime.
               fromtimestamp(x / 1000.0).month,IntegerType())
get_year = udf(lambda x : datetime.datetime.
               fromtimestamp(x / 1000.0).year,IntegerType())

df_valid_temp = df_valid.withColumn('reg_month', get_month(df_valid.registration))\
                        .withColumn('reg_year', get_year(df_valid.registration))\
                        .withColumn('year', get_year(df_valid.ts))\
                        .withColumn('month', get_month(df_valid.ts))\
                        .withColumn('day', get_day(df_valid.ts))\
                        .withColumn('hour', get_hour(df_valid.ts))\


In [None]:
# first see the registration time column
df_valid_temp.select(['reg_year','reg_month']).dropDuplicates().show()

It shows that all user was registed in 2018. Next, I will plot the count in a bar chart.

In [None]:
df_plot = df_valid_temp.groupby('reg_month').agg({'reg_month':'max','reg_month':'count'}).\
                            sort('reg_month')

In [None]:
    
single_column_plot(df_plot,'Month Registration','Month','Count',[i for i in range(3,12)],0)

It shows that the majority users registrated on September, then August and July.

In [None]:
# check the year and month features
df_valid_temp.select(['year','month']).dropDuplicates().show()

In [None]:
df_plot_month = df_valid_temp.groupby('month').agg({'month':'max','month':'count'}).sort('month')
single_column_plot(df_plot_month,'Month recods','Month','Count',[10,11,12],0)


In [None]:
# as there is too few cecords in December, I will check the day feature of Oct and Nov
df_plot_day = df_valid_temp.where(df_valid_temp.month.isin([10,11])).groupby('day').agg({'day':'max','day':'count'}).sort('day')
single_column_plot(df_plot_day,'day recods','Day','Count')

In [None]:
# check the hour feature
df_plot_hour = df_valid_temp.groupby('hour').agg({'hour':'max','hour':'count'}).sort('hour')
single_column_plot(df_plot_hour,'hour recods','Hour','Count')


- All the records are happend in 2018, from Oct to Dec.
- There is a peak and a velley for the trend of the hour feature.

In [None]:
# total records dividened by Gender and Level
gender_level_plot = df_valid.groupby(['gender','level']).count()
print(gender_level_plot.show())
two_column_plot(gender_level_plot,'Record Count for Gender and Level chart', 
                'Gender', 'Count')


In [None]:
# Distinct user gender and level features
dist_gender_level_plot = df_valid.groupby(['gender','level']).agg(countDistinct('userId'))
print(dist_gender_level_plot.show())
two_column_plot(dist_gender_level_plot,'Users Count for Gender and Level chart', 
                'Gender', 'Count');


We can see, there are more male users of both free and paid levels.

But female users are active than male, but not much.

And paid users are much more active.

In [None]:
# page feature
level_plot = df_valid.groupby('page').count()
single_column_plot(level_plot,'Page recods','Page','Count',rotation=90);


In [None]:
# page feature
method_plot = df_valid.groupby('method').count()

single_column_plot(method_plot,'Methods recods','Method','Count');

In [None]:
itemInSession_plot = df_valid.groupby('userId').avg().select('avg(itemInSession)')
itemInSession_plot = itemInSession_plot.toPandas()

plt.figure(figsize = (8,6))

bins = np.arange(0,400,20)
labels = [50*i for i in range(9)]
plt.hist(itemInSession_plot['avg(itemInSession)'],bins = bins)
plt.xticks(labels)
plt.xlabel('Average itemInSession')
plt.ylabel('counts')
plt.title('Histogram for Average ItemInSession');


In [None]:
# page feature
status_plot = df_valid.groupby(df_valid.status.cast('string')).count()
single_column_plot(status_plot,'Status recods','Status','Count')


In [None]:
# auth
auth_plot = df_valid.groupby('auth').count()

single_column_plot(auth_plot,'auth recods','Auth','Count')

### 2.2 Define Churn

Create a column `Churn` to label the users who has interactive with 'Cancellation Confirmation' page. I will use all information to predict this label.

For users who had Cancellation Confirmation, they will be marked as 1 at any time before the Cancellation Confirmation operation.

For users who had no relation with Cancellation Confirmation page, will be marked as 0.


In [None]:
flag_cancel_confirm_event = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0,
                          IntegerType())
windowval = Window.partitionBy('userId').orderBy(desc('ts')).\
        rangeBetween(Window.unboundedPreceding,0)


df_valid = df_valid.withColumn('cancle_confirmed', flag_cancel_confirm_event('page'))
df_valid = df_valid.withColumn('Churn', sum('cancle_confirmed').over(windowval)).drop('cancle_confirmed')

df_valid.head(2)

### 2.3 Exploratory data analysis
I will perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned.

#### 2.3.1 time
Firstly, I will see how long on average users stayed with sparkify

In [None]:
labeled_active_period_df = df_valid.groupby('userId').agg({'Churn':'max','active_hour':'avg'})

labeled_active_period_df.groupby('max(Churn)').agg(avg(col('avg(active_hour)'))).show()


It is obviouly that, churned users have less average active than stayed users, but this can not explane anything.

#### 2.3.2 Action counts

In [None]:
# total average
action_time_df = df_valid.groupby('userId').agg({'Churn':'max','sessionId':'count'})
action_time_df.groupby('max(Churn)').agg(avg(col('count(sessionId)'))).show()



In [None]:
# average action_per_hour
action_time_df = df_valid.groupby('userId').agg({'Churn':'max','sessionId':'count','active_hour':'max'})
action_time_df = action_time_df.withColumn('action_per_hour', col('count(sessionId)')/col('max(active_hour)'))
action_time_df.groupby('max(Churn)').agg(avg(col('action_per_hour'))).show()



In [None]:
# total average of last 14 days
df_valid_14day = df_valid.where(df_valid.hour_till_last <= 336)
action_time_df = df_valid_14day.groupby('userId').agg({'Churn':'max','sessionId':'count'})
action_time_df.groupby('max(Churn)').agg(avg(col('count(sessionId)'))).show()


In [None]:
# total average of first 14 days
df_valid_14day_lead = df_valid.where(df_valid.passed_hour <= 336)
action_time_df = df_valid_14day_lead.groupby('userId').agg({'Churn':'max','sessionId':'count'})
action_time_df.groupby('max(Churn)').agg(avg(col('count(sessionId)'))).show()


It seems that whatever total average on any 14 days and average action_per_hour, churned users took more actions than stayed user.

But on average, churned users became more activer than stayed (512/327 =1.56 > 1.44 = 409/285)

This feature can be used to get the action pencentage(ratio), such as thumb up ratio, next song ratio.

#### 2.3.3 Artist

This a numerical feature, I will explore the standardized value using the last 14 days feature

In [None]:
# average of total distinct artist for each users in each group
average_artist_df = df_valid.groupby('userId').agg(countDistinct(df_valid.artist),max(df_valid.Churn))

average_artist_df.groupby('max(Churn)').agg(avg(col('count(DISTINCT artist)'))).show()

In [None]:
# average of total distinct artist within last 14 days for each users in each group 
average_artist_df = df_valid_14day.groupby('userId').agg(countDistinct(df_valid.artist),max(df_valid.Churn))

average_artist_df.groupby('max(Churn)').agg(avg(col('count(DISTINCT artist)'))).show()

I can see, usually, stayed user selected more artists, but within the latest 14 days, churned user seems to choose more artists.


#### 2.3.4 Gender Feature

It's a categorical feature, do not need to consider the time frame.

In [None]:
labeled_gender_df = df_valid.groupby('userId').agg({'Churn':'min','Gender':'min'}).\
                        groupby(['min(Gender)','min(Churn)']).count()

two_column_plot(labeled_gender_df,'Users Count for Gender and Churn chart', 
                'Gender', 'Count');




It shows that male user have a higher churn rate.

#### 2.3.5 itemInSession

I will see the average itemInSession for each user, and get the average of average for both groups. As I used the average value, I do not need to standard by divide active hours

But I will check the value within 14 days.

In [None]:
# average average itemInSession
labeled_itemInSession_df = df_valid.groupby('userId').agg({'Churn':'max','itemInSession':'avg'})
labeled_itemInSession_df.groupby('max(Churn)').agg(avg(col('avg(itemInSession)'))).show()


In [None]:
# average average itemInSession in last 14 days
labeled_itemInSession_df = df_valid_14day.groupby('userId').agg({'Churn':'max','itemInSession':'avg'})
labeled_itemInSession_df.groupby('max(Churn)').agg(avg(col('avg(itemInSession)'))).show()



In [None]:
# average average itemInSession in first 14 days
labeled_itemInSession_df = df_valid_14day_lead.groupby('userId').agg({'Churn':'max','itemInSession':'avg'})
labeled_itemInSession_df.groupby('max(Churn)').agg(avg(col('avg(itemInSession)'))).show()



The average itemInSession have a small difference betweent two groups.

#### 2.3.6 length

In [None]:
df_valid.where(df_valid.page != 'NextSong').select('length').dropDuplicates().show()

In [None]:
labeled_length_df = df_valid.where(df_valid.page == 'NextSong').groupby('userId').agg({'Churn':'max','length':'avg'})
labeled_length_df.groupby('max(Churn)').agg(avg(col('avg(length)'))).show()



In [None]:
labeled_length_df = df_valid_14day.where(df_valid_14day.page == 'NextSong').groupby('userId').agg({'Churn':'max','length':'avg'})
labeled_length_df.groupby('max(Churn)').agg(avg(col('avg(length)'))).show()



only nextsong page have length reocrds, it seems that there are no sinifigent difference in song length between two groups. 

#### 2.3.7 song


In [None]:
# average user_total_song_palyed in both group
labeled_song_df = df_valid.groupby('userId').agg({'Churn':'max','song':'count'})
labeled_song_df.groupby('max(Churn)').agg(avg(col('count(song)'))).show()


In [None]:
# average user_songs_played_per_hour for each group
labeled_song_df = df_valid.groupby('userId').agg({'Churn':'max','song':'count','active_hour':'max'})
labeled_song_df = labeled_song_df.withColumn('song_per_hour', col('count(song)')/col('max(active_hour)'))
labeled_song_df.groupby('max(Churn)').agg(avg(col('song_per_hour'))).show()


In [None]:
# average user_total_song_palyed for each group in latest 14 days
labeled_song_df = df_valid_14day.groupby('userId').agg({'Churn':'max','song':'count'})
labeled_song_df.groupby('max(Churn)').agg(avg(col('count(song)'))).show()



In [None]:
# average user_total_song_palyed for each group in first 14 days
labeled_song_df = df_valid_14day_lead.groupby('userId').agg({'Churn':'max','song':'count'})
labeled_song_df.groupby('max(Churn)').agg(avg(col('count(song)'))).show()



It shows a great difference of average play times by a single user between two groups. Similar to artist feature, churned user seems to listen more songs than stayed users in an certain period.

#### 2.3.2 Level Feature
For there are several users have changed their level, I will see the latest level situaiton

In [None]:
# find all user's latest level situation
labeled_level_df = labeled_level_df.select(['userId','level','latest_level','Churn']).where(labeled_level_df.latest_level == 1)
labeled_level_df.show(5)


In [None]:
labeled_level_df_plot = labeled_level_df.groupby(['level','Churn']).count()
print(labeled_level_df_plot.show())
two_column_plot(labeled_level_df_plot,'Users Count for Level and Churn chart', 
                'Level', 'Count');



It shows a great difference of average play times by a single user between two groups. Similar to artist feature, churned user seems to listen more songs than stayed users in an certain period.

#### 2.3.7 page items

In [None]:
def page_item_func(item):

    print('Analysis "'+item +'"')
    print('-'*10)

    func = udf(lambda x : 1 if x == item else 0, IntegerType())
    temp_df = df_valid.withColumn(item,func('page'))
    temp_df1 = temp_df.groupby('userId').agg({'Churn':'max',item:'sum'})
    print('Full_data')
    print(temp_df1.groupby('max(Churn)').agg(avg(col('sum('+item+')'))).show())

    temp_df2 = temp_df.groupby('userId').agg({'Churn':'max',item:'sum','active_hour':'max'})
    temp_df2 = temp_df2.withColumn('new_col', col('sum('+item+')')/col('max(active_hour)'))
    print('time_standard_full_data')
    print(temp_df2.groupby('max(Churn)').agg(avg(col('new_col'))).show())

    temp_df3 = df_valid_14day.withColumn(item,func('page'))
    temp_df3 = temp_df3.groupby('userId').agg({'Churn':'max',item:'sum'})
    print('14day_data')
    print(temp_df3.groupby('max(Churn)').agg(avg(col('sum('+item+')'))).show())

    print('*'*30 +'\n\n')

    
    
page_item = df_valid.select('page').dropDuplicates().collect()
page_list = [item.page for item in page_item]  

for item in page_list:
    page_item_func(item)

    

'Submit Downgrade','Thumbs Down','Home','Downgrade','Roll Advert','Logout','Add to Playlist','Settings','Add Friend','NextSong','Thumbs Up','Help','Upgrade','Error' all have some relations with churned

Finally, I want to combine the home page with nextsong, to get how many songs users listen to on average between visiting home page 

In [None]:
# build a custom function, if the page is 'home', return 1,else 0
function = udf(lambda ishome : int(ishome == 'Home'), IntegerType())

# set up a window partitionby userID, and order by ts desc
user_window = Window \
    .partitionBy('userID') \
    .orderBy(desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

# get a new column, store the 'Home' page situation
cusum = df_valid.filter((df_valid.page == 'NextSong') | (df_valid.page == 'Home')) \
    .select('userID', 'page', 'ts','Churn') \
    .withColumn('homevisit', function(col('page')))

cusum = cusum.withColumn('period', sum('homevisit').over(user_window))

cusum = cusum.filter((cusum.page == 'NextSong'))

cusum = cusum.groupBy('userID', 'period').agg({'period':'count','Churn':'max'})

cusum.show(5)
cusum = cusum.groupBy('userID').agg({'count(period)':'avg','max(Churn)':'max'})
cusum.show(5)

cusum.groupBy('max(max(Churn))').agg({'avg(count(period))':'avg'}).show()

In [None]:
cusum = cusum.groupBy('userID').agg({'count(period)':'avg','max(Churn)':'max'})
cusum.show(5)

In [None]:
cusum.groupBy('max(max(Churn))').agg({'avg(count(period))':'avg'}).show()

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

I want to use the following 32 features to train the model.

1. Categorical Features (2 features):
    - gender
    - latest level

2. Numerical pre hour Features (14 features):
    - Action per hour(Action_ph)
    - Session per hour(Session_ph)
    - Nextsong per hour(Nextsong_ph)
    - Submit Downgrade per hour(Downgrade_ph)
    - Submit Upgrade per hour(Upgrade_ph)
    - Thumbs Down per hour(ThumbDown_ph)
    - Thumbs Up per hour(ThumbUp_ph)
    - Home page per hour(Home_ph)
    - Roll Advert per hour(Adv_ph)
    - Add to play list per hour(Addtolist_ph)
    - Setting per hour(Set_ph)
    - Add friends per hour(Addfriend_ph)
    - Error per hour(Error_ph)
    - Help per hour(Help_ph)
    

3. Ratio of Action Features (13 features):
    - Ratio of Actions to Session(Action_toSession)
    - Ratio of Nextsong to Actions(Nextsong_toAct)
    - Ratio of Upgrade to Actions(Upgrade_toAct)
    - Ratio of Downgrade to Actions(Downgrade_toAct)
    - Ratio of Thumbs Up to Actions(Thumbup_toAct)
    - Ratio of Thumbs Down to Actions(Thumbdown_toAct)
    - Ratio of Home page to Actions(Home_toAct)
    - Ratio of Roll Advert to Actions(Adv_toAct)
    - Ratio of Add to play list to Actions(Addtolist_toAct)
    - Ratio of Setting to Actions(Setting_toAct)
    - Ratio of Add friends to Actions(Addfriend_toAct)
    - Ratio of error to Actions(Error_toAct)
    - Ratio of Help to Actions(Help_toAct)
    

4. Active trend Features (2 features):
    - Ratio of last 14 days Nextsong count divide first 14 days(Nextsong_trend)
    - Ratio of last 14 days action count divide first 14 days(Action_trend)
    

5. Other Features (2 features):
    - Average nextsong between homepages(Nextsong_betweenHome)
    - Average session item counts()




In [3]:
# import libraries
from pyspark.sql import SparkSession

from pyspark.sql.functions import udf, desc, asc, sum, max, min, avg, countDistinct,\
            row_number, col, expr, round,first, count
from pyspark.sql.types import StringType, IntegerType, LongType
from pyspark.sql import Window

import datetime
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt


# build spark session
spark = SparkSession.builder \
    .master("local") \
    .config("spark.driver.memory", "15g") \
    .appName("Project") \
    .getOrCreate()

# load data and clean data
data = 'mini_sparkify_event_data.json'
df = spark.read.json(data)
df_valid = df.dropna(how = 'any',subset = ['userId','sessionId'])\
             .filter(col('userId') != '' )
df_valid.persist()

# Clean and wrangling dataframe
# window functions
windowval = Window.partitionBy('userId')\
                  .orderBy('ts')\
                  .rangeBetween(Window.unboundedPreceding,0)
windowval_desc = Window.partitionBy('userId')\
                  .orderBy(desc('ts'))\
                  .rangeBetween(Window.unboundedPreceding,0)
windowval_desc_nobound = Window.partitionBy('userId')\
                               .orderBy(desc('ts'))
flag_cancel_confirm_event = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0,IntegerType())
tem_df = df_valid.groupby('userId').agg(count('page').alias('actions'))
# built two new columns, to save the first and the last timestamp
df_valid = df_valid.withColumn('first_ts', first('ts').over(windowval))\
                   .withColumn('last_ts', first('ts').over(windowval_desc))
# built a column to save the active hours between first_ts and last_ts
df_valid = df_valid.withColumn('active_time',col('last_ts').cast('long') - col('first_ts').cast('long'))\
                   .withColumn('active_hour',round((col('active_time')/3600000),2))\
                   .drop('active_time')
# built a columns to save the time passed from first_ts to this record
df_valid = df_valid.withColumn('passed_time',col('ts').cast('long') - col('first_ts').cast('long'))\
                   .withColumn('passed_hour',round((col('passed_time')/3600000),2))\
                   .drop('passed_time')
# built a columns to save the time passed from this record to the last_ts
df_valid = df_valid.withColumn('time_till_last', col('last_ts').cast('long')- col('ts').cast('long'))\
                   .withColumn('hour_till_last',round((col('time_till_last')/3600000),2))\
                   .drop('time_till_last')
# add a column to save the time order of all records for each user
df_valid = df_valid.withColumn('latest_level', row_number().over(windowval_desc_nobound))
# add Churn feature, if the user churned marked all records with 1, else 0
df_valid = df_valid.withColumn('cancle_confirmed', flag_cancel_confirm_event('page'))\
                   .withColumn('Churn', sum('cancle_confirmed').over(windowval))\
                   .drop('cancle_confirmed')
# add a column to save the all action counts for each user
df_valid = df_valid.join(tem_df, on = ['userId'], how = 'left' )


# extract the necessary features to a new dataframe

# userId + categorical features
new_df = df_valid.where(df_valid.latest_level == 1)\
                 .select('userId','Churn','gender','level')


# Numerical pre hour features
# Action_ph_df
Action_ph_df = df_valid.groupby('userId')\
                       .agg(count('artist').alias('action'),max('active_hour').alias('active_hour'))\
                       .withColumn('Action_ph', round(col('action')/col('active_hour'),8))\
                       .drop('action','active_hour')
new_df = new_df.join(Action_ph_df, on=['userId'], how='left')
# Session_ph_df
Session_ph_df = df_valid.groupby('userId')\
                        .agg(countDistinct('sessionId').alias('sessionId'),max('active_hour').alias('active_hour'))\
                        .withColumn('Session_ph', round(col('sessionId')/col('active_hour'),8))\
                        .drop('sessionId','active_hour')
new_df = new_df.join(Session_ph_df, on=['userId'], how='left')
# all item in page column
name_dict = {'NextSong':'Nextsong_ph',
             'Downgrade':'Downgrade_ph',
             'Upgrade':'Upgrade_ph',
             'Thumbs Down':'ThumbDown_ph',
             'Thumbs Up':'ThumbUp_ph',
             'Home':'Home_ph',
             'Roll Advert':'Adv_ph',
             'Add to Playlist':'Addtolist_ph',
             'Settings':'Set_ph',
             'Add Friend':'Addfriend_ph',
             'Error':'Error_ph',
             'Help':'Help_ph'}
for item in name_dict.keys():
    temp_df = df_valid.where(df_valid.page == item)\
                              .groupby('userId')\
                              .agg(count('page').alias('page'),max('active_hour').alias('active_hour'))\
                              .withColumn(name_dict[item], round(col('page')/col('active_hour'),8))\
                              .drop('page','active_hour')
    new_df = new_df.join(temp_df, on=['userId'], how='left')
    
    
# Ratio of Action Features 
# Action_toSession
Action_toSession_df = df_valid.groupby('userId','sessionId')\
            .agg({'page':'count'})\
            .groupby('userId')\
            .agg(round(avg('count(page)'),8).alias('Action_toSession'))
new_df = new_df.join(Action_toSession_df, on=['userId'], how='left')
# other page items to action ratio
name_dict2 = {'NextSong':'Nextsong_toAct',
             'Downgrade':'Downgrade_toAct',
             'Upgrade':'Upgrade_toAct',
             'Thumbs Down':'ThumbDown_toAct',
             'Thumbs Up':'ThumbUp_toAct',
             'Home':'Home_toAct',
             'Roll Advert':'Adv_toAct',
             'Add to Playlist':'Addtolist_toAct',
             'Settings':'Set_toAct',
             'Add Friend':'Addfriend_toAct',
             'Error':'Error_toAct',
             'Help':'Help_toAct'}
for item in name_dict2.keys():
    temp_df = df_valid\
        .where(df_valid.page == item)\
        .groupby('userId')\
        .agg(count('page').alias('page'),max('actions').alias('actions'))\
        .withColumn(name_dict2[item], round(col('page')/col('actions'),8))\
        .drop('page','actions')
    new_df = new_df.join(temp_df, on=['userId'], how='left')

# Time trend Features


# Action_trend
Action_last_14day = df_valid\
        .where(df_valid.hour_till_last <= 336)\
        .groupby(col('userId'))\
        .agg(count('page').alias('last14'))
Action_first_14day = df_valid\
        .where(df_valid.passed_hour <= 336)\
        .groupby(col('userId'))\
        .agg(count('page').alias('first14'))
Action_trend_df = Action_last_14day\
        .join(Action_first_14day, on = ['userId'], how = 'inner')\
        .withColumn('Action_trend',round(col('last14')/(col('first14')+0.01),8))\
        .drop('last14','first14')
# Nextsong_trend
Nextsong_last_14day = df_valid\
        .where((df_valid.hour_till_last <= 336) & (df_valid.page == 'NextSong'))\
        .groupby(col('userId'))\
        .agg(count('page').alias('last14'))
Nextsong_first_14day = df_valid\
        .where((df_valid.passed_hour <= 336) & (df_valid.page == 'NextSong'))\
        .groupby(col('userId'))\
        .agg(count('page').alias('first14'))
Nextsong_trend_df = Nextsong_last_14day\
        .join(Nextsong_first_14day, on = ['userId'], how = 'inner')\
        .withColumn('Nextsong_trend',round(col('last14')/(col('first14')+0.01),8))\
        .drop('last14','first14')
new_df = new_df\
        .join(Action_trend_df, on=['userId'], how='left')\
        .join(Nextsong_trend_df, on=['userId'], how='left')    


# Behavioral Features
# Nextsong_betweenHome show the average nextsong palyed between two home pages for each user
function = udf(lambda ishome : int(ishome == 'Home'), IntegerType())
Nextsong_betweenHome_df = df_valid\
        .filter((df_valid.page == 'NextSong') | (df_valid.page == 'Home')) \
        .select('userId', 'page', 'ts','Churn') \
        .withColumn('homevisit', function(col('page')))\
        .withColumn('period', sum('homevisit')\
        .over(windowval_desc))\
        .filter((col('page') == 'NextSong'))\
        .groupBy('userId', 'period')\
        .agg({'period':'count'})\
        .groupBy('userId')\
        .agg(round(avg('count(period)'),8).alias('Nextsong_betweenHome'))
new_df = new_df.join(Nextsong_betweenHome_df, on=['userId'], how='left')




new_df.show(5)



+------+-----+------+-----+-----------+----------+-----------+------------+----------+------------+----------+----------+----------+------------+----------+------------+----------+----------+----------------+--------------+---------------+-------------+---------------+-------------+----------+----------+---------------+----------+---------------+-----------+----------+------------+--------------+--------------------+
|userId|Churn|gender|level|  Action_ph|Session_ph|Nextsong_ph|Downgrade_ph|Upgrade_ph|ThumbDown_ph|ThumbUp_ph|   Home_ph|    Adv_ph|Addtolist_ph|    Set_ph|Addfriend_ph|  Error_ph|   Help_ph|Action_toSession|Nextsong_toAct|Downgrade_toAct|Upgrade_toAct|ThumbDown_toAct|ThumbUp_toAct|Home_toAct| Adv_toAct|Addtolist_toAct| Set_toAct|Addfriend_toAct|Error_toAct|Help_toAct|Action_trend|Nextsong_trend|Nextsong_betweenHome|
+------+-----+------+-----+-----------+----------+-----------+------------+----------+------------+----------+----------+----------+------------+----------+--

In [7]:
spark.sparkContext.getConf().getAll()

[('spark.master', 'local'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1589690666485'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.memory', '15g'),
 ('spark.driver.port', '60534'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '192.168.31.21'),
 ('spark.app.name', 'Project'),
 ('spark.ui.showConsoleProgress', 'true')]

In [6]:
a = df_valid.where(df_valid.userId == 125).toPandas()
a.page

0                      NextSong
1                      NextSong
2                      NextSong
3                      NextSong
4                      NextSong
5                      NextSong
6                      NextSong
7                      NextSong
8                   Roll Advert
9                        Cancel
10    Cancellation Confirmation
Name: page, dtype: object

In [1]:
# import libraries
from pyspark.sql import SparkSession

from pyspark.sql.functions import udf, desc, asc, sum, max, min, avg, countDistinct,\
            row_number, col, expr, round,first, count
from pyspark.sql.types import StringType, IntegerType, LongType
from pyspark.sql import Window

import datetime
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt


spark = SparkSession.builder \
    .master("local") \
    .config("spark.driver.memory", "15g") \
    .appName("Project") \
    .getOrCreate()



In [4]:
# Clean and wrangling dataframe
windowval = Window.partitionBy('userId')\
                  .orderBy('ts')\
                  .rangeBetween(Window.unboundedPreceding,0)

windowval_desc = Window.partitionBy('userId')\
                  .orderBy(desc('ts'))\
                  .rangeBetween(Window.unboundedPreceding,0)

windowval_desc_nobound = Window.partitionBy('userId')\
                               .orderBy(desc('ts'))

flag_cancel_confirm_event = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0,IntegerType())

tem_df = df_valid.groupby('userId').agg(count('page').alias('actions'))

# built two new columns, to save the first and the last timestamp
df_valid = df_valid.withColumn('first_ts', first('ts').over(windowval))\
                   .withColumn('last_ts', first('ts').over(windowval_desc))

# built a column to save the active hours between first_ts and last_ts
df_valid = df_valid.withColumn('active_time',col('last_ts').cast('long') - col('first_ts').cast('long'))\
                   .withColumn('active_hour',round((col('active_time')/3600000),2))\
                   .drop('active_time')

# built a columns to save the time passed from first_ts to this record
df_valid = df_valid.withColumn('passed_time',col('ts').cast('long') - col('first_ts').cast('long'))\
                   .withColumn('passed_hour',round((col('passed_time')/3600000),2))\
                   .drop('passed_time')

# built a columns to save the time passed from this record to the last_ts
df_valid = df_valid.withColumn('time_till_last', col('last_ts').cast('long')- col('ts').cast('long'))\
                   .withColumn('hour_till_last',round((col('time_till_last')/3600000),2))\
                   .drop('time_till_last')

# add a column to save the time order of all records for each user
df_valid = df_valid.withColumn('latest_level', row_number().over(windowval_desc_nobound))

# add Churn feature, if the user churned marked all records with 1, else 0
df_valid = df_valid.withColumn('cancle_confirmed', flag_cancel_confirm_event('page'))\
                   .withColumn('Churn', sum('cancle_confirmed').over(windowval))\
                   .drop('cancle_confirmed')

# add a column to save the all action counts for each user
df_valid = df_valid.join(tem_df, on = ['userId'], how = 'left' )

df_valid.show(1)

278154

In [5]:
# extract the necessary features to a new dataframe

# userId + categorical features
new_df = df_valid.where(df_valid.latest_level == 1)\
                 .select('userId','Churn','gender','level')

new_df.show(1)

+------+-----+------+-----+
|userId|Churn|gender|level|
+------+-----+------+-----+
|100010|    0|     F| free|
+------+-----+------+-----+
only showing top 1 row



In [6]:
# Numerical pre hour features

# Action_ph_df
Action_ph_df = df_valid.groupby('userId')\
                       .agg(count('artist').alias('action'),max('active_hour').alias('active_hour'))\
                       .withColumn('Action_ph', round(col('action')/col('active_hour'),8))\
                       .drop('action','active_hour')
new_df = new_df.join(Action_ph_df, on=['userId'], how='left')

# Session_ph_df
Session_ph_df = df_valid.groupby('userId')\
                        .agg(countDistinct('sessionId').alias('sessionId'),max('active_hour').alias('active_hour'))\
                        .withColumn('Session_ph', round(col('sessionId')/col('active_hour'),8))\
                        .drop('sessionId','active_hour')
new_df = new_df.join(Session_ph_df, on=['userId'], how='left')

# all item in page column
name_dict = {'NextSong':'Nextsong_ph',
             'Downgrade':'Downgrade_ph',
             'Upgrade':'Upgrade_ph',
             'Thumbs Down':'ThumbDown_ph',
             'Thumbs Up':'ThumbUp_ph',
             'Home':'Home_ph',
             'Roll Advert':'Adv_ph',
             'Add to Playlist':'Addtolist_ph',
             'Settings':'Set_ph',
             'Add Friend':'Addfriend_ph',
             'Error':'Error_ph',
             'Help':'Help_ph'}

for item in name_dict.keys():
    temp_df = df_valid.where(df_valid.page == item)\
                              .groupby('userId')\
                              .agg(count('page').alias('page'),max('active_hour').alias('active_hour'))\
                              .withColumn(name_dict[item], round(col('page')/col('active_hour'),8))\
                              .drop('page','active_hour')
    new_df = new_df.join(temp_df, on=['userId'], how='left')

new_df.show(1)


+----------+----------+-----------+------------+----------+------------+----------+----------+----------+------------+------+------------+--------+----------+
| Action_ph|Session_ph|Nextsong_ph|Downgrade_ph|Upgrade_ph|ThumbDown_ph|ThumbUp_ph|   Home_ph|    Adv_ph|Addtolist_ph|Set_ph|Addfriend_ph|Error_ph|   Help_ph|
+----------+----------+-----------+------------+----------+------------+----------+----------+----------+------------+------+------------+--------+----------+
|0.25913327|0.00659612| 0.25913327|        null|0.00188461|  0.00471151|0.01601915|0.01036533|0.04899975|  0.00659612|  null|  0.00376921|    null|0.00188461|
+----------+----------+-----------+------------+----------+------------+----------+----------+----------+------------+------+------------+--------+----------+
only showing top 1 row



In [7]:
# Ratio of Action Features 

# Action_toSession
Action_toSession_df = df_valid.groupby('userId','sessionId')\
            .agg({'page':'count'})\
            .groupby('userId')\
            .agg(round(avg('count(page)'),8).alias('Action_toSession'))

new_df = new_df.join(Action_toSession_df, on=['userId'], how='left')

# other page items to action ratio
name_dict2 = {'NextSong':'Nextsong_toAct',
             'Downgrade':'Downgrade_toAct',
             'Upgrade':'Upgrade_toAct',
             'Thumbs Down':'ThumbDown_toAct',
             'Thumbs Up':'ThumbUp_toAct',
             'Home':'Home_toAct',
             'Roll Advert':'Adv_toAct',
             'Add to Playlist':'Addtolist_toAct',
             'Settings':'Set_toAct',
             'Add Friend':'Addfriend_toAct',
             'Error':'Error_toAct',
             'Help':'Help_toAct'}

for item in name_dict2.keys():
    temp_df = df_valid\
        .where(df_valid.page == item)\
        .groupby('userId')\
        .agg(count('page').alias('page'),max('actions').alias('actions'))\
        .withColumn(name_dict2[item], round(col('page')/col('actions'),8))\
        .drop('page','actions')
    new_df = new_df.join(temp_df, on=['userId'], how='left')


new_df.show(1)


In [10]:
# Time trend Features

# Action_trend
Action_last_14day = df_valid\
        .where(df_valid.hour_till_last <= 336)\
        .groupby(col('userId'))\
        .agg(count('page').alias('last14'))

Action_first_14day = df_valid\
        .where(df_valid.passed_hour <= 336)\
        .groupby(col('userId'))\
        .agg(count('page').alias('first14'))

Action_trend_df = Action_last_14day\
        .join(Action_first_14day, on = ['userId'], how = 'inner')\
        .withColumn('Action_trend',round(col('last14')/(col('first14')+0.01),8))\
        .drop('last14','first14')

# Nextsong_trend
Nextsong_last_14day = df_valid\
        .where((df_valid.hour_till_last <= 336) & (df_valid.page == 'NextSong'))\
        .groupby(col('userId'))\
        .agg(count('page').alias('last14'))

Nextsong_first_14day = df_valid\
        .where((df_valid.passed_hour <= 336) & (df_valid.page == 'NextSong'))\
        .groupby(col('userId'))\
        .agg(count('page').alias('first14'))

Nextsong_trend_df = Nextsong_last_14day\
        .join(Nextsong_first_14day, on = ['userId'], how = 'inner')\
        .withColumn('Nextsong_trend',round(col('last14')/(col('first14')+0.01),8))\
        .drop('last14','first14')


new_df = new_df\
        .join(Action_trend_df, on=['userId'], how='left')\
        .join(Nextsong_trend_df, on=['userId'], how='left')

new_df.show(1)

In [31]:
# Behavioral Features

# Nextsong_betweenHome show the average nextsong palyed between two home pages for each user
function = udf(lambda ishome : int(ishome == 'Home'), IntegerType())

Nextsong_betweenHome_df = df_valid\
        .filter((df_valid.page == 'NextSong') | (df_valid.page == 'Home')) \
        .select('userId', 'page', 'ts','Churn') \
        .withColumn('homevisit', function(col('page')))\
        .withColumn('period', sum('homevisit')\
        .over(windowval_desc))\
        .filter((col('page') == 'NextSong'))\
        .groupBy('userId', 'period')\
        .agg({'period':'count'})\
        .groupBy('userId')\
        .agg(round(avg('count(period)'),8).alias('Nextsong_betweenHome'))


new_df = new_df.join(Nextsong_betweenHome_df, on=['userId'], how='left')

new_df.show(5)


peak memory: 107.30 MiB, increment: 0.19 MiB
+------+-----+------+-----+-----------+----------+-----------+------------+----------+------------+----------+----------+----------+------------+----------+------------+----------+----------+----------------+--------------+---------------+-------------+---------------+-------------+---------------+---------------+--------------+------------+--------------+--------------------+--------------------+--------------------+
|userId|Churn|gender|level|  Action_ph|Session_ph|Nextsong_ph|Downgrade_ph|Upgrade_ph|ThumbDown_ph|ThumbUp_ph|   Home_ph|    Adv_ph|Addtolist_ph|    Set_ph|Addfriend_ph|  Error_ph|   Help_ph|Action_toSession|Nextsong_toAct|Downgrade_toAct|Upgrade_toAct|ThumbDown_toAct|ThumbUp_toAct|Addtolist_toAct|Addfriend_toAct|Nextsong_trend|Action_trend|Nextsong_trend|Nextsong_betweenHome|Nextsong_betweenHome|Nextsong_betweenHome|
+------+-----+------+-----+-----------+----------+-----------+------------+----------+------------+----------+-

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.

In [12]:
# Clean and wrangling dataframe
windowval = Window.partitionBy('userId')\
                  .orderBy('ts')\
                  .rangeBetween(Window.unboundedPreceding,0)

windowval_desc = Window.partitionBy('userId')\
                  .orderBy(desc('ts'))\
                  .rangeBetween(Window.unboundedPreceding,0)

# built two new columns, to save the first and the last timestamp
df_valid = df_valid.withColumn('first_ts', first('ts').over(windowval))\
                   .withColumn('last_ts', first('ts').over(windowval_desc))

# built a column to save the active hours between first_ts and last_ts
df_valid = df_valid.withColumn('active_time',col('last_ts').cast('long') - col('first_ts').cast('long'))\
                   .withColumn('active_hour',round((col('active_time')/3600000),2))\
                   .drop('active_time')

# built a columns to save the time passed from first_ts to t between 






.withColumn('active_time',col('last_ts').cast('long') - col('registration').cast('long'))\
                   .withColumn('active_hour',round((col('active_time')/3600000),2))\


# add a columns showing hours between first active
df_valid = df_valid.withColumn('passed_time',col('ts').cast('long') - col('registration').cast('long'))\
                   .withColumn('passed_hour',round((col('passed_time')/3600000),2))\
                   .drop('passed_time')\








# add two columns, one is hours between registration and the last active
# one is the hours after registration(to get the first 14 days)
df_valid = df_valid.withColumn('last_ts', first('ts').over(windowval))\
                   .withColumn('active_time',col('last_ts').cast('long') - col('registration').cast('long'))\
                   .withColumn('active_hour',round((col('active_time')/3600000),2))\
                   .drop('active_time')\
                   .withColumn('passed_time',col('ts').cast('long') - col('registration').cast('long'))\
                   .withColumn('passed_hour',round((col('passed_time')/3600000),2))\
                   .drop('passed_time')\
                   .withColumn('time_till_last', col('last_ts').cast('long')- col('ts').cast('long'))\
                   .withColumn('hour_till_last',round((col('time_till_last')/3600000),2))\
                   .drop('time_till_last')\
                   .drop('last_ts')

# add a column to save the time order of each record groupby each user
windowval2 = Window.partitionBy('userId')\
                   .orderBy(desc('ts'))

df_valid = df_valid.withColumn('latest_level', row_number().over(windowval2))

flag_cancel_confirm_event = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0,IntegerType())
# add Churn feature, if the user churned marked all records with 1, else 0
df_valid = df_valid.withColumn('cancle_confirmed', flag_cancel_confirm_event('page'))\
                   .withColumn('Churn', sum('cancle_confirmed').over(windowval))\
                   .drop('cancle_confirmed')

# add a column to save the action counts
tem_df = df_valid.groupby('userId').agg(count('page').alias('actions'))
df_valid = df_valid.join(tem_df, on = ['userId'], how = 'left' )

df_valid.show(1)

+------+------+---------+---------+------+-------------+---------+------+-----+--------------------+------+------+-------------+---------+----+------+-------------+--------------------+-----------+-----------+--------------+------------+-----+-------+
|userId|artist|     auth|firstName|gender|itemInSession| lastName|length|level|            location|method|  page| registration|sessionId|song|status|           ts|           userAgent|active_hour|passed_hour|hour_till_last|latest_level|Churn|actions|
+------+------+---------+---------+------+-------------+---------+------+-----+--------------------+------+------+-------------+---------+----+------+-------------+--------------------+-----------+-----------+--------------+------------+-----+-------+
|100010|  null|Logged In| Darianna|     F|           34|Carpenter|  null| free|Bridgeport-Stamfo...|   PUT|Logout|1538016340000|      187|null|   307|1542823952000|"Mozilla/5.0 (iPh...|    1335.45|    1335.45|           0.0|           1|    0| 