In [1]:
# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

# Sparse Operation Kit with NVTabular Data Reader

## Overview

Sparse Operation Kit (hereafter SOK) is a toolkit that wraps sparse operation algorithms commonly used in recommendation scenarios into a user-friendly library. Users who want to speed up their applications with these GPU-accelerated algorithms can get started quickly with the Python toolkit.

[NVTabular](https://github.com/NVIDIA/NVTabular) is a feature engineering and preprocessing library for tabular data. It is designed to easily manipulate terabyte-scale datasets and to train deep learning (DL) based recommender systems. The dataloader has been identified as a major bottleneck when training deep learning recommender systems with TensorFlow: it cannot prepare the next batch fast enough, so the GPU is not fully utilized.

The NVTabular dataloader's features are:

- removing the bottleneck of item-by-item data loading
- enabling larger-than-memory datasets by streaming from disk
- reading data directly into GPU memory to remove CPU-GPU communication
- preparing batches asynchronously on the GPU to avoid CPU-GPU communication
- supporting the commonly used .parquet format
- easy integration into existing TensorFlow pipelines through a similar API that works with tf.keras models

More information about NVTabular can be found in this [blog post](https://medium.com/nvidia-merlin/training-deep-learning-based-recommender-systems-9x-faster-with-tensorflow-cc5a2572ea49).

This notebook demonstrates how to use Sparse Operation Kit (the SOK demo can be found [here](./sparse_operation_kit_demo.ipynb)) with the NVTabular data reader. Used in place of the TensorFlow Dataset APIs, NVIDIA's NVTabular data loading library alleviates existing bottlenecks and brings the full power of GPU acceleration to bear.

### Model structure

This demo model consists of a dense embedding layer and 7 fully connected layers: the first 6 fully connected layers have 1024 output units, and the last one has 1 output unit.

![model structure](https://raw.githubusercontent.com/NVIDIA/HugeCTR/master/sparse_operation_kit/documents/source/images/demo_model_structure.png)
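To make the shapes in this structure concrete, here is a minimal NumPy sketch of a forward pass through a model of this shape. It is not the `SOKDenseDemo` implementation: a plain table lookup stands in for SOK's distributed dense embedding, and flattening all embedding vectors of a sample into one input vector and using ReLU in the hidden layers are assumptions made for illustration. The function name `dense_demo_forward` is hypothetical.

```python
import numpy as np

def dense_demo_forward(keys, embedding_table, dense_weights):
    """Hypothetical forward pass of a DenseDemo-like model (illustration only, not SOK code).

    keys:            int array of shape [batch, slot_num, nnz_per_slot]
    embedding_table: float array of shape [vocabulary_size, embedding_vec_size]
    dense_weights:   list of (W, b) pairs; the last layer has 1 output unit
    """
    # Dense embedding lookup: one vector per key -> [batch, slot_num, nnz, vec_size]
    vectors = embedding_table[keys]
    # Assumption: flatten all embedding vectors of a sample into one input vector
    hidden = vectors.reshape(keys.shape[0], -1)
    # Hidden fully connected layers (ReLU is an assumed activation)
    for W, b in dense_weights[:-1]:
        hidden = np.maximum(hidden @ W + b, 0.0)
    # Final fully connected layer produces one logit per sample
    W, b = dense_weights[-1]
    return hidden @ W + b

rng = np.random.default_rng(0)
batch, slot_num, nnz, vec_size, vocab = 4, 3, 2, 4, 50
keys = rng.integers(0, vocab, size=(batch, slot_num, nnz))
table = rng.normal(size=(vocab, vec_size))

# 6 hidden layers with 1024 units plus a 1-unit output layer
sizes = [slot_num * nnz * vec_size] + [1024] * 6 + [1]
weights = [(rng.normal(scale=0.01, size=(m, n)), np.zeros(n))
           for m, n in zip(sizes[:-1], sizes[1:])]

logits = dense_demo_forward(keys, table, weights)
print(logits.shape)  # (4, 1)
```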

In our experiments, we observed a 3x speed-up in data loading for the same training workflow of this SOK demo when using the NVTabular dataloader.

## Table of Contents

- [Overview](#Overview)
- [Installation](#Installation)
- [Prepare training data](#Prepare-training-data)
    - [Generate synthetic dataset](#Generate-synthetic-dataset)
- [Convert data](#Convert-data)
- [Training SOK model with NVTabular data reader](#Trainig-SOK-model-with-NVTabular-data-reader)

## Installation

### Requirements

- TensorFlow == 2.x
- NVTabular == 0.6.0

### Get SOK from NGC

The SparseOperationKit is preinstalled in the [Merlin TensorFlow Training Container](https://ngc.nvidia.com/catalog/containers/nvidia:merlin:merlin-tensorflow-training). You can simply start the notebook inside the running NGC Docker container:
`nvcr.io/nvidia/merlin/merlin-training:21.12`

You can check that the required library is available by running the following command after launching this container:

```shell
$ python3 -c "import sparse_operation_kit as sok"
```

### Build SOK from Source Code

New features are sometimes not merged into the NGC container in time, in which case you can build SOK from source code. If you want to build SparseOperationKit from source code instead of using the NGC container, please refer to [Setup development environment](../../docs/hugectr_contributor_guide.md#build-sparse-operation-kit-sok-from-source-code).

## Prepare training data

We generate a synthetic dataset, which is practical for this SOK demo and makes it easier to understand. Each sample contains one dense feature and a related label. We can set the number of feature fields (slots) used by the embedding layer and the number of keys in each slot.
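The generation itself is done by `utility.generate_random_samples` from the SOK test scripts. As a rough picture of the data layout only, the hypothetical helper below (`make_synthetic_samples`, not part of SOK) produces arrays of the same shapes: keys of shape `[num_samples, slot_num, nnz_per_slot]` and binary labels of shape `[num_samples, 1]`. Drawing keys uniformly from `[0, vocabulary_size)` and labels uniformly from `{0, 1}` is an assumption; the real helper may distribute keys differently across slots.

```python
import numpy as np

def make_synthetic_samples(num_samples, vocabulary_size, slot_num, nnz_per_slot, seed=0):
    """Hypothetical stand-in for utility.generate_random_samples (not the SOK helper).

    Returns keys of shape [num_samples, slot_num, nnz_per_slot] and binary
    labels of shape [num_samples, 1].
    """
    rng = np.random.default_rng(seed)
    # Assumption: every key is drawn uniformly from [0, vocabulary_size)
    keys = rng.integers(0, vocabulary_size, size=(num_samples, slot_num, nnz_per_slot),
                        dtype=np.int64)
    # Assumption: labels are independent random 0/1 values
    labels = rng.integers(0, 2, size=(num_samples, 1), dtype=np.int64)
    return keys, labels

keys, labels = make_synthetic_samples(num_samples=16, vocabulary_size=1024 * 8,
                                      slot_num=100, nnz_per_slot=10)
print(keys.shape, labels.shape)  # (16, 100, 10) (16, 1)
```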

### Generate synthetic dataset

First, specify the hyperparameters for training.

In [2]:
%reset -f
# Resets the namespace by removing all names defined by the user without asking for confirmation

args = dict()

args["gpu_num"] = 8                               # the number of available GPUs
args["global_batch_size"] = 8192                  # the global batch size across all GPUs
args["slot_num"] = 100                            # the number of feature fields in this embedding layer
args["nnz_per_slot"] = 10                         # the number of keys in each slot
args["vocabulary_size"] = 1024 * 8                # the total vocabulary size
args["iter_num"] = 30                             # the number of training iterations
args["demo_dirpath"] = "../documents/tutorials/DenseDemo/"   # the path to the SOK demo
args["data_dirpath"] = args["demo_dirpath"] + "data/"       # the path used to save the generated data

Then we generate the synthetic dataset and save it to a file.

In [3]:
# Standard Libraries
import sys, os
from pathlib import Path

# External Dependencies
import numpy as np
import pandas as pd

# SOK python scripts
sys.path.append("../")
sys.path.append("../unit_test/test_scripts/")
sys.path.append("../documents/tutorials/")
import utility

[INFO]: sparse_operation_kit is imported


2021-09-13 07:08:53.058873: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-09-13 07:08:54.287132: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0


In [4]:
# Create directory for generated data
Path(args["data_dirpath"]).mkdir(parents=True, exist_ok=True)
data_filename = args["data_dirpath"] + "data.file"

counts = args["iter_num"] // 10
total_samples, total_labels = None, None

for _ in range(counts):
    random_samples, random_labels = utility.generate_random_samples(
                                        num_of_samples=args["global_batch_size"] * 10,
                                        vocabulary_size=args["vocabulary_size"],
                                        slot_num=args["slot_num"],
                                        max_nnz=args["nnz_per_slot"],
                                        use_sparse_mask=False)
    if total_samples is None:
        total_samples = random_samples
        total_labels = random_labels
    else:
        total_samples = np.concatenate([total_samples, random_samples], axis=0)
        total_labels = np.concatenate([total_labels, random_labels], axis=0)

print("[INFO]: total_samples.shape={}, total_labels.shape={}".format(total_samples.shape, total_labels.shape))
utility.save_to_file(data_filename, total_samples, total_labels)

[INFO]: begin to generate random samples
[INFO]: generated random samples
[INFO]: begin to generate random samples
[INFO]: generated random samples
[INFO]: begin to generate random samples
[INFO]: generated random samples
[INFO]: total_samples.shape=(245760, 100, 10), total_labels.shape=(245760, 1)
[INFO]: dumpped items to file ../documents/tutorials/DenseDemo/data/data.file
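The sizes in the output above follow directly from the hyperparameters in `args`. Working through the arithmetic also explains why each part printed later ends at row index 30719, and why `iter_num = 30` corresponds to exactly one pass over the data:

```python
# Sizes implied by the hyperparameters in args
gpu_num, global_batch_size, iter_num = 8, 8192, 30

counts = iter_num // 10                        # generation rounds: 3
total = counts * global_batch_size * 10        # samples generated: 245760
per_part = total // gpu_num                    # rows per part after splitting: 30720
per_gpu_batch = global_batch_size // gpu_num   # local batch size per rank: 1024
steps = total // global_batch_size             # global batches in the dataset: 30

print(total, per_part, per_gpu_batch, steps)   # 245760 30720 1024 30
```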


## Convert data

### Convert synthetic data to parquet file

The NVTabular data reader reads Parquet files, so we first convert the data to a Parquet file. We also split the data into several parts, one for each worker.

In [5]:
total_labels = total_labels.reshape(-1) 
num_parts = args["gpu_num"]
samples = np.split(total_samples, num_parts, axis=0)
labels = np.split(total_labels, num_parts, axis=0)
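Note that `np.split` requires the number of rows to divide evenly into `num_parts`. Here 245760 rows split into 8 parts of 30720, but with other values of `gpu_num` or `global_batch_size` the call can raise `ValueError`. The small example below shows that behavior and `np.array_split`, which tolerates uneven sizes:

```python
import numpy as np

data = np.arange(10).reshape(10, 1)

# np.split needs the row count to divide evenly into the requested parts
parts = np.split(data, 5, axis=0)
print([p.shape[0] for p in parts])  # [2, 2, 2, 2, 2]

try:
    np.split(data, 4, axis=0)  # 10 rows cannot form 4 equal parts
except ValueError as err:
    print("ValueError:", err)

# np.array_split allows uneven parts; the first parts get one extra row
uneven = np.array_split(data, 4, axis=0)
print([p.shape[0] for p in uneven])  # [3, 3, 2, 2]
```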

We create one pandas DataFrame per part and then concatenate them, which avoids the limitations of converting the whole array into a single Python list at once.

In [6]:
sample_name = "feature"
label_name = "label"
dfs = list()
for i in range(num_parts):
    dfs.append(pd.DataFrame({
        sample_name : samples[i].tolist(),
        label_name : labels[i].tolist(),
    }))
    print(dfs[-1].tail(5))

                                                 feature  label
30715  [[77, 74, 68, 59, 55, 40, 29, 21, 12, 2], [159...      0
30716  [[73, 72, 71, 46, 45, 44, 19, 19, 18, 17], [15...      0
30717  [[69, 65, 53, 49, 37, 33, 29, 17, 12, 1], [154...      1
30718  [[78, 78, 69, 69, 23, 23, 14, 14, 6, 5], [158,...      1
30719  [[79, 72, 62, 62, 53, 35, 26, 17, 7, 0], [161,...      1
                                                 feature  label
30715  [[80, 80, 80, 58, 58, 42, 33, 21, 20, 13], [15...      1
30716  [[79, 72, 68, 46, 43, 39, 35, 31, 6, 2], [151,...      1
30717  [[66, 64, 63, 40, 38, 37, 35, 13, 11, 9], [154...      0
30718  [[72, 70, 68, 67, 65, 64, 32, 29, 26, 25], [15...      1
30719  [[76, 68, 67, 66, 53, 39, 24, 24, 9, 1], [136,...      0
                                                 feature  label
30715  [[77, 64, 64, 63, 61, 53, 25, 23, 22, 22], [16...      1
30716  [[65, 47, 38, 30, 29, 20, 19, 12, 11, 2], [154...      0
30717  [[80, 77, 62, 48, 47, 46, 31, 28,

We concatenate the DataFrames, reset the index, and pop out the resulting "index" column.

In [7]:
df = pd.concat(dfs)
## Reset index
df = df.reset_index()
df.pop("index")
print(df.tail(5))

                                                  feature  label
245755  [[69, 63, 56, 45, 38, 37, 31, 30, 13, 6], [158...      0
245756  [[74, 72, 59, 56, 50, 48, 26, 24, 2, 0], [159,...      0
245757  [[73, 72, 54, 51, 34, 33, 32, 13, 12, 11], [15...      1
245758  [[76, 63, 53, 38, 29, 28, 19, 15, 6, 5], [159,...      0
245759  [[72, 64, 64, 59, 56, 51, 48, 43, 40, 31], [15...      1
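On a toy example shaped like the real data (a nested-list `feature` column and a `label` column), the concatenate, `reset_index`, `pop("index")` sequence above produces the same frame as `pd.concat(..., ignore_index=True)`, which renumbers the rows in one step. The toy arrays are made up for illustration:

```python
import numpy as np
import pandas as pd

# Two small parts with a nested-list "feature" column and a "label" column
parts = [np.arange(12).reshape(2, 3, 2), np.arange(12, 24).reshape(2, 3, 2)]
part_labels = [np.array([0, 1]), np.array([1, 0])]
dfs = [pd.DataFrame({"feature": p.tolist(), "label": lab.tolist()})
       for p, lab in zip(parts, part_labels)]

# Approach used above: concatenate, move the old index into a column, drop it
df_a = pd.concat(dfs).reset_index()
df_a.pop("index")

# One-step alternative: ignore_index=True renumbers rows 0..N-1 directly
df_b = pd.concat(dfs, ignore_index=True)

same = (df_a.index.equals(df_b.index)
        and list(df_a.columns) == list(df_b.columns)
        and df_a.to_dict("list") == df_b.to_dict("list"))
print(same)  # True

# The nested lists convert back to a [rows, slot_num, nnz_per_slot] array
print(np.array(df_b["feature"].tolist()).shape)  # (4, 3, 2)
```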


We can then simply write it to a Parquet file with the pandas API `to_parquet`.

In [8]:
parquet_filename = args["data_dirpath"] + "data.parquet"
df.to_parquet(parquet_filename)

When MPI is used, we want each CPU process to have its own data reader, with each data reader reading from a different data source. Therefore, the whole dataset is split. We use `nvt.Workflow` to transform the original data into split data.

In [9]:
# Split 
import nvtabular as nvt

workflow = nvt.Workflow(nvt.ColumnGroup(sample_name) + label_name)
workflow.transform(nvt.Dataset(parquet_filename)).to_parquet(
    output_path=args["data_dirpath"] + "convert/",
    shuffle=None,
    out_files_per_proc=num_parts,
)
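In the training script, each rank passes `global_size=MPI_SIZE` and `global_rank=task_id` to `KerasSequenceLoader` so that it reads its own share of the converted files. The exact partitioning is internal to NVTabular; the hypothetical helper below only illustrates the idea with a contiguous split, assuming the conversion produced one file per rank. The file names are placeholders, not necessarily the names NVTabular writes.

```python
def files_for_rank(files, global_size, global_rank):
    """Hypothetical contiguous-block assignment of files to ranks (illustration only)."""
    per_rank = max(len(files) // global_size, 1)
    start = global_rank * per_rank
    return files[start:start + per_rank]

# Placeholder names for the 8 converted Parquet files
files = sorted(f"part_{i}.parquet" for i in range(8))
for rank in range(8):
    print(rank, files_for_rank(files, global_size=8, global_rank=rank))
```

With one file per rank, every rank reads exactly one file and no file is read twice, which is what the split is meant to achieve.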

## Trainig SOK model with NVTabular data reader

We train the SOK model with TensorFlow MultiWorkerMirroredStrategy and MPI.
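For multi-GPU runs, the script that follows builds a `TF_CONFIG` environment variable, which `MultiWorkerMirroredStrategy` reads to discover the cluster: one `localhost` worker per MPI rank, on consecutive ports starting at 12345. The helper name `make_tf_config` is hypothetical; it reproduces the same JSON so you can see what, for example, rank 1 of a 2-rank job receives:

```python
import json

def make_tf_config(worker_num, task_id, port=12345):
    # Same structure as in run_sok_nvt.py: one localhost worker per MPI rank
    return json.dumps({
        "cluster": {"worker": ["localhost" + ":" + str(port + i) for i in range(worker_num)]},
        "task": {"type": "worker", "index": task_id},
    })

print(make_tf_config(worker_num=2, task_id=1))
# {"cluster": {"worker": ["localhost:12345", "localhost:12346"]}, "task": {"type": "worker", "index": 1}}
```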

In [10]:
%%writefile run_sok_nvt.py

import argparse
import sys,os,json,glob
sys.path.append("../")
sys.path.append("../documents/tutorials/")
sys.path.append("../unit_test/test_scripts/")
sys.path.append("../documents/tutorials/DenseDemo/")

MPI_SIZE = int(os.getenv("OMPI_COMM_WORLD_SIZE"))
MPI_RANK = int(os.getenv("OMPI_COMM_WORLD_RANK"))

def main(args, task_id):
    import nvtabular as nvt  # noqa: E402 isort:skip
    from nvtabular.framework_utils.tensorflow import layers  # noqa: E402 isort:skip
    from nvtabular.loader.tensorflow import KerasSequenceLoader  # noqa: E402 isort:skip
    from nvtabular.io.dataset import Dataset

    import tensorflow as tf
    from models import SOKDenseDemo
    import utility
    from utility import sparse_operation_kit as sok
    import nvtx


    comm_options = tf.distribute.experimental.CommunicationOptions(
        bytes_per_pack=0,
        timeout_seconds=None,
        implementation=tf.distribute.experimental.CommunicationImplementation.NCCL
    )

    if args.total_gpu_num == 1:
        strategy = tf.distribute.MirroredStrategy()
    else:
        port = 12345
        os.environ["TF_CONFIG"] = json.dumps({
            "cluster": {"worker": ["localhost" + ":" + str(port + i) 
                                    for i in range(args.worker_num)]},
            "task": {"type": "worker", "index": task_id}
        })
        strategy = tf.distribute.MultiWorkerMirroredStrategy(
            communication_options=comm_options)

    sample_name = "feature"
    label_name = "label"

    nvt_loader = KerasSequenceLoader(
        paths_or_dataset=sorted(glob.glob(args.data_filename)),
        batch_size=args.global_batch_size//MPI_SIZE,
        label_names=[label_name],
        cont_names=[sample_name],
        engine="parquet",
        shuffle=False,
        buffer_size=0.06,  # how many batches to load at once
        sparse_as_dense=True,
        global_size=MPI_SIZE,
        global_rank=task_id,
    )

    with strategy.scope():
        sok.Init(global_batch_size=args.global_batch_size)

        model = SOKDenseDemo(max_vocabulary_size_per_gpu=args.max_vocabulary_size_per_gpu,
                             embedding_vec_size=args.embedding_vec_size,
                             slot_num=args.slot_num,
                             nnz_per_slot=args.nnz_per_slot,
                             num_dense_layers=args.num_dense_layers)

        embedding_optimizer = utility.get_embedding_optimizer(args.optimizer)(learning_rate=0.1)
        dense_optimizer = utility.get_dense_optimizer(args.optimizer)(learning_rate=0.1)

    loss_fn = tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)
    def _replica_loss(labels, logits):
        loss = loss_fn(labels, logits)
        return tf.nn.compute_average_loss(loss, global_batch_size=args.global_batch_size)

    @tf.function
    def _train_step(inputs, labels):
        with tf.GradientTape() as tape:
            logit = model(inputs, training=True)
            loss = _replica_loss(labels, logit)
        emb_variable, other_variable = sok.split_embedding_variable_from_others(model.trainable_variables)
        grads, emb_grads = tape.gradient(loss, [other_variable, emb_variable])
        if "plugin" not in args.optimizer:
            with sok.OptimizerScope(emb_variable):
                embedding_optimizer.apply_gradients(zip(emb_grads, emb_variable),
                                                    experimental_aggregate_gradients=False)
        else:
            embedding_optimizer.apply_gradients(zip(emb_grads, emb_variable),
                                                experimental_aggregate_gradients=False)
        
        # manually all-reduce dense gradients
        replica_context = tf.distribute.get_replica_context()
        grads = replica_context.all_reduce("sum", grads, 
                                            options=comm_options)
        dense_optimizer.apply_gradients(zip(grads, other_variable),
                                        experimental_aggregate_gradients=False)

        # manually all-reduce loss, it is ok, because replica_loss has already been used to 
        # update local variables.
        loss = replica_context.all_reduce(tf.distribute.ReduceOp.SUM, loss,
                                          options=comm_options)
        return loss

    input_tensor_shape = [-1, args.slot_num, args.nnz_per_slot]
    for i, (feats, labels) in enumerate(nvt_loader):
        if args.stop_at_iter > 0 and i >= args.stop_at_iter:
            break

        rng = nvtx.start_range(message="Iteration_" + str(i), color="blue")
        
        input_vals, input_nnzs = feats[sample_name]
        inputs = tf.reshape(input_vals, input_tensor_shape)
        total_loss = strategy.run(_train_step, args=(inputs, labels))

        nvtx.end_range(rng)
        print("[INFO]: Iteration: {}, loss={}".format(i, total_loss))

# def set_affinity(rank):
#     affinity_map = {0: list(range(48,64)) + list(range(176,192)),
#                     1: list(range(48,64)) + list(range(176,192)),
#                     2: list(range(16,32)) + list(range(144,160)),
#                     3: list(range(16,32)) + list(range(144,160)),
#                     4: list(range(112,128)) + list(range(240,256)),
#                     5: list(range(112,128)) + list(range(240,256)),
#                     6: list(range(80,96)) + list(range(208,224)),
#                     7: list(range(80,96)) + list(range(208,224))}

#     my_affinity = affinity_map[rank]
#     import os
#     os.sched_setaffinity(0, my_affinity)

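def set_affinity(rank):
    num_part = MPI_SIZE
    if num_part < 2:
        # the pairing logic below uses `% (num_part // 2)`, which fails for a single process
        return
    part_cpu_cnt = os.cpu_count() // num_part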
    total_affinity = os.sched_getaffinity(0)
    affinity_map = dict()
    for i in range(num_part):
        tmp_map = list(total_affinity)[:part_cpu_cnt] if i % 2 == 0 else list(total_affinity)[part_cpu_cnt:]
        affinity_map.update({i : tmp_map})

    part = num_part // 2
    my_affinity = affinity_map[rank % part] + affinity_map[(rank+part-1) % part + part]

    try:
        os.sched_setaffinity(0, my_affinity)
    except OSError:
        import traceback
        print("[rank-{}] os.sched_setaffinity(0, {}) -> OSError".format(rank, my_affinity))
        traceback.print_exc()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="run DNN model with SparseOperationKit")

    parser.add_argument("--data_filename", type=str,
                        help="the filename pattern of the training data",
                        default="./data/convert/*.parquet")
    parser.add_argument("--global_batch_size", type=int,
                        required=True)
    parser.add_argument("--max_vocabulary_size_per_gpu", type=int,
                        required=True)
    parser.add_argument("--slot_num", type=int, required=True,
                        help="the number of feature fields")
    parser.add_argument("--nnz_per_slot", type=int, required=True,
                        help="the number of keys in each slot")
    parser.add_argument("--num_dense_layers", type=int, required=True,
                        help="the number of fully connected layers in this DNN model")
    parser.add_argument("--embedding_vec_size", type=int, required=True,
                        help="the dimension of embedding vectors")
    parser.add_argument('--optimizer', type=str,
                        help="which optimizer to use",
                        required=False, default='plugin_adam',
                        choices=['plugin_adam', 'adam', 'sgd'])
    parser.add_argument("--stop_at_iter", type=int, required=False,
                        help="stop the process early once the iteration count reaches this value.",
                        default=-1)
    parser.add_argument("--data_splited", type=int, required=False,
                        default=0, choices=[0, 1],
                        help="a flag denoting whether the data is already split. "
                             "By default, it is set to 0, which means the data is not split.")

    args = parser.parse_args()

    size = MPI_SIZE
    args.worker_num = size
    args.total_gpu_num = size

    task_id = MPI_RANK

    set_affinity(task_id)

    os.environ["CUDA_VISIBLE_DEVICES"] = str(task_id)
    main(args, task_id)

Overwriting run_sok_nvt.py
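In the training loop, the script unpacks `feats[sample_name]` into `input_vals, input_nnzs` and reshapes `input_vals` with `input_tensor_shape = [-1, slot_num, nnz_per_slot]`. Assuming `input_vals` holds all keys of the local batch flattened in row-major order (an assumption about the loader's output, not a documented guarantee), this NumPy sketch shows that the reshape recovers the original per-sample layout:

```python
import numpy as np

local_batch, slot_num, nnz_per_slot = 2, 3, 4
# Per-sample keys, shaped like rows of the "feature" column
keys = np.arange(local_batch * slot_num * nnz_per_slot).reshape(local_batch, slot_num, nnz_per_slot)

# Assumed loader output: every key of the local batch flattened in row-major order
input_vals = keys.reshape(-1)

# Equivalent of tf.reshape(input_vals, [-1, slot_num, nnz_per_slot]) in the script
inputs = input_vals.reshape(-1, slot_num, nnz_per_slot)
print(inputs.shape, np.array_equal(inputs, keys))  # (2, 3, 4) True
```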


Add `--oversubscribe` to `mpiexec` if there are not enough slots.

In [11]:
!mpiexec -n 8 --allow-run-as-root \
    python3 run_sok_nvt.py \
    --data_filename="../documents/tutorials/DenseDemo/data/convert/*.parquet" \
    --global_batch_size=8192 \
    --max_vocabulary_size_per_gpu=8192 \
    --slot_num=100 \
    --nnz_per_slot=10 \
    --num_dense_layers=6 \
    --embedding_vec_size=4 \
    --data_splited=1 \
    --optimizer="adam"

2021-09-13 07:10:51.231643: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-09-13 07:10:51.304766: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-09-13 07:10:51.304764: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-09-13 07:10:51.402127: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-09-13 07:10:51.402127: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-09-13 07:10:51.404622: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-09-13 07:10:51.429951: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic libr

[INFO]: Iteration: 2, loss=2216.47412109375
[INFO]: Iteration: 2, loss=2216.47412109375
[INFO]: Iteration: 3, loss=0.7021139860153198
[INFO]: Iteration: 3, loss=0.7021139860153198
[INFO]: Iteration: 3, loss=0.7021139860153198
[INFO]: Iteration: 3, loss=0.7021139860153198
[INFO]: Iteration: 3, loss=0.7021139860153198
[INFO]: Iteration: 3, loss=0.7021139860153198
[INFO]: Iteration: 3, loss=0.7021139860153198
[INFO]: Iteration: 3, loss=0.7021139860153198
...
[INFO]: Iteration: 29, loss=0.6933262944221497
[INFO]: Iteration: 29, loss=0.6933262944221497
[INFO]: Iteration: 29, loss=0.6933262944221497
[INFO]: Iteration: 29, loss=0.6933262944221497
[INFO]: Iteration: 29, loss=0.6933262944221497
[INFO]: Iteration: 29, loss=0.6933262944221497
[INFO]: Iteration: 29, loss=0.6933262944221497
[INFO]: Iteration: 29, loss=0.6933262944221497
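Every rank prints the same loss because `_train_step` all-reduces it across replicas. Since `_replica_loss` uses `tf.nn.compute_average_loss` with the global batch size, each replica's value is its local loss sum divided by 8192, so summing over replicas yields the mean loss over the whole global batch. The NumPy sketch below checks that identity with random stand-in losses. It also computes ln 2, the binary cross-entropy of a constant 0.5 prediction; assuming the synthetic labels are random and independent of the keys, this is the best achievable expected loss, which is consistent with the final value of about 0.693 above.

```python
import math
import numpy as np

rng = np.random.default_rng(0)
global_batch_size, num_replicas = 8192, 8
local_batch = global_batch_size // num_replicas

# Per-example losses on each replica (random stand-ins for BinaryCrossentropy outputs)
per_example = [rng.random(local_batch) for _ in range(num_replicas)]

# compute_average_loss: sum of local per-example losses divided by the GLOBAL batch size
replica_losses = [losses.sum() / global_batch_size for losses in per_example]

# A sum all-reduce over replicas then equals the mean loss over the global batch
all_reduced = sum(replica_losses)
print(np.isclose(all_reduced, np.concatenate(per_example).mean()))  # True

# Binary cross-entropy of predicting p = 0.5 for either label is -ln(0.5) = ln 2
print(round(math.log(2), 4))  # 0.6931
```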
