[None][fix] py_executor: lift conditional tp_allgather sites out of per-rank-divergent gates #14560
a716c67 to f062a13
with self.stats_lock:
    cap = self.max_stats_len * tp_size
    overflow = max(0, len(self.stats) + len(rank_dicts) - cap)
    if overflow:
This is awkward. Appending all rank_dicts and then trimming to cap is much more readable.
This is aligned with the original code, but I think you are right. Fixed.
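For reference, the append-then-trim shape suggested above could look roughly like the sketch below. `StatsBuffer` and its `extend` method are hypothetical stand-ins, not names from the PR, and dropping the oldest entries on overflow is an assumption about the intended trimming policy.

```python
from threading import Lock


class StatsBuffer:
    """Hypothetical stand-in for the executor's stats list (not the PR's class)."""

    def __init__(self, max_stats_len, tp_size):
        self.stats = []
        self.stats_lock = Lock()
        # One slot per rank per retained iteration, as in the reviewed snippet.
        self.cap = max_stats_len * tp_size

    def extend(self, rank_dicts):
        with self.stats_lock:
            # Append everything first, then trim to the cap.
            self.stats.extend(rank_dicts)
            overflow = len(self.stats) - self.cap
            if overflow > 0:
                # Assumption: the oldest entries are the ones to drop.
                del self.stats[:overflow]


buf = StatsBuffer(max_stats_len=2, tp_size=2)
buf.extend([{"iter": k} for k in range(3)])
buf.extend([{"iter": k} for k in range(3, 6)])
```

After the two calls the buffer holds the four newest entries, so the overflow arithmetic never has to be computed before the append.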
_append_iter_stats takes its legacy path and never populates the
buffer.
"""
tp_size = getattr(self.dist, "tp_size", 1)
self.dist.tp_size is always accessible. Agents use getattr far too much.
Yes. This function was extracted from the original code.
I will fix this.
…er-rank-divergent gates
Under ADP + disagg on feat/deepseek_v4, GEN workers desync after a small
number of successful requests and all 8 ranks crash inside
ADPRouter.gather_all_rank_states with either:
TypeError: RankState.__init__() takes from 2 to 4 positional arguments
but 27 were given (with TLLM_METRICS_ALL_RANKS=1)
TypeError: argument of type 'bool' is not iterable
(without TLLM_METRICS_ALL_RANKS=1)
Root cause is in py_executor.py, not adp_router.py: two conditional
tp_allgather collectives live inside per-rank-divergent gates and
misalign the MPI collective queue.
1. _handle_responses:4438-4440 (bool payload) — called from
_process_previous_batch which is gated by
`should_process_previous_batch = can_queue or not can_queue_this_rank`
plus `previous_batch is not None`. When one DP rank has an empty
batch but its previous_batch was cleared the iter before, that
rank skips _process_previous_batch while peers enter it. Peers
issue the tp_allgather, the empty-batch rank does not, and the
next collective (the very next gather_all_rank_states) receives
the bool payload positionally as RankState fields.
2. _append_iter_stats:1436 (dict payload, ~27 keys) — same outer gate
via _process_iter_stats. With TLLM_METRICS_ALL_RANKS=1 the same
divergent gate fires this gather AND the timeout gather, so the
misalignment lands the 27-key dict in the RankState slot.
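The misalignment described in the two items above can be reproduced with a toy model. The sketch below is pure Python and does not use MPI or any TensorRT-LLM code: it assumes collectives are matched purely by issue order, and `match_collectives` and `calls_for_rank` are hypothetical names introduced only for this illustration.

```python
def match_collectives(per_rank_calls):
    """Pair the k-th collective of every rank, as an order-matched queue would.

    Toy model only: a real runtime would eventually hang on the unmatched
    trailing call, so unmatched calls are simply cut off here.
    """
    depth = min(len(calls) for calls in per_rank_calls)
    return [[calls[k] for calls in per_rank_calls] for k in range(depth)]


def calls_for_rank(rank, enters_gate):
    """Collectives one rank issues in an iteration, as (kind, payload) pairs."""
    calls = []
    if enters_gate:
        # Conditional gather that lives inside the per-rank-divergent gate.
        calls.append(("timeout_flag", False))
    # Unconditional gather that every rank issues.
    calls.append(("rank_state", (rank, 0, 0)))
    return calls


def kinds(round_):
    return [kind for kind, _ in round_]


# Rank 3 has an empty batch and skips the gated branch; its peers enter it.
divergent = [calls_for_rank(r, enters_gate=(r != 3)) for r in range(4)]
first_divergent_round = match_collectives(divergent)[0]

# With a rank-symmetric gate every round carries a single payload kind.
symmetric = [calls_for_rank(r, enters_gate=True) for r in range(4)]
symmetric_rounds = match_collectives(symmetric)

print(kinds(first_divergent_round))
```

In the divergent case the first matched round mixes three bool flags with one rank-state tuple, which corresponds to the situation where a rank-state consumer is handed a bool it cannot unpack.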
Fix: mirror the existing _flush_pending_transfer_responses pattern.
Buffer the timed-out requests and the per-rank iter-stats dict on
self, drain at a rank-symmetric position next to the existing flush.
Adds two new helpers:
- _handle_kv_transfer_timeouts_synced — replaces the inline ADP
branch in _handle_responses
- _flush_iter_stats_synced — replaces the inline gather in
_append_iter_stats
Wired in all three executor loops:
- _executor_loop_overlap: after the per-rank-divergent
`if previous_batch is not None and should_process_previous_batch`
block, alongside the existing _flush_pending_transfer_responses.
- _executor_loop (non-overlap): right after the `if can_queue` block
that calls _handle_responses (already rank-symmetric, but the
buffer-drain symmetry is preserved here too).
- _executor_loop_pp / _handle_executed_batch: at the bottom of
every call (each rank calls it an equal number of times per outer
iter via the rank-0-broadcast executed_batch_num), outside the
inner `executed_batch is not None` conditional.
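A minimal sketch of the buffer-then-drain pattern described above follows. It borrows the names `_handle_responses`, `_handle_kv_transfer_timeouts_synced`, and `_pending_timed_out_requests` from this PR, but the method bodies, the `ToyExecutor` class, the `step` method, and the injected `allgather` callable are all assumptions made for illustration and are not the actual implementation.

```python
class ToyExecutor:
    """Toy stand-in for the executor; bodies are illustrative, not the PR's code."""

    def __init__(self, allgather):
        # Hypothetical callable: payload -> list of per-rank payloads.
        self.allgather = allgather
        self._pending_timed_out_requests = []
        # Order of collectives this rank has issued, for inspection.
        self.collective_log = []

    def _handle_responses(self, timed_out_requests):
        # Runs only on ranks that pass the divergent gate: buffer, no collective.
        self._pending_timed_out_requests.extend(timed_out_requests)

    def _handle_kv_transfer_timeouts_synced(self):
        # Runs on every rank in every iteration: exactly one collective each.
        pending, self._pending_timed_out_requests = self._pending_timed_out_requests, []
        self.collective_log.append("timeout_gather")
        any_timed_out = any(self.allgather(bool(pending)))
        return pending, any_timed_out

    def step(self, passes_gate, timed_out_requests=()):
        if passes_gate:
            self._handle_responses(timed_out_requests)
        # The drain sits outside the gate, so it is rank-symmetric.
        return self._handle_kv_transfer_timeouts_synced()


def single_rank_allgather(payload):
    # Stand-in for a real all-gather in a one-rank world.
    return [payload]


busy = ToyExecutor(single_rank_allgather)
idle = ToyExecutor(single_rank_allgather)
busy_result = busy.step(passes_gate=True, timed_out_requests=["req-7"])
idle_result = idle.step(passes_gate=False)
```

Both toy ranks log the same sequence of collectives whether or not they passed the gate, which is the property the flush point is meant to guarantee.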
Bug report:
ape-repo/astra-projects/wwfo/artificial-analysis@DSV4-disagg:
docs/knowledge/dsv4_disagg_dep8_adp_router_desync.md
Repro: DEP8+DEP8 1p1d DSv4-Pro disagg smoke at concurrency 72; failure
fires every iter ~900 once at least one GEN rank's batch_size hits 0.
Signed-off-by: Lance Liao <108499334+lancelly@users.noreply.github.com>
f062a13 to 55065c6
/bot run --disable-fail-fast
PR_Github #50354 [ run ] triggered by Bot. Commit:
PR_Github #50354 [ run ] completed with state
/bot run --disable-fail-fast
PR_Github #50377 [ run ] triggered by Bot. Commit:
PR_Github #50377 [ run ] completed with state
Summary
Fix the GEN-side ADP-router desync that crashes all 8 ranks of DSv4-Pro DEP8+DEP8 disagg with one of:
Repro doc:
ape-repo/astra-projects/wwfo/artificial-analysis@DSV4-disagg:docs/knowledge/dsv4_disagg_dep8_adp_router_desync.md.
Root cause
Two conditional `tp_allgather` collectives in `tensorrt_llm/_torch/pyexecutor/py_executor.py` live inside per-rank-divergent gates:
1. `_handle_responses` line 4438: `tp_allgather(bool(timed_out_requests))`. Reached from `_process_previous_batch` (overlap loop), which is gated on `should_process_previous_batch = can_queue or not can_queue_this_rank` plus `previous_batch is not None`. When one ADP rank's `scheduled_batch.batch_size == 0` and its `previous_batch` was cleared the iter before, that rank skips `_process_previous_batch` while peers enter it. Peers issue the `tp_allgather` and the empty-batch rank does not, so the next collective in the queue (the very next `gather_all_rank_states`) receives the bool payload positionally as `RankState` fields and crashes on every rank.
2. `_append_iter_stats` line 1436: `tp_allgather(local_dict)` (27-key payload under `TLLM_METRICS_ALL_RANKS=1`). Same outer gate via `_process_iter_stats`. With the env var set, the per-rank-divergent branch fires both the timeout gather and the iter-stats gather, so the misalignment moves by two positions and the 27-key dict lands in the `RankState.deserialize` slot.
The three observed variants (dict, bool, int) are the same bug, differing only in how many extra collectives the divergent branch issued and which downstream collective consumes the misaligned payload.
Fix
Mirror the existing `_flush_pending_transfer_responses` pattern: buffer the payload on `self` and drain at a rank-symmetric flush point.
- `_handle_responses` → push to `self._pending_timed_out_requests`. New helper `_handle_kv_transfer_timeouts_synced()` does the ADP-safe drain.
- `_append_iter_stats` → push to `self._pending_iter_stats_dict`. New helper `_flush_iter_stats_synced()` does the ADP-safe drain (gathers `None` from idle ranks so the queue stays aligned).
Both drains are wired into all three executor loops:
- `_executor_loop_overlap`: after the `if previous_batch is not None and should_process_previous_batch` block (immediately adjacent to the existing `_enqueue_responses([])` sync sibling in the `else` branch).
- `_executor_loop` (non-overlap): right after the `if can_queue` block that calls `_handle_responses` (outside the `if` to be defensive against future restructuring).
- `_handle_executed_batch`: at the bottom of every call, outside the `executed_batch is not None` conditional. `handle_executed_batches(executed_batch_num)` calls this helper an equal number of times on every PP rank (broadcast `executed_batch_num`).
No new collectives are introduced: the existing two are just moved out of the divergent branches to the same flush point the codebase already uses for `_enqueue_responses`.
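The "gathers `None` from idle ranks" behaviour of the iter-stats drain can be pictured with the small sketch below. It is a single-process illustration under stated assumptions: `fake_allgather` and `flush_all` are hypothetical helpers, the stats dicts are made-up sample data, and nothing here is taken from `_flush_iter_stats_synced` itself.

```python
from typing import Optional


def fake_allgather(payloads):
    """Stand-in for an all-gather: every rank sees every rank's payload."""
    return list(payloads)


def flush_all(pending_by_rank: list[Optional[dict]]):
    """Every rank contributes exactly one payload; idle ranks contribute None."""
    gathered = fake_allgather(pending_by_rank)
    # One entry per rank, so the collective count stays identical everywhere.
    assert len(gathered) == len(pending_by_rank)
    # Placeholders are filtered out only after the gather has completed.
    return [(rank, stats) for rank, stats in enumerate(gathered) if stats is not None]


# Made-up sample: ranks 0 and 2 buffered stats this iteration, ranks 1 and 3 are idle.
pending = [{"iter": 900, "num_active": 4}, None, {"iter": 900, "num_active": 1}, None]
collected = flush_all(pending)
```

The design point is that the decision to participate is never conditional; only the payload content varies per rank.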