## Feature engineering - events

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType
from datetime import datetime


In [2]:
sc = SparkContext.getOrCreate()
ss = SparkSession.builder.getOrCreate()

In [14]:
users = pd.read_csv('data/user_dict.csv', header=None)
user_dict = {row[1][0]:int(row[1][1]) for row in users.iterrows()}

In [15]:
name = 'user_id_hash'
fn = UserDefinedFunction(lambda x: user_dict[x], IntegerType())
df = ss.read.csv('data/events.csv', header=True, inferSchema=True)


In [None]:
# Convert user_id_hash to user_id
purchases = df.withColumn('user_id', fn(df.user_id_hash))\
            .drop('user_id_hash', 'app_id', 'session_id')\
            .filter("event == 8")\
            .toPandas()

In [None]:
purchases['datetime'] = purchases['event_timestamp']\
                        .apply(lambda x:datetime.fromtimestamp(x/1000))
purchases = purchases[(purchases.datetime < '2018-12-01') & (purchases['event_value'] != 0)]

In [61]:
f1 = purchases.groupby('user_id')\
                            .size()\
                            .reset_index(name='purchase_count_total')\
                            .set_index('user_id')
f2 = purchases[purchases.datetime >= '2018-11-24']\
                            .groupby('user_id')\
                            .size()\
                            .reset_index(name='purchase_count_last_week')\
                            .set_index('user_id')
f3 = purchases[purchases.datetime >= '2018-11-17']\
                            .groupby('user_id').size()\
                            .reset_index(name='purchase_count_2_weeks')\
                            .set_index('user_id')

In [62]:
f4 = purchases.groupby('user_id')['event_value']\
                            .agg('sum')\
                            .reset_index(name='purchase_sum_total')\
                            .set_index('user_id')
f5 = purchases[purchases.datetime >= '2018-11-24']\
                            .groupby('user_id')['event_value']\
                            .agg('sum')\
                            .reset_index(name='purchase_sum_last_week')\
                            .set_index('user_id')
f6 = purchases[purchases.datetime >= '2018-11-17']\
                            .groupby('user_id')['event_value']\
                            .agg('sum')\
                            .reset_index(name='purchase_sum_2_weeks')\
                            .set_index('user_id')

In [None]:
# first create a dummy df with all user_ids and then join with other features
dummy = pd.DataFrame({'user_id': list(user_dict.values()), 'dummy': [0 for _ in range(len(user_dict.values()))]})
features_lst = [f1, f2, f3, f4, f5, f6]
features = dummy.join(features_lst[:]).drop(axis=1, columns='dummy').fillna(value=0)


In [78]:
print(sum(features.purchase_count_total != 0))
print(sum(features.purchase_count_total == 0))
32752/588748

32752
588748


0.055629912967857215

In [76]:
features.to_csv(path_or_buf='features_events.csv', index=False)