From 5521e6425848bd8e0baa0e08733ae38dd6bb0e07 Mon Sep 17 00:00:00 2001 From: Leo Stone Date: Thu, 4 Jun 2026 17:32:39 +0300 Subject: [PATCH] fix(core): stop long-running scheduled procedures from starving scheduled reducers The scheduler actor (`SchedulerActor::handle_queued`) awaited every scheduled function to completion before pulling the next due item from its `DelayQueue`. A scheduled `#[procedure]` that runs for a long time (e.g. one that calls `ctx.sleep_until` in a loop) therefore parked the actor and prevented every other due scheduled function -- reducers and procedures alike -- from being dispatched for as long as the procedure was alive. No error was logged; the reducer's schedule row simply never fired. Procedures already execute on their own pooled instances (`call_pooled`), separate from the main reducer executor, so awaiting them inline in the scheduler bought nothing but head-of-line blocking. Dispatch scheduled procedures on their own `tokio::spawn`ed task and let the actor loop keep draining the queue. Interval-scheduled procedures route their reschedule back to the actor through a new `SchedulerMessage::Reschedule`, since the spawned task cannot touch the actor-owned `queue`/`key_map`. Reducers keep their inline-await path (they cannot yield and run on the main executor, so this preserves their dispatch ordering). `Schedule` and the new `Reschedule` now share `enqueue_scheduled`, which removes any existing queued entry for the id before inserting -- without this, a row update or reschedule racing an already-queued entry would leak an orphaned `DelayQueue` entry and fire a duplicate dispatch. Consequence: scheduled procedures now run concurrently with scheduled reducers and with each other rather than strictly one-at-a-time. Transactional correctness is still enforced by the datastore's serializable isolation; only dispatch ordering relaxes. Concurrent execution remains bounded by the procedure instance pool. Co-Authored-By: Claude Opus 4.8 --- crates/core/src/host/scheduler.rs | 103 ++++++++++++++++++++++++------ 1 file changed, 85 insertions(+), 18 deletions(-) diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 45547ab16ed..192ea807a97 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -63,6 +63,14 @@ enum SchedulerMessage { function_name: String, args: FunctionArgs, }, + /// Re-enqueue an interval procedure after its spawned run finished. The spawned task + /// can't touch the actor's queue, so it sends the reschedule back through this message. + Reschedule { + id: ScheduledFunctionId, + function_name: Arc, + at: Timestamp, + real_at: Instant, + }, } #[derive(Clone)] @@ -72,13 +80,15 @@ pub struct Scheduler { pub struct SchedulerStarter { rx: mpsc::UnboundedReceiver>, + /// Sender clone for the [`SchedulerActor`], so spawned procedure tasks can reschedule. + tx: mpsc::UnboundedSender>, db: Arc, } impl Scheduler { pub fn open(db: Arc) -> (Self, SchedulerStarter) { let (tx, rx) = mpsc::unbounded_channel(); - (Scheduler { tx }, SchedulerStarter { rx, db }) + (Scheduler { tx: tx.clone() }, SchedulerStarter { rx, tx, db }) } } @@ -149,6 +159,7 @@ impl SchedulerStarter { tokio::spawn( SchedulerActor { rx: self.rx, + tx: self.tx, queue, key_map, module_host: module_host.downgrade(), @@ -271,6 +282,8 @@ impl Scheduler { struct SchedulerActor { rx: mpsc::UnboundedReceiver>, + /// Used by spawned procedure tasks to re-enqueue interval reschedules. + tx: mpsc::UnboundedSender>, queue: DelayQueue, key_map: FxHashMap, module_host: WeakModuleHost, @@ -348,19 +361,7 @@ impl SchedulerActor { effective_at, real_at, } => { - // Incase of row update, remove the existing entry from queue first - if let Some(key) = self.key_map.get(&id) { - self.queue.remove(key); - } - let key = self.queue.insert_at( - QueueItem::Id { - id, - function_name, - at: effective_at, - }, - real_at, - ); - self.key_map.insert(id, key); + self.enqueue_scheduled(id, function_name, effective_at, real_at); } SchedulerMessage::ScheduleImmediate { function_name, args } => { self.queue.insert( @@ -368,9 +369,40 @@ impl SchedulerActor { Duration::ZERO, ); } + SchedulerMessage::Reschedule { + id, + function_name, + at, + real_at, + } => { + self.enqueue_scheduled(id, function_name, at, real_at); + } } } + /// Insert (or replace) the queued entry for `id`, used by both `Schedule` and `Reschedule`. + fn enqueue_scheduled( + &mut self, + id: ScheduledFunctionId, + function_name: Arc, + at: Timestamp, + real_at: Instant, + ) { + // Incase of row update, remove the existing entry from queue first + if let Some(key) = self.key_map.get(&id) { + self.queue.remove(key); + } + let key = self.queue.insert_at( + QueueItem::Id { + id, + function_name, + at, + }, + real_at, + ); + self.key_map.insert(id, key); + } + async fn handle_queued(&mut self, id: Expired) { let item = id.into_inner(); let id = match &item { @@ -386,11 +418,46 @@ impl SchedulerActor { }; let params = ScheduledFunctionParams(item.clone()); - let result = match params.kind(module_host.info()) { - ScheduledFunctionKind::Procedure => module_host.call_scheduled_procedure(params).await, - ScheduledFunctionKind::Reducer => module_host.call_scheduled_reducer(params).await, - }; + match params.kind(module_host.info()) { + // Procedures can run for a long time (e.g. `sleep_until`) and execute on their own + // instance pool, so awaiting them inline here would starve every other due function. + // Spawn instead and keep draining the queue; interval reschedules come back via + // `SchedulerMessage::Reschedule`. (Scheduled procedures thus run concurrently rather + // than one-at-a-time; the datastore's serializable isolation still applies.) + ScheduledFunctionKind::Procedure => { + let tx = self.tx.clone(); + tokio::spawn(async move { + let result = module_host.call_scheduled_procedure(params).await; + if let Ok(CallScheduledFunctionResult { + reschedule: Some(Reschedule { at_ts, at_real }), + }) = result + { + if let QueueItem::Id { id, function_name, .. } = item { + let _ = tx.send(MsgOrExit::Msg(SchedulerMessage::Reschedule { + id, + function_name, + at: at_ts, + real_at: at_real, + })); + } + } + }); + } + // Reducers can't yield and run on the main executor, so await inline to keep order. + ScheduledFunctionKind::Reducer => { + let result = module_host.call_scheduled_reducer(params).await; + self.handle_result(item, result); + } + } + } + /// Handle the result of an inline (reducer) scheduled call, re-enqueueing it if it + /// is an interval schedule. + fn handle_result( + &mut self, + item: QueueItem, + result: Result, + ) { match result { // If the module already exited, leave the `ScheduledFunction` in // the database for when the module restarts.