# Read, Parse, Process E-Commerce data with NVTabular
eCommerce dataset: https://www.kaggle.com/mkechinov/ecommerce-behavior-data-from-multi-category-store

## Data Download from Kaggle

In [1]:
# !pip install kaggle --upgrade

In [2]:
# # NOTE: first to get kaggle api tiken from account page in Kaggle. Place it at ~/.kaggle/kaggle.json
# !mkdir -p ~/.kaggle/ && cp /mount/workspace/kaggle.json ~/.kaggle/ && chmod 600 ~/.kaggle/kaggle.json
# !mkdir -p ~/data
# !cd ~/data && kaggle datasets download mkechinov/ecommerce-behavior-data-from-multi-category-store
# !cd ~/data && unzip ecommerce-behavior-data-from-multi-category-store.zip

### Downloading additional months from Google Drive

In [3]:
# !pip install gdown
# !cd ~/data 

In [4]:
# !gdown https://drive.google.com/uc?id=1qZIwMbMgMmgDC5EoMdJ8aI9lQPsWA3-P -O .
# !echo "Unziping" && gunzip 2019-Dec.csv.gz

In [5]:
# !cd ~/data && gdown https://drive.google.com/uc?id=1x5ohrrZNhWQN4Q-zww0RmXOwctKHH9PT
# !cd ~/data && echo "Unziping" && gunzip 2020-Jan.csv.gz

In [6]:
# !cd ~/data && gdown https://drive.google.com/uc?id=1-Rov9fFtGJqb7_ePc6qH-Rhzxn0cIcKB
# !cd ~/data && echo "Unziping" && gunzip 2020-Feb.csv.gz

In [7]:
# !cd ~/data && gdown https://drive.google.com/uc?id=1zr_RXpGvOWN2PrWI6itWL8HnRsCpyqz8
# !cd ~/data && echo "Unziping" && gunzip 2020-Mar.csv.gz

In [8]:
# !cd ~/data && gdown https://drive.google.com/uc?id=1g5WoIgLe05UMdREbxAjh0bEFgVCjA1UL
# !cd ~/data && echo "Unziping" && gunzip 2020-Apr.csv.gz

## Configurations

In [9]:
import os

In [10]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

import rmm

import glob

import cudf, dask_cudf
import cupy
import nvtabular as nvt

import pandas as pd
import numpy as np
import shutil

## Set up Dask Cuda Cluster

In [11]:
# define some information about where to get our data
BASE_DIR = os.environ.get("BASE_DIR", "/workspace/")
INPUT_DATA_DIR = os.environ.get("INPUT_DATA_DIR", BASE_DIR + "ecommerce/")
dask_workdir = os.path.join(BASE_DIR, "test_dask/workdir")
OUTPUT_DATA_DIR = os.environ.get("OUTPUT_DATA_DIR", BASE_DIR + "/ecommerce/output")
stats_path = os.path.join(BASE_DIR, "test_dask/stats")

# Make sure we have a clean worker space for Dask
if os.path.isdir(dask_workdir):
    shutil.rmtree(dask_workdir)
os.makedirs(dask_workdir)

# Make sure we have a clean stats space for Dask
if os.path.isdir(stats_path):
    shutil.rmtree(stats_path)
os.mkdir(stats_path)

# Make sure we have a clean output path
if os.path.isdir(OUTPUT_DATA_DIR):
    shutil.rmtree(OUTPUT_DATA_DIR)
os.mkdir(OUTPUT_DATA_DIR)

In [12]:
# Dask dashboard
from nvtabular.utils import _pynvml_mem_size, device_mem_size
dashboard_port = "8787"

# Deploy a Single-Machine Multi-GPU Cluster
protocol = "tcp"  # "tcp" or "ucx"
NUM_GPUS = [0, 1]
visible_devices = ",".join([str(n) for n in NUM_GPUS])  # Delect devices to place workers
device_limit_frac = 0.7  # Spill GPU-Worker memory to host at this limit.
device_pool_frac = 0.8
part_mem_frac = 0.15

# Use total device size to calculate args.device_limit_frac
device_size = device_mem_size(kind="total")
device_limit = int(device_limit_frac * device_size)
device_pool_size = int(device_pool_frac * device_size)
part_size = int(part_mem_frac * device_size)

# Check if any device memory is already occupied
for dev in visible_devices.split(","):
    fmem = _pynvml_mem_size(kind="free", index=int(dev))
    used = (device_size - fmem) / 1e9
    if used > 1.0:
        warnings.warn(f"BEWARE - {used} GB is already occupied on device {int(dev)}!")

cluster = None  # (Optional) Specify existing scheduler port
if cluster is None:
    cluster = LocalCUDACluster(
        protocol=protocol,
        n_workers=len(visible_devices.split(",")),
        CUDA_VISIBLE_DEVICES=visible_devices,
        device_memory_limit=device_limit,
        local_directory=dask_workdir,
        dashboard_address=":" + dashboard_port,
    )

# Create the distributed client
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://127.0.0.1:38185  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 2  Cores: 2  Memory: 100.00 GiB


In [13]:
NUM_MONTHS_TO_PREPROCESS = 1 #For the eCommerce dataset there are up to 7 months (2019-Oct to 2020-Apr)
KEEP_REPEATED_USER_INTERACTIONS = False

In [14]:
MONTHS_FILES = ["2019-Oct.csv", "2019-Nov.csv", "2019-Dec.csv", "2020-Jan.csv", "2020-Feb.csv", "2020-Mar.csv", "2020-Apr.csv"]

In [15]:
selected_months = MONTHS_FILES[:NUM_MONTHS_TO_PREPROCESS]
selected_months

['2019-Oct.csv']

In [16]:
files_paths = [os.path.join(INPUT_DATA_DIR, file) for file in selected_months]
files_paths

['/workspace//ecommerce/2019-Oct.csv']

#### Read through Dask-cudf from CSV

In [17]:
%%time
raw_df = dask_cudf.read_csv(files_paths, inferSchema = True) 
raw_df.head()

CPU times: user 667 ms, sys: 418 ms, total: 1.08 s
Wall time: 2.77 s


Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


In [18]:
raw_df.dtypes

event_time        object
event_type        object
product_id         int64
category_id        int64
category_code     object
brand             object
price            float64
user_id            int64
user_session      object
dtype: object

## Convert timestamp from datetime

In [20]:
raw_df['event_time_dt'] = raw_df['event_time'].astype('datetime64[s]')
raw_df['event_time_ts']= raw_df['event_time_dt'].astype('int')
raw_df.head()

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session,event_time_dt,event_time_ts
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c,2019-10-01 00:00:00,1569888000
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc,2019-10-01 00:00:00,1569888000
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8,2019-10-01 00:00:01,1569888001
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713,2019-10-01 00:00:01,1569888001
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d,2019-10-01 00:00:04,1569888004


## Removing repeated (user,item) interactions

In [21]:
df = raw_df

In [22]:
cols = list(df.columns)
cols.remove('user_session')
cols

['event_time',
 'event_type',
 'product_id',
 'category_id',
 'category_code',
 'brand',
 'price',
 'user_id',
 'event_time_dt',
 'event_time_ts']

## Categorify `user_session` column

In [23]:
#  load data 
df_event = nvt.Dataset(df) 
# categorify features 
cat_feats = ['user_session'] >> nvt.ops.Categorify()

workflow = nvt.Workflow(cols + cat_feats)
workflow.fit(df_event)
df = workflow.transform(df_event).to_ddf()

In [24]:
#Keeping only the first user interaction with an item (ignores all future repeated interactions)
if not KEEP_REPEATED_USER_INTERACTIONS:
    df_first_user_item_interaction_df = df.groupby(['user_id', 'product_id']).agg({'event_time_ts': 'min'})
    df_first_user_item_interaction_df = df_first_user_item_interaction_df.compute().reset_index().rename(columns={'event_time_ts': 'first_user_item_event_time_ts'})
    df = df.merge(df_first_user_item_interaction_df, how='inner', left_on=['user_id', 'product_id', 'event_time_ts'], right_on=['user_id', 'product_id','first_user_item_event_time_ts'])
    df = df.drop(columns=['first_user_item_event_time_ts'])

#Keeps repeated interactions on the same items, removing only consecutive interactions, because it might be due to browser tab refreshes or different interaction types (e.g. click, add-to-card, purchase)
else:
    print("Count with in-session repeated interactions: {}".format(len(df)))
    # Sorts the dataframe by session and timestamp, to remove consective repetitions
    df = df.sort_values(['user_session', 'event_time_ts'])
    df['product_id_past'] = df['product_id'].shift(1)
    df['session_id_past'] = df['user_session'].shift(1)
    #Keeping only no consectutive repeated in session interactions
    df = df[~((df['user_session'] == df['session_id_past']) & \
                 (df['product_id'] == df['product_id_past']))]
    print("Count after removed in-session repeated interactions: {}".format(len(interactions_df)))
    del(df['product_id_past'])
    del(df['session_id_past'])

In [25]:
df.head()

Unnamed: 0,user_session,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,event_time_dt,event_time_ts
0,7353546,2019-10-01 17:28:04 UTC,view,1005118,2053013555631882655,electronics.smartphone,apple,975.57,555727499,2019-10-01 17:28:04,1569950884
1,2309416,2019-10-02 08:31:29 UTC,view,6501029,2053013554155487563,computers.components.motherboard,gigabyte,1172.06,516419954,2019-10-02 08:31:29,1570005089
2,7549019,2019-10-02 07:26:47 UTC,view,2400060,2053013563743667055,appliances.kitchen.hood,,271.0,554190762,2019-10-02 07:26:47,1570001207
3,1513742,2019-10-02 14:20:54 UTC,view,2701646,2053013563911439225,appliances.kitchen.refrigerators,indesit,270.02,539931286,2019-10-02 14:20:54,1570026054
4,7607749,2019-10-02 03:43:53 UTC,view,1801991,2053013554415534427,electronics.video.tv,irbis,100.39,514491621,2019-10-02 03:43:53,1569987833


**Full Dataset (7 months) Stats - Number of interactions**
- No filter: 411709736
- Removed iser consecutive repeated interactions in the same items: 261390136
- Removing all user repeated interactions with the same items: 204098003

**1 Months Stats - Number of interactions**
- No filter: 42448764
- Removed iser consecutive repeated interactions in the same items: 26565608 
- Removing all user repeated interactions with the same items: 23312920

### Categorical features encoding


In [26]:
cols= ['event_time',
 'event_type',
 'category_id',
 'category_code',
 'brand',
 'price',
 'user_session',
 'event_time_dt',
 'event_time_ts',
 'user_id',
 'product_id']

In [27]:
# categorify features 
cat_feats = ['user_id', 'product_id', 'category_id', 'category_code', 'brand', 'event_type'] >> nvt.ops.Rename(postfix = '_idx') >> nvt.ops.Categorify()

In [28]:
cat_feats.columns

['user_id_idx',
 'product_id_idx',
 'category_id_idx',
 'category_code_idx',
 'brand_idx',
 'event_type_idx']

### Extracts Temporal Features

In [29]:
item_first_interaction_df = df.groupby('product_id').agg({'event_time_ts': 'min'}).reset_index().rename(columns={'event_time_ts': 'prod_first_event_time_ts'})
item_first_interaction_df['prod_first_event_time_ts'] = item_first_interaction_df['prod_first_event_time_ts'].astype('datetime64[s]')
item_first_interaction_df.head()

Unnamed: 0,product_id,prod_first_event_time_ts
0,18900237,2019-10-08 15:31:19
1,28401063,2019-10-01 05:57:35
2,17500826,2019-10-20 04:51:14
3,21409547,2019-10-01 03:48:11
4,11800032,2019-10-01 04:51:58


In [30]:
df = df.merge(item_first_interaction_df, on=['product_id'], how='left')

In [41]:
#df.to_parquet('input_df')

In [31]:
# calculate item recency 
# create custom op
from nvtabular.ops import Operator

class ItemRecency(Operator):
    def transform(self, columns, gdf):
        for column in columns:
            col = gdf[column]
            item_first_timestamp = gdf['prod_first_event_time_ts']
            delta_days = (col - item_first_timestamp).dt.days
            gdf[column + "_age_days"] = delta_days * (delta_days >=0)
        return gdf
            
    def output_column_names(self, columns):
        return [column + "_age_days" for column in columns]
            
    def dependencies(self):
        return ["prod_first_event_time_ts"]

In [32]:
# create time features
sessionTime = ['event_time_dt']

sessionTime_hour = (
    sessionTime >> 
    nvt.ops.LambdaOp(lambda col: col.dt.hour) >> 
    nvt.ops.Rename(name = 'et_hour')
)
sessionTime_weekday = (
    sessionTime >> 
    nvt.ops.LambdaOp(lambda col: col.dt.weekday) >> 
    nvt.ops.Rename(name ='et_dayofweek')
)
sessionTime_day = (
    sessionTime >> 
    nvt.ops.LambdaOp(lambda col: col.dt.day) >> 
    nvt.ops.Rename(name ="et_dayofmonth")
)
sessionTime_month = (
    sessionTime >> 
    nvt.ops.LambdaOp(lambda col: col.dt.month) >> 
    nvt.ops.Rename(name ="et_month")
)

In [33]:
def get_cycled_feature_value_sin(col, max_value):
    value_scaled = (col + 0.000001) / max_value
    value_sin = np.sin(2*np.pi*value_scaled)
    return value_sin

def get_cycled_feature_value_cos(col, max_value):
    value_scaled = (col + 0.000001) / max_value
    value_cos = np.cos(2*np.pi*value_scaled)
    return value_cos

In [34]:
hour_sin = sessionTime_hour >> (lambda col: get_cycled_feature_value_sin(col, 24)) >> nvt.ops.Rename(name='et_hour_sin')
hour_cos = sessionTime_hour >> (lambda col: get_cycled_feature_value_cos(col, 24)) >> nvt.ops.Rename(name ='et_hour_cos')
weekday_sin = sessionTime_weekday >> (lambda col: get_cycled_feature_value_sin(col+1, 7)) >> nvt.ops.Rename(name = 'et_dayofweek_sin')
weekday_cos= sessionTime_weekday >> (lambda col: get_cycled_feature_value_cos(col+1, 7)) >> nvt.ops.Rename(name = 'et_dayofweek_cos')

dayofmonth_sin = sessionTime_hour >> (lambda col: get_cycled_feature_value_sin(col, 31)) >> nvt.ops.Rename(name ='et_dayofmonth_sin')
dayofmonth_cos = sessionTime_hour >> (lambda col: get_cycled_feature_value_cos(col, 31)) >> nvt.ops.Rename(name='et_dayofmonth_cos')
month_sin = sessionTime_weekday >> (lambda col: get_cycled_feature_value_sin(col, 12)) >> nvt.ops.Rename(name='et_month_sin')
month_cos= sessionTime_weekday >> (lambda col: get_cycled_feature_value_cos(col, 12)) >> nvt.ops.Rename(name = 'et_month_cos')

In [35]:
cycled_features = hour_sin + hour_cos + weekday_sin + weekday_cos + dayofmonth_sin + dayofmonth_cos + month_sin + month_cos

In [36]:
recency_features = ["event_time_dt"] >> ItemRecency() >>  nvt.ops.Rename(name='product_recency_days')
recency_features_norm = recency_features >> nvt.ops.LogOp() >> nvt.ops.Normalize() >> nvt.ops.Rename(name='product_recency_days_log_norm')

In [37]:
time_features = (
    sessionTime_hour +
    sessionTime_day + 
    sessionTime_month + 
    sessionTime_weekday +
    recency_features +
    recency_features_norm + 
    cycled_features
    #cycled_features_norm
)
#time_features.graph

In [38]:
#Smoothing price long-tailed distribution
price_log = ['price'] >> nvt.ops.LogOp() >> nvt.ops.Normalize() >> nvt.ops.Rename(name='price_log_norm')

In [39]:
avg_category_id_price = ['category_id'] >> nvt.ops.JoinGroupby(cont_cols =['price'], stats=["mean"]) >> nvt.ops.Rename(name='avg_category_id_prices')
# df_indexed = df_indexed.join(avg_category_id_prices_df, on='category_id', how='inner') \
#         .withColumn('relative_price_to_avg_category_id', (F.col('price') - F.col('avg_category_id_price')) / F.col('avg_category_id_price'))

In [52]:
#  load data 
path = '/workspace/Transformers4Rec/datasets/ecommerce_rees46/preprocessing/'
input_paths = glob.glob(os.path.join(path, "input_df", "*.parquet"))
dataset = nvt.Dataset(input_paths, part_size="100MB")
workflow = nvt.Workflow(cols + cat_feats + time_features + avg_category_id_price + price_log)
workflow.fit(dataset)
df_indexed = workflow.transform(dataset).to_ddf()



In [53]:
df_indexed.head()

Unnamed: 0,user_id_idx,product_id_idx,category_id_idx,category_code_idx,brand_idx,event_type_idx,event_time,event_type,category_id,category_code,...,et_hour_sin,et_hour_cos,et_dayofweek_sin,et_dayofweek_cos,et_dayofmonth_sin,et_dayofmonth_cos,et_month_sin,et_month_cos,avg_category_id_prices,price_log_norm
0,1174806,163410,504,1,2608,3,2019-10-01 16:00:24 UTC,view,2126679654801604876,accessories.bag,...,-0.866026,-0.499999,0.974928,-0.222522,-0.101169,-0.994869,0.5,0.866025,18.227099,-2.511035
1,2006121,126257,273,0,1846,3,2019-10-01 17:50:25 UTC,view,2053013563550729061,,...,-0.965926,-0.258819,0.974928,-0.222522,-0.299364,-0.954139,0.5,0.866025,224.449294,0.456359
2,683589,91653,225,97,0,3,2019-10-01 15:57:26 UTC,view,2053013561579406073,electronics.clocks,...,-0.707107,-0.707107,0.974928,-0.222522,0.101168,-0.994869,0.5,0.866025,293.260562,-0.892912
3,1853381,26927,197,114,649,3,2019-10-02 13:38:57 UTC,view,2053013560346280633,kids.carriage,...,-0.258819,-0.965926,0.433883,-0.900969,0.485302,-0.874347,0.866026,0.5,213.358342,-0.554439
4,59077,848,85,98,2701,3,2019-10-01 14:43:32 UTC,view,2053013555631882655,electronics.smartphone,...,-0.5,-0.866025,0.974928,-0.222522,0.299363,-0.954139,0.5,0.866025,503.092535,0.440936


### Analyze categorical statistics

In [99]:
#Relative Price to the average price for the category_id
avg_category_id_prices_df = df_indexed.groupBy('category_id').agg(F.mean('price').alias('avg_category_id_price'))
df_indexed = df_indexed.join(avg_category_id_prices_df, on='category_id', how='inner') \
        .withColumn('relative_price_to_avg_category_id', (F.col('price') - F.col('avg_category_id_price')) / F.col('avg_category_id_price'))

In [100]:
#_df_sb_a.groupBy('product_id').agg(F.stddev('price').alias('std')).where(~F.isnan('std')).agg(F.mean('std')).show()

In [101]:
#_df_sb_a.groupBy('category_id').agg(F.stddev('price').alias('std')).where(~F.isnan('std')).agg(F.mean('std')).show()

In [102]:
#_df_sb_a.groupBy('category_code').agg(F.stddev('price').alias('std')).agg(F.mean('std')).show()

In [103]:
df_indexed.dtypes

[('category_id', 'bigint'),
 ('product_id', 'int'),
 ('event_time', 'string'),
 ('event_type', 'string'),
 ('category_code', 'string'),
 ('brand', 'string'),
 ('price', 'double'),
 ('user_session', 'string'),
 ('event_time_dt', 'timestamp'),
 ('event_time_ts', 'bigint'),
 ('user_id', 'int'),
 ('product_idx', 'int'),
 ('category_sub_idx', 'int'),
 ('category_code_idx', 'int'),
 ('brand_idx', 'int'),
 ('event_type_idx', 'int'),
 ('user_idx', 'int'),
 ('et_hour', 'int'),
 ('et_dayofweek', 'int'),
 ('et_dayofmonth', 'int'),
 ('et_month', 'int'),
 ('et_hour_sin', 'float'),
 ('et_hour_cos', 'float'),
 ('et_dayofweek_sin', 'float'),
 ('et_dayofweek_cos', 'float'),
 ('et_dayofmonth_sin', 'float'),
 ('et_dayofmonth_cos', 'float'),
 ('et_month_sin', 'float'),
 ('et_month_cos', 'float'),
 ('prod_first_event_time_ts', 'bigint'),
 ('product_recency_days', 'double'),
 ('product_recency_days_log', 'double'),
 ('price_log', 'double'),
 ('avg_category_id_price', 'double'),
 ('relative_price_to_avg_cate

### Normalize Continuous Features

#### Price (log)

In [104]:
%%time
price_log_mean, price_log_std = tuple(df_indexed.agg(F.mean('price_log'), F.stddev('price_log')).take(1)[0])
print(price_log_mean, price_log_std)

4.985035180576169 1.2631603275977517
CPU times: user 10.4 ms, sys: 9.86 ms, total: 20.2 ms
Wall time: 1min 35s


In [105]:
#Z-norm
df_indexed = df_indexed.withColumn('price_log_norm', (F.col('price_log') - price_log_mean) / price_log_std)

#### Elapsed days (log)

In [106]:
%%time
product_recency_days_log_mean, product_recency_days_log_std = tuple(df_indexed.agg(F.mean('product_recency_days_log'), F.stddev('product_recency_days_log')).take(1)[0])
print(product_recency_days_log_mean, product_recency_days_log_std)

2.4707174040997737 0.8161943934923657
CPU times: user 1.96 ms, sys: 18.1 ms, total: 20 ms
Wall time: 1min 20s


In [107]:
#Z-norm
df_indexed = df_indexed.withColumn('product_recency_days_log_norm', (F.col('product_recency_days_log') - product_recency_days_log_mean) / product_recency_days_log_std)

In [108]:
df_indexed.count()

23312920

## Computing elapsed time since last interaction (on non-repeated items)

In [110]:
user_window = Window.partitionBy('user_idx').orderBy('event_time_ts')

In [111]:
df_indexed = df_indexed.withColumn('prev_event_time_ts', F.lag('event_time_ts').over(user_window)) \
                            .withColumn('delta_event_secs',F.when(F.isnull(F.col('prev_event_time_ts')), 0) \
                                                            .otherwise(F.col('event_time_ts') - F.col('prev_event_time_ts'))) \
                            .withColumn('delta_event_secs_log', F.log1p('delta_event_secs'))

In [112]:
%%time
session_delta_time_mean, session_delta_time_std = tuple(df_indexed.agg(
                            F.mean('delta_event_secs_log').alias('delta_event_secs_log_mean'),
                            F.stddev('delta_event_secs_log').alias('delta_event_secs_log_std')).take(1)[0])
print(session_delta_time_mean, session_delta_time_std)

#Z-norm
df_indexed = df_indexed.withColumn('delta_event_secs_log_norm', (F.col('delta_event_secs_log') - session_delta_time_mean) / session_delta_time_std)

4.783448763418383 3.492059719833784
CPU times: user 3.88 ms, sys: 16.6 ms, total: 20.5 ms
Wall time: 1min 21s


In [113]:
df_indexed.printSchema()

root
 |-- category_id: long (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_session: string (nullable = true)
 |-- event_time_dt: timestamp (nullable = true)
 |-- event_time_ts: long (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- product_idx: integer (nullable = true)
 |-- category_sub_idx: integer (nullable = true)
 |-- category_code_idx: integer (nullable = true)
 |-- brand_idx: integer (nullable = true)
 |-- event_type_idx: integer (nullable = true)
 |-- user_idx: integer (nullable = true)
 |-- et_hour: integer (nullable = true)
 |-- et_dayofweek: integer (nullable = true)
 |-- et_dayofmonth: integer (nullable = true)
 |-- et_month: integer (nullable = true)
 |-- et_hour_sin: float (nullable = true)
 |-- et_hour_cos: float (nullable = true)
 |--

## Processing sessions sequences

#### Aggregate by session id (create sequence as type of array)

In [114]:
session_window = Window.partitionBy('user_session').orderBy('event_time_ts')

In [115]:
def get_non_repeated_items(values):
    #Returns unique items, keep the order of their first occurence
    result = []
    for v in values:
        if v not in result:
            result.append(v)
    return result

@udf(returnType=ArrayType(IntegerType()))
def get_non_repeated_items_integer_udf(values):
    result = get_non_repeated_items(values)
    result = list([int(x) for x in result])
    return result

#########################################

def get_non_repeated_additional_items(item_ids, additional_feature_values):
    #Returns unique items, keep the order of their first occurence
    ids = []
    result = []
    for i, v in zip(item_ids, additional_feature_values):
        if i not in ids:
            ids.append(i)
            result.append(v)
    return result

@udf(returnType=ArrayType(IntegerType()))
def get_non_repeated_additional_items_integer_udf(item_ids, additional_feature_values):
    result = get_non_repeated_additional_items(item_ids, additional_feature_values)
    result = list([int(x) for x in result])
    return result

@udf(returnType=ArrayType(FloatType()))
def get_non_repeated_additional_items_float_udf(item_ids, additional_feature_values):
    result =  get_non_repeated_additional_items(item_ids, additional_feature_values)
    result = list([float(x) for x in result])
    return result

In [116]:
SESSIONS_MAX_LENGTH = 20

In [117]:
df_sb_grouped_df = df_indexed \
                .select('user_idx', 'user_session', 
                        F.first('event_time_ts').over(session_window).alias('session_start_ts'),
                        F.last('event_time_ts').over(session_window).alias('session_end_ts'),
                        F.slice(F.collect_list('product_idx').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('pid_seq_als'), 
                        F.slice(F.collect_list('event_time_ts').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('etime_seq_als'),
                        F.slice(F.collect_list('event_type_idx').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('event_type_seq_als'), 
                        F.slice(F.collect_list('category_sub_idx').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('csid_seq_als'),
                        F.slice(F.collect_list('category_code_idx').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('ccid_seq_als'),
                        F.slice(F.collect_list('brand_idx').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('bid_seq_als'),
                        F.slice(F.collect_list('price_log_norm').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('price_log_norm_seq_als'),                    
                        F.slice(F.collect_list('delta_event_secs').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('dtime_secs_seq_als'),
                        F.slice(F.collect_list('delta_event_secs_log_norm').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('dtime_secs_log_norm_seq_als'),                     
                        F.slice(F.collect_list('product_recency_days').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('prod_recency_days_als'),
                        F.slice(F.collect_list('product_recency_days_log_norm').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('prod_recency_days_log_norm_als'),
                        F.slice(F.collect_list('relative_price_to_avg_category_id').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('relative_price_to_avg_category_als'),
                        F.slice(F.collect_list('et_hour_sin').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('et_hour_sin_seq_als'),
                        F.slice(F.collect_list('et_hour_cos').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('et_hour_cos_seq_als'),
                        F.slice(F.collect_list('et_month_sin').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('et_month_sin_seq_als'),
                        F.slice(F.collect_list('et_month_cos').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('et_month_cos_seq_als'),
                        F.slice(F.collect_list('et_dayofweek_sin').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('et_dayofweek_sin_seq_als'),
                        F.slice(F.collect_list('et_dayofweek_cos').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('et_dayofweek_cos_seq_als'),
                        F.slice(F.collect_list('et_dayofmonth_sin').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('et_dayofmonth_sin_seq_als'),
                        F.slice(F.collect_list('et_dayofmonth_cos').over(session_window), 1, SESSIONS_MAX_LENGTH).alias('et_dayofmonth_cos_seq_als'),                        
                       )\
                .groupBy('user_idx', 'user_session').agg(
                    F.min('session_start_ts').alias('session_start_ts'),
                    F.max('session_end_ts').alias('session_end_ts'),
                    F.max('pid_seq_als').alias('sess_pid_seq'),
                    F.max('etime_seq_als').alias('sess_etime_seq'),
                    F.max('event_type_seq_als').alias('sess_etype_seq'),    
                    F.max('csid_seq_als').alias('sess_csid_seq'),
                    F.max('ccid_seq_als').alias('sess_ccid_seq'),
                    F.max('bid_seq_als').alias('sess_bid_seq'),
                    F.max('price_log_norm_seq_als').alias('sess_price_log_norm_seq'),        
                    F.max('dtime_secs_seq_als').alias('sess_dtime_secs_seq'),
                    F.max('dtime_secs_log_norm_seq_als').alias('sess_dtime_secs_log_norm_seq'),                    
                    F.max('prod_recency_days_als').alias('sess_prod_recency_days_seq'),
                    F.max('prod_recency_days_log_norm_als').alias('sess_prod_recency_days_log_norm_seq'),
                    F.max('relative_price_to_avg_category_als').alias('sess_relative_price_to_avg_category_seq'),
                    F.max('et_hour_sin_seq_als').alias('sess_et_hour_sin_seq'),
                    F.max('et_hour_cos_seq_als').alias('sess_et_hour_cos_seq'),
                    F.max('et_month_sin_seq_als').alias('sess_et_month_sin_seq'),
                    F.max('et_month_cos_seq_als').alias('sess_et_month_cos_seq'),
                    F.max('et_dayofweek_sin_seq_als').alias('sess_et_dayofweek_sin_seq'),
                    F.max('et_dayofweek_cos_seq_als').alias('sess_et_dayofweek_cos_seq'),
                    F.max('et_dayofmonth_sin_seq_als').alias('sess_et_dayofmonth_sin_seq'),
                    F.max('et_dayofmonth_cos_seq_als').alias('sess_et_dayofmonth_cos_seq'),               
                        )

In [118]:
'''
df_sb_grouped_nr_df = df_sb_grouped_df.select('user_idx', 'user_session', 'session_start_ts', 'session_end_ts',
                        get_non_repeated_items_integer_udf('sess_pid_seq').alias('sess_pid_seq'),
                        get_non_repeated_additional_items_integer_udf(F.col("sess_pid_seq"), F.col('sess_etime_seq')).alias('sess_etime_seq'),
                        get_non_repeated_additional_items_integer_udf(F.col("sess_pid_seq"), F.col('sess_etype_seq')).alias('sess_etype_seq'),                        
                        get_non_repeated_additional_items_integer_udf(F.col("sess_pid_seq"), F.col('sess_csid_seq')).alias('sess_csid_seq'),
                        get_non_repeated_additional_items_integer_udf(F.col("sess_pid_seq"), F.col('sess_ccid_seq')).alias('sess_ccid_seq'),
                        get_non_repeated_additional_items_integer_udf(F.col("sess_pid_seq"), F.col('sess_bid_seq')).alias('sess_bid_seq'),
                        get_non_repeated_additional_items_float_udf(F.col("sess_pid_seq"), F.col('sess_price_log_norm_seq')).alias('sess_price_log_norm_seq'),                                              
                        get_non_repeated_additional_items_float_udf(F.col("sess_pid_seq"), F.col('sess_dtime_secs_seq')).alias('sess_dtime_secs_seq'),
                        get_non_repeated_additional_items_float_udf(F.col("sess_pid_seq"), F.col('sess_dtime_secs_log_norm_seq')).alias('sess_dtime_secs_log_norm_seq'),
                        get_non_repeated_additional_items_float_udf(F.col("sess_pid_seq"), F.col('sess_prod_recency_days_seq')).alias('sess_prod_recency_days_seq'),
                        get_non_repeated_additional_items_float_udf(F.col("sess_pid_seq"), F.col('sess_prod_recency_days_log_norm_seq')).alias('sess_prod_recency_days_log_norm_seq'),
                        get_non_repeated_additional_items_float_udf(F.col("sess_pid_seq"), F.col('sess_relative_price_to_avg_category_seq')).alias('sess_relative_price_to_avg_category_seq'),
                        get_non_repeated_additional_items_float_udf(F.col("sess_pid_seq"), F.col('sess_et_hour_sin_seq')).alias('sess_et_hour_sin_seq'),
                        get_non_repeated_additional_items_float_udf(F.col("sess_pid_seq"), F.col('sess_et_hour_cos_seq')).alias('sess_et_hour_cos_seq'),
                        get_non_repeated_additional_items_float_udf(F.col("sess_pid_seq"), F.col('sess_et_month_sin_seq')).alias('sess_et_month_sin_seq'),
                        get_non_repeated_additional_items_float_udf(F.col("sess_pid_seq"), F.col('sess_et_month_cos_seq')).alias('sess_et_month_cos_seq'),
                        get_non_repeated_additional_items_float_udf(F.col("sess_pid_seq"), F.col('sess_et_dayofweek_sin_seq')).alias('sess_et_dayofweek_sin_seq'),
                        get_non_repeated_additional_items_float_udf(F.col("sess_pid_seq"), F.col('sess_et_dayofweek_cos_seq')).alias('sess_et_dayofweek_cos_seq'),
                        get_non_repeated_additional_items_float_udf(F.col("sess_pid_seq"), F.col('sess_et_dayofmonth_sin_seq')).alias('sess_et_dayofmonth_sin_seq'),
                        get_non_repeated_additional_items_float_udf(F.col("sess_pid_seq"), F.col('sess_et_dayofmonth_cos_seq')).alias('sess_et_dayofmonth_cos_seq'),
                       ) \
            .where(F.size('sess_pid_seq') >= 2) \
            .withColumn('sess_seq_len', F.size('sess_pid_seq'))
'''            


df_sb_grouped_nr_df = df_sb_grouped_df.select('user_idx', 'user_session', 'session_start_ts', 'session_end_ts',
                        'sess_pid_seq', 'sess_etime_seq', 'sess_etype_seq', 'sess_csid_seq', 'sess_ccid_seq', 'sess_bid_seq', 
                        'sess_price_log_norm_seq', 'sess_dtime_secs_seq', 'sess_dtime_secs_log_norm_seq',
                        'sess_prod_recency_days_seq', 'sess_prod_recency_days_log_norm_seq', 'sess_relative_price_to_avg_category_seq',
                        'sess_et_hour_sin_seq', 'sess_et_hour_cos_seq', 'sess_et_month_sin_seq', 'sess_et_month_cos_seq',
                        'sess_et_dayofweek_sin_seq', 'sess_et_dayofweek_cos_seq', 'sess_et_dayofmonth_sin_seq', 'sess_et_dayofmonth_cos_seq',
                       ) \
            .where(F.size('sess_pid_seq') >= 2) \
            .withColumn('sess_seq_len', F.size('sess_pid_seq'))

In [119]:
df_sb_grouped_nr_df.printSchema()

root
 |-- user_idx: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- session_start_ts: long (nullable = true)
 |-- session_end_ts: long (nullable = true)
 |-- sess_pid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_etime_seq: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- sess_etype_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_csid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_ccid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_bid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_price_log_norm_seq: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- sess_dtime_secs_seq: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- sess_dtime_secs_log_norm_seq: array (nullable = true)
 |    |-

In [120]:
percentiles = np.arange(0., 1.1, 0.1).tolist()+[0.95,0.99,0.999]

In [121]:
'''
session_sizes_pdf = pd.DataFrame(zip(percentiles, 
                                          df_sb_grouped_nr_df.approxQuantile('sess_seq_len', probabilities=percentiles, relativeError=0.00001)),
                                         columns=['percentile', 'session_sizes']).sort_values('percentile')
session_sizes_pdf
'''

"\nsession_sizes_pdf = pd.DataFrame(zip(percentiles, \n                                          df_sb_grouped_nr_df.approxQuantile('sess_seq_len', probabilities=percentiles, relativeError=0.00001)),\n                                         columns=['percentile', 'session_sizes']).sort_values('percentile')\nsession_sizes_pdf\n"

In [122]:
df_sb_grouped_nr_df.printSchema()

root
 |-- user_idx: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- session_start_ts: long (nullable = true)
 |-- session_end_ts: long (nullable = true)
 |-- sess_pid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_etime_seq: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- sess_etype_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_csid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_ccid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_bid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_price_log_norm_seq: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- sess_dtime_secs_seq: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- sess_dtime_secs_log_norm_seq: array (nullable = true)
 |    |-

In [123]:
##Generating an example parquet file with sequences with different length (sparse) to test with NVT + PyTorch pipeline
#df_sb_grouped_nr_df.withColumn('session_start_date', F.date_trunc('day', F.to_timestamp(F.col('session_start_ts'))).cast('string').substr(0,10).alias()) \
#            .repartition(F.col('session_start_date')) \
#                .write.partitionBy('session_start_date').parquet('/mount/workspace/transformers/data_sparse_example')

## Processing user sequences

In [124]:
MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION = 20

#### Delta time between user clicks (secs)

In [125]:
df_user_seq_grouped_df = df_indexed \
                .select('user_idx',  
                        F.collect_list('user_session').over(user_window).alias('session_seq_als'),
                        F.collect_list('product_idx').over(user_window).alias('pid_seq_als'), 
                        F.collect_list('event_time_ts').over(user_window).alias('etime_seq_als'),
                        F.collect_list('event_type_idx').over(user_window).alias('event_type_seq_als'), 
                        F.collect_list('category_sub_idx').over(user_window).alias('csid_seq_als'),
                        F.collect_list('category_code_idx').over(user_window).alias('ccid_seq_als'),
                        F.collect_list('brand_idx').over(user_window).alias('bid_seq_als'),
                        F.collect_list('price_log_norm').over(user_window).alias('price_log_seq_als'),
                        
                        F.collect_list('delta_event_secs').over(session_window).alias('dtime_secs_seq_als'),
                        F.collect_list('delta_event_secs_log_norm').over(session_window).alias('dtime_secs_log_norm_seq_als'),                     
                        F.collect_list('product_recency_days').over(session_window).alias('prod_recency_days_als'),
                        F.collect_list('product_recency_days_log_norm').over(session_window).alias('prod_recency_days_log_norm_als'),
                                        
                        F.collect_list('relative_price_to_avg_category_id').over(user_window).alias('relative_price_to_avg_category_als'),
                        F.collect_list('et_hour_sin').over(user_window).alias('et_hour_sin_seq_als'),
                        F.collect_list('et_hour_cos').over(user_window).alias('et_hour_cos_seq_als'),
                        F.collect_list('et_month_sin').over(user_window).alias('et_month_sin_seq_als'),
                        F.collect_list('et_month_cos').over(user_window).alias('et_month_cos_seq_als'),
                        F.collect_list('et_dayofweek_sin').over(user_window).alias('et_dayofweek_sin_seq_als'),
                        F.collect_list('et_dayofweek_cos').over(user_window).alias('et_dayofweek_cos_seq_als'),
                        F.collect_list('et_dayofmonth_sin').over(user_window).alias('et_dayofmonth_sin_seq_als'),
                        F.collect_list('et_dayofmonth_cos').over(user_window).alias('et_dayofmonth_cos_seq_als'),                     
                       )\
                .groupBy('user_idx').agg(
                    F.max('session_seq_als').alias('user_session_seq'),
                    F.max('pid_seq_als').alias('user_pid_seq'),
                    F.max('etime_seq_als').alias('user_etime_seq'),
                    F.max('event_type_seq_als').alias('user_etype_seq'),    
                    F.max('csid_seq_als').alias('user_csid_seq'),
                    F.max('ccid_seq_als').alias('user_ccid_seq'),
                    F.max('bid_seq_als').alias('user_bid_seq'),
                    F.max('price_log_seq_als').alias('user_price_log_seq'),  
    
                    F.max('dtime_secs_seq_als').alias('user_dtime_secs_seq'),
                    F.max('dtime_secs_log_norm_seq_als').alias('user_dtime_secs_log_norm_seq'),
                    F.max('prod_recency_days_als').alias('user_prod_recency_days_seq'),
                    F.max('prod_recency_days_log_norm_als').alias('user_prod_recency_days_log_norm_seq'),
    
                    F.max('relative_price_to_avg_category_als').alias('user_relative_price_to_avg_category_seq'),
                    F.max('et_hour_sin_seq_als').alias('user_et_hour_sin_seq'),
                    F.max('et_hour_cos_seq_als').alias('user_et_hour_cos_seq'),
                    F.max('et_month_sin_seq_als').alias('user_et_month_sin_seq'),
                    F.max('et_month_cos_seq_als').alias('user_et_month_cos_seq'),
                    F.max('et_dayofweek_sin_seq_als').alias('user_et_dayofweek_sin_seq'),
                    F.max('et_dayofweek_cos_seq_als').alias('user_et_dayofweek_cos_seq'),
                    F.max('et_dayofmonth_sin_seq_als').alias('user_et_dayofmonth_sin_seq'),
                    F.max('et_dayofmonth_cos_seq_als').alias('user_et_dayofmonth_cos_seq'),               
                        )

In [126]:
df_user_seq_grouped_df.printSchema()

root
 |-- user_idx: integer (nullable = true)
 |-- user_session_seq: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- user_pid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- user_etime_seq: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- user_etype_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- user_csid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- user_ccid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- user_bid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- user_price_log_seq: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- user_dtime_secs_seq: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- user_dtime_secs_log_norm_seq: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |--

In [127]:
df_sb_grouped_nr_df.columns

['user_idx',
 'user_session',
 'session_start_ts',
 'session_end_ts',
 'sess_pid_seq',
 'sess_etime_seq',
 'sess_etype_seq',
 'sess_csid_seq',
 'sess_ccid_seq',
 'sess_bid_seq',
 'sess_price_log_norm_seq',
 'sess_dtime_secs_seq',
 'sess_dtime_secs_log_norm_seq',
 'sess_prod_recency_days_seq',
 'sess_prod_recency_days_log_norm_seq',
 'sess_relative_price_to_avg_category_seq',
 'sess_et_hour_sin_seq',
 'sess_et_hour_cos_seq',
 'sess_et_month_sin_seq',
 'sess_et_month_cos_seq',
 'sess_et_dayofweek_sin_seq',
 'sess_et_dayofweek_cos_seq',
 'sess_et_dayofmonth_sin_seq',
 'sess_et_dayofmonth_cos_seq',
 'sess_seq_len']

### Joining users and sessions sequences

In [128]:
df_user_seq_grouped_df = df_user_seq_grouped_df.withColumnRenamed('user_idx', 'user_idx2')
users_and_session_seq_joined_df = df_sb_grouped_nr_df \
                .join(df_user_seq_grouped_df,
                      on=(df_sb_grouped_nr_df['user_idx'] == df_user_seq_grouped_df['user_idx2']), how='inner') \
                .drop(F.col('user_idx2'))

In [129]:
users_and_session_seq_joined_df.printSchema()

root
 |-- user_idx: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- session_start_ts: long (nullable = true)
 |-- session_end_ts: long (nullable = true)
 |-- sess_pid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_etime_seq: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- sess_etype_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_csid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_ccid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_bid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_price_log_norm_seq: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- sess_dtime_secs_seq: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- sess_dtime_secs_log_norm_seq: array (nullable = true)
 |    |-

In [130]:
def user_seq_feature_before_session(session_start, user_ts_seq, user_feature_seq, limit):
    new_seq = list(map(lambda y: y[1], filter(lambda x: x[0] < session_start, zip(user_ts_seq, user_feature_seq))))[-limit:]
    return new_seq

@udf(returnType=ArrayType(IntegerType()))
def user_seq_before_session_integer_udf(session_start, user_ts_seq, user_feature_seq, limit):
    result = user_seq_feature_before_session(session_start, user_ts_seq, user_feature_seq, limit)
    return list([int(x) for x in result])

@udf(returnType=ArrayType(FloatType()))
def user_seq_before_session_float_udf(session_start, user_ts_seq, user_feature_seq, limit):
    result = user_seq_feature_before_session(session_start, user_ts_seq, user_feature_seq, limit)
    return list([float(x) for x in result])

@udf(returnType=ArrayType(StringType()))
def user_seq_before_session_str_udf(session_start, user_ts_seq, user_feature_seq, limit):
    result = user_seq_feature_before_session(session_start, user_ts_seq, user_feature_seq, limit)
    return list([str(x) for x in result])

In [131]:
def sessions_reversed_order(session_ids):
    last_session_id = ""
    counter = 1
    sessions_orders = []
    for session_id in reversed(session_ids):
        if session_id != last_session_id:
            counter += 1
            last_session_id = session_id
        sessions_orders.append(counter)
    return list(reversed(sessions_orders))


@udf(returnType=ArrayType(IntegerType()))
def sessions_reversed_order_udf(session_ids):
    result = sessions_reversed_order(session_ids)
    return list([int(x) for x in result])

In [132]:
@udf(returnType=ArrayType(IntegerType()))
def get_repeated_values_array_int_udf(value, repeat_times):
    return [value] * repeat_times

In [133]:
sessions_reversed_order(["a", "a", "a", "a", "b", "c", "c", "d"])

[5, 5, 5, 5, 4, 3, 3, 2]

In [134]:
session_seq_and_prev_users_seq_df = users_and_session_seq_joined_df \
            .withColumn('bef_sess_pid_seq', user_seq_before_session_integer_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_pid_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_etime_seq', user_seq_before_session_integer_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_etime_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_etype_seq', user_seq_before_session_integer_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_etype_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_csid_seq', user_seq_before_session_integer_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_csid_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_ccid_seq', user_seq_before_session_integer_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_ccid_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_bid_seq', user_seq_before_session_integer_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_bid_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_price_log_norm_seq', user_seq_before_session_float_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_price_log_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_dtime_secs_seq', user_seq_before_session_float_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_dtime_secs_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_dtime_secs_log_norm_seq', user_seq_before_session_float_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_dtime_secs_log_norm_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_prod_recency_days_seq', user_seq_before_session_float_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_prod_recency_days_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_prod_recency_days_log_norm_seq', user_seq_before_session_float_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_prod_recency_days_log_norm_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_relative_price_to_avg_category_seq', user_seq_before_session_float_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_relative_price_to_avg_category_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_et_hour_sin_seq', user_seq_before_session_float_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_et_hour_sin_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_et_hour_cos_seq', user_seq_before_session_float_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_et_hour_cos_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_et_month_sin_seq', user_seq_before_session_float_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_et_month_sin_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_et_month_cos_seq', user_seq_before_session_float_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_et_month_cos_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_et_dayofweek_sin_seq', user_seq_before_session_float_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_et_dayofweek_sin_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_et_dayofweek_cos_seq', user_seq_before_session_float_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_et_dayofweek_cos_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_et_dayofmonth_sin_seq', user_seq_before_session_float_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_et_dayofmonth_sin_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_et_dayofmonth_cos_seq', user_seq_before_session_float_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_et_dayofmonth_cos_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION))) \
            .withColumn('bef_sess_session_reversed_order_seq', sessions_reversed_order_udf(user_seq_before_session_str_udf(F.col('session_start_ts'), F.col('user_etime_seq'),  F.col('user_session_seq'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION)))) \
            .withColumn('sess_session_reversed_order_seq', get_repeated_values_array_int_udf(F.lit(1), F.lit(SESSIONS_MAX_LENGTH))) \
            .withColumn('bef_sess_seq_length', F.size('bef_sess_pid_seq'))

# Exporting data

#### Option A: Pad zeros to make all sequences same length
NOTE: it would be not memory-efficient. But currently it seems no way to utilize Petastorm (parquet -> dataloader) without this option.

In [135]:
from pyspark.sql.types import ArrayType, IntegerType

def pad_array(values, expected_length, trunc_start, dtype=int):
    #if dtype is float:
    #    value_to_pad = [0.]
    #else:
    #    value_to_pad = [0]
    #value_to_pad = [dtype(0)]
        
    res = list([dtype(x) for x in values]) + ([dtype(0)] * (expected_length-len(values)))
    
    # Pick first N
    if trunc_start:
        res = res[:expected_length]
    #Pick last N
    else:
        res = res[-expected_length:]
        
    return res


@udf(returnType=ArrayType(IntegerType()))
def pad_array_int(values, expected_length, trunc_start):
    return pad_array(values, expected_length, trunc_start)

@udf(returnType=ArrayType(LongType()))
def pad_array_long(values, expected_length, trunc_start):
    return pad_array(values, expected_length, trunc_start)

@udf(returnType=ArrayType(FloatType()))
def pad_array_float(values, expected_length, trunc_start):
    return pad_array(values, expected_length, trunc_start, dtype=float)

In [136]:
'''
#Export padded sequences for compatibility with PetaStorm data loader. NVTabular and PyArrow supports list columns with different lengths
sessions_users_seqs_to_export_df = \
        session_seq_and_prev_users_seq_df.select('user_idx', 'user_session', 'sess_seq_len', 
                                                 'session_start_ts',  
                                                 F.date_trunc('day', F.to_timestamp(F.col('session_start_ts'))).cast('string').substr(0,10).alias('session_start_date'), 
                                                 'user_seq_length_bef_sess', 'user_elapsed_days_bef_sess', 'user_elapsed_days_log_bef_sess_norm',
                        # Session sequences (first N interactions)
                        pad_array_long(F.col('sess_pid_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_pid_seq'),
                        pad_array_long(F.col('sess_etime_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_etime_seq'),
                        pad_array_int(F.col('sess_etype_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_etype_seq'),
                        pad_array_int(F.col('sess_csid_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_csid_seq'),
                        pad_array_int(F.col('sess_ccid_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_ccid_seq'),
                        pad_array_int(F.col('sess_bid_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_bid_seq'),
                        pad_array_float(F.col('sess_price_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_price_seq'),
                        pad_array_float(F.col('sess_dtime_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_dtime_seq'),
                        pad_array_float(F.col('sess_product_recency_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_product_recency_seq'),
                        pad_array_float(F.col('sess_relative_price_to_avg_category_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_relative_price_to_avg_category_seq'),
                        pad_array_float(F.col('sess_et_hour_sin_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_et_hour_sin_seq'), 
                        pad_array_float(F.col('sess_et_hour_cos_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_et_hour_cos_seq'),
                        pad_array_float(F.col('sess_et_month_sin_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_et_month_sin_seq'),
                        pad_array_float(F.col('sess_et_month_cos_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_et_month_cos_seq'),
                        pad_array_float(F.col('sess_et_dayofweek_sin_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_et_dayofweek_sin_seq'),
                        pad_array_float(F.col('sess_et_dayofweek_cos_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_et_dayofweek_cos_seq'),
                        pad_array_float(F.col('sess_et_dayofmonth_sin_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_et_dayofmonth_sin_seq'),
                        pad_array_float(F.col('sess_et_dayofmonth_cos_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_et_dayofmonth_cos_seq'),
                         # Users sequences before session (last M interactions) 
                         pad_array_long(F.col('user_pid_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_pid_seq_bef_sess'),
                         pad_array_long(F.col('user_etime_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_etime_seq_bef_sess'),
                         pad_array_int(F.col('user_etype_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_etype_seq_bef_sess'),
                         pad_array_int(F.col('user_csid_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_csid_seq_bef_sess'),
                         pad_array_int(F.col('user_ccid_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_ccid_seq_bef_sess'),
                         pad_array_int(F.col('user_bid_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_bid_seq_bef_sess'),
                         pad_array_float(F.col('user_price_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_price_seq_bef_sess'),
                         pad_array_float(F.col('user_dtime_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_dtime_seq_bef_sess'),
                         pad_array_float(F.col('user_product_recency_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_product_recency_seq_bef_sess'),
                         pad_array_float(F.col('user_relative_price_to_avg_category_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_relative_price_to_avg_category_seq_bef_sess'),
                         pad_array_float(F.col('user_et_hour_sin_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_et_hour_sin_seq_bef_sess'),
                         pad_array_float(F.col('user_et_hour_cos_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_et_hour_cos_seq_bef_sess'),
                         pad_array_float(F.col('user_et_month_sin_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_et_month_sin_seq_bef_sess'),
                         pad_array_float(F.col('user_et_month_cos_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_et_month_cos_seq_bef_sess'),
                         pad_array_float(F.col('user_et_dayofweek_sin_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_et_dayofweek_sin_seq_bef_sess'),
                         pad_array_float(F.col('user_et_dayofweek_cos_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_et_dayofweek_cos_seq_bef_sess'),
                         pad_array_float(F.col('user_et_dayofmonth_sin_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_et_dayofmonth_sin_seq_bef_sess'),
                         pad_array_float(F.col('user_et_dayofmonth_cos_seq_bef_sess'), F.lit(MAX_LENGTH_USER_SEQUENCE_BEFORE_SESSION), F.lit(False)).alias('user_et_dayofmonth_cos_seq_bef_sess'),
                    )
'''                    

"\n#Export padded sequences for compatibility with PetaStorm data loader. NVTabular and PyArrow supports list columns with different lengths\nsessions_users_seqs_to_export_df =         session_seq_and_prev_users_seq_df.select('user_idx', 'user_session', 'sess_seq_len', \n                                                 'session_start_ts',  \n                                                 F.date_trunc('day', F.to_timestamp(F.col('session_start_ts'))).cast('string').substr(0,10).alias('session_start_date'), \n                                                 'user_seq_length_bef_sess', 'user_elapsed_days_bef_sess', 'user_elapsed_days_log_bef_sess_norm',\n                        # Session sequences (first N interactions)\n                        pad_array_long(F.col('sess_pid_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_pid_seq'),\n                        pad_array_long(F.col('sess_etime_seq'), F.lit(SESSIONS_MAX_LENGTH), F.lit(True)).alias('sess_etime_seq'),\n            

In [137]:
sessions_users_seqs_to_export_df = \
        session_seq_and_prev_users_seq_df.select(
                        'user_idx', 'user_session', 'session_start_ts', 'sess_seq_len',  'bef_sess_seq_length',  
                        F.date_trunc('day', F.to_timestamp(F.col('session_start_ts'))).cast('string').substr(0,10).alias('session_start_date'), 
                        # Session sequences (first N interactions)
                        'sess_pid_seq',
                        'sess_etime_seq',
                        'sess_etype_seq',
                        'sess_csid_seq',
                        'sess_ccid_seq',
                        'sess_bid_seq',
                        'sess_price_log_norm_seq',
                        'sess_dtime_secs_seq',
                        'sess_dtime_secs_log_norm_seq',
                        'sess_prod_recency_days_seq',
                        'sess_prod_recency_days_log_norm_seq',
                        'sess_relative_price_to_avg_category_seq',
                        'sess_et_hour_sin_seq', 
                        'sess_et_hour_cos_seq',
                        'sess_et_month_sin_seq',
                        'sess_et_month_cos_seq',
                        'sess_et_dayofweek_sin_seq',
                        'sess_et_dayofweek_cos_seq',
                        'sess_et_dayofmonth_sin_seq',
                        'sess_et_dayofmonth_cos_seq',
                        'sess_session_reversed_order_seq',
                         # Users sequences before session (last M interactions) 
                         'bef_sess_pid_seq',
                         'bef_sess_etime_seq',
                         'bef_sess_etype_seq',
                         'bef_sess_csid_seq',
                         'bef_sess_ccid_seq',
                         'bef_sess_bid_seq',
                         'bef_sess_price_log_norm_seq',
                         'bef_sess_dtime_secs_seq',
                         'bef_sess_dtime_secs_log_norm_seq',                         
                         'bef_sess_prod_recency_days_seq',
                         'bef_sess_prod_recency_days_log_norm_seq',   
                         'bef_sess_relative_price_to_avg_category_seq',
                         'bef_sess_et_hour_sin_seq',
                         'bef_sess_et_hour_cos_seq',
                         'bef_sess_et_month_sin_seq',
                         'bef_sess_et_month_cos_seq',
                         'bef_sess_et_dayofweek_sin_seq',
                         'bef_sess_et_dayofweek_cos_seq',
                         'bef_sess_et_dayofmonth_sin_seq',
                         'bef_sess_et_dayofmonth_cos_seq',
                         'bef_sess_session_reversed_order_seq',
                    )

In [138]:
sessions_users_seqs_to_export_df.printSchema()

root
 |-- user_idx: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- session_start_ts: long (nullable = true)
 |-- sess_seq_len: integer (nullable = false)
 |-- bef_sess_seq_length: integer (nullable = false)
 |-- session_start_date: string (nullable = true)
 |-- sess_pid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_etime_seq: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- sess_etype_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_csid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_ccid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_bid_seq: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- sess_price_log_norm_seq: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- sess_dtime_secs_seq: array (nullable = true)
 |    |-

In [139]:
#sessions_users_seqs_to_export_df.show()

#### Export dataset to parquet, partioned by the session date

In [140]:
%%time
OUTPUT_PATH = '/mount/results/repeated_interactions={}/total_months={}'.format(KEEP_REPEATED_USER_INTERACTIONS, NUM_MONTHS_TO_PREPROCESS)
sessions_users_seqs_to_export_df.repartition(F.col('session_start_date')) \
                .write.partitionBy('session_start_date') \
                .parquet(os.path.join(OUTPUT_PATH, 'ecommerce_preproc.parquet'))

CPU times: user 241 ms, sys: 217 ms, total: 457 ms
Wall time: 15min 20s


#### Check exported dataset

In [141]:
check_df = spark.read.parquet(os.path.join(OUTPUT_PATH, 'ecommerce_preproc.parquet'))

In [142]:
check_df.take(10)

[Row(user_idx=503175767, user_session='a0bd23ae-2ee8-4b0f-b628-437b9137ccc9', session_start_ts=1570938717, sess_seq_len=6, bef_sess_seq_length=1, sess_pid_seq=[19764, 23487, 20743, 30416, 19282, 48790], sess_etime_seq=[1570938717, 1570938775, 1570938826, 1570938875, 1570938890, 1570938930], sess_etype_seq=[2, 2, 2, 2, 2, 2], sess_csid_seq=[59, 59, 59, 59, 59, 59], sess_ccid_seq=[38, 38, 38, 38, 38, 38], sess_bid_seq=[80, 80, 80, 80, 80, 80], sess_price_log_norm_seq=[-0.051558973702671464, -0.19539975681449054, 0.22817432520591252, -0.187058017060996, -0.30135946042794687, 0.42666204410908015], sess_dtime_secs_seq=[63846, 58, 51, 49, 15, 40], sess_dtime_secs_log_norm_seq=[1.7985935567321982, -0.20214755077162974, -0.23831352027294844, -0.24954491844478394, -0.5758378156471834, -0.3063741122860863], sess_prod_recency_days_seq=[12.038055555555555, 11.815636574074073, 11.812222222222223, 11.476759259259259, 11.99837962962963, 11.054872685185185], sess_prod_recency_days_log_norm_seq=[0.1190

In [143]:
check_df.count()

3929658