In [1]:
# Copyright 2022 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =====

## 3. Customize and Extend Merlin Models

Merlin Models provides common and state-of-the-art RecSys architectures in a high-level API as well as all the required low-level building blocks (e.g., input blocks, MLP layers, prediction tasks, loss functions, etc.) for you to create your own architecture.

In this lab, we define the DLRM model architecture from scratch and customize it with Merlin Models.

**Learning Objectives**

- Understand the building blocks of Merlin Models
- Define DLRM model architecture from scratch and customize it with Merlin Models: 
    - add HashedCross features to DLRM Model
    - replace DotProductInteractionBlock with CrossBlock

**Import Required Libraries**

In [2]:
import os

import glob
import cudf 
import pandas as pd
import numpy as np
import nvtabular as nvt
from nvtabular.ops import *
import gc

from merlin.schema.tags import Tags
import merlin.models.tf as mm
from merlin.io.dataset import Dataset

import tensorflow as tf

2022-09-02 19:45:17.346126: I tensorflow/core/platform/cpu_feature_guard.cc:194] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE3 SSE4.1 SSE4.2 AVX
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-09-02 19:45:24.353773: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 8080 MB memory:  -> device: 0, name: Tesla V100-SXM2-16GB-N, pci bus id: 0000:06:00.0, compute capability: 7.0


In [3]:
# Seed NumPy and TensorFlow with the same value so that reruns of the
# notebook are repeatable.
seed = 42
np.random.seed(seed)
tf.random.set_seed(seed)

Define data paths.

In [4]:
# Root folder of the e-commerce dataset. It defaults to the lab container
# location, but can be redirected by setting the ECOM_DATA_PATH environment
# variable, so the notebook does not depend on one hard-coded absolute path.
data_path = os.environ.get("ECOM_DATA_PATH", '/workspace/data/ecom/')

# Sub-folder with the processed parquet files that the next cell reads.
output_path = os.path.join(data_path, 'processed_nvt')

Read processed parquet files as Dataset objects.

In [5]:
# Load the processed train/valid parquet files as Merlin Dataset objects,
# using the same 500MB partition size for both splits.
train, valid = (
    Dataset(os.path.join(output_path, split_name, "*.parquet"), part_size="500MB")
    for split_name in ("train", "valid")
)

# Schema used by the model: the train schema minus the columns listed here.
columns_to_exclude = ['event_time_ts', 'user_id_raw', 'product_id_raw']
schema = train.schema.without(columns_to_exclude)



In [6]:
# Columns tagged as TARGET in the schema; the first one is the label we train on.
target_columns = schema.select_by_tag(Tags.TARGET).column_names
target_column = target_columns[0]
target_column

'target'

In [7]:
# Display the schema (column names, tags, dtypes and properties) used to build the model.
schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.freq_threshold,properties.max_size,properties.start_index,properties.cat_path,properties.embedding_sizes.cardinality,properties.embedding_sizes.dimension,properties.domain.min,properties.domain.max
0,user_id,"(Tags.USER_ID, Tags.CATEGORICAL, Tags.USER)",int32,False,False,,0.0,0.0,0.0,.//categories/unique.user_id.parquet,351050.0,512.0,0.0,351049.0
1,ts_weekday,"(Tags.CATEGORICAL, Tags.USER)",int32,False,False,,0.0,0.0,0.0,.//categories/unique.ts_weekday.parquet,8.0,16.0,0.0,7.0
2,ts_hour,"(Tags.CATEGORICAL, Tags.USER)",int32,False,False,,0.0,0.0,0.0,.//categories/unique.ts_hour.parquet,25.0,16.0,0.0,24.0
3,product_id,"(Tags.ITEM_ID, Tags.ITEM, Tags.CATEGORICAL)",int32,False,False,,0.0,0.0,0.0,.//categories/unique.product_id.parquet,51425.0,512.0,0.0,51424.0
4,cat_0,"(Tags.ITEM, Tags.CATEGORICAL)",int32,False,False,,0.0,0.0,0.0,.//categories/unique.cat_0.parquet,14.0,16.0,0.0,13.0
5,cat_1,"(Tags.ITEM, Tags.CATEGORICAL)",int32,False,False,,0.0,0.0,0.0,.//categories/unique.cat_1.parquet,61.0,16.0,0.0,60.0
6,cat_2,"(Tags.ITEM, Tags.CATEGORICAL)",int32,False,False,,0.0,0.0,0.0,.//categories/unique.cat_2.parquet,90.0,20.0,0.0,89.0
7,brand,"(Tags.ITEM, Tags.CATEGORICAL)",int32,False,False,,0.0,0.0,0.0,.//categories/unique.brand.parquet,2654.0,132.0,0.0,2653.0
8,price,"(Tags.ITEM, Tags.CONTINUOUS)",float32,False,False,,,,,,,,,
9,relative_price,"(Tags.ITEM, Tags.CONTINUOUS)",float32,False,False,,,,,,,,,


## 3.1. Introduction to Merlin-models core building blocks

Let's explain the functions and blocks that we use to build our DLRM model from scratch. The `Block` is the core abstraction in Merlin Models and is the class from which all blocks inherit. The class extends the `tf.keras.layers.Layer` base class and implements a number of properties that simplify the creation of custom blocks and models. These properties include the Schema object for determining the embedding dimensions, input shapes, and output shapes. Additionally, the Block has a ModelContext instance to store and retrieve public variables and share them with other blocks in the same model as additional meta-data.

**Features Blocks** <br>

`Embeddings:` Creates a ParallelBlock with an EmbeddingTable for each categorical feature in the schema. <br>
`ContinuousFeatures:` Input block for continuous features.

**Connects Methods** <br>
The base class `Block` implements different connects methods that control how to link a given block to other blocks:

- `connect:` Connect the block to other blocks sequentially. The output is a tensor returned by the last block.
- `connect_branch:` Link the block to other blocks in parallel. The output is a dictionary containing the output tensor of each block.
- `connect_with_shortcut:` Connect the block to other blocks sequentially and apply a skip connection with the block's output.
- `connect_with_residual:` Connect the block to other blocks sequentially and apply a residual sum with the block's output.

### 3.2. Build a custom DLRM model

Let's convert the first five rows of the valid dataset to a batch of input tensors, so that we can check the outputs from each block below.

In [8]:
# Take 5 rows of the validation set, unshuffled and without the targets,
# as a batch of input tensors for probing each block below.
batch = mm.sample_batch(
    valid,
    batch_size=5,
    shuffle=False,
    include_targets=False,
)

We define the continuous layer based on the schema.

In [28]:
# Input block built from the schema columns tagged as CONTINUOUS.
continuous_block = mm.ContinuousFeatures.from_schema(
    schema,
    tags=Tags.CONTINUOUS,
)

We connect the continuous block to an `MLPBlock` to project them into the same dimensionality as the embedding width of categorical features.

In [29]:
# Dense layers (128 then 64 units) applied on top of the continuous features.
continuous_mlp = mm.MLPBlock([128, 64])
bottom_block = continuous_block.connect(continuous_mlp)

# Check the output shape on the sample batch.
bottom_block(batch).shape

TensorShape([5, 64])

We define the categorical embedding block based on the schema.

In [30]:
# Embedding block over the categorical columns of the schema. The width is
# set explicitly to 64 so that every embedding matches the 64-unit output of
# the continuous bottom block.
# (The unused `infer_embedding_dim` import was dropped: nothing in the
# notebook references it.)
embeddings_block = mm.Embeddings(
    schema.select_by_tag(Tags.CATEGORICAL),
    dim=64,
)

We display the output tensor of the categorical embedding block using the data from the first batch. We can see the embedding tensors of the categorical features, each with the dimension of 64 that we set via `dim`.

In [31]:
# Apply the embedding block to the sample batch, then show the feature
# names it returns together with the shape of one embedding tensor.
embeddings = embeddings_block(batch)
(embeddings.keys(), embeddings["user_id"].shape)

(dict_keys(['user_id', 'ts_weekday', 'ts_hour', 'product_id', 'cat_0', 'cat_1', 'cat_2', 'brand']),
 TensorShape([5, 64]))

Let's store the continuous and categorical representations in a single dictionary using a ParallelBlock instance.

In [32]:
# Both branches receive the same input; no aggregation is requested here,
# so the block returns a dictionary of tensors.
dlrm_branches = {"embeddings": embeddings_block, "bottom_block": bottom_block}
dlrm_input_block = mm.ParallelBlock(dlrm_branches)

By looking at the output, we can see that the ParallelBlock class applies embedding and continuous blocks, in parallel, to the same input batch. Additionally, it merges the resulting tensors into one dictionary.

In [33]:
# List every tensor produced by the input block along with its shape.
print("Output shapes of DLRM input block:")
input_block_outputs = dlrm_input_block(batch)
for feature_name, feature_tensor in input_block_outputs.items():
    print("\t%s : %s" % (feature_name, feature_tensor.shape))

Output shapes of DLRM input block:
	user_id : (5, 64)
	ts_weekday : (5, 64)
	ts_hour : (5, 64)
	product_id : (5, 64)
	cat_0 : (5, 64)
	cat_1 : (5, 64)
	cat_2 : (5, 64)
	brand : (5, 64)
	bottom_block : (5, 64)


**Define the interaction block**

Now that we have a vector representation of each input feature, we will create the DLRM interaction block. It consists of three operations:

- Apply a dot product between all continuous and categorical features to learn pairwise interactions.
- Concat the resulting pairwise interaction with the deep representation of continuous features (skip-connection).
- Apply an `MLPBlock` with a series of dense layers to the concatenated tensor.

First, we use the `connect_with_shortcut` method to create the first two operations of the DLRM interaction block:

In [34]:
from merlin.models.tf.blocks.dlrm import DotProductInteractionBlock

# Pairwise dot-product interaction over the input block outputs.
pairwise_interaction = DotProductInteractionBlock()

# Shortcut branch: keep only the "bottom_block" entry of the input block outputs.
bottom_block_only = mm.Filter("bottom_block")

# Interaction output and shortcut are combined with a "concat" aggregation.
dlrm_interaction = dlrm_input_block.connect_with_shortcut(
    pairwise_interaction,
    shortcut_filter=bottom_block_only,
    aggregation="concat",
)

The `Filter` operation allows us to select the `bottom_block` tensor (the deep representation of the continuous features) from the `dlrm_input_block` outputs.

Uncomment the line below if you want to see the tensor outputs from the `dlrm_interaction` block.

In [35]:
#dlrm_interaction(batch)

**Add HashedCross features to DLRM Model**

We can synthetically form new features by multiplying (crossing) two or more features. Crossing combinations of features can provide predictive abilities beyond what those features provide individually (see ref [website](https://developers.google.com/machine-learning/crash-course/feature-crosses/video-lecture#:~:text=A%20feature%20cross%20is%20a,an%20understanding%20of%20feature%20crosses)). Merlin Models has a `HashedCross` class that allows us to create new features and add non-linearity. `HashedCross` is a transformation block which crosses categorical features using the "hashing trick". Conceptually, the transformation can be thought of as: `hash(concatenation of features) % num_bins`

In [36]:
# Sub-schema holding only the two categorical columns to cross.
cross_schema = schema.select_by_name(names=["cat_0", "cat_1"])

# Small demo cross: 10 hash bins, one-hot output, easy to inspect by eye.
cross = mm.HashedCross(
    cross_schema,
    num_bins=10,
    output_mode="one_hot",
)

In [37]:
# Apply the demo cross to the sample batch and display the resulting one-hot tensor.
cross(batch)

{'cross_cat_0_cat_1': <tf.Tensor: shape=(5, 10), dtype=float32, numpy=
 array([[0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 1., 0., 0., 0., 0., 0., 0., 0., 0.]], dtype=float32)>}

In [38]:
# Cross used in the model: same two columns, but hashed into 1000 bins.
hashed_cross = mm.HashedCross(cross_schema, num_bins=1000, output_mode="one_hot")

# Single-unit projection of the one-hot cross, with no activation on that layer.
cross_projection = mm.MLPBlock([1], no_activation_last_layer=True)

cross_body = hashed_cross.connect(cross_projection, block_name='cross_model')

In [45]:
# Apply the cross branch to the sample batch and display its output.
cross_body(batch)

<tf.Tensor: shape=(5, 1), dtype=float32, numpy=
array([[-0.100559  ],
       [-0.04721604],
       [-0.04721604],
       [-0.04721604],
       [-0.04721604]], dtype=float32)>

Concatenate the `cross_body` layer with the interaction layer using a `ParallelBlock`.

In [39]:
# Run the DLRM interaction and the hashed-cross branch side by side and
# combine their outputs with a "concat" aggregation.
interaction_and_cross = {"dlrm_interaction": dlrm_interaction, "cross_body": cross_body}
dlrm_with_crossbody = mm.ParallelBlock(interaction_and_cross, aggregation="concat")

Then, we project the learned interaction using a series of dense layers; this defines the top block.

In [40]:
# Top block: dense layers of 64, 128 and 256 units on the concatenated tensor.
top_block = mm.MLPBlock([64, 128, 256])
dlrm_with_cross = dlrm_with_crossbody.connect(top_block)

In [41]:
# Apply the full DLRM body (interaction + cross + top block) to the sample batch.
dlrm_with_cross(batch)

<tf.Tensor: shape=(5, 256), dtype=float32, numpy=
array([[0.01488442, 0.        , 0.06350309, ..., 0.        , 0.        ,
        0.01973904],
       [0.01538824, 0.00798569, 0.0539087 , ..., 0.01523179, 0.00435049,
        0.03548171],
       [0.01885886, 0.01465276, 0.05206125, ..., 0.012095  , 0.        ,
        0.05857629],
       [0.00554505, 0.        , 0.04383301, ..., 0.00032771, 0.        ,
        0.04733135],
       [0.01442945, 0.00686804, 0.05356035, ..., 0.01299383, 0.00503429,
        0.03631819]], dtype=float32)>

**Define the Prediction block**

At this stage, we have created the DLRM block that accepts a dictionary of categorical and continuous tensors as input. The output of this block is the interaction representation vector of shape 256. The next step is to use this hidden representation to conduct a given prediction task. In our case, we use the binary label `target` defined in the schema, and the objective is to predict whether this target is positive for a given user-product pair.

We use the BinaryClassificationTask class and evaluate the performances using the AUC metric. We also use the LogitsTemperatureScaler block as a pre-transformation operation that scales the logits returned by the task before computing the loss and metrics:

In [42]:
from merlin.models.tf.core.transformations import LogitsTemperatureScaler

# Pre-transformation passed to the task, configured with temperature 2.
temperature_scaler = LogitsTemperatureScaler(temperature=2)

# Binary classification task built from the schema.
binary_task = mm.BinaryClassificationTask(schema, pre=temperature_scaler)

**LogitsTemperatureScaler:** It scales the output tensor of predicted logits to lower the model's confidence.

We connect the deep DLRM interaction to the binary task and the method automatically generates the Model class for us. We note that the Model class inherits from `tf.keras.Model` class.

In [43]:
# Assemble the trainable model: DLRM body with the cross branch, followed by the binary task.
model = mm.Model(
    dlrm_with_cross,
    binary_task,
)

In [44]:
%%time 
model.compile(optimizer='adam', run_eagerly=False, metrics=[tf.keras.metrics.AUC()])
model.fit(train, validation_data=valid, batch_size=4096, epochs=2)

Epoch 1/2
Epoch 2/2
CPU times: user 1min 16s, sys: 13.4 s, total: 1min 30s
Wall time: 52.8 s


<keras.callbacks.History at 0x7f9e37672700>

**Replace `DotProductInteractionBlock` with `CrossBlock`**

In this section, we will replace the `DotProductInteractionBlock` layer with the `CrossBlock` that is used in the DCN architecture. We will keep the bottom layer the same as above.

In [50]:
# Rebuild the continuous input block and its 128 -> 64 MLP for the second model.
continuous_block = mm.ContinuousFeatures.from_schema(
    schema,
    tags=Tags.CONTINUOUS,
)
continuous_mlp = mm.MLPBlock([128, 64])
bottom_block = continuous_block.connect(continuous_mlp)

In [51]:
# Fresh embedding block over the categorical columns, again 64 wide.
categorical_schema = schema.select_by_tag(Tags.CATEGORICAL)
embeddings_block = mm.Embeddings(categorical_schema, dim=64)

In [52]:
# Apply the new embedding block to the sample batch, then show the feature
# names and the shape of the user_id embedding.
embeddings = embeddings_block(batch)
(embeddings.keys(), embeddings["user_id"].shape)

(dict_keys(['user_id', 'ts_weekday', 'ts_hour', 'product_id', 'cat_0', 'cat_1', 'cat_2', 'brand']),
 TensorShape([5, 64]))

In [53]:
# Same check for another categorical feature: the cat_0 embedding shape.
embeddings.keys(), embeddings["cat_0"].shape

(dict_keys(['user_id', 'ts_weekday', 'ts_hour', 'product_id', 'cat_0', 'cat_1', 'cat_2', 'brand']),
 TensorShape([5, 64]))

In [54]:
# Unlike the first variant, the branch outputs are combined with a "concat"
# aggregation here, so the block returns a single tensor instead of a dict.
dlrm_branches = {"embeddings": embeddings_block, "bottom_block": bottom_block}
dlrm_input_block = mm.ParallelBlock(dlrm_branches, aggregation="concat")

In [55]:
# Apply the concatenating input block to the sample batch and display the result.
dlrm_input_block(batch)

<tf.Tensor: shape=(5, 576), dtype=float32, numpy=
array([[ 0.        ,  0.        ,  0.20814615, ...,  0.04508896,
        -0.04613587, -0.03192444],
       [ 0.10800916,  0.12711321,  0.        , ...,  0.04508896,
        -0.04613587, -0.03192444],
       [ 0.05967027,  0.21339999,  0.        , ...,  0.04508896,
        -0.04613587, -0.03192444],
       [ 0.11435552,  0.21084045,  0.03167137, ...,  0.04508896,
        -0.04613587, -0.03192444],
       [ 0.10668534,  0.1265437 ,  0.        , ...,  0.04508896,
        -0.04613587, -0.03192444]], dtype=float32)>

In [56]:
# Stacked variant: the CrossBlock is connected sequentially after the
# concatenated input block.
# NOTE(review): the argument 2 is presumably the number of cross layers - confirm against the CrossBlock API.
cross_layers = mm.CrossBlock(2)
cross_inter_body = dlrm_input_block.connect(cross_layers)

In [57]:
# Apply the input block followed by the cross layers to the sample batch.
cross_inter_body(batch)

<tf.Tensor: shape=(5, 576), dtype=float32, numpy=
array([[ 0.        ,  0.        ,  0.2026599 , ...,  0.0473071 ,
        -0.04986251, -0.03376845],
       [ 0.1138205 ,  0.13566758,  0.        , ...,  0.04329231,
        -0.04455113, -0.03160335],
       [ 0.0614365 ,  0.21829154,  0.        , ...,  0.0454263 ,
        -0.04787958, -0.03412252],
       [ 0.12333938,  0.21843155,  0.03011557, ...,  0.04480383,
        -0.04538551, -0.03175518],
       [ 0.10927905,  0.13214834,  0.        , ...,  0.04289481,
        -0.04829   , -0.03109186]], dtype=float32)>

In [58]:
# Combine the cross-layer output with the continuous bottom block output
# using a "concat" aggregation. The same bottom_block object also sits
# inside cross_inter_body.
cross_and_bottom = {"cross_inter_body": cross_inter_body, "bottom_block": bottom_block}
dlrm_interaction = mm.ParallelBlock(cross_and_bottom, aggregation="concat")

In [59]:
# Top block for the CrossBlock variant: dense layers of 64, 128 and 512 units.
top_block = mm.MLPBlock([64, 128, 512])
deep_dlrm_interaction = dlrm_interaction.connect(top_block)

# Check the output on the sample batch.
deep_dlrm_interaction(batch)

<tf.Tensor: shape=(5, 512), dtype=float32, numpy=
array([[0.        , 0.        , 0.00709292, ..., 0.        , 0.        ,
        0.        ],
       [0.        , 0.        , 0.01600555, ..., 0.        , 0.00905023,
        0.        ],
       [0.        , 0.        , 0.00627399, ..., 0.        , 0.00516872,
        0.        ],
       [0.        , 0.        , 0.0034229 , ..., 0.        , 0.        ,
        0.        ],
       [0.        , 0.        , 0.00772928, ..., 0.        , 0.00478202,
        0.        ]], dtype=float32)>

In [60]:
from merlin.models.tf.core.transformations import LogitsTemperatureScaler

# New task instance for the second model, configured like the first one:
# binary classification with a temperature-2 logits scaler as pre-transformation.
temperature_scaler = LogitsTemperatureScaler(temperature=2)
binary_task = mm.BinaryClassificationTask(schema, pre=temperature_scaler)

In [61]:
# Assemble the second model: CrossBlock-based body followed by the binary task.
model = mm.Model(
    deep_dlrm_interaction,
    binary_task,
)

In [62]:
%%time 
model.compile(optimizer='adam', run_eagerly=False, metrics=[tf.keras.metrics.AUC()])
model.fit(train, validation_data=valid, batch_size=4096, epochs=2)

Epoch 1/2
Epoch 2/2
CPU times: user 47.2 s, sys: 8.23 s, total: 55.5 s
Wall time: 42.1 s


<keras.callbacks.History at 0x7f9e357c85b0>

### Summary 

In this hands-on lab we learned

- how to use a subset of the pre-existing Merlin Models building blocks to create the DLRM model
- how to add HashedCross features to DLRM Model
- how to replace DotProductInteractionBlock with CrossBlock

Please execute the cell below to shut down the kernel before moving on to the next notebook, `04-Building-multi-stage-RecSys-with-Merlin-Systems`.

In [None]:
import IPython

# Shut down the kernel of this notebook before moving on to the next one.
# NOTE(review): the True argument is presumably the `restart` flag - confirm against ipykernel.
IPython.Application.instance().kernel.do_shutdown(True)