# 电力变压器油温预测 - 线性回归模型
# Power Transformer Oil Temperature Prediction - Linear Regression

本 notebook 使用线性回归模型预测变压器油温。

**主要特性 / Key Features:**

- 使用共享 `utils.py` 模块，代码简洁
- 一次训练一个模型，参数可配置
- 支持三种数据分割方式（sequential/random/label_random）
- 支持特征选择（负载特征 + 时间特征）
- 可选的异常值处理

In [None]:
# 导入所需库 / Import Required Librariesimport numpy as npimport torchimport torch.nn as nnfrom sklearn.preprocessing import StandardScalerfrom sklearn.metrics import mean_squared_error, mean_absolute_error, r2_scoreimport warningswarnings.filterwarnings('ignore')# 导入共享工具模块 / Import shared utilitiesfrom utils import *# 设置随机种子 / Set random seedsnp.random.seed(42)torch.manual_seed(42)print(f"默认设备 / Default device: {DEFAULT_CONFIG['device']}")

In [None]:
# Linear regression model definition
class LinearRegressionModel(nn.Module):
    """Feed-forward regressor mapping a flattened input window to one value.

    Each input window of shape ``(batch, seq_length, input_size)`` is
    flattened to ``(batch, seq_length * input_size)`` and passed through one
    ``Linear -> ReLU -> Dropout`` group per entry of ``hidden_sizes``,
    followed by a final ``Linear`` layer with a single output unit.

    Note: despite its name, the model is a purely linear regression only when
    ``hidden_sizes`` is empty; with hidden layers the ReLU activations make
    it a small multi-layer perceptron.

    Args:
        input_size: Number of features per time step.
        seq_length: Number of time steps per input window.
        hidden_sizes: Widths of the hidden layers, in order. Any iterable of
            ints is accepted; it is only read, never modified.
        dropout: Dropout probability applied after every hidden activation.
    """

    def __init__(self, input_size, seq_length, hidden_sizes=(64, 32), dropout=0.2):
        super().__init__()
        self.input_size = input_size
        self.seq_length = seq_length

        # Width of the flattened window fed to the first layer.
        prev_size = seq_length * input_size

        layers = []
        for hidden_size in hidden_sizes:
            layers.append(nn.Linear(prev_size, hidden_size))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))
            prev_size = hidden_size

        # Output layer: one regression target per sample.
        layers.append(nn.Linear(prev_size, 1))
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Return predictions of shape ``(batch, 1)`` for input ``x``.

        Args:
            x: Tensor of shape ``(batch, seq_length, input_size)``.
        """
        # reshape (unlike view) also accepts non-contiguous tensors, e.g. the
        # result of a transpose/permute, copying only when it has to.
        return self.network(x.reshape(x.size(0), -1))


print("LinearRegressionModel 已定义 / LinearRegressionModel defined")

In [None]:
# Training function
def _snapshot_state(model):
    """Return an independent copy of the model's current state dict."""
    # state_dict() hands back references to the live tensors, and dict.copy()
    # is shallow, so every tensor has to be cloned to freeze its values.
    return {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}


def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over `loader`; return (mean batch loss, mean batch MAE).

    When `optimizer` is given the model parameters are updated after every
    batch; otherwise the pass only evaluates.
    """
    losses = []
    maes = []
    for X_batch, y_batch in loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        if optimizer is not None:
            optimizer.zero_grad()
        outputs = model(X_batch)
        # The model emits (batch, 1). Align the target to that shape so a
        # (batch,) target cannot broadcast to (batch, batch) inside the loss.
        y_batch = y_batch.reshape(outputs.shape)
        loss = criterion(outputs, y_batch)
        if optimizer is not None:
            loss.backward()
            optimizer.step()
        losses.append(loss.item())
        maes.append(torch.mean(torch.abs(outputs - y_batch)).item())
    return np.mean(losses), np.mean(maes)


def train_model(train_loader, test_loader, input_size, seq_length,
                hidden_sizes=(64, 32), dropout=0.2, num_epochs=100,
                lr=0.001, patience=10, device='cpu'):
    """Train a LinearRegressionModel with early stopping on the test loss.

    Uses MSE loss, the Adam optimizer and a ReduceLROnPlateau scheduler
    (factor 0.5, patience 5) driven by the test loss. Progress is printed
    after every epoch. The returned model always carries the weights of the
    epoch with the lowest test loss, whether or not early stopping fired.

    Args:
        train_loader: Iterable of ``(X_batch, y_batch)`` tensor pairs used
            for optimisation.
        test_loader: Iterable of ``(X_batch, y_batch)`` tensor pairs used for
            evaluation, LR scheduling and early stopping.
        input_size: Number of features per time step.
        seq_length: Number of time steps per input window.
        hidden_sizes: Hidden layer widths forwarded to the model.
        dropout: Dropout probability forwarded to the model.
        num_epochs: Maximum number of epochs.
        lr: Initial learning rate for Adam.
        patience: Number of consecutive epochs without test-loss improvement
            after which training stops.
        device: Device the model and batches are moved to.

    Returns:
        tuple: ``(model, history)`` where ``history`` maps ``'train_loss'``,
        ``'test_loss'``, ``'train_mae'`` and ``'test_mae'`` to per-epoch lists.
    """
    model = LinearRegressionModel(input_size, seq_length, hidden_sizes, dropout).to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    # `verbose` is deprecated in recent PyTorch releases and False is the
    # default, so it is simply omitted.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=5
    )

    history = {'train_loss': [], 'test_loss': [], 'train_mae': [], 'test_mae': []}

    best_test_loss = float('inf')
    # Start from the initial weights so there is always a state to restore,
    # even if the test loss never improves (e.g. it is NaN from the start).
    best_model_state = _snapshot_state(model)
    patience_counter = 0

    for epoch in range(num_epochs):
        model.train()
        train_loss, train_mae = _run_epoch(model, train_loader, criterion, device, optimizer)

        model.eval()
        with torch.no_grad():
            test_loss, test_mae = _run_epoch(model, test_loader, criterion, device)

        history['train_loss'].append(train_loss)
        history['test_loss'].append(test_loss)
        history['train_mae'].append(train_mae)
        history['test_mae'].append(test_mae)

        # Print progress every epoch
        print(f'Epoch {epoch+1}/{num_epochs} | Train Loss: {train_loss:.6f} | Test Loss: {test_loss:.6f}')

        # Learning-rate scheduling
        scheduler.step(test_loss)

        # Early stopping
        if test_loss < best_test_loss:
            best_test_loss = test_loss
            patience_counter = 0
            best_model_state = _snapshot_state(model)
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f'Early stopping触发，在epoch {epoch+1} / Early stopping at epoch {epoch+1}')
                break

    # Restore the best weights on every exit path, not only on early stopping.
    model.load_state_dict(best_model_state)
    return model, history


print("训练函数已定义 / Training function defined")

In [None]:
# Main training function
_SPLIT_METHODS = ('sequential', 'random', 'label_random')


def _check_split_method(split_method):
    """Raise ValueError if `split_method` is not one of the supported names."""
    if split_method not in _SPLIT_METHODS:
        raise ValueError(f"未知的分割方式: {split_method}")


def _build_feature_matrix(df, load_cols, time_cols):
    """Return the 2-D feature array: load columns plus optional time features."""
    X_load = df[load_cols].values
    if not time_cols:
        print(f"使用负载特征 / Load features: {len(load_cols)}")
        return X_load
    X_time = extract_time_features(df, time_cols).values
    X = np.concatenate([X_load, X_time], axis=1)
    print(f"特征组合 / Features: {len(load_cols)} 负载 + {len(time_cols)} 时间")
    return X


def _split_sequences(X_seq, y_seq, config):
    """Split the sequences into train/test parts using config['split_method']."""
    split_method = config['split_method']
    _check_split_method(split_method)
    if split_method == 'sequential':
        return split_sequential(X_seq, y_seq, config['train_ratio'])
    if split_method == 'random':
        return split_random_groups(
            X_seq, y_seq, config['n_groups'], config['train_ratio']
        )
    return split_label_random(X_seq, y_seq, config['train_ratio'])


def _scale_train_test(X_train, X_test, y_train, y_test):
    """Standardise inputs and targets with scalers fitted on the train split only."""
    scaler_X = StandardScaler()
    scaler_y = StandardScaler()

    # StandardScaler works on 2-D data: collapse (samples, steps) into rows,
    # scale per feature, then restore the window shape.
    n_train, seq_len, n_feat = X_train.shape
    n_test = X_test.shape[0]
    X_train_scaled = scaler_X.fit_transform(
        X_train.reshape(-1, n_feat)
    ).reshape(n_train, seq_len, n_feat)
    X_test_scaled = scaler_X.transform(
        X_test.reshape(-1, n_feat)
    ).reshape(n_test, seq_len, n_feat)

    y_train_scaled = scaler_y.fit_transform(y_train.reshape(-1, 1)).ravel()
    y_test_scaled = scaler_y.transform(y_test.reshape(-1, 1)).ravel()
    return scaler_X, scaler_y, X_train_scaled, X_test_scaled, y_train_scaled, y_test_scaled


def train_single_model(config=None, **kwargs):
    """Train and evaluate a single model end to end.

    Pipeline: load data, optionally remove outliers, build the feature
    matrix, create sequences, split into train/test, standardise, train via
    ``train_model`` and compute R2 / MSE / MAE on the test split in the
    original target scale.

    Args:
        config: Configuration dict. When ``None`` a copy of ``DEFAULT_CONFIG``
            is used; a given dict is copied, never modified.
        **kwargs: Overrides applied on top of ``config``.

    Returns:
        dict: ``'model'``, ``'history'``, ``'scalers'`` (keys ``'X'``/``'y'``),
        ``'metrics'`` (keys ``'r2'``/``'mse'``/``'mae'``), ``'data'`` (unscaled
        train/test arrays plus ``'y_pred'``) and the merged ``'config'``.

    Raises:
        ValueError: If ``config['split_method']`` is not one of
            ``'sequential'``, ``'random'`` or ``'label_random'``.
    """
    # Merge configuration without touching the caller's dict
    config = DEFAULT_CONFIG.copy() if config is None else config.copy()
    config.update(kwargs)

    # Fail fast on a bad split method, before any data is loaded or processed.
    _check_split_method(config['split_method'])

    # Prediction-horizon settings
    horizon = config['prediction_horizon']
    horizon_cfg = HORIZON_CONFIGS[horizon]
    offset = horizon_cfg['offset']
    seq_length = config.get('seq_length', horizon_cfg['seq_length'])

    load_cols = config['load_features']
    # Time features are optional: treat None like an empty selection so the
    # banner below does not crash on len(None).
    time_cols = config['time_features'] or []

    print(f"\n{'='*60}")
    print(f"训练配置 / Training Configuration:")
    print(f"  数据集 / Dataset: {config['dataset_path']}")
    print(f"  预测范围 / Horizon: {horizon} ({horizon_cfg['description']})")
    print(f"  分割方式 / Split: {config['split_method']}")
    print(f"  特征数 / Features: {len(load_cols) + len(time_cols)}")
    print(f"{'='*60}\n")

    # 1. Load data
    df = load_data(config['dataset_path'])
    print(f"数据加载完成 / Data loaded: {df.shape}")

    # 2. Outlier handling (optional)
    if config['remove_outliers']:
        df, n_removed = remove_outliers(
            df,
            method=config['outlier_method'],
            threshold=config['outlier_threshold']
        )
        print(f"移除异常值 / Removed outliers: {n_removed} 行")

    # 3. Feature selection (load features + optional time features) and target
    X = _build_feature_matrix(df, load_cols, time_cols)
    y = df['OT'].values

    # 4. Create sequences
    X_seq, y_seq = create_sequences_with_offset(X, y, seq_length, offset)
    print(f"序列创建完成 / Sequences created: {X_seq.shape}")

    # 5. Train/test split
    X_train, X_test, y_train, y_test = _split_sequences(X_seq, y_seq, config)
    print(f"数据分割完成 / Data split: Train {len(X_train)}, Test {len(X_test)}")

    # 6. Normalise after splitting so test statistics never reach the scalers
    (scaler_X, scaler_y,
     X_train_scaled, X_test_scaled,
     y_train_scaled, y_test_scaled) = _scale_train_test(X_train, X_test, y_train, y_test)
    n_feat = X_train.shape[2]
    print("归一化完成 / Normalization done")

    # 7. Create DataLoaders
    train_loader, test_loader = create_dataloaders(
        X_train_scaled, y_train_scaled,
        X_test_scaled, y_test_scaled,
        config['batch_size']
    )

    # 8. Train the model
    print(f"\n开始训练 / Start training...\n")
    model, history = train_model(
        train_loader, test_loader,
        input_size=n_feat,
        seq_length=seq_length,
        hidden_sizes=config['hidden_sizes'],
        dropout=config['dropout'],
        num_epochs=config['num_epochs'],
        lr=config['learning_rate'],
        patience=config['patience'],
        device=config['device']
    )

    # 9. Evaluate on the whole test split in a single forward pass
    model.eval()
    with torch.no_grad():
        X_test_t = torch.FloatTensor(X_test_scaled).to(config['device'])
        y_pred_scaled = model(X_test_t).cpu().numpy()

    # Map predictions back to the original target scale
    y_pred = scaler_y.inverse_transform(y_pred_scaled).ravel()

    # Metrics are computed against the unscaled test targets
    r2 = r2_score(y_test, y_pred)
    mse = mean_squared_error(y_test, y_pred)
    mae = mean_absolute_error(y_test, y_pred)

    print(f"\n{'='*60}")
    print(f"最终评估结果 / Final Evaluation:")
    print(f"  R² Score: {r2:.6f}")
    print(f"  MSE: {mse:.6f}")
    print(f"  MAE: {mae:.6f}")
    print(f"{'='*60}\n")

    results = {
        'model': model,
        'history': history,
        'scalers': {'X': scaler_X, 'y': scaler_y},
        'metrics': {'r2': r2, 'mse': mse, 'mae': mae},
        'data': {
            'X_train': X_train, 'X_test': X_test,
            'y_train': y_train, 'y_test': y_test,
            'y_pred': y_pred
        },
        'config': config
    }
    return results


print("主训练函数已定义 / Main training function defined")

## 使用示例 / Usage Examples

### 示例1：使用默认配置训练

```python
results = train_single_model()
```

### 示例2：自定义数据集和预测范围

```python
results = train_single_model(
    dataset_path='../dataset/train2.csv',
    prediction_horizon='day'
)
```

### 示例3：使用label_random分割

```python
results = train_single_model(split_method='label_random')
```

### 示例4：启用异常值处理

```python
results = train_single_model(
    remove_outliers=True,
    outlier_threshold=3.0
)
```

### 示例5：添加时间特征

```python
results = train_single_model(
    time_features=['hour', 'dayofweek', 'is_weekend']
)
```

In [None]:
# 示例：使用默认配置训练 / Example: Train with default configresults_default = train_single_model()

In [None]:
# 可视化训练历史 / Visualize Training Historyplot_training_history(results_default['history'], 'Linear Regression - Default Config')

In [None]:
# 可视化预测结果 / Visualize Predictionsplot_predictions(    results_default['data']['y_test'],    results_default['data']['y_pred'],    f"Linear Regression - {results_default['config']['prediction_horizon'].upper()}")

In [None]:
# Comparison experiment: train once per split method and compare the results
print(
    "\n" + "="*80,
    "对比实验：三种数据分割方式对模型性能的影响",
    "Comparison: Impact of three data split methods on model performance",
    "="*80 + "\n",
    sep="\n",
)

# Reduced configuration for a quick comparison: fewer epochs per run
quick_config = DEFAULT_CONFIG.copy()
quick_config.update({'num_epochs': 30})

split_methods = ['sequential', 'random', 'label_random']
comparison_results = {}

for method in split_methods:
    print(
        f"\n{'*'*60}",
        f"测试分割方式 / Testing split method: {method}",
        f"{'*'*60}",
        sep="\n",
    )
    # Store the run under its split-method name; `results` keeps the latest run
    comparison_results[method] = results = train_single_model(
        config=quick_config, split_method=method
    )

# Summary plot provided by the shared utilities
plot_comparison_summary(comparison_results)

print(
    "\n注意 / Note:",
    "- sequential: 无数据泄露，最接近真实场景",
    "- random: 低数据泄露风险（组内连续）",
    "- label_random: 可能存在数据泄露（窗口重叠），R²可能虚高",
    sep="\n",
)