在深入探讨之前，让我们澄清一下，尽管增加了复杂性，但为什么要考虑在DataParallel上使用DistributedDataParallel：

如果模型太大而无法容纳在单个GPU上，则必须使用 model parallel 将其拆分到多个GPU中。 DistributedDataParallel与模型并行工作； DataParallel目前不提供。
DataParallel是单进程，多线程，并且只能在单台计算机上运行，​​而DistributedDataParallel是多进程，并且可以在单机和分布式训练中使用。因此，即使在单机训练中，您的数据足够小以适合单机，DistributedDataParallel仍要比DataParallel更快。 DistributedDataParallel还可以预先复制模型，而不是在每次迭代时复制模型，并且可以避免PIL全局解释器锁定。
如果数据和模型同时很大而无法用一个GPU训练，则可以将model parallel（与DistributedDataParallel结合使用。在这种情况下，每个DistributedDataParallel进程都可以model parallel，并且所有进程共同用数据并行

In [None]:
# 一段完整的伪代码以及程序启动命令
import os
import argparse
import torch
from torch.nn import SyncBatchNorm
from torch.nn.parallel import DistributedDataParallel
import torch.distributed as dist
from torch.utils.data.distributed import DistributedSampler

parser = argparse.ArgumentParser(description='training')
parser.add_argument('--local_rank', type=str, help='local rank for dist')
args = parser.parse_args()

print(os.environ['MASTER_ADDR'])
print(os.environ['MASTER_PORT'])
world_size = torch.cuda.device_count()
local_rank = args.local_rank

dist.init_process_group(backend='nccl')# 防止timeout

torch.cuda.set_device(local_rank) # 为每个gpu分配线程

train_dataset = Dataset(...)
train_sampler = DistributedSampler(train_dataset)
train_loader = Dataloader(dataset=train_dataet, sampler=train_sampler, shuffle=False)

val_set = Dataset()
val_loader = Dataloader(dataset=val_set)

model = MyModel()
model = model.cuda() # 先拷到当前进程的GPU中
model = SyncBatchNorn.convert_sync_batchnorm(model)
model = DistributedDataParallel(model, device_ids=[local_rank], output_devices=local_rank, find_unused_parameters=True)

cls_criterion = nn.CrossEntropyLoss()
optim = optimizer()
scheduler = scheduler()

for epoch in range(start_epoch, end_epoch):
    trainer_sampler.set_epoch(epoch)
    train()
    eval()
    # 存储在0进程中的GPU里
    if local_rank==0:
        log()

if local_rank==0:
    torch.save(model.module().state_dict())

In [None]:
# 运行代码
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=8 main.py

因为torch.dist对程序的入口有更改，所以先总结运行代码。torch.dist跟DataParallel不同，需要多进程运行程序，以此达到多机训练的效果。在官方的实现中，使用torch.distributed.launch来启动多进程。除此之外，还使用torch.multiprocessing来手动执行多进程程序。在最新的版本中，官方打算将python -m torch.distributed.lanuch用torchun替代，这里暂时不用。   
其中nproc_per_node表示开启的进程数，这里与使用的卡数保持一致。torch.distributed.launch会自动分配一些参数到主程序中，也可以手动修改。这次关注到的有：RANK表示进程的优先级，也可以认为是进程的序列号；MASTER_ADDR和MASTER_PORT分别表示通讯的地址和端口，torch.distributed.launch会将其设置为环境变量；WORLD_SIZE表示gpu*节点数，本例里就是gpu数量。

需要承接torch.distributed.launch给main.py传递的local_rank参数。local_rank参数表示进程的优先级，也可以认为是进程的序列号

local_rank表示进程的优先级，也可以认为是进程的序列号；MASTER_ADDR和MASTER_PORT分别表示通讯的地址和端口，torch.distributed.launch会将其设置为环境变量；world_size表示gpu*节点数，本例里就是gpu数量。这里代码里print出来展示。
dist.init_process_group(backend='nccl')初始化torch.dist的环境。这里backend选择nccl来进行通讯，可以用dist.is_nccl_avaliable()来查看是否可用nccl。除此之外也可以在这里设置一些其他的环境参数。
torch.cuda.set_device(local_rank)设置环境CUDA序号

对训练数据集做修改。将dataloader的sampler修改为DistributedSampler，这样保证其每个进程采样的数据是不同的
训练集的dataloader的shuffle只能设置为False，DistributedSampler会进行shuffle，如果dataloader再shuffle的话会打乱次序，导致多进程分配的数据不对
batch_size设置的是每个进程的，因此不需要像dataparalle一样乘以卡数
对验证集可以不做修改，如果每个进程不同的话需要再整合所有进程的结果

跟DataParallel类似的是，加载的模型需要用DDP（DistributedDataParallel）重载一下。这里如果运行时报错有unused_parameters，那么就设置find_unused_parameters=True
DDP从原理上应该是多机通讯更新梯度从而保证模型的参数都是一样的，而DataParallel则是在一张卡上集中更新模型权重，再复制到其他卡上
对于损失函数、优化器和sheduler都不需要DDP，如果有需要更新的参数的话还是需要DDP重载

In [None]:
# 基本使用（二）
import os
import tempfile
import torch
import torch.distributed as dist #分布
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp #多线程

from torch.nn.parallel import DistributedDataParallel as DDP


def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    # Explicitly setting seed to make sure that models created in two processes
    # start from same random weights and biases.
    torch.manual_seed(42)


def cleanup():
    dist.destroy_process_group()

现在，创建一个toy model，将其与DDP封装在一起，并提供一些虚拟输入数据。请注意，如果训练从随机参数开始，则可能要确保所有DDP进程都使用相同的初始值。否则，全局梯度同步将没有意义。

In [None]:
class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    setup(rank, world_size)

    # setup devices for this process, rank 1 uses GPUs [0, 1, 2, 3] and
    # rank 2 uses GPUs [4, 5, 6, 7].
    n = torch.cuda.device_count() // world_size
    device_ids = list(range(rank * n, (rank + 1) * n))

    # create model and move it to device_ids[0]
    model = ToyModel().to(device_ids[0])
    # output_device defaults to device_ids[0]
    ddp_model = DDP(model, device_ids=device_ids)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_ids[0])
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

DDP封装了 lower level distributed communication details，并提供了干净的API，就好像它是本地模型一样。对于基本用例，DDP仅需要几个LoCs来设置 process group。在将DDP应用到更高级的用例时，需要注意一些警告。

处理速度不同步时
在DDP中，Model, forward method 和 differentiation of the outputs是分布式的同步点。期望不同的过程以相同的顺序到达同步点，并在大致相同的时间进入每个同步点。否则，快速流程可能会提早到达，并在等待时超时。因此，用户负责进程之间的工作负载分配。有时，由于例如网络延迟，资源争用，不可预测的工作量峰值，不可避免地会出现不同步的处理速度。为了避免在这些情况下超时，请确保在调用init_process_group时传递足够大valuetimeout

保存和加载 Checkpoints
在训练期间，通常使用torch.save 和torch.load 来保存和加载 Checkpoints，有关更多详细信息，请参见 SAVING AND LOADING MODELS，使用DDP时，一种优化方法是仅在一个进程中保存模型，然后将其加载到所有进程中，从而减少写开销，这是正确的，因为所有过程都从相同的参数开始，并且梯度在反向传播中同步，因此优化程序应将参数设置为相同的值。如果使用此优化，请确保在保存完成之前不要启动所有进程。此外，在加载模块时，您需要提供适当的 map_location 参数，以防止进程进入其他人的设备。如果缺少map_location，torch.load 将首先将模块加载到CPU，然后将每个参数复制到保存位置，这将导致同一台机器上的所有进程使用相同的设备集

In [None]:
def demo_checkpoint(rank, world_size):
    setup(rank, world_size)

    # setup devices for this process, rank 1 uses GPUs [0, 1, 2, 3] and
    # rank 2 uses GPUs [4, 5, 6, 7].
    n = torch.cuda.device_count() // world_size
    device_ids = list(range(rank * n, (rank + 1) * n))

    model = ToyModel().to(device_ids[0])
    # output_device defaults to device_ids[0]
    ddp_model = DDP(model, device_ids=device_ids)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    rank0_devices = [x - rank * len(device_ids) for x in device_ids]
    device_pairs = zip(rank0_devices, device_ids)
    map_location = {'cuda:%d' % x: 'cuda:%d' % y for x, y in device_pairs}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location))

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_ids[0])
    loss_fn = nn.MSELoss()
    loss_fn(outputs, labels).backward()
    optimizer.step()

    # Use a barrier() to make sure that all processes have finished reading the
    # checkpoint
    dist.barrier()

    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()

**DDP 和 Model Parallelism 一起使用(防止出错，非必要不用)**    
DDP还可以与 Model Parallelism一起使用，但是不支持进程内的复制。您需要为每个module 副本创建一个进程，与每个进程的多个副本相比，通常可以提高性能。 这种训练方式在具有巨大的数据量较大的模型时特别有用。使用此功能时，需要小心地实现 multi-GPU model，以避免使用硬编码的设备，因为会将不同的模型副本放置到不同的设备上

In [None]:
lass ToyMpModel(nn.Module):
    def __init__(self, dev0, dev1):
        super(ToyMpModel, self).__init__()
        self.dev0 = dev0
        self.dev1 = dev1
        self.net1 = torch.nn.Linear(10, 10).to(dev0)
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(dev1)

    def forward(self, x):
        x = x.to(self.dev0)
        x = self.relu(self.net1(x))
        x = x.to(self.dev1)
        return self.net2(x)

将multi-GPU model 传递给DDP时，不得设置device_ids和output_device，输入和输出数据将通过应用程序或模型forward() 方法放置在适当的设备中。

In [None]:
def demo_model_parallel(rank, world_size):
    setup(rank, world_size)

    # setup mp_model and devices for this process
    dev0 = rank * 2
    dev1 = rank * 2 + 1
    mp_model = ToyMpModel(dev0, dev1)
    ddp_mp_model = DDP(mp_model)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    # outputs will be on dev1
    outputs = ddp_mp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(dev1)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


if __name__ == "__main__":
    run_demo(demo_basic, 2)
    run_demo(demo_checkpoint, 2)

    if torch.cuda.device_count() >= 8:
        run_demo(demo_model_parallel, 4)

[当代研究生应当掌握的5种Pytorch并行训练方法（单机多卡）](https://mp.weixin.qq.com/s/YwvO_5y67ZxYiR_-BlrNOg)

[**官网**](https://pytorch.org/docs/1.10/generated/torch.nn.parallel.DistributedDataParallel.html?highlight=distributeddataparallel#torch.nn.parallel.DistributedDataParallel)