## 分布式训练

随着模型参数和训练数据的急速增长，单机已经无法满足模型的训练需求，因此需要设计分布式训练系统来解决海量的计算和内存资源要求问题。本节将会依次介绍机器学习系统的基础概念、分布式训练集群架构、分布式训练并行策略，并以DeepSpeed为例介绍如何在大集群上训练大语言模型。

### 分布式训练概述

`分布式训练`是指将机器学习或深度学习模型训练任务分解成多个子任务，并在多个计算设备上并行地进行训练。计算设备可以是中央处理器（Central Processing Unit，CPU）、图形处理器（Graphics Processing Unit，GPU）、张量处理器（Tensor Processing Unit，TPU）也可以是神经网络处理器（Neural network Processing Unit，NPU）。  

<img src="./images/multi_device.png" style="zoom:70%;" /> 

由于同一个服务器内部的多个计算设备(单机多卡)之间内存也可能并不共享，因此无论这些计算设备是否处于同一个服务器还是多个服务器中，其系统架构都属于分布式系统范畴。  

单设备计算速度主要由单块计算加速芯片的`运算速度`和`数据I/O能力`来决定，对单设备训练效率进行优化，主要的技术手段有混合精度训练、算子融合、梯度累加等；分布式训练系统中计算设备数量越多，其理论峰值计算速度就会越高，但是受到通讯效率的影响，计算设备数量增大则会造成加速比急速降低；多设备加速比则是由计算和`通讯效率`决定，需要结合算法和网络拓扑结构进行优化，分布式训练并行策略主要目标就是提升分布式训练系统中的多设备加速比。  



### 分布式训练并行策略

单节点模型训练系统主要由数据和模型两部分组成，训练过程会有多个数据小批次(mini-batch)完成。训练系统利用批次数据通过多个算子(神经网络层)根据损失函数和优化算法来生成梯度，从而调整模型参数。整个模型训练的过程是可以有一个计算图来表示:  

<img src="./images/single_train_system.png" style="zoom:70%;" /> 

计算图的执行过程可以分为前向计算和反向计算两个阶段。前向计算就是将批次数据通过计算图中的每个算子，然后反向计算是根据损失函数和优化算法，每个算子计算梯度，并更新本地的参数。这样根据批次一轮一轮的进行前向传播和反向传播计算数据、更新参数。

根据上面所述，很容易想到如果进行并行加速，可以从模型和数据两个维度来考虑。1.可以对数据进行切分，同时将模型复制到多个设备上，并行执行不同的数据分片，这种方式成为数据并行；2.可以对模型进行切分，将模型中的算子分发到多个设备分别完成，称为模型并行；3.可以结合上面说的两种并行方式，称为混合并行。

#### 数据并行

在数据并行系统中，每个计算设备都有整个模型的副本，在训练时每个计算设备都会分配一个批次数据样本的子集，进行前向传播计算。在前向计算完成后，每个计算设备需要计算所有算子的梯度，并将本地梯度进行广播。需要聚合设备的本地梯度后然后取均值对模型进行更新，完成该批次的训练。  

<img src="./images/data_parallel.png" style="zoom:70%;" /> 

数据并行训练系统可以通过增加计算设备，有效提升整体训练吞吐量，即`每秒全局批次数`。与单计算设备的区别在于反向传播阶段的梯度需要在所有设备完成同步，并计算平均梯度来更新参数。TensorFlow和Pytorch都有对应的实现方法，TensorFlow DistributedStrategy、PyTorch Distributed、Horovod DistributedOptimizer。

数据并行训练加速比最高，同时显存占用也比较高，主要是要求每个设备上都备份一分模型导致的。

In [None]:
# 使用 PyTorch DistributedDataParallel 实现单个服务器多加速卡训练代码

# from torch.utils.data.distributed import DistributedSampler

# DistributedSampler类

import math
from typing import TypeVar, Optional, Iterator

import torch
from . import Sampler, Dataset
import torch.distributed as dist

__all__ = ["DistributedSampler", ]

T_co = TypeVar('T_co', covariant=True)


class DistributedSampler(Sampler[T_co]):
    r"""Sampler that restricts data loading to a subset of the dataset.

    It is especially useful in conjunction with
    :class:`torch.nn.parallel.DistributedDataParallel`. In such a case, each
    process can pass a :class:`~torch.utils.data.DistributedSampler` instance as a
    :class:`~torch.utils.data.DataLoader` sampler, and load a subset of the
    original dataset that is exclusive to it.

    .. note::
        Dataset is assumed to be of constant size and that any instance of it always
        returns the same elements in the same order.

    Args:
        dataset: Dataset used for sampling.
        num_replicas (int, optional): Number of processes participating in
            distributed training. By default, :attr:`world_size` is retrieved from the
            current distributed group.
        rank (int, optional): Rank of the current process within :attr:`num_replicas`.
            By default, :attr:`rank` is retrieved from the current distributed
            group.
        shuffle (bool, optional): If ``True`` (default), sampler will shuffle the
            indices.
        seed (int, optional): random seed used to shuffle the sampler if
            :attr:`shuffle=True`. This number should be identical across all
            processes in the distributed group. Default: ``0``.
        drop_last (bool, optional): if ``True``, then the sampler will drop the
            tail of the data to make it evenly divisible across the number of
            replicas. If ``False``, the sampler will add extra indices to make
            the data evenly divisible across the replicas. Default: ``False``.

    .. warning::
        In distributed mode, calling the :meth:`set_epoch` method at
        the beginning of each epoch **before** creating the :class:`DataLoader` iterator
        is necessary to make shuffling work properly across multiple epochs. Otherwise,
        the same ordering will be always used.

    Example::

        >>> # xdoctest: +SKIP
        >>> sampler = DistributedSampler(dataset) if is_distributed else None
        >>> loader = DataLoader(dataset, shuffle=(sampler is None),
        ...                     sampler=sampler)
        >>> for epoch in range(start_epoch, n_epochs):
        ...     if is_distributed:
        ...         sampler.set_epoch(epoch)
        ...     train(loader)
    """

    def __init__(self, dataset: Dataset, num_replicas: Optional[int] = None,
                 rank: Optional[int] = None, shuffle: bool = True,
                 seed: int = 0, drop_last: bool = False) -> None:
        if num_replicas is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            num_replicas = dist.get_world_size()
        if rank is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            rank = dist.get_rank()
        if rank >= num_replicas or rank < 0:
            raise ValueError(
                "Invalid rank {}, rank should be in the interval"
                " [0, {}]".format(rank, num_replicas - 1))
        self.dataset = dataset
        self.num_replicas = num_replicas
        self.rank = rank
        self.epoch = 0
        self.drop_last = drop_last
        # If the dataset length is evenly divisible by # of replicas, then there
        # is no need to drop any data, since the dataset will be split equally.
        if self.drop_last and len(self.dataset) % self.num_replicas != 0:  # type: ignore[arg-type]
            # Split to nearest available length that is evenly divisible.
            # This is to ensure each rank receives the same amount of data when
            # using this Sampler.
            self.num_samples = math.ceil(
                (len(self.dataset) - self.num_replicas) / self.num_replicas  # type: ignore[arg-type]
            )
        else:
            self.num_samples = math.ceil(len(self.dataset) / self.num_replicas)  # type: ignore[arg-type]
        self.total_size = self.num_samples * self.num_replicas
        self.shuffle = shuffle
        self.seed = seed

    def __iter__(self) -> Iterator[T_co]:
        if self.shuffle:
            # deterministically shuffle based on epoch and seed
            g = torch.Generator()
            g.manual_seed(self.seed + self.epoch)
            indices = torch.randperm(len(self.dataset), generator=g).tolist()  # type: ignore[arg-type]
        else:
            indices = list(range(len(self.dataset)))  # type: ignore[arg-type]

        if not self.drop_last:
            # add extra samples to make it evenly divisible
            padding_size = self.total_size - len(indices)
            if padding_size <= len(indices):
                indices += indices[:padding_size]
            else:
                indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size]
        else:
            # remove tail of data to make it evenly divisible.
            indices = indices[:self.total_size]
        assert len(indices) == self.total_size

        # subsample
        indices = indices[self.rank:self.total_size:self.num_replicas]
        assert len(indices) == self.num_samples

        return iter(indices)

    def __len__(self) -> int:
        return self.num_samples

    def set_epoch(self, epoch: int) -> None:
        r"""
        Sets the epoch for this sampler. When :attr:`shuffle=True`, this ensures all replicas
        use a different random ordering for each epoch. Otherwise, the next iteration of this
        sampler will yield the same ordering.

        Args:
            epoch (int): Epoch number.
        """
        self.epoch = epoch

In [None]:
# 具体使用的一个demo见下面的git代码 - train_multi_gpu.py文件
# https://github.com/TianyuJIAA/llm-course/tree/main/%E5%B0%9A%E7%A1%85%E8%B0%B7/chapter01/BertClassifier

#### 模型并行

模型并行主要用于解决单点内存不足的问题。比如GPT-3的模型参数是1750亿，模型参数都通过32位浮点数表示，则模型需要占用700G内存，计算公式为:  
```text
1 GB = 10^3 MB = 10^6 KB = 10^9 Bytes
32bit = 4Bytes
1750*10^8*4 = 175*4%10^9 = 700GB
```
H100加速也仅支持80GB显存，所以无法将模型完整放入其中。模型并行从计算图角度可以从两种方式切分:  
1.按模型的层切分到不同设备，即层并行或算子间并行，也称之为流水线并行  
2.将计算图层内的参数切分到不同设备，即层内并行或算子间并行，也称为张量并行。  

<img src="./images/model_parallel.png" style="zoom:70%;" /> 

##### 流水线并行

