这节课主要是讲怎么进行多GPU的数据并行训练，用LeNet做示范

这一节没办法在colab上弄，因为colab免费只给一个t4 gpu

In [6]:
%matplotlib inline
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

In [13]:
# 初始化模型参数
scale = 0.01
W1 = torch.randn(size=(20, 1, 3, 3)) * scale
b1 = torch.zeros(20)
W2 = torch.randn(size=(50, 20, 5, 5)) * scale
b2 = torch.zeros(50)
W3 = torch.randn(size=(800, 128)) * scale
b3 = torch.zeros(128)
W4 = torch.randn(size=(128, 10)) * scale
b4 = torch.zeros(10)
params = [W1, b1, W2, b2, W3, b3, W4, b4]

# 定义模型
def lenet(X, params):
    h1_conv = F.conv2d(input=X, weight=params[0], bias=params[1])
    h1_activation = F.relu(h1_conv)
    h1 = F.avg_pool2d(input=h1_activation, kernel_size=(2, 2), stride=(2, 2))
    h2_conv = F.conv2d(input=h1, weight=params[2], bias=params[3])
    h2_activation = F.relu(h2_conv)
    h2 = F.avg_pool2d(input=h2_activation, kernel_size=(2, 2), stride=(2, 2))
    h2 = h2.reshape(h2.shape[0], -1)
    h3_linear = torch.mm(h2, params[4]) + params[5]
    h3 = F.relu(h3_linear)
    y_hat = torch.mm(h3, params[6]) + params[7]
    return y_hat

# 交叉熵损失函数
loss = nn.CrossEntropyLoss(reduction='none')

上面这里我们用了F来定义各种层，还记得我们之前有提到过，F里面是不会自动更新参数的，所以我们这里树洞给他指定了参数，也就是传入了weight参数

在这里我们再阐述一遍两者的差别

从功能来说两者相当，基于nn.Mudle能实现的层，使用nn.funtional也可实现，反之亦然，而且性能方面两者也没有太大差异。不过在具体使用时，两者还是有区别，主要区别如下：
1. nn.Xxx继承于nn.Module，nn.Xxx 需要先实例化并传入参数，然后以函数调用的方式调用实例化的对象并传入输入数据。它能够很好的与nn.Sequential结合使用，而nn.functional.xxx无法与nn.Sequential结合使用。
2. nn.Xxx不需要自己定义和管理weight、bias参数；而nn.functional.xxx需要你自己定义weight、bias，每次调用的时候都需要手动传入weight、bias等参数, 不利于代码复用。
3. dropout操作在训练和测试阶段是有区别的，使用nn.Xxx方式定义dropout，在调用model.eval()之后，自动实现状态的转换，而使用nn.functional.xxx却无此功能。
* 总的来说，两种功能都是相同的，但PyTorch官方推荐：具有学习参数的（例如，conv2d, linear, batch_norm)采用nn.Xxx方式。没有学习参数的（例如，maxpool, loss func, activation func）等根据个人选择使用nn.functional.xxx或者nn.Xxx方式。

In [14]:
# 因为我们的数据在不进行任何操作的情况下，肯定是默认放在主内存（或者是CPU）里面的，所以这里我们就用一个函数，把我们所有的参数放到GPU里面，并且记得设置需要梯度来更新
def get_params(params, device):
    new_params = [p.to(device) for p in params]
    for p in new_params:
        p.requires_grad_()
    return new_params

new_params = get_params(params, d2l.try_gpu(0))
print('b1 权重:', new_params[1])
print('b1 梯度:', new_params[1].grad)

b1 权重: tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       requires_grad=True)
b1 梯度: None


这里的allreduce操作在HPC领域是一个很常见的操作，用人话说就是我把所有设备上的数据合并起来，然后计算到一个结果再交给所有设备（因为我们是对一个batch的数据做了并行还记得吗，等于是一个batch的数据分成了好几份）

In [16]:
def allreduce(data):
    # 这里写的是把所有GPU上的数据全部放到GPU0上面，然后GPU0上面算完了再放回去
    # 注意两个for loop的计算符号不一样，其实就是一个合并的过程
    # 还记得吗，我们做损失函数的时候是把所有样本的损失合并求均值来算的
    # 这里allreduce不就是合并的操作吗
    for i in range(1, len(data)):
        data[0][:] += data[i].to(data[0].device)
    for i in range(1, len(data)):
        data[i][:] = data[0].to(data[i].device)
        
data = [torch.ones((1, 2), device=d2l.try_gpu(i)) * (i + 1) for i in range(2)]
print('allreduce之前：\n', data[0], '\n', data[1])
allreduce(data)
print('allreduce之后：\n', data[0], '\n', data[1])

allreduce之前：
 tensor([[1., 1.]]) 
 tensor([[2., 2.]])
allreduce之后：
 tensor([[3., 3.]]) 
 tensor([[3., 3.]])


In [19]:
# 这里其实很简单，就是把你的batch分开 分到GPU设备上
data = torch.arange(20).reshape(4, 5)
devices = [torch.device('cuda:0'), torch.device('cuda:1')]
split = nn.parallel.scatter(data, devices) # 这个函数只支持cuda设备，所以到这里就运行不了了
print('input :', data)
print('load into', devices)
print('output:', split)

RuntimeError: Found no NVIDIA driver on your system. Please check that you have an NVIDIA GPU and installed a driver from http://www.nvidia.com/Download/index.aspx

In [1]:
#@save
def split_batch(X, y, devices):
    """将X和y拆分到多个设备上"""
    assert X.shape[0] == y.shape[0]
    return (nn.parallel.scatter(X, devices),
            nn.parallel.scatter(y, devices))

In [8]:
def train_batch(X, y, device_params, devices, lr):
    X_shards, y_shards = split_batch(X, y, devices)
    # 在每个GPU上分别计算损失
    ls = [loss(lenet(X_shard, device_W), y_shard).sum()
          for X_shard, y_shard, device_W in zip(
              X_shards, y_shards, device_params)]
    for l in ls:  # 反向传播在每个GPU上分别执行
        l.backward()
    # 将每个GPU的所有梯度相加，并将其广播到所有GPU
    with torch.no_grad():
        for i in range(len(device_params[0])):
            allreduce(
                [device_params[c][i].grad for c in range(len(devices))])
    # 在每个GPU上分别更新模型参数
    for param in device_params:
        d2l.sgd(param, lr, X.shape[0]) # 在这里，我们使用全尺寸的小批量
        # 不过这里其实有点重复计算，其实可以一个GPU算完，然后更新过去就行
        
# import inspect
# print(inspect.getsource(d2l.sgd))

In [2]:
def train(num_gpus, batch_size, lr):
    train_iter, test_iter = d2l.load_data_fashion_mnist(batch_size)
    devices = [d2l.try_gpu(i) for i in range(num_gpus)]
    # 将模型参数复制到num_gpus个GPU
    device_params = [get_params(params, d) for d in devices]
    num_epochs = 10
    animator = d2l.Animator('epoch', 'test acc', xlim=[1, num_epochs])
    timer = d2l.Timer()
    for epoch in range(num_epochs):
        timer.start()
        for X, y in train_iter:
            # 为单个小批量执行多GPU训练
            train_batch(X, y, device_params, devices, lr)
            torch.cuda.synchronize() # 这个函数是一个同步的函数，就是保证每个GPU都跑完了
        timer.stop()
        # 在GPU0上评估模型
        animator.add(epoch + 1, (d2l.evaluate_accuracy_gpu(
            lambda x: lenet(x, device_params[0]), test_iter, devices[0]),))
    print(f'测试精度：{animator.Y[0][-1]:.2f}，{timer.avg():.1f}秒/轮，'
          f'在{str(devices)}')

上面我们实现了两个函数，分别是对单个batch和整个数据集上进行GPU数据并行训练，下面是一些解释：
1. 首先，我们在每个GPU上面进行了损失函数的计算与梯度反向传播，注意此处我们只算了总的梯度，并没有算平均的损失（实际上我们是要算batch平均损失然后去算梯度的）
2. 接着我们把所有的GPU上的梯度加起来，因为是导数运算规则相加是直接加，所以我们此时得到了总损失计算得到的梯度
3. 我们再用自己实现的sgd优化器来根据梯度进行优化，这里我们的batch size一定要使用全尺寸的小批量，为啥呢？因为我们前面把多块GPU的损失加起来了算了梯度，等于这里我们得到的是总的batch的损失，自然要加上全尺寸大小去求平均值（可以看d2l.sgd的源代码）
4. 对于一个epoch来说，其他的其实都差不多，我们首先用并行的方法来进行训练，然后在每个epoch的末尾，我们在其中一个GPU上用训练得到的参数进行一次评估就行（因为评估等于只做前向运算，所以不并行无所谓了）
5. 其实会发现，我们上面算loss和算梯度都有一点点串行，并没有真正的并行。框架一般都会自动的帮你实现并行，所以我们不需要特地去写啥复杂的代码来并行（MXNET、TF都确定性，torch好像也是可以的）

In [None]:
train(num_gpus=1, batch_size=256, lr=0.2)

In [None]:
train(num_gpus=2, batch_size=256, lr=0.2)
# 其实实际情况下这里并没有变快，可能因为是LeNet本身小，然后batch size也不大，有很多种原因
# 一般来说，GPU数量增大，每个GPU的batch size应该保持不变的
# batch size变大了，其实lr也可以适当的变大一点，因为每次学习的样本变多了，等于一个epoch里面学习次数少了，所以lr适当大一点

接下来我们看看简洁实现是怎么做的

这里我们用个ResNet18，相对来说是个更正常大小的模型

In [1]:
import torch
from torch import nn
from d2l import torch as d2l

In [2]:
#@save
def resnet18(num_classes, in_channels=1):
    """稍加修改的ResNet-18模型"""
    def resnet_block(in_channels, out_channels, num_residuals,
                     first_block=False):
        blk = []
        for i in range(num_residuals):
            if i == 0 and not first_block:
                blk.append(d2l.Residual(in_channels, out_channels, use_1x1conv=True, strides=2))
            else:
                blk.append(d2l.Residual(out_channels, out_channels))
        return nn.Sequential(*blk)

    # 该模型使用了更小的卷积核、步长和填充，而且删除了最大汇聚层
    net = nn.Sequential(
        nn.Conv2d(in_channels, 64, kernel_size=3, stride=1, padding=1),
        nn.BatchNorm2d(64),
        nn.ReLU())
    net.add_module("resnet_block1", resnet_block(
        64, 64, 2, first_block=True))
    net.add_module("resnet_block2", resnet_block(64, 128, 2))
    net.add_module("resnet_block3", resnet_block(128, 256, 2))
    net.add_module("resnet_block4", resnet_block(256, 512, 2))
    net.add_module("global_avg_pool", nn.AdaptiveAvgPool2d((1,1)))
    net.add_module("fc", nn.Sequential(nn.Flatten(),
                                       nn.Linear(512, num_classes)))
    return net

In [4]:
net = resnet18(10)
# 获取GPU列表Residual类的实现有错误，所以改了源代码
devices = d2l.try_all_gpus()
# d2l包里面
# 我们将在训练代码实现中初始化网络

In [5]:
def train(net, num_gpus, batch_size, lr):
    train_iter, test_iter = d2l.load_data_fashion_mnist(batch_size)
    devices = [d2l.try_gpu(i) for i in range(num_gpus)]
    def init_weights(m):
        if type(m) in [nn.Linear, nn.Conv2d]:
            nn.init.normal_(m.weight, std=0.01)
    net.apply(init_weights)
    # 在多个GPU上设置模型
    net = nn.DataParallel(net, device_ids=devices)
    trainer = torch.optim.SGD(net.parameters(), lr)
    loss = nn.CrossEntropyLoss()
    timer, num_epochs = d2l.Timer(), 10
    animator = d2l.Animator('epoch', 'test acc', xlim=[1, num_epochs])
    for epoch in range(num_epochs):
        net.train()
        timer.start()
        for X, y in train_iter:
            trainer.zero_grad()
            X, y = X.to(devices[0]), y.to(devices[0])
            l = loss(net(X), y)
            l.backward()
            trainer.step()
        timer.stop()
        animator.add(epoch + 1, (d2l.evaluate_accuracy_gpu(net, test_iter),))
    print(f'测试精度：{animator.Y[0][-1]:.2f}，{timer.avg():.1f}秒/轮，'
          f'在{str(devices)}')

这里的训练函数其实和上面的没啥本质区别，只是写的更加简洁了：
1. 这里我们对net重新进行了一次实例化，调用nn.DataParallel类，完成并行
2. 然后直接用torch的SGD优化器
3. 对每个batch，首先把数据放在GPU上，然后直接像我们之前单GPU网络实现那样，算loss，反向，优化就OK了
4. 总的来说torch的并行实现让我们保留了和单GPU一样的语法风格，这样很方便
5. 这里我们的net是nn.module的子类，所以可以发现，我们不需要自己定义参数，初始化也可以很方便的用apply函数来进行
6. 而上面我们用functional实现的LeNet，就需要自己定义参数，并且传进去，初始化也要自己来弄

In [None]:
train(net, num_gpus=1, batch_size=256, lr=0.1)

In [None]:
train(net, num_gpus=2, batch_size=512, lr=0.2)