# Transformer求解偏微分方程 - 简明教程

本教程介绍如何使用Transformer架构求解偏微分方程(PDE)。

## 目录
1. [基础概念](#1-基础概念)
2. [数据准备](#2-数据准备)
3. [模型构建](#3-模型构建)
4. [训练过程](#4-训练过程)
5. [结果可视化](#5-结果可视化)
6. [实战案例：热传导方程](#6-实战案例热传导方程)

## 1. 基础概念

### 1.1 为什么用Transformer解PDE？

**传统方法的局限**：
- CNN局限于局部特征
- RNN难以捕捉长距离依赖

**Transformer的优势**：
- ✅ 全局注意力机制：捕捉空间任意位置的关系
- ✅ 并行计算：高效训练
- ✅ 多尺度特征：自适应关注不同尺度
- ✅ 灵活性：处理不规则网格

### 1.2 核心思想

将PDE求解转化为序列建模问题：

```
输入序列：空间点 (x₁, x₂, ..., xₙ) + 对应值 (u₁, u₂, ..., uₙ)
         ↓
    Transformer
         ↓
输出序列：预测值 (û₁, û₂, ..., ûₙ)
```

In [None]:
# 导入必要的库
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import sys
import os

# 添加项目路径
sys.path.append('../')
sys.path.append('../../')

# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)

# 设置设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

## 2. 数据准备

### 2.1 生成1D热传导方程数据

考虑简单的1D热传导方程：
$$\frac{\partial u}{\partial t} = \alpha \frac{\partial^2 u}{\partial x^2}$$

初始条件：$u(x, 0) = \sin(\pi x)$

边界条件：$u(0, t) = u(1, t) = 0$

In [None]:
def generate_heat_equation_data(num_samples=1000, nx=64, nt=50, alpha=0.01):
    """
    生成1D热传导方程数据
    
    参数:
        num_samples: 样本数量
        nx: 空间离散点数
        nt: 时间步数
        alpha: 热扩散系数
    """
    x = np.linspace(0, 1, nx)
    t = np.linspace(0, 1, nt)
    dx = x[1] - x[0]
    dt = t[1] - t[0]
    
    # 确保数值稳定性
    r = alpha * dt / dx**2
    if r > 0.5:
        print(f"Warning: r = {r:.4f} > 0.5, may be unstable!")
    
    data_input = []
    data_output = []
    
    for _ in range(num_samples):
        # 随机初始条件（多个正弦波的叠加）
        n_modes = np.random.randint(1, 4)
        u = np.zeros((nt, nx))
        
        for mode in range(1, n_modes + 1):
            amp = np.random.uniform(0.5, 1.5)
            u[0] += amp * np.sin(mode * np.pi * x)
        
        # 应用边界条件
        u[:, 0] = 0
        u[:, -1] = 0
        
        # 时间演化（显式差分格式）
        for n in range(0, nt-1):
            for i in range(1, nx-1):
                u[n+1, i] = u[n, i] + r * (u[n, i+1] - 2*u[n, i] + u[n, i-1])
        
        data_input.append(u[0])  # 初始条件
        data_output.append(u[-1])  # 最终状态
    
    return np.array(data_input), np.array(data_output), x

# 生成数据
print("生成训练数据...")
X_train, y_train, x_grid = generate_heat_equation_data(num_samples=800)
X_test, y_test, _ = generate_heat_equation_data(num_samples=200)

print(f"训练集: {X_train.shape}, 测试集: {X_test.shape}")

# 可视化一些样本
fig, axes = plt.subplots(2, 3, figsize=(15, 8))
for i in range(3):
    axes[0, i].plot(x_grid, X_train[i], 'b-', linewidth=2)
    axes[0, i].set_title(f'初始条件 (样本 {i+1})')
    axes[0, i].set_xlabel('x')
    axes[0, i].set_ylabel('u(x, 0)')
    axes[0, i].grid(True)
    
    axes[1, i].plot(x_grid, y_train[i], 'r-', linewidth=2)
    axes[1, i].set_title(f'最终状态 (样本 {i+1})')
    axes[1, i].set_xlabel('x')
    axes[1, i].set_ylabel('u(x, T)')
    axes[1, i].grid(True)

plt.tight_layout()
plt.show()

### 2.2 创建PyTorch数据集

In [None]:
class PDEDataset(Dataset):
    """PDE数据集类"""
    
    def __init__(self, inputs, outputs, coords):
        self.inputs = torch.FloatTensor(inputs).unsqueeze(-1)  # [N, seq_len, 1]
        self.outputs = torch.FloatTensor(outputs).unsqueeze(-1)
        self.coords = torch.FloatTensor(coords).unsqueeze(0).expand(len(inputs), -1, 1)  # [N, seq_len, 1]
    
    def __len__(self):
        return len(self.inputs)
    
    def __getitem__(self, idx):
        return self.inputs[idx], self.coords[idx], self.outputs[idx]

# 创建数据加载器
train_dataset = PDEDataset(X_train, y_train, x_grid)
test_dataset = PDEDataset(X_test, y_test, x_grid)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

print(f"训练批次数: {len(train_loader)}")
print(f"测试批次数: {len(test_loader)}")

## 3. 模型构建

### 3.1 位置编码（Positional Encoding）

Transformer需要位置信息，我们使用正弦-余弦位置编码：

In [None]:
class PhysicsPositionalEncoding(nn.Module):
    """基于物理坐标的位置编码"""
    
    def __init__(self, d_model, coord_dim=1):
        super().__init__()
        self.d_model = d_model
        self.coord_dim = coord_dim
        
        # 坐标投影
        self.coord_proj = nn.Linear(coord_dim, d_model)
        
        # 频率编码
        self.freq_bands = nn.Parameter(torch.randn(d_model // 2, coord_dim))
    
    def forward(self, coords):
        """
        编码物理坐标
        
        参数:
            coords: [batch_size, seq_len, coord_dim]
        返回:
            编码: [batch_size, seq_len, d_model]
        """
        # 线性编码
        linear_encoding = self.coord_proj(coords)
        
        # 频率编码
        coords_expanded = coords.unsqueeze(-2)  # [B, L, 1, C]
        freq_expanded = self.freq_bands.unsqueeze(0).unsqueeze(0)  # [1, 1, D/2, C]
        
        freqs = torch.sum(coords_expanded * freq_expanded, dim=-1)  # [B, L, D/2]
        
        sin_encoding = torch.sin(freqs)
        cos_encoding = torch.cos(freqs)
        
        freq_encoding = torch.cat([sin_encoding, cos_encoding], dim=-1)
        
        return linear_encoding + freq_encoding

### 3.2 简单的Transformer模型

构建一个轻量级的Transformer用于PDE求解：

In [None]:
class SimplePDETransformer(nn.Module):
    """简单的PDE求解Transformer"""
    
    def __init__(self, input_dim=1, output_dim=1, d_model=128, nhead=4, 
                 num_layers=3, coord_dim=1):
        super().__init__()
        
        self.d_model = d_model
        
        # 输入投影
        self.input_proj = nn.Linear(input_dim, d_model)
        
        # 位置编码
        self.pos_encoding = PhysicsPositionalEncoding(d_model, coord_dim)
        
        # Transformer编码器
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=4 * d_model,
            dropout=0.1,
            activation='gelu',
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        
        # 输出投影
        self.output_proj = nn.Sequential(
            nn.Linear(d_model, d_model // 2),
            nn.GELU(),
            nn.Linear(d_model // 2, output_dim)
        )
        
        self._reset_parameters()
    
    def _reset_parameters(self):
        """初始化参数"""
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)
    
    def forward(self, x, coords):
        """
        前向传播
        
        参数:
            x: 输入场值 [batch_size, seq_len, input_dim]
            coords: 空间坐标 [batch_size, seq_len, coord_dim]
        返回:
            输出场值 [batch_size, seq_len, output_dim]
        """
        # 输入嵌入 + 位置编码
        x = self.input_proj(x) + self.pos_encoding(coords)
        
        # Transformer
        x = self.transformer(x)
        
        # 输出投影
        return self.output_proj(x)

# 创建模型
model = SimplePDETransformer(
    input_dim=1,
    output_dim=1,
    d_model=128,
    nhead=4,
    num_layers=4,
    coord_dim=1
).to(device)

# 统计参数量
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"\n模型参数量: {num_params:,}")
print(f"\n模型结构:\n{model}")

## 4. 训练过程

### 4.1 定义损失函数和优化器

In [None]:
# 损失函数
criterion = nn.MSELoss()

# 优化器
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)

# 学习率调度器
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100, eta_min=1e-6)

print("优化器和损失函数已配置")

### 4.2 训练循环

In [None]:
def train_epoch(model, dataloader, criterion, optimizer, device):
    """训练一个epoch"""
    model.train()
    total_loss = 0
    
    for inputs, coords, targets in dataloader:
        inputs = inputs.to(device)
        coords = coords.to(device)
        targets = targets.to(device)
        
        # 前向传播
        outputs = model(inputs, coords)
        loss = criterion(outputs, targets)
        
        # 反向传播
        optimizer.zero_grad()
        loss.backward()
        
        # 梯度裁剪
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        optimizer.step()
        
        total_loss += loss.item()
    
    return total_loss / len(dataloader)

def evaluate(model, dataloader, criterion, device):
    """评估模型"""
    model.eval()
    total_loss = 0
    
    with torch.no_grad():
        for inputs, coords, targets in dataloader:
            inputs = inputs.to(device)
            coords = coords.to(device)
            targets = targets.to(device)
            
            outputs = model(inputs, coords)
            loss = criterion(outputs, targets)
            total_loss += loss.item()
    
    return total_loss / len(dataloader)

# 训练
num_epochs = 100
train_losses = []
test_losses = []
best_test_loss = float('inf')

print("开始训练...\n")
for epoch in range(num_epochs):
    train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
    test_loss = evaluate(model, test_loader, criterion, device)
    
    train_losses.append(train_loss)
    test_losses.append(test_loss)
    
    # 更新学习率
    scheduler.step()
    
    # 保存最佳模型
    if test_loss < best_test_loss:
        best_test_loss = test_loss
        torch.save(model.state_dict(), 'best_transformer_model.pth')
    
    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}] - "
              f"Train Loss: {train_loss:.6f}, Test Loss: {test_loss:.6f}, "
              f"LR: {optimizer.param_groups[0]['lr']:.6f}")

print(f"\n训练完成! 最佳测试损失: {best_test_loss:.6f}")

### 4.3 训练曲线可视化

In [None]:
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Train Loss', linewidth=2)
plt.plot(test_losses, label='Test Loss', linewidth=2)
plt.xlabel('Epoch', fontsize=12)
plt.ylabel('Loss', fontsize=12)
plt.title('Training and Test Loss', fontsize=14, fontweight='bold')
plt.legend(fontsize=10)
plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
plt.semilogy(train_losses, label='Train Loss', linewidth=2)
plt.semilogy(test_losses, label='Test Loss', linewidth=2)
plt.xlabel('Epoch', fontsize=12)
plt.ylabel('Loss (log scale)', fontsize=12)
plt.title('Training and Test Loss (Log Scale)', fontsize=14, fontweight='bold')
plt.legend(fontsize=10)
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 5. 结果可视化

### 5.1 预测vs真实值对比

In [None]:
# 加载最佳模型
model.load_state_dict(torch.load('best_transformer_model.pth'))
model.eval()

# 获取一些测试样本
num_vis = 6
indices = np.random.choice(len(X_test), num_vis, replace=False)

fig, axes = plt.subplots(3, num_vis//3, figsize=(18, 12))
axes = axes.flatten()

with torch.no_grad():
    for i, idx in enumerate(indices):
        # 准备输入
        input_data = torch.FloatTensor(X_test[idx:idx+1]).unsqueeze(-1).to(device)
        coords_data = torch.FloatTensor(x_grid).unsqueeze(0).unsqueeze(-1).to(device)
        
        # 预测
        output = model(input_data, coords_data)
        prediction = output.cpu().numpy()[0, :, 0]
        
        # 真实值
        true_output = y_test[idx]
        
        # 绘图
        axes[i].plot(x_grid, X_test[idx], 'b--', linewidth=2, label='初始条件', alpha=0.6)
        axes[i].plot(x_grid, true_output, 'g-', linewidth=2, label='真实值')
        axes[i].plot(x_grid, prediction, 'r--', linewidth=2, label='预测值')
        
        # 计算相对误差
        rel_error = np.linalg.norm(prediction - true_output) / np.linalg.norm(true_output)
        
        axes[i].set_title(f'样本 {idx+1} (相对误差: {rel_error:.4f})', fontsize=11, fontweight='bold')
        axes[i].set_xlabel('x', fontsize=10)
        axes[i].set_ylabel('u', fontsize=10)
        axes[i].legend(fontsize=9, loc='upper right')
        axes[i].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

### 5.2 误差分析

In [None]:
# 计算所有测试样本的误差
relative_errors = []
absolute_errors = []

model.eval()
with torch.no_grad():
    for i in range(len(X_test)):
        input_data = torch.FloatTensor(X_test[i:i+1]).unsqueeze(-1).to(device)
        coords_data = torch.FloatTensor(x_grid).unsqueeze(0).unsqueeze(-1).to(device)
        
        output = model(input_data, coords_data)
        prediction = output.cpu().numpy()[0, :, 0]
        true_output = y_test[i]
        
        abs_error = np.linalg.norm(prediction - true_output)
        rel_error = abs_error / np.linalg.norm(true_output)
        
        absolute_errors.append(abs_error)
        relative_errors.append(rel_error)

# 可视化误差分布
fig, axes = plt.subplots(1, 3, figsize=(18, 5))

# 相对误差直方图
axes[0].hist(relative_errors, bins=30, edgecolor='black', alpha=0.7)
axes[0].axvline(np.mean(relative_errors), color='r', linestyle='--', 
                linewidth=2, label=f'平均值: {np.mean(relative_errors):.4f}')
axes[0].set_xlabel('相对误差', fontsize=12)
axes[0].set_ylabel('频数', fontsize=12)
axes[0].set_title('相对误差分布', fontsize=14, fontweight='bold')
axes[0].legend(fontsize=10)
axes[0].grid(True, alpha=0.3)

# 绝对误差直方图
axes[1].hist(absolute_errors, bins=30, edgecolor='black', alpha=0.7, color='orange')
axes[1].axvline(np.mean(absolute_errors), color='r', linestyle='--', 
                linewidth=2, label=f'平均值: {np.mean(absolute_errors):.4f}')
axes[1].set_xlabel('绝对误差', fontsize=12)
axes[1].set_ylabel('频数', fontsize=12)
axes[1].set_title('绝对误差分布', fontsize=14, fontweight='bold')
axes[1].legend(fontsize=10)
axes[1].grid(True, alpha=0.3)

# 误差散点图
axes[2].scatter(absolute_errors, relative_errors, alpha=0.5, s=20)
axes[2].set_xlabel('绝对误差', fontsize=12)
axes[2].set_ylabel('相对误差', fontsize=12)
axes[2].set_title('绝对误差 vs 相对误差', fontsize=14, fontweight='bold')
axes[2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# 打印统计信息
print("\n=== 误差统计 ===")
print(f"相对误差 - 平均: {np.mean(relative_errors):.6f}, 标准差: {np.std(relative_errors):.6f}")
print(f"相对误差 - 最小: {np.min(relative_errors):.6f}, 最大: {np.max(relative_errors):.6f}")
print(f"绝对误差 - 平均: {np.mean(absolute_errors):.6f}, 标准差: {np.std(absolute_errors):.6f}")
print(f"绝对误差 - 最小: {np.min(absolute_errors):.6f}, 最大: {np.max(absolute_errors):.6f}")

## 6. 实战案例：热传导方程

### 6.1 时间演化可视化

让我们看看Transformer如何捕捉热传导的时间演化过程：

In [None]:
def generate_time_evolution(initial_condition, x_grid, nt=50, alpha=0.01):
    """生成时间演化序列"""
    nx = len(x_grid)
    dx = x_grid[1] - x_grid[0]
    dt = 1.0 / (nt - 1)
    r = alpha * dt / dx**2
    
    u = np.zeros((nt, nx))
    u[0] = initial_condition
    u[:, 0] = 0
    u[:, -1] = 0
    
    for n in range(0, nt-1):
        for i in range(1, nx-1):
            u[n+1, i] = u[n, i] + r * (u[n, i+1] - 2*u[n, i] + u[n, i-1])
    
    return u

# 选择一个初始条件
test_idx = 0
initial = X_test[test_idx]
true_evolution = generate_time_evolution(initial, x_grid)

# 使用模型进行多步预测（这里我们简化为直接预测最终状态）
model.eval()
with torch.no_grad():
    input_data = torch.FloatTensor(initial).unsqueeze(0).unsqueeze(-1).to(device)
    coords_data = torch.FloatTensor(x_grid).unsqueeze(0).unsqueeze(-1).to(device)
    final_pred = model(input_data, coords_data).cpu().numpy()[0, :, 0]

# 可视化
fig = plt.figure(figsize=(16, 10))

# 真实的时间演化
ax1 = plt.subplot(2, 2, 1)
time_steps_to_show = [0, 10, 20, 30, 40, 49]
for t_idx in time_steps_to_show:
    ax1.plot(x_grid, true_evolution[t_idx], linewidth=2, 
             label=f't = {t_idx/(len(true_evolution)-1):.2f}')
ax1.set_xlabel('x', fontsize=12)
ax1.set_ylabel('u(x, t)', fontsize=12)
ax1.set_title('真实的时间演化', fontsize=14, fontweight='bold')
ax1.legend(fontsize=10)
ax1.grid(True, alpha=0.3)

# 热力图
ax2 = plt.subplot(2, 2, 2)
im = ax2.imshow(true_evolution.T, aspect='auto', origin='lower', 
                extent=[0, 1, 0, 1], cmap='hot')
ax2.set_xlabel('时间 t', fontsize=12)
ax2.set_ylabel('空间 x', fontsize=12)
ax2.set_title('热传导时空演化', fontsize=14, fontweight='bold')
plt.colorbar(im, ax=ax2, label='u(x, t)')

# 初始 vs 最终状态对比
ax3 = plt.subplot(2, 2, 3)
ax3.plot(x_grid, initial, 'b-', linewidth=2, label='初始条件')
ax3.plot(x_grid, true_evolution[-1], 'g-', linewidth=2, label='真实最终状态')
ax3.plot(x_grid, final_pred, 'r--', linewidth=2, label='预测最终状态')
ax3.set_xlabel('x', fontsize=12)
ax3.set_ylabel('u', fontsize=12)
ax3.set_title('初始 vs 最终状态', fontsize=14, fontweight='bold')
ax3.legend(fontsize=10)
ax3.grid(True, alpha=0.3)

# 误差分布
ax4 = plt.subplot(2, 2, 4)
error = np.abs(final_pred - true_evolution[-1])
ax4.plot(x_grid, error, 'r-', linewidth=2)
ax4.fill_between(x_grid, 0, error, alpha=0.3, color='red')
ax4.set_xlabel('x', fontsize=12)
ax4.set_ylabel('绝对误差', fontsize=12)
ax4.set_title(f'预测误差分布 (最大误差: {np.max(error):.4f})', 
              fontsize=14, fontweight='bold')
ax4.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 7. 总结与展望

### 7.1 本教程学到的内容

✅ **基础概念**：
- Transformer的核心原理：自注意力机制
- 位置编码在PDE中的应用
- 序列建模思想

✅ **实践技能**：
- 数据准备与预处理
- 构建适用于PDE的Transformer模型
- 训练与评估流程
- 结果可视化与误差分析

✅ **应用案例**：
- 1D热传导方程求解
- 时间演化预测

### 7.2 进阶方向

🚀 **模型改进**：
- 尝试Vision Transformer处理2D/3D问题
- 引入物理信息约束（Physics-Informed Transformer）
- 多尺度注意力机制
- 图Transformer处理不规则网格

🚀 **应用扩展**：
- Navier-Stokes方程（流体力学）
- Maxwell方程（电磁学）
- Schrödinger方程（量子力学）
- 多物理场耦合问题

🚀 **性能优化**：
- 混合精度训练
- 模型剪枝与量化
- 分布式训练
- 迁移学习

### 7.3 相关资源

📚 **论文**：
- "Choose a Transformer: Fourier or Galerkin" (NeurIPS 2021)
- "Transformer for Partial Differential Equations' Operator Learning" (2022)
- "Physics-Informed Neural Networks: A Deep Learning Framework" (2019)

💻 **代码库**：
- 本项目：`AI4CFD/Transformer/`
- PyTorch官方文档：https://pytorch.org/docs/stable/nn.html#transformer
- Hugging Face Transformers：https://huggingface.co/transformers/

---

**感谢使用本教程！如有问题，欢迎提Issue或PR。**

## 附录：模型保存与加载

In [None]:
# 保存完整模型
torch.save({
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'train_losses': train_losses,
    'test_losses': test_losses,
    'best_test_loss': best_test_loss,
}, 'transformer_checkpoint.pth')

print("模型已保存到 transformer_checkpoint.pth")

# 加载模型示例
# checkpoint = torch.load('transformer_checkpoint.pth')
# model.load_state_dict(checkpoint['model_state_dict'])
# optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
# print(f"模型已加载，最佳测试损失: {checkpoint['best_test_loss']:.6f}")