In [1]:
# 基于CSDN博客《从零实现Transformer：中英文翻译实例》完整代码
# 依赖：PyTorch >= 2.0, 无需额外安装其他库
import math
import random
from dataclasses import dataclass
from typing import List, Tuple
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset

In [2]:
# 固定随机种子，保证复现性
random.seed(0)
torch.manual_seed(0)

<torch._C.Generator at 0x16662d2d6d0>

In [3]:
# -------------------------- 1. 全局常量定义 --------------------------
# 特殊符号索引（与博客一致）
PAD_IDX = 0    # 填充符
BOS_IDX = 1    # 句首符
EOS_IDX = 2    # 句尾符

In [4]:
# -------------------------- 2. 数据预处理模块 --------------------------
# 2.1 构建玩具级中英文平行语料（已经使用空格进行了分词！！！！！！！）
pairs = [
    ("我 有 一个 苹果", "i have an apple"),
    ("我 有 一本 书", "i have a book"),
    ("你 有 一个 苹果", "you have an apple"),
    ("他 有 一个 苹果", "he has an apple"),
    ("她 有 一个 苹果", "she has an apple"),
    ("我们 有 一个 苹果", "we have an apple"),
    ("我 喜欢 苹果", "i like apples"),
    ("我 吃 苹果", "i eat apples"),
    ("你 喜欢 书", "you like books"),
    ("我 喜欢 书", "i like books"),
    ("我 有 两个 苹果", "i have two apples"),
    ("我 有 红色 苹果", "i have red apples"),
]

In [5]:
# 2.2 构建词表（文本→索引 与 索引→文本）
def build_vocab(examples: List[str]) -> Tuple[dict, List[str]]:
    """
    输入：空格分词后的句子列表
    输出：stoi（词→索引）、itos（索引→词）
    """
    tokens = set()# 用set自动去重，避免同一词汇多次出现
    # 遍历所有句子，提取不重复词汇
    for s in examples:
        for t in s.split():# 按空格拆分句子，得到单个词汇（如“我”“有”“一个”“苹果”）
            tokens.add(t.lower())  # 英文统一小写，中文不影响
    # 加入特殊符号，按顺序排序（保证复现性）
    itos = ["<pad>", "<bos>", "<eos>"] + sorted(tokens)#与前面的特殊符号拼接起来
    stoi = {t: i for i, t in enumerate(itos)}#将词与索引一一对应
    return stoi, itos#python的多变量赋值

# 拆分中英文句子，分别构建词表
src_texts = [p[0] for p in pairs]  # 中文句子列表
tgt_texts = [p[1] for p in pairs]  # 英文句子列表
SRC_STOI, SRC_ITOS = build_vocab(src_texts)  # 中文词表
TGT_STOI, TGT_ITOS = build_vocab(tgt_texts)  # 英文词表


In [6]:
# 2.3 句子编码函数（文本→索引序列）
def encode_src(s: str) -> List[int]:
    """编码中文源句子（中文句子→数字索引（源语言编码））"""
    return [SRC_STOI[w.lower()] for w in s.split()]

def encode_tgt(s: str) -> List[int]:
    """编码英文目标句子（英文句子→数字索引（目标语言编码））"""
    return [BOS_IDX] + [TGT_STOI[w.lower()] for w in s.split()] + [EOS_IDX]#把输入的英文句子转换成带特殊标记的数字索引列表

def decode_tgt(ids: List[int]) -> str:
    """解码英文索引序列→文本（数字索引→英文句子（目标语言解码））"""
    special_ids = {PAD_IDX, BOS_IDX, EOS_IDX}
    tokens = [TGT_ITOS[id] for id in ids if id not in special_ids]
    return " ".join(tokens)


In [7]:
# 2.4 数据集类（自定义数据集，适配PyTorch DataLoader）
@dataclass
class Example:
    """单条样本：源语言序列（中文）、目标语言序列（英文）"""
    src: List[int]  # 中文索引序列（无BOS/EOS）
    tgt: List[int]  # 英文索引序列（有BOS/EOS）

class ToyDataset(Dataset):
    def __init__(self, pairs: List[Tuple[str, str]]):# 元组1：(中文str, 英文str)
        self.data = [Example(encode_src(s), encode_tgt(t)) for s, t in pairs]
    
    def __len__(self) -> int:
        return len(self.data)
    
    def __getitem__(self, idx: int) -> Example:
        return self.data[idx]


In [8]:
# 2.5 批量数据处理（collate_fn，解决句子长度不一致问题（向最长序列的长度看齐））
def collate_fn(batch: List[Example]):
    """
    输入：批量Example样本
    输出：src, tgt_in, tgt_out, src_pad_mask, tgt_pad_mask（均为Tensor）
    """
    # 计算批量内最长序列长度（用于填充）
    src_max_len = max(len(ex.src) for ex in batch)
    tgt_max_len = max(len(ex.tgt) for ex in batch) - 1  # tgt_in比tgt短1（去掉EOS）

    src_batch = []
    tgt_in_batch = []
    tgt_out_batch = []

    for ex in batch:#遍历样本中的数据来处理每个序列
        # 处理中文源序列（填充到src_max_len）用PAD补长
        src = ex.src + [PAD_IDX] * (src_max_len - len(ex.src))
        # 处理英文目标序列（Teacher Forcing：tgt_in去掉EOS，tgt_out去掉BOS）解码器训练时，输入和标签需要错开一位（用前一个词预测后一个词）翻译模型的关键技巧
        tgt_in = ex.tgt[:-1] + [PAD_IDX] * (tgt_max_len - len(ex.tgt[:-1]))#英文目标输入 [BOS, w1, w2, w3]
        tgt_out = ex.tgt[1:] + [PAD_IDX] * (tgt_max_len - len(ex.tgt[1:]))#英文目标输出 [w1, w2, w3, EOS]
        
        #加入批次列表
        src_batch.append(src)
        tgt_in_batch.append(tgt_in)
        tgt_out_batch.append(tgt_out)

    # 转换为Tensor
    src = torch.tensor(src_batch, dtype=torch.long)
    tgt_in = torch.tensor(tgt_in_batch, dtype=torch.long)
    tgt_out = torch.tensor(tgt_out_batch, dtype=torch.long)
    # 生成填充掩码（标记哪些位置是PAD，模型需要忽略）
    src_pad_mask = src.eq(PAD_IDX) # True表示该位置是PAD（0）
    tgt_pad_mask = tgt_in.eq(PAD_IDX)# 解码器输入的掩码

    return src, tgt_in, tgt_out, src_pad_mask, tgt_pad_mask


In [9]:
# 初始化数据加载器
dataset = ToyDataset(pairs)
dataloader = DataLoader(
    dataset,
    batch_size=8,
    shuffle=True,# 训练时打乱样本顺序
    collate_fn=collate_fn# 使用自定义的批量处理函数（填充、拆分、转张量）
)

In [10]:
# -------------------------- 3. Transformer核心组件 --------------------------
# 3.1 位置编码（正弦余弦编码，让模型知道词在句子中的位置）
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, max_len: int = 5000, dropout: float = 0.1):#注意区分max_len与批次内最长序列长度（填充长度）的区别
        super().__init__()
        self.dropout = nn.Dropout(dropout)#防止过拟合
        # 预计算位置编码矩阵 (max_len, d_model)全0矩阵
        pe = torch.zeros(max_len, d_model)
        #生成位置索引并将其扩展为列向量（形状：[0,1,2,...,max_len-1] → 扩展为列向量）//.unsqueeze(1)：在第 1 维（列维度）增加一个维度，将形状变为(max_len, 1)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        #计算衰减因子（随着维度i增大，div_term的值呈指数衰减，使高纬度的位置编码变化更为平缓）
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # 填充位置编码矩阵偶数维正弦，奇数维余弦
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # 扩展为(1, max_len, d_model)
        self.register_buffer("pe", pe)  # 不参与训练的缓冲区（缓冲区会随模型保存 / 加载，但不参与反向传播（即不会被优化器更新），位置编码固定，不需要训练）

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """x: (B, L, C) → 加位置编码后返回（批次大小、序列长度、词向量维度）"""
        x = x + self.pe[:, :x.size(1), :]#将词向量与位置编码相加，使词向量同时包含位置信息和语义信息
        return self.dropout(x)


In [11]:
# 3.2 缩放点积注意力（单头，将点积结果缩小到合理范围内）
class ScaledDotProductAttention(nn.Module):
    def __init__(self, dropout: float = 0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

    def forward(self, Q: torch.Tensor, K: torch.Tensor, V: torch.Tensor, mask: torch.Tensor = None) -> torch.Tensor:
        """
        Q: (B, H, Lq, Dh) B：批次大小；H：注意力头数；Lq：查询序列长度；Dh：每个头的维度（d_model / H）
        K: (B, H, Lk, Dh)
        V: (B, H, Lk, Dh)
        mask: 可广播到(B, H, Lq, Lk)的布尔掩码（True表示屏蔽）
        返回：(B, H, Lq, Dh)
        """
        d_k = Q.size(-1) # 获取每个头的维度Dh（Q的最后一个维度）
        # 计算注意力分数：QK^T / sqrt(d_k)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)#避免梯度消失（softmax 在大数值下会趋于饱和，梯度接近 0）
        # 应用掩码（屏蔽位填充-∞，softmax后接近0）；屏蔽PAD、解码器中的未来位置
        if mask is not None:
            scores = scores.masked_fill(mask, float("-inf"))
        # 计算注意力权重并应用dropout
        attn_weights = torch.softmax(scores, dim=-1)
        attn_weights = self.dropout(attn_weights)
        # 注意力加权求和（与V相乘）
        out = torch.matmul(attn_weights, V)
        return out


In [12]:
# 3.3 多头注意力（多头注意力将d_model拆分为nhead个d_head，每个头独立学习一种模式）
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model: int, nhead: int, dropout: float = 0.1):
        super().__init__()
        assert d_model % nhead == 0, "d_model必须能被nhead整除"
        self.d_model = d_model#词向量总维度
        self.nhead = nhead#注意力头数
        self.d_head = d_model // nhead  # 每个头的维度

        # Q、K、V线性变换层（将输入映射到d_model维度，为拆分多头做准备）
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        # 缩放点积注意力
        self.attn = ScaledDotProductAttention(dropout)
        # 合并多头并输出投影层
        self.proj = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def _shape(self, x: torch.Tensor) -> torch.Tensor:
        """(B, L, C) → (B, H, L, Dh)：拆分多头"""# B：批次大小，L：序列长度，C：总维度（d_model），H：头数，Dh：每个头维度
        B, L, C = x.shape
        return x.view(B, L, self.nhead, self.d_head).transpose(1, 2)

    def _merge(self, x: torch.Tensor) -> torch.Tensor:
        """(B, H, L, Dh) → (B, L, C)：合并多头"""
        B, H, L, Dh = x.shape
        return x.transpose(1, 2).contiguous().view(B, L, H * Dh)# 交换维度（transpose）后，张量在内存中可能不连续，contiguous()确保内存连续，避免view操作报错

    def _build_attn_mask(self, Lq: int, Lk: int, attn_mask: torch.Tensor = None, key_padding_mask: torch.Tensor = None, device: torch.device = None) -> torch.Tensor:
        """合并因果掩码和填充掩码：将原始的因果掩码和填充掩码转换为符合注意力计算的形状，并合并两种掩码"""
        mask = None
        # 处理因果掩码 (Lq, Lk) → (1, 1, Lq, Lk)
        if attn_mask is not None:
            mask = attn_mask.to(device).unsqueeze(0).unsqueeze(0)
        # 处理填充掩码 (B, Lk) → (B, 1, 1, Lk)
        if key_padding_mask is not None:
            pad_mask = key_padding_mask.to(device).unsqueeze(1).unsqueeze(1)
            mask = pad_mask if mask is None else (mask | pad_mask)
        return mask

    def forward(self, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor, attn_mask: torch.Tensor = None, key_padding_mask: torch.Tensor = None) -> torch.Tensor:
        """
        输入：
        - query/key/value：形状均为(B, L, C)（B=批次，L=序列长度，C=d_model）
        - attn_mask：因果掩码（可选，解码器用）
        - key_padding_mask：填充掩码（可选，所有层用）
        输出：形状为(B, L, C)（与输入query形状一致）
        """
        device = query.device
        Lq = query.size(1)
        Lk = key.size(1)

        # 1. 线性变换 + 拆分多头（从总维度→多头维度）
        Q = self._shape(self.w_q(query))
        K = self._shape(self.w_k(key))
        V = self._shape(self.w_v(value))

        # 2. 构建合并掩码（适配多头形状）
        mask = self._build_attn_mask(Lq, Lk, attn_mask, key_padding_mask, device)

        # 3. 计算单头注意力（调用之前实现的ScaledDotProductAttention）
        # 每个头独立计算注意力，输出形状：(B, H, Lq, Dh)
        attn_out = self.attn(Q, K, V, mask)

        # 4. 合并多头 + 输出投影（从多头维度→总维度）
        out = self._merge(attn_out)
        out = self.proj(out)
        out = self.dropout(out)

        return out


In [13]:
# 3.4 前馈网络（升维→激活→降维）
class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model: int, dim_ff: int, dropout: float = 0.1):
        super().__init__()
        self.fc1 = nn.Linear(d_model, dim_ff) # 第一层全连接：升维（d_model → dim_ff）
        self.fc2 = nn.Linear(dim_ff, d_model) # 第二层全连接：降维（dim_ff → d_model）
        self.act = nn.ReLU()# 非线性激活函数
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        输入x形状：(B, L, C) → B=批次，L=序列长度，C=d_model
        输出形状：(B, L, C) → 与输入形状一致，便于残差连接
        """
        x = self.fc1(x)# 步骤1：升维 → (B, L, dim_ff)
        x = self.act(x)# 步骤2：非线性激活 → 引入非线性特征
        x = self.dropout(x)# 步骤3：Dropout → 防止过拟合
        x = self.fc2(x)# 步骤4：降维 → 回到d_model维度
        x = self.dropout(x)# 步骤5：再次Dropout → 进一步抑制过拟合
        return x

In [14]:
# 3.5 编码器层（自注意力捕捉全局依赖 + 前馈网络提取局部特征；残差连接 + 层归一化）
class EncoderLayer(nn.Module):
    def __init__(self, d_model: int, nhead: int, dim_ff: int, dropout: float = 0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, nhead, dropout) # 多头自注意力模块
        self.norm1 = nn.LayerNorm(d_model) # 第一层层归一化（用于自注意力之后）
        self.ff = PositionwiseFeedForward(d_model, dim_ff, dropout) # 前馈网络模块
        self.norm2 = nn.LayerNorm(d_model) # 第二层层归一化（用于前馈网络之后）

    def forward(self, x: torch.Tensor, src_key_padding_mask: torch.Tensor = None) -> torch.Tensor:
        """
        x: (B, S, C)
        src_key_padding_mask: (B, S)
        返回：(B, S, C)
        """
        # 自注意力 + 残差 + 层归一化（Query=Key=Value=x（输入序列自己与自己做注意力））
        attn_out = self.self_attn(x, x, x, key_padding_mask=src_key_padding_mask)
        # 残差连接（x + 注意力输出）+ 层归一化
        x = self.norm1(x + attn_out)
        #  前馈网络处理：对每个位置的特征独立做非线性变换
        ff_out = self.ff(x)
        #  残差连接（x + 前馈输出）+ 层归一化
        x = self.norm2(x + ff_out)
        return x


In [15]:
# 3.6 编码器（多层EncoderLayer堆叠）
class Encoder(nn.Module):
    def __init__(self, d_model: int, nhead: int, dim_ff: int, num_layers: int, dropout: float = 0.1):
        super().__init__()
        self.layers = nn.ModuleList([
            EncoderLayer(d_model, nhead, dim_ff, dropout) for _ in range(num_layers)
        ])

    def forward(self, x: torch.Tensor, src_key_padding_mask: torch.Tensor = None) -> torch.Tensor:
        """
        输入x：(B, S, C) → B=批次，S=序列长度，C=d_model（经过词嵌入+位置编码的序列）
        src_key_padding_mask：(B, S) → 填充掩码，用于屏蔽所有层的PAD位置
        输出：(B, S, C) → 经过所有层处理后的最终序列表示
        """
        for layer in self.layers:
            # 让序列依次经过每个编码器层，每层的输出作为下一层的输入
            x = layer(x, src_key_padding_mask=src_key_padding_mask)#掩码共享，确保每个层都不会关注PAD填充符
        return x

In [16]:
# 3.7 解码器层（双注意力机制，多层堆叠）
class DecoderLayer(nn.Module):
    def __init__(self, d_model: int, nhead: int, dim_ff: int, dropout: float = 0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, nhead, dropout)  # 解码器自注意力
        self.norm1 = nn.LayerNorm(d_model)
        
        self.cross_attn = MultiHeadAttention(d_model, nhead, dropout)  # 交叉注意力（连接编码器和解码器）
        self.norm2 = nn.LayerNorm(d_model)
        
        self.ff = PositionwiseFeedForward(d_model, dim_ff, dropout)
        self.norm3 = nn.LayerNorm(d_model)

    def forward(self, x: torch.Tensor, memory: torch.Tensor, tgt_mask: torch.Tensor = None, tgt_key_padding_mask: torch.Tensor = None, memory_key_padding_mask: torch.Tensor = None) -> torch.Tensor:
        """
        x: (B, T, C) → 解码器输入
        memory: (B, S, C) → 编码器输出
        返回：(B, T, C)
        """
        # 1. 解码器自注意力（带因果掩码，关注自身已生成序列）
        sa_out = self.self_attn(x, x, x, attn_mask=tgt_mask, key_padding_mask=tgt_key_padding_mask)
        x = self.norm1(x + sa_out)

        # 2. 交叉注意力（Q来自解码器，K/V来自编码器）
        ca_out = self.cross_attn(x, memory, memory, key_padding_mask=memory_key_padding_mask)
        x = self.norm2(x + ca_out)

        # 3. 前馈网络
        ff_out = self.ff(x)
        x = self.norm3(x + ff_out)

        return x

# 3.8 解码器（多层DecoderLayer堆叠）
class Decoder(nn.Module):
    def __init__(self, d_model: int, nhead: int, dim_ff: int, num_layers: int, dropout: float = 0.1):
        super().__init__()
        self.layers = nn.ModuleList([  #自动将内部的DecoderLayer注册为模型子模块，确保训练时参数能被优化器更新
            DecoderLayer(d_model, nhead, dim_ff, dropout) for _ in range(num_layers)
        ])

    def forward(self, x: torch.Tensor, memory: torch.Tensor, tgt_mask: torch.Tensor = None, tgt_key_padding_mask: torch.Tensor = None, memory_key_padding_mask: torch.Tensor = None) -> torch.Tensor:
        """x: (B, T, C) → 经过所有解码器层后返回"""
        for layer in self.layers:
            x = layer(x, memory, tgt_mask=tgt_mask, tgt_key_padding_mask=tgt_key_padding_mask, memory_key_padding_mask=memory_key_padding_mask)
        return x


In [17]:
# 3.9 完整Transformer翻译模型（用编码器提取源语言全局特征，用解码器结合源语言特征逐词生成目标语言，最终通过贪心策略输出翻译结果）
class Seq2SeqTransformer(nn.Module):
    def __init__(self, src_vocab_size: int, tgt_vocab_size: int, d_model: int = 128, nhead: int = 4, num_encoder_layers: int = 2, num_decoder_layers: int = 2, dim_ff: int = 256, dropout: float = 0.1):
        super().__init__()
        self.d_model = d_model
        # 词嵌入层（指定填充符，不更新梯度）
        self.src_tok = nn.Embedding(src_vocab_size, d_model, padding_idx=PAD_IDX)#中文词嵌入
        self.tgt_tok = nn.Embedding(tgt_vocab_size, d_model, padding_idx=PAD_IDX)#英文词嵌入
        # 位置编码层：为词嵌入注入位置信息（Transformer无顺序感知，需显式添加）
        self.pos_enc = PositionalEncoding(d_model, dropout=dropout)
        # 编码器和解码器
        self.encoder = Encoder(d_model, nhead, dim_ff, num_encoder_layers, dropout)
        self.decoder = Decoder(d_model, nhead, dim_ff, num_decoder_layers, dropout)
        # 生成器（将解码器输出的d_model维特征映射到目标词表维度，用于预测下一个词）
        self.generator = nn.Linear(d_model, tgt_vocab_size)

    def make_subsequent_mask(self, sz: int) -> torch.Tensor:#生成掩码
        """生成下三角因果掩码（sz×sz），True表示屏蔽未来位置"""
        # torch.triu：生成上三角矩阵（对角线diagonal=1以上为1，以下为0）
        # 转换为bool类型后，True表示需要屏蔽的未来位置
        return torch.triu(torch.ones(sz, sz, dtype=torch.bool), diagonal=1)

    def forward(self, src: torch.Tensor, tgt_in: torch.Tensor, src_pad_mask: torch.Tensor, tgt_pad_mask: torch.Tensor) -> torch.Tensor:
        """
        训练阶段前向传播：
        src: (B, S) → 中文源序列
        tgt_in: (B, T) → 英文解码器输入（含BOS）
        返回：(B, T, tgt_vocab_size) → 词表概率分布
        """
        # 1. 词嵌入 + 位置编码（为词汇添加语义和位置信息）
        src_emb = self.pos_enc(self.src_tok(src))  # 源语言：索引→词嵌入→加位置编码 → (B, S, d_model)
        tgt_emb = self.pos_enc(self.tgt_tok(tgt_in))  # 目标语言：索引→词嵌入→加位置编码 → (B, T, C)

        # 2. 编码器编码：编码器处理源语言序列，输出源语言特征表示（memory）
        memory = self.encoder(src_emb, src_key_padding_mask=src_pad_mask)  # (B, S, C)
        # memory将传递给解码器，作为交叉注意力的K/V
        
        # 3. 生成解码器因果掩码（适配目标序列长度）
        tgt_mask = self.make_subsequent_mask(tgt_in.size(1)).to(src.device)  # (T, T)，确保与输入在同一设备

        # 4. 解码器解码：解码器处理目标语言序列，结合memory生成目标语言特征
        out = self.decoder(
            tgt_emb,# 目标语言嵌入（含位置编码）
            memory,# 编码器输出的源语言特征
            tgt_mask=tgt_mask,# 因果掩码（屏蔽未来词）
            tgt_key_padding_mask=tgt_pad_mask,# 目标序列PAD掩码
            memory_key_padding_mask=src_pad_mask# 源序列PAD掩码（交叉注意力屏蔽源PAD）
        )  # (B, T, C)

        # 5. 映射到词表，得到预测分数
        logits = self.generator(out)  # (B, T, tgt_vocab_size)
        return logits

    @torch.no_grad()#推理时不计算梯度（节省资源）
    def greedy_decode(self, src_ids: List[int], max_len: int = 20, device: str = "cpu") -> List[int]:
        """
        推理阶段贪心解码（每次生成一个词时，都选择当前概率最高的词）：
        src_ids: 中文源序列索引（无BOS/EOS）
        返回：英文目标序列索引（含BOS/EOS）
        """
        self.eval()  # 切换评估模式（Dropout等层随机失活）
        # 1. 处理源语言序列（添加批次维度，生成PAD掩码）
        src = torch.tensor(src_ids, dtype=torch.long, device=device).unsqueeze(0)  # (1, S)
        src_pad_mask = src.eq(PAD_IDX)  # (1, S)

        # 2. 编码器编码
        src_emb = self.src_tok(src)
        src_emb = self.pos_enc(src_emb)
        memory = self.encoder(src_emb, src_key_padding_mask=src_pad_mask)  # (1, S, C)

        # 3. 初始化解码器输入（从BOS开始）
        ys = torch.tensor([[BOS_IDX]], dtype=torch.long, device=device)  # (1, 1) 当前已生成的目标序列前缀

        # 4. 逐词生成目标序列（直到生成EOS或达到max_len）
        for _ in range(max_len - 1):
            # 生成掩码
            tgt_pad_mask = ys.eq(PAD_IDX)  # (1, T)
            tgt_mask = self.make_subsequent_mask(ys.size(1)).to(device)  # (T, T)

            # 解码器前向（输入当前生成的序列前缀）
            tgt_emb = self.tgt_tok(ys)
            tgt_emb = self.pos_enc(tgt_emb)
            out = self.decoder(
                tgt_emb,
                memory,
                tgt_mask=tgt_mask,
                tgt_key_padding_mask=tgt_pad_mask,
                memory_key_padding_mask=src_pad_mask
            )  # (1, T, C)

            # 贪心选择概率最大的词
            logits = self.generator(out[:, -1:, :])  # (1, 1, tgt_vocab_size)
            next_token = logits.argmax(-1)  # (1, 1)
            next_id = next_token.item() # 转换为Python整数

            # 拼接结果
            ys = torch.cat([ys, next_token], dim=1)  # (1, T+1)（序列长度+1）

            # 遇到EOS停止
            if next_id == EOS_IDX:
                break

        return ys.squeeze(0).tolist()  # 去掉批次维度，返回索引列表

In [18]:
# -------------------------- 4. 模型训练与推理 --------------------------
# 4.1 训练配置（博客原参数）
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Seq2SeqTransformer(
    src_vocab_size=len(SRC_ITOS),
    tgt_vocab_size=len(TGT_ITOS),
    d_model=6,  # 博客用小维度，加快训练
    nhead=3,    # 6能被3整除
    num_encoder_layers=2,
    num_decoder_layers=2,
    dim_ff=256,
    dropout=0.1
).to(device)


In [19]:
# 损失函数（忽略填充位）和优化器：loss_backword关联criterion与optimizer(数据流向)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX) #计算模型预测与真实目标序列的损失（误差），用于指导模型参数更新
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)

In [20]:
# 4.2 辅助函数：测试翻译效果
def evaluate_sample(sent: str = "我 有 一个 苹果"):
    """输入中文句子，输出翻译结果"""
    src_ids = encode_src(sent) # 编码（返回索引列表）
    pred_ids = model.greedy_decode(src_ids, device=device) # 推理（将中文索引序列转换为英文索引序列）
    pred_text = decode_tgt(pred_ids) # 解码（转换为翻译后的英文句子）
    print(f"输入中文：{sent}")
    print(f"输出英文：{pred_text}\n")


In [21]:
# 4.3 训练循环（博客原逻辑，800轮易过拟合玩具语料）
print("训练前测试（随机参数，结果无意义）：")
evaluate_sample()


训练前测试（随机参数，结果无意义）：
输入中文：我 有 一个 苹果
输出英文：eat has eat has eat has eat has has



In [22]:
EPOCHS = 800
## 完成每轮的训练：遍历所有训练数据（dataloader），完成 “数据加载→前向传播→损失计算→反向传播→参数更新” 的全流程
for epoch in range(1, EPOCHS + 1):
    model.train() # 模型切换为训练模式
    total_loss = 0.0 # 用于累加当前轮所有批次的损失
    for src, tgt_in, tgt_out, src_pad_mask, tgt_pad_mask in dataloader:
        # 数据移到目标设备（CPU），避免设备不匹配
        src = src.to(device)
        tgt_in = tgt_in.to(device)
        tgt_out = tgt_out.to(device)
        src_pad_mask = src_pad_mask.to(device)
        tgt_pad_mask = tgt_pad_mask.to(device)

        # 前向传播
        logits = model(src, tgt_in, src_pad_mask, tgt_pad_mask)
        # 计算损失（展平维度）；criterion调用交叉熵损失函数，计算展平后的预测与标签的损失（自动忽略PAD_IDX位置）
        loss = criterion(logits.reshape(-1, logits.size(-1)), tgt_out.reshape(-1))

        # 反向传播与优化
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # 梯度裁剪，防止梯度爆炸
        optimizer.step() # 优化器根据梯度更新模型参数

        total_loss += loss.item()

    # 每5轮打印损失并测试
    if epoch % 5 == 0 or epoch == 1:
        avg_loss = total_loss / len(dataloader) # 总损失除以批次数量
        print(f"Epoch {epoch:03d} | 平均损失：{avg_loss:.4f}")
        evaluate_sample()

Epoch 001 | 平均损失：3.0474
输入中文：我 有 一个 苹果
输出英文：eat has eat has eat has

Epoch 005 | 平均损失：3.0091
输入中文：我 有 一个 苹果
输出英文：eat has

Epoch 010 | 平均损失：2.7799
输入中文：我 有 一个 苹果
输出英文：

Epoch 015 | 平均损失：2.7534
输入中文：我 有 一个 苹果
输出英文：

Epoch 020 | 平均损失：2.7303
输入中文：我 有 一个 苹果
输出英文：

Epoch 025 | 平均损失：2.7083
输入中文：我 有 一个 苹果
输出英文：

Epoch 030 | 平均损失：2.6955
输入中文：我 有 一个 苹果
输出英文：i

Epoch 035 | 平均损失：2.6160
输入中文：我 有 一个 苹果
输出英文：i

Epoch 040 | 平均损失：2.5827
输入中文：我 有 一个 苹果
输出英文：i

Epoch 045 | 平均损失：2.5210
输入中文：我 有 一个 苹果
输出英文：i

Epoch 050 | 平均损失：2.6409
输入中文：我 有 一个 苹果
输出英文：i

Epoch 055 | 平均损失：2.5986
输入中文：我 有 一个 苹果
输出英文：i

Epoch 060 | 平均损失：2.5364
输入中文：我 有 一个 苹果
输出英文：i

Epoch 065 | 平均损失：2.5057
输入中文：我 有 一个 苹果
输出英文：i

Epoch 070 | 平均损失：2.4566
输入中文：我 有 一个 苹果
输出英文：i

Epoch 075 | 平均损失：2.4549
输入中文：我 有 一个 苹果
输出英文：i

Epoch 080 | 平均损失：2.3990
输入中文：我 有 一个 苹果
输出英文：i

Epoch 085 | 平均损失：2.4899
输入中文：我 有 一个 苹果
输出英文：i

Epoch 090 | 平均损失：2.3931
输入中文：我 有 一个 苹果
输出英文：i

Epoch 095 | 平均损失：2.3222
输入中文：我 有 一个 苹果
输出英文：i

Epoch 100 | 平均损失：2.2786
输入中文：我 有 一个 

Epoch 735 | 平均损失：1.2282
输入中文：我 有 一个 苹果
输出英文：i have an apple

Epoch 740 | 平均损失：1.3981
输入中文：我 有 一个 苹果
输出英文：i have an apple

Epoch 745 | 平均损失：1.2050
输入中文：我 有 一个 苹果
输出英文：i have an apple

Epoch 750 | 平均损失：1.1916
输入中文：我 有 一个 苹果
输出英文：i have an apple

Epoch 755 | 平均损失：1.1528
输入中文：我 有 一个 苹果
输出英文：i have an apple

Epoch 760 | 平均损失：1.2630
输入中文：我 有 一个 苹果
输出英文：i have an apple

Epoch 765 | 平均损失：1.1730
输入中文：我 有 一个 苹果
输出英文：i have an apple

Epoch 770 | 平均损失：1.2385
输入中文：我 有 一个 苹果
输出英文：i have an apple

Epoch 775 | 平均损失：1.2293
输入中文：我 有 一个 苹果
输出英文：i have an apple

Epoch 780 | 平均损失：1.1252
输入中文：我 有 一个 苹果
输出英文：i have an apple

Epoch 785 | 平均损失：1.2431
输入中文：我 有 一个 苹果
输出英文：i have an apple

Epoch 790 | 平均损失：1.0725
输入中文：我 有 一个 苹果
输出英文：i have an apple

Epoch 795 | 平均损失：1.1057
输入中文：我 有 一个 苹果
输出英文：i have an apple

Epoch 800 | 平均损失：1.0885
输入中文：我 有 一个 苹果
输出英文：i have an apple



In [23]:
# 4.4 最终测试（任意输入中文句子）
print("训练完成！测试其他句子：")
test_sents = ["我 喜欢 苹果", "你 有 一本 书", "他 吃 苹果"]
for sent in test_sents:
    evaluate_sample(sent)

训练完成！测试其他句子：
输入中文：我 喜欢 苹果
输出英文：i have have apples

输入中文：你 有 一本 书
输出英文：i have an apple

输入中文：他 吃 苹果
输出英文：she has an apple



In [None]:
# 在本demo中，主要是为了展示Transformer在翻译中的具体应用，设置的训练集较为简单，所得结果精度不高