21 changes: 21 additions & 0 deletions src/scheduler/mod.rs
@@ -41,6 +41,8 @@ use tokio::sync::{Mutex, Notify};
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;

use crate::task::{IoBudget, TaskRecord};

use crate::backpressure::{CompositePressure, ThrottlePolicy};
use crate::priority::Priority;
use crate::registry::TaskTypeRegistry;
@@ -50,6 +52,16 @@ use crate::store::TaskStore;
use dispatch::ActiveTaskMap;

pub use builder::SchedulerBuilder;

// ── Completion coalescing ──────────────────────────────────────────

/// Message sent from spawned tasks to the scheduler's completion channel.
///
/// Batched by `drain_completions` to reduce per-completion transaction overhead.
pub(crate) struct CompletionMsg {
pub task: TaskRecord,
pub metrics: IoBudget,
}
pub use event::{
SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader,
};
@@ -121,6 +133,12 @@ pub(crate) struct SchedulerInner {
/// Cleared when `paused_tasks()` returns empty. Avoids a SQL round-trip
/// per dispatch cycle when no tasks are paused.
pub(crate) has_paused_tasks: AtomicBool,
/// Send side of the completion coalescing channel.
pub(crate) completion_tx: tokio::sync::mpsc::UnboundedSender<CompletionMsg>,
/// Receive side, `Arc`-wrapped so spawned tasks can try to drain the batch
/// (leader election pattern) in addition to the run loop.
pub(crate) completion_rx:
std::sync::Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<CompletionMsg>>>,
}

/// IO-aware priority scheduler.
@@ -216,6 +234,7 @@ impl Scheduler {
);
let (event_tx, _) = tokio::sync::broadcast::channel(256);
let (progress_tx, _) = tokio::sync::broadcast::channel(64);
let (completion_tx, completion_rx) = tokio::sync::mpsc::unbounded_channel();
Self {
inner: Arc::new(SchedulerInner {
store,
@@ -249,6 +268,8 @@ impl Scheduler {
module_running,
// Conservative: true on startup so the first cycle checks.
has_paused_tasks: AtomicBool::new(true),
completion_tx,
completion_rx: std::sync::Arc::new(Mutex::new(completion_rx)),
}),
}
}
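For readers new to the pattern, here is a minimal sketch of the coalescing channel this change introduces, assuming a tokio runtime; `Msg` and its payload are illustrative stand-ins, not types from this PR. Spawned tasks push completions onto an unbounded mpsc channel, and a consumer drains whatever has queued up into one batch before processing it in a single pass.

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

// Illustrative stand-in for CompletionMsg: just a task id here.
struct Msg(u64);

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel::<Msg>();
    // Arc<Mutex<Receiver>> so both the run loop and finishing tasks can try to drain.
    let rx = Arc::new(Mutex::new(rx));

    // Producers: each finishing task sends one message and returns immediately.
    for id in 0..8u64 {
        let tx = tx.clone();
        tokio::spawn(async move {
            let _ = tx.send(Msg(id));
        });
    }
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;

    // Consumer: drain everything currently queued into a single batch,
    // then process it in one pass (one transaction instead of eight).
    let mut batch = Vec::new();
    {
        let mut guard = rx.lock().await;
        while let Ok(msg) = guard.try_recv() {
            batch.push(msg);
        }
    }
    println!("processing {} completions in one batch", batch.len());
}
```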
37 changes: 37 additions & 0 deletions src/scheduler/run_loop.rs
@@ -30,6 +30,8 @@ impl Scheduler {
module_running: Arc::clone(&self.inner.module_running),
module_state: Arc::clone(&self.inner.module_state),
module_registry: Arc::clone(&self.inner.module_registry),
completion_tx: self.inner.completion_tx.clone(),
completion_rx: self.inner.completion_rx.clone(),
}
}

@@ -291,12 +293,44 @@ impl Scheduler {
}
}

/// Drain the completion channel and process all queued completions in a
/// single batched transaction.
async fn drain_completions(&self) {
let mut batch = Vec::new();
{
let mut rx = self.inner.completion_rx.lock().await;
while let Ok(msg) = rx.try_recv() {
batch.push(msg);
}
}

if batch.is_empty() {
return;
}

tracing::debug!(count = batch.len(), "draining completion batch");
spawn::process_completion_batch(
&batch,
&self.inner.store,
&self.inner.event_tx,
&self.inner.active,
self.inner.max_retries,
&self.inner.work_notify,
)
.await;
}

/// Resume paused tasks, dispatch finalizers, and dispatch pending work.
async fn poll_and_dispatch(&self) {
if self.is_paused() {
return;
}

// Drain queued completions before dispatching new work.
// This ensures completed tasks are processed first, freeing
// dependency edges and unblocking downstream tasks.
self.drain_completions().await;

// Run expiry sweep before dispatching.
self.maybe_expire_tasks().await;

@@ -382,6 +416,9 @@ impl Scheduler {
}
}

// Drain any remaining completions before closing the store.
self.drain_completions().await;

// Flush WAL and close the database.
self.inner.store.close().await;
}
3 changes: 3 additions & 0 deletions src/scheduler/spawn.rs
@@ -22,6 +22,7 @@ use crate::task::TaskRecord;
use super::dispatch::ActiveTask;
use super::SchedulerEvent;

pub(in crate::scheduler) use completion::process_completion_batch;
pub(crate) use context::SpawnContext;

/// Whether to call `execute` or `finalize` on the executor.
@@ -78,6 +79,8 @@ pub(crate) async fn spawn_task(
event_tx: ctx.event_tx.clone(),
work_notify: ctx.work_notify.clone(),
max_retries: ctx.max_retries,
completion_tx: ctx.completion_tx.clone(),
completion_rx: ctx.completion_rx.clone(),
};
let failure_deps = failure::FailureDeps {
store: ctx.store,
157 changes: 109 additions & 48 deletions src/scheduler/spawn/completion.rs
@@ -6,7 +6,7 @@ use crate::store::TaskStore;
use crate::task::{IoBudget, TaskRecord};

use super::super::dispatch::ActiveTaskMap;
use super::super::SchedulerEvent;
use super::super::{CompletionMsg, SchedulerEvent};
use super::parent::handle_parent_resolution;
use super::ExecutionPhase;

@@ -17,13 +17,15 @@ pub(crate) struct CompletionDeps {
pub event_tx: tokio::sync::broadcast::Sender<SchedulerEvent>,
pub work_notify: Arc<tokio::sync::Notify>,
pub max_retries: i32,
pub completion_tx: tokio::sync::mpsc::UnboundedSender<CompletionMsg>,
pub completion_rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<CompletionMsg>>>,
}

/// Handle a successful task execution.
///
/// For the execute phase, checks if the task spawned children (transition to
/// waiting). Otherwise records completion, resolves dependents, and handles
/// recurring re-enqueue.
/// waiting). Otherwise sends a [`CompletionMsg`] to the coalescing channel
/// for batched processing in `drain_completions`.
pub(crate) async fn handle_success(
task: &TaskRecord,
phase: ExecutionPhase,
@@ -72,58 +74,117 @@ pub(crate) async fn handle_success(
}
}

match deps
.store
.complete_with_record_and_resolve(task, metrics)
.await
{
Ok((recurring_info, unblocked)) => {
// Emit recurring event if this was a recurring task.
if task.recurring_interval_secs.is_some() {
let (next_run, exec_count) = match recurring_info {
Some((next, count)) => (Some(next), count),
None => (None, task.recurring_execution_count + 1),
};
let _ = deps.event_tx.send(SchedulerEvent::RecurringCompleted {
header: task.event_header(),
execution_count: exec_count,
next_run,
});
}
// Send completion to the coalescing channel.
let msg = CompletionMsg {
task: task.clone(),
metrics: *metrics,
};

// Decrement module counter and remove from active map eagerly so that
// concurrency slots are freed before the batch commits.
decrement_module();
deps.active.remove(task_id);

if deps.completion_tx.send(msg).is_err() {
tracing::error!(task_id, "completion channel closed — processing inline");
if let Err(e) = deps
.store
.complete_with_record_and_resolve(task, metrics)
.await
{
tracing::error!(task_id, error = %e, "inline completion failed");
}
return;
}

// Leader election: try to grab the rx lock and drain the batch.
// Under high concurrency, only one task wins — it processes the batch
// for everyone. Others just wake the scheduler and return.
if let Ok(mut rx) = deps.completion_rx.try_lock() {
let mut batch = Vec::new();
while let Ok(m) = rx.try_recv() {
batch.push(m);
}
drop(rx);

if !batch.is_empty() {
process_completion_batch(
&batch,
&deps.store,
&deps.event_tx,
&deps.active,
deps.max_retries,
&deps.work_notify,
)
.await;
}
}

// Wake the scheduler to dispatch new work (and drain any stragglers).
deps.work_notify.notify_one();
}

// Remove from active tracking AFTER the store write completes.
decrement_module();
deps.active.remove(task_id);
let _ = deps
.event_tx
.send(SchedulerEvent::Completed(task.event_header()));

// Emit unblocked events for resolved dependents.
for uid in &unblocked {
let _ = deps
.event_tx
.send(SchedulerEvent::TaskUnblocked { task_id: *uid });
/// Process a batch of completions: write to store, emit events, resolve parents.
///
/// Called from both the spawned task (leader election) and the run loop
/// (`drain_completions`). Handles parent resolution after the batch commits
/// since child records must be removed from the DB before the parent
/// resolution check.
pub(in crate::scheduler) async fn process_completion_batch(
batch: &[CompletionMsg],
store: &TaskStore,
event_tx: &tokio::sync::broadcast::Sender<SchedulerEvent>,
active: &ActiveTaskMap,
max_retries: i32,
work_notify: &Arc<tokio::sync::Notify>,
) {
let items: Vec<(&TaskRecord, &IoBudget)> =
batch.iter().map(|m| (&m.task, &m.metrics)).collect();

match store.complete_batch_with_resolve(&items).await {
Ok(results) => {
for (msg, (_task_id, recurring_info, unblocked)) in batch.iter().zip(results) {
emit_completion_events(&msg.task, recurring_info, &unblocked, event_tx);
}
}
Err(e) => {
tracing::error!(task_id, error = %e, "failed to complete task and resolve dependents");
decrement_module();
deps.active.remove(task_id);
tracing::error!(error = %e, "batch completion failed");
return;
}
}

deps.work_notify.notify_one();
// Parent resolution: now that the batch has committed (child records are
// removed from `tasks`), check if any parent is ready for finalization.
for msg in batch {
if let Some(parent_id) = msg.task.parent_id {
handle_parent_resolution(parent_id, store, active, event_tx, max_retries, work_notify)
.await;
}
}
}

/// Emit completion, recurring, and unblocked events for a single task.
pub(in crate::scheduler) fn emit_completion_events(
task: &TaskRecord,
recurring_info: Option<(chrono::DateTime<chrono::Utc>, i64)>,
unblocked: &[i64],
event_tx: &tokio::sync::broadcast::Sender<SchedulerEvent>,
) {
if task.recurring_interval_secs.is_some() {
let (next_run, exec_count) = match recurring_info {
Some((next, count)) => (Some(next), count),
None => (None, task.recurring_execution_count + 1),
};
let _ = event_tx.send(SchedulerEvent::RecurringCompleted {
header: task.event_header(),
execution_count: exec_count,
next_run,
});
}

let _ = event_tx.send(SchedulerEvent::Completed(task.event_header()));

// If this was a child task, check if parent is ready.
if let Some(parent_id) = task.parent_id {
handle_parent_resolution(
parent_id,
&deps.store,
&deps.active,
&deps.event_tx,
deps.max_retries,
&deps.work_notify,
)
.await;
for uid in unblocked {
let _ = event_tx.send(SchedulerEvent::TaskUnblocked { task_id: *uid });
}
}
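A hedged sketch of the leader-election drain described in the comments above (not the PR's code; the types and the commit step are placeholders): every finishing task sends its message and then calls `try_lock` on the shared receiver. Whichever task wins the lock processes the whole batch on behalf of the others, and the losers return without blocking.

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

type Rx = Arc<Mutex<mpsc::UnboundedReceiver<u64>>>;

async fn on_task_finished(tx: &mpsc::UnboundedSender<u64>, rx: &Rx, task_id: u64) {
    // 1. Every finishing task enqueues its completion.
    let _ = tx.send(task_id);

    // 2. Leader election: try_lock never blocks. The one task that gets the
    //    guard drains the batch for everyone else.
    if let Ok(mut guard) = rx.try_lock() {
        let mut batch = Vec::new();
        while let Ok(id) = guard.try_recv() {
            batch.push(id);
        }
        drop(guard);

        if !batch.is_empty() {
            // Placeholder for the single batched store write.
            println!("leader committing {} completions", batch.len());
        }
    }
    // 3. Losers fall through here immediately; the run loop (or the next
    //    leader) picks up their messages later.
}
```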
6 changes: 6 additions & 0 deletions src/scheduler/spawn/context.rs
@@ -33,6 +33,12 @@ pub(crate) struct SpawnContext {
/// Registry of all registered modules — shared with spawned tasks so they can
/// construct [`ModuleHandle`](crate::module::ModuleHandle) instances.
pub module_registry: Arc<crate::module::ModuleRegistry>,
/// Completion coalescing channel sender.
pub completion_tx: tokio::sync::mpsc::UnboundedSender<super::super::CompletionMsg>,
/// Completion coalescing channel receiver (Arc-wrapped for leader election).
pub completion_rx: std::sync::Arc<
tokio::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<super::super::CompletionMsg>>,
>,
}

/// Output of task context construction — everything needed to insert into the
43 changes: 43 additions & 0 deletions src/store/lifecycle/transitions.rs
@@ -233,6 +233,49 @@ impl TaskStore {
Ok((recurring_info, unblocked))
}

/// Complete a batch of tasks and resolve their dependents in a single transaction.
///
/// Coalesces N individual `complete_with_record_and_resolve` calls into one
/// `BEGIN IMMEDIATE` / `COMMIT` cycle, amortizing SQLite's WAL sync overhead.
///
/// Returns a vec of `(task_id, recurring_info, unblocked_ids)` per item.
pub async fn complete_batch_with_resolve(
&self,
items: &[(&crate::task::TaskRecord, &IoBudget)],
) -> Result<Vec<(i64, Option<(chrono::DateTime<chrono::Utc>, i64)>, Vec<i64>)>, StoreError>
{
if items.is_empty() {
return Ok(Vec::new());
}

// Single-item fast path: avoid the batch overhead.
if items.len() == 1 {
let (task, metrics) = items[0];
let (recurring, unblocked) =
self.complete_with_record_and_resolve(task, metrics).await?;
return Ok(vec![(task.id, recurring, unblocked)]);
}

tracing::debug!(count = items.len(), "store.complete_batch: BEGIN tx");
let mut conn = self.begin_write().await?;

let mut results = Vec::with_capacity(items.len());
for (task, metrics) in items {
let recurring = Self::complete_inner(&mut conn, task, metrics).await?;
let unblocked = Self::resolve_dependents_inner(&mut conn, task.id).await?;
results.push((task.id, recurring, unblocked));
}

sqlx::query("COMMIT").execute(&mut *conn).await?;
drop(conn);
tracing::debug!(count = items.len(), "store.complete_batch: COMMIT ok");

// Prune once for the whole batch.
self.maybe_prune().await;

Ok(results)
}

/// Shared completion logic: insert history, handle recurring next instance,
/// then handle requeue or delete.
///
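The store-side rationale, one `BEGIN`/`COMMIT` per batch rather than per task, can be illustrated with a generic sqlx sketch. This assumes sqlx 0.7+ with the `sqlite` feature; the table and column names are invented for the example and do not come from this codebase.

```rust
use sqlx::{Connection, SqliteConnection};

// Writes N rows under a single transaction, so SQLite pays the WAL-sync
// cost of COMMIT once for the whole batch instead of once per row.
async fn record_batch(conn: &mut SqliteConnection, task_ids: &[i64]) -> Result<(), sqlx::Error> {
    let mut tx = conn.begin().await?;
    for id in task_ids {
        sqlx::query("INSERT INTO completed (task_id) VALUES (?1)")
            .bind(*id)
            .execute(&mut *tx)
            .await?;
    }
    tx.commit().await?;
    Ok(())
}
```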