diff --git a/Cargo.lock b/Cargo.lock index 5ee411ac14253..246fc9a6af45c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -939,6 +939,7 @@ dependencies = [ "aptos-short-hex-str", "aptos-types", "bcs 0.1.4", + "fail", "futures", "itertools 0.12.1", "mini-moka", diff --git a/consensus/consensus-types/Cargo.toml b/consensus/consensus-types/Cargo.toml index 3d78e0c65cc45..097da10ad0369 100644 --- a/consensus/consensus-types/Cargo.toml +++ b/consensus/consensus-types/Cargo.toml @@ -23,6 +23,7 @@ aptos-logger = { workspace = true } aptos-short-hex-str = { workspace = true } aptos-types = { workspace = true } bcs = { workspace = true } +fail = { workspace = true } futures = { workspace = true } itertools = { workspace = true } mini-moka = { workspace = true } @@ -41,4 +42,5 @@ serde_json = { workspace = true } [features] default = [] +failpoints = ["fail/failpoints"] fuzzing = ["proptest", "aptos-types/fuzzing", "aptos-crypto/fuzzing"] diff --git a/consensus/consensus-types/src/pipeline/commit_decision.rs b/consensus/consensus-types/src/pipeline/commit_decision.rs index 7f039cd3c4a87..102d4e0d3c5ff 100644 --- a/consensus/consensus-types/src/pipeline/commit_decision.rs +++ b/consensus/consensus-types/src/pipeline/commit_decision.rs @@ -3,7 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::common::Round; -use anyhow::Context; +use anyhow::{ensure, Context}; use aptos_types::{ledger_info::LedgerInfoWithSignatures, validator_verifier::ValidatorVerifier}; use serde::{Deserialize, Serialize}; use std::fmt::{Debug, Display, Formatter}; @@ -48,6 +48,10 @@ impl CommitDecision { /// Verifies that the signatures carried in the message forms a valid quorum, /// and then verifies the signature. pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> { + ensure!( + !self.ledger_info.commit_info().is_ordered_only(), + "Unexpected ordered only commit info" + ); // We do not need to check the author because as long as the signature tree // is valid, the message should be valid. self.ledger_info diff --git a/consensus/consensus-types/src/sync_info.rs b/consensus/consensus-types/src/sync_info.rs index 0fd5b76ec8bcd..08c9bfa1e91e8 100644 --- a/consensus/consensus-types/src/sync_info.rs +++ b/consensus/consensus-types/src/sync_info.rs @@ -5,6 +5,7 @@ use crate::{common::Round, quorum_cert::QuorumCert, timeout_2chain::TwoChainTimeoutCertificate}; use anyhow::{ensure, Context}; use aptos_types::{block_info::BlockInfo, validator_verifier::ValidatorVerifier}; +use fail::fail_point; use serde::{Deserialize, Serialize}; use std::fmt::{Debug, Display, Formatter}; @@ -52,9 +53,17 @@ impl SyncInfo { let highest_2chain_timeout_cert = highest_2chain_timeout_cert .filter(|tc| tc.round() > highest_quorum_cert.certified_block().round()); + fail_point!("consensus::ordered_only_cert", |_| { + Self { + highest_quorum_cert: highest_quorum_cert.clone(), + highest_ordered_cert: Some(highest_ordered_cert.clone()), + highest_commit_cert: highest_ordered_cert.clone(), + highest_2chain_timeout_cert: highest_2chain_timeout_cert.clone(), + } + }); + let highest_ordered_cert = Some(highest_ordered_cert).filter(|hoc| hoc != &highest_quorum_cert); - Self { highest_quorum_cert, highest_ordered_cert, @@ -156,6 +165,16 @@ impl SyncInfo { "HLI has empty commit info" ); + // we don't have execution in unit tests, so this check would fail + #[cfg(not(any(test, feature = "fuzzing")))] + { + ensure!( + !self.highest_commit_cert().commit_info().is_ordered_only(), + "HLI {} has ordered only commit info", + self.highest_commit_cert().commit_info() + ); + } + self.highest_quorum_cert .verify(validator) .and_then(|_| { diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs index 84611d65a46cb..2cddd2e79ce3a 100644 --- a/consensus/src/block_storage/block_store.rs +++ b/consensus/src/block_storage/block_store.rs @@ -200,12 +200,9 @@ impl BlockStore { }; for block in blocks { - block_store - .insert_ordered_block(block) - .await - .unwrap_or_else(|e| { - panic!("[BlockStore] failed to insert block during build {:?}", e) - }); + block_store.insert_block(block).await.unwrap_or_else(|e| { + panic!("[BlockStore] failed to insert block during build {:?}", e) + }); } for qc in quorum_certs { block_store @@ -309,7 +306,7 @@ impl BlockStore { /// Duplicate inserts will return the previously inserted block ( /// note that it is considered a valid non-error case, for example, it can happen if a validator /// receives a certificate for a block that is currently being added). - pub async fn insert_ordered_block(&self, block: Block) -> anyhow::Result> { + pub async fn insert_block(&self, block: Block) -> anyhow::Result> { if let Some(existing_block) = self.get_block(block.id()) { return Ok(existing_block); } @@ -612,6 +609,6 @@ impl BlockStore { if self.ordered_root().round() < block.quorum_cert().commit_info().round() { self.send_for_execution(block.quorum_cert().clone()).await?; } - self.insert_ordered_block(block).await + self.insert_block(block).await } } diff --git a/consensus/src/block_storage/block_store_test.rs b/consensus/src/block_storage/block_store_test.rs index 3943aba4c9ea3..7328688f2f48e 100644 --- a/consensus/src/block_storage/block_store_test.rs +++ b/consensus/src/block_storage/block_store_test.rs @@ -130,7 +130,7 @@ proptest! { let known_parent = block_store.block_exists(block.parent_id()); let certified_parent = block.quorum_cert().certified_block().id() == block.parent_id(); let verify_res = block.verify_well_formed(); - let res = timed_block_on(&runtime, block_store.insert_ordered_block(block.clone())); + let res = timed_block_on(&runtime, block_store.insert_block(block.clone())); if !certified_parent { prop_assert!(verify_res.is_err()); } else if !known_parent { @@ -366,9 +366,7 @@ async fn test_illegal_timestamp() { Vec::new(), ) .unwrap(); - let result = block_store - .insert_ordered_block(block_with_illegal_timestamp) - .await; + let result = block_store.insert_block(block_with_illegal_timestamp).await; assert!(result.is_err()); } diff --git a/consensus/src/block_storage/sync_manager.rs b/consensus/src/block_storage/sync_manager.rs index f7849a943b5fb..d7ba609a187e6 100644 --- a/consensus/src/block_storage/sync_manager.rs +++ b/consensus/src/block_storage/sync_manager.rs @@ -159,7 +159,7 @@ impl BlockStore { while let Some(block) = pending.pop() { let block_qc = block.quorum_cert().clone(); self.insert_single_quorum_cert(block_qc)?; - self.insert_ordered_block(block).await?; + self.insert_block(block).await?; } self.insert_single_quorum_cert(qc) } diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index 07d9e7c8094ba..5212a23d999b6 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -793,7 +793,7 @@ impl RoundManager { // tries to add the same block again, which is okay as `execute_and_insert_block` call // is idempotent. self.block_store - .insert_ordered_block(proposal.clone()) + .insert_block(proposal.clone()) .await .context("[RoundManager] Failed to execute_and_insert the block")?; self.resend_verified_proposal_to_self( @@ -884,7 +884,7 @@ impl RoundManager { async fn execute_and_vote(&mut self, proposed_block: Block) -> anyhow::Result { let executed_block = self .block_store - .insert_ordered_block(proposed_block) + .insert_block(proposed_block) .await .context("[RoundManager] Failed to execute_and_insert the block")?; diff --git a/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs b/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs index dff8acfb8f0e7..e77f3bea86c29 100644 --- a/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs +++ b/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs @@ -191,6 +191,47 @@ async fn test_no_failures() { .unwrap(); } +#[tokio::test] +async fn test_ordered_only_cert() { + let num_validators = 3; + + let mut swarm = create_swarm(num_validators, 1).await; + + test_consensus_fault_tolerance( + &mut swarm, + 3, + 5.0, + 1, + Box::new(FailPointFailureInjection::new(Box::new(move |cycle, _| { + ( + vec![( + cycle % num_validators, + "consensus::ordered_only_cert".to_string(), + format!("{}%return", 50), + )], + true, + ) + }))), + Box::new(move |_, _, executed_rounds, executed_transactions, _, _| { + assert!( + executed_transactions >= 4, + "no progress with active consensus, only {} transactions", + executed_transactions + ); + assert!( + executed_rounds >= 2, + "no progress with active consensus, only {} rounds", + executed_rounds + ); + Ok(()) + }), + true, + false, + ) + .await + .unwrap(); +} + #[tokio::test] async fn test_fault_tolerance_of_network_send() { // Randomly increase network failure rate, until network halts, and check that it comes back afterwards.