Skip to content

Commit

Permalink
[consensus] add missing checks for commit cert (#13379)
Browse files Browse the repository at this point in the history
Co-authored-by: Zekun Li <zekunli@Zekuns-MBP-2.localdomain>
  • Loading branch information
zekun000 and Zekun Li committed May 28, 2024
1 parent c47d79e commit 0ee977e
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 17 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions consensus/consensus-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ aptos-logger = { workspace = true }
aptos-short-hex-str = { workspace = true }
aptos-types = { workspace = true }
bcs = { workspace = true }
fail = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
mini-moka = { workspace = true }
Expand All @@ -41,4 +42,5 @@ serde_json = { workspace = true }

[features]
default = []
failpoints = ["fail/failpoints"]
fuzzing = ["proptest", "aptos-types/fuzzing", "aptos-crypto/fuzzing"]
6 changes: 5 additions & 1 deletion consensus/consensus-types/src/pipeline/commit_decision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::common::Round;
use anyhow::Context;
use anyhow::{ensure, Context};
use aptos_types::{ledger_info::LedgerInfoWithSignatures, validator_verifier::ValidatorVerifier};
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display, Formatter};
Expand Down Expand Up @@ -48,6 +48,10 @@ impl CommitDecision {
/// Verifies that the signatures carried in the message forms a valid quorum,
/// and then verifies the signature.
pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
ensure!(
!self.ledger_info.commit_info().is_ordered_only(),
"Unexpected ordered only commit info"
);
// We do not need to check the author because as long as the signature tree
// is valid, the message should be valid.
self.ledger_info
Expand Down
21 changes: 20 additions & 1 deletion consensus/consensus-types/src/sync_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use crate::{common::Round, quorum_cert::QuorumCert, timeout_2chain::TwoChainTimeoutCertificate};
use anyhow::{ensure, Context};
use aptos_types::{block_info::BlockInfo, validator_verifier::ValidatorVerifier};
use fail::fail_point;
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display, Formatter};

Expand Down Expand Up @@ -52,9 +53,17 @@ impl SyncInfo {
let highest_2chain_timeout_cert = highest_2chain_timeout_cert
.filter(|tc| tc.round() > highest_quorum_cert.certified_block().round());

fail_point!("consensus::ordered_only_cert", |_| {
Self {
highest_quorum_cert: highest_quorum_cert.clone(),
highest_ordered_cert: Some(highest_ordered_cert.clone()),
highest_commit_cert: highest_ordered_cert.clone(),
highest_2chain_timeout_cert: highest_2chain_timeout_cert.clone(),
}
});

let highest_ordered_cert =
Some(highest_ordered_cert).filter(|hoc| hoc != &highest_quorum_cert);

Self {
highest_quorum_cert,
highest_ordered_cert,
Expand Down Expand Up @@ -156,6 +165,16 @@ impl SyncInfo {
"HLI has empty commit info"
);

// we don't have execution in unit tests, so this check would fail
#[cfg(not(any(test, feature = "fuzzing")))]
{
ensure!(
!self.highest_commit_cert().commit_info().is_ordered_only(),
"HLI {} has ordered only commit info",
self.highest_commit_cert().commit_info()
);
}

self.highest_quorum_cert
.verify(validator)
.and_then(|_| {
Expand Down
13 changes: 5 additions & 8 deletions consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,9 @@ impl BlockStore {
};

for block in blocks {
block_store
.insert_ordered_block(block)
.await
.unwrap_or_else(|e| {
panic!("[BlockStore] failed to insert block during build {:?}", e)
});
block_store.insert_block(block).await.unwrap_or_else(|e| {
panic!("[BlockStore] failed to insert block during build {:?}", e)
});
}
for qc in quorum_certs {
block_store
Expand Down Expand Up @@ -309,7 +306,7 @@ impl BlockStore {
/// Duplicate inserts will return the previously inserted block (
/// note that it is considered a valid non-error case, for example, it can happen if a validator
/// receives a certificate for a block that is currently being added).
pub async fn insert_ordered_block(&self, block: Block) -> anyhow::Result<Arc<PipelinedBlock>> {
pub async fn insert_block(&self, block: Block) -> anyhow::Result<Arc<PipelinedBlock>> {
if let Some(existing_block) = self.get_block(block.id()) {
return Ok(existing_block);
}
Expand Down Expand Up @@ -612,6 +609,6 @@ impl BlockStore {
if self.ordered_root().round() < block.quorum_cert().commit_info().round() {
self.send_for_execution(block.quorum_cert().clone()).await?;
}
self.insert_ordered_block(block).await
self.insert_block(block).await
}
}
6 changes: 2 additions & 4 deletions consensus/src/block_storage/block_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ proptest! {
let known_parent = block_store.block_exists(block.parent_id());
let certified_parent = block.quorum_cert().certified_block().id() == block.parent_id();
let verify_res = block.verify_well_formed();
let res = timed_block_on(&runtime, block_store.insert_ordered_block(block.clone()));
let res = timed_block_on(&runtime, block_store.insert_block(block.clone()));
if !certified_parent {
prop_assert!(verify_res.is_err());
} else if !known_parent {
Expand Down Expand Up @@ -366,9 +366,7 @@ async fn test_illegal_timestamp() {
Vec::new(),
)
.unwrap();
let result = block_store
.insert_ordered_block(block_with_illegal_timestamp)
.await;
let result = block_store.insert_block(block_with_illegal_timestamp).await;
assert!(result.is_err());
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/block_storage/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl BlockStore {
while let Some(block) = pending.pop() {
let block_qc = block.quorum_cert().clone();
self.insert_single_quorum_cert(block_qc)?;
self.insert_ordered_block(block).await?;
self.insert_block(block).await?;
}
self.insert_single_quorum_cert(qc)
}
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/round_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ impl RoundManager {
// tries to add the same block again, which is okay as `execute_and_insert_block` call
// is idempotent.
self.block_store
.insert_ordered_block(proposal.clone())
.insert_block(proposal.clone())
.await
.context("[RoundManager] Failed to execute_and_insert the block")?;
self.resend_verified_proposal_to_self(
Expand Down Expand Up @@ -884,7 +884,7 @@ impl RoundManager {
async fn execute_and_vote(&mut self, proposed_block: Block) -> anyhow::Result<Vote> {
let executed_block = self
.block_store
.insert_ordered_block(proposed_block)
.insert_block(proposed_block)
.await
.context("[RoundManager] Failed to execute_and_insert the block")?;

Expand Down
41 changes: 41 additions & 0 deletions testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,47 @@ async fn test_no_failures() {
.unwrap();
}

#[tokio::test]
async fn test_ordered_only_cert() {
let num_validators = 3;

let mut swarm = create_swarm(num_validators, 1).await;

test_consensus_fault_tolerance(
&mut swarm,
3,
5.0,
1,
Box::new(FailPointFailureInjection::new(Box::new(move |cycle, _| {
(
vec![(
cycle % num_validators,
"consensus::ordered_only_cert".to_string(),
format!("{}%return", 50),
)],
true,
)
}))),
Box::new(move |_, _, executed_rounds, executed_transactions, _, _| {
assert!(
executed_transactions >= 4,
"no progress with active consensus, only {} transactions",
executed_transactions
);
assert!(
executed_rounds >= 2,
"no progress with active consensus, only {} rounds",
executed_rounds
);
Ok(())
}),
true,
false,
)
.await
.unwrap();
}

#[tokio::test]
async fn test_fault_tolerance_of_network_send() {
// Randomly increase network failure rate, until network halts, and check that it comes back afterwards.
Expand Down

0 comments on commit 0ee977e

Please sign in to comment.