In [10]:
# Copyright 2022 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

<img src="https://developer.download.nvidia.com/notebooks/dlsw-notebooks/merlin_transformers4rec_end-to-end-session-based-01-etl-with-nvtabular/nvidia_logo.png" style="width: 90px; float: right;">

# ETL with NVTabular

This notebook is created using the latest stable [merlin-pytorch](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-pytorch) container.

**Launch the docker container**
```
docker run -it --gpus device=0 -p 8000:8000 -p 8001:8001 -p 8002:8002 -p 8888:8888 -v <path_to_data>:/workspace/data/  nvcr.io/nvidia/merlin/merlin-pytorch:22.XX
```
This script will mount your local data folder that includes your data files to `/workspace/data` directory in the merlin-pytorch docker container.

## Overview

This notebook demonstrates how to use NVTabular to perform the feature engineering that is needed to model the `YOOCHOOSE` dataset which contains a collection of sessions from a retailer. Each session  encapsulates the click events that the user performed in that session.

The dataset is available on [Kaggle](https://www.kaggle.com/chadgostopp/recsys-challenge-2015). You need to download it and copy to the `DATA_FOLDER` path. Note that we are only using the `yoochoose-clicks.dat` file.

First, let's start by importing several libraries:

In [1]:
import os
import os.path as op
import glob
import numpy as np
import gc

import cudf
import cupy
import nvtabular as nvt
from tqdm.auto import tqdm
from merlin.dag import ColumnSelector
from merlin.schema import Schema, Tags
from merlin.core.dispatch import get_lib
df_lib = get_lib()
import pandas as pd


  from .autonotebook import tqdm as notebook_tqdm


Set List values to filter training set

##  Define a preprocessing workflow with NVTabular

NVTabular is a feature engineering and preprocessing library for tabular data designed to quickly and easily manipulate terabyte scale datasets used to train deep learning based recommender systems. It provides a high level abstraction to simplify code and accelerates computation on the GPU using the RAPIDS cuDF library.

NVTabular supports different feature engineering transformations required by deep learning (DL) models such as Categorical encoding and numerical feature normalization. It also supports feature engineering and generating sequential features. 

More information about the supported features can be found <a href=https://nvidia-merlin.github.io/NVTabular/main/index.html> here. </a>

### Feature engineering: Create and Transform items features

In this cell, we are defining three transformations ops: 

- 1. Encoding categorical variables using `Categorify()` op. We set `start_index` to 1 so that encoded null values start from `1` instead of `0` because we reserve `0` for padding the sequence features.
- 2. Deriving temporal features from timestamp and computing their cyclical representation using a custom lambda function. 
- 3. Computing the item recency in days using a custom op. Note that item recency is defined as the difference between the first occurrence of the item in dataset and the actual date of item interaction. 

For more ETL workflow examples, visit NVTabular [example notebooks](https://github.com/NVIDIA-Merlin/NVTabular/tree/main/examples).

### Define the preprocessing of sequential features

Once the item features are generated, the objective of this cell is to group interactions at the session level, sorting the interactions by time. We additionally truncate all sessions to first 20 interactions and filter out sessions with less than 2 interactions.

In [2]:
# Encodes categorical features as contiguous integers
cat_feats = ColumnSelector(['session_id',
                            'pc9',
                            'product_item_type',
                            'color_group',
                           'action']) >> nvt.ops.Categorify(start_index=1)
# create time features
session_ts = ColumnSelector(['seconds'])
session_time = (
    session_ts >> 
    nvt.ops.LambdaOp(lambda col: cudf.to_datetime(col, unit='s')) >> 
    nvt.ops.Rename(name = 'event_time_dt')
)

sessiontime_weekday = (
    session_time >> 
    nvt.ops.LambdaOp(lambda col: col.dt.weekday) >> 
    nvt.ops.Rename(name ='et_dayofweek')
)

# calculate session day index based on 'timestamp-first' column in order to partition sessions by day when saving the parquet files.
day_index = session_time >> nvt.ops.LambdaOp(lambda col: col.dt.day) >> nvt.ops.Rename(name ='day')

# Derive cyclical features: Define a custom lambda function 
def get_cycled_feature_value_sin(col, max_value):
    value_scaled = (col + 0.000001) / max_value
    value_sin = np.sin(2 * np.pi * value_scaled)
    return value_sin

weekday_sin = sessiontime_weekday >> (lambda col: get_cycled_feature_value_sin(col + 1, 7)) >> nvt.ops.Rename(name = 'et_dayofweek_sin')
# Apply standardization to continuous features
# sessiontime_weekday = sessiontime_weekday >> nvt.ops.LogOp() >> nvt.ops.Normalize(out_dtype=np.float32)
price_feature = ['price'] >> nvt.ops.LogOp() >> nvt.ops.Normalize(out_dtype=np.float32) >> nvt.ops.Rename(name='price_log_norm')

#set time features
time_features = (
    session_time +
    sessiontime_weekday +
    weekday_sin)


# Relative price to the average price for the category_id
def relative_price_to_avg_categ(col, gdf):
    epsilon = 1e-5
    col = ((gdf['price'] - col) / (col + epsilon)) * (col > 0).astype(int)
    return col.astype(np.float32)
    
avg_category_id_pr = ['product_item_type'] >> nvt.ops.JoinGroupby(cont_cols =['price'], stats=["mean"]) >> nvt.ops.Rename(name='avg_product_item_type_price')
relative_price_to_avg_category = avg_category_id_pr >> nvt.ops.LambdaOp(relative_price_to_avg_categ, dependency=['price']) >> nvt.ops.Rename(name="relative_price_to_avg_product_item_type")

#set all features
features = ['session_id','seconds'] + cat_feats + time_features + price_feature + relative_price_to_avg_category

# Define Groupby Operator
groupby_features = features >> nvt.ops.Groupby(
    groupby_cols= ['session_id'], 
    sort_cols=["seconds"],
    aggs={
        'pc9': ["list", "count"],
        'product_item_type': ["list"],  
        'color_group': ["list"],
        'action': ["list"],
        'price_log_norm': ["list"],
        'relative_price_to_avg_product_item_type':["list"],
        'event_time_dt': ["first"],
        'et_dayofweek_sin': ["first"]      
    },
    name_sep="-") #>> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])  # need to tag otherwise if throws an error when triton ties loading the workflow


groupby_features_list = groupby_features['pc9-list',
        'product_item_type-list',  
        'color_group-list', 
        'action-list', 
        'price_log_norm-list',
        'relative_price_to_avg_product_item_type-list']

# Truncate sequence features to first interacted 20 items 
SESSIONS_MAX_LENGTH = 20

# # Select and truncate the sequential features
# sequence_features_truncated = (
#     groupby_features['category-list']
#     >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH) 
#     >> nvt.ops.ValueCount()
# )

groupby_features_truncated = (
    groupby_features_list
    >> nvt.ops.ListSlice(0, SESSIONS_MAX_LENGTH, pad=True) 
    >>  nvt.ops.Rename(postfix = '_seq'))

day_index = day_index >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL]) # need to tag otherwise if throws an error when triton ties loading the workflow

# Select features for training 
# selected_features = groupby_features['session_id', 'pc9-count','event_time_dt-first','et_dayofweek_sin-first'] + groupby_features_truncated + day_index
selected_features = groupby_features['session_id', 'pc9-count','et_dayofweek_sin-first'] + groupby_features_truncated + day_index

#set meta data
sesion_id = selected_features['session_id'] >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])  
pc9_count = selected_features['pc9-count'] >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL,Tags.LIST,Tags.ITEM])  
et_dayofweek_sin_first = selected_features['et_dayofweek_sin-first'] >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL,Tags.LIST,Tags.ITEM])  
pc9_list_seq = selected_features["pc9-list_seq"] >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL,Tags.LIST,Tags.ITEM,Tags.ITEM_ID])  
product_item_type_list_seq = selected_features["product_item_type-list_seq"] >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL,Tags.LIST,Tags.ITEM])  
color_group_list_seq = selected_features["color_group-list_seq"] >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL,Tags.LIST,Tags.ITEM])  
action_list_seq  = selected_features["action-list_seq"] >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL,Tags.LIST,Tags.ITEM])  
price_log_norm_list_seq = selected_features["price_log_norm-list_seq"] >> nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS,Tags.LIST])  
relative_price_to_avg_product_item_type_list_seq =selected_features["relative_price_to_avg_product_item_type-list_seq"] >> nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS,Tags.LIST])  

final_selected_features = sesion_id + pc9_count + et_dayofweek_sin_first + pc9_list_seq + product_item_type_list_seq + color_group_list_seq + action_list_seq + price_log_norm_list_seq + relative_price_to_avg_product_item_type_list_seq + day_index


# Filter out sessions with less than 2 interactions 
MINIMUM_SESSION_LENGTH = 2
filtered_sessions = final_selected_features >> nvt.ops.Filter(f=lambda df: df["pc9-count"] >= MINIMUM_SESSION_LENGTH)

Avoid Numba low occupancy warnings:

In [3]:
from numba import config
config.CUDA_LOW_OCCUPANCY_WARNINGS = 0

### Execute NVTabular workflow

Once we have defined the general workflow (`filtered_sessions`), we provide our cudf dataset to `nvt.Dataset` class which is optimized to split data into chunks that can fit in device memory and to handle the calculation of complex global statistics. Then, we execute the pipeline that fits and transforms data to get the desired output features.

In [6]:
import glob
import os.path as op

In [7]:
DATA_FOLDER = '../data/fake_preprocessed_daily_input'


In [8]:
paths = glob.glob(op.join(DATA_FOLDER, '*parquet'))

In [9]:
workflow = nvt.Workflow(filtered_sessions)
try:
    dataset = nvt.Dataset(paths, engine='parquet')
    workflow.fit(dataset)
except Exception as e:
    print(e)
    df = df_lib.concat([df_lib.read_parquet(path) for path in sorted(paths)[:10]])
    dataset = nvt.Dataset(df, engine='parquet')
    workflow.fit(dataset)



In [43]:
!rm -rf workflow_etl

In [44]:
# Learn features statistics necessary of the preprocessing workflow
# Apply the preprocessing workflow in the dataset and convert the resulting Dask cudf dataframe to a cudf dataframe
sessions_gdf = workflow.transform(dataset).compute()

In [45]:
!rm -rf ../data/preproc_sessions_by_day

In [46]:
from transformers4rec.data.preprocessing import save_time_based_splits
save_path = "../data/preproc_sessions_by_day"
save_time_based_splits(data=nvt.Dataset(sessions_gdf),
                       output_dir=save_path,
                       partition_col='day',
                       timestamp_col='session_id', 
                      )

Creating time-based splits: 100%|███████████████| 31/31 [00:08<00:00,  3.69it/s]


In [47]:
workflow.output_schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.freq_threshold,properties.max_size,properties.start_index,properties.cat_path,properties.domain.min,properties.domain.max,properties.domain.name,properties.embedding_sizes.cardinality,properties.embedding_sizes.dimension
0,session_id,(Tags.CATEGORICAL),int64,False,False,,0.0,0.0,1.0,.//categories/unique.session_id.parquet,0.0,3714319.0,session_id,3714320.0,512.0
1,pc9-count,"(Tags.CATEGORICAL, Tags.LIST, Tags.ITEM)",int32,False,False,,0.0,0.0,1.0,.//categories/unique.pc9.parquet,0.0,14644.0,pc9,14645.0,344.0
2,et_dayofweek_sin-first,"(Tags.CATEGORICAL, Tags.LIST, Tags.ITEM)",float32,False,False,,,,,,,,,,
3,pc9-list_seq,"(Tags.CATEGORICAL, Tags.ID, Tags.ITEM, Tags.LI...",int64,True,False,,0.0,0.0,1.0,.//categories/unique.pc9.parquet,0.0,14644.0,pc9,14645.0,344.0
4,product_item_type-list_seq,"(Tags.CATEGORICAL, Tags.LIST, Tags.ITEM)",int64,True,False,,0.0,0.0,1.0,.//categories/unique.product_item_type.parquet,0.0,50.0,product_item_type,51.0,16.0
5,color_group-list_seq,"(Tags.CATEGORICAL, Tags.LIST, Tags.ITEM)",int64,True,False,,0.0,0.0,1.0,.//categories/unique.color_group.parquet,0.0,31.0,color_group,32.0,16.0
6,action-list_seq,"(Tags.CATEGORICAL, Tags.LIST, Tags.ITEM)",int64,True,False,,0.0,0.0,1.0,.//categories/unique.action.parquet,0.0,4.0,action,5.0,16.0
7,price_log_norm-list_seq,"(Tags.LIST, Tags.CONTINUOUS)",float32,True,False,,,,,,,,,,
8,relative_price_to_avg_product_item_type-list_seq,"(Tags.LIST, Tags.CONTINUOUS)",float32,True,False,,,,,,,,,,
9,day,(Tags.CATEGORICAL),int16,False,False,,,,,,,,,,


In [48]:
workflow.transform(dataset).to_parquet("../data/schema_generated", out_files_per_proc=1)


In [49]:
!cat ../data/schema_generated/schema.pbtxt

feature {
  name: "session_id"
  type: INT
  int_domain {
    name: "session_id"
    max: 3714319
    is_categorical: true
  }
  annotation {
    tag: "categorical"
    extra_metadata {
      type_url: "type.googleapis.com/google.protobuf.Struct"
      value: "\n\021\n\013num_buckets\022\002\010\000\n\033\n\016freq_threshold\022\t\021\000\000\000\000\000\000\000\000\n\025\n\010max_size\022\t\021\000\000\000\000\000\000\000\000\n\030\n\013start_index\022\t\021\000\000\000\000\000\000\360?\n5\n\010cat_path\022)\032\'.//categories/unique.session_id.parquet\nG\n\017embedding_sizes\0224*2\n\030\n\013cardinality\022\t\021\000\000\000\000\210VLA\n\026\n\tdimension\022\t\021\000\000\000\000\000\000\200@\n\034\n\017dtype_item_size\022\t\021\000\000\000\000\000\000P@\n\r\n\007is_list\022\002 \000\n\017\n\tis_ragged\022\002 \000"
    }
  }
}
feature {
  name: "pc9-count"
  type: INT
  int_domain {
    name: "pc9"
    max: 14644
    is_categorical: true
  }
  annotation {
    tag: "categorical"
  

In [50]:
workflow.save('../data/workflow_etl')

### Export pre-processed data by day

In this example we are going to split the preprocessed parquet files by days, to allow for temporal training and evaluation. There will be a folder for each day and three parquet files within each day: `train.parquet`, `validation.parquet` and `test.parquet`.
  
P.s. It is worthwhile to note that the dataset has a single categorical feature (category), which, however, is inconsistent over time in the dataset. All interactions before day 84 (2014-06-23) have the same value for that feature, whereas many other categories are introduced afterwards. Thus for this example, we save only the last five days.

That's it! We created our sequential features, now we can go to the next notebook to train a PyTorch session-based model.