[None][feat] use multi thread for kv transfer#13075
Conversation
|
/bot run |
|
PR_Github #43449 [ run ] triggered by Bot. Commit: |
📝 WalkthroughWalkthroughThis PR adds thread-safe synchronization for concurrent KV transfer task completion counters using per-task locks and modifies task queue routing to consider both session ID and peer rank rather than session ID alone. Test files are updated to exercise multi-threaded code paths. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes 🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Warning Review ran into problems🔥 ProblemsTimed out fetching pipeline failures after 30000ms Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tensorrt_llm/_torch/disaggregation/native/transfer.py (1)
420-435:⚠️ Potential issue | 🟠 MajorMake the count-to-future transition atomic.
The new lock only protects the increment.
done()/set_result()/ status updates still happen after releasing it, so another worker can callsession.set_exception()in between and turn this into anInvalidStateErroror a nondeterministicERROR/TRANSFERREDoutcome. The completion check and future/status resolution need to stay under the same per-task lock in both paths.Also applies to: 483-495
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tensorrt_llm/_torch/disaggregation/native/transfer.py` around lines 420 - 435, The increment of task.transferred_count is raced with the future/status checks because only the increment is inside with task.lock; move the entire completion logic (the count comparison, both session.set_exception calls, the write_meta.task_future.done() check, write_meta.task_future.set_result(AgentResult.SUCCESS), and task.status = TaskStatus.ERROR) inside the same with task.lock so the transition from counting -> error/success is atomic; apply the same change to the other occurrence around the code referenced (the block at ~483-495) to ensure both paths use the per-task lock for checking and resolving the future.
🧹 Nitpick comments (1)
tensorrt_llm/_torch/disaggregation/native/transfer.py (1)
153-157: Prefix this lock as internal.
SendTaskBase.lockis only consumed inside this module, so exposing it as a public-looking attribute unnecessarily widens the class surface.As per coding guidelines, "Variables and functions not part of a class's or module's public interface should be prefixed with an underscore".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tensorrt_llm/_torch/disaggregation/native/transfer.py` around lines 153 - 157, The SendTaskBase exposes a public-looking lock attribute; rename SendTaskBase.lock to a private attribute (e.g., SendTaskBase._lock) and update all references in this module to use the new name. Change the constructor to assign to self._lock and replace any usage sites that call or access self.lock (including methods or external functions in this file that manipulate the lock) to use self._lock so the attribute is treated as internal only.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tensorrt_llm/_torch/disaggregation/native/transfer.py`:
- Around line 289-294: The current _enqueue routes by (unique_rid, peer_rank)
causing the same peer_endpoint to hit different threads while
_get_or_connect_dealer() shares a process-global ZMQMessenger per endpoint;
change routing so the same peer endpoint always maps to the same thread (e.g.,
compute thread_idx = hash(write_meta.peer_endpoint) % self._num_threads in
_enqueue) OR make the dealer cache thread-local (store ZMQMessenger cache per
worker thread keyed by endpoint, including thread_idx in the cache key used by
_get_or_connect_dealer()); update references to _send_task_queues, _num_threads,
_enqueue, and _get_or_connect_dealer to use the chosen approach so a DEALER
socket is never shared across threads when KV_TRANSFER_NUM_THREADS > 1.
---
Outside diff comments:
In `@tensorrt_llm/_torch/disaggregation/native/transfer.py`:
- Around line 420-435: The increment of task.transferred_count is raced with the
future/status checks because only the increment is inside with task.lock; move
the entire completion logic (the count comparison, both session.set_exception
calls, the write_meta.task_future.done() check,
write_meta.task_future.set_result(AgentResult.SUCCESS), and task.status =
TaskStatus.ERROR) inside the same with task.lock so the transition from counting
-> error/success is atomic; apply the same change to the other occurrence around
the code referenced (the block at ~483-495) to ensure both paths use the
per-task lock for checking and resolving the future.
---
Nitpick comments:
In `@tensorrt_llm/_torch/disaggregation/native/transfer.py`:
- Around line 153-157: The SendTaskBase exposes a public-looking lock attribute;
rename SendTaskBase.lock to a private attribute (e.g., SendTaskBase._lock) and
update all references in this module to use the new name. Change the constructor
to assign to self._lock and replace any usage sites that call or access
self.lock (including methods or external functions in this file that manipulate
the lock) to use self._lock so the attribute is treated as internal only.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: f5daf00b-4bd8-4e3f-b25a-b8799816e863
📒 Files selected for processing (3)
tensorrt_llm/_torch/disaggregation/native/transfer.pytests/unittest/disaggregated/test_kv_transfer.pytests/unittest/disaggregated/test_kv_transfer_mp.py
|
PR_Github #43449 [ run ] completed with state
|
a6f6c00 to
d8eef88
Compare
|
/bot run |
|
PR_Github #43605 [ run ] triggered by Bot. Commit: |
|
PR_Github #43605 [ run ] completed with state
|
c7e107e to
d58455e
Compare
|
/bot run |
|
PR_Github #43989 [ run ] triggered by Bot. Commit: |
|
PR_Github #43989 [ run ] completed with state
|
601220e to
56b8de6
Compare
|
/bot run |
|
PR_Github #44346 [ run ] triggered by Bot. Commit: |
|
PR_Github #44346 [ run ] completed with state
|
56b8de6 to
23a3410
Compare
|
/bot run |
23a3410 to
0020d55
Compare
|
/bot run |
|
PR_Github #44547 [ run ] triggered by Bot. Commit: |
0020d55 to
c22eb68
Compare
|
PR_Github #47100 [ run ] triggered by Bot. Commit: |
|
PR_Github #47100 [ run ] completed with state
|
0a1709b to
ccbda89
Compare
|
/bot run --stage-list "A10-PyTorch-1, A10-PyTorch-2, B300-PyTorch-1, DGX_B200-PyTorch-1, DGX_B200-PyTorch-3" |
|
PR_Github #47475 [ run ] triggered by Bot. Commit: |
|
PR_Github #47475 [ run ] completed with state
|
|
/bot run --stage-list "A10-PyTorch-1, A10-PyTorch-2, B300-PyTorch-1, DGX_B200-PyTorch-1, DGX_B200-PyTorch-3" |
ccbda89 to
66b7806
Compare
|
PR_Github #47632 [ run ] triggered by Bot. Commit: |
|
PR_Github #47632 [ run ] completed with state
|
66b7806 to
1357b80
Compare
|
/bot run --stage-list "B300-PyTorch-1, DGX_B200-PyTorch-1, DGX_B200-PyTorch-3" |
👎 Promotion blocked, new vulnerability foundVulnerability report
|
1357b80 to
6a3c985
Compare
|
/bot run --stage-list "B300-PyTorch-1, DGX_B200-PyTorch-1, DGX_B200-PyTorch-3" |
|
PR_Github #47939 [ run ] triggered by Bot. Commit: |
|
PR_Github #47939 [ run ] completed with state |
…one thread Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>
Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>
6a3c985 to
af0a1ad
Compare
|
/bot skip --comment "all test passed" |
|
PR_Github #48094 [ skip ] triggered by Bot. Commit: |
|
PR_Github #48094 [ skip ] completed with state |
Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>
…one thread
Summary by CodeRabbit
Bug Fixes
Performance & Stability
Description
ctx1dep4_gen4_tep8_deepseek r1
gen side kv transfer time (ms)
┌────────┬────────┬────────┬────────┬─────────┐
│ Metric │ thread 4 │ thread 1 │ │ perf imporment│
├────────┼────────┼────────┼─────────┼─────────┤
│ mean │ 12.772 │ 15.229 │ +2.457 │ +19.24% │
├────────┼────────┼────────┼─────────┼─────────┤
│ median │ 12.055 │ 14.631 │ +2.576 │ +21.37% │
├────────┼────────┼────────┼─────────┼─────────┤
│ p90 │ 14.200 │ 17.535 │ +3.335 │ +23.49% │
├────────┼────────┼────────┼─────────┼─────────┤
│ p95 │ 18.809 │ 20.630 │ +1.821 │ +9.68% │
├────────┼────────┼────────┼─────────┼─────────┤
│ p99 │ 23.312 │ 29.193 │ +5.881 │ +25.23% │
├────────┼────────┼────────┼─────────┼─────────┤
│ max │ 30.318 │ 47.499 │ +17.181 │ +56.67% │
└────────┴────────┴────────┴─────────┴─────────┘
Test Coverage
PR Checklist
Please review the following before submitting your PR:
PR description clearly explains what and why. If using CodeRabbit's summary, please make sure it makes sense.
PR Follows TRT-LLM CODING GUIDELINES to the best of your knowledge.
Test cases are provided for new code paths (see test instructions)
Any new dependencies have been scanned for license and vulnerabilities
CODEOWNERS updated if ownership changes
Documentation updated as needed
Update tava architecture diagram if there is a significant design change in PR.
The reviewers assigned automatically/manually are appropriate for the PR.
Please check this after reviewing the above items as appropriate for this PR.
GitHub Bot Help
To see a list of available CI bot commands, please comment
/bot help.