Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 85 additions & 18 deletions crates/core/src/host/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ enum SchedulerMessage {
function_name: String,
args: FunctionArgs,
},
/// Re-enqueue an interval procedure after its spawned run finished. The spawned task
/// can't touch the actor's queue, so it sends the reschedule back through this message.
Reschedule {
id: ScheduledFunctionId,
function_name: Arc<str>,
at: Timestamp,
real_at: Instant,
},
}

#[derive(Clone)]
Expand All @@ -72,13 +80,15 @@ pub struct Scheduler {

pub struct SchedulerStarter {
rx: mpsc::UnboundedReceiver<MsgOrExit<SchedulerMessage>>,
/// Sender clone for the [`SchedulerActor`], so spawned procedure tasks can reschedule.
tx: mpsc::UnboundedSender<MsgOrExit<SchedulerMessage>>,
db: Arc<RelationalDB>,
}

impl Scheduler {
pub fn open(db: Arc<RelationalDB>) -> (Self, SchedulerStarter) {
let (tx, rx) = mpsc::unbounded_channel();
(Scheduler { tx }, SchedulerStarter { rx, db })
(Scheduler { tx: tx.clone() }, SchedulerStarter { rx, tx, db })
}
}

Expand Down Expand Up @@ -149,6 +159,7 @@ impl SchedulerStarter {
tokio::spawn(
SchedulerActor {
rx: self.rx,
tx: self.tx,
queue,
key_map,
module_host: module_host.downgrade(),
Expand Down Expand Up @@ -271,6 +282,8 @@ impl Scheduler {

struct SchedulerActor {
rx: mpsc::UnboundedReceiver<MsgOrExit<SchedulerMessage>>,
/// Used by spawned procedure tasks to re-enqueue interval reschedules.
tx: mpsc::UnboundedSender<MsgOrExit<SchedulerMessage>>,
queue: DelayQueue<QueueItem>,
key_map: FxHashMap<ScheduledFunctionId, delay_queue::Key>,
module_host: WeakModuleHost,
Expand Down Expand Up @@ -348,29 +361,48 @@ impl SchedulerActor {
effective_at,
real_at,
} => {
// Incase of row update, remove the existing entry from queue first

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function was just encapsulated in enqueue_scheduled to be reused

if let Some(key) = self.key_map.get(&id) {
self.queue.remove(key);
}
let key = self.queue.insert_at(
QueueItem::Id {
id,
function_name,
at: effective_at,
},
real_at,
);
self.key_map.insert(id, key);
self.enqueue_scheduled(id, function_name, effective_at, real_at);
}
SchedulerMessage::ScheduleImmediate { function_name, args } => {
self.queue.insert(
QueueItem::VolatileNonatomicImmediate { function_name, args },
Duration::ZERO,
);
}
SchedulerMessage::Reschedule {
id,
function_name,
at,
real_at,
} => {
self.enqueue_scheduled(id, function_name, at, real_at);
}
}
}

/// Insert (or replace) the queued entry for `id`, used by both `Schedule` and `Reschedule`.
fn enqueue_scheduled(
&mut self,
id: ScheduledFunctionId,
function_name: Arc<str>,
at: Timestamp,
real_at: Instant,
) {
// Incase of row update, remove the existing entry from queue first
if let Some(key) = self.key_map.get(&id) {
self.queue.remove(key);
}
let key = self.queue.insert_at(
QueueItem::Id {
id,
function_name,
at,
},
real_at,
);
self.key_map.insert(id, key);
}

async fn handle_queued(&mut self, id: Expired<QueueItem>) {
let item = id.into_inner();
let id = match &item {
Expand All @@ -386,11 +418,46 @@ impl SchedulerActor {
};

let params = ScheduledFunctionParams(item.clone());
let result = match params.kind(module_host.info()) {
ScheduledFunctionKind::Procedure => module_host.call_scheduled_procedure(params).await,
ScheduledFunctionKind::Reducer => module_host.call_scheduled_reducer(params).await,
};
match params.kind(module_host.info()) {
// Procedures can run for a long time (e.g. `sleep_until`) and execute on their own
// instance pool, so awaiting them inline here would starve every other due function.
// Spawn instead and keep draining the queue; interval reschedules come back via
// `SchedulerMessage::Reschedule`. (Scheduled procedures thus run concurrently rather
// than one-at-a-time; the datastore's serializable isolation still applies.)
ScheduledFunctionKind::Procedure => {
let tx = self.tx.clone();
tokio::spawn(async move {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tokio::spawn runs on the host/control runtime, not the per-database executor — but it's only the await-coordinator. The actual wasm work is dispatched to the database's SingleThreadedExecutor inside call_scheduled_procedure → call_pooled → run_async_job. So procedure execution stays on the per-DB pool (bounded by the procedure-instance semaphore); this task just waits for the result and forwards the interval reschedule.

let result = module_host.call_scheduled_procedure(params).await;
if let Ok(CallScheduledFunctionResult {
reschedule: Some(Reschedule { at_ts, at_real }),
}) = result
{
if let QueueItem::Id { id, function_name, .. } = item {
let _ = tx.send(MsgOrExit::Msg(SchedulerMessage::Reschedule {
id,
function_name,
at: at_ts,
real_at: at_real,
}));
}
}
});
}
// Reducers can't yield and run on the main executor, so await inline to keep order.
ScheduledFunctionKind::Reducer => {
let result = module_host.call_scheduled_reducer(params).await;
self.handle_result(item, result);
}
}
}

/// Handle the result of an inline (reducer) scheduled call, re-enqueueing it if it
/// is an interval schedule.
fn handle_result(
&mut self,
item: QueueItem,
result: Result<CallScheduledFunctionResult, CallScheduledFunctionError>,
) {
match result {
// If the module already exited, leave the `ScheduledFunction` in
// the database for when the module restarts.
Expand Down