In [13]:
import os
import time
import random
from typing import Callable
import numpy as np
import pandas as pd
import paddle
import paddle.nn as nn
from paddle.io import Dataset, DataLoader

  and should_run_async(code)


<h1>训练阶段</h1>

<h2>准备超参数</h2>

In [14]:
def prep_env():
    # type: () -> dict
    """
    Desc:
        Build the experiment configuration and select the compute device
    Returns:
        A dict with every hyper-parameter/path used by the experiment, plus a
        "use_gpu" flag describing which device was selected
    """
    settings = dict(
        path_to_test_x="../data/sdwpf_baidukddcup2022_test_toy/test_x",
        path_to_test_y="../data/sdwpf_baidukddcup2022_test_toy/test_y",
        data_path="../data",
        filename="wtbdata_245days.csv",
        task="MS",
        target="Patv",
        checkpoints="models",
        input_len=144,
        output_len=288,
        start_col=3,
        in_var=10,
        out_var=1,
        day_len=144,
        train_size=153,
        val_size=16,
        test_size=15,
        total_size=245,
        lstm_layer=2,
        dropout=0.05,
        num_workers=5,
        train_epochs=10,
        batch_size=32,
        patience=3,
        lr=1e-4,
        lr_adjust="type1",
        gpu=0,
        capacity=134,
        turbine_id=0,
        pred_file="predict.py",
        framework="paddlepaddle",
        is_debug=True,
    )

    # Pick the device: the configured GPU index when Paddle was built with CUDA, otherwise the CPU
    cuda_build = paddle.device.is_compiled_with_cuda()
    settings["use_gpu"] = True if cuda_build else False
    device_name = 'gpu:{}'.format(settings["gpu"]) if cuda_build else 'cpu'
    paddle.device.set_device(device_name)

    print("The experimental settings are: \n{}".format(str(settings)))
    return settings

<h2>准备模型</h2>

In [15]:
class BaselineGruModel(nn.Layer):
    """
    Desc:
        A simple GRU forecaster. The input window is extended with zeros for the
        steps to be predicted, the whole sequence is run through a GRU, and the
        hidden states of the appended steps are projected to the output variables.
    """
    def __init__(self, settings):
        # type: (dict) -> None
        """
        Desc:
            Build the dropout, GRU and projection layers
        Args:
            settings: a dict of parameters; reads "output_len", "in_var", "out_var",
                      "dropout" and "lstm_layer"
        """
        super(BaselineGruModel, self).__init__()
        self.output_len = settings["output_len"]
        self.hidC = settings["in_var"]      # number of input features
        self.hidR = 48                      # GRU hidden size
        self.out_dim = settings["out_var"]
        self.dropout = nn.Dropout(settings["dropout"])
        # NOTE: the attribute is called "lstm" although it holds a GRU; the name is part of
        # the parameter names in the state dict, so it is kept as is.
        self.lstm = nn.GRU(
            input_size=self.hidC,
            hidden_size=self.hidR,
            num_layers=settings["lstm_layer"],
            time_major=True,
        )
        self.projection = nn.Linear(self.hidR, self.out_dim)

    def forward(self, x_enc):
        # type: (paddle.tensor) -> paddle.tensor
        """
        Desc:
            The specific implementation for interface forward
        Args:
            x_enc: the input window; indexed as a 3-D batch-first tensor
                   (presumably [batch, input_len, in_var] -- confirm with the caller)
        Returns:
            A tensor holding the last `output_len` steps and the last `out_dim` channels
            of the projected sequence, i.e. [B, L, D]
        """
        # Zero placeholders standing in for the future steps to be forecast
        future_pad = paddle.zeros([x_enc.shape[0], self.output_len, x_enc.shape[2]])
        padded = paddle.concat((x_enc, future_pad), 1)

        # The GRU is time-major, so swap the batch and time axes going in and coming out
        time_major_in = paddle.transpose(padded, perm=(1, 0, 2))
        hidden_states, _ = self.lstm(time_major_in)
        batch_major_out = paddle.transpose(hidden_states, perm=(1, 0, 2))

        projected = self.projection(self.dropout(batch_major_out))
        return projected[:, -self.output_len:, -self.out_dim:]  # [B, L, D]

<h2>准备标准化</h2>

In [16]:
class Scaler(object):
    """
    Desc: Normalization utilities (z-score with a single mean and a single standard
          deviation computed over every value passed to `fit`)
    """
    def __init__(self):
        self.mean = 0.
        self.std = 1.

    def fit(self, data):
        # type: (np.ndarray) -> None
        """
        Desc:
            Fit the data
        Args:
            data: the values used to estimate the mean and the standard deviation
        Returns:
            None
        """
        self.mean = np.mean(data)
        std = np.std(data)
        # Constant data has zero spread; dividing by it would turn every transformed value
        # into NaN/inf, so fall back to 1 (the transform then only centres the data).
        self.std = 1. if std == 0 else std

    def _stats_like(self, data):
        """
        Desc:
            Return (mean, std) in a form that can be combined with `data`
        Args:
            data: a numpy array, a scalar, or a paddle tensor
        Returns:
            The plain statistics for non-tensor input; for a paddle tensor, tensors
            created with the same dtype as `data`
        """
        # numpy input is answered without consulting paddle at all
        if isinstance(data, np.ndarray) or not paddle.is_tensor(data):
            return self.mean, self.std
        # Match the dtype of `data` so that e.g. float32 tensors are not mixed with
        # float64 statistics. The tensors are created on the current default device.
        mean = paddle.to_tensor(self.mean, dtype=data.dtype)
        std = paddle.to_tensor(self.std, dtype=data.dtype)
        return mean, std

    def transform(self, data):
        # type: (paddle.tensor) -> paddle.tensor
        """
        Desc:
            Transform the data
        Args:
            data: the values to normalize
        Returns:
            The transformed data, (data - mean) / std
        """
        mean, std = self._stats_like(data)
        return (data - mean) / std

    def inverse_transform(self, data):
        # type: (paddle.tensor) -> paddle.tensor
        """
        Desc:
            Restore to the original data
        Args:
            data: the transformed data
        Returns:
            The original data, data * std + mean
        """
        mean, std = self._stats_like(data)
        return (data * std) + mean

<h2>准备数据集</h2>

In [17]:
class WindTurbineDataset(Dataset):
    """
    Desc: Sliding-window dataset over the records of one turbine.
          The rows of the selected turbine are split, in order, into a training,
          a validation and a test range whose lengths are given in days.
          Defaults, e.g.    15 days for training,
                            3 days for validation,
                            and 6 days for testing
    """
    def __init__(self, data_path,
                 filename='my.csv',
                 flag='train',
                 size=None,
                 turbine_id=0,
                 task='MS',
                 target='Target',
                 scale=True,
                 start_col=2,       # the start column index of the data one aims to utilize
                 day_len=24 * 6,
                 train_days=15,     # 15 days
                 val_days=3,        # 3 days
                 test_days=6,       # 6 days
                 total_days=30      # 30 days
                 ):
        """
        Desc:
            Store the configuration and load the requested split
        Args:
            data_path: folder containing the CSV file
            filename: name of the CSV file
            flag: which split to expose, one of 'train', 'val' or 'test'
            size: [input_len, output_len]; both default to `day_len` when None
            turbine_id: index of the turbine whose rows are used
            task: 'M', 'MS' or 'S' (selects the columns that are kept)
            target: name of the target column (only read when task is 'S')
            scale: whether to normalize with statistics of the training range
            start_col: index of the first column kept for 'M'/'MS'
            day_len: number of rows per day
            train_days: length of the training range in days
            val_days: length of the validation range in days
            test_days: length of the test range in days
            total_days: number of days stored per turbine (row stride between turbines)
        """
        super().__init__()
        self.unit_size = day_len
        if size is None:
            self.input_len = self.unit_size
            self.output_len = self.unit_size
        else:
            self.input_len = size[0]
            self.output_len = size[1]
        # initialization
        assert flag in ['train', 'test', 'val']
        type_map = {'train': 0, 'val': 1, 'test': 2}
        self.set_type = type_map[flag]
        self.task = task
        self.target = target
        self.scale = scale
        self.start_col = start_col
        self.data_path = data_path
        self.filename = filename
        self.tid = turbine_id

        # If needed, we employ the predefined total_size (e.g. one month)
        self.total_size = self.unit_size * total_days
        #
        self.train_size = train_days * self.unit_size
        self.val_size = val_days * self.unit_size
        self.test_size = test_days * self.unit_size
        # self.test_size = self.total_size - train_size - val_size
        #
        # Or, if total_size is unavailable:
        # self.total_size = self.train_size + self.val_size + self.test_size
        self.__read_data__()

    def __read_data__(self):
        """
        Desc:
            Read the CSV, select the columns for the task, replace NaN with 0,
            optionally normalize, and slice out the rows of the requested split
        Returns:
            None
        """
        self.scaler = Scaler()
        df_raw = pd.read_csv(os.path.join(self.data_path, self.filename))

        # Row offset of this turbine; presumably each turbine occupies `total_size`
        # consecutive rows of the file -- verify against the data file.
        offset = self.tid * self.total_size
        # The validation and test ranges start `input_len` rows early so that their first
        # sample has a complete input window.
        border1s = [offset,
                    offset + self.train_size - self.input_len,
                    offset + self.train_size + self.val_size - self.input_len
                    ]
        border2s = [offset + self.train_size,
                    offset + self.train_size + self.val_size,
                    offset + self.train_size + self.val_size + self.test_size
                    ]
        border1 = border1s[self.set_type]
        border2 = border2s[self.set_type]

        df_data = df_raw
        if self.task in ('M', 'MS'):
            cols_data = df_raw.columns[self.start_col:]
            df_data = df_raw[cols_data]
        elif self.task == 'S':
            # NOTE(review): this selects a column labelled with the integer turbine id next to
            # the target column; that looks unintended -- confirm before relying on task 'S'.
            df_data = df_raw[[self.tid, self.target]]

        # Replace missing values with 0. A new frame is built instead of mutating in place,
        # so the process-wide pandas option 'mode.chained_assignment' is left untouched.
        df_data = df_data.replace(to_replace=np.nan, value=0)

        if self.scale:
            # Normalization statistics come from the training range only
            train_data = df_data[border1s[0]:border2s[0]]
            self.scaler.fit(train_data.values)
            data = self.scaler.transform(df_data.values)
        else:
            data = df_data.values
        self.data_x = data[border1:border2]
        self.data_y = data[border1:border2]
        # Un-normalized rows of the split, without the leading input window
        self.raw_data = df_data[border1 + self.input_len:border2]

    def get_raw_data(self):
        """
        Desc:
            Get the un-normalized rows of the split
        Returns:
            A DataFrame
        """
        return self.raw_data

    def __getitem__(self, index):
        """
        Desc:
            Get one sample
        Args:
            index: index of the sample
        Returns:
            The input window and the output window that immediately follows it
        """
        #
        # Only for customized use.
        # When sliding window not used, e.g. prediction without overlapped input/output sequences
        if self.set_type >= 3:
            index = index * self.output_len
        #
        # Standard use goes here.
        # Sliding window with the size of input_len + output_len
        s_begin = index
        s_end = s_begin + self.input_len
        r_begin = s_end
        r_end = r_begin + self.output_len
        seq_x = self.data_x[s_begin:s_end]
        seq_y = self.data_y[r_begin:r_end]
        return seq_x, seq_y

    def __len__(self):
        """
        Desc:
            Number of samples in the split
        Returns:
            An int
        """
        # In our case, the sliding window is adopted, the number of samples is calculated as follows
        if self.set_type < 3:
            return len(self.data_x) - self.input_len - self.output_len + 1
        # Otherwise, if sliding window is not adopted
        return int((len(self.data_x) - self.input_len) / self.output_len)

    def inverse_transform(self, data):
        """
        Desc:
            Undo the normalization with the scaler fitted on the training range
        Args:
            data: normalized values
        Returns:
            The values in the original scale
        """
        return self.scaler.inverse_transform(data)

<h2>调整学习率</h2>

In [18]:
def adjust_learning_rate(optimizer, epoch, args):
    # type: (paddle.optimizer.Adam, int, dict) -> None
    """
    Desc:
        Adjust learning rate
    Args:
        optimizer: the optimizer whose learning rate is updated through set_lr
        epoch: the epoch the new learning rate is meant for
        args: the settings; reads "lr_adjust" and, for 'type1', "lr"
    Returns:
        None
    """
    policy = args["lr_adjust"]

    if policy == 'type1':
        # Halve the base rate per epoch: lr * 0.5^{epoch-1}
        optimizer.set_lr(args["lr"] * (0.50 ** (epoch - 1)))
        return

    if policy == 'type2':
        # Fixed milestones; epochs that are not listed leave the rate untouched
        milestones = {
            2: 5e-5, 4: 1e-5, 6: 5e-6, 8: 1e-6,
            10: 5e-7, 15: 1e-7, 20: 5e-8
        }
        if epoch in milestones:
            optimizer.set_lr(milestones[epoch])

<h2>EarlyStop</h2>

In [19]:
class EarlyStopping(object):
    """
    Desc:
        EarlyStopping: tracks the best validation loss, saves a checkpoint whenever it
        improves, and raises the `early_stop` flag after `patience` consecutive
        evaluations without improvement
    """
    def __init__(self, patience=7, verbose=False, delta=0):
        """
        Desc:
            __init__
        Args:
            patience: number of consecutive non-improving evaluations tolerated
            verbose: stored on the instance; not read by this class
            delta: minimum increase of the score (-val_loss) that counts as an improvement
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # np.inf rather than the alias np.Inf, which is not available in NumPy 2.x
        self.val_loss_min = np.inf
        self.delta = delta
        self.best_model = False
        # Defined up front so that reading it before the first improvement does not raise
        self.update_hidden = False

    def save_checkpoint(self, val_loss, model, path, tid):
        # type: (float, BaselineGruModel, str, int) -> None
        """
        Desc:
            Save current checkpoint
        Args:
            val_loss: the validation loss
            model: the model
            path: the path to be saved
            tid: turbine ID
        Returns:
            None
        """
        self.best_model = True
        self.val_loss_min = val_loss
        paddle.save(model.state_dict(), os.path.join(path, 'model_' + str(tid)))

    def __call__(self, val_loss, model, path, tid):
        # type: (float, BaselineGruModel, str, int) -> None
        """
        Desc:
            Register a new validation loss: checkpoint the model when it improved,
            otherwise count one more evaluation without improvement
        Args:
            val_loss: the validation loss
            model: the model
            path: the path to be saved
            tid: turbine ID
        Returns:
            None
        """
        score = -val_loss
        is_first = self.best_score is None
        if np.isnan(score):
            # Every comparison with NaN is False, so a NaN loss would otherwise be taken
            # for an improvement and overwrite the best checkpoint.
            improved = False
        elif is_first:
            improved = True
        else:
            improved = not (score < self.best_score + self.delta)

        if improved:
            self.best_score = score
            if not is_first:
                self.update_hidden = True
            self.save_checkpoint(val_loss, model, path, tid)
            self.counter = 0
        else:
            self.counter += 1
            self.best_model = False
            if self.counter >= self.patience:
                self.early_stop = True

<h2>实验操作</h2>

In [20]:
class Experiment(object):
    """
    Desc:
        Bundles a model with its settings and provides the helpers used to
        train, validate and test it
    """
    def __init__(self, args):
        # type: (dict) -> None
        """
        Desc:
            Create the model from the settings
        Args:
            args: the arguments to initialize the experimental environment
        """
        self.model = BaselineGruModel(args)
        self.args = args

    def get_model(self):
        # type: () -> BaselineGruModel
        """
        Desc:
            Accessor for the model
        Returns:
            The model instance owned by this experiment
        """
        return self.model

    def get_args(self):
        # type: () -> dict
        """
        Desc:
            Accessor for the settings
        Returns:
            The dict passed to the constructor
        """
        return self.args

    def get_data(self, flag):
        # type: (str) -> (WindTurbineDataset, DataLoader)
        """
        Desc:
            Build the dataset of one split together with its loader
        Args:
            flag: the split to load ('train', 'val' or 'test')
        Returns:
            A dataset and a dataloader
        """
        cfg = self.args
        data_set = WindTurbineDataset(
            data_path=cfg["data_path"],
            filename=cfg["filename"],
            flag=flag,
            size=[cfg["input_len"], cfg["output_len"]],
            task=cfg["task"],
            target=cfg["target"],
            start_col=cfg["start_col"],
            turbine_id=cfg["turbine_id"],
            day_len=cfg["day_len"],
            train_days=cfg["train_size"],
            val_days=cfg["val_size"],
            test_days=cfg["test_size"],
            total_days=cfg["total_size"]
        )
        # Only the test split keeps its order; incomplete batches are dropped for every split
        data_loader = DataLoader(
            data_set,
            batch_size=cfg["batch_size"],
            shuffle=(flag != 'test'),
            num_workers=cfg["num_workers"],
            drop_last=True
        )
        return data_set, data_loader

    def load_train_data(self):
        # type: () -> WindTurbineData
        """
        Desc:
            Load train data to get the scaler for testing
        Returns:
            The train set
        """
        # NOTE(review): WindTurbineData is not defined in the cells shown here (the dataset
        # class above is WindTurbineDataset and has no `is_test` argument); presumably it is
        # provided by another cell -- confirm before calling this method.
        cfg = self.args
        return WindTurbineData(
            data_path=cfg["data_path"],
            filename=cfg["filename"],
            flag='train',
            size=[cfg["input_len"], cfg["output_len"]],
            task=cfg["task"],
            target=cfg["target"],
            start_col=cfg["start_col"],
            day_len=cfg["day_len"],
            train_days=cfg["train_size"],
            val_days=cfg["val_size"],
            total_days=cfg["total_size"],
            is_test=True
        )

    @staticmethod
    def get_test_x(args):
        # type: (dict) -> TestData
        """
        Desc:
            Obtain the input sequence for testing
        Args:
            args: the settings; reads "path_to_test_x" and "capacity"
        Returns:
            A TestData instance (TestData is not defined in the cells shown here)
        """
        return TestData(path_to_data=args["path_to_test_x"], farm_capacity=args["capacity"])

    def inference_one_sample(self, model, sample_x):
        # type: (BaselineGruModel, paddle.tensor) -> paddle.tensor
        """
        Desc:
            Run the given model on one input
        Args:
            model: the model to call
            sample_x: the input, cast to float32 before the call
        Returns:
            The prediction as float32; only the last channel when the task is 'MS'
        """
        first_channel = -1 if self.args["task"] == 'MS' else 0
        output = model(sample_x.astype('float32'))
        return output[..., :, first_channel:].astype('float32')

    def get_optimizer(self):
        # type: () -> paddle.optimizer.Adam
        """
        Desc:
            Create an Adam optimizer over the model parameters, using the configured
            learning rate and gradient clipping by norm (clip_norm=50.0)
        Returns:
            An optimizer
        """
        return paddle.optimizer.Adam(
            parameters=self.model.parameters(),
            learning_rate=self.args["lr"],
            grad_clip=paddle.nn.ClipGradByNorm(clip_norm=50.0)
        )

    @staticmethod
    def get_criterion():
        # type: () -> nn.MSELoss
        """
        Desc:
            Use the mse loss as the criterion
        Returns:
            MSE loss with mean reduction
        """
        return nn.MSELoss(reduction='mean')

    def process_one_batch(self, batch_x, batch_y):
        # type: (paddle.tensor, paddle.tensor) -> (paddle.tensor, paddle.tensor)
        """
        Desc:
            Run the model on a batch and cut prediction and target to the same channels
        Args:
            batch_x: the input windows
            batch_y: the target windows
        Returns:
            prediction and ground truth, both float32
        """
        inputs = batch_x.astype('float32')
        targets = batch_y.astype('float32')
        prediction = self.model(inputs)

        # For the multivariate-to-univariate task ('MS') the last column is the target
        # variable to be predicted; otherwise every column is kept.
        first_channel = -1 if self.args["task"] == 'MS' else 0

        horizon = self.args["output_len"]
        truth = targets[:, -horizon:, first_channel:].astype('float32')
        prediction = prediction[:, :, first_channel:].astype('float32')
        return prediction, truth

<h2>遍历样本</h2>

In [21]:
def traverse_wind_farm(method, params, model_path, flag='train', max_turbines=1):
    # type: (Callable, dict, str, str, int) -> list
    """
    Desc:
        Traverse the turbines in a wind farm one by one
    Args:
        method: the method for training or testing on the records of one turbine
        params: the arguments initialized; "turbine_id" is overwritten for each turbine
        model_path: the folder name of the model
        flag: 'train' or 'test'; any other value creates the experiments but calls nothing
        max_turbines: upper bound on the number of turbines visited. The default of 1 keeps
                      the previous behaviour (only the first turbine is processed);
                      pass None to traverse all params["capacity"] turbines
    Returns:
        Predictions (for test) or None
    """
    responses = []
    num_turbines = params["capacity"]
    if max_turbines is not None:
        num_turbines = min(num_turbines, max_turbines)

    start_time = time.time()
    for i in range(num_turbines):
        params["turbine_id"] = i
        exp = Experiment(params)
        if 'train' == flag:
            print('>>>>>>> Training Turbine {:3d} >>>>>>>>>>>>>>>>>>>>>>>>>>\n'.format(i))
            method(exp, model_path, is_debug=params["is_debug"])
        elif 'test' == flag:
            print('>>>>>>> Forecasting Turbine {:3d} >>>>>>>>>>>>>>>>>>>>>>>>>>\n'.format(i))
            res = method(exp, model_path)
            responses.append(res)
        else:
            pass
        # Release cached GPU memory before moving on to the next turbine
        paddle.device.cuda.empty_cache()
        if params["is_debug"]:
            end_time = time.time()
            print("Elapsed time for {} turbine {} is {} secs".format("training" if "train" == flag else "predicting", i,
                                                                     end_time - start_time))
            start_time = end_time
    if 'test' == flag:
        return responses

<h2>验证</h2>

In [22]:
def val(experiment, data_loader, criterion):
    # type: (Experiment, DataLoader, Callable) -> np.array
    """
    Desc:
        Validation function. The model is switched to evaluation mode (so dropout is
        disabled) and gradients are not recorded while the loss is computed; the
        previous training/evaluation mode is restored afterwards.
    Args:
        experiment: the experiment providing the model and process_one_batch
        data_loader: an iterable of (batch_x, batch_y) pairs
        criterion: the loss function; its result must provide item()
    Returns:
        The validation loss, averaged over the batches
    """
    model = experiment.get_model()
    was_training = getattr(model, "training", True)
    model.eval()
    validation_loss = []
    try:
        with paddle.no_grad():
            for batch_x, batch_y in data_loader:
                sample, true = experiment.process_one_batch(batch_x, batch_y)
                loss = criterion(sample, true)
                validation_loss.append(loss.item())
    finally:
        # Hand the model back in the mode the caller left it in
        if was_training:
            model.train()
    return np.average(validation_loss)

<h2>训练并验证</h2>

In [23]:
def train_and_val(experiment, model_folder, is_debug=False):
    # type: (Experiment, str, bool) -> None
    """
    Desc:
        Training and validation: for every epoch, fit the model on the training loader,
        evaluate it on the validation loader, let early stopping checkpoint the best
        model, and adjust the learning rate for the next epoch
    Args:
        experiment: the experiment providing model, data, optimizer and settings
        model_folder: folder name of the model (created below args["checkpoints"])
        is_debug: whether to print the losses and the elapsed time of every epoch
    Returns:
        None
    """
    args = experiment.get_args()
    model = experiment.get_model()
    _, train_loader = experiment.get_data(flag='train')
    _, val_loader = experiment.get_data(flag='val')

    path_to_model = os.path.join(args["checkpoints"], model_folder)
    os.makedirs(path_to_model, exist_ok=True)

    early_stopping = EarlyStopping(patience=args["patience"], verbose=True)
    model_optim = experiment.get_optimizer()
    criterion = Experiment.get_criterion()

    epoch_start_time = time.time()
    for epoch in range(args["train_epochs"]):
        train_loss = []
        model.train()
        for batch_x, batch_y in train_loader:
            sample, truth = experiment.process_one_batch(batch_x, batch_y)
            loss = criterion(sample, truth)
            train_loss.append(loss.item())
            loss.backward()
            # Apply exactly one update per batch. Calling minimize() and then step() would
            # apply the same gradients twice.
            model_optim.step()
            # Gradients accumulate across backward() calls unless they are cleared
            model_optim.clear_grad()
        val_loss = val(experiment, val_loader, criterion)

        if is_debug:
            train_loss = np.average(train_loss)
            epoch_end_time = time.time()
            print("Epoch: {}, \nTrain Loss: {}, \nValidation Loss: {}".format(epoch, train_loss, val_loss))
            print("Elapsed time for epoch-{}: {}".format(epoch, epoch_end_time - epoch_start_time))
            epoch_start_time = epoch_end_time

        # Early Stopping if needed
        early_stopping(val_loss, model, path_to_model, args["turbine_id"])
        if early_stopping.early_stop:
            print("Early stopped! ")
            break
        adjust_learning_rate(model_optim, epoch + 1, args)

<h2>开始训练</h2>

In [24]:
settings = prep_env()

# Name of the checkpoint folder: it encodes the data file, the task, the window
# lengths, the number of recurrent layers and the train/validation sizes.
cur_setup = '{}_t{}_i{}_o{}_ls{}_train{}_val{}'.format(
    *[settings[key] for key in ("filename", "task", "input_len", "output_len",
                                "lstm_layer", "train_size", "val_size")]
)

# Train (and validate) turbine by turbine
traverse_wind_farm(train_and_val, settings, cur_setup)

The experimental settings are: 
{'path_to_test_x': '../data/sdwpf_baidukddcup2022_test_toy/test_x', 'path_to_test_y': '../data/sdwpf_baidukddcup2022_test_toy/test_y', 'data_path': '../data', 'filename': 'wtbdata_245days.csv', 'task': 'MS', 'target': 'Patv', 'checkpoints': 'models', 'input_len': 144, 'output_len': 288, 'start_col': 3, 'in_var': 10, 'out_var': 1, 'day_len': 144, 'train_size': 153, 'val_size': 16, 'test_size': 15, 'total_size': 245, 'lstm_layer': 2, 'dropout': 0.05, 'num_workers': 5, 'train_epochs': 10, 'batch_size': 32, 'patience': 3, 'lr': 0.0001, 'lr_adjust': 'type1', 'gpu': 0, 'capacity': 134, 'turbine_id': 0, 'pred_file': 'predict.py', 'framework': 'paddlepaddle', 'is_debug': True, 'use_gpu': True}
>>>>>>> Training Turbine   0 >>>>>>>>>>>>>>>>>>>>>>>>>>

(22032, 10)
(22032, 10)


KeyboardInterrupt: 