# Feature Engineering
Summary:   
- Load and clean the data based on conclusions from the EDA process
- Generate features for monthly retention prediction
- Event frequency features for the last 1, 3, 7, 14, and 30 days
- Recency feature: number of days between a user's last activity and 2017-05-01
- Device feature: the device the user holds
- Total play time feature: total music playing time per user


### 1. Load and filter data

In [1]:
from pyspark import SparkContext
import pyspark.sql.functions as F
# We use matplotlib for plotting
import matplotlib.pyplot as plt

# This magic makes plots render inline in the notebook without an explicit plt.show() call
%matplotlib inline

In [2]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session that backs every DataFrame in this notebook.
# NOTE(review): the config key/value below looks like a placeholder rather than
# a real Spark setting - confirm whether it can be dropped.
spark = (
    SparkSession.builder
    .appName("Feature_Engineer")
    .config("Spark.some.config.option", "Value")
    .getOrCreate()
)

In [3]:
# Read the raw event log using its header row for column names. No schema is
# supplied, so every column arrives as a string. The frame is cached so that
# repeated scans do not re-read the file.
df = spark.read.option('header', True).csv('../data/event_ds.csv').cache()
df

DataFrame[uid: string, event: string, device: string, song_id: string, song_type: string, date: string, play_time: string, song_length: string, paid_flag: string]

In [4]:
# Peek at the first five raw events to sanity-check the load.
df.show(5)

+---------+-----+------+-------+---------+----------+---------+-----------+---------+
|      uid|event|device|song_id|song_type|      date|play_time|song_length|paid_flag|
+---------+-----+------+-------+---------+----------+---------+-----------+---------+
|167923158|    P|    ar| 985228|        0|2017-04-01|      251|        251|        0|
|168045107|    P|    ar|6818758|        0|2017-04-01|        0|       1683|        0|
|167580792|    P|    ar|6989313|        0|2017-04-01|      113|        113|        0|
|167575198|    P|    ar|9953526|        0|2017-04-01|      277|        277|        0|
|168045107|    P|    ar|6796145|        0|2017-04-01|        0|       1799|        0|
+---------+-----+------+-------+---------+----------+---------+-----------+---------+
only showing top 5 rows



In [5]:
# The CSV loader produced string columns only; give the columns used for date
# comparisons and numeric aggregation their proper types.
df = (
    df
    .withColumn('date', F.col('date').cast('date'))
    .withColumn('play_time', F.col('play_time').cast('int'))
    .withColumn('song_length', F.col('song_length').cast('int'))
)
df

DataFrame[uid: string, event: string, device: string, song_id: string, song_type: string, date: date, play_time: int, song_length: int, paid_flag: string]

In [6]:
# Keep every non-play event; keep a play event ('P') only when its song length is positive.
df = df.where((F.col('event') != 'P') | (F.col('song_length') > 0))

In [7]:
# Re-inspect the first five rows after the type casts and the song-length filter.
df.show(5)

+---------+-----+------+-------+---------+----------+---------+-----------+---------+
|      uid|event|device|song_id|song_type|      date|play_time|song_length|paid_flag|
+---------+-----+------+-------+---------+----------+---------+-----------+---------+
|167923158|    P|    ar| 985228|        0|2017-04-01|      251|        251|        0|
|168045107|    P|    ar|6818758|        0|2017-04-01|        0|       1683|        0|
|167580792|    P|    ar|6989313|        0|2017-04-01|      113|        113|        0|
|167575198|    P|    ar|9953526|        0|2017-04-01|      277|        277|        0|
|168045107|    P|    ar|6796145|        0|2017-04-01|        0|       1799|        0|
+---------+-----+------+-------+---------+----------+---------+-----------+---------+
only showing top 5 rows



In [8]:
# Total play time per user per calendar day, computed from play events ('P') only.
df_total_play_time = (
    df
    .where(F.col('event') == 'P')
    .groupBy('uid', 'date')
    .agg(F.sum('play_time').alias('total_play_time'))
)

In [14]:
# Collect the uids whose summed play time on any single day exceeds 24 * 60 * 60.
# NOTE(review): presumably play_time is in seconds, making this "more than 24
# hours of playback in one day" - confirm the unit.
abnormal_id = set(
    df_total_play_time
    .where(F.col('total_play_time') > 24 * 60 * 60)
    .select('uid')
    .toPandas()['uid']
)
# Drop every event belonging to those users, not just the offending days.
df_filtered = df.where(~F.col('uid').isin(abnormal_id))


### 2. Label definition

In [73]:
import datetime
from dateutil import parser

# Label window: `label_window_size` consecutive days ending on (and including)
# the end date; activity inside it decides the label.
label_window_size = 12
label_window_end_date = parser.parse('2017-05-12').date()
label_window_start_date = label_window_end_date - datetime.timedelta(days=label_window_size - 1)
print('label window:',label_window_start_date,'~',label_window_end_date,'days:',label_window_size)

# Feature window: the `feature_window_size` days that end the day before the
# label window starts, so the two windows never overlap.
feature_window_size = 30
feature_window_end_date = label_window_start_date - datetime.timedelta(days=1)
feature_window_start_date = feature_window_end_date - datetime.timedelta(days=feature_window_size - 1)
print('feature window:',feature_window_start_date,'~',feature_window_end_date,'days:',feature_window_size)

label window: 2017-05-01 ~ 2017-05-12 days: 12
feature window: 2017-04-01 ~ 2017-04-30 days: 30


In [74]:
# Modelling population: every uid with at least one event inside the feature
# window (both window bounds inclusive).
df_model_uid = (
    df_filtered
    .where(F.col('date').between(feature_window_start_date, feature_window_end_date))
    .select('uid')
    .distinct()
)
# Users with at least one event inside the label window count as active (label = 0).
df_active_uid_in_label_window = (
    df_filtered
    .where(F.col('date').between(label_window_start_date, label_window_end_date))
    .select('uid')
    .distinct()
    .withColumn('label', F.lit(0))
)

In [75]:
# Preview the users that were active in the label window (all carry label 0).
df_active_uid_in_label_window.show()

+---------+-----+
|      uid|label|
+---------+-----+
|167833570|    0|
|167935507|    0|
|167635050|    0|
|167677985|    0|
|167672887|    0|
|167883821|    0|
|167774834|    0|
|167794834|    0|
|167772496|    0|
|167718296|    0|
|167867356|    0|
|168034652|    0|
|167841001|    0|
|153361173|    0|
|167882566|    0|
|167792246|    0|
|167983710|    0|
|167979065|    0|
|167696079|    0|
|167851766|    0|
+---------+-----+
only showing top 20 rows



In [76]:
# Build the label table: start from every modelled uid and left-join the active
# users. A uid with no match has a null label, i.e. it was not seen in the label
# window, and is filled with 1 (churn). Active users keep label 0.
df_label = (
    df_model_uid
    .join(df_active_uid_in_label_window, on='uid', how='left')
    .fillna(1)
)

In [77]:
# Class balance check: number of churned (1) versus active (0) users.
df_label.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|68074|
|    0|62366|
+-----+-----+



### 3. Feature generation

In [78]:
# Restrict the cleaned events to the feature window (bounds inclusive); every
# feature below is computed from this frame only.
df_feature_window = df_filtered.where(
    F.col('date').between(feature_window_start_date, feature_window_end_date)
)

#### 3.1 Frequency features

In [79]:
# define a function to generate frequency features for a list of time windows
# using when().otherwise(), and list comprehension trick!
def frequency_feature_generation_time_windows(df,event,time_window_list,snapshot_date):
    """
    Count one event type per user over several trailing time windows.

    Parameters
    ----------
    df : DataFrame with at least the columns 'uid', 'event' and 'date'.
    event : value of the 'event' column to count (used in the output column names).
    time_window_list : iterable of window lengths in days.
    snapshot_date : last day (inclusive) of every window.

    Returns
    -------
    DataFrame with one row per uid that has at least one `event` row in `df`,
    and one column 'freq_<event>_last_<n>' per window length n holding the
    number of `event` rows dated within the n days ending on `snapshot_date`.
    """
    def _count_in_window(days):
        # A window of `days` days ending on snapshot_date starts days-1 days earlier.
        window_start = snapshot_date - datetime.timedelta(days - 1)
        inside = (F.col('date') >= window_start) & (F.col('date') <= snapshot_date)
        # Sum a 1/0 indicator so that all windows are computed in a single aggregation pass.
        return F.sum(F.when(inside, 1).otherwise(0)).alias('freq_' + event + '_last_' + str(days))

    event_rows = df.filter(F.col('event') == event)
    window_counts = [_count_in_window(days) for days in time_window_list]
    return event_rows.groupBy('uid').agg(*window_counts)

In [80]:
# Smoke-test the generator: one event type ('S'), all window lengths, with the
# windows anchored on the last day of the feature window.
event, time_window_list = 'S', [1, 3, 7, 14, 30]
snapshot_date = feature_window_end_date
df_feature = frequency_feature_generation_time_windows(
    df=df_feature_window,
    event=event,
    time_window_list=time_window_list,
    snapshot_date=snapshot_date,
)
df_feature.show(n=5)

+---------+-------------+-------------+-------------+--------------+--------------+
|      uid|freq_S_last_1|freq_S_last_3|freq_S_last_7|freq_S_last_14|freq_S_last_30|
+---------+-------------+-------------+-------------+--------------+--------------+
|167718831|            0|           43|           47|            64|           136|
|167871297|            1|            3|            3|             3|            31|
|167979065|            0|            0|            0|             0|            14|
|167819127|            0|            0|            0|             0|            16|
|167823358|            1|            1|            1|             1|            30|
+---------+-------------+-------------+-------------+--------------+--------------+
only showing top 5 rows



In [81]:
# Build one frequency-feature table per event type, each covering every window length.
event_list = ['P', 'D', 'S']
time_window_list = [1, 3, 7, 14, 30]
df_feature_list = [
    frequency_feature_generation_time_windows(df_feature_window, event, time_window_list, snapshot_date)
    for event in event_list
]


#### 3.2 Recency features

In [87]:
# Recency = days between a user's most recent event in the feature window and
# the first day of the label window. The same aggregation could also be run
# separately for each event type.
(
    df_feature_window
    .groupBy('uid')
    .agg(F.datediff(F.lit(label_window_start_date), F.max('date')).alias('recency'))
    .show()
)


+---------+-------+
|      uid|recency|
+---------+-------+
|167979065|     13|
|167946935|      1|
|168071350|      1|
|167596545|     27|
|167586424|     29|
|167570658|      8|
|167819127|     22|
|167794834|      1|
|167696079|      1|
|167887737|      2|
|167981938|      2|
|167929480|     30|
|167641186|     27|
|167976118|      1|
|167587629|     28|
|167992594|      1|
|167587246|      1|
|167925319|     17|
|167762174|      3|
|168015877|      4|
+---------+-------+
only showing top 20 rows



In [90]:
# Recency feature: days from each user's latest event in the feature window to
# the start of the label window.
df_recency = (
    df_feature_window
    .groupBy('uid')
    .agg(F.datediff(F.lit(label_window_start_date), F.max('date')).alias('recency'))
)

#### 3.3 Profile features

In [92]:
# Device feature: exactly one row per uid.
# A plain select('uid', 'device').distinct() yields one row per (uid, device)
# pair, so a user seen on more than one device would be duplicated when this
# table is left-joined on 'uid'. Instead, keep each user's most frequently seen
# device in the feature window. Ties are broken by the larger device value,
# which keeps the result deterministic.
df_device = (
    df_feature_window
    .groupBy('uid', 'device')
    .count()
    .groupBy('uid')
    # max over a (count, device) struct compares count first, then device
    .agg(F.max(F.struct('count', 'device')).alias('top_device'))
    .select('uid', F.col('top_device.device').alias('device'))
)

#### 3.4 Total play time features

In [94]:
# Total play time per user across the whole feature window (play events only).
df_play_time = (
    df_feature_window
    .where(F.col('event') == 'P')
    .groupBy('uid')
    .agg(F.sum('play_time').alias('total_play_time'))
)


#### 3.5 Fancier frequency features

In [97]:
# Per user, count the play events whose play time reached at least 80% of the
# song length. The count covers the whole feature window (no per-window split here).
(
    df_feature_window
    .where(F.col('event') == 'P')
    .groupBy('uid')
    .agg(
        F.sum(
            F.when(F.col('play_time') >= 0.8 * F.col('song_length'), 1).otherwise(0)
        ).alias('80% play counts')
    )
    .show()
)

+---------+---------------+
|      uid|80% play counts|
+---------+---------------+
|167979065|             34|
|167946935|            516|
|168071350|            276|
|167596545|             18|
|167586424|             22|
|167570658|            269|
|167819127|             67|
|167794834|            808|
|167696079|            217|
|167887737|            356|
|167981938|             44|
|167929480|              8|
|167641186|             75|
|167976118|            130|
|167587629|             12|
|167992594|             29|
|167587246|            424|
|167925319|            144|
|167762174|            151|
|168015877|            104|
+---------+---------------+
only showing top 20 rows



In [98]:
# Feature: number of play events per user, over the whole feature window, whose
# play time reached at least 80% of the song length.
df_over80play_counts = (
    df_feature_window
    .where(F.col('event') == 'P')
    .groupBy('uid')
    .agg(
        F.sum(
            F.when(F.col('play_time') >= 0.8 * F.col('song_length'), 1).otherwise(0)
        ).alias('80% play counts')
    )
)

### 4. Form training data

In [111]:
def join_feature_data(df_master,df_feature_list):
    """
    Left-join each feature table onto the master table by 'uid'.

    Parameters
    ----------
    df_master : DataFrame holding one row per uid to keep.
    df_feature_list : iterable of DataFrames that each contain a 'uid' column.

    Returns
    -------
    The master table with the columns of every feature table appended; rows
    without a match in a feature table get nulls for that table's columns.
    With an empty list, df_master is returned unchanged.
    """
    joined = df_master
    for feature_table in df_feature_list:
        joined = joined.join(feature_table, on='uid', how='left')
        # Mark every intermediate result for caching.
        joined.persist()
    return joined
    

In [112]:
# Assemble the modelling table: left-join every feature table onto the labels
# in this order - frequency features, recency, device, total play time,
# 80%-play counts.
df_model_final = join_feature_data(
    df_label,
    df_feature_list + [df_recency, df_device, df_play_time, df_over80play_counts],
)

In [113]:
# Replace remaining nulls with 0, collect the table to the driver as a pandas
# DataFrame and write it out as CSV without the index column.
(
    df_model_final
    .fillna(0)
    .toPandas()
    .to_csv('../data/df_model_final.csv', index=False)
)