# Feature Engineering - Sessions dataset

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql import functions as f
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType
from pyspark.sql import SQLContext
from pyspark.sql import Window
from datetime import datetime

In [3]:
# load data and drop irrelevant columns
df = pd.read_csv("/Users/Reagan/Dropbox/zleanplum_data/sessions.csv")
df.drop(['app_id',"device_id","latitude","longitude",
         'is_wau','is_mau','country','city','timezone',
         'timezone_offset','os_name','locale','session_id'], axis=1, inplace=True)

In [284]:
# timestamps
# create (weekofday, month, day) for start_timestamp; create (how many days is sign-up day from dec1) 

In [8]:
df['start_timestamp'] = df['start_timestamp'].apply(lambda x:datetime.fromtimestamp(x/1000))
df['user_created_timestamp'] = df['user_created_timestamp'].apply(lambda x:datetime.fromtimestamp(x/1000))

df['st_weekday']=df['start_timestamp'].dt.weekday
df['st_month']=df['start_timestamp'].dt.month
df['st_day']=df['start_timestamp'].dt.day

dec1 = datetime(2018, 12, 1, 00, 00, 00,0)  # here is dec 15th
df['cr_from_now'] = df['user_created_timestamp'].apply(lambda x: pd.Timedelta(dec1 - x).days)

In [9]:
df.drop(['start_timestamp','user_created_timestamp'],axis=1, inplace=True)

In [8]:
# #delete those offline session. 
# df = df[df['is_session']==True]
# df.drop('is_session',axis=1, inplace=True)

In [10]:
df.head()

Unnamed: 0,previous_sessions_duration,is_user_first_session,is_session,is_developer,region,session_index,user_id_hash,st_weekday,st_month,st_day,cr_from_now
0,25837591,False,True,False,0,30,9943447915df3a45fd6720a026af905b6da6b56a37701b...,2,11,14,55
1,35050130,False,True,False,0,47,9943447915df3a45fd6720a026af905b6da6b56a37701b...,5,12,1,55
2,11343848,False,True,False,0,10,9943447915df3a45fd6720a026af905b6da6b56a37701b...,2,10,10,55
3,13499724,False,True,False,11,13,9943447915df3a45fd6720a026af905b6da6b56a37701b...,6,10,21,55
4,32788010,False,True,False,0,41,9943447915df3a45fd6720a026af905b6da6b56a37701b...,0,11,19,55


In [11]:
df.to_csv('session_step1_training5.csv', encoding='utf-8', index=False)

In [2]:
# convert to spark dataframe
ss = SparkSession.builder.getOrCreate()
myschema= StructType([ StructField("previous_sessions_duration", IntegerType(), True)\
                       ,StructField("is_user_first_session", StringType(), True)\
                       ,StructField("is_developer", StringType(), True)\
                       ,StructField("is_session", StringType(), True)\
                       ,StructField("region", StringType(), True)\
                       ,StructField("session_index", IntegerType(), True)\
                       ,StructField("user_id_hash", StringType(), True)\
                       ,StructField("st_weekday", IntegerType(), True)\
                       ,StructField("st_month", IntegerType(), True)\
                       ,StructField("st_day", IntegerType(), True)\
                       ,StructField("cr_from_now", IntegerType(), True)])

In [4]:
path = '/Users/Reagan/Dropbox/SF_DS/COURSE/5.630_advanced_ml/final project/session_step1_training5.csv'
spark_df = ss.read.csv(path,schema=myschema,header = True)

In [7]:
# convert user_id_hash to integers to save memory
userid = pd.read_csv('/Users/Reagan/Downloads/user_dict.csv', header=None)
userid_dict = {row[1][0]:row[1][1] for row in userid.iterrows()}
fn = UserDefinedFunction(lambda x: userid_dict[x], IntegerType())
spark_df = spark_df.withColumn('id_', fn(spark_df.user_id_hash)).drop('user_id_hash')

In [8]:
spark_df.select('id_').distinct().count()

621106

In [295]:
# convert T/F to 1/0 for "is_user_first_session" and "is_developer"

In [9]:
TF_dict ={"True":1, "False":0}
fn3 = UserDefinedFunction(lambda x: TF_dict[x], IntegerType())
spark_df = spark_df.withColumn('is_user_first_session2', fn3(spark_df.is_user_first_session)).drop('is_user_first_session')
spark_df = spark_df.withColumn('is_developer2', fn3(spark_df.is_developer)).drop('is_developer')
spark_df = spark_df.withColumn('is_session2', fn3(spark_df.is_session)).drop('is_session')

In [11]:
# feature 1 
cr = spark_df.groupBy('id_').agg(avg("cr_from_now").alias("cr_from_now")).cache()  #becuz of an annoying user
cr.count()

621106

In [12]:
developer = spark_df.groupBy('id_').agg(max("is_developer2").alias("is_developer3")).cache()  #becuz of an annoying user
developer.count()

621106

In [27]:
# region
cnts = spark_df.groupBy(['id_','region']).agg(count("id_").alias('count')).cache()
w = Window.partitionBy('id_')
region = cnts.withColumn('max_count', f.max('count').over(w)).where(f.col('count') == f.col('max_count')).cache() # we have some id that its region has multiple mode.
region = region.dropDuplicates(subset = ['id_']).select('id_','region')   # just keep the first one if there are duplicates
region.show(3)
region.count()

+---+------+
|id_|region|
+---+------+
|148|    is|
|463|    il|
|471|    fy|
+---+------+
only showing top 3 rows



621106

In [301]:
# #get the training data（we only need before 12.1） OR not doing so for submitting final results
# spark_df = spark_df.filter(spark_df.st_month != 12)
# spark_df.count()

### cautious! numbers associated with time ! 

In [28]:
# feature 3
last2week_ss_duration = spark_df.filter(spark_df.st_month==11).filter(spark_df.st_day > 15).groupBy('id_').agg(sum("previous_sessions_duration").alias("last2week_ss_duration")).cache()
print(last2week_ss_duration.count())
print(last2week_ss_duration.select(last2week_ss_duration.id_).distinct().count())

230672
230672


In [29]:
# feature 4
total = spark_df.groupBy('id_').agg(count("id_").alias("total_count")).cache()
total.count()

621106

In [30]:
# feature 5
last2week = spark_df.filter(spark_df.st_month==11).filter(spark_df.st_day > 15).groupBy('id_').agg(count("id_").alias("last2week_count")).cache()
last2week.count()

230672

In [31]:
# feature 6
last5day = spark_df.filter(spark_df.st_month==11).filter(spark_df.st_day > 25).groupBy('id_').agg(count("id_").alias("last5day_count")).cache()
last5day.count()

101059

In [32]:
# feature 7
weekend = spark_df.filter((spark_df.st_weekday==5) | (spark_df.st_weekday==6)).groupBy('id_').agg(count("id_").alias("weekend_count")).cache()
weekend.count()

379136

In [33]:
# feature 8 
weekday = spark_df.filter((spark_df.st_weekday==0) | (spark_df.st_weekday==1) | (spark_df.st_weekday==2)| (spark_df.st_weekday==3) | (spark_df.st_weekday==4)).groupBy('id_').agg(count("id_").alias("weekday_count")).cache()
weekday.count()

561847

In [34]:
# join these small tables together
join0 = cr.join(region, on='id_', how='left')
join1 = join0.join(last2week_ss_duration, on='id_', how='left') 
join2 = join1.join(total, on='id_', how='left') 
join3 = join2.join(last2week, on='id_', how='left') 
join4 = join3.join(last5day, on='id_', how='left') 
join5 = join4.join(weekend, on='id_', how='left')
ss_features = join5.join(weekday, on='id_', how='left').cache()
# ss_features = ss_features.drop('st_weekday','st_month','st_day','previous_sessions_duration')

In [35]:
ss_features.count()

621106

In [36]:
ss_features.show(3)

+---+-----------+------+---------------------+-----------+---------------+--------------+-------------+-------------+
|id_|cr_from_now|region|last2week_ss_duration|total_count|last2week_count|last5day_count|weekend_count|weekday_count|
+---+-----------+------+---------------------+-----------+---------------+--------------+-------------+-------------+
|148|       29.0|    is|                 null|          6|           null|          null|            3|            3|
|463|       42.0|    il|                 null|          1|           null|          null|         null|            1|
|471|       42.0|    fy|                 null|          4|           null|          null|            3|            1|
+---+-----------+------+---------------------+-----------+---------------+--------------+-------------+-------------+
only showing top 3 rows



### Convert to Pandas

In [37]:
sf = ss_features.toPandas()

In [38]:
sf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 621106 entries, 0 to 621105
Data columns (total 9 columns):
id_                      621106 non-null int32
cr_from_now              621106 non-null float64
region                   620446 non-null object
last2week_ss_duration    230672 non-null float64
total_count              621106 non-null int64
last2week_count          230672 non-null float64
last5day_count           101059 non-null float64
weekend_count            379136 non-null float64
weekday_count            561847 non-null float64
dtypes: float64(6), int32(1), int64(1), object(1)
memory usage: 40.3+ MB


In [39]:
# if the count feature is NaN, it means the count value should be zero

In [40]:
sf['last2week_ss_duration'].fillna(0, inplace=True)
sf['total_count'].fillna(0, inplace=True)
sf['last2week_count'].fillna(0, inplace=True)
sf['last5day_count'].fillna(0, inplace=True)
sf['weekend_count'].fillna(0, inplace=True)
sf['weekday_count'].fillna(0, inplace=True)

In [42]:
# unknown regions
sf['region'].fillna('unknown', inplace=True)

In [44]:
# using feature hashing on "region"
from sklearn.feature_extraction import FeatureHasher
h = FeatureHasher(n_features=20)
D = [{x:1} for x in sf.region.values]
f = h.transform(D)
region_hashing = f.toarray()

In [45]:
region_hashing_df = pd.DataFrame(columns=['r1', 'r2', 'r3','r4', 'r5', 'r6','r7', 'r8', 'r9','r10',
                                 'r11', 'r12', 'r13','r14', 'r15', 'r16','r17', 'r18', 'r19','r20'])
region_df = region_hashing_df.append(pd.DataFrame(region_hashing, columns=region_hashing_df.columns))

## Final features

In [47]:
session_features = pd.concat([sf, region_df], axis=1)
session_features.drop(['region'], axis=1, inplace=True)
session_features.head(3)

Unnamed: 0,id_,cr_from_now,last2week_ss_duration,total_count,last2week_count,last5day_count,weekend_count,weekday_count,r1,r2,...,r11,r12,r13,r14,r15,r16,r17,r18,r19,r20
0,148,29.0,0.0,6,0.0,0.0,3.0,3.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0
1,463,42.0,0.0,1,0.0,0.0,0.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,471,42.0,0.0,4,0.0,0.0,3.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [48]:
len(session_features)

621106

In [49]:
dummy = pd.DataFrame({'id_': list(userid_dict.values()), 'dummy': [0 for _ in range(len(userid_dict.values()))]})
features = pd.merge(dummy,session_features, on='id_', how='left')
features = features.drop(['dummy'],axis=1)

In [50]:
features.to_csv('features_session_training5.0.csv', encoding='utf-8', index=False)