In [1]:
# Copyright 2022 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ================================

# Multi-GPU Data Parallel Training with Merlin Models and Horovod

This notebook was created using the latest stable [merlin-tensorflow](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-tensorflow/tags) container.

There are multiple ways to scale a training pipeline to multiple GPUs:
- Model Parallel: If the model is too large to fit on a single GPU, the parameters are distributed over multiple GPUs.
- Data Parallel: Every GPU has a copy of all model parameters and runs the forward/backward pass for its batch.

In this example, we demonstrate how to scale a training pipeline to multiple GPUs on a single node. The goal is to maximize throughput and reduce training time. That way, models can be trained more frequently and researchers can run more experiments in less time.

Data parallel training is equivalent to training with a larger batch size. Because we use more GPUs, we have more computational resources and can achieve higher throughput. All model parameters fit on a single GPU. Every worker (one per GPU) has a copy of the model parameters and runs the forward and backward pass on its local batch. The workers then synchronize their gradients with each other, which can introduce some overhead.
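To illustrate why data parallel training behaves like training with a larger batch, here is a minimal pure-Python sketch. It assumes a toy linear model with a mean-squared-error loss and two workers with equally sized local batches; the function `mse_gradient` and the data are hypothetical and are not part of Merlin Models or Horovod. Under these assumptions, averaging the per-worker gradients gives the same result as computing the gradient on the combined batch.

```python
# Toy model: y_hat = w * x, loss = mean((y_hat - y)^2).
# Everything here is illustrative; it is not how Horovod is implemented.

def mse_gradient(w, xs, ys):
    """Gradient of the mean-squared-error loss with respect to w."""
    n = len(xs)
    return sum(2.0 * (w * x - y) * x for x, y in zip(xs, ys)) / n


w = 0.5
xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]

# One worker processing the full (global) batch of 4 examples.
global_grad = mse_gradient(w, xs, ys)

# Two workers, each processing a local batch of 2 examples.
local_grads = [
    mse_gradient(w, xs[:2], ys[:2]),
    mse_gradient(w, xs[2:], ys[2:]),
]

# Synchronization step: average the gradients across workers.
averaged_grad = sum(local_grads) / len(local_grads)

print(global_grad, averaged_grad)
```

The equality relies on every worker having the same local batch size, which is one reason the per-worker batches are kept identical in this notebook.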

We use [Horovod](https://github.com/horovod/horovod) to schedule data parallel model training. NVIDIA Merlin integrates Horovod so that users can scale their pipeline with minimal code changes.

The example is based on [03-Exploring-different-models.ipynb](../03-Exploring-different-models.ipynb). We will focus on data parallel training and will not repeat the functionality already covered in that notebook.

**Learning objectives**
- Scaling a training pipeline to multiple GPUs

## Importing Libraries

First, import some libraries and set some hyperparameters.

In [2]:
import os
import nvtabular as nvt
from nvtabular.ops import *

from merlin.models.utils.example_utils import workflow_fit_transform
from merlin.schema.tags import Tags

from merlin.datasets.synthetic import generate_data

DATA_FOLDER = os.environ.get("DATA_FOLDER", "/raid/data/")
NUM_ROWS = int(os.environ.get("NUM_ROWS", 1000000))
SYNTHETIC_DATA = eval(os.environ.get("SYNTHETIC_DATA", "True"))
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 16384))
NUM_GPUs = int(os.environ.get("NUM_GPUs", 2))

## Generating the Dataset

We will use the same dataset and NVTabular preprocessing as in [03-Exploring-different-models.ipynb](../03-Exploring-different-models.ipynb). Please review that example if you have any questions.

**There is one difference**:
We set `out_files_per_proc` to `NUM_GPUs` in the `to_parquet` function. We want to have one output parquet file for each available GPU.

In [3]:
if SYNTHETIC_DATA:
    train, valid = generate_data("aliccp-raw", int(NUM_ROWS), set_sizes=(0.7, 0.3))
    # save the datasets as parquet files
    train.to_ddf().to_parquet(os.path.join(DATA_FOLDER, "train"))
    valid.to_ddf().to_parquet(os.path.join(DATA_FOLDER, "valid"))

train_path = os.path.join(DATA_FOLDER, "train", "*.parquet")
valid_path = os.path.join(DATA_FOLDER, "valid", "*.parquet")
output_path = os.path.join(DATA_FOLDER, "processed")

category_temp_directory = os.path.join(DATA_FOLDER, "categories")
user_id = ["user_id"] >> Categorify(out_path=category_temp_directory) >> TagAsUserID()
item_id = ["item_id"] >> Categorify(out_path=category_temp_directory) >> TagAsItemID()
targets = ["click"] >> AddMetadata(tags=[Tags.BINARY_CLASSIFICATION, "target"])

item_features = ["item_category", "item_shop", "item_brand"] >> Categorify(out_path=category_temp_directory) >> TagAsItemFeatures()

user_features = (
    [
        "user_shops",
        "user_profile",
        "user_group",
        "user_gender",
        "user_age",
        "user_consumption_2",
        "user_is_occupied",
        "user_geography",
        "user_intentions",
        "user_brands",
        "user_categories",
    ]
    >> Categorify(out_path=category_temp_directory)
    >> TagAsUserFeatures()
)

outputs = user_id + item_id + item_features + user_features + targets

workflow = nvt.Workflow(outputs)

train_dataset = nvt.Dataset(train_path)
valid_dataset = nvt.Dataset(valid_path)

workflow.fit(train_dataset)
workflow.transform(train_dataset).to_parquet(
    output_path=output_path + "/train/",
    out_files_per_proc = NUM_GPUs
)
workflow.transform(valid_dataset).to_parquet(
    output_path=output_path + "/valid/",
    out_files_per_proc = NUM_GPUs
)



## Data Parallel Training with Merlin Models and Horovod

First, we can take a look at the training dataset. We can see that we have the same number of parquet files as we have GPUs available (set by `NUM_GPUs`).

In [4]:
!ls $output_path/train/

_file_list.txt	_metadata.json	part_1.parquet
_metadata	part_0.parquet	schema.pbtxt
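The same check can be done programmatically. The following is a minimal sketch using only the Python standard library; the helper names `list_part_files` and `has_one_file_per_gpu` are hypothetical and not part of NVTabular or Merlin. It assumes the output files follow the `part_<ID>.parquet` naming shown in the listing above.

```python
import glob
import os


def list_part_files(directory):
    """Return the sorted `part_*.parquet` files found in `directory`."""
    return sorted(glob.glob(os.path.join(directory, "part_*.parquet")))


def has_one_file_per_gpu(directory, num_gpus):
    """Return True if `directory` holds exactly one part file per GPU worker."""
    return len(list_part_files(directory)) == num_gpus
```

In this notebook, it could be called as `has_one_file_per_gpu(os.path.join(output_path, "train"), NUM_GPUs)` before launching the training run.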


We use [Horovod](https://github.com/horovod/horovod) to schedule and distribute our training pipeline. Horovod requires several code changes, many of which Merlin Models handles automatically (for reference, see the [Horovod Keras Example](https://horovod.readthedocs.io/en/stable/keras.html)).

Let's take a look at the code changes required compared to our previous examples:
- We need to write our training pipeline as a separate `.py` file, because we start the training run with `horovodrun`.
- We need to set `os.environ["CUDA_VISIBLE_DEVICES"] = str(MPI_RANK)`. `MPI_RANK` is the ID of the current worker (starting at 0). This line ensures that each worker can access only one GPU: the first worker (ID=0) can access GPU ID=0, and so on.
- We need to select only one parquet file per worker (`os.path.join(args.path, "train", "part_" + str(MPI_RANK) + ".parquet")`). Similar to assigning one GPU, we select only one `part_<ID>.parquet` file for each worker. That way, the workers together make exactly one pass through the full dataset per epoch.
- We need to set `drop_last=True`, because the last batch may be incomplete and can cause errors.

**Important: The individual parquet files must yield the same number of batches. If one worker has more batches than another, the training process will freeze: the worker with more batches waits for gradients from the other worker, which has already finished its training run.**

We will print the number of batches with `print("Number batches: " + str(len(train_loader)))`.
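To make the batch-count requirement concrete, here is a small sketch that compares the number of full batches per worker before training starts. It assumes that with `drop_last=True` each worker sees `rows // batch_size` batches; the helper names and the row counts are hypothetical and only serve as an illustration.

```python
def batches_per_worker(rows_per_file, batch_size):
    """Number of full batches per worker, assuming the last partial batch is dropped."""
    return [rows // batch_size for rows in rows_per_file]


def check_equal_batches(rows_per_file, batch_size):
    """Return the common batch count, or raise if the workers would disagree."""
    counts = batches_per_worker(rows_per_file, batch_size)
    if len(set(counts)) > 1:
        raise ValueError(f"Workers have different batch counts: {counts}")
    return counts[0]


# Hypothetical row counts for two part files.
print(check_equal_batches([350000, 350000], batch_size=16384))

# A mismatch like this one would cause the training run to hang.
try:
    check_equal_batches([350000, 330000], batch_size=16384)
except ValueError as err:
    print(err)
```

Running a check like this on the row counts of the `part_<ID>.parquet` files is cheaper than discovering the mismatch when the training job stops making progress.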

Next, let's look at the code changes that Merlin Models applies automatically. You do not need to take care of them yourself:
- Horovod needs to be initialized (`hvd.init()`).
- After the first batch, the initial variables need to be broadcast from one worker to all others. Because the model is initialized randomly, every worker would otherwise start with different parameters. Broadcasting ensures that every worker starts from the same state.
- The learning rate needs to be scaled by the number of workers.
- The evaluation metrics need to be averaged across workers (note: AUC metrics could be slightly different, as averaging per-worker AUC values is not the same as calculating AUC on the full dataset).
- The optimizer needs to be wrapped in Horovod's distributed optimizer.
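The note on AUC can be illustrated with a tiny pure-Python sketch. The `auc` function below is a hypothetical pairwise implementation written for this illustration (it is not the Keras metric), and the labels and scores are made up. The toy data is deliberately extreme; on realistic validation sets the difference is typically much smaller.

```python
def auc(labels, scores):
    """Pairwise AUC: share of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for label, s in zip(labels, scores) if label == 1]
    negatives = [s for label, s in zip(labels, scores) if label == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Hypothetical validation shards for two workers: (labels, scores).
worker_0 = ([1, 0], [0.9, 0.8])
worker_1 = ([1, 0], [0.3, 0.2])

# Averaging the per-worker metrics, as done in data parallel evaluation.
averaged_auc = (auc(*worker_0) + auc(*worker_1)) / 2

# AUC computed once over the combined validation data.
full_auc = auc(worker_0[0] + worker_1[0], worker_0[1] + worker_1[1])

print(averaged_auc, full_auc)
```

Each worker ranks its own positive above its own negative, so both per-worker values are perfect, while the combined data contains a cross-worker pair that is ranked incorrectly.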

Let's write the training pipeline to a `.py` file.

In [5]:
%%writefile './tf_trainer.py'

import os

MPI_SIZE = int(os.getenv("OMPI_COMM_WORLD_SIZE"))
MPI_RANK = int(os.getenv("OMPI_COMM_WORLD_RANK"))

os.environ["CUDA_VISIBLE_DEVICES"] = str(MPI_RANK)

import nvtabular as nvt
from nvtabular.ops import *

from merlin.models.utils.example_utils import workflow_fit_transform
from merlin.schema.tags import Tags

import merlin.models.tf as mm
from merlin.io.dataset import Dataset
import tensorflow as tf

import argparse

parser = argparse.ArgumentParser(
    description='Hyperparameters for model training'
)
parser.add_argument(
    '--batch-size', 
    type=int,
    help='Batch-Size per GPU worker'
)
parser.add_argument(
    '--path', 
    type=str,
    help='Directory with training and validation data'
)
args = parser.parse_args()

# define train and valid dataset objects
train = Dataset(os.path.join(args.path, "train", "part_" + str(MPI_RANK) + ".parquet"))
valid = Dataset(os.path.join(args.path, "valid", "part_" + str(MPI_RANK) + ".parquet"))

# define schema object
target_column = train.schema.select_by_tag(Tags.TARGET).column_names[0]

train_loader = mm.Loader(
    train,
    schema=train.schema,
    batch_size=int(args.batch_size),
    shuffle=True,
    drop_last=True,
)

valid_loader = mm.Loader(
    valid,
    schema=valid.schema,
    batch_size=int(args.batch_size),
    shuffle=False,
    drop_last=True,
)

print("Number batches: " + str(len(train_loader)))

model = mm.DLRMModel(
    train.schema,
    embedding_dim=16,
    bottom_block=mm.MLPBlock([32, 16]),
    top_block=mm.MLPBlock([32, 16]),
    prediction_tasks=mm.BinaryClassificationTask(target_column),
)

opt = tf.keras.optimizers.Adagrad(learning_rate=0.01)
model.compile(optimizer=opt, run_eagerly=False, metrics=[tf.keras.metrics.AUC()])
losses = model.fit(
    train_loader
)

print(model.evaluate(valid, batch_size=int(args.batch_size), return_dict=True))

Overwriting ./tf_trainer.py


We can start the training run with

```bash
horovodrun -np <number of GPUs> python <script> [--args]
```
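If you prefer to launch the run from Python rather than from a notebook shell cell, the command can be assembled as an argument list. This is a minimal sketch: `build_horovodrun_command` is a hypothetical helper, and the example values mirror the defaults used in this notebook. The `--batch-size` and `--path` arguments are the ones defined in `tf_trainer.py`.

```python
import shlex


def build_horovodrun_command(num_gpus, script, batch_size, path):
    """Assemble the horovodrun invocation as a list of arguments."""
    return [
        "horovodrun", "-np", str(num_gpus),
        "python", script,
        "--batch-size", str(batch_size),
        "--path", path,
    ]


cmd = build_horovodrun_command(2, "tf_trainer.py", 16384, "/raid/data/processed")
print(shlex.join(cmd))

# To actually start the run, the list could be passed to subprocess, e.g.:
# subprocess.run(cmd, check=True)
```

Passing a list instead of a single string avoids shell quoting issues if the data path contains spaces.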

In [7]:
!horovodrun -np $NUM_GPUs python tf_trainer.py --batch-size $BATCH_SIZE --path $output_path

[1,1]<stderr>:2022-11-03 13:24:49.132058: I tensorflow/core/platform/cpu_feature_guard.cc:194] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE3 SSE4.1 SSE4.2 AVX
[1,1]<stderr>:To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
[1,0]<stderr>:2022-11-03 13:24:49.193852: I tensorflow/core/platform/cpu_feature_guard.cc:194] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE3 SSE4.1 SSE4.2 AVX
[1,0]<stderr>:To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
[1,1]<stderr>:2022-11-03 13:24:59.091618: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 16255 MB memory:  -> device: 0, name: Tesla V100-SXM2-32GB-LS, pci b

## Summary

In this example notebook, we learned how to scale a Merlin Models training pipeline to multiple GPUs with Horovod to reduce training time. In theory, you should be able to use Horovod for multi-node training as well.

Check out our [example](https://github.com/NVIDIA-Merlin/NVTabular/blob/main/examples/03-Running-on-multiple-GPUs-or-on-CPU.ipynb) to learn how to scale NVTabular workflows to multi-GPU feature engineering.