test(data_plane): pin async-RL filter flow + example BaseRolloutFilter #2593
Open
ZhiyuLi-Nvidia wants to merge 10 commits into
/ok to test dd2608c
/ok to test 85bc768
Adds an e2e flow test for the async-RL rollout-buffer filter pipeline: claim_meta (claim batch atomically) -> driver-side filter (StalenessFilter example) -> shard_meta_for_dp (sync DP fan-out path) -> clear_samples for the drop set. Demonstrates that the filter step can be built on existing DataPlaneClient surfaces with no new methods, no TQ patch, no second hop.

The test-only example filter mirrors TQ's BaseSampler shape but executes on the driver where it can see in-process state (current weight version) that's awkward to serialize through TQ's sampling_config. Combines verl's driver-side location with TQ's class-based extension. Sits under tests/ (not nemo_rl/) until async-RL chooses to adopt the pattern.

Hardened against the issues caught in code review:
- Filter raises (not silently drops/keeps) on missing tags, missing weight_version key, None / float / bool values, or missing current_weight_version kwarg
- Both (keep, drop) halves are fresh subset copies, so mutation is isolated
- Test uses blocking claim_meta (the default) to avoid racing the controller's async production_status update thread
- Tag/seqlen alignment checked by sample_id, not list index, so the test survives any TQ default-sampler change
- check_consumption_status used for the cursor-advance assertion (not the polling-mode short-circuit that would pass for the wrong reason)
- Counter equality used for DP-shard coverage (set + len would let a swap pass silently)
- Partition cleared at test end so state can't leak into subsequent tests reusing the partition_id

Also refreshes data-plane-async-proposal.md so it reflects reality: KVBatchMeta.tags propagation through claim_meta is implemented today (no longer a TODO); Option 2 is the recommended pattern; the new test is the canonical reference.

Files:
- tests/unit/data_plane/_rollout_filters.py: BaseRolloutFilter ABC + StalenessFilter
- tests/unit/data_plane/test_async_rl_filter_flow.py: Ray + simple-backend flow test
- nemo_rl/data_plane/docs/data-plane-async-proposal.md: stale-todo refresh

Verified locally: 18/18 NoOp smoke checks (covers all filter error paths, fresh-copy invariant, abstract base). Ray-based e2e covered by CI.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
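For readers without the diff open, the driver-side filter described in this commit can be sketched roughly as below. This is not the code in _rollout_filters.py: FakeBatchMeta is a hypothetical stand-in for KVBatchMeta, and the max_staleness parameter and the exact exception types are assumptions made for illustration. Only the overall shape (an abstract base, a (keep, drop) split, and raising on malformed tags) comes from the commit message.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class FakeBatchMeta:
    """Hypothetical stand-in for KVBatchMeta: sample ids plus per-sample tags."""

    sample_ids: List[str]
    tags: Optional[List[Dict[str, Any]]]

    def subset(self, indices: List[int]) -> "FakeBatchMeta":
        if self.tags is None:
            raise ValueError("cannot subset a batch without tags")
        # New outer lists are built here; the tag dicts themselves are shared.
        return FakeBatchMeta(
            sample_ids=[self.sample_ids[i] for i in indices],
            tags=[self.tags[i] for i in indices],
        )


class BaseRolloutFilter(ABC):
    @abstractmethod
    def filter(
        self, meta: FakeBatchMeta, **kwargs: Any
    ) -> Tuple[FakeBatchMeta, FakeBatchMeta]:
        """Split ``meta`` into (keep, drop) halves."""


class StalenessFilter(BaseRolloutFilter):
    def __init__(self, max_staleness: int) -> None:
        # max_staleness is an assumed knob, not taken from the PR.
        self.max_staleness = max_staleness

    def filter(self, meta, **kwargs):
        if "current_weight_version" not in kwargs:
            raise ValueError("current_weight_version kwarg is required")
        current = kwargs["current_weight_version"]
        if meta.tags is None:
            raise ValueError("batch has no tags to filter on")
        keep, drop = [], []
        for i, tag in enumerate(meta.tags):
            if "weight_version" not in tag:
                raise KeyError(f"sample {meta.sample_ids[i]} has no weight_version")
            version = tag["weight_version"]
            # bool is a subclass of int, so it has to be rejected explicitly.
            if isinstance(version, bool) or not isinstance(version, int):
                raise TypeError(f"weight_version must be an int, got {version!r}")
            target = keep if current - version <= self.max_staleness else drop
            target.append(i)
        return meta.subset(keep), meta.subset(drop)


meta = FakeBatchMeta(
    sample_ids=["a", "b", "c"],
    tags=[{"weight_version": 1}, {"weight_version": 3}, {"weight_version": 3}],
)
keep, drop = StalenessFilter(max_staleness=0).filter(meta, current_weight_version=3)
```

In the flow the commit describes, the drop half would then be handed to clear_samples and the keep half to the DP fan-out step.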
Replaces the single-process flow test with one that mirrors the production async-RL shape: a Ray actor stands in for AsyncTrajectoryCollector and writes rollout batches to TQ on demand, while the driver claims, filters, DP-fans-out, and clears in the test body.

Closer to async-RL e2e than the in-process variant: the producer attaches to the TQ controller via tq.init() in its own Ray worker process (same path real workers use); the driver uses the dp_client wrappers (claim_meta, get_data, clear_samples) just like the trainer.

Side benefits:
- Reuses _rollout_shapes.make_rollout_batch / make_realistic_tags / register_train_partition / keys_from_uids - schema matches what the sync trainer's SyncRolloutActor actually writes (DP_TRAIN_FIELDS, not a hand-rolled minimal set).
- Pipeline and verification are in distinct phases - no asserts inside the pipeline body; everything checked in a trailing block. Reads as "what happens" vs "what we require."
- Tag/seqlen alignment checked via sample_id lookup (positional alignment would silently break on a TQ default-sampler change).

File renamed test_async_rl_filter_flow.py to test_async_rl_with_producer_actor.py to reflect the producer-actor shape; git mv preserves history.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
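Two of the verification choices mentioned in these commits (alignment by sample_id rather than list position, and Counter equality for DP-shard coverage) can be illustrated with a small standalone sketch. The function names and the toy data are hypothetical and do not come from the test file; the weaker check shown for contrast is one plausible reading of "set + len", not necessarily the one the author had in mind.

```python
from collections import Counter
from typing import Any, Dict, List, Sequence


def tags_by_sample_id(
    sample_ids: Sequence[str], tags: Sequence[Dict[str, Any]]
) -> Dict[str, Dict[str, Any]]:
    """Index tags by sample_id so later lookups do not depend on list order."""
    if len(sample_ids) != len(tags):
        raise ValueError("sample_ids and tags differ in length")
    lookup = dict(zip(sample_ids, tags))
    if len(lookup) != len(sample_ids):
        raise ValueError("duplicate sample_ids")
    return lookup


def shards_cover_exactly(expected_ids: Sequence[str], shards: List[List[str]]) -> bool:
    """True only if every expected id appears across the shards exactly once."""
    seen = Counter(sid for shard in shards for sid in shard)
    return seen == Counter(expected_ids)


def weak_coverage_check(expected_ids: Sequence[str], shards: List[List[str]]) -> bool:
    """Length plus membership only: misses a duplicate that displaces another id."""
    flat = [sid for shard in shards for sid in shard]
    return len(flat) == len(expected_ids) and set(flat) <= set(expected_ids)


expected = ["a", "b", "c"]
good_shards = [["a"], ["b", "c"]]
bad_shards = [["a", "a"], ["c"]]  # "b" was replaced by a second copy of "a"

lookup = tags_by_sample_id(["b", "a"], [{"weight_version": 3}, {"weight_version": 1}])
```

With this toy data the weak check accepts bad_shards while the Counter comparison rejects it, which is the kind of silent pass the commit message is guarding against.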
Test fixes (tests/unit/data_plane/test_async_rl_with_producer_actor.py):
- register_train_partition now uses partition_id=pid so pre-registration targets the partition under test (was defaulting to 'train').
- Producer's bulk batch now includes a ``content`` field as np.ndarray(object) of per-row message_logs, matching SyncRolloutActor's production write shape (sync_rollout_actor.py:271-273) so the test's mock dataset is shape-equivalent to a real rollout batch.
- ruff format applied.

Doc consolidation:
- Remove nemo_rl/data_plane/docs/data-plane-async-proposal.md (310 lines of design proposal + open questions, much of it now stale).
- Replace the stub 'Async path (proposed)' section in nemo_rl/data_plane/README.md with a self-contained ~50-line summary: task-mediated vs direct-by-key, E2E flow diagram, the recommended tag-based filter pattern, and pointers to the canonical test example.

Cleanup (Ray actor + partition state) intentionally NOT added to the test: tests/unit/conftest.py:380 has a session-scope init_ray_cluster fixture that calls ray.shutdown(), and the tq_client fixture's client.close() kills the TQ controller actor.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Two changes to align with grpo_sync.py's data-plane usage:

1. Producer's bulk write: tq.kv_batch_put -> kv_first_write (column_io.py:123). Same helper SyncRolloutActor uses on every step. Removes the raw TQ wire call from the test; the producer now goes through the DataPlaneClient surface via build_data_plane_client.
2. Per-rank shard fetch: dp_client.get_data -> read_columns (column_io.py:51). Same helper grpo_sync.py uses for worker fetches. Wraps get_samples + materialize (jagged unpack, object decode).

Naming: tq_client -> dp_client throughout (fixture name, test parameter, all internal references). The DataPlaneClient ABC is backend-agnostic; the variable name shouldn't pin the test to TQ. The _TQ_CLIENT_CONFIG constant stays since the config dict IS TQ-specific (impl=transfer_queue, global_segment_size).

claim_meta / check_consumption_status / clear_samples stay raw dp_client.* calls: these are at the backend-agnostic ABC level and have no higher-level wrappers in production. Sibling tests (test_tq_lifecycle.py, test_correctness.py, test_sync_one_hop.py, test_interface_contract.py, …) operate at the same level.

Verified on Slurm: job 12218683, 1 passed in 146s with the kv_first_write + read_columns refactor; rename is cosmetic and doesn't change behavior.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…oryCollector shape
Rename _ProducerActor -> _TQBackedAsyncCollector and split its surface
into two methods so it reads as a stand-in for the not-yet-built
production class that would fuse:

- AsyncTrajectoryCollector (async_utils/trajectory_collector.py):
  weight-version state + per-version produce loop. Currently writes to
  an in-memory ReplayBuffer, not TQ.
- SyncRolloutActor (experience/sync_rollout_actor.py): one Ray RPC =
  rollout + flatten/mask + kv_first_write. Currently sync-only.

The fusion is the natural next step for TQ-backed async-RL; this test
pretends it exists so the integration test pins its shape today.

API:
- set_weight_version(version) -> None (mirrors AsyncTrajectoryCollector)
- rollout_to_tq(partition_id, sample_ids) -> sample_ids
  (mirrors SyncRolloutActor.rollout_to_tq; rollout body is mocked
  via _rollout_shapes helpers, kv_first_write is real)

Driver flow now reads as:

    producer.set_weight_version.remote(1)
    stale_done = producer.rollout_to_tq.remote(pid, keys_stale)
    producer.set_weight_version.remote(current_v)
    fresh_done = producer.rollout_to_tq.remote(pid, keys_fresh)
    ray.get([stale_done, fresh_done])

Ray actor methods serialize on a single actor (default
max_concurrency=1), so the version stamp lands before each rollout
call without explicit waits.

Verified on Slurm job 12224679: 1 passed in 154s, exit 0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
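The ordering argument in this commit (calls on a single-concurrency actor run in submission order, so each version stamp lands before the rollout that follows it) can be tried without a Ray cluster. The sketch below swaps in a one-worker ThreadPoolExecutor as a stand-in for the actor's serial mailbox; that substitution, the FakeCollector class, and the dict used in place of TQ are all assumptions for illustration and say nothing about Ray internals.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List


class FakeCollector:
    """Hypothetical stand-in for the test's producer actor; no Ray, no TQ."""

    def __init__(self) -> None:
        self.weight_version = 0
        self.written: Dict[str, Dict[str, Any]] = {}  # plays the role of TQ

    def set_weight_version(self, version: int) -> None:
        self.weight_version = version

    def rollout_to_tq(self, partition_id: str, sample_ids: List[str]) -> List[str]:
        # Stamp every sample with whatever version is current when this call runs.
        for sid in sample_ids:
            self.written[sid] = {
                "partition_id": partition_id,
                "weight_version": self.weight_version,
            }
        return sample_ids


collector = FakeCollector()
current_v = 3

# One worker thread: submitted calls execute one at a time, in submission order.
with ThreadPoolExecutor(max_workers=1) as serial_actor:
    serial_actor.submit(collector.set_weight_version, 1)
    stale_done = serial_actor.submit(collector.rollout_to_tq, "pid", ["s0", "s1"])
    serial_actor.submit(collector.set_weight_version, current_v)
    fresh_done = serial_actor.submit(collector.rollout_to_tq, "pid", ["f0"])
    stale_ids = stale_done.result()
    fresh_ids = fresh_done.result()
```

No wait is placed between the set_weight_version and rollout_to_tq submissions, mirroring the driver flow in the commit message.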
Centralize the SimpleStorage DataPlaneConfig used by tq_client-style
fixtures into a YAML file + thin loader helper, mirroring the production
load path (master_config.data_plane via OmegaConf) instead of
hardcoding a dict in Python.

- New: tests/unit/data_plane/_simple_backend_dp.yaml: the config,
  with the 8 GiB / 1 GiB (only-read-by-mooncake_cpu) annotations.
- New helper in _rollout_shapes.py:

      def simple_backend_dp_config() -> dict[str, Any]:
          return OmegaConf.to_container(OmegaConf.load(yaml_path), resolve=True)

- test_async_rl_with_producer_actor.py: drop the hardcoded
  _TQ_CLIENT_CONFIG dict; both fixture and actor call the helper.

Sibling tests (test_tq_lifecycle.py, test_seqpack_equivalence.py)
still inline the same dict; left for a separate cleanup PR.

Verified on Slurm job 12234584: 1 passed in 148s, exit 0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Replace the redundant tests/unit/data_plane/_simple_backend_dp.yaml with a load from tests/unit/reference_configs/grpo_math_1B.yaml's existing ``data_plane:`` section. That file is the canonical test reference (mirrored from examples/configs/grpo_math_1B.yaml), so the 8 GiB / mooncake_cpu annotations and the field list now live in ONE place (production-shaped YAML) instead of being duplicated.

The helper applies three test-specific overrides on top:
- ``enabled: True`` (production default is False)
- ``storage_capacity: 1024`` (vs prod 1_000_000; fast in-process setup)
- ``num_storage_units: 1`` (vs prod 2)

If production bumps any of the inherited fields the test inherits automatically; the overrides spell out exactly what's test-specific.

Verified on Slurm job 12235120: 1 passed in 148s, exit 0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
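The inherit-then-override pattern described above can be sketched with plain dicts. The real helper loads the reference YAML through OmegaConf; here REFERENCE_DATA_PLANE is a hand-written stand-in holding only the three fields whose production values the commit message states, plus one clearly hypothetical extra field to show inheritance. The function name apply_test_overrides is likewise an assumption.

```python
import copy
from typing import Any, Dict

# Stand-in for the reference YAML's data_plane section. Only the first three
# values come from the commit message; "hypothetical_inherited_field" is made up.
REFERENCE_DATA_PLANE: Dict[str, Any] = {
    "enabled": False,
    "storage_capacity": 1_000_000,
    "num_storage_units": 2,
    "hypothetical_inherited_field": "unchanged",
}

# The three test-specific overrides listed in the commit message.
TEST_OVERRIDES: Dict[str, Any] = {
    "enabled": True,
    "storage_capacity": 1024,
    "num_storage_units": 1,
}


def apply_test_overrides(reference: Dict[str, Any]) -> Dict[str, Any]:
    """Copy the reference section and layer the test overrides on top."""
    config = copy.deepcopy(reference)  # never mutate the shared reference
    config.update(TEST_OVERRIDES)
    return config


test_config = apply_test_overrides(REFERENCE_DATA_PLANE)
```

Any field not named in TEST_OVERRIDES passes through untouched, which is what lets a later change to the reference section reach the test without edits.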
- _TQBackedAsyncCollector.__init__ now passes bootstrap=False, matching every production rollout-side caller (sync_rollout_actor.py:94, worker_mixin.py:142); the actor attaches to the driver's controller instead of re-entering the bootstrap path.
- policy_refit delegates to set_weight_version so the version write has a single source of truth, which collapses the silent-divergence risk.
- StalenessFilter: accept numpy.integer scalars (real producers may emit np.int64 via torch.Tensor.item()); add an explicit len(tags) == size guard so trailing rows can't be silently dropped if the two ever disagree.
- Weaken the "fresh copies" docstring claim in _rollout_filters: meta.subset rebuilds the outer list but per-sample tag dicts are shared references with the input.
- Pin impl='transfer_queue', backend='simple' in simple_backend_dp_config so the helper name matches its guarantee if the reference YAML's data_plane defaults drift.
- Fix the register_train_partition annotation: NoOpDataPlaneClient → DataPlaneClient (real callers all pass TQ-backed clients).
- Assert keep_meta.tags / drop_meta.tags are non-None and length-matched so a subset-drops-tags regression can't pass silently.

Verified on Slurm: job 12236253 PASSED in 223s.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
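The two StalenessFilter guards added in this commit can be sketched as standalone helpers. The function names and exception types below are hypothetical. The sketch relies on numbers.Integral instead of importing NumPy: NumPy registers its integer scalar types with that ABC, so np.int64 values would pass the same check, though whether the real filter tests against numpy.integer directly or uses this route is not stated in the commit.

```python
import numbers
from typing import Any, Dict, List, Optional


def validate_weight_version(value: Any) -> int:
    """Accept int-like scalars, reject bool, float, None and everything else."""
    # bool passes isinstance(..., numbers.Integral), so exclude it first.
    if isinstance(value, bool) or not isinstance(value, numbers.Integral):
        raise TypeError(f"weight_version must be an integer, got {value!r}")
    return int(value)


def check_tags_match_size(tags: Optional[List[Dict[str, Any]]], size: int) -> None:
    """Fail loudly if tags are missing or do not cover every row of the batch."""
    if tags is None:
        raise ValueError("batch has no tags")
    if len(tags) != size:
        # Without this guard, iterating over the shorter of the two would
        # silently skip the trailing rows.
        raise ValueError(f"len(tags)={len(tags)} does not match size={size}")


version = validate_weight_version(7)
check_tags_match_size([{"weight_version": 7}], size=1)
```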
Pre-commit ruff stripped the second blank line between the import block and the producer-actor section comment. Apply the same fix locally so CI passes without the lint hook auto-fixing again.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Line numbers rot as code shifts, so keep pure explanations of what each step does and why. Class names and module-level concepts stay (those are stable); concrete `path/to/file.py:NN` citations and bare module paths are removed from comments and docstrings.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
85bc768 to 2216302
/ok to test 2216302
What does this PR do?
Adds an e2e flow test for the async-RL rollout-buffer filter pipeline (claim_meta → driver-side filter → DP fan-out → clear_samples), driven by a Ray producer actor that mimics AsyncTrajectoryCollector with a TQ write path. Replaces the proposal doc with a README "Async path" section.
Usage
# Add a code snippet demonstrating how to use this
Before your PR is "Ready for review"
Pre checks:
Additional Information