<img src="https://developer.download.nvidia.com/notebooks/dlsw-notebooks/merlin_merlin_getting-started-movielens-03-training-with-pytorch/nvidia_logo.png" style="width: 90px; float: right;">

# Getting Started MovieLens: Training with PyTorch

This notebook was created using the latest stable [merlin-pytorch-training](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-pytorch-training/tags) container.

## Overview

We observed that PyTorch training pipelines can be slow because the dataloader is a bottleneck. The native PyTorch dataloader randomly samples each item from the dataset, which is very slow. In our experiments, we were able to speed up existing PyTorch pipelines using a highly optimized dataloader.

Applying deep learning models to recommendation systems poses unique challenges compared to other domains, such as computer vision and natural language processing. The datasets and common model architectures have unique characteristics, which require custom solutions. Recommendation system datasets are terabytes in size with billions of examples, but each example is represented by only a few bytes. For example, the [Criteo CTR dataset](https://ailab.criteo.com/download-criteo-1tb-click-logs-dataset/), the largest publicly available dataset, is 1.3TB with 4 billion examples. The model architectures normally have large embedding tables for the users and items, which do not fit on a single GPU. You can read more in our [blogpost](https://medium.com/nvidia-merlin/why-isnt-your-recommender-system-training-faster-on-gpu-and-what-can-you-do-about-it-6cb44a711ad4).

### Learning objectives

This notebook explains how to use the NVTabular dataloader to accelerate PyTorch training.

1. Use **NVTabular dataloader** with PyTorch
2. Leverage **multi-hot encoded input features**

### MovieLens25M

[MovieLens25M](https://grouplens.org/datasets/movielens/25m/) is a popular dataset for recommender systems and is used in academic publications. The dataset contains 25M movie ratings for 62,000 movies given by 162,000 users. Many projects use only the user/item/rating information of MovieLens, but the original dataset provides metadata for the movies as well, for example the genres of each movie. Although we may not improve state-of-the-art results with our neural network architecture, the purpose of this notebook is to explain how to integrate multi-hot categorical features into a neural network.

## NVTabular dataloader for PyTorch

We’ve identified that the dataloader is one bottleneck in deep learning recommender systems when training pipelines with PyTorch. The dataloader cannot prepare the next batch fast enough, so the GPU is not fully utilized.

As a result, we developed a highly customized tabular dataloader for accelerating existing pipelines in PyTorch. The NVTabular dataloader's features are:

- removing the bottleneck of item-by-item dataloading
- enabling larger-than-memory datasets by streaming from disk
- reading data directly into GPU memory, which removes CPU-GPU communication
- preparing batches asynchronously on the GPU to avoid CPU-GPU communication
- supporting the commonly used, efficient .parquet format
- easy integration into existing PyTorch pipelines through an API similar to the native one
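
To give a rough feel for the first point, the toy sketch below counts how many fetch calls are needed to build batches item by item versus reading whole contiguous blocks. It is only an illustration in plain Python: `CountingTable`, `item_by_item_batches` and `sliced_batches` are hypothetical names, and this is not how NVTabular is implemented internally.

```python
import random


class CountingTable:
    """Toy in-memory table that counts how many fetch calls it serves."""

    def __init__(self, rows):
        self.rows = rows
        self.fetch_calls = 0

    def get_item(self, index):
        self.fetch_calls += 1
        return self.rows[index]

    def get_slice(self, start, stop):
        self.fetch_calls += 1
        return self.rows[start:stop]


def item_by_item_batches(table, batch_size, seed=0):
    """One fetch per row: shuffle the indices, then gather each row separately."""
    indices = list(range(len(table.rows)))
    random.Random(seed).shuffle(indices)
    for start in range(0, len(indices), batch_size):
        yield [table.get_item(i) for i in indices[start:start + batch_size]]


def sliced_batches(table, batch_size):
    """One fetch per batch: read a contiguous block of rows at once."""
    for start in range(0, len(table.rows), batch_size):
        yield table.get_slice(start, start + batch_size)


rows = list(range(1000))
per_item = CountingTable(rows)
per_batch = CountingTable(rows)

n_item_batches = sum(1 for _ in item_by_item_batches(per_item, batch_size=100))
n_slice_batches = sum(1 for _ in sliced_batches(per_batch, batch_size=100))

print("batches:", n_item_batches, n_slice_batches)
print("fetch calls:", per_item.fetch_calls, per_batch.fetch_calls)
```

Both strategies produce the same number of batches, but the item-by-item version issues one call per row while the block version issues one call per batch.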


In [111]:
# External dependencies
import os
import gc
import glob
import pandas as pd
import nvtabular as nvt
from sklearn import metrics

We define our base directory, containing the data.

In [112]:
import dotenv
dotenv.load_dotenv('.env')

True

In [113]:
# The dataset name and base directory are read from the environment
# (loaded from `.env` above). The fallbacks apply only if a variable is unset.
DATASET_NAME = os.getenv("DATASET_NAME", "ml-25m")
INPUT_DATA_DIR = os.getenv(
    "INPUT_DATA_DIR", os.path.expanduser("~/nvt-examples/movielens/data/")
)
INPUT_DATA_DIR = os.path.join(INPUT_DATA_DIR, DATASET_NAME)

### Defining Hyperparameters

First, we define the data schema and differentiate between single-hot and multi-hot categorical features. Note that we do not have any numerical input features.

In [114]:
BATCH_SIZE = 1024 * 32  # Batch Size
CATEGORICAL_COLUMNS = ["userId", "movieId"]  # Single-hot
CATEGORICAL_MH_COLUMNS = ["genres"]  # Multi-hot
NUMERIC_COLUMNS = []

# Output from ETL-with-NVTabular
TRAIN_PATHS = sorted(glob.glob(os.path.join(INPUT_DATA_DIR, "train", "*.parquet")))
VALID_PATHS = sorted(glob.glob(os.path.join(INPUT_DATA_DIR, "valid", "*.parquet")))

In [115]:
BATCH_SIZE

32768

In [116]:
pd.read_parquet(TRAIN_PATHS[0])

Unnamed: 0,userId,movieId,genres,rating
0,35987,9,"[3, 1, 14]",1.0
1,143647,62,"[5, 13, 10, 2, 9]",1.0
2,57269,3,"[2, 8, 1, 4]",0.0
3,13803,17,"[3, 5]",1.0
4,47329,47,"[5, 2, 9]",1.0
...,...,...,...,...
20000071,139906,1035,"[1, 7, 4]",1.0
20000072,391,2481,"[1, 14]",1.0
20000073,84805,85,"[3, 5, 2, 8]",0.0
20000074,66030,7,"[3, 5, 7, 4]",0.0


In the previous notebook, we used NVTabular for ETL and stored the workflow to disk. We can load the NVTabular workflow to extract important metadata for our training pipeline.

In [117]:
INPUT_DATA_DIR

'/Users/nelsonlin/Desktop/data/movielens/ml-25m'

In [118]:
proc = nvt.Workflow.load(os.path.join(INPUT_DATA_DIR, "workflow"))

The embedding table shows the cardinality of each categorical variable along with its associated embedding size. Each entry is of the form `(cardinality, embedding_size)`.

In [119]:
proc.graph

<merlin.dag.graph.Graph at 0x4e4d8d850>

In [120]:
EMBEDDING_TABLE_SHAPES, MH_EMBEDDING_TABLE_SHAPES = nvt.ops.get_embedding_sizes(proc)
EMBEDDING_TABLE_SHAPES, MH_EMBEDDING_TABLE_SHAPES

({'userId': (162542, 512), 'movieId': (56604, 512)}, {'genres': (21, 16)})

In [121]:
EMBEDDING_TABLE_SHAPES

{'userId': (162542, 512), 'movieId': (56604, 512)}

In [122]:
MH_EMBEDDING_TABLE_SHAPES

{'genres': (21, 16)}
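
As a quick sanity check on what these shapes imply, the following sketch multiplies cardinality by embedding size to count the weights in each table. The shapes are copied from the output above; the helper name `embedding_parameters` and the assumption of 4 bytes per weight (float32) are ours.

```python
# Shapes copied from the output above: column -> (cardinality, embedding_size)
single_hot_shapes = {"userId": (162542, 512), "movieId": (56604, 512)}
multi_hot_shapes = {"genres": (21, 16)}


def embedding_parameters(shapes):
    """Number of weights per embedding table: cardinality * embedding_size."""
    return {name: rows * dim for name, (rows, dim) in shapes.items()}


params = embedding_parameters({**single_hot_shapes, **multi_hot_shapes})
total_params = sum(params.values())

# Assumption: each weight is stored as float32 (4 bytes).
size_mib = total_params * 4 / 2**20

for name, count in params.items():
    print(f"{name}: {count:,} weights")
print(f"total: {total_params:,} weights, about {size_mib:.0f} MiB as float32")
```

Almost all of the weights sit in the `userId` and `movieId` tables; the `genres` table is tiny by comparison.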

### Initializing NVTabular Dataloader for PyTorch

We import PyTorch and the NVTabular dataloader for PyTorch.

In [123]:
import torch
from nvtabular.loader.torch import TorchAsyncItr, DLDataLoader
from nvtabular.framework_utils.torch.models import Model
from nvtabular.framework_utils.torch.utils import process_epoch

First, we take a look at our dataloader and how the data is represented as tensors. The NVTabular dataloaders are initialized as usual, and we specify both single-hot and multi-hot categorical features as `cats`. The dataloader automatically recognizes the single-hot and multi-hot columns and represents them accordingly.

In [124]:
train_dataset = TorchAsyncItr(
    nvt.Dataset(TRAIN_PATHS),
    batch_size=BATCH_SIZE,
    cats=CATEGORICAL_COLUMNS + CATEGORICAL_MH_COLUMNS,
    conts=NUMERIC_COLUMNS,
    labels=["rating"],
)



In [125]:
train_loader = DLDataLoader(
    train_dataset, batch_size=None, collate_fn=lambda x: x, pin_memory=False, num_workers=0
)

In [126]:
train_loader 

<nvtabular.loader.torch.DLDataLoader at 0x3de5517c0>

In [127]:
valid_dataset = TorchAsyncItr(
    nvt.Dataset(VALID_PATHS),
    batch_size=BATCH_SIZE,
    cats=CATEGORICAL_COLUMNS + CATEGORICAL_MH_COLUMNS,
    conts=NUMERIC_COLUMNS,
    labels=["rating"],
)

In [128]:
valid_loader = DLDataLoader(
    valid_dataset, batch_size=None, collate_fn=lambda x: x, pin_memory=False, num_workers=0
)

Let's generate a batch and take a look at the input features.<br><br>
The single-hot categorical features (`userId` and `movieId`) have a shape of `(32768, 1)`, where the first dimension is the batch size, as usual. For the multi-hot categorical feature `genres`, we receive a tuple of two Tensors, which we call `values` and `nnzs`.<br><br>
- `values` holds the actual data, the genre IDs. Note that this Tensor has more entries than the batch size, because one datapoint in the batch can have more than one genre (multi-hot).<br>
- `nnzs` is a supporting Tensor that describes which entries of `values` belong to each datapoint in the batch. In the output below it holds, for each datapoint, the index in `values` at which that datapoint's genres start.<br><br>
For example, in the batch printed below,
- the first two entries of `nnzs` are `0` and `3`, so the first 3 entries of `values` (indices 0 to 2) belong to the first datapoint in the batch (movieId/userId).<br>
- the third entry of `nnzs` is `8`, so the 4th through 8th entries of `values` (indices 3 to 7) belong to the second datapoint.<br>
- the third datapoint starts at index 8 and runs up to the start index of the fourth datapoint.
- and so on
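
As a small illustration of how the two tensors fit together, the sketch below rebuilds per-datapoint genre lists from a flat list of values and a list of start offsets, using plain Python lists instead of tensors. The numbers are the genres of the first three rows of the training data shown earlier and the first three entries of the second tensor in the batch output below. `split_by_offsets` is a hypothetical helper, not part of NVTabular.

```python
def split_by_offsets(values, offsets):
    """Return one list of genre IDs per datapoint.

    offsets[i] is the index in `values` where datapoint i starts. Datapoint i
    ends where datapoint i + 1 starts, and the last one ends at len(values).
    """
    bounds = list(offsets) + [len(values)]
    return [values[bounds[i]:bounds[i + 1]] for i in range(len(offsets))]


# Flat genre IDs of the first three training rows, back to back.
values = [3, 1, 14, 5, 13, 10, 2, 9, 2, 8, 1, 4]
# Start index of each of the three datapoints inside `values`.
offsets = [0, 3, 8]

genres_per_row = split_by_offsets(values, offsets)
for row_index, genres in enumerate(genres_per_row):
    print(row_index, genres)
```

In the real batch the second tensor has shape `(32768, 1)`, so it would need to be flattened before applying the same logic.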

In [129]:
batch = next(iter(train_loader))
# batch

In [130]:
batch

({'genres': (tensor([ 3,  1, 14,  ...,  3,  4,  2]),
   tensor([[    0],
           [    3],
           [    8],
           ...,
           [88770],
           [88773],
           [88775]])),
  'userId': tensor([[ 35987],
          [143647],
          [ 57269],
          ...,
          [ 20368],
          [ 76978],
          [  5542]]),
  'movieId': tensor([[  9],
          [ 62],
          [  3],
          ...,
          [261],
          [564],
          [ 50]])},
 tensor([1., 1., 0.,  ..., 0., 0., 1.]))

In [131]:
X,y = batch

In [132]:
X.keys()

dict_keys(['genres', 'userId', 'movieId'])

In [133]:
values,nnzs =  X['genres']

In [134]:
values

tensor([ 3,  1, 14,  ...,  3,  4,  2])

In [135]:
"""expanded"""
values.shape

torch.Size([88776])

In [136]:
"""index"""
nnzs

tensor([[    0],
        [    3],
        [    8],
        ...,
        [88770],
        [88773],
        [88775]])

In [137]:
nnzs.shape

torch.Size([32768, 1])

In [138]:
X['userId']

tensor([[ 35987],
        [143647],
        [ 57269],
        ...,
        [ 20368],
        [ 76978],
        [  5542]])

In [139]:
X['userId'].shape

torch.Size([32768, 1])

In [140]:
X['movieId'].shape

torch.Size([32768, 1])

`X_cat_multihot` is a tuple of two Tensors. For the multi-hot categorical feature `genres`, we receive two Tensors `values` and `nnzs`.

In [142]:
X_cat_multihot = batch[0]['genres']
X_cat_multihot

(tensor([ 3,  1, 14,  ...,  3,  4,  2]),
 tensor([[    0],
         [    3],
         [    8],
         ...,
         [88770],
         [88773],
         [88775]]))

In [143]:
X_cat_multihot[0].shape

torch.Size([88776])

In [144]:
X_cat_multihot[1].shape

torch.Size([32768, 1])

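As each datapoint can have a different number of genres, it is more efficient to represent the genres as two flat tensors: one with the actual values (`values`) and one that marks where each datapoint's genres begin (`nnzs`). In the output above, `nnzs` holds start offsets, from which the number of genres for each datapoint follows.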
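
To make the layout concrete, this sketch goes the other way: it flattens a ragged list of genre lists into one flat list of values plus per-row start offsets and lengths, and compares the storage with a padded rectangular layout. It uses plain Python lists; `flatten_ragged` is a hypothetical helper, not an NVTabular function.

```python
def flatten_ragged(rows):
    """Flatten ragged rows into (values, offsets, lengths)."""
    values = [v for row in rows for v in row]
    lengths = [len(row) for row in rows]
    offsets = []
    running = 0
    for n in lengths:
        offsets.append(running)  # where this row starts inside `values`
        running += n
    return values, offsets, lengths


# Genres of the first three training rows shown earlier.
rows = [[3, 1, 14], [5, 13, 10, 2, 9], [2, 8, 1, 4]]
values, offsets, lengths = flatten_ragged(rows)

# A padded layout would need len(rows) * max(lengths) cells instead.
padded_cells = len(rows) * max(lengths)

print("values: ", values)
print("offsets:", offsets)
print("lengths:", lengths)
print("flat cells:", len(values), "padded cells:", padded_cells)
```

The gap between flat and padded storage grows with the spread in the number of genres per datapoint.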

In [145]:
del batch
gc.collect()

236

### Defining Neural Network Architecture

We implemented a simple PyTorch architecture.

* Single-hot categorical features are fed into an Embedding Layer
* Each value of a multi-hot categorical feature is fed into an Embedding Layer, and the multiple Embedding outputs are combined by summing
* The outputs of the Embedding Layers are concatenated
* The concatenated outputs are fed through multiple feed-forward layers (Dense Layers with ReLU activations and BatchNorm)

You can see more details by checking out the implementation.
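
The embedding steps in the list above can be sketched for a single datapoint in plain Python. This is only an illustration with tiny, made-up table sizes and random weights, not the actual `Model` implementation. The names `make_table`, `sum_pool` and `embed_row` are hypothetical.

```python
import random


def make_table(cardinality, dim, rng):
    """Random embedding table: one vector of length `dim` per category ID."""
    return [[rng.uniform(-1.0, 1.0) for _ in range(dim)] for _ in range(cardinality)]


def sum_pool(table, ids):
    """Look up every ID and sum the vectors element-wise (multi-hot feature)."""
    pooled = [0.0] * len(table[0])
    for i in ids:
        for d, weight in enumerate(table[i]):
            pooled[d] += weight
    return pooled


rng = random.Random(0)
# Tiny, made-up sizes for illustration only.
user_table = make_table(cardinality=10, dim=4, rng=rng)
movie_table = make_table(cardinality=8, dim=4, rng=rng)
genre_table = make_table(cardinality=21, dim=2, rng=rng)


def embed_row(user_id, movie_id, genre_ids):
    """Concatenate the single-hot lookups with the sum-pooled genre vector."""
    return user_table[user_id] + movie_table[movie_id] + sum_pool(genre_table, genre_ids)


features = embed_row(user_id=3, movie_id=5, genre_ids=[3, 1, 14])
print(len(features))  # 4 + 4 + 2 values feed the first dense layer
```

With the real embedding sizes the concatenated width is 512 + 512 + 16 = 1040, which matches `in_features=1040` of the first `Linear` layer in the model printout below.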

We initialize the model. The `embedding_table_shapes` argument needs to be a tuple of two dictionaries, holding the `(cardinality, embedding_size)` shapes of the single-hot and the multi-hot input features, respectively.

In [146]:
CATEGORICAL_COLUMNS

['userId', 'movieId']

In [147]:
EMBEDDING_TABLE_SHAPES_TUPLE = (
    {
        CATEGORICAL_COLUMNS[0]: EMBEDDING_TABLE_SHAPES[CATEGORICAL_COLUMNS[0]],
        CATEGORICAL_COLUMNS[1]: EMBEDDING_TABLE_SHAPES[CATEGORICAL_COLUMNS[1]],
    },
    {CATEGORICAL_MH_COLUMNS[0]: MH_EMBEDDING_TABLE_SHAPES[CATEGORICAL_MH_COLUMNS[0]]},
)
EMBEDDING_TABLE_SHAPES_TUPLE

({'userId': (162542, 512), 'movieId': (56604, 512)}, {'genres': (21, 16)})

In [148]:
model = Model(
    embedding_table_shapes=EMBEDDING_TABLE_SHAPES_TUPLE,
    num_continuous=0,
    emb_dropout=0.0,
    layer_hidden_dims=[128, 128, 128],
    layer_dropout_rates=[0.0, 0.0, 0.0],
).to("cuda" if torch.cuda.is_available() else "cpu")

In [149]:
model

Model(
  (initial_cat_layer): ConcatenatedEmbeddings(
    (embedding_layers): ModuleList(
      (0): Embedding(162542, 512)
      (1): Embedding(56604, 512)
    )
    (dropout): Dropout(p=0.0, inplace=False)
  )
  (mh_cat_layer): MultiHotEmbeddings(
    (embedding_layers): ModuleList(
      (0): EmbeddingBag(21, 16, mode=sum)
    )
    (dropout): Dropout(p=0.0, inplace=False)
  )
  (initial_cont_layer): BatchNorm1d(0, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (layers): ModuleList(
    (0): Sequential(
      (0): Linear(in_features=1040, out_features=128, bias=True)
      (1): ReLU(inplace=True)
      (2): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (3): Dropout(p=0.0, inplace=False)
    )
    (1): Sequential(
      (0): Linear(in_features=128, out_features=128, bias=True)
      (1): ReLU(inplace=True)
      (2): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (3): Dropout(p=0.0, in

We initialize the optimizer.

In [150]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

We use the `process_epoch` function to train and validate our model. It iterates over the dataset, computes the loss and, when training, performs the optimizer step as usual.

In [151]:
%%time
from time import time

EPOCHS = 5
for epoch in range(EPOCHS):
    # NOTE: `start` is reset at the top of every epoch, so `t_final` below
    # measures only the last epoch, not the whole run.
    start = time()
    # Training pass over the training loader
    train_loss, y_pred, y = process_epoch(train_loader,
                                          model,
                                          train=True,
                                          optimizer=optimizer)
    # Validation pass (train=False, no optimizer passed)
    valid_loss, y_pred, y = process_epoch(valid_loader,
                                          model,
                                          train=False)
    # ROC AUC on the validation labels and predictions
    fpr, tpr, thresholds = metrics.roc_curve(y.numpy(), y_pred.numpy(), pos_label=1)
    auc = metrics.auc(fpr, tpr)
    print(f"Epoch {epoch:02d}. Train loss: {train_loss:.4f}. Valid loss: {valid_loss:.4f}. AUC:{auc:4f}")
t_final = time() - start
total_rows = train_dataset.num_rows_processed + valid_dataset.num_rows_processed
print(
    f"run_time: {t_final} - rows: {total_rows * EPOCHS} - epochs: {EPOCHS} - dl_thru: {(total_rows * EPOCHS) / t_final}"
)



Total batches: 610
Total batches: 152
Epoch 00. Train loss: 0.1910. Valid loss: 0.1688. AUC:0.807453
Total batches: 610
Total batches: 152
Epoch 01. Train loss: 0.1636. Valid loss: 0.1661. AUC:0.816713
Total batches: 610
Total batches: 152
Epoch 02. Train loss: 0.1554. Valid loss: 0.1680. AUC:0.817878
Total batches: 610
Total batches: 152
Epoch 03. Train loss: 0.1451. Valid loss: 0.1757. AUC:0.813898
Total batches: 610
Total batches: 152
Epoch 04. Train loss: 0.1340. Valid loss: 0.1819. AUC:0.807674
run_time: 304.9971218109131 - rows: 11460 - epochs: 5 - dl_thru: 37.57412506700563
CPU times: user 1h 24min 50s, sys: 34min 8s, total: 1h 58min 59s
Wall time: 25min 31s
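
The AUC printed per epoch comes from `metrics.roc_curve` and `metrics.auc`. One way to read that number: it equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counting half. The sketch below computes it that way on made-up toy data. `pairwise_auc` is a hypothetical helper that is quadratic in the number of examples, so it is meant only as an illustration, not as a replacement for the scikit-learn calls.

```python
def pairwise_auc(labels, scores):
    """ROC AUC as the fraction of (positive, negative) pairs ranked correctly."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative label")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a win
    return wins / (len(positives) * len(negatives))


# Made-up labels and scores, for illustration only.
toy_labels = [0, 0, 1, 1]
toy_scores = [0.1, 0.4, 0.35, 0.8]
toy_auc = pairwise_auc(toy_labels, toy_scores)
print(toy_auc)
```

Three of the four positive/negative pairs in the toy data are ordered correctly, which gives 0.75.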
