# Transformer 组件 

## FFN（前馈神经网络）


![FFN](./img/pwFFN.png)

Position-wise 实际是线性层本身的一个特性，在线性层中，每个输入向量（对应于序列中的一个位置，比如一个词向量）都会通过相同的权重矩阵进行线性变换，这意味着每个位置的处理是相互独立的，逐元素这一点可以看成 kernal_size=1 的卷积核扫过一遍序列。

FFN实现很简单，本质上是proj_up换加上激活函数，再加上一个proj_down


In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

In [2]:
class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        """
        FFN
        args:
            d_model: 输入和输出向量的维度
            d_ff： FFN隐藏层的维度
            dropout：随机屏蔽部分输出，防止过拟合（也是一种正则化手段）
        """
        super(PositionwiseFeedForward,self).__init__()
        self.proj_up = nn.Linear(d_model, d_ff)
        self.proj_down = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.proj_up(x).relu()
        x = self.dropout(x)
        x = self.proj_down(x)
        return x

In [3]:
batch_size = 64
seq_len = 256
d_model = 512
d_ff = 2048
x = torch.randn(batch_size, seq_len, d_model)

ffn = PositionwiseFeedForward(d_model, d_ff)
print("x shape:", x.shape, "\nffn(x) shape:", ffn(x).shape)

x shape: torch.Size([64, 256, 512]) 
ffn(x) shape: torch.Size([64, 256, 512])


## 残差连接

残差连接是一种跳跃连接，将输入直接加入到输出上（实际上，有了残差连接后参数的更新只需要去做f(x)-x的部分即可？）：
$$\text{Output} = \text{Sublayers}(x) + x$$
主要作用是**缓解梯度消失/爆炸**

In [None]:
class ResidualConnection(nn.Module):
    def __init__(self, dropout=0.1):
        """
        residual，用于在每个子层后添加残差连接和 Dropout。
        
        args:
            dropout: 防止过拟合。
        """
        super(ResidualConnection, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, sublayer):
        """
        args:
            x: 残差连接的输入张量，形状为 (batch_size, seq_len, d_model)。
            sublayer: 子层模块的函数，多头注意力或前馈网络。

        return:
            经过残差连接和 Dropout 处理后的张量，形状为 (batch_size, seq_len, d_model)。
        """
        # 将子层输出应用 dropout，然后与输入相加（参见论文 5.4 的表述或者本文「呈现」部分）
        return x + self.dropout(sublayer(x))

In [5]:
batch_size = 64
seq_len = 256
d_model = 512
d_ff = 2048
x = torch.randn(batch_size, seq_len, d_model)
ffn = PositionwiseFeedForward(d_model, d_ff)
residual = ResidualConnection()
print("x shape:", x.shape, "\nffn(x) and residual shape:", residual(x, ffn).shape)

x shape: torch.Size([64, 256, 512]) 
ffn(x) and residual shape: torch.Size([64, 256, 512])


## Ebedding Layer（嵌入层）
因为 token ID 只是整数标识符，彼此之间没有内在联系。如果直接使用这些整数，模型可能在训练过程中学习到一些模式，但无法充分捕捉词汇之间的语义关系，这显然不足以支撑起现在的大模型。
- 作用：
    - 捕捉语义信息（映射到高维的连续向量空间）
    - embeding 类似于表格查找的操作
- 参数可训练

In [7]:
class Embeddings(nn.Module):
    """
    token ID tranform to embedding vector

    args: 
        vocab_size: 词表大小
        d_model：嵌入向量维度（隐藏层维度）
    """
    def __init__(self, vocab_size, d_model):
        super(Embeddings, self).__init__()
        self.embed = nn.Linear(vocab_size, d_model)
        self.scaled_factor = math.sqrt(d_model)

    def forward(self, x):
        x = self.embed(x)
        x_embed = x * self.scaled_factor
        return x_embed

## softmax
将向量转化为**概率分布**
$$\text{Softmax(x)}_i =  \frac{\exp{(x_i)}}{\sum_j {\exp{(x_j)}}}$$
指数形式放大变换前后的差异性，并且保证非负性

In [17]:
def Softmax(x):
    exp_x = torch.exp(x)
    sum_exp_x = torch.sum(exp_x, dim=-1, keepdim=True)
    return exp_x / sum_exp_x

keepdim 保持维度，便于广播

In [18]:
x = torch.rand(2, 5)
softmax_x = Softmax(x)
print("x:", x, "\nafter softmax:", softmax_x)

x: tensor([[0.1442, 0.6782, 0.8527, 0.3439, 0.2320],
        [0.3960, 0.2310, 0.7250, 0.1348, 0.8522]]) 
after softmax: tensor([[0.1419, 0.2420, 0.2881, 0.1732, 0.1549],
        [0.1790, 0.1518, 0.2488, 0.1379, 0.2825]])


In [23]:
y = torch.arange(6, dtype=torch.float32).reshape(2, 3)
sum_keep = torch.sum(y, dim=-1, keepdim=True)
print(sum_keep)        # tensor([[ 6.], [15.]])
print(sum_keep.shape)  # torch.Size([2, 1])

sum_no_keep = torch.sum(y, dim=-1)
print(sum_no_keep)        # tensor([ 6., 15.])
print(sum_no_keep.shape)  # torch.Size([2])

tensor([[ 3.],
        [12.]])
torch.Size([2, 1])
tensor([ 3., 12.])
torch.Size([2])


## CrossEntropy（交叉熵损失）
公式：
$$\mathcal{L} = - \sum_{i} y_i \log(\hat{y}_i) $$
其中$y_i$是真实标签的one-hot编码，$\hat{y}_i$是预测值

- nn.CrossEntropyLoss()直接接受logits(没有经过softmax)

```
CrossEntropyLoss(logits, target) = NLLLoss(log_softmax(logits), target)
```

## PositionEbedding
![](./img/PE.png)

在原始论文中，Transformer 使用的是固定位置编码（Positional Encoding），其公式如下：

\begin{array}{c}
P E_{(p o s, 2 i)}=\sin \left(\frac{p o s}{10000^{2 i / d_{\text {model }}}}\right) \\
P E_{(p o s, 2 i+1)}=\cos \left(\frac{p o s}{10000^{2 i / d_{\text {model }}}}\right)
\end{array}


其中：
- pos 表示位置索引（Position）。
-  i  表示维度索引。
-  $d_{\text {model }}$  是嵌入向量的维度。

![drpopout](./img/AboutDropout.png)

In [None]:
class PositionEmbeding(nn.Module):
    def __init__(self,d_model, dropout =0.1, max_len=5000):
        """
        PE,添加序列的唯一位置信息

        args:
            d_model: 嵌入维度
            dropout: 应用PE后的 Dropout的概率
            max_len: 位置编码的最大长，适应不同的输入序列
        """
        super(PositionEmbeding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)  # 论文的5.4 Residual Dropout
        
        # 创建位置编码矩阵 shape is (max_len, d_model)
        # 位置索引为 (max_len, 1)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1) # unsqueeze(1) to broadcast d_model

        # 频率计算
        # pos / (10000^(2i/d_model)) 
        # = pos * exp(log(10000^(-2i/d_model)))
        # = pos * exp(-2i/d_model * log(10000))
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model )
        )

        # 计算 sin 和 cos
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)


        # 扩充维度广播 input (batch_size, seq_len, d_model)
        pe = pe.unsqueeze(0)

        # 将位置编码注册为模型的缓冲区，不作为参数更新
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :]  # 去除相同序列长度的位置编码进行合并
        x = self.dropout(x)
        return x

示例

In [37]:
PE = PositionEmbeding(d_model=4)
x = torch.rand(4,3,4)
x_pe = PE(x)
print(x, "\n", x_pe)

tensor([[[0.6715, 0.7376, 0.1806, 0.1719],
         [0.7500, 0.3433, 0.6341, 0.1776],
         [0.6561, 0.3824, 0.3398, 0.5301]],

        [[0.8644, 0.7777, 0.3688, 0.9314],
         [0.0840, 0.0080, 0.5564, 0.0306],
         [0.7182, 0.2303, 0.6448, 0.1608]],

        [[0.7516, 0.7898, 0.6295, 0.4097],
         [0.5418, 0.5777, 0.6803, 0.8286],
         [0.8426, 0.0279, 0.4214, 0.3846]],

        [[0.4352, 0.7577, 0.0216, 0.4315],
         [0.0411, 0.4526, 0.4413, 0.4428],
         [0.5427, 0.2797, 0.2008, 0.6624]]]) 
 tensor([[[ 0.7461,  1.9307,  0.2006,  1.3021],
         [ 1.7683,  0.9817,  0.7156,  1.3084],
         [ 0.0000, -0.0000,  0.3997,  1.6999]],

        [[ 0.9604,  1.9752,  0.4098,  2.1460],
         [ 1.0283,  0.6093,  0.6293,  1.1450],
         [ 1.8083, -0.2065,  0.7387,  1.2896]],

        [[ 0.8351,  0.0000,  0.6995,  1.5663],
         [ 1.5369,  0.0000,  0.7670,  2.0318],
         [ 1.9465, -0.4314,  0.4904,  0.0000]],

        [[ 0.4836,  1.9531,  0.0240,  1.5905]

- encoder part
    - 编码器的输入由输入嵌入（Input Embedding）和位置编码（Positional Encoding）组成，在机器翻译任务中，还可以称为源语言嵌入（Source Embedding）。

In [39]:
class SourceEmbedding(nn.Module):
    def __init__(self, d_model, src_vocab_size, dropout=0.1):
        """
        inputs 部分的 embeddings

        args:
            scr_vocab_size: 源语言词汇表大小
            d_model: 嵌入向量维度
            dropout概率 
        """
        super(SourceEmbedding, self).__init__()
        self.embed = Embeddings(src_vocab_size, d_model)
        self.positional_encoding = PositionEmbeding(d_model, dropout)

    def forward(self, x):
        x = self.positional_encoding(self.embed(x))  # (batch_size, seq_len_src, d_model)
        return x


- decoder part
    - 解码器的输入由输出嵌入（Output Embedding）和位置编码（Positional Encoding）组成，在机器翻译这个任务中也可以称为目标语言嵌入（Target Embedding），为了避免与最终输出混淆，使用 TargetEmbedding 进行实现。

In [40]:
class TargetEmbedding(nn.Module):
    def __init__(self, d_model, tgt_vocab_size, dropout=0.1):
        """
        inputs 部分的 embeddings

        args:
            tgt_vocab_size: 源语言词汇表大小
            d_model: 嵌入向量维度
            dropout概率 
        """
        super(SourceEmbedding, self).__init__()
        self.embed = Embeddings(tgt_vocab_size, d_model)
        self.positional_encoding = PositionEmbeding(d_model, dropout)

    def forward(self, x):
        x = self.positional_encoding(self.embed(x))  # (batch_size, seq_len_tgt, d_model)
        return x

### 掩码mask

1. Padding Mask
- 填充掩码用于在注意力计算时屏蔽填充 \<PAD\> 位置，防止模型计算注意力权重的时候考虑这些无意义的位置，
    - 注意：这里接受的参数为 pad_token_id，这意味着掩码操作在嵌入操作前，也就是分词（tokenize）然后映射为 Token IDs 后进行。

In [41]:
def create_padding_mask(seq, pad_token_id=0):
    # seq shape is [batch_size, seq_len] --> [batch_size, 1, 1, seq_len]
    mask = (seq != pad_token_id).unsqueeze(1).unsqueeze(2)
    return mask

In [44]:
# 示例 0 表示 <PAD>
seq = torch.tensor([[5, 7, 9, 0, 0], [8, 6, 0, 0, 0]])  
seq_masked = create_padding_mask(seq)
seq_masked, seq_masked.shape

(tensor([[[[ True,  True,  True, False, False]]],
 
 
         [[[ True,  True, False, False, False]]]]),
 torch.Size([2, 1, 1, 5]))

2. Look-ahead Mask

防止模型训练时候偷看答案

In [45]:
def create_look_ahead_mask(size):
    mask = torch.tril(torch.ones(size, size)).type(torch.bool)  # 下三角矩阵
    return mask  # (seq_len, seq_len)

In [51]:
print(create_look_ahead_mask(6), create_look_ahead_mask(6).shape)

tensor([[ True, False, False, False, False, False],
        [ True,  True, False, False, False, False],
        [ True,  True,  True, False, False, False],
        [ True,  True,  True,  True, False, False],
        [ True,  True,  True,  True,  True, False],
        [ True,  True,  True,  True,  True,  True]]) torch.Size([6, 6])


In [None]:
def create_decoder_mask(tgt_seq, pad_token_id=0):
    # (batch_size, 1, 1, seq_len_tgt)
    padding_mask = create_padding_mask(tgt_seq, pad_token_id)

     # (seq_len_tgt, seq_len_tgt)  
    look_ahead_mask = create_look_ahead_mask(tgt_seq.size(1)).to(tgt_seq.device) 

    # broadcast to (batch_size, 1, seq_len_tgt, seq_len_tgt)
    combined_mask = look_ahead_mask.unsqueeze(0) & padding_mask  
    return combined_mask

In [54]:
tgt_seq = torch.tensor([[1, 2, 3, 4, 0, 0, 0]])  # 0 表示 <PAD>
print(create_decoder_mask(tgt_seq))

tensor([[[[ True, False, False, False, False, False, False],
          [ True,  True, False, False, False, False, False],
          [ True,  True,  True, False, False, False, False],
          [ True,  True,  True,  True, False, False, False],
          [ True,  True,  True,  True, False, False, False],
          [ True,  True,  True,  True, False, False, False],
          [ True,  True,  True,  True, False, False, False]]]])


### 总结

假设目标序列 `tgt_seq = [A, B, C, D, <PAD>]`。

- **填充掩码**：会屏蔽 `<PAD>` 位置。
- **未来信息掩码**：在位置 `C`，模型只能看到 `A`、`B` 和 `C`，但在最后一个位置上，会看到全部（包括 `<PAD>`）。
- **组合掩码**：在位置 `C`，模型只能看到 `A`、`B` 和 `C`，在最后一个位置上，模型也只能看到 `A`、`B`、`C` 和 `D`。