# NVIDIA Merlin on Microsoft's News Dataset (MIND)

## Overview

In this tutorial notebook, we use the [Microsoft News Dataset (MIND)](https://msnews.github.io/) to demonstrate NVTabular for ETL and HugeCTR for training deep neural network models for a recommender system.

The MIND dataset contains 15M impressions generated by 1M users over 160k news articles. The goal of this Jupyter notebook is to train a model that predicts whether a user will click on a news article.

To build the recommender system, we first clean and pre-process the data, then develop simple time-based features and more complex target- and count-encoded features, and finally train and evaluate a Deep Learning Recommendation Model (DLRM).

Please remember to run this Jupyter notebook in the [merlin-training:0.6](https://ngc.nvidia.com/catalog/containers/nvidia:merlin:merlin-training) Docker container.

## Step 1: Import libraries and create directories

In [1]:
# Install packages required for this notebook
!pip install tqdm
!apt install wget unzip

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
You should consider upgrading via the '/usr/bin/python -m pip install --upgrade pip' command.
Reading package lists... Done
Building dependency tree       
Reading state information... Done
unzip is already the newest version (6.0-25ubuntu1).
wget is already the newest version (1.20.3-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 4 not upgraded.


In [2]:
import time, glob, shutil, sys, os, pickle, json
from tqdm import tqdm

import cupy as cp          # CuPy is an implementation of NumPy-compatible multi-dimensional array on GPU
import cudf                # cuDF is an implementation of Pandas-like Dataframe on GPU
import rmm                 # library for pre-allocating memory on GPU

import numpy as np

# NVTabular is the core library we will use here for feature engineering/preprocessing on GPU
from nvtabular.ops import Operator
import nvtabular as nvt
from nvtabular.utils import device_mem_size

# Dask is the backend job scheduler used by NVTabular
import dask   
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from dask.distributed import wait
from dask.utils import parse_bytes
from dask.delayed import delayed

It is often a good idea to set aside (fast) dedicated disk space for Dask workers to spill data and write logging information. To keep things simple, we perform all I/O within a single `BASE_DIR` for this example. Feel free to set this variable yourself.

In [3]:
# Define "fast" root directory for this example
BASE_DIR = os.environ.get("BASE_DIR", "./basedir")

# Define worker/output directories
dask_workdir = os.path.join(BASE_DIR, "workdir")

# Directory to store the raw downloaded dataset
data_input_path = os.path.join(BASE_DIR, "dataset")
data_path_train = os.path.join(data_input_path, "train")
data_path_valid = os.path.join(data_input_path, "valid")

# Directory to store NVTabular's processed dataset
data_output_path = os.path.join(BASE_DIR, "processed_nvt")
output_train_path = os.path.join(data_output_path, "train")
output_valid_path = os.path.join(data_output_path, "valid")

# Directory to store HugeCTR's train configurations and weights
config_output_path = os.path.join(BASE_DIR, "configs")
weights_path = os.path.join(BASE_DIR, "weights")

#Creating and cleaning our worker/output directories
try:
    # Ensure BASE_DIR exists
    if not os.path.isdir(BASE_DIR):
        os.mkdir(BASE_DIR)

    # Make sure we have a clean worker space for Dask
    if os.path.isdir(dask_workdir):
        shutil.rmtree(dask_workdir)
    os.mkdir(dask_workdir)

    # Make sure we have a clean path for downloading dataset and preprocessing
    if os.path.isdir(data_input_path):
        shutil.rmtree(data_input_path)
    os.mkdir(data_input_path)
    os.mkdir(data_path_train)
    os.mkdir(data_path_valid)

    # Make sure we have a clean output path
    if os.path.isdir(data_output_path):
        shutil.rmtree(data_output_path)
    os.mkdir(data_output_path)
    os.mkdir(output_train_path)
    os.mkdir(output_valid_path)
    
    # Make sure we have a clean configs and weights path
    if os.path.isdir(config_output_path):
        shutil.rmtree(config_output_path)
    os.mkdir(config_output_path)    
        
    if os.path.isdir(weights_path):
        shutil.rmtree(weights_path)
    os.mkdir(weights_path)

except OSError:
    print ("Creation of the directories failed")
else:
    print ("Successfully created the directories")

The following directory structure has been created and will be used to store everything for this tutorial:

basedir <br>
&emsp; |--- workdir <br>
&emsp; |--- dataset <br>
&emsp; &emsp; |--- train <br>
&emsp; &emsp; |--- valid  <br>
&emsp; |--- processed_nvt <br>
&emsp;  &emsp; |--- train <br>
&emsp;  &emsp; |--- valid  <br>
&emsp; |--- configs <br>
&emsp; |--- weights <br>    
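
The delete-then-recreate pattern used for each directory can be condensed into one small helper. The following is a minimal sketch, not the notebook's own code: `reset_dir` and `demo_root` are hypothetical names, and the demo runs against a temporary directory rather than the real `BASE_DIR`.

```python
import os
import shutil
import tempfile


def reset_dir(path):
    """Delete `path` if it exists, then recreate it empty (parents included)."""
    if os.path.isdir(path):
        shutil.rmtree(path)
    os.makedirs(path)
    return path


# Demonstrate on a throwaway root instead of the real BASE_DIR.
demo_root = tempfile.mkdtemp()
subdirs = [
    "workdir",
    "dataset/train", "dataset/valid",
    "processed_nvt/train", "processed_nvt/valid",
    "configs", "weights",
]
for sub in subdirs:
    reset_dir(os.path.join(demo_root, sub))

created = sorted(os.listdir(demo_root))
print(created)
```

Because `os.makedirs` also creates missing parent directories, the nested `train` and `valid` folders do not need a separate call for their parents.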

## Step 2: Deploy a Distributed-Dask cluster

In [4]:
# Check the GPUs that are available to this notebook
!nvidia-smi

Tue Jun 22 22:41:58 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 450.119.04   Driver Version: 450.119.04   CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla V100-SXM2...  On   | 00000000:06:00.0 Off |                    0 |
| N/A   36C    P0    42W / 300W |      3MiB / 16160MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   1  Tesla V100-SXM2...  On   | 00000000:07:00.0 Off |                    0 |
| N/A   40C    P0    46W / 300W |      3MiB / 16160MiB |      0%      Default |
|       

### Initialize Dask GPU cluster

In [5]:
NUM_GPUS = [0,1,2] # Set this to the GPU IDs that are observed from the above cell

# Dask dashboard
dashboard_port = "8787"

# Deploy a single-machine multi-GPU cluster
protocol = "tcp"             # "tcp" or "ucx"
visible_devices = ",".join([str(n) for n in NUM_GPUS])  # Detect devices to place workers
device_spill_frac = 0.9      # Spill GPU-Worker memory to host at this limit.
                             # Reduce if spilling fails to prevent
                             # device memory errors.

# Get device memory capacity
capacity = device_mem_size(kind="total") 

# Check if any device memory is already occupied
"""
for dev in visible_devices.split(","):
    fmem = _pynvml_mem_size(kind="free", index=int(dev))
    used = (device_size - fmem) / 1e9
    if used > 1.0:
        warnings.warn(f"BEWARE - {used} GB is already occupied on device {int(dev)}!")
"""

cluster = None               # (Optional) Specify existing scheduler port
if cluster is None:
    cluster = LocalCUDACluster(
        protocol = protocol,
        n_workers=len(visible_devices.split(",")),
        CUDA_VISIBLE_DEVICES = visible_devices,
        device_memory_limit = capacity * device_spill_frac,
        local_directory=dask_workdir,
        dashboard_address=":" + dashboard_port,
    )

# Create the distributed client
client = Client(cluster)
client

Client: Scheduler: tcp://127.0.0.1:44281, Dashboard: http://127.0.0.1:8787/status
Cluster: Workers: 3, Cores: 3, Memory: 503.79 GiB
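
To make the `device_memory_limit` setting concrete, here is a small worked example of the arithmetic `capacity * device_spill_frac`. It is only a sketch: `spill_threshold_bytes` is a hypothetical helper, the capacity is taken from the 16160MiB per-GPU total in the `nvidia-smi` output above, and we assume the capacity is expressed in bytes.

```python
MIB = 1024 ** 2


def spill_threshold_bytes(capacity_bytes, spill_frac):
    """Device memory (in bytes) a worker may hold before spilling to host."""
    return int(capacity_bytes * spill_frac)


# 16160 MiB is the per-GPU total reported by nvidia-smi above.
capacity_demo = 16160 * MIB
limit_demo = spill_threshold_bytes(capacity_demo, 0.9)
print(f"spill above roughly {limit_demo / MIB:.0f} MiB of {capacity_demo // MIB} MiB")
```

Lowering the fraction makes workers spill earlier, which trades some speed for more headroom against device memory errors.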


In [6]:
# Initialize RMM pool on ALL workers
def _rmm_pool():
    rmm.reinitialize(
        pool_allocator=True,
        initial_pool_size=None, # Use default size
    )
    
client.run(_rmm_pool)

{'tcp://127.0.0.1:33443': None,
 'tcp://127.0.0.1:34239': None,
 'tcp://127.0.0.1:37923': None}

## Step 3:  Download & explore MIND dataset

The Microsoft News Dataset (MIND) is a large-scale dataset for news recommendation research. It was collected from anonymized behavior logs of the Microsoft News website.

Please read and accept the Microsoft Research License Terms before downloading.

Let's download the train and validation sets and unzip them into their respective directories.

In [None]:
!wget https://mind201910small.blob.core.windows.net/release/MINDlarge_train.zip
!wget https://mind201910small.blob.core.windows.net/release/MINDlarge_dev.zip

In [None]:
!unzip MINDlarge_train.zip -d $BASE_DIR/dataset/train
!unzip MINDlarge_dev.zip -d  $BASE_DIR/dataset/valid

To protect user privacy, each user was de-linked from the production system and securely hashed into an anonymized ID.
The MIND team randomly sampled 1M users who had at least 5 news clicks between October 12 and November 22, 2019 (6 weeks).

Microsoft provides train, validation, and test sets of this data; this tutorial uses only the train and validation sets.

### Dataset format 

Each set of this data contains the following 4 files:

1. behaviors.tsv - The click history and impression logs of users
2. news.tsv - Details of news articles mapped with the news ID
3. entity_embedding.vec - The embeddings of entities in news extracted from knowledge graph
4. relation_embedding.vec - The embeddings of relations between entities extracted from knowledge graph

Let's take a look at the two TSV files and see how we can use them for our recommender system. <br>
Note: to keep this tutorial simple, we ignore the embeddings provided by the MIND team.

### Behaviors data

In [7]:
behaviors_train = cudf.read_csv(os.path.join(data_path_train , 'behaviors.tsv'), 
                                header=None, 
                                sep='\t',)
behaviors_train.head()

Unnamed: 0,0,1,2,3,4
0,1,U87243,11/10/2019 11:30:54 AM,N8668 N39081 N65259 N79529 N73408 N43615 N2937...,N78206-0 N26368-0 N7578-0 N58592-0 N19858-0 N5...
1,2,U598644,11/12/2019 1:45:29 PM,N56056 N8726 N70353 N67998 N83823 N111108 N107...,N47996-0 N82719-0 N117066-0 N8491-0 N123784-0 ...
2,3,U532401,11/13/2019 11:23:03 AM,N128643 N87446 N122948 N9375 N82348 N129412 N5...,N103852-0 N53474-0 N127836-0 N47925-1
3,4,U593596,11/12/2019 12:24:09 PM,N31043 N39592 N4104 N8223 N114581 N92747 N1207...,N38902-0 N76434-0 N71593-0 N100073-0 N108736-0...
4,5,U239687,11/14/2019 8:03:01 PM,N65250 N122359 N71723 N53796 N41663 N41484 N11...,N76209-0 N48841-0 N67937-0 N62235-0 N6307-0 N3...


Each row in this data file represents one impression generated by a user. The columns of the behaviors data are:<br>

[Impression ID] [User ID] [Impression Time] [User Click History] [Impression News]

**Column 0**: Impression ID (int64)<br>
This is the ID of the impression generated.<br>
e.g. 1,2,3,4,5
        
**Column 1**: User ID (string)<br>
The anonymous ID of a user who has generated that impression.<br>
e.g. U89 , U395 , U60005, U3965770
        
**Column 2**: Time (timestamp)<br>
The impression time, in the format `MM/DD/YYYY HH:MM:SS AM/PM` <br>
This is the point in time up to which the user's impressions have been captured.
            
**Column 3**: History (string)<br>
The news click history of this user before this impression. The clicked news articles are ordered by time.<br>
e.g. N106403 N71977 N97080 N102132 N97212 N121652
        
**Column 4**: Impressions (string)<br>
List of news displayed to the user and user's click behaviors on them (1 for click and 0 for non-click).<br>
e.g. N129416-0 N26703-1 N120089-1 N53018-0 N89764-0 N91737-0 N29160-0
   
The details for each news ID in the history and impressions columns are found in the news.tsv file.

For more details on the dataset, see the [Official MIND Dataset Description](https://github.com/msnews/msnews.github.io/blob/master/assets/doc/introduction.md).
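
To illustrate the column layout described above, the sketch below parses a single behaviors.tsv line with the Python standard library only. `parse_behavior_line` is a hypothetical helper (the notebook itself loads the file with cuDF), and the sample row is a shortened, illustrative line in the same format.

```python
from datetime import datetime


def parse_behavior_line(line):
    """Split one behaviors.tsv line into typed fields."""
    impression_id, uid, time_str, history, impressions = line.rstrip("\n").split("\t")
    # Each impression token is "<news id>-<label>"; label 1 = click, 0 = non-click.
    pairs = []
    for token in impressions.split():
        news_id, label = token.rsplit("-", 1)
        pairs.append((news_id, int(label)))
    return {
        "impression_id": int(impression_id),
        "uid": uid,
        "time": datetime.strptime(time_str, "%m/%d/%Y %I:%M:%S %p"),
        "history": history.split(),   # ordered oldest to newest
        "impressions": pairs,
    }


# Shortened, illustrative row in the format described above.
sample = "3\tU532401\t11/13/2019 11:23:03 AM\tN128643 N87446 N122948\tN103852-0 N47925-1"
row = parse_behavior_line(sample)
print(row["time"].isoformat(), row["history"][-1], row["impressions"])
```

The `%I` and `%p` directives handle the 12-hour clock with its AM/PM suffix, so afternoon timestamps are converted to 24-hour values.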

Let's reload the data with their respective column names.

In [8]:
behaviors_columns = ['impression_id', 'uid', 'time', 'history', 'impressions']

In [9]:
behaviors_train = cudf.read_csv(os.path.join(data_path_train, 'behaviors.tsv'),
                                header=None,
                                names=behaviors_columns,
                                sep='\t')
behaviors_train.head()

Unnamed: 0,impression_id,uid,time,history,impressions
0,1,U87243,11/10/2019 11:30:54 AM,N8668 N39081 N65259 N79529 N73408 N43615 N2937...,N78206-0 N26368-0 N7578-0 N58592-0 N19858-0 N5...
1,2,U598644,11/12/2019 1:45:29 PM,N56056 N8726 N70353 N67998 N83823 N111108 N107...,N47996-0 N82719-0 N117066-0 N8491-0 N123784-0 ...
2,3,U532401,11/13/2019 11:23:03 AM,N128643 N87446 N122948 N9375 N82348 N129412 N5...,N103852-0 N53474-0 N127836-0 N47925-1
3,4,U593596,11/12/2019 12:24:09 PM,N31043 N39592 N4104 N8223 N114581 N92747 N1207...,N38902-0 N76434-0 N71593-0 N100073-0 N108736-0...
4,5,U239687,11/14/2019 8:03:01 PM,N65250 N122359 N71723 N53796 N41663 N41484 N11...,N76209-0 N48841-0 N67937-0 N62235-0 N6307-0 N3...


In [10]:
behaviors_valid = cudf.read_csv(os.path.join(data_path_valid, 'behaviors.tsv'),
                                header=None,
                                names=behaviors_columns,
                                sep='\t')
behaviors_valid.head()

Unnamed: 0,impression_id,uid,time,history,impressions
0,1,U134050,11/15/2019 8:55:22 AM,N12246 N128820 N119226 N4065 N67770 N33446 N10...,N91737-0 N30206-0 N54368-0 N117802-0 N18190-0 ...
1,2,U254959,11/15/2019 11:42:35 AM,N34011 N9375 N67397 N7936 N118985 N109453 N103...,N119999-0 N24958-0 N104054-0 N33901-0 N9250-0 ...
2,3,U499841,11/15/2019 9:08:21 AM,N63858 N26834 N6379 N85484 N15229 N65119 N1047...,N18190-0 N89764-0 N91737-0 N54368-0 N49978-1 N...
3,4,U107107,11/15/2019 5:50:31 AM,N12959 N8085 N18389 N3758 N9740 N90543 N129790...,N122944-1 N18190-0 N55801-0 N59297-0 N128045-0...
4,5,U492344,11/15/2019 5:02:25 AM,N109183 N48453 N85005 N45706 N98923 N46069 N35...,N64785-0 N82503-0 N32993-0 N122944-0 N29160-0 ...


### News data

In [11]:
news_train = cudf.read_csv(os.path.join(data_path_train , 'news.tsv'), 
                          header=None, 
                          sep='\t',)
news_train.head()

Unnamed: 0,0,1,2,3,4,5,6,7
0,N88753,lifestyle,lifestyleroyals,"The Brands Queen Elizabeth, Prince Charles, an...","Shop the notebooks, jackets, and more that the...",https://assets.msn.com/labs/mind/AAGH0ET.html,"[{""Label"": ""Prince Philip, Duke of Edinburgh"",...",[]
1,N45436,news,newsscienceandtechnology,Walmart Slashes Prices on Last-Generation iPads,Apple's new iPad releases bring big deals on l...,https://assets.msn.com/labs/mind/AABmf2I.html,"[{""Label"": ""IPad"", ""Type"": ""J"", ""WikidataId"": ...","[{""Label"": ""IPad"", ""Type"": ""J"", ""WikidataId"": ..."
2,N23144,health,weightloss,50 Worst Habits For Belly Fat,These seemingly harmless habits are holding yo...,https://assets.msn.com/labs/mind/AAB19MK.html,"[{""Label"": ""Adipose tissue"", ""Type"": ""C"", ""Wik...","[{""Label"": ""Adipose tissue"", ""Type"": ""C"", ""Wik..."
3,N86255,health,medical,Dispose of unwanted prescription drugs during ...,,https://assets.msn.com/labs/mind/AAISxPN.html,"[{""Label"": ""Drug Enforcement Administration"", ...",[]
4,N93187,news,newsworld,The Cost of Trump's Aid Freeze in the Trenches...,Lt. Ivan Molchanets peeked over a parapet of s...,https://assets.msn.com/labs/mind/AAJgNsz.html,[],"[{""Label"": ""Ukraine"", ""Type"": ""G"", ""WikidataId..."


Each row in this data file represents a news article and its attributes. The columns of this data file are:

[News ID] [Category] [Subcategory] [News Title] [News Abstract] [News Url] [Entities in News Title] [Entities in News Abstract]

**Column 0**: News ID (string)<br>
This is the ID of the news article<br>
e.g. N89 , N395 , N60005, N3965770
        
**Column 1**: Category (string)<br>
Category of the news. There are 18 categories<br>
e.g. sports , health , news ... etc
        
**Column 2**: SubCategory (string)<br>
Sub-category of the news. There are 242 unique sub-categories.<br>
e.g. golf, newsscienceandtechnology, medical, newsworld ... etc
            
**Column 3**: Title (string)<br>
Title of the news article<br>
e.g. PGA Tour winners, 50 Worst Habits For Belly Fat ... etc
        
**Column 4**: Abstract (string)<br>
Abstract of the news article<br>
e.g. A gallery of recent winners on the PGA Tour, These seemingly harmless habits are holding
          
**Column 5**: URL (string)<br>
URL to the MSN site where the news article was published.<br>
e.g. https://www.msn.com/en-us/sports/golf/pga-tour-winners/ss-AAjnQjj?ocid=chopendata
        
**Column 6**: Title Entities (string)<br>
Entities present in the title
        
**Column 7**: Abstract Entities (string)<br>
Entities present in the abstract

Let's reload the data with their respective column names.

In [12]:
news_columns = ['did', 'cat', 'sub_cat', 'title', 'abstract', 'url', 'title_entities', 'abstract_entities']

In [13]:
news_train = cudf.read_csv(os.path.join(data_path_train, 'news.tsv'),
                           header=None,
                           names=news_columns,
                           sep='\t')
news_train.head()

Unnamed: 0,did,cat,sub_cat,title,abstract,url,title_entities,abstract_entities
0,N88753,lifestyle,lifestyleroyals,"The Brands Queen Elizabeth, Prince Charles, an...","Shop the notebooks, jackets, and more that the...",https://assets.msn.com/labs/mind/AAGH0ET.html,"[{""Label"": ""Prince Philip, Duke of Edinburgh"",...",[]
1,N45436,news,newsscienceandtechnology,Walmart Slashes Prices on Last-Generation iPads,Apple's new iPad releases bring big deals on l...,https://assets.msn.com/labs/mind/AABmf2I.html,"[{""Label"": ""IPad"", ""Type"": ""J"", ""WikidataId"": ...","[{""Label"": ""IPad"", ""Type"": ""J"", ""WikidataId"": ..."
2,N23144,health,weightloss,50 Worst Habits For Belly Fat,These seemingly harmless habits are holding yo...,https://assets.msn.com/labs/mind/AAB19MK.html,"[{""Label"": ""Adipose tissue"", ""Type"": ""C"", ""Wik...","[{""Label"": ""Adipose tissue"", ""Type"": ""C"", ""Wik..."
3,N86255,health,medical,Dispose of unwanted prescription drugs during ...,,https://assets.msn.com/labs/mind/AAISxPN.html,"[{""Label"": ""Drug Enforcement Administration"", ...",[]
4,N93187,news,newsworld,The Cost of Trump's Aid Freeze in the Trenches...,Lt. Ivan Molchanets peeked over a parapet of s...,https://assets.msn.com/labs/mind/AAJgNsz.html,[],"[{""Label"": ""Ukraine"", ""Type"": ""G"", ""WikidataId..."


In [14]:
news_valid = cudf.read_csv(os.path.join(data_path_valid, 'news.tsv'),
                           header=None,
                           names=news_columns,
                           sep='\t')
news_valid.head()

Unnamed: 0,did,cat,sub_cat,title,abstract,url,title_entities,abstract_entities
0,N88753,lifestyle,lifestyleroyals,"The Brands Queen Elizabeth, Prince Charles, an...","Shop the notebooks, jackets, and more that the...",https://assets.msn.com/labs/mind/AAGH0ET.html,"[{""Label"": ""Prince Philip, Duke of Edinburgh"",...",[]
1,N23144,health,weightloss,50 Worst Habits For Belly Fat,These seemingly harmless habits are holding yo...,https://assets.msn.com/labs/mind/AAB19MK.html,"[{""Label"": ""Adipose tissue"", ""Type"": ""C"", ""Wik...","[{""Label"": ""Adipose tissue"", ""Type"": ""C"", ""Wik..."
2,N86255,health,medical,Dispose of unwanted prescription drugs during ...,,https://assets.msn.com/labs/mind/AAISxPN.html,"[{""Label"": ""Drug Enforcement Administration"", ...",[]
3,N93187,news,newsworld,The Cost of Trump's Aid Freeze in the Trenches...,Lt. Ivan Molchanets peeked over a parapet of s...,https://assets.msn.com/labs/mind/AAJgNsz.html,[],"[{""Label"": ""Ukraine"", ""Type"": ""G"", ""WikidataId..."
4,N75236,health,voices,I Was An NBA Wife. Here's How It Affected My M...,"I felt like I was a fraud, and being an NBA wi...",https://assets.msn.com/labs/mind/AACk2N6.html,[],"[{""Label"": ""National Basketball Association"", ..."


## Step 4 : Initial pre-processing to prepare dataset for feature engineering

Before we use the data in NVTabular for pre-processing and feature engineering, we have to make a few changes to make it efficient for GPU operations.<br>
The changes we have to make in the behaviors data file are:
   - The history column is a long string, not a list. NVTabular supports multi-hot categorical features, but the HugeCTR Parquet reader does not. We therefore extend the dataframe with multiple history columns, each capturing one element of this long string. While extending the history columns, we make sure to pick the most recent entries, i.e. those at the end of the time-ordered history.

  - The impressions column contains a long string of unique negative and positive values for the same impression event. Each of these unique values is a data point for our model to learn from, so the positive and negative entries should be unrolled into multiple rows. Row expansion is not supported in NVTabular, so we perform it with cuDF.

As for the news data file, we use only the news ID, category, and sub-category columns.<br>
There are many ways to use the other columns (title, abstract, entities, etc.) as features, but we leave those for you to explore.

In a nutshell, we take the raw downloaded dataset, apply this basic pre-processing with cuDF, and generate a new train dataset that is then used for further processing.

### Pre-process 1: Drop columns from the news dataset

The columns that we drop from news.tsv are: 'title', 'abstract', 'url', 'title_entities', 'abstract_entities'

We encourage you to explore using 'title_entities' and 'abstract_entities' as categorical features.
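
As a starting point for that exploration, the entity columns hold JSON lists whose items carry `Label`, `Type` and `WikidataId` keys, as seen in the preview above. The sketch below pulls out the IDs, which could then be treated as categorical values. `entity_ids` is a hypothetical helper, and the sample cell uses a placeholder ID rather than a real value from the dataset.

```python
import json


def entity_ids(entities_json):
    """Return the WikidataId of every entity in one title/abstract entities cell."""
    if not entities_json:
        return []
    return [e["WikidataId"] for e in json.loads(entities_json) if "WikidataId" in e]


# Hypothetical cell value: the keys match the preview above, the ID is a placeholder.
cell = '[{"Label": "IPad", "Type": "J", "WikidataId": "Q_PLACEHOLDER"}]'
print(entity_ids(cell))
print(entity_ids("[]"))
```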

In [15]:
news_train = news_train.drop(['title', 'abstract', 'url', 'title_entities', 'abstract_entities'],axis = 1)
news_valid = news_valid.drop(['title', 'abstract', 'url', 'title_entities', 'abstract_entities'],axis = 1)

# Merging news train/valid dataset to have a single view of news and their attributes
news = cudf.concat([news_train,news_valid]).drop_duplicates().reset_index().drop(['index'],axis=1)

# Freeing up memory by nulling the variables
news_train = None
news_valid = None

news.head()

Unnamed: 0,did,cat,sub_cat
0,N1,sports,football_nfl
1,N100,finance,markets
2,N1000,weather,weathertopstories
3,N10000,entertainment,celebrity
4,N100000,sports,football_nfl


###  Pre-process 2:  Label encoding for categorical variables

Strings require significantly more memory than integers. For example, representing the string `cfcd208495d565ef66e7dff9f98764` as the integer `0` can save up to 90% memory.
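
The rough arithmetic behind that figure can be checked with a few lines of Python. This is a simplified sketch: it assumes one byte per ASCII character and a fixed-width integer, and it ignores any per-row overhead (offsets, null masks) that a real dataframe library adds. `memory_saving` is a hypothetical helper.

```python
def memory_saving(string_value, int_bytes):
    """Fraction of bytes saved by storing a fixed-width integer instead of the string."""
    string_bytes = len(string_value.encode("utf-8"))
    return 1 - int_bytes / string_bytes


s = "cfcd208495d565ef66e7dff9f98764"   # 30 ASCII characters
for width in (1, 4, 8):
    print(f"int{width * 8}: {memory_saving(s, width):.0%} saved")
```

The exact saving therefore depends on the integer width chosen for the encoded column.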

Thus, we label encode the categorical variables in our dataset so that the downstream pre-processing and feature engineering pipelines don't consume a large amount of memory.<br>
We also label encode the low-cardinality columns in news.tsv, namely the news category and sub-category.
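
The idea of label encoding can be sketched in plain Python before applying it to the GPU dataframes. `build_index` and `encode` are hypothetical helpers for illustration only. Numbering starts at 1, as in the cells that follow, which appears to leave 0 free for missing entries.

```python
def build_index(values):
    """Map each distinct value to a 1-based integer, in order of first appearance."""
    index = {}
    for value in values:
        if value not in index:
            index[value] = len(index) + 1
    return index


def encode(values, index, missing=0):
    """Replace each value by its integer code; unknown values map to `missing`."""
    return [index.get(v, missing) for v in values]


uids = ["U87243", "U598644", "U87243", "U532401"]
user_index_demo = build_index(uids)
print(user_index_demo)
print(encode(["U598644", "U999"], user_index_demo))
```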

In [16]:
# Encoding user id from both train and validation dataframe
user_index = {} 

temp = cudf.concat([behaviors_train['uid'],behaviors_valid['uid']]).unique().to_pandas() 
for i in tqdm(range(len(temp)),total = len(temp)):
    user_index[temp[i]] = i + 1    

100%|██████████| 750434/750434 [00:03<00:00, 207351.53it/s]


In [17]:
# Replacing uid in the dataset with their respective indexes

behaviors_train['uid'] = behaviors_train['uid'].replace([i for i in user_index],[str(user_index[i]) for i in user_index]).astype('int')
behaviors_valid['uid'] = behaviors_valid['uid'].replace([i for i in user_index],[str(user_index[i]) for i in user_index]).astype('int')

In [18]:
# Encoding news id from the combined news dataframe
news_index = {}

for n,data in tqdm(news.to_pandas().iterrows(),total = len(news)):
    news_index[data['did']] = n + 1

100%|██████████| 104151/104151 [00:09<00:00, 10495.89it/s]


In [19]:
# Encoding news categories and sub-categories
cat = {}
subcat = {}

temp = news['cat'].unique()
for i in tqdm(range(len(temp)),total = len(temp)):
    cat[temp[i]] = i + 1
    
temp = news['sub_cat'].unique()
for i in tqdm(range(len(temp)),total = len(temp)):
    subcat[temp[i]] = i + 1

# Replacing did, cat and sub_cat with their respective indexes in the news dataframe
news = news.replace({'did': [i for i in news_index], 'cat': [i for i in cat], 'sub_cat': [i for i in subcat]},{'did': [str(news_index[i]) for i in news_index], 'cat': [str(cat[i]) for i in cat], 'sub_cat': [str(subcat[i]) for i in subcat]}).astype('int')
news = news.set_index('did').to_pandas().T.to_dict()

100%|██████████| 18/18 [00:00<00:00, 54.26it/s]
100%|██████████| 285/285 [00:00<00:00, 357.35it/s]


We will replace the news IDs with their corresponding `news_index` values in the behaviors dataframe in Pre-process 3.

###  Pre-process 3: Unroll items in history column

As an example, consider the row below from the behaviors dataframe:

|impression_id | uid | time | history | impressions |
| :-: | :-: | :-: |:-: |:-: |
| 1 | U64099 | 11/19/2019 11:37:45 AM |	N121133 N104200 N43255 N55860 N128965 N38014 | N78206-0 N26368-1 N7578-1 N58592-0 N19858-0 |

We have to convert the single history column holding many news IDs into multiple history columns holding one news ID each.

| hist_0 | hist_1 | hist_2 | hist_3 | hist_4 | hist_5 |
| :-: | :-: | :-: | :-: | :-: | :-: |
|	N121133 | N104200 | N43255 | N55860 | N128965 | N38014 |

Finally, we will add the news category and subcategory for these news ids. The row after these transformations would look like this:

|impression_id | uid | time | hist_cat_0 | hist_cat_1 | hist_cat_2 | ... | hist_subcat_3 | hist_subcat_4 | hist_subcat_5 | impressions |
| :-: | :-: | :-: |:-: | :-: | :-: | :-: | :-: | :-: | :-: | :-: |
| 1 | U64099 | 11/19/2019 11:37:45 AM |	sports | finance | entertainment | ... | markets | celebrity | football_nfl | N78206-0 N26368-1 N7578-1 N58592-0 N19858-0 |

We observed that the maximum number of items in the history column is 400 and the mean is 30.<br>
We have to unroll the same number of history items for each row, so we define a variable that controls this number. Feel free to increase this number to include more items.

For this tutorial, `max_hist`, i.e. the number of history columns to be unrolled, is set to 10.

In [20]:
max_hist = 10

Let's expand the history column into individual history columns, limited to `max_hist`.
During expansion, we use the last `max_hist` items from the history column, as those are the most recent ones (the news IDs in this column are ordered by time).

In addition, we save the length of the history in a separate column, which could also be used as a feature.

We also replace the news IDs with their news indexes in the behaviors dataframe.
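
The logic of these steps can be sketched on a single row in plain Python. This is a simplified illustration rather than a drop-in equivalent of the cuDF expression used in the next cell: `unroll_history` and `toy_index` are hypothetical, and the sketch assumes unused slots are padded with 0 on the right.

```python
def unroll_history(history, max_hist, news_index):
    """Return (last `max_hist` encoded news IDs padded with 0, total history length)."""
    items = history.split() if history else []
    recent = items[-max_hist:] if max_hist > 0 else []
    encoded = [news_index.get(n, 0) for n in recent]
    encoded += [0] * (max_hist - len(encoded))   # pad short histories
    return encoded, len(items)


toy_index = {"N121133": 1, "N104200": 2, "N43255": 3, "N55860": 4}

cols, count = unroll_history("N121133 N104200 N43255 N55860", 3, toy_index)
print(cols, count)

short_cols, short_count = unroll_history("N43255", 3, toy_index)
print(short_cols, short_count)
```

Keeping the full history length alongside the truncated columns preserves a signal about how active the user is, even when only a few items are unrolled.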

In [21]:
# Making a new gdf for storing history
hist = cudf.DataFrame() 

# Splitting the long string of history into several columns
hist[['hist_'+str(i) for i in range(max_hist)]] = behaviors_train.history.str.rsplit(n=max_hist,expand=True).fillna(0)[[i for i in range(1,max_hist+1)]]

# Replacing string news id in history with respective indexes
hist = hist.replace([i for i in news_index],[str(news_index[i]) for i in news_index]).astype('int')

# Appending news category corresponding to these newly created history columns
behaviors_train[['hist_cat_'+str(i) for i in range(max_hist)]] = hist.replace([int(i) for i in news],[int(news[i]['cat']) for i in news])

# Appending news sub-category corresponding to these newly created history columns
behaviors_train[['hist_subcat_'+str(i) for i in range(max_hist)]] = hist.replace([int(i) for i in news],[int(news[i]['sub_cat']) for i in news])

# Creating a column for the length of history 
behaviors_train['hist_count'] = behaviors_train.history.str.count(" ")+1

# Dropping the long string history column
behaviors_train = behaviors_train.drop(['history'],axis=1)

In [22]:
# Repeating the same for validation set
hist = cudf.DataFrame()

hist[['hist_'+str(i) for i in range(max_hist)]] = behaviors_valid.history.str.rsplit(n=max_hist,expand=True).fillna(0)[[i for i in range(1,max_hist+1)]]

hist = hist.replace([i for i in news_index],[str(news_index[i]) for i in news_index]).astype('int')

behaviors_valid[['hist_cat_'+str(i) for i in range(max_hist)]] = hist.replace([int(i) for i in news],[int(news[i]['cat']) for i in news])

behaviors_valid[['hist_subcat_'+str(i) for i in range(max_hist)]] = hist.replace([int(i) for i in news],[int(news[i]['sub_cat']) for i in news])

behaviors_valid['hist_count'] = behaviors_valid.history.str.count(" ")+1

behaviors_valid = behaviors_valid.drop(['history'],axis=1)

### Pre-process 4 : Unroll items in impression column

As an example, consider the row below from the behaviors dataframe, with its history column already expanded:

|impression_id | uid | time | hist_cat_0 | hist_cat_1 | hist_cat_2 | ... | hist_subcat_3 | hist_subcat_4 | hist_subcat_5 | impressions |
| :-: | :-: | :-: |:-: | :-: | :-: | :-: | :-: | :-: | :-: | :-: |
| 1 | U64099 | 11/19/2019 11:37:45 AM |	sports | finance | entertainment | ... | markets | celebrity | football_nfl | N78206-0 N26368-1 N7578-1 N58592-0 N19858-0 |

The impressions column contains the positive and negative samples as one long string.

After unrolling one row of impressions into multiple rows, the resulting dataframe would look like this:

|impression_id | uid | time | hist_cat_0 | hist_cat_1 | hist_cat_2 | ... | hist_subcat_3 | hist_subcat_4 | hist_subcat_5 | impressions | label |
| :-: | :-: | :-: |:-: | :-: | :-: | :-: | :-: | :-: | :-: | :-: | :-: |
| 1 | U64099 | 11/19/2019 11:37:45 AM |	sports | finance | entertainment | ... | markets | celebrity | football_nfl | N78206 | 0 |
| 1 | U64099 | 11/19/2019 11:37:45 AM |	sports | finance | entertainment | ... | markets | celebrity | football_nfl | N26368 | 1 |
| 1 | U64099 | 11/19/2019 11:37:45 AM |	sports | finance | entertainment | ... | markets | celebrity | football_nfl | N7578 | 1 |
| 1 | U64099 | 11/19/2019 11:37:45 AM |	sports | finance | entertainment | ... | markets | celebrity | football_nfl | N58592 | 0 |
| 1 | U64099 | 11/19/2019 11:37:45 AM |	sports | finance | entertainment | ... | markets | celebrity | football_nfl | N19858 | 0 |

Note that all the 5 generated rows have the same impression_id, uid, time and history data columns.

We have observed that the maximum number of items in impression column is 105 and the mean is 40. <br>
We will limit the items to unroll from impression column by defining the variable `max_impr` and set it to 100. Feel free to increase or decrease this value.
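
The row expansion itself can be sketched on the example row in plain Python. `unroll_impressions` is a hypothetical helper shown only to make the transformation concrete; the notebook performs the real expansion on the full dataset in the cells that follow.

```python
def unroll_impressions(impression_id, impressions, max_impr):
    """Turn one impressions string into (impression_id, news_id, label) rows."""
    rows = []
    for token in impressions.split()[:max_impr]:
        news_id, label = token.rsplit("-", 1)
        rows.append((impression_id, news_id, int(label)))
    return rows


rows = unroll_impressions(1, "N78206-0 N26368-1 N7578-1 N58592-0 N19858-0", max_impr=100)
for r in rows:
    print(r)
```

Each output row keeps the impression ID, so the per-impression columns (user, time, history) can be joined back onto it afterwards.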

**Note** - Make sure you're using a GPU with at least 16GB memory to avoid OOM errors with the values set below.

In [23]:
max_impr = 100

Row expansion is a memory- and I/O-intensive operation, so we perform it in 2 steps. We first create a dictionary of impressions and labels, and later merge it with the train set.

Let's convert the impressions column into a dictionary of lists, with the impression ID as key and the impression items as value.

In [24]:
# For train dataset
impr_train = behaviors_train.set_index('impression_id').impressions.to_pandas().str.split()
impr_train = impr_train.to_dict()
behaviors_train = behaviors_train.drop(['impressions'],axis=1)

# For validation dataset
impr_valid = behaviors_valid.set_index('impression_id').impressions.to_pandas().str.split()
impr_valid = impr_valid.to_dict()
behaviors_valid = behaviors_valid.drop(['impressions'],axis=1)

Since the negative samples (labelled 0) outnumber the positive samples, we can define a ratio between negatives and positives to sample a more balanced distribution. <br>
For now, let's set this variable to -1 to include all the samples from the impressions column. Feel free to set this variable to a value greater than 1 to downsample the negative samples.
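
The effect of this ratio can be sketched on a toy list of labels. `downsample_negatives` is a hypothetical helper that keeps every positive and at most `pos * np_ratio` negatives per impression; that cap is our assumption about the intent, and the loop in the cell further below may admit one more negative because it compares with `<=`.

```python
def downsample_negatives(labels, np_ratio):
    """Return indices to keep: every positive, and at most pos * np_ratio negatives."""
    if np_ratio < 0:                      # -1 means "keep everything"
        return list(range(len(labels)))
    max_neg = sum(labels) * np_ratio      # labels are 0/1, so sum() counts positives
    keep, neg = [], 0
    for i, label in enumerate(labels):
        if label == 1:
            keep.append(i)
        elif neg < max_neg:
            keep.append(i)
            neg += 1
    return keep


labels = [0, 0, 1, 0, 0, 0, 1, 0]
print(downsample_negatives(labels, -1))
print(downsample_negatives(labels, 2))
```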

In [25]:
np_ratio = -1 # ratio of neg-to-pos samples

We now iterate over the above dictionary to create a new dataframe in which each impression news item gets its own row along with its corresponding label.<br>
This is a time-consuming operation!

In [26]:
# For train set

imp_id = []
imp_list = []
imp_label = []
for i in tqdm(impr_train,total = len(impr_train)):
    imp, label = np.transpose([[news_index[imp.split('-')[0]],imp.split('-')[1]] for imp in impr_train[i]])
    pos = (label == '1').sum()
    neg = 0
    for j in range(min(len(imp),max_impr)):
        if label[j] == '0' and np_ratio > -1:
            if neg <= pos*np_ratio :
                imp_id.append(i)
                imp_list.append(imp[j])
                imp_label.append(label[j])
                neg+=1
        else:
            imp_id.append(i)
            imp_list.append(imp[j])
            imp_label.append(label[j])

impr_train = None 

# Creating a new gdf with impression id, news id and its label
impressions_train = cudf.DataFrame({'imp_id': imp_id,'impr': imp_list,'label': imp_label})

# Appending news category corresponding to above impression news in the above created DataFrame
impressions_train['impr_cat'] = impressions_train['impr'].replace([int(i) for i in news],[int(news[i]['cat']) for i in news])

# Appending news sub-category corresponding to above impression news in above created DataFrame
impressions_train['impr_subcat'] = impressions_train['impr'].replace([int(i) for i in news],[int(news[i]['sub_cat']) for i in news])

# Dropping the impr column now that its category and sub-category have been added.
impressions_train = impressions_train.drop(['impr'],axis=1)

impressions_train.head()

100%|██████████| 2232748/2232748 [07:20<00:00, 5064.69it/s]


| | imp_id | label | impr_cat | impr_subcat |
|---|---|---|---|---|
| 0 | 1 | 0 | 84769 | 84769 |
| 1 | 1 | 0 | 38819 | 38819 |
| 2 | 1 | 0 | 82639 | 82639 |
| 3 | 1 | 0 | 67404 | 67404 |
| 4 | 1 | 0 | 32985 | 32985 |


In [27]:
# For validation set

imp_id = []
imp_list = []
imp_label = []
for i in tqdm(impr_valid,total = len(impr_valid)):
    imp, label = np.transpose([[news_index[imp.split('-')[0]],imp.split('-')[1]] for imp in impr_valid[i]])
    pos = (label == '1').sum()
    neg = 0
    for j in range(min(len(imp),max_impr)):
        if label[j] == '0' and np_ratio > -1:
            if neg <= pos*np_ratio :
                imp_id.append(i)
                imp_list.append(imp[j])
                imp_label.append(label[j])
                neg+=1
        else:
            imp_id.append(i)
            imp_list.append(imp[j])
            imp_label.append(label[j])

impr_valid = None 

impressions_valid = cudf.DataFrame({'imp_id': imp_id,'impr': imp_list,'label': imp_label})
impressions_valid['impr_cat'] = impressions_valid['impr'].replace([int(i) for i in news],[int(news[i]['cat']) for i in news])
impressions_valid['impr_subcat'] = impressions_valid['impr'].replace([int(i) for i in news],[int(news[i]['sub_cat']) for i in news])
impressions_valid = impressions_valid.drop(['impr'],axis=1)

impressions_valid.head()

100%|██████████| 376471/376471 [01:14<00:00, 5078.98it/s]


| | imp_id | label | impr_cat | impr_subcat |
|---|---|---|---|---|
| 0 | 1 | 0 | 96774 | 96774 |
| 1 | 1 | 0 | 42250 | 42250 |
| 2 | 1 | 0 | 63649 | 63649 |
| 3 | 1 | 0 | 15784 | 15784 |
| 4 | 1 | 0 | 31476 | 31476 |


### Pre-process 5: Merge behaviour and news datasets 

Collating all the required columns from both the behaviours and news datasets makes the feature engineering process much faster. 

We will merge the history columns (from the behaviors dataframe) with the impression data created above and save the result as a parquet file. <br>
We will also re-initialize RMM so that we can perform the memory-intensive merge operation.

In [29]:
# For training set
rmm.reinitialize(managed_memory=True)

final_data = impressions_train.merge(behaviors_train,left_on = ['imp_id'],right_on = ['impression_id']).drop(['imp_id'],axis=1)
final_data = cudf.concat([final_data.drop(['time'],axis=1).astype('int'),final_data['time']],axis=1)
final_data.to_parquet(os.path.join(data_input_path, 'train.parquet'), compression = None)

#client.run(_rmm_pool)

In [30]:
# For validation set
rmm.reinitialize(managed_memory=True)

final_data = impressions_valid.merge(behaviors_valid,left_on = ['imp_id'],right_on = ['impression_id']).drop(['imp_id'],axis=1)
final_data = cudf.concat([final_data.drop(['time'],axis=1).astype('int'),final_data['time']],axis=1)
final_data.to_parquet(os.path.join(data_input_path, 'valid.parquet'),compression = None)

Finally, we have our initial pre-processed data - **train.parquet** and **valid.parquet** - that will be used for feature engineering and further processing. 

## Step 5: Feature Engineering - time-based features

To get started with NVTabular, we'll first use it to create simple time-based features extracted from the timestamp column in the behaviours data. <br>

In [31]:
# Declaring features of train set that we created

cat_features = [
 'hist_cat_0',
 'hist_subcat_0',
 'hist_cat_1',
 'hist_subcat_1',
 'hist_cat_2',
 'hist_subcat_2',
 'hist_cat_3',
 'hist_subcat_3',
 'hist_cat_4',
 'hist_subcat_4',
 'hist_cat_5',
 'hist_subcat_5',
 'hist_cat_6',
 'hist_subcat_6',
 'hist_cat_7',
 'hist_subcat_7',
 'hist_cat_8',
 'hist_subcat_8',
 'hist_cat_9',
 'hist_subcat_9',
 'impr_cat',
 'impr_subcat',
 'impression_id',
 'uid']

cont_features = ['hist_count']

labels = ['label']

In [32]:
# Creating time based features by extracting the relevant elements using cuDF

datetime = nvt.ColumnGroup(['time']) >> (lambda col: cudf.to_datetime(col,format="%m/%d/%Y %I:%M:%S %p"))

hour = datetime >> (lambda col: col.dt.hour) >> nvt.ops.Rename(postfix = '_hour')
minute = datetime >> (lambda col: col.dt.minute) >> nvt.ops.Rename(postfix = '_minute')
seconds = datetime >> (lambda col: col.dt.second) >> nvt.ops.Rename(postfix = '_second')

weekday = datetime >> (lambda col: col.dt.weekday) >> nvt.ops.Rename(postfix = '_wd')
day = datetime >> (lambda col: cudf.to_datetime(col, unit='s').dt.day) >> nvt.ops.Rename(postfix = '_day')

week = day >> (lambda col: (col/7).floor().astype('int')) >> nvt.ops.Rename(postfix = '_week')
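
For intuition, the same features can be derived for a single timestamp with Python's standard library. This is only a sketch: the helper name `time_features` is an assumption, and the notebook itself computes these columns on the GPU through cuDF.

```python
from datetime import datetime
from math import floor

TIME_FORMAT = "%m/%d/%Y %I:%M:%S %p"  # same format string as the cell above


def time_features(raw):
    """Derive the time-based features from one timestamp string."""
    ts = datetime.strptime(raw, TIME_FORMAT)
    return {
        "time_hour": ts.hour,
        "time_minute": ts.minute,
        "time_second": ts.second,
        "time_wd": ts.weekday(),             # Monday is 0
        "time_day": ts.day,
        "time_day_week": floor(ts.day / 7),  # day of month in 7-day buckets
    }


print(time_features("11/19/2019 11:37:45 AM"))
```

Note that `time_day_week` is derived from the day of the month, so it is a coarse bucket rather than a calendar week number.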

To create embedding tables and segregate the pre-processing functions (normalization, filling missing values, etc.) between categorical and continuous features, we define them and chain them into a pipeline using NVTabular's operator overloading.

In [33]:
cat_features = cat_features + hour + minute + seconds + weekday + day + week + datetime >> nvt.ops.Categorify(out_path = data_output_path)
cont_features = cont_features >> nvt.ops.FillMissing() >> nvt.ops.NormalizeMinMax()
labels = ['label']
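
The following sketch illustrates, in plain Python, the kind of transformation these three operators perform. It is a simplified stand-in and not NVTabular's implementation: the function names, the id ordering, the use of 0 for missing categories and the fill value of 0 are all assumptions made for illustration.

```python
def categorify(values):
    """Map each distinct value to a contiguous integer id.

    Simplified stand-in: ids start at 1 and 0 is kept for missing values.
    NVTabular's own id assignment may differ in ordering and details.
    """
    distinct = sorted({v for v in values if v is not None})
    mapping = {v: i for i, v in enumerate(distinct, start=1)}
    return [mapping.get(v, 0) for v in values], mapping


def fill_and_minmax(values, fill_value=0.0):
    """Replace missing entries, then rescale to the [0, 1] range."""
    filled = [fill_value if v is None else float(v) for v in values]
    lo, hi = min(filled), max(filled)
    span = hi - lo
    return [0.0 if span == 0 else (v - lo) / span for v in filled]


ids, mapping = categorify(["sports", "finance", None, "sports"])
scaled = fill_and_minmax([3, None, 12, 30])
print(ids, mapping)
print(scaled)
```

The number of distinct ids per column is what later determines the size of each embedding table.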

We can visualize the complete workflow pipeline.

In [34]:
output = cat_features + cont_features
output.graph

ExecutableNotFound: failed to execute ['dot', '-Kdot', '-Tsvg'], make sure the Graphviz executables are on your systems' PATH

<graphviz.dot.Digraph at 0x7f3f64e80910>

To run this graph, we create a workflow object that calculates statistics and performs the relevant transformations.

In [35]:
proc = nvt.Workflow(cat_features + cont_features + labels[0])

In [36]:
# Initialize a nvt.Dataset from parquet file that was created in step 4.

data_train = nvt.Dataset(os.path.join(data_input_path, "train.parquet"), engine="parquet",part_size="256MB")
data_valid = nvt.Dataset(os.path.join(data_input_path, "valid.parquet"), engine="parquet",part_size="256MB")

Since we are going to train the DNNs using HugeCTR, we need to conform to the following dtypes:

- categorical feature columns in int64
- continuous feature columns in float32
- label columns in float32
    
We will make a dictionary containing names of columns as key and the required datatype as value. This dictionary will be used by NVTabular for type casting.

In [37]:
dict_dtypes={}

for col in cat_features.columns:
    dict_dtypes[col] = np.int64

for col in cont_features.columns:
    dict_dtypes[col] = np.float32

for col in labels:
    dict_dtypes[col] = np.float32

Let's fit the workflow on the training set to record the statistics.

In [38]:
%%time
proc.fit(data_train)

CPU times: user 1min 49s, sys: 23.6 s, total: 2min 13s
Wall time: 2min 13s


Next, we apply the transformation to the dataset and persist it to disk as parquet.

In [39]:
%%time

# For training set
proc.transform(data_train).to_parquet(output_path= output_train_path,
                                shuffle=nvt.io.Shuffle.PER_PARTITION,
                                dtypes=dict_dtypes,
                                out_files_per_proc=10,
                                cats = cat_features.columns,
                                conts = cont_features.columns,
                                labels = labels)

CPU times: user 1min 46s, sys: 50 s, total: 2min 36s
Wall time: 2min 34s


In [40]:
%%time

# For validation set
proc.transform(data_valid).to_parquet(output_path= output_valid_path,
                                shuffle=nvt.io.Shuffle.PER_PARTITION,
                                dtypes=dict_dtypes,
                                out_files_per_proc=10,
                                cats = cat_features.columns,
                                conts = cont_features.columns,
                                labels = labels)

CPU times: user 16.2 s, sys: 7.03 s, total: 23.2 s
Wall time: 22.8 s


Let's load the NVTabular processed parquet files and look at our first NVTabular pre-processed dataset.

In [41]:
df = dask_cudf.read_parquet(os.path.join(output_train_path, '*.parquet'))
df.head()

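| | time_hour | hist_cat_0 | hist_subcat_0 | hist_cat_1 | hist_subcat_1 | hist_cat_2 | hist_subcat_2 | hist_cat_3 | hist_subcat_3 | hist_cat_4 | ... | impression_id | uid | time_minute | time_second | time_wd | time_day | time_day_week | time | hist_count | label |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 10 | 8 | 108 | 3 | 56 | 9 | 108 | 10 | 144 | 17 | ... | 2146 | 352684 | 6 | 9 | 3 | 5 | 1 | 67921 | 0.01623 | 0.0 |
| 1 | 13 | 10 | 143 | 12 | 159 | 14 | 87 | 12 | 157 | 4 | ... | 3165 | 255108 | 2 | 5 | 1 | 3 | 1 | 31343 | 0.038702 | 0.0 |
| 2 | 3 | 4 | 63 | 4 | 184 | 15 | 211 | 4 | 70 | 12 | ... | 2009 | 401055 | 52 | 16 | 2 | 4 | 1 | 45137 | 0.076155 | 0.0 |
| 3 | 3 | 12 | 158 | 12 | 173 | 16 | 222 | 12 | 171 | 8 | ... | 1635 | 232239 | 46 | 48 | 4 | 1 | 1 | 0 | 0.11985 | 0.0 |
| 4 | 10 | 12 | 172 | 13 | 87 | 14 | 87 | 13 | 30 | 6 | ... | 2636 | 122099 | 3 | 2 | 2 | 4 | 1 | 49783 | 0.014981 | 0.0 |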


After transforming the data and persisting it to disk, the following files will be created:
   1. parquet
       - The number of parquet files depends on the `out_files_per_proc` argument passed to `to_parquet()`
   2. _file_list.txt
       - The 1st line contains the number of parquet files
       - The subsequent lines are the paths to each parquet file.
   3. _metadata.json
       - This file is used by HugeCTR in parsing the processed parquet files.
       - 'file_stats' contains the name of the parquet files and their corresponding number of rows.
       - 'cats' is a list of categorical features/columns in the dataset and their index.
       - 'conts' is a list of continuous/dense columns in the dataset and their index.
       - 'labels' is a list of labels in the dataset and their index.
       - This file shouldn't be edited manually.
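
The `_file_list.txt` layout described above can be read back with a few lines of Python. This is a sketch under the stated layout (count on the first line, one path per following line); the helper name `read_file_list` and the demo paths are hypothetical.

```python
import os
import tempfile


def read_file_list(path):
    """Parse a file list: first line is the file count, the rest are paths."""
    with open(path) as f:
        lines = [line.strip() for line in f if line.strip()]
    count, paths = int(lines[0]), lines[1:]
    if count != len(paths):
        raise ValueError(f"expected {count} paths, found {len(paths)}")
    return paths


# Hypothetical file list written to a temporary directory for the demo.
demo_dir = tempfile.mkdtemp()
demo_list = os.path.join(demo_dir, "_file_list.txt")
with open(demo_list, "w") as f:
    f.write("2\n/data/train/0.parquet\n/data/train/1.parquet\n")

print(read_file_list(demo_list))
```

A check like this can catch a partially written output directory before the file list is handed to HugeCTR.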

Let's look at the contents of _metadata.json

In [42]:
with open(os.path.join(output_train_path, '_metadata.json'),'r') as f:
    metadata = json.load(f)

metadata

{'file_stats': [{'file_name': '0.ee88d3947f75411ebecea519a8d84e1e.parquet',
   'num_rows': 7714011},
  {'file_name': '1.7b819cdbe9b74b8d8e8d70a82c5592bd.parquet',
   'num_rows': 7716568},
  {'file_name': '2.96ff5b25178442e1a0afe5a043bddb4e.parquet',
   'num_rows': 7719289},
  {'file_name': '3.239a792c3e4a452cbd670fe0dcf7f7d9.parquet',
   'num_rows': 7717958},
  {'file_name': '4.48ae392a00604b37b3c1a30396c6e7ed.parquet',
   'num_rows': 7713252},
  {'file_name': '5.540012b6dd0941e2a5717557fc389fbc.parquet',
   'num_rows': 7714906},
  {'file_name': '6.85b26a9252c44eae8276210f5dfffe74.parquet',
   'num_rows': 7719046},
  {'file_name': '7.b32bd793c054446fb135709f3e18ef72.parquet',
   'num_rows': 7723206},
  {'file_name': '8.6f58be3da99a49038adfbc79d4bfddb9.parquet',
   'num_rows': 7719582},
  {'file_name': '9.8749db3725574f7d8d710885d411b8e6.parquet',
   'num_rows': 7717609}],
 'cats': [{'col_name': 'time_hour', 'index': 0},
  {'col_name': 'hist_cat_0', 'index': 1},
  {'col_name': 'hist_sub

Next, we need to get the embedding size for the categorical variables. This will be an important input for defining the embedding table size to be used by HugeCTR.

In [43]:
from nvtabular.ops import get_embedding_sizes
embeddings_simple_time = get_embedding_sizes(proc)
embeddings_simple_time

{'hist_cat_0': (18, 16),
 'hist_cat_1': (18, 16),
 'hist_cat_2': (19, 16),
 'hist_cat_3': (18, 16),
 'hist_cat_4': (18, 16),
 'hist_cat_5': (18, 16),
 'hist_cat_6': (18, 16),
 'hist_cat_7': (18, 16),
 'hist_cat_8': (17, 16),
 'hist_cat_9': (17, 16),
 'hist_subcat_0': (235, 34),
 'hist_subcat_1': (239, 34),
 'hist_subcat_2': (236, 34),
 'hist_subcat_3': (235, 34),
 'hist_subcat_4': (229, 34),
 'hist_subcat_5': (224, 33),
 'hist_subcat_6': (225, 33),
 'hist_subcat_7': (219, 33),
 'hist_subcat_8': (213, 32),
 'hist_subcat_9': (199, 31),
 'impr_cat': (26708, 482),
 'impr_subcat': (26708, 482),
 'impression_id': (2232749, 512),
 'time': (90397, 512),
 'time_day': (7, 16),
 'time_day_week': (3, 16),
 'time_hour': (16, 16),
 'time_minute': (61, 16),
 'time_second': (61, 16),
 'time_wd': (6, 16),
 'uid': (711223, 512)}

In [44]:
# Reformatting the above output for ease of copy paste in HugeCTRs config.json

embedding_size_str_simple_time = [embeddings_simple_time[x][0] for x in cat_features.columns]
embedding_size_str_simple_time

[16,
 18,
 235,
 18,
 239,
 19,
 236,
 18,
 235,
 18,
 229,
 18,
 224,
 18,
 225,
 18,
 219,
 17,
 213,
 17,
 199,
 26708,
 26708,
 2232749,
 711223,
 61,
 61,
 6,
 7,
 3,
 90397]
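
The reordering matters because the dictionary above is printed alphabetically, while the list follows the order of `cat_features.columns`. A small sketch with a three-column subset (values copied from the outputs above; the `_toy` names are assumptions) shows the idea:

```python
# Subset of the embedding-size dictionary and of the column order shown above.
embeddings_toy = {
    "hist_cat_0": (18, 16),
    "hist_subcat_0": (235, 34),
    "time_hour": (16, 16),
}
cat_columns_toy = ["time_hour", "hist_cat_0", "hist_subcat_0"]

# Cardinality is the first element of each (cardinality, embedding_dim) tuple.
slot_sizes_toy = [embeddings_toy[name][0] for name in cat_columns_toy]
print(slot_sizes_toy)

# Every categorical column needs an entry, otherwise the lookup above fails.
missing = [name for name in cat_columns_toy if name not in embeddings_toy]
print(missing)
```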

We can also check the names of the categorical and continuous features that we've defined. These should match the 'cats' and 'conts' entries in _metadata.json.

In [45]:
cat_features.columns

['time_hour',
 'hist_cat_0',
 'hist_subcat_0',
 'hist_cat_1',
 'hist_subcat_1',
 'hist_cat_2',
 'hist_subcat_2',
 'hist_cat_3',
 'hist_subcat_3',
 'hist_cat_4',
 'hist_subcat_4',
 'hist_cat_5',
 'hist_subcat_5',
 'hist_cat_6',
 'hist_subcat_6',
 'hist_cat_7',
 'hist_subcat_7',
 'hist_cat_8',
 'hist_subcat_8',
 'hist_cat_9',
 'hist_subcat_9',
 'impr_cat',
 'impr_subcat',
 'impression_id',
 'uid',
 'time_minute',
 'time_second',
 'time_wd',
 'time_day',
 'time_day_week',
 'time']

In [46]:
cont_features.columns

['hist_count']
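
This comparison can also be automated. The sketch below assumes that the 'conts' entries use the same `col_name`/`index` layout as the 'cats' entries printed earlier; the helper name `check_column_order` and the shortened metadata are hypothetical.

```python
def check_column_order(metadata, cats, conts):
    """Return True when metadata lists the same columns in the same order."""
    def by_index(entry):
        return entry["index"]

    meta_cats = [c["col_name"] for c in sorted(metadata["cats"], key=by_index)]
    meta_conts = [c["col_name"] for c in sorted(metadata["conts"], key=by_index)]
    return meta_cats == list(cats) and meta_conts == list(conts)


# Hypothetical, shortened metadata in the layout printed earlier.
metadata_toy = {
    "cats": [
        {"col_name": "time_hour", "index": 0},
        {"col_name": "hist_cat_0", "index": 1},
    ],
    "conts": [{"col_name": "hist_count", "index": 2}],
}
print(check_column_order(metadata_toy, ["time_hour", "hist_cat_0"], ["hist_count"]))
print(check_column_order(metadata_toy, ["hist_cat_0", "time_hour"], ["hist_count"]))
```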

Before moving on to training a DNN, let's try a few more complex feature engineering techniques using NVTabular. We will later train DNNs on both feature-engineered datasets and compare their performance.

## Step 6: Feature Engineering - count and target encoding

We will now perform count and target encoding on the processed dataset generated in step 4. Let's start by defining directories for the input dataset and the output processed dataset. 

In [None]:
# Define our worker/output directories
dask_workdir = os.path.join(BASE_DIR, "workdir")

# Reusing the dataset directory from step 4 (train.parquet and valid.parquet) as the input for the new workflow.
data_input_path = os.path.join(BASE_DIR, "dataset")

# Defining new directories for output
data_output_path = os.path.join(BASE_DIR, "processed_ce-te")
output_train_path = os.path.join(data_output_path, "train")
output_valid_path = os.path.join(data_output_path, "valid")

# Creating and cleaning our worker/output directories
try:
    # Ensure BASE_DIR exists
    if not os.path.isdir(BASE_DIR):
        os.mkdir(BASE_DIR)
        
    # Make sure we have a clean worker space for Dask
    if os.path.isdir(dask_workdir):
        shutil.rmtree(dask_workdir)
    os.mkdir(dask_workdir)

    # Make sure we have a clean output path for our new dataset
    if os.path.isdir(data_output_path):
        shutil.rmtree(data_output_path)
        
    os.mkdir(data_output_path)
    os.mkdir(output_train_path)
    os.mkdir(output_valid_path)

except OSError:
    print ("Creation of the directories failed")
else:
    print ("Successfully created the directories")

As you can see, we have created a new directory named `processed_ce-te`. The complete directory structure is now:

basedir <br>
&emsp; |--- workdir    
&emsp; |--- dataset <br>
&emsp; &emsp; |--- train <br>
&emsp; &emsp; |--- valid  <br>
&emsp; |--- processed_nvt <br>
&emsp;  &emsp; |--- train <br>
&emsp;  &emsp; |--- valid  <br>
&emsp; |--- processed_ce-te <br>
&emsp;  &emsp; |--- train <br>
&emsp;  &emsp; |--- valid  <br>
&emsp; |--- configs <br>
&emsp; |--- weights <br>    

Again, we define the categorical and continuous features based on the processed data generated in step 4.

In [8]:
cat_features = ['hist_cat_0',
 'hist_subcat_0',
 'hist_cat_1',
 'hist_subcat_1',
 'hist_cat_2',
 'hist_subcat_2',
 'hist_cat_3',
 'hist_subcat_3',
 'hist_cat_4',
 'hist_subcat_4',
 'hist_cat_5',
 'hist_subcat_5',
 'hist_cat_6',
 'hist_subcat_6',
 'hist_cat_7',
 'hist_subcat_7',
 'hist_cat_8',
 'hist_subcat_8',
 'hist_cat_9',
 'hist_subcat_9',
 'impr_cat',
 'impr_subcat',
 'impression_id',
 'uid',]

cont_features = ['hist_count']

labels = ['label']

**Count Encoding** calculates the frequency of one or more categorical features. For the purpose of this tutorial, we will count how often the user has clicked on news with the same category/sub-category as the news in a given impression.

To calculate the occurrence of the same news category/sub-category in the history, we will iterate over the rows and, for each row, compare every history column against the category/sub-category of the impression news.<br>
Let's start by defining helper functions for counting the category and sub-category matches in the history columns. These helper functions will be used by `apply_rows()` in the custom operator `create_count_features`.

We can also limit the number of history columns considered for count encoding. For now, let's use all 10 history columns that we have in the dataset.

In [9]:
max_hist = 10
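
Before looking at the GPU version, here is the per-row logic in plain Python. It is a sketch with made-up encoded values and only 4 history columns; the helper name `count_matches` is an assumption.

```python
def count_matches(history, impression):
    """Count how many history entries equal the impression's value."""
    return sum(1 for h in history if h == impression)


# Hypothetical encoded rows: (hist_cat_0..hist_cat_3, impr_cat).
rows = [
    ([12, 4, 12, 9], 12),
    ([3, 3, 3, 3], 7),
]
impr_cat_count_toy = [count_matches(hist, impr) for hist, impr in rows]
print(impr_cat_count_toy)
```

The functions in the next cells express the same comparison in the loop form that `apply_rows()` expects.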

In [10]:
def add_cat_count(
         hist_cat_0,
         hist_cat_1,
         hist_cat_2,
         hist_cat_3,
         hist_cat_4,
         hist_cat_5,
         hist_cat_6,
         hist_cat_7,
         hist_cat_8,
         hist_cat_9,
         impr_cat,
         impr_cat_count,
         k):
    
    # Following loop iterates over each row of columns hist_cat_0->9 and impr_cat
    for i, temp in enumerate(zip(hist_cat_0,
                                 hist_cat_1,
                                 hist_cat_2,
                                 hist_cat_3,
                                 hist_cat_4,
                                 hist_cat_5,
                                 hist_cat_6,
                                 hist_cat_7,
                                 hist_cat_8,
                                 hist_cat_9,
                                 impr_cat,
                                )):
        
        # Start every row from the initial value `k` (0) so counts never carry over between rows.
        count = k

        # Iterate over each column and check if history category matches with impression category.
        for j in temp[:-1]:
            if j == temp[-1]:
                count += 1
        
        # Update the count in the corresponding row of output column (impr_cat_count)
        impr_cat_count[i] = count

In [11]:
def add_subcat_count(
         hist_subcat_0,
         hist_subcat_1,
         hist_subcat_2,
         hist_subcat_3,
         hist_subcat_4,
         hist_subcat_5,
         hist_subcat_6,
         hist_subcat_7,
         hist_subcat_8,
         hist_subcat_9,
         impr_subcat,
         impr_subcat_count,
         k):
    
    # Following loop iterates over each row of columns hist_subcat_0->9 and impr_subcat
    for i, temp in enumerate(zip(
                                 hist_subcat_0,
                                 hist_subcat_1,
                                 hist_subcat_2,
                                 hist_subcat_3,
                                 hist_subcat_4,
                                 hist_subcat_5,
                                 hist_subcat_6,
                                 hist_subcat_7,
                                 hist_subcat_8,
                                 hist_subcat_9,
                                 impr_subcat,
                                )):

        # Start every row from the initial value `k` (0) so counts never carry over between rows.
        count = k

        # Iterate over each column and check if history sub-category matches with impression sub-category.
        for j in temp[:-1]:
            if j == temp[-1]:
                count += 1

        # Update the count (occurrence) in the corresponding row of output column (impr_subcat_count)
        impr_subcat_count[i] = count

To add the count encoding for categories and sub-categories to each row, we will write a custom operator by inheriting from NVTabular's `Operator` class and defining the `transform` and `output_column_names` methods.

In [12]:
class create_count_features(Operator):
    def transform(self, columns, gdf):
        if columns[-1] == 'impr_cat':
            gdf = gdf.apply_rows(add_cat_count,incols = ['hist_cat_{}'.format(i) for i in range(max_hist)]+['impr_cat'],outcols = {'impr_cat_count': np.int64},kwargs={'k': 0})
            return(gdf.drop(columns,axis=1))
        if columns[-1] == 'impr_subcat':
            gdf = gdf.apply_rows(add_subcat_count,incols = ['hist_subcat_{}'.format(i) for i in range(max_hist)]+['impr_subcat'],outcols = {'impr_subcat_count': np.int64},kwargs={'k': 0})
            return(gdf.drop(columns,axis=1))

    def output_column_names(self, columns):
        col = []
        if columns[-1] == 'impr_cat':
            col.append('impr_cat_count')
        if columns[-1] == 'impr_subcat':
            col.append('impr_subcat_count')
        return col

    def dependencies(self):
        return None

**Target encoding** averages the target value over a category or group. It captures the mean relationship between a categorical feature and the target as a numeric feature.

We have observed that the hist_cat columns are the most suitable for target encoding. We also found that grouping several history columns together, rather than using just 1 history category column, captures the relationship with the target variable better.

For this tutorial, we are going to use 5 history category columns at a time, in a moving-window fashion, along with the impression category column to calculate the target encoding.

In [13]:
te_columns = [['hist_cat_'+str(j) for j in range(i-5+1, i+1)] + ['impr_cat'] for i in range(4, max_hist)]
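
To see which column groups the comprehension above generates, the sketch below rebuilds the same list under a different name (`te_columns_preview`, an illustrative name) and prints each window.

```python
max_hist = 10  # same value as defined earlier in this step
window = 5     # number of history columns per group

te_columns_preview = [
    ["hist_cat_" + str(j) for j in range(i - window + 1, i + 1)] + ["impr_cat"]
    for i in range(window - 1, max_hist)
]

for group in te_columns_preview:
    print(group)
```

With `max_hist = 10` and a window of 5 this yields 6 groups, from `hist_cat_0`..`hist_cat_4` up to `hist_cat_5`..`hist_cat_9`, each followed by `impr_cat`.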

In [14]:
target_encode = (
    te_columns >>
    nvt.ops.TargetEncoding(
        ['label'],
        out_path = BASE_DIR,
        kfold=5,
        p_smooth=20,
        out_dtype="float32",
    )
)
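
The role of `p_smooth` can be illustrated with a small, CPU-only sketch. It assumes the usual smoothed-mean form, (sum of targets in the group + `p_smooth` * global mean) / (group count + `p_smooth`), which is our reading of the NVTabular documentation; it deliberately ignores the k-fold splitting controlled by `kfold`, and the function name and data are hypothetical.

```python
from collections import defaultdict


def smoothed_target_encoding(groups, targets, p_smooth=20):
    """Smoothed per-group mean of the target (no k-fold splitting)."""
    global_mean = sum(targets) / len(targets)
    sums, counts = defaultdict(float), defaultdict(int)
    for g, y in zip(groups, targets):
        sums[g] += y
        counts[g] += 1
    return {
        g: (sums[g] + p_smooth * global_mean) / (counts[g] + p_smooth)
        for g in counts
    }


# Hypothetical data: group key -> click label.
group_keys = ["a", "a", "a", "b"]
clicks = [1, 1, 0, 0]
te_toy = smoothed_target_encoding(group_keys, clicks, p_smooth=2)
print(te_toy)
```

Larger values of `p_smooth` pull rare groups more strongly toward the global mean, which reduces overfitting on groups with few samples.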

We'll also create the time-based features in the same way as we did in step 5.

In [15]:
datetime = nvt.ColumnGroup(['time']) >> (lambda col: cudf.to_datetime(col,format="%m/%d/%Y %I:%M:%S %p"))

hour = datetime >> (lambda col: col.dt.hour) >> nvt.ops.Rename(postfix = '_hour')
minute = datetime >> (lambda col: col.dt.minute) >> nvt.ops.Rename(postfix = '_minute')
seconds = datetime >> (lambda col: col.dt.second) >> nvt.ops.Rename(postfix = '_second')

weekday = datetime >> (lambda col: col.dt.weekday) >> nvt.ops.Rename(postfix = '_wd')
day = datetime >> (lambda col: cudf.to_datetime(col, unit='s').dt.day) >> nvt.ops.Rename(postfix = '_day')

week = day >> (lambda col: (col/7).floor().astype('int')) >> nvt.ops.Rename(postfix = '_week')

In [16]:
cat_count_encode = ['hist_cat_{}'.format(i) for i in range(max_hist)] + ['impr_cat'] >> create_count_features()

subcat_count_encode = ['hist_subcat_{}'.format(i) for i in range(max_hist)] + ['impr_subcat'] >> create_count_features()

In [17]:
cat_features = cat_features + datetime + hour + minute + seconds + weekday + day + week >> nvt.ops.Categorify(out_path = data_output_path)
cont_features = cont_features + cat_count_encode + subcat_count_encode >> nvt.ops.FillMissing() >> nvt.ops.NormalizeMinMax()
cont_features += target_encode >> nvt.ops.Rename(postfix = '_TE')

We can visualize the complete workflow pipeline.

In [18]:
output = cat_features + cont_features
output.graph

ExecutableNotFound: failed to execute ['dot', '-Kdot', '-Tsvg'], make sure the Graphviz executables are on your systems' PATH

<graphviz.dot.Digraph at 0x7f3de4f240a0>

In [19]:
proc = nvt.Workflow(cat_features + cont_features + labels[0])

We initialize an nvt.Dataset object from the parquet dataset that was created in step 4.

In [20]:
data_train = nvt.Dataset(os.path.join(data_input_path, "train.parquet"), engine="parquet",part_size="256MB")
data_valid = nvt.Dataset(os.path.join(data_input_path, "valid.parquet"), engine="parquet",part_size="256MB")

In [21]:
dict_dtypes={}

for col in cat_features.columns:
    dict_dtypes[col] = np.int64

for col in cont_features.columns:
    dict_dtypes[col] = np.float32

for col in labels:
    dict_dtypes[col] = np.float32

Let's fit the workflow on our training dataset to record the statistics.

In [22]:
%%time
proc.fit(data_train)

CPU times: user 4min 49s, sys: 1min 29s, total: 6min 19s
Wall time: 6min 6s


In [23]:
%%time

# For training set
proc.transform(data_train).to_parquet(output_path=output_train_path,
                                shuffle=nvt.io.Shuffle.PER_PARTITION,
                                dtypes=dict_dtypes,
                                out_files_per_proc=10,
                                cats = cat_features.columns,
                                conts = cont_features.columns,
                                labels = labels)

rmm.reinitialize(managed_memory=False)

CPU times: user 39min 18s, sys: 34min 31s, total: 1h 13min 49s
Wall time: 1h 12min 51s


In [24]:
%%time

# For validation set
proc.transform(data_valid).to_parquet(output_path=output_valid_path,
                                shuffle=nvt.io.Shuffle.PER_PARTITION,
                                dtypes=dict_dtypes,
                                out_files_per_proc=10,
                                cats = cat_features.columns,
                                conts = cont_features.columns,
                                labels = labels)

rmm.reinitialize(managed_memory=False)

CPU times: user 6min 35s, sys: 5min 45s, total: 12min 20s
Wall time: 12min 10s


Let's take a quick look at the contents of _metadata.json

In [25]:
with open(os.path.join(output_train_path, '_metadata.json'),'r') as f:
    metadata = json.load(f)

metadata

{'file_stats': [{'file_name': '0.515824acdb824a4dad10e1443a742f4f.parquet',
   'num_rows': 7717106},
  {'file_name': '1.90aa8f40137340a6815f0af4a41724b0.parquet',
   'num_rows': 7719406},
  {'file_name': '2.b81c1cd3a59a46f196b758c89e940763.parquet',
   'num_rows': 7717841},
  {'file_name': '3.e8ee8aaacd2548398c2346aed4b45d28.parquet',
   'num_rows': 7720591},
  {'file_name': '4.e37c20946d274693895b3fd6592051ab.parquet',
   'num_rows': 7715286},
  {'file_name': '5.0ed0ef8ce7754beaa876d7dc98316a9b.parquet',
   'num_rows': 7716759},
  {'file_name': '6.b8d293674b2e4d529e3967a4f212acbb.parquet',
   'num_rows': 7715976},
  {'file_name': '7.af52633b932d49d18d50dbeeb0fb749e.parquet',
   'num_rows': 7716664},
  {'file_name': '8.bf7424d0fe5148c193e49a2c32f065a2.parquet',
   'num_rows': 7724349},
  {'file_name': '9.68efef93eb024e6f9678b952fcef33d2.parquet',
   'num_rows': 7711449}],
 'cats': [{'col_name': 'time', 'index': 0},
  {'col_name': 'hist_cat_0', 'index': 1},
  {'col_name': 'hist_subcat_0

In [26]:
from nvtabular.ops import get_embedding_sizes
embeddings_count_encode =  get_embedding_sizes(proc)
embeddings_count_encode

{'hist_cat_0': (18, 16),
 'hist_cat_1': (18, 16),
 'hist_cat_2': (19, 16),
 'hist_cat_3': (18, 16),
 'hist_cat_4': (18, 16),
 'hist_cat_5': (18, 16),
 'hist_cat_6': (18, 16),
 'hist_cat_7': (18, 16),
 'hist_cat_8': (17, 16),
 'hist_cat_9': (17, 16),
 'hist_subcat_0': (235, 34),
 'hist_subcat_1': (239, 34),
 'hist_subcat_2': (236, 34),
 'hist_subcat_3': (235, 34),
 'hist_subcat_4': (229, 34),
 'hist_subcat_5': (224, 33),
 'hist_subcat_6': (225, 33),
 'hist_subcat_7': (219, 33),
 'hist_subcat_8': (213, 32),
 'hist_subcat_9': (199, 31),
 'impr_cat': (26708, 482),
 'impr_subcat': (26708, 482),
 'impression_id': (2232749, 512),
 'time': (90397, 512),
 'time_day': (7, 16),
 'time_day_week': (3, 16),
 'time_hour': (16, 16),
 'time_minute': (61, 16),
 'time_second': (61, 16),
 'time_wd': (6, 16),
 'uid': (711223, 512)}

In [27]:
# Reformatting the above output for ease of copy paste in HugeCTRs config.json

embedding_size_str_count_encode = [embeddings_count_encode[x][0] for x in cat_features.columns]
embedding_size_str_count_encode

[90397,
 18,
 235,
 18,
 239,
 19,
 236,
 18,
 235,
 18,
 229,
 18,
 224,
 18,
 225,
 18,
 219,
 17,
 213,
 17,
 199,
 26708,
 26708,
 2232749,
 711223,
 16,
 61,
 61,
 6,
 7,
 3]

Now that we have 2 versions of our dataset ready, one with time-based features and the other with count + target encoded features, we can start training a few DNNs using HugeCTR.

Next, we'll shut down our Dask client from earlier to free up some memory so that we can share it with HugeCTR.

In [28]:
client.shutdown()
cluster.close()

## Step 7: Train DNN with HugeCTR

In this section, we will train a Deep Learning Recommendation Model (DLRM) using HugeCTR's high-level Python API. We will also use the inference Python API for evaluation on the validation set.

We will train 2 models, one for each of the 2 datasets that we've processed.

### Train configuration for simple time based features dataset

HugeCTR now offers a high-level, Keras-like Python API suite for defining the model, layers and optimizer, and for executing training.<br>
As a first step, we will write a training Python file for our feature-engineered dataset and DLRM model.

In [29]:
# Define paths to save the config and the weights

train_file_path = os.path.join(config_output_path,'train_dlrm_fp32_simple-time_1gpu.py')
weights_output_path = os.path.join(weights_path,'dlrm_fp32_simple-time_1gpu/')

# Directory inside weights folder for saving weights of this training
if os.path.isdir(weights_output_path):
    shutil.rmtree(weights_output_path)
os.mkdir(weights_output_path)

To use HugeCTR's high-level Python API, we will follow [this documentation](https://github.com/NVIDIA/HugeCTR/blob/master/docs/python_interface.md), which is available in the GitHub repository, and apply it to our dataset and model.

The parameters that we should modify are:  

- solver:
        - max_iter: Num. of samples / batch size
        - gpu: List of GPU IDs to use for training
        - batchsize: Num. of samples to process in the batch training mode
        - eval_interval: Num. of iterations after which evaluation should trigger on the validation set


- optimizer:
        - type: Adam 
        - learning_rate: 1e-4 (smaller value to begin with)


- layers
        - format: Parquet (since our dataset is in parquet)
        - source and eval_source: Path to _file_list.txt for the train and eval dataset produced by NVTabular
        - slot_num: For LocalizedSlot, set it to the number of categorical features
        - max_feature_num_per_sample: For LocalizedSlot, this can be the same as slot_num
        - slot_size_array: Cardinality of the categorical features (in the same order as column names in 'cats' dictionary of _metadata.json)
        - embedding_vec_size: Dimension of the embedding vectors for the categorical features
        - label_dim: Labels dimension
        - dense_dim: Number of dense/continuous features
        - sparse: Dimensions of categorical features
        - DLRM layer fc3: Output dimension of fc3 should be the same as embedding_vec_size
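
As a worked example of the `max_iter` guideline (number of samples divided by batch size), the sketch below sums the `num_rows` values from the training `_metadata.json` printed in step 5 and derives the number of iterations needed for one pass over the training set. Rounding up with `math.ceil` is our choice for illustration so that the final partial batch is counted.

```python
import math

# `num_rows` values copied from the `file_stats` of the training
# `_metadata.json` printed in step 5.
train_rows_per_file = [
    7714011, 7716568, 7719289, 7717958, 7713252,
    7714906, 7719046, 7723206, 7719582, 7717609,
]
batchsize = 2048  # same batch size as in the training file below

total_rows = sum(train_rows_per_file)
iters_per_epoch = math.ceil(total_rows / batchsize)
print(total_rows, iters_per_epoch)
```

Comparing this figure with the number of training iterations you configure shows how much of the training set the model sees in a run.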
         
We've developed one such training Python file below with the appropriate paths to the dataset and a default batch size for a 32GB GPU.<br>

Let's make use of the data paths and other variables we've defined in the steps above, and re-define the ones which may have changed during the pre-processing steps.
The model graph can be saved to a JSON file by calling `model.graph_to_json`, which will be used for inference afterwards.

In [37]:
%%writefile $train_file_path

import os
import hugectr
from mpi4py import MPI

# Define "fast" root directory for this example
BASE_DIR = os.environ.get("BASE_DIR", "./basedir")

# Directory to load NVTabular's processed dataset
data_input_path = os.path.join(BASE_DIR, "processed_nvt")
input_train_path = os.path.join(data_input_path, "train")
input_valid_path = os.path.join(data_input_path, "valid")

# Directory to store HugeCTR's train weights
config_output_path = os.path.join(BASE_DIR, "configs")
weights_path = os.path.join(BASE_DIR, "weights")
weights_output_path = os.path.join(weights_path,'dlrm_fp32_simple-time_1gpu/')

# GPUs used for training
NUM_GPUS = [0]

# Model related parameter
embedding_vec_size = 4                                             
batchsize = 2048                                                   # Batch size used for training
batchsize_eval = 2048                                              # Batch size used for evaluation
max_eval_batchsize = 3768                                          # Iterations required to go through the complete validation set with the set batchsize_eval

# Training related parameters
num_iter = 30001                                                   # Iterations to train the model for
eval_trigger = 10000                                               # Start evaluation after these iterations
snapshot_trigger = 10000                                           # Save model checkpoints after these iterations

# Input for slot size array
embedding_size_str_simple_time = [16,18,235,18,239,19,236,18,235,18,229,18,224,18,225,18,219,17,213,17,199,26708,26708,2232749,
                                    711223,61,61,6,7,3,90397]

embeddings_simple_time ={'hist_cat_0': (18, 16),
 'hist_cat_1': (18, 16),
 'hist_cat_2': (19, 16),
 'hist_cat_3': (18, 16),
 'hist_cat_4': (18, 16),
 'hist_cat_5': (18, 16),
 'hist_cat_6': (18, 16),
 'hist_cat_7': (18, 16),
 'hist_cat_8': (17, 16),
 'hist_cat_9': (17, 16),
 'hist_subcat_0': (235, 34),
 'hist_subcat_1': (239, 34),
 'hist_subcat_2': (236, 34),
 'hist_subcat_3': (235, 34),
 'hist_subcat_4': (229, 34),
 'hist_subcat_5': (224, 33),
 'hist_subcat_6': (225, 33),
 'hist_subcat_7': (219, 33),
 'hist_subcat_8': (213, 32),
 'hist_subcat_9': (199, 31),
 'impr_cat': (26708, 482),
 'impr_subcat': (26708, 482),
 'impression_id': (2232749, 512),
 'time': (90397, 512),
 'time_day': (7, 16),
 'time_day_week': (3, 16),
 'time_hour': (16, 16),
 'time_minute': (61, 16),
 'time_second': (61, 16),
 'time_wd': (6, 16),
 'uid': (711223, 512)}

## Creating the model

solver = hugectr.CreateSolver(
                              vvgpu = [NUM_GPUS],                       # GPU Indices to be used for training
                              max_eval_batches = max_eval_batchsize,    # Max no. of eval batches on which eval will be done
                              batchsize_eval = batchsize_eval,          # Minibatch size for eval
                              batchsize = batchsize,                    # Minibatch size for training
                              
                              # learning rate parameters
                              lr = 0.001,
                              warmup_steps = 10000,
                              decay_start = 20000,
                              decay_steps = 200000,
                              decay_power = 1,
                              end_lr = 1e-06,
                              
                              # training settings
                              i64_input_key = True,                     # As we are using Parquet from NVTabular, I64 should be true
                              repeat_dataset = True,                    # Repeat the dataset for training loop, True for Non Epoch Based Training
                              use_mixed_precision = False,              # Flag to indicate use of Mixed precision training
                              use_cuda_graph = False,                   # CUDA graph for forward and back propagation
                              use_algorithm_search = False,             # algo search within the fc-layers
                              )

# create datareader object
reader = hugectr.DataReaderParams(
    
    data_reader_type = hugectr.DataReaderType_t.Parquet,                # Dataset format selection: Parquet
    source = [input_train_path+"/_file_list.txt"],                     # path to file list of processed NVT Train Dataset, can be used to input various file lists
    eval_source = input_valid_path+"/_file_list.txt",                  # path to file list of processed NVT Valid Dataset
    check_type = hugectr.Check_t.Non,                                   # data error detection turned off
    slot_size_array = embedding_size_str_simple_time,                  # embedding slot array size list which is obtained from NVT operations
    
    )

# create optimiser
optimizer = hugectr.CreateOptimizer(
        optimizer_type = hugectr.Optimizer_t.Adam, # Select optimizer type : Adam
        update_type = hugectr.Update_t.Local,      # Select update type : Local
        epsilon = 1e-07,                           # Adam parameter
        beta1 = 0.9,                               # Adam parameter
        beta2 = 0.999,                             # Adam parameter
        atomic_update = False,                     # Atomic update applies to SGD; we are using Adam, so set to False
)

# create a hugectr model object
model = hugectr.Model(solver, reader, optimizer)

# adding layers to the model

model.add(hugectr.Input(
                        # label parameters
                        label_name = "label",                                 # Name of the Label column in the dataset
                        label_dim = 1,                                        # label dimension, 1 for binary label based dataset. Can be customised according to users need
                        # continuous feature parameters
                        dense_dim = 1,                                        # total number of dense (continuous) features
                        dense_name = "dense",                                 # Name of the dense (continuous) features
                        # Sparse parameters for categorical inputs
                        data_reader_sparse_param_array = [hugectr.DataReaderSparseParam(
                            hugectr.DataReaderSparse_t.Localized,             # selecting localised or distributed reading of data
                            max_feature_num = len(embeddings_simple_time) ,   # total number of features in the dataset
                            max_nnz = 1,                                      # set 1 for one hot label dataset
                            slot_num = len(embeddings_simple_time),           # total number of slots used for this sparse input in the dataset
                        )],
                        sparse_names = ["data1"]                              #list of names of the sparse input tensors to be referenced by following layers
                        ))

# adding sparse layer
model.add(hugectr.SparseEmbedding(
                            embedding_type = hugectr.Embedding_t.LocalizedSlotSparseEmbeddingHash,
                            max_vocabulary_size_per_gpu = 10000000,           # maximum vocabulary size or cardinality across all the input features (can be calculated from embedding size list)
                            embedding_vec_size = embedding_vec_size,          # model parameter
                            combiner = 0,                                     # intra-slot reduction operation (0=sum, 1=average)
                            sparse_embedding_name = "sparse_embedding1",      #name of the sparse embedding tensor to be referenced by following layers
                            bottom_name = "data1",
                            optimizer = optimizer))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["dense"],
                            top_names = ["fc1"],
                            num_output=512))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc1"],
                            top_names = ["relu1"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["relu1"],
                            top_names = ["fc2"],
                            num_output=256))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc2"],
                            top_names = ["relu2"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["relu2"],
                            top_names = ["fc3"],
                            num_output=embedding_vec_size))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc3"],
                            top_names = ["relu3"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.Interaction,
                            bottom_names = ["relu3","sparse_embedding1"],
                            top_names = ["interaction1"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["interaction1"],
                            top_names = ["fc4"],
                            num_output=1024))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc4"],
                            top_names = ["relu4"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["relu4"],
                            top_names = ["fc5"],
                            num_output=1024))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc5"],
                            top_names = ["relu5"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["relu5"],
                            top_names = ["fc6"],
                            num_output=512))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc6"],
                            top_names = ["relu6"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["relu6"],
                            top_names = ["fc7"],
                            num_output=256))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc7"],
                            top_names = ["relu7"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["relu7"],
                            top_names = ["fc8"],
                            num_output=1))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.BinaryCrossEntropyLoss,
                            bottom_names = ["fc8", "label"],
                            top_names = ["loss"]))

model.compile()

model.summary()

model.graph_to_json(graph_config_file = os.path.join(config_output_path,"dlrm_fp32_simple-time_1gpu.json") )

model.fit(
          max_iter = num_iter ,                     # maximum number of iterations
          display = 1000,                           # display stats after no. of iterations
          eval_interval = eval_trigger,             # interval for performing evaluation
          snapshot = snapshot_trigger,              # interval after which model snapshots will be taken
          snapshot_prefix = weights_output_path     # path for saving weights
          )

Writing ./basedir/configs/train_dlrm_fp32_count-target-encode_1gpu.py
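
The solver above sets `lr`, `warmup_steps`, `decay_start`, `decay_steps`, `decay_power` and `end_lr`. To get a feel for how these could interact, here is a minimal sketch of a warmup-plus-polynomial-decay schedule using the same values. The function `sketch_lr` and its exact formula are assumptions made for illustration; HugeCTR's internal scheduler may compute the rate differently.

```python
def sketch_lr(step, lr=0.001, warmup_steps=10000, decay_start=20000,
              decay_steps=200000, decay_power=1, end_lr=1e-06):
    """Assumed schedule: linear warmup, constant plateau, polynomial decay to end_lr."""
    if step < warmup_steps:
        return lr * step / warmup_steps            # ramp up from 0 to lr
    if step < decay_start:
        return lr                                  # hold the base learning rate
    progress = min((step - decay_start) / decay_steps, 1.0)
    return end_lr + (lr - end_lr) * (1.0 - progress) ** decay_power

for step in (0, 5000, 10000, 20000, 30000, 220000):
    print(step, sketch_lr(step))
```

Under this assumed schedule, a run of 30001 iterations spends its first third in warmup and only just enters the decay phase.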


Now we are ready to train a DLRM model on the simple time-based feature dataset with HugeCTR.

In [38]:
!python3 $train_file_path

[23d07h11m16s][HUGECTR][INFO]: Global seed is 3862588489
Device 0: Tesla V100-SXM2-16GB
Device 1: Tesla V100-SXM2-16GB
[23d07h11m18s][HUGECTR][INFO]: num of DataReader workers: 2
[23d07h11m18s][HUGECTR][INFO]: num_internal_buffers 1
[23d07h11m18s][HUGECTR][INFO]: num_internal_buffers 1
[23d07h11m18s][HUGECTR][INFO]: Vocabulary size: 3090372
[23d07h11m18s][HUGECTR][INFO]: max_vocabulary_size_per_gpu_=10000000
[23d07h11m18s][HUGECTR][INFO]: All2All Warmup Start
[23d07h11m18s][HUGECTR][INFO]: All2All Warmup End
[23d07h11m18s][HUGECTR][INFO]: gpu0 start to init embedding
[23d07h11m18s][HUGECTR][INFO]: gpu1 start to init embedding
[23d07h11m18s][HUGECTR][INFO]: gpu0 init embedding done
[23d07h11m18s][HUGECTR][INFO]: gpu1 init embedding done
Label                                   Dense                         Sparse                        
label                                   dense                          data1                         
(None, 1)                               (None, 1)  
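
The `Vocabulary size` reported in the log agrees with the sum of the entries in the slot size array, which suggests a quick check that the embedding table fits within `max_vocabulary_size_per_gpu`. The sketch below only redoes that arithmetic with the values from the training script; reading the log line as this sum is an assumption based on the numbers matching.

```python
# Slot sizes copied from the training script above.
slot_size_array = [16, 18, 235, 18, 239, 19, 236, 18, 235, 18, 229, 18, 224, 18,
                   225, 18, 219, 17, 213, 17, 199, 26708, 26708, 2232749,
                   711223, 61, 61, 6, 7, 3, 90397]
max_vocabulary_size_per_gpu = 10000000

vocabulary_size = sum(slot_size_array)
print(len(slot_size_array), vocabulary_size)   # 31 3090372

# The embedding table must be able to hold every key.
fits = vocabulary_size <= max_vocabulary_size_per_gpu
print(fits)                                    # True
```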

### Train configuration for count and target encoded features dataset

Following the same approach as above, we will create a DLRM training Python file for this version of the dataset.

In [32]:
# Define paths to save the training file and the weights

train_file_path = os.path.join(config_output_path, 'train_dlrm_fp32_count-target-encode_1gpu.py')
weights_output_path = os.path.join(weights_path,'dlrm_fp32_count-target-encode_1gpu/')

# Creating Directory inside weights folder for saving weights of this training
if os.path.isdir(weights_output_path):
    shutil.rmtree(weights_output_path)
os.mkdir(weights_output_path)

Write a Python training file for the count and target encoded dataset.

In [39]:
%%writefile $train_file_path

import os
import hugectr
from mpi4py import MPI

# Define "fast" root directory for this example
BASE_DIR = os.environ.get("BASE_DIR", "./basedir")

# Directory for NVTabular's count and target encoded processed dataset
data_output_path = os.path.join(BASE_DIR, "processed_ce-te")
output_train_path = os.path.join(data_output_path, "train")
output_valid_path = os.path.join(data_output_path, "valid")

# Directory to store HugeCTR's train weights
config_output_path = os.path.join(BASE_DIR, "configs")
weights_path = os.path.join(BASE_DIR, "weights")
weights_output_path = os.path.join(weights_path,'dlrm_fp32_count-target-encode_1gpu/')

# GPUs used for training
NUM_GPUS = [0]

# Model related parameter
embedding_vec_size = 4                                          
batchsize = 2048                                                   # Batch size used for training
batchsize_eval = 2048                                              # Batch size used for evaluation
max_eval_batchsize = 3768                                          # Iterations required to go through the complete validation set with the set batchsize_eval

# Training related parameters
num_iter = 30001                                                   # Iterations to train the model for
eval_trigger = 10000                                               # Start evaluation after these iterations
snapshot_trigger = 10000                                           # Save model checkpoints after these iterations

# Input for slot size array
embedding_size_str_count_encode = [90397,18,235,18,239,19,236,18,235,18,229,18,224,18,225,18,
                                  219,17,213,17,199,26708,26708,2232749,711223,16,61,61,6,7,3]

embeddings_count_encode = {'hist_cat_0': (18, 16),
 'hist_cat_1': (18, 16),
 'hist_cat_2': (19, 16),
 'hist_cat_3': (18, 16),
 'hist_cat_4': (18, 16),
 'hist_cat_5': (18, 16),
 'hist_cat_6': (18, 16),
 'hist_cat_7': (18, 16),
 'hist_cat_8': (17, 16),
 'hist_cat_9': (17, 16),
 'hist_subcat_0': (235, 34),
 'hist_subcat_1': (239, 34),
 'hist_subcat_2': (236, 34),
 'hist_subcat_3': (235, 34),
 'hist_subcat_4': (229, 34),
 'hist_subcat_5': (224, 33),
 'hist_subcat_6': (225, 33),
 'hist_subcat_7': (219, 33),
 'hist_subcat_8': (213, 32),
 'hist_subcat_9': (199, 31),
 'impr_cat': (26708, 482),
 'impr_subcat': (26708, 482),
 'impression_id': (2232749, 512),
 'time': (90397, 512),
 'time_day': (7, 16),
 'time_day_week': (3, 16),
 'time_hour': (16, 16),
 'time_minute': (61, 16),
 'time_second': (61, 16),
 'time_wd': (6, 16),
 'uid': (711223, 512)}

solver = hugectr.CreateSolver(
                              vvgpu = [NUM_GPUS],                       # GPU Indices to be used for training
                              max_eval_batches = max_eval_batchsize,    # Max no. of eval batches on which eval will be done
                              batchsize_eval = batchsize_eval,          # Minibatch size for eval
                              batchsize = batchsize,                    # Minibatch size for training
                              
                              #learning rate parameters
                              lr = 0.001,
                              warmup_steps = 10000,
                              decay_start = 20000,
                              decay_steps = 200000,
                              decay_power = 1,
                              end_lr = 1e-06,
                              
                              # training settings
                              i64_input_key = True,                     # As we are using Parquet from NVTabular, I64 should be true
                              repeat_dataset = True,                    # Repeat the dataset for training loop, True for Non Epoch Based Training
                              use_mixed_precision = False,              # Flag to indicate use of Mixed precision training
                              use_cuda_graph = False,                   # CUDA graph for forward and back propagation
                              use_algorithm_search = False,             # algo search within the fc-layers
                              )

# create datareader object
reader = hugectr.DataReaderParams(
    
    data_reader_type = hugectr.DataReaderType_t.Parquet,                # Dataset format selection: Parquet
    source = [output_train_path+"/_file_list.txt"],                     # path to file list of processed NVT Train Dataset, can be used to input various file lists
    eval_source = output_valid_path+"/_file_list.txt",                  # path to file list of processed NVT Valid Dataset
    check_type = hugectr.Check_t.Non,                                   # data error detection turned off
    slot_size_array = embedding_size_str_count_encode,                   # embedding slot array size list which is obtained from NVT operations
    
    )

# create optimiser
optimizer = hugectr.CreateOptimizer(
        optimizer_type = hugectr.Optimizer_t.Adam, # Select optimizer type : Adam
        update_type = hugectr.Update_t.Local,      # Select update type : Local
        epsilon = 1e-07,                           # Adam parameter
        beta1 = 0.9,                               # Adam parameter
        beta2 = 0.999,                             # Adam parameter
        atomic_update = False,                     # Atomic update applies to SGD; we are using Adam, so set to False
)

# create a hugectr model object
model = hugectr.Model(solver, reader, optimizer)

# adding layers to the model

model.add(hugectr.Input(
                        # label parameters
                        label_name = "label",                                 # Name of the Label column in the dataset
                        label_dim = 1,                                        # label dimension, 1 for binary label based dataset. Can be customised according to users need
                        # continuous feature parameters
                        dense_dim = 1,                                        # total number of dense (continuous) features
                        dense_name = "dense",                                 # Name of the dense (continuous) features
                        # Sparse parameters for categorical inputs
                        data_reader_sparse_param_array = [hugectr.DataReaderSparseParam(
                            hugectr.DataReaderSparse_t.Localized,             # selecting localised or distributed reading of data
                            max_feature_num = len(embeddings_count_encode) ,   # total number of features in the dataset
                            max_nnz = 1,                                      # set 1 for one hot label dataset
                            slot_num = len(embeddings_count_encode),           # total number of slots used for this sparse input in the dataset
                        )],
                        sparse_names = ["data1"]                              #list of names of the sparse input tensors to be referenced by following layers
                        ))

# adding sparse layer
model.add(hugectr.SparseEmbedding(embedding_type = hugectr.Embedding_t.LocalizedSlotSparseEmbeddingHash,
                            max_vocabulary_size_per_gpu = 10000000, # maximum vocabulary size or cardinality across all the input features (can be calculated from embedding size list)
                            embedding_vec_size = embedding_vec_size, # model parameter
                            combiner = 0, # intra-slot reduction operation (0=sum, 1=average)
                            sparse_embedding_name = "sparse_embedding1", #name of the sparse embedding tensor to be referenced by following layers
                            bottom_name = "data1",
                            optimizer = optimizer))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["dense"],
                            top_names = ["fc1"],
                            num_output=512))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc1"],
                            top_names = ["relu1"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["relu1"],
                            top_names = ["fc2"],
                            num_output=256))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc2"],
                            top_names = ["relu2"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["relu2"],
                            top_names = ["fc3"],
                            num_output=embedding_vec_size))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc3"],
                            top_names = ["relu3"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.Interaction,
                            bottom_names = ["relu3","sparse_embedding1"],
                            top_names = ["interaction1"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["interaction1"],
                            top_names = ["fc4"],
                            num_output=1024))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc4"],
                            top_names = ["relu4"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["relu4"],
                            top_names = ["fc5"],
                            num_output=1024))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc5"],
                            top_names = ["relu5"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["relu5"],
                            top_names = ["fc6"],
                            num_output=512))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc6"],
                            top_names = ["relu6"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["relu6"],
                            top_names = ["fc7"],
                            num_output=256))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc7"],
                            top_names = ["relu7"]))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["relu7"],
                            top_names = ["fc8"],
                            num_output=1))

model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.BinaryCrossEntropyLoss,
                            bottom_names = ["fc8", "label"],
                            top_names = ["loss"]))

model.compile()

model.summary()

model.graph_to_json(graph_config_file = os.path.join(config_output_path,"dlrm_fp32_count-target-encode_1gpu.json"))

model.fit(
          max_iter = num_iter ,                     # maximum number of iterations
          display = 1000,                           # display stats after no. of iterations
          eval_interval = eval_trigger,             # interval for performing evaluation
          snapshot = snapshot_trigger,              # interval after which model snapshots will be taken
          snapshot_prefix = weights_output_path     # path for saving weights
          )    

Overwriting ./basedir/configs/train_dlrm_fp32_count-target-encode_1gpu.py


Now we are ready to train a DLRM model with HugeCTR.

In [40]:
!python3 $train_file_path

[23d07h13m04s][HUGECTR][INFO]: Global seed is 3766403167
Device 0: Tesla V100-SXM2-16GB
Device 1: Tesla V100-SXM2-16GB
[23d07h13m06s][HUGECTR][INFO]: num of DataReader workers: 2
[23d07h13m06s][HUGECTR][INFO]: num_internal_buffers 1
[23d07h13m06s][HUGECTR][INFO]: num_internal_buffers 1
[23d07h13m06s][HUGECTR][INFO]: Vocabulary size: 3090372
[23d07h13m06s][HUGECTR][INFO]: max_vocabulary_size_per_gpu_=10000000
[23d07h13m06s][HUGECTR][INFO]: All2All Warmup Start
[23d07h13m06s][HUGECTR][INFO]: All2All Warmup End
[23d07h13m06s][HUGECTR][INFO]: gpu0 start to init embedding
[23d07h13m06s][HUGECTR][INFO]: gpu1 start to init embedding
[23d07h13m06s][HUGECTR][INFO]: gpu0 init embedding done
[23d07h13m06s][HUGECTR][INFO]: gpu1 init embedding done
Label                                   Dense                         Sparse                        
label                                   dense                          data1                         
(None, 1)                               (None, 1)  

## Step 8: Inference on 1st validation set with HugeCTR

After training two DLRM models, let's evaluate them on the validation set using HugeCTR's Python inference API. The evaluation metric is AUC.<br>
We will use the saved model graph in JSON format for inference, convert the validation data into CSR format, and finally use the inference APIs to get the predictions.

Let's start with the first trained model, i.e. the DLRM trained on simple time-based features. In the next step, we will repeat the same process for the second trained model.

### Prepare the inference session


In [41]:
import sys
from hugectr.inference import InferenceParams, CreateInferenceSession
from mpi4py import MPI

config_inference_file_path = os.path.join(config_output_path,'dlrm_fp32_simple-time_1gpu_inference.json')
weights_output_path = os.path.join(weights_path,'dlrm_fp32_simple-time_1gpu/')

In [None]:
# create inference session
inference_params = InferenceParams(model_name = "dlrm",
                              max_batchsize = 2048,
                              hit_rate_threshold = 0.6,
                              dense_model_file = weights_output_path+"/_dense_30000.model",
                              sparse_model_files = [weights_output_path+"/0_sparse_30000.model"],
                              device_id = 0,
                              use_gpu_embedding_cache = True,
                              cache_size_percentage = 0.2,
                              i64_input_key = True)
inference_session = CreateInferenceSession(config_inference_file_path, inference_params)

### Prepare validation set for inference

In [None]:
import pandas as pd

output_valid_path = os.path.join(BASE_DIR, "processed_nvt/valid")

nvtdata_test = pd.read_parquet(output_valid_path)
nvtdata_test.head()

In [None]:
con_feats = ['hist_count']

cat_feats = ['time_hour',
 'hist_cat_0',
 'hist_subcat_0',
 'hist_cat_1',
 'hist_subcat_1',
 'hist_cat_2',
 'hist_subcat_2',
 'hist_cat_3',
 'hist_subcat_3',
 'hist_cat_4',
 'hist_subcat_4',
 'hist_cat_5',
 'hist_subcat_5',
 'hist_cat_6',
 'hist_subcat_6',
 'hist_cat_7',
 'hist_subcat_7',
 'hist_cat_8',
 'hist_subcat_8',
 'hist_cat_9',
 'hist_subcat_9',
 'impr_cat',
 'impr_subcat',
 'impression_id',
 'uid',
 'time_minute',
 'time_second',
 'time_wd',
 'time_day',
 'time_day_week',
 'time']

For inference, HugeCTR expects the data in CSR format, with each categorical variable occupying a different integer range.<br>
As an example, if there are 10 users and 10 items, HugeCTR expects the users to be encoded in the 1-10 range and the items in the 11-20 range, whereas NVTabular encodes both users and items in the 1-10 range.

For this reason, we need to shift the keys of the categorical variables produced by NVTabular to comply with HugeCTR.
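
The 10 users / 10 items example can be worked through with NumPy as a small sketch. The array names and the three toy samples are made up for illustration; the `shift` expression is the same one used in the next cell.

```python
import numpy as np

# Toy setup from the text: 10 users and 10 items, each encoded in 1-10.
slot_sizes = [10, 10]            # cardinality of [user, item]
keys = np.array([[1, 1],         # (user 1, item 1)
                 [10, 10],       # (user 10, item 10)
                 [3, 7]])        # (user 3, item 7)

# Offset of each column = total cardinality of all columns before it.
shift = np.insert(np.cumsum(slot_sizes), 0, 0)[:-1]
shifted_keys = keys + shift      # broadcasting adds one offset per column

print(shift.tolist())            # [0, 10]
print(shifted_keys.tolist())     # [[1, 11], [10, 20], [3, 17]]
```

Users keep their 1-10 keys, while every item key moves into the 11-20 range.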

In [None]:
import numpy as np

shift = np.insert(np.cumsum(embedding_size_str_simple_time), 0, 0)[:-1]
cat_data = nvtdata_test[cat_feats].values + shift
dense_data = nvtdata_test[con_feats].values

In [None]:
# Define a function to perform batched inference
def infer_batch(inference_session, dense_data_batch, cat_data_batch):
    # Flatten the batch row by row into plain Python lists
    dense_features = list(dense_data_batch.flatten())
    embedding_columns = list(cat_data_batch.flatten())
    # One key per slot, so the CSR row offsets are simply 0, 1, ..., number of keys
    row_ptrs = list(range(0, len(embedding_columns) + 1))
    output = inference_session.predict(dense_features, embedding_columns, row_ptrs, True)
    return output
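
To make the CSR layout used by `infer_batch` concrete, here is a small sketch on a toy batch. It assumes that each (sample, slot) pair forms one CSR row holding exactly one key, as `max_nnz = 1` in the training script suggests; the toy keys are invented.

```python
import numpy as np

# Toy batch: 2 samples, 3 categorical slots, keys already shifted per slot.
cat_data_batch = np.array([[1, 11, 21],
                           [4, 15, 29]])

embedding_columns = cat_data_batch.flatten().tolist()   # values of the CSR matrix
row_ptrs = list(range(len(embedding_columns) + 1))      # row i spans [row_ptrs[i], row_ptrs[i + 1])

# Recover each CSR row from the values and the row pointers.
rows = [embedding_columns[row_ptrs[i]:row_ptrs[i + 1]]
        for i in range(len(row_ptrs) - 1)]

print(embedding_columns)   # [1, 11, 21, 4, 15, 29]
print(row_ptrs)            # [0, 1, 2, 3, 4, 5, 6]
print(rows)                # [[1], [11], [21], [4], [15], [29]]
```

If a slot could hold several keys, consecutive row pointers would differ by more than one.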

Now we are ready to carry out inference.

In [None]:
batch_size = 2048
num_batches = (len(dense_data) // batch_size) + 1
batch_idx = np.array_split(np.arange(len(dense_data)), num_batches)
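
The batching logic above can be tried on a small hypothetical sample count to see what `np.array_split` produces. The value of `num_samples` here is invented; the point is that the batches are nearly equal in size and none exceeds `batch_size`.

```python
import numpy as np

num_samples = 5000          # hypothetical validation set size
batch_size = 2048

num_batches = (num_samples // batch_size) + 1
batch_idx = np.array_split(np.arange(num_samples), num_batches)
batch_sizes = [len(b) for b in batch_idx]

print(num_batches, batch_sizes)   # 3 [1667, 1667, 1666]
```

Because the number of batches is always strictly greater than `num_samples / batch_size`, the largest batch stays within the `max_batchsize` given to the inference session.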

In [None]:
labels = []

for batch_id in tqdm(batch_idx):
    dense_data_batch = dense_data[batch_id]
    cat_data_batch = cat_data[batch_id]
    results = infer_batch(inference_session, dense_data_batch, cat_data_batch)
    labels.extend(results)

In [None]:
# Extract ground truth to calculate AUC
ground_truth = nvtdata_test['label'].values

In [None]:
from sklearn.metrics import roc_auc_score

roc_auc_score(ground_truth, labels)
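
For intuition about the metric, AUC can be read as the fraction of (clicked, not clicked) pairs in which the clicked sample receives the higher score, with ties counted as half. The sketch below implements that definition directly with NumPy; `auc_pairwise` is a hypothetical helper for small inputs, not a replacement for `roc_auc_score`, since it compares every positive with every negative.

```python
import numpy as np

def auc_pairwise(y_true, y_score):
    """AUC as the share of positive/negative pairs ranked correctly (ties count 0.5)."""
    y_true = np.asarray(y_true)
    y_score = np.asarray(y_score, dtype=float)
    pos = y_score[y_true == 1]
    neg = y_score[y_true == 0]
    diff = pos[:, None] - neg[None, :]        # every positive against every negative
    correct = (diff > 0).sum() + 0.5 * (diff == 0).sum()
    return float(correct / (len(pos) * len(neg)))

print(auc_pairwise([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))   # 0.75
```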

## Step 9: Inference on 2nd validation set with HugeCTR


Following the same procedure as in the last step, let's compute the AUC on the validation set with count and target encoded features.

In [None]:
config_inference_file_path = os.path.join(config_output_path, 'dlrm_fp32_count-target-encode_1gpu_inference.json')
weights_output_path = os.path.join(weights_path,'dlrm_fp32_count-target-encode_1gpu/')

# create inference session
inference_params = InferenceParams(model_name = "dlrm",
                              max_batchsize = 2048,
                              hit_rate_threshold = 0.6,
                              dense_model_file = weights_output_path+"/_dense_30000.model",
                              sparse_model_files = [weights_output_path+"/0_sparse_30000.model"],
                              device_id = 0,
                              use_gpu_embedding_cache = True,
                              cache_size_percentage = 0.2,
                              i64_input_key = True)
inference_session = CreateInferenceSession(config_inference_file_path, inference_params)

In [None]:
import pandas as pd
output_valid_path = os.path.join(BASE_DIR, "processed_ce-te/valid")

nvtdata_test = pd.read_parquet(output_valid_path)
nvtdata_test.head()

In [None]:
con_feats = [
 'TE_hist_cat_0_hist_cat_1_hist_cat_2_hist_cat_3_hist_cat_4_impr_cat_label_TE',
 'TE_hist_cat_1_hist_cat_2_hist_cat_3_hist_cat_4_hist_cat_5_impr_cat_label_TE',
 'TE_hist_cat_2_hist_cat_3_hist_cat_4_hist_cat_5_hist_cat_6_impr_cat_label_TE',
 'TE_hist_cat_3_hist_cat_4_hist_cat_5_hist_cat_6_hist_cat_7_impr_cat_label_TE',
 'TE_hist_cat_4_hist_cat_5_hist_cat_6_hist_cat_7_hist_cat_8_impr_cat_label_TE',
 'TE_hist_cat_5_hist_cat_6_hist_cat_7_hist_cat_8_hist_cat_9_impr_cat_label_TE',
 'hist_count',
 'impr_cat_count',
 'impr_subcat_count']

cat_feats = ['time',
 'hist_cat_0',
 'hist_subcat_0',
 'hist_cat_1',
 'hist_subcat_1',
 'hist_cat_2',
 'hist_subcat_2',
 'hist_cat_3',
 'hist_subcat_3',
 'hist_cat_4',
 'hist_subcat_4',
 'hist_cat_5',
 'hist_subcat_5',
 'hist_cat_6',
 'hist_subcat_6',
 'hist_cat_7',
 'hist_subcat_7',
 'hist_cat_8',
 'hist_subcat_8',
 'hist_cat_9',
 'hist_subcat_9',
 'impr_cat',
 'impr_subcat',
 'impression_id',
 'uid',
 'time_hour',
 'time_minute',
 'time_second',
 'time_wd',
 'time_day',
 'time_day_week']


In [None]:
shift = np.insert(np.cumsum(embedding_size_str_count_encode), 0, 0)[:-1]
cat_data = nvtdata_test[cat_feats].values + shift
dense_data = nvtdata_test[con_feats].values
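
To see what the `shift` does, here is a small sketch on toy data. It assumes that `embedding_size_str_count_encode` holds the vocabulary size of each categorical column in the same order as `cat_feats`, and that all columns share a single key space, so each column's ids are offset by the total size of the columns before it. The `toy_*` names and values are hypothetical.

```python
import numpy as np

# Hypothetical vocabulary sizes for three categorical columns
# (not the real values used in this notebook)
toy_cardinalities = [4, 3, 5]

# Offset of each column = sum of the sizes of all columns before it
toy_shift = np.insert(np.cumsum(toy_cardinalities), 0, 0)[:-1]

# Two rows of per-column category ids, each id local to its own column
toy_cat = np.array([[1, 2, 0],
                    [3, 0, 4]])

# Broadcasting adds the per-column offset to every row
toy_keys = toy_cat + toy_shift

print(toy_shift)  # per-column offsets
print(toy_keys)   # ids that no longer collide across columns
```

After the shift, id `0` of the third column and id `0` of the first column map to different keys, so they cannot be confused in a shared embedding table.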

In [None]:
# Define a function to perform batched inference
def infer_batch(inference_session, dense_data_batch, cat_data_batch):
    # Flatten both feature matrices row by row into plain Python lists
    dense_features = list(dense_data_batch.flatten())
    embedding_columns = list(cat_data_batch.flatten())
    # Every categorical slot holds exactly one key, so the row pointers are simply 0..N
    row_ptrs = list(range(0, len(embedding_columns) + 1))
    output = inference_session.predict(dense_features, embedding_columns, row_ptrs, True)
    return output
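
The `row_ptrs` list built in `infer_batch` can be read as CSR-style row pointers: entry `i` and entry `i + 1` delimit the keys that belong to slot `i` in the flattened key list. This reading is an assumption based on the shape of the code above, not a statement about the HugeCTR API. Under that assumption, the sketch below shows why one key per slot yields `0, 1, ..., N`. The helper `build_row_ptrs` is hypothetical and not part of the notebook.

```python
from itertools import accumulate


def build_row_ptrs(keys_per_slot):
    """Return CSR-style row pointers for the given number of keys in each slot."""
    return [0] + list(accumulate(keys_per_slot))


# 2 samples x 3 categorical slots, one key per slot (the situation in this notebook)
one_hot_ptrs = build_row_ptrs([1] * 6)

# For comparison: three slots holding 2, 1 and 3 keys respectively
multi_hot_ptrs = build_row_ptrs([2, 1, 3])

print(one_hot_ptrs)
print(multi_hot_ptrs)
```

With one key per slot the result is identical to `list(range(0, len(embedding_columns) + 1))`, which is what `infer_batch` builds directly.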

Now we are ready to carry out inference on the validation set.

In [None]:
batch_size = 2048
num_batches = (len(dense_data) // batch_size) + 1
batch_idx = np.array_split(np.arange(len(dense_data)), num_batches)
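
The batching above relies on `np.array_split`, which divides the row indices into `num_batches` nearly equal chunks rather than into full batches plus a remainder. Because `num_batches` is one more than the integer quotient, no chunk can exceed `batch_size`, which keeps every request within the `max_batchsize` given to `InferenceParams`. A quick sketch with a hypothetical row count:

```python
import numpy as np

# Hypothetical number of rows, chosen only for illustration
toy_rows = 5000
toy_batch_size = 2048

# Same formula as in the notebook cell above
toy_num_batches = (toy_rows // toy_batch_size) + 1
toy_batch_idx = np.array_split(np.arange(toy_rows), toy_num_batches)

toy_sizes = [len(b) for b in toy_batch_idx]
print(toy_num_batches, toy_sizes)

# Every row is covered exactly once and no batch exceeds the limit
assert sum(toy_sizes) == toy_rows
assert max(toy_sizes) <= toy_batch_size
```

The chunks come out slightly smaller than `batch_size`, trading a little throughput for a guarantee that the batch limit is never exceeded.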

In [None]:
labels = []

for batch_id in tqdm(batch_idx):
    dense_data_batch = dense_data[batch_id]
    cat_data_batch = cat_data[batch_id]
    results = infer_batch(inference_session, dense_data_batch, cat_data_batch)
    labels.extend(results)

In [None]:
# Extract ground truth to calculate AUC
ground_truth = nvtdata_test['label'].values

In [None]:
from sklearn.metrics import roc_auc_score

roc_auc_score(ground_truth, labels)



## Conclusion

In this tutorial notebook, we have walked through the full workflow, from data cleaning, pre-processing and feature engineering to model training and inference, all using the Merlin framework. We hope this notebook helps you build Recommender Systems on your own datasets as well.

Feel free to experiment with the various hyper-parameters on the feature engineering and model training side and share your results!