# Sparkify Project 

Sparkify is a fictitious music streaming platform. In this project, we build a model to predict customer churn.

The project starts by initializing a Spark session, loading the data, dropping records without a user ID, and exploring the data.

Next, the target variable (whether a user has churned or not) is defined and several user-level features are created.

These features are then compared across the two user groups, i.e. churn and no churn, to identify relevant features which may help in predicting users' behaviour.

A new dataframe containing only the relevant features and the labels (using the tiny dataset or the full one depending upon the notebook chosen) is then created. Before the data is split into a train and a test set, outliers of numerical features are removed, numerical features are scaled using min-max scaling and categorical features are encoded using one-hot encoding.
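
To make the two preprocessing steps concrete, here is a minimal plain-Python sketch of min-max scaling and one-hot encoding. It is only an illustration and not the notebook's Spark pipeline: the helper names `min_max_scale` and `one_hot` and the sample values are hypothetical, and mapping a constant feature to 0.0 is a choice made for this sketch.

```python
def min_max_scale(values):
    """Rescale a list of numbers to the [0, 1] range."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Constant feature: mapping it to 0.0 is an assumption of this sketch
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


def one_hot(values):
    """Encode categorical values as 0/1 indicator lists, one slot per category."""
    categories = sorted(set(values))
    return [[1 if v == c else 0 for c in categories] for v in values]


# Hypothetical per-user values, for illustration only
sessions = [2, 10, 6]
genders = ["F", "M", "F"]

scaled_sessions = min_max_scale(sessions)
encoded_genders = one_hot(genders)
print(scaled_sessions)  # [0.0, 1.0, 0.5]
print(encoded_genders)  # [[1, 0], [0, 1], [1, 0]]
```

Scaling puts all numerical features on a comparable range, and the indicator columns let models consume categorical values such as gender as numbers.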

The training and test datasets are then used to compare six classifiers: logistic regression, random forest, gradient-boosted trees, linear support-vector, decision tree and naive Bayes. The models are evaluated using the F1 score to account for the imbalanced nature of the dataset (i.e. only a few users have actually churned).
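
The following sketch shows why accuracy can mislead on imbalanced data while F1 does not. It computes the binary F1 score for the churn class by hand on made-up labels; the function name `f1_score` and the data are hypothetical. Note that Spark's `MulticlassClassificationEvaluator` reports a weighted F1 across labels, so its numbers will differ from this binary version.

```python
def f1_score(y_true, y_pred, positive=1):
    """Binary F1 score for the given positive class."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    if tp == 0:
        # No true positives: precision and recall are 0 or undefined
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


# Made-up example: 10 users, only 2 of whom churned
y_true = [1, 1, 0, 0, 0, 0, 0, 0, 0, 0]
# A trivial model that predicts "no churn" for everyone
y_pred = [0] * 10

accuracy = sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)
f1 = f1_score(y_true, y_pred)
print(accuracy)  # 0.8
print(f1)        # 0.0
```

The trivial model looks good on accuracy but never identifies a churned user, which the F1 score of 0.0 makes visible.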

The best model is then chosen and optimised by using hyperparameter tuning.

Finally, the results of the optimised model are briefly discussed.

In [1]:
# import libraries
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import isnan, when, count, col, isnull, udf, last, first, struct, lit, approx_count_distinct
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import when as Fwhen
from pyspark.sql.functions import lag as Flag
from pyspark.sql.functions import isnull as Fisnull
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, MinMaxScaler, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import matplotlib.pyplot as plt
from matplotlib.ticker import PercentFormatter
import seaborn as sns
from datetime import datetime
from datetime import timedelta
import pandas as pd
import numpy as np
# Magic command to display plots inline
%matplotlib inline

In [5]:
# create a Spark session
spark = SparkSession.builder \
    .master("local") \
    .appName("Creating Features") \
    .getOrCreate()
# Reduce number of partitions to only 5 to easily deal with small dataset
spark.conf.set("spark.sql.shuffle.partitions", 5)

Exception: Java gateway process exited before sending its port number

In [7]:
# A bare shell `export` is not valid Python; use the %env magic to set the variable from the notebook
%env PYSPARK_DRIVER_PYTHON=jupyter

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data, for example records without a `userId` or `sessionId`.

In [None]:
# Define path to the mini data set used for initial steps and load this data
path = "mini_sparkify_event_data.json"
user_log = spark.read.json(path)

In [None]:
# Display data schema to get an idea of what the data looks like
user_log.printSchema()

In [None]:
def print_shape(df):
    """
    Print the shape of the input dataframe.
    Parameters:
    df (pyspark.sql.DataFrame): The dataframe whose size is printed
    Returns:
    None - prints the number of rows and columns
    """
    df_len = df.count()
    df_cols = len(df.columns)
    print('DataFrame has {} log entries each having {} features'.format(df_len,df_cols))

In [None]:
print_shape(user_log)

In [None]:
user_log.head()

**Dealing with missing values**

In [None]:
# Used both isnan and isnull PySpark SQL functions as suggested at 
# https://stackoverflow.com/questions/44627386/how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe
user_log.select([count(when(isnan(c), c)).alias(c) for c in user_log.columns]).show()
user_log.select([count(when(isnull(c), c)).alias(c) for c in user_log.columns]).show()
# Also look for cases with empty strings
user_log.select([count(when(col(c)=='', c)).alias(c) for c in user_log.columns]).show()

In [None]:
# Have a closer look at the missing values
# For cases where we have no first name we would expect users not to be registered and hence to have no user ID
user_log.filter(isnull(user_log.firstName)).select('registration').dropDuplicates().show()
user_log.filter(isnull(user_log.firstName)).select('userId').dropDuplicates().show()
# For cases where there is no information on the artist or the song check if people were doing other things
# than listening to a song
user_log.filter(isnull(user_log.artist)).select('page').dropDuplicates().show(30, False)

We have two cases of missing values:

* **8,346** rows of unregistered users (the "userId" column is empty and no personal user information, i.e. first/last name and gender, is available)
* **58,392** rows where users are not listening to a song, so there is no song-related information

For userId, we simply drop these rows since we only care about the churn of existing users. Missing song data is not a problem: whenever a user is not listening to a song, we would not expect any song-related information.

In [None]:
# inspect 'userId' column
user_log.select('userId').dropDuplicates().sort('userId').show(5)

In [None]:
# Drop rows with unregistered users
user_log = user_log.where(user_log.userId != '')

In [None]:
print_shape(user_log)

# Exploratory Data Analysis

In [None]:
# Have a look at the number of unique user IDs and the average number of interactions per user ID
num_users = user_log.select('userId').dropDuplicates().count()
print('Number of unique users: {}'.format(num_users))
print('Average number of interactions per user: {}'.format(user_log.count()/num_users))

In [None]:
# Create a pandas dataframe with the distribution of the number of interactions and plot it
interactions_df = user_log.groupBy('userId').count().toPandas()
interactions_df['count'].plot.hist(color=sns.color_palette()[0])
plt.xlabel('Number of Interactions')
sns.set(style="ticks")
sns.despine(right=True,top=True)
plt.savefig('sparkify_num_interactions.png', dpi = 300)
print('Median number of interactions per user: {}'.format(interactions_df['count'].median()))
print('Standard deviation of interactions per user: {}'.format(interactions_df['count'].std()))
print('Minimum number of interactions for a single user: {}'.format(interactions_df['count'].min()))
print('Maximum number of interactions for a single user: {}'.format(interactions_df['count'].max()))


There is some variability in the number of interactions per user. A few users have a very high number of interactions, while most fall below 4,000.

In [None]:
user_log.toPandas().head()

As shown in the table above, the timestamp columns (`ts` and `registration`) are stored as Unix timestamps in milliseconds. We will convert them to a more readable date format.

In [None]:
# Convert timestamps to year-month-day and save this in a new column 
ts_to_year_month = udf(lambda x: datetime.fromtimestamp(x/1000).strftime('%Y-%m-%d'))
user_log = user_log.withColumn('year_months',ts_to_year_month(user_log.ts))
user_log = user_log.withColumn('registration_time', ts_to_year_month(user_log.registration))
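
As a side note, `datetime.fromtimestamp` without a time zone uses the local time zone of the machine running the code, so dates close to midnight can shift between environments. The sketch below shows a variant pinned to UTC that also tolerates missing values. The function name `ms_to_date` is hypothetical and this is not the conversion used in the cell above.

```python
from datetime import datetime, timezone


def ms_to_date(ts_ms):
    """Convert a Unix timestamp in milliseconds to a 'YYYY-MM-DD' string in UTC."""
    if ts_ms is None:
        # Propagate missing values instead of raising a TypeError
        return None
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime('%Y-%m-%d')


example_date = ms_to_date(1538352000000)  # midnight UTC on 1 October 2018
print(example_date)  # 2018-10-01
```

Such a function could be wrapped in `udf(...)` in the same way as the lambda above.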

In [None]:
user_log.toPandas().head()

### Define Churn

A user churns when a `Cancellation Confirmation` event appears in their activity log. This event occurs for both paid and free users.
Some users may never have used the paid service and simply stopped using the platform without cancelling their registration. Since the data covers only about two months, we do not attempt to capture this case here, but it is worth keeping in mind.

We'll perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. We'll explore aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

In [None]:
user_log.select('page').dropDuplicates().show()

In [None]:
# Identify users who canceled their registration
flag_cancel = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0, IntegerType())
user_log = user_log.withColumn('cancel_page',flag_cancel('page'))
windowval = Window.partitionBy('userId')
user_log = user_log.withColumn('churned',Fsum('cancel_page').over(windowval))

In [None]:
# Check if values of churned column are as expected, i.e. either 0 or 1
user_log.select('churned').dropDuplicates().show()
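
The labelling logic above (flag each cancellation event, then sum the flags per user and attach the total to every row of that user) can be illustrated without Spark. The following plain-Python sketch uses made-up events; the function name `label_churn` is hypothetical.

```python
from collections import defaultdict


def label_churn(events):
    """Attach a per-user 'churned' count of cancellation events to every event."""
    cancels = defaultdict(int)
    for event in events:
        # Equivalent of the 'cancel_page' flag: 1 for a cancellation, else 0
        cancels[event["userId"]] += int(event["page"] == "Cancellation Confirmation")
    # Equivalent of the sum over the per-user window
    return [dict(event, churned=cancels[event["userId"]]) for event in events]


# Made-up activity log, for illustration only
events = [
    {"userId": "1", "page": "NextSong"},
    {"userId": "1", "page": "Cancellation Confirmation"},
    {"userId": "2", "page": "NextSong"},
    {"userId": "2", "page": "Thumbs Up"},
]

labelled = label_churn(events)
print([e["churned"] for e in labelled])  # [1, 1, 0, 0]
```

Because the flag is summed per user, a value above 1 would indicate more than one cancellation event for the same user, which is why the check on the distinct values of `churned` is useful.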

In [None]:
# Count songs played and thumbs up per user; the label column created above is named 'churned'
lifetime_pd = user_log.where('page == "NextSong" OR page == "Thumbs Up"').groupby(['userId', 'churned', 'gender', 'page']).count().toPandas()
lifetime_pd = lifetime_pd.pivot_table(index=['userId','churned','gender'], values='count', columns='page').reset_index()

In [None]:
# Create a pandas dataframe for each user group containing the measures we are interested in
# Define the relevant columns in a list
cols_analysis = ['userId','sessions_num','dist_num_artists','avg_session_lgth','song_avg_session','home_avg_session', 
                 'thumbs_up_avg_session', 'thumbs_down_avg_session', 'add_playlist_avg_session', 'add_friend_avg_session', 
                 'settings_avg_session', 'help_avg_session', 'error_avg_session', 'usage_length', 'state', 'gender']
churned_df = user_log.filter(user_log.churned == 1).select(*cols_analysis).dropDuplicates().toPandas()
not_churned_df = user_log.filter(user_log.churned == 0).select(*cols_analysis).dropDuplicates().toPandas()

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow these steps:
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.
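
Since churned users are a small subset, one option is to split each class separately so that every set keeps roughly the same churn rate. The sketch below illustrates that idea in plain Python. Stratification, the 60/20/20 fractions, the seed and the function name `stratified_split` are all assumptions of this example, not requirements of the project.

```python
import random


def stratified_split(labels_by_user, fractions=(0.6, 0.2), seed=42):
    """Split user IDs into train/test/validation sets, class by class.

    `fractions` gives the train and test shares; the rest goes to validation.
    """
    rng = random.Random(seed)
    by_label = {}
    for user, label in labels_by_user.items():
        by_label.setdefault(label, []).append(user)

    splits = {"train": [], "test": [], "validation": []}
    for users in by_label.values():
        users = sorted(users)  # fixed starting order for reproducibility
        rng.shuffle(users)
        n_train = round(len(users) * fractions[0])
        n_test = round(len(users) * fractions[1])
        splits["train"] += users[:n_train]
        splits["test"] += users[n_train:n_train + n_test]
        splits["validation"] += users[n_train + n_test:]
    return splits


# Made-up labels: 10 churned users (1) and 40 retained users (0)
labels = {"u{}".format(i): (1 if i < 10 else 0) for i in range(50)}
splits = stratified_split(labels)
print({name: len(users) for name, users in splits.items()})
```

With a plain random split, a small test set could end up with very few churned users, which would make the F1 score on that set unstable.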

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a GitHub repository, as well as a web app or blog post.