Check for lock existence before attempting tx execution. (#6129)
* Remove shared locks after execution

* Support for setting committee size and number of shared objects

* Verify locks exist before attempting to execute transactions.

* PR comments

* fix rebase problems

* Add debug logs to TransactionManager

* reinitialize old locks when reverting state update

* rebase fixes
mystenmark committed Nov 22, 2022
1 parent 81a0d75 commit 616d228
Showing 9 changed files with 177 additions and 27 deletions.
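The change the title describes works as follows: before a certificate is executed, the validator now verifies that an owner lock exists for every mutable owned input object; a missing lock means the object (or its lock) has not been committed yet, so the certificate is treated as waiting instead of failing partway through execution. The following is a minimal, self-contained sketch of that pattern; the type and function names are simplified stand-ins rather than the real sui-core API, which routes the check through check_owned_locks(), locks_exist(), and get_missing_locks() in the diffs below.

use std::collections::HashSet;

type ObjectRef = (u64, u64); // stand-in for (ObjectID, SequenceNumber, ObjectDigest)

#[derive(Default)]
struct OwnedLockTable {
    // A lock entry exists only once the owned object version has been committed.
    locks: HashSet<ObjectRef>,
}

impl OwnedLockTable {
    /// Owned input refs whose locks have not been initialized yet.
    fn missing_locks(&self, owned_inputs: &[ObjectRef]) -> Vec<ObjectRef> {
        owned_inputs
            .iter()
            .filter(|r| !self.locks.contains(*r))
            .copied()
            .collect()
    }

    /// A certificate is ready only when every owned input has a lock.
    fn ready_to_execute(&self, owned_inputs: &[ObjectRef]) -> bool {
        self.missing_locks(owned_inputs).is_empty()
    }
}

fn main() {
    let mut table = OwnedLockTable::default();
    let input: ObjectRef = (1, 1);
    assert!(!table.ready_to_execute(&[input])); // lock not written yet: keep waiting
    table.locks.insert(input); // lock is initialized when the object is committed
    assert!(table.ready_to_execute(&[input]));
}

In get_missing_input_objects(), a missing owner lock is reported the same way as a missing object version, so the TransactionManager simply keeps the certificate parked until the lock appears.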
13 changes: 11 additions & 2 deletions crates/sui-benchmark/tests/simtest.rs
@@ -51,7 +51,11 @@ mod test {

#[sim_test(config = "test_config()")]
async fn test_simulated_load() {
let test_cluster = init_cluster_builder_env_aware().build().await.unwrap();
let test_cluster = init_cluster_builder_env_aware()
.with_num_validators(get_var("SIM_STRESS_TEST_NUM_VALIDATORS", 4))
.build()
.await
.unwrap();
let swarm = &test_cluster.swarm;
let context = &test_cluster.wallet;
let sender = test_cluster.get_address_0();
@@ -85,7 +89,12 @@ mod test {
);

for w in workloads.iter_mut() {
w.workload.init(5, proxy.clone()).await;
w.workload
.init(
get_var("SIM_STRESS_TEST_NUM_SHARED_OBJECTS", 5),
proxy.clone(),
)
.await;
}

let driver = BenchDriver::new(5);
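The simtest changes above make the committee size and the number of shared objects configurable through the SIM_STRESS_TEST_NUM_VALIDATORS and SIM_STRESS_TEST_NUM_SHARED_OBJECTS environment variables, read via a get_var helper. Assuming get_var simply parses an environment variable and falls back to a default when it is unset (the real helper lives in the Sui test utilities and may differ), a minimal sketch would be:

use std::{env, str::FromStr};

/// Hypothetical sketch of an env-var helper: parse `name` if it is set,
/// otherwise return `default`. Not the actual Sui implementation.
fn get_var<T: FromStr>(name: &str, default: T) -> T {
    env::var(name)
        .ok()
        .and_then(|v| v.parse::<T>().ok())
        .unwrap_or(default)
}

fn main() {
    // e.g. run the test with SIM_STRESS_TEST_NUM_VALIDATORS=7 in the environment
    let validators: usize = get_var("SIM_STRESS_TEST_NUM_VALIDATORS", 4);
    let shared_objects: usize = get_var("SIM_STRESS_TEST_NUM_SHARED_OBJECTS", 5);
    println!("validators: {validators}, shared objects: {shared_objects}");
}

With the variables unset, the defaults shown in the diff (4 validators, 5 shared objects) are used.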
8 changes: 8 additions & 0 deletions crates/sui-core/src/authority.rs
@@ -772,6 +772,11 @@ impl AuthorityState {
.tap_err(|e| debug!(?tx_digest, "process_certificate failed: {e}"))
}

#[instrument(level = "trace", skip_all)]
async fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
self.database.check_owned_locks(owned_object_refs).await
}

#[instrument(level = "trace", skip_all)]
async fn check_shared_locks(
&self,
@@ -1043,6 +1048,9 @@ impl AuthorityState {
let (gas_status, input_objects) =
transaction_input_checker::check_certificate_input(&self.database, certificate).await?;

let owned_object_refs = input_objects.filter_owned_objects();
self.check_owned_locks(&owned_object_refs).await?;

// At this point we need to check if any shared objects need locks,
// and whether they have them.
let shared_object_refs = input_objects.filter_shared_objects();
97 changes: 87 additions & 10 deletions crates/sui-core/src/authority/authority_store.rs
@@ -302,7 +302,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
.get(&ObjectKey(*object_id, version))?)
}

pub fn object_exists(
pub fn object_version_exists(
&self,
object_id: &ObjectID,
version: VersionNumber,
@@ -356,14 +356,15 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {

/// When making changes, please see if check_sequenced_input_objects() below needs
/// similar changes as well.
pub fn get_missing_input_objects(
pub async fn get_missing_input_objects(
&self,
digest: &TransactionDigest,
objects: &[InputObjectKind],
) -> Result<Vec<ObjectKey>, SuiError> {
let shared_locks_cell: OnceCell<HashMap<_, _>> = OnceCell::new();

let mut missing = Vec::new();
let mut probe_lock_exists = Vec::new();
for kind in objects {
match kind {
InputObjectKind::SharedMoveObject { id, .. } => {
@@ -374,7 +375,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
})?;
match shared_locks.get(id) {
Some(version) => {
if !self.object_exists(id, *version)? {
if !self.object_version_exists(id, *version)? {
// When this happens, other transactions that use smaller versions of
// this shared object haven't finished execution.
missing.push(ObjectKey(*id, *version));
@@ -387,19 +388,37 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
};
}
InputObjectKind::MovePackage(id) => {
if !self.object_exists(id, PACKAGE_VERSION)? {
if !self.object_version_exists(id, PACKAGE_VERSION)? {
// The cert cannot have been formed if immutable inputs were missing.
missing.push(ObjectKey(*id, PACKAGE_VERSION));
}
}
InputObjectKind::ImmOrOwnedMoveObject(objref) => {
if !self.object_exists(&objref.0, objref.1)? {
if let Some(obj) = self.get_object_by_key(&objref.0, objref.1)? {
if !obj.is_immutable() {
probe_lock_exists.push(*objref);
}
} else {
missing.push(ObjectKey::from(objref));
}
}
};
}

if !probe_lock_exists.is_empty() {
// It is possible that we probed the objects after they were written but before their
// locks were created. In that case, attempting to execute the transaction would fail.
// Because objects_committed() is called only after the locks are written, the tx
// manager will be woken up once the locks exist.
missing.extend(
self.lock_service
.get_missing_locks(probe_lock_exists)
.await?
.into_iter()
.map(ObjectKey::from),
);
}

Ok(missing)
}

@@ -586,6 +605,12 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
.map_err(SuiError::from)
}

pub async fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
self.lock_service
.locks_exist(owned_object_refs.into())
.await
}

/// Read a lock for a specific (transaction, shared object) pair.
pub fn all_shared_locks(
&self,
@@ -763,6 +788,11 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
self.effects_notify_read
.notify(transaction_digest, effects.data());

// Clean up the locks of the shared objects. This must be done after we write effects,
// as effects_exists is used as the guard to avoid re-locking objects for a previously
// executed tx.
self.remove_shared_objects_locks(transaction_digest, certificate)?;

Ok(seq)
}

@@ -1092,7 +1122,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
/// 2. Latest parent_sync entries for each mutated object are deleted.
/// 3. All new object states are deleted.
/// 4. owner_index table change is reverted.
pub fn revert_state_update(&self, tx_digest: &TransactionDigest) -> SuiResult {
pub async fn revert_state_update(&self, tx_digest: &TransactionDigest) -> SuiResult {
let effects = self.get_effects(tx_digest)?;
let mut write_batch = self.perpetual_tables.certificates.batch();
write_batch =
@@ -1149,21 +1179,33 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
.expect("version revert should never fail"),
)
});
let old_objects = self
let (old_objects, old_locks): (Vec<_>, Vec<_>) = self
.perpetual_tables
.objects
.multi_get(mutated_objects)?
.into_iter()
.map(|obj_opt| {
let obj = obj_opt.expect("Older object version not found");
let obj_ref = obj.compute_object_reference();
let lock = if obj.is_address_owned() {
Some(obj_ref)
} else {
None
};
(
(obj.owner, obj.id()),
ObjectInfo::new(&obj.compute_object_reference(), &obj),
((obj.owner, obj.id()), ObjectInfo::new(&obj_ref, &obj)),
lock,
)
});
})
.unzip();

let old_locks: Vec<_> = old_locks.into_iter().flatten().collect();

write_batch = write_batch.insert_batch(&self.perpetual_tables.owner_index, old_objects)?;

write_batch.write()?;

self.lock_service.initialize_locks(&old_locks, true).await?;
Ok(())
}

@@ -1185,6 +1227,41 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
self.perpetual_tables.get_latest_parent_entry(object_id)
}

pub fn object_exists(&self, object_id: ObjectID) -> SuiResult<bool> {
match self.get_latest_parent_entry(object_id)? {
None => Ok(false),
Some(entry) => Ok(entry.0 .2.is_alive()),
}
}

/// Remove the shared object locks.
pub fn remove_shared_objects_locks(
&self,
transaction_digest: &TransactionDigest,
transaction: &VerifiedCertificate,
) -> SuiResult {
let mut sequenced_to_delete = Vec::new();
let mut schedule_to_delete = Vec::new();
for (object_id, _) in transaction.shared_input_objects() {
sequenced_to_delete.push((*transaction_digest, *object_id));

if !self.object_exists(*object_id)? {
schedule_to_delete.push(*object_id);
}
}
let mut write_batch = self.epoch_tables().assigned_object_versions.batch();
write_batch = write_batch.delete_batch(
&self.epoch_tables().assigned_object_versions,
sequenced_to_delete,
)?;
write_batch = write_batch.delete_batch(
&self.epoch_tables().next_object_versions,
schedule_to_delete,
)?;
write_batch.write()?;
Ok(())
}

/// Lock a sequence number for the shared objects of the input transaction based on the effects
/// of that transaction. Used by the nodes, which don't listen to consensus.
pub fn acquire_shared_locks_from_effects(
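Reverting a transaction also has to restore owner locks: when revert_state_update() above puts older object versions back, owner locks for those versions need to exist again for the address-owned ones, which is what the new lock_service.initialize_locks(&old_locks, true) call does. A heavily simplified sketch of that filtering step (illustrative names and types only, not the real AuthorityStore):

use std::collections::HashSet;

type ObjectRef = (u64, u64); // stand-in for (ObjectID, SequenceNumber, ObjectDigest)

struct RestoredObject {
    reference: ObjectRef,
    address_owned: bool, // shared, immutable, and child objects carry no owner lock
}

/// Re-create owner locks for the address-owned objects restored by a revert.
fn reinitialize_locks(lock_table: &mut HashSet<ObjectRef>, restored: &[RestoredObject]) {
    for obj in restored.iter().filter(|o| o.address_owned) {
        lock_table.insert(obj.reference);
    }
}

fn main() {
    let mut locks = HashSet::new();
    let restored = vec![
        RestoredObject { reference: (1, 3), address_owned: true },
        RestoredObject { reference: (2, 7), address_owned: false }, // e.g. a shared object
    ];
    reinitialize_locks(&mut locks, &restored);
    assert!(locks.contains(&(1, 3)));
    assert!(!locks.contains(&(2, 7)));
}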
Changes to an additional file (file name not captured in this view):
@@ -124,7 +124,7 @@ where
}
// Assume only transient failure can happen. Permanent failure is probably
// a bug. There would be nothing that can be done for permanent failures.
warn!(tx_digest=?digest, "Failed to execute certified transaction! attempt {attempts}, {e}");
error!(tx_digest=?digest, "Failed to execute certified transaction! attempt {attempts}, {e}");
sleep(EXECUTION_FAILURE_RETRY_INTERVAL).await;
} else {
break;
2 changes: 2 additions & 0 deletions crates/sui-core/src/authority_active/gossip/tests.rs
@@ -90,6 +90,7 @@ pub async fn test_gossip_after_revert() {
state
.database
.revert_state_update(&digests[0].transaction)
.await
.unwrap();
break;
}
@@ -99,6 +100,7 @@
state
.database
.revert_state_update(&digests[1].transaction)
.await
.unwrap();
}
}
9 changes: 8 additions & 1 deletion crates/sui-core/src/transaction_manager.rs
@@ -8,7 +8,7 @@ use std::{

use sui_types::{base_types::TransactionDigest, error::SuiResult, messages::VerifiedCertificate};
use tokio::sync::mpsc::UnboundedSender;
use tracing::error;
use tracing::{debug, error};

use crate::authority::{authority_store::ObjectKey, AuthorityMetrics, AuthorityStore};

@@ -80,11 +80,15 @@ impl TransactionManager {
let missing = self
.authority_store
.get_missing_input_objects(&digest, &cert.data().data.input_objects()?)
.await
.expect("Are shared object locks set prior to enqueueing certificates?");

if missing.is_empty() {
debug!(tx_digest = ?digest, "certificate ready");
self.certificate_ready(cert);
continue;
} else {
debug!(tx_digest = ?digest, ?missing, "certificate waiting on missing objects");
}

for obj_key in missing {
@@ -142,7 +146,10 @@ impl TransactionManager {
continue;
}
};
debug!(tx_digest = ?digest, "certificate ready");
self.certificate_ready(cert);
} else {
debug!(tx_digest = ?digest, missing = ?set, "certificate waiting on missing");
}
}
self.metrics
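In the TransactionManager, get_missing_input_objects() is now awaited (so it can consult the lock service), and debug logs record whether a certificate is ready or which objects it is still waiting on. The underlying bookkeeping is roughly the pattern sketched below; the struct and method names are illustrative only, and the real TransactionManager additionally handles locking, metrics, and the channel that delivers ready certificates.

use std::collections::{HashMap, HashSet};

type TxDigest = u64; // stand-in for TransactionDigest
type ObjectKey = (u64, u64); // stand-in for (ObjectID, SequenceNumber)

#[derive(Default)]
struct PendingCertificates {
    // object -> transactions waiting on it
    waiters: HashMap<ObjectKey, HashSet<TxDigest>>,
    // transaction -> objects it still needs
    missing: HashMap<TxDigest, HashSet<ObjectKey>>,
}

impl PendingCertificates {
    /// Returns true if the certificate is ready to execute immediately.
    fn enqueue(&mut self, tx: TxDigest, missing: Vec<ObjectKey>) -> bool {
        if missing.is_empty() {
            return true; // "certificate ready"
        }
        for key in &missing {
            self.waiters.entry(*key).or_default().insert(tx);
        }
        self.missing.insert(tx, missing.into_iter().collect());
        false // "certificate waiting on missing objects"
    }

    /// Called when an object (and its lock) has been committed; returns the
    /// transactions that just became ready.
    fn object_committed(&mut self, key: ObjectKey) -> Vec<TxDigest> {
        let mut ready = Vec::new();
        for tx in self.waiters.remove(&key).unwrap_or_default() {
            let became_ready = match self.missing.get_mut(&tx) {
                Some(set) => {
                    set.remove(&key);
                    set.is_empty()
                }
                None => false,
            };
            if became_ready {
                self.missing.remove(&tx);
                ready.push(tx);
            }
        }
        ready
    }
}

fn main() {
    let mut pending = PendingCertificates::default();
    assert!(!pending.enqueue(42, vec![(7, 1)]));
    assert_eq!(pending.object_committed((7, 1)), vec![42]);
}

A committed object wakes exactly the certificates for which it was the last missing input, which is why the store must write locks before notifying the transaction manager (see the comment added in get_missing_input_objects() above).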
10 changes: 5 additions & 5 deletions crates/sui-core/src/unit_tests/authority_tests.rs
@@ -1901,7 +1901,7 @@ async fn test_store_revert_transfer_sui() {
.unwrap();

let db = &authority_state.database;
db.revert_state_update(&tx_digest).unwrap();
db.revert_state_update(&tx_digest).await.unwrap();

assert_eq!(
db.get_object(&gas_object_id).unwrap().unwrap().owner,
@@ -1980,7 +1980,7 @@ async fn test_store_revert_wrap_move_call() {
let wrapper_v0 = wrap_effects.created[0].0;

let db = &authority_state.database;
db.revert_state_update(&wrap_digest).unwrap();
db.revert_state_update(&wrap_digest).await.unwrap();

// The wrapped object is unwrapped once again (accessible from storage).
let object = db.get_object(&object_v0.0).unwrap().unwrap();
@@ -2067,7 +2067,7 @@ async fn test_store_revert_unwrap_move_call() {

let db = &authority_state.database;

db.revert_state_update(&unwrap_digest).unwrap();
db.revert_state_update(&unwrap_digest).await.unwrap();

// The unwrapped object is wrapped once again
assert!(db.get_object(&object_v0.0).unwrap().is_none());
@@ -2164,7 +2164,7 @@ async fn test_store_revert_add_ofield() {
assert_eq!(inner.version(), inner_v1.1);
assert_eq!(inner.owner, Owner::ObjectOwner(field_v0.0.into()));

db.revert_state_update(&add_digest).unwrap();
db.revert_state_update(&add_digest).await.unwrap();

let outer = db.get_object(&outer_v0.0).unwrap().unwrap();
assert_eq!(outer.version(), outer_v0.1);
@@ -2270,7 +2270,7 @@ async fn test_store_revert_remove_ofield() {
assert_eq!(inner.owner, Owner::AddressOwner(sender));
assert_eq!(inner.version(), inner_v2.1);

db.revert_state_update(&remove_ofield_digest).unwrap();
db.revert_state_update(&remove_ofield_digest).await.unwrap();

let outer = db.get_object(&outer_v0.0).unwrap().unwrap();
assert_eq!(outer.version(), outer_v1.1);
(Remaining changed files not shown in this view.)
