DPO

$\mathbb{E}_{x, y, y' \sim D}\log \sigma (\beta(\log (\frac{\pi_{\theta}(y', x)}{\pi_{\theta_{ref}}(y', x)} ) - \log (\frac{\pi_{\theta}(y, x)}{\pi_{\theta_{ref}}(y, x)})))$

In [None]:
import torch

def dpo_loss(new_logpro, old_logpro, new_ref_logpro, old_ref_logpro, beta=1.0):
    """Return the mean DPO loss over a batch of preference pairs.

    The loss is ``-log(sigmoid(beta * margin))`` where ``margin`` is the
    policy/reference log-ratio of the ``new_*`` sample minus the
    policy/reference log-ratio of the ``old_*`` sample.
    NOTE(review): presumably ``new_*`` is the preferred response (y' in the
    notebook formula) and ``old_*`` the rejected one (y) -- confirm with callers.

    Args:
        new_logpro: policy log-probabilities of the first response.
        old_logpro: policy log-probabilities of the second response.
        new_ref_logpro: reference-model log-probabilities of the first response.
        old_ref_logpro: reference-model log-probabilities of the second response.
        beta: temperature applied to the margin *inside* the sigmoid.

    Returns:
        Scalar tensor: the batch mean of the per-pair losses.
    """
    first_logratio = new_logpro - new_ref_logpro
    second_logratio = old_logpro - old_ref_logpro
    # beta scales the implicit-reward margin itself; scaling the loss from the
    # outside would only change its magnitude, not the sharpness of the sigmoid.
    margin = beta * (first_logratio - second_logratio)
    # logsigmoid stays finite for very negative margins, where
    # log(sigmoid(x)) would underflow to -inf.
    return -torch.nn.functional.logsigmoid(margin).mean()

PPO

$r_i = \frac{\pi_{\theta}(y_i, x_i)}{\pi_{\theta_{ref}}(y_i, x_i)} $

$\mathbb{E}_{i}[ \min(r_i A_i, \text{clip}(r_i, 1 - \epsilon, 1 + \epsilon) A_i)]$

In [None]:
def ppo_loss(new_logpro, old_logpro, advantages, epsilon=0.2):
    """Return the mean clipped-surrogate PPO loss.

    Args:
        new_logpro: log-probabilities under the policy being optimised.
        old_logpro: log-probabilities under the policy the ratio is taken against.
        advantages: advantage estimates, multiplied element-wise with the ratio.
        epsilon: half-width of the clipping interval around 1.

    Returns:
        Scalar tensor: the mean of the negated clipped objective.
    """
    ratio = (new_logpro - old_logpro).exp()
    bounded_ratio = ratio.clamp(min=1 - epsilon, max=1 + epsilon)
    plain_term = ratio * advantages
    bounded_term = bounded_ratio * advantages
    # The element-wise minimum is the objective to maximise, so negate it.
    per_sample = -torch.min(plain_term, bounded_term)
    return per_sample.mean()

GAE优势估计

$\delta_t = r_t + \gamma(1-d_{t+1}) V_{s_{t+1}} - V_{s_{t}}$

$A_{t} = \delta_{t} + \gamma \lambda (1-d_{t+1}) A_{t+1}$

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# ========= GAE: single-trajectory version (vectors of length T) =========
@torch.no_grad()
def compute_gae_1d(rewards, values, dones, next_value, gamma=0.99, lam=0.95):
    """Compute GAE advantages and returns for one trajectory.

    Args:
        rewards: [T] per-step rewards.
        values: [T] value estimates V(s_t).
        dones: [T] termination flags; 0/1 numbers or booleans. ``dones[t]``
            switches off both the bootstrap value and the accumulated
            advantage coming from step ``t + 1``.
        next_value: scalar V(s_T), used as the bootstrap for the last step.
        gamma: discount factor.
        lam: GAE lambda.

    Returns:
        (advantages, returns), both of shape [T], with
        ``returns = advantages + values``.
    """
    T = rewards.shape[0]
    advantages = torch.zeros_like(rewards)
    # Convert once, outside the loop; the cast also makes boolean done flags
    # usable, which ``1.0 - bool_tensor`` is not.
    nonterminal = 1.0 - torch.as_tensor(dones, dtype=rewards.dtype, device=rewards.device)
    last_gae = 0.0
    for t in reversed(range(T)):
        next_val = next_value if t == T - 1 else values[t + 1]
        delta = rewards[t] + gamma * next_val * nonterminal[t] - values[t]
        last_gae = delta + gamma * lam * nonterminal[t] * last_gae
        advantages[t] = last_gae
    returns = advantages + values
    return advantages, returns


# ========= GAE: time-major batched version (matrices of shape [T, B]) =========
@torch.no_grad()
def compute_gae_tb(rewards, values, dones, next_value, gamma=0.99, lam=0.95):
    """Compute GAE advantages and returns for a batch of trajectories.

    Args:
        rewards: [T, B] per-step rewards.
        values: [T, B] value estimates V(s_t).
        dones: [T, B] termination flags; 0/1 numbers or booleans. ``dones[t]``
            switches off both the bootstrap value and the accumulated
            advantage coming from step ``t + 1``.
        next_value: [B] V(s_T), used as the bootstrap for the last step.
        gamma: discount factor.
        lam: GAE lambda.

    Returns:
        (advantages, returns), both of shape [T, B], with
        ``returns = advantages + values``.
    """
    T, B = rewards.shape
    advantages = torch.zeros_like(rewards)
    # Cast once so that boolean done flags are accepted as well.
    nonterminal = 1.0 - torch.as_tensor(dones, dtype=rewards.dtype, device=rewards.device)
    last_gae = torch.zeros(B, device=rewards.device, dtype=rewards.dtype)
    for t in reversed(range(T)):
        # Plain Python branch: passing values[t + 1] as a torch.where argument
        # would index row T on the last step, because arguments are evaluated
        # before the call.
        next_val = next_value if t == T - 1 else values[t + 1]      # [B]
        delta = rewards[t] + gamma * next_val * nonterminal[t] - values[t]
        last_gae = delta + gamma * lam * nonterminal[t] * last_gae
        advantages[t] = last_gae
    returns = advantages + values
    return advantages, returns


# ========= PPO-Clip policy loss =========
def ppo_clip_loss(new_logprobs, old_logprobs, advantages, eps=0.2):
    """Return the mean PPO clipped-surrogate policy loss.

    Args:
        new_logprobs: log-probabilities under the policy being optimised.
        old_logprobs: log-probabilities under the policy the ratio is taken against.
        advantages: advantage estimates. All three inputs are expected to share
            one shape (e.g. [N] or [T*B]).
        eps: half-width of the clipping interval around 1.

    Returns:
        Scalar tensor: the negated mean of the clipped objective.
    """
    # Advantages are treated as constants: detach them so the policy loss
    # cannot send gradients back into whatever produced them.
    advantages = advantages.detach()
    # Standardise over the whole batch (population std, small constant for stability).
    adv = (advantages - advantages.mean()) / (advantages.std(unbiased=False) + 1e-8)

    r = torch.exp(new_logprobs - old_logprobs)       # [N]
    surr1 = r * adv
    surr2 = torch.clamp(r, 1 - eps, 1 + eps) * adv
    # Maximising the clipped objective == minimising its negation.
    loss_pi = -torch.min(surr1, surr2).mean()
    return loss_pi


# ========= Value-function (critic) loss =========
def value_loss(values_pred, returns):
    """Return half of the mean squared error between predictions and targets."""
    mean_squared_error = F.mse_loss(values_pred, returns, reduction="mean")
    return 0.5 * mean_squared_error


# ========= Entropy regulariser (discrete actions: entropy of the policy distribution) =========
def entropy_loss_from_logits(logits):
    """Return the negated mean entropy of the categorical distributions in ``logits``.

    Args:
        logits: unnormalised scores with the action dimension last
            (the original annotation gives the shape as [N, A]).

    Returns:
        Scalar tensor equal to ``-mean(entropy)``. Because the sign is already
        flipped, *adding* ``coef * result`` to a total loss rewards higher entropy.
    """
    action_probs = torch.softmax(logits, dim=-1)
    # The small constant keeps the log finite when a probability is exactly zero.
    safe_log = torch.log(action_probs + 1e-8)
    entropy = -(action_probs * safe_log).sum(dim=-1)      # one value per row
    return -entropy.mean()


# ========= Putting it together: total PPO loss for one update =========
def ppo_total_loss(
    policy_logits,              # [N, A] logits of the current policy
    new_logprobs_taken,         # [N]    current-policy log prob of the chosen actions
    old_logprobs_taken,         # [N]    old-policy log prob of the chosen actions (cached at rollout)
    values_pred,                # [N]    current value predictions V(s_t)
    returns,                    # [N]    GAE return targets
    advantages,                 # [N]    GAE advantages
    clip_eps=0.2,
    vf_coef=0.5,
    ent_coef=0.01
):
    """Combine the policy, value and entropy terms into one PPO loss.

    Returns:
        (total, parts): ``total`` is the tensor to back-propagate;
        ``parts`` maps "pi", "v" and "ent" to the Python-float value of the
        unweighted policy, value and (negated) entropy terms.
    """
    policy_term = ppo_clip_loss(
        new_logprobs_taken, old_logprobs_taken, advantages, eps=clip_eps
    )
    critic_term = value_loss(values_pred, returns)
    # Already the negated mean entropy, so it is added (not subtracted) below.
    neg_entropy_term = entropy_loss_from_logits(policy_logits)

    weighted_sum = policy_term + vf_coef * critic_term + ent_coef * neg_entropy_term
    parts = {
        "pi": policy_term.item(),
        "v": critic_term.item(),
        "ent": neg_entropy_term.item(),
    }
    return weighted_sum, parts

GRPO

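$r_t = \frac{\pi_{\theta}(a_t|s_t)}{\pi_{\theta_{ref}}(a_t|s_t)}$

$A_t = \frac{R_t - \mu_{group}}{\sigma_{group}}$, where $R_t$ is the reward of the sample and $\mu_{group}$, $\sigma_{group}$ are the mean and standard deviation of the rewards within its group.

$\mathbb{E}_{t}\left[\min(r_t A_t, \text{clip}(r_t, 1-\epsilon, 1+\epsilon) A_t)\right] - D_{KL}(\pi_{\theta} || \pi_{\theta_{ref}})$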

In [None]:
def grpo_loss(rewards, new_logpro, old_logpro, epsilon=1e-5, clip_eps=0.2, kl_coef=1.0):
    """Return the scalar GRPO loss (a quantity to minimise).

    The loss is the negated clipped surrogate, with group-normalised rewards
    as advantages, plus a KL penalty that keeps the policy near ``old_logpro``.

    Args:
        rewards: rewards whose last dimension enumerates the samples of one
            group. Each group needs at least two samples, since the default
            ``torch.std`` of a single element is NaN.
        new_logpro: log-probabilities under the policy being optimised; must
            broadcast against ``rewards``.
        old_logpro: log-probabilities under the old/reference policy. It serves
            both as the denominator of the importance ratio and as the anchor
            of the KL penalty.
        epsilon: constant added to the group standard deviation to avoid
            division by zero.
        clip_eps: half-width of the clipping interval around 1 for the ratio.
        kl_coef: weight of the KL penalty.

    Returns:
        Scalar tensor.
    """
    mean_r = rewards.mean(dim=-1, keepdim=True)
    std_r = rewards.std(dim=-1, keepdim=True)
    advantages = (rewards - mean_r) / (std_r + epsilon)

    ratio = torch.exp(new_logpro - old_logpro)
    clipped_ratio = torch.clamp(ratio, min=1 - clip_eps, max=1 + clip_eps)
    surrogate = torch.min(ratio * advantages, clipped_ratio * advantages)

    # Per-sample KL estimate exp(d) - d - 1 with d = log(pi_ref / pi_theta):
    # it is non-negative and has zero value and zero gradient when the two
    # policies agree, so it acts purely as a penalty.
    log_ref_over_new = old_logpro - new_logpro
    kl = (torch.exp(log_ref_over_new) - log_ref_over_new - 1.0).mean()

    # The surrogate is maximised, the KL minimised: negate the former so the
    # sum is a single loss.
    return -surrogate.mean() + kl_coef * kl

GSPO



In [None]:
def gspo_loss(rewards, new_logpro, old_logpro, epsilon=1e-5):
    """Return the scalar GSPO loss (a quantity to minimise).

    A single importance ratio per sequence is formed from the mean log-ratio
    over the last dimension of the log-probabilities, then used in a clipped
    surrogate with group-normalised rewards as advantages.

    Args:
        rewards: rewards whose last dimension enumerates the samples of one group.
        new_logpro: log-probabilities under the policy being optimised, with one
            extra trailing dimension compared with ``rewards``.
            NOTE(review): presumably that dimension is the token axis and
            contains no padding -- confirm with callers.
        old_logpro: log-probabilities under the old policy, same shape as
            ``new_logpro``.
        epsilon: half-width of the clipping interval around 1 for the
            sequence-level ratio.

    Returns:
        Scalar tensor: the negated mean of the clipped objective.
    """
    # keepdim=True so each group is normalised by its own statistics even when
    # rewards carry leading batch dimensions.
    mean_r = rewards.mean(dim=-1, keepdim=True)
    std_r = rewards.std(dim=-1, keepdim=True)
    advantages = (rewards - mean_r) / (std_r + 1e-5)

    ratio = torch.exp((new_logpro - old_logpro).mean(dim=-1))
    clipped_ratio = torch.clamp(ratio, min=1 - epsilon, max=1 + epsilon)
    objective = torch.min(ratio * advantages, clipped_ratio * advantages)
    # The clipped objective is maximised, so the loss is its negation.
    return -objective.mean()

DAPO

$\mathcal{J}_{\text{DAPO}}(\theta) = \mathbb{E}\Big[ \min\big( r_i(\theta) A_i,\; \text{clip}(r_i(\theta),\, 1-\epsilon_{\text{low}},\, 1+\epsilon_{\text{high}})\,A_i \big) \Big]$

GBPO

$
\mathcal{L}_{GBPO} = - \ \mathbb{E}_{(s,a)} \Bigg[ \mathbf{1}[A \neq 0] \cdot
\frac{\pi_\theta(a|s)}{
\begin{cases}
\max(\pi_{\text{old}}(a|s), \ \text{sg}[\pi_\theta(a|s)]) & A \ge 0 \\[6pt]
\max(\pi_{\text{old}}(a|s), \ 1 - \text{sg}[\pi_\theta(a|s)]) & A < 0
\end{cases}
} \cdot A \Bigg]
$

In [None]:
import torch
import torch.nn.functional as F

def build_advantage(q: torch.Tensor, neg: torch.Tensor, tau_B: float = 0.75):
    """Turn preference scores and dislike flags into advantages in {-1, 0, +1}.

    Args:
        q: normalised preference score (e.g. a watch-time quantile in [0, 1]).
        neg: boolean tensor, True where the item was explicitly disliked.
        tau_B: threshold that ``q`` must exceed for a positive label.

    Returns:
        Float tensor shaped like ``q``: +1 where ``q > tau_B`` and not
        disliked, -1 where disliked, 0 everywhere else.
    """
    strong_positive = (q > tau_B) & (~neg)
    labels = torch.zeros_like(q, dtype=torch.float)
    labels.masked_fill_(strong_positive, 1.0)
    # Dislikes are written last so they always end up as -1.
    labels.masked_fill_(neg, -1.0)
    return labels

def gbpo_loss_from_logits(
    logits: torch.Tensor,         # [B, V]
    actions: torch.Tensor,        # [B] 已选token/物品id
    advantages: torch.Tensor,     # [B] in {-1, 0, +1}
    old_logprobs: torch.Tensor | None = None,  # [B] 旧策略logP，可为None
    eps: float = 1e-6,
):
    # 当前策略对 chosen action 的概率 πθ
    logp_all = F.log_softmax(logits, dim=-1)          # [B, V]
    logp_new = logp_all.gather(-1, actions.unsqueeze(-1)).squeeze(-1)  # [B]
    pi_new = logp_new.exp().clamp_(eps, 1 - eps)      # [B]

    # 旧策略概率；若没有，直接用 detach(πθ) 作为 π_old
    if old_logprobs is None:
        pi_old = pi_new.detach()
    else:
        pi_old = old_logprobs.exp().clamp_(eps, 1 - eps)

    # 动态分母 π_old'（GBPO关键）
    pos = (advantages >= 0)
    neg = (advantages < 0)
    denom = torch.empty_like(pi_new)
    # 正样本：max(π_old, sg(π_new))
    denom[pos] = torch.maximum(pi_old[pos], pi_new.detach()[pos])
    # 负样本：max(π_old, 1 - sg(π_new))
    denom[neg] = torch.maximum(pi_old[neg], 1.0 - pi_new.detach()[neg])

    ratio = (pi_new / denom)
    # 只对 A != 0 的样本回传
    mask = advantages != 0
    if mask.any():
        loss = -(ratio[mask] * advantages[mask]).mean()
    else:
        loss = torch.zeros([], device=logits.device, dtype=logits.dtype)

    stats = {
        "pi_new_mean": pi_new.mean().item(),
        "denom_mean": denom.mean().item(),
        "ratio_mean": ratio[mask].mean().item() if mask.any() else float("nan"),
        "pos_frac": pos.float().mean().item(),
        "neg_frac": neg.float().mean().item(),
    }
    return loss, stats

# ---- Usage example ----
B, V = 8, 1000
# Stand-in for the policy network's output. requires_grad=True gives
# loss.backward() below a leaf tensor to accumulate gradients into.
logits   = torch.randn(B, V, requires_grad=True)
actions  = torch.randint(0, V, (B,))      # the choices made at this step
q        = torch.rand(B)                  # watch-time quantile (illustrative)
neg_flag = torch.rand(B) < 0.2            # roughly 20% dislikes (illustrative)
A        = build_advantage(q, neg_flag, tau_B=0.75)

loss, stats = gbpo_loss_from_logits(logits, actions, A, old_logprobs=None)
loss.backward()
print("GBPO loss:", float(loss), stats)

list-wise DPO

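$\mathcal{L} = - \mathbb{E} \Bigg[ \log \sigma \Big( \log \sum_{i_l \in \mathcal{I}_l} \exp\big(rw_\Delta(i_w, i_l)\big) \Big) + \alpha \log \pi_\theta(i_w|x_u) \Bigg]$

where $rw_\Delta(i_w, i_l) = \max(0, \hat r_\theta(x_u, i_w) - \hat r_\theta(x_u, i_l) - \delta)$ is the clipped reward margin between the positive item $i_w$ and a negative item $i_l$.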

In [None]:
import torch
import torch.nn.functional as F

def listwise_dpo_loss(logprobs_w, logprobs_l, logprobs_ref_w, logprobs_ref_l,
                      delta=0.0, alpha=0.1, beta=0.1):
    """Return the list-wise DPO loss for one positive and L negatives per row.

    Args:
        logprobs_w: [B] policy log P of the positive sample.
        logprobs_l: [B, L] policy log P of the negative samples.
        logprobs_ref_w: [B] reference-model log P of the positive sample.
        logprobs_ref_l: [B, L] reference-model log P of the negative samples.
        delta: margin subtracted from every reward difference before clipping.
        alpha: weight of the log-likelihood regulariser on the positive sample.
        beta: scale turning policy/reference log-ratios into rewards.

    Returns:
        Scalar tensor: ranking term plus regulariser.
    """
    # Implicit rewards: beta * (log pi - log pi_ref).
    reward_pos = beta * (logprobs_w - logprobs_ref_w)              # [B]
    reward_neg = beta * (logprobs_l - logprobs_ref_l)              # [B, L]

    # Hinge-style margins against every negative, floored at zero.
    margins = (reward_pos.unsqueeze(1) - reward_neg - delta).clamp(min=0)  # [B, L]

    # Pool the list with log-sum-exp, then score it with -log(sigmoid(.)).
    pooled = margins.logsumexp(dim=1)                               # [B]
    ranking_term = -F.logsigmoid(pooled).mean()

    # Regulariser: -alpha * mean log pi_theta(i_w | x).
    likelihood_term = -alpha * logprobs_w.mean()

    return ranking_term + likelihood_term


# ========== Smoke test ==========
B, L = 2, 3

# Policy log-probabilities: one positive and L negatives per row.
logprobs_w = torch.tensor([-1.0, -0.5])
logprobs_l = torch.tensor([
    [-2.0, -3.0, -1.5],
    [-1.0, -2.5, -2.0],
])

# Reference-model log-probabilities for the same samples.
logprobs_ref_w = torch.tensor([-1.2, -0.6])
logprobs_ref_l = torch.tensor([
    [-2.2, -2.8, -1.6],
    [-1.1, -2.6, -2.1],
])

loss = listwise_dpo_loss(
    logprobs_w,
    logprobs_l,
    logprobs_ref_w,
    logprobs_ref_l,
)
print("List-wise DPO Loss:", loss.item())

ASPO: Asymmetric Importance Sampling Policy Optimization

$r_i = \frac{\pi_{\theta}(y_i| x) \pi_{old}(y_i| x)}{sg(\pi_{\theta}^2(y_i| x))} $

In [None]:
import torch

def aspo_loss(
    logp,             # log pi_theta(a|s) of the current policy
    logp_old,         # log-probability under the old/reference policy
    advantages,       # A_t (e.g. from GRPO group normalisation)
    eps_low=0.1, 
    eps_high=0.1, 
    kl_coef=0.0,      # optional KL coefficient
):
    """Return the mean ASPO loss.

    Tokens with positive advantage are weighted by the flipped ratio
    (numerically pi_old / pi_theta, clamped to [1 - eps_low, 1 + eps_high]);
    tokens with non-positive advantage by the standard ratio pi_theta / pi_old.
    Weights are detached, so gradients flow only through ``logp``.

    Args:
        logp: log-probabilities under the current policy.
        logp_old: log-probabilities under the old/reference policy.
        advantages: per-token advantages.
        eps_low: lower clipping margin.
        eps_high: upper clipping margin.
        kl_coef: weight of the optional per-token KL-style term.

    Returns:
        Scalar tensor: mean over all elements of policy term + kl_coef * KL term.
    """
    lower_bound = 1.0 - eps_low
    upper_bound = 1.0 + eps_high

    # Standard ratio pi / pi_old, and flipped ratio (pi_old * pi) / sg(pi^2).
    ratio_std = (logp - logp_old).exp()
    ratio_flipped = (logp_old + logp - 2.0 * logp.detach()).exp()

    # Step 1 - hard clipping (token masking), judged on the standard ratio:
    # drop tokens that have already moved beyond the bound in the direction
    # their advantage pushes them.
    beyond_lower = (advantages < 0) & (ratio_std < lower_bound)
    beyond_upper = (advantages > 0) & (ratio_std > upper_bound)
    dropped = beyond_lower | beyond_upper
    effective_adv = advantages.masked_fill(dropped, 0.0)

    # Step 2 - ratio selection: flipped for A > 0, standard otherwise.
    is_positive = effective_adv > 0
    chosen_ratio = torch.where(is_positive, ratio_flipped, ratio_std)

    # Step 3 - soft clipping for A > 0 only: the value is clamped, and since
    # the weight is detached the gradient w.r.t. logp is left untouched.
    clamped_ratio = chosen_ratio.clamp(lower_bound, upper_bound)
    token_weight = torch.where(is_positive, clamped_ratio, chosen_ratio).detach()

    # Policy-gradient form: -weight * A * log pi_theta.
    policy_term = -(token_weight * effective_adv * logp)

    # Optional per-token KL-style term pi * (log pi - log pi_old).
    kl_term = logp.exp() * (logp - logp_old)

    # Reduce over every element.
    return (policy_term + kl_coef * kl_term).mean()