In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
import numpy as np
import os
import time

In [2]:
class SimpleDataset(Dataset):
    def __init__(self, size=10000):
        """创建一个简单的线性数据集 y = 2x + 3 + 噪声"""
        self.x = torch.randn(size, 1)  # 随机生成 x 值
        self.y = 2 * self.x + 3 + 0.1 * torch.randn(size, 1)  # 计算对应的 y 值并添加噪声
    
    def __len__(self):
        return len(self.x)
    
    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

In [3]:
class SimpleModel(nn.Module):
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.linear = nn.Linear(1, 1)  # 简单的线性层，输入输出都是 1 维
    
    def forward(self, x):
        return self.linear(x)

In [4]:
def single_gpu_training():
    """单卡训练函数，作为性能对比基准"""
    # 超参数设置
    batch_size = 32
    learning_rate = 0.01
    epochs = 10
    
    # 创建数据集和数据加载器
    dataset = SimpleDataset(size=10000)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    
    # 初始化模型、损失函数和优化器
    model = SimpleModel()
    criterion = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=learning_rate)
    
    # 记录训练开始时间
    start_time = time.time()
    
    # 训练循环
    for epoch in range(epochs):
        running_loss = 0.0
        for i, (inputs, labels) in enumerate(dataloader):
            # 清零梯度
            optimizer.zero_grad()
            
            # 前向传播
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            # 反向传播和参数更新
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            
            # 每 100 个批次打印一次信息
            if i % 100 == 99:
                print(f'[{epoch + 1}, {i + 1}] loss: {running_loss / 100:.5f}')
                running_loss = 0.0
    
    # 计算总训练时间
    total_time = time.time() - start_time
    print(f'单卡训练完成，总时间: {total_time:.2f}秒')
    
    # 打印学到的参数，应该接近 w=2, b=3
    for name, param in model.named_parameters():
        print(f'{name}: {param.item():.4f}')
    
    return total_time

In [5]:
single_gpu_training()

[1, 100] loss: 4.06866
[1, 200] loss: 0.07826
[1, 300] loss: 0.01117
[2, 100] loss: 0.00984
[2, 200] loss: 0.01008
[2, 300] loss: 0.00992
[3, 100] loss: 0.00988
[3, 200] loss: 0.00961
[3, 300] loss: 0.01024
[4, 100] loss: 0.00942
[4, 200] loss: 0.01038
[4, 300] loss: 0.00975
[5, 100] loss: 0.00979
[5, 200] loss: 0.00993
[5, 300] loss: 0.01007
[6, 100] loss: 0.01006
[6, 200] loss: 0.00952
[6, 300] loss: 0.01008
[7, 100] loss: 0.01016
[7, 200] loss: 0.00974
[7, 300] loss: 0.00994
[8, 100] loss: 0.01039
[8, 200] loss: 0.00971
[8, 300] loss: 0.00969
[9, 100] loss: 0.00976
[9, 200] loss: 0.00988
[9, 300] loss: 0.01008
[10, 100] loss: 0.01008
[10, 200] loss: 0.00979
[10, 300] loss: 0.00986
单卡训练完成，总时间: 0.52秒
linear.weight: 1.9982
linear.bias: 3.0014


[1;36m0.5208249092102051[0m

In [None]:
def ddp_train(rank, world_size, epochs, batch_size, learning_rate):
    # 初始化进程组，使用 NCCL 后端（适合 GPU）
    # 对于 CPU 训练，可以使用 gloo 后端
    
    # 支持单GPU模拟：如果使用 torchrun，LOCAL_RANK 会被自动设置
    # 否则使用传入的 rank
    local_rank = int(os.environ.get('LOCAL_RANK', rank))
    
    # 如果使用 torchrun，它会自动初始化进程组
    # 否则手动初始化
    if not dist.is_initialized():
        dist.init_process_group(
            backend='nccl',  # 通信后端
            init_method='tcp://127.0.0.1:12355',  # 初始化方法和地址
            rank=rank,  # 当前进程编号
            world_size=world_size  # 总进程数
        )
    else:
        # torchrun 已经初始化了，使用环境变量中的 rank
        rank = int(os.environ.get('RANK', rank))
        world_size = int(os.environ.get('WORLD_SIZE', world_size))
    
    # 设置当前设备 - 使用 local_rank 而不是 rank
    # 在单GPU模拟时，所有进程的 local_rank 会映射到同一个GPU
    torch.cuda.set_device(local_rank)
    
    # 创建数据集
    dataset = SimpleDataset(size=10000)
    
    # 创建分布式采样器，确保每个进程获取不同的数据子集
    sampler = DistributedSampler(dataset, shuffle=True)
    
    # 创建数据加载器，注意这里的 batch_size 是每个进程的 batch size
    dataloader = DataLoader(
        dataset,
        batch_size=batch_size,
        sampler=sampler  # 使用分布式采样器
    )
    
    # 创建模型并移动到当前设备
    model = SimpleModel().to(local_rank)
    
    # 使用 DDP 包装模型
    ddp_model = DDP(model, device_ids=[local_rank])
    
    # 定义损失函数和优化器
    criterion = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=learning_rate)
    
    # 记录训练开始时间（只在主进程记录）
    start_time = time.time() if rank == 0 else None
    
    # 训练循环
    for epoch in range(epochs):
        # 设置采样器的 epoch，确保不同 epoch 的 shuffle 一致
        sampler.set_epoch(epoch)
        
        running_loss = 0.0
        for i, (inputs, labels) in enumerate(dataloader):
            # 将数据移动到当前设备
            inputs = inputs.to(local_rank)
            labels = labels.to(local_rank)
            
            # 清零梯度
            optimizer.zero_grad()
            
            # 前向传播
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            
            # 反向传播
            loss.backward()
            
            # 参数更新
            optimizer.step()
            
            running_loss += loss.item()
            
            # 只在主进程打印信息，避免多个进程同时打印
            if rank == 0 and i % 100 == 99:
                print(f'[{epoch + 1}, {i + 1}] loss: {running_loss / 100:.5f}')
                running_loss = 0.0
    
    # 计算总训练时间
    if rank == 0:
        total_time = time.time() - start_time
        print(f'DDP 训练完成，总时间: {total_time:.2f}秒')
        
        # 打印学到的参数
        for name, param in model.named_parameters():
            print(f'{name}: {param.item():.4f}')
    
    # 清理进程组
    dist.destroy_process_group()
    
    return total_time if rank == 0 else None

In [7]:
def run_ddp_training(world_size, epochs=10, batch_size=32, learning_rate=0.01):
    """启动多个进程进行 DDP 训练"""
    # 使用 torch.multiprocessing.spawn 启动多个进程
    # 每个进程将运行 ddp_train 函数，并传入不同的 rank
    mp.spawn(
        ddp_train,  # 要在每个进程中运行的函数
        args=(world_size, epochs, batch_size, learning_rate),  # 传递给 ddp_train 的参数
        nprocs=world_size,  # 进程数量
        join=True  # 是否等待所有进程完成
    )

## 单GPU模拟多GPU运行DDP

如果你只有一个GPU，但想测试多GPU的DDP训练，可以使用 `torchrun` 在单个GPU上模拟多个进程。

### 方法1: 使用 torchrun（推荐）

在终端中运行以下命令（从项目根目录）：

```bash
# 模拟2个GPU（在GPU 0上运行2个进程）
CUDA_VISIBLE_DEVICES=0 torchrun --nproc_per_node=2 code/chapter3/train_ddp_torchrun.py

# 模拟4个GPU（在GPU 0上运行4个进程）
CUDA_VISIBLE_DEVICES=0 torchrun --nproc_per_node=4 code/chapter3/train_ddp_torchrun.py
```

### 方法2: 在notebook中使用（需要修改代码）

如果你想在notebook中直接运行，需要修改上面的代码，使用环境变量来设置进程数。


In [None]:
# 设置随机种子，确保实验可复现
torch.manual_seed(42)
np.random.seed(42)

# 超参数设置
epochs = 10
batch_size_per_gpu = 32  # 每个 GPU 的 batch size
learning_rate = 0.01

# 检查可用 GPU 数量
available_gpus = torch.cuda.device_count()
print(f"可用 GPU 数量: {available_gpus}")

# 如果没有可用 GPU，使用 CPU 进行演示（实际中 DDP 通常用于 GPU）
if available_gpus == 0:
    print("警告: 未检测到 GPU，将使用 CPU 进行演示")
    # 单卡(CPU)训练
    print("\n===== 开始单卡(CPU)训练 =====")
    single_time = single_gpu_training()
    
    # 由于没有 GPU，这里不运行 DDP 训练
    print("\n 由于没有可用 GPU，跳过 DDP 训练演示")


# 单卡训练
#print("\n===== 开始单卡训练 =====")
#single_time = single_gpu_training()

# 使用所有可用 GPU 进行 DDP 训练
world_size = available_gpus
total_batch_size = batch_size_per_gpu * world_size
print(f"\n===== 开始 DDP 训练 (使用{world_size}个 GPU，总 batch size: {total_batch_size}) =====")

# 为了公平比较，DDP 训练的总 batch size 应与单卡训练相同
# 因此每个 GPU 的 batch size = 单卡 batch size / GPU 数量
adjusted_batch_size = batch_size_per_gpu

# 启动 DDP 训练
run_ddp_training(
    world_size=world_size,
    epochs=epochs,
    batch_size=adjusted_batch_size,
    learning_rate=learning_rate * world_size  # 当总 batch size 增加时，通常需要按比例增加学习率
)

# 注意：由于 mp.spawn 的限制，我们无法直接获取 DDP 训练时间
# 在实际应用中，可以通过文件或其他方式在进程间传递这个信息
print("\n===== 训练对比 =====")
print(f"单卡训练时间: {single_time:.2f}秒")
print(f"使用{world_size}个 GPU 的 DDP 训练时间: 请查看上面的 DDP 训练输出")
print(f"理论加速比: {single_time / (single_time / world_size):.2f}x (实际加速比可能因通信开销略低)")