# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [71]:
# import libraries
import datetime
import plotly.graph_objs as go
from plotly.subplots import make_subplots
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, concat, count, col, desc, from_unixtime, isnan, lag, lit, udf, split, weekofyear
from pyspark.sql.functions import max as Fmax
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.types import IntegerType
from pyspark.sql import Window

In [2]:
# create a Spark session

spark = SparkSession \
    .builder \
    .appName("sparkify") \
    .getOrCreate()

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data, for example records without user IDs or session IDs.

**Load the dataset**

In [3]:
#Let's load the data and observe what it looks like

data_path = 'mini_sparkify_event_data.json'
event_data = spark.read.json(data_path)

In [4]:
event_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [5]:
event_data.count()

286500

In [6]:
event_data.head()

Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')

**Get familiar with the dataset**

In [7]:
# Let's split the columns into categorical (string) and numerical features

cat_cols = []
num_cols = []

for col_name, col_type in event_data.dtypes:
    if col_type == 'string':
        cat_cols.append(col_name)
    else:
        num_cols.append(col_name)

print('Categorical columns: ')
print(cat_cols)
print('\n')
print('Numerical columns: ')
print(num_cols)

Categorical columns: 
['artist', 'auth', 'firstName', 'gender', 'lastName', 'level', 'location', 'method', 'page', 'song', 'userAgent', 'userId']


Numerical columns: 
['itemInSession', 'length', 'registration', 'sessionId', 'status', 'ts']


Now let's look at the values taken by some of these categorical features.
We are mainly interested in the following columns:
- auth
- gender
- level
- location (mainly the number of unique values)
- method
- page
- userAgent
- userId (mainly the number of unique values)

In [8]:
event_data.select("auth").dropDuplicates().sort("auth").show()

+----------+
|      auth|
+----------+
| Cancelled|
|     Guest|
| Logged In|
|Logged Out|
+----------+



In [9]:
event_data.select("gender").dropDuplicates().sort("gender").show()

+------+
|gender|
+------+
|  null|
|     F|
|     M|
+------+



In [10]:
event_data.select("level").dropDuplicates().sort("level").show()

+-----+
|level|
+-----+
| free|
| paid|
+-----+



In [11]:
print('Number of unique location')
event_data.select("location").dropDuplicates().count()

Number of unique location


115

In [12]:
event_data.select("method").dropDuplicates().sort("method").show()

+------+
|method|
+------+
|   GET|
|   PUT|
+------+



In [13]:
event_data.select("page").dropDuplicates().sort("page").show()

+--------------------+
|                page|
+--------------------+
|               About|
|          Add Friend|
|     Add to Playlist|
|              Cancel|
|Cancellation Conf...|
|           Downgrade|
|               Error|
|                Help|
|                Home|
|               Login|
|              Logout|
|            NextSong|
|            Register|
|         Roll Advert|
|       Save Settings|
|            Settings|
|    Submit Downgrade|
| Submit Registration|
|      Submit Upgrade|
|         Thumbs Down|
+--------------------+
only showing top 20 rows



In [14]:
event_data.select("userAgent").dropDuplicates().sort("userAgent").show()

+--------------------+
|           userAgent|
+--------------------+
|                null|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Mac...|
|"Mozilla/5.0 (Win...|
|"Mozilla/5.0 (Win...|
+--------------------+
only showing top 20 rows



In [15]:
print('Number of unique users')
event_data.select(['userId']).dropDuplicates().count()

Number of unique users


226

In [16]:
event_data.select("userId").dropDuplicates().sort("userId").show()

+------+
|userId|
+------+
|      |
|    10|
|   100|
|100001|
|100002|
|100003|
|100004|
|100005|
|100006|
|100007|
|100008|
|100009|
|100010|
|100011|
|100012|
|100013|
|100014|
|100015|
|100016|
|100017|
+------+
only showing top 20 rows



**First set of observations**

1. gender has a null value -> we should get rid of the corresponding rows
2. userId has an empty string as a value -> we should get rid of the corresponding rows
3. userAgent and method do not seem to be relevant features to look at
4. auth has a Cancelled status that can be useful later on to look at churn rates
5. the number of unique users (225 once the empty userId is excluded) and of unique locations (115) suggests that the population is spread across many cities, so this feature may not be of great value to predict churn (we will confirm this when exploring the dataset further)


**Clean the dataset**

In [17]:
# Let's take a look at the type of user actions we have in this dataset

event_data.select("page").dropDuplicates().sort("page").show()

+--------------------+
|                page|
+--------------------+
|               About|
|          Add Friend|
|     Add to Playlist|
|              Cancel|
|Cancellation Conf...|
|           Downgrade|
|               Error|
|                Help|
|                Home|
|               Login|
|              Logout|
|            NextSong|
|            Register|
|         Roll Advert|
|       Save Settings|
|            Settings|
|    Submit Downgrade|
| Submit Registration|
|      Submit Upgrade|
|         Thumbs Down|
+--------------------+
only showing top 20 rows



In [18]:
action_list = event_data.select("page").dropDuplicates().sort("page").collect()


In [19]:
print('Ratio of actions')
total_count = event_data.count()
action_list = event_data.select("page").dropDuplicates().sort("page").collect()
for row in action_list:
    action_name = row[0]
    action_count = event_data.where(event_data["page"] == action_name).count() / total_count * 100
    print('Column {}: '.format(action_name), action_count)

Ratio of actions
Column About:  0.3225130890052356
Column Add Friend:  1.492844677137871
Column Add to Playlist:  2.2778359511343806
Column Cancel:  0.018150087260034906
Column Cancellation Confirmation:  0.018150087260034906
Column Downgrade:  0.7172774869109948
Column Error:  0.0900523560209424
Column Help:  0.6024432809773124
Column Home:  5.046073298429319
Column Login:  1.131239092495637
Column Logout:  1.126003490401396
Column NextSong:  79.61884816753927
Column Register:  0.006282722513089006
Column Roll Advert:  1.3727748691099475
Column Save Settings:  0.1082024432809773
Column Settings:  0.5284467713787085
Column Submit Downgrade:  0.02198952879581152
Column Submit Registration:  0.0017452006980802793
Column Submit Upgrade:  0.05549738219895288
Column Thumbs Down:  0.8886561954624781
Column Thumbs Up:  4.380802792321116
Column Upgrade:  0.17417102966841186


The vast majority of the dataset (about 80% of the rows) consists of "NextSong" actions, and the artist and song columns are filled only when the page value is "NextSong".
In the remaining 20% of the rows (Upgrade, Home, ...), however, we do not expect to find an artist or a song; that represents about 58,400 rows.
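The ratio loop above launches at least one Spark job per distinct page value. A single aggregation is enough: in Spark that would be `event_data.groupBy("page").count()`, with each count divided by the total number of rows. The plain-Python sketch below only illustrates the same one-pass idea with `collections.Counter`; `page_ratios` and the toy list of page values are hypothetical, not taken from the dataset.

```python
from collections import Counter


def page_ratios(pages):
    """Return {page: percentage of all events}, computed in one pass over the data."""
    counts = Counter(pages)
    total = sum(counts.values())
    # Sort by page name to mirror the alphabetical order used in the notebook
    return {page: n / total * 100 for page, n in sorted(counts.items())}


# Hypothetical toy sample, not drawn from mini_sparkify_event_data.json
sample_pages = ["NextSong"] * 8 + ["Home", "Thumbs Up"]
ratios = page_ratios(sample_pages)
for page, pct in ratios.items():
    print("{}: {:.1f}%".format(page, pct))
```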

In [20]:
event_data.select("page").where((event_data["artist"].isNotNull())).dropDuplicates().show()



+--------+
|    page|
+--------+
|NextSong|
+--------+



In [21]:
# Check for missing or invalid data

print('Missing values count per column')
for col in event_data.columns:
    missing_count = event_data.filter((event_data[col] == "") | event_data[col].isNull() | isnan(event_data[col])).count()
    print('Column {}: '.format(col), missing_count)


Missing values count per column
Column artist:  58392
Column auth:  0
Column firstName:  8346
Column gender:  8346
Column itemInSession:  0
Column lastName:  8346
Column length:  58392
Column level:  0
Column location:  8346
Column method:  0
Column page:  0
Column registration:  8346
Column sessionId:  0
Column song:  58392
Column status:  0
Column ts:  0
Column userAgent:  8346
Column userId:  8346


So 8346 rows are missing their user information: userId, firstName, lastName, gender, location, registration and userAgent each have exactly 8346 missing values.

Note that the columns artist, song and length have many more missing values. This is because a record does not only describe playing a song: it can also be a login, a visit to the home page, and so on. As we saw, about 20% of the rows are not expected to have an artist or a song.

Let's get rid of the rows without a userId: they represent less than 3% of the dataset.
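A quick check of the "less than 3%" figure, using only counts printed by the notebook (the last one comes from the missing-value check run after cleaning). The subtraction at the end shows that every row without a userId was also a row without an artist, since the missing artist count drops by exactly 8346 after cleaning. The remaining-row count is derived here, not printed by the notebook.

```python
# Counts copied from the notebook outputs
total_rows = 286500        # event_data.count()
missing_user_rows = 8346   # rows with an empty userId
non_song_rows = 58392      # rows without artist/song/length, before cleaning
non_song_after = 50046     # rows without artist/song/length, after cleaning

missing_pct = missing_user_rows / total_rows * 100
non_song_pct = non_song_rows / total_rows * 100

print("Rows without a userId: {:.2f}%".format(missing_pct))
print("Rows without a song:   {:.2f}%".format(non_song_pct))
print("Rows expected after cleaning:", total_rows - missing_user_rows)

# True if all removed rows were among the rows without an artist
print(non_song_rows - missing_user_rows == non_song_after)
```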

In [22]:
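def drop_na(df, key_col="userId"):
    # Keep only the rows whose key column (userId by default) is neither null nor an empty string.
    # Rows with a missing userId also lack all the other user information (see the counts above).
    df_clean = df.filter(df[key_col].isNotNull() & (df[key_col] != ""))
    return df_clean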

In [23]:
event_data_clean = drop_na(event_data)

In [24]:
print('Missing values count per column')
for col in event_data_clean.columns:
    missing_count = event_data_clean.filter((event_data_clean[col] == "") | event_data_clean[col].isNull() | isnan(event_data_clean[col])).count()
    print('Column {}: '.format(col), missing_count)

Missing values count per column
Column artist:  50046
Column auth:  0
Column firstName:  0
Column gender:  0
Column itemInSession:  0
Column lastName:  0
Column length:  50046
Column level:  0
Column location:  0
Column method:  0
Column page:  0
Column registration:  0
Column sessionId:  0
Column song:  50046
Column status:  0
Column ts:  0
Column userAgent:  0
Column userId:  0


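Now that the rows without a userId are gone (the remaining missing artist, song and length values are expected for non-NextSong pages), let's convert the ts column, a Unix timestamp in milliseconds, into date and time components.

Let's wrap all these conversions in a single function.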

In [25]:
def convert_ts(df):
    # get the year of the action
    get_year = udf(lambda ts: datetime.datetime.fromtimestamp(ts/1000.0).year)
    df = df.withColumn("ts_year", get_year(df.ts))

    # get the month of the action
    get_month = udf(lambda ts: datetime.datetime.fromtimestamp(ts/1000.0).month)
    df = df.withColumn("ts_month", get_month(df.ts))

    # get the month day of the action
    get_day = udf(lambda ts: datetime.datetime.fromtimestamp(ts/1000.0).day)
    df = df.withColumn("ts_day", get_day(df.ts))
    
    # get the hour of day of the action
    get_hour = udf(lambda ts: datetime.datetime.fromtimestamp(ts/1000.0).hour)
    df = df.withColumn("ts_hour", get_hour(df.ts))
    
    # get the weekday of the action
    # 0 being Monday, 6 being Sunday
    get_weekday = udf(lambda ts: datetime.datetime.fromtimestamp(ts/1000.0).weekday())
    df = df.withColumn("ts_weekday", get_weekday(df.ts))

    return df


In [26]:
event_data_clean = drop_na(event_data)
event_data_clean = convert_ts(event_data_clean)

In [27]:
event_data_clean.head()

Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30', ts_year='2018', ts_month='10', ts_day='1', ts_hour='2', ts_weekday='0')
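Two details of `convert_ts` are worth keeping in mind. The UDFs are declared without a return type, so Spark stores the components as strings (`ts_year='2018'` above). And `datetime.datetime.fromtimestamp` without a `tz` argument uses the local time zone of the machine running the Python workers: 1538352117000 ms is 00:01:57 UTC on 1 October 2018, yet the row above shows `ts_hour='2'`. If time-zone-independent integer features are preferred, one option, sketched below in plain Python, is to pass `tz=timezone.utc`; in the notebook, such a function could be wrapped with `udf(..., IntegerType())`. The helper name `ts_parts_utc` is hypothetical.

```python
from datetime import datetime, timezone


def ts_parts_utc(ts_ms):
    """Split a Unix timestamp in milliseconds into UTC date/time components."""
    dt = datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc)
    # weekday(): 0 is Monday, 6 is Sunday, as in convert_ts
    return {"year": dt.year, "month": dt.month, "day": dt.day,
            "hour": dt.hour, "weekday": dt.weekday()}


# ts of the first row shown above
print(ts_parts_utc(1538352117000))
```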

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per unit of time or per number of songs played.

**Define churn**

Let's take a look at the rows where page is Cancellation Confirmation.

In [28]:
event_data_clean.select(['auth', 'gender', 'level', 'ts_year', 'ts_month', 'ts_day', 'ts_hour', 'ts_weekday']).where(event_data_clean['page']=='Cancellation Confirmation').show()


+---------+------+-----+-------+--------+------+-------+----------+
|     auth|gender|level|ts_year|ts_month|ts_day|ts_hour|ts_weekday|
+---------+------+-----+-------+--------+------+-------+----------+
|Cancelled|     M| paid|   2018|      10|     7|     22|         6|
|Cancelled|     M| paid|   2018|      10|     8|     23|         0|
|Cancelled|     M| free|   2018|      10|    12|      6|         4|
|Cancelled|     M| paid|   2018|      10|    12|     22|         4|
|Cancelled|     F| paid|   2018|      10|    13|     23|         5|
|Cancelled|     F| free|   2018|      10|    15|      9|         0|
|Cancelled|     M| paid|   2018|      10|    17|      0|         2|
|Cancelled|     M| paid|   2018|      10|    17|      2|         2|
|Cancelled|     M| paid|   2018|      10|    17|      9|         2|
|Cancelled|     M| paid|   2018|      10|    17|      9|         2|
|Cancelled|     M| paid|   2018|      10|    20|     17|         5|
|Cancelled|     F| paid|   2018|      10|    20|

We notice that the column auth is set to "Cancelled" on these rows. Let's check whether "Cancelled" also appears in auth on any other page.

In [29]:
event_data_clean.select('auth').dropDuplicates().show()

+---------+
|     auth|
+---------+
|Cancelled|
|Logged In|
+---------+



*Note that before removing the empty values (drop_na function), we had two additional values for the auth feature: Guest and Logged Out.*
*It makes sense that those states are associated with an unknown userId, which is why no row with these auth values remains in the cleaned dataset.*

In [30]:
event_data_clean.filter((event_data_clean['auth']=='Cancelled')&(event_data_clean['page']!='Cancellation Confirmation')).show()


+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+-------+--------+------+-------+----------+
|artist|auth|firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId|song|status| ts|userAgent|userId|ts_year|ts_month|ts_day|ts_hour|ts_weekday|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+-------+--------+------+-------+----------+
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+-------+--------+------+-------+----------+



Great! No row has auth = "Cancelled" outside of the "Cancellation Confirmation" page, and the rows shown above for that page all have auth = "Cancelled".
So we can use either one to define churn.

Let's stick to the page value (Cancellation Confirmation), as it directly records the user's action of cancelling the service.

Now let's look at the downgrade events, i.e. users who switch from the paid to the free service.

In [31]:
event_data_clean.select(['userId', 'ts', 'page', 'level', 'song']).where((event_data_clean['userId']=='30')
                                                                         &(event_data_clean['ts']>1538994832000)
                                                                         &(event_data_clean['ts']<1538996254000)).collect()



[Row(userId='30', ts=1538995016000, page='NextSong', level='paid', song='Horseshoes and Hand Grenades'),
 Row(userId='30', ts=1538995447000, page='NextSong', level='paid', song='Jamaica Roots II(Agora E Sempre)'),
 Row(userId='30', ts=1538995453000, page='Downgrade', level='paid', song=None),
 Row(userId='30', ts=1538995454000, page='Submit Downgrade', level='paid', song=None),
 Row(userId='30', ts=1538995509000, page='Home', level='free', song=None),
 Row(userId='30', ts=1538995657000, page='NextSong', level='free', song='Crimewave (Crystal Castles vs Health)'),
 Row(userId='30', ts=1538995915000, page='NextSong', level='free', song='Hey (Album Version)')]

We see that right after a user visits the page "Submit Downgrade", their level goes from paid to free.
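The same switch can be located by comparing each event's level with the previous event's level for the same user; in Spark this is what `lag("level").over(Window.partitionBy("userId").orderBy("ts"))` would provide (`lag` is already imported in the first cell). A plain-Python sketch of that comparison on the seven rows of user 30 printed above; `level_switches` is a hypothetical helper.

```python
# The seven events of user 30 printed above: (ts, page, level), in time order
events = [
    (1538995016000, "NextSong", "paid"),
    (1538995447000, "NextSong", "paid"),
    (1538995453000, "Downgrade", "paid"),
    (1538995454000, "Submit Downgrade", "paid"),
    (1538995509000, "Home", "free"),
    (1538995657000, "NextSong", "free"),
    (1538995915000, "NextSong", "free"),
]


def level_switches(rows):
    """Return (ts, old_level, new_level) for each event whose level differs from the previous one."""
    switches = []
    for (_, _, prev_level), (ts, _, level) in zip(rows, rows[1:]):
        if level != prev_level:
            switches.append((ts, prev_level, level))
    return switches


print(level_switches(events))
```

The switch shows up on the first event after "Submit Downgrade" (the Home page), consistent with the observation above.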

Now let's add columns to our dataset to mark when a user churns from the service, churn_service (i.e. cancels the subscription), and when a user churns from the paid tier, churn_paid (i.e. downgrades from the paid to the free service). We can use these two labels later on to predict whether a user is at risk of leaving Sparkify, or of downgrading their subscription.

In [32]:
# we create a flag for each special event
flag_downgrade_event = udf(lambda x: 1 if x == 'Submit Downgrade' else 0, IntegerType())
flag_cancel_event = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0, IntegerType())

# we create a new column with the value of the flag
# create a new column for the downgraded event
event_data_clean = event_data_clean.withColumn("downgraded", flag_downgrade_event("page"))
# create a new column for the cancel event
event_data_clean = event_data_clean.withColumn("cancelled", flag_cancel_event("page"))


In [33]:
# we verify that the flag is properly set in the downgraded column
event_data_clean.select(['userId', 'ts', 'page', 'level', 'downgraded', 'cancelled']).where((event_data_clean['userId']=='30')
                                                                         &(event_data_clean['ts']>1538994832000)
                                                                         &(event_data_clean['ts']<1538996254000)).collect()



[Row(userId='30', ts=1538995016000, page='NextSong', level='paid', downgraded=0, cancelled=0),
 Row(userId='30', ts=1538995447000, page='NextSong', level='paid', downgraded=0, cancelled=0),
 Row(userId='30', ts=1538995453000, page='Downgrade', level='paid', downgraded=0, cancelled=0),
 Row(userId='30', ts=1538995454000, page='Submit Downgrade', level='paid', downgraded=1, cancelled=0),
 Row(userId='30', ts=1538995509000, page='Home', level='free', downgraded=0, cancelled=0),
 Row(userId='30', ts=1538995657000, page='NextSong', level='free', downgraded=0, cancelled=0),
 Row(userId='30', ts=1538995915000, page='NextSong', level='free', downgraded=0, cancelled=0)]

In [34]:
# we verify that the flag is properly set in the cancelled column
event_data_clean.select(['userId', 'ts', 'page', 'level', 'downgraded', 'cancelled']).where((event_data_clean['userId']=='100011')).collect()


[Row(userId='100011', ts=1538414422000, page='Home', level='free', downgraded=0, cancelled=0),
 Row(userId='100011', ts=1538414456000, page='NextSong', level='free', downgraded=0, cancelled=0),
 Row(userId='100011', ts=1538414463000, page='Home', level='free', downgraded=0, cancelled=0),
 Row(userId='100011', ts=1538414867000, page='NextSong', level='free', downgraded=0, cancelled=0),
 Row(userId='100011', ts=1538414867000, page='Roll Advert', level='free', downgraded=0, cancelled=0),
 Row(userId='100011', ts=1538415056000, page='NextSong', level='free', downgraded=0, cancelled=0),
 Row(userId='100011', ts=1538415104000, page='Home', level='free', downgraded=0, cancelled=0),
 Row(userId='100011', ts=1538415285000, page='NextSong', level='free', downgraded=0, cancelled=0),
 Row(userId='100011', ts=1538415557000, page='NextSong', level='free', downgraded=0, cancelled=0),
 Row(userId='100011', ts=1538415581000, page='Roll Advert', level='free', downgraded=0, cancelled=0),
 Row(userId='100

In [35]:
# we create a phase column to identify the moment of the switch in both cases
# partition by userId, order by timestamp descending, and sum the flag over all rows from the most recent event down to the current one
windowval = Window.partitionBy("userId").orderBy(desc("ts")).rangeBetween(Window.unboundedPreceding, 0)
event_data_clean = event_data_clean.withColumn("phase_downgrade", Fsum("downgraded").over(windowval))
event_data_clean = event_data_clean.withColumn("phase_cancel", Fsum("cancelled").over(windowval))
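Because the window is ordered by ts in descending order, `rangeBetween(Window.unboundedPreceding, 0)` covers every event from the user's most recent one down to the current row. The phase therefore counts the flagged events happening at or after each row: 1 up to and including the switch, 0 after it (a user who submitted two downgrades would get 2 before the first one). Since `rangeBetween` works on ts values, events sharing the same timestamp get the same phase. A plain-Python sketch of that backward cumulative sum, ignoring ties; `phase_column` is a hypothetical helper.

```python
def phase_column(flags):
    """For flags in chronological order, return at each position the number of
    flagged events at or after it (a cumulative sum taken from the latest event backwards)."""
    phases = []
    running = 0
    for flag in reversed(flags):
        running += flag
        phases.append(running)
    return list(reversed(phases))


# downgraded flags of user 30's seven events shown earlier, oldest first
# (Submit Downgrade is the 4th event)
downgraded = [0, 0, 0, 1, 0, 0, 0]
print(phase_column(downgraded))
```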


In [36]:
# we verify that the phase is properly set in the phase_downgrade column
event_data_clean.select(['userId', 'ts', 'page', 'level', 'downgraded', 'phase_downgrade']).where((event_data_clean['userId']=='30')
                                                                         &(event_data_clean['ts']>1538994832000)
                                                                         &(event_data_clean['ts']<1538996254000)).collect()



[Row(userId='30', ts=1538995915000, page='NextSong', level='free', downgraded=0, phase_downgrade=0),
 Row(userId='30', ts=1538995657000, page='NextSong', level='free', downgraded=0, phase_downgrade=0),
 Row(userId='30', ts=1538995509000, page='Home', level='free', downgraded=0, phase_downgrade=0),
 Row(userId='30', ts=1538995454000, page='Submit Downgrade', level='paid', downgraded=1, phase_downgrade=1),
 Row(userId='30', ts=1538995453000, page='Downgrade', level='paid', downgraded=0, phase_downgrade=1),
 Row(userId='30', ts=1538995447000, page='NextSong', level='paid', downgraded=0, phase_downgrade=1),
 Row(userId='30', ts=1538995016000, page='NextSong', level='paid', downgraded=0, phase_downgrade=1)]

In [37]:
# we verify that the phase is properly set in the phase_cancel column
event_data_clean.select(['userId', 'ts', 'page', 'level', 'cancelled', 'phase_cancel']).where((event_data_clean['userId']=='100011')).collect()


[Row(userId='100011', ts=1538417085000, page='Cancellation Confirmation', level='free', cancelled=1, phase_cancel=1),
 Row(userId='100011', ts=1538417064000, page='Cancel', level='free', cancelled=0, phase_cancel=1),
 Row(userId='100011', ts=1538417063000, page='Thumbs Down', level='free', cancelled=0, phase_cancel=1),
 Row(userId='100011', ts=1538417062000, page='NextSong', level='free', cancelled=0, phase_cancel=1),
 Row(userId='100011', ts=1538416899000, page='Home', level='free', cancelled=0, phase_cancel=1),
 Row(userId='100011', ts=1538416875000, page='Settings', level='free', cancelled=0, phase_cancel=1),
 Row(userId='100011', ts=1538416874000, page='NextSong', level='free', cancelled=0, phase_cancel=1),
 Row(userId='100011', ts=1538416667000, page='NextSong', level='free', cancelled=0, phase_cancel=1),
 Row(userId='100011', ts=1538416419000, page='Add to Playlist', level='free', cancelled=0, phase_cancel=1),
 Row(userId='100011', ts=1538416415000, page='NextSong', level='free',

In [38]:
# based on the two columns (event and phase) for each case, we add a label to each user

event_data_churn = event_data_clean.withColumn("churn_paid", Fmax('downgraded').over(Window.partitionBy("userId")))
event_data_churn = event_data_churn.withColumn("churn_service", Fmax('cancelled').over(Window.partitionBy("userId")))
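Taking `Fmax` of the flag over `Window.partitionBy("userId")`, a window with no ordering whose frame is the whole partition, copies one label onto every row of a user: 1 if the user has at least one flagged event, 0 otherwise. The plain-Python equivalent on a hypothetical toy event list (users "u1" and "u2" are made up); note that visiting "Downgrade" without "Submit Downgrade" does not set the label.

```python
from collections import defaultdict


def churn_labels(events, flag_page):
    """Map each userId to 1 if any of their events is flag_page, else 0
    (the per-user maximum of the flag)."""
    labels = defaultdict(int)
    for user_id, page in events:
        labels[user_id] = max(labels[user_id], 1 if page == flag_page else 0)
    return dict(labels)


# Hypothetical toy events, not taken from the dataset
toy_events = [("u1", "NextSong"), ("u1", "Submit Downgrade"),
              ("u2", "Downgrade"), ("u2", "Cancel")]
print(churn_labels(toy_events, "Submit Downgrade"))
```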


In [39]:
# example of a user who churned paid (i.e downgraded the subscription)
# churn_paid = 1
event_data_churn.select(['userId', 'ts', 'page', 'level', 'downgraded', 'phase_downgrade', 'churn_paid']).where((event_data_churn['userId']=='30')
                                                                         &(event_data_churn['ts']>1538994832000)
                                                                         &(event_data_churn['ts']<1538996254000)).show(10)




+------+-------------+----------------+-----+----------+---------------+----------+
|userId|           ts|            page|level|downgraded|phase_downgrade|churn_paid|
+------+-------------+----------------+-----+----------+---------------+----------+
|    30|1538995915000|        NextSong| free|         0|              0|         1|
|    30|1538995657000|        NextSong| free|         0|              0|         1|
|    30|1538995509000|            Home| free|         0|              0|         1|
|    30|1538995454000|Submit Downgrade| paid|         1|              1|         1|
|    30|1538995453000|       Downgrade| paid|         0|              1|         1|
|    30|1538995447000|        NextSong| paid|         0|              1|         1|
|    30|1538995016000|        NextSong| paid|         0|              1|         1|
+------+-------------+----------------+-----+----------+---------------+----------+



In [40]:
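# example of a user who did not churn paid: user 32 visited the Downgrade page but never submitted a downgrade
# churn_paid = 0
event_data_churn.select(['userId', 'ts', 'page', 'level', 'downgraded', 'phase_downgrade', 'churn_paid']).where((event_data_churn['userId']=='32')).show(10)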


+------+-------------+--------------------+-----+----------+---------------+----------+
|userId|           ts|                page|level|downgraded|phase_downgrade|churn_paid|
+------+-------------+--------------------+-----+----------+---------------+----------+
|    32|1539033046000|Cancellation Conf...| paid|         0|              0|         0|
|    32|1539033031000|              Cancel| paid|         0|              0|         0|
|    32|1539033030000|           Downgrade| paid|         0|              0|         0|
|    32|1539032978000|            NextSong| paid|         0|              0|         0|
|    32|1539032741000|            NextSong| paid|         0|              0|         0|
|    32|1539032608000|            NextSong| paid|         0|              0|         0|
|    32|1539032235000|            NextSong| paid|         0|              0|         0|
|    32|1539031887000|            NextSong| paid|         0|              0|         0|
|    32|1539031661000|          

In [41]:
# example of a user who churned service (i.e left the platform)
# churn_service = 1
event_data_churn.select(['userId', 'ts', 'page', 'level', 'cancelled', 'phase_cancel', 'churn_service']).where((event_data_churn['userId']=='100011')).show(10)


+------+-------------+--------------------+-----+---------+------------+-------------+
|userId|           ts|                page|level|cancelled|phase_cancel|churn_service|
+------+-------------+--------------------+-----+---------+------------+-------------+
|100011|1538417085000|Cancellation Conf...| free|        1|           1|            1|
|100011|1538417064000|              Cancel| free|        0|           1|            1|
|100011|1538417063000|         Thumbs Down| free|        0|           1|            1|
|100011|1538417062000|            NextSong| free|        0|           1|            1|
|100011|1538416899000|                Home| free|        0|           1|            1|
|100011|1538416875000|            Settings| free|        0|           1|            1|
|100011|1538416874000|            NextSong| free|        0|           1|            1|
|100011|1538416667000|            NextSong| free|        0|           1|            1|
|100011|1538416419000|     Add to Playlist|

In [42]:
# example of a user who did not churn service
# churn_service = 0
event_data_churn.select(['userId', 'ts', 'page', 'level', 'cancelled', 'phase_cancel', 'churn_service']).where((event_data_churn['userId']=='100010')).show(10)


+------+-------------+-----------+-----+---------+------------+-------------+
|userId|           ts|       page|level|cancelled|phase_cancel|churn_service|
+------+-------------+-----------+-----+---------+------------+-------------+
|100010|1542823952000|     Logout| free|        0|           0|            0|
|100010|1542823951000|   NextSong| free|        0|           0|            0|
|100010|1542823682000|   NextSong| free|        0|           0|            0|
|100010|1542823567000|   NextSong| free|        0|           0|            0|
|100010|1542823257000|Roll Advert| free|        0|           0|            0|
|100010|1542823248000|   NextSong| free|        0|           0|            0|
|100010|1542822946000|   NextSong| free|        0|           0|            0|
|100010|1542822641000|   NextSong| free|        0|           0|            0|
|100010|1542822434000|   NextSong| free|        0|           0|            0|
|100010|1542822170000|   NextSong| free|        0|           0| 

Let's refactor this step into a function, and apply it to our initial dataset.

In [43]:
def set_churn_label(df, churn_event):
    if churn_event == 'downgrade':
        flag_event_name = "Submit Downgrade"
        flag_col_name = "downgraded"
        phase_col_name = "phase_downgrade"
        label_col_name = "churn_paid"
    else:
        flag_event_name = "Cancellation Confirmation"
        flag_col_name = "cancelled"
        phase_col_name = "phase_cancel"
        label_col_name = "churn_service"        
    
    
    # we create a flag for the special event
    flag_event = udf(lambda x: 1 if x == flag_event_name else 0, IntegerType())
    
    # we create a new column with the value of the flag
    df_event = df.withColumn(flag_col_name, flag_event("page"))
    
    # we create a phase column to distinguish the change of status
    # partition by userId, order by timestamp descending, and sum the flag over all rows from the most recent event down to the current one
    windowval = Window.partitionBy("userId").orderBy(desc("ts")).rangeBetween(Window.unboundedPreceding, 0)
    df_phase = df_event.withColumn(phase_col_name, Fsum(flag_col_name).over(windowval))
    
    # we add a churn label to each user
    df_churn = df_phase.withColumn(label_col_name, Fmax(flag_col_name).over(Window.partitionBy("userId")))
    
    return df_churn
    

In [44]:
event_data_churn = set_churn_label(event_data_clean, 'downgrade')
event_data_churn = set_churn_label(event_data_churn, 'cancel')

In [45]:
event_data_churn.select(['userId', 'page', 'level', 'cancelled', 'phase_cancel', 'churn_service', 'downgraded', 'phase_downgrade', 'churn_paid']).orderBy(['userId', 'ts']).dropDuplicates().show(20)



+------+--------------+-----+---------+------------+-------------+----------+---------------+----------+
|userId|          page|level|cancelled|phase_cancel|churn_service|downgraded|phase_downgrade|churn_paid|
+------+--------------+-----+---------+------------+-------------+----------+---------------+----------+
|100012|        Logout| free|        0|           1|            1|         0|              0|         1|
|100021|          Home| free|        0|           1|            1|         0|              0|         0|
|   129| Save Settings| paid|        0|           1|            1|         0|              0|         0|
|   141|    Add Friend| free|        0|           0|            0|         0|              0|         1|
|   152|Submit Upgrade| free|        0|           0|            0|         0|              0|         0|
|    24| Save Settings| free|        0|           0|            0|         0|              0|         1|
|    25|    Add Friend| paid|        0|           0|   

To get an idea of the proportions: how many users cancelled the service, and how many downgraded, in the period we are looking at?


In [46]:
# Total number of users in this dataset
n_users = event_data_churn.select(['userId']).dropDuplicates().count()

In [47]:
churn_service = event_data_churn.select('userId').where(event_data_churn['churn_service']==1).dropDuplicates().count()
print('{} users unsubscribed from the service in the period considered.'.format(churn_service))
print('Percentage of users who cancelled: {:.2f}%.'.format(churn_service/n_users*100))


52 users unsubscribed from the service in the period considered.
Percentage of users who cancelled: 23.11%.


In [48]:
churn_paid = event_data_churn.select('userId').where(event_data_churn['churn_paid']==1).dropDuplicates().count()
print('{} users downgraded from paid to free service in the period considered.'.format(churn_paid))
print('Percentage of users who downgraded: {:.2f}%.'.format(churn_paid/n_users*100))


49 users downgraded from paid to free service in the period considered.
Percentage of users who downgraded: 21.78%.


In [49]:
print('The following users downgraded from paid to free, and then cancelled the service in the period considered:')
event_data_churn.select('userId').where((event_data_churn['churn_paid']==1)&(event_data_churn['churn_service']==1)).dropDuplicates().show()



The following users downgraded from paid to free, and then cancelled the service in the period considered:
+------+
|userId|
+------+
|    54|
|100025|
|100009|
|100015|
|   103|
|100012|
|200020|
|200011|
|    12|
+------+



In [50]:
#Let's look at one of these users, for example user 54
event_data_churn.select(['userId', 'level', 'page']).where((event_data_churn['userId']=='54')).dropDuplicates().sort(desc('ts')).collect()

# This user downgraded from paid to free, then upgraded again, and finally cancelled the service.


[Row(userId='54', level='paid', page='Cancellation Confirmation'),
 Row(userId='54', level='paid', page='Cancel'),
 Row(userId='54', level='free', page='Submit Upgrade'),
 Row(userId='54', level='free', page='Upgrade'),
 Row(userId='54', level='free', page='Save Settings'),
 Row(userId='54', level='free', page='Help'),
 Row(userId='54', level='free', page='Settings'),
 Row(userId='54', level='free', page='Add Friend'),
 Row(userId='54', level='free', page='Add to Playlist'),
 Row(userId='54', level='free', page='Thumbs Down'),
 Row(userId='54', level='free', page='Logout'),
 Row(userId='54', level='free', page='Thumbs Up'),
 Row(userId='54', level='free', page='About'),
 Row(userId='54', level='free', page='Roll Advert'),
 Row(userId='54', level='free', page='NextSong'),
 Row(userId='54', level='free', page='Home'),
 Row(userId='54', level='paid', page='Submit Downgrade'),
 Row(userId='54', level='paid', page='Save Settings'),
 Row(userId='54', level='paid', page='Error'),
 Row(userId=

Note that this last set of users is counted twice: as churn_service users and as churn_paid users.

**Explore Data**

Let's quickly recap where we are:
1. we have a cleaned dataset, without any unknown userId
2. the ts column was split into multiple columns to segment time into year, month, day, weekday and hour
3. we defined two types of churn: either leaving the service (churn_service) or downgrading from paid to free service (churn_paid). These two can be analysed separately, knowing that for 9 users we observed both a downgrade and an unsubscription in the period we are looking at.

In the rest of this notebook, we analyse the two situations side by side, but independently. Users who first downgraded and then left the service are therefore counted twice: we want to catch them before they downgrade (downgrade prediction), but if we fail and they downgrade anyway, we want to catch them before they cancel the service (cancellation prediction). The two events are treated as independent.
Also, when comparing churned users with active users in the case of a **downgrade**, we compare them with **active users with a paid level**!
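
The grouping rule stated above can be sketched as a small plain-Python function. `analysis_groups` is a hypothetical helper, not code from this notebook: a user who both downgraded and cancelled lands in both churn groups, and the reference group for downgrades is restricted to active users who had a paid level.

```python
def analysis_groups(churn_service, churn_paid, levels):
    """Return the comparison groups a user belongs to (hypothetical helper).

    churn_service, churn_paid: the user's 0/1 churn labels
    levels: set of subscription levels observed for the user, e.g. {"free", "paid"}
    """
    groups = []
    # churned users: a user who downgraded and then cancelled appears in both groups
    if churn_service == 1:
        groups.append("cancel churned")
    if churn_paid == 1:
        groups.append("downgrade churned")
    # active users are the reference for cancellations; the paid ones are the reference for downgrades
    if churn_service == 0 and churn_paid == 0:
        groups.append("active")
        if "paid" in levels:
            groups.append("active paid")
    return groups


print(analysis_groups(1, 1, {"free", "paid"}))  # ['cancel churned', 'downgrade churned']
```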

*Note: we have too little data to determine whether downgrading has an impact on cancelling the service, i.e., whether knowing that a user downgraded can be used as a feature to predict his/her cancellation of the service. But this is definitely a direction to explore!*

It is now time to compare the behavior of the users who churned with that of the users who did not!
Let's explore a few directions:
- impact of the gender
- impact of the level (paid/free) of the user
- impact of the location of the user
- impact of the average number of songs listened to, artists, repeats, partial listening
- impact of the average listening time per day
- impact of the time of the activity (count of actions): 
    - time of the day,
    - day of the week, 
    - day of the month,
    - month of the year
- impact of the moment the user joined (month, day of the week, day of the month)

*Note that we use plotly to render the graphs, which requires converting the Spark dataframe to a pandas dataframe.*

*Impact of the gender*

In [220]:
gender_no_churn = event_data_churn.select(['userId', 'gender', 'level']).where((event_data_churn['churn_paid']==0)&(event_data_churn['churn_service']==0)).dropDuplicates().toPandas()
gender_churn_paid = event_data_churn.select(['userId', 'gender']).where(event_data_churn['churn_paid']==1).dropDuplicates().toPandas()
gender_churn_service = event_data_churn.select(['userId', 'gender']).where(event_data_churn['churn_service']==1).dropDuplicates().toPandas()


In [223]:
fig = make_subplots(rows=1, cols=2, shared_yaxes=True,
                   subplot_titles=("Service cancellation analysis", "Service downgrade analysis"))

fig.append_trace(go.Bar(
    x=['Male', 'Female'],
    y=[len(gender_churn_service[gender_churn_service['gender']=='M']), len(gender_churn_service[gender_churn_service['gender']=='F'])],
    name='Cancel churned users',
    marker=dict(
        color='rgb(68, 1, 84)'
    )
),1,1)
fig.append_trace(go.Bar(
    x=['Male', 'Female'],
    y=[gender_no_churn[gender_no_churn['gender']=='M']['userId'].nunique(),
       gender_no_churn[gender_no_churn['gender']=='F']['userId'].nunique()],
    name='Active users',
    marker=dict(
        color='rgb(220, 227, 25)'
    )
),1,1)
fig.append_trace(go.Bar(
    x=['Male', 'Female'],
    y=[len(gender_churn_paid[gender_churn_paid['gender']=='M']), len(gender_churn_paid[gender_churn_paid['gender']=='F'])],
    name='Downgrade churned users',
    marker=dict(
        color='rgb(45, 112, 142)'
    ),
),1,2)
fig.append_trace(go.Bar(
    x=['Male', 'Female'],
    y=[len(gender_no_churn[(gender_no_churn['gender']=='M')&(gender_no_churn['level']=='paid')]),
       len(gender_no_churn[(gender_no_churn['gender']=='F')&(gender_no_churn['level']=='paid')])],
    name='Active paid users',
    marker=dict(
        color='rgb(149, 216, 64)'
    ),
),1,2)
fig.layout.update(go.Layout(
    title_text="Distribution of active vs churned users per gender",
    yaxis = {'title':"Count of users"}, 
    barmode='group'
))
fig.update_xaxes(title_text="Gender", row=1, col=1)
fig.update_xaxes(title_text="Gender", row=1, col=2)

fig.show()


At first glance, men seem to cancel the service proportionally more often than women, while gender does not seem to be a relevant factor for predicting a downgrade. Overall, this feature does not seem to bring much to churn prediction; we will decide later on whether to keep it to train our model.
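
The bar chart shows counts, so comparing genders "in proportion" means normalising by the number of users of each gender. A minimal plain-Python sketch, with a hypothetical helper `churn_rate_by_group` and made-up input pairs (one `(gender, churned)` pair per distinct user):

```python
from collections import defaultdict


def churn_rate_by_group(users):
    """Percentage of churned users per group value (e.g. per gender).

    users: iterable of (group_value, churned) pairs, one pair per distinct user
    """
    totals = defaultdict(int)
    churned = defaultdict(int)
    for value, has_churned in users:
        totals[value] += 1
        churned[value] += int(has_churned)
    return {value: 100 * churned[value] / totals[value] for value in totals}


# made-up example: 3 men (2 churned) and 4 women (1 churned)
example = [("M", 1), ("M", 0), ("M", 1), ("F", 0), ("F", 1), ("F", 0), ("F", 0)]
print(churn_rate_by_group(example))  # {'M': 66.66666666666667, 'F': 25.0}
```

The same helper applies to any user-level attribute, such as the subscription level or the state.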

*Impact of the subscription level (paid/free)*

In this case, we are only interested in the link between a user leaving the service and his/her subscription level. Indeed, predicting whether a user is about to downgrade only makes sense if he/she has a paid subscription, so the downgrade analysis already assumes a paid subscription level.


In [53]:
level_no_churn = event_data_churn.select(['userId', 'level']).where((event_data_churn['churn_paid']==0)&(event_data_churn['churn_service']==0)).dropDuplicates().toPandas()
level_churn_service = event_data_churn.select(['userId', 'level']).where(event_data_churn['churn_service']==1).dropDuplicates().toPandas()


In [218]:
fig = go.Figure(data=[
    go.Bar(
    x=['Paid', 'Free'],
    y=[100*len(level_churn_service[level_churn_service['level']=='paid'])/level_churn_service.shape[0],
       100*len(level_churn_service[level_churn_service['level']=='free'])/level_churn_service.shape[0]],
    name='Churned users',
    marker=dict(
        color='rgb(68, 1, 84)'
    )
),
    go.Bar(
    x=['Paid', 'Free'],
    y=[100*len(level_no_churn[level_no_churn['level']=='paid'])/level_no_churn.shape[0],
       100*len(level_no_churn[level_no_churn['level']=='free'])/level_no_churn.shape[0]],
    name='Active users',
    marker=dict(
        color='rgb(220, 227, 25)'
    )
)
])

fig.layout.update(go.Layout(
    title_text="Ratio of active vs churned users per subscription level",
    xaxis = {'title':"Level"},
    yaxis = {'title':"Percentage of users"}, 
    barmode='group'
))

fig.show()


At first glance, users with a paid subscription appear slightly less likely to cancel the service.


*Impact of the location of the user*
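
The state is extracted by splitting `location` on `', '` and keeping the second item. A plain-Python equivalent, as a sketch: `extract_state` is a hypothetical helper, the sample locations are illustrative values in the "City, ST" format, and returning `None` for a missing second item mirrors the non-ANSI behaviour of `getItem`. Metropolitan areas spanning several states keep a combined code such as `MA-NH`.

```python
def extract_state(location):
    """Plain-Python equivalent of split(location, ', ').getItem(1)."""
    if location is None:
        return None
    parts = location.split(", ")
    # like getItem(1) outside ANSI mode, return None when there is no second item
    return parts[1] if len(parts) > 1 else None


# illustrative values in the "City, ST" format
print(extract_state("Bakersfield, CA"))                 # CA
print(extract_state("Boston-Cambridge-Newton, MA-NH"))  # MA-NH
```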

In [55]:
split_col = split(event_data_churn['location'], ', ')
event_data_churn_with_state = event_data_churn.withColumn("state", split_col.getItem(1))

In [224]:
location_no_churn = event_data_churn_with_state.select(['userId', 'state', 'level']).where((event_data_churn_with_state['churn_paid']==0)&(event_data_churn_with_state['churn_service']==0)).dropDuplicates().toPandas()
location_churn_paid = event_data_churn_with_state.select(['userId', 'state']).where(event_data_churn_with_state['churn_paid']==1).dropDuplicates().toPandas()
location_churn_service = event_data_churn_with_state.select(['userId', 'state']).where(event_data_churn_with_state['churn_service']==1).dropDuplicates().toPandas()
locations = event_data_churn_with_state.select(['state']).dropDuplicates().toPandas()['state']


In [226]:
fig = make_subplots(rows=2, cols=1,
                   subplot_titles=("Service cancellation analysis", "Service downgrade analysis"))

# count distinct users per state, aligned with the list of states
# (states without any user in a group get a count of 0)
fig.append_trace(go.Bar(
    x=locations,
    y=location_churn_service.groupby('state')['userId'].nunique().reindex(locations, fill_value=0),
    name='Cancel churned users',
    marker=dict(
        color='rgb(68, 1, 84)'
    )
),1,1)
fig.append_trace(go.Bar(
    x=locations,
    y=location_no_churn.groupby('state')['userId'].nunique().reindex(locations, fill_value=0),
    name='Active users',
    marker=dict(
        color='rgb(220, 227, 25)'
    )
),1,1)
fig.append_trace(go.Bar(
    x=locations,
    y=location_churn_paid.groupby('state')['userId'].nunique().reindex(locations, fill_value=0),
    name='Downgrade churned users',
    marker=dict(
        color='rgb(45, 112, 142)'
    ),
),2, 1)
fig.append_trace(go.Bar(
    x=locations,
    y=location_no_churn[location_no_churn['level']=='paid'].groupby('state')['userId'].nunique().reindex(locations, fill_value=0),
    name='Active paid users',
    marker=dict(
        color='rgb(149, 216, 64)'
    ),
),2, 1)
fig.layout.update(go.Layout(
    title_text="Distribution of active vs churned users per state",
    barmode='group'
))
fig.update_xaxes(title_text="Location", tickangle=-45, row=1, col=1)
fig.update_xaxes(title_text="Location", tickangle=-45, row=2, col=1)
fig.update_yaxes(title_text="Count of users", row=1, col=1)
fig.update_yaxes(title_text="Count of users", row=2, col=1)

fig.show()

In both graphs, some states have only active users and no churned users. With so few users per state in this mini dataset, however, this pattern should be confirmed on the full dataset.
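
In per-state bar charts like these, each trace's y values must line up with the x labels in `locations`. A pandas `groupby('state')` returns only the states present in a given group, sorted by state, so its counts have to be aligned with the full list of states before plotting. A minimal plain-Python sketch of that alignment, with a hypothetical helper `counts_aligned`:

```python
from collections import Counter


def counts_aligned(user_states, labels):
    """Count users per state and return one count per label, in label order.

    user_states: one state per distinct user in the group
    labels: the x-axis labels (all states), in display order
    States without any user in the group get 0, like pandas' reindex(labels, fill_value=0).
    """
    counts = Counter(user_states)
    return [counts.get(label, 0) for label in labels]


labels = ["NY", "TX", "CA"]
print(counts_aligned(["CA", "NY", "CA"], labels))  # [1, 0, 2]
```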

*Impact of the average number of songs listened to, artists, repeats, partial listening*

In this section, we will not focus on *who* the user is, but on *what* he/she does on the platform: the songs and artists he/she listens to, how many times he/she replays the same song or artist, how many songs are only partially listened to, and how many thumbs up or thumbs down he/she gives.

To get a better idea of usage, let's explore the weekly and monthly averages separately.

Note that a song is partially listened to when the time between a "NextSong" action and the user's following action is shorter than the length of the song (the `length` feature of the "NextSong" action).
For example, with length = 185.25995 and a difference of 120.12556 seconds between this NextSong action and the following user action, the song was only partially listened to.
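
The computation behind this definition can be sketched in plain Python. `time_song_left` is a hypothetical helper; `ts` values are in milliseconds and `length` is in seconds. Because the timestamps in this sample have whole-second precision, even songs played to the end show a small positive remainder (below one second in the output of In [59]), so a strict "difference < length" test would flag almost every song; this is one reason a tolerance such as the 30-second rule used next is useful.

```python
def time_song_left(ts_ms, next_ts_ms, length_s):
    """Seconds of the song still unplayed when the user's next action was logged.

    ts_ms, next_ts_ms: timestamps of the NextSong action and of the following action (ms)
    length_s: length of the song (s); a positive result means the next action came before the song ended
    """
    diff_s = (next_ts_ms - ts_ms) / 1000
    return length_s - diff_s


# first row of the output of In [59]: diff_ts = 269.0 s for a 269.76608 s song
print(round(time_song_left(1542823682000, 1542823951000, 269.76608), 5))  # 0.76608
```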


*Partial listenings flag*

Let's add a column to our dataset to flag when a "NextSong" action is a partial listening.
We are going to flag whenever a user skipped a song, i.e., went from one NextSong page to another NextSong page within the first 30 seconds of the current song.

Note that we could also consider that a user skipped a song if he/she went to a page other than NextSong (for example, Home or Submit Upgrade). Unfortunately, we have no evidence that the current song stops playing when the user goes to any page other than another NextSong.

In [58]:
# create a partition grouping by userId and sorting by descending timestamp
user_category = Window.partitionBy("userId").orderBy(desc("ts"))

# add two new columns containing the value of the action and the timestamp of the next event logged for a user
partial_listening = event_data_churn.withColumn("next_ts", lag('ts', 1).over(user_category))
partial_listening = partial_listening.withColumn("next_action", lag('page', 1).over(user_category))

# calculate the time in seconds between two consecutive actions (ts is in milliseconds;
# subtracting the long columns directly avoids overflowing a 32-bit integer cast)
partial_listening = partial_listening.withColumn("diff_ts", (partial_listening.next_ts - partial_listening.ts)/1000)

# keep only songs followed by another NextSong action, and drop the rows with a null value in the length column
partial_listening = partial_listening.filter((partial_listening["next_action"]=='NextSong')).filter(partial_listening["length"].isNotNull())

# compute the time left in the song when the next action occurred (difference between the length
# of the song and the time between the two consecutive actions)
partial_listening = partial_listening.withColumn("time_song_left", partial_listening.length - partial_listening.diff_ts)




In [59]:
partial_listening.select(['userId', 'page', 'ts', 'next_action', 'next_ts', 'length', 'diff_ts', 'time_song_left']).show(20)


+------+--------+-------------+-----------+-------------+---------+-------+--------------------+
|userId|    page|           ts|next_action|      next_ts|   length|diff_ts|      time_song_left|
+------+--------+-------------+-----------+-------------+---------+-------+--------------------+
|100010|NextSong|1542823682000|   NextSong|1542823951000|269.76608|  269.0|  0.7660799999999881|
|100010|NextSong|1542823567000|   NextSong|1542823682000|115.90485|  115.0|  0.9048499999999962|
|100010|NextSong|1542822946000|   NextSong|1542823248000|302.57587|  302.0|   0.575870000000009|
|100010|NextSong|1542822641000|   NextSong|1542822946000|305.05751|  305.0|  0.0575099999999793|
|100010|NextSong|1542822434000|   NextSong|1542822641000|207.64689|  207.0|  0.6468900000000133|
|100010|NextSong|1542822170000|   NextSong|1542822434000|264.93342|  264.0|  0.9334200000000124|
|100010|NextSong|1542821662000|   NextSong|1542821943000| 281.5473|  281.0|   0.547300000000007|
|100010|NextSong|1542821317000

In [60]:
# create a flag column that tracks whether a song was skipped (not listened to completely)
def is_partial(song_length, listen_time):
    # arbitrarily, a song longer than 30 seconds is considered skipped if the following
    # action logged occurred within 30 seconds after the beginning of the song
    if song_length > 30:
        return 0 if listen_time > 30 else 1
    # a song of 30 seconds or less is considered skipped if less than half of it was played
    return 0 if listen_time > 0.5*song_length else 1

flag_partial_event = udf(is_partial, IntegerType())
partial_listening = partial_listening.withColumn("partial_listening", flag_partial_event(partial_listening.length, partial_listening.diff_ts))


In [61]:
# let's see how many users actually listen partially to songs
partial_listening.select(['userId', 'partial_listening']).where(partial_listening['partial_listening']==1).show()

+------+-----------------+
|userId|partial_listening|
+------+-----------------+
|300002|                1|
|   120|                1|
|    92|                1|
|300021|                1|
|    39|                1|
|    91|                1|
|    45|                1|
+------+-----------------+



It looks like only 7 songs were partially listened to, each by a different user! Unfortunately, with this tiny dataset, that is too few to be worth plotting.
However, let's keep in mind the logic used to build this feature, as we can use it later on to train our model!
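
One way this flag could later become a per-user feature is the share of each user's songs that were partially listened to. In Spark this would be `partial_listening.groupBy('userId').agg(avg('partial_listening'))` (with `avg` already imported); the plain-Python sketch below, with a hypothetical helper `partial_share_per_user` and made-up rows, shows the same computation.

```python
from collections import defaultdict


def partial_share_per_user(rows):
    """Fraction of each user's songs flagged as partial listenings.

    rows: iterable of (userId, partial_listening) pairs, one per NextSong action
    """
    songs = defaultdict(int)
    partial = defaultdict(int)
    for user_id, flag in rows:
        songs[user_id] += 1
        partial[user_id] += flag
    return {user_id: partial[user_id] / songs[user_id] for user_id in songs}


print(partial_share_per_user([("39", 1), ("39", 0), ("39", 0), ("39", 0), ("10", 0)]))  # {'39': 0.25, '10': 0.0}
```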

*Repeats*

*RollAdvert*

*Login*

*Number of distinct artists listened to*


*Number of songs, thumbs up and thumbs down*

In this section, we are going to look at the **daily** average behavior of each user.
These daily averages could be aggregated per week or per month; let's arbitrarily choose a weekly aggregate.

If our churn prediction model were deployed in production for a company trying to anticipate churn, we would expect the company to want a frequently updated estimate, preferably every week rather than every month.
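
The weekly aggregation can be sketched in plain Python: count each user's NextSong actions per week and divide by 7. `daily_avg_songs_per_week` is a hypothetical helper. It keys weeks by ISO year and ISO week in UTC (Spark's `weekofyear` also follows ISO 8601 weeks, but uses the session time zone, and the notebook pairs it with the calendar year `ts_year`, which can differ around New Year). The timestamps reuse values from the output of In [251]; the pages are made up.

```python
from collections import defaultdict
from datetime import datetime, timezone


def daily_avg_songs_per_week(events):
    """Daily average number of songs for each (userId, ISO year, ISO week).

    events: iterable of (userId, ts_ms, page) tuples
    Weeks are computed in UTC here, whereas Spark uses the session time zone.
    """
    weekly_songs = defaultdict(int)
    for user_id, ts_ms, page in events:
        iso_year, iso_week, _ = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).isocalendar()
        # every event creates the (user, week) entry; only NextSong actions add to the count
        weekly_songs[(user_id, iso_year, iso_week)] += 1 if page == "NextSong" else 0
    # weekly total divided by 7 days
    return {key: total / 7 for key, total in weekly_songs.items()}


events = [("30", 1538352117000, "NextSong"), ("30", 1538352394000, "NextSong"),
          ("30", 1538352676000, "Home"), ("9", 1538352180000, "Thumbs Up")]
print(daily_avg_songs_per_week(events))  # {('30', 2018, 40): 0.2857142857142857, ('9', 2018, 40): 0.0}
```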


In [248]:
# we define user-defined functions (UDFs) to flag whether an action is a song being listened to, a thumbs up or a thumbs down
is_song = udf(lambda action: 1 if action == 'NextSong' else 0, IntegerType())
is_thumb_up = udf(lambda action: 1 if action == 'Thumbs Up' else 0, IntegerType())
is_thumb_down = udf(lambda action: 1 if action == 'Thumbs Down' else 0, IntegerType())

# add columns to tag songs, thumb_up and thumb_down
agg_df = event_data_churn.withColumn('is_song', is_song(event_data_churn['page'])) \
                        .withColumn('is_thumb_up', is_thumb_up(event_data_churn['page'])) \
                        .withColumn('is_thumb_down', is_thumb_down(event_data_churn['page']))

In [249]:
agg_df['userId', 'page', 'is_song', 'is_thumb_up', 'is_thumb_down'].dropDuplicates().show(50)


+------+---------------+-------+-----------+-------------+
|userId|           page|is_song|is_thumb_up|is_thumb_down|
+------+---------------+-------+-----------+-------------+
|    28|     Add Friend|      0|          0|            0|
|   129|       Settings|      0|          0|            0|
|    10|       Settings|      0|          0|            0|
|    37|        Upgrade|      0|          0|            0|
|    57|      Thumbs Up|      0|          1|            0|
|   149|Add to Playlist|      0|          0|            0|
|   155|     Add Friend|      0|          0|            0|
|    30| Submit Upgrade|      0|          0|            0|
|100025|    Thumbs Down|      0|          0|            1|
|200001|    Thumbs Down|      0|          0|            1|
|200021|      Thumbs Up|      0|          1|            0|
|300022|      Downgrade|      0|          0|            0|
|    27|           Home|      0|          0|            0|
|   100|    Thumbs Down|      0|          0|            

In [250]:
# add a column with the week number in the year (the timestamp in milliseconds is first converted to a date string)
agg_df = agg_df.withColumn('ts_to_date', from_unixtime(agg_df['ts']/1000, format='yyyy-MM-dd HH:mm:ss'))
agg_df = agg_df.withColumn('week_number', weekofyear(agg_df['ts_to_date']))


In [251]:
agg_df['userId', 'ts', 'ts_year', 'ts_month', 'ts_day', 'ts_to_date', 'week_number'].show(5)

+------+-------------+-------+--------+------+-------------------+-----------+
|userId|           ts|ts_year|ts_month|ts_day|         ts_to_date|week_number|
+------+-------------+-------+--------+------+-------------------+-----------+
|    30|1538352117000|   2018|      10|     1|2018-10-01 02:01:57|         40|
|     9|1538352180000|   2018|      10|     1|2018-10-01 02:03:00|         40|
|    30|1538352394000|   2018|      10|     1|2018-10-01 02:06:34|         40|
|     9|1538352416000|   2018|      10|     1|2018-10-01 02:06:56|         40|
|    30|1538352676000|   2018|      10|     1|2018-10-01 02:11:16|         40|
+------+-------------+-------+--------+------+-------------------+-----------+
only showing top 5 rows



In [254]:
# create a cumulative window over each user's events within a calendar week, ordered by timestamp
rolling_week = Window.partitionBy(["userId", "ts_year", "week_number"]).orderBy("ts").rangeBetween(Window.unboundedPreceding, 0)

agg_df = agg_df.withColumn('tot_song_week', Fsum('is_song').over(rolling_week))\
                    .withColumn('tot_thumb_up_week', Fsum('is_thumb_up').over(rolling_week))\
                    .withColumn('tot_thumb_down_week', Fsum('is_thumb_down').over(rolling_week))

# the maximum of the cumulative sums gives the weekly total (up to the last event logged
# at each level, if the user changed level during the week)
agg_df_filt = agg_df.groupby(['userId', "ts_year", "week_number", 'level', 'churn_service', 'churn_paid']) \
                    .agg(Fmax('tot_song_week').alias('max_song_week'), \
                         Fmax('tot_thumb_up_week').alias('max_thumb_up_week'), \
                         Fmax('tot_thumb_down_week').alias('max_thumb_down_week'))

# divide the weekly totals by 7 to get the daily average for each week
agg_df_filt = agg_df_filt.withColumn('avg_song_week', col('max_song_week')/7) \
                        .withColumn('avg_thumb_up_week', col('max_thumb_up_week')/7) \
                        .withColumn('avg_thumb_down_week', col('max_thumb_down_week')/7)



In [255]:
agg_df_filt['userId', "ts_year", "week_number", 'avg_song_week', 'avg_thumb_up_week', 'avg_thumb_down_week'].show(20)


+------+-------+-----------+-------------------+-------------------+-------------------+
|userId|ts_year|week_number|      avg_song_week|  avg_thumb_up_week|avg_thumb_down_week|
+------+-------+-----------+-------------------+-------------------+-------------------+
|100002|   2018|         41|0.42857142857142855|                0.0|                0.0|
|   110|   2018|         45| 18.285714285714285| 1.4285714285714286| 0.2857142857142857|
|200024|   2018|         41| 18.285714285714285| 0.7142857142857143| 0.7142857142857143|
|    26|   2018|         42|  7.428571428571429|0.42857142857142855|                0.0|
|300007|   2018|         40| 15.857142857142858| 1.5714285714285714|0.14285714285714285|
|    83|   2018|         44| 16.571428571428573| 0.7142857142857143|                0.0|
|    86|   2018|         41| 2.2857142857142856|                0.0|                0.0|
|200007|   2018|         45|                3.0|0.14285714285714285|                0.0|
|    30|   2018|     

In [256]:
agg_df_filt.count()

1462

In [257]:
avg_week_no_churn = agg_df_filt.filter((agg_df_filt['churn_paid']==0)&(agg_df_filt['churn_service']==0)).dropDuplicates().toPandas()
avg_week_churn_service = agg_df_filt.filter(agg_df_filt['churn_service']==1).dropDuplicates().toPandas()
avg_week_churn_paid = agg_df_filt.filter(agg_df_filt['churn_paid']==1).dropDuplicates().toPandas()


In [259]:
fig = make_subplots(rows=2, cols=1, shared_xaxes=True)

# use all the user-week values (not only the unique ones) so that the box plots reflect the actual distributions
fig.append_trace(go.Box(
    x=avg_week_churn_service['avg_song_week'],
    name='Cancel churned users',
    marker=dict(
        color='rgb(68, 1, 84)'
    )
),1,1)
fig.append_trace(go.Box(
    x=avg_week_no_churn['avg_song_week'],
    name='Active users',
    marker=dict(
        color='rgb(220, 227, 25)'
    )
),1,1)
fig.append_trace(go.Box(
    x=avg_week_churn_paid['avg_song_week'],
    name='Downgrade churned users',
    marker=dict(
        color='rgb(45, 112, 142)'
    ),
),2, 1)
fig.append_trace(go.Box(
    x=avg_week_no_churn[avg_week_no_churn['level']=='paid']['avg_song_week'],
    name='Active paid users',
    marker=dict(
        color='rgb(149, 216, 64)'
    ),
),2, 1)

fig.layout.update(go.Layout(
    title_text="Distribution of active vs churned users per daily average number of songs (weekly aggregate)",
    barmode='group',
    showlegend=True
))
fig.update_xaxes(title_text="Daily average number of songs (weekly aggregate)", tickangle=-45, row=2, col=1)
fig.update_yaxes(title_text="Service cancellation analysis", row=1, col=1)
fig.update_yaxes(title_text="Service downgrade analysis", row=2, col=1)

fig.show()

In [260]:
fig = make_subplots(rows=2, cols=1, shared_xaxes=True)

# use all the user-week values (not only the unique ones) so that the box plots reflect the actual distributions
fig.append_trace(go.Box(
    x=avg_week_churn_service['avg_thumb_up_week'],
    name='Cancel churned users',
    marker=dict(
        color='rgb(68, 1, 84)'
    )
),1,1)
fig.append_trace(go.Box(
    x=avg_week_no_churn['avg_thumb_up_week'],
    name='Active users',
    marker=dict(
        color='rgb(220, 227, 25)'
    )
),1,1)
fig.append_trace(go.Box(
    x=avg_week_churn_paid['avg_thumb_up_week'],
    name='Downgrade churned users',
    marker=dict(
        color='rgb(45, 112, 142)'
    ),
),2, 1)
fig.append_trace(go.Box(
    x=avg_week_no_churn[avg_week_no_churn['level']=='paid']['avg_thumb_up_week'],
    name='Active paid users',
    marker=dict(
        color='rgb(149, 216, 64)'
    ),
),2, 1)

fig.layout.update(go.Layout(
    title_text="Distribution of active vs churned users per daily average number of thumbs up (weekly aggregate)",
    barmode='group',
    showlegend=True
))
fig.update_xaxes(title_text="Daily average number of thumbs up (weekly aggregate)", tickangle=-45, row=2, col=1)
fig.update_yaxes(title_text="Service cancellation analysis", row=1, col=1)
fig.update_yaxes(title_text="Service downgrade analysis", row=2, col=1)

fig.show()



In [261]:
fig = make_subplots(rows=2, cols=1, shared_xaxes=True)

# use all the user-week values (not only the unique ones) so that the box plots reflect the actual distributions
fig.append_trace(go.Box(
    x=avg_week_churn_service['avg_thumb_down_week'],
    name='Cancel churned users',
    marker=dict(
        color='rgb(68, 1, 84)'
    )
),1,1)
fig.append_trace(go.Box(
    x=avg_week_no_churn['avg_thumb_down_week'],
    name='Active users',
    marker=dict(
        color='rgb(220, 227, 25)'
    )
),1,1)
fig.append_trace(go.Box(
    x=avg_week_churn_paid['avg_thumb_down_week'],
    name='Downgrade churned users',
    marker=dict(
        color='rgb(45, 112, 142)'
    ),
),2, 1)
fig.append_trace(go.Box(
    x=avg_week_no_churn[avg_week_no_churn['level']=='paid']['avg_thumb_down_week'],
    name='Active paid users',
    marker=dict(
        color='rgb(149, 216, 64)'
    ),
),2, 1)

fig.layout.update(go.Layout(
    title_text="Distribution of active vs churned users per daily average number of thumbs down (weekly aggregate)",
    barmode='group',
    showlegend=True
))
fig.update_xaxes(title_text="Daily average number of thumbs down (weekly aggregate)", tickangle=-45, row=2, col=1)
fig.update_yaxes(title_text="Service cancellation analysis", row=1, col=1)
fig.update_yaxes(title_text="Service downgrade analysis", row=2, col=1)

fig.show()

Based on the daily average number of songs, thumbs up and thumbs down (aggregated per week), it appears that:
- cancel churned users have a lower average than active users
- downgrade churned users have a slightly higher average than active (paid) users

*Impact of the daily average listening time*

*Impact of the time of the activity (count of actions based on the day, month, hour...)*

*Impact of the registration time (month, day of the week, day of the month)*

*Number of active users per month*

*User upgrade/downgrade within 3 months depending on when they joined (monthly cohorts)*

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can take the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

- Build a summary table (define the logic: weekly aggregate of daily averages)
- Train 3 independent models with different weightings of the current week vs the summary (0.8/0.2, 0.5/0.5, 0.6/0.4)
- Deviations:
    - number of songs on weekdays, number of songs on weekends
    - number of logins
    - time between logins
- Other features: gender, location, label
- Split into two dataframes: cancel churn and downgrade churn
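
Assuming "weighted average of current week vs summary" means a per-feature convex combination, `w * current + (1 - w) * summary`, the three weightings above could be produced as in this sketch. `blend_features` and the feature name are hypothetical; this is one possible reading of the plan, not a settled design.

```python
def blend_features(current_week, summary, weight_current):
    """Weighted average of current-week features and summary features (hypothetical helper).

    current_week, summary: dicts mapping feature names to values (same keys)
    weight_current: weight of the current week; the summary gets 1 - weight_current
    """
    if not 0.0 <= weight_current <= 1.0:
        raise ValueError("weight_current must be between 0 and 1")
    return {name: weight_current * current_week[name] + (1 - weight_current) * summary[name]
            for name in current_week}


current = {"avg_song_week": 10.0}
summary = {"avg_song_week": 20.0}
for weight in (0.8, 0.5, 0.6):
    print(weight, blend_features(current, summary, weight))
```

A higher weight on the current week makes the features react faster to a recent change in behavior, at the cost of more noise.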

**Intro comment**

To engineer new features, we are going to reuse a lot of the code we wrote previously to plot the data and gain insight into the information it could bring.

The goal here is to define a function that processes the initial dataset so as to produce two twin dataframes:
- one dataframe that focuses on cancel churned users
- and another that focuses on downgrade churned users

Both dataframes will have the following structure:
- one row per userId (without any userId feature though)
- the following columns or features:
    - generic features: gender, level (paid/free), location (state)
    - week aggregation features:
        - daily average number of songs, thumbs up, thumbs down, distinct artists
        - daily average listening time
    - month aggregation features:
        - daily average number of songs, thumbs up, thumbs down, distinct artists
        - daily average listening time
        - weekly average number of visits
        - average time between logins
    - label: churn/no_churn

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.
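
To see why F1 suits an imbalanced target, here is a plain-Python sketch of the F1 score of the churn class. `f1_score` is a hypothetical helper, not a library function. A model that never predicts churn can still reach a high accuracy but gets an F1 of 0. Note that in Spark ML, `MulticlassClassificationEvaluator(metricName="f1")` reports an F1 weighted over both classes, which is not the same as the churn-class F1 computed here.

```python
def f1_score(y_true, y_pred, positive=1):
    """F1 score of the positive (churn) class: 2 * precision * recall / (precision + recall)."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    if tp == 0:
        # no true positives: precision or recall is 0 (or undefined), so F1 is taken as 0
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


y_true = [0, 0, 0, 0, 1, 1]
print(f1_score(y_true, [0, 0, 0, 0, 0, 0]))  # 0.0, although accuracy is 4/6
print(f1_score(y_true, [0, 0, 0, 1, 1, 0]))  # 0.5
```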

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.