# Sparkify capstone Project part2 - Featue Engineering

In [161]:
#import library
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, concat, desc, explode, lit, min, max, split, udf, isnull, stddev, stddev_pop
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, PCA, RegexTokenizer, StandardScaler, StopWordsRemover, StringIndexer, VectorAssembler
import datetime
import seaborn as sns
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from matplotlib.ticker import MaxNLocator
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, DecisionTreeClassifier, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [2]:
#create spark session and load raw data
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()
path = "medium-sparkify-event-data.json"
spark_df = spark.read.json(path)

In [3]:
# remove empyt user ID and sessionID
spark_df_clean=spark_df.filter(spark_df["userId"]!="")
spark_df_clean=spark_df_clean.dropna(how="any",subset=["userId",'sessionId'])
# convert ts to real time
gen_time = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).strftime("%Y-%m-%d %H:%M:%S"))
spark_df_clean = spark_df_clean.withColumn("time", gen_time(spark_df_clean['ts']))
# get hour, weekday and day
gen_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour)
spark_df_clean = spark_df_clean.withColumn("hour", gen_hour(spark_df_clean['ts']))

gen_weekday = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).strftime("%w"))
spark_df_clean = spark_df_clean.withColumn("weekday", gen_weekday(spark_df_clean['ts']))

gen_day = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).day)
spark_df_clean = spark_df_clean.withColumn("day", gen_day(spark_df_clean['ts']))
# convert ts to real time
gen_time_ymd = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).strftime("%Y-%m-%d"))
spark_df_clean = spark_df_clean.withColumn("time_ymd", gen_time_ymd(spark_df_clean['ts']))
# convert ts to time interval_week
gen_time_weel_interval = udf(lambda x: int(x/1000/60/60/24/7))
spark_df_clean = spark_df_clean.withColumn("time_week", gen_time_weel_interval(spark_df_clean['ts']))
# convert ts to time interval_half_month
gen_time_hm_interval = udf(lambda x: int(x/1000/60/60/24/7/15))
spark_df_clean = spark_df_clean.withColumn("time_half_month", gen_time_hm_interval(spark_df_clean['ts']))
# convert ts to time interval_month
gen_time_m_interval = udf(lambda x: int(x/1000/60/60/24/7/30))
spark_df_clean = spark_df_clean.withColumn("time_month", gen_time_m_interval(spark_df_clean['ts']))
# convert registration to month
gen_time_m_interval = udf(lambda x: int(x/1000/60/60/24/7/30))
spark_df_clean = spark_df_clean.withColumn("reg_month", gen_time_m_interval(spark_df_clean['registration']))
spark_df_clean = spark_df_clean.withColumn("reg_ymd", gen_time_ymd(spark_df_clean['registration']))
spark_df_clean = spark_df_clean.withColumn("reg_week", gen_time_weel_interval(spark_df_clean['registration']))
# convert location to state
# the location column contains the city name and the state name. This cell is used to remove the city and only keep the state name
get_state=udf(lambda x:x[-2:])
spark_df_clean = spark_df_clean.withColumn("location_state", get_state(spark_df_clean['location']))
# simplify userAgent
# userAgent contains the system and software info of each customer. To simplify userAgent, I only kept the system info. for example, Macintosh, or Windows NT
simp_useragent=udf(lambda x:"".join(x[x.index('(')+1:x.index(')')]))
spark_df_clean= spark_df_clean.withColumn("sim_user_agent", simp_useragent(spark_df_clean['userAgent']))
# create a churn column, if a customer has cancel confirmation in his/her page column, the churn value will be 1 in all rows of this customer.
get_cancel=udf(lambda x:1 if x=='Cancellation Confirmation' else 0)
spark_df_clean=spark_df_clean.withColumn("Churn",get_cancel(spark_df_clean['page']))

churnlist=spark_df_clean.filter(spark_df_clean["Churn"]==1).select('userId').dropDuplicates().collect()
churn_list=[x['userId'] for x in churnlist]
assign_churn=udf(lambda x:1 if x in churn_list else 0)
spark_df_clean=spark_df_clean.withColumn("Churn",assign_churn(spark_df_clean['userId']))
# create a downgrade column, if a customer has cancel confirmation in his/her page column, the churn value will be 1 in all rows of this customer.
get_downgrade=udf(lambda x:1 if x=='Downgrade' else 0)
spark_df_clean=spark_df_clean.withColumn("Downgrade",get_cancel(spark_df_clean['page']))

downgradelist=spark_df_clean.filter(spark_df_clean["Downgrade"]==1).select('userId').dropDuplicates().collect()
downgrade_list=[x['userId'] for x in downgradelist]
assign_downgrade=udf(lambda x:1 if x in downgrade_list else 0)
spark_df_clean=spark_df_clean.withColumn("Downgrade",assign_downgrade(spark_df_clean['userId']))

In [4]:
spark_df_clean.show(5)

+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+------------------+------+-------------+--------------------+------+-------------------+----+-------+---+----------+---------+---------------+----------+---------+----------+--------+--------------+--------------------+-----+---------+
|           artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|              song|status|           ts|           userAgent|userId|               time|hour|weekday|day|  time_ymd|time_week|time_half_month|time_month|reg_month|   reg_ymd|reg_week|location_state|      sim_user_agent|Churn|Downgrade|
+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+------------------+------+-------------+--------------------+------+-------------------+--

## Feature Engineering

In [5]:
# get user ID
user_list=spark_df_clean.select('userId').dropDuplicates().sort('userId').toPandas()['userId']
user_list=list(user_list)
#create feature dataframe
feature_index=np.arange(0,len(user_list))
feature_df=pd.DataFrame(index=feature_index)
feature_df['userId']=user_list

### add general cat feature

In [6]:
# add gender
feature_df['cat_gender']=spark_df_clean.select('userId','gender').dropDuplicates().sort('userId').toPandas()['gender']
# Add level
feature_df['cat_level']=spark_df_clean.select('userId','level').dropDuplicates().sort('userId').toPandas()['level']
# Add method
feature_df['cat_method']=spark_df_clean.select('userId','method').dropDuplicates().sort('userId').toPandas()['method']
# Add location_state
feature_df['cat_location_state']=spark_df_clean.select('userId','location_state').dropDuplicates().sort('userId').toPandas()['location_state']
# Add user_agent
feature_df['cat_user_agent']=spark_df_clean.select('userId','sim_user_agent').dropDuplicates().sort('userId').toPandas()['sim_user_agent']

### time sensitive feature

In [13]:
# create a list to store items in page columns
page_list = [(row['page']) for row in spark_df_clean.select("page").dropDuplicates().collect()]

In [14]:
# remove cancel and cancellation confirmation
removelist=['Cancel','Cancellation Confirmation']
for i in removelist:
    page_list.remove(i)

In [21]:
def features_merge(df1, df2):
    """
    This function is used to merge the feature using left join
    input: two data frame to be merged
    output: merged dataframe
    """
    result=pd.merge(df1,df2,on='userId',how='left')
    return result

#### page interaction frequency during active time

In [32]:
# create a dataframe tp store userId and totaly active time based on the max value in ts and min value in ts
user_time_df=spark_df_clean.groupby('userId').agg(((max(spark_df_clean['ts'])-min(spark_df_clean['ts']))/1000/60/60).alias('total_active_time')).sort('userId').toPandas()

In [33]:
# count the total interaction times of each page feature for each user
for i in page_list:
    col_name = "count_per_activetime_" + i.replace(" ", "_")
    temp_page_featuredf=spark_df_clean.filter(spark_df_clean['page']==i).groupby('userId','page').count().sort('userId').select('userId','count').withColumnRenamed("count",col_name).toPandas()
    user_time_df=features_merge(user_time_df,temp_page_featuredf)

In [42]:
# calculate the frequency of each page feature usage (count/total time)
user_time_df.fillna(0,inplace=True)
for i in range(2,user_time_df.shape[1]):
    user_time_df.iloc[:,i]=np.array(user_time_df.iloc[:,i])/np.array(user_time_df.iloc[:,1])

In [51]:
#put the page interaction frequency in feature dataframe
feature_df=features_merge(feature_df,user_time_df.drop(columns=['total_active_time']))

#### weekly page interaction std during active time

In [74]:
# find out the standard deviation of weekly page interaction for each user
for i in page_list:
    col_name = "weely_page_std_" + i.replace(" ", "_")
    temp_page_featuredf=spark_df_clean.filter(spark_df_clean['page']==i).groupby('userId','time_week').count().sort('userId').drop('time_week').groupby('userId').agg(stddev("count")).withColumnRenamed("stddev_samp(count)",col_name).toPandas()
    feature_df=features_merge(feature_df,temp_page_featuredf)

#### daily page interaction std during active time

In [76]:
# find out the daily deviation of weekly page interaction for each user
for i in page_list:
    col_name = "daily_page_std_" + i.replace(" ", "_")
    temp_page_featuredf=spark_df_clean.filter(spark_df_clean['page']==i).groupby('userId','time_ymd').count().sort('userId').drop('time_ymd').groupby('userId').agg(stddev("count")).withColumnRenamed("stddev_samp(count)",col_name).toPandas()
    feature_df=features_merge(feature_df,temp_page_featuredf)

#### page interaction time interval
the earliest time each user use the page feature to the lastest time each use the page feature

In [81]:
for i in page_list:
    col_name = "interaction_timelength_" + i.replace(" ", "_")
    temp_page_featuredf=spark_df_clean.filter(spark_df_clean['page']==i).groupby('userId').agg(((max('ts')-min('ts'))/1000/60/60).alias(col_name)).sort('userId').toPandas()
    feature_df=features_merge(feature_df,temp_page_featuredf)

#### number of unique songs per active time

In [90]:
tempdf1=spark_df_clean.filter(spark_df_clean['page']=="NextSong").groupby("userId",'song').count().sort("userId").groupby("userId").count().withColumnRenamed("count",'num_uni_songs_per_time').toPandas()
tempdf2=user_time_df[['userId','total_active_time']]
tempdf2=features_merge(tempdf2,tempdf1)
tempdf2['num_uni_songs_per_time']=np.array(tempdf2['num_uni_songs_per_time'])/np.array(tempdf2['total_active_time'])
feature_df=features_merge(feature_df,tempdf2[['userId','num_uni_songs_per_time']])

#### number of unique artists per active time

In [92]:
tempdf1=spark_df_clean.filter(spark_df_clean['page']=="NextSong").groupby("userId",'artist').count().sort("userId").groupby("userId").count().withColumnRenamed("count",'num_uni_artists_per_time').toPandas()
tempdf2=user_time_df[['userId','total_active_time']]
tempdf2=features_merge(tempdf2,tempdf1)
tempdf2['num_uni_artists_per_time']=np.array(tempdf2['num_uni_artists_per_time'])/np.array(tempdf2['total_active_time'])
feature_df=features_merge(feature_df,tempdf2[['userId','num_uni_artists_per_time']])

#### number of unique sessions per active time

In [97]:
tempdf1=spark_df_clean.groupby('userId','sessionId').count().sort('userId').groupby('userId').count().withColumnRenamed("count",'num_uni_sessions_per_time').toPandas()
tempdf2=user_time_df[['userId','total_active_time']]
tempdf2=features_merge(tempdf2,tempdf1)
tempdf2['num_uni_sessions_per_time']=np.array(tempdf2['num_uni_sessions_per_time'])/np.array(tempdf2['total_active_time'])
feature_df=features_merge(feature_df,tempdf2[['userId','num_uni_sessions_per_time']])

### Session sensitive features

#### average page interaction per session

In [102]:
for i in page_list:
    col_name = i.replace(" ", "_")+"_interaction_per_session"
    temp_page_featuredf=spark_df_clean.filter(spark_df_clean['page']==i).groupby('userId','page','sessionId').count().drop('page','sessionId').withColumnRenamed("count",col_name).groupby('userId').avg().sort('userId').toPandas()
    feature_df=features_merge(feature_df,temp_page_featuredf)

#### max page interaction per session

In [103]:
for i in page_list:
    col_name = i.replace(" ", "_")+"_interaction_per_session"
    temp_page_featuredf=spark_df_clean.filter(spark_df_clean['page']==i).groupby('userId','page','sessionId').count().drop('page','sessionId').withColumnRenamed("count",col_name).groupby('userId').max().sort('userId').toPandas()
    feature_df=features_merge(feature_df,temp_page_featuredf)

#### min page interaction per session

In [104]:
for i in page_list:
    col_name = i.replace(" ", "_")+"_interaction_per_session"
    temp_page_featuredf=spark_df_clean.filter(spark_df_clean['page']==i).groupby('userId','page','sessionId').count().drop('page','sessionId').withColumnRenamed("count",col_name).groupby('userId').min().sort('userId').toPandas()
    feature_df=features_merge(feature_df,temp_page_featuredf)

In [105]:
feature_df.to_csv('feature_df.csv',index=False)

#### average,max,min session time

In [108]:
temp_session_df=spark_df_clean.groupby("userId",'sessionId').agg(((max(spark_df_clean['ts'])-min(spark_df_clean['ts']))/1000/60).alias('sessiontime')).sort('userId','sessionId')

In [112]:
temp_session_avg=temp_session_df.groupby('userId').avg('sessiontime').sort('userId').toPandas()
feature_df=features_merge(feature_df,temp_session_avg)

In [113]:
temp_session_max=temp_session_df.groupby('userId').max('sessiontime').sort('userId').toPandas()
feature_df=features_merge(feature_df,temp_session_max)

In [114]:
temp_session_min=temp_session_df.groupby('userId').min('sessiontime').sort('userId').toPandas()
feature_df=features_merge(feature_df,temp_session_min)

#### average, max, min unique songs per session

In [121]:
temp_song=spark_df_clean.filter(spark_df_clean['page']=='NextSong').groupby("userId",'sessionId','song').count().sort('userId','sessionId').drop('count').groupby("userId",'sessionId').count()

In [126]:
temp_song_df=temp_song.groupby('userId').avg('count').sort('userId').withColumnRenamed("avg(count)",'avg_unique_songs_per_session').toPandas()
feature_df=features_merge(feature_df,temp_song_df)

In [127]:
temp_song_df=temp_song.groupby('userId').max('count').sort('userId').withColumnRenamed("max(count)",'max_unique_songs_per_session').toPandas()
feature_df=features_merge(feature_df,temp_song_df)

In [128]:
temp_song_df=temp_song.groupby('userId').min('count').sort('userId').withColumnRenamed("min(count)",'max_unique_songs_per_session').toPandas()
feature_df=features_merge(feature_df,temp_song_df)

In [148]:
churn_count=spark_df_clean.select('userId','Churn').dropDuplicates().sort('userId').toPandas()

In [152]:
feature_df=features_merge(feature_df,churn_count)

If one column contains more than 20% null data, this colum will be dropped. The rest null data will be filled with 0.

In [154]:
feature_df=feature_df[feature_df.columns[feature_df.isnull().mean()<0.2]]
feature_df.fillna(0,inplace=True)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  downcast=downcast, **kwargs)


In [160]:
feature_df.to_csv('feature_df.csv',index=False)

In [162]:
feature_df

Unnamed: 0,userId,cat_gender,cat_level,cat_method,cat_location_state,cat_user_agent,count_per_activetime_Submit_Downgrade,count_per_activetime_Thumbs_Down,count_per_activetime_Home,count_per_activetime_Downgrade,...,min(NextSong_interaction_per_session),min(Thumbs_Up_interaction_per_session),min(Help_interaction_per_session),avg(sessiontime),max(sessiontime),min(sessiontime),avg_unique_songs_per_session,max_unique_songs_per_session_x,max_unique_songs_per_session_y,Churn
0,10,M,paid,GET,MS,Macintosh; Intel Mac OS X 10_9_4,0.000000,0.039936,0.599042,0.039936,...,360,17.0,3.0,1502.400000,1502.400000,1502.400000,350.000000,350,350,1
1,100,M,paid,PUT,TX,Windows NT 6.1; WOW64; rv:30.0,0.000000,0.025351,0.092497,0.027407,...,1,1.0,1.0,350.477778,1645.233333,0.016667,84.897436,380,1,0
2,100001,F,free,GET,FL,Macintosh; Intel Mac OS X 10_6_8,0.000000,0.031208,0.124832,0.000000,...,10,1.0,1.0,137.661111,202.133333,42.133333,32.000000,47,10,1
3,100002,F,free,PUT,CA,Windows NT 6.1; WOW64; rv:32.0,0.001771,0.003541,0.021246,0.003541,...,14,1.0,1.0,150.262500,333.433333,52.633333,34.250000,73,14,0
4,100003,F,paid,PUT,FL,Windows NT 6.1; WOW64; rv:31.0,0.000000,0.014056,0.043574,0.018273,...,2,1.0,1.0,296.927778,1277.233333,0.000000,81.375000,311,2,1
5,100004,F,free,GET,NY,compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0,0.000000,0.007368,0.062631,0.000000,...,11,4.0,1.0,222.096667,444.750000,48.516667,52.000000,102,11,1
6,100005,M,free,PUT,LA,Macintosh; Intel Mac OS X 10_9_4,0.000000,0.008703,0.026108,0.000000,...,15,2.0,0.0,139.033333,223.683333,54.383333,36.000000,57,15,1
7,100006,F,paid,GET,MI,Windows NT 6.3; WOW64,0.000738,0.002215,0.008862,0.000738,...,41,2.0,1.0,235.976667,415.050000,0.000000,72.500000,107,41,0
8,100007,F,free,PUT,AR,Windows NT 6.3; WOW64,0.000726,0.004356,0.030492,0.013068,...,9,1.0,1.0,288.901389,749.250000,37.116667,70.666667,184,9,0
9,100008,F,free,GET,CA,Macintosh; Intel Mac OS X 10.7; rv:31.0,0.000000,0.003640,0.010921,0.003640,...,37,1.0,1.0,262.294444,386.000000,141.416667,63.500000,90,37,0
