Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ struct PTO2ReadyQueueSlot {
* the start of each iteration (verified by always_assert).
*
* Phase 1 fills per-CoreType buffers via on_task_complete().
* dispatch_ready_tasks_to_idle_cores drains them: local-first via
* get_ready_task_batch, then remaining tasks pushed to global readyQ.
* The dispatch stage drains them local-first via get_ready_tasks_batch,
* with any remaining tasks pushed to the global ready queue.
*/
// Number of CoreType values eligible for local dispatch (AIC=0, AIV=1)
static constexpr int PTO2_LOCAL_DISPATCH_TYPE_NUM = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ class SchedulerContext {
}
return "?";
}
static const PTO2ResourceShape *get_dispatch_order(int32_t thread_idx);

int pop_ready_tasks_batch(
PTO2ResourceShape shape, int32_t thread_idx, PTO2LocalReadyBuffer &local_buf, PTO2TaskSlotState **out,
Expand Down Expand Up @@ -250,6 +249,40 @@ class SchedulerContext {
CoreTracker &tracker, bool &entered_drain, bool &made_progress, bool &try_pushed
);

// One pass of "Phase 4" in the resolve_and_dispatch loop: IDLE-stage dispatch
// for MIX then (if no mix residual) AIC/AIV; mid-flush of local buffers; then
// PENDING-stage dispatch with cross-thread idle gating. MIX is strictly
// prioritized — when mix residual is detected after MIX-IDLE, AIC/AIV are
// skipped for the whole pass but MIX-PENDING still runs.
//
// Forward-progress argument for AIC/AIV: skip_aic_aiv is sticky for the
// current pass only. The next loop iteration re-evaluates after Phase 1
// completion polling and the global MIX queue draining (here or on any
// peer thread). AIC/AIV starvation is therefore bounded by MIX throughput,
// not unbounded — once mix completes on at least one cluster, the next
// pass either drains the residual or admits AIC/AIV.
void dispatch_ready_tasks(
int32_t thread_idx, CoreTracker &tracker, PTO2LocalReadyBuffer (&local_bufs)[PTO2_NUM_RESOURCE_SHAPES],
bool pmu_active, bool &made_progress, bool &try_pushed
);

// Returns true if any *other* scheduler thread currently has an idle core
// matching `shape`. Used as a scheduling hint on the PENDING dispatch path
// — see the implementation in scheduler_dispatch.cpp for the hint-semantics
// rationale and the safety argument against the drain worker.
bool has_idle_in_other_threads(int32_t self_thread_idx, PTO2ResourceShape shape) const;

// True if mix tasks remain anywhere this thread could see them: the caller's
// MIX local LIFO stack or the global MIX ready queue. Approximate —
// PTO2ReadyQueue::size() (see pto_scheduler.h) snapshots its enqueue/dequeue
// positions with std::memory_order_relaxed and may interleave with concurrent
// push/pop. Don't confuse with PTO2SpscQueue::size(), which uses acquire
// loads — that one isn't on this path. A stale read here causes at most one
// extra/missed AIC/AIV skip and self-corrects on the next loop iteration.
bool has_residual_mix(const PTO2LocalReadyBuffer &mix_local_buf) const {
return mix_local_buf.count > 0 || sched_->ready_queues[static_cast<int32_t>(PTO2ResourceShape::MIX)].size() > 0;
}

// =========================================================================
// Completion & drain (scheduler_completion.cpp)
// =========================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,22 @@ const char *SchedulerContext::shape_name(PTO2ResourceShape shape) {
return "UNKNOWN";
}

const PTO2ResourceShape *SchedulerContext::get_dispatch_order(int32_t thread_idx) {
static constexpr PTO2ResourceShape kEvenOrder[PTO2_NUM_RESOURCE_SHAPES] = {
PTO2ResourceShape::MIX,
PTO2ResourceShape::AIC,
PTO2ResourceShape::AIV,
};
static constexpr PTO2ResourceShape kOddOrder[PTO2_NUM_RESOURCE_SHAPES] = {
PTO2ResourceShape::MIX,
PTO2ResourceShape::AIV,
PTO2ResourceShape::AIC,
};
return (thread_idx % 2 == 0) ? kEvenOrder : kOddOrder;
bool SchedulerContext::has_idle_in_other_threads(int32_t self_thread_idx, PTO2ResourceShape shape) const {
// Cross-thread read of peer trackers without explicit synchronization. The
// backing `core_states_` is a naturally aligned uint64_t; aarch64 guarantees
// single-copy atomicity for an 8-byte aligned load, so no torn read. The
// value is consumed only as a scheduling *hint* — a stale read at worst
// causes one missed/extra pending dispatch, corrected on the next iteration.
// Drain-mode cross-thread writes are serialized by handle_drain_mode's ack
// barrier (all peers spin out of the dispatch path before any tracker
// mutation), so this routine is never racing the drain worker.
for (int32_t t = 0; t < active_sched_threads_; t++) {
if (t == self_thread_idx) continue;
if (core_trackers_[t].get_idle_core_offset_states(shape).has_value()) {
return true;
}
}
return false;
}
Comment thread
poursoul marked this conversation as resolved.

int SchedulerContext::pop_ready_tasks_batch(
Expand Down Expand Up @@ -325,6 +329,125 @@ void SchedulerContext::dispatch_shape(
}
}

void SchedulerContext::dispatch_ready_tasks(
int32_t thread_idx, CoreTracker &tracker, PTO2LocalReadyBuffer (&local_bufs)[PTO2_NUM_RESOURCE_SHAPES],
bool pmu_active, bool &made_progress, bool &try_pushed
) {
using Phase = CoreTracker::DispatchPhase;
constexpr int32_t MIX_I = static_cast<int32_t>(PTO2ResourceShape::MIX);

// MIX is handled explicitly at the top of each stage; only AIC/AIV cycle
// through this 2-elem array, with order toggled by thread parity for
// shape-level load balancing across threads.
static constexpr PTO2ResourceShape kAicAivOrder[2][2] = {
{PTO2ResourceShape::AIC, PTO2ResourceShape::AIV},
{PTO2ResourceShape::AIV, PTO2ResourceShape::AIC},
};
const PTO2ResourceShape *aic_aiv = kAicAivOrder[thread_idx & 1];

#if PTO2_SCHED_PROFILING
auto &l2_perf = sched_l2_perf_[thread_idx];
#endif

// Note: flush_local_bufs is invoked multiple times per pass (mid-function
// flush + RAII tail flush). local_overflow_count accumulates each batch
// separately — each entry is counted exactly once (count is zeroed after
// push_batch). The total reflects "entries this pass pushed to the global
// queue", which is slightly larger than the pre-refactor "buf residual at
// pass end" semantics — comparing PTO2_SCHED_PROFILING traces across
// commits, expect the post-refactor number to be greater-or-equal.
auto flush_local_bufs = [&]() {
for (int32_t s = 0; s < PTO2_NUM_RESOURCE_SHAPES; s++) {
auto &lb = local_bufs[s];
#if PTO2_SCHED_PROFILING
l2_perf.local_overflow_count += lb.count;
#endif
if (lb.count > 0) {
sched_->ready_queues[s].push_batch(lb.slot_states, lb.count);
lb.count = 0;
}
}
};
// Every return path below must flush; wrap in RAII so we cannot forget.
// The mid-function flush between IDLE and PENDING is still called
// explicitly — guard only covers exit.
struct FlushGuard {
decltype(flush_local_bufs) &flush_fn;
~FlushGuard() { flush_fn(); }
} flush_guard{flush_local_bufs};

bool entered_drain = false;

// ===== IDLE stage =====
dispatch_shape(
thread_idx, PTO2ResourceShape::MIX, Phase::IDLE, local_bufs[MIX_I], tracker, entered_drain, made_progress,
try_pushed
);
if (entered_drain) return;

// MIX-IDLE residual: AIC/AIV (both IDLE and PENDING) yield for this pass.
// MIX-PENDING below still runs — that is the core of "mix strict priority":
// pending slots are spent on mix before AIC/AIV get any chance.
bool skip_aic_aiv = has_residual_mix(local_bufs[MIX_I]);

if (!skip_aic_aiv) {
for (int i = 0; i < 2; i++) {
PTO2ResourceShape s = aic_aiv[i];
dispatch_shape(
thread_idx, s, Phase::IDLE, local_bufs[static_cast<int32_t>(s)], tracker, entered_drain, made_progress,
try_pushed
);
if (entered_drain) return;
}
}

// Flush between IDLE and PENDING so PENDING-stage queue-size checks and any
// peer-thread reads see the IDLE-stage release_fanin output.
flush_local_bufs();

if (pmu_active) return;

// ===== PENDING stage =====
// MIX-PENDING gate: skip when a peer has an idle MIX-capable cluster — that
// peer's next IDLE-MIX iteration will pull the mix task from the global
// queue (already flushed above) at lower latency than us pre-loading a
// pending slot here. Forward progress for MIX is preserved: at least one
// thread will run MIX-IDLE next pass and consume the residual.
//
// The gate is NOT subject to skip_aic_aiv — residual mix continues to drain
// via pending slots on this thread when no peer is idle.
if (!has_idle_in_other_threads(thread_idx, PTO2ResourceShape::MIX)) {
dispatch_shape(
thread_idx, PTO2ResourceShape::MIX, Phase::PENDING, local_bufs[MIX_I], tracker, entered_drain,
made_progress, try_pushed
);
if (entered_drain) return;
}

// Re-check after MIX-PENDING. If MIX-IDLE already set skip_aic_aiv, leave
// it set; otherwise, escalate iff PENDING-MIX left residual.
if (!skip_aic_aiv && has_residual_mix(local_bufs[MIX_I])) {
skip_aic_aiv = true;
}

// PENDING-MIX may have re-populated AIC/AIV local_bufs via release_fanin
// during in-flight completions; flush_guard ensures these don't carry
// across to the next iteration's IDLE stage.
if (skip_aic_aiv) return;

// AIC/AIV-PENDING gate: a peer-idle skip is a delay, not a loss — the peer
// will pull from the global queue on its next IDLE pass.
for (int i = 0; i < 2; i++) {
PTO2ResourceShape s = aic_aiv[i];
if (has_idle_in_other_threads(thread_idx, s)) continue;
dispatch_shape(
thread_idx, s, Phase::PENDING, local_bufs[static_cast<int32_t>(s)], tracker, entered_drain, made_progress,
try_pushed
);
if (entered_drain) return;
}
}

// =============================================================================
// Main scheduler dispatch loop
// =============================================================================
Expand Down Expand Up @@ -569,34 +692,9 @@ int32_t SchedulerContext::resolve_and_dispatch(Runtime *runtime, int32_t thread_
}
}

// Phase 4: Two-phase dispatch (idle then pending)
const PTO2ResourceShape *dispatch_order = get_dispatch_order(thread_idx);
bool entered_drain = false;

for (int32_t si = 0; si < PTO2_NUM_RESOURCE_SHAPES && !entered_drain; si++) {
PTO2ResourceShape shape = dispatch_order[si];
for (auto phase : {CoreTracker::DispatchPhase::IDLE, CoreTracker::DispatchPhase::PENDING}) {
if (phase == CoreTracker::DispatchPhase::PENDING && unlikely(pmu_active)) break;
dispatch_shape(
thread_idx, shape, phase, local_bufs[static_cast<int32_t>(shape)], tracker, entered_drain,
made_progress, try_pushed
);
}
}

// Requeue local buffers to global ready queue
for (int32_t si = 0; si < PTO2_NUM_RESOURCE_SHAPES; si++) {
PTO2ResourceShape shape = dispatch_order[si];
auto &local_buf = local_bufs[static_cast<int32_t>(shape)];
auto &ready_queue = sched_->ready_queues[static_cast<int32_t>(shape)];
#if PTO2_SCHED_PROFILING
l2_perf.local_overflow_count += local_buf.count;
#endif
if (local_buf.count > 0) {
ready_queue.push_batch(local_buf.slot_states, local_buf.count);
local_buf.count = 0;
}
}
// Phase 4: MIX-strict-priority dispatch with phase-split and
// cross-thread idle gating. See dispatch_ready_tasks for the policy.
dispatch_ready_tasks(thread_idx, tracker, local_bufs, pmu_active, made_progress, try_pushed);

#if PTO2_PROFILING
if (!try_pushed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ struct PTO2ReadyQueueSlot {
* the start of each iteration (verified by always_assert).
*
* Phase 1 fills per-CoreType buffers via on_task_complete().
* dispatch_ready_tasks_to_idle_cores drains them: local-first via
* get_ready_task_batch, then remaining tasks pushed to global readyQ.
* The dispatch stage drains them local-first via get_ready_tasks_batch,
* with any remaining tasks pushed to the global ready queue.
*/
// Number of CoreType values eligible for local dispatch (AIC=0, AIV=1)
static constexpr int PTO2_LOCAL_DISPATCH_TYPE_NUM = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ class SchedulerContext {
}
return "?";
}
static const PTO2ResourceShape *get_dispatch_order(int32_t thread_idx);

int pop_ready_tasks_batch(
PTO2ResourceShape shape, int32_t thread_idx, PTO2LocalReadyBuffer &local_buf, PTO2TaskSlotState **out,
Expand Down Expand Up @@ -253,6 +252,41 @@ class SchedulerContext {
bool &try_pushed
);

// One pass of "Phase 4" in the resolve_and_dispatch loop: IDLE-stage dispatch
// for MIX then (if no mix residual) AIC/AIV; mid-flush of local buffers; then
// PENDING-stage dispatch with cross-thread idle gating. MIX is strictly
// prioritized — when mix residual is detected after MIX-IDLE, AIC/AIV are
// skipped for the whole pass but MIX-PENDING still runs.
//
// Forward-progress argument for AIC/AIV: skip_aic_aiv is sticky for the
// current pass only. The next loop iteration re-evaluates after Phase 1
// completion polling and the global MIX queue draining (here or on any
// peer thread). AIC/AIV starvation is therefore bounded by MIX throughput,
// not unbounded — once mix completes on at least one cluster, the next
// pass either drains the residual or admits AIC/AIV.
void dispatch_ready_tasks(
Runtime *runtime, int32_t thread_idx, CoreTracker &tracker,
PTO2LocalReadyBuffer (&local_bufs)[PTO2_NUM_RESOURCE_SHAPES], bool pmu_active, bool &made_progress,
bool &try_pushed
);

// Returns true if any *other* scheduler thread currently has an idle core
// matching `shape`. Used as a scheduling hint on the PENDING dispatch path
// — see the implementation in scheduler_dispatch.cpp for the hint-semantics
// rationale and the safety argument against the drain worker.
bool has_idle_in_other_threads(int32_t self_thread_idx, PTO2ResourceShape shape) const;

// True if mix tasks remain anywhere this thread could see them: the caller's
// MIX local LIFO stack or the global MIX ready queue. Approximate —
// PTO2ReadyQueue::size() (see pto_scheduler.h) snapshots its enqueue/dequeue
// positions with std::memory_order_relaxed and may interleave with concurrent
// push/pop. Don't confuse with PTO2SpscQueue::size(), which uses acquire
// loads — that one isn't on this path. A stale read here causes at most one
// extra/missed AIC/AIV skip and self-corrects on the next loop iteration.
bool has_residual_mix(const PTO2LocalReadyBuffer &mix_local_buf) const {
return mix_local_buf.count > 0 || sched_->ready_queues[static_cast<int32_t>(PTO2ResourceShape::MIX)].size() > 0;
}

// =========================================================================
// Completion & drain (scheduler_completion.cpp)
// =========================================================================
Expand Down
Loading
Loading