# The Annotated Transformer - 带注释的Transformer实现

这是一个基于"Attention is All You Need"论文的完整Transformer实现。

本notebook包含：
- 完整的模型架构实现
- 训练和推理代码
- 可视化工具
- 详细的中文注释

## 1. 导入必要的库

In [1]:
# 基础库
import os
from os.path import exists
import math
import copy
import time
import warnings
warnings.filterwarnings("ignore")

# PyTorch相关
import torch
import torch.nn as nn
from torch.nn.functional import log_softmax, pad
from torch.optim.lr_scheduler import LambdaLR

# 数据处理和可视化
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# 设置随机种子以保证可复现性
torch.manual_seed(42)
np.random.seed(42)

# 检查CUDA是否可用
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")
if torch.cuda.is_available():
    print(f"GPU名称: {torch.cuda.get_device_name(0)}")

使用设备: cuda
GPU名称: NVIDIA GeForce RTX 3090


## 2. 核心组件实现

### 2.1 Encoder-Decoder架构基础

In [2]:
class EncoderDecoder(nn.Module):
    """
    标准的Encoder-Decoder架构
    这是Transformer的顶层结构
    """
    def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
        super(EncoderDecoder, self).__init__()
        self.encoder = encoder  # 编码器
        self.decoder = decoder  # 解码器
        self.src_embed = src_embed  # 源语言嵌入层
        self.tgt_embed = tgt_embed  # 目标语言嵌入层
        self.generator = generator  # 输出生成器
    
    def forward(self, src, tgt, src_mask, tgt_mask):
        """处理masked的源序列和目标序列"""
        # 先编码，再解码
        return self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)
    
    def encode(self, src, src_mask):
        """编码源序列"""
        return self.encoder(self.src_embed(src), src_mask)
    
    def decode(self, memory, src_mask, tgt, tgt_mask):
        """解码目标序列"""
        return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)

In [3]:
class Generator(nn.Module):
    """
    标准的线性层 + softmax生成步骤
    用于将decoder输出转换为词汇表上的概率分布
    """
    def __init__(self, d_model, vocab):
        super(Generator, self).__init__()
        self.proj = nn.Linear(d_model, vocab)  # 投影到词汇表大小
    
    def forward(self, x):
        # 使用log_softmax而不是softmax，数值更稳定
        return log_softmax(self.proj(x), dim=-1)

### 2.2 辅助函数

In [None]:
def clones(module, N):
    """
    产生N个相同的层（深拷贝）
    用于创建多个相同的encoder/decoder层
    """
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

### 2.3 Layer Normalization

In [None]:
class LayerNorm(nn.Module):
    """
    Layer Normalization层
    对每个样本的特征维度进行归一化，而不是像Batch Norm那样对batch维度归一化
    """
    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()
        # 可学习的缩放和平移参数
        self.a_2 = nn.Parameter(torch.ones(features))  # gamma
        self.b_2 = nn.Parameter(torch.zeros(features))  # beta
        self.eps = eps  # 防止除零的小常数
    
    def forward(self, x):
        # 计算最后一个维度的均值和标准差
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        # 归一化并应用可学习的仿射变换
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2

### 2.4 残差连接

In [None]:
class SublayerConnection(nn.Module):
    """
    残差连接 + Layer Norm
    为了代码简洁，这里先做norm再做残差连接（与原论文顺序相反，但效果相似）
    输出: LayerNorm(x + Sublayer(x))
    """
    def __init__(self, size, dropout):
        super(SublayerConnection, self).__init__()
        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, sublayer):
        """
        应用残差连接到任何相同大小的子层
        sublayer是一个函数，接受x作为输入
        """
        return x + self.dropout(sublayer(self.norm(x)))

### 2.5 Encoder实现

In [None]:
class Encoder(nn.Module):
    """
    Encoder由N个相同的层堆叠而成
    """
    def __init__(self, layer, N):
        super(Encoder, self).__init__()
        self.layers = clones(layer, N)  # 克隆N个encoder层
        self.norm = LayerNorm(layer.size)  # 最后的LayerNorm
    
    def forward(self, x, mask):
        """依次通过每一层"""
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)

In [None]:
class EncoderLayer(nn.Module):
    """
    Encoder层由两个子层组成：
    1. Multi-head self-attention
    2. Position-wise feed-forward network
    """
    def __init__(self, size, self_attn, feed_forward, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = self_attn  # 自注意力层
        self.feed_forward = feed_forward  # 前馈网络
        self.sublayer = clones(SublayerConnection(size, dropout), 2)  # 两个残差连接
        self.size = size
    
    def forward(self, x, mask):
        """按照图1（左）的连接方式"""
        # 第一个子层：self-attention
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
        # 第二个子层：feed-forward
        return self.sublayer[1](x, self.feed_forward)

### 2.6 Decoder实现

In [None]:
class Decoder(nn.Module):
    """
    Decoder由N个相同的层堆叠而成，带有masking
    """
    def __init__(self, layer, N):
        super(Decoder, self).__init__()
        self.layers = clones(layer, N)
        self.norm = LayerNorm(layer.size)
    
    def forward(self, x, memory, src_mask, tgt_mask):
        """
        x: 目标序列
        memory: encoder的输出
        src_mask: 源序列的mask
        tgt_mask: 目标序列的mask（防止看到未来信息）
        """
        for layer in self.layers:
            x = layer(x, memory, src_mask, tgt_mask)
        return self.norm(x)

In [None]:
class DecoderLayer(nn.Module):
    """
    Decoder层由三个子层组成：
    1. Masked multi-head self-attention (只能看到之前的位置)
    2. Multi-head cross-attention (关注encoder的输出)
    3. Position-wise feed-forward network
    """
    def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
        super(DecoderLayer, self).__init__()
        self.size = size
        self.self_attn = self_attn  # 自注意力
        self.src_attn = src_attn    # 交叉注意力（关注encoder输出）
        self.feed_forward = feed_forward
        self.sublayer = clones(SublayerConnection(size, dropout), 3)  # 三个残差连接
    
    def forward(self, x, memory, src_mask, tgt_mask):
        """按照图1（右）的连接方式"""
        m = memory
        # 第一个子层：masked self-attention
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
        # 第二个子层：encoder-decoder attention
        x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, src_mask))
        # 第三个子层：feed-forward
        return self.sublayer[2](x, self.feed_forward)

### 2.7 Attention机制

这是Transformer的核心！

In [None]:
def subsequent_mask(size):
    """
    创建一个mask来隐藏后续位置的信息
    用于decoder的自注意力中，确保位置i只能关注位置<=i的内容
    
    返回值是一个下三角矩阵（对角线及以下为True，其余为False）
    """
    attn_shape = (1, size, size)
    # torch.triu创建上三角矩阵，diagonal=1表示对角线上方一位开始
    subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(torch.uint8)
    # 取反，得到下三角矩阵
    return subsequent_mask == 0

In [None]:
def attention(query, key, value, mask=None, dropout=None):
    """
    计算Scaled Dot-Product Attention
    
    公式: Attention(Q, K, V) = softmax(Q*K^T / sqrt(d_k)) * V
    
    参数:
        query: 查询矩阵 (batch, heads, seq_len, d_k)
        key: 键矩阵 (batch, heads, seq_len, d_k)
        value: 值矩阵 (batch, heads, seq_len, d_v)
        mask: 可选的mask，用于遮蔽某些位置
        dropout: 可选的dropout层
    
    返回:
        attention输出和attention权重
    """
    d_k = query.size(-1)  # d_k是key的维度
    
    # 1. 计算attention分数: Q * K^T / sqrt(d_k)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    
    # 2. 应用mask（如果有）
    if mask is not None:
        # 将mask为0的位置设置为一个很小的负数，softmax后会接近0
        scores = scores.masked_fill(mask == 0, -1e9)
    
    # 3. 应用softmax得到attention权重
    p_attn = scores.softmax(dim=-1)
    
    # 4. 应用dropout（如果有）
    if dropout is not None:
        p_attn = dropout(p_attn)
    
    # 5. 用attention权重加权value
    return torch.matmul(p_attn, value), p_attn

In [None]:
class MultiHeadedAttention(nn.Module):
    """
    Multi-Head Attention
    
    将d_model维度分成h个头，每个头的维度是d_k = d_model / h
    多个头可以让模型关注不同位置的不同表示子空间的信息
    """
    def __init__(self, h, d_model, dropout=0.1):
        super(MultiHeadedAttention, self).__init__()
        assert d_model % h == 0  # 确保可以整除
        
        # 假设d_v总是等于d_k
        self.d_k = d_model // h
        self.h = h
        
        # 创建4个线性层：Q、K、V的投影 + 最后的输出投影
        self.linears = clones(nn.Linear(d_model, d_model), 4)
        
        self.attn = None  # 保存attention权重，用于可视化
        self.dropout = nn.Dropout(p=dropout)
    
    def forward(self, query, key, value, mask=None):
        """
        实现Multi-Head Attention
        """
        if mask is not None:
            # 所有h个头使用相同的mask
            mask = mask.unsqueeze(1)
        
        nbatches = query.size(0)
        
        # 1. 对Q、K、V做线性投影，并分成h个头
        #    从 (batch, seq_len, d_model) 变为 (batch, h, seq_len, d_k)
        query, key, value = [
            lin(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
            for lin, x in zip(self.linears, (query, key, value))
        ]
        
        # 2. 在所有投影的向量上并行应用attention
        x, self.attn = attention(
            query, key, value, mask=mask, dropout=self.dropout
        )
        
        # 3. "Concat"：将多个头的输出拼接起来
        #    从 (batch, h, seq_len, d_k) 变回 (batch, seq_len, d_model)
        x = (
            x.transpose(1, 2)
            .contiguous()
            .view(nbatches, -1, self.h * self.d_k)
        )
        
        # 删除中间变量以节省内存
        del query, key, value
        
        # 4. 应用最后的线性层
        return self.linears[-1](x)

### 2.8 Position-wise Feed-Forward Networks

In [None]:
class PositionwiseFeedForward(nn.Module):
    """
    Position-wise Feed-Forward Network
    
    FFN(x) = max(0, xW1 + b1)W2 + b2
    
    对每个位置独立地应用两个线性变换，中间有ReLU激活
    可以看作是两个kernel_size=1的卷积层
    """
    def __init__(self, d_model, d_ff, dropout=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.w_1 = nn.Linear(d_model, d_ff)  # 第一个线性层，扩展维度
        self.w_2 = nn.Linear(d_ff, d_model)  # 第二个线性层，恢复维度
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        # 先升维+ReLU，再降维
        return self.w_2(self.dropout(self.w_1(x).relu()))

### 2.9 Embeddings和Positional Encoding

In [None]:
class Embeddings(nn.Module):
    """
    标准的embedding层
    将token索引转换为d_model维的向量
    """
    def __init__(self, d_model, vocab):
        super(Embeddings, self).__init__()
        self.lut = nn.Embedding(vocab, d_model)  # lookup table
        self.d_model = d_model
    
    def forward(self, x):
        # 乘以sqrt(d_model)是论文中的技巧，让embedding和positional encoding的scale相近
        return self.lut(x) * math.sqrt(self.d_model)

In [None]:
class PositionalEncoding(nn.Module):
    """
    位置编码
    
    由于Transformer没有循环或卷积结构，需要注入位置信息
    使用不同频率的sin和cos函数：
    
    PE(pos, 2i)   = sin(pos / 10000^(2i/d_model))
    PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))
    
    这样每个维度对应一个正弦波，波长从2π到10000·2π
    """
    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        
        # 在log空间计算位置编码（数值更稳定）
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)  # (max_len, 1)
        
        # 计算除数项：10000^(2i/d_model)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
        )
        
        # 偶数位置用sin，奇数位置用cos
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        pe = pe.unsqueeze(0)  # (1, max_len, d_model)
        
        # 注册为buffer，不是模型参数，但会被保存到state_dict
        self.register_buffer("pe", pe)
    
    def forward(self, x):
        # 将位置编码加到embedding上
        x = x + self.pe[:, : x.size(1)].requires_grad_(False)
        return self.dropout(x)

### 2.10 完整模型构建函数

In [None]:
def make_model(
    src_vocab, tgt_vocab, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1
):
    """
    从超参数构建完整的Transformer模型
    
    参数:
        src_vocab: 源语言词汇表大小
        tgt_vocab: 目标语言词汇表大小
        N: encoder和decoder的层数（默认6）
        d_model: 模型维度（默认512）
        d_ff: feed-forward网络的中间维度（默认2048）
        h: attention头数（默认8）
        dropout: dropout概率（默认0.1）
    """
    c = copy.deepcopy
    
    # 创建各个组件
    attn = MultiHeadedAttention(h, d_model)
    ff = PositionwiseFeedForward(d_model, d_ff, dropout)
    position = PositionalEncoding(d_model, dropout)
    
    # 组装模型
    model = EncoderDecoder(
        Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),
        Decoder(DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout), N),
        nn.Sequential(Embeddings(d_model, src_vocab), c(position)),
        nn.Sequential(Embeddings(d_model, tgt_vocab), c(position)),
        Generator(d_model, tgt_vocab),
    )
    
    # 重要：使用Glorot/Xavier初始化参数
    for p in model.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)
    
    return model

## 3. 可视化辅助工具

### 3.1 可视化Mask矩阵

In [None]:
def visualize_mask(size=20):
    """
    可视化subsequent mask
    这个mask用于decoder，确保位置i只能看到位置<=i的信息
    """
    mask = subsequent_mask(size)
    plt.figure(figsize=(8, 8))
    plt.imshow(mask[0].cpu().numpy(), cmap='YlGn', interpolation='nearest')
    plt.title('Subsequent Mask\n(黄色=可见, 绿色=被遮蔽)', fontsize=14)
    plt.xlabel('当前位置可以看到的位置 (列)')
    plt.ylabel('当前位置 (行)')
    plt.colorbar()
    
    # 添加说明文字
    plt.text(size//2, -2, 
             '对角线及以下为True(黄色)：位置i可以看到位置<=i\n对角线以上为False(绿色)：位置i看不到位置>i',
             ha='center', fontsize=10, bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
    
    plt.tight_layout()
    plt.show()

# 显示mask
visualize_mask(20)

### 3.2 可视化Positional Encoding

In [None]:
def visualize_positional_encoding(d_model=20, max_len=100):
    """
    可视化位置编码的sin/cos波形
    不同维度有不同的频率，形成独特的位置表示
    """
    pe = PositionalEncoding(d_model, dropout=0)
    y = pe.forward(torch.zeros(1, max_len, d_model))
    
    # 绘制几个不同维度的波形
    plt.figure(figsize=(15, 5))
    
    # 选择4个不同的维度来展示
    dims_to_show = [4, 5, 6, 7]
    
    for dim in dims_to_show:
        plt.plot(y[0, :, dim].numpy(), label=f'维度 {dim}')
    
    plt.xlabel('位置')
    plt.ylabel('编码值')
    plt.title('位置编码的正弦波模式\n不同维度有不同频率，位置越近，编码越相似')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    
    # 绘制热力图
    plt.figure(figsize=(15, 8))
    plt.imshow(y[0].numpy().T, aspect='auto', cmap='RdBu', interpolation='nearest')
    plt.colorbar(label='编码值')
    plt.xlabel('位置')
    plt.ylabel('维度')
    plt.title('位置编码热力图\n每一列代表一个位置的完整编码向量')
    plt.tight_layout()
    plt.show()

visualize_positional_encoding()

### 3.3 可视化Attention权重

In [None]:
def visualize_attention(attn_weights, src_tokens=None, tgt_tokens=None):
    """
    可视化attention权重矩阵
    
    参数:
        attn_weights: attention权重矩阵 (seq_len_tgt, seq_len_src)
        src_tokens: 源序列的token列表（可选）
        tgt_tokens: 目标序列的token列表（可选）
    """
    plt.figure(figsize=(10, 10))
    
    # 将attention权重转为numpy数组
    if torch.is_tensor(attn_weights):
        attn_weights = attn_weights.detach().cpu().numpy()
    
    # 绘制热力图
    sns.heatmap(attn_weights, cmap='YlOrRd', 
                xticklabels=src_tokens if src_tokens else 'auto',
                yticklabels=tgt_tokens if tgt_tokens else 'auto',
                cbar_kws={'label': 'Attention权重'})
    
    plt.xlabel('源序列位置')
    plt.ylabel('目标序列位置')
    plt.title('Attention权重可视化\n颜色越深，attention越强')
    plt.tight_layout()
    plt.show()

## 4. 训练相关组件

### 4.1 Batch类

In [None]:
class Batch:
    """
    训练时的batch对象
    包含源序列、目标序列及其masks
    """
    def __init__(self, src, tgt=None, pad=2):
        """
        参数:
            src: 源序列 (batch_size, src_len)
            tgt: 目标序列 (batch_size, tgt_len)
            pad: padding token的id
        """
        self.src = src
        # 源序列mask：标记哪些位置不是padding
        self.src_mask = (src != pad).unsqueeze(-2)
        
        if tgt is not None:
            # 目标序列的输入：去掉最后一个token
            self.tgt = tgt[:, :-1]
            # 目标序列的标签：去掉第一个token
            self.tgt_y = tgt[:, 1:]
            # 目标序列mask：既要遮蔽padding，又要遮蔽future tokens
            self.tgt_mask = self.make_std_mask(self.tgt, pad)
            # 统计有效token数（不包括padding）
            self.ntokens = (self.tgt_y != pad).data.sum()
    
    @staticmethod
    def make_std_mask(tgt, pad):
        """创建目标序列的mask：遮蔽padding和future positions"""
        tgt_mask = (tgt != pad).unsqueeze(-2)
        tgt_mask = tgt_mask & subsequent_mask(tgt.size(-1)).type_as(tgt_mask.data)
        return tgt_mask

### 4.2 学习率调度器

In [None]:
def rate(step, model_size, factor, warmup):
    """
    学习率调度函数
    
    公式: lrate = factor * (model_size^(-0.5) * min(step^(-0.5), step * warmup^(-1.5)))
    
    前warmup步线性增加，之后按step^(-0.5)衰减
    """
    if step == 0:
        step = 1
    return factor * (
        model_size ** (-0.5) * min(step ** (-0.5), step * warmup ** (-1.5))
    )

def visualize_learning_rate_schedule():
    """
    可视化不同配置下的学习率变化曲线
    """
    plt.figure(figsize=(12, 6))
    
    # 测试不同的配置
    opts = [
        [512, 1, 4000],  # 标准配置
        [512, 1, 8000],  # 更长的warmup
        [256, 1, 4000],  # 更小的model_size
    ]
    
    labels = [
        '标准配置 (d_model=512, warmup=4000)',
        '长warmup (d_model=512, warmup=8000)',
        '小模型 (d_model=256, warmup=4000)'
    ]
    
    for opt, label in zip(opts, labels):
        steps = list(range(1, 20000))
        rates = [rate(step, *opt) for step in steps]
        plt.plot(steps, rates, label=label)
    
    plt.xlabel('训练步数')
    plt.ylabel('学习率')
    plt.title('不同配置下的学习率调度\nWarmup后逐渐衰减')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

visualize_learning_rate_schedule()

### 4.3 Label Smoothing

In [None]:
class LabelSmoothing(nn.Module):
    """
    Label Smoothing正则化
    
    不使用one-hot标签，而是将一部分概率分配给其他类别
    这样可以防止模型过于自信，提高泛化能力
    
    例如，原本标签是[0, 1, 0, 0, 0]
    smoothing=0.1后变成[0.025, 0.9, 0.025, 0.025, 0.025]
    """
    def __init__(self, size, padding_idx, smoothing=0.0):
        super(LabelSmoothing, self).__init__()
        self.criterion = nn.KLDivLoss(reduction="sum")
        self.padding_idx = padding_idx
        self.confidence = 1.0 - smoothing  # 正确类别的概率
        self.smoothing = smoothing
        self.size = size
        self.true_dist = None
    
    def forward(self, x, target):
        """
        x: 模型的log概率输出 (batch_size, vocab_size)
        target: 真实标签 (batch_size,)
        """
        assert x.size(1) == self.size
        true_dist = x.data.clone()
        
        # 将smoothing的概率均匀分配给其他类（除了padding）
        true_dist.fill_(self.smoothing / (self.size - 2))
        
        # 正确类别得到更高的概率
        true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence)
        
        # padding位置的概率设为0
        true_dist[:, self.padding_idx] = 0
        mask = torch.nonzero(target.data == self.padding_idx)
        if mask.dim() > 0:
            true_dist.index_fill_(0, mask.squeeze(), 0.0)
        
        self.true_dist = true_dist
        return self.criterion(x, true_dist.clone().detach())

In [None]:
def visualize_label_smoothing():
    """
    可视化label smoothing的效果
    """
    # 创建一个示例
    crit = LabelSmoothing(size=5, padding_idx=0, smoothing=0.4)
    
    # 模拟预测
    predict = torch.FloatTensor([
        [0, 0.2, 0.7, 0.1, 0],
        [0, 0.2, 0.7, 0.1, 0],
        [0, 0.2, 0.7, 0.1, 0],
    ])
    
    # 真实标签: [2, 1, 0]
    target = torch.LongTensor([2, 1, 0])
    
    # 计算loss（会设置true_dist）
    crit(predict.log(), target)
    
    # 可视化
    plt.figure(figsize=(12, 4))
    
    for i in range(3):
        plt.subplot(1, 3, i+1)
        plt.bar(range(5), crit.true_dist[i].numpy())
        plt.title(f'样本{i+1}: 真实标签={target[i].item()}')
        plt.xlabel('类别')
        plt.ylabel('目标概率')
        plt.ylim([0, 1])
        
        # 标注confidence值
        plt.axhline(y=crit.confidence, color='r', linestyle='--', 
                   label=f'Confidence={crit.confidence:.2f}')
        plt.legend()
    
    plt.suptitle('Label Smoothing效果\n正确类别获得高概率，其他类别分配少量概率', 
                fontsize=14)
    plt.tight_layout()
    plt.show()

visualize_label_smoothing()

## 5. 简单示例：复制任务

我们先用一个简单的任务来测试模型：给定输入序列，输出相同的序列

In [None]:
def data_gen(V, batch_size, nbatches):
    """
    生成复制任务的随机数据
    
    参数:
        V: 词汇表大小
        batch_size: 批次大小
        nbatches: 批次数量
    """
    for i in range(nbatches):
        # 生成随机数据
        data = torch.randint(1, V, size=(batch_size, 10))
        data[:, 0] = 1  # 第一个token设为1（开始标记）
        
        src = data.requires_grad_(False).clone().detach()
        tgt = data.requires_grad_(False).clone().detach()
        
        yield Batch(src, tgt, pad=0)

In [None]:
class SimpleLossCompute:
    """
    简单的损失计算和训练函数
    """
    def __init__(self, generator, criterion):
        self.generator = generator
        self.criterion = criterion
    
    def __call__(self, x, y, norm):
        x = self.generator(x)
        sloss = (
            self.criterion(
                x.contiguous().view(-1, x.size(-1)), 
                y.contiguous().view(-1)
            )
            / norm
        )
        return sloss.data * norm, sloss

In [None]:
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    """
    贪婪解码：每次选择概率最大的token
    
    这不是最好的解码策略（beam search更好），但实现简单
    """
    memory = model.encode(src, src_mask)
    ys = torch.zeros(1, 1).fill_(start_symbol).type_as(src.data)
    
    for i in range(max_len - 1):
        out = model.decode(
            memory, src_mask, ys, 
            subsequent_mask(ys.size(1)).type_as(src.data)
        )
        prob = model.generator(out[:, -1])
        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.data[0]
        ys = torch.cat(
            [ys, torch.zeros(1, 1).type_as(src.data).fill_(next_word)], 
            dim=1
        )
    return ys

In [None]:
def run_copy_task_example():
    """
    运行复制任务示例
    训练一个小模型来复制输入序列
    """
    V = 11  # 词汇表大小
    criterion = LabelSmoothing(size=V, padding_idx=0, smoothing=0.0)
    model = make_model(V, V, N=2)  # 使用2层的小模型
    
    optimizer = torch.optim.Adam(
        model.parameters(), lr=0.5, betas=(0.9, 0.98), eps=1e-9
    )
    lr_scheduler = LambdaLR(
        optimizer=optimizer,
        lr_lambda=lambda step: rate(
            step, model_size=model.src_embed[0].d_model, 
            factor=1.0, warmup=400
        ),
    )
    
    batch_size = 80
    losses = []
    
    print("开始训练复制任务...")
    model.train()
    
    for epoch in range(20):
        epoch_loss = 0
        for i, batch in enumerate(data_gen(V, batch_size, 20)):
            out = model.forward(
                batch.src, batch.tgt, batch.src_mask, batch.tgt_mask
            )
            loss, loss_node = SimpleLossCompute(
                model.generator, criterion
            )(out, batch.tgt_y, batch.ntokens)
            
            loss_node.backward()
            optimizer.step()
            optimizer.zero_grad(set_to_none=True)
            lr_scheduler.step()
            
            epoch_loss += loss
        
        avg_loss = epoch_loss / 20
        losses.append(avg_loss)
        print(f"Epoch {epoch+1}/20, Loss: {avg_loss:.4f}")
    
    # 可视化训练过程
    plt.figure(figsize=(10, 5))
    plt.plot(losses)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('复制任务训练曲线')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    
    # 测试模型
    model.eval()
    src = torch.LongTensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])
    src_mask = torch.ones(1, 1, 10)
    
    print("\n测试结果:")
    print(f"输入序列: {src[0].tolist()}")
    
    result = greedy_decode(
        model, src, src_mask, max_len=10, start_symbol=1
    )
    print(f"输出序列: {result[0].tolist()}")
    
    return model

# 运行示例
copy_model = run_copy_task_example()

## 6. 模型分析和可视化

### 6.1 模型结构分析

In [None]:
def analyze_model_structure(model):
    """
    分析并打印模型结构信息
    """
    print("="*60)
    print("Transformer模型结构分析")
    print("="*60)
    
    # 统计参数量
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    
    print(f"\n总参数量: {total_params:,}")
    print(f"可训练参数量: {trainable_params:,}")
    
    # 分析各个组件
    print("\n各组件参数分布:")
    print("-"*60)
    
    components = {
        'Encoder': model.encoder,
        'Decoder': model.decoder,
        'Source Embedding': model.src_embed,
        'Target Embedding': model.tgt_embed,
        'Generator': model.generator,
    }
    
    component_params = {}
    for name, component in components.items():
        params = sum(p.numel() for p in component.parameters())
        component_params[name] = params
        print(f"{name:25s}: {params:10,} ({params/total_params*100:5.2f}%)")
    
    # 绘制参数分布饼图
    plt.figure(figsize=(10, 6))
    plt.pie(component_params.values(), labels=component_params.keys(), 
            autopct='%1.1f%%', startangle=90)
    plt.title('模型各组件参数分布')
    plt.axis('equal')
    plt.tight_layout()
    plt.show()

# 创建一个标准大小的模型进行分析
analysis_model = make_model(10000, 10000, N=6)
analyze_model_structure(analysis_model)

### 6.2 Attention权重可视化

In [None]:
def visualize_model_attention(model, src, src_mask, tgt, tgt_mask):
    """
    可视化模型中的attention权重
    展示不同层和不同头的attention模式
    """
    model.eval()
    
    with torch.no_grad():
        # 前向传播
        memory = model.encode(src, src_mask)
        output = model.decode(memory, src_mask, tgt, tgt_mask)
    
    # 可视化encoder的self-attention（第一层，第一个头）
    if hasattr(model.encoder.layers[0].self_attn, 'attn'):
        encoder_attn = model.encoder.layers[0].self_attn.attn
        
        if encoder_attn is not None:
            # 取第一个batch，第一个头
            attn_weights = encoder_attn[0, 0].cpu().numpy()
            
            plt.figure(figsize=(10, 8))
            sns.heatmap(attn_weights, cmap='YlOrRd', 
                       cbar_kws={'label': 'Attention权重'})
            plt.xlabel('Key位置')
            plt.ylabel('Query位置')
            plt.title('Encoder Self-Attention (第1层, 第1个头)\n每行显示一个位置对其他位置的attention')
            plt.tight_layout()
            plt.show()
            
            # 显示所有头的attention
            n_heads = encoder_attn.shape[1]
            fig, axes = plt.subplots(2, 4, figsize=(16, 8))
            axes = axes.flatten()
            
            for i in range(min(8, n_heads)):
                attn_head = encoder_attn[0, i].cpu().numpy()
                sns.heatmap(attn_head, ax=axes[i], cmap='YlOrRd', 
                           cbar=False, square=True)
                axes[i].set_title(f'Head {i+1}')
                axes[i].set_xlabel('')
                axes[i].set_ylabel('')
            
            plt.suptitle('Encoder Self-Attention - 所有8个头\n不同的头学习到不同的attention模式', 
                        fontsize=14)
            plt.tight_layout()
            plt.show()

# 使用复制任务的模型进行可视化
src = torch.LongTensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])
tgt = torch.LongTensor([[1, 2, 3, 4, 5, 6, 7, 8, 9]])
src_mask = torch.ones(1, 1, 10)
tgt_mask = subsequent_mask(9)

visualize_model_attention(copy_model, src, src_mask, tgt, tgt_mask)

## 7. 总结

### Transformer的关键要点：

1. **Self-Attention机制**
   - 允许每个位置关注序列中的所有位置
   - 通过Q、K、V三个矩阵计算attention
   - 使用scaling factor (1/√d_k) 稳定梯度

2. **Multi-Head Attention**
   - 将attention分成多个头
   - 每个头学习不同的表示子空间
   - 增强模型的表达能力

3. **位置编码**
   - 由于没有循环结构，需要显式添加位置信息
   - 使用sin/cos函数的组合
   - 允许模型学习相对位置关系

4. **残差连接和Layer Norm**
   - 帮助训练深层网络
   - 稳定训练过程

5. **Masked Attention**
   - Decoder中防止看到未来信息
   - 保持自回归特性

### 下一步学习建议：

1. 尝试在真实的翻译数据集上训练
2. 实验不同的超参数配置
3. 实现beam search解码
4. 尝试其他的位置编码方法
5. 探索Transformer的变体（如BERT、GPT等）

## 8. 练习和实验

### 练习1: 修改模型配置

In [None]:
# 尝试创建不同大小的模型
# 观察参数量的变化

configs = [
    {"name": "Tiny", "N": 2, "d_model": 128, "d_ff": 512, "h": 4},
    {"name": "Small", "N": 4, "d_model": 256, "d_ff": 1024, "h": 8},
    {"name": "Base", "N": 6, "d_model": 512, "d_ff": 2048, "h": 8},
    {"name": "Large", "N": 12, "d_model": 768, "d_ff": 3072, "h": 12},
]

param_counts = []
model_names = []

for config in configs:
    model = make_model(10000, 10000, 
                      N=config["N"], 
                      d_model=config["d_model"],
                      d_ff=config["d_ff"],
                      h=config["h"])
    
    params = sum(p.numel() for p in model.parameters())
    param_counts.append(params / 1e6)  # 转换为百万
    model_names.append(config["name"])
    
    print(f"{config['name']:10s}: {params:12,} parameters ({params/1e6:.2f}M)")

# 可视化
plt.figure(figsize=(10, 6))
plt.bar(model_names, param_counts, color=['skyblue', 'lightgreen', 'salmon', 'gold'])
plt.ylabel('参数量 (百万)')
plt.title('不同规模Transformer模型的参数量对比')
plt.grid(True, alpha=0.3, axis='y')

# 在柱子上标注数值
for i, (name, count) in enumerate(zip(model_names, param_counts)):
    plt.text(i, count, f'{count:.1f}M', ha='center', va='bottom')

plt.tight_layout()
plt.show()

### 练习2: 可视化不同层的表示

In [None]:
def visualize_layer_outputs(model, src, src_mask):
    """
    可视化encoder不同层的输出
    观察表示如何逐层变化
    """
    model.eval()
    
    # 获取embedding输出
    x = model.src_embed(src)
    
    layer_outputs = [x[0].detach().cpu().numpy()]  # 第一个batch
    
    # 逐层前向传播
    for i, layer in enumerate(model.encoder.layers):
        x = layer(x, src_mask)
        layer_outputs.append(x[0].detach().cpu().numpy())
    
    # 可视化
    n_layers = len(layer_outputs)
    fig, axes = plt.subplots(2, (n_layers + 1) // 2, figsize=(16, 6))
    axes = axes.flatten()
    
    for i, output in enumerate(layer_outputs):
        im = axes[i].imshow(output.T, aspect='auto', cmap='RdBu', 
                           interpolation='nearest')
        axes[i].set_title(f'Layer {i}输出' if i > 0 else 'Embedding')
        axes[i].set_xlabel('序列位置')
        axes[i].set_ylabel('特征维度')
        plt.colorbar(im, ax=axes[i])
    
    # 隐藏多余的子图
    for i in range(len(layer_outputs), len(axes)):
        axes[i].axis('off')
    
    plt.suptitle('Encoder各层输出的可视化\n颜色表示激活值的大小', fontsize=14)
    plt.tight_layout()
    plt.show()

# 使用示例
src = torch.LongTensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])
src_mask = torch.ones(1, 1, 10)
visualize_layer_outputs(copy_model, src, src_mask)

## 9. 保存和加载模型

In [None]:
# 保存模型
def save_model(model, path="transformer_model.pt"):
    """
    保存模型参数
    """
    torch.save({
        'model_state_dict': model.state_dict(),
    }, path)
    print(f"模型已保存到: {path}")

# 加载模型
def load_model(model, path="transformer_model.pt"):
    """
    加载模型参数
    """
    checkpoint = torch.load(path)
    model.load_state_dict(checkpoint['model_state_dict'])
    print(f"模型已从 {path} 加载")
    return model

# 保存当前训练的模型
save_model(copy_model, "copy_task_model.pt")

## 10. 参考资源

- [原始论文: Attention Is All You Need](https://arxiv.org/abs/1706.03762)
- [The Illustrated Transformer](http://jalammar.github.io/illustrated-transformer/)
- [PyTorch官方教程](https://pytorch.org/tutorials/beginner/transformer_tutorial.html)
- [Transformer相关论文集合](https://github.com/huggingface/transformers)

祝你学习愉快！