In [1]:
import copy                  
import math                  

import torch                  
from torch import nn                  
from torch.functional import F


In [2]:
! pip list

Package                           Version
--------------------------------- -----------
accessible-pygments               0.0.5
alabaster                         1.0.0
alphabase                         1.7.2
alpharaw                          0.4.8
alphatims                         1.0.9
annotated-types                   0.7.0
anyio                             4.11.0
argon2-cffi                       25.1.0
argon2-cffi-bindings              25.1.0
arrow                             1.4.0
asttokens                         3.0.0
async-lru                         2.0.5
attrs                             25.4.0
autodocsumm                       0.2.14
babel                             2.17.0
beautifulsoup4                    4.14.2
biopython                         1.85
bleach                            6.2.0
boto3                             1.40.59
botocore                          1.40.59
bracex                            2.6
bravado                           11.1.0
bravado-core           

## 词嵌入Embedding
词嵌入是将离散的文本符号（如单词、子词）映射为连续低维向量的过程。在PyTorch中通常通过nn.Embedding实现，它将每个词的索引转换为固定维度的稠密向量，使模型能够捕捉词语间的语义关联，为后续的注意力计算和特征提取奠定数值基础。

广义的embedding是指将自然语言字符串转换为向量的过程，各个大模型LLM均有自己的分词器和词汇表，在进入Transformer的embedding前已经将自然语言处理成了二维矩阵张量。


![Embedding](figures/a65c3e7a01e4ac3606f7c2b7a70ecfee.png)

图示张量为[[39,1592,10,2548,5]]

第一维（通常为batch_size）： 代表一个训练批次（batch）中包含的样本数量。为了高效训练，模型会并行处理多个句子序列。例如，一个形状为 [32, 10] 的张量，表示我们一次性处理32个独立的句子。

第二维： 代表单个样本（即一个句子）的长度，也就是该句子中包含的单词或子词（Token）的数量。继续上面的例子（[32, 10]），10 表示这32个句子中的每一个都被统一处理或填充为10个Token长。


In [3]:
class Embedding(nn.Module):                  
    def __init__(self, d_model: int, vocab_size: int):                  
        """d_model:词嵌入的维度， vocab:词表的大小"""
        super(Embedding, self).__init__()                  
        self.d_model = d_model                  
        self.vocab_size = vocab_size                  
        self.embedding = nn.Embedding(vocab_size, d_model)                  

    def forward(self, x):                  
        # 乘上权重来自于论文 3.4                  
        return self.embedding(x) * math.sqrt(self.d_model)

测试 class Embedding()

In [7]:
# 1. 设置参数
d_model = 512  # 词嵌入维度（示例，可自定义）
vocab_size = 1000  # 词表大小（需大于输入中的最大索引值）

# 2. 创建形状为 [2, 4] 的输入（2个样本，每个样本4个词索引）
# 索引值需在 [0, vocab_size-1] 范围内，这里随机生成
x = torch.randint(0, vocab_size, (2, 4))  # 形状: torch.Size([2, 4])
print("input shape:\n", x.shape)
print("input example:\n", x)

# 3. 实例化嵌入层
embedding_layer = Embedding(d_model, vocab_size)

# 4. 前向传播
output = embedding_layer(x)

# 5. 验证输出形状（预期：[2, 4, d_model]）
print("output shape:\n", output.shape)
print("output example:\n", output[0, 0, :])  # 打印前5维

input shape:
 torch.Size([2, 4])
input example:
 tensor([[861, 329,  20, 330],
        [921, 808, 470, 583]])
output shape:
 torch.Size([2, 4, 512])
output example:
 tensor([ 19.5487, -55.4443, -32.8522,  -6.5474,   5.4467, -33.1357,  10.2999,
         18.8888, -27.1909,  -6.5088,  10.0380,   5.0923,  11.7681,  28.7709,
         -8.9583,  22.8899,   9.5049,  -0.2021,  -6.9523, -17.2628, -21.2329,
         30.1896, -21.7649,   3.2211, -36.2002,  34.8609,  -1.9186,  12.6291,
         -8.8502, -30.2478,  -0.6691,  10.3841,  -3.0758, -34.2872, -48.0336,
        -52.2171, -14.6487, -10.1262, -48.9307, -18.0721, -29.8355,   2.2841,
         14.4870,   2.2769,  21.6282,   9.0954,  22.8564,  -6.2375, -51.8411,
        -13.2375,   3.0120,  15.3333, -30.0134, -29.5732,  27.9618, -19.4803,
         21.0745, -48.9125,   2.1010, -60.4926,  -0.8750,   8.5988, -57.8850,
         48.1430, -25.5690,  17.5148,  23.1672,  27.8680,  -6.6752,  16.9334,
         -5.7527, -25.4094,  34.9158,  48.8239,  34.47

## 位置编码器PositionalEncoding

由于Transformer无循环或卷积结构，无法直接捕捉序列的位置信息。位置编码器通过正弦和余弦函数生成与词嵌入维度相同的位置向量，将其与词嵌入向量相加，使模型能够区分不同位置的token，确保序列顺序信息在建模过程中不丢失。

In [None]:
class PositionalEncoding(nn.Module):                  
    def __init__(self, d_model: int, dropout: float, max_len: int = 5000):                  
        """d_model:词嵌入维度，dropout:置0比率，max_len:最大长度"""
        super(PositionalEncoding, self).__init__()                  
        self.dropout = nn.Dropout(p=dropout)                  

        pe = torch.zeros(max_len, d_model)                  
        position = torch.arange(0, max_len).unsqueeze(1)                  
        # 实现公式 3.5, 10000^(-2i/d_model) = exp(2i × (-ln(10000)/d_model))                  
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))                  

        pe[:, 0::2] = torch.sin(position * div_term)                  
        pe[:, 1::2] = torch.cos(position * div_term)                  

        pe = pe.unsqueeze(0)# (1, max_len, d_model)                  
        self.register_buffer('pe', pe)                  

    def forward(self, x):                  
        # (batch, max_len, d_model)                  
        # 论文不参与反向传播        
        x = x + (self.pe[:, :x.shape[1], :]).requires_grad_(False)                  
        return self.dropout(x)

架构图中位置编码器与embedding中间有个"+"号，对应到正向传播代码就是：

x = x + (self.pe[:, :x.shape[1], :]).requires_grad_(False)

## dropout

在神经网络中，dropout 是防止过拟合的重要手段。当模型计算出矩阵后，会随机将一部分权重值置为 0（丢弃比例通常设为 0.1），再进行后续的操作。这种随机丢弃机制打破了神经元间的过度依赖，避免模型对训练数据中特定位置的关联过度拟合，迫使模型学习更通用的特征关联模式，从而提升在未见数据上的泛化能力。

# Encoder部分

## MultiHeadAttention 多头注意力

多头注意力是Transformer的核心组件，它将输入向量通过多个并行的注意力头分别计算注意力分布，再将结果拼接融合。这种设计能让模型同时捕捉不同尺度、不同维度的语义关联（如局部依赖和全局依赖），PyTorch中可通过自定义线性层和注意力计算逻辑实现多头并行处理。

![MultiHeadAttention](figures/b5908f57cb73b58f93e6a5a139849b49.png)

注意力机制

注意力机制的核心思想源于人类视觉和认知系统：在处理信息时，我们不会同等地对待所有输入，而是会选择性地聚焦于与当前任务最相关的部分，同时忽略其他不重要的信息。

在神经网络中，这一思想被抽象为一个可计算的数学过程。它的关键作用可以概括为 “权重分配” 。具体来说，对于一个查询（Query），模型会计算它与一系列键（Key）的相似度，然后根据这些相似度得分（即注意力权重）来对对应的值（Value）进行加权求和。这个权重就代表了模型在处理当前查询时，应该对每个值投入多少“注意力”。

这段代码出现了概念：掩码张量（attn.masked_fill）。

In [9]:
def attention(self, query, key, value, mask=None, dropout=None):                  
    d_k = query.shape[-1] # d_model:词嵌入维度

    # torch.matmul A的最后一个维度 必须等于B的倒数第二个维度                  
    attn = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)                  
    if mask is not None:                  
        attn = attn.masked_fill(mask == 0, -1e9)                  
    # 最后一维进行softmax操作                  
    attn = F.softmax(attn, dim=-1)                  
    if dropout is not None:                  
        attn = dropout(attn)                  
    return torch.matmul(attn, value), attn


Multi-head可以在更细致的层⾯上提取不同head的特征，总体计算量和单⼀head相同的情况下，提取特征的效果更佳。

![MultiHeadAttention](figures/multiheadattention.png)

最关键部分是理解清楚矩阵的结构变换：

多头处理前：初始状态是一个三维矩阵(batch_size, max_len, d_model)，多头分割后变成了4维矩阵(batch_size, max_len, head, d_k)，为了方便多头处理将head所在维度前移(batch_size, head, max_len, d_k)

多头处理后，又重新变回三维矩阵，与输入结构保持一致(batch_size, max_len, d_model)

In [10]:
class MultiHeadAttention(nn.Module):                  
    def __init__(self, head: int, d_model: int, dropout: float = 0.1):                  
        """head代表头数，d_model代表词嵌⼊的维度"""
        super(MultiHeadAttention, self).__init__()                  

        assert d_model % head == 0, "d_model 必须可以整除 head"                  
        self.d_k = d_model // head# 每个头获得的分割词向量维度d_k                  
        self.head = head                  

        self.w_q = nn.Linear(d_model, d_model, bias=False)                  
        self.w_k = nn.Linear(d_model, d_model, bias=False)                  
        self.w_v = nn.Linear(d_model, d_model, bias=False)                  
        self.w_o = nn.Linear(d_model, d_model, bias=False)                  

        self.dropout = nn.Dropout(dropout)                  

    # 注意力                  
    def attention(self, query, key, value, mask=None, dropout=None):                  
        d_k = query.shape[-1]                  

        # torch.matmul A的最后一个维度 必须等于 B的倒数第二个维度                  
        attn = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)                  
        if mask is not None:                  
            attn = attn.masked_fill(mask == 0, -1e9)                  
        # 最后一维进行softmax操作                  
        attn = F.softmax(attn, dim=-1)                  
        if dropout is not None:                  
            attn = dropout(attn)                  
        return torch.matmul(attn, value), attn                  

    def forward(self, q, k, v, mask=None):                  
        if mask is not None:                  
            mask = mask.unsqueeze(0)                  

        query = self.w_q(q)# Q                  
        key = self.w_k(k)# K                  
        value = self.w_v(v)# V                  

        batch_size = query.size(0)                  
        # 多头切割                  
        # (batch_size, max_len, d_model) -->(batch_size, max_len, head, d_k) -->(batch_size, head, max_len, d_k)                  
        query = query.view(batch_size, -1, self.head, self.d_k).transpose(1, 2)                  
        key = key.view(batch_size, -1, self.head, self.d_k).transpose(1, 2)                  
        value = value.view(batch_size, -1, self.head, self.d_k).transpose(1, 2)                  

        x, self.attn = self.attention(query, key, value, mask=mask, dropout=self.dropout)                  
        # (batch_size, head, max_len, d_k) -> (batch_size, max_len, head * d_k) -> (batch_size, max_len, d_model)                  
        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.head * self.d_k)                  
        return self.w_o(x)

## FeedForward 前馈全连接层

前馈全连接层对多头注意力输出的每个位置向量进行独立的非线性变换，通常采用“线性层+激活函数+线性层”的结构（如ReLU或GELU激活）。它能将注意力机制捕捉到的关联特征进一步映射到更复杂的表示空间，增强模型的非线性拟合能力。

开发实现中，一般采用带有dropout的实用公式：

FFN(x) = dropout(σ(xW₁ + b₁))W₂ + b₂

其中 σ 是激活函数，在原始论文中是 ReLU

In [11]:
class FeedForward(nn.Module):                  
    def __init__(self, d_model: int, d_ff: int, dropout=0.1):                  
        """d_ff：第二个线性层的输入维度"""
        super(FeedForward, self).__init__()                  
        self.linear_1 = nn.Linear(d_model, d_ff)  # 包含 W₁ 和 b₁
        self.linear_2 = nn.Linear(d_ff, d_model)  # 包含 W₂ 和 b₂
        self.dropout = nn.Dropout(dropout)                  

    def forward(self, x):                  
        # 论文中 3.3 FFN(x)                  
        # self.linear1(x) 实际计算: x @ W₁.T + b₁
        # self.linear2(...) 实际计算: ... @ W₂.T + b₂        
        return self.linear_2(self.dropout(F.relu(self.linear_1(x))))

## LayerNorm 规范化层

规范化层用于稳定模型训练过程，通过对输入张量按样本维度进行均值和方差归一化，抑制梯度消失或爆炸问题。

In [12]:
class LayerNorm(nn.Module):                  
    # 对每个样本不同位置的向量求均值和方差，然后进行归一化                  
    def __init__(self, features: int, eps: float = 1e-6):                  
        """ features, 表示词嵌⼊的维度，样本的特征数量
        eps是⼀个⾜够⼩的数, 在规范化公式的分⺟中出现,防⽌分⺟为0.默认是1e-6."""
        super(LayerNorm, self).__init__()                  
        self.eps = eps                  
        self.alpha = nn.Parameter(torch.ones(features))                  
        self.bias = nn.Parameter(torch.zeros(features))                  

    def forward(self, x):                  
        # 最后一个维度的均值                  
        mean = x.mean(dim=-1, keepdim=True)                  
        # 最后一个维度的标准差                  
        std = x.std(dim=-1, keepdim=True)                  
        return self.alpha * (x - mean) / (std + self.eps) + self.bias


## SublayerConnection 子层连接结构

子层连接结构将每个编码器子层（如多头注意力、前馈层）的输出与输入进行残差连接，再经过层归一化处理。这种“残差+归一化”的设计能有效缓解深层模型的训练困难，让梯度更顺畅地反向传播，是Transformer能堆叠多层的关键保障。

Add & Norm模块接在每⼀个Encoder Block和Decoder Block中的每⼀个⼦层的后⾯。具体来说Add表示残差连接, Norm表示LayerNorm。

Add残差连接的作⽤：和其他神经⽹络模型中的残差连接作⽤⼀致，都是为了将信息传递的更深，增强模型的拟合能⼒。试验表明残差连接的确增强了模型的表现。

Norm的作⽤：随着⽹络层数的额增加，通过多层的计算后参数可能会出现过⼤、过⼩、⽅差变⼤等现象，这会导致学习过程出现异常，模型的收敛⾮常慢。因此对每⼀层计算后的数值进⾏规范化可以提升模型的表现。

论文中数学表达形式为: LayerNorm(x + Sublayer(x)), 其中Sublayer(x)为⼦层的输出

这里实现和论文中略有不同，先进行 norm 再进行 self-attention 或 feed-forward。也就是说，sublayer可以传多头注意力MultiHeadAttention，也只可以传前馈全连接层FeedForward。


In [13]:
class SublayerConnection(nn.Module):                  
    def __init__(self, features: int, dropout: float = 0.1):                  
        super(SublayerConnection, self).__init__()                  
        self.norm = LayerNorm(features)                  
        self.dropout = nn.Dropout(dropout)                  

    def forward(self, x, sublayer):                  
        # 此处和论文中 transformer 给的图略有不同, 先进行 norm 再进行 self-attention 或 feed-forward                  
        return x + self.dropout(sublayer(self.norm(x)))

## EncoderLayer 编码器层

编码器层是编码器的基本构建单元，由“多头注意力子层+子层连接”和“前馈全连接子层+子层连接”组成。每个EncoderLayer完成一次完整的特征提取与变换，通过重复堆叠多层EncoderLayer，模型能逐步捕捉序列中复杂的语义和结构信息。

In [24]:
class EncoderLayer(nn.Module):                  
    def __init__(self, features: int, self_attn: MultiHeadAttention,                  
                 feed_forward: FeedForward, dropout: float = 0.1):                  
        super(EncoderLayer, self).__init__()                  

        self.features = features                  
        self.self_attn = self_attn                  
        self.feed_forward = feed_forward                  

        # 编码器层中有两个⼦层连接结构                  
        self.sublayer = nn.ModuleList(                  
            [SublayerConnection(features, dropout) for _ in range(2)]                  
        )                  

    def forward(self, x, mask):                  
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))                  
        return self.sublayer[1](x, self.feed_forward)


## Encoder 编码器

编码器由N层EncoderLayer堆叠而成。它接收经过词嵌入和位置编码处理后的源序列输入，经过多层特征提取后输出源序列的上下文表示向量，该向量包含了源序列的全局语义信息，将作为解码器的注意力输入，为目标序列生成提供依据。

![transformer_encoder](figures/transformer_encoder.jpg)

In [25]:
def cloneModules(module, N):                  
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

In [26]:
class Encoder(nn.Module):                  
    def __init__(self, layer: EncoderLayer, N: int):                  
        """初始化函数的两个参数分别代表编码器层和编码器层的个数"""
        super(Encoder, self).__init__()                  
        self.layers = cloneModules(layer, N)                  
        self.norm = LayerNorm(layer.features)                  

    def forward(self, x, mask):                  
        for layer in self.layers:                  
            x = layer(x, mask)                  
        return self.norm(x)

# Decoder 解码器

## DecoderLayer

解码器的核心组件，包含三个子层：掩码自注意力层、编码器-解码器注意力层和前馈网络层，每层后都应用残差连接和规范化。

架构图中下边的多头注意力来自输入端，Q=K=V；上面的多头注意力，Q来自下面的多头注意力，K、V来自左边编码器的输出，即Q!=K=V。

![transformerDecoder](figures/transformer_decoder.jpg)

In [27]:
class DecoderLayer(nn.Module):                  
    def __init__(self, features: int, self_attn: MultiHeadAttention, cross_attn: MultiHeadAttention,                  
                 feed_forward: FeedForward, dropout: float):                  
        super(DecoderLayer, self).__init__()                  
        self.features = features                  
        self.self_attn = self_attn                  
        self.cross_attn = cross_attn                  
        self.feed_forward = feed_forward                  
        self.sublayer = cloneModules(SublayerConnection(features, dropout), 3)                  

    def forward(self, x, encoder_output, source_mask, target_mask):                  
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, target_mask))                  
        x = self.sublayer[1](x, lambda x: self.cross_attn(x, encoder_output, encoder_output, source_mask))                  
        return self.sublayer[2](x, self.feed_forward)


## Decoder 解码器

由多个解码器层堆叠组成，逐步融合编码器输出和已生成序列信息，生成目标语言的序列表示。

In [28]:
class Decoder(nn.Module):                  
    def __init__(self, layer: DecoderLayer, N: int):                  
        super(Decoder, self).__init__()                  
        self.layers = cloneModules(layer, N)                  
        self.norm = LayerNorm(layer.features)                  

    def forward(self, x, encoder_output, source_mask, target_mask):                  
        for layer in self.layers:                  
            x = layer(x, encoder_output, source_mask, target_mask)                  
        return self.norm(x)


# Generator 生成器类（线性层+softmax计算层）

In [29]:
class Generator(nn.Module):                  
    def __init__(self, d_model, vocab_size):                  
        super(Generator, self).__init__()                  
        self.project = nn.Linear(d_model, vocab_size)                  

    def forward(self, x):                  
        return F.log_softmax(self.project(x), dim=-1)


# Transformer 模型

In [30]:
class Transformer(nn.Module):                  
    def __init__(self, encoder: Encoder, decoder: Decoder,                  
                 source_embed: Embedding, target_embed: Embedding,                  
                 source_pos: PositionalEncoding, target_pos: PositionalEncoding,                  
                 generator: Generator):                  
        super(Transformer, self).__init__()                  
        self.encoder = encoder                  
        self.decoder = decoder                  
        self.source_embed = source_embed                  
        self.target_embed = target_embed                  
        self.source_pos = source_pos                  
        self.target_pos = target_pos                  
        self.generator = generator                  

    def encode(self, source, source_mask):                  
        return self.encoder(self.source_pos(self.source_embed(source)), source_mask)                  

    def decode(self, encoder_output, source_mask, target, target_mask):                  
        return self.decoder(self.target_pos(self.target_embed(target)), encoder_output, source_mask, target_mask)                  

    def forward(self, source, target, source_mask, target_mask):                  
        return self.decode(self.encode(source, source_mask), source_mask, target, target_mask)


# 构建模型
参照python编码行业规范，编写构建transformer函数方便初始化

In [31]:
def make_transformer(source_vocab: int, target_vocab: int,                  
                      N: int = 6, d_model: int = 512, d_ff: int = 2048, head: int = 8, dropout: float = 0.1):                  
    source_embed = Embedding(d_model, source_vocab)                  
    target_embed = Embedding(d_model, target_vocab)                  

    source_pos = PositionalEncoding(d_model, dropout)                  
    target_pos = PositionalEncoding(d_model, dropout)                  

    c = copy.deepcopy                  
    attn = MultiHeadAttention(head, d_model)                  
    ff = FeedForward(d_model, d_ff, dropout)                  

    # nn.Sequential是一个顺序容器，用于将多个网络层按顺序组合在一起                  
    model = Transformer(                  
        Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),                  
        Decoder(DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout), N),                  
        source_embed, target_embed,                  
        source_pos, target_pos,                  
        Generator(d_model, target_vocab))                  
    return model

# 打印模型结构

In [33]:
if __name__ == '__main__':                  
    source_vocab = 1000                  
    target_vocab = 1000                  
    N = 8                  
    transformer = make_transformer(source_vocab, target_vocab, N)                  
    print(transformer)                  

    # 0维：batch_size，不同的句子                  
    # 1维：句子中的token                  
    torch.random.manual_seed(42)                  
    x = torch.randint(0,1000,(2,4))                  

    mask = torch.zeros(8, 4, 4)                  
    test_result = transformer(x, x, mask, mask)                  
    print(test_result.shape)

Transformer(
  (encoder): Encoder(
    (layers): ModuleList(
      (0-7): 8 x EncoderLayer(
        (self_attn): MultiHeadAttention(
          (w_q): Linear(in_features=512, out_features=512, bias=False)
          (w_k): Linear(in_features=512, out_features=512, bias=False)
          (w_v): Linear(in_features=512, out_features=512, bias=False)
          (w_o): Linear(in_features=512, out_features=512, bias=False)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (feed_forward): FeedForward(
          (linear_1): Linear(in_features=512, out_features=2048, bias=True)
          (linear_2): Linear(in_features=2048, out_features=512, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (sublayer): ModuleList(
          (0-1): 2 x SublayerConnection(
            (norm): LayerNorm()
            (dropout): Dropout(p=0.1, inplace=False)
          )
        )
      )
    )
    (norm): LayerNorm()
  )
  (decoder): Decoder(
    (layers): ModuleList(
  