Skip to content

Commit

Permalink
Commit transactions with one write batch per checkpoint (#17194)
Browse files Browse the repository at this point in the history
  • Loading branch information
mystenmark committed Apr 18, 2024
1 parent b3bb3d5 commit 722a87a
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 66 deletions.
54 changes: 34 additions & 20 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ impl AuthorityStore {
#[instrument(level = "trace", skip_all)]
async fn acquire_read_locks_for_indirect_objects(
&self,
written: &WrittenObjects,
written: &[Object],
) -> Vec<RwLockGuard> {
// locking is required to avoid potential race conditions with the pruner
// potential race:
Expand All @@ -797,7 +797,7 @@ impl AuthorityStore {
// read locks are sufficient because ref count increments are safe,
// concurrent transaction executions produce independent ref count increments and don't corrupt the state
let digests = written
.values()
.iter()
.filter_map(|object| {
let StoreObjectPair(_, indirect_object) =
get_store_object_pair(object.clone(), self.indirect_objects_threshold);
Expand All @@ -815,7 +815,35 @@ impl AuthorityStore {
pub async fn write_transaction_outputs(
&self,
epoch_id: EpochId,
tx_outputs: Arc<TransactionOutputs>,
tx_outputs: &[Arc<TransactionOutputs>],
) -> SuiResult {
let mut written = Vec::with_capacity(tx_outputs.len());
for outputs in tx_outputs {
written.extend(outputs.written.values().cloned());
}

let _locks = self.acquire_read_locks_for_indirect_objects(&written).await;

let mut write_batch = self.perpetual_tables.transactions.batch();
for outputs in tx_outputs {
self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
}
// test crashing before writing the batch
fail_point_async!("crash");

write_batch.write()?;

// test crashing before notifying
fail_point_async!("crash");

Ok(())
}

fn write_one_transaction_outputs(
&self,
write_batch: &mut DBBatch,
epoch_id: EpochId,
tx_outputs: &TransactionOutputs,
) -> SuiResult {
let TransactionOutputs {
transaction,
Expand All @@ -828,12 +856,7 @@ impl AuthorityStore {
locks_to_delete,
new_locks_to_init,
..
} = &*tx_outputs;

let _locks = self.acquire_read_locks_for_indirect_objects(written).await;

// Extract the new state from the execution
let mut write_batch = self.perpetual_tables.transactions.batch();
} = tx_outputs;

// Store the certificate indexed by transaction digest
let transaction_digest = transaction.digest();
Expand Down Expand Up @@ -913,11 +936,11 @@ impl AuthorityStore {

write_batch.insert_batch(&self.perpetual_tables.events, events)?;

self.initialize_live_object_markers_impl(&mut write_batch, new_locks_to_init, false)?;
self.initialize_live_object_markers_impl(write_batch, new_locks_to_init, false)?;

// Note: deletes locks for received objects as well (but not for objects that were in
// `Receiving` arguments which were not received)
self.delete_live_object_markers(&mut write_batch, locks_to_delete)?;
self.delete_live_object_markers(write_batch, locks_to_delete)?;

write_batch
.insert_batch(
Expand All @@ -929,15 +952,6 @@ impl AuthorityStore {
[(transaction_digest, effects_digest)],
)?;

// test crashing before writing the batch
fail_point_async!("crash");

// Commit.
write_batch.write()?;

// test crashing before notifying
fail_point_async!("crash");

debug!(effects_digest = ?effects.digest(), "commit_certificate finished");

Ok(())
Expand Down
12 changes: 5 additions & 7 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,10 @@ impl CheckpointExecutor {
// Commit all transaction effects to disk
let cache_commit = self.state.get_cache_commit();
debug!(seq = ?checkpoint.sequence_number, "committing checkpoint transactions to disk");
for digest in all_tx_digests {
cache_commit
.commit_transaction_outputs(epoch_store.epoch(), digest)
.await
.expect("commit_transaction_outputs cannot fail");
}
cache_commit
.commit_transaction_outputs(epoch_store.epoch(), all_tx_digests)
.await
.expect("commit_transaction_outputs cannot fail");

if !checkpoint.is_last_checkpoint_of_epoch() {
self.bump_highest_executed_checkpoint(checkpoint);
Expand Down Expand Up @@ -563,7 +561,7 @@ impl CheckpointExecutor {

let cache_commit = self.state.get_cache_commit();
cache_commit
.commit_transaction_outputs(cur_epoch, &change_epoch_tx_digest)
.commit_transaction_outputs(cur_epoch, &[change_epoch_tx_digest])
.await
.expect("commit_transaction_outputs cannot fail");

Expand Down
10 changes: 5 additions & 5 deletions crates/sui-core/src/execution_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ impl ExecutionCacheMetrics {
pub type ExecutionCache = PassthroughCache;

pub trait ExecutionCacheCommit: Send + Sync {
/// Durably commit the transaction outputs of the given transaction to the database.
/// Durably commit the outputs of the given transactions to the database.
/// Will be called by CheckpointExecutor to ensure that transaction outputs are
/// written durably before marking a checkpoint as finalized.
fn commit_transaction_outputs(
&self,
fn commit_transaction_outputs<'a>(
&'a self,
epoch: EpochId,
digest: &TransactionDigest,
) -> BoxFuture<'_, SuiResult>;
digest: &'a [TransactionDigest],
) -> BoxFuture<'a, SuiResult>;
}

pub trait ExecutionCacheRead: Send + Sync {
Expand Down
10 changes: 5 additions & 5 deletions crates/sui-core/src/execution_cache/passthrough_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ impl ExecutionCacheWrite for PassthroughCache {
.check_owned_objects_are_live(&tx_outputs.locks_to_delete)?;

self.store
.write_transaction_outputs(epoch_id, tx_outputs)
.write_transaction_outputs(epoch_id, &[tx_outputs])
.await?;

self.executed_effects_digests_notify_read
Expand Down Expand Up @@ -360,11 +360,11 @@ impl AccumulatorStore for PassthroughCache {
}

impl ExecutionCacheCommit for PassthroughCache {
fn commit_transaction_outputs(
&self,
fn commit_transaction_outputs<'a>(
&'a self,
_epoch: EpochId,
_digest: &TransactionDigest,
) -> BoxFuture<'_, SuiResult> {
_digests: &'a [TransactionDigest],
) -> BoxFuture<'a, SuiResult> {
// Nothing needs to be done since they were already committed in write_transaction_outputs
async { Ok(()) }.boxed()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl Scenario {

// commit a transaction to the database
pub async fn commit(&mut self, tx: TransactionDigest) -> SuiResult {
let res = self.cache().commit_transaction_outputs(1, &tx).await;
let res = self.cache().commit_transaction_outputs(1, &[tx]).await;
self.count_action();
res
}
Expand Down Expand Up @@ -550,7 +550,7 @@ async fn test_committed() {
s.assert_live(&[1, 2]);
s.assert_dirty(&[1, 2]);
s.cache()
.commit_transaction_outputs(1, &tx)
.commit_transaction_outputs(1, &[tx])
.await
.expect("commit failed");
s.assert_not_dirty(&[1, 2]);
Expand Down
67 changes: 42 additions & 25 deletions crates/sui-core/src/execution_cache/writeback_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,33 +630,48 @@ impl WritebackCache {
async fn commit_transaction_outputs(
&self,
epoch: EpochId,
digest: TransactionDigest,
digests: &[TransactionDigest],
) -> SuiResult {
fail_point_async!("writeback-cache-commit");

let DashMapEntry::Occupied(occupied) = self.dirty.pending_transaction_writes.entry(digest)
else {
panic!("Attempt to commit unknown transaction {:?}", digest);
};

let outputs = occupied.get().clone();
let mut all_outputs = Vec::with_capacity(digests.len());
for tx in digests {
let Some(outputs) = self
.dirty
.pending_transaction_writes
.get(tx)
.map(|o| o.clone())
else {
panic!("Attempt to commit unknown transaction {:?}", tx);
};
all_outputs.push(outputs);
}

// Flush writes to disk before removing anything from dirty set. otherwise,
// a cache eviction could cause a value to disappear briefly, even if we insert to the
// cache before removing from the dirty set.
self.store
.write_transaction_outputs(epoch, outputs.clone())
.write_transaction_outputs(epoch, &all_outputs)
.await?;

// Cache transaction before removing entry from self.dirty to avoid
// unnecessary cache misses
self.cached
.transactions
.insert(digest, outputs.transaction.clone());
for (tx_digest, outputs) in digests.iter().zip(all_outputs.into_iter()) {
assert!(self
.dirty
.pending_transaction_writes
.remove(tx_digest)
.is_some());
self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, &outputs);
}

// releases lock on pending_transaction_writes
occupied.remove();
Ok(())
}

fn flush_transactions_from_dirty_to_cached(
&self,
epoch: EpochId,
tx_digest: TransactionDigest,
outputs: &TransactionOutputs,
) {
// Now, remove each piece of committed data from the dirty state and insert it into the cache.
// TODO: outputs should have a strong count of 1 so we should be able to move out of it
let TransactionOutputs {
Expand All @@ -668,18 +683,22 @@ impl WritebackCache {
wrapped,
events,
..
} = &*outputs;
} = outputs;

let tx_digest = *transaction.digest();
let effects_digest = effects.digest();
let events_digest = events.digest();

// Update cache before removing from self.dirty to avoid
// unnecessary cache misses
self.cached
.transactions
.insert(tx_digest, transaction.clone());
self.cached
.transaction_effects
.insert(effects_digest, effects.clone().into());
self.cached
.executed_effects_digests
.insert(digest, effects_digest);
.insert(tx_digest, effects_digest);
self.cached
.transaction_events
.insert(events_digest, events.clone().into());
Expand Down Expand Up @@ -747,8 +766,6 @@ impl WritebackCache {
&ObjectEntry::Wrapped,
);
}

Ok(())
}

// Move the oldest/least entry from the dirty queue to the cache queue.
Expand Down Expand Up @@ -870,12 +887,12 @@ impl WritebackCache {
impl ExecutionCacheAPI for WritebackCache {}

impl ExecutionCacheCommit for WritebackCache {
fn commit_transaction_outputs(
&self,
fn commit_transaction_outputs<'a>(
&'a self,
epoch: EpochId,
digest: &TransactionDigest,
) -> BoxFuture<'_, SuiResult> {
WritebackCache::commit_transaction_outputs(self, epoch, *digest).boxed()
digests: &'a [TransactionDigest],
) -> BoxFuture<'a, SuiResult> {
WritebackCache::commit_transaction_outputs(self, epoch, digests).boxed()
}
}

Expand Down
7 changes: 5 additions & 2 deletions crates/sui-single-node-benchmark/src/benchmark_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ impl BenchmarkContext {
self.validator()
.get_validator()
.get_cache_commit()
.commit_transaction_outputs(effects.executed_epoch(), effects.transaction_digest())
.commit_transaction_outputs(
effects.executed_epoch(),
&[*effects.transaction_digest()],
)
.await
.unwrap();
let (owner, root_object) = effects
Expand Down Expand Up @@ -185,7 +188,7 @@ impl BenchmarkContext {
// For checkpoint executor, in order to commit a checkpoint it is required previous versions
// of objects are already committed.
cache_commit
.commit_transaction_outputs(epoch_id, effects.transaction_digest())
.commit_transaction_outputs(epoch_id, &[*effects.transaction_digest()])
.await
.unwrap();
}
Expand Down

0 comments on commit 722a87a

Please sign in to comment.