# MASBotS — Coalescence-Time Calibration

Purpose: Calibrate the model parameters (α, β, f₀, …) so that the simulated time-to-coalescence matches the observed time-to-coalescence.

Out of scope: pixel→meter conversion, spatial trajectory fitting, and MPC (all handled elsewhere).


In [None]:
%load_ext autoreload
%autoreload 2

from pathlib import Path
import numpy as np, pandas as pd
import matplotlib.pyplot as plt

import sys
# Derive the project root from the current working directory and expose
# ROOT/src to the import system so the project-local modules resolve.
_cwd = Path().resolve()
# NOTE(review): parents[1] is the *grandparent* of the working directory. If
# notebooks/ sits directly under the project root, parents[0] (i.e. .parent)
# would be the root instead -- confirm against the repository layout.
ROOT = _cwd.parents[1] if _cwd.name == "notebooks" else _cwd
sys.path.append(str(ROOT / "src"))

from data_io import load_runs
from metrics import time_to_coalescence_from_df
from objective import objective_time_error
from optimize import run_global_optimization
from model_params import save_calibrated_params
from sim import SimNotWiredError


In [None]:
# Central settings for this notebook; every later cell reads its values from CONF.
CONF = {
    # Directory handed to load_runs().
    "data_dir": str(ROOT / "data" / "raw"),
    # Forwarded as `fps` to the observed-time metric and to the objective
    # (presumably the recording frame rate -- confirm).
    "fps": 30.0,
    # Forwarded as `dt` / `T` to the input builder, the objective and the
    # simulator (presumably step size and horizon in seconds -- confirm).
    "dt": 0.05,
    "T": 100.0,
    # Coalescence criterion forwarded to both the metric and the simulator.
    "radius_threshold_m": 0.25,
    "dwell_time_s": 1.0,
    # Search box per calibrated parameter as (lower, upper); the keys also
    # select which optimizer outputs get exported.
    "bounds": {
        "alpha": (0.0, 1.0),
        "beta": (0.0, 1.0),
        "f0": (0.0, 2.0),
    },
    # Optimizer budget and seed passed to run_global_optimization().
    "max_iter": 60,
    "seed": 42,
    # Destination passed to save_calibrated_params().
    "export_path": str(ROOT / "configs" / "calibrated_params.json"),
}


In [None]:
from pathlib import Path

# Load the recorded runs from the configured data directory.
raw_dir = Path(CONF["data_dir"])
runs = load_runs(raw_dir)
# Cell output: number of loaded runs and the IDs of the first three.
(len(runs), [entry["run_id"] for entry in runs[:3]])


In [None]:
# Wiring helpers for IC and inputs
# These MUST reconstruct the initial state and inputs for the sim from each CSV. Keep minimal but explicit.

import numpy as np
import pandas as pd

def initial_state_builder(df: pd.DataFrame) -> np.ndarray:
    """Return the flattened (x, y) positions found in the earliest frame.

    The rows whose ``frame`` equals the minimum frame value are ordered by
    ``bot_id`` and their ``x``/``y`` columns are flattened row by row, giving
    ``[x, y, x, y, ...]`` as a 1-D float array.

    NOTE(review): the layout has to match what the simulator expects, and only
    bots present in the earliest frame contribute -- confirm both against the
    simulator and the data.
    """
    earliest = df["frame"].min()
    first_rows = df.loc[df["frame"] == earliest].sort_values("bot_id")
    positions = first_rows[["x", "y"]].to_numpy()
    return positions.reshape(-1).astype(float)


def input_builder(df: pd.DataFrame, dt: float, T: float) -> np.ndarray:
    """Build the per-bot input array over the simulation horizon.

    Currently a placeholder: every input is zero until real input
    reconstruction is available.

    Args:
        df: Run data; only the number of distinct ``bot_id`` values is used.
        dt: Step size; must be positive.
        T: Horizon length; must be non-negative.

    Returns:
        A float array of zeros with shape ``(n_steps, n_bots)``, where
        ``n_steps = floor(T / dt) + 1`` and ``n_bots`` is the number of
        distinct ``bot_id`` values in ``df``.

    Raises:
        ValueError: If ``dt`` is not positive or ``T`` is negative.
    """
    if dt <= 0:
        raise ValueError(f"dt must be positive, got {dt!r}")
    if T < 0:
        raise ValueError(f"T must be non-negative, got {T!r}")
    # Floating-point division can land just below a whole number
    # (0.3 / 0.1 == 2.9999999999999996); a bare int() would then drop the
    # final step. A small tolerance before flooring keeps the intended count.
    n_intervals = int(np.floor(T / dt + 1e-9))
    n_steps = n_intervals + 1
    n_bots = df["bot_id"].nunique()
    # TODO: replace with real input reconstruction when available.
    return np.zeros((n_steps, n_bots), dtype=float)


In [None]:
def make_loss():
    """Return a one-argument loss suitable for the optimizer.

    The returned callable takes a parameter dict and forwards it to
    ``objective_time_error`` together with the loaded runs, the two wiring
    helpers and the timing/threshold settings. The settings are looked up in
    ``CONF`` on every call, so edits to ``CONF`` take effect without rebuilding
    the loss.
    """
    forwarded_keys = ("fps", "dt", "T", "radius_threshold_m", "dwell_time_s")

    def loss_fn(params_dict):
        settings = {key: CONF[key] for key in forwarded_keys}
        return objective_time_error(
            params=params_dict,
            runs=runs,
            initial_state_builder=initial_state_builder,
            input_builder=input_builder,
            **settings,
        )

    return loss_fn


loss = make_loss()


In [None]:
# Smoke test: evaluate the loss once at a fixed parameter set so an unwired
# simulator is reported here instead of inside the optimizer.
try:
    _ = loss({"alpha": 0.1, "beta": 0.01, "f0": 0.2})
except SimNotWiredError as err:
    print("Simulation not wired yet:", err)
else:
    print("Loss function callable. (If this fails, wire src/sim.py to your sim.)")


In [None]:
# Run the global search over the configured bounds with the configured
# iteration budget and seed.
optimizer_settings = {key: CONF[key] for key in ("bounds", "max_iter", "seed")}
best = run_global_optimization(loss_fn=loss, **optimizer_settings)
# Cell output: the optimizer result.
best


In [None]:
# Keep only the optimizer outputs whose names appear in the configured bounds.
calibrated_names = CONF["bounds"]
PARAMS = {name: value for name, value in best.items() if name in calibrated_names}

# Metadata stored next to the parameters: the settings copied from CONF, the
# optimizer label and the IDs of the runs that were loaded.
META = {
    key: CONF[key]
    for key in ("radius_threshold_m", "dwell_time_s", "fps", "dt", "T", "bounds")
}
META["optimizer"] = "scipy.differential_evolution"
META["runs_used"] = [entry["run_id"] for entry in runs]

export_target = Path(CONF["export_path"])
save_calibrated_params(export_target, PARAMS, META)
print("Saved:", CONF["export_path"])


In [None]:
# Minimal reporting (after wiring)
import pandas as pd
from sim import simulate_swarm_time_to_coalescence

# Compare the observed and simulated time-to-coalescence for the first five
# runs, using the calibrated PARAMS, and store the table under results/.
rows = []
for run in runs[:5]:
    df = run["df"]
    obs = time_to_coalescence_from_df(df, CONF["radius_threshold_m"], CONF["dwell_time_s"], CONF["fps"])
    X0 = initial_state_builder(df)
    U = input_builder(df, CONF["dt"], CONF["T"])
    sim = simulate_swarm_time_to_coalescence(
        params=PARAMS, initial_state=X0, inputs=U,
        dt=CONF["dt"], T=CONF["T"],
        radius_threshold_m=CONF["radius_threshold_m"],
        dwell_time_s=CONF["dwell_time_s"]
    )
    rows.append({"run_id": run["run_id"], "obs_t_s": obs, "sim_t_s": sim})

# Fixing the columns keeps the table (and the CSV header) well-formed even
# when no runs were loaded.
report = pd.DataFrame(rows, columns=["run_id", "obs_t_s", "sim_t_s"])

# to_csv does not create missing directories, so make sure results/ exists.
report_path = ROOT / "results" / "coalescence_times_sample.csv"
report_path.parent.mkdir(parents=True, exist_ok=True)
report.to_csv(report_path, index=False)
report


## Notes

This section records the assumptions, parameter bounds, threshold/dwell definitions, exported artifacts, and next steps.

- We calibrate only coalescence time; no pixel→meter, no MPC.
- Deterministic seeds and `CONF` live at the top; exports are versioned.
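- `configs/calibrated_params.json` is intended to include the commit hash and a dataset summary; the `META` built in this notebook records the threshold/dwell settings, `fps`, `dt`, `T`, the bounds, the optimizer name, and the IDs of the runs used (no commit hash is added in this notebook).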
- CSV schema requires the columns `frame`, `bot_id`, `x`, `y`, with `x` and `y` in meters.
- `SimNotWiredError` protects against using the notebook before the sim is wired.
- Save small figures to `figs/` and tables to `results/` to keep Git light.
- Downstream notebooks should load params via `model_params.load_calibrated_params`.
