Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,54 @@ pub async fn ack_commit_2pc<S: ControlStateDelegate + NodeDelegate>(
Ok(StatusCode::OK)
}

/// 2PC prepared-to-persist endpoint (pipelined 2PC Round 2).
///
/// Called by participant B after its PREPARE_PERSIST is durable, to notify
/// coordinator A that B is ready for COMMIT_PERSIST.
///
/// `POST /v1/database/:name_or_identity/2pc/prepared-to-persist/:prepare_id`
pub async fn prepared_to_persist_2pc<S: ControlStateDelegate + NodeDelegate>(
    State(worker_ctx): State<S>,
    Extension(_auth): Extension<SpacetimeAuth>,
    Path(TwoPcParams {
        name_or_identity,
        prepare_id,
    }): Path<TwoPcParams>,
) -> axum::response::Result<impl IntoResponse> {
    let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;

    // An unknown/expired prepare id maps to 404; the coordinator treats that
    // as "retry or abort", so a warning (not an error) is the right log level.
    if let Err(e) = module.signal_persist_prepared(&prepare_id) {
        log::warn!("2PC prepared-to-persist: {e}");
        return Err((StatusCode::NOT_FOUND, e).into_response().into());
    }

    Ok(StatusCode::OK)
}

/// 2PC commit-persist endpoint (pipelined 2PC Round 2).
///
/// Called by coordinator A to tell participant B that A's COMMIT_PERSIST is durable
/// and B should finalize (delete PREPARE marker, flush deferred transactions).
///
/// `POST /v1/database/:name_or_identity/2pc/commit-persist/:prepare_id`
pub async fn commit_persist_2pc<S: ControlStateDelegate + NodeDelegate>(
    State(worker_ctx): State<S>,
    Extension(_auth): Extension<SpacetimeAuth>,
    Path(TwoPcParams {
        name_or_identity,
        prepare_id,
    }): Path<TwoPcParams>,
) -> axum::response::Result<impl IntoResponse> {
    let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;

    match module.commit_persist_prepared(&prepare_id) {
        // Failure here means the participant could not finalize; logged at
        // error level because the coordinator's COMMIT_PERSIST is already durable.
        Err(e) => {
            log::error!("2PC commit-persist failed: {e}");
            Err((StatusCode::NOT_FOUND, e).into_response().into())
        }
        Ok(_) => Ok(StatusCode::OK),
    }
}

/// Encode a reducer return value as an HTTP response.
///
/// If the outcome is an error, return a raw string with `application/text`. Ignore `want_bsatn` in this case.
Expand Down Expand Up @@ -1447,6 +1495,10 @@ pub struct DatabaseRoutes<S> {
pub status_2pc_get: MethodRouter<S>,
/// POST: /database/:name_or_identity/2pc/ack-commit/:prepare_id
pub ack_commit_2pc_post: MethodRouter<S>,
/// POST: /database/:name_or_identity/2pc/prepared-to-persist/:prepare_id
pub prepared_to_persist_2pc_post: MethodRouter<S>,
/// POST: /database/:name_or_identity/2pc/commit-persist/:prepare_id
pub commit_persist_2pc_post: MethodRouter<S>,
}

impl<S> Default for DatabaseRoutes<S>
Expand Down Expand Up @@ -1477,6 +1529,8 @@ where
abort_2pc_post: post(abort_2pc::<S>),
status_2pc_get: get(status_2pc::<S>),
ack_commit_2pc_post: post(ack_commit_2pc::<S>),
prepared_to_persist_2pc_post: post(prepared_to_persist_2pc::<S>),
commit_persist_2pc_post: post(commit_persist_2pc::<S>),
}
}
}
Expand Down Expand Up @@ -1506,7 +1560,12 @@ where
.route("/2pc/commit/:prepare_id", self.commit_2pc_post)
.route("/2pc/abort/:prepare_id", self.abort_2pc_post)
.route("/2pc/status/:prepare_id", self.status_2pc_get)
.route("/2pc/ack-commit/:prepare_id", self.ack_commit_2pc_post);
.route("/2pc/ack-commit/:prepare_id", self.ack_commit_2pc_post)
.route(
"/2pc/prepared-to-persist/:prepare_id",
self.prepared_to_persist_2pc_post,
)
.route("/2pc/commit-persist/:prepare_id", self.commit_persist_2pc_post);

axum::Router::new()
.route("/", self.root_post)
Expand Down
135 changes: 128 additions & 7 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,20 @@ pub const ONLY_MODULE_VERSION: &str = "0.0.1";
/// for each entry in [`ConnectedClients`].
pub type ConnectedClients = HashSet<(Identity, ConnectionId)>;

/// Durability barrier for pipelined 2PC.
///
/// Supports multiple concurrent 2PC transactions. Each active barrier is
/// identified by its tx_offset. Transactions above the *minimum* active
/// barrier offset are deferred. When a barrier is cleared, pending
/// transactions up to the new minimum (or all, if no barriers remain)
/// are flushed to the durability worker.
struct DurabilityBarrier {
    /// Active barrier offsets (one per in-flight 2PC transaction).
    /// The effective barrier is the minimum element; `BTreeSet` makes it
    /// available cheaply via `first()`.
    active: std::collections::BTreeSet<u64>,
    /// Transactions deferred by the barrier, each paired with the reducer
    /// context (if any) needed to replay the durability request later.
    pending: Vec<(Option<ReducerContext>, Arc<TxData>)>,
}

pub struct RelationalDB {
database_identity: Identity,
owner_identity: Identity,
Expand All @@ -112,6 +126,10 @@ pub struct RelationalDB {

/// An async queue for recording transaction metrics off the main thread
metrics_recorder_queue: Option<MetricsRecorderQueue>,

/// Pipelined 2PC durability barrier.
/// When set, transactions past the barrier offset are deferred (not sent to disk).
durability_barrier: std::sync::Mutex<Option<DurabilityBarrier>>,
}

/// Perform a snapshot every `SNAPSHOT_FREQUENCY` transactions.
Expand Down Expand Up @@ -176,6 +194,7 @@ impl RelationalDB {

workload_type_to_exec_counters,
metrics_recorder_queue,
durability_barrier: std::sync::Mutex::new(None),
}
}

Expand Down Expand Up @@ -464,6 +483,14 @@ impl RelationalDB {
self.with_auto_commit(Workload::Internal, |tx| tx.scan_st_2pc_state().map_err(DBError::from))
}

/// The offset that will be assigned to the next committed transaction.
///
/// Safe to call while holding the write lock (MutTxId) -- the offset won't change
/// until the write lock is released via commit.
pub fn next_tx_offset(&self) -> u64 {
    // Thin delegation to the committed state; performs no locking itself.
    self.inner.next_tx_offset()
}

/// Read any 2PC coordinator log entries that have not yet been acknowledged by their
/// participants. Used on coordinator crash-recovery to retransmit COMMIT decisions.
pub fn pending_2pc_coordinator_commits(
Expand Down Expand Up @@ -841,9 +868,7 @@ impl RelationalDB {
self.maybe_do_snapshot(&tx_data);

let tx_data = Arc::new(tx_data);
if let Some(durability) = &self.durability {
durability.request_durability(reducer_context, &tx_data);
}
self.request_durability_maybe_barrier(reducer_context, &tx_data);

Ok(Some((tx_offset, tx_data, tx_metrics, reducer)))
}
Expand All @@ -857,9 +882,7 @@ impl RelationalDB {
self.maybe_do_snapshot(&tx_data);

let tx_data = Arc::new(tx_data);
if let Some(durability) = &self.durability {
durability.request_durability(tx.ctx.reducer_context().cloned(), &tx_data);
}
self.request_durability_maybe_barrier(tx.ctx.reducer_context().cloned(), &tx_data);

(tx_data, tx_metrics, tx)
}
Expand All @@ -869,8 +892,106 @@ impl RelationalDB {
/// Used by the 2PC participant path to make the `st_2pc_state` PREPARE marker durable
/// while the main write lock is still held (i.e. without going through a full commit).
pub fn request_durability_for_tx_data(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
    // Same path as ordinary commits: respects any active durability barrier.
    self.request_durability_maybe_barrier(reducer_context, tx_data);
}

/// Send a tx to the durability worker, unless a durability barrier is active
/// and the tx's offset exceeds the minimum active barrier. In that case, defer the tx.
fn request_durability_maybe_barrier(
&self,
reducer_context: Option<ReducerContext>,
tx_data: &Arc<TxData>,
) {
let Some(durability) = &self.durability else {
return;
};

let mut barrier = self.durability_barrier.lock().unwrap();
if let Some(ref mut b) = *barrier {
if let Some(&min_barrier) = b.active.first() {
if let Some(offset) = tx_data.tx_offset() {
if offset > min_barrier {
// Past the lowest active barrier: defer.
b.pending.push((reducer_context, tx_data.clone()));
return;
}
}
}
}
// At or before the barrier (or no barrier): normal path.
durability.request_durability(reducer_context, tx_data);
}

/// Set a durability barrier at `barrier_offset`.
///
/// Transactions at this offset pass through to the durability worker normally.
/// Transactions with higher offsets are deferred until all barriers are cleared.
/// Multiple concurrent barriers are supported; the effective barrier is the minimum.
///
/// Call while holding the database write lock to prevent races.
pub fn set_durability_barrier(&self, barrier_offset: u64) {
    let mut guard = self.durability_barrier.lock().unwrap();
    match guard.as_mut() {
        // Already tracking barriers: just record one more offset.
        Some(b) => {
            b.active.insert(barrier_offset);
        }
        // First barrier: initialize the tracking state.
        None => {
            *guard = Some(DurabilityBarrier {
                active: std::collections::BTreeSet::from([barrier_offset]),
                pending: Vec::new(),
            });
        }
    }
}

/// Abort a durability barrier, discarding ALL deferred transactions.
///
/// Used when Round 2 of pipelined 2PC aborts. All transactions behind the
/// barrier are tainted (they may have read data from the aborted 2PC tx)
/// and must not reach disk. On restart, the in-memory state is lost and
/// the pipeline is effectively flushed.
pub fn abort_durability_barrier(&self, barrier_offset: u64) {
let mut barrier = self.durability_barrier.lock().unwrap();
let Some(ref mut b) = *barrier else {
return;
};
b.active.remove(&barrier_offset);
if b.active.is_empty() {
// Drop all pending transactions -- they are tainted.
*barrier = None;
}
// If other barriers remain, the pending list stays (those transactions
// are still blocked by the other barriers and will be resolved by them).
}

/// Clear one durability barrier, flushing deferred transactions that are now
/// below the new minimum barrier (or all if no barriers remain).
pub fn clear_durability_barrier(&self, barrier_offset: u64) {
let to_flush = {
let mut barrier = self.durability_barrier.lock().unwrap();
let Some(ref mut b) = *barrier else {
return;
};
b.active.remove(&barrier_offset);
if b.active.is_empty() {
// No more barriers: flush everything.
let pending = std::mem::take(&mut b.pending);
*barrier = None;
pending
} else {
// Flush pending transactions up to the new minimum barrier.
let &new_min = b.active.first().unwrap();
let mut flush = Vec::new();
b.pending.retain(|(ctx, td)| {
if let Some(offset) = td.tx_offset() {
if offset <= new_min {
flush.push((ctx.clone(), td.clone()));
return false;
}
}
true
});
flush
}
};
if let Some(durability) = &self.durability {
durability.request_durability(reducer_context, tx_data);
for (reducer_context, tx_data) in to_flush {
durability.request_durability(reducer_context, &tx_data);
}
}
}

Expand Down
13 changes: 12 additions & 1 deletion crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,8 @@ async fn make_replica_ctx(
call_reducer_client,
call_reducer_router,
call_reducer_auth_token,
prepared_txs: crate::host::prepared_tx::PreparedTransactions::new(),
on_panic: std::sync::Arc::new(std::sync::OnceLock::new()),
})
}

Expand Down Expand Up @@ -830,14 +832,23 @@ impl<F: Fn() + Send + Sync + 'static> ModuleLauncher<F> {
)
.await
.map(Arc::new)?;

// Share the unregister function with the replica context so that async tasks
// (e.g., 2PC Round 2) can trigger module restart without holding the executor thread.
let on_panic = std::sync::Arc::new(self.on_panic);
let _ = replica_ctx.on_panic.set(Box::new({
let op = on_panic.clone();
move || op()
}));

let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db().clone());
let (program, module_host) = make_module_host(
self.runtimes.clone(),
replica_ctx.clone(),
scheduler.clone(),
self.program,
self.energy_monitor,
self.on_panic,
move || on_panic(),
self.core,
)
.await?;
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1509,6 +1509,8 @@ mod test {
call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()),
call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")),
call_reducer_auth_token: None,
prepared_txs: crate::host::prepared_tx::PreparedTransactions::new(),
on_panic: std::sync::Arc::new(std::sync::OnceLock::new()),
},
runtime,
))
Expand Down
Loading
Loading