Skip to content

Commit

Permalink
Do not wait any longer once quorum threshold is reached in handle_certificate calls
Browse files Browse the repository at this point in the history
  • Loading branch information
sadhansood committed Sep 14, 2022
1 parent 66113ea commit 0a73242
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 63 deletions.
98 changes: 55 additions & 43 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,14 +278,11 @@ where
skip_all
)]
async fn sync_authority_source_to_destination<CertHandler: CertificateHandler>(
&self,
source_client: SafeClient<A>,
cert: CertifiedTransaction,
source_authority: AuthorityName,
cert_handler: &CertHandler,
) -> Result<(), SuiError> {
// TODO(panic): this panics
let source_client = self.authority_clients[&source_authority].clone();

// This represents a stack of certificates that we need to register with the
// destination authority. The stack is a LIFO queue, and therefore later insertions
// represent certificates that earlier insertions depend on. Thus updating an
Expand Down Expand Up @@ -408,31 +405,50 @@ where
cert: CertifiedTransaction,
destination_authority: AuthorityName,
retries: usize,
) -> Result<(), SuiError> {
self.sync_certificate_to_authority_with_timeout(
cert,
destination_authority,
self.timeouts.authority_request_timeout,
retries,
)
.await
timeout_period: Duration,
) -> Result<TransactionInfoResponse, SuiError> {
let committee = self.committee.clone();
let client = self
.authority_clients
.get(&destination_authority)
.unwrap()
.clone();
let authority_clients = self.authority_clients.clone();
tokio::spawn(async move {
Self::sync_certificate_to_authority_with_timeout(
&committee,
&authority_clients,
cert.clone(),
destination_authority,
timeout_period,
retries,
)
.await?;
client.handle_certificate(cert).instrument(tracing::trace_span!("handle_cert_after_sync", authority =? destination_authority, retry = true)).await
}).await.map_err(|e| SuiError::CertificateSyncError {
authority_name: destination_authority.to_string(),
err: e.to_string(),
})?
}

async fn sync_certificate_to_authority_with_timeout(
&self,
committee: &Committee,
authority_clients: &BTreeMap<AuthorityName, SafeClient<A>>,
cert: CertifiedTransaction,
destination_authority: AuthorityName,
timeout_period: Duration,
retries: usize,
) -> Result<(), SuiError> {
let cert_handler = RemoteCertificateHandler {
destination_authority,
destination_client: self.authority_clients[&destination_authority].clone(),
destination_client: authority_clients[&destination_authority].clone(),
};
debug!(cert =? cert.digest(),
dest_authority =? destination_authority,
"Syncing certificate to dest authority");
self.sync_certificate_to_authority_with_timeout_inner(
Self::sync_certificate_to_authority_with_timeout_inner(
committee,
authority_clients,
cert,
destination_authority,
&cert_handler,
Expand All @@ -450,7 +466,8 @@ where
/// the certificate. The time devoted to each attempt is bounded by
/// `timeout_milliseconds`.
async fn sync_certificate_to_authority_with_timeout_inner<CertHandler: CertificateHandler>(
&self,
committee: &Committee,
authority_clients: &BTreeMap<AuthorityName, SafeClient<A>>,
cert: CertifiedTransaction,
destination_authority: AuthorityName,
cert_handler: &CertHandler,
Expand All @@ -461,7 +478,7 @@ where
// and its full history. We should be able to use these as source authorities.
let mut candidate_source_authorties: HashSet<AuthorityName> = cert
.auth_sign_info
.authorities(&self.committee)
.authorities(committee)
.collect::<SuiResult<HashSet<_>>>()?
.iter()
.map(|&&name| name)
Expand All @@ -474,7 +491,7 @@ where
//
// TODO: add a filter parameter to sample, so that we can directly
// sample from a subset which is more efficient.
let sample_authority = self.committee.sample();
let sample_authority = committee.sample();
if candidate_source_authorties.contains(sample_authority) {
candidate_source_authorties.remove(sample_authority);
source_authorities.push(*sample_authority);
Expand All @@ -487,8 +504,9 @@ where
// Note: here we could improve this function by passing into the
// `sync_authority_source_to_destination` call a cache of
// certificates and parents to avoid re-downloading them.

let sync_fut = self.sync_authority_source_to_destination(
let source_client = authority_clients[&source_authority].clone();
let sync_fut = Self::sync_authority_source_to_destination(
source_client,
cert.clone(),
source_authority,
cert_handler,
Expand Down Expand Up @@ -516,8 +534,8 @@ where
};

// Report the error to both authority clients.
let source_client = &self.authority_clients[&source_authority];
let destination_client = &self.authority_clients[&destination_authority];
let source_client = &authority_clients[&source_authority];
let destination_client = &authority_clients[&destination_authority];

source_client.report_client_error(&inner_err);
destination_client.report_client_error(&inner_err);
Expand Down Expand Up @@ -1175,7 +1193,12 @@ where
// NOTE: this is right now done sequentially, we should do them in parallel using
// the usual FuturesUnordered.
let _result = self
.sync_certificate_to_authority(cert.clone(), name, DEFAULT_RETRIES)
.sync_certificate_to_authority(
cert.clone(),
name,
DEFAULT_RETRIES,
self.timeouts.authority_request_timeout,
)
.await;

// TODO: collect errors and propagate them to the right place
Expand Down Expand Up @@ -1422,7 +1445,7 @@ where
errors: vec![],
};

let tx_digest = certificate.digest();
let tx_digest = *certificate.digest();
let timeout_after_quorum = self.timeouts.post_quorum_timeout;

let cert_ref = &certificate;
Expand All @@ -1435,7 +1458,8 @@ where
?timeout_after_quorum,
"Broadcasting certificate to authorities"
);

let timeout = self.timeouts.authority_request_timeout;
let initial_timeout = self.timeouts.pre_quorum_timeout;
let state = self
.quorum_map_then_reduce_with_timeout(
state,
Expand Down Expand Up @@ -1476,24 +1500,16 @@ where
}

debug!(authority =? name, error =? res, ?timeout_after_quorum, "Validator out of date - syncing certificates");
// If we got LockErrors, we try to update the authority.
self
.sync_certificate_to_authority(
// If we got LockErrors, we try to update the authority asynchronously
self.sync_certificate_to_authority(
cert_ref.clone(),
name,
DEFAULT_RETRIES,
timeout,
)
.instrument(tracing::trace_span!("sync_cert", authority =? name))
.await
.map_err(|e| { info!(err =? e, "Error from sync_certificate"); e})?;

// Now try again
client
.handle_certificate(
cert_ref.clone(),
)
.instrument(tracing::trace_span!("handle_cert_after_sync", authority =? name, retry = true))
.await
.map_err(|e| { info!(err =? e, "Error from sync_certificate"); e})
})
},
|mut state, name, weight, result| {
Expand All @@ -1518,15 +1534,11 @@ where
entry.signatures.push((name, inner_effects.auth_signature.signature));

if entry.stake >= threshold {
// It will set the timeout quite high.
debug!(
tx_digest = ?tx_digest,
"Got quorum for validators handle_certificate."
);
return Ok(ReduceOutput::ContinueWithTimeout(
state,
timeout_after_quorum,
));
return Ok(ReduceOutput::End(state));
}
}
Err(err) => {
Expand All @@ -1547,7 +1559,7 @@ where
})
},
// A long timeout before we hear back from a quorum
self.timeouts.pre_quorum_timeout,
initial_timeout,
)
.await?;

Expand Down
36 changes: 24 additions & 12 deletions crates/sui-core/src/authority_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,19 +368,12 @@ impl AuthorityAPI for LocalAuthorityClient {
&self,
certificate: CertifiedTransaction,
) -> Result<TransactionInfoResponse, SuiError> {
if self.fault_config.fail_before_handle_confirmation {
return Err(SuiError::GenericAuthorityError {
error: "Mock error before handle_confirmation_transaction".to_owned(),
});
}
let state = self.state.clone();
let result = state.handle_certificate(certificate).await;
if self.fault_config.fail_after_handle_confirmation {
return Err(SuiError::GenericAuthorityError {
error: "Mock error after handle_confirmation_transaction".to_owned(),
});
}
result
let cert = certificate.clone();
let fault_config = self.fault_config;
tokio::spawn(async move { Self::handle_certificate(state, cert, fault_config).await })
.await
.unwrap()
}

async fn handle_account_info_request(
Expand Down Expand Up @@ -476,6 +469,25 @@ impl LocalAuthorityClient {
fault_config: LocalAuthorityClientFaultConfig::default(),
}
}

/// Processes `certificate` against `state`, applying the failure switches
/// carried in `fault_config`.
///
/// All inputs are taken by value; there is no `self` receiver.
///
/// # Errors
///
/// * When `fault_config.fail_before_handle_confirmation` is set, a
///   `SuiError::GenericAuthorityError` is returned and `state` is never called.
/// * When `fault_config.fail_after_handle_confirmation` is set,
///   `state.handle_certificate` is still awaited, but its result is discarded
///   and a `SuiError::GenericAuthorityError` is returned instead.
/// * Otherwise the result of `state.handle_certificate` is returned unchanged,
///   including any error it produced.
async fn handle_certificate(
    state: Arc<AuthorityState>,
    certificate: CertifiedTransaction,
    fault_config: LocalAuthorityClientFaultConfig,
) -> Result<TransactionInfoResponse, SuiError> {
    if fault_config.fail_before_handle_confirmation {
        // Injected failure: bail out before the authority state sees the certificate.
        Err(SuiError::GenericAuthorityError {
            error: String::from("Mock error before handle_confirmation_transaction"),
        })
    } else {
        let outcome = state.handle_certificate(certificate).await;
        if fault_config.fail_after_handle_confirmation {
            // Injected failure: the certificate was handled, but the caller is
            // told it failed, so the real outcome is dropped.
            Err(SuiError::GenericAuthorityError {
                error: String::from("Mock error after handle_confirmation_transaction"),
            })
        } else {
            outcome
        }
    }
}
}

#[derive(Clone)]
Expand Down
Loading

0 comments on commit 0a73242

Please sign in to comment.