In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Union, Any

In [4]:
class GroupQueryAttention(nn.Module):
    """
    Group Query Attention (GQA) 实现
    
    GQA是一种改进的注意力机制，通过将注意力头分组来减少计算复杂度。
    它将所有的查询头(Q)保留，但将键(K)和值(V)头分组，每组共享相同的K和V。
    
    主要优势：
    1. 减少内存使用：K和V的参数量减少
    2. 提高计算效率：减少矩阵乘法操作
    3. 保持性能：在大多数任务上表现接近标准多头注意力
    """
    def __init__(self, d_model, n_heads, n_groups):
        '''
        Group Query Attention 初始化
        
        Args:
            d_model: 模型的隐藏维度，即输入特征的维度
            n_heads: 注意力头的总数量
            n_groups: 分组数量，每组共享相同的K和V投影
        '''
        super().__init__()
        # 确保维度能够正确分组
        assert d_model % n_groups == 0, "d_model必须能被n_groups整除"
        assert n_heads % n_groups == 0, "n_heads必须能被n_groups整除"
        
        self.d_model = d_model
        self.n_heads = n_heads
        self.n_groups = n_groups
        
        # 计算每个头的维度
        self.head_dim = d_model // n_heads
        # 缩放因子，用于防止梯度消失
        self.scale = self.head_dim ** -0.5
        # 每组中的头数量
        self.n_heads_per_group = n_heads // n_groups
        
        # 投影层定义
        # Q投影：所有头都有独立的Q投影
        self.q_proj = nn.Linear(d_model, d_model, bias=False)
        # K投影：每组共享一个K投影，所以维度是 n_heads_per_group * head_dim
        self.k_proj = nn.Linear(d_model, self.n_heads_per_group * self.head_dim, bias=False)
        # V投影：每组共享一个V投影，所以维度是 n_heads_per_group * head_dim
        self.v_proj = nn.Linear(d_model, self.n_heads_per_group * self.head_dim, bias=False)
        # 输出投影层
        self.out_proj = nn.Linear(d_model, d_model, bias=False)
        
    def forward(self, 
                x: torch.Tensor, 
                mask: Optional[torch.Tensor] = None, 
                dropout: float = 0.0
        ):
        """
        前向传播
        
        Args:
            x: 输入张量，形状为 (batch_size, seq_len, d_model)
            mask: 可选的注意力掩码，形状为 (seq_len, seq_len)
            dropout: dropout概率
            
        Returns:
            attn_output: 注意力输出，形状为 (batch_size, seq_len, d_model)
        """
        batch_size, seq_len, _ = x.shape
        
        # 步骤1: 线性投影得到Q、K、V
        # Q: 所有头都有独立的投影
        q = self.q_proj(x)  # (batch_size, seq_len, d_model)
        # K: 每组共享投影，所以维度较小
        k = self.k_proj(x)  # (batch_size, seq_len, n_heads_per_group * head_dim)
        # V: 每组共享投影，所以维度较小
        v = self.v_proj(x)  # (batch_size, seq_len, n_heads_per_group * head_dim)
        
        # 步骤2: 重塑张量维度
        # Q: 重塑为多头格式
        q = q.view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)
        # 形状: (batch_size, n_heads, seq_len, head_dim)
        
        # K: 重塑为分组格式
        k = k.view(batch_size, seq_len, self.n_groups, self.head_dim).transpose(1, 2)
        # 形状: (batch_size, n_groups, seq_len, head_dim)
        
        # V: 重塑为分组格式
        v = v.view(batch_size, seq_len, self.n_groups, self.head_dim).transpose(1, 2)
        # 形状: (batch_size, n_groups, seq_len, head_dim)
        
        # 步骤3: 扩展K和V以匹配所有头
        # 将每组共享的K扩展给该组的所有头
        k = k[:,:,None,:,:].expand(-1, -1, self.n_heads_per_group, -1, -1).reshape(batch_size, self.n_heads, seq_len, self.head_dim)
        # 将每组共享的V扩展给该组的所有头
        v = v[:,:,None,:,:].expand(-1, -1, self.n_heads_per_group, -1, -1).reshape(batch_size, self.n_heads, seq_len, self.head_dim)
        
        # 步骤4: 计算注意力权重
        # Q @ K^T: 计算查询和键的相似度
        attn_weights = q @ k.transpose(-2, -1) * self.scale
        # 形状: (batch_size, n_heads, seq_len, seq_len)

        # 步骤5: 应用掩码（如果提供）
        if mask is not None:
            # 将掩码应用到注意力权重上，被掩码的位置设为负无穷
            attn_weights = attn_weights.masked_fill(mask == 0, float("-inf"))
        
        # 步骤6: Softmax归一化
        attn_weights = attn_weights.softmax(dim=-1)
        
        # 步骤7: 应用dropout
        attn_weights = F.dropout(attn_weights, p=dropout)
        
        # 步骤8: 计算注意力输出
        attn_output = attn_weights @ v
        # 形状: (batch_size, n_heads, seq_len, head_dim)
        
        # 步骤9: 重塑并应用输出投影
        # 将多头输出合并
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        # 应用最终的线性投影
        return self.out_proj(attn_output)

In [3]:
gqa = GroupQueryAttention(d_model=1024, n_heads=16, n_groups=4)
x = torch.randn(1, 1024, 1024)
mask = torch.tril(torch.ones(1024, 1024))
output = gqa(x, mask)
print(output.shape)

torch.Size([1, 1024, 1024])
