# Sparkify Project
#### Author: Pawel Wnuk Lipinski

The objective of the project is to predict churn among users of Sparkify, a popular music streaming platform. The dataset contains detailed records of user behaviour events on the platform, including, among others, the page the user is on, the music they listen to (artist, song title), the user's location and a timestamp. <br>
The project is based on a tiny subset (128MB) of the full dataset (12GB) because of workspace constraints. <br>
The Notebook is split into the following parts:
1. Import libraries, create Spark session
2. Load and Clean Dataset
3. Exploratory Data Analysis
4. Feature Engineering
5. Modeling
6. Summary

## 1. Import libraries, create Spark session

In [1]:
# import pyspark libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import udf, desc, countDistinct, to_date, datediff, avg, count, when
# Aggregate functions whose names clash with Python built-ins are imported under aliases,
# so that the built-in sum() and max() keep working in the notebook
from pyspark.sql.functions import sum as Fsum, max as Fmax
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.feature import VectorAssembler

# Import Python libraries
import datetime
import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score

In [2]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify Project") \
    .getOrCreate()

## 2. Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. 
The dataset is loaded and cleaned. It is also checked for invalid or missing data, for example records without userIds or sessionIds.

In [3]:
# Reading the JSON file
path = "mini_sparkify_event_data.json"
df_raw = spark.read.json(path)

In [4]:
# Looking at first 2 rows
df_raw.take(2)

[Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30'),
 Row(artist='Five Iron Frenzy', auth='Logged In', firstName='Micah', gender='M', itemInSession=79, lastName='Long', length=236.09424, level='free', location='Boston-Cambridge-Newton, MA-NH', method='PUT', page='NextSong', registration=1538331630000, sessionId=8, song='Canada', status=200, ts=1538352180000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"', userId='9')]

In [5]:
# Checking column names and data types
df_raw.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [6]:
# We have 286,500 rows in total
df_raw.count()

286500

In [7]:
# Checking userId summary statistics
df_raw.describe('userId').show()

+-------+------------------+
|summary|            userId|
+-------+------------------+
|  count|            286500|
|   mean| 59682.02278593872|
| stddev|109091.94999910535|
|    min|                  |
|    max|                99|
+-------+------------------+



In [8]:
# Checking sessionId summary statistics
df_raw.describe('sessionId').show()

+-------+-----------------+
|summary|        sessionId|
+-------+-----------------+
|  count|           286500|
|   mean|1041.526554973822|
| stddev|726.7762634630799|
|    min|                1|
|    max|             2474|
+-------+-----------------+



In [9]:
# There are no missing values in userId or sessionId, but there might be empty strings
df = df_raw.dropna(how="any", subset=["userId", "sessionId"])
df.count()

286500

In [10]:
# Removing rows with empty userId, as userId is the main user identifier we will use in ML modelling
df = df.filter(df["userId"] != "")
df.count()

278154

## 3. Exploratory Data Analysis

### 3.1. Exploring the columns
Each column is checked individually: what it represents, its type and range of values, and whether it might be useful for churn prediction.

In [11]:
# list all available columns
df.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId']

#### artist

In [12]:
df.groupBy('artist').count().sort(desc('count')).show()

+--------------------+-----+
|              artist|count|
+--------------------+-----+
|                null|50046|
|       Kings Of Leon| 1841|
|            Coldplay| 1813|
|Florence + The Ma...| 1236|
|       Dwight Yoakam| 1135|
|               Björk| 1133|
|      The Black Keys| 1125|
|                Muse| 1090|
|       Justin Bieber| 1044|
|        Jack Johnson| 1007|
|              Eminem|  953|
|           Radiohead|  884|
|     Alliance Ethnik|  876|
|               Train|  854|
|        Taylor Swift|  840|
|         OneRepublic|  828|
|         The Killers|  822|
|         Linkin Park|  787|
|         Evanescence|  781|
|            Harmonia|  729|
+--------------------+-----+
only showing top 20 rows



In [13]:
df.select("artist").dropDuplicates().count()

17656

The column contains artist names, with more than 17k unique values. It is null for 50,046 events.

#### auth

In [14]:
df.groupBy('auth').count().sort(desc('count')).show()

+---------+------+
|     auth| count|
+---------+------+
|Logged In|278102|
|Cancelled|    52|
+---------+------+



In [15]:
df_auth_cancelled = df.filter(df["auth"] == "Cancelled")
df_auth_cancelled.select("page").dropDuplicates().collect()

[Row(page='Cancellation Confirmation')]

The auth column has just two values, 'Logged In' and 'Cancelled'. Records with auth == 'Cancelled' all correspond to the page 'Cancellation Confirmation'.

#### firstName

In [16]:
df.select("firstName").dropDuplicates().count()

189

In [17]:
df.groupBy('firstName').count().sort(desc('count')).show(10)

+---------+-----+
|firstName|count|
+---------+-----+
|   Payton| 9632|
|    Riley| 7970|
|   Lucero| 6880|
|   Emilia| 5732|
|     Emma| 5478|
|   Joseph| 5209|
|    Colin| 4989|
|   Nicole| 4825|
|   Joshua| 4654|
|  Brayden| 4621|
+---------+-----+
only showing top 10 rows



The user's first name should be irrelevant for the ML model.

#### gender

In [18]:
df.groupBy('gender').count().sort(desc('count')).show()

+------+------+
|gender| count|
+------+------+
|     F|154578|
|     M|123576|
+------+------+



In [19]:
df.select("gender").describe().show()

+-------+------+
|summary|gender|
+-------+------+
|  count|278154|
|   mean|  null|
| stddev|  null|
|    min|     F|
|    max|     M|
+-------+------+



Gender has no nulls: its count (278,154) equals the number of rows after cleaning. Female users generate more events than male users (154,578 vs 123,576). Gender can be a good feature column for the ML model.

#### itemInSession

In [20]:
df.groupBy('itemInSession').count().sort(desc('count')).show(10)

+-------------+-----+
|itemInSession|count|
+-------------+-----+
|            2| 2941|
|            3| 2917|
|            4| 2887|
|            5| 2836|
|            1| 2818|
|            6| 2803|
|            7| 2776|
|            8| 2735|
|            9| 2693|
|            0| 2689|
+-------------+-----+
only showing top 10 rows



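The first ten positions (0 to 9) occur with similar frequencies (about 2,700 to 2,900 events each). Since itemInSession is the position of an event within its session, it can potentially be used as a feature once aggregated per session.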

#### lastName

In [21]:
df.groupBy('lastName').count().sort(desc('count')).show(5)

+--------+-----+
|lastName|count|
+--------+-----+
|Campbell|14060|
|    Reed| 9284|
|Williams| 8410|
|  Taylor| 7230|
| Johnson| 6106|
+--------+-----+
only showing top 5 rows



In [22]:
df.select("lastName").dropDuplicates().count()

173

With 173 unique values, the last name (like the first name) is not a good feature for the ML model.

#### length

In [23]:
df.select("length").take(10)

[Row(length=277.89016),
 Row(length=236.09424),
 Row(length=282.8273),
 Row(length=262.71302),
 Row(length=223.60771),
 Row(length=208.29995),
 Row(length=260.46649),
 Row(length=185.44281),
 Row(length=None),
 Row(length=134.47791)]

In [24]:
df.describe('length').show()

+-------+------------------+
|summary|            length|
+-------+------------------+
|  count|            228108|
|   mean|249.11718197783375|
| stddev| 99.23517921058324|
|    min|           0.78322|
|    max|        3024.66567|
+-------+------------------+



In [25]:
# around 50k records are Nulls
df.filter(df['length'].isNull()).count()

50046

In [26]:
# Checking an example user
df.select(["userId", "length", "page", "sessionId"]).where(df.userId == "119").collect()

[Row(userId='119', length=None, page='Home', sessionId=437),
 Row(userId='119', length=147.04281, page='NextSong', sessionId=437),
 Row(userId='119', length=290.48118, page='NextSong', sessionId=437),
 Row(userId='119', length=294.84363, page='NextSong', sessionId=437),
 Row(userId='119', length=189.6224, page='NextSong', sessionId=437),
 Row(userId='119', length=288.57424, page='NextSong', sessionId=437),
 Row(userId='119', length=215.06567, page='NextSong', sessionId=437),
 Row(userId='119', length=264.93342, page='NextSong', sessionId=437),
 Row(userId='119', length=262.47791, page='NextSong', sessionId=437),
 Row(userId='119', length=None, page='Roll Advert', sessionId=437),
 Row(userId='119', length=None, page='Home', sessionId=633),
 Row(userId='119', length=425.01179, page='NextSong', sessionId=633),
 Row(userId='119', length=None, page='Add to Playlist', sessionId=633),
 Row(userId='119', length=None, page='Upgrade', sessionId=633),
 Row(userId='119', length=518.81751, page='Ne

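`length` is filled only for 'NextSong' events, where it holds the song's duration in seconds: its 228,108 non-null values match the 228,108 'NextSong' events, and the remaining 50,046 records are null. It is a potentially useful column.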

#### level

In [27]:
df.groupBy('level').count().sort(desc('count')).show(5)

+-----+------+
|level| count|
+-----+------+
| paid|222433|
| free| 55721|
+-----+------+



In [28]:
df.filter(df['level'].isNull()).count()

0

The column distinguishes events of paid users from events of free users and has no nulls.

#### location

In [29]:
df.groupBy('location').count().sort(desc('count')).show(15)

+--------------------+-----+
|            location|count|
+--------------------+-----+
|Los Angeles-Long ...|30131|
|New York-Newark-J...|23684|
|Boston-Cambridge-...|13873|
|Houston-The Woodl...| 9499|
|Charlotte-Concord...| 7780|
|Dallas-Fort Worth...| 7605|
|Louisville/Jeffer...| 6880|
|Philadelphia-Camd...| 5890|
|Chicago-Napervill...| 5114|
|    St. Louis, MO-IL| 4858|
|Phoenix-Mesa-Scot...| 4846|
|Vineland-Bridgeto...| 4825|
|          Wilson, NC| 4659|
|Denver-Aurora-Lak...| 4453|
|           Ionia, MI| 4428|
+--------------------+-----+
only showing top 15 rows



In [30]:
df.select('location').dropDuplicates().count()

114

There are 114 unique locations, which would make this a difficult feature to use.

#### method

In [31]:
df.groupBy('method').count().sort(desc('count')).show(15)

+------+------+
|method| count|
+------+------+
|   PUT|257818|
|   GET| 20336|
+------+------+



The column holds the HTTP request method (PUT or GET). Its usefulness for churn prediction is hard to interpret.

#### page

In [32]:
df.groupBy('page').count().sort(desc('count')).show(20)

+--------------------+------+
|                page| count|
+--------------------+------+
|            NextSong|228108|
|           Thumbs Up| 12551|
|                Home| 10082|
|     Add to Playlist|  6526|
|          Add Friend|  4277|
|         Roll Advert|  3933|
|              Logout|  3226|
|         Thumbs Down|  2546|
|           Downgrade|  2055|
|            Settings|  1514|
|                Help|  1454|
|             Upgrade|   499|
|               About|   495|
|       Save Settings|   310|
|               Error|   252|
|      Submit Upgrade|   159|
|    Submit Downgrade|    63|
|Cancellation Conf...|    52|
|              Cancel|    52|
+--------------------+------+



In [33]:
df.filter(df['page'].isNull()).count()

0

In [34]:
df.select('page').dropDuplicates().count()

19

Page has 19 distinct values and no nulls. It is a promising column both for feature creation and for the churn definition.

#### registration

In [35]:
df.groupBy('registration').count().sort(desc('count')).show(5)

+-------------+-----+
| registration|count|
+-------------+-----+
|1529027541000| 9632|
|1536403972000| 7230|
|1536642109000| 6880|
|1538336771000| 5732|
|1532224335000| 4825|
+-------------+-----+
only showing top 5 rows



In [36]:
df.select('registration').dropDuplicates().count()

225

In [37]:
df.filter(df['registration'].isNull()).count()

0

In [38]:
# Converting the registration timestamp (in milliseconds) into a readable registration_date string
get_date = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).strftime('%Y-%m-%d'))
df = df.withColumn("registration_date", get_date(df.registration))

The column holds each user's registration time. There are 225 distinct registration values, the same as the number of distinct users, which is consistent with one registration date per user. Registration date might be useful as a feature, so we keep it.

#### sessionId

In [39]:
df.select('sessionId').dropDuplicates().count()

2312

In [40]:
df.filter(df['sessionId'].isNull()).count()

0

In [41]:
df.select('sessionId').take(5)

[Row(sessionId=29),
 Row(sessionId=8),
 Row(sessionId=29),
 Row(sessionId=8),
 Row(sessionId=29)]

There are 2,312 distinct sessionIds but only 225 distinct users, so each user can have multiple sessions. The number of sessions per user could be a feature.

#### song

In [42]:
df.select('song').dropDuplicates().count()

58481

In [43]:
df.select('song').take(10)

[Row(song='Rockpools'),
 Row(song='Canada'),
 Row(song='Time For Miracles'),
 Row(song='Knocking On Forbidden Doors'),
 Row(song='Harder Better Faster Stronger'),
 Row(song="Don't Leave Me"),
 Row(song='Run Run Run'),
 Row(song='Passengers (Old Album Version)'),
 Row(song=None),
 Row(song='Fuck Kitty')]

This column represents the song's name

#### status

In [44]:
df.select('status').dropDuplicates().count()

3

In [45]:
df.select('status').take(10)

[Row(status=200),
 Row(status=200),
 Row(status=200),
 Row(status=200),
 Row(status=200),
 Row(status=200),
 Row(status=200),
 Row(status=200),
 Row(status=200),
 Row(status=200)]

In [46]:
df.filter(df['status'].isNull()).count()

0

In [47]:
df.groupBy('status').count().sort(desc('count')).show(20)

+------+------+
|status| count|
+------+------+
|   200|254718|
|   307| 23184|
|   404|   252|
+------+------+



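The column holds HTTP status codes: 200 (OK), 307 (temporary redirect) and 404 (not found). The 252 records with status 404 match the 252 'Error' page events. We keep this column for further exploration.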

#### ts

In [48]:
df.select('ts').take(5)

[Row(ts=1538352117000),
 Row(ts=1538352180000),
 Row(ts=1538352394000),
 Row(ts=1538352416000),
 Row(ts=1538352676000)]

In [49]:
df = df.withColumn("ts_date", get_date(df.ts))

In [50]:
df.take(2)

[Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30', registration_date='2018-09-28', ts_date='2018-10-01'),
 Row(artist='Five Iron Frenzy', auth='Logged In', firstName='Micah', gender='M', itemInSession=79, lastName='Long', length=236.09424, level='free', location='Boston-Cambridge-Newton, MA-NH', method='PUT', page='NextSong', registration=1538331630000, sessionId=8, song='Canada', status=200, ts=1538352180000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"', userId='9', registration_date='2018-09-30', ts_date='2018-10-01')]

`ts` is the event time in milliseconds since the Unix epoch. Converted into a readable date (`ts_date`), it may be useful for feature engineering.
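The `get_date` UDF relies on `datetime.datetime.fromtimestamp`, which interprets the timestamp in the local time zone of the machine running the Python workers. Events close to midnight can therefore land on a different date depending on where the notebook runs: the first event (ts=1538352117000) is at 00:01:57 UTC on 2018-10-01, which is still 30 September in US time zones. A minimal sketch of a time-zone-explicit conversion, assuming UTC is the intended reference (the dataset does not state its time zone); `ms_to_utc_date` is a hypothetical helper name:

```python
import datetime


def ms_to_utc_date(ts_ms: int) -> str:
    """Convert a Unix timestamp in milliseconds to a 'YYYY-MM-DD' string in UTC.

    Unlike datetime.datetime.fromtimestamp(x / 1000.0) without a tz argument,
    which uses the machine's local time zone, this always interprets the value
    as UTC, so the date does not depend on where the code runs.
    """
    moment = datetime.datetime.fromtimestamp(ts_ms / 1000.0, tz=datetime.timezone.utc)
    return moment.strftime("%Y-%m-%d")


# Values taken from the first record of the dataset
print(ms_to_utc_date(1538173362000))  # registration -> 2018-09-28
print(ms_to_utc_date(1538352117000))  # event time (ts) -> 2018-10-01
```

Within Spark, a similar result can be obtained without a Python UDF by combining the built-in `from_unixtime` and `to_date` functions; those follow the session time zone (`spark.sql.session.timeZone`) rather than the worker's local time.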

#### userAgent

In [51]:
df.select('userAgent').dropDuplicates().count()

56

In [52]:
df.groupBy('userAgent').count().sort(desc('count')).show(5)

+--------------------+-----+
|           userAgent|count|
+--------------------+-----+
|"Mozilla/5.0 (Win...|22751|
|"Mozilla/5.0 (Mac...|19611|
|"Mozilla/5.0 (Mac...|18448|
|"Mozilla/5.0 (Mac...|17348|
|Mozilla/5.0 (Wind...|16700|
+--------------------+-----+
only showing top 5 rows



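This column holds the HTTP User-Agent string, which identifies the client's browser and operating system (56 distinct values). Its relevance for churn is unclear.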

#### userId

In [53]:
df.select('userId').dropDuplicates().count()

225

225 users will be analyzed

### 3.2. Churn definition

To build an ML model for customer churn, churn must first be defined.
When a user reaches the `Cancellation Confirmation` page, they quit, and there are no more records for that user after this event.
Therefore the `Cancellation Confirmation` page is used to define the `churn` label. 

#### Defining churn:
* 'churn' column = 1 when the user churned, i.e. a 'Cancellation Confirmation' event exists under their userId
* 'churn' column = 0 when the user has not churned, i.e. no 'Cancellation Confirmation' event exists under their userId
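The label is a per-user flag: a user is churned if at least one of their events is a `Cancellation Confirmation`, and every event of that user then carries the same label. The notebook collects the churned userIds and uses `isin`; below is a plain-Python sketch of the same rule, with toy (userId, page) pairs standing in for the DataFrame columns. The helper `churn_labels` and the data are hypothetical:

```python
from typing import Dict, Iterable, Tuple

CANCEL_PAGE = "Cancellation Confirmation"


def churn_labels(events: Iterable[Tuple[str, str]]) -> Dict[str, int]:
    """Return {userId: churn}, where churn is 1 if the user ever reached the
    cancellation-confirmation page and 0 otherwise.

    `events` is an iterable of (userId, page) pairs, a stand-in for the
    userId and page columns of the Spark DataFrame.
    """
    labels: Dict[str, int] = {}
    for user_id, page in events:
        # Every user starts as 0; a single cancellation event flips them to 1
        labels.setdefault(user_id, 0)
        if page == CANCEL_PAGE:
            labels[user_id] = 1
    return labels


# Hypothetical toy events, not taken from the dataset
toy_events = [
    ("30", "NextSong"),
    ("30", "Home"),
    ("125", "NextSong"),
    ("125", "Cancel"),
    ("125", "Cancellation Confirmation"),
]
print(churn_labels(toy_events))  # {'30': 0, '125': 1}
```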

In [54]:
# Preparing a list of user_ids for users who churned
user_ids_cancelled = df.select('userId').where(df.page == 'Cancellation Confirmation').dropDuplicates().collect()
user_ids_cancelled = [int(row.userId) for row in user_ids_cancelled]
user_ids_cancelled[:5]

[125, 51, 54, 100014, 101]

In [55]:
df = df.withColumn("churn", when(df.userId.isin(user_ids_cancelled), 1).otherwise(0))

### 3.3. Explore Data to look for relationship with churn

Exploring the columns to observe the behavior for users who stayed vs users who churned. The analysis will help in creating features for ML model.

#### artist

In [56]:
df_distinct_artists = df.groupBy(["churn", "userId"]).agg(countDistinct("artist")).groupby("churn").agg({'count(DISTINCT artist)': 'mean'})
df_distinct_artists.show()

+-----+---------------------------+
|churn|avg(count(DISTINCT artist))|
+-----+---------------------------+
|    1|          518.6923076923077|
|    0|          749.7861271676301|
+-----+---------------------------+



On average, users who churn listen to fewer distinct artists (about 519 vs 750 for users who stay).

#### gender

In [57]:
df_gender = df.groupBy(["gender", "churn"]).agg(countDistinct("userId")).sort("gender")
df_gender.show()

+------+-----+----------------------+
|gender|churn|count(DISTINCT userId)|
+------+-----+----------------------+
|     F|    1|                    20|
|     F|    0|                    84|
|     M|    1|                    32|
|     M|    0|                    89|
+------+-----+----------------------+



In [58]:
# Collect the per-(gender, churn) user counts into a dict with a single collect()
gender_counts = {(row['gender'], row['churn']): row['count(DISTINCT userId)'] for row in df_gender.collect()}
F_0, F_1 = gender_counts[('F', 0)], gender_counts[('F', 1)]
M_0, M_1 = gender_counts[('M', 0)], gender_counts[('M', 1)]

In [59]:
print(f'Probability of churning of female user is {(F_1 / (F_0 + F_1)):.2f}')
print(f'Probability of churning of male user is {(M_1 / (M_0 + M_1)):.2f}')

Probability of churning of female user is 0.19
Probability of churning of male user is 0.26


Male users churn more often than female users (26% vs 19%), so gender can be helpful as a feature in the ML model.

#### itemInSession

In [60]:
# Relationship between sessionId and ItemInSession for a selected user
df.filter(df['userId'] == '30').select(['sessionId', 'itemInSession']).collect()

[Row(sessionId=29, itemInSession=50),
 Row(sessionId=29, itemInSession=51),
 Row(sessionId=29, itemInSession=52),
 Row(sessionId=29, itemInSession=53),
 Row(sessionId=29, itemInSession=54),
 Row(sessionId=29, itemInSession=55),
 Row(sessionId=29, itemInSession=56),
 Row(sessionId=29, itemInSession=57),
 Row(sessionId=29, itemInSession=58),
 Row(sessionId=29, itemInSession=59),
 Row(sessionId=29, itemInSession=60),
 Row(sessionId=29, itemInSession=61),
 Row(sessionId=29, itemInSession=62),
 Row(sessionId=29, itemInSession=63),
 Row(sessionId=29, itemInSession=64),
 Row(sessionId=29, itemInSession=65),
 Row(sessionId=29, itemInSession=66),
 Row(sessionId=29, itemInSession=67),
 Row(sessionId=29, itemInSession=68),
 Row(sessionId=29, itemInSession=69),
 Row(sessionId=29, itemInSession=70),
 Row(sessionId=29, itemInSession=71),
 Row(sessionId=29, itemInSession=72),
 Row(sessionId=29, itemInSession=73),
 Row(sessionId=29, itemInSession=74),
 Row(sessionId=29, itemInSession=75),
 Row(session

itemInSession increases by one with each event in a session, so one session contains many items.

In [61]:
df_itemInSessioncount = df.groupBy(["churn", "userId", "sessionId"]).agg({'itemInSession': 'count'}).sort('userId')
df_itemInSessioncount.show()

+-----+------+---------+--------------------+
|churn|userId|sessionId|count(itemInSession)|
+-----+------+---------+--------------------+
|    0|    10|     1592|                  78|
|    0|    10|        9|                  70|
|    0|    10|     1414|                  78|
|    0|    10|      595|                 449|
|    0|    10|     1047|                  25|
|    0|    10|     1981|                  95|
|    0|   100|     2075|                 235|
|    0|   100|     1760|                   1|
|    0|   100|      369|                 108|
|    0|   100|     1473|                 204|
|    0|   100|     1590|                 313|
|    0|   100|     1782|                   7|
|    0|   100|     2428|                  91|
|    0|   100|     1944|                 222|
|    0|   100|      256|                 177|
|    0|   100|     1231|                  18|
|    0|   100|     1269|                   5|
|    0|   100|     2304|                 196|
|    0|   100|     1707|          

In [62]:
df_itemInSessioncount2 = df_itemInSessioncount.groupBy(["churn", "userId"]).agg({'count(itemInSession)': 'mean'}).sort('userId')
df_itemInSessioncount2.show()

+-----+------+-------------------------+
|churn|userId|avg(count(itemInSession))|
+-----+------+-------------------------+
|    0|    10|                    132.5|
|    0|   100|        91.82857142857142|
|    1|100001|                    46.75|
|    0|100002|                     54.5|
|    1|100003|                     39.0|
|    0|100004|       59.285714285714285|
|    1|100005|                     43.2|
|    1|100006|                     44.0|
|    1|100007|        57.77777777777778|
|    0|100008|       156.66666666666666|
|    1|100009|                     67.1|
|    0|100010|        54.42857142857143|
|    1|100011|                     23.0|
|    1|100012|        85.71428571428571|
|    1|100013|        99.42857142857143|
|    1|100014|       51.666666666666664|
|    1|100015|                     87.5|
|    0|100016|                    79.75|
|    1|100017|                     75.0|
|    0|100018|       61.333333333333336|
+-----+------+-------------------------+
only showing top

In [63]:
df_itemInSessioncount2.groupBy("churn").agg({'avg(count(itemInSession))': 'mean'}).sort('churn').show()

+-----+------------------------------+
|churn|avg(avg(count(itemInSession)))|
+-----+------------------------------+
|    0|             86.99517396743398|
|    1|             77.61705074052895|
+-----+------------------------------+



On average, users who churn have fewer items per session (about 78 vs 87).

#### length

In [64]:
df_length = df.groupBy(["churn", "userId"]).agg({'length': 'sum'}).sort('churn')
df_length.show()

+-----+------+------------------+
|churn|userId|       sum(length)|
+-----+------+------------------+
|    0|100008|      191396.79602|
|    0|200022| 85942.33360000003|
|    0|300002|402107.92633999983|
|    0|300008|348739.33905000007|
|    0|200012|       16472.36843|
|    0|100010|       66940.89735|
|    0|    23|164815.40200000012|
|    0|    80| 89471.02372999999|
|    0|    97| 491231.4963499998|
|    0|    39|1991154.9155799998|
|    0|   149| 48136.58831000002|
|    0|200019|121838.16904000004|
|    0|    93|      157811.98088|
|    0|200007|15739.869310000002|
|    0|   114|      327836.51452|
|    0|200003|184388.15249999988|
|    0|    50|122751.27560999998|
|    0|    82| 424932.7910600001|
|    0|    14|304778.64878000005|
|    0|     8| 63393.53190999998|
+-----+------+------------------+
only showing top 20 rows



In [65]:
df_length.groupBy('churn').agg({'sum(length)': 'mean'}).sort('churn').show()

+-----+------------------+
|churn|  avg(sum(length))|
+-----+------------------+
|    0| 276166.9374678035|
|    1|174014.26855134612|
+-----+------------------+



On average, the total length of songs played is lower for users who churned (about 174,000 seconds) than for users who stayed (about 276,000 seconds).

#### page

Page can take 19 distinct values. Let's check whether encountering an error page influences a user's churn.

In [66]:
user_ids_error = df.select('userId').where(df.page == 'Error').dropDuplicates().collect()
user_ids_error = [int(row.userId) for row in user_ids_error]
df = df.withColumn("error_encountered", when(df.userId.isin(user_ids_error), 1).otherwise(0))
df.groupBy(["churn", "userId"]).agg({'error_encountered': 'mean'}).groupBy('churn').agg({'avg(error_encountered)': 'mean'}).show()

+-----+---------------------------+
|churn|avg(avg(error_encountered))|
+-----+---------------------------+
|    1|         0.4230769230769231|
|    0|         0.5491329479768786|
+-----+---------------------------+



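Since `error_encountered` is constant per user, these means are shares of users: about 42% of churned users encountered an error, compared with 55% of users who stayed. Encountering an error therefore does not appear to drive users to quit Sparkify.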

#### sessionId

In [67]:
df.groupBy(["churn", "userId"]).agg({'sessionId': 'count'}).groupBy('churn').agg({'count(sessionId)': 'mean'}).show()

+-----+---------------------+
|churn|avg(count(sessionId))|
+-----+---------------------+
|    1|    862.7692307692307|
|    0|   1348.4971098265896|
+-----+---------------------+



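Note that `count` on `sessionId` counts a user's events (rows), not distinct sessions. Churned users generate far fewer events on average (about 863 vs 1,348).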
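In `agg({'sessionId': 'count'})` the `count` aggregate counts the non-null `sessionId` values per user, i.e. one per event, so the averages above measure activity volume rather than the number of sessions. Counting sessions needs a distinct count, for example `df.groupBy('churn', 'userId').agg(countDistinct('sessionId'))` in PySpark. The difference is illustrated below in plain Python on hypothetical (userId, sessionId) pairs; `events_and_sessions_per_user` is an illustrative helper, not part of the notebook:

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple


def events_and_sessions_per_user(
    events: Iterable[Tuple[str, int]],
) -> Dict[str, Tuple[int, int]]:
    """For (userId, sessionId) pairs, return {userId: (n_events, n_sessions)}.

    n_events mirrors Spark's count('sessionId') (one per non-null row);
    n_sessions mirrors countDistinct('sessionId').
    """
    n_events: Dict[str, int] = defaultdict(int)
    sessions: Dict[str, Set[int]] = defaultdict(set)
    for user_id, session_id in events:
        n_events[user_id] += 1
        sessions[user_id].add(session_id)
    return {user: (n_events[user], len(sessions[user])) for user in n_events}


# Hypothetical toy data: user "10" has 5 events spread over 2 sessions
toy = [("10", 9), ("10", 9), ("10", 9), ("10", 595), ("10", 595), ("100", 256)]
print(events_and_sessions_per_user(toy))  # {'10': (5, 2), '100': (1, 1)}
```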

#### timestamp information
The dataset has two columns with timestamp information: registration time and event time. Using them, we can calculate how long each user stays with Sparkify.

In [68]:
df = df.withColumn("registration_date_datetime", to_date(df.registration_date, "yyyy-MM-dd"))
df = df.withColumn("ts_date_datetime", to_date(df.ts_date, "yyyy-MM-dd"))

In [69]:
df_dates = df.groupBy(['churn', 'userId']).agg({'registration_date_datetime': 'max', 'ts_date_datetime': 'max'})
df_datediff = df_dates.withColumn('date_diff', datediff("max(ts_date_datetime)", "max(registration_date_datetime)"))
df_datediff.show()

+-----+------+---------------------+-------------------------------+---------+
|churn|userId|max(ts_date_datetime)|max(registration_date_datetime)|date_diff|
+-----+------+---------------------+-------------------------------+---------+
|    0|   114|           2018-11-23|                     2018-09-13|       71|
|    0|    39|           2018-11-30|                     2018-06-15|      168|
|    1|    12|           2018-10-22|                     2018-08-10|       73|
|    1|   103|           2018-11-04|                     2018-09-23|       42|
|    0|200012|           2018-11-30|                     2018-09-26|       65|
|    0|    82|           2018-11-28|                     2018-09-14|       75|
|    0|    14|           2018-11-27|                     2018-07-18|      132|
|    0|    80|           2018-11-28|                     2018-09-30|       59|
|    0|    97|           2018-11-30|                     2018-09-04|       87|
|    0|     8|           2018-11-30|                

In [70]:
df_datediff.groupBy(['churn']).agg({'date_diff': 'mean'}).show()

+-----+-----------------+
|churn|   avg(date_diff)|
+-----+-----------------+
|    1|57.36538461538461|
|    0|86.56647398843931|
+-----+-----------------+



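Users who churn do so on average 57 days after registration; for users who stay, the average time between registration and their last recorded event is 87 days.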
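`datediff(end, start)` returns the number of days from `start` to `end`. A plain-Python check against two rows of the table above (userId 12 and 114), using a hypothetical helper `days_between`:

```python
from datetime import date


def days_between(registration: date, last_event: date) -> int:
    """Days from registration to the last recorded event, matching
    Spark's datediff(last_event, registration) for date values."""
    return (last_event - registration).days


# Rows taken from the date_diff table above
print(days_between(date(2018, 8, 10), date(2018, 10, 22)))  # userId 12 (churned): 73
print(days_between(date(2018, 9, 13), date(2018, 11, 23)))  # userId 114 (stayed): 71
```

For users who stayed, the last recorded event is bounded by the end of the observation period, so their `date_diff` is likely a lower bound on how long they remain customers rather than their full lifetime. This is worth keeping in mind when comparing the two averages.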

## 4. Feature Engineering

After the detailed dataset analysis, the next step is to create features that can be helpful in user churn prediction.

In [71]:
# The core is the DataFrame with userId and churn information. Features will be added to this core subsequently.
df_main = df.select(['userId', 'churn', 'gender']).dropDuplicates().sort('userId')

In [72]:
# male feature
df_main = df_main.withColumn("male", when(df_main.gender == 'M', 1).otherwise(0))
df_main = df_main.drop("gender")

In [73]:
# Feature representing the number of distinct artists listened to by a particular user
df_distinct_artists = df.groupBy("userId").agg(countDistinct("artist").alias('artists_count')).sort('userId')

df_main = df_main.join(df_distinct_artists,
            ['userId'], "left")

In [74]:
# Feature representing average number of items in user's session
df_itemInSession = df.groupBy(["userId", "sessionId"]).agg({'itemInSession': 'count'}).groupBy("userId").agg(avg('count(itemInSession)').alias('avg_session_items'))
df_main = df_main.join(df_itemInSession,
            ['userId'], "left")

In [75]:
# Feature representing the period of time each user uses Sparkify
df_main = df_main.join(df_datediff.select(['userId', 'date_diff']),
            ['userId'], "left")

In [76]:
# After the DataFrame is complete with all features, userId is not necessary and can be dropped
df_main = df_main.drop("userId")
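The per-user aggregation built up in the cells above can be sketched in plain Python to make the grain of each feature explicit: `male` comes from the gender, `artists_count` is a distinct count that ignores missing artists, `avg_session_items` averages the number of events per session, and `date_diff` is the number of days from registration to the last event. The `Event` type, the `user_features` helper and the toy data are hypothetical; the sketch assumes events are already reduced to dates, as in the notebook's `registration_date_datetime` and `ts_date_datetime` columns:

```python
from collections import defaultdict
from datetime import date
from statistics import mean
from typing import Dict, List, NamedTuple, Optional


class Event(NamedTuple):
    """One event-log row, reduced to the columns the features need."""
    user_id: str
    session_id: int
    gender: str
    artist: Optional[str]  # None for pages other than NextSong
    registration: date
    event_date: date


def user_features(events: List[Event]) -> Dict[str, dict]:
    """Aggregate events into one feature dict per user (hypothetical helper)."""
    by_user: Dict[str, List[Event]] = defaultdict(list)
    for event in events:
        by_user[event.user_id].append(event)

    features: Dict[str, dict] = {}
    for user_id, rows in by_user.items():
        # Number of events in each of the user's sessions
        per_session: Dict[int, int] = defaultdict(int)
        for row in rows:
            per_session[row.session_id] += 1
        # Latest event date; sorted()[-1] avoids the built-in max, which a
        # star-style pyspark import of `max` would shadow in the notebook
        last_event = sorted(row.event_date for row in rows)[-1]
        features[user_id] = {
            "male": 1 if rows[0].gender == "M" else 0,
            # Like countDistinct, ignore missing (None) artists
            "artists_count": len({row.artist for row in rows if row.artist is not None}),
            "avg_session_items": mean(list(per_session.values())),
            # Registration is constant per user, so any row's value works
            "date_diff": (last_event - rows[0].registration).days,
        }
    return features


# Hypothetical toy events for one user: two sessions, one non-song event
toy = [
    Event("30", 29, "M", "Martha Tilston", date(2018, 9, 28), date(2018, 10, 1)),
    Event("30", 29, "M", None, date(2018, 9, 28), date(2018, 10, 1)),
    Event("30", 31, "M", "Martha Tilston", date(2018, 9, 28), date(2018, 10, 3)),
]
print(user_features(toy))
# {'30': {'male': 1, 'artists_count': 1, 'avg_session_items': 1.5, 'date_diff': 5}}
```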

In [77]:
df_main.printSchema()

root
 |-- churn: integer (nullable = false)
 |-- male: integer (nullable = false)
 |-- artists_count: long (nullable = true)
 |-- avg_session_items: double (nullable = true)
 |-- date_diff: integer (nullable = true)



In [78]:
df_main.show()

+-----+----+-------------+------------------+---------+
|churn|male|artists_count| avg_session_items|date_diff|
+-----+----+-------------+------------------+---------+
|    0|   0|          252| 54.42857142857143|       55|
|    0|   1|          339|              79.0|       70|
|    1|   1|            8|              11.0|       72|
|    0|   0|         2232| 166.3793103448276|      131|
|    1|   1|         1385|             246.4|       20|
|    0|   1|          142|28.714285714285715|       73|
|    0|   1|         1302|151.86666666666667|       57|
|    1|   0|         1744| 92.89189189189189|      110|
|    0|   0|          643|             167.0|       24|
|    1|   1|          233|51.666666666666664|       85|
|    0|   0|         1299|             144.0|       67|
|    0|   0|           78|39.333333333333336|       24|
|    1|   1|         1241|             214.9|       54|
|    0|   0|          534|              53.0|      125|
|    0|   1|         1332|             164.6|   

## 5. Modeling
In this part the data is split into training and test datasets, and two machine learning models are trained. The best model is chosen based on accuracy and F1-score. 
Cross-validation was not run because of the long run time and frequent disconnections of the Udacity workspace.

In [79]:
# Vectorizing the feature columns into a new column "features"
assembler = VectorAssembler(inputCols=['artists_count',
                                       'avg_session_items',
                                       'date_diff',
                                       'male'],
                            outputCol='features')

In [80]:
# Splitting the data into train and test
train_data, test_data = df_main.randomSplit([0.7, 0.3], seed=1)

In [81]:
# Checking how balanced the dataset is
number_of_churned_users = df_main.filter(df_main['churn'] == 1).count()
number_of_not_churned_users = df_main.filter(df_main['churn'] == 0).count()

print(f'{number_of_churned_users} users churned')
print(f'{number_of_not_churned_users} users did not churn')
print(f'proportion of users who churned: {number_of_churned_users / (number_of_churned_users + number_of_not_churned_users)}')

52 users churned
173 users did not churn
proportion of users who churned: 0.2311111111111111
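Because only about 23% of users churned, accuracy on its own is a weak signal. A trivial model that always predicts "no churn" is already right for every non-churned user. The sketch below computes that majority-class baseline. It uses the counts above for the whole dataset and the row sums of the confusion matrices in the modeling sections for the test set (47 non-churned and 13 churned users). `majority_baseline_accuracy` is a hypothetical helper.

```python
def majority_baseline_accuracy(n_churned, n_not_churned):
    """Accuracy of a trivial model that always predicts churn = 0."""
    return n_not_churned / (n_churned + n_not_churned)

# Whole dataset: counts from the balance check above.
print(f"All users: {majority_baseline_accuracy(52, 173):.3f}")
# Test set: row sums of the confusion matrices (47 not churned, 13 churned).
print(f"Test set:  {majority_baseline_accuracy(13, 47):.3f}")
```

On the test set, this baseline gives 47/60 ≈ 0.783, the same accuracy both models report. F1-score and churn-class recall are therefore more informative here.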


#### Logistic Regression

In [82]:
log_reg = LogisticRegression(featuresCol='features',
                             labelCol='churn')
  
# Creating the pipeline
pipe = Pipeline(stages=[assembler, log_reg])

In [83]:
# Fitting the model on training data
fit_model = pipe.fit(train_data)
  
# Storing the results on test data
results = fit_model.transform(test_data)
  
# Showing the results
results.take(2)

[Row(churn=0, male=1, artists_count=142, avg_session_items=28.714285714285715, date_diff=73, features=DenseVector([142.0, 28.7143, 73.0, 1.0]), rawPrediction=DenseVector([0.9098, -0.9098]), probability=DenseVector([0.713, 0.287]), prediction=0.0),
 Row(churn=1, male=1, artists_count=1385, avg_session_items=246.4, date_diff=20, features=DenseVector([1385.0, 246.4, 20.0, 1.0]), rawPrediction=DenseVector([0.1604, -0.1604]), probability=DenseVector([0.54, 0.46]), prediction=0.0)]
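For binary classification, Spark's `LogisticRegression` appears to store the margin as `rawPrediction = [-m, m]` and the churn probability as the sigmoid of `m`. The rows above are consistent with this: sigmoid(-0.9098) ≈ 0.287. A row is predicted as churn when P(churn) exceeds the model's `threshold` parameter, which defaults to 0.5. The second row is a churned user with P(churn) ≈ 0.46, so it is missed at the default threshold. The sketch below reproduces this mapping in plain Python. The helper names are hypothetical, and the 0.4 threshold is only an illustration.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def lr_probability(raw_prediction):
    """Map a binary rawPrediction [-m, m] to [P(no churn), P(churn)]."""
    p_churn = sigmoid(raw_prediction[1])
    return [1.0 - p_churn, p_churn]

def predict(raw_prediction, threshold=0.5):
    """Predict churn (1.0) when P(churn) exceeds the threshold."""
    return 1.0 if lr_probability(raw_prediction)[1] > threshold else 0.0

# rawPrediction values of the two rows shown above.
for raw in ([0.9098, -0.9098], [0.1604, -0.1604]):
    probs = lr_probability(raw)
    print([round(p, 3) for p in probs], predict(raw), predict(raw, threshold=0.4))
```

Lowering the threshold flags more users as churners, which usually raises churn recall at the cost of precision.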

In [84]:
results.printSchema()

root
 |-- churn: integer (nullable = false)
 |-- male: integer (nullable = false)
 |-- artists_count: long (nullable = true)
 |-- avg_session_items: double (nullable = true)
 |-- date_diff: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [85]:
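# Conversion to pandas for computing the evaluation metrics with scikit-learn.
# Both columns are collected in a single query so that labels and predictions stay row-aligned.
preds = results.select('churn', 'prediction').toPandas()
y_test = preds['churn']
y_predict = preds['prediction']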

In [86]:
cm = confusion_matrix(y_test, y_predict)
print("Confusion Matrix:")
print(cm)

Confusion Matrix:
[[44  3]
 [10  3]]


In [87]:
# Calculate accuracy, precision, recall, and F1 score
acc = accuracy_score(y_test, y_predict)
prec = precision_score(y_test, y_predict, average='weighted')
rec = recall_score(y_test, y_predict, average='weighted')
f1 = f1_score(y_test, y_predict, average='weighted')

print(f"Accuracy: {acc:.3f}")
print(f"Precision: {prec:.3f}")
print(f"Recall: {rec:.3f}")
print(f"F1 Score: {f1:.3f}")

Accuracy: 0.783
Precision: 0.747
Recall: 0.783
F1 Score: 0.751
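With `average='weighted'`, scikit-learn computes precision, recall and F1 for each class and averages them weighted by the number of true instances (support) of each class. For this reason, weighted recall always equals accuracy. The sketch below recomputes the scores from the confusion matrix above to make that definition concrete. `weighted_metrics` is a hypothetical helper and not the scikit-learn implementation.

```python
def weighted_metrics(cm):
    """Accuracy and support-weighted precision, recall and F1 from a square
    confusion matrix (rows = true class, columns = predicted class)."""
    k = len(cm)
    total = sum(sum(row) for row in cm)
    correct = sum(cm[i][i] for i in range(k))
    w_prec = w_rec = w_f1 = 0.0
    for c in range(k):
        tp = cm[c][c]
        support = sum(cm[c])                               # users truly in class c
        predicted = sum(cm[row][c] for row in range(k))    # users predicted as class c
        prec_c = tp / predicted if predicted else 0.0
        rec_c = tp / support if support else 0.0
        f1_c = 2 * prec_c * rec_c / (prec_c + rec_c) if prec_c + rec_c else 0.0
        weight = support / total
        w_prec += weight * prec_c
        w_rec += weight * rec_c
        w_f1 += weight * f1_c
    return correct / total, w_prec, w_rec, w_f1

# Logistic Regression confusion matrix from above.
acc, prec, rec, f1 = weighted_metrics([[44, 3], [10, 3]])
print(f"Accuracy: {acc:.3f}  Precision: {prec:.3f}  Recall: {rec:.3f}  F1: {f1:.3f}")
```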


#### Random Forest

In [88]:
# Random Forest with 30 trees, each grown to a maximum depth of 30
rf = RandomForestClassifier(featuresCol='features',
                            labelCol='churn',
                            numTrees=30, maxDepth=30)
pipeline = Pipeline(stages=[assembler, rf])

In [89]:
# Fitting the model on training data
fit_model = pipeline.fit(train_data)
  
# Storing the results on test data
results = fit_model.transform(test_data)
  
# Showing the results
results.take(2)

[Row(churn=0, male=1, artists_count=142, avg_session_items=28.714285714285715, date_diff=73, features=DenseVector([142.0, 28.7143, 73.0, 1.0]), rawPrediction=DenseVector([26.96, 3.04]), probability=DenseVector([0.8987, 0.1013]), prediction=0.0),
 Row(churn=1, male=1, artists_count=1385, avg_session_items=246.4, date_diff=20, features=DenseVector([1385.0, 246.4, 20.0, 1.0]), rawPrediction=DenseVector([9.0, 21.0]), probability=DenseVector([0.3, 0.7]), prediction=1.0)]
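For the Random Forest, `rawPrediction` appears to be the sum of the per-tree class-probability vectors. Each entry of that sum is a (possibly fractional) vote, and the entries add up to `numTrees` (26.96 + 3.04 = 30 here). `probability` is this vector normalized to sum to 1, and `prediction` is the class with the largest value. The sketch below reproduces the two rows shown above. `rf_probability` is a hypothetical helper based on our reading of the output, not Spark's code.

```python
def rf_probability(raw_prediction):
    """Normalise summed per-tree votes into class probabilities."""
    total = sum(raw_prediction)
    return [v / total for v in raw_prediction]

for raw in ([26.96, 3.04], [9.0, 21.0]):
    probs = rf_probability(raw)
    prediction = float(probs.index(max(probs)))  # index of the most likely class
    print([round(p, 4) for p in probs], prediction)
```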

In [90]:
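# Conversion to pandas for computing the evaluation metrics with scikit-learn.
# Both columns are collected in a single query so that labels and predictions stay row-aligned.
preds = results.select('churn', 'prediction').toPandas()
y_test = preds['churn']
y_predict = preds['prediction']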

In [91]:
cm = confusion_matrix(y_test, y_predict)
print("Confusion Matrix:")
print(cm)

Confusion Matrix:
[[42  5]
 [ 8  5]]


In [92]:
# Calculate accuracy, precision, recall, and F1 score
acc = accuracy_score(y_test, y_predict)
prec = precision_score(y_test, y_predict, average='weighted')
rec = recall_score(y_test, y_predict, average='weighted')
f1 = f1_score(y_test, y_predict, average='weighted')

print(f"Accuracy: {acc:.3f}")
print(f"Precision: {prec:.3f}")
print(f"Recall: {rec:.3f}")
print(f"F1 Score: {f1:.3f}")

Accuracy: 0.783
Precision: 0.766
Recall: 0.783
F1 Score: 0.773
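Both models reach the same accuracy, and the weighted scores are dominated by the 47 non-churned test users. Looking at the churn class alone shows where the Random Forest's advantage comes from: it identifies 5 of the 13 churned users, while Logistic Regression identifies 3. The sketch below computes churn-class precision, recall and F1 from the two confusion matrices. `churn_class_metrics` is a hypothetical helper.

```python
def churn_class_metrics(cm):
    """Precision, recall and F1 for the churn class (label 1) from a 2x2
    confusion matrix laid out as [[TN, FP], [FN, TP]]."""
    (tn, fp), (fn, tp) = cm
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1

for name, cm in [("Logistic Regression", [[44, 3], [10, 3]]),
                 ("Random Forest", [[42, 5], [8, 5]])]:
    p, r, f = churn_class_metrics(cm)
    print(f"{name}: precision={p:.3f} recall={r:.3f} F1={f:.3f}")
```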


## 6. Summary
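Two machine learning models were run. Both reach the same accuracy on the test set (0.783), but Random Forest achieves a higher weighted F1-score (0.773 vs 0.751). The F1-score was chosen as the optimization metric because it balances the model's ability to capture positive cases (recall) with its precision on the cases it does flag. A weighted F1-score of about 0.77 suggests reasonable performance on unseen data, but the result should be read with caution: the test set contains only 60 users (13 of whom churned), and always predicting "no churn" would also yield an accuracy of 0.783. <br>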
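The F1-score reported in this notebook is the weighted variant (`average='weighted'` in the scikit-learn calls). For each class $c$ with precision $P_c$, recall $R_c$ and support $n_c$ (the number of test users whose true label is $c$), and with $N = \sum_c n_c$:

$$
F_{1,c} = \frac{2 P_c R_c}{P_c + R_c} = \frac{2\,TP_c}{2\,TP_c + FP_c + FN_c},
\qquad
F_1^{\text{weighted}} = \sum_c \frac{n_c}{N}\, F_{1,c}
$$

For the Random Forest confusion matrix, $F_{1,0} = 84/97 \approx 0.866$ and $F_{1,1} = 10/23 \approx 0.435$, so $F_1^{\text{weighted}} \approx (47 \cdot 0.866 + 13 \cdot 0.435)/60 \approx 0.773$.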
The following improvements can be made to get a better model:
* Develop new features. A good candidate is the `page` column, which takes 19 distinct values (event types). The occurrence of some of those events might indicate user behaviour that helps predict churn.
* Try other machine learning models, for example gradient-boosted trees or SVMs.
* Tune the hyperparameters of both the current and the new models.
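As a sketch of the first improvement, per-user `page` features could be built as the share of a user's events that hit each page of interest. Normalizing by the event count keeps heavy users from dominating. In PySpark, a starting point would be `df.groupBy('userId').pivot('page').count()`. Any page used to define the churn label itself should be excluded, so that the label does not leak into the features. The plain-Python helper below is hypothetical, and the page names in the toy data are assumed examples rather than values confirmed in this section.

```python
from collections import Counter, defaultdict

def page_event_features(events, pages_of_interest):
    """Per-user share of events that hit each page of interest.

    `events` is an iterable of (user_id, page) pairs, a hypothetical
    stand-in for the userId and page columns of the event log."""
    totals = Counter()
    hits = defaultdict(Counter)
    for user_id, page in events:
        totals[user_id] += 1
        if page in pages_of_interest:
            hits[user_id][page] += 1
    return {user: {p: hits[user][p] / totals[user] for p in pages_of_interest}
            for user in totals}

# Toy events; the page names are assumed examples.
events = [("u1", "NextSong"), ("u1", "Thumbs Down"), ("u1", "NextSong"),
          ("u1", "Thumbs Up"), ("u2", "NextSong"), ("u2", "Thumbs Up")]
features = page_event_features(events, ["Thumbs Up", "Thumbs Down"])
print(features)
```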