# HugeCTR Embedding  Plugin for TensorFlow

This notebook introduces a TensorFlow (TF) plugin for the HugeCTR embedding layer, embedding_plugin, where users may benefit from both the computational efficiency of the HugeCTR embedding layer and the ease of use of TensorFlow (TF).

## What is new ##
- Support `Localized` embedding.
- No need to split DNN model into two sub-models, which means embedding layer can be put inside the scope of MirroredStrategy. 

## Check Docker Container ##
Please make sure that you have started the notebook inside the running NGC docker container: `nvcr.io/nvstaging/merlin/merlin-tensorflow-training:0.5`. Several dynamic libraries have been installed to the system path `/usr/local/hugectr/lib/` that you'll have to load using TensorFlow. For convenient usage, you can directly import `hugectr_tf_ops_v2.py`, where we prepare the codes to load that dynamic library and wrap some operations, in your python script to be used with the embedding_plugin.

## Verify Accuracy ##
To verify whether the embedding_plugin can obtain correct result, you can generate synthetic data for testing purposes as shown below.

In [1]:
# run this cell to clear all variables.
%reset -f

In [2]:
# import tensorflow and some modules
import tensorflow as tf
# do not let TF allocate all GPU memory
devices = tf.config.list_physical_devices("GPU")
for dev in devices:
    tf.config.experimental.set_memory_growth(dev, True)
    
import numpy as np

In [3]:
# import hugectr_tf_ops.py to use embedding_plugin ops
import sys
sys.path.append("../embedding_plugin/Deprecated/python")
import hugectr_tf_ops_v2

[INFO]: loadding from /usr/local/hugectr/lib/libembedding_plugin_v2.so


In [4]:
# generate a random embedding table and show
vocabulary_size = 8
slot_num = 3
embedding_vector_size = 4

table = np.float32([i for i in range(1, vocabulary_size * embedding_vector_size + 1)]).reshape(vocabulary_size, embedding_vector_size)
print("init embedding table value:\n", table)

init embedding table value:
 [[ 1.  2.  3.  4.]
 [ 5.  6.  7.  8.]
 [ 9. 10. 11. 12.]
 [13. 14. 15. 16.]
 [17. 18. 19. 20.]
 [21. 22. 23. 24.]
 [25. 26. 27. 28.]
 [29. 30. 31. 32.]]


In HugeCTR, the corresponding dense shape of the input keys is `[batch_size, slot_num, max_nnz]`, and `0` is a valid key. Therefore, `-1` is used to denote invalid keys, which only occupy that position in the corresponding dense keys matrix.

In [5]:
# generate random keys to lookup from embedding table.
keys = np.array([[[0, -1],   # nnz = 1
                  [1, -1],   # nnz = 1
                  [2,  6]],  # nnz = 2
                 
                 [[0, -1],   # nnz = 1
                  [1, -1],   # nnz = 1
                  [-1, -1]], # nnz = 0
                 
                 [[0, -1],   # nnz = 1
                  [1, -1],   # nnz = 1
                  [6, -1]],  # nnz = 1
                 
                 [[0, -1],   # nnz = 1
                  [1, -1],   # nnz = 1
                  [2, -1]]], # nnz = 1
                dtype=np.int64) 
print("the dense shape of inputs keys:", keys.shape)

the dense shape of inputs keys: (4, 3, 2)


In [6]:
# define a simple forward propagation and backward propagation with embedding_plugin
# NOTE: cause hugectr_tf_ops_v2.init() can only be called once, 
# if you want to run this cell multi-times, please restart the kernel,
# or explicitly release embedding_plugin resources by calling hugectr_tf_ops_v2.reset()

# try release embedding plugin resources.
hugectr_tf_ops_v2.reset()

# hugectr_tf_ops embedding_plugin initialize
hugectr_tf_ops_v2.init(visible_gpus=[0], seed=0, key_type='int64', value_type='float', batch_size=4, batch_size_eval=4)

# create a distributed embedding_layer with embedding_plugin
dis_embedding_name = hugectr_tf_ops_v2.create_embedding(init_value=table, opt_hparams=[0.1, 0.9, 0.99, 1e-3], 
                                          name_='embedding_verification', 
                                          max_vocabulary_size_per_gpu=vocabulary_size,
                                          slot_num=slot_num, embedding_vec_size=embedding_vector_size,
                                          embedding_type='distributed', max_nnz=2)

# create a localized embedding_layer with embedding_plugin
loc_embedding_name = hugectr_tf_ops_v2.create_embedding(init_value=table, opt_hparams=[0.1, 0.9, 0.99, 1e-3],
                                          name_='embedding_verification',
                                          max_vocabulary_size_per_gpu=vocabulary_size, 
                                          slot_num=slot_num, embedding_vec_size=embedding_vector_size,
                                          embedding_type='localized', max_nnz=2, update_type='Global')

# convert dense input keys to COO format
reshape_keys = tf.reshape(keys, [-1, keys.shape[-1]])
indices = tf.where(reshape_keys != -1)
values = tf.gather_nd(reshape_keys, indices)
row_indices = tf.transpose(indices, perm=[1, 0])

# create a Variable used for backward propagation
bp_trigger = tf.Variable(initial_value=1.0, trainable=True, dtype=tf.float32)

with tf.GradientTape(persistent=True) as tape:
    tape.watch(bp_trigger)

    # get distributed embedding forward result
    dis_each_replicas = hugectr_tf_ops_v2.broadcast_then_convert_to_csr(dis_embedding_name, row_indices, values,
                                                                        T = [tf.int32] * 1)
    dis_forward_result = hugectr_tf_ops_v2.fprop(dis_embedding_name, 0, dis_each_replicas, bp_trigger, is_training=True)
    print("Distributed Embedding first forward_result:\n", dis_forward_result, '\n')

    # get localized embedding forward result
    loc_each_replicas = hugectr_tf_ops_v2.broadcast_then_convert_to_csr(loc_embedding_name, row_indices, values, 
                                                                       T = [tf.int32] * 1)
    loc_forward_result = hugectr_tf_ops_v2.fprop(loc_embedding_name, 0, loc_each_replicas, bp_trigger, is_training=True)
    print("Localized Embedding first forward_result:\n", loc_forward_result, '\n')

# compute gradients & update params
dis_grads = tape.gradient(dis_forward_result, bp_trigger)
loc_grads = tape.gradient(loc_forward_result, bp_trigger)
    
# do second forward propagation to check whether embedding table is updated.
dis_forward_result_2 = hugectr_tf_ops_v2.fprop(dis_embedding_name, 0, dis_each_replicas, bp_trigger, is_training=True)
loc_forward_result_2 = hugectr_tf_ops_v2.fprop(loc_embedding_name, 0, loc_each_replicas, bp_trigger, is_training=True)
print("-"*100)
print("Distributed Embedding second forward_result:\n", dis_forward_result_2, '\n')
print("Localized Embedding second forward_result:\n", loc_forward_result_2, '\n')

# explicitly release embedding plugin resources
hugectr_tf_ops_v2.reset()


Distributed Embedding first forward_result:
 tf.Tensor(
[[[ 1.  2.  3.  4.]
  [ 5.  6.  7.  8.]
  [34. 36. 38. 40.]]

 [[ 1.  2.  3.  4.]
  [ 5.  6.  7.  8.]
  [ 0.  0.  0.  0.]]

 [[ 1.  2.  3.  4.]
  [ 5.  6.  7.  8.]
  [25. 26. 27. 28.]]

 [[ 1.  2.  3.  4.]
  [ 5.  6.  7.  8.]
  [ 9. 10. 11. 12.]]], shape=(4, 3, 4), dtype=float32) 

Localized Embedding first forward_result:
 tf.Tensor(
[[[ 1.  2.  3.  4.]
  [ 5.  6.  7.  8.]
  [34. 36. 38. 40.]]

 [[ 1.  2.  3.  4.]
  [ 5.  6.  7.  8.]
  [ 0.  0.  0.  0.]]

 [[ 1.  2.  3.  4.]
  [ 5.  6.  7.  8.]
  [25. 26. 27. 28.]]

 [[ 1.  2.  3.  4.]
  [ 5.  6.  7.  8.]
  [ 9. 10. 11. 12.]]], shape=(4, 3, 4), dtype=float32) 

----------------------------------------------------------------------------------------------------
Distributed Embedding second forward_result:
 tf.Tensor(
[[[ 0.90024936  1.9002494   2.9002495   3.9002495 ]
  [ 4.9002495   5.9002495   6.9002495   7.9002495 ]
  [33.800995   35.800995   37.800995   39.800995  ]]

 [[ 0.90

In [7]:
# similarly, use original tensorflow op to compare whether results are consistent.

# define a tf embedding layer
class EmbeddingLayer(tf.keras.layers.Layer):
    def __init__(self, vocabulary_size, embedding_vec_size,
                init_value):
        super(EmbeddingLayer, self).__init__()
        self.vocabulary_size = vocabulary_size
        self.embedding_vec_size = embedding_vec_size
        self.init_value = init_value
        
    def build(self, _):
        self.Var = self.add_weight(shape=(self.vocabulary_size, self.embedding_vec_size),
                                         initializer=tf.constant_initializer(value=self.init_value))
        
    def call(self, inputs):
        return tf.nn.embedding_lookup_sparse(self.Var, inputs, sp_weights=None, combiner="sum")
    
with tf.GradientTape() as tape:
    # reshape keys into [batch_size * slot_num, max_nnz]
    reshape_keys = np.reshape(keys, newshape=(-1, keys.shape[-1]))
    indices = tf.where(reshape_keys != -1)
    values = tf.gather_nd(reshape_keys, indices)

    # define a layer
    tf_layer = EmbeddingLayer(vocabulary_size, embedding_vector_size, table)
    
    # wrap input keys components into a SparseTensor
    sparse_tensor = tf.sparse.SparseTensor(indices, values, reshape_keys.shape)
    
    tf_forward = tf_layer(sparse_tensor)
    print("tf forward_result:\n", tf.reshape(tf_forward, [keys.shape[0], keys.shape[1], tf_forward.shape[-1]]))
    
    # define an optimizer
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.1, beta_1=0.9, beta_2=0.99, epsilon=1e-3)
    
    # compute gradients & update params
    grads = tape.gradient(tf_forward, tf_layer.trainable_weights)
    optimizer.apply_gradients(zip(grads, tf_layer.trainable_weights))
    
    # do second forward propagation to check whether params are updated.
    tf_forward_2 = tf_layer(sparse_tensor)
    print("\n")
    print("tf second forward_result:\n", tf.reshape(tf_forward_2, [keys.shape[0], keys.shape[1], tf_forward_2.shape[-1]]))

tf forward_result:
 tf.Tensor(
[[[ 1.  2.  3.  4.]
  [ 5.  6.  7.  8.]
  [34. 36. 38. 40.]]

 [[ 1.  2.  3.  4.]
  [ 5.  6.  7.  8.]
  [ 0.  0.  0.  0.]]

 [[ 1.  2.  3.  4.]
  [ 5.  6.  7.  8.]
  [25. 26. 27. 28.]]

 [[ 1.  2.  3.  4.]
  [ 5.  6.  7.  8.]
  [ 9. 10. 11. 12.]]], shape=(4, 3, 4), dtype=float32)


tf second forward_result:
 tf.Tensor(
[[[ 0.90024906  1.9002491   2.900249    3.900249  ]
  [ 4.900249    5.900249    6.900249    7.900249  ]
  [33.800995   35.800995   37.800995   39.800995  ]]

 [[ 0.90024906  1.9002491   2.900249    3.900249  ]
  [ 4.900249    5.900249    6.900249    7.900249  ]
  [ 0.          0.          0.          0.        ]]

 [[ 0.90024906  1.9002491   2.900249    3.900249  ]
  [ 4.900249    5.900249    6.900249    7.900249  ]
  [24.900497   25.900497   26.900497   27.900497  ]]

 [[ 0.90024906  1.9002491   2.900249    3.900249  ]
  [ 4.900249    5.900249    6.900249    7.900249  ]
  [ 8.900497    9.900497   10.900497   11.900497  ]]], shape=(4, 3, 4)

In [8]:
# assert whether embedding_plugin's results are consistent with tensorflow original ops

# verify first forward results consistency
dis_first_forward_consistent = np.allclose(dis_forward_result.numpy(), 
                                tf.reshape(tf_forward, [keys.shape[0], keys.shape[1], tf_forward.shape[-1]]).numpy())
loc_first_forward_consistent = np.allclose(loc_forward_result.numpy(),
                                tf.reshape(tf_forward, [keys.shape[0], keys.shape[1], tf_forward.shape[-1]]).numpy())
print("Consistent in first forward propagation for both Distributed & Localized Embedding?", 
     (dis_first_forward_consistent and loc_first_forward_consistent))

# verify second forward results consistency
dis_second_forward_consistent = np.allclose(dis_forward_result_2.numpy(), 
                                tf.reshape(tf_forward_2, [keys.shape[0], keys.shape[1], tf_forward_2.shape[-1]]))
loc_second_forward_consistent = np.allclose(loc_forward_result_2.numpy(),
                                tf.reshape(tf_forward_2, [keys.shape[0], keys.shape[1], tf_forward_2.shape[-1]]))
print("Consistent in second forward propagation for both Distributed & Localized Embedding?", 
      (dis_second_forward_consistent and loc_second_forward_consistent))

Consistent in first forward propagation for both Distributed & Localized Embedding? True
Consistent in second forward propagation for both Distributed & Localized Embedding? True


The results from embedding_plugins and original TF ops are consistent in both first and second forward propagation for both `Distributed Embedding` and `Localized Embedding`, which means the embedding_plugin can get the same forward result and perform the same backward propagation as TF ops. Therefore, the embedding_plugin can obtain correct results.

## DeepFM demo ##
In this notebook, TF 2.x is used to build the DeepFM model.

### Define Models with the Embedding_Plugin ###

In [9]:
# first, import tensorflow and import plugin ops from hugectr_tf_ops_v2.py
import tensorflow as tf
# do not let TF allocate all GPU memory
devices = tf.config.list_physical_devices("GPU")
for dev in devices:
    tf.config.experimental.set_memory_growth(dev, True)
import sys
sys.path.append("../embedding_plugin/Deprecated/python")
import hugectr_tf_ops_v2

In [10]:
# define TF layers
class Multiply(tf.keras.layers.Layer):
    def __init__(self, out_units):
        super(Multiply, self).__init__()
        self.out_units = out_units

    def build(self, input_shape):
        self.w = self.add_weight(name='weight_vector', shape=(input_shape[1], self.out_units),
                                 initializer='glorot_uniform', trainable=True)
    
    def call(self, inputs):
        return inputs * self.w

In [11]:
# build DeepFM with plugin ops
class DeepFM_PluginEmbedding(tf.keras.models.Model):
    def __init__(self, 
                 vocabulary_size, 
                 embedding_vec_size,
                 dropout_rate, # list of float
                 deep_layers, # list of int
                 initializer,
                 gpus,
                 batch_size,
                 batch_size_eval,
                 embedding_type = 'localized',
                 slot_num=1,
                 seed=123):
        super(DeepFM_PluginEmbedding, self).__init__()
        tf.keras.backend.clear_session()
        tf.compat.v1.set_random_seed(seed)

        self.vocabulary_size = vocabulary_size
        self.embedding_vec_size = embedding_vec_size
        self.dropout_rate = dropout_rate
        self.deep_layers = deep_layers
        self.gpus = gpus
        self.batch_size = batch_size
        self.batch_size_eval = batch_size_eval 
        self.slot_num = slot_num
        self.embedding_type = embedding_type

        if isinstance(initializer, str):
            initializer = False
            
        # when building model with embedding_plugin ops, init() should be called prior to any other ops.
        hugectr_tf_ops_v2.init(visible_gpus=gpus, seed=seed, key_type='int64', value_type='float', 
                        batch_size=batch_size, batch_size_eval=batch_size_eval)
        
        # create a embedding_plugin layer
        self.embedding_name = hugectr_tf_ops_v2.create_embedding(init_value=initializer, name_='hugectr_embedding',
                                            embedding_type=embedding_type, optimizer_type='Adam',
                                            max_vocabulary_size_per_gpu=(self.vocabulary_size // len(self.gpus)) + 1,
                                            opt_hparams=[0.1, 0.9, 0.99, 1e-5], update_type='Local',
                                            atomic_update=True, scaler=1.0, slot_num=self.slot_num, 
                                            max_nnz=1, max_feature_num=1*self.slot_num, 
                                            embedding_vec_size=self.embedding_vec_size + 1, combiner='sum')
        
        # other layers with TF original ops
        self.deep_dense = []
        for i, deep_units in enumerate(self.deep_layers):
            self.deep_dense.append(tf.keras.layers.Dense(units=deep_units, activation=None, use_bias=True,
                                                         kernel_initializer='glorot_normal', 
                                                         bias_initializer='glorot_normal'))
            self.deep_dense.append(tf.keras.layers.Dropout(dropout_rate[i]))
        self.deep_dense.append(tf.keras.layers.Dense(units=1, activation=None, use_bias=True,
                                                     kernel_initializer='glorot_normal',
                                                     bias_initializer=tf.constant_initializer(0.01)))
        self.add_layer = tf.keras.layers.Add()
        self.y_act = tf.keras.layers.Activation(activation='sigmoid')

        self.dense_multi = Multiply(1)
        self.dense_embedding = Multiply(self.embedding_vec_size)

        self.concat_1 = tf.keras.layers.Concatenate()
        self.concat_2 = tf.keras.layers.Concatenate()
        
    def build(self, _):
        self.bp_trigger = self.add_weight(name='bp_trigger', shape=(1,), dtype=tf.float32, trainable=True)

    @tf.function
    def call(self, dense_feature, each_replica, training=True):
        """
        forward propagation.
        #arguments:
            dense_feature: [batch_size, dense_dim]
        """
        with tf.name_scope("embedding_and_slice"):
            dense_0 = tf.cast(tf.expand_dims(dense_feature, 2), dtype=tf.float32) # [batchsize, dense_dim, 1]
            dense_mul = self.dense_multi(dense_0) # [batchsize, dense_dim, 1]
            dense_emb = self.dense_embedding(dense_0) # [batchsize, dense_dim, embedding_vec_size]
            dense_mul = tf.reshape(dense_mul, [dense_mul.shape[0], -1]) # [batchsize, dense_dim * 1]
            dense_emb = tf.reshape(dense_emb, [dense_emb.shape[0], -1]) # [batchsize, dense_dim * embedding_vec_size]

            sparse = hugectr_tf_ops_v2.fprop(self.embedding_name, 0, #replica_ctx.replica_id_in_sync_group,
                                                each_replica, self.bp_trigger, is_training=training) # [batch_size, self.slot_num, self.embedding_vec_size + 1]
            
            sparse_1 = tf.slice(sparse, [0, 0, self.embedding_vec_size], [-1, self.slot_num, 1]) #[batchsize, slot_num, 1]
            sparse_1 = tf.squeeze(sparse_1, 2) # [batchsize, slot_num]

            sparse_emb = tf.slice(sparse, [0, 0, 0], [-1, self.slot_num, self.embedding_vec_size]) #[batchsize, slot_num, embedding_vec_size]
            sparse_emb = tf.reshape(sparse_emb, [-1, self.slot_num * self.embedding_vec_size]) #[batchsize, slot_num * embedding_vec_size]
        
        with tf.name_scope("FM"):
            with tf.name_scope("first_order"):
                first = self.concat_1([dense_mul, sparse_1]) # [batchsize, dense_dim + slot_num]
                first_out = tf.reduce_sum(first, axis=-1, keepdims=True) # [batchsize, 1]
                
            with tf.name_scope("second_order"):
                hidden = self.concat_2([dense_emb, sparse_emb]) # [batchsize, (dense_dim + slot_num) * embedding_vec_size]
                second = tf.reshape(hidden, [-1, dense_feature.shape[1] + self.slot_num, self.embedding_vec_size])
                square_sum = tf.math.square(tf.math.reduce_sum(second, axis=1, keepdims=True)) # [batchsize, 1, embedding_vec_size]
                sum_square = tf.math.reduce_sum(tf.math.square(second), axis=1, keepdims=True) # [batchsize, 1, embedding_vec_size]
                
                second_out = 0.5 * (sum_square - square_sum) # [batchsize, 1, embedding_vec_size]
                second_out = tf.math.reduce_sum(second_out, axis=-1, keepdims=False) # [batchsize, 1]
                
        with tf.name_scope("Deep"):
            for i, layer in enumerate(self.deep_dense):
                if i % 2 == 0: # dense
                    hidden = layer(hidden)
                else: # dropout
                    hidden = layer(hidden, training)

        y = self.add_layer([hidden, first_out, second_out])
        y = self.y_act(y) # [batchsize, 1]

        return y
    
    @property
    def get_embedding_name(self):
        return self.embedding_name

The above cells use embedding plugin ops and TF layers to define a TF DeepFM model. Similarly, define an embedding layer with TF original ops, and define a DeepFM model with that layer. Because embedding_plugin supports model parallelism, the parameters of the original TF embedding layer are equally distributed to each GPU for a fair performance comparison.

### Define Models with the Original TF Ops ###

In [12]:
# define a TF embedding layer with TF original ops
class OriginalEmbedding(tf.keras.layers.Layer):
    def __init__(self, 
                 vocabulary_size,
                 embedding_vec_size,
                 initializer='uniform',
                 combiner="sum",
                 gpus=[0]):
        super(OriginalEmbedding, self).__init__()

        self.vocabulary_size = vocabulary_size
        self.embedding_vec_size = embedding_vec_size 
        if isinstance(initializer, str):
            self.initializer = tf.keras.initializers.get(initializer)
        else:
            self.initializer = initializer
        if combiner not in ["sum", "mean"]:
            raise RuntimeError("combiner must be one of \{'sum', 'mean'\}.")
        self.combiner = combiner
        if (not isinstance(gpus, list)) and (not isinstance(gpus, tuple)):
            raise RuntimeError("gpus must be a list or tuple.")
        self.gpus = gpus

    def build(self, _):
        if isinstance(self.initializer, tf.keras.initializers.Initializer):
            if len(self.gpus) > 1:
                self.embeddings_params = list()
                mod_size = self.vocabulary_size % len(self.gpus)
                vocabulary_size_each_gpu = [(self.vocabulary_size // len(self.gpus)) + (1 if dev_id < mod_size else 0)
                                            for dev_id in range(len(self.gpus))]

                for i, gpu in enumerate(self.gpus):
                    with tf.device("/gpu:%d" %gpu):
                        params_i = self.add_weight(name="embedding_" + str(gpu), 
                                                   shape=(vocabulary_size_each_gpu[i], self.embedding_vec_size),
                                                   initializer=self.initializer)
                    self.embeddings_params.append(params_i)

            else:
                self.embeddings_params = self.add_weight(name='embeddings', 
                                                        shape=(self.vocabulary_size, self.embedding_vec_size),
                                                        initializer=self.initializer)
        else:
            self.embeddings_params = self.initializer

    @tf.function
    def call(self, keys, output_shape):
        result = tf.nn.embedding_lookup_sparse(self.embeddings_params, keys, 
                                             sp_weights=None, combiner=self.combiner)
        return tf.reshape(result, output_shape)

In [13]:
# define DeepFM model with original TF embedding layer
class DeepFM_OriginalEmbedding(tf.keras.models.Model):
    def __init__(self, 
                 vocabulary_size, 
                 embedding_vec_size,
                 dropout_rate, # list of float
                 deep_layers, # list of int
                 initializer,
                 gpus,
                 batch_size,
                 batch_size_eval,
                 embedding_type = 'localized',
                 slot_num=1,
                 seed=123):
        super(DeepFM_OriginalEmbedding, self).__init__()
        tf.keras.backend.clear_session()
        tf.compat.v1.set_random_seed(seed)

        self.vocabulary_size = vocabulary_size
        self.embedding_vec_size = embedding_vec_size
        self.dropout_rate = dropout_rate
        self.deep_layers = deep_layers
        self.gpus = gpus
        self.batch_size = batch_size
        self.batch_size_eval = batch_size_eval 
        self.slot_num = slot_num
        self.embedding_type = embedding_type

        self.original_embedding_layer = OriginalEmbedding(vocabulary_size=vocabulary_size, 
                                            embedding_vec_size=embedding_vec_size + 1, 
                                            initializer=initializer, gpus=gpus)
        self.deep_dense = []
        for i, deep_units in enumerate(self.deep_layers):
            self.deep_dense.append(tf.keras.layers.Dense(units=deep_units, activation=None, use_bias=True,
                                                         kernel_initializer='glorot_normal', 
                                                         bias_initializer='glorot_normal'))
            self.deep_dense.append(tf.keras.layers.Dropout(dropout_rate[i]))
        self.deep_dense.append(tf.keras.layers.Dense(units=1, activation=None, use_bias=True,
                                                     kernel_initializer='glorot_normal',
                                                     bias_initializer=tf.constant_initializer(0.01)))
        self.add_layer = tf.keras.layers.Add()
        self.y_act = tf.keras.layers.Activation(activation='sigmoid')

        self.dense_multi = Multiply(1)
        self.dense_embedding = Multiply(self.embedding_vec_size)

        self.concat_1 = tf.keras.layers.Concatenate()
        self.concat_2 = tf.keras.layers.Concatenate()

    @tf.function
    def call(self, dense_feature, sparse_feature, training=True):
        """
        forward propagation.
        #arguments:
            dense_feature: [batch_size, dense_dim]
            sparse_feature: for OriginalEmbedding, it is a SparseTensor, and the dense shape is [batch_size * slot_num, max_nnz];
                            for PluginEmbedding, it is a list of [row_offsets, value_tensors, nnz_array]. 
        """
        with tf.name_scope("embedding_and_slice"):
            dense_0 = tf.cast(tf.expand_dims(dense_feature, 2), dtype=tf.float32) # [batchsize, dense_dim, 1]
            dense_mul = self.dense_multi(dense_0) # [batchsize, dense_dim, 1]
            dense_emb = self.dense_embedding(dense_0) # [batchsize, dense_dim, embedding_vec_size]
            dense_mul = tf.reshape(dense_mul, [dense_mul.shape[0], -1]) # [batchsize, dense_dim * 1]
            dense_emb = tf.reshape(dense_emb, [dense_emb.shape[0], -1]) # [batchsize, dense_dim * embedding_vec_size]

            sparse = self.original_embedding_layer(sparse_feature, output_shape=[-1, self.slot_num, self.embedding_vec_size + 1])

            sparse_1 = tf.slice(sparse, [0, 0, self.embedding_vec_size], [-1, self.slot_num, 1]) #[batchsize, slot_num, 1]
            sparse_1 = tf.squeeze(sparse_1, 2) # [batchsize, slot_num]

            sparse_emb = tf.slice(sparse, [0, 0, 0], [-1, self.slot_num, self.embedding_vec_size]) #[batchsize, slot_num, embedding_vec_size]
            sparse_emb = tf.reshape(sparse_emb, [-1, self.slot_num * self.embedding_vec_size]) #[batchsize, slot_num * embedding_vec_size]
        
        with tf.name_scope("FM"):
            with tf.name_scope("first_order"):
                first = self.concat_1([dense_mul, sparse_1]) # [batchsize, dense_dim + slot_num]
                first_out = tf.reduce_sum(first, axis=-1, keepdims=True) # [batchsize, 1]
                
            with tf.name_scope("second_order"):
                hidden = self.concat_2([dense_emb, sparse_emb]) # [batchsize, (dense_dim + slot_num) * embedding_vec_size]
                second = tf.reshape(hidden, [-1, dense_feature.shape[1] + self.slot_num, self.embedding_vec_size])
                square_sum = tf.math.square(tf.math.reduce_sum(second, axis=1, keepdims=True)) # [batchsize, 1, embedding_vec_size]
                sum_square = tf.math.reduce_sum(tf.math.square(second), axis=1, keepdims=True) # [batchsize, 1, embedding_vec_size]
                
                second_out = 0.5 * (sum_square - square_sum) # [batchsize, 1, embedding_vec_size]
                second_out = tf.math.reduce_sum(second_out, axis=-1, keepdims=False) # [batchsize, 1]
                
        with tf.name_scope("Deep"):
            for i, layer in enumerate(self.deep_dense):
                if i % 2 == 0: # dense
                    hidden = layer(hidden)
                else: # dropout
                    hidden = layer(hidden, training)

        y = self.add_layer([hidden, first_out, second_out])
        y = self.y_act(y) # [batchsize, 1]

        return y

Dataset is needed to use these models for training. [Kaggle Criteo datasets](http://labs.criteo.com/2014/02/kaggle-display-advertising-challenge-dataset/) provided by CriteoLabs is used as the training dataset. The original training set contains 45,840,617 examples. Each example contains a label (0 by default or 1 if the ad was clicked) and 39 features in which 13 of them are integer and the other 26 are categorial. Since TFRecord is suitable for the training process and the Criteo dataset is missing numerous values across the feature columns, preprocessing is needed. The original test set won't be used because it doesn't contain labels.

### Dataset processing ###
1. Download dataset from [https://ailab.criteo.com/download-criteo-1tb-click-logs-dataset/](http://azuremlsampleexperiments.blob.core.windows.net/criteo/day_1.gz).
2. Extract the dataset by running the following command. 
    ```shell
    $ gunzip day_1.gz
    ```
    
3. The whole dataset is too large, so get a subset with
    ```shell
    $ head -n 45840617 day_1 > train.txt
    ```
4. Preprocess the datast and set missing values.
Preprocessing functions are defined in [preprocess.py](../embedding_plugin/Deprecated/performance_profile/preprocess.py). Open that file and check the codes.

In [None]:
# specify source csv name and output csv name, run this command will do the preprocessing.
# Warning: this command will take serveral hours to do preprocessing.
%run ../embedding_plugin/Deprecated/performance_profile/preprocess.py \
    --src_csv_path=../tools/embedding_plugin/train.txt \
    --dst_csv_path=../tools/embedding_plugin/train.out.txt \
    --normalize_dense=0 --feature_cross=0

5. Split the dataset by running the following commands:
```shell
$ head -n 36672493 train.out.txt > train
$ tail -n 9168124 train.out.txt > valtest
$ head -n 4584062 valtest > val
$ tail -n 4584062 valtest > test
```

6. Convert the dataset to a TFRecord file. Converting functions are defined in [txt2tfrecord.py](../embedding_plugin/Deprecated/performance_profile/txt2tfrecord.py). Open that file and check the codes.
After the data preprocessing is completed, *.tfrecord file(s) will be generated, which can be used for training. The training loop can now be configured to use the dataset and models to perform the training.

In [None]:
# specify source name and output tfrecord name, run this command will do the converting.
# Warning: this command will take half an hour to do converting.
%run ../embedding_plugin/Deprecated/performance_profile/txt2tfrecord.py \
    --src_txt_name=train \
    --dst_tfrecord_name=train.tfrecord \
    --normalized=0 --use_multi_process=1 \
    --shard_num=1 
    # if multi tfrecord files are wanted, set shard_num to the number of files.

### Define training loop and do training ###
In [read_data.py](../tools/embedding_plugin/performance_profile/read_data.py), some preprocessing and TF data reading pipeline creation functions are defined.

In [14]:
# set env path, so that some modules can be imported
sys.path.append("../embedding_plugin/Deprecated/performance_profile/")

import txt2tfrecord as utils
from read_data import CreateDataset
import time
import logging
logging.basicConfig(format='%(asctime)s %(message)s')
logging.root.setLevel('INFO')

[INFO]: loadding from /usr/local/hugectr/lib/libembedding_plugin.so


In [15]:
# choose wich model for training
which_model = "Plugin" # change it to "Original", if you want to try the model define with original tf ops.

In [16]:
# set some hyper parameters for training process
if ("Plugin" == which_model):
    batch_size = 16384
    n_epochs = 1
    distribute_keys = 1 
    gpus = [0] # use GPU0
    embedding_type = 'distributed'
    vocabulary_size = 1737710
    embedding_vec_size = 10
    slot_num = 26
    batch_size_eval = 1 * len(gpus)
    
elif ("Original" == which_model):
    batch_size = 16384
    n_epochs = 1
    distribute_keys = 0
    gpus = [0] # use GPU0
    vocabulary_size = 1737710
    embedding_vec_size = 10
    slot_num = 26
    batch_size_eval = 1 * len(gpus)
    embedding_type = 'distributed'

In [17]:
# define feature_description to read tfrecord examples.
cols = [utils.idx2key(idx, False) for idx in range(0, utils.NUM_TOTAL_COLUMNS)]
feature_desc = dict()
for col in cols:
    if col == 'label' or col.startswith("I"):
        feature_desc[col] = tf.io.FixedLenFeature([], tf.int64) # scaler
    else: 
        feature_desc[col] = tf.io.FixedLenFeature([1], tf.int64) # [slot_num, nnz]

In [18]:
# please set data_path to your tfrecord
data_path = r"./"

In [19]:
# create tfrecord reading pipeling
dataset_names = [data_path + "./train.tfrecord"]
dataset = CreateDataset(dataset_names=dataset_names,
                       feature_desc=feature_desc,
                       batch_size=batch_size,
                       n_epochs=n_epochs,
                       slot_num=slot_num,
                       max_nnz=1,
                       convert_to_csr=False,
                       gpu_count=len(gpus),
                       embedding_type=embedding_type,
                       get_row_indices=True)()

In [20]:
# define loss function and optimizer used in other TF layers.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
loss_fn = tf.keras.losses.BinaryCrossentropy(from_logits=False)

In [21]:
# create model instance
if "Original" == which_model:
    model = DeepFM_OriginalEmbedding(vocabulary_size=vocabulary_size, embedding_vec_size=embedding_vec_size, 
                                     embedding_type=embedding_type,
                                     dropout_rate=[0.5] * 10, deep_layers=[1024] * 10,
                                     initializer='uniform', gpus=gpus, batch_size=batch_size, 
                                     batch_size_eval=batch_size_eval,
                                     slot_num=slot_num)
elif "Plugin" == which_model:
    hugectr_tf_ops_v2.reset()
    model = DeepFM_PluginEmbedding(vocabulary_size=vocabulary_size, embedding_vec_size=embedding_vec_size, 
                                   embedding_type=embedding_type,
                                   dropout_rate=[0.5] * 10, deep_layers=[1024] * 10,
                                   initializer='uniform', gpus=gpus, batch_size=batch_size, 
                                   batch_size_eval=batch_size_eval,
                                   slot_num=slot_num)

In [22]:
# define training step
@tf.function
def _train_step(dense_batch, sparse_batch, y_batch, model, loss_fn, optimizer):
    with tf.GradientTape() as tape:
        y_batch = tf.cast(y_batch, dtype=tf.float32)
        logits = model(dense_batch, sparse_batch, training=True)
        loss = loss_fn(y_batch, logits)
        loss /= dense_batch.shape[0]
    grads = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    return loss

In [23]:
# training loop
logging.info("begin to train")
begin_time = time.time()
display_begin = begin_time
for step, datas in enumerate(dataset):
    label, dense, others = datas[0], datas[1], datas[2:]
    if "Original" == which_model:
        sparse = others[-1]
    elif "Plugin" == which_model:
        sparse = others[0:2]
        sparse = hugectr_tf_ops_v2.broadcast_then_convert_to_csr(model.get_embedding_name, 
                                                                row_indices=sparse[0], values=sparse[1], 
                                                                T=[tf.int32]*len(gpus))
    train_loss = _train_step(dense, sparse, label, model, loss_fn, optimizer)
    loss_value = train_loss.numpy()
    
    if (step % 100 == 0 and step != 0):
        display_end = time.time()
        logging.info("step: %d, loss: %.7f, elapsed time: %.5f seconds." %(step, loss_value, (display_end - display_begin)))
        display_begin = display_end
        
end_time = time.time()
logging.info("Train End. Elapsed Time: %.3f seconds." %(end_time - begin_time))

2021-04-12 06:48:17,223 begin to train
2021-04-12 06:48:31,223 step: 100, loss: 0.0002303, elapsed time: 13.99873 seconds.
2021-04-12 06:48:43,373 step: 200, loss: 0.0002431, elapsed time: 12.14997 seconds.
2021-04-12 06:48:55,509 step: 300, loss: 0.0006980, elapsed time: 12.13682 seconds.
2021-04-12 06:49:07,648 step: 400, loss: 0.0006907, elapsed time: 12.13814 seconds.
2021-04-12 06:49:19,799 step: 500, loss: 0.0007016, elapsed time: 12.15193 seconds.
2021-04-12 06:49:31,941 step: 600, loss: 0.0006234, elapsed time: 12.14199 seconds.
2021-04-12 06:49:44,084 step: 700, loss: 0.0005613, elapsed time: 12.14260 seconds.
2021-04-12 06:49:56,225 step: 800, loss: 0.0005893, elapsed time: 12.14110 seconds.
2021-04-12 06:50:08,376 step: 900, loss: 0.0006023, elapsed time: 12.15081 seconds.
2021-04-12 06:50:20,537 step: 1000, loss: 0.0005908, elapsed time: 12.16115 seconds.
2021-04-12 06:50:32,685 step: 1100, loss: 0.0002340, elapsed time: 12.14798 seconds.
2021-04-12 06:50:44,836 step: 1200,

In this configuration, `tf.data.Dataset` produces training data slowly, which makes the whole training process slow. Therefore, the training elapsed time for `Original` and `Plugin` are similar.

## API signature ##
All embedding_plugin APIs are defined in [hugectr_tf_ops_v2.py](../embedding_plugin/Deprecated/python/hugectr_tf_ops_v2.py).

Embedding_plugin takes `COO (Coordinate)` format as input format when `fprop` is used. <br>
In some cases, `fprop_experimental` can get better performance than `fprop`, but it is not stable. <br>
If `fprop_experimental` is used, input data format should be `CSR (Compressed Sparse Row)`. 

For more detail about how to convert your input data to `CSR` or `COO` format, please refer to [samples/format_processing.py](../embedding_plugin/Deprecated/samples/format_processing.py). <br>
For more code samples, please refer to [samples/sample_with_fprop*.py](../embedding_plugin/Deprecated/samples/sample_with_fprop.py).

In [24]:
%%html
<style>
table {float:left}
</style>

---
---
+ ```python
  init(visible_gpus, seed=0, key_type='int64', value_type='float', batch_size=1, batch_size_eval=1)
  ```
  
This function is used to create resource manager, which manages resources used by embedding_plugin.
**IMPORTANT:** This function can only be called once. It must be called before any other embedding_plugin API is called. Currently, only key_type='int64', value_type='float' has been tested.


| Args ||
| :-----| :---- |
| visible_gpus | list of integers, used to specify which gpus will be used by embedding_plugin. |
| seed | integer, the initializer random seed for embedding_plugin. |
| key_type| string, can be one of {'uint32', 'int64'}. Used to specify the input keys data type. |
| value_type| string, can be one of {'float', 'half'}. Used to specify the data type of embedding_plugin forward result. |
| batch_size| integer, batch_size used in training process. |
| batch_size_eval| integer, batch_size used in evaluation process. |


---
---
+ ```python
  reset()
  ```
  
This function is used to explicitly release the resources created by embedding plugin. You can call this function multiple times, and it will not throw any error.

---
---
+ ```python
  embedding_name = create_embedding(init_value, name_='hugectr_embedding', embedding_type='localized',
                                     optimizer_type='Adam', max_vocabulary_size_per_gpu=1, slot_size_array=[],
                                     opt_hparams=[0.001], update_type='Local', atomic_update=true, scaler=1.0,
                                     slot_num=1, max_nnz=1, max_feature_num=1000, embedding_vec_size=1,
                                     combiner='sum')
  ```
  
| Args ||
| :-----| :---- |
|init_value| can be a `boolean` or a 2-D matrix with `dtype=tf.float32`. When it is `bool`, parameters will be randomly initialized. When it is a 2-D matrix with `dtype=tf.float32`, that matrix will be used to initialize embedding table parameters, and the matrix's row-index will be deemed to be key of the embedding table.|
|name_|string, the name of this embedding layer. If `name_` is unique, then it will be used as the embedding layer name, otherwise, numerical suffix will be automatically added to `name_` to form an unique name for this embedding layer. |
|embedding_type| string, can be one of {'localized', 'distributed'}. |
| optimizer_type| string, can be one of {'Adam', 'MomentumSGD', 'Nesterov', 'SGD'}. | 
|max_vocabulary_size_per_gpu| integer, used to allocate GPU memory spaces for embedding layer.|
|slot_size_array| list of integers, used to allocate GPU memory spaces precisely for embedding layer.|
|opt_hparams| list of floats, used to specify hyper parameters for optimizer.<br>For `Adam`, `opt_hparams` must be a list of `[learning_rate, beta1, beta2, epsilon]`.<br>For `MomentumSGD`, `opt_hparams` must be a list of `[learning_rate, momentum_factor]`.<br>For `Nesterov`, `opt_hparams` must be a list of `[learning_rate, momentum_factor]`.<br>For `SGD`, `opt_hparams` must be a list of `[learning_rate]`.|
|update_type| string, can be one of {'Local', 'Global', 'LazyGlobal'}. |
|atomic_update| bool, only used in `SGD` optimizer. |
|scaler| float, can be one of {1.0, 128.0, 256.0, 512.0, 1024.0}, used in `mixed_precission` training. |
|slot_num| integer, how many slots (feature-fields) are unified in a single embedding layer. |
|max_nnz| integer, the number of valid keys in a single slot.|
|max_feature_num| integer, the number of valid keys in a single input sample.|
|embedding_vec_size| integer, the embedding vector size of this embedding layer.|
|combier|string, can be one of {'mean', 'sum'}. specify how to combine different embedding vector in the same slot.|

|Returns||
|:----| :---- |
|embedding_name| tf.Tensor, dtype=tf.string. An unique name for this embedding layer.|



---
---
+ ```python
  each_replica = broadcast_then_convert_to_csr(embedding_name, row_indices, values, T, is_training=True)
  ```
  
This function take `COO` format as input on only one device. It will broadcast its input: `row_indices` and `values` to multiple GPUs, and then converting to `CSR` format will be conducted on each GPU independetely. This function **Must Not** be put inside of the scope of `MirroredStrategy` to avoid being called mutiple times in each iteration. After this function is called, `fprop` can be used to do the embedding forward propagation. 

For code sample, please refer to [samples/sample_with_fprop.py](../tools/embedding_plugin/samples/sample_with_fprop.py).

|Args||
|:----|:----|
|embedding_name| tf.Tensor with `dtype=tf.string`, use which embedding layer.|
|row_indices| 1-D tf.Tensor with `dtype=tf.int64`. It denotes the `row indices` of the `COO` format input data for all GPUs.|
|values| 1-D tf.Tensor with `dtype=tf.int64`. It denotes the `valid values` of the `COO` format input data for all GPUs.|
| T | A list of `tf.int32`, whose length must be equal to the number of GPUs.|

|Returns||
|:----|:----|
|each_replica| A list of tf.Tensor with `dtype=T`. Used as a placeholder to avoid this function being removed by `tf.Graph`.|

---
---
+ ```python
  replica_forward = fprop(embedding_name, replica_id, to_each_replica, bp_trigger, is_training=True)
  ```
  
This function can be used to do forward propagation for `Distributed` and `Localized` embedding layers. If multiple GPUs are used, this function must be put inside the scope of `MirroredStrategy` to do forward propagation on each GPU. If this function is used, `broadcast_then_convert_to_csr()` must be called before the calling of this function.

For code sample, please refer to [samples/sample_with_fprop.py](../tools/embedding_plugin/samples/sample_with_fprop.py).

|Args||
|:----|:----|
|embedding_name| tf.Tensor with `dtype=tf.string`, use which embedding layer.|
|replica_id|tf.Tensor with `dtype=tf.int32`, denotes the replica id in the group of GPUs. Can be obtained with `tf.distribute.get_replica_context().replica_id_in_sync_group`|
|to_each_replica| Scalar tf.Tensor with `dtype=tf.int32`, the output from `broadcast_then_convert_to_csr()`|
|bp_trigger| tf.Variable(dtype=tf.float32), used to automatically trigger back propagation of the embedding layer.|
|is_training| `boolean`, specify whether use `training` resources or `evaluation` resources.|

|Returns||
|:----|:----|
|replica_forward| tf.Tensor with `shape=[replica_batch_size, slot_num, embedding_vec_size]` for one GPU.

---
---
+ ```python
  replica_forward = fprop_experimental(embedding_name, replica_id, row_offset, values, nnz, bp_trigger, is_training=True, input_buffer_reset=False)
  ```
  
This function can be used to do forward propagation for `Distributed` and `Localized` embedding layers. If multiple GPUs are used, this function must be put inside the scope of `MirroredStrategy` to do forward propagation on each GPU. It only accept input data for one GPU, and produce the corresponding forward result for that GPU. And it takes `CSR` format as input.

In some cases, this function can get better performance than `fprop`, but it is not stable. <br>
For code sample, please refer to [samples/sample_with_fprop_experimental.py](../tools/embedding_plugin/samples/sample_with_fprop_experimental.py).

|Args||
|:----|:----|
|embedding_name| tf.Tensor with `dtype=tf.string`, use which embedding layer. |
|replica_id| tf.Tensor with `dtype=tf.int32`, denotes the replica id in the group of GPUs. Can be obtained with `tf.distribute.get_replica_context().replica_id_in_sync_group`|
|row_offset| 1-D tf.Tensor with `dtype=tf.int64`. It denotes the `row offset` of the `CSR` format input data for one GPU.|
|values| 1-D tf.Tensor with `dtype=tf.int64`. It denotes the `valid value` of the `CSR` format input data for one GPU.|
|nnz| Scalar tf.Tensor with `dtype=tf.int64`. It denotes the length of `values` of the `CSR` format input data for one GPU.|
|bp_trigger|tf.Variable(dtype=tf.float32), used to automatically trigger back propagation of the embedding layer.|
|is_training| `boolean`, specify whether use `training` resources or `evaluation` resources.|
|input_buffer_reset| `boolean`, whether to reset the input buffer in each iteration.|

|Returns||
|:----|:----|
|replica_forward| tf.Tensor with `shape=[replica_batch_size, slot_num, embedding_vec_size]` for one GPU.

---
---
+ ```python
  save(embedding_name, save_name)
  ```

This function is used to save the `embedding_plugin` parameters in the file.

|Args||
|:----| :---- |
|embedding_name| tf.Tensor with `dtype=tf.string`, save which embedding layer's parameter to file.|
|save_name| string, the name of saved parameters.|

---
---
+ ```python
  restore(embedding_name, file_name)
  ```
  
This function is used to restore the `embedding_plugin` parameters from file.

|Args||
|:----| :---- |
|embedding_name| tf.Tensor with `dtype=tf.string`, restore parameters for which embedding layer.|
|file_name| string, restore paramters from this file. |