Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fastx client / auth] Add OrderInfoRequest to authority and use it for more robust sync. #311

Merged
merged 9 commits into from
Jan 31, 2022
Merged
6 changes: 6 additions & 0 deletions fastpay/src/server_lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ impl MessageHandler for RunningServerState {
.handle_object_info_request(*message)
.await
.map(|info| Some(serialize_object_info_response(&info))),
SerializedMessage::OrderInfoReq(message) => self
.server
.state
.handle_order_info_request(*message)
.await
.map(|info| Some(serialize_order_info(&info))),
_ => Err(FastPayError::UnexpectedMessage),
}
}
Expand Down
18 changes: 17 additions & 1 deletion fastpay_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,17 +235,26 @@ impl AuthorityState {
.map(|(_, object)| object)
.collect();

let mut transaction_dependencies: BTreeSet<_> = inputs
.iter()
.map(|object| object.previous_transaction)
.collect();

// Insert into the certificates map
let mut tx_ctx = TxContext::new(order.sender(), transaction_digest);

let gas_object_id = *order.gas_payment_object_id();
let (temporary_store, status) = self.execute_order(order, inputs, &mut tx_ctx)?;

// Remove from dependencies the generic hash
transaction_dependencies.remove(&TransactionDigest::genesis());

// Update the database in an atomic manner
let to_signed_effects = temporary_store.to_signed_effects(
&self.name,
&self.secret,
&transaction_digest,
transaction_dependencies.into_iter().collect(),
status,
&gas_object_id,
);
Expand All @@ -259,7 +268,7 @@ impl AuthorityState {
mut inputs: Vec<Object>,
tx_ctx: &mut TxContext,
) -> FastPayResult<(AuthorityTemporaryStore, ExecutionStatus)> {
let mut temporary_store = AuthorityTemporaryStore::new(self, &inputs);
let mut temporary_store = AuthorityTemporaryStore::new(self, &inputs, tx_ctx.digest());
// unwraps here are safe because we built `inputs`
let mut gas_object = inputs.pop().unwrap();

Expand Down Expand Up @@ -345,6 +354,13 @@ impl AuthorityState {
Ok(ExecutionStatus::Success)
}

pub async fn handle_order_info_request(
&self,
request: OrderInfoRequest,
) -> Result<OrderInfoResponse, FastPayError> {
self.make_order_info(&request.transaction_digest).await
}

pub async fn handle_account_info_request(
&self,
request: AccountInfoRequest,
Expand Down
10 changes: 9 additions & 1 deletion fastpay_core/src/authority/temporary_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub type InnerTemporaryStore = (

pub struct AuthorityTemporaryStore {
object_store: Arc<AuthorityStore>,
tx_digest: TransactionDigest,
objects: BTreeMap<ObjectID, Object>,
active_inputs: Vec<ObjectRef>, // Inputs that are not read only
// TODO: We need to study whether it's worth to optimize the lookup of
Expand All @@ -29,9 +30,11 @@ impl AuthorityTemporaryStore {
pub fn new(
authority_state: &AuthorityState,
_input_objects: &'_ [Object],
tx_digest: TransactionDigest,
) -> AuthorityTemporaryStore {
AuthorityTemporaryStore {
object_store: authority_state._database.clone(),
tx_digest,
objects: _input_objects.iter().map(|v| (v.id(), v.clone())).collect(),
active_inputs: _input_objects
.iter()
Expand Down Expand Up @@ -92,6 +95,7 @@ impl AuthorityTemporaryStore {
authority_name: &AuthorityName,
secret: &KeyPair,
transaction_digest: &TransactionDigest,
transaction_dependencies: Vec<TransactionDigest>,
status: ExecutionStatus,
gas_object_id: &ObjectID,
) -> SignedOrderEffects {
Expand Down Expand Up @@ -119,6 +123,7 @@ impl AuthorityTemporaryStore {
.collect(),
gas_object: (gas_object.to_object_reference(), gas_object.owner),
events: self.events.clone(),
dependencies: transaction_dependencies,
};
let signature = Signature::new(&effects, secret);

Expand Down Expand Up @@ -196,7 +201,7 @@ impl Storage for AuthorityTemporaryStore {
caller.
*/

fn write_object(&mut self, object: Object) {
fn write_object(&mut self, mut object: Object) {
// Check it is not read-only
#[cfg(test)] // Movevm should ensure this
if let Some(existing_object) = self.read_object(&object.id()) {
Expand All @@ -207,6 +212,9 @@ impl Storage for AuthorityTemporaryStore {
}
}

// The adapter is not very disciplined at filling in the correct
// previous transaction digest, so we ensure it is correct here.
object.previous_transaction = self.tx_digest;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Doesn't have to be here, but I think we should also move the increment of version here to ensure that all the metadata update happens in one place (and I'm happy to tackle this--should be a small change). That will get us closer to a coherent discipline.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My only worry is that objects before they are written to the temp_store are going around with the incorrect previous_transaction. As per issue: #312

self.written.insert(object.id(), object);
}

Expand Down
90 changes: 63 additions & 27 deletions fastpay_core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ pub trait AuthorityClient {
&self,
request: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, FastPayError>;

/// Handle Object information requests for this account.
async fn handle_order_info_request(
&self,
request: OrderInfoRequest,
) -> Result<OrderInfoResponse, FastPayError>;
}

#[async_trait]
Expand Down Expand Up @@ -99,6 +105,18 @@ impl AuthorityClient for network::Client {
)
.await
}

/// Handle Object information requests for this account.
async fn handle_order_info_request(
&self,
request: OrderInfoRequest,
) -> Result<OrderInfoResponse, FastPayError> {
self.send_recv_bytes(
serialize_order_info_request(&request),
order_info_deserializer,
)
.await
}
}

pub struct ClientState<AuthorityClient> {
Expand Down Expand Up @@ -402,7 +420,7 @@ where
source_authority: AuthorityName,
destination_authority: AuthorityName,
) -> Result<(), FastPayError> {
let source_client = self.authority_clients[&source_authority].clone();
let mut source_client = self.authority_clients[&source_authority].clone();
let mut destination_client = self.authority_clients[&destination_authority].clone();

// This represents a stack of certificates that we need to register with the
Expand Down Expand Up @@ -444,40 +462,58 @@ where
// TODO: Eventually the client will store more information, and we could
// first try to read certificates and parents from a local cache before
// asking an authority.
let input_objects = target_cert.certificate.order.input_objects();
// let input_objects = target_cert.certificate.order.input_objects();

let order_info = if missing_certificates.is_empty() {
// Here we cover a corner case due to the nature of using consistent
// broadcast: it is possible for the client to have a certificate
// signed by some authority, before the authority has processed the
// certificate. This can only happen to a certificate for objects
// not used in another certificicate, hence it can only be the case
// for the very first certificate we try to sync. For this reason for
// this one instead of asking for the effects of a previous execution
// we send the cert for execution. Since execution is idempotent this
// is ok.

source_client
.handle_confirmation_order(target_cert.clone())
.await?
} else {
// Unlike the previous case if a certificate created an object that
// was involved in the processing of another certificate the previous
// cert must have been processed, so here we just ask for the effects
// of such an execution.

source_client
.handle_order_info_request(OrderInfoRequest {
transaction_digest: cert_digest,
})
.await?
};

// Put back the target cert
missing_certificates.push(target_cert);
let signed_effects = &order_info
.signed_effects
.ok_or(FastPayError::AuthorityInformationUnavailable)?;

for object_kind in input_objects {
// Request the parent certificate from the authority.
let object_info_response = source_client
.handle_object_info_request(ObjectInfoRequest {
object_id: object_kind.object_id(),
request_sequence_number: Some(object_kind.version()),
})
.await;

let object_info = match object_info_response {
Ok(object_info) => object_info,
// Here we cover the case the object genuinely has no parent.
Err(FastPayError::ParentNotfound { .. }) => {
continue;
}
Err(e) => return Err(e),
};

let returned_certificate = object_info
.parent_certificate
.ok_or(FastPayError::AuthorityInformationUnavailable)?;
let returned_digest = returned_certificate.order.digest();

for returned_digest in &signed_effects.effects.dependencies {
// We check that we are not processing twice the same certificate, as
// it would be common if two objects used by one order, were also both
// mutated by the same preceeding order.
if !candidate_certificates.contains(&returned_digest) {
if !candidate_certificates.contains(returned_digest) {
// Add this cert to the set we have processed
candidate_certificates.insert(returned_digest);
candidate_certificates.insert(*returned_digest);

let inner_order_info = source_client
.handle_order_info_request(OrderInfoRequest {
transaction_digest: *returned_digest,
})
.await?;

let returned_certificate = inner_order_info
.certified_order
.ok_or(FastPayError::AuthorityInformationUnavailable)?;

// Check & Add it to the list of certificates to sync
returned_certificate.check(&self.committee).map_err(|_| {
Expand Down
10 changes: 10 additions & 0 deletions fastpay_core/src/unit_tests/authority_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,16 @@ async fn test_handle_confirmation_order_idempotent() {

// this is valid because we're checking the authority state does not change the certificate
compare_order_info_responses(&info, &info2);

// Now check the order info request is also the same
let info3 = authority_state
.handle_order_info_request(OrderInfoRequest {
transaction_digest: certified_transfer_order.order.digest(),
})
.await
.unwrap();

compare_order_info_responses(&info, &info3);
}

#[tokio::test]
Expand Down
Loading