In [None]:
# 单机多卡：NCCL通信协议，torch.nn.parallel.DistributedDataParallel
# 多机多卡：先考虑NCCL通信协议，不行再考虑Gloo通信协议; 类似与map reduce过程

# 同一机器GPU之间的通信带宽 > CPU-GPU带宽 > 机器之间网络

# nccl: GPU之间的通信（英伟达）
# mpi: CPU和GPU之间的通信

In [15]:
# 1.数据并行DataParallel：模型和优化器需要进一步包装。
# model = DataParallel(model, device_ids=[0,1], output_device=0) 
# optimizer = DataParallel(optimizer, device_ids=[0,1], output_device=0)
# 单机多卡：多gpu并行计算，不支持多机器。好处是改动代码很少。
# 前向过程：batch数据会送到指定的GPU上（所以要注意batch_size大于能整除GPU个数）
# 反向传播：每个副本梯度累加到指定GPU上汇总（下面是0，默认也是0，所以第一块显卡的占用内存会多一些，类似于参数服务器）

# 许多低效率之处：
# 1.冗余数据副本：数据从主机复制到主GPU，然后将子微型批分散在其他GPU上
# 2.在前向传播之前跨GPU进行模型复制：由于模型参数是在主GPU上更新的，因此模型必须在每次正向传递的开始时重新同步
# 3.每批的线程创建/销毁开销：并行转发是在多个线程中实现的（这可能只是PyTorch问题）
# 4.梯度减少流水线机会未开发：在Pytorch 1.0数据并行实现中，梯度下降发生在反向传播的末尾。
# 5.在主GPU上不必要地收集模型输出output
# 6.GPU利用率不均：在主GPU上执行损失loss计算
# 7.梯度下降，在主GPU上更新参数

# 单进程多线程性能被python的GIL锁约束。


1 0.3859357237815857
2 0.39890220761299133
3 0.2811512351036072
4 0.873574435710907
5 0.42149585485458374
6 0.4353399872779846
7 0.421011745929718
8 0.44859200716018677
9 0.6005834937095642
10 0.2511233687400818


In [1]:
# 1.1 标准单机单卡版本：cnn on minist
import torch, time, os
from torch import nn, optim
from torchvision import datasets, transforms
from torch.autograd import Variable
from torch.utils.data import DataLoader
from tqdm import tqdm

os.environ["CUDA_VISIBLE_DEVICES"] = "0"

class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.layer1 = nn.Sequential(nn.Conv2d(1,16,kernel_size=3), nn.BatchNorm2d(16), nn.ReLU(inplace=True))
        self.layer2 = nn.Sequential(nn.Conv2d(16,32,kernel_size=3), nn.BatchNorm2d(32), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2 , stride=2))
        self.layer3 = nn.Sequential(nn.Conv2d(32,64,kernel_size=3), nn.BatchNorm2d(64), nn.ReLU(inplace=True))
        self.layer4 = nn.Sequential(nn.Conv2d(64,128,kernel_size=3), nn.BatchNorm2d(128), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2 , stride=2))
        self.fc = nn.Sequential(nn.Linear(128*4*4,1024), nn.ReLU(inplace=True), nn.Linear(1024,128), nn.ReLU(inplace=True), nn.Linear(128,10))
    def forward( self , x):
        x = self.layer4(self.layer3(self.layer2(self.layer1(x))))
        x = x.reshape(x.size(0) , -1)
        fc_out = self.fc(x)
        return fc_out

# 定义超参数
learning_rate = 1e-2      # 学习率
batch_size    = 2000       # 批的大小
epoches_num   = 10        # 遍历训练集的次数
# 下载训练集 MNIST 手写数字训练集
train_dataset = datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True)
train_loader  = DataLoader( train_dataset, batch_size=batch_size, shuffle=True)

print(len(train_loader))

# 定义model 、loss 、optimizer
model = SimpleCNN().to('cuda:0')
criterion = nn.CrossEntropyLoss().to('cuda:0')
optimizer = optim.Adam(model.parameters(), lr=learning_rate )
# 开始训练
model.train()
starttime = time.time()
for epoch in range(epoches_num):
    train_loss = 0.0
    train_acc  = 0.0
    # 训练
    for i, data in tqdm(enumerate(train_loader, 1)):
        img, label = data
        img   = img.to('cuda:0')
        label = label.to('cuda:0')
        # 前向传播
        optimizer.zero_grad()
        out  = model(img)
        loss = criterion(out, label)
        # 反向传播
        loss.backward()
        optimizer.step()
        # 损失/准确率计算
        train_loss += loss.item() * label.size(0)
        _ , pred    = out.max(1)
        num_correct = pred.eq(label).sum()
        accuracy    = pred.eq(label).float().mean()
        train_acc  += num_correct.item()
    print('Finish  {}  Loss: {:.6f}, Acc: {:.6f}'.format( epoch+1 , train_loss / len(train_dataset), train_acc / len(train_dataset )))
endtime = time.time()
print('with only one GPU, it cost time:', endtime - starttime, '(s)!')
# 保存模型
torch.save(model, './model/cnn_one_gpu.pt')

30


30it [00:06,  4.64it/s]


Finish  1  Loss: 5.958403, Acc: 0.332633


30it [00:06,  4.71it/s]


Finish  2  Loss: 0.374562, Acc: 0.885100


30it [00:06,  4.72it/s]


Finish  3  Loss: 0.138205, Acc: 0.957133


30it [00:06,  4.73it/s]


Finish  4  Loss: 0.089404, Acc: 0.972033


30it [00:06,  4.76it/s]


Finish  5  Loss: 0.070034, Acc: 0.978483


30it [00:06,  4.74it/s]


Finish  6  Loss: 0.058024, Acc: 0.981717


30it [00:06,  4.66it/s]


Finish  7  Loss: 0.049302, Acc: 0.984400


30it [00:06,  4.73it/s]


Finish  8  Loss: 0.040589, Acc: 0.987650


30it [00:06,  4.72it/s]


Finish  9  Loss: 0.032878, Acc: 0.989967


30it [00:06,  4.65it/s]

Finish  10  Loss: 0.030534, Acc: 0.990367
with only one GPU, it cost time: 63.787768602371216 (s)!





In [1]:
# 1.2 单机多卡版本：cnn on minist 采用DataParallel来实现（多进程）

import torch, time, os
from torch import nn, optim
from torchvision import datasets, transforms
from torch.autograd import Variable
from torch.utils.data import DataLoader
from tqdm import tqdm
from torch.nn.parallel import DataParallel

os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0, 1"

device_ids = [0,1]
main_device = 0

class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.layer1 = nn.Sequential(nn.Conv2d(1,16,kernel_size=3), nn.BatchNorm2d(16), nn.ReLU(inplace=True))
        self.layer2 = nn.Sequential(nn.Conv2d(16,32,kernel_size=3), nn.BatchNorm2d(32), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2 , stride=2))
        self.layer3 = nn.Sequential(nn.Conv2d(32,64,kernel_size=3), nn.BatchNorm2d(64), nn.ReLU(inplace=True))
        self.layer4 = nn.Sequential(nn.Conv2d(64,128,kernel_size=3), nn.BatchNorm2d(128), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2 , stride=2))
        self.fc = nn.Sequential(nn.Linear(128*4*4,1024), nn.ReLU(inplace=True), nn.Linear(1024,128), nn.ReLU(inplace=True), nn.Linear(128,10))
    def forward( self , x):
        x = self.layer4(self.layer3(self.layer2(self.layer1(x))))
        x = x.reshape(x.size(0) , -1)
        fc_out = self.fc(x)
        return fc_out

# 定义超参数
learning_rate = 1e-2      # 学习率
batch_size    = 4000       # 批的大小
epoches_num   = 10        # 遍历训练集的次数
# 下载训练集 MNIST 手写数字训练集
train_dataset = datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True )
train_loader  = DataLoader( train_dataset, batch_size=batch_size, shuffle=True)

print(len(train_loader))

# 定义model 、loss 、optimizer
model = DataParallel(module=SimpleCNN(), device_ids = device_ids, output_device=main_device).cuda()
criterion = nn.CrossEntropyLoss().cuda()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# 开始训练
model.train()
starttime = time.time()
for epoch in range(epoches_num):
    train_loss = 0.0
    train_acc  = 0.0
    # 训练
    for i, data in tqdm(enumerate(train_loader, 1)):
        img, label = data
        img   = img.cuda(non_blocking=True)
        label = label.cuda(non_blocking=True)
        # 前向传播
        optimizer.zero_grad()
        out  = model(img)
        loss = criterion(out, label)
        # 反向传播
        loss.backward()
        optimizer.step()
        # 损失/准确率计算
        train_loss += loss.item() * label.size(0)
        _ , pred    = out.max(1)
        num_correct = pred.eq(label).sum()
        accuracy    = pred.eq(label).float().mean()
        train_acc  += num_correct.item()
    print('Finish  {}  Loss: {:.6f}, Acc: {:.6f}'.format( epoch+1 , train_loss / len(train_dataset), train_acc / len(train_dataset )))
endtime = time.time()
print('with only two GPUs, it cost time:', endtime - starttime, '(s)!')
# 保存模型
torch.save(model, './model/cnn_two_gpus_dp.pt')


15


15it [00:09,  1.62it/s]


Finish  1  Loss: 8.045089, Acc: 0.147517


15it [00:05,  2.58it/s]


Finish  2  Loss: 1.237980, Acc: 0.569300


15it [00:05,  2.57it/s]


Finish  3  Loss: 0.538698, Acc: 0.814600


15it [00:05,  2.57it/s]


Finish  4  Loss: 0.281035, Acc: 0.907167


15it [00:05,  2.57it/s]


Finish  5  Loss: 0.148703, Acc: 0.953367


15it [00:05,  2.59it/s]


Finish  6  Loss: 0.097726, Acc: 0.969883


15it [00:05,  2.60it/s]


Finish  7  Loss: 0.074189, Acc: 0.977117


15it [00:05,  2.58it/s]


Finish  8  Loss: 0.060719, Acc: 0.981200


15it [00:05,  2.58it/s]


Finish  9  Loss: 0.051184, Acc: 0.984583


15it [00:05,  2.57it/s]

Finish  10  Loss: 0.044030, Acc: 0.986583
with only two GPUs, it cost time: 61.6509530544281 (s)!





In [None]:
# 2.torch.nn.parallel.DistributedDataParallel(DDP)分布式训练
# 每个gpu对应一个进程
# ring all-reduce算法（百度提出）：保证GPU之间model和optimizer同步一致。所有的GPU连成一个ring，不需要等待所有卡计算完一轮。分布式同步。
# 算法本质是充分利用带宽，每部分都要走一圈ring，错开走。任意两个GPU之间每步只传一部分。
# 参数：world_size, ranks值范围为[0,world_size-1], local_rank
# 需要定义DistributedDataParallel和DistributedSampler


In [4]:
# 2.1. 单机多卡简单案例实现（网上例子）（成功执行）

import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import os
from torch.utils.data.distributed import DistributedSampler
# 1) 初始化
torch.distributed.init_process_group(backend="nccl")

input_size = 5
output_size = 2
batch_size = 30
data_size = 90
epoches = 10

# 2） 配置每个进程的gpu
local_rank = torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size).to('cuda')

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len
dataset = RandomDataset(input_size, data_size)
# 3）使用DistributedSampler
rand_loader = DataLoader(dataset=dataset, batch_size=batch_size, sampler=DistributedSampler(dataset))

class Model(nn.Module):
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)
    def forward(self, input):
        output = self.fc(input)
        print("  In Model: input size", input.size(),
              "output size", output.size())
        return output

model = Model(input_size, output_size)

# 4) 封装之前要把模型移到对应的gpu
model.to(device)
if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # 5) 封装
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)
for epoch in range(epoches):
    for data in rand_loader:
        if torch.cuda.is_available():
            input_var = data
        else:
            input_var = data
        output = model(input_var)
        print("Outside: input size", input_var.size(), "output_size", output.size())

# 单机多卡1：命令，成功执行
# CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 ddp.py

RuntimeError: Distributed package doesn't have NCCL built in

In [None]:
# 2.2 单机多卡简单版本 cnn on minist（成功执行）

import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
import os
from torch.utils.data.distributed import DistributedSampler
# 1) 初始化
# torch.distributed.init_process_group(backend="nccl") # 推荐这个，两个gpu占用率才7%左右
# torch.distributed.init_process_group(backend="nccl", init_method='env://') # 报错ERROR:torch.distributed.elastic.multiprocessing.api:failed (exitcode: -6) local_rank: 0 (pid: 3596) of binary: /root/miniconda3/bin/python
torch.distributed.init_process_group(backend="nccl", world_size=2) # 推荐这个,gpu:1占用率会偶尔飙升一下
# torch.distributed.init_process_group(backend="nccl", rank=0, world_size=2)  # 这个有问题，只用了gpu:0

# 定义超参数
learning_rate = 1e-2      # 学习率
batch_size    = 1000       # 批的大小
epoches_num   = 10        # 遍历训练集的次数

# 2） 配置每个进程的gpu
local_rank = torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)

# 3）使用DistributedSampler
dataset = datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True )
sampler=DistributedSampler(dataset)
rand_loader = DataLoader(dataset=dataset, batch_size=batch_size, sampler=sampler)

class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.layer1 = nn.Sequential(nn.Conv2d(1,16,kernel_size=3), nn.BatchNorm2d(16), nn.ReLU(inplace=True))
        self.layer2 = nn.Sequential(nn.Conv2d(16,32,kernel_size=3), nn.BatchNorm2d(32), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2 , stride=2))
        self.layer3 = nn.Sequential(nn.Conv2d(32,64,kernel_size=3), nn.BatchNorm2d(64), nn.ReLU(inplace=True))
        self.layer4 = nn.Sequential(nn.Conv2d(64,128,kernel_size=3), nn.BatchNorm2d(128), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2 , stride=2))
        self.fc = nn.Sequential(nn.Linear(128*4*4,1024), nn.ReLU(inplace=True), nn.Linear(1024,128), nn.ReLU(inplace=True), nn.Linear(128,10))
    def forward( self , x):
        x = self.layer4(self.layer3(self.layer2(self.layer1(x))))
        x = x.reshape(x.size(0) , -1)
        fc_out = self.fc(x)
        return fc_out
model = SimpleCNN()

# 4) 封装之前要把模型移到对应的gpu
model.to(device)
# 尽管会降低GPU利用率，但可以提高模型在多卡场景下的表现
# model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)  # BN层同步
# 同步容易报错，内存越界 RuntimeError: CUDA error: an illegal memory access was encountered
if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # 5) 封装
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)
for epoch in range(epoches_num):
    sampler.set_epoch(epoch)
    for batch_input, batch_label in rand_loader:
        output = model(batch_input)
        print("Outside: input size", batch_input.size(), "output_size", output.size())

# 单机多卡2：命令，成功执行
# CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 ddp.py

In [None]:
# 2.3 单机多卡复杂版本 cnn on minist（成功执行）

import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
import os
from torch.utils.data.distributed import DistributedSampler

def reduce_loss(tensor, rank, world_size):
    with torch.no_grad():
        dist.reduce(tensor, dst=0)
        if rank == 0:
            tensor /= world_size

# 1) 初始化
# torch.distributed.init_process_group(backend="nccl") # 推荐这个，两个gpu占用率才7%左右
# torch.distributed.init_process_group(backend="nccl", init_method='env://') # 报错ERROR:torch.distributed.elastic.multiprocessing.api:failed (exitcode: -6) local_rank: 0 (pid: 3596) of binary: /root/miniconda3/bin/python
torch.distributed.init_process_group(backend="nccl", world_size=2) # 推荐这个,gpu:1占用率会偶尔飙升一下
# torch.distributed.init_process_group(backend="nccl", rank=0, world_size=2)  # 这个有问题，只用了gpu:0

# 定义超参数
learning_rate = 1e-2      # 学习率
batch_size    = 1000       # 批的大小
epoches_num   = 10        # 遍历训练集的次数

# 2） 配置每个进程的gpu
local_rank = torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)

# 3）使用DistributedSampler
dataset = datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True )
sampler=DistributedSampler(dataset)
rand_loader = DataLoader(dataset=dataset, batch_size=batch_size, sampler=sampler)

class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.layer1 = nn.Sequential(nn.Conv2d(1,16,kernel_size=3), nn.BatchNorm2d(16), nn.ReLU(inplace=True))
        self.layer2 = nn.Sequential(nn.Conv2d(16,32,kernel_size=3), nn.BatchNorm2d(32), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2 , stride=2))
        self.layer3 = nn.Sequential(nn.Conv2d(32,64,kernel_size=3), nn.BatchNorm2d(64), nn.ReLU(inplace=True))
        self.layer4 = nn.Sequential(nn.Conv2d(64,128,kernel_size=3), nn.BatchNorm2d(128), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2 , stride=2))
        self.fc = nn.Sequential(nn.Linear(128*4*4,1024), nn.ReLU(inplace=True), nn.Linear(1024,128), nn.ReLU(inplace=True), nn.Linear(128,10))
    def forward( self , x):
        x = self.layer4(self.layer3(self.layer2(self.layer1(x))))
        x = x.reshape(x.size(0) , -1)
        fc_out = self.fc(x)
        return fc_out
model = SimpleCNN()

# 4) 封装之前要把模型移到对应的gpu
model.to(device)
# 尽管会降低GPU利用率，但可以提高模型在多卡场景下的表现
# model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)  # BN层同步
# 同步容易报错，内存越界 RuntimeError: CUDA error: an illegal memory access was encountered
if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # 5) 封装
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)
for epoch in range(epoches_num):
    sampler.set_epoch(epoch)
    for batch_input, batch_label in rand_loader:
        output = model(batch_input)
        print("Outside: input size", batch_input.size(), "output_size", output.size())

# 单机多卡2：命令，成功执行
# CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 ddp.py

In [None]:

# 2.3.单机多卡复杂版本 cnn on minist（成功执行）

import torch, time, os
from torch import nn, optim
from torchvision import datasets, transforms
from torch.autograd import Variable
from torch.utils.data import DataLoader, DistributedSampler
from tqdm import tqdm
from torch.nn.parallel import DistributedDataParallel
from torch.distributed import init_process_group, destroy_process_group, get_rank

device_ids = [0,1]
main_device = 0
world_size = len(device_ids)

def reduce_loss(tensor, rank, world_size):
    with torch.no_grad():
        dist.reduce(tensor, dst=0)
        if rank == 0:
            tensor /= world_size

os.environ['MASTER_ADDR'] = 'localhost'  # 0号机器的IP
os.environ['MASTER_PORT'] = '19198'  # 0号机器的可用端口
os.environ['WORLD_SIZE'] = str(world_size)
os.environ['RANK'] = str(main_device)
os.environ['LOCAL_RANK'] = str(main_device)
init_process_group(backend='gloo', init_method='env://')
torch.cuda.set_device(main_device)
global_rank = get_rank()
print(global_rank)

class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.layer1 = nn.Sequential(nn.Conv2d(1,16,kernel_size=3), nn.BatchNorm2d(16), nn.ReLU(inplace=True))
        self.layer2 = nn.Sequential(nn.Conv2d(16,32,kernel_size=3), nn.BatchNorm2d(32), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2 , stride=2))
        self.layer3 = nn.Sequential(nn.Conv2d(32,64,kernel_size=3), nn.BatchNorm2d(64), nn.ReLU(inplace=True))
        self.layer4 = nn.Sequential(nn.Conv2d(64,128,kernel_size=3), nn.BatchNorm2d(128), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2 , stride=2))
        self.fc = nn.Sequential(nn.Linear(128*4*4,1024), nn.ReLU(inplace=True), nn.Linear(1024,128), nn.ReLU(inplace=True), nn.Linear(128,10))
    def forward( self , x):
        x = self.layer4(self.layer3(self.layer2(self.layer1(x))))
        x = x.reshape(x.size(0) , -1)
        fc_out = self.fc(x)
        return fc_out
model = DistributedDataParallel(SimpleCNN().cuda(), device_ids=device_ids, output_device=main_device)

# 定义超参数
learning_rate = 1e-2      # 学习率
batch_size    = 2000       # 批的大小
epoches_num   = 10        # 遍历训练集的次数
# 下载训练集 MNIST 手写数字训练集
train_dataset = datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True)
sampler = DistributedSampler(train_dataset, shuffle=True)
train_loader  = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True, sampler=sampler) # 注意这里
criterion = nn.CrossEntropyLoss()

model.train()
for epoch in range(epoches):
    # 让每张卡在每轮迭代中数据顺序随机
    sampler.set_epoch(epoch)
    for batch_input, batch_label in tqdm(dataloader):
        batch_input, batch_label = batch_input.cuda(), batch_label.cuda()
        batch_output = model(batch_input)
        batch_loss = criterion(batch_output, batch_label)
        
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()
        reduce_loss(batch_loss, global_rank, world_size)
    # 损失/准确率计算
    train_loss += loss.item() * label.size(0)
    _ , pred    = out.max(1)
    num_correct = pred.eq(label).sum()
    accuracy    = pred.eq(label).float().mean()
    train_acc  += num_correct.item()
    if global_rank == 0:
        print('Finish  {}  Loss: {:.6f}, Acc: {:.6f}'.format( epoch+1 , train_loss / len(train_dataset), train_acc / len(train_dataset )))
# destroy_process_group()

# 单机多卡：命令
# python -m torch.distributed.launch --nproc_per_node=n_gpus DDP.py

# 多机多卡：命令（晚上跑一跑）
# python -m torch.distributed.launch --nproc_per_node=n_gpus --nodes=2 --node_rank=0 --master_addr="主节点ip" --master_port="主节点端口" DDP.py
# python -m torch.distributed.launch --nproc_per_node=n_gpus --nodes=2 --node_rank=1 --master_addr="主节点ip" --master_port="主节点端口" DDP.py
# python -m torch.distributed.launch --nproc_per_node=n_gpus --nodes=2 --node_rank=2 --master_addr="主节点ip" --master_port="主节点端口" DDP.py

In [None]:

# 2.1.单机多卡实现

import torch, time, os
from torch import nn, optim
from torchvision import datasets, transforms
from torch.autograd import Variable
from torch.utils.data import DataLoader, DistributedSampler
from tqdm import tqdm
from torch.nn.parallel import DistributedDataParallel
from torch.distributed import init_process_group, destroy_process_group, get_rank, reduce
from multiprocessing import Process

device_ids = [0,1]
main_device = 0
world_size = len(device_ids)

def reduce_loss(tensor, rank, world_size):
    with torch.no_grad():
        reduce(tensor, dst=0)
        if rank == 0:
            tensor /= world_size
def setup(rank, world_size):
    "Sets up the process group and configuration for PyTorch Distributed Data Parallelism"
    os.environ["MASTER_ADDR"] = 'localhost'
    os.environ["MASTER_PORT"] = "12355"
    # Initialize the process group
    init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
    "Cleans up the distributed environment"
    destroy_process_group()

class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.layer1 = nn.Sequential(nn.Conv2d(1,16,kernel_size=3), nn.BatchNorm2d(16), nn.ReLU(inplace=True))
        self.layer2 = nn.Sequential(nn.Conv2d(16,32,kernel_size=3), nn.BatchNorm2d(32), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2 , stride=2))
        self.layer3 = nn.Sequential(nn.Conv2d(32,64,kernel_size=3), nn.BatchNorm2d(64), nn.ReLU(inplace=True))
        self.layer4 = nn.Sequential(nn.Conv2d(64,128,kernel_size=3), nn.BatchNorm2d(128), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2 , stride=2))
        self.fc = nn.Sequential(nn.Linear(128*4*4,1024), nn.ReLU(inplace=True), nn.Linear(1024,128), nn.ReLU(inplace=True), nn.Linear(128,10))
    def forward( self , x):
        x = self.layer4(self.layer3(self.layer2(self.layer1(x))))
        x = x.reshape(x.size(0) , -1)
        fc_out = self.fc(x)
        return fc_out
model = SimpleCNN()

# 定义超参数
learning_rate = 1e-2      # 学习率
batch_size    = 2000       # 批的大小
epoches_num   = 10        # 遍历训练集的次数
# 下载训练集 MNIST 手写数字训练集
train_dataset = datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True)
criterion = nn.CrossEntropyLoss()

def train(rank, world_size):
    setup(rank, world_size)
    sampler = DistributedSampler(train_dataset, shuffle=True)
    train_loader  = DataLoader(train_dataset, batch_size=batch_size, shuffle=False, pin_memory=True, sampler=sampler) # 注意这里
    ddp_model = DistributedDataParallel(model.to(rank), device_ids=[rank])
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
    # Train for one epoch
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        if get_rank() == 0:
            print(loss.item())
    cleanup()

def main():
    size = 2
    processes=[]
    for i in range(size):
        p = Process(target=train, args=(i, size))
        p.start()
        processes.append(p)
        # init_processes(i, size)
    for p in processes:
        p.join() 
if __name__=='__main__':
    main()

# 单机多卡：命令
# python -m torch.distributed.launch --nproc_per_node=n_gpus DDP.py

# 多机多卡：命令（晚上跑一跑）
# python -m torch.distributed.launch --nproc_per_node=n_gpus --nodes=2 --node_rank=0 --master_addr="主节点ip" --master_port="主节点端口" DDP.py
# python -m torch.distributed.launch --nproc_per_node=n_gpus --nodes=2 --node_rank=1 --master_addr="主节点ip" --master_port="主节点端口" DDP.py
# python -m torch.distributed.launch --nproc_per_node=n_gpus --nodes=2 --node_rank=2 --master_addr="主节点ip" --master_port="主节点端口" DDP.py

In [None]:

# 2.1.单机多卡实现

import torch, time, os
from torch import nn, optim
from torchvision import datasets, transforms
from torch.autograd import Variable
from torch.utils.data import DataLoader, DistributedSampler
from tqdm import tqdm
from torch.nn.parallel import DistributedDataParallel
from torch.distributed import init_process_group, destroy_process_group, get_rank

device_ids = [0,1]
main_device = 0
world_size = len(device_ids)

def reduce_loss(tensor, rank, world_size):
    with torch.no_grad():
        dist.reduce(tensor, dst=0)
        if rank == 0:
            tensor /= world_size

os.environ['MASTER_ADDR'] = 'localhost'  # 0号机器的IP
os.environ['MASTER_PORT'] = '19198'  # 0号机器的可用端口
os.environ['WORLD_SIZE'] = str(world_size)
os.environ['RANK'] = str(main_device)
os.environ['LOCAL_RANK'] = str(main_device)
init_process_group(backend='nccl', init_method='env://')
torch.cuda.set_device(main_device)
global_rank = get_rank()

class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.layer1 = nn.Sequential(nn.Conv2d(1,16,kernel_size=3), nn.BatchNorm2d(16), nn.ReLU(inplace=True))
        self.layer2 = nn.Sequential(nn.Conv2d(16,32,kernel_size=3), nn.BatchNorm2d(32), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2 , stride=2))
        self.layer3 = nn.Sequential(nn.Conv2d(32,64,kernel_size=3), nn.BatchNorm2d(64), nn.ReLU(inplace=True))
        self.layer4 = nn.Sequential(nn.Conv2d(64,128,kernel_size=3), nn.BatchNorm2d(128), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2 , stride=2))
        self.fc = nn.Sequential(nn.Linear(128*4*4,1024), nn.ReLU(inplace=True), nn.Linear(1024,128), nn.ReLU(inplace=True), nn.Linear(128,10))
    def forward( self , x):
        x = self.layer4(self.layer3(self.layer2(self.layer1(x))))
        x = x.reshape(x.size(0) , -1)
        fc_out = self.fc(x)
        return fc_out
model = DistributedDataParallel(SimpleCNN().cuda(), device_ids=device_ids, output_device=main_device)

# 定义超参数
learning_rate = 1e-2      # 学习率
batch_size    = 4000       # 批的大小
epoches_num   = 10        # 遍历训练集的次数
# 下载训练集 MNIST 手写数字训练集
train_dataset = datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True )
sampler = DistributedSampler(train_dataset, shuffle=True)
train_loader  = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True, sampler=sampler) # 注意这里
criterion = nn.CrossEntropyLoss()

model.train()
for epoch in range(epoches):
    # 让每张卡在每轮迭代中数据顺序随机
    sampler.set_epoch(epoch)
    for batch_input, batch_label in tqdm(dataloader):
        batch_input, batch_label = batch_input.cuda(), batch_label.cuda()
        batch_output = model(batch_input)
        batch_loss = criterion(batch_output, batch_label)
        
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()
        reduce_loss(batch_loss, global_rank, world_size)
    # 损失/准确率计算
    train_loss += loss.item() * label.size(0)
    _ , pred    = out.max(1)
    num_correct = pred.eq(label).sum()
    accuracy    = pred.eq(label).float().mean()
    train_acc  += num_correct.item()
    if global_rank == 0:
        print('Finish  {}  Loss: {:.6f}, Acc: {:.6f}'.format( epoch+1 , train_loss / len(train_dataset), train_acc / len(train_dataset )))
# destroy_process_group()

# 单机多卡：命令
# python -m torch.distributed.launch --nproc_per_node=n_gpus DDP.py

# 多机多卡：命令（晚上跑一跑）
# python -m torch.distributed.launch --nproc_per_node=n_gpus --nodes=2 --node_rank=0 --master_addr="主节点ip" --master_port="主节点端口" DDP.py
# python -m torch.distributed.launch --nproc_per_node=n_gpus --nodes=2 --node_rank=1 --master_addr="主节点ip" --master_port="主节点端口" DDP.py
# python -m torch.distributed.launch --nproc_per_node=n_gpus --nodes=2 --node_rank=2 --master_addr="主节点ip" --master_port="主节点端口" DDP.py

In [4]:
import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import os
from torch.utils.data.distributed import DistributedSampler
# 1) 初始化
torch.distributed.init_process_group(backend="nccl")

input_size = 5
output_size = 2
batch_size = 30
data_size = 90

# 2） 配置每个进程的gpu
local_rank = torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size).to('cuda')

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

dataset = RandomDataset(input_size, data_size)
# 3）使用DistributedSampler
rand_loader = DataLoader(dataset=dataset,
                         batch_size=batch_size,
                         sampler=DistributedSampler(dataset))

class Model(nn.Module):
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        print("  In Model: input size", input.size(),
              "output size", output.size())
        return output

model = Model(input_size, output_size)

# 4) 封装之前要把模型移到对应的gpu
model.to(device)

if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # 5) 封装
    model = torch.nn.parallel.DistributedDataParallel(model,
                                                      device_ids=[local_rank],
                                                      output_device=local_rank)

for data in rand_loader:
    if torch.cuda.is_available():
        input_var = data
    else:
        input_var = data

    output = model(input_var)
    print("Outside: input size", input_var.size(), "output_size", output.size())

ValueError: Error initializing torch.distributed using env:// rendezvous: environment variable RANK expected, but not set

In [11]:
import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import os
from torch.utils.data.distributed import DistributedSampler
# 1) 初始化
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12345'
torch.distributed.init_process_group(backend="nccl", init_method='env://', rank = 0, world_size = 2)

input_size = 5
output_size = 2
batch_size = 30
data_size = 90

# 2） 配置每个进程的gpu
local_rank = torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size).to('cuda')

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

dataset = RandomDataset(input_size, data_size)
# 3）使用DistributedSampler
rand_loader = DataLoader(dataset=dataset,
                         batch_size=batch_size,
                         sampler=DistributedSampler(dataset))

class Model(nn.Module):
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        print("  In Model: input size", input.size(),
              "output size", output.size())
        return output

model = Model(input_size, output_size)

# 4) 封装之前要把模型移到对应的gpu
model.to(device)

if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # 5) 封装
    model = torch.nn.parallel.DistributedDataParallel(model,
                                                      device_ids=[local_rank],
                                                      output_device=local_rank)

for data in rand_loader:
    if torch.cuda.is_available():
        input_var = data
    else:
        input_var = data

    output = model(input_var)
    print("Outside: input size", input_var.size(), "output_size", output.size())

RuntimeError: trying to initialize the default process group twice!

In [None]:
#!/usr/bin/env python
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process
import torch.backends.cudnn as cudnn
from torch.autograd import Variable
import time

def allreduce(send, recv):
    """ Implementation of a ring-reduce. """
    rank = dist.get_rank()
    size = dist.get_world_size()
    send_buff = torch.zeros(send.size())
    recv_buff = torch.zeros(send.size())
    accum = torch.zeros(send.size())
    accum[:] = send[:]
    # th.cuda.synchronize()

    left = ((rank - 1) + size) % size
    right = (rank + 1) % size

    for i in range(size - 1):
        if i % 2 == 0:
            # Send send_buff
            send_req = dist.isend(send_buff, right)
            dist.recv(recv_buff, left)
            accum[:] += recv[:]
        else:
            # Send recv_buff
            send_req = dist.isend(recv_buff, right)
            dist.recv(send_buff, left)
            accum[:] += send[:]
        send_req.wait()
    # th.cuda.synchronize()
    recv[:] = accum[:]


def run(rank, size):
    """ Distributed function to be implemented later. """
    model = Model()
    model = torch.nn.parallel.DistributedDataParallel(model.cuda())
    criterion = torch.nn.MSELoss(size_average=False).cuda()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    cudnn.benchmark = True
    x_data = Variable(torch.Tensor([[1.0], [2.0], [3.0]]))
    y_data = Variable(torch.Tensor([[2.0], [4.0], [6.0]]))

    for epoch in range(500000):
        y_pred = model(x_data.cuda())
        # Compute loss
        loss = criterion(y_pred.cuda(), y_data.cuda())
        print(epoch, loss.data[0])
        # Zero gradients
        optimizer.zero_grad()
        # perform backward pass
        loss.backward()
        # update weights
        optimizer.step()

    hour_var = Variable(torch.Tensor([[7.0]]))
    print("predict (after training)", 7, model.forward(hour_var).data[0][0])


def init_processes(rank, size, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '192.168.0.12'
    os.environ['MASTER_PORT'] = '29555'
    dist.init_process_group(backend, rank=rank, world_size=size)
    # print("MM")
    ## 实现代码
    model = Model()
    model = torch.nn.parallel.DistributedDataParallel(model.cuda())
    criterion = torch.nn.MSELoss(size_average=False).cuda()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    cudnn.benchmark = True
    x_data = Variable(torch.Tensor([[1.0], [2.0], [3.0]]))
    y_data = Variable(torch.Tensor([[2.0], [4.0], [6.0]]))

    for epoch in range(500000):
        y_pred = model(x_data.cuda())
        # Compute loss
        loss = criterion(y_pred.cuda(), y_data.cuda())
        print(epoch, loss.data[0])
        # Zero gradients
        optimizer.zero_grad()
        # perform backward pass
        loss.backward()
        # update weights
        optimizer.step()
    hour_var = Variable(torch.Tensor([[7.0]]))
    print("predict (after training)", 7, model.forward(hour_var).data[0][0])

def main():
    size = 2
    processes=[]
    for i in range(size):
        p = Process(target=init_processes, args=(i, size))
        p.start()
        processes.append(p)
        # init_processes(i, size)
    for p in processes:
        p.join()

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    print("耗时：", end_time-start_time)

In [None]:
# 另一个案例：单机多卡

import os
import torch
import torch.cuda
import torch.distributed as dist # 多卡通信
import torch.multiprocessing as mp # dpp启动
from torch.cuda.amp import GradScaler # 混合精度训练用
from torch.utils.data.distributed import DistributedSampler # 分布式采样
from torch.nn.parallel import DistributedDataParallel as DDP # 模型传递

# 启动1:
# mp.spawn()，下面代码就是这个模式

# 启动2:采用torchrun 命令启动所有机器上的gpu
# torchrun --standalone --nproc_per_node=2 ddp_main.py --gpu 0,1

# 进程初始化
def init_ddp(local_rank): # 当前cuda编号
    torch.cuda.set_device(local_rank)
    os.environ['RANK'] = str(local_rank)
    # 初始化进程组
    dist.init_process_group(backend='nccl', init_method='env://')

def get_ddp_generator(seed=3407):
    # 对每个进程使用不同的随机种子，增强训练的随机性
    local_rank = dist.get_rank()
    g = torch.Generator()
    g.manual_seed(seed + local_rank)
    return g
# 对多个进程对计算结果进行汇总
def reduce_tensor(tensor: torch.Tensor):
    rt = tensor.clone()
    dist.all_reduce(rt, op=dist.reduce_op.SUM)
    rt /= dist.get_world_size()
    return rt
# 数据采样：训练和测试

def get_dataloader(path, args, tokenizer, train:bool): 
    '''
    根据给定的路径获取数据，并将数据和训练标志传递给数据加载器，这样可以方便地从给定路径加载数据并生成数据加载器，以供后续的模型训练和评估使用。
    path：数据存放路径
    tokenizer：分词器
    train：是否是训练阶段
    '''
    texts, labels = load_dataset(path, args['num_labels'])
    texts = tokenizer(texts, padding='max_length', truncation=True, return_tensors='pt', max_length=args['max_length']) 
    data = TensorDataset(texts['input_ids'], texts['attention_mask'], torch.tensor(labels)) 
    
    if train:
        train_sampler = DistributedSampler(data, shuffle=True)  # 创建一个分布式随机采样器。
        g = get_ddp_generator()
        dataloader = DataLoader(dataset=data,
                                batch_size=args['batch_size'],
                                num_workers=args['num_workers'],
                                pin_memory=True,
                                shuffle=False, # 已经指定了采样器
                                sampler=train_sampler, #采用随机采样器。
                                generator=g) 
        
    else:
        test_sampler = DistributedSampler(data, shuffle=False) # 创建一个顺序采样器。
        dataloader = DataLoader(dataset=data,
                                batch_size=args['batch_size'],
                                num_workers=args['num_workers'],
                                pin_memory=True,
                                shuffle=False,
                                sampler=test_sampler #采用顺序采样器。
                                )
    return dataloader
# 训练函数

def train(model, train_dataloader, optimizer, scheduler, criterion, actual_epoch, scaler, args):
    model.train()
    
    tr_loss = 0
    num_train_samples = 0
    
    for step, batch in enumerate(train_dataloader):
        batch = tuple(t.cuda(non_blocking=True) for t in batch)
        b_input_ids, b_input_mask, b_labels = batch
        with torch.cuda.amp.autocast(): # amp 自动混合精度训练
	        output = model(b_input_ids, attention_mask=b_input_mask, labels=b_labels) # 运行到这一行会增加一下显存
	        loss = criterion(output.logits.view(-1,args['num_labels']), b_labels.type_as(output.logits).view(-1,args['num_labels']))
        reduced_loss = reduce_tensor(loss.data)  # 对并行进程计算的多个 loss 取平均
        if dist.get_rank() == 0:  # 防止重复输出
            print("\nOutput Loss: ", reduced_loss.item())
        tr_loss += reduced_loss.item()
        # 并行状态下的更新，不同进程分别根据自己计算的 loss 更新数据
        optimizer.zero_grad()
        scaler.scale(loss).backward() # 混合精度
        scaler.step(optimizer)  #  运行到这一行会增加一下显存
        # 下面四行，多个进程只执行一次
        scheduler.step() # 梯度schedule
        scaler.update() # 更新梯度
        num_train_samples += b_labels.size(0) #将批次中的样本数量添加到 num_train_samples 中。
        torch.cuda.empty_cache()  # 释放GPU reserved memory显存
    
    epoch_train_loss = tr_loss / num_train_samples  # num_train_samples 代表每个进程承接的样本数量，由于上面已经有对loss取平均的操作，这里分母无需再乘以进程数

    if dist.get_rank() == 0:
        print("\nTrain loss after Epoch {} : {}".format(actual_epoch, epoch_train_loss))
# validate

@torch.no_grad()
def validate(model, valid_dataloader, criterion, epoch, args, threshold=0.5):

    model.eval()

    eval_loss = 0.0
    num_eval_samples = 0
    
    pred_labels = []
    true_labels = []
    
    for step, batch in enumerate(valid_dataloader):
        
        batch = tuple(t.cuda(non_blocking=True) for t in batch)
        b_input_ids, b_input_mask, b_labels = batch
        
        with torch.no_grad():
            with torch.cuda.amp.autocast(): # 混合精度
	            output = model(b_input_ids, attention_mask=b_input_mask)
	            logits = output.logits
                        
            loss = criterion(logits.view(-1,args['num_labels']), b_labels.type_as(logits).view(-1,args['num_labels']))
            
            reduced_loss = reduce_tensor(loss.data)
            eval_loss += reduced_loss.item()
            
            pred_label = torch.sigmoid(logits)
            pred_label = pred_label.to('cpu').numpy()
            b_labels = b_labels.to('cpu').numpy()
            
        pred_labels.append(pred_label)
        true_labels.append(b_labels)

        num_eval_samples += b_labels.shape[0]  # 这里是针对单个 进程 的 计算样本数
    
    epoch_eval_loss = eval_loss/num_eval_samples  
    
    if dist.get_rank() == 0:
        print("Validation loss after Epoch {} : {}".format(epoch, epoch_eval_loss))
# 进程主函数：单机多卡模拟


def initialise_model(*args):
    pass

def main(local_rank, args):  # 参数列表更新
    init_ddp(local_rank)  ### 进程初始化

    best_macro = 0

    model, tokenizer = initialise_model(args['modelname'], args['num_labels'])
    model.cuda()
    # 尽管会降低GPU利用率，但可以提高模型在多卡场景下的表现
    model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)  # BN层同步
    
    num_gpus = torch.cuda.device_count()
    if num_gpus > 1:
        print('use {} gpus!'.format(num_gpus))
        # 数据并行：单机多gpu，对模型封装
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)  ### 套 DDP
    
    num_training_steps = args['num_epochs'] * (args['num_samples'] // args['batch_size']) #总的训练步数
    
    if args['requires_grad']:  # 权重衰减
        param_optimizer = list(model.named_parameters())
        no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
        # 设置模型参数的权重衰减
        optimizer_grouped_parameters = [
            {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)],
             'weight_decay': 0.01},
            {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
        ]
        optimizer = AdamW(optimizer_grouped_parameters, lr=float(args['learning_rate'])) # 部分参数更新
    else:
        optimizer = AdamW(model.parameters(), lr=float(args['learning_rate'])) # 部分参数更新
    
    scheduler = get_linear_schedule_with_warmup(optimizer,  num_warmup_steps=100, num_training_steps=num_training_steps) #创建学习率调度器。
    
    scaler = GradScaler()  ###  用于混合精度训练,FP32和FP16
    criterion = BCEWithLogitsLoss().cuda() #定义损失函数。

    train_dataloader = get_dataloader(args['traincsvpath'], args, tokenizer, train=True)
    valid_dataloader = get_dataloader(args['valcsvpath'], args, tokenizer, train=False)

    for actual_epoch in trange(args['num_epochs'], desc="Epoch"):
        if local_rank == 0:  ### 防止每个进程都输出一次，只允许0号进程打印中间结果
            print("begin training of epoch %d / %d" % (actual_epoch + 1, args['num_epochs']))
        
        train_dataloader.sampler.set_epoch(actual_epoch)  # 训练时每次的 sampling 顺序不同
        train(model, train_dataloader, optimizer, scheduler, criterion, actual_epoch, scaler, args)   
        
        if local_rank == 0:
            print(f'begin validating')  
        macro = validate(model, valid_dataloader, criterion, actual_epoch, args) #在验证集上评估模型。
     
        if macro > best_macro:
            best_macro = macro
            if local_rank == 0:  # 防止每个进程都保存一次
                save_model(actual_epoch, model, scaler, args['model_save_dir'] + '/best_macro_model_DDP_direct.pt')
    
    dist.destroy_process_group()  # 消除进程组，和 init_process_group 相对
# main函数

import argparse, time

def main():
    pass
def test():
    pass

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-args', help="priority", type=bool, required=False, default=True)
    parser.add_argument('-gpu', default='0,1', type=str, help='gpu device ids for CUDA_VISIBLE_DEVICES')
    parser.add_argument('-mode', help="train&test", type=str, required=False, default='train')
    parser.add_argument('-requires_grad', help="whether to weight_decay", type= bool, required=False, default=True)
    args = parser.parse_args()
    os.environ['MASTER_ADDR'] = 'localhost'  # 0号机器的IP
    os.environ['MASTER_PORT'] = '19198'  # 0号机器的可用端口
    os.environ['CUDA_VISIBLE_DEVICES'] = args['gpu']  # 使用哪些GPU
    world_size = torch.cuda.device_count()
    os.environ['WORLD_SIZE'] = str(world_size)
    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:128"
    os.environ["TOKENIZERS_PARALLELISM"] = "false"  # 指定程序在分词时不并行执行
    
    # train模式
    if args['mode'] == 'train':
        time_start = time.time()
        mp.spawn(fn=main, args=(args, ), nprocs=world_size) # 启动进程
        time_elapsed = time.time() - time_start
        print(f'\ntime elapsed: {time_elapsed:.2f} seconds.')
    # test模式
    elif args['mode'] == 'test':  
        time_start = time.time()
        mp.spawn(fn=test, args=(args, ), nprocs=world_size)
        time_elapsed = time.time() - time_start
        print(f'\ntime elapsed: {time_elapsed:.2f} seconds.')
