## Multi-Machine Training

Distributed training on multiple machines adds a further challenge: the servers are connected only by a comparatively low-bandwidth fabric, which in some cases can be over an order of magnitude slower than the links inside a single machine. Synchronization across devices is also tricky, since different machines running the same training code will run at subtly different speeds. Hence we need to *synchronize* them if we want to use synchronous distributed optimization. :numref:`fig_ps_multimachine` illustrates how distributed parallel training occurs.


![Multi-machine multi-GPU distributed parallel training.](../img/ps-multimachine.svg)
:label:`fig_ps_multimachine`


1. A (different) batch of data is read on each machine, split across multiple GPUs, and transferred to GPU memory. Predictions and gradients are then computed on each GPU's part of the batch separately.
2. The gradients from all local GPUs are aggregated on one GPU (or, alternatively, parts of them are aggregated over different GPUs).
3. The gradients are sent to the CPU.
4. The CPU sends the gradients to a central parameter server, which aggregates all the gradients.
5. The aggregate gradients are then used to update the weight vectors, and the updated weight vectors are broadcast back to the individual CPUs.
6. The updated weight vectors are sent to one (or multiple) GPUs.
7. The updated weight vectors are spread across all GPUs.
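
To make the data flow of the seven steps concrete, here is a minimal sketch that replays them with plain Python lists. It is not MXNet code: the function names, the two-machine layout, and the use of plain SGD for the update in step 5 are all assumptions made for illustration.

```python
# Toy replay of the seven steps. All names are illustrative, not MXNet API.

def add_vectors(vectors):
    """Element-wise sum of equally sized lists."""
    return [sum(vals) for vals in zip(*vectors)]

def train_step(weights, machine_gpu_grads, lr):
    """One synchronous update.

    machine_gpu_grads[i][j] is the gradient computed on GPU j of machine i,
    i.e. step 1 is assumed to have happened already.
    """
    # Steps 2-3: each machine sums the gradients of its local GPUs and
    # hands the result to its CPU.
    per_machine = [add_vectors(gpu_grads) for gpu_grads in machine_gpu_grads]
    # Step 4: the parameter server sums the contributions of all machines.
    total = add_vectors(per_machine)
    # Step 5: the server updates the weights (plain SGD is assumed here).
    new_weights = [w - lr * g for w, g in zip(weights, total)]
    # Steps 6-7: every GPU on every machine receives the same copy.
    return [[list(new_weights) for _ in gpu_grads]
            for gpu_grads in machine_gpu_grads]

weights = [1.0, 2.0]
grads = [
    [[0.5, 0.0], [0.5, 1.0]],  # machine 0, two GPUs
    [[1.0, 1.0], [0.0, 2.0]],  # machine 1, two GPUs
]
replicas = train_step(weights, grads, lr=0.5)
print(replicas[0][0])  # [0.0, 0.0]
```

After the step, all four GPU replicas hold identical weights, which is the property that synchronous training relies on.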



Each of these operations seems rather straightforward, and indeed they can be carried out efficiently *within* a single machine. Once we look at multiple machines, though, we can see that the central parameter server becomes the bottleneck. After all, the bandwidth per server is limited, so for $m$ workers the time it takes to send all gradients to the server is $O(m)$. We can break through this barrier by increasing the number of servers to $n$. Each server then only needs to store $O(1/n)$ of the parameters, so the total time for updates and optimization becomes $O(m/n)$. Matching both numbers (that is, letting $n$ grow in proportion to $m$) yields constant scaling regardless of how many workers we are dealing with. In practice we use the *same* machines both as workers and as servers. :numref:`fig_ps_multips` illustrates the design; see :cite:`Li.Andersen.Park.ea.2014` for details. In particular, ensuring that multiple machines work without unreasonable delays is nontrivial. We omit details on barriers and will only briefly touch on synchronous and asynchronous updates below.

![Top - a single parameter server is a bottleneck since its bandwidth is finite. Bottom - multiple parameter servers store parts of the parameters with aggregate bandwidth.](../img/ps-multips.svg)
:label:`fig_ps_multips`
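
The $O(m/n)$ argument can be checked with a small counting model. The sketch below assumes that all parameter keys have the same size and that keys are assigned to servers round-robin; both are simplifications chosen for illustration, and real systems may shard differently. The function names are hypothetical.

```python
def shard_keys(num_keys, num_servers):
    """Assign parameter keys to servers round-robin (an assumed policy)."""
    shards = [[] for _ in range(num_servers)]
    for key in range(num_keys):
        shards[key % num_servers].append(key)
    return shards

def per_server_traffic(num_workers, num_keys, num_servers):
    """Key-sized messages received per step by the busiest server."""
    shards = shard_keys(num_keys, num_servers)
    # Every worker sends one message per key to the server owning that key.
    return num_workers * max(len(shard) for shard in shards)

print(per_server_traffic(8, 64, 1))    # 512: one server, load grows with m
print(per_server_traffic(8, 64, 8))    # 64: n = m
print(per_server_traffic(16, 64, 16))  # 64: doubling both leaves load unchanged
```

With a single server the load grows linearly in the number of workers, whereas keeping the number of servers equal to the number of workers holds the load on the busiest server constant in this model.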



Let's implement the seven operations above in Python. We will show an example of distributed training of a ResNet-18 network across two workers. As you may notice in the code below, rather than executing each code block in this Jupyter notebook, we write the functions and parameters to a file named `multimachine_cifar10_train.py`. This Python script is invoked later by a separate launcher script, `launch.py`, which supports multi-machine training.


Now, let's dive into each step. First, we import the required packages.

In [1]:
%%writefile multimachine_cifar10_train.py
from __future__ import print_function
import collections, math, os, random, shutil, sys, time
import d2l
import mxnet as mx
from mxnet import autograd, gluon, kv, init, np, npx
from mxnet.gluon.model_zoo import vision
from mxnet.gluon import nn

npx.set_np()


Overwriting multimachine_cifar10_train.py


This training setup is similar to :ref:`sec_kaggle_cifar10` in terms of the dataset and neural network architecture. Let's load and reorganize the dataset, which is a small subset of CIFAR-10.

In [2]:
%%writefile -a multimachine_cifar10_train.py

d2l.DATA_HUB['cifar10_tiny'] = (d2l.DATA_URL + 'kaggle_cifar10_tiny.zip',
                                '2068874e4b9a9f0fb07ebe0ad2b29754449ccacd')
data_dir = d2l.download_extract('cifar10_tiny')
train_data_size = 800
batch_size = 1

def reorg_cifar10_data(data_dir, valid_ratio):
    # os.path.join works whether or not data_dir ends with a path separator
    labels = d2l.read_csv_labels(os.path.join(data_dir, 'trainLabels.csv'))
    d2l.reorg_train_valid(data_dir, labels, valid_ratio)
    d2l.reorg_test(data_dir)

reorg_cifar10_data(data_dir, valid_ratio=0)

Appending to multimachine_cifar10_train.py


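Next, we create a key-value store for distributed training across multiple machines by passing the type `'dist'` to `kv.create`. The store exposes the total number of workers (`num_workers`) and the index of the current worker (`rank`), which we use later to split the data among workers.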

In [3]:
%%writefile -a multimachine_cifar10_train.py

store = kv.create('dist')
# print("Total number of workers: %d" % store.num_workers)
# print("This worker's rank: %d" % store.rank)


Appending to multimachine_cifar10_train.py
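
To illustrate the push and pull pattern that a key-value store provides, here is a toy in-memory stand-in. It is a sketch under stated assumptions, not the MXNet implementation: the class `ToyKVStore` and its methods are hypothetical, it runs in a single process, and it simply stores the sum of the pushed gradients. What a real store does with the aggregated value (for example, applying an optimizer update on the server) is left out.

```python
class ToyKVStore:
    """Single-process stand-in for a synchronous key-value store (hypothetical)."""

    def __init__(self, num_workers):
        self.num_workers = num_workers
        self._values = {}
        self._pending = {}

    def init(self, key, value):
        """Register a key with its initial value."""
        self._values[key] = list(value)
        self._pending[key] = []

    def push(self, key, grad):
        """Buffer one worker's gradient; aggregate once all have arrived."""
        self._pending[key].append(list(grad))
        if len(self._pending[key]) == self.num_workers:
            # Element-wise sum over the contributions of all workers
            self._values[key] = [sum(g) for g in zip(*self._pending[key])]
            self._pending[key] = []

    def pull(self, key):
        """Return the current value. A real synchronous store would make the
        caller wait for outstanding pushes; this toy returns immediately."""
        return list(self._values[key])

toy_store = ToyKVStore(num_workers=2)
toy_store.init('w', [0.0, 0.0])
toy_store.push('w', [1.0, 2.0])  # gradient from worker 0
toy_store.push('w', [3.0, 4.0])  # gradient from worker 1
print(toy_store.pull('w'))  # [4.0, 6.0]
```

Every worker that pulls after the aggregation sees the same value, regardless of which worker pushed first.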


In [4]:
%%writefile -a multimachine_cifar10_train.py

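## Step 1: Define how to split a batch for multi-machine training
class SplitBatchSampler(gluon.data.sampler.Sampler):
    """Yield shuffled batches of indices from one worker's share of the data.

    The index range [0, length) is cut into `num_parts` contiguous parts of
    equal size, and this sampler only draws from part number `part_index`.
    """
    def __init__(self, length, batch_size, num_parts=1, part_index=0,
                 last_batch='keep'):
        self.part_len = length // num_parts
        self._batch_size = batch_size
        self.start = self.part_len * part_index
        self.end = self.start + self.part_len
        self._last_batch = last_batch
        self._prev = []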

    def __iter__(self):
        indices = list(range(self.start, self.end))
        random.shuffle(indices)
        batch, self._prev = self._prev, []
        for i in indices:
            batch.append(i)
            if len(batch) == self._batch_size:
                yield batch
                batch = []
        if batch:
            if self._last_batch == 'keep':
                yield batch
            elif self._last_batch == 'discard':
                return
            elif self._last_batch == 'rollover':
                self._prev = batch
            else:
                raise ValueError(
                    "last_batch must be one of 'keep', 'discard', or 'rollover', " \
                    "but got %s"%self._last_batch)

    def __len__(self):
        # Each worker iterates only over its own part of the data, so the
        # number of batches is derived from part_len
        if self._last_batch == 'keep':
            return (self.part_len + self._batch_size - 1) // self._batch_size
        if self._last_batch == 'discard':
            return self.part_len // self._batch_size
        if self._last_batch == 'rollover':
            return (len(self._prev) + self.part_len) // self._batch_size
        raise ValueError(
            "last_batch must be one of 'keep', 'discard', or 'rollover', "
            "but got %s" % self._last_batch)



Appending to multimachine_cifar10_train.py
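
The index arithmetic in `SplitBatchSampler.__init__` can be examined in isolation. The helper below, `partition_range`, is a hypothetical name; it reproduces only the `start`/`end` computation and shows which indices each worker would draw from.

```python
def partition_range(length, num_parts, part_index):
    """Contiguous index range owned by one worker, as in SplitBatchSampler."""
    part_len = length // num_parts
    start = part_len * part_index
    return range(start, start + part_len)

# 800 examples split between two workers
parts = [partition_range(800, 2, rank) for rank in range(2)]
print([(p[0], p[-1]) for p in parts])  # [(0, 399), (400, 799)]

# When the length is not divisible by the number of parts, the integer
# division leaves the trailing indices unassigned
covered = [i for rank in range(3) for i in partition_range(10, 3, rank)]
print(covered)  # [0, 1, 2, 3, 4, 5, 6, 7, 8]
```

The ranges are disjoint, so no example is processed by two workers within an epoch. The second call shows a consequence of the integer division: with 10 examples and 3 workers, index 9 is never visited.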


In [5]:
%%writefile -a multimachine_cifar10_train.py

## Step 2: Create and build the training/test dataset

# Load the training and test data
transform_train = gluon.data.vision.transforms.Compose([
    # Magnify the image to a square of 40 pixels in both height and width
    gluon.data.vision.transforms.Resize(40),
    # Randomly crop a square image of 40 pixels in both height and width to
    # produce a small square of 0.64 to 1 times the area of the original
    # image, and then shrink it to a square of 32 pixels in both height and
    # width
    gluon.data.vision.transforms.RandomResizedCrop(32, scale=(0.64, 1.0),
                                                   ratio=(1.0, 1.0)),
    gluon.data.vision.transforms.RandomFlipLeftRight(),
    gluon.data.vision.transforms.ToTensor(),
    # Normalize each channel of the image
    gluon.data.vision.transforms.Normalize([0.4914, 0.4822, 0.4465],
                                           [0.2023, 0.1994, 0.2010])])

transform_test = gluon.data.vision.transforms.Compose([
    gluon.data.vision.transforms.ToTensor(),
    gluon.data.vision.transforms.Normalize([0.4914, 0.4822, 0.4465],
                                           [0.2023, 0.1994, 0.2010])])

train_ds, test_ds = [
    gluon.data.vision.ImageFolderDataset(
        os.path.join(data_dir, 'train_valid_test', folder))
    for folder in ['train', 'test']]

train_iter = gluon.data.DataLoader(
    train_ds.transform_first(transform_train),
    batch_sampler=SplitBatchSampler(train_data_size, batch_size,
                                    store.num_workers, store.rank,
                                    last_batch='keep'))

test_iter = gluon.data.DataLoader(
    test_ds.transform_first(transform_test), batch_size, shuffle=False,
) 


Appending to multimachine_cifar10_train.py


In [6]:
%%writefile -a multimachine_cifar10_train.py

## Step 3: Define the ResNet-18 model
class Residual(nn.HybridBlock):
    def __init__(self, num_channels, use_1x1conv=False, strides=1, **kwargs):
        super(Residual, self).__init__(**kwargs)
        self.conv1 = nn.Conv2D(num_channels, kernel_size=3, padding=1,
                               strides=strides)
        self.conv2 = nn.Conv2D(num_channels, kernel_size=3, padding=1)
        if use_1x1conv:
            self.conv3 = nn.Conv2D(num_channels, kernel_size=1,
                                   strides=strides)
        else:
            self.conv3 = None
        self.bn1 = nn.BatchNorm()
        self.bn2 = nn.BatchNorm()

    def hybrid_forward(self, F, X):
        Y = F.npx.relu(self.bn1(self.conv1(X)))
        Y = self.bn2(self.conv2(Y))
        if self.conv3:
            X = self.conv3(X)
        return F.npx.relu(Y + X)
    
def resnet18(num_classes):
    net = nn.HybridSequential()
    net.add(nn.Conv2D(64, kernel_size=3, strides=1, padding=1),
            nn.BatchNorm(), nn.Activation('relu'))

    def resnet_block(num_channels, num_residuals, first_block=False):
        blk = nn.HybridSequential()
        for i in range(num_residuals):
            if i == 0 and not first_block:
                blk.add(Residual(num_channels, use_1x1conv=True, strides=2))
            else:
                blk.add(Residual(num_channels))
        return blk

    net.add(resnet_block(64, 2, first_block=True),
            resnet_block(128, 2),
            resnet_block(256, 2),
            resnet_block(512, 2))
    net.add(nn.GlobalAvgPool2D(), nn.Dense(num_classes))
    return net



Appending to multimachine_cifar10_train.py
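
As a sanity check on the architecture, we can trace how the spatial size of the feature maps changes through the network using the standard convolution output-size formula. The sketch below assumes a 32×32 input (the crop size used in the training transform) and uses hypothetical helper names. It only does shape arithmetic and does not build the network.

```python
def conv_out(size, kernel, stride, padding):
    """Output height/width of a convolution applied to a square input."""
    return (size + 2 * padding - kernel) // stride + 1

def resnet18_feature_sizes(input_size=32):
    """(channels, spatial size) after the stem and after each of the 4 stages."""
    size = conv_out(input_size, kernel=3, stride=1, padding=1)  # stem conv
    sizes = [(64, size)]
    for i, channels in enumerate([64, 128, 256, 512]):
        if i > 0:
            # The first Residual of every later stage uses strides=2
            size = conv_out(size, kernel=3, stride=2, padding=1)
        sizes.append((channels, size))
    return sizes

print(resnet18_feature_sizes())
# [(64, 32), (64, 32), (128, 16), (256, 8), (512, 4)]

# The 1x1 shortcut convolution (no padding, stride 2) yields the same size
# as the 3x3 path, so the addition Y + X in Residual is well defined
print(conv_out(32, kernel=1, stride=2, padding=0))  # 16
```

Each stride-2 stage halves the height and width while doubling the channels, until global average pooling reduces the final 4×4 maps to one value per channel.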


In [7]:
%%writefile -a multimachine_cifar10_train.py

## Step 4: Define the training function
def train(net, train_iter, test_iter, num_epochs, lr, wd, ctx, lr_period,
          lr_decay):

    # Use the SGD optimizer and ask the trainer to synchronize through the
    # distributed key-value store
    trainer = gluon.Trainer(net.collect_params(), 'sgd', 
                            {'learning_rate': lr, 'momentum': 0.9, 'wd': wd}, 
                            kvstore=store)
    loss = gluon.loss.SoftmaxCrossEntropyLoss()

    for epoch in range(num_epochs):
        train_l_sum, train_acc_sum, n, start = 0.0, 0.0, 0, time.time()
        if epoch > 0 and epoch % lr_period == 0:
            trainer.set_learning_rate(trainer.learning_rate * lr_decay)
        for X, y in train_iter:
            y = y.astype('float32').as_in_context(ctx)
            with autograd.record():
                y_hat = net(X.as_in_context(ctx))
                l = loss(y_hat, y).sum()
            l.backward()
            trainer.step(batch_size)
            train_l_sum += float(l)
            train_acc_sum += float((y_hat.argmax(axis=1) == y).sum())
            n += y.size
        time_s = "time %.2f sec" % (time.time() - start)
#         if valid_iter is not None:
#             valid_acc = d2l.evaluate_accuracy_gpu(net, valid_iter)
#             epoch_s = ("epoch %d, loss %f, train acc %f, valid acc %f, "
#                        % (epoch + 1, train_l_sum / n, train_acc_sum / n,
#                           valid_acc))
#         else:
        test_acc = d2l.evaluate_accuracy_gpu(net, test_iter)
        epoch_s = ("epoch %d, loss %f, train acc %f, test acc %f" %
                   (epoch + 1, train_l_sum / n, train_acc_sum / n, test_acc))
        # Separate the fields so that the accuracy and the time do not run together
        print(epoch_s + ', ' + time_s + ', lr ' + str(trainer.learning_rate))
        sys.stdout.flush()
        
# def main():
#     parser = argparse.ArgumentParser(description='Launch a distributed job')
#     parser.add_argument('-e', '--num_epochs', required=True, type=int,
#                         help = 'number of epochs to train')
#     parser.add_argument('-lr', '--learning_rate', required=True, type=float,
#                         help = 'learning rate controls how much an update step influences \
#                         the current value of the weights')
#     parser.add_argument('-wd', '--weight_decay', type=float,
#                         help = 'weight decay adds a penalty term to the loss function \
#                         to reduce the complexity of the learned model (weights)')
#     parser.add_argument('--lr_decay', default=0.1, type=float,
#                         help = 'factor by which the learning rate is multiplied at each decay')
#     parser.add_argument('--lr_period', default=80, type=int,
#                         help = 'number of epochs between two learning rate decays')
#     train(net, train_iter, test_iter, num_epochs, lr, wd, ctx, lr_period, lr_decay)

    

Appending to multimachine_cifar10_train.py
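
The learning rate rule inside `train` multiplies the rate by `lr_decay` at the start of every epoch that is a positive multiple of `lr_period`. The sketch below replays that rule and compares it with an equivalent closed form. The helper names are hypothetical, and the values 0.1, 80, and 0.1 are the ones used in this section.

```python
import math

def lr_at_epoch(epoch, base_lr, lr_period, lr_decay):
    """Learning rate in effect during a 0-based epoch (closed form)."""
    return base_lr * lr_decay ** (epoch // lr_period)

def simulate_loop(num_epochs, base_lr, lr_period, lr_decay):
    """Replay the in-loop update rule and record the rate used in each epoch."""
    lr, history = base_lr, []
    for epoch in range(num_epochs):
        if epoch > 0 and epoch % lr_period == 0:
            lr *= lr_decay
        history.append(lr)
    return history

history = simulate_loop(200, base_lr=0.1, lr_period=80, lr_decay=0.1)
for epoch in (0, 79, 80, 160):
    closed_form = lr_at_epoch(epoch, 0.1, 80, 0.1)
    # The two agree up to floating-point rounding
    assert math.isclose(history[epoch], closed_form)
    print(epoch, closed_form)
```

Note that with `num_epochs = 1`, as set in the script in this section, the decay never triggers and the learning rate stays at its initial value.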


In [8]:
%%writefile -a multimachine_cifar10_train.py

if __name__ == '__main__':
    ctx = d2l.try_gpu()
    num_epochs, lr, wd = 1, 0.1, 5e-4
    lr_period, lr_decay  = 80, 0.1

    num_classes = 10
    net = resnet18(num_classes)
    net.collect_params().initialize(mx.init.Xavier(), ctx=ctx)
    net.hybridize()
    train(net, train_iter, test_iter, num_epochs, lr, wd, ctx, lr_period, lr_decay)

Appending to multimachine_cifar10_train.py


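Finally, we launch the script with the `launch.py` tool that ships with MXNet. The flag `-n 2` starts two workers and `-s 2` starts two parameter servers. With `--launcher local`, all of these processes run on the current machine, which is convenient for testing the setup before moving to several machines.

In [9]: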
import os

launcher_path = "/home/ubuntu/miniconda3/envs/d2l/lib/python3.7/site-packages/mxnet/tools/launch.py"
# os.getcwd() returns a plain string, whereas `!pwd` returns a list of lines
current_dir = os.getcwd()


!python {launcher_path} \
    -n 2 -s 2 \
    --sync-dst-dir {current_dir} \
    --launcher local "python multimachine_cifar10_train.py"

[23:10:09] src/operator/nn/./cudnn/./cudnn_algoreg-inl.h:97: Running performance tests to find the best convolution algorithm, this can take a while... (set the environment variable MXNET_CUDNN_AUTOTUNE_DEFAULT to 0 to disable)
[23:10:09] src/operator/nn/./cudnn/./cudnn_algoreg-inl.h:97: Running performance tests to find the best convolution algorithm, this can take a while... (set the environment variable MXNET_CUDNN_AUTOTUNE_DEFAULT to 0 to disable)
epoch 1, loss 8815.827673, train acc 0.120000, test acc 0.000000time 38.27 sec, lr 0.1
epoch 1, loss 5286.654214, train acc 0.127500, test acc 0.000000time 38.20 sec, lr 0.1
terminate called without an active exception
terminate called without an active exception
^C
2020-02-09 23:11:04,860 INFO Stop launcher
