## 1.背景介绍

本实验主要研究 PyTorch 分布式训练框架下的多机多卡数据/流水线混合并行训练方法。

## 2.实验目的
实现基于 torch.distributed.pipelining 和 torch.nn.parallel.DistributedDataParallel 的并行训练，并理解其工作原理。


## 3.硬件要求

两台服务器，各两张 GPU（4090、V100、A100等）。


## 4.技术原理

### 混合并行

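在朴素的数据并行中，每个 GPU 都要保存一份完整的模型权重。若单张显卡显存不足以容纳整个模型，可以把每个数据并行副本按层切分到一组 GPU（即一个流水线并行组）上，由这组 GPU 协同完成该副本的前向与反向计算，从而降低单张显卡的显存需求。此时总进程数 = 数据并行度 × 流水线并行度；例如两台服务器各两张 GPU 时，可配置为数据并行度 2 × 流水线并行度 2。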

## 5.实验流程

### 环境配置



In [20]:
!pip install torch



### 5.1 Transformer 模型定义

In [11]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
import torch.nn.functional as F
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed.device_mesh import init_device_mesh
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import DistributedSampler
from torch.distributed import ReduceOp
import math
import yaml
import argparse

from torch.distributed.pipelining import pipeline, SplitPoint, build_stage
from torch.distributed.pipelining import ScheduleGPipe


class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence of embeddings.

    Even feature indices receive ``sin(pos * w_j)`` and odd indices receive
    ``cos(pos * w_j)`` with ``w_j = 10000 ** (-2j / d_model)``. The table is
    precomputed for ``max_len`` positions and registered as a buffer (not a
    trainable parameter).

    Args:
        d_model: feature width of the embeddings. Odd widths are supported.
        max_len: number of positions precomputed; sequences longer than this
            cannot be encoded.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # ceil(d_model / 2) frequencies: one per even feature index.
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        # There are only floor(d_model / 2) odd slots; drop the extra frequency
        # when d_model is odd, otherwise this assignment has a shape mismatch.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        pe = pe.unsqueeze(0)  # (1, max_len, d_model): broadcasts over batch
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Assumes ``x`` is (batch, seq_len, d_model) with seq_len <= max_len, as
        implied by slicing dim 1 and broadcasting against (1, max_len, d_model).
        """
        return x + self.pe[:, : x.size(1)]


class MultiHeadSelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Queries, keys and values are all projected from the same input, split into
    ``num_heads`` heads of width ``d_k = d_model // num_heads``, attended
    independently, then merged back and passed through an output projection.

    Args:
        d_model: input/output feature width; must be divisible by ``num_heads``.
        num_heads: number of attention heads.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Creation order (q, k, v, out) fixes parameter-init order under a seed.
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.out_linear = nn.Linear(d_model, d_model)

    def _split_heads(self, projected, batch):
        # (batch, seq, d_model) -> (batch, num_heads, seq, d_k)
        return projected.view(batch, -1, self.num_heads, self.d_k).transpose(1, 2)

    def forward(self, x, mask=None):
        """Attend over ``x`` and return a tensor of the same shape.

        ``x`` is viewed as (batch, seq, num_heads, d_k), so it must be
        (batch, seq, d_model). ``mask``, if given, must broadcast against the
        (batch, num_heads, seq, seq) score tensor; positions where it equals 0
        are filled with -1e9 before the softmax.
        """
        batch = x.shape[0]
        query = self._split_heads(self.q_linear(x), batch)
        key = self._split_heads(self.k_linear(x), batch)
        value = self._split_heads(self.v_linear(x), batch)

        logits = query @ key.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            logits = logits.masked_fill(mask == 0, float("-1e9"))

        mixed = F.softmax(logits, dim=-1) @ value
        merged = (
            mixed.transpose(1, 2)
            .contiguous()
            .view(batch, -1, self.num_heads * self.d_k)
        )
        return self.out_linear(merged)


class FeedForward(nn.Module):
    """Position-wise two-layer MLP: ``d_model -> d_ff -> d_model`` with ReLU.

    Both linear layers act on the last dimension only, so every position is
    transformed independently.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)  # expand to the hidden width
        self.fc2 = nn.Linear(d_ff, d_model)  # project back to the model width

    def forward(self, x):
        hidden = F.relu(self.fc1(x))
        return self.fc2(hidden)


class TransformerBlock(nn.Module):
    """Post-norm Transformer block: self-attention, then feed-forward.

    Each sub-layer's output goes through dropout, is added back onto the
    sub-layer's input (residual), and the sum is layer-normalised.

    Args:
        d_model: feature width.
        num_heads: attention heads (``d_model`` must be divisible by it).
        d_ff: hidden width of the feed-forward sub-layer.
        dropout: dropout probability applied to both sub-layer outputs.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        # Keep registration order: it fixes parameter init order and
        # state_dict key order.
        self.attention = MultiHeadSelfAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.ffn = FeedForward(d_model, d_ff)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def _residual(self, x, sublayer_out, norm):
        # Add the (dropped-out) sub-layer output onto its input, then normalise.
        return norm(x + self.dropout(sublayer_out))

    def forward(self, x, mask=None):
        x = self._residual(x, self.attention(x, mask), self.norm1)
        return self._residual(x, self.ffn(x), self.norm2)


class Transformer(nn.Module):
    """Transformer mapping token ids to per-position vocabulary logits.

    Pipeline: token embedding -> sinusoidal positional encoding ->
    ``num_layers`` ``TransformerBlock`` s -> linear projection to
    ``vocab_size`` logits. The submodule names (``embedding``,
    ``layers.{i}``, ``out_linear``) are what the pipeline ``split_spec`` in
    ``train`` refers to, so keep them stable.

    Args:
        vocab_size: number of token ids (embedding rows / output logits).
        d_model: model width.
        num_heads: attention heads per block.
        d_ff: feed-forward hidden width per block.
        num_layers: number of stacked blocks.
        max_len: longest sequence the positional encoding supports.
    """

    def __init__(
        self,
        vocab_size,
        d_model=512,
        num_heads=8,
        d_ff=2048,
        num_layers=6,
        max_len=5000,
    ):
        super(Transformer, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        self.layers = nn.ModuleList(
            [TransformerBlock(d_model, num_heads, d_ff) for _ in range(num_layers)]
        )
        self.out_linear = nn.Linear(d_model, vocab_size)

    def forward(self, x, mask=None):
        """Return logits of shape ``x.shape + (vocab_size,)``.

        Args:
            x: integer token ids, (batch, seq_len) as produced by the
                DataLoader in ``train``.
            mask: optional attention mask passed to every block; positions
                where ``mask == 0`` are excluded from attention.
        """
        x = self.embedding(x)
        x = self.pos_encoding(x)
        for layer in self.layers:
            # Propagate the caller's mask; previously it was accepted but ignored.
            x = layer(x, mask)
        return self.out_linear(x)



Transformer(
  (embedding): Embedding(1000, 32)
  (pos_encoding): PositionalEncoding()
  (layers): ModuleList(
    (0-1): 2 x TransformerBlock(
      (attention): MultiHeadSelfAttention(
        (q_linear): Linear(in_features=32, out_features=32, bias=True)
        (k_linear): Linear(in_features=32, out_features=32, bias=True)
        (v_linear): Linear(in_features=32, out_features=32, bias=True)
        (out_linear): Linear(in_features=32, out_features=32, bias=True)
      )
      (norm1): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
      (ffn): FeedForward(
        (fc1): Linear(in_features=32, out_features=64, bias=True)
        (fc2): Linear(in_features=64, out_features=32, bias=True)
      )
      (norm2): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
  )
  (out_linear): Linear(in_features=32, out_features=1000, bias=True)
)


### 5.2 数据集定义

In [13]:
class NLPDataset(Dataset):
    """Toy in-memory dataset of constant integer sequences.

    Sample ``i`` (for ``i`` in ``range(size)``) is a 1-D tensor holding
    ``length`` copies of the value ``i``.
    """

    def __init__(self, size, length):
        self.data = [torch.full((length,), value) for value in range(size)]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

# Sanity check: build a 12-sample toy dataset of length-10 sequences and
# print every sample in index order.
dataset = NLPDataset(12, 10)
for index in range(len(dataset)):
    data = dataset[index]
    print(data)

tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
tensor([2, 2, 2, 2, 2, 2, 2, 2, 2, 2])
tensor([3, 3, 3, 3, 3, 3, 3, 3, 3, 3])
tensor([4, 4, 4, 4, 4, 4, 4, 4, 4, 4])
tensor([5, 5, 5, 5, 5, 5, 5, 5, 5, 5])
tensor([6, 6, 6, 6, 6, 6, 6, 6, 6, 6])
tensor([7, 7, 7, 7, 7, 7, 7, 7, 7, 7])
tensor([8, 8, 8, 8, 8, 8, 8, 8, 8, 8])
tensor([9, 9, 9, 9, 9, 9, 9, 9, 9, 9])


### 5.3 训练核心代码实现

In [17]:
def parse_yaml_config(file_path):
    """Load the experiment YAML file and split it into four argument groups.

    Each recognised key is read with ``dict.get`` semantics, so a key absent
    from the file becomes ``None`` in the returned dicts. The DataLoader batch
    size ``training_args["batch_size"]`` is derived as
    ``micro_num * micro_batch_size``.

    Args:
        file_path: path to a UTF-8 YAML file whose top level is a mapping.

    Returns:
        ``(model_args, dataset_args, training_args, parallel_args)`` dicts.

    Raises:
        ValueError: if the top level is not a mapping, or if ``micro_num`` or
            ``micro_batch_size`` is missing (``batch_size`` cannot be derived).
    """
    with open(file_path, "r", encoding="utf-8") as f:
        # safe_load only constructs plain YAML types, never arbitrary objects.
        config = yaml.safe_load(f)
    if config is None:
        # An empty file loads as None; treat it as "no keys set".
        config = {}
    if not isinstance(config, dict):
        raise ValueError(
            f"{file_path}: expected a mapping at the top level, "
            f"got {type(config).__name__}"
        )

    def pick(*keys):
        # Preserve the listed key order; missing keys map to None.
        return {key: config.get(key) for key in keys}

    model_args = pick(
        "vocab_size",
        "max_seq_length",
        "hidden_size",
        "feedforward_size",
        "num_heads",
        "num_layers",
    )
    dataset_args = pick("dataset_size", "data_length")
    training_args = pick(
        "train_epochs",
        "micro_batch_size",
        "micro_num",
        "learning_rate",
        "device_type",
    )

    missing = [
        key for key in ("micro_num", "micro_batch_size") if training_args[key] is None
    ]
    if missing:
        raise ValueError(
            f"{file_path}: missing required key(s) {missing}; "
            "cannot compute batch_size"
        )
    training_args["batch_size"] = (
        training_args["micro_num"] * training_args["micro_batch_size"]
    )

    parallel_args = pick("pipeline_parallel_size", "data_parallel_size")

    return model_args, dataset_args, training_args, parallel_args

def _compute_split_spec(num_layers, pp_size):
    """Return a pipeline ``split_spec`` cutting ``layers`` into ``pp_size`` stages.

    Stage ``i`` ends after ``layers.{num_layers * (i + 1) // pp_size - 1}``, so
    the blocks are spread as evenly as possible (e.g. 8 layers on 4 stages ->
    2/2/2/2). The embedding stays with the first stage and the output
    projection with the last.

    Raises:
        ValueError: if there are fewer layers than stages (some stage would be empty).
    """
    if pp_size < 1 or num_layers < pp_size:
        raise ValueError(
            f"cannot split {num_layers} layers into {pp_size} pipeline stages"
        )
    return {
        f"layers.{num_layers * (i + 1) // pp_size - 1}": SplitPoint.END
        for i in range(pp_size - 1)
    }


def _resolve_device(rank, device_type):
    """Return ``(mesh_device_type, device, ddp_device_ids)`` for this process.

    ``"gpu"`` and ``"cuda"`` are both accepted and mapped to the ``"cuda"``
    device type used by ``init_device_mesh``. Any other value is passed to the
    mesh unchanged and the model is placed on CPU.
    """
    if device_type in ("gpu", "cuda"):
        env_local_rank = os.environ.get("LOCAL_RANK")
        # LOCAL_RANK (exported by torchrun) is the GPU index on *this* node.
        # The global rank is not a valid index once there is more than one node.
        if env_local_rank is not None:
            local_rank = int(env_local_rank)
        else:
            local_rank = rank % torch.cuda.device_count()
        # Bind the process to its GPU before any communicator is created.
        torch.cuda.set_device(local_rank)
        return "cuda", f"cuda:{local_rank}", [local_rank]
    return device_type, "cpu", None


def train(rank, world_size, config_file):
    """Run hybrid data-parallel (DDP) x pipeline-parallel (GPipe) training.

    Processes form a 2-D mesh of shape
    ``(data_parallel_size, pipeline_parallel_size)``:

    * ranks in one ``pp`` group hold consecutive stages of one model replica;
    * ranks in one ``dp`` group hold the same stage and average its gradients
      through DDP.

    Args:
        rank: global rank of this process.
        world_size: total number of processes. Unused here; the mesh shape
            comes from the config.
        config_file: path to the YAML config read by ``parse_yaml_config``.
    """
    model_args, dataset_args, training_args, parallel_args = parse_yaml_config(
        config_file
    )
    pp_size = parallel_args["pipeline_parallel_size"]
    dp_size = parallel_args["data_parallel_size"]
    vocab_size = model_args["vocab_size"]
    criterion = nn.CrossEntropyLoss()

    def compute_loss(output, target):
        # Flatten (batch, seq, vocab) logits and (batch, seq) targets for CE.
        return criterion(output.view(-1, vocab_size), target.view(-1))

    # Example micro-batch used only for tracing. Its shape becomes the
    # per-micro-batch shape that the stages exchange.
    example_mb = torch.zeros(
        (training_args["micro_batch_size"], dataset_args["data_length"]),
        dtype=torch.long,
    )
    pipe = pipeline(
        module=Transformer(
            vocab_size=vocab_size,
            d_model=model_args["hidden_size"],
            num_heads=model_args["num_heads"],
            d_ff=model_args["feedforward_size"],
            num_layers=model_args["num_layers"],
            max_len=model_args["max_seq_length"],
        ),
        mb_args=(example_mb,),
        split_spec=_compute_split_spec(model_args["num_layers"], pp_size),
    )

    mesh_device_type, device, device_ids = _resolve_device(
        rank, training_args["device_type"]
    )
    mesh_2d = init_device_mesh(
        mesh_device_type,
        mesh_shape=(dp_size, pp_size),
        mesh_dim_names=("dp", "pp"),
    )
    pp_group = mesh_2d.get_group("pp")
    dp_group = mesh_2d.get_group("dp")
    pp_rank = dist.get_rank(pp_group)
    dp_rank = dist.get_rank(dp_group)
    is_first = pp_rank == 0
    is_last = pp_rank == pp_size - 1

    stage_mod = pipe.get_stage_module(pp_rank).to(device)
    print("rank, stage_mod", rank, stage_mod)
    dp_mod = DDP(stage_mod, device_ids=device_ids, process_group=dp_group)
    optimizer = optim.SGD(dp_mod.parameters(), lr=training_args["learning_rate"])
    # Give the stage the DDP wrapper, not the bare module. Otherwise the
    # stage's backward never triggers DDP's gradient all-reduce over dp_group,
    # and the data-parallel replicas drift apart.
    stage = build_stage(dp_mod, pp_rank, pipe.info(), device, pp_group)
    schedule = ScheduleGPipe(stage, training_args["micro_num"], compute_loss)

    dataset = NLPDataset(
        size=dataset_args["dataset_size"], length=dataset_args["data_length"]
    )
    sampler = DistributedSampler(dataset, num_replicas=dp_size, rank=dp_rank)
    # drop_last: a short final batch would split into micro-batches whose
    # shape differs from the traced one.
    dataloader = DataLoader(
        dataset,
        batch_size=training_args["batch_size"],
        sampler=sampler,
        drop_last=True,
    )

    for epoch in range(training_args["train_epochs"]):
        # Re-seed the sampler's shuffle so each epoch uses a new order.
        sampler.set_epoch(epoch)
        for batch, data in enumerate(dataloader):
            x = data.to(device)
            optimizer.zero_grad()
            # Only the first stage feeds inputs. Only the last stage receives
            # targets and collects per-micro-batch losses. Middle stages just
            # relay activations and gradients.
            losses = []
            step_args = (x,) if is_first else ()
            step_kwargs = {"target": x, "losses": losses} if is_last else {}
            schedule.step(*step_args, **step_kwargs)
            if is_last:
                loss = torch.stack(losses).mean().detach()
                dist.all_reduce(loss, op=ReduceOp.SUM, group=dp_group)
                if dp_rank == 0:
                    print(
                        f"Epoch {epoch}, Batch {batch}, Loss: {loss.item() / dp_size}"
                    )
            optimizer.step()

    dist.destroy_process_group()

### 5.4 启动训练

- 修改 run.sh 中服务器地址
- 第 i 台机器（i = 0 或 1）对应的启动命令为 `bash run.sh i`；下方单元格中的 `0/1` 需替换为当前机器的编号

In [25]:
!bash run.sh 0/1


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.2.3 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "/Users/lantianzhang/opt/anaconda3/envs/temp/bin/torchrun", line 5, in <module>
    from torch.distributed.run import main
  File "/Users/lantianzhang/opt/anaconda3/envs/temp/lib/python3.10/site-packages/torch/__init__.py", line 1477, in <module>
    from .functional import *  # noqa: F403
  File "/Users/lantianzhang/opt/anaconda3/envs/temp/lib/python3.10/site-packages/torch/functional.py", line 9, in <module>
    import torch.nn.functional as F
  File "/Users/lantianzhang/opt/anaconda3/envs/temp/lib/python3.10/site