<center>
  
## Predict, Refine, Synthesize: Self-Guiding Diffusion Models for Probabilistic Time Series Forecasting

</center>


The paper proposes TSDiff, an unconditionally trained diffusion model for time series. TSDiff uses a self-guidance mechanism that lets it generate forecasts conditionally, refine predictions, and produce synthetic data without requiring auxiliary networks or altering the training procedure.

Time series forecasting is crucial for making informed decisions in fields such as finance, energy, and healthcare. Deep learning forecasters usually treat it as conditional generative modeling: the model is trained to generate future values given the observed past. TSDiff is instead trained unconditionally, and the same trained model serves several downstream tasks. Its self-guidance mechanism conditions the model on observations at inference time, so forecasting requires no conditional training. The paper also uses the model's generative capabilities to refine the predictions of base forecasters and to generate high-quality synthetic data.

In the following, we take a deeper dive into the implementation of this method.

# Imports and Setup

In this section, we import all necessary libraries and modules required for setting up the environment. This includes libraries for logging, parsing arguments, handling file paths, and loading configurations. Additionally, we import essential packages for data loading, model creation, and training such as PyTorch, PyTorch Lightning, and GluonTS. Custom modules specific to the time series diffusion model (TSDiff) are also imported.

In [1]:
import logging
import argparse
from pathlib import Path

import yaml
import torch
from tqdm.auto import tqdm
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, RichProgressBar

from gluonts.dataset.loader import TrainDataLoader
from gluonts.dataset.split import OffsetSplitter
from gluonts.itertools import Cached
from gluonts.torch.batchify import batchify
from gluonts.evaluation import make_evaluation_predictions, Evaluator
from gluonts.dataset.field_names import FieldName

import uncond_ts_diff.configs as diffusion_configs
from uncond_ts_diff.dataset import get_gts_dataset
from uncond_ts_diff.model.callback import EvaluateCallback
from uncond_ts_diff.model import TSDiff
from uncond_ts_diff.sampler import DDPMGuidance, DDIMGuidance
from uncond_ts_diff.utils import (
    create_transforms,
    create_splitter,
    add_config_to_argparser,
    filter_metrics,
    MaskInput,
)

Falling back on slow Cauchy kernel. Install at least one of pykeops or the CUDA extension for efficiency.
Falling back on slow Vandermonde kernel. Install pykeops for improved memory efficiency.


# Load Configuration

Here, we set up the configuration for the model training. This involves loading the configuration file which contains parameters and settings needed for the training process. The configuration is read from a YAML file and parsed into a dictionary format. Logging is also configured in this section to record the training process.

In [10]:
guidance_map = {"ddpm": DDPMGuidance, "ddim": DDIMGuidance}

# Setup Logger
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("logger")
logger.setLevel(logging.INFO)

# Setup config
config_path = "configs/train_tsdiff/train_uber_tlc.yaml"
log_dir = "./"

with open(config_path, "r") as fp:
    config = yaml.safe_load(fp)
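
The remaining cells read a number of entries from `config`, some of them only on certain code paths. As a small sanity check, the sketch below lists the keys accessed as `config[...]` in this notebook and reports which ones are absent. The helper name `missing_config_keys` and the sample dictionary are illustrative assumptions and not part of the TSDiff code base.

```python
# Keys that the cells in this notebook read via config[...]
REQUIRED_KEYS = [
    "dataset", "freq", "context_length", "prediction_length",
    "diffusion_config", "use_features", "use_lags", "normalization",
    "lr", "init_skip", "device", "setup", "use_validation_set",
    "sampler", "sampler_params", "batch_size",
    "num_batches_per_epoch", "max_epochs",
]


def missing_config_keys(config, required=REQUIRED_KEYS):
    """Return the required keys absent from config (hypothetical helper)."""
    return [key for key in required if key not in config]


# Deliberately incomplete config; the values are made up for the example
example_config = {"dataset": "uber_tlc_hourly", "freq": "H", "prediction_length": 24}
missing = missing_config_keys(example_config)
print(f"{len(missing)} keys missing, e.g. {missing[:3]}")
```

Running such a check right after `yaml.safe_load` surfaces a misspelled or missing key before the model is built, rather than as a `KeyError` part-way through training.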

# Model Creation

**Denoising Diffusion Probabilistic Models (DDPMs):**
DDPMs model data generation as a discrete-time diffusion process with Gaussian transitions. The forward process gradually adds noise to the data, while the reverse process, learned by the model, removes this noise to generate data samples. The model is trained to approximate this reverse process by minimizing a simplified objective: the mean squared error between the noise added in the forward process and the noise the network predicts.
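
To make the forward process concrete, here is a minimal NumPy sketch of drawing a noisy sample in closed form and evaluating the noise-prediction loss. The linear schedule, its endpoints, the number of steps, and the function names are assumptions chosen for illustration; they are not taken from the TSDiff configuration.

```python
import numpy as np


def linear_beta_schedule(num_steps, beta_start=1e-4, beta_end=0.1):
    """Noise variances beta_1..beta_T (assumed linear schedule)."""
    return np.linspace(beta_start, beta_end, num_steps)


def q_sample(x0, t, alphas_cumprod, rng):
    """Draw x_t ~ q(x_t | x_0) = N(sqrt(abar_t) x_0, (1 - abar_t) I)."""
    noise = rng.standard_normal(x0.shape)
    abar_t = alphas_cumprod[t]
    x_t = np.sqrt(abar_t) * x0 + np.sqrt(1.0 - abar_t) * noise
    return x_t, noise


def simple_loss(noise, predicted_noise):
    """Mean squared error between the true and the predicted noise."""
    return float(np.mean((noise - predicted_noise) ** 2))


rng = np.random.default_rng(0)
betas = linear_beta_schedule(num_steps=100)
alphas_cumprod = np.cumprod(1.0 - betas)

x0 = np.sin(np.linspace(0, 4 * np.pi, 48))  # toy univariate series
x_t, noise = q_sample(x0, t=50, alphas_cumprod=alphas_cumprod, rng=rng)

# A trained network eps_theta(x_t, t) would supply predicted_noise;
# an all-zero guess stands in for it here.
loss = simple_loss(noise, np.zeros_like(noise))
```

During training, `t` is sampled at random for every example, so a single network learns to denoise at all noise levels.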

**TSDiff Architecture:**
TSDiff is designed for univariate time series and uses S4 layers for temporal modeling. The architecture incorporates historical information by appending lagged time series along the channel dimension, allowing it to handle noisy inputs. The model's output dimensions match its input dimensions, making it suitable for unconditional generative tasks.
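
The idea of appending lagged copies along the channel dimension can be sketched with plain NumPy. The function `stack_lags`, the `(channels, length)` layout, and the lag values are illustrative assumptions; the actual model derives its lags from the data frequency and exposes them as `model.lags_seq`.

```python
import numpy as np


def stack_lags(series, lags, length):
    """Stack the last `length` values with their lagged copies.

    Returns an array of shape (1 + len(lags), length): channel 0 is the
    target window, channel i is the same window shifted back by lags[i - 1].
    """
    if len(series) < length + max(lags):
        raise ValueError("series too short for the requested lags")
    end = len(series)
    channels = [series[end - length:end]]
    for lag in lags:
        channels.append(series[end - length - lag:end - lag])
    return np.stack(channels, axis=0)


series = np.arange(20, dtype=float)
x = stack_lags(series, lags=[1, 7], length=5)
print(x.shape)
```

The length check shows why the data splitters later in this notebook request `context_length + max(model.lags_seq)` past values: the largest lag needs that much extra history.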

**Observation Self-Guidance:**
This mechanism enables TSDiff to perform conditional forecasting during inference. By leveraging the learned probability density, TSDiff can guide its predictions based on observed data points. Two variants are proposed: mean square self-guidance, which assumes a Gaussian distribution over the observations, and quantile self-guidance, which uses an asymmetric Laplace distribution and is better suited to quantile-based evaluation.
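
The two variants differ only in the likelihood placed on the observed values. As a rough sketch, the corresponding negative log-likelihoods (up to constants) are the squared error and the pinball loss. In the model, the guidance term is the gradient of such a quantity with respect to the noisy sample, computed through the denoiser by automatic differentiation; the code below only evaluates the two energies on a fixed, made-up prediction, and all names in it are assumptions.

```python
import numpy as np


def mse_energy(y_obs, y_pred, mask):
    """Gaussian negative log-likelihood (up to a constant) on observed steps."""
    return 0.5 * float(np.sum(mask * (y_obs - y_pred) ** 2))


def quantile_energy(y_obs, y_pred, mask, kappa):
    """Asymmetric Laplace negative log-likelihood (up to a constant):
    the pinball loss at quantile level kappa on observed steps."""
    diff = y_obs - y_pred
    pinball = np.maximum(kappa * diff, (kappa - 1.0) * diff)
    return float(np.sum(mask * pinball))


y_obs = np.array([1.0, 2.0, 3.0, 0.0])   # last step is unobserved
mask = np.array([1.0, 1.0, 1.0, 0.0])    # 1 = observed, 0 = to be forecast
y_pred = np.array([1.5, 2.0, 2.0, 9.0])  # stand-in for the denoised estimate

print(mse_energy(y_obs, y_pred, mask))
print(quantile_energy(y_obs, y_pred, mask, kappa=0.9))
```

The mask ensures that only observed time steps contribute, so the forecast horizon is shaped by the model's learned density rather than by the guidance term.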

**Prediction Refinement:**
TSDiff can iteratively refine the predictions of base forecasters by interpreting the implicit probability density as a prior. This refinement is done directly in the data space, providing a computationally efficient way to improve forecast accuracy without modifying the core forecasting model. Two approaches are presented: energy-based sampling using Langevin Monte Carlo and maximum likelihood optimization.
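
The sampling-based refinement can be illustrated on a toy problem. In the sketch below a simple quadratic energy stands in for the model's negative log-density plus a regularizer that keeps the result close to the base forecast; the energy, step size, noise scale, and function names are all assumptions made for illustration and do not reproduce the paper's settings.

```python
import numpy as np


def langevin_refine(y_init, grad_energy, step_size=0.05, noise_scale=0.1,
                    num_steps=200, rng=None):
    """Overdamped Langevin updates:
    y <- y - eta * grad E(y) + sqrt(2 * eta * gamma) * xi."""
    if rng is None:
        rng = np.random.default_rng()
    y = np.array(y_init, dtype=float)
    for _ in range(num_steps):
        xi = rng.standard_normal(y.shape)
        y = y - step_size * grad_energy(y) + np.sqrt(2 * step_size * noise_scale) * xi
    return y


# Toy energy (assumption): a Gaussian "prior" centred on prior_mean stands in
# for -log p_theta(y); a quadratic term keeps y close to the base forecast.
prior_mean = np.array([1.0, 2.0, 3.0])
base_forecast = np.array([3.0, 2.0, 1.0])
reg_weight = 1.0


def grad_energy(y):
    return (y - prior_mean) + reg_weight * (y - base_forecast)


# With the noise switched off, the update reduces to gradient descent on the
# energy, which corresponds to the optimization-based style of refinement.
refined = langevin_refine(base_forecast, grad_energy, noise_scale=0.0)
print(refined)
```

In this toy setting the refined forecast settles between the base forecast and the prior mean, with `reg_weight` controlling how far the prior may pull it.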

The following cells create the TSDiff model from the loaded configuration. The function `create_model` initializes TSDiff with parameters such as the frequency, feature and lag usage, normalization, context length, prediction length, and learning rate, and then moves the model to the configured device (CPU or GPU).

In [11]:
def create_model(config):
    model = TSDiff(
        **getattr(diffusion_configs, config["diffusion_config"]),
        freq=config["freq"],
        use_features=config["use_features"],
        use_lags=config["use_lags"],
        normalization=config["normalization"],
        context_length=config["context_length"],
        prediction_length=config["prediction_length"],
        lr=config["lr"],
        init_skip=config["init_skip"],
    )
    model.to(config["device"])
    return model

In [12]:
# Create model
model = create_model(config)

# Data Loading and Preprocessing

In this section, the dataset is loaded and preprocessed based on the configuration settings. The dataset's metadata is validated to ensure consistency with the expected frequency and prediction length. Depending on the setup (forecasting or missing values), the appropriate data split is performed. Transformations and data loaders are also set up to facilitate the training process.

In [13]:
# Load parameters
dataset_name = config["dataset"]
freq = config["freq"]
context_length = config["context_length"]
prediction_length = config["prediction_length"]
total_length = context_length + prediction_length



# Setup dataset and data loading
dataset = get_gts_dataset(dataset_name)
assert dataset.metadata.freq == freq
assert dataset.metadata.prediction_length == prediction_length

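if config["setup"] == "forecasting":
    training_data = dataset.train
elif config["setup"] == "missing_values":
    # Hold out the last `total_length` steps of every training series
    missing_values_splitter = OffsetSplitter(offset=-total_length)
    training_data, _ = missing_values_splitter.split(dataset.train)
else:
    raise ValueError(f"Unknown setup {config['setup']}")

# Number of test entries per training series
num_rolling_evals = len(dataset.test) // len(dataset.train)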

transformation = create_transforms(
    num_feat_dynamic_real=0,
    num_feat_static_cat=0,
    num_feat_static_real=0,
    time_features=model.time_features,
    prediction_length=config["prediction_length"],
)

training_splitter = create_splitter(
    past_length=config["context_length"] + max(model.lags_seq),
    future_length=config["prediction_length"],
    mode="train",
)
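
The length bookkeeping in this cell can be checked on a plain array. The sketch below mimics, with NumPy slicing, what a negative offset split does in the missing-values setup and how long each training window becomes; the concrete lengths and the maximum lag are made-up values, not those of the Uber TLC configuration.

```python
import numpy as np

# Illustrative values (assumptions)
context_length, prediction_length, max_lag = 336, 24, 168
total_length = context_length + prediction_length

series = np.arange(1000)

# Missing-values setup: the last total_length steps are held out of training
train_part, held_out = series[:-total_length], series[-total_length:]

# Each training window needs the context plus history for the largest lag
past_length = context_length + max_lag
window_length = past_length + prediction_length

print(len(train_part), len(held_out), window_length)
```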

# Model Training

This section sets up the training process for the TSDiff model. Callbacks are configured to evaluate the model periodically (when a validation set is used), save checkpoints, and display progress. The trainer is then defined using PyTorch Lightning, specifying parameters such as the number of epochs, devices, and callbacks. Finally, training is started and the log directory is reported.

In [14]:
# Setup callbacks
callbacks = []
if config["use_validation_set"]:
    transformed_data = transformation.apply(training_data, is_train=True)
    train_val_splitter = OffsetSplitter(
        offset=-config["prediction_length"] * num_rolling_evals
    )
    _, val_gen = train_val_splitter.split(training_data)
    val_data = val_gen.generate_instances(
        config["prediction_length"], num_rolling_evals
    )

    callbacks = [
        EvaluateCallback(
            context_length=config["context_length"],
            prediction_length=config["prediction_length"],
            sampler=config["sampler"],
            sampler_kwargs=config["sampler_params"],
            num_samples=config["num_samples"],
            model=model,
            transformation=transformation,
            test_dataset=dataset.test,
            val_dataset=val_data,
            eval_every=config["eval_every"],
        )
    ]
else:
    transformed_data = transformation.apply(training_data, is_train=True)

log_monitor = "train_loss"
filename = dataset_name + "-{epoch:03d}-{train_loss:.3f}"

data_loader = TrainDataLoader(
    Cached(transformed_data),
    batch_size=config["batch_size"],
    stack_fn=batchify,
    transform=training_splitter,
    num_batches_per_epoch=config["num_batches_per_epoch"],
)

checkpoint_callback = ModelCheckpoint(
    save_top_k=3,
    monitor=f"{log_monitor}",
    mode="min",
    filename=filename,
    save_last=True,
    save_weights_only=True,
)

callbacks.append(checkpoint_callback)
callbacks.append(RichProgressBar())

In [15]:
# Define trainer
trainer = pl.Trainer(
    accelerator="gpu" if torch.cuda.is_available() else None,
    devices=[int(config["device"].split(":")[-1])],
    max_epochs=config["max_epochs"],
    enable_progress_bar=True,
    num_sanity_val_steps=0,
    callbacks=callbacks,
    default_root_dir=log_dir,
    gradient_clip_val=config.get("gradient_clip_val", None),
)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
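
The trainer cell extracts the GPU index with `int(config["device"].split(":")[-1])`, which assumes a device string of the form `cuda:N`; a value such as `cpu` would raise a `ValueError`. One possible way to make this robust is sketched below. The helper `parse_device` is hypothetical and not part of the TSDiff code base.

```python
def parse_device(device):
    """Map a torch-style device string to Lightning Trainer arguments.

    Hypothetical helper, not part of the TSDiff code base.
    """
    if device.startswith("cuda"):
        # "cuda" without an index is treated as device 0
        index = int(device.split(":")[1]) if ":" in device else 0
        return {"accelerator": "gpu", "devices": [index]}
    return {"accelerator": "cpu", "devices": 1}


print(parse_device("cuda:0"))
print(parse_device("cpu"))
```

The returned dictionary could be unpacked into the trainer, for example `pl.Trainer(**parse_device(config["device"]), max_epochs=...)`, in place of the separate `accelerator` and `devices` arguments.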


In [None]:
logger.info(f"Logging to {trainer.logger.log_dir}")
trainer.fit(model, train_dataloaders=data_loader)
logger.info("Training completed.")

2024-06-26 09:40:01,848 - logger - INFO - Logging to ./lightning_logs/version_7
You are using a CUDA device ('NVIDIA A40') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]



# Model Evaluation

After training is completed, the model is evaluated on the test dataset. A function `evaluate_guidance` is defined to assess the model's performance with the guidance sampler selected in the configuration (DDPM or DDIM). This involves generating forecasts, applying transformations, and calculating metrics to evaluate the accuracy of the predictions. The best model checkpoint is loaded, and the evaluation results are saved for further analysis.

In [None]:
# Function to evaluate guidance
def evaluate_guidance(
    config, model, test_dataset, transformation, num_samples=100
):
    logger.info(f"Evaluating with {num_samples} samples.")
    results = []
    if config["setup"] == "forecasting":
        missing_data_kwargs_list = [
            {
                "missing_scenario": "none",
                "missing_values": 0,
            }
        ]
        config["missing_data_configs"] = missing_data_kwargs_list
    elif config["setup"] == "missing_values":
        missing_data_kwargs_list = config["missing_data_configs"]
    else:
        raise ValueError(f"Unknown setup {config['setup']}")

    Guidance = guidance_map[config["sampler"]]
    sampler_kwargs = config["sampler_params"]
    for missing_data_kwargs in missing_data_kwargs_list:
        logger.info(
            f"Evaluating scenario '{missing_data_kwargs['missing_scenario']}' "
            f"with {missing_data_kwargs['missing_values']:.1f} missing_values."
        )
        sampler = Guidance(
            model=model,
            prediction_length=config["prediction_length"],
            num_samples=num_samples,
            **missing_data_kwargs,
            **sampler_kwargs,
        )

        transformed_testdata = transformation.apply(
            test_dataset, is_train=False
        )
        test_splitter = create_splitter(
            past_length=config["context_length"] + max(model.lags_seq),
            future_length=config["prediction_length"],
            mode="test",
        )

        masking_transform = MaskInput(
            FieldName.TARGET,
            FieldName.OBSERVED_VALUES,
            config["context_length"],
            missing_data_kwargs["missing_scenario"],
            missing_data_kwargs["missing_values"],
        )
        test_transform = test_splitter + masking_transform

        predictor = sampler.get_predictor(
            test_transform,
            batch_size=1280 // num_samples,
            device=config["device"],
        )
        forecast_it, ts_it = make_evaluation_predictions(
            dataset=transformed_testdata,
            predictor=predictor,
            num_samples=num_samples,
        )
        forecasts = list(tqdm(forecast_it, total=len(transformed_testdata)))
        tss = list(ts_it)
        evaluator = Evaluator()
        metrics, _ = evaluator(tss, forecasts)
        metrics = filter_metrics(metrics)
        results.append(dict(**missing_data_kwargs, **metrics))

    return results
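
The predictor's batch size is set to `1280 // num_samples`. A plausible reading, stated here as an assumption, is that every series in a batch is sampled `num_samples` times, so the expression caps the number of trajectories processed at once at roughly 1280. The sketch below spells out that arithmetic; the helper name and the lower bound of one are additions for illustration.

```python
def eval_batch_size(num_samples, max_trajectories=1280):
    """Series per batch such that series * samples stays within the budget.

    The max(1, ...) guard is an addition: the plain integer division would
    yield 0 when num_samples exceeds max_trajectories.
    """
    return max(1, max_trajectories // num_samples)


for n in (16, 100, 2000):
    print(n, eval_batch_size(n))
```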

In [None]:
# Evaluate the model and save results
best_ckpt_path = Path(trainer.logger.log_dir) / "best_checkpoint.ckpt"

if not best_ckpt_path.exists():
    torch.save(
        torch.load(checkpoint_callback.best_model_path)["state_dict"],
        best_ckpt_path,
    )
logger.info(f"Loading {best_ckpt_path}.")
best_state_dict = torch.load(best_ckpt_path)
model.load_state_dict(best_state_dict, strict=True)

metrics = (
    evaluate_guidance(config, model, dataset.test, transformation)
    if config.get("do_final_eval", True)
    else "Final eval not performed"
)

with open(Path(trainer.logger.log_dir) / "results.yaml", "w") as fp:
    yaml.dump(
        {
            "config": config,
            "version": trainer.logger.version,
            "metrics": metrics,
        },
        fp,
    )

## References

**Kollovieh, Marcel, et al.** "Predict, refine, synthesize: Self-guiding diffusion models for probabilistic time series forecasting." *Advances in Neural Information Processing Systems* 36 (2024).

**GitHub Repository:** [Amazon Science - Unconditional Time Series Diffusion](https://github.com/amazon-science/unconditional-time-series-diffusion)