# Sparkify Project
This notebook contains the code for the Sparkify capstone project of the Data Science Nanodegree. Sparkify is a fictional music streaming service similar to Spotify. In this project, we analyse event data of Sparkify users. The data is an event log that records actions such as which songs users listen to, whether they add friends, or whether they downgrade from a paid to a free account.
We work only on a small subset (128MB) of the full dataset (12GB), since the analysis is done locally. The goal is to predict churn.

In [68]:
# import libraries
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.feature import RegexTokenizer, CountVectorizer, \
    IDF, StringIndexer, VectorAssembler, Normalizer, StandardScaler
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

import numpy as np
import pandas as pd
#%matplotlib inline
#import matplotlib.pyplot as plt

## Create a Spark Session
We set up a Spark session whose master is the local machine: the master URL "local" runs Spark locally with a single worker thread. On a real Spark cluster, we would instead pass the URL of the cluster's master node, e.g. spark://<host>:7077 for a standalone cluster.

In [2]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Capstone_Project") \
    .getOrCreate()

## Load Data
First, we load the data into a Spark DataFrame. The data is stored in the JSON file ‘mini_sparkify_event_data.json’ in the data folder of the project.

In [3]:
path_to_data = r"C:\Repositories\Sparkify-Project\data\mini_sparkify_event_data.json"

In [4]:
data = spark.read.json(path_to_data)
data.persist()

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]
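`spark.read.json` expects JSON Lines by default, i.e. one complete JSON object per line; a single multi-line JSON document would need `spark.read.json(path, multiLine=True)`. Since the load above produced the expected columns, the file is presumably in JSON Lines format. To peek at raw records without Spark, a minimal stdlib sketch like the following can help; the two records are hypothetical, only the field names follow the schema.

```python
import io
import json

# Two hypothetical records in JSON Lines format (one JSON object per line);
# field names follow the Sparkify schema, the values are made up.
raw = io.StringIO(
    '{"userId": "30", "page": "NextSong", "length": 277.89}\n'
    '{"userId": "", "page": "Home", "length": null}\n'
)

# Parse line by line and skip blank lines, as a JSON Lines reader would.
records = [json.loads(line) for line in raw if line.strip()]

# Count records whose userId is an empty string (present, but not NULL).
empty_user_ids = sum(1 for record in records if record.get("userId") == "")
print(len(records), empty_user_ids)  # 2 1
```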

In order to get an idea of the columns and the corresponding data types, we print out the schema of the data frame: 

In [5]:
data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



## Clean Dataset
In this section, we clean the dataset, i.e. we check for invalid or missing data such as records without user IDs or session IDs. Since we will work primarily with the SQL abstraction of Spark DataFrames, we need to register a temporary view of the DataFrame:

In [6]:
data.createOrReplaceTempView('data_tbl')

First, we check if there are missing values (null values) in the userId column: 

In [7]:
# COUNT(*) counts rows; COUNT(userId) would skip exactly the NULL values we are looking for
spark.sql("""
    SELECT
        COUNT(*)
    FROM
        data_tbl
    WHERE userId IS NULL
    """).show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



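The NULL check relies on standard SQL counting semantics: `COUNT(column)` skips NULL values, while `COUNT(*)` counts rows. Hence `COUNT(userId) ... WHERE userId IS NULL` returns 0 whatever the data contains. The sketch below uses Python's built-in `sqlite3` as a stand-in for Spark SQL, which follows the same semantics here; the table and its rows are hypothetical.

```python
import sqlite3

# In-memory stand-in for data_tbl with hypothetical rows.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE data_tbl (userId TEXT, sessionId INTEGER)")
con.executemany(
    "INSERT INTO data_tbl VALUES (?, ?)",
    [("1", 10), ("", 11), (None, 12), ("2", None)],
)

# COUNT(userId) ignores NULLs, so combined with "IS NULL" it is always 0.
null_by_column = con.execute(
    "SELECT COUNT(userId) FROM data_tbl WHERE userId IS NULL").fetchone()[0]

# COUNT(*) counts rows and therefore finds the NULL user ID.
null_by_rows = con.execute(
    "SELECT COUNT(*) FROM data_tbl WHERE userId IS NULL").fetchone()[0]

# An empty string is a value, not NULL, and is counted normally.
empty_ids = con.execute(
    "SELECT COUNT(*) FROM data_tbl WHERE userId = ''").fetchone()[0]

print(null_by_column, null_by_rows, empty_ids)  # 0 1 1
```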
We can see that there are no missing user IDs. Let's also check for IDs that contain an empty string, which could correspond to users who are not logged in or not registered:

In [8]:
spark.sql("""
    SELECT
        COUNT(userId)
    FROM
        data_tbl
    WHERE userId == ''
    """).show()

+-------------+
|count(userId)|
+-------------+
|         8346|
+-------------+



We see that 8,346 records have an empty user ID. Next, let's check for invalid session IDs:

In [9]:
# sessionId is a bigint column, so a NULL check is sufficient; COUNT(*) counts rows
spark.sql("""
    SELECT
        COUNT(*)
    FROM
        data_tbl
    WHERE
        sessionId IS NULL
    """).show()

+--------+
|count(1)|
+--------+
|       0|
+--------+
+-------------+



There are no invalid session IDs. We now remove the records with empty user IDs from the dataset:

In [10]:
data = spark.sql("""
    SELECT
        *
    FROM
        data_tbl
    WHERE
        userId != ''
    """)

In [11]:
data.createOrReplaceTempView('data_tbl')

## Exploratory Data Analysis
In this section, we perform exploratory data analysis (EDA) to get a better understanding of the data and to arrive at a formal definition of churn. We start by investigating the ‘page’ column:

In [12]:
spark.sql("""
    SELECT
        page,
        COUNT(UserId)
    FROM
        data_tbl
    GROUP BY
        page
    """).toPandas()

| page | count(UserId) |
|---|---|
| Cancel | 52 |
| Submit Downgrade | 63 |
| Thumbs Down | 2546 |
| Home | 10082 |
| Downgrade | 2055 |
| Roll Advert | 3933 |
| Logout | 3226 |
| Save Settings | 310 |
| Cancellation Confirmation | 52 |
| About | 495 |


This already gives a good indicator for the churn definition: we could define churned users as those users who have visited the ‘Cancellation Confirmation’ page. We could also take the ‘Submit Downgrade’ page into account. However, it may be better to use it as a feature for churn prediction, since submitting a downgrade could be an early warning sign of churn. We investigate this later.

### Define Churn

Let’s start by defining an indicator of whether a user has visited the ‘Cancellation Confirmation’ page:

In [13]:
data = spark.sql("""
    SELECT
        *,
        CASE
            WHEN page == 'Cancellation Confirmation' THEN 1
            ELSE 0
        END as hasVisitCancel
    FROM
        data_tbl
    """)

In [14]:
data.createOrReplaceTempView('data_tbl')

Based on that indicator, we can identify all churned users:

In [15]:
churned_users = spark.sql("""
                            SELECT
                                DISTINCT userID
                            FROM
                                data_tbl
                            WHERE
                                hasVisitCancel = 1
                            """).toPandas().values

In [16]:
churned_users = [item[0] for item in churned_users]

In [17]:
churned_users[0:5]

['125', '51', '54', '100014', '101']

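Next, let us define a column churn with a user-defined function that flags every event of a churned user with 1 and all other events with 0. We declare an integer return type, because a UDF registered without one returns strings:

In [18]:
churned_set = set(churned_users)  # set lookup instead of scanning the list for every row
has_churned = lambda user: 1 if user in churned_set else 0
spark.udf.register('has_churned', has_churned, IntegerType())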

<function __main__.<lambda>>

In [19]:
data = spark.sql("""
    SELECT
        *,
        has_churned(userId) as churn
    FROM
        data_tbl""")

In [20]:
data.createOrReplaceTempView('data_tbl')
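The UDF approach collects the churned user IDs on the driver and ships them to every Python UDF call. A minimal sketch of an alternative that stays entirely in SQL: compute the per-user maximum of the cancellation indicator and join it back onto the events. It uses `sqlite3` with a hypothetical toy table as a stand-in; that the same query works unchanged via `spark.sql` on `data_tbl` is an assumption not tested here. In Spark SQL, a window aggregate such as `MAX(hasVisitCancel) OVER (PARTITION BY userId)` would be another option.

```python
import sqlite3

# Hypothetical toy event log: user "1" cancels, user "2" does not.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE data_tbl (userId TEXT, page TEXT)")
con.executemany(
    "INSERT INTO data_tbl VALUES (?, ?)",
    [
        ("1", "NextSong"),
        ("1", "Cancellation Confirmation"),
        ("2", "NextSong"),
        ("2", "Home"),
    ],
)

# Per-user churn flag (1 if any event is a cancellation confirmation),
# joined back so that every event of a churned user carries churn = 1.
rows = con.execute("""
    SELECT d.userId, d.page, c.churn
    FROM data_tbl AS d
    JOIN (
        SELECT userId,
               MAX(CASE WHEN page = 'Cancellation Confirmation' THEN 1 ELSE 0 END) AS churn
        FROM data_tbl
        GROUP BY userId
    ) AS c
    ON d.userId = c.userId
    ORDER BY d.userId, d.page
""").fetchall()

for row in rows:
    print(row)
```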

Let's check how many churned and non-churned users the dataset contains:

In [21]:
spark.sql("""
          SELECT
              churn,
              count(distinct userId)
            FROM
                data_tbl
            GROUP BY
                churn
            """).show()

+-----+----------------------+
|churn|count(DISTINCT userId)|
+-----+----------------------+
|    0|                   173|
|    1|                    52|
+-----+----------------------+



52 of the 225 users (about 23%) have churned, so the two classes are clearly imbalanced.

# Feature Engineering
In this section, we look for features that signal whether a user is likely to churn. As mentioned above, visiting the ‘Submit Downgrade’ page could be an early warning sign. Let us first check whether this assumption holds, following the same steps as in the definition of the 'churn' column:

In [22]:
user_with_downgrade = spark.sql("select distinct userId from data_tbl where page = 'Submit Downgrade'")
user_with_downgrade = user_with_downgrade.toPandas()['userId'].values

In [23]:
has_sub_downgrade = udf(lambda user: 1 if user in user_with_downgrade else 0, IntegerType())
spark.udf.register('has_sub_downgrade', has_sub_downgrade)

<function __main__.<lambda>>

In [24]:
data = spark.sql("""
    SELECT
        *,
        has_sub_downgrade(userId) as hasSubDowngrade
    FROM
        data_tbl
    """)

In [25]:
data.createOrReplaceTempView('data_tbl')

In [26]:
feature_downgrade = spark.sql("""
    SELECT
        DISTINCT userId,
        hasSubDowngrade,
        churn
    FROM
        data_tbl
    """)

In [27]:
feature_downgrade.createOrReplaceTempView('feature_downgrade_tbl')

In [28]:
spark.sql("select churn, sum(hasSubDowngrade)/count(distinct userId) from feature_downgrade_tbl group by churn").show()

+-----+-----------------------------------------------------------------------------------------------+
|churn|(CAST(sum(CAST(hasSubDowngrade AS BIGINT)) AS DOUBLE) / CAST(count(DISTINCT userId) AS DOUBLE))|
+-----+-----------------------------------------------------------------------------------------------+
|    0|                                                                            0.23121387283236994|
|    1|                                                                            0.17307692307692307|
+-----+-----------------------------------------------------------------------------------------------+



As we can see, there is no major difference between churned and non-churned users in submitting a downgrade (about 23% vs. 17%); if anything, non-churned users submit downgrades slightly more often. We therefore do not include the submit downgrade indicator as a feature.

### Feature Usage Time
We can use the length column (song length in seconds) to define a 'usage time' feature. First, we compare the average total listening time per user for churned and non-churned users:

In [30]:
usage_time = spark.sql("""
    SELECT
        churn,
        sum(length)/count(distinct userId) as usageTime
    FROM
        data_tbl
    WHERE
        page = 'NextSong'
    GROUP BY
        churn
    """).toPandas()

In [31]:
usage_time

| churn | usageTime |
|---|---|
| 0 | 276166.937468 |
| 1 | 174014.268551 |


We see that churned users have a lower usage time (about 174,000 s vs. 276,000 s per user). Hence we include the usage time as a feature:

In [32]:
feature_usage_time = spark.sql("""
    SELECT
        DISTINCT userId,
        sum(nvl(length, 0)) as usageTime
    FROM
        data_tbl
    WHERE
        page = 'NextSong'
    GROUP BY
        userId
    """)
feature_usage_time.createOrReplaceTempView('feature_usage_time_tbl')

### Feature Number of Added Friends
We assume that non-churned users add friends more frequently. Let's check whether this is true:

In [33]:
spark.sql("""
    SELECT
        churn,
        count(1)/count(DISTINCT userId)
    FROM
        data_tbl
    WHERE
        page = 'Add Friend'
    GROUP BY
        churn
    """).show()

+-----+-------------------------------------------------------------------+
|churn|(CAST(count(1) AS DOUBLE) / CAST(count(DISTINCT userId) AS DOUBLE))|
+-----+-------------------------------------------------------------------+
|    0|                                                  22.47530864197531|
|    1|                                                 14.454545454545455|
+-----+-------------------------------------------------------------------+
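Because the query filters on page = 'Add Friend' before aggregating, the denominator count(DISTINCT userId) only includes users with at least one such event; users who never added a friend drop out. The same holds for the Help and paid-level comparisons below. The small sketch below, with hypothetical counts, shows how this mean differs from a mean over all users that treats missing activity as 0, which is the convention the feature table later uses via nvl.

```python
# Hypothetical 'Add Friend' event counts within one churn group.
# User "c" never added a friend, so it has no rows after filtering on page.
add_friend_counts = {"a": 4, "b": 2}
users_in_group = ["a", "b", "c"]

# count(1)/count(DISTINCT userId) after WHERE page = 'Add Friend':
# the denominator only sees users with at least one matching event.
mean_over_active_users = sum(add_friend_counts.values()) / len(add_friend_counts)

# Mean over every user in the group, counting missing users as 0
# (the same convention as nvl(n_addFriend, 0) in the feature table).
mean_over_all_users = (
    sum(add_friend_counts.get(user, 0) for user in users_in_group) / len(users_in_group)
)

print(mean_over_active_users, mean_over_all_users)  # 3.0 2.0
```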



We see that non-churned users indeed add friends more often. Hence we include that as a feature:

In [34]:
feature_add_friends = spark.sql("""
    SELECT
        DISTINCT userId,
        count(1) as n_addFriend
    FROM
        data_tbl
    WHERE
        page = 'Add Friend'
    GROUP BY
        userId
    """)

feature_add_friends.createOrReplaceTempView('feature_add_friends_tbl')

### Feature Asking for Help
It seems plausible that non-churned users visit the Help page more frequently. Let's see whether this is correct:

In [35]:
spark.sql("""
    SELECT
        churn,
        count(1)/count(DISTINCT userId)
    FROM
        data_tbl
    WHERE
        page = 'Help'
    GROUP BY
        churn
    """).show()

+-----+-------------------------------------------------------------------+
|churn|(CAST(count(1) AS DOUBLE) / CAST(count(DISTINCT userId) AS DOUBLE))|
+-----+-------------------------------------------------------------------+
|    0|                                                  8.154362416107382|
|    1|                                                  5.558139534883721|
+-----+-------------------------------------------------------------------+



As we can see, non-churned users visit the Help page more often. Thus, we include it as a feature:

In [36]:
feature_help = spark.sql("""
    SELECT
        DISTINCT userId,
        count(1) as n_help
    FROM
        data_tbl
    WHERE
        page = 'Help'
    GROUP BY
        userId
    """)

feature_help.createOrReplaceTempView('feature_help_tbl')

### Feature Length Paid User
Maybe non-churned users spend more time using the paid service. Let's investigate:

In [37]:
spark.sql("""
    SELECT
        churn,
        count(1)/count(DISTINCT userId)
    FROM
        data_tbl
    WHERE
        level = 'paid'
    GROUP BY
        churn
    """).show()

+-----+-------------------------------------------------------------------+
|churn|(CAST(count(1) AS DOUBLE) / CAST(count(DISTINCT userId) AS DOUBLE))|
+-----+-------------------------------------------------------------------+
|    0|                                                 1472.5348837209303|
|    1|                                                  902.1111111111111|
+-----+-------------------------------------------------------------------+



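Non-churned users indeed generate more events while on the paid level (about 1,473 vs. 902 per user; note that this query counts events rather than listening time). Hence we include the corresponding feature, the total song length played while on the paid level: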

In [38]:
feature_length_paid = spark.sql("""
    SELECT
        DISTINCT userId,
        sum(nvl(length, 0)) as lengthAsPaid
    FROM
        data_tbl
    WHERE
        level = 'paid'
    GROUP BY
        userId
    """)

feature_length_paid.createOrReplaceTempView('feature_length_paid_tbl')

### Feature Length Free User
Analogously, we define a feature that measures the total song length played while on the free level:

In [39]:
feature_length_free = spark.sql("""
    SELECT
        DISTINCT userId,
        sum(nvl(length, 0)) as lengthAsFree
    FROM
        data_tbl
    WHERE
        level = 'free'
    GROUP BY
        userId
    """)

feature_length_free.createOrReplaceTempView('feature_length_free_tbl')

## Combining the Features

Before we can combine the features, we need some minimal cleaning: the filters in the feature definitions drop users without matching events (e.g. users who never visited the Help page). Since we want to keep all users, we left join each of these feature tables with the overall data table:

In [40]:
feature_add_friends = spark.sql("""
    SELECT
        DISTINCT d.userId,
        n_addFriend
    FROM
        data_tbl as d
    LEFT JOIN
        feature_add_friends_tbl as f
    ON
        d.userId=f.userId
    """)

feature_add_friends.createOrReplaceTempView('feature_add_friends_tbl')

In [41]:
feature_help = spark.sql("""
    SELECT
        DISTINCT d.userId,
        n_help
    FROM
        data_tbl as d
    LEFT JOIN
        feature_help_tbl as f
    ON
        d.userId=f.userId
    """)

feature_help.createOrReplaceTempView('feature_help_tbl')

In [42]:
feature_length_paid = spark.sql("""
    SELECT
        DISTINCT d.userId,
        lengthAsPaid
    FROM
        data_tbl as d
    LEFT JOIN
        feature_length_paid_tbl as f
    ON
        d.userId=f.userId
    """)

feature_length_paid.createOrReplaceTempView('feature_length_paid_tbl')

In [43]:
feature_length_free = spark.sql("""
    SELECT
        DISTINCT d.userId,
        f.lengthAsFree
    FROM
        data_tbl as d
    LEFT JOIN
        feature_length_free_tbl as f
    ON
        d.userId=f.userId
    """)

feature_length_free.createOrReplaceTempView('feature_length_free_tbl')

Now we are able to include all the features in one table:

In [44]:
features = spark.sql("""
    SELECT
        usage.userId,
        free.lengthAsFree,
        paid.lengthAsPaid,
        n_help,
        n_addFriend,
        usageTime 
    FROM
        feature_usage_time_tbl as usage
    -- join every feature table on the base table's userId
    LEFT JOIN
        feature_length_free_tbl as free
    ON
        usage.userId = free.userId
    LEFT JOIN
        feature_length_paid_tbl as paid
    ON
        usage.userId = paid.userId
    LEFT JOIN
        feature_help_tbl as help
    ON
        usage.userId = help.userId
    LEFT JOIN
        feature_add_friends_tbl as friends
    ON
        usage.userId = friends.userId
    """)

features.createOrReplaceTempView('features_tbl')

In [45]:
features = spark.sql("""
    SELECT
        nvl(userId,0) as userId,
        nvl(lengthAsFree,0) as lengthAsFree,
        nvl(lengthAsPaid,0) as lenghtAsPaid,
        nvl(n_help,0) as n_help,
        nvl(n_addFriend,0) as n_addFriend,
        nvl(usageTime,0) as usageTime
    FROM
        features_tbl
    """)

features.createOrReplaceTempView('features_tbl')
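The per-feature queries, left joins and NULL replacements above could also be expressed as a single conditional aggregation over the event table, which yields one row per user with 0 instead of NULL for missing activity. The sketch below uses `sqlite3` and a hypothetical toy table; it mirrors the feature definitions above (with COALESCE in place of nvl, since SQLite lacks nvl while Spark SQL supports both), but running the same query via `spark.sql` on `data_tbl` is an assumption that has not been checked against this notebook's results.

```python
import sqlite3

# Hypothetical toy events covering free/paid levels and several pages.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE data_tbl (userId TEXT, page TEXT, level TEXT, length REAL)")
con.executemany(
    "INSERT INTO data_tbl VALUES (?, ?, ?, ?)",
    [
        ("1", "NextSong", "free", 200.0),
        ("1", "Help", "free", None),
        ("1", "NextSong", "paid", 100.0),
        ("2", "Add Friend", "free", None),
    ],
)

# One pass, one row per user; users without matching events get 0, not NULL.
features = con.execute("""
    SELECT
        userId,
        SUM(CASE WHEN level = 'free' THEN COALESCE(length, 0) ELSE 0 END) AS lengthAsFree,
        SUM(CASE WHEN level = 'paid' THEN COALESCE(length, 0) ELSE 0 END) AS lengthAsPaid,
        SUM(CASE WHEN page = 'Help' THEN 1 ELSE 0 END) AS n_help,
        SUM(CASE WHEN page = 'Add Friend' THEN 1 ELSE 0 END) AS n_addFriend,
        SUM(CASE WHEN page = 'NextSong' THEN COALESCE(length, 0) ELSE 0 END) AS usageTime
    FROM data_tbl
    GROUP BY userId
    ORDER BY userId
""").fetchall()

for row in features:
    print(row)
```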

Next, we add the target label:

In [46]:
targets = spark.sql("""
    SELECT
        distinct userId,
        churn as label
    FROM
        data_tbl"""
         )

targets.createOrReplaceTempView('targets_tbl')

In [48]:
features = spark.sql("""
    SELECT
        f.*,
        CAST(t.label AS INT) as label
    FROM
        features_tbl as f,
        targets_tbl as t
    WHERE
        f.userId=t.userId
        """)

features.createOrReplaceTempView('features_tbl')

# Use Machine Learning to Predict Churn
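In this section, we train binary classifiers to predict churn and compare two models (logistic regression and gradient-boosted trees).
First, we assemble the features into a single vector column and normalize it. Note that Normalizer rescales each user's feature vector to unit L2 norm; unlike StandardScaler, it does not bring the individual features to comparable scales: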

In [58]:
assembler = VectorAssembler(inputCols=["lengthAsFree", "lenghtAsPaid", "n_help", "n_addFriend", "usageTime"], outputCol="AllFeatures")
scaler = Normalizer(inputCol="AllFeatures", outputCol="ScaledAllFeatures")
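To make the difference concrete: `Normalizer` (default p=2) divides each row by its own L2 norm, so users whose feature vectors are proportional end up identical even if one is ten times as active. Per-feature scaling, as `StandardScaler` does with its defaults (withStd=True, withMean=False), divides each column by its standard deviation and keeps such differences. The plain-Python sketch below uses two hypothetical users; whether switching scalers improves the models here has not been tested.

```python
import math
import statistics

# Hypothetical feature vectors for two users: [lengthAsFree, lengthAsPaid].
# The second user has exactly ten times the activity of the first.
rows = [[3.0, 4.0], [30.0, 40.0]]

def l2_normalize(row):
    """Scale one row to unit L2 norm (per-row, like Normalizer with p=2)."""
    norm = math.sqrt(sum(x * x for x in row))
    return [x / norm for x in row] if norm > 0 else list(row)

def scale_columns_by_std(data):
    """Divide each column by its sample standard deviation (per-feature scaling)."""
    stds = [statistics.stdev(column) for column in zip(*data)]
    return [[x / s if s > 0 else 0.0 for x, s in zip(row, stds)] for row in data]

normalized = [l2_normalize(row) for row in rows]
scaled = scale_columns_by_std(rows)

print(normalized)  # [[0.6, 0.8], [0.6, 0.8]]: the tenfold difference is gone
print(scaled)      # the second row is still ten times the first
```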

Now we initialize two different models, logistic regression and a gradient-boosted tree classifier, and wrap each of them in a pipeline:

In [73]:
model_lr = LogisticRegression(featuresCol="ScaledAllFeatures", labelCol="label", maxIter=20, regParam=0.01)
model_gbt = GBTClassifier(featuresCol="ScaledAllFeatures", labelCol="label")

pipeline_lr = Pipeline(stages=[assembler, scaler, model_lr])
pipeline_gbt = Pipeline(stages=[assembler, scaler, model_gbt])

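We use 80% of the data for training and 20% for testing. Since no seed is passed to randomSplit, the split, and therefore the scores below, will differ between runs: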

In [74]:
training, test = features.randomSplit([0.8, 0.2])
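randomSplit assigns each row to a split at random, so with only 225 users the churn share in a test set of roughly 45 users can deviate noticeably from the overall 23%. A stratified split keeps that share roughly equal in both parts; PySpark's `DataFrame.sampleBy(col, fractions, seed)` samples per label for this purpose. The plain-Python sketch below only illustrates the idea; `stratified_split`, the user IDs and the labels are hypothetical.

```python
import random

def stratified_split(user_labels, train_fraction=0.8, seed=42):
    """Hypothetical helper: split user IDs so each label keeps its share in train and test."""
    rng = random.Random(seed)
    by_label = {}
    for user, label in user_labels.items():
        by_label.setdefault(label, []).append(user)

    train, test = [], []
    for label in sorted(by_label):
        users = sorted(by_label[label])  # sort first so the shuffle is reproducible
        rng.shuffle(users)
        cut = round(len(users) * train_fraction)
        train.extend(users[:cut])
        test.extend(users[cut:])
    return train, test

# 100 hypothetical users, 20 of them churned.
labels = {f"u{i}": 1 if i < 20 else 0 for i in range(100)}
train, test = stratified_split(labels)
print(len(train), len(test), sum(labels[u] for u in test))  # 80 20 4
```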

With the fit method of the pipeline object, we can train the models on the training data set:

In [75]:
model_lr_fitted = pipeline_lr.fit(training)
model_gbt_fitted = pipeline_gbt.fit(training)

To evaluate the performance of the models, we define a helper function:

In [69]:
def model_performance(model, test_data, metric='f1'):
    """Evaluate a fitted model's performance on test data.

        Input:
            model (object) - fitted model or pipeline model
            test_data (DataFrame) - test data with a 'label' column
            metric (str) - metric name passed to MulticlassClassificationEvaluator
        Output:
            score (float)
    """
    evaluator = MulticlassClassificationEvaluator(metricName=metric)
    predictions = model.transform(test_data)

    # calculate score
    score = evaluator.evaluate(predictions)

    return score

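Spark's MulticlassClassificationEvaluator with metricName='f1' reports a weighted F1 score: the F1 score of each class, weighted by that class's share of the true labels. The plain-Python sketch below reproduces this definition so scores can be sanity-checked; the labels are hypothetical, chosen to mimic a 20% churn rate.

```python
def weighted_f1(labels, predictions):
    """Per-class F1, averaged with weights equal to each class's share of the true labels."""
    n = len(labels)
    total = 0.0
    for c in sorted(set(labels)):
        tp = sum(1 for y, p in zip(labels, predictions) if y == c and p == c)
        fp = sum(1 for y, p in zip(labels, predictions) if y != c and p == c)
        fn = sum(1 for y, p in zip(labels, predictions) if y == c and p != c)
        # A class that is never predicted gets precision 0 instead of a division error.
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        weight = sum(1 for y in labels if y == c) / n
        total += weight * f1
    return total

# Hypothetical test labels (20% churn) and a trivial "nobody churns" model.
labels = [1, 1, 0, 0, 0, 0, 0, 0, 0, 0]
always_no_churn = [0] * 10
print(round(weighted_f1(labels, always_no_churn), 4))  # 0.7111
```

With imbalanced classes, a trivial model that predicts "no churn" for everyone already reaches a sizeable weighted F1 (about 0.71 in this toy example), so the model scores are best compared against such a baseline computed on the actual test labels, or complemented with BinaryClassificationEvaluator using areaUnderROC or areaUnderPR.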
In [76]:
model_performance(model_lr_fitted, test)

0.6238720262510253

In [77]:
model_performance(model_gbt_fitted, test)

0.6774823708785972

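We conclude that the gradient-boosted tree classifier performs better than logistic regression in terms of the F1 score (about 0.68 vs. 0.62). Given the small test set of roughly 45 users and a single unseeded split, this difference should be treated as indicative rather than conclusive.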