Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JSON RPC API] - New APIs for getting object's dynamic field #5882

Merged
merged 13 commits into from
Dec 16, 2022
Merged
268 changes: 259 additions & 9 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use sui_json_rpc_types::{
SuiTransactionEffects,
};
use sui_simulator::nondeterministic;
use sui_storage::indexes::ObjectIndexChanges;
use sui_storage::write_ahead_log::WriteAheadLog;
use sui_storage::{
event_store::{EventStore, EventStoreType, StoredEvent},
Expand All @@ -56,10 +57,13 @@ use sui_storage::{
};
use sui_types::committee::EpochId;
use sui_types::crypto::{AuthorityKeyPair, NetworkKeyPair};
use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldType};
use sui_types::event::{Event, EventID};
use sui_types::gas::GasCostSummary;
use sui_types::messages_checkpoint::{CheckpointRequest, CheckpointResponse};
use sui_types::object::{Owner, PastObjectRead};
use sui_types::query::{EventQuery, TransactionQuery};
use sui_types::storage::WriteKind;
use sui_types::sui_system_state::SuiSystemState;
use sui_types::temporary_store::InnerTemporaryStore;
pub use sui_types::temporary_store::TemporaryStore;
Expand Down Expand Up @@ -99,8 +103,6 @@ use crate::{
transaction_streamer::TransactionStreamer,
};

use sui_types::gas::GasCostSummary;

use self::authority_store::ObjectKey;

#[cfg(test)]
Expand Down Expand Up @@ -1254,6 +1256,10 @@ impl AuthorityState {
effects: &SignedTransactionEffects,
timestamp_ms: u64,
) -> SuiResult {
let changes = self
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just learning here, when does index_tx get run? This is some separate service or something right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently this runs in fullnode (@gegaowp 's indexer work will probably move this to the indexer), there are 2 code path leading to this method, one is from the node_sync when the fullnode receive new transactions; another path is from the TransactiondOrchestrator which updates the local store after successful tx execution.

.process_object_index(effects)
.tap_err(|e| warn!("{e}"))?;

indexes.index_tx(
cert.sender_address(),
cert.data()
Expand All @@ -1272,12 +1278,163 @@ impl AuthorityState {
.move_calls()
.iter()
.map(|mc| (mc.package.0, mc.module.clone(), mc.function.clone())),
changes,
seq,
digest,
timestamp_ms,
)
}

fn process_object_index(
&self,
effects: &SignedTransactionEffects,
) -> Result<ObjectIndexChanges, SuiError> {
let modified_at_version = effects
.modified_at_versions
.iter()
.cloned()
.collect::<HashMap<_, _>>();

let mut deleted_owners = vec![];
let mut deleted_dynamic_fields = vec![];
for (id, _, _) in &effects.deleted {
let Some(old_version) = modified_at_version.get(id) else{
error!("Error processing object owner index for tx [{}], cannot find modified at version for deleted object [{id}].", effects.transaction_digest);
continue;
};
match self.get_owner_at_version(id, *old_version)? {
Owner::AddressOwner(addr) => deleted_owners.push((addr, *id)),
Owner::ObjectOwner(object_id) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably want to filter for the type here too. I don't think you want this event/index for the secondary object in the dynamic object field case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other words, we need to check that the object's type is sui::dynamic_field::Field, otherwise it could be some other object being used with sui::dynamic_object_field

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still reviewing but leaving these comments here as I go: Do we need a type filter here if we are filtering on creation?

Copy link
Contributor Author

@patrickkuo patrickkuo Dec 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch! this will remove unnecessary seek and delete ops on db

Copy link
Contributor Author

@patrickkuo patrickkuo Dec 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually after looking at the code (this was done quite a while ago 😅), it will require an extra DB read to retrieve the object type because TransactionEffects only contains ObjectRef, which will be more expensive because it will deserialise the whole Object from storage. We could do this without the db read after this PR which adds ObjectType to the effects.

I will add a TODO for now.

deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
}
_ => {}
}
}

let mut new_owners = vec![];
let mut new_dynamic_fields = vec![];

for (oref, owner, kind) in effects.all_mutated() {
let id = &oref.0;
// For mutated objects, retrieve old owner and delete old index if there is a owner change.
if let WriteKind::Mutate = kind {
let Some(old_version) = modified_at_version.get(id) else{
error!("Error processing object owner index for tx [{}], cannot find modified at version for mutated object [{id}].", effects.transaction_digest);
continue;
};
let Some(old_object) = self.database.get_object_by_key(id, *old_version)? else {
error!("Error processing object owner index for tx [{}], cannot find object [{id}] at version [{old_version}].", effects.transaction_digest);
continue;
};
if &old_object.owner != owner {
match old_object.owner {
Owner::AddressOwner(addr) => {
deleted_owners.push((addr, *id));
}
Owner::ObjectOwner(object_id) => {
deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same type filtering issue as above

}
_ => {}
}
}
}

match owner {
Owner::AddressOwner(addr) => {
// TODO: We can remove the object fetching after we added ObjectType to TransactionEffects
let Some(o) = self.database.get_object_by_key(id, oref.1)? else{
continue;
};

let type_ = o
.type_()
.map(|type_| ObjectType::Struct(type_.clone()))
.unwrap_or(ObjectType::Package);

new_owners.push((
(*addr, *id),
ObjectInfo {
object_id: *id,
version: oref.1,
digest: oref.2,
type_,
owner: *owner,
previous_transaction: effects.transaction_digest,
},
));
}
Owner::ObjectOwner(owner) => {
let Some(o) = self.database.get_object_by_key(&oref.0, oref.1)? else{
continue;
};
let Some(df_info) = self.try_create_dynamic_field_info(o)? else{
// Skip indexing for non dynamic field objects.
continue;
};
new_dynamic_fields.push(((ObjectID::from(*owner), *id), df_info))
}
_ => {}
}
}

Ok(ObjectIndexChanges {
deleted_owners,
deleted_dynamic_fields,
new_owners,
new_dynamic_fields,
})
}

fn try_create_dynamic_field_info(&self, o: Object) -> SuiResult<Option<DynamicFieldInfo>> {
// Skip if not a move object
let Some(move_object) = o.data.try_as_move().cloned() else {
return Ok(None);
};
// We only index dynamic field objects
if !DynamicFieldInfo::is_dynamic_field(&move_object.type_) {
return Ok(None);
}
let move_struct = move_object.to_move_struct_with_resolver(
ObjectFormatOptions::default(),
self.module_cache.as_ref(),
)?;

let (name, type_, object_id) =
DynamicFieldInfo::parse_move_object(&move_struct).tap_err(|e| warn!("{e}"))?;

Ok(Some(match type_ {
DynamicFieldType::DynamicObject => {
// Find the actual object from storage using the object id obtained from the wrapper.
let Some(object) = self.database.find_object_lt_or_eq_version(object_id, o.version()) else{
return Err(SuiError::ObjectNotFound {
object_id,
version: Some(o.version()),
})
};
let version = object.version();
let digest = object.digest();
let object_type = object.data.type_().unwrap();

DynamicFieldInfo {
name,
type_,
object_type: object_type.to_string(),
object_id,
version,
digest,
}
}
DynamicFieldType::DynamicField { .. } => DynamicFieldInfo {
name,
type_,
object_type: move_object.type_.type_params[1].to_string(),
object_id: o.id(),
version: o.version(),
digest: o.digest(),
},
}))
}

#[instrument(level = "debug", skip_all, fields(seq=?seq, tx_digest=?digest), err)]
async fn post_process_one_tx(
&self,
Expand Down Expand Up @@ -1613,6 +1770,10 @@ impl AuthorityState {
let authority_state = Arc::downgrade(&state);
spawn_monitored_task!(execution_process(authority_state, rx_ready_certificates));

state
.create_owner_index_if_empty()
.expect("Error indexing genesis objects.");

state
}

Expand Down Expand Up @@ -1663,19 +1824,29 @@ impl AuthorityState {
None,
));

let index_store = Some(Arc::new(IndexStore::open_tables_read_write(
path.join("indexes"),
None,
None,
)));

// add the object_basics module
AuthorityState::new(
let state = AuthorityState::new(
secret.public().into(),
secret.clone(),
store,
node_sync_store,
epochs,
None,
index_store,
None,
None,
&Registry::new(),
)
.await
.await;

state.create_owner_index_if_empty().unwrap();

state
}

/// Adds certificates to the pending certificate store and transaction manager for ordered execution.
Expand Down Expand Up @@ -1724,6 +1895,38 @@ impl AuthorityState {
Ok(())
}

fn create_owner_index_if_empty(&self) -> SuiResult {
let Some(index_store) = &self.indexes else{
return Ok(())
};

let mut new_owners = vec![];
let mut new_dynamic_fields = vec![];
for (_, o) in self.database.perpetual_tables.objects.iter() {
match o.owner {
Owner::AddressOwner(addr) => new_owners.push((
(addr, o.id()),
ObjectInfo::new(&o.compute_object_reference(), &o),
)),
Owner::ObjectOwner(object_id) => {
let id = o.id();
let Some(info) = self.try_create_dynamic_field_info(o)? else{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not important, but I've been noticing on my PRs too, it doesn't seem like rustfmt likes let-else. There isn't a space in else{

Just noting and summoning my normal person for Rust questions, @bmwill. Do we need to do something else to configure rustfmt to handle let-else?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, unfortunately the let-else feature was stabilized without the corresponding formatting being added to rustfmt

continue;
};
new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
}
_ => {}
}
}

index_store.insert_genesis_objects(ObjectIndexChanges {
deleted_owners: vec![],
deleted_dynamic_fields: vec![],
new_owners,
new_dynamic_fields,
})
}

pub fn reconfigure(&self, new_committee: Committee) -> SuiResult {
// TODO: We should move the committee into epoch db store, so that the operation below
// can become atomic.
Expand Down Expand Up @@ -1861,15 +2064,62 @@ impl AuthorityState {
}
}

pub fn get_owner_objects(&self, owner: Owner) -> SuiResult<Vec<ObjectInfo>> {
self.database.get_owner_objects(owner)
fn get_owner_at_version(
&self,
object_id: &ObjectID,
version: SequenceNumber,
) -> Result<Owner, SuiError> {
self.database
.get_object_by_key(object_id, version)?
.ok_or(SuiError::ObjectNotFound {
object_id: *object_id,
version: Some(version),
})
.map(|o| o.owner)
}

pub fn get_owner_objects(&self, owner: SuiAddress) -> SuiResult<Vec<ObjectInfo>> {
if let Some(indexes) = &self.indexes {
indexes.get_owner_objects(owner)
} else {
Err(SuiError::IndexStoreNotAvailable)
}
}

pub fn get_owner_objects_iterator(
&self,
owner: Owner,
owner: SuiAddress,
) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
self.database.get_owner_objects_iterator(owner)
if let Some(indexes) = &self.indexes {
indexes.get_owner_objects_iterator(owner)
} else {
Err(SuiError::IndexStoreNotAvailable)
}
}

pub fn get_dynamic_fields(
&self,
owner: ObjectID,
cursor: Option<ObjectID>,
limit: usize,
) -> SuiResult<Vec<DynamicFieldInfo>> {
if let Some(indexes) = &self.indexes {
indexes.get_dynamic_fields(owner, cursor, limit)
} else {
Err(SuiError::IndexStoreNotAvailable)
}
}

pub fn get_dynamic_field_object_id(
&self,
owner: ObjectID,
name: &str,
) -> SuiResult<Option<ObjectID>> {
if let Some(indexes) = &self.indexes {
indexes.get_dynamic_field_object_id(owner, name)
} else {
Err(SuiError::IndexStoreNotAvailable)
}
}

pub fn get_total_transaction_number(&self) -> Result<u64, anyhow::Error> {
Expand Down
12 changes: 12 additions & 0 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,18 @@ impl AuthorityStore {
self.lock_service.initialize_locks(&old_locks, true).await?;
Ok(())
}
/// Return the object with version less then or eq to the provided seq number.
/// This is used by indexer to find the correct version of dynamic field child object.
/// We do not store the version of the child object, but because of lamport timestamp,
/// we know the child must have version number less then or eq to the parent.
pub fn find_object_lt_or_eq_version(
&self,
object_id: ObjectID,
version: SequenceNumber,
) -> Option<Object> {
self.perpetual_tables
.find_object_lt_or_eq_version(object_id, version)
}

/// Returns the last entry we have for this object in the parents_sync index used
/// to facilitate client and authority sync. In turn the latest entry provides the
Expand Down
16 changes: 16 additions & 0 deletions crates/sui-core/src/authority/authority_store_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,22 @@ impl AuthorityPerpetualTables {
}
}

// This is used by indexer to find the correct version of dynamic field child object.
// We do not store the version of the child object, but because of lamport timestamp,
// we know the child must have version number less then or eq to the parent.
pub fn find_object_lt_or_eq_version(
&self,
object_id: ObjectID,
version: SequenceNumber,
) -> Option<Object> {
let Ok(iter) = self.objects
.iter()
.skip_prior_to(&ObjectKey(object_id, version))else {
return None
};
iter.reverse().next().map(|(_, o)| o)
}

pub fn get_latest_parent_entry(
&self,
object_id: ObjectID,
Expand Down
Loading