# sparkle_motion — Colab / Drive setup
This notebook contains the recommended Colab setup steps and a small smoke-test harness for running the orchestrator in a GPU environment.
- Use the cells below to mount Google Drive (Colab only).
- Use the installation cell to install the ML stack from `requirements-ml.txt` (optional; heavy).
- The smoke-test cell is a safe stub that checks imports and shows how to run the orchestrator in simulation mode.

## Notes and expectations
- This notebook is intended for Google Colab (A100) runs. If you are running locally, skip the Drive mount and run the commands in a terminal.
- Before installing the heavy ML dependencies, ensure you have sufficient disk and GPU (Colab or a VM). The requirements are listed in `requirements-ml.txt`.

## Configure workspace inputs
Set these before running the helper so it knows where to create directories and which model snapshots to pull. Provide one or more repo IDs via `HF_MODELS`; set `DRY_RUN = True` or leave the list empty to skip downloads.

In [1]:
# Cell 0: Workspace configuration (edit these as needed)
from pathlib import Path

WORKSPACE_NAME = "SparkleMotion"          # Folder created under MyDrive/
HF_MODELS = [
    "stabilityai/stable-diffusion-xl-base-1.0",
]
DRY_RUN = False                              # True = skip download/smoke actions
MOUNT_POINT = "/content/drive"             # Default Colab mount
REPO_ROOT = Path.cwd()                       # Assumes notebook is opened from repo root

print(f"Configured workspace '{WORKSPACE_NAME}' (repo root: {REPO_ROOT})")
print(f"Models to manage: {HF_MODELS or '[none specified]'}")

Configured workspace 'SparkleMotion' (repo root: /home/phil/work/sparkle_motion/notebooks)
Models to manage: ['stabilityai/stable-diffusion-xl-base-1.0']


## Load secrets from `.env`
Run the next cell once the bootstrap script has created a `.env` file (either in the
repo root, `/content`, or your Drive workspace). It installs `python-dotenv` if
necessary and loads the variables into the current kernel so the ADK clients can
reuse them.

In [None]:
# Cell 0a: Load secrets from .env using python-dotenv
import importlib.util
import subprocess
import sys
from pathlib import Path


def _ensure_python_dotenv_installed() -> None:
    if importlib.util.find_spec("dotenv") is None:
        print("Installing python-dotenv...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "python-dotenv"])


_ensure_python_dotenv_installed()
from dotenv import load_dotenv  # type: ignore  # imported after ensuring package

candidate_paths = [
    REPO_ROOT / ".env.local",
    REPO_ROOT / ".env",
    Path("/content/.env"),
    Path(MOUNT_POINT) / "MyDrive" / WORKSPACE_NAME / ".env",
]

env_path = next((path for path in candidate_paths if path.exists()), None)
if env_path is None:
    print(
        "No .env file found. Run scripts/bootstrap_adk_projects.sh --profile local-colab "
        "and re-run this cell once the file exists."
    )
else:
    load_dotenv(env_path, override=True)
    print(f"Loaded environment variables from {env_path}")


In [None]:
# Cell 1: Mount Google Drive (Colab-only)
import os
from pathlib import Path

in_colab = importlib.util.find_spec("google.colab") is not None
if not in_colab:
    print("Not running inside Google Colab; skipping Drive mount.")
else:
    from google.colab import drive

    mount_target = Path(MOUNT_POINT)
    mount_target.mkdir(parents=True, exist_ok=True)
    if os.path.ismount(mount_target):
        print(f"Google Drive already mounted at {mount_target}.")
    else:
        print(f"Mounting Google Drive at {mount_target}...")
        drive.mount(str(mount_target), force_remount=False)

    workspace_root = mount_target / "MyDrive" / WORKSPACE_NAME
    workspace_root.mkdir(parents=True, exist_ok=True)
    print(f"Workspace directory ready at {workspace_root}")


In [None]:
# Cell 2: Install ML dependencies from requirements-ml.txt (Colab / heavy)
import importlib.util


def sh(cmd):
    print('Running:', cmd)
    return subprocess.check_call(cmd, shell=True)


if importlib.util.find_spec('google.colab'):
    print('Detected Colab.\n')
    print('If you need a CUDA-optimized torch wheel, install it first as recommended in the repo notebook.\n')
    # The repository contains requirements-ml.txt at the repo root.
    # If you placed the repository under Drive, adjust the path accordingly (e.g. /content/drive/MyDrive/sparkle_motion/requirements-ml.txt).
    req_path = 'requirements-ml.txt'
    try:
        sh(f'pip install -r "{req_path}"')
    except subprocess.CalledProcessError as exc:
        print('pip install failed:', exc)
else:
    print('Not running in Colab — to install locally run:\n    pip install -r requirements-ml.txt')


## Prepare Drive workspace and download models
Use the helper script added to the repo (`scripts/colab_drive_setup.py`) to create Drive folders, optionally download Hugging Face weights, and write a smoke artifact in `outputs/colab_smoke.json`.
- When running locally (outside Colab), pass `--local-root /path/to/workspace` so the helper skips the Drive mount and uses your filesystem directly.

In [2]:
# Run once before Cell 4
REPO_ROOT = Path("/home/phil/work/sparkle_motion")
print("Force-set REPO_ROOT to", REPO_ROOT)

Force-set REPO_ROOT to /home/phil/work/sparkle_motion


In [2]:
# Cell 4: Invoke Drive helper (creates folders, optional download)
import importlib.util
import subprocess
from pathlib import Path

helper_path = REPO_ROOT / "scripts" / "colab_drive_setup.py"
if not helper_path.exists():
    print(f"Helper script not found at {helper_path}. Ensure you're running the notebook from the repo root.")
else:
    in_colab = importlib.util.find_spec("google.colab") is not None
    if not in_colab:
        print("Not running inside Google Colab. Run the helper manually from a terminal:")
        local_root = (REPO_ROOT / "colab_drive_workspace").resolve()
        cmd_parts = [
            f"PYTHONPATH=\"{REPO_ROOT / 'src'}\"",
            "python",
            str(helper_path),
            WORKSPACE_NAME,
            "--local-root",
            str(local_root),
        ]
        for repo_id in HF_MODELS:
            cmd_parts.extend(["--model", repo_id])
        if DRY_RUN:
            cmd_parts.append("--dry-run")
        print("  " + " ".join(cmd_parts))
        print("Adjust --local-root to a writable directory if you prefer a different location.")
    else:
        cmd = [
            "python",
            str(helper_path),
            WORKSPACE_NAME,
            "--mount-point",
            str(MOUNT_POINT),
        ]
        for repo_id in HF_MODELS:
            cmd.extend(["--model", repo_id])
        if DRY_RUN:
            cmd.append("--dry-run")
        print("Running helper:", " ".join(cmd))
        subprocess.check_call(cmd)

Helper script not found at /home/phil/work/sparkle_motion/notebooks/scripts/colab_drive_setup.py. Ensure you're running the notebook from the repo root.


In [3]:
# Cell 4b: Inspect smoke artifact with per-model status
import json
from pathlib import Path

smoke_path = Path(MOUNT_POINT) / "MyDrive" / WORKSPACE_NAME / "outputs" / "colab_smoke.json"
if smoke_path.exists():
    data = json.loads(smoke_path.read_text(encoding="utf-8"))
    status = "OK" if data.get("ok") else "FAILED"
    print(f"Smoke status: {status}")
    for model in data.get("models", []):
        sample = model.get("sample_file") or "n/a"
        print(
            f"- {model['repo_id']}: {model['status']} "
            f"({model.get('files_present', 0)} files, sample={sample})"
        )
else:
    print(f"No smoke artifact found at {smoke_path}. Run the helper once to generate it.")

No smoke artifact found at /content/drive/MyDrive/SparkleMotion/outputs/colab_smoke.json. Run the helper once to generate it.


In [None]:
# Cell 3: Smoke-test stub for the orchestrator (safe, non-destructive)
# This cell attempts to import the orchestrator package and reports what is available.
try:
    import sparkle_motion.orchestrator as orchestrator_mod
    print('Imported sparkle_motion.orchestrator ->', orchestrator_mod)
    if hasattr(orchestrator_mod, 'Runner'):
        print('Runner class is available. You can instantiate it for a simulation run.')
        print('Example (local):')
        print("  from sparkle_motion.orchestrator import Runner")
        print("  r = Runner(run_dir='runs')")
        print("  # then use r.run(...) or similar per your orchestrator API")
    else:
        print('Runner class not found — inspect src/sparkle_motion/orchestrator.py for usage.')
except Exception as e:
    print('Could not import orchestrator module:', e)
    print('If you want to run the orchestrator, ensure the package is on PYTHONPATH (e.g., pip install -e .) or run via the repository root.')


## Quickstart: Launch the control panel
Run this cell after the FunctionTools are live (script_agent + production_agent listening on localhost). It imports `create_control_panel`, instantiates the widgets with the `local-colab` profile, and stores the panel in `control_panel` so later helpers (status polling, final deliverable preview) can reuse the same run metadata.


In [None]:
from pathlib import Path
import sys

REPO_ROOT = Path("/home/phil/work/sparkle_motion")  # or Path.cwd() if already in repo
if str(REPO_ROOT) not in sys.path:
    sys.path.append(str(REPO_ROOT))
SRC_PATH = REPO_ROOT / "src"
if str(SRC_PATH) not in sys.path:
    sys.path.append(str(SRC_PATH))


In [6]:
# Quickstart cell: import and display the ipywidgets control panel
from notebooks.control_panel import create_control_panel

print("Launching control panel with endpoints from configs/tool_registry.yaml (profile='local-colab').")
control_panel = create_control_panel()
control_panel

ModuleNotFoundError: No module named 'sparkle_motion'

## Notebook control panel prototype (advanced)
Need to override the timeout, point at a different profile, or inspect the underlying widgets? Use the cell below to instantiate `ControlPanel` manually. It shows how to swap endpoint profiles, tweak HTTP timeouts, and still reuse the global `control_panel` handle for downstream helpers.


In [None]:
# Advanced control panel prototype: customize endpoints/timeouts
from notebooks.control_panel import ControlPanel, PanelEndpoints

CUSTOM_PROFILE = "local-colab"  # change to another profile defined in configs/tool_registry.yaml
CUSTOM_TIMEOUT_S = 45.0

print(f"Building ControlPanel(profile={CUSTOM_PROFILE!r}, timeout={CUSTOM_TIMEOUT_S}s)...")
custom_endpoints = PanelEndpoints.from_registry(profile=CUSTOM_PROFILE)
advanced_control_panel = ControlPanel(endpoints=custom_endpoints, http_timeout_s=CUSTOM_TIMEOUT_S)

# Keep downstream helpers working by updating the shared reference.
control_panel = advanced_control_panel
advanced_control_panel.container

## Final deliverable preview & download
Use this helper after a production run completes. It fetches the `video_final` artifact from `qa_publish`, embeds it inline, surfaces the QA badge, and offers a Google Colab download when available.

In [None]:
# Cell 5: Final deliverable helper (qa_publish)
import importlib
import os
import subprocess
from pathlib import Path
from typing import Any, Dict, Optional

import httpx
from IPython.display import HTML, Video, display

PRODUCTION_AGENT_BASE = os.environ.get("PRODUCTION_AGENT_BASE", "http://127.0.0.1:8200")
QA_PUBLISH_STAGE = "qa_publish"
DOWNLOAD_DIR = Path(os.environ.get("FINAL_VIDEO_DIR", "/content/final_videos"))
DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)

colab_files = None
_colab_spec = importlib.util.find_spec("google.colab.files")
if _colab_spec:
    colab_files = importlib.import_module("google.colab.files")


def _current_run_id() -> str:
    if "control_panel" in globals():
        cp = globals()["control_panel"]
        run_value = getattr(getattr(cp, "run_id_input", None), "value", "")
        if run_value and run_value.strip():
            return run_value.strip()
        state = getattr(cp, "state", None)
        if state and getattr(state, "last_run_request_id", None):
            return state.last_run_request_id
    return os.environ.get("RUN_ID", "").strip()


RUN_ID = _current_run_id()
if not RUN_ID:
    raise RuntimeError(
        "Set RUN_ID (or populate control_panel.run_id_input) before running the final deliverable helper.",
    )


def _summarize_stage(stage_section: Dict[str, Any]) -> Dict[str, Optional[Dict[str, Any]]]:
    preview = stage_section.get("preview") or {}
    media_summary = stage_section.get("media_summary") or {}
    qa_summary = stage_section.get("qa_summary") or {}
    preview_video = preview.get("video")
    log_lines = [
        f"qa_publish emitted {stage_section.get('count', 0)} artifact(s)",
        f"artifact_types: {stage_section.get('artifact_types') or []}",
        f"media_types: {stage_section.get('media_types') or []}",
    ]
    if preview_video:
        location_hint = preview_video.get("local_path") or preview_video.get("download_url") or preview_video.get("artifact_uri")
        log_lines.append(
            "video preview => "
            f"playback_ready={preview_video.get('playback_ready')} "
            f"qa_passed={preview_video.get('qa_passed')} "
            f"source={location_hint}",
        )
    else:
        log_lines.append("video preview => unavailable")
    video_summary = media_summary.get("video")
    if video_summary:
        log_lines.append(
            "video summary => "
            f"count={video_summary.get('count', 0)} duration_s={video_summary.get('total_duration_s', 0.0):.2f} "
            f"playback_ready={video_summary.get('playback_ready')}",
        )
    if qa_summary:
        log_lines.append(f"qa summary => {qa_summary}")
    print("\n".join(log_lines))
    return {
        "preview": preview_video,
        "media_summary": media_summary,
        "qa_summary": qa_summary,
    }


def fetch_video_final_entry(run_id: str) -> Dict[str, Any]:
    with httpx.Client(timeout=30.0) as client:
        resp = client.get(
            f"{PRODUCTION_AGENT_BASE}/artifacts",
            params={"run_id": run_id, "stage": QA_PUBLISH_STAGE},
        )
        resp.raise_for_status()
        payload = resp.json()

    stage_sections = payload.get("stages") or []
    if stage_sections:
        stage_section = stage_sections[0]  # stage filter ensures only one section
        summaries = _summarize_stage(stage_section)
        fetch_video_final_entry._latest_preview = summaries["preview"]  # type: ignore[attr-defined]
        fetch_video_final_entry._latest_media_summary = summaries["media_summary"]  # type: ignore[attr-defined]
        fetch_video_final_entry._latest_qa_summary = summaries["qa_summary"]  # type: ignore[attr-defined]
        candidate_entries = stage_section.get("artifacts") or []
    else:
        print(
            "Warning: /artifacts response missing 'stages' metadata; falling back to flattened artifacts list.",
        )
        fetch_video_final_entry._latest_preview = None  # type: ignore[attr-defined]
        fetch_video_final_entry._latest_media_summary = None  # type: ignore[attr-defined]
        fetch_video_final_entry._latest_qa_summary = None  # type: ignore[attr-defined]
        candidate_entries = payload.get("artifacts") or []
    for entry in candidate_entries:
        if entry.get("artifact_type") == "video_final":
            return entry
    raise RuntimeError(
        "No video_final artifact found. Ensure qa_publish succeeded or resume production_agent with resume_from='qa_publish'.",
    )


def ensure_local_video(entry: Dict[str, Any], run_id: str) -> Path:
    local_path = entry.get("local_path")
    if local_path:
        candidate = Path(local_path).expanduser()
        if candidate.exists():
            return candidate
    target = DOWNLOAD_DIR / f"{run_id}_video_final.mp4"
    download_url = entry.get("download_url")
    if download_url:
        with httpx.Client(timeout=None) as client:
            with client.stream("GET", download_url) as resp:
                resp.raise_for_status()
                with target.open("wb") as handle:
                    for chunk in resp.iter_bytes():
                        handle.write(chunk)
        return target
    artifact_uri = entry.get("artifact_uri")
    if artifact_uri:
        result = subprocess.run(
            ["adk", "artifacts", "download", artifact_uri, str(target)],
            capture_output=True,
            text=True,
            check=False,
        )
        if result.returncode == 0 and target.exists():
            return target
        print("ADK artifact download failed:")
        print(result.stderr or result.stdout)
    raise RuntimeError(
        "Unable to download the final video locally. Provide download_url or local_path in the artifacts manifest.",
    )


def render_qa_badge(entry: Dict[str, Any]) -> HTML:
    qa_skipped = entry.get("qa_skipped")
    if qa_skipped is None:
        qa_skipped = (entry.get("metadata") or {}).get("qa_skipped")
    color = "#d9534f" if qa_skipped else "#5cb85c"
    label = "QA SKIPPED — manual review required" if qa_skipped else "QA PASSED"
    html = f"""
    <div style=\"padding:6px 10px;background:{color};color:white;display:inline-block;border-radius:6px;font-weight:bold;\">
        {label}
    </div>
    """
    return HTML(html)


def render_preview_video(preview_entry: Optional[Dict[str, Any]]) -> None:
    if not preview_entry:
        print("No preview clip available from /artifacts preview metadata.")
        return
    local_path = preview_entry.get("local_path")
    if local_path and Path(local_path).exists():
        display(HTML("<strong>Preview clip</strong>"))
        display(Video(filename=local_path, embed=True, width=480, height=270))
        return
    download_url = preview_entry.get("download_url")
    if download_url:
        print(f"Preview available remotely (download_url={download_url}).")
    else:
        print("Preview metadata present but no local path or download URL; see artifact_uri for details.")


final_entry = fetch_video_final_entry(RUN_ID)
preview_entry = getattr(fetch_video_final_entry, "_latest_preview", None)
display(render_qa_badge(final_entry))
render_preview_video(preview_entry)
video_path = ensure_local_video(final_entry, RUN_ID)

info_html = f"""
<p><strong>Artifact URI:</strong> {final_entry.get('artifact_uri', 'n/a')}</p>
<p><strong>Local path:</strong> {video_path}</p>
"""
display(HTML(info_html))
display(Video(filename=str(video_path), embed=True, width=640, height=360))

if colab_files is not None:
    colab_files.download(str(video_path))
else:
    print("Download helper available only inside Google Colab. Share the path above manually if running elsewhere.")

## Artifacts viewer
Use this helper to inspect `/artifacts` responses without leaving the notebook. It shares the `control_panel` run metadata when available, lets you scope by stage (e.g., `qa_publish`), and can poll automatically so new artifacts appear as production advances.

In [None]:
# Cell 4c: Artifacts viewer helper
import asyncio
import json
import os
from typing import Any, Dict, Optional

import httpx
import ipywidgets as widgets
from IPython.display import display

PRODUCTION_AGENT_BASE = os.environ.get("PRODUCTION_AGENT_BASE", "http://127.0.0.1:8200")
ARTIFACTS_ENDPOINT = f"{PRODUCTION_AGENT_BASE}/artifacts"

if "artifact_viewer_state" in globals():
    existing_task = globals()["artifact_viewer_state"].get("task")
    if existing_task and not existing_task.done():
        existing_task.cancel()

artifact_viewer_state = {"task": None}


def _artifact_viewer_run_id() -> str:
    if "control_panel" in globals():
        cp = globals()["control_panel"]
        run_widget = getattr(cp, "run_id_input", None)
        candidate = getattr(run_widget, "value", "")
        if candidate and candidate.strip():
            return candidate.strip()
        state = getattr(cp, "state", None)
        if state and getattr(state, "last_run_request_id", None):
            return state.last_run_request_id
    return os.environ.get("RUN_ID", "").strip()


def _format_status_html(message: str, *, ok: bool) -> str:
    color = "#3c763d" if ok else "#d9534f"
    return f"<span style='color:{color}; font-size:0.9em;'>{message}</span>"


def _iter_artifacts(payload: Dict[str, Any]):
    stages = payload.get("stages") or []
    for stage in stages:
        for artifact in stage.get("artifacts") or []:
            yield artifact
    for artifact in payload.get("artifacts") or []:
        yield artifact


def _locate_video_final(payload: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    for artifact in _iter_artifacts(payload):
        if artifact.get("artifact_type") == "video_final":
            return artifact
    return None


def _render_artifacts(payload: Dict[str, Any]) -> None:
    artifacts = list(_iter_artifacts(payload))
    video_final = _locate_video_final(payload)
    with artifacts_output:
        artifacts_output.clear_output()
        print(f"Artifacts returned: {len(artifacts)}")
        if video_final:
            print("\nvideo_final summary:")
            print(json.dumps(video_final, indent=2, ensure_ascii=False))
        print("\nFull payload:\n")
        print(json.dumps(payload, indent=2, ensure_ascii=False))


def _fetch_artifacts_sync(run_id: str, stage: str) -> Dict[str, Any]:
    params = {"run_id": run_id}
    if stage:
        params["stage"] = stage
    with httpx.Client(timeout=30.0) as client:
        resp = client.get(ARTIFACTS_ENDPOINT, params=params)
        resp.raise_for_status()
        data = resp.json()
        if not isinstance(data, dict):
            raise RuntimeError("Unexpected artifacts response payload")
        return data


async def _fetch_artifacts_async(client: httpx.AsyncClient, run_id: str, stage: str) -> Dict[str, Any]:
    params = {"run_id": run_id}
    if stage:
        params["stage"] = stage
    resp = await client.get(ARTIFACTS_ENDPOINT, params=params)
    resp.raise_for_status()
    data = resp.json()
    if not isinstance(data, dict):
        raise RuntimeError("Unexpected artifacts response payload")
    return data


def _stop_artifact_poll(*, from_toggle: bool = False) -> None:
    task = artifact_viewer_state.get("task")
    if task and not task.done():
        task.cancel()
    artifact_viewer_state["task"] = None
    if not from_toggle:
        auto_refresh_toggle.value = False


def _handle_manual_refresh(_: Any) -> None:
    run_id = run_id_input.value.strip() or _artifact_viewer_run_id()
    if not run_id:
        status_label.value = _format_status_html("Set a Run ID to fetch artifacts.", ok=False)
        with artifacts_output:
            artifacts_output.clear_output()
            print("Provide a Run ID before refreshing artifacts.")
        return
    try:
        payload = _fetch_artifacts_sync(run_id, stage_input.value.strip())
    except httpx.HTTPError as exc:
        status_label.value = _format_status_html(f"Fetch failed: {exc}", ok=False)
        return
    except Exception as exc:
        status_label.value = _format_status_html(f"Unexpected error: {exc}", ok=False)
        return
    _render_artifacts(payload)
    status_label.value = _format_status_html("Artifacts refreshed.", ok=True)


def _handle_auto_toggle(change: Dict[str, Any]) -> None:
    if change.get("new"):
        _start_artifact_poll()
    else:
        _stop_artifact_poll(from_toggle=True)


def _start_artifact_poll() -> None:
    run_id = run_id_input.value.strip() or _artifact_viewer_run_id()
    if not run_id:
        status_label.value = _format_status_html("Set a Run ID before enabling auto-refresh.", ok=False)
        auto_refresh_toggle.value = False
        return
    loop = asyncio.get_event_loop()

    async def _poll() -> None:
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                while auto_refresh_toggle.value:
                    stage = stage_input.value.strip()
                    active_run_id = run_id_input.value.strip() or _artifact_viewer_run_id()
                    if not active_run_id:
                        status_label.value = _format_status_html("Run ID cleared; stopping auto-refresh.", ok=False)
                        _stop_artifact_poll()
                        return
                    try:
                        payload = await _fetch_artifacts_async(client, active_run_id, stage)
                    except httpx.HTTPError as exc:
                        status_label.value = _format_status_html(f"Auto-refresh failed: {exc}", ok=False)
                        _stop_artifact_poll()
                        return
                    except Exception as exc:
                        status_label.value = _format_status_html(f"Error: {exc}", ok=False)
                        _stop_artifact_poll()
                        return
                    _render_artifacts(payload)
                    status_label.value = _format_status_html("Auto-refresh OK.", ok=True)
                    interval = max(2.0, float(interval_input.value or 4.0))
                    await asyncio.sleep(interval)
        except asyncio.CancelledError:
            status_label.value = _format_status_html("Auto-refresh stopped.", ok=True)

    _stop_artifact_poll(from_toggle=True)
    artifact_viewer_state["task"] = loop.create_task(_poll())


run_id_input = widgets.Text(
    value=_artifact_viewer_run_id(),
    description="Run ID",
    placeholder="production run id",
    layout=widgets.Layout(width="50%"),
)
stage_input = widgets.Text(
    description="Stage",
    placeholder="Optional stage (e.g., qa_publish)",
    layout=widgets.Layout(width="45%"),
)
refresh_button = widgets.Button(description="Refresh", icon="refresh")
auto_refresh_toggle = widgets.ToggleButton(description="Auto-refresh", icon="repeat", value=False)
interval_input = widgets.BoundedFloatText(value=4.0, min=2.0, max=60.0, step=1.0, description="Interval (s)")
status_label = widgets.HTML(value=_format_status_html("Idle", ok=True))
artifacts_output = widgets.Output(layout=widgets.Layout(border="1px solid #ddd", min_height="160px", max_height="360px", overflow="auto"))

refresh_button.on_click(_handle_manual_refresh)
auto_refresh_toggle.observe(_handle_auto_toggle, names="value")

controls_row_1 = widgets.HBox([run_id_input, stage_input])
controls_row_2 = widgets.HBox([refresh_button, auto_refresh_toggle, interval_input, status_label])
artifacts_viewer_panel = widgets.VBox([controls_row_1, controls_row_2, artifacts_output])

display(artifacts_viewer_panel)


## Optional: run a stub orchestration smoke test
This cell runs the Python runner in simulation mode (fallback adapters) so you can confirm Drive folders are writable before enabling real models.

In [None]:
# Cell 5: Optional orchestrator smoke run (uses fallback adapters)
import importlib.util
from sparkle_motion.orchestrator import Runner
in_colab = importlib.util.find_spec("google.colab") is not None
if in_colab:
    runs_root = Path(MOUNT_POINT) / "MyDrive" / WORKSPACE_NAME / "runs"
else:
    runs_root = REPO_ROOT / "runs"
runs_root.mkdir(parents=True, exist_ok=True)
movie_plan = {
    "title": "Colab Smoke",
    "shots": [
        {
            "id": "shot_001",
            "visual_description": "Test scene",
            "duration_sec": 2.0,
            "dialogue": [{"character": "narrator", "text": "Hello from Colab"}],
        }
    ],
}
runner = Runner(runs_root=str(runs_root))
asset_refs = runner.run(movie_plan=movie_plan, run_id="colab_smoke", resume=True)
print("Smoke run complete. Final asset refs keys:", asset_refs.keys())
print("Runs directory:", runs_root)