# 5. Transformer基本块 (Transformer Block)

在前面的教程中，我们学习了Transformer的核心组件：注意力机制、多头注意力和位置编码。现在是时候将这些组件组合成完整的Transformer块了。

## 5.1 Transformer块的整体架构

一个标准的Transformer块包含以下组件：

1. **多头自注意力层** (Multi-Head Self-Attention)
2. **残差连接和层归一化** (Residual Connection & Layer Normalization)
3. **前馈神经网络** (Feed-Forward Network)
4. **再次残差连接和层归一化**

### 数学表示：
$$\text{output}_1 = \text{LayerNorm}(x + \text{MultiHeadAttention}(x))$$
$$\text{output}_2 = \text{LayerNorm}(\text{output}_1 + \text{FFN}(\text{output}_1))$$

![Transformer块架构图](images/transformer_block.png)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import math
from typing import Optional

# Seed both RNGs so that weight initialisation, random inputs and plots are
# reproducible from run to run.
torch.manual_seed(42)
np.random.seed(42)

# Apply the seaborn style BEFORE the font settings: sns.set_style() rewrites
# rcParams['font.sans-serif'] with seaborn's own defaults, which would
# otherwise discard the CJK-capable font needed by the Chinese plot labels.
sns.set_style("whitegrid")
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
# Draw negative tick labels with the ASCII hyphen instead of U+2212, a glyph
# that many CJK fonts do not provide.
plt.rcParams['axes.unicode_minus'] = False

print(f"PyTorch版本: {torch.__version__}")
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")

## 5.2 前馈神经网络 (Feed-Forward Network)

前馈网络是Transformer块中的重要组件，通常结构为：
- 线性层 → ReLU激活 → 线性层
- 中间层的维度通常是输入维度的4倍

$$\text{FFN}(x) = \text{Linear}_2(\text{ReLU}(\text{Linear}_1(x)))$$

In [None]:
class FeedForwardNetwork(nn.Module):
    """Position-wise feed-forward sub-layer: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: Size of the last dimension of both the input and the output.
        d_ff: Width of the hidden layer between the two linear projections.
        dropout: Dropout probability applied to the hidden activations.
    """

    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
        super().__init__()

        # Expand to d_ff, then project back down to d_model.
        self.linear1 = nn.Linear(in_features=d_model, out_features=d_ff)
        self.linear2 = nn.Linear(in_features=d_ff, out_features=d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the feed-forward transformation to every position.

        Args:
            x: Tensor of shape [batch_size, seq_len, d_model].

        Returns:
            Tensor of shape [batch_size, seq_len, d_model].
        """
        # Hidden activations have shape [batch_size, seq_len, d_ff].
        hidden = self.dropout(F.relu(self.linear1(x)))
        return self.linear2(hidden)

# Smoke-test the feed-forward network on a random batch.
d_model, d_ff = 512, 2048  # d_ff is conventionally 4 * d_model
seq_len, batch_size = 10, 2

# Build the module first and draw the input second, so the random stream is
# consumed in the same order as before.
ffn = FeedForwardNetwork(d_model=d_model, d_ff=d_ff)
test_input = torch.randn((batch_size, seq_len, d_model))
ffn_output = ffn(test_input)

print(f"前馈网络输入形状: {test_input.shape}")
print(f"前馈网络输出形状: {ffn_output.shape}")
print(f"前馈网络参数量: {sum(param.numel() for param in ffn.parameters()):,}")

## 5.3 层归一化 (Layer Normalization)

层归一化是Transformer中的关键组件，它有助于稳定训练：

$$\text{LayerNorm}(x) = \frac{x - \mu}{\sqrt{\sigma^2 + \epsilon}} \cdot \gamma + \beta$$

其中 $\mu$ 和 $\sigma^2$ 是在最后一个维度（特征维度）上计算的均值和方差，$\epsilon$ 是防止除零的小常数，$\gamma$ 和 $\beta$ 是可学习的缩放与平移参数。

In [None]:
# Multi-head attention carried over from the earlier tutorial; it is re-defined
# here (not imported) so that this notebook runs on its own.
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        d_model: Model width; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1):
        super(MultiHeadAttention, self).__init__()

        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # width of a single head

        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor,
                mask: Optional[torch.Tensor] = None):
        """Attend from ``query`` positions to ``key``/``value`` positions.

        Args:
            query: [batch_size, q_len, d_model]
            key: [batch_size, kv_len, d_model]
            value: [batch_size, kv_len, d_model]
            mask: Optional tensor; positions where ``mask == 0`` are hidden.
                A 3-D mask is read as [batch_size, q_len, kv_len] and shared
                by all heads. Any other mask must be broadcastable to
                [batch_size, num_heads, q_len, kv_len].

        Returns:
            output: [batch_size, q_len, d_model]
            attention_weights: [batch_size, num_heads, q_len, kv_len]
        """
        batch_size, q_len, _ = query.size()

        def split_heads(projected: torch.Tensor) -> torch.Tensor:
            # [batch, len, d_model] -> [batch, heads, len, d_k]. The length is
            # inferred (-1) so key/value may be longer or shorter than query.
            return projected.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        Q = split_heads(self.W_q(query))
        K = split_heads(self.W_k(key))
        V = split_heads(self.W_v(value))

        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            if mask.dim() == 3:
                # Insert the head axis; without it a [batch, q, kv] mask would
                # be aligned against the head dimension of the scores.
                mask = mask.unsqueeze(1)
            scores = scores.masked_fill(mask == 0, -1e9)

        attention_weights = F.softmax(scores, dim=-1)
        attention_weights = self.dropout(attention_weights)

        output = torch.matmul(attention_weights, V)
        # Merge the heads back: [batch, heads, q_len, d_k] -> [batch, q_len, d_model].
        output = output.transpose(1, 2).contiguous().view(batch_size, q_len, self.d_model)
        output = self.W_o(output)

        return output, attention_weights

class TransformerBlock(nn.Module):
    """Post-LN Transformer block.

    Each of the two sub-layers (self-attention, feed-forward) is wrapped as
    ``LayerNorm(x + Dropout(Sublayer(x)))``.

    Args:
        d_model: Model width.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward network.
        dropout: Dropout probability used by the sub-layers and on both
            residual branches.
    """

    def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float = 0.1):
        super().__init__()

        self.d_model, self.num_heads = d_model, num_heads

        # Sub-layers, created in the same order as before so parameter
        # registration and random initialisation are unchanged.
        self.self_attention = MultiHeadAttention(d_model, num_heads, dropout)
        self.feed_forward = FeedForwardNetwork(d_model, d_ff, dropout)

        # One LayerNorm per sub-layer; a single Dropout module serves both
        # residual branches.
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None):
        """Run the block on ``x``.

        Args:
            x: [batch_size, seq_len, d_model]
            mask: Optional tensor forwarded unchanged to the self-attention
                layer, which hides positions where ``mask == 0``.
                NOTE(review): it is applied to scores of shape
                [batch_size, num_heads, seq_len, seq_len], so it has to be
                broadcastable to that shape.

        Returns:
            output: [batch_size, seq_len, d_model]
            attention_weights: [batch_size, num_heads, seq_len, seq_len]
        """
        # Sub-layer 1: self-attention, residual add, then LayerNorm.
        attn_out, attention_weights = self.self_attention(query=x, key=x, value=x, mask=mask)
        hidden = self.norm1(x + self.dropout(attn_out))

        # Sub-layer 2: feed-forward, residual add, then LayerNorm.
        ffn_branch = self.dropout(self.feed_forward(hidden))
        return self.norm2(hidden + ffn_branch), attention_weights

# Smoke-test a single Transformer block on random input.
d_model, num_heads, d_ff = 256, 8, 1024
seq_len, batch_size = 12, 2

transformer_block = TransformerBlock(d_model=d_model, num_heads=num_heads, d_ff=d_ff)
test_input = torch.randn((batch_size, seq_len, d_model))

# No .eval() call has been made, so the module is in its default training
# mode and dropout is active for this forward pass.
output, attention_weights = transformer_block(test_input)

print(f"Transformer块输入形状: {test_input.shape}")
print(f"Transformer块输出形状: {output.shape}")
print(f"注意力权重形状: {attention_weights.shape}")
print(f"Transformer块参数量: {sum(param.numel() for param in transformer_block.parameters()):,}")

## 5.4 可视化Transformer块的信息流

让我们可视化数据在Transformer块中的流动过程：

In [None]:
def visualize_transformer_block_flow(transformer_block, input_tensor):
    """Plot how activations evolve through one Post-LN Transformer block.

    The block's sub-layers are re-run step by step (no gradients, dropout
    disabled) so every intermediate tensor can be inspected. The function
    draws a 2x2 figure (mean per stage, std per stage, attention heat map of
    the first head, input-vs-output slice) and prints a statistics table.

    Args:
        transformer_block: Module exposing ``self_attention``,
            ``feed_forward``, ``norm1``, ``norm2`` and ``dropout``
            (e.g. ``TransformerBlock``). Its train/eval mode is restored
            before the function returns.
        input_tensor: Tensor of shape [batch_size, seq_len, d_model].

    Returns:
        stats: dict mapping each stage name to its mean/std/min/max.
        attention_weights: attention weights returned by the attention layer.
    """
    def to_numpy(tensor):
        # Detach and copy to host memory so plotting also works for CUDA
        # tensors and for inputs that require grad.
        return tensor.detach().cpu().numpy()

    # Dropout must be off for a deterministic trace. Remember the caller's
    # mode so this diagnostic helper does not silently leave the model in
    # eval mode.
    was_training = transformer_block.training
    transformer_block.eval()
    try:
        with torch.no_grad():
            x = input_tensor

            # Sub-layer 1: self-attention + residual + LayerNorm.
            attn_output, attention_weights = transformer_block.self_attention(x, x, x)
            x_after_attn = x + transformer_block.dropout(attn_output)
            x1 = transformer_block.norm1(x_after_attn)

            # Sub-layer 2: feed-forward + residual + LayerNorm.
            ff_output = transformer_block.feed_forward(x1)
            x_after_ff = x1 + transformer_block.dropout(ff_output)
            x2 = transformer_block.norm2(x_after_ff)
    finally:
        transformer_block.train(was_training)

    # Intermediate tensors in execution order (dicts preserve insertion order).
    stages = {
        '输入': x,
        '注意力输出': attn_output,
        '注意力+残差': x_after_attn,
        '第一层归一化': x1,
        '前馈网络输出': ff_output,
        '前馈+残差': x_after_ff,
        '最终输出': x2
    }
    stage_names = list(stages)

    stats = {
        name: {
            'mean': tensor.mean().item(),
            'std': tensor.std().item(),
            'min': tensor.min().item(),
            'max': tensor.max().item()
        }
        for name, tensor in stages.items()
    }

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # 1. Mean per stage.
    means = [stats[name]['mean'] for name in stage_names]
    axes[0, 0].plot(means, 'o-', linewidth=2, markersize=6)
    axes[0, 0].set_xticks(range(len(stage_names)))
    axes[0, 0].set_xticklabels(stage_names, rotation=45)
    axes[0, 0].set_title('各阶段输出的均值变化')
    axes[0, 0].set_ylabel('均值')
    axes[0, 0].grid(True, alpha=0.3)

    # 2. Standard deviation per stage.
    stds = [stats[name]['std'] for name in stage_names]
    axes[0, 1].plot(stds, 's-', linewidth=2, markersize=6, color='orange')
    axes[0, 1].set_xticks(range(len(stage_names)))
    axes[0, 1].set_xticklabels(stage_names, rotation=45)
    axes[0, 1].set_title('各阶段输出的标准差变化')
    axes[0, 1].set_ylabel('标准差')
    axes[0, 1].grid(True, alpha=0.3)

    # 3. Attention heat map: first sample, first head.
    attn_weights_first_head = to_numpy(attention_weights[0, 0])
    im = axes[1, 0].imshow(attn_weights_first_head, cmap='Blues')
    axes[1, 0].set_title('注意力权重（第1个头）')
    axes[1, 0].set_xlabel('Key位置')
    axes[1, 0].set_ylabel('Query位置')
    plt.colorbar(im, ax=axes[1, 0])

    # 4. Input vs output for the first sample / first position. Show at most
    #    20 feature dimensions so narrower models plot without a length
    #    mismatch between the x axis and the data.
    num_dims = min(20, x.size(-1))
    input_sample = to_numpy(x[0, 0, :num_dims])
    output_sample = to_numpy(x2[0, 0, :num_dims])

    x_dims = range(num_dims)
    axes[1, 1].plot(x_dims, input_sample, 'b-', label='输入', alpha=0.7)
    axes[1, 1].plot(x_dims, output_sample, 'r-', label='输出', alpha=0.7)
    axes[1, 1].set_title(f'输入vs输出对比（前{num_dims}维）')
    axes[1, 1].set_xlabel('维度')
    axes[1, 1].set_ylabel('数值')
    axes[1, 1].legend()
    axes[1, 1].grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Statistics table.
    print("各阶段统计信息:")
    print("-" * 60)
    for name, stat in stats.items():
        print(f"{name:15s}: 均值={stat['mean']:8.4f}, 标准差={stat['std']:6.4f}, "
              f"范围=[{stat['min']:6.3f}, {stat['max']:6.3f}]")

    return stats, attention_weights

# Run the visualisation on the `transformer_block` and `test_input` defined above.
stats, attn_weights = visualize_transformer_block_flow(transformer_block, test_input)

## 5.5 Pre-LN vs Post-LN 对比

现代Transformer有两种主要的层归一化位置：
- **Post-LN**：原始Transformer论文的设计
- **Pre-LN**：GPT-2及后续模型常用的设计

In [None]:
class PreLNTransformerBlock(nn.Module):
    """Pre-LN Transformer block.

    LayerNorm is applied to the input of each sub-layer, while the residual
    path carries the un-normalised tensor:
    ``x + Dropout(Sublayer(LayerNorm(x)))``.

    Args:
        d_model: Model width.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward network.
        dropout: Dropout probability used by the sub-layers and on both
            residual branches.
    """

    def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float = 0.1):
        super().__init__()

        # Same sub-module names and creation order as the Post-LN block.
        self.self_attention = MultiHeadAttention(d_model, num_heads, dropout)
        self.feed_forward = FeedForwardNetwork(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(normalized_shape=d_model)
        self.norm2 = nn.LayerNorm(normalized_shape=d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None):
        """Run the block on ``x`` and return ``(output, attention_weights)``.

        ``mask`` is forwarded unchanged to the self-attention layer.
        """
        # Sub-layer 1: normalise, attend, then add back onto the raw input.
        normed = self.norm1(x)
        attn_out, attention_weights = self.self_attention(normed, normed, normed, mask)
        after_attn = x + self.dropout(attn_out)

        # Sub-layer 2: normalise, feed-forward, then add back.
        ffn_branch = self.dropout(self.feed_forward(self.norm2(after_attn)))
        return after_attn + ffn_branch, attention_weights

def compare_ln_positions():
    """Compare a Post-LN and a Pre-LN block on the same random input.

    Both blocks are given identical weights, so any difference in the output
    comes from where LayerNorm is applied rather than from different random
    initialisations. The function draws a 2x2 figure (output histograms, the
    first-head attention map of each block, bar chart of summary statistics)
    and prints a comparison table. It returns nothing.
    """
    d_model = 128
    num_heads = 4
    d_ff = 512
    seq_len = 8
    batch_size = 1

    def summarize(tensor):
        # Summary statistics of one output tensor as plain Python floats.
        return {
            'mean': tensor.mean().item(),
            'std': tensor.std().item(),
            'max': tensor.max().item(),
            'min': tensor.min().item()
        }

    post_ln_block = TransformerBlock(d_model, num_heads, d_ff)
    pre_ln_block = PreLNTransformerBlock(d_model, num_heads, d_ff)
    # Both classes register the same sub-modules under the same names, so the
    # state dict transfers directly. Without this copy each block would keep
    # its own random initialisation and the two would not share parameters.
    pre_ln_block.load_state_dict(post_ln_block.state_dict())

    test_input = torch.randn(batch_size, seq_len, d_model)

    # Evaluation mode disables dropout so the comparison is deterministic.
    post_ln_block.eval()
    pre_ln_block.eval()

    with torch.no_grad():
        post_ln_output, post_ln_attn = post_ln_block(test_input)
        pre_ln_output, pre_ln_attn = pre_ln_block(test_input)

    post_ln_stats = summarize(post_ln_output)
    pre_ln_stats = summarize(pre_ln_output)

    fig, axes = plt.subplots(2, 2, figsize=(14, 10))

    # Output value distributions.
    axes[0, 0].hist(post_ln_output.flatten().numpy(), bins=30, alpha=0.7,
                    label='Post-LN', color='blue')
    axes[0, 0].hist(pre_ln_output.flatten().numpy(), bins=30, alpha=0.7,
                    label='Pre-LN', color='red')
    axes[0, 0].set_title('输出值分布对比')
    axes[0, 0].set_xlabel('数值')
    axes[0, 0].set_ylabel('频次')
    axes[0, 0].legend()

    # Attention weights of the first head of each block.
    im1 = axes[0, 1].imshow(post_ln_attn[0, 0].numpy(), cmap='Blues')
    axes[0, 1].set_title('Post-LN 注意力权重')
    plt.colorbar(im1, ax=axes[0, 1])

    im2 = axes[1, 0].imshow(pre_ln_attn[0, 0].numpy(), cmap='Reds')
    axes[1, 0].set_title('Pre-LN 注意力权重')
    plt.colorbar(im2, ax=axes[1, 0])

    # Side-by-side bars for the summary statistics.
    metrics = ['mean', 'std', 'max', 'min']
    post_values = [post_ln_stats[m] for m in metrics]
    pre_values = [pre_ln_stats[m] for m in metrics]

    x_pos = np.arange(len(metrics))
    width = 0.35

    axes[1, 1].bar(x_pos - width/2, post_values, width, label='Post-LN', alpha=0.8)
    axes[1, 1].bar(x_pos + width/2, pre_values, width, label='Pre-LN', alpha=0.8)
    axes[1, 1].set_xticks(x_pos)
    axes[1, 1].set_xticklabels(metrics)
    axes[1, 1].set_title('统计指标对比')
    axes[1, 1].legend()

    plt.tight_layout()
    plt.show()

    print("Post-LN vs Pre-LN 对比:")
    print("-" * 40)
    print(f"{'指标':<10} {'Post-LN':<12} {'Pre-LN':<12} {'差异':<10}")
    print("-" * 40)
    for metric in metrics:
        post_val = post_ln_stats[metric]
        pre_val = pre_ln_stats[metric]
        diff = abs(post_val - pre_val)
        print(f"{metric:<10} {post_val:<12.4f} {pre_val:<12.4f} {diff:<10.4f}")

# Run the Post-LN vs Pre-LN comparison (shows the figure and prints the table).
compare_ln_positions()

## 5.6 完整的编码器实现

现在让我们将多个Transformer块堆叠起来，创建完整的编码器：

In [None]:
class SinusoidalPositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to the input embeddings.

    Even feature columns hold ``sin(pos * freq)`` and odd columns hold
    ``cos(pos * freq)``. The table is precomputed once and stored as a
    non-trainable buffer named ``pe`` with shape [1, max_seq_len, d_model].

    Args:
        d_model: Embedding width (may be even or odd).
        max_seq_len: Number of positions to precompute.
    """

    def __init__(self, d_model: int, max_seq_len: int = 5000):
        super(SinusoidalPositionalEncoding, self).__init__()

        pe = torch.zeros(max_seq_len, d_model)
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() *
                             (-math.log(10000.0) / d_model))
        angles = position * div_term  # [max_seq_len, ceil(d_model / 2)]

        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even columns,
        # so the surplus frequency is dropped for the cosine half.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Add the encoding of the first ``seq_len`` positions to ``x``.

        Args:
            x: Tensor of shape [batch_size, seq_len, d_model].

        Returns:
            Tensor of the same shape as ``x``.
        """
        seq_len = x.size(1)
        return x + self.pe[:, :seq_len, :]

class TransformerEncoder(nn.Module):
    """Stack of Transformer blocks on top of token and positional embeddings.

    Args:
        vocab_size: Number of rows in the token embedding table.
        d_model: Model width.
        num_heads: Number of attention heads per block.
        d_ff: Hidden width of each block's feed-forward network.
        num_layers: Number of stacked blocks.
        max_seq_len: Number of positions the positional encoding precomputes.
        dropout: Dropout probability for the embeddings and for every block.
        use_pre_ln: If True, stack ``PreLNTransformerBlock`` and add a final
            LayerNorm; otherwise stack Post-LN ``TransformerBlock``.
    """

    def __init__(self, vocab_size: int, d_model: int, num_heads: int,
                 d_ff: int, num_layers: int, max_seq_len: int = 5000,
                 dropout: float = 0.1, use_pre_ln: bool = False):
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        # Token embedding followed by the fixed positional encoding.
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = SinusoidalPositionalEncoding(d_model, max_seq_len)

        # Choose the block flavour once, then build the stack.
        block_cls = PreLNTransformerBlock if use_pre_ln else TransformerBlock
        self.layers = nn.ModuleList([
            block_cls(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])

        # Dropout applied to the embedded input.
        self.dropout = nn.Dropout(dropout)

        # Pre-LN blocks leave their output un-normalised, so that variant
        # gets one closing LayerNorm; the Post-LN variant gets none.
        if use_pre_ln:
            self.final_norm = nn.LayerNorm(d_model)
        else:
            self.final_norm = None

    def forward(self, input_ids: torch.Tensor, mask: Optional[torch.Tensor] = None):
        """Encode a batch of token ids.

        Args:
            input_ids: Integer tensor of shape [batch_size, seq_len].
            mask: Optional tensor passed unchanged to every block.

        Returns:
            output: [batch_size, seq_len, d_model]
            attention_weights_list: one attention-weight tensor per block,
                in layer order.
        """
        # Scale the embeddings by sqrt(d_model), add positions, apply dropout.
        scale = math.sqrt(self.d_model)
        hidden = self.dropout(self.pos_encoding(self.embedding(input_ids) * scale))

        attention_weights_list = []
        for block in self.layers:
            hidden, layer_attention = block(hidden, mask)
            attention_weights_list.append(layer_attention)

        if self.final_norm is None:
            return hidden, attention_weights_list
        return self.final_norm(hidden), attention_weights_list

# Build a 6-layer Pre-LN encoder and run it on random token ids.
vocab_size = 1000
d_model, num_heads, d_ff = 256, 8, 1024
num_layers = 6
seq_len, batch_size = 16, 2

encoder = TransformerEncoder(
    vocab_size, d_model, num_heads, d_ff, num_layers,
    use_pre_ln=True,  # stack Pre-LN blocks and add the final LayerNorm
)

# Random token ids in [0, vocab_size).
input_ids = torch.randint(low=0, high=vocab_size, size=(batch_size, seq_len))

# Forward pass; one attention tensor is returned per layer.
output, attention_weights_all = encoder(input_ids)

print(f"编码器输入形状: {input_ids.shape}")
print(f"编码器输出形状: {output.shape}")
print(f"注意力权重层数: {len(attention_weights_all)}")
print(f"每层注意力权重形状: {attention_weights_all[0].shape}")
print(f"编码器总参数量: {sum(param.numel() for param in encoder.parameters()):,}")

## 总结

在这个教程中，我们学习了如何构建完整的Transformer块：

### 核心组件：
1. **多头自注意力**：捕获序列内的依赖关系
2. **前馈网络**：提供非线性变换和特征处理
3. **残差连接**：帮助梯度流动，缓解梯度消失
4. **层归一化**：稳定训练，加速收敛

### 架构变体：
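- **Post-LN**：原始设计，先做残差相加再做层归一化，即 $\text{LayerNorm}(x + \text{Sublayer}(x))$
- **Pre-LN**：现代设计，层归一化作用于子层的输入，残差路径保持不变，即 $x + \text{Sublayer}(\text{LayerNorm}(x))$，通常训练更稳定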

### 设计原则：
- **残差连接**：确保信息能够直接流过深层网络
- **层归一化**：保持激活值的合理范围
- **Dropout**：防止过拟合
- **维度一致性**：所有子层的输出维度都是d_model

### 实际应用：
- Transformer块是BERT、GPT等模型的基础构建单元
- 通过堆叠多个块可以创建深层的编码器或解码器
- 不同任务可能需要不同的层数和维度配置

在下一个教程中，我们将学习如何构建**完整的Transformer模型**，包括编码器-解码器架构。

### 下一步学习：
- [06-complete-transformer.ipynb](06-complete-transformer.ipynb) - 完整Transformer实现