In [None]:
from __future__ import absolute_import, division, print_function, unicode_literals

# miscellaneous
import bisect
import builtins
import shutil
import time

# numpy
import numpy as np

# onnx
import onnx

# pytorch
import torch
import torch.nn as nn
from numpy import random as ra
from torch.nn.parallel.parallel_apply import parallel_apply
from torch.nn.parallel.replicate import replicate
from torch.nn.parallel.scatter_gather import gather, scatter

### import packages ###
import sys
import os
import io
import collections
import argparse


import os.path
from io import StringIO
from os import path


from torchviz import make_dot
import torch.nn.functional as Functional
from torch.nn.parameter import Parameter


# others
import collections

from numpy import random as ra

In [None]:
# Look up the I/O exception class on `builtins` by name.
# NOTE(review): if `builtins` has no `IOError` attribute, the fallback bound to
# `exc` is the *string* "FileNotFoundError", not an exception class, so it could
# not be used in an `except` clause — confirm this is intended.
exc = getattr(builtins, "IOError", "FileNotFoundError")

In [None]:
### define dlrm in PyTorch ###
class DLRM_Net(nn.Module):
    """DLRM model: bottom MLP + embedding bags + feature interaction + top MLP.

    Dense features are processed by a bottom MLP, every sparse feature by its
    own ``nn.EmbeddingBag`` (sum pooling), the results are combined by the
    configured interaction op ("dot" or "cat") and fed to a top MLP.

    The operators are only created when ``m_spa``, ``ln_emb``, ``ln_bot``,
    ``ln_top`` and ``arch_interaction_op`` are all provided; otherwise the
    instance is an empty ``nn.Module`` and cannot be used for ``forward``.
    """

    def create_mlp(self, ln, sigmoid_layer):
        """Build an MLP as an ``nn.Sequential`` of Linear/activation pairs.

        Args:
            ln: NumPy array of layer widths; layer ``i`` maps ``ln[i]`` inputs
                to ``ln[i + 1]`` outputs.
            sigmoid_layer: index of the Linear layer that is followed by a
                Sigmoid; every other Linear layer is followed by a ReLU.

        Returns:
            ``torch.nn.Sequential`` wrapping all layers.
        """
        layers = nn.ModuleList()
        for i in range(0, ln.size - 1):
            n = ln[i]
            m = ln[i + 1]

            # construct fully connected operator
            LL = nn.Linear(int(n), int(m), bias=True)

            # custom Xavier-style fill drawn from NumPy's global RNG:
            # weights use std sqrt(2 / (m + n)), bias uses std sqrt(1 / m)
            mean = 0.0
            std_dev = np.sqrt(2 / (m + n))
            W = np.random.normal(mean, std_dev, size=(m, n)).astype(np.float32)
            std_dev = np.sqrt(1 / m)
            bt = np.random.normal(mean, std_dev, size=m).astype(np.float32)
            LL.weight.data = torch.tensor(W, requires_grad=True)
            LL.bias.data = torch.tensor(bt, requires_grad=True)
            layers.append(LL)

            # construct sigmoid or relu operator
            if i == sigmoid_layer:
                layers.append(nn.Sigmoid())
            else:
                layers.append(nn.ReLU())

        # wrap all layers in a Sequential container so the MLP is callable
        return torch.nn.Sequential(*layers)

    def create_emb(self, m, ln):
        """Create one sum-pooling ``nn.EmbeddingBag`` per entry of ``ln``.

        Args:
            m: embedding dimension shared by all tables.
            ln: NumPy array with the number of rows of each table.

        Returns:
            ``nn.ModuleList`` of embedding bags, in the order of ``ln``.
        """
        emb_l = nn.ModuleList()
        for i in range(0, ln.size):
            n = ln[i]

            # construct embedding operator
            EE = nn.EmbeddingBag(n, m, mode="sum", sparse=True)

            # initialize embeddings uniformly in [-sqrt(1 / n), sqrt(1 / n)]
            W = np.random.uniform(
                low=-np.sqrt(1 / n), high=np.sqrt(1 / n), size=(n, m)
            ).astype(np.float32)
            EE.weight.data = torch.tensor(W, requires_grad=True)
            emb_l.append(EE)

        return emb_l

    def __init__(
        self,
        m_spa=None,
        ln_emb=None,
        ln_bot=None,
        ln_top=None,
        arch_interaction_op=None,
        arch_interaction_itself=False,
        sigmoid_bot=-1,
        sigmoid_top=-1,
        sync_dense_params=True,
        loss_threshold=0.0,
        ndevices=-1,
    ):
        """Store the configuration and create the embedding/MLP operators.

        Args:
            m_spa: embedding dimension.
            ln_emb: NumPy array with the row count of every embedding table.
            ln_bot: NumPy array of bottom-MLP layer widths.
            ln_top: NumPy array of top-MLP layer widths.
            arch_interaction_op: "dot" or "cat".
            arch_interaction_itself: include self-interactions (the diagonal)
                in the "dot" interaction.
            sigmoid_bot: bottom-MLP layer index that gets a Sigmoid.
            sigmoid_top: top-MLP layer index that gets a Sigmoid.
            sync_dense_params: re-replicate the MLPs on every parallel forward.
            loss_threshold: when strictly between 0 and 1, the output is
                clamped to ``[loss_threshold, 1 - loss_threshold]``.
            ndevices: ``forward`` uses the multi-device path when this is > 1.
        """
        super(DLRM_Net, self).__init__()

        if (
            (m_spa is not None)
            and (ln_emb is not None)
            and (ln_bot is not None)
            and (ln_top is not None)
            and (arch_interaction_op is not None)
        ):

            # save arguments
            self.ndevices = ndevices
            self.output_d = 0
            self.parallel_model_batch_size = -1
            self.parallel_model_is_not_prepared = True
            self.arch_interaction_op = arch_interaction_op
            self.arch_interaction_itself = arch_interaction_itself
            self.sync_dense_params = sync_dense_params
            self.loss_threshold = loss_threshold
            # create operators
            self.emb_l = self.create_emb(m_spa, ln_emb)
            self.bot_l = self.create_mlp(ln_bot, sigmoid_bot)
            self.top_l = self.create_mlp(ln_top, sigmoid_top)

    def apply_mlp(self, x, layers):
        """Run ``x`` through ``layers`` (a callable container such as Sequential)."""
        return layers(x)

    def apply_emb(self, lS_o, lS_i, emb_l):
        """Look up every embedding table for the whole batch at once.

        The data is assumed to be laid out such that:
        1. each embedding is indexed with a group of sparse indices,
           corresponding to a single lookup
        2. for each embedding the lookups are further organized into a batch
        3. for a list of embedding tables there is a list of batched lookups

        Args:
            lS_o: per-table offsets tensors.
            lS_i: per-table indices tensors.
            emb_l: embedding bags, indexed like ``lS_i``.

        Returns:
            List with one pooled lookup result per entry of ``lS_i``.
        """
        ly = []
        for k, sparse_index_group_batch in enumerate(lS_i):
            sparse_offset_group_batch = lS_o[k]

            # EmbeddingBag in "sum" mode pools every bag into a single row
            E = emb_l[k]
            V = E(sparse_index_group_batch, sparse_offset_group_batch)

            ly.append(V)

        return ly

    def interact_features(self, x, ly):
        """Combine the dense vector ``x`` with the embedding outputs ``ly``.

        Args:
            x: bottom-MLP output, either 2-D ``(batch, d)`` or a single 1-D
                sample of length ``d`` (treated as a batch of one).
            ly: list of per-table embedding outputs.

        Returns:
            2-D tensor: for "dot", ``x`` concatenated with the strictly lower
            (or lower, when ``arch_interaction_itself``) triangle of the
            pairwise dot products; for "cat", the plain concatenation.

        Exits via ``sys.exit`` when the interaction op is not supported.
        """
        # A 1-D dense vector cannot be concatenated along dim=1 with the 2-D
        # embedding outputs; promote it to a batch of one.
        if len(x.shape) == 1:
            x = x.unsqueeze(0)

        if self.arch_interaction_op == "dot":
            # concatenate dense and sparse features
            (batch_size, d) = x.shape
            T = torch.cat([x] + ly, dim=1).view((batch_size, -1, d))
            # perform a dot product between every pair of feature vectors
            Z = torch.bmm(T, torch.transpose(T, 1, 2))
            # keep only the unique interactions (lower triangle of Z)
            _, ni, nj = Z.shape
            offset = 0 if self.arch_interaction_itself else -1
            li, lj = torch.tril_indices(ni, nj, offset=offset)
            Zflat = Z[:, li, lj]
            # concatenate dense features and interactions
            R = torch.cat([x] + [Zflat], dim=1)
        elif self.arch_interaction_op == "cat":
            # concatenation features (into a row vector)
            R = torch.cat([x] + ly, dim=1)
        else:
            sys.exit(
                "ERROR: --arch-interaction-op="
                + self.arch_interaction_op
                + " is not supported"
            )

        return R

    def forward(self, dense_x, lS_o, lS_i):
        """Dispatch to the single-device or the multi-device forward pass."""
        if self.ndevices <= 1:
            return self.sequential_forward(dense_x, lS_o, lS_i)
        else:
            return self.parallel_forward(dense_x, lS_o, lS_i)

    def sequential_forward(self, dense_x, lS_o, lS_i):
        """Single-device forward pass; returns the (optionally clamped) output."""
        # process dense features (using bottom mlp), resulting in a row vector
        x = self.apply_mlp(dense_x, self.bot_l)

        # process sparse features(using embeddings), resulting in a list of row vectors
        ly = self.apply_emb(lS_o, lS_i, self.emb_l)

        # interact features (dense and sparse)
        z = self.interact_features(x, ly)

        # obtain probability of a click (using top mlp)
        p = self.apply_mlp(z, self.top_l)

        # clamp output if needed
        if 0.0 < self.loss_threshold and self.loss_threshold < 1.0:
            z = torch.clamp(p, min=self.loss_threshold, max=(1.0 - self.loss_threshold))
        else:
            z = p

        return z

    def parallel_forward(self, dense_x, lS_o, lS_i):
        """Multi-device forward pass.

        The MLPs are replicated across devices (data parallelism) while the
        embedding tables are spread round-robin over "cuda:<k>" devices
        (model parallelism). Results are gathered on device ``self.output_d``.
        """
        ### prepare model (overwrite) ###
        # The number of devices actually used is capped by the batch size and
        # by the number of embedding tables.
        batch_size = dense_x.size()[0]
        ndevices = min(self.ndevices, batch_size, len(self.emb_l))
        device_ids = range(ndevices)
        # WARNING: must redistribute the model if mini-batch size changes(this is common
        # for last mini-batch, when # of elements in the dataset/batch size is not even
        if self.parallel_model_batch_size != batch_size:
            self.parallel_model_is_not_prepared = True

        if self.sync_dense_params or self.parallel_model_is_not_prepared:
            # replicate mlp (data parallelism)
            self.bot_l_replicas = replicate(self.bot_l, device_ids)
            self.top_l_replicas = replicate(self.top_l, device_ids)
            # distribute embeddings (model parallelism); Module.to() moves the
            # module in place and returns it, so a single call is enough
            t_list = []
            for k, emb in enumerate(self.emb_l):
                d = torch.device("cuda:" + str(k % ndevices))
                t_list.append(emb.to(d))
            self.emb_l = nn.ModuleList(t_list)
            self.parallel_model_batch_size = batch_size
            self.parallel_model_is_not_prepared = False

        ### prepare input (overwrite) ###
        # scatter dense features (data parallelism)
        dense_x = scatter(dense_x, device_ids, dim=0)
        # distribute sparse features (model parallelism)
        if (len(self.emb_l) != len(lS_o)) or (len(self.emb_l) != len(lS_i)):
            sys.exit("ERROR: corrupted model input detected in parallel_forward call")

        t_list = []
        i_list = []
        for k, _ in enumerate(self.emb_l):
            d = torch.device("cuda:" + str(k % ndevices))
            t_list.append(lS_o[k].to(d))
            i_list.append(lS_i[k].to(d))
        lS_o = t_list
        lS_i = i_list

        ### compute results in parallel ###
        # bottom mlp
        # WARNING: Note that the self.bot_l_replicas is a list of bottom mlp modules
        # that have been replicated across devices, while dense_x is a tuple of dense
        # inputs that has been scattered across devices on the first (batch) dimension.
        # The output is a list of tensors scattered across devices according to the
        # distribution of dense_x.
        x = parallel_apply(self.bot_l_replicas, dense_x, None, device_ids)

        # embeddings
        ly = self.apply_emb(lS_o, lS_i, self.emb_l)

        # butterfly shuffle (implemented inefficiently for now)
        # WARNING: Note that at this point we have the result of the embedding lookup
        # for the entire batch on each device. We would like to obtain partial results
        # corresponding to all embedding lookups, but part of the batch on each device.
        # Therefore, matching the distribution of output of bottom mlp, so that both
        # could be used for subsequent interactions on each device.
        if len(self.emb_l) != len(ly):
            sys.exit("ERROR: corrupted intermediate result in parallel_forward call")

        t_list = []
        for k, _ in enumerate(self.emb_l):
            t_list.append(scatter(ly[k], device_ids, dim=0))
        # adjust the list to be ordered per device
        ly = [list(per_device) for per_device in zip(*t_list)]

        # interactions
        z = []
        for k in range(ndevices):
            zk = self.interact_features(x[k], ly[k])
            z.append(zk)

        # top mlp
        # WARNING: Note that the self.top_l_replicas is a list of top mlp modules that
        # have been replicated across devices, while z is a list of interaction results
        # that by construction are scattered across devices on the first (batch) dim.
        # The output is a list of tensors scattered across devices according to the
        # distribution of z.
        p = parallel_apply(self.top_l_replicas, z, None, device_ids)

        ### gather the distributed results ###
        p0 = gather(p, self.output_d, dim=0)

        # clamp output if needed
        if 0.0 < self.loss_threshold and self.loss_threshold < 1.0:
            z0 = torch.clamp(
                p0, min=self.loss_threshold, max=(1.0 - self.loss_threshold)
            )
        else:
            z0 = p0

        return z0

In [None]:
##Data Utils

def convertUStringToDistinctInts(mat, convertDicts, counts):
    """Map every distinct string of each column of ``mat`` to a distinct integer.

    Ids are assigned per column in order of first appearance, continuing from
    ``counts[j]`` so that the mapping can be accumulated over several calls.

    Inputs:
        mat (np.array): 2-D array of unicode strings to convert
        convertDicts (list): one {string: id} dictionary per column
        counts (list): number of distinct categories seen so far per column

    When ``convertDicts`` or ``counts`` does not have one entry per column,
    fresh (empty) ones are created; otherwise the passed-in objects are
    updated in place.

    Outputs:
        out (torch.Tensor): int64 tensor of ids with the shape of ``mat``
        convertDicts (list): updated dictionary for each column
        counts (list): updated number of categories in each column
    """
    n_rows, n_cols = mat.shape[0], mat.shape[1]

    # check if convertDicts and counts match correct length of mat
    if len(convertDicts) != n_cols or len(counts) != n_cols:
        print("Length of convertDicts or counts does not match input shape")
        print("Generating convertDicts and counts...")

        convertDicts = [{} for _ in range(n_cols)]
        counts = [0 for _ in range(n_cols)]

    # Use an integer tensor: the default float32 cannot represent every
    # integer above 2**24, which would silently merge large category ids.
    out = torch.zeros(mat.shape, dtype=torch.int64)

    for j in range(n_cols):
        column_ids = convertDicts[j]
        for i in range(n_rows):
            key = mat[i, j]
            # first occurrence: register the string under the next free id
            if key not in column_ids:
                column_ids[key] = counts[j]
                counts[j] += 1
            out[i, j] = column_ids[key]

    return out, convertDicts, counts


def processKaggleCriteoAdData(split, d_path):
    """Convert the raw per-day Kaggle files into their "processed" form.

    For every day ``i`` in ``1..split`` the file ``kaggle_day_i.npz`` under
    ``d_path`` is loaded, the strings in ``X_cat`` are replaced by integer ids
    (shared dictionaries across days), negative entries of ``X_int`` are set
    to 0, and the result is written to ``kaggle_day_i_processed.npz``. The
    per-column category counts are saved to ``kaggle_counts.npz``.

    Nothing is recomputed when every processed file already exists.

    Inputs:
        split (int): total number of splits in the dataset (typically 7)
        d_path (str): path prefix for the kaggle_day_i.npz files
    """
    prefix = str(d_path)
    days = range(1, split + 1)

    # fast path: all processed files are already on disk
    if all(
        path.exists(prefix + "kaggle_day_{0}_processed.npz".format(day))
        for day in days
    ):
        print("Using existing %skaggle_day_*_processed.npz files" % prefix)
        return

    # dictionaries and counts are threaded through all days so that the same
    # string receives the same id everywhere
    dictionaries, totals = [], []
    for day in days:
        with np.load(prefix + "kaggle_day_{0}.npz".format(day)) as raw:
            sparse, dictionaries, totals = convertUStringToDistinctInts(
                raw["X_cat"], dictionaries, totals
            )
            dense = raw["X_int"]
            dense[dense < 0] = 0
            labels = raw["y"]

        np.savez_compressed(
            prefix + "kaggle_day_{0}_processed.npz".format(day),
            X_cat=sparse,
            X_int=dense,
            y=labels,
        )
        print("Processed kaggle_day_{0}.npz...".format(day), end="\r")

    np.savez_compressed(prefix + "kaggle_counts.npz", counts=totals)


def concatKaggleCriteoAdData(split, d_path, o_filename):
    """Merge the processed per-day Kaggle files into a single .npz archive.

    Inputs:
        split (int): total number of splits in the dataset (typically 7)
        d_path (str): path prefix for the kaggle_day_i_processed.npz files
        o_filename (str): output file name (without the ".npz" suffix)

    Output:
        o_file (str): path of the archive that was written; it holds the
        row-wise concatenation of X_cat, X_int and y over all days plus the
        ``counts`` array read from kaggle_counts.npz
    """
    base = str(d_path)
    print ("Concatenating multiple day kaggle data into %s.npz file" % str(d_path + o_filename))

    # append each day below the rows gathered so far
    for day in range(1, split + 1):
        with np.load(base + "kaggle_day_{0}_processed.npz".format(day)) as chunk:
            if day != 1:
                X_cat = np.concatenate((X_cat, chunk["X_cat"]))
                X_int = np.concatenate((X_int, chunk["X_int"]))
                y = np.concatenate((y, chunk["y"]))
            else:
                X_cat = chunk["X_cat"]
                X_int = chunk["X_int"]
                y = chunk["y"]

        # running label balance over all days loaded so far
        positives = len(y[y == 1])
        negatives = len(y[y == 0])
        print("Loaded day:", day, "y = 1:", positives, "y = 0:", negatives)

    with np.load(base + "kaggle_counts.npz") as stored:
        counts = stored["counts"]

    print("Loaded counts!")

    o_file = base + str(o_filename) + ".npz"
    np.savez_compressed(o_file, X_cat=X_cat, X_int=X_int, y=y, counts=counts)

    return o_file


def transformCriteoAdData(X_cat, X_int, y, split, randomize, cuda):
    """Turn the Criteo arrays into tensors, optionally split and shuffled.

    Dense features are transformed with log(x + 1); categorical features
    become long tensors and labels float32 tensors.

    Inputs:
        X_cat (ndarray): array of integers corresponding to preprocessed
                         categorical features
        X_int (ndarray): array of integers corresponding to dense features
        y (ndarray): array of labels
        split (bool): flag for splitting dataset into training/validation/test
                      sets; the rows are cut into 7 chunks, the first six form
                      the training set and the last one is halved into
                      validation and test
        randomize (str): determines randomization scheme
            "none": no randomization
            "day": shuffles rows inside each of the 7 chunks (split only)
            "total": as "day", and additionally shuffles the whole training
                     set (or the whole dataset when split is False)
        cuda (bool): when set, the split tensors are placed in pinned memory;
                     not used when split is False

    Outputs:
        if split: (X_cat_train, X_int_train, y_train,
                   X_cat_val, X_int_val, y_val,
                   X_cat_test, X_int_test, y_test)
        else:     (X_cat, X_int, y)
    """
    order = np.arange(len(y))

    if not split:
        # randomize data
        if randomize == "total":
            order = np.random.permutation(order)

            print("Randomized indices...")

        cat_t = torch.tensor(X_cat[order], dtype=torch.long)
        dense_t = torch.log(torch.tensor(X_int[order], dtype=torch.float) + 1)
        label_t = torch.tensor(y[order].astype(np.float32))

        print("Converted to tensors...done!")

        return cat_t, dense_t, label_t

    days = np.array_split(order, 7)

    # randomize each day's dataset
    if randomize in ("day", "total"):
        days = [np.random.permutation(day) for day in days]

    train_rows = np.concatenate(days[:-1])
    val_rows, test_rows = np.array_split(days[-1], 2)

    print("Defined training and testing indices...")

    # randomize all data in training set
    if randomize == "total":
        train_rows = np.random.permutation(train_rows)
        print("Randomized indices...")

    # slice every partition first: train, then validation, then test
    partitions = [
        (X_cat[rows], X_int[rows], y[rows])
        for rows in (train_rows, val_rows, test_rows)
    ]

    print("Split data according to indices...")

    # convert to tensors, in the same train/val/test order
    converted = []
    for cat_np, dense_np, label_np in partitions:
        cat_t = torch.tensor(cat_np, dtype=torch.long)
        dense_t = torch.log(torch.tensor(dense_np, dtype=torch.float) + 1)
        label_t = torch.tensor(label_np.astype(np.float32))
        if cuda:
            cat_t = cat_t.pin_memory()
            dense_t = dense_t.pin_memory()
            label_t = label_t.pin_memory()
        converted.extend((cat_t, dense_t, label_t))

    print("Converted to tensors...done!")

    return tuple(converted)


def getKaggleCriteoAdData(datafile="", o_filename=""):
    """Convert the raw Kaggle Criteo text file into one processed .npz file.

    The raw file is cut into 7 consecutive "day" splits of (almost) equal
    size which are stored as ``./kaggle_data/kaggle_day_<i>.npz``; the splits
    are then processed and concatenated by ``processKaggleCriteoAdData`` and
    ``concatKaggleCriteoAdData``. Existing day files are reused when all of
    them are present.

    Inputs:
        datafile : path to downloaded raw data file (tab separated: 1 label,
                   13 integer features, 26 categorical features per line)
        o_filename (str): name (without ".npz") of the concatenated output

    Output:
        o_file (str): output file path

    Calls ``exit(0)`` after printing a message when ``datafile`` does not
    exist.
    """
    d_path = "./kaggle_data/"
    num_days = 7  # number of "day" splits the raw file is cut into

    # determine if intermediate data path exists
    if path.isdir(str(d_path)):
        print("Saving intermediate data files at %s" % (d_path))
    else:
        os.mkdir(str(d_path))
        print("Created %s for storing intermediate data files" % (d_path))

    # determine if data file exists (train.txt)
    if path.exists(str(datafile)):
        print("Reading data from path=%s" % (str(datafile)))
    else:
        print(
            "Path of Kaggle Display Ad Challenge Dataset is invalid; please download from https://labs.criteo.com/2014/09/kaggle-contest-dataset-now-available-academic-use/"
        )
        exit(0)

    # count number of datapoints in training set
    total_count = 0
    with open(str(datafile)) as f:
        for _ in f:
            total_count += 1

    print("Total number of datapoints:", total_count)

    # determine length of split over the days
    num_data_per_split, extras = divmod(total_count, num_days)

    def _rows_in_split(split_no):
        # the first `extras` splits carry one additional row so that all
        # rows are distributed; the remaining splits get the base size
        return num_data_per_split + (1 if split_no <= extras else 0)

    def _empty_buffers(n_rows):
        # fresh label / dense / categorical buffers for one split
        return (
            np.zeros(n_rows, dtype="i4"),
            np.zeros((n_rows, 13), dtype="i4"),
            np.zeros((n_rows, 26), dtype="U8"),
        )

    # record layout of one line of the raw file
    row_dtype = np.dtype(
        [("label", "i4"), ("int_feature", ("i4", 13)), ("cat_feature", ("U8", 26))]
    )

    # check if all day files exist already
    all_days_present = all(
        path.exists(str(d_path) + "kaggle_day_{0}.npz".format(day))
        for day in range(1, num_days + 1)
    )

    if all_days_present:
        split = num_days
        print("Using existing %skaggle_day_*.npz files" % str(d_path))
    else:
        split = 1
        count = 0  # number of rows already flushed to earlier splits
        num_data_in_split = _rows_in_split(split)
        y, X_int, X_cat = _empty_buffers(num_data_in_split)

        # load training data
        with open(str(datafile)) as f:

            for i, line in enumerate(f):

                # store day"s worth of data and reinitialize data
                if i == (count + num_data_in_split):
                    np.savez_compressed(
                        str(d_path) + "kaggle_day_{0}.npz".format(split),
                        X_int=X_int,
                        X_cat=X_cat,
                        y=y,
                    )

                    print("\nSaved kaggle_day_{0}.npz!".format(split))

                    split += 1
                    count += num_data_in_split
                    num_data_in_split = _rows_in_split(split)
                    y, X_int, X_cat = _empty_buffers(num_data_in_split)

                data = np.genfromtxt(StringIO(line), dtype=row_dtype, delimiter="\t")

                y[i - count] = data["label"]
                X_int[i - count] = data["int_feature"]
                X_cat[i - count] = data["cat_feature"]

                print(
                    "Loading %d/%d   Split: %d   No Data in Split: %d  true label: %d  stored label: %d"
                    % (
                        i,
                        total_count,
                        split,
                        num_data_in_split,
                        data["label"],
                        y[i - count],
                    ),
                    end="\r",
                )

        # flush the last split
        np.savez_compressed(
            str(d_path) + "kaggle_day_{0}.npz".format(split),
            X_int=X_int,
            X_cat=X_cat,
            y=y,
        )

        print("\nSaved kaggle_day_{0}.npz!".format(split))

    # `split` now equals the number of day files available on disk
    processKaggleCriteoAdData(split, d_path)
    o_file = concatKaggleCriteoAdData(split, d_path, o_filename)

    return o_file


def loadDataset(dataset, num_samples, df_path="", data=""):
    """Load the processed arrays of the requested dataset.

    Args:
        dataset (str): "kaggle" or "terabyte".
        num_samples: accepted for interface compatibility; not used here.
        df_path (str): path of the raw Kaggle file, used to build the
            processed archive when ``data`` does not exist.
        data (str): path of an already processed Kaggle .npz archive.

    Returns:
        (X_cat, X_int, y, counts) arrays read from the processed archive.

    Raises:
        ValueError: for an unknown ``dataset`` name, or when the processed
            Terabyte archive does not exist.
    """
    if dataset == "kaggle":
        if path.exists(str(data)):
            print("Reading from pre-processed data=%s" % (str(data)))
            npz_file = str(data)
        else:
            o_filename = "kaggleAdDisplayChallenge_processed"
            npz_file = getKaggleCriteoAdData(df_path, o_filename)
    elif dataset == "terabyte":
        npz_file = "./terbyte_data/tb_processed.npz"
        if path.exists(str(npz_file)):
            print("Reading Terabyte data-set processed data from %s" % npz_file)
        else:
            raise (
                ValueError(
                    "Terabyte data-set processed data file %s does not exist !!" % npz_file
                )
            )
    else:
        # fail with a clear message instead of an unbound-variable error below
        raise ValueError(
            "Unsupported dataset %r; expected 'kaggle' or 'terabyte'" % (dataset,)
        )

    # load the arrays while the archive is open
    with np.load(npz_file) as archive:

        X_int = archive["X_int"]
        X_cat = archive["X_cat"]
        y = archive["y"]
        counts = archive["counts"]

    return X_cat, X_int, y, counts


In [None]:

### Data loading functions

# Kaggle Display Advertising Challenge Dataset
# dataset (str): name of dataset (Kaggle or Terabyte)
# randomize (str): determines randomization scheme
#            "none": no randomization
#            "day": randomizes each day"s data (only works if split = True)
#            "total": randomizes total dataset
# split (bool) : to split into train, test, validation data-sets
def read_dataset(
    dataset,
    mini_batch_size,
    randomize,
    num_batches,
    split=True,
    raw_data="",
    processed_data="",
    inference_only=False,
):
    """Load a Criteo dataset and cut it into lists of mini-batch tensors.

    Args:
        dataset (str): name of dataset ("kaggle" or "terabyte").
        mini_batch_size (int): number of samples per batch; a trailing
            partial batch is dropped.
        randomize (str): "none", "day" or "total" (see transformCriteoAdData).
        num_batches (int): when non-zero, upper bound on the number of batches.
        split (bool): forwarded to transformCriteoAdData. NOTE: nine values
            are unpacked from its result, so only ``split=True`` works here.
        raw_data (str): path of the raw data file.
        processed_data (str): path of an already processed archive.
        inference_only (bool): skip building the training batches.

    Returns:
        A 12-tuple. When training:
        (nbatches, lX, lS_offsets, lS_indices, lT,
         nbatches_test, lX_test, lS_offsets_test, lS_indices_test, lT_test,
         ln_emb, m_den).
        When ``inference_only``: the five test entries come first, followed
        by five ``None`` placeholders, then ``ln_emb`` and ``m_den``.
    """
    # load
    print("Loading %s dataset..." % dataset)
    nbatches = 0
    num_samples = num_batches * mini_batch_size
    # the data utilities are defined in this notebook, not in a separate module
    X_cat, X_int, y, counts = loadDataset(
        dataset, num_samples, raw_data, processed_data
    )

    # transform
    (
        X_cat_train,
        X_int_train,
        y_train,
        X_cat_val,
        X_int_val,
        y_val,
        X_cat_test,
        X_int_test,
        y_test,
    ) = transformCriteoAdData(X_cat, X_int, y, split, randomize, False)
    ln_emb = counts
    m_den = X_int_train.shape[1]
    n_emb = len(counts)
    print("Sparse features = %d, Dense features = %d" % (n_emb, m_den))

    # training batches
    if not inference_only:
        nbatches, lX, lS_offsets, lS_indices, lT = _build_criteo_batches(
            "Training data",
            X_int_train,
            X_cat_train,
            y_train,
            n_emb,
            mini_batch_size,
            num_batches,
        )

    # testing batches
    (
        nbatches_test,
        lX_test,
        lS_offsets_test,
        lS_indices_test,
        lT_test,
    ) = _build_criteo_batches(
        "Testing data",
        X_int_test,
        X_cat_test,
        y_test,
        n_emb,
        mini_batch_size,
        num_batches,
    )

    if not inference_only:
        return (
            nbatches,
            lX,
            lS_offsets,
            lS_indices,
            lT,
            nbatches_test,
            lX_test,
            lS_offsets_test,
            lS_indices_test,
            lT_test,
            ln_emb,
            m_den,
        )
    else:
        return (
            nbatches_test,
            lX_test,
            lS_offsets_test,
            lS_indices_test,
            lT_test,
            None,
            None,
            None,
            None,
            None,
            ln_emb,
            m_den,
        )


def _build_criteo_batches(
    title, X_int, X_cat, y, n_emb, mini_batch_size, num_batches
):
    """Cut one partition (train or test) into per-batch tensor lists.

    Args:
        title (str): heading printed before the batch statistics.
        X_int: dense-feature tensor, one row per sample.
        X_cat: categorical-feature tensor, one column per embedding table.
        y: label tensor.
        n_emb (int): number of embedding tables (columns of ``X_cat`` used).
        mini_batch_size (int): samples per batch.
        num_batches (int): when non-zero, upper bound on the number of batches.

    Returns:
        (nbatches, lX, lS_offsets, lS_indices, lT) where ``lX[j]`` is a
        float32 ``(n, m_den)`` tensor, ``lT[j]`` a float32 ``(n, 1)`` tensor,
        and ``lS_indices[j]`` / ``lS_offsets[j]`` are lists with one int64
        tensor of length ``n`` per embedding table.
    """
    data_size = len(y)
    nbatches = int(np.floor((data_size * 1.0) / mini_batch_size))
    print(title)
    if num_batches != 0 and num_batches < nbatches:
        print(
            "Limiting to %d batches of the total % d batches"
            % (num_batches, nbatches)
        )
        nbatches = num_batches
    else:
        print("Total number of batches %d" % nbatches)

    lX = []
    lS_offsets = []
    lS_indices = []
    lT = []
    for j in range(0, nbatches):
        print("Reading in batch: %d / %d" % (j + 1, nbatches), end="\r")
        # number of data points in a batch
        n = min(mini_batch_size, data_size - (j * mini_batch_size))
        idx_start = j * mini_batch_size
        rows = slice(idx_start, idx_start + n)

        # dense features and targets (copied so batches own their storage)
        lX.append(X_int[rows].to(torch.float32).clone())
        lT.append(y[rows].to(torch.float32).reshape(-1, 1).clone())

        # sparse features: one index per sample and table, taken as a column
        # slice instead of element-by-element Python loops
        lS_indices.append(
            [X_cat[rows, table].to(torch.int64).clone() for table in range(n_emb)]
        )
        # every lookup holds exactly one index, so bag k starts at offset k
        lS_offsets.append(
            [torch.arange(n, dtype=torch.int64) for _ in range(n_emb)]
        )
    print("\n")

    return nbatches, lX, lS_offsets, lS_indices, lT


# uniform distribution (input data)
def generate_random_input_data(
    data_size,
    num_batches,
    mini_batch_size,
    round_targets,
    num_indices_per_lookup,
    num_indices_per_lookup_fixed,
    m_den,
    ln_emb,
):
    """Build uniformly random dense and sparse inputs, one entry per mini-batch.

    Args:
        data_size: total number of samples; recomputed as
            ``num_batches * mini_batch_size`` when ``num_batches`` is non-zero.
        num_batches: if non-zero, overrides the batch count derived from
            ``data_size``.
        mini_batch_size: maximum number of samples per batch.
        round_targets: accepted but not used by this function.
        num_indices_per_lookup: number of indices drawn per lookup (fixed
            mode) or the upper bound of that number (random mode), counted
            before duplicates are removed.
        num_indices_per_lookup_fixed: selects fixed vs. random lookup size.
        m_den: number of dense features per sample.
        ln_emb: iterable of embedding table sizes.

    Returns:
        ``(nbatches, lX, lS_offsets, lS_indices)``. ``lX[j]`` is a float32
        tensor of shape ``(n, m_den)``; ``lS_offsets[j][t]`` and
        ``lS_indices[j][t]`` are the offset and index tensors of table ``t``
        for batch ``j``.
    """
    nbatches = int(np.ceil((data_size * 1.0) / mini_batch_size))
    if num_batches != 0:
        nbatches, data_size = num_batches, num_batches * mini_batch_size

    dense_batches = []
    offsets_batches = []
    indices_batches = []
    for batch_id in range(nbatches):
        # the trailing batch is shorter when data_size is not a multiple
        n = min(mini_batch_size, data_size - batch_id * mini_batch_size)
        dense_batches.append(torch.tensor(ra.rand(n, m_den).astype(np.float32)))

        table_offsets = []
        table_indices = []
        for size in ln_emb:
            offsets = []
            indices = []
            cursor = 0
            for _ in range(n):
                if not num_indices_per_lookup_fixed:
                    # random size in [1, min(size, num_indices_per_lookup)]
                    u = ra.random(1)
                    group_size = np.int64(
                        np.round(max([1.0], u * min(size, num_indices_per_lookup)))
                    )
                else:
                    group_size = np.int64(num_indices_per_lookup)
                # draw the indices, then drop duplicates (which also sorts them)
                picks = ra.random(group_size)
                group = np.unique(np.round(picks * (size - 1)).astype(np.int64))
                # each lookup starts where the previous one ended
                offsets.append(cursor)
                indices.extend(group.tolist())
                cursor += np.int64(group.size)
            table_offsets.append(torch.tensor(offsets))
            table_indices.append(torch.tensor(indices))
        offsets_batches.append(table_offsets)
        indices_batches.append(table_indices)

    return (nbatches, dense_batches, offsets_batches, indices_batches)


# uniform distribution (output data)
def generate_random_output_data(
    data_size, num_batches, mini_batch_size, num_targets=1, round_targets=False
):
    """Build uniformly random float32 targets, one tensor per mini-batch.

    Args:
        data_size: total number of samples; recomputed as
            ``num_batches * mini_batch_size`` when ``num_batches`` is non-zero.
        num_batches: if non-zero, overrides the batch count derived from
            ``data_size``.
        mini_batch_size: maximum number of samples per batch.
        num_targets: number of target columns per sample.
        round_targets: when true, the drawn values are rounded with
            ``np.round`` so every target is 0.0 or 1.0.

    Returns:
        ``(nbatches, lT)`` where ``lT[j]`` has shape ``(n, num_targets)``.
    """
    nbatches = int(np.ceil((data_size * 1.0) / mini_batch_size))
    if num_batches != 0:
        nbatches, data_size = num_batches, num_batches * mini_batch_size

    def draw_targets(rows):
        # target (probability of a click)
        probs = ra.rand(rows, num_targets).astype(np.float32)
        return np.round(probs).astype(np.float32) if round_targets else probs

    # number of data points in each batch; only the last one can be short
    batch_rows = (
        min(mini_batch_size, data_size - (j * mini_batch_size))
        for j in range(nbatches)
    )
    lT = [torch.tensor(draw_targets(rows)) for rows in batch_rows]

    return (nbatches, lT)


# synthetic distribution (input data)
def generate_synthetic_input_data(
    data_size,
    num_batches,
    mini_batch_size,
    round_targets,
    num_indices_per_lookup,
    num_indices_per_lookup_fixed,
    m_den,
    ln_emb,
    trace_file,
    enable_padding=False,
):
    """Build random dense inputs plus sparse indices drawn from trace distributions.

    Dense features are uniform random. Sparse indices for table ``i`` are
    generated by ``trace_generate_lru`` from the distribution stored in the
    file obtained by replacing every ``"j"`` in ``trace_file`` with ``str(i)``.

    Args:
        data_size: total number of samples; recomputed as
            ``num_batches * mini_batch_size`` when ``num_batches`` is non-zero.
        num_batches: if non-zero, overrides the batch count derived from
            ``data_size``.
        mini_batch_size: maximum number of samples per batch.
        round_targets: accepted but not used by this function.
        num_indices_per_lookup: number of indices generated per lookup (fixed
            mode) or the upper bound of that number (random mode), counted
            before duplicates are removed.
        num_indices_per_lookup_fixed: selects fixed vs. random lookup size.
        m_den: number of dense features per sample.
        ln_emb: iterable of embedding table sizes.
        trace_file: path template of the per-table distribution files.
            NOTE: ``str.replace`` substitutes *every* "j" in the path, so
            directory names containing "j" are rewritten too.
        enable_padding: forwarded to ``trace_generate_lru``.

    Returns:
        ``(nbatches, lX, lS_offsets, lS_indices)`` with the same layout as
        ``generate_random_input_data``.
    """
    nbatches = int(np.ceil((data_size * 1.0) / mini_batch_size))
    if num_batches != 0:
        nbatches = num_batches
        data_size = nbatches * mini_batch_size
    # print("Total number of batches %d" % nbatches)

    # Parsed distribution per table index. The file content depends only on
    # the table, so it is read lazily once instead of once per sample.
    dist_cache = {}

    # inputs and targets
    lX = []
    lS_offsets = []
    lS_indices = []
    for j in range(0, nbatches):
        # number of data points in a batch
        n = min(mini_batch_size, data_size - (j * mini_batch_size))
        # dense feature
        Xt = ra.rand(n, m_den).astype(np.float32)
        lX.append(torch.tensor(Xt))
        # sparse feature (sparse indices)
        lS_emb_offsets = []
        lS_emb_indices = []
        # for each embedding generate a list of n lookups,
        # where each lookup is composed of multiple sparse indices
        for i, size in enumerate(ln_emb):
            lS_batch_offsets = []
            lS_batch_indices = []
            offset = 0
            for _ in range(n):
                # num of sparse indices to be used per embedding
                if num_indices_per_lookup_fixed:
                    sparse_group_size = np.int64(num_indices_per_lookup)
                else:
                    # random between [1,num_indices_per_lookup])
                    r = ra.random(1)
                    sparse_group_size = np.int64(
                        max(1, np.round(r * min(size, num_indices_per_lookup))[0])
                    )
                # sparse indices to be used per embedding
                if i not in dist_cache:
                    dist_cache[i] = read_dist_from_file(
                        trace_file.replace("j", str(i))
                    )
                line_accesses, list_sd, cumm_sd = dist_cache[i]
                # trace_generate_lru reorders the list it is given, so hand it
                # a fresh copy to start every lookup from the file's order
                r = trace_generate_lru(
                    list(line_accesses),
                    list_sd,
                    cumm_sd,
                    sparse_group_size,
                    enable_padding,
                )
                # WARNING: if the distribution in the file is not consistent
                # with embedding table dimensions, below mod guards against out
                # of range access
                sparse_group = np.unique(r).astype(np.int64)
                minsg = np.min(sparse_group)
                maxsg = np.max(sparse_group)
                if (minsg < 0) or (size <= maxsg):
                    print(
                        "WARNING: distribution is inconsistent with embedding "
                        + "table size (using mod to recover and continue)"
                    )
                    sparse_group = np.mod(sparse_group, size).astype(np.int64)
                # reset sparse_group_size in case some index duplicates were removed
                sparse_group_size = np.int64(sparse_group.size)
                # store lengths and indices
                lS_batch_offsets += [offset]
                lS_batch_indices += sparse_group.tolist()
                # update offset for next iteration
                offset += sparse_group_size
            lS_emb_offsets.append(torch.tensor(lS_batch_offsets))
            lS_emb_indices.append(torch.tensor(lS_batch_indices))
        lS_offsets.append(lS_emb_offsets)
        lS_indices.append(lS_emb_indices)

    return (nbatches, lX, lS_offsets, lS_indices)


def generate_stack_distance(cumm_val, cumm_dist, max_i, i, enable_padding=False):
    """Sample one stack distance from a cumulative distribution.

    Args:
        cumm_val: stack-distance values, in the same order as ``cumm_dist``.
        cumm_dist: cumulative probabilities matching ``cumm_val``.
        max_i: threshold on ``i`` below which the support is restricted.
        i: number of new references generated so far.
        enable_padding: when ``i >= max_i``, remap the draw so it lies at or
            above ``cumm_dist[0]``.

    Returns:
        The first ``cumm_val[j]`` whose cumulative probability is >= the
        (possibly rescaled) uniform draw. If the draw exceeds every entry of
        ``cumm_dist`` (e.g. the cumulative sum ends slightly below 1.0 because
        of floating-point round-off), the last value is returned instead of
        falling through with ``None``.
    """
    u = ra.rand(1)
    if i < max_i:
        # only generate stack distances up to the number of new references seen so far
        j = bisect.bisect(cumm_val, i) - 1
        fi = cumm_dist[j]
        u *= fi  # shrink distribution support to exclude last values
    elif enable_padding:
        # WARNING: disable generation of new references (once all have been seen)
        fi = cumm_dist[0]
        u = (1.0 - fi) * u + fi  # remap distribution support to exclude first value

    for (j, f) in enumerate(cumm_dist):
        if u <= f:
            return cumm_val[j]

    # draw landed beyond the last cumulative value: clamp to the last bin
    return cumm_val[-1]


# WARNING: global define, must be consistent across all synthetic functions
# Scale factor between memory references and line ids: trace_profile divides
# each reference by it, and the trace generators multiply line ids by it.
cache_line_size = 1


def trace_generate_lru(
    line_accesses, list_sd, cumm_sd, out_trace_len, enable_padding=False
):
    """Generate a synthetic trace, moving every referenced line to the list end.

    Args:
        line_accesses: list of line ids; it is reordered in place.
        list_sd: stack-distance values; its last entry is passed on as the
            ``max_i`` bound of ``generate_stack_distance``.
        cumm_sd: cumulative probabilities matching ``list_sd``.
        out_trace_len: number of references to generate.
        enable_padding: forwarded to ``generate_stack_distance``.

    Returns:
        List of ``np.uint64`` memory references, ``out_trace_len`` long.
    """
    max_sd = list_sd[-1]
    stack_len = len(line_accesses)
    new_refs_seen = 0
    ztrace = []
    for _ in range(out_trace_len):
        sd = generate_stack_distance(
            list_sd, cumm_sd, max_sd, new_refs_seen, enable_padding
        )
        mem_ref_within_line = 0  # floor(ra.rand(1)*cache_line_size) #0

        if sd == 0:
            # new reference: take the line at the front of the list
            line_ref = line_accesses.pop(0)
            new_refs_seen += 1
        else:
            # existing reference: take the line sd positions from the end
            line_ref = line_accesses.pop(stack_len - sd)
        # either way the referenced line becomes the most recent one
        line_accesses.append(line_ref)

        ztrace.append(np.uint64(line_ref * cache_line_size + mem_ref_within_line))

    return ztrace


def trace_generate_rand(
    line_accesses, list_sd, cumm_sd, out_trace_len, enable_padding=False
):
    """Generate a synthetic trace without reordering on repeated references.

    Unlike ``trace_generate_lru``, a non-zero stack distance only reads the
    line at that position; the list is rotated only when a new reference
    (stack distance 0) is produced.

    Args:
        line_accesses: list of line ids; rotated in place on new references.
        list_sd: stack-distance values; its last entry is passed on as the
            ``max_i`` bound of ``generate_stack_distance``.
        cumm_sd: cumulative probabilities matching ``list_sd``.
        out_trace_len: number of references to generate.
        enable_padding: forwarded to ``generate_stack_distance``.

    Returns:
        List of ``np.uint64`` memory references, ``out_trace_len`` long.
    """
    max_sd = list_sd[-1]
    n_lines = len(line_accesses)  # !!!Unique,
    new_refs_seen = 0
    ztrace = []
    for _ in range(out_trace_len):
        sd = generate_stack_distance(
            list_sd, cumm_sd, max_sd, new_refs_seen, enable_padding
        )
        mem_ref_within_line = 0  # floor(ra.rand(1)*cache_line_size) #0
        if sd != 0:
            # existing reference: look the line up, leave the list untouched
            line_ref = line_accesses[n_lines - sd]
        else:
            # new reference: rotate the front line to the end
            line_ref = line_accesses.pop(0)
            line_accesses.append(line_ref)
            new_refs_seen += 1
        ztrace.append(np.uint64(line_ref * cache_line_size + mem_ref_within_line))

    return ztrace


def trace_profile(trace, enable_padding=False):
    """Profile a memory trace into stack distances and first-touch line ids.

    Args:
        trace: iterable of memory references.
        enable_padding: when true, append extra zero stack distances
            (``ceil(len / max_distance)`` of them) to the result.

    Returns:
        ``(rstack, stack_distances, line_accesses)`` where
        * ``rstack`` holds the distinct lines, most recently used last;
        * ``stack_distances`` has one entry per reference, newest first
          (0 marks a first touch), followed by any padding zeros;
        * ``line_accesses`` lists the distinct lines, newest first touch first.

    Padding is skipped when no reference repeats (maximum distance 0) or the
    trace is empty; both cases previously raised.
    """
    rstack = []  # S
    stack_distances = []  # SDS, built oldest-first and reversed below
    line_accesses = []  # L, built oldest-first and reversed below
    for x in trace:
        # NOTE: "/" is a true division, so the quotient passes through a float
        # before the uint64 cast.
        r = np.uint64(x / cache_line_size)
        try:
            i = rstack.index(r)
        except ValueError:  # not found #
            stack_distances.append(0)
            line_accesses.append(r)
            # push r to the top of stack
            rstack.append(r)
        else:  # found #
            # WARNING: I believe below is the correct depth in terms of meaning of the
            #          algorithm, but that is not what seems to be in the paper alg.
            #          -1 can be subtracted if we defined the distance between
            #          consecutive accesses (e.g. r, r) as 0 rather than 1.
            stack_distances.append(len(rstack) - i)  # - 1
            # remove r from its position and insert to the top of stack
            rstack.pop(i)
            rstack.append(r)

    # callers expect newest-first ordering
    stack_distances.reverse()
    line_accesses.reverse()

    if enable_padding:
        # WARNING: notice that as the ratio between the number of samples (l)
        # and cardinality (c) of a sample increases the probability of
        # generating a sample gets smaller and smaller because there are
        # few new samples compared to repeated samples. This means that for a
        # long trace with relatively small cardinality it will take longer to
        # generate all new samples and therefore obtain full distribution support
        # and hence it takes longer for distribution to resemble the original.
        # Therefore, we may pad the number of new samples to be on par with
        # average number of samples l/c artificially.
        l = len(stack_distances)
        c = max(stack_distances, default=0)
        if c > 0:
            padding = int(np.ceil(l / c))
            stack_distances = stack_distances + [0] * padding

    return (rstack, stack_distances, line_accesses)


# auxiliary read/write routines
def read_trace_from_file(file_path, binary=None):
    """Read a memory trace from ``file_path``.

    Args:
        file_path: path of the trace file.
        binary: ``True`` to read raw ``uint64`` values, ``False`` to read one
            line of ``", "``-separated integers. When left as ``None`` the
            module-level ``args.trace_file_binary_type`` is used, which only
            exists when the file is run as a script.

    Returns:
        The trace as a list (Python ints for binary files, ``np.uint64`` for
        text files), or ``None`` after printing an error if anything fails.
    """
    try:
        if binary is None:
            binary = args.trace_file_binary_type
        if binary:
            # raw bytes must not go through text-mode decoding
            with open(file_path, "rb") as f:
                trace = np.fromfile(f, dtype=np.uint64).tolist()
        else:
            with open(file_path) as f:
                line = f.readline()
            trace = [np.uint64(x) for x in line.split(", ")]
        return trace
    except Exception as err:
        # keep the best-effort contract (print, return None) but show the cause
        print("ERROR: no input trace file has been provided ({})".format(err))
        return None


def write_trace_to_file(file_path, trace, binary=None):
    """Write a memory trace to ``file_path``.

    Args:
        file_path: destination path.
        trace: sequence of memory references.
        binary: ``True`` to write raw ``uint64`` values, ``False`` to write
            one line of ``", "``-separated values. When left as ``None`` the
            module-level ``args.trace_file_binary_type`` is used, which only
            exists when the file is run as a script.

    Errors are reported on stdout and otherwise swallowed.
    """
    try:
        if binary is None:
            binary = args.trace_file_binary_type
        if binary:
            with open(file_path, "wb+") as f:
                np.array(trace).astype(np.uint64).tofile(f)
        else:
            with open(file_path, "w+") as f:
                # Format element by element: str(list) uses each element's
                # repr, which for NumPy scalars may include the type name and
                # would not parse back as a number.
                f.write(", ".join(str(x) for x in trace))
    except Exception as err:
        print("ERROR: no output trace file has been provided ({})".format(err))


def read_dist_from_file(file_path):
    """Read a stack-distance distribution written by ``write_dist_to_file``.

    The file holds three lines of ``", "``-separated values: the unique line
    accesses (ints), the stack-distance values (ints) and their cumulative
    probabilities (floats).

    Returns:
        ``(unique_accesses, list_sd, cumm_sd)`` as three lists.

    Raises:
        Whatever ``open``/``read`` raised (after printing a short message);
        previously the failure surfaced later as an unrelated
        ``UnboundLocalError``.
    """
    try:
        with open(file_path, "r") as f:
            lines = f.read().splitlines()
    except Exception:
        print("Wrong file or file path")
        raise
    # read unique accesses
    unique_accesses = [int(el) for el in lines[0].split(", ")]
    # read cumulative distribution (elements are passed as two separate lists)
    list_sd = [int(el) for el in lines[1].split(", ")]
    cumm_sd = [float(el) for el in lines[2].split(", ")]

    return unique_accesses, list_sd, cumm_sd


def write_dist_to_file(file_path, unique_accesses, list_sd, cumm_sd):
    """Write a stack-distance distribution as three ``", "``-separated lines.

    Line order: ``unique_accesses``, ``list_sd``, ``cumm_sd`` — the layout
    ``read_dist_from_file`` parses. Errors are reported on stdout and
    otherwise swallowed.
    """
    try:
        with open(file_path, "w") as f:
            for values in (unique_accesses, list_sd, cumm_sd):
                # Format element by element: str(list) uses each element's
                # repr, which for NumPy scalars may include the type name and
                # would not parse back with int()/float().
                f.write(", ".join(str(v) for v in values) + "\n")
    except Exception:
        print("Wrong file or file path")


# Script entry point: profile a trace, store its stack-distance distribution
# and write a synthetic trace generated from that distribution.
# NOTE(review): inside a notebook kernel __name__ is also "__main__", so this
# block would run there and parse the kernel's own argv — confirm it is only
# meant for command-line use.
if __name__ == "__main__":
    import sys
    import os
    import operator
    import argparse

    def _str2bool(value):
        # argparse passes the raw string; with type=bool any non-empty text,
        # including "False", would be parsed as True.
        return str(value).strip().lower() not in ("", "0", "false", "f", "no", "n")

    ### parse arguments ###
    parser = argparse.ArgumentParser(description="Generate Synthetic Distributions")
    parser.add_argument("--trace-file", type=str, default="./input/trace.log")
    parser.add_argument("--trace-file-binary-type", type=_str2bool, default=False)
    parser.add_argument("--trace-enable-padding", type=_str2bool, default=False)
    parser.add_argument("--dist-file", type=str, default="./input/dist.log")
    parser.add_argument(
        "--synthetic-file", type=str, default="./input/trace_synthetic.log"
    )
    parser.add_argument("--numpy-rand-seed", type=int, default=123)
    parser.add_argument("--print-precision", type=int, default=5)
    args = parser.parse_args()

    ### some basic setup ###
    np.random.seed(args.numpy_rand_seed)
    np.set_printoptions(precision=args.print_precision)

    ### read trace ###
    trace = read_trace_from_file(args.trace_file)
    if trace is None:
        # the reader already printed the reason; stop instead of failing later
        sys.exit("ERROR: could not read trace file " + args.trace_file)
    # print(trace)

    ### profile trace ###
    (_, stack_distances, line_accesses) = trace_profile(
        trace, args.trace_enable_padding
    )
    # trace_profile returns newest-first lists; switch to chronological order
    stack_distances.reverse()
    line_accesses.reverse()
    # print(line_accesses)
    # print(stack_distances)

    ### compute probability distribution ###
    # count items
    l = len(stack_distances)
    dc = sorted(
        collections.Counter(stack_distances).items(), key=operator.itemgetter(0)
    )

    # create a distribution: values, their probabilities and the running sum
    list_sd = [sd for sd, _ in dc]
    dist_sd = [count / float(l) for _, count in dc]
    cumm_sd = []  # prefix sum of dist_sd
    running = 0.0
    for p in dist_sd:
        running += p
        cumm_sd.append(running)

    ### write stack_distance and line_accesses to a file ###
    write_dist_to_file(args.dist_file, line_accesses, list_sd, cumm_sd)

    ### generate corresponding synthetic ###
    # line_accesses, list_sd, cumm_sd = read_dist_from_file(args.dist_file)
    synthetic_trace = trace_generate_lru(
        line_accesses, list_sd, cumm_sd, len(trace), args.trace_enable_padding
    )
    # synthetic_trace = trace_generate_rand(
    #     line_accesses, list_sd, cumm_sd, len(trace), args.trace_enable_padding
    # )
    write_trace_to_file(args.synthetic_file, synthetic_trace)


In [34]:
# Args for random and synthetic:
# Notebook-level settings read by the cells below.

# --- model architecture ---
# The arch_* strings are dash-separated layer/table sizes; later cells parse
# them with np.fromstring(..., sep="-").
use_gpu = False
# must equal the last width of arch_mlp_bot (checked in a later cell)
arch_sparse_feature_size = 16 # 2
arch_embedding_size = "4-3-2"
arch_mlp_bot = "13-512-256-64-16" #"4-3-2"
# the interaction width is prepended to this string in a later cell
arch_mlp_top = "512-256-1" #"4-2-1"
# "dot" or "cat"; any other value aborts
arch_interaction_op = "dot"
arch_interaction_itself = False

# --- loss ---
activation_function = "relu"
# "mse" or "bce"; any other value aborts
loss_function = "mse"
loss_threshold = 0.0
round_targets = False

# --- data ---
data_size = 1
# 0 means: derive the batch count from data_size and mini_batch_size
num_batches = 0
# "dataset", "random" or "synthetic"; any other value aborts
data_generation = "random"
# every "j" in this path is replaced by the embedding table index
data_trace_file = "./input/dist_emb_j.log"
data_set = "kaggle"
raw_data_file = ""
processed_data_file = ""
data_randomize = "total"
data_trace_enable_padding = False
num_indices_per_lookup = 10
num_indices_per_lookup_fixed = True


# --- training ---
mini_batch_size = 2
nepochs = 1
learning_rate = 0.01
print_precision = 5
# used to seed both NumPy and PyTorch
numpy_rand_seed = 123
sync_dense_params = True

# --- run control, debugging and checkpoints ---
inference_only = False
save_onnx = False
print_freq = 1
test_freq = 1
print_time = False
debug_mode = False
enable_profiling = False
plot_compute_graph = False
save_model = ""
# empty string: no checkpoint is loaded
load_model = ""

In [35]:
# # uncomment this and comment above cell for real criteo dataset:
# use_gpu = False
# arch_sparse_feature_size = 16
# arch_embedding_size = "4-3-2"
# arch_mlp_bot = "13-512-256-64-16"
# arch_mlp_top = "512-256-1"
# arch_interaction_op = "dot"
# arch_interaction_itself = False

# activation_function = "relu"
# loss_function = "bce"
# loss_threshold = 0.0
# round_targets = True

# data_size = 1
# num_batches = 0
# data_generation = "dataset"
# data_trace_file = "./input/dist_emb_j.log"
# data_set = "kaggle"
# raw_data_file = "./unprocesed/train.txt" ## fill in the path with train.txt file
# processed_data_file = "" ## once a master processed file is generated fill that file path here
# data_randomize = "total"
# data_trace_enable_padding = False
# num_indices_per_lookup = 10
# num_indices_per_lookup_fixed = True


# mini_batch_size = 1
# nepochs = 1
# learning_rate = 0.2
# print_precision = 5
# numpy_rand_seed = 123
# sync_dense_params = True

# inference_only = False
# save_onnx = True
# print_freq = 1024
# test_freq = 1
# print_time = False
# debug_mode = False
# enable_profiling = True
# plot_compute_graph = True
# save_model = ""
# load_model = ""

In [36]:
# use_gpu = False
### some basic setup ###
# Seed NumPy and PyTorch with the same value and apply the print precision to
# both libraries.
np.random.seed(numpy_rand_seed)
np.set_printoptions(precision=print_precision)
torch.set_printoptions(precision=print_precision)
torch.manual_seed(numpy_rand_seed)

# Keep the GPU request only if CUDA is actually available.
use_gpu = use_gpu and torch.cuda.is_available()
if use_gpu:
    torch.cuda.manual_seed_all(numpy_rand_seed)
    torch.backends.cudnn.deterministic = True
    device = torch.device("cuda", 0)
    ngpus = torch.cuda.device_count()  # 1
    print("Using {} GPU(s)...".format(ngpus))
else:
    # NOTE: ngpus stays undefined on this path; the visible later use of it is
    # guarded by use_gpu.
    device = torch.device("cpu")
    print("Using CPU...")

Using CPU...


In [37]:
### prepare training data ###
# Bottom MLP layer widths as an integer array.
ln_bot = np.fromstring(arch_mlp_bot, dtype=int, sep="-")
# input data
if data_generation == "dataset":
    # input and target data
    # NOTE(review): when inference_only is set, the loader presumably returns
    # the test split in the first five slots and None in the *_test slots —
    # confirm against read_dataset.
    (
        nbatches,
        lX,
        lS_o,
        lS_i,
        lT,
        nbatches_test,
        lX_test,
        lS_o_test,
        lS_i_test,
        lT_test,
        ln_emb,
        m_den,
    ) = read_dataset(
        data_set,
        mini_batch_size,
        data_randomize,
        num_batches,
        True,
        raw_data_file,
        processed_data_file,
        inference_only,
    )
    # the dataset dictates the dense width, so override the configured one
    ln_bot[0] = m_den
else:
    # input data
    # table sizes and dense width come from the configuration strings
    ln_emb = np.fromstring(arch_embedding_size, dtype=int, sep="-")
    m_den = ln_bot[0]
    if data_generation == "random":
        (nbatches, lX, lS_o, lS_i) = generate_random_input_data(
            data_size,
            num_batches,
            mini_batch_size,
            round_targets,
            num_indices_per_lookup,
            num_indices_per_lookup_fixed,
            m_den,
            ln_emb,
        )
    elif data_generation == "synthetic":
        (nbatches, lX, lS_o, lS_i) = generate_synthetic_input_data(
            data_size,
            num_batches,
            mini_batch_size,
            round_targets,
            num_indices_per_lookup,
            num_indices_per_lookup_fixed,
            m_den,
            ln_emb,
            data_trace_file,
            data_trace_enable_padding,
        )
    else:
        sys.exit(
            "ERROR: --data-generation=" + data_generation + " is not supported"
        )

    # target data
    # generated inputs carry no labels, so targets are drawn at random with
    # the same batching parameters
    (nbatches, lT) = generate_random_output_data(
        data_size,
        num_batches,
        mini_batch_size,
        round_targets=round_targets,
    )



In [38]:
### derive model dimensions and validate the architecture settings ###
m_spa = arch_sparse_feature_size
num_fea = ln_emb.size + 1  # num sparse + num dense features
m_den_out = ln_bot[ln_bot.size - 1]
# num_int is the input width of the top MLP: the interaction terms plus, for
# "dot", the bottom MLP output.
if arch_interaction_op == "dot":
    # approach 1: all
    # num_int = num_fea * num_fea + m_den_out
    # approach 2: unique
    if arch_interaction_itself:
        num_int = (num_fea * (num_fea + 1)) // 2 + m_den_out
    else:
        num_int = (num_fea * (num_fea - 1)) // 2 + m_den_out
elif arch_interaction_op == "cat":
    num_int = num_fea * m_den_out
else:
    # report the notebook setting; there is no `args` object in this notebook,
    # so referencing it here raised instead of printing this message
    sys.exit(
        "ERROR: --arch-interaction-op="
        + arch_interaction_op
        + " is not supported"
    )
# prepend the interaction width so the top MLP string describes every layer
arch_mlp_top_adjusted = str(num_int) + "-" + arch_mlp_top
ln_top = np.fromstring(arch_mlp_top_adjusted, dtype=int, sep="-")
# sanity check: feature sizes and mlp dimensions must match
if m_den != ln_bot[0]:
    sys.exit(
        "ERROR: arch-dense-feature-size "
        + str(m_den)
        + " does not match first dim of bottom mlp "
        + str(ln_bot[0])
    )
if m_spa != m_den_out:
    sys.exit(
        "ERROR: arch-sparse-feature-size "
        + str(m_spa)
        + " does not match last dim of bottom mlp "
        + str(m_den_out)
    )
if num_int != ln_top[0]:
    sys.exit(
        "ERROR: # of feature interactions "
        + str(num_int)
        + " does not match first dimension of top mlp "
        + str(ln_top[0])
    )


In [39]:
# test prints (model arch)
# Only active with debug_mode: dumps the derived layer sizes and then every
# generated mini-batch (dense input, sparse lookups, targets).
if debug_mode:
    print("model arch:")
    print(
        "mlp top arch "
        + str(ln_top.size - 1)
        + " layers, with input to output dimensions:"
    )
    print(ln_top)
    print("# of interactions")
    print(num_int)
    print(
        "mlp bot arch "
        + str(ln_bot.size - 1)
        + " layers, with input to output dimensions:"
    )
    print(ln_bot)
    print("# of features (sparse and dense)")
    print(num_fea)
    print("dense feature size")
    print(m_den)
    print("sparse feature size")
    print(m_spa)
    print(
        "# of embeddings (= # of sparse features) "
        + str(ln_emb.size)
        + ", with dimensions "
        + str(m_spa)
        + "x:"
    )
    print(ln_emb)

    print("data (inputs and targets):")
    for j in range(0, nbatches):
        print("mini-batch: %d" % j)
        print(lX[j].detach().cpu().numpy())
        # transform offsets to lengths when printing
        # (the total index count is appended so np.diff also yields the
        # length of the last lookup)
        print(
            [
                np.diff(
                    S_o.detach().cpu().tolist() + list(lS_i[j][i].shape)
                ).tolist()
                for i, S_o in enumerate(lS_o[j])
            ]
        )
        print([S_i.detach().cpu().tolist() for S_i in lS_i[j]])
        print(lT[j].detach().cpu().numpy())

In [40]:
### construct the neural network specified above ###
# NOTE(review): sigmoid_bot / sigmoid_top are presumably the layer indices that
# get a sigmoid activation (-1 matching no layer, ln_top.size - 2 being the
# last top layer) — confirm in DLRM_Net.create_mlp.
dlrm = DLRM_Net(
    m_spa,
    ln_emb,
    ln_bot,
    ln_top,
    arch_interaction_op=arch_interaction_op,
    arch_interaction_itself=arch_interaction_itself,
    sigmoid_bot=-1,
    sigmoid_top=ln_top.size - 2,
    sync_dense_params=sync_dense_params,
    loss_threshold=loss_threshold,
)

In [41]:
# test prints
if debug_mode:
    print("initial parameters (weights and bias):")
    for param in dlrm.parameters():
        print(param.detach().cpu().numpy())
    # print(dlrm)

# Move the model to the selected device; with several GPUs also record how
# many devices the model may use.
if use_gpu:
    if ngpus > 1:
        # Custom Model-Data Parallel
        # the mlps are replicated and use data parallelism, while
        # the embeddings are distributed and use model parallelism
        # (capped by the batch size and the number of embedding tables)
        dlrm.ndevices = min(ngpus, mini_batch_size, num_fea - 1)
    dlrm = dlrm.to(device)  # .cuda()

# specify the loss function
if loss_function == "mse":
    loss_fn = torch.nn.MSELoss(reduction="mean")
elif loss_function == "bce":
    loss_fn = torch.nn.BCELoss(reduction="mean")
else:
    sys.exit("ERROR: --loss-function=" + loss_function + " is not supported")

# The optimizer is only created for training runs; in inference-only mode the
# name `optimizer` is left undefined.
if not inference_only:
    # specify the optimizer algorithm
    optimizer = torch.optim.SGD(dlrm.parameters(), lr=learning_rate)


In [42]:
### main loop ###
def time_wrap(use_gpu):
    """Return the current wall-clock time, synchronizing CUDA first if asked.

    With ``use_gpu`` set, ``torch.cuda.synchronize()`` is called before the
    clock is read.
    """
    if not use_gpu:
        return time.time()
    torch.cuda.synchronize()
    return time.time()

def dlrm_wrap(X, lS_o, lS_i, use_gpu, device):
    """Run the module-level ``dlrm`` model on one mini-batch.

    With ``use_gpu`` set, the dense input and every offset and index tensor
    are moved to ``device`` first; otherwise they are passed through as given.
    """
    if not use_gpu:
        return dlrm(X, lS_o, lS_i)

    # .cuda()
    dense_dev = X.to(device)
    offsets_dev = [S_o.to(device) for S_o in lS_o]
    indices_dev = [S_i.to(device) for S_i in lS_i]
    return dlrm(dense_dev, offsets_dev, indices_dev)

def loss_fn_wrap(Z, T, use_gpu, device):
    """Evaluate the global ``loss_fn`` on prediction ``Z`` and target ``T``.

    When ``use_gpu`` is truthy the target is first moved with ``T.to(device)``;
    otherwise it is passed through unchanged and ``device`` is ignored.
    """
    target = T.to(device) if use_gpu else T
    return loss_fn(Z, target)

    # training

# Running state shared by the checkpoint-loading cell and the main loop.
best_gA_test = 0  # best test accuracy seen so far (gates model saving)
total_time = 0  # wall-clock seconds accumulated since the last report
total_loss = 0  # sum of per-iteration losses since the last report
total_accu = 0  # sum of per-iteration accuracies since the last report
total_iter = 0  # number of iterations since the last report
k = 0  # epoch counter driving the outer training loop

In [43]:
# Load model if specified
# Restores model weights (and, when training, optimizer state and running
# statistics) from the checkpoint file named by `load_model`.
if not (load_model == ""):
    print("Loading saved model {}".format(load_model))
    # NOTE: torch.load unpickles the file, so only load checkpoints that come
    # from a trusted source.
    # map_location remaps the stored tensors onto the device used in this run,
    # so a checkpoint written on a GPU can also be restored on a CPU-only
    # machine (and vice versa).
    ld_model = torch.load(load_model, map_location=device)
    dlrm.load_state_dict(ld_model["state_dict"])
    ld_j = ld_model["iter"]
    ld_k = ld_model["epoch"]
    ld_nepochs = ld_model["nepochs"]
    ld_nbatches = ld_model["nbatches"]
    ld_nbatches_test = ld_model["nbatches_test"]
    ld_gA = ld_model["train_acc"]
    ld_gL = ld_model["train_loss"]
    ld_total_loss = ld_model["total_loss"]
    ld_total_accu = ld_model["total_accu"]
    ld_gA_test = ld_model["test_acc"]
    ld_gL_test = ld_model["test_loss"]
    if not inference_only:
        # resume training: restore optimizer state and the running statistics
        optimizer.load_state_dict(ld_model["opt_state_dict"])
        best_gA_test = ld_gA_test
        total_loss = ld_total_loss
        total_accu = ld_total_accu
        k = ld_k  # epochs
        # NOTE(review): the main loop assigns j = 0 at the start of every
        # epoch, so this restored batch offset is currently overwritten.
        j = ld_j  # batches
    else:
        # inference only: report once per pass over the saved number of
        # batches and disable periodic testing
        print_freq = ld_nbatches
        test_freq = 0
    print(
        "Saved model Training state: epoch = {:d}/{:d}, batch = {:d}/{:d}, train loss = {:.6f}, train accuracy = {:3.3f} %".format(
            ld_k, ld_nepochs, ld_j, ld_nbatches, ld_gL, ld_gA * 100
        )
    )
    print(
        "Saved model Testing state: nbatches = {:d}, test loss = {:.6f}, test accuracy = {:3.3f} %".format(
            ld_nbatches_test, ld_gL_test, ld_gA_test * 100
        )
    )

In [44]:
print("time/loss/accuracy (if enabled):")
# Main training / inference loop over the pre-generated mini-batches.
# Each iteration runs a forward pass, computes loss and accuracy and, unless
# inference_only is set, performs one SGD step. Running averages are printed
# every print_freq iterations; the test set is evaluated every test_freq
# iterations when data_generation == "dataset", and the best model so far is
# saved to save_model when one is given.
# NOTE(review): j restarts at 0 at the start of every epoch, so a batch offset
# assigned to j before this cell runs is not honoured.
with torch.autograd.profiler.profile(enable_profiling, use_gpu) as prof:
    while k < nepochs:
        j = 0
        while j < nbatches:
            t1 = time_wrap(use_gpu)

            # forward pass
            Z = dlrm_wrap(lX[j], lS_o[j], lS_i[j], use_gpu, device)

            # loss
            E = loss_fn_wrap(Z, lT[j], use_gpu, device)

            # compute loss and accuracy
            L = E.detach().cpu().numpy()  # numpy array
            S = Z.detach().cpu().numpy()  # numpy array
            T = lT[j].detach().cpu().numpy()  # numpy array
            mbs = T.shape[0]  # = mini_batch_size except maybe for last
            # a prediction counts as correct when it rounds to the target
            A = np.sum((np.round(S, 0) == T).astype(np.uint8)) / mbs

            if not inference_only:
                # scaled error gradient propagation
                # (where we do not accumulate gradients across mini-batches)
                optimizer.zero_grad()
                # backward pass
                E.backward()
                # debug prints (check gradient norm)
                # for l in mlp.layers:
                #     if hasattr(l, 'weight'):
                #          print(l.weight.grad.norm().item())

                # optimizer
                optimizer.step()

            t2 = time_wrap(use_gpu)
            total_time += t2 - t1
            total_accu += A
            total_loss += L
            total_iter += 1

            # report on every print_freq-th batch and on the last batch
            print_tl = ((j + 1) % print_freq == 0) or (j + 1 == nbatches)
            # test only for real datasets, on every test_freq-th / last batch
            print_ts = (
                (test_freq > 0)
                and (data_generation == "dataset")
                and (((j + 1) % test_freq == 0) or (j + 1 == nbatches))
            )

            # print time, loss and accuracy
            if print_tl or print_ts:
                # averages over the iterations since the previous report;
                # the accumulators are reset after being consumed
                gT = 1000.0 * total_time / total_iter if print_time else -1
                total_time = 0

                gL = total_loss / total_iter
                total_loss = 0

                gA = total_accu / total_iter
                total_accu = 0

                str_run_type = "inference" if inference_only else "training"
                print(
                    "Finished {} it {}/{} of epoch {}, ".format(
                        str_run_type, j + 1, nbatches, k
                    )
                    + "{:.2f} ms/it, loss {:.6f}, accuracy {:3.3f} %".format(
                        gT, gL, gA * 100
                    )
                )
                total_iter = 0

            # testing
            if print_ts and not inference_only:
                test_accu = 0
                test_loss = 0

                # evaluation never back-propagates, so skip building the
                # autograd graph for the test forward passes
                with torch.no_grad():
                    for jt in range(0, nbatches_test):
                        t1_test = time_wrap(use_gpu)

                        # forward pass
                        Z_test = dlrm_wrap(
                            lX_test[jt],
                            lS_o_test[jt],
                            lS_i_test[jt],
                            use_gpu,
                            device,
                        )
                        # loss
                        E_test = loss_fn_wrap(
                            Z_test, lT_test[jt], use_gpu, device
                        )

                        # compute loss and accuracy
                        L_test = E_test.detach().cpu().numpy()  # numpy array
                        S_test = Z_test.detach().cpu().numpy()  # numpy array
                        T_test = lT_test[jt].detach().cpu().numpy()  # numpy array
                        # = mini_batch_size except maybe for last
                        mbs_test = T_test.shape[0]
                        A_test = (
                            np.sum(
                                (np.round(S_test, 0) == T_test).astype(np.uint8)
                            )
                            / mbs_test
                        )

                        t2_test = time_wrap(use_gpu)

                        test_accu += A_test
                        test_loss += L_test

                gL_test = test_loss / nbatches_test
                gA_test = test_accu / nbatches_test

                # checkpoint only when the test accuracy improves
                is_best = gA_test > best_gA_test
                if is_best:
                    best_gA_test = gA_test
                    if not (save_model == ""):
                        print("Saving model to {}".format(save_model))
                        torch.save(
                            {
                                "epoch": k,
                                "nepochs": nepochs,
                                "nbatches": nbatches,
                                "nbatches_test": nbatches_test,
                                "iter": j + 1,
                                "state_dict": dlrm.state_dict(),
                                "train_acc": gA,
                                "train_loss": gL,
                                "test_acc": gA_test,
                                "test_loss": gL_test,
                                "total_loss": total_loss,
                                "total_accu": total_accu,
                                "opt_state_dict": optimizer.state_dict(),
                            },
                            save_model,
                        )

                # report the current epoch k (was hard-coded to 0)
                print(
                    "Testing at - {}/{} of epoch {}, ".format(j + 1, nbatches, k)
                    + "loss {:.6f}, accuracy {:3.3f} %, best {:3.3f} %".format(
                        gL_test, gA_test * 100, best_gA_test * 100
                    )
                )

            j += 1  # nbatches
        k += 1  # nepochs
# profiling: persist the results gathered by the profiler context `prof`
if enable_profiling:
    # aggregated per-operator table, sorted by total CPU time
    with open("dlrm_s_pytorch.prof", "w") as prof_f:
        prof_f.write(prof.key_averages().table(sort_by="cpu_time_total"))
    # the chrome trace is written by the profiler itself and does not use
    # prof_f, so it does not need to run while that file is open
    prof.export_chrome_trace("./dlrm_s_pytorch.json")
    # print(prof.key_averages().table(sort_by="cpu_time_total"))


time/loss/accuracy (if enabled):
Finished training it 1/1 of epoch 0, -1.00 ms/it, loss 0.071748, accuracy 0.000 %


In [45]:
### uncomment to export profiling results, plot the compute graph, print the updated parameters, and save the model in ONNX ###
# # # profiling
# # if enable_profiling:
# #     with open("dlrm_s_pytorch.prof", "w") as prof_f:
# #         prof_f.write(prof.key_averages().table(sort_by="cpu_time_total"))
# #         prof.export_chrome_trace("./dlrm_s_pytorch.json")
# #     # print(prof.key_averages().table(sort_by="cpu_time_total"))

# # # plot compute graph
# # if plot_compute_graph:
# # #     sys.exit(
# # #         "ERROR: Please install pytorchviz package in order to use the"
# # #         + " visualization. Then, uncomment its import above as well as"
# # #         + " three lines below and run the code again."
# # #     )
# #     V = Z.mean() if inference_only else Parameter(torch.Tensor(L))
# #     dot = make_dot(V, params=dict(dlrm.named_parameters()))
# #     dot.render('dlrm_s_pytorch_graph') # write .pdf file

# # test prints
# if not inference_only and debug_mode:
#     print("updated parameters (weights and bias):")
#     for param in dlrm.parameters():
#         print(param.detach().cpu().numpy())

# if save_onnx:
#     # export the model in onnx
#     with open("dlrm_s_pytorch.onnx", "w+b") as dlrm_pytorch_onnx_file:
#         torch.onnx._export(
#             dlrm, (lX[0], lS_o[0], lS_i[0]), dlrm_pytorch_onnx_file, verbose=True
#         )
#     # recover the model back
#     dlrm_pytorch_onnx = onnx.load("dlrm_s_pytorch.onnx")
#     # check the onnx model
#     onnx.checker.check_model(dlrm_pytorch_onnx)

In [46]:
# Serialise the whole `dlrm` module object (not just its state_dict) to the
# file "dlrm_synthetic_model" in the current working directory.
torch.save(dlrm,"dlrm_synthetic_model")

In [47]:
# Load the module object saved to "dlrm_synthetic_model" back into memory.
# NOTE: torch.load unpickles the file, so only use it on trusted files.
# NOTE(review): unpickling a whole module presumably requires the DLRM_Net
# class to be defined in this session -- confirm before running in a fresh
# kernel.
dlrm_infer = torch.load("dlrm_synthetic_model")

In [48]:
# Switch the restored model to evaluation mode; the call is the last
# expression of the cell, so the module structure is displayed as its output.
dlrm_infer.eval()

DLRM_Net(
  (emb_l): ModuleList(
    (0): EmbeddingBag(4, 16, mode=sum)
    (1): EmbeddingBag(3, 16, mode=sum)
    (2): EmbeddingBag(2, 16, mode=sum)
  )
  (bot_l): Sequential(
    (0): Linear(in_features=13, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=256, bias=True)
    (3): ReLU()
    (4): Linear(in_features=256, out_features=64, bias=True)
    (5): ReLU()
    (6): Linear(in_features=64, out_features=16, bias=True)
    (7): ReLU()
  )
  (top_l): Sequential(
    (0): Linear(in_features=22, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=256, bias=True)
    (3): ReLU()
    (4): Linear(in_features=256, out_features=1, bias=True)
    (5): Sigmoid()
  )
)

In [51]:
# Inspect the first tensor of the first mini-batch in lS_i
# (presumably the sparse indices for the first embedding table -- confirm).
lS_i[0][0]

tensor([0, 1, 2, 3])

In [32]:
# Inspect lS_o: one list of tensors per mini-batch
# (presumably the per-table offsets -- confirm).
lS_o

[[tensor([0, 3]), tensor([0, 3]), tensor([0, 2])]]

In [31]:
# Inspect lS_i: one list of tensors per mini-batch
# (presumably the per-table sparse indices -- confirm).
lS_i

[[tensor([0, 1, 2, 0, 1, 2, 3]),
  tensor([0, 1, 2, 0, 1, 2]),
  tensor([0, 1, 0, 1])]]

In [49]:
##### predicting click probabilities #####
# Run the restored model on the first mini-batch (inputs left where they are,
# no device transfer). The call is made with autograd enabled, which is why
# the displayed tensor carries a grad_fn.

dlrm_infer(lX[0], lS_o[0], lS_i[0])

tensor([[0.14780]], grad_fn=<SigmoidBackward>)