# nn.DataParallel  不推荐并行训练，但可以并行推理

Data Parrallel 的问题：

①单进程，多线程，由于GIL锁的问题，不能充分发挥多卡的优势

②负载均衡：由于Data Parrallel的训练策略问题，会存在一个主节点占用比其他节点高很多

③效率较低，每次训练开始都要重新同步模型，大模型的同步时间会较难接受

④只适用于单机训练，无法支持真正的分布式多节点训练

### 发现bfloat16会降低训练精度3个点

## Step1 导入相关包

In [1]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = "6,7"

from transformers import AutoTokenizer, AutoModelForSequenceClassification

  from .autonotebook import tqdm as notebook_tqdm


## Step2 加载数据

In [2]:
import pandas as pd

data = pd.read_csv("ChnSentiCorp_htl_all.csv")
data

Unnamed: 0,label,review
0,1,"距离川沙公路较近,但是公交指示不对,如果是""蔡陆线""的话,会非常麻烦.建议用别的路线.房间较..."
1,1,商务大床房，房间很大，床有2M宽，整体感觉经济实惠不错!
2,1,早餐太差，无论去多少人，那边也不加食品的。酒店应该重视一下这个问题了。房间本身很好。
3,1,宾馆在小街道上，不大好找，但还好北京热心同胞很多~宾馆设施跟介绍的差不多，房间很小，确实挺小...
4,1,"CBD中心,周围没什么店铺,说5星有点勉强.不知道为什么卫生间没有电吹风"
...,...,...
7761,0,尼斯酒店的几大特点：噪音大、环境差、配置低、服务效率低。如：1、隔壁歌厅的声音闹至午夜3点许...
7762,0,盐城来了很多次，第一次住盐阜宾馆，我的确很失望整个墙壁黑咕隆咚的，好像被烟熏过一样家具非常的...
7763,0,看照片觉得还挺不错的，又是4星级的，但入住以后除了后悔没有别的，房间挺大但空空的，早餐是有但...
7764,0,我们去盐城的时候那里的最低气温只有4度，晚上冷得要死，居然还不开空调，投诉到酒店客房部，得到...


In [3]:
data = data.dropna()
data

Unnamed: 0,label,review
0,1,"距离川沙公路较近,但是公交指示不对,如果是""蔡陆线""的话,会非常麻烦.建议用别的路线.房间较..."
1,1,商务大床房，房间很大，床有2M宽，整体感觉经济实惠不错!
2,1,早餐太差，无论去多少人，那边也不加食品的。酒店应该重视一下这个问题了。房间本身很好。
3,1,宾馆在小街道上，不大好找，但还好北京热心同胞很多~宾馆设施跟介绍的差不多，房间很小，确实挺小...
4,1,"CBD中心,周围没什么店铺,说5星有点勉强.不知道为什么卫生间没有电吹风"
...,...,...
7761,0,尼斯酒店的几大特点：噪音大、环境差、配置低、服务效率低。如：1、隔壁歌厅的声音闹至午夜3点许...
7762,0,盐城来了很多次，第一次住盐阜宾馆，我的确很失望整个墙壁黑咕隆咚的，好像被烟熏过一样家具非常的...
7763,0,看照片觉得还挺不错的，又是4星级的，但入住以后除了后悔没有别的，房间挺大但空空的，早餐是有但...
7764,0,我们去盐城的时候那里的最低气温只有4度，晚上冷得要死，居然还不开空调，投诉到酒店客房部，得到...


## Step3 创建Dataset

In [4]:
from torch.utils.data import Dataset

class MyDataset(Dataset):

    def __init__(self) -> None:
        super().__init__()
        self.data = pd.read_csv("ChnSentiCorp_htl_all.csv")
        self.data = self.data.dropna()

    def __getitem__(self, index):
        return self.data.iloc[index]["review"], self.data.iloc[index]["label"]
    
    def __len__(self):
        return len(self.data)

In [5]:
dataset = MyDataset()
for i in range(5):
    print(dataset[i])

('距离川沙公路较近,但是公交指示不对,如果是"蔡陆线"的话,会非常麻烦.建议用别的路线.房间较为简单.', np.int64(1))
('商务大床房，房间很大，床有2M宽，整体感觉经济实惠不错!', np.int64(1))
('早餐太差，无论去多少人，那边也不加食品的。酒店应该重视一下这个问题了。房间本身很好。', np.int64(1))
('宾馆在小街道上，不大好找，但还好北京热心同胞很多~宾馆设施跟介绍的差不多，房间很小，确实挺小，但加上低价位因素，还是无超所值的；环境不错，就在小胡同内，安静整洁，暖气好足-_-||。。。呵还有一大优势就是从宾馆出发，步行不到十分钟就可以到梅兰芳故居等等，京味小胡同，北海距离好近呢。总之，不错。推荐给节约消费的自助游朋友~比较划算，附近特色小吃很多~', np.int64(1))
('CBD中心,周围没什么店铺,说5星有点勉强.不知道为什么卫生间没有电吹风', np.int64(1))


## Step4 划分数据集

In [6]:
from torch.utils.data import random_split

trainset, validset = random_split(dataset, lengths=[0.9, 0.1])
len(trainset), len(validset)

(6989, 776)

In [7]:
for i in range(10):
    print(trainset[i])

('酒店地理位置比较比较偏僻，不过穿过两条马路就是石老人沙滩。酒店前台对客人比较怠慢，特别是女服务员，脸上几乎没有表情，对客人的询问或者是要求，都不太理睬，或者就是说，你等会。感觉很不好。男服务员倒是比较热情的，这个后面我会提到。酒店房间设施感觉一般，我定的是豪华房，感觉基本符合四星要求。大床比较舒服，房间内很多物品都是需要付费才能享用，好多介绍书都是用韩文标注的，感觉我到了韩国，中文标注的字体很不明显。酒店地址偏远，附近没有什么好的饭店，酒店内的饭店也在装修，只有咖啡厅和韩国餐厅营业，想吃点中餐很难，询问前台服务员，整个就一热脸贴冷屁股，爱搭不理的样子，我们没吃就被气饱了。晚上朋友带我们去吃海鲜，结果半夜两点两人都上吐下泻，请服务员送了点止泻的药和热水过来，还不错，很快送来了。早上五点还是挺难受，因为下午要赶回程的飞机，所以打算去医院看看病尽快止泻，想请前台派个人陪我们一起去，结果前台经理（男的）态度还不错，让我们等会，结果等到6点一刻，他告知我们6：30交接班，晚班的人要回去休息，早班的人送我们去，说实话，我们那时候都特别累，拉了一晚上，当时感觉肚子也没那么疼了，也别麻烦别人了，我们就自己上楼去休息。10点钟，拉了一晚上，饿的前胸贴后背，餐厅只有自助餐，我们一点油腻的东西都没胃口碰，于是我们拖着虚弱的身子询问哪有喝粥的，结果我们是二楼跑一楼，一楼跑二楼，在询问了无数遍，上下三四次后终于得知咖啡厅有粥，我们要两份白粥，告知10元一小碗，没有配菜，切片面包，2元/片，外加15％的服务费，就这样，两碗白粥六片面包总共37元，我和妈妈开始热切怀念之前两天住的太平角如家酒店10元的自主早餐了。点完餐，因为有送餐服务，我们就先回房间去休息了，结果左等右等都没有服务员过来送，我可怜的老妈又拉又吐，脸色蜡黄，饿的要命，因为吃止泻药之前得吃点早饭垫垫。后来老妈忍不住了，催我给餐厅打电话，我翻遍房间内得说明，恁是没找到餐厅电话，老妈只好自己亲自下楼去催，结果被告知粥还在熬呢。。。。。。在好漫长得等待后，终于端来了热腾腾得白粥和面包，我们那个激动啊，终于有吃得了。后来我们把情况和服务生说了下，他说我们身体情况应该去医院，他要和酒店领导说一下，我们很是感动，说实话，到了酒店以后就这个服务生最热情了，可后来直到我们退房，也没一个电话。。。。。618元的房费，但是服务却达不到这个价格，之前

## Step5 创建Dataloader

In [8]:
import torch

tokenizer = AutoTokenizer.from_pretrained("hfl/rbt3")

def collate_func(batch):
    texts, labels = [], []
    for item in batch:
        texts.append(item[0])
        labels.append(item[1])
    inputs = tokenizer(texts, max_length=128, padding="max_length", truncation=True, return_tensors="pt")
    inputs["labels"] = torch.tensor(labels)
    return inputs

In [9]:
from torch.utils.data import DataLoader

trainloader = DataLoader(trainset, batch_size=256, shuffle=True, collate_fn=collate_func)
validloader = DataLoader(validset, batch_size=64, shuffle=False, collate_fn=collate_func)

In [11]:
print(len(trainloader))
print(len(trainloader.dataset))
print(len(validloader))
print(len(validloader.dataset))

28
6989
13
776


In [None]:
next(enumerate(validloader))[1]

{'input_ids': tensor([[ 101, 2791, 7313,  ...,    0,    0,    0],
        [ 101, 1920, 6825,  ...,    0,    0,    0],
        [ 101, 1765, 4415,  ...,    0,    0,    0],
        ...,
        [ 101, 1453, 3314,  ...,    0,    0,    0],
        [ 101, 6983, 2421,  ...,    0,    0,    0],
        [ 101, 2697, 6230,  ...,    0,    0,    0]]), 'token_type_ids': tensor([[0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        ...,
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        ...,
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0]]), 'labels': tensor([1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1, 0, 1, 0, 0, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1,
        1, 1, 1

## Step6 创建模型及优化器

In [11]:
from torch.optim import Adam

model = AutoModelForSequenceClassification.from_pretrained("hfl/rbt3")

if torch.cuda.is_available():
    model = model.cuda()

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at hfl/rbt3 and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [12]:
## New code 数据并行

model = torch.nn.DataParallel(model, device_ids=None)  # 默认所有卡都用

In [13]:
model.device_ids

[0, 1]

In [14]:
model  # 变成DataParallel

DataParallel(
  (module): BertForSequenceClassification(
    (bert): BertModel(
      (embeddings): BertEmbeddings(
        (word_embeddings): Embedding(21128, 768, padding_idx=0)
        (position_embeddings): Embedding(512, 768)
        (token_type_embeddings): Embedding(2, 768)
        (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (encoder): BertEncoder(
        (layer): ModuleList(
          (0-2): 3 x BertLayer(
            (attention): BertAttention(
              (self): BertSdpaSelfAttention(
                (query): Linear(in_features=768, out_features=768, bias=True)
                (key): Linear(in_features=768, out_features=768, bias=True)
                (value): Linear(in_features=768, out_features=768, bias=True)
                (dropout): Dropout(p=0.1, inplace=False)
              )
              (output): BertSelfOutput(
                (dense): Linear(in_features=768, out_features=768

In [15]:
model.module  # 拿到真正的模型

BertForSequenceClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(21128, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-2): 3 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-1

In [16]:
optimizer = Adam(model.parameters(), lr=2e-5)

## Step7 训练与验证

In [17]:
def evaluate():
    model.eval()
    acc_num = 0
    with torch.inference_mode():
        for batch in validloader:
            if torch.cuda.is_available():
                batch = {k: v.cuda() for k, v in batch.items()}
            output = model(**batch)
            pred = torch.argmax(output.logits, dim=-1)
            acc_num += (pred.long() == batch["labels"].long()).float().sum()
    return acc_num / len(validset)

def train(epoch=3, log_step=100):
    global_step = 0
    for ep in range(epoch):
        model.train()
        for batch in trainloader:
            if torch.cuda.is_available():
                batch = {k: v.cuda() for k, v in batch.items()}
            optimizer.zero_grad()
            output = model(**batch)

            # New Code
            # print(output.loss)  # 不再是标量，无法反向传播
            loss = output.loss.mean()
            loss.backward()
            # output.loss.backward()
            optimizer.step()
            if global_step % log_step == 0:
                print(f"ep: {ep}, global_step: {global_step}, loss: {loss.mean().item()}")
                # print(f"ep: {ep}, global_step: {global_step}, loss: {output.loss.item()}")
            global_step += 1
        acc = evaluate()
        print(f"ep: {ep}, acc: {acc}")

## Step8 模型训练

In [18]:
train()



ep: 0, global_step: 0, loss: 0.6887935400009155
ep: 0, global_step: 100, loss: 0.27352023124694824
ep: 0, acc: 0.8659793734550476
ep: 1, global_step: 200, loss: 0.22526614367961884
ep: 1, acc: 0.8853092789649963
ep: 2, global_step: 300, loss: 0.19486884772777557
ep: 2, acc: 0.8827319145202637


 单GPU  batchsize 32：  27.6s, acc: 88.27

 双GPU  batchsize 2*16：31.7s, acc: 89.81
 
 双GPU  batchsize 2*32：19.8s, acc: 89.17

## Step9 模型预测

In [19]:
sen = "我觉得这家酒店不错，饭很好吃！"
id2_label = {0: "差评！", 1: "好评！"}
model.eval()
with torch.inference_mode():
    inputs = tokenizer(sen, return_tensors="pt")
    inputs = {k: v.cuda() for k, v in inputs.items()}
    logits = model(**inputs).logits
    pred = torch.argmax(logits, dim=-1)
    print(f"输入：{sen}\n模型预测结果:{id2_label.get(pred.item())}")

输入：我觉得这家酒店不错，饭很好吃！
模型预测结果:好评！


In [20]:
from transformers import pipeline

model.module.config.id2label = id2_label
pipe = pipeline("text-classification", model=model.module, tokenizer=tokenizer, device=0)

In [21]:
pipe(sen)

[{'label': '好评！', 'score': 0.9869500994682312}]

## Step10 分布式推理测试

In [38]:
%%time
# 单GPU推理
with torch.inference_mode():
    for batch in trainloader:
        if torch.cuda.is_available():
            batch = {k: v.cuda() for k, v in batch.items()}
        output = model.module(**batch)

CPU times: user 1min 5s, sys: 2.96 s, total: 1min 8s
Wall time: 2.44 s


In [39]:
%%time
# 双GPU推理，batchsize为512时才看出优势
with torch.inference_mode():
    for batch in trainloader:
        if torch.cuda.is_available():
            batch = {k: v.cuda() for k, v in batch.items()}
        output = model(**batch)

CPU times: user 59.6 s, sys: 2 s, total: 1min 1s
Wall time: 1.91 s


In [40]:
%%time
# 改进forward的双GPU推理
replicas = model.replicate(model.module, model.device_ids[:2]) # 让模型只复制一次，由于数据量小看不出效果

with torch.inference_mode():
    for batch in trainloader:
        if torch.cuda.is_available():
            batch = {k: v.cuda() for k, v in batch.items()}
        output = model(**batch)
        inputs, module_kwargs = model.scatter(inputs=None, kwargs=batch, device_ids=model.device_ids)  # 将 inputs 和 kwargs 中的数据分发到指定的 GPU 上
        
        outputs = model.parallel_apply(replicas, inputs, module_kwargs)
        model.gather(outputs, model.output_device)


CPU times: user 1min 1s, sys: 2.38 s, total: 1min 3s
Wall time: 3.63 s
