Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 71 additions & 123 deletions crates/durability/src/imp/local.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use std::{
io,
num::NonZeroUsize,
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering::Relaxed},
Arc,
Arc, Mutex,
},
};

use futures::{FutureExt as _, TryFutureExt as _};
use futures::FutureExt as _;
use itertools::Itertools as _;
use log::{info, trace, warn};
use scopeguard::ScopeGuard;
Expand All @@ -17,8 +16,8 @@ use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile};
use spacetimedb_paths::server::ReplicaDir;
use thiserror::Error;
use tokio::{
sync::{futures::OwnedNotified, mpsc, oneshot, watch, Notify},
task::{spawn_blocking, AbortHandle},
sync::watch,
task::{spawn_blocking, JoinHandle},
};
use tracing::{instrument, Span};

Expand Down Expand Up @@ -73,8 +72,6 @@ pub enum OpenError {
Commitlog(#[from] io::Error),
}

/// Reply half of a shutdown request sent to the actor.
///
/// On receiving a request, the actor closes its transaction channel and
/// answers with an [`OwnedNotified`] obtained from its lock, which the
/// requester can await.
type ShutdownReply = oneshot::Sender<OwnedNotified>;

/// [`Durability`] implementation backed by a [`Commitlog`] on local storage.
///
/// The commitlog is constrained to store the canonical [`Txdata`] payload,
Expand Down Expand Up @@ -104,10 +101,9 @@ pub struct Local<T> {
/// This is mainly for observability purposes, and can thus be updated with
/// relaxed memory ordering.
queue_depth: Arc<AtomicU64>,
/// Channel to request the actor to exit.
shutdown: mpsc::Sender<ShutdownReply>,
/// [AbortHandle] to force cancellation of the [Actor].
abort: AbortHandle,
/// [JoinHandle] for the actor task. Contains `None` if already cancelled
/// (via [Durability::close]).
actor: Mutex<Option<JoinHandle<()>>>,
}

impl<T: Encode + Send + Sync + 'static> Local<T> {
Expand All @@ -129,7 +125,7 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {

// We could just place a lock on the commitlog directory,
// yet for backwards-compatibility, we keep using the `db.lock` file.
let lock = Lock::create(replica_dir.0.join("db.lock"))?;
let lock = LockedFile::lock(replica_dir.0.join("db.lock"))?;

let clog = Arc::new(Commitlog::open(
replica_dir.commit_log(),
Expand All @@ -140,31 +136,27 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
let (queue, txdata_rx) = async_channel::bounded(queue_capacity);
let queue_depth = Arc::new(AtomicU64::new(0));
let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset());
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);

let abort = rt
.spawn(
Actor {
clog: clog.clone(),
let actor = rt.spawn(
Actor {
clog: clog.clone(),

durable_offset: durable_tx,
queue_depth: queue_depth.clone(),
durable_offset: durable_tx,
queue_depth: queue_depth.clone(),

batch_capacity: opts.batch_capacity,
batch_capacity: opts.batch_capacity,

lock,
}
.run(txdata_rx, shutdown_rx),
)
.abort_handle();
lock,
}
.run(txdata_rx),
);

Ok(Self {
clog,
durable_offset: durable_rx,
queue,
shutdown: shutdown_tx,
queue_depth,
abort,
actor: Mutex::new(Some(actor)),
})
}

Expand Down Expand Up @@ -211,16 +203,12 @@ struct Actor<T> {
batch_capacity: NonZeroUsize,

#[allow(unused)]
lock: Lock,
lock: LockedFile,
}

impl<T: Encode + Send + Sync + 'static> Actor<T> {
#[instrument(name = "durability::local::actor", skip_all)]
async fn run(
self,
transactions_rx: async_channel::Receiver<PreparedTx<Txdata<T>>>,
mut shutdown_rx: mpsc::Receiver<oneshot::Sender<OwnedNotified>>,
) {
async fn run(self, transactions_rx: async_channel::Receiver<PreparedTx<Txdata<T>>>) {
info!("starting durability actor");

let mut tx_buf = Vec::with_capacity(self.batch_capacity.get());
Expand All @@ -229,50 +217,37 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
let mut sync_on_exit = true;

loop {
tokio::select! {
// Biased towards the shutdown channel,
// so that we stop accepting new data promptly after
// `Durability::close` was called.
biased;

Some(reply) = shutdown_rx.recv() => {
transactions_rx.close();
let _ = reply.send(self.lock.notified());
},

// Pop as many elements from the channel as possible,
// potentially requiring the `tx_buf` to allocate additional
// capacity.
// We'll reclaim capacity in excess of `self.batch_capacity` below.
n = recv_many(&transactions_rx, &mut tx_buf, usize::MAX) => {
if n == 0 {
break;
}
if tx_buf.is_empty() {
continue;
}

let clog = self.clog.clone();
let ready_len = tx_buf.len();
self.queue_depth.fetch_sub(ready_len as u64, Relaxed);
tx_buf = spawn_blocking(move || -> io::Result<Vec<PreparedTx<Txdata<T>>>> {
for tx in tx_buf.drain(..) {
clog.commit([tx.into_transaction()])?;
}
Ok(tx_buf)
})
.await
.expect("commitlog write panicked")
.expect("commitlog write failed");
if self.flush_and_sync().await.is_err() {
sync_on_exit = false;
break;
}
// Reclaim burst capacity.
if n < self.batch_capacity.get() {
tx_buf.shrink_to(self.batch_capacity.get());
}
},
// Pop as many elements from the channel as possible,
// potentially requiring the `tx_buf` to allocate additional
// capacity.
// We'll reclaim capacity in excess of `self.batch_capacity` below.
let n = recv_many(&transactions_rx, &mut tx_buf, usize::MAX).await;
if n == 0 {
break;
}
if tx_buf.is_empty() {
continue;
}

let clog = self.clog.clone();
let ready_len = tx_buf.len();
self.queue_depth.fetch_sub(ready_len as u64, Relaxed);
tx_buf = spawn_blocking(move || -> io::Result<Vec<PreparedTx<Txdata<T>>>> {
for tx in tx_buf.drain(..) {
clog.commit([tx.into_transaction()])?;
}
Ok(tx_buf)
})
.await
.expect("commitlog write panicked")
.expect("commitlog write failed");
if self.flush_and_sync().await.is_err() {
sync_on_exit = false;
break;
}
// Reclaim burst capacity.
if n < self.batch_capacity.get() {
tx_buf.shrink_to(self.batch_capacity.get());
}
}

Expand Down Expand Up @@ -312,34 +287,6 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
}
}

/// A [`LockedFile`] paired with a [`Notify`] that is signalled when the
/// lock is dropped.
struct Lock {
// The held file lock. Wrapped in `Option` so that `Drop` can take it out
// and release it before waiters are notified.
file: Option<LockedFile>,
// Signalled via `notify_waiters` in `Drop`, after `file` has been dropped.
notify_on_drop: Arc<Notify>,
}

impl Lock {
pub fn create(path: PathBuf) -> Result<Self, LockError> {
let file = LockedFile::lock(path).map(Some)?;
let notify_on_drop = Arc::new(Notify::new());

Ok(Self { file, notify_on_drop })
}

pub fn notified(&self) -> OwnedNotified {
self.notify_on_drop.clone().notified_owned()
}
}

impl Drop for Lock {
    fn drop(&mut self) {
        // Release the file lock first, so that it is already gone by the
        // time any waiter is woken up.
        drop(self.file.take());
        self.notify_on_drop.notify_waiters();
    }
}

impl<T: Send + Sync + 'static> Durability for Local<T> {
type TxData = Txdata<T>;

Expand All @@ -356,26 +303,27 @@ impl<T: Send + Sync + 'static> Durability for Local<T> {
info!("close local durability");

let durable_offset = self.durable_tx_offset();
let shutdown = self.shutdown.clone();
let maybe_actor = self.actor.lock().unwrap().take();
Comment thread
kim marked this conversation as resolved.
// Abort actor if shutdown future is dropped.
let abort = scopeguard::guard(self.abort.clone(), |actor| {
warn!("close future dropped, aborting durability actor");
actor.abort();
});

let abort = scopeguard::guard(
maybe_actor.as_ref().map(|join_handle| join_handle.abort_handle()),
|maybe_abort_handle| {
if let Some(abort_handle) = maybe_abort_handle {
warn!("close future dropped, aborting durability actor");
abort_handle.abort();
}
},
);
self.queue.close();
async move {
let (done_tx, done_rx) = oneshot::channel();
// Ignore channel errors - those just mean the actor is already gone.
let _ = shutdown
.send(done_tx)
.map_err(drop)
.and_then(|()| done_rx.map_err(drop))
.and_then(|done| async move {
done.await;
Ok(())
})
.await;
// Don't abort if we completed normally.
if let Some(actor) = maybe_actor
&& let Err(e) = actor.await
{
// Will print "durability actor: task was cancelled"
// or "durability actor: task panicked [...]"
warn!("durability actor: {e}");
}
// Don't abort if the actor completed.
let _ = ScopeGuard::into_inner(abort);

durable_offset.last_seen()
Expand Down
Loading