In [1]:
import argparse
import os
import sys

import horovod.spark

In [2]:
# Option parser for the training run. It is fed an explicit argument list
# later in the notebook rather than reading the process command line.
parser = argparse.ArgumentParser()

# Value options without a default: both stay None unless supplied.
parser.add_argument("--data_dir", type=str)
parser.add_argument("--global_batch_size", type=int)

# Boolean switches (all default to False), registered in their original order.
for flag_name, flag_help in (
    ("--amp", "use auto mixed precision"),
    ("--xla", "enable xla of tensorflow"),
    ("--compress", "use tf.unique/tf.gather to compress/decompress embedding keys"),
    ("--custom_interact", "use custom interact op"),
    ("--eval_in_last", "evaluate only after the last iteration"),
    ("--use_synthetic_dataset", "use synthetic dataset for profiling"),
    ("--use_splited_dataset", "categories features were splited into different binary files"),
):
    parser.add_argument(flag_name, action="store_true", help=flag_help)
del flag_name, flag_help  # keep the loop variables out of the notebook namespace

# Value options with defaults.
parser.add_argument("--early_stop", type=int, default=-1)
parser.add_argument("--epochs", type=int, default=1)
# Left as the final bare expression so the cell still echoes the created action.
parser.add_argument("--lr", type=float, default=24.0)

_StoreAction(option_strings=['--lr'], dest='lr', nargs=None, const=None, default=24.0, type=<class 'float'>, choices=None, help=None, metavar=None)

In [3]:
# Per-worker batch size and worker count. The global batch size handed to the
# parser is derived from both, so the values cannot drift apart.
local_batch_size = 1 << 8
num_proc = 2
args = parser.parse_args(
    [
        "--epochs=100",
        f"--global_batch_size={num_proc * local_batch_size}",
        "--use_synthetic_dataset",
    ]
)

# Number of Horovod processes to launch. Assigned exactly once from `num_proc`:
# a second hard-coded `args.num_proc = 2` used to follow and would silently
# override this value whenever `num_proc` was changed.
args.num_proc = num_proc

# Step thresholds consumed by Trainer as warmup_steps, decay_start_step and
# decay_steps (in that order). Each count is rescaled by 55296 / global_batch_size.
# NOTE(review): presumably 55296 is the batch size the raw counts were tuned
# for - confirm against the reference training recipe.
args.lr_schedule_steps = [
    int(2750 * 55296 / args.global_batch_size),
    int(49315 * 55296 / args.global_batch_size),
    int(27772 * 55296 / args.global_batch_size),
]
print("[Info] args:", args)

[Info] args: Namespace(amp=False, compress=False, custom_interact=False, data_dir=None, early_stop=-1, epochs=100, eval_in_last=False, global_batch_size=512, lr=24.0, lr_schedule_steps=[297000, 5326020, 2999376], num_proc=2, use_splited_dataset=False, use_synthetic_dataset=True, xla=False)


In [4]:
# Echo the Spark session object as this cell's output.
# NOTE(review): `spark` is never defined in this notebook - presumably the
# kernel injects a SparkSession under that name; confirm, otherwise this cell
# raises NameError on a fresh Restart & Run All.
spark

In [5]:
def train_fn(args):
    """Worker-side training routine that is handed to ``horovod.spark.run``.

    Restricts the process to the first device returned by
    ``get_available_devices()``, initialises Horovod and SOK, builds a DLRM
    model together with a train and a test dataset (synthetic, split-binary or
    single-binary, depending on ``args``) and then runs ``Trainer.train``.

    Args:
        args: Namespace with the parsed options plus ``lr_schedule_steps``,
            whose three entries are passed to the trainer as warmup steps,
            decay start step and decay steps.

    Returns:
        None. Progress information and the total wall time are printed.
    """
    import os
    from horovod.spark.task import get_available_devices

    # Environment variables are set before TensorFlow is imported further down.
    if args.xla:
        os.environ["TF_XLA_FLAGS"] = "--tf_xla_auto_jit=fusible"

    devices = get_available_devices()
    print("gpus: " + str(devices))
    if devices:
        # Only the first reported device is made visible to this process.
        os.environ['CUDA_VISIBLE_DEVICES'] = devices[0]

    import sparse_operation_kit as sok
    import horovod.tensorflow as hvd
    import tensorflow as tf
    import numpy as np
    from dataset import BinaryDataset, SplitedBinaryDataset, SyntheticDataset
    from model import DLRM
    from trainer import Trainer
    import json
    import time
    started_at = time.time()

    if args.amp:
        print("[Info] use amp mode")
        tf.keras.mixed_precision.set_global_policy(
            tf.keras.mixed_precision.Policy("mixed_float16")
        )

    hvd.init()

    global_batch_size = args.global_batch_size
    sok.Init(global_batch_size=global_batch_size)

    # Synthetic data is used when requested explicitly or when no data
    # directory was given; real data additionally needs its metadata file.
    synthetic = args.use_synthetic_dataset or args.data_dir is None
    if synthetic:
        metadata = {
            "vocab_sizes": [
                39884406, 39043, 17289, 7420, 20263, 3, 7120, 1543, 63,
                38532951, 2953546, 403346, 10, 2208, 11938, 155, 4, 976, 14,
                39979771, 25641295, 39664984, 585935, 12972, 108, 36,
            ]
        }
    else:
        with open(os.path.join(args.data_dir, "train/metadata.json"), "r") as meta_file:
            metadata = json.load(meta_file)
    print(metadata)

    vocab_sizes = metadata["vocab_sizes"]
    world_size = hvd.size()
    per_worker_batch = global_batch_size // world_size

    embedding_vec_size = 64
    model = DLRM(
        vocab_sizes,
        num_dense_features=13,
        embedding_vec_size=embedding_vec_size,
        # The bottom stack ends at the embedding width.
        bottom_stack_units=[512, 256, embedding_vec_size],
        top_stack_units=[1024, 1024, 512, 256, 1],
        num_gpus=world_size,
        use_cuda_interact=args.custom_interact,
        compress=args.compress,
    )

    if synthetic:
        print("[Info] Using synthetic dataset")

        # Train and test sets are built from identical settings.
        synthetic_kwargs = dict(
            batch_size=per_worker_batch,
            num_iterations=args.early_stop if args.early_stop > 0 else 30,
            vocab_sizes=vocab_sizes,
            prefetch=20,
        )
        dataset = SyntheticDataset(**synthetic_kwargs)
        test_dataset = SyntheticDataset(**synthetic_kwargs)
    else:
        data_dir = args.data_dir
        # Keyword arguments shared by every file-backed dataset below.
        shard_kwargs = dict(
            batch_size=per_worker_batch,
            global_rank=hvd.rank(),
            global_size=world_size,
            prefetch=20,
        )
        if args.use_splited_dataset:
            print("[Info] Using splited dataset in %s" % args.data_dir)
            dataset = SplitedBinaryDataset(
                os.path.join(data_dir, "train/label.bin"),
                os.path.join(data_dir, "train/dense.bin"),
                [os.path.join(data_dir, "train/category_%d.bin" % i) for i in range(26)],
                vocab_sizes,
                drop_last=True,
                **shard_kwargs,
            )
            test_dataset = SplitedBinaryDataset(
                os.path.join(data_dir, "test/label.bin"),
                os.path.join(data_dir, "test/dense.bin"),
                [os.path.join(data_dir, "test/category_%d.bin" % i) for i in range(26)],
                vocab_sizes,
                drop_last=False,
                **shard_kwargs,
            )
        else:
            print("[Info] Using dataset in %s" % args.data_dir)
            # Maps the type names stored in the metadata to numpy dtypes.
            raw_types = {"int32": np.int32, "float32": np.float32}
            binary_kwargs = dict(
                shard_kwargs,
                label_raw_type=raw_types[metadata["label_raw_type"]],
                dense_raw_type=raw_types[metadata["dense_raw_type"]],
                category_raw_type=raw_types[metadata["category_raw_type"]],
                log=metadata["dense_log"],
            )
            dataset = BinaryDataset(
                os.path.join(data_dir, "train/label.bin"),
                os.path.join(data_dir, "train/dense.bin"),
                os.path.join(data_dir, "train/category.bin"),
                drop_last=True,
                **binary_kwargs,
            )
            test_dataset = BinaryDataset(
                os.path.join(data_dir, "test/label.bin"),
                os.path.join(data_dir, "test/dense.bin"),
                os.path.join(data_dir, "test/category.bin"),
                drop_last=False,
                **binary_kwargs,
            )

    warmup_steps, decay_start_step, decay_steps = args.lr_schedule_steps[:3]
    trainer = Trainer(
        model,
        dataset,
        test_dataset,
        auc_thresholds=8000,
        base_lr=args.lr,
        warmup_steps=warmup_steps,
        decay_start_step=decay_start_step,
        decay_steps=decay_steps,
        amp=args.amp,
    )

    # eval_interval is only passed (as None) when evaluation is deferred to
    # the very end; otherwise the trainer's own default applies.
    train_kwargs = dict(eval_in_last=False, early_stop=args.early_stop, epochs=args.epochs)
    if args.eval_in_last:
        train_kwargs.update(eval_interval=None, eval_in_last=True)
    trainer.train(**train_kwargs)

    print("main time: %.2fs" % (time.time() - started_at))

In [6]:
def train(args):
    """Run ``train_fn`` on Spark through Horovod and report the first result.

    Args:
        args: Namespace forwarded unchanged to ``train_fn``. ``args.num_proc``
            is the number of Horovod processes to start.

    Returns:
        The first element of the sequence returned by ``horovod.spark.run``.
        This is ``None`` as long as ``train_fn`` itself returns nothing.
    """
    # Horovod: run training.
    results = horovod.spark.run(
        train_fn,
        args=(args,),
        num_proc=args.num_proc,
        #extra_mpi_args='-mca btl_tcp_if_include enp134s0f0 -x NCCL_IB_GID_INDEX=3',
        #extra_mpi_args='--mca btl tcp,sm,self',
        stdout=sys.stdout,
        stderr=sys.stderr,
        verbose=2,
        nics={},
        prefix_output_with_timestamp=True,
        # Pass a snapshot of the driver's environment. `os` must be imported at
        # notebook scope for this; the import inside train_fn is local to it.
        env=dict(os.environ),
    )
    history = results[0]

    print(f'history: {history}')
    return history

In [7]:
# Start the Horovod-on-Spark run using the `args` namespace built earlier in
# this notebook; worker stdout/stderr are streamed into this cell's output.
train(args)

[Stage 0:>                                                          (0 + 2) / 2]

Checking whether extension tensorflow was built with MPI.
Extension tensorflow was built with MPI.
mpirun --allow-run-as-root --tag-output -np 2 -H 3ea7bbc82c43-a981c8275bef9425530e401853bc2659:2 -bind-to none -map-by slot -mca pml ob1 -mca btl ^openib --timestamp-output     -x BASH_ENV -x BAZELRC -x CLICOLOR -x CUBLAS_VERSION -x CUDA_CACHE_DISABLE -x CUDA_CUDA_LIBRARY -x CUDA_DRIVER_VERSION -x CUDA_HOME -x CUDA_PATH -x CUDA_VERSION -x CUDNN_VERSION -x CUFFT_VERSION -x CURAND_VERSION -x CUSOLVER_VERSION -x CUSPARSE_VERSION -x CUTENSOR_VERSION -x DALI_BUILD -x DALI_VERSION -x DEBIAN_FRONTEND -x DLPROF_VERSION -x ENV -x GDRCOPY_VERSION -x GIT_PAGER -x HOME -x HOSTNAME -x HPCX_VERSION -x JPY_PARENT_PID -x JUPYTER_PORT -x LC_ALL -x LD_LIBRARY_PATH -x LESSCLOSE -x LESSOPEN -x LIBRARY_PATH -x LS_COLORS -x MOFED_VERSION -x MPLBACKEND -x NCCL_VERSION -x NPP_VERSION -x NSIGHT_COMPUTE_VERSION -x NSIGHT_SYSTEMS_VERSION -x NVIDIA_BUILD_ID -x NVIDIA_DRIVER_CAPABILITIES -x NVIDIA_PRODUCT_NAME -x NVI

Tue Sep 27 21:18:25 2022[1,0]<stderr>:2022-09-27 21:18:25.288984: I tensorflow/core/platform/cpu_feature_guard.cc:152] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE3 SSE4.1 SSE4.2 AVX
Tue Sep 27 21:18:25 2022[1,0]<stderr>:To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Tue Sep 27 21:18:25 2022[1,1]<stderr>:2022-09-27 21:18:25.355467: I tensorflow/core/platform/cpu_feature_guard.cc:152] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE3 SSE4.1 SSE4.2 AVX
Tue Sep 27 21:18:25 2022[1,1]<stderr>:To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Tue Sep 27 21:18:25 2022[1,0]<stderr>:2022-09-27 21:18:25.985843: W tensorflow/core/common_runtime/gpu/gpu_bfc_allocator.cc:39] Overridi

Tue Sep 27 21:18:26 2022[1,0]<stdout>:3ea7bbc82c43:29029:29029 [0] NCCL INFO Bootstrap : Using eth0:172.17.0.2<0>
Tue Sep 27 21:18:26 2022[1,0]<stdout>:3ea7bbc82c43:29029:29029 [0] NCCL INFO NET/Plugin: Failed to find ncclNetPlugin_v5 symbol.
Tue Sep 27 21:18:26 2022[1,0]<stdout>:3ea7bbc82c43:29029:29029 [0] NCCL INFO NET/Plugin: Failed to find ncclCollNetPlugin_v5 symbol.
Tue Sep 27 21:18:26 2022[1,0]<stdout>:3ea7bbc82c43:29029:29029 [0] NCCL INFO Plugin Path : /opt/hpcx/nccl_rdma_sharp_plugin/lib/libnccl-net.so
Tue Sep 27 21:18:26 2022[1,0]<stdout>:3ea7bbc82c43:29029:29029 [0] NCCL INFO P2P plugin IBext
Tue Sep 27 21:18:26 2022[1,0]<stdout>:3ea7bbc82c43:29029:29029 [0] NCCL INFO NET/IB : No device found.
Tue Sep 27 21:18:26 2022[1,0]<stdout>:3ea7bbc82c43:29029:29029 [0] NCCL INFO NET/IB : No device found.
Tue Sep 27 21:18:26 2022[1,0]<stdout>:3ea7bbc82c43:29029:29029 [0] NCCL INFO NET/Socket : Using [0]eth0:172.17.0.2<0>
Tue Sep 27 21:18:26 2022[1,0]<stdout>:3ea7bbc82c43:29029:29029 

Tue Sep 27 21:18:26 2022[1,0]<stdout>:3ea7bbc82c43:29029:29029 [0] NCCL INFO Channel 07 : 0[34000] -> 1[36000] via P2P/IPC
Tue Sep 27 21:18:26 2022[1,1]<stdout>:3ea7bbc82c43:29030:29030 [0] NCCL INFO Channel 08 : 1[36000] -> 0[34000] via P2P/IPC
Tue Sep 27 21:18:26 2022[1,0]<stdout>:3ea7bbc82c43:29029:29029 [0] NCCL INFO Channel 08 : 0[34000] -> 1[36000] via P2P/IPC
Tue Sep 27 21:18:26 2022[1,1]<stdout>:3ea7bbc82c43:29030:29030 [0] NCCL INFO Channel 09 : 1[36000] -> 0[34000] via P2P/IPC
Tue Sep 27 21:18:26 2022[1,0]<stdout>:3ea7bbc82c43:29029:29029 [0] NCCL INFO Channel 09 : 0[34000] -> 1[36000] via P2P/IPC
Tue Sep 27 21:18:26 2022[1,1]<stdout>:3ea7bbc82c43:29030:29030 [0] NCCL INFO Channel 10 : 1[36000] -> 0[34000] via P2P/IPC
Tue Sep 27 21:18:26 2022[1,0]<stdout>:3ea7bbc82c43:29029:29029 [0] NCCL INFO Channel 10 : 0[34000] -> 1[36000] via P2P/IPC
Tue Sep 27 21:18:26 2022[1,1]<stdout>:3ea7bbc82c43:29030:29030 [0] NCCL INFO Channel 11 : 1[36000] -> 0[34000] via P2P/IPC
Tue Sep 27 21:18

Tue Sep 27 21:18:32 2022[1,1]<stdout>:3ea7bbc82c43:29030:29500 [0] NCCL INFO Channel 12 : 1[36000] -> 0[34000] via P2P/IPC
Tue Sep 27 21:18:32 2022[1,1]<stdout>:3ea7bbc82c43:29030:29500 [0] NCCL INFO Channel 13 : 1[36000] -> 0[34000] via P2P/IPC
Tue Sep 27 21:18:32 2022[1,1]<stdout>:3ea7bbc82c43:29030:29500 [0] NCCL INFO Channel 14 : 1[36000] -> 0[34000] via P2P/IPC
Tue Sep 27 21:18:32 2022[1,1]<stdout>:3ea7bbc82c43:29030:29500 [0] NCCL INFO Channel 15 : 1[36000] -> 0[34000] via P2P/IPC
Tue Sep 27 21:18:32 2022[1,0]<stdout>:3ea7bbc82c43:29029:29489 [0] NCCL INFO Launch mode Parallel
Tue Sep 27 21:18:32 2022[1,1]<stdout>:Iteration 0 finished. The following log will be printed every 1000 iterations.
Tue Sep 27 21:18:32 2022[1,0]<stdout>:Iteration 0 finished. The following log will be printed every 1000 iterations.
Tue Sep 27 21:18:45 2022[1,0]<stdout>:Iteration:1000	loss:0.692346	time:18.28s	throughput:0.03M
Tue Sep 27 21:18:45 2022[1,1]<stdout>:Iteration:1000	loss:0.689463	time:18.25s	t

                                                                                

history: None
