## 预备知识点学习


Transformer 模型是本项目的核心，接下来，我们将从机制原理、核心组件、对话场景适配等方面逐步学习如何搭建出本项目的模型。

### 1. Transformer 概述

![Transformer 工作流程图](./images/02-1.png)

**Transformer** 模型在2017年由 google 提出，直接基于 Self-Attention 结构，取代了之前 NLP 任务中常用的 RNN 神经网络结构，并在 WMT2014 Englishto-German 和 WMT2014 English-to-French 两个机器翻译任务上都取得了当时的 **SOTA**(State of the Art)。

论文链接：[Attention Is All You Need](https://arxiv.org/abs/1706.03762)。


Transformer通过引入注意力机制来跟踪序列数据中的关系，从而学习上下文并理解含义。与传统的递归神经网络（RNN）或卷积神经网络（CNN）不同，它能够并行处理序列数据，显著提高了训练效率。这种架构的关键在于自注意力机制，它允许模型同时分析序列中的所有元素，而不仅仅是顺序处理，从而捕捉长距离的依赖关系。

Transformer模型由输入嵌入层、编码器、解码器以及输出层组成，其中编码器和解码器都包含多头注意力机制和前馈神经网络。位置编码被添加到输入序列中，以提供关于词序的信息，因为Transformer本身不具有顺序处理的能力。

这种架构的出现极大地推动了自然语言处理（NLP）领域的发展，是BERT、GPT等现代大型语言模型的基础。此外，Transformer主要应用于处理序列数据，比如自然语言文本，现已经扩展到计算机视觉领域，如ViT（Vision Transformer），显示了其在不同领域的广泛适用性。

Transformer 包括以下关键组件：
- **Self-Attention**：在序列内部建模相关性
- **Multi-Head**：并行多组注意力，学习互补关系
- **Positional Encoding**：为无序的注意力引入位置信息
- **Feed-Forward + Residual + LayerNorm**：稳定训练与非线性建模
- **Masking**：屏蔽 PAD 与未来信息（解码）

### 2. 注意力机制

#### 2.1 自注意力机制(Self-Attention)

自注意力机制(Self-Attention) 是Transformer模型的核心组成部分，它允许模型在处理序列数据时能够关注到输入序列中最重要的部分，而不依赖于固定的顺序或距离。

注意力机制有三个核心变量：**$Query$**（查询值）、**$Key$**（键值）和 **$Value$**（真值）。自注意力机制的核心思想是在处理序列数据时，序列中的每个词都同时作为查询（$Query$）、键（$Key$）和值（$Value$），通过计算每个词与所有其他词之间的相关性得到一个注意力权重，以表示其重要性。具体来说注意力机制可以分为以下几个步骤：

![注意力机制](./images/02-2.png)

1. **计算查询（Query）、键（Key）、值（Value）向量：** 

    假设输入序列为 $X = [x_1, x_2, \ldots, x_n]$，通过权重矩阵线性变换得到：  
    $$Q_i = W_q \cdot x_i, \quad K_i = W_K \cdot x_i, \quad V_i = W_V \cdot x_i$$
    其中 $Q_i, K_i, V_i$ 分别表示第 $i$ 个元素的查询、键和值向量。
2. **计算注意力分数：**
    
    对每一对元素 $(x_i, x_j)$，计算点积并进行缩放（$d_k$ 为键向量维度），得到注意力分数，该值表示二者间的相关程度：
    $$\text{score}(Q_i, K_j) = \frac{Q_i \cdot K_j}{\sqrt{d_k}}$$
3. **计算注意力权重：**
    通过 $softmax$ 函数对注意力分数进行归一化，得到权重 $w_{ij}$（使用 $softmax$ 函数是为了使得所有的注意力权重在 $[0,1]$ 之间，且和为 1。其可以帮助模型在处理数据时，聚焦于重要的部分，忽略无关的信息。）：
    $$w_{ij} = \text{softmax}\left(\text{score}(Q_i, K_j)\right)$$
4. **加权求和：**  
    将数据序列中的每个元素与其注意力权重进行加权求和，生成上下文相关的输出 $z_i$,输出向量融合了全局上下文信息。
    $$z_i = \sum_{j=1}^n w_{ij} V_j$$

综上所述，缩放点积自注意力机制可表示为（$Q$、$K$、$V$ 均为矩阵）：$$\text{Output} = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right) \cdot V $$

代码实现如下：

In [1]:
# 导入相关库
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

In [12]:
class ModelArgs:
    def __init__(self, vocab_size=None, block_size=128, n_layer=2, n_heads=2, n_embd=128, dropout=0.1):
        self.vocab_size = vocab_size
        self.block_size = block_size
        self.max_seq_len = block_size
        self.n_layer = n_layer
        self.n_heads = n_heads
        self.n_embd = n_embd
        self.dim = n_embd
        self.dropout = dropout

args = ModelArgs(
    vocab_size=1000,
    block_size=16,
    n_layer=2,
    n_heads=2,
    n_embd=64,
    dropout=0.1,
)

In [21]:
class SelfAttention(nn.Module):
    def __init__(self, embed_size):
        super(SelfAttention, self).__init__()
        self.embed_size = embed_size
        self.query = nn.Linear(embed_size, embed_size)
        self.key = nn.Linear(embed_size, embed_size)
        self.value = nn.Linear(embed_size, embed_size)
    
    def forward(self, x, mask=None):
        Q = self.query(x)  # (B, L, embed_size)
        K = self.key(x)     # (B, L, embed_size)
        V = self.value(x)   # (B, L, embed_size)
        
        scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.embed_size))
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))
        attn_weights = F.softmax(scores, dim=-1)
        output = torch.matmul(attn_weights, V)
        return output, attn_weights


# 示例：SelfAttention 输入/输出
x = torch.randn(2, 5, 16)
sa = SelfAttention(embed_size=16)
sa_out, sa_attn = sa(x)
print('[SelfAttention] input:', x.shape, 'output:', sa_out.shape, 'attn:', sa_attn.shape)

[SelfAttention] input: torch.Size([2, 5, 16]) output: torch.Size([2, 5, 16]) attn: torch.Size([2, 5, 5])


#### 2.2 掩码自注意力

掩码自注意力，即 Mask Self-Attention，是指使用注意力掩码的自注意力机制。掩码的作用是遮蔽一些特定位置的 token，模型在学习的过程中，会忽略掉被遮蔽的 token。

使用注意力掩码的核心动机是让模型只能使用历史信息进行预测而不能看到未来信息。Transformer 模型对一个文本序列，会不断根据之前的 token 来预测下一个 token，直到将整个文本序列补全。

例如，如果待学习的文本序列是 【BOS】I like you【EOS】（ BOS表示Begin of Sentence，EOS表示End of Sentence），那么，模型会按如下顺序进行预测和学习：

    Step 1：输入 【BOS】，输出 I
    Step 2：输入 【BOS】I，输出 like
    Step 3：输入 【BOS】I like，输出 you
    Step 4：输入 【BOS】I like you，输出 【EOS】


我们可以发现，上述过程是一个串行的过程，也就是需要先完成 Step 1，才能做 Step 2，接下来逐步完成整个序列的补全。如果对于每一个训练语料，模型都需要串行完成上述过程才能完成学习，那么很明显没有做到并行计算，计算效率很低。那么，如何解决呢？

针对这个问题，Transformer 就提出了掩码自注意力的方法。掩码自注意力会生成一串掩码，来遮蔽未来信息。例如，待学习的文本序列仍然是 【BOS】I like you【EOS】，使用的注意力掩码是【MASK】，那么模型的输入为：

    <BOS> 【MASK】【MASK】【MASK】【MASK】
    <BOS>    I   【MASK】 【MASK】【MASK】
    <BOS>    I     like  【MASK】【MASK】
    <BOS>    I     like    you  【MASK】
    <BOS>    I     like    you   <EOS>

在每一行输入中，模型仍然是只看到前面的 token，预测下一个 token。但是，上述输入不再是串行的过程，而可以一起并行地输入到模型中，模型只需要每一个样本根据未被遮蔽的 token 来预测下一个 token 即可，从而实现了并行的语言模型。

观察上述的掩码，我们可以发现其实则是一个和文本序列等长的上三角矩阵。我们可以简单地通过创建一个和输入同等长度的上三角矩阵作为注意力掩码，再使用掩码来遮蔽掉输入即可。也就是说，当输入维度为 （batch_size, seq_len, hidden_size）时，我们的 Mask 矩阵维度一般为 (1, seq_len, seq_len)（通过广播实现同一个 batch 中不同样本的计算）。

在具体实现中，我们通过以下代码生成 Mask 矩阵：

In [13]:
mask = torch.full((1, args.max_seq_len, args.max_seq_len), float("-inf"))  # full 函数创建一个 1 * seq_len * seq_len 的矩阵
mask = torch.triu(mask, diagonal=1)  # triu 函数的功能是创建一个上三角矩阵
print(mask)

tensor([[[0., -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf],
         [0., 0., -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf],
         [0., 0., 0., -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf],
         [0., 0., 0., 0., -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf],
         [0., 0., 0., 0., 0., -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf],
         [0., 0., 0., 0., 0., 0., -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf],
         [0., 0., 0., 0., 0., 0., 0., -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf],
         [0., 0., 0., 0., 0., 0., 0., 0., -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., -inf, -inf, -inf, -inf, -inf, -inf, -inf],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., -inf, -inf, -inf, -inf, -inf, -inf],
         [0., 0., 0., 0., 0., 0., 0., 0.

生成的 Mask 矩阵会是一个上三角矩阵，上三角位置的元素均为 -inf，其他位置的元素置为0。

在注意力计算时，我们会将计算得到的注意力分数与这个掩码做和，再进行 Softmax 操作：

```python
# 此处的 scores 为计算得到的注意力分数，mask 为上文生成的掩码矩阵
scores = scores + mask[:, :seqlen, :seqlen]
scores = F.softmax(scores.float(), dim=-1).type_as(xq)
```

通过做求和，上三角区域（也就是应该被遮蔽的 token 对应的位置）的注意力分数结果都变成了 `-inf`，而下三角区域的分数不变。再做 Softmax 操作，`-inf` 的值在经过 Softmax 之后会被置为 0，从而忽略了上三角区域计算的注意力分数，从而实现了注意力遮蔽。

#### 2.3 多头注意力

注意力机制可以实现并行化与长期依赖关系拟合，但一次注意力计算只能拟合一种相关关系，单一的注意力机制很难全面拟合语句序列里的相关关系。因此 Transformer 使用了多头注意力机制（Multi-Head Attention），即同时对一个语料进行多次注意力计算，每次注意力计算都能拟合不同的关系，将最后的多次结果拼接起来作为最后的输出，即可更全面深入地拟合语言信息。

多头注意力机制其实就是将原始的输入序列进行多组的自注意力处理；然后再将每一组得到的自注意力结果拼接起来，再通过一个线性层进行处理，得到最终的输出。我们用公式可以表示为：

$$
\mathrm{MultiHead}(Q, K, V) = \mathrm{Concat}(\mathrm{head_1}, ...,
\mathrm{head_h})W^O    \\
    \text{where}~\mathrm{head_i} = \mathrm{Attention}(QW^Q_i, KW^K_i, VW^V_i)
$$

我们可以通过矩阵运算巧妙地实现并行的多头计算，其核心逻辑在于使用三个组合矩阵来代替了n个参数矩阵的组合，也就是矩阵内积再拼接其实等同于拼接矩阵再内积。具体实现可以参考下列代码：


In [15]:
class MultiHeadAttention(nn.Module):
    '''多头自注意力计算模块'''
    def __init__(self, args, is_causal=False):
        # 构造函数
        # args: 配置对象
        super().__init__()
        self.is_causal = is_causal
        # 隐藏层维度必须是头数的整数倍，因为后面我们会将输入拆成头数个矩阵
        assert args.dim % args.n_heads == 0
        # 模型并行处理大小，默认为1。
        model_parallel_size = 1
        # 本地计算头数，等于总头数除以模型并行处理大小。
        self.n_local_heads = args.n_heads // model_parallel_size
        # 每个头的维度，等于模型维度除以头的总数。
        self.head_dim = args.dim // args.n_heads

        # Wq, Wk, Wv 参数矩阵，每个参数矩阵为 n_embd x n_embd
        # 这里通过三个组合矩阵来代替了n个参数矩阵的组合，其逻辑在于矩阵内积再拼接其实等同于拼接矩阵再内积，
        self.wq = nn.Linear(args.dim, self.n_local_heads * self.head_dim, bias=False)
        self.wk = nn.Linear(args.dim, self.n_local_heads * self.head_dim, bias=False)
        self.wv = nn.Linear(args.dim, self.n_local_heads * self.head_dim, bias=False)
        # 输出权重矩阵，维度为 dim x n_embd（head_dim = n_embeds / n_heads）
        self.wo = nn.Linear(self.n_local_heads * self.head_dim, args.dim, bias=False)
        # 注意力的 dropout
        self.attn_dropout = nn.Dropout(args.dropout)
        # 残差连接的 dropout
        self.resid_dropout = nn.Dropout(args.dropout)
        
        # 创建一个上三角矩阵，用于遮蔽未来信息
        # 注意，因为是多头注意力，Mask 矩阵比之前我们定义的多一个维度
        if is_causal:
            mask = torch.full((1, 1, args.max_seq_len, args.max_seq_len), float("-inf"))
            mask = torch.triu(mask, diagonal=1)
            # 注册为模型的缓冲区
            self.register_buffer("mask", mask)

    def forward(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor):

        # 获取批次大小和序列长度，[batch_size, seq_len, dim]
        bsz, seqlen, _ = q.shape

        # 计算查询（Q）、键（K）、值（V）,输入通过参数矩阵层，维度为 (B, T, n_embed) x (n_embed, n_embed) -> (B, T, n_embed)
        xq, xk, xv = self.wq(q), self.wk(k), self.wv(v)

        # 将 Q、K、V 拆分成多头，维度为 (B, T, n_head, C // n_head)，然后交换维度，变成 (B, n_head, T, C // n_head)
        # 因为在注意力计算中我们是取了后两个维度参与计算
        # 为什么要先按B*T*n_head*C//n_head展开再互换1、2维度而不是直接按注意力输入展开，是因为view的展开方式是直接把输入全部排开，
        # 然后按要求构造，可以发现只有上述操作能够实现我们将每个头对应部分取出来的目标
        xq = xq.view(bsz, seqlen, self.n_local_heads, self.head_dim)
        xk = xk.view(bsz, seqlen, self.n_local_heads, self.head_dim)
        xv = xv.view(bsz, seqlen, self.n_local_heads, self.head_dim)
        xq = xq.transpose(1, 2)
        xk = xk.transpose(1, 2)
        xv = xv.transpose(1, 2)

        # 注意力计算
        # 计算 QK^T / sqrt(d_k)，维度为 (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)
        scores = torch.matmul(xq, xk.transpose(2, 3)) / math.sqrt(self.head_dim)
        # 掩码自注意力必须有注意力掩码
        if self.is_causal:
            assert hasattr(self, 'mask')
            # 这里截取到序列长度，因为有些序列可能比 max_seq_len 短
            scores = scores + self.mask[:, :, :seqlen, :seqlen]
        # 计算 softmax，维度为 (B, nh, T, T)
        scores = F.softmax(scores.float(), dim=-1).type_as(xq)
        # 做 Dropout
        scores = self.attn_dropout(scores)
        # V * Score，维度为(B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)
        output = torch.matmul(scores, xv)

        # 恢复时间维度并合并头。
        # 将多头的结果拼接起来, 先交换维度为 (B, T, n_head, C // n_head)，再拼接成 (B, T, n_head * C // n_head)
        # contiguous 函数用于重新开辟一块新内存存储，因为Pytorch设置先transpose再view会报错，
        # 因为view直接基于底层存储得到，然而transpose并不会改变底层存储，因此需要额外存储
        output = output.transpose(1, 2).contiguous().view(bsz, seqlen, -1)

        # 最终投影回残差流。
        output = self.wo(output)
        output = self.resid_dropout(output)
        return output

In [16]:
# 示例：MultiHeadAttention 输入/输出（非因果与因果）
# 使用上文定义的 ModelArgs（args）

# 构造一个简单的输入：batch_size=2, seq_len=5, dim=args.dim
x = torch.randn(2, 5, args.dim)

# 非因果（Self-Attention 类似 Encoder 内部）
mha_non_causal = MultiHeadAttention(args, is_causal=False)
y_non = mha_non_causal(x, x, x)
print('[MHA non-causal] input:', x.shape, 'output:', y_non.shape)

# 因果（Mask Self-Attention，类似 Decoder 第一个子层）
mha_causal = MultiHeadAttention(args, is_causal=True)
y_causal = mha_causal(x, x, x)
print('[MHA causal] input:', x.shape, 'output:', y_causal.shape)

[MHA non-causal] input: torch.Size([2, 5, 64]) output: torch.Size([2, 5, 64])
[MHA causal] input: torch.Size([2, 5, 64]) output: torch.Size([2, 5, 64])


### 3. 位置编码

自注意力对顺序不敏感，需要引入位置信息。经典的正弦/余弦位置编码：
$$
\begin{aligned}
\mathrm{PE}(pos,2i) &= \sin\left(\frac{pos}{10000^{2i/d_{model}}}\right) \\
\mathrm{PE}(pos,2i+1) &= \cos\left(\frac{pos}{10000^{2i/d_{model}}}\right)
\end{aligned}
$$
​上式中，$pos$ 为 $token$ 在句子中的位置，$d_{model}$代表位置向量的维度，$i \in [0, d_{model})$代表位置$d_{model}$维位置向量第$i$维。$2i$ 和 $2i+1$ 则是指示了 token 是奇数位置还是偶数位置，从上式中我们可以看出对于奇数位置的 $token$ 和偶数位置的 $token$，Transformer 采用了不同的函数进行编码。

我们以一个简单的例子来说明位置编码的计算过程：假如我们输入的是一个长度为 4 的句子"I like to code"，我们可以得到下面的词向量矩阵 $\rm x$ ，其中每一行代表的就是一个词向量， $\rm x_0=[0.1,0.2,0.3,0.4]$ 对应的就是“I”的词向量，它的 $pos$ 就是为0，以此类推，第二行代表的是“like”的词向量，它的 $pos$ 就是1：

$$
\rm x = \begin{bmatrix} 0.1 & 0.2 & 0.3 & 0.4 \\ 0.2 & 0.3 & 0.4 & 0.5 \\ 0.3 & 0.4 & 0.5 & 0.6 \\ 0.4 & 0.5 & 0.6 & 0.7 \end{bmatrix}
$$

​则经过位置编码后的词向量为：

$$
\rm x_{PE} = \begin{bmatrix} 0.1 & 0.2 & 0.3 & 0.4 \\ 0.2 & 0.3 & 0.4 & 0.5 \\ 0.3 & 0.4 & 0.5 & 0.6 \\ 0.4 & 0.5 & 0.6 & 0.7 \end{bmatrix} + \begin{bmatrix} \sin(\frac{0}{10000^0}) & \cos(\frac{0}{10000^0}) & \sin(\frac{0}{10000^{2/4}}) & \cos(\frac{0}{10000^{2/4}}) \\ \sin(\frac{1}{10000^0}) & \cos(\frac{1}{10000^0}) & \sin(\frac{1}{10000^{2/4}}) & \cos(\frac{1}{10000^{2/4}}) \\ \sin(\frac{2}{10000^0}) & \cos(\frac{2}{10000^0}) & \sin(\frac{2}{10000^{2/4}}) & \cos(\frac{2}{10000^{2/4}}) \\ \sin(\frac{3}{10000^0}) & \cos(\frac{3}{10000^0}) & \sin(\frac{3}{10000^{2/4}}) & \cos(\frac{3}{10000^{2/4}}) \end{bmatrix} = \begin{bmatrix} 0.1 & 1.2 & 0.3 & 1.4 \\ 1.041 & 0.84 & 0.41 & 1.49 \\ 1.209 & -0.016 & 0.52 & 1.59 \\ 0.541 & -0.489 & 0.895 & 1.655 \end{bmatrix}
$$

我们可以使用如下的代码来获取上述例子的位置编码：

In [6]:
def PositionEncoding(seq_len, d_model, n=10000):
    P = np.zeros((seq_len, d_model))
    for k in range(seq_len):
        for i in np.arange(int(d_model/2)):
            denominator = np.power(n, 2*i/d_model)
            P[k, 2*i] = np.sin(k/denominator)
            P[k, 2*i+1] = np.cos(k/denominator)
    return P

P = PositionEncoding(seq_len=4, d_model=4, n=100)
print(P)

[[ 0.          1.          0.          1.        ]
 [ 0.84147098  0.54030231  0.09983342  0.99500417]
 [ 0.90929743 -0.41614684  0.19866933  0.98006658]
 [ 0.14112001 -0.9899925   0.29552021  0.95533649]]


基于上述原理，我们实现一个​位置编码层：

In [22]:
class PositionalEncoding(nn.Module):
    '''位置编码模块'''

    def __init__(self, args):
        super(PositionalEncoding, self).__init__()
        # Dropout 层
        # self.dropout = nn.Dropout(p=args.dropout)

        # block size 是序列的最大长度
        pe = torch.zeros(args.block_size, args.n_embd)
        position = torch.arange(0, args.block_size).unsqueeze(1)
        # 计算 theta
        div_term = torch.exp(
            torch.arange(0, args.n_embd, 2) * -(math.log(10000.0) / args.n_embd)
        )
        # 分别计算 sin、cos 结果
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer("pe", pe)

    def forward(self, x):
        # 将位置编码加到 Embedding 结果上
        x = x + self.pe[:, : x.size(1)].requires_grad_(False)
        return x


# 示例：PositionalEncoding 输出
pe_layer = PositionalEncoding(args)
# 构造一个简单的输入：batch_size=2, seq_len=5, hidden=args.n_embd
x = torch.randn(2, 5, args.n_embd)
y = pe_layer(x.clone())
print('[PositionalEncoding] input:', x.shape, 'output:', y.shape)
print('mean |y - x|:', (y - x).abs().mean().item())

[PositionalEncoding] input: torch.Size([2, 5, 64]) output: torch.Size([2, 5, 64])
mean |y - x|: 0.5410805940628052


在下图中，我们画出了一种位置向量在第4、5、6、7维度、不同位置的的数值大小。横坐标表示位置下标，纵坐标表示数值大小。

![位置编码](./images/02-3.png)

### 4. Transformer 架构

在Transformer模型中，Encoder 和 Decoder 是两个核心组成部分，主要用于处理输入数据并生成输出。如果我们将模型看为一个黑匣子，那么编码器和解码器连接类似与下面一张图片展示了 Transformer 模型的详细结构图，分为编码器（Encoder）和解码器（Decoder）两部分。编码器负责将输入序列转换为上下文感知的表示，而解码器则根据编码器的输出和自身的输入生成目标序列。

![Transformer 结构图](./images/02-4.png)

#### 4.1 编码器（Encoder）

Encoder 由 N 个 Encoder Layer 组成，每一个 Encoder Layer 包括一个注意力层（Multi-Head Self-Attention）和一个前馈神经网络（Feed-Forward Neural Network）。每个子模块的输出通过残差连接（Residual Connection）与输入相加，并通过层归一化（Layer Normalization）进行标准化，以加速训练和提高稳定性。

In [28]:
class MLP(nn.Module):
    def __init__(self, in_features: int, out_features: int, dropout: float):
        super().__init__()
        hidden_features = max(4 * in_features, out_features)
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.fc2 = nn.Linear(hidden_features, out_features)
        self.act = nn.GELU()
        self.drop = nn.Dropout(dropout)
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.fc1(x)
        x = self.act(x)
        x = self.drop(x)
        x = self.fc2(x)
        x = self.drop(x)
        return x

In [34]:
class LayerNorm(nn.Module):
    ''' Layer Norm 层'''
    def __init__(self, features, eps=1e-6):
        super().__init__()
        # 线性矩阵做映射
        self.a_2 = nn.Parameter(torch.ones(features))
        self.b_2 = nn.Parameter(torch.zeros(features))
        self.eps = eps

    def forward(self, x):
        # 在统计每个样本所有维度的值，求均值和方差
        mean = x.mean(-1, keepdim=True) # mean: [bsz, max_len, 1]
        std = x.std(-1, keepdim=True) # std: [bsz, max_len, 1]
        # 注意这里也在最后一个维度发生了广播
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2


class EncoderLayer(nn.Module):
    '''Encoder层'''
    def __init__(self, args):
        super().__init__()
        # 一个 Layer 中有两个 LayerNorm，分别在 Attention 之前和 MLP 之前
        self.attention_norm = LayerNorm(args.n_embd)
        # Encoder 不需要掩码，传入 is_causal=False
        self.attention = MultiHeadAttention(args, is_causal=False)
        self.fnn_norm = LayerNorm(args.n_embd)
        self.feed_forward = MLP(args.dim, args.dim, args.dropout)

    def forward(self, x):
        # Layer Norm
        norm_x = self.attention_norm(x)
        # 自注意力
        h = x + self.attention.forward(norm_x, norm_x, norm_x)
        # 经过前馈神经网络
        out = h + self.feed_forward.forward(self.fnn_norm(h))
        return out

class Encoder(nn.Module):
    '''Encoder 块'''
    def __init__(self, args):
        super(Encoder, self).__init__() 
        # 一个 Encoder 由 N 个 Encoder Layer 组成
        self.layers = nn.ModuleList([EncoderLayer(args) for _ in range(args.n_layer)])
        self.norm = LayerNorm(args.n_embd)

    def forward(self, x):
        "分别通过 N 层 Encoder Layer"
        for layer in self.layers:
            x = layer(x)
        return self.norm(x)

#### 4.2 解码器（Decoder）

类似的，我们也可以先搭建 Decoder Layer，再将 N 个 Decoder Layer 组装为 Decoder。但是和 Encoder 不同的是，Decoder 由两个注意力层和一个前馈神经网络组成。第一个注意力层是一个掩码自注意力层，即使用 Mask 的注意力计算，保证每一个 token 只能使用该 token 之前的注意力分数；第二个注意力层是一个多头注意力层，该层将使用第一个注意力层的输出作为 query，使用 Encoder 的输出作为 key 和 value，来计算注意力分数。最后，再经过前馈神经网络：

In [35]:
class DecoderLayer(nn.Module):
    '''解码层'''
    def __init__(self, args):
        super().__init__()
        # 一个 Layer 中有三个 LayerNorm，分别在 Mask Attention 之前、Self Attention 之前和 MLP 之前
        self.attention_norm_1 = LayerNorm(args.n_embd)
        # Decoder 的第一个部分是 Mask Attention，传入 is_causal=True
        self.mask_attention = MultiHeadAttention(args, is_causal=True)
        self.attention_norm_2 = LayerNorm(args.n_embd)
        # Decoder 的第二个部分是 类似于 Encoder 的 Attention，传入 is_causal=False
        self.attention = MultiHeadAttention(args, is_causal=False)
        self.ffn_norm = LayerNorm(args.n_embd)
        # 第三个部分是 MLP
        self.feed_forward = MLP(args.dim, args.dim, args.dropout)

    def forward(self, x, enc_out):
        # Layer Norm
        norm_x = self.attention_norm_1(x)
        # 掩码自注意力
        x = x + self.mask_attention.forward(norm_x, norm_x, norm_x)
        # 多头注意力
        norm_x = self.attention_norm_2(x)
        h = x + self.attention.forward(norm_x, enc_out, enc_out)
        # 经过前馈神经网络
        out = h + self.feed_forward.forward(self.ffn_norm(h))
        return out

class Decoder(nn.Module):
    '''解码器'''
    def __init__(self, args):
        super(Decoder, self).__init__() 
        # 一个 Decoder 由 N 个 Decoder Layer 组成
        self.layers = nn.ModuleList([DecoderLayer(args) for _ in range(args.n_layer)])
        self.norm = LayerNorm(args.n_embd)

    def forward(self, x, enc_out):
        "Pass the input (and mask) through each layer in turn."
        for layer in self.layers:
            x = layer(x, enc_out)
        return self.norm(x)

In [39]:
# 构造输入
x = torch.randn(2, 5, args.n_embd)

# Encoder 输入/输出
encoder = Encoder(args)
enc_out = encoder(x)
print('[Encoder] input:', x.shape, 'output:', enc_out.shape)

# Decoder 输入/输出（使用上面 encoder 的输出作为 KV）
decoder = Decoder(args)
dec_out = decoder(x, enc_out)
print('[Decoder] input:', x.shape, 'output:', dec_out.shape)

[Encoder] input: torch.Size([2, 5, 64]) output: torch.Size([2, 5, 64])
[Decoder] input: torch.Size([2, 5, 64]) output: torch.Size([2, 5, 64])


#### 4.3 简易 Transformer

基于之前所实现过的组件，我们就组合出一个简易的 Transformer 模型：

In [40]:
class Transformer(nn.Module):
    '''整体模型'''
    def __init__(self, args):
        super().__init__()
        # 必须输入词表大小和 block size
        assert args.vocab_size is not None
        assert args.block_size is not None
        self.args = args
        self.transformer = nn.ModuleDict(dict(
            wte = nn.Embedding(args.vocab_size, args.n_embd),
            wpe = PositionalEncoding(args),
            drop = nn.Dropout(args.dropout),
            encoder = Encoder(args),
            decoder = Decoder(args),
        ))
        # 最后的线性层，输入是 n_embd，输出是词表大小
        self.lm_head = nn.Linear(args.n_embd, args.vocab_size, bias=False)

        # 初始化所有的权重
        self.apply(self._init_weights)

        # 查看所有参数的数量
        print("number of parameters: %.2fM" % (self.get_num_params()/1e6,))

    '''统计所有参数的数量'''
    def get_num_params(self, non_embedding=False):
        # non_embedding: 是否统计 embedding 的参数
        n_params = sum(p.numel() for p in self.parameters())
        # 如果不统计 embedding 的参数，就减去
        if non_embedding:
            n_params -= self.transformer.wte.weight.numel()
        return n_params

    '''初始化权重'''
    def _init_weights(self, module):
        # 线性层和 Embedding 层初始化为正则分布
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
    
    '''前向计算函数'''
    def forward(self, idx, targets=None):
        # 输入为 idx，维度为 (batch size, sequence length, 1)；targets 为目标序列，用于计算 loss
        device = idx.device
        b, t = idx.size()
        assert t <= self.args.block_size, f"不能计算该序列，该序列长度为 {t}, 最大序列长度只有 {self.args.block_size}"

        # 通过 self.transformer
        # 首先将输入 idx 通过 Embedding 层，得到维度为 (batch size, sequence length, n_embd)
        print("idx",idx.size())
        # 通过 Embedding 层
        tok_emb = self.transformer.wte(idx)
        print("tok_emb",tok_emb.size())
        # 然后通过位置编码
        pos_emb = self.transformer.wpe(tok_emb) 
        # 再进行 Dropout
        x = self.transformer.drop(pos_emb)
        # 然后通过 Encoder
        print("x after wpe:",x.size())
        enc_out = self.transformer.encoder(x)
        print("enc_out:",enc_out.size())
        # 再通过 Decoder
        x = self.transformer.decoder(x, enc_out)
        print("x after decoder:",x.size())

        if targets is not None:
            # 训练阶段，如果我们给了 targets，就计算 loss
            # 先通过最后的 Linear 层，得到维度为 (batch size, sequence length, vocab size)
            logits = self.lm_head(x)
            # 再跟 targets 计算交叉熵
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)
        else:
            # 推理阶段，我们只需要 logits，loss 为 None
            # 取 -1 是只取序列中的最后一个作为输出
            logits = self.lm_head(x[:, [-1], :]) # note: using list [-1] to preserve the time dim
            loss = None

        return logits, loss

让我们使用随机生成的 token 序列进行下一词预测演示

In [41]:
import time

torch.manual_seed(42)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 复用上文 args；如需更小词表可自行调整
model = Transformer(args).to(device)
model.train()
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)

batch_size = 4
seq_len = args.block_size
num_steps = 3  # 演示用，步数少以减少输出

start = time.time()
for step in range(1, num_steps + 1):
    # 构造随机序列（[0, vocab_size) 的 token id）
    idx = torch.randint(0, args.vocab_size, (batch_size, seq_len), dtype=torch.long, device=device)
    # 目标为下一词预测：targets[t] = idx[t+1]，最后一个位置忽略
    targets = idx.clone()
    targets[:, :-1] = idx[:, 1:]
    targets[:, -1] = -1  # ignore_index

    optimizer.zero_grad(set_to_none=True)
    logits, loss = model(idx, targets)
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    optimizer.step()

    print(f'step {step:02d} | loss {loss.item():.4f}')

print('done, elapsed: %.2fs' % (time.time() - start))

number of parameters: 0.36M
idx torch.Size([4, 16])
tok_emb torch.Size([4, 16, 64])
x after wpe: torch.Size([4, 16, 64])
enc_out: torch.Size([4, 16, 64])
x after decoder: torch.Size([4, 16, 64])
step 01 | loss 6.9663
idx torch.Size([4, 16])
tok_emb torch.Size([4, 16, 64])
x after wpe: torch.Size([4, 16, 64])
enc_out: torch.Size([4, 16, 64])
x after decoder: torch.Size([4, 16, 64])
step 02 | loss 6.9663
idx torch.Size([4, 16])
tok_emb torch.Size([4, 16, 64])
x after wpe: torch.Size([4, 16, 64])
enc_out: torch.Size([4, 16, 64])
x after decoder: torch.Size([4, 16, 64])
step 03 | loss 6.9190
done, elapsed: 0.12s


完整的Transformer模型代码详见 ./Model/transformer.py