Skip to content

elegantly prevent data re-put within DP#50

Merged
0oshowero0 merged 8 commits into
TransferQueue:han/optimize_tq_collectfrom
jianjunzhong:feat/optimize_tq_collect
Dec 20, 2025
Merged

elegantly prevent data re-put within DP#50
0oshowero0 merged 8 commits into
TransferQueue:han/optimize_tq_collectfrom
jianjunzhong:feat/optimize_tq_collect

Conversation

@jianjunzhong
Copy link
Copy Markdown

What does this PR do?

Add concise overview of what this PR aims to achieve or accomplish. Reference related GitHub issues and PRs that help with the review.

Checklist Before Starting

  • Search for similar PRs. Paste at least one query link here: ...
  • Format the PR title as [{modules}] {type}: {description} (This will be checked by the CI)
    • {modules} include fsdp, megatron, sglang, vllm, rollout, trainer, ci, training_utils, recipe, hardware, deployment, ray, worker, single_controller, misc, perf, model, algo, env, tool, ckpt, doc, data
    • If this PR involves multiple modules, separate them with , like [megatron, fsdp, doc]
    • {type} is in feat, fix, refactor, chore, test
    • If this PR breaks any API (CLI arguments, config, function signature, etc.), add [BREAKING] to the beginning of the title.
    • Example: [BREAKING][fsdp, megatron] feat: dynamic batching

Test

For changes that can not be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc.

API and Usage Example

Demonstrate how the API changes if any, and provide usage example(s) if possible.

# Add code snippet or script demonstrating how to use this

Design & Code Changes

Demonstrate the high-level design if this PR is complex, and list the specific changes.

Checklist Before Submitting

Important

Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.

Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Dec 19, 2025

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

jianjunzhong and others added 2 commits December 19, 2025 13:39
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors the data collection logic in the distributed parallelism (DP) framework to prevent redundant data re-put operations. It moves the logic for determining whether to collect data from individual workers to the decorator level, making it more centralized and elegant.

  • Introduces _compute_need_collect() function to determine collection necessity based on dispatch mode and worker state
  • Removes the collect_from_rank kwarg-based approach in favor of inspecting dispatch mode configuration
  • Simplifies collect_nd_compute_dataproto by removing the collect_mask parameter

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 8 comments.

File Description
verl/utils/transferqueue_utils.py Adds _compute_need_collect() helper function, updates tqbridge decorator to accept dispatch_mode parameter, removes manual collect_from_rank kwarg handling, and changes BatchMeta() to BatchMeta.empty()
verl/single_controller/base/decorator.py Removes collect_mask parameter from dispatch/collect functions, passes dispatch_mode to tqbridge, and simplifies the lazy compute dispatch/collect logic
Comments suppressed due to low confidence (1)

verl/single_controller/base/decorator.py:274

    output = collect_nd_compute(worker_group, output)

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +273 to +274
def collect_nd_compute_dataproto(worker_group, output):
output = collect_nd_compute(worker_group, output)
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function collect_nd_compute at line 254 expects collect_mask as its first parameter, but the call at line 274 only passes worker_group and output. This will cause a TypeError when the function is invoked. The function signature of collect_nd_compute needs to be updated to remove the collect_mask parameter, or you need to retrieve and pass the collect_mask here.

Copilot uses AI. Check for mistakes.
Comment thread verl/utils/transferqueue_utils.py
Comment thread verl/utils/transferqueue_utils.py Outdated
Comment thread verl/utils/transferqueue_utils.py Outdated
Comment on lines +157 to +174
Args:
dispatch_mode: For controlling data collection logic. If None,
_compute_need_collect will always return True.
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation should explain what happens when dispatch_mode is None vs when it's provided, and clarify what types are accepted for dispatch_mode (should it be a dict or can it also be a Dispatch enum?). The current description is incomplete.

Copilot uses AI. Check for mistakes.
Comment on lines +154 to +158
collect_fn_name = dispatch_mode["collect_fn"].func.__name__
if collect_fn_name != "collect_lazy_compute_data_proto" or len(args) < 1 or not isinstance(args[0], Worker):
return True

collect_mesh_name = dispatch_mode["collect_fn"].args[0]
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition checks if collect_fn_name != "collect_lazy_compute_data_proto" to return True early. However, this hardcoded function name check is brittle. If the function is renamed or if there are other similar collect functions added in the future, this logic will break. Consider using a more robust approach, such as checking a property/attribute on the collect function or using a registry pattern.

Suggested change
collect_fn_name = dispatch_mode["collect_fn"].func.__name__
if collect_fn_name != "collect_lazy_compute_data_proto" or len(args) < 1 or not isinstance(args[0], Worker):
return True
collect_mesh_name = dispatch_mode["collect_fn"].args[0]
collect_fn = dispatch_mode["collect_fn"]
base_fn = getattr(collect_fn, "func", collect_fn)
# Prefer an explicit attribute on the collect function, fall back to name-based check.
is_lazy_collect = getattr(
base_fn,
"is_lazy_compute_data_proto",
base_fn.__name__ == "collect_lazy_compute_data_proto",
)
if not is_lazy_collect or len(args) < 1 or not isinstance(args[0], Worker):
return True
collect_mesh_name = collect_fn.args[0]

Copilot uses AI. Check for mistakes.
Comment thread verl/utils/transferqueue_utils.py
Comment thread verl/utils/transferqueue_utils.py Outdated
elif collect_from_rank == False:
return BatchMeta()
elif not need_collect:
return BatchMeta.empty()
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code now uses BatchMeta.empty() instead of BatchMeta(). Verify that the BatchMeta class from the transfer_queue library actually has an empty() class method. If it doesn't exist, this will cause an AttributeError at runtime.

Copilot uses AI. Check for mistakes.
Comment thread verl/utils/transferqueue_utils.py Outdated
return BatchMeta()
return output
elif not need_collect:
return BatchMeta.empty()
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code now uses BatchMeta.empty() instead of BatchMeta(). Verify that the BatchMeta class from the transfer_queue library actually has an empty() class method. If it doesn't exist, this will cause an AttributeError at runtime.

Copilot uses AI. Check for mistakes.
Comment thread verl/utils/transferqueue_utils.py Outdated
Comment thread verl/utils/transferqueue_utils.py Outdated
Comment thread verl/utils/transferqueue_utils.py Outdated
Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
Comment thread verl/utils/transferqueue_utils.py Outdated
Comment thread verl/utils/transferqueue_utils.py
Comment thread verl/utils/transferqueue_utils.py Outdated
Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
@0oshowero0 0oshowero0 merged commit df06822 into TransferQueue:han/optimize_tq_collect Dec 20, 2025
4 of 5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants