In [16]:
%run set_jupyter.py
%matplotlib inline 

Exception: File `'set_jupyter.py'` not found.

In [None]:
import torch
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from darts.utils.callbacks import TFMProgressBar
from torch.nn.modules.loss import MSELoss, CrossEntropyLoss
from pathlib import Path
# 回调函数
from pytorch_lightning.callbacks import Callback

from sklearn.metrics import mean_squared_error, precision_score 

from darts.models import TSMixerModel
from darts.metrics import mape, mse

# 自定义
from config import TIMESERIES_LENGTH # 测试和验证数据长度设置
from data_precessing.timeseries import prepare_timeseries_data  # 获取训练数据、验证数据和测试数据

# 全局参数

In [None]:
work_dir = Path("logs/tsmixer_logs").resolve() 
work_dir

# 读取数据

In [None]:
data = prepare_timeseries_data('training')

In [None]:
data.keys()

# 配置模型

In [None]:
import torch  
import torch.nn as nn  

class MAPELoss(nn.Module):  
    def __init__(self):  
        super(MAPELoss, self).__init__()  

    def forward(self, y_pred, y_true):  
        # 确保真实值不为零，以避免除零错误  
        epsilon = 1e-10  
        mape = torch.mean(torch.abs((y_true - y_pred) / (y_true + epsilon))) * 100  
        return mape 

In [None]:
# 可视化训练过程的损失值

class LossLogger(Callback):
    def __init__(self):
        self.train_loss = []
        self.val_loss = []

    # will automatically be called at the end of each epoch
    def on_train_epoch_end(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule") -> None:
        self.train_loss.append(float(trainer.callback_metrics["train_loss"]))

    def on_validation_epoch_end(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule") -> None:
        self.val_loss.append(float(trainer.callback_metrics["val_loss"]))

loss_logger = LossLogger()

In [None]:
 
class ModelParameters:  
    """模型参数"""  

    def __init__(  
            self,  
            input_chunk_length=104, #13  
            output_chunk_length=1,  
            batch_size=64,  
            full_training=True,
            hidden_size=120,
            ff_size=90,
            num_blocks=7,
            dropout=0.03
    ):  
        # 模型用于回溯的时间序列片段长度，即输入块长度  
        self.input_chunk_length = input_chunk_length  # lookback window  
        # 模型预测的时间序列长度，即输出块长度  
        self.output_chunk_length = output_chunk_length  # forecast/lookahead window  
        # 定义每个训练批次中的样本数量  
        self.batch_size = batch_size  
        # 随机数种子，用于结果复现  
        self.random_state = 42  
        # 工作目录，保存模型和输出的位置  
        self.work_dir = work_dir  

        # 第二个前馈层的隐藏层大小，决定模型容量  
        self.hidden_size = hidden_size  
        # 特征混合多层感知机（MLP）中第一前馈层的大小  
        self.ff_size = ff_size  
        # 模型中混合块的数量，包括首个块和所有后续块  
        self.num_blocks = num_blocks  
        # 在混合层中使用的激活函数名称。默认：”ReLU”  
        # 可选项包括“ReLU”, “RReLU”, “PReLU”, “ELU”, “Softplus”, “Tanh”, “SELU”, “LeakyReLU”, “Sigmoid”, “GELU”  
        self.activation = 'ReLU'  
        # Dropout正则化率，用于防止过拟合  
        self.dropout = dropout  
        # 使用的LayerNorm变体类型。默认：“LayerNorm”。  
        # 可选择“LayerNormNoBias”, “LayerNorm”, “TimeBatchNorm2d”。或者使用自定义的nn.Module。  
        self.norm_type = 'LayerNorm'  
        # 是否在进行其他层计算前进行归一化  
        self.normalize_before = False  
        # 是否使用静态协变量  
        self.use_static_covariates = True  
        # 是否使用可逆实例归一化RINorm来抵抗分布漂移，仅应用于目标序列特征，不适用于协变量  
        self.use_reversible_instance_norm = False 
        # 优化器参数配置  
        self.optimizer_kwargs = self.get_optimizer_kwargs()  
        # PyTorch Lightning Trainer参数  
        self.pl_trainer_kwargs = self.get_pl_trainer_kwargs(full_training)  
        # 学习率调度器类，使用指数学习率调度器  
        self.lr_scheduler_cls = torch.optim.lr_scheduler.ExponentialLR  
        # 学习率调度器的参数配置  
        self.lr_scheduler_kwargs = {"gamma": 0.999}  
        # 概率预测使用的似然函数，默认为无  
        self.likelihood = None  # use a `likelihood` for probabilistic forecasts  
        # 确定性模型使用的损失函数，默认为均方误差  
        loss_fn="binary_cross_entropy"
        # self.loss_fn = MSELoss()  # use a `loss_fn` for determinsitic model  
        # self.loss_fn = MAPELoss()
        # self.loss_fn = nn.L1Loss()
        # 是否保存最佳模型状态的检查点  
        self.save_checkpoints = True  # checkpoint to retrieve the best performing model state,  
        # 是否覆盖同名模型的检查点  
        self.force_reset = True  

    # 获取PyTorch Lightning的训练参数  
    def get_pl_trainer_kwargs(self, full_training):  
        # 提前停止：若验证损失在10个epoch内没有减少1e-6则停止  
        early_stopper = EarlyStopping(  
            monitor="val_loss",  
            patience=10,  
            min_delta=1e-6,  
            mode="min",  
        )  

        # PyTorch Lightning Trainer参数配置（可添加自定义回调）  
        if full_training:  
            limit_train_batches = None  
            limit_val_batches = None  
            max_epochs = 800  
            batch_size = 64  
        else:  
            limit_train_batches = 20  
            limit_val_batches = 10  
            max_epochs = 40  
            batch_size = 64  

        # 仅显示训练和预测进度条  
        progress_bar = TFMProgressBar(  
            enable_sanity_check_bar=False, enable_validation_bar=False  
        )  

        pl_trainer_kwargs = {  
            "gradient_clip_val": 1,  # 梯度剪裁，通过限制梯度的范围来稳定训练过程  
            "max_epochs": max_epochs,  
            "limit_train_batches": limit_train_batches,  
            "limit_val_batches": limit_val_batches,  
            "accelerator": "auto",  
            "callbacks": [early_stopper, progress_bar, loss_logger],  
        }  

        return pl_trainer_kwargs  

    # 获取优化器的参数配置  
    def get_optimizer_kwargs(self):  
        # 优化器配置，默认使用Adam  
        optimizer_kwargs = {  
            "lr": 1e-4,  
            # "optimizer_cls": torch.optim.Adam  
        }  
        return optimizer_kwargs

In [None]:
vars(ModelParameters())

In [None]:
model = TSMixerModel(
    **vars(ModelParameters()),
    model_name="tsm"
)

# 训练模型

In [None]:
model.fit(
    # 训练集
    series=data['train'],
    past_covariates=data['past_covariates'],
    future_covariates=data['future_covariates'],
    # 验证集
    val_series=data['val'],
    val_past_covariates=data['past_covariates'],
    val_future_covariates=data['future_covariates'],
)

In [None]:
plt.plot(loss_logger.train_loss, label='训练损失', color='red')
plt.plot(loss_logger.val_loss, label='验证损失', color='blue')
plt.legend()

# 测试模型

In [None]:
model = model.load_from_checkpoint(model_name='tsm', work_dir=work_dir)

In [None]:
# we will predict the next `pred_steps` points after the end of `pred_input`
pred_steps = TIMESERIES_LENGTH["test_length"]
pred_input = data["test"][:-pred_steps]

pred_series = model.predict(n=pred_steps, series=pred_input)

# 对预测结果进行二值化和展平 
true_labels = data["test"][-pred_steps:].values()  
true_labels = true_labels.astype(int).flatten()  # Flatten to 1D   
binary_predictions = pred_series.values() > 0.5  
binary_predictions = binary_predictions.astype(int).flatten()  

# 计算精确率  
precision = precision_score(true_labels, binary_predictions)  
precision

In [None]:
for i, stock in enumerate(data["test"].columns[:3]):
    data["test"][-pred_steps:].data_array().sel(component=stock).plot(label=f"{stock}_实际数据")
    pred_series.data_array().sel(component=stock).plot(label=f"{stock}_预测结果")
    # data['test'].slice_intersect(hfc).data_array().sel(component=stock).plot(label=f"{stock}_实际数据")
    plt.legend()
    plt.show()

# 超参调优

### 导入包

In [None]:
import optuna  
# import torch  
# from torch.nn import MSELoss  
from pytorch_lightning import Trainer, loggers as pl_loggers  
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint  
from sklearn.metrics import mean_squared_error  
# from darts import TimeSeries  
# from darts.models import TSMixerModel  
# from darts.utils.callbacks import TFMProgressBar  

from config import TIMESERIES_LENGTH

# 假设 train, val, past_cov_ts 和 future_cov_ts 四个时间序列已经定义好  


def objective(trial):  
    input_chunk_length = trial.suggest_int("input_chunk_length", 5, 128)  
    output_chunk_length = trial.suggest_int("output_chunk_length", 1, 8)  
    # batch_size = trial.suggest_int("batch_size", 32, 64)  
    hidden_size = trial.suggest_int("hidden_size", 64, 128) 
    ff_size  = trial.suggest_int("ff_size", 32, 128) 
    dropout = trial.suggest_float("dropout", 0, 0.5)  
    num_blocks = trial.suggest_int("num_blocks", 1, 29)     

    params = ModelParameters(  
        input_chunk_length=input_chunk_length,  
        output_chunk_length=output_chunk_length,  
        # batch_size=batch_size,  
        hidden_size=hidden_size, 
        ff_size = ff_size,
        num_blocks=num_blocks,  
        dropout=dropout,  
        full_training=False  
    )  

    logger = pl_loggers.TensorBoardLogger("/data/tb_logs", name="TSMixerModel")  

    model_checkpoint = ModelCheckpoint(  
        monitor='val_loss',  
        dirpath=f"/tata/optuna/trial_{trial.number}_checkpoints",  
        filename='{epoch}-{val_loss:.2f}',  
        save_top_k=1,  
        mode='min'  
    )  

    trainer = Trainer(  
        logger=logger,  
        callbacks=[model_checkpoint, TFMProgressBar()],  
        max_epochs=params.pl_trainer_kwargs['max_epochs'],  
        limit_train_batches=params.pl_trainer_kwargs['limit_train_batches'],  
        limit_val_batches=params.pl_trainer_kwargs['limit_val_batches'],  
        accelerator=params.pl_trainer_kwargs['accelerator']  
    )  

    model = TSMixerModel(**vars(params), model_name="tsm")  

    # 训练模型  
    model.fit( 

        series=data['train'],
        past_covariates=data['past_covariates'],
        future_covariates=data['future_covariates'],
        # 验证集
        val_series=data['val'],
        val_past_covariates=data['past_covariates'],
        val_future_covariates=data['future_covariates'],
          
        verbose=True,  
        trainer=trainer  # Now use the trainer  
    )  

    best_model_path = model_checkpoint.best_model_path  
    trial.set_user_attr('best_model_path', best_model_path)  

    # we will predict the next `pred_steps` points after the end of `pred_input`
    pred_steps = TIMESERIES_LENGTH["test_length"]
    pred_input = data["test"][:-pred_steps]
    
    pred_series = model.predict(n=pred_steps, series=pred_input)
    
    # 对预测结果进行二值化和展平 
    true_labels = data["test"][-pred_steps:].values()  
    true_labels = true_labels.astype(int).flatten()  # Flatten to 1D   
    binary_predictions = pred_series.values() > 0.5  
    binary_predictions = binary_predictions.astype(int).flatten()  
    
    # 计算精确率  
    precision = precision_score(true_labels, binary_predictions)  

    return -precision 



In [None]:
study = optuna.create_study(
    study_name='tsmixer',   # 变化
    direction="minimize",  
    storage="sqlite:///data/optuna/assoptuna_study.db",  
    load_if_exists=True  
)  


In [None]:
study.optimize(objective, n_trials=100, callbacks=[])  

In [None]:
print("Best trial:")  
trial = study.best_trial  
print("  Value: ", trial.value)  
print("  Params: ")  
for key, value in trial.params.items():  
    print(f"    {key}: {value}")  

best_model_path = trial.user_attrs['best_model_path']  
print(f"Best model path: {best_model_path}")