From a45b305f6dbd14df6f87dfa622b1b05025118ae9 Mon Sep 17 00:00:00 2001 From: sadhan Date: Tue, 13 Sep 2022 12:57:45 -0700 Subject: [PATCH] Do not wait any longer once quorum threshold is reached in handle_certificate calls --- crates/sui-core/src/authority_aggregator.rs | 101 ++++++++++-------- crates/sui-core/src/authority_client.rs | 36 ++++--- .../unit_tests/authority_aggregator_tests.rs | 97 +++++++++++++++-- crates/sui-types/src/error.rs | 2 + 4 files changed, 173 insertions(+), 63 deletions(-) diff --git a/crates/sui-core/src/authority_aggregator.rs b/crates/sui-core/src/authority_aggregator.rs index bf63e4704d425..a4828a04be8d2 100644 --- a/crates/sui-core/src/authority_aggregator.rs +++ b/crates/sui-core/src/authority_aggregator.rs @@ -278,14 +278,11 @@ where skip_all )] async fn sync_authority_source_to_destination( - &self, + source_client: SafeClient, cert: CertifiedTransaction, source_authority: AuthorityName, cert_handler: &CertHandler, ) -> Result<(), SuiError> { - // TODO(panic): this panics - let source_client = self.authority_clients[&source_authority].clone(); - // This represents a stack of certificates that we need to register with the // destination authority. The stack is a LIFO queue, and therefore later insertions // represent certificates that earlier insertions depend on. Thus updating an @@ -408,18 +405,40 @@ where cert: CertifiedTransaction, destination_authority: AuthorityName, retries: usize, - ) -> Result<(), SuiError> { - self.sync_certificate_to_authority_with_timeout( - cert, - destination_authority, - self.timeouts.authority_request_timeout, - retries, - ) - .await + authority_timeout: Duration, + total_timeout: Duration, + ) -> Result { + let committee = self.committee.clone(); + let client = self + .authority_clients + .get(&destination_authority) + .unwrap() + .clone(); + let authority_clients = self.authority_clients.clone(); + if let Ok(res) = timeout(total_timeout, tokio::spawn(async move { + Self::sync_certificate_to_authority_with_timeout( + &committee, + &authority_clients, + cert.clone(), + destination_authority, + authority_timeout, + retries, + ) + .await?; + client.handle_certificate(cert).instrument(tracing::trace_span!("handle_cert_after_sync", authority =? destination_authority, retry = true)).await + })).await { + res.map_err(|e| SuiError::CertificateSyncError { + authority_name: destination_authority.to_string(), + err: e.to_string(), + })? + } else { + Err(SuiError::TimeoutError) + } } async fn sync_certificate_to_authority_with_timeout( - &self, + committee: &Committee, + authority_clients: &BTreeMap>, cert: CertifiedTransaction, destination_authority: AuthorityName, timeout_period: Duration, @@ -427,12 +446,14 @@ where ) -> Result<(), SuiError> { let cert_handler = RemoteCertificateHandler { destination_authority, - destination_client: self.authority_clients[&destination_authority].clone(), + destination_client: authority_clients[&destination_authority].clone(), }; debug!(cert =? cert.digest(), dest_authority =? destination_authority, "Syncing certificate to dest authority"); - self.sync_certificate_to_authority_with_timeout_inner( + Self::sync_certificate_to_authority_with_timeout_inner( + committee, + authority_clients, cert, destination_authority, &cert_handler, @@ -450,7 +471,8 @@ where /// the certificate. The time devoted to each attempt is bounded by /// `timeout_milliseconds`. async fn sync_certificate_to_authority_with_timeout_inner( - &self, + committee: &Committee, + authority_clients: &BTreeMap>, cert: CertifiedTransaction, destination_authority: AuthorityName, cert_handler: &CertHandler, @@ -461,7 +483,7 @@ where // and its full history. We should be able to use these are source authorities. let mut candidate_source_authorties: HashSet = cert .auth_sign_info - .authorities(&self.committee) + .authorities(committee) .collect::>>()? .iter() .map(|&&name| name) @@ -474,7 +496,7 @@ where // // TODO: add a filter parameter to sample, so that we can directly // sample from a subset which is more efficient. - let sample_authority = self.committee.sample(); + let sample_authority = committee.sample(); if candidate_source_authorties.contains(sample_authority) { candidate_source_authorties.remove(sample_authority); source_authorities.push(*sample_authority); @@ -487,8 +509,9 @@ where // Note: here we could improve this function by passing into the // `sync_authority_source_to_destination` call a cache of // certificates and parents to avoid re-downloading them. - - let sync_fut = self.sync_authority_source_to_destination( + let source_client = authority_clients[&source_authority].clone(); + let sync_fut = Self::sync_authority_source_to_destination( + source_client, cert.clone(), source_authority, cert_handler, @@ -516,8 +539,8 @@ where }; // Report the error to both authority clients. - let source_client = &self.authority_clients[&source_authority]; - let destination_client = &self.authority_clients[&destination_authority]; + let source_client = &authority_clients[&source_authority]; + let destination_client = &authority_clients[&destination_authority]; source_client.report_client_error(&inner_err); destination_client.report_client_error(&inner_err); @@ -1175,7 +1198,13 @@ where // NOTE: this is right now done sequentially, we should do them in parallel using // the usual FuturesUnordered. let _result = self - .sync_certificate_to_authority(cert.clone(), name, DEFAULT_RETRIES) + .sync_certificate_to_authority( + cert.clone(), + name, + DEFAULT_RETRIES, + self.timeouts.authority_request_timeout, + self.timeouts.pre_quorum_timeout, + ) .await; // TODO: collect errors and propagate them to the right place @@ -1422,7 +1451,7 @@ where errors: vec![], }; - let tx_digest = certificate.digest(); + let tx_digest = *certificate.digest(); let timeout_after_quorum = self.timeouts.post_quorum_timeout; let cert_ref = &certificate; @@ -1435,7 +1464,6 @@ where ?timeout_after_quorum, "Broadcasting certificate to authorities" ); - let state = self .quorum_map_then_reduce_with_timeout( state, @@ -1476,24 +1504,17 @@ where } debug!(authority =? name, error =? res, ?timeout_after_quorum, "Validator out of date - syncing certificates"); - // If we got LockErrors, we try to update the authority. - self - .sync_certificate_to_authority( + // If we got LockErrors, we try to update the authority asynchronously + self.sync_certificate_to_authority( cert_ref.clone(), name, DEFAULT_RETRIES, + self.timeouts.authority_request_timeout, + self.timeouts.pre_quorum_timeout, ) .instrument(tracing::trace_span!("sync_cert", authority =? name)) .await - .map_err(|e| { info!(err =? e, "Error from sync_certificate"); e})?; - - // Now try again - client - .handle_certificate( - cert_ref.clone(), - ) - .instrument(tracing::trace_span!("handle_cert_after_sync", authority =? name, retry = true)) - .await + .map_err(|e| { info!(err =? e, "Error from sync_certificate"); e}) }) }, |mut state, name, weight, result| { @@ -1518,15 +1539,11 @@ where entry.signatures.push((name, inner_effects.auth_signature.signature)); if entry.stake >= threshold { - // It will set the timeout quite high. debug!( tx_digest = ?tx_digest, "Got quorum for validators handle_certificate." ); - return Ok(ReduceOutput::ContinueWithTimeout( - state, - timeout_after_quorum, - )); + return Ok(ReduceOutput::End(state)); } } Err(err) => { diff --git a/crates/sui-core/src/authority_client.rs b/crates/sui-core/src/authority_client.rs index a01cdcdbd6cff..12ef2f5722ca3 100644 --- a/crates/sui-core/src/authority_client.rs +++ b/crates/sui-core/src/authority_client.rs @@ -368,19 +368,12 @@ impl AuthorityAPI for LocalAuthorityClient { &self, certificate: CertifiedTransaction, ) -> Result { - if self.fault_config.fail_before_handle_confirmation { - return Err(SuiError::GenericAuthorityError { - error: "Mock error before handle_confirmation_transaction".to_owned(), - }); - } let state = self.state.clone(); - let result = state.handle_certificate(certificate).await; - if self.fault_config.fail_after_handle_confirmation { - return Err(SuiError::GenericAuthorityError { - error: "Mock error after handle_confirmation_transaction".to_owned(), - }); - } - result + let cert = certificate.clone(); + let fault_config = self.fault_config; + tokio::spawn(async move { Self::handle_certificate(state, cert, fault_config).await }) + .await + .unwrap() } async fn handle_account_info_request( @@ -476,6 +469,25 @@ impl LocalAuthorityClient { fault_config: LocalAuthorityClientFaultConfig::default(), } } + + async fn handle_certificate( + state: Arc, + certificate: CertifiedTransaction, + fault_config: LocalAuthorityClientFaultConfig, + ) -> Result { + if fault_config.fail_before_handle_confirmation { + return Err(SuiError::GenericAuthorityError { + error: "Mock error before handle_confirmation_transaction".to_owned(), + }); + } + let result = state.handle_certificate(certificate).await; + if fault_config.fail_after_handle_confirmation { + return Err(SuiError::GenericAuthorityError { + error: "Mock error after handle_confirmation_transaction".to_owned(), + }); + } + result + } } #[derive(Clone)] diff --git a/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs b/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs index 895b6004b4973..486572619c8f6 100644 --- a/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs +++ b/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs @@ -1,13 +1,12 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::collections::BTreeMap; -use std::path::PathBuf; -use std::sync::{Arc, Mutex}; - use move_core_types::{account_address::AccountAddress, ident_str}; use move_package::BuildConfig; use signature::Signer; - +use std::collections::BTreeMap; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; +use sui_config::gateway::GatewayConfig; use sui_config::genesis::Genesis; use sui_config::ValidatorInfo; use sui_types::crypto::{ @@ -15,19 +14,52 @@ use sui_types::crypto::{ AuthorityPublicKeyBytes, SuiKeyPair, }; use sui_types::crypto::{KeypairTraits, Signature}; +use test_utils::authority::SuiNode; use sui_types::messages::*; use sui_types::object::{Object, GAS_VALUE_FOR_TESTING}; +use test_utils::authority::{spawn_test_authorities, test_and_configure_authority_configs}; use super::*; use crate::authority::AuthorityState; use crate::authority_client::{ AuthorityAPI, BatchInfoResponseItemStream, LocalAuthorityClient, - LocalAuthorityClientFaultConfig, + LocalAuthorityClientFaultConfig, NetworkAuthorityClient, NetworkAuthorityClientMetrics, }; +use crate::gateway_state::GatewayState; use tokio::time::Instant; +pub async fn init_network_authorities( + committee_size: usize, + genesis_objects: Vec, +) -> AuthorityAggregator { + let configs = test_and_configure_authority_configs(committee_size); + let _nodes: Vec = spawn_test_authorities(genesis_objects, &configs).await; + let gateway_config = GatewayConfig { + epoch: 0, + validator_set: configs.validator_set().to_vec(), + send_timeout: Duration::from_secs(4), + recv_timeout: Duration::from_secs(4), + buffer_size: 650000, + db_folder_path: PathBuf::from("/tmp/client_db"), + }; + let committee = GatewayState::make_committee(&gateway_config).unwrap(); + let epoch_store = Arc::new(EpochStore::new_for_testing(&committee)); + let auth_clients = GatewayState::make_authority_clients( + &gateway_config, + NetworkAuthorityClientMetrics::new_for_tests(), + ); + let registry = prometheus::Registry::new(); + AuthorityAggregator::new( + committee, + epoch_store, + auth_clients, + AuthAggMetrics::new(®istry), + SafeClientMetrics::new(®istry), + ) +} + pub async fn init_local_authorities( committee_size: usize, mut genesis_objects: Vec, @@ -398,6 +430,53 @@ async fn execute_transaction_with_fault_configs( Ok(()) } +/// The intent of this is to test whether client side timeouts +/// have any impact on the server execution. Turns out because +/// we spawn a tokio task on the server, client timing out and +/// terminating the connection does not stop server from completing +/// execution on its side +#[tokio::test] +async fn test_quorum_map_and_reduce_timeout() { + let build_config = BuildConfig::default(); + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("src/unit_tests/data/object_basics"); + let modules = sui_framework::build_move_package(&path, build_config).unwrap(); + let pkg = Object::new_package(modules, TransactionDigest::genesis()); + let pkg_ref = pkg.compute_object_reference(); + let (addr1, key1): (_, AccountKeyPair) = get_key_pair(); + let gas_object1 = Object::with_owner_for_testing(addr1); + let gas_ref_1 = gas_object1.compute_object_reference(); + let genesis_objects = vec![pkg, gas_object1]; + let mut authorities = init_network_authorities(4, genesis_objects).await; + let tx = crate_object_move_transaction(addr1, &key1, addr1, 100, pkg_ref, gas_ref_1); + let certified_tx = authorities.process_transaction(tx.clone()).await; + assert!(certified_tx.is_ok()); + let certificate = certified_tx.unwrap(); + // Send request with a very small timeout to trigger timeout error + authorities.timeouts.pre_quorum_timeout = Duration::from_millis(1); + authorities.timeouts.post_quorum_timeout = Duration::from_millis(1); + let certified_effects = authorities.process_certificate(certificate.clone()).await; + // Ensure it is an error + assert!(certified_effects.is_err()); + assert!(matches!( + certified_effects, + Err(SuiError::QuorumFailedToExecuteCertificate { .. }) + )); + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + let tx_info = TransactionInfoRequest { + transaction_digest: *tx.digest(), + }; + for (_, client) in authorities.authority_clients.iter() { + let resp = client + .handle_transaction_info_request(tx_info.clone()) + .await; + // Server should return a signed effect even though previous calls + // failed due to timeout + assert!(resp.is_ok()); + assert!(resp.unwrap().signed_effects.is_some()); + } +} + #[tokio::test] async fn test_map_reducer() { let (authorities, _, _) = init_local_authorities(4, vec![]).await; @@ -775,8 +854,8 @@ async fn test_process_certificate() { // Test: process the certificate, including bring up to date authority 3. // which is 2 certs behind. authorities.process_certificate(cert2).await.unwrap(); - - // As a result, we have 2 gas objects and 1 created object. + // Give authority 3 enough time to process the certificate + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; let owned_object = get_owned_objects(&authorities, addr1).await; assert_eq!(3, owned_object.len()); // Check this is the latest version. @@ -820,7 +899,7 @@ async fn test_execute_cert_to_true_effects() { count += 1; } } - assert_eq!(count, 2); + assert!(count >= 2); } #[tokio::test] diff --git a/crates/sui-types/src/error.rs b/crates/sui-types/src/error.rs index fe21a98a29069..33ac193a18db3 100644 --- a/crates/sui-types/src/error.rs +++ b/crates/sui-types/src/error.rs @@ -90,6 +90,8 @@ pub enum SuiError { WrongEpoch { expected_epoch: EpochId }, #[error("Signatures in a certificate must form a quorum")] CertificateRequiresQuorum, + #[error("Authority {authority_name:?} could not sync certificate: {err:?}")] + CertificateSyncError { authority_name: String, err: String }, #[error( "The given sequence number ({given_sequence:?}) must match the next expected sequence ({expected_sequence:?}) number of the object ({object_id:?})" )]