From e1728ea73c0626f19b0eebddf8ab3c47cfde8ec9 Mon Sep 17 00:00:00 2001 From: poursoul Date: Mon, 25 May 2026 19:57:37 +0800 Subject: [PATCH] Refactor: enforce mix strict priority and cross-thread idle gating in dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Apply to both a2a3 and a5 runtimes. Phase 4 of resolve_and_dispatch is reshaped from shape-outer/phase-inner into a new dispatch_ready_tasks pass with phase-split semantics: * IDLE-MIX runs first. If mix tasks remain (local_buf + ready_queue), AIC and AIV yield both their IDLE and PENDING stages for the pass. * MIX-PENDING is always considered first in the PENDING stage, gated only on whether any peer scheduler thread has an idle MIX cluster — so residual mix continues to drain via pending slots regardless of skip_aic_aiv. * After MIX-PENDING, AIC/AIV-PENDING runs only when mix is fully drained and the corresponding shape has no peer idle core. * Local buffers are flushed between the IDLE and PENDING stages so PENDING-stage queue checks and peer threads see IDLE-stage results, and again on every return path via an RAII FlushGuard so release_fanin output during PENDING does not carry into the next iteration's IDLE. The PMU single-issue short-circuit and the sync_start drain protocol are preserved unchanged. a5 picks up the PMU guard alongside the new policy (its prior implementation lacked it); there's no automated test for this — PMU profiling correctness requires hardware PMU counters and a single-issue baseline to compare against, neither of which the sim suite provides. The change brings a5 in line with a2a3. Cross-thread peer-tracker reads in has_idle_in_other_threads stay plain (not atomic), and the value is consumed only as a hint; the comment on the implementation spells out the aarch64 single-copy-atomicity argument and the drain-protocol exclusion. 
PTO2_SCHED_PROFILING note: local_overflow_count now accumulates each batch separately as flush_local_bufs is called multiple times per pass (mid flush + RAII tail flush). Each entry is still counted exactly once (count is zeroed after push_batch), but the per-pass total reflects "entries pushed to the global queue this pass" rather than the pre-refactor "buf residual at pass end". Comparing traces across commits, expect the post-refactor number to be greater-or-equal. --- .../runtime/scheduler/pto_scheduler.h | 4 +- .../runtime/scheduler/scheduler_context.h | 35 +++- .../runtime/scheduler/scheduler_dispatch.cpp | 178 ++++++++++++++---- .../runtime/scheduler/pto_scheduler.h | 4 +- .../runtime/scheduler/scheduler_context.h | 36 +++- .../runtime/scheduler/scheduler_dispatch.cpp | 178 ++++++++++++++---- 6 files changed, 350 insertions(+), 85 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h index 1fa6f41c0..8d50681ba 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h @@ -69,8 +69,8 @@ struct PTO2ReadyQueueSlot { * the start of each iteration (verified by always_assert). * * Phase 1 fills per-CoreType buffers via on_task_complete(). - * dispatch_ready_tasks_to_idle_cores drains them: local-first via - * get_ready_task_batch, then remaining tasks pushed to global readyQ. + * The dispatch stage drains them local-first via get_ready_tasks_batch, + * with any remaining tasks pushed to the global ready queue. 
*/ // Number of CoreType values eligible for local dispatch (AIC=0, AIV=1) static constexpr int PTO2_LOCAL_DISPATCH_TYPE_NUM = 2; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h index 23fe31a95..52ee94a42 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h @@ -219,7 +219,6 @@ class SchedulerContext { } return "?"; } - static const PTO2ResourceShape *get_dispatch_order(int32_t thread_idx); int pop_ready_tasks_batch( PTO2ResourceShape shape, int32_t thread_idx, PTO2LocalReadyBuffer &local_buf, PTO2TaskSlotState **out, @@ -250,6 +249,40 @@ class SchedulerContext { CoreTracker &tracker, bool &entered_drain, bool &made_progress, bool &try_pushed ); + // One pass of "Phase 4" in the resolve_and_dispatch loop: IDLE-stage dispatch + // for MIX then (if no mix residual) AIC/AIV; mid-flush of local buffers; then + // PENDING-stage dispatch with cross-thread idle gating. MIX is strictly + // prioritized — when mix residual is detected after MIX-IDLE, AIC/AIV are + // skipped for the whole pass but MIX-PENDING still runs. + // + // Forward-progress argument for AIC/AIV: skip_aic_aiv is sticky for the + // current pass only. The next loop iteration re-evaluates after Phase 1 + // completion polling and the global MIX queue draining (here or on any + // peer thread). AIC/AIV starvation is therefore bounded by MIX throughput, + // not unbounded — once mix completes on at least one cluster, the next + // pass either drains the residual or admits AIC/AIV. 
+ void dispatch_ready_tasks( + int32_t thread_idx, CoreTracker &tracker, PTO2LocalReadyBuffer (&local_bufs)[PTO2_NUM_RESOURCE_SHAPES], + bool pmu_active, bool &made_progress, bool &try_pushed + ); + + // Returns true if any *other* scheduler thread currently has an idle core + // matching `shape`. Used as a scheduling hint on the PENDING dispatch path + // — see the implementation in scheduler_dispatch.cpp for the hint-semantics + // rationale and the safety argument against the drain worker. + bool has_idle_in_other_threads(int32_t self_thread_idx, PTO2ResourceShape shape) const; + + // True if mix tasks remain anywhere this thread could see them: the caller's + // MIX local LIFO stack or the global MIX ready queue. Approximate — + // PTO2ReadyQueue::size() (see pto_scheduler.h) snapshots its enqueue/dequeue + // positions with std::memory_order_relaxed and may interleave with concurrent + // push/pop. Don't confuse with PTO2SpscQueue::size(), which uses acquire + // loads — that one isn't on this path. A stale read here causes at most one + // extra/missed AIC/AIV skip and self-corrects on the next loop iteration. 
+ bool has_residual_mix(const PTO2LocalReadyBuffer &mix_local_buf) const { + return mix_local_buf.count > 0 || sched_->ready_queues[static_cast(PTO2ResourceShape::MIX)].size() > 0; + } + // ========================================================================= // Completion & drain (scheduler_completion.cpp) // ========================================================================= diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp index 57f6041a4..a460e78ef 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp @@ -57,18 +57,22 @@ const char *SchedulerContext::shape_name(PTO2ResourceShape shape) { return "UNKNOWN"; } -const PTO2ResourceShape *SchedulerContext::get_dispatch_order(int32_t thread_idx) { - static constexpr PTO2ResourceShape kEvenOrder[PTO2_NUM_RESOURCE_SHAPES] = { - PTO2ResourceShape::MIX, - PTO2ResourceShape::AIC, - PTO2ResourceShape::AIV, - }; - static constexpr PTO2ResourceShape kOddOrder[PTO2_NUM_RESOURCE_SHAPES] = { - PTO2ResourceShape::MIX, - PTO2ResourceShape::AIV, - PTO2ResourceShape::AIC, - }; - return (thread_idx % 2 == 0) ? kEvenOrder : kOddOrder; +bool SchedulerContext::has_idle_in_other_threads(int32_t self_thread_idx, PTO2ResourceShape shape) const { + // Cross-thread read of peer trackers without explicit synchronization. The + // backing `core_states_` is a naturally aligned uint64_t; aarch64 guarantees + // single-copy atomicity for an 8-byte aligned load, so no torn read. The + // value is consumed only as a scheduling *hint* — a stale read at worst + // causes one missed/extra pending dispatch, corrected on the next iteration. 
+ // Drain-mode cross-thread writes are serialized by handle_drain_mode's ack + // barrier (all peers spin out of the dispatch path before any tracker + // mutation), so this routine is never racing the drain worker. + for (int32_t t = 0; t < active_sched_threads_; t++) { + if (t == self_thread_idx) continue; + if (core_trackers_[t].get_idle_core_offset_states(shape).has_value()) { + return true; + } + } + return false; } int SchedulerContext::pop_ready_tasks_batch( @@ -325,6 +329,125 @@ void SchedulerContext::dispatch_shape( } } +void SchedulerContext::dispatch_ready_tasks( + int32_t thread_idx, CoreTracker &tracker, PTO2LocalReadyBuffer (&local_bufs)[PTO2_NUM_RESOURCE_SHAPES], + bool pmu_active, bool &made_progress, bool &try_pushed +) { + using Phase = CoreTracker::DispatchPhase; + constexpr int32_t MIX_I = static_cast(PTO2ResourceShape::MIX); + + // MIX is handled explicitly at the top of each stage; only AIC/AIV cycle + // through this 2-elem array, with order toggled by thread parity for + // shape-level load balancing across threads. + static constexpr PTO2ResourceShape kAicAivOrder[2][2] = { + {PTO2ResourceShape::AIC, PTO2ResourceShape::AIV}, + {PTO2ResourceShape::AIV, PTO2ResourceShape::AIC}, + }; + const PTO2ResourceShape *aic_aiv = kAicAivOrder[thread_idx & 1]; + +#if PTO2_SCHED_PROFILING + auto &l2_perf = sched_l2_perf_[thread_idx]; +#endif + + // Note: flush_local_bufs is invoked multiple times per pass (mid-function + // flush + RAII tail flush). local_overflow_count accumulates each batch + // separately — each entry is counted exactly once (count is zeroed after + // push_batch). The total reflects "entries this pass pushed to the global + // queue", which is slightly larger than the pre-refactor "buf residual at + // pass end" semantics — comparing PTO2_SCHED_PROFILING traces across + // commits, expect the post-refactor number to be greater-or-equal. 
+ auto flush_local_bufs = [&]() { + for (int32_t s = 0; s < PTO2_NUM_RESOURCE_SHAPES; s++) { + auto &lb = local_bufs[s]; +#if PTO2_SCHED_PROFILING + l2_perf.local_overflow_count += lb.count; +#endif + if (lb.count > 0) { + sched_->ready_queues[s].push_batch(lb.slot_states, lb.count); + lb.count = 0; + } + } + }; + // Every return path below must flush; wrap in RAII so we cannot forget. + // The mid-function flush between IDLE and PENDING is still called + // explicitly — guard only covers exit. + struct FlushGuard { + decltype(flush_local_bufs) &flush_fn; + ~FlushGuard() { flush_fn(); } + } flush_guard{flush_local_bufs}; + + bool entered_drain = false; + + // ===== IDLE stage ===== + dispatch_shape( + thread_idx, PTO2ResourceShape::MIX, Phase::IDLE, local_bufs[MIX_I], tracker, entered_drain, made_progress, + try_pushed + ); + if (entered_drain) return; + + // MIX-IDLE residual: AIC/AIV (both IDLE and PENDING) yield for this pass. + // MIX-PENDING below still runs — that is the core of "mix strict priority": + // pending slots are spent on mix before AIC/AIV get any chance. + bool skip_aic_aiv = has_residual_mix(local_bufs[MIX_I]); + + if (!skip_aic_aiv) { + for (int i = 0; i < 2; i++) { + PTO2ResourceShape s = aic_aiv[i]; + dispatch_shape( + thread_idx, s, Phase::IDLE, local_bufs[static_cast(s)], tracker, entered_drain, made_progress, + try_pushed + ); + if (entered_drain) return; + } + } + + // Flush between IDLE and PENDING so PENDING-stage queue-size checks and any + // peer-thread reads see the IDLE-stage release_fanin output. + flush_local_bufs(); + + if (pmu_active) return; + + // ===== PENDING stage ===== + // MIX-PENDING gate: skip when a peer has an idle MIX-capable cluster — that + // peer's next IDLE-MIX iteration will pull the mix task from the global + // queue (already flushed above) at lower latency than us pre-loading a + // pending slot here. 
Forward progress for MIX is preserved: at least one + // thread will run MIX-IDLE next pass and consume the residual. + // + // The gate is NOT subject to skip_aic_aiv — residual mix continues to drain + // via pending slots on this thread when no peer is idle. + if (!has_idle_in_other_threads(thread_idx, PTO2ResourceShape::MIX)) { + dispatch_shape( + thread_idx, PTO2ResourceShape::MIX, Phase::PENDING, local_bufs[MIX_I], tracker, entered_drain, + made_progress, try_pushed + ); + if (entered_drain) return; + } + + // Re-check after MIX-PENDING. If MIX-IDLE already set skip_aic_aiv, leave + // it set; otherwise, escalate iff PENDING-MIX left residual. + if (!skip_aic_aiv && has_residual_mix(local_bufs[MIX_I])) { + skip_aic_aiv = true; + } + + // PENDING-MIX may have re-populated AIC/AIV local_bufs via release_fanin + // during in-flight completions; flush_guard ensures these don't carry + // across to the next iteration's IDLE stage. + if (skip_aic_aiv) return; + + // AIC/AIV-PENDING gate: a peer-idle skip is a delay, not a loss — the peer + // will pull from the global queue on its next IDLE pass. 
+ for (int i = 0; i < 2; i++) { + PTO2ResourceShape s = aic_aiv[i]; + if (has_idle_in_other_threads(thread_idx, s)) continue; + dispatch_shape( + thread_idx, s, Phase::PENDING, local_bufs[static_cast(s)], tracker, entered_drain, made_progress, + try_pushed + ); + if (entered_drain) return; + } +} + // ============================================================================= // Main scheduler dispatch loop // ============================================================================= @@ -569,34 +692,9 @@ int32_t SchedulerContext::resolve_and_dispatch(Runtime *runtime, int32_t thread_ } } - // Phase 4: Two-phase dispatch (idle then pending) - const PTO2ResourceShape *dispatch_order = get_dispatch_order(thread_idx); - bool entered_drain = false; - - for (int32_t si = 0; si < PTO2_NUM_RESOURCE_SHAPES && !entered_drain; si++) { - PTO2ResourceShape shape = dispatch_order[si]; - for (auto phase : {CoreTracker::DispatchPhase::IDLE, CoreTracker::DispatchPhase::PENDING}) { - if (phase == CoreTracker::DispatchPhase::PENDING && unlikely(pmu_active)) break; - dispatch_shape( - thread_idx, shape, phase, local_bufs[static_cast(shape)], tracker, entered_drain, - made_progress, try_pushed - ); - } - } - - // Requeue local buffers to global ready queue - for (int32_t si = 0; si < PTO2_NUM_RESOURCE_SHAPES; si++) { - PTO2ResourceShape shape = dispatch_order[si]; - auto &local_buf = local_bufs[static_cast(shape)]; - auto &ready_queue = sched_->ready_queues[static_cast(shape)]; -#if PTO2_SCHED_PROFILING - l2_perf.local_overflow_count += local_buf.count; -#endif - if (local_buf.count > 0) { - ready_queue.push_batch(local_buf.slot_states, local_buf.count); - local_buf.count = 0; - } - } + // Phase 4: MIX-strict-priority dispatch with phase-split and + // cross-thread idle gating. See dispatch_ready_tasks for the policy. 
+ dispatch_ready_tasks(thread_idx, tracker, local_bufs, pmu_active, made_progress, try_pushed); #if PTO2_PROFILING if (!try_pushed) { diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h index 1c0b2785f..32887d0be 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h @@ -69,8 +69,8 @@ struct PTO2ReadyQueueSlot { * the start of each iteration (verified by always_assert). * * Phase 1 fills per-CoreType buffers via on_task_complete(). - * dispatch_ready_tasks_to_idle_cores drains them: local-first via - * get_ready_task_batch, then remaining tasks pushed to global readyQ. + * The dispatch stage drains them local-first via get_ready_tasks_batch, + * with any remaining tasks pushed to the global ready queue. */ // Number of CoreType values eligible for local dispatch (AIC=0, AIV=1) static constexpr int PTO2_LOCAL_DISPATCH_TYPE_NUM = 2; diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h index 9920f1f9c..8bfcd5037 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h @@ -220,7 +220,6 @@ class SchedulerContext { } return "?"; } - static const PTO2ResourceShape *get_dispatch_order(int32_t thread_idx); int pop_ready_tasks_batch( PTO2ResourceShape shape, int32_t thread_idx, PTO2LocalReadyBuffer &local_buf, PTO2TaskSlotState **out, @@ -253,6 +252,41 @@ class SchedulerContext { bool &try_pushed ); + // One pass of "Phase 4" in the resolve_and_dispatch loop: IDLE-stage dispatch + // for MIX then (if no mix residual) AIC/AIV; mid-flush of local buffers; then + // PENDING-stage dispatch with cross-thread idle gating. 
MIX is strictly + // prioritized — when mix residual is detected after MIX-IDLE, AIC/AIV are + // skipped for the whole pass but MIX-PENDING still runs. + // + // Forward-progress argument for AIC/AIV: skip_aic_aiv is sticky for the + // current pass only. The next loop iteration re-evaluates after Phase 1 + // completion polling and the global MIX queue draining (here or on any + // peer thread). AIC/AIV starvation is therefore bounded by MIX throughput, + // not unbounded — once mix completes on at least one cluster, the next + // pass either drains the residual or admits AIC/AIV. + void dispatch_ready_tasks( + Runtime *runtime, int32_t thread_idx, CoreTracker &tracker, + PTO2LocalReadyBuffer (&local_bufs)[PTO2_NUM_RESOURCE_SHAPES], bool pmu_active, bool &made_progress, + bool &try_pushed + ); + + // Returns true if any *other* scheduler thread currently has an idle core + // matching `shape`. Used as a scheduling hint on the PENDING dispatch path + // — see the implementation in scheduler_dispatch.cpp for the hint-semantics + // rationale and the safety argument against the drain worker. + bool has_idle_in_other_threads(int32_t self_thread_idx, PTO2ResourceShape shape) const; + + // True if mix tasks remain anywhere this thread could see them: the caller's + // MIX local LIFO stack or the global MIX ready queue. Approximate — + // PTO2ReadyQueue::size() (see pto_scheduler.h) snapshots its enqueue/dequeue + // positions with std::memory_order_relaxed and may interleave with concurrent + // push/pop. Don't confuse with PTO2SpscQueue::size(), which uses acquire + // loads — that one isn't on this path. A stale read here causes at most one + // extra/missed AIC/AIV skip and self-corrects on the next loop iteration. 
+ bool has_residual_mix(const PTO2LocalReadyBuffer &mix_local_buf) const { + return mix_local_buf.count > 0 || sched_->ready_queues[static_cast(PTO2ResourceShape::MIX)].size() > 0; + } + // ========================================================================= // Completion & drain (scheduler_completion.cpp) // ========================================================================= diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp index 2ec664bfa..fefaf7694 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp @@ -52,18 +52,22 @@ const char *SchedulerContext::shape_name(PTO2ResourceShape shape) { return "UNKNOWN"; } -const PTO2ResourceShape *SchedulerContext::get_dispatch_order(int32_t thread_idx) { - static constexpr PTO2ResourceShape kEvenOrder[PTO2_NUM_RESOURCE_SHAPES] = { - PTO2ResourceShape::MIX, - PTO2ResourceShape::AIC, - PTO2ResourceShape::AIV, - }; - static constexpr PTO2ResourceShape kOddOrder[PTO2_NUM_RESOURCE_SHAPES] = { - PTO2ResourceShape::MIX, - PTO2ResourceShape::AIV, - PTO2ResourceShape::AIC, - }; - return (thread_idx % 2 == 0) ? kEvenOrder : kOddOrder; +bool SchedulerContext::has_idle_in_other_threads(int32_t self_thread_idx, PTO2ResourceShape shape) const { + // Cross-thread read of peer trackers without explicit synchronization. The + // backing `core_states_` is a naturally aligned uint64_t; aarch64 guarantees + // single-copy atomicity for an 8-byte aligned load, so no torn read. The + // value is consumed only as a scheduling *hint* — a stale read at worst + // causes one missed/extra pending dispatch, corrected on the next iteration. 
+ // Drain-mode cross-thread writes are serialized by handle_drain_mode's ack + // barrier (all peers spin out of the dispatch path before any tracker + // mutation), so this routine is never racing the drain worker. + for (int32_t t = 0; t < active_sched_threads_; t++) { + if (t == self_thread_idx) continue; + if (core_trackers_[t].get_idle_core_offset_states(shape).has_value()) { + return true; + } + } + return false; } int SchedulerContext::pop_ready_tasks_batch( @@ -326,6 +330,125 @@ void SchedulerContext::dispatch_shape( } } +void SchedulerContext::dispatch_ready_tasks( + Runtime *runtime, int32_t thread_idx, CoreTracker &tracker, + PTO2LocalReadyBuffer (&local_bufs)[PTO2_NUM_RESOURCE_SHAPES], bool pmu_active, bool &made_progress, bool &try_pushed +) { + using Phase = CoreTracker::DispatchPhase; + constexpr int32_t MIX_I = static_cast(PTO2ResourceShape::MIX); + + // MIX is handled explicitly at the top of each stage; only AIC/AIV cycle + // through this 2-elem array, with order toggled by thread parity for + // shape-level load balancing across threads. + static constexpr PTO2ResourceShape kAicAivOrder[2][2] = { + {PTO2ResourceShape::AIC, PTO2ResourceShape::AIV}, + {PTO2ResourceShape::AIV, PTO2ResourceShape::AIC}, + }; + const PTO2ResourceShape *aic_aiv = kAicAivOrder[thread_idx & 1]; + +#if PTO2_SCHED_PROFILING + auto &l2_perf = sched_l2_perf_[thread_idx]; +#endif + + // Note: flush_local_bufs is invoked multiple times per pass (mid-function + // flush + RAII tail flush). local_overflow_count accumulates each batch + // separately — each entry is counted exactly once (count is zeroed after + // push_batch). The total reflects "entries this pass pushed to the global + // queue", which is slightly larger than the pre-refactor "buf residual at + // pass end" semantics — comparing PTO2_SCHED_PROFILING traces across + // commits, expect the post-refactor number to be greater-or-equal. 
+ auto flush_local_bufs = [&]() { + for (int32_t s = 0; s < PTO2_NUM_RESOURCE_SHAPES; s++) { + auto &lb = local_bufs[s]; +#if PTO2_SCHED_PROFILING + l2_perf.local_overflow_count += lb.count; +#endif + if (lb.count > 0) { + sched_->ready_queues[s].push_batch(lb.slot_states, lb.count); + lb.count = 0; + } + } + }; + // Every return path below must flush; wrap in RAII so we cannot forget. + // The mid-function flush between IDLE and PENDING is still called + // explicitly — guard only covers exit. + struct FlushGuard { + decltype(flush_local_bufs) &flush_fn; + ~FlushGuard() { flush_fn(); } + } flush_guard{flush_local_bufs}; + + bool entered_drain = false; + + // ===== IDLE stage ===== + dispatch_shape( + runtime, thread_idx, PTO2ResourceShape::MIX, Phase::IDLE, local_bufs[MIX_I], tracker, entered_drain, + made_progress, try_pushed + ); + if (entered_drain) return; + + // MIX-IDLE residual: AIC/AIV (both IDLE and PENDING) yield for this pass. + // MIX-PENDING below still runs — that is the core of "mix strict priority": + // pending slots are spent on mix before AIC/AIV get any chance. + bool skip_aic_aiv = has_residual_mix(local_bufs[MIX_I]); + + if (!skip_aic_aiv) { + for (int i = 0; i < 2; i++) { + PTO2ResourceShape s = aic_aiv[i]; + dispatch_shape( + runtime, thread_idx, s, Phase::IDLE, local_bufs[static_cast(s)], tracker, entered_drain, + made_progress, try_pushed + ); + if (entered_drain) return; + } + } + + // Flush between IDLE and PENDING so PENDING-stage queue-size checks and any + // peer-thread reads see the IDLE-stage release_fanin output. + flush_local_bufs(); + + if (pmu_active) return; + + // ===== PENDING stage ===== + // MIX-PENDING gate: skip when a peer has an idle MIX-capable cluster — that + // peer's next IDLE-MIX iteration will pull the mix task from the global + // queue (already flushed above) at lower latency than us pre-loading a + // pending slot here. 
Forward progress for MIX is preserved: at least one + // thread will run MIX-IDLE next pass and consume the residual. + // + // The gate is NOT subject to skip_aic_aiv — residual mix continues to drain + // via pending slots on this thread when no peer is idle. + if (!has_idle_in_other_threads(thread_idx, PTO2ResourceShape::MIX)) { + dispatch_shape( + runtime, thread_idx, PTO2ResourceShape::MIX, Phase::PENDING, local_bufs[MIX_I], tracker, entered_drain, + made_progress, try_pushed + ); + if (entered_drain) return; + } + + // Re-check after MIX-PENDING. If MIX-IDLE already set skip_aic_aiv, leave + // it set; otherwise, escalate iff PENDING-MIX left residual. + if (!skip_aic_aiv && has_residual_mix(local_bufs[MIX_I])) { + skip_aic_aiv = true; + } + + // PENDING-MIX may have re-populated AIC/AIV local_bufs via release_fanin + // during in-flight completions; flush_guard ensures these don't carry + // across to the next iteration's IDLE stage. + if (skip_aic_aiv) return; + + // AIC/AIV-PENDING gate: a peer-idle skip is a delay, not a loss — the peer + // will pull from the global queue on its next IDLE pass. 
+ for (int i = 0; i < 2; i++) { + PTO2ResourceShape s = aic_aiv[i]; + if (has_idle_in_other_threads(thread_idx, s)) continue; + dispatch_shape( + runtime, thread_idx, s, Phase::PENDING, local_bufs[static_cast(s)], tracker, entered_drain, + made_progress, try_pushed + ); + if (entered_drain) return; + } +} + // ============================================================================= // Main scheduler dispatch loop // ============================================================================= @@ -564,33 +687,10 @@ int32_t SchedulerContext::resolve_and_dispatch(Runtime *runtime, int32_t thread_ } } - // Phase 4: Two-phase dispatch (idle then pending) - const PTO2ResourceShape *dispatch_order = get_dispatch_order(thread_idx); - bool entered_drain = false; - - for (int32_t si = 0; si < PTO2_NUM_RESOURCE_SHAPES && !entered_drain; si++) { - PTO2ResourceShape shape = dispatch_order[si]; - for (auto phase : {CoreTracker::DispatchPhase::IDLE, CoreTracker::DispatchPhase::PENDING}) { - dispatch_shape( - runtime, thread_idx, shape, phase, local_bufs[static_cast(shape)], tracker, entered_drain, - made_progress, try_pushed - ); - } - } - - // Requeue local buffers to global ready queue - for (int32_t si = 0; si < PTO2_NUM_RESOURCE_SHAPES; si++) { - PTO2ResourceShape shape = dispatch_order[si]; - auto &local_buf = local_bufs[static_cast(shape)]; - auto &ready_queue = sched_->ready_queues[static_cast(shape)]; -#if PTO2_SCHED_PROFILING - l2_perf.local_overflow_count += local_buf.count; -#endif - if (local_buf.count > 0) { - ready_queue.push_batch(local_buf.slot_states, local_buf.count); - local_buf.count = 0; - } - } + // Phase 4: MIX-strict-priority dispatch with phase-split and + // cross-thread idle gating. See dispatch_ready_tasks for the policy. + const bool pmu_active = is_pmu_enabled(); + dispatch_ready_tasks(runtime, thread_idx, tracker, local_bufs, pmu_active, made_progress, try_pushed); #if PTO2_PROFILING if (!try_pushed) {