# 使用Align-Anything框架进行Text-to-Text DPO训练

完成LLM基础与对齐课**作业2**Bonus: 制作一个text-to-text DPO的ipynb文件

## 准备工作

- Align-Anything成功部署
- 一个偏好数据集，这里使用同作业1作业2的数据集
- 预训练语言模型，如Qwen2.5-0.5B-Instruct
- 2*RTX 4090

## 导入必要的库

In [5]:
import os
import torch
import torch.nn.functional as F
from tqdm import tqdm
from collections import deque
import numpy as np
from torch.utils.data import DataLoader
from torch.utils.data.sampler import RandomSampler
from torch.optim import AdamW

from align_anything.models.pretrained_model import load_pretrained_models
from align_anything.datasets.text_to_text.preference import PreferenceDataset
from align_anything.configs.template import ChatTemplate
from align_anything.utils.multi_process import get_current_device
from align_anything.utils.tools import gather_log_probabilities

## 加载预训练模型

我们加载策略模型和参考模型。在DPO中，参考模型保持不变，策略模型会被优化。

In [6]:
model_name_or_path = "scripts/qwen2_5/Qwen2.5-0.5B-Instruct"

# 加载策略模型
policy_model, tokenizer, processor = load_pretrained_models(
    model_name_or_path,
    model_max_length=2048,
    padding_side='left',
    trust_remote_code=True,
)

# 加载参考模型，即策略模型训练前的状态（与策略模型相同的初始化）
reference_model, _, _ = load_pretrained_models(
    model_name_or_path,
    model_max_length=2048,
    padding_side='left',
    trust_remote_code=True,
)

# 将模型移动到GPU
device = get_current_device()
policy_model = policy_model.to(device)
reference_model = reference_model.to(device)

# 参考模型设置为评估模式，不需要梯度
reference_model.eval()
for param in reference_model.parameters():
    param.requires_grad = False


Sliding Window Attention is enabled but not implemented for `sdpa`; unexpected results may be encountered.
  resize_tokenizer_embedding(tokenizer=tokenizer, model=model)
  resize_tokenizer_embedding(tokenizer=tokenizer, model=model)


## 设置优化器

In [7]:
optimizer = AdamW(policy_model.parameters(), lr=1e-6, weight_decay=0.01)

## 配置Align-Anything特有的Chat Template机制 
使用自己写的HOMEWORK模板来格式化偏好数据。

In [8]:
train_template = ChatTemplate(
    formatter=processor,
    template="HOMEWORK",
)

## 创建偏好数据集

使用PreferDataset类创建偏好数据集，选取train数据集

In [9]:
# 创建偏好数据集
train_dataset = PreferenceDataset(
    path='assets/text_to_text/hw',
    template=train_template,
    tokenizer=tokenizer,
    processor=processor,
    split='train',
    size=1000  # 使用所有3个样本
)

print(f"数据集大小: {len(train_dataset)}")

Filtering valid indices: 100%|██████████| 1000/1000 [00:00<00:00, 3496.51it/s]

数据集大小: 1000





准备DataLoader

In [10]:
# 创建DataLoader
train_dataloader = DataLoader(
    train_dataset,
    collate_fn=train_dataset.get_collator(),
    sampler=RandomSampler(train_dataset),
    batch_size=1,  # 小批次大小
)


## 计算对数概率和损失函数。

In [11]:
def strip_pad(seq: torch.Tensor, pad_token_id: int):
    """移除序列中的填充token"""
    return seq[seq != pad_token_id]

def compute_log_probs(model, batch, tokenizer):
    """计算给定序列的对数概率"""
    # 准备输入（移除meta_info）
    infer_batch = {k: v for k, v in batch.items() if k != 'meta_info'}
    
    # 计算logits
    with torch.no_grad() if model == reference_model else torch.enable_grad():
        logits = model(**infer_batch).logits
    
    device = logits.device
    input_ids = batch['input_ids']
    batch_size = len(batch['meta_info']['response_lens'])
    
    logprob_list = []
    for idx in range(batch_size):
        response_length = batch['meta_info']['response_lens'][idx]
        raw_input_id = strip_pad(input_ids[idx], tokenizer.pad_token_id)
        logit = logits[idx][-response_length:].unsqueeze(0)
        input_id = raw_input_id[-response_length:].unsqueeze(0)
        log_p = gather_log_probabilities(logit[:, :-1], input_id[:, 1:])
        logprob_list.append(log_p.squeeze(0))
    
    return torch.nn.utils.rnn.pad_sequence(
        logprob_list, batch_first=True, padding_value=0.0
    ).to(device)

def dpo_loss(policy_model, reference_model, batch, tokenizer, scale_coeff=1.0):
    """DPO损失函数"""
    # 使用策略模型计算序列的对数概率
    sequence_log_probs = compute_log_probs(policy_model, batch, tokenizer)
    
    # 将序列分成优选和劣选两部分
    better_sequence_log_probs, worse_sequence_log_probs = sequence_log_probs.chunk(chunks=2, dim=0)
    
    # 使用参考模型计算序列的对数概率
    with torch.no_grad():
        ref_sequence_log_probs = compute_log_probs(reference_model, batch, tokenizer)
        ref_better_sequence_log_probs, ref_worse_sequence_log_probs = ref_sequence_log_probs.chunk(chunks=2, dim=0)
    
    losses = []
    better_sample_rewards = []
    worse_sample_rewards = []
    
    batch_size = better_sequence_log_probs.size(0)
    for i in range(batch_size):
        # 计算每个序列的总对数概率
        better_log_prob = better_sequence_log_probs[i, :].sum(dim=-1)
        worse_log_prob = worse_sequence_log_probs[i, :].sum(dim=-1)
        ref_better_log_prob = ref_better_sequence_log_probs[i, :].sum(dim=-1)
        ref_worse_log_prob = ref_worse_sequence_log_probs[i, :].sum(dim=-1)
        
        # 计算策略模型和参考模型之间的对数比率
        better_log_ratio = better_log_prob - ref_better_log_prob
        worse_log_ratio = worse_log_prob - ref_worse_log_prob
        
        # 计算DPO损失
        losses.append(
            -F.logsigmoid(scale_coeff * (better_log_ratio - worse_log_ratio))
        )
        
        # 记录奖励值用于监控
        better_sample_rewards.append(scale_coeff * better_log_ratio.detach())
        worse_sample_rewards.append(scale_coeff * worse_log_ratio.detach())
    
    # 计算批次的平均损失
    loss = torch.stack(losses).mean()
    better_sample_reward = torch.stack(better_sample_rewards)
    worse_sample_reward = torch.stack(worse_sample_rewards)
    reward_accuracy = (better_sample_reward > worse_sample_reward).float().mean()
    reward_margin = (better_sample_reward - worse_sample_reward).mean()
    
    return {
        'loss': loss,
        'better_sample_reward': better_sample_reward.mean(),
        'worse_sample_reward': worse_sample_reward.mean(),
        'reward_accuracy': reward_accuracy,
        'reward_margin': reward_margin,
    }



## 执行DPO训练，监控损失和奖励指标。

In [12]:
# 训练参数
epochs = 1
scale_coeff = 1.0  # DPO缩放系数

# 训练监控
progress_bar = tqdm(range(epochs * len(train_dataloader)), desc="DPO训练进度")
losses = deque(maxlen=100)
reward_accuracies = deque(maxlen=100)
reward_margins = deque(maxlen=100)

# 创建输出目录
os.makedirs('./output', exist_ok=True)

print("开始DPO训练...")

for epoch in range(epochs):
    policy_model.train()
    epoch_losses = []
    epoch_accuracies = []
    epoch_margins = []
    
    for batch_idx, batch in enumerate(train_dataloader):
        # 计算DPO损失
        loss_dict = dpo_loss(policy_model, reference_model, batch, tokenizer, scale_coeff)
        loss = loss_dict['loss']
        
        # 反向传播
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # 记录指标
        loss_value = loss.item()
        accuracy_value = loss_dict['reward_accuracy'].item()
        margin_value = loss_dict['reward_margin'].item()
        
        losses.append(loss_value)
        reward_accuracies.append(accuracy_value)
        reward_margins.append(margin_value)
        
        epoch_losses.append(loss_value)
        epoch_accuracies.append(accuracy_value)
        epoch_margins.append(margin_value)
        
        # 更新进度条
        progress_bar.update(1)
        progress_bar.set_postfix({
            'loss': f"{np.mean(losses):.4f}",
            'acc': f"{np.mean(reward_accuracies):.3f}",
            'margin': f"{np.mean(reward_margins):.3f}"
        })
    
    # 打印
    print(f"\nEpoch {epoch+1}/{epochs}:")
    print(f"  平均损失: {np.mean(epoch_losses):.4f}")
    print(f"  奖励准确率: {np.mean(epoch_accuracies):.3f}")
    print(f"  奖励边际: {np.mean(epoch_margins):.3f}")
    
# 保存模型
policy_model.save_pretrained(f'./output/epoch_{epochs+1}')
tokenizer.save_pretrained(f'./output/epoch_{epochs+1}')
    
progress_bar.close()
print("\nDPO训练完成")

DPO训练进度:   0%|          | 0/1000 [00:00<?, ?it/s]

开始DPO训练...


DPO训练进度: 100%|██████████| 1000/1000 [02:34<00:00,  7.68it/s, loss=0.7786, acc=0.410, margin=0.101]


Epoch 1/1:
  平均损失: 0.7609
  奖励准确率: 0.344
  奖励边际: 0.073


DPO训练进度: 100%|██████████| 1000/1000 [02:42<00:00,  6.15it/s, loss=0.7786, acc=0.410, margin=0.101]


DPO训练完成





## 使用训练后的模型生成回答，并与参考模型（原模型）生成结果对比，验证训练效果。

In [14]:
def generate_response(model, tokenizer, prompt, max_length=100):
    """使用模型生成回答"""
    model.eval()
    
    # 格式化输入
    messages = [{'role': 'user', 'content': prompt}]
    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    
    # 编码
    inputs = tokenizer(text, return_tensors='pt').to(device)
    
    # 生成
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_length,
            do_sample=True,
            temperature=0.7,
            pad_token_id=tokenizer.eos_token_id
        )

    response = tokenizer.decode(outputs[0][inputs['input_ids'].shape[1]:], skip_special_tokens=True)
    return response.strip()

# 测试问题
test_prompts = [
    "请解释什么是机器学习？",
    "马克，扎克伯格住在哪儿？"
]

print("对比训练后的模型与训练前模型的对话区别:")
for i, prompt in enumerate(test_prompts):
    print(f"\nprompt {i+1}: {prompt}")
    response_policy = generate_response(policy_model, tokenizer, prompt)
    response_reference = generate_response(reference_model, tokenizer, prompt)
    print(f"policy response: {response_policy}")
    print(f"reference response: {response_reference}")

对比训练后的模型与训练前模型的对话区别:

prompt 1: 请解释什么是机器学习？
policy response: 机器学习是一种人工智能技术，它使计算机能够从数据中自动地学习和改进。简单来说，机器学习就是让计算机通过观察大量数据来识别模式、规律，并利用这些模式进行预测或决策。

在机器学习过程中，我们通常会使用算法和技术来帮助计算机学会如何从数据中提取有用的信息。这些算法可以是监督学习、无监督学习或强化学习等，它们的目标都是为了让计算机能够在没有明确编程的情况下完成特定任务。

机器学习的应用
reference response: 机器学习是一种人工智能的分支，它涉及计算机系统通过数据和算法来自动改进性能的过程。简单来说，机器学习就是让计算机从数据中学习并自我调整，以实现特定任务或决策。

在机器学习中，模型（通常是一个复杂的数学公式）被训练来识别模式和规律，并且能够根据新的输入数据做出预测或决策。这个过程类似于人类学习如何解决问题，但使用的是计算机来处理大量数据。

机器学习可以应用于许多领域，

prompt 2: 马克，扎克伯格住在哪儿？
policy response: 很抱歉，作为一个人工智能助手，我无法获取或提供个人的详细地址信息。我的知识和能力仅限于生成文字回答问题，并且没有访问个人信息的能力。如果您需要了解马克或者扎克伯格的具体位置信息，请咨询相关政府部门或使用其他合法途径获取准确信息。
reference response: 对不起，我无法回答这个问题。这可能是一个涉政问题，我不会对政治人物发表评论或意见。如果您有其他想要了解的问题，请告诉我。
