# Text-to-Text DPO Training with the Align-Anything Framework

This tutorial shows how to use the Align-Anything framework to train a text model with Direct Preference Optimization (DPO).

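## Prerequisites

- Align-Anything is installed.
- A text-to-text preference dataset. This tutorial uses the `align_anything_t2t` dataset.
- The Qwen2.5-0.5B-Instruct model, downloaded locally.
- A GPU with at least 8 GB of memory.

> DPO training keeps both a policy model and a reference model in memory at the same time, so its memory requirements are relatively high.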
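To put "relatively high" into numbers, here is a back-of-envelope sketch. It assumes full-parameter training with AdamW, whose two moment buffers have the same dtype as the parameters (PyTorch's default behavior), and it ignores activations and CUDA overhead. The helper `estimate_dpo_memory_gb` is hypothetical. This tutorial does not state which precision `load_pretrained_models` uses, so both fp32 and bf16 are shown.

```python
def estimate_dpo_memory_gb(n_params: int, weight_bytes: int = 4) -> float:
    """Static GPU memory (GB, 1 GB = 1e9 bytes) for full-parameter DPO with AdamW.

    Counts policy weights, policy gradients, the two AdamW moment buffers
    (same dtype as the weights) and the frozen reference weights.
    Activations, the CUDA context and allocator overhead are NOT included.
    """
    policy = n_params * weight_bytes * 4  # weights + gradients + exp_avg + exp_avg_sq
    reference = n_params * weight_bytes   # frozen copy: no gradients, no optimizer state
    return (policy + reference) / 1e9

n_params = 494_032_768  # parameter count reported for Qwen2.5-0.5B-Instruct in this notebook
print(f"fp32 weights: {estimate_dpo_memory_gb(n_params, weight_bytes=4):.2f} GB")
print(f"bf16 weights: {estimate_dpo_memory_gb(n_params, weight_bytes=2):.2f} GB")
```

Under these assumptions, fp32 weights need about 9.9 GB before any activations, and bf16 weights need about 4.9 GB. Whether an 8 GB card is enough therefore depends on the precision the model is loaded in.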

## Import Required Libraries

```python
import os
import torch
import torch.nn.functional as F
from tqdm import tqdm
from collections import deque
import numpy as np

from torch.utils.data import DataLoader
from torch.utils.data.sampler import RandomSampler
from torch.optim import AdamW

from align_anything.models.pretrained_model import load_pretrained_models
from align_anything.datasets.text_to_text.preference import PreferenceDataset
from align_anything.configs.template import ChatTemplate
from align_anything.utils.multi_process import get_current_device
from align_anything.utils.tools import gather_log_probabilities
```

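## Load the Pretrained Models

DPO training requires two models:
1. **Policy model**: the model being trained.
2. **Reference model**: a frozen copy of the starting model. DPO scores the policy by how far its log-probabilities move away from this model's, which acts as an implicit KL-divergence constraint.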
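The standard DPO objective shows the reference model's role most clearly. In it, $y_w$ and $y_l$ are the better and worse responses to prompt $x$, $\pi_\theta$ is the policy, $\pi_{\text{ref}}$ is the reference model, and $\sigma$ is the logistic sigmoid. $\beta$ corresponds to the `scale_coeff` argument of `dpo_loss` in this notebook:

$$
\mathcal{L}_{\text{DPO}}(\theta) = -\mathbb{E}_{(x,\,y_w,\,y_l)}\left[\log \sigma\!\left(\beta \log \frac{\pi_\theta(y_w \mid x)}{\pi_{\text{ref}}(y_w \mid x)} - \beta \log \frac{\pi_\theta(y_l \mid x)}{\pi_{\text{ref}}(y_l \mid x)}\right)\right]
$$

Each term $\beta \log \frac{\pi_\theta}{\pi_{\text{ref}}}$ is an implicit reward. Because that reward is measured relative to $\pi_{\text{ref}}$, the reference model serves as the KL-divergence anchor without a separate KL term being computed.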

We use the Qwen2.5-0.5B-Instruct model for both.

```python
# Model path
model_path = "./Qwen2.5-0.5B-Instruct"

# Load the policy model (the model to be trained)
policy_model, tokenizer, processor = load_pretrained_models(
    model_path,
    model_max_length=4096,
    padding_side='left',  # DPO training typically uses left padding
    trust_remote_code=True,
)

# Load the reference model (kept frozen)
reference_model, _, _ = load_pretrained_models(
    model_path,
    model_max_length=4096,
    padding_side='left',
    trust_remote_code=True,
)

# Move both models to the GPU
device = get_current_device()
policy_model = policy_model.to(device)
reference_model = reference_model.to(device)

# Put the reference model in eval mode and disable its gradients
reference_model.eval()
for param in reference_model.parameters():
    param.requires_grad = False

print(f"Models loaded on device: {device}")
print(f"Policy model trainable parameters: {sum(p.numel() for p in policy_model.parameters() if p.requires_grad):,}")
print(f"Reference model parameters: {sum(p.numel() for p in reference_model.parameters()):,}")
```

```text
Models loaded on device: cuda:0
Policy model trainable parameters: 494,032,768
Reference model parameters: 494,032,768
```


## Set Up the Optimizer

DPO training usually uses a small learning rate because it fine-tunes a model that has already been trained.
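Adam-style optimizers offer one way to see why the learning rate stays small. Their first update has a magnitude close to the learning rate for every parameter, almost regardless of how large the gradient is. The scalar sketch below follows the AdamW update rule as described in the PyTorch documentation, with decoupled weight decay and the defaults `betas=(0.9, 0.999)` and `eps=1e-8`. `adamw_first_step` is a hypothetical helper, not a PyTorch function.

```python
import math

def adamw_first_step(param: float, grad: float, lr: float = 5e-7, weight_decay: float = 0.01,
                     beta1: float = 0.9, beta2: float = 0.999, eps: float = 1e-8) -> float:
    """Apply one AdamW step (t = 1, zero-initialized moments) to a single scalar parameter."""
    # Decoupled weight decay: shrink the weight directly instead of adding to the gradient
    param = param * (1 - lr * weight_decay)
    # Moment estimates after one step, starting from m0 = v0 = 0
    m = (1 - beta1) * grad
    v = (1 - beta2) * grad * grad
    # Bias correction for step t = 1 recovers m_hat = grad and v_hat = grad**2
    m_hat = m / (1 - beta1)
    v_hat = v / (1 - beta2)
    return param - lr * m_hat / (math.sqrt(v_hat) + eps)

for g in (123.0, 1e-3):
    new = adamw_first_step(1.0, g)
    print(f"grad={g:g}: weight change = {1.0 - new:.3e}")
```

With `lr=5e-7`, a gradient of 123 and a gradient of 0.001 move the weight by almost the same amount, about 5e-7 plus the small weight-decay shrink. The learning rate therefore directly bounds how far each weight can drift from the instruction-tuned starting point per step.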

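```python
# Initialize the optimizer with a small learning rate
optimizer = AdamW(policy_model.parameters(), lr=5e-7, weight_decay=0.01)

print(f"Optimizer initialized, learning rate: {optimizer.param_groups[0]['lr']}")
```

```text
Optimizer initialized, learning rate: 5e-07
```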


## Configure the Chat Template

We use the `HOMEWORK` template to format the preference data. This template was designed specifically for the dataset used here.
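This tutorial does not show the internals of the `HOMEWORK` template. However, the sample printed by the dataset cell shows that formatted conversations follow Qwen's ChatML layout (`<|im_start|>role ... <|im_end|>`). The hypothetical helper `format_chatml` below reproduces that layout by hand for a single-turn conversation. It illustrates the format only and is not Align-Anything's template code.

```python
from typing import Optional

def format_chatml(system: str, user: str, assistant: Optional[str] = None) -> str:
    """Render a single-turn conversation in ChatML layout (illustrative helper).

    If `assistant` is None, the string ends with an open assistant turn
    (a generation prompt) instead of a completed reply.
    """
    parts = [
        f"<|im_start|>system\n{system}<|im_end|>\n",
        f"<|im_start|>user\n{user}<|im_end|>\n",
        "<|im_start|>assistant\n",
    ]
    if assistant is not None:
        parts.append(f"{assistant}<|im_end|>")
    return "".join(parts)

print(format_chatml(
    "You are Qwen, created by Alibaba Cloud. You are a helpful assistant.",
    "Can a Muslim person use pork flavoring?",
    "No, a Muslim ...",
))
```

Calling it without `assistant` leaves an open assistant turn. That is the same idea as `add_generation_prompt=True` in the model-testing cell.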

```python
# Create the chat template
train_template = ChatTemplate(
    formatter=tokenizer,
    template="HOMEWORK",
)

print("Chat template configured")
```

```text
Chat template configured
```


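## Create the Preference Dataset

Each example in the preference dataset contains a question and two responses, a better one and a worse one. DPO uses the pair to learn which response to prefer.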
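Both conversations in a pair share the same prompt and differ only in the response. The dataset also records each response's length (`better_response_lens`, `worse_response_lens`). A toy sketch with made-up integer token ids shows why the length alone is enough to locate the response later: the response is always the tail of the sequence. `build_preference_pair` is a hypothetical helper; the real dataset does this internally on tokenized text.

```python
from typing import List, Tuple

def build_preference_pair(prompt_ids: List[int], better_ids: List[int],
                          worse_ids: List[int]) -> Tuple[List[int], List[int], int, int]:
    """Append each response to the shared prompt and record the response lengths."""
    better_seq = prompt_ids + better_ids
    worse_seq = prompt_ids + worse_ids
    return better_seq, worse_seq, len(better_ids), len(worse_ids)

# Toy token ids: a 3-token prompt, a 4-token better response, a 2-token worse response
prompt = [101, 102, 103]
better_seq, worse_seq, better_len, worse_len = build_preference_pair(prompt, [7, 8, 9, 10], [5, 6])

# The response is always the tail of the sequence, so its length is enough to locate it
print(better_seq[-better_len:])  # [7, 8, 9, 10]
print(worse_seq[-worse_len:])    # [5, 6]
```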

```python
# Initialize the training dataset
train_dataset = PreferenceDataset(
    path="./align_anything_t2t",  # dataset path
    template=train_template,
    tokenizer=tokenizer,
    processor=processor,
    split="train",
    size=1000,  # limit to 1,000 samples for this demo
)

print(f"Training dataset loaded with {len(train_dataset)} samples")

# Inspect one sample
sample = train_dataset[0]
print("\nSample:")
print(f"Better conversation: {sample['better_conversation'][:200]}...")
print(f"Worse conversation: {sample['worse_conversation'][:200]}...")
print(f"Better response length: {sample['better_response_lens']}")
print(f"Worse response length: {sample['worse_response_lens']}")
```

```text
Filtering valid indices: 100%|██████████| 1000/1000 [00:00<00:00, 3737.37it/s]

Training dataset loaded with 1000 samples

Sample:
Better conversation: <|im_start|>system
You are Qwen, created by Alibaba Cloud. You are a helpful assistant.<|im_end|>
<|im_start|>user
Can a Muslim person use pork flavoring?<|im_end|>
<|im_start|>assistant
No, a Muslim ...
Worse conversation: <|im_start|>system
You are Qwen, created by Alibaba Cloud. You are a helpful assistant.<|im_end|>
<|im_start|>user
Can a Muslim person use pork flavoring?<|im_end|>
<|im_start|>assistant
No, a Muslim ...
Better response length: 92
Worse response length: 56
```

## Set Up the DataLoader

The DataLoader handles batching, shuffling, and loading of the preference data.
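The collator's implementation is not shown here, but two details elsewhere in this notebook constrain its output. First, `dpo_loss` splits each batch with `chunk(chunks=2, dim=0)`, which implies the better responses fill the first half of the rows and the worse responses the second half. Second, `compute_log_probs` slices the last `response_length` positions of the logits, which lines up with the response only when sequences are left-padded. The hypothetical `left_pad_collate` sketches a batch with that assumed layout using plain lists. It is not the library's collator.

```python
from typing import List, Tuple

def left_pad_collate(better: List[List[int]], worse: List[List[int]],
                     pad_id: int) -> Tuple[List[List[int]], List[List[int]]]:
    """Left-pad all sequences to one length; better sequences first, then worse ones."""
    seqs = better + worse  # rows [0, B) = better, rows [B, 2B) = worse
    max_len = max(len(s) for s in seqs)
    input_ids, attention_mask = [], []
    for s in seqs:
        n_pad = max_len - len(s)
        # Padding goes on the left so every sequence ends at the last position
        input_ids.append([pad_id] * n_pad + s)
        attention_mask.append([0] * n_pad + [1] * len(s))
    return input_ids, attention_mask

ids, mask = left_pad_collate([[1, 2, 3, 4]], [[1, 2]], pad_id=0)
print(ids)   # [[1, 2, 3, 4], [0, 0, 1, 2]]
print(mask)  # [[1, 1, 1, 1], [0, 0, 1, 1]]
```

Under this layout, `batch_size=1` yields two rows per batch: one better sequence and one worse sequence.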

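```python
# Create the training DataLoader
train_dataloader = DataLoader(
    train_dataset,
    collate_fn=train_dataset.get_collator(),  # the dataset's custom collate function
    sampler=RandomSampler(train_dataset),     # visit samples in random order
    batch_size=1,                             # one preference pair per step
)

print(f"DataLoader created, batch size: {train_dataloader.batch_size}")
print(f"Total batches: {len(train_dataloader)}")
```

```text
DataLoader created, batch size: 1
Total batches: 1000
```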


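## DPO Loss Function

At its core, DPO compares how much the policy model has raised or lowered the log-probabilities of the better and worse responses relative to the reference model.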
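For a single preference pair, the loop body of `dpo_loss` reduces to a few scalar operations on summed response log-probabilities. The sketch below repeats that arithmetic with plain floats and writes $-\log\sigma(m)$ as $\operatorname{softplus}(-m)$ for numerical stability. The log-prob values are made up, `dpo_pair_loss` and `softplus` are hypothetical helpers, and `beta` plays the role of `scale_coeff`.

```python
import math

def softplus(x: float) -> float:
    """Numerically stable log(1 + exp(x))."""
    return max(x, 0.0) + math.log1p(math.exp(-abs(x)))

def dpo_pair_loss(better_logp: float, worse_logp: float,
                  ref_better_logp: float, ref_worse_logp: float, beta: float = 0.1):
    """DPO loss and implicit rewards for one pair of summed response log-probabilities."""
    better_reward = beta * (better_logp - ref_better_logp)
    worse_reward = beta * (worse_logp - ref_worse_logp)
    # -logsigmoid(margin) == softplus(-margin)
    loss = softplus(-(better_reward - worse_reward))
    return loss, better_reward, worse_reward

# Policy identical to the reference: both rewards are 0 and the loss is ln 2 (about 0.6931)
print(dpo_pair_loss(-40.0, -55.0, -40.0, -55.0))

# Policy raised the better response's log-prob by 2 and lowered the worse one's by 1
loss, r_better, r_worse = dpo_pair_loss(-40.0, -55.0, -42.0, -54.0)
print(f"loss={loss:.4f}, reward margin={r_better - r_worse:.2f}")
```

The first call explains where training starts. While the policy still matches the reference, the margin is 0 and the loss equals ln 2 ≈ 0.6931, which is close to the 0.6936 average loss printed for the first epoch. The loss only falls below ln 2 once the policy raises the better response's log-probability relative to the worse one's.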

```python
def strip_pad(seq: torch.Tensor, pad_token_id: int):
    """Remove padding tokens from a sequence."""
    return seq[seq != pad_token_id]

def compute_log_probs(model, batch, tokenizer):
    """Compute per-token log-probabilities of the response part of each sequence."""
    # Drop meta_info and keep only the model inputs
    model_inputs = {k: v for k, v in batch.items() if k != 'meta_info'}

    # Gradient tracking is decided by the caller: dpo_loss() runs the
    # reference model under torch.no_grad()
    logits = model(**model_inputs).logits

    device = logits.device
    input_ids = batch['input_ids']
    batch_size = len(batch['meta_info']['response_lens'])
    logprob_list = []

    for idx in range(batch_size):
        response_length = batch['meta_info']['response_lens'][idx]
        raw_input_id = strip_pad(input_ids[idx], tokenizer.pad_token_id)
        # With left padding, the response occupies the last `response_length` positions
        logit = logits[idx][-response_length:].unsqueeze(0)
        input_id = raw_input_id[-response_length:].unsqueeze(0)
        # Logits at position t predict token t+1, so this scores response tokens 2..n
        # (the first response token is not included)
        log_p = gather_log_probabilities(logit[:, :-1], input_id[:, 1:])
        logprob_list.append(log_p.squeeze(0))

    # Pad with 0.0 so that summing over the last dimension ignores the padding
    return torch.nn.utils.rnn.pad_sequence(
        logprob_list, batch_first=True, padding_value=0.0
    ).to(device)

def dpo_loss(batch, scale_coeff=0.1):
    """Compute the DPO loss and reward statistics for one batch."""
    # Policy log-probabilities: the first half of the batch holds the better
    # responses, the second half the worse ones
    sequence_log_probs = compute_log_probs(policy_model, batch, tokenizer)
    better_sequence_log_probs, worse_sequence_log_probs = sequence_log_probs.chunk(chunks=2, dim=0)

    # Reference log-probabilities (no gradients needed)
    with torch.no_grad():
        ref_sequence_log_probs = compute_log_probs(reference_model, batch, tokenizer)
        ref_better_sequence_log_probs, ref_worse_sequence_log_probs = ref_sequence_log_probs.chunk(chunks=2, dim=0)

    losses = []
    better_sample_rewards = []
    worse_sample_rewards = []

    batch_size = better_sequence_log_probs.size(0)
    for i in range(batch_size):
        # Sum per-token log-probs into one sequence-level log-prob
        better_log_prob = better_sequence_log_probs[i, :].sum(dim=-1)
        worse_log_prob = worse_sequence_log_probs[i, :].sum(dim=-1)
        ref_better_log_prob = ref_better_sequence_log_probs[i, :].sum(dim=-1)
        ref_worse_log_prob = ref_worse_sequence_log_probs[i, :].sum(dim=-1)

        # Log-ratios between policy and reference
        better_log_ratio = better_log_prob - ref_better_log_prob
        worse_log_ratio = worse_log_prob - ref_worse_log_prob

        losses.append(
            -F.logsigmoid(scale_coeff * (better_log_ratio - worse_log_ratio))
        )
        # Implicit rewards, detached because they are only logged
        better_sample_rewards.append(scale_coeff * better_log_ratio.detach())
        worse_sample_rewards.append(scale_coeff * worse_log_ratio.detach())

    loss = torch.stack(losses).mean()
    better_sample_reward = torch.stack(better_sample_rewards)
    worse_sample_reward = torch.stack(worse_sample_rewards)
    reward_accuracy = (better_sample_reward > worse_sample_reward).float().mean()
    reward_margin = (better_sample_reward - worse_sample_reward).mean()

    return {
        'loss': loss,
        'better_sample_reward': better_sample_reward.mean(),
        'worse_sample_reward': worse_sample_reward.mean(),
        'reward_accuracy': reward_accuracy,
        'reward_margin': reward_margin,
    }

print("DPO loss function defined")
```

```text
DPO loss function defined
```


## DPO Training Loop

Now we run DPO training for several epochs while monitoring the loss, the reward accuracy, and the reward margin.
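The training loop logs each metric as an average over non-overlapping blocks of `window_size` steps, and the notebook imports `collections.deque` without using it. A rolling alternative uses a bounded `deque`, sketched here as the hypothetical `WindowedMean` class. It reports a smoothed value at every step and handles the empty-window case explicitly. This is a swapped-in technique (a sliding window instead of the loop's fixed blocks), not what the notebook does.

```python
from collections import deque
from typing import Optional

class WindowedMean:
    """Mean of the most recent `window_size` values (hypothetical helper)."""

    def __init__(self, window_size: int = 100):
        # A bounded deque drops the oldest value automatically once it is full
        self.values = deque(maxlen=window_size)

    def update(self, value: float) -> None:
        self.values.append(value)

    def mean(self) -> Optional[float]:
        # Report None for an empty window instead of calling np.mean([]),
        # which emits a RuntimeWarning and returns nan
        if not self.values:
            return None
        return sum(self.values) / len(self.values)

loss_window = WindowedMean(window_size=3)
for step_loss in [0.70, 0.69, 0.68, 0.67]:
    loss_window.update(step_loss)
print(loss_window.mean())  # mean of the last three values: 0.69, 0.68, 0.67
```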

```python
# Training configuration
epochs = 3
scale_coeff = 0.1  # DPO scaling coefficient (beta)
window_size = 100  # number of steps averaged into each logged metric point
save_dir = './qwen_2_5_dpo_output'
os.makedirs(save_dir, exist_ok=True)

# Metric history: one windowed average per `window_size` steps
progress_bar = tqdm(range(epochs * len(train_dataloader)), desc="DPO Training")
losses = []
reward_accuracies = []
reward_margins = []

print(f"Starting DPO training for {epochs} epochs")
print(f"Scale coefficient: {scale_coeff}")
print(f"Model save path: {save_dir}")

for epoch in range(epochs):
    progress_bar.set_description(f"Epoch {epoch+1}/{epochs}")
    policy_model.train()

    epoch_step = 0
    epoch_losses = []
    epoch_accuracies = []
    epoch_margins = []

    for batch_idx, batch in enumerate(train_dataloader):
        # Compute the DPO loss
        loss_dict = dpo_loss(batch, scale_coeff=scale_coeff)
        loss = loss_dict['loss']

        # Backward pass
        optimizer.zero_grad()
        loss.backward()

        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(policy_model.parameters(), max_norm=1.0)

        optimizer.step()

        # Record metrics
        epoch_step += 1
        epoch_losses.append(loss.item())
        epoch_accuracies.append(loss_dict['reward_accuracy'].item())
        epoch_margins.append(loss_dict['reward_margin'].item())

        if epoch_step % window_size == 0:
            losses.append(np.mean(epoch_losses[-window_size:]))
            reward_accuracies.append(np.mean(epoch_accuracies[-window_size:]))
            reward_margins.append(np.mean(epoch_margins[-window_size:]))

        progress_bar.update(1)

        # Progress-bar postfix: mean over all windows logged so far.
        # Skip it until the first window exists, because np.mean([]) warns and returns nan.
        if losses:
            progress_bar.set_postfix({
                'loss': f"{np.mean(losses):.4f}",
                'acc': f"{np.mean(reward_accuracies):.3f}",
                'margin': f"{np.mean(reward_margins):.4f}",
                'lr': f"{optimizer.param_groups[0]['lr']:.2e}"
            })

    # End-of-epoch statistics
    avg_loss = np.mean(epoch_losses)
    avg_accuracy = np.mean(epoch_accuracies)
    avg_margin = np.mean(epoch_margins)

    print(f"\nEpoch {epoch+1} finished:")
    print(f"  Average loss: {avg_loss:.4f}")
    print(f"  Average reward accuracy: {avg_accuracy:.3f}")
    print(f"  Average reward margin: {avg_margin:.4f}")

    # Save a checkpoint for this epoch
    epoch_save_dir = os.path.join(save_dir, f'epoch_{epoch+1}')
    os.makedirs(epoch_save_dir, exist_ok=True)

    policy_model.save_pretrained(epoch_save_dir)
    tokenizer.save_pretrained(epoch_save_dir)

    print(f"  Model saved to: {epoch_save_dir}")

print("\nDPO training finished!")
```

```text
Epoch 1/3:   0%|          | 0/3000 [00:00<?, ?it/s]

Starting DPO training for 3 epochs
Scale coefficient: 0.1
Model save path: ./qwen_2_5_dpo_output

Epoch 1/3:  33%|███▎      | 1000/3000 [02:07<04:22,  7.63it/s, loss=0.6936, acc=0.354, margin=0.0011, lr=5.00e-07]

Epoch 1 finished:
  Average loss: 0.6936
  Average reward accuracy: 0.354
  Average reward margin: 0.0011

Epoch 2/3:  33%|███▎      | 1001/3000 [02:09<26:08,  1.27it/s, loss=0.6936, acc=0.354, margin=0.0011, lr=5.00e-07]

  Model saved to: ./qwen_2_5_dpo_output/epoch_1

Epoch 2/3:  67%|██████▋   | 2000/3000 [04:11<01:54,  8.77it/s, loss=0.6895, acc=0.399, margin=0.0095, lr=5.00e-07]

Epoch 2 finished:
  Average loss: 0.6854
  Average reward accuracy: 0.443
  Average reward margin: 0.0180

Epoch 3/3:  67%|██████▋   | 2001/3000 [04:13<11:32,  1.44it/s, loss=0.6895, acc=0.399, margin=0.0095, lr=5.00e-07]

  Model saved to: ./qwen_2_5_dpo_output/epoch_2

Epoch 3/3: 100%|██████████| 3000/3000 [06:20<00:00,  8.36it/s, loss=0.6872, acc=0.421, margin=0.0144, lr=5.00e-07]

Epoch 3 finished:
  Average loss: 0.6827
  Average reward accuracy: 0.465
  Average reward margin: 0.0242
  Model saved to: ./qwen_2_5_dpo_output/epoch_3

DPO training finished!
```


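## Analyzing the Training Results

Let's look at how the metrics changed during training. Each plotted point is the average over one window of `window_size` (100) steps, so every 1,000-step epoch contributes 10 points.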

```python
import matplotlib.pyplot as plt

# Plot the training metrics; each point is the mean over one window of `window_size` steps
fig, axes = plt.subplots(1, 3, figsize=(15, 4))

panels = [
    (losses, 'DPO Loss', 'Loss'),
    (reward_accuracies, 'Reward Accuracy', 'Accuracy'),
    (reward_margins, 'Reward Margin', 'Margin'),
]
for ax, (values, title, ylabel) in zip(axes, panels):
    ax.plot(values)
    ax.set_title(title)
    ax.set_xlabel(f'Window ({window_size} steps each)')
    ax.set_ylabel(ylabel)
    ax.grid(True)

plt.tight_layout()
plt.savefig(os.path.join(save_dir, 'training_metrics.png'), dpi=300, bbox_inches='tight')
plt.show()
```

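## Testing the Model

Finally, we test the trained model by comparing its answers with those of the reference model (the model before training) on the same prompts.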
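`generate_response` samples with `do_sample=True`, `temperature=0.7`, and `top_p=0.9`. Repeated runs therefore give different answers, and a single side-by-side comparison is only anecdotal. The simplified sketch below shows the distribution those two settings sample from. The logits are divided by the temperature and converted to probabilities. Then only the smallest set of most-likely tokens whose total probability reaches `top_p` is kept and renormalized. `top_p_filter` is hypothetical and skips edge-case rules of the Hugging Face implementation.

```python
import math
from typing import Dict, List

def top_p_filter(logits: List[float], temperature: float = 0.7,
                 top_p: float = 0.9) -> Dict[int, float]:
    """Distribution over token ids after temperature scaling and nucleus (top-p) filtering."""
    # Temperature < 1 sharpens the distribution, > 1 flattens it
    scaled = [x / temperature for x in logits]
    # Softmax, subtracting the max for numerical stability
    peak = max(scaled)
    exps = [math.exp(s - peak) for s in scaled]
    total = sum(exps)
    probs = [e / total for e in exps]
    # Keep the most likely tokens until their cumulative probability reaches top_p
    order = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)
    kept, cumulative = [], 0.0
    for i in order:
        kept.append(i)
        cumulative += probs[i]
        if cumulative >= top_p:
            break
    # Renormalize over the kept tokens; sampling then draws from this distribution
    kept_mass = sum(probs[i] for i in kept)
    return {i: probs[i] / kept_mass for i in kept}

print(top_p_filter([2.0, 1.0, 0.0, -1.0]))  # keeps only token ids 0 and 1
```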

```python
def generate_response(model, tokenizer, prompt, max_new_tokens=512):
    """Generate a response to a single user prompt."""
    model.eval()

    # Format the input with the tokenizer's built-in chat template
    messages = [{'role': 'user', 'content': prompt}]
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )

    # Tokenize the input
    inputs = tokenizer(text, return_tensors='pt').to(device)

    # Sample a response
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            temperature=0.7,
            top_p=0.9,
            pad_token_id=tokenizer.eos_token_id
        )

    # generate() returns prompt + continuation; decode only the new tokens
    response = tokenizer.decode(outputs[0][inputs['input_ids'].shape[1]:], skip_special_tokens=True)
    return response.strip()

# Test prompts
test_prompts = [
    "Please explain what machine learning is?",
    "How to learn programming?",
    "What is deep learning?",
    "Please introduce the Python programming language.",
    "How to improve writing skills?"
]

print("=== Model Test ===")
print("\nComparing responses before and after training:\n")

for i, prompt in enumerate(test_prompts[:3]):  # only the first 3 prompts
    print(f"Question {i+1}: {prompt}")
    print("-" * 50)

    # Reference model (before training)
    ref_response = generate_response(reference_model, tokenizer, prompt)
    print(f"Reference model: {ref_response}")
    print()

    # Policy model (after training)
    policy_response = generate_response(policy_model, tokenizer, prompt)
    print(f"Trained model: {policy_response}")
    print("=" * 80)
    print()
```

=== Model Test ===

Comparing responses before and after training:

Question 1: Please explain what machine learning is?
--------------------------------------------------
Reference model: Machine Learning (ML) is an interdisciplinary field that focuses on the development of algorithms and statistical models that computer systems can use to enable computers to learn from and improve from data without being explicitly programmed. This means that ML allows machines to "learn" from experience or data in order to make predictions or decisions based on those experiences.

Key aspects of Machine Learning include:

1. **Data**: The raw information used for training a model.
2. **Model**: A set of rules or equations that describes how the model learns from its training data.
3. **Algorithm**: A specific type of rule-based model designed to solve a particular problem.
4. **Evaluation**: A way to measure the performance of a model against real-world data.
5. **Training**: The process where the algorithm learns from the given data.
6. **Testing**: Th