In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import torch

from gluonts.dataset.repository.datasets import get_dataset
from gluonts.dataset.common import ListDataset
from gluonts.evaluation import make_evaluation_predictions, Evaluator

# from gluonts.torch.model.lag_tst.estimator import LagTSTEstimator
from gluonts.torch.distributions import NegativeBinomialOutput
from gluonts.torch.modules.loss import NegativeLogLikelihood

from LagTST import LagTSTEstimator

# from pts.modules import StudentTOutput

import random
import numpy as np
import time
import optuna
from optuna.samplers import TPESampler

In [2]:
class LagTSTObjective:
    """Optuna objective: train a LagTST model and score it on the test split.

    Every call samples a hyperparameter set from the trial, fits a
    ``LagTSTEstimator`` on ``dataset.train``, forecasts ``dataset.test`` and
    returns a single aggregate metric computed by the GluonTS ``Evaluator``.

    Args:
        dataset: Object exposing ``train``, ``test``,
            ``metadata.prediction_length`` and ``metadata.freq``.
        metric_type: Key looked up in the evaluator's aggregate-metrics
            mapping; its value is what the study optimizes.
    """

    def __init__(self, dataset, metric_type="mean_wQuantileLoss"):
        self.dataset = dataset
        self.metric_type = metric_type

    def get_params(self, trial) -> dict:
        """Sample one hyperparameter configuration from ``trial``.

        The context length is searched between one and five prediction
        horizons of the dataset held by this objective.

        Args:
            trial: Optuna trial (anything providing ``suggest_int``).

        Returns:
            Mapping from hyperparameter name to the sampled integer value.
        """
        # Read the horizon from the instance's dataset, not from a notebook
        # global, so the objective works for whichever dataset it was given.
        prediction_length = self.dataset.metadata.prediction_length
        return {
            "context_length": trial.suggest_int(
                "context_length",
                prediction_length,
                prediction_length * 5,
                step=1,
            ),
            "batch_size": trial.suggest_int("batch_size", 32, 256, step=32),
            "d_model": trial.suggest_int("d_model", 16, 64, step=16),
            "dim": trial.suggest_int("dim", 16, 64, step=16),
            # NOTE(review): "patch_size" and "kernel_size" are sampled but are
            # not forwarded to the estimator in __call__ -- confirm whether
            # they should be wired in or dropped from the search space.
            "patch_size": trial.suggest_int("patch_size", 2, 16, step=4),
            "kernel_size": trial.suggest_int("kernel_size", 9, 18, step=3),
        }

    def __call__(self, trial):
        """Train and evaluate one configuration; return the objective value.

        Args:
            trial: Optuna trial used to sample the hyperparameters.

        Returns:
            The aggregate metric stored under ``self.metric_type``.
        """
        params = self.get_params(trial)
        metadata = self.dataset.metadata

        estimator = LagTSTEstimator(
            prediction_length=metadata.prediction_length,
            context_length=params["context_length"],
            freq=metadata.freq,
            scaling="std",
            # Alternative output/loss kept for reference:
            # distr_output=NegativeBinomialOutput(),
            # loss=NegativeLogLikelihood(beta=0.2),
            d_model=params["d_model"],
            dim_feedforward=params["dim"],
            batch_size=params["batch_size"],
            patch_reverse_mapping_layer="mlp",
            num_batches_per_epoch=100,
            trainer_kwargs=dict(accelerator="gpu", max_epochs=30),
        )
        # NOTE(review): the test split is used for validation here and is also
        # the split scored below -- confirm this is intended for tuning.
        predictor = estimator.train(
            training_data=self.dataset.train,
            cache_data=True,
            shuffle_buffer_length=1024,
            validation_data=self.dataset.test,
        )

        forecast_it, ts_it = make_evaluation_predictions(
            dataset=self.dataset.test,
            predictor=predictor,
        )
        # Materialize forecasts first, then the matching ground-truth series.
        forecasts = list(forecast_it)
        tss = list(ts_it)

        evaluator = Evaluator()
        agg_metrics, _ = evaluator(iter(tss), iter(forecasts))
        return agg_metrics[self.metric_type]

In [3]:
# Benchmark used for the hyperparameter search; regenerate=False is passed so
# the loader is not asked to rebuild the dataset.
# Alternative benchmark: get_dataset("electricity")
dataset = get_dataset("solar-energy", regenerate=False)

In [None]:
# Seed every RNG the run may touch so repeated studies are comparable.
seed = 42
random.seed(seed)
np.random.seed(seed)  # covers any code drawing from NumPy's global RNG
torch.manual_seed(seed)

start_time = time.time()

# TPE sampler is seeded as well so the sequence of suggested trials repeats.
sampler = TPESampler(seed=seed)
study = optuna.create_study(sampler=sampler, direction="minimize")
study.optimize(LagTSTObjective(dataset), n_trials=10)

print(f"Number of finished trials: {len(study.trials)}")

print("Best trial:")
trial = study.best_trial

print(f"  Value: {trial.value}")
print("  Params: ")
for key, value in trial.params.items():
    print(f"    {key}: {value}")

# Wall-clock seconds elapsed since start_time (seeding excluded).
print(time.time() - start_time)