In [None]:
import json
import os
import threading
from collections import defaultdict
from typing import Dict
from typing import List, Tuple
from typing import Optional

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.init as init
from sklearn.preprocessing import MinMaxScaler,StandardScaler
from torch.utils.data import DataLoader

from dataset_utils import split_data, TimeSeriesDataset, min_max_normalization
from evaluate_utils import Evaluator
from json_utils import NpEncoder
from model.lstm import LSTMMdel
from model.nlinear import NLinear
from model.PatchTST import PatchTST,Config
from model.SegRNN import SegRNN
from train_utils import Trainer

In [None]:
# --- Data source ---
dataset_path = "../../dataset"   # directory whose files are each loaded with pd.read_csv
index_field = "timestamp"        # column returned as the index array by read_dataset
data_field = "num_request"       # column holding the series that is modeled

# --- Windowing and optimization ---
BATCH_SIZE = 1
N_HISTORY = 32   # length of each sliding window; a fresh model is trained per window
N_LOOKBACK = 4   # model input length (passed as seq_len)
N_PREDICT = 2    # forecast horizon (passed as pred_len)
LR = 1e-2
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Seed NumPy's and PyTorch's global RNGs so weight initialization and dropout
# draws are repeatable between runs.
# NOTE: with N_THREAD > 1 the worker threads draw from these shared generators
# in a scheduling-dependent order, so bit-exact reproducibility needs N_THREAD = 1.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# --- Output / model selection ---
RESULT_ROOT_PATH = "results"
MODEL_NAME = "lstm"   # one of: "lstm", "nlinear", "segrnn", "patchtst"

TRAIN_EPOCH = 20

# Trainer also supports training by early stop (`trainer.train_by_early_stop()`);
# these thresholds are passed to Trainer for that mode.
EARLY_STOP_GAIN = 0.01
EARLY_STOP_LOSS = 0.02

# Shared work counter advanced by the worker threads (reset per file in the main loop).
start_index = -1
start_index_end = -1
start_index_lock = threading.Lock()

N_THREAD = 8  # number of worker threads training windows concurrently

# Per-window results keyed by an integer offset, filled by the save_to_dict_* helpers.
train_gt_scaled = {}
train_pd_scaled = {}
test_gt_scaled = {}
test_pd_scaled = {}

train_gt_original = {}
train_pd_original = {}
test_gt_original = {}
test_pd_original = {}

In [None]:
def get_data_file_list(dataset_path: str) -> List[str]:
    """Return the names of the data files in ``dataset_path``, sorted by name.

    Only regular files are returned. Sub-directories and hidden entries (names
    starting with ".", e.g. ``.DS_Store`` or ``.ipynb_checkpoints``) are skipped,
    because each returned name is later read with ``pd.read_csv``. Sorting makes
    the processing order deterministic; ``os.listdir`` order is arbitrary.

    Args:
        dataset_path: Directory to scan (not recursive).

    Returns:
        Bare file names (not full paths).
    """
    # Use scandir as a context manager so the directory handle is closed promptly.
    with os.scandir(dataset_path) as entries:
        return sorted(
            entry.name
            for entry in entries
            if entry.is_file() and not entry.name.startswith(".")
        )


In [None]:
def read_dataset(
    csv_path: str,
    index_col: Optional[str] = None,
    data_col: Optional[str] = None,
) -> Tuple[np.ndarray, np.ndarray]:
    """Load one CSV file and return its index column and data column as arrays.

    Args:
        csv_path: Path of the CSV file to read.
        index_col: Column returned first; defaults to the config-cell
            ``index_field`` when omitted.
        data_col: Column returned second; defaults to the config-cell
            ``data_field`` when omitted.

    Returns:
        ``(index_values, data_values)`` as 1-D NumPy arrays.

    Raises:
        KeyError: if either column is missing from the file.
    """
    # Resolve defaults at call time (not definition time) so the function keeps
    # following the config cell even if it is edited after this cell ran.
    if index_col is None:
        index_col = index_field
    if data_col is None:
        data_col = data_field
    df = pd.read_csv(csv_path)
    return df[index_col].to_numpy(), df[data_col].to_numpy()

In [None]:
def _build_model(model_name: str) -> nn.Module:
    """Instantiate the forecaster selected by ``model_name`` and move it to ``DEVICE``.

    Raises:
        ValueError: if ``model_name`` is not one of the supported names.
    """
    if model_name == "lstm":
        model = LSTMMdel(1, 64, N_PREDICT, 2)
    elif model_name == "nlinear":
        model = NLinear(N_LOOKBACK, N_PREDICT)
    elif model_name == "segrnn":
        model = SegRNN(
            seq_len=N_LOOKBACK, pred_len=N_PREDICT, enc_in=1, d_model=64, dropout=0.5,
            rnn_type="lstm", dec_way="rmf", seg_len=1, channel_id=False, revin=False,
        )
    elif model_name == "patchtst":
        model = PatchTST(configs=Config(
            enc_in=1, seq_len=N_LOOKBACK, pred_len=N_PREDICT, e_layers=1, n_heads=4,
            d_model=16, d_ff=16, dropout=0.5, fc_dropout=0.5, head_dropout=0.5,
            individual=False, patch_len=1, stride=1, padding_patch=False, revin=False,
            affine=False, subtract_last=False, decomposition=False, kernel_size=1,
        ))
    else:
        # Without this branch an unknown name leaves `model` unbound and the
        # caller crashes later with an unhelpful UnboundLocalError.
        raise ValueError(
            f"Unknown MODEL_NAME {model_name!r}; expected one of "
            "'lstm', 'nlinear', 'segrnn', 'patchtst'"
        )
    return model.to(DEVICE)


def learn_on_history(history_data: np.ndarray) -> Tuple[Evaluator, MinMaxScaler, Evaluator, MinMaxScaler]:
    """Train a fresh model on one history window and evaluate it on that window's splits.

    Args:
        history_data: One window of the series. The main loop passes
            ``np_data[i:i + N_HISTORY]`` with ``np_data`` reshaped to one column.

    Returns:
        ``(train_evaluator, train_scaler, test_evaluator, test_scaler)``. Both
        evaluators have already run ``evaluate()``. The scalers are the ones
        returned by ``min_max_normalization`` for each split and are used to map
        results back to the original scale.

    Raises:
        ValueError: if ``MODEL_NAME`` is not a supported model name.
    """
    train_set, test_set = split_data(history_data, N_LOOKBACK, N_PREDICT)
    train_set, train_scaler = min_max_normalization(train_set)
    # NOTE(review): the test split is normalized with its own statistics
    # (test_scaler), not with train_scaler, so the scaling sees the values being
    # predicted. Confirm this is intended.
    test_set, test_scaler = min_max_normalization(test_set)
    train_dataset = TimeSeriesDataset(train_set, N_LOOKBACK, N_PREDICT)
    test_dataset = TimeSeriesDataset(test_set, N_LOOKBACK, N_PREDICT)
    # shuffle=False keeps evaluator outputs in dataset order.
    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)

    model = _build_model(MODEL_NAME)

    # Optional custom initialization (disabled):
    # for name, param in model.named_parameters():
    #     if 'weight' in name:
    #         init.xavier_normal_(param)
    #     elif 'bias' in name:
    #         init.constant_(param, 0.0)

    optimizer = torch.optim.AdamW(model.parameters(), lr=LR)
    loss_fn = nn.MSELoss()
    trainer = Trainer(model, train_dataloader, loss_fn, optimizer, num_epochs=TRAIN_EPOCH, early_stop_gain=EARLY_STOP_GAIN, early_stop_loss=EARLY_STOP_LOSS, lr_scheduler=None, device=DEVICE)
    # use `trainer.train_by_early_stop()` as an alternative
    trainer.train_by_epoch()

    train_evaluator = Evaluator(model, train_dataloader, loss_fn, DEVICE)
    train_evaluator.evaluate()
    test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
    test_evaluator = Evaluator(model, test_dataloader, loss_fn, DEVICE)
    test_evaluator.evaluate()
    return train_evaluator, train_scaler, test_evaluator, test_scaler

In [None]:
def save_to_dict_scaled(train_evaluator: Evaluator, test_evaluator: Evaluator, local_start_index: int):
    """Record one window's scaled ground truth and predictions in the module-level dicts.

    Train results are keyed by the window's start offset. Test results are keyed
    by that offset shifted by ``N_HISTORY - N_LOOKBACK``.
    """
    # NOTE(review): the shifted key presumably marks where the test portion of
    # the window begins in the series; confirm against split_data.
    test_key = local_start_index + N_HISTORY - N_LOOKBACK
    train_gt_scaled[local_start_index], train_pd_scaled[local_start_index] = (
        train_evaluator.get_gt(),
        train_evaluator.get_pd(),
    )
    test_gt_scaled[test_key], test_pd_scaled[test_key] = (
        test_evaluator.get_gt(),
        test_evaluator.get_pd(),
    )

In [None]:
def _inverse_scale_per_step(scaler: MinMaxScaler, scaled: np.ndarray) -> np.ndarray:
    """Undo min-max scaling one axis-1 slice at a time.

    Each ``scaled[:, step]`` slice (presumably one forecast step) is passed to
    ``scaler.inverse_transform``. The results are joined with ``np.hstack``, and
    a trailing axis of size 1 is appended.
    """
    restored_steps = [
        scaler.inverse_transform(scaled[:, step])
        for step in range(scaled.shape[1])
    ]
    return np.expand_dims(np.hstack(restored_steps), -1)


def save_to_dict_original(train_evaluator: Evaluator, train_scaler: MinMaxScaler, test_evaluator: Evaluator, test_scaler: MinMaxScaler, local_start_index: int):
    """Record one window's ground truth and predictions mapped back to the original scale.

    Train arrays are un-scaled with ``train_scaler`` and keyed by the window's
    start offset. Test arrays are un-scaled with ``test_scaler`` and keyed by
    that offset shifted by ``N_HISTORY - N_LOOKBACK``.
    """
    # Pull all four arrays from the evaluators before transforming any of them.
    train_gt, train_pd = train_evaluator.get_gt(), train_evaluator.get_pd()
    test_gt, test_pd = test_evaluator.get_gt(), test_evaluator.get_pd()

    restored_train_gt = _inverse_scale_per_step(train_scaler, train_gt)
    restored_train_pd = _inverse_scale_per_step(train_scaler, train_pd)
    restored_test_gt = _inverse_scale_per_step(test_scaler, test_gt)
    restored_test_pd = _inverse_scale_per_step(test_scaler, test_pd)

    test_key = local_start_index + N_HISTORY - N_LOOKBACK
    train_gt_original[local_start_index] = restored_train_gt
    train_pd_original[local_start_index] = restored_train_pd
    test_gt_original[test_key] = restored_test_gt
    test_pd_original[test_key] = restored_test_pd

In [None]:
def run_learning(np_data: np.ndarray):
    """Worker loop: repeatedly claim the next window offset and train on it.

    Several threads run this function concurrently on the same ``np_data``.
    Each iteration claims a unique start offset under ``start_index_lock``. It
    then trains a fresh model on ``np_data[offset:offset + N_HISTORY]`` and
    stores the results through the ``save_to_dict_*`` helpers. The worker
    returns once every offset up to ``start_index_end`` has been claimed.
    """
    global start_index
    while True:
        # Check and advance the shared counter atomically. The check must be
        # `>=`: with `>`, a thread that finds start_index == start_index_end
        # would claim start_index_end + 1, whose window is one sample short.
        with start_index_lock:
            if start_index >= start_index_end:
                return
            start_index += 1
            local_start_index = start_index
        print(f"{threading.current_thread().name}\t{local_start_index}")
        history_data = np_data[local_start_index:local_start_index + N_HISTORY]
        train_evaluator, train_scaler, test_evaluator, test_scaler = learn_on_history(history_data)
        save_to_dict_scaled(train_evaluator, test_evaluator, local_start_index)
        save_to_dict_original(train_evaluator, train_scaler, test_evaluator, test_scaler, local_start_index)

In [None]:
def save_to_file(result_dict: Dict[int, np.ndarray], file_name: str):
    """Write the values of ``result_dict``, ordered by key, as one JSON array.

    The file is written to ``RESULT_ROOT_PATH/MODEL_NAME/file_name``, creating
    the directory if needed. Values are stacked with ``np.array`` and serialized
    with ``NpEncoder``.

    Args:
        result_dict: Per-window results keyed by an integer offset.
        file_name: Output file name (no directory component).
    """
    save_dir = os.path.join(RESULT_ROOT_PATH, MODEL_NAME)
    # exist_ok avoids the check-then-create race of `if not exists: makedirs`.
    os.makedirs(save_dir, exist_ok=True)
    # Sort by key so the output follows the window offset, not the
    # thread-dependent order in which the workers inserted results.
    ordered_results = [result_dict[key] for key in sorted(result_dict)]
    # NOTE(review): np.array needs every window's result to have the same shape;
    # ragged inputs raise ValueError on NumPy >= 1.24.
    np_result = np.array(ordered_results)
    with open(os.path.join(save_dir, file_name), "w") as f:
        json.dump(np_result, f, indent=4, cls=NpEncoder)

> **Note:** The final result written to file differs from that of the `offline_training` notebook: the `online_training` result has one extra leading dimension. In this notebook, each saved array has shape `N_SEG x N_SAMPLES x N_PREDICT x 1`. `N_SEG` is the number of sliding windows: one per start offset, i.e. `LEN_DATA - N_HISTORY + 1`, since the window advances by one step at a time. `N_SAMPLES` is the number of training/test samples within each window. The extra dimension comes from this sliding-window way of training. In the `offline_training` notebook, the result has shape `N_SAMPLES x N_PREDICT x 1`, where `N_SAMPLES` is the number of samples in each train/val/test set.

In [None]:
# For every dataset file: train one model per sliding window (multi-threaded),
# then save the scaled and original-scale results as JSON.
data_file_list = get_data_file_list(dataset_path)
for file_name in data_file_list:
    # splitext strips only the final extension, so "node.1.csv" and "node.2.csv"
    # keep distinct output names instead of both collapsing to "node" and
    # overwriting each other's results.
    file_stem = os.path.splitext(file_name)[0]

    np_index, np_data = read_dataset(os.path.join(dataset_path, file_name))
    np_data = np_data.reshape((-1, 1))

    # Reset the shared work counter that run_learning's threads advance.
    start_index = -1
    # Last start offset whose window np_data[i:i + N_HISTORY] is full length.
    start_index_end = len(np_data) - N_HISTORY

    # Rebind the module-level result dicts so this file's results do not mix
    # with the previous file's; the save_to_dict_* helpers look them up by name.
    train_gt_scaled = {}
    train_pd_scaled = {}
    test_gt_scaled = {}
    test_pd_scaled = {}

    train_gt_original = {}
    train_pd_original = {}
    test_gt_original = {}
    test_pd_original = {}

    # Use multi-threading training to improve efficiency: each thread keeps
    # claiming window offsets until none remain.
    thread_list = [threading.Thread(target=run_learning, args=[np_data]) for _ in range(N_THREAD)]
    for t in thread_list:
        t.start()
    for t in thread_list:
        t.join()

    # Output-file suffix -> result dict, in the original save order.
    outputs_by_suffix = {
        "_gt_train_scaled.json": train_gt_scaled,
        "_pd_train_scaled.json": train_pd_scaled,
        "_gt_test_scaled.json": test_gt_scaled,
        "_pd_test_scaled.json": test_pd_scaled,
        "_gt_train_original.json": train_gt_original,
        "_pd_train_original.json": train_pd_original,
        "_gt_test_original.json": test_gt_original,
        "_pd_test_original.json": test_pd_original,
    }
    for suffix, result_dict in outputs_by_suffix.items():
        save_to_file(result_dict, file_stem + suffix)
    