# Train on synthetic data, test on real data

In this notebook, we implement and evaluate the training of downstream models on synthetic data generated by TSDiff. Diffusion models have recently demonstrated state-of-the-art performance on generative modeling tasks across many domains. Existing time series diffusion models are mostly conditional models tailored to a specific task such as forecasting or imputation. Here, we instead explore the potential of task-agnostic, unconditional diffusion models for broader time series applications.

TSDiff is an unconditionally-trained diffusion model designed for time series data. A key feature of TSDiff is its self-guidance mechanism, which allows it to conditionally perform various downstream tasks during inference without requiring any changes to the training process or additional auxiliary networks. This capability makes TSDiff highly versatile and effective across different time series tasks.

This notebook includes step-by-step instructions and code for running TSDiff experiments, generating synthetic data, and training downstream models on that data. By the end, you will know how to generate synthetic time series with TSDiff and how to measure their quality by training forecasters on them and testing those forecasters on real data.

Let's get started by setting up the environment and defining the necessary configurations for running our experiments.

# Imports and Setup

In this section, we import the libraries and modules needed to set up the environment. This includes libraries for logging, handling file paths, and loading configurations. We also import the packages used for data loading and model inference, such as PyTorch and GluonTS, along with the custom modules specific to the time series diffusion model (TSDiff).

In [1]:
import sys
sys.path.append('src')

In [2]:
from functools import partial
import math
import logging
import argparse
from pathlib import Path

import yaml
import torch
import numpy as np
from tqdm.auto import tqdm

from gluonts.mx import DeepAREstimator, TransformerEstimator
from gluonts.evaluation import Evaluator
from gluonts.dataset.loader import TrainDataLoader
from gluonts.itertools import Cached
from gluonts.torch.batchify import batchify
from gluonts.time_feature import (
    get_lags_for_frequency,
    time_features_from_frequency_str,
)
from gluonts.dataset.split import slice_data_entry
from gluonts.transform import AdhocTransform, Chain

from uncond_ts_diff.utils import (
    ScaleAndAddMeanFeature,
    ScaleAndAddMinMaxFeature,
    GluonTSNumpyDataset,
    create_transforms,
    create_splitter,
    get_next_file_num,
    add_config_to_argparser,
    make_evaluation_predictions_with_scaling,
    filter_metrics,
)
from uncond_ts_diff.model import TSDiff, LinearEstimator
from uncond_ts_diff.dataset import get_gts_dataset
import uncond_ts_diff.configs as diffusion_configs

Falling back on slow Cauchy kernel. Install at least one of pykeops or the CUDA extension for efficiency.
Falling back on slow Vandermonde kernel. Install pykeops for improved memory efficiency.


# Setup Logger and Configuration

This section sets up the logging configuration, loads the experiment configuration from a YAML file, defines the output directory, and specifies the models to be used in the downstream tasks.

In [5]:
# Setup Logger
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("logger")
logger.setLevel(logging.INFO)

out_dir = "./results"
config_path = "configs/tstr/uber_tlc_hourly.yaml"

with open(config_path, "r") as fp:
    config = yaml.safe_load(fp)

config["ckpt"] = "/projects/aieng/diffusion_bootcamp/models/time-seris/tsdiff/lightning_logs/version_2/best_checkpoint.ckpt"
DOWNSTREAM_MODELS = ["linear", "deepar", "transformer"]

In [6]:
config

{'ckpt': '/projects/aieng/diffusion_bootcamp/models/time-seris/tsdiff/lightning_logs/version_2/best_checkpoint.ckpt',
 'context_length': 336,
 'dataset': 'uber_tlc_hourly',
 'device': 'cuda:0',
 'diffusion_config': 'diffusion_small_config',
 'init_skip': False,
 'prediction_length': 24,
 'scaling_type': 'mean',
 'use_features': False,
 'use_lags': True}
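
Before running the later cells, it can help to confirm that the configuration contains every key they read. The following is a minimal sketch, not part of the TSDiff code base: `REQUIRED_KEYS` and `missing_config_keys` are hypothetical names, the key list is taken from the cells in this notebook, and the checkpoint path is shortened to a placeholder.

```python
# Keys that the cells in this notebook read from `config`.
# "diffusion_config" is optional (load_model falls back to a default) and
# "freq" is filled in later by run_experiment, so neither is listed here.
REQUIRED_KEYS = {
    "ckpt",
    "context_length",
    "dataset",
    "device",
    "init_skip",
    "prediction_length",
    "scaling_type",
    "use_features",
    "use_lags",
}


def missing_config_keys(config: dict) -> list:
    """Return the required keys absent from `config`, sorted for stable output."""
    return sorted(REQUIRED_KEYS - set(config))


# Example config mirroring the values printed above
# (the checkpoint path is a placeholder, not the real location).
example_config = {
    "ckpt": "best_checkpoint.ckpt",
    "context_length": 336,
    "dataset": "uber_tlc_hourly",
    "device": "cuda:0",
    "diffusion_config": "diffusion_small_config",
    "init_skip": False,
    "prediction_length": 24,
    "scaling_type": "mean",
    "use_features": False,
    "use_lags": True,
}

missing = missing_config_keys(example_config)
# Each training window covers the context plus the forecast horizon.
total_length = (
    example_config["context_length"] + example_config["prediction_length"]
)
print(missing, total_length)
```

With the values above, no key is missing and each sample spans 336 + 24 = 360 time steps.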

# Loading the model

The `load_model` function builds the TSDiff model from the provided configuration and loads the weights of a previously trained checkpoint. In more detail:

- The `TSDiff` model is initialized with various parameters extracted from the `config` dictionary.
- `getattr` is used to fetch the appropriate diffusion configuration from `diffusion_configs` based on the value provided in `config["diffusion_config"]`. If not specified, it defaults to `"diffusion_small_config"`.

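The checkpoint at `config["ckpt"]` is loaded onto the CPU with `strict=True`, so any mismatch between the checkpoint and the model's parameters raises an error, and the model is then moved to `config["device"]`. Note that `config["freq"]` is not part of the YAML file; `run_experiment` sets it from the dataset metadata before calling `load_model`.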

In [7]:
def load_model(config):
    model = TSDiff(
        **getattr(
            diffusion_configs,
            config.get("diffusion_config", "diffusion_small_config"),
        ),
        freq=config["freq"],
        use_features=config["use_features"],
        use_lags=config["use_lags"],
        normalization="mean",
        context_length=config["context_length"],
        prediction_length=config["prediction_length"],
        init_skip=config["init_skip"],
    )
    model.load_state_dict(
        torch.load(config["ckpt"], map_location="cpu"),
        strict=True,
    )
    model = model.to(config["device"])
    return model
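
The lookup of the diffusion configuration by name can be tried in isolation. This sketch uses a `SimpleNamespace` as a stand-in for the `uncond_ts_diff.configs` module; `fake_configs`, `resolve_diffusion_kwargs`, and the `example_param` entry are hypothetical and do not reflect the real hyperparameters.

```python
from types import SimpleNamespace

# Stand-in for the `uncond_ts_diff.configs` module. The dictionary content
# is invented for illustration only.
fake_configs = SimpleNamespace(
    diffusion_small_config={"example_param": 1},
)


def resolve_diffusion_kwargs(config: dict, configs_module) -> dict:
    """Fetch the keyword arguments named by config["diffusion_config"]."""
    # Same fallback as in load_model: use the small config if none is given.
    name = config.get("diffusion_config", "diffusion_small_config")
    # getattr raises AttributeError if the module has no such attribute.
    return dict(getattr(configs_module, name))


default_kwargs = resolve_diffusion_kwargs({}, fake_configs)
explicit_kwargs = resolve_diffusion_kwargs(
    {"diffusion_config": "diffusion_small_config"}, fake_configs
)

try:
    resolve_diffusion_kwargs({"diffusion_config": "does_not_exist"}, fake_configs)
    unknown_name_failed = False
except AttributeError:
    unknown_name_failed = True

print(default_kwargs, explicit_kwargs, unknown_name_failed)
```

A misspelled `diffusion_config` value therefore fails at model construction with an `AttributeError` rather than silently falling back to the default.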

# Generate synthetic data

The `sample_synthetic` function generates synthetic data samples using the TSDiff model. Below is a description of this function:

- `model`: An instance of the TSDiff model used for generating synthetic samples.
- `num_samples` (default: 10,000): The total number of synthetic samples to generate.
- `batch_size` (default: 1,000): The number of samples to generate in each batch.

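The function calls `model.sample_n` once per batch, concatenates the batches, and truncates the result to `num_samples`. Generating in batches keeps the memory needed by each sampling call proportional to `batch_size`; when `num_samples` is not a multiple of `batch_size`, the surplus samples from the last batch are discarded.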

In [8]:
def sample_synthetic(
    model: TSDiff,
    num_samples: int = 10_000,
    batch_size: int = 1000,
):
    synth_samples = []

    n_iters = math.ceil(num_samples / batch_size)
    for _ in tqdm(range(n_iters)):
        samples = model.sample_n(num_samples=batch_size)
        synth_samples.append(samples)

    synth_samples = np.concatenate(synth_samples, axis=0)[:num_samples]

    return synth_samples
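
The batching arithmetic used by `sample_synthetic` can be checked without a model. The helper `plan_batches` below is a hypothetical name introduced for illustration; it only reproduces the `math.ceil` computation and the final truncation.

```python
import math


def plan_batches(num_samples: int, batch_size: int):
    """Return (iterations, samples generated, samples discarded)."""
    n_iters = math.ceil(num_samples / batch_size)
    generated = n_iters * batch_size
    # The final slice [:num_samples] drops whatever exceeds the request.
    discarded = generated - num_samples
    return n_iters, generated, discarded


default_plan = plan_batches(10_000, 1000)
uneven_plan = plan_batches(2_500, 1000)
print(default_plan)  # (10, 10000, 0)
print(uneven_plan)   # (3, 3000, 500)
```

With the default arguments, sampling takes 10 iterations and nothing is discarded, which matches the `0/10` progress bar in the run output further down.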

# Sample real data

The `sample_real` function draws real data samples from a given data loader. In this notebook, `run_experiment` calls it on the training data loader and saves the result alongside the synthetic samples. Below is a description of this function:

- `data_loader`: An iterable data loader that provides batches of real data.
- `n_timesteps`: The number of time steps to include in each sample.
- `num_samples` (default: 10,000): The total number of real samples to extract.
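- `batch_size` (default: 1,000): The number of samples expected in each batch. It is only used to compute the number of iterations, so it should match the batch size of `data_loader`.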

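For each batch, the function concatenates `past_target` and `future_target` along the time axis and keeps only the last `n_timesteps` steps. If the data loader is exhausted before enough batches have been collected, the iterator is restarted. The collected batches are concatenated and truncated to `num_samples`.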

In [9]:
def sample_real(
    data_loader,
    n_timesteps: int,
    num_samples: int = 10_000,
    batch_size: int = 1000,
):
    real_samples = []
    data_iter = iter(data_loader)
    n_iters = math.ceil(num_samples / batch_size)
    for _ in tqdm(range(n_iters)):
        try:
            batch = next(data_iter)
        except StopIteration:
            data_iter = iter(data_loader)
            batch = next(data_iter)
        ts = np.concatenate(
            [batch["past_target"], batch["future_target"]], axis=-1
        )[:, -n_timesteps:]
        real_samples.append(ts)

    real_samples = np.concatenate(real_samples, axis=0)[:num_samples]

    return real_samples
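
The restart-on-exhaustion pattern and the tail slice can be illustrated with plain Python lists. This is a toy sketch: `take_batches` and `toy_loader` are hypothetical, and lists stand in for the arrays that the real data loader yields.

```python
def take_batches(loader, n_batches):
    """Collect n_batches items, restarting the loader when it runs out."""
    batches = []
    data_iter = iter(loader)
    for _ in range(n_batches):
        try:
            batch = next(data_iter)
        except StopIteration:
            # Loader exhausted: start over from the beginning.
            data_iter = iter(loader)
            batch = next(data_iter)
        batches.append(batch)
    return batches


# Two toy "batches", each holding a single series for readability.
toy_loader = [
    {"past_target": [1, 2, 3], "future_target": [4, 5]},
    {"past_target": [6, 7, 8], "future_target": [9, 10]},
]

n_timesteps = 4
windows = [
    # Join past and future, then keep only the last n_timesteps values.
    (batch["past_target"] + batch["future_target"])[-n_timesteps:]
    for batch in take_batches(toy_loader, 3)
]
print(windows)  # [[2, 3, 4, 5], [7, 8, 9, 10], [2, 3, 4, 5]]
```

The third window repeats the first because the loader was restarted. The tail slice also explains why `run_experiment` can request a longer `past_length` (context plus the largest lag) from the splitter: the extra history is dropped here.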

# Evaluation

The `evaluate` function evaluates the performance of a time series prediction model using a test dataset. Below is a description of this function:

#### Parameters:
- `tstr_predictor`: The model used for making time series predictions.
- `test_dataset`: The dataset on which the model is evaluated.
- `context_length`: The length of the context window used for making predictions.
- `prediction_length`: The length of the prediction window.
- `num_samples` (default: 100): The number of samples to generate for each prediction.
- `scaling_type` (default: "mean"): The type of scaling applied to the data. Options are "mean" or "min-max".

The function slices each test series to its last `context_length + prediction_length` steps, attaches the scale to each entry, draws `num_samples` forecast samples per series with the predictor, and computes metrics with the GluonTS `Evaluator`. The returned dictionary contains the metrics kept by `filter_metrics`.

In [10]:
def evaluate(
    tstr_predictor,
    test_dataset,
    context_length,
    prediction_length,
    num_samples=100,
    scaling_type="mean",
):
    total_length = context_length + prediction_length
    # Slice test set to be of the same length as context_length + prediction_length
    slice_func = partial(slice_data_entry, slice_=slice(-total_length, None))
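    if scaling_type == "mean":
        ScaleAndAddScaleFeature = ScaleAndAddMeanFeature
    elif scaling_type == "min-max":
        ScaleAndAddScaleFeature = ScaleAndAddMinMaxFeature
    else:
        # Fail early instead of hitting a NameError further down
        raise ValueError(f"Unknown scaling_type: {scaling_type!r}")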
    transformation = Chain(
        [
            AdhocTransform(slice_func),
            # Add scale to data entry for use later during evaluation
            ScaleAndAddScaleFeature("target", "scale", prediction_length),
        ]
    )
    sliced_test_set = transformation.apply(test_dataset)

    fcst_iter, ts_iter = make_evaluation_predictions_with_scaling(
        dataset=sliced_test_set,
        predictor=tstr_predictor,
        num_samples=num_samples,
        scaling_type=scaling_type,
    )
    evaluator = Evaluator()
    metrics, _ = evaluator(list(ts_iter), list(fcst_iter))
    return filter_metrics(metrics)
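
The slicing step can be reproduced on a plain list to see which part of a test series the predictor receives. In this sketch, `slice_last` is a hypothetical stand-in for `slice_data_entry`, and the scale computation is an assumption: it takes the mean absolute value of the context window, which is a common convention for mean scaling, but the actual `ScaleAndAddMeanFeature` may compute it differently.

```python
from functools import partial


def slice_last(values, slice_):
    """Toy stand-in for slice_data_entry: apply a slice to a plain list."""
    return values[slice_]


context_length, prediction_length = 336, 24
total_length = context_length + prediction_length

# Same construction as in evaluate(): keep only the last total_length steps.
keep_tail = partial(slice_last, slice_=slice(-total_length, None))

series = list(range(1000))  # toy test series with 1,000 time steps
window = keep_tail(series)
context = window[:context_length]
target = window[context_length:]

# Assumed convention: scale = mean absolute value of the context window.
scale = sum(abs(v) for v in context) / len(context)

print(len(window), window[0], len(target), target[0], scale)
```

For this toy series the window covers steps 640 to 999, the last 24 steps form the forecast target, and the assumed scale is 807.5.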

# Train and evaluate

The `train_and_evaluate` function trains a model on synthetic data and evaluates its performance on a real dataset. Below is a description of this function:

#### Parameters:
- `dataset`: The dataset object containing metadata and test data for evaluation.
- `model_name`: The name of the model to be trained and evaluated. Options are "linear", "deepar", or "transformer".
- `synth_samples`: The synthetic data samples used for training the model.
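- `real_samples`: The real data samples. In this function they are only used to check shapes and to set the number of training samples; the evaluation itself runs on `dataset.test`.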
- `config`: A dictionary containing the configuration settings, including context length and prediction length.
- `scaling_type` (default: "mean"): The type of scaling applied to the data. Options are "mean" or "min-max".

This function wraps the synthetic samples in a GluonTS-compatible dataset, trains the selected forecaster on it, and evaluates the resulting predictor on the real test set. The returned metrics indicate how well a model trained only on synthetic sequences forecasts real data.

In [11]:
def train_and_evaluate(
    dataset,
    model_name,
    synth_samples,
    real_samples,
    config,
    scaling_type="mean",
):
    # NOTE: There's no notion of time for synthetic time series,
    # they are just "sequences".
    # A dummy timestamp is used for start time in synthetic time series.
    # Hence, time_features are set to [] in the models below.
    model_name = model_name.lower()
    freq = dataset.metadata.freq
    context_length = config["context_length"]
    prediction_length = config["prediction_length"]
    total_length = context_length + prediction_length

    assert len(synth_samples) == len(real_samples)
    assert (
        synth_samples.shape[-1] == total_length
        and real_samples.shape[-1] == total_length
    )
    num_samples = len(real_samples)

    synthetic_dataset = GluonTSNumpyDataset(synth_samples)

    if model_name == "linear":
        logger.info(f"Running TSTR for {model_name}")
        tstr_predictor = LinearEstimator(
            freq=freq,  # Not actually used in the estimator
            prediction_length=prediction_length,
            context_length=context_length,
            num_train_samples=num_samples,
            # Synthetic dataset is in the "scaled space"
            scaling=False,
        ).train(synthetic_dataset)
    elif model_name == "deepar":
        logger.info(f"Running TSTR for {model_name}")
        tstr_predictor = DeepAREstimator(
            freq=freq,
            prediction_length=prediction_length,
            # Synthetic dataset is in the "scaled space"
            scaling=False,
            time_features=[],
            lags_seq=get_lags_for_frequency(freq, lag_ub=context_length),
        ).train(synthetic_dataset)
    elif model_name == "transformer":
        logger.info(f"Running TSTR for {model_name}")
        tstr_predictor = TransformerEstimator(
            freq=freq,
            prediction_length=prediction_length,
            # Synthetic dataset is in the "scaled space"
            scaling=False,
            time_features=[],
            lags_seq=get_lags_for_frequency(freq, lag_ub=context_length),
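        ).train(synthetic_dataset)
    else:
        # Fail early instead of hitting a NameError at evaluation time
        raise ValueError(f"Unknown model_name: {model_name!r}")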

    tstr_metrics = evaluate(
        tstr_predictor=tstr_predictor,
        test_dataset=dataset.test,
        context_length=context_length,
        prediction_length=prediction_length,
        scaling_type=scaling_type,
    )

    return dict(
        tstr_metrics=tstr_metrics,
    )
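
The note about dummy timestamps can be made concrete with a toy dataset wrapper. GluonTS data entries are dictionaries with at least a `start` and a `target` field. The class below is a hypothetical illustration of how raw sequences could be exposed in that form, not the actual `GluonTSNumpyDataset` implementation; the start date is an arbitrary placeholder.

```python
from datetime import datetime

# Arbitrary placeholder: synthetic sequences carry no real calendar time.
DUMMY_START = datetime(2000, 1, 1)


class ToySequenceDataset:
    """Expose raw sequences as dict entries with a dummy start time."""

    def __init__(self, sequences):
        self.sequences = sequences

    def __len__(self):
        return len(self.sequences)

    def __iter__(self):
        for seq in self.sequences:
            yield {"start": DUMMY_START, "target": seq}


total_length = 336 + 24  # context_length + prediction_length
synth = [[0.0] * total_length for _ in range(3)]
real = [[1.0] * total_length for _ in range(3)]

# The same sanity checks that train_and_evaluate performs on its inputs.
assert len(synth) == len(real)
assert all(len(seq) == total_length for seq in synth + real)

entries = list(ToySequenceDataset(synth))
print(len(entries), sorted(entries[0]), len(entries[0]["target"]))
```

Because every entry shares the same placeholder start time, calendar-based time features would carry no information, which is why `time_features=[]` is passed to the DeepAR and Transformer estimators.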

# Run the experiment

The `run_experiment` function orchestrates the process of training and evaluating time series models using synthetic and real data. Below is a detailed description of the steps and parameters involved in this function:

#### Parameters:
- `config` (dict): A dictionary containing the configuration settings, including dataset name, context length, and prediction length.
- `log_dir` (str): The directory where logs and results will be saved.
- `samples_path` (str or `None`): The path to pre-generated synthetic samples. If `None`, synthetic samples are generated within the function.

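The function proceeds in five steps: it creates a numbered run directory under `log_dir`, loads the dataset and the TSDiff model, draws 10,000 real samples from the training data, generates (or loads) 10,000 synthetic samples, and then trains and evaluates each model in `DOWNSTREAM_MODELS`. The configuration and the metrics are written to `results.yaml` in the run directory.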

In [29]:
def run_experiment(config: dict, log_dir: str, samples_path: str):
    # Read global parameters
    dataset_name = config["dataset"]
    context_length = config["context_length"]
    prediction_length = config["prediction_length"]

    # Create log_dir
    log_dir: Path = Path(log_dir)
    base_dirname = "tstr_log"
    run_num = get_next_file_num(
        base_dirname, log_dir, file_type="", separator="-"
    )
    log_dir = log_dir / f"{base_dirname}-{run_num}"
    log_dir.mkdir(exist_ok=True, parents=True)
    logger.info(f"Logging to {log_dir}")

    # Load dataset and model
    logger.info("Loading model")
    dataset = get_gts_dataset(dataset_name)
    config["freq"] = dataset.metadata.freq
    assert prediction_length == dataset.metadata.prediction_length

    model = load_model(config)

    # Setup data transformation and loading
    transformation = create_transforms(
        num_feat_dynamic_real=0,
        num_feat_static_cat=0,
        num_feat_static_real=0,
        time_features=time_features_from_frequency_str(config["freq"]),
        prediction_length=prediction_length,
    )
    transformed_data = transformation.apply(list(dataset.train), is_train=True)
    training_splitter = create_splitter(
        past_length=context_length + max(model.lags_seq),
        future_length=prediction_length,
        mode="train",
    )
    train_dataloader = TrainDataLoader(
        Cached(transformed_data),
        batch_size=1000,
        stack_fn=batchify,
        transform=training_splitter,
    )

    # Generate real samples
    logger.info("Generating real samples")
    real_samples = sample_real(
        train_dataloader,
        n_timesteps=context_length + prediction_length,
        num_samples=10000,
    )
    np.save(log_dir / "real_samples.npy", real_samples)

    if samples_path is None:
        # Generate synthetic samples
        logger.info("Generating synthetic samples")
        synth_samples = sample_synthetic(model, num_samples=10000)
        np.save(log_dir / "synth_samples.npy", synth_samples)
    else:
        logger.info(f"Using synthetic samples from {samples_path}")
        synth_samples = np.load(samples_path)[:10000]
        synth_samples = synth_samples.reshape(
            (10000, context_length + prediction_length)
        )

    # Run TSTR experiment for each downstream model
    results = []

    for model_name in DOWNSTREAM_MODELS:
        logger.info(f"Training and evaluating {model_name}")
        metrics = train_and_evaluate(
            dataset=dataset,
            model_name=model_name,
            synth_samples=synth_samples,
            real_samples=real_samples,
            config=config,
            scaling_type=config["scaling_type"],
        )
        results.append({"model": model_name, **metrics})

    logger.info("Saving results")
    with open(log_dir / "results.yaml", "w") as fp:
        yaml.safe_dump(
            {"config": config, "metrics": results},
            fp,
            default_flow_style=False,
            sort_keys=False,
        )
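
Each call to `run_experiment` writes to a fresh directory such as `results/tstr_log-1`. The sketch below shows one way such numbering could work. It is an assumption about the behaviour of `get_next_file_num`, not its actual implementation: `next_run_num` is a hypothetical helper, and starting the count at 0 is a guess.

```python
import re
import tempfile
from pathlib import Path


def next_run_num(base_dirname: str, log_dir: Path, separator: str = "-") -> int:
    """Hypothetical helper: one past the highest existing run index."""
    pattern = re.compile(
        rf"^{re.escape(base_dirname)}{re.escape(separator)}(\d+)$"
    )
    used = []
    if log_dir.exists():
        for path in log_dir.iterdir():
            match = pattern.match(path.name)
            if match:
                used.append(int(match.group(1)))
    # Assumption: numbering starts at 0 when no earlier run exists.
    return max(used) + 1 if used else 0


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "results"
    first = next_run_num("tstr_log", root)  # directory does not exist yet
    for _ in range(2):
        run_dir = root / f"tstr_log-{next_run_num('tstr_log', root)}"
        run_dir.mkdir(parents=True, exist_ok=True)
    created = sorted(p.name for p in root.iterdir())
    following = next_run_num("tstr_log", root)

print(first, created, following)
```

Numbering the run directories this way means repeated experiments never overwrite earlier `results.yaml` files or saved samples.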

In [36]:
run_experiment(config=config, log_dir=out_dir, samples_path=None)

2024-07-23 16:49:23,171 - logger - INFO - Logging to results/tstr_log-1
2024-07-23 16:49:23,172 - logger - INFO - Loading model
2024-07-23 16:49:23,432 - logger - INFO - Generating real samples


  0%|          | 0/10 [00:00<?, ?it/s]

2024-07-23 16:49:25,731 - logger - INFO - Generating synthetic samples


  0%|          | 0/10 [00:00<?, ?it/s]

2024-07-23 16:51:11,490 - logger - INFO - Training and evaluating linear
2024-07-23 16:51:11,491 - logger - INFO - Running TSTR for linear
Running evaluation: 262it [00:04, 55.87it/s]
  return np.mean(np.abs(target - forecast)) / seasonal_error
  return numerator / seasonal_error
2024-07-23 16:51:24,687 - logger - INFO - Training and evaluating deepar
2024-07-23 16:51:24,688 - logger - INFO - Running TSTR for deepar
100%|██████████| 50/50 [00:06<00:00,  8.14it/s, epoch=1/100, avg_epoch_loss=1.09]

**Kollovieh, Marcel, et al.** "Predict, refine, synthesize: Self-guiding diffusion models for probabilistic time series forecasting." *Advances in Neural Information Processing Systems* 36 (2024).

**GitHub Repository:** [Amazon Science - Unconditional Time Series Diffusion](https://github.com/amazon-science/unconditional-time-series-diffusion)