Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not wait any longer once quorum threshold is reached in handle_certificate calls #4606

Merged
merged 1 commit into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 59 additions & 42 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,14 +278,11 @@ where
skip_all
)]
async fn sync_authority_source_to_destination<CertHandler: CertificateHandler>(
&self,
source_client: SafeClient<A>,
cert: CertifiedTransaction,
source_authority: AuthorityName,
cert_handler: &CertHandler,
) -> Result<(), SuiError> {
// TODO(panic): this panics
let source_client = self.authority_clients[&source_authority].clone();

// This represents a stack of certificates that we need to register with the
// destination authority. The stack is a LIFO queue, and therefore later insertions
// represent certificates that earlier insertions depend on. Thus updating an
Expand Down Expand Up @@ -408,31 +405,55 @@ where
cert: CertifiedTransaction,
destination_authority: AuthorityName,
retries: usize,
) -> Result<(), SuiError> {
self.sync_certificate_to_authority_with_timeout(
cert,
destination_authority,
self.timeouts.authority_request_timeout,
retries,
)
.await
authority_timeout: Duration,
total_timeout: Duration,
) -> Result<TransactionInfoResponse, SuiError> {
let committee = self.committee.clone();
let client = self
.authority_clients
.get(&destination_authority)
.unwrap()
.clone();
let authority_clients = self.authority_clients.clone();
if let Ok(res) = timeout(total_timeout, tokio::spawn(async move {
Self::sync_certificate_to_authority_with_timeout(
&committee,
&authority_clients,
cert.clone(),
destination_authority,
authority_timeout,
retries,
)
.await?;
client.handle_certificate(cert).instrument(tracing::trace_span!("handle_cert_after_sync", authority =? destination_authority, retry = true)).await
})).await {
res.map_err(|e| SuiError::CertificateSyncError {
authority_name: destination_authority.to_string(),
err: e.to_string(),
})?
} else {
Err(SuiError::TimeoutError)
}
}

async fn sync_certificate_to_authority_with_timeout(
&self,
committee: &Committee,
authority_clients: &BTreeMap<AuthorityName, SafeClient<A>>,
cert: CertifiedTransaction,
destination_authority: AuthorityName,
timeout_period: Duration,
retries: usize,
) -> Result<(), SuiError> {
let cert_handler = RemoteCertificateHandler {
destination_authority,
destination_client: self.authority_clients[&destination_authority].clone(),
destination_client: authority_clients[&destination_authority].clone(),
};
debug!(cert =? cert.digest(),
dest_authority =? destination_authority,
"Syncing certificate to dest authority");
self.sync_certificate_to_authority_with_timeout_inner(
Self::sync_certificate_to_authority_with_timeout_inner(
committee,
authority_clients,
cert,
destination_authority,
&cert_handler,
Expand All @@ -450,7 +471,8 @@ where
/// the certificate. The time devoted to each attempt is bounded by
/// `timeout_milliseconds`.
async fn sync_certificate_to_authority_with_timeout_inner<CertHandler: CertificateHandler>(
&self,
committee: &Committee,
authority_clients: &BTreeMap<AuthorityName, SafeClient<A>>,
cert: CertifiedTransaction,
destination_authority: AuthorityName,
cert_handler: &CertHandler,
Expand All @@ -461,7 +483,7 @@ where
// and its full history. We should be able to use these as source authorities.
let mut candidate_source_authorties: HashSet<AuthorityName> = cert
.auth_sign_info
.authorities(&self.committee)
.authorities(committee)
.collect::<SuiResult<HashSet<_>>>()?
.iter()
.map(|&&name| name)
Expand All @@ -474,7 +496,7 @@ where
//
// TODO: add a filter parameter to sample, so that we can directly
// sample from a subset which is more efficient.
let sample_authority = self.committee.sample();
let sample_authority = committee.sample();
if candidate_source_authorties.contains(sample_authority) {
candidate_source_authorties.remove(sample_authority);
source_authorities.push(*sample_authority);
Expand All @@ -487,8 +509,9 @@ where
// Note: here we could improve this function by passing into the
// `sync_authority_source_to_destination` call a cache of
// certificates and parents to avoid re-downloading them.

let sync_fut = self.sync_authority_source_to_destination(
let source_client = authority_clients[&source_authority].clone();
let sync_fut = Self::sync_authority_source_to_destination(
source_client,
cert.clone(),
source_authority,
cert_handler,
Expand Down Expand Up @@ -516,8 +539,8 @@ where
};

// Report the error to both authority clients.
let source_client = &self.authority_clients[&source_authority];
let destination_client = &self.authority_clients[&destination_authority];
let source_client = &authority_clients[&source_authority];
let destination_client = &authority_clients[&destination_authority];

source_client.report_client_error(&inner_err);
destination_client.report_client_error(&inner_err);
Expand Down Expand Up @@ -1175,7 +1198,13 @@ where
// NOTE: this is right now done sequentially, we should do them in parallel using
// the usual FuturesUnordered.
let _result = self
.sync_certificate_to_authority(cert.clone(), name, DEFAULT_RETRIES)
.sync_certificate_to_authority(
cert.clone(),
name,
DEFAULT_RETRIES,
self.timeouts.authority_request_timeout,
self.timeouts.pre_quorum_timeout,
)
.await;

// TODO: collect errors and propagate them to the right place
Expand Down Expand Up @@ -1422,7 +1451,7 @@ where
errors: vec![],
};

let tx_digest = certificate.digest();
let tx_digest = *certificate.digest();
let timeout_after_quorum = self.timeouts.post_quorum_timeout;

let cert_ref = &certificate;
Expand All @@ -1435,7 +1464,6 @@ where
?timeout_after_quorum,
"Broadcasting certificate to authorities"
);

let state = self
.quorum_map_then_reduce_with_timeout(
state,
Expand Down Expand Up @@ -1476,24 +1504,17 @@ where
}

debug!(authority =? name, error =? res, ?timeout_after_quorum, "Validator out of date - syncing certificates");
// If we got LockErrors, we try to update the authority.
self
.sync_certificate_to_authority(
// If we got LockErrors, we try to update the authority asynchronously
self.sync_certificate_to_authority(
cert_ref.clone(),
name,
DEFAULT_RETRIES,
self.timeouts.authority_request_timeout,
self.timeouts.pre_quorum_timeout,
)
.instrument(tracing::trace_span!("sync_cert", authority =? name))
.await
.map_err(|e| { info!(err =? e, "Error from sync_certificate"); e})?;

// Now try again
client
.handle_certificate(
cert_ref.clone(),
)
.instrument(tracing::trace_span!("handle_cert_after_sync", authority =? name, retry = true))
.await
.map_err(|e| { info!(err =? e, "Error from sync_certificate"); e})
})
},
|mut state, name, weight, result| {
Expand All @@ -1518,15 +1539,11 @@ where
entry.signatures.push((name, inner_effects.auth_signature.signature));

if entry.stake >= threshold {
// It will set the timeout quite high.
debug!(
tx_digest = ?tx_digest,
"Got quorum for validators handle_certificate."
);
return Ok(ReduceOutput::ContinueWithTimeout(
state,
timeout_after_quorum,
));
return Ok(ReduceOutput::End(state));
}
}
Err(err) => {
Expand Down
36 changes: 24 additions & 12 deletions crates/sui-core/src/authority_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,19 +368,12 @@ impl AuthorityAPI for LocalAuthorityClient {
&self,
certificate: CertifiedTransaction,
) -> Result<TransactionInfoResponse, SuiError> {
if self.fault_config.fail_before_handle_confirmation {
return Err(SuiError::GenericAuthorityError {
error: "Mock error before handle_confirmation_transaction".to_owned(),
});
}
let state = self.state.clone();
let result = state.handle_certificate(certificate).await;
if self.fault_config.fail_after_handle_confirmation {
return Err(SuiError::GenericAuthorityError {
error: "Mock error after handle_confirmation_transaction".to_owned(),
});
}
result
let cert = certificate.clone();
let fault_config = self.fault_config;
tokio::spawn(async move { Self::handle_certificate(state, cert, fault_config).await })
.await
.unwrap()
}

async fn handle_account_info_request(
Expand Down Expand Up @@ -476,6 +469,25 @@ impl LocalAuthorityClient {
fault_config: LocalAuthorityClientFaultConfig::default(),
}
}

/// Runs `certificate` through the given authority `state`, applying any
/// failures requested by `fault_config`.
///
/// # Errors
///
/// * If `fault_config.fail_before_handle_confirmation` is set, returns a
///   `SuiError::GenericAuthorityError` without calling
///   `state.handle_certificate` at all.
/// * If `fault_config.fail_after_handle_confirmation` is set, the certificate
///   is still handed to `state.handle_certificate`, but its outcome is
///   discarded and a `SuiError::GenericAuthorityError` is returned instead.
/// * Otherwise, whatever `state.handle_certificate` returns is passed through.
async fn handle_certificate(
    state: Arc<AuthorityState>,
    certificate: CertifiedTransaction,
    fault_config: LocalAuthorityClientFaultConfig,
) -> Result<TransactionInfoResponse, SuiError> {
    // Injected fault: bail out before the authority state sees the certificate.
    if fault_config.fail_before_handle_confirmation {
        return Err(SuiError::GenericAuthorityError {
            error: "Mock error before handle_confirmation_transaction".to_owned(),
        });
    }

    let outcome = state.handle_certificate(certificate).await;

    // Injected fault: the certificate was handled, but the caller is told it failed.
    if fault_config.fail_after_handle_confirmation {
        Err(SuiError::GenericAuthorityError {
            error: "Mock error after handle_confirmation_transaction".to_owned(),
        })
    } else {
        outcome
    }
}
}

#[derive(Clone)]
Expand Down
Loading