# Load data into Spark DataFrame

In [1]:
from pyspark import SparkContext

In [2]:
import pyspark.sql.functions as F

In [3]:
# We use matplotlib for plotting
import matplotlib.pyplot as plt

# Render matplotlib figures inline, directly below the cell that draws them
%matplotlib inline

Reference: https://gist.github.com/ololobus/4c221a0891775eaa86b0
My Spark version is 2.3.1, so first set the environment variable `SPARK_HOME` to `/usr/local/Cellar/apache-spark/2.3.1/libexec/`

```
export SPARK_HOME="/usr/local/Cellar/apache-spark/2.3.1/libexec/"
```
before launching Jupyter Notebook with
```
jupyter-notebook
```

In [4]:
import os

# Bootstrap PySpark by running Spark's interactive-shell startup script in this
# kernel; it defines the `spark` session (see the banner it prints) and `sc`.
# SPARK_HOME must be exported before Jupyter is launched (see the note above).
# exec(compile(...)) replaces execfile(), which exists only in Python 2: this form
# runs on both Python 2.7 and Python 3, and `with` closes the script file.
_spark_shell_script = os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py')
with open(_spark_shell_script) as _shell_file:
    exec(compile(_shell_file.read(), _spark_shell_script, 'exec'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Python version 2.7.15 (default, May  1 2018 18:37:05)
SparkSession available as 'spark'.


In [5]:
# Display the SparkContext (`sc`) defined by the shell.py bootstrap above
sc

In [7]:
# Load the raw event log. header=True takes column names from the first CSV row.
# No schema is given, so 'date' is cast explicitly a few cells below.
# .cache() because df is reused by most of the cells that follow.
# NOTE(review): the saved output of this cell is a Hive metastore AnalysisException,
# so the later cells were not executed in the saved run.
df = spark.read.csv('../data/event_ds.csv',header=True).cache()
df

AnalysisException: u'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'

In [None]:
# Preview the raw events
df.show()

In [None]:
# Total number of event rows
df.count()

In [None]:
# create new or overwrite original field with withColumn
# Here 'date' is overwritten with a DateType version of itself, so the date-range
# filters and date arithmetic in later cells operate on dates.
df = df.withColumn('date',F.col('date').cast('date'))
df

In [None]:
# Re-preview the events after the date cast
df.show()

## Objective 1.3: Exploratory data analysis (e.g. most popular songs, most active users)

### Top 10 most popular songs

In [None]:
# Number of events per song (all event types combined)
df_song = df.groupBy("song_id").count()
df_song.show()

In [None]:
# The 10 songs with the most events, highest count first
df_song.orderBy(F.desc("count")).head(10)

We can see that the most popular song by total event count (play, download and search events combined) is song_id=u'0' with count=733382. That is far more than the second most popular song, song_id=u'9950164' with count=85684. One likely reason is that song_id=u'0' is a placeholder imputed for missing (NA) song IDs rather than a real song.

### Top 10 most active users

In [None]:
# Number of events per user (all event types combined)
df_user = df.groupBy("uid").count()
df_user.show()

In [None]:
# The 10 users with the most events, highest count first
df_user.orderBy(F.desc("count")).head(10)

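These top 10 most active users are ranked by their total number of events of all types, because df is not filtered by event here. The ranking is not based on play and download events only.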

In [None]:
# simple count rows (same total as the earlier df.count() cell)
df.count()

In [None]:
# select operation, count distinct rows: the number of distinct users
df.select('uid').distinct().count()

In [None]:
# group by aggregation: number of rows per event type
df.groupBy('event').count().show()

In [None]:
# group by aggregation, more general (count, min, max, mean), multiple at once
# This cell shows two of them per event type: the count of non-null uid values
# and the maximum uid.
# NOTE(review): uid is presumably a string column (the CSV is read without a
# schema), so F.max compares lexicographically, e.g. '9' > '10'.
df.groupBy('event').agg(
    F.count(F.col('uid')).alias('count'),
    F.max(F.col('uid')).alias('max_uid')
).show()

In [None]:
# filter -> group by -> order by: per-day, per-event counts for
# 2017-04-01 .. 2017-04-05 (both ends inclusive)
(df.filter(F.col('date').between('2017-04-01', '2017-04-05'))
   .groupBy('date', 'event')
   .count()
   .orderBy('date', 'event')
   .show())

In [None]:
# Events per day, collected to the driver as a pandas DataFrame for plotting
date_count = df.groupBy('date').count().toPandas()
date_count.head(5)

In [None]:
# Sanity check: after toPandas(), each column is a pandas object
type(date_count['count'])

In [None]:
from matplotlib.dates import date2num

# Daily event volume.
# The converted dates go into a new variable instead of overwriting
# date_count['date'], so re-running this cell never feeds already-converted
# floats back into date2num.
date_nums = date_count['date'].apply(date2num)

fig, ax = plt.subplots()
ax.bar(date_nums, date_count['count'])
ax.xaxis_date()  # label the float day numbers on the x axis as calendar dates
ax.set_title('Events per day')
ax.set_xlabel('date')
ax.set_ylabel('number of events')
plt.xticks(rotation='vertical');

# Label definition

In [None]:
import datetime
from dateutil import parser

# Label window: the label_window_size days ending on 2017-05-12 (inclusive).
label_window_size = 14
label_window_end_date = parser.parse('2017-05-12').date()
label_window_start_date = label_window_end_date - datetime.timedelta(label_window_size - 1)
# Single-string print: this kernel is Python 2.7, where print(a, b, ...) prints
# the tuple repr instead of space-separated values.
print('label window: {} ~ {} days: {}'.format(label_window_start_date, label_window_end_date, label_window_size))

# Feature window: the feature_window_size days ending the day before the label
# window starts, so the two windows do not overlap.
feature_window_size = 30
feature_window_end_date = label_window_start_date - datetime.timedelta(1)
feature_window_start_date = feature_window_end_date  - datetime.timedelta(feature_window_size - 1)
print('feature window: {} ~ {} days: {}'.format(feature_window_start_date, feature_window_end_date, feature_window_size))

In [None]:
# Users to model: anyone with at least one event in the feature window
df_model_uid = (df
                .filter(F.col('date').between(feature_window_start_date, feature_window_end_date))
                .select('uid')
                .distinct())

# Users with any event in the label window count as active (label = 0)
df_active_uid_in_label_window = (df
                                 .filter(F.col('date').between(label_window_start_date, label_window_end_date))
                                 .select('uid')
                                 .distinct()
                                 .withColumn('label', F.lit(0)))

In [None]:
# Label data (churn = 1, active = 0): left-join the modelled users to the active
# ones. A user with no label-window activity gets a null label, filled with 1.
# An integer fill value only touches numeric columns, i.e. just 'label' here.
df_label = (df_model_uid
            .join(df_active_uid_in_label_window, on=['uid'], how='left')
            .fillna(1, subset=['label']))

In [None]:
# Class balance: churned (1) vs active (0) users
df_label.groupBy('label').count().show()

# Feature generation

In [None]:
# Events restricted to the feature window (both ends inclusive)
df_feature_window = df.where(F.col('date').between(feature_window_start_date, feature_window_end_date))

### Frequency features

##### Method 1: one DataFrame per (event type, time window)

In [None]:
# define a function to generate frequency features
def frequency_feature_generation(df,event,time_window,snapshot_date):
    """
    Count each user's events of one type over one trailing time window.

    df            -- event DataFrame with 'uid', 'event' and 'date' columns
    event         -- event code to count, e.g. 'P'
    time_window   -- window length in days: the time_window days ending on
                     snapshot_date, both ends inclusive
    snapshot_date -- last day of the window (a datetime.date)

    Returns one row per uid that has at least one matching event, with the count
    in a column named 'freq_<event>_last_<time_window>'.
    """
    window_start = snapshot_date - datetime.timedelta(time_window - 1)
    feature_name = 'freq_' + event + '_last_' + str(time_window)
    is_event = F.col('event') == event
    in_window = (F.col('date') >= window_start) & (F.col('date') <= snapshot_date)
    per_user = df.filter(is_event).filter(in_window).groupBy('uid')
    return per_user.agg(F.count(F.col('uid')).alias(feature_name))

In [None]:
# generate one feature: per-user count of 'P' events in the last 3 days of the
# feature window
event = 'P'
time_window = 3
snapshot_date = feature_window_end_date
df_feature = frequency_feature_generation(df_feature_window,event,time_window,snapshot_date)

In [None]:
# Preview the generated feature
df_feature.show(5)

In [None]:
# generate frequency features for all event_list, time_window_list
# (method 1: one DataFrame per (event, time window) pair, events in the outer loop)
# event_list = ['P','D','S']
event_list = ['P','D']
time_window_list = [1,3,7,14,30]
df_feature_list = []
for event in event_list:
    for time_window in time_window_list:
        df_feature_list.append(frequency_feature_generation(df_feature_window,event,time_window,snapshot_date))


In [None]:
# 10 DataFrames: one per (event, time window) pair
df_feature_list

##### Method 2: one DataFrame per event type, covering all time windows (fewer DataFrames to join)

In [None]:
# define a function to generate frequency features for a list of time windows
# using when().otherwise(), and list comprehension trick!
def frequency_feature_generation_time_windows(df,event,time_window_list,snapshot_date):
    """
    Count each user's events of one type over several trailing windows in one pass.

    Rather than one filter + groupBy per window, each window becomes a conditional
    sum inside a single groupBy: an event row adds 1 to a window's column when its
    date lies in [snapshot_date - (time_window - 1) days, snapshot_date], else 0.

    df               -- event DataFrame with 'uid', 'event' and 'date' columns
    event            -- event code to count, e.g. 'P'
    time_window_list -- window lengths in days
    snapshot_date    -- last day of every window (a datetime.date)

    Returns one row per uid with at least one event of this type in df, and one
    column 'freq_<event>_last_<time_window>' per window (0 when the user had no
    such event inside that window).
    """
    def window_count(time_window):
        # 1/0 indicator per event row, summed per user
        window_start = snapshot_date - datetime.timedelta(time_window - 1)
        in_window = (F.col('date') >= window_start) & (F.col('date') <= snapshot_date)
        column_name = 'freq_' + event + '_last_' + str(time_window)
        return F.sum(F.when(in_window, 1).otherwise(0)).alias(column_name)

    window_counts = [window_count(time_window) for time_window in time_window_list]
    # *window_counts unpacks the list into separate positional arguments of agg()
    return df.filter(F.col('event') == event).groupBy('uid').agg(*window_counts)

In [None]:
# generate one event type, all time windows: every 'P' window count in one DataFrame
event = 'P'
time_window_list = [1,3,7,14,30]
snapshot_date = feature_window_end_date
df_feature = frequency_feature_generation_time_windows(df_feature_window,event,time_window_list,snapshot_date)
df_feature.show(5)

In [None]:
# generate frequency features for all event_list, time_window_list
# (method 2: one DataFrame per event type, each holding every time window)
# event_list = ['P','D','S']
event_list = ['P','D']
time_window_list = [1,3,7,14,30]
df_feature_list = []
for event in event_list:
    df_feature_list.append(frequency_feature_generation_time_windows(df_feature_window,event,time_window_list,snapshot_date))


In [None]:
# 2 DataFrames: one per event type
df_feature_list

### Recency feature

In [None]:
# datetime is already imported in the label-definition cell; re-importing is harmless
import datetime

In [None]:
# Today's date, for reference only. Nothing below reads this variable (the recency
# function uses Spark's F.current_date() instead), and its value changes every run.
current_date = datetime.date.today()
current_date

In [None]:
# defined as days from last event
# can generate one feature for each type of event
def recency_feature_generation(df, snapshot_date=None):
    """
    generate a recency feature: days from each user's last event to a reference date

    df            -- event DataFrame with 'uid' and 'date' (DateType) columns. Pass
                     feature-window events only (e.g. df_feature_window); events after
                     the feature window would leak label-window activity into the feature.
    snapshot_date -- reference date (datetime.date or 'YYYY-MM-DD' string), normally the
                     last day of the feature window. When None, Spark's current_date()
                     is used, which makes the values depend on the day of the run.

    Returns one row per uid with an integer column 'recency'. Aggregating to one
    row per uid (via max(date)) matters: a per-event table would duplicate each
    user once per event when joined onto the label table.
    """
    if snapshot_date is None:
        reference_date = F.current_date()
    else:
        # cast so that a 'YYYY-MM-DD' string works as well as a date
        reference_date = F.lit(snapshot_date).cast('date')
    df_last_event = df.groupBy('uid').agg(F.max(F.col('date')).alias('last_event_date'))
    df_feature = df_last_event.select(
        'uid', F.datediff(reference_date, F.col('last_event_date')).alias('recency'))
    df_feature.show(5)  # preview, as in the original version
    return df_feature

In [None]:
# Recency must come from feature-window events only: the full `df` also contains
# label-window events, which would leak the label into the feature.
df_feature_list.append(recency_feature_generation(df_feature_window))
df_feature_list

In [None]:
# Row count of the recency feature (third entry of df_feature_list, appended above)
df_feature_list[2].count()

### Profile features

In [None]:
# Play-event log; its 'device' column feeds the profile features below.
# NOTE(review): unlike df, 'date' is not cast to DateType here, so the date-range
# filter below compares a string column with date values. Confirm the format is yyyy-MM-dd.
df_play = spark.read.csv('../data/play_ds.csv',header=True)
df_play.show(5)

In [None]:
# Distinct (uid, device) pairs among play events inside the feature window
df_play_feature_window = df_play.where(
    F.col('date').between(feature_window_start_date, feature_window_end_date))
df_profile_tmp = df_play_feature_window.select('uid', 'device').distinct()

In [None]:
# Number of distinct users per device value
df_profile_tmp.groupBy('device').count().show()

In [None]:
# check if one user has two devices:
# number of distinct (uid, device) pairs in the feature window
df_profile_tmp.count()

In [None]:
# Compare with the (uid, device) pair count in the previous cell. df_profile_tmp is
# already distinct on (uid, device), so counting its distinct rows would always
# reproduce that count. Count distinct users instead: a smaller number here means
# some users played on more than one device.
df_profile_tmp.select('uid').distinct().count()

In [None]:
# Encode device: 'ip' -> 1, any other value (including a null device) -> 2.
# NOTE(review): a user seen with two devices keeps two rows here, so a join on uid
# will duplicate that user.
df_profile_tmp = df_profile_tmp.withColumn('device_type',F.when(F.col('device')=='ip',1).otherwise(2))
df_profile_tmp.groupBy('device_type').count().show()

In [None]:
# Profile table for the modelled users; users with no play event in the feature
# window get a null device_type (shown as its own group below)
df_profile = df_label.select('uid').join(df_profile_tmp.select('uid','device_type'),on='uid',how='left')
df_profile.groupBy('device_type').count().show()

### Total play time features

In [None]:
# TODO: generate total song play time features (using play_ds data) for different time windows.
# Clean the play time first: it may be negative in the data.
df_play.show(5)

### Fancier frequency features

In [None]:
# TODO: generate counts of songs played for at least 80% of their song length (using
# play_ds data) for different time windows. Clean play time and song length first:
# play time may be negative and song length may be zero in the data.


# Form training data

In [None]:
def join_feature_data(df_master,df_feature_list,how='left',persist=False):
    """
    Join each feature DataFrame onto df_master by 'uid', in list order.

    df_master       -- base DataFrame with a 'uid' column (e.g. the label table)
    df_feature_list -- feature DataFrames, each with a 'uid' column
    how             -- join type; the default 'left' keeps every row of df_master,
                       with null features for users missing from a feature table
    persist         -- if True, persist each intermediate join result (can help
                       when the chain of joins gets long)

    Returns the joined DataFrame (df_master itself if the list is empty).
    Each feature DataFrame should hold at most one row per uid; otherwise the
    join multiplies the matching rows of df_master.
    """
    for df_feature in df_feature_list:
        df_master = df_master.join(df_feature,on='uid',how=how)
        if persist:
            df_master.persist()
    return df_master
    

In [None]:
# join all behavior features onto the label table
df_model_final = join_feature_data(df_label,df_feature_list)

In [None]:
# join all profile features (device_type)
df_model_final = join_feature_data(df_model_final,[df_profile])

In [None]:
# Optional export, left disabled: the same CSV is written at the end of the notebook
#df_model_final.fillna(0).toPandas().to_csv('../data/df_model_final_bryanbc.csv',index=False)

In [None]:
# Inspect the first joined rows
df_model_final.head(10)

In [None]:
# Row count after the joins; compare with the row count of df_label
df_model_final.count()

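The first 10 rows of df_model_final show that the left join produces many unnecessary duplicate rows. A join repeats each uid once for every matching row. Any feature table with more than one row per uid (for example, a per-event rather than per-user feature) therefore duplicates that user's rows.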

In [None]:
def join_feature_data(df_master,df_feature_list,how='inner',persist=True):
    """
    Redefinition used from here on: inner-join each feature DataFrame onto
    df_master by 'uid', in list order, persisting each intermediate result.

    df_master       -- base DataFrame with a 'uid' column (e.g. the label table)
    df_feature_list -- feature DataFrames, each with a 'uid' column
    how             -- join type, 'inner' by default
    persist         -- if True (default), persist each intermediate join result

    Returns the joined DataFrame (df_master itself if the list is empty).

    NOTE: an inner join drops every uid that is missing from any one feature table.
    For example, a user with no 'D' events in the feature window has no row in the
    'D' frequency table, so that user disappears from the training data and the
    label balance shifts. It does not remove duplicates caused by a feature table
    holding several rows per uid. Pass how='left' to keep all users.
    """
    for df_feature in df_feature_list:
        df_master = df_master.join(df_feature,on='uid',how=how)
        if persist:
            df_master.persist()
    return df_master
    

In [None]:
# join all behavior features, now with the join_feature_data redefined in the
# previous cell (inner join by default)
df_model_final = join_feature_data(df_label,df_feature_list)
df_model_final

In [None]:
# join all profile features (device_type)
df_model_final = join_feature_data(df_model_final,[df_profile])
df_model_final

In [None]:
# Row count after the inner joins
df_model_final.count()

In [None]:
# Distinct-row count; a smaller number than the cell above means exact duplicate rows exist
df_model_final.distinct().count()

In [None]:
# Inspect the first joined rows
df_model_final.head(5)

In [None]:
# Keep only distinct rows.
# NOTE(review): distinct() removes exact duplicates only. Rows of the same uid that
# differ in any feature value (e.g. from a feature table with several rows per uid)
# survive, so uid may still repeat. Compare with
# df_model_final.select('uid').distinct().count().
df_model_final = df_model_final.distinct()
df_model_final.count()

In [None]:
# Fill missing numeric features with 0 and collect the modelling table to pandas.
# Note: fillna(0) also turns a missing device_type into 0, i.e. a third category
# next to 1 ('ip') and 2 (other).
df_model_final_pd = df_model_final.fillna(0).toPandas()
df_model_final_pd

In [None]:
# Save the training table as CSV (without the pandas index)
df_model_final_pd.to_csv('../data/df_model_final_bryanbc.csv',index=False)