Skip to content

Commit

Permalink
[consensus][reconfig] help peers who sent old epoch messages
Browse files Browse the repository at this point in the history
As part of reconfiguration, honest nodes in the new epoch need to help others that are still in the old epoch.
Otherwise there is a risk that one honest node joins the new epoch and
stops taking part in consensus for the old epoch, and the remaining 2f nodes are not able to make any progress.
  • Loading branch information
zekun000 committed Nov 5, 2019
1 parent 5b41d41 commit 8cda614
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 43 deletions.
4 changes: 2 additions & 2 deletions consensus/src/chained_bft/chained_bft_smr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ impl<T: Payload> ChainedBftSMR<T> {
// clean up all the previous messages from the old epochs
network_receivers.clear_prev_epoch_msgs();
}
future_epoch_and_peer = network_receivers.future_epoch.select_next_some() => {
different_epoch_and_peer = network_receivers.different_epoch.select_next_some() => {
idle_duration = pre_select_instant.elapsed();
epoch_manager.process_future_epoch(future_epoch_and_peer.0, future_epoch_and_peer.1).await
epoch_manager.process_different_epoch(different_epoch_and_peer.0, different_epoch_and_peer.1).await
}
epoch_retrieval_and_peer = network_receivers.epoch_retrieval.select_next_some() => {
idle_duration = pre_select_instant.elapsed();
Expand Down
47 changes: 29 additions & 18 deletions consensus/src/chained_bft/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use network::proto::ConsensusMsg;
use network::proto::ConsensusMsg_oneof;
use network::validator_network::{ConsensusNetworkSender, Event};
use safety_rules::{ConsensusState, SafetyRules};
use std::cmp::Ordering;
use std::convert::TryInto;
use std::sync::Arc;

Expand Down Expand Up @@ -132,25 +133,35 @@ impl<T: Payload> EpochManager<T> {
};
}

pub async fn process_future_epoch(&mut self, target_epoch: u64, peer_id: AccountAddress) {
let request = EpochRetrievalRequest {
start_epoch: self.epoch,
target_epoch,
};
let msg = match request.try_into() {
Ok(bytes) => ConsensusMsg {
message: Some(ConsensusMsg_oneof::RequestEpoch(bytes)),
},
Err(e) => {
warn!("Fail to serialize EpochRetrievalRequest: {:?}", e);
return;
pub async fn process_different_epoch(&mut self, different_epoch: u64, peer_id: AccountAddress) {
match different_epoch.cmp(&self.epoch) {
// We try to help nodes that have lower epoch than us
Ordering::Less => self.process_epoch_retrieval(different_epoch, peer_id).await,
// We request proof to join higher epoch
Ordering::Greater => {
let request = EpochRetrievalRequest {
start_epoch: self.epoch,
target_epoch: different_epoch,
};
let msg = match request.try_into() {
Ok(bytes) => ConsensusMsg {
message: Some(ConsensusMsg_oneof::RequestEpoch(bytes)),
},
Err(e) => {
warn!("Fail to serialize EpochRetrievalRequest: {:?}", e);
return;
}
};
if let Err(e) = self.network_sender.send_to(peer_id, msg).await {
warn!(
"Failed to send a epoch retrieval to peer {}: {:?}",
peer_id, e
);
}
}
Ordering::Equal => {
warn!("Same epoch should not come to process_different_epoch");
}
};
if let Err(e) = self.network_sender.send_to(peer_id, msg).await {
warn!(
"Failed to send a epoch retrieval to peer {}: {:?}",
peer_id, e
);
}
}

Expand Down
45 changes: 22 additions & 23 deletions consensus/src/chained_bft/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct NetworkReceivers<T> {
pub block_retrieval: libra_channel::Receiver<AccountAddress, IncomingBlockRetrievalRequest<T>>,
pub sync_info_msgs: libra_channel::Receiver<AccountAddress, (SyncInfo, AccountAddress)>,
pub epoch_change: libra_channel::Receiver<AccountAddress, LedgerInfoWithSignatures>,
pub future_epoch: libra_channel::Receiver<AccountAddress, (u64, AccountAddress)>,
pub different_epoch: libra_channel::Receiver<AccountAddress, (u64, AccountAddress)>,
pub epoch_retrieval: libra_channel::Receiver<AccountAddress, (u64, AccountAddress)>,
}

Expand Down Expand Up @@ -255,7 +255,7 @@ pub struct NetworkTask<T> {
block_request_tx: libra_channel::Sender<AccountAddress, IncomingBlockRetrievalRequest<T>>,
sync_info_tx: libra_channel::Sender<AccountAddress, (SyncInfo, AccountAddress)>,
epoch_change_tx: libra_channel::Sender<AccountAddress, LedgerInfoWithSignatures>,
future_epoch_tx: libra_channel::Sender<AccountAddress, (u64, AccountAddress)>,
different_epoch_tx: libra_channel::Sender<AccountAddress, (u64, AccountAddress)>,
epoch_retrieval_tx: libra_channel::Sender<AccountAddress, (u64, AccountAddress)>,
all_events: Box<dyn Stream<Item = failure::Result<Event<ConsensusMsg>>> + Send + Unpin>,
validators: Arc<ValidatorVerifier>,
Expand Down Expand Up @@ -314,7 +314,8 @@ impl<T: Payload> NetworkTask<T> {
dequeued_msgs_counter: &counters::EPOCH_CHANGE_DEQUEUED_MSGS,
}),
);
let (future_epoch_tx, future_epoch_rx) = libra_channel::new(QueueStyle::LIFO, 1, None);
let (different_epoch_tx, different_epoch_rx) =
libra_channel::new(QueueStyle::LIFO, 1, None);
let (epoch_retrieval_tx, epoch_retrieval_rx) =
libra_channel::new(QueueStyle::LIFO, 1, None);
let network_events = network_events.map_err(Into::<failure::Error>::into);
Expand All @@ -327,7 +328,7 @@ impl<T: Payload> NetworkTask<T> {
block_request_tx,
sync_info_tx,
epoch_change_tx,
future_epoch_tx,
different_epoch_tx,
epoch_retrieval_tx,
all_events,
validators,
Expand All @@ -338,7 +339,7 @@ impl<T: Payload> NetworkTask<T> {
block_retrieval: block_request_rx,
sync_info_msgs: sync_info_rx,
epoch_change: epoch_change_rx,
future_epoch: future_epoch_rx,
different_epoch: different_epoch_rx,
epoch_retrieval: epoch_retrieval_rx,
},
)
Expand Down Expand Up @@ -411,17 +412,16 @@ impl<T: Payload> NetworkTask<T> {
) -> failure::Result<()> {
let proposal = ProposalUncheckedSignatures::<T>::try_from(proposal)?;
match proposal.epoch().cmp(&self.epoch) {
Ordering::Less => Ok(()),
Ordering::Equal => {
let proposal = proposal
.validate_signatures(self.validators.as_ref())?
.verify_well_formed()?;
debug!("Received proposal {}", proposal);
Ok(self.proposal_tx.push(peer_id, proposal)?)
self.proposal_tx.push(peer_id, proposal)
}
Ordering::Greater => Ok(self
.future_epoch_tx
.push(peer_id, (proposal.epoch(), peer_id))?),
Ordering::Less | Ordering::Greater => self
.different_epoch_tx
.push(peer_id, (proposal.epoch(), peer_id)),
}
}

Expand All @@ -432,7 +432,6 @@ impl<T: Payload> NetworkTask<T> {
) -> failure::Result<()> {
let vote_msg = VoteMsg::try_from(vote_msg)?;
match vote_msg.epoch().cmp(&self.epoch) {
Ordering::Less => Ok(()),
Ordering::Equal => {
debug!("Received {}", vote_msg);
vote_msg
Expand All @@ -445,11 +444,11 @@ impl<T: Payload> NetworkTask<T> {
.log();
e
})?;
Ok(self.vote_tx.push(peer_id, vote_msg)?)
self.vote_tx.push(peer_id, vote_msg)
}
Ordering::Greater => Ok(self
.future_epoch_tx
.push(peer_id, (vote_msg.epoch(), peer_id))?),
Ordering::Less | Ordering::Greater => self
.different_epoch_tx
.push(peer_id, (vote_msg.epoch(), peer_id)),
}
}

Expand All @@ -460,7 +459,6 @@ impl<T: Payload> NetworkTask<T> {
) -> failure::Result<()> {
let sync_info = SyncInfo::try_from(sync_info)?;
match sync_info.epoch().cmp(&self.epoch) {
Ordering::Less => Ok(()),
Ordering::Equal => {
sync_info.verify(self.validators.as_ref()).map_err(|e| {
security_log(SecurityEvent::InvalidSyncInfoMsg)
Expand All @@ -469,11 +467,11 @@ impl<T: Payload> NetworkTask<T> {
.log();
e
})?;
Ok(self.sync_info_tx.push(peer_id, (sync_info, peer_id))?)
self.sync_info_tx.push(peer_id, (sync_info, peer_id))
}
Ordering::Greater => Ok(self
.future_epoch_tx
.push(peer_id, (sync_info.epoch(), peer_id))?),
Ordering::Less | Ordering::Greater => self
.different_epoch_tx
.push(peer_id, (sync_info.epoch(), peer_id)),
}
}

Expand Down Expand Up @@ -510,7 +508,6 @@ impl<T: Payload> NetworkTask<T> {
let proof = ValidatorChangeEventWithProof::try_from(proof)?;
let msg_epoch = proof.epoch()?;
match msg_epoch.cmp(&self.epoch) {
Ordering::Less => Ok(()),
Ordering::Equal => {
let target_ledger_info = proof.verify(self.epoch, &self.validators)?;
let validators = match target_ledger_info.ledger_info().next_validator_set() {
Expand All @@ -519,9 +516,11 @@ impl<T: Payload> NetworkTask<T> {
};
self.epoch = target_ledger_info.ledger_info().epoch() + 1;
self.validators = Arc::new(validators);
Ok(self.epoch_change_tx.push(peer_id, target_ledger_info)?)
self.epoch_change_tx.push(peer_id, target_ledger_info)
}
Ordering::Less | Ordering::Greater => {
self.different_epoch_tx.push(peer_id, (msg_epoch, peer_id))
}
Ordering::Greater => Ok(self.future_epoch_tx.push(peer_id, (msg_epoch, peer_id))?),
}
}

Expand Down

0 comments on commit 8cda614

Please sign in to comment.