本项目采用OpenRLHF框架
<br>在开源的Pretrain Model基础上加以优化和魔改，实现以下RLHF三个部分的training：
<br>1.Supervised Fine-tuning；
<br>2.Reward Model；
<br>3.Fine-tuning with RL（PPO）

# *SFT Training*

本次选择 Qwen2-1.5B作为Pretrain Model， Alpaca-CoT@firefly作为数据集

In [None]:
set -x

# SFT launch arguments.
#
# The notes live here rather than at the end of each argument line: inside the
# unquoted heredoc below, a backslash followed by a space and "# ..." is NOT a
# line continuation, so the comment text would be word-split and handed to
# deepspeed as extra arguments.
#
#   --max_len                 maximum sequence length
#   --dataset                 dataset id; "@firefly" selects the firefly data_dir
#                             (same repository the preload script loads)
#   --input_key               dataset field used as the input
#   --output_key              dataset field used as the output
#   --max_samples             maximum number of training samples
#   --pretrain                pretrained base model
#   --save_steps              checkpoint interval in steps; -1 means no intermediate checkpoint
#   --eval_steps              evaluation interval in steps; -1 means evaluate once at the end
#   --zero_stage              DeepSpeed ZeRO stage (1, 2 or 3); stage 2 shards
#                             optimizer states and gradients across the GPUs
#   --max_epochs              number of training epochs
#   --bf16                    train in BF16 precision
#   --flash_attn              enable flash attention
#   --load_checkpoint         load an intermediate checkpoint
#   --gradient_checkpointing  enable gradient checkpointing
#   --use_wandb               wandb user token
read -r -d '' training_commands <<EOF
openrlhf.cli.train_sft \
  --max_len 2848 \
  --dataset QingyiSi/Alpaca-CoT@firefly \
  --input_key instruction \
  --output_key output \
  --train_batch_size 256 \
  --micro_train_batch_size 2 \
  --max_samples 500000 \
  --pretrain Qwen/Qwen2-1.5B \
  --save_path ./checkpoint/qwen2-1.5b-firefly-sft \
  --save_steps -1 \
  --logging_steps 1 \
  --eval_steps -1 \
  --zero_stage 2 \
  --max_epochs 1 \
  --bf16 \
  --flash_attn \
  --learning_rate 5e-6 \
  --load_checkpoint \
  --gradient_checkpointing \
  --use_wandb=[WANDB_TOKENS] \
  --wandb_project OpenRLHF \
  --wandb_run_name qwen2-1.5b-firefly-sft
EOF

# Optional flags:
# - wandb=[WANDB_TOKENS]
# - packing_samples

# Unless the first script argument is "slurm", launch through deepspeed.
# $training_commands is intentionally left unquoted so that it is word-split
# into the module name and its individual arguments.
if [[ ${1} != "slurm" ]]; then
    deepspeed --module $training_commands
fi

为了节省训练时间，设置一个预加载model和dataset的脚本

In [None]:
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer

# Load only the "firefly" sub-directory of the Alpaca-CoT dataset.
dataset = load_dataset("QingyiSi/Alpaca-CoT", data_dir="firefly")

# Show the dataset summary, then the first training record.
print(dataset)
print(dataset["train"][0])

# Load the base model and its tokenizer with the same options.
model_name = "Qwen/Qwen2-1.5B"
load_kwargs = {"trust_remote_code": True}
model = AutoModelForCausalLM.from_pretrained(model_name, **load_kwargs)
tokenizer = AutoTokenizer.from_pretrained(model_name, **load_kwargs)

# Show the model architecture.
print(model)

启动SFT训练

In [None]:
# Run the SFT launch script in the background under nohup; stdout and stderr
# are both redirected to output.log.
nohup bash examples/scripts/train_sft_qwen1.8b.sh > output.log 2>&1 &

实验结果
<br>Qwen2-1.5B，八卡3090训练一个epoch，500000 Samples（与训练脚本中的 --max_samples 一致），256 batch size，24H，loss_mean收敛在2.1左右

![](./images/loss_mean.png)

训练期间显存占用

![训练期间显存占用](./images/Memory.png)

## *训练函数*

开启训练模式

In [None]:
  # Switch the model to training mode and reset the running loss.
self.model.train()
loss_mean = 0

# Iterate over the training batches. Each batch is unpacked as
# (prompt_id_lens, inputs, attention_masks, infos): this is the order that
# `packing_collate_fn` returns, and `prompt_id_lens` is what the loss step
# uses to mask out the prompt tokens.
for prompt_id_lens, inputs, attention_masks, infos in self.train_dataloader:
    ...  # the per-batch work is walked through in the following cells


***

加载训练数据

In [None]:
# Batches are unpacked as (prompt_id_lens, inputs, attention_masks, infos),
# the order returned by `packing_collate_fn`; labels are built later from
# `inputs`, so they are not part of the batch tuple.
for prompt_id_lens, inputs, attention_masks, infos in self.train_dataloader:
    ...  # the per-batch work is walked through in the following cells

***

GPU分配

In [None]:
# Move the batch to the current CUDA device. Batches that were not packed
# additionally have dimension 1 squeezed out.
device = torch.cuda.current_device()
inputs = inputs.to(device)
attention_mask = attention_masks.to(device)
if not self.packing_samples:
    inputs = inputs.squeeze(1)
    attention_mask = attention_mask.squeeze(1)


***

In [None]:
# Forward pass. When a ring-attention group is configured, the group and the
# per-sample input lengths are passed to the model as well.

if self.strategy.ring_attn_group is None:
    output = self.model(inputs, attention_mask=attention_mask, return_output=True)
else:
    output = self.model(
        inputs,
        attention_mask=attention_mask,
        return_output=True,
        ring_attn_group=self.strategy.ring_attn_group,
        packed_seq_lens=infos["input_length"]
    )

# Loss computation. Labels start as the input ids, with every position whose
# attention mask is zero replaced by IGNORE_INDEX.
labels = torch.where(
    attention_mask.bool(),
    inputs,
    self.loss_fn.IGNORE_INDEX,
)

# The auxiliary loss is only taken from the model output when enabled.
if self.aux_loss:
    aux_loss = output.aux_loss
else:
    aux_loss = 0

# Outside pretrain mode the prompt tokens are excluded from the loss.
if not self.pretrain_mode:
    if self.packing_samples:
        # Packed batch: walk through row 0 sample by sample, masking the first
        # `source_len` tokens of each sample and advancing by its full length.
        index = 0
        for input_length, source_len in zip(infos["input_length"], prompt_id_lens):
            labels[0][index : index + source_len] = self.loss_fn.IGNORE_INDEX
            index += input_length
    else:
        # One sample per row: mask the first `source_len` tokens of each row.
        for label, source_len in zip(labels, prompt_id_lens):
            label[:source_len] = self.loss_fn.IGNORE_INDEX
gpt_loss = self.loss_fn(output.logits, labels)
loss = gpt_loss + aux_loss * self.args.aux_loss_coef

# Backward pass and optimizer step, both delegated to the strategy object.

self.strategy.backward(loss, self.model, self.optimizer)
self.strategy.optimizer_step(self.optimizer, self.model, self.scheduler)

# Exponential moving average of the language-model loss (auxiliary loss excluded).
loss_mean = loss_mean * 0.9 + 0.1 * gpt_loss.item()
step += 1


***

## SFT Loss

In [None]:
class GPTLMLoss(nn.Module):
    """
    Language-model loss: token-level cross entropy that skips IGNORE_INDEX targets.

    When a ring-attention process group is supplied, this process's rank and
    the group's world size are cached on the instance.
    """
    def __init__(self, ring_attn_group=None):
        super().__init__()
        self.ring_attn_group = ring_attn_group
        self.IGNORE_INDEX = -100
        # At its core this is a plain cross-entropy loss.
        self.loss = nn.CrossEntropyLoss(ignore_index=self.IGNORE_INDEX)
        if not self.ring_attn_group:
            return
        self.ring_attn_rank = dist.get_rank(self.ring_attn_group)
        self.ring_attn_world_size = dist.get_world_size(self.ring_attn_group)

In [None]:
def forward(self, logits: torch.Tensor, labels: torch.Tensor) -> torch.Tensor:
    """Compute the next-token cross-entropy loss.

    Args:
        logits: Model logits; the second-to-last dimension is the sequence axis.
        labels: Target token ids, with IGNORE_INDEX at positions to skip.

    Returns:
        The scalar loss. With ring attention enabled, the per-rank losses are
        summed across the ring-attention group and divided by its world size.
    """
    ring_enabled = self.ring_attn_group is not None

    if ring_enabled:
        # Keep only the slice of the labels that belongs to this rank.
        total_seq_len = labels.size(-1)
        seq_len_per_process = total_seq_len // self.ring_attn_world_size
        start_idx = self.ring_attn_rank * seq_len_per_process
        end_idx = min(start_idx + seq_len_per_process, total_seq_len)
        labels = labels[..., start_idx:end_idx]

    # Align logits and labels: position t predicts token t + 1.
    shift_logits = logits[..., :-1, :].contiguous()
    shift_labels = labels[..., 1:].contiguous()

    if not ring_enabled:
        return self.loss(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))

    if torch.all(shift_labels == self.IGNORE_INDEX):
        # Every target on this rank is ignored: use a zero that is still
        # connected to the logits (presumably to avoid the NaN a mean over
        # zero valid targets would produce -- TODO confirm).
        loss = shift_logits.mean() * 0
    else:
        loss = self.loss(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))

    # The all-reduce is a collective, so every rank of the group must reach
    # it, including ranks that took the all-ignored branch above. Keeping it
    # inside the `else` branch would leave the other ranks waiting forever.
    dist.all_reduce(loss, op=dist.ReduceOp.SUM, group=self.ring_attn_group)
    return loss / self.ring_attn_world_size

此损失函数主要分为两种模式：
<br>普通模式：直接计算交叉熵Loss
<br>Ring Attention：分布式处理输入数据，让多GPU分别处理不同的序列部分

OpenRLHF简化版SFT Loss处理

![](./images/简化版SFTLoss.png)

In [None]:
# Build the labels: start from the input ids, ignore every position whose
# attention mask is zero, then ignore the prompt prefix of each row.
ignore_index = self.loss_fn.IGNORE_INDEX
labels = torch.where(attention_mask.bool(), inputs, ignore_index)
for row, prompt_len in zip(labels, prompt_id_lens):
    row[:prompt_len] = ignore_index
class GPTLMLoss(nn.Module):
    """
    GPT Language Model Loss (simplified version without ring attention).

    Token-level cross entropy between each position's logits and the next
    token, skipping targets equal to IGNORE_INDEX.
    """
    def __init__(self, ring_attn_group=None):
        super().__init__()
        self.IGNORE_INDEX = -100
        # At its core this is a plain cross-entropy loss.
        self.loss = nn.CrossEntropyLoss(ignore_index=self.IGNORE_INDEX)

    # `forward` must be a method of the class: defined at module level, the
    # module has no forward pass and calling it raises NotImplementedError.
    def forward(self, logits: torch.Tensor, labels: torch.Tensor) -> torch.Tensor:
        """Return the mean cross entropy over the non-ignored next-token targets."""
        # Align labels and logits: position t predicts token t + 1.
        shift_logits = logits[..., :-1, :].contiguous()
        shift_labels = labels[..., 1:].contiguous()
        loss = self.loss(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))
        return loss


***

## Example

原始的input和output（instruction/prompt和output）文本

In [None]:
{
  "input": "User: 翻译成英文：\n在国内，这些理论家的思想常常被嘲笑为中国异常主义的借口，和它对自己不喜欢的国际规则抗拒的托词。\n\nAssistant: ",
  "output": "Abroad these theorists’ ideas tend to be greeted with suspicion as excuses for China’s exceptionalism and its rejection of international rules it does not like."
}


input和output拼接后的token_ids，长度89，其中prompt_id_len为43，即前43个token是prompt部分。

attention_mask的有效长度为74，也就是第74个token后面的部分是padding的，也可以看到他们的token_id都是一致的151643，这是一个special_token。

In [None]:
input+output
tensor([[  1474,     25,   10236,    123,    119,  102610,  12857,  105205,   28311,
          913657,   5837, 700001, 101911,  45629, 105249,  100495,  99250,  108548,
         112440,  104400,  62945, 109001,   9370, 111942,    3837,   33186,   99652,
         169266, 105556,   99702,  278019,  71817, 115469,    3960,  137650,   55619,
          82962,   67978,    8376,  380717,  43765,    448,  37041,
          438,  54486,    5616,    748,  24364,   2142,    323,   1181,
         36901,    315,    6489,   5601,    432,   1558,    537,   1075,    133,
           220, 151643, 151643, 151643, 151643, 151643, 151643, 151643,
        151643, 151643, 151643, 151643, 151643, 151643, 151643]])
attention_mask
tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])



经过ignore_index操作之后可以得到label，可以看到prompt部分和padding部分（attention_mask为0的位置）已经被ignore了，这部分不会参与后续的交叉熵计算，这是在 nn.CrossEntropyLoss() 中通过参数 ignore_index 控制的。

In [None]:
label
tensor([[-100, -100, -100, -100, -100, -100, -100, -100, -100, -100,
         -100, -100, -100, -100, -100, -100, -100, -100, -100, -100,
         -100, -100, -100, -100, -100, -100, -100, -100, -100, -100,
         -100, -100, -100, -100, -100, -100, -100, -100, -100, -100,
         82962, 67978, 8376, 380717, 43765, 448, 37041,
         438, 54486, 5616, 748, 24364, 2142, 323, 1181,
         36901, 315, 6489, 5601, 432, 1558, 537, 1075, 133,
         -100, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100]])


![](./images/SFT对齐.png)

***

## Packing Sample

在OpenRLHF的SFT训练代码train_sft.py中，准备dataloader时有一个args.packing_samples选项，它决定是否对数据集中的样本做打包（packing）操作。

In [None]:
# Prepare the dataloader. Packing mode uses the dataset's packing collate
# function; otherwise the regular collate function is used.
# NOTE(review): the two positional `True` flags are passed through unchanged;
# their meaning is defined by `strategy.setup_dataloader` -- confirm there.
collate_fn = train_dataset.packing_collate_fn if args.packing_samples else train_dataset.collate_fn
train_dataloader = strategy.setup_dataloader(
    train_dataset,
    args.micro_train_batch_size,
    True,
    True,
    collate_fn,
)


packing 的概念是将短输入合并/打包成单个输入，从而减少训练数据的数量。
打包输入时，注意力应只集中在各个序列内部。如果不对attention mask做处理，那么就会如下图，不同的句子之间会做attention计算。

![](erro.png)

正确处理应该是每个sample只保留自己内部的mask，如下图

![](./images/Right.png)

在[Packing Inputs Without Cross-Contamination Attention](https://github.com/MeetKai/functionary/tree/main/functionary/train/packing)中给出了packed_attention_mask的具体形式，就是属于第一个sample的mask全是1，第二个全是2，以此类推。

In [None]:
# Packing code
def packing_collate_fn(self, item_list):
    """Pack several samples into one flat row.

    Each item is (prompt_ids_len, input_id, attention_mask, info). The input
    ids of all samples are flattened and concatenated into a tensor of shape
    (1, total_tokens). The returned mask has the same shape and holds 1 for
    every token of the first sample, 2 for the second, and so on; the
    per-item `attention_mask` is not used.

    Returns:
        (prompt_ids_lens, packed_input_ids, packed_attention_masks, infos),
        where infos["input_length"] lists each sample's info["input_length"].
    """
    flat_ids = []
    sample_masks = []
    prompt_ids_lens = []
    input_lengths = []

    for sample_no, (prompt_ids_len, input_id, attention_mask, info) in enumerate(item_list, start=1):
        tokens = input_id.flatten()
        flat_ids.append(tokens)
        # A mask shaped like the sample, filled with the sample's 1-based number.
        sample_masks.append(torch.full_like(tokens, sample_no))
        prompt_ids_lens.append(prompt_ids_len)
        input_lengths.append(info["input_length"])

    packed_input_ids = torch.cat(flat_ids, dim=0).unsqueeze(0)
    packed_attention_masks = torch.cat(sample_masks, dim=0).unsqueeze(0)
    infos = {"input_length": input_lengths}

    return prompt_ids_lens, packed_input_ids, packed_attention_masks, infos


当pack两个sample的时候，第一个sample长度74，第二个sample长度89，产生的packed_attention_mask如下：

In [None]:
packed_attention_masks
tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
         2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]])


在transformers.AutoModelForCausalLM类的forward()函数中传入这个attention_mask

In [None]:
# Call the model's forward pass with the attention mask and explicit position ids.
output = self.model.forward(sequences, attention_mask=attention_mask, position_ids=position_ids)


packing的好处是大幅度减少训练时间，通过将大量较短的sample packing成更少数量的sample，可以缩短训练时间。
但需要注意：pack_length 必须满足 pack_length >= max_length；同时 pack_length 也不宜过大，否则打包后的样本数会很少，导致模型参数更新次数过少，结果变差。因此 pack_length 是一个需要调节的超参数。

***

# *Reward Model training*

选择了Qwen2.5-1.5B-Instruct作为训练Reward Model的基座，OpenRLHF/preference_dataset_mixture2_and_safe_pku作为偏好数据集进行reward model训练。

In [None]:
# Reward-model training script
set -x

# Argument notes. They are kept outside the heredoc: inside it, a backslash
# followed by a space and "# ..." is not a line continuation, so the comment
# text would be word-split and passed to deepspeed as extra arguments.
#
#   --zero_stage 3         DeepSpeed ZeRO-3; the model weights are also sharded
#                          across the GPUs
#   --apply_chat_template  format the samples with the chat template
#   --chosen_key           field holding the preferred response of each pair
#   --rejected_key         field holding the non-preferred response of each pair
read -r -d '' training_commands <<EOF
openrlhf.cli.train_rm \
  --save_path ./checkpoint/qwen2.5-1.5b-instruct-mixture2-rm \
  --save_steps -1 \
  --logging_steps 1 \
  --eval_steps -1 \
  --train_batch_size 256 \
  --micro_train_batch_size 1 \
  --pretrain Qwen/Qwen2.5-1.5B-Instruct \
  --bf16 \
  --max_epochs 1 \
  --max_len 8192 \
  --zero_stage 3 \
  --learning_rate 9e-6 \
  --dataset OpenRLHF/preference_dataset_mixture2_and_safe_pku \
  --apply_chat_template \
  --chosen_key chosen \
  --rejected_key rejected \
  --flash_attn \
  --load_checkpoint \
  --gradient_checkpointing
EOF

# Optional flags:
# --use_wandb [WANDB_TOKENS] or True (use wandb Login command)
# --packing_samples

# Unless the first script argument is "slurm", launch through deepspeed.
# $training_commands is intentionally unquoted so it is split into arguments.
if [[ ${1} != "slurm" ]]; then
    deepspeed --module $training_commands
fi


偏好数据集

In [None]:
{
  'rejected':
  [
    {'content': 'Part 1. Definition\nYou are given a math word problem and you are supposed to apply division mathematical operator on the numbers embedded in the text to answer the following question and then only report the final numerical answer.\nPart 2. Example\nContext: Alex wants to split a collection of eggs into 4 groups. Alex has 8 eggs and 6 marbles. \n\nQuestion: How many eggs will each group have?\nAnswer: 2\nExplanation: Total eggs each group will get is 8/4 = 2\nPart 3. Exercise\nContext: 6 packs of dvds can be bought with 120 dollars. \n\nQuestion: How much does each pack cost?\nAnswer:', 'role': 'user'},
    {'content': 'Your task is to define the mathematical operation to be performed on the given numbers to solve the problem and provide a formula to calculate the answer. In this case, the task involves dividing the total amount of eggs by the number of groups, and your answer will be the number of eggs each group will have. I will provide you with the formula to solve the exercise, and if you have any questions, feel free to ask.', 'role': 'assistant'}
  ],

  'chosen':
  [
    {'content': 'Part 1. Definition\nYou are given a math word problem and you are supposed to apply division mathematical operator on the numbers embedded in the text to answer the following question and then only report the final numerical answer.\nPart 2. Example\nContext: Alex wants to split a collection of eggs into 4 groups. Alex has 8 eggs and 6 marbles. \n\nQuestion: How many eggs will each group have?\nAnswer: 2\nExplanation: Total eggs each group will get is 8/4 = 2\nPart 3. Exercise\nContext: 6 packs of dvds can be bought with 120 dollars. \n\nQuestion: How much does each pack cost?\nAnswer:', 'role': 'user'},
    {'content': '6 packs x $120/pack = $720\nExplanation: To find how much each pack costs, we need to divide the total cost by the number of packs. 120 dollars can buy 6 packs, so each pack costs $120/6 = $20.', 'role': 'assistant'}
  ],

  'rejected_score': 1.5,
  'chosen_score': 3.75
}


### 设置Reward Model

主要是get_llm_for_sequence_regression()函数返回reward model，通过_get_reward_model()函数返回一个加了value head的model，然后对value head进行初始化。
value head实际上是一个hidden_size -> 1的linear层，附加到选取的base model上。

In [None]:
def get_llm_for_sequence_regression():
    """Abridged excerpt: build the reward-model class and initialise its value head.

    NOTE(review): this is a shortened excerpt, not runnable as shown. `config`,
    `logger`, `packing_samples`, `init_value_head` and `model` are not defined
    here, and the step that instantiates `cls_class` into `model` is omitted.
    """
    # main code
    # Prioritize using the value_head_prefix in the model configuration.
    value_head_prefix = getattr(config, "value_head_prefix", value_head_prefix)
    logger.info(f"set value_head_prefix to {value_head_prefix}")
    # Look up the base model class registered for this config type, and its
    # direct parent class.
    base_class = AutoModel._model_mapping[type(config)]
    base_pretrained_class = base_class.__base__
    cls_class = _get_reward_model(base_pretrained_class, base_class, value_head_prefix, packing_samples)

    # NOTE: For reward model training only, initialize value_head manually
    # because deepspeed.zero.Init() will not initialize them.
    if init_value_head:
        value_head = getattr(model, value_head_prefix)
        # Normal initialisation with mean 0 and std 1 / (hidden_size + 1).
        value_head.weight.data.normal_(mean=0.0, std=1 / (config.hidden_size + 1))

    return model

def _get_reward_model():
    # main code
    class RewardModel(base_pretrained_model):
        supports_gradient_checkpointing = True

        def __init__(self, config: AutoConfig):
            super().__init__(config)
            setattr(self, self.base_model_prefix, base_llm_model(config))
            self.value_head_prefix = value_head_prefix
            setattr(self, value_head_prefix, nn.Linear(config.hidden_size, 1, bias=False))

        def forward(self, input_ids, attention_mask):
            position_ids = attention_mask.long().cumsum(-1) - 1
            position_ids.masked_fill_(attention_mask == 0, 1)
            outputs = getattr(self, self.base_model_prefix)(
                input_ids, attention_mask=attention_mask, position_ids=position_ids
            )
            last_hidden_states = outputs["last_hidden_state"]
            # 通过 value head 计算每个 token 的 reward 值
            values = getattr(self, self.value_head_prefix)(last_hidden_states).squeeze(-1)
            # 取句子的最后一个 token 作为最终 reward 值
            eos_indices = attention_mask.size(1) - 1 - attention_mask.long().flip(1).argmax(dim=1, keepdim=True)
            reward = values.gather(dim=1, index=eos_indices).squeeze(1)
            return (reward, outputs) if return_output else reward

    return RewardModel


#### Reward Model训练过程

整个 Reward Model 训练流程如下：

<br>加载预训练模型（如 Qwen, LLaMA 等）。
<br>在基础模型上附加 value head（一个从 hidden_size -> 1 的线性层）。
<br>前向传播：处理输入文本；通过 value head 计算 token-wise reward；提取最终的句子 reward。
<br>Loss计算（如排序损失、回归损失）。
<br>优化器更新权重，迭代训练。

![](./images/Value%20head.png)

取每一个token对应的last_hidden_states，过value head得到每一个token对应的value值，然后取句子末尾token对应的value值作为整个句子的reward值。

In [None]:
print(model)

DeepSpeedEngine(
  (module): RewardModel(
    (model): Qwen2Model(
      (embed_tokens): Embedding(151936, 1536)
      (layers): ModuleList(
        (0-27): 28 x Qwen2DecoderLayer(
          (self_attn): Qwen2FlashAttention2(
            (q_proj): Linear(in_features=1536, out_features=1536, bias=True)
            (k_proj): Linear(in_features=1536, out_features=256, bias=True)
            (v_proj): Linear(in_features=1536, out_features=256, bias=True)
            (o_proj): Linear(in_features=1536, out_features=1536, bias=False)
            (rotary_emb): Qwen2RotaryEmbedding()
          )
          (mlp): Qwen2MLP(
            (gate_proj): Linear(in_features=1536, out_features=8960, bias=False)
            (up_proj): Linear(in_features=1536, out_features=8960, bias=False)
            (down_proj): Linear(in_features=8960, out_features=1536, bias=False)
            (act_fn): SiLU()
          )
          (input_layernorm): Qwen2RMSNorm((0,), eps=1e-06)
          (post_attention_layernorm): Qwen2RMSNorm((0,), eps=1e-06)
        )
      )
      (norm): Qwen2RMSNorm((0,), eps=1e-06)
      (rotary_emb): Qwen2RotaryEmbedding()
    )
    (score): Linear(in_features=1536, out_features=1, bias=False)
  )
)


改造的Reward model结构如上，可以看到相比Qwen2Model类只多了一个用于score的线性层。

***

### 数据处理函数

RewardDataset.collate_fn() 函数需要对 chosen、rejected 以及它们对应的 mask 做 left padding，这样方便对齐每个 sample 的最后位置，方便后续取出 reward。

In [None]:
def collate_fn(self, item_list):
    """Collate preference samples into padded batches.

    Each item is (chosen_id, chosen_mask, reject_id, rejects_mask, extra).
    Ids are padded with the tokenizer's pad token id and masks with the
    default pad value of `zero_pad_sequences`. Padding goes on the right
    when `self.is_dpo` is set and on the left otherwise.

    Returns:
        (chosen_ids, chosen_masks, reject_ids, rejects_masks, extras)
    """
    chosen_ids, chosen_masks = [], []
    reject_ids, rejects_masks = [], []
    extras = []

    for sample in item_list:
        c_id, c_mask, r_id, r_mask, extra = sample
        chosen_ids.append(c_id)
        chosen_masks.append(c_mask)
        reject_ids.append(r_id)
        rejects_masks.append(r_mask)
        extras.append(extra)

    padding_side = "right" if self.is_dpo else "left"
    pad_id = self.tokenizer.pad_token_id

    chosen_ids = zero_pad_sequences(chosen_ids, side=padding_side, value=pad_id)
    chosen_masks = zero_pad_sequences(chosen_masks, side=padding_side)
    reject_ids = zero_pad_sequences(reject_ids, side=padding_side, value=pad_id)
    rejects_masks = zero_pad_sequences(rejects_masks, side=padding_side)

    return chosen_ids, chosen_masks, reject_ids, rejects_masks, extras


在数据的预处理中，对 prompt、chosen 和 rejected 进行处理，参数设置使用 chat_template，会调用模型的 tokenizer.apply_chat_template。

In [None]:
def preprocess_data() -> tuple:
    """Abridged excerpt: turn one preference record into (prompt, chosen, rejected, margin).

    NOTE(review): shortened excerpt. `data`, `apply_chat_template`,
    `prompt_key`, `chosen_key`, `rejected_key`, `is_dpo`, `input_template` and
    `exist_and_not_none` are not defined here and must come from the
    enclosing scope.
    """
    if apply_chat_template:
        if prompt_key:
            # Separate prompt field: render the prompt alone (with the
            # generation prompt appended) and the prompt plus each response.
            prompt = apply_chat_template(data[prompt_key], tokenize=False, add_generation_prompt=True)
            chosen = apply_chat_template(data[prompt_key] + data[chosen_key], tokenize=False)
            rejected = apply_chat_template(data[prompt_key] + data[rejected_key], tokenize=False)
        else:
            # No prompt field: the chosen/rejected entries are rendered as-is.
            prompt = ""
            chosen = apply_chat_template(data[chosen_key], tokenize=False)
            rejected = apply_chat_template(data[rejected_key], tokenize=False)

        if is_dpo:
            # DPO: the prompt is everything but the last chosen message; its
            # rendered text is cut off the front of both responses.
            prompt = apply_chat_template(data[chosen_key][:-1], tokenize=False, add_generation_prompt=True)
            chosen = chosen[len(prompt):]
            rejected = rejected[len(prompt):]
    else:
        if prompt_key:
            prompt = data[prompt_key]
            if input_template:
                prompt = input_template.format(prompt)
        else:
            prompt = ""

        chosen = data[chosen_key]
        rejected = data[rejected_key]

    # Margin for the margin loss; 0 when the record carries none.
    margin = data["margin"] if exist_and_not_none(data, "margin") else 0

    return prompt, chosen, rejected, margin


在 chosen 和 rejected 进入 reward model 之前，还需要做 concatenate 操作，这样可以避免对 chosen 和 rejected 单独跑两次 forward，提高训练效率。

In [None]:
def concatenated_forward(self, model, chosen_ids, c_mask, reject_ids, r_mask):
    """
    Score the chosen and rejected inputs in a single forward pass.

    The two batches are joined by `self.concatenated_inputs` and run through
    the model once. The first `chosen_ids.shape[0]` values are the chosen
    rewards, the remaining ones the rejected rewards. We do this to avoid
    doing two forward passes, because it's faster for FSDP.

    Returns:
        (chosen_rewards, rejected_rewards, aux_loss); `aux_loss` is an empty
        list when the model output has no "aux_loss" entry.
    """
    n_chosen = chosen_ids.shape[0]
    input_ids, att_masks = self.concatenated_inputs(chosen_ids, c_mask, reject_ids, r_mask)
    all_values, output = model(input_ids, attention_mask=att_masks, return_output=True)

    if "aux_loss" in output:
        aux_loss = output.aux_loss
    else:
        aux_loss = []

    return all_values[:n_chosen], all_values[n_chosen:], aux_loss


#### RM训练函数

In [None]:
self.model.train()

# Running averages of the pairwise accuracy and the preference loss.
acc_mean = 0
loss_mean = 0

for data in self.train_dataloader:
    chosen_ids, c_mask, reject_ids, r_mask, margin = data

    # Squeeze dimension 1 and move all four tensors to the current CUDA device.
    device = torch.cuda.current_device()
    chosen_ids, c_mask, reject_ids, r_mask = (
        t.squeeze(1).to(device) for t in (chosen_ids, c_mask, reject_ids, r_mask)
    )

    chosen_reward, reject_reward, aux_loss = self.concatenated_forward(
        self.model, chosen_ids, c_mask, reject_ids, r_mask
    )

    # NOTE(review): `margin` is unpacked above but not passed to the loss here.
    preference_loss = self.loss_fn(chosen_reward, reject_reward)

    # mixtral: the auxiliary loss only counts when it is enabled
    aux_loss = aux_loss if self.aux_loss else 0

    loss = preference_loss + aux_loss * self.args.aux_loss_coef
    self.strategy.backward(loss, self.model, self.optimizer)
    self.strategy.optimizer_step(self.optimizer, self.model, self.scheduler)

    # Accuracy: fraction of pairs whose chosen reward beats the rejected one.
    acc = (chosen_reward > reject_reward).float().mean().item()
    acc_mean = 0.9 * acc_mean + 0.1 * acc
    loss_mean = 0.9 * loss_mean + 0.1 * preference_loss.item()


#### Reward Model Loss

![](./images/RMLoss.png)

有时会增加 margin 参数来控制 chosen 和 rejected 之间的距离，增大 margin 相当于增加训练难度，增强 chosen 和 rejected reward 的差值。

In [None]:
class PairWiseLoss(nn.Module):
    """
    Pairwise ranking loss for the reward model.

    Computes mean(-logsigmoid(chosen_reward - reject_reward - margin)); the
    margin term is omitted when `margin` is None.
    """
    def forward(
        self, chosen_reward: torch.Tensor, reject_reward: torch.Tensor, margin: torch.Tensor = None
    ) -> torch.Tensor:
        gap = chosen_reward - reject_reward
        if margin is not None:
            gap = gap - margin
        per_pair_loss = -F.logsigmoid(gap)
        return per_pair_loss.mean()


***

#   PPO训练

## PPO训练脚本

这里选择了 Qwen2.5-1.5B-Instruct 作为训练PPO的基座模型，
OpenRLHF/Llama-3-8b-rm-mixture 为 reward model，
OpenRLHF/prompt-collection-v0.1 为 prompt 数据集进行训练。

In [None]:
set -x

# Argument notes. They are kept outside the heredoc: inside it, a backslash
# followed by a space and "# ..." is not a line continuation, so the comment
# text would be word-split and passed to deepspeed as extra arguments.
#
#   --pretrain                  base model (the Instruct checkpoint, matching
#                               the id used by the reward-model script)
#   --reward_pretrain           reward model
#   --micro_rollout_batch_size  batch size used when generating data during RL
#   --rollout_batch_size        total number of prompts per rollout batch
read -r -d '' training_commands <<EOF
openrlhf.cli.train_ppo \
  --pretrain Qwen/Qwen2.5-1.5B-Instruct \
  --reward_pretrain OpenRLHF/Llama-3-8b-rm-mixture \
  --save_path ./checkpoint/llama-3-8b-rlhf \
  --save_steps -1 \
  --logging_steps 1 \
  --eval_steps -1 \
  --micro_train_batch_size 2 \
  --train_batch_size 128 \
  --micro_rollout_batch_size 4 \
  --rollout_batch_size 1024 \
  --max_epochs 1 \
  --prompt_max_len 1024 \
  --generate_max_len 1024 \
  --zero_stage 2 \
  --bf16 \
  --actor_learning_rate 5e-7 \
  --critic_learning_rate 9e-6 \
  --init_kl_coef 0.01 \
  --prompt_data OpenRLHF/prompt-collection-v0.1 \
  --input_key context_messages \
  --apply_chat_template \
  --normalize_reward \
  --adam_offload \
  --flash_attn \
  --load_checkpoint \
  --gradient_checkpointing
EOF

# Optional flags:
# --packing_samples
# --use_wandb [WANDB_TOKENS] or True (use wandb Login command)
# --remote_rm_url http://localhost:5000/get_reward

# Unless the first script argument is "slurm", launch through deepspeed.
# $training_commands is intentionally unquoted so it is split into arguments.
if [[ ${1} != "slurm" ]]; then
    deepspeed --module $training_commands
fi


PPO 训练的流程如下：

<br>加载 pretrain model 作为 Actor 进行训练。
<br>加载 pretrain model 作为 Reference model，但 不训练。
<br>加载 Reward model，同样 不训练，仅用于评估。
<br>如果未指定 Critic model 的路径，则会自动使用 Reward model 作为 Critic 进行训练。

所以PPO总共需要四个模型：Actor和Critic 需要训练，Reference和Reward不需要训练

PPO目标函数

![](./images/PPO.png)

### PPO训练主函数

In [None]:
# Simplified: only the main steps are kept.


# Each iteration draws one batch of prompts from prompts_dataloader.
for rand_prompts in self.prompts_dataloader:
    # Generate experiences for the sampled prompts and store them in the replay buffer.
    new_experiences = self.experience_maker.make_experience_list(rand_prompts, **self.generate_kwargs)
    for experience in new_experiences:
        self.replay_buffer.append(experience)

    # Normalize the advantages held in the replay buffer.
    self.replay_buffer.normalize("advantages", self.strategy)

    # PPO training on the buffered experiences.
    status = self.ppo_train(steps)

    # Drop the used experiences.
    self.replay_buffer.clear()

    # Update the KL controller when the training status reports a KL value.
    if "kl" in status:
        n_samples = args.rollout_batch_size * args.n_samples_per_prompt
        self.kl_ctl.update(status["kl"], n_samples)

    steps += 1


主要关注训练代码中的两个核心部分：Experience的生成和实际训练

#### make_experience_list 函数
先对 prompt 用当前的模型策略生成 response。
调用 make_experience 生成 experience，包括 reward 和 action_prob 等信息。
计算带有 KL 约束的奖励值，并计算 experience 对应的 return 和 adv（优势函数）。
这些 experience 最终将用于后续 PPO 训练。

In [None]:
# Simplified excerpt.
# Main logic: generate responses for the prompts, compute rewards and action
# log-probs, and store everything as experiences.

def make_experience_list(self, all_prompts: Union[str, List[str]], **generate_kwargs):
    """Generate experiences for the prompts and attach advantages and returns.

    Args:
        all_prompts: Prompt or list of prompts to generate responses for.
        **generate_kwargs: Generation options; "gamma" and "lambd" are also
            read here for the advantage computation.

    Returns:
        The list of experiences, each with `advantages`, `returns` and
        `info["return"]` set, `kl` cleared and `info["num_actions"]` removed.
    """
    args = self.strategy.args

    experiences = [
        self.make_experience(samples)
        for samples in tqdm(self.generate_samples(all_prompts, **generate_kwargs))
    ]

    # Light post-processing; the experiences already carry their rewards.
    experiences, rewards = self.process_experiences(experiences)

    # Compute returns and advantages for every experience.
    for experience, reward in zip(experiences, rewards):
        num_actions = experience.info["num_actions"]
        reward = compute_reward(
            reward,
            self.kl_ctl.value,
            experience.kl,
            action_mask=experience.action_mask,
            num_actions=num_actions,
            reward_clip_range=args.reward_clip_range,
        )

        experience.advantages, experience.returns = self.get_advantages_and_returns(
            experience.values,
            reward,
            experience.action_mask,
            generate_kwargs["gamma"],
            generate_kwargs["lambd"],
        )

        # The bookkeeping below belongs inside the loop: outside it, only the
        # last experience is finalized and an empty list raises a NameError.
        return_sums = torch.tensor([each_reward.sum() for each_reward in reward])
        experience.info["return"] = return_sums

        # Drop data that is no longer needed.
        experience.kl = None
        del experience.info["num_actions"]

    return experiences


#### make_experience 函数

In [None]:
def make_experience(self, samples):
    """Abridged excerpt: turn generated samples into an Experience.

    Runs the actor, the initial (reference) model, the critic and the reward
    model on the generated sequences, and computes the per-token difference
    between actor and reference log-probs as the KL term.

    NOTE(review): the reward `r` is computed but not passed to `Experience`
    in this shortened excerpt.
    """
    # Put all four models in eval mode.
    self.actor.eval()
    self.initial_model.eval()
    self.reward_model.eval()
    self.critic.eval()

    # Read the generated sequences and their masks from the samples.
    sequences = samples.sequences
    attention_mask = samples.attention_mask
    action_mask = samples.action_mask
    num_actions = samples.num_actions

    # Log probs of the actions under the current actor policy.
    action_log_probs = self.actor(sequences, num_actions, attention_mask)

    # Log probs of the same actions under the reference / initial model.
    base_action_log_probs = self.initial_model(sequences, num_actions, attention_mask)

    # Values predicted by the critic.
    value = self.critic(sequences, num_actions, attention_mask)

    # Reward-model score.
    r = self.reward_model(sequences, attention_mask)

    # KL term between the current policy and the reference policy, computed
    # as the log-prob difference and zeroed where action_mask is zero.
    kl = action_log_probs - base_action_log_probs
    if action_mask is not None:
        kl = kl * action_mask

    # Reset model state: actor and critic go back to training mode.
    self.actor.train()
    self.critic.train()

    return Experience(sequences, action_log_probs, value, attention_mask, action_mask, kl)


#### Compute_Reward 函数

计算带有 KL 惩罚的奖励值，用于 PPO 训练中的策略约束。

In [None]:
def compute_reward(r, kl_coef, kl, action_mask, num_actions=None, reward_clip_range=None):
    """Combine the sequence-level reward with a per-token KL penalty.

    The signature follows the call in `make_experience_list`:
    `compute_reward(reward, kl_coef, kl, action_mask=..., num_actions=...,
    reward_clip_range=...)`.

    Args:
        r: One reward per sequence, shape (batch,).
        kl_coef: Coefficient of the KL penalty.
        kl: Per-token KL term, shape (batch, num_actions).
        action_mask: Mask over the action tokens, same shape as `kl`; the
            last 1 in each row marks where the sequence reward is placed.
        num_actions: Accepted for call compatibility; unused in this
            simplified excerpt.
        reward_clip_range: Optional (min, max) pair; when given, `r` is
            clamped to it before being used.

    Returns:
        Tensor shaped like `kl`: `-kl_coef * kl` everywhere, plus the sequence
        reward added at the last valid action position of each row.
    """
    if reward_clip_range:
        r = r.clamp(min=reward_clip_range[0], max=reward_clip_range[1])

    kl_reward = -kl_coef * kl

    # Index of the last position whose action mask is 1, one per row.
    eos_indices = action_mask.size(1) - 1 - action_mask.long().flip(dims=[1]).argmax(dim=1, keepdim=True)

    # `r` has shape (batch,), so it needs a trailing dimension to match the
    # (batch, 1) index; unsqueeze(2) is out of range for a 1-D tensor.
    last_reward = torch.zeros_like(kl).scatter_(dim=1, index=eos_indices, src=r.unsqueeze(1).to(kl.dtype))

    return last_reward + kl_reward


#### get_advantages_and_returns 函数

计算广义优势估计（GAE），并返回最终的 return 值

![](./images/GAE.png)

In [None]:
def get_advantages_and_returns(self, values, rewards, action_mask, gamma, lambd):
    """Compute GAE advantages and the corresponding returns.

    The signature follows the call in `make_experience_list`:
    `self.get_advantages_and_returns(values, reward, action_mask, gamma, lambd)`.

    For each step t, walking backwards through the response:
        delta_t = r_t + gamma * V_{t+1} - V_t
        A_t     = delta_t + gamma * lambd * A_{t+1}
    with V and A taken as 0 past the last step. Returns are A_t + V_t.

    Args:
        values: Value predictions, shape (batch, response_length).
        rewards: Per-token rewards, same shape.
        action_mask: Optional mask, same shape; values and rewards are
            multiplied by it. Pass None to skip masking.
        gamma: Discount factor.
        lambd: GAE lambda.

    Returns:
        (advantages, returns); the advantages are detached.
    """
    lastgae = 0
    advantages_reversed = []
    response_length = rewards.size(1)

    # Zero out values and rewards at masked positions.
    if action_mask is not None:
        values = action_mask * values
        rewards = action_mask * rewards

    # Iterate from the last step back to the first.
    for t in reversed(range(response_length)):
        nextvalues = values[:, t + 1] if t < response_length - 1 else 0.0
        delta = rewards[:, t] + gamma * nextvalues - values[:, t]
        lastgae = delta + gamma * lambd * lastgae
        advantages_reversed.append(lastgae)

    # The list was built back to front; reverse it before stacking.
    advantages = torch.stack(advantages_reversed[::-1], dim=1)
    returns = advantages + values

    return advantages.detach(), returns


### PPO_train 函数

利用前面生成的experience数据进行模型训练。分为Actor和Critic两个部分

#### training_step_actor 函数

PPO Actor Loss 函数

![](./images/Actor%20Loss.png)

这里加上了一个辅助的PTX Loss，即在pretrain的数据集上的预训练Loss来防止模型对原有的知识遗忘太多，也是一种稳定训练的方法，即PPO-ptx

In [None]:
def training_step_actor(self, experience):
    """Run one actor update on a single experience.

    Computes the PPO actor loss and back-propagates it. When a pretrain
    dataloader is configured, a language-model (PTX) loss on one pretrain
    batch is also back-propagated, scaled by `self.ptx_coef`, before the
    optimizer step.

    Returns:
        dict with "policy_loss", plus "ptx_loss" when the PTX loss was computed.
    """
    self.actor.train()

    # Unpack what the update needs from the experience.
    sequences = experience.sequences
    old_action_log_probs = experience.action_log_probs
    advantages = experience.advantages
    num_actions = experience.action_mask.size(1)
    packed_seq_lens = None
    attention_mask = experience.attention_mask

    # Action log-probs under the current policy.
    action_log_probs, output = self.actor(
        sequences,
        num_actions,
        attention_mask=attention_mask,
        return_output=True,
        packed_seq_lens=packed_seq_lens,
    )

    # PPO actor loss.
    actor_loss = self.actor_loss_fn(
        action_log_probs,
        old_action_log_probs,
        advantages,
        action_mask=experience.action_mask,
    )

    self.strategy.backward(actor_loss, self.actor, self.actor_optim)

    # Collected here so the function no longer returns an undefined name.
    status = {"policy_loss": actor_loss.item()}

    # PTX loss: an extra language-model loss on pretrain data.
    if self.pretrain_dataloader is not None:
        data = next(self.pretrain_dataloader)
        inputs = data[1].squeeze(1).to(torch.cuda.current_device())
        attention_mask = data[2].squeeze(1).to(torch.cuda.current_device())

        # Targets are the input ids, ignored wherever the attention mask is zero.
        label = torch.where(
            attention_mask.bool(),
            inputs,
            self.ptx_loss_fn.IGNORE_INDEX,
        )

        output = self.actor(inputs, attention_mask=attention_mask, return_output=True)
        ptx_log_probs = output["logits"]

        ptx_loss = self.ptx_loss_fn(ptx_log_probs, label)
        self.strategy.backward(self.ptx_coef * ptx_loss, self.actor, self.actor_optim)
        status["ptx_loss"] = ptx_loss.item()

    self.strategy.optimizer_step(self.actor_optim, self.actor, self.actor_scheduler, name="actor")

    return status


#### training_step_critic 函数

Value Loss是简单的MSE loss，即计算value和return之间的MSE loss

In [None]:
def training_step_critic(self, experience):
    """Run one critic update on a single experience.

    Re-evaluates the critic on the experience's sequences, computes the
    critic loss against the stored values and returns, back-propagates it
    and steps the critic optimizer.

    Returns:
        dict with "critic_loss".
    """
    self.critic.train()

    sequences = experience.sequences
    old_values = experience.values
    returns = experience.returns
    num_actions = experience.action_mask.size(1)
    packed_seq_lens = None
    attention_mask = experience.attention_mask

    # Values predicted by the current critic.
    values, output = self.critic(
        sequences,
        num_actions=num_actions,
        attention_mask=attention_mask,
        return_output=True,
        packed_seq_lens=packed_seq_lens,
    )

    # Critic loss.
    critic_loss = self.critic_loss_fn(
        values,
        old_values,
        returns,
        action_mask=experience.action_mask,
    )

    # Back-propagate, then update the critic parameters.
    self.strategy.backward(critic_loss, self.critic, self.critic_optim)
    self.strategy.optimizer_step(self.critic_optim, self.critic, self.critic_scheduler, name="critic")

    # Built here so the function no longer returns an undefined name.
    status = {"critic_loss": critic_loss.item()}
    return status
