Attention 的实现

In [None]:
import torch
import math
from torch import Tensor
from jaxtyping import Float

def run_scaled_dot_product_attention(
    Q: Float[Tensor, " ... queries d_k"],
    K: Float[Tensor, " ... keys d_k"],
    V: Float[Tensor, " ... values d_v"],
    mask: Float[Tensor, " ... queries keys"] | None = None,
) -> Float[Tensor, " ... queries d_v"]:
    """
    缩放点积注意力 (Scaled Dot-Product Attention) 实现
    
    这是 Transformer 架构的核心组件，通过查询(Q)、键(K)、值(V)三个矩阵
    计算注意力权重，实现序列中不同位置间的信息交互。
    
    数学公式:
    Attention(Q,K,V) = softmax(QK^T / √d_k)V
    
    计算步骤:
    1. 计算 Q 和 K 的点积得分矩阵
    2. 缩放 (除以 √d_k)
    3. 应用掩码 (如果提供)
    4. 应用 softmax 得到注意力权重
    5. 用权重对 V 进行加权求和

    参数:
        Q (Float[Tensor, " ... queries d_k"]): 查询张量
            - queries: 查询序列长度，通常等于目标序列长度
            - d_k: 查询/键的特征维度
            - 例如: (batch, num_heads, seq_len_q, d_k)
            
        K (Float[Tensor, " ... keys d_k"]): 键张量  
            - keys: 键序列长度，通常等于源序列长度
            - d_k: 必须与查询的 d_k 相同，确保点积计算有效
            - 例如: (batch, num_heads, seq_len_k, d_k)
            
        V (Float[Tensor, " ... values d_v"]): 值张量
            - values: 值序列长度，必须与键序列长度相同
            - d_v: 值的特征维度，可以与 d_k 不同
            - 例如: (batch, num_heads, seq_len_k, d_v)
            
        mask (Float[Tensor, " ... queries keys"] | None): 掩码张量
            - 形状: (..., queries, keys)
            - 值为 1.0 表示允许注意力，0.0 表示禁止注意力
            - 用于实现因果掩码、填充掩码等
            - 例如: (batch, num_heads, seq_len_q, seq_len_k)

    返回:
        Float[Tensor, " ... queries d_v"]: 注意力输出
            - 形状与查询的前几维相同，最后一维为 d_v
            - 例如: (batch, num_heads, seq_len_q, d_v)
    """
    
    # 步骤1: 计算注意力得分矩阵 (Q·K^T)
    # torch.einsum("... q d, ... k d -> ... q k", Q, K) 等价于:
    # Q @ K.transpose(-2, -1)
    # 
    # 输入形状:
    #   Q: (..., queries, d_k)
    #   K: (..., keys, d_k)  
    # 输出形状:
    #   score: (..., queries, keys)
    #
    # 物理意义: score[i,j] 表示第i个查询对第j个键的"原始兴趣度"
    score = torch.einsum("... q d, ... k d -> ... q k", Q, K) / math.sqrt(K.size(-1))
    
    # 等价的矩阵乘法写法 (注释掉的代码):
    # score = (Q @ K.transpose(-2, -1)) * (1.0 / math.sqrt(K.size(-1)))
    
    # 步骤2: 缩放因子 √d_k 的作用
    # 除以 √d_k 是为了防止点积值过大，导致 softmax 饱和
    # 
    # 原理: 如果 Q 和 K 的元素是独立同分布的，方差为 σ²
    # 那么点积 Q·K 的方差约为 d_k·σ²
    # 除以 √d_k 将方差稳定在 σ²，避免梯度消失
    
    # 步骤3: 应用掩码 (如果提供)
    if mask is not None:
        # masked_fill: 将 mask==0.0 位置的得分设为负无穷
        # 这样在 softmax 后，这些位置的注意力权重会变成 0
        #
        # 常见掩码类型:
        # 1. 因果掩码: 防止未来信息泄露 (下三角矩阵)
        # 2. 填充掩码: 忽略填充 token
        # 3. 自定义掩码: 特定的注意力模式
        score = score.masked_fill(mask == 0.0, float('-inf'))
        
        # 注释掉的替代写法:
        # score = score.masked_fill(mask == False, float('-inf'))
    
    # 步骤4: 应用 softmax 得到注意力权重
    # 沿最后一个维度(keys维度)进行 softmax
    # 确保每个查询对所有键的注意力权重和为 1
    #
    # 形状变化:
    #   输入: (..., queries, keys) - 原始得分
    #   输出: (..., queries, keys) - 概率分布，每行和为1
    #
    # 物理意义: score[i,j] 现在表示第i个查询对第j个键的注意力权重
    score = run_softmax(score, dim=-1)
    
    # 步骤5: 计算最终的注意力输出
    # score @ V: 用注意力权重对值向量进行加权平均
    #
    # 输入形状:
    #   score: (..., queries, keys) - 注意力权重矩阵
    #   V: (..., keys, d_v) - 值矩阵
    # 输出形状:
    #   att: (..., queries, d_v) - 加权后的特征
    #
    # 物理意义: 
    # att[i] = Σ_j score[i,j] * V[j]
    # 第i个查询的输出是所有值向量的加权平均，权重由注意力决定
    att = score @ V
    
    return att


一般是多头的，rope在做点积前做

In [None]:
import torch
from torch import nn
import einx
from einops import rearrange

class Multihead_self_attention(nn.Module):
    """
    多头自注意力机制 (Multi-Head Self-Attention)
    
    这是 Transformer 架构的核心组件，通过多个并行的注意力头来捕捉
    不同表示子空间中的依赖关系，增强模型的表达能力。
    
    核心思想:
    1. 将输入通过线性投影得到 Q、K、V
    2. 将 Q、K、V 分割成多个头
    3. 每个头独立计算注意力
    4. 拼接所有头的输出
    5. 通过输出投影得到最终结果
    
    数学公式:
    MultiHead(Q,K,V) = Concat(head_1, ..., head_h)W^O
    其中 head_i = Attention(QW_i^Q, KW_i^K, VW_i^V)
    """
    
    def __init__(self, 
                 d_model: int, 
                 num_heads: int, 
                 pos_encode: RotaryPositionalEmbedding | None = None, 
                 theta: float | None = None):
        """
        初始化多头自注意力模块
        
        参数:
            d_model (int): 模型的特征维度，必须能被 num_heads 整除
                          例如: 512, 768, 1024 等
            num_heads (int): 注意力头的数量
                           例如: 8, 12, 16 等
            pos_encode (RotaryPositionalEmbedding | None): RoPE 位置编码器
                                                         如果不为 None，将应用旋转位置编码
            theta (float | None): RoPE 的基础角度参数
                                 如果不为 None，表示启用位置编码
        """
        super().__init__()
        
        # 确保模型维度能被头数整除，这样每个头的维度是整数
        assert d_model % num_heads == 0, f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
        
        # 保存关键参数
        self.d_model = d_model           # 模型总维度
        self.num_heads = num_heads       # 注意力头数量
        self.d_k = d_model // num_heads  # 每个头的键/查询维度
        self.d_v = self.d_k             # 每个头的值维度，通常与 d_k 相等
        
        # 定义线性投影层
        # 注意：这些投影层的输出维度是 num_heads * d_k，而不是 d_k
        # 这样可以一次性为所有头生成 Q、K、V，然后再分割
        self.q_proj = Linear(self.d_model, self.num_heads * self.d_k)
        self.k_proj = Linear(self.d_model, self.num_heads * self.d_k) 
        self.v_proj = Linear(self.d_model, self.num_heads * self.d_v)
        
        # 输出投影：将所有头的结果拼接后投影回原始维度
        self.o_proj = Linear(self.num_heads * self.d_v, self.d_model)
        
        # 位置编码相关
        self.pos_encode = pos_encode  # RoPE 编码器实例
        self.theta = theta           # 位置编码参数，决定是否启用位置编码

    def forward(self, 
                x: torch.Tensor, 
                token_positions: torch.Tensor | None = None) -> torch.Tensor:
        """
        多头自注意力前向传播
        
        参数:
            x (torch.Tensor): 输入张量，形状 (..., seq_len, d_model)
                             通常是 (batch_size, seq_len, d_model)
            token_positions (torch.Tensor | None): 位置索引张量
                                                  如果为 None，自动生成 [0, 1, 2, ...]
                                                  用于 RoPE 位置编码
        
        返回:
            torch.Tensor: 注意力输出，形状与输入相同 (..., seq_len, d_model)
        """
        
        # 1. 解析输入张量的形状
        # *b 表示除了最后两个维度外的所有批次维度
        # 例如：(batch_size, seq_len, d_model) -> b=[batch_size], sequence_length=seq_len
        *b, sequence_length, d_model = x.size()
        assert d_model == self.d_model, f"Input d_model ({d_model}) != expected ({self.d_model})"
        
        # 2. 通过线性投影生成查询、键、值
        # 输入: (..., seq_len, d_model)
        # 输出: (..., seq_len, num_heads * d_k)
        Q = self.q_proj(x)  # 查询矩阵
        K = self.k_proj(x)  # 键矩阵  
        V = self.v_proj(x)  # 值矩阵
        
        # 3. 重塑张量以分离多个头
        # 将形状从 (..., seq_len, num_heads * d_k) 
        # 重塑为 (..., num_heads, seq_len, d_k)
        # 这样每个头可以独立处理
        Q = rearrange(Q, "... seq (heads d) -> ... heads seq d", heads=self.num_heads)
        K = rearrange(K, "... seq (heads d) -> ... heads seq d", heads=self.num_heads)
        V = rearrange(V, "... seq (heads d) -> ... heads seq d", heads=self.num_heads)
        
        # 4. 处理位置编码
        # 如果没有提供位置索引，自动生成连续的位置编码
        if token_positions is None:
            # 生成位置索引 [0, 1, 2, ..., sequence_length-1]
            # einx.rearrange 用于为批次维度添加维度
            token_positions = einx.rearrange(
                "seq -> b... seq", 
                torch.arange(sequence_length, device=x.device), 
                b=[1] * len(b)  # 为每个批次维度添加大小为1的维度
            )
        
        # 调整位置张量的形状以匹配多头结构
        # (..., seq_len) -> (..., 1, seq_len)
        # 添加头维度，使其能与 Q、K 广播
        token_positions = rearrange(token_positions, "... seq -> ... 1 seq")
        
        # 5. 应用旋转位置编码 (RoPE)
        # 只有当提供了 theta 参数时才应用位置编码
        if self.theta is not None:
            # 对查询和键应用位置编码，值不需要位置信息
            Q = self.pos_encode(Q, token_positions)
            K = self.pos_encode(K, token_positions)
        
        # 6. 构建因果掩码 (Causal Mask)
        # 创建下三角矩阵，防止模型看到未来的信息
        # torch.tril 创建下三角矩阵，上三角部分为0
        causal_mask = torch.tril(torch.ones(sequence_length, sequence_length, device=x.device))
        
        # 调整掩码形状以匹配注意力得分矩阵
        # (seq_len, seq_len) -> (1, 1, seq_len, seq_len)
        # 添加批次和头维度，便于广播
        causal_mask = causal_mask.view(1, 1, sequence_length, sequence_length)
        
        # 7. 计算缩放点积注意力
        # 每个头独立计算注意力，但可以并行处理
        # 输入形状: (..., num_heads, seq_len, d_k)
        # 输出形状: (..., num_heads, seq_len, d_v)
        att = run_scaled_dot_product_attention(Q=Q, K=K, V=V, mask=causal_mask)
        
        # 8. 拼接多头的输出
        # 将多头的结果拼接成一个大的特征向量
        # 形状变化: (..., num_heads, seq_len, d_v) -> (..., seq_len, num_heads * d_v)
        # .contiguous() 确保内存连续，提高后续计算效率
        att = rearrange(att, "... heads seq d_v -> ... seq (heads d_v)").contiguous()
        
        # 9. 输出投影
        # 将拼接后的多头特征投影回原始维度
        # (..., seq_len, num_heads * d_v) -> (..., seq_len, d_model)
        out = self.o_proj(att)
        
        return out

