In [1]:
# Copyright 2022 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

<img src="http://developer.download.nvidia.com/notebooks/dlsw-notebooks/merlindataloader02-multi-gpu-tensorflow-with-horovod/nvidia_logo.png" style="width: 90px; float: right;">

# Multi-GPU training with TensorFlow and Horovod

This notebook is created using the latest stable [merlin-tensorflow](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-tensorflow) container.

## Overview

In this notebook, we look at multi-GPU training with TensorFlow and Horovod. [Horovod](https://horovod.ai/) is a distributed deep learning framework that aims to make distributed deep learning fast and easy to use.

In this example, we provide a simple pipeline to train a matrix factorization model in TensorFlow on multiple GPUs. We use two GPUs here, but the method extends easily to more.

### Learning objectives

- Training on multiple GPUs with the Merlin dataloader and Horovod.

# Downloading and preparing the dataset

We will base our example on the [MovieLens25M](https://grouplens.org/datasets/movielens/25m/) dataset.

In [2]:
import os

from merlin.core.utils import download_file
from merlin.core.dispatch import get_lib

In [3]:
DATA_PATH = os.environ.get("DATA_PATH", os.path.expanduser("~/workspace"))
download_file("http://files.grouplens.org/datasets/movielens/ml-25m.zip", DATA_PATH + "/ml-25m.zip")

downloading ml-25m.zip: 262MB [00:10, 24.4MB/s]                                                                                                                         
unzipping files: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 8/8 [00:04<00:00,  1.60files/s]


# Training a TensorFlow Keras Model with Merlin dataloader and Horovod

This example can be scaled to run on as many GPUs as you would like.

In this example, we will implement data-parallel training. Each GPU holds an exact copy of our model, but trains on a different subset of the data.
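The idea behind data-parallel training can be sketched without any GPUs. The toy example below is an illustration only, not the notebook's training code: it fits a single weight with two simulated workers, and the helper names `local_gradient` and `allreduce_mean` are hypothetical stand-ins for the per-worker gradient computation and the cross-worker averaging that Horovod performs.

```python
# Toy data-parallel SGD: fit y = w * x with two simulated workers.
# Pure Python; no GPUs, TensorFlow, or Horovod involved.


def local_gradient(w, shard):
    """Mean-squared-error gradient with respect to w on one worker's shard."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)


def allreduce_mean(values):
    """Stand-in for averaging gradients across workers."""
    return sum(values) / len(values)


# Synthetic data generated from y = 3 * x.
data = [(float(x), 3.0 * x) for x in range(1, 9)]

num_workers = 2
# Each worker sees a different, disjoint subset of the data.
shards = [data[rank::num_workers] for rank in range(num_workers)]

# Every worker starts from an identical copy of the model (a single weight).
weights = [0.0] * num_workers
learning_rate = 0.01

for step in range(200):
    grads = [local_gradient(weights[r], shards[r]) for r in range(num_workers)]
    avg_grad = allreduce_mean(grads)
    # All replicas apply the same averaged gradient, so they stay in sync.
    weights = [w - learning_rate * avg_grad for w in weights]

print(weights)
```

Because every replica applies the same averaged gradient, the copies of the model remain identical after each step even though each worker only ever reads its own shard.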

Let us save the ratings as a parquet file. The training script repartitions the dataset into one partition per GPU.

In [4]:
GPU_COUNT = 2  # specify how many GPUs you would like to train on

ratings = get_lib().read_csv(DATA_PATH + "/ml-25m/ratings.csv")

ratings.to_parquet(os.path.join(DATA_PATH, "train.parquet"))

Let us now take a closer look at what else we will need to train with Horovod.

### Write the training script to a file

We need a `.py` file that `horovodrun` can launch in each process.

### Set `CUDA visible devices` correctly inside each process

We need to set the visible device in each process to its `rank`. This way, the process with `rank 0` will use the zeroth GPU, the process with `rank 1` will use the first GPU, and so on. This ensures that each worker can access only a single GPU.
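The pinning step can be sketched as a small helper. The function name `pin_gpu_to_rank`, its `env` parameter, and the fail-fast check are illustrative assumptions and not part of the training script; only the environment variable names are taken from it.

```python
import os


def pin_gpu_to_rank(env=None, rank_var="OMPI_COMM_WORLD_RANK"):
    """Restrict this process to the GPU whose index equals its rank.

    This has to run before importing any library that initializes CUDA.
    """
    env = os.environ if env is None else env
    rank = env.get(rank_var)
    if rank is None:
        # Assumption: the rank is provided by Open MPI through this variable.
        raise RuntimeError(f"{rank_var} is not set")
    env["CUDA_VISIBLE_DEVICES"] = str(int(rank))
    return int(rank)


# Demonstration with a plain dict instead of the real process environment.
fake_env = {"OMPI_COMM_WORLD_RANK": "1"}
rank = pin_gpu_to_rank(fake_env)
print(rank, fake_env["CUDA_VISIBLE_DEVICES"])
```

Passing a dictionary makes the helper easy to try out without touching the real environment; inside a worker process it would be called with no arguments.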

In [5]:
%%writefile "./tf_trainer.py"

import os

# the order of statements and imports is important
# for instance, we need to make sure we set
# CUDA_VISIBLE_DEVICES before we import Loader and cudf

MPI_SIZE = int(os.getenv("OMPI_COMM_WORLD_SIZE"))
MPI_RANK = int(os.getenv("OMPI_COMM_WORLD_RANK"))

os.environ["CUDA_VISIBLE_DEVICES"] = str(MPI_RANK)


from merlin.io import Dataset

import tensorflow as tf
import horovod.tensorflow as hvd

from merlin.core.dispatch import get_lib

os.environ["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"

hvd.init()

from merlin.loader.tensorflow import Loader


DATA_PATH = os.getenv("DATA_PATH", os.path.expanduser("~/workspace"))

dataset = Dataset(os.path.join(DATA_PATH, "train.parquet"))
dataset = dataset.repartition(MPI_SIZE)

loader = Loader(
    dataset,
    batch_size=64 * 1024,
    global_size=MPI_SIZE,
    global_rank=MPI_RANK,
    device=MPI_RANK,
)

label_column = 'rating'


def process_batch(data, _):
    x = {col: data[col] for col in data.keys() if col != label_column}
    y = data[label_column]
    return (x, y)


loader._map_fns = [process_batch]


class MatrixFactorization(tf.keras.Model):
    def __init__(self, n_factors):
        super().__init__()
        self.user_embeddings = tf.keras.layers.Embedding(162542, n_factors)
        self.movie_embeddings = tf.keras.layers.Embedding(209172, n_factors)

    def call(self, batch, training=False):
        user_embs = self.user_embeddings(batch['userId'])
        movie_embs = self.movie_embeddings(batch['movieId'])

        tensor = (tf.squeeze(user_embs) * tf.squeeze(movie_embs))
        return tf.reduce_sum(tensor, 1)


model = MatrixFactorization(64)
loss = tf.keras.losses.MeanSquaredError()
opt = tf.optimizers.Adam(1e-2 * hvd.size())

checkpoint_prefix = "./checkpoints"
checkpoint = tf.train.Checkpoint(model=model, optimizer=opt)


@tf.function
def training_step(features, labels, first_batch):
    with tf.GradientTape() as tape:
        probs = model(features, training=True)
        loss_value = loss(labels, probs)

    # Horovod: add Horovod Distributed GradientTape.
    tape = hvd.DistributedGradientTape(tape)

    grads = tape.gradient(loss_value, model.trainable_variables)
    opt.apply_gradients(zip(grads, model.trainable_variables))

    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    #
    # Note: broadcast should be done after the first gradient step to ensure optimizer
    # initialization.
    if first_batch:
        hvd.broadcast_variables(model.variables, root_rank=0)
        hvd.broadcast_variables(opt.variables(), root_rank=0)

    return loss_value


# Each worker iterates over its own share of the data, as selected by global_size and global_rank.
for batch, (features, labels) in enumerate(loader):
    loss_value = training_step(features, labels, batch == 0)

    if batch % 10 == 0 and hvd.rank() == 0:
        print('Step #%d\tLoss: %.6f' % (batch, loss_value))

hvd.join()

# Horovod: save checkpoints only on worker 0 to prevent other workers from
# corrupting it.
if hvd.rank() == 0:
    checkpoint.save(checkpoint_prefix)

Overwriting ./tf_trainer.py
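In the script, `dataset.repartition(MPI_SIZE)` creates one partition per worker, and the `global_size` and `global_rank` arguments tell each `Loader` which share of the data it is responsible for. The sketch below illustrates one way such a split could be computed. It is an assumption made for illustration, not Merlin's actual implementation, and `partitions_for_rank` is a hypothetical helper.

```python
def partitions_for_rank(num_partitions, global_size, global_rank):
    """Return the partition indices one worker reads under a contiguous split."""
    if global_size < 1:
        raise ValueError("global_size must be at least 1")
    if not 0 <= global_rank < global_size:
        raise ValueError("global_rank must be in [0, global_size)")
    # Spread any remainder over the lowest ranks so shard sizes differ by at most one.
    base, extra = divmod(num_partitions, global_size)
    start = global_rank * base + min(global_rank, extra)
    stop = start + base + (1 if global_rank < extra else 0)
    return list(range(start, stop))


# Two partitions shared by two workers, as in this notebook.
for rank in range(2):
    print(rank, partitions_for_rank(2, 2, rank))
```

Whatever the exact assignment, the property that matters for data-parallel training is that the shards are disjoint and together cover the whole dataset.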


We can now run our distributed training using `horovodrun`!

All we need to do is provide the number of GPUs we would like to run on and the script to execute.
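Outside a notebook, the same launch could be assembled from Python. This is a minimal sketch: `horovodrun_command` is a hypothetical helper, and it only builds the argument list from the process count and script name without starting anything.

```python
import shlex


def horovodrun_command(num_procs, script="tf_trainer.py"):
    """Build the launch command as an argument list suitable for subprocess.run."""
    if num_procs < 1:
        raise ValueError("num_procs must be at least 1")
    return ["horovodrun", "-np", str(num_procs), "python", script]


cmd = horovodrun_command(2)
print(shlex.join(cmd))
# To actually launch the training: subprocess.run(cmd, check=True)
```

Keeping the command as a list avoids shell quoting issues if the script path contains spaces.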

In [6]:
!horovodrun -np {GPU_COUNT} python tf_trainer.py

2023-06-03 21:35:18.892140: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-06-03 21:35:18.932879: I tensorflow/core/platform/cpu_feature_guard.cc:183] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE3 SSE4.1 SSE4.2 AVX, in other operations, rebuild TensorFlow with the appropriate compiler flags.
[1,1]<stderr>:2023-06-03 21:35:23.549563: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
[1,0]<stderr>:2023-06-03 21:35:23.568539: I tensorflow/core/util/port.cc

## Conclusion

We demonstrated how to train a TensorFlow Keras model with the Merlin dataloader on multiple GPUs using Horovod.

# Next Steps

Merlin dataloader is part of NVIDIA Merlin, an open-source framework for recommender systems. In this example, we looked only at one specific use case: accelerating existing training pipelines. We provide more libraries to make recommender system pipelines easier and faster to work with:

* [NVTabular](https://github.com/NVIDIA-Merlin/NVTabular) is a library to accelerate and scale feature engineering
* [Merlin Models](https://github.com/NVIDIA-Merlin/models) is a library with high-quality implementations of popular recommender systems architectures

The libraries are designed to work closely together. We recommend checking out our examples:

* [Getting Started with NVTabular: Process Tabular Data On GPU](https://github.com/NVIDIA-Merlin/NVTabular/blob/main/examples/01-Getting-started.ipynb)
* [Getting Started with Merlin Models: Develop a Model for MovieLens](https://github.com/NVIDIA-Merlin/models/blob/main/examples/01-Getting-started.ipynb)

The example [From ETL to Training RecSys models - NVTabular and Merlin Models integrated example](https://github.com/NVIDIA-Merlin/models/blob/main/examples/02-Merlin-Models-and-NVTabular-integration.ipynb) explains how the libraries work together.