# Sparkify Project Workspace

Insights about their users and estimation about their behavior is of upmost imporatance companies that rely on a interactive paltform. Based on those findings the provided content and used concept can be adjusted to garantee a good user experience, freequent and longterm interaction and therefore profit for the company. For platfroms, that use a two level subscription model with free and paid subscription the prediction about likeleyhood of the users to churn and switch from a paid to free subscription is therefore of intrest. Based on those insights individualized offers or pricing can be used influence their decission and keep their paid subscription. In this work we investigate the interaction behavior of users of the music platform "sparkify", try to find determining factors that influence their decision to cancel their paid subscription and develop a model that allows the classification of users based on those factors. For the analysis and the modelling we use [spark](https://spark.apache.org/docs/latest/api/python/index.html) and the python api pyspark. This library was designed to allow wrangeling and modelling of data that is considered "big data" and can not be processed with only one machine. In this notebook we demonstrate the principales, using a 128 MB subset of the 12 GB data.

In [1]:
# import libraries
import pyspark as spark
from pyspark.sql import SparkSession
import zipfile
import numpy as np
import plotly.graph_objects as go
from tqdm import tqdm

# unzip compressed data file
zip_ref = zipfile.ZipFile('mini_sparkify_event_data.json.zip', 'r')
zip_ref.extractall()
zip_ref.close()

In [2]:
# create a Spark session
spark = SparkSession.builder \
        .appName("Sparkify") \
        .getOrCreate()


# Load and Clean Dataset

First we load the data and check it for missing values or erronous records that have to be cleaned

In [3]:
# Load data set
data = spark.read.json("mini_sparkify_event_data.json")
data.createOrReplaceTempView("data")
data.count()
data.columns
data.show()

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+---------------+-------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|           page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+---------------+-------------+---------+--------------------+------+-------------+--------------------+------+
|      Martha Tilston|Logged In|    Colin|     M|           50| Freeman|277.89016| paid|     Bakersfield, CA|   PUT|       NextSong|1538173362000|       29|           Rockpools|   200|1538352117000|Mozilla/5.0 (Wind...|    30|
|    Five Iron Frenzy|Logged In|    Micah|     M|           79|    Long|236.09424| free|Bost

In [4]:
# Check for missing values or erronous records
# Try to figure out which columns have missing values, are NULL

print('Check if Null')
for column in data.columns:
    print(column,spark.sql(f"SELECT * FROM data WHERE {column} IS NULL").count())
print()
print('Check if empty string')
for column in data.columns:
    print(column,spark.sql(f"SELECT * FROM data WHERE {column} = ''").count())

Check if Null
artist 58392
auth 0
firstName 8346
gender 8346
itemInSession 0
lastName 8346
length 58392
level 0
location 8346
method 0
page 0
registration 8346
sessionId 0
song 58392
status 0
ts 0
userAgent 8346
userId 0

Check if empty string
artist 0
auth 0
firstName 0
gender 0
itemInSession 0
lastName 0
length 0
level 0
location 0
method 0
page 0
registration 0
sessionId 0
song 0
status 0
ts 0
userAgent 0
userId 8346


- No missing values for auth, itemInSession, level, method, page, sessionID, status, ts, userID
- but 8346 empty string userIds
We are not going to consider empty string userIds therefore we create a new data frame

In [5]:
# Clean Data, create data frame without empty user ids 
data_cleaned = spark.sql(f"SELECT * FROM data WHERE userId != ''")
data_cleaned.createOrReplaceTempView("data_cleaned")

In [6]:
# Get number of unique items in each row
for column in data_cleaned.columns:
    print(column,spark.sql(f"SELECT COUNT(DISTINCT({column})) FROM data_cleaned").collect()[0][0])

artist 17655
auth 2
firstName 189
gender 2
itemInSession 1311
lastName 173
length 14865
level 2
location 114
method 2
page 19
registration 225
sessionId 2312
song 58480
status 3
ts 269770
userAgent 56
userId 225


- So 225 unique users, that we have to make predictions 

Short comparison of users with free and paid subscription as well as their interaction behavior

In [41]:
# Comparison of number of interactions
paid_user_interactions = np.array(data_cleaned[data_cleaned['level']=='paid'].groupBy('userId').count().orderBy('count', ascending=False).collect()).astype(int)
free_user_interactions = np.array(data_cleaned[data_cleaned['level']=='free'].groupBy('userId').count().orderBy('count', ascending=False).collect()).astype(int)

# Comparison number of paied and free users
fig = go.Figure()
fig.add_trace(go.Bar(x= ['free users'], y = [free_user_interactions.shape[0]], name = 'Free subscription'))
fig.add_trace(go.Bar(x = ['paid users'], y= [paid_user_interactions.shape[0]], name = 'Paid subscription'))
fig.add_trace(go.Bar(x = ['unique users'], y= [225], name = 'Unique users'))
fig.add_trace(go.Bar(x = ['canceling user'], y= [52], name = 'Canceling users'))
fig.add_trace(go.Bar(x = ['non-canceling user'], y= [173], name = 'Non-canceling users'))
fig.update_yaxes(title_text='Number of users')
fig.update_layout(title_text="Comparison of subscription levels",
                  template='plotly_white')
fig.show()
# Comparison of number of interactions
fig = go.Figure()
fig.add_trace(go.Histogram(x=free_user_interactions[:,1], name = 'Free subscription'))
fig.add_trace(go.Histogram(x=paid_user_interactions[:,1], name = 'paid subscription'))
fig.update_xaxes(title_text='Number of interactions')
fig.update_yaxes(title_text='Number of users')
fig.update_layout(title_text="Comparison interactions free and paying users",
                  template='plotly_white')
fig.show()

- Of the 225 unique users, 165 at one time paid had a paid account and 195 a free account
- The interaction behavior differs strongly:
    - most of the free unsers have only up to a few hundrets (400) interactions
    - paying users have more than 1000

# Exploratory Data Analysis

We explore the data with the goal to understand which factors indicate that a user is likely to "churn" and cancel their paid subscription. Based on those findings we try to engineer features that can be used for training a model that classifies the users in a later step. We define the churn a interaction with the "Cancelation confirmation page"

First we need to find the users who churned.


In [8]:
# Count of the interactions with the different pages
free_count_of_pages = np.array(data_cleaned[data_cleaned['level'] == 'free'].groupBy('page').count().orderBy('count', ascending=False).collect())
paid_count_of_pages = np.array(data_cleaned[data_cleaned['level'] == 'paid'].groupBy('page').count().orderBy('count', ascending=False).collect())
fig = go.Figure()
fig.add_trace(go.Bar(x=free_count_of_pages[:,1].astype(float), y=free_count_of_pages[:,0], orientation='h', name='Free subscription'))
fig.add_trace(go.Bar(x=paid_count_of_pages[:,1].astype(float), y=paid_count_of_pages[:,0], orientation='h', name='Paid subscription'))
fig.update_xaxes(title_text='Number of interactions')
fig.update_layout(title_text="Number of page interactions",
                  height=600,
                  template='plotly_white')
fig.show()

So compared to the total interactions the interactions with the "Cancellation Confirmation" page is maginal. It is therefore more reasonable to investigate the interactions per unique user id

In [9]:
# Count of the interactions with the different pages for unique users per page
import pyspark.sql.functions as func
free_count_of_pages = np.array(data_cleaned[data_cleaned['level'] == 'free'].groupby('page').agg(func.countDistinct('userID')).collect())
paid_count_of_pages = np.array(data_cleaned[data_cleaned['level'] == 'paid'].groupby('page').agg(func.countDistinct('userID')).collect())


fig = go.Figure()
fig.add_trace(go.Bar(x=free_count_of_pages[:,1].astype(float), y=free_count_of_pages[:,0], orientation='h', name='Unique free subscription users'))
fig.add_trace(go.Bar(x=paid_count_of_pages[:,1].astype(float), y=paid_count_of_pages[:,0], orientation='h', name='Unqiue paid subscription users'))
fig.update_xaxes(title_text='Number of unique users')
fig.update_layout(title_text="Number of page interactions per unqiue users",
                  height=600,
                  template='plotly_white')
fig.show()


This representation is more meaningfull we see:
- Both free and paying users interacted withe the "Cancellation Confirmation" page
- Overall 52 users of the 225 unqiue users canceled the subscription

## Let's investigate some factors that might influence the cancellation. We are going to focus on:
- Information and attributes of the user, e.g. gender, time since registration ...
- The way the user interacts with the platform, e.g. which pages are visted, how much time is spend ...
- If the interaction behavior changes
- If the user likes the content, e.g. Ratio of Thumbs Up to Thumps Down, number of songs in playlist ...
- If the user networks over the platform by adding friends
- If malfunctions or errors of the platform have an influence, interactions with the error page

### Information and attributes of the user

In [10]:
# Comparison difference between last time stamp ts and time of registration for users how canceled users and non canceled users
canceled_users = np.array(spark.sql("SELECT userId, max(ts)-max(registration) FROM data_cleaned WHERE userId IN \
                        (SELECT userId FROM data_cleaned WHERE page = 'Cancellation Confirmation') \
                        GROUP BY userId").collect()).astype(int)
non_canceled_users = np.array(spark.sql("SELECT userId, max(ts)-max(registration) FROM data_cleaned WHERE userId NOT IN \
                        (SELECT userId FROM data_cleaned WHERE page = 'Cancellation Confirmation') \
                        GROUP BY userId").collect()).astype(int)

fig = go.Figure()
fig.add_trace(go.Box(y=canceled_users[:,1]/1000/3600/24, name = 'Canceling users'))  
fig.add_trace(go.Box(y=non_canceled_users[:,1]/1000/3600/24, name = 'Non-Canceling users')) 
fig.update_yaxes(title_text='Time difference [days]')
fig.update_layout(title_text="Difference between time of registration and last time stamp",
                  template='plotly_white')
fig.show()    
                  


- Non canceling users are on average longer on the platform the platform with a median of 75.4 days since the registration
- Most of the canceling users cancel after 51.3 days on the platform

In [11]:
# Distribution of gender of canceling and non canceling users
canceled_users = np.array(spark.sql("SELECT gender, COUNT(DISTINCT(userId)) FROM data_cleaned WHERE userId IN \
                        (SELECT userId FROM data_cleaned WHERE page = 'Cancellation Confirmation') \
                        GROUP BY gender").collect())
non_canceled_users = np.array(spark.sql("SELECT gender, COUNT(DISTINCT(userId)) FROM data_cleaned WHERE userId NOT IN \
                        (SELECT userId FROM data_cleaned WHERE page = 'Cancellation Confirmation') \
                        GROUP BY gender").collect())

from plotly.subplots import make_subplots

fig = make_subplots(rows=1, cols=2,
             subplot_titles=("Canceling users", "Non-canceling users"),
             specs=[[{"type": "domain"}, {"type": "domain"}]])

fig.add_trace(
    go.Pie(values=canceled_users[:,1], labels=canceled_users[:,0], name='Canceling users'),
    row=1, col=1
)

fig.add_trace(
    go.Pie(values=non_canceled_users[:,1], labels=non_canceled_users[:,0], name='Non-Canceling users'),
    row=1, col=2
)

fig.update_layout(title_text="Comparison of gender fraction",
                  template='plotly_white')
fig.show()

- The fraction of users with male gender is larger for cencelling users

### Interaction of users with platform, Networking of users, Malfunction & Erros

In [12]:
# Interactions with pages
canceled_users = np.array(spark.sql("SELECT page, COUNT(DISTINCT(userId)) FROM data_cleaned WHERE userId IN \
                        (SELECT userId FROM data_cleaned WHERE page = 'Cancellation Confirmation') \
                        GROUP BY page").collect())
non_canceled_users = np.array(spark.sql("SELECT page, COUNT(DISTINCT(userId)) FROM data_cleaned WHERE userId NOT IN \
                        (SELECT userId FROM data_cleaned WHERE page = 'Cancellation Confirmation') \
                        GROUP BY page").collect())
num_cancels=52
num_non_cancels=173
fig = go.Figure()
fig.add_trace(go.Bar(x=canceled_users[:,1].astype(float)/num_cancels*100, y=canceled_users[:,0], orientation='h', name='Canceling users'))
fig.add_trace(go.Bar(x=non_canceled_users[:,1].astype(float)/num_non_cancels*100, y=non_canceled_users[:,0], orientation='h', name='Non-canceling users'))
fig.update_xaxes(title_text='%')
fig.update_layout(title_text="Percentage of users visiting page to users",
                  height=600,
                  template='plotly_white')
fig.show()

- The percent of interactions with error page higher for non canceling users
- Slightly more "Thumbs Down" and less "Thumbs Up" for canceling users
- Non-canceling users add more friends


In [13]:
# Avg. length of interaction

canceled_users = np.array(spark.sql("SELECT userId, AVG(length) FROM data_cleaned WHERE userId IN \
                        (SELECT userId FROM data_cleaned WHERE page = 'Cancellation Confirmation') \
                        GROUP BY userId").collect())
non_canceled_users = np.array(spark.sql("SELECT userId, AVG(length) FROM data_cleaned WHERE userId NOT IN \
                        (SELECT userId FROM data_cleaned WHERE page = 'Cancellation Confirmation') \
                        GROUP BY userId").collect())

fig = go.Figure()
fig.add_trace(go.Box(y=canceled_users[:,1], name = 'Canceling users'))  
fig.add_trace(go.Box(y=non_canceled_users[:,1], name = 'Non-canceling users')) 
fig.update_yaxes(title_text='Length of interaction [s]')
fig.update_layout(title_text="Comparison of avg. length of interaction",
                  template='plotly_white')
fig.show()

- Avg. length of interaction is very similar
- probably the change of a interaction behavior has a higher influence

### Change of interaction behavior

We want to investigate the change of time spend per session to investigate the long time change of the interaction behavior, therefore we extract the sum of the lengths of all interactions or songs played for every session and take the last time stamp recorded for the id of the session. Then we calculate a simple linear fit for each user. If the gradient of the line is positive, the time spend per session is increasing and vice versa. The principle is shown for user 51 who canceled his subscription.

In [14]:
# Change of time per session id, example for user 51

canceled_user_ids =np.array(spark.sql("SELECT DISTINCT(userId) FROM data_cleaned WHERE userId IN \
                         (SELECT userId FROM data_cleaned WHERE page = 'Cancellation Confirmation')").collect())
non_canceled_user_ids =np.array(spark.sql("SELECT DISTINCT(userId) FROM data_cleaned WHERE userId NOT IN \
                         (SELECT userId FROM data_cleaned WHERE page = 'Cancellation Confirmation')").collect())

user = 51
arr = np.array(spark.sql(f"SELECT sessionId, SUM(length), MAX(ts)/1000/3600 FROM data_cleaned WHERE userId = {user} GROUP BY sessionId ORDER BY MAX(ts)").collect())

# Get time spend per individual session, calculate if positive or negative trend (linear regression)
model = np.polyfit(arr[:,2], arr[:,1], 1)
predict = np.poly1d(model)
x_pred = np.linspace(min(arr[:,2]), max(arr[:,2]),10)
y_pred = predict(x_pred)

fig = go.Figure()
fig.add_trace(go.Scatter(x=arr[:,2], y=arr[:,1], mode="lines+markers", name=f"Time spend per session user {user}"))
fig.add_trace(go.Scatter(x=x_pred, y=y_pred, mode="lines", name=f"Linear fit"))
fig.update_yaxes(title_text='Sum of lengths per interaction [s]')
fig.update_xaxes(title_text='Time stamp in [h]')
fig.update_layout(title_text=f"Change in time spend per session over time for user {user}",
                  template='plotly_white')
fig.show()


For this cancelling user the time spend per session is decreasing, the gradient of the line therefore negative. The gradients are now calculated for all individual users

In [15]:
# Calculate for all users
canceled_users_length_change = []
non_canceled_users_length_change = []

# Cancelling users

for user in tqdm(canceled_user_ids):
    user = int(user[0])
    arr = np.array(spark.sql(f"SELECT sessionId, SUM(length), MAX(ts)/1000/3600 FROM data_cleaned WHERE userId = {user} GROUP BY                                        sessionId ORDER BY MAX(ts)").collect())
    if len(arr)>1:
        model = np.polyfit(arr[:,2].astype(float), arr[:,1].astype(float), 1)
        canceled_users_length_change.append((user,model[0]))
    else:
        canceled_users_length_change.append((user,0))

# Non canceling users

for user in tqdm(non_canceled_user_ids):
    user = int(user[0])
    arr = np.array(spark.sql(f"SELECT sessionId, SUM(length), MAX(ts)/1000/3600 FROM data_cleaned WHERE userId = {user} GROUP BY                                        sessionId ORDER BY MAX(ts)").collect())
    if len(arr)>1:
        model = np.polyfit(arr[:,2].astype(float), arr[:,1].astype(float), 1)
        non_canceled_users_length_change.append((user,model[0]))
    else:
        non_canceled_users_length_change.append((user,0))

canceled_users_length_change = np.array(canceled_users_length_change)
non_canceled_users_length_change = np.array(non_canceled_users_length_change)
fig = go.Figure()
fig.add_trace(go.Box(y=canceled_users_length_change[:,1], name = 'cancelling users', ))  
fig.add_trace(go.Box(y=non_canceled_users_length_change[:,1], name = 'non cancelling users', ))
fig.update_yaxes(title_text='Trend of time spend per session')
fig.update_layout(title_text="Comparison of trend of time spend per session",
                  template='plotly_white')
fig.show()

100%|██████████| 52/52 [00:42<00:00,  1.22it/s]
100%|██████████| 173/173 [02:52<00:00,  1.00it/s]


We also want to investigate the change of time spend per session item for the last session, to consider the change over a short time. Therefore we extract lengths of all interactions or songs played for the session id with the highest time stamp. Then we calculate a simple linear fit for each user. If the gradient of the line is positive, the time spend per interaction is increasing and vice versa. The principle is shown for user 121 who canceled his subscription.

In [16]:
# Change of length of interaction for last session
user = 121
arr = np.array(spark.sql(f"SELECT sessionId, ts/1000/3600, length FROM data_cleaned WHERE userId = {user} AND \
            sessionId = (SELECT MAX(sessionId) FROM data_cleaned WHERE userId = {user}) AND length IS NOT NULL \
            ORDER BY ts DESC").collect())

# Get time spend per individual session, calculate if positive or negative trend (linear regression)
model = np.polyfit(arr[:,1], arr[:,2], 1)

predict = np.poly1d(model)
x_pred = np.linspace(min(arr[:,1]), max(arr[:,1]),10)
y_pred = predict(x_pred)

fig = go.Figure()
fig.add_trace(go.Scatter(x=arr[:,1], y=arr[:,2], mode="lines+markers", name=f"Time spend per session user {user}"))
fig.add_trace(go.Scatter(x=x_pred, y=y_pred, mode="lines", name=f"Linear fit"))
fig.update_yaxes(title_text='Sum of lengths per interaction [s]')
fig.update_xaxes(title_text='Time stamp in [h]')
fig.update_layout(title_text=f"Change in time spend per session over time for user {user}",
                  template='plotly_white')
fig.show()

In [17]:
# Calculate for all users
canceled_users_length_change_last_sess = []
non_canceled_users_length_change_last_sess = []

# Cancelling users

for user in tqdm(canceled_user_ids):
    user = int(user[0])
    arr = np.array(spark.sql(f"SELECT sessionId, ts/1000/3600, length FROM data_cleaned WHERE userId = {user} AND \
            sessionId = (SELECT MAX(sessionId) FROM data_cleaned WHERE userId = {user}) AND length IS NOT NULL \
            ORDER BY ts DESC").collect())
    if len(arr)>1:
        model = np.polyfit(arr[:,1].astype(float), arr[:,2].astype(float), 1)
        canceled_users_length_change_last_sess.append((user,model[0]))
    else:
        canceled_users_length_change_last_sess.append((user,0))

# Non canceling users

for user in tqdm(non_canceled_user_ids):
    user = int(user[0])
    arr = np.array(spark.sql(f"SELECT sessionId, ts/1000/3600, length FROM data_cleaned WHERE userId = {user} AND \
            sessionId = (SELECT MAX(sessionId) FROM data_cleaned WHERE userId = {user}) AND length IS NOT NULL \
            ORDER BY ts DESC").collect())
    if len(arr)>1:
        model = np.polyfit(arr[:,1].astype(float), arr[:,2].astype(float), 1)
        non_canceled_users_length_change_last_sess.append((user,model[0]))
    else:
        non_canceled_users_length_change_last_sess.append((user,0))

canceled_users_length_change_last_sess = np.array(canceled_users_length_change_last_sess)
non_canceled_users_length_change_last_sess = np.array(non_canceled_users_length_change_last_sess)
fig = go.Figure()
fig.add_trace(go.Box(y=canceled_users_length_change_last_sess[:,1], name = 'Cancelling users', ))  
fig.add_trace(go.Box(y=non_canceled_users_length_change_last_sess[:,1], name = 'Non-cancelling users', ))
fig.update_yaxes(title_text='Trend of time spend per interation')
fig.update_layout(title_text="Comparison of trend of time spend per interaction in last session",
                  template='plotly_white')
fig.show()

100%|██████████| 52/52 [01:52<00:00,  2.17s/it]
100%|██████████| 173/173 [06:18<00:00,  2.19s/it]


# Feature Engineering

## For the chosen aspects we decided for the following features: 

### Information and attributes of the user
    - Time difference between registration and last time stamp: To consider influence of time since registration
    - Gender: To consider influence of gender

### Change of interaction behavior:
    - Trend of time spend per session: To consider long-term intrest in the platform for users with a lot of interactions
    - Trend time spend in last interaction: To consider short-term intrest in the platform e.g. users that only use the platform for 1 or two session, or a sudden                                                  decission

### Liked content
    - Radio of Thumps up to thumps down: To consider overall liking of the offered music and the mood
    - Number of Songs in Playlist: To consider if content is liked and if longer interaction is anticipated

### Networking
    - Number of Friends: To consider if users network over the platform

### Malfunctions and erros of function
    - Number of interactions with error page: To consider effect of malfunctions of the plaform

In [18]:
# Create data frame with chose features
canceled_users_length_change = np.nan_to_num(canceled_users_length_change)
non_canceled_users_length_change = np.nan_to_num(non_canceled_users_length_change)
canceled_users_length_change_last_sess = np.nan_to_num(canceled_users_length_change_last_sess)
non_canceled_users_length_change_last_sess = np.nan_to_num(non_canceled_users_length_change_last_sess)

data_feat_eng = spark.sql(""" SELECT
                 userId, 
                 (MAX(ts)-MIN(registration))/1000/3600 AS time_since_reg,
                 MAX(CASE WHEN gender = 'M' Then 1 ELSE 0 END) AS gender,
                 SUM(CASE WHEN page = 'Add Friend' THEN 1 ELSE 0 END) AS num_friends,
                 SUM(CASE WHEN page = 'Add to Playlist' THEN 1 ELSE 0 END) AS songs_in_playlist,
                 NVL(SUM(CASE WHEN page = 'Thumbs Up' THEN 1 ELSE 0 END)/SUM(CASE WHEN page = 'Thumbs Down' THEN 1 ELSE 0 END),0) AS TU_TD_ratio,
                 SUM(CASE WHEN page = 'Error' THEN 1 ELSE 0 END) AS error_page_inter,
                 MAX(CASE WHEN page = 'Cancellation Confirmation' THEN 1 ELSE 0 END) AS churn
                 FROM data_cleaned GROUP BY userId
                """)

def length_trend(value):
    if float(value) in canceled_users_length_change[:, 0]: return float(round(canceled_users_length_change[np.where(canceled_users_length_change[:, 0]==float(value))[0][0],1],2))
    elif float(value) in non_canceled_users_length_change[:, 0]: return float(round(non_canceled_users_length_change[np.where(non_canceled_users_length_change[:, 0]==float(value))[0][0],1],2))
    else: None
def length_trend_last_session(value):
    if float(value) in canceled_users_length_change_last_sess[:, 0]: return float(round(canceled_users_length_change_last_sess[np.where(canceled_users_length_change_last_sess[:, 0]==float(value))[0][0],1],2))
    elif float(value) in non_canceled_users_length_change_last_sess[:, 0]: return float(round(non_canceled_users_length_change_last_sess[np.where(non_canceled_users_length_change_last_sess[:, 0]==float(value))[0][0],1],2))
    else: None


from pyspark.sql.functions import udf
from pyspark.sql.types import *
length_trend = udf(length_trend, FloatType())
length_trend_last_session = udf(length_trend_last_session, FloatType())
data_feat_eng = data_feat_eng.withColumn("length_change", length_trend('userId'))
data_feat_eng = data_feat_eng.withColumn("length_change_last_sess", length_trend_last_session('userId'))
# reorder columns
data_feat_eng = data_feat_eng.select("userId", "time_since_reg", "gender", "num_friends", "songs_in_playlist", "TU_TD_ratio" , "error_page_inter", "length_change", "length_change_last_sess", "churn")

# Modeling

For the modeling we use the pyspark Machine Learning library ml. We decided for two different algorithms, that are tuned using a grid search in a three fold cross valiation. Subsequent to this step the tuned models are tested on a hold-off data set. The data is split 0.75 % for cross valiation and 25 % for the subsequent testing. To be able to review the importance of the individual features for the classification and therefore for the churning of a user we decided for two algorithms calculate the feature importance: the Logistic Regression and the Decission Tree. For the assessment of the classification performance of the algorithms we use typical classification metrics: Confusion matrix, receiver operating characteristic curve (ROC) and its area under the curve, as well as a classification report. The confusion matrix lists the number of correlclty and falsely predicted canceling users (true positive TP and false positive FP) and non-canceling users (false negative FN and true negative TN). The ROC plots the rate  a cancelling user was correctly classified (true positive rate TPR or recall) against the rate the user was falsely calssified (false positive rate FPR) for different thresholds. These values are calculated using the TP, FP, FN and TN values described above, see the follwing equations:

$$recall, TPR = \frac{TP}{TP+FN}$$

$$FPR = \frac{FP}{FP+TP}$$

For a good classification the curve has to lie as close as possible to the axis for the rate of correct classification which results in a high AUC. The AUC is a discrete numeric value that gives information about the expectation that a random cancelling user is correctly classified. It directly gives information about the achieved classification quality. The classification report additional contains the positive predictive value (FPR) precission and the F1 score, the definition are given in the equations below.

$$precision, PPV = \frac{TP}{TP+FP}$$

$$F1 = \frac{2TP}{2TP+FP+FN}$$

The F1 score is the harmonic mean of the TPR and the PPV, for a good model the F1 score is close to one.


In [19]:
# Import mllip packages
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [20]:
# Assemble columns with VectorAssembler
assembler = VectorAssembler(inputCols=['time_since_reg',
                                        'gender',
                                        'num_friends',
                                        'songs_in_playlist',
                                        'TU_TD_ratio',
                                        'error_page_inter',
                                        'length_change',
                                        'length_change_last_sess'],
                            outputCol = 'Features')
data_feat_eng = assembler.transform(data_feat_eng)

# Normalize Vectors
scaler = Normalizer(inputCol="Features", outputCol="Scaled_Features")
scaler2 = StandardScaler(inputCol="Features",outputCol="Scaled_Features2")

scalerModel = scaler2.fit(data_feat_eng)
data_feat_eng = scaler.transform(data_feat_eng)
data_feat_eng = scalerModel.transform(data_feat_eng)
data_feat_eng.show()

+------+------------------+------+-----------+-----------------+------------------+----------------+-------------+-----------------------+-----+--------------------+--------------------+--------------------+
|userId|    time_since_reg|gender|num_friends|songs_in_playlist|       TU_TD_ratio|error_page_inter|length_change|length_change_last_sess|churn|            Features|     Scaled_Features|    Scaled_Features2|
+------+------------------+------+-----------+-----------------+------------------+----------------+-------------+-----------------------+-----+--------------------+--------------------+--------------------+
|100010|1335.4477777777777|     0|          4|                7|               3.4|               0|         5.74|                   8.29|    0|[1335.44777777777...|[0.99995003469118...|[1.47746907860760...|
|200002|1681.7911111111111|     1|          4|                8|               3.5|               0|        26.01|                  -4.63|    0|[1681.79111111111...|[0.

In [21]:
from pyspark.sql.functions import col
features_labels = data_feat_eng.select(col('Scaled_Features').alias('features'), col('churn').alias('label'))
# Shuffle data
cv_data, hold_off_data = features_labels.randomSplit([0.75, 0.25], seed=42)

In [22]:
# Cross validation using logistic regression
clf = LogisticRegression()

cv_grid = ParamGridBuilder() \
    .addGrid(clf.elasticNetParam,[0.0, 0.1]) \
    .addGrid(clf.regParam,[0.0, 0.1]) \
    .build()

cross_val = CrossValidator(estimator = clf,
                          estimatorParamMaps = cv_grid,
                          evaluator = BinaryClassificationEvaluator(),
                          numFolds = 3)
cv_model_lr = cross_val.fit(cv_data)

In [23]:
# Cross validation Support Vector Machine
clf = DecisionTreeClassifier()
cv_grid = ParamGridBuilder() \
    .addGrid(clf.maxDepth,[5, 10, 20]) \
    .addGrid(clf.impurity,['entropy','gini']) \
    .build()

cross_val = CrossValidator(estimator = clf,
                          estimatorParamMaps = cv_grid,
                          evaluator = BinaryClassificationEvaluator(),
                          numFolds = 3)
cv_model_dt = cross_val.fit(cv_data)

In [24]:
# Copied it https://stackoverflow.com/questions/52847408/pyspark-extract-roc-curve (08.05.2020)
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Scala version implements .roc() and .pr()
class CurveMetrics(BinaryClassificationMetrics):
    def __init__(self, *args):
        super(CurveMetrics, self).__init__(*args)

    def _to_list(self, rdd):
        points = []
        # Note this collect could be inefficient for large datasets 
        # considering there may be one probability per datapoint (at most)
        # The Scala version takes a numBins parameter, 
        # but it doesn't seem possible to pass this from Python to Java
        for row in rdd.collect():
            # Results are returned as type scala.Tuple2, 
            # which doesn't appear to have a py4j mapping
            points += [(float(row._1()), float(row._2()))]
        return points

    def get_curve(self, method):
        rdd = getattr(self._java_model, method)().toJavaRDD()
        return self._to_list(rdd)
    def get_value(self, method):
        rdd = getattr(self._java_model, method)()
        return rdd

In [25]:
# Calculate ROC and AUC
# CV data 
pred_cv_lr = cv_model_lr.transform(cv_data)
pred_cv_dt = cv_model_dt.transform(cv_data)
# Hold off data
pred_hold_off_lr = cv_model_lr.transform(hold_off_data)
pred_hold_off_dt = cv_model_dt.transform(hold_off_data)

pred_cv_lr_roc = pred_cv_lr.select('label','probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['label'])))
fpr_tpr_cv_lr = CurveMetrics(pred_cv_lr_roc).get_curve('roc')
pred_cv_dt_roc = pred_cv_dt.select('label','probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['label'])))
fpr_tpr_cv_dt = CurveMetrics(pred_cv_dt_roc).get_curve('roc')
pred_hold_off_lr_roc = pred_hold_off_lr.select('label','probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['label'])))
fpr_tpr_hold_off_lr = CurveMetrics(pred_hold_off_lr_roc).get_curve('roc')
pred_hold_off_dt_roc = pred_hold_off_dt.select('label','probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['label'])))
fpr_tpr_hold_off_dt  = CurveMetrics(pred_hold_off_dt_roc).get_curve('roc')



In [26]:
# Plot ROC Cuves and calculate AUC
from sklearn.metrics import auc

lr_cv_auc = CurveMetrics(pred_cv_lr_roc).get_value('areaUnderROC')
dt_cv_auc = CurveMetrics(pred_cv_dt_roc).get_value('areaUnderROC')
lr_hold_off_auc = CurveMetrics(pred_hold_off_lr_roc).get_value('areaUnderROC')
dt_hold_off_auc = CurveMetrics(pred_hold_off_dt_roc).get_value('areaUnderROC')

# Cross validation
fig = go.Figure()
fig.add_trace(go.Scatter(x = [x[0] for x in fpr_tpr_cv_lr], y = [x[1] for x in fpr_tpr_cv_lr], name = f'Logistic Regression AUC:{round(lr_cv_auc,2)}', mode = 'lines'))
fig.add_trace(go.Scatter(x = [x[0] for x in fpr_tpr_cv_dt], y = [x[1] for x in fpr_tpr_cv_dt], name = f'Decission Tree AUC: {round(dt_cv_auc,2)}', mode = 'lines'))
fig.add_trace(go.Scatter(x=[0, 1],y=[0,1], name = 'Unity Line', line_color='black', mode = 'lines'))
fig.update_xaxes(title_text='FPR')
fig.update_yaxes(title_text='TPR')
fig.update_layout(title_text="ROC Curve Cross Validation Data",
                  template='plotly_white')
fig.show()
# Hold-off Data
fig = go.Figure()
fig.add_trace(go.Scatter(x = [x[0] for x in fpr_tpr_hold_off_lr], y = [x[1] for x in fpr_tpr_hold_off_lr], name = f'Logistic Regression AUC: {round(lr_hold_off_auc,2)}', mode = 'lines'))
fig.add_trace(go.Scatter(x = [x[0] for x in fpr_tpr_hold_off_dt ], y = [x[1] for x in fpr_tpr_hold_off_dt ], name = f'Decission Tree AUC: {round(dt_hold_off_auc,2)}', mode = 'lines'))
fig.add_trace(go.Scatter(x=[0, 1],y=[0,1], name = 'Unity Line', line_color='black', mode = 'lines'))
fig.update_xaxes(title_text='FPR')
fig.update_yaxes(title_text='TPR')
fig.update_layout(title_text="ROC Curve Hold-Off Data",
                  template='plotly_white')
fig.show()


The comparison of the ROC curves and the calculted AUC clearly show, that the Decision Tree is the more suited algorithm. The ROC curves lie closer to the TPR axis and the AUC is significantly higher both in cross validation and in the subsequent testing using the hold-off data.

In [28]:
# Confusion Matrices and Classification Reports Cross Validation
from sklearn.metrics import classification_report, confusion_matrix
import plotly.figure_factory as ff
conv_mat_cv_lr = confusion_matrix(pred_cv_lr.select('label').collect(), pred_cv_lr.select('label').collect())
conv_mat_cv_dt = confusion_matrix(pred_cv_dt.select('label').collect(), pred_cv_dt.select('label').collect())

# Logistic Regression
z = conv_mat_cv_lr 
annotation_text=np.core.defchararray.add(np.array([['TP: ','FP: '],['FN: ','TN: ']]),z.astype(str))

fig = ff.create_annotated_heatmap(np.flipud(z), annotation_text = np.flipud(annotation_text), colorscale='Blues')
fig.update_layout(title_text="Confusion Matrix Logistic Regression Cross Validation",
                  template='plotly_white')
fig.show()
print(classification_report(pred_cv_lr.select('label').collect(), pred_cv_lr.select('label').collect()))

# Decission Tree
z = conv_mat_cv_dt 
annotation_text=np.core.defchararray.add(np.array([['TP: ','FP: '],['FN: ','TN: ']]),z.astype(str))

fig = ff.create_annotated_heatmap(np.flipud(z), annotation_text = np.flipud(annotation_text), colorscale='Blues')
fig.update_layout(title_text="Confusion Matrix Decission Tree Cross Validation",
                  template='plotly_white')
fig.show()
print(classification_report(pred_cv_dt.select('label').collect(), pred_cv_dt.select('label').collect()))

precision    recall  f1-score   support

           0       1.00      1.00      1.00       136
           1       1.00      1.00      1.00        42

    accuracy                           1.00       178
   macro avg       1.00      1.00      1.00       178
weighted avg       1.00      1.00      1.00       178



precision    recall  f1-score   support

           0       1.00      1.00      1.00       136
           1       1.00      1.00      1.00        42

    accuracy                           1.00       178
   macro avg       1.00      1.00      1.00       178
weighted avg       1.00      1.00      1.00       178



In [29]:
# Confusion Matrices and Classification Reports Hold Off
conv_mat_hold_off_lr = confusion_matrix(pred_hold_off_lr.select('label').collect(), pred_hold_off_lr.select('label').collect())
conv_mat_hold_off_dt = confusion_matrix(pred_hold_off_dt.select('label').collect(), pred_hold_off_dt.select('label').collect())

# Logistic Regression
z = conv_mat_hold_off_lr 
annotation_text=np.core.defchararray.add(np.array([['TP: ','FP: '],['FN: ','TN: ']]),z.astype(str))

fig = ff.create_annotated_heatmap(np.flipud(z), annotation_text = np.flipud(annotation_text), colorscale='Blues')
fig.update_layout(title_text="Confusion Matrix Logistic Regression Hold-off Data",
                  template='plotly_white')
fig.show()
print(classification_report(pred_hold_off_lr.select('label').collect(), pred_hold_off_lr.select('label').collect()))

# Decission Tree
z = conv_mat_hold_off_dt 
annotation_text=np.core.defchararray.add(np.array([['TP: ','FP: '],['FN: ','TN: ']]),z.astype(str))

fig = ff.create_annotated_heatmap(np.flipud(z), annotation_text = np.flipud(annotation_text), colorscale='Blues')
fig.update_layout(title_text="Confusion Matrix Decission Tree Hold-off Data",
                  template='plotly_white')
fig.show()
print(classification_report(pred_hold_off_dt.select('label').collect(), pred_hold_off_dt.select('label').collect()))


precision    recall  f1-score   support

           0       1.00      1.00      1.00        37
           1       1.00      1.00      1.00        10

    accuracy                           1.00        47
   macro avg       1.00      1.00      1.00        47
weighted avg       1.00      1.00      1.00        47



precision    recall  f1-score   support

           0       1.00      1.00      1.00        37
           1       1.00      1.00      1.00        10

    accuracy                           1.00        47
   macro avg       1.00      1.00      1.00        47
weighted avg       1.00      1.00      1.00        47



The other classification metrics show a perfect result for both the Logistic Regression and the Decision Tree. The confusion matrice show, that all users were classified corretly and the precission and the F1 score reached their maximum value of 1. Considering the results of the ROC curve and the AUC however, the Decision Tree clearly is the better suited algorithm.

For the comparison of the feature importance we display the weights calculated by the algorithms for the different features as bar plot. Since both algorithms rely on different principles only a qualitative comparison between the rankings is possible.

In [30]:
# Feature importance
from plotly.subplots import make_subplots
feature_names = data_feat_eng.columns[1:-4]

fig = make_subplots(rows=1, cols=2, shared_yaxes=True, subplot_titles=("Logistic Regression", "Decission Tree"))
fig.add_trace(go.Bar(x=[x for x in cv_model_lr.bestModel.coefficients], y=feature_names, orientation='h', name='Feature coefficients Logistic Regression'), col = 1, row =1)
fig.add_trace(go.Bar(x=[x for x in cv_model_dt.bestModel.featureImportances], y=feature_names, orientation='h', name='Feature importances Decission Tree'), col = 2, row =1)

fig.update_layout(title_text="Feature importances",
                  width=1000,
                  height=600,
                  template='plotly_white')
fig.show()

The Logistic Regressor rates the number of interactions with the error page and the number of songs as most important, while the other features are rated to have almost no importance. The Decision Tree on the other hands rates the number of interactions with the error page on rank 5 and 7, the time since the registration and the number of friends are rated much more important.

# Summary & Conclusion

In this project we analyzed data of user interactions with the music streaming platform “sparkify”. The goal was to analyze the data and try to understand factors that indicate if a user is likely to “churn” and cancel their paid subscription and chance to a free one. Based on those findings, features were engineered and a model was fitted that allows the classification of users based on their probability to churn. The data wrangling and modeling was done using the library [spark](https://spark.apache.org/docs/latest/api/python/index.html) and its Machine Learning library ml/mllib, which is allows data science for large amounts of data (big data).

The data consisted of a json document, that holds records of user interaction, that give information about the interaction (timestamp, session id), information about the user (user id, gender, name, location, registration time, level of subscription) and the interaction by the visited page (homepage, next song, advertisement, which song and artist ..). We read in the data as a spark data frame and checked it for duplicates, erroneous records etc. and cleaned it. We observed, that the data holds 225 unique users, of which 195 had a free subscription at one time, 165 a paid one, and 52 canceled their subscription. This churn was indicated by a interaction with the “Cancellation Confirmation” page. Next we analyzed the analyzed the data to find features that set cancelling users and non-cancelling users apart. We focus on features hold information about the users, their interaction behavior, their change in interaction behavior, possible malfunctions, if they like the contact and if they network over the platform. We decided for the following 8 engineered features: Gender, Time between registration and last time step, Ratio of Thumps Up to Thumps Down, Number of songs in Playlist, Trend time spend per session, Trend time spend in last session per song, Number of friends, Interactions with error page.

We engineered those features and assembled them into a feature vector and normalized them. For the modelling we tried two different classification algorithms the Logistic Regression and a Decision Tree with different configurations. We decided for those algorithms, because they calculate a rating of the different features for the classification. Based on that we can check which aspect has the most impact on a users decision to churn. To try different model configurations we implemented a three fold cross validation with a grid of different model parameters for hyper parameter tuning. For the cross validation we used 75 % of the data and 25 % for the subsequent validation of the tuned models.  The assess the models used typical classification metrics, like the confusion matrix and receiver operating characteristic curve (ROC) and its area under curve (AUC) and the F1 score. Both algorithms classified all users correctly both in cross validation and in the subsequent validation. For the confusion matrix and the F1 therefore perfect values were reached for both algorithms. The comparison of the ROC curve and the AUC however showed, that the Decision Tree is to be preferred over the Logistic Regression for this task. With respect to the feature importance both algorithms rate the gender as features with the highest importance. For the features the rating of the features contradict each other. The Logistic Regressor rates the number of interactions with the error page and the number of songs as most important, while the other features are rated to have almost no importance. The Decision Tree on the other hands rates the number of interactions with the error page on rank 5 and 7, the time since the registration and the number of friends are rated much more important.

All in all we reached good insightful results and were able to build a model that reached good classification scores. However it has to be noted, the number of unique users was pretty small with 225. In order to realy check the models for its capability classification should be carried out for several thousends of users. The utilzed features proved to usefull for the classification, hoever the data has to variety to engieer and investigate much more different features that might be more intuitive. Furthermore the algorithms were tuned with limited grid due to computational resources, therefore there is maybe also some room for improvement.