In [1]:
# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ================================

# Building Intelligent Recommender Systems with Merlin

Recommender Systems (RecSys) are the engine of the modern internet and the catalyst for human decisions. Building a recommendation system is challenging because it requires multiple stages (data preprocessing, offline training, item retrieval, filtering, ranking, ordering, etc.) to work together seamlessly and efficiently. The biggest challenges for new practitioners are a lack of understanding of what RecSys look like in the real world, and the gap between examples of simple models and a production-ready, end-to-end recommender system.

The figure below represents a four-stage recommender system. This is a more complex process than training and deploying a single model.

<img src="../images/fourstages.png"  width="70%">
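To make the four stages concrete, here is a minimal, library-free sketch of how retrieval, filtering, ranking, and ordering might chain together. The catalog, the popularity-based retrieval, the stand-in scorer, and the "one item per category" ordering rule are all hypothetical choices made for illustration; they are not part of Merlin.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical toy catalog: item id -> (category, popularity). All values are made up.
CATALOG: Dict[int, Tuple[str, float]] = {
    1: ("shoes", 0.9),
    2: ("shoes", 0.4),
    3: ("hats", 0.7),
    4: ("hats", 0.2),
    5: ("bags", 0.8),
}


def retrieve(k: int) -> List[int]:
    """Stage 1 (retrieval): cheaply narrow the catalog to k candidates."""
    return sorted(CATALOG, key=lambda i: CATALOG[i][1], reverse=True)[:k]


def filter_seen(candidates: List[int], history: List[int]) -> List[int]:
    """Stage 2 (filtering): drop items the user already interacted with."""
    seen = set(history)
    return [i for i in candidates if i not in seen]


def rank(candidates: List[int], score_fn: Callable[[int], float]) -> List[Tuple[int, float]]:
    """Stage 3 (ranking): score each surviving candidate with a model."""
    return [(i, score_fn(i)) for i in candidates]


def order(scored: List[Tuple[int, float]], n: int) -> List[int]:
    """Stage 4 (ordering): sort by score, keeping at most one item per category."""
    picked, used_categories = [], set()
    for item, _ in sorted(scored, key=lambda pair: pair[1], reverse=True):
        category = CATALOG[item][0]
        if category not in used_categories:
            picked.append(item)
            used_categories.add(category)
    return picked[:n]


def recommend(history: List[int], n: int = 2) -> List[int]:
    candidates = filter_seen(retrieve(k=4), history)
    # Stand-in scorer: reuse popularity where a trained ranking model would go.
    scored = rank(candidates, score_fn=lambda i: CATALOG[i][1])
    return order(scored, n)


recommendations = recommend(history=[1])
```

In a real system, the retrieval stage would typically be a learned model such as the Two-Tower model trained later in this notebook, and the ranking stage a model such as DLRM.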

The last step of a machine learning (ML) or deep learning (DL) pipeline is to deploy the ETL workflow and the saved model to production. In production, the input data must be transformed exactly as it was during training (ETL): we need to apply the same mean/std to continuous features and use the same categorical mapping to convert categories to contiguous integers before the DL model makes a prediction. Therefore, we deploy the NVTabular workflow together with the TensorFlow model as an ensemble model on Triton Inference Server using the Merlin Systems library. The ensemble model guarantees that the same transformation is applied to the raw inputs.
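The idea of reusing training-time statistics at serving time can be sketched without any library. The `ToyPreprocessor` class, the `age` and `brand` columns, and the convention of reserving id 0 for unseen categories are assumptions made for this illustration; NVTabular's own implementation and conventions may differ.

```python
import statistics
from typing import Dict, List


class ToyPreprocessor:
    """Learns statistics on training rows and replays them on new rows."""

    def fit(self, rows: List[Dict]) -> "ToyPreprocessor":
        ages = [row["age"] for row in rows]
        self.mean = statistics.fmean(ages)
        # Fall back to 1.0 so a constant column does not cause division by zero.
        self.std = statistics.pstdev(ages) or 1.0
        # Assumption of this sketch: id 0 is reserved for unseen categories.
        self.mapping: Dict[str, int] = {}
        for row in rows:
            self.mapping.setdefault(row["brand"], len(self.mapping) + 1)
        return self

    def transform(self, row: Dict) -> Dict:
        # Uses only the stored training statistics, never the incoming data.
        return {
            "age": (row["age"] - self.mean) / self.std,
            "brand": self.mapping.get(row["brand"], 0),
        }


train_rows = [
    {"age": 20, "brand": "a"},
    {"age": 30, "brand": "b"},
    {"age": 40, "brand": "a"},
]
preprocessor = ToyPreprocessor().fit(train_rows)

# At serving time the request is transformed with the *training* mean/std and mapping.
known = preprocessor.transform({"age": 30, "brand": "b"})
unseen = preprocessor.transform({"age": 30, "brand": "z"})
```

If serving code recomputed the mean or rebuilt the mapping from incoming requests, the model would receive inputs on a different scale and with different ids than it was trained on, which is the mismatch the ensemble deployment is meant to prevent.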

### Learning objectives
- Understanding the four stages of recommender systems
- Training retrieval and ranking recommender system models with Merlin Models
- Deploying trained models to Triton Inference Server with Merlin Systems


### Steps
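- Feature engineering with NVTabular
- Training a ranking model (DLRM) and a retrieval model (Two-Tower) with Merlin Models
- Exporting the trained models, the user and item features, and the item embeddings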

## Feature Engineering with NVTabular

In [2]:
# disable WARNING, INFO and DEBUG logging everywhere
import logging
logging.disable(logging.WARNING)

In this example notebook, we use the [Ali-CCP: Alibaba Click and Conversion Prediction](https://tianchi.aliyun.com/dataset/dataDetail?dataId=408#1) dataset to build our recommender system models. Below, we will process input features with [NVTabular](https://github.com/NVIDIA-Merlin/NVTabular).

In [3]:
import os
os.environ["TF_GPU_ALLOCATOR"]="cuda_malloc_async"
import cudf
import glob
import gc

import nvtabular as nvt
from nvtabular.ops import *

from merlin.models.utils.example_utils import workflow_fit_transform

from merlin.schema.tags import Tags
from merlin.schema import Schema

import merlin.models.tf as mm
from merlin.io.dataset import Dataset
import tensorflow as tf

2022-03-29 17:33:40.983854: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-03-29 17:33:42.140928: I tensorflow/core/common_runtime/gpu/gpu_process_state.cc:214] Using CUDA malloc Async allocator for GPU: 0
2022-03-29 17:33:42.141064: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1525] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 16254 MB memory:  -> device: 0, name: Quadro GV100, pci bus id: 0000:15:00.0, compute capability: 7.0


First, we define our input and output paths.

In [4]:
# Resolve all paths from DATA_FOLDER so that the environment variable is honored
DATA_FOLDER = os.environ.get("DATA_FOLDER", "/workspace/data/")
train_path = os.path.join(DATA_FOLDER, 'train', '*.parquet')
test_path = os.path.join(DATA_FOLDER, 'test', '*.parquet')
output_path = os.path.join(DATA_FOLDER, 'processed/ranking')

<a id="etl"></a>
ETL Workflow:

In [5]:
%%time

user_id = ["user_id"] >> Categorify(dtype='int32') >> TagAsUserID()
item_id = ["item_id"] >> Categorify(dtype='int32') >> TagAsItemID()

item_features = ["item_category", "item_shop", "item_brand"] >> Categorify(dtype='int32') >> TagAsItemFeatures() 

user_features = ['user_shops', 'user_profile', 'user_group', 
       'user_gender', 'user_age', 'user_consumption_2', 'user_is_occupied',
       'user_geography', 'user_intentions', 'user_brands', 'user_categories'] \
    >> Categorify(dtype='int32') >> TagAsUserFeatures() 

targets = ["click"] >> AddMetadata(tags=[str(Tags.BINARY_CLASSIFICATION), "target"])

outputs = user_id+item_id+item_features+user_features+targets

workflow_fit_transform(outputs, train_path, test_path, output_path, 'workflow_ranking')



CPU times: user 17.1 s, sys: 19.1 s, total: 36.2 s
Wall time: 38.8 s
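The workflow above is built with the `>>` operator, starting from a plain Python list of column names. The following toy sketch shows how such chaining can be implemented in Python with `__rrshift__`, `__rshift__`, and `__add__`. The `ToyOp` and `ToyNode` classes are hypothetical and only mimic the syntax; this is not NVTabular's actual implementation.

```python
from typing import List, Tuple


class ToyNode:
    """Holds one (columns, ops) pair per branch of the toy graph."""

    def __init__(self, branches: List[Tuple[List[str], List[str]]]):
        self.branches = branches

    def __rshift__(self, op: "ToyOp") -> "ToyNode":
        # `node >> op`: append the op to every branch of the node.
        return ToyNode([(cols, ops + [op.name]) for cols, ops in self.branches])

    def __add__(self, other: "ToyNode") -> "ToyNode":
        # `node + node`: keep both branches side by side.
        return ToyNode(self.branches + other.branches)


class ToyOp:
    def __init__(self, name: str):
        self.name = name

    def __rrshift__(self, columns: List[str]) -> ToyNode:
        # Python falls back to this for `["col"] >> ToyOp(...)`,
        # because `list` does not implement `>>` itself.
        return ToyNode([(list(columns), [self.name])])


user_id = ["user_id"] >> ToyOp("Categorify") >> ToyOp("TagAsUserID")
item_id = ["item_id"] >> ToyOp("Categorify") >> ToyOp("TagAsItemID")
outputs = user_id + item_id
```

Here `outputs.branches` records, per column group, the ordered list of operators to apply, which is the kind of information a workflow needs before it can be fit on the training data.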


## Building Recommender Systems

NVTabular exported the schema of our processed dataset as `schema.pbtxt`, a protobuf text file. To learn more about the schema object and the schema file, you can explore the [02-Merlin-Models-and-NVTabular-applying-to-your-own-dataset.ipynb](https://github.com/NVIDIA-Merlin/models/blob/main/examples/02-Merlin-Models-and-NVTabular-applying-to-your-own-dataset.ipynb) notebook.

We use the `schema` object to define our model.

In [6]:
# define train and valid dataset objects
train = Dataset(os.path.join(output_path, 'train', '*.parquet'), part_size="500MB")
valid = Dataset(os.path.join(output_path, 'valid', '*.parquet'), part_size="500MB")

# define schema object
schema = train.schema



In [7]:
target_column = schema.select_by_tag(Tags.TARGET).column_names[0]
target_column

'click'

### Building a Ranking Model with DLRM

Deep Learning Recommendation Model ([DLRM](https://arxiv.org/abs/1906.00091)) is a popular neural network architecture originally proposed by Facebook in 2019. It was introduced as a personalization deep learning model that uses embeddings to process sparse features that represent categorical data and a multilayer perceptron (MLP) to process dense features, and then interacts these features explicitly using the statistical techniques proposed [here](https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=5694074). To learn more about the DLRM architecture, please visit the `Exploring-different-models` [notebook](https://github.com/NVIDIA-Merlin/models/blob/main/examples/Exploring-different-models.ipynb) in the Merlin Models GitHub repo.
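The explicit feature interaction in DLRM can be illustrated with a small, library-free sketch. In the DLRM paper, the output of the bottom MLP and the embedding vectors all share one dimension; pairwise dot products between them are concatenated with the dense vector and fed to the top MLP. The function names and the tiny vectors below are made up for illustration, and the MLPs themselves are omitted.

```python
from itertools import combinations
from typing import List


def dot(u: List[float], v: List[float]) -> float:
    return sum(a * b for a, b in zip(u, v))


def dlrm_interaction(dense_vec: List[float], embeddings: List[List[float]]) -> List[float]:
    """Concatenate the dense vector with all pairwise dot products.

    `dense_vec` stands in for the bottom-MLP output; `embeddings` holds one
    vector per categorical feature, all of the same length as `dense_vec`.
    """
    vectors = [dense_vec] + embeddings
    pairwise = [dot(u, v) for u, v in combinations(vectors, 2)]
    return dense_vec + pairwise


dense = [1.0, 0.0]                  # pretend output of the bottom MLP
embs = [[0.0, 2.0], [3.0, 1.0]]     # pretend embeddings of two categorical features
top_mlp_input = dlrm_interaction(dense, embs)
```

With three vectors there are three pairs, so the result has two dense values followed by three interaction terms. In the model above, `embedding_dim=64` matches the last layer of `bottom_block` so that these dot products are well defined.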

In [8]:
model = mm.DLRMModel(
    schema,
    embedding_dim=64,
    bottom_block=mm.MLPBlock([128, 64]),
    top_block=mm.MLPBlock([128, 64, 32]),
    prediction_tasks=mm.BinaryClassificationTask(target_column, metrics=[tf.keras.metrics.AUC()])
)

In [9]:
opt = tf.keras.optimizers.Adagrad(learning_rate=0.003)
model.compile(optimizer=opt, run_eagerly=False)
model.fit(train, validation_data=valid, batch_size=16*1024)

2022-03-29 17:34:21.832840: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.




2022-03-29 17:36:14.850606: W tensorflow/core/grappler/optimizers/loop_optimizer.cc:907] Skipping loop optimization for Merge node with control input: cond/else/_1/cond/cond/branch_executed/_175




<keras.callbacks.History at 0x7f5eb8026af0>

In [10]:
model.save('dlrm')

### Building a Retrieval Model with a Two-Tower Model

Now we move on to the retrieval stage. We are going to train a Two-Tower model. To learn more about the Two-Tower model, you can visit [04-Retrieval-Model.ipynb](https://github.com/NVIDIA-Merlin/models/blob/main/examples/04-Retrieval-Model.ipynb).
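As a rough intuition for how a Two-Tower model is trained with in-batch negatives, the sketch below scores every query embedding against every item embedding in the batch and applies a softmax cross-entropy in which row `i`'s positive is item `i`. The function name and the two-dimensional vectors are illustrative assumptions; the towers that would produce these embeddings are omitted, and Merlin Models' actual loss may include further details.

```python
import math
from typing import List


def dot(u: List[float], v: List[float]) -> float:
    return sum(a * b for a, b in zip(u, v))


def in_batch_softmax_loss(query_vecs: List[List[float]], item_vecs: List[List[float]]) -> float:
    """Mean cross-entropy where the other items in the batch act as negatives."""
    losses = []
    for i, query in enumerate(query_vecs):
        logits = [dot(query, item) for item in item_vecs]
        # Numerically stable log-sum-exp over all items in the batch.
        peak = max(logits)
        log_z = peak + math.log(sum(math.exp(x - peak) for x in logits))
        losses.append(log_z - logits[i])
    return sum(losses) / len(losses)


queries = [[1.0, 0.0], [0.0, 1.0]]
aligned_items = [[1.0, 0.0], [0.0, 1.0]]    # each query matches its own item
shuffled_items = [[0.0, 1.0], [1.0, 0.0]]   # each query matches the other item

aligned_loss = in_batch_softmax_loss(queries, aligned_items)
shuffled_loss = in_batch_softmax_loss(queries, shuffled_items)
```

The loss is lower when each query embedding points toward its own positive item, which is what training pushes both towers toward.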

In [11]:
output_path = os.path.join(DATA_FOLDER, 'processed/retrieval')

We want to keep only positive interactions, so we remove rows where `click == 0` from the dataset with the `Filter()` op.

In [12]:
user_id = ["user_id"] >> Categorify(dtype='int32') >> TagAsUserID()
item_id = ["item_id"] >> Categorify(dtype='int32') >> TagAsItemID()

item_features = ["item_category", "item_shop", "item_brand"] >> Categorify(dtype='int32') >> TagAsItemFeatures()

user_features = ['user_shops', 'user_profile', 'user_group', 
       'user_gender', 'user_age', 'user_consumption_2', 'user_is_occupied',
       'user_geography', 'user_intentions', 'user_brands', 'user_categories'] \
        >> Categorify(dtype='int32') >> TagAsUserFeatures() 

inputs = user_id + item_id + item_features + user_features + ['click'] 

outputs = inputs >> Filter(f=lambda df: df["click"] == 1)

workflow_fit_transform(outputs, train_path, test_path, output_path, 'workflow_retrieval')



In [13]:
train_tt = Dataset(os.path.join(output_path, 'train', '*.parquet'), part_size="500MB")
valid_tt = Dataset(os.path.join(output_path, 'valid', '*.parquet'), part_size="500MB")

# use the schema of the retrieval dataset created above
schema = train_tt.schema
schema = schema.select_by_tag([Tags.ITEM_ID, Tags.USER_ID, Tags.ITEM, Tags.USER])



In [14]:
model = mm.TwoTowerModel(
    schema,
    query_tower=mm.MLPBlock([128, 64], no_activation_last_layer=True),        
    loss="categorical_crossentropy",  
    samplers=[mm.InBatchSampler()],
    embedding_options = mm.EmbeddingOptions(infer_embedding_sizes=True),
    metrics=[mm.RecallAt(10), mm.NDCGAt(10)]
)

In [15]:
model.set_retrieval_candidates_for_evaluation(train)
opt = tf.keras.optimizers.Adagrad(learning_rate=0.003)
model.compile(optimizer=opt, run_eagerly=False)
model.fit(train_tt, validation_data=valid_tt, batch_size=1024*8, epochs=2)

Epoch 1/2

2022-03-29 17:38:52.728848: W tensorflow/core/grappler/optimizers/loop_optimizer.cc:907] Skipping loop optimization for Merge node with control input: cond/then/_0/cond/cond/branch_executed/_184


Epoch 2/2


<keras.callbacks.History at 0x7f5df4eb0490>
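The model above is evaluated with Recall@10 and NDCG@10. As a reminder of what these metrics measure, here is a minimal sketch using their common textbook definitions with binary relevance. The function names and example lists are made up, and the implementation inside Merlin Models may differ in details.

```python
import math
from typing import List


def recall_at_k(ranked: List[int], relevant: List[int], k: int) -> float:
    """Fraction of the relevant items that appear in the top-k ranked list."""
    hits = len(set(ranked[:k]) & set(relevant))
    return hits / len(relevant)


def ndcg_at_k(ranked: List[int], relevant: List[int], k: int) -> float:
    """Discounted gain of the hits in the top-k, normalized by the ideal ordering."""
    relevant_set = set(relevant)
    dcg = sum(
        1.0 / math.log2(position + 2)
        for position, item in enumerate(ranked[:k])
        if item in relevant_set
    )
    ideal = sum(1.0 / math.log2(position + 2) for position in range(min(k, len(relevant_set))))
    return dcg / ideal if ideal else 0.0


ranked_items = [7, 3, 9, 1]   # model output, best first
relevant_items = [3, 1]       # items the user actually interacted with

recall = recall_at_k(ranked_items, relevant_items, k=3)
ndcg = ndcg_at_k(ranked_items, relevant_items, k=3)
```

Recall only counts whether relevant items made it into the top-k, while NDCG also rewards placing them closer to the top.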

### Exporting Retrieval Models

In [16]:
query_tower = model.retrieval_block.query_block()
query_tower.save('query_tower')

In [17]:
from merlin.models.utils.dataset import unique_rows_by_features
user_features = unique_rows_by_features(train, Tags.USER, Tags.USER_ID).compute().reset_index(drop=True)

In [18]:
user_features.head()

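| | user_id | user_shops | user_profile | user_group | user_gender | user_age | user_consumption_2 | user_is_occupied | user_geography | user_intentions | user_brands | user_categories |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 1 | 5 | 2 | 2 | 2 | 1 | 0 | 0 | 0 | 0 |
| 1 | 1 | 109 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 69 | 131 | 9 |
| 2 | 2 | 301 | 1 | 1 | 1 | 1 | 1 | 1 | 2 | 57 | 4709 | 57 |
| 3 | 3 | 1876 | 23 | 7 | 2 | 3 | 1 | 1 | 1 | 5 | 63 | 3 |
| 4 | 4 | 534 | 1 | 2 | 1 | 2 | 1 | 1 | 0 | 40 | 22 | 108 |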


We will artificially add `datetime` and `created` timestamp columns to our user_features dataframe.

In [21]:
from datetime import datetime
user_features["datetime"] = datetime.now()
user_features["datetime"] = user_features["datetime"].astype("datetime64[ns]")
user_features["created"] = datetime.now()
user_features["created"] = user_features["created"].astype("datetime64[ns]")

In [22]:
user_features.to_parquet('./feature_repo/data/user_features.parquet')

In [23]:
item_features = unique_rows_by_features(train, Tags.ITEM, Tags.ITEM_ID).compute().reset_index(drop=True)

In [24]:
item_features.head()

| | item_id | item_category | item_shop | item_brand |
|---|---|---|---|---|
| 0 | 0 | 0 | 0 | 0 |
| 1 | 1 | 441 | 432 | 474 |
| 2 | 2 | 193 | 1159 | 125 |
| 3 | 3 | 3 | 1463 | 872 |
| 4 | 4 | 282 | 2479 | 555 |


In [26]:
item_features.shape

(3078306, 4)

In [27]:
item_features["datetime"] = datetime.now()
item_features["datetime"] = item_features["datetime"].astype("datetime64[ns]")
item_features["created"] = datetime.now()
item_features["created"] = item_features["created"].astype("datetime64[ns]")

In [28]:
item_features.dtypes

item_id                   int32
item_category             int32
item_shop                 int32
item_brand                int32
datetime         datetime64[ns]
created          datetime64[ns]
dtype: object

In [29]:
item_features.head()

| | item_id | item_category | item_shop | item_brand | datetime | created |
|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 0 | 0 | 2022-03-29 17:52:47.303269 | 2022-03-29 17:52:47.315186 |
| 1 | 1 | 441 | 432 | 474 | 2022-03-29 17:52:47.303269 | 2022-03-29 17:52:47.315186 |
| 2 | 2 | 193 | 1159 | 125 | 2022-03-29 17:52:47.303269 | 2022-03-29 17:52:47.315186 |
| 3 | 3 | 3 | 1463 | 872 | 2022-03-29 17:52:47.303269 | 2022-03-29 17:52:47.315186 |
| 4 | 4 | 282 | 2479 | 555 | 2022-03-29 17:52:47.303269 | 2022-03-29 17:52:47.315186 |


In [30]:
# save to disk
item_features.to_parquet('./feature_repo/data/item_features.parquet')

#### Extract and save Item embeddings

In [31]:
item_embs = model.item_embeddings(Dataset(item_features, schema=schema), batch_size=1024)
item_embs_df = item_embs.compute(scheduler="synchronous")

In [32]:
# select only embedding columns
item_embeddings = item_embs_df.iloc[:, 4:]

In [35]:
item_embeddings.head()

| | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | ... | 54 | 55 | 56 | 57 | 58 | 59 | 60 | 61 | 62 | 63 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | -0.162772 | 0.008327 | 0.185154 | 0.017094 | 0.315708 | 0.060424 | 0.028892 | -0.08304 | -0.146384 | 0.091426 | ... | -0.132466 | 0.180954 | -0.024743 | -0.167329 | -0.020423 | -0.03955 | 0.012492 | -0.165495 | 0.000473 | -0.07608 |
| 1 | 0.039736 | 0.155663 | 0.0054 | 0.04005 | -0.040316 | 0.004999 | 0.072903 | -0.096331 | -0.002126 | -0.093287 | ... | -0.141285 | 0.054079 | 0.106289 | -0.222468 | -0.031031 | -0.107662 | -0.028323 | -0.093981 | 0.044722 | 0.090785 |
| 2 | 0.028971 | -0.107555 | 0.149855 | 0.129006 | 0.145146 | 0.215223 | -0.066562 | -0.210604 | 0.067737 | 0.033975 | ... | 0.067761 | -0.003892 | 0.11982 | -0.11679 | 0.257497 | -0.127932 | -0.089856 | -0.189543 | -0.071478 | 0.055173 |
| 3 | -0.015247 | 0.037491 | 0.059468 | 0.124438 | 0.173952 | 0.249256 | -0.078472 | -0.235041 | -0.016098 | 0.230393 | ... | -0.004909 | 0.063526 | 0.017203 | 0.019717 | 0.080236 | -0.033364 | 0.013381 | -0.162241 | -0.00242 | 0.134983 |
| 4 | 0.15972 | 0.080862 | 0.056772 | 0.160642 | 0.277119 | 0.093107 | 0.107751 | -0.129591 | -0.117683 | 0.1126 | ... | -0.155227 | 0.078523 | -0.00125 | 0.039647 | 0.117704 | -0.045046 | 0.235068 | -0.216196 | 0.098009 | 0.095363 |


In [34]:
# save to disk
item_embeddings.to_parquet('item_embeddings.parquet')
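One plausible use of saved item embeddings at serving time is nearest-neighbor lookup: embed the user with the query tower, then return the items whose embeddings score highest against that vector. The sketch below does this by brute force with dot products. The function name, the item ids, and the two-dimensional vectors are made up for illustration; a production system would more likely use an approximate nearest-neighbor index than a full scan.

```python
import heapq
from typing import Dict, List


def dot(u: List[float], v: List[float]) -> float:
    return sum(a * b for a, b in zip(u, v))


def top_k_items(query_vec: List[float], item_embeddings: Dict[int, List[float]], k: int) -> List[int]:
    """Return the ids of the k items with the highest dot-product score."""
    scored = ((dot(query_vec, vec), item_id) for item_id, vec in item_embeddings.items())
    return [item_id for _, item_id in heapq.nlargest(k, scored)]


# Hypothetical 2-dimensional embeddings keyed by item id.
toy_item_embeddings = {
    10: [1.0, 0.0],
    11: [0.0, 1.0],
    12: [0.7, 0.7],
}
toy_query = [1.0, 0.2]   # pretend output of the query tower for one user

candidates = top_k_items(toy_query, toy_item_embeddings, k=2)
```

The returned candidates would then be passed on to the filtering and ranking stages.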

### Next Steps
We trained and exported our ranking and retrieval models and NVTabular workflows. In the next step, we will learn how to deploy our trained models to Triton Inference Server with the Merlin Systems library. NVIDIA Triton Inference Server (TIS) simplifies the deployment of AI models at scale in production. TIS provides a cloud and edge inferencing solution optimized for both CPUs and GPUs. It supports a number of different machine learning frameworks such as TensorFlow and PyTorch.

For the next step, move on to the `Deploying-Model-with-Merlin-Systems.ipynb` notebook to deploy our saved models and NVTabular workflow as an ensemble to TIS and obtain prediction results for a given request.