Skip to content

Commit

Permalink
Enhance/cleanup unconfirmed execution (#375)
Browse files Browse the repository at this point in the history
* Unify tx execution and cleanup storage fns
  • Loading branch information
oxade committed Feb 8, 2022
1 parent c9dab7b commit 82f1283
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 285 deletions.
17 changes: 8 additions & 9 deletions fastpay/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,12 @@ fn find_cached_owner_by_object_id(
.map(|acc| &acc.address)
}

fn show_object_effects(order_effects: OrderEffects) {
fn show_object_effects(order_info_resp: OrderInfoResponse) {
let order_effects = order_info_resp.signed_effects.unwrap().effects;

if order_effects.status != ExecutionStatus::Success {
error!("Error publishing module: {:#?}", order_effects.status);
}
if !order_effects.created.is_empty() {
println!("Created Objects:");
for (obj, _) in order_effects.created {
Expand Down Expand Up @@ -514,13 +519,7 @@ fn main() {
let pub_resp = client_state.publish(path, gas_obj_ref).await;

match pub_resp {
Ok(resp) => {
if resp.1.status != ExecutionStatus::Success {
error!("Error publishing module: {:#?}", resp.1.status);
}
let (_, effects) = resp;
show_object_effects(effects);
}
Ok(resp) => show_object_effects(resp.1),
Err(err) => error!("{:#?}", err),
}
});
Expand Down Expand Up @@ -676,7 +675,7 @@ fn main() {
.await;
info!("Starting transfer");
let time_start = Instant::now();
let cert = client_state
let (cert, _) = client_state
.transfer_object(object_id, gas_object_id, to)
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion fastpay_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ move-package = { git = "https://github.com/diem/move", rev="62b5a5075378ae6a7102
move-vm-runtime = { git = "https://github.com/diem/move", rev="62b5a5075378ae6a7102bbfc1fb33b57ba6125d2" }


typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "5221b8ff87e18a1bff2ffe930711685d54c04062"}
typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "e62e090a0864ed35ecd4a2766fdfc6f5ef18f58a"}

[dev-dependencies]
fdlimit = "0.2.1"
Expand Down
85 changes: 46 additions & 39 deletions fastpay_core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,43 +829,63 @@ where
})
}

/// Broadcast missing confirmation orders and invoke handle_order on each authority client.
async fn broadcast_and_handle_order(
/// Broadcast transaction order on each authority client.
async fn broadcast_tx_order(
&self,
order: Order,
) -> Result<(Vec<(CertifiedOrder, OrderInfoResponse)>, CertifiedOrder), anyhow::Error> {
) -> Result<(OrderInfoResponse, CertifiedOrder), anyhow::Error> {
let committee = self.committee.clone();
let (responses, votes) = self
// We are not broadcasting any confirmation orders, so certificates_to_broadcast vec is empty
let (_confirmation_responses, order_votes) = self
.broadcast_and_execute(Vec::new(), |name, authority| {
let order = order.clone();
let committee = committee.clone();
Box::pin(async move {
match authority.handle_order(order).await {
Ok(OrderInfoResponse {
signed_order: Some(inner_signed_order),
..
}) => {
fp_ensure!(
inner_signed_order.authority == name,
FastPayError::ErrorWhileProcessingTransferOrder
);
inner_signed_order.check(&committee)?;
Ok((inner_signed_order.authority, inner_signed_order.signature))
// Check if the response is okay
Ok(response) =>
// Verify we have a signed order
{
match response.clone().signed_order {
Some(inner_signed_order) => {
fp_ensure!(
inner_signed_order.authority == name,
FastPayError::ErrorWhileProcessingTransactionOrder {
err: "Signed by unexpected authority".to_string()
}
);
inner_signed_order.check(&committee)?;
Ok(response)
}
None => Err(FastPayError::ErrorWhileProcessingTransactionOrder {
err: "Invalid order response".to_string(),
}),
}
}
Err(err) => Err(err),
_ => Err(FastPayError::ErrorWhileProcessingTransferOrder),
}
})
})
.await?;
let certificate = CertifiedOrder {
order,
signatures: votes,
};
// Collate the signatures
// If we made it here, values are safe
let signatures = order_votes
.iter()
.map(|vote| {
(
vote.signed_order.as_ref().unwrap().authority,
vote.signed_order.as_ref().unwrap().signature,
)
})
.collect::<Vec<_>>();

let certificate = CertifiedOrder { order, signatures };
// Certificate is valid because
// * `communicate_with_quorum` ensured a sufficient "weight" of (non-error) answers were returned by authorities.
// * each answer is a vote signed by the expected authority.
Ok((responses, certificate))

// Assume all responses are same. Pick first
Ok((order_votes.get(0).unwrap().clone(), certificate))
}

/// Broadcast missing confirmation orders and execute provided authority action on each authority.
Expand All @@ -880,7 +900,7 @@ where
) -> Result<(Vec<(CertifiedOrder, OrderInfoResponse)>, Vec<V>), anyhow::Error>
where
F: Fn(AuthorityName, &'a A) -> AsyncResult<'a, V, FastPayError> + Send + Sync + Copy,
V: Copy,
V: Clone,
{
let result = self
.communicate_with_quorum(|name, client| {
Expand All @@ -900,7 +920,7 @@ where
})
.await?;

let action_results = result.iter().map(|(_, result)| *result).collect();
let action_results = result.iter().map(|(_, result)| result.clone()).collect();

// Assume all responses are the same, pick the first one.
let order_response = result
Expand Down Expand Up @@ -1024,7 +1044,7 @@ where
) -> Result<(CertifiedOrder, OrderInfoResponse), anyhow::Error> {
let new_certificate = self.execute_transaction_without_confirmation(order).await?;

// Confirm last transfer certificate if needed.
// Confirm transfer certificate if specified.
let responses = self
.broadcast_confirmation_orders(vec![new_certificate.clone()])
.await?;
Expand All @@ -1038,29 +1058,16 @@ where
Ok((new_certificate, response))
}

/// Exposing `execute_transaction_without_confirmation` as a public API
/// so that it can be called by `transfer_to_fastx_unsafe_unconfirmed`.
/// This function alone does not properly lock/unlock pending orders,
/// and calls to it should be avoided unless absolutely necessary.
/// And caller will need to properly lock/unlock pending orders.
pub async fn execute_transaction_without_confirmation_unsafe(
&self,
order: &Order,
) -> Result<CertifiedOrder, anyhow::Error> {
self.execute_transaction_without_confirmation(order).await
}

/// Execute (or retry) an order without confirmation. Update local object states using newly created certificate.
async fn execute_transaction_without_confirmation(
&self,
order: &Order,
) -> Result<CertifiedOrder, anyhow::Error> {
let result = self.broadcast_and_handle_order(order.clone()).await;
let result = self.broadcast_tx_order(order.clone()).await;

// order_info_response contains response from broadcasting old unconfirmed order, if any.
let (_order_info_responses, new_sent_certificate) = result?;
let (_, new_sent_certificate) = result?;
assert_eq!(&new_sent_certificate.order, order);
// TODO: Verify that we don't need to update client objects here based on _order_info_responses,
// TODO: Verify that we don't need to update client objects here based on order_info_responses,
// but can do it at the caller site.

Ok(new_sent_certificate)
Expand Down
Loading

1 comment on commit 82f1283

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bench results

�[0m�[0m�[1m�[32m Finished�[0m release [optimized + debuginfo] target(s) in 1.40s
�[0m�[0m�[1m�[32m Running�[0m target/release/bench
�[2m2022-02-08T01:01:16.093454Z�[0m �[32m INFO�[0m �[2mbench�[0m�[2m:�[0m Starting benchmark: OrdersAndCerts
�[2m2022-02-08T01:01:16.093477Z�[0m �[32m INFO�[0m �[2mbench�[0m�[2m:�[0m Preparing accounts.
�[2m2022-02-08T01:01:16.095526Z�[0m �[32m INFO�[0m �[2mbench�[0m�[2m:�[0m Open database on path: "/tmp/DB_8870799CB457C13ED85DBC877E1A4F16B7812EA0"
�[2m2022-02-08T01:01:20.923446Z�[0m �[32m INFO�[0m �[2mbench�[0m�[2m:�[0m Preparing transactions.
�[2m2022-02-08T01:01:30.139946Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m Listening to TCP traffic on 127.0.0.1:9555
�[2m2022-02-08T01:01:31.141408Z�[0m �[32m INFO�[0m �[2mbench�[0m�[2m:�[0m Number of TCP connections: 2
�[2m2022-02-08T01:01:31.141437Z�[0m �[32m INFO�[0m �[2mbench�[0m�[2m:�[0m Set max_in_flight to 500
�[2m2022-02-08T01:01:31.141441Z�[0m �[32m INFO�[0m �[2mbench�[0m�[2m:�[0m Sending requests.
�[2m2022-02-08T01:01:31.145526Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m Sending TCP requests to 127.0.0.1:9555
�[2m2022-02-08T01:01:31.148998Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m Sending TCP requests to 127.0.0.1:9555
�[2m2022-02-08T01:01:31.963300Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 5000 packets
�[2m2022-02-08T01:01:32.686098Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m In flight 500 Remaining 35000
�[2m2022-02-08T01:01:32.697972Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m In flight 500 Remaining 35000
�[2m2022-02-08T01:01:32.771590Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 10000 packets
�[2m2022-02-08T01:01:33.577894Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 15000 packets
�[2m2022-02-08T01:01:34.301173Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m In flight 500 Remaining 30000
�[2m2022-02-08T01:01:34.369603Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m In flight 500 Remaining 30000
�[2m2022-02-08T01:01:34.394160Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 20000 packets
�[2m2022-02-08T01:01:35.205995Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 25000 packets
�[2m2022-02-08T01:01:35.877993Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m In flight 500 Remaining 25000
�[2m2022-02-08T01:01:35.982033Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m In flight 500 Remaining 25000
�[2m2022-02-08T01:01:36.017966Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 30000 packets
�[2m2022-02-08T01:01:36.831074Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 35000 packets
�[2m2022-02-08T01:01:37.544295Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m In flight 500 Remaining 20000
�[2m2022-02-08T01:01:37.621343Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m In flight 500 Remaining 20000
�[2m2022-02-08T01:01:37.648624Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 40000 packets
�[2m2022-02-08T01:01:38.468761Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 45000 packets
�[2m2022-02-08T01:01:39.170400Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m In flight 500 Remaining 15000
�[2m2022-02-08T01:01:39.265740Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m In flight 500 Remaining 15000
�[2m2022-02-08T01:01:39.287254Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 50000 packets
�[2m2022-02-08T01:01:40.108057Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 55000 packets
�[2m2022-02-08T01:01:40.760684Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m In flight 500 Remaining 10000
�[2m2022-02-08T01:01:40.939234Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 60000 packets
�[2m2022-02-08T01:01:40.956892Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m In flight 500 Remaining 10000
�[2m2022-02-08T01:01:41.778359Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 65000 packets
�[2m2022-02-08T01:01:42.459162Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m In flight 500 Remaining 5000
�[2m2022-02-08T01:01:42.586827Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m In flight 500 Remaining 5000
�[2m2022-02-08T01:01:42.599557Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 70000 packets
�[2m2022-02-08T01:01:43.412437Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 75000 packets
�[2m2022-02-08T01:01:44.181136Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m Done sending TCP requests to 127.0.0.1:9555
�[2m2022-02-08T01:01:44.318023Z�[0m �[32m INFO�[0m �[2mfastpay_core::authority_server�[0m�[2m:�[0m 127.0.0.1:9555 has processed 80000 packets
�[2m2022-02-08T01:01:44.361841Z�[0m �[32m INFO�[0m �[2mfastx_network::network�[0m�[2m:�[0m Done sending TCP requests to 127.0.0.1:9555
�[2m2022-02-08T01:01:44.363623Z�[0m �[32m INFO�[0m �[2mbench�[0m�[2m:�[0m Received 80000 responses.
�[2m2022-02-08T01:01:44.561671Z�[0m �[33m WARN�[0m �[2mbench�[0m�[2m:�[0m Completed benchmark for OrdersAndCerts
Total time: 13222169us, items: 40000, tx/sec: 3025.2222611887655

Please sign in to comment.