[TRTLLM-11946][feat] Disaggregated gen-first serving with ADP#13112
reasonsolo merged 6 commits into NVIDIA:main from
Conversation
fd1b125 to 3c95db4
📝 Walkthrough: The changes introduce stale request info cleanup mechanisms in the disaggregation transfer layer.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
Pre-merge checks: ✅ 1 passed | ❌ 2 failed (2 warnings)
🧹 Nitpick comments (3)
tensorrt_llm/_torch/disaggregation/native/transfer.py (2)
1099-1117: Consider deduplicating ranks in ADP broadcast path. The code accumulates ranks via extend() across all DP groups. While the topology typically ensures each physical rank belongs to one DP group (making duplicates unlikely), adding explicit deduplication would provide a safety net and clarify intent.
♻️ Suggested defensive change
```diff
 # Gen-first with ADP: ctx_dp_rank unknown — broadcast REQUEST_DATA
 # to ALL ctx sender ranks so every DP group receives it.
 # get_peer_overlap returns ranks for one DP group (topology is
 # symmetric), so use dp_rank=0 as representative.
 dp_size = peer_infos.dp_size
-# Union of overlapping ranks across all DP groups for broadcast
-all_ranks: list[int] = []
+# Union of overlapping ranks across all DP groups for broadcast (deduplicated)
+all_ranks_set: set[int] = set()
 for dp in range(dp_size):
-    all_ranks.extend(self._registrar.get_peer_overlap(peer_infos, dp).ranks)
+    all_ranks_set.update(self._registrar.get_peer_overlap(peer_infos, dp).ranks)
+all_ranks = list(all_ranks_set)
 logger.debug(
     f"Receiver.dispatch_task: ADP broadcast path, dp_size={dp_size}, "
     f"all_ranks={all_ranks}"
 )
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tensorrt_llm/_torch/disaggregation/native/transfer.py` around lines 1099 - 1117, The ADP broadcast path in Receiver.dispatch_task collects ranks into all_ranks by extending per-DP overlaps which can produce duplicates; update the logic around self._registrar.get_peer_overlap(peer_infos, dp) to deduplicate the accumulated ranks (e.g., collect into a set or use an ordered-uniquing approach to preserve order) before constructing peer_overlap with type(self._registrar.get_peer_overlap(peer_infos, 0))(ranks=all_ranks); ensure peer_infos and sender_dp_rank handling stays the same and that you pass the deduplicated ranks list to the peer_overlap constructor.
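If the deduplication is adopted, an order-preserving variant (the "ordered-uniquing approach" the prompt mentions) can use `dict.fromkeys`. The sketch below is a hypothetical standalone helper, not the repo code: `collect_broadcast_ranks` and its input shape are illustration names standing in for the `get_peer_overlap` accumulation loop.

```python
def collect_broadcast_ranks(overlaps_by_dp: list[list[int]]) -> list[int]:
    """Union per-DP-group rank lists into one deduplicated broadcast list.

    Hypothetical helper mirroring the review suggestion; each inner list
    plays the role of get_peer_overlap(peer_infos, dp).ranks.
    """
    all_ranks: list[int] = []
    for ranks in overlaps_by_dp:
        all_ranks.extend(ranks)
    # dict.fromkeys removes duplicates while preserving first-seen order,
    # unlike set(), whose iteration order is unspecified.
    return list(dict.fromkeys(all_ranks))
```

Compared with the `set`-based diff above, this keeps the broadcast order deterministic across runs, which can matter for reproducible logs.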
294-308: Minor TOCTOU window in stale sweep (acceptable given TTL protection). There's a small race window between checking has_session (line 305) and calling _remove_req_info (line 307) where another thread could create a session. However, given the 120-second TTL, this window is unlikely to cause issues in practice. The alternative would be holding both locks simultaneously, which adds complexity. If you wanted to tighten this up, you could:
♻️ Optional: Atomic check-and-remove
```diff
 for rid in stale_rids:
-    with self._sessions_lock:
-        has_session = rid in self._sessions
-    if not has_session:
-        self._remove_req_info(rid)
-        logger.debug(f"Swept stale RecvReqInfo for rid={rid}")
+    with self._sessions_lock, self._peer_requests_lock:
+        if rid not in self._sessions and rid in self._peer_requests:
+            self._peer_requests.pop(rid, None)
+            self._peer_requests_timestamps.pop(rid, None)
+            logger.debug(f"Swept stale RecvReqInfo for rid={rid}")
```
🤖 Prompt for AI Agents
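The atomic check-and-remove can be exercised in isolation. This is a minimal sketch, assuming attribute names from the review comment (`_sessions`, `_peer_requests`, `_peer_requests_timestamps`, the two locks); the `StaleSweeper` class itself is hypothetical. Note that every code path must acquire the two locks in the same order to avoid a lock-ordering deadlock.

```python
import threading
import time

class StaleSweeper:
    """Hypothetical illustration of the TTL sweep with atomic check-and-remove."""

    def __init__(self, ttl_s: float = 120.0):
        self._ttl_s = ttl_s
        self._sessions_lock = threading.Lock()
        self._peer_requests_lock = threading.Lock()
        self._sessions: set[str] = set()
        self._peer_requests: dict[str, object] = {}
        self._peer_requests_timestamps: dict[str, float] = {}

    def add_req_info(self, rid: str, info: object) -> None:
        with self._peer_requests_lock:
            self._peer_requests[rid] = info
            self._peer_requests_timestamps[rid] = time.monotonic()

    def sweep(self) -> list[str]:
        now = time.monotonic()
        with self._peer_requests_lock:
            stale_rids = [rid for rid, ts in self._peer_requests_timestamps.items()
                          if now - ts > self._ttl_s]
        swept: list[str] = []
        for rid in stale_rids:
            # Hold both locks (always in this order) so no session can be
            # created between the membership check and the removal.
            with self._sessions_lock, self._peer_requests_lock:
                if rid not in self._sessions and rid in self._peer_requests:
                    self._peer_requests.pop(rid, None)
                    self._peer_requests_timestamps.pop(rid, None)
                    swept.append(rid)
        return swept
```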
Verify each finding against the current code and only fix it if needed. In `@tensorrt_llm/_torch/disaggregation/native/transfer.py` around lines 294 - 308, there is a small TOCTOU race in the stale sweep: between checking membership of rid in self._sessions (in the block using _sessions_lock) and calling self._remove_req_info(rid) under _peer_requests_lock, another thread could create a session. To fix, perform the check-and-remove atomically by acquiring both locks (self._peer_requests_lock and self._sessions_lock) together before deciding to call _remove_req_info, or re-check membership of rid in self._sessions while holding _peer_requests_lock (or vice versa) so that the removal only happens if the session is still absent. Update the logic around the stale sweep in the method containing now = time.monotonic() to use this atomic check-and-remove approach, referencing _peer_requests_timestamps, _STALE_REQ_INFO_TTL_S, _peer_requests_lock, _sessions_lock, _sessions, and _remove_req_info.
tensorrt_llm/_torch/pyexecutor/py_executor.py (1)
404-410: Consider adding a more specific type annotation. The buffer stores tuples of (py_request_id, LlmResponse) but the type annotation is just list. A more specific annotation would improve readability and IDE support.
✨ Suggested type annotation
```diff
-        self._pending_transfer_responses: list = []
+        self._pending_transfer_responses: list[tuple[int, LlmResponse]] = []
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tensorrt_llm/_torch/pyexecutor/py_executor.py` around lines 404 - 410, The _pending_transfer_responses attribute currently uses a bare list type; update its annotation to a specific collection type like list[tuple[int, LlmResponse]] (or list[tuple[str, LlmResponse]] if py_request_id is a string) to reflect that it stores (py_request_id, LlmResponse) tuples, and ensure LlmResponse is in scope (imported or forward-referenced) so the annotation is valid; change the declaration of self._pending_transfer_responses in the PyExecutor class (symbol: _pending_transfer_responses) accordingly.
📒 Files selected for processing (4)
tensorrt_llm/_torch/disaggregation/native/transfer.py
tensorrt_llm/_torch/disaggregation/transceiver.py
tensorrt_llm/_torch/pyexecutor/py_executor.py
tests/integration/defs/accuracy/test_disaggregated_serving.py
46103ef to f443eba
/bot run --disable-fail-fast
PR_Github #43743 [ run ] triggered by Bot.
PR_Github #43743 [ run ] completed.
f443eba to 5da3ebb
/bot run --disable-fail-fast
PR_Github #43910 [ run ] triggered by Bot.
PR_Github #43910 [ run ] completed.
/bot run --disable-fail-fast
PR_Github #44033 [ run ] triggered by Bot.
PR_Github #44033 [ run ] completed.
/bot run --disable-fail-fast
PR_Github #44200 [ run ] triggered by Bot.
de0f27a to 7f9f629
/bot run --disable-fail-fast
PR_Github #45638 [ run ] triggered by Bot.
PR_Github #45638 [ run ] completed.
Signed-off-by: Lizhi Zhou <1432185+reasonsolo@users.noreply.github.com>
/bot run --disable-fail-fast
/bot run --disable-fail-fast
/bot run --disable-fail-fast
PR_Github #45729 [ run ] triggered by Bot.
/bot run --disable-fail-fast
PR_Github #45848 [ run ] triggered by Bot.
PR_Github #45848 [ run ] completed.
/bot run --disable-fail-fast
PR_Github #45870 [ run ] triggered by Bot.
/bot run --disable-fail-fast
PR_Github #46008 [ run ] triggered by Bot.
PR_Github #46008 [ run ] completed.
/bot run --disable-fail-fast
PR_Github #46243 [ run ] triggered by Bot.
PR_Github #46243 [ run ] completed.
/bot run --disable-fail-fast
PR_Github #46334 [ run ] triggered by Bot.
PR_Github #46334 [ run ] completed.
…g, structured cancel, bounded quarantine

The C++ cache transceiver and Python executor cooperate on disaggregated KV transfer cancellation, timeout, and process health. Earlier attempts (PR NVIDIA#13706, PR NVIDIA#13713, PR NVIDIA#13728) each fixed one slice of the problem but still hung in the field:

* PR NVIDIA#13706 with Python changes restarted workers on every routine per-request timeout (Python fail-close was too aggressive).
* PR NVIDIA#13713 / PR NVIDIA#13728 hung even with Python cleanup removed because the C++ status polling could still call future.get() on an unready worker future, freezing the executor event loop while NIXL/UCX was wedged.

This PR keeps each layer's responsibility separate, as described in docs/source/features/disagg-kv-transfer-hang-restart-analysis.md and the new docs/source/features/disagg-kv-transfer-session-lifecycle.md:

* checkContextTransferStatus / checkGenTransferStatus are now strictly non-blocking. They poll with wait_for(0ms) and only call future.get() when the future is already ready. atLeastRequestNum > 0 still admits additional ready entries but never selects an unready one to satisfy the count. drainContextTransferStatus / drainGenTransferStatus are the only blocking variants and are intended for shutdown drain.
* cancelRequestStructured returns a TransferCancelResult enum with six outcomes (NotFound, AlreadyComplete, CancelledBeforeAdvertise, CancelRequestedInFlight, BackendUnhealthy, NotCancellable). Only the first three permit Python to free request resources; the others keep the request owned by C++ until the worker future reaches a final state. The historical bool cancelRequest is preserved as a backward-compatible wrapper.
* The transceiver maintains a bounded quarantine counter and a global progress deadline. A per-request timeout marks the entry quarantined but keeps the future pinned so NIXL/UCX cannot write into freed memory. If quarantined entries exceed mQuarantineBudget or no worker has reached a final state for longer than mGlobalProgressDeadlineMs, isHealthy() flips false and getHealth() surfaces the snapshot for orchestration.
* PyExecutor adds _can_terminate_request_now / _inflight_cancel_requested_ids so _do_terminate_request defers freeing resources for any request that is still in disagg transfer state or whose cancel is in flight. Per-request timeouts no longer clear active_requests, the waiting queue, or set is_shutdown — that was the PR NVIDIA#13706 restart-loop trigger and is explicitly forbidden by the analysis doc.
* The ADP synchronized pending-response flush from PR NVIDIA#13112 is ported here (the base commit precedes that merge). Transfer-completion responses created in _end_transfer_and_maybe_terminate are buffered and flushed at synchronised loop points so every DP rank participates in the tp_gather collective.

Test coverage:

* tests/unittest/disaggregated/test_kv_transfer_session_lifecycle.py — 14 unit tests for the deferred-terminate guard, the structured-cancel decision tree, and the ADP flush symmetry. They run without GPU or MPI so they fail fast in pre-merge CI.

References:

* analysis: docs/source/features/disagg-kv-transfer-hang-restart-analysis.md
* contract: docs/source/features/disagg-kv-transfer-session-lifecycle.md

Signed-off-by: Yifan Jiang <19356972+yifjiang@users.noreply.github.com>
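The non-blocking polling rule in the commit message can be illustrated with a Python analogue. This is a hedged sketch only: the real code is C++ using std::future::wait_for(0ms), and `poll_ready_transfers` / `drain_transfers` are hypothetical names standing in for the check*/drain* pairs.

```python
from concurrent.futures import Future

def poll_ready_transfers(pending: dict[int, Future]) -> list[int]:
    """Non-blocking poll: consume only futures that are already done.

    Mirrors the wait_for(0ms) rule: an unready future is never waited on,
    so a wedged transfer cannot freeze the caller's event loop.
    """
    finished = [rid for rid, fut in pending.items() if fut.done()]
    for rid in finished:
        pending.pop(rid).result()  # already ready, cannot block
    return finished

def drain_transfers(pending: dict[int, Future]) -> list[int]:
    """Blocking drain, intended only for shutdown (like drain*TransferStatus)."""
    finished = list(pending)
    for rid in finished:
        pending.pop(rid).result()  # may block until the worker finishes
    return finished
```

The design point is that the fast path never pays for a stuck peer; only the explicit shutdown drain is allowed to wait.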
With Attention Data Parallelism (ADP) enabled, several issues prevented gen-first disaggregated serving from working correctly:
tp_gather deadlock in py_executor — _enqueue_responses performs a tp_gather collective when ADP is enabled. When called from _end_transfer_and_maybe_terminate (inside _send_kv_async), only the owning DP rank enters the collective, causing a deadlock. Fix by buffering transfer-completion responses and flushing them at synchronised points in the executor loop.
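The buffer-and-flush fix can be sketched as follows. This is an illustrative skeleton, not the py_executor code: `ResponseBuffer` and `gather_fn` are hypothetical names, with `gather_fn` standing in for the tp_gather collective that every DP rank must enter.

```python
class ResponseBuffer:
    """Sketch: park responses from the async transfer path, flush at a sync point."""

    def __init__(self, gather_fn):
        self._gather_fn = gather_fn          # collective; all ranks must call it
        self._pending: list[tuple[int, str]] = []

    def on_transfer_complete(self, req_id: int, response: str) -> None:
        # Called from the async KV-transfer callback on the owning rank only.
        # Entering the collective here would deadlock, so we just buffer.
        self._pending.append((req_id, response))

    def flush_at_sync_point(self):
        # Called at a synchronised loop point on every DP rank, even when the
        # local buffer is empty, so the collective stays symmetric.
        local, self._pending = self._pending, []
        return self._gather_fn(local)
```

The key invariant is that `flush_at_sync_point` runs on all ranks (possibly with an empty list), whereas `on_transfer_complete` may run on one rank only.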
ctx_dp_rank broadcast in transceiver — With ADP the context DP rank is unknown at registration time. Return ctx_dp_rank=None so the gen-side Receiver broadcasts REQUEST_DATA to all ctx DP ranks. The broadcast adds overhead, but in gen-first use cases the primary optimization goal is TTFT, so it still beats a two-stage query-then-send method (query the correct DP rank first, then send the request to that rank).
ADP broadcast dispatch in native/transfer — When ctx_dp_rank is None, the Receiver now unions overlapping ranks across all DP groups for broadcast. Also adds stale RecvReqInfo sweep to prevent unbounded growth from orphaned ADP broadcast entries.
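The dispatch decision reduces to a small function. The sketch below is hypothetical (`ranks_to_notify` and `overlap_ranks_for` are illustration names): when ctx_dp_rank is None the union of ranks across all DP groups is targeted, otherwise only the known group's overlap.

```python
def ranks_to_notify(ctx_dp_rank, dp_size: int, overlap_ranks_for) -> list[int]:
    """Pick target ranks for REQUEST_DATA dispatch.

    overlap_ranks_for(dp) stands in for the per-DP-group overlap lookup.
    """
    if ctx_dp_rank is None:
        # Gen-first with ADP: broadcast to the union across all DP groups,
        # keeping first-seen order and dropping duplicates.
        seen: list[int] = []
        for dp in range(dp_size):
            for r in overlap_ranks_for(dp):
                if r not in seen:
                    seen.append(r)
        return seen
    # DP rank known: target only that group's overlapping ranks.
    return list(overlap_ranks_for(ctx_dp_rank))
```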
Also adds enable_attention_dp parametrization to the gen-first disaggregated serving integration test.
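Such a parametrization typically takes the following shape in pytest. This is an illustrative sketch, not the repo's actual test: the test name, config dict, and ids are hypothetical stand-ins for the real integration test in test_disaggregated_serving.py.

```python
import pytest

@pytest.mark.parametrize("enable_attention_dp", [True, False],
                         ids=["adp_on", "adp_off"])
def test_gen_first_disagg_serving(enable_attention_dp):
    # Hypothetical: the real test launches ctx/gen servers with this flag
    # and checks accuracy; here we only show the parametrization shape.
    config = {"enable_attention_dp": enable_attention_dp}
    assert isinstance(config["enable_attention_dp"], bool)
```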
PR Checklist
Please review the following before submitting your PR:
PR description clearly explains what and why. If using CodeRabbit's summary, please make sure it makes sense.
PR Follows TRT-LLM CODING GUIDELINES to the best of your knowledge.
Test cases are provided for new code paths (see test instructions)
Any new dependencies have been scanned for license and vulnerabilities
CODEOWNERS updated if ownership changes
Documentation updated as needed
Update tava architecture diagram if there is a significant design change in PR.
The reviewers assigned automatically/manually are appropriate for the PR.
Please check this after reviewing the above items as appropriate for this PR.
GitHub Bot Help
To see a list of available CI bot commands, please comment /bot help.