# 5-DPO

直接偏好优化（Direct Preference Optimization，DPO）是后训练阶段中，使用正反样例激励大模型产生符合人类偏好的回答的策略，为人类反馈强化学习（Reinforcement Learning from Human Feedback, RLHF）提供了一个高效简化的替代方案。通过这一阶段的训练，大模型将会学会依照人类的喜好生成回复.

在这个笔记本中，我们仅对 DPO 的训练流程进行展示和学习，因此只给出必要的代码片段，如 wandb 和 ddp 不会在此笔记本中涉及.

此笔记本的完整实现见主仓库 `/minimind/train_dpo.py`

In [1]:
# 导入依赖
import os
import platform
import argparse
import time
import math
import warnings

import pandas as pd
import torch
import torch.nn.functional as F
import torch.distributed as dist
from contextlib import nullcontext

from torch import optim, nn
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader, DistributedSampler
from transformers import AutoTokenizer, AutoModelForCausalLM
from model.model import MiniMindLM
from model.LMConfig import LMConfig
from model.dataset import DPODataset

In [2]:
warnings.filterwarnings('ignore')

## 可选参数设置

首先，查看训练的可选参数，这些参数在实际使用时通过命令行导入，为了保持笔记本的易用性，选择用 class 进行包装.

In [3]:
class args:
    epochs: int = 5 # 训练轮数，延续 sft 基础上微调
    batch_size: int = 2 # pretrain 数据集仅两个样本，设置 batch 为 2
    # sft阶段学习率为 「5e-6」->「5e-7」长度512，建议离线正负样本「概率」偏好对齐阶段lr <=「1e-8」长度3000，否则很容易遗忘训坏
    learning_rate: float = 5e-4 # 学习率
    device: str = 'cuda' if torch.cuda.is_available() else 'cpu'
    dtype: str = 'bfloat16' # 16 bit 浮点数：8 bit 指数 + 7 bit 尾数
    # use_wandb: bool = False # 是否使用 wandb 我们不使用
    wandb_project: str = 'MiniMind-Notebook'
    num_workers: int = 1 # 工作进程数
    # ddp：bool = False # 单机多卡
    accumulation_steps: int = 1 # 梯度累积步数
    grad_clip: float = 1.0 # 梯度剪裁
    warmup_iters: int = 0 # 学习率热启动
    log_interval: int = 1 # 每一步打印日志 仅用于观察
    local_rank: int = 1 # device 设备号
    dim: int = 512 # 词嵌入维度 模型超参数
    n_layers: int = 1 # MiniMind Block 数量 模型超参数 | 由于 dpo 要加载两个模型 我们出于演示目的设定 n_layers = 1
    max_seq_len: int = 512 # 序列长度阈值
    use_moe: bool = False # 是否启用混合专家
    data_path: str = './toydata/dpo_data.jsonl' # 数据集路径
    save_dir: str = "./output"  # 模型保存目录
    save_weight: str = "minimind_dpo"  # checkpoint 文件前缀
    save_interval: int = 1  # 每多少步保存一次模型，0表示不保存 我们这里只展示训练过程（可选择的保存模型，建议先保存）

In [4]:
print(f'查看工作设备 {args.device}')

查看工作设备 cuda


## 初始化训练

接下来，我们对一些重要模块进行初始化，我们已经了解过，分词器，模型和数据集是大模型的基本组件，我们对其进行初始化.

> 在这一阶段 我们调整的是大模型的问答偏好 因此与 sft 阶段同理 我们需要载入在 sft 阶段微调好的问答模型

---

actor 模型 vs ref 参考模型：
   - `model`（actor模型）：是后续训练的核心模型，参数可更新，用于学习优化（如RLHF中的策略模型）。
   - `ref_model`（参考模型）：参数固定，用于和actor模型的输出对比，计算奖励/优势值，避免模型训练偏离SFT阶段的基础能力。

---

In [5]:
def init_model(lm_config):
    tokenizer = AutoTokenizer.from_pretrained('./model/minimind_tokenizer')
    # 初始化actor模型
    model = MiniMindLM(lm_config)
    moe_path = '_moe' if lm_config.use_moe else ''
    ckp = f'./output/minimind_sft_{lm_config.dim}{moe_path}.pth' # 指示上一阶段训练保存的模型文件位置
    state_dict = torch.load(ckp, map_location=args.device) # 载入模型状态字典
    model.load_state_dict(state_dict, strict=False) # 装入模型
    # 初始化参考模型
    ref_model = MiniMindLM(lm_config)
    ref_model.load_state_dict(state_dict, strict=False)
    ref_model.eval()
    ref_model.requires_grad_(False)

    print(f'LLM总参数量：{sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} 百万')
    model = model.to(args.device)
    ref_model = ref_model.to(args.device)

    return model, ref_model, tokenizer

In [6]:
lm_config = LMConfig(dim=args.dim, n_layers=args.n_layers, max_seq_len=args.max_seq_len, use_moe=args.use_moe)
model, ref_model ,tokenizer = init_model(lm_config)

# 构建数据集和数据加载器
train_ds = DPODataset(args.data_path, tokenizer, max_length=lm_config.max_seq_len)

train_loader = DataLoader(
    train_ds,
    batch_size=args.batch_size,
    pin_memory=True,  # 锁页内存，加速数据传输
    drop_last=False,  # 是否丢弃最后一个不完整的批次
    shuffle=False,  # DPO 训练不需要打乱数据 
    num_workers=args.num_workers,  # 工作进程数，0表示在主进程中加载数据
)

print(f'模型位于设备：{model.device}, 词表长度：{tokenizer.vocab_size}, DataLoader：{train_loader}')

LLM总参数量：6.096 百万
模型位于设备：cuda:0, 词表长度：6400, DataLoader：<torch.utils.data.dataloader.DataLoader object at 0x000001EEE57FE190>


In [7]:
loader = iter(train_loader)
print(f'打印一个 iter 的数据:\n{next(loader)}\n')
print(f'数据集大小：{len(train_ds)}, DataLoader 大小：{len(loader)}')

打印一个 iter 的数据:
{'x_chosen': tensor([[  1,  85, 736,  ...,   0,   0,   0],
        [  1,  85, 736,  ...,   0,   0,   0]]), 'y_chosen': tensor([[ 85, 736, 201,  ...,   0,   0,   0],
        [ 85, 736, 201,  ...,   0,   0,   0]]), 'mask_chosen': tensor([[0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0]]), 'x_rejected': tensor([[  1,  85, 736,  ...,   0,   0,   0],
        [  1,  85, 736,  ...,   0,   0,   0]]), 'y_rejected': tensor([[ 85, 736, 201,  ...,   0,   0,   0],
        [ 85, 736, 201,  ...,   0,   0,   0]]), 'mask_rejected': tensor([[0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0]])}

数据集大小：2, DataLoader 大小：1


我们发现，train loader 的每一个 iter 都包含一个拥有六个键值对的字典，这是因为 train_dataset 每一次取数据都会返回:

- chosen 样本 X: 包含 \<bos> 在内的输入 content
- chosen 标签 Y: 包含 \<eos> 在内的输出 content
- chosen 掩码 loss_mask: 指示需要计算损失的 token 位置
- rejected 样本 X: 包含 \<bos> 在内的输入 content
- rejected 标签 Y: 包含 \<eos> 在内的输出 content
- rejected 掩码 loss_mask: 指示需要计算损失的 token 位置

由于我们的数据集只有两条数据，而 batch size 设置为 2，因此我们的 dataloader 只有一个 iter.

# 启动训练

训练一个深度学习模型，还涉及到了优化器，损失函数和学习率调度. 接下来，我们查看 MiniMind 训练部分的代码，并进行一轮简单的训练.

> DPO 阶段涉及 DPO 损失函数涉及 因此与前两个阶段相比内容略有增加 不过整体流程与逻辑类似

In [8]:
# 学习率调度方面 采用余弦退火学习率
def get_lr(current_step, total_steps, lr):
    return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))

# 优化器方面 选择 AdamW 优化器 并在混精度场景下创建 scaler 进行梯度缩放避免数值下溢
scaler = torch.amp.GradScaler('cuda', enabled=(args.dtype in ['float16', 'bfloat16']))  # 专门解决混合精度训练中的数值下溢问题
optimizer = optim.AdamW(model.parameters(), lr=args.learning_rate)  # AdamW 优化器

device_type = "cuda" if "cuda" in args.device else "cpu"
print(f'设备类型：{device_type}')
# 根据指定的数据类型设置混精度训练的 dtype，以下步骤为不可缺少的混精度训练准备工作
if args.dtype == 'bfloat16':
    amp_dtype = torch.bfloat16
elif args.dtype == 'float16':
    amp_dtype = torch.float16
else:
    amp_dtype = torch.float32  # 默认为 FP32
print(f'使用混精度训练，数据类型：{amp_dtype}')
# 在 cuda 上启动混精度训练，否则空白上下文
autocast_ctx = nullcontext() if device_type == "cpu" else torch.amp.autocast(device_type='cuda', dtype=amp_dtype) 

设备类型：cuda
使用混精度训练，数据类型：torch.bfloat16


DPO 的原理是增加偏好样本的对数概率与减小非偏好样本响应的对数概率.

该阶段引入 DPO 损失函数，通过计算选择样本和拒绝样本的对数比率，然后基于这些比率计算 DPO 损失，适用于偏好学习任务.

---

DPO损失的数学公式：
$$\text{loss} = -\log\left(\sigma\left(\beta \cdot \left( (\log \pi_{\theta}(y_c|x) - \log \pi_{\theta}(y_r|x)) - (\log \pi_{\text{ref}}(y_c|x) - \log \pi_{\text{ref}}(y_r|x)) \right)\right)\right)$$
其中：
- $\sigma$ 是sigmoid函数
- $\beta$ 是温度系数
- $y_c$ 是chosen回答，$y_r$ 是rejected回答
- $\pi_\theta$ 是策略actor模型，$\pi_{\text{ref}}$ 是参考ref模型

简单理解：
- 如果策略模型认为chosen比rejected好（$\pi_{\theta}(y_c) > \pi_{\theta}(y_r)$），且优势超过参考模型，那么`logits`为正，`log_sigmoid`接近0，损失接近0；
- 如果策略模型偏好错误，`logits`为负，`log_sigmoid`为负数，损失会变大，驱动模型调整参数。

---

In [9]:
def logits_to_log_probs(logits, labels):
    # logits shape: (batch_size, seq_len, vocab_size)
    # labels shape: (batch_size, seq_len)
    # log_probs shape: (batch_size, seq_len)
    log_probs = F.log_softmax(logits, dim=2)  # 得到每个 token 的 log 概率分布
    # labels.unsqueeze(2)：把labels从(batch_size, seq_len)变成(batch_size, seq_len, 1)，增加维度匹配log_probs
    # torch.gather：按dim=2维度，根据index（标签id）提取对应位置的值，结果shape=(batch_size, seq_len, 1)
    # squeeze(-1)：去掉最后一个维度，变回(batch_size, seq_len)
    # 这一步的目的是从每个 token 的 log 概率分布中提取出对应标签的 log 概率值，得到每个 token 的 log 概率
    log_probs_per_token = torch.gather(log_probs, dim=2, index=labels.unsqueeze(2)).squeeze(-1)
    return log_probs_per_token

# DPO 损失函数
def dpo_loss(ref_log_probs, policy_log_probs, mask, beta):
    # ref_log_probs,policy_log_probs和mask的 shape: (batch_size, seq_len)
    seq_lengths = mask.sum(dim=1, keepdim=True).clamp_min(1e-8)  # 计算每个序列的有效长度，防止零长度mask导致除零NaN
    # 计算参考模型每个序列的平均 log 概率，shape=(batch_size,)
    ref_log_probs = (ref_log_probs * mask).sum(dim=1) / seq_lengths.squeeze()  
    # 计算策略模型每个序列的平均 log 概率，shape=(batch_size,)
    policy_log_probs = (policy_log_probs * mask).sum(dim=1) / seq_lengths.squeeze() 

    # 拆分chosen和rejected样本（DPO数据格式要求：前半是chosen，后半是rejected）
    batch_size = ref_log_probs.shape[0]
    chosen_ref_log_probs = ref_log_probs[:batch_size // 2]
    reject_ref_log_probs = ref_log_probs[batch_size // 2:]
    chosen_policy_log_probs = policy_log_probs[:batch_size // 2]
    reject_policy_log_probs = policy_log_probs[batch_size // 2:]

    pi_logratios = chosen_policy_log_probs - reject_policy_log_probs  # 策略模型对chosen的偏好 - 对rejected的偏好（越大越好）
    ref_logratios = chosen_ref_log_probs - reject_ref_log_probs  # 参考模型对chosen的偏好 - 对rejected的偏好（基准）
    logits = pi_logratios - ref_logratios  # 策略模型相对参考模型的偏好优势（目标：让这个值越大越好）
    loss = -F.logsigmoid(beta * logits)  # 维度为 (batch_size//2,) 
    return loss.mean()

接下来，我们来看看 MiniMind 的训练函数

In [10]:
def train_epoch(epoch, loader, iters, ref_model, lm_config, start_step=0, wandb=None, beta=0.1):
    start_time = time.time()
    
    for step, batch in enumerate(loader, start=start_step + 1):
        # 将数据移动到指定设备
        x_chosen = batch['x_chosen'].to(args.device)
        x_rejected = batch['x_rejected'].to(args.device)
        y_chosen = batch['y_chosen'].to(args.device)
        y_rejected = batch['y_rejected'].to(args.device)
        mask_chosen = batch['mask_chosen'].to(args.device)
        mask_rejected = batch['mask_rejected'].to(args.device)
        # 将 chosen 和 rejected 数据合并，形成一个批次进行模型前向计算
        x = torch.cat([x_chosen, x_rejected], dim=0)
        y = torch.cat([y_chosen, y_rejected], dim=0)
        mask = torch.cat([mask_chosen, mask_rejected], dim=0)

        lr = get_lr(epoch * iters + step, args.epochs * iters, args.learning_rate)
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr

        with autocast_ctx:
            # 参考模型不计算梯度
            with torch.no_grad():
                ref_outputs = ref_model(x)  # 参考模型前向计算，得到 logits
                ref_logits = ref_outputs.logits
            ref_log_probs = logits_to_log_probs(ref_logits, y)  # 计算参考模型的 log 概率
            # 策略模型需要计算梯度
            outputs = model(x)
            logits = outputs.logits
            policy_log_probs = logits_to_log_probs(logits, y)  # 计算策略模型的 log 概率
            
            dpo_loss_val = dpo_loss(ref_log_probs, policy_log_probs, mask, beta=beta)
            loss = dpo_loss_val + outputs.aux_loss
            loss = loss / args.accumulation_steps

        scaler.scale(loss).backward()

        if (step + 1) % args.accumulation_steps == 0:
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip)
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad(set_to_none=True)

        if step % args.log_interval == 0 or step == iters - 1:
            spend_time = time.time() - start_time 
            current_loss = loss.item() * args.accumulation_steps  # 恢复到未缩放的损失值
            current_dpo_loss = dpo_loss_val.item()  # DPO 损失值
            current_aux_loss = outputs.aux_loss  # 额外损失值
            current_lr = optimizer.param_groups[-1]['lr']
            eta_min = spend_time / (step + 1) * iters // 60 - spend_time // 60
            
            print(
                f'Epoch:[{epoch + 1}/{args.epochs}]({step}/{iters}), '
                f'loss: {current_loss:.4f}, '
                f'dpo_loss: {current_dpo_loss:.4f}, '
                f'aux_loss: {current_aux_loss:.4f}, '
                f'learning_rate: {current_lr:.8f}, '
                f'epoch_time: {eta_min:.3f}min'
            )
            
            if wandb: 
                wandb.log({
                    "loss": current_loss, 
                    "dpo_loss": current_dpo_loss, 
                    "aux_loss": current_aux_loss, 
                    "learning_rate": current_lr, 
                    "epoch_time": eta_min
                })

        # 到达指定保存步数时，保存模型（仅主进程）
        if args.save_interval > 0 and (step % args.save_interval == 0 or step == iters - 1):
            if not dist.is_initialized() or dist.get_rank() == 0:
                os.makedirs(args.save_dir, exist_ok=True)  # 确保保存目录存在
                model.eval()
                moe_suffix = '_moe' if lm_config.use_moe else ''
                ckp = f'{args.save_dir}/{args.save_weight}_{lm_config.dim}{moe_suffix}.pth'
                raw_model = model.module if isinstance(model, DistributedDataParallel) else model
                raw_model = getattr(raw_model, '_orig_mod', raw_model)
                state_dict = raw_model.state_dict()
                torch.save({k: v.half().cpu() for k, v in state_dict.items()}, ckp)
                print(f'模型已保存至：{ckp}')
                model.train()
                del state_dict

        del x_chosen, x_rejected, y_chosen, y_rejected, mask_chosen, mask_rejected, x, y, mask
        del ref_outputs, ref_logits, ref_log_probs, outputs, logits, policy_log_probs, loss

准备完毕，我们尝试一轮长度 1 个 iter 的训练.

In [11]:
iter_per_epoch = len(train_loader)
for epoch in range(args.epochs):
    train_epoch(epoch, train_loader, iter_per_epoch, ref_model, lm_config)
print('dpo训练完成！')

Epoch:[1/5](1/1), loss: 0.6931, dpo_loss: 0.6931, aux_loss: 0.0000, learning_rate: 0.00050225, epoch_time: 0.000min
模型已保存至：./output/minimind_dpo_512.pth
Epoch:[2/5](1/1), loss: 0.5893, dpo_loss: 0.5893, aux_loss: 0.0000, learning_rate: 0.00037725, epoch_time: 0.000min
模型已保存至：./output/minimind_dpo_512.pth
Epoch:[3/5](1/1), loss: 0.5061, dpo_loss: 0.5061, aux_loss: 0.0000, learning_rate: 0.00022275, epoch_time: 0.000min
模型已保存至：./output/minimind_dpo_512.pth
Epoch:[4/5](1/1), loss: 0.4426, dpo_loss: 0.4426, aux_loss: 0.0000, learning_rate: 0.00009775, epoch_time: 0.000min
模型已保存至：./output/minimind_dpo_512.pth
Epoch:[5/5](1/1), loss: 0.4131, dpo_loss: 0.4131, aux_loss: 0.0000, learning_rate: 0.00005000, epoch_time: 0.000min
模型已保存至：./output/minimind_dpo_512.pth
dpo训练完成！


In [12]:
del model