From 1135b668c205292a7ee230a4f526954ed009b027 Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Wed, 13 May 2026 20:40:44 -0700 Subject: [PATCH 01/18] fix(datasets): guard get_safe_version with is_valid_version on SHA revisions --- src/opentau/datasets/lerobot_dataset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/opentau/datasets/lerobot_dataset.py b/src/opentau/datasets/lerobot_dataset.py index 6f3efae6..bf1bb2db 100644 --- a/src/opentau/datasets/lerobot_dataset.py +++ b/src/opentau/datasets/lerobot_dataset.py @@ -1358,7 +1358,8 @@ def __init__( assert all((self.root / fpath).is_file() for fpath in self.get_episodes_file_paths()) self.hf_dataset = self.load_hf_dataset() except (AssertionError, FileNotFoundError, NotADirectoryError): - self.revision = get_safe_version(self.repo_id, self.revision) + if is_valid_version(self.revision): + self.revision = get_safe_version(self.repo_id, self.revision) self.download_episodes(download_videos) self.hf_dataset = self.load_hf_dataset() From ad609f131592b7af1c78be010026991c369035ff Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Wed, 13 May 2026 20:48:44 -0700 Subject: [PATCH 02/18] fix(datasets): fall back to inferred features when info.json cast fails --- src/opentau/datasets/lerobot_dataset.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/opentau/datasets/lerobot_dataset.py b/src/opentau/datasets/lerobot_dataset.py index bf1bb2db..f4d1aa56 100644 --- a/src/opentau/datasets/lerobot_dataset.py +++ b/src/opentau/datasets/lerobot_dataset.py @@ -1614,7 +1614,24 @@ def load_hf_dataset(self) -> datasets.Dataset: pa_dataset = pa_ds.dataset(list(map(str, paths)), format="parquet") filter_expr = pa_ds.field("episode_index").isin(self.episodes) if self.episodes is not None else None table = pa_dataset.to_table(filter=filter_expr) - hf_dataset = Dataset(table, info=DatasetInfo(features=features)) + try: + hf_dataset = Dataset(table, info=DatasetInfo(features=features)) + except TypeError as cast_err: + # info.json's declared feature types can drift from what was actually + # written to parquet (e.g. shape=[1] gets serialized as Value(...) by + # get_hf_features_from_features but the parquet column was emitted + # as list<...>). The strict cast in HF's Dataset constructor then + # raises TypeError("Couldn't cast ..."). Surface a warning and fall + # back to schema-inferred features so the dataset still loads; the + # underlying metadata still needs a curator-side fix. + if "Couldn't cast" not in str(cast_err): + raise + logging.warning( + "Feature cast from meta/info.json failed for %s (%s); falling back to parquet-inferred features.", + self.repo_id, + cast_err, + ) + hf_dataset = Dataset(table) hf_dataset.set_transform(hf_transform_to_torch) return hf_dataset From 0e34b6531d754b318249b80c6730b6bc282e3547 Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Wed, 13 May 2026 21:11:59 -0700 Subject: [PATCH 03/18] fix(datasets): use HF Hub filelock instead of rank-coordinated metadata download --- src/opentau/datasets/lerobot_dataset.py | 55 +++++++++++-------------- 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/src/opentau/datasets/lerobot_dataset.py b/src/opentau/datasets/lerobot_dataset.py index f4d1aa56..60403db2 100644 --- a/src/opentau/datasets/lerobot_dataset.py +++ b/src/opentau/datasets/lerobot_dataset.py @@ -363,17 +363,18 @@ def __init__( if is_valid_version(self.revision): self.revision = get_safe_version(self.repo_id, self.revision) - # In distributed training, only rank 0 downloads to avoid race conditions - # where other ranks read metadata before the download has finished. - acc = get_proc_accelerator() - if acc is not None and acc.num_processes > 1: - if acc.is_main_process: - (self.root / "meta").mkdir(exist_ok=True, parents=True) - self.pull_from_repo(allow_patterns="meta/") - acc.wait_for_everyone() - else: - (self.root / "meta").mkdir(exist_ok=True, parents=True) - self.pull_from_repo(allow_patterns="meta/") + # All ranks attempt the metadata download. snapshot_download uses a + # per-repo filelock under the hood, so concurrent callers from + # sibling ranks don't double-fetch — one winner does the download, + # the rest block on the lock and then resolve as cache hits. + # The previous rank-0-only + accelerator.wait_for_everyone() + # pattern was racy in practice: under DeepSpeed launch the barrier + # did not reliably gate non-zero ranks until rank 0 finished + # pull_from_repo, so downstream ranks would reach load_metadata + # before meta/info.json existed on disk and crash with + # FileNotFoundError. + (self.root / "meta").mkdir(exist_ok=True, parents=True) + self.pull_from_repo(allow_patterns="meta/") self.load_metadata() def load_metadata(self) -> None: @@ -1305,27 +1306,17 @@ def __init__( src_root = HF_OPENTAU_HOME / src_repo src_info_path = src_root / INFO_PATH if not src_info_path.is_file(): - acc = get_proc_accelerator() - if acc is not None and acc.num_processes > 1: - if acc.is_main_process: - src_info_path.parent.mkdir(exist_ok=True, parents=True) - hf_hub_download( - repo_id=src_repo, - filename=INFO_PATH, - repo_type="dataset", - revision=src_revision, - local_dir=src_root, - ) - acc.wait_for_everyone() - else: - src_info_path.parent.mkdir(exist_ok=True, parents=True) - hf_hub_download( - repo_id=src_repo, - filename=INFO_PATH, - repo_type="dataset", - revision=src_revision, - local_dir=src_root, - ) + # All ranks fetch concurrently — hf_hub_download uses a per-file + # filelock, so sibling ranks coalesce on the same download. + # See LeRobotDatasetMetadata.__init__ for the same fix pattern. + src_info_path.parent.mkdir(exist_ok=True, parents=True) + hf_hub_download( + repo_id=src_repo, + filename=INFO_PATH, + repo_type="dataset", + revision=src_revision, + local_dir=src_root, + ) src_info = json.loads(src_info_path.read_text()) self._overlay = { "source_repo": src_repo, From 40f2a34dd79798b278d452fa638cfe990eaca0f2 Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Wed, 13 May 2026 21:38:22 -0700 Subject: [PATCH 04/18] fix(datasets): always barrier in load_or_compute_speed_percentiles --- src/opentau/datasets/speed_percentiles.py | 80 +++++++++++++---------- 1 file changed, 44 insertions(+), 36 deletions(-) diff --git a/src/opentau/datasets/speed_percentiles.py b/src/opentau/datasets/speed_percentiles.py index d95ec429..160e49f8 100644 --- a/src/opentau/datasets/speed_percentiles.py +++ b/src/opentau/datasets/speed_percentiles.py @@ -271,39 +271,47 @@ def load_or_compute_speed_percentiles( distributed = acc is not None and acc.num_processes > 1 is_main_or_solo = (not distributed) or acc.is_main_process - if path.is_file(): - # `episode_to_task_index` already drops episodes with empty tasks, - # so its length is the per-task episode count we want to compare - # against the on-disk sum. - return _read_persisted(path, len(episode_to_task_index), warn=is_main_or_solo) - - by_task = _group_lengths_by_task(episode_lengths, episode_to_task_index) - index_to_task = {idx: task for task, idx in task_to_task_index.items()} - percentiles = compute_task_percentiles(by_task) - rows = [ - { - "task_index": task_idx, - "task": index_to_task.get(task_idx, ""), - "n_episodes": len(by_task.get(task_idx, [])), - "percentiles": percentiles[task_idx], - } - for task_idx in sorted(percentiles) - ] - - if is_main_or_solo: - try: - _atomic_write_jsonlines(rows, path) - except (OSError, PermissionError) as e: - root_key = str(root) - if root_key not in _READONLY_WARNED: - _READONLY_WARNED.add(root_key) - logging.warning( - "Could not write speed percentiles to %s (%s); using in-memory " - "values for this run. The compute will repeat on every load until " - "the file can be written.", - path, - e, - ) - if distributed: - acc.wait_for_everyone() - return percentiles + # NB: the barrier at the end of this function must run on every code path, + # not just the compute path. Otherwise a rank that arrives *after* rank 0 + # has finished writing the file takes the early-return branch (file now + # exists), skips the barrier, and silently desyncs the collective counter + # for every subsequent collective in the mixture-load loop — manifesting + # as a NCCL hang at a much later (and entirely unrelated) sync point. + try: + if path.is_file(): + # `episode_to_task_index` already drops episodes with empty tasks, + # so its length is the per-task episode count we want to compare + # against the on-disk sum. + return _read_persisted(path, len(episode_to_task_index), warn=is_main_or_solo) + + by_task = _group_lengths_by_task(episode_lengths, episode_to_task_index) + index_to_task = {idx: task for task, idx in task_to_task_index.items()} + percentiles = compute_task_percentiles(by_task) + rows = [ + { + "task_index": task_idx, + "task": index_to_task.get(task_idx, ""), + "n_episodes": len(by_task.get(task_idx, [])), + "percentiles": percentiles[task_idx], + } + for task_idx in sorted(percentiles) + ] + + if is_main_or_solo: + try: + _atomic_write_jsonlines(rows, path) + except (OSError, PermissionError) as e: + root_key = str(root) + if root_key not in _READONLY_WARNED: + _READONLY_WARNED.add(root_key) + logging.warning( + "Could not write speed percentiles to %s (%s); using in-memory " + "values for this run. The compute will repeat on every load until " + "the file can be written.", + path, + e, + ) + return percentiles + finally: + if distributed: + acc.wait_for_everyone() From 25d50cc1afa7ada53a8a8ecd03d6101e8ea47203 Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Wed, 13 May 2026 21:44:17 -0700 Subject: [PATCH 05/18] build(deps): add py-spy to dev extras for deadlock debugging --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 058eafe2..a4198018 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -107,6 +107,7 @@ opentau-dataset-viz = "opentau.scripts.launch:visualize" [project.optional-dependencies] dev = ["pre-commit>=3.7.0", "debugpy>=1.8.1", + "py-spy>=0.3.14", "pytest>=8.1.0", "pytest-cov>=5.0.0", "pyserial>=3.5", From 808bd4d131a2e5fc970fd8d42db433a20bb7e04c Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Wed, 13 May 2026 21:46:07 -0700 Subject: [PATCH 06/18] Revert "build(deps): add py-spy to dev extras for deadlock debugging" This reverts commit 25d50cc1afa7ada53a8a8ecd03d6101e8ea47203. --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index a4198018..058eafe2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -107,7 +107,6 @@ opentau-dataset-viz = "opentau.scripts.launch:visualize" [project.optional-dependencies] dev = ["pre-commit>=3.7.0", "debugpy>=1.8.1", - "py-spy>=0.3.14", "pytest>=8.1.0", "pytest-cov>=5.0.0", "pyserial>=3.5", From db438c4bab24cbd8553afa9882ff937c5963e94c Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Wed, 13 May 2026 21:54:55 -0700 Subject: [PATCH 07/18] Revert "fix(datasets): use HF Hub filelock instead of rank-coordinated metadata download" This reverts commit 0e34b6531d754b318249b80c6730b6bc282e3547. --- src/opentau/datasets/lerobot_dataset.py | 55 ++++++++++++++----------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/src/opentau/datasets/lerobot_dataset.py b/src/opentau/datasets/lerobot_dataset.py index 60403db2..f4d1aa56 100644 --- a/src/opentau/datasets/lerobot_dataset.py +++ b/src/opentau/datasets/lerobot_dataset.py @@ -363,18 +363,17 @@ def __init__( if is_valid_version(self.revision): self.revision = get_safe_version(self.repo_id, self.revision) - # All ranks attempt the metadata download. snapshot_download uses a - # per-repo filelock under the hood, so concurrent callers from - # sibling ranks don't double-fetch — one winner does the download, - # the rest block on the lock and then resolve as cache hits. - # The previous rank-0-only + accelerator.wait_for_everyone() - # pattern was racy in practice: under DeepSpeed launch the barrier - # did not reliably gate non-zero ranks until rank 0 finished - # pull_from_repo, so downstream ranks would reach load_metadata - # before meta/info.json existed on disk and crash with - # FileNotFoundError. - (self.root / "meta").mkdir(exist_ok=True, parents=True) - self.pull_from_repo(allow_patterns="meta/") + # In distributed training, only rank 0 downloads to avoid race conditions + # where other ranks read metadata before the download has finished. + acc = get_proc_accelerator() + if acc is not None and acc.num_processes > 1: + if acc.is_main_process: + (self.root / "meta").mkdir(exist_ok=True, parents=True) + self.pull_from_repo(allow_patterns="meta/") + acc.wait_for_everyone() + else: + (self.root / "meta").mkdir(exist_ok=True, parents=True) + self.pull_from_repo(allow_patterns="meta/") self.load_metadata() def load_metadata(self) -> None: @@ -1306,17 +1305,27 @@ def __init__( src_root = HF_OPENTAU_HOME / src_repo src_info_path = src_root / INFO_PATH if not src_info_path.is_file(): - # All ranks fetch concurrently — hf_hub_download uses a per-file - # filelock, so sibling ranks coalesce on the same download. - # See LeRobotDatasetMetadata.__init__ for the same fix pattern. - src_info_path.parent.mkdir(exist_ok=True, parents=True) - hf_hub_download( - repo_id=src_repo, - filename=INFO_PATH, - repo_type="dataset", - revision=src_revision, - local_dir=src_root, - ) + acc = get_proc_accelerator() + if acc is not None and acc.num_processes > 1: + if acc.is_main_process: + src_info_path.parent.mkdir(exist_ok=True, parents=True) + hf_hub_download( + repo_id=src_repo, + filename=INFO_PATH, + repo_type="dataset", + revision=src_revision, + local_dir=src_root, + ) + acc.wait_for_everyone() + else: + src_info_path.parent.mkdir(exist_ok=True, parents=True) + hf_hub_download( + repo_id=src_repo, + filename=INFO_PATH, + repo_type="dataset", + revision=src_revision, + local_dir=src_root, + ) src_info = json.loads(src_info_path.read_text()) self._overlay = { "source_repo": src_repo, From 65dfad772a22cba28f9ffbb7024a4ce639a016da Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Wed, 13 May 2026 22:07:01 -0700 Subject: [PATCH 08/18] fix(datasets): skip episodes with unresolvable task labels in speed bucketing --- src/opentau/datasets/speed_percentiles.py | 28 ++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/src/opentau/datasets/speed_percentiles.py b/src/opentau/datasets/speed_percentiles.py index 160e49f8..3e14f629 100644 --- a/src/opentau/datasets/speed_percentiles.py +++ b/src/opentau/datasets/speed_percentiles.py @@ -68,6 +68,11 @@ # ``_CONTROL_MODE_WARNED`` pattern in ``lerobot_dataset.py``. _READONLY_WARNED: set[str] = set() +# Module-level set of unresolved episode task labels we've already warned +# about, so episodes.jsonl / tasks.jsonl drift in a dataset only logs once +# per distinct bad label per process instead of once per episode per rank. +_UNRESOLVED_TASK_WARNED: set[str] = set() + def compute_task_percentiles( episode_lengths_per_task: dict[int, list[int]], @@ -143,13 +148,34 @@ def episode_to_task_index_from_episodes( ``tasks[0]`` — the codebase assumes an N-to-1 episode-to-task relationship even though the field is structurally a list. Episodes with an empty / missing ``tasks`` list are skipped. + + Episodes whose ``tasks[0]`` is absent from ``task_to_task_index`` are + also skipped (with a deduped warning) rather than raising. This guards + against ``episodes.jsonl`` / ``tasks.jsonl`` drift in dataset metadata + — a `tasks.jsonl` with stale/incomplete entries, or an + ``episodes.jsonl`` that stores integer task indices instead of task + strings. Skipped episodes are still trained on; they just fall back to + ``SPARSE_TASK_BUCKET`` in the downstream speed-bucket lookup, which + already tolerates a missing ``episode_to_task_index`` entry. """ out: dict[int, int] = {} for ep_idx, ep_info in episodes.items(): tasks = ep_info.get("tasks") or [] if not tasks: continue - out[ep_idx] = task_to_task_index[tasks[0]] + task_idx = task_to_task_index.get(tasks[0]) + if task_idx is None: + key = str(tasks[0]) + if key not in _UNRESOLVED_TASK_WARNED: + _UNRESOLVED_TASK_WARNED.add(key) + logging.warning( + "Episode task label %r is not present in tasks.jsonl; episode(s) " + "using it fall back to the sparse speed bucket. This indicates " + "episodes.jsonl / tasks.jsonl drift in the dataset metadata.", + tasks[0], + ) + continue + out[ep_idx] = task_idx return out From 59e7d083ff718f6effdddc7981f3bed8e9b0f8ab Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Wed, 13 May 2026 22:18:01 -0700 Subject: [PATCH 09/18] fix(datasets): also fall back on ValueError Keys mismatch, not just TypeError --- src/opentau/datasets/lerobot_dataset.py | 27 ++++++++++++++++--------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/opentau/datasets/lerobot_dataset.py b/src/opentau/datasets/lerobot_dataset.py index f4d1aa56..e5c645f1 100644 --- a/src/opentau/datasets/lerobot_dataset.py +++ b/src/opentau/datasets/lerobot_dataset.py @@ -1616,18 +1616,25 @@ def load_hf_dataset(self) -> datasets.Dataset: table = pa_dataset.to_table(filter=filter_expr) try: hf_dataset = Dataset(table, info=DatasetInfo(features=features)) - except TypeError as cast_err: - # info.json's declared feature types can drift from what was actually - # written to parquet (e.g. shape=[1] gets serialized as Value(...) by - # get_hf_features_from_features but the parquet column was emitted - # as list<...>). The strict cast in HF's Dataset constructor then - # raises TypeError("Couldn't cast ..."). Surface a warning and fall - # back to schema-inferred features so the dataset still loads; the - # underlying metadata still needs a curator-side fix. - if "Couldn't cast" not in str(cast_err): + except (TypeError, ValueError) as cast_err: + # info.json's declared features can drift from what was actually + # written to parquet, in two ways: + # * type drift — e.g. shape=[1] gets serialized as Value(...) by + # get_hf_features_from_features but the parquet column was + # emitted as list<...>; HF's Dataset constructor then raises + # TypeError("Couldn't cast ..."). + # * column-set drift — parquet has a column info.json never + # declared (e.g. gripper_activity_action); HF raises + # ValueError("Keys mismatch ..."). + # Both mean the same thing — stale metadata — and both want the + # same fallback: use parquet-inferred features so the dataset + # still loads. The underlying metadata still needs a curator fix. + msg = str(cast_err) + if "Couldn't cast" not in msg and "Keys mismatch" not in msg: raise logging.warning( - "Feature cast from meta/info.json failed for %s (%s); falling back to parquet-inferred features.", + "Feature reconciliation from meta/info.json failed for %s (%s); " + "falling back to parquet-inferred features.", self.repo_id, cast_err, ) From 747e4cc5c7b6be5091fb050a264e73677b71b737 Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Wed, 13 May 2026 22:30:35 -0700 Subject: [PATCH 10/18] fix(datasets): fall back on any typed-Dataset schema error, truncate log --- src/opentau/datasets/lerobot_dataset.py | 36 ++++++++++++++----------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/src/opentau/datasets/lerobot_dataset.py b/src/opentau/datasets/lerobot_dataset.py index e5c645f1..42f9f3ff 100644 --- a/src/opentau/datasets/lerobot_dataset.py +++ b/src/opentau/datasets/lerobot_dataset.py @@ -1617,26 +1617,30 @@ def load_hf_dataset(self) -> datasets.Dataset: try: hf_dataset = Dataset(table, info=DatasetInfo(features=features)) except (TypeError, ValueError) as cast_err: - # info.json's declared features can drift from what was actually - # written to parquet, in two ways: - # * type drift — e.g. shape=[1] gets serialized as Value(...) by - # get_hf_features_from_features but the parquet column was - # emitted as list<...>; HF's Dataset constructor then raises - # TypeError("Couldn't cast ..."). - # * column-set drift — parquet has a column info.json never - # declared (e.g. gripper_activity_action); HF raises - # ValueError("Keys mismatch ..."). - # Both mean the same thing — stale metadata — and both want the - # same fallback: use parquet-inferred features so the dataset - # still loads. The underlying metadata still needs a curator fix. - msg = str(cast_err) - if "Couldn't cast" not in msg and "Keys mismatch" not in msg: - raise + # info.json's declared features routinely drift from what was + # actually written to parquet. HF's typed Dataset constructor + # surfaces this as several different exceptions depending on the + # kind of drift — observed in this dataset corpus: + # * TypeError("Couldn't cast ...") — type drift + # * ValueError("Keys mismatch ...") — column-set drift + # * ValueError("External features info don't match ...") + # — fixed vs variable + # length list drift + # They all mean the same thing (stale metadata) and all want the + # same fallback: drop the declared features and infer them from the + # parquet so the dataset still loads. `features` is built by our + # own get_hf_features_from_features from a valid pa.Table, so a + # TypeError/ValueError from this constructor is always a metadata/ + # parquet reconciliation failure, never a programming error — hence + # catching the exception types rather than grepping their messages + # (the message text varies and embeds huge schema dumps). The + # underlying metadata still needs a curator-side fix. + reason = str(cast_err).splitlines()[0][:200] logging.warning( "Feature reconciliation from meta/info.json failed for %s (%s); " "falling back to parquet-inferred features.", self.repo_id, - cast_err, + reason, ) hf_dataset = Dataset(table) hf_dataset.set_transform(hf_transform_to_torch) From c9810cc0047f26b5ecb3687f23a650851e599ccc Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Wed, 13 May 2026 23:01:48 -0700 Subject: [PATCH 11/18] fix(datasets): stream parquet to mmap'd Arrow file instead of full in-RAM load --- src/opentau/datasets/lerobot_dataset.py | 108 ++++++++++++++++++------ 1 file changed, 82 insertions(+), 26 deletions(-) diff --git a/src/opentau/datasets/lerobot_dataset.py b/src/opentau/datasets/lerobot_dataset.py index 42f9f3ff..9b2e5b5d 100644 --- a/src/opentau/datasets/lerobot_dataset.py +++ b/src/opentau/datasets/lerobot_dataset.py @@ -84,12 +84,15 @@ import contextlib import copy import functools +import hashlib import json import logging import math +import os import re import shutil import traceback +import uuid from abc import abstractmethod from pathlib import Path from typing import Callable @@ -98,12 +101,14 @@ import numpy as np import packaging.version import PIL.Image +import pyarrow as pa import pyarrow.dataset as pa_ds import torch import torch.nn.functional as F # noqa: N812 import torch.utils from datasets import Dataset, DatasetInfo, concatenate_datasets from einops import rearrange +from filelock import FileLock from huggingface_hub import HfApi, hf_hub_download, snapshot_download from huggingface_hub.constants import REPOCARD_NAME from huggingface_hub.errors import RevisionNotFoundError @@ -1589,33 +1594,35 @@ def load_hf_dataset(self) -> datasets.Dataset: paths = sorted(self.root.glob(glob_pattern)) if not paths: raise FileNotFoundError(f"No parquet files matching {glob_pattern!r} under {self.root}") - features = get_hf_features_from_features(self.meta.features) - # Read parquet directly via pyarrow.dataset and wrap the resulting - # pa.Table in a HF Dataset. Going through `load_dataset("parquet", ...)` - # or `Dataset.from_parquet(...)` both route through ParquetDatasetBuilder, - # which rewrites the parquet bytes into an uncompressed Arrow cache at - # $HF_HOME/datasets/parquet/ — 1-5x the source size (compression-dependent) - # and one cache entry per distinct (paths, filter) combo. Issue #277 has - # the empirical numbers; verified on physical-intelligence/libero. - # - # Trade-off: `to_table(filter=...)` materializes the filtered rows into - # RAM rather than mmapping a disk-backed Arrow cache. RAM cost scales - # with `len(filtered rows) × avg-row-size`; concretely: - # ~350 MB for physical-intelligence/libero with episodes=[0..9], - # ~46 GB for humanoid-everyday-A-overlay with episodes=None (full corpus). - # Narrow `episodes=` picks are fine; an episodes=None load on a multi-GB - # image-heavy repo will OOM on a small dev box — pass a manageable - # subset, or restore a mmap'd Arrow cache via tmp pa.ipc files if RAM - # ever becomes the binding constraint. - # - # The `Dataset(table, info=DatasetInfo(features=features))` constructor - # signature has been stable since datasets 2.x; the project pin is - # `datasets>=2.19.0`, so we're well inside the supported window. pa_dataset = pa_ds.dataset(list(map(str, paths)), format="parquet") filter_expr = pa_ds.field("episode_index").isin(self.episodes) if self.episodes is not None else None + + # Image-dtype columns hold raw image bytes inline in the parquet and + # need the HF `Image()` feature to decode on access. The mmap path + # loads via `Dataset.from_file`, which infers features from the Arrow + # schema and so cannot reconstruct `Image()`; route those datasets (all + # small in this corpus) through the in-RAM path instead. Everything + # else — including the multi-hundred-GB video repos — goes through the + # memory-mapped path so the mixture load doesn't OOM the node. + has_image_feature = any(ft.get("dtype") == "image" for ft in self.meta.features.values()) + if has_image_feature: + hf_dataset = self._load_hf_dataset_in_ram(pa_dataset, filter_expr) + else: + hf_dataset = self._load_hf_dataset_mmap(pa_dataset, filter_expr) + hf_dataset.set_transform(hf_transform_to_torch) + return hf_dataset + + def _load_hf_dataset_in_ram(self, pa_dataset: pa_ds.Dataset, filter_expr) -> datasets.Dataset: + """Materialize the filtered parquet fully in RAM and wrap it in a Dataset. + + Used only for datasets with `image`-dtype columns, which need the HF + `Image()` feature applied via the typed `Dataset(...)` constructor. + Such datasets are small in this corpus, so the in-RAM cost is bounded. + """ + features = get_hf_features_from_features(self.meta.features) table = pa_dataset.to_table(filter=filter_expr) try: - hf_dataset = Dataset(table, info=DatasetInfo(features=features)) + return Dataset(table, info=DatasetInfo(features=features)) except (TypeError, ValueError) as cast_err: # info.json's declared features routinely drift from what was # actually written to parquet. HF's typed Dataset constructor @@ -1642,9 +1649,58 @@ def load_hf_dataset(self) -> datasets.Dataset: self.repo_id, reason, ) - hf_dataset = Dataset(table) - hf_dataset.set_transform(hf_transform_to_torch) - return hf_dataset + return Dataset(table) + + def _load_hf_dataset_mmap(self, pa_dataset: pa_ds.Dataset, filter_expr) -> datasets.Dataset: + """Stream the filtered parquet into a memory-mapped Arrow IPC file. + + `to_table()` materializes every filtered row in RAM. With 8 ranks each + loading the full mixture, the multi-hundred-GB video repos + (humanoid-everyday-*) blow past node memory and OOM. Streaming the + scanner batch-by-batch into an Arrow IPC file keeps only one batch + resident during the write, and `Dataset.from_file` then mmaps the + result — resident memory is bounded by the OS page cache, not the + dataset size, and the mapping is shared across all ranks on the node. + + The Arrow file is cached under the dataset root keyed by + (revision, episode selection): reruns skip the conversion entirely and + the key invalidates when either input changes. A per-key FileLock + serialises the write across ranks sharing the node-local cache dir — + exactly one rank runs the conversion, the rest block then mmap the + result. (Going through `load_dataset`/`Dataset.from_parquet` instead + would route through ParquetDatasetBuilder, which rewrites an + uncompressed Arrow cache at $HF_HOME at 1-5x source size — see #277.) + """ + cache_dir = self.root / ".arrow_cache" + episodes_key = "all" if self.episodes is None else ",".join(map(str, sorted(self.episodes))) + cache_key = hashlib.sha256(f"{self.revision}\0{episodes_key}".encode()).hexdigest()[:16] + arrow_path = cache_dir / f"{cache_key}.arrow" + lock_path = cache_dir / f"{cache_key}.lock" + cache_dir.mkdir(parents=True, exist_ok=True) + + # FileLock releases on process exit (fcntl), so a crashed writer can't + # deadlock the others; the loser of the race just finds the file ready. + with FileLock(str(lock_path)): + if not arrow_path.is_file(): + tmp_path = cache_dir / f"{cache_key}.{uuid.uuid4().hex}.tmp" + try: + scanner = pa_dataset.scanner(filter=filter_expr) + with pa.OSFile(str(tmp_path), "wb") as sink: + writer = None + for batch in scanner.to_batches(): + if writer is None: + writer = pa.ipc.new_file(sink, batch.schema) + writer.write_batch(batch) + if writer is None: + # No rows matched the filter — still emit a valid + # (empty) Arrow file so the mmap load below works. + writer = pa.ipc.new_file(sink, pa_dataset.schema) + writer.close() + os.replace(tmp_path, arrow_path) # atomic publish + finally: + Path(tmp_path).unlink(missing_ok=True) + + return Dataset.from_file(str(arrow_path)) def create_hf_dataset(self) -> datasets.Dataset: """Create an empty HuggingFace dataset with the correct features. From 0d127446362995a00bf3d517552e2e94f77034bc Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Wed, 13 May 2026 23:11:51 -0700 Subject: [PATCH 12/18] fix(datasets): write Arrow IPC stream format for Dataset.from_file mmap --- src/opentau/datasets/lerobot_dataset.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/opentau/datasets/lerobot_dataset.py b/src/opentau/datasets/lerobot_dataset.py index 9b2e5b5d..2699f88b 100644 --- a/src/opentau/datasets/lerobot_dataset.py +++ b/src/opentau/datasets/lerobot_dataset.py @@ -1685,16 +1685,21 @@ def _load_hf_dataset_mmap(self, pa_dataset: pa_ds.Dataset, filter_expr) -> datas tmp_path = cache_dir / f"{cache_key}.{uuid.uuid4().hex}.tmp" try: scanner = pa_dataset.scanner(filter=filter_expr) + # Arrow IPC *stream* format (new_stream), not file format: + # HF's Dataset.from_file memory-maps via pa.ipc.open_stream, + # which only reads the stream format. Writing the file + # format here makes from_file misparse the footer as a + # stream message ("Expected to read N metadata bytes ..."). with pa.OSFile(str(tmp_path), "wb") as sink: writer = None for batch in scanner.to_batches(): if writer is None: - writer = pa.ipc.new_file(sink, batch.schema) + writer = pa.ipc.new_stream(sink, batch.schema) writer.write_batch(batch) if writer is None: # No rows matched the filter — still emit a valid - # (empty) Arrow file so the mmap load below works. - writer = pa.ipc.new_file(sink, pa_dataset.schema) + # (empty) Arrow stream so the mmap load below works. + writer = pa.ipc.new_stream(sink, pa_dataset.schema) writer.close() os.replace(tmp_path, arrow_path) # atomic publish finally: From 4bf9277a0c77c393535476d1157aeb2ff26dbd70 Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Thu, 14 May 2026 00:14:52 -0700 Subject: [PATCH 13/18] fix(datasets): use load_dataset(parquet) for genuine mmap, drop hand-rolled paths --- src/opentau/datasets/lerobot_dataset.py | 148 +++++------------------- 1 file changed, 27 insertions(+), 121 deletions(-) diff --git a/src/opentau/datasets/lerobot_dataset.py b/src/opentau/datasets/lerobot_dataset.py index 2699f88b..fb4df58f 100644 --- a/src/opentau/datasets/lerobot_dataset.py +++ b/src/opentau/datasets/lerobot_dataset.py @@ -84,15 +84,12 @@ import contextlib import copy import functools -import hashlib import json import logging import math -import os import re import shutil import traceback -import uuid from abc import abstractmethod from pathlib import Path from typing import Callable @@ -101,14 +98,11 @@ import numpy as np import packaging.version import PIL.Image -import pyarrow as pa -import pyarrow.dataset as pa_ds import torch import torch.nn.functional as F # noqa: N812 import torch.utils -from datasets import Dataset, DatasetInfo, concatenate_datasets +from datasets import concatenate_datasets, load_dataset from einops import rearrange -from filelock import FileLock from huggingface_hub import HfApi, hf_hub_download, snapshot_download from huggingface_hub.constants import REPOCARD_NAME from huggingface_hub.errors import RevisionNotFoundError @@ -1582,7 +1576,25 @@ def get_episodes_file_paths(self) -> list[str]: return fpaths def load_hf_dataset(self) -> datasets.Dataset: - """hf_dataset contains all the observations, states, actions, rewards, etc.""" + """hf_dataset contains all the observations, states, actions, rewards, etc. + + Loads the per-episode parquet files via `load_dataset("parquet", ...)`, + which builds a memory-mapped Arrow cache under `$HF_HOME/datasets/`. The + cache costs disk (~1-5x the parquet, compression-dependent — see #277) + but the loaded dataset is genuinely memory-mapped (resident pages are + file-backed and reclaimable), so RAM stays bounded by the OS page cache + rather than the dataset size. That is essential here: 8 ranks each load + the full mixture, and the multi-hundred-GB video repos would otherwise + OOM the node. A hand-rolled `pa_ds.to_table()` + `Dataset(table)` (or + streaming to a self-written Arrow IPC file + `Dataset.from_file`) was + tried and both materialised into anonymous RAM instead of mmapping — + `load_dataset` routes through HF's ParquetDatasetBuilder, whose Arrow + cache `Dataset.from_file` *does* memory-map correctly. + + Files are passed in sorted-episode order so the hf_dataset row layout + stays aligned with `episode_data_index["from"/"to"]` — see the + `sorted(episodes)` note in __init__. + """ # Derive the parquet glob from the meta data_path template so that # datasets with a non-default `info["data_path"]` (deeper nesting, # flat layout, etc.) keep working. Default template is @@ -1590,123 +1602,17 @@ def load_hf_dataset(self) -> datasets.Dataset: # which yields the glob "data/chunk-*/episode_*.parquet". Assumes the # template uses simple `{name}` / `{name:fmt}` placeholders and no # literal `{{`/`}}` escapes — true for every in-repo writer. - glob_pattern = re.sub(r"\{[^}]+\}", "*", self.meta.data_path) - paths = sorted(self.root.glob(glob_pattern)) - if not paths: - raise FileNotFoundError(f"No parquet files matching {glob_pattern!r} under {self.root}") - pa_dataset = pa_ds.dataset(list(map(str, paths)), format="parquet") - filter_expr = pa_ds.field("episode_index").isin(self.episodes) if self.episodes is not None else None - - # Image-dtype columns hold raw image bytes inline in the parquet and - # need the HF `Image()` feature to decode on access. The mmap path - # loads via `Dataset.from_file`, which infers features from the Arrow - # schema and so cannot reconstruct `Image()`; route those datasets (all - # small in this corpus) through the in-RAM path instead. Everything - # else — including the multi-hundred-GB video repos — goes through the - # memory-mapped path so the mixture load doesn't OOM the node. - has_image_feature = any(ft.get("dtype") == "image" for ft in self.meta.features.values()) - if has_image_feature: - hf_dataset = self._load_hf_dataset_in_ram(pa_dataset, filter_expr) + if self.episodes is None: + glob_pattern = re.sub(r"\{[^}]+\}", "*", self.meta.data_path) + files = [str(p) for p in sorted(self.root.glob(glob_pattern))] else: - hf_dataset = self._load_hf_dataset_mmap(pa_dataset, filter_expr) + files = [str(self.root / self.meta.get_data_file_path(ep_idx)) for ep_idx in self.episodes] + if not files: + raise FileNotFoundError(f"No parquet files for {self.repo_id} under {self.root}") + hf_dataset = load_dataset("parquet", data_files=files, split="train") hf_dataset.set_transform(hf_transform_to_torch) return hf_dataset - def _load_hf_dataset_in_ram(self, pa_dataset: pa_ds.Dataset, filter_expr) -> datasets.Dataset: - """Materialize the filtered parquet fully in RAM and wrap it in a Dataset. - - Used only for datasets with `image`-dtype columns, which need the HF - `Image()` feature applied via the typed `Dataset(...)` constructor. - Such datasets are small in this corpus, so the in-RAM cost is bounded. - """ - features = get_hf_features_from_features(self.meta.features) - table = pa_dataset.to_table(filter=filter_expr) - try: - return Dataset(table, info=DatasetInfo(features=features)) - except (TypeError, ValueError) as cast_err: - # info.json's declared features routinely drift from what was - # actually written to parquet. HF's typed Dataset constructor - # surfaces this as several different exceptions depending on the - # kind of drift — observed in this dataset corpus: - # * TypeError("Couldn't cast ...") — type drift - # * ValueError("Keys mismatch ...") — column-set drift - # * ValueError("External features info don't match ...") - # — fixed vs variable - # length list drift - # They all mean the same thing (stale metadata) and all want the - # same fallback: drop the declared features and infer them from the - # parquet so the dataset still loads. `features` is built by our - # own get_hf_features_from_features from a valid pa.Table, so a - # TypeError/ValueError from this constructor is always a metadata/ - # parquet reconciliation failure, never a programming error — hence - # catching the exception types rather than grepping their messages - # (the message text varies and embeds huge schema dumps). The - # underlying metadata still needs a curator-side fix. - reason = str(cast_err).splitlines()[0][:200] - logging.warning( - "Feature reconciliation from meta/info.json failed for %s (%s); " - "falling back to parquet-inferred features.", - self.repo_id, - reason, - ) - return Dataset(table) - - def _load_hf_dataset_mmap(self, pa_dataset: pa_ds.Dataset, filter_expr) -> datasets.Dataset: - """Stream the filtered parquet into a memory-mapped Arrow IPC file. - - `to_table()` materializes every filtered row in RAM. With 8 ranks each - loading the full mixture, the multi-hundred-GB video repos - (humanoid-everyday-*) blow past node memory and OOM. Streaming the - scanner batch-by-batch into an Arrow IPC file keeps only one batch - resident during the write, and `Dataset.from_file` then mmaps the - result — resident memory is bounded by the OS page cache, not the - dataset size, and the mapping is shared across all ranks on the node. - - The Arrow file is cached under the dataset root keyed by - (revision, episode selection): reruns skip the conversion entirely and - the key invalidates when either input changes. A per-key FileLock - serialises the write across ranks sharing the node-local cache dir — - exactly one rank runs the conversion, the rest block then mmap the - result. (Going through `load_dataset`/`Dataset.from_parquet` instead - would route through ParquetDatasetBuilder, which rewrites an - uncompressed Arrow cache at $HF_HOME at 1-5x source size — see #277.) - """ - cache_dir = self.root / ".arrow_cache" - episodes_key = "all" if self.episodes is None else ",".join(map(str, sorted(self.episodes))) - cache_key = hashlib.sha256(f"{self.revision}\0{episodes_key}".encode()).hexdigest()[:16] - arrow_path = cache_dir / f"{cache_key}.arrow" - lock_path = cache_dir / f"{cache_key}.lock" - cache_dir.mkdir(parents=True, exist_ok=True) - - # FileLock releases on process exit (fcntl), so a crashed writer can't - # deadlock the others; the loser of the race just finds the file ready. - with FileLock(str(lock_path)): - if not arrow_path.is_file(): - tmp_path = cache_dir / f"{cache_key}.{uuid.uuid4().hex}.tmp" - try: - scanner = pa_dataset.scanner(filter=filter_expr) - # Arrow IPC *stream* format (new_stream), not file format: - # HF's Dataset.from_file memory-maps via pa.ipc.open_stream, - # which only reads the stream format. Writing the file - # format here makes from_file misparse the footer as a - # stream message ("Expected to read N metadata bytes ..."). - with pa.OSFile(str(tmp_path), "wb") as sink: - writer = None - for batch in scanner.to_batches(): - if writer is None: - writer = pa.ipc.new_stream(sink, batch.schema) - writer.write_batch(batch) - if writer is None: - # No rows matched the filter — still emit a valid - # (empty) Arrow stream so the mmap load below works. - writer = pa.ipc.new_stream(sink, pa_dataset.schema) - writer.close() - os.replace(tmp_path, arrow_path) # atomic publish - finally: - Path(tmp_path).unlink(missing_ok=True) - - return Dataset.from_file(str(arrow_path)) - def create_hf_dataset(self) -> datasets.Dataset: """Create an empty HuggingFace dataset with the correct features. From f5cc6917a7c2c54a4648e6b8c3052c1f9ee8bae3 Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Thu, 14 May 2026 00:54:31 -0700 Subject: [PATCH 14/18] fix(datasets): download episode files directly, skip slow snapshot_download filter --- src/opentau/datasets/lerobot_dataset.py | 53 +++++++++++++++++++++---- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/src/opentau/datasets/lerobot_dataset.py b/src/opentau/datasets/lerobot_dataset.py index fb4df58f..3b0bfcb9 100644 --- a/src/opentau/datasets/lerobot_dataset.py +++ b/src/opentau/datasets/lerobot_dataset.py @@ -91,6 +91,7 @@ import shutil import traceback from abc import abstractmethod +from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import Callable @@ -1537,20 +1538,56 @@ def pull_from_repo( ignore_patterns=ignore_patterns, ) + @on_accelerate_main_proc(local=True, _sync=True) + def download_files(self, files: list[str]) -> None: + """Fetch an explicit list of repo files, one `hf_hub_download` per file. + + `snapshot_download(allow_patterns=)` spends + many minutes GIL-held and I/O-idle inside `filter_repo_objects`, whose + fnmatch loop is O(repo_files x patterns) — long enough to trip the NCCL + watchdog while sibling ranks wait at the `_sync` broadcast. + `hf_hub_download` targets each file by exact path, skipping the filter + entirely; a thread pool overlaps the network round-trips (the GIL is + released during I/O). Files already present in `local_dir` are not + re-downloaded. + """ + if not files: + return + + def _fetch(fpath: str) -> None: + hf_hub_download( + repo_id=self.repo_id, + filename=fpath, + repo_type="dataset", + revision=self.revision, + local_dir=self.root, + ) + + with ThreadPoolExecutor(max_workers=16) as pool: + # list() forces the lazy map so any per-file failure propagates. + list(pool.map(_fetch, files)) + def download_episodes(self, download_videos: bool = True) -> None: """Downloads the dataset from the given 'repo_id' at the provided version. If 'episodes' is given, this will only download those episodes (selected by their episode_index). If 'episodes' is None, the whole - dataset will be downloaded. Thanks to the behavior of snapshot_download, if the files are already present - in 'local_dir', they won't be downloaded again. + dataset will be downloaded. Already-present files in 'local_dir' are not re-downloaded. """ # TODO(rcadene, aliberts): implement faster transfer # https://huggingface.co/docs/huggingface_hub/en/guides/download#faster-downloads - files = None - ignore_patterns = None if download_videos else "videos/" - if self.episodes is not None: - files = self.get_episodes_file_paths() - - self.pull_from_repo(allow_patterns=files, ignore_patterns=ignore_patterns) + if self.episodes is None: + # Whole-dataset download: snapshot_download with no allow_patterns + # has nothing to filter, so there is no filter_repo_objects blowup. + ignore_patterns = None if download_videos else "videos/" + self.pull_from_repo(ignore_patterns=ignore_patterns) + return + # Episode subset: download exactly the needed files directly. Passing + # the explicit per-episode path list to snapshot_download as + # allow_patterns triggers a pathologically slow filter_repo_objects + # scan (see download_files). + files = self.get_episodes_file_paths() + if not download_videos: + files = [f for f in files if not f.startswith("videos/")] + self.download_files(files) def get_episodes_file_paths(self) -> list[str]: """Get file paths for all selected episodes. From 0f6f2495f3d806d08898ed8780dd1e2408d9fef0 Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Thu, 14 May 2026 08:05:53 -0700 Subject: [PATCH 15/18] fix(datasets): route whole-repo pulls to snapshot_download, skip on-disk files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit download_episodes never reached its "episodes is None" branch: __init__ backfills self.episodes with the full episode list before it runs, so whole-repo datasets (no episode subset) wrongly took the per-file download_files path and fired one hf_hub_download per file — thousands of HF API requests, tripping the 3000 req / 5 min rate limit (429). Capture _episodes_were_specified at construction so the branch survives the backfill and whole-repo datasets use snapshot_download (O(1) listing API calls). download_files also called hf_hub_download for every file with no on-disk check; hf_hub_download issues a network metadata request per file even when the file is already in local_dir. Skip files already on disk so a pre-downloaded episode set makes zero requests. --- src/opentau/datasets/lerobot_dataset.py | 36 ++++++++++++++++++++----- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/src/opentau/datasets/lerobot_dataset.py b/src/opentau/datasets/lerobot_dataset.py index 3b0bfcb9..d45733ac 100644 --- a/src/opentau/datasets/lerobot_dataset.py +++ b/src/opentau/datasets/lerobot_dataset.py @@ -1255,6 +1255,13 @@ def __init__( # episode_data_index["from"/"to"] (built in self.episodes order in # get_episode_data_index). Mismatched order would silently return # rows from the wrong episode for callers that pass an unsorted list. + # + # `self.episodes` is backfilled with the full episode list further down + # when it is None (so downstream indexing always has a concrete list), + # which destroys the "no subset requested" signal. Capture it now so + # `download_episodes` can still distinguish a whole-repo pull from a + # subset pull. + self._episodes_were_specified = episodes is not None self.episodes = sorted(episodes) if episodes is not None else None self.tolerance_s = tolerance_s self.skip_timestamp_check = skip_timestamp_check @@ -1540,7 +1547,7 @@ def pull_from_repo( @on_accelerate_main_proc(local=True, _sync=True) def download_files(self, files: list[str]) -> None: - """Fetch an explicit list of repo files, one `hf_hub_download` per file. + """Fetch the files from `files` that are missing on disk, one `hf_hub_download` each. `snapshot_download(allow_patterns=)` spends many minutes GIL-held and I/O-idle inside `filter_repo_objects`, whose @@ -1548,11 +1555,25 @@ def download_files(self, files: list[str]) -> None: watchdog while sibling ranks wait at the `_sync` broadcast. `hf_hub_download` targets each file by exact path, skipping the filter entirely; a thread pool overlaps the network round-trips (the GIL is - released during I/O). Files already present in `local_dir` are not - re-downloaded. + released during I/O). """ if not files: return + # `hf_hub_download` issues a network metadata request per file even + # when the file is already in `local_dir`. Calling it for an + # already-complete episode set burns one HF API request per file and + # trips the 3000 req / 5 min rate limit (429). Skip files already on + # disk — when the selected episodes were pre-downloaded this is a + # no-op that makes zero requests. + missing = [f for f in files if not (self.root / f).is_file()] + if not missing: + return + logging.info( + "%s: %d/%d episode files absent on disk, downloading them", + self.repo_id, + len(missing), + len(files), + ) def _fetch(fpath: str) -> None: hf_hub_download( @@ -1565,7 +1586,7 @@ def _fetch(fpath: str) -> None: with ThreadPoolExecutor(max_workers=16) as pool: # list() forces the lazy map so any per-file failure propagates. - list(pool.map(_fetch, files)) + list(pool.map(_fetch, missing)) def download_episodes(self, download_videos: bool = True) -> None: """Downloads the dataset from the given 'repo_id' at the provided version. If 'episodes' is given, this @@ -1574,9 +1595,12 @@ def download_episodes(self, download_videos: bool = True) -> None: """ # TODO(rcadene, aliberts): implement faster transfer # https://huggingface.co/docs/huggingface_hub/en/guides/download#faster-downloads - if self.episodes is None: + if not self._episodes_were_specified: # Whole-dataset download: snapshot_download with no allow_patterns - # has nothing to filter, so there is no filter_repo_objects blowup. + # has nothing to filter, so there is no filter_repo_objects blowup, + # and it lists the repo tree in O(1) API calls instead of one + # metadata request per file. `self.episodes` has been backfilled to + # the full list by now, so branch on the construction-time flag. ignore_patterns = None if download_videos else "videos/" self.pull_from_repo(ignore_patterns=ignore_patterns) return From 1e28204596b139aabaf96a8234db4bdf224c59d8 Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Thu, 14 May 2026 08:13:25 -0700 Subject: [PATCH 16/18] test(datasets): mock hf_hub_download in lerobot_dataset_factory lerobot_dataset_factory patched snapshot_download but not hf_hub_download. Once download_episodes started routing the episode-subset path through download_files (which uses hf_hub_download), the 3 subset-episode tests in test_datasets.py made real Hub calls and 404'd on the dummy repo. Add mock_hf_hub_download_factory that writes the requested fixture file, mirroring mock_snapshot_download_factory. --- tests/fixtures/dataset_factories.py | 11 ++++ tests/fixtures/hub.py | 86 +++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) diff --git a/tests/fixtures/dataset_factories.py b/tests/fixtures/dataset_factories.py index a04624f6..ce5c7d62 100644 --- a/tests/fixtures/dataset_factories.py +++ b/tests/fixtures/dataset_factories.py @@ -449,6 +449,7 @@ def lerobot_dataset_factory( episodes_factory, hf_dataset_factory, mock_snapshot_download_factory, + mock_hf_hub_download_factory, lerobot_dataset_metadata_factory, ) -> LeRobotDatasetFactory: def _create_lerobot_dataset( @@ -494,6 +495,14 @@ def _create_lerobot_dataset( episodes=episode_dicts, hf_dataset=hf_dataset, ) + mock_hf_hub_download = mock_hf_hub_download_factory( + info=info, + stats=stats, + episodes_stats=episodes_stats, + tasks=tasks, + episodes=episode_dicts, + hf_dataset=hf_dataset, + ) mock_metadata = lerobot_dataset_metadata_factory( root=root, repo_id=repo_id, @@ -507,10 +516,12 @@ def _create_lerobot_dataset( patch("opentau.datasets.lerobot_dataset.LeRobotDatasetMetadata") as mock_metadata_patch, patch("opentau.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version_patch, patch("opentau.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download_patch, + patch("opentau.datasets.lerobot_dataset.hf_hub_download") as mock_hf_hub_download_patch, ): mock_metadata_patch.return_value = mock_metadata mock_get_safe_version_patch.side_effect = lambda repo_id, version: version mock_snapshot_download_patch.side_effect = mock_snapshot_download + mock_hf_hub_download_patch.side_effect = mock_hf_hub_download # Construct a minimal TrainPipelineConfig for the dataset from dataclasses import dataclass diff --git a/tests/fixtures/hub.py b/tests/fixtures/hub.py index d78217c2..0a1f1994 100644 --- a/tests/fixtures/hub.py +++ b/tests/fixtures/hub.py @@ -132,3 +132,89 @@ def _mock_snapshot_download( return _mock_snapshot_download return _mock_snapshot_download_func + + +@pytest.fixture(scope="session") +def mock_hf_hub_download_factory( + info_factory, + info_path, + stats_factory, + stats_path, + episodes_stats_factory, + episodes_stats_path, + tasks_factory, + tasks_path, + episodes_factory, + episode_path, + single_episode_parquet_path, + hf_dataset_factory, +): + """Patch hf_hub_download so a single-file fetch writes the expected fixture file. + + Mirrors mock_snapshot_download_factory, but for the one-file-at-a-time path + used by LeRobotDataset.download_files (which snapshot_download mocking does + not cover). + """ + + def _mock_hf_hub_download_func( + info: dict | None = None, + stats: dict | None = None, + episodes_stats: list[dict] | None = None, + tasks: list[dict] | None = None, + episodes: list[dict] | None = None, + hf_dataset: datasets.Dataset | None = None, + ): + if not info: + info = info_factory() + if not stats: + stats = stats_factory(features=info["features"]) + if not episodes_stats: + episodes_stats = episodes_stats_factory( + features=info["features"], total_episodes=info["total_episodes"] + ) + if not tasks: + tasks = tasks_factory(total_tasks=info["total_tasks"]) + if not episodes: + episodes = episodes_factory( + total_episodes=info["total_episodes"], total_frames=info["total_frames"], tasks=tasks + ) + if not hf_dataset: + hf_dataset = hf_dataset_factory(tasks=tasks, episodes=episodes, fps=info["fps"]) + + def _mock_hf_hub_download( + repo_id: str, + filename: str, + local_dir: str | Path | None = None, + *args, + **kwargs, + ) -> str: + if not local_dir: + local_dir = OPENTAU_TEST_DIR + local_dir = Path(local_dir) + path = Path(filename) + + if path.suffix == ".parquet" and path.stem.startswith("episode_"): + ep_idx = int(path.stem[len("episode_") :]) # 'episode_000000' -> 0 + single_episode_parquet_path(local_dir, ep_idx, hf_dataset, info) + elif filename == INFO_PATH: + info_path(local_dir, info) + elif filename == STATS_PATH: + stats_path(local_dir, stats) + elif filename == EPISODES_STATS_PATH: + episodes_stats_path(local_dir, episodes_stats) + elif filename == TASKS_PATH: + tasks_path(local_dir, tasks) + elif filename == EPISODES_PATH: + episode_path(local_dir, episodes) + else: + # Video files (and anything else without a fixture writer): the + # tests that hit this path never decode them, so an empty + # placeholder is enough to satisfy the on-disk checks. + fpath = local_dir / filename + fpath.parent.mkdir(parents=True, exist_ok=True) + fpath.touch() + return str(local_dir / filename) + + return _mock_hf_hub_download + + return _mock_hf_hub_download_func From 9e1f8ec1dc84b1825de54220bc137e92f744e9c4 Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Thu, 14 May 2026 14:44:37 -0700 Subject: [PATCH 17/18] refactor(datasets): address #304 review feedback - load_hf_dataset: drop the unreachable `episodes is None` glob branch (__init__ backfills self.episodes before this method runs), remove the now-unused `import re`, and document in the docstring that schema is inferred from parquet (not validated against info.json) and that the Arrow cache is unpruned. - download_files: hoist the thread-pool width to a named module constant _DOWNLOAD_MAX_WORKERS. - Add test_download_files_skips_present_files (asserts zero hf_hub_download calls when the episode files are already on disk) and test_unresolvable_task_label_skipped (episode_to_task_index_from_episodes skips a task label absent from tasks.jsonl). --- src/opentau/datasets/lerobot_dataset.py | 52 +++++++++++++----------- tests/datasets/test_datasets.py | 23 +++++++++++ tests/datasets/test_speed_percentiles.py | 11 +++++ 3 files changed, 62 insertions(+), 24 deletions(-) diff --git a/src/opentau/datasets/lerobot_dataset.py b/src/opentau/datasets/lerobot_dataset.py index d45733ac..7b27137d 100644 --- a/src/opentau/datasets/lerobot_dataset.py +++ b/src/opentau/datasets/lerobot_dataset.py @@ -87,7 +87,6 @@ import json import logging import math -import re import shutil import traceback from abc import abstractmethod @@ -219,6 +218,12 @@ def wrapped(self, idx): CODEBASE_VERSION = "v2.1" +# Thread-pool width for the per-file `hf_hub_download` fan-out in `download_files`. +# The work is network-I/O-bound (the GIL is released during each request), so a +# width well above the core count is fine; 16 keeps enough round-trips in flight +# without hammering the Hub. +_DOWNLOAD_MAX_WORKERS = 16 + # Set of repo_ids for which we've already emitted the "missing control_mode" warning. # Keyed at module level so duplicates are suppressed across multiple LeRobotDataset # instances within a single process (e.g., train + val constructed for the same repo). @@ -1584,7 +1589,7 @@ def _fetch(fpath: str) -> None: local_dir=self.root, ) - with ThreadPoolExecutor(max_workers=16) as pool: + with ThreadPoolExecutor(max_workers=_DOWNLOAD_MAX_WORKERS) as pool: # list() forces the lazy map so any per-file failure propagates. list(pool.map(_fetch, missing)) @@ -1642,32 +1647,31 @@ def load_hf_dataset(self) -> datasets.Dataset: Loads the per-episode parquet files via `load_dataset("parquet", ...)`, which builds a memory-mapped Arrow cache under `$HF_HOME/datasets/`. The cache costs disk (~1-5x the parquet, compression-dependent — see #277) - but the loaded dataset is genuinely memory-mapped (resident pages are - file-backed and reclaimable), so RAM stays bounded by the OS page cache - rather than the dataset size. That is essential here: 8 ranks each load - the full mixture, and the multi-hundred-GB video repos would otherwise - OOM the node. A hand-rolled `pa_ds.to_table()` + `Dataset(table)` (or - streaming to a self-written Arrow IPC file + `Dataset.from_file`) was - tried and both materialised into anonymous RAM instead of mmapping — - `load_dataset` routes through HF's ParquetDatasetBuilder, whose Arrow - cache `Dataset.from_file` *does* memory-map correctly. + and nothing prunes it, so a multi-hundred-GB mixture needs that much + extra disk provisioned. In exchange the loaded dataset is genuinely + memory-mapped (resident pages are file-backed and reclaimable), so RAM + stays bounded by the OS page cache rather than the dataset size. That is + essential here: 8 ranks each load the full mixture, and the + multi-hundred-GB video repos would otherwise OOM the node. A hand-rolled + `pa_ds.to_table()` + `Dataset(table)` (or streaming to a self-written + Arrow IPC file + `Dataset.from_file`) was tried and both materialised + into anonymous RAM instead of mmapping — `load_dataset` routes through + HF's ParquetDatasetBuilder, whose Arrow cache `Dataset.from_file` *does* + memory-map correctly. + + Schema is inferred from the parquet files themselves; it is intentionally + not validated against `meta/info.json`. This lets cross-file column drift + load instead of raising a cast error, at the cost of `info.json` no + longer being the authoritative schema (drift now surfaces as a + `load_dataset` failure rather than a feature-cast error). Files are passed in sorted-episode order so the hf_dataset row layout stays aligned with `episode_data_index["from"/"to"]` — see the - `sorted(episodes)` note in __init__. + `sorted(episodes)` note in __init__. `self.episodes` is always a concrete + list here: __init__ backfills it with the full episode list before this + method is ever called. """ - # Derive the parquet glob from the meta data_path template so that - # datasets with a non-default `info["data_path"]` (deeper nesting, - # flat layout, etc.) keep working. Default template is - # "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet" - # which yields the glob "data/chunk-*/episode_*.parquet". Assumes the - # template uses simple `{name}` / `{name:fmt}` placeholders and no - # literal `{{`/`}}` escapes — true for every in-repo writer. - if self.episodes is None: - glob_pattern = re.sub(r"\{[^}]+\}", "*", self.meta.data_path) - files = [str(p) for p in sorted(self.root.glob(glob_pattern))] - else: - files = [str(self.root / self.meta.get_data_file_path(ep_idx)) for ep_idx in self.episodes] + files = [str(self.root / self.meta.get_data_file_path(ep_idx)) for ep_idx in self.episodes] if not files: raise FileNotFoundError(f"No parquet files for {self.repo_id} under {self.root}") hf_dataset = load_dataset("parquet", data_files=files, split="train") diff --git a/tests/datasets/test_datasets.py b/tests/datasets/test_datasets.py index ba32809a..872332dd 100644 --- a/tests/datasets/test_datasets.py +++ b/tests/datasets/test_datasets.py @@ -145,6 +145,29 @@ def test_dataset_no_episodes_loads_all(tmp_path, lerobot_dataset_factory): _assert_episode_row_alignment(dataset) +def test_download_files_skips_present_files(tmp_path, lerobot_dataset_factory): + """download_files must not call hf_hub_download for files already on disk. + + This is the core of the 429-avoidance fix: a pre-downloaded episode set + should make download_files a no-op with zero Hub requests. Constructing + the dataset already places every selected-episode file on disk, so a + second download_files pass over the same paths must fetch nothing. + """ + dataset = lerobot_dataset_factory( + root=tmp_path / "test", + repo_id=DUMMY_REPO_ID, + total_episodes=10, + total_frames=400, + episodes=[2, 5, 6], + ) + files = dataset.get_episodes_file_paths() + assert files, "expected a non-empty file list for the test to be meaningful" + assert all((dataset.root / f).is_file() for f in files), "fixture should pre-place all files" + with patch("opentau.datasets.lerobot_dataset.hf_hub_download") as mock_hf_hub_download: + dataset.download_files(files) + mock_hf_hub_download.assert_not_called() + + def test_add_frame_missing_task(tmp_path, empty_lerobot_dataset_factory): features = {"state": {"dtype": "float32", "shape": (1,), "names": None}} dataset = empty_lerobot_dataset_factory(root=tmp_path / "test", features=features) diff --git a/tests/datasets/test_speed_percentiles.py b/tests/datasets/test_speed_percentiles.py index 07fa3828..a83a5a56 100644 --- a/tests/datasets/test_speed_percentiles.py +++ b/tests/datasets/test_speed_percentiles.py @@ -126,6 +126,17 @@ def test_empty_tasks_skipped(self): t2i = {"taskA": 0} assert episode_to_task_index_from_episodes(episodes, t2i) == {2: 0} + def test_unresolvable_task_label_skipped(self): + # A task label absent from task_to_task_index (episodes.jsonl / + # tasks.jsonl drift) is skipped, not raised on; resolvable episodes + # in the same dataset still come through. + episodes = { + 0: {"tasks": ["ghost"], "length": 100}, + 1: {"tasks": ["taskA"], "length": 200}, + } + t2i = {"taskA": 0} + assert episode_to_task_index_from_episodes(episodes, t2i) == {1: 0} + class TestBucketEpisodeLength: @pytest.fixture From b6bc39d5ea7842f31c7b3aceef6467665ebe57b5 Mon Sep 17 00:00:00 2001 From: Shuheng Liu Date: Thu, 14 May 2026 15:17:05 -0700 Subject: [PATCH 18/18] docs(datasets): clarify load_hf_dataset schema-inference docstring Split the two distinct schema-drift cases the re-review flagged as conflated: a parquet/info.json mismatch now loads silently (parquet wins), whereas a mismatch between parquet files of one dataset fails as a load_dataset concatenation error. --- src/opentau/datasets/lerobot_dataset.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/opentau/datasets/lerobot_dataset.py b/src/opentau/datasets/lerobot_dataset.py index 7b27137d..2abf6c14 100644 --- a/src/opentau/datasets/lerobot_dataset.py +++ b/src/opentau/datasets/lerobot_dataset.py @@ -1660,10 +1660,11 @@ def load_hf_dataset(self) -> datasets.Dataset: memory-map correctly. Schema is inferred from the parquet files themselves; it is intentionally - not validated against `meta/info.json`. This lets cross-file column drift - load instead of raising a cast error, at the cost of `info.json` no - longer being the authoritative schema (drift now surfaces as a - `load_dataset` failure rather than a feature-cast error). + not validated against `meta/info.json`. So a parquet/`info.json` mismatch + now loads silently — the parquet's own schema wins and `info.json` is no + longer authoritative. A mismatch *between* the parquet files of a single + dataset still fails, but as a `load_dataset` concatenation error rather + than the old explicit feature-cast error. Files are passed in sorted-episode order so the hf_dataset row layout stays aligned with `episode_data_index["from"/"to"]` — see the