# Sparkify Churn Prediction

## 1. Load and Clean Dataset
Loading and cleaning the dataset, checking for invalid or missing data - for example, records without user IDs or session IDs.

### 1.1. Importing libraries and setting up notebook

In [5]:
# Import libraries

from itertools import chain

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, udf, from_unixtime, year, weekofyear, substring, encode, decode, split, desc, avg, first, concat_ws, countDistinct, sum as Fsum, max as Fmax, min as Fmin
from pyspark.sql.types import StringType, LongType, IntegerType, DateType, TimestampType
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, PCA
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.pipeline import PipelineModel

from user_agents import parse

### 1.2. Loading data

In [6]:
# Create (or reuse) the Spark session used by every DataFrame in this notebook

spark = (
    SparkSession.builder
    .appName('SparkifyCluster')
    .getOrCreate()
)

In [7]:
# Read the Sparkify event log into a Spark DataFrame

filepath = 'mini_sparkify_event_data.json'
df_log = spark.read.format('json').load(filepath)

In [8]:
# Report the (rows, columns) shape of the raw log
log_shape = (df_log.count(), len(df_log.columns))
print('df_log shape:', log_shape)

df_log shape: (286500, 18)


### 1.3. Adjusting data types

In [9]:
# Print the column names and types Spark inferred while reading the JSON file
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [10]:
# Adjust the data types:
#   - registration and ts are treated as epoch milliseconds: divided by 1000, then converted to timestamps
#   - status is stored as a string
#   - userId is stored as a long

df_log = (
    df_log
    .withColumn('registration', from_unixtime(col('registration') / 1000).cast(TimestampType()))
    .withColumn('status', col('status').cast(StringType()))
    .withColumn('ts', from_unixtime(col('ts') / 1000).cast(TimestampType()))
    .withColumn('userId', col('userId').cast(LongType()))
)

### 1.4. Removing missing data

In [11]:
# Drop every event whose userId or sessionId is null, then report how many rows were removed
df_log_valid = df_log.dropna(how='any', subset=['userId', 'sessionId'])

old_rows = df_log.count()
old_columns = df_log.columns
new_rows = df_log_valid.count()
new_columns = df_log_valid.columns

print('df_log shape:\t\t', (old_rows, len(old_columns)))
print('df_log_valid shape:\t', (new_rows, len(new_columns)))
print('{} rows with empty user and session IDs removed'.format(old_rows - new_rows))

df_log shape:		 (286500, 18)
df_log_valid shape:	 (278154, 18)
8346 rows with empty user and session IDs removed


### 1.5. Correcting encoding

In [12]:
# Show an example of a string with the wrong encoding.
# `record` is a SQL filter selecting the example event; the first column of the first matching row is shown.

record = '''userId == 30
            and sessionId == 29
            and itemInSession == 68'''

matching_rows = df_log_valid.where(record).collect()
encoding_example = matching_rows[0][0]

print('String should be:\t Björk')
print('String currently is:\t {}'.format(encoding_example))

String should be:	 Björk
String currently is:	 BjÃÂ¶rk


In [13]:
def encode_decode_column(df, column, encoding, decoding):
    """Re-interpret the text of one column by encoding it and decoding the bytes again.

    The column is first converted to bytes with the `encoding` charset, and those
    bytes are then read back as text with the `decoding` charset. The result
    replaces the original column.

    Parameters:
        df (Spark dataframe): The dataframe that holds the column.
        column (String): The name of the column to rewrite.
        encoding (String): Charset for the encode step (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16').
        decoding (String): Charset for the decode step (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16').

    Returns:
        df (Spark dataframe): The dataframe with `column` replaced by the re-decoded text.

    Example:
        df = encode_decode_column(df, 'column_name', 'ISO-8859-1', 'UTF-8')
    """
    reencoded = decode(encode(column, encoding), decoding)
    return df.withColumn(column, reencoded)

In [14]:
# Names of all columns whose Spark type is string
categorical_columns = [name for name, dtype in df_log_valid.dtypes if dtype == 'string']

In [15]:
# Fix the wrong encoding of every string column.
# The ISO-8859-1 -> UTF-8 round trip must be applied twice per column to retrieve the correct characters.

for column in categorical_columns:
    for round_trip in range(2):
        df_log_valid = encode_decode_column(df_log_valid, column, 'ISO-8859-1', 'UTF-8')

In [16]:
# Re-read the same example record to check that its text is now correct

fixed_rows = df_log_valid.where(record).collect()
encoding_example_fixed = fixed_rows[0][0]

print('String should be:\t Björk')
print('String currently is:\t {}'.format(encoding_example_fixed))

String should be:	 Björk
String currently is:	 Björk


### 1.6. Parsing user agent

In [17]:
# Create the parsing functions with the user_agents library

def _user_agent_udf(extract, return_type):
    """Build a UDF that parses a user-agent string and applies `extract` to the parsed result.

    Parameters:
        extract (callable): Takes the object returned by `parse` and returns the value to store.
        return_type (Spark data type): Type of the column produced by the UDF.

    Returns:
        A Spark UDF. A null user agent yields null instead of being handed to the parser.
    """
    return udf(lambda raw: None if raw is None else extract(parse(raw)), return_type)


get_browser     = _user_agent_udf(lambda ua: ua.browser.family, StringType())
get_os          = _user_agent_udf(lambda ua: ua.os.family, StringType())
get_device      = _user_agent_udf(lambda ua: ua.device.family, StringType())
get_is_phone    = _user_agent_udf(lambda ua: 1 if ua.is_mobile else 0, IntegerType())
get_is_tablet   = _user_agent_udf(lambda ua: 1 if ua.is_tablet else 0, IntegerType())
get_is_computer = _user_agent_udf(lambda ua: 1 if ua.is_pc else 0, IntegerType())

In [18]:
# Derive browser, OS, device and the three device-class flags from the raw userAgent string
df_log_valid = (
    df_log_valid
    .withColumn('browser', get_browser('userAgent'))
    .withColumn('os', get_os('userAgent'))
    .withColumn('device', get_device('userAgent'))
    .withColumn('isPhone', get_is_phone('userAgent'))
    .withColumn('isTablet', get_is_tablet('userAgent'))
    .withColumn('isComputer', get_is_computer('userAgent'))
)

## 2. Exploratory Data Analysis

### 2.1. Main stats

In [19]:
# Count how many different users appear in the cleaned log
distinct_user_ids = df_log_valid.select('userId').distinct().count()
print('Distinct user IDs:', distinct_user_ids)

Distinct user IDs: 225


### 2.3. Defining churn
Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

In [20]:
# Flag the churn: 1 on 'Cancellation Confirmation' events, 0 on every other page

def _is_cancellation(page):
    """Return 1 when `page` is the cancellation confirmation page, otherwise 0."""
    return int(page == 'Cancellation Confirmation')


flag_cancellation_event = udf(_is_cancellation, IntegerType())
df_log_valid = df_log_valid.withColumn('churn', flag_cancellation_event('page'))

In [21]:
# Propagate the churn flag to every event of the user.
# The window spans the user's whole partition (no ordering, no frame), so all rows of a
# user carry the same 0/1 label. A running sum ordered by ts would leave any event logged
# at or after the cancellation with 0, and the later dropDuplicates(['userId']) could then
# pick such a row and mislabel a churned user. Fmax also keeps the label at 1 if a user
# has more than one cancellation event.
window_val = Window.partitionBy('userId')
df_log_valid = df_log_valid.withColumn('churned', Fmax('churn').over(window_val))

In [22]:
# Number of users per churn label (one row per user is kept before counting)
(
    df_log_valid
    .select('userId', 'churned')
    .dropDuplicates(['userId'])
    .groupBy('churned')
    .count()
    .show()
)

+-------+-----+
|churned|count|
+-------+-----+
|      0|  173|
|      1|   52|
+-------+-----+



In [23]:
# Add the cohort: the first 7 characters of the registration timestamp

df_log_valid = df_log_valid.withColumn('cohort', substring(col('registration'), 1, 7))

## 3. Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.

- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

### 3.1. Building users features matrix

In [24]:
# Start the per-user feature table with one row per user and its churn label
df_users_features = (
    df_log_valid
    .select('userId', 'churned')
    .dropDuplicates(['userId'])
)

In [25]:
# Create year, week and a combined 'year-week' key used to aggregate data per week in the next steps

df_log_valid = (
    df_log_valid
    .withColumn('year', year(col('ts').cast(DateType())))
    .withColumn('week', weekofyear(col('ts').cast(DateType())))
    .withColumn('yearWeek', concat_ws('-', col('year'), col('week')))
)

In [26]:
# Add the avg. songs per week: count NextSong events per user and week, then average the weekly counts

df_avg_songs_week = (
    df_log_valid
    .where('page = "NextSong"')
    .groupBy('userId', 'yearWeek')
    .count()
    .groupBy('userId')
    .agg(avg('count').alias('avgSongsWeek'))
)

In [27]:
# Add the avg. sessions per week: count distinct sessions per user and week, then average the weekly counts

df_avg_sessions_week = (
    df_log_valid
    .groupBy('userId', 'yearWeek')
    .agg(countDistinct('sessionId').alias('sessions'))
    .groupBy('userId')
    .agg(avg('sessions').alias('avgSessionsWeek'))
)

In [28]:
# Add the avg. session duration: last minus first event timestamp of each session, averaged per user

df_avg_session_duration = (
    df_log_valid
    .groupBy('userId', 'sessionId')
    .agg(Fmin('ts').alias('start'), Fmax('ts').alias('end'))
    .withColumn('sessionDuration', col('end').cast(LongType()) - col('start').cast(LongType()))
    .groupBy('userId')
    .agg(avg('sessionDuration').alias('avgSessionDuration'))
)

In [29]:
# Add the cohort (first 7 characters of the registration timestamp), one row per user

df_cohort = (
    df_log_valid
    .withColumn('cohort', substring(col('registration'), 1, 7))
    .select('userId', 'cohort')
    .dropDuplicates(['userId'])
)

In [30]:
# Add the average song length (avg skips events whose length is null)

df_length = (
    df_log_valid
    .groupBy('userId')
    .agg(avg('length').alias('length'))
)

In [31]:
# Add the most recent metropolitan area
# The value is picked with a per-user window ordered by ts (newest first). Sorting the whole
# frame and then calling groupBy().agg(first()) does not guarantee that the sort order
# survives the shuffle, so it could return any of the user's values.

df_metro_area = (
    df_log_valid
    .withColumn('metropolitanArea', split('location', ',')[0])
    .withColumn('metropolitanArea',
                first('metropolitanArea').over(Window.partitionBy('userId').orderBy(desc('ts'))))
    .select('userId', 'metropolitanArea')
    .dropDuplicates(['userId'])
)

In [32]:
# Add the most recent state
# The value is picked with a per-user window ordered by ts (newest first). Sorting the whole
# frame and then calling groupBy().agg(first()) does not guarantee that the sort order
# survives the shuffle, so it could return any of the user's values.

df_state = (
    df_log_valid
    .withColumn('state', split('location', ',')[1])
    .withColumn('state',
                first('state').over(Window.partitionBy('userId').orderBy(desc('ts'))))
    .select('userId', 'state')
    .dropDuplicates(['userId'])
)

In [33]:
# Add the most recent gender
# The value is picked with a per-user window ordered by ts (newest first). Sorting the whole
# frame and then calling groupBy().agg(first()) does not guarantee that the sort order
# survives the shuffle, so it could return any of the user's values.

df_gender = (
    df_log_valid
    .withColumn('gender',
                first('gender').over(Window.partitionBy('userId').orderBy(desc('ts'))))
    .select('userId', 'gender')
    .dropDuplicates(['userId'])
)

In [34]:
# Add the most recent level
# The value is picked with a per-user window ordered by ts (newest first). Sorting the whole
# frame and then calling groupBy().agg(first()) does not guarantee that the sort order
# survives the shuffle, so it could return any of the user's values.

df_level = (
    df_log_valid
    .withColumn('level',
                first('level').over(Window.partitionBy('userId').orderBy(desc('ts'))))
    .select('userId', 'level')
    .dropDuplicates(['userId'])
)

In [35]:
# Add the most used browser
# Events are counted per (user, browser); the winner is then chosen with a per-user window
# ordered by count (highest first, ties broken by value) so the result is deterministic.
# A global orderBy followed by groupBy().agg(first()) does not guarantee row order after the shuffle.

df_browser = (
    df_log_valid
    .groupBy('userId', 'browser')
    .count()
    .withColumn('browser',
                first('browser').over(Window.partitionBy('userId').orderBy(desc('count'), col('browser'))))
    .select('userId', 'browser')
    .dropDuplicates(['userId'])
)

In [36]:
# Add the most used OS
# Events are counted per (user, os); the winner is then chosen with a per-user window
# ordered by count (highest first, ties broken by value) so the result is deterministic.
# A global orderBy followed by groupBy().agg(first()) does not guarantee row order after the shuffle.

df_os = (
    df_log_valid
    .groupBy('userId', 'os')
    .count()
    .withColumn('os',
                first('os').over(Window.partitionBy('userId').orderBy(desc('count'), col('os'))))
    .select('userId', 'os')
    .dropDuplicates(['userId'])
)

In [37]:
# Add the most used device
# Events are counted per (user, device); the winner is then chosen with a per-user window
# ordered by count (highest first, ties broken by value) so the result is deterministic.
# A global orderBy followed by groupBy().agg(first()) does not guarantee row order after the shuffle.

df_device = (
    df_log_valid
    .groupBy('userId', 'device')
    .count()
    .withColumn('device',
                first('device').over(Window.partitionBy('userId').orderBy(desc('count'), col('device'))))
    .select('userId', 'device')
    .dropDuplicates(['userId'])
)

In [38]:
# Add the most frequent isPhone value
# Events are counted per (user, isPhone); the winner is then chosen with a per-user window
# ordered by count (highest first, ties broken by value) so the result is deterministic.
# A global orderBy followed by groupBy().agg(first()) does not guarantee row order after the shuffle.

df_is_phone = (
    df_log_valid
    .groupBy('userId', 'isPhone')
    .count()
    .withColumn('isPhone',
                first('isPhone').over(Window.partitionBy('userId').orderBy(desc('count'), col('isPhone'))))
    .select('userId', 'isPhone')
    .dropDuplicates(['userId'])
)

In [39]:
# Add the most frequent isTablet value
# Events are counted per (user, isTablet); the winner is then chosen with a per-user window
# ordered by count (highest first, ties broken by value) so the result is deterministic.
# A global orderBy followed by groupBy().agg(first()) does not guarantee row order after the shuffle.

df_is_tablet = (
    df_log_valid
    .groupBy('userId', 'isTablet')
    .count()
    .withColumn('isTablet',
                first('isTablet').over(Window.partitionBy('userId').orderBy(desc('count'), col('isTablet'))))
    .select('userId', 'isTablet')
    .dropDuplicates(['userId'])
)

In [40]:
# Add the most frequent isComputer value
# Events are counted per (user, isComputer); the winner is then chosen with a per-user window
# ordered by count (highest first, ties broken by value) so the result is deterministic.
# A global orderBy followed by groupBy().agg(first()) does not guarantee row order after the shuffle.

df_is_computer = (
    df_log_valid
    .groupBy('userId', 'isComputer')
    .count()
    .withColumn('isComputer',
                first('isComputer').over(Window.partitionBy('userId').orderBy(desc('count'), col('isComputer'))))
    .select('userId', 'isComputer')
    .dropDuplicates(['userId'])
)

In [41]:
# Attach the activity, cohort, song-length and location features to each user.
# These are inner joins on userId: a user missing from any feature table is dropped.
df_users_features = (
    df_users_features
    .join(df_avg_songs_week, on='userId')
    .join(df_avg_sessions_week, on='userId')
    .join(df_avg_session_duration, on='userId')
    .join(df_cohort, on='userId')
    .join(df_length, on='userId')
    .join(df_metro_area, on='userId')
    .join(df_state, on='userId')
)

In [41]:
# Attach the demographic, subscription and user-agent features to each user.
# These are inner joins on userId: a user missing from any feature table is dropped.
df_users_features = (
    df_users_features
    .join(df_gender, on='userId')
    .join(df_level, on='userId')
    .join(df_browser, on='userId')
    .join(df_os, on='userId')
    .join(df_device, on='userId')
    .join(df_is_phone, on='userId')
    .join(df_is_tablet, on='userId')
    .join(df_is_computer, on='userId')
)

In [None]:
# Preview the assembled per-user feature table
df_users_features.show()

## 4. Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

### 4.1. Processing features

In [43]:
# Column names shared by the preprocessing pipeline, the classifier and the evaluator
feature_vector    = 'features'    # assembled feature vector column
target_vector     = 'label'       # name the 'churned' column is renamed to
prediction_vector = 'prediction'  # column the classifier writes its predictions to
seed = 0                          # seed for the random split, the random forest and the cross-validator

In [44]:
# Keep only the label and the model inputs: the user identifier and the two location
# columns are dropped, and 'churned' is renamed to the label column.
# The result is cached because each estimator fitted on it later runs its own Spark job;
# without the cache every one of those jobs re-runs the whole feature extraction
# (including the Python user-agent UDFs) from the raw log.
df_processed = (
    df_users_features
    .drop('userId', 'metropolitanArea', 'state')
    .withColumnRenamed('churned', target_vector)
    .cache()
)

In [45]:
# Same projection as df_processed (identifier and location columns dropped, label renamed),
# kept under its own name so it stays untouched by the preprocessing pipeline
df_names = (
    df_users_features
    .drop('userId', 'metropolitanArea', 'state')
    .withColumnRenamed('churned', target_vector)
)

In [46]:
# One StringIndexer per categorical column: maps each string value to a numeric index
# stored in '<column>Index'
indexer_level = StringIndexer(inputCol='level', outputCol='levelIndex')
indexer_gender = StringIndexer(inputCol='gender', outputCol='genderIndex')
indexer_cohort = StringIndexer(inputCol='cohort', outputCol='cohortIndex')
indexer_browser = StringIndexer(inputCol='browser', outputCol='browserIndex')
indexer_device = StringIndexer(inputCol='device', outputCol='deviceIndex')
indexer_os = StringIndexer(inputCol='os', outputCol='osIndex')

In [47]:
# Input (indexed) and output (one-hot) column names for the encoder, in matching order
ohe_inputs = [name + 'Index' for name in ('level', 'gender', 'cohort', 'browser', 'device', 'os')]
ohe_outputs = [name + 'Ohe' for name in ('level', 'gender', 'cohort', 'browser', 'device', 'os')]

In [48]:
# One-hot encode all indexed categorical columns with a single estimator
one_hot_encoder = OneHotEncoderEstimator(inputCols=ohe_inputs, outputCols=ohe_outputs)

In [49]:
# Columns fed to the VectorAssembler: the numeric features first, then the one-hot vectors
va_inputs = [
    'avgSongsWeek', 'avgSessionsWeek', 'avgSessionDuration', 'length',
    'isPhone', 'isTablet', 'isComputer',
] + [
    'levelOhe', 'genderOhe', 'cohortOhe', 'browserOhe', 'deviceOhe', 'osOhe',
]

In [50]:
# Concatenate the numeric and one-hot columns into the single vector column the classifier reads
vector_assembler = VectorAssembler(inputCols=va_inputs, outputCol=feature_vector)

In [51]:
# Preprocessing pipeline: index each categorical column, one-hot encode the indices,
# then assemble everything into the feature vector
pipeline_process = Pipeline(stages=[
    indexer_level, indexer_gender, indexer_cohort,
    indexer_browser, indexer_device, indexer_os,
    one_hot_encoder,
    vector_assembler,
])

In [52]:
%%time
# Fit the preprocessing pipeline on the full feature table and apply it to that same table.
# NOTE(review): the indexers are fitted before the train/test split, so the category
# vocabularies are learned from rows that later land in the test set — confirm this is acceptable.
df_processed = pipeline_process.fit(df_processed).transform(df_processed)

CPU times: user 2.37 s, sys: 1.66 s, total: 4.04 s
Wall time: 5min 58s


### 4.2. Splitting into train and test sets

In [53]:
# Split into train (80%) and test (20%) sets.
# The input is cached first because each split is evaluated separately. Without the cache the
# expensive upstream feature extraction would run once per split, and if that recomputation is
# not perfectly repeatable a row could land in both splits or in neither.
df_processed = df_processed.cache()
df_train, df_test = df_processed.randomSplit((0.8, 0.2), seed=seed)

### 4.3. Tuning parameters

In [54]:
# Random forest classifier reading the assembled features and the churn label
rf = RandomForestClassifier(
    featuresCol=feature_vector,
    labelCol=target_vector,
    predictionCol=prediction_vector,
    seed=seed,
)

In [55]:
# Wrap the classifier in a one-stage pipeline; this is the estimator handed to the cross-validator
estimator = Pipeline(stages=[rf])

In [56]:
# Hyper-parameter grid for the random forest (a single combination: 20 trees, max depth 5)
grid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [20])
    .addGrid(rf.maxDepth, [5])
    .build()
)

In [57]:
# Metric name passed to the evaluator; also reused when printing the test score
metric = 'f1'

In [58]:
# Evaluator that scores predictions against the label with the chosen metric
evaluator = MulticlassClassificationEvaluator(
    predictionCol=prediction_vector,
    labelCol=target_vector,
    metricName=metric,
)

In [59]:
# 3-fold cross-validation of the estimator over the parameter grid
cv = CrossValidator(
    estimator=estimator,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3,
    seed=seed,
)

In [60]:
%%time
# Run the cross-validation on the training set; `model` holds the fitted CrossValidator result
model = cv.fit(df_train)

CPU times: user 4.12 s, sys: 2.69 s, total: 6.8 s
Wall time: 13min 8s


In [367]:
%%time
score = evaluator.evaluate(model.transform(df_test))

best_model = model.bestModel
max_depth = best_model.stages[0].getOrDefault('maxDepth')
num_trees = best_model.stages[0].getOrDefault('numTrees')

print('The {} score on the test set is: {:.4%}'.format(metric.upper(), score))
print('Best parameters: maxDepth: {}, numTrees: {}'.format(max_depth, num_trees))

The F1 score on the test set is: 83.1727%
Best parameters: maxDepth: 5, numTrees: 20
CPU times: user 521 ms, sys: 342 ms, total: 862 ms
Wall time: 2min 16s


In [132]:
def print_feature_importances(best_model, features, decimal=4):
    """Print the feature importances of a fitted model as a text table, highest first.

    Parameters:
        best_model: Fitted pipeline model; the importances are read from
            `best_model.stages[0].featureImportances`.
        features (list of String): Feature names, in the same order as the importances.
        decimal (int): Number of decimal places shown for each importance.

    Returns:
        None. The table is written to standard output. With no features, only the
        header is printed.
    """
    scores = list(best_model.stages[0].featureImportances)

    # Sort by importance, highest first (ties fall back to the feature name, descending)
    ranked = sorted(zip(scores, features), reverse=True)

    # Define the name of the columns for the table header
    header_1_name = 'feature'
    header_2_name = 'importance'

    # Guarantee each column is as wide as its widest cell and never narrower than its header.
    # The same widths are used for the border, the header and the rows so they always line up.
    width_1 = max([len(header_1_name)] + [len(name) for _, name in ranked])
    width_2 = max(len(header_2_name), len('0.') + decimal)

    border = '+' + '-' * width_1 + '+' + '-' * width_2 + '+'
    header = '|{:<{w1}}|{:<{w2}}|'.format(header_1_name, header_2_name, w1=width_1, w2=width_2)

    # Print results in a fancy table
    print(border)
    print(header)
    print(border)
    for score, name in ranked:
        print('|{:<{w1}}|{:<{w2}.{decimal}f}|'.format(name, score, w1=width_1, w2=width_2, decimal=decimal))
    print(border)

In [130]:
# Recover the name of every slot of the assembled feature vector from the column metadata.
# The metadata groups the attributes by kind rather than by position, so they are sorted by
# their 'idx' before the names are extracted; otherwise the names may not line up with the
# positions of `featureImportances`.
features = [
    attr['name']
    for attr in sorted(
        chain(*df_processed.schema[feature_vector].metadata['ml_attr']['attrs'].values()),
        key=lambda attr: attr['idx'],
    )
]

In [133]:
# Show the importance the best random forest assigns to each feature, highest first
print_feature_importances(best_model, features)

+------------------------+----------+
|feature                 |importance|
+------------------------+----------+
|length                  |0.2065    |
|avgSessionsWeek         |0.1813    |
|avgSessionDuration      |0.1505    |
|avgSongsWeek            |0.1490    |
|genderOhe_M             |0.0531    |
|browserOhe_Firefox      |0.0396    |
|levelOhe_paid           |0.0362    |
|deviceOhe_Mac           |0.0255    |
|cohortOhe_2018-08       |0.0234    |
|browserOhe_Chrome       |0.0225    |
|cohortOhe_2018-07       |0.0169    |
|osOhe_Windows           |0.0140    |
|osOhe_iOS               |0.0139    |
|cohortOhe_2018-09       |0.0136    |
|deviceOhe_Other         |0.0108    |
|browserOhe_Safari       |0.0107    |
|browserOhe_Mobile Safari|0.0082    |
|cohortOhe_2018-06       |0.0080    |
|osOhe_Linux             |0.0057    |
|osOhe_Mac OS X          |0.0037    |
|cohortOhe_2018-11       |0.0034    |
|isComputer              |0.0024    |
|deviceOhe_iPhone        |0.0009    |
|isPhone    