Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ZKS-00] Implement block-per-anchor instead of block-per-subdag #3066

Merged
merged 5 commits into from
Feb 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
225 changes: 148 additions & 77 deletions node/bft/src/bft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,94 +497,140 @@ impl<N: Network> BFT<N> {
leader_certificate: BatchCertificate<N>,
election_certificate_ids: IndexSet<Field<N>>,
) -> Result<()> {
// Retrieve the leader certificate round.
let leader_round = leader_certificate.round();
// Compute the commit subdag.
let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
Ok(subdag) => subdag,
Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
};
// Initialize a map for the deduped transmissions.
let mut transmissions = IndexMap::new();
// Start from the oldest leader certificate.
for certificate in commit_subdag.values().flatten() {
// Update the DAG.
if IS_SYNCING {
self.dag.write().commit(certificate, self.storage().max_gc_rounds());
}
// Retrieve the transmissions.
for transmission_id in certificate.transmission_ids() {
// If the transmission already exists in the map, skip it.
if transmissions.contains_key(transmission_id) {
continue;
}
// If the transmission already exists in the ledger, skip it.
// Note: On failure to read from the ledger, we skip including this transmission, out of safety.
if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
// Determine the list of all previous leader certificates since the last committed round.
// The order of the leader certificates is from **newest** to **oldest**.
let mut leader_certificates = vec![leader_certificate.clone()];
{
// Retrieve the leader round.
let leader_round = leader_certificate.round();

let mut current_certificate = leader_certificate;
for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
{
// Retrieve the previous committee for the leader round.
let previous_committee = match self.ledger().get_previous_committee_for_round(round) {
Ok(committee) => committee,
Err(e) => {
bail!("BFT failed to retrieve the previous committee for the even round {round} - {e}");
}
};
// Compute the leader address for the leader round.
let leader = match previous_committee.get_leader(round) {
Ok(leader) => leader,
Err(e) => {
bail!("BFT failed to compute the leader for the even round {round} - {e}");
}
};
// Retrieve the previous leader certificate.
let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
else {
continue;
}
// Retrieve the transmission.
let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
bail!(
"BFT failed to retrieve transmission '{}' from round {}",
fmt_id(transmission_id),
certificate.round()
);
};
// Add the transmission to the set.
transmissions.insert(*transmission_id, transmission);
// Determine if there is a path between the previous certificate and the current certificate.
if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
// Add the previous leader certificate to the list of certificates to commit.
leader_certificates.push(previous_certificate.clone());
// Update the current certificate to the previous leader certificate.
current_certificate = previous_certificate;
}
}
}
// If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
if !IS_SYNCING {
// Construct the subdag.
let subdag = Subdag::from(commit_subdag.clone(), election_certificate_ids.clone())?;
// Retrieve the anchor round.
let anchor_round = subdag.anchor_round();
// Retrieve the number of transmissions.
let num_transmissions = transmissions.len();
// Retrieve metadata about the subdag.
let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();

// Ensure the subdag anchor round matches the leader round.
ensure!(
anchor_round == leader_round,
"BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
);

// Trigger consensus.
if let Some(consensus_sender) = self.consensus_sender.get() {
// Initialize a callback sender and receiver.
let (callback_sender, callback_receiver) = oneshot::channel();
// Send the subdag and transmissions to consensus.
consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
// Await the callback to continue.
match callback_receiver.await {
Ok(Ok(())) => (), // continue
Ok(Err(e)) => {
error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
return Ok(());
// Iterate over the leader certificates to commit.
for leader_certificate in leader_certificates.into_iter().rev() {
// Retrieve the leader certificate round.
let leader_round = leader_certificate.round();
// Compute the commit subdag.
let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
Ok(subdag) => subdag,
Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
};
// Initialize a map for the deduped transmissions.
let mut transmissions = IndexMap::new();
// Start from the oldest leader certificate.
for certificate in commit_subdag.values().flatten() {
// Update the DAG.
if IS_SYNCING {
self.dag.write().commit(certificate, self.storage().max_gc_rounds());
}
// Retrieve the transmissions.
for transmission_id in certificate.transmission_ids() {
// If the transmission already exists in the map, skip it.
if transmissions.contains_key(transmission_id) {
continue;
}
Err(e) => {
error!("BFT failed to receive the callback for round {anchor_round} - {e}");
return Ok(());
// If the transmission already exists in the ledger, skip it.
// Note: On failure to read from the ledger, we skip including this transmission, out of safety.
if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
continue;
}
// Retrieve the transmission.
let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
bail!(
"BFT failed to retrieve transmission '{}' from round {}",
fmt_id(transmission_id),
certificate.round()
);
};
// Add the transmission to the set.
transmissions.insert(*transmission_id, transmission);
}
}
// If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
if !IS_SYNCING {
// Construct the subdag.
let subdag = Subdag::from(commit_subdag.clone(), election_certificate_ids.clone())?;
// Retrieve the anchor round.
let anchor_round = subdag.anchor_round();
// Retrieve the number of transmissions.
let num_transmissions = transmissions.len();
// Retrieve metadata about the subdag.
let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();

// Ensure the subdag anchor round matches the leader round.
ensure!(
anchor_round == leader_round,
"BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
);

// Trigger consensus.
if let Some(consensus_sender) = self.consensus_sender.get() {
// Initialize a callback sender and receiver.
let (callback_sender, callback_receiver) = oneshot::channel();
// Send the subdag and transmissions to consensus.
consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
// Await the callback to continue.
match callback_receiver.await {
Ok(Ok(())) => (), // continue
Ok(Err(e)) => {
error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
return Ok(());
}
Err(e) => {
error!("BFT failed to receive the callback for round {anchor_round} - {e}");
return Ok(());
}
}
}

info!(
"\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
);
// Update the DAG, as the subdag was successfully included into a block.
let mut dag_write = self.dag.write();
for certificate in commit_subdag.values().flatten() {
dag_write.commit(certificate, self.storage().max_gc_rounds());
info!(
"\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
);
// Update the DAG, as the subdag was successfully included into a block.
let mut dag_write = self.dag.write();
for certificate in commit_subdag.values().flatten() {
dag_write.commit(certificate, self.storage().max_gc_rounds());
}
}
// Update the last election certificate IDs.
// TODO (howardwu): This is currently writing the *latest* election certificate IDs,
// however this needs to be dynamically retrieving the election certificate IDs for the
// leader round (technically the `leader_round+1` round to get the election round)
// that it is currently committing.
{
let mut last_election_certificate_ids = self.last_election_certificate_ids.write();
*last_election_certificate_ids = election_certificate_ids.clone();
}
}
// Update the last election certificate IDs.
{
let mut last_election_certificate_ids = self.last_election_certificate_ids.write();
*last_election_certificate_ids = election_certificate_ids;
}
Ok(())
}
Expand Down Expand Up @@ -668,6 +714,31 @@ impl<N: Network> BFT<N> {
// Return the certificates to commit.
Ok(commit)
}

/// Returns `true` if there is a path from the previous certificate to the current certificate.
fn is_linked(
&self,
previous_certificate: BatchCertificate<N>,
current_certificate: BatchCertificate<N>,
) -> Result<bool> {
// Initialize the list containing the traversal.
let mut traversal = vec![current_certificate.clone()];
// Iterate over the rounds from the current certificate to the previous certificate.
for round in (previous_certificate.round()..current_certificate.round()).rev() {
// Retrieve all of the certificates for this past round.
let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
// This is a critical error, as the traversal should have these certificates.
// If this error is hit, it is likely that the maximum GC rounds should be increased.
bail!("BFT failed to retrieve the certificates for past round {round}");
};
// Filter the certificates to only include those that are in the traversal.
traversal = certificates
.into_values()
.filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
.collect();
}
Ok(traversal.contains(&previous_certificate))
}
}

impl<N: Network> BFT<N> {
Expand Down