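# Time Series Forecasting Model Demo

## 📊 Project Overview

This demo shows how modern deep learning is applied to time series forecasting, following a setup that is common in the academic literature:

- **Dataset**: the data mirrors the 7-variable layout of the ETT (Electricity Transformer Temperature) dataset, but is generated synthetically so that the notebook is self-contained
- **Models**: a Transformer-encoder forecaster and an LSTM baseline are implemented and trained; Informer and Autoformer are listed as suggested extensions
- **Task**: Multivariate Time Series Forecasting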

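## 🎯 Learning Objectives

1. Understand the basic concepts and challenges of time series forecasting
2. Apply modern deep learning models to time series forecasting
3. Learn data preprocessing, model training, and result evaluation
4. Compare the performance of different models

## 📈 Tech Stack

- **Deep learning framework**: PyTorch
- **Data processing**: pandas, numpy
- **Visualization**: matplotlib, seaborn
- **Models**: Transformer and LSTM (implemented here); Informer and Autoformer (suggested extensions)


## 🔧 Environment Setup

First, install the required dependencies: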


In [None]:
# Install the required packages
%pip install torch
%pip install pandas numpy matplotlib seaborn
%pip install scikit-learn


## 📦 Import the Required Libraries


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import warnings
warnings.filterwarnings('ignore')

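# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Select the compute device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Font settings: SimHei renders Chinese labels; DejaVu Sans is the fallback when it is not installed
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False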


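## 📊 Data Preparation

### Generate Synthetic Time Series Data

To keep the demo self-contained and quick to run, we first generate synthetic multivariate time series data instead of downloading the ETT dataset: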
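
One caveat when moving from synthetic data to a real dataset such as ETT: standardization statistics are usually estimated on the training portion only, so that no information from the validation or test period leaks into the inputs. The sketch below illustrates the idea with the standard library alone; `fit_standardizer`, `apply_standardizer`, and the 70% cut-off are illustrative names and assumptions, not part of this notebook's pipeline.

```python
import statistics

def fit_standardizer(train_values):
    """Estimate mean and (population) standard deviation on the training portion only."""
    mean = statistics.fmean(train_values)
    std = statistics.pstdev(train_values)
    # Guard against constant series, where the deviation would be zero
    return mean, (std if std > 0 else 1.0)

def apply_standardizer(values, mean, std):
    """Scale any portion of the series with the training statistics."""
    return [(v - mean) / std for v in values]

# Hypothetical univariate series with an upward drift
series = [0.1 * step for step in range(100)]
cut = int(0.7 * len(series))  # assumed chronological 70% training cut-off

mean, std = fit_standardizer(series[:cut])
train_scaled = apply_standardizer(series[:cut], mean, std)
test_scaled = apply_standardizer(series[cut:], mean, std)

print(f"train mean after scaling: {statistics.fmean(train_scaled):.3f}")
print(f"test mean after scaling:  {statistics.fmean(test_scaled):.3f}")
```

With scikit-learn, the same effect is obtained by calling `StandardScaler.fit` on the training rows and `transform` on the remaining rows.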


In [None]:
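def generate_synthetic_timeseries(n_samples=1000, n_features=7):
    """
    Generate synthetic multivariate time series data.

    Args:
        n_samples: number of time steps
        n_features: number of features (variables)

    Returns:
        data: standardized array of shape (n_samples, n_features)
        scaler: the fitted StandardScaler
    """
    np.random.seed(42)

    # Continuous time axis for the trend, integer step index for the seasonality
    t = np.linspace(0, 10, n_samples)
    steps = np.arange(n_samples)

    # Give each feature a different pattern
    data = np.zeros((n_samples, n_features))

    for i in range(n_features):
        # Slowly varying trend term
        trend = 0.1 * t * np.sin(0.5 * t + i)

        # Seasonal terms with periods of 24 and 168 steps (daily and weekly cycles for hourly data)
        seasonal = 2 * np.sin(2 * np.pi * steps / 24 + i) + np.sin(2 * np.pi * steps / 168 + i)

        # Gaussian noise term
        noise = 0.5 * np.random.randn(n_samples)

        # Combine the components
        data[:, i] = trend + seasonal + noise

    # Standardize each feature to zero mean and unit variance.
    # With real data, fit the scaler on the training split only.
    scaler = StandardScaler()
    data = scaler.fit_transform(data)

    return data, scaler

# Generate the data
data, scaler = generate_synthetic_timeseries(n_samples=1000, n_features=7)
print(f"Data shape: {data.shape}")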

# Visualize the first 200 steps of each feature
plt.figure(figsize=(15, 10))
for i in range(7):
    plt.subplot(3, 3, i+1)
    plt.plot(data[:200, i])
    plt.title(f'Feature {i+1}')
    plt.grid(True)

plt.tight_layout()
plt.show()


### Create the Dataset Class
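
The dataset class in this section slices the series into overlapping windows: each sample pairs `seq_len` input steps with the `pred_len` steps that follow. As a quick sanity check of the index arithmetic, the sketch below counts the windows and reproduces the 70/20/10 split sizes for the settings used in this notebook (1000 steps, `seq_len=96`, `pred_len=24`). The helper names are illustrative.

```python
def count_windows(n_steps, seq_len, pred_len):
    """Number of (input, target) pairs a series of n_steps yields."""
    return max(n_steps - seq_len - pred_len + 1, 0)

def window_bounds(idx, seq_len, pred_len):
    """Half-open index ranges of the input and target slices for sample idx."""
    input_range = (idx, idx + seq_len)
    target_range = (idx + seq_len, idx + seq_len + pred_len)
    return input_range, target_range

n_windows = count_windows(1000, seq_len=96, pred_len=24)
train_size = int(0.7 * n_windows)
val_size = int(0.2 * n_windows)
test_size = n_windows - train_size - val_size

print(n_windows, train_size, val_size, test_size)  # 881 616 176 89
print(window_bounds(0, 96, 24))                    # ((0, 96), (96, 120))
print(window_bounds(n_windows - 1, 96, 24))        # ((880, 976), (976, 1000))
```

Because consecutive windows overlap, samples on either side of a split boundary share time steps. Leaving a gap of `seq_len + pred_len - 1` windows between splits is one way to avoid this.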


In [None]:
class TimeSeriesDataset(Dataset):
    """Sliding-window dataset for time series forecasting."""
    
    def __init__(self, data, seq_len=96, pred_len=24):
        self.data = data
        self.seq_len = seq_len
        self.pred_len = pred_len
        
    def __len__(self):
        return len(self.data) - self.seq_len - self.pred_len + 1
    
    def __getitem__(self, idx):
        # Input window
        x = self.data[idx:idx + self.seq_len]
        # Target window that immediately follows the input
        y = self.data[idx + self.seq_len:idx + self.seq_len + self.pred_len]
        
        return torch.FloatTensor(x), torch.FloatTensor(y)

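# Create the dataset
seq_len = 96  # input sequence length
pred_len = 24  # forecast horizon

dataset = TimeSeriesDataset(data, seq_len=seq_len, pred_len=pred_len)
print(f"Dataset size: {len(dataset)}")

# Split chronologically into training, validation, and test sets (70% / 20% / 10%)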
train_size = int(0.7 * len(dataset))
val_size = int(0.2 * len(dataset))
test_size = len(dataset) - train_size - val_size

train_dataset = torch.utils.data.Subset(dataset, range(train_size))
val_dataset = torch.utils.data.Subset(dataset, range(train_size, train_size + val_size))
test_dataset = torch.utils.data.Subset(dataset, range(train_size + val_size, len(dataset)))

print(f"Training set size: {len(train_dataset)}")
print(f"Validation set size: {len(val_dataset)}")
print(f"Test set size: {len(test_dataset)}")

# Create the data loaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


## 🧠 Model Implementation

### 1. Transformer Forecasting Model
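
The model in this section adds sinusoidal positional encodings to the projected inputs, since self-attention is otherwise insensitive to the order of time steps. For position `pos` and pair index `i`, the encoding is `PE(pos, 2i) = sin(pos / 10000^(2i / d_model))` and `PE(pos, 2i+1) = cos(pos / 10000^(2i / d_model))`. The following pure-Python sketch computes the same values without PyTorch; `sinusoidal_encoding` is an illustrative helper and assumes an even `d_model`.

```python
import math

def sinusoidal_encoding(position, d_model):
    """Return the d_model-dimensional encoding of one position (d_model must be even)."""
    encoding = []
    for pair in range(d_model // 2):
        # Each sin/cos pair shares one frequency: 1 / 10000^(2 * pair / d_model)
        frequency = math.exp(-math.log(10000.0) * (2 * pair) / d_model)
        encoding.append(math.sin(position * frequency))
        encoding.append(math.cos(position * frequency))
    return encoding

print(sinusoidal_encoding(0, 4))  # [0.0, 1.0, 0.0, 1.0]
print([round(v, 4) for v in sinusoidal_encoding(1, 4)])
```

Low pair indices oscillate quickly and high ones slowly, so every position within the input window receives a distinct pattern.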


In [None]:
class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding."""

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)  # (max_len, 1, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x shape: (seq_len, batch_size, d_model)
        return x + self.pe[:x.size(0), :]

class TransformerForecasting(nn.Module):
    """Transformer-encoder-based time series forecasting model."""

    def __init__(self, input_dim, d_model=512, nhead=8, num_layers=6, pred_len=24):
        super(TransformerForecasting, self).__init__()
        self.d_model = d_model
        self.pred_len = pred_len

        # Input projection: features -> d_model
        self.input_projection = nn.Linear(input_dim, d_model)

        # Positional encoding
        self.pos_encoding = PositionalEncoding(d_model)

        # Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=2048,
            dropout=0.1,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # Prediction head: d_model -> features
        self.predictor = nn.Sequential(
            nn.Linear(d_model, d_model // 2),
            nn.ReLU(),
            nn.Linear(d_model // 2, input_dim)
        )

    def forward(self, x):
        # x shape: (batch_size, seq_len, input_dim)

        # Input projection
        x = self.input_projection(x)  # (batch_size, seq_len, d_model)

        # Positional encoding (the module expects sequence-first tensors)
        x = x.transpose(0, 1)  # (seq_len, batch_size, d_model)
        x = self.pos_encoding(x)
        x = x.transpose(0, 1)  # (batch_size, seq_len, d_model)

        # Transformer encoding
        encoded = self.transformer_encoder(x)  # (batch_size, seq_len, d_model)

        # Use the encoding of the last time step as the starting state
        last_hidden = encoded[:, -1, :]  # (batch_size, d_model)

        # Predict the future sequence step by step
        predictions = []
        current_input = last_hidden

        for _ in range(self.pred_len):
            pred = self.predictor(current_input)  # (batch_size, input_dim)
            predictions.append(pred)

            # Project the prediction back into d_model space as the next step's input
            current_input = self.input_projection(pred)

        # Stack the per-step predictions
        output = torch.stack(predictions, dim=1)  # (batch_size, pred_len, input_dim)

        return output

# Instantiate the model
model = TransformerForecasting(input_dim=7, d_model=256, nhead=8, num_layers=4, pred_len=pred_len)
model = model.to(device)

# Check the forward pass
sample_input = torch.randn(2, seq_len, 7).to(device)
sample_output = model(sample_input)
print(f"Input shape: {sample_input.shape}")
print(f"Output shape: {sample_output.shape}")

# Count the parameters
total_params = sum(p.numel() for p in model.parameters())
print(f"Model parameters: {total_params:,}")


### 2. LSTM Forecasting Model
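
Before building the model, it can help to estimate its size by hand. In PyTorch's `nn.LSTM`, each layer holds input-to-hidden weights, hidden-to-hidden weights, and two bias vectors for the four gates, while a linear layer holds a weight matrix plus a bias. The sketch below applies that arithmetic to the configuration used in this notebook (7 features, `hidden_dim=128`, 2 layers, and a two-layer prediction head); the helper names are illustrative.

```python
def lstm_layer_params(input_size, hidden_size):
    """Parameters of one unidirectional LSTM layer (4 gates, two bias vectors)."""
    gates = 4
    weights = gates * hidden_size * (input_size + hidden_size)
    biases = 2 * gates * hidden_size
    return weights + biases

def linear_params(in_features, out_features):
    """Parameters of a fully connected layer with bias."""
    return in_features * out_features + out_features

input_dim, hidden_dim = 7, 128
lstm_total = lstm_layer_params(input_dim, hidden_dim) + lstm_layer_params(hidden_dim, hidden_dim)
head_total = linear_params(hidden_dim, hidden_dim // 2) + linear_params(hidden_dim // 2, input_dim)

print(f"LSTM layers: {lstm_total:,}")         # 202,240
print(f"Prediction head: {head_total:,}")     # 8,711
print(f"Total: {lstm_total + head_total:,}")  # 210,951
```

The total should match the parameter count that the notebook prints for the LSTM model after constructing it.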


In [None]:
class LSTMForecasting(nn.Module):
    """LSTM-based time series forecasting model."""

    def __init__(self, input_dim, hidden_dim=128, num_layers=2, pred_len=24, dropout=0.1):
        super(LSTMForecasting, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.pred_len = pred_len

        # LSTM layers (inter-layer dropout only applies when num_layers > 1)
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )

        # Prediction head: hidden_dim -> features
        self.predictor = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim // 2, input_dim)
        )

    def forward(self, x):
        # x shape: (batch_size, seq_len, input_dim)

        # Encode the input window
        lstm_out, (hidden, cell) = self.lstm(x)

        # Use the output of the last time step
        last_output = lstm_out[:, -1, :]  # (batch_size, hidden_dim)

        # Predict the future sequence autoregressively
        predictions = []
        current_hidden = hidden
        current_cell = cell

        for _ in range(self.pred_len):
            # Predict the current time step
            pred = self.predictor(last_output)  # (batch_size, input_dim)
            predictions.append(pred)

            # Feed the prediction back as the next input
            pred_reshaped = pred.unsqueeze(1)  # (batch_size, 1, input_dim)

            # Advance the LSTM by one step, carrying the state forward
            lstm_out, (current_hidden, current_cell) = self.lstm(pred_reshaped, (current_hidden, current_cell))
            last_output = lstm_out[:, -1, :]  # (batch_size, hidden_dim)

        # Stack the per-step predictions
        output = torch.stack(predictions, dim=1)  # (batch_size, pred_len, input_dim)

        return output

# Create the LSTM model
lstm_model = LSTMForecasting(input_dim=7, hidden_dim=128, num_layers=2, pred_len=pred_len)
lstm_model = lstm_model.to(device)

# Check the forward pass
lstm_output = lstm_model(sample_input)
print(f"LSTM output shape: {lstm_output.shape}")

# Count the parameters
lstm_params = sum(p.numel() for p in lstm_model.parameters())
print(f"LSTM model parameters: {lstm_params:,}")


## 🚀 Model Training
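
The training loop in this section halves the learning rate when the validation loss stops improving (`ReduceLROnPlateau` with `factor=0.5` and `patience=5`). To make that behaviour concrete, here is a simplified pure-Python imitation of the rule. It ignores the real scheduler's `threshold`, `cooldown`, and `min_lr` options, and `PlateauRule` is a hypothetical name, not a PyTorch class.

```python
class PlateauRule:
    """Simplified plateau rule: shrink lr after more than `patience` epochs without improvement."""

    def __init__(self, lr, factor=0.5, patience=5):
        self.lr = lr
        self.factor = factor
        self.patience = patience
        self.best = float("inf")
        self.bad_epochs = 0

    def step(self, val_loss):
        if val_loss < self.best:
            # New best validation loss: reset the counter
            self.best = val_loss
            self.bad_epochs = 0
        else:
            self.bad_epochs += 1
        if self.bad_epochs > self.patience:
            # Plateau detected: shrink the learning rate and start counting again
            self.lr *= self.factor
            self.bad_epochs = 0
        return self.lr

rule = PlateauRule(lr=0.001)
# Hypothetical validation losses: improvement for 3 epochs, then a plateau
losses = [1.0, 0.8, 0.7] + [0.7] * 6
history = [rule.step(loss) for loss in losses]
print(history)
```

In this made-up run the learning rate stays at 0.001 until the sixth consecutive epoch without improvement, at which point it is halved.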


In [None]:
def train_model(model, train_loader, val_loader, epochs=50, lr=0.001):
    """Train the model and return the per-epoch training and validation losses."""
    
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)
    
    train_losses = []
    val_losses = []
    
    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        
        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            
            optimizer.zero_grad()
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
        
        train_loss /= len(train_loader)
        
        # Validation phase
        model.eval()
        val_loss = 0.0
        
        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)
                outputs = model(batch_x)
                loss = criterion(outputs, batch_y)
                val_loss += loss.item()
        
        val_loss /= len(val_loader)
        
        # Learning-rate scheduling based on the validation loss
        scheduler.step(val_loss)
        
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        
        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}')
    
    return train_losses, val_losses

# Train the Transformer model
print("Training the Transformer model...")
transformer_train_losses, transformer_val_losses = train_model(
    model, train_loader, val_loader, epochs=50, lr=0.001
)

print("\nTraining the LSTM model...")
lstm_train_losses, lstm_val_losses = train_model(
    lstm_model, train_loader, val_loader, epochs=50, lr=0.001
)


## 📊 Training Results Visualization


In [None]:
# Plot the training and validation loss curves
plt.figure(figsize=(15, 5))

plt.subplot(1, 2, 1)
plt.plot(transformer_train_losses, label='Transformer Train Loss', alpha=0.8)
plt.plot(transformer_val_losses, label='Transformer Val Loss', alpha=0.8)
plt.title('Transformer Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)

plt.subplot(1, 2, 2)
plt.plot(lstm_train_losses, label='LSTM Train Loss', alpha=0.8)
plt.plot(lstm_val_losses, label='LSTM Val Loss', alpha=0.8)
plt.title('LSTM Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()

# Compare the final losses of the two models
print("Final loss comparison:")
print(f"Transformer - train loss: {transformer_train_losses[-1]:.6f}, val loss: {transformer_val_losses[-1]:.6f}")
print(f"LSTM - train loss: {lstm_train_losses[-1]:.6f}, val loss: {lstm_val_losses[-1]:.6f}")


## 🧪 Model Evaluation and Prediction Visualization
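
The evaluation in this section reports MSE, MAE, and RMSE over all samples, horizons, and features at once. The values are in standardized units, because the data was scaled before windowing. As a small worked example of the three metrics, using made-up numbers:

```python
import math

def forecast_errors(targets, predictions):
    """Return (MSE, MAE, RMSE) for two equally long sequences of numbers."""
    diffs = [p - t for t, p in zip(targets, predictions)]
    mse = sum(d * d for d in diffs) / len(diffs)
    mae = sum(abs(d) for d in diffs) / len(diffs)
    return mse, mae, math.sqrt(mse)

# Made-up values: two exact predictions and one that is off by 2
mse, mae, rmse = forecast_errors([1.0, 2.0, 3.0], [1.0, 2.0, 5.0])
print(f"MSE={mse:.4f}  MAE={mae:.4f}  RMSE={rmse:.4f}")  # MSE=1.3333  MAE=0.6667  RMSE=1.1547
```

RMSE penalizes the single large miss more heavily than MAE does, which is one reason to report both.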


In [None]:
def evaluate_model(model, test_loader):
    """Evaluate the model on the test set and return predictions, targets, and metrics."""
    model.eval()
    predictions = []
    targets = []
    
    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            outputs = model(batch_x)
            
            predictions.append(outputs.cpu().numpy())
            targets.append(batch_y.cpu().numpy())
    
    predictions = np.concatenate(predictions, axis=0)
    targets = np.concatenate(targets, axis=0)
    
    # Compute the evaluation metrics (on the standardized scale)
    mse = mean_squared_error(targets.flatten(), predictions.flatten())
    mae = mean_absolute_error(targets.flatten(), predictions.flatten())
    rmse = np.sqrt(mse)
    
    return predictions, targets, mse, mae, rmse

# Evaluate both models
print("Evaluating the Transformer model...")
transformer_preds, transformer_targets, transformer_mse, transformer_mae, transformer_rmse = evaluate_model(model, test_loader)

print("Evaluating the LSTM model...")
lstm_preds, lstm_targets, lstm_mse, lstm_mae, lstm_rmse = evaluate_model(lstm_model, test_loader)

# Print the evaluation results
print("\n=== Model Performance Comparison ===")
print(f"{'Model':<15} {'MSE':<10} {'MAE':<10} {'RMSE':<10}")
print("-" * 45)
print(f"{'Transformer':<15} {transformer_mse:<10.6f} {transformer_mae:<10.6f} {transformer_rmse:<10.6f}")
print(f"{'LSTM':<15} {lstm_mse:<10.6f} {lstm_mae:<10.6f} {lstm_rmse:<10.6f}")


In [None]:
# Visualize the predictions
def plot_predictions(predictions, targets, model_name, sample_idx=0, feature_idx=0):
    """Plot predicted against actual values for one test sample and one feature."""
    plt.figure(figsize=(12, 6))

    # Select one sample to visualize
    pred_sample = predictions[sample_idx, :, feature_idx]
    target_sample = targets[sample_idx, :, feature_idx]

    time_steps = range(len(pred_sample))

    plt.plot(time_steps, target_sample, 'b-', label='Actual', linewidth=2)
    plt.plot(time_steps, pred_sample, 'r--', label='Predicted', linewidth=2)

    plt.title(f'{model_name} - Feature {feature_idx+1}: Predicted vs. Actual')
    plt.xlabel('Time step')
    plt.ylabel('Value')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

# Plot the Transformer predictions
plot_predictions(transformer_preds, transformer_targets, 'Transformer', sample_idx=0, feature_idx=0)

# Plot the LSTM predictions
plot_predictions(lstm_preds, lstm_targets, 'LSTM', sample_idx=0, feature_idx=0)


## 📈 Multi-Feature Prediction Comparison


In [None]:
# Plot the predictions of both models for every feature
fig, axes = plt.subplots(3, 3, figsize=(18, 12))
axes = axes.flatten()

for i in range(7):  # 7 features
    # Transformer predictions
    transformer_pred_sample = transformer_preds[0, :, i]
    transformer_target_sample = transformer_targets[0, :, i]

    # LSTM predictions
    lstm_pred_sample = lstm_preds[0, :, i]
    lstm_target_sample = lstm_targets[0, :, i]

    time_steps = range(len(transformer_pred_sample))

    axes[i].plot(time_steps, transformer_target_sample, 'b-', label='Actual', linewidth=2)
    axes[i].plot(time_steps, transformer_pred_sample, 'r--', label='Transformer', linewidth=2)
    axes[i].plot(time_steps, lstm_pred_sample, 'g:', label='LSTM', linewidth=2)

    axes[i].set_title(f'Feature {i+1}: Prediction Comparison')
    axes[i].set_xlabel('Time step')
    axes[i].set_ylabel('Value')
    axes[i].legend()
    axes[i].grid(True, alpha=0.3)

# Hide the unused subplots
for i in range(7, 9):
    axes[i].set_visible(False)

plt.tight_layout()
plt.show()


## 🎯 Model Performance Summary


In [None]:
# Build the performance comparison table
performance_data = {
    'Model': ['Transformer', 'LSTM'],
    'MSE': [transformer_mse, lstm_mse],
    'MAE': [transformer_mae, lstm_mae],
    'RMSE': [transformer_rmse, lstm_rmse],
    'Parameters': [total_params, lstm_params]
}

performance_df = pd.DataFrame(performance_data)
print("=== Model Performance Comparison Table ===")
print(performance_df.to_string(index=False, float_format='%.6f'))

# Visualize the performance comparison
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# MSE comparison
axes[0, 0].bar(['Transformer', 'LSTM'], [transformer_mse, lstm_mse], color=['skyblue', 'lightcoral'])
axes[0, 0].set_title('MSE Comparison')
axes[0, 0].set_ylabel('MSE')

# MAE comparison
axes[0, 1].bar(['Transformer', 'LSTM'], [transformer_mae, lstm_mae], color=['skyblue', 'lightcoral'])
axes[0, 1].set_title('MAE Comparison')
axes[0, 1].set_ylabel('MAE')

# RMSE comparison
axes[1, 0].bar(['Transformer', 'LSTM'], [transformer_rmse, lstm_rmse], color=['skyblue', 'lightcoral'])
axes[1, 0].set_title('RMSE Comparison')
axes[1, 0].set_ylabel('RMSE')

# Parameter count comparison
axes[1, 1].bar(['Transformer', 'LSTM'], [total_params, lstm_params], color=['skyblue', 'lightcoral'])
axes[1, 1].set_title('Parameter Count Comparison')
axes[1, 1].set_ylabel('Parameters')

plt.tight_layout()
plt.show()

print("\n=== Conclusion ===")
if transformer_rmse < lstm_rmse:
    print("✅ The Transformer model achieves the lower RMSE")
else:
    print("✅ The LSTM model achieves the lower RMSE")

if total_params < lstm_params:
    print("✅ The Transformer model has fewer parameters")
else:
    print("✅ The LSTM model has fewer parameters")


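## 🚀 Suggested Extensions

### Directions for improvement:

1. **More advanced model architectures**:
   - Informer: designed specifically for long-sequence time series forecasting
   - Autoformer: a decomposition-based Transformer architecture
   - FEDformer: a frequency-enhanced Transformer

2. **Data augmentation**:
   - Sliding time windows
   - Noise injection
   - Data interpolation

3. **Feature engineering**:
   - Time features (hour, day of week, month, etc.)
   - Lag features
   - Moving-average features

4. **Model ensembling**:
   - Multi-model voting
   - Weighted averaging
   - Stacking

5. **Hyperparameter optimization**:
   - Grid search
   - Bayesian optimization
   - Random search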
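
As a starting point for the feature-engineering item above, the sketch below derives calendar, lag, and moving-average features for an hourly series using only the standard library. The function names, the hourly sampling, and the start date are assumptions made for illustration.

```python
from datetime import datetime, timedelta

def calendar_features(start, n_steps, step=timedelta(hours=1)):
    """Hour of day, day of week (Monday=0), and month for each time step."""
    stamps = [start + i * step for i in range(n_steps)]
    return [(ts.hour, ts.weekday(), ts.month) for ts in stamps]

def lag_feature(values, lag):
    """Value observed `lag` steps earlier; None where no history exists yet."""
    return [None] * lag + list(values[:len(values) - lag])

def moving_average(values, window):
    """Trailing mean over the last `window` values; None until the window is full."""
    out = []
    for i in range(len(values)):
        if i + 1 < window:
            out.append(None)
        else:
            out.append(sum(values[i + 1 - window:i + 1]) / window)
    return out

values = [1.0, 2.0, 3.0, 4.0, 5.0]
print(calendar_features(datetime(2024, 1, 1, 22), 3))  # [(22, 0, 1), (23, 0, 1), (0, 1, 1)]
print(lag_feature(values, 2))                          # [None, None, 1.0, 2.0, 3.0]
print(moving_average(values, 3))                       # [None, None, 2.0, 3.0, 4.0]
```

Only trailing (past) values are used, so the same features remain available at prediction time.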

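### Real-world application scenarios:

- **Financial forecasting**: stock prices, exchange rates
- **Energy forecasting**: electricity load, renewable energy generation
- **Traffic forecasting**: traffic flow, travel time
- **Weather forecasting**: temperature, rainfall
- **Business forecasting**: sales, demand

---

**Congratulations!** You have completed an end-to-end time series forecasting demo covering data generation, model implementation, training, and evaluation. It shows how modern deep learning is applied to time series forecasting and can serve as a starting point for further study and research.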
