Skip to content

Commit

Permalink
[consensus][restart] commit upto highest ledger info after rebuild
Browse files Browse the repository at this point in the history
Closes: #1410
Approved by: aching
  • Loading branch information
zekun000 authored and bors-libra committed Oct 23, 2019
1 parent 4755886 commit f367100
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 76 deletions.
63 changes: 61 additions & 2 deletions consensus/src/chained_bft/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
block_storage::{block_tree::BlockTree, BlockReader, VoteReceptionResult},
persistent_storage::{PersistentStorage, RecoveryData},
},
counters,
state_replication::StateComputer,
};
use consensus_types::{
Expand All @@ -22,14 +23,15 @@ use failure::ResultExt;
use libra_crypto::HashValue;
use libra_logger::prelude::*;

use libra_types::crypto_proxies::ValidatorVerifier;
use libra_types::crypto_proxies::{LedgerInfoWithSignatures, ValidatorVerifier};
#[cfg(any(test, feature = "fuzzing"))]
use libra_types::validator_set::ValidatorSet;
use mirai_annotations::checked_precondition;
use std::{
collections::{vec_deque::VecDeque, HashMap},
sync::{Arc, RwLock},
};
use termion::color::*;

#[cfg(test)]
#[path = "block_store_test.rs"]
Expand Down Expand Up @@ -154,6 +156,50 @@ impl<T: Payload> BlockStore<T> {
tree
}

/// Commit the given block id with the proof, returns the path from current root or error
pub async fn commit(
&self,
finality_proof: LedgerInfoWithSignatures,
) -> failure::Result<Vec<Arc<ExecutedBlock<T>>>> {
let block_id_to_commit = finality_proof.ledger_info().consensus_block_id();
let block_to_commit = self
.get_block(block_id_to_commit)
.ok_or_else(|| format_err!("Committed block id not found"))?;

// First make sure that this commit is new.
ensure!(
block_to_commit.round() > self.root().round(),
"Committed block round lower than root"
);

let blocks_to_commit = self
.path_from_root(block_id_to_commit)
.unwrap_or_else(Vec::new);

let payload_and_output_list = blocks_to_commit
.iter()
.map(|b| {
(
b.payload().unwrap_or(&T::default()).clone(),
Arc::clone(b.output()),
)
})
.collect();
self.state_computer
.commit(payload_and_output_list, finality_proof)
.await
.unwrap_or_else(|e| panic!("Failed to persist commit due to {:?}", e));
counters::LAST_COMMITTED_ROUND.set(block_to_commit.round() as i64);
debug!("{}Committed{} {}", Fg(Blue), Fg(Reset), *block_to_commit);
event!("committed",
"block_id": block_to_commit.id().short_str(),
"round": block_to_commit.round(),
"parent_id": block_to_commit.parent_id().short_str(),
);
self.prune_tree(block_to_commit.id());
Ok(blocks_to_commit)
}

pub async fn rebuild(
&self,
root: (Block<T>, QuorumCert, QuorumCert),
Expand All @@ -178,6 +224,19 @@ impl<T: Payload> BlockStore<T> {
error!("fail to delete block: {:?}", e);
}
*self.inner.write().unwrap() = tree;
// If we crash before committing B_i via the state computer, then after restart our highest
// ledger info will not match the state computer's latest commit B_j (j < i).
// This leaves us in an inconsistent state: if we send out SyncInfo, peers may try to sync to
// B_i from us and discover that we only have B_j.
// To avoid that, commit up to highest_ledger_info here so that
// highest_ledger_info == state_computer.committed_trees.
if let Some(block_to_commit) = self.highest_ledger_info().committed_block_id() {
if block_to_commit != self.root().id() {
let finality_proof = self.highest_ledger_info().ledger_info().clone();
if let Err(e) = self.commit(finality_proof).await {
warn!("{:?}", e);
}
}
}
}

/// Execute and insert a block if it passes all validation tests.
Expand Down Expand Up @@ -320,7 +379,7 @@ impl<T: Payload> BlockStore<T> {
/// B3--> B4, root = B3
///
/// Returns the block ids of the blocks removed.
pub fn prune_tree(&self, next_root_id: HashValue) -> VecDeque<HashValue> {
fn prune_tree(&self, next_root_id: HashValue) -> VecDeque<HashValue> {
let id_to_remove = self
.inner
.read()
Expand Down
1 change: 0 additions & 1 deletion consensus/src/chained_bft/chained_bft_smr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ impl<T: Payload> ChainedBftSMR<T> {
proposer_election,
proposal_generator,
safety_rules,
state_computer,
txn_manager,
network.clone(),
Arc::clone(&self.storage),
Expand Down
75 changes: 8 additions & 67 deletions consensus/src/chained_bft/event_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
persistent_storage::PersistentStorage,
},
counters,
state_replication::{StateComputer, TxnManager},
state_replication::TxnManager,
util::time_service::{
duration_since_epoch, wait_if_possible, TimeService, WaitingError, WaitingSuccess,
},
Expand All @@ -30,7 +30,6 @@ use consensus_types::{
vote_proposal::VoteProposal,
};
use failure::ResultExt;
use libra_crypto::HashValue;
use libra_logger::prelude::*;
use libra_types::crypto_proxies::{LedgerInfoWithSignatures, ValidatorVerifier};
use mirai_annotations::{
Expand Down Expand Up @@ -64,7 +63,6 @@ pub struct EventProcessor<T> {
proposer_election: Box<dyn ProposerElection<T> + Send + Sync>,
proposal_generator: ProposalGenerator<T>,
safety_rules: SafetyRules,
state_computer: Arc<dyn StateComputer<Payload = T>>,
txn_manager: Arc<dyn TxnManager<Payload = T>>,
network: ConsensusNetworkImpl,
storage: Arc<dyn PersistentStorage<T>>,
Expand All @@ -83,7 +81,6 @@ impl<T: Payload> EventProcessor<T> {
proposer_election: Box<dyn ProposerElection<T> + Send + Sync>,
proposal_generator: ProposalGenerator<T>,
safety_rules: SafetyRules,
state_computer: Arc<dyn StateComputer<Payload = T>>,
txn_manager: Arc<dyn TxnManager<Payload = T>>,
network: ConsensusNetworkImpl,
storage: Arc<dyn PersistentStorage<T>>,
Expand All @@ -103,7 +100,6 @@ impl<T: Payload> EventProcessor<T> {
proposer_election,
proposal_generator,
safety_rules,
state_computer,
txn_manager,
network,
storage,
Expand Down Expand Up @@ -429,7 +425,7 @@ impl<T: Payload> EventProcessor<T> {
highest_committed_proposal_round = Some(block.round());
}
let finality_proof = qc.ledger_info().clone();
self.process_commit(block.id(), finality_proof).await;
self.process_commit(finality_proof).await;
}
let mut tc_round = None;
if let Some(timeout_cert) = tc {
Expand Down Expand Up @@ -739,64 +735,17 @@ impl<T: Payload> EventProcessor<T> {
}

/// Upon (potentially) new commit:
/// 0. Verify that this commit is newer than the current root.
/// 1. Notify state computer with the finality proof.
/// 1. Commit the blocks via block store.
/// 2. After the state is finalized, update the txn manager with the status of the committed
/// transactions.
/// 3. Prune the tree.
async fn process_commit(
&self,
block_id_to_commit: HashValue,
finality_proof: LedgerInfoWithSignatures,
) {
let block_to_commit = match self.block_store.get_block(block_id_to_commit) {
Some(block) => block,
None => {
async fn process_commit(&self, finality_proof: LedgerInfoWithSignatures) {
let blocks_to_commit = match self.block_store.commit(finality_proof).await {
Ok(blocks) => blocks,
Err(e) => {
error!("{:?}", e);
return;
}
};

// First make sure that this commit is new.
if block_to_commit.round() <= self.block_store.root().round() {
return;
}

// Verify that the ledger info is indeed for the block we're planning to
// commit.
assert_eq!(
finality_proof.ledger_info().consensus_block_id(),
block_to_commit.id()
);

let blocks_to_commit = self
.block_store
.path_from_root(block_id_to_commit)
.unwrap_or_else(Vec::new);

let payload_and_output_list = blocks_to_commit
.iter()
.map(|b| {
(
b.payload().unwrap_or(&T::default()).clone(),
Arc::clone(b.output()),
)
})
.collect();
if let Err(e) = self
.state_computer
.commit(payload_and_output_list, finality_proof)
.await
{
// We assume that state computer cannot enter an inconsistent state that might
// violate safety of the protocol. Specifically, an executor service is going to panic
// if it fails to persist the commit requests, which would crash the whole process
// including consensus.
error!(
"Failed to persist commit, mempool will not be notified: {:?}",
e
);
return;
}
// At this moment the new state is persisted and we can notify the clients.
// Multiple blocks might be committed at once: notify about all the transactions in the
// path from the old root to the new root.
Expand All @@ -821,14 +770,6 @@ impl<T: Payload> EventProcessor<T> {
}
}
}
counters::LAST_COMMITTED_ROUND.set(block_to_commit.round() as i64);
debug!("{}Committed{} {}", Fg(Blue), Fg(Reset), *block_to_commit);
event!("committed",
"block_id": block_to_commit.id().short_str(),
"round": block_to_commit.round(),
"parent_id": block_to_commit.parent_id().short_str(),
);
self.block_store.prune_tree(block_to_commit.id());
}

/// Retrieve a n chained blocks from the block store starting from
Expand Down
4 changes: 0 additions & 4 deletions consensus/src/chained_bft/event_processor_fuzzing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,6 @@ fn create_node_for_fuzzing() -> EventProcessor<TestPayload> {
// TODO: have two different nodes, one for proposing, one for accepting a proposal
let proposer_election = Box::new(RotatingProposer::new(vec![signer.author()], 1));

// TODO: do we want to fuzz the real StateComputer as well?
let empty_state_computer = Arc::new(EmptyStateComputer);

// We do not want to care about the time
let enforce_increasing_timestamps = false;

Expand All @@ -151,7 +148,6 @@ fn create_node_for_fuzzing() -> EventProcessor<TestPayload> {
proposer_election,
proposal_generator,
safety_rules,
empty_state_computer,
Arc::new(MockTransactionManager::new()),
network,
storage.clone(),
Expand Down
1 change: 0 additions & 1 deletion consensus/src/chained_bft/event_processor_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ impl NodeSetup {
proposer_election,
proposal_generator,
safety_rules,
state_computer,
Arc::new(MockTransactionManager::new()),
network,
storage.clone(),
Expand Down
11 changes: 10 additions & 1 deletion consensus/src/chained_bft/liveness/proposal_generator_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
},
util::mock_time_service::SimulatedTimeService,
};
use consensus_types::block::block_test_utils::placeholder_certificate_for_block;
use consensus_types::{block::Block, quorum_cert::QuorumCert};
use futures::executor::block_on;
use libra_types::crypto_proxies::ValidatorSigner;
Expand Down Expand Up @@ -127,7 +128,15 @@ fn test_empty_proposal_after_reconfiguration() {
let a2 = inserter.insert_reconfiguration_block(&a1, 2);
inserter.insert_qc_for_block(a2.as_ref(), None);
// if reconfiguration is committed, generate normal proposal
block_store.prune_tree(a2.id());
let li = placeholder_certificate_for_block(
vec![inserter.signer()],
a2.id(),
a2.round(),
a2.id(),
a2.round(),
Some(a2.id()),
);
block_on(block_store.commit(li.ledger_info().clone())).unwrap();
let normal_proposal_2 =
block_on(proposal_generator.generate_proposal(43, minute_from_now())).unwrap();
assert!(!normal_proposal_2.payload().unwrap().is_empty());
Expand Down

0 comments on commit f367100

Please sign in to comment.