# Transformer 架构详解与交互式演示

本 Notebook 提供了 Transformer 架构的完整实现和交互式演示，帮助深入理解这一革命性的深度学习架构。

## 目录
1. [Transformer 架构概述](#1-transformer-架构概述)
2. [环境设置和依赖导入](#2-环境设置和依赖导入)
3. [位置编码详解](#3-位置编码详解)
4. [多头自注意力机制](#4-多头自注意力机制)
5. [编码器和解码器](#5-编码器和解码器)
6. [完整模型构建](#6-完整模型构建)
7. [训练演示](#7-训练演示)
8. [注意力可视化](#8-注意力可视化)
9. [模型推理](#9-模型推理)
10. [总结与扩展](#10-总结与扩展)

## 1. Transformer 架构概述

Transformer 是由 Vaswani 等人在 2017 年提出的一种基于自注意力机制的神经网络架构。它彻底改变了自然语言处理领域，成为了 BERT、GPT 等模型的基础。

### 核心创新点：
- **自注意力机制**：允许模型直接关注序列中的任意位置
- **并行计算**：相比 RNN，可以并行处理整个序列
- **位置编码**：通过数学方式编码位置信息
- **多头注意力**：从多个角度捕获不同类型的依赖关系

### 架构组成：
1. **编码器（Encoder）**：处理输入序列
2. **解码器（Decoder）**：生成输出序列
3. **注意力机制**：核心计算单元
4. **前馈网络**：非线性变换
5. **残差连接和层归一化**：稳定训练

## 2. 环境设置和依赖导入

In [None]:
# 导入必要的库
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import math
import copy
from typing import Optional

# 设置中文字体以解决可视化中文显示问题
import matplotlib
matplotlib.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans', 'Arial Unicode MS']
matplotlib.rcParams['axes.unicode_minus'] = False

# 设置随机种子以确保结果可重现
torch.manual_seed(42)
np.random.seed(42)

# 检查 CUDA 是否可用
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'使用设备: {device}')

# 设置图表样式
plt.style.use('default')
sns.set_palette('husl')

## 3. 位置编码详解

由于 Transformer 没有循环结构，需要通过位置编码来为模型提供序列中的位置信息。

### 位置编码公式：
- PE(pos, 2i) = sin(pos / 10000^(2i/d_model))
- PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))

其中：
- pos：位置索引
- i：维度索引
- d_model：模型维度

In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, max_len: int = 5000, dropout: float = 0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        
        # 创建位置编码矩阵
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        
        # 计算除数项
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * 
                             (-math.log(10000.0) / d_model))
        
        # 应用正弦和余弦函数
        pe[:, 0::2] = torch.sin(position * div_term)  # 偶数位置使用 sin
        pe[:, 1::2] = torch.cos(position * div_term)  # 奇数位置使用 cos
        
        # 添加批次维度并注册为缓冲区
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

# 可视化位置编码
def visualize_positional_encoding(d_model=512, max_len=100):
    pe = PositionalEncoding(d_model, max_len)
    
    # 获取位置编码矩阵
    pos_encoding = pe.pe[:max_len, 0, :].numpy()
    
    plt.figure(figsize=(15, 8))
    
    # 绘制位置编码热力图
    plt.subplot(2, 2, 1)
    plt.imshow(pos_encoding.T, cmap='RdYlBu', aspect='auto')
    plt.title('位置编码热力图')
    plt.xlabel('位置')
    plt.ylabel('维度')
    plt.colorbar()
    
    # 绘制前几个维度的位置编码曲线
    plt.subplot(2, 2, 2)
    for i in range(0, min(8, d_model), 2):
        plt.plot(pos_encoding[:, i], label=f'维度 {i}')
        plt.plot(pos_encoding[:, i+1], label=f'维度 {i+1}', linestyle='--')
    plt.title('位置编码曲线（前8个维度）')
    plt.xlabel('位置')
    plt.ylabel('编码值')
    plt.legend()
    plt.grid(True)
    
    # 绘制不同频率的正弦波
    plt.subplot(2, 2, 3)
    positions = np.arange(max_len)
    for i in [0, 2, 4, 8, 16]:
        freq = 1 / (10000 ** (i / d_model))
        plt.plot(positions, np.sin(positions * freq), label=f'频率 {freq:.4f}')
    plt.title('不同频率的正弦波')
    plt.xlabel('位置')
    plt.ylabel('值')
    plt.legend()
    plt.grid(True)
    
    # 绘制位置编码的相似性矩阵
    plt.subplot(2, 2, 4)
    similarity = np.dot(pos_encoding, pos_encoding.T)
    plt.imshow(similarity, cmap='viridis', aspect='auto')
    plt.title('位置编码相似性矩阵')
    plt.xlabel('位置')
    plt.ylabel('位置')
    plt.colorbar()
    
    plt.tight_layout()
    plt.show()

# 运行位置编码可视化
visualize_positional_encoding()

## 4. 多头自注意力机制

自注意力机制是 Transformer 的核心，它允许模型在处理序列中的每个位置时，关注序列中的所有位置。

### 注意力计算公式：
Attention(Q, K, V) = softmax(QK^T / √d_k)V

其中：
- Q：查询矩阵（Query）
- K：键矩阵（Key）
- V：值矩阵（Value）
- d_k：键的维度

### 多头注意力的优势：
- 允许模型同时关注不同类型的信息
- 增加模型的表达能力
- 提供更丰富的特征表示

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1):
        super().__init__()
        assert d_model % num_heads == 0
        
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # 每个头的维度
        
        # 线性投影层
        self.w_q = nn.Linear(d_model, d_model)  # Query 投影
        self.w_k = nn.Linear(d_model, d_model)  # Key 投影
        self.w_v = nn.Linear(d_model, d_model)  # Value 投影
        self.w_o = nn.Linear(d_model, d_model)  # 输出投影
        
        self.dropout = nn.Dropout(dropout)
        
    def scaled_dot_product_attention(self, query, key, value, mask=None):
        d_k = query.size(-1)
        
        # 计算注意力分数
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
        
        # 应用掩码
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        
        # 计算注意力权重
        attention_weights = F.softmax(scores, dim=-1)
        attention_weights = self.dropout(attention_weights)
        
        # 应用注意力权重到值
        output = torch.matmul(attention_weights, value)
        
        return output, attention_weights
    
    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        
        # 1. 线性投影并重塑为多头格式
        Q = self.w_q(query).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = self.w_k(key).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = self.w_v(value).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        
        # 2. 计算注意力
        attention_output, attention_weights = self.scaled_dot_product_attention(Q, K, V, mask)
        
        # 3. 连接多头输出
        attention_output = attention_output.transpose(1, 2).contiguous().view(
            batch_size, -1, self.d_model)
        
        # 4. 最终线性投影
        output = self.w_o(attention_output)
        
        return output, attention_weights

# 演示注意力机制
def demonstrate_attention():
    d_model = 512
    num_heads = 8
    seq_len = 10
    batch_size = 1
    
    # 创建多头注意力模块
    attention = MultiHeadAttention(d_model, num_heads)
    
    # 创建随机输入
    x = torch.randn(batch_size, seq_len, d_model)
    
    # 计算自注意力
    output, weights = attention(x, x, x)
    
    print(f'输入形状: {x.shape}')
    print(f'输出形状: {output.shape}')
    print(f'注意力权重形状: {weights.shape}')
    
    # 可视化注意力权重
    plt.figure(figsize=(12, 8))
    
    # 显示前4个头的注意力权重
    for i in range(4):
        plt.subplot(2, 2, i+1)
        attn_map = weights[0, i].detach().numpy()
        plt.imshow(attn_map, cmap='Blues', aspect='auto')
        plt.title(f'注意力头 {i+1}')
        plt.xlabel('键位置')
        plt.ylabel('查询位置')
        plt.colorbar()
    
    plt.tight_layout()
    plt.show()
    
    return output, weights

# 运行注意力演示
output, weights = demonstrate_attention()

## 5. 导入完整的 Transformer 模型

现在我们导入之前创建的完整 Transformer 模型，并进行演示。

In [None]:
# 导入完整的 Transformer 模型
from transformer_model import Transformer

# 创建模型实例
def create_transformer_model():
    model = Transformer(
        src_vocab_size=1000,
        tgt_vocab_size=1000,
        d_model=512,
        num_heads=8,
        num_layers=6,
        d_ff=2048,
        max_len=100,
        dropout=0.1
    )
    
    # 计算模型参数数量
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    
    print(f'模型总参数数: {total_params:,}')
    print(f'可训练参数数: {trainable_params:,}')
    
    return model

# 创建模型
model = create_transformer_model()

## 6. 模型推理演示

演示如何使用 Transformer 模型进行推理，并可视化注意力权重。

In [None]:
def demonstrate_model_inference():
    # 创建示例输入
    batch_size = 2
    src_len = 10
    tgt_len = 8
    
    # 随机生成源序列和目标序列
    src = torch.randint(1, 100, (batch_size, src_len))
    tgt = torch.randint(1, 100, (batch_size, tgt_len))
    
    print(f'源序列形状: {src.shape}')
    print(f'目标序列形状: {tgt.shape}')
    
    # 模型推理
    model.eval()
    with torch.no_grad():
        output, enc_attn, dec_attn = model(src, tgt)
    
    print(f'输出形状: {output.shape}')
    print(f'编码器注意力层数: {len(enc_attn)}')
    print(f'解码器注意力层数: {len(dec_attn)}')
    
    return output, enc_attn, dec_attn, src, tgt

# 运行推理演示
output, enc_attn, dec_attn, src, tgt = demonstrate_model_inference()

## 7. 注意力权重可视化

可视化编码器和解码器的注意力权重，帮助理解模型的注意力模式。

In [None]:
def visualize_attention_weights(enc_attn, dec_attn, layer_idx=0):
    # 可视化编码器注意力
    plt.figure(figsize=(15, 10))
    
    # 编码器注意力 - 显示前4个头
    plt.suptitle(f'第 {layer_idx} 层注意力权重可视化', fontsize=16)
    
    for i in range(4):
        plt.subplot(2, 4, i+1)
        attn_map = enc_attn[layer_idx][0, i].detach().numpy()
        plt.imshow(attn_map, cmap='Blues', aspect='auto')
        plt.title(f'编码器头 {i+1}')
        plt.xlabel('键位置')
        plt.ylabel('查询位置')
        plt.colorbar()
    
    # 解码器自注意力 - 显示前4个头
    for i in range(4):
        plt.subplot(2, 4, i+5)
        attn_map = dec_attn[layer_idx][0][0, i].detach().numpy()
        plt.imshow(attn_map, cmap='Reds', aspect='auto')
        plt.title(f'解码器头 {i+1}')
        plt.xlabel('键位置')
        plt.ylabel('查询位置')
        plt.colorbar()
    
    plt.tight_layout()
    plt.show()

# 可视化注意力权重
visualize_attention_weights(enc_attn, dec_attn, layer_idx=0)

## 8. 训练演示

演示如何训练 Transformer 模型，包括损失函数计算和优化器设置。

In [None]:
def demonstrate_training():
    # 设置训练参数
    model.train()
    criterion = nn.CrossEntropyLoss(ignore_index=0)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
    
    # 模拟训练数据
    batch_size = 4
    src_len = 12
    tgt_len = 10
    vocab_size = 1000
    
    losses = []
    
    print('开始训练演示...')
    
    # 训练几个步骤
    for step in range(10):
        # 生成随机数据
        src = torch.randint(1, vocab_size, (batch_size, src_len))
        tgt_input = torch.randint(1, vocab_size, (batch_size, tgt_len))
        tgt_output = torch.randint(1, vocab_size, (batch_size, tgt_len))
        
        # 前向传播
        optimizer.zero_grad()
        output, _, _ = model(src, tgt_input)
        
        # 计算损失
        loss = criterion(output.reshape(-1, vocab_size), tgt_output.reshape(-1))
        
        # 反向传播
        loss.backward()
        optimizer.step()
        
        losses.append(loss.item())
        
        if step % 2 == 0:
            print(f'步骤 {step}: 损失 = {loss.item():.4f}')
    
    # 绘制损失曲线
    plt.figure(figsize=(10, 6))
    plt.plot(losses, 'b-', linewidth=2)
    plt.title('训练损失曲线')
    plt.xlabel('训练步骤')
    plt.ylabel('损失值')
    plt.grid(True)
    plt.show()
    
    print(f'训练完成！最终损失: {losses[-1]:.4f}')
    
    return losses

# 运行训练演示
losses = demonstrate_training()

## 9. 模型分析和可视化

分析模型的各种特性，包括参数分布、梯度流等。

In [None]:
def analyze_model():
    # 分析模型参数分布
    plt.figure(figsize=(15, 10))
    
    # 收集所有参数
    all_params = []
    layer_names = []
    
    for name, param in model.named_parameters():
        if param.requires_grad:
            all_params.extend(param.data.flatten().numpy())
            layer_names.append(name)
    
    # 参数分布直方图
    plt.subplot(2, 3, 1)
    plt.hist(all_params, bins=50, alpha=0.7)
    plt.title('模型参数分布')
    plt.xlabel('参数值')
    plt.ylabel('频次')
    
    # 不同层的参数统计
    layer_stats = []
    layer_labels = []
    
    for name, param in model.named_parameters():
        if param.requires_grad and 'weight' in name:
            layer_stats.append([
                param.data.mean().item(),
                param.data.std().item(),
                param.data.min().item(),
                param.data.max().item()
            ])
            layer_labels.append(name.split('.')[0])
    
    # 参数统计可视化
    if layer_stats:
        layer_stats = np.array(layer_stats)
        
        plt.subplot(2, 3, 2)
        plt.plot(layer_stats[:, 0], 'o-', label='均值')
        plt.plot(layer_stats[:, 1], 's-', label='标准差')
        plt.title('各层参数统计')
        plt.xlabel('层索引')
        plt.ylabel('值')
        plt.legend()
        plt.grid(True)
    
    # 模型结构信息
    plt.subplot(2, 3, 3)
    module_counts = {}
    for name, module in model.named_modules():
        module_type = type(module).__name__
        module_counts[module_type] = module_counts.get(module_type, 0) + 1
    
    # 过滤掉容器模块
    filtered_counts = {k: v for k, v in module_counts.items() 
                      if k not in ['Transformer', 'ModuleList', 'Sequential']}
    
    if filtered_counts:
        plt.bar(range(len(filtered_counts)), list(filtered_counts.values()))
        plt.xticks(range(len(filtered_counts)), list(filtered_counts.keys()), rotation=45)
        plt.title('模型组件统计')
        plt.ylabel('数量')
    
    # 注意力头分析
    plt.subplot(2, 3, 4)
    # 创建示例数据进行注意力分析
    with torch.no_grad():
        src = torch.randint(1, 100, (1, 10))
        tgt = torch.randint(1, 100, (1, 8))
        _, enc_attn, _ = model(src, tgt)
        
        # 计算每个头的注意力熵
        entropies = []
        for layer_attn in enc_attn:
            layer_entropy = []
            for head in range(layer_attn.size(1)):
                attn_weights = layer_attn[0, head]
                entropy = -(attn_weights * torch.log(attn_weights + 1e-9)).sum(dim=-1).mean()
                layer_entropy.append(entropy.item())
            entropies.append(layer_entropy)
        
        entropies = np.array(entropies)
        plt.imshow(entropies, cmap='viridis', aspect='auto')
        plt.title('注意力熵分布')
        plt.xlabel('注意力头')
        plt.ylabel('层')
        plt.colorbar()
    
    # 模型复杂度分析
    plt.subplot(2, 3, 5)
    param_sizes = []
    param_names = []
    
    for name, param in model.named_parameters():
        if param.requires_grad:
            param_sizes.append(param.numel())
            param_names.append(name.split('.')[0])
    
    # 按参数数量排序
    sorted_indices = np.argsort(param_sizes)[-10:]  # 显示前10个最大的
    top_sizes = [param_sizes[i] for i in sorted_indices]
    top_names = [param_names[i] for i in sorted_indices]
    
    plt.barh(range(len(top_sizes)), top_sizes)
    plt.yticks(range(len(top_names)), top_names)
    plt.title('参数数量分布（前10）')
    plt.xlabel('参数数量')
    
    # 内存使用分析
    plt.subplot(2, 3, 6)
    memory_usage = []
    layer_types = []
    
    for name, module in model.named_modules():
        if hasattr(module, 'weight') and module.weight is not None:
            memory = module.weight.numel() * 4  # 假设 float32
            memory_usage.append(memory / 1024 / 1024)  # 转换为 MB
            layer_types.append(type(module).__name__)
    
    if memory_usage:
        plt.pie(memory_usage[:5], labels=layer_types[:5], autopct='%1.1f%%')
        plt.title('内存使用分布 (MB)')
    
    plt.tight_layout()
    plt.show()
    
    # 打印模型摘要
    print('\n=== 模型分析摘要 ===')
    print(f'总参数数: {sum(p.numel() for p in model.parameters()):,}')
    print(f'可训练参数数: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}')
    print(f'模型大小: {sum(p.numel() * 4 for p in model.parameters()) / 1024 / 1024:.2f} MB')
    print(f'编码器层数: {len(model.encoder.layers)}')
    print(f'解码器层数: {len(model.decoder.layers)}')
    print(f'注意力头数: {model.encoder.layers[0].self_attention.num_heads}')
    print(f'模型维度: {model.encoder.layers[0].self_attention.d_model}')

# 运行模型分析
analyze_model()

## 10. 总结与扩展

### 本教程涵盖的内容：

1. **Transformer 架构基础**：理解了自注意力机制、位置编码等核心概念
2. **模型实现**：从零开始实现了完整的 Transformer 模型
3. **可视化分析**：通过多种图表理解模型的工作原理
4. **训练演示**：展示了模型的训练过程
5. **模型分析**：深入分析了模型的各种特性

### 进一步学习方向：

1. **预训练模型**：学习 BERT、GPT 等预训练模型
2. **优化技术**：学习学习率调度、梯度裁剪等优化技术
3. **应用领域**：探索机器翻译、文本生成、问答系统等应用
4. **模型变体**：了解 Vision Transformer、DETR 等变体
5. **效率优化**：学习模型压缩、量化、蒸馏等技术

### 实践建议：

- 尝试修改模型参数，观察对性能的影响
- 在真实数据集上训练模型
- 实现更复杂的注意力机制
- 探索不同的位置编码方法
- 研究注意力权重的可解释性

In [None]:
# 最后的总结性演示
print('🎉 Transformer 架构学习完成！')
print('\n主要收获：')
print('1. 理解了自注意力机制的工作原理')
print('2. 掌握了位置编码的数学原理')
print('3. 实现了完整的 Transformer 模型')
print('4. 学会了可视化注意力权重')
print('5. 了解了模型训练和分析方法')
print('\n继续探索 Transformer 的无限可能！🚀')