diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index ca11c01c4af..3bf1921e8a8 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -1,14 +1,13 @@ use std::{ io, num::NonZeroUsize, - path::PathBuf, sync::{ atomic::{AtomicU64, Ordering::Relaxed}, - Arc, + Arc, Mutex, }, }; -use futures::{FutureExt as _, TryFutureExt as _}; +use futures::FutureExt as _; use itertools::Itertools as _; use log::{info, trace, warn}; use scopeguard::ScopeGuard; @@ -17,8 +16,8 @@ use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile}; use spacetimedb_paths::server::ReplicaDir; use thiserror::Error; use tokio::{ - sync::{futures::OwnedNotified, mpsc, oneshot, watch, Notify}, - task::{spawn_blocking, AbortHandle}, + sync::watch, + task::{spawn_blocking, JoinHandle}, }; use tracing::{instrument, Span}; @@ -73,8 +72,6 @@ pub enum OpenError { Commitlog(#[from] io::Error), } -type ShutdownReply = oneshot::Sender; - /// [`Durability`] implementation backed by a [`Commitlog`] on local storage. /// /// The commitlog is constrained to store the canonical [`Txdata`] payload, @@ -104,10 +101,9 @@ pub struct Local { /// This is mainly for observability purposes, and can thus be updated with /// relaxed memory ordering. queue_depth: Arc, - /// Channel to request the actor to exit. - shutdown: mpsc::Sender, - /// [AbortHandle] to force cancellation of the [Actor]. - abort: AbortHandle, + /// [JoinHandle] for the actor task. Contains `None` if already cancelled + /// (via [Durability::close]). + actor: Mutex>>, } impl Local { @@ -129,7 +125,7 @@ impl Local { // We could just place a lock on the commitlog directory, // yet for backwards-compatibility, we keep using the `db.lock` file. - let lock = Lock::create(replica_dir.0.join("db.lock"))?; + let lock = LockedFile::lock(replica_dir.0.join("db.lock"))?; let clog = Arc::new(Commitlog::open( replica_dir.commit_log(), @@ -140,31 +136,27 @@ impl Local { let (queue, txdata_rx) = async_channel::bounded(queue_capacity); let queue_depth = Arc::new(AtomicU64::new(0)); let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset()); - let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - let abort = rt - .spawn( - Actor { - clog: clog.clone(), + let actor = rt.spawn( + Actor { + clog: clog.clone(), - durable_offset: durable_tx, - queue_depth: queue_depth.clone(), + durable_offset: durable_tx, + queue_depth: queue_depth.clone(), - batch_capacity: opts.batch_capacity, + batch_capacity: opts.batch_capacity, - lock, - } - .run(txdata_rx, shutdown_rx), - ) - .abort_handle(); + lock, + } + .run(txdata_rx), + ); Ok(Self { clog, durable_offset: durable_rx, queue, - shutdown: shutdown_tx, queue_depth, - abort, + actor: Mutex::new(Some(actor)), }) } @@ -211,16 +203,12 @@ struct Actor { batch_capacity: NonZeroUsize, #[allow(unused)] - lock: Lock, + lock: LockedFile, } impl Actor { #[instrument(name = "durability::local::actor", skip_all)] - async fn run( - self, - transactions_rx: async_channel::Receiver>>, - mut shutdown_rx: mpsc::Receiver>, - ) { + async fn run(self, transactions_rx: async_channel::Receiver>>) { info!("starting durability actor"); let mut tx_buf = Vec::with_capacity(self.batch_capacity.get()); @@ -229,50 +217,37 @@ impl Actor { let mut sync_on_exit = true; loop { - tokio::select! { - // Biased towards the shutdown channel, - // so that we stop accepting new data promptly after - // `Durability::close` was called. - biased; - - Some(reply) = shutdown_rx.recv() => { - transactions_rx.close(); - let _ = reply.send(self.lock.notified()); - }, - - // Pop as many elements from the channel as possible, - // potentially requiring the `tx_buf` to allocate additional - // capacity. - // We'll reclaim capacity in excess of `self.batch_size` below. - n = recv_many(&transactions_rx, &mut tx_buf, usize::MAX) => { - if n == 0 { - break; - } - if tx_buf.is_empty() { - continue; - } - - let clog = self.clog.clone(); - let ready_len = tx_buf.len(); - self.queue_depth.fetch_sub(ready_len as u64, Relaxed); - tx_buf = spawn_blocking(move || -> io::Result>>> { - for tx in tx_buf.drain(..) { - clog.commit([tx.into_transaction()])?; - } - Ok(tx_buf) - }) - .await - .expect("commitlog write panicked") - .expect("commitlog write failed"); - if self.flush_and_sync().await.is_err() { - sync_on_exit = false; - break; - } - // Reclaim burst capacity. - if n < self.batch_capacity.get() { - tx_buf.shrink_to(self.batch_capacity.get()); - } - }, + // Pop as many elements from the channel as possible, + // potentially requiring the `tx_buf` to allocate additional + // capacity. + // We'll reclaim capacity in excess of `self.batch_size` below. + let n = recv_many(&transactions_rx, &mut tx_buf, usize::MAX).await; + if n == 0 { + break; + } + if tx_buf.is_empty() { + continue; + } + + let clog = self.clog.clone(); + let ready_len = tx_buf.len(); + self.queue_depth.fetch_sub(ready_len as u64, Relaxed); + tx_buf = spawn_blocking(move || -> io::Result>>> { + for tx in tx_buf.drain(..) { + clog.commit([tx.into_transaction()])?; + } + Ok(tx_buf) + }) + .await + .expect("commitlog write panicked") + .expect("commitlog write failed"); + if self.flush_and_sync().await.is_err() { + sync_on_exit = false; + break; + } + // Reclaim burst capacity. + if n < self.batch_capacity.get() { + tx_buf.shrink_to(self.batch_capacity.get()); } } @@ -312,34 +287,6 @@ impl Actor { } } -struct Lock { - file: Option, - notify_on_drop: Arc, -} - -impl Lock { - pub fn create(path: PathBuf) -> Result { - let file = LockedFile::lock(path).map(Some)?; - let notify_on_drop = Arc::new(Notify::new()); - - Ok(Self { file, notify_on_drop }) - } - - pub fn notified(&self) -> OwnedNotified { - self.notify_on_drop.clone().notified_owned() - } -} - -impl Drop for Lock { - fn drop(&mut self) { - // Ensure the file lock is dropped before notifying. - if let Some(file) = self.file.take() { - drop(file); - } - self.notify_on_drop.notify_waiters(); - } -} - impl Durability for Local { type TxData = Txdata; @@ -356,26 +303,27 @@ impl Durability for Local { info!("close local durability"); let durable_offset = self.durable_tx_offset(); - let shutdown = self.shutdown.clone(); + let maybe_actor = self.actor.lock().unwrap().take(); // Abort actor if shutdown future is dropped. - let abort = scopeguard::guard(self.abort.clone(), |actor| { - warn!("close future dropped, aborting durability actor"); - actor.abort(); - }); - + let abort = scopeguard::guard( + maybe_actor.as_ref().map(|join_handle| join_handle.abort_handle()), + |maybe_abort_handle| { + if let Some(abort_handle) = maybe_abort_handle { + warn!("close future dropped, aborting durability actor"); + abort_handle.abort(); + } + }, + ); + self.queue.close(); async move { - let (done_tx, done_rx) = oneshot::channel(); - // Ignore channel errors - those just mean the actor is already gone. - let _ = shutdown - .send(done_tx) - .map_err(drop) - .and_then(|()| done_rx.map_err(drop)) - .and_then(|done| async move { - done.await; - Ok(()) - }) - .await; - // Don't abort if we completed normally. + if let Some(actor) = maybe_actor + && let Err(e) = actor.await + { + // Will print "durability actor: task was cancelled" + // or "durability actor: task panicked [...]" + warn!("durability actor: {e}"); + } + // Don't abort if the actor completed. let _ = ScopeGuard::into_inner(abort); durable_offset.last_seen()