### Import data

In [1]:
from datasets import load_dataset

CHRONOS = "autogluon/chronos_datasets"
DATASET_CHRONOS = [
    "dominick",
    "ercot",
    "exchange_rate",
    #"monash_m3_monthly"
]

for dataset in DATASET_CHRONOS:
    ds = load_dataset(CHRONOS, dataset, trust_remote_code=True)
    display(ds)

DatasetDict({
    train: Dataset({
        features: ['id', 'timestamp', 'target', 'im_0'],
        num_rows: 100014
    })
})

DatasetDict({
    train: Dataset({
        features: ['id', 'timestamp', 'target'],
        num_rows: 8
    })
})

DatasetDict({
    train: Dataset({
        features: ['id', 'timestamp', 'target'],
        num_rows: 8
    })
})

### Evaluation metrics

In [2]:
from gluonts.ev.metrics import (
    MSE,
    MAE,
    MASE,
    MAPE,
    SMAPE,
    MSIS,
    RMSE,
    NRMSE,
    ND,
    MeanWeightedSumQuantileLoss,
)

# Instantiate the metrics
metrics = [
    MSE(forecast_type="mean"),
    MSE(forecast_type=0.5),
    MAE(),
    MASE(),
    MAPE(),
    SMAPE(),
    MSIS(),
    RMSE(),
    NRMSE(),
    ND(),
    MeanWeightedSumQuantileLoss(
        quantile_levels=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
    ),
]

### Results file

In [3]:
import csv
import os

# Ensure the output directory exists
output_dir = "results"
os.makedirs(output_dir, exist_ok=True)

# Define the path for the CSV file
csv_file_path = os.path.join(output_dir, "chronos_data_results.csv")

with open(csv_file_path, "w", newline="") as csvfile:
    writer = csv.writer(csvfile)

    # Write the header
    writer.writerow(
        [
            "dataset",
            "model",
            "eval_metrics/MSE[mean]",
            "eval_metrics/MSE[0.5]",
            "eval_metrics/MAE[0.5]",
            "eval_metrics/MASE[0.5]",
            "eval_metrics/MAPE[0.5]",
            "eval_metrics/sMAPE[0.5]",
            "eval_metrics/MSIS",
            "eval_metrics/RMSE[mean]",
            "eval_metrics/NRMSE[mean]",
            "eval_metrics/ND[0.5]",
            "eval_metrics/mean_weighted_sum_quantile_loss",
            "domain",
            "num_variates",
        ]
    )

## Evaluation

### Chronos

In [None]:
from dataclasses import dataclass, field
from typing import List, Optional

import numpy as np
import torch
from chronos import BaseChronosPipeline, ForecastType
from gluonts.itertools import batcher
from gluonts.model import Forecast
from gluonts.model.forecast import QuantileForecast, SampleForecast
from tqdm.auto import tqdm

import logging


@dataclass
class ModelConfig:
    quantile_levels: Optional[List[float]] = None
    forecast_keys: List[str] = field(init=False)
    statsforecast_keys: List[str] = field(init=False)
    intervals: Optional[List[int]] = field(init=False)

    def __post_init__(self):
        self.forecast_keys = ["mean"]
        self.statsforecast_keys = ["mean"]
        if self.quantile_levels is None:
            self.intervals = None
            return

        intervals = set()

        for quantile_level in self.quantile_levels:
            interval = round(200 * (max(quantile_level, 1 - quantile_level) - 0.5))
            intervals.add(interval)
            side = "hi" if quantile_level > 0.5 else "lo"
            self.forecast_keys.append(str(quantile_level))
            self.statsforecast_keys.append(f"{side}-{interval}")

        self.intervals = sorted(intervals)

class ChronosPredictor:
    def __init__(
        self,
        model_path,
        num_samples: int,
        prediction_length: int,
        *args,
        **kwargs,
    ):
        print("prediction_length:", prediction_length)
        self.pipeline = BaseChronosPipeline.from_pretrained(
            model_path,
            *args,
            **kwargs,
        )
        self.prediction_length = prediction_length
        self.num_samples = num_samples

    def predict(self, test_data_input, batch_size: int = 1024) -> List[Forecast]:
        pipeline = self.pipeline
        predict_kwargs = (
            {"num_samples": self.num_samples}
            if pipeline.forecast_type == ForecastType.SAMPLES
            else {}
        )
        while True:
            try:
                # Generate forecast samples
                forecast_outputs = []
                for batch in tqdm(batcher(test_data_input, batch_size=batch_size)):
                    context = [torch.tensor(entry["target"]) for entry in batch]
                    forecast_outputs.append(
                        pipeline.predict(
                            context,
                            prediction_length=self.prediction_length,
                            **predict_kwargs,
                        ).numpy()
                    )
                forecast_outputs = np.concatenate(forecast_outputs)
                break
            except torch.cuda.OutOfMemoryError:
                print(
                    f"OutOfMemoryError at batch_size {batch_size}, reducing to {batch_size // 2}"
                )
                batch_size //= 2

        # Convert forecast samples into gluonts Forecast objects
        forecasts = []
        for item, ts in zip(forecast_outputs, test_data_input):
            forecast_start_date = ts["start"] + len(ts["target"])

            if pipeline.forecast_type == ForecastType.SAMPLES:
                forecasts.append(
                    SampleForecast(samples=item, start_date=forecast_start_date)
                )
            elif pipeline.forecast_type == ForecastType.QUANTILES:
                forecasts.append(
                    QuantileForecast(
                        forecast_arrays=item,
                        forecast_keys=list(map(str, pipeline.quantiles)),
                        start_date=forecast_start_date,
                    )
                )

        return forecasts


class WarningFilter(logging.Filter):
    def __init__(self, text_to_filter):
        super().__init__()
        self.text_to_filter = text_to_filter

    def filter(self, record):
        return self.text_to_filter not in record.getMessage()


gts_logger = logging.getLogger("gluonts.model.forecast")
gts_logger.addFilter(
    WarningFilter("The mean prediction is not stored in the forecast data")
)

In [None]:
model_name = "chronos_bolt_small" # TODO: change to "chronos_t5_base" for the original Chronos model

model_path="amazon/chronos-bolt-small",
# TODO: use "amazon/chronos-t5-base" for the corresponding original Chronos model
# "amazon/chronos-bolt-tiny", "amazon/chronos-bolt-mini", "amazon/chronos-bolt-small", "amazon/chronos-bolt-base",
# "amazon/chronos-t5-tiny", "amazon/chronos-t5-mini", "amazon/chronos-t5-small",
# "amazon/chronos-t5-base", "amazon/chronos-t5-large",

In [None]:
ds["train"]

Dataset({
    features: ['id', 'timestamp', 'target'],
    num_rows: 8
})

In [None]:
from gluonts.model import evaluate_model, evaluate_forecasts
from gluonts.time_feature import get_seasonality

for ds_name in DATASET_CHRONOS:
        print(f"Processing dataset: {ds_name}")

        dataset = load_dataset(CHRONOS, ds_name, trust_remote_code=True)
        season_length = get_seasonality(dataset.freq)
        prediction_length = dataset["train"].metadata.prediction_length
        dataset_properties_map = {
            "ercot": {"num_variates": 1},
            "exchange_rate": {"num_variates": 8},
            "dominick": {"num_variates": 1},
            # "monash_m3_monthly": {"num_variates": 1},
        }

        print(f"Dataset size: {len(dataset.test_data)}")
        predictor = ChronosPredictor(
            model_path=model_path,
            num_samples=20,
            prediction_length=dataset.prediction_length,
            # Change device_map to "cpu" to run on CPU or "cuda" to run on GPU
            device_map="cpu",
        )

        # Make predictions
        predictions = predictor.predict(
            test_data_input=dataset["train"],
            batch_size=512,
        )

        # Evaluate the predictions
        res = evaluate_forecasts(
            predictions=predictions,
            test_data=dataset["train"],
            metrics=metrics,
            prediction_length=dataset.prediction_length,
            season_length=season_length,
            quantiles=[0.5],
            num_samples=20,
            num_quantiles=10,
            num_samples_per_quantile=20,
        )

        # Append the results to the CSV file
        with open(csv_file_path, "a", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(
                [
                    ds_name,
                    model_name,
                    res["MSE[mean]"][0],
                    res["MSE[0.5]"][0],
                    res["MAE[0.5]"][0],
                    res["MASE[0.5]"][0],
                    res["MAPE[0.5]"][0],
                    res["sMAPE[0.5]"][0],
                    res["MSIS"][0],
                    res["RMSE[mean]"][0],
                    res["NRMSE[mean]"][0],
                    res["ND[0.5]"][0],
                    res["mean_weighted_sum_quantile_loss"][0],
                    ds_name,
                    dataset_properties_map[ds_name]["num_variates"],
                ]
            )

        print(f"Results for {ds_name} have been written to {csv_file_path}")