## Work Summary

1. Load data into Spark DataFrame and perform brief data exploration
2. Label definition
3. Data cleaning  
  3.1 Data processing of "song_length"  
  3.2 Data processing of "play_time"  
4. Feature generation  
  4.1 Generate Features A: Play time percentage related features  
  4.1.1 Generate Features A1: Proportion features of different levels of play time percentage  
  4.1.2 Generate Features A2: Acceleration features of different levels of play time percentage    
  
  4.2 Generate Features B: Play_time related features  
  4.2.1 Generate Features B1: Total play_time  
  4.2.2 Generate Features B2: Acceleration features of total play_time  
  4.2.3 Generate Features B3: Average play_time of songs  
  
  4.3 Generate Features C: Event related features  
  4.3.1 Generate Event features C1: Event frequency in given windows  
  4.3.2 Generate Event features C2: Ratio of event frequency of nearest 7 days to that of nearest 30 days  
  
  4.4 Generate Features D: Recency related features  
  4.4.1 Generate Recency features D1: Last Event Time from feature_window_end_date  
  
  4.5 Generate Features E: Profile related features  
  4.5.1 Generate Profile features E1: device_feature  
5. Form training data for prediction models  
6. Form rating data for recommendation system  

## 1. Load data into Spark DataFrame and perform brief data exploration

#### Load data into Spark DataFrame

In [1]:
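from pyspark.sql import SparkSession

# create (or reuse) the SparkSession that provides the `spark` object used below
spark = SparkSession.builder.getOrCreate()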

In [2]:
import pyspark.sql.functions as F

In [3]:
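df = spark.read.csv('../data/event_ds.csv',header=True).cache()
# cache() is lazy: the data is only cached when the first action (e.g. count()) runs
# header=True without inferSchema reads every column as a string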
df

DataFrame[uid: string, event: string, device: string, song_id: string, date: string, play_time: string, song_length: string]

In [4]:
# withColumn creates a new column or overwrites an existing one; here 'date' is cast from string to date
df = df.withColumn('date',F.col('date').cast('date'))
df

DataFrame[uid: string, event: string, device: string, song_id: string, date: date, play_time: string, song_length: string]

In [5]:
df.show(5)

+---------+-----+------+-------+----------+---------+-----------+
|      uid|event|device|song_id|      date|play_time|song_length|
+---------+-----+------+-------+----------+---------+-----------+
|168540455|    P|    ar| 298250|2017-03-30|      189|        190|
|168535490|    P|    ar|6616004|2017-03-30|      283|        283|
|168530895|    P|    ar|      0|2017-03-30|      264|        265|
|168551548|    P|    ar|1474915|2017-03-30|        5|        243|
|168551509|    P|    ar|6329735|2017-03-30|      289|        289|
+---------+-----+------+-------+----------+---------+-----------+
only showing top 5 rows



#### Brief data exploration

In [6]:
# simple count rows
print("total number of entries:", df.count())

total number of entries: 12445200


In [7]:
# count distinct uid values, i.e. the number of users
print("total number of users:", df.select('uid').distinct().count())

total number of users: 59754


In [8]:
# group by aggregation
df.groupBy('event').count().show()

+-----+--------+
|event|   count|
+-----+--------+
|    D|  646004|
|    S|  779581|
|    P|11019615|
+-----+--------+



In [17]:
# show() prints the table and returns None, so print the caption separately
print("The most popular songs:")
df.groupby('song_id').count().sort(F.col('count').desc()).show(5)

The most popular songs:
+--------+------+
| song_id| count|
+--------+------+
|       0|914738|
|    null|785038|
| 9950164| 87436|
|15249349| 55854|
| 5237384| 41825|
+--------+------+
only showing top 5 rows


In [18]:
# show() prints the table and returns None, so print the caption separately
print("The most active users:")
df.groupby('uid').count().sort(F.col('count').desc()).show(5)

The most active users:
+---------+-----+
|      uid|count|
+---------+-----+
|168954949| 7131|
|167925318| 6829|
|167979374| 6343|
|168416042| 5564|
|168442087| 5447|
+---------+-----+
only showing top 5 rows


## 2. Label definition

#### We define churn and the time windows as follows:
#### 1 for churn: the user has P/D/S entries in the 'Feature window' but no P/D/S entries in the 'Label window'
#### 0 for not churn: the user has P/D/S entries in both windows
#### Label window: 2017-04-29 ~ 2017-05-12 (14 days)
#### Feature window: 2017-03-30 ~ 2017-04-28 (30 days)
##### Note: users who have P/D/S entries in the 'Label window' but none in the 'Feature window' are ignored.
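A minimal plain-Python sketch of the labelling rule above, assuming events are available as `(uid, date)` pairs. The function name `label_users` and the toy events are hypothetical; the notebook itself implements the rule with Spark filters and a left join in the cells below.

```python
from datetime import date, timedelta


def label_users(events, label_end, label_size=14, feature_size=30):
    """Return {uid: label} for every user active in the feature window.

    events: iterable of (uid, datetime.date) pairs (hypothetical input format).
    label 1 = churn (no activity in the label window), 0 = still active.
    """
    label_start = label_end - timedelta(days=label_size - 1)
    feature_end = label_start - timedelta(days=1)
    feature_start = feature_end - timedelta(days=feature_size - 1)

    feature_uids, label_uids = set(), set()
    for uid, d in events:
        if feature_start <= d <= feature_end:
            feature_uids.add(uid)
        elif label_start <= d <= label_end:
            label_uids.add(uid)
    # users seen only in the label window never enter feature_uids, so they are ignored
    return {uid: 0 if uid in label_uids else 1 for uid in feature_uids}


# usage (toy data): "a" stays active, "b" churns, "c" appears only in the label window
events = [("a", date(2017, 4, 1)), ("a", date(2017, 5, 1)),
          ("b", date(2017, 4, 10)), ("c", date(2017, 5, 2))]
print(sorted(label_users(events, date(2017, 5, 12)).items()))  # [('a', 0), ('b', 1)]
```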

In [116]:
import datetime
from dateutil import parser

label_window_size = 14
label_window_end_date = parser.parse('2017-05-12').date()
label_window_start_date = label_window_end_date - datetime.timedelta(label_window_size - 1)
print('label window:',label_window_start_date,'~',label_window_end_date,'days:',label_window_size)

feature_window_size = 30
feature_window_end_date = label_window_start_date - datetime.timedelta(1)
feature_window_start_date = feature_window_end_date  - datetime.timedelta(feature_window_size - 1)
print('feature window:',feature_window_start_date,'~',feature_window_end_date,'days:',feature_window_size)

label window: 2017-04-29 ~ 2017-05-12 days: 14
feature window: 2017-03-30 ~ 2017-04-28 days: 30


In [117]:
# all the uid in feature window we will model
df_model_uid = df.filter((F.col('date')>=feature_window_start_date) & (F.col('date')<=feature_window_end_date))\
                    .select('uid').distinct()
# active uid in label window (active label=0)
df_active_uid_in_label_window = df.filter((F.col('date')>=label_window_start_date) & (F.col('date')<=label_window_end_date))\
                            .select('uid').distinct().withColumn('label',F.lit(0))

In [118]:
# prepare label data (churn label=1; active label=0)
df_label = df_model_uid.join(df_active_uid_in_label_window,on=['uid'],how='left')
df_label = df_label.fillna(1)   # 1 for churn
print("Sample size of users in feature window is", df_label.count())

Sample size of users in feature window is 58432


In [119]:
df_label.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|36272|
|    0|22160|
+-----+-----+



##### There are 36,272 churned users (label 1) and 22,160 active users (label 0), about 62% vs. 38%, so the two classes are reasonably balanced.

## 3. Data cleaning

In [120]:
# event_data in feature_window
df_feature_window = df.filter((F.col('date')>=feature_window_start_date) & (F.col('date')<=feature_window_end_date))

In [121]:
df_feature_window.count()

9557651

In [122]:
df_feature_window.describe().show()

+-------+--------------------+-------+-------+--------------------+-------------------+-------------------+
|summary|                 uid|  event| device|             song_id|          play_time|        song_length|
+-------+--------------------+-------+-------+--------------------+-------------------+-------------------+
|  count|             9557651|9557651|9557651|             8934375|            8391865|            8393403|
|   mean|1.6642767226301807E8|   null|   null| 1.55567109663087E14|  28226.20623478178|-1231.9704732739265|
| stddev|1.4495014041825246E7|   null|   null|4.486923339065008...|7.927618484536014E7| 1820892.4337731672|
|    min|           100077577|      D|     ar|                  -1|       -0.011976957|                 -1|
|    max|            99850419|      S|     ip|             9999722|                nan|                999|
+-------+--------------------+-------+-------+--------------------+-------------------+-------------------+



In [123]:
df_play = df_feature_window.filter(F.col('event') == 'P') 
df_play.describe().show()

+-------+--------------------+-------+-------+--------------------+-------------------+-------------------+
|summary|                 uid|  event| device|             song_id|          play_time|        song_length|
+-------+--------------------+-------+-------+--------------------+-------------------+-------------------+
|  count|             8395152|8395152|8395152|             8391657|            8391865|            8393403|
|   mean|1.6628846441030693E8|   null|   null|1.656281819333759...|  28226.20623478178|-1231.9704732739265|
| stddev| 1.500893981882352E7|   null|   null|4.629741133728094...|7.927618484536014E7| 1820892.4337731672|
|    min|           100077577|      P|     ar|                  -1|       -0.011976957|                 -1|
|    max|            99850419|      P|     ip|             9999722|                nan|                999|
+-------+--------------------+-------+-------+--------------------+-------------------+-------------------+



#### We notice the following issues with the data:

1/ Some songs may have zero or negative 'song_length'; if so, these values need to be handled.  
2/ The mean of 'song_length' is negative, which suggests outliers that need to be excluded.  
3/ Column 'play_time' contains 'nan' and negative values, which need to be handled.  
4/ The mean of 'play_time' is implausibly large (about 28,226), which suggests outliers that need to be excluded.  

### 3.1 Data processing of "song_length"

##### Now we solve the following problems:

1/ Some songs may have zero or negative 'song_length'; if so, these values need to be handled.  
2/ The mean of 'song_length' is negative, which suggests outliers that need to be excluded.  

##### Count NULL song_length

In [124]:
# count NULL song_length
print("There are", 
      df_play.filter(F.col('song_length').isNull()).count(),
      "songs with NULL song_length")

There are 1749 songs with NULL song_length


##### Let's check whether the records with NULL song_length also have NULL song_id and NULL play_time

In [125]:
print("There are", 
      df_play.filter(F.col('song_length').isNull()).filter(F.col('song_id').isNull()).filter(F.col('play_time').isNull()).count(),
      "songs with NULL song_length, song_id and play_time")

There are 1749 songs with NULL song_length, song_id and play_time


##### All records with NULL song_length also have NULL song_id and NULL play_time. Let's count these odd records by uid.

In [126]:
odd_songs_by_uid = df_play.filter(F.col('song_length').isNull()) \
                                                      .filter(F.col('song_id').isNull()) \
                                                      .filter(F.col('play_time').isNull()) \
                                                      .groupBy(F.col('uid')).count().sort(F.col('count').desc())
print("These odd songs are played by", 
      odd_songs_by_uid.count(),\
     "uid")

These odd songs are played by 17 uid


##### Because these odd records come from only 17 users, we remove them directly.

In [127]:
df_play_song_length_without_nan = df_play.filter(F.col('song_length').isNotNull())
df_feature_window_cleaned = df_feature_window.filter(F.col('event') == 'P').filter(F.col('song_length').isNotNull()) \
                                  .union(df_feature_window.filter(F.col('event') == 'D')) \
                                  .union(df_feature_window.filter(F.col('event') == 'S'))

##### Check whether there are songs with “song_length == 0” but “play_time ≠ 0”; if so, “song_length == 0” has to be handled.

In [128]:
# to check if there are songs with “song_length == 0” while “play_time ≠ 0”
df_play_song_length_without_nan = df_play_song_length_without_nan \
                                            .withColumn('song_length', F.col('song_length').cast('int'))

print("There are", 
      df_play_song_length_without_nan.filter((F.col('song_length') == 0) & (F.col('play_time') != 0)).count(),\
     "songs with “song_length == 0” while “play_time ≠ 0”")

There are 287252 songs with “song_length == 0” while “play_time ≠ 0”


##### We need to handle the songs with “song_length == 0”, but first we check whether the churn rate of songs with “song_length == 0” differs noticeably from that of songs with “song_length != 0”.

In [129]:
# compare the label_column rate (churn rate) of rows where feature_column == 0 with that of all other rows
def whether_churn_rate_significant_diff_with_zero(df, feature_column, label_column):
    feature_column_vs_label = df \
                 .withColumn("feature_column_unnormal", F.when(F.col(feature_column) == 0, 1).otherwise(0)) \
                 .agg(F.sum(F.when((F.col("feature_column_unnormal") == 1) & (F.col(label_column) == 1),1).otherwise(0)).alias('feature_column_unnormal_label_column'),
                      F.sum(F.when((F.col("feature_column_unnormal") == 1) & (F.col(label_column) == 0),1).otherwise(0)).alias('feature_column_unnormal_not_label_column'),
                      F.sum(F.when((F.col("feature_column_unnormal") == 0) & (F.col(label_column) == 1),1).otherwise(0)).alias('feature_column_normal_label_column'),
                      F.sum(F.when((F.col("feature_column_unnormal") == 0) & (F.col(label_column) == 0),1).otherwise(0)).alias('feature_column_normal_not_label_column'))
    feature_column_vs_label = feature_column_vs_label.select(F.round(F.col('feature_column_unnormal_label_column') / (F.col('feature_column_unnormal_label_column') + F.col('feature_column_unnormal_not_label_column')),2).alias(feature_column + '_unnormal_churn_rate'),\
                                                     F.round(F.col('feature_column_normal_label_column') / (F.col('feature_column_normal_label_column') + F.col('feature_column_normal_not_label_column')),2).alias(feature_column + '_normal_churn_rate'))
    return feature_column_vs_label   

In [130]:
# show whether the churn rate of songs with “song_length == 0” differs from that of songs with “song_length != 0”
df_play_song_length_without_nan = df_play_song_length_without_nan.join(df_label,on='uid',how='left')
whether_churn_rate_significant_diff_with_zero(df_play_song_length_without_nan, "song_length", "label").show()

+-------------------------------+-----------------------------+
|song_length_unnormal_churn_rate|song_length_normal_churn_rate|
+-------------------------------+-----------------------------+
|                           0.32|                         0.26|
+-------------------------------+-----------------------------+



##### From the table above, the churn rate of songs with “song_length == 0” (0.32) is close to that of songs with “song_length != 0” (0.26). We therefore assume that “song_length == 0” occurs without a strong pattern and replace zero (and negative) song_length with the mean of column "song_length", computed after removing outliers.
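The comparison above eyeballs two rounded rates. One way to make "significantly different" precise is a two-proportion z-test on the four counts that `whether_churn_rate_significant_diff_with_zero` already aggregates. A stdlib-only sketch; the function name and the toy counts are hypothetical. With millions of rows, even small rate gaps tend to test as significant, so the size of the gap matters as much as the p-value.

```python
import math


def two_proportion_z_test(churn_a, total_a, churn_b, total_b):
    """Two-sided z-test for H0: churn rate of group A == churn rate of group B.

    Returns (rate_a, rate_b, z, p_value) using the pooled-proportion standard error.
    """
    p_a = churn_a / total_a
    p_b = churn_b / total_b
    pooled = (churn_a + churn_b) / (total_a + total_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / total_a + 1 / total_b))
    if se == 0:
        raise ValueError("pooled rate is 0 or 1; the test is undefined")
    z = (p_a - p_b) / se
    # two-sided p-value: 2 * (1 - Phi(|z|)) == erfc(|z| / sqrt(2))
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return p_a, p_b, z, p_value


# usage (toy counts): 60/100 churned vs 40/100 churned
print(two_proportion_z_test(60, 100, 40, 100))  # rates 0.6 and 0.4, z ≈ 2.83, p ≈ 0.0047
```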

##### Get the 99th percentile of song_length as 'top_song_length_threshold', and calculate the mean of the non-negative "song_length" records
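The cells below find the threshold by sorting descending, keeping the top 1% (`int(n * 0.01)` rows) and taking the smallest kept value. A plain-Python sketch of that same rule, handy for checking it on a small list; `top_percent_threshold` is a hypothetical helper. On the Spark side, `DataFrame.approxQuantile(col, [0.99], relativeError)` returns an approximate 99th percentile without a full sort, at the cost of a small, bounded rank error.

```python
def top_percent_threshold(values, top_fraction=0.01):
    """Smallest value among the top `top_fraction` of `values` (sort-and-limit rule).

    Mirrors: sort descending, limit(int(n * top_fraction)), then take the minimum.
    """
    cut = int(len(values) * top_fraction)
    if cut == 0:
        # limit(0) would return no rows, so take(1)[0] would fail in Spark as well
        raise ValueError("too few values: int(n * top_fraction) is 0")
    top = sorted(values, reverse=True)[:cut]
    return min(top)


print(top_percent_threshold(list(range(1, 1001))))  # 991
```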

In [131]:
# keep non-negative song_length, sort descending, and keep the top 1% of rows
df_play_song_length_larger_or_equals_to_zero = df_play_song_length_without_nan.filter(F.col('song_length') >= 0)

top_song_length_threshold_cut = int(df_play_song_length_larger_or_equals_to_zero.count() * 0.01)    # top 1%

df_play_song_length_larger_or_equals_to_zero_cut = df_play_song_length_larger_or_equals_to_zero \
                                                        .sort(F.col("song_length").desc()) \
                                                        .select(F.col('song_length')) \
                                                        .limit(top_song_length_threshold_cut)     # limit to top 1% song_length 

In [132]:
# the smallest value in the top 1% is the 99th percentile, used as top_song_length_threshold to remove outliers
top_song_length_threshold = df_play_song_length_larger_or_equals_to_zero_cut.sort(F.col("song_length")).take(1)[0][0]
print('99th percentile of song_length (top_song_length_threshold) used to remove outliers is', str(top_song_length_threshold))

99th percentile of song_length (top_song_length_threshold) used to remove outliers is 1333


In [133]:
# filter song_length less than 'top_song_length_threshold' 
# then calculate mean value of song_length

from pyspark.sql.functions import col, avg

song_length_mean = df_play_song_length_larger_or_equals_to_zero\
                            .withColumn("song_length_is_outlier", F.when(F.col('song_length') >= top_song_length_threshold, 1).otherwise(0)) \
                            .filter(F.col("song_length_is_outlier") == 0) \
                            .agg(F.round(avg(col("song_length")), 0).alias('song_length_mean'))

song_length_mean = song_length_mean.take(1)[0][0]
print('mean value of larger or equal to zero "song_length" records is', str(song_length_mean))

mean value of larger or equal to zero "song_length" records is 223.0


##### Replace negative and zero song_length with 'song_length_mean', and remove outliers whose 'song_length' is greater than 'top_song_length_threshold'

In [136]:
# Replace negative and zero song_length with 'song_length_mean', and remove song_length outlier

df_play_song_length_cleaned = df_play_song_length_without_nan \
                            .withColumn('song_length', F.when(F.col('song_length') <= 0, song_length_mean).otherwise(F.col('song_length'))) \
                            .filter(F.col("song_length") <= top_song_length_threshold) 

df_feature_window_cleaned = df_feature_window_cleaned.withColumn('song_length', F.col('song_length').cast('int'))\
                                  .filter(F.col('event') == 'P').filter(F.col('song_length') <= top_song_length_threshold) \
                                  .union(df_feature_window.filter(F.col('event') == 'D')) \
                                  .union(df_feature_window.filter(F.col('event') == 'S'))

##### We finished the cleaning of 'song_length'. 
### 3.2 Data processing of "play_time"
##### Next, we solve the following two problems:

3/ Column 'play_time' contains 'nan' and negative values, which need to be handled.  
4/ The mean of 'play_time' is implausibly large (about 28,226), which suggests outliers that need to be excluded.  

##### count NULL play_time

In [138]:
# count NULL play_time
print("There are", 
      df_play_song_length_cleaned.filter(F.col('play_time').isNull()).count(),\
     "songs with NULL play_time")

There are 1183 songs with NULL play_time


##### Before handling the songs with NULL play_time, we check whether the churn rate of songs with NULL play_time differs noticeably from that of songs with non-NULL play_time.

In [139]:
def whether_churn_rate_significant_diff_with_nan(df, feature_column, label_column):
    feature_column_vs_label = df \
                 .withColumn("feature_column_unnormal", F.when(F.col(feature_column).isNull(), 1).otherwise(0)) \
                 .agg(F.sum(F.when((F.col("feature_column_unnormal") == 1) & (F.col(label_column) == 1),1).otherwise(0)).alias('feature_column_unnormal_label_column'),
                      F.sum(F.when((F.col("feature_column_unnormal") == 1) & (F.col(label_column) == 0),1).otherwise(0)).alias('feature_column_unnormal_not_label_column'),
                      F.sum(F.when((F.col("feature_column_unnormal") == 0) & (F.col(label_column) == 1),1).otherwise(0)).alias('feature_column_normal_label_column'),
                      F.sum(F.when((F.col("feature_column_unnormal") == 0) & (F.col(label_column) == 0),1).otherwise(0)).alias('feature_column_normal_not_label_column'))
    feature_column_vs_label = feature_column_vs_label.select(F.round(F.col('feature_column_unnormal_label_column') / (F.col('feature_column_unnormal_label_column') + F.col('feature_column_unnormal_not_label_column')),2).alias(feature_column + '_unnormal_churn_rate'),\
                                                     F.round(F.col('feature_column_normal_label_column') / (F.col('feature_column_normal_label_column') + F.col('feature_column_normal_not_label_column')),2).alias(feature_column + '_normal_churn_rate'))
    return feature_column_vs_label      

In [140]:
# show whether the churn rate of songs with NULL play_time differs from that of songs with non-NULL play_time
whether_churn_rate_significant_diff_with_nan(df_play_song_length_cleaned, "play_time", "label").show()

+-----------------------------+---------------------------+
|play_time_unnormal_churn_rate|play_time_normal_churn_rate|
+-----------------------------+---------------------------+
|                         0.28|                       0.27|
+-----------------------------+---------------------------+



##### From the table above, the churn rate of songs with NULL play_time (0.28) is very close to that of songs with non-NULL play_time (0.27). We therefore assume that NULL play_time occurs without a pattern and replace NULL (and 'nan') values with the mean of the “play_time >= 0” records.

##### count negative play_time

In [141]:
# keep non-NULL play_time and convert column 'play_time' to 'int'
df_play_play_time_not_nan = df_play_song_length_cleaned.filter(F.col('play_time').isNotNull())\
                            .withColumn('play_time',F.col('play_time').cast('int')) 

In [142]:
# count negative play_time
print("There are", 
      df_play_play_time_not_nan.filter(F.col('play_time') < 0).count(),\
     "songs with negative play_time")

There are 6 songs with negative play_time


##### As there are only 6 songs with negative play_time, we handle them together with the NULL play_time records and replace them with the mean of the “play_time >= 0” records.

##### Get the 99th percentile of play_time as 'top_play_time_threshold', and calculate the mean of the non-negative "play_time" records

In [143]:
# keep non-negative play_time, sort descending, and keep the top 1% of rows
df_play_play_time_larger_or_equals_to_zero = df_play_play_time_not_nan.filter(F.col('play_time') >= 0)

top_play_time_threshold_cut = int(df_play_play_time_larger_or_equals_to_zero.count() * 0.01)    # top 1%

df_play_play_time_larger_or_equals_to_zero_cut = df_play_play_time_larger_or_equals_to_zero \
                                                        .sort(F.col("play_time").desc()) \
                                                        .select(F.col('play_time')) \
                                                        .limit(top_play_time_threshold_cut)     # limit top 1% play_time 

In [144]:
# the smallest value in the top 1% is the 99th percentile, used as top_play_time_threshold to remove outliers
top_play_time_threshold = df_play_play_time_larger_or_equals_to_zero_cut.sort(F.col("play_time")).take(1)[0][0]
print('99th percentile of play_time (top_play_time_threshold) used to remove outliers is', str(top_play_time_threshold))

99th percentile of play_time (top_play_time_threshold) used to remove outliers is 11830


In [145]:
# filter play_time less than 'top_play_time_threshold' 
# then calculate mean value of play_time
play_time_mean = df_play_play_time_larger_or_equals_to_zero\
                            .withColumn("play_time_is_outlier", F.when(F.col('play_time') >= top_play_time_threshold, 1).otherwise(0)) \
                            .filter(F.col("play_time_is_outlier") == 0) \
                            .agg(F.round(avg(col("play_time")), 0).alias('play_time_mean'))

play_time_mean = play_time_mean.take(1)[0][0]
print('mean value of larger or equal to zero "play_time" records is', str(play_time_mean))

mean value of larger or equal to zero "play_time" records is 144.0


#### Replace NULL and negative play_time with 'play_time_mean', and remove outliers whose 'play_time' is greater than 'top_play_time_threshold'
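The rule applied in the next two cells, written out for a single raw value: NULL, the string `'nan'` and negative values are replaced by the mean, and anything above the 99th-percentile threshold is dropped. A plain-Python sketch; `clean_play_time` is hypothetical, its defaults 144.0 and 11830 are the `play_time_mean` and `top_play_time_threshold` values printed above, and returning `None` stands for "filter this row out".

```python
import math


def clean_play_time(raw, mean=144.0, threshold=11830):
    """Clean one raw play_time value (string or None); return None if the row is dropped."""
    if raw is None or raw == "nan":
        return mean              # impute missing values with the mean
    value = float(raw)
    if math.isnan(value):        # other NaN spellings such as "NaN"
        return mean
    # truncate toward zero; assumed to match Spark's cast('int') on decimal strings
    value = int(value)
    if value < 0:
        return mean              # impute negative values with the mean
    if value > threshold:
        return None              # drop outliers above the 99th percentile
    return float(value)


print([clean_play_time(v) for v in [None, "nan", "-3", "189", "20000"]])
# [144.0, 144.0, 144.0, 189.0, None]
```

Note that a tiny negative value such as `-0.011976957` (the minimum seen in the raw data) truncates to 0 and is kept as 0 rather than imputed.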

In [146]:
# Replace NULL and the string 'nan' in play_time with 'play_time_mean'
df_play_song_length_play_time_cleaned = df_play_song_length_cleaned.withColumn("play_time", F.when(F.col('play_time').isNull(), play_time_mean).otherwise(F.col('play_time')))  
df_play_song_length_play_time_cleaned = df_play_song_length_play_time_cleaned.withColumn("play_time", F.when(F.col('play_time') == 'nan', play_time_mean).otherwise(F.col('play_time'))) 

In [147]:
df_play_song_length_play_time_cleaned = df_play_song_length_play_time_cleaned.withColumn('play_time',F.col('play_time').cast('int')) \
                            .withColumn('play_time', F.when(F.col('play_time') < 0, play_time_mean).otherwise(F.col('play_time'))) \
                            .filter(F.col("play_time") <= top_play_time_threshold) 


df_feature_window_cleaned = df_feature_window_cleaned.withColumn('play_time', F.col('play_time').cast('int'))\
                                  .filter(F.col('event') == 'P').filter(F.col('play_time') <= top_play_time_threshold) \
                                  .union(df_feature_window.filter(F.col('event') == 'D')) \
                                  .union(df_feature_window.filter(F.col('event') == 'S'))

In [148]:
df_play_song_length_play_time_cleaned.describe().show()

+-------+--------------------+-------+-------+--------------------+------------------+-----------------+-------------------+
|summary|                 uid|  event| device|             song_id|         play_time|      song_length|              label|
+-------+--------------------+-------+-------+--------------------+------------------+-----------------+-------------------+
|  count|             8182785|8182785|8182785|             8181042|           8182785|          8182785|            8182785|
|   mean| 1.663321958387472E8|   null|   null|1.682889676009665...| 144.4299803795407|240.9282491230064|0.26507148849688705|
| stddev|1.4871156627324002E7|   null|   null|4.679619339481816...|272.75805647582456|95.71229897828493| 0.4413712930063083|
|    min|           100077577|      P|     ar|                  -1|               0.0|              1.0|                  0|
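|    max|            99850419|      P|     ip|             9999722|           11830.0|           1333.0|                  1|
+-------+--------------------+-------+-------+--------------------+------------------+-----------------+-------------------+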


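##### From the table above, the statistics of 'play_time' and 'song_length' now look reasonable: play_time ranges from 0 to 11,830 with a mean of about 144, and song_length ranges from 1 to 1,333 with a mean of about 241.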
##### Next, we start to generate features.

## 4. Feature generation

### 4.1 Generate Features A: Play time percentage related features

### 4.1.1 Generate Features A1: Proportion features of different levels of play time percentage

#### Generate play time percentage proportion features:
#### time_percentage_0_to_20, time_percentage_20_to_40, time_percentage_40_to_60, time_percentage_60_to_80, and time_percentage_larger_than_80
#### For example, time_percentage_0_to_20 is, per uid, the ratio of (number of played songs with 0 <= play_time / song_length < 0.2) to (total number of played songs)
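For a single user, the five proportions can be sketched in plain Python as below. The helper `percentage_proportions` is hypothetical. It applies the same 2-decimal rounding as the `play_time_percentage_of_song_length` column, but counts the last bucket directly instead of as `1 - sum(other four)`, so the rounding of the other columns does not leak into it. Ratios above 1 (play_time longer than song_length) fall into the last bucket.

```python
def percentage_proportions(play_times, song_lengths, edges=(20, 40, 60, 80)):
    """Share of one user's plays in each play-time-percentage bucket.

    Buckets (percent of song length): [0, 20), [20, 40), [40, 60), [60, 80), [80, inf).
    """
    # same 2-decimal rounding as the play_time_percentage_of_song_length column
    ratios = [round(p / s, 2) for p, s in zip(play_times, song_lengths)]
    n = len(ratios)
    features = {}
    lower = 0
    for upper in edges:
        hits = sum(1 for r in ratios if lower / 100 <= r < upper / 100)
        features[f"time_percentage_{lower}_to_{upper}"] = round(hits / n, 2)
        lower = upper
    # last bucket counted directly; includes ratios above 1.0
    hits = sum(1 for r in ratios if r >= edges[-1] / 100)
    features[f"time_percentage_larger_than_{edges[-1]}"] = round(hits / n, 2)
    return features


f = percentage_proportions([10, 50, 100, 150, 190, 250], [200] * 6)
print(f["time_percentage_larger_than_80"])  # 0.33
```

In this toy example the four other buckets each round to 0.17, so `1 - 0.17 * 4` would give 0.32 for the last bucket instead of 0.33.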

In [149]:
# generate column 'play_time_percentage_of_song_length'
df_play_song_length_play_time_cleaned = df_play_song_length_play_time_cleaned. \
                                            withColumn('play_time_percentage_of_song_length', 
                                                       F.round((F.col('play_time') / F.col('song_length')),2))

In [150]:
# define a function to generate percentage features for proportion of different level of play time percentage

def percentage_proportion_feature_generation_by_percentage_level(df, play_time_percentage_list, percentage_gap):
    """
    generate percentage features for proportion of different play time percentage levels
    """
    pct = F.col('play_time_percentage_of_song_length')
    df_feature = df \
        .filter(F.col('event') == 'P') \
        .groupBy('uid') \
        .agg(*[F.round(F.sum(F.when((pct >= (play_time_percentage - percentage_gap) / 100) & (pct < play_time_percentage / 100), 1).otherwise(0)) / \
             F.count(pct), 2) \
             .alias('time_percentage_' + str(play_time_percentage - percentage_gap) + '_to_' + str(play_time_percentage)) \
             for play_time_percentage in play_time_percentage_list]
            ) # *[...] unpacks the list of column expressions into separate agg() arguments
    return df_feature

In [151]:
# generate proportion features for play time percentage levels 0-20, 20-40, 40-60 and 60-80
play_time_percentage_list = [20,40,60,80]
percentage_gap = 20
df_percentage_proportion_feature = percentage_proportion_feature_generation_by_percentage_level(df_play_song_length_play_time_cleaned, play_time_percentage_list, percentage_gap)
# the remaining share (>= 80%, including plays longer than the song) is 1 minus the other four proportions
df_percentage_proportion_feature = df_percentage_proportion_feature.withColumn("time_percentage_larger_than_80",F.round(1 - F.col('time_percentage_0_to_20') - F.col('time_percentage_20_to_40')\
                                 - F.col('time_percentage_40_to_60')- F.col('time_percentage_60_to_80'), 2))

In [152]:
df_percentage_proportion_feature.show(5)

+---------+-----------------------+------------------------+------------------------+------------------------+------------------------------+
|      uid|time_percentage_0_to_20|time_percentage_20_to_40|time_percentage_40_to_60|time_percentage_60_to_80|time_percentage_larger_than_80|
+---------+-----------------------+------------------------+------------------------+------------------------+------------------------------+
|104777734|                    1.0|                     0.0|                     0.0|                     0.0|                           0.0|
| 11596711|                   0.47|                    0.11|                    0.02|                    0.06|                          0.34|
|118301183|                    0.7|                     0.1|                     0.0|                     0.0|                           0.2|
|151294213|                   0.81|                     0.0|                     0.0|                    0.13|                          0.06|
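+---------+-----------------------+------------------------+------------------------+------------------------+------------------------------+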

In [153]:
# create a list to concatenate features generated so far
features_to_final_join = []
features_to_final_join.append(df_percentage_proportion_feature)

### 4.1.2 Generate Features A2: Acceleration features of different level of play time percentage

#### Ratio of the number of songs played to at least 80% of their length in the last 7 days to that in the last 30 days
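The acceleration ratio for one user, as a plain-Python sketch: count qualifying plays dated within the last 7 days of the feature window and divide by the count within the last 30 days, both windows ending at the snapshot date inclusive. `window_ratio` and its optional `weights` argument are hypothetical; passing play_time values as weights gives the total_play_time variant of 4.2.2. Spark outside ANSI mode returns NULL when dividing by zero; this sketch returns `None` explicitly in that case.

```python
from datetime import date, timedelta


def window_ratio(dates, snapshot_date, short_days=7, long_days=30, weights=None):
    """Ratio of (weighted) activity in the last `short_days` to the last `long_days`.

    Both windows end at snapshot_date (inclusive); weights default to 1 per event.
    """
    if weights is None:
        weights = [1] * len(dates)
    short_start = snapshot_date - timedelta(days=short_days - 1)
    long_start = snapshot_date - timedelta(days=long_days - 1)
    short_total = sum(w for d, w in zip(dates, weights) if short_start <= d <= snapshot_date)
    long_total = sum(w for d, w in zip(dates, weights) if long_start <= d <= snapshot_date)
    if long_total == 0:
        return None
    return round(short_total / long_total, 2)


# usage (toy data): two of four plays fall in the last 7 days before 2017-04-28
example_dates = [date(2017, 4, 28), date(2017, 4, 22), date(2017, 4, 21), date(2017, 3, 30)]
print(window_ratio(example_dates, date(2017, 4, 28)))  # 0.5
```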

In [154]:
# generate feature: ratio of the count of songs played to >= 80% of their length in the last 7 days to that in the last 30 days
shorter_time_window = 7
longer_time_window = 30
snapshot_date = feature_window_end_date
df_percentage_acceleration_feature = df_play_song_length_play_time_cleaned \
                            .filter(F.col('play_time_percentage_of_song_length') >= 0.8) \
                            .groupBy('uid') \
                            .agg(F.round(F.sum(F.when((F.col('date') >= snapshot_date-datetime.timedelta(shorter_time_window-1)) & (F.col('date')<=snapshot_date),1).otherwise(0))/ \
                             F.sum(F.when((F.col('date') >= snapshot_date-datetime.timedelta(longer_time_window-1)) & (F.col('date')<=snapshot_date),1).otherwise(0)), 2) \
                            .alias('time_percentage_larger_than_80' + '_' + str(shorter_time_window) + 'd_over_' + str(longer_time_window) + 'd'))

In [155]:
df_percentage_acceleration_feature.show(5)

+---------+------------------------------------------+
|      uid|time_percentage_larger_than_80_7d_over_30d|
+---------+------------------------------------------+
| 11596711|                                      0.45|
|118301183|                                       0.0|
|151294213|                                       0.0|
|166601616|                                      0.45|
|167570658|                                       0.1|
+---------+------------------------------------------+
only showing top 5 rows



In [156]:
# concatenate features generated so far
features_to_final_join.append(df_percentage_acceleration_feature)

### 4.2 Generate Features B: Play_time related features

### 4.2.1 Generate Features B1: Total play_time

In [157]:
# generate 'total_play_time'
total_play_time_feature = df_play_song_length_play_time_cleaned \
        .groupBy('uid') \
        .agg(F.sum(F.col('play_time').cast('int')).alias('total_play_time'))

In [158]:
total_play_time_feature.show(5)

+---------+---------------+
|      uid|total_play_time|
+---------+---------------+
|104777734|             37|
| 11596711|          11476|
|118301183|            611|
|151294213|            737|
|166601616|           7238|
+---------+---------------+
only showing top 5 rows



In [159]:
# collect this feature table for the final join
features_to_final_join.append(total_play_time_feature)

#### 4.2.2 Generate Features B2: Acceleration features of total play_time

##### Ratio of total play_time in the nearest 7 days to that in the nearest 30 days

In [160]:
# generate feature as the ratio of total play_time in the nearest 7 days to that in the nearest 30 days
# (play_time is cast to int, as in total_play_time above)
shorter_time_window = 7
longer_time_window = 30
df_total_play_time_acceleration_feature = df_play_song_length_play_time_cleaned \
                            .groupBy('uid') \
                            .agg(F.round(F.sum(F.when((F.col('date') >= snapshot_date-datetime.timedelta(shorter_time_window-1)) & (F.col('date')<=snapshot_date), F.col('play_time').cast('int')).otherwise(0))/ \
                             F.sum(F.when((F.col('date') >= snapshot_date-datetime.timedelta(longer_time_window-1)) & (F.col('date')<=snapshot_date), F.col('play_time').cast('int')).otherwise(0)),2) \
                            .alias('total_play_time' + '_' + str(shorter_time_window) + 'd_over_' + str(longer_time_window) + 'd'))

In [194]:
df_total_play_time_acceleration_feature.show(5)

+---------+---------------------------+
|      uid|total_play_time_7d_over_30d|
+---------+---------------------------+
|104777734|                        0.0|
| 11596711|                       0.47|
|118301183|                        0.0|
|151294213|                        0.0|
|166601616|                       0.57|
+---------+---------------------------+
only showing top 5 rows



In [161]:
# collect this feature table for the final join
features_to_final_join.append(df_total_play_time_acceleration_feature)

#### 4.2.3 Generate Features B3: Average play_time of songs

In [162]:
# average play_time per user; F.avg skips nulls, just like sum()/count() on the same column
average_play_time_feature = df_play_song_length_play_time_cleaned \
        .groupBy('uid') \
        .agg(F.round(F.avg(F.col('play_time').cast('int')), 2) \
             .alias('average_play_time'))

In [163]:
average_play_time_feature.show(5)

+---------+-----------------+
|      uid|average_play_time|
+---------+-----------------+
|104777734|            12.33|
| 11596711|            88.96|
|118301183|             61.1|
|151294213|            46.06|
|166601616|            92.79|
+---------+-----------------+
only showing top 5 rows
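Averaging play_time per user skips null values in both the numerator and the denominator. This holds whether the average is written as `F.sum(...) / F.count(...)` or as `F.avg(...)`. Below is a pure-Python sketch of that null-skipping average. It assumes `None` stands in for a null `play_time`, and `average_play_time` is a hypothetical helper:

```python
from collections import defaultdict

def average_play_time(records):
    """Per-uid mean of play_time, skipping None values the way SQL AVG skips nulls.

    records: iterable of (uid, play_time) pairs. A uid whose play_time values
    are all None maps to None, as a null average would.
    """
    totals = defaultdict(int)
    counts = defaultdict(int)
    seen = set()
    for uid, play_time in records:
        seen.add(uid)  # every uid gets a group, like groupBy('uid')
        if play_time is None:
            continue  # sum(col) and count(col) both ignore nulls
        totals[uid] += play_time
        counts[uid] += 1
    return {uid: (totals[uid] / counts[uid] if counts[uid] else None) for uid in seen}
```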



In [164]:
# collect this feature table for the final join
features_to_final_join.append(average_play_time_feature)

### 4.3 Generate Features C: Event related features 

#### 4.3.1 Generate Event features C1: events frequency in given windows

In [165]:
# define a function to generate frequency features for a list of time windows,
# using when().otherwise() inside a list comprehension
def frequency_feature_generation_time_windows(df,event,time_window_list,snapshot_date):
    """
    generate frequency features for one event type and a list of time windows
    """
    df_feature = df \
        .filter(F.col('event')==event) \
        .groupBy('uid') \
        .agg(*[F.sum(F.when((F.col('date')>=snapshot_date-datetime.timedelta(time_window-1)) & (F.col('date')<=snapshot_date),1).otherwise(0)).alias('freq_'+event+'_last_'+str(time_window)) \
                for time_window in time_window_list]
            ) # *[...] unpacks the list, passing each aggregate column to agg() as a separate argument
    return df_feature
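The `*[...]` inside `agg()` unpacks the list comprehension, so each time window becomes its own aggregate column, computed in a single pass over the data. Below is a minimal pure-Python sketch of the same counting logic. It assumes event records as `(uid, event, date)` tuples. `frequency_features` is a hypothetical helper that reuses the notebook's `freq_<event>_last_<window>` naming:

```python
import datetime
from collections import defaultdict

def frequency_features(records, event, windows, snapshot_date):
    """Count one event type per uid in each trailing window ending at snapshot_date."""
    features = defaultdict(lambda: {f"freq_{event}_last_{w}": 0 for w in windows})
    for uid, ev, day in records:
        if ev != event:
            continue  # same role as .filter(F.col('event') == event)
        for w in windows:
            start = snapshot_date - datetime.timedelta(days=w - 1)
            if start <= day <= snapshot_date:
                features[uid][f"freq_{event}_last_{w}"] += 1
    return dict(features)
```

As in the Spark version, users who never triggered the event do not appear in the output at all. This is why the later left join is followed by `fillna(0)`.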

In [166]:
# generate frequency features for every event in event_list and every window in time_window_list
event_list = ['P','D','S']
time_window_list = [1,3,7,14,30]
df_event_frequrency_feature_list = []
for event in event_list:
    df_event_frequrency_feature_list.append(frequency_feature_generation_time_windows(df_feature_window_cleaned,event,time_window_list,snapshot_date))

In [167]:
df_event_frequrency_feature_list    # a list of three feature DataFrames, one per event type

[DataFrame[uid: string, freq_P_last_1: bigint, freq_P_last_3: bigint, freq_P_last_7: bigint, freq_P_last_14: bigint, freq_P_last_30: bigint],
 DataFrame[uid: string, freq_D_last_1: bigint, freq_D_last_3: bigint, freq_D_last_7: bigint, freq_D_last_14: bigint, freq_D_last_30: bigint],
 DataFrame[uid: string, freq_S_last_1: bigint, freq_S_last_3: bigint, freq_S_last_7: bigint, freq_S_last_14: bigint, freq_S_last_30: bigint]]

In [168]:
df_event_frequrency_feature_list[0].show(5) 

+---------+-------------+-------------+-------------+--------------+--------------+
|      uid|freq_P_last_1|freq_P_last_3|freq_P_last_7|freq_P_last_14|freq_P_last_30|
+---------+-------------+-------------+-------------+--------------+--------------+
| 81114900|            0|            0|            0|             0|            14|
|168555344|            0|            0|           35|           111|           252|
|168572740|            0|            0|            0|             0|             5|
|168580671|           24|           97|          126|           276|          1524|
|168610161|            0|            0|            0|             2|            49|
+---------+-------------+-------------+-------------+--------------+--------------+
only showing top 5 rows



#### 4.3.2 Generate Event features C2: Ratio of event frequency of nearest 7 days to that of nearest 30 days

In [169]:
# define a function to generate acceleration features: the ratio of an event's frequency in a shorter time window to that in a longer time window
def event_ratio_of_different_time_window(df,event, shorter_time_window, longer_time_window, snapshot_date):
    """
    generate the ratio of event frequency in the shorter time window to that in the longer time window
    """
    df_feature = df \
        .filter(F.col('event')==event) \
        .groupBy('uid') \
        .agg(F.round(F.sum(F.when((F.col('date') >= snapshot_date-datetime.timedelta(shorter_time_window-1)) & (F.col('date')<=snapshot_date),1).otherwise(0))/ \
             F.sum(F.when((F.col('date') >= snapshot_date-datetime.timedelta(longer_time_window-1)) & (F.col('date')<=snapshot_date),1).otherwise(0)), 2) \
             .alias(event + '_' + str(shorter_time_window) + 'd_over_' + event + '_'+ str(longer_time_window) + 'd')) 
    return df_feature

In [170]:
# generate acceleration features for every event in event_list
event_list = ['P','D','S']
shorter_time_window = 7
longer_time_window = 30
df_event_accerlaration_feature_list = []
for event in event_list:
    df_event_accerlaration_feature_list.append(event_ratio_of_different_time_window(df_feature_window_cleaned, event, shorter_time_window, longer_time_window, snapshot_date))

In [171]:
df_event_accerlaration_feature_list

[DataFrame[uid: string, P_7d_over_P_30d: double],
 DataFrame[uid: string, D_7d_over_D_30d: double],
 DataFrame[uid: string, S_7d_over_S_30d: double]]

In [172]:
df_event_accerlaration_feature_list[0].show(5)

+---------+---------------+
|      uid|P_7d_over_P_30d|
+---------+---------------+
| 81114900|            0.0|
|168555344|           0.14|
|168572740|            0.0|
|168580671|           0.08|
|168610161|            0.0|
+---------+---------------+
only showing top 5 rows



### 4.4 Generate Features D: Recency related features

#### 4.4.1 Generate Recency features D1: Last Event Time from feature_window_end_date

In [173]:
from pyspark.sql.functions import datediff, lit

In [174]:
# define a function to generate recency features for event time from a given Snapshot Time
def last_event_time_from_snapshot_time(df,event,snapshot_date):
    """
    generate recency features: days from the user's most recent event of this type to snapshot_date
    """
    df_feature = df \
        .filter(F.col('event')==event) \
        .groupBy('uid') \
        .agg(datediff(lit(snapshot_date), F.max(F.col('date'))).alias('last_'+event+'_time_from_'+str(snapshot_date)))
    return df_feature

In [175]:
# generate recency features for all event_list
event_list = ['P','D','S']
df_recency_feature_list = []
for event in event_list:
    df_recency_feature_list.append(last_event_time_from_snapshot_time(df_feature_window_cleaned,event,snapshot_date))

In [176]:
df_recency_feature_list

[DataFrame[uid: string, last_P_time_from_2017-04-28: int],
 DataFrame[uid: string, last_D_time_from_2017-04-28: int],
 DataFrame[uid: string, last_S_time_from_2017-04-28: int]]

In [177]:
df_recency_feature_list[0].show(5)

+---------+---------------------------+
|      uid|last_P_time_from_2017-04-28|
+---------+---------------------------+
| 81114900|                         29|
|168555344|                          4|
|168572740|                         29|
|168580671|                          0|
|168610161|                          9|
+---------+---------------------------+
only showing top 5 rows
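`datediff(lit(snapshot_date), F.max('date'))` gives the number of days between a user's latest event of the given type and the snapshot date, so an event on the snapshot date itself yields 0. Below is a pure-Python sketch of the same computation. It assumes `(uid, event, date)` tuples, and `days_since_last_event` is a hypothetical helper:

```python
import datetime

def days_since_last_event(records, event, snapshot_date):
    """Per-uid days from the latest `event` record to snapshot_date.

    Mirrors datediff(snapshot_date, max(date)) after filtering on the event type.
    """
    last_seen = {}
    for uid, ev, day in records:
        if ev == event and (uid not in last_seen or day > last_seen[uid]):
            last_seen[uid] = day  # keep the maximum date per uid
    return {uid: (snapshot_date - day).days for uid, day in last_seen.items()}
```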



### 4.5 Generate Features E: Profile related features

#### 4.5.1 Generate Profile features E1: device_feature

In [178]:
df_profile_tmp = df_feature_window_cleaned.select('uid', 'device')

##### Check whether users use multiple devices

In [179]:
# check for users with more than one distinct device value
from pyspark.sql.functions import countDistinct
uid_with_multiple_devices = df_profile_tmp.groupBy('uid').agg(countDistinct("device").alias('number_of_devices'))
uid_with_multiple_devices = uid_with_multiple_devices.filter(F.col('number_of_devices') > 1)
print("There are " + str(uid_with_multiple_devices.count()) + " users who use multiple devices")

There are 20 users who use multiple devices


##### Check whether any user has no 'device' value

In [180]:
# count 'ar' and 'ip' entries per user, then check for users with neither
df_profile_tmp = df_profile_tmp.groupBy('uid') \
                    .agg(F.sum(F.when(F.col('device') == 'ar', 1).otherwise(0)).alias('number_of_ar'),
                         F.sum(F.when(F.col('device') == 'ip', 1).otherwise(0)).alias('number_of_ip'))
uid_with_no_devices = df_profile_tmp.filter(F.col('number_of_ar') == 0).filter(F.col('number_of_ip') == 0)
print("There are " + str(uid_with_no_devices.count()) + " users with no device value")

There are 0 users with no device value


##### Since 20 users use multiple devices, we assign each user the device with more entries; if a user has equal numbers of 'ar' and 'ip' entries, we assign 'ip'.
##### The resulting device feature is 0 for 'ip' and 1 for 'ar'.
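The tie-breaking rule reduces to a single comparison: a user is labeled 'ar' (1) only when their 'ar' entries strictly outnumber their 'ip' entries. Below is a pure-Python sketch. It assumes `(uid, device)` tuples with device values 'ar' and 'ip', and `device_feature` is a hypothetical helper:

```python
from collections import Counter

def device_feature(records):
    """records: (uid, device) pairs with device in {'ar', 'ip'}.

    Returns 1 if a user has strictly more 'ar' than 'ip' entries, else 0,
    so ties fall back to 'ip' (0).
    """
    counts = {}
    for uid, device in records:
        counts.setdefault(uid, Counter())[device] += 1
    return {uid: int(c['ar'] > c['ip']) for uid, c in counts.items()}
```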

In [181]:
df_profile_tmp = df_profile_tmp.withColumn('device', F.when(F.col('number_of_ar') > F.col('number_of_ip'), 1).otherwise(0))
df_device_feature = df_profile_tmp.select(F.col('uid'), F.col('device'))

In [182]:
df_device_feature.show(5)

+---------+------+
|      uid|device|
+---------+------+
| 81114900|     1|
|168555344|     1|
|168572740|     1|
|168580671|     1|
|168610161|     1|
+---------+------+
only showing top 5 rows



In [183]:
# collect this feature table for the final join
features_to_final_join.append(df_device_feature)

## 5. Form training data for prediction models

In [184]:
# define function to join list of tables
def join_feature_data(df_master,df_feature_list):
    for df_feature in df_feature_list:
        df_master = df_master.join(df_feature,on='uid',how='left')
        #df_master.persist() # uncomment if the chain of joins grows long
    return df_master    

In [185]:
# join all percentage related features, total play time related features and profile features.
df_model_final = join_feature_data(df_label, features_to_final_join)

In [186]:
# join all event related features and recency related features.
df_model_final = join_feature_data(df_model_final, df_event_frequrency_feature_list)
df_model_final = join_feature_data(df_model_final, df_event_accerlaration_feature_list)
df_model_final = join_feature_data(df_model_final, df_recency_feature_list)

In [187]:
df_model_final.fillna(0).toPandas().to_csv('../data/df_model_final.csv',index=False)
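The chain of left joins keeps every user in `df_label` and leaves a null wherever a feature table has no row for that user. `fillna(0)` then turns those nulls into 0; in Spark, `fillna(0)` only affects numeric columns. Below is a pure-Python sketch of that behavior using dictionaries. It assumes a hypothetical `label` column name and feature tables keyed by uid, and `left_join_features` is a hypothetical helper:

```python
def left_join_features(labels, feature_tables, fill_value=0):
    """labels: {uid: label}; feature_tables: list of {uid: {feature: value}} dicts.

    Every uid in `labels` is kept (left join); uids that only appear in a
    feature table are dropped. Missing or None feature values become fill_value.
    """
    feature_names = []
    for table in feature_tables:
        for row in table.values():
            for name in row:
                if name not in feature_names:
                    feature_names.append(name)
    joined = {}
    for uid, label in labels.items():
        row = {'label': label}  # 'label' is a hypothetical column name
        for name in feature_names:
            row[name] = fill_value  # default for a missing right-hand row
        for table in feature_tables:
            for name, value in table.get(uid, {}).items():
                row[name] = fill_value if value is None else value
        joined[uid] = row
    return joined
```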

## 6. Form rating data for recommendation system 

### We define 'rating' as the larger of play_score and download_score:

#### play_score:

"play_score" are generate by 'play_time_percentage_of_song_length'.  
The idea is that the larger percentage is, the more likely the user like the song, rules as below:   
0.8 <= 'play_time_percentage_of_song_length', assign "play_score" 5  
0.6 <= 'play_time_percentage_of_song_length' < 0.8, assign "play_score" 4  
0.4 <= 'play_time_percentage_of_song_length' < 0.6, assign "play_score" 3  
0.2 <= 'play_time_percentage_of_song_length'< 0.4, assign "play_score" 2  
0 <= 'play_time_percentage_of_song_length' < 0.2, assign "play_score" 1  
Note: If per uid per song_id has multiple ratings, we take average.  
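The score thresholds can be written as an ordered chain of checks. The averaging step needs care with rounding: Spark's `F.round` rounds half up, while Python's built-in `round()` rounds half to even. Below is a pure-Python sketch. It assumes the percentage is a non-null float, and `play_score` and `average_score` are hypothetical helpers:

```python
import math

def play_score(percentage):
    """Map play_time / song_length to a 1-5 score using the thresholds above."""
    for threshold, score in ((0.8, 5), (0.6, 4), (0.4, 3), (0.2, 2)):
        if percentage >= threshold:
            return score
    return 1  # percentage below 0.2

def average_score(scores):
    """Average several play_scores and round half up, as F.round(x, 0) does."""
    mean = sum(scores) / len(scores)
    return math.floor(mean + 0.5)  # Python's round(4.5) would give 4, not 5
```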

#### download_score:

"download_score" is determined by whether the user has a download entry for the song in the feature window (2017-03-30 to 2017-04-28).  
The idea is that a user who downloads a song very likely likes it. The rules are:  
If there is a download entry, assign "download_score" 5  
If there is no download entry, assign "download_score" 0  
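The final rating is the maximum of the two scores for each (uid, song_id) pair. Below is a pure-Python sketch. It assumes play scores are stored in a dict keyed by `(uid, song_id)` and downloads are a set of such pairs, and `ratings` is a hypothetical helper. The sketch follows the notebook's left join from the play-based table onto the download table, so only pairs that have a play_score receive a rating:

```python
def ratings(play_scores, downloads):
    """play_scores: {(uid, song_id): play_score}; downloads: set of (uid, song_id).

    rating = max(play_score, download_score), where download_score is 5 for a
    downloaded pair and 0 otherwise.
    """
    return {
        key: max(score, 5 if key in downloads else 0)
        for key, score in play_scores.items()
    }
```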

In [188]:
# compute play_score for each play record that has a song_id
df_recommendation = df_play_song_length_play_time_cleaned \
                        .filter(F.col('song_id').isNotNull())  \
                        .withColumn('play_score', F.when(F.col('play_time_percentage_of_song_length') >= 0.8, 5)  \
                                                   .when(F.col('play_time_percentage_of_song_length') >= 0.6, 4)  \
                                                   .when(F.col('play_time_percentage_of_song_length') >= 0.4, 3)  \
                                                   .when(F.col('play_time_percentage_of_song_length') >= 0.2, 2)  \
                                                   .otherwise(1))
# generate the rating table: average play_score per (uid, song_id), rounded to an integer
df_rating = df_recommendation.groupBy(F.col('uid'), F.col('song_id')) \
                            .agg(F.round(F.avg(F.col('play_score')), 0).alias('play_score'))

In [189]:
# assign 'download_score'
df_download_score = df_feature_window_cleaned \
                         .filter(F.col('event') == 'D')  \
                         .groupBy(F.col('uid'), F.col('song_id')).count() 
df_download_score = df_download_score.withColumn("download_score", F.when(F.col("count") >= 1, 5).otherwise(0))  \
                                     .select(F.col('uid'), F.col('song_id'), F.col('download_score'))

In [190]:
# left-join df_rating with df_download_score and fill missing download_score with 0
# generate 'rating' as the larger value of play_score and download_score
# drop records whose song_id is 0
df_rating = df_rating.join(df_download_score, on=['uid', 'song_id'],how='left') \
                     .fillna(0)  \
                     .withColumn('rating', F.greatest(F.col('play_score'), F.col('download_score'))) \
                     .select(F.col('uid'), F.col('song_id'), F.col('rating')) \
                     .filter(F.col('song_id') != 0)

In [191]:
df_rating.toPandas().to_csv('../data/df_rating.csv',index=False)