# Multi-GPU training

## total number of GPUs

In [2]:
import torch

print("There are", torch.cuda.device_count(), "gpus available.")

There are 4 gpus available.


In [3]:
print("The current gpu is", torch.cuda.current_device())

The current gpu is 0


## Run computations on a single gpu asynchronously

In [4]:
import time

def run(x):
    """Multiply `x` by itself 50 times and collect every product.

    Args:
        x: A 2-D tensor for which `x.mm(x)` is valid (i.e. square).

    Returns:
        A list of 50 tensors, each the matrix product of `x` with itself.
    """
    products = []
    for _ in range(50):
        # Every iteration recomputes the same product from the unchanged input.
        products.append(x.mm(x))
    return products


# Allocate a random 4000x4000 matrix directly on the first GPU.
x_gpu0 = torch.rand(size=(4000, 4000), device=torch.device("cuda:0"))
start = time.time()
run(x_gpu0)
# There is no torch.cuda.synchronize() before the clock is read, so this
# measures only the time until run() returns to Python. Compare with the
# synchronized measurement in the next section.
end = time.time()
print("cuda:0", end-start)

cuda:0 0.24007248878479004


## Run computations on a single gpu synchronously

In [5]:
# Same workload as the previous cell, on a fresh random matrix on cuda:0.
x_gpu0 = torch.rand(size=(4000, 4000), device=torch.device("cuda:0"))
start = time.time()
run(x_gpu0)
# Block until the current CUDA device has finished its queued work, so the
# elapsed time below covers the computation itself and not just the launch.
torch.cuda.synchronize()
end = time.time()
print("cuda:0", end-start)

cuda:0 0.4931313991546631


## Run computations on 4 GPUs 

### Synchronously

In [6]:
class BenchMark(object):
    """Context manager that prints the wall-clock time spent in its `with` body.

    On exit it prints `name` followed by the elapsed seconds.

    Args:
        name: Label printed in front of the elapsed time.
    """

    def __init__(self, name):
        self.name = name

    def __enter__(self):
        """Record the start time and return the timer itself."""
        self.start = time.time()
        # Returning self makes `with BenchMark(...) as b:` bind the timer, so
        # callers can read `b.start` / `b.end` afterwards.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Record the end time and print the elapsed seconds."""
        self.end = time.time()
        print(self.name, self.end-self.start)
        # Never suppress an exception raised inside the `with` body.
        return False

In [7]:
# Create one random 4000x4000 matrix on every visible GPU.
x_gpus = []
for i in range(torch.cuda.device_count()):
    device = torch.device("cuda:"+str(i))
    x_gpus.append(torch.rand(size=(4000, 4000), device=device))

# Time the devices one after another: each iteration synchronizes its own
# device before the loop moves on, so the next GPU's work is only launched
# after the previous GPU has finished.
start = time.time()
for i in range(torch.cuda.device_count()):
    with BenchMark("cuda:"+str(i)):
        run(x_gpus[i])
        torch.cuda.synchronize(device=torch.device("cuda:"+str(i)))
end = time.time()
print("Total time elapsed is", end-start)

cuda:0 0.49277520179748535
cuda:1 0.5054032802581787
cuda:2 0.49334192276000977
cuda:3 0.5074658393859863
Total time elapsed is 2.0000052452087402


### Asynchronously

In [8]:
# Launch the work on every GPU first and only then wait for them, so the
# devices can compute at the same time.
with BenchMark("all gpus"):
    for i in range(torch.cuda.device_count()):
        run(x_gpus[i])
    for i in range(torch.cuda.device_count()):
        # Name the device explicitly: synchronize() without an argument only
        # waits on the current device, which would leave the other GPUs
        # unsynchronized when the timer stops.
        torch.cuda.synchronize(device=torch.device("cuda:"+str(i)))

all gpus 0.49680399894714355


## Tensor copy

In [9]:
def copy_to_cpu(x, non_blocking=False):
    """Move every tensor in `x` to the CPU.

    Args:
        x: Iterable of tensors.
        non_blocking: Forwarded unchanged to `Tensor.to` for each tensor.

    Returns:
        A new list holding the result of `.to('cpu', ...)` for each tensor,
        in the same order as `x`.
    """
    cpu_tensors = []
    for tensor in x:
        cpu_tensors.append(tensor.to('cpu', non_blocking=non_blocking))
    return cpu_tensors

# Time the computation and the device-to-host copy separately. Each phase
# ends with a synchronize so its timer covers only that phase's work.
with BenchMark('Run on GPU0'):
    y = run(x_gpus[0])
    torch.cuda.synchronize()
    
# Default (blocking) copies of all 50 result matrices back to the CPU.
with BenchMark('Copy to CPU'):
    y_cpu = copy_to_cpu(y)
    torch.cuda.synchronize()

Run on GPU0 0.49151015281677246
Copy to CPU 2.44549560546875


In [13]:
# NOTE(review): this definition is identical to the `copy_to_cpu` defined in
# the earlier "Tensor copy" cell and silently shadows it; it could be dropped.
def copy_to_cpu(x, non_blocking=False):
    """Return a list with every tensor of `x` moved to the CPU.

    `non_blocking` is forwarded unchanged to `Tensor.to`.
    """
    return [y.to('cpu', non_blocking=non_blocking) for y in x]
    
# Unlike the previous cell, a single timer covers both the computation and
# the copies, and the copies are requested with non_blocking=True.
with BenchMark('Copy to CPU'):
    y = run(x_gpus[0])
    y_cpu = copy_to_cpu(y, non_blocking=True)
    # Wait for the queued GPU work before the timer stops.
    torch.cuda.synchronize()

Copy to CPU 0.7369072437286377


## Different ways of distributed training

There are mainly 3 ways to train a model on multiple GPUs:
1. Network Partitioning. The layers are assigned to different GPUs. 
2. Layer-wise Partitioning. For instance, each GPU trains different channels of the input.
3. Data parallelism. Each GPU trains on a subset of a minibatch (recommended).

The first and second methods are usually for big models that cannot fit in a single GPU. They both need a lot of data transfer and synchronization among GPUs.
![](./splitting.svg)

## Data parallelism

![](./data-parallel.svg)

## Multi-GPU training

In [1]:
%matplotlib inline
import torch
from torch import nn
from torch.nn import functional as F

### Define the model

In [2]:
# Initialize model parameters
# Weights are drawn from a standard normal and multiplied by `scale`;
# biases start at zero.
scale = 0.01
# First convolution: 20 output channels, 1 input channel, 3x3 kernel
W1 = torch.randn(size=(20, 1, 3, 3)) * scale
b1 = torch.zeros(20)
# Second convolution: 50 output channels, 20 input channels, 5x5 kernel
W2 = torch.randn(size=(50, 20, 5, 5)) * scale
b2 = torch.zeros(50)
# First dense layer: 800 -> 128. The 800 must match the flattened output of
# the second pooling step in `lenet` (50 * 4 * 4 if inputs are 28x28 --
# TODO confirm the input size used for training).
W3 = torch.randn(size=(800, 128)) * scale
b3 = torch.zeros(128)
# Output layer: 128 -> 10 scores
W4 = torch.randn(size=(128, 10)) * scale
b4 = torch.zeros(10)
# Order matters: `lenet` reads these by position (params[0] .. params[7]).
params = [W1, b1, W2, b2, W3, b3, W4, b4]

# Define the model
def lenet(X, params):
    """Forward pass of a LeNet-style network with explicitly passed parameters.

    Args:
        X: Input batch for the first convolution; its channel count must
            match the input channels of `params[0]`.
        params: Sequence indexed as
            [conv1 weight, conv1 bias, conv2 weight, conv2 bias,
             dense1 weight, dense1 bias, dense2 weight, dense2 bias].

    Returns:
        A 2-D tensor of unnormalized scores with one row per example.
    """
    def conv_relu_pool(inputs, weight, bias):
        # convolution -> ReLU -> 2x2 average pooling with stride 2
        activated = F.relu(F.conv2d(input=inputs, weight=weight, bias=bias))
        return F.avg_pool2d(input=activated, kernel_size=(2, 2), stride=(2, 2))

    features = conv_relu_pool(X, params[0], params[1])
    features = conv_relu_pool(features, params[2], params[3])
    # Flatten everything except the batch dimension for the dense layers.
    flat = features.reshape(features.shape[0], -1)
    hidden = F.relu(torch.mm(flat, params[4]) + params[5])
    return torch.mm(hidden, params[6]) + params[7]

# Cross-entropy loss function
# reduction='none' disables the built-in reduction; `train_batch` sums the
# result itself with `.sum()`.
loss = nn.CrossEntropyLoss(reduction='none')

### Data synchronization

In [4]:
def copy_params(params, device):
    """Create independent, trainable copies of `params` on `device`.

    Args:
        params: Iterable of tensors to replicate.
        device: Target device for the copies.

    Returns:
        A list of new leaf tensors on `device`, each with
        `requires_grad=True`. The tensors in `params` are left untouched.
    """
    # detach() makes every copy a leaf even if the source already tracks
    # gradients. copy=True forces a fresh tensor even when the source already
    # lives on `device`; a plain .to(device) would return the source itself,
    # so the "copy" would alias it and requires_grad_() would mutate the
    # caller's tensor.
    return [p.detach().to(device, copy=True).requires_grad_() for p in params]

# Build one trainable copy of the parameter list per visible GPU;
# all_params[i] holds the copy living on cuda:i.
all_params = []
for i in range(torch.cuda.device_count()):
    device = torch.device("cuda:"+str(i))
    all_params.append(copy_params(params, device))

In [7]:
# Inspect b1 (position 1 in the parameter list) from the copy at index 1,
# i.e. the one created for cuda:1. This needs at least two GPUs.
print('b1 weight:', all_params[1][1])
print('b1 grad:', all_params[1][1].grad)

b1 weight: tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       device='cuda:1', requires_grad=True)
b1 grad: None


### Aggregate gradients and broadcast them to all GPUs

In [8]:
def allreduce(data):
    """Sum all tensors in `data` and write the total into each of them in place.

    The tensors may live on different devices. The sum is accumulated on the
    device of `data[0]` and then copied back to every other tensor.

    Args:
        data: List of tensors with compatible shapes. Every tensor is
            overwritten with the element-wise sum. An empty or single-element
            list is left unchanged.
    """
    for i in range(1, len(data)):
        # In-place add instead of slice assignment: `data[0][:]` raises on
        # 0-dim tensors, while add_ works for every shape.
        data[0].add_(data[i].to(data[0].device))
    for i in range(1, len(data)):
        # copy_ writes into the existing tensor and handles the device transfer.
        data[i].copy_(data[0])

## Distribute Data

In [11]:
# A small 4x5 integer tensor to demonstrate how scatter splits along dim 0.
data = torch.arange(20).reshape(4, 5)
# Derive the device list from the GPUs actually present, as the other cells
# do, instead of hard-coding four devices.
devices = [torch.device('cuda:'+str(i)) for i in range(torch.cuda.device_count())]
split = nn.parallel.scatter(data, devices)
print('input :', data)
print('load into', devices)
print('output:', split)

input : tensor([[ 0,  1,  2,  3,  4],
        [ 5,  6,  7,  8,  9],
        [10, 11, 12, 13, 14],
        [15, 16, 17, 18, 19]])
load into [device(type='cuda', index=0), device(type='cuda', index=1), device(type='cuda', index=2), device(type='cuda', index=3)]
output: (tensor([[0, 1, 2, 3, 4]], device='cuda:0'), tensor([[5, 6, 7, 8, 9]], device='cuda:1'), tensor([[10, 11, 12, 13, 14]], device='cuda:2'), tensor([[15, 16, 17, 18, 19]], device='cuda:3'))


In [12]:
def split_batch(X, y, devices):
    """Scatter a batch of features and labels across `devices`.

    Args:
        X: Feature tensor; its first dimension is the batch size.
        y: Label tensor with the same first dimension as `X`.
        devices: Devices passed to `nn.parallel.scatter`.

    Returns:
        A pair `(X_shards, y_shards)` holding the scattered pieces of `X`
        and `y` respectively.

    Raises:
        AssertionError: If `X` and `y` differ in their first dimension.
    """
    num_examples, num_labels = X.shape[0], y.shape[0]
    assert num_examples == num_labels
    X_shards = nn.parallel.scatter(X, devices)
    y_shards = nn.parallel.scatter(y, devices)
    return (X_shards, y_shards)

### Training

In [13]:
def train_batch(X, y, device_params, devices, lr):
    """Run one data-parallel training step on a single minibatch.

    The batch is split across `devices`, each shard's summed loss is
    back-propagated on its own parameter copy, the gradients are summed
    across copies with `allreduce`, and each copy is then passed to
    `d2l.sgd`.

    Args:
        X: Feature batch; its first dimension is the batch size.
        y: Labels matching `X` in the first dimension.
        device_params: One parameter list per device, in the order of `devices`.
        devices: Devices to scatter the batch onto.
        lr: Learning rate forwarded to `d2l.sgd`.

    NOTE(review): `d2l` is not imported in any visible cell -- confirm it is
    imported elsewhere in the notebook, otherwise the last line raises
    NameError. Presumably `d2l.sgd` also resets the gradients after the
    update; nothing in this function zeroes them -- verify.
    """
    X_shards, y_shards = split_batch(X, y, devices)
    # Loss is calculated separately on each GPU
    ls = [loss(lenet(X_shard, device_W), y_shard).sum()
          for X_shard, y_shard, device_W in zip(
              X_shards, y_shards, device_params)]
    for l in ls:  # Backpropagation is performed separately on each GPU
        l.backward()
    # Sum all gradients from each GPU and broadcast them to all GPUs
    with torch.no_grad():
        # i indexes a parameter position, c a device: each call reduces the
        # gradients of the same parameter across all device copies.
        for i in range(len(device_params[0])):
            allreduce([device_params[c][i].grad for c in range(len(devices))])
    # The model parameters are updated separately on each GPU
    for param in device_params:
        d2l.sgd(param, lr, X.shape[0]) # Here, we use a full-size batch