# Feature engineering

Having explored the dataset, I now need to prepare the data for use with a prediction model. To do this, I need to create the feature columns and the label column.

In [14]:
# import libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import datetime
import pandas as pd
from pyspark.sql.window import Window

In [2]:
# Build the Spark session for this notebook; getOrCreate() reuses an
# already-running session if there is one.
spark = (
    SparkSession.builder
    .appName("Sparkify_Project")
    .getOrCreate()
)

In [6]:
# Read the cleaned event log (JSON) into a Spark DataFrame
path = "mini_sparkify_event_data_cleaned.json"
data = spark.read.format("json").load(path)

Let's start with the label column. We need to identify all users who have cancelled their subscriptions. We can define churn using the Cancellation Confirmation events:

In [8]:
# Collect every user that has at least one "Cancellation Confirmation" event
# and tag them with cancelled = "1" (one row per user).
is_cancellation = F.col("page") == "Cancellation Confirmation"

users_churn = (
    data
    .where(is_cancellation)
    .select("userId")
    .distinct()
    .withColumn("cancelled", F.lit("1"))
)

In [11]:
# Number of distinct users with a "Cancellation Confirmation" event
users_churn.count()

52

In [10]:
# Attach the churn flag to every event row: the left join leaves "cancelled"
# null for users absent from users_churn, and those rows are mapped to "0".
churn_flag = F.when(F.col("cancelled") == "1", "1").otherwise("0")

data_labels = (
    data
    .join(users_churn, on=["userId"], how="left")
    .withColumn("cancelled", churn_flag)
)

Now let's build the features to train the model on. By analyzing the different interactions users can make, I came up with the following features:

- total time on a paid version 
- time on a paid or free version
- number of songs played
- number of Thumbs Down 
- number of Roll Advert
- number of Add to Playlist
- number of Add Friend
- number of Thumbs Up
- number of Errors

Let's implement these features:

In [16]:
# Total time on paid version / whole

def _activity_span_seconds(events, span_col):
    """Return one row per user with the time between their first and last event.

    Parameters
    ----------
    events : pyspark.sql.DataFrame
        Event rows with at least the ``userId`` and ``ts`` columns.
    span_col : str
        Name of the output column holding the span.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``userId`` and ``span_col``, where the span is
        ``unix_timestamp(max(ts)) - unix_timestamp(min(ts))``.
        ``unix_timestamp`` returns seconds, so the span is in seconds.
    """
    # A single grouped aggregate yields the earliest and latest event per user
    # directly; min/max skip null timestamps.
    return (
        events
        .groupBy("userId")
        .agg(
            (F.unix_timestamp(F.max("ts")) - F.unix_timestamp(F.min("ts"))).alias(span_col)
        )
    )


# NOTE: this is the span between a user's first and last event with
# level == "paid". Any free-tier period between those two events is included.
time_on_paid_version = (
    _activity_span_seconds(
        data_labels.filter(F.col("level") == "paid"),
        "time_on_paid_version",
    )
    # Keep the feature numeric even if the span could not be computed.
    .withColumn(
        "time_on_paid_version",
        F.coalesce(F.col("time_on_paid_version"), F.lit(0)),
    )
)

# Span between a user's first and last event, regardless of subscription level.
time_whole = _activity_span_seconds(data_labels, "time_whole")


In [17]:
# Other features:

# (page name, per-user count column) pairs; list order fixes the column order.
page_count_columns = [
    ("NextSong", "number_of_songs_played"),
    ("Thumbs Down", "number_of_thumbs_down"),
    ("Thumbs Up", "number_of_thumbs_up"),
    ("Roll Advert", "number_of_roll_advert"),
    ("Add to Playlist", "number_of_add_to_playlist"),
    ("Add Friend", "number_of_add_friend"),
    ("Error", "number_of_errors"),
]

# when() without otherwise() is null for non-matching rows and count() skips
# nulls, so each expression counts the events of exactly one page type.
page_counts = [
    F.count(F.when(F.col("page") == page_name, True)).alias(column_name)
    for page_name, column_name in page_count_columns
]

# One row per user: event counts plus the churn label.
per_user_counts = data_labels.groupBy("userId").agg(
    *page_counts,
    F.first("cancelled").alias("label")
)

# Users with no "paid" events have no row in time_on_paid_version, so the
# left join leaves a null there; treat it as zero time on the paid tier.
paid_time_or_zero = (
    F.when(F.col("time_on_paid_version").isNull(), 0)
    .otherwise(F.col("time_on_paid_version"))
)

data_features = (
    per_user_counts
    .join(time_on_paid_version, on=["userId"], how="left")
    .withColumn("time_on_paid_version", paid_time_or_zero)
    .join(time_whole, on=["userId"], how="left")
)

In [18]:
# Preview the first 10 feature rows ordered by userId.
# NOTE(review): the output order (10, 100, 100001, ...) looks lexicographic,
# so userId is presumably a string column; confirm before relying on the sort.
data_features.orderBy("userId").show(10)

+------+----------------------+---------------------+-------------------+---------------------+-------------------------+--------------------+----------------+-----+--------------------+----------+
|userId|number_of_songs_played|number_of_thumbs_down|number_of_thumbs_up|number_of_roll_advert|number_of_add_to_playlist|number_of_add_friend|number_of_errors|label|time_on_paid_version|time_whole|
+------+----------------------+---------------------+-------------------+---------------------+-------------------------+--------------------+----------------+-----+--------------------+----------+
|    10|                   673|                    4|                 37|                    1|                        9|                  12|               0|    0|             3670168|   3670168|
|   100|                  2682|                   27|                148|                   25|                       61|                  49|               3|    0|             5098287|   5098287|
|100001|  

In [19]:
# Row count of the feature table (built with groupBy("userId"))
data_features.count()

225

In [None]:
# It looks good. Let's save the dataset with the features:

In [20]:
# Write the feature table as JSON, replacing any previous output.
# coalesce(1) collapses the data to a single partition before writing.
path_save = "mini_sparkify_data_features.json"
(
    data_features
    .coalesce(1)
    .write
    .mode("overwrite")
    .format("json")
    .save(path_save)
)