# Sparkify Project Workspace

## Introduction

This is the final project of my Udacity Data Scientist Nanodegree. The goal of the project is to predict user churn for an online music streaming service using machine learning algorithms.

The project is based on a 12 GB dataset containing all recorded user interactions with the online streaming service. The data is stored in an AWS Simple Storage Service (S3) bucket in JSON format.
Datasets of this scale are challenging to process on a single computer and can therefore be referred to as big data.

Apache Spark is a popular tool for large-scale data processing and will be used to work with the full dataset. It spreads data and computations efficiently across a network of distributed computers, called a cluster. Each cluster consists of nodes (computers) that perform the computations in parallel.

It is best practice to explore the data on a smaller subset first to reduce the necessary computation. This workspace contains a tiny subset (128 MB) of the full dataset. Functions of the Spark SQL library are used to find features in the data via descriptive statistics. Once the features needed to predict user churn have been identified, the next step is to build a supervised machine learning model with Spark ML, first on the small subset and later on the full dataset.

The full dataset will be processed in Amazon Web Services (AWS) on an Elastic MapReduce (EMR) cluster of three m5.xlarge machines.

The project is structured following CRISP-DM, the Cross-Industry Standard Process for Data Mining. These are the steps:

- [1. Business Understanding](#BU)
- [2. Data Understanding](#DU)
- [3. Prepare Data](#prepare)
- [4. Data Modeling](#modeling)
- [5. Evaluate the Results](#evaluate)
- [6. Deploy](#deploy)

### Load libraries, create Spark session and import data 

In [1]:
# import libraries for Spark Session
import pyspark
from pyspark import SparkConf

# import libraries for SQL actions
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, StringType

# other libraries
import datetime
import re
import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns

# Spark ML libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, VectorAssembler, Normalizer, StandardScaler, StringIndexer, PCA, MinMaxScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [2]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

In [3]:
# view information about spark configuration
spark.sparkContext.getConf().getAll()

[('spark.app.id', 'local-1610457312368'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.name', 'Sparkify'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.driver.port', '64232'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '192.168.178.21'),
 ('spark.ui.showConsoleProgress', 'true')]

In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`.

In [4]:
# define path and load data
path = 'data/mini_sparkify_event_data.json'
user_log = spark.read.json(path)

<a id='BU'></a>
## 1. Business Understanding

Sparkify is a (fictional) music streaming service with a business case similar to Spotify. The service offers two levels, a free tier and a premium tier, and both generate revenue for Sparkify. The free tier is financed by advertisements between songs; the premium tier is financed by a monthly or yearly subscription fee that buys an advertisement-free experience.
At any moment a user can downgrade from premium to free, upgrade from free to premium, or cancel the service completely.
    
Customer churn occurs when a customer unsubscribes from a service, ceases to purchase a product or stops engaging with a service [[1](https://www.retentionscience.com/blog/why-measuring-your-customer-churn-rate-increases-revenue/)]. In this project, churn is defined as a user landing on the 'Cancellation Confirmation' page, which can happen for both paid and free tier users.

Acquiring new customers is usually more expensive for a business than retaining existing ones. Statistics vary from industry to industry, but research indicates that it may cost up to 5 times more to acquire a new customer than to keep an existing one [[2](https://www.forbes.com/sites/jiawertz/2018/09/12/dont-spend-5-times-more-attracting-new-customers-nurture-the-existing-ones/?sh=36d7a6225a8e)].

To prevent churn, businesses offer special discounts or other costly measures to customers. These measures usually lower the revenue per customer. The goal is therefore to identify users who are about to churn ahead of time and to target only them with marketing campaigns.

Churn prediction is an important classification use case for streaming services such as Netflix, Spotify or Apple Music. Companies that can identify customers who are about to churn can implement a more effective customer retention strategy.

<a id='DU'></a>
## 2. Data Understanding

The first step is to inspect the size of the dataset and the available columns of the data subset (128 MB):

In [None]:
# check column names and datatypes of object
user_log.printSchema()

In [None]:
# look at first row
user_log.head(n=1)

In [None]:
# check length of dataset
print('The dataset contains {} rows'.format(user_log.count()))

#### Column 'artist'

In [None]:
# count unique artists in dataset
unique_artists = user_log.select('artist').dropDuplicates().count()

print('There are {} unique artists in the dataset.'.format(unique_artists))

In [None]:
# list artists by number of occurrences
user_log.select(['artist', 'song']).groupby('artist').count().orderBy('count', ascending=False).show(n=8)

The most common value in the artist column is 'null'. Whether the rows containing 'null' are valid has to be investigated further.

#### Column 'auth'

In [None]:
# list unique values in authentication column by number of occurrences
user_log.select(['auth', 'userId']).groupby('auth').count().orderBy('count', ascending=False).show()

In [None]:
# look into sample rows of authentication status guest
user_log.filter('auth = "Guest"').select(
    'artist', 'auth', 'firstName', 'method', 'page', 'status', 'userId', 'sessionId').show(n=5)

It is possible to interact with Sparkify as a guest. These rows can be dropped, as it is not possible to assign the user interactions of guests to a userId.

In [None]:
# look into sample rows of authentication status cancelled
user_log.filter('auth = "Cancelled"').select(
    'artist', 'auth', 'firstName', 'method', 'page', 'status', 'userId', 'sessionId').show(n=5)

The rows with authentication status 'Cancelled' seem to be valid. The status appears to be set when a user lands on the 'Cancellation Confirmation' page.

#### Columns 'gender' and 'userId'

In [None]:
# show unique values of column 'gender' and their number of occurrences
user_log.select(['gender', 'userId']).groupby('gender').count().orderBy('count', ascending=False).show()

The table above shows the number of female/male/null entries in the gender column across all rows. To see how many users are of each gender, the count has to be restricted to unique userIds. This helps to answer whether the dataset is balanced regarding gender.

In [None]:
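# count unique userIds per gender; the alias avoids relying on Spark's generated column name
user_log.select(['gender', 'userId']) \
    .groupby('gender') \
    .agg(F.countDistinct('userId').alias('unique_users')) \
    .orderBy('unique_users').show()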


There are 226 unique userIds (225 valid) in the dataset. Of the valid users, 121 are male and 104 are female, so there are no valid userIds for which the gender is unknown. There is only a slight imbalance regarding the gender of users.

#### Columns 'itemInSession', 'sessionId', 'song', 'ts'  and 'length'

In [None]:
# look at behaviour of values in itemInSession for one user as example
user_log.select(['itemInSession', 'userId', 'sessionId', 'ts', 'page', 'song', 'firstName', 'length']) \
    .filter(user_log.userId == "18") \
    .show(n=8)

The values in column 'itemInSession' count the interactions that happened for one user during the same sessionId. The value in column 'length' describes how long a song was played and is therefore null for all page events other than 'NextSong'. The column 'ts' holds the timestamp of the event.

#### Column 'level'

In [None]:
# show unique possible values of column 'level'
user_log.select(['level', 'userId']) \
    .dropDuplicates() \
    .groupby('level').count() \
    .orderBy('count', ascending=False).show()

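There are 196 unique userIds in the free tier and 166 in the paid tier. The sum exceeds the 226 unique userIds in the dataset, so some users appear in both tiers during the observed period.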

#### Column 'location'

In [None]:
# show unique possible values of column 'location'
user_log.select('userId', 'location') \
    .dropDuplicates() \
    .sort('location').show(n=8, truncate=False)

In [None]:
# check the highest count of unique locations per userId
user_log.select(['userId', 'location']) \
    .groupby('userId').agg(F.countDistinct('location')) \
    .orderBy('count(location)', ascending=False).show(n=3)

In the dataset users always access the streaming service from the same location.

#### Column 'method'

In [None]:
# show unique possible values of column 'method'
user_log.select(['method', 'userId']) \
    .groupby('method').count() \
    .orderBy('count', ascending=False).show()

#### Column 'page'

In [None]:
# show unique values of column 'page' with their event counts
# (order by the renamed column; 'count' no longer exists after the rename)
page_events = user_log.select(['page', 'userId']) \
    .groupby('page').count() \
    .withColumnRenamed('count', 'page_event_count') \
    .orderBy('page_event_count', ascending=False)

page_events.show(n=30)

In [None]:
# convert pyspark df to pandas df
page_events_pd = page_events.toPandas()

# plot graph for distribution
barplot = sns.barplot(x = 'page_event_count', y = 'page', data=page_events_pd, palette = 'dark')
plt.title('Distribution of Page Events', size=20)
plt.show()

The most frequent page event is 'NextSong', which reflects the main function of the streaming service. It seems that the 'NextSong' page is loaded automatically once a song ends. The 'Home' page is the page a user enters when starting a streaming session and is the second most common page event.

#### Column 'registration'

In [None]:
# look at behaviour of values in the column registration for one user as example
user_log.select(['userId', 'registration']) \
    .groupby('userId') \
    .agg(F.countDistinct('registration')) \
    .orderBy('count(registration)', ascending=False) \
    .show(n=3)

When the number of distinct registration values per user is sorted in descending order, no userId shows more than one registration. The conclusion is that every userId has exactly one registration.

#### Column 'userAgent'

In [None]:
# show unique possible values of column 'userAgent'
user_log.select('userAgent').dropDuplicates().show(n=5, truncate=False)

The column user agent describes the software used to access the streaming service. Examples are Mac OS X with Safari, Windows or Ubuntu.

#### Column 'ts'

In [None]:
udf_convert_ts = F.udf(lambda timestamp: datetime.datetime.fromtimestamp(
    timestamp / 1000.0).strftime("%Y-%m-%d %H:%M:%S"))

min_ts = user_log.agg({'ts':'min'}) 
max_ts = user_log.agg({'ts':'max'})

min_ts.withColumn('first_date', udf_convert_ts('min(ts)')).show()
max_ts.withColumn('last_date', udf_convert_ts('max(ts)')).show()


The observed time frame is from 2018-10-01 to 2018-12-03.
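
The division by 1000 in the cell above implies that `ts` holds Unix time in milliseconds. `datetime.fromtimestamp` without a `tz` argument converts to the local time zone of the machine running the code, so the reported dates can shift between a laptop and a cluster. A minimal plain-Python sketch of the same conversion pinned to UTC follows; the helper name `ms_to_utc_string` and the example timestamp are assumptions for illustration, not part of the notebook.

```python
from datetime import datetime, timezone


def ms_to_utc_string(ts_ms: int) -> str:
    """Convert a Unix timestamp in milliseconds to a UTC date-time string."""
    dt = datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S")


# hypothetical example value: midnight UTC on 2018-10-01, in milliseconds
print(ms_to_utc_string(1538352000000))
```

Whether UTC or local time is the better choice depends on how the hour-of-day features are meant to be interpreted.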

<a id='prepare'></a>
## 3. Prepare Data

After the data understanding phase, the next step of CRISP-DM is to prepare the dataset.
The first step of data preparation in this project is to clean the data of invalid or missing values, for example records without a userId or sessionId.
After that, an exploratory data analysis is conducted to find possible features for the customer churn prediction.


### 3.1 Clean Dataset


In [None]:
# change format of timestamp to human readable

udf_convert_ts_to_datetime = F.udf(lambda timestamp: datetime.datetime.fromtimestamp(
    timestamp / 1000.0).strftime('%Y-%m-%d %H:%M:%S'))

udf_convert_ts_to_date = F.udf(lambda timestamp: datetime.datetime.fromtimestamp(
    timestamp / 1000.0).strftime('%Y-%m-%d'))

user_log = user_log.withColumn('datetime', udf_convert_ts_to_datetime(user_log.ts)) \
    .withColumn("date", udf_convert_ts_to_date(user_log.ts))


Drop any row/record where there is no userId or sessionId.

In [None]:
print('The dataset contains {} rows before cleaning'.format(user_log.count()))

# drop any record with a missing value in userId OR sessionId and save to new object
user_log_valid = user_log.dropna(how = 'any', subset = ['userId', 'sessionId'])

print('The dataset contains {} rows after dropping any NA value in userId and sessionId'.format(
    user_log_valid.count()))

It turns out there are no missing values in the userId or sessionId columns.

In [None]:
# show unique user IDs
user_log.select('userId').dropDuplicates().sort('userId').show(n=5)

There are userId values that are empty strings. These empty userIds appear for example when a user enters the streaming service without logging in.

In [None]:
# drop empty userIds
user_log_valid = user_log_valid.filter(user_log_valid['userId'] != '')

print('The dataset contains {} rows after filtering out empty userIds'.format(
    user_log_valid.count()))
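
The two cleaning steps above are needed because a null check and an empty-string check test different things: an empty string is a present value, so a null-based drop leaves it in place. The following plain-Python sketch illustrates this on made-up records; the helper names `drop_missing` and `drop_empty_user` are hypothetical and only mirror the idea, not the Spark API.

```python
# Toy records standing in for rows of the event log (values are made up)
rows = [
    {"userId": "18", "sessionId": 1},
    {"userId": "", "sessionId": 2},    # logged-out or guest interaction
    {"userId": None, "sessionId": 3},  # a truly missing value
]


def drop_missing(records, keys):
    """Keep records where none of the given keys is None (null-style check)."""
    return [r for r in records if all(r[k] is not None for k in keys)]


def drop_empty_user(records):
    """Keep records whose userId is not the empty string."""
    return [r for r in records if r["userId"] != ""]


after_null_check = drop_missing(rows, ["userId", "sessionId"])
after_empty_check = drop_empty_user(after_null_check)

# the empty-string row survives the null check and is only removed by the second step
print(len(rows), len(after_null_check), len(after_empty_check))
```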

In [None]:
# check whether all guest entries were dropped
user_log_valid.select(['auth', 'userId']).groupby('auth').count().orderBy('count', ascending=False).show()

In [None]:
# calculate number of rows dropped
rows_dropped = user_log.count() - user_log_valid.count()
amount_logged_out = user_log.select('auth').filter('auth = "Logged Out"').count()
amount_guest = user_log.select('auth').filter('auth = "Guest"').count()
amount_rows = user_log_valid.count()

print('{} rows without userId were dropped. These were the rows with authentication status "Logged Out" ({}) \
and "Guest" ({}), resulting in {} rows in the cleaned dataset.'.format(
    rows_dropped, amount_logged_out, amount_guest, amount_rows))

### 3.2 Exploratory Data Analysis

The goal of the following exploratory data analysis is to observe differences in behavior between customers who stayed and customers who churned.
One way to do this is to compute aggregates for these two groups of users and observe how often a specific action occurred per time unit or per number of songs played.

#### Define Churn

First, a column 'churned' is created to differentiate between customers who churned and those who stayed with the service.
This column will later serve as the label for training the supervised machine learning model.
The 'Cancellation Confirmation' events define the exact moment of churn, which can happen for both paid and free users.

In [None]:
# show records where Cancellation was confirmed (churn)
user_log_valid.filter("page = 'Cancellation Confirmation'") \
    .select('auth', 'firstName', 'gender', 'itemInSession', 'level', 'location', 'userId') \
    .show(n=5)

In [None]:
# find users who had the event 'Cancellation Confirmation'
user_log_valid.where(user_log_valid.page == 'Cancellation Confirmation').select('userId').show(n=2)

In [None]:
# inspect example funnel with event 'Cancellation Confirmation' (excluding the most common event, NextSong)
user_log_valid.select(['userId', 'level', 'sessionId', 'page']) \
    .where((user_log_valid.userId == '18') & (user_log_valid.page != 'NextSong')) \
    .tail(num=8)


In [None]:
# user defined function to flag the events page='Cancellation Confirmation' with extra column
flag_cancellation_event = F.udf(
    lambda cancellation_event: 1 if cancellation_event=='Cancellation Confirmation' else 0, IntegerType())

# create extra column 'cancellation_event'
user_log_valid = user_log_valid.withColumn('cancellation_event', flag_cancellation_event('page'))

In [None]:
# check if column 'cancellation_event' is created correctly
user_log_valid.select(['userId', 'level', 'sessionId', 'page', 'cancellation_event']) \
    .where((user_log_valid.userId == '18') & (user_log_valid.page != 'NextSong')) \
    .tail(num=3)

The new column 'cancellation_event' marks the exact event of the cancellation confirmation. As the goal of this project is to predict users who eventually churn, it is important to find the features that best describe the differences between users who churn and those who do not.
A second column is necessary to compare all interactions of users who eventually churned with those of users who did not. The values of the new column 'churned' are true if the user eventually churns and false otherwise:

In [None]:
# create column 'churned' = True for all rows of users who eventually churn
churned_users = user_log_valid \
    .filter(user_log_valid.page == 'Cancellation Confirmation') \
    .select('userId') \
    .dropDuplicates().collect()

# extract the userIds from the collected Row objects
userid_churn = [row[0] for row in churned_users]

user_log_valid = user_log_valid \
    .withColumn('churned', user_log_valid['userId'].isin(userid_churn))

In [None]:
# check if column 'churned' is filled correctly
user_log_valid.select(['userId', 'level', 'page', 'cancellation_event', 'churned']) \
    .where((user_log_valid.userId == '18') & (user_log_valid.page != 'NextSong')) \
    .tail(num=3)
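
The two labels created above work at different levels: 'cancellation_event' marks a single row, while 'churned' is propagated to every row of a user who has at least one such event. The following plain-Python sketch reproduces that logic on a made-up event log; the tuples and user IDs are illustrative assumptions, not data from the notebook.

```python
# Toy event log: (userId, page); values are made up for illustration
events = [
    ("18", "NextSong"),
    ("18", "Downgrade"),
    ("18", "Cancellation Confirmation"),
    ("30", "NextSong"),
    ("30", "Thumbs Up"),
]

# Step 1: flag the cancellation event itself (row level)
flagged = [(user, page, int(page == "Cancellation Confirmation")) for user, page in events]

# Step 2: collect the users with at least one flagged event
churned_ids = {user for user, _, flag in flagged if flag == 1}

# Step 3: propagate the label to every row of those users (user level)
labelled = [(user, page, flag, user in churned_ids) for user, page, flag in flagged]

for row in labelled:
    print(row)
```

Every row of user "18" carries `churned = True`, including the rows recorded before the cancellation, which is what makes the earlier behaviour usable as training signal.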

With the new column 'churned' it is possible to check whether the dataset is balanced regarding the number of users who eventually churn and those who stay. The churn rate for the observed period can be calculated as well.

In [None]:
# check if dataset is balanced regarding churned users
users = user_log_valid.select('userId').dropDuplicates().count()
churned_users = user_log_valid.filter('churned = True').select('userId').dropDuplicates().count()
stayed_users = user_log_valid.filter('churned = False').select('userId').dropDuplicates().count()

print('Of total {} users, {} users stayed with the streaming service during the observed time and \
{} users eventually churned (churn rate: {:2.2f}% ).'.format(users, stayed_users, churned_users, 
                                                         churned_users/users*100))

There is an imbalance in the dataset regarding the number of users who churned versus those who stayed. The next step is to check how this imbalance in the number of users carries over to the number of interactions.

In [None]:
user_interactions = user_log_valid.count()
churned_users_interactions = user_log_valid.filter('churned = True').count()
stayed_users_interactions = user_log_valid.filter('churned = False').count()

# note: arguments are passed in the order the sentence mentions them (stayed first, then churned)
print('Of total {} user interactions, {} user interactions were recorded of users who stayed with the \
streaming service during the observed time and {} user interactions were recorded of users who eventually \
churned ({:2.2f}% of user interactions are by churned customers).'.format(
    user_interactions, stayed_users_interactions, churned_users_interactions,
    churned_users_interactions/user_interactions*100))

The amount of data available regarding interactions to analyze the difference in behaviour for users who stayed versus users who churned is clearly imbalanced. 

Imbalance in the training data can lead to naive behaviour of the supervised machine learning model. With 76.89% of users not churning, a prediction accuracy of 76.89% can be achieved by simply always predicting 'no churn' [[3](https://towardsdatascience.com/handling-imbalanced-datasets-in-machine-learning-7a0e84220f28)].
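
The arithmetic behind this baseline can be made explicit. The sketch below computes accuracy and recall for a classifier that always predicts 'no churn'. The split of 173 loyal and 52 churned users is an assumption: it is the split of 225 users that is consistent with the 76.89% quoted above, not a figure printed in this notebook.

```python
def baseline_metrics(n_stayed: int, n_churned: int) -> dict:
    """Metrics of a classifier that always predicts 'no churn'."""
    total = n_stayed + n_churned
    true_negatives = n_stayed    # every loyal user is classified correctly
    false_negatives = n_churned  # every churned user is missed
    true_positives = 0           # 'churn' is never predicted
    accuracy = (true_negatives + true_positives) / total
    recall = true_positives / (true_positives + false_negatives)
    return {"accuracy": accuracy, "recall": recall}


# assumed split of 225 users, consistent with a 76.89% share of loyal users
metrics = baseline_metrics(n_stayed=173, n_churned=52)
print("accuracy: {:.2%}, recall: {:.2%}".format(metrics["accuracy"], metrics["recall"]))
```

The accuracy looks respectable while the recall for churned users is zero, which is why accuracy alone is a weak yardstick on imbalanced data.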

There are different ways to handle imbalanced data before feeding it to machine learning algorithms.
One way is to manipulate the input data by either undersampling the data of loyal users, oversampling the data of churned users, or generating synthetic data.
In this project, the imbalance in interaction data from churned users is addressed by creating additional features.
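
For comparison with the feature-based approach chosen here, random undersampling can be sketched in a few lines of plain Python. This illustrates the alternative mentioned above and is not what the notebook does; the function name `undersample`, the fixed seed and the toy user table are assumptions.

```python
import random


def undersample(rows, label_key, seed=42):
    """Randomly drop majority-class rows until both classes have equal size."""
    positives = [r for r in rows if r[label_key]]
    negatives = [r for r in rows if not r[label_key]]
    minority, majority = sorted([positives, negatives], key=len)
    rng = random.Random(seed)  # fixed seed keeps the sample reproducible
    return minority + rng.sample(majority, len(minority))


# Toy user table: 2 churned and 6 loyal users (made-up values)
users = [{"userId": str(i), "churned": i < 2} for i in range(8)]
balanced = undersample(users, "churned")

print(len(balanced), sum(r["churned"] for r in balanced))
```

The price of undersampling is discarded data, which weighs heavily when only a few hundred users are available.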

#### Calculate days since registration

In [None]:
# calculate total number of days since registration
user_log_valid = user_log_valid.withColumn('days_since_registration', F.ceil(
    (user_log_valid.ts - user_log_valid.registration)/(1000*60*60*24)))
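
The divisor `1000*60*60*24` is the number of milliseconds in a day, and `ceil` rounds every started day up. A small plain-Python sketch of the same arithmetic follows; the function name and the example timestamps are hypothetical, and both inputs are assumed to be epoch milliseconds.

```python
import math

MS_PER_DAY = 1000 * 60 * 60 * 24  # milliseconds in one day


def days_since_registration(ts_ms: int, registration_ms: int) -> int:
    """Number of started days between registration and an event (epoch milliseconds)."""
    return math.ceil((ts_ms - registration_ms) / MS_PER_DAY)


registration = 1538352000000                 # hypothetical registration time
event = registration + 36 * 60 * 60 * 1000   # an event 36 hours later

print(days_since_registration(event, registration))
```

An event 36 hours after registration therefore counts as day 2, while an event at the registration timestamp itself counts as day 0.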

#### Calculating Statistics by Hour

The next few cells check whether the number of songs played differs across the hours of the day:

In [None]:
# create user defined function to extract the hour of day from the ts column
get_hour = F.udf(lambda timestamp: datetime.datetime.fromtimestamp(timestamp / 1000.0).hour)

# create new column "hour" by applying udf get_hour
user_log_valid = user_log_valid.withColumn('hour', get_hour(user_log_valid.ts))

# count "NextSong" events and group them by hour of day
songs_in_hour = user_log_valid.filter(user_log_valid.page == 'NextSong').groupby(
    'hour').count().orderBy(F.col('hour').cast('float'))

In [None]:
# convert spark object to pandas dataframe
songs_in_hour_pd = songs_in_hour.toPandas()
songs_in_hour_pd.hour = pd.to_numeric(songs_in_hour_pd.hour)

# plot the amount of songs played over hours of day
plt.scatter(songs_in_hour_pd['hour'], songs_in_hour_pd['count'])
plt.xlim(-1, 24);
plt.ylim(0, 1.2 * max(songs_in_hour_pd['count']))
plt.xlabel('Hour')
plt.ylabel('Songs played');

#### Total Amount of Page Events per UserId

This section compares the total number of page events per userId over the observed period between churned and loyal users.

In [None]:
# total amount of page events per userId
page_event_per_user = user_log_valid.select('page','userId', 'churned') \
    .groupby('userId', 'churned').count() \
    .withColumnRenamed('count', 'total_page_events')

page_event_per_user.orderBy('total_page_events', ascending=False).show(n=5)

In [None]:
# mean of page events per user
page_event_per_user.groupby('churned').mean().show()

In [None]:
# convert spark object to pandas dataframe
page_event_per_user_pd = page_event_per_user.toPandas()

# Plot
ax = sns.violinplot(data=page_event_per_user_pd, y='churned', x='total_page_events', orient='h')
plt.xlabel('Page Events')
plt.ylabel('Customer Eventually Churned')
plt.title('Page Events per User Churned versus Loyal')
sns.despine(ax=ax);


Comparing users who churned with those who stayed, there is a clear difference in mean page events per user. It seems plausible that, on average, users who do not churn stay longer with the service and therefore accumulate more page events.
A more interesting question is whether users differ in page events per time period.
The following cells therefore also investigate whether there is a difference in page events per session.

#### Functions to create Features 

In [None]:
def create_statistic_per_userId_streamingTime(pyspark_df, event_type):
    '''
    Function to calculate the number of certain events per userId, in total
    and relative to the user's streaming time in hours.
    
    args:
        pyspark_df - (pyspark dataframe) user_log of Sparkify
        event_type - (string) page event
    return:
        feature_df - (pyspark dataframe) one row per userId with streaming time,
                     event count and event count per streaming hour
    '''
    # total streaming time per user in hours (song length divided by 3600)
    streaming_time_df = pyspark_df.filter(pyspark_df.page == 'NextSong') \
        .groupBy('userId', 'churned') \
        .agg((F.sum('length')/3600).alias('streamingTime_h'))

    # number of events of the given type per user
    event_statistics_df = pyspark_df.filter(pyspark_df.page == event_type) \
        .groupBy('userId') \
        .agg(F.count('userId').alias(event_type))

    # inner join: users without any event of this type are not part of the result
    feature_df = streaming_time_df.join(event_statistics_df, on=['userId'], how='inner')

    feature_df = feature_df.withColumn(
        event_type+'_per_streamingTime', feature_df[event_type]/feature_df.streamingTime_h)

    return feature_df
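
Because the function above combines streaming time and event counts with an inner join, users who never triggered the given event are absent from the result and do not enter the group means. The plain-Python sketch below shows the effect on made-up numbers; the `keep_missing` switch, which mimics a left join with zero-fill, is a hypothetical alternative and not part of the notebook.

```python
# Toy inputs (made-up values): seconds streamed per user and event counts per user
streamed_seconds = {"18": 7200.0, "30": 3600.0, "42": 1800.0}
thumbs_up_counts = {"18": 4, "30": 1}  # user "42" never gave a Thumbs Up


def events_per_streaming_hour(seconds_by_user, events_by_user, keep_missing=False):
    """Return events per streamed hour; optionally keep users without events as 0.0."""
    result = {}
    for user, seconds in seconds_by_user.items():
        if user not in events_by_user and not keep_missing:
            continue  # mimics the inner join: the user is dropped
        hours = seconds / 3600
        result[user] = events_by_user.get(user, 0) / hours
    return result


inner = events_per_streaming_hour(streamed_seconds, thumbs_up_counts)
left = events_per_streaming_hour(streamed_seconds, thumbs_up_counts, keep_missing=True)

print(inner)
print(left)
```

Which variant is appropriate depends on the question: the inner join describes only users who used the feature at least once, while the zero-filled variant describes all users.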

In [None]:
def plot_events_per_userStreaming(feature_pyspark_df, event_type):
    '''
    Function to create violin plot of a pyspark dataframe created with
    'create_statistic_per_userId_streamingTime'.
    
    args:
        feature_pyspark_df - (pyspark dataframe) created with 'create_statistic_per_userId_streamingTime'
        event_type - (string) page event
    
    return:
        None
    '''
    feature_pyspark_pd = feature_pyspark_df.toPandas()
    
    # plot the event count relative to streaming time, as stated in the title
    feature_column = event_type + '_per_streamingTime'
    ax = sns.violinplot(data=feature_pyspark_pd, y='churned', x=feature_column, orient='h')
    plt.xlabel(event_type + ' per UserId per streaming hour')
    plt.ylabel('Customers Churned')
    plt.title(event_type + ' per UserId per StreamingTime churned/not churned')
    sns.despine(ax=ax);
    
    return 

In [None]:
def plot_events_per_user(feature_pyspark_df, event_type):
    '''
    Function to create violin plot of a feature column of a pyspark dataframe.
    
    args:
        feature_pyspark_df - (pyspark dataframe) with columns 'churned' and the feature column
        event_type - (string) name of the feature column to plot
    
    return:
        None
    '''
    feature_pyspark_pd = feature_pyspark_df.toPandas()
    
    ax = sns.violinplot(data=feature_pyspark_pd, y='churned', x=event_type, orient='h')
    plt.xlabel(event_type)
    plt.ylabel('Customers Churned')
    plt.title(event_type + ' per UserId churned/not churned')
    sns.despine(ax=ax);
    
    return 

In [None]:
def calculate_avg_events_lastXsessions(pyspark_df, amount_sessions):
    '''
    Function to calculate mean of all events per userId for last x sessionIds.
    
    args:
        pyspark_df - (pyspark dataframe) user_log of Sparkify
        amount_sessions - (int) number of most recent sessions over which the mean is calculated
    return:
        pyspark_df - (pyspark dataframe) one row per userId with the mean event count
    '''
    
    # rank the sessions of each user by their latest timestamp, newest first
    user_window = Window \
        .partitionBy('userId') \
        .orderBy(F.desc('max(ts)')) 

    # count events per session and number the sessions per user
    pyspark_df = pyspark_df.select('userId', 'churned', 'sessionId', 'page', 'ts') \
        .groupBy('userId', 'churned', 'sessionId') \
        .agg(F.max('ts'), F.count('page')) \
        .withColumn('session_number', F.row_number().over(user_window))
    
    # keep only the most recent sessions and average their event counts
    pyspark_df = pyspark_df.filter(pyspark_df.session_number <= amount_sessions) \
        .groupBy('userId', 'churned') \
        .agg(F.avg('count(page)')) \
        .withColumnRenamed('avg(count(page))', 'average_amount_songs_lastXsessions')
    
    return pyspark_df

def find_optimal_amount_sessions_for_max_diff(pySpark_df):
    '''
    Function to print the difference in mean events (churned minus stayed)
    for the last 1 to 9 sessions.
    
    args:
        pySpark_df - (pyspark dataframe) user_log of Sparkify
    return:
        None
    '''
    for i in range(1,10):
        avg_amount_songs_lastXsessions = calculate_avg_events_lastXsessions(pySpark_df, i)

        # order by 'churned' so that row 0 is stayed (False) and row 1 is churned (True)
        diff_pd = avg_amount_songs_lastXsessions.groupBy('churned').mean() \
            .orderBy('churned') \
            .select('avg(average_amount_songs_lastXsessions)').toPandas()

        print('Difference in average events for last {} sessions is {}'.format(
            i, diff_pd.values[1][0] - diff_pd.values[0][0]))
          

In [None]:
def calculate_avg_page_event_lastXsessions(pyspark_df, amount_sessions, event_type):
    '''
    Function to calculate mean of certain events per userId for last x sessionIds.
    
    args:
        pyspark_df - (pyspark dataframe) user_log of Sparkify
        amount_sessions - (int) number of most recent sessions over which the mean is calculated
        event_type - (string) page event
    return:
        pyspark_df - (pyspark dataframe) one row per userId with the mean event count
    '''
    
    # rank the sessions of each user by their latest timestamp, newest first
    user_window = Window \
        .partitionBy('userId') \
        .orderBy(F.desc('max(ts)')) 

    # count events of the given type per session; sessions without such an event are not included
    pyspark_df = pyspark_df.select('userId', 'churned', 'sessionId', 'page', 'ts') \
        .filter(pyspark_df.page == event_type) \
        .groupBy('userId', 'churned', 'sessionId') \
        .agg(F.max('ts'), F.count('page')) \
        .withColumn('session_number', F.row_number().over(user_window))
    
    # keep only the most recent sessions and average their event counts
    pyspark_df = pyspark_df.filter(pyspark_df.session_number <= amount_sessions) \
        .groupBy('userId', 'churned') \
        .agg(F.avg('count(page)')) \
        .withColumnRenamed('avg(count(page))', 'average_amount_songs_lastXsessions')
    
    return pyspark_df


In [None]:
def find_amount_sessions_for_max_diff(pySpark_df, event_type):
    '''
    Function to print the difference in mean events of one type (churned minus stayed)
    for the last 1 to 9 sessions.
    
    args:
        pySpark_df - (pyspark dataframe) user_log of Sparkify
        event_type - (string) page event
    return:
        None
    '''
    for i in range(1,10):
        avg_amount_songs_lastXsessions = calculate_avg_page_event_lastXsessions(pySpark_df, i, event_type)

        # order by 'churned' so that row 0 is stayed (False) and row 1 is churned (True)
        diff_pd = avg_amount_songs_lastXsessions.groupBy('churned').mean() \
            .orderBy('churned') \
            .select('avg(average_amount_songs_lastXsessions)').toPandas()

        print('Difference in average {} events for last {} sessions is {}'.format(
            event_type, i, diff_pd.values[1][0] - diff_pd.values[0][0]))
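
The "last x sessions" functions above follow one pattern: count events per session, rank the sessions of a user by their latest timestamp (newest first), keep the top x, and average the counts. The plain-Python sketch below walks through that pattern for a single user; the event tuples and the function name `avg_events_last_sessions` are made up for illustration.

```python
from collections import defaultdict

# Toy events of one user: (sessionId, ts in ms); values are made up
events = [(1, 100), (1, 200), (2, 300), (3, 400), (3, 500), (3, 600), (4, 700)]


def avg_events_last_sessions(session_events, n_sessions):
    """Average number of events per session over the n most recent sessions."""
    counts = defaultdict(int)
    last_ts = defaultdict(int)
    for session_id, ts in session_events:
        counts[session_id] += 1
        last_ts[session_id] = max(last_ts[session_id], ts)
    # rank sessions by their latest timestamp, newest first
    newest_first = sorted(counts, key=lambda s: last_ts[s], reverse=True)
    selected = newest_first[:n_sessions]
    return sum(counts[s] for s in selected) / len(selected)


print(avg_events_last_sessions(events, 2))
```

With the toy data, the two most recent sessions (4 and 3) hold 1 and 3 events, so the average is 2.0. A user with fewer than x sessions is averaged over the sessions that exist.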
          

#### Thumbs Up Events per UserId per StreamingTime

In [None]:
# calculate events per user and streaming time
thumbsUp_per_streamingTime = create_statistic_per_userId_streamingTime(user_log_valid, 'Thumbs Up')

# show table with mean of churned/stayed
thumbsUp_per_streamingTime.groupBy('churned').mean() \
    .select('churned', 'avg(Thumbs Up_per_streamingTime)').show()

# create plot 
plot_events_per_userStreaming(thumbsUp_per_streamingTime, 'Thumbs Up')

Users who eventually churn give on average 37.18 Thumbs Up while using the service; users who stay give 62.98. To eliminate the influence of the longer average registration duration of customers who did not churn, the graphic above shows Thumbs Up events per userId relative to the user's total streaming time. There is only a slight difference in mean.

#### Thumbs Down Events per UserId per StreamingTime

In [None]:
# calculate events per user and streaming time
thumbsDown_per_streamingTime = create_statistic_per_userId_streamingTime(user_log_valid, 'Thumbs Down')

# show table with mean of churned/stayed
thumbsDown_per_streamingTime.groupBy('churned').mean() \
    .select('churned', 'avg(Thumbs Down_per_streamingTime)').show()

# create plot 
plot_events_per_userStreaming(thumbsDown_per_streamingTime, 'Thumbs Down')

The graphic above shows Thumbs Down events per userId relative to the user's total streaming time. The difference in mean is larger for Thumbs Down events than for Thumbs Up events.

#### Roll Advert Events per UserId per StreamingTime

In [None]:
# calculate events per user and streaming time
rollAdvert_per_streamingTime = create_statistic_per_userId_streamingTime(user_log_valid, 'Roll Advert')

# show table with mean of churned/stayed
rollAdvert_per_streamingTime.groupBy('churned').mean() \
    .select('churned', 'avg(Roll Advert_per_streamingTime)').show()

# create plot 
plot_events_per_userStreaming(rollAdvert_per_streamingTime, 'Roll Advert')

In the analyzed dataset, the average number of adverts experienced is noticeably higher for users who eventually churn than for those who stay with the service. A possible explanation is a higher share of free tier usage among those who churn.

In [None]:
user_log_valid.groupBy('churned','level') \
    .agg(F.count('userId')).show()

In [None]:
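# the counts are row counts (interactions) copied from the table above
print('{:2.2f}% of the interactions of users who churned happened in the free tier, compared with {:2.2f}% \
of the interactions of users who stayed'.format(12388/(32476+12388)*100, 43333/(189957+43333)*100))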
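
The percentages above are derived from four row counts. A small sketch that keeps the counts in one place and computes the shares with a helper makes the arithmetic easier to check. The assignment of each number to a (churned, level) pair is inferred from the formula in the print statement, and the helper `free_tier_share` is hypothetical.

```python
# Row counts per (churned, level), taken from the numbers in the print statement above
interaction_counts = {
    (True, "free"): 12388,
    (True, "paid"): 32476,
    (False, "free"): 43333,
    (False, "paid"): 189957,
}


def free_tier_share(counts, churned):
    """Share of free-tier interactions (in percent) within one churn group."""
    free = counts[(churned, "free")]
    paid = counts[(churned, "paid")]
    return free / (free + paid) * 100


print("churned: {:.2f}%, stayed: {:.2f}%".format(
    free_tier_share(interaction_counts, True),
    free_tier_share(interaction_counts, False)))
```

Since these are interaction counts rather than user counts, the shares describe how much activity happened in the free tier, not how many users were on it.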

#### Downgrade Events per UserId per StreamingTime

In [None]:
# calculate events per user and streaming time
downgrade_per_streamingTime = create_statistic_per_userId_streamingTime(user_log_valid, 'Downgrade')

# show table with mean of churned/stayed
downgrade_per_streamingTime.groupBy('churned').mean() \
    .select('churned', 'avg(Downgrade_per_streamingTime)').show()

# create plot 
plot_events_per_userStreaming(downgrade_per_streamingTime, 'Downgrade')

There is a small difference in the mean number of Downgrade page visits, relative to total streaming time, between users who churned and users who did not.

#### Downgrades per UserId in last x sessions

In [None]:
# find amount of sessions with highest difference in mean
find_amount_sessions_for_max_diff(user_log_valid, 'Downgrade')

In [None]:
# average Downgrade events for the last three sessionIds per userId

avg_amount_songs_lastXsessions = calculate_avg_page_event_lastXsessions(user_log_valid, 3, 'Downgrade')

avg_amount_songs_lastXsessions.groupBy('churned').mean().show()

plot_events_per_user(avg_amount_songs_lastXsessions, 'average_amount_songs_lastXsessions')

Comparing the average number of Downgrade events in the last three observed sessions of users who churned with that of users who stayed with the service, the results seem more intuitive. Still, it is not clear whether this feature helps to predict which users eventually churn.

#### Number of days from registration date to last observed event

In [None]:
# log entry with max amount days since registration
days_since_registration = user_log_valid.groupBy('userId', 'churned') \
        .agg(F.max('days_since_registration').alias('days_since_registration'))

In [None]:
# compare difference in mean
days_since_registration.groupBy('churned').mean().show()

# convert to pandas df to plot
days_since_registration_pd = days_since_registration.toPandas()

# plot
ax = sns.violinplot(data=days_since_registration_pd, y='churned', x='days_since_registration', orient='h')
plt.xlabel('Number of days from registration date to last observed event')
plt.ylabel('Customers Churned')
plt.title('Number of days from registration date to last observed event churned/not churned')
sns.despine(ax=ax);

There is a clear difference in registration duration between churned customers and those who stayed with the service. The mean duration from registration to the last interaction with the streaming service is 57.8 days for users who eventually churn and 87.1 days for users who stay. Therefore the time passed since a user registered is a feature that can be useful to predict users who are prone to churn. Some outliers are visible where churned is false; these are users who registered and then stopped using the service without cancelling it.
The observed time period is about two months.
A user could be inactive for two months and then use the service again.
Therefore, in this analysis, users who are merely inactive but do not cancel the service are defined as not churned.

#### Average amount songs played between visiting home

In [None]:
# average number of songs users listen to between two visits of the Home page

udf_homevisit = F.udf(lambda ishome : int(ishome == 'Home'), IntegerType())

user_window = Window \
    .partitionBy('userID') \
    .orderBy(F.desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

songs_per_homevisit = user_log_valid.filter((user_log_valid.page == "NextSong") | (user_log_valid.page == 'Home')) \
    .select('userID', 'churned', 'page', 'ts') \
    .withColumn('homevisit', udf_homevisit(F.col('page'))) \
    .withColumn('period', F.sum('homevisit').over(user_window))

songs_per_homevisit = songs_per_homevisit.filter((songs_per_homevisit.page == 'NextSong')) \
    .groupBy('userID', 'churned', 'period') \
    .agg({'period':'count'}) \
    .groupBy('userID', 'churned').mean() \
    .withColumnRenamed('avg(count(period))', 'avg_songs_between_home') \
    .select('userID', 'churned', 'avg_songs_between_home')


songs_per_homevisit.groupBy('churned') \
    .agg({'avg_songs_between_home':'avg'}).show()

In [None]:
# show lowest values of average songs between home visit for userIds
songs_per_homevisit.orderBy('avg_songs_between_home', ascending=True).show(n=8)

In [None]:
# convert spark object to pandas dataframe
songs_per_homevisit_pd = songs_per_homevisit.toPandas()

# Plot
ax = sns.violinplot(data=songs_per_homevisit_pd, y='churned', x='avg_songs_between_home', orient='h')
plt.xlabel('Average amount of songs played between visiting the Home Page')
plt.ylabel('Customer Eventually Churned')
plt.title('Average Amount Songs played per userId between visiting Home Page')
sns.despine(ax=ax);

Users who eventually churned played on average about two songs fewer between visits to the Home page than users who stayed with the service. Churned users played on average 20.16 songs between visits to the Home page, loyal users 22.316 songs.
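The window logic in the cell above (a running sum of Home visits over events ordered from newest to oldest, followed by a song count per resulting period) can be illustrated for a single user without Spark. This is a simplified sketch under assumptions: the function name `avg_songs_between_home` and the toy events are made up, events with identical timestamps are not treated specially, and a user without any songs gets 0.0 here, whereas the Spark version would simply produce no row.

```python
from collections import Counter


def avg_songs_between_home(events):
    """Average number of 'NextSong' events between 'Home' visits for one user.

    `events` is a list of (ts, page) tuples of a single user.
    """
    period = 0
    songs_per_period = Counter()
    # walk from the newest event to the oldest, like orderBy(F.desc('ts'))
    for ts, page in sorted(events, key=lambda e: e[0], reverse=True):
        if page == 'Home':
            period += 1  # running sum of home visits serves as period id
        elif page == 'NextSong':
            songs_per_period[period] += 1
    if not songs_per_period:
        return 0.0
    # periods without songs never appear, as in the filtered Spark dataframe
    return sum(songs_per_period.values()) / len(songs_per_period)


# toy log of one user: two songs, a home visit, then three songs
events = [(1, 'Home'), (2, 'NextSong'), (3, 'NextSong'),
          (4, 'Home'), (5, 'NextSong'), (6, 'NextSong'), (7, 'NextSong')]
print(avg_songs_between_home(events))
```

For the toy log the periods contain three and two songs, so the average is 2.5.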

#### Average Amount Songs played per Session

In [None]:
avg_amount_songs_played_per_session = user_log_valid.select('userId', 'sessionId', 'churned', 'page') \
    .filter('page = "NextSong"') \
    .groupby('userId', 'sessionId', 'churned').count() \
    .select('userId', 'churned', 'count') \
    .groupby('userId', 'churned').mean() \
    .withColumnRenamed('avg(count)', 'avg_amount_songs_played_per_session')

avg_amount_songs_played_per_session.groupBy('churned').mean().show()

plot_events_per_user(avg_amount_songs_played_per_session, 'avg_amount_songs_played_per_session')

#### Page Events per SessionId per UserId

In [None]:
find_optimal_amount_sessions_for_max_diff(user_log_valid)

In [None]:
# average page events for the last three sessionIds per userId

avg_amount_songs_lastXsessions = calculate_avg_events_lastXsessions(user_log_valid, 3)

avg_amount_songs_lastXsessions.groupBy('churned').mean().show()

plot_events_per_user(avg_amount_songs_lastXsessions, 'average_amount_songs_lastXsessions')

#### Average amount songs played in last x sessions


In [None]:
# count sessionIds per userId

user_log_valid.select('userId', 'sessionId') \
    .groupBy('userId').count() \
    .orderBy('count') \
    .show(n=5)

There is no user in the dataset with fewer than 6 sessions.
The next step is to define a reusable function that calculates the average occurrences of certain events over a defined number of sessions per user.
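The idea behind such a reusable function can be sketched in plain Python. This is not the notebook's `calculate_avg_page_event_lastXsessions` (its definition is not part of this section) but a hypothetical stand-in named `avg_events_last_x_sessions`. It assumes that sessions are ranked by their latest timestamp and that sessions without the event count as zero.

```python
from collections import defaultdict


def avg_events_last_x_sessions(log_rows, x, page):
    """Average count of `page` events over each user's last `x` sessions.

    `log_rows` is an iterable of (userId, sessionId, ts, page) tuples.
    """
    last_ts = defaultdict(dict)                   # userId -> {sessionId: latest ts}
    hits = defaultdict(lambda: defaultdict(int))  # userId -> {sessionId: event count}
    for user, session, ts, row_page in log_rows:
        previous = last_ts[user].get(session, ts)
        last_ts[user][session] = max(previous, ts)
        if row_page == page:
            hits[user][session] += 1

    result = {}
    for user, sessions in last_ts.items():
        # newest sessions first, keep at most x of them
        newest = sorted(sessions, key=sessions.get, reverse=True)[:x]
        result[user] = sum(hits[user][s] for s in newest) / len(newest)
    return result


# toy log: user 'u1' with four sessions
rows = [
    ('u1', 1, 10, 'NextSong'), ('u1', 1, 11, 'NextSong'),
    ('u1', 2, 20, 'Home'),
    ('u1', 3, 30, 'NextSong'),
    ('u1', 4, 40, 'NextSong'), ('u1', 4, 41, 'NextSong'), ('u1', 4, 42, 'NextSong'),
]
print(avg_events_last_x_sessions(rows, 2, 'NextSong'))
```

With `x = 2` only sessions 4 and 3 are considered, which contain three and one songs, so the average for `u1` is 2.0.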

In [None]:
# find amount of sessions with highest difference in mean
find_amount_sessions_for_max_diff(user_log_valid, 'NextSong')

The difference in average songs played comparing churned users with those who stayed is maximal when taking the average songs played for the last three sessionIds.

In [None]:
# average songs played for the last three sessionIds per userId

avg_amount_songs_lastXsessions = calculate_avg_page_event_lastXsessions(user_log_valid, 3, 'NextSong')

avg_amount_songs_lastXsessions.groupBy('churned').mean().show()

plot_events_per_user(avg_amount_songs_lastXsessions, 'average_amount_songs_lastXsessions')

#### Percentage of days since registration where user was active

In [None]:
activity_df = user_log_valid.groupBy('userId', 'churned') \
    .agg(F.max('days_since_registration').alias('days_since_registration'), 
         F.countDistinct('date').alias('days_active'))

activity_df = activity_df.filter('days_since_registration >= 1')
    
activity_df = activity_df \
    .withColumn('percentage_active_days', activity_df.days_active/activity_df.days_since_registration)

In [None]:
# compare difference in mean
activity_df.groupBy('churned').mean().show()

# convert to pandas df to plot
activity_pd = activity_df.toPandas()

# plot
ax = sns.violinplot(data=activity_pd, y='churned', x='percentage_active_days', orient='h')
plt.xlabel('Percentage of days since registration where user was active')
plt.ylabel('Customers Churned')
plt.title('Percentage of days since registration where user was active churned/not churned')
sns.despine(ax=ax);

#### Streaming time per active day

In [None]:
streaming_df = user_log_valid.filter(user_log_valid.page == 'NextSong') \
    .groupBy('userId', 'churned') \
    .agg((F.sum('length')/3600).alias('streamingTime_h'),
         F.countDistinct('date').alias('days_active'))
    
streaming_df = streaming_df \
    .withColumn('streaming_per_active_day', streaming_df.streamingTime_h/streaming_df.days_active)

In [None]:
# compare difference in mean
streaming_df.groupBy('churned').mean().show()

# convert to pandas df to plot
streaming_pd = streaming_df.toPandas()

# plot
ax = sns.violinplot(data=streaming_pd, y='churned', x='streaming_per_active_day', orient='h')
plt.xlabel('Streaming hours per active day')
plt.ylabel('Customers Churned')
plt.title('Streaming hours per active day churned/not churned')
sns.despine(ax=ax);

### 3.3 Feature Selection

After exploring possible features for a prediction model, the next step is to select the features the machine learning model should use to decide whether a user will churn.
If the model is based on algorithms such as Logistic Regression or Linear Regression, the features have to be checked for multicollinearity.
Multicollinearity may be present when features are highly correlated, so that one feature can be predicted from the others. This can lead to misleading results when predicting the label.
Decision trees and boosted tree algorithms are largely immune to multicollinearity: when they decide on a split, the tree will choose only one of the perfectly correlated features. [3](https://towardsdatascience.com/why-feature-correlation-matters-a-lot-847e8ba439c4)
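A simple first check for multicollinearity is the pairwise Pearson correlation between candidate features. The sketch below uses plain Python on made-up per-user values; the feature names, the numbers and the threshold of 0.8 are assumptions for illustration only. Constant features (zero variance) are not handled.

```python
from itertools import combinations
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


def highly_correlated_pairs(features, threshold=0.8):
    """Return (name_a, name_b, r) for all pairs with |r| >= threshold."""
    pairs = []
    for a, b in combinations(sorted(features), 2):
        r = pearson(features[a], features[b])
        if abs(r) >= threshold:
            pairs.append((a, b, r))
    return pairs


# made-up values for five users, one list per feature
features = {
    'songs_per_session': [10, 20, 30, 40, 50],
    'streaming_hours': [0.6, 1.1, 1.7, 2.2, 2.9],
    'thumbs_down_rate': [3, 1, 4, 1, 5],
}

pairs = highly_correlated_pairs(features)
for a, b, r in pairs:
    print('{} / {}: r = {:.3f}'.format(a, b, r))
```

In this toy data only `songs_per_session` and `streaming_hours` exceed the threshold, so one of the two would be a candidate for removal before fitting a linear model.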

In [16]:
# function to clean dataframe

def clean_dataset(user_log_df):
    '''
    Function to clean the user_log dataset by removing rows with NA or empty userIds and sessionIds.
    '''

    user_log_clean_df = user_log_df.dropna(how = 'any', subset = ['userId', 'sessionId'])
    user_log_clean_df = user_log_clean_df.filter(user_log_clean_df['userId'] != '')

    print('The dataset originally contained {} rows. \nAfter cleaning there are {} rows left.'
          .format(user_log_df.count(), user_log_clean_df.count()))
    
    return user_log_clean_df

In [15]:
# create labeled dataframe

def create_churn_label(user_log_clean_df):
    '''
    Function to create dataframe with numeric labels for churn, gender and level per userId from user_log.
    '''
    
    user_log_flagged_df = flag_cancellation_events(user_log_clean_df)
    labeled_data = create_labeled_userIds(user_log_flagged_df)
    
    return labeled_data
     
    
def flag_cancellation_events(user_log_clean_df):
    '''
    Function to create a new column in user_log where cancellation events are flagged with 1.
    '''
    
    flag_cancellation_event = F.udf(
        lambda cancellation_event: 1 if cancellation_event=='Cancellation Confirmation' else 0, IntegerType())
    
    user_log_flagged_df = user_log_clean_df.withColumn('cancellation_event', flag_cancellation_event('page'))

    return user_log_flagged_df


def create_labeled_userIds(user_log_flagged_df):
    '''
    Function to convert user_log to dataframe with one line per userId.
    '''
    
    window_timeOrdered = Window.partitionBy('userId').orderBy(F.desc('ts'), F.desc('itemInSession'))
    user_log_flagged_df = user_log_flagged_df.withColumn('rank', F.rank().over(window_timeOrdered))
    labeled_df = user_log_flagged_df.select('userId', 'cancellation_event', 'gender', 'level') \
        .filter(user_log_flagged_df.rank == 1)
    
    labeled_df = convert_to_numeric(labeled_df, 'gender')
    labeled_df = convert_to_numeric(labeled_df, 'level')
    
    return labeled_df
    
def convert_to_numeric(labeled_data, categorical_column):  
    '''
    Function to convert a categorical column of a dataframe to a numerical column.
    '''
    
    indexer = StringIndexer(inputCol=categorical_column, outputCol=categorical_column+'Numeric')
    indexed_df = indexer.fit(labeled_data).transform(labeled_data).drop(categorical_column)

    return indexed_df


In [None]:
# create features dataframe


In [14]:
user_log_clean = clean_dataset(user_log)
    
labeled_df = create_churn_label(user_log_clean)

# feature_df = create_features(user_log_clean)

# join feature_df and labeled_df



The dataset originally contained 286500 rows. 
After cleaning there are 278154 rows left.
+------+------------------+-------------+------------+
|userId|cancellation_event|genderNumeric|levelNumeric|
+------+------------------+-------------+------------+
|100010|                 0|          1.0|         1.0|
|200002|                 0|          0.0|         0.0|
|   125|                 1|          0.0|         1.0|
|   124|                 0|          1.0|         0.0|
|    51|                 1|          0.0|         0.0|
|     7|                 0|          0.0|         1.0|
|    15|                 0|          0.0|         0.0|
|    54|                 1|          1.0|         0.0|
|   155|                 0|          1.0|         0.0|
|100014|                 1|          0.0|         0.0|
|   132|                 0|          1.0|         0.0|
|   154|                 0|          1.0|         1.0|
|   101|                 1|          0.0|         0.0|
|    11|                 0|   

In [None]:
# create feature dataframe
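pyspark_df = user_log_valid
# tier: the aggregation was left unfinished here; as a placeholder assumption,
# F.max('level') yields 'paid' for users who were on the paid tier at any time
features = pyspark_df.groupBy('userId').agg(F.max('level').alias('level'))

features.show(3)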

# Thumbs Up Events per UserId per StreamingTime
# Thumbs Down Events per UserId per StreamingTime
# Roll Advert Events per UserId per StreamingTime
# Downgrade Events per UserId per StreamingTime
# Downgrades per UserId in last x sessions
# Number of days from registration date to last observed event
# Average amount songs played between visiting home
# Average Amount Songs played per Session
# Page Events per SessionId per UserId
# Average amount songs played in last 3 sessions
# Percentage of days since registration where user was active
# Streaming time per active day

## 4. Data Modeling

### 4.1 Create training and test data.

In [None]:
# create pyspark dataframe with userId, churned and features
# separate features from labels 
# use StringIndexer and OneHotEncoder to convert them into numeric features

In [None]:
# create vector use VectorAssembler to convert the newly created numeric features into vectors
# scaling
# separate training from test data

## 5. Evaluate the Results

## 6. Deploy

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.
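Why F1 is a better target than accuracy for a small churn class can be shown with a few lines of plain Python. The labels below are made up (10 users, 2 of them churned) and the function names are hypothetical; the formulas are the standard definitions of precision, recall and F1.

```python
def accuracy(y_true, y_pred):
    """Share of predictions that match the true labels."""
    return sum(1 for t, p in zip(y_true, y_pred) if t == p) / len(y_true)


def precision_recall_f1(y_true, y_pred):
    """Precision, recall and F1 score for the positive class (label 1)."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    if precision + recall == 0:
        return precision, recall, 0.0
    return precision, recall, 2 * precision * recall / (precision + recall)


# made-up labels: 1 = churned, 0 = stayed
y_true = [1, 1, 0, 0, 0, 0, 0, 0, 0, 0]
always_stay = [0] * 10                        # never predicts churn
some_model = [1, 0, 1, 0, 0, 0, 0, 0, 0, 0]   # finds one of the two churners

for name, y_pred in [('always_stay', always_stay), ('some_model', some_model)]:
    print(name, accuracy(y_true, y_pred), precision_recall_f1(y_true, y_pred)[2])
```

Both predictions reach an accuracy of 0.8, but the model that never predicts churn has an F1 score of 0.0, while the second one reaches 0.5.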

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.

In [None]:
# number of days since registration

# for every user find the first observed event (used here as registration event)
min_ts = user_log_valid.select('userId', 'churned', 'ts') \
    .groupBy('userId', 'churned') \
    .agg({'ts':'min'})

# for every user find last event
max_ts = user_log_valid.select('userId', 'churned', 'ts') \
    .groupBy('userId') \
    .agg({'ts':'max'})

# for every user calculate time from registration event to last event
register_duration = min_ts.join(max_ts, on=['userId'], how='inner')

udf_convert_ts = F.udf(lambda timestamp: datetime.datetime.fromtimestamp(
    timestamp / 1000.0).strftime("%Y-%m-%d %H:%M:%S"))

register_duration = register_duration \
    .withColumn('registration', udf_convert_ts('min(ts)')) \
    .withColumn('last_page_event', udf_convert_ts('max(ts)')) \
    .select('userId', 'churned', 'registration', 'last_page_event')

register_duration_a = register_duration \
    .withColumn('register_duration', F.datediff(F.col('last_page_event'), F.col('registration')))