In [1]:
# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ================================

## Building Intelligent Recommender Systems with Merlin

Recommender systems (RecSys) are the engine of the modern internet and the catalyst for human decisions. Building a recommendation system is challenging because it requires multiple stages (data preprocessing, offline training, item retrieval, filtering, ranking, ordering, etc.) to work together seamlessly and efficiently. The biggest challenges for new practitioners are a lack of understanding of what RecSys look like in the real world, and the gap between examples of simple models and a production-ready, end-to-end recommender system.

The figure below represents a four-stage recommender system. This is a more complex process than training and deploying a single model.

<img src="../images/fourstages.png" width="70%">
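
To make the four stages concrete, here is a minimal pure-Python sketch of how retrieval, filtering, scoring, and ordering might be chained together. Every function name, the toy catalog, and the popularity-based scoring rule are hypothetical stand-ins for real models. None of them are Merlin APIs.

```python
# Toy four-stage recommender: retrieval -> filtering -> scoring -> ordering.
# All names and data here are hypothetical illustrations, not Merlin APIs.

CATALOG = {
    "a": {"category": "shoes", "popularity": 0.9},
    "b": {"category": "shoes", "popularity": 0.4},
    "c": {"category": "hats", "popularity": 0.7},
    "d": {"category": "bags", "popularity": 0.2},
}


def retrieve(user, catalog):
    """Stage 1: cheaply narrow the catalog to candidates the user may like."""
    return [i for i, feats in catalog.items() if feats["category"] in user["liked_categories"]]


def filter_candidates(user, candidates):
    """Stage 2: drop items that must not be shown (here: already seen)."""
    return [i for i in candidates if i not in user["seen"]]


def score(user, candidates, catalog):
    """Stage 3: assign a relevance score to each remaining candidate."""
    return {i: catalog[i]["popularity"] for i in candidates}


def order(scores, top_k):
    """Stage 4: sort by score and keep the final top_k items."""
    return sorted(scores, key=scores.get, reverse=True)[:top_k]


def recommend(user, catalog=CATALOG, top_k=2):
    candidates = retrieve(user, catalog)
    candidates = filter_candidates(user, candidates)
    scores = score(user, candidates, catalog)
    return order(scores, top_k)


user = {"liked_categories": {"shoes", "hats"}, "seen": {"a"}}
print(recommend(user))
```

In this series, the retrieval stage corresponds to the Two-Tower model and the scoring stage to the DLRM ranking model trained below.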


### Learning objectives
- Understanding the four stages of recommender systems
- Training retrieval and ranking recommender system models with Merlin Models
- Deploying trained models to Triton Inference Server with Merlin Systems

In addition to the NVIDIA Merlin libraries and the `Triton` library, we use two external libraries in this series of examples:

- [Feast](https://docs.feast.dev/): an end-to-end open source feature store library for machine learning
- [Faiss](https://github.com/facebookresearch/faiss): a library for efficient similarity search and clustering of dense vectors

Please follow the instructions in the README.md file to install these libraries.

### Import required libraries and functions

In [2]:
# disable WARNING, INFO, and DEBUG logging everywhere
import logging
logging.disable(logging.WARNING)

In [3]:
import os
os.environ["TF_GPU_ALLOCATOR"]="cuda_malloc_async"
import cudf
import glob
import gc

import nvtabular as nvt
from nvtabular.ops import *

from merlin.models.utils.example_utils import workflow_fit_transform

from merlin.schema.tags import Tags
from merlin.schema import Schema

import merlin.models.tf as mm
from merlin.io.dataset import Dataset
import tensorflow as tf

2022-03-29 23:42:55.288888: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-03-29 23:42:56.473363: I tensorflow/core/common_runtime/gpu/gpu_process_state.cc:214] Using CUDA malloc Async allocator for GPU: 0
2022-03-29 23:42:56.473496: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1525] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 16254 MB memory:  -> device: 0, name: Quadro GV100, pci bus id: 0000:15:00.0, compute capability: 7.0


In this example notebook, we use the [Ali-CCP: Alibaba Click and Conversion Prediction](https://tianchi.aliyun.com/dataset/dataDetail?dataId=408#1) dataset to build our recommender system models. Below, we will process input features with [NVTabular](https://github.com/NVIDIA-Merlin/NVTabular).

First, we define our input and output paths.

In [4]:
DATA_FOLDER = os.environ.get("DATA_FOLDER", "/workspace/data/")
train_path = os.path.join(DATA_FOLDER, 'train/', '*.parquet')
valid_path = os.path.join(DATA_FOLDER, 'test/', '*.parquet')
output_path = os.path.join(DATA_FOLDER, 'processed/ranking')

### Feature Engineering with NVTabular

In [5]:
%%time

user_id = ["user_id"] >> Categorify(dtype='int32') >> TagAsUserID()
item_id = ["item_id"] >> Categorify(dtype='int32') >> TagAsItemID()

item_features = ["item_category", "item_shop", "item_brand"] >> Categorify(dtype='int32') >> TagAsItemFeatures() 

user_features = ['user_shops', 'user_profile', 'user_group', 
       'user_gender', 'user_age', 'user_consumption_2', 'user_is_occupied',
       'user_geography', 'user_intentions', 'user_brands', 'user_categories'] \
    >> Categorify(dtype='int32') >> TagAsUserFeatures() 

targets = ["click"] >> AddMetadata(tags=[str(Tags.BINARY_CLASSIFICATION), "target"])

outputs = user_id+item_id+item_features+user_features+targets

workflow_fit_transform(outputs, train_path, valid_path, output_path, 'workflow_ranking')



CPU times: user 17 s, sys: 20.4 s, total: 37.4 s
Wall time: 39.9 s
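
The `Categorify` op used above turns raw categorical values into contiguous integer ids that can index an embedding table. The sketch below illustrates that idea in plain Python. The frequency-based ordering and the choice to reserve id 0 for unseen values are assumptions of this sketch, and NVTabular's actual encoding details may differ by version.

```python
from collections import Counter


def fit_categorify(values):
    """Learn a value -> integer id mapping; id 0 is reserved for unseen values."""
    counts = Counter(values)
    # Most frequent values get the smallest ids; ties are broken by value.
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: idx for idx, value in enumerate(ordered, start=1)}


def transform_categorify(values, mapping):
    """Encode values with a fitted mapping, sending unknown values to 0."""
    return [mapping.get(v, 0) for v in values]


# Hypothetical training column.
train_brands = ["nike", "adidas", "nike", "puma", "nike", "adidas"]
mapping = fit_categorify(train_brands)
print(mapping)
print(transform_categorify(["puma", "nike", "reebok"], mapping))
```

Fitting on the training split and reusing the same mapping on the validation split keeps ids consistent between the two, which is why the workflow is fit once and then applied to both paths.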


## Building a Ranking Model with DLRM

NVTabular exported the schema of our processed dataset as `schema.pbtxt`, a protobuf text file. To learn more about the schema object and schema file, you can explore the [02-Merlin-Models-and-NVTabular-applying-to-your-own-dataset.ipynb](https://github.com/NVIDIA-Merlin/models/blob/main/examples/02-Merlin-Models-and-NVTabular-applying-to-your-own-dataset.ipynb) notebook.

We use the `schema` object to define our model.

In [6]:
# define train and valid dataset objects
train = Dataset(os.path.join(output_path, 'train', '*.parquet'), part_size="500MB")
valid = Dataset(os.path.join(output_path, 'valid', '*.parquet'), part_size="500MB")

# define schema object
schema = train.schema



In [7]:
target_column = schema.select_by_tag(Tags.TARGET).column_names[0]
target_column

'click'

The Deep Learning Recommendation Model [(DLRM)](https://arxiv.org/abs/1906.00091) architecture is a popular neural network model originally proposed by Facebook in 2019. It was introduced as a personalization deep learning model that uses embeddings to process sparse features that represent categorical data and a multilayer perceptron (MLP) to process dense features, then interacts these features explicitly using the statistical techniques proposed [here](https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=5694074). To learn more about the DLRM architecture, please visit the `Exploring-different-models` [notebook](https://github.com/NVIDIA-Merlin/models/blob/main/examples/Exploring-different-models.ipynb) in the Merlin Models GitHub repo.
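
The explicit feature interaction in DLRM can be sketched as taking the dot product of every pair of feature vectors and concatenating the results with the dense vector before the top MLP. The following pure-Python example is a simplified illustration of that step with made-up two-dimensional vectors. It is not the Merlin Models implementation.

```python
from itertools import combinations


def dot(u, v):
    return sum(a * b for a, b in zip(u, v))


def dlrm_interaction(dense_vector, embeddings):
    """Concatenate the dense vector with all pairwise dot products."""
    vectors = [dense_vector] + embeddings
    pairwise = [dot(u, v) for u, v in combinations(vectors, 2)]
    return dense_vector + pairwise


# Hypothetical bottom-MLP output and two categorical embeddings.
dense = [1.0, 0.0]
user_emb = [0.5, 0.5]
item_emb = [2.0, 1.0]
features = dlrm_interaction(dense, [user_emb, item_emb])
print(features)
```

All vectors must share one dimension for the dot products to be defined, which is why the model below uses a single `embedding_dim` and a bottom block whose last layer has the same width.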

In [8]:
model = mm.DLRMModel(
    schema,
    embedding_dim=64,
    bottom_block=mm.MLPBlock([128, 64]),
    top_block=mm.MLPBlock([128, 64, 32]),
    prediction_tasks=mm.BinaryClassificationTask(target_column, metrics=[tf.keras.metrics.AUC()])
)

In [9]:
opt = tf.keras.optimizers.Adagrad(learning_rate=0.003)
model.compile(optimizer=opt, run_eagerly=False)
model.fit(train, validation_data=valid, batch_size=16*1024)

2022-03-29 23:43:36.846224: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 383778816 exceeds 10% of free system memory.
2022-03-29 23:43:37.267331: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
2022-03-29 23:43:37.519839: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 383778816 exceeds 10% of free system memory.
2022-03-29 23:43:38.404223: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 383778816 exceeds 10% of free system memory.
2022-03-29 23:44:03.052781: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 383778816 exceeds 10% of free system memory.


   1/2442 [..............................] - ETA: 16:36:16 - auc: 0.5057 - loss: 0.6707 - regularization_loss: 0.0000e+00 - total_loss: 0.6707

2022-03-29 23:44:03.604666: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 383778816 exceeds 10% of free system memory.




2022-03-29 23:45:29.869725: W tensorflow/core/grappler/optimizers/loop_optimizer.cc:907] Skipping loop optimization for Merge node with control input: cond/then/_0/cond/cond/branch_executed/_161




<keras.callbacks.History at 0x7fc87c607460>

In [10]:
model.save('dlrm')

## Building a Retrieval Model with Two-Tower Model

Now we move to the retrieval stage. We are going to train a Two-Tower model. To learn more about the Two-Tower model, you can visit [04-Retrieval-Model.ipynb](https://github.com/NVIDIA-Merlin/models/blob/main/examples/04-Retrieval-Model.ipynb).

In [11]:
output_path = os.path.join(DATA_FOLDER, 'processed/retrieval')

We keep only positive interaction rows, so we remove rows where `click==0` from the dataset with the `Filter()` op.

In [12]:
user_id = ["user_id"] >> Categorify(dtype='int32') >> TagAsUserID()
item_id = ["item_id"] >> Categorify(dtype='int32') >> TagAsItemID()

item_features = ["item_category", "item_shop", "item_brand"] >> Categorify(dtype='int32') >> TagAsItemFeatures()

user_features = ['user_shops', 'user_profile', 'user_group', 
       'user_gender', 'user_age', 'user_consumption_2', 'user_is_occupied',
       'user_geography', 'user_intentions', 'user_brands', 'user_categories'] \
        >> Categorify(dtype='int32') >> TagAsUserFeatures() 

inputs = user_id + item_id + item_features + user_features + ['click'] 

outputs = inputs >> Filter(f=lambda df: df["click"] == 1)

workflow_fit_transform(outputs, train_path, valid_path, output_path, 'workflow_retrieval')



In [13]:
train_tt = Dataset(os.path.join(output_path, 'train', '*.parquet'), part_size="500MB")
valid_tt = Dataset(os.path.join(output_path, 'valid', '*.parquet'), part_size="500MB")

schema = train_tt.schema
schema = schema.select_by_tag([Tags.ITEM_ID, Tags.USER_ID, Tags.ITEM, Tags.USER])



In [14]:
model = mm.TwoTowerModel(
    schema,
    query_tower=mm.MLPBlock([128, 64], no_activation_last_layer=True),        
    loss="categorical_crossentropy",  
    samplers=[mm.InBatchSampler()],
    embedding_options = mm.EmbeddingOptions(infer_embedding_sizes=True),
    metrics=[mm.RecallAt(10), mm.NDCGAt(10)]
)
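
The model above combines `samplers=[mm.InBatchSampler()]` with `loss="categorical_crossentropy"`. One common reading of that combination is a softmax over the batch: each user's clicked item is the positive class, and the other items in the same batch serve as negatives. The sketch below shows that loss in plain Python with made-up vectors. It is an illustration of the idea only and omits any refinements the library may apply.

```python
import math


def dot(u, v):
    return sum(a * b for a, b in zip(u, v))


def in_batch_softmax_loss(user_vectors, item_vectors):
    """Mean cross-entropy where row i's positive is item i and the other
    items in the batch act as negatives."""
    losses = []
    for i, user in enumerate(user_vectors):
        logits = [dot(user, item) for item in item_vectors]
        max_logit = max(logits)  # subtract the max for numerical stability
        log_sum_exp = max_logit + math.log(sum(math.exp(l - max_logit) for l in logits))
        losses.append(log_sum_exp - logits[i])
    return sum(losses) / len(losses)


# Hypothetical tower outputs for a batch of two (user, item) pairs.
users = [[1.0, 0.0], [0.0, 1.0]]
aligned_items = [[3.0, 0.0], [0.0, 3.0]]
swapped_items = [[0.0, 3.0], [3.0, 0.0]]

print(in_batch_softmax_loss(users, aligned_items))
print(in_batch_softmax_loss(users, swapped_items))
```

The loss is small when each user vector is closest to its own item and large when it is closer to another item in the batch. This also explains why only positive rows are kept for the retrieval dataset: the negatives come from the batch itself.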

In [15]:
model.set_retrieval_candidates_for_evaluation(train_tt)
opt = tf.keras.optimizers.Adagrad(learning_rate=0.003)
model.compile(optimizer=opt, run_eagerly=False)
model.fit(train_tt, validation_data=valid_tt, batch_size=1024*8, epochs=2)

Epoch 1/2

2022-03-29 23:47:40.388242: W tensorflow/core/grappler/optimizers/loop_optimizer.cc:907] Skipping loop optimization for Merge node with control input: cond/then/_0/cond/cond/branch_executed/_184


Epoch 2/2


<keras.callbacks.History at 0x7fc7db22bd30>
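
The retrieval model is evaluated with `RecallAt(10)` and `NDCGAt(10)`. As a rough guide to what these numbers mean, the sketch below computes recall@k and binary-relevance NDCG@k for a single user in plain Python. The item ids are made up, and the functions assume at least one relevant item. Merlin Models computes these metrics over batches on the GPU, so this is only a conceptual reference.

```python
import math


def recall_at_k(ranked_items, relevant_items, k):
    """Fraction of relevant items that appear in the top-k of the ranking."""
    hits = sum(1 for item in ranked_items[:k] if item in relevant_items)
    return hits / len(relevant_items)


def ndcg_at_k(ranked_items, relevant_items, k):
    """Binary-relevance NDCG: discounted gain divided by the ideal gain."""
    dcg = sum(
        1.0 / math.log2(rank + 2)
        for rank, item in enumerate(ranked_items[:k])
        if item in relevant_items
    )
    ideal_hits = min(k, len(relevant_items))
    idcg = sum(1.0 / math.log2(rank + 2) for rank in range(ideal_hits))
    return dcg / idcg


# Hypothetical ranking for one user who clicked item "i7".
ranking = ["i3", "i7", "i1", "i9"]
clicked = {"i7"}
print(recall_at_k(ranking, clicked, k=2))
print(ndcg_at_k(ranking, clicked, k=2))
```

Recall only asks whether the clicked item made it into the top k, while NDCG also rewards placing it nearer the top.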

In the following cells, we export the required user and item feature files and save the query (user) tower model. If you want to read more about exporting retrieval models, please visit the [05-Retrieval-Model.ipynb](https://github.com/NVIDIA-Merlin/models/blob/main/examples/05-Retrieval-Model.ipynb) notebook in the Merlin Models repo.

Before we move on to the next step, we need to create a Feast feature repository from the command line. Open a terminal and run the commands below:
    
```
cd /Merlin/examples/PoC/
feast init feature_repo
```

You should see a message like <i>Creating a new Feast repository in /Merlin/examples/PoC/feature_repo</i> printed in the terminal. Navigate to the `feature_repo` folder and remove the demo parquet file created by default, as well as the `example.py` file.

```
cd feature_repo
rm example.py
cd data
rm driver_stats.parquet
```

### Exporting query (user) model

In [16]:
query_tower = model.retrieval_block.query_block()
query_tower.save('query_tower')

### Exporting user and item features

In [17]:
from merlin.models.utils.dataset import unique_rows_by_features
user_features = unique_rows_by_features(train, Tags.USER, Tags.USER_ID).compute().reset_index(drop=True)

In [18]:
user_features.head()

Unnamed: 0,user_id,user_shops,user_profile,user_group,user_gender,user_age,user_consumption_2,user_is_occupied,user_geography,user_intentions,user_brands,user_categories
0,0,0,1,5,2,2,2,1,0,0,0,0
1,1,109,0,0,0,0,0,0,0,69,131,9
2,2,301,1,1,1,1,1,1,2,57,4709,57
3,3,1876,23,7,2,3,1,1,1,5,63,3
4,4,534,1,2,1,2,1,1,0,40,22,108
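
Conceptually, `unique_rows_by_features` reduces the interaction data to one row per id, keeping only the columns that carry the requested tag. The plain-Python sketch below illustrates that reduction on a list of dictionaries. The helper name `unique_rows_by_id`, the "keep the first row" rule, and the sample rows are assumptions made for illustration. They do not describe the library's internals.

```python
def unique_rows_by_id(rows, id_column, feature_columns):
    """Keep the first row seen for each id, restricted to the given columns."""
    seen = set()
    unique = []
    for row in rows:
        key = row[id_column]
        if key in seen:
            continue
        seen.add(key)
        unique.append({col: row[col] for col in [id_column] + feature_columns})
    return unique


# Hypothetical interaction rows: user 1 appears twice.
interactions = [
    {"user_id": 1, "user_age": 3, "item_id": 10, "click": 1},
    {"user_id": 2, "user_age": 5, "item_id": 11, "click": 0},
    {"user_id": 1, "user_age": 3, "item_id": 12, "click": 0},
]
user_table = unique_rows_by_id(interactions, "user_id", ["user_age"])
print(user_table)
```

The result is a lookup table keyed by id, which is the shape a feature store expects for serving features at request time.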


We will artificially add `datetime` and `created` timestamp columns to our `user_features` dataframe.

In [19]:
from datetime import datetime
user_features["datetime"] = datetime.now()
user_features["datetime"] = user_features["datetime"].astype("datetime64[ns]")
user_features["created"] = datetime.now()
user_features["created"] = user_features["created"].astype("datetime64[ns]")

In [20]:
user_features.head()

Unnamed: 0,user_id,user_shops,user_profile,user_group,user_gender,user_age,user_consumption_2,user_is_occupied,user_geography,user_intentions,user_brands,user_categories,datetime,created
0,0,0,1,5,2,2,2,1,0,0,0,0,2022-03-29 23:48:32.641878,2022-03-29 23:48:33.068569
1,1,109,0,0,0,0,0,0,0,69,131,9,2022-03-29 23:48:32.641878,2022-03-29 23:48:33.068569
2,2,301,1,1,1,1,1,1,2,57,4709,57,2022-03-29 23:48:32.641878,2022-03-29 23:48:33.068569
3,3,1876,23,7,2,3,1,1,1,5,63,3,2022-03-29 23:48:32.641878,2022-03-29 23:48:33.068569
4,4,534,1,2,1,2,1,1,0,40,22,108,2022-03-29 23:48:32.641878,2022-03-29 23:48:33.068569


In [21]:
user_features.dtypes

user_id                        int32
user_shops                     int32
user_profile                   int32
user_group                     int32
user_gender                    int32
user_age                       int32
user_consumption_2             int32
user_is_occupied               int32
user_geography                 int32
user_intentions                int32
user_brands                    int32
user_categories                int32
datetime              datetime64[ns]
created               datetime64[ns]
dtype: object

In [22]:
user_features.to_parquet('./feature_repo/data/user_features.parquet')

In [23]:
item_features = unique_rows_by_features(train, Tags.ITEM, Tags.ITEM_ID).compute().reset_index(drop=True)

In [24]:
item_features.head()

Unnamed: 0,item_id,item_category,item_shop,item_brand
0,0,0,0,0
1,1,441,432,474
2,2,193,1159,125
3,3,3,1463,872
4,4,282,2479,555


In [25]:
item_features.shape

(3078306, 4)

In [26]:
item_features["datetime"] = datetime.now()
item_features["datetime"] = item_features["datetime"].astype("datetime64[ns]")
item_features["created"] = datetime.now()
item_features["created"] = item_features["created"].astype("datetime64[ns]")

In [27]:
item_features.dtypes

item_id                   int32
item_category             int32
item_shop                 int32
item_brand                int32
datetime         datetime64[ns]
created          datetime64[ns]
dtype: object

In [28]:
item_features.head()

Unnamed: 0,item_id,item_category,item_shop,item_brand,datetime,created
0,0,0,0,0,2022-03-29 23:48:34.033435,2022-03-29 23:48:34.035852
1,1,441,432,474,2022-03-29 23:48:34.033435,2022-03-29 23:48:34.035852
2,2,193,1159,125,2022-03-29 23:48:34.033435,2022-03-29 23:48:34.035852
3,3,3,1463,872,2022-03-29 23:48:34.033435,2022-03-29 23:48:34.035852
4,4,282,2479,555,2022-03-29 23:48:34.033435,2022-03-29 23:48:34.035852


In [29]:
# save to disk
item_features.to_parquet('./feature_repo/data/item_features.parquet')

### Extract and save Item embeddings

In [30]:
item_embs = model.item_embeddings(Dataset(item_features, schema=schema), batch_size=1024)
item_embs_df = item_embs.compute(scheduler="synchronous")

In [31]:
# select only embedding columns
item_embeddings = item_embs_df.iloc[:, 4:]

In [32]:
item_embeddings.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,54,55,56,57,58,59,60,61,62,63
0,0.087966,-0.095411,0.095909,0.14638,0.097243,0.109508,0.083107,0.104098,0.047606,-0.016557,...,0.030678,0.047079,-0.154461,0.122583,0.311171,-0.120558,0.039577,-0.014079,-0.180747,-0.040095
1,0.134474,0.019899,0.086098,0.20198,0.185933,0.00752,-0.075239,-0.075921,0.059336,0.123993,...,0.200382,0.107429,0.017707,-0.019648,0.292447,-0.108777,-0.139637,-0.022911,-0.102338,0.358516
2,0.046249,0.018888,0.071851,0.034601,0.190652,0.088418,-0.062084,0.102151,0.037121,0.02837,...,0.076869,0.052328,-0.009836,-0.106719,0.17223,0.134776,-0.194744,0.128883,-0.110976,0.23922
3,0.084705,0.140259,0.082066,-0.004641,0.09033,0.013105,0.243955,0.053653,0.301302,0.028994,...,-0.081015,0.08821,-0.086933,0.086017,0.076796,0.11776,0.049366,0.06853,-0.189925,0.100747
4,0.256753,0.08787,0.247346,-0.013136,0.082029,0.005857,-0.024048,-0.048067,-0.085008,0.023413,...,0.165265,-0.003999,0.193471,-0.005712,0.195031,-0.115037,-0.132177,0.033123,-0.158193,0.027308


In [33]:
# save to disk
item_embeddings.to_parquet('item_embeddings.parquet')
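
The saved item embeddings are the vectors that a similarity-search library such as Faiss can index, so that the most similar items can be looked up for a user vector produced by the query tower. As a conceptual reference, the sketch below performs the same lookup by brute force in plain Python, scoring every item with a dot product. The vectors and ids are made up, and the choice of dot product as the similarity measure is an assumption of this sketch.

```python
import heapq


def dot(u, v):
    return sum(a * b for a, b in zip(u, v))


def top_k_items(query_vector, item_vectors, k):
    """Return the ids of the k items with the largest dot product."""
    scored = ((dot(query_vector, vec), item_id) for item_id, vec in item_vectors.items())
    return [item_id for _, item_id in heapq.nlargest(k, scored)]


# Hypothetical 2-dimensional embeddings keyed by item id.
item_vectors = {
    0: [0.1, 0.9],
    1: [0.8, 0.2],
    2: [0.7, 0.7],
    3: [-0.5, 0.1],
}
user_vector = [1.0, 0.5]
print(top_k_items(user_vector, item_vectors, k=2))
```

Brute force scans every item on each request, which becomes expensive with a catalog of about three million items like the one above. That cost is the reason to use a dedicated index for retrieval.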

### Create feature definitions 

Now we will create our user and item feature definitions in the `user_features.py` and `item_features.py` files and save them in the `feature_repo` folder.

In [36]:
%%writefile ./feature_repo/user_features.py
from google.protobuf.duration_pb2 import Duration
import datetime 
from feast import Entity, Feature, FeatureView, ValueType
from feast.infra.offline_stores.file_source import FileSource

user_features = FileSource(
    path="/Merlin/examples/PoC/feature_repo/data/user_features.parquet",
    event_timestamp_column="datetime",
    created_timestamp_column="created",
)

user = Entity(name="user_id", value_type=ValueType.INT32, description="user id",)

user_features_view = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=Duration(seconds=86400 * 7),
    features=[
        Feature(name="user_shops", dtype=ValueType.INT32),
        Feature(name="user_profile", dtype=ValueType.INT32),
        Feature(name="user_group", dtype=ValueType.INT32),
        Feature(name="user_gender", dtype=ValueType.INT32),
        Feature(name="user_age", dtype=ValueType.INT32),
        Feature(name="user_consumption_2", dtype=ValueType.INT32),
        Feature(name="user_is_occupied", dtype=ValueType.INT32),
        Feature(name="user_geography", dtype=ValueType.INT32),
        Feature(name="user_intentions", dtype=ValueType.INT32),
        Feature(name="user_brands", dtype=ValueType.INT32),
        Feature(name="user_categories", dtype=ValueType.INT32),
    ],
    online=True,
    input=user_features,
    tags={},
)

Writing ./feature_repo/user_features.py


In [37]:
%%writefile ./feature_repo/item_features.py
from google.protobuf.duration_pb2 import Duration
import datetime 
from feast import Entity, Feature, FeatureView, ValueType
from feast.infra.offline_stores.file_source import FileSource

item_features = FileSource(
    path="/Merlin/examples/PoC/feature_repo/data/item_features.parquet",
    event_timestamp_column="datetime",
    created_timestamp_column="created",
)

item = Entity(name="item_id", value_type=ValueType.INT32, description="item id",)

item_features_view = FeatureView(
    name="item_features",
    entities=["item_id"],
    ttl=Duration(seconds=86400 * 7),
    features=[
        Feature(name="item_category", dtype=ValueType.INT32),
        Feature(name="item_shop", dtype=ValueType.INT32),
        Feature(name="item_brand", dtype=ValueType.INT32),
    ],
    online=True,
    input=item_features,
    tags={},
)

Writing ./feature_repo/item_features.py
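
Both feature views set `ttl=Duration(seconds=86400 * 7)` and point at the `datetime` column as the event timestamp. Roughly speaking, the TTL bounds how old a feature row may be, relative to the time of a lookup, and still be returned. The sketch below illustrates that check with the standard library. The function name `is_within_ttl` and the exact comparison are assumptions for illustration, not Feast's implementation.

```python
from datetime import datetime, timedelta

TTL = timedelta(seconds=86400 * 7)  # same 7-day window as the feature views


def is_within_ttl(event_timestamp, request_timestamp, ttl=TTL):
    """True if the feature row is not newer than the request and not older than ttl."""
    age = request_timestamp - event_timestamp
    return timedelta(0) <= age <= ttl


# Hypothetical event time close to the one written by the cells above.
row_time = datetime(2022, 3, 29, 23, 48, 32)
print(is_within_ttl(row_time, datetime(2022, 4, 2, 12, 0, 0)))
print(is_within_ttl(row_time, datetime(2022, 4, 10, 12, 0, 0)))
```

Because the timestamps in this notebook are set to the time the cells were run, features exported long before they are queried may fall outside the seven-day window.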


Let's check out our Feast feature repository structure.

In [38]:
!tree ./feature_repo

./feature_repo
├── __init__.py
├── data
│   ├── item_features.parquet
│   └── user_features.parquet
├── feature_store.yaml
├── item_features.py
└── user_features.py

1 directory, 6 files


### Next Steps
We trained and exported our ranking and retrieval models and NVTabular workflows. In the next step, we will learn how to deploy our trained models to [Triton Inference Server (TIS)](https://github.com/triton-inference-server/server) with the Merlin Systems library.

For the next step, move on to the `02-Deploying-Model-with-Merlin-Systems.ipynb` notebook to deploy our saved models as an ensemble to TIS and obtain prediction results for a given request.