# ChronoTick 2: Zero-Shot TSFM Benchmark

Automated benchmark of time series foundation models on clock drift prediction.
Runs all models across 4 machines, with and without covariates, at multiple
context lengths and horizons. Just click **Run All**.

## Models
| Model | Origin | Params | Covariates | Probabilistic |
|-------|--------|--------|------------|---------------|
| Chronos-2 Small | Amazon | 28M | Yes | Yes |
| Chronos-2 Base | Amazon | 120M | Yes | Yes |
| TimesFM 2.5 | Google | 200M | Yes | Yes |
| Granite TTM | IBM | 1-5M | No | No (point only) |
| Toto | Datadog | 151M | Yes | Yes (samples) |
| Moirai 1.1 Small | Salesforce | 14M | Yes | Yes (samples) |

## Dependency Groups
Models are ordered by dependency compatibility. Results are saved per-model,
so later installs that override earlier packages don't affect completed runs.

- **Group A** (compatible): Chronos-2, Granite TTM, TimesFM 2.5
- **Group B** (exact-pinned deps): Toto
- **Group C** (needs old torch): Moirai

In [None]:
# === Environment Setup ===
# Detects whether the notebook runs on Colab or locally. On Colab it clones
# (or hard-resets) the repo, installs the tick2 package, and resolves the
# data/results directories. Locally it only picks a results directory.
import os
import subprocess
import sys

IN_COLAB = "COLAB_GPU" in os.environ or os.path.exists("/content")

if IN_COLAB:
    REPO_DIR = "/content/sensor-collector"
    REPO_URL = "https://github.com/JaimeCernuda/sensor-collector.git"

    # Read GitHub token from Colab secrets (set via sidebar key icon).
    # Required for git push; without it the notebook runs but cannot push.
    GITHUB_TOKEN = None
    try:
        from google.colab import userdata
        GITHUB_TOKEN = userdata.get("GITHUB_TOKEN")
    except Exception:
        # Deliberately broad: any failure to read the secret only disables push.
        print("WARNING: GITHUB_TOKEN secret not available. Git push will be skipped.")
        print("  To enable: run from Colab UI with Secrets > GITHUB_TOKEN set.")

    # Build authenticated URL if token available.
    # NOTE(review): the token becomes part of the remote URL, so it is stored
    # in the clone's git config and would appear in the CalledProcessError
    # text if one of the check=True git commands below fails -- confirm this
    # is acceptable for the token's scope.
    if GITHUB_TOKEN:
        auth_url = f"https://{GITHUB_TOKEN}@github.com/JaimeCernuda/sensor-collector.git"
    else:
        auth_url = REPO_URL

    # Clone or pull latest tick2 code from GitHub
    if os.path.exists(REPO_DIR):
        # Update remote URL in case token was added after initial clone
        subprocess.run(["git", "-C", REPO_DIR, "remote", "set-url", "origin", auth_url], check=True)
        # Reset to remote HEAD to avoid divergence from previous Colab commits.
        # Colab outputs are regenerated each run, so local commits are disposable.
        subprocess.run(["git", "-C", REPO_DIR, "fetch", "-q", "origin"], check=True)
        subprocess.run(["git", "-C", REPO_DIR, "reset", "--hard", "origin/main"], check=True)
    else:
        subprocess.run(["git", "clone", "-q", auth_url, REPO_DIR], check=True)

    # Configure git identity (Colab has no global config)
    subprocess.run(["git", "-C", REPO_DIR, "config", "user.name", "Colab Runner"], check=True)
    subprocess.run(["git", "-C", REPO_DIR, "config", "user.email", "colab@chronotick.dev"], check=True)

    # Install tick2 package in editable mode.
    # Invoke pip through the kernel's own interpreter so the package lands in
    # the environment this notebook imports from, not whichever `pip`
    # executable happens to be first on PATH.
    subprocess.run(
        [sys.executable, "-m", "pip", "install", "-q", "-e", f"{REPO_DIR}/tick2/"],
        check=True,
    )

    # Ensure tick2 is importable (pip install via subprocess doesn't always
    # update sys.path in the running kernel)
    tick2_src = f"{REPO_DIR}/tick2/src"
    if tick2_src not in sys.path:
        sys.path.insert(0, tick2_src)

    # Data: repo now includes the CSVs; fall back to Google Drive when the
    # expected snapshot directory is missing from the checkout.
    DATA_DIR = f"{REPO_DIR}/sensors/data"
    if not os.path.isdir(f"{DATA_DIR}/24h_snapshot"):
        from google.colab import drive
        drive.mount("/content/drive")
        DATA_DIR = "/content/drive/MyDrive/chronotick2/data"

    # Output directory inside the repo (will be git-pushed)
    RESULTS_DIR = f"{REPO_DIR}/tick2/notebooks/output/02"
else:
    GITHUB_TOKEN = None
    # None is reported as "(default)" below; later cells pass it through as-is.
    DATA_DIR = None
    # __file__ is undefined in an interactive kernel, hence the "." fallback.
    RESULTS_DIR = os.path.join(os.path.dirname(__file__) if "__file__" in dir() else ".", "output", "02")

print(f"Environment: {'Colab' if IN_COLAB else 'Local'}")
print(f"Data dir:    {DATA_DIR or '(default)'}")
print(f"Results dir: {RESULTS_DIR}")

In [None]:
# === Model Definitions & Helpers ===
import importlib
from pathlib import Path

import pandas as pd

# Models ordered by dependency compatibility group.
# Group A models share compatible deps and run first.
# Before Group B/C, conflicting packages are uninstalled so pip can resolve
# the new model's pinned versions cleanly.
#
# Dep conflicts (why we need the cleanup cycle):
#   granite-tsfm pins torch<2.9           vs  Colab has torch==2.9
#   chronos needs scikit-learn>=1.6       vs  toto-ts pins scikit-learn==1.5
#   toto-ts pins torch==2.7              vs  uni2ts needs torch<2.5
#
# Entry schema (consumed by install_model_deps and the benchmark loop):
#   name        -- model identifier, also used for the per-model CSV filename
#   cleanup     -- shell commands run before installing (uninstall conflicts)
#   install     -- shell commands run in order to install the model's deps
#   verify      -- top-level module that must be importable afterwards
#   verify_deep -- optional "module.path:ClassName" that must also resolve
#   group       -- dependency group label, printed in the run banner
MODEL_CONFIGS = [
    # --- Group A: compatible (chronos + granite + timesfm) ---
    {
        "name": "chronos2-small",
        "cleanup": [],
        "install": ['pip install -q "chronos-forecasting[extras]>=2.2"'],
        "verify": "chronos",
        "group": "A",
    },
    {
        # Same package as chronos2-small. The install command is repeated so
        # this model can still be set up when chronos2-small was served from
        # cached results and therefore never installed the package in this
        # runtime. install_model_deps returns early when `chronos` is already
        # importable, so the repeat costs nothing otherwise.
        "name": "chronos2-base",
        "cleanup": [],
        "install": ['pip install -q "chronos-forecasting[extras]>=2.2"'],
        "verify": "chronos",
        "group": "A",
    },
    {
        # granite-tsfm pins torch<2.9 which conflicts with Colab's torch 2.9.
        # Install with --no-deps to keep the CUDA torch, then add real deps.
        "name": "granite-ttm",
        "cleanup": [],
        "install": [
            'pip install -q "granite-tsfm>=0.3.3" --no-deps',
            'pip install -q "transformers>=4.56,<5" datasets deprecated',
        ],
        "verify": "tsfm_public",
        "verify_deep": "tsfm_public.models.tinytimemixer:TinyTimeMixerForPrediction",
        "group": "A",
    },
    {
        # TimesFM 2.5 is only on GitHub (PyPI has old 1.x API).
        # The repo is missing __init__.py in timesfm_2p5/; we add it.
        # Non-editable install to avoid namespace package issues.
        "name": "timesfm-2.5",
        "cleanup": [],
        "install": [
            "git clone -q --depth 1 https://github.com/google-research/timesfm /content/timesfm 2>/dev/null || true",
            "touch /content/timesfm/src/timesfm/timesfm_2p5/__init__.py",
            'pip install -q "/content/timesfm[torch]"',
        ],
        "verify": "timesfm",
        "verify_deep": "timesfm.timesfm_2p5.timesfm_2p5_torch:TimesFM_2p5_200M_torch",
        "group": "A",
    },
    # --- Group B: toto (exact-pinned deps) ---
    # Uninstall granite + chronos first so pip can resolve toto's pins.
    # toto-ts uses pkg_resources which was removed in setuptools>=82.
    {
        "name": "toto",
        "cleanup": [
            "pip uninstall -y -q chronos-forecasting granite-tsfm 2>/dev/null; true",
        ],
        "install": [
            'pip install -q "setuptools<81"',
            "pip install -q toto-ts",
        ],
        "verify": "toto",
        "group": "B",
    },
    # --- Group C: moirai (uni2ts pins torch<2.5) ---
    # Install with --no-deps to avoid torch downgrade, then add missing deps.
    {
        "name": "moirai-1.1-small",
        "cleanup": [
            "pip uninstall -y -q toto-ts 2>/dev/null; true",
        ],
        "install": [
            "pip install -q uni2ts --no-deps",
            'pip install -q "einops>=0.7" "gluonts>=0.14" jaxtyping hydra-core python-dotenv lightning safetensors huggingface_hub',
        ],
        "verify": "uni2ts",
        "verify_deep": "uni2ts.model.moirai:MoiraiModule",
        "group": "C",
    },
]


def _can_import(module_name: str) -> bool:
    """Check if a Python module is importable."""
    try:
        importlib.import_module(module_name)
        return True
    except (ImportError, ModuleNotFoundError):
        return False


def _deep_verify(spec: str) -> tuple[bool, str]:
    """Try to import module:class from a 'module.path:ClassName' spec.

    Returns (success, error_message).
    """
    mod_path, _, cls_name = spec.partition(":")
    try:
        mod = importlib.import_module(mod_path)
        if cls_name and not hasattr(mod, cls_name):
            return False, f"{mod_path} imported but {cls_name} not found"
        return True, ""
    except Exception as e:
        return False, f"{mod_path}: {e}"


def install_model_deps(cfg: dict) -> bool:
    """Install dependencies for a model, cleaning up conflicts first.

    Flow: check cache -> cleanup conflicting packages -> install -> verify.

    Args:
        cfg: One MODEL_CONFIGS entry. ``verify`` is required; ``cleanup``,
            ``install`` and ``verify_deep`` are optional.

    Returns:
        True if the model's ``verify`` module (and ``verify_deep`` target, if
        given) is importable when the function finishes, False otherwise.
        Command failures and timeouts are reported via print and turned into
        a False return rather than raised.
    """
    verify = cfg["verify"]
    deep = cfg.get("verify_deep")
    cleanup_cmds = cfg.get("cleanup") or []
    install_cmds = cfg.get("install") or []

    # Fast path: nothing to clean up and the package is already usable.
    deep_failed = False
    if not cleanup_cmds and _can_import(verify):
        if not deep:
            print(f"  [{verify}] already available")
            return True
        ok, err = _deep_verify(deep)
        if ok:
            print(f"  [{verify}] already available")
            return True
        print(f"  [{verify}] top-level OK but deep verify failed: {err}")
        # Remember the failure so the steps below really reinstall instead of
        # accepting the top-level import as proof of a working install.
        deep_failed = True

    if not IN_COLAB:
        # Local runs are not repaired automatically when the package is
        # missing or broken; the user is expected to fix the environment.
        if deep_failed:
            print(f"  [SKIP] {verify} failed deep verify (reinstall manually for local runs)")
            return False
        if not _can_import(verify):
            print(f"  [SKIP] {verify} not installed (install manually for local runs)")
            return False

    # If no install commands, just verify (e.g., a model sharing another
    # model's package). A failed deep verify cannot be repaired here.
    if not install_cmds and not cleanup_cmds:
        return not deep_failed and _can_import(verify)

    # Cleanup: uninstall packages whose version pins conflict with this model.
    # Best effort -- a failing or hanging uninstall must not abort the run.
    for cmd in cleanup_cmds:
        print(f"  $ {cmd}")
        try:
            subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=120)
        except subprocess.TimeoutExpired:
            print(f"  [WARN] cleanup timed out after 120s: {cmd}")

    # Install
    for cmd in install_cmds:
        print(f"  $ {cmd}")
        try:
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=600)
        except subprocess.TimeoutExpired:
            print(f"  [FAIL] timed out after 600s: {cmd}")
            return False
        if result.returncode != 0:
            # Show only the last few stderr lines to keep the output readable.
            tail = result.stderr.strip().split("\n")[-5:]
            print("  [FAIL]", "\n        ".join(tail))
            return False

    # Make freshly installed packages visible to the import system.
    importlib.invalidate_caches()

    # Verify: top-level import
    if not _can_import(verify):
        print(f"  [FAIL] {verify} not importable after install")
        return False

    # Deep verify: check specific class/submodule
    if deep:
        ok, err = _deep_verify(deep)
        if not ok:
            print(f"  [FAIL] {err}")
            return False

    print(f"  [{verify}] ready")
    return True


def _model_csv_name(model_name: str, device: str) -> str:
    """Build device-tagged CSV filename: e.g. 'chronos2-small_cuda.csv'."""
    return f"{model_name}_{device}.csv"


def load_model_results(out_dir: Path, model_name: str, device: str) -> pd.DataFrame | None:
    """Return cached results for ``model_name`` on ``device``, or None.

    The device-tagged CSV is preferred. If it does not exist, a legacy
    untagged ``<model_name>.csv`` is accepted only when it has a ``device``
    column whose values all equal ``device``.
    """
    tagged_path = out_dir / _model_csv_name(model_name, device)
    if tagged_path.exists():
        return pd.read_csv(tagged_path)

    # Legacy untagged file written by earlier runs.
    legacy_path = out_dir / f"{model_name}.csv"
    if not legacy_path.exists():
        return None

    legacy_df = pd.read_csv(legacy_path)
    device_matches = "device" in legacy_df.columns and (legacy_df["device"] == device).all()
    return legacy_df if device_matches else None


def save_model_results(out_dir: Path, model_name: str, device: str, df: pd.DataFrame) -> None:
    """Write ``df`` to the device-tagged per-model CSV inside ``out_dir``.

    ``out_dir`` is created (including parents) when missing. The index is not
    written, and the destination path is printed afterwards.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    target = out_dir / _model_csv_name(model_name, device)
    df.to_csv(target, index=False)
    print(f"  Saved: {target}")

In [None]:
# === Imports, Config & Data ===
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch

from tick2.benchmark.runner import BenchmarkConfig, run_benchmark, results_to_dataframe
from tick2.benchmark.reporting import format_summary, results_to_latex, save_results
from tick2.data.preprocessing import TARGET_COL, get_feature_cols, load_all
from tick2.models.registry import get_model, list_models
from tick2.utils.gpu import clear_gpu_memory

sns.set_theme(style="whitegrid", font_scale=1.1)

# --- Device selection ---
# DEVICE_OVERRIDE forces a specific device when set:
#   "cpu"  — run all models on CPU (for latency comparison)
#   "cuda" — force GPU
#   None   — auto-detect (GPU if available, else CPU)
DEVICE_OVERRIDE = None

device = DEVICE_OVERRIDE or ("cuda" if torch.cuda.is_available() else "cpu")

if device != "cuda":
    print("Running on CPU")
else:
    props = torch.cuda.get_device_properties(0)
    # Read total_memory, falling back to total_mem, then to 0.
    vram = getattr(props, "total_memory", getattr(props, "total_mem", 0))
    print(f"GPU:  {torch.cuda.get_device_name(0)}")
    print(f"VRAM: {vram / 1024**3:.1f} GB")
print(f"Device: {device}")

# --- Benchmark config ---
# BenchmarkConfig() is used with its defaults, documented here as:
#   context_lengths: [512, 1024, 1800, 3600]
#   horizons: [60, 120, 300]
#   n_samples: 25 per combo (averaged for robust metrics)
config = BenchmarkConfig()

print(f"Context lengths: {config.context_lengths}")
print(f"Horizons:        {config.horizons}")
print(f"Samples/combo:   {config.n_samples}")
# One combo per (context length, horizon, covariate mode) triple.
n_combos = (
    len(config.context_lengths)
    * len(config.horizons)
    * len(config.use_covariates)
)
print(f"Configs/machine: {n_combos} ({n_combos} x 4 machines = {n_combos * 4} total/model)")

# --- Load sensor data ---
# A None data_dir is passed through unchanged to load_all.
data_dir = Path(DATA_DIR) if DATA_DIR else None
datasets = load_all(data_dir=data_dir, snapshot="24h_snapshot")
for name, (df, cats) in datasets.items():
    n_rows = len(df)
    n_features = len(get_feature_cols(df))
    print(f"  {name:16s}: {n_rows:6d} rows, {n_features:3d} features")

# --- Results directory ---
results_dir = Path(RESULTS_DIR)
results_dir.mkdir(parents=True, exist_ok=True)
print(f"\nResults: {results_dir}")
print(f"Models:  {list_models()}")

## Run All Benchmarks

For each model: install deps, load, benchmark across all machines/configs,
save per-model CSV, unload. Cached results are loaded on resume.

**Config:** 4 context lengths x 3 horizons x 2 covariate modes x 4 machines = 96 combos/model,
each averaged over 25 random windows. On Colab T4, expect ~60-90 min per model.

In [None]:
# FORCE_RERUN re-benchmarks models even when a cached CSV exists, which is
# useful after fixing model wrappers. Use True for all models, or a list of
# model names to re-run only those.
import traceback

FORCE_RERUN = False  # e.g., ["granite-ttm", "timesfm-2.5"] or True

completed = {}  # model_name -> DataFrame
skipped = []

for cfg in MODEL_CONFIGS:
    model_name = cfg["name"]
    print(f"\n{'=' * 60}")
    print(f" {model_name}  [Group {cfg['group']}]  device={device}")
    print(f"{'=' * 60}")

    # Decide whether cached results may be reused for this model.
    if FORCE_RERUN is True:
        force = True
    else:
        force = isinstance(FORCE_RERUN, list) and model_name in FORCE_RERUN

    # Resume: reuse results from a previous run unless a re-run is forced.
    cached = None if force else load_model_results(results_dir, model_name, device)
    if cached is not None:
        print(f"  [CACHED] {len(cached)} rows from previous run ({device})")
        completed[model_name] = cached
        continue

    # Install dependencies; a model whose deps cannot be set up is skipped.
    deps_ready = install_model_deps(cfg)
    if not deps_ready:
        skipped.append(model_name)
        continue

    # Benchmark. `model` is pre-bound so the finally clause can always
    # delete the name, even when get_model itself raises.
    model = None
    try:
        clear_gpu_memory()
        model = get_model(model_name)
        model.load(device=device)
        print(f"  Loaded on {device}, ~{model.memory_footprint_mb():.0f} MB")

        run_results = run_benchmark(model, datasets, config, progress=True)
        model_df = results_to_dataframe(run_results)

        # Persist before reporting so results survive a later failure.
        save_model_results(results_dir, model_name, device, model_df)
        completed[model_name] = model_df
        print(f"  Done: {len(run_results)} configs, mean MAE = {model_df['mae'].mean():.4f}")
    except Exception as e:
        # One failing model must not stop the remaining models.
        print(f"  [FAIL] {e}")
        traceback.print_exc()
        skipped.append(model_name)
    finally:
        # Drop the reference before clearing GPU memory for the next model.
        del model
        clear_gpu_memory()

# --- Summary ---
print(f"\n{'=' * 60}")
print(f"  Device:    {device}")
print(f"  Completed: {list(completed.keys())}")
if skipped:
    print(f"  Skipped:   {skipped}")
print(f"{'=' * 60}")

## Results

In [None]:
# Merge results from current run with any other device runs already on disk.
# This lets us compare GPU vs CPU side-by-side in the same notebook.
tagged_csvs = sorted(results_dir.glob("*_cuda.csv")) + sorted(results_dir.glob("*_cpu.csv"))
tagged_names = {p.name for p in tagged_csvs}

# Also pick up legacy untagged files (any other CSV in the directory).
legacy_csvs = [p for p in sorted(results_dir.glob("*.csv")) if p.name not in tagged_names]

# Legacy files are read FIRST: drop_duplicates(keep="last") below then lets
# the device-tagged files win whenever both describe the same configuration,
# so stale legacy rows can no longer override fresh results.
all_csvs = legacy_csvs + tagged_csvs

# Skip the merged "zero_shot*" exports so they are not fed back in.
all_dfs = [
    pd.read_csv(csv_path)
    for csv_path in all_csvs
    if not csv_path.name.startswith("zero_shot")
]

if all_dfs:
    results_df = pd.concat(all_dfs, ignore_index=True)
    # Drop exact duplicates (same model+machine+ctx+hz+cov+device)
    results_df = results_df.drop_duplicates(
        subset=["model", "machine", "context_length", "horizon", "with_covariates", "device"],
        keep="last",
    )
    devices = results_df["device"].unique()
    print(f"Results: {len(results_df)} rows across devices: {list(devices)}")
    print(format_summary(results_df))
    display(results_df)
else:
    results_df = pd.DataFrame()
    print("No results collected. Check install/runtime failures above.")

## Visualizations

In [None]:
# Render and save four summary figures from the merged results. Each figure
# is written as a PNG into results_dir and then shown inline. Nothing is
# plotted when results_df is empty.
if not results_df.empty:
    df = results_df.copy()
    # Univariate subset: rows benchmarked without covariates.
    uni_df = df[~df["with_covariates"]]

    # --- 1. MAE by Model and Machine (univariate) ---
    # The figure is created and saved even when uni_df is empty (blank axes).
    fig, ax = plt.subplots(figsize=(12, 5))
    if not uni_df.empty:
        # pivot_table aggregates the remaining dimensions per (model, machine).
        pivot = uni_df.pivot_table(values="mae", index="model", columns="machine")
        pivot.plot(kind="bar", ax=ax)
        ax.set_ylabel("MAE (ppm)")
        ax.set_title("Zero-Shot MAE by Model and Machine (Univariate)")
        ax.legend(title="Machine")
        plt.xticks(rotation=45)
    plt.tight_layout()
    fig.savefig(results_dir / "mae_by_model_machine.png", dpi=150, bbox_inches="tight")
    plt.show()

    # --- 2. Covariate Effect ---
    # Keep only models that have at least one with-covariates row, and plot
    # only if both covariate modes are present among them.
    cov_models = df[df["model"].isin(df[df["with_covariates"]]["model"].unique())]
    if not cov_models.empty and len(cov_models["with_covariates"].unique()) > 1:
        fig, ax = plt.subplots(figsize=(10, 5))
        sns.barplot(data=cov_models, x="model", y="mae", hue="with_covariates", ax=ax)
        ax.set_ylabel("MAE (ppm)")
        ax.set_title("Covariate Effect: Univariate vs. Multivariate")
        ax.legend(title="With Covariates")
        plt.xticks(rotation=45)
        plt.tight_layout()
        fig.savefig(results_dir / "covariate_effect.png", dpi=150, bbox_inches="tight")
        plt.show()

    # --- 3. Inference Latency ---
    # Mean over every row of a model, i.e. across all machines, configs and
    # devices present in results_df.
    fig, ax = plt.subplots(figsize=(10, 5))
    time_data = df.groupby("model")["inference_ms"].mean().sort_values()
    time_data.plot(kind="barh", ax=ax, color="steelblue")
    ax.set_xlabel("Mean Inference Time (ms)")
    ax.set_title("Inference Latency by Model")
    plt.tight_layout()
    fig.savefig(results_dir / "inference_latency.png", dpi=150, bbox_inches="tight")
    plt.show()

    # --- 4. Context Length Sensitivity ---
    # Only meaningful when more than one context length was benchmarked.
    if len(uni_df["context_length"].unique()) > 1:
        fig, ax = plt.subplots(figsize=(10, 5))
        sns.lineplot(
            data=uni_df, x="context_length", y="mae",
            hue="model", style="model", markers=True, ax=ax,
        )
        ax.set_xlabel("Context Length (timesteps)")
        ax.set_ylabel("MAE (ppm)")
        ax.set_title("MAE vs. Context Length")
        plt.tight_layout()
        fig.savefig(results_dir / "context_sensitivity.png", dpi=150, bbox_inches="tight")
        plt.show()

    print(f"Saved figures to: {results_dir}")

## Export Results

In [None]:
# Export the merged results as CSV and LaTeX, then print a per-device latency
# summary when more than one device is present. Skipped for empty results.
if not results_df.empty:
    csv_path, latex_path = save_results(results_df, results_dir, prefix="zero_shot")
    print(f"CSV:   {csv_path}")
    print(f"LaTeX: {latex_path}")
    print(f"\n{results_to_latex(results_df)}")

    # Summary: device breakdown
    device_values = results_df["device"].unique()
    if len(device_values) > 1:
        print("\n=== Per-Device Summary ===")
        for dev in sorted(device_values):
            sub = results_df.loc[results_df["device"] == dev]
            print(f"\n{dev}: {len(sub)} rows, {sub['model'].nunique()} models")
            latency_stats = sub.groupby("model")["inference_ms"].agg(["mean", "max"])
            print(latency_stats.to_string())

## Save & Push Results

Commit all benchmark outputs (CSVs, figures, LaTeX) to the repo so they can
be accessed from another machine via `git pull`.

In [None]:
# Commit the benchmark outputs on Colab and push them when a token is set.
# Local runs only print a reminder of the manual git commands.
if not IN_COLAB:
    print(f"Local run. Outputs saved to: {results_dir}")
    print("Run 'git add tick2/notebooks/output/02/ && git commit && git push' to share.")
else:
    # The git commands below use repo-relative paths, so run from the repo root.
    os.chdir(REPO_DIR)

    # Stage all benchmark outputs
    subprocess.run(["git", "add", "tick2/notebooks/output/02/"], check=True)

    # Porcelain status is empty when the staged outputs match HEAD.
    status = subprocess.run(
        ["git", "status", "--porcelain", "tick2/notebooks/output/02/"],
        capture_output=True,
        text=True,
    )
    has_changes = bool(status.stdout.strip())

    if not has_changes:
        print("No new outputs to commit.")
    else:
        commit_cmd = ["git", "commit", "-m", "results: notebook 02 zero-shot benchmark outputs"]
        subprocess.run(commit_cmd, check=True)
        if not GITHUB_TOKEN:
            print("Committed locally but GITHUB_TOKEN not set — skipping push.")
            print("Set the secret in Colab sidebar > Secrets > GITHUB_TOKEN")
        else:
            subprocess.run(["git", "push"], check=True)
            print("Pushed notebook 02 outputs to GitHub.")