# 手撕 Transformer：从零实现面试速通教程

本教程面向 AI/算法工程师面试，目标是“手撕 Transformer”时能在白板/编辑器中快速、正确、可讲解地实现关键模块与完整骨架。

你将学习并实现：
- Scaled Dot-Product Attention（带 mask）
- Multi-Head Attention（MHA）
- Position-wise Feed Forward（FFN）
- 残差连接 + LayerNorm
- 位置编码（Positional Encoding）
- EncoderLayer / DecoderLayer
- Transformer Encoder-Decoder 总装
- 贪心解码（Greedy Decode）与一个极简玩具任务

建议：面试中优先保证“正确 + 清晰 + 注释完善 + 形状无误”。

# 环境与依赖

- Python ≥ 3.8
- 推荐使用 PyTorch（面试常用）
- 若无 torch，可按需安装或在纸上仅写伪代码/接口签名

下面代码会尝试导入 torch 并给出缺失提示。

In [1]:
# Import and quick check
try:
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    from torch import Tensor
    print(torch.__version__)
except Exception as e:
    print("[Warn] torch not available. You can still read/understand the code.")
    print(e)

2.8.0


# Scaled Dot-Product Attention（带 Mask）

## 核心思想
注意力机制的本质是**加权求和**：对于每个查询位置，计算它与所有键位置的相似度，然后用这些相似度作为权重对值进行加权平均。

## 输入张量及其含义
令：
- $Q\in\mathbb{R}^{B\times H\times T_q\times d_k}$：**查询（Query）张量**
  - $B$：批次大小（Batch size）
  - $H$：注意力头数（num Heads）
  - $T_q$：查询序列长度（Query sequence length）
  - $d_k$：每个头的键/查询维度（Key/Query dimension per head）
  
- $K\in\mathbb{R}^{B\times H\times T_k\times d_k}$：**键（Key）张量**
  - $T_k$：键序列长度（Key sequence length，通常等于值序列长度）
  
- $V\in\mathbb{R}^{B\times H\times T_k\times d_v}$：**值（Value）张量**
  - $d_v$：每个头的值维度（Value dimension per head，通常 $d_v=d_k$）

## 计算步骤

### 步骤 1: 计算注意力分数（缩放点积）
$$
\mathrm{scores} \,=\, \frac{QK^{\top}}{\sqrt{d_k}} \in \mathbb{R}^{B\times H\times T_q\times T_k}
$$

**维度分析：**
- $Q$: $(B, H, T_q, d_k)$
- $K^{\top}$: $(B, H, d_k, T_k)$ ← 转置最后两维
- $QK^{\top}$: $(B, H, T_q, T_k)$ ← 批量矩阵乘法
- 除以 $\sqrt{d_k}$ 进行缩放，防止点积过大导致梯度消失

**物理意义：** `scores[b,h,i,j]` 表示第 $b$ 个样本、第 $h$ 个头中，查询位置 $i$ 对键位置 $j$ 的**相似度得分**。

### 步骤 2: 应用掩码（Mask）
令 $M\in\{0,1\}^{B\times 1\times T_q\times T_k}$ 为可见性掩码（1=可见，0=不可见）。定义加性掩码：
$$
\tilde{M} \,=\, (1-M)\cdot (-\infty)
$$

将掩码加到分数上：
$$
\mathrm{scores}_{\text{masked}} = \mathrm{scores} + \tilde{M}
$$

**作用：** 被遮挡位置（$M=0$）的分数变为 $-\infty$，经过 softmax 后概率趋近 0，实现"屏蔽"效果。

### 步骤 3: Softmax 归一化
$$
\mathrm{attn} \,=\, \mathrm{softmax}(\mathrm{scores}_{\text{masked}})\in \mathbb{R}^{B\times H\times T_q\times T_k}
$$

**操作：** 对最后一维（$T_k$ 维）做 softmax，使得对每个查询位置 $i$，所有键位置的权重和为 1：
$$
\sum_{j=1}^{T_k} \mathrm{attn}[b,h,i,j] = 1
$$

### 步骤 4: 加权求和输出
$$
\mathrm{out} \,=\, \mathrm{attn}* V\in \mathbb{R}^{B\times H\times T_q\times d_v}
$$

**维度分析：**
- $\mathrm{attn}$: $(B, H, T_q, T_k)$
- $V$: $(B, H, T_k, d_v)$
- $\mathrm{attn} \cdot V$: $(B, H, T_q, d_v)$ ← 批量矩阵乘法

**物理意义：** 输出的每个位置 $i$ 是所有值位置的**加权平均**，权重由注意力分数决定。

## 数值稳定性技巧
- 使用 `float('-inf')` 近似 $-\infty$，使被遮挡位置在 softmax 后概率趋近 0
- 缩放因子 $\sqrt{d_k}$ 防止点积值过大，避免 softmax 饱和导致梯度消失

**可视化结构：**

![Scaled Dot-Product Attention](Scaled_dot-product_attention.png)

上图展示了缩放点积注意力的计算流程：输入 Q、K、V 经过矩阵乘法、缩放、Mask、Softmax，最后加权求和得到输出。

In [2]:
import math
from typing import Optional

class ScaledDotProductAttention(nn.Module):
    def __init__(self, dropout: float = 0.0):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

    def forward(self, Q: Tensor, K: Tensor, V: Tensor, mask: Optional[Tensor] = None) -> tuple[Tensor, Tensor]:
        """
        Q: (B, H, T_q, d_k)
        K: (B, H, T_k, d_k)
        V: (B, H, T_k, d_v)
        mask: (B, 1, T_q, T_k) 或 (B, H, T_q, T_k), 1表示可见, 0表示遮挡
        返回: (out, attn)
          out: (B, H, T_q, d_v)
          attn: (B, H, T_q, T_k)
        """
        d_k = Q.size(-1)
        scores = (Q @ K.transpose(-2, -1)) / math.sqrt(d_k)  # (B,H,T_q,T_k)
        if mask is not None:
            # 将不可见位置置为 -inf
            scores = scores.masked_fill(mask == 0, float('-inf'))
        attn = scores.softmax(dim=-1)
        attn = self.dropout(attn)
        out = attn @ V  # (B,H,T_q,d_v)
        return out, attn

# quick shape test (no torch run here if not installed)
if 'torch' in globals():
    B, H, T_q, T_k, d_k, d_v = 2, 4, 5, 6, 8, 8
    Q = torch.randn(B, H, T_q, d_k)
    K = torch.randn(B, H, T_k, d_k)
    V = torch.randn(B, H, T_k, d_v)
    mask = torch.ones(B, 1, T_q, T_k)
    attn = ScaledDotProductAttention()
    out, w = attn(Q, K, V, mask)
    print(out.shape, w.shape)  # expect: (2,4,5,8) (2,4,5,6)

torch.Size([2, 4, 5, 8]) torch.Size([2, 4, 5, 6])


# Multi-Head Attention（MHA）

## 核心思想
单头注意力只能学习一种模式，多头注意力通过**并行运行多个注意力头**，让模型同时关注不同的表示子空间，捕获更丰富的特征关系。

## 参数设定
- $H$：头数（num_heads）
- $d_{\text{model}}$：模型总维度（embedding dimension）
- $d_k = d_{\text{model}}/H$：每个头的维度（dimension per head）

**设计原则：** 保持总参数量不变，$H \times d_k = d_{\text{model}}$

## 输入输出
- **输入：** $X\in\mathbb{R}^{B\times T\times d_{\text{model}}}$
  - $B$：批次大小
  - $T$：序列长度
  - $d_{\text{model}}$：特征维度（如 512）
  
- **输出：** $\mathrm{MHA}(X)\in\mathbb{R}^{B\times T\times d_{\text{model}}}$（形状不变）

## 计算流程

### 步骤 1: 线性投影（生成 Q, K, V）
对每个头 $i\in\{1,\dots,H\}$，分别投影：
$$
Q_i = X W_Q^{(i)},\quad K_i = X W_K^{(i)},\quad V_i = X W_V^{(i)}
$$

**权重矩阵：**
- $W_Q^{(i)}, W_K^{(i)}, W_V^{(i)}\in\mathbb{R}^{d_{\text{model}}\times d_k}$

**维度变化：**
- $X$: $(B, T, d_{\text{model}})$
- $W_Q^{(i)}$: $(d_{\text{model}}, d_k)$
- $Q_i = X W_Q^{(i)}$: $(B, T, d_k)$

**实现技巧（本教程）：** 实际代码中使用 $W_Q\in\mathbb{R}^{d_{\text{model}}\times d_{\text{model}}}$ 一次性投影，再 reshape 分头：
$$
Q_{\text{all}} = X W_Q \;\in\; \mathbb{R}^{B\times T\times d_{\text{model}}} \;\xrightarrow{\text{reshape}}\; \mathbb{R}^{B\times T\times H\times d_k} \;\xrightarrow{\text{transpose}}\; \mathbb{R}^{B\times H\times T\times d_k}
$$

### 步骤 2: 头内注意力（并行计算）
对每个头独立计算缩放点积注意力：
$$
\mathrm{head}_i = \mathrm{Attention}(Q_i, K_i, V_i) = \mathrm{softmax}\!\left(\frac{Q_i K_i^{\top}}{\sqrt{d_k}} + \tilde{M}\right)V_i
$$

**维度：**
- 输入 $Q_i, K_i, V_i$: $(B, H, T, d_k)$（已包含所有头）
- 输出 $\mathrm{head}_i$: $(B, H, T, d_k)$

### 步骤 3: 拼接所有头
将 $H$ 个头的输出沿特征维拼接：
$$
\mathrm{Concat}(\mathrm{head}_1,\dots,\mathrm{head}_H) \;\in\; \mathbb{R}^{B\times T\times (H\cdot d_k)} = \mathbb{R}^{B\times T\times d_{\text{model}}}
$$

**操作：** 
- 转置：$(B, H, T, d_k) \to (B, T, H, d_k)$
- Reshape：$(B, T, H, d_k) \to (B, T, H \times d_k)$

### 步骤 4: 输出投影
通过线性层映射回原维度：
$$
\mathrm{MHA}(X) = \mathrm{Concat}(\mathrm{head}_1,\dots,\mathrm{head}_H)\, W_O
$$

**权重矩阵：**
- $W_O\in\mathbb{R}^{(H\cdot d_k)\times d_{\text{model}}} = \mathbb{R}^{d_{\text{model}}\times d_{\text{model}}}$

**最终输出：** $(B, T, d_{\text{model}})$

## 自注意力 vs 交叉注意力
- **自注意力（Self-Attention）：** $Q, K, V$ 都来自同一输入 $X$
- **交叉注意力（Cross-Attention）：** $Q$ 来自一个输入，$K, V$ 来自另一个输入（如 Decoder 中 $Q$ 来自 Decoder，$K,V$ 来自 Encoder）

## 参数量分析
每个 MHA 模块的参数：
- $W_Q, W_K, W_V$: $3 \times d_{\text{model}} \times d_{\text{model}}$
- $W_O$: $d_{\text{model}} \times d_{\text{model}}$
- **总计：** $4 d_{\text{model}}^2$ 参数（不含偏置）

**可视化结构：**

![Multi-Head Attention](Multi-Head_Attention.png)

上图展示了多头注意力的完整流程：输入经过线性投影分成多个头，每个头独立进行注意力计算，最后将所有头的输出拼接并通过线性层映射回原始维度。

In [3]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model: int, num_heads: int, dropout: float = 0.0):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

        self.attn = ScaledDotProductAttention(dropout)
        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, x: Tensor) -> Tensor:
        # x: (B,T,d_model) -> (B,H,T,d_k)
        B, T, _ = x.shape
        x = x.view(B, T, self.num_heads, self.d_k).transpose(1, 2)
        return x

    def _combine_heads(self, x: Tensor) -> Tensor:
        # x: (B,H,T,d_k) -> (B,T,d_model)
        B, H, T, d_k = x.shape
        x = x.transpose(1, 2).contiguous().view(B, T, H * d_k)
        return x

    def forward(self, x_q: Tensor, x_kv: Tensor, mask: Optional[Tensor] = None) -> tuple[Tensor, Tensor]:
        """
        x_q: (B,T_q,d_model)
        x_kv: (B,T_k,d_model)
        mask: (B,1,T_q,T_k) 或 (B,H,T_q,T_k)
        返回: (out, attn)
        """
        Q = self._split_heads(self.W_q(x_q))  # (B,H,T_q,d_k)
        K = self._split_heads(self.W_k(x_kv)) # (B,H,T_k,d_k)
        V = self._split_heads(self.W_v(x_kv)) # (B,H,T_k,d_k)

        out, attn = self.attn(Q, K, V, mask)   # out: (B,H,T_q,d_k)
        out = self._combine_heads(out)         # (B,T_q,d_model)
        out = self.W_o(out)                    # (B,T_q,d_model)
        out = self.dropout(out)
        return out, attn

# quick shape test
if 'torch' in globals():
    B, T_q, T_k, d_model, H = 2, 5, 6, 32, 4
    x_q = torch.randn(B, T_q, d_model)
    x_kv = torch.randn(B, T_k, d_model)
    mask = torch.ones(B, 1, T_q, T_k)
    mha = MultiHeadAttention(d_model, H)
    y, a = mha(x_q, x_kv, mask)
    print(y.shape, a.shape)  # expect: (2,5,32) (2,4,5,6)

torch.Size([2, 5, 32]) torch.Size([2, 4, 5, 6])


# Positional Encoding（位置编码）

## 为什么需要位置编码？
注意力机制本身是**置换不变**的（permutation-invariant）：交换序列顺序，输出也会相应交换，但注意力权重不变。为了让模型感知位置信息（如"猫吃鱼"和"鱼吃猫"的区别），需要显式注入位置信息。

## 两种常见实现方式
1. **固定位置编码（Sinusoidal）**：使用正弦/余弦函数，无需学习参数（本教程采用）
2. **可学习位置编码**：`nn.Embedding(max_len, d_model)`，需要训练

## 正弦/余弦位置编码公式
对于位置 $\mathrm{pos}\in\{0,1,\dots,T-1\}$ 和维度索引 $i\in\{0,1,\dots,\lfloor\tfrac{d_{\text{model}}}{2}\rfloor-1\}$：

$$
\begin{aligned}
\mathrm{PE}[\mathrm{pos},\,2i] &\;=\; \sin\!\left(\frac{\mathrm{pos}}{10000^{\frac{2i}{d_{\text{model}}}}}\right), \\[0.5em]
\mathrm{PE}[\mathrm{pos},\,2i+1] &\;=\; \cos\!\left(\frac{\mathrm{pos}}{10000^{\frac{2i}{d_{\text{model}}}}}\right).
\end{aligned}
$$

### 公式解析
- **偶数维度**（$2i$）：使用正弦函数
- **奇数维度**（$2i+1$）：使用余弦函数
- **频率**：$\omega_i = \frac{1}{10000^{2i/d_{\text{model}}}}$
  - 低维度（$i$ 小）：高频振荡，捕捉局部位置差异
  - 高维度（$i$ 大）：低频振荡，捕捉远距离位置关系

### 维度示例
假设 $d_{\text{model}}=512$，$T=100$（序列长度）：
- $\mathrm{PE}$ 形状：$(T, d_{\text{model}}) = (100, 512)$
- `PE[0, :]`：位置 0 的编码向量（512 维）
- `PE[:, 0]`：所有位置在第 0 维的编码值

## 使用方式
将位置编码**直接加**到词嵌入上：
$$
X_{\text{pos}} = X_{\text{embed}} + \mathrm{PE}
$$

**维度匹配：**
- $X_{\text{embed}}$: $(B, T, d_{\text{model}})$ ← 词嵌入
- $\mathrm{PE}$: $(1, T, d_{\text{model}})$ ← 位置编码（广播到批次维）
- $X_{\text{pos}}$: $(B, T, d_{\text{model}})$ ← 最终输入

## 为什么使用正弦/余弦？
1. **相对位置关系：** $\mathrm{PE}_{\mathrm{pos}+k}$ 可以表示为 $\mathrm{PE}_{\mathrm{pos}}$ 的线性函数
2. **外推能力：** 理论上可以处理比训练时更长的序列
3. **无需学习：** 减少参数量，避免过拟合

## 实现细节
```python
# 生成位置索引
position = torch.arange(0, max_len).unsqueeze(1)  # (T, 1)

# 生成频率项
div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
# 等价于: 10000^(-2i/d_model)

# 填充 PE 矩阵
pe[:, 0::2] = torch.sin(position * div_term)  # 偶数列
pe[:, 1::2] = torch.cos(position * div_term)  # 奇数列
```

In [4]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, max_len: int = 5000, dropout: float = 0.0):
        super().__init__()
        pe = torch.zeros(max_len, d_model)  # (T, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (T,1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))  # (1,T,d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: Tensor) -> Tensor:
        # x: (B,T,d_model)
        T = x.size(1)
        x = x + self.pe[:, :T, :]
        return self.dropout(x)

# quick test
if 'torch' in globals():
    pe = PositionalEncoding(32)
    x = torch.zeros(2, 10, 32)
    y = pe(x)
    print(y.shape)  # (2,10,32)

torch.Size([2, 10, 32])


# FFN + LayerNorm + 残差连接

## Position-wise Feed-Forward Network (FFN)

### 结构
两层全连接网络，对每个位置**独立**处理（不跨位置交互）：
$$
\mathrm{FFN}(x) = W_2\,\sigma(W_1 x + b_1) + b_2
$$

**参数：**
- $W_1\in\mathbb{R}^{d_{\text{model}}\times d_{\mathrm{ff}}}$：第一层权重（扩张）
- $W_2\in\mathbb{R}^{d_{\mathrm{ff}}\times d_{\text{model}}}$：第二层权重（压缩）
- $\sigma$：激活函数（ReLU 或 GELU）

### 维度变化
$$
(B, T, d_{\text{model}}) \xrightarrow{W_1} (B, T, d_{\mathrm{ff}}) \xrightarrow{\sigma} (B, T, d_{\mathrm{ff}}) \xrightarrow{W_2} (B, T, d_{\text{model}})
$$

**典型值：** $d_{\mathrm{ff}} = 4 \times d_{\text{model}}$（如 512 → 2048 → 512）

### 作用
- **非线性变换**：引入非线性，增强表达能力
- **特征混合**：每个位置独立地在高维空间中进行特征变换
- **位置独立**：与注意力的"跨位置交互"形成互补

---

## 残差连接（Residual Connection）
$$
\text{output} = x + \text{Sublayer}(x)
$$

**作用：**
1. **缓解梯度消失**：梯度可直接通过恒等映射反向传播
2. **简化学习**：子层只需学习"残差"（变化量），而非完整映射
3. **稳定训练**：允许堆叠更深的网络

---

## Layer Normalization（层归一化）
对每个样本的特征维度做归一化：
$$
\mathrm{LN}(x) = \gamma \odot \frac{x - \mu}{\sqrt{\sigma^2 + \epsilon}} + \beta
$$

**计算：**
- $\mu = \frac{1}{d_{\text{model}}}\sum_{i=1}^{d_{\text{model}}} x_i$：均值
- $\sigma^2 = \frac{1}{d_{\text{model}}}\sum_{i=1}^{d_{\text{model}}} (x_i - \mu)^2$：方差
- $\gamma, \beta\in\mathbb{R}^{d_{\text{model}}}$：可学习的缩放和偏移参数
- $\epsilon$：数值稳定项（如 $10^{-5}$）

**作用：** 稳定训练，加速收敛，减少对初始化的敏感度

---

## 两种组合顺序

### Post-LN（本教程采用，原论文）
$$
y = \mathrm{LN}\big(x + \mathrm{Sublayer}(x)\big)
$$

**流程：** 子层输出 → 残差连接 → LayerNorm

**特点：**
- 梯度直接流经子层，训练初期可能不稳定
- 需要 warm-up 学习率策略

### Pre-LN（现代常用）
$$
y = x + \mathrm{Sublayer}(\mathrm{LN}(x))
$$

**流程：** LayerNorm → 子层 → 残差连接

**特点：**
- 更稳定，易于训练深层网络
- 无需 warm-up，对学习率不敏感
- GPT-2/3、BERT 等现代模型多采用此方式

---

## 完整子层结构（Post-LN）
```
输入 x (B, T, d_model)
    ↓
子层(MHA/FFN) → sublayer_out
    ↓
x + sublayer_out  (残差)
    ↓
LayerNorm
    ↓
输出 y (B, T, d_model)
```

**参数量：**
- FFN: $2 \times d_{\text{model}} \times d_{\mathrm{ff}}$ （约 $8 d_{\text{model}}^2$）
- LayerNorm: $2 \times d_{\text{model}}$（$\gamma$ 和 $\beta$）

In [5]:
class FeedForward(nn.Module):
    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.0, activation: str = 'relu'):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)
        if activation == 'relu':
            self.act = nn.ReLU()
        elif activation == 'gelu':
            self.act = nn.GELU()
        else:
            raise ValueError('activation must be relu or gelu')

    def forward(self, x: Tensor) -> Tensor:
        return self.fc2(self.dropout(self.act(self.fc1(x))))

class ResidualLayerNorm(nn.Module):
    def __init__(self, d_model: int, eps: float = 1e-5):
        super().__init__()
        self.ln = nn.LayerNorm(d_model, eps=eps)

    def forward(self, x: Tensor, sublayer_out: Tensor) -> Tensor:
        # x + sublayer(x) 再做 LN
        return self.ln(x + sublayer_out)

# quick test
if 'torch' in globals():
    ff = FeedForward(32, 64)
    x = torch.randn(2, 10, 32)
    y = ff(x)
    print(y.shape)  # (2,10,32)
    ln = ResidualLayerNorm(32)
    z = ln(x, y)
    print(z.shape)  # (2,10,32)

torch.Size([2, 10, 32])
torch.Size([2, 10, 32])


# EncoderLayer / DecoderLayer

## 符号约定
设自注意力函数：
$$
\mathrm{Att}(Q,K,V,M)=\mathrm{softmax}\!\left(\tfrac{QK^\top}{\sqrt{d_k}}+\tilde{M}\right)V
$$

其中 $M$ 是可见性掩码（mask）。

---

## EncoderLayer（编码器层）

### 结构
由两个子层组成：
1. **多头自注意力（Multi-Head Self-Attention）**
2. **前馈网络（FFN）**

每个子层后都有**残差连接 + LayerNorm**。

### 数学表达（Post-LN）
输入 $X\in\mathbb{R}^{B\times T_s\times d_{\text{model}}}$（源序列）：

$$
\begin{aligned}
\tilde{x}_1 &= \mathrm{MHA}(X, X, X, M_{\text{src}}), \quad &\text{← 自注意力}\\
X' &= \mathrm{LN}\big(X + \tilde{x}_1\big), \quad &\text{← 残差+归一化}\\
\tilde{x}_2 &= \mathrm{FFN}(X'), \quad &\text{← 前馈网络}\\
Y &= \mathrm{LN}\big(X' + \tilde{x}_2\big). \quad &\text{← 残差+归一化}
\end{aligned}
$$

### 维度追踪
- 输入 $X$: $(B, T_s, d_{\text{model}})$
- 自注意力输出 $\tilde{x}_1$: $(B, T_s, d_{\text{model}})$
- 第一次 LN 后 $X'$: $(B, T_s, d_{\text{model}})$
- FFN 输出 $\tilde{x}_2$: $(B, T_s, d_{\text{model}})$
- 最终输出 $Y$: $(B, T_s, d_{\text{model}})$

**关键点：** 自注意力中 $Q=K=V$，都来自同一输入 $X$，无需外部记忆。

---

## DecoderLayer（解码器层）

### 结构
由三个子层组成：
1. **掩码多头自注意力（Masked Multi-Head Self-Attention）**
2. **编码器-解码器注意力（Cross-Attention）**
3. **前馈网络（FFN）**

每个子层后都有**残差连接 + LayerNorm**。

### 数学表达（Post-LN）
输入：
- $Y\in\mathbb{R}^{B\times T_t\times d_{\text{model}}}$：目标序列（已解码部分）
- $\mathrm{Mem}\in\mathbb{R}^{B\times T_s\times d_{\text{model}}}$：编码器输出（源序列的编码表示）

计算流程：

$$
\begin{aligned}
\tilde{y}_1 &= \mathrm{MHA}(Y, Y, Y, M_{\text{causal}}), \quad &\text{← 掩码自注意力}\\
Y' &= \mathrm{LN}\big(Y + \tilde{y}_1\big), \quad &\text{← 残差+归一化}\\
\tilde{y}_2 &= \mathrm{MHA}(Y', \mathrm{Mem}, \mathrm{Mem}, M_{\text{cross}}), \quad &\text{← 交叉注意力}\\
Y'' &= \mathrm{LN}\big(Y' + \tilde{y}_2\big), \quad &\text{← 残差+归一化}\\
\tilde{y}_3 &= \mathrm{FFN}(Y''), \quad &\text{← 前馈网络}\\
Z &= \mathrm{LN}\big(Y'' + \tilde{y}_3\big). \quad &\text{← 残差+归一化}
\end{aligned}
$$

### 维度追踪
- 输入 $Y$: $(B, T_t, d_{\text{model}})$
- 编码器记忆 $\mathrm{Mem}$: $(B, T_s, d_{\text{model}})$
- 掩码自注意力输出 $\tilde{y}_1$: $(B, T_t, d_{\text{model}})$
- 第一次 LN 后 $Y'$: $(B, T_t, d_{\text{model}})$
- 交叉注意力输出 $\tilde{y}_2$: $(B, T_t, d_{\text{model}})$ ← $Q$ 来自 $Y'$，$K,V$ 来自 $\mathrm{Mem}$
- 第二次 LN 后 $Y''$: $(B, T_t, d_{\text{model}})$
- FFN 输出 $\tilde{y}_3$: $(B, T_t, d_{\text{model}})$
- 最终输出 $Z$: $(B, T_t, d_{\text{model}})$

---

## 关键掩码机制

### 1. Encoder Padding Mask ($M_{\text{src}}$)
**作用：** 屏蔽填充位置（PAD token），防止注意力关注无效位置

**形状：** $(B, 1, T_s, T_s)$

**示例：** 若序列 `[3, 5, 7, PAD, PAD]`，则 mask 为：
```
[[1, 1, 1, 0, 0],
 [1, 1, 1, 0, 0],
 [1, 1, 1, 0, 0],
 [1, 1, 1, 0, 0],
 [1, 1, 1, 0, 0]]
```

### 2. Decoder Causal Mask ($M_{\text{causal}}$，下三角掩码）
**作用：** 在自回归解码时，**防止未来信息泄露**，位置 $i$ 只能看到位置 $\leq i$ 的信息

**形状：** $(1, 1, T_t, T_t)$（所有样本共享）

**示例：** $T_t=5$ 时的掩码：
```
[[1, 0, 0, 0, 0],    ← 位置0只能看自己
 [1, 1, 0, 0, 0],    ← 位置1可以看0,1
 [1, 1, 1, 0, 0],    ← 位置2可以看0,1,2
 [1, 1, 1, 1, 0],
 [1, 1, 1, 1, 1]]    ← 位置4可以看全部
```

### 3. Cross-Attention Mask ($M_{\text{cross}}$)
**作用：** 屏蔽编码器的填充位置，防止解码器关注源序列的无效位置

**形状：** $(B, 1, T_t, T_s)$

**注意：** 查询长度为 $T_t$（目标），键长度为 $T_s$（源）

---

## Encoder vs Decoder 对比

| 特性 | EncoderLayer | DecoderLayer |
|------|--------------|--------------|
| 自注意力类型 | 双向（可见全部位置） | 单向（仅可见历史） |
| 注意力层数 | 1（自注意力） | 2（自注意力 + 交叉注意力） |
| 掩码类型 | Padding mask | Causal + Padding mask |
| 输入依赖 | 仅源序列 | 源序列 + 目标序列 |
| 并行性 | 完全并行 | 训练并行，推理串行 |

---

## 参数量分析（单层）
- **EncoderLayer:**
  - MHA: $4d_{\text{model}}^2$
  - FFN: $8d_{\text{model}}^2$
  - LayerNorm: $4d_{\text{model}}$
  - **总计:** ≈ $12d_{\text{model}}^2$

- **DecoderLayer:**
  - Masked MHA: $4d_{\text{model}}^2$
  - Cross MHA: $4d_{\text{model}}^2$
  - FFN: $8d_{\text{model}}^2$
  - LayerNorm: $6d_{\text{model}}$
  - **总计:** ≈ $16d_{\text{model}}^2$

In [None]:
def make_pad_mask(q_len: int, k_len: int, q_pad: Tensor | None, k_pad: Tensor | None) -> Tensor:
    """
    构造 padding mask（1 可见, 0 屏蔽），形状 (B,1,q_len,k_len)
    q_pad/k_pad: (B,T) 中 1 表示 pad 位置
    """
    if q_pad is None and k_pad is None:
        return None
    if q_pad is None:
        q_mask = torch.zeros_like(k_pad)
    else:
        q_mask = q_pad
    if k_pad is None:
        k_mask = torch.zeros_like(q_mask)
    else:
        k_mask = k_pad
    # 可见位置=1，即非pad
    q_visible = (q_mask == 0).unsqueeze(2)  # (B,T_q,1)
    k_visible = (k_mask == 0).unsqueeze(1)  # (B,1,T_k)
    mask = q_visible & k_visible            # (B,T_q,T_k)
    return mask.unsqueeze(1)                # (B,1,T_q,T_k)


def make_subsequent_mask(T: int) -> Tensor:
    """Decoder 自注意力的下三角可见性掩码（1 可见, 0 屏蔽），形状 (1,1,T,T)"""
    return torch.tril(torch.ones(T, T, dtype=torch.bool)).unsqueeze(0).unsqueeze(0)


class EncoderLayer(nn.Module):
    def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float = 0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.ffn = FeedForward(d_model, d_ff, dropout)
        self.norm1 = ResidualLayerNorm(d_model)
        self.norm2 = ResidualLayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: Tensor, src_mask: Optional[Tensor] = None) -> tuple[Tensor, Tensor]:
        # Self-Attention
        sa_out, sa_w = self.self_attn(x, x, src_mask)
        x = self.norm1(x, self.dropout(sa_out))
        # FFN
        ff_out = self.ffn(x)
        x = self.norm2(x, self.dropout(ff_out))
        return x, sa_w


class DecoderLayer(nn.Module):
    def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float = 0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.cross_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.ffn = FeedForward(d_model, d_ff, dropout)
        self.norm1 = ResidualLayerNorm(d_model)
        self.norm2 = ResidualLayerNorm(d_model)
        self.norm3 = ResidualLayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, y: Tensor, memory: Tensor, tgt_mask: Optional[Tensor], memory_mask: Optional[Tensor]) -> tuple[Tensor, tuple[Tensor, Tensor]]:
        # Masked Self-Attention (decoder)
        sa_out, sa_w = self.self_attn(y, y, tgt_mask)
        y = self.norm1(y, self.dropout(sa_out))
        # Cross-Attention: Q=decoder, K/V=encoder memory
        ca_out, ca_w = self.cross_attn(y, memory, memory_mask)
        y = self.norm2(y, self.dropout(ca_out))
        # FFN
        ff_out = self.ffn(y)
        y = self.norm3(y, self.dropout(ff_out))
        return y, (sa_w, ca_w)


# 总装：Transformer Encoder-Decoder
- 词嵌入 + 位置编码
- N 层 EncoderLayer / DecoderLayer 堆叠
- 输出线性层映射到词表大小
- 解码时使用贪心或 beam search（本教程实现贪心）

**Transformer 整体架构：**

![Transformer Architecture](attention_architerture.png)

上图展示了完整的 Transformer Encoder-Decoder 架构：
- **左侧 Encoder**：输入嵌入 + 位置编码 → N×(多头自注意力 + FFN)
- **右侧 Decoder**：输出嵌入 + 位置编码 → N×(掩码多头自注意力 + 编码器-解码器注意力 + FFN)
- **输出层**：线性映射 + Softmax 生成目标词表概率分布

注意每个子层后都有残差连接和 LayerNorm。

In [None]:
class Transformer(nn.Module):
    def __init__(self, src_vocab: int, tgt_vocab: int, d_model: int = 256, num_heads: int = 8,
                 d_ff: int = 512, num_layers: int = 4, dropout: float = 0.1, max_len: int = 512):
        super().__init__()
        # src_vocab: 源端（输入）词表大小，例如包含 PAD/BOS/EOS 等特殊 token
        # tgt_vocab: 目标端（输出）词表大小，用于最后的线性投影到词表概率
        self.src_embed = nn.Embedding(src_vocab, d_model)
        self.tgt_embed = nn.Embedding(tgt_vocab, d_model)
        self.pos_enc = PositionalEncoding(d_model, max_len, dropout)

        self.encoder_layers = nn.ModuleList([
            EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)
        ])
        self.decoder_layers = nn.ModuleList([
            DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)
        ])
        self.out_proj = nn.Linear(d_model, tgt_vocab)

    def encode(self, src: Tensor, src_pad: Optional[Tensor] = None) -> tuple[Tensor, list[Tensor]]:
        # src: (B,T_s), src_pad: (B,T_s) 1=pad
        x = self.pos_enc(self.src_embed(src))  # (B,T_s,d_model)
        attn_weights = []
        src_len = src.size(1)
        src_mask = make_pad_mask(src_len, src_len, src_pad, src_pad)  # (B,1,T,T)
        for layer in self.encoder_layers:
            x, sa_w = layer(x, src_mask)
            attn_weights.append(sa_w)
        return x, attn_weights

    def decode(self, tgt: Tensor, memory: Tensor, src_pad: Optional[Tensor] = None, tgt_pad: Optional[Tensor] = None) -> tuple[Tensor, list[tuple[Tensor, Tensor]]]:
        # tgt: (B,T_t)
        y = self.pos_enc(self.tgt_embed(tgt))
        T_t = tgt.size(1)
        B, T_s = memory.size(0), memory.size(1)
        # masks
        pad_mask = make_pad_mask(T_t, T_t, tgt_pad, tgt_pad)            # (B,1,T_t,T_t)
        subs_mask = make_subsequent_mask(T_t).to(y.device)              # (1,1,T_t,T_t)
        tgt_mask = pad_mask & subs_mask if pad_mask is not None else subs_mask
        mem_mask = make_pad_mask(T_t, T_s, tgt_pad, src_pad)            # (B,1,T_t,T_s)

        attn_pairs = []
        for layer in self.decoder_layers:
            y, (sa_w, ca_w) = layer(y, memory, tgt_mask, mem_mask)
            attn_pairs.append((sa_w, ca_w))
        return y, attn_pairs

    def forward(self, src: Tensor, tgt_inp: Tensor, src_pad: Optional[Tensor] = None, tgt_pad: Optional[Tensor] = None) -> Tensor:
        memory, _ = self.encode(src, src_pad)
        y, _ = self.decode(tgt_inp, memory, src_pad, tgt_pad)
        logits = self.out_proj(y)
        return logits

    @torch.no_grad()
    def greedy_decode(self, src: Tensor, bos_id: int, eos_id: int, max_new_tokens: int,
                      src_pad: Optional[Tensor] = None) -> Tensor:
        """
        Greedy decode using the Transformer (no beam search).

        用途:
        - 在推理阶段从编码器记忆中逐步生成目标序列。
        - 每步选择当前模型概率最高的下一个 token（argmax），直到生成 EOS 或达到最大长度。

        参数说明:
        - src (Tensor): 源序列输入，形状 (B, T_s)，元素为 token id。
        - bos_id (int): 解码起始符 (BOS) 的 token id，用于初始化生成序列。
        - eos_id (int): 结束符 (EOS) 的 token id，遇到后可停止生成（对所有样本均为 EOS 时提前终止）。
        - max_new_tokens (int): 最多生成的新 token 数量（不包括初始 BOS）。
        - src_pad (Optional[Tensor]): 可选的源端 padding 标志，形状 (B, T_s)，1 表示 PAD，用于构造 encoder/decoder 的 mask（若为 None 则不使用 pad 屏蔽）。

        返回:
        - Tensor: 生成的 token id 序列，形状 (B, T_out)，通常包含初始 BOS 和随后生成的 token（可能包含 EOS）。
        """
        self.eval()
        memory, _ = self.encode(src, src_pad)
        B = src.size(0)
        ys = torch.full((B, 1), bos_id, dtype=torch.long, device=src.device)
        for _ in range(max_new_tokens):
            y, _ = self.decode(ys, memory, src_pad, tgt_pad=None)
            logits = self.out_proj(y)  # (B,T,d_vocab)
            next_token = logits[:, -1].argmax(dim=-1, keepdim=True)  # (B,1)
            ys = torch.cat([ys, next_token], dim=1)
            if (next_token == eos_id).all():
                break
        return ys

# 极简玩具任务：Copy Task（验证前向/反向是否正确）
任务：输入序列 [a b c]，输出也为 [a b c]。
- 词表：{PAD=0, BOS=1, EOS=2, 其他 3..V-1}
- 损失：交叉熵（忽略 PAD）
- 只训练少量步数，演示损失可下降

In [8]:
import random

def make_copy_batch(batch_size: int, seq_len: int, vocab_size: int, pad_id: int = 0, bos_id: int = 1, eos_id: int = 2):
    """构造一批 copy 样本。返回 src,tgt_inp,tgt_out, 以及 pad mask。"""
    src = []
    tgt_inp = []
    tgt_out = []
    for _ in range(batch_size):
        toks = [random.randint(3, vocab_size - 1) for _ in range(seq_len)]
        src.append(toks)
        # tgt: 以 BOS 开始，后接相同序列，最后 EOS
        tgt_inp.append([bos_id] + toks)
        tgt_out.append(toks + [eos_id])
    src = torch.tensor(src, dtype=torch.long)
    tgt_inp = torch.tensor(tgt_inp, dtype=torch.long)
    tgt_out = torch.tensor(tgt_out, dtype=torch.long)
    # 无 pad，这里简单起见
    src_pad = torch.zeros_like(src)
    tgt_pad = torch.zeros_like(tgt_inp)
    return src, tgt_inp, tgt_out, src_pad, tgt_pad

# 训练演示（可选）
if 'torch' in globals():
    torch.manual_seed(0)
    V = 100
    model = Transformer(src_vocab=V, tgt_vocab=V, d_model=128, num_heads=4, d_ff=256, num_layers=2, dropout=0.1)
    criterion = nn.CrossEntropyLoss(ignore_index=0)
    optim = torch.optim.Adam(model.parameters(), lr=3e-4)

    for step in range(50):  # 小步数演示
        model.train()
        src, tgt_inp, tgt_out, src_pad, tgt_pad = make_copy_batch(batch_size=16, seq_len=5, vocab_size=V)
        logits = model(src, tgt_inp, src_pad, tgt_pad)     # (B,T+1,V)
        loss = criterion(logits.reshape(-1, V), tgt_out.reshape(-1))
        optim.zero_grad()
        loss.backward()
        optim.step()
        if (step + 1) % 10 == 0:
            print(f"step {step+1}: loss={loss.item():.4f}")

    # 贪心解码测试
    model.eval()
    src, tgt_inp, tgt_out, src_pad, tgt_pad = make_copy_batch(batch_size=2, seq_len=5, vocab_size=V)
    pred = model.greedy_decode(src, bos_id=1, eos_id=2, max_new_tokens=6)
    print("src:", src)
    print("pred:", pred)

step 10: loss=4.4004
step 20: loss=4.2882
step 30: loss=4.2305
step 40: loss=4.0967
step 30: loss=4.2305
step 40: loss=4.0967
step 50: loss=4.0603
src: tensor([[46, 37, 72, 78, 88],
        [33, 99, 70, 35,  9]])
pred: tensor([[ 1, 46, 46, 46,  2],
        [ 1, 46, 46,  2,  2]])
step 50: loss=4.0603
src: tensor([[46, 37, 72, 78, 88],
        [33, 99, 70, 35,  9]])
pred: tensor([[ 1, 46, 46, 46,  2],
        [ 1, 46, 46,  2,  2]])


# 复杂度、易错点与面试答题要点

## 时间复杂度分析

### 注意力机制的主要复杂度
$$
\mathcal{O}\big(B\,\cdot\,H\,\cdot\,T_q\,\cdot\,T_k\,\cdot\,d_k\big)
$$

**分解：**
- $B$：批次大小
- $H$：头数
- $T_q \times T_k$：计算 $QK^{\top}$ 的矩阵乘法
- $d_k$：每个头的维度

**自注意力情况**（$T_q = T_k = T$，$d_k = d_{\text{model}}/H$）：
$$
\mathcal{O}\big(B\,\cdot\,H\,\cdot\,T^2\,\cdot\,\frac{d_{\text{model}}}{H}\big) = \mathcal{O}\big(B\,\cdot\,T^2\,\cdot\,d_{\text{model}}\big)
$$

**瓶颈：** $T^2$ 项导致序列长度的**二次复杂度**，这是标准 Transformer 的主要限制。

### 其他操作的复杂度
| 操作 | 复杂度 | 说明 |
|------|--------|------|
| 线性投影（$XW$） | $\mathcal{O}(B \cdot T \cdot d^2)$ | $d=d_{\text{model}}$ |
| FFN | $\mathcal{O}(B \cdot T \cdot d \cdot d_{\mathrm{ff}})$ | 通常 $d_{\mathrm{ff}}=4d$ |
| LayerNorm | $\mathcal{O}(B \cdot T \cdot d)$ | 轻量级操作 |

**结论：** 当 $T$ 较大时，注意力的 $T^2$ 项占主导地位。

---

## 空间复杂度（显存占用）

### 注意力权重矩阵
$$
\mathcal{O}(B\,\cdot\,H\,\cdot\,T_q\,\cdot\,T_k)
$$

**影响：**
- 存储所有注意力权重用于反向传播
- 自注意力时为 $\mathcal{O}(B \cdot H \cdot T^2)$
- 长序列（$T>1000$）时显存消耗显著

### 优化方向
- **FlashAttention**：融合操作，减少中间激活存储
- **Sparse Attention**：仅计算部分注意力权重
- **Gradient Checkpointing**：重新计算代替存储

---

## 易错点清单（面试高频）

### 1. MHA 头部分割/合并时的维度变换
**错误示例：**
```python
# 错误：直接 view 可能导致内存不连续
x = x.view(B, T, H, d_k).transpose(1, 2)
out = out.transpose(1, 2).view(B, T, d_model)  # 可能报错
```

**正确做法：**
```python
# 分头：先 view 再 transpose
x = x.view(B, T, H, d_k).transpose(1, 2)  # (B,H,T,d_k)

# 合头：transpose 后必须 contiguous()
out = out.transpose(1, 2).contiguous().view(B, T, d_model)
```

**原因：** `transpose` 改变步长（stride），需要 `contiguous()` 使内存连续后才能 `view`。

---

### 2. Mask 的取值约定（1=可见 vs 1=遮挡）
**本教程约定：** `mask[i,j]=1` 表示位置 $j$ **可见**，`0` 表示**遮挡**

**实现：**
```python
scores = scores.masked_fill(mask == 0, float('-inf'))
```

**注意：** 不同框架/论文可能约定相反，务必统一！

---

### 3. Decoder 自注意力的下三角 Mask
**目的：** 防止未来信息泄露（如生成第3个词时不能看到第4、5个词）

**生成方式：**
```python
mask = torch.tril(torch.ones(T, T))  # 下三角全1
```

**形状：** $(1, 1, T, T)$ 或 $(T, T)$（广播）

**易错：** 忘记在推理时也需要此 mask！

---

### 4. Cross-Attention 的 Q/K/V 来源
**正确理解：**
- $Q$：来自 **Decoder 当前层的输出**（"我想查询什么"）
- $K, V$：来自 **Encoder 的输出记忆**（"从源序列中提取信息"）

**代码：**
```python
cross_out = MultiHeadAttention(
    x_q=decoder_hidden,    # Query from decoder
    x_kv=encoder_memory    # Key/Value from encoder
)
```

---

### 5. 残差连接 + LayerNorm 的顺序
**Post-LN（原论文）：**
```python
x = LayerNorm(x + Sublayer(x))
```

**Pre-LN（现代常用）：**
```python
x = x + Sublayer(LayerNorm(x))
```

**面试要点：** 能说明两者差异和适用场景。

---

### 6. 位置编码长度要足够
**问题：** 若 `max_len=512` 但输入序列长度为 600，会越界！

**解决：**
```python
# 动态裁剪
T = x.size(1)
x = x + self.pe[:, :T, :]  # 只取前 T 个位置
```

---

## 面试快速讲解结构（建议话术）

### 1. 总体架构（30秒）
> "Transformer 由 Encoder-Decoder 组成。Encoder 用多头自注意力捕获源序列的全局依赖，Decoder 在生成时通过掩码自注意力保证自回归特性，并用交叉注意力融合源序列信息。每个子层后都有残差连接和 LayerNorm。"

### 2. 核心公式（1分钟）
> "注意力的核心是缩放点积：$\mathrm{Attention}(Q,K,V) = \mathrm{softmax}(\frac{QK^{\top}}{\sqrt{d_k}})V$。多头注意力通过 $H$ 个并行头捕获不同子空间的特征，拼接后再投影。位置编码用正弦余弦函数注入位置信息。"

### 3. 形状追踪（关键！）
> "输入 $(B,T,d_{\text{model}})$ → 投影并分头为 $(B,H,T,d_k)$ → 注意力计算 $(B,H,T,T)$ 的权重矩阵 → 加权求和得 $(B,H,T,d_v)$ → 合头回 $(B,T,d_{\text{model}})$。"

### 4. Mask 机制（必考）
> "Encoder 用 padding mask 屏蔽 PAD。Decoder 有两种 mask：自注意力用下三角 causal mask 防止看到未来，交叉注意力用 padding mask 屏蔽源序列的 PAD。"

### 5. 复杂度（加分项）
> "自注意力的复杂度是 $\mathcal{O}(T^2 \cdot d)$，$T^2$ 是瓶颈。长序列场景可用 Sparse Attention、Linformer、Performer 等优化。"

---

## 可扩展点（展示深度理解）

1. **相对位置编码（RPE）**：如 T5、XLNet 的相对位置偏置
2. **Pre-LN vs Post-LN**：训练稳定性差异
3. **RoPE（旋转位置编码）**：LLaMA 等模型采用，外推能力强
4. **FlashAttention**：IO 优化，加速 2-4 倍
5. **Efficient Transformer**：Linformer、Performer、Reformer 等 $\mathcal{O}(T)$ 变体
6. **参数共享**：ALBERT 跨层共享参数降低模型大小

---

## 维度速查表（面试快速核对）

| 符号 | 含义 | 典型值 |
|------|------|--------|
| $B$ | Batch size | 32, 64 |
| $T$ | 序列长度 | 128, 512 |
| $d_{\text{model}}$ | 模型维度 | 512, 768 |
| $H$ | 注意力头数 | 8, 12 |
| $d_k = d_{\text{model}}/H$ | 每头维度 | 64 |
| $d_{\mathrm{ff}}$ | FFN 隐藏层 | $4d_{\text{model}}$ |
| $V$ | 词表大小 | 30k-50k |

**记忆技巧：** "BTD-HK" → Batch-Time-Dmodel-Heads-dK