# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [95]:
# import libraries
import datetime
import numpy as np
import time
import pandas as pd
import pyspark.sql.functions as F

from lightgbm import LGBMClassifier
from sklearn.feature_selection import SelectFromModel, RFE
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StandardScaler, OneHotEncoderEstimator, StringIndexer, VectorAssembler # PCA, IDF, 
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col, udf
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import IntegerType, StringType, TimestampType

In [2]:
# start (or reuse) the Spark session named "Sparkify" on the local master
spark = (
    SparkSession.builder
    .master("local")
    .appName("Sparkify")
    .getOrCreate()
)

# Create Feature Dataframe
The `mini_sparkify_event_data.json` file is used to create the features relevant for churn prediction. Here we use the information gained during the data exploration and visualization.

In [3]:
# read the raw event log from the JSON file into a Spark dataframe
events = spark.read.json("mini_sparkify_event_data.json")

The `userId` column contains empty strings for some entries. These entries need to be removed.

In [4]:
# an empty-string userId marks an invalid event; keep only the remaining rows
valid_events = events.filter(col("userId") != "")

# report how many events survive the filter and how many distinct users they belong to
total_count = events.count()
valid_count = valid_events.count()
print("Number of total events: {}; number of valid events {}".format(total_count, valid_count))
user_count = valid_events.select("userId").distinct().count()
print("Number of users: {}".format(user_count))

Number of total events: 286500; number of valid events 278154
Number of users: 225


In [5]:
# udf converting a timestamp given in milliseconds (divided by 1000 to get seconds)
# into a datetime, returned to Spark as a TimestampType value
get_date = udf(
    lambda ts_ms: datetime.datetime.fromtimestamp(ts_ms / 1000.0),
    TimestampType(),
)

In [6]:
# add the column "log_date": the "ts" value of each log entry converted with the get_date udf
valid_events = valid_events.withColumn("log_date", get_date(col("ts")))

In [7]:
# udf flagging churn: 1 when the page is "Cancellation Confirmation", otherwise 0
find_churn = udf(
    lambda page: int(page == "Cancellation Confirmation"),
    IntegerType(),
)

In [8]:
# add a column "churn" to the dataframe: 1 on the event where a cancellation was confirmed, 0 otherwise
valid_events = valid_events.withColumn("churn", find_churn(col("page")))

In [9]:
# per-user mean of the "length" column, exposed under the name "avg_length"
user_length = (
    valid_events
    .groupBy("userId")
    .agg({"length": "mean"})
    .withColumnRenamed("avg(length)", "avg_length")
)

In [10]:
# Several features are later put in relation to how long a user has been active,
# so collect one row per user with:
#   churned    - sum of the churn flags
#   first_log  - earliest log date
#   last_log   - latest log date
#   log_counts - number of log entries (all actions)
#   last_ts    - largest raw timestamp
time_df = (
    valid_events
    .groupBy(["userId"])
    .agg(
        F.sum("churn").alias("churned"),
        F.min("log_date").alias("first_log"),
        F.max("log_date").alias("last_log"),
        F.count("page").alias("log_counts"),
        F.max("ts").alias("last_ts"),
    )
)

In [11]:
def get_time_difference(date_1, date_2):
    """Calculates the time difference between two dates in whole days.

    A difference of zero whole days is reported as 1, so the result can be
    used as a divisor.

    :param date_1: First date
    :type: datetime instance
    :param date_2: Second date
    :type: datetime instance
    :return: Whole days from date_1 to date_2, or 1 if that number is 0
    """
    elapsed_days = (date_2 - date_1).days
    # 0 is falsy, so a same-day difference falls through to the minimum of 1
    return elapsed_days or 1

# expose the helper to Spark as a udf with an integer result
get_time_difference_udf = udf(get_time_difference, IntegerType())

# "duration" = days between first and last log date; afterwards the two boundary
# dates are dropped and the churn sum is renamed to "label"
time_df = (
    time_df
    .withColumn("duration", get_time_difference_udf(col("first_log"), col("last_log")))
    .drop("first_log", "last_log")
    .withColumnRenamed("churned", "label")
)

In [12]:
# one row per user and one column per page action (About, Thumbs Up, ...), holding how
# often that action occurs for the user; the "Cancel" and "Cancellation Confirmation"
# columns are removed and missing counts are filled with 0
dummy_df = (
    valid_events
    .select("userId", "page")
    .groupBy("userId")
    .pivot("page")
    .count()
    .drop("Cancel", "Cancellation Confirmation")
    .na.fill(0)
)

In [13]:
# last valid user level
# Sorting before groupBy does not guarantee which row F.first() sees once the data has been
# shuffled, so the most recent level is selected explicitly: structs compare field by field,
# hence the max of (ts, level) is the entry with the largest timestamp for each user.
user_level = (
    valid_events
    .groupBy("userId")
    .agg(F.max(F.struct(col("ts"), col("level"))).alias("latest_event"))
    .select("userId", col("latest_event.level").alias("valid_level"))
)

In [14]:
# gender of the users: the distinct (userId, gender) combinations found in the valid events
user_gender = valid_events.select(["userId", "gender"]).distinct()

In [37]:
# count the calendar days on which each user played music:
# first one row per (user, date) that has at least one "NextSong" event, marked with a 1 ...
songs_per_date = (
    valid_events
    .withColumn("date", F.to_date(col("log_date")))
    .where(col("page") == "NextSong")
    .groupBy(["userId", "date"])
    .agg(F.lit(1).alias("played_music"))
)
# ... then the sum of those markers per user
songs_per_day = (
    songs_per_date
    .groupBy("userId")
    .agg(F.sum("played_music").alias("music_days"))
)

In [38]:
# combine all per-user dataframes built so far, matching rows on "userId"
df = (
    time_df
    .join(dummy_df, on="userId")
    .join(user_level, on="userId")
    .join(user_gender, on="userId")
    .join(user_length, on="userId")
    .join(songs_per_day, on="userId")
)

In [39]:
# divide the actions by the amount of logs or the overall duration of their registration
def divide_columns_by(df, columns, value, appendix):
    """Generic function for dividing pyspark columns by a column or a constant.

    For every name in ``columns`` a new column ``<name>_<appendix>`` is added
    that holds the original column divided by ``value``.

    :param df: Dataframe containing the columns to divide
    :type: Pyspark Dataframe
    :param columns: Names of the columns to divide
    :type: list[str]
    :param value: Divisor; a string is interpreted as the name of a column of
        ``df``, anything else (e.g. an int or float) is used as the divisor directly
    :type: str, int or float
    :param appendix: String that is appended to the names of the new columns
    :type: str
    :return: Pyspark dataframe with new columns containing the division
    """
    # a string names the divisor column; numeric values can be divided by directly
    divisor = col(value) if isinstance(value, str) else value
    for name in columns:
        new_name = name + "_" + appendix
        df = df.withColumn(new_name, col(name) / divisor)
    return df

In [40]:
# columns that get normalised by a per-user quantity such as the duration or the log count
cols_to_divide = [
    "music_days",
    "About",
    "Add Friend",
    "Add to Playlist",
    "Downgrade",
    "Error",
    "Help",
    "Home",
    "Logout",
    "NextSong",
    "Roll Advert",
    "Save Settings",
    "Settings",
    "Submit Downgrade",
    "Submit Upgrade",
    "Thumbs Down",
    "Thumbs Up",
    "Upgrade",
]

In [41]:
# add "<column>_per_day" columns: each listed column divided by the user's duration
df_duration = divide_columns_by(df, cols_to_divide, value="duration", appendix="per_day")
# final feature dataframe: additionally "<column>_per_log", divided by the user's log count
df_features = divide_columns_by(df_duration, cols_to_divide, value="log_counts", appendix="per_log")

In [None]:
# number of distinct sessions per user
user_sessions = (
    valid_events
    .groupBy(["userId"])
    .agg(F.countDistinct("sessionId").alias("number_sessions"))
)

df_features = (
    df_features
    .join(user_sessions, on="userId")
    # average number of sessions per day of the user's duration
    .withColumn("sessions_per_day", col("number_sessions") / col("duration"))
    # ratio of days the user actually listened to music
    .withColumn("ratio_music_days", col("music_days") / col("duration"))
    # "Thumbs Up" count relative to the number of "NextSong" events
    .withColumn("like_ratio", col("Thumbs Up") / col("NextSong"))
)

# Feature Selection
Once the dataframe with all the features exists, the importance of the features is analyzed to find the ones that seem promising for the model. The feature importance is stored in a pandas dataframe and also saved in a csv-file. This csv-file is loaded before the modelling to decide which features to keep and which ones to drop. For this, five different feature selection algorithms are implemented (see reference https://towardsdatascience.com/the-5-feature-selection-algorithms-every-data-scientist-need-to-know-3a6b566efd2).

Selecting features by:
* correlation
* feature importance of random forest classifier (select from model)
* recursive feature elimination
* feature importance of logistic regression (select from model)
* feature importance of light gbm classifier (select from model)

In [46]:
# convert the Spark feature dataframe to a pandas dataframe for the feature analysis below
pd_df = df_features.toPandas()
# maximum number of features each selection method may keep
num_features = 25

In [None]:
# response vector: the "label" column
y = pd_df["label"]
# feature matrix: every remaining column whose dtype is one of the listed ones
X = pd_df.drop(columns=["label"]).select_dtypes(
    include=["int64", "int32", "float64", "datetime64[ns]"]
)

In [47]:
def cor_selector(X, y, num_feats):
    """Selects features based on pearson correlation.

    The features are ranked by the absolute value of their correlation with
    ``y``; a correlation that cannot be computed (NaN, e.g. for a constant
    column) counts as 0.

    :param X: Matrix with the feature values
    :type: Pandas Dataframe
    :param y: Response vector
    :type: Pandas Series
    :param num_feats: Number of features to keep; values <= 0 keep none
    :type: Int
    :return:
        cor_support: Boolean list (one entry per column of X), True if selected
        cor_feature: List of selected feature names, ordered by increasing
            absolute correlation
    """
    feature_name = X.columns.tolist()
    # constant columns make corrcoef divide by a zero standard deviation; the resulting
    # NaN is handled below, so the floating point warning is silenced here
    with np.errstate(divide="ignore", invalid="ignore"):
        cor_list = [np.corrcoef(X[name], y)[0, 1] for name in feature_name]
    # replace NaN with 0
    cor_list = [0 if np.isnan(cor) else cor for cor in cor_list]
    if num_feats > 0:
        top_positions = np.argsort(np.abs(cor_list))[-num_feats:]
    else:
        # a slice of [-0:] would return every position instead of none
        top_positions = []
    cor_feature = [feature_name[position] for position in top_positions]
    # boolean mask over all features: True for selected, False otherwise
    selected = set(cor_feature)
    cor_support = [name in selected for name in feature_name]
    return cor_support, cor_feature

In [49]:
# select the num_features features with the highest absolute pearson correlation to the label
cor_support, cor_features = cor_selector(X, y, num_features)

In [50]:
def select_from_model(X, y, num_features):
    """Selects features based on the feature importance of a random forest classifier.

    Prints the number of selected features.

    :param X: Matrix with the feature values
    :type: Pandas Dataframe
    :param y: Response vector
    :type: Pandas Series
    :param num_features: Maximum number of features to keep (passed as
        ``max_features`` to SelectFromModel, so fewer may be selected)
    :type: Int
    :return:
        embeded_rf_support: Boolean mask over the columns of X, True if selected
        embeded_rf_feature: List of selected feature names
    """
    # Imported locally under an alias: at module level the name RandomForestClassifier is
    # rebound by the later pyspark.ml.classification import, and that class is not a
    # scikit-learn estimator.
    from sklearn.ensemble import RandomForestClassifier as SklearnRandomForestClassifier

    embeded_rf_selector = SelectFromModel(SklearnRandomForestClassifier(n_estimators=100),
                                          max_features=num_features)
    embeded_rf_selector.fit(X, y)

    embeded_rf_support = embeded_rf_selector.get_support()
    embeded_rf_feature = X.loc[:, embeded_rf_support].columns.tolist()
    print(str(len(embeded_rf_feature)), 'selected features')
    return embeded_rf_support, embeded_rf_feature

In [51]:
# select at most num_features features using the random forest based selector
embeded_rf_support, embeded_rf_feature = select_from_model(X, y, num_features)

16 selected features


In [52]:
def select_by_rfe(X, y, num_features):
    """Selects features based on recursive feature elimination.

    The features are standardised (zero mean, unit standard deviation) and a
    logistic regression is fitted repeatedly, eliminating up to 10 features
    per step. Prints the number of selected features.

    :param X: Matrix with the feature values
    :type: Pandas Dataframe
    :param y: Response vector
    :type: Pandas Series
    :param num_features: Number of features to keep
    :type: Int
    :return:
        rfe_support: Boolean mask over the columns of X, True if selected
        rfe_feature: List of selected feature names
    """
    # Imported locally under an alias: at module level the name LogisticRegression is
    # rebound by the later pyspark.ml.classification import, and that class is not a
    # scikit-learn estimator.
    from sklearn.linear_model import LogisticRegression as SklearnLogisticRegression

    # a constant column has a standard deviation of 0; dividing by 1 instead keeps it at 0
    # after centering rather than turning it into NaN
    X_std = X.std().replace(0, 1)
    X_norm = (X - X.mean()) / X_std
    rfe_selector = RFE(estimator=SklearnLogisticRegression(), n_features_to_select=num_features,
                       step=10, verbose=5)
    rfe_selector.fit(X_norm, y)
    rfe_support = rfe_selector.get_support()
    rfe_feature = X.loc[:, rfe_support].columns.tolist()
    print(str(len(rfe_feature)), 'selected features')
    return rfe_support, rfe_feature

In [53]:
# select num_features features by recursive feature elimination
rfe_support, rfe_feature = select_by_rfe(X, y, num_features)

Fitting estimator with 62 features.
Fitting estimator with 52 features.
Fitting estimator with 42 features.
Fitting estimator with 32 features.
25 selected features




In [54]:
def select_from_model_lr(X, y, num_features):
    """Selects features based on an L1-penalised logistic regression.

    The features are standardised (zero mean, unit standard deviation) before
    fitting. Prints the number of selected features.

    :param X: Matrix with the feature values
    :type: Pandas Dataframe
    :param y: Response vector
    :type: Pandas Series
    :param num_features: Maximum number of features to keep (passed as
        ``max_features`` to SelectFromModel)
    :type: Int
    :return:
        embeded_lr_support: Boolean mask over the columns of X, True if selected
        embeded_lr_feature: List of selected feature names
    """
    # Imported locally under an alias: at module level the name LogisticRegression is
    # rebound by the later pyspark.ml.classification import, and that class is not a
    # scikit-learn estimator.
    from sklearn.linear_model import LogisticRegression as SklearnLogisticRegression

    # a constant column has a standard deviation of 0; dividing by 1 instead keeps it at 0
    # after centering rather than turning it into NaN
    X_std = X.std().replace(0, 1)
    X_norm = (X - X.mean()) / X_std
    # the l1 penalty needs a solver that supports it, so liblinear is requested explicitly
    embeded_lr_selector = SelectFromModel(SklearnLogisticRegression(penalty="l1", solver="liblinear"),
                                          max_features=num_features)
    embeded_lr_selector.fit(X_norm, y)

    embeded_lr_support = embeded_lr_selector.get_support()
    embeded_lr_feature = X.loc[:, embeded_lr_support].columns.tolist()
    print(str(len(embeded_lr_feature)), 'selected features')
    return embeded_lr_support, embeded_lr_feature

In [55]:
# select at most num_features features using the logistic regression based selector
embeded_lr_support, embeded_lr_feature = select_from_model_lr(X, y, num_features)

25 selected features




In [56]:
def select_by_lgbm(X, y, num_features):
    """Selects features based on the feature importance of an LGBM classifier.

    Prints the number of selected features.

    :param X: Matrix with the feature values
    :type: Pandas Dataframe
    :param y: Response vector
    :type: Pandas Series
    :param num_features: Passed as ``max_features`` to SelectFromModel
    :type: Int
    :return:
        embeded_lgb_support: Boolean mask over the columns of X, True if selected
        embeded_lgb_feature: List of selected feature names
    """
    # hyperparameters of the classifier whose importances drive the selection
    lgbm_params = {
        "n_estimators": 500,
        "learning_rate": 0.05,
        "num_leaves": 32,
        "colsample_bytree": 0.2,
        "reg_alpha": 3,
        "reg_lambda": 1,
        "min_split_gain": 0.01,
        "min_child_weight": 40,
    }
    selector = SelectFromModel(LGBMClassifier(**lgbm_params), max_features=num_features)
    selector.fit(X, y)

    embeded_lgb_support = selector.get_support()
    embeded_lgb_feature = X.columns[embeded_lgb_support].tolist()
    print(len(embeded_lgb_feature), 'selected features')
    return embeded_lgb_support, embeded_lgb_feature

In [57]:
# select at most num_features features using the LightGBM based selector
embeded_lgb_support, embeded_lgb_feature = select_by_lgbm(X, y, num_features)

25 selected features


In [58]:
# prepare feature importance dataframe: one row per feature, one boolean column per method
feature_name = X.columns
feature_selection_df = pd.DataFrame({'Feature':feature_name, 'Pearson':cor_support, 'RFE':rfe_support, 'Logistics':embeded_lr_support,
                                    'Random Forest':embeded_rf_support, 'LightGBM':embeded_lgb_support})
# count how many methods selected each feature; only the boolean method columns are summed,
# because a row-wise sum over the whole frame would also try to add the string "Feature" column
method_columns = ['Pearson', 'RFE', 'Logistics', 'Random Forest', 'LightGBM']
feature_selection_df['Total'] = feature_selection_df[method_columns].sum(axis=1)
# sort values according to importance (ties ordered by feature name, both descending)
feature_selection_df = feature_selection_df.sort_values(['Total','Feature'] , ascending=False)

In [64]:
# display the feature importance table for all features (the index is reset to a regular column)
feature_selection_df.reset_index().head(len(feature_name))

Unnamed: 0,index,Feature,Pearson,RFE,Logistics,Random Forest,LightGBM,Total
0,1,last_ts,True,True,True,True,True,5
1,2,duration,True,True,True,True,True,5
2,21,music_days,True,True,True,False,True,4
3,55,Thumbs Down_per_log,True,True,True,True,False,4
4,36,Submit Upgrade_per_day,True,True,True,True,False,4
5,49,NextSong_per_log,True,True,True,True,False,4
6,24,Add Friend_per_day,True,True,True,False,True,4
7,3,About,True,True,True,False,True,4
8,40,music_days_per_log,False,True,True,True,False,3
9,22,music_days_per_day,True,False,False,True,True,3


In [65]:
# store the feature importance dataframe in the csv-file "feature_selection_df.csv"
feature_selection_df.to_csv("feature_selection_df.csv")