分布式训练的常见方式：数据并行、流水线并行、张量并行、多维混合并行等。

https://juejin.cn/post/7269698032655728640

## DataParallel

https://www.cnblogs.com/CircleWang/p/15620825.html

https://zhuanlan.zhihu.com/p/86441879

In [3]:
import os

import torch.nn as nn

# Expose only physical GPUs 4-7 to this program; inside the process they appear as cuda:0..cuda:3.
os.environ["CUDA_VISIBLE_DEVICES"] = '4,5,6,7'
 


In [None]:

# Wrap the model so each forward pass splits the input batch across the visible GPUs,
# then move it to the default GPU, where DataParallel expects the parameters to live.
model = nn.DataParallel(MyModel()).cuda()


In [None]:
# Inspect which GPU indices DataParallel replicates onto (no device_ids were passed to
# nn.DataParallel above, so this is expected to list all visible GPUs).
model.device_ids

## DDP

In [None]:
# DDP without a trainer: worker processes are started manually via torch.multiprocessing.spawn.
import os

# Environment set before torch is imported; spawned workers inherit it.
os.environ["CUDA_VISIBLE_DEVICES"] = "4,5,6,7"
os.environ["MASTER_ADDR"] = "127.0.0.1"  # rendezvous host read by init_process_group
os.environ["MASTER_PORT"] = "7860"  # rendezvous port read by init_process_group

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def setup(rank, world_size):
    """Bind this process to GPU ``rank`` and join the NCCL process group.

    Args:
        rank: index of this process; also used as its CUDA device index.
        world_size: total number of processes in the group.

    Rendezvous uses the MASTER_ADDR / MASTER_PORT environment variables.
    """
    # Select the device first so any NCCL state created during or right after init lives on
    # this rank's GPU instead of defaulting to cuda:0.
    torch.cuda.set_device(rank)
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    """Destroy the default process group created in ``setup``; call once per process when done."""
    dist.destroy_process_group()

def main(rank, world_size):
    """Worker run in each spawned process: train a toy ``Linear(10, 1)`` regressor with DDP.

    Args:
        rank: index of this process (0..world_size-1); also used as its CUDA device index.
        world_size: total number of processes in the group.
    """
    setup(rank, world_size)
    try:
        print(f"Process {rank} initialized successfully!")
        model = torch.nn.Linear(10, 1).to(rank)
        ddp_model = DDP(model, device_ids=[rank])
        optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)
        criterion = torch.nn.MSELoss()

        # Random data drawn independently on every rank (toy stand-in for a per-rank data shard).
        inputs = torch.randn(20, 10).to(rank)
        targets = torch.randn(20, 1).to(rank)

        for epoch in range(5):
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()  # DDP synchronizes gradients across ranks during backward
            optimizer.step()
            print(f"Rank {rank}, Epoch {epoch}, Loss: {loss.item()}")
    finally:
        # Tear the process group down even if training raised, so the worker exits cleanly.
        cleanup()


if __name__ == "__main__":
    # One worker per GPU: spawn() calls main(process_index, world_size) in each child process.
    world_size = 4  # matches the four GPUs exposed through CUDA_VISIBLE_DEVICES
    torch.multiprocessing.spawn(fn=main, args=(world_size,), nprocs=world_size, join=True)

In [None]:
# Trainer-style DDP script (presumably launched with torchrun, which provides the RANK /
# LOCAL_RANK / WORLD_SIZE / MASTER_* environment variables read here and below).

...
import os

import torch
import torch.distributed as dist
from torch.utils.data import DataLoader

# Pin this process to its local GPU before NCCL initializes, so it does not default to cuda:0.
if torch.cuda.is_available():
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
dist.init_process_group(backend="nccl")  # rank / world size are taken from the environment
...
from torch.utils.data.distributed import DistributedSampler

# Each rank reads a disjoint shard. Training shards are reshuffled per epoch (the train loop
# calls sampler.set_epoch); validation needs no shuffling.
trainloader = DataLoader(trainset, batch_size=32, collate_fn=collate_func, sampler=DistributedSampler(trainset))
validloader = DataLoader(
    validset, batch_size=64, collate_fn=collate_func, sampler=DistributedSampler(validset, shuffle=False)
)
...
from torch.nn.parallel import DistributedDataParallel as DDP

model = xxx
if torch.cuda.is_available():
    model = model.to(int(os.environ["LOCAL_RANK"]))
model = DDP(model)  # synchronizes gradients across ranks during backward
...
def print_rank_0(info):
    """Print *info* only on the process whose global ``RANK`` environment variable is 0."""
    if int(os.environ["RANK"]) != 0:
        return
    print(info)
...
def evaluate():
    """Return validation accuracy of the global ``model``, aggregated over all ranks.

    Each rank scores its own ``validloader`` shard. Counts of correct predictions and of
    scored samples are summed across ranks with ``dist.all_reduce``.

    Returns:
        A 0-d tensor holding correct / scored.
    """
    model.eval()
    device = int(os.environ["LOCAL_RANK"]) if torch.cuda.is_available() else "cpu"
    with torch.inference_mode():
        # Tensor counters (not Python ints) so all_reduce works even if this rank's shard is empty.
        acc_num = torch.zeros((), device=device)
        num_seen = torch.zeros((), device=device)
        for batch in validloader:
            if torch.cuda.is_available():
                batch = {k: v.to(device) for k, v in batch.items()}
            output = model(**batch)
            pred = torch.argmax(output.logits, dim=-1)
            acc_num += (pred.long() == batch["labels"].long()).float().sum()
            num_seen += batch["labels"].numel()
        # DistributedSampler may pad shards with repeated samples, so the number of scored
        # samples can exceed len(validset). Divide by what was actually scored to keep the
        # result a proper accuracy in [0, 1].
        dist.all_reduce(acc_num)
        dist.all_reduce(num_seen)
    return acc_num / num_seen

def train(epoch=3, log_step=100):
    """Run ``epoch`` epochs of DDP training on the global model / optimizer / trainloader.

    Every ``log_step`` steps the loss is averaged across ranks and printed on rank 0.
    Validation accuracy is printed after each epoch.
    """
    step = 0
    use_cuda = torch.cuda.is_available()
    for ep in range(epoch):
        model.train()
        # Give the DistributedSampler a new shuffle seed each epoch.
        trainloader.sampler.set_epoch(ep)
        for batch in trainloader:
            if use_cuda:
                batch = {name: t.to(int(os.environ["LOCAL_RANK"])) for name, t in batch.items()}
            optimizer.zero_grad()
            loss = model(**batch).loss
            loss.backward()
            optimizer.step()
            if not step % log_step:
                # Collective call: every rank must reach it; only rank 0 prints.
                dist.all_reduce(loss, op=dist.ReduceOp.AVG)
                print_rank_0(f"ep: {ep}, global_step: {step}, loss: {loss.item()}")
            step += 1
        acc = evaluate()
        print_rank_0(f"ep: {ep}, acc: {acc}")
...

## Accelerate

accelerate: https://huggingface.co/docs/accelerate/main/en/usage_guides/explore

### 初级使用

In [None]:

from accelerate import Accelerator  ## accelerate
accelerator = Accelerator()  ## accelerate

# prepare() adapts each object to the current (multi-)device setup. The returned dataloader
# already places every batch on accelerator.device, so no manual .to(device) is needed.
model, optimizer, training_dataloader, scheduler = accelerator.prepare(  ## accelerate
    model, optimizer, training_dataloader, scheduler
)

for batch in training_dataloader:
    optimizer.zero_grad()
    inputs, targets = batch
    outputs = model(inputs)
    loss = loss_function(outputs, targets)
    accelerator.backward(loss)  ## accelerate: replaces loss.backward()
    optimizer.step()
    scheduler.step()


### 启动

In [None]:
# accelerate config 生成的配置文件默认保存在: ~/.cache/huggingface/accelerate/default_config.yaml
! accelerate config
! accelerate launch {xx.py}
! accelerate launch --help

### 混合精度

In [None]:
accelerator = Accelerator(mixed_precision="bf16")  # choice1: set bf16 mixed precision in code
! accelerate config  # choice2: 在交互式配置中选择 bf16
! accelerate launch --mixed_precision bf16 {xx.py}  # choice3: 启动时通过命令行参数指定

### 梯度累积

In [None]:
accelerator = Accelerator(gradient_accumulation_steps=xx)
...
for batch in trainloader:
    # Inside accumulate(), gradients are only synchronized (and the prepared optimizer only
    # steps / zeroes) once every `gradient_accumulation_steps` batches.
    with accelerator.accumulate(model):
        ...  # forward, accelerator.backward(loss), optimizer.step(), optimizer.zero_grad()

### 日志记录（Tensorboard）

In [None]:
# TensorBoard logging; event files are written under project_dir.
accelerator = Accelerator(log_with="tensorboard", project_dir="xx")
...
accelerator.init_trackers(project_name="xx")
...
accelerator.end_training()  # must be *called* to finish and close the trackers

### 模型保存

In [None]:
# Save the unwrapped model in Hugging Face format under <project_dir>/step_<global_step>/model.
# get_state_dict() is evaluated on every process; save_pretrained writes only where
# is_main_process is True.
save_dir = accelerator.project_dir + f"/step_{global_step}/model"
accelerator.unwrap_model(model).save_pretrained(
    save_directory=save_dir,
    is_main_process=accelerator.is_main_process,
    state_dict=accelerator.get_state_dict(model),
    save_func=accelerator.save,
)

### 断点续训

In [None]:
# Save the full training state (e.g. model, optimizer, RNG states) so training can be resumed.
# NOTE(review): called without a directory, save_state/load_state presumably rely on
# ProjectConfiguration(automatic_checkpoint_naming=True); otherwise pass a path explicitly.
accelerator.save_state()
# Restore a previously saved training state.
accelerator.load_state()
# Compute resume_epoch and resume_step from the checkpoint's global step, then skip the
# batches already consumed in the interrupted epoch:
# accelerator.skip_first_batches(trainloader,resume_step)

### example

In [None]:
import time
import math
import torch
import pandas as pd

from torch.optim import Adam
from accelerate import Accelerator
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.data import random_split
from peft import LoraConfig, get_peft_model
from transformers import BertTokenizer, BertForSequenceClassification


class MyDataset(Dataset):
    """Map-style dataset over the ChnSentiCorp hotel-review CSV.

    Rows containing any missing value are dropped. Each item is a ``(review, label)`` pair.
    """

    def __init__(self) -> None:
        super().__init__()
        raw = pd.read_csv("./ChnSentiCorp_htl_all.csv")
        self.data = raw.dropna()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        # Positional lookup: dropna() leaves gaps in the index labels.
        row = self.data.iloc[index]
        return row["review"], row["label"]


def prepare_dataloader():
    """Build train / validation DataLoaders over ``MyDataset``.

    The data is split 90/10 with a fixed seed (42). Each batch is tokenized on the fly,
    padded/truncated to 128 tokens, and carries its targets under the ``"labels"`` key.

    Returns:
        (trainloader, validloader)
    """
    full_set = MyDataset()
    split_rng = torch.Generator().manual_seed(42)
    trainset, validset = random_split(full_set, lengths=[0.9, 0.1], generator=split_rng)

    tokenizer = BertTokenizer.from_pretrained("/gemini/code/model")

    def collate_func(batch):
        # Every sample is a (review, label) pair.
        texts = [sample[0] for sample in batch]
        labels = [sample[1] for sample in batch]
        encoded = tokenizer(texts, max_length=128, padding="max_length", truncation=True, return_tensors="pt")
        encoded["labels"] = torch.tensor(labels)
        return encoded

    trainloader = DataLoader(trainset, batch_size=32, collate_fn=collate_func, shuffle=True)
    validloader = DataLoader(validset, batch_size=64, collate_fn=collate_func, shuffle=False)
    return trainloader, validloader


def prepare_model_and_optimizer():
    """Load BERT for sequence classification, wrap it with LoRA, and create an Adam optimizer.

    Returns:
        (model, optimizer): the PEFT-wrapped model and Adam (lr=2e-5) over its parameters.
    """
    model = BertForSequenceClassification.from_pretrained("/gemini/code/model")

    # task_type="SEQ_CLS" makes PEFT keep the freshly initialized classification head trainable
    # (it is added to modules_to_save). Without it, every non-LoRA weight is frozen, including
    # the randomly initialized classifier.
    # NOTE: this changes the model's state-dict layout; checkpoints saved without it won't load.
    lora_config = LoraConfig(task_type="SEQ_CLS", target_modules=["query", "key", "value"])

    model = get_peft_model(model, lora_config)
    model.print_trainable_parameters()

    optimizer = Adam(model.parameters(), lr=2e-5)
    return model, optimizer


def evaluate(model, validloader, accelerator: Accelerator):
    """Return validation accuracy (a 0-d tensor) computed across all processes.

    Predictions and labels are collected with ``accelerator.gather_for_metrics``. The number
    of correct predictions is divided by ``len(validloader.dataset)``.
    """
    model.eval()
    correct = 0
    with torch.inference_mode():
        for batch in validloader:
            logits = model(**batch).logits
            pred = logits.argmax(dim=-1)
            all_pred, all_refs = accelerator.gather_for_metrics((pred, batch["labels"]))
            correct += (all_pred.long() == all_refs.long()).float().sum()
    return correct / len(validloader.dataset)


def train(model, optimizer, trainloader, validloader, accelerator: Accelerator, resume, epoch=3, log_step=10,
          save_step=50):
    """Train with gradient accumulation, periodic logging / checkpointing, and optional resume.

    Args:
        model, optimizer, trainloader, validloader: objects already passed through
            ``accelerator.prepare``.
        accelerator: the shared ``Accelerator``. Its ``gradient_accumulation_steps`` defines
            what one global step is, and ``project_dir`` is where checkpoints are written.
        resume: a directory written by ``accelerator.save_state`` whose name ends in
            ``step_<global_step>`` (e.g. ``ckpts/step_150``), or ``None`` / empty to start fresh.
        epoch: total number of epochs; a resumed run continues up to this count.
        log_step: log the cross-process mean loss every ``log_step`` global steps.
        save_step: save training state and model weights every ``save_step`` global steps.
    """
    global_step = 0
    start_time = time.time()

    resume_step = 0
    resume_epoch = 0

    if resume:  # resume from a checkpoint
        accelerator.load_state(resume)
        # One global step = one optimizer update over gradient_accumulation_steps batches. Use ceil
        # so a trailing partial accumulation group at the end of an epoch also counts as a step.
        steps_per_epoch = math.ceil(len(trainloader) / accelerator.gradient_accumulation_steps)
        # rstrip tolerates a trailing path separator such as ".../step_150/".
        global_step = int(resume.rstrip("/\\").split("step_")[-1])
        resume_epoch, resume_step = divmod(global_step, steps_per_epoch)
        accelerator.print(f"resume from checkpoint -> {resume}")

    for ep in range(resume_epoch, epoch):
        model.train()
        if resume and ep == resume_epoch and resume_step != 0:
            # Skip the batches already consumed in the interrupted epoch.
            active_dataloader = accelerator.skip_first_batches(
                trainloader, resume_step * accelerator.gradient_accumulation_steps
            )
        else:
            active_dataloader = trainloader
        for batch in active_dataloader:
            with accelerator.accumulate(model):  # gradient accumulation
                optimizer.zero_grad()
                output = model(**batch)
                loss = output.loss
                accelerator.backward(loss)
                optimizer.step()

                # True only on iterations where accumulated gradients are synced, i.e. a real update.
                if accelerator.sync_gradients:
                    global_step += 1

                    if global_step % log_step == 0:
                        # Logging only: average a detached copy of the loss across processes.
                        loss = accelerator.reduce(loss.detach(), "mean")
                        accelerator.print(f"ep: {ep}, global_step: {global_step}, loss: {loss.item()}")
                        accelerator.log({"loss": loss.item()}, global_step)

                    # global_step >= 1 here, so no separate "!= 0" guard is needed.
                    if global_step % save_step == 0:
                        ckpt_dir = accelerator.project_dir + f"/step_{global_step}"
                        accelerator.print(f"save checkpoint -> step_{global_step}")
                        accelerator.save_state(ckpt_dir)  # full training state, used for resuming
                        accelerator.unwrap_model(model).save_pretrained(  # model weights only
                            save_directory=ckpt_dir + "/model",
                            is_main_process=accelerator.is_main_process,
                            state_dict=accelerator.get_state_dict(model),
                            save_func=accelerator.save,
                        )
        acc = evaluate(model, validloader, accelerator)
        accelerator.print(f"ep: {ep}, acc: {acc}, time: {time.time() - start_time}")
        accelerator.log({"acc": acc}, global_step)

    accelerator.end_training()


def main():
    """Set up Accelerate, data, and the LoRA model, then train.

    Training resumes from the fixed checkpoint below only when that directory exists.
    """
    import os

    accelerator = Accelerator(gradient_accumulation_steps=2, log_with="tensorboard", project_dir="ckpts")
    accelerator.init_trackers("runs")  # start the TensorBoard tracker

    trainloader, validloader = prepare_dataloader()
    model, optimizer = prepare_model_and_optimizer()

    # Wrap everything for the current (multi-)device setup.
    model, optimizer, trainloader, validloader = accelerator.prepare(model, optimizer, trainloader, validloader)

    # load_state cannot resume from a directory that does not exist (e.g. on the very first
    # run), so fall back to training from scratch in that case.
    resume_dir = "/gemini/code/ckpts/step_150"
    resume = resume_dir if os.path.isdir(resume_dir) else None

    train(model, optimizer, trainloader, validloader, accelerator, resume=resume)


if __name__ == "__main__":
    # Run only when executed as a script (e.g. via `accelerate launch`), not on import.
    main()

### accelerate集成deepspeed（日常使用推荐）

运行 `accelerate config`，在交互式配置中指定是否使用 DeepSpeed，以及对应的 DeepSpeed 配置文件（json）。

deepspeed: https://huggingface.co/docs/accelerate/main/en/usage_guides/deepspeed