## GRPO相关推导实现

#### 1. 优势函数部分

在GRPO中，优势函数通过真实环境中的一组奖励reward计算得到，而不是通过对价值估计来计算得到的。
$$ A_i = \frac{r_i - mean({r_1, r2,...,r_G})}{std({r1, r2,...,r_G})}  $$

##### 代码实现

In [21]:
import torch

In [22]:
def grpo_adv(rewards):
    """
    计算GRPO中的优势函数
    
    Args:
        rewards: 一组奖励值，形状为 [batch_size]
        
    Returns:
        advantages: 计算得到的优势函数值，形状与rewards相同
    """
    
    # 计算rewards的均值和标准差
    mean_rewards = torch.mean(rewards)
    std_rewards = torch.std(rewards)
    
    # 防止除以零
    if std_rewards == 0:
        return torch.zeros_like(rewards)
    
    # 计算优势函数
    advantages = (rewards - mean_rewards) / (std_rewards + 1e-8)
    
    return advantages
    

In [23]:
rewards = torch.tensor([0, 1, 0, 1, 1, 0], dtype=torch.float)
# convert to float
print(rewards)
adv_rst = grpo_adv(rewards)
print(adv_rst)

tensor([0., 1., 0., 1., 1., 0.])
tensor([-0.9129,  0.9129, -0.9129,  0.9129,  0.9129, -0.9129])


#### 2. KL散度部分

GRPO采用KL散度来计算两个概率分布之间的差异
$$ \text{D}_{\text{KL}} \left( \pi_{\theta} \left\| \pi_{\text{ref}} \right\| \right) = \frac{\pi_{\text{ref}}(o_i|q)}{\pi_{\theta}(o_i|q)} - \log \frac{\pi_{\text{ref}}(o_i|q)}{\pi_{\theta}(o_i|q)} - 1 $$

##### 代码实现

In [None]:
import torch

In [19]:
def grpo_kl(pi_logprobs, pi_ref_logprobs):
    """
    计算KL散度
    
    Args:
    pi_logprobs: 当前策略的对数概率
    pi_ref_logprobs: 参考策略的对数概率
    
    Return:
    KL散度值
    """
    # 计算概率比值的对数: log(pi_ref/pi) = log(pi_ref) - log(pi)
    log_ratio = pi_ref_logprobs - pi_logprobs
    
    # 计算概率比值: pi_ref/pi = exp(log(pi_ref/pi))
    ratio = torch.exp(log_ratio)
    
    # 计算KL散度: ratio - log(ratio) - 1
    kl = ratio - log_ratio - 1.0
    
    return kl

In [20]:

# 创建随机的对数概率分布
batch_size = 5
pi_logprobs = torch.randn(batch_size)
pi_ref_logprobs = torch.randn(batch_size)

# 计算KL散度
kl = grpo_kl(pi_logprobs, pi_ref_logprobs)

# 打印结果
print("当前策略的对数概率:", pi_logprobs)
print("参考策略的对数概率:", pi_ref_logprobs)
print("计算的KL散度:", kl)
print("KL散度平均值:", kl.mean().item())



当前策略的对数概率: tensor([-0.5775,  0.1163, -1.1137, -0.2934, -0.9523])
参考策略的对数概率: tensor([ 1.2962, -0.7097,  1.6133, -0.4940,  0.9184])
计算的KL散度: tensor([ 3.6390,  0.2638, 11.5589,  0.0188,  3.6218])
KL散度平均值: 3.8204777240753174


#### 3. GRPO Loss

$$ \frac{1}{G} \sum_{i=1}^G \left( \min \left( \frac{\pi_{\theta}(o_i|q)}{\pi_{\theta_{\text{old}}}(o_i|q)} A_i, \text{clip} \left( \frac{\pi_{\theta}(o_i|q)}{\pi_{\theta_{\text{old}}}(o_i|q)}, 1 - \varepsilon, 1 + \varepsilon \right) A_i \right) - \beta \text{D}_{\text{KL}} \left( \pi_{\theta} \left\| \pi_{\text{ref}} \right\| \right) \right) $$

这部分是目标函数的表达式，包含一个求和项，其中 $G$ 是总的观测数。

- 对于每个观测 $o_i$，计算当前策略 $\pi_{\theta}(o_i|q)$ 与旧策略 $\pi_{\theta_{\text{old}}}(o_i|q)$ 的比率，并乘以对应的优势函数 $A_i$。
- 使用 $\text{clip}$ 函数对比率进行限制，防止其偏离 1 过多。
- 最后，减去一个 $\beta$ 倍的 KL 散度 $\text{D}_{\text{KL}} \left( \pi_{\theta} \left\| \pi_{\text{ref}} \right\| \right)$，这部分用于控制新策略与参考策略 $\pi_{\text{ref}}$ 的相似性。

##### 3.1 代码实现（手推）

In [None]:
def grpo_loss(pi_logprob, 
              pi_old_logprob, 
              pi_ref_logprob, 
              advantage, 
              input_len, 
              len_oi):
    """
    计算GRPO损失
    
    Args:
        pi_logprob: 当前策略的对数概率 [batch_size, input_len]
        pi_old_logprob: 旧策略的对数概率 [batch_size, input_len]
        pi_ref_logprob: 参考策略的对数概率 [batch_size, input_len]
        advantage: 优势函数 [batch_size]
        input_len: 输入长度
        len_oi: 观测长度 [batch_size]
    Returns:
        loss: GRPO损失 [batch_size]
    """
    epsilon = 0.2
    beta = 0.3
    group_size = 2
    
    # 创建mask并处理无效样本
    mask = torch.ones_like(pi_logprob)
    mask[torch.where(len_oi == 0)] = 0
    
    # 计算概率比率
    ratio = torch.exp(pi_logprob - pi_old_logprob)
    ratio_clip = torch.clamp(ratio, 1 - epsilon, 1 + epsilon)
    
    # 调整advantage维度以匹配输入
    advantage = advantage.unsqueeze(1).expand(-1, input_len)
    
    # 计算policy gradient
    policy_gradient = torch.min(ratio * advantage, ratio_clip * advantage)
    
    # 计算KL散度
    kl = grpo_kl(pi_logprob, pi_ref_logprob)
    
    # 先应用mask
    masked_loss = (policy_gradient - beta * kl) * mask
    
    # 计算每个样本的有效token数量（防止除零）
    valid_tokens = torch.clamp(len_oi, min=1).unsqueeze(1)
    
    # 计算损失
    loss = (-1 / group_size) * masked_loss.sum(dim=1) / valid_tokens.squeeze(1)
    
    return loss