In [1]:
# Copyright 2020 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

<img src="http://developer.download.nvidia.com/compute/machine-learning/frameworks/nvidia_logo.png" style="width: 90px; float: right;">

# Benchmark NVTabular data loader
We want to benchmark the NVTabular data loader and compare its performance to the TensorFlow "native" data loader based on tf.records. In [benchmark-preprocess.ipynb](./benchmark-preprocess.ipynb), we preprocess the dataset so that it is ready for the NVTabular data loader (parquet) and the TensorFlow native data loader (tf.records). In this notebook, we train a neural network in TensorFlow using either data loader and measure the performance.

First, we install gpustat, which we use to record GPU utilization while the benchmark is running.

In [None]:
# Provides the `gpustat --watch` command that the benchmark cell launches through os.system.
!pip install gpustat

We run the single-GPU version of the benchmark, so we make only one device visible.

In [2]:
import os, time
# Expose only device "0" to this process so that the benchmark runs on a single GPU.
os.environ["CUDA_VISIBLE_DEVICES"]="0"

We import the required libraries.

In [3]:
import glob
import nvtabular as nvt

from time import time
from tqdm.notebook import trange

import pickle

We define multiple helper functions.<br><br>
*get_dataloader* returns the NVTabular data loader or the TensorFlow native data loader, depending on `dl_type`<br>
*get_model* returns a standard TensorFlow model<br>
*make_tf_dataset* is a helper function to initialize the TensorFlow data loader<br>
*log_textfile* prints a message and writes it to a log file

In [4]:
### Helper Function

def get_dataloader(dl_type='NVTabular', columns=[], HASH=False):
    """Create the training and validation data loaders for the benchmark.

    Parameters
    ----------
    dl_type : str
        'NVTabular' builds two KerasSequenceLoader objects over the parquet
        files below output_train_dir / output_valid_dir.
        'TensorFlow' builds two datasets with make_tf_dataset over
        TFRECORDS_TRAIN / TFRECORDS_VALID.
    columns : list
        Feature columns describing the model inputs. The list is never
        mutated in place (only rebound), so the shared default is harmless.
    HASH : bool
        Only used for the NVTabular loader: when True, the feature columns
        are converted with make_feature_column_workflow and the resulting
        workflow is mapped onto both loaders.

    Returns
    -------
    tuple
        (train_loader, valid_loader, columns). `columns` is the list returned
        by make_feature_column_workflow when HASH is set for the NVTabular
        loader, otherwise the list that was passed in.

    Raises
    ------
    ValueError
        If dl_type is neither 'NVTabular' nor 'TensorFlow'.
    """
    # Fail early with a clear message; previously an unknown type only surfaced
    # as an UnboundLocalError at the return statement.
    if dl_type not in ('NVTabular', 'TensorFlow'):
        raise ValueError(
            str(dl_type) + ' is not supported. Choose from ' + str(['NVTabular', 'TensorFlow'])
        )

    if dl_type == 'NVTabular':
        if HASH:
            workflow, columns = make_feature_column_workflow(columns, LABEL_COLUMNS[0])

        # Settings shared by the training and the validation loader.
        shared_kwargs = dict(
            batch_size=BATCH_SIZE,
            label_names=LABEL_COLUMNS,
            cat_names=CATEGORICAL_COLUMNS,
            cont_names=CONTINUOUS_COLUMNS,
            engine='parquet',
            # Original note: "how many batches to load at once".
            # NOTE(review): a value below 1 is presumably interpreted as a
            # fraction of GPU memory - confirm against the NVTabular docs.
            buffer_size=0.06,
            parts_per_chunk=PARTS_PER_CHUNK,
        )
        # The first argument is a glob pattern matching all parquet files.
        train_dataset_tf = KerasSequenceLoader(
            output_train_dir + '*.parquet',
            shuffle=True,
            **shared_kwargs
        )
        valid_dataset_tf = KerasSequenceLoader(
            output_valid_dir + '*.parquet',
            shuffle=False,
            **shared_kwargs
        )
        if HASH:
            train_dataset_tf.map(workflow)
            valid_dataset_tf.map(workflow)
    else:
        train_dataset_tf = make_tf_dataset(TFRECORDS_TRAIN, columns)
        valid_dataset_tf = make_tf_dataset(TFRECORDS_VALID, columns)

    return (train_dataset_tf, valid_dataset_tf, columns)

def get_model(hidden_dims, inputs, features, dl_type):
    """Build and compile the fully connected benchmark model.

    Parameters
    ----------
    hidden_dims : iterable of int
        Width of each hidden Dense layer; one ReLU layer is added per entry.
    inputs : dict
        Keras Input tensors, passed to the DenseFeatures layer and to
        tf.keras.Model.
    features : list
        Feature columns consumed by the DenseFeatures layer.
    dl_type : str
        'NVTabular' uses the DenseFeatures layer from
        nvtabular.framework_utils.tensorflow.layers, 'TensorFlow' uses
        tf.keras.layers.DenseFeatures.

    Returns
    -------
    tf.keras.Model
        Model with a single sigmoid output, compiled with the 'sgd'
        optimizer, 'binary_crossentropy' loss and a ROC-AUC metric named
        "auroc".

    Raises
    ------
    ValueError
        If dl_type is neither 'NVTabular' nor 'TensorFlow'.
    """
    if dl_type == 'NVTabular':
        dense_layer = layers.DenseFeatures(features)
    elif dl_type == 'TensorFlow':
        dense_layer = tf.keras.layers.DenseFeatures(features)
    else:
        # Previously an unknown type left `dense_layer` unbound.
        raise ValueError(
            str(dl_type) + ' is not supported. Choose from ' + str(['NVTabular', 'TensorFlow'])
        )
    x = dense_layer(inputs)

    for hidden in hidden_dims:
        x = tf.keras.layers.Dense(hidden, activation='relu')(x)

    x = tf.keras.layers.Dense(1, activation='sigmoid')(x)
    model = tf.keras.Model(inputs=inputs, outputs=x)
    metrics = [tf.keras.metrics.AUC(curve="ROC", name="auroc")]
    model.compile('sgd', 'binary_crossentropy', metrics=metrics)
    return model

def make_tf_dataset(file_pattern, columns):
    """Create a batched tf.data dataset that parses tfrecords files.

    Parameters
    ----------
    file_pattern : str
        Pattern of the tfrecords files to read.
    columns : list
        Feature columns. Columns that wrap another column (they expose a
        `categorical_column` attribute, e.g. embedding columns) are replaced
        by the wrapped "raw" column.

    Returns
    -------
    Dataset produced by tf.data.experimental.make_batched_features_dataset,
    batched with BATCH_SIZE, repeated for EPOCHS epochs and shuffled with a
    buffer of BATCH_SIZE, using LABEL_COLUMNS[0] as label.
    """
    # Every feature is parsed as a fixed-length feature of shape (1,). This
    # avoids the sparse machinery but would not extend to multi-hot
    # categoricals.
    feature_spec = {}
    for column in columns:
        # Strip the embedding wrapper, if there is one.
        raw_column = getattr(column, "categorical_column", column)
        # Columns that carry no dtype are parsed as int64.
        parse_dtype = getattr(raw_column, "dtype", tf.int64)
        feature_spec[raw_column.name] = tf.io.FixedLenFeature((1,), parse_dtype)

    label_key = LABEL_COLUMNS[0]
    feature_spec[label_key] = tf.io.FixedLenFeature((1,), tf.int64)

    return tf.data.experimental.make_batched_features_dataset(
        file_pattern,
        BATCH_SIZE,
        feature_spec,
        label_key=LABEL_COLUMNS[0],
        num_epochs=EPOCHS,
        shuffle=True,
        shuffle_buffer_size=BATCH_SIZE,
    )

def log_textfile(filename, text, mode):
    """Print `text` and write it, followed by a newline, to `filename`.

    Parameters
    ----------
    filename : str
        Path of the log file.
    text : object
        Message to log; converted with str() before it is written.
    mode : str
        Mode passed to open(), e.g. 'w' to start a new log or 'a' to append.
    """
    print(text)
    # The context manager closes the file even if the write fails.
    with open(filename, mode) as log_file:
        log_file.write(str(text) + str('\n'))

In addition, we define functions to measure the performance.<br><br>
*time_only_dl* measures the time for just iterating through the dataset for 1 epoch WITHOUT training a model<br>
*time_training* measures the time for training a model for 1 epoch<br><br>
Note that 1 epoch is defined by a number of steps. The tf.records data loader does not allow partial batches, so we approximate one epoch by a fixed number of steps.

In [5]:
def time_only_dl(dl, num_steps):
    """Time how long it takes to pull `num_steps` batches from a data loader.

    The loader is iterated again from the start whenever it is exhausted
    before `num_steps` batches have been read.

    Parameters
    ----------
    dl : iterable
        Data loader yielding batches.
    num_steps : int
        Number of batches to read.

    Returns
    -------
    tuple
        (elapsed_seconds, batches_read, passes_started), where
        passes_started counts how often iteration over `dl` was started.
    """
    start = time.time()
    steps_done = 0
    passes = 0
    while steps_done < num_steps:
        steps_before_pass = steps_done
        for _ in dl:
            steps_done += 1
            # Stop right after the last required batch. Checking at the top of
            # the next iteration would load (and time) one batch too many.
            if steps_done >= num_steps:
                break
        passes += 1
        if steps_done == steps_before_pass:
            # The loader yielded nothing; bail out instead of spinning forever.
            break
    end = time.time()
    return (end - start, steps_done, passes)

def time_training(model, train_dataset_tf, steps):
    """Time a single training epoch of `steps` steps.

    Parameters
    ----------
    model : object with a Keras-style fit() method
        Model to train.
    train_dataset_tf : data loader / dataset
        Training data passed to model.fit.
    steps : int
        Passed to model.fit as steps_per_epoch.

    Returns
    -------
    tuple
        (elapsed_seconds, steps, 1); the trailing 1 is the number of epochs.
    """
    t_begin = time.time()
    model.fit(train_dataset_tf, epochs=1, steps_per_epoch=steps)
    elapsed = time.time() - t_begin
    return (elapsed, steps, 1)

We define which benchmark we want to run.

In [6]:
# Supported values for the two main switches.
DL_TYPES = ['NVTabular', 'TensorFlow']
BENCHMARK_TYPES = ['time_only_dl', 'time_training', 'convergence_training_loss', 'convergence_val_loss']

# Which data loader and which benchmark to run.
DL_TYPE = 'NVTabular'
BENCHMARK_TYPE = 'time_training'

# Optional switches: mixed precision, hashed categorical columns, CPU training.
AMP = False
HASH = False
CPU = False

# Reject unsupported settings before any expensive work starts.
if not (DL_TYPE in DL_TYPES):
    raise ValueError('{} is not supported.  Choose from {}'.format(DL_TYPE, DL_TYPES))

if not (BENCHMARK_TYPE in BENCHMARK_TYPES):
    raise ValueError('{} is not supported. Choose from {}'.format(BENCHMARK_TYPE, BENCHMARK_TYPES))

We define the input directories for the parquet and tf.records files.

In [7]:
# Root directory that holds the benchmark data
OUTPUT_DIR = '/raid/data/criteo/'

# Preprocessed parquet data; override the location with the OUTPUT_DATA_DIR environment variable
OUTPUT_DATA_DIR = os.getenv('OUTPUT_DATA_DIR', OUTPUT_DIR + 'output')
output_train_dir = os.path.join(OUTPUT_DATA_DIR, 'train/')
output_valid_dir = os.path.join(OUTPUT_DATA_DIR, 'valid/')

# tf.records data; override the location with the TFRECORD_DIR environment variable
TFRECORD_DIR = os.getenv("TFRECORD_DIR", OUTPUT_DIR + 'tfrecords')
TFRECORDS_TRAIN = os.path.join(TFRECORD_DIR, 'train', '*.tfrecords')
TFRECORDS_VALID = os.path.join(TFRECORD_DIR, 'valid', '*.tfrecords')

We define some hyperparameters and network architecture.

In [8]:
# Rows per training batch; override with the BATCH_SIZE environment variable
BATCH_SIZE = int(os.getenv('BATCH_SIZE', 64 * 1024))
# Shuffling granularity of the NVTabular data loader; override with PARTS_PER_CHUNK
PARTS_PER_CHUNK = int(os.getenv('PARTS_PER_CHUNK', 1))

# Hidden layer widths - one entry per hidden layer
HIDDEN_DIMS = [1024] * 4

# Epochs to train (only for convergence_val_loss)
EPOCHS = 5
# Steps between two train_loss readings (only for convergence_training_loss)
TRAIN_STEPS = 20
# Upper bound on the steps per epoch (tf.records allows only full batches)
STEPS = int(150000000 / BATCH_SIZE)

We load the saved NVTabular workflow to extract the data schema and some statistics.

In [9]:
# Start from an empty workflow; column names and statistics are filled in
# by load_stats below rather than being declared here.
proc = nvt.Workflow(
    cat_names=[],
    cont_names=[],
    label_name=[]
)
# Restore what the preprocessing step saved under OUTPUT_DATA_DIR.
proc.load_stats(OUTPUT_DATA_DIR + '/stats_and_workflow')
# Disabled: rewrite the category paths stored in the stats when the data has
# been moved away from '/raid/data/criteo/'.
# for col in proc.stats["categories"]:
#     proc.stats["categories"][col] = proc.stats["categories"][col].replace('/raid/data/criteo/', OUTPUT_DIR)
# Per categorical column: index 0 is used as the vocabulary size and index 1
# as the embedding dimension when the feature columns are built.
EMBEDDING_TABLE_SHAPES = nvt.ops.get_embedding_sizes(proc)

# Column names by role, taken from the restored workflow.
CATEGORICAL_COLUMNS = proc.columns_ctx['categorical']['base']
CONTINUOUS_COLUMNS = proc.columns_ctx['continuous']['base']
LABEL_COLUMNS = proc.columns_ctx['label']['base']

We import TensorFlow and set *TF_MEMORY_ALLOCATION* so that TensorFlow does not reserve the full GPU memory.

In [10]:
import time

import tensorflow as tf

from tensorflow.python.feature_column import feature_column_v2 as fc

# We can control how much memory to give TensorFlow with this environment variable.
# IMPORTANT: set it before TF's runtime is initialized, otherwise TF will have
# claimed all free GPU memory.
# NOTE(review): presumably the value is read when nvtabular.loader.tensorflow is
# imported right below - confirm against the NVTabular documentation.
os.environ['TF_MEMORY_ALLOCATION'] = "0.5" # fraction of free memory
from nvtabular.loader.tensorflow import KerasSequenceLoader, KerasSequenceValidater
from nvtabular.framework_utils.tensorflow import layers
from tensorflow.python.feature_column import feature_column_v2 as fc

from nvtabular.framework_utils.tensorflow import make_feature_column_workflow

We define the `tf.keras.Input` tensors and the `tf.feature_column`s. A common technique is to use hashing with `tf.feature_column.categorical_column_with_hash_bucket` to reduce the dimensionality of the embedding tables. Optionally, we can add hash columns to our workflow.

In [11]:
# Keras inputs (one per column) and the matching feature columns.
# Both settings of HASH share everything except the kind of categorical column
# that is wrapped by the embedding column.
inputs = {}
features = []

for col in CATEGORICAL_COLUMNS:
    inputs[col] = tf.keras.Input(
        name=col,
        dtype=tf.int32,
        shape=(1,)
    )
    if HASH:
        # Hash the ids into 75% of the original vocabulary size to shrink the embedding table.
        categorical_column = tf.feature_column.categorical_column_with_hash_bucket(
            col,
            int(0.75*EMBEDDING_TABLE_SHAPES[col][0]),        # Number of hash buckets
            dtype=tf.int64
        )
    else:
        categorical_column = tf.feature_column.categorical_column_with_identity(
            col,
            EMBEDDING_TABLE_SHAPES[col][0]                   # Input dimension (vocab size)
        )
    features.append(
        tf.feature_column.embedding_column(
            categorical_column,
            EMBEDDING_TABLE_SHAPES[col][1]                   # Embedding output dimension
        )
    )

for col in CONTINUOUS_COLUMNS:
    inputs[col] = tf.keras.Input(
        name=col,
        dtype=tf.float32,
        shape=(1,)
    )
    features.append(
        tf.feature_column.numeric_column(col, (1,))
    )

# Tag that ends up in the names of the log and trace files.
hash_postfix = 'hash' if HASH else 'nohash'

We initialize the data loader, depending on the data loader type DL_TYPE.

In [12]:
# `features` is rebound to the column list returned by get_dataloader; it only
# differs from the input when HASH is set for the NVTabular loader.
train_dataset_tf, valid_dataset_tf, features = get_dataloader(DL_TYPE, features, HASH)

We can specify to use mixed precision for the calculation.

In [13]:
# `amp_postfix` tags the log and trace file names with the precision mode.
if not AMP:
    amp_postfix = 'noamp'
else:
    # Switch Keras to the "mixed_float16" policy.
    from tensorflow.keras.mixed_precision import experimental as mixed_precision
    policy = mixed_precision.Policy("mixed_float16")
    mixed_precision.set_policy(policy)
    amp_postfix = 'amp'

We run the benchmark.

In [15]:
# Name of the log file; the pickle files of the convergence runs reuse the same stem.
logfilename = DL_TYPE + '_cpu' + str(CPU) + '_' + BENCHMARK_TYPE + '_' + amp_postfix + '_' + hash_postfix + '.log'

if BENCHMARK_TYPE=='time_only_dl':
    # gpustat runs in the background and appends its output to a trace file.
    os.system('gpustat --watch >> ' + DL_TYPE + '_cpu' + str(CPU) + '_only_dl_' + amp_postfix + '_' + hash_postfix + '.json &')
    try:
        run_time, num_steps_done, num_loops = time_only_dl(train_dataset_tf, STEPS)
        log_textfile(logfilename, 'Only Data Loader', 'w')
        log_textfile(logfilename, 'Time: ' + str(run_time), 'a')
        log_textfile(logfilename, 'Throughput: ' + str(BATCH_SIZE*num_steps_done/run_time), 'a')
    finally:
        # Stop the background gpustat even if the run fails or is interrupted.
        os.system('pkill -f "gpustat"')
elif BENCHMARK_TYPE=='time_training':
    os.system('gpustat --watch >> ' + DL_TYPE + '_cpu' + str(CPU) + '_training_' + amp_postfix + '_' + hash_postfix + '.json &')
    try:
        if CPU:
            with tf.device("/CPU:0"):
                model = get_model(HIDDEN_DIMS, inputs, features, DL_TYPE)
                run_time, num_steps_done, num_loops = time_training(model, train_dataset_tf, STEPS)
        else:
            model = get_model(HIDDEN_DIMS, inputs, features, DL_TYPE)
            # Train for the full STEPS, like the CPU branch; the previously
            # hard-coded 2 steps measured almost nothing.
            run_time, num_steps_done, num_loops = time_training(model, train_dataset_tf, STEPS)
        log_textfile(logfilename, 'Training', 'w')
        log_textfile(logfilename, 'Time: ' + str(run_time), 'a')
        log_textfile(logfilename, 'Throughput: ' + str(BATCH_SIZE*num_steps_done/run_time), 'a')
    finally:
        # Stop the background gpustat even if the run fails or is interrupted.
        os.system('pkill -f "gpustat"')
elif BENCHMARK_TYPE=='convergence_training_loss':
    # get_model requires the data loader type as its fourth argument.
    model = get_model(HIDDEN_DIMS, inputs, features, DL_TYPE)
    # Short "epochs" of TRAIN_STEPS steps give one training-loss reading each.
    history = model.fit(train_dataset_tf, epochs=EPOCHS*int(STEPS/TRAIN_STEPS), steps_per_epoch=TRAIN_STEPS)
    with open(logfilename.replace('.log', '.pickle'), 'wb') as pickle_file:
        pickle.dump(history.history, pickle_file)

  38/2288 [..............................] - ETA: 5:47 - loss: 0.4904 - auroc: 0.4728

KeyboardInterrupt: 

We define a custom validation callback to add support for `num_steps` for the validation call and a progress bar.

In [15]:
class KerasSequenceValidater(tf.keras.callbacks.Callback):
    """Keras callback that evaluates the model's metrics on a validation loader.

    At the end of every epoch, at most `STEPS` batches of `dataloader` are run
    through the model, and each metric result is stored in `logs` under the
    key "val_<metric name>". A progress bar shows the validation progress.

    Parameters
    ----------
    dataloader : iterable
        Yields (features, labels) batches.
    STEPS : int
        Maximum number of validation batches per epoch.
    DL_TYPE : str
        For 'NVTabular' the progress bar total is len(dataloader), otherwise
        it is `STEPS`.
    """
    # NOTE(review): presumably tells Keras to pass its own logs object to this
    # callback so that the "val_*" entries added below reach the training
    # history - confirm against the Keras callback implementation.
    _supports_tf_logs = True

    def __init__(self, dataloader, STEPS, DL_TYPE):
        # NOTE(review): super().__init__() is deliberately not added; in some
        # TensorFlow versions the base initializer assigns instance attributes
        # that could mask the class-level `_supports_tf_logs` - confirm before
        # changing this.
        self.dataloader = dataloader
        self.steps = STEPS
        self.dl_type = DL_TYPE

    def on_epoch_end(self, epoch, logs=None):
        """Run validation and add the "val_*" metric results to `logs`.

        Returns the (mutated) `logs` dictionary.
        """
        # A fresh dict per call; the former shared `{}` default was mutated
        # and therefore leaked results between calls.
        if logs is None:
            logs = {}

        if self.dl_type=='NVTabular':
            pbar = trange(len(self.dataloader))
        else:
            pbar = trange(self.steps)

        # NOTE(review): the metrics are not reset before validation, so the
        # results build on whatever state they hold at the end of the epoch -
        # confirm that this is intended.
        steps_done = 0
        try:
            for X, y_true in self.dataloader:
                y_pred = self.model(X)

                # TODO: how do we want to handle the multi-output case?
                for metric in self.model.metrics:
                    metric.update_state(y_true, y_pred)
                pbar.update(1)
                steps_done += 1
                # `>=` stops after exactly `steps` batches; the former `>`
                # check ran one batch too many.
                if steps_done >= self.steps:
                    break
        finally:
            # Release the progress bar widget, also when validation fails.
            pbar.close()

        for metric in self.model.metrics:
            logs["val_" + metric.name] = metric.result().numpy()
        return logs

In [None]:
if BENCHMARK_TYPE=='convergence_val_loss':
    model = get_model(HIDDEN_DIMS, inputs, features, DL_TYPE)
    # The custom callback validates on at most STEPS batches after every epoch.
    validation_callback = KerasSequenceValidater(valid_dataset_tf, STEPS, DL_TYPE)
    history = model.fit(train_dataset_tf,
                        epochs=EPOCHS,
                        steps_per_epoch=STEPS,
                        callbacks=[validation_callback])
    # Store the training history next to the log file; the context manager
    # closes the file, which the former bare open() call never did.
    with open(logfilename.replace('.log', '.pickle'), 'wb') as pickle_file:
        pickle.dump(history.history, pickle_file)