# TimesNet 教程
**安装说明：** 本笔记本演示了 `TimesNet` 支持的学习任务。
#
`TimesNet` 基本上支持五类任务，分别是：长期预测、短期预测、缺失值填补、异常检测、分类。


### 1. 安装 Python 3.8。为了方便，执行以下命令。

In [None]:
pip install -r requirements.txt


### 2. 导入包

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.fft
from layers.Embed import DataEmbedding
from layers.Conv_Blocks import Inception_Block_V1


# 用于对二维时序数据进行卷积的卷积块，可根据需要更换


### 3. TimesBlock 构建
 `TimesNet` 的核心思想体现在 `TimesBlock` 的构建上：先对数据做 FFT 得到基频，然后基于前 k 个主要基频，将一维时序重塑为 2D 矩阵，接着做二维卷积，最后再将输出变回一维并加权得到最终结果。
#
 接下来我们详细看下 `TimesBlock`。
#
 TimesBlock 包含两个成员函数。

In [None]:
class TimesBlock(nn.Module):
    """Skeleton of the TimesBlock module; the two method bodies are elided here."""

    def __init__(self, configs):
        # Constructor; takes a single ``configs`` object. Body elided in this outline.
        ...

    def forward(self, x):
        # Forward pass over one input ``x``. Body elided in this outline.
        ...



先看 `__init__(self, configs):`

In [None]:
def __init__(self, configs):
    """Initialise a TimesBlock from ``configs``.

    Attributes read from ``configs``: ``seq_len``, ``pred_len``, ``top_k``,
    ``d_model``, ``d_ff`` and ``num_kernels``.
    """
    super(TimesBlock, self).__init__()

    # Input length, forecast length, and how many dominant frequencies to use.
    self.seq_len = configs.seq_len
    self.pred_len = configs.pred_len
    self.k = configs.top_k

    # Parameter-efficient design: two Inception blocks with a GELU in between.
    # The first is built with (d_model, d_ff), the second with (d_ff, d_model).
    kernel_count = configs.num_kernels
    first_block = Inception_Block_V1(configs.d_model, configs.d_ff,
                                     num_kernels=kernel_count)
    second_block = Inception_Block_V1(configs.d_ff, configs.d_model,
                                      num_kernels=kernel_count)
    self.conv = nn.Sequential(first_block, nn.GELU(), second_block)



再看 `forward(self, x)`

In [None]:
def forward(self, x):
    """Run one TimesBlock on ``x`` and return a tensor of the same shape.

    ``x`` is unpacked as ``[B, T, N]`` (batch, time steps, features). For each
    of the ``self.k`` periods returned by ``FFT_for_Period`` the sequence is
    zero-padded to a multiple of the period, folded into a 2D grid, passed
    through ``self.conv``, unfolded and truncated. The per-period results are
    blended with softmax-normalised amplitude weights and added to ``x``.
    """
    batch, steps, channels = x.size()
    total_len = self.seq_len + self.pred_len

    # period_list holds the k periods; period_weight is [B, k] amplitudes.
    period_list, period_weight = FFT_for_Period(x, self.k)

    per_period = []
    for idx in range(self.k):
        period = period_list[idx]

        if total_len % period == 0:
            # Already an exact multiple of the period: no padding required.
            length = total_len
            out = x
        else:
            # Pad with zeros up to the next multiple of the period so the
            # sequence can be folded into a full 2D grid.
            length = ((total_len // period) + 1) * period
            padding = torch.zeros([x.shape[0],
                                   length - total_len,
                                   x.shape[2]]).to(x.device)
            out = torch.cat([x, padding], dim=1)

        # Fold to [B, length // period, period, N], then move the feature
        # axis forward to get [B, N, rows, cols] for the 2D convolution.
        grid = out.reshape(batch, length // period, period, channels)
        grid = grid.permute(0, 3, 1, 2).contiguous()

        # 2D convolution over the folded sequence.
        grid = self.conv(grid)

        # Unfold back to [B, length, N] and drop the padded tail.
        flat = grid.permute(0, 2, 3, 1).reshape(batch, -1, channels)
        per_period.append(flat[:, :total_len, :])

    # Stack to [B, length, N, k].
    res = torch.stack(per_period, dim=-1)

    # Adaptive aggregation: softmax over the k amplitudes, expanded to
    # [B, T, N, k], then a weighted sum over the last axis.
    weights = F.softmax(period_weight, dim=1)
    weights = weights.unsqueeze(1).unsqueeze(1).repeat(1, steps, channels, 1)
    res = torch.sum(res * weights, -1)

    # Residual connection.
    return res + x



上面提到的 `FFT_for_Period` 定义如下：

In [None]:
def FFT_for_Period(x, k=2):
    """Find the ``k`` dominant periods of ``x`` along dimension 1.

    Args:
        x: tensor laid out as ``[B, T, C]``.
        k: number of frequencies to keep.

    Returns:
        A pair ``(period, weight)``: ``period`` is a NumPy array of ``k``
        values computed as ``T // frequency_index``; ``weight`` holds the
        channel-averaged amplitude of those frequencies for every batch
        element, shape ``[B, k]``.
    """
    # Real FFT along time, then the amplitude of every frequency bin.
    spectrum = torch.fft.rfft(x, dim=1)
    amplitude = abs(spectrum)

    # Average over batch and channels, leaving one value per frequency bin.
    frequency_list = amplitude.mean(0).mean(-1)
    frequency_list[0] = 0  # ignore the DC component

    # Indices of the k largest amplitudes, moved to NumPy for integer maths.
    top_list = torch.topk(frequency_list, k)[1]
    top_list = top_list.detach().cpu().numpy()

    # Period = sequence length // frequency index.
    period = x.shape[1] // top_list

    return period, amplitude.mean(-1)[:, top_list]



为了便于理解，下面给出示意图。
#
![FFT 说明示意图](./fft.png)
#
![2D 卷积说明示意图](./conv.png)


更多细节请参阅论文（链接：https://openreview.net/pdf?id=ju_Uqw384Oq）


### 4. TimesNet
#
到此，我们已经得到了擅长提取周期内与周期间信息的 `TimesBlock`，可以进一步构建多任务通用的 `TimesNet`，完成短期/长期预测、缺失值填补、分类、异常检测等任务。
#
接下来详细介绍 `TimesNet` 如何在各任务中发挥作用。

In [None]:
class Model(nn.Module):
    """Skeleton of the TimesNet model: one method per task plus a dispatching ``forward``.

    All method bodies are elided in this outline.
    """

    def __init__(self, configs):
        # Constructor; takes a single ``configs`` object.
        ...

    def forecast(self, x_enc, x_mark_enc, x_dec, x_mark_dec):
        # Forecasting entry point.
        ...

    def imputation(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask):
        # Imputation entry point; additionally receives ``mask``.
        ...

    def anomaly_detection(self, x_enc):
        # Anomaly-detection entry point; only takes ``x_enc``.
        ...

    def classification(self, x_enc, x_mark_enc):
        # Classification entry point.
        ...

    def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None):
        # Common entry point; ``mask`` is optional.
        ...



先看 `__init__(self, configs):`

In [None]:
def __init__(self, configs):
    """Build the TimesNet backbone and the task-specific head.

    ``configs`` supplies the task name, the sequence lengths, the model
    sizes and the embedding settings read below.
    """
    super(Model, self).__init__()

    # Keep the configuration and the lengths used by the task methods.
    self.configs = configs
    self.task_name = configs.task_name
    self.seq_len = configs.seq_len
    self.label_len = configs.label_len
    self.pred_len = configs.pred_len

    # Backbone: a stack of e_layers TimesBlocks.
    blocks = [TimesBlock(configs) for _ in range(configs.e_layers)]
    self.model = nn.ModuleList(blocks)

    # Embedding (enc_in -> d_model) and the layer norm applied after each block.
    self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model,
                                       configs.embed, configs.freq,
                                       configs.dropout)
    self.layer = configs.e_layers  # number of encoder layers
    self.layer_norm = nn.LayerNorm(configs.d_model)

    # Task-specific head; the three task groups are mutually exclusive.
    task = self.task_name
    if task in ('long_term_forecast', 'short_term_forecast'):
        # Linear map along time (seq_len -> seq_len + pred_len), then a
        # projection from d_model to the output channels.
        self.predict_linear = nn.Linear(self.seq_len,
                                        self.pred_len + self.seq_len)
        self.projection = nn.Linear(configs.d_model, configs.c_out, bias=True)
    elif task in ('imputation', 'anomaly_detection'):
        self.projection = nn.Linear(configs.d_model, configs.c_out, bias=True)
    elif task == 'classification':
        self.act = F.gelu
        self.dropout = nn.Dropout(configs.dropout)
        self.projection = nn.Linear(configs.d_model * configs.seq_len,
                                    configs.num_class)



#### 4.1 Forecast
#
预测任务思路：将已知序列长度扩展为 seq_len+pred_len，总长度过 TimesBlock 提取特征后，投影到输出空间，再做去归一化得到最终结果。

In [None]:
def forecast(self, x_enc, x_mark_enc, x_dec, x_mark_dec):
    """Forecast over the full ``seq_len + pred_len`` window.

    ``x_enc`` is normalised per series along time, embedded, stretched along
    time by ``self.predict_linear``, passed through the TimesBlock stack,
    projected, and finally de-normalised. ``x_dec`` and ``x_mark_dec`` are
    accepted but not used in this method.
    """
    total_len = self.pred_len + self.seq_len

    # Normalisation from Non-stationary Transformer: per-series mean / std over time.
    means = x_enc.mean(1, keepdim=True).detach()  # [B, 1, C]
    centered = x_enc - means
    stdev = torch.sqrt(
        torch.var(centered, dim=1, keepdim=True, unbiased=False) + 1e-5)
    normed = centered / stdev

    # Embed the values, then extend the time axis to seq_len + pred_len.
    enc_out = self.enc_embedding(normed, x_mark_enc)
    enc_out = enc_out.permute(0, 2, 1)
    enc_out = self.predict_linear(enc_out)
    enc_out = enc_out.permute(0, 2, 1)

    # TimesNet body: each TimesBlock is followed by the shared LayerNorm.
    for depth in range(self.layer):
        block = self.model[depth]
        enc_out = self.layer_norm(block(enc_out))

    # Project to the output channels.
    dec_out = self.projection(enc_out)

    # De-normalisation: restore the scale and offset of every series.
    scale = stdev[:, 0, :].unsqueeze(1).repeat(1, total_len, 1)
    shift = means[:, 0, :].unsqueeze(1).repeat(1, total_len, 1)
    dec_out = dec_out * scale
    dec_out = dec_out + shift
    return dec_out



#### 4.2 Imputation（缺失值填补）
#
与预测类似，不过只在已知序列中填补缺失值。

In [None]:
def imputation(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask):
    """Reconstruct the input window, filling in the masked positions.

    Args:
        x_enc: input series; the mean below is a plain sum divided by the
            number of observed entries, so masked entries are presumably
            already zero in ``x_enc`` -- TODO confirm against the caller.
        x_mark_enc: time features passed on to the embedding.
        x_dec, x_mark_dec: accepted for interface parity, not used here.
        mask: same layout as ``x_enc``; entries equal to 1 are observed,
            entries equal to 0 are missing.

    Returns:
        The de-normalised reconstruction, with as many time steps as ``x_enc``.
    """
    # Masked normalisation: statistics use only the observed entries.
    # NOTE(review): a series with no observed entry divides by zero here.
    observed = torch.sum(mask == 1, dim=1)
    means = (torch.sum(x_enc, dim=1) / observed).unsqueeze(1).detach()
    x_enc = x_enc - means
    x_enc = x_enc.masked_fill(mask == 0, 0)
    stdev = torch.sqrt(torch.sum(x_enc * x_enc, dim=1) / observed + 1e-5)
    stdev = stdev.unsqueeze(1).detach()
    x_enc = x_enc / stdev

    # Embedding.
    enc_out = self.enc_embedding(x_enc, x_mark_enc)
    # TimesNet body: each TimesBlock is followed by the shared LayerNorm.
    for i in range(self.layer):
        enc_out = self.layer_norm(self.model[i](enc_out))
    # Projection to the output channels.
    dec_out = self.projection(enc_out)

    # De-normalisation. stdev and means are [B, 1, C] and broadcast over time.
    # The output here has seq_len steps (there is no time extension in this
    # task), so repeating the statistics to pred_len + seq_len steps would
    # raise a shape error whenever pred_len != 0.
    dec_out = dec_out * stdev + means
    return dec_out



#### 4.3 异常检测
#
与缺失值填补类似。

In [None]:
def anomaly_detection(self, x_enc):
    """Reconstruct ``x_enc``.

    The input is normalised per series along time, embedded without time
    features, passed through the TimesBlock stack, projected and
    de-normalised.

    Returns:
        The reconstruction, with as many time steps as ``x_enc``.
    """
    # Normalisation from Non-stationary Transformer.
    means = x_enc.mean(1, keepdim=True).detach()
    x_enc = x_enc - means
    stdev = torch.sqrt(
        torch.var(x_enc, dim=1, keepdim=True, unbiased=False) + 1e-5)
    x_enc = x_enc / stdev

    # Embedding (no time features are supplied for this task).
    enc_out = self.enc_embedding(x_enc, None)
    # TimesNet body: each TimesBlock is followed by the shared LayerNorm.
    for i in range(self.layer):
        enc_out = self.layer_norm(self.model[i](enc_out))
    # Projection to the output channels.
    dec_out = self.projection(enc_out)

    # De-normalisation. stdev and means are [B, 1, C] and broadcast over time.
    # The output here has seq_len steps (there is no time extension in this
    # task), so repeating the statistics to pred_len + seq_len steps would
    # raise a shape error whenever pred_len != 0.
    dec_out = dec_out * stdev + means
    return dec_out



#### 4.4 分类

In [None]:
def classification(self, x_enc, x_mark_enc):
    """Classify each input series and return the class scores.

    ``x_mark_enc`` is multiplied onto the features along the time axis, so
    it acts as a per-step mask (presumably 1 for real steps and 0 for
    padding -- TODO confirm against the data loader).
    """
    # Embedding (no time features are supplied for this task).
    hidden = self.enc_embedding(x_enc, None)

    # TimesNet body: each TimesBlock is followed by the shared LayerNorm.
    for depth in range(self.layer):
        block = self.model[depth]
        hidden = self.layer_norm(block(hidden))

    # Activation followed by dropout.
    activated = self.dropout(self.act(hidden))

    # Zero the features at the steps where x_mark_enc is zero.
    masked = activated * x_mark_enc.unsqueeze(-1)

    # Flatten time and feature axes, then project to the class scores.
    flat = masked.reshape(masked.shape[0], -1)
    return self.projection(flat)  # [B, num_classes]



`forward` 根据任务类型调用不同方法：

In [None]:
def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None):
    """Dispatch to the method that matches ``self.task_name``.

    Returns the last ``pred_len`` steps for the two forecasting tasks, the
    full method output for imputation, anomaly detection and classification,
    and ``None`` for any other task name.
    """
    task = self.task_name

    if task == 'long_term_forecast' or task == 'short_term_forecast':
        full_window = self.forecast(x_enc, x_mark_enc, x_dec, x_mark_dec)
        # Only the forecast part of the window is returned.
        return full_window[:, -self.pred_len:, :]

    if task == 'imputation':
        # Full sequence, including the filled-in values.
        return self.imputation(x_enc, x_mark_enc, x_dec, x_mark_dec, mask)

    if task == 'anomaly_detection':
        # Reconstructed sequence.
        return self.anomaly_detection(x_enc)

    if task == 'classification':
        # Class scores.
        return self.classification(x_enc, x_mark_enc)

    return None



### 5. 训练与设置
#
到目前为止我们已经构建好 `TimesNet`，接下来讨论如何训练和测试。`exp` 目录下聚合了各任务的训练、验证、测试逻辑。这里以长期预测任务为例说明训练流程，其它任务大同小异。
#
#### 5.1 长期预测任务的训练
#
训练过程可分为：数据准备、保存路径创建、初始化、优化器与损失函数选择、混合精度训练、训练循环、验证与早停、学习率调整、加载最佳模型等步骤。  
以下代码节选自 `class Exp_Long_Term_Forecast` 的 `train` 方法。

In [None]:
import os


def train(self, setting):  # setting: name of this run; used below as the checkpoint sub-directory
    """Train ``self.model`` with early stopping and return it with the best weights loaded.

    Each epoch runs the training loop, evaluates on the validation and test
    loaders with ``self.vali``, feeds the validation loss to ``EarlyStopping``
    and adjusts the learning rate. After the loop, ``checkpoint.pth`` under
    ``<self.args.checkpoints>/<setting>`` is loaded back into the model.

    NOTE(review): ``time``, ``np``, ``EarlyStopping`` and
    ``adjust_learning_rate`` are used here but not imported in this excerpt.
    """
    # Dataset/loader pairs for the train, validation and test splits.
    train_data, train_loader = self._get_data(flag='train')
    vali_data, vali_loader = self._get_data(flag='val')
    test_data, test_loader = self._get_data(flag='test')

    # Directory in which the model checkpoint is stored.
    path = os.path.join(self.args.checkpoints, setting)
    if not os.path.exists(path):
        os.makedirs(path)
    time_now = time.time()

    train_steps = len(train_loader)

    # EarlyStopping decides from the validation loss whether to stop early.
    early_stopping = EarlyStopping(patience=self.args.patience, verbose=True)

    # Optimizer and loss function.
    model_optim = self._select_optimizer()
    criterion = self._select_criterion()

    # Automatic mixed precision: the gradient scaler only exists when use_amp is set.
    if self.args.use_amp:
        scaler = torch.cuda.amp.GradScaler()
    for epoch in range(self.args.train_epochs):
        iter_count = 0
        train_loss = []
        self.model.train()
        epoch_time = time.time()

        # Training loop for this epoch.
        for i, (batch_x, batch_y, batch_x_mark, batch_y_mark) in enumerate(train_loader):
            iter_count += 1
            model_optim.zero_grad()
            batch_x = batch_x.float().to(self.device)  # model input
            batch_y = batch_y.float().to(self.device)  # target

            # The *_mark tensors carry the time features.
            batch_x_mark = batch_x_mark.float().to(self.device)
            batch_y_mark = batch_y_mark.float().to(self.device)
            # Decoder input: the first label_len steps of batch_y followed by zeros
            # for the last pred_len steps (original note: TimesNet does not use
            # attention, so zero filling is fine).
            dec_inp = torch.zeros_like(batch_y[:, -self.args.pred_len:, :]).float()
            dec_inp = torch.cat([batch_y[:, :self.args.label_len, :], dec_inp], dim=1).float().to(self.device)
            # Forward pass and loss.
            if self.args.use_amp:
                with torch.cuda.amp.autocast():
                    if self.args.output_attention:
                        outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
                    else:
                        outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)

                    # In 'MS' mode only the last column is kept; otherwise all columns.
                    f_dim = -1 if self.args.features == 'MS' else 0
                    outputs = outputs[:, -self.args.pred_len:, f_dim:]
                    batch_y = batch_y[:, -self.args.pred_len:, f_dim:].to(self.device)

                    loss = criterion(outputs, batch_y)
                    train_loss.append(loss.item())
            else:
                if self.args.output_attention:
                    outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
                else:
                    outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)
                f_dim = -1 if self.args.features == 'MS' else 0
                outputs = outputs[:, -self.args.pred_len:, f_dim:]
                batch_y = batch_y[:, -self.args.pred_len:, f_dim:].to(self.device)
                loss = criterion(outputs, batch_y)
                train_loss.append(loss.item())

            # Every 100 iterations: print the loss, the average time per iteration
            # since the last report, and an estimate of the remaining time.
            if (i + 1) % 100 == 0:
                print("\titers: {0}, epoch: {1} | loss: {2:.7f}".format(i + 1, epoch + 1, loss.item()))
                speed = (time.time() - time_now) / iter_count
                left_time = speed * ((self.args.train_epochs - epoch) * train_steps - i)
                print('\tspeed: {:.4f}s/iter; left time: {:.4f}s'.format(speed, left_time))
                iter_count = 0
                time_now = time.time()

            # Backward pass and optimizer step (through the scaler under AMP).
            if self.args.use_amp:
                scaler.scale(loss).backward()
                scaler.step(model_optim)
                scaler.update()
            else:
                loss.backward()
                model_optim.step()

        # End of epoch: report the elapsed time.
        print("Epoch: {} cost time: {}".format(epoch + 1, time.time() - epoch_time))
        train_loss = np.average(train_loss)

        # Evaluate on the validation and test sets.
        vali_loss = self.vali(vali_data, vali_loader, criterion)
        test_loss = self.vali(test_data, test_loader, criterion)

        print("Epoch: {0}, Steps: {1} | Train Loss: {2:.7f} Vali Loss: {3:.7f} Test Loss: {4:.7f}".format(
            epoch + 1, train_steps, train_loss, vali_loss, test_loss))

        # Early-stopping check on the validation loss.
        early_stopping(vali_loss, self.model, path)
        if early_stopping.early_stop:
            print("Early stopping")
            break

        # Learning-rate schedule.
        adjust_learning_rate(model_optim, epoch + 1, self.args)
    best_model_path = path + '/' + 'checkpoint.pth'

    # Load the checkpoint stored under `path` back into the model.
    self.model.load_state_dict(torch.load(best_model_path))
    return self.model



#### 5.2 早停机制
#
`EarlyStopping` 会监控验证损失，当连续若干次验证损失不下降时停止训练，避免过拟合。以下为 `tools.py` 中的实现。

In [None]:
class EarlyStopping:
    """Stop training once the validation loss has stopped improving.

    Call the instance once per epoch with the validation loss. The model is
    checkpointed whenever the loss improves; ``early_stop`` becomes ``True``
    after ``patience`` consecutive calls without improvement.
    """

    def __init__(self, patience=7, verbose=False, delta=0):
        """
        Args:
            patience: number of consecutive non-improving calls tolerated.
            verbose: print a message each time a checkpoint is saved.
            delta: margin added to the best score in the improvement check.
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0  # consecutive calls without improvement
        self.best_score = None
        self.early_stop = False
        # np.Inf was removed in NumPy 2.0; np.inf works on every version.
        self.val_loss_min = np.inf
        self.delta = delta

    def __call__(self, val_loss, model, path):
        """Record ``val_loss``; save ``model`` under ``path`` when it improved."""
        # Higher score is better, so the loss is negated.
        score = -val_loss
        if self.best_score is None:
            # First call: record the score and save.
            self.best_score = score
            self.save_checkpoint(val_loss, model, path)
        elif score < self.best_score + self.delta:
            # No improvement.
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                # Patience exhausted.
                self.early_stop = True
        else:
            # Improvement: save the model and reset the counter.
            self.best_score = score
            self.save_checkpoint(val_loss, model, path)
            self.counter = 0

    def save_checkpoint(self, val_loss, model, path):
        """Write ``model.state_dict()`` to ``<path>/checkpoint.pth``."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), path + '/' + 'checkpoint.pth')
        self.val_loss_min = val_loss



#### 5.3 优化器与损失函数
#
在 `Exp_Long_Term_Forecast` 类中由 `_select_optimizer()` 和 `_select_criterion()` 定义。长期预测任务中使用 Adam 优化器和 MSELoss。

In [None]:
def _select_optimizer(self):
    """Return an Adam optimizer over the model parameters at ``args.learning_rate``."""
    return optim.Adam(self.model.parameters(), lr=self.args.learning_rate)


def _select_criterion(self):
    """Return the training loss: a fresh ``nn.MSELoss`` instance."""
    return nn.MSELoss()



#### 5.4 自动混合精度（AMP）
#
AMP 可在保持数值稳定性的同时加速训练、节省显存。主要用到 `torch.cuda.amp.autocast()` 和 `GradScaler`。

In [None]:
# During the forward pass: run the model inside the autocast context.
with torch.cuda.amp.autocast():
    ...

# During the backward pass: when AMP is enabled, scale the loss before
# backward and step/update through the GradScaler.
if self.args.use_amp:
    scaler.scale(loss).backward()
    scaler.step(model_optim)
    scaler.update()



#### 5.5 学习率调整
#
手动或按规则调整学习率，见 `tools.py` 中的 `adjust_learning_rate`。

In [None]:
def adjust_learning_rate(optimizer, epoch, args):
    """Set the learning rate of every parameter group according to ``args.lradj``.

    Args:
        optimizer: object exposing ``param_groups`` (a list of dicts with an
            ``'lr'`` entry).
        epoch: epoch number used to look up the new learning rate.
        args: provides ``lradj`` and ``learning_rate``.

    Schedules:
        ``'type1'``: ``learning_rate * 0.5 ** (epoch - 1)``, applied every epoch.
        ``'type2'``: fixed values at selected epochs; other epochs are untouched.
        anything else: the learning rate is left unchanged.
    """
    if args.lradj == 'type1':
        # Halve the base learning rate once per epoch.
        lr_adjust = {epoch: args.learning_rate * (0.5 ** ((epoch - 1) // 1))}
    elif args.lradj == 'type2':
        # Hand-picked learning rates at specific epochs.
        lr_adjust = {
            2: 5e-5, 4: 1e-5, 6: 5e-6, 8: 1e-6,
            10: 5e-7, 15: 1e-7, 20: 5e-8
        }
    else:
        # Unknown schedule name: previously lr_adjust stayed unbound and the
        # lookup below raised UnboundLocalError. Treat it as "no adjustment".
        lr_adjust = {}

    # Update only when the schedule has an entry for this epoch.
    if epoch in lr_adjust:
        lr = lr_adjust[epoch]
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr
        print('Updating learning rate to {}'.format(lr))



### 6. 验证与测试
#
验证可以评估模型的泛化能力，检测过拟合，并采用早停或调整超参等策略。下面以长期预测为例。

In [None]:
def vali(self, vali_data, vali_loader, criterion):
    """Return the mean of ``criterion`` over every batch of ``vali_loader``.

    The model is switched to eval mode for the pass, gradients are disabled,
    and the model is switched back to train mode before returning.
    ``vali_data`` is accepted but not used here.
    """
    total_loss = []
    self.model.eval()
    with torch.no_grad():
        for i, (batch_x, batch_y, batch_x_mark, batch_y_mark) in enumerate(vali_loader):
            batch_x = batch_x.float().to(self.device)
            batch_y = batch_y.float()
            batch_x_mark = batch_x_mark.float().to(self.device)
            batch_y_mark = batch_y_mark.float().to(self.device)

            # Decoder input: the first label_len steps of batch_y followed by
            # zeros for the last pred_len steps.
            dec_inp = torch.zeros_like(batch_y[:, -self.args.pred_len:, :]).float()
            dec_inp = torch.cat([batch_y[:, :self.args.label_len, :], dec_inp], dim=1).float().to(self.device)

            # Forward pass; only the autocast context differs under AMP.
            if self.args.use_amp:
                with torch.cuda.amp.autocast():
                    outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)
            else:
                outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)
            if self.args.output_attention:
                # The model then returns a sequence whose first item is the prediction.
                outputs = outputs[0]

            # In 'MS' mode only the last column is kept; otherwise all columns.
            f_dim = -1 if self.args.features == 'MS' else 0
            outputs = outputs[:, -self.args.pred_len:, f_dim:]
            batch_y = batch_y[:, -self.args.pred_len:, f_dim:].to(self.device)

            pred = outputs.detach().cpu()
            true = batch_y.detach().cpu()
            loss = criterion(pred, true)
            # Store a plain float, as train() does, instead of a tensor.
            total_loss.append(loss.item())
    total_loss = np.average(total_loss)
    self.model.train()
    return total_loss



测试过程类似验证，但通常会额外可视化结果。

In [None]:
import matplotlib.pyplot as plt


def visual(true, preds=None, name='./pic/test.pdf'):
    """Plot ``true`` (and ``preds`` when given) and save the figure to ``name``.

    Args:
        true: ground-truth curve, plotted with the label 'GroundTruth'.
        preds: optional prediction curve, plotted with the label 'Prediction'.
        name: output path handed to ``plt.savefig``.
    """
    fig = plt.figure()
    try:
        plt.plot(true, label='GroundTruth', linewidth=2)
        if preds is not None:
            plt.plot(preds, label='Prediction', linewidth=2)
        plt.legend()
        plt.savefig(name, bbox_inches='tight')
    finally:
        # pyplot keeps every figure alive until it is closed; this function is
        # called repeatedly during testing, so release the figure each time.
        plt.close(fig)


def test(self, setting, test=0):
    """Run the model on the test split, plot samples, and save predictions and metrics.

    Args:
        setting: run name; used for the checkpoint, ``./test_results/`` and
            ``./results/`` sub-directories.
        test: when truthy, load ``checkpoint.pth`` for ``setting`` first.

    Side effects: writes plots under ``./test_results/<setting>/``, appends
    to ``result_long_term_forecast.txt``, and saves ``metrics.npy``,
    ``pred.npy`` and ``true.npy`` under ``./results/<setting>/``.
    """
    test_data, test_loader = self._get_data(flag='test')
    if test:
        print('加载模型')
        # Load from the checkpoint root that train() saves to. The previous
        # hard-coded './checkpoints/' only matched when args.checkpoints had
        # that value.
        checkpoint_file = os.path.join(self.args.checkpoints, setting, 'checkpoint.pth')
        self.model.load_state_dict(torch.load(checkpoint_file))

    preds = []
    trues = []
    folder_path = './test_results/' + setting + '/'
    os.makedirs(folder_path, exist_ok=True)

    self.model.eval()
    with torch.no_grad():
        for i, (batch_x, batch_y, batch_x_mark, batch_y_mark) in enumerate(test_loader):
            batch_x = batch_x.float().to(self.device)
            batch_y = batch_y.float().to(self.device)
            batch_x_mark = batch_x_mark.float().to(self.device)
            batch_y_mark = batch_y_mark.float().to(self.device)

            # Decoder input: the first label_len steps of batch_y followed by
            # zeros for the last pred_len steps.
            dec_inp = torch.zeros_like(batch_y[:, -self.args.pred_len:, :]).float()
            dec_inp = torch.cat([batch_y[:, :self.args.label_len, :], dec_inp], dim=1).float().to(self.device)

            # Forward pass; only the autocast context differs under AMP.
            if self.args.use_amp:
                with torch.cuda.amp.autocast():
                    outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)
            else:
                outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)
            if self.args.output_attention:
                # The model then returns a sequence whose first item is the prediction.
                outputs = outputs[0]

            # In 'MS' mode only the last column is kept; otherwise all columns.
            f_dim = -1 if self.args.features == 'MS' else 0
            outputs = outputs[:, -self.args.pred_len:, f_dim:]
            batch_y = batch_y[:, -self.args.pred_len:, f_dim:].to(self.device)
            outputs = outputs.detach().cpu().numpy()
            batch_y = batch_y.detach().cpu().numpy()

            # Undo the dataset scaling when the data was scaled and
            # args.inverse is set.
            if test_data.scale and self.args.inverse:
                outputs = test_data.inverse_transform(outputs)
                batch_y = test_data.inverse_transform(batch_y)

            pred = outputs
            true = batch_y
            preds.append(pred)
            trues.append(true)

            # Every 20th batch: plot the last feature of the first sample,
            # input history followed by ground truth / prediction.
            # (Locals renamed so they no longer shadow builtin `input` and the
            # conventional pandas alias `pd`.)
            if i % 20 == 0:
                history = batch_x.detach().cpu().numpy()
                truth_curve = np.concatenate((history[0, :, -1], true[0, :, -1]), axis=0)
                pred_curve = np.concatenate((history[0, :, -1], pred[0, :, -1]), axis=0)
                visual(truth_curve, pred_curve, os.path.join(folder_path, str(i) + '.pdf'))

    preds = np.array(preds)
    trues = np.array(trues)  # [batch_num, batch_size, pred_len, features]
    print('test shape:', preds.shape, trues.shape)
    # Merge the batch_num and batch_size axes.
    preds = preds.reshape(-1, preds.shape[-2], preds.shape[-1])
    trues = trues.reshape(-1, trues.shape[-2], trues.shape[-1])
    print('test shape:', preds.shape, trues.shape)

    # Save the results.
    folder_path = './results/' + setting + '/'
    os.makedirs(folder_path, exist_ok=True)

    mae, mse, rmse, mape, mspe = metric(preds, trues)
    print('mse:{}, mae:{}'.format(mse, mae))
    with open("result_long_term_forecast.txt", 'a') as f:
        f.write(setting + "  \n")
        f.write('mse:{}, mae:{}'.format(mse, mae))
        f.write('\n\n')

    np.save(folder_path + 'metrics.npy', np.array([mae, mse, rmse, mape, mspe]))
    np.save(folder_path + 'pred.npy', preds)
    np.save(folder_path + 'true.npy', trues)



### 7. Dataloader 与 DataProvider
#
训练时通过 `self._get_data(flag)` 获取数据，下面看其实现：

In [None]:
def _get_data(self, flag):
    """Return the ``(data_set, data_loader)`` pair that ``data_provider`` builds for ``flag``."""
    return data_provider(self.args, flag)



`data_provider(self.args, flag)` 定义在 `data_factory.py`：

In [None]:
# Registry mapping a dataset name (looked up with ``args.data`` in
# data_provider) to the dataset class that is instantiated for it.
data_dict = {
    'ETTh1': Dataset_ETT_hour,
    'ETTh2': Dataset_ETT_hour,
    'ETTm1': Dataset_ETT_minute,
    'ETTm2': Dataset_ETT_minute,
    'custom': Dataset_Custom,
    'm4': Dataset_M4,
    'PSM': PSMSegLoader,
    'MSL': MSLSegLoader,
    'SMAP': SMAPSegLoader,
    'SMD': SMDSegLoader,
    'SWAT': SWATSegLoader,
    'UEA': UEAloader
}


def data_provider(args, flag):
    """Build the dataset and its DataLoader for ``args.task_name`` and ``flag``.

    Args:
        args: run configuration (dataset name, task name, paths, lengths,
            batch size, number of workers, ...).
        flag: split selector; ``'test'`` turns shuffling off.

    Returns:
        ``(data_set, data_loader)``.
    """
    dataset_cls = data_dict[args.data]  # dataset class chosen by name
    timeenc = 1 if args.embed == 'timeF' else 0  # time-feature encoding mode
    task = args.task_name
    evaluating = flag == 'test'

    # No shuffling on the test split. There the batch size is forced to 1
    # unless the task is anomaly detection or classification.
    shuffle_flag = not evaluating
    batch_size = args.batch_size
    if evaluating and task not in ['anomaly_detection', 'classification']:
        batch_size = 1
    freq = args.freq

    if task == 'anomaly_detection':
        data_set = dataset_cls(
            root_path=args.root_path,  # root directory of the data files
            win_size=args.seq_len,  # input sequence length
            flag=flag,
        )
        print(flag, len(data_set))
        data_loader = DataLoader(
            data_set,
            batch_size=batch_size,
            shuffle=shuffle_flag,
            num_workers=args.num_workers,
            drop_last=False)
        return data_set, data_loader

    if task == 'classification':
        data_set = dataset_cls(
            root_path=args.root_path,
            flag=flag,
        )
        data_loader = DataLoader(
            data_set,
            batch_size=batch_size,
            shuffle=shuffle_flag,
            num_workers=args.num_workers,
            drop_last=False,
            collate_fn=lambda x: collate_fn(x, max_len=args.seq_len)
        )
        return data_set, data_loader

    # Remaining tasks: the last incomplete batch is dropped, except for 'm4'.
    drop_last = args.data != 'm4'
    data_set = dataset_cls(
        root_path=args.root_path,  # e.g. ./data/ETT/
        data_path=args.data_path,  # e.g. ETTh1.csv
        flag=flag,
        size=[args.seq_len, args.label_len, args.pred_len],
        features=args.features,  # forecasting mode: M / S / MS
        target=args.target,  # target column for the S and MS modes
        timeenc=timeenc,
        freq=freq,
        seasonal_patterns=args.seasonal_patterns
    )
    print(flag, len(data_set))
    data_loader = DataLoader(
        data_set,
        batch_size=batch_size,
        shuffle=shuffle_flag,
        num_workers=args.num_workers,
        drop_last=drop_last)
    return data_set, data_loader



上述 `data_provider` 根据任务和模式，将原始数据集封装为可迭代的 DataLoader。  
接下来示例 `data_loader.py` 中的 `Dataset_ETT_hour`：

In [None]:
class Dataset_ETT_hour(Dataset):
    """Skeleton of the hourly ETT dataset class; every method body is elided here."""

    def __init__(self, root_path, flag='train', size=None,
                 features='S', data_path='ETTh1.csv',
                 target='OT', scale=True, timeenc=0, freq='h', seasonal_patterns=None):
        # Constructor; see the signature for the accepted options and defaults.
        ...

    def __read_data__(self):
        # Data-loading step.
        ...

    def __getitem__(self, index):
        # Sample access by integer index.
        ...

    def __len__(self):
        # Number of samples.
        ...

    def inverse_transform(self, data):
        # Maps ``data`` back through the inverse of a transform.
        ...



`__init__()` 初始化各参数并调用 `__read_data__()` 加载数据。

In [None]:
def __init__(self, root_path, flag='train', size=None,
             features='S', data_path='ETTh1.csv',
             target='OT', scale=True, timeenc=0, freq='h', seasonal_patterns=None):
    """Store the dataset options and load the data.

    Args:
        root_path: directory containing the data file.
        flag: one of ``'train'``, ``'val'``, ``'test'``.
        size: ``[seq_len, label_len, pred_len]``; defaults to
            ``[384, 96, 96]`` when ``None``.
        features, target, scale, timeenc, freq, data_path: stored unchanged
            on the instance.
        seasonal_patterns: accepted but not stored here.
    """
    # Window lengths: either the caller's triple or the built-in defaults.
    if size is not None:
        self.seq_len, self.label_len, self.pred_len = size
    else:
        self.seq_len, self.label_len, self.pred_len = 24 * 4 * 4, 24 * 4, 24 * 4

    # Map the split name to its integer index.
    assert flag in ['train', 'test', 'val']
    self.set_type = {'train': 0, 'val': 1, 'test': 2}[flag]

    self.features = features
    self.target = target
    self.scale = scale
    self.timeenc = timeenc
    self.freq = freq
    self.root_path = root_path
    self.data_path = data_path

    # Load the data.
    self.__read_data__()



`__read_data__()` 将原始 CSV 划分为训练/验证/测试集，筛选列并做归一化和时间特征编码。
>>
def __read_data__(self):
    """Load the CSV, pick this split's rows, and prepare values and time features.

    Sets ``self.scaler``, ``self.data_x``, ``self.data_y`` and
    ``self.data_stamp``. The scaler is fitted on the training rows only.
    """
    self.scaler = StandardScaler()

    # Read the raw data from disk.
    df_raw = pd.read_csv(os.path.join(self.root_path, self.data_path))

    # Split boundaries in rows: 12*30*24 rows for training, then 4*30*24 rows
    # each for validation and test. The validation and test splits start
    # seq_len rows early so their first sample has a full input window.
    train_end = 12 * 30 * 24
    val_end = train_end + 4 * 30 * 24
    test_end = train_end + 8 * 30 * 24
    border1s = [0, train_end - self.seq_len, val_end - self.seq_len]
    border2s = [train_end, val_end, test_end]
    border1 = border1s[self.set_type]
    border2 = border2s[self.set_type]

    # Select the feature columns.
    if self.features in ['M', 'MS']:
        cols_data = df_raw.columns[1:]  # every column except the first (date)
        df_data = df_raw[cols_data]
    elif self.features == 'S':
        df_data = df_raw[[self.target]]

    # Standardise with statistics from the training rows only.
    if self.scale:
        train_data = df_data[border1s[0]:border2s[0]]
        self.scaler.fit(train_data.values)
        data = self.scaler.transform(df_data.values)
    else:
        data = df_data.values

    # Timestamps of this split. Copy the slice so the column assignments
    # below act on an independent frame (avoids SettingWithCopyWarning).
    df_stamp = df_raw[['date']][border1:border2].copy()
    df_stamp['date'] = pd.to_datetime(df_stamp.date)

    if self.timeenc == 0:
        # Calendar features: month, day, weekday, hour.
        # The former second positional argument `1` landed on Series.apply's
        # deprecated `convert_dtype` parameter; it is dropped here.
        df_stamp['month'] = df_stamp.date.apply(lambda row: row.month)
        df_stamp['day'] = df_stamp.date.apply(lambda row: row.day)
        df_stamp['weekday'] = df_stamp.date.apply(lambda row: row.weekday())
        df_stamp['hour'] = df_stamp.date.apply(lambda row: row.hour)
        data_stamp = df_stamp.drop(['date'], axis=1).values
    elif self.timeenc == 1:
        # Time features generated by time_features for self.freq, transposed
        # so that rows correspond to time steps.
        data_stamp = time_features(pd.to_datetime(df_stamp['date'].values), freq=self.freq)
        data_stamp = data_stamp.transpose(1, 0)

    # Inputs and targets are cut from the same array.
    self.data_x = data[border1:border2]
    self.data_y = data[border1:border2]
    self.data_stamp = data_stamp



实现 `__getitem__` 与 `__len__`
>>
def __getitem__(self, index):
    """Return ``(seq_x, seq_y, seq_x_mark, seq_y_mark)`` for sample ``index``.

    The input window is ``[index, index + seq_len)``. The target window
    starts ``label_len`` steps before the end of the input window and spans
    ``label_len + pred_len`` steps.
    """
    input_start = index
    input_end = input_start + self.seq_len
    target_start = input_end - self.label_len
    target_end = target_start + self.label_len + self.pred_len

    input_window = slice(input_start, input_end)
    target_window = slice(target_start, target_end)

    # Values and time features for both windows.
    return (
        self.data_x[input_window],
        self.data_y[target_window],
        self.data_stamp[input_window],
        self.data_stamp[target_window],
    )


def __len__(self):
    """Return the number of samples: rows minus one input window and one forecast window, plus one."""
    window = self.seq_len + self.pred_len
    return len(self.data_x) - window + 1



若需反归一化，可调用：
>>
def inverse_transform(self, data):
    """Map ``data`` back through ``self.scaler.inverse_transform`` and return the result."""
    scaler = self.scaler
    return scaler.inverse_transform(data)



至此完成数据集与 DataLoader 的构建。下面展示一些常用数据集示例图。
#
![常用时序数据集](./dataset.png)


### 8. 运行实验与可视化结果
#
构建好数据和模型后，在 shell 脚本中运行 `run.py` 并传入参数。以下以 ETTh1 数据集的长期预测为例。
>>
model_name=TimesNet

python -u run.py \
  --task_name long_term_forecast \
  --is_training 1 \
  --root_path ./dataset/ETT-small/ \
  --data_path ETTh1.csv \
  --model_id ETTh1_96_96 \
  --model $model_name \
  --data ETTh1 \
  --features M \
  --seq_len 96 \
  --label_len 48 \
  --pred_len 96 \
  --e_layers 2 \
  --d_layers 1 \
  --factor 3 \
  --enc_in 7 \
  --dec_in 7 \
  --c_out 7 \
  --d_model 16 \
  --d_ff 32 \
  --des 'Exp' \
  --itr 1 \
  --top_k 5


完成脚本后可通过 bash 运行，例如：
>>
bash ./scripts/long_term_forecast/ETT_script/TimesNet_ETTh1.sh


运行成功的标志：看到类似以下输出：
>>
Namespace(task_name='long_term_forecast', is_training=1, model_id='ETTh1_96_96', model='TimesNet', data='ETTh1',
          root_path='./dataset/ETT-small/', data_path='ETTh1.csv', features='M', target='OT', freq='h',
          checkpoints='./checkpoints/', seq_len=96, label_len=48, pred_len=96, seasonal_patterns='Monthly',
          inverse=False, mask_rate=0.25, anomaly_ratio=0.25, top_k=5, num_kernels=6, enc_in=7, dec_in=7, c_out=7,
          d_model=16, n_heads=8, e_layers=2, d_layers=1, d_ff=32, moving_avg=25, factor=3, distil=True, dropout=0.1,
          embed='timeF', activation='gelu', output_attention=False, num_workers=10, itr=1, train_epochs=10,
          batch_size=32, patience=3, learning_rate=0.0001, des='Exp', loss='MSE', lradj='type1', use_amp=False,
          use_gpu=False, gpu=0, use_multi_gpu=False, devices='0,1,2,3', p_hidden_dims=[128, 128], p_hidden_layers=2)
Use GPU: cuda:0
>>>>>>>start training : long_term_forecast_ETTh1_96_96_TimesNet_ETTh1_ftM_sl96_ll48_pl96_dm16_nh8_el2_dl1_df32_fc3_ebtimeF_dtTrue_Exp_0>>>>>>>>>>>>>>>>>>>>>>>>>>
train 8449
val 2785
test 2785


训练过程中每轮结束时会打印：
>>
	iters: 100, epoch: 1 | loss: 0.4701951
	speed: 0.2108s/iter; left time: 535.7317s
	iters: 200, epoch: 1 | loss: 0.4496171
	speed: 0.0615s/iter; left time: 150.0223s
Epoch: 1 cost time: 30.09317970275879
Epoch: 1, Steps: 264 | Train Loss: 0.4964185 Vali Loss: 0.8412074 Test Loss: 0.4290483
Validation loss decreased (inf --> 0.841207).  Saving model ...
Updating learning rate to 0.0001


全部训练结束后进入测试，会打印 MAE 和 MSE：
>>
>>>>>>>testing : long_term_forecast_ETTh1_96_96_TimesNet_ETTh1_ftM_sl96_ll48_pl96_dm16_nh8_el2_dl1_df32_fc3_ebtimeF_dtTrue_Exp_0<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
test 2785
test shape: (2785, 1, 96, 7) (2785, 1, 96, 7)
test shape: (2785, 96, 7) (2785, 96, 7)
mse:0.3890332877635956, mae:0.41201362013816833


测试结果（PDF 格式）保存在 `test_results` 文件夹中，例如：
#
![ETTm1 2440 结果示例](./result.png)
