## 加载数据集
设定参数在这里修改

In [None]:
import numpy as np  # array handling and CSV loading

# ---- Experiment settings: edit the values below ----
data_path = 'dataset/etth1.csv'  # path of the CSV data file
dataset_name = 'etth1'
enc_in = 7  # number of feature columns
seq_len = 720
pred_len = [96, 192, 336, 720]  # forecast horizons (number of predicted time steps)
period_len = 24  # period length
train_epochs = 30
patience = 5
batch_size = 256
learning_rate = 0.02
model_type = 'linear'

# Read the data, skipping the header row and the first (timestamp) column,
# i.e. keep columns 1..enc_in.
feature_columns = range(1, enc_in + 1)
raw_data = np.loadtxt(
    data_path,
    delimiter=',',
    skiprows=1,
    usecols=feature_columns,
)
print('数据形状：{0}，元素类型：{1}'.format(raw_data.shape, raw_data.dtype))

## 将输入数据标准化

In [None]:
from sklearn.preprocessing import StandardScaler

# Standardize the raw series with scikit-learn's StandardScaler.
# NOTE(review): the scaler is fitted on the whole series, which includes the
# rows later used for validation and test — confirm this is intended.
scaler = StandardScaler()
scaler.fit(raw_data)
data = scaler.transform(raw_data)  # standardized data


## 生成模型输入与输出数据

In [None]:
def generateData(data, seq_len, pred_len, enc_in):
    """Cut a multivariate series into (input window, target window) pairs.

    Sample ``i`` uses rows ``i .. i+seq_len-1`` as input and the following
    ``pred_len`` rows as target.

    :param data: 2-D array of shape (time points, features)
    :param seq_len: number of time points in each input window
    :param pred_len: number of time points in each target window
    :param enc_in: size of the feature axis of the returned arrays
    :return: tuple ``(X, Y)`` of float64 arrays with shapes
        ``(sample_num, seq_len, enc_in)`` and ``(sample_num, pred_len, enc_in)``
    :raises ValueError: if ``data`` is not 2-D or is too short to hold even
        zero windows (fewer than ``seq_len + pred_len - 1`` rows)
    """
    # Local import keeps this block self-contained in the notebook.
    from numpy.lib.stride_tricks import sliding_window_view

    data = np.asarray(data)
    if data.ndim != 2:
        raise ValueError(
            f'data must be 2-D (time points, features), got shape {data.shape}'
        )

    point_num = data.shape[0]  # total number of time points
    sample_num = point_num - seq_len - pred_len + 1  # number of samples
    if sample_num < 0:
        raise ValueError(
            f'series too short: {point_num} time points cannot hold a window '
            f'of seq_len={seq_len} plus pred_len={pred_len}'
        )

    X = np.zeros((sample_num, seq_len, enc_in))  # model inputs
    Y = np.zeros((sample_num, pred_len, enc_in))  # matching targets
    if sample_num == 0:
        return X, Y

    # sliding_window_view yields (windows, features, window_len) views without
    # copying; transposing gives (windows, window_len, features). The single
    # assignment below replaces the per-sample Python loop.
    X[...] = sliding_window_view(data, seq_len, axis=0)[:sample_num].transpose(0, 2, 1)
    Y[...] = sliding_window_view(data[seq_len:], pred_len, axis=0)[:sample_num].transpose(0, 2, 1)
    return X, Y

## 生成各个预测长度对应的数据集

In [None]:
# One (input, target) dataset per forecast horizon, in the order of pred_len.
X_datasets, Y_datasets = [], []
# If the data set is too large, window a slice of it instead, e.g. data[:1000, :].
for i, horizon in enumerate(pred_len):
    # Build the windows for task i+1 (horizon = pred_len[i]).
    X_i, Y_i = generateData(data, seq_len, horizon, enc_in)
    X_datasets += [X_i]
    Y_datasets += [Y_i]
    print('任务{0}数据集输入数据形状：{1}，输出数据形状：{2}'.format(i + 1, X_i.shape, Y_i.shape))

## 划分训练集、验证集和测试集

In [None]:
def splitData(X, Y):
    """Split samples chronologically into train / validation / test parts.

    The first 60% of the samples form the training set, the next 20% the
    validation set and the remaining samples the test set. Boundaries are
    ``int(N * 0.6)`` and ``int(N * 0.8)`` where ``N = X.shape[0]``.

    :param X: input samples, indexed along the first axis
    :param Y: target samples, sliced with the same boundaries as ``X``
    :return: ``(train_X, train_Y, vali_X, vali_Y, test_X, test_Y)``
    """
    N = X.shape[0]  # total number of samples
    train_end = int(N * 0.6)
    vali_end = int(N * 0.8)

    segments = (
        slice(None, train_end),       # training part
        slice(train_end, vali_end),   # validation part
        slice(vali_end, None),        # test part
    )
    pieces = []
    for segment in segments:
        pieces.append(X[segment])
        pieces.append(Y[segment])
    return tuple(pieces)

# Per-horizon containers for the three splits (index i matches pred_len[i]).
train_X_datasets, train_Y_datasets = [], []
vali_X_datasets, vali_Y_datasets = [], []
test_X_datasets, test_Y_datasets = [], []

for i, _ in enumerate(pred_len):
    split_i = splitData(X_datasets[i], Y_datasets[i])
    train_X_i, train_Y_i, vali_X_i, vali_Y_i, test_X_i, test_Y_i = split_i

    train_X_datasets.append(train_X_i)
    train_Y_datasets.append(train_Y_i)
    vali_X_datasets.append(vali_X_i)
    vali_Y_datasets.append(vali_Y_i)
    test_X_datasets.append(test_X_i)
    test_Y_datasets.append(test_Y_i)

    # Report the sample counts of the three splits for task i+1.
    print('任务{0}训练集样本数：{1}，验证集样本数：{2}，测试集样本数：{3}'.format(i+1,train_X_i.shape[0], vali_X_i.shape[0], test_X_i.shape[0]))

## 定义数据序列类

In [None]:
class MultiTimeSeriesDataset():
    """Random-access pairing of input samples ``X`` with target samples ``Y``."""

    def __init__(self, X, Y):
        """Store the inputs and the targets as given (no copy, no checks)."""
        self.X = X
        self.Y = Y

    def __len__(self):
        """Return the number of samples, taken from ``X``."""
        return len(self.X)

    def __getitem__(self, index):
        """Return the ``(input, target)`` pair stored at ``index``."""
        sample = self.X[index]
        target = self.Y[index]
        return sample, target

## 生成MindSpore数据集

In [None]:
from mindspore import Tensor #导入Tensor类
from mindspore.dataset import GeneratorDataset #导入GeneratorDataset类
def generateMindsporeDataset(X, Y, batch_size, shuffle=None):
    """Wrap NumPy samples into a batched MindSpore dataset.

    :param X: input samples; converted to float32
    :param Y: target samples; converted to float32
    :param batch_size: number of samples per batch (the last, smaller batch
        is kept because ``drop_remainder=False``)
    :param shuffle: forwarded to ``GeneratorDataset``. The default ``None``
        keeps the library default, exactly as before this parameter existed.
        NOTE(review): per the MindSpore documentation a random-accessible
        source is read in random order when neither ``shuffle`` nor a sampler
        is given — pass ``shuffle=False`` for validation/test data if a
        fixed sample order is needed; verify against the installed version.
    :return: dataset with columns ``data`` and ``label``, split into batches
    """
    source = MultiTimeSeriesDataset(X.astype(np.float32), Y.astype(np.float32))
    dataset = GeneratorDataset(source, column_names=['data', 'label'], shuffle=shuffle)
    return dataset.batch(batch_size=batch_size, drop_remainder=False)

## 定义config类

In [None]:
class Configs:
    """Plain container for the model and training hyper-parameters."""

    def __init__(
        self,
        seq_len=720,
        pred_len=96,
        enc_in=7,
        period_len=24,
        train_epochs=30,
        patience=5,
        learning_rate=0.02,
        dataset_name='unknown',
        model_type='linear',
        d_model=64
    ):
        """
        Initialise the configuration.

        :param seq_len: input sequence length
        :param pred_len: prediction sequence length
        :param enc_in: number of feature dimensions
        :param period_len: period length
        :param train_epochs: maximum number of training epochs
        :param patience: early-stopping patience (epochs without improvement)
        :param learning_rate: initial learning rate
        :param dataset_name: dataset tag
        :param model_type: model type ('linear' or 'mlp')
        :param d_model: hidden dimension of the MLP
        """
        self.seq_len = seq_len
        self.pred_len = pred_len
        self.enc_in = enc_in
        self.period_len = period_len
        self.train_epochs = train_epochs
        self.patience = patience
        self.learning_rate = learning_rate
        self.d_model = d_model
        self.model_type = model_type
        self.dataset_name = dataset_name

    def __str__(self):
        # The learning-rate field used to be printed as "flearning_rate="
        # because of a stray "f" inside the f-string; that typo is fixed here.
        return (
            f"Configs(seq_len={self.seq_len}, pred_len={self.pred_len}, "
            f"enc_in={self.enc_in}, period_len={self.period_len}, "
            f"train_epochs={self.train_epochs}, patience={self.patience}, learning_rate={self.learning_rate}, "
            f"dataset_name={self.dataset_name},model_type='{self.model_type}')"
        )

    def __repr__(self):
        return self.__str__()


## 定义SparseTSF类

In [None]:
import mindspore
from mindspore import nn, ops

class SparseTSF(nn.Cell):
    """SparseTSF forecaster: period-wise down-sampling plus a shared sparse layer.

    The input series is centred by its per-sample mean, smoothed with a
    residual 1-D convolution, cut into ``seq_len // period_len`` segments of
    ``period_len`` points, and each phase of the period is mapped from
    ``seg_num_x`` past values to ``seg_num_y`` future values by one shared
    linear layer or MLP. The mean is added back to the output.
    """

    def __init__(self, configs):
        """Build the layers from a configuration object.

        :param configs: object exposing ``seq_len``, ``pred_len``, ``enc_in``,
            ``period_len``, ``patience``, ``d_model``, ``model_type`` and
            ``dataset_name``
        :raises ValueError: if ``model_type`` is not 'linear' or 'mlp', or if
            ``seq_len`` / ``pred_len`` is not a multiple of ``period_len``
        """
        super(SparseTSF, self).__init__()
        print(configs)
        # Copy the settings; patience and dataset_name are not used by the
        # network itself but are read from the model by the training runner.
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len
        self.enc_in = configs.enc_in
        self.period_len = configs.period_len
        self.patience = configs.patience
        self.d_model = configs.d_model
        self.model_type = configs.model_type
        self.dataset_name = configs.dataset_name

        # Explicit checks instead of assert (asserts vanish under python -O).
        if self.model_type not in ('linear', 'mlp'):
            raise ValueError(
                f"model_type must be 'linear' or 'mlp', got {self.model_type!r}"
            )
        # construct() reshapes the time axis into (segments, period_len), so
        # both lengths must be whole multiples of the period.
        if (
            self.period_len <= 0
            or self.seq_len % self.period_len != 0
            or self.pred_len % self.period_len != 0
        ):
            raise ValueError(
                f'seq_len ({self.seq_len}) and pred_len ({self.pred_len}) must be '
                f'multiples of a positive period_len ({self.period_len})'
            )

        self.seg_num_x = self.seq_len // self.period_len
        self.seg_num_y = self.pred_len // self.period_len

        # Single-channel 1-D convolution for local feature aggregation. The
        # odd kernel size with matching padding keeps the sequence length.
        self.conv1d = nn.Conv1d(
            in_channels=1,
            out_channels=1,
            kernel_size=1 + 2 * (self.period_len // 2),
            stride=1,
            pad_mode='pad',
            padding=self.period_len // 2,
            has_bias=False
        )

        # Sparse prediction layer: either a bias-free linear map or an MLP.
        if self.model_type == 'linear':
            self.linear = nn.Dense(self.seg_num_x, self.seg_num_y, has_bias=False)
        elif self.model_type == 'mlp':
            self.mlp = nn.SequentialCell(
                nn.Dense(self.seg_num_x, self.d_model),
                nn.ReLU(),
                nn.Dense(self.d_model, self.seg_num_y)
            )

    def construct(self, x):
        """Forecast ``pred_len`` steps.

        :param x: input of shape (batch, seq_len, enc_in)
        :return: forecast of shape (batch, pred_len, enc_in)
        """
        batch_size = x.shape[0]

        # Centre each series by its mean over time (no scaling by the
        # standard deviation), then switch layout: b, s, c -> b, c, s.
        seq_mean = ops.mean(x, 1, keep_dims=True)
        x = ops.permute((x - seq_mean), (0, 2, 1))
        # Residual 1-D convolution applied to every channel independently.
        x = self.conv1d(x.reshape(-1, 1, self.seq_len)).reshape(-1, self.enc_in, self.seq_len) + x
        # Down-sampling: b, c, s -> bc, n, w -> bc, w, n
        x = ops.permute(x.reshape(-1, self.seg_num_x, self.period_len), (0, 2, 1))
        # Sparse prediction: bc, w, n -> bc, w, m
        if self.model_type == 'linear':
            y = self.linear(x)
        else:
            y = self.mlp(x)
        # Up-sampling: bc, w, m -> bc, m, w -> b, c, s
        y = ops.permute(y, (0, 2, 1)).reshape(batch_size, self.enc_in, self.pred_len)

        # Back to b, s, c and restore the mean removed above.
        y = ops.permute(y, (0, 2, 1)) + seq_mean
        return y

## 学习率调整
根据SparseTSF代码，学习率调整选择其中type3策略

In [None]:
def generate_learning_rate_tensor(learning_rate, max_epoch, steps_per_epoch):
    """Build a per-step learning-rate schedule as a float32 MindSpore tensor.

    Epochs 1 and 2 use ``learning_rate``; from epoch 3 on the rate is
    ``learning_rate * 0.8 ** (epoch - 3)``. Every epoch contributes
    ``steps_per_epoch`` identical entries, so the result has
    ``max_epoch * steps_per_epoch`` values.

    :param learning_rate: initial learning rate
    :param max_epoch: number of epochs covered by the schedule
    :param steps_per_epoch: number of optimizer steps in one epoch
    :return: 1-D ``mindspore.Tensor`` of dtype float32
    """
    def rate_for(epoch_no):
        # Constant during the first two epochs, exponential decay afterwards.
        if epoch_no >= 3:
            return learning_rate * (0.8 ** (epoch_no - 3))
        return learning_rate

    schedule = [
        rate_for(epoch_no)
        for epoch_no in range(1, max_epoch + 1)
        for _ in range(steps_per_epoch)
    ]

    # Convert to a MindSpore tensor.
    return mindspore.Tensor(np.array(schedule), mindspore.float32)

## EarlyStopping类

In [None]:
class EarlyStopping:
    """Stop training when the validation loss has not improved for a while.

    Each call compares the new validation loss with the best one seen so far;
    an improvement saves a checkpoint and resets the counter, anything else
    increments the counter until ``patience`` is reached.
    """

    def __init__(self, patience=7, verbose=False, delta=0):
        """
        :param patience: number of consecutive non-improving calls tolerated
        :param verbose: print a message whenever a checkpoint is saved
        :param delta: margin added to the best score in the comparison
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # np.inf instead of the np.Inf alias, which NumPy 2.0 removed.
        self.val_loss_min = np.inf
        self.delta = delta

    def __call__(self, val_loss, model, path):
        """Record one validation result.

        :param val_loss: validation loss of the current epoch
        :param model: network to checkpoint on improvement
        :param path: checkpoint file path
        """
        score = -val_loss
        if np.isnan(score):
            # A NaN loss compares False against everything and would
            # otherwise count as an improvement and overwrite the best
            # checkpoint; treat it as "no improvement" instead.
            improved = False
        elif self.best_score is None:
            improved = True
        else:
            improved = not (score < self.best_score + self.delta)

        if improved:
            self.best_score = score
            self.save_checkpoint(val_loss, model, path)
            self.counter = 0
        else:
            self.counter += 1
            print(f'早停轮次: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True

    def save_checkpoint(self, val_loss, model, path):
        """Save ``model`` to ``path`` and remember ``val_loss`` as the minimum."""
        if self.verbose:
            print(f'验证损失降低 ({self.val_loss_min:.6f} --> {val_loss:.6f}).  保存模型 于{path}')
        mindspore.save_checkpoint(model, path)
        self.val_loss_min = val_loss

## 定义SparseTSFModelRun类（模型的训练、验证与测试流程）

In [None]:
import mindspore
from mindspore import Tensor
import numpy as np
import os
import time
import matplotlib.pyplot as plt

class SparseTSFModelRun:
    """Training / evaluation driver for a SparseTSF model.

    Loss reporting and plots use only the last feature channel (index -1) of
    the predictions and labels.
    """

    def __init__(self, model, loss_fn, optimizer=None, grad_fn=None):
        """
        :param model: network to train; ``train`` reads its ``dataset_name``,
            ``seq_len``, ``pred_len``, ``period_len``, ``enc_in``,
            ``model_type`` and ``patience`` attributes
        :param loss_fn: loss used by ``evaluate``
        :param optimizer: optimizer applied to the gradients (training only)
        :param grad_fn: callable returning ``((loss, aux), grads)`` for a
            ``(data, label)`` batch (training only)
        """
        self.model = model
        self.loss_fn = loss_fn
        self.optimizer = optimizer
        self.grad_fn = grad_fn

    def _train_one_step(self, data, label):
        """Run one optimisation step and return the batch loss."""
        (loss, _), grads = self.grad_fn(data, label)
        self.optimizer(grads)
        return loss

    def _train_one_epoch(self, train_dataset):
        """Run one optimisation step per batch of ``train_dataset``."""
        self.model.set_train(True)
        for data, label in train_dataset.create_tuple_iterator():
            self._train_one_step(data, label)

    def evaluate(self, dataset):
        """Compute the loss on the last feature channel over a whole dataset.

        :param dataset: dataset yielding ``(data, label)`` batches
        :return: ``(loss, preds, labels)`` where ``loss`` is a NumPy value and
            ``preds`` / ``labels`` are arrays of shape (samples, pred_len)
        :raises ValueError: if the dataset yields no batch
        """
        self.model.set_train(False)
        pred_batches, label_batches = [], []
        for data, label in dataset.create_tuple_iterator():
            pred = self.model(data)
            pred_batches.append(pred[:, :, -1].asnumpy())
            label_batches.append(label[:, :, -1].asnumpy())
        if not pred_batches:
            raise ValueError('cannot evaluate an empty dataset')

        # Join the batches as arrays rather than growing a list of rows.
        preds = np.concatenate(pred_batches, axis=0)
        labels = np.concatenate(label_batches, axis=0)
        loss = self.loss_fn(Tensor(preds), Tensor(labels))
        return loss.asnumpy(), preds, labels

    def train(self, train_dataset, vali_dataset, test_dataset, max_epoch_num):
        """Train with early stopping, then test the best checkpoint.

        The checkpoint is written by ``EarlyStopping`` whenever the validation
        loss improves; no separate "best loss" bookkeeping is kept here
        because it only duplicated that save.

        :param train_dataset: training batches
        :param vali_dataset: validation batches (drives early stopping)
        :param test_dataset: test batches (reported every epoch, plotted on
            epochs 1, 11, 21, ... and on the last epoch)
        :param max_epoch_num: maximum number of epochs
        """
        steps_per_epoch = train_dataset.get_dataset_size()
        print(f"每个 epoch 的 step 数量: {steps_per_epoch}")
        # Folder name encoding the experiment settings.
        folder = f'{self.model.dataset_name}_{self.model.seq_len}_{self.model.pred_len}_{self.model.period_len}_{self.model.enc_in}_{self.model.model_type}'
        ckpt_path = f'checkpoints/{folder}/checkpoint.ckpt'
        os.makedirs(f'checkpoints/{folder}', exist_ok=True)
        early_stopping = EarlyStopping(patience=self.model.patience, verbose=True)
        print('>>>>>>>>>开始训练>>>>>>>>>')

        for epoch in range(1, max_epoch_num + 1):
            start_time = time.time()

            print(f'第{epoch}/{max_epoch_num}轮')
            print(f'当前学习率调整为：{self.optimizer.get_lr()}')
            self._train_one_epoch(train_dataset)

            train_loss, _, _ = self.evaluate(train_dataset)
            val_loss, _, _ = self.evaluate(vali_dataset)
            test_loss, _, _ = self.evaluate(test_dataset)

            epoch_time = time.time() - start_time

            print(f'训练集损失: {train_loss}, 验证集损失: {val_loss}, 测试集损失: {test_loss}, 用时: {epoch_time:.2f}s')

            # Periodic progress figure.
            if epoch % 10 == 1 or epoch == max_epoch_num:
                self._plot_first_sample(test_dataset, epoch, 'train', folder)

            # Early-stopping check (also saves the checkpoint on improvement).
            early_stopping(val_loss, self.model, ckpt_path)
            if early_stopping.early_stop:
                print("Early stopping")
                break

        print('训练完成！')
        self.test(test_dataset, ckpt_path, folder)

    def test(self, test_dataset, ckpt_path, img_folder):
        """Load the checkpoint, report the test loss and save one figure."""
        print('>>>>>>>>>开始测试>>>>>>>>>')
        mindspore.load_checkpoint(ckpt_path, net=self.model)
        loss, _, _ = self.evaluate(test_dataset)
        print(f'测试集损失: {loss}')
        self._plot_first_sample(test_dataset, 0, 'test', img_folder)

    def _plot_first_sample(self, dataset, identifier, runtype, folder):
        """Plot history + target vs. history + forecast for one sample.

        The input, label and prediction are all taken from the same batch.
        Previously the input came from a fresh pass over the dataset while
        the label and prediction came from an earlier ``evaluate`` pass, so
        the curves could belong to different samples whenever the dataset
        order changes between passes. Nothing is plotted for an empty dataset.
        """
        self.model.set_train(False)
        for data, label in dataset.create_tuple_iterator():
            history = data[:, :, -1].asnumpy()[0]
            target = label[:, :, -1].asnumpy()[0]
            forecast = self.model(data)[:, :, -1].asnumpy()[0]
            groundtruth = np.concatenate((history, target), axis=0)
            prediction = np.concatenate((history, forecast), axis=0)
            self._plot_results(groundtruth, prediction, identifier, runtype, folder)
            break

    def _plot_results(self, groundtruth, prediction, identifier, runtype, folder):
        """Save a ground-truth vs. prediction line plot as a PDF.

        The file is written to ``{runtype}_result/{folder}/result_{identifier}.pdf``.
        """
        pdf_path = f"{runtype}_result/{folder}/result_{identifier}.pdf"
        # makedirs creates the intermediate "{runtype}_result" directory too.
        os.makedirs(os.path.dirname(pdf_path), exist_ok=True)

        fig = plt.figure(figsize=(10, 6))
        try:
            plt.plot(groundtruth, label='Ground Truth', color='blue', linewidth=2)
            plt.plot(prediction, label='Prediction', color='orange', linewidth=2)
            plt.title(f'Prediction vs Ground Truth - {identifier}')
            plt.xlabel('Time Step')
            plt.ylabel('Value')
            plt.legend()
            plt.tight_layout()
            plt.savefig(pdf_path)
        finally:
            # Release the figure even if drawing or saving fails.
            plt.close(fig)
        print(f'结果图已保存到 {pdf_path}')


## 开始预测

In [None]:
# Train and test one model per forecast horizon.
for index, horizon in enumerate(pred_len):
    print(f'-----------当前预测长度为{pred_len[index]}')
    # Build the MindSpore datasets for this horizon.
    train_dataset_t = generateMindsporeDataset(train_X_datasets[index], train_Y_datasets[index], batch_size)
    vali_dataset_t = generateMindsporeDataset(vali_X_datasets[index], vali_Y_datasets[index], batch_size)
    # The test dataset must come from the test split; it used to be built
    # from the validation arrays, so the reported "test" loss was really the
    # validation loss.
    test_dataset_t = generateMindsporeDataset(test_X_datasets[index], test_Y_datasets[index], batch_size)
    # Print the shape and dtype of one batch as a sanity check.
    for data, label in train_dataset_t.create_tuple_iterator():
        print('数据形状：', data.shape, '，数据类型：', data.dtype)
        print('标签形状：', label.shape, '，数据类型：', label.dtype)
        break
    configs = Configs(
        seq_len=seq_len,
        pred_len=horizon,
        enc_in=enc_in,
        period_len=period_len,
        train_epochs=train_epochs,
        patience=patience,
        learning_rate=learning_rate,
        dataset_name=dataset_name,
        model_type=model_type,
    )
    model = SparseTSF(configs)  # create the model
    loss_fn = nn.MAELoss()  # loss function
    # Adam optimizer driven by the per-step "type3" learning-rate schedule.
    steps_per_epoch = train_dataset_t.get_dataset_size()
    learning_rate_tensor = generate_learning_rate_tensor(configs.learning_rate, configs.train_epochs, steps_per_epoch)
    optimizer = nn.Adam(model.trainable_params(), learning_rate_tensor)

    def forward_fn(data, label):
        """Forward pass returning the loss and, as auxiliary output, the prediction."""
        pred = model(data)
        loss = loss_fn(pred, label)
        return loss, pred

    # Function computing the loss value and the gradients w.r.t. the optimizer's parameters.
    grad_fn = mindspore.value_and_grad(forward_fn, None, optimizer.parameters, has_aux=True)
    model_run = SparseTSFModelRun(model, loss_fn, optimizer, grad_fn)
    # Train with early stopping, then test the best checkpoint.
    model_run.train(train_dataset=train_dataset_t, vali_dataset=vali_dataset_t, test_dataset=test_dataset_t, max_epoch_num=configs.train_epochs)