# Feature Engineering on cluster for Cancelled users

##### The feature is marked as candidate feature if

      |Δ| > 0.10

In [1]:
print("Welcome to my EMR Notebook!")

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
47,application_1597998755054_0083,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Welcome to my EMR Notebook!

In [114]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, \
                                  avg, from_unixtime, split, min, max, lit, mean, col
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import abs as Fabs
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

import datetime
from pyspark.sql.types import IntegerType, TimestampType, FloatType
from pyspark.sql.functions import to_date, year, month, dayofmonth, dayofweek, hour, date_format, substring

import numpy as np
import time

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Set time parser policy
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Read in full sparkify dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
data = spark.read.json(event_data)
data.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042')

### Prepare data

In [5]:
# Function that returns all users of a specified churn group
def get_users(churn):
    """Returns all distinct users of a specified churn group.
    
    Args:
        churn (int): A specified churn group - 1 for churned and 0 for non-churned users.
        
    Returns:
        DataFrame: A dataframe query with filtered users.
    """
    return data.where(data.churn == 1).select('userId').dropDuplicates()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Count all users
data.select('userId').dropDuplicates().count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

22278

In [7]:
# Remove rows with missing users
data = data.where(~((col('userId').isNull()) | (col('userId') == '')))

# Exclude non-relevant columns
data = data.drop('firstName')
data = data.drop('lastName')

# Add tsDate and date column
data = data.withColumn('tsDate', (col('ts') / 1000.0).cast(TimestampType()))
data = data.withColumn('date', date_format(col('tsDate'), 'yyyy-MM-dd').alias('date').cast('date'))

# Label churned users using Cancellation Confirmation event
query_churn_by_cc = data.where(data.page == 'Cancellation Confirmation')
canceled = query_churn_by_cc.select('userId').dropDuplicates().select('userId')
canceled_uids = [row.userId for row in canceled.collect()];
set_churn = udf(lambda x: 1 if x in canceled_uids else 0, IntegerType())
data = data.withColumn('churn', set_churn('userId'))

# Add [userRowId] column that assigns a 1-based index to every user's log ordered by [ts]
w =  Window.partitionBy(data.userId).orderBy('ts', 'itemInSession')
data = data.withColumn('userRowId', row_number().over(w))

# Add [userRowDescId] column that assigns a 1-based index to every user's log ordered by [ts] descending.
w =  Window.partitionBy(data.userId).orderBy(col('ts').desc(), col('itemInSession').desc())
data = data.withColumn('userRowDescId', row_number().over(w))

# Add last level column
last_levels = dict()
for row in data.where(data.userRowDescId == 1).select('userId', 'level').collect():
    last_levels[row.userId] = row.level
get_level = udf(lambda userId: last_levels[userId])
data = data.withColumn('lastLevel', get_level('userId'))

# Prepare labels
labels = data.select(col('churn').alias('label'), 'userId').dropDuplicates()

# Count churned users
print(f'Churned (cancelled) users: {get_users(1).count()}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Churned (cancelled) users: 5003

In [8]:
data.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- tsDate: timestamp (nullable = true)
 |-- date: date (nullable = true)
 |-- churn: integer (nullable = true)
 |-- userRowId: integer (nullable = true)
 |-- userRowDescId: integer (nullable = true)
 |-- lastLevel: string (nullable = true)

### Queries

In [12]:
# All unique users
users = data.select('userId').dropDuplicates()

# Pages without churn definition events
page_data = data.where(~data.page.isin(['Cancel', 'Cancellation Confirmation'])) \
    .select('churn', 'page', 'userId', 'sessionId', 'ts', 'date')

# Calc session duration (in hours)
session_hours = page_data \
    .groupby('userId', 'sessionId') \
    .agg(((max('ts') - min('ts'))/1000/3600).alias('sessionHours'))

# User interactions duration per user (in hours)
user_hours = page_data \
    .groupby('userId', 'sessionId') \
    .agg(((max('ts') - min('ts'))/1000/3600).alias('sessionHours')) \
    .groupby('userId') \
    .agg(Fsum('sessionHours').alias('hours'))  

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Global Variables

In [27]:
# Any feature whose delta fulfills this criterion will be automatically selected (by the show_delta function)
delta_threshold = 0.10

# The collection of all selected engineered features
selected_features = []

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Functions

In [76]:
# We'll calculate the ratio difference (delta) and show it through this function.
# Note that we haven't incroporated the delta calculation in spark querying due to 
# the simplicity of a manual solution (for a very limited number of features though).
def show_delta(feature, v1, v0, force_selection=False):
    """Calculate delta and print it.
    
    If delta is greater than delta_threshold the feature is selected.
    
    Args:
        feature (string): The name of a feature.
        v1 (float): The statistical value of the churned users.
        v0 (float): The statistical value of the non-churned users.
        force_selection (bool): If True then the feature is selected without the threshold condition. 
    Returns:
        None
    """
    # Calc delta
    delta = (v1 - v0)/(v1 + v0)
    
    # Delete the feature if it already exists (to avoid duplicates):
    ix = None
    try:
        ix = [x['feature'] for x in selected_features].index(feature)
        del selected_features[ix]
    except ValueError:
        pass
    
    if (force_selection == True) or (abs(delta) > delta_threshold):
        selected_features.append({'feature': feature, 'delta': delta})
        print(f'Δ for {feature} feature: {round(delta, 4)} (SELECTED)')
    else:
        print(f'Δ for {feature} feature: {round(delta, 4)}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
# Function that returns all users of a specified churn group
def get_users(churn):
    return data.where(data.churn == 1).select('userId').dropDuplicates()

# Count page logs
def page_count(page):
    return page_data \
        .where(data.page == page) \
        .groupby('userId') \
        .count() \
        .select('userId', col('count').alias(page.replace(' ', '') + 'Count'))

# Average page count per session hour
def page_session_hour(page):
    return page_data \
        .where(data.page == page) \
        .join(session_hours, ['userId', 'sessionId'], 'inner') \
        .groupby( 'userId', 'sessionId', 'sessionHours') \
        .agg((count('userId')/col('sessionHours')).alias('avgPerSession')) \
        .groupby('userId') \
        .agg(avg('avgPerSession').alias('avg')) \
        .select('userId', col('avg').alias(page.replace(' ', '') + 'PerSessionHour'))

# Average page count per hour
def page_hour(page):
    return page_data \
        .where(data.page == page) \
        .join(user_hours, 'userId', 'inner') \
        .groupby('userId', 'hours') \
        .agg((count('userId')/col('hours')).alias('avg')) \
        .select('userId', col('avg').alias(page.replace(' ', '') + 'PerHour'))

# Average page count per day
def page_day(page):
    return page_data \
        .where(data.page == page) \
        .groupby('userId', 'date') \
        .count() \
        .groupby('userId') \
        .agg(avg('count').alias(page.replace(' ', '') + 'PerDay'))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Features

In [31]:
    f_Gender = data \
        .select('userId', 'gender') \
        .dropDuplicates() \
        .replace(['M', 'F'], ['0', '1'], 'gender') \
        .select('userId', col('gender').cast('int').alias('Gender'))

    f_LastLevel = data \
        .select('userId', 'lastLevel') \
        .dropDuplicates() \
        .replace(['free', 'paid'], ['0', '1'], 'lastLevel') \
        .select('userId', col('lastLevel').cast('int').alias('LastLevel'))

    f_LogCount = data \
        .groupby('userId') \
        .agg(count('userId').alias('LogCount'))

    f_SongCount = data \
        .where(data.page == 'NextSong') \
        .groupby('userId') \
        .agg(count('userId').alias('SongCount'))

    f_NonSongCount = data \
        .where(data.page != 'NextSong') \
        .groupby('userId') \
        .agg(count('userId').alias('NonSongCount'))

    f_AboutCount = page_count('About')

    f_ThumbsUpCount = page_count('Thumbs Up')

    f_RollAdvertCount = page_count('Roll Advert')

    f_SessionCount = data \
        .select('userId', 'sessionId') \
        .dropDuplicates() \
        .groupby('userId') \
        .agg(count('userId').alias('SessionCount'))

    f_AvgSessionLength = data \
       .groupby('userId', 'sessionId') \
       .agg(((max('ts') - min('ts'))/1000).alias('sessionLength')) \
       .groupby('userId') \
       .agg(avg('sessionLength').alias('AvgSessionLength')) \

    f_AvgSessionGap = data \
        .groupby('userId', 'sessionId') \
        .agg(min('ts').alias('startTime'), max('ts').alias('endTime')) \
        .groupby('userId') \
        .agg(count('userId').alias('sessionCount'), \
            ((max('endTime') - min('startTime'))/1000).alias('observationPeriodTime'), \
            (Fsum(col('endTime') - col('startTime'))/1000).alias('totalSessionTime')) \
        .where(col('sessionCount') > 1) \
        .join(users, 'userId', 'outer') \
        .fillna(0) \
        .select('userId', \
            (col('observationPeriodTime') - col('totalSessionTime')/(col('sessionCount') - 1)).alias('AvgSessionGap'))

    f_DowngradePerSessionHour = page_session_hour('Downgrade')

    f_ErrorPerSessionHour = page_session_hour('Error')

    f_SettingsPerSessionHour = page_session_hour('Settings')

    f_SaveSettingsPerSessionHour = page_session_hour('Save Settings')

    f_LogoutPerSessionHour = page_session_hour('Logout')

    f_SubmitDowngradePerSessionHour = page_session_hour('Submit Downgrade')

    f_RollAdvertPerHour = page_hour('Roll Advert')

    f_ThumbsDownPerHour = page_hour('Thumbs Down')

    f_UpgradePerHour = page_hour('Upgrade')

    f_SubmitUpgradePerHour = page_hour('Submit Upgrade')

    f_SessionsPerDay = data \
        .select('userId', 'date', 'sessionId') \
        .dropDuplicates() \
        .groupby('userId', 'date') \
        .count() \
        .groupby('userId') \
        .agg(avg('count').alias('SessionsPerDay'))

    f_AddFriendPerDay = page_day('Add Friend')

    f_RollAdvertPerDay = page_day('Roll Advert')

    f_ThumbsDownPerDay = page_day('Thumbs Down')

    f_ThumbsUpPerDay = page_day('Thumbs Up')

    f_TotalSongLength = data \
        .where(data.page == 'NextSong') \
        .select('userId', 'length') \
        .groupby('userId') \
        .agg(Fsum('length').alias('TotalSongLength'))

    f_UniqueSongCount = data \
        .where(data.page == 'NextSong') \
        .select('userId', 'song') \
        .dropDuplicates() \
        .groupby('userId') \
        .agg(count('userId').alias('UniqueSongCount'))

    f_UniqueSongShare = data \
        .where(data.page == 'NextSong') \
        .select('userId', 'song') \
        .dropDuplicates() \
        .groupby('userId') \
        .count() \
        .join(f_SongCount, on = ['userId'], how = 'inner') \
        .select('userId', (col('count')/col('SongCount')).alias('UniqueSongShare')) 


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Feature Check

### AvgSessionGap (YES)

In [18]:
f_gaps = labels.join(f_AvgSessionGap, 'userId', 'outer') \
    .fillna(0)
f_gaps.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----+-----------------+
| userId|label|    AvgSessionGap|
+-------+-----+-----------------+
|1001393|    0|5037908.333333333|
|1002143|    0|4925333.714285715|
|1002493|    1|682813.6666666666|
|1002749|    0|4810390.533333333|
|1004060|    0|       4118678.76|
|1004316|    0|4581602.857142857|
|1006033|    0|        3844641.0|
|1006411|    0|5115310.333333333|
|1006697|    0|4368399.181818182|
|1008244|    1|         195181.0|
|1008404|    1|3341917.830188679|
|1010669|    0|        5150961.3|
|1010907|    0|4222507.285714285|
|1011093|    0|4962723.866666666|
|1011149|    0|4317323.444444444|
|1011630|    0|       2698736.25|
|1012906|    1|         852531.0|
|1013788|    0|        3225160.0|
|1017431|    0|         307951.0|
|1018499|    0|        4995090.5|
+-------+-----+-----------------+
only showing top 20 rows

In [20]:
f_gaps.groupby('label').agg(avg('AvgSessionGap')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+------------------+
|label|avg(AvgSessionGap)|
+-----+------------------+
|    1| 2083145.432607491|
|    0| 3935744.507611506|
+-----+------------------+

In [32]:
show_delta('AvgSessionGap', 2083145.432607491, 3935744.507611506)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Δ for AvgSessionGap feature: -0.3078 (SELECTED)

### Per hour features

In [33]:
# Average page count per hour, per churn
page_data \
    .join(user_hours, 'userId', 'inner') \
    .groupby('churn', 'userId', 'page', 'hours') \
    .agg((count('userId')/col('hours')).alias('CountPerHour')) \
    .groupby('churn', 'page') \
    .agg(avg('CountPerHour').alias('AvgCountPerHour')) \
    .sort('page', 'churn') \
    .show(100)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-------------------+--------------------+
|churn|               page|     AvgCountPerHour|
+-----+-------------------+--------------------+
|    0|              About| 0.07056233416090642|
|    1|              About|0.055580346583583164|
|    0|         Add Friend| 0.32170083638953945|
|    1|         Add Friend|  0.2984439504534136|
|    0|    Add to Playlist|  0.4261184597926616|
|    1|    Add to Playlist| 0.42158988333321673|
|    0|          Downgrade| 0.13972529087490965|
|    1|          Downgrade|  0.1859004628041162|
|    0|              Error| 0.03694080108968873|
|    1|              Error| 0.03785066007418288|
|    0|               Help| 0.11350187298925041|
|    1|               Help| 0.11306976192257237|
|    0|               Home|  1.3388497396092536|
|    1|               Home|  0.8691192484357486|
|    0|              Login|0.020621423633006685|
|    0|             Logout|   1.450812997320228|
|    1|             Logout|  0.2551084689273071|
|    0|           Ne

##### Selected per hour features:

 - Downgrade
 - Home
 - Logout
 - Roll Advert
 - Settings
 - Submit Downgrade
 - Submit Upgrade
 - Thumbs Down

In [34]:
show_delta('DowngradePerHour', 0.1859004628041162, 0.13972529087490962)
show_delta('HomePerHour', 0.8691192484357485, 1.3388497396092534)
show_delta('LogoutPerHour', 0.25510846892730715, 1.450812997320228)
show_delta('RollAdvertPerHour', 0.9863086431306303, 0.6338012118429249)
show_delta('SettingsPerHour', 0.18234346960614486, 0.13177341107697765)
show_delta('SubmitDowngradePerHour', 0.024503729625633443, 0.018981978203069)
show_delta('SubmitUpgradePerHour', 0.03708193589793271, 0.02785325006807331)
show_delta('ThumbsDownPerHour', 0.251281486698442, 0.19713887432189978)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Δ for DowngradePerHour feature: 0.1418 (SELECTED)
Δ for HomePerHour feature: -0.2127 (SELECTED)
Δ for LogoutPerHour feature: -0.7009 (SELECTED)
Δ for RollAdvertPerHour feature: 0.2176 (SELECTED)
Δ for SettingsPerHour feature: 0.161 (SELECTED)
Δ for SubmitDowngradePerHour feature: 0.127 (SELECTED)
Δ for SubmitUpgradePerHour feature: 0.1421 (SELECTED)
Δ for ThumbsDownPerHour feature: 0.1207 (SELECTED)

### Per day features

In [31]:
# Average page count per day, per churn
page_data \
    .groupby('churn', 'userId', 'page', 'date') \
    .count() \
    .groupby('churn', 'userId', 'page') \
    .agg(avg('count').alias('AvgCountPerDay')) \
    .groupby('churn', 'page') \
    .agg(avg('AvgCountPerDay').alias('AvgCountPerDay')) \
    .sort('page', 'churn') \
    .show(100)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-------------------+------------------+
|churn|               page|    AvgCountPerDay|
+-----+-------------------+------------------+
|    0|              About|1.2265077187557558|
|    1|              About|1.1872657560171735|
|    0|         Add Friend|2.4517891033084993|
|    1|         Add Friend| 2.564261041807439|
|    0|    Add to Playlist|2.6646704967460786|
|    1|    Add to Playlist| 2.827303473707138|
|    0|          Downgrade|1.8509720146725048|
|    1|          Downgrade| 1.940805990691478|
|    0|              Error|1.0762686323015083|
|    1|              Error|1.0822312859523107|
|    0|               Help|1.4402423265020787|
|    1|               Help|1.4400763045141485|
|    0|               Home|3.6046560619564607|
|    1|               Home|  3.59223269281641|
|    0|              Login| 4858.196721311476|
|    0|             Logout|1.6710914426251295|
|    1|             Logout|1.7796447995266915|
|    0|           NextSong|59.566890284449954|
|    1|      

##### Selected per day features:

 - Roll Advert

In [35]:
show_delta('RollAdvertPerDay', 3.9996697052271704, 3.014399340519118)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Δ for RollAdvertPerDay feature: 0.1405 (SELECTED)

### SongCount (NO)

In [50]:
labels.join(f_SongCount, 'userId', 'outer') \
    .fillna(0) \
    .groupby('label').agg(avg('SongCount')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----------------+
|label|   avg(SongCount)|
+-----+-----------------+
|    1|876.7291625024985|
|    0|953.0533140376266|
+-----+-----------------+

In [36]:
show_delta('SongCount', 876.7291625024985, 953.0533140376266)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Δ for SongCount feature: -0.0417

### NonSongCount (YES)

In [52]:
labels.join(f_NonSongCount, 'userId', 'outer') \
    .fillna(0) \
    .groupby('label').agg(avg('NonSongCount')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+------------------+
|label| avg(NonSongCount)|
+-----+------------------+
|    1|199.11872876274236|
|    0|255.44057887120115|
+-----+------------------+

In [37]:
show_delta('NonSongCount', 199.11872876274236, 255.44057887120115)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Δ for NonSongCount feature: -0.1239 (SELECTED)

### SessionCount (YES)

In [53]:
labels.join(f_SessionCount, 'userId', 'outer') \
    .fillna(0) \
    .groupby('label').agg(avg('SessionCount')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+------------------+
|label| avg(SessionCount)|
+-----+------------------+
|    1|12.300219868079152|
|    0|22.786685962373372|
+-----+------------------+

In [38]:
show_delta('SessionCount', 12.300219868079152, 22.786685962373372)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Δ for SessionCount feature: -0.2989 (SELECTED)

### AvgSessionLength (NO)

In [54]:
labels.join(f_AvgSessionLength, 'userId', 'outer') \
    .fillna(0) \
    .groupby('label').agg(avg('AvgSessionLength')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+---------------------+
|label|avg(AvgSessionLength)|
+-----+---------------------+
|    1|   17222.534302078067|
|    0|   16409.735186977854|
+-----+---------------------+

In [39]:
show_delta('AvgSessionLength', 17222.534302078067, 16409.735186977854)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Δ for AvgSessionLength feature: 0.0242

### TotalSongLength (NO)

In [55]:
labels.join(f_TotalSongLength, 'userId', 'outer') \
    .fillna(0) \
    .groupby('label').agg(avg('TotalSongLength')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+--------------------+
|label|avg(TotalSongLength)|
+-----+--------------------+
|    1|   218025.0358112713|
|    0|  237060.12599282028|
+-----+--------------------+

In [40]:
show_delta('TotalSongLength', 218025.0358112713, 237060.12599282028)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Δ for TotalSongLength feature: -0.0418

### SessionsPerDay (NO)

In [56]:
labels.join(f_SessionsPerDay, 'userId', 'outer') \
    .fillna(0) \
    .groupby('label').agg(avg('SessionsPerDay')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-------------------+
|label|avg(SessionsPerDay)|
+-----+-------------------+
|    1| 1.2193029303079534|
|    0| 1.3044363651611546|
+-----+-------------------+

In [41]:
show_delta('SessionPerDay', 1.2193029303079534, 1.3044363651611546)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Δ for SessionPerDay feature: -0.0337

### UniqueSongCount (NO)

In [57]:
labels.join(f_UniqueSongCount, 'userId', 'outer') \
    .fillna(0) \
    .groupby('label').agg(avg('UniqueSongCount')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+--------------------+
|label|avg(UniqueSongCount)|
+-----+--------------------+
|    1|   779.5772536478113|
|    0|   842.7589001447178|
+-----+--------------------+

In [42]:
show_delta('UniqueSongCount', 779.5772536478113, 842.7589001447178)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Δ for UniqueSongCount feature: -0.0389

### UniqueSongShare (NO)

In [58]:
labels.join(f_UniqueSongShare, 'userId', 'outer') \
    .fillna(0) \
    .groupby('label').agg(avg('UniqueSongShare')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+--------------------+
|label|avg(UniqueSongShare)|
+-----+--------------------+
|    1|  0.9428555327477975|
|    0|  0.9384634708370324|
+-----+--------------------+

In [43]:
show_delta('UniqueSongShare', 0.9428555327477975, 0.9384634708370324)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Δ for UniqueSongShare feature: 0.0023

### LogCount (NO)

In [59]:
labels.join(f_LogCount, 'userId', 'outer') \
    .fillna(0) \
    .groupby('label').agg(avg('LogCount')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+------------------+
|label|     avg(LogCount)|
+-----+------------------+
|    1|1075.8478912652408|
|    0|1208.4938929088278|
+-----+------------------+

In [44]:
show_delta('LogCount', 1075.8478912652408, 1208.4938929088278)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Δ for LogCount feature: -0.0581

### Per session-hour features

In [64]:
page_data \
    .join(session_hours, ['userId', 'sessionId'], 'inner') \
    .groupby('churn', 'page', 'userId', 'sessionId', 'sessionHours') \
    .agg((count('userId')/col('sessionHours')).alias('AvgPerSessionHour')) \
    .groupby('churn', 'page', 'userId') \
    .agg(avg('AvgPerSessionHour').alias('AvgPerSessionHour')) \
    .groupby('churn', 'page') \
    .agg(avg('AvgPerSessionHour').alias('AvgPerSessionHour')) \
    .sort('page', 'churn') \
    .show(100)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-------------------+-------------------+
|churn|               page|  AvgPerSessionHour|
+-----+-------------------+-------------------+
|    0|              About| 1.3286997598868076|
|    1|              About| 1.7558983615657775|
|    0|         Add Friend| 10.566364547227176|
|    1|         Add Friend| 10.577382252179353|
|    0|    Add to Playlist| 0.8948913425563265|
|    1|    Add to Playlist|  0.845031938914712|
|    0|          Downgrade| 0.5251358741020042|
|    1|          Downgrade| 0.9873323822777235|
|    0|              Error| 0.7684184077909724|
|    1|              Error| 0.3551135184536166|
|    0|               Help| 1.3482266412324133|
|    1|               Help| 1.1534599873795206|
|    0|               Home| 13.079747649035781|
|    1|               Home| 12.947641686686476|
|    0|              Login|  916.3842438628917|
|    0|             Logout| 22.457422711291965|
|    1|             Logout| 16.212140451544194|
|    0|           NextSong| 29.666904382

##### Selected per session-hour features:

 - About
 - Downgrade
 - Error
 - Settings
 - Upgrade

In [45]:
show_delta('AboutPerSessionHour', 1.7558983615657775, 1.3286997598868076)
show_delta('DowngradePerSessionHour', 0.9873323822777235, 0.5251358741020042)
show_delta('ErrorPerSessionHour', 0.3551135184536166, 0.7684184077909724)
show_delta('SettingsPerSessionHour', 2.6505488541324906, 1.2855436334864851)
show_delta('UpgradePerSessionHour', 1.0592725947078798, 0.8361369349449518)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Δ for AboutPerSessionHour feature: 0.1385 (SELECTED)
Δ for DowngradePerSessionHour feature: 0.3056 (SELECTED)
Δ for ErrorPerSessionHour feature: -0.3679 (SELECTED)
Δ for SettingsPerSessionHour feature: 0.3468 (SELECTED)
Δ for UpgradePerSessionHour feature: 0.1177 (SELECTED)

### Page count features

In [71]:
page_data \
    .groupby('churn', 'page', 'userId') \
    .count() \
    .groupby('churn', 'page') \
    .agg(avg('count').alias('AvgPageCount')) \
    .sort('page', 'churn') \
    .show(100)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-------------------+------------------+
|churn|               page|      AvgPageCount|
+-----+-------------------+------------------+
|    0|              About| 7.283196757423561|
|    1|              About|3.2461439588688945|
|    0|         Add Friend| 19.09927820691402|
|    1|         Add Friend|17.736643759698516|
|    0|    Add to Playlist|28.653205787976024|
|    1|    Add to Playlist|26.282099936748892|
|    0|          Downgrade|12.295119182746879|
|    1|          Downgrade|11.561235356762513|
|    0|              Error|2.3408214204674382|
|    1|              Error| 2.167547783651891|
|    0|               Help| 8.957388557388557|
|    1|               Help| 6.877219304826206|
|    0|               Home| 66.83783626072241|
|    1|               Home|  40.0530794165316|
|    0|              Login|          296350.0|
|    0|             Logout|14.109767160568492|
|    1|             Logout|13.556756756756757|
|    0|           NextSong| 953.9368445448752|
|    1|      

##### Selected per count features:
 - About
 - Thumbs Up
 - Roll Advert
 - Help
 - Home

In [90]:
show_delta('AboutCount', 3.2461439588688945, 7.283196757423561)
show_delta('ThumbsUpCount', 46.35935563816605, 54.88413262285376, True)
show_delta('RollAdvertCount', 22.040430674577017, 18.360894502803376, True)
show_delta('HelpCount', 6.877219304826206, 8.957388557388557)
show_delta('HomeCount', 40.0530794165316, 66.83783626072241)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Δ for AboutCount feature: -0.3834 (SELECTED)
Δ for ThumbsUpCount feature: -0.0842 (SELECTED)
Δ for RollAdvertCount feature: 0.0911 (SELECTED)
Δ for HelpCount feature: -0.1314 (SELECTED)
Δ for HomeCount feature: -0.2506 (SELECTED)

We have forced 2 features, *ThumbsUpCount* and *RollAdvertCount*, that do not comply with the *delta threshold condition*. Their delta is close to the threshold, on the other hand, our "intuition" tells us that these two features could play an important role in the ML model. 

## List of selected features (22 features)

In [123]:
spark.sparkContext.parallelize(selected_features).toDF(['delta', 'feature']) \
    .select('feature', 'delta', Fabs(col('delta'))) \
    .sort(desc('abs(delta)')) \
    .show(30, False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+--------------------+-------------------+
|feature                |delta               |abs(delta)         |
+-----------------------+--------------------+-------------------+
|LogoutPerHour          |-0.7009141698785681 |0.7009141698785681 |
|AboutCount             |-0.38340983612658475|0.38340983612658475|
|ErrorPerSessionHour    |-0.3678621672272629 |0.3678621672272629 |
|SettingsPerSessionHour |0.34679195799886436 |0.34679195799886436|
|AvgSessionGap          |-0.3077974665435747 |0.3077974665435747 |
|DowngradePerSessionHour|0.3055908818093422  |0.3055908818093422 |
|SessionCount           |-0.29887121266740024|0.29887121266740024|
|HomeCount              |-0.2505802918282092 |0.2505802918282092 |
|RollAdvertPerHour      |0.21758242517045812 |0.21758242517045812|
|HomePerHour            |-0.21274324671988146|0.21274324671988146|
|SettingsPerHour        |0.1609912158149237  |0.1609912158149237 |
|SubmitUpgradePerHour   |0.14212149688291761 |0.14212149688291

## Preliminary fitting test using the selected features
### Random Forest method
(The model was fitted by the script submitted to the cluster.)

#### Evaluation

In [4]:
metrics = spark.read.json('s3n://amosvoron-sparkify/rfc-metrics-f22.json')
metrics.select('f1', 'accuracy', 'weightedPrecision', 'weightedRecall', 'AUC').show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+------------------+-----------------+------------------+------------------+
|                f1|          accuracy|weightedPrecision|    weightedRecall|               AUC|
+------------------+------------------+-----------------+------------------+------------------+
|0.8386651060216977|0.8577847674679847|0.858875141192138|0.8577847674679848|0.7055267657733111|
+------------------+------------------+-----------------+------------------+------------------+

In [9]:
importances = spark.read.json('s3n://amosvoron-sparkify/rfc-feature-importances-f22.json')
importances.sort(desc('importance')).show(30, False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+---------------------+
|feature                |importance           |
+-----------------------+---------------------+
|AvgSessionGap          |0.531326926781188    |
|RollAdvertPerHour      |0.07203686762635017  |
|DowngradePerHour       |0.06676894742434188  |
|SessionCount           |0.062376199033023286 |
|NonSongCount           |0.050628344100157416 |
|RollAdvertPerDay       |0.04571131362963781  |
|RollAdvertCount        |0.03022773918358667  |
|DowngradePerSessionHour|0.023059199682565503 |
|ThumbsDownPerHour      |0.021905197950220757 |
|HomeCount              |0.01935741525646098  |
|LogoutPerHour          |0.014225091004298893 |
|ThumbsUpCount          |0.014185526067462045 |
|SettingsPerSessionHour |0.011980284038583023 |
|AboutCount             |0.00900477412830605  |
|SubmitUpgradePerHour   |0.006285279375803763 |
|SettingsPerHour        |0.005847270443837536 |
|SubmitDowngradePerHour |0.004204600333886947 |
|UpgradePerSessionHour  |0.0030743231489

The preliminary test shows the promising result. There is still room for the feature/hyperparameter tuning. Some features can be dropped. We can see that the *AvgSessionGap* feature bears more than one half of the importance weight, perhaps too much. Will see if by feature tuning we can achieve more balanced importance weight distributed over the tunned features. 