…mm group (#1264) Signed-off-by: Youngeun Kwon <youngeunk@nvidia.com> Signed-off-by: NeMo Bot <nemo-bot@nvidia.com>
ℹ️ File Consistency Check (based on commit 03c4f5c, PR #1311)

✅ DTensor Policy Worker Synchronization Check: both DTensor policy worker files were modified in this PR.

Please ensure that the changes are consistent between both files where applicable. This check ensures that related file implementations remain synchronized across the codebase. If you believe this warning is incorrect or the files should intentionally differ, please add a comment explaining the reasoning.
📝 Walkthrough

Adds an explicit `train_world_size` across algorithms, policies, and vLLM generation. The total world size is now computed as `train_world_size + inference_world_size`. All `init_collective` signatures and call sites are updated to accept and pass `train_world_size`. Rank computations and process-group setup are adjusted accordingly, and the rank == 0 special-casing for group creation and broadcasts is removed.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor Orchestrator
    participant TrainCluster as Train Cluster
    participant InferenceCluster as Inference Cluster
    participant Policy as Policy (train)
    participant Gen as vLLM Generation (inference)
    participant WorkerT as Policy Workers
    participant WorkerG as vLLM Workers
    Orchestrator->>TrainCluster: world_size()
    TrainCluster-->>Orchestrator: train_world_size
    Orchestrator->>InferenceCluster: world_size()
    InferenceCluster-->>Orchestrator: inference_world_size
    Note over Orchestrator: world_size = train_world_size + inference_world_size
    Orchestrator->>Policy: init_collective(ip, port, world_size, train_world_size)
    Policy->>WorkerT: create StatelessProcessGroup(rank=self.rank,<br/>world_size)
    WorkerT->>WorkerT: init PyNcclCommunicator(group)
    WorkerT->>WorkerT: broadcast(src=0)
    Orchestrator->>Gen: init_collective(ip, port, world_size, train_world_size)
    Gen->>WorkerG: collective_rpc("init_collective",<br/>rank_prefix, ip, port, world_size, train_world_size)
    WorkerG->>WorkerG: rank = train_world_size + rank_prefix + local_rank
    WorkerG->>WorkerG: create StatelessProcessGroup(rank, world_size)
    WorkerG->>WorkerG: ready
```
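The size and rank bookkeeping in the diagram boils down to simple arithmetic. A plain-Python sketch (the function names here are ours for illustration, not the real API):

```python
def combined_world_size(train_world_size: int, inference_world_size: int) -> int:
    """Total ranks in the shared collective: train ranks first, then inference."""
    return train_world_size + inference_world_size


def inference_rank(train_world_size: int, rank_prefix: int, local_rank: int) -> int:
    """vLLM worker ranks are offset past all training ranks, as in the diagram."""
    return train_world_size + rank_prefix + local_rank


# e.g. 4 training ranks and 4 inference ranks share one collective of size 8;
# the first vLLM worker (rank_prefix=0, local_rank=0) lands on global rank 4.
world_size = combined_world_size(4, 4)
first_vllm_rank = inference_rank(4, 0, 0)
```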
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

Pre-merge checks: ❌ Failed checks (3 warnings), ✅ Passed checks (1 passed)
Actionable comments posted: 1
🧹 Nitpick comments (2)
tests/unit/algorithms/test_distillation.py (1)

418-525: Consider modelling world_size in DummyCluster

Nice to have the smoke test, but since `DummyCluster.world_size()` always returns 1, we won't catch regressions in the train/inference world-size arithmetic that motivated this PR. It would be cheap insurance to let the dummy accept a target world size (e.g., derived from `bundle_ct_per_node_list`) so the mocked setup mirrors the real topology a bit more closely.

nemo_rl/models/policy/dtensor_policy_worker.py (1)
504-516: Document unused `train_world_size` parameter in `init_collective`

The `train_world_size` parameter is required by the interface but isn't used in this implementation; add a brief comment (e.g., as in `megatron_policy_worker`) to clarify its purpose.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
- nemo_rl/algorithms/distillation.py (1 hunk)
- nemo_rl/algorithms/grpo.py (1 hunk)
- nemo_rl/models/generation/vllm/vllm_backend.py (1 hunk)
- nemo_rl/models/generation/vllm/vllm_generation.py (2 hunks)
- nemo_rl/models/generation/vllm/vllm_worker.py (2 hunks)
- nemo_rl/models/generation/vllm/vllm_worker_async.py (2 hunks)
- nemo_rl/models/policy/dtensor_policy_worker.py (2 hunks)
- nemo_rl/models/policy/dtensor_policy_worker_v2.py (2 hunks)
- nemo_rl/models/policy/interfaces.py (1 hunk)
- nemo_rl/models/policy/lm_policy.py (1 hunk)
- nemo_rl/models/policy/megatron_policy_worker.py (2 hunks)
- tests/unit/algorithms/test_distillation.py (1 hunk)
- tests/unit/models/generation/test_vllm_generation.py (2 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py
📄 CodeRabbit inference engine (CODING_GUIDELINES.md)
**/*.py: Follow the Google Python Style Guide for all Python code
Target Python 3.12+ for all Python code in NeMo-RL
Indent Python code with 4 spaces; do not use tabs
Python filenames should be snake_case (e.g., some_file.py)
Class names should be PascalCase
Function and method names should be snake_case
Local variable names should be snake_case; if starting with a number, prefix with k (e.g., k_99th_percentile)
Global variables should be UPPER_SNAKE_CASE and prefixed with G_ (e.g., G_MY_GLOBAL)
Constants should be UPPER_SNAKE_CASE
Avoid shadowing variables declared in an outer scope
Initialize all externally visible members of a class in the constructor
For public interfaces used outside a file, prefer docstrings over comments
Use comments mainly for code within a function or interfaces local to a file
Commented-out code must include a nearby comment explaining usage and why it is commented out; otherwise remove before merging
Use Google-style docstrings for classes and functions (Sphinx-parseable)
Avoid using reflection when functionality can be easily achieved without it
Limit except clauses to the smallest specific set of exceptions possible
For duck-typing via try/except, keep the try body minimal and use else for main logic
Add the NVIDIA copyright header (with current year) at the top of all Python files, excluding tests/ and test-only scripts
Files:
- nemo_rl/models/generation/vllm/vllm_worker.py
- nemo_rl/algorithms/grpo.py
- nemo_rl/models/policy/dtensor_policy_worker.py
- nemo_rl/models/policy/interfaces.py
- tests/unit/algorithms/test_distillation.py
- nemo_rl/models/policy/megatron_policy_worker.py
- tests/unit/models/generation/test_vllm_generation.py
- nemo_rl/models/generation/vllm/vllm_generation.py
- nemo_rl/models/generation/vllm/vllm_backend.py
- nemo_rl/algorithms/distillation.py
- nemo_rl/models/policy/lm_policy.py
- nemo_rl/models/policy/dtensor_policy_worker_v2.py
- nemo_rl/models/generation/vllm/vllm_worker_async.py
nemo_rl/**/*.py
📄 CodeRabbit inference engine (CODING_GUIDELINES.md)
nemo_rl/**/*.py: Do not set non-None configuration defaults in code; YAML is the single source of truth for defaults
Access required config attributes directly (e.g., policy_cfg["precision"]) and assume presence; do not introduce hidden defaults
Express configuration optionality via TypedDict using typing.NotRequired
When adding a new config key to a TypedDict subclass, document the key’s purpose, valid values/types, and recommended default in code
For any class or function decorated with @ray.remote, add '# pragma: no cover' on the class/def line (and on remote functions)
Files:
- nemo_rl/models/generation/vllm/vllm_worker.py
- nemo_rl/algorithms/grpo.py
- nemo_rl/models/policy/dtensor_policy_worker.py
- nemo_rl/models/policy/interfaces.py
- nemo_rl/models/policy/megatron_policy_worker.py
- nemo_rl/models/generation/vllm/vllm_generation.py
- nemo_rl/models/generation/vllm/vllm_backend.py
- nemo_rl/algorithms/distillation.py
- nemo_rl/models/policy/lm_policy.py
- nemo_rl/models/policy/dtensor_policy_worker_v2.py
- nemo_rl/models/generation/vllm/vllm_worker_async.py
🧬 Code graph analysis (12)
nemo_rl/models/generation/vllm/vllm_worker.py (2)
- tests/unit/algorithms/test_distillation.py (1): world_size (479-480)
- nemo_rl/distributed/virtual_cluster.py (1): world_size (357-358)

nemo_rl/algorithms/grpo.py (10)
- tests/unit/algorithms/test_distillation.py (3): world_size (479-480), init_collective (492-493), init_collective (505-506)
- nemo_rl/distributed/virtual_cluster.py (1): world_size (357-358)
- nemo_rl/models/generation/vllm/vllm_backend.py (1): init_collective (34-55)
- nemo_rl/models/generation/vllm/vllm_generation.py (1): init_collective (370-407)
- nemo_rl/models/generation/vllm/vllm_worker.py (1): init_collective (479-496)
- nemo_rl/models/policy/dtensor_policy_worker.py (1): init_collective (504-515)
- nemo_rl/models/policy/dtensor_policy_worker_v2.py (1): init_collective (462-472)
- nemo_rl/models/policy/interfaces.py (1): init_collective (143-146)
- nemo_rl/models/policy/lm_policy.py (1): init_collective (236-248)
- nemo_rl/models/policy/megatron_policy_worker.py (1): init_collective (823-836)

nemo_rl/models/policy/dtensor_policy_worker.py (8)
- nemo_rl/models/generation/vllm/vllm_backend.py (1): init_collective (34-55)
- nemo_rl/models/generation/vllm/vllm_generation.py (1): init_collective (370-407)
- nemo_rl/models/generation/vllm/vllm_worker.py (1): init_collective (479-496)
- nemo_rl/models/policy/dtensor_policy_worker_v2.py (1): init_collective (462-472)
- nemo_rl/models/policy/interfaces.py (1): init_collective (143-146)
- nemo_rl/models/policy/lm_policy.py (1): init_collective (236-248)
- nemo_rl/models/policy/megatron_policy_worker.py (1): init_collective (823-836)
- tests/unit/algorithms/test_distillation.py (3): init_collective (492-493), init_collective (505-506), world_size (479-480)

nemo_rl/models/policy/interfaces.py (2)
- tests/unit/algorithms/test_distillation.py (1): world_size (479-480)
- nemo_rl/distributed/virtual_cluster.py (1): world_size (357-358)

tests/unit/algorithms/test_distillation.py (1)
- nemo_rl/algorithms/distillation.py (1): setup (150-460)

nemo_rl/models/policy/megatron_policy_worker.py (7)
- nemo_rl/models/generation/vllm/vllm_backend.py (1): init_collective (34-55)
- nemo_rl/models/generation/vllm/vllm_generation.py (1): init_collective (370-407)
- nemo_rl/models/generation/vllm/vllm_worker.py (1): init_collective (479-496)
- nemo_rl/models/policy/dtensor_policy_worker.py (1): init_collective (504-515)
- nemo_rl/models/policy/dtensor_policy_worker_v2.py (1): init_collective (462-472)
- nemo_rl/models/policy/interfaces.py (1): init_collective (143-146)
- nemo_rl/models/policy/lm_policy.py (1): init_collective (236-248)

tests/unit/models/generation/test_vllm_generation.py (11)
- tests/unit/algorithms/test_distillation.py (3): world_size (479-480), init_collective (492-493), init_collective (505-506)
- nemo_rl/distributed/virtual_cluster.py (1): world_size (357-358)
- nemo_rl/models/generation/vllm/vllm_backend.py (1): init_collective (34-55)
- nemo_rl/models/generation/vllm/vllm_generation.py (1): init_collective (370-407)
- nemo_rl/models/generation/vllm/vllm_worker.py (1): init_collective (479-496)
- nemo_rl/models/policy/dtensor_policy_worker.py (1): init_collective (504-515)
- nemo_rl/models/policy/dtensor_policy_worker_v2.py (1): init_collective (462-472)
- nemo_rl/models/policy/interfaces.py (1): init_collective (143-146)
- nemo_rl/models/policy/lm_policy.py (1): init_collective (236-248)
- nemo_rl/models/policy/megatron_policy_worker.py (1): init_collective (823-836)
- nemo_rl/models/generation/interfaces.py (1): init_collective (212-216)

nemo_rl/models/generation/vllm/vllm_generation.py (2)
- tests/unit/algorithms/test_distillation.py (1): world_size (479-480)
- nemo_rl/distributed/virtual_cluster.py (1): world_size (357-358)

nemo_rl/algorithms/distillation.py (9)
- tests/unit/algorithms/test_distillation.py (3): world_size (479-480), init_collective (492-493), init_collective (505-506)
- nemo_rl/models/generation/vllm/vllm_backend.py (1): init_collective (34-55)
- nemo_rl/models/generation/vllm/vllm_generation.py (1): init_collective (370-407)
- nemo_rl/models/generation/vllm/vllm_worker.py (1): init_collective (479-496)
- nemo_rl/models/policy/dtensor_policy_worker.py (1): init_collective (504-515)
- nemo_rl/models/policy/dtensor_policy_worker_v2.py (1): init_collective (462-472)
- nemo_rl/models/policy/interfaces.py (1): init_collective (143-146)
- nemo_rl/models/policy/lm_policy.py (1): init_collective (236-248)
- nemo_rl/models/policy/megatron_policy_worker.py (1): init_collective (823-836)

nemo_rl/models/policy/lm_policy.py (2)
- nemo_rl/distributed/virtual_cluster.py (1): world_size (357-358)
- nemo_rl/distributed/worker_groups.py (1): run_all_workers_single_data (728-772)

nemo_rl/models/policy/dtensor_policy_worker_v2.py (8)
- nemo_rl/models/generation/vllm/vllm_backend.py (1): init_collective (34-55)
- nemo_rl/models/generation/vllm/vllm_generation.py (1): init_collective (370-407)
- nemo_rl/models/generation/vllm/vllm_worker.py (1): init_collective (479-496)
- nemo_rl/models/policy/dtensor_policy_worker.py (1): init_collective (504-515)
- nemo_rl/models/policy/interfaces.py (1): init_collective (143-146)
- nemo_rl/models/policy/lm_policy.py (1): init_collective (236-248)
- nemo_rl/models/policy/megatron_policy_worker.py (1): init_collective (823-836)
- tests/unit/algorithms/test_distillation.py (3): init_collective (492-493), init_collective (505-506), world_size (479-480)

nemo_rl/models/generation/vllm/vllm_worker_async.py (2)
- tests/unit/algorithms/test_distillation.py (1): world_size (479-480)
- nemo_rl/distributed/virtual_cluster.py (1): world_size (357-358)
🪛 Ruff (0.13.3)
nemo_rl/models/policy/dtensor_policy_worker.py
- 505-505: Unused method argument: `train_world_size` (ARG002)

tests/unit/algorithms/test_distillation.py
- 492-492: Unused method argument: `args` (ARG002)
- 492-492: Unused method argument: `kwargs` (ARG002)
- 502-502: Unused method argument: `args` (ARG002)
- 502-502: Unused method argument: `kwargs` (ARG002)
- 505-505: Unused method argument: `args` (ARG002)
- 505-505: Unused method argument: `kwargs` (ARG002)

nemo_rl/models/policy/megatron_policy_worker.py
- 824-824: Unused method argument: `train_world_size` (ARG002)

nemo_rl/models/policy/dtensor_policy_worker_v2.py
- 463-463: Unused method argument: `train_world_size` (ARG002)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Lint check
- GitHub Check: Lint check
- GitHub Check: Lint check
- GitHub Check: Post submodule check comment / Comment on PR
🔇 Additional comments (9)
nemo_rl/models/generation/vllm/vllm_backend.py (1)

35-49: Rank offset makes sense

Adding `train_world_size` to the rank calculation cleanly shifts vLLM workers after the training ranks, which is exactly what we need now that train and inference groups share one collective. Looks good.

nemo_rl/models/generation/vllm/vllm_generation.py (1)

370-404: Propagation of train_world_size is solid

Making `train_world_size` keyword-only and threading it through `common_kwargs` keeps the public API unambiguous while ensuring every worker sees the same world metadata.

nemo_rl/models/policy/interfaces.py (1)

142-145: Interface stays consistent

Making `train_world_size` part of the abstract method keeps every policy implementation honest about the new collective contract.

tests/unit/models/generation/test_vllm_generation.py (1)

944-965: Test update matches the new contract

Deriving `world_size` from `train_world_size + inference_world_size` and passing the train slice explicitly keeps the test in lockstep with the revised collective semantics.

nemo_rl/models/policy/lm_policy.py (1)

236-246: Forwarding looks good

The policy now forwards `train_world_size` alongside the existing parameters, so the worker group gets everything it needs without positional ambiguity.

nemo_rl/models/generation/vllm/vllm_worker_async.py (1)

395-412: Async path is wired correctly

Including `train_world_size` in the async RPC keeps the worker extension aligned with the new offset logic; parity with the sync worker is achieved.

nemo_rl/models/policy/dtensor_policy_worker.py (1)

1812-1813: LGTM! Broadcast simplification aligns with the inclusive comm-group design.

Removing the rank-specific gating and performing the dtype conversion and broadcast unconditionally is correct. All ranks now participate in the communication group, while `src=0` ensures rank 0 remains the source of the broadcast.

nemo_rl/models/policy/megatron_policy_worker.py (2)

1741-1743: LGTM! Broadcast simplification aligns with the inclusive comm-group design.

The comment "broadcast from train rank 0 to all other ranks (training and inference)" clearly explains the intent. Removing the rank-specific gating allows all ranks to participate in the communication group, while `src=0` ensures rank 0 is the broadcast source.
823-836: Approve code changes: unused `train_world_size` parameter is intentional

`train_world_size` is deliberately unused in `init_collective` of `megatron_policy_worker.py` for API consistency; the vLLM backends use it to compute rank offsets.
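The interface-consistency pattern described here could be made explicit along these lines (a hypothetical sketch with made-up names, not the actual worker class):

```python
class PolicyWorkerSketch:
    """Illustrative stand-in for a policy worker implementing the
    shared init_collective contract."""

    def init_collective(
        self, ip: str, port: int, world_size: int, train_world_size: int
    ) -> None:
        # `train_world_size` is required by the shared interface but unused
        # in this implementation: only the vLLM backend consumes it, to
        # offset inference ranks past the training ranks.
        del train_world_size  # discard explicitly to silence ARG002
        self.comm_info = (ip, port, world_size)
```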