From 107d108f65f51d901f76b645c714fba2f3a26d0e Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 15 Apr 2026 11:34:42 +0200 Subject: [PATCH] durability: Simplify shutdown We switched to using `async-channel` as the backing queue in 2b3aa5a, which allows to close the channel on the sender side without dropping the sender. This is unlike tokio channels, which require the sender to be dropped, or the receiver to request the channel to be closed. This allows to greatly simplify the async shutdown of a durability instance by simply awaiting the actor's `JoinHandle` instead of a complicated chain of notification primitives. --- crates/durability/src/imp/local.rs | 194 +++++++++++------------------ 1 file changed, 71 insertions(+), 123 deletions(-) diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index ca11c01c4af..3bf1921e8a8 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -1,14 +1,13 @@ use std::{ io, num::NonZeroUsize, - path::PathBuf, sync::{ atomic::{AtomicU64, Ordering::Relaxed}, - Arc, + Arc, Mutex, }, }; -use futures::{FutureExt as _, TryFutureExt as _}; +use futures::FutureExt as _; use itertools::Itertools as _; use log::{info, trace, warn}; use scopeguard::ScopeGuard; @@ -17,8 +16,8 @@ use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile}; use spacetimedb_paths::server::ReplicaDir; use thiserror::Error; use tokio::{ - sync::{futures::OwnedNotified, mpsc, oneshot, watch, Notify}, - task::{spawn_blocking, AbortHandle}, + sync::watch, + task::{spawn_blocking, JoinHandle}, }; use tracing::{instrument, Span}; @@ -73,8 +72,6 @@ pub enum OpenError { Commitlog(#[from] io::Error), } -type ShutdownReply = oneshot::Sender; - /// [`Durability`] implementation backed by a [`Commitlog`] on local storage. /// /// The commitlog is constrained to store the canonical [`Txdata`] payload, @@ -104,10 +101,9 @@ pub struct Local { /// This is mainly for observability purposes, and can thus be updated with /// relaxed memory ordering. queue_depth: Arc, - /// Channel to request the actor to exit. - shutdown: mpsc::Sender, - /// [AbortHandle] to force cancellation of the [Actor]. - abort: AbortHandle, + /// [JoinHandle] for the actor task. Contains `None` if already cancelled + /// (via [Durability::close]). + actor: Mutex>>, } impl Local { @@ -129,7 +125,7 @@ impl Local { // We could just place a lock on the commitlog directory, // yet for backwards-compatibility, we keep using the `db.lock` file. - let lock = Lock::create(replica_dir.0.join("db.lock"))?; + let lock = LockedFile::lock(replica_dir.0.join("db.lock"))?; let clog = Arc::new(Commitlog::open( replica_dir.commit_log(), @@ -140,31 +136,27 @@ impl Local { let (queue, txdata_rx) = async_channel::bounded(queue_capacity); let queue_depth = Arc::new(AtomicU64::new(0)); let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset()); - let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - let abort = rt - .spawn( - Actor { - clog: clog.clone(), + let actor = rt.spawn( + Actor { + clog: clog.clone(), - durable_offset: durable_tx, - queue_depth: queue_depth.clone(), + durable_offset: durable_tx, + queue_depth: queue_depth.clone(), - batch_capacity: opts.batch_capacity, + batch_capacity: opts.batch_capacity, - lock, - } - .run(txdata_rx, shutdown_rx), - ) - .abort_handle(); + lock, + } + .run(txdata_rx), + ); Ok(Self { clog, durable_offset: durable_rx, queue, - shutdown: shutdown_tx, queue_depth, - abort, + actor: Mutex::new(Some(actor)), }) } @@ -211,16 +203,12 @@ struct Actor { batch_capacity: NonZeroUsize, #[allow(unused)] - lock: Lock, + lock: LockedFile, } impl Actor { #[instrument(name = "durability::local::actor", skip_all)] - async fn run( - self, - transactions_rx: async_channel::Receiver>>, - mut shutdown_rx: mpsc::Receiver>, - ) { + async fn run(self, transactions_rx: async_channel::Receiver>>) { info!("starting durability actor"); let mut tx_buf = Vec::with_capacity(self.batch_capacity.get()); @@ -229,50 +217,37 @@ impl Actor { let mut sync_on_exit = true; loop { - tokio::select! { - // Biased towards the shutdown channel, - // so that we stop accepting new data promptly after - // `Durability::close` was called. - biased; - - Some(reply) = shutdown_rx.recv() => { - transactions_rx.close(); - let _ = reply.send(self.lock.notified()); - }, - - // Pop as many elements from the channel as possible, - // potentially requiring the `tx_buf` to allocate additional - // capacity. - // We'll reclaim capacity in excess of `self.batch_size` below. - n = recv_many(&transactions_rx, &mut tx_buf, usize::MAX) => { - if n == 0 { - break; - } - if tx_buf.is_empty() { - continue; - } - - let clog = self.clog.clone(); - let ready_len = tx_buf.len(); - self.queue_depth.fetch_sub(ready_len as u64, Relaxed); - tx_buf = spawn_blocking(move || -> io::Result>>> { - for tx in tx_buf.drain(..) { - clog.commit([tx.into_transaction()])?; - } - Ok(tx_buf) - }) - .await - .expect("commitlog write panicked") - .expect("commitlog write failed"); - if self.flush_and_sync().await.is_err() { - sync_on_exit = false; - break; - } - // Reclaim burst capacity. - if n < self.batch_capacity.get() { - tx_buf.shrink_to(self.batch_capacity.get()); - } - }, + // Pop as many elements from the channel as possible, + // potentially requiring the `tx_buf` to allocate additional + // capacity. + // We'll reclaim capacity in excess of `self.batch_size` below. + let n = recv_many(&transactions_rx, &mut tx_buf, usize::MAX).await; + if n == 0 { + break; + } + if tx_buf.is_empty() { + continue; + } + + let clog = self.clog.clone(); + let ready_len = tx_buf.len(); + self.queue_depth.fetch_sub(ready_len as u64, Relaxed); + tx_buf = spawn_blocking(move || -> io::Result>>> { + for tx in tx_buf.drain(..) { + clog.commit([tx.into_transaction()])?; + } + Ok(tx_buf) + }) + .await + .expect("commitlog write panicked") + .expect("commitlog write failed"); + if self.flush_and_sync().await.is_err() { + sync_on_exit = false; + break; + } + // Reclaim burst capacity. + if n < self.batch_capacity.get() { + tx_buf.shrink_to(self.batch_capacity.get()); } } @@ -312,34 +287,6 @@ impl Actor { } } -struct Lock { - file: Option, - notify_on_drop: Arc, -} - -impl Lock { - pub fn create(path: PathBuf) -> Result { - let file = LockedFile::lock(path).map(Some)?; - let notify_on_drop = Arc::new(Notify::new()); - - Ok(Self { file, notify_on_drop }) - } - - pub fn notified(&self) -> OwnedNotified { - self.notify_on_drop.clone().notified_owned() - } -} - -impl Drop for Lock { - fn drop(&mut self) { - // Ensure the file lock is dropped before notifying. - if let Some(file) = self.file.take() { - drop(file); - } - self.notify_on_drop.notify_waiters(); - } -} - impl Durability for Local { type TxData = Txdata; @@ -356,26 +303,27 @@ impl Durability for Local { info!("close local durability"); let durable_offset = self.durable_tx_offset(); - let shutdown = self.shutdown.clone(); + let maybe_actor = self.actor.lock().unwrap().take(); // Abort actor if shutdown future is dropped. - let abort = scopeguard::guard(self.abort.clone(), |actor| { - warn!("close future dropped, aborting durability actor"); - actor.abort(); - }); - + let abort = scopeguard::guard( + maybe_actor.as_ref().map(|join_handle| join_handle.abort_handle()), + |maybe_abort_handle| { + if let Some(abort_handle) = maybe_abort_handle { + warn!("close future dropped, aborting durability actor"); + abort_handle.abort(); + } + }, + ); + self.queue.close(); async move { - let (done_tx, done_rx) = oneshot::channel(); - // Ignore channel errors - those just mean the actor is already gone. - let _ = shutdown - .send(done_tx) - .map_err(drop) - .and_then(|()| done_rx.map_err(drop)) - .and_then(|done| async move { - done.await; - Ok(()) - }) - .await; - // Don't abort if we completed normally. + if let Some(actor) = maybe_actor + && let Err(e) = actor.await + { + // Will print "durability actor: task was cancelled" + // or "durability actor: task panicked [...]" + warn!("durability actor: {e}"); + } + // Don't abort if the actor completed. let _ = ScopeGuard::into_inner(abort); durable_offset.last_seen()