Skip to content

Add R3 router replay support#2465

Closed
zyzhou5 wants to merge 84 commits into
NVIDIA-NeMo:zhiyul/data_plane_planfrom
zyzhou5:r3-router-replay-pr2439
Closed

Add R3 router replay support#2465
zyzhou5 wants to merge 84 commits into
NVIDIA-NeMo:zhiyul/data_plane_planfrom
zyzhou5:r3-router-replay-pr2439

Conversation

@zyzhou5

@zyzhou5 zyzhou5 commented May 11, 2026

Copy link
Copy Markdown
Contributor

What does this PR do ?

Add a one line overview of what this PR aims to accomplish.

Issues

List issues that this PR closes (syntax):

Usage

  • You can potentially add a usage example below
# Add a code snippet demonstrating how to use this

Before your PR is "Ready for review"

Pre checks:

  • Make sure you read and followed Contributor guidelines
  • Did you write any new necessary tests?
  • Did you run the unit tests and functional tests locally? Visit our Testing Guide for how to run tests
  • Did you add or update any necessary documentation? Visit our Document Development Guide for how to write, build and test the docs.

Additional Information

  • ...

@zyzhou5 zyzhou5 requested review from a team as code owners May 11, 2026 21:35
@copy-pr-bot

copy-pr-bot Bot commented May 11, 2026

Copy link
Copy Markdown

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

ZhiyuLi-Nvidia and others added 21 commits May 13, 2026 04:07
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Both rl-arena and verl converge on driver-balanced metadata + worker-side
direct fetch (1-hop). Plan updates:

- Header reframed: rl-arena and verl as co-references (same idea, different
  worker plumbing). NeMo-RL adopts verl's @tqbridge decorator.
- Stage 4: corrected LOC estimate (~150-250, not 400-600). shard_keys_by_seqlen
  uses sort-by-seqlen + stride (matches rl-arena's shard_for_dp and NeMo-RL's
  dynamic_batching_args branch). Single algorithm, no strategy parameter.
- Stage 4: TP/CP/PP guidance — broadcast inside the group, not per-sibling
  fetch. CP sequence-dim slicing happens in model forward, not data plane.
- Stage 3 lifecycle: corrected ordering (prev_lp + ref_lp + mask before
  advantage; KL-in-reward needs both logprobs).
- Stage-completion design: field-presence is the natural ready signal;
  mark_consumed dropped from public ABC (TQ advances inside get_meta(fetch)).
- KVBatchMeta mirrors transfer_queue.metadata.KVBatchMeta 1:1 (fields,
  not fields_available).
- ABC adds direct-by-key kv_batch_get / kv_batch_put / kv_clear.
- TQ pinned to 0.1.5 (matches local wheel); pyproject packages.find fix
  so nemo_rl.data_plane gets installed.
- New risks: R11 (dynamic sampling/DAPO), R12 (message_log Tier-1/3 split),
  R13 (stage completion / fault tolerance).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
… balanced packing

Adds an optional data-plane layer that routes GRPO train data through
TransferQueue (Ray-actor-backed KV store) instead of Ray's in-memory
object store. Mirrors verl's main_ppo.py / main_ppo_sync.py split:
algorithms/grpo.py is unchanged; algorithms/grpo_sync.py is a TQ-only
sibling dispatched when data_plane.enabled=true.

Key pieces:
- nemo_rl/data_plane/: stable adapter boundary (DataPlaneClient ABC,
  KVBatchMeta), TQ adapter, codec, sharder, observability middleware.
- @dp_dispatch decorator: makes Policy methods polymorphic over
  BatchedDataDict (legacy) and KVBatchMeta / list[KVBatchMeta] (TQ).
- Driver-side balanced packing: when sequence packing or dynamic
  batching is on, shard_by_batch_size must be called once on the
  driver with shards=DP_world — bin_count_multiple=DP_world is what
  keeps per-DP n_microbatches uniform. Per-shard packing metadata
  rides in KVBatchMeta.extra_info; train_presharded reattaches it
  post-fetch and skips local repack. Without this, per-rank shards=1
  packing produced different n_microbatches across DP groups and
  Megatron deadlocked at the first cross-DP collective (10-min NCCL
  watchdog at step 4 in our 2-node qwen3-30b runs).

Verification:
- Unit (5/5): dispatch decorator handles BatchedDataDict / KVBatchMeta /
  list[KVBatchMeta], rejects size mismatches, etc.
- Functional (3/3): legacy and TQ paths produce byte-identical sharded
  data + packing metadata for seqpack / dynbatch / no-packing — proves
  the data plane is a lossless transport, isolated from NCCL noise.
- E2E: qwen3-30b mcore GRPO 5/5 steps green for baseline-TQ, seqpack-TQ,
  and dynbatch-TQ on 2 nodes (16 GPUs).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…ard helpers

Pulls the driver-side balanced-packing + per-rank fan-out block out of
grpo_sync.py:605-704 into nemo_rl/data_plane/preshard.py so the same
two operations can be reused by future async data-plane trainers
without duplicating the bin_count_multiple=DP_world incantation.

The original block had two distinct concerns inlined together:
  1. Compute pre-shards from train_data via shard_by_batch_size with
     packing args derived from policy_cfg (pure transform, no I/O).
  2. For each pre-shard, kv_batch_put seed fields and build a
     KVBatchMeta with packing metadata in extra_info (TQ I/O).

Split into:
  - driver_balanced_preshards(train_data, dp_world, policy_cfg)
      → list[BatchedDataDict]
  - fan_out_per_rank_metas(pre_shards, dp_client, partition_id,
                           task_name, key_prefix, seed_fields)
      → list[KVBatchMeta]

key_prefix is the only behavioural parameter: sync GRPO passes
f"step{total_steps}", future async path will pass
f"v{wv}_step{step}". Field iteration order, .detach().contiguous()
calls, and KVBatchMeta construction order are byte-identical to the
inline version — the refactor preserves the exact balanced-packing
semantics that prevent Megatron from deadlocking on the first
cross-DP collective when sequence packing / dynamic batching is on
(commit a085559 described the 10-min NCCL watchdog at step 4).

Touches:
  - nemo_rl/data_plane/preshard.py (new, 162 lines): two helpers,
    distinct from sharding.py which is metadata-only sort-by-seqlen
    for the @dp_dispatch default fan-out.
  - nemo_rl/algorithms/grpo_sync.py (-113 / +21 net): inline block
    replaced with two helper calls; dead imports (asyncio,
    tensordict.TensorDict, KVBatchMeta) removed.
  - tests/data_plane/unit/test_architecture_invariants.py
    (R-C9 invariant): the regex check 'KVBatchMeta(' now accepts
    delegation via 'fan_out_per_rank_metas(' as well, with a
    chained check that the helper itself constructs KVBatchMeta so
    the dispatch chain to the TQ branch isn't silently broken.

Verification:
  - Tier 1 unit (data_plane): 56/56 passed (Python 3.13.13,
    nightly nemo-rl image).
  - Tier 2 functional (data_plane): 4 passed, 1 skipped — including
    test_seqpack_legacy_equals_tq, test_dynbatch_legacy_equals_tq,
    test_no_packing_legacy_equals_tq (all three byte-equality
    parity tests against the legacy inline path).
  - E2E: qwen3-30b mcore GRPO seqpack-TQ run past step 3 with no
    NCCL deadlock, validating the bin_count_multiple invariant
    survives the helper extraction.

Companion doc: research/data_plane_async_rl_limitations.md §5.4
explains why these helpers belong on the data-plane boundary rather
than in the algorithms layer (TQ I/O is data-plane concern, packing
is reused across sync and async trainers).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
… enabled

Producer-side hook for the planned async-on-TQ path. When ``dp_cfg``
is set on ``AsyncTrajectoryCollector``, the rollout's ``final_batch``
is tensorized into the TQ partition ``rollouts`` and a
``KVBatchMeta`` reference is pushed onto the buffer instead of the
in-memory dict. Pairs with PR 2 (ReplayBuffer clears the meta's TQ
keys on consume) and the upcoming PR 4 (grpo_async_dp.py — trainer
materializes per consumed batch and fans out via preshard.py).

Mechanics:
  - Keys: f"v{wv}_p{prompt_idx}_g{i}" — versioned namespace so the
    same prompt at different weight versions can't collide; trainer
    can later filter by ``tag.version`` if needed.
  - Tags: ``[{"version": wv}] * n_samples`` for each put. The version
    is duplicated on every key in the batch but each tag dict is the
    same object reference; serializer dedupes.
  - Fields: every ``torch.Tensor`` leaf of ``final_batch_cpu`` is
    written. The trainer side picks which to fetch via
    ``select_fields`` rather than constraining what the producer
    writes — keeps the producer schema-agnostic.
  - extra_info: rollout_metrics + timestamp ride on the meta so the
    trainer's per-step bookkeeping survives the TQ round-trip without
    a side channel.

``asyncio.run(client.kv_batch_put(...))`` is safe here because
``_collection_loop`` is a worker thread without an enclosing event
loop (Race 3 in the limitations doc; the running-loop conflict only
fires when there's already an asyncio loop in the calling thread).

Backward-compat: ``dp_cfg=None`` default — the in-memory async path
is byte-for-byte unchanged. The ``client = self._ensure_dp_client()``
guard short-circuits all new code when the data plane isn't enabled.
``bootstrap=False`` so the collector attaches to the driver's
controller rather than spinning up a second named actor.

Producer-owned rollback (kv_clear when push_with_wait_signal returns
"full") is *not* part of this PR. The current loop retries with
exponential backoff on "full" rather than dropping — kv_clear in that
path would lose data we just wrote. The shutdown-with-pending-meta
edge case (cluster ends while a put is in-flight) is left as a known
leak for now; TQ partitions are ephemeral per cluster, so it doesn't
accumulate across runs.

No call site passes ``dp_cfg`` yet — the wiring at
``algorithms/grpo.py:2527`` (the trainer_collector.options(...).remote
construction) lands in PR 4 alongside the dispatch in run_grpo.py.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…ed packing

Lights up async-on-TQ as a callable path:

  * ReplayBuffer.sample materializes any popped KVBatchMeta into the
    dict format ``async_grpo_train`` expects ({"batch", "rollout_metrics",
    "timestamp"}). Materialize+clear stays under the buffer lock —
    Race 5: keys are versioned so collisions are unlikely, but the lock
    is the cheap correctness invariant. Pairs with PR 2's clear-on-
    consume.

  * async_grpo_train reads master_config["data_plane"]; if enabled,
    bootstraps the TQ controller on the driver, captures the client
    handle (``_dp_client``), and threads ``dp_cfg`` to both
    ReplayBuffer and AsyncTrajectoryCollector at construction
    (bootstrap=False on the actor side).

  * At the policy.train call site, async_grpo_train now branches: when
    the client is set, drive the same balanced packing + per-rank
    fan-out as grpo_sync (driver_balanced_preshards +
    fan_out_per_rank_metas, key_prefix=f"v{wv}_s{step}"), call
    policy.train(list[KVBatchMeta]) — the @dp_dispatch list path with
    is_meta_list=True (dispatch.py:116-127), and kv_clear the train
    partition before the next step. This is the same bin_count_multiple
    invariant a085559 added for sync; without it, async + sequence
    packing would deadlock at the first cross-DP collective the same
    way sync did pre-a085559c.

  * Hoist DP_SEED_FIELDS from grpo_sync.py to nemo_rl/data_plane/
    preshard.py — both trainers now import the canonical schema. Test
    fixture in tests/data_plane/functional/test_seqpack_equivalence.py
    keeps its own copy on purpose (testing the wire schema as a
    contract, not the producer constant).

Why ``list[KVBatchMeta]`` and not single ``KVBatchMeta``:
    The single-meta path runs the @dp_dispatch sharder
    (shard_keys_by_seqlen) which sorts by seqlen and strides — that
    reorders samples vs. ``meta.keys`` order and skips the policy-aware
    sharding semantics (no GBS check, no FLOPs recording, no
    sequence-packing validation). The list-of-metas path skips the
    sharder entirely and uses the driver's pre-balanced layout.

Known gaps (NOT fixed here, follow-up):
  * FLOPs reporting is silently dropped on the @dp_dispatch list
    path. Lives in lm_policy.train's body (lm_policy.py:730-742) which
    the decorator skips when input is meta-shaped. Affects both
    grpo_sync (since a085559) and now the async-on-TQ path. Right fix
    is a _dp_post_train post-aggregator hook on the decorator —
    landing as a separate PR. ``policy.get_logprobs(KVBatchMeta)``
    has its own ordering bug (sharder reorders, aggregator concats in
    rank order) but async never goes through that path; flagged for
    documentation only.
  * Two TQ round-trips per async step (rollouts partition →
    materialize → train partition → workers). Necessary because the
    trainer needs the assembled BatchedDataDict for reward / advantage
    computation between the two TQ stages. Future optimization can
    fuse if reward/advantage move to the workers.

Backward-compat: when data_plane.enabled is unset/false, async path
behavior is byte-for-byte unchanged — _dp_client stays None, the new
branch isn't taken, ReplayBuffer / AsyncTrajectoryCollector get
dp_cfg=None and short-circuit all data-plane code.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…patch TQ path

Closes Issues #3 and NVIDIA-NeMo#4 raised in PR review of the data-plane stack.

Issue #3 — single-``KVBatchMeta`` path returned rows in scrambled order.
``shard_keys_by_seqlen`` sorts by sequence length and strides
(``order[r::dp_world_size]``) to balance per-rank token totals. The
worker logprob aggregators (``_aggregate_logprob_results``) then
concatenate per-rank outputs in rank order via
``BatchedDataDict.from_batches`` — without inverting the seqlen-
strided permutation. Result: ``policy.get_logprobs(KVBatchMeta(...))``
returned rows in
[order[0], order[d], order[2d], …, order[1], order[1+d], …]
order, not the caller's ``meta.keys`` order. Silent correctness bug
(test_seqpack_legacy_equals_tq didn't catch it because the sync path
calls ``policy.get_logprobs(BatchedDataDict)`` — legacy passthrough,
no sharder).

Fix:
  * ``shard_keys_by_seqlen`` records ``_dp_original_indices`` per
    shard in ``extra_info`` (the ``idx`` list it computed).
  * ``dp_dispatch`` reconstructs the concat-position → input-index
    permutation from the shards' ``extra_info``, then applies the
    inverse via ``BatchedDataDict.reorder_data`` after ``aggregate``.
  * The reorder is gated on ``is_meta and not is_meta_list`` — for
    ``list[KVBatchMeta]`` the driver controls ordering (PR 0
    ``fan_out_per_rank_metas``) and the decorator must not undo it.
  * Skipped silently if the result isn't a BatchedDataDict (e.g.
    ``train`` returns a plain dict — order doesn't apply).

Issue NVIDIA-NeMo#4 — TQ path silently dropped legacy training semantics.
The decorator's TQ branch returns ``aggregate(results)`` directly
and never enters ``Policy.train``'s body — so the FLOPs accumulation
at lm_policy.py around the ``flops_tracker`` block, plus the
``num_ranks`` and ``theoretical_tflops`` fields, were missing from
results when the trainer called ``policy.train(KVBatchMeta)`` or
``policy.train(list[KVBatchMeta])``. Same gap for the missing GBS /
DP divisibility assertion.

Fix (additive — no signature changes to the existing aggregate
callables):
  * ``dp_dispatch`` adds a basic divisibility assertion on the TQ path:
    ``total_meta_size % dp_size == 0`` (legacy path enforces this via
    ``shard_by_batch_size(batch_size=gbs)``; TQ path skips that call
    site).
  * ``dp_dispatch`` looks up ``self._dp_post_<method_name>`` after
    ``aggregate``. If defined, calls
    ``post(aggregated, raw_results, shards=shards)`` and uses its
    return value. Convention-based — opt-in per Policy method, no
    decorator boilerplate.
  * ``Policy._dp_post_train`` recovers FLOPs from ``meta.sequence_lengths``
    on each shard (driver-pre-balanced for ``list[KVBatchMeta]``,
    sharder-strided for single ``KVBatchMeta``), records ``total_flops``,
    ``num_ranks``, ``theoretical_tflops`` — same fields the legacy
    body produces.

Backward-compat: existing tests in tests/data_plane/unit/test_shard_parity.py
and test_dispatch.py don't check ``extra_info`` shape on sharder output
or assert on aggregate-method return type other than what's already
returned, so the additive fields and gated reorder are transparent.
The legacy ``policy.train(BatchedDataDict)`` path is unchanged — it
keeps building results inline and never enters the new hook.

Async-on-TQ (PR 4) and grpo_sync (PR 0) both use the
``list[KVBatchMeta]`` path, so they inherit the FLOPs fix automatically
via the post-hook. The reorder fix is only meaningful for callers
that pass single ``KVBatchMeta`` — primarily future logprob/reference-
logprob TQ wiring; flagged in commit message of #3 above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…spatch list path

Migrates ``policy.get_logprobs`` and ``policy.get_reference_policy_logprobs``
in ``grpo_sync.py`` from the legacy in-memory ``BatchedDataDict`` body
onto the @dp_dispatch ``list[KVBatchMeta]`` path that train (PR 0)
already uses. Activates the partition's pre-declared ``"prev_lp"`` /
``"ref_lp"`` consumer tasks (line 435) which until now were
reservations the original ``a085559c`` author left for future work.

Why this is safe (and why we don't need the bin_count_multiple
invariant the train path needed):

  Megatron's training step has cross-DP collectives per microbatch —
  gradient sync — so DP ranks lockstep on each microbatch. Different
  per-rank n_microbatches → first-finished rank hangs on the next
  collective (the step-4 NCCL deadlock from ``a085559c``).
  Logprob INFERENCE has no such collective: forward-only, no backward,
  no gradient sync. TP/PP collectives stay within (TP×PP) groups; DP
  ranks don't lockstep through microbatches. So per-rank packing
  variation is fine — slowest rank just takes longer, no deadlock.

  This is exactly why ``train_presharded`` reattaches
  ``meta.extra_info`` packing metadata (driver pre-balanced, must
  override worker's local re-pack) but ``get_logprobs_presharded`` does
  not (worker's local re-pack is fine). a085559's commit message
  documented this distinction; this commit relies on it.

So no worker-side changes are needed. The migration is purely driver-
side:

  before:
    train_data["prev_logprobs"] = policy.get_logprobs(
        BatchedDataDict({...}), timer=timer
    )["logprobs"]
  after:
    sharded, unsorted = logprob_data.shard_by_batch_size(
        dp_world, batch_size=None, sequence_packing_args=spa,
    )                       # policy-aware shard, same args as legacy
                            # body lines 426-450, with logprob_mb_tokens
    metas = fan_out_per_rank_metas(
        sharded, dp_client=..., partition_id="train",
        task_name="prev_lp", key_prefix=f"step{N}_lp",
        seed_fields=("input_ids", "input_lengths", "token_mask",
                     "sample_mask"),
    )                       # PR 0 helper, reused
    out = policy.get_logprobs(metas, timer=timer)
                            # @dp_dispatch is_meta_list=True — skips
                            # sharder, dispatches, aggregator concats.
    if seqpack or dynbatch:
        out.reorder_data(unsorted)
                            # mirrors legacy body line 478-479: the
                            # driver's shard_by_batch_size returned the
                            # same unsorted_data_indices it always has;
                            # we just apply it on the caller side.
    train_data["prev_logprobs"] = out["logprobs"]

Same flow for ``get_reference_policy_logprobs`` under a distinct
task_name + key_prefix so the per-rank fan-out keys don't collide with
the prev_lp fan-out's keys (or the train fan-out's later in the same
step). The single end-of-step ``kv_clear(keys=None, partition_id="train")``
(line ~967) wipes all three namespaces atomically — no GC plumbing
needed.

What this does NOT do:
  * No worker changes — ``get_logprobs_presharded`` and
    ``get_reference_policy_logprobs_presharded`` keep their existing
    bodies (``self._fetch(meta)`` then call legacy worker-internal
    method). Their local re-pack inside ``_fetch`` is correct for
    forward-only inference; see commit-message above.
  * No legacy ``Policy.get_logprobs(BatchedDataDict)`` body changes.
    The legacy passthrough is intact and unchanged for any other
    caller still passing BatchedDataDict.
  * No @dp_dispatch decorator changes. Reuses the existing list-path
    that train already exercises.
  * Multimodal data is dropped from the logprob input on the TQ path
    (P3 — tensor-only on the bus). Matches pre-existing behaviour of
    the train fan-out which already filters multimodal out of
    train_data via ``_DP_SEED_FIELDS``.

Verification: passed PR 0's qwen3-30b mcore seqpack run end-to-end is
the production signal. After this commit, every grpo_sync run with
seqpack/dynbatch on exercises the @dp_dispatch list path for prev_lp
*and* ref_lp every step — three distinct DP-balanced fan-outs per
step into the same TQ partition.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…d leader-broadcast fetch

Retire the @dp_dispatch decorator and migrate TQ-mediated dispatch into
a dedicated nemo_rl/models/policy/tq_policy.py:TQPolicy(Policy) subclass.
The legacy in-memory Policy and grpo.py are now untouched by data-plane
code; the TQ wiring (controller bootstrap, partition register, fan-out,
drain, close) is fully encapsulated in TQPolicy. examples/run_grpo.py
selects TQPolicy + grpo_train_sync when data_plane.enabled=True, legacy
Policy + grpo_train otherwise.

Adds leader-broadcast fetch policy in AbstractPolicyWorker._fetch:
- New default fetch_policy="auto" auto-detects via _get_replica_group():
  if CP > 1, leader of (TP×CP×PP) siblings fetches once and broadcasts
  the BatchedDataDict over NCCL; otherwise every rank fetches
  independently from TQ (TP=CP=PP=1, the cheapest path).
- _broadcast_batched_data_dict ships a shape descriptor via
  broadcast_object_list, then per-tensor broadcast on the group's
  backend device (NCCL → CUDA, gloo → CPU).
- _attach_or_repack_pack_metadata reattaches driver-side packing
  metadata (micro_batch_indices/micro_batch_lengths) for all three
  *_presharded entry points so seqpack TQ runs don't crash on
  data.micro_batch_indices[0].

Verified end-to-end:
- qwen3-30B-A3B mcore + seqpack + CP=1: 10/10 steps
- qwen3-30B-A3B mcore + seqpack + CP=2 + independent: 10/10 steps
- qwen3-30B-A3B mcore + seqpack + CP=2 + auto leader_broadcast: 10/10 steps,
  KL parity vs independent baseline within last-decimal jitter
- llama-3.1-8B DTensor + seqpack + CP=1: 10/10 steps

Architecture invariants tightened:
- legacy nemo_rl/algorithms/grpo.py has zero data_plane / TransferQueue /
  KVBatchMeta / dp_dispatch tokens (regex-checked)
- nemo_rl/algorithms/grpo_sync.py guards on hasattr(policy, "dp_cfg")
  rather than feature-gating on master_config
- 18/18 architecture invariant tests + 2 new leader_broadcast tests pass

Removed dead code: nemo_rl/data_plane/dispatch.py (the decorator),
nemo_rl/data_plane/sharding.py (its sharder), tests/data_plane/unit/
test_dispatch.py and test_shard_parity.py.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Apply 4 fixes from external review:

1. **Multimodal extras drop (correctness).** ``fan_out_per_rank_metas``
   now writes any tensor field present in the shard, not just those in
   ``seed_fields``. The legacy in-memory path passes the full
   BatchedDataDict; the TQ path was dropping VLM extras like
   ``pixel_values`` because the field filter was schema-restricted. The
   real TQ adapter creates partitions implicitly on first put (per
   adapter comment), so extras don't fight schema registration.

2. **Per-rank ``asyncio.run`` loop (scaling).** Replace the loop of
   per-shard ``asyncio.run(kv_batch_put(...))`` with a single
   ``asyncio.gather`` over all shards. Adds ``fan_out_per_rank_metas_async``
   and a sync façade. O(1) RTT instead of O(DP).

3. **Cleanup on worker failure.** Wrap ``TQPolicy.train``'s fan-out +
   dispatch in try/finally so the partition is drained even if a worker
   raises. Stale tensors no longer accumulate across failed steps.

4. **Schema consolidation.** Move ``_LP_SEED_FIELDS`` from
   ``tq_policy.py`` into ``preshard.py:LP_SEED_FIELDS`` next to
   ``DP_SEED_FIELDS``. Single source of truth for the canonical seed
   sets.

Adds ``tests/data_plane/unit/test_preshard_extras.py`` covering: tensor
extras auto-included, non-tensor entries skipped, LP⊆DP invariant,
per-rank key namespacing.

Deferred to follow-up issues (out of this PR's scope):
- async TQ key collision risk in ``async_utils.py`` (pre-existing)
- partial ``kv_clear`` invalidates ``seen_keys`` in the TQ adapter
  (latent — only ``keys=None`` full-clear is exercised today)

Architecture invariants 18/18 still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Companion to data_plane_integration_plan.md: documents the runtime view
(call order, payloads, per-step RPC counts) of the sync 1-hop GRPO path,
and contrasts it with verl's main_ppo_sync.py at the integration-shape
level (per-prompt actors + ReplayBuffer vs batched actor + slice-only
driver).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…fecycle

Land the sync GRPO data-plane refactor end-to-end:

- New SyncTrajectoryCollector (algorithms/sync_utils.py) — sibling of
  AsyncTrajectoryCollector. Owns rollout + flatten/mask + prompt
  extraction + flat kv_batch_put. Driver receives only KVBatchMeta +
  small per-sample slice.
- rollout_to_tq helper colocated in sync_utils.py (single first-write
  primitive; mirrors verl main_ppo_sync.py:386-423).
- driver_io.read_columns / write_columns helpers for driver-side
  delta read/write on metas.
- Register SyncTrajectoryCollector under VLLM env tier so multinode
  Ray workers provision tensordict.
- grpo_sync.py rewires logprob/ref/train through shard_meta_for_dp
  per-DP fan-out + worker leader-only write-back; driver reads
  small slices only (advantages, log_data input_ids).

Validated e2e on mcore-1B + seqpack + CP=1 (job 11610072,
20/20 steps, +0.21 s/step vs legacy, bit-exact through step 7).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Sync 1-hop simplify pass driven by /simplify review.

- nemo_rl/utils/venvs.py: add make_actor_runtime_env(fqn) — wraps the
  get_actor_python_env + create_local_venv_on_each_node + os.environ
  wiring that was duplicated 3× across grpo.py and grpo_sync.py.
  Touches only the new helper; legacy grpo.py inline blocks
  intentionally untouched (per "grpo.py is 100% backward compatible").
- nemo_rl/algorithms/grpo_sync.py: use the helper for SyncTrajectoryCollector
  runtime_env (~20 lines → ~3); switch _apply_dynamic_sampling's
  pending_unfiltered_rewards from O(N²) [*xs, y] to O(1) .append(y);
  drop rotted (grpo.py:878) line-ref comment; clean up orphan imports.

Tier-1 unit tests: 86/86 passing (job 11623540).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Two intertwined changes ride together:

Variable-length token fields (input_ids, token_mask, generation_logprobs)
now traverse TQ as torch.jagged nested tensors. Consumers call
materialize(layout="padded", pad_value_dict=..., pad_to_multiple=...)
to bridge back to rectangular for trainer code. Mirrors verl's
response_to_nested / to_padded_tensor pattern (main_ppo_sync.py:1180).

- codec.py: to_nested_by_length, maybe_pack_jagged, response_from_nested,
  materialize(pad_value_dict, pad_to_multiple) — pad-to-multiple rounds
  the seq dim to satisfy mcore SP / PyTorch CP divisibility.
- All 3 write sites (kv_first_write, write_columns, _write_back) call
  maybe_pack_jagged so jagged/rectangular wire shapes stay consistent.
- kv_first_write records make_sequence_length_divisible_by in
  meta.extra_info["pad_to_multiple"]; readers honor it.
- Read sites pass pad_value_dict={"input_ids": tokenizer.pad_token_id,
  ...} so padding values match the original padded wire.

Validated e2e at production scale:
  Run A (mcore-1B + seqpack + CP=1):    20/20, bit-exact step 1-4 vs padded
  Run B (qwen3-30B + mcore + CP=2 + seqpack, 2-node): 10/10
  Run C (Llama-8B + dtensor + CP=2, 2-node):           10/10

- Rename SyncTrajectoryCollector → SyncRolloutActor (it's not a
  continuous collector — drives one rollout per step).
- Rename free function rollout_to_tq → kv_first_write (no name
  collision with the ray-actor method).
- Move TQPolicy factory from examples/run_grpo.py into grpo.py:setup()
  via lazy gated import — entrypoints become one-liner setup() calls.
- Retire the now-obsolete test_legacy_grpo_has_zero_dataplane_refs
  invariant; remaining architecture tests still cover the spirit
  (no top-level TQPolicy import, no TQ internals leaked).
- Add tests: test_codec_jagged (9), test_smoke (5), test_correctness
  (15) covering the new bridge + sync 1-hop invariants.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Promote the three pure-metadata helpers from free functions in
preshard.py to methods on KVBatchMeta. Same semantics, cleaner call
sites, ~12 fewer lines net.

  meta.subset(indices)         # was select_meta_indices(meta, indices)
  meta.slice(start, stop)      # was slice_meta(meta, start, stop)
  meta.concat(other, ...)      # was concat_metas([meta, other, ...])

Construction boilerplate (5 dataclass fields) centralized in a private
_replace() helper. Free functions deleted; preshard.py keeps only the
DP-rank packing helpers (shard_meta_for_dp, DP_SEED_FIELDS).

No behavior change; tests updated accordingly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
feat(data-plane): mooncake_cpu binary fix + status doc

Three small additions, none enable mooncake_cpu end-to-end yet but they
unblock the path:

1. mooncake-transfer-engine is now a base dep (next to TransferQueue and
   tensordict). Worker venvs built without extras automatically include
   it.

2. Adapter `mooncake_cpu` branch resolves `<site-packages>/mooncake/`
   (where the wheel ships `mooncake_master`), restores the +x bit if
   pip strips it, and prepends the dir to $PATH so TQ's
   `subprocess.Popen(["mooncake_master", ...])` finds the binary.
   Without this, smoke crashes immediately with FileNotFoundError.

3. `_usb0_down()` is retained but its docstring now says clearly that
   it's a no-op from Python (Ray actors lack CAP_NET_ADMIN; APIPA is
   re-assigned by avahi-autoipd / NetworkManager within seconds). The
   real fix lives at the Slurm-startup layer.

Multi-node mooncake_cpu still hits a Mooncake C++ bug (RPC listener
binds to usb0 169.254.x, leading to metadata 404s and a
MemcpyWorkerPool segfault during the first kv_batch_put). The fix
requires adding `NETWORK_INIT_CMDS` to the cluster wrapper, modeled on
`data-plane-bench/ray.sub`. Captured in
`research/data_plane_mooncake_status.md` along with bench references.
That work is owned outside this commit; until then mooncake_cpu remains
unsupported in nemo-rl. The default `simple` backend is unaffected.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

feat(data-plane): port mooncake_cpu cluster-wrapper fix from data-plane-bench

The bench (data-plane-bench/DEBUG_TQ_BACKENDS.md Issue 1) proved
mooncake_cpu (TCP) works multi-node — 32-node and 48-node validated —
once the cluster wrapper kills usb0/APIPA at SLURM container startup.
This commit ports their working block into our ray.sub and bumps
segment sizes to match the bench's tested config.

- ray.sub: NETWORK_INIT_CMDS block prepended to head_cmd and worker_cmd.
  Kills avahi-autoipd, asks NetworkManager to drop usb0, flushes the
  address, and runs a 2 s relaunch loop as a failsafe. Without this,
  Mooncake's transfer_metadata_plugin.cpp:1127 binds to 169.254.3.1
  (the unreachable APIPA address), causing metadata 404s and a
  MemcpyWorkerPool segfault on the first kv_batch_put.

- adapter: bump MooncakeStore segment sizes from 4 GiB / 1 GiB to
  128 GiB / 16 GiB to match the bench's proven sizes.

- research/data_plane_mooncake_status.md: rewritten — mooncake_cpu is
  now expected to work end-to-end; the only unanswered question is
  whether torch.nested.nested_tensor survives Mooncake's wire codec
  (separate from the cluster-wrapper fixes the bench solved). If it
  doesn't, fallback is verl-style (layout, [tensors]) encoder.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

fix(data-plane): mooncake_cpu falls back to padded wire (no nested tensors)

Mooncake's C++ MemcpyWorkerPool segfaults on torch.nested.nested_tensor
pointer arithmetic. Confirmed by smoke run 11630793: with the wire
forced to padded for mooncake_cpu only, all 3 training steps complete
cleanly (policy_loss -1.0540 → -1.0631, mean_seq_len 84 → 68, no
traceback). Without the fallback, the same config segfaults at the
first kv_batch_put inside MemcpyWorkerPool::workerThread().

- codec.py: add module-level kill switch `_PACK_JAGGED` and
  `set_wire_format(jagged: bool)`. `maybe_pack_jagged` returns
  early when False, so all writers fall back to rectangular tensors.
- adapters/transfer_queue.py: mooncake_cpu branch flips the switch off
  before tq.init.

Bench (data-plane-bench/README.md) only validated mooncake_cpu against
rectangular tensors; this is consistent. Simple backend keeps jagged
and the Phase 1B bandwidth saving. Production paths today use simple,
so mooncake_cpu users pay the padding tax that bench already accepted.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

docs(data-plane): mooncake_cpu status — 1-node validated, multi-node latent gap

After smoke run 11630793 (3 steps clean with padded fallback), document
the actual state:

- 1-node mooncake_cpu now works (commit 86eab577 padded fallback).
- Multi-node mooncake_cpu still has a latent gap: Ray-spawned
  MegatronPolicyWorker actors bind their Mooncake TCP RPC listener to
  169.254.3.1 (usb0 APIPA) even with our ray.sub NETWORK_INIT_CMDS
  block in place. Loopback-routable on 1-node, fatal on 2+-node.
- Fix is ~5 LoC: inject MC_TCP_BIND_ADDRESS into
  _patch_tq_actor_runtime_env's env_vars. Captured as a follow-up;
  not blocking sync 1-hop work since simple is the production default.

fix(data-plane): mooncake_cpu e2e — segfault, multi-node, 1D round-trip

Three independent bugs prevented mooncake_cpu from running production
GRPO. Each is gated narrowly so simple-backend behavior is unchanged.

1. MemcpyWorkerPool segfault on first kv_batch_put. Root cause is
   Mooncake upstream issue NVIDIA-NeMo#1986: isLocalTransfer() regression
   reinterpret_casts another actor's virtual address under TCP. Set
   MC_STORE_MEMCPY=0 per-process in TQDataPlaneClient.__init__ before
   tq.init/connect. PR NVIDIA-NeMo#1995 is the upstream fix; not yet in our wheel.

2. Multi-node MC_TCP_BIND_ADDRESS propagation. Move env-var setup
   from _init_tq (driver-only) into TQDataPlaneClient.__init__ so it
   runs in every process that builds a TQ client (driver + each Ray
   actor's bootstrap=False branch).

3. 1D field round-trip on the KV-path. TQ's extract_field_schema
   silently unsqueezes 1D fields to (N, 1) when recording per-row
   shape into metadata, while _generate_values row-iterates the
   original 1D tensor — producing 0-dim per-row tensors. Mooncake
   stores them under the recorded shape (1,) so the round-trip
   inflates input_lengths/sample_mask from (N,) to (N, 1). Add a
   _KV_PROMOTE_1D flag (independent of _PACK_JAGGED): writer-side
   _to_wire unsqueezes 1D → (N, 1); materialize squeezes the trailing
   1 back. Flag flipped on by the mooncake_cpu adapter only.

Validated 5/5 steps on 1B mcore + seqpack (CP=1) and Llama 8B dtensor
+ seqpack on mooncake_cpu, with FLOPS within noise of simple backend.

Known issue: qwen3 30B mcore + TP=2 + SP + 2-node fails at step 3
with prev_logprobs shape (8, 4018) vs input_ids dim 1=3896. Same
config completes 5/5 on simple backend — narrow MoE+TP+SP+multi-node
regression to investigate separately.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

fix(data-plane): mooncake_cpu multi-node + qwen3 SP write-back round-trip

Three follow-up fixes after the initial mooncake_cpu commit (cb1ebbf3)
that close the multi-node + MoE+TP+SP gap.

1. Multi-node MC_TCP_BIND_ADDRESS propagation. Driver was setting the
   env var via os.environ.setdefault, but Ray actors inherit the
   driver's env, so setdefault was a no-op on worker-node actors and
   they announced the driver's IP. Peers connecting to that announced
   address on a host where no such mooncake port existed got
   "Connection refused" and the run hung. Fix: force-assign with
   os.environ[...] = local_ip in every TQDataPlaneClient.__init__.
   Rename _get_head_node_ip to _get_local_node_ip to make the
   per-process semantic obvious; check ipaddress.is_link_local rather
   than the hardcoded 169.254 prefix.

2. Worker write-back shape divergence under mcore SP. mcore SP rounds
   the forward output's seq dim up to a multiple of TP, so
   prev_logprobs / reference_policy_logprobs arrive at the write-back
   site 1+ tokens wider than max(meta.sequence_lengths). The strict
   shape check in maybe_pack_jagged left them rectangular at the
   SP-padded width while input_ids re-materialized to the
   lengths-derived width — and the seq-dim validator at training time
   crashed on the cross-field shape divergence. Add a separate
   pack_per_token_field helper invoked explicitly by _write_back
   (which knows the field is per-token); it accepts val.shape[1] >=
   max_len and lets to_nested_by_length slice each row to its own
   length, dropping the trailing SP padding. maybe_pack_jagged stays
   conservative so 3D extras like image features still round-trip.

3. setup() data-plane gate moved to the launcher. The legacy trainer
   (grpo.py) must not know about the data plane (architectural
   invariant — see test_no_feature_gate_pattern_in_either_trainer).
   Restore the policy_factory parameter on grpo.setup() and pick the
   factory in examples/run_grpo.py instead.

Validated:
  - 96/96 data-plane unit tests pass (test_no_feature_gate_pattern
    now green; test_kv_first_write_carries_multimodal_extras
    confirms 3D extras still round-trip after the codec split).
  - qwen3 30B mcore + TP=2 + SP + 2-node mooncake_cpu: 5/5 steps
    clean, FLOPS 140.61 → 568.89 (within noise of simple-backend
    control 136.44 → 599.86).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
refactor(data-plane): collapse observability submodule into single file with callback hook

Drops nemo_rl/data_plane/observability/ submodule (middleware.py 229 +
sinks.py 132 + __init__.py 34 = 395 LOC of MetricsSink ABC, InMemorySink,
LogSink, build_sink factory) in favor of one file
nemo_rl/data_plane/observability.py (160 LOC) with a single user-injected
on_event callback. The whole sink-ABC layer was speculative scaffolding
built before there was anything to observe — the lean shape preserves
per-op timing/transparency (the actual goal) while letting users plug
wandb / file / log via a function instead of subclassing a sink.

Public surface preserved: MetricsDataPlaneClient, print_event,
snapshot(). Factory wires the same way (cfg["observability"]["enabled"])
but takes cfg["observability"]["callback"] (programmatic) instead of
a sink Literal. Tests rewritten for the lean shape.

Net –232 LOC.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>

refactor(data-plane): move TQ awareness from BasePolicyWorker into TQWorkerMixin

The 14 TQ-aware methods (setup_data_plane, _fetch, _write_back,
train_presharded, get_logprobs_presharded, get_reference_policy_logprobs_presharded,
plus their helpers) and the _broadcast_batched_data_dict module-level
helper were appended to BasePolicyWorker so every worker carried TQ
awareness whether it used it or not. Lift them all into a separate
TQWorkerMixin under nemo_rl/data_plane/. Workers opt in by mixing it
into their MRO; bare workers stay zero-cost.

DTensor v1, V2, and Megatron worker subclasses inherit from
TQWorkerMixin AND AbstractPolicyWorker. The _get_replica_group overrides
those subclasses already had now satisfy the mixin's abstract hook.

base_policy_worker.py is now bit-identical to main (verified: git diff
main is empty for that file). All TQ awareness is reachable through
one file under nemo_rl/data_plane/.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>

refactor(policy): also extract _shard_for_logprob in get_topk_logits

The earlier shard-helper extraction collapsed the 27-line
dynamic-batches/sequence-packing/dense block in get_logprobs and
get_reference_policy_logprobs but missed get_topk_logits, which has
the same duplicated pattern. Apply the same extraction.

Pure refactor — same behavior. Companion sentinel change replaces
``self.use_dynamic_batches or self.use_sequence_packing`` with the
explicit ``unsorted_data_indices is not None``, matching the other
call sites.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>

fix(data-plane): make NoOp.kv_batch_get jagged-aware

The unconditional torch.stack at the bottom of kv_batch_get crashes on
per-token fields written via maybe_pack_jagged (variable row lengths).
Add _stack_or_nest helper: stack when shapes match, fall back to
torch.nested.as_nested_tensor when ragged. Mirrors what the TQ adapter
returns so codec.materialize takes the same branch on both adapters.

Net +18 LOC inside noop.py.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>

refactor(policy): move TQ-only result aggregators from lm_policy to tq_policy

The three aggregator helpers (_aggregate_train_results,
_aggregate_logprob_results, _aggregate_reference_logprob_results) were
defined at the top of lm_policy.py but called only by tq_policy.py's
*_from_meta methods. They have no caller on the legacy in-memory path.
Move them to tq_policy.py where they're used.

Pure code move — same behavior. lm_policy.py shrinks; tq_policy.py
absorbs the same lines. Eliminates the lm_policy → tq_policy
forward-reference import.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>

refactor(grpo): rename algorithms/sync_utils.py to algorithms/grpo_sync_workers.py

The file houses SyncRolloutActor (a Ray-remote rollout worker) plus
its kv_first_write helper, and is imported only by grpo_sync.py.
"sync_utils" is a generic name for what is, in practice, the worker
half of grpo_sync — rename for clarity, mirroring the existing
nemo_rl/algorithms/async_utils.py convention (one file containing the
async rollout actor + helpers).

Pure rename + import-string rewrite. No code change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>

refactor(data-plane): tighten string-typed params with Literal types

Two stringly-typed parameters were validated at runtime but invisible
to static checkers:

- observability.py: _emit's status param accepts {"ok", "error",
  "timeout"}. Now typed as Literal[...] so callers see allowed values.
- worker_mixin.py: _fetch's layout ({"padded", "jagged"}) and
  fetch_policy ({"auto", "independent", "leader_broadcast"}) now
  Literal-typed for the same reason.

No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>

refactor(experience): relocate SyncRolloutActor to experience/sync_rollout_actor.py

The earlier sync_utils.py → algorithms/grpo_sync_workers.py rename was
wrong-direction. SyncRolloutActor is a Ray-actor wrapper around the
stateless rollout building blocks in nemo_rl/experience/rollouts.py
(run_multi_turn_rollout, run_async_multi_turn_rollout,
run_async_nemo_gym_rollout). It belongs next to its dependency.

algorithms/ is for trainer orchestrators (grpo.py, grpo_sync.py);
data_plane/ is for the swappable client; the TQ write inside
SyncRolloutActor is incidental, not its primary identity. The actor
is a *colocated rollout worker* — domain match is experience/.

kv_first_write moves with the actor (one module, one file). Imports
updated in grpo_sync.py, ray_actor_environment_registry.py,
data_plane/preshard.py docstring, models/policy/tq_policy.py docstring,
and four test files.

No code change; pure relocation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…r cleanup

Three independent simplifications in the mooncake_cpu path. No behavior
change for any backend.

1. Drop dead code in codec.py:
   - set_wire_format() and _PACK_JAGGED were defined but never called
     anywhere (codec was always packing jagged regardless of backend).
     Remove both, and the unreachable `if not _PACK_JAGGED: return ...`
     early-returns inside maybe_pack_jagged / pack_per_token_field.
   - The "padded fallback for mooncake_cpu" hook was inert — if a
     future Mooncake bug forces it back, re-add explicitly as a
     parameter rather than module-level state.

2. Delete _usb0_down() in transfer_queue.py:
   - Its own docstring says "DO NOT rely on this from Python … no-op
     on the workers where it matters." Dead code. The Slurm-startup
     layer is the right place; the next commit drops that block too,
     because MC_TCP_BIND_ADDRESS makes it unnecessary.

3. Drop the duplicated mooncake_cpu setup in _init_tq:
   - set_kv_promote_1d(True) and the MC_TCP_BIND_ADDRESS env-var
     write were already done in TQDataPlaneClient.__init__ (which
     runs in EVERY process, including the driver before _init_tq).
     Removing the dups makes __init__ the single source of truth.
   - Simplify the +x chmod block to a single os.chmod(_master, 0o755)
     under try/except OSError (drop the os.access pre-check; chmod is
     idempotent and TOCTOU-free this way).
   - Move `import ipaddress` to module top.

Net: -121 lines across two Python files. All public symbols
referenced by tests/data_plane/unit/test_architecture_invariants.py
(pack_per_token_field, _to_wire's tensor-only guard) preserved.

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
The NETWORK_INIT_CMDS block (pkill avahi-autoipd / ifconfig usb0 down /
ip addr flush + a 2-second relaunch loop) was a workaround for an
outdated diagnosis in data-plane-bench/DEBUG_TQ_BACKENDS.md (Issue 1):
"MC_TCP_BIND_ADDRESS controls server_name (registration) but NOT the
RPC listener bind address."

Re-reading current Mooncake main (commit fast-forwarded today):

  - mooncake-transfer-engine/src/transfer_engine_impl.cpp:159-170
    If MC_TCP_BIND_ADDRESS is set, it goes directly into
    desc.ip_or_host_name, which is the address registered via
    addRpcMetaEntry — i.e. the address peers receive from the
    metadata service. This was added by PR NVIDIA-NeMo#226 (caef1ef, merged
    2025-04-10) and IS in the pinned wheel 0.3.10.post2 (bumped
    2026-04-22).

  - mooncake-transfer-engine/src/transfer_metadata_plugin.cpp:1292
    The TCP listener binds INADDR_ANY and accepts on all interfaces.
    Bind itself was never the bug — the announce was.

So per-process MC_TCP_BIND_ADDRESS in TQDataPlaneClient.__init__
(unchanged in this commit, runs on every process) gives Mooncake the
routable announce address and peer connections work cross-node
without OS-level interface stripping.

The pkill+sleep loop fought a symptom (avahi-autoipd respawning the
APIPA address). With the announce now correct regardless of usb0,
that fight is unnecessary. Removing the block also makes ray.sub a
no-op for non-mooncake_cpu backends (simple, mooncake_rdma) — they
were paying the host-process-kill cost for no reason.

If multi-node smoke regresses with peers connecting to 169.254.x,
revert this commit only — (A) codec/adapter cleanup stays.

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Combine research/data_plane_api_lifecycle.md into nemo_rl/data_plane/README.md
as the canonical reference. Move the rest of the data-plane research docs
(integration plan, observability, mooncake status, prefetch/test plans,
async-RL limitations, policy subclass plan, test SOP, tests/data_plane/README)
to local-only nemo_rl/data_plane/docs/ — kept untracked, out of the PR.

Comment cleanup across grpo_sync, sync_rollout_actor, tq_policy, and
nemo_rl/data_plane/{interfaces,codec,driver_io,preshard,adapters/*}:
strip dangling research/data_plane_integration_plan.md §1.2 pointers,
defunct Stage 1/2/3/4/5/Phase 1/(P2)/(P3)/Tier N references, verl
line-number cross-refs, and a (commit a085559) provenance line. Fix
an incorrect adapters/noop.py docstring that claimed the factory
returns NoOp on enabled=False (it actually raises). Retarget
interfaces.py and tq_policy.py docstrings from the removed integration
plan to the README.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia and others added 3 commits May 13, 2026 10:22
`_to_wire` had exactly one production caller and that caller always
passed `self._promote_1d` — the kwarg was redundant. Replace with a
narrower `_promote_1d_leaves` helper that does just the 1D unsqueeze
(the conditional part); the always-needed `.detach().contiguous()`
moves to the call site, where the `self._promote_1d` check now lives
explicitly. Symmetric with `_from_wire`, which has always been
caller-gated.

Test fallout:
* `test_codec_mooncake.py` — call `_promote_1d_leaves(td)` instead of
  `_to_wire(td, promote_1d=True)`. Drop the
  `promote_1d=False is passthrough` test (caller-side logic now, not
  helper logic).
* `test_architecture_invariants.py` — remove the stale
  `test_tq_adapter_enforces_no_pickle` guard. It was checking for a
  `TypeError` text in `_to_wire` that has been gone since 46cacb4
  (NonTensorStack switch); the assertion can no longer hold.
* `test_tq_lifecycle.py` — docstring rename only.

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
TQ's ``MsgpackEncoder._encode_tensordict`` serializes any
``TensorDictBase`` via ``dict(obj.items())`` — only the tensor backing
dict. ``NonTensorData`` keeps its payload in ``_non_tensordict["data"]``
(a separate dict), so a ``NonTensorData(data="…")`` round-trips through
ZMQ as an empty ``TensorDict({}, batch_size=[])``. The downstream
``codec.materialize`` path then crashed at
``np.asarray(val.tolist(), dtype=object)`` with
``RuntimeError: generator raised StopIteration`` — numpy probes each
item's ``__iter__`` for nested-array detection, and a wire-stripped TD
with ``batch_dims=0`` raises ``StopIteration`` from
``tensordict.base:576`` (the bare ``raise StopIteration`` becomes a
``RuntimeError`` in Py3.7+ generator semantics).

Fix is local to ``codec.materialize``: switch ``np.asarray(list,
dtype=object)`` → ``np.empty(n, dtype=object)`` + per-index assignment.
The new pattern doesn't iterate items, so the bad ``StopIteration``
never fires. Each item is normalized through
``unwrap_wire_stripped_payload`` which returns the live
``NonTensorData.data`` or the salvageable ``_non_tensordict["data"]``
payload, and substitutes ``None`` only for the exact wire-stripped
signature (``batch_dims=0`` + no tensor fields + no
``_non_tensordict["data"]``) so we never silently drop a legitimate
non-empty ``TensorDict``.

This is verl's strategy
(``verl/utils/transferqueue_utils.py:_async_meta_to_realdata``):
don't iterate stripped TDs in numpy. No TQ monkey-patches needed; the
``_pack_field_values`` subclass + ``MsgpackEncoder`` hook we explored
in earlier iterations were dead code in our actual data flow (DAPO's
``kv_clear`` happens AFTER ``_log_extras = read_columns(...)``, so no
``None`` values reach ``_pack_field_values``; and the SU is a separate
``@ray.remote`` actor whose encoder isn't reachable from the driver
patch anyway).

Also:

  - Extracts ``stack_or_nest`` from ``noop._stack_or_nest`` into
    ``codec`` so both adapters share the helper.
  - Adds a 5-line bare ``import mooncake.store`` probe in ``_init_tq``
    to surface the real native-library error before TQ's
    ``mooncake_client.py`` masks any underlying ``ImportError`` (e.g.
    ``libcudart.so.X: cannot open shared object file``) as
    "Please install via pip install mooncake-transfer-engine".

Unit test ``test_codec_wire_stripped.py`` covers three NonTensorStack
scenarios in materialize: all wire-stripped → ``None`` array; real
strings → roundtrip; mixed → survivors keep data, stripped become
``None``.

Validated end-to-end on simple backend:

  - 35B DAPO automodel 4n8g (24:13, 5/5 steps + ckpt)
  - Llama-3.2-1B mcore 1n8g (4:25/4:52/4:38, 5/5 across three runs)
  - Llama-3.1-8B fsdp2 noncolocated 2n8g (10:49, 5/5)

Mooncake_cpu backend uses pickle wire, not the broken msgpack
``_encode_tensordict`` path, so it never trips this; verified
unaffected (JOBID 11752365, 35B mooncake_cpu, 23:39, 5/5+ckpt).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
… containers

PyPI's ``mooncake-transfer-engine 0.3.10.post2`` is built against
CUDA 12 (links ``libcudart.so.12``). On the cu13 base container
(``nightly-05132026.squashfs``, ``nvidia-cudnn-cu13==9.20.0.48``),
``from mooncake.store import MooncakeDistributedStore`` fails with::

    ImportError: libcudart.so.12: cannot open shared object file:
    No such file or directory

…which TQ's ``mooncake_client.py:50`` masks as the misleading
"Please install via pip install mooncake-transfer-engine".

Upstream Mooncake ships a separately-named cu13 wheel under
``mooncake-transfer-engine-cuda13`` as a GitHub release asset (not
PyPI). Same source repo, same ``mooncake/`` import namespace; its
``store.so`` links ``libcudart.so.13``. Pinning directly to the
GitHub URL follows the existing flash-attn pattern in this file.

Linux x86_64 only — upstream doesn't publish an aarch64 cu13 wheel
(404). Drop and revert to PyPI when upstream promotes cu13 there.

Validated: JOBID 11752365 (35B DAPO automodel 4n8g mooncake_cpu,
23:39 elapsed, 5/5 steps, checkpoint saved).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
@ZhiyuLi-Nvidia ZhiyuLi-Nvidia force-pushed the zhiyul/data_plane_plan branch from 464f487 to 420bbe9 Compare May 14, 2026 00:23
ZhiyuLi-Nvidia and others added 9 commits May 13, 2026 17:50
CI pre-commit run --all-files flagged 7 files. Re-running ruff 0.12+ (--fix, --select I --fix, ruff-format) brings the tree back to spec. nemo_rl/data_plane/interfaces.py also gets a manual D205 fix: collapse the module docstring's two-line summary into a single line.

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
The file nemo_rl/data_plane/driver_io.py was renamed to
column_io.py in commit 5de226c, but pyrefly.toml still
referenced the old path. CI pyrefly check then failed with::

    No Python files matched pattern
    `/home/runner/work/RL/RL/nemo_rl/data_plane/driver_io.py`

Update the whitelist entry to track the rename.

Verified locally: `pyrefly check` no longer reports the missing-file
error.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…ments

After b48f21c fixed the column_io whitelist, pyrefly began checking
files it had previously skipped and reported 5 errors in pre-existing
branch code. All are pyrefly limitations against the tensordict stubs,
not real type bugs:

- transfer_queue.py:558  bad-assignment + missing-argument on
  fields.detach().contiguous() — tensordict's contiguous() is
  functools.wraps-decorated and the stub's _Wrapped pattern confuses
  pyrefly's overload resolution.
- transfer_queue.py:560  bad-argument-type cascades from the above
  (wire_fields narrowing).
- codec.py:275  bad-assignment from a pyrefly inference cycle on
  TensorDict.items() loop unpacking.
- factory.py:65  bad-argument-type because obs.get('callback')
  returns Any and pyrefly can't structurally narrow it to the
  callback signature.

Each site gets a one-line '# type: ignore[<rule>]' with a comment
naming the pyrefly limitation. No structural code changes.

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Pyrefly's missing_count guard flagged:

    File nemo_rl/data_plane/schema.py has zero errors but is not in
    pyrefly.toml in the 'project-includes' list. Please add it to
    this whitelist.

schema.py is the only data_plane module that wasn't on the list; all
other Python files under nemo_rl/data_plane/ are already enumerated.
Added it in alphabetical order between preshard.py and worker_mixin.py.

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
nemo_rl was wrapping ``np.ndarray(dtype=object)`` columns as
``NonTensorStack(*v.tolist())`` before storing them as leaves in the
``TensorDict`` passed to ``dp_client.kv_batch_put``. Under
``tensordict==0.12.2``, ``bulk[k]`` on such a leaf returns an internal
``LinkedList`` — the ``NonTensorStack`` class identity is lost, and
calling ``.contiguous()`` on the parent ``TensorDict`` collapses the
leaf to an empty ``TensorDict``, dropping the wrapped Python objects
entirely.

Symptom: simple-backend GRPO recipes crash at the first
``kv_batch_get`` for ``content`` with
``RuntimeError: All tensordicts must be non-tensors`` inside
``_pack_field_values``, because every batch position is an empty
``TensorDict`` instead of the expected per-sample string.

Fix:
- ``nemo_rl/experience/sync_rollout_actor.py``: pass object arrays
  through as ``np.ndarray`` (canonical site, full rationale).
- ``nemo_rl/data_plane/column_io.py``: same on the ``write_columns``
  path; refers to ``kv_first_write``.
- ``nemo_rl/data_plane/adapters/transfer_queue.py``: drop the
  ``.contiguous()`` call — TQ's encoder forces ``.contiguous()`` per
  tensor leaf itself, and on a parent TD with non-tensor leaves the
  call is destructive.

``TensorDict`` preserves ``ndarray(dtype=object)`` identity through
``__getitem__``, and TQ's encoder serializes object arrays via
``CUSTOM_TYPE_PICKLE``. No TQ patch required. As a bonus, the new
path skips the ``.tolist()`` materialization that the old wrapper
performed per write.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…cate writes

Under TP-only configs (e.g. TP=2 CP=1), every rank in the TP group
was calling ``TQWorkerMixin._write_back`` and racing to write the
same per-sample keys (``prev_logprobs`` etc.). On the simple
backend the second writer's bytes silently overwrote the first
(``last-write-wins`` on a Python dict — benign because the data is
identical post-all-reduce). On the mooncake_cpu backend the
``MooncakeStore`` master rejected the second writer's
``BatchPutEnd`` with ``ILLEGAL_CLIENT`` (-601) because the metadata
``client_id`` was set to the first writer's UUID — the recipe
crashed at the first ``kv_batch_put`` of the offending step.

The existing leader check ``_is_replica_leader`` correctly returns
False for non-leaders, but only when ``_get_replica_group`` returns
a non-None group. Subclasses gate ``_get_replica_group`` on
``CP > 1`` as a fetch-path optimization (the docstring explicitly
calls out "matches the qwen3-mcore TP=2 baseline"). That gate
incorrectly disables the leader check on the write-back path too:
``CP=1 ⇒ replica_group is None ⇒ _is_replica_leader → True for
every rank``.

Split the write-back leader check from the replica-group machinery:

- ``TQWorkerMixin._is_writeback_leader``: default delegates to
  ``_is_replica_leader`` (preserves behavior for workers with no
  parallelism).
- ``MegatronPolicyWorkerImpl._is_writeback_leader``: override gates on
  ``(tp_rank, cp_rank, pp_rank) == (0, 0, 0)`` via mcore
  ``parallel_state`` — unconditional, no CP gate.
- ``DTensorPolicyWorkerV2Impl._is_writeback_leader``: same idea but
  using ``device_mesh["cp"].get_local_rank()`` /
  ``device_mesh["tp"].get_local_rank()``.
- ``_write_back`` switched from ``_is_replica_leader`` to
  ``_is_writeback_leader``.

Correctness: simple backend's ``last-write-wins`` already proves the
data is identical across TP siblings (DSv3 32n8g TP=32 simple passes
its closing ``check_metrics.py`` with the same multi-write pattern;
gating to leader-only is semantically equivalent). Mooncake's race is
eliminated because exactly one client now writes each key.

Perf: ``tp_size - 1`` redundant ``kv_batch_put`` calls per training
step are now skipped on every backend, not just mooncake_cpu.

Verified by JOBID 11758259 (1n8g megatron TP=2 + temp/top-p/top-k
sampling on mooncake_cpu) — past Step 11/500 with no -601, whereas
every prior attempt of this recipe crashed at Step 1 within ~5 min.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Pre-commit auto-fix changes flagged by CI:

- nemo_rl/data_plane/column_io.py: remove unused NonTensorStack import (only referenced in docstrings/comments now).
- nemo_rl/experience/sync_rollout_actor.py: drop blank line between docstring close and first statement.

Plus two D205 (blank-line-between-summary-and-description) fixes that ruff check flagged after the auto-fixes:

- nemo_rl/models/policy/workers/dtensor_policy_worker_v2.py:216
- nemo_rl/models/policy/workers/megatron_policy_worker.py:117

Both _is_writeback_leader docstrings had a summary line ending with 'See' and the description continuing on the next line without a blank separator.

Verified: ruff check + ruff format --check both pass on nemo_rl/, tests/, examples/, docker/, docs/.
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
@zyzhou5 zyzhou5 force-pushed the r3-router-replay-pr2439 branch from bf611f9 to b04025d Compare May 15, 2026 01:46
@zyzhou5 zyzhou5 requested a review from a team as a code owner May 15, 2026 01:46
@github-actions github-actions Bot added the Documentation Improvements or additions to documentation label May 15, 2026
@ZhiyuLi-Nvidia ZhiyuLi-Nvidia force-pushed the zhiyul/data_plane_plan branch 2 times, most recently from c5d2b2a to 4826d26 Compare May 19, 2026 18:13
@ZhiyuLi-Nvidia ZhiyuLi-Nvidia force-pushed the zhiyul/data_plane_plan branch 7 times, most recently from 14cd92d to b63c18f Compare May 24, 2026 07:44
@terrykong terrykong deleted the branch NVIDIA-NeMo:zhiyul/data_plane_plan May 24, 2026 16:26
@terrykong terrykong closed this May 24, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Documentation Improvements or additions to documentation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants