# 时间序列预测实验 Notebook

这个notebook用于：
- 快速测试和调试代码
- 探索性数据分析
- 实验新的模型和方法
- 可视化和结果分析

## 1. 环境设置

In [5]:
# 添加项目根目录到Python路径
import sys
import os
sys.path.append(os.path.dirname(os.getcwd()))

# 基础导入
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yaml
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

# 设置matplotlib参数
%matplotlib inline
plt.style.use('seaborn-v0_8-darkgrid')
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['font.size'] = 12

print("环境设置完成！")

环境设置完成！


## 2. 加载配置和初始化组件

In [6]:
# 导入项目模块
from src.data.data_loader import DataLoader
from src.models.mean_model import MeanModel
from src.models.log_model import LogModel
from src.training.trainer import Trainer
from src.evaluation.evaluator import ModelEvaluator

# 加载配置
config_path = '../config/config.yaml'
with open(config_path, 'r') as f:
    config = yaml.safe_load(f)

print("配置加载成功！")
print(f"数据集: {config['dataset']['name']}")
print(f"Mean Model - 上下文长度: {config['mean_model']['context_length']}, 预测长度: {config['mean_model']['prediction_length']}")
print(f"Log Model - 上下文长度: {config['log_model']['context_length']}, 预测长度: {config['log_model']['prediction_length']}")

ImportError: cannot import name 'MeanModel' from 'src.models.mean_model' (/zhangyunyi/code/DualRes/univariate/src/models/mean_model.py)

In [None]:
# 初始化组件
data_loader = DataLoader(config)
mean_model = MeanModel(config)
log_model = LogModel(config)
trainer = Trainer(config, data_loader, mean_model, log_model)
evaluator = ModelEvaluator(config)

print("组件初始化完成！")

## 3. 数据探索

In [None]:
# 加载数据
data_loader.load_dataset()
dataset_info = data_loader.get_dataset_info()

print("数据集信息：")
for key, value in dataset_info.items():
    print(f"  {key}: {value}")

In [None]:
# 可视化数据样本
from gluonts.dataset.util import to_pandas

# 获取第一个时间序列
train_entry = data_loader.train_data[0]
test_entry = data_loader.test_data[0]

# 转换为pandas Series
train_series = to_pandas(train_entry, freq=config['dataset']['freq'])
test_series = to_pandas(test_entry, freq=config['dataset']['freq'])

# 绘图
fig, axes = plt.subplots(2, 1, figsize=(14, 8))

axes[0].plot(train_series, label='Train', color='blue', alpha=0.7)
axes[0].set_title(f"Training Data - Series {train_entry['item_id']}")
axes[0].set_ylabel('Value')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

axes[1].plot(test_series, label='Test', color='green', alpha=0.7)
axes[1].set_title(f"Test Data - Series {test_entry['item_id']}")
axes[1].set_xlabel('Time')
axes[1].set_ylabel('Value')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"训练序列长度: {len(train_series)}")
print(f"测试序列长度: {len(test_series)}")

In [None]:
# 数据统计分析
train_values = [entry['target'] for entry in data_loader.train_data]
train_flat = np.concatenate(train_values)

print("训练数据统计：")
print(f"  总数据点: {len(train_flat):,}")
print(f"  均值: {np.mean(train_flat):.4f}")
print(f"  标准差: {np.std(train_flat):.4f}")
print(f"  最小值: {np.min(train_flat):.4f}")
print(f"  最大值: {np.max(train_flat):.4f}")
print(f"  25%分位数: {np.percentile(train_flat, 25):.4f}")
print(f"  50%分位数: {np.percentile(train_flat, 50):.4f}")
print(f"  75%分位数: {np.percentile(train_flat, 75):.4f}")

## 4. Mean Model 实验

In [None]:
# 检查是否已有训练好的模型
model_exists = os.path.exists(mean_model.model_path)
print(f"Mean Model 是否已存在: {model_exists}")

if model_exists:
    print(f"模型路径: {mean_model.model_path}")
    # 加载已有模型
    mean_model.load_predictor()
    print("模型加载成功！")
else:
    print("需要训练新模型")
    # 训练模型
    mean_model.train(data_loader.train_data[:10])  # 使用部分数据快速测试
    print("模型训练完成！")

In [None]:
# 快速预测测试
from gluonts.evaluation import make_evaluation_predictions

# 使用前3个时间序列进行快速测试
test_subset = data_loader.test_data[:3]

forecasts, tss = mean_model.predict(test_subset, num_samples=1)

# 可视化预测结果
for i in range(min(3, len(forecasts))):
    plt.figure(figsize=(12, 4))
    
    # 真实值
    tss[i][-100:].plot(label='True values', color='blue')
    
    # 预测值
    pred_index = pd.date_range(
        start=forecasts[i].start_date.to_timestamp(),
        periods=len(forecasts[i].mean),
        freq=config['dataset']['freq']
    )
    forecast_series = pd.Series(forecasts[i].mean, index=pred_index)
    forecast_series.plot(label='Prediction', color='red', marker='.')
    
    plt.title(f'Mean Model Prediction - Series {i}')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

## 5. 滑动窗口预测分析

In [None]:
# 检查滑动窗口预测结果
mean_pred_path = os.path.join(config['paths']['predictions_dir'], "mean_predictions_value.npy")

if os.path.exists(mean_pred_path):
    mean_predictions = np.load(mean_pred_path, allow_pickle=True)
    print(f"滑动窗口预测已加载")
    print(f"预测形状: {[len(p) for p in mean_predictions[:5]]}")
else:
    print("生成滑动窗口预测...")
    # 使用少量数据测试
    test_data = data_loader.train_data[:5]
    mean_predictions = mean_model.sliding_window_prediction(test_data)
    print(f"预测完成！")

In [None]:
# 分析预测误差
if 'mean_predictions' in locals():
    series_idx = 0
    true_values = data_loader.train_data[series_idx]['target']
    pred_values = mean_predictions[series_idx]
    
    # 对齐数据
    context_len = config['mean_model']['context_length']
    true_aligned = true_values[context_len:context_len+len(pred_values)]
    
    # 计算误差
    errors = np.array(true_aligned) - np.array(pred_values)
    
    # 绘制误差分布
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    # 误差时序图
    axes[0].plot(errors[:200], alpha=0.7)
    axes[0].axhline(y=0, color='r', linestyle='--', alpha=0.5)
    axes[0].set_title('Prediction Errors Over Time')
    axes[0].set_xlabel('Time Step')
    axes[0].set_ylabel('Error')
    axes[0].grid(True, alpha=0.3)
    
    # 误差直方图
    axes[1].hist(errors, bins=50, edgecolor='black', alpha=0.7)
    axes[1].set_title('Error Distribution')
    axes[1].set_xlabel('Error')
    axes[1].set_ylabel('Frequency')
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    print(f"误差统计：")
    print(f"  均值: {np.mean(errors):.4f}")
    print(f"  标准差: {np.std(errors):.4f}")
    print(f"  MAE: {np.mean(np.abs(errors)):.4f}")
    print(f"  RMSE: {np.sqrt(np.mean(errors**2)):.4f}")

## 6. Log Model 分析

In [None]:
# 加载或计算log-MSE
log_mse_path = os.path.join(config['paths']['results_dir'], 
                            f"{config['dataset']['name']}_log_mse_losses.csv")

if os.path.exists(log_mse_path):
    log_mse_df = pd.read_csv(log_mse_path)
    print("Log-MSE数据加载成功")
    print(f"数据形状: {log_mse_df.shape}")
    print(f"\n前5行数据：")
    print(log_mse_df.head())
else:
    print("Log-MSE数据不存在，需要先运行完整训练流程")

In [None]:
# 分析log-MSE的自相关性
if 'log_mse_df' in locals():
    from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
    
    # 选择一个时间序列
    series_id = 0
    series_data = log_mse_df[log_mse_df['series_id'] == series_id]['log_mse'].values
    
    fig, axes = plt.subplots(2, 2, figsize=(14, 8))
    
    # 时序图
    axes[0, 0].plot(series_data[:500], alpha=0.7)
    axes[0, 0].set_title(f'Log-MSE Time Series (Series {series_id})')
    axes[0, 0].set_xlabel('Time')
    axes[0, 0].set_ylabel('Log-MSE')
    axes[0, 0].grid(True, alpha=0.3)
    
    # 分布图
    axes[0, 1].hist(series_data, bins=50, edgecolor='black', alpha=0.7)
    axes[0, 1].set_title('Log-MSE Distribution')
    axes[0, 1].set_xlabel('Log-MSE')
    axes[0, 1].set_ylabel('Frequency')
    axes[0, 1].grid(True, alpha=0.3)
    
    # ACF图
    plot_acf(series_data[:1000], lags=50, ax=axes[1, 0])
    axes[1, 0].set_title('Autocorrelation Function')
    
    # PACF图
    plot_pacf(series_data[:1000], lags=50, ax=axes[1, 1])
    axes[1, 1].set_title('Partial Autocorrelation Function')
    
    plt.tight_layout()
    plt.show()
    
    print(f"Log-MSE统计（Series {series_id}）：")
    print(f"  均值: {np.mean(series_data):.4f}")
    print(f"  标准差: {np.std(series_data):.4f}")
    print(f"  最小值: {np.min(series_data):.4f}")
    print(f"  最大值: {np.max(series_data):.4f}")

## 7. 最终预测结果分析

In [None]:
# 加载最终预测结果
final_res_path = os.path.join(config['paths']['predictions_dir'], "final_res_tensor.npy")

if os.path.exists(final_res_path):
    final_res = np.load(final_res_path)
    print(f"最终预测结果加载成功")
    print(f"结果形状: {final_res.shape}")
    print(f"  采样数: {final_res.shape[0]}")
    print(f"  时间序列数: {final_res.shape[1]}")
    print(f"  预测长度: {final_res.shape[2]}")
else:
    print("最终预测结果不存在，需要先运行预测流程")

In [None]:
# 分析预测不确定性
if 'final_res' in locals():
    series_idx = 0
    samples = final_res[:, series_idx, :]
    
    # 计算统计量
    mean_pred = np.mean(samples, axis=0)
    std_pred = np.std(samples, axis=0)
    p5 = np.percentile(samples, 5, axis=0)
    p25 = np.percentile(samples, 25, axis=0)
    p75 = np.percentile(samples, 75, axis=0)
    p95 = np.percentile(samples, 95, axis=0)
    
    # 绘制不确定性
    fig, axes = plt.subplots(2, 1, figsize=(14, 8))
    
    x = np.arange(len(mean_pred))
    
    # 预测和置信区间
    axes[0].plot(x, mean_pred, 'b-', label='Mean Prediction')
    axes[0].fill_between(x, p5, p95, alpha=0.2, color='blue', label='90% CI')
    axes[0].fill_between(x, p25, p75, alpha=0.3, color='blue', label='50% CI')
    axes[0].set_title(f'Prediction with Confidence Intervals (Series {series_idx})')
    axes[0].set_xlabel('Prediction Step')
    axes[0].set_ylabel('Value')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # 标准差随时间变化
    axes[1].plot(x, std_pred, 'r-', linewidth=2)
    axes[1].set_title('Prediction Uncertainty (Standard Deviation)')
    axes[1].set_xlabel('Prediction Step')
    axes[1].set_ylabel('Std Dev')
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    print("不确定性分析：")
    print(f"  平均标准差: {np.mean(std_pred):.4f}")
    print(f"  最小标准差: {np.min(std_pred):.4f} (步骤 {np.argmin(std_pred)})")
    print(f"  最大标准差: {np.max(std_pred):.4f} (步骤 {np.argmax(std_pred)})")
    print(f"  标准差增长率: {(std_pred[-1] - std_pred[0]) / std_pred[0] * 100:.2f}%")

## 8. 实验新模型

In [None]:
# 实验：尝试不同的模型配置
from gluonts.torch.model.d_linear import DLinearEstimator
from gluonts.torch.model.deepar import DeepAREstimator

# 创建不同的模型配置
model_configs = [
    {
        'name': 'DLinear',
        'estimator': DLinearEstimator(
            prediction_length=24,
            context_length=168,  # 一周
            trainer_kwargs={"max_epochs": 10}
        )
    },
    {
        'name': 'DeepAR',
        'estimator': DeepAREstimator(
            prediction_length=24,
            context_length=168,
            freq=config['dataset']['freq'],
            trainer_kwargs={"max_epochs": 10}
        )
    }
]

print("实验模型配置准备完成")
print(f"待测试模型: {[m['name'] for m in model_configs]}")

In [None]:
# 快速比较不同模型（使用少量数据）
from gluonts.evaluation import Evaluator

# 使用少量数据进行快速实验
train_subset = data_loader.train_data[:10]
test_subset = data_loader.test_data[:10]

results = []
evaluator = Evaluator()

for model_config in model_configs:
    print(f"\n训练 {model_config['name']}...")
    
    # 训练
    predictor = model_config['estimator'].train(train_subset)
    
    # 预测
    from gluonts.evaluation import make_evaluation_predictions
    forecast_it, ts_it = make_evaluation_predictions(
        dataset=test_subset,
        predictor=predictor,
        num_samples=10
    )
    forecasts = list(forecast_it)
    tss = list(ts_it)
    
    # 评估
    agg_metrics, _ = evaluator(tss, forecasts)
    
    results.append({
        'model': model_config['name'],
        'RMSE': agg_metrics['RMSE'],
        'MAPE': agg_metrics['MAPE'],
        'sMAPE': agg_metrics['sMAPE']
    })
    
    print(f"  RMSE: {agg_metrics['RMSE']:.4f}")
    print(f"  MAPE: {agg_metrics['MAPE']:.4f}")

# 结果对比
results_df = pd.DataFrame(results)
print("\n模型性能对比：")
print(results_df.to_string(index=False))

## 9. 自定义实验区域

In [None]:
# 这里可以添加你自己的实验代码
# 例如：测试新的特征工程方法、新的模型架构等

print("自定义实验区域 - 在这里添加你的实验代码")

## 10. 保存实验结果

In [None]:
# 保存实验结果
import json
from datetime import datetime

experiment_results = {
    'timestamp': datetime.now().isoformat(),
    'dataset': config['dataset']['name'],
    'mean_model_config': config['mean_model'],
    'log_model_config': config['log_model'],
    'results': {}
}

# 如果有评估结果，添加到实验记录
if 'results_df' in locals():
    experiment_results['results']['model_comparison'] = results_df.to_dict('records')

# 保存到文件
exp_dir = '../res/experiments/'
os.makedirs(exp_dir, exist_ok=True)

exp_filename = f"experiment_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
exp_path = os.path.join(exp_dir, exp_filename)

with open(exp_path, 'w') as f:
    json.dump(experiment_results, f, indent=2)

print(f"实验结果已保存到: {exp_path}")

## 总结

这个notebook提供了以下功能：

1. **快速测试**: 可以快速测试项目中的各个模块
2. **数据探索**: 可视化和分析数据集
3. **模型调试**: 逐步调试模型训练和预测过程
4. **结果分析**: 详细分析预测结果和不确定性
5. **实验对比**: 比较不同模型配置的性能
6. **可视化**: 生成各种图表帮助理解模型行为

使用提示：
- 修改配置文件后，重新运行第2节的代码块重新加载配置
- 可以使用部分数据进行快速实验，避免长时间等待
- 所有实验结果会自动保存，方便后续分析