feat(algorithms): SingleController streaming train_pump (split-API consumer)#2700
Open
mehraakash wants to merge 10 commits into
Open
feat(algorithms): SingleController streaming train_pump (split-API consumer)#2700mehraakash wants to merge 10 commits into
mehraakash wants to merge 10 commits into
Conversation
…orker Adds begin_train_step / train_microbatch / finish_train_step / abort_train_step on MegatronPolicyWorkerImpl, mirroring the DTensor v1/v2 implementations but adapted for mcore's contiguous grad bucket + pipeline-schedule reduce path. Mechanism: - begin_train_step: zero_grad_buffer + optimizer.zero_grad, store loss_fn / gbs / mbs / local_valid_seqs/toks accumulators on _train_step_state, and null model.config.grad_sync_func (saved for restore) so the PP scheduler's direct reduce dispatch cannot bypass no_sync. - train_microbatch(data): wrap one ``megatron_forward_backward`` invocation in ``with self.model.no_sync():`` so mcore DDP hooks accumulate ``param.main_grad`` locally without dispatching the cross-DP reduce. Pass ``global_valid_seqs/toks=tensor(1.0)`` so the loss returns un-normalized sums; backward deposits raw d(sum)/dθ. Accumulate local mask sums + per-mb metrics + the total pipeline-microbatch count (for finish-time MoE aux-loss scaling). - finish_train_step: all_reduce mask sums to get true N (toks for TOKEN_LEVEL loss, seqs for SEQUENCE_LEVEL), call self.model.scale_gradients(1/N), then the one true cross-DP reduce via start_grad_sync + finish_grad_sync, optimizer.step (clips internally), restore grad_sync_func, scheduler.step(increment=gbs). Rescale per-mb metrics by 1/N (linear-in-1/N math), aggregate, surface global counts. - abort_train_step: restore grad_sync_func, zero_grad_buffer + zero_grad, drop state. ``trainer_version`` unchanged. Sync ``train()`` is left untouched. Includes CPU unit tests at tests/unit/models/policy/test_megatron_split_state.py covering the lifecycle and call-order invariants (no_sync wrap, grad_sync_func save/restore, mask-sum accumulation, N selection by loss_type, abort idempotence, MoE scaling). Marked pytest.mark.mcore so they run only in mcore-enabled CI containers. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Akash Mehra <akamehra@nvidia.com>
Signed-off-by: Akash Mehra <akamehra@nvidia.com>
Signed-off-by: Akash Mehra <akamehra@nvidia.com>
Signed-off-by: Akash Mehra <akamehra@nvidia.com>
Signed-off-by: Yuki Huang <yukih@nvidia.com>
Drives the begin/train_microbatch/finish split API in NVIDIA-NeMo#2683 and group, per-group prepare_logprobs (when configured) -> advantage_pump -> train_microbatch_from_meta (queued), one finish_train_step + one clear_samples at end-of-step. Buffer capacity released per group, not per step. - StalenessSampler.select_one_group: picks one eligible prompt group; same predicate as select_indices, sort by (lag, indices[0]). - SingleControllerConfig.target_prompt_groups_per_step: explicit per- step admission count; validated against min_prompt_groups_per_batch. - _reap_in_flight_nonblocking: ray.wait(timeout=0) drain helper. - DryRunTrainer: split-API stub with begin/microbatch/finish/abort invariants for dry-run tests. - 7 streaming dry-run tests: arrival order, finish-time trainer_version tick, strict on-policy filter, long-tail overlap, abort idempotence, empty-step no-op, single clear_samples per step. Signed-off-by: Akash Mehra <akamehra@nvidia.com>
|
Auto-sync is disabled for ready for review pull requests in this repository. Workflows must be run manually. Contributors can view more details about this message here. |
Author
|
/ok to test 9025e5a |
Required by 'Check if any files with zero errors not in whitelist' guard in cicd-main.yml. Both files have zero pyrefly errors; without the whitelist entry the lint job fails. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Akash Mehra <akamehra@nvidia.com>
Author
|
/ok to test 7ec56e2 |
Pre-existing zero-error file from NVIDIA-NeMo#2078 (Eagle3) that was never added to the project-includes whitelist. Carrying the fix forward in this PR to unblock the lint job. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Akash Mehra <akamehra@nvidia.com>
Author
|
/ok to test cc5af8d |
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Akash Mehra <akamehra@nvidia.com>
Author
|
/ok to test 56f8d80 |
KavinKrishnan
added a commit
to KavinKrishnan/RL
that referenced
this pull request
Jun 5, 2026
…erExtension
Phase D of the Megatron-MX integration plan. Wires the new
modelexpress.megatron_translator path into the existing
update_weights_via_mx flow on NemoRL's direct-vLLM extension.
What's new
* First-cycle Megatron detection: peek at any candidate's
megatron_meta. None → existing DTensor / FSDP path. Set →
latched into self._mx_megatron_mode and routed through
_update_weights_via_mx_megatron forevermore on this receiver.
* _update_weights_via_mx_megatron: builds a MegatronReceiverContext
once (transformer_config + Megatron→HF name map from the source
sidecar; receive_specs from the per-tensor TensorDescriptorV2
registry); per refit, calls run_refit_cycle which drives the full
discover → plan → assemble → translate → yield(hf_name, tensor)
pipeline. The yielded HF tensors flow through the existing
_load_weights + FP8-KV-cache hooks.
* Tree fan-out (publish_self_as_source) is preserved on the
Megatron path identical to the DTensor path.
Backwards compat
* DTensor / FSDP receivers see _mx_megatron_mode = False on the
first cycle and stay on the existing path. PR NVIDIA-NeMo#2700's prime-rl
receivers and John's Dynamo-side worker extension are
unaffected.
* Sources that advertise publisher_kind=megatron but are missing
the transformer_config sidecar (older trainer image) trigger a
one-shot warning and fall back to non-Megatron mode.
Out of scope (deferred)
* The pull callback in _update_weights_via_mx_megatron uses
self._mx_receiver._receiver._nixl.pull_to, which is the
conceptual API; the actual NIXL plumbing for sliced pulls
(registering a sub-view of a parent tensor + completing the
transfer) needs a small wrapper in modelexpress.refit_receiver.
v0 of the actual NIXL sliced-pull plumbing lands in a follow-up
commit; this commit gets the Phase D control flow + receive
context build right so the next commit only needs to fill in
the pull function.
* Wiring into Dynamo's MxRefitWorkerExtension (a separate Dynamo
repo edit; same shape — import run_refit_cycle, call from the
worker extension's update_weights_via_mx).
The file is introduced by NVIDIA-NeMo#2692 (DTensor PR), not by this branch. Whitelisting it here causes pyrefly to fail with 'No Python files matched pattern' since the file does not exist on this branch. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Akash Mehra <akamehra@nvidia.com>
Author
|
/ok to test 1fba99e |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds the
SingleControllerstreaming_train_pumpthat drives the worker-side split API (begin_train_step/train_microbatch_from_meta/finish_train_step/abort_train_step) shipped in #2683 (Megatron) and #2692 (DTensor v1/v2).Per-prompt-group streaming: SC fires
train_microbatch_from_metaper ready prompt group as rollouts commit, hiding trainer F/B under generation latency. Lazybegin_train_stepon first ready group; singlefinish_train_step+ singleclear_samplesat end-of-step. Math identical to the sync path (linear-in-1/N rescale at finish).Stack (logical, not enforced by GitHub base)
This PR is logically stacked on #2683 (Megatron split-API). The diff shown here includes the #2683 commits because GitHub requires the base to be a branch on the upstream repo; review the last 4 commits for this PR's actual scope:
feat(algorithms): add single controller metadata sampler skeleton—StalenessSamplerscaffold.feat(algorithms): add single controller advantage pump—_advantage_pump.refactor: some refactor in single-controller (#2654)— class rename toStalenessSampler.feat(algorithms): SC per-prompt-group streaming train_pump— the actual new work.Co-required with #2692 (DTensor split-API) for full backend coverage; works against either backend's split API.
What's new (commit 4)
StalenessSampler.select_one_group— single-group admission per loop iter; same predicate asselect_indices, sort by(lag, indices[0]).SingleControllerConfig.target_prompt_groups_per_step— explicit per-step admission count; validated againstmin_prompt_groups_per_batch._reap_in_flight_nonblocking— drain helper,ray.wait(timeout=0)._train_pumprewrite: lazy begin, per-groupprepare_logprobs_from_meta(gated on cfg fields) →_advantage_pump→train_microbatch_from_metaqueued, one finish + oneclear_samplesat end-of-step. Buffer capacity released per group.DryRunTrainersplit-API stub + invariants (begin-while-open raises, microbatch/finish with no-or-mismatched step raises).trainer_versiontick, strict on-policy filter, long-tail overlap, abort idempotence, empty-step no-op, singleclear_samplesper step.Math (bias-free)
masked_mean(values, mask, N) = sum(values·mask)/(N+ε)is linear in1/N. Per microbatch we passglobal_valid_seqs = global_valid_toks = tensor(1.0); loss returns un-normalized sums; backward deposits rawd(sum)/dθinto.grad. Atfinish_train_stepwe all-reduce local mask sums, pickNfromloss_fn.loss_type(TOKEN_LEVEL→toks, SEQUENCE_LEVEL→seqs), rescalep.gradby1/N. Optimizer step applies a gradient identical (within ε) to the sync path.Test plan
tests/unit/algorithms/test_single_controller_dryrun.py::TestStreamingTrainPump) — 7 tests, CPU-only.tests/unit/algorithms/test_staleness_sampler.py) — 7 tests forselect_one_group.🤖 Generated with Claude Code