Skip to content

Commit

Permalink
Revert "Check for lock existence before attempting tx execution. (#6129
Browse files Browse the repository at this point in the history
…)"

This reverts commit 616d228.
  • Loading branch information
mwtian committed Nov 30, 2022
1 parent 3910efc commit 79100a5
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 177 deletions.
13 changes: 2 additions & 11 deletions crates/sui-benchmark/tests/simtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,7 @@ mod test {

#[sim_test(config = "test_config()")]
async fn test_simulated_load() {
let test_cluster = init_cluster_builder_env_aware()
.with_num_validators(get_var("SIM_STRESS_TEST_NUM_VALIDATORS", 4))
.build()
.await
.unwrap();
let test_cluster = init_cluster_builder_env_aware().build().await.unwrap();
let swarm = &test_cluster.swarm;
let context = &test_cluster.wallet;
let sender = test_cluster.get_address_0();
Expand Down Expand Up @@ -89,12 +85,7 @@ mod test {
);

for w in workloads.iter_mut() {
w.workload
.init(
get_var("SIM_STRESS_TEST_NUM_SHARED_OBJECTS", 5),
proxy.clone(),
)
.await;
w.workload.init(5, proxy.clone()).await;
}

let driver = BenchDriver::new(5);
Expand Down
8 changes: 0 additions & 8 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,11 +770,6 @@ impl AuthorityState {
.tap_err(|e| debug!(?tx_digest, "process_certificate failed: {e}"))
}

#[instrument(level = "trace", skip_all)]
async fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
self.database.check_owned_locks(owned_object_refs).await
}

#[instrument(level = "trace", skip_all)]
async fn check_shared_locks(
&self,
Expand Down Expand Up @@ -1046,9 +1041,6 @@ impl AuthorityState {
let (gas_status, input_objects) =
transaction_input_checker::check_certificate_input(&self.database, certificate).await?;

let owned_object_refs = input_objects.filter_owned_objects();
self.check_owned_locks(&owned_object_refs).await?;

// At this point we need to check if any shared objects need locks,
// and whether they have them.
let shared_object_refs = input_objects.filter_shared_objects();
Expand Down
97 changes: 10 additions & 87 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
.get(&ObjectKey(*object_id, version))?)
}

pub fn object_version_exists(
pub fn object_exists(
&self,
object_id: &ObjectID,
version: VersionNumber,
Expand Down Expand Up @@ -356,15 +356,14 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {

/// When making changes, please see if check_sequenced_input_objects() below needs
/// similar changes as well.
pub async fn get_missing_input_objects(
pub fn get_missing_input_objects(
&self,
digest: &TransactionDigest,
objects: &[InputObjectKind],
) -> Result<Vec<ObjectKey>, SuiError> {
let shared_locks_cell: OnceCell<HashMap<_, _>> = OnceCell::new();

let mut missing = Vec::new();
let mut probe_lock_exists = Vec::new();
for kind in objects {
match kind {
InputObjectKind::SharedMoveObject { id, .. } => {
Expand All @@ -375,7 +374,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
})?;
match shared_locks.get(id) {
Some(version) => {
if !self.object_version_exists(id, *version)? {
if !self.object_exists(id, *version)? {
// When this happens, other transactions that use smaller versions of
// this shared object haven't finished execution.
missing.push(ObjectKey(*id, *version));
Expand All @@ -388,37 +387,19 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
};
}
InputObjectKind::MovePackage(id) => {
if !self.object_version_exists(id, PACKAGE_VERSION)? {
if !self.object_exists(id, PACKAGE_VERSION)? {
// The cert cannot have been formed if immutable inputs were missing.
missing.push(ObjectKey(*id, PACKAGE_VERSION));
}
}
InputObjectKind::ImmOrOwnedMoveObject(objref) => {
if let Some(obj) = self.get_object_by_key(&objref.0, objref.1)? {
if !obj.is_immutable() {
probe_lock_exists.push(*objref);
}
} else {
if !self.object_exists(&objref.0, objref.1)? {
missing.push(ObjectKey::from(objref));
}
}
};
}

if !probe_lock_exists.is_empty() {
// It is possible that we probed the objects after they are written, but before the
// locks are created. In that case, if we attempt to execute the transaction, it will
// fail. Because the objects_committed() call is made only after the locks are written,
// the tx manager will be awoken after the locks are written.
missing.extend(
self.lock_service
.get_missing_locks(probe_lock_exists)
.await?
.into_iter()
.map(ObjectKey::from),
);
}

Ok(missing)
}

Expand Down Expand Up @@ -606,12 +587,6 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
.map_err(SuiError::from)
}

pub async fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
self.lock_service
.locks_exist(owned_object_refs.into())
.await
}

/// Read a lock for a specific (transaction, shared object) pair.
pub fn all_shared_locks(
&self,
Expand Down Expand Up @@ -789,11 +764,6 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
self.effects_notify_read
.notify(transaction_digest, effects.data());

// Cleanup the lock of the shared objects. This must be done after we write effects, as
// effects_exists is used as the guard to avoid re-locking objects for a previously
// executed tx. remove_shared_objects_locks.
self.remove_shared_objects_locks(transaction_digest, certificate)?;

Ok(seq)
}

Expand Down Expand Up @@ -1123,7 +1093,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
/// 2. Latest parent_sync entries for each mutated object are deleted.
/// 3. All new object states are deleted.
/// 4. owner_index table change is reverted.
pub async fn revert_state_update(&self, tx_digest: &TransactionDigest) -> SuiResult {
pub fn revert_state_update(&self, tx_digest: &TransactionDigest) -> SuiResult {
let effects = self.get_effects(tx_digest)?;
let mut write_batch = self.perpetual_tables.certificates.batch();
write_batch =
Expand Down Expand Up @@ -1180,33 +1150,21 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
.expect("version revert should never fail"),
)
});
let (old_objects, old_locks): (Vec<_>, Vec<_>) = self
let old_objects = self
.perpetual_tables
.objects
.multi_get(mutated_objects)?
.into_iter()
.map(|obj_opt| {
let obj = obj_opt.expect("Older object version not found");
let obj_ref = obj.compute_object_reference();
let lock = if obj.is_address_owned() {
Some(obj_ref)
} else {
None
};
(
((obj.owner, obj.id()), ObjectInfo::new(&obj_ref, &obj)),
lock,
(obj.owner, obj.id()),
ObjectInfo::new(&obj.compute_object_reference(), &obj),
)
})
.unzip();

let old_locks: Vec<_> = old_locks.into_iter().flatten().collect();

});
write_batch = write_batch.insert_batch(&self.perpetual_tables.owner_index, old_objects)?;

write_batch.write()?;

self.lock_service.initialize_locks(&old_locks, true).await?;
Ok(())
}

Expand All @@ -1228,41 +1186,6 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
self.perpetual_tables.get_latest_parent_entry(object_id)
}

pub fn object_exists(&self, object_id: ObjectID) -> SuiResult<bool> {
match self.get_latest_parent_entry(object_id)? {
None => Ok(false),
Some(entry) => Ok(entry.0 .2.is_alive()),
}
}

/// Remove the shared objects locks.
pub fn remove_shared_objects_locks(
&self,
transaction_digest: &TransactionDigest,
transaction: &VerifiedCertificate,
) -> SuiResult {
let mut sequenced_to_delete = Vec::new();
let mut schedule_to_delete = Vec::new();
for (object_id, _) in transaction.shared_input_objects() {
sequenced_to_delete.push((*transaction_digest, *object_id));

if !self.object_exists(*object_id)? {
schedule_to_delete.push(*object_id);
}
}
let mut write_batch = self.epoch_tables().assigned_object_versions.batch();
write_batch = write_batch.delete_batch(
&self.epoch_tables().assigned_object_versions,
sequenced_to_delete,
)?;
write_batch = write_batch.delete_batch(
&self.epoch_tables().next_object_versions,
schedule_to_delete,
)?;
write_batch.write()?;
Ok(())
}

/// Lock a sequence number for the shared objects of the input transaction based on the effects
/// of that transaction. Used by the nodes, which don't listen to consensus.
pub fn acquire_shared_locks_from_effects(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ where
}
// Assume only transient failure can happen. Permanent failure is probably
// a bug. There would be nothing that can be done for permanent failures.
error!(tx_digest=?digest, "Failed to execute certified transaction! attempt {attempts}, {e}");
warn!(tx_digest=?digest, "Failed to execute certified transaction! attempt {attempts}, {e}");
sleep(EXECUTION_FAILURE_RETRY_INTERVAL).await;
} else {
break;
Expand Down
2 changes: 0 additions & 2 deletions crates/sui-core/src/authority_active/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ pub async fn test_gossip_after_revert() {
state
.database
.revert_state_update(&digests[0].transaction)
.await
.unwrap();
break;
}
Expand All @@ -100,7 +99,6 @@ pub async fn test_gossip_after_revert() {
state
.database
.revert_state_update(&digests[1].transaction)
.await
.unwrap();
}
}
Expand Down
9 changes: 1 addition & 8 deletions crates/sui-core/src/transaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{

use sui_types::{base_types::TransactionDigest, error::SuiResult, messages::VerifiedCertificate};
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, error};
use tracing::error;

use crate::authority::{authority_store::ObjectKey, AuthorityMetrics, AuthorityStore};

Expand Down Expand Up @@ -80,15 +80,11 @@ impl TransactionManager {
let missing = self
.authority_store
.get_missing_input_objects(&digest, &cert.data().data.input_objects()?)
.await
.expect("Are shared object locks set prior to enqueueing certificates?");

if missing.is_empty() {
debug!(tx_digest = ?digest, "certificate ready");
self.certificate_ready(cert);
continue;
} else {
debug!(tx_digest = ?digest, ?missing, "certificate waiting on missing objects");
}

for obj_key in missing {
Expand Down Expand Up @@ -146,10 +142,7 @@ impl TransactionManager {
continue;
}
};
debug!(tx_digest = ?digest, "certificate ready");
self.certificate_ready(cert);
} else {
debug!(tx_digest = ?digest, missing = ?set, "certificate waiting on missing");
}
}
self.metrics
Expand Down
10 changes: 5 additions & 5 deletions crates/sui-core/src/unit_tests/authority_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1896,7 +1896,7 @@ async fn test_store_revert_transfer_sui() {
.unwrap();

let db = &authority_state.database;
db.revert_state_update(&tx_digest).await.unwrap();
db.revert_state_update(&tx_digest).unwrap();

assert_eq!(
db.get_object(&gas_object_id).unwrap().unwrap().owner,
Expand Down Expand Up @@ -1975,7 +1975,7 @@ async fn test_store_revert_wrap_move_call() {
let wrapper_v0 = wrap_effects.created[0].0;

let db = &authority_state.database;
db.revert_state_update(&wrap_digest).await.unwrap();
db.revert_state_update(&wrap_digest).unwrap();

// The wrapped object is unwrapped once again (accessible from storage).
let object = db.get_object(&object_v0.0).unwrap().unwrap();
Expand Down Expand Up @@ -2062,7 +2062,7 @@ async fn test_store_revert_unwrap_move_call() {

let db = &authority_state.database;

db.revert_state_update(&unwrap_digest).await.unwrap();
db.revert_state_update(&unwrap_digest).unwrap();

// The unwrapped object is wrapped once again
assert!(db.get_object(&object_v0.0).unwrap().is_none());
Expand Down Expand Up @@ -2159,7 +2159,7 @@ async fn test_store_revert_add_ofield() {
assert_eq!(inner.version(), inner_v1.1);
assert_eq!(inner.owner, Owner::ObjectOwner(field_v0.0.into()));

db.revert_state_update(&add_digest).await.unwrap();
db.revert_state_update(&add_digest).unwrap();

let outer = db.get_object(&outer_v0.0).unwrap().unwrap();
assert_eq!(outer.version(), outer_v0.1);
Expand Down Expand Up @@ -2265,7 +2265,7 @@ async fn test_store_revert_remove_ofield() {
assert_eq!(inner.owner, Owner::AddressOwner(sender));
assert_eq!(inner.version(), inner_v2.1);

db.revert_state_update(&remove_ofield_digest).await.unwrap();
db.revert_state_update(&remove_ofield_digest).unwrap();

let outer = db.get_object(&outer_v0.0).unwrap().unwrap();
assert_eq!(outer.version(), outer_v1.1);
Expand Down
Loading

0 comments on commit 79100a5

Please sign in to comment.