## Chapter 13 : Computation
- compiler
  - net = torch.jit.script(net)
- Automatic Parallesim
  - y.to('cpu', non_blocking=non_blocking) for y in x, will return x[i-1] when calculate x[i]
- Tranning on multiple GPU
  - <img alt="Partion Methods" src="https://d2l.ai/_images/splitting.svg" style="background-color: white; display: inline-block;"> Partion Methods
  - nn.parallel.scatter to split data to different devices
  - 显式同步（torch.cuda.synchronize()）仅在需要精确测量执行时间或调试异步错误时必要，其他情况会自己根据cpu或者后续数据需求隐式调用
- Concise impl :
  - What we need to do
    - Network parameters need to be initialized across all devices.
    - While iterating over the dataset minibatches are to be divided across all devices.
    - We compute the loss and its gradient in parallel across devices.
    - Gradients are aggregated and parameters are updated accordingly.
  - Use torch.nn.parallel.DistributedDataParallel
- Parameter Server
  - <img alt="Parameter Exchange" src="https://d2l.ai/_images/ps-distributed.svg" style="background-color: white; display: inline-block;">
  - last graph above assume gradient can be divided into four parts, and exchange each one of them each GPU.
  - Ring Synchronization
  - Key–Value Stores

<!-- <img alt="ResNeXt Block" src="https://d2l.ai/_images/rnn.svg" style="background-color: white; display: inline-block;"> -->
<!-- <img alt="ResNeXt Block" src="https://d2l.ai/_images/rnn-bptt.svg" style="background-color: white; display: inline-block;"> -->

In [1]:
import collections
import math
import torch
from torch import nn
from torch.nn import functional as F
from torchinfo import summary
import matplotlib.pyplot as plt
from matplotlib_inline import backend_inline
import numpy as np
import time
import os
backend_inline.set_matplotlib_formats('svg')

In [10]:
import subprocess

In [8]:
# GPU计算热身
device = 'cuda'
a = torch.randn(size=(1000, 1000), device=device)
b = torch.mm(a, a)

with Benchmark('numpy'):
    for _ in range(10):
        a = np.random.normal(size=(1000, 1000))
        b = np.dot(a, a)

with Benchmark('numpy2'):
    a = np.random.normal(size=(1000, 1000))
    for _ in range(10):
        b = np.dot(a, a)
        
with Benchmark('torch'):
    for _ in range(10):
        a = torch.randn(size=(1000, 1000), device=device)
        b = torch.mm(a, a)

with Benchmark('torch2'):
    a = torch.randn(size=(1000, 1000), device=device)
    for _ in range(10):
        b = torch.mm(a, a)

numpy: 0.246553 sec
numpy2: 0.109070 sec
torch: 0.001004 sec
torch2: 0.000000 sec


In [9]:
with Benchmark():
    for _ in range(10):
        a = torch.randn(size=(1000, 1000), device=device)
        b = torch.mm(a, a)
    torch.cuda.synchronize(device)

Done: 0.012063 sec


In [None]:
'''
compute x[i], and return x[i-1] in the same time
'''
def copy_to_cpu(x, non_blocking=False):
    return [y.to('cpu', non_blocking=non_blocking) for y in x]

In [None]:
'''
Data Parallel
  allreduce : add gradient to device 0, and broadcast the result to every devices

  split_batch : 将X和y拆分到多个设备上
'''
def get_params(params, device):
    new_params = [p.to(device) for p in params]
    for p in new_params:
        p.requires_grad_()
    return new_params
    
def allreduce(data):
    for i in range(1, len(data)):
        data[0][:] += data[i].to(data[0].device)
    for i in range(1, len(data)):
        data[i][:] = data[0].to(data[i].device)
        
def split_batch(X, y, devices):
    assert X.shape[0] == y.shape[0]
    return (nn.parallel.scatter(X, devices),
            nn.parallel.scatter(y, devices))

def train_batch(X, y, device_params, devices, lr):
    X_shards, y_shards = split_batch(X, y, devices)
    # Loss is calculated separately on each GPU
    ls = [loss(lenet(X_shard, device_W), y_shard).sum()
          for X_shard, y_shard, device_W in zip(
              X_shards, y_shards, device_params)]
    for l in ls:  # Backpropagation is performed separately on each GPU
        l.backward()
    # Sum all gradients from each GPU and broadcast them to all GPUs
    with torch.no_grad():
        for i in range(len(device_params[0])):
            allreduce([device_params[c][i].grad for c in range(len(devices))])
    # The model parameters are updated separately on each GPU
    for param in device_params:
        d2l.sgd(param, lr, X.shape[0]) # Here, we use a full-size batch

def train(num_gpus, batch_size, lr):
    train_iter, test_iter = d2l.load_data_fashion_mnist(batch_size)
    devices = [d2l.try_gpu(i) for i in range(num_gpus)]
    # Copy model parameters to `num_gpus` GPUs
    device_params = [get_params(params, d) for d in devices]
    num_epochs = 10
    animator = d2l.Animator('epoch', 'test acc', xlim=[1, num_epochs])
    timer = d2l.Timer()
    for epoch in range(num_epochs):
        timer.start()
        for X, y in train_iter:
            # Perform multi-GPU training for a single minibatch
            train_batch(X, y, device_params, devices, lr)
            torch.cuda.synchronize()
        timer.stop()
        # Evaluate the model on GPU 0
        animator.add(epoch + 1, (d2l.evaluate_accuracy_gpu(
            lambda x: lenet(x, device_params[0]), test_iter, devices[0]),))
    print(f'test acc: {animator.Y[0][-1]:.2f}, {timer.avg():.1f} sec/epoch '
          f'on {str(devices)}')

In [7]:
class Benchmark:
    """For measuring running time."""
    def __init__(self, description='Done'):
        """Defined in :numref:`sec_hybridize`"""
        self.description = description

    def __enter__(self):
        self.timer = Timer()
        return self

    def __exit__(self, *args):
        print(f'{self.description}: {self.timer.stop():.6f} sec')

class Timer:
    """Record multiple running times."""
    def __init__(self):
        """Defined in :numref:`sec_minibatch_sgd`"""
        self.times = []
        self.start()

    def start(self):
        """Start the timer."""
        self.tik = time.time()

    def stop(self):
        """Stop the timer and record the time in a list."""
        self.times.append(time.time() - self.tik)
        return self.times[-1]

    def avg(self):
        """Return the average time."""
        return sum(self.times) / len(self.times)

    def sum(self):
        """Return the sum of time."""
        return sum(self.times)

    def cumsum(self):
        """Return the accumulated time."""
        return np.array(self.times).cumsum().tolist()