# Sparkify project - Feature engineering 
In the previous notebook, an extensive data exploration was done to identify the information available in the dataset and how it is spread and varies across users. Some categorical variables, such as the State or the Device/experience used to access the platform, could be extracted by parsing some of the fields. 
The user registration dates also turned out to be widely spread across users.
In this notebook, I will combine the gender categorical feature, the behavioral data related to the pages visited, and the session activity counts.
This step is a prerequisite for modeling churn prediction.
Once the feature dataset is built, it will be used for the modeling part in the next notebook. 

In [1]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import udf, concat, lit, col , explode, array, least, dense_rank
from pyspark.sql.functions import isnan, when
from pyspark.sql.functions import sum as Fsum 
from pyspark.sql.functions import min as Fmin
from pyspark.sql.functions import max as Fmax
from pyspark.sql.functions import unix_timestamp, to_timestamp, datediff
from pyspark.sql.functions import avg, stddev , count
from pyspark.sql.functions import asc, desc, log
from pyspark.sql.types import IntegerType, StringType, DoubleType, LongType, BooleanType, TimestampType, DateType
from pyspark.ml.feature import RegexTokenizer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier, LinearSVC, LogisticRegression, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import numpy as np
import pandas as pd 
from matplotlib import pyplot as plt 
import seaborn as sns 
import scipy.stats as sp
from datetime import datetime as dt 
from datetime import timedelta as td
import itertools
import time

In [2]:
# Create (or reuse) a local Spark session for this notebook.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Sparkify")
    .getOrCreate()
)

In [3]:
# Utility functions
def load_df(filename):
    """Read the JSON event file at *filename* with the global ``spark`` session.

    Args:
        filename: path of the JSON file to read.

    Returns:
        The loaded DataFrame, already marked for caching via ``persist()``.
    """
    # Read the path the caller passed in. The previous version read the
    # notebook-global ``data_path`` and silently ignored this argument.
    df = spark.read.json(filename)
    df.persist()
    return df

def drop_null_userId(df):
    """Return only the events whose ``userId`` is not the empty string.

    Under SQL three-valued logic, the predicate ``userId <> ''`` is null (and the
    row is dropped) when ``userId`` itself is null.
    """
    return df.filter("userId <> ''")

# Creating device type
def get_device_type(x):
    """Map a raw ``userAgent`` string to a coarse device label.

    The substring checks run in a fixed order and the first match wins.

    Args:
        x: the ``userAgent`` value of one event; ``None`` when Spark passes a
           null column value into the UDF.

    Returns:
        One of 'Mac', 'Windows', 'Android', 'iPhone', 'iPad' or 'Unknown'.
    """
    # A null userAgent arrives as None. ``'...' in None`` raises TypeError,
    # which would fail the whole Spark job, so label it 'Unknown' instead.
    if x is None:
        return 'Unknown'
    # NOTE(review): every agent containing 'Linux' is labelled 'Android'. This
    # also captures desktop Linux agents (e.g. "X11; Linux x86_64"), which
    # add_columns then classifies as 'Mobile'. Confirm this is intended.
    ordered_rules = (
        ('Macintosh', 'Mac'),
        ('Windows', 'Windows'),
        ('Linux', 'Android'),
        ('iPhone', 'iPhone'),
        ('iPad', 'iPad'),
    )
    for token, label in ordered_rules:
        if token in x:
            return label
    return 'Unknown'

# Spark UDF that applies get_device_type to a column and returns a string column.
get_device_udf = udf(get_device_type, StringType())

def take_last_2_char(x):
    """Return the last two characters of *x*.

    Used to extract a state code from the ``location`` field. The format is
    presumably "City, ST", so only the final code of a multi-state location is
    kept; TODO confirm against the data.

    Args:
        x: the location string, or ``None`` for a null Spark value.

    Returns:
        ``x[-2:]``, or ``None`` when *x* is ``None``. Without this guard,
        ``None[-2:]`` raises TypeError inside the UDF.
    """
    if x is None:
        return None
    return x[-2:]
# Spark UDF that applies take_last_2_char to a column and returns a string column.
take_last_2_char_udf = udf(take_last_2_char, StringType())

def add_columns(df):
    """Add device, experience and location-derived columns to the event log.

    New columns:
        device:         label derived from ``userAgent`` via get_device_udf.
        experience:     'Mobile' for iPhone/iPad/Android devices, else 'Desktop'.
        location_state: last two characters of ``location``.
        state_group:    the state itself for CA/TX/PA/FL, otherwise 'others'
                        (a null state stays null).
    """
    mobile_devices = ['iPhone', 'iPad', 'Android']
    top_states = ['CA', 'TX', 'PA', 'FL']
    # Column expressions are unresolved until used, so they can name columns
    # that are only created further down the chain.
    is_mobile = col('device').isin(mobile_devices)
    is_top_state = col('location_state').isin(top_states)
    return (
        df.withColumn('device', get_device_udf(col('userAgent')))
          .withColumn('experience', when(is_mobile, 'Mobile').otherwise('Desktop'))
          .withColumn('location_state', take_last_2_char_udf(col('location')))
          .withColumn('state_group', when(~is_top_state, 'others').otherwise(col('location_state')))
    )
    

def clean_df(df):
    """Clean the raw event log and add per-user date columns.

    Steps:
      1. drop events with an empty ``userId`` (via drop_null_userId);
      2. convert the millisecond epoch fields ``registration`` and ``ts`` into
         the timestamp columns ``registration_date`` and ``event_date``;
      3. add each user's first and last event timestamps
         (``min_event_date``, ``max_event_date``);
      4. add ``registration_date_cor``, the earlier of the registration date
         and the user's first event. This corrects users who were active
         before their recorded registration;
      5. keep only rows whose corrected registration date is on or before the
         earliest event in the whole dataset. This drops users acquired after
         the start of the data collection window.

    Args:
        df: raw Sparkify events DataFrame.

    Returns:
        The filtered DataFrame with the five timestamp columns above added.
    """
    df = drop_null_userId(df)

    # ms epoch -> timestamp. Nulls pass through as null, because
    # ``None / 1000.0`` would raise TypeError inside the UDF and abort the job.
    from_ts_to_date = udf(
        lambda ms: dt.fromtimestamp(ms / 1000.0) if ms is not None else None,
        TimestampType(),
    )
    df = df.withColumn('registration_date', from_ts_to_date(col('registration')))
    df = df.withColumn('event_date', from_ts_to_date(col('ts')))

    # One shared per-user window. It is spelled 'userId' like the real column;
    # 'UserId' only resolved because spark.sql.caseSensitive defaults to false.
    user_window = Window.partitionBy('userId')
    df = df.withColumn('min_event_date', Fmin(col('event_date')).over(user_window))
    df = df.withColumn('max_event_date', Fmax(col('event_date')).over(user_window))

    # least() ignores nulls, so a null registration falls back to the first event.
    df = df.withColumn('registration_date_cor', least(col('registration_date'), col('min_event_date')))

    data_min_date = df.agg(Fmin('min_event_date')).collect()[0][0]
    df = df.where(col('registration_date_cor') <= data_min_date)

    return df

def save_df_to_json(df, path, mode=None):
    """Write *df* as JSON to *path* using Spark's DataFrameWriter.

    Args:
        df: DataFrame to save.
        path: output location for Spark's JSON writer.
        mode: optional Spark save mode ('overwrite', 'append', 'ignore',
            'error'/'errorifexists'). ``None`` keeps Spark's default, which
            fails if *path* already exists; pass 'overwrite' to make the
            notebook re-runnable.
    """
    df.write.json(path, mode=mode)

# Loading and cleaning dataset

In [4]:
# Input event log (mini version of the Sparkify dataset).
data_path = 'mini_sparkify_event_data.json'

In [5]:
# Load the raw events; persist() returns the same DataFrame, so it can be chained.
df = load_df(data_path).persist()
df

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [6]:
# Apply the cleaning step and cache the result.
df = clean_df(df).persist()
df

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string, registration_date: timestamp, event_date: timestamp, min_event_date: timestamp, max_event_date: timestamp, registration_date_cor: timestamp]

In [7]:
# Add device/experience/location columns and cache the result.
df = add_columns(df).persist()
df

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string, registration_date: timestamp, event_date: timestamp, min_event_date: timestamp, max_event_date: timestamp, registration_date_cor: timestamp, device: string, experience: string, location_state: string, state_group: string]

In [8]:
# Number of distinct users left after cleaning.
df.select('userId').distinct().count()

220

# Feature extraction

### List of features to build
Let's try to predict churn with the following features aggregated at user level: 
- gender
- total number of active days
- tenure (number of days since registration, as of the last day of the data collection window)
- total number of sessions
- total number of songs
- total listening length
- number of paid sessions
- total length of paid sessions
- number of free sessions
- total length of free sessions
- paid session share
- features related to page usage : 
    - number of songs played (NextSong)
    - number of songs added to a playlist
    - number of roll adverts
    - number of upgrade submissions
    - number of upgrade page visits
    - number of downgrade submissions
    - number of downgrade page visits
    - number of added friends
    - number of errors
    - number of help page visits
    - number of settings page visits
    - number of settings saves
    - number of thumbs-up
    - number of thumbs-down
    - number of logouts
    - number of home page visits
    - number of about page visits

In [9]:
# One row per distinct user; every feature below is left-joined onto this frame.
features = df.select('userId').distinct()

## Categorical features

### Gender

In [10]:
# Gender flag: 1 when gender is 'F', 0 otherwise (including null values).
gender = (
    df.select('userId', 'gender')
      .dropDuplicates()
      .select('userId', when(col('gender') == 'F', 1).otherwise(0).alias('gender'))
)
features = features.join(gender, on=['userId'], how='left')

## Numerical

In [11]:
# Per-event helper columns:
#   active_day: calendar date of the event
#   tenure:     days from the corrected registration date to the last event in the data
data_max_date = df.agg(Fmax('event_date')).collect()[0][0]
df = (
    df.withColumn('active_day', col('event_date').cast(DateType()))
      .withColumn('tenure', datediff(lit(data_max_date), col('registration_date_cor')))
)

In [12]:
# Tenure feature: one (userId, tenure) pair per user.
tenure = df.select('userId', 'tenure').distinct()
features = features.join(tenure, on=['userId'], how='left')

In [14]:
# Number of active days: distinct calendar days with at least one event.
active_days = (
    df.select('userId', 'active_day')
      .distinct()
      .groupBy('userId')
      .count()
      .withColumnRenamed('count', 'active_days_count')
)
features = features.join(active_days, on=['userId'], how='left')

In [15]:
# Total number of sessions: distinct sessionIds per user.
session_count_per_user = (
    df.select('userId', 'sessionId')
      .distinct()
      .groupBy('userId')
      .count()
      .withColumnRenamed('count', 'session_count')
)
features = features.join(session_count_per_user, on=['userId'], how='left')

In [16]:
# total number of songs per user
# Only events that carry a song are counted. Previously the null ``song`` of
# non-song events survived dropDuplicates as an extra pseudo-song, adding up to
# one phantom song per active day to every user's count.
# The (active_day, song) de-duplication is kept, so a track replayed on the
# same day counts once. Users without any song get null here; the na.fill(0)
# applied after the page-count join turns that into 0.
song_count_per_user = df.where(col('song').isNotNull())\
                                .select('userId','active_day','song')\
                                .dropDuplicates()\
                                .groupby('userId')\
                                .count()\
                                .withColumnRenamed("count", "song_count")
features = features.join(song_count_per_user,on=['userId'], how='left')

In [17]:
# Total listening length: sum of `length` over all of the user's events
# (Spark's sum skips null lengths).
total_length = (
    df.groupBy('userId')
      .agg(Fsum('length').alias('total_length'))
)
features = features.join(total_length, on=['userId'], how='left')

In [19]:
# number of paid sessions
# Distinct sessionIds in which the user had at least one 'paid'-level event.
# dropDuplicates is required: without it this counted paid events, not sessions.
paid_sessions = df.where(df.level == 'paid')\
                        .select('userId', 'sessionId')\
                        .dropDuplicates()\
                        .groupBy('userId')\
                        .count() \
                        .withColumnRenamed('count', 'paid_sessions')
features = features.join(paid_sessions,on=['userId'], how='left')

In [20]:
# Length of paid sessions: summed `length` of the user's 'paid'-level events.
paid_sessions_length = (
    df.where(col('level') == 'paid')
      .groupBy('userId')
      .agg(Fsum('length').alias('paid_sessions_length'))
)
features = features.join(paid_sessions_length, on=['userId'], how='left')

In [21]:
# number of free sessions
# Distinct sessionIds in which the user had at least one 'free'-level event.
# dropDuplicates is required: without it this counted free events, not sessions.
free_sessions = df.where(df.level == 'free')\
                        .select('userId', 'sessionId')\
                        .dropDuplicates()\
                        .groupBy('userId')\
                        .count() \
                        .withColumnRenamed('count', 'free_sessions')
features = features.join(free_sessions,on=['userId'], how='left')

In [22]:
# Length of free sessions: summed `length` of the user's 'free'-level events.
free_sessions_length = (
    df.where(col('level') == 'free')
      .groupBy('userId')
      .agg(Fsum('length').alias('free_sessions_length'))
)
features = features.join(free_sessions_length, on=['userId'], how='left')

In [23]:
# paid_session share
paid_session_share = paid_sessions.join(session_count_per_user, on=['userId'], how='left')\
                            .withColumn('paid_session_share',col('paid_sessions')/col('session_count'))\
                            .select('userId','paid_session_share')
features = features.join(paid_session_share,on=['userId'], how='left')

In [24]:
# Page usage: one column per page holding the user's visit count (0 if never visited).
# The cancellation pages are dropped so they cannot leak the churn label,
# which is derived from 'Cancellation Confirmation'.
page_count = (
    df.select('userId', 'page')
      .groupBy('userId')
      .pivot('page')
      .count()
      .drop('Cancel', 'Cancellation Confirmation')
      .fillna(0)
)
page_count.persist()

DataFrame[userId: string, About: bigint, Add Friend: bigint, Add to Playlist: bigint, Downgrade: bigint, Error: bigint, Help: bigint, Home: bigint, Logout: bigint, NextSong: bigint, Roll Advert: bigint, Save Settings: bigint, Settings: bigint, Submit Downgrade: bigint, Submit Upgrade: bigint, Thumbs Down: bigint, Thumbs Up: bigint, Upgrade: bigint]

In [25]:
# Join the page counts, then replace every remaining numeric null
# (users absent from a joined frame) with 0.
features = features.join(page_count, on=['userId'], how='left').na.fill(0)

In [26]:
# creating the churn flag
# A user is labelled as churned (1) if any of their events is a
# 'Cancellation Confirmation' page, and 0 otherwise.
df = df.withColumn('churn', when(df.page == 'Cancellation Confirmation', 1)\
                       .otherwise(0))
# Partition on 'userId' as the column is spelled everywhere else. 'UserId'
# only resolved because spark.sql.caseSensitive defaults to false.
df = df.withColumn('user_churn_flag', Fmax('churn').over(Window.partitionBy('userId')))
label = df.select('userId','user_churn_flag').dropDuplicates()

In [27]:
# Attach the per-user churn label to the feature table.
final_dataset = features.join(label, 'userId', 'left')

In [28]:
# Persist the feature table for the modeling notebook.
save_df_to_json(df=final_dataset, path='mini_sparkify_features.json')