Skip to content

Commit

Permalink
perf: adjust blocking tasks in the primary
Browse files Browse the repository at this point in the history
Signed-off-by: ljedrz <ljedrz@gmail.com>
  • Loading branch information
ljedrz committed Feb 23, 2024
1 parent e9533b6 commit 32f3774
Showing 1 changed file with 26 additions and 19 deletions.
45 changes: 26 additions & 19 deletions node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ impl<N: Network> Primary<N> {
let BatchSignature { batch_id, signature } = batch_signature;

// Retrieve the signer.
let signer = spawn_blocking!(Ok(signature.to_address()))?;
let signer = signature.to_address();

// Ensure the batch signature is signed by the validator.
if self.gateway.resolver().get_address(peer_ip).map_or(true, |address| address != signer) {
Expand All @@ -643,15 +643,16 @@ impl<N: Network> Primary<N> {
bail!("Invalid peer - received a batch signature from myself ({signer})");
}

let proposal = {
let self_ = self.clone();
let Some(proposal) = spawn_blocking!({
// Acquire the write lock.
let mut proposed_batch = self.proposed_batch.write();
let mut proposed_batch = self_.proposed_batch.write();
// Add the signature to the batch, and determine if the batch is ready to be certified.
match proposed_batch.as_mut() {
Some(proposal) => {
// Ensure the batch ID matches the currently proposed batch ID.
if proposal.batch_id() != batch_id {
match self.storage.contains_batch(batch_id) {
match self_.storage.contains_batch(batch_id) {
true => bail!("This batch was already certified"),
false => bail!(
"Unknown batch ID '{batch_id}', expected '{}' for round {}",
Expand All @@ -661,9 +662,9 @@ impl<N: Network> Primary<N> {
}
}
// Retrieve the committee lookback for the round.
let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
// Retrieve the address of the validator.
let Some(signer) = self.gateway.resolver().get_address(peer_ip) else {
let Some(signer) = self_.gateway.resolver().get_address(peer_ip) else {
bail!("Signature is from a disconnected validator");
};
// Add the signature to the batch.
Expand All @@ -672,17 +673,20 @@ impl<N: Network> Primary<N> {
// Check if the batch is ready to be certified.
if !proposal.is_quorum_threshold_reached(&committee_lookback) {
// If the batch is not ready to be certified, return early.
return Ok(());
return Ok(None);
}
}
// There is no proposed batch, so return early.
None => return Ok(()),
None => return Ok(None),
};
// Retrieve the batch proposal, clearing the proposed batch.
match proposed_batch.take() {
Some(proposal) => proposal,
None => return Ok(()),
Some(proposal) => Ok(Some(proposal)),
None => Ok(None),
}
})?
else {
return Ok(());
};

/* Proceeding to certify the batch. */
Expand Down Expand Up @@ -793,7 +797,8 @@ impl<N: Network> Primary<N> {
tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

// Retrieve the block locators.
let block_locators = match self_.sync.get_block_locators() {
let self__ = self_.clone();
let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
Ok(block_locators) => block_locators,
Err(e) => {
warn!("Failed to retrieve block locators - {e}");
Expand Down Expand Up @@ -1113,14 +1118,16 @@ impl<N: Network> Primary<N> {

/// Stores the certified batch and broadcasts it to all validators, returning the certificate.
async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
// Create the batch certificate and transmissions.
let (certificate, transmissions) = proposal.to_certificate(committee)?;
// Convert the transmissions into a HashMap.
// Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
// Store the certified batch.
let (storage, certificate_) = (self.storage.clone(), certificate.clone());
spawn_blocking!(storage.insert_certificate(certificate_, transmissions))?;
let certificate = tokio::task::block_in_place(|| {
// Create the batch certificate and transmissions.
let (certificate, transmissions) = proposal.to_certificate(&committee)?;
// Convert the transmissions into a HashMap.
// Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
// Store the certified batch.
self.storage.insert_certificate(certificate.clone(), transmissions)?;
Ok::<_, anyhow::Error>(certificate)
})?;
debug!("Stored a batch certificate for round {}", certificate.round());
// If a BFT sender was provided, send the certificate to the BFT.
if let Some(bft_sender) = self.bft_sender.get() {
Expand Down

0 comments on commit 32f3774

Please sign in to comment.