From da34a3d93ac84d33faa5346003ab42dc7353b453 Mon Sep 17 00:00:00 2001 From: William Yue Date: Wed, 6 May 2026 15:29:52 -0700 Subject: [PATCH 1/7] feat(scripts): add annotate_mistakes.py for per-frame mistake labels --- src/opentau/scripts/annotate_mistakes.py | 573 +++++++++++++++++++++++ 1 file changed, 573 insertions(+) create mode 100644 src/opentau/scripts/annotate_mistakes.py diff --git a/src/opentau/scripts/annotate_mistakes.py b/src/opentau/scripts/annotate_mistakes.py new file mode 100644 index 00000000..365397dd --- /dev/null +++ b/src/opentau/scripts/annotate_mistakes.py @@ -0,0 +1,573 @@ +#!/usr/bin/env python +# +# Copyright 2026 Tensor Auto Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Annotate each episode in a dataset mixture with a per-frame ``mistake`` column. + +The subtask labels are read directly from the per-frame ``response`` column +already present in each episode parquet (written by ``annotate_subtasks.py``). +Each contiguous run of identical ``response`` values is treated as a single +subtask segment. For every segment: + +1. The last frame of the segment is extracted from the dataset's ``camera0`` + video (resolved with the same lookup chain as ``annotate_subtasks.py``: + inline ``data_features_name_mapping``, then + ``DATA_FEATURES_NAME_MAPPING``, then the first ``dtype=='video'`` feature). +2. 
The frame is downsampled to ``--target-size`` × ``--target-size`` + (default 448) only when its shorter side already exceeds the target, + then JPEG-encoded. +3. The frame and the segment's subtask string are sent to the configured + VLM, which is asked to return ``{"success": bool, "reason": str}``. +4. If the VLM reports failure, every parquet row in the segment is set to + ``mistake=1``; on success (or any parse / API failure) the rows are + left at ``mistake=0``. + +Episodes whose parquet already contains a ``mistake`` column are skipped +(making the script fully resumable). Episodes whose parquet has no +``response`` column are skipped with a warning. The ``mistake`` feature is +registered in ``meta/info.json`` as ``{"dtype": "int64", "shape": (1,), +"names": None}`` the first time it is added to a dataset. + +Defaults to Google ``gemini-robotics-er-1.6-preview`` (the same Gemini ER +model used in ``annotate_subtasks.py``); Anthropic Claude is also supported +via ``--model``. + +The config file is read with plain JSON; it accepts both the full training +config format (``{"dataset_mixture": {"datasets": [...]}}``) and the simpler +``add_subtask_response`` format (``{"datasets": [...]}``). 
+ +Example:: + + python src/opentau/scripts/annotate_mistakes.py \\ + --config-path configs/examples/train_mixture_config.json + + # Dry run: 1 episode per dataset + python src/opentau/scripts/annotate_mistakes.py \\ + --config-path configs/examples/train_mixture_config.json \\ + --max-episodes-per-dataset 1 +""" + +from __future__ import annotations + +import argparse +import json +import logging +import os +import re +import sys +from pathlib import Path + +import anthropic +import av +import pyarrow as pa +import pyarrow.parquet as pq +from google import genai +from google.genai import types as genai_types +from PIL import Image +from tqdm import tqdm + +from opentau.datasets.utils import DEFAULT_CHUNK_SIZE, load_episodes, load_info, write_info +from opentau.scripts.add_subtask_response import _get_parquet_path +from opentau.scripts.annotate_subtasks import ( + _is_gemini_model, + _load_datasets_from_config, + _resize_and_center_crop, + _resolve_camera0_video_key, + _resolve_root, + _to_b64_jpeg, + _to_jpeg_bytes, +) + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Prompts +# --------------------------------------------------------------------------- + +_SYSTEM_PROMPT = ( + "You are a robot manipulation expert. " + "You will be shown the final frame of a robot attempting a single subtask. " + "Decide whether the robot completed the subtask successfully at that frame. " + "Return ONLY a JSON object — no markdown fences, no prose." +) + +_USER_TEMPLATE = """\ +Task: {task} +Subtask: {subtask} + +The image is the final frame of the robot's attempt at the subtask above. \ +Was the subtask completed successfully? 
+ +Return ONLY a valid JSON object with this exact shape: +{{"success": true, "reason": "..."}} +or +{{"success": false, "reason": "..."}}""" + + +# --------------------------------------------------------------------------- +# Response parsing +# --------------------------------------------------------------------------- + + +def _parse_success_response(text: str) -> bool: + """Parse a ``{"success": bool, ...}`` JSON object from the model response. + + Tolerates ``` ... ``` fences. Raises ``ValueError`` / ``json.JSONDecodeError`` + on any malformed payload — the caller treats those as a mistake=0 default. + """ + text = text.strip() + text = re.sub(r"^```(?:json)?\s*", "", text) + text = re.sub(r"\s*```$", "", text) + parsed = json.loads(text.strip()) + if not isinstance(parsed, dict) or "success" not in parsed: + raise ValueError(f"Expected JSON object with 'success' key, got: {parsed!r}") + return bool(parsed["success"]) + + +# --------------------------------------------------------------------------- +# VLM calls (single image) +# --------------------------------------------------------------------------- + + +def _call_claude_single( + client: anthropic.Anthropic, + model: str, + task: str, + subtask: str, + frame: Image.Image, +) -> str: + response = client.messages.create( + model=model, + max_tokens=256, + system=_SYSTEM_PROMPT, + messages=[ + { + "role": "user", + "content": [ + { + "type": "image", + "source": { + "type": "base64", + "media_type": "image/jpeg", + "data": _to_b64_jpeg(frame), + }, + }, + {"type": "text", "text": _USER_TEMPLATE.format(task=task, subtask=subtask)}, + ], + } + ], + ) + text_blocks = [b for b in response.content if getattr(b, "type", None) == "text"] + if not text_blocks: + raise ValueError(f"Claude response had no text blocks (stop_reason={response.stop_reason}).") + return text_blocks[0].text + + +def _call_gemini_single( + client: genai.Client, + model: str, + task: str, + subtask: str, + frame: Image.Image, +) -> str: + 
response = client.models.generate_content( + model=model, + contents=[ + genai_types.Part.from_bytes(data=_to_jpeg_bytes(frame), mime_type="image/jpeg"), + _USER_TEMPLATE.format(task=task, subtask=subtask), + ], + config=genai_types.GenerateContentConfig( + system_instruction=_SYSTEM_PROMPT, + max_output_tokens=256, + response_mime_type="application/json", + ), + ) + raw_text = (response.text or "").strip() + if not raw_text: + finish_reason = response.candidates[0].finish_reason if response.candidates else None + raise ValueError(f"Gemini response had no text (finish_reason={finish_reason}).") + return raw_text + + +def _query_subtask_success( + client: anthropic.Anthropic | genai.Client, + model: str, + task: str, + subtask: str, + frame: Image.Image, +) -> bool: + """Ask the VLM whether the subtask was completed; default to ``True`` (mistake=0) on any failure.""" + try: + if _is_gemini_model(model): + assert isinstance(client, genai.Client) + raw = _call_gemini_single(client, model, task, subtask, frame) + else: + assert isinstance(client, anthropic.Anthropic) + raw = _call_claude_single(client, model, task, subtask, frame) + return _parse_success_response(raw) + except Exception as exc: + logger.warning( + "VLM query failed for subtask %r (%s); defaulting to success (mistake=0).", + subtask, + exc, + ) + return True + + +# --------------------------------------------------------------------------- +# Subtask runs from the parquet response column +# --------------------------------------------------------------------------- + + +def _find_response_runs(responses: list[str | None]) -> list[tuple[int, int, str]]: + """Return ``(start_idx, last_idx, subtask_string)`` for every contiguous non-empty run.""" + runs: list[tuple[int, int, str]] = [] + if not responses: + return runs + start = 0 + for i in range(1, len(responses)): + if responses[i] != responses[start]: + if responses[start]: + runs.append((start, i - 1, str(responses[start]))) + start = i + if 
responses[start]: + runs.append((start, len(responses) - 1, str(responses[start]))) + return runs + + +# --------------------------------------------------------------------------- +# Video frame extraction +# --------------------------------------------------------------------------- + + +def _extract_frames_at_indices( + video_path: Path, + indices: list[int], + target_size: int, +) -> dict[int, Image.Image]: + """Decode the video once, returning ``{index: PIL.Image}`` for the requested indices. + + Indices are 0-based decode order, matching parquet row order for LeRobot v2.1 + datasets. Frames whose shorter side exceeds ``target_size`` are downsampled + and center-cropped; smaller frames pass through unchanged (no upsampling). + """ + if not indices: + return {} + wanted = set(indices) + out: dict[int, Image.Image] = {} + with av.open(str(video_path)) as container: + for i, frame in enumerate(container.decode(video=0)): + if i in wanted: + out[i] = _resize_and_center_crop(frame.to_image(), target_size) + if len(out) == len(wanted): + break + return out + + +# --------------------------------------------------------------------------- +# Per-episode annotation +# --------------------------------------------------------------------------- + + +def _annotate_episode( + client: anthropic.Anthropic | genai.Client, + model: str, + root: Path, + info: dict, + video_key: str, + ep_index: int, + ep_info: dict, + target_size: int, +) -> bool: + """Annotate one episode. 
Returns ``True`` if the parquet was rewritten.""" + data_tmpl: str = info["data_path"] + chunks_size: int = info.get("chunks_size", DEFAULT_CHUNK_SIZE) + parquet_path = _get_parquet_path(root, data_tmpl, ep_index, chunks_size) + + if not parquet_path.is_file(): + logger.warning("Episode %d: parquet not found at %s; skipping.", ep_index, parquet_path) + return False + + schema_names = pq.read_metadata(parquet_path).schema.names + if "mistake" in schema_names: + logger.debug("Episode %d: 'mistake' column already present, skipping.", ep_index) + return False + if "response" not in schema_names: + logger.warning( + "Episode %d: parquet has no 'response' column (run annotate_subtasks.py first); skipping.", + ep_index, + ) + return False + + table = pq.read_table(parquet_path) + responses = table.column("response").to_pylist() + runs = _find_response_runs(responses) + if not runs: + logger.warning( + "Episode %d: 'response' column has no non-empty subtask labels; skipping.", + ep_index, + ) + return False + + video_tmpl: str = info["video_path"] + ep_chunk = ep_index // chunks_size + video_path = root / video_tmpl.format(episode_chunk=ep_chunk, video_key=video_key, episode_index=ep_index) + if not video_path.is_file(): + logger.warning("Episode %d: video file not found at %s; skipping.", ep_index, video_path) + return False + + last_indices = [last_idx for _, last_idx, _ in runs] + frames = _extract_frames_at_indices(video_path, last_indices, target_size) + missing = [i for i in last_indices if i not in frames] + if missing: + logger.warning( + "Episode %d: could not extract frames at indices %s; those segments default to mistake=0.", + ep_index, + missing, + ) + + tasks = ep_info.get("tasks", []) + task_description = tasks[0] if tasks else "robot manipulation task" + + mistakes = [0] * len(responses) + n_failures = 0 + for start_idx, last_idx, subtask in runs: + frame = frames.get(last_idx) + if frame is None: + continue + success = _query_subtask_success(client, model, 
task_description, subtask, frame) + if not success: + n_failures += 1 + for j in range(start_idx, last_idx + 1): + mistakes[j] = 1 + + mistake_col = pa.array(mistakes, type=pa.int64()) + table = table.append_column("mistake", mistake_col) + + tmp = parquet_path.with_suffix(".parquet.tmp") + try: + pq.write_table(table, tmp) + os.replace(tmp, parquet_path) + except Exception: + if tmp.exists(): + tmp.unlink() + raise + + logger.debug( + "Episode %d: %d/%d subtask segments flagged as failures (%d/%d rows mistake=1).", + ep_index, + n_failures, + len(runs), + sum(mistakes), + len(mistakes), + ) + return True + + +# --------------------------------------------------------------------------- +# Per-dataset processing +# --------------------------------------------------------------------------- + + +def _process_dataset( + client: anthropic.Anthropic | genai.Client, + root: Path, + ds_cfg: dict, + model: str, + target_size: int, + max_episodes: int | None, +) -> None: + root = root.resolve() + if not root.is_dir(): + logger.warning("Dataset root does not exist: %s — skipping.", root) + return + + info = load_info(root) + codebase_version = info.get("codebase_version") + if codebase_version != "v2.1": + logger.warning( + "Dataset %s has codebase_version=%r; this script has only been tested against v2.1. 
" + "For Hub datasets upgraded past v2.1, pin to the v2.1 tag via the 'revision' field.", + root.name, + codebase_version, + ) + episodes = load_episodes(root) + if not episodes: + logger.warning("No episodes in %s — skipping.", root) + return + + video_key = _resolve_camera0_video_key(info, ds_cfg) + if video_key is None: + logger.warning("Dataset %s: no usable camera0 video feature; skipping.", root.name) + return + + if "mistake" not in info.get("features", {}): + info.setdefault("features", {})["mistake"] = { + "dtype": "int64", + "shape": (1,), + "names": None, + } + write_info(info, root) + logger.info("Added 'mistake' feature to %s/meta/info.json", root.name) + + ep_indices = sorted(episodes.keys()) + if max_episodes is not None: + ep_indices = ep_indices[:max_episodes] + + n_annotated = 0 + n_skipped = 0 + for ep_index in tqdm(ep_indices, desc=root.name, unit="ep"): + ep_info = episodes[ep_index] + try: + processed = _annotate_episode( + client=client, + model=model, + root=root, + info=info, + video_key=video_key, + ep_index=ep_index, + ep_info=ep_info, + target_size=target_size, + ) + except Exception: + logger.exception("Episode %d: annotation failed.", ep_index) + n_skipped += 1 + continue + + if processed: + n_annotated += 1 + else: + n_skipped += 1 + + logger.info("%s: annotated %d episodes, skipped %d.", root.name, n_annotated, n_skipped) + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: + p = argparse.ArgumentParser( + description=( + "Annotate every episode in a dataset mixture with a per-frame 'mistake' " + "column using a VLM. Subtask boundaries are read from the per-frame " + "'response' column written by annotate_subtasks.py; the last frame of " + "each contiguous run is sent to the VLM to judge subtask success." 
+ ), + epilog=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + p.add_argument( + "--config-path", + required=True, + type=Path, + help="Path to the dataset mixture config JSON.", + ) + p.add_argument( + "--target-size", + type=int, + default=448, + help="Downsample each frame whose shorter side exceeds this many pixels so its " + "shorter side equals target_size, then center-crop to a target_size × target_size " + "square before encoding as JPEG. Frames already at or below target_size pass " + "through unchanged — this only downsamples, it never upsamples (default: 448).", + ) + p.add_argument( + "--model", + default="gemini-robotics-er-1.6-preview", + help=( + "Model ID to use. Defaults to 'gemini-robotics-er-1.6-preview' (Google), " + "matching annotate_subtasks.py's Gemini support. Anthropic models " + "(e.g. 'claude-opus-4-7') route through ANTHROPIC_API_KEY; model IDs " + "starting with 'gemini' or 'robotics-er' route through GEMINI_API_KEY " + "via google-genai." + ), + ) + p.add_argument( + "--max-episodes-per-dataset", + type=int, + default=None, + help="Process at most this many episodes per dataset (useful for dry runs).", + ) + p.add_argument( + "--max-api-retries", + type=int, + default=8, + help=( + "Number of automatic retries for the Anthropic SDK on 429/5xx responses; " + "the SDK applies exponential backoff between attempts (default: 8). " + "Ignored when using a Gemini model — google-genai's retry policy is " + "configured via its own client options." + ), + ) + p.add_argument( + "--hub-cache-dir", + type=Path, + default=Path.home() / ".cache" / "huggingface" / "opentau_subtasks", + help=( + "Directory for caching Hub dataset downloads (default: ~/.cache/huggingface/opentau_subtasks)." 
+ ), + ) + return p.parse_args(argv) + + +def main(argv: list[str] | None = None) -> None: + logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s") + + args = _parse_args(argv) + + client: anthropic.Anthropic | genai.Client + if _is_gemini_model(args.model): + api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY") + if not api_key: + logger.error("GEMINI_API_KEY (or GOOGLE_API_KEY) is not set in the environment.") + sys.exit(1) + client = genai.Client(api_key=api_key) + else: + api_key = os.environ.get("ANTHROPIC_API_KEY") + if not api_key: + logger.error("ANTHROPIC_API_KEY is not set in the environment.") + sys.exit(1) + client = anthropic.Anthropic(api_key=api_key, max_retries=args.max_api_retries) + + datasets = _load_datasets_from_config(args.config_path) + if not datasets: + logger.error("No datasets found in %s.", args.config_path) + sys.exit(1) + + for ds_cfg in datasets: + label = ds_cfg.get("repo_id") or ds_cfg.get("root", "") + try: + root = _resolve_root(ds_cfg, args.hub_cache_dir) + except Exception: + logger.exception("Could not resolve root for dataset '%s'; skipping.", label) + continue + + logger.info("Processing dataset: %s at %s", label, root) + _process_dataset( + client=client, + root=root, + ds_cfg=ds_cfg, + model=args.model, + target_size=args.target_size, + max_episodes=args.max_episodes_per_dataset, + ) + + logger.info("Done.") + + +if __name__ == "__main__": + main() From db60953549f07f0d139526857ce92cb216ac5029 Mon Sep 17 00:00:00 2001 From: William Yue Date: Wed, 6 May 2026 16:00:38 -0700 Subject: [PATCH 2/7] docs(datasets): document annotate_mistakes.py --- docs/source/tutorials/datasets.rst | 122 +++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/docs/source/tutorials/datasets.rst b/docs/source/tutorials/datasets.rst index 921e5092..e690e671 100644 --- a/docs/source/tutorials/datasets.rst +++ b/docs/source/tutorials/datasets.rst @@ -261,6 +261,128 @@ At 1 
fps sampling with frames resized to 640 px wide (≈ 410 image tokens each) Costs scale linearly with episode count × episode duration. Use ``--sample-fps 0.5`` or lower to halve/quarter costs on longer episodes. +Automatically annotating mistakes with a VLM +--------------------------------------------- + +``annotate_mistakes.py`` adds a per-frame ``mistake`` column (``int64`` ∈ ``{0, 1}``) to +every episode parquet in a dataset mixture, by asking a VLM whether each subtask was +completed successfully. It runs **after** ``annotate_subtasks.py`` and reuses the same +mixture config format. + +How it works +^^^^^^^^^^^^ + +For each episode the script: + +1. Reads the per-frame ``response`` column from the episode parquet (written by + ``annotate_subtasks.py``). Every contiguous run of identical ``response`` values is + treated as one subtask segment. +2. Decodes the ``camera0`` video once (resolved with the same lookup chain as + ``annotate_subtasks.py``: inline ``data_features_name_mapping``, then + ``DATA_FEATURES_NAME_MAPPING``, then the first ``dtype=='video'`` feature) and + pulls the **last frame of each contiguous run** — no temporal subsampling, just one + frame per segment. Frames whose shorter side exceeds ``--target-size`` (default 448) + are downsampled and center-cropped before JPEG encoding; smaller frames pass through + unchanged. +3. Sends that single frame plus the segment's subtask string to the configured VLM + (default: ``gemini-robotics-er-1.6-preview``; Anthropic Claude is supported via + ``--model``) and asks for a ``{"success": bool, "reason": str}`` JSON verdict. +4. Sets every parquet row in the segment to ``mistake=1`` if the VLM reports failure, + ``0`` otherwise. Any parse / API failure defaults to ``0`` (no mistake). +5. Atomically rewrites the episode parquet with the new ``mistake`` column and registers + it in ``meta/info.json`` features the first time it is added to a dataset. 
+ +Episodes whose parquet already contains a ``mistake`` column are skipped (cheap O(1) +schema check), making the script **fully resumable**. Episodes whose parquet has no +``response`` column are skipped with a warning — run ``annotate_subtasks.py`` first. + +Prerequisites +^^^^^^^^^^^^^ + +Set the API key for the provider you intend to use: + +.. code-block:: bash + + # Gemini (default) + export GEMINI_API_KEY="..." # or GOOGLE_API_KEY + + # Anthropic (when using --model claude-*) + export ANTHROPIC_API_KEY="sk-ant-..." + +The dataset must already have been processed by ``annotate_subtasks.py`` so that each +episode parquet has a non-empty ``response`` column. + +Running the script +^^^^^^^^^^^^^^^^^^ + +Reuse the same dataset mixture config you passed to ``annotate_subtasks.py``: + +.. code-block:: bash + + python src/opentau/scripts/annotate_mistakes.py \ + --config-path configs/examples/train_mixture_config.json + +For a dry run that processes only 1 episode per dataset: + +.. code-block:: bash + + python src/opentau/scripts/annotate_mistakes.py \ + --config-path configs/examples/train_mixture_config.json \ + --max-episodes-per-dataset 1 + +To annotate with Claude instead of Gemini: + +.. code-block:: bash + + ANTHROPIC_API_KEY=... python src/opentau/scripts/annotate_mistakes.py \ + --config-path configs/examples/train_mixture_config.json \ + --model claude-opus-4-7 + +Full list of flags: + +.. list-table:: + :header-rows: 1 + :widths: 30 15 55 + + * - Flag + - Default + - Description + * - ``--config-path`` + - *(required)* + - Path to dataset mixture config JSON. + * - ``--target-size`` + - ``448`` + - Downsample frames whose shorter side exceeds this many pixels (then center-crop + to a square). Frames at or below this size pass through unchanged — never upsamples. + * - ``--model`` + - ``gemini-robotics-er-1.6-preview`` + - Model ID to use. 
IDs starting with ``gemini`` or ``robotics-er`` go through + ``GEMINI_API_KEY`` (or ``GOOGLE_API_KEY``) via ``google-genai``; Anthropic IDs + (e.g. ``claude-opus-4-7``) go through ``ANTHROPIC_API_KEY``. + * - ``--max-episodes-per-dataset`` + - *(none)* + - Cap the number of episodes processed per dataset — useful for dry runs. + * - ``--max-api-retries`` + - ``8`` + - Anthropic SDK retry count for 429/5xx responses (ignored for Gemini). + * - ``--hub-cache-dir`` + - ``~/.cache/huggingface/opentau_subtasks`` + - Directory for caching Hub dataset downloads. + +Output +^^^^^^ + +For each processed episode the script: + +- Adds a ``mistake`` column to the episode parquet, where every frame row contains + ``0`` (subtask completed successfully, per the VLM) or ``1`` (subtask flagged as a + failure). All frames within the same contiguous ``response`` run share the same value. +- Adds a ``mistake`` feature entry to ``meta/info.json`` + (``{"dtype": "int64", "shape": (1,), "names": None}``). + +To force regeneration of the mistake labels, drop the ``mistake`` column from the +relevant episode parquets (or delete the cached dataset) before rerunning. 
+ Adding subtask responses to a dataset -------------------------------------- From e12aa55ac0d13bf41309900dda29b166a702d3fe Mon Sep 17 00:00:00 2001 From: William Yue Date: Wed, 6 May 2026 16:15:19 -0700 Subject: [PATCH 3/7] docs(configs): add annotate_mistakes example mixture --- configs/examples/annotate_mistakes_example.json | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 configs/examples/annotate_mistakes_example.json diff --git a/configs/examples/annotate_mistakes_example.json b/configs/examples/annotate_mistakes_example.json new file mode 100644 index 00000000..58139b9e --- /dev/null +++ b/configs/examples/annotate_mistakes_example.json @@ -0,0 +1,12 @@ +{ + "dataset_mixture": { + "datasets": [ + { + "repo_id": "RoboCOIN/leju_robot_moving_parts_a", + "data_features_name_mapping": { + "camera0": "observation.images.camera_head_rgb" + } + } + ] + } +} From a278704aaad893819f1270c2b16dc88369b1469c Mon Sep 17 00:00:00 2001 From: William Yue Date: Wed, 6 May 2026 16:29:05 -0700 Subject: [PATCH 4/7] fix(annotate_mistakes): disable Gemini ER thinking to free output budget --- src/opentau/scripts/annotate_mistakes.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/opentau/scripts/annotate_mistakes.py b/src/opentau/scripts/annotate_mistakes.py index 365397dd..210ebe16 100644 --- a/src/opentau/scripts/annotate_mistakes.py +++ b/src/opentau/scripts/annotate_mistakes.py @@ -182,6 +182,11 @@ def _call_gemini_single( subtask: str, frame: Image.Image, ) -> str: + # Gemini Robotics-ER is a thinking model: with default settings it spends + # most of `max_output_tokens` on internal reasoning and truncates before + # emitting any JSON. For a simple per-frame success/failure judgment we + # disable thinking entirely (`thinking_budget=0`) and ask only for the + # final JSON. 
response = client.models.generate_content( model=model, contents=[ @@ -190,8 +195,9 @@ def _call_gemini_single( ], config=genai_types.GenerateContentConfig( system_instruction=_SYSTEM_PROMPT, - max_output_tokens=256, + max_output_tokens=512, response_mime_type="application/json", + thinking_config=genai_types.ThinkingConfig(thinking_budget=0), ), ) raw_text = (response.text or "").strip() From c7eeaf928d797c14592bd69f7477f397640bf20b Mon Sep 17 00:00:00 2001 From: William Yue Date: Wed, 6 May 2026 16:58:07 -0700 Subject: [PATCH 5/7] feat(annotate_mistakes): remove output token caps --- src/opentau/scripts/annotate_mistakes.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/opentau/scripts/annotate_mistakes.py b/src/opentau/scripts/annotate_mistakes.py index 210ebe16..002b0cc2 100644 --- a/src/opentau/scripts/annotate_mistakes.py +++ b/src/opentau/scripts/annotate_mistakes.py @@ -148,9 +148,12 @@ def _call_claude_single( subtask: str, frame: Image.Image, ) -> str: + # max_tokens is required by the Anthropic Messages API; set to the + # claude-opus-4-7 per-request output ceiling so the response is never + # truncated for budget reasons. response = client.messages.create( model=model, - max_tokens=256, + max_tokens=32000, system=_SYSTEM_PROMPT, messages=[ { @@ -183,10 +186,10 @@ def _call_gemini_single( frame: Image.Image, ) -> str: # Gemini Robotics-ER is a thinking model: with default settings it spends - # most of `max_output_tokens` on internal reasoning and truncates before - # emitting any JSON. For a simple per-frame success/failure judgment we - # disable thinking entirely (`thinking_budget=0`) and ask only for the - # final JSON. + # output tokens on internal reasoning and may truncate the JSON. We + # disable thinking entirely (`thinking_budget=0`) for these short + # success/failure judgments and leave the output budget unset so the + # model uses its own default ceiling. 
response = client.models.generate_content( model=model, contents=[ @@ -195,7 +198,6 @@ def _call_gemini_single( ], config=genai_types.GenerateContentConfig( system_instruction=_SYSTEM_PROMPT, - max_output_tokens=512, response_mime_type="application/json", thinking_config=genai_types.ThinkingConfig(thinking_budget=0), ), From 67ea92c7743be6bdccf60687824cf6b90ac5e20b Mon Sep 17 00:00:00 2001 From: William Yue Date: Wed, 6 May 2026 17:03:27 -0700 Subject: [PATCH 6/7] feat(annotate_mistakes): re-enable Gemini ER thinking --- src/opentau/scripts/annotate_mistakes.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/opentau/scripts/annotate_mistakes.py b/src/opentau/scripts/annotate_mistakes.py index 002b0cc2..e3bb3a38 100644 --- a/src/opentau/scripts/annotate_mistakes.py +++ b/src/opentau/scripts/annotate_mistakes.py @@ -185,11 +185,6 @@ def _call_gemini_single( subtask: str, frame: Image.Image, ) -> str: - # Gemini Robotics-ER is a thinking model: with default settings it spends - # output tokens on internal reasoning and may truncate the JSON. We - # disable thinking entirely (`thinking_budget=0`) for these short - # success/failure judgments and leave the output budget unset so the - # model uses its own default ceiling. 
response = client.models.generate_content( model=model, contents=[ @@ -199,7 +194,6 @@ def _call_gemini_single( config=genai_types.GenerateContentConfig( system_instruction=_SYSTEM_PROMPT, response_mime_type="application/json", - thinking_config=genai_types.ThinkingConfig(thinking_budget=0), ), ) raw_text = (response.text or "").strip() From bcfaebb21d0d776d2832885dde19e6eb5060d6f5 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Thu, 7 May 2026 00:20:03 +0000 Subject: [PATCH 7/7] [claude-fix] address review feedback on #280 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - addresses @claude-review (max_tokens): replaced hardcoded 32000 cap on the Anthropic call with a model-agnostic 1024 cap, fitting inside the smallest claude-3.5 ceiling and updated comment accordingly. - addresses @claude-review (bool coercion): _parse_success_response now rejects non-bool 'success' values (stringy "false" / int 1) instead of silently coercing — caller treats parse errors as mistake=0, so failing closed is safer than flipping the verdict. - addresses @claude-review (gemini thinking note): added a one-line comment in _call_gemini_single explaining why the prior thinking_budget=0 workaround was retired (default ceiling fits both reasoning and one-line JSON). - addresses @claude-review (silent API failures): _query_subtask_success now returns (success, ok); _annotate_episode aggregates per-segment API failures and missing-frame counts and surfaces both in the per-dataset summary log so a quiet outage doesn't masquerade as "no mistakes found". - addresses @claude-review (frame-extraction summary): same change above also surfaces n_missing_frames in the per-dataset summary. 
- addresses @claude-review (write_info ordering): deferred registering the 'mistake' feature in info.json until after the first parquet is successfully rewritten, so a mid-dataset crash before any annotation doesn't leave info.json advertising a column that exists nowhere. - addresses @claude-review (unit tests): added tests/scripts/test_annotate_mistakes.py with 20 tests covering _find_response_runs (empty/single/skip-empty/skip-None/alternating) and _parse_success_response (fences, whitespace, bool/string/int/ missing-key/non-object/invalid-json edge cases). - addresses @claude-review (hub-cache-dir docs): expanded the --hub-cache-dir help text and the docs flag table to explain that the default deliberately matches annotate_subtasks.py for cache reuse. - addresses @claude-review (example config + v2.1 pin): pinned "revision": "v2.1" on the example config's repo_id, and rewrote the tutorial / docstring to point at configs/examples/annotate_mistakes_example.json instead of the unrelated train_mixture_config.json. 
tests: passed — pytest -m "not gpu" -n auto tests/scripts/test_annotate_mistakes.py tests/scripts/test_annotate_subtasks.py Co-Authored-By: Claude Opus 4.7 (1M context) --- .../examples/annotate_mistakes_example.json | 1 + docs/source/tutorials/datasets.rst | 15 ++- src/opentau/scripts/annotate_mistakes.py | 122 +++++++++++++----- tests/scripts/test_annotate_mistakes.py | 99 ++++++++++++++ 4 files changed, 200 insertions(+), 37 deletions(-) create mode 100644 tests/scripts/test_annotate_mistakes.py diff --git a/configs/examples/annotate_mistakes_example.json b/configs/examples/annotate_mistakes_example.json index 58139b9e..e0b581d2 100644 --- a/configs/examples/annotate_mistakes_example.json +++ b/configs/examples/annotate_mistakes_example.json @@ -3,6 +3,7 @@ "datasets": [ { "repo_id": "RoboCOIN/leju_robot_moving_parts_a", + "revision": "v2.1", "data_features_name_mapping": { "camera0": "observation.images.camera_head_rgb" } diff --git a/docs/source/tutorials/datasets.rst b/docs/source/tutorials/datasets.rst index e690e671..3b55f341 100644 --- a/docs/source/tutorials/datasets.rst +++ b/docs/source/tutorials/datasets.rst @@ -315,19 +315,22 @@ episode parquet has a non-empty ``response`` column. Running the script ^^^^^^^^^^^^^^^^^^ -Reuse the same dataset mixture config you passed to ``annotate_subtasks.py``: +Reuse the same dataset mixture config you passed to ``annotate_subtasks.py``. +A minimal one-dataset example (with the Hub revision pinned to ``v2.1``, since +this script has only been tested against v2.1 datasets) is checked in at +``configs/examples/annotate_mistakes_example.json``: .. code-block:: bash python src/opentau/scripts/annotate_mistakes.py \ - --config-path configs/examples/train_mixture_config.json + --config-path configs/examples/annotate_mistakes_example.json For a dry run that processes only 1 episode per dataset: .. 
code-block:: bash python src/opentau/scripts/annotate_mistakes.py \ - --config-path configs/examples/train_mixture_config.json \ + --config-path configs/examples/annotate_mistakes_example.json \ --max-episodes-per-dataset 1 To annotate with Claude instead of Gemini: @@ -335,7 +338,7 @@ To annotate with Claude instead of Gemini: .. code-block:: bash ANTHROPIC_API_KEY=... python src/opentau/scripts/annotate_mistakes.py \ - --config-path configs/examples/train_mixture_config.json \ + --config-path configs/examples/annotate_mistakes_example.json \ --model claude-opus-4-7 Full list of flags: @@ -367,7 +370,9 @@ Full list of flags: - Anthropic SDK retry count for 429/5xx responses (ignored for Gemini). * - ``--hub-cache-dir`` - ``~/.cache/huggingface/opentau_subtasks`` - - Directory for caching Hub dataset downloads. + - Directory for caching Hub dataset downloads. The default deliberately matches + ``annotate_subtasks.py`` so this script reuses datasets already downloaded by + the prior step — pass the same value here if you overrode it there. Output ^^^^^^ diff --git a/src/opentau/scripts/annotate_mistakes.py b/src/opentau/scripts/annotate_mistakes.py index e3bb3a38..1a2cc9e8 100644 --- a/src/opentau/scripts/annotate_mistakes.py +++ b/src/opentau/scripts/annotate_mistakes.py @@ -50,12 +50,16 @@ Example:: python src/opentau/scripts/annotate_mistakes.py \\ - --config-path configs/examples/train_mixture_config.json + --config-path configs/examples/annotate_mistakes_example.json # Dry run: 1 episode per dataset python src/opentau/scripts/annotate_mistakes.py \\ - --config-path configs/examples/train_mixture_config.json \\ + --config-path configs/examples/annotate_mistakes_example.json \\ --max-episodes-per-dataset 1 + +A minimal mixture config is checked in at +``configs/examples/annotate_mistakes_example.json`` (Hub ``revision`` pinned +to ``v2.1`` since this script has only been tested against v2.1 datasets). 
""" from __future__ import annotations @@ -125,7 +129,10 @@ def _parse_success_response(text: str) -> bool: """Parse a ``{"success": bool, ...}`` JSON object from the model response. Tolerates ``` ... ``` fences. Raises ``ValueError`` / ``json.JSONDecodeError`` - on any malformed payload — the caller treats those as a mistake=0 default. + on any malformed payload — including a non-bool ``success`` value (e.g. the + string ``"false"``, which would otherwise coerce to ``True``). The caller + treats those as a mistake=0 default, so failing closed here is safer than + flipping the verdict. """ text = text.strip() text = re.sub(r"^```(?:json)?\s*", "", text) @@ -133,7 +140,10 @@ def _parse_success_response(text: str) -> bool: parsed = json.loads(text.strip()) if not isinstance(parsed, dict) or "success" not in parsed: raise ValueError(f"Expected JSON object with 'success' key, got: {parsed!r}") - return bool(parsed["success"]) + success = parsed["success"] + if not isinstance(success, bool): + raise ValueError(f"Expected boolean 'success' value, got {type(success).__name__}: {success!r}") + return success # --------------------------------------------------------------------------- @@ -148,12 +158,14 @@ def _call_claude_single( subtask: str, frame: Image.Image, ) -> str: - # max_tokens is required by the Anthropic Messages API; set to the - # claude-opus-4-7 per-request output ceiling so the response is never - # truncated for budget reasons. + # max_tokens is required by the Anthropic Messages API; the response is a + # one-line ``{"success": bool, "reason": str}`` JSON object, so 1024 is a + # comfortable model-agnostic cap that fits inside the smallest Anthropic + # output ceiling (claude-3.5 family, 8192) without underutilizing larger + # models. 
response = client.messages.create( model=model, - max_tokens=32000, + max_tokens=1024, system=_SYSTEM_PROMPT, messages=[ { @@ -185,6 +197,11 @@ def _call_gemini_single( subtask: str, frame: Image.Image, ) -> str: + # Gemini ER thinking is left at the model default. The earlier + # ``thinking_budget=0`` workaround (introduced as a fix for output + # truncation) was retired once we confirmed empirically that the default + # output budget comfortably fits both internal reasoning and the + # one-line JSON verdict for the success/failure prompt. response = client.models.generate_content( model=model, contents=[ @@ -209,8 +226,15 @@ def _query_subtask_success( task: str, subtask: str, frame: Image.Image, -) -> bool: - """Ask the VLM whether the subtask was completed; default to ``True`` (mistake=0) on any failure.""" +) -> tuple[bool, bool]: + """Ask the VLM whether the subtask was completed. + + Returns ``(success, ok)`` where ``success`` is the verdict (``True`` → + ``mistake=0``, ``False`` → ``mistake=1``) and ``ok`` is ``False`` if the + API call or response parse failed. Failures default to ``success=True`` + (no mistake) but the caller is expected to count ``not ok`` separately so + a quiet API outage doesn't masquerade as "no mistakes found". + """ try: if _is_gemini_model(model): assert isinstance(client, genai.Client) @@ -218,14 +242,14 @@ def _query_subtask_success( else: assert isinstance(client, anthropic.Anthropic) raw = _call_claude_single(client, model, task, subtask, frame) - return _parse_success_response(raw) + return _parse_success_response(raw), True except Exception as exc: logger.warning( "VLM query failed for subtask %r (%s); defaulting to success (mistake=0).", subtask, exc, ) - return True + return True, False # --------------------------------------------------------------------------- @@ -292,26 +316,33 @@ def _annotate_episode( ep_index: int, ep_info: dict, target_size: int, -) -> bool: - """Annotate one episode. 
Returns ``True`` if the parquet was rewritten.""" +) -> tuple[bool, int, int]: + """Annotate one episode. + + Returns ``(processed, n_api_failures, n_missing_frames)``. ``processed`` is + ``True`` iff the parquet was rewritten. ``n_api_failures`` counts segments + where the VLM call/parse failed (defaulted to ``mistake=0``). + ``n_missing_frames`` counts segments whose last frame could not be decoded + from the video (also defaulted to ``mistake=0``). + """ data_tmpl: str = info["data_path"] chunks_size: int = info.get("chunks_size", DEFAULT_CHUNK_SIZE) parquet_path = _get_parquet_path(root, data_tmpl, ep_index, chunks_size) if not parquet_path.is_file(): logger.warning("Episode %d: parquet not found at %s; skipping.", ep_index, parquet_path) - return False + return False, 0, 0 schema_names = pq.read_metadata(parquet_path).schema.names if "mistake" in schema_names: logger.debug("Episode %d: 'mistake' column already present, skipping.", ep_index) - return False + return False, 0, 0 if "response" not in schema_names: logger.warning( "Episode %d: parquet has no 'response' column (run annotate_subtasks.py first); skipping.", ep_index, ) - return False + return False, 0, 0 table = pq.read_table(parquet_path) responses = table.column("response").to_pylist() @@ -321,18 +352,19 @@ def _annotate_episode( "Episode %d: 'response' column has no non-empty subtask labels; skipping.", ep_index, ) - return False + return False, 0, 0 video_tmpl: str = info["video_path"] ep_chunk = ep_index // chunks_size video_path = root / video_tmpl.format(episode_chunk=ep_chunk, video_key=video_key, episode_index=ep_index) if not video_path.is_file(): logger.warning("Episode %d: video file not found at %s; skipping.", ep_index, video_path) - return False + return False, 0, 0 last_indices = [last_idx for _, last_idx, _ in runs] frames = _extract_frames_at_indices(video_path, last_indices, target_size) missing = [i for i in last_indices if i not in frames] + n_missing_frames = len(missing) if 
missing: logger.warning( "Episode %d: could not extract frames at indices %s; those segments default to mistake=0.", @@ -345,11 +377,14 @@ def _annotate_episode( mistakes = [0] * len(responses) n_failures = 0 + n_api_failures = 0 for start_idx, last_idx, subtask in runs: frame = frames.get(last_idx) if frame is None: continue - success = _query_subtask_success(client, model, task_description, subtask, frame) + success, ok = _query_subtask_success(client, model, task_description, subtask, frame) + if not ok: + n_api_failures += 1 if not success: n_failures += 1 for j in range(start_idx, last_idx + 1): @@ -375,7 +410,7 @@ def _annotate_episode( sum(mistakes), len(mistakes), ) - return True + return True, n_api_failures, n_missing_frames # --------------------------------------------------------------------------- @@ -415,14 +450,7 @@ def _process_dataset( logger.warning("Dataset %s: no usable camera0 video feature; skipping.", root.name) return - if "mistake" not in info.get("features", {}): - info.setdefault("features", {})["mistake"] = { - "dtype": "int64", - "shape": (1,), - "names": None, - } - write_info(info, root) - logger.info("Added 'mistake' feature to %s/meta/info.json", root.name) + mistake_feature_registered = "mistake" in info.get("features", {}) ep_indices = sorted(episodes.keys()) if max_episodes is not None: @@ -430,10 +458,12 @@ def _process_dataset( n_annotated = 0 n_skipped = 0 + n_api_failures_total = 0 + n_missing_frames_total = 0 for ep_index in tqdm(ep_indices, desc=root.name, unit="ep"): ep_info = episodes[ep_index] try: - processed = _annotate_episode( + processed, n_api_failures, n_missing_frames = _annotate_episode( client=client, model=model, root=root, @@ -450,10 +480,33 @@ def _process_dataset( if processed: n_annotated += 1 + n_api_failures_total += n_api_failures + n_missing_frames_total += n_missing_frames + # Defer registering the 'mistake' feature in info.json until after + # the first parquet has actually been rewritten — 
otherwise a + # crash before any episode is annotated leaves info.json advertising a + # column that exists in no parquet. + if not mistake_feature_registered: + info.setdefault("features", {})["mistake"] = { + "dtype": "int64", + "shape": (1,), + "names": None, + } + write_info(info, root) + logger.info("Added 'mistake' feature to %s/meta/info.json", root.name) + mistake_feature_registered = True else: n_skipped += 1 - logger.info("%s: annotated %d episodes, skipped %d.", root.name, n_annotated, n_skipped) + logger.info( + "%s: annotated %d episodes, skipped %d, %d VLM call failures, %d missing frames " + "(both default to mistake=0).", + root.name, + n_annotated, + n_skipped, + n_api_failures_total, + n_missing_frames_total, + ) # --------------------------------------------------------------------------- @@ -520,7 +573,12 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: type=Path, default=Path.home() / ".cache" / "huggingface" / "opentau_subtasks", help=( - "Directory for caching Hub dataset downloads (default: ~/.cache/huggingface/opentau_subtasks)." + "Directory for caching Hub dataset downloads (default: " + "~/.cache/huggingface/opentau_subtasks). The default deliberately " + "matches annotate_subtasks.py so this script reuses datasets " + "already downloaded by the prior step rather than re-downloading " + "them — pass the same value here as you used there if you " + "overrode it." ), ) return p.parse_args(argv) diff --git a/tests/scripts/test_annotate_mistakes.py b/tests/scripts/test_annotate_mistakes.py new file mode 100644 index 00000000..25890631 --- /dev/null +++ b/tests/scripts/test_annotate_mistakes.py @@ -0,0 +1,99 @@ +# Copyright 2026 Tensor Auto Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import pytest + +from opentau.scripts.annotate_mistakes import _find_response_runs, _parse_success_response + + +class TestFindResponseRuns: + def test_empty(self): + assert _find_response_runs([]) == [] + + def test_single_run(self): + assert _find_response_runs(["a", "a", "a"]) == [(0, 2, "a")] + + def test_multiple_runs(self): + assert _find_response_runs(["a", "a", "b", "b", "b", "c"]) == [ + (0, 1, "a"), + (2, 4, "b"), + (5, 5, "c"), + ] + + def test_skips_empty_string_run(self): + # Falsy responses (e.g. "") form runs but are not emitted. + assert _find_response_runs(["", "", "a", "a"]) == [(2, 3, "a")] + + def test_skips_none_run(self): + # None responses are also dropped — not emitted as a run. 
+ assert _find_response_runs([None, None, "a"]) == [(2, 2, "a")] + + def test_alternating_with_empties(self): + assert _find_response_runs(["a", "", "a"]) == [(0, 0, "a"), (2, 2, "a")] + + def test_all_empty(self): + assert _find_response_runs(["", "", ""]) == [] + + def test_singleton(self): + assert _find_response_runs(["a"]) == [(0, 0, "a")] + + def test_singleton_empty(self): + assert _find_response_runs([""]) == [] + + +class TestParseSuccessResponse: + def test_plain_true(self): + assert _parse_success_response('{"success": true, "reason": "ok"}') is True + + def test_plain_false(self): + assert _parse_success_response('{"success": false, "reason": "nope"}') is False + + def test_strips_json_fence(self): + text = '```json\n{"success": true, "reason": "ok"}\n```' + assert _parse_success_response(text) is True + + def test_strips_bare_fence(self): + text = '```\n{"success": false, "reason": "x"}\n```' + assert _parse_success_response(text) is False + + def test_strips_surrounding_whitespace(self): + assert _parse_success_response(' \n{"success": true}\n ') is True + + def test_rejects_string_false(self): + # Stringy "false" is truthy in Python — must NOT be coerced silently. + with pytest.raises(ValueError, match="Expected boolean 'success' value"): + _parse_success_response('{"success": "false", "reason": "x"}') + + def test_rejects_string_true(self): + with pytest.raises(ValueError, match="Expected boolean 'success' value"): + _parse_success_response('{"success": "true", "reason": "x"}') + + def test_rejects_int(self): + # bool is a subclass of int in Python — make sure we don't accept ints either. 
+ with pytest.raises(ValueError, match="Expected boolean 'success' value"): + _parse_success_response('{"success": 1, "reason": "x"}') + + def test_rejects_missing_key(self): + with pytest.raises(ValueError, match="Expected JSON object with 'success' key"): + _parse_success_response('{"reason": "x"}') + + def test_rejects_non_object(self): + with pytest.raises(ValueError, match="Expected JSON object with 'success' key"): + _parse_success_response("[true]") + + def test_rejects_invalid_json(self): + with pytest.raises(ValueError): + _parse_success_response("not json at all")