<a href="https://colab.research.google.com/github/AlvinScrp/minimind/blob/master/%E6%B5%8B%E8%AF%95%E4%BB%A3%E7%A0%81.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# RoPE

In [2]:
import torch

# -----------------------------------------------------------------
# 函数 1: 预计算 RoPE 复数值
# (来自您的第一个问题)
# -----------------------------------------------------------------
def precompute_pos_cis(dim: int, end: int = int(32 * 1024), theta: float = 1e6):
    """
    预计算旋转位置编码（Rotary Position Embeddings, RoPE）所需的复数值

    参数:
        dim: 隐藏维度大小 (head_dim)
        end: 最大序列长度，默认为32K
        theta: RoPE中的缩放因子
    返回:
        pos_cis: 预计算好的复数形式的位置编码，形状为[end, dim//2]
    """
    # 计算不同频率的逆频率项
    freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))
    # 生成位置索引
    t = torch.arange(end, device=freqs.device)  # type: ignore
    # 计算外积得到每个位置对应的每个频率 (角度)
    freqs = torch.outer(t, freqs).float()  # type: ignore
    # 使用欧拉公式 e^(i*θ) = cos(θ) + i*sin(θ) 生成复数
    # 幅值为1，相位为freqs的复数值
    pos_cis = torch.polar(torch.ones_like(freqs), freqs)  # complex64
    return pos_cis

# -----------------------------------------------------------------
# 函数 2: 应用 RoPE
# (来自您的第二个问题)
# -----------------------------------------------------------------
def apply_rotary_emb(xq, xk, pos_cis):
    """
    将旋转位置编码应用到查询(Q)和键(K)张量上

    参数:
        xq: 查询张量, 形状为[batch_size, seq_len, n_heads, head_dim]
        xk: 键张量, 形状为[batch_size, seq_len, n_kv_heads, head_dim]
        pos_cis: 预计算的位置编码复数, 形状为 [seq_len, head_dim // 2]

    返回:
        应用位置编码后的查询和键张量
    """
    def unite_shape(pos_cis, x):
        """
        调整pos_cis的形状使其与输入张量x兼容，便于广播计算
        """
        ndim = x.ndim
        # 预期的 pos_cis 形状 [seq_len, dim//2]
        # 预期的 x 形状 [B, seq_len, H, dim//2]
        assert 0 <= 1 < ndim
        # 检查 pos_cis 的形状是否与 x 的 seq_len 和 dim//2 维度匹配
        assert pos_cis.shape == (x.shape[1], x.shape[-1])
        # 创建一个新形状，只保留序列长度和特征维度，其余维度设为1
        # [1, seq_len, 1, dim//2]
        shape = [d if i == 1 or i == ndim - 1 else 1 for i, d in enumerate(x.shape)]
        return pos_cis.view(*shape)

    # 将Q和K重塑并转换为复数形式
    # [B, L, H, D] -> [B, L, H, D//2, 2]
    xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))
    xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))

    # 调整pos_cis的形状以便与输入张量兼容
    # [L, D//2] -> [1, L, 1, D//2]
    pos_cis = unite_shape(pos_cis, xq_)

    # 应用旋转操作：在复数域中，乘以pos_cis等同于旋转
    # (B, L, H, D//2) * (1, L, 1, D//2) -> (B, L, H, D//2)
    xq_rotated_complex = xq_ * pos_cis
    xk_rotated_complex = xk_ * pos_cis

    # 转换回复数对应的实数对
    # [B, L, H, D//2] -> [B, L, H, D//2, 2]
    xq_out = torch.view_as_real(xq_rotated_complex)
    xk_out = torch.view_as_real(xk_rotated_complex)

    # 拍平最后一个维度，恢复 [B, L, H, D]
    xq_out = xq_out.flatten(start_dim=3)
    xk_out = xk_out.flatten(start_dim=3)

    # 转换回输入张量的原始数据类型
    return xq_out.type_as(xq), xk_out.type_as(xk)



In [3]:
# --- 1. 设置超参数 ---
torch.manual_seed(42)  # 为了可复现性

batch_size = 1
seq_len = 4       # 序列长度
n_heads = 2       # 查询头数量
n_kv_heads = 1    # 键/值头数量 (模拟 GQA)
head_dim = 8      # 头部维度 (必须是偶数)
max_seq_len = 128 # 预计算的最大长度

print(f"--- Demo 参数 ---")
print(f"Batch Size: {batch_size}, Seq Len: {seq_len}, Head Dim: {head_dim}")
print(f"Query Heads: {n_heads}, KV Heads: {n_kv_heads}\n")
# --- 2. 预计算 pos_cis ---
# precompute_pos_cis 返回 [max_seq_len, head_dim // 2]
pos_cis_precomputed = precompute_pos_cis(head_dim, max_seq_len)

# 在实际使用中，我们只截取当前序列长度所需的部分
# 形状变为 [seq_len, head_dim // 2] -> [4, 4]
pos_cis_input = pos_cis_precomputed[:seq_len]

print(f"--- 预计算 pos_cis ---")
print(f"预计算的 pos_cis 形状 (用于输入): {pos_cis_input.shape}\n")


--- Demo 参数 ---
Batch Size: 1, Seq Len: 4, Head Dim: 8
Query Heads: 2, KV Heads: 1

--- 预计算 pos_cis ---
预计算的 pos_cis 形状 (用于输入): torch.Size([4, 4])



In [4]:
# --- 3. 创建模拟的 Q 和 K 张量 ---
xq = torch.randn(batch_size, seq_len, n_heads, head_dim)
xk = torch.randn(batch_size, seq_len, n_kv_heads, head_dim)

print(f"--- 原始张量 (Batch 0, Head 0) ---")
print("原始 XQ (前4维):")
print(xq[0, :, 0, :4])
# --- 4. 应用 RoPE 旋转 ---
xq_rotated, xk_rotated = apply_rotary_emb(xq, xk, pos_cis_input)
print(f"\n--- 旋转后张量 (Batch 0, Head 0) ---")
print("旋转后 XQ (前4维):")
print(xq_rotated[0, :, 0, :4])
print("-> 可以看到数值已经完全不同了\n")


--- 原始张量 (Batch 0, Head 0) ---
原始 XQ (前4维):
tensor([[ 1.9269,  1.4873,  0.9007, -2.1055],
        [ 1.6423, -0.1596, -0.4974,  0.4396],
        [-1.3847, -0.8712, -0.2234,  1.7174],
        [-0.9138, -0.6581,  0.0780,  0.5258]])

--- 旋转后张量 (Batch 0, Head 0) ---
旋转后 XQ (前4维):
tensor([[ 1.9269,  1.4873,  0.9007, -2.1055],
        [ 1.0216,  1.2957, -0.5110,  0.4236],
        [ 1.3684, -0.8965, -0.3315,  1.6998],
        [ 0.9976,  0.5226,  0.0279,  0.5308]])
-> 可以看到数值已经完全不同了



In [None]:
# --- 5. 验证 RoPE 的核心特性 ---
# 验证 1: 范数 (Norm) 保持不变
# RoPE 是纯旋转，不应改变向量的长度

# 将 [B, L, H, D] 视为 [B, L, H, D//2, 2]
xq_pairs_before = xq.float().reshape(*xq.shape[:-1], -1, 2)
xq_pairs_after = xq_rotated.float().reshape(*xq_rotated.shape[:-1], -1, 2)
# 计算每对 (x, y) 的L2范数: sqrt(x^2 + y^2)
norm_before = xq_pairs_before.norm(dim=-1)
norm_after = xq_pairs_after.norm(dim=-1)
print("--- 验证 1: 范数 (Norm) ---")
print("旋转前 XQ 范数 (Head 0, Pos 0):")
print(norm_before[0, 0, 0, :])
print("旋转后 XQ 范数 (Head 0, Pos 0):")
print(norm_after[0, 0, 0, :])
print(f"范数是否保持不变? {torch.allclose(norm_before, norm_after, atol=1e-6)}")

# 验证 2: 点积 (Dot Product) 发生变化
# 旋转改变了向量间的相对角度，因此点积会变

# 取 Q 在位置 0 和 K 在位置 1 (不同位置)
q0 = xq[0, 0, 0, :]       # B=0, L=0, H=0
k1 = xk[0, 1, 0, :]       # B=0, L=1, H=0 (KV头会广播)
q0_rotated = xq_rotated[0, 0, 0, :]
k1_rotated = xk_rotated[0, 1, 0, :]
dot_before = torch.dot(q0, k1)
dot_after = torch.dot(q0_rotated, k1_rotated)

print(f"\n--- 验证 2: 点积 (Dot Product) ---")
print(f"Q[pos=0] · K[pos=1] (旋转前): {dot_before.item():.4f}")
print(f"Q[pos=0] · K[pos=1] (旋转后): {dot_after.item():.4f}")
print(f"点积是否发生变化? {not torch.allclose(dot_before, dot_after)}")