In [None]:
import os
import sys
current_path = os.path.abspath(os.getcwd())
PROJECT_DIR = os.path.abspath("../..")
print(PROJECT_DIR)
os.chdir(PROJECT_DIR)
sys.path.append(PROJECT_DIR)
from joblib import Parallel, delayed
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")

In [None]:
from typing import Dict
import torch
from pmdarima import auto_arima
from torch.utils.data import DataLoader
import numpy as np
from basicts.utils import load_pkl, get_regular_settings
from basicts.data import TimeSeriesForecastingDataset
from basicts.metrics import masked_mae, masked_rmse, masked_mape, masked_mse
from basicts.scaler import ZScoreScaler
import pandas as pd

## Hyper-parameters

In [None]:
# construct configs
dataset_name = "ETTh1"

regular_settings = get_regular_settings(dataset_name)

input_len = 96 #regular_settings['INPUT_LEN']
output_len = 96 #regular_settings['OUTPUT_LEN']
rescale = regular_settings['RESCALE']
null_val = regular_settings['NULL_VAL']
norm_each_channel = regular_settings['NORM_EACH_CHANNEL']
train_val_test_ratio = regular_settings['TRAIN_VAL_TEST_RATIO']

target_time_series = None # for subset forecasting

gpu_num = 1
batch_size = 128 # only used for collecting data

# MA params
params = {
    'start_p': 0,            # The MA model is unrelated to it. Lower bound of parameter p (int). Default = 2
    'd': 0,                  # The MA model is unrelated to it. Non-seasonal differencing order (int). Default = None (auto selection, but increases runtime)
    'start_q': 1,            # Lower bound of parameter q (int). Default = 2
    'max_p': 0,              # The MA model is unrelated to it. Upper bound of parameter p (int). Default = 5 (must be >= start_p)
    'max_q': 5,              #  Upper bound of parameter q (int). Default = 5 (must be >= start_q)
    'D': None,               # Seasonal differencing order (int). Default = None (auto selection)
    'stationary': True,      # The MA model assumes that time series are stationary. Whether the time series is stationary (bool). Default = False
    'transparams':True,
    'seasonal': False,       # The MA model is unrelated to it. Whether to fit seasonal ARIMA (bool). Default = True (set to False if m=1)
    'm': 1,                  # The number of periods in each season (int). Default = 1 (e.g. 12 for monthly data, 4 for quarterly)
    'error_action': 'ignore',# How to handle errors (str). Default = 'trace'. Options: 'warn','raise','ignore','trace'
    'suppress_warnings': True,# Whether to suppress warnings (bool). Default = True
    'stepwise': True,        # Whether to use stepwise search algorithm (bool). Default = True (faster, but may miss best model)
    'n_jobs': 1,              # Number of models to fit in parallel (int). Default = 1. If -1, use all processors
    
}

## Construct Dataset

In [None]:
test_set = TimeSeriesForecastingDataset(dataset_name=dataset_name, input_len=input_len, output_len=output_len, train_val_test_ratio=train_val_test_ratio, mode="test")
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

scaler = ZScoreScaler(dataset_name=dataset_name, train_ratio=train_val_test_ratio[0], norm_each_channel=norm_each_channel, rescale=rescale)

In [None]:
Xs_test = []
Ys_test = []

def preprocessing(input_data, scaler, target_time_series) -> Dict:
    if scaler is not None:
        input_data['target'] = scaler.transform(input_data['target'])
        input_data['inputs'] = scaler.transform(input_data['inputs'])
    if target_time_series is not None:
        input_data['target'] = input_data['target'][:, :, target_time_series, :]
        input_data['inputs'] = input_data['inputs'][:, :, target_time_series, :]
    return input_data

# Local forecasting
# Test dataset only
for i, iter_data in enumerate(test_loader):
    iter_data = preprocessing(iter_data, scaler=scaler, target_time_series=target_time_series)
    inputs, target = iter_data['inputs'], iter_data['target']
    Xs_test.append(inputs)
    Ys_test.append(target)


Xs_test = torch.cat(Xs_test, dim=0)[..., [0]]
Xs_test = Xs_test[::input_len,:,:,:] # The time complexity of method is so high that we have to sample at intervals to test its performance.
Ys_test = torch.cat(Ys_test, dim=0)[..., [0]]
Ys_test = Ys_test[::input_len,:,:,:]

In [None]:
def reshape(data):
    B, L, N, C = data.shape
    data = data[..., 0].transpose(1, 2)
    return data

In [None]:
B, N, L = reshape(Xs_test).shape
B, N, L2 = reshape(Ys_test).shape
result = np.zeros((B, N, L2))
Xs_test = reshape(Xs_test).numpy()

## Train (Direct Multi-Step Forecasting)

In [None]:
# Fitting and Forecasting Functions
def fit_and_forecast(i, j, params):
    data = Xs_test[i, j, :] + 0.00001 # overcome the zero issue
    warnings.filterwarnings("ignore", category=RuntimeWarning)
    
    estimator = auto_arima(data, **params)
    # Fitting
    models = estimator.fit(data)
    # Forecasting
    forecast = models.predict(n_periods=L2) - 0.00001
    return i, j, forecast

print(Xs_test.shape)
# Create task list
tasks = [(i, j) for i in range(B) for j in range(N)]
results = Parallel(n_jobs=-1)(
    delayed(fit_and_forecast)(i, j, params) for i, j in tqdm(tasks, desc="Processing batches and series")
)

for i, j, forecast in results:
    result[i, j, :] = forecast

## Test (Direct Multi-Step Forecasting)

In [None]:
Ys_test = reshape(Ys_test)
B, N, L = Ys_test.shape
prediction = torch.tensor(result).reshape(B, N, L)
real_value = Ys_test

In [None]:
# print results
print("MAE: ", masked_mae(prediction, real_value, null_val).item())
print("RMSE: ", masked_rmse(prediction, real_value, null_val).item())
print("MSE: ", masked_mse(prediction, real_value, null_val).item())
print("MAPE: {:.2f}%".format(masked_mape(prediction, real_value, null_val) * 100))

##  Visualization

In [None]:
import matplotlib.pyplot as plt
import numpy as np

print(prediction.shape)
B_index = 0  # Select an index in dimension B
N_index = 0  # Select an index in dimension N

predicted_values = prediction[B_index, N_index, :]  # shape: (L,)
true_values = Ys_test[B_index, N_index, :]  # shape: (L,)

plt.figure(figsize=(10, 6))
plt.plot(predicted_values, label='Predicted', color='blue', linestyle='--')
plt.plot(true_values, label='True', color='red', linestyle='-')
plt.xlabel('Time Step (L)')
plt.ylabel('Value')
plt.title(f'Prediction vs True Values (B={B_index}, N={N_index})')
plt.legend()
plt.grid(True)
plt.show()