Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance/cleanup unconfirmed execution #375

Merged
merged 9 commits into from
Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions fastpay/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,12 @@ fn find_cached_owner_by_object_id(
.map(|acc| &acc.address)
}

fn show_object_effects(order_effects: OrderEffects) {
fn show_object_effects(order_info_resp: OrderInfoResponse) {
let order_effects = order_info_resp.signed_effects.unwrap().effects;

if order_effects.status != ExecutionStatus::Success {
error!("Error publishing module: {:#?}", order_effects.status);
}
if !order_effects.created.is_empty() {
println!("Created Objects:");
for (obj, _) in order_effects.created {
Expand Down Expand Up @@ -514,13 +519,7 @@ fn main() {
let pub_resp = client_state.publish(path, gas_obj_ref).await;

match pub_resp {
Ok(resp) => {
if resp.1.status != ExecutionStatus::Success {
error!("Error publishing module: {:#?}", resp.1.status);
}
let (_, effects) = resp;
show_object_effects(effects);
}
Ok(resp) => show_object_effects(resp.1),
Err(err) => error!("{:#?}", err),
}
});
Expand Down Expand Up @@ -676,7 +675,7 @@ fn main() {
.await;
info!("Starting transfer");
let time_start = Instant::now();
let cert = client_state
let (cert, _) = client_state
.transfer_object(object_id, gas_object_id, to)
.await
.unwrap();
Expand All @@ -699,7 +698,10 @@ fn main() {
recv_timeout,
)
.await;
recipient_client_state.receive_object(&cert).await.unwrap();
recipient_client_state
.update_state_by_certificate(&cert)
.await
.unwrap();
accounts_config.update_from_state(&recipient_client_state);
accounts_config
.write(accounts_config_path)
Expand Down
2 changes: 1 addition & 1 deletion fastpay_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ move-package = { git = "https://github.com/diem/move", rev="62b5a5075378ae6a7102
move-vm-runtime = { git = "https://github.com/diem/move", rev="62b5a5075378ae6a7102bbfc1fb33b57ba6125d2" }


typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "5221b8ff87e18a1bff2ffe930711685d54c04062"}
typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "e62e090a0864ed35ecd4a2766fdfc6f5ef18f58a"}

[dev-dependencies]
fdlimit = "0.2.1"
Expand Down
91 changes: 49 additions & 42 deletions fastpay_core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,43 +829,63 @@ where
})
}

/// Broadcast missing confirmation orders and invoke handle_order on each authority client.
async fn broadcast_and_handle_order(
/// Broadcast transaction order on each authority client.
async fn broadcast_tx_order(
&self,
order: Order,
) -> Result<(Vec<(CertifiedOrder, OrderInfoResponse)>, CertifiedOrder), anyhow::Error> {
) -> Result<(OrderInfoResponse, CertifiedOrder), anyhow::Error> {
let committee = self.committee.clone();
let (responses, votes) = self
// We are not broadcasting any confirmation orders, so certificates_to_broadcast vec is empty
let (_confirmation_responses, order_votes) = self
.broadcast_and_execute(Vec::new(), |name, authority| {
let order = order.clone();
let committee = committee.clone();
Box::pin(async move {
match authority.handle_order(order).await {
Ok(OrderInfoResponse {
signed_order: Some(inner_signed_order),
..
}) => {
fp_ensure!(
inner_signed_order.authority == name,
FastPayError::ErrorWhileProcessingTransferOrder
);
inner_signed_order.check(&committee)?;
Ok((inner_signed_order.authority, inner_signed_order.signature))
// Check if the response is okay
Ok(response) =>
// Verify we have a signed order
{
match response.clone().signed_order {
Some(inner_signed_order) => {
fp_ensure!(
inner_signed_order.authority == name,
FastPayError::ErrorWhileProcessingTransactionOrder {
err: "Signed by unexpected authority".to_string()
}
);
inner_signed_order.check(&committee)?;
Ok(response)
}
None => Err(FastPayError::ErrorWhileProcessingTransactionOrder {
err: "Invalid order response".to_string(),
}),
oxade marked this conversation as resolved.
Show resolved Hide resolved
}
}
Err(err) => Err(err),
_ => Err(FastPayError::ErrorWhileProcessingTransferOrder),
}
})
})
.await?;
let certificate = CertifiedOrder {
order,
signatures: votes,
};
// Collate the signatures
// If we made it here, values are safe
let signatures = order_votes
.iter()
.map(|vote| {
(
vote.signed_order.as_ref().unwrap().authority,
vote.signed_order.as_ref().unwrap().signature,
)
})
.collect::<Vec<_>>();

let certificate = CertifiedOrder { order, signatures };
Comment on lines +873 to +882
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why do we have to do this here instead of keeping the old logic (match returns the pairs)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The responses from the old code will always be empty because it's the output of the catch-up confirmation steps, which tx_order does not do anymore. It's a bit misleading.

// Certificate is valid because
// * `communicate_with_quorum` ensured a sufficient "weight" of (non-error) answers were returned by authorities.
// * each answer is a vote signed by the expected authority.
Ok((responses, certificate))

// Assume all responses are same. Pick first
oxade marked this conversation as resolved.
Show resolved Hide resolved
Ok((order_votes.get(0).unwrap().clone(), certificate))
}

/// Broadcast missing confirmation orders and execute provided authority action on each authority.
Expand All @@ -880,7 +900,7 @@ where
) -> Result<(Vec<(CertifiedOrder, OrderInfoResponse)>, Vec<V>), anyhow::Error>
where
F: Fn(AuthorityName, &'a A) -> AsyncResult<'a, V, FastPayError> + Send + Sync + Copy,
V: Copy,
V: Clone,
{
let result = self
.communicate_with_quorum(|name, client| {
Expand All @@ -900,7 +920,7 @@ where
})
.await?;

let action_results = result.iter().map(|(_, result)| *result).collect();
let action_results = result.iter().map(|(_, result)| result.clone()).collect();

// Assume all responses are the same, pick the first one.
let order_response = result
Expand Down Expand Up @@ -1022,9 +1042,9 @@ where
&self,
order: &Order,
) -> Result<(CertifiedOrder, OrderInfoResponse), anyhow::Error> {
let new_certificate = self.execute_transaction_without_confirmation(order).await?;
let (new_certificate, _) = self.execute_transaction_without_confirmation(order).await?;

// Confirm last transfer certificate if needed.
// Confirm transfer certificate if specified.
let responses = self
.broadcast_confirmation_orders(vec![new_certificate.clone()])
.await?;
Expand All @@ -1038,32 +1058,19 @@ where
Ok((new_certificate, response))
}

/// Exposing `execute_transaction_without_confirmation` as a public API
/// so that it can be called by `transfer_to_fastx_unsafe_unconfirmed`.
/// This function alone does not properly lock/unlock pending orders,
/// and calls to it should be avoided unless absolutely necessary.
/// And caller will need to properly lock/unlock pending orders.
pub async fn execute_transaction_without_confirmation_unsafe(
&self,
order: &Order,
) -> Result<CertifiedOrder, anyhow::Error> {
self.execute_transaction_without_confirmation(order).await
}

/// Execute (or retry) an order without confirmation. Update local object states using newly created certificate.
async fn execute_transaction_without_confirmation(
&self,
order: &Order,
) -> Result<CertifiedOrder, anyhow::Error> {
let result = self.broadcast_and_handle_order(order.clone()).await;
) -> Result<(CertifiedOrder, OrderInfoResponse), anyhow::Error> {
oxade marked this conversation as resolved.
Show resolved Hide resolved
let result = self.broadcast_tx_order(order.clone()).await;

// order_info_response contains response from broadcasting old unconfirmed order, if any.
let (_order_info_responses, new_sent_certificate) = result?;
let (order_info_responses, new_sent_certificate) = result?;
assert_eq!(&new_sent_certificate.order, order);
// TODO: Verify that we don't need to update client objects here based on _order_info_responses,
// TODO: Verify that we don't need to update client objects here based on order_info_responses,
// but can do it at the caller site.

Ok(new_sent_certificate)
Ok((new_sent_certificate, order_info_responses))
}

// TODO: This is incomplete at the moment.
Expand Down
Loading