# 实现一个典型的 Transformer

在实现 Transformer 之前，需要简单的 pytorch 和深度学习相关的知识，推荐观看：

(1) [动手学深度学习（第二版）](https://zh.d2l.ai/) \
(2) [GPT是什么？直观解释Transformer](https://www.bilibili.com/video/BV13z421U7cs) \
(3) [Attention is all you need](https://arxiv.org/abs/1706.03762)


本文的核心讨论聚焦于算法代码实现层面，针对 Transformer 各部分模块的原理将不展开系统性论述，但会在相应章节为深度技术解析提供参考资料。

## 详细原理图

![Transformer Struct](img/Transformer-Struct.svg)

如图所示，实现一个典型的 Transformer 模型通常需要实现下面三个神经网络模块：
- **RMSNorm :** 方根误差归一化
- **Feed Forward :** 前馈神经网络
- **Muti-Head Attention :** 多头注意力

其中多头注意力机制依赖 **Scaled Dot-product Attention** (缩放点积注意力) 的实现。

此外，我们还要实现下面的函数：
- **Potitional Encoding Function :** 位置编码函数，用来标识输入数据的顺序。
- **Mask Matrix Generate Function :** 掩码矩阵生成函数，用来生成掩码矩阵。

---

## 准备工作

---

引入必要的库与定义必要的常量

In [None]:
import torch
import torch.nn.functional as F
from torch import nn
import math

## 三个神经网络模块的实现

---

### 1. RMSNorm

Root Mean Squared Error Normalized, 即均方根误差归一化。

#### (1) 计算均方差根（RMS）
$$
RMS(x) = \sqrt{\frac{1}{d}\sum^{d}_{i = 0}{x_i^{2}}+\epsilon}
$$

#### (2) 归一化输出向量
$$
\hat{x} = \frac{x}{RMS(x)}
$$

#### (3) 应用缩放参数
$$
RMSNorm(x) = \gamma \odot \hat{x}
$$

### 解释
- $x$ 是输入向量，维度为 $d$。
- $\epsilon$ 是一个很小的数，用于防止除零。
- $\gamma$ 是可学习的缩放参数，维度为 $d$。
- $\odot$ 表示元素级的乘法操作。


In [None]:
class RMSNorm(nn.Module):
    def __init__(self, dim:int, eps:float=1e-5):
        """
        Root Mean Squared Error Normalized

        :param dim: 维度
        :param eps: 公式中的 ε，很小的数，防止除零
        """
        super().__init__()
        self.eps = eps
        # weight 即缩放矩阵的权重
        self.weight = nn.Parameter(torch.ones(dim))

    def _rms(self, x):
        # rsqrt 是 sqrt 的倒数，即 rsqrt(x) = 1/sqrt(x)
        # 这里 mean() 中,dim = -1 ，指对最后一维求平均数，而 keepdim 指不改变该张量的维度
        return torch.rsqrt(x.pow(2).mean(dim=-1, keepdim=True) + self.eps)

    def _norm(self, x):
        return x * self._rms(x)

    def forward(self, x):
        return self.weight * self._norm(x.float()).type_as(x)

### 2. Feed Forward

前馈神经网络，就是一个两层的全连接神经网络。

In [None]:
class FeedForward(nn.Module):
    def __init__(self, dim:int, dim_hidden:int):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(dim, dim_hidden, bias=False),
            nn.SiLU(),
            nn.Linear(dim_hidden, dim, bias=False),
        )

    def forward(self, x:torch.Tensor):
        return self.net(x)

### 3. Attention

#### (1) 实现缩放点积注意力 (Scaled Dot-product Attention) 神经网络模块

一个经典的注意力机制的网络结构如下所示：

![Classic Attention Struct](img/Classic-Attention-Struct.svg)

图中的 Q 表示 **查询（Query）** ，K 表示 **键（Key）** ，V 表示 **值（Value）** 。 注意力输出公式如下：

$$
Attention(x) = softmax(\frac{QK^T}{\sqrt{d_k}})V
$$

其中：

$$
Q = XW_q
$$
$$
K = XW_k
$$
$$
V = XW_v
$$

在这里的 $d_{k}$ 我们取输出层的维度（图中的 $D_{out}$），如下图所示：

![QKV Compute](./img/QKV-Compute.png)

有关 $d_{k}$ 取值的相关解释，推荐参考：[知乎-transformer中的attention为什么scaled?](https://www.zhihu.com/question/339723385/answer/3513306407)

In [None]:
class Attention(nn.Module):
    def __init__(self, dim_emb:int, dim_out:int):
        """
        Scaled Dot-Product Attention

        :param dim_emb: 数据的嵌入维度
        :param dim_out: 数据的输出维度
        """
        super().__init__()

        self.dim_emb = dim_emb
        self.dim_out = dim_out

        # 创建 Q、K、V 权重矩阵
        self.Q_net = nn.Linear(dim_emb, dim_out, bias=False)
        self.K_net = nn.Linear(dim_emb, dim_out, bias=False)
        self.V_net = nn.Linear(dim_emb, dim_out, bias=False)

    def forward(self, x_q, x_k, x_v, mask_mat = None):

        # 传入的 x_q、x_k、x_v 的维度为 (batch,seq_len,dim_emb)
        # 先计算 q、k、v 三个值，它们的维度为 (batch,seq_len,dim_out)
        q = self.Q_net(x_q)
        k = self.K_net(x_k)
        v = self.V_net(x_v)

        # 值得注意的是 q 的维度为 (batch,seq_len,dim_out)，k 转置后的维度为 (batch,dim_out,seq_len)，
        # 所以两者的矩阵乘法积 s 的维度为 (batch,seq_len,seq_len)，所以这里是一个方阵，就可以为接下来的矩阵
        # 遮掩做准备了。
        s = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(self.dim_out)

        # 检查是否需要遮掩数据
        if mask_mat is not None:
            # 进行遮掩
            s = torch.masked_fill(s, mask_mat, float('-inf'))

        # 计算 softmax(s1)*v 并返回，返回的维度为 (batch,seq_len,dim_out)
        return torch.matmul(F.softmax(s ,dim= -1), v).type_as(x_q)

#### (2) 实现多头注意力（Multi-Head Attention）神经网络模块

下面是多头注意力模块结构的示意图：

![Multi-Head Attention](img/Multi-Head-Attention-Struct.svg)

如上图所示，图中有 $N$ 个注意力模块，这里的 $N$ 就是 **多头注意力的头数** ，单个注意力的输出为 $out_{i}$ 。我们需要其直接拼接成一个大的矩阵：

$$
M = concat(out_{1},out_{2} ... ,out_{n})
$$

然后用拼接好的矩阵 $M$ 乘上一个权重矩阵 $W$ ，既为输出：

$$
Output = M \times W
$$

其中单个注意力模块的 $W_{q}/W_{k}/W_{v}$ 维度为 $D_{emb} \times D_{head}$，这里的 $D_{emb}$ 为输入 $X$ 的嵌入维度， $D_{head}$ 为 **头维度** ，这个值可以自行定义。

In [None]:
class MultiHeadAttention(nn.Module):

    def __init__(self, dim_emb:int, dim_head:int, dim_out:int, head_num:int):
        """
        Multi-Head Attention

        :param dim_emb: 数据的嵌入维度
        :param dim_head: 每个 Attention 块对应的头维度
        :param dim_out: 数据的输出维度
        :param head_num: 注意力头数
        """
        super().__init__()
        self.dim_emb = dim_emb
        self.dim_head = dim_head
        self.dim_out = dim_out
        self.head_num = head_num

        # 构建 head_num 个注意力块
        self.attention_blocks = nn.ModuleList()
        for i in range(head_num):
            self.attention_blocks.append(Attention(dim_emb, dim_head))

        # 构建 W 权重矩阵
        self.W_net = nn.Linear(dim_head * head_num, dim_out, bias=False)

    def forward(self, x_q, x_k, x_v, mask_mat = None):

        # 合并所有注意力模块的输出，单个注意力块的输出为 (batch,seq_len,dim_head)，一共有 head_num 个
        # 注意力块，所以所有注意力块合并在一起后，其维度为 (batch,seq_len,dim_head * head_num)
        data = torch.concat(
            [attention(x_q, x_k, x_v, mask_mat) for attention in self.attention_blocks],
            dim = -1
        )

        # 将合并的输出乘上 W 矩阵，其维度为 (batch,seq_len,dim_out)
        return self.W_net(data)

## 两个函数的实现

---

### 1. Positional Encoding Function

正弦-余弦位置编码函数，对于序列中的每个位置 $pos$、隐藏层维度 $d$ 、隐藏层中的每个维度索引 $i$ 、位置编码向量的第 $i$ 个元素
 $PE_{(pos,2i)}$ 和 $PE_{(pos,2i + 1)}$ 分别通过正弦和余弦函数计算：

$$
PE_{(pos,2i)} = sin(\frac{pos}{10000^{\frac{2i}{d}}})
$$

$$
PE_{(pos,2i + 1)} = cos(\frac{pos}{10000^{\frac{2i}{d}}})
$$

实际上就是偶数采用 $sin$ 函数计算，奇数采用 $cos$ 函数计算。推荐阅读：[知乎-位置编码](https://zhuanlan.zhihu.com/p/580739030)

In [None]:
def positional_encoding(batch:int , seq_len:int, dim:int, dtype:torch.dtype = torch.float32) :
    """
    位置编码

    :param batch: 批次数量
    :param seq_len: 序列长度
    :param dim: 序列中单个数据的隐藏维度
    :param dtype: 数据的类型
    :return: 正弦-余弦位置编码
    """

    batch_pe_mat = torch.zeros([batch, seq_len, dim], requires_grad=False, dtype=dtype)
    for batch_idx in range(batch):
        for pos in range(seq_len) :
            for i in range(dim):
                if i % 2 == 0:
                    batch_pe_mat[batch_idx][pos][i] = torch.sin(torch.tensor(pos / (10000**(i/dim)), dtype= dtype, requires_grad=False))
                else:
                    batch_pe_mat[batch_idx][pos][i] = torch.cos(torch.tensor(pos / (10000**((i - 1)/dim)), dtype= dtype, requires_grad=False))

    return batch_pe_mat

### 2. Mask Matrix Generate Function

掩码多头注意力中的掩码实现原理如下图所示：

![Mask Mat Demo](img/Mask-Mat-Demo.svg)

图中灰色的部分会被遮掩，在被遮掩后其值会被填充为 `-inf` （负无穷）。对于一个长度为 $n$ 需要遮掩的长度为 $n_{mask}$ 的序列，其对应掩码矩阵 $A_{n \times n}$ 的任意元素 $a_{i,j}$ 有：

$$
a_{i,j} = \begin{cases}
1,& n - j + i \leqslant n_{mask}\\
0,& n - j + i > n_{mask}\\
\end{cases}
$$

这里的 `1` 表达为 `true` 指需要被遮掩的元素，同理 `0` 表达为 `false` 指不需要被遮掩的元素 。

In [None]:
def generate_mask_mat(seq_len, masked_len):
    """
    生成掩码矩阵

    :param seq_len: 序列长度
    :param masked_len: 掩码长度
    :return: 掩码矩阵
    """

    mask_mat = torch.zeros([seq_len, seq_len], dtype = torch.bool, requires_grad=False)

    # 填充掩码矩阵
    for i in range(seq_len):
        for j in range(seq_len):
            mask_mat[i][j] = torch.tensor(seq_len - j + i <= masked_len, dtype = torch.bool, requires_grad=False)

    return mask_mat

## Transformer 最终实现

---

在 Transformer 结构中，可以将“详细原理图”中的左侧虚线框部分视作一个“编码器”（Encoder），将右侧虚线框内的部分视作一个“解码器”(Decoder)，其中 Encoder 数量与 Decoder 数量不必相等，那么整个 Transformer 的结构可以简化为下图：

![Encoder Decoder Struct](img/Encoder-Decoder-Struce.svg)

在 Transformer 中，输入张量通过 Encoder/Decoder 后要保持其维度不变，即输入维度为 `(batch,seq_len,dim_emb)` 从 Encoder/Decoder 输出后的维度也要为 `(batch,seq_len,dim_emb)` ，所以接下来实现的 Encoder 和 Decoder 都会令其中使用到的多头注意力机制的 `dim_out` 等于其 `dim_emb` 。

### 1. Encoder

In [None]:
class Encoder(nn.Module):
    def __init__(self, dim_emb:int, dim_head:int, head_num:int):
        """
        Transformer Encoder

        :param dim_emb: 数据的嵌入维度
        :param dim_head: 单个注意力模块的头维度
        :param head_num: 多头注意力的头数
        """
        super().__init__()

        self.dim_emb = dim_emb
        self.dim_head = dim_head
        self.head_num = head_num

        # 构建编码器需要的子模块
        self.multi_head_attention = MultiHeadAttention(dim_emb, dim_head, dim_emb, head_num)
        self.rms_norm1 = RMSNorm(dim_emb)
        self.feed_forward = FeedForward(dim_emb, 4 * dim_emb)
        self.rms_norm2 = RMSNorm(dim_emb)

    def forward(self, x):

        d0 = self.multi_head_attention(x, x, x)
        d1 = self.rms_norm1(x + d0)
        d2 = self.feed_forward(d1)
        d3 = self.rms_norm2(d1 + d2)

        return d3

### 2. Decoder

In [None]:
class Decoder(nn.Module):
    def __init__(self, dim_emb:int, dim_head:int, head_num:int):
        """
        Transformer Decoder

        :param dim_emb: 数据的嵌入维度
        :param dim_head: 单个注意力模块的头维度
        :param head_num: 多头注意力的头数
        """
        super().__init__()

        self.dim_emb = dim_emb
        self.dim_head = dim_head
        self.head_num = head_num

        # 构建解码器需要的子模块
        self.masked_multi_head_attention = MultiHeadAttention(dim_emb, dim_head, dim_emb, head_num)
        self.rms_norm1 = RMSNorm(dim_emb)
        self.multi_head_attention = MultiHeadAttention(dim_emb, dim_head, dim_emb, head_num)
        self.rms_norm2 = RMSNorm(dim_emb)
        self.feed_forward = FeedForward(dim_emb, 4 * dim_emb)
        self.rms_norm3 = RMSNorm(dim_emb)

    def forward(self, x, mask_mat = None, enc_k = None, enc_v = None):

        d0 = self.masked_multi_head_attention(x, x, x, mask_mat)
        d1 = self.rms_norm1(x + d0)

        _enc_k = d1 if enc_k is None else enc_k
        _enc_v = d1 if enc_v is None else enc_v

        d2 = self.multi_head_attention(d1, _enc_k, _enc_v)
        d3 = self.rms_norm2(d1 + d2)
        d4 = self.feed_forward(d3)
        d5 = self.rms_norm3(d3 + d4)

        return d5

### 3. Transformer

In [None]:
class Transformer(nn.Module):
    def __init__(
            self,
            encoder_num:int,
            decoder_num:int,
            vocab_size:int,
            dim_emb:int,
            dim_head:int,
            head_num:int,
    ):
        """
        Transformer

        :param encoder_num: 编码器数量
        :param decoder_num: 解码器数量
        :param vocab_size: 词表大小
        :param dim_emb: 嵌入维度
        :param dim_head: 单个注意力模块的头维度
        :param head_num: 头数
        """
        super().__init__()

        self.encoder_num = encoder_num
        self.decoder_num = decoder_num
        self.vocab_size = vocab_size
        self.dim_emb = dim_emb
        self.dim_head = dim_head
        self.head_num = head_num

        # 词向量嵌入模块
        self.input_embedding = nn.Embedding(vocab_size, dim_emb)
        self.output_embedding = nn.Embedding(vocab_size, dim_emb)

        # 创建解码器和编码器
        self.encoder_blocks = nn.ModuleList()
        for i in range(encoder_num):
            self.encoder_blocks.append(Encoder(dim_emb, dim_head, head_num))

        self.decoder_blocks = nn.ModuleList()
        for i in range(decoder_num):
            self.decoder_blocks.append(Decoder(dim_emb, dim_head, head_num))

        # 创建线性层，线性层的作用是将嵌入维度转换为词表维度
        self.linear = nn.Linear(dim_emb, vocab_size)

    def forward(self, input_seq, output_seq, mask_mat = None):

        # 输入词向量嵌入
        embedded_input_seq = self.input_embedding(input_seq)

        # 计算输入序列位置编码
        batch,input_seq_len = input_seq.size()
        input_positional_code = positional_encoding(batch, input_seq_len, self.dim_emb)

        # 对 input_seq 进行位置标记
        pe_input_seq = embedded_input_seq + input_positional_code

        # 计算编码器的计算结果，其最终输出维度为 (batch, input_seq_len, dim_emb)
        for encoder in self.encoder_blocks:
            pe_input_seq = encoder(pe_input_seq)

        # 输出词向量嵌入
        embedded_output_seq = self.output_embedding(output_seq)

        # 计算输出序列位置编码
        _,output_seq_len = output_seq.size()
        output_positional_code = positional_encoding(batch, output_seq_len, self.dim_emb)

        # 对 output_seq 进行位置标记
        pe_output_seq = embedded_output_seq + output_positional_code

        # 计算解码器的计算结果，其最终输出维度为 (batch, output_seq_len, dim_emb)
        for decoder in self.decoder_blocks:
            pe_output_seq = decoder(pe_output_seq, mask_mat, pe_input_seq, pe_input_seq)

        # 最终输出维度为 (batch, output_seq_len, vocab_size)
        return F.softmax(self.linear(pe_output_seq), dim=-1)

## 模型生成分析

---

### 1. 模型生成结果

transformer 的生成结果是一个 `(batch, output_seq_len, vocab_size)` 大小的张量，我们抛去 `batch` 这一维度仅看 `(output_seq_len, vocab_size)` ，其中 `output_seq_len` 是**原始输入**的序列长度，`vocab_size` 是词库长度， 那么每一行对应的列就是改行对应的下一个单词的在词库中的分布概率。下面是生成过程示意图：

![Transformer Prediction](img/Transformer-Prediction.svg)

此图中，Transformer 的作用是要将一句英文内容翻译为中文内容，现在我们只需关注 Decoder 部分即可：原始输入 $X$ 在掩码矩阵`Mask Matrix` 的作用下将对应的元素进行遮蔽，比如图中 `Veiw Field` 的第一行遮蔽了“河之水天” 只留下了 “黄” 字的视野，而 transformer 最终生成的结果概率矩阵 `Probability Matrix` 对应的第一行就是预测 “黄” 字下一个 token 在词库中的概率。

注意，上图只是一个简易的示意图，与其背后的真实原理有一定的差异，推荐参考：[BiliBili-利用上下文之三角掩码矩阵](https://www.bilibili.com/video/BV1cEPCesEKt)

### 2. 模型预测

在预测下一个 token 时，我们通常关注的是解码器生成的最后一个 token 的概率分布。因此，我们需要从 `(batch, output_seq_len, vocab_size)` 的概率分布中取出最后一个位置的概率分布，然后使用 `torch.argmax` 选择概率最高的 token（取 token 有很多采样策略，这里为了方便测试采用的是**贪婪策略**）。具体步骤如下：

- 若 transformer 的最终输出为 $P(y_t)$ ，取出最后一个位置的概率分布：
$$
P(y_t)_{last} = P(y_t)[:,−1,:]
$$

  其中，$P(y_t)_{last}$ 的维度为 `(batch, vocab_size)`。

- 使用 `torch.argmax` 选择概率最高的 token：
<center>
token_next = torch.argmax( $P(y_t)_{last}$ ,dim $=−1$ )
</center><p/>

  其中，`token_next` 的维度为 `(batch, 1)`。这样，我们就可以得到每个批次中下一个 token 的预测值。


In [None]:
import random

# 配置
# =====================================
encoder_num = 2  # 编码器数量
decoder_num = 3  # 解码器数量
vocab_size = 256 # 词表大小
dim_emb = 128    # 词向量维度
dim_head = 32    # 注意力头维度
head_num = 4     # 注意力头数

batch = 3             # 测试数据批数
input_seq_len = 12    # 测试输入数据长度
output_seq_len = 7    # 测试输出数据长度
output_masked_len = output_seq_len - 1 # 测试输出掩码长度
# =====================================

# 创建 transformer 网络
transformer = Transformer(encoder_num, decoder_num, vocab_size, dim_emb, dim_head, head_num)

def rand_seq_code(batch, seq_len):
    """
    随机序列编码

    :param batch: 批数
    :param seq_len: 序列长度
    :return: 随机序列编码
    """

    data = []
    for i in range(batch):
        bat_data = [random.randint(0, vocab_size) for _ in range(seq_len)]
        data.append(bat_data)
    return torch.tensor(data)

# 构建随机输入输出序列
input_seq = rand_seq_code(batch, input_seq_len)
output_seq = rand_seq_code(batch, output_seq_len)

# 构建掩码矩阵
mask_mat = torch.stack([generate_mask_mat(output_seq_len, output_masked_len) for i in range(batch)])

# transformer 输出
res = transformer(input_seq, output_seq)

# 打印输出结果
print(res)
print(res.size())

# 预测下一个值
next_token = torch.argmax(res[:,-1,:], dim=-1)
print(next_token)