# Lab 1: Training with multiple GPUs from scratch

Adapted from https://gluon.mxnet.io/chapter07_distributed-learning/multiple-gpus-scratch.html

This tutorial shows how we can increase performance by distributing training across multiple GPUs.

* This tutorial requires at least 2 GPUs.
* An NVIDIA driver must be installed on the machine.
* MXNet must be installed with gpu capabilities (https://www.nvidia.com/en-us/data-center/gpu-accelerated-applications/mxnet/)
* Tested on a Amazon SageMaker notebook p3.8xlarge instance. 

![](../img/multi-gpu.svg)

Start by checking the configuration of the GPUs installed on your machine:

In [1]:
!nvidia-smi

Tue Jul 10 14:40:47 2018       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 384.111                Driver Version: 384.111                   |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|   0  Tesla V100-SXM2...  On   | 00000000:00:1B.0 Off |                    0 |
| N/A   49C    P0    53W / 300W |      0MiB / 16152MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   1  Tesla V100-SXM2...  On   | 00000000:00:1C.0 Off |                    0 |
| N/A   48C    P0    50W / 300W |      0MiB / 16152MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   2  Tesla V100-SXM2...  On   | 00000000:00:1D.0 Off |                    

We want to use all of the GPUs on together for the purpose of significantly speeding up training (in terms of wall clock). 
Remember that CPUs and GPUs each can have multiple cores. 
CPUs on a laptop might have 2 or 4 cores, and on a server might have up to 16 or 32 cores. 
GPUs tend to have many more cores - an NVIDIA Tesla V100 GPU has 5120 - but run at slower clock speeds. 
Exploiting the parallelism across the GPU cores is how GPUs get their speed advantage in the first place. 

As compared to the single CPU or single GPU setting where all the cores are typically used by default,
parallelism across devices is a little more complicated.
That's because most layers of a neural network can only run on a single device. 
So, in order to parallelize across devices, we need to do a little extra.
Therefore, we need to do some additional work to partition a workload across multiple GPUs. 
This can be done in a few ways.

## Data Parallelism

For deep learning, data parallelism is by far the most widely used approach for partitioning workloads. 
It works like this: Assume that we have *k* GPUs. We split the examples in a data batch into *k* parts,
and send each part to a different GPUs which then computes the gradient that part of the batch. 
Finally, we collect the gradients from each of the GPUs and sum them together before updating the weights.

The following pseudo-code shows how to train one data batch on *k* GPUs.


```
def train_batch(data, k):
    split data into k parts
    for i = 1, ..., k:  # run in parallel
        compute grad_i w.r.t. weight_i using data_i on the i-th GPU
    grad = grad_1 + ... + grad_k
    for i = 1, ..., k:  # run in parallel
        copy grad to i-th GPU
        update weight_i by using grad
```

Next we will present how to implement this algorithm from scratch.


## Automatic Parallelization

We first demonstrate how to run workloads in parallel. 
Writing parallel code in Python in non-trivial, but fortunately, 
MXNet is able to automatically parallelize the workloads. 
Two technologies help to achieve this goal.

First, workloads, such as `nd.dot` are pushed into the backend engine for *lazy evaluation*. 
That is, Python merely pushes the workload `nd.dot` and returns immediately 
without waiting for the computation to be finished. 
We keep pushing until the results need to be copied out from MXNet, 
such as `print(x)` or are converted into numpy by `x.asnumpy()`. 
At that time, the Python thread is blocked until the results are ready.

In [2]:
import mxnet as mx
from mxnet import nd
from time import time
from mxnet import gpu
from mxnet import cpu
from mxnet import gluon
from mxnet.test_utils import get_mnist
from mxnet.io import NDArrayIter

### Task 1: Lazy Evaluation.

Create a random NDArray and multiply it with itself:

Fill in the lines in the next cell to:
1. Create a NDArray of size 2000x2000 called x.
2. Multiply the array by itself and assign the result to y.
3. Trigger the execution by converting to a numpy array. 

In [3]:
start = time()
# 2 lines, create the ndarray x, multiply it by itself and assign to y
x = nd.random_uniform(shape=(2000,2000))
y = nd.dot(x, x)
print('=== workloads are pushed into the backend engine ===\n%f sec' % (time() - start))
# 1 line, convert to numpy and assign to z
z = y.asnumpy()
print('=== workloads are finished ===\n%f sec' % (time() - start))
print(z.shape)

=== workloads are pushed into the backend engine ===
0.000944 sec
=== workloads are finished ===
0.057008 sec
(2000, 2000)


#### Expected Output
```
=== workloads are pushed into the backend engine ===
0.002730 sec
=== workloads are finished ===
0.048943 sec
(2000, 2000)
```

Second, MXNet depends on a powerful scheduling algorithm that analyzes the dependencies of the pushed workloads.
This scheduler checks to see if two workloads are independent of each other.
If they are, then the engine may run them in parallel.
If a workload depend on results that have not yet been computed, it will be made to wait until its inputs are ready.

For example, if we call three operators:

```
a = nd.random_uniform(...)
b = nd.random_uniform(...)
c = a + b
```

Then the computation for `a` and `b` may run in parallel, 
while `c` cannot be computed until both `a` and `b` are ready. 

The following code shows that the engine effectively parallelizes the `dot` operations on two GPUs:

### Task 2: Parallel Computation

Create an array stored on a GPU, run a complex job in series and in parallel:

1. Create a random NDArray of size 4000x4000 on gpu(0)
2. Create a copy of the array on gpu(1)
3. Run the workload in series and parallel

In [4]:
def run(x):
    """push 10 matrix-matrix multiplications"""
    return [nd.dot(x,x) for i in range(10)]

def wait(x):
    """explicitly wait until all results are ready"""
    for y in x:
        y.wait_to_read()

# 1 line create the 4000x4000 random array on gpu(0)
x0 = nd.random_uniform(shape=(4000, 4000), ctx=gpu(0))
# 1 line copy the array to gpu(1) and set as x1
x1 = x0.copyto(gpu(1))

print('=== Run on GPU 0 and 1 in sequential ===')
start = time()
wait(run(x0))
wait(run(x1))
print('time: %f sec' %(time() - start))

print('=== Run on GPU 0 and 1 in parallel ===')
start = time()
y0 = run(x0)
y1 = run(x1)
wait(y0)
wait(y1)
print('time: %f sec' %(time() - start))

=== Run on GPU 0 and 1 in sequential ===
time: 2.020221 sec
=== Run on GPU 0 and 1 in parallel ===
time: 0.103789 sec


#### Expected Output
```
=== Run on GPU 0 and 1 in sequential ===
time: 2.022742 sec
=== Run on GPU 0 and 1 in parallel ===
time: 0.099376 sec
```

### Task 3: Parallel Computation 2

Run on gpu and copy back to the CPU sequentially and then in parallel

1. Run the workload on the created array xo and assign to y0
2. Wait on the results of the computation
3. Copy the results to the cpu
4. Wait for the copy to complete

5. Run the workload on the created array xo
6. Copy the results to the cpu in parallel
7. Wait for the parallel operations to complete.

In [5]:
def copy(x, ctx):
    """copy data to a device"""
    return [y.copyto(ctx) for y in x]

print('=== Run on GPU 0 and then copy results to CPU in sequential ===')
start = time()
# 1 line run the workload on array xo and assign to y
y0 = run(x0)
# 1 line wait for the computation to complete
wait(y0)
# 1 line copy y0 back to the cpu
z0 = copy(y0, cpu())
wait(z0)
print(time() - start)

print('=== Run and copy in parallel ===')
start = time()
y0 = run(x0)
z0 = copy(y0, cpu())
wait(z0)
print(time() - start)

=== Run on GPU 0 and then copy results to CPU in sequential ===
0.5026967525482178
=== Run and copy in parallel ===
0.42870044708251953


#### Expected Output
```
=== Run on GPU 0 and then copy results to CPU in sequential ===
0.5133333206176758
=== Run and copy in parallel ===
0.4283921718597412
```

### Task 4: Define CNN model and updater

We will use the convolutional neural networks and plain SGD introduced in cnn-scratch as an example workload.

The task is to initialise the weights, define the first layer of the network, and set the loss function.

1. Create the parameter array for the first layer of shape=(20,1,3,3), use random_normal() with a scale of 0.01
2. Create the bias parameter array of length 20, filled with zeros
3. Create the first convolutional layer with a 3x3 kernel and 20 filters.
4. Define the loss function using SoftmaxCrossEntropyLoss.

In [6]:
# initialize parameters
scale = .01
# 1 line: Create the parameter array W1 filled by random_normal() with shape (20,1,3,3), multiply by scale (0.01)
W1 = nd.random_normal(shape=(20,1,3,3))*scale
# 1 line: Initialise the bias parameter array with zeros, length 20
b1 = nd.zeros(shape=20)
W2 = nd.random_normal(shape=(50,20,5,5))*scale
b2 = nd.zeros(shape=50)
W3 = nd.random_normal(shape=(800,128))*scale
b3 = nd.zeros(shape=128)
W4 = nd.random_normal(shape=(128,10))*scale
b4 = nd.zeros(shape=10)
params = [W1, b1, W2, b2, W3, b3, W4, b4]

# network and loss
def lenet(X, params):
    # 1 line: Create the first layer using nd.Convolution, 3x3 kernel, and 20 filters
    h1_conv = nd.Convolution(data=X, weight=params[0], bias=params[1], kernel=(3,3), num_filter=20)
    h1_activation = nd.relu(h1_conv)
    h1 = nd.Pooling(data=h1_activation, pool_type="max", kernel=(2,2), stride=(2,2))
    # second conv
    h2_conv = nd.Convolution(data=h1, weight=params[2], bias=params[3], kernel=(5,5), num_filter=50)
    h2_activation = nd.relu(h2_conv)
    h2 = nd.Pooling(data=h2_activation, pool_type="max", kernel=(2,2), stride=(2,2))
    h2 = nd.flatten(h2)
    # first fullc
    h3_linear = nd.dot(h2, params[4]) + params[5]
    h3 = nd.relu(h3_linear)
    # second fullc    
    yhat = nd.dot(h3, params[6]) + params[7]    
    return yhat

# 1 line: Define the loss function using SoftmaxCrossEntropyLoss
loss = gluon.loss.SoftmaxCrossEntropyLoss()

# plain SGD
def SGD(params, lr):
    for p in params:
        p[:] = p - lr * p.grad

## Task 5: Define a Function to Copy Parameters to a GPU

Create a function to copy the given parameters to a given gpu and initialise the gradients. 

1. In the list comprehension, copy each parameter array to the given gpu.
2. Use the ndarray.attach_grad() function to initialise the gradients for each parameter array.

In [7]:
def get_params(params, ctx):
    # 1 element: copy each parameter to the given gpu
    new_params = [p.copyto(ctx) for p in params]
    for p in new_params:
        # 1 line: initialise the gradients.
        p.attach_grad()
    return new_params

new_params = get_params(params, gpu(0))
print('=== copy b1 to GPU(0) ===\nweight = {}\ngrad = {}'.format(
    new_params[1], new_params[1].grad))

=== copy b1 to GPU(0) ===
weight = 
[ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
  0.  0.]
<NDArray 20 @gpu(0)>
grad = 
[ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
  0.  0.]
<NDArray 20 @gpu(0)>


#### Expected Output
```
=== copy b1 to GPU(0) ===
weight = 
[ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
  0.  0.]
<NDArray 20 @gpu(0)>
grad = 
[ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
  0.  0.]
<NDArray 20 @gpu(0)>
```

### Task 6: Create a function to sum and broadcast the results

Given a list of data that spans multiple GPUs, we then define a function to sum the gradients from all GPUs 
and broadcast the result to each GPU. 

1. Copy each array in the input to the same GPU as the first element.
2. In the same line sum all the arrays in the input
3. Copy the summed result to all GPUs

In [8]:
def allreduce(data):
    # sum on data[0].context, and then broadcast
    for i in range(1, len(data)):
        # 1 line: Copy each array in the input to data[0].context and sum all arrays
        data[0][:] += data[i].copyto(data[0].context)
    for i in range(1, len(data)):
        # 1 line: Copy this sum back to all the GPUs
        data[0].copyto(data[i])        

data = [nd.ones((1,2), ctx=gpu(i))*(i+1) for i in range(2)]
print("=== before allreduce ===\n {}".format(data))
allreduce(data)
print("\n=== after allreduce ===\n {}".format(data))

=== before allreduce ===
 [
[[ 1.  1.]]
<NDArray 1x2 @gpu(0)>, 
[[ 2.  2.]]
<NDArray 1x2 @gpu(1)>]

=== after allreduce ===
 [
[[ 3.  3.]]
<NDArray 1x2 @gpu(0)>, 
[[ 3.  3.]]
<NDArray 1x2 @gpu(1)>]


#### Expected Output
```
=== before allreduce ===
 [
[[ 1.  1.]]
<NDArray 1x2 @gpu(0)>, 
[[ 2.  2.]]
<NDArray 1x2 @gpu(1)>]

=== after allreduce ===
 [
[[ 3.  3.]]
<NDArray 1x2 @gpu(0)>, 
[[ 3.  3.]]
<NDArray 1x2 @gpu(1)>]
```

### Task 7: Split into Batches and Copy to GPUs

Given a data batch, we define a function that splits this batch and copies each part into the corresponding GPU.

1. In the list comprehension copy each data batch to the correct GPU.

In [9]:
def split_and_load(data, ctx):
    n, k = data.shape[0], len(ctx)
    assert (n//k)*k == n, '# examples is not divided by # devices'
    idx = list(range(0, n+1, n//k))
    # 1 element: Copy each data batch to the correct GPU
    return [data[idx[i]:idx[i+1]].as_in_context(ctx[i]) for i in range(k)]

batch = nd.arange(16).reshape((4,4))
print('=== original data ==={}'.format(batch))
ctx = [gpu(0), gpu(1)]
splitted = split_and_load(batch, ctx)
print('\n=== splitted into {} ==={}\n{}'.format(ctx, splitted[0], splitted[1]))

=== original data ===
[[  0.   1.   2.   3.]
 [  4.   5.   6.   7.]
 [  8.   9.  10.  11.]
 [ 12.  13.  14.  15.]]
<NDArray 4x4 @cpu(0)>

=== splitted into [gpu(0), gpu(1)] ===
[[ 0.  1.  2.  3.]
 [ 4.  5.  6.  7.]]
<NDArray 2x4 @gpu(0)>

[[  8.   9.  10.  11.]
 [ 12.  13.  14.  15.]]
<NDArray 2x4 @gpu(1)>


### Task 8: Create a Function to Train on a Single Batch

To train the batch the following is performed:

1. Split the batch up equally and load onto the GPUs using our split_and_load(data,ctx) function.
2. Run forward and backward passes on each GPU.
3. Sum all the gradients on each GPU and broadcast back to each GPU.
4. Update the parameters using the gradients.

A lot of that is already done, the task is to:

1. Split and load the batch data and labels onto the GPUs.
2. Calculate the gradients on each GPU.
3. Use the allreduce function to sum the gradients on all GPUs and broadcast the sum back.

In [10]:
def train_batch(batch, params, ctx, lr):
    # split the data batch and load them on GPUs
    # 2 lines: use the split_and_load function to split and copy data and label onto the GPUs
    data = split_and_load(batch.data[0], ctx)
    label = split_and_load(batch.label[0], ctx)
    # run forward on each GPU
    with mx.autograd.record():
        losses = [loss(lenet(X, W), Y) 
                  for X, Y, W in zip(data, label, params)]
    # run backward on each gpu
    for l in losses:
        # 1 line calculate the gradients on each GPU
        l.backward()
    # aggregate gradient over GPUs
    for i in range(len(params[0])):
        # 1-3 lines: use the allreduce function to sum the gradients on all GPUs and
        # broadcast the sum back to all GPUs.
        # Hint: pass a list to allreduce of params[j][i] for each j in the number of GPUs
        allreduce([params[c][i].grad for c in range(len(ctx))])
    # update parameters with SGD on each GPU
    for p in params:
        SGD(p, lr/batch.data[0].shape[0])

For inference, we simply let it run on the first GPU. We leave a data parallelism implementation as an exercise.

In [11]:
def valid_batch(batch, params, ctx):
    data = batch.data[0].as_in_context(ctx[0])
    pred = nd.argmax(lenet(data, params[0]), axis=1)
    return nd.sum(pred == batch.label[0].as_in_context(ctx[0])).asscalar()

### Task 9: Put it All Together to Train and Validate on MNIST

It is ready to put together to train and validate. It is run on the MNIST dataset.

1. Load the MNIST dataset
2. Copy the parameters to the GPUs
3. Iterate over the dataset 5 times, 5 epochs
4. Split into batches
5. Use the previous function to train on a batch
6. Check the accuracy on the validation set after each epoch

In [12]:
def run(num_gpus, batch_size, lr):    
    # the list of GPUs will be used
    ctx = [gpu(i) for i in range(num_gpus)]
    print('Running on {}'.format(ctx))
    
    # data iterator
    mnist = get_mnist()
    train_data = NDArrayIter(mnist["train_data"], mnist["train_label"], batch_size)
    valid_data = NDArrayIter(mnist["test_data"], mnist["test_label"], batch_size)
    print('Batch size is {}'.format(batch_size))
    
    # copy parameters to all GPUs
    dev_params = [get_params(params, c) for c in ctx]
    for epoch in range(5):
        # train
        start = time()
        train_data.reset()
        for batch in train_data:
            train_batch(batch, dev_params, ctx, lr)
        nd.waitall()  # wait all computations are finished to benchmark the time
        print('Epoch %d, training time = %.1f sec'%(epoch, time()-start))
        
        # validating
        valid_data.reset()
        correct, num = 0.0, 0.0
        for batch in valid_data:
            correct += valid_batch(batch, dev_params, ctx)
            num += batch.data[0].shape[0]                
        print('         validation accuracy = %.4f'%(correct/num))


First run on a single GPU with batch size 64.

In [25]:
run(1, 64, 0.3)

Running on [gpu(0)]
Batch size is 64
Epoch 0, training time = 2.7 sec
         validation accuracy = 0.9622
Epoch 1, training time = 2.7 sec
         validation accuracy = 0.9763
Epoch 2, training time = 2.7 sec
         validation accuracy = 0.9807
Epoch 3, training time = 2.7 sec
         validation accuracy = 0.9830
Epoch 4, training time = 2.7 sec
         validation accuracy = 0.9850


* Often it is necessary to increase the batch size when running on multiple GPUs, so each one works on a large enough sample.
* Increasing the batch size can decrease the convergence rate, the learning rate can be increased or more epochs can be run

In [26]:
run(2, 64, 0.3)

Running on [gpu(0), gpu(1)]
Batch size is 64
Epoch 0, training time = 6.6 sec
         validation accuracy = 0.9642
Epoch 1, training time = 6.6 sec
         validation accuracy = 0.9775
Epoch 2, training time = 6.6 sec
         validation accuracy = 0.9812
Epoch 3, training time = 6.6 sec
         validation accuracy = 0.9830
Epoch 4, training time = 6.6 sec
         validation accuracy = 0.9833


## Congratulations!

You have learnt how to implement data parallelism across multiple GPUs using MXNet from scratch. Some issues were uncovered with more communication overhead than computational speedup. The next lab covers the same using Gluon which is much simpler to implement and further optimised. 

## Next
[Training with multiple GPUs with gluon](../chapter07_distributed-learning/multiple-gpus-gluon.ipynb)

For whinges or inquiries, [open an issue on  GitHub.](https://github.com/edenduthie/scalable_mxnet)