# Plug-and-Play Demo: Run–Tumble with Spec-Declared 1‑Back Observation

This notebook mimics an external project that imports `plume_nav_sim` as a library and
defines its own policy. It showcases:

- Spec‑first composition using `SimulationSpec` + `prepare()`
- Dotted‑path policy import from a separate package (`plug_and_play_demo`)
- Spec‑declared observation wrapper: the core `ConcentrationNBackWrapper` with `n=2`, so the policy sees `[c_prev, c_now]` (the current reading plus one step back)
- Runner streaming with RGB frames (headless‑safe display)
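
To make the `[c_prev, c_now]` observation concrete, here is a minimal sketch of how an n-back history could be maintained. `NBackBuffer` is a hypothetical stand-in written for illustration, not the library's `ConcentrationNBackWrapper`. Padding the history with the first reading on reset is an assumption, and the real wrapper may behave differently.

```python
from collections import deque
from typing import Deque, List


class NBackBuffer:
    """Hypothetical illustration of an n-back history (not the library class)."""

    def __init__(self, n: int = 2) -> None:
        if n < 1:
            raise ValueError("n must be at least 1")
        self.n = n
        self._history: Deque[float] = deque(maxlen=n)

    def reset(self, first_reading: float) -> List[float]:
        # Assumption: pad with the first reading so the output always has
        # length n, even before n real readings have been seen.
        self._history.clear()
        self._history.extend([float(first_reading)] * self.n)
        return list(self._history)

    def step(self, reading: float) -> List[float]:
        # Appending to a full deque drops the oldest reading automatically.
        self._history.append(float(reading))
        return list(self._history)


buf = NBackBuffer(n=2)
first = buf.reset(0.10)  # padded history at episode start
second = buf.step(0.25)  # [c_prev, c_now] after one step
third = buf.step(0.20)   # oldest reading has been dropped
```

With `n=2` the buffer always holds exactly two values, which is all a delta-based policy needs to tell whether the concentration is rising or falling.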


## Capture Goals and Workflow

This demo focuses on minimal plug-and-play execution. For reproducible, analysis-ready datasets, use the data capture pipeline and CLI.

- README: `plug-and-play-demo/README.md` (Capture Workflow)
- Data catalog and schemas: `src/backend/docs/data_catalog_capture.md`, `src/backend/docs/data_capture_schemas.md`

Quick start (CLI):

```bash
plume-nav-capture --output results --experiment demo --episodes 2 --grid 8x8 --parquet
```

Validate artifacts:

```python
from pathlib import Path

from plume_nav_sim.data_capture.validate import validate_run_artifacts

# Replace <run_id> with the run directory created by the capture CLI.
report = validate_run_artifacts(Path("results/demo/<run_id>"))
```


In [None]:
# Save video helper (optional)
from typing import Sequence

import numpy as np


def save_video(
    frames: Sequence[np.ndarray], out_path: str = "demo.gif", fps: int = 10
) -> None:
    """Persist captured frames to disk using the library helper."""
    if not frames:
        print("No frames to save.")
        return
    try:
        from plume_nav_sim.utils.video import save_video_frames

        save_video_frames(frames, out_path, fps=fps)
        print(f"Saved video to {out_path}")
    except ImportError as exc:
        print("Install imageio to enable video export:", exc)
    except Exception as exc:  # pragma: no cover - notebook convenience
        print("Video export failed:", exc)

> Prerequisite: install `plume_nav_sim` in your environment (e.g., `pip install plume_nav_sim` or an editable install).

This notebook assumes `plume_nav_sim` is importable. The demo policy lives next to this notebook in the `plug_and_play_demo` package, so the dotted path `plug_and_play_demo:DeltaBasedRunTumblePolicy` resolves when you run the notebook from its own folder.
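
As a rough illustration of how a `module:Attribute` dotted path can be resolved, the sketch below uses only `importlib`. `load_from_spec` is a hypothetical helper written for this example. The resolution logic inside `plume_nav_sim` is not shown in this notebook and may differ.

```python
import importlib
from typing import Any


def load_from_spec(spec: str) -> Any:
    """Resolve a 'package.module:Attribute' string to the named object."""
    module_name, sep, attr_name = spec.partition(":")
    if not sep or not module_name or not attr_name:
        raise ValueError(f"Expected 'module:Attribute', got {spec!r}")
    # import_module searches sys.path, so the package must be importable
    # from the current working directory or an installed location.
    module = importlib.import_module(module_name)
    try:
        return getattr(module, attr_name)
    except AttributeError as exc:
        raise ImportError(
            f"{module_name!r} has no attribute {attr_name!r}"
        ) from exc


# A standard-library target is used so the sketch runs without the demo package.
ordered_dict_cls = load_from_spec("collections:OrderedDict")
```

Because the lookup goes through `sys.path`, a dotted path such as `plug_and_play_demo:DeltaBasedRunTumblePolicy` can only resolve when the folder containing `plug_and_play_demo` is importable.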

In [None]:
# Ensure the demo package is importable when running from the repo root
import sys
from pathlib import Path

demo_root = Path.cwd()
if not (demo_root / "plug_and_play_demo").exists():
    # Not in the demo folder: look for it as a subdirectory of the current directory.
    candidate = demo_root / "plug-and-play-demo"
    if candidate.exists():
        sys.path.append(str(candidate))
        print("Added demo to sys.path:", candidate)

In [None]:
# Compose the simulation: run–tumble actions + concentration observation + 1-back wrapper
from plume_nav_sim.compose import SimulationSpec, PolicySpec, WrapperSpec, prepare


sim = SimulationSpec(
    grid_size=(128, 128),
    max_steps=400,
    action_type="run_tumble",
    observation_type="concentration",
    reward_type="step_penalty",
    render=True,
    seed=123,
    policy=PolicySpec(spec="plug_and_play_demo:DeltaBasedRunTumblePolicy"),
    observation_wrappers=[
        WrapperSpec(
            spec="plume_nav_sim.observations.history_wrappers:ConcentrationNBackWrapper",
            kwargs={"n": 2},
        )
    ],
)

env, policy = prepare(sim)
env.observation_space, env.action_space
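
For intuition about what a delta-based run–tumble rule might look like, here is a minimal deterministic sketch. It is not the actual `DeltaBasedRunTumblePolicy`, whose implementation lives in the demo package. The function name, the `threshold` parameter, and the action encoding (`0` = run, `1` = tumble) are all assumptions made for illustration.

```python
from typing import Sequence

# Assumed action encoding; inspect env.action_space for the real one.
RUN, TUMBLE = 0, 1


def delta_run_tumble(obs: Sequence[float], threshold: float = 0.0) -> int:
    """Keep running while concentration rises; tumble when it does not."""
    c_prev, c_now = float(obs[0]), float(obs[1])
    delta = c_now - c_prev
    return RUN if delta > threshold else TUMBLE


rising = delta_run_tumble([0.10, 0.25])   # concentration increased
falling = delta_run_tumble([0.25, 0.20])  # concentration decreased
flat = delta_run_tumble([0.20, 0.20])     # no change
```

Treating a flat reading as a tumble is a design choice in this sketch. A stochastic variant that tumbles with some probability would be equally plausible.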

In [None]:
# Stream one episode and display frames (headless-safe)
from IPython.display import display, update_display
from PIL import Image

from plume_nav_sim.runner import runner

frames = []
actions = []
positions = []

disp = None
for ev in runner.stream(env, policy, seed=sim.seed, render=True):
    actions.append(int(ev.action))
    # Collect the agent position from info when the "agent_xy" key is present
    pos = ev.info.get("agent_xy")
    if pos is not None:
        positions.append(pos)
    if ev.frame is not None:
        frames.append(ev.frame)
        img = Image.fromarray(ev.frame)
        if disp is None:
            disp = display(img, display_id=True)
        else:
            update_display(img, display_id=disp.display_id)
env.close()

len(frames), actions[:10]

In [None]:
# Plot trajectory over a plume background
import matplotlib.pyplot as plt
from plume_nav_sim.envs.plume_search_env import unwrap_to_plume_env

%matplotlib inline

fig, ax = plt.subplots(figsize=(6, 6))
# Prefer the last captured frame; otherwise try a fresh render from the env.
# The env was closed in the previous cell, so this fallback may fail and leave bg as None.
bg = None
if frames:
    bg = frames[-1]
else:
    try:
        base = unwrap_to_plume_env(env)
        try:
            bg = base.render("rgb_array")
        except TypeError:
            bg = base.render()
    except Exception:
        bg = None

if bg is not None:
    h, w = bg.shape[:2]
    ax.imshow(bg, origin="lower", extent=[0, w, 0, h])
if positions:
    xs, ys = zip(*positions)
    ax.plot(xs, ys, color="cyan", linewidth=2, alpha=0.9, label="path")
    ax.scatter([xs[0]], [ys[0]], c="yellow", s=40, label="start")
    ax.scatter([xs[-1]], [ys[-1]], c="red", s=40, label="end")
    if bg is not None:
        ax.set_xlim([0, w])
        ax.set_ylim([0, h])
        ax.set_aspect("equal", adjustable="box")
# Mark goal if available
try:
    base = unwrap_to_plume_env(env)
    goal = getattr(base._env, "goal_location", None)
    if goal is not None:
        ax.scatter([goal.x], [goal.y], c="magenta", marker="x", s=60, label="goal")
except Exception:
    pass
ax.set_title("Agent Trajectory")
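# Only draw a legend when something was labeled, to avoid an empty-legend warning
if ax.get_legend_handles_labels()[0]:
    ax.legend(loc="lower right")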
plt.show()

In [None]:
# Quick summary
print({"steps": len(actions), "frames": len(frames)})
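
If a slightly richer summary is useful, the collected `actions` and `positions` lists can be reduced to a few numbers. The sketch below is one way to do that under stated assumptions: `summarize_episode` is a hypothetical helper, and it assumes each position is an `(x, y)` pair of numbers. Sample data is used so that the block runs on its own.

```python
import math
from collections import Counter
from typing import Dict, Sequence, Tuple


def summarize_episode(
    actions: Sequence[int], positions: Sequence[Tuple[float, float]]
) -> Dict[str, object]:
    """Summarize an episode by step count, action counts, and distance travelled."""
    # Sum of straight-line distances between consecutive recorded positions.
    path_length = sum(math.dist(a, b) for a, b in zip(positions, positions[1:]))
    # Straight-line distance from the first to the last recorded position.
    displacement = math.dist(positions[0], positions[-1]) if positions else 0.0
    return {
        "steps": len(actions),
        "action_counts": dict(Counter(actions)),
        "path_length": path_length,
        "displacement": displacement,
    }


# Sample data standing in for the lists collected while streaming.
sample_actions = [0, 0, 1, 0]
sample_positions = [(0, 0), (3, 4), (3, 8)]
summary = summarize_episode(sample_actions, sample_positions)
print(summary)
```

In the notebook itself, calling `summarize_episode(actions, positions)` would produce the same summary for the streamed episode.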