From c8c9657bd7afb4124b462f161e518c0c88035f01 Mon Sep 17 00:00:00 2001 From: Dmitri Perelman Date: Mon, 4 Nov 2019 18:16:01 -0800 Subject: [PATCH] [Consensus] Clean up the pending messages of the previous epoch Summary: When we start a new epoch there might be still pending messages in the LibraChannel between networking and SMR event loop. In this diff we introduce a capability of cleaning up the LibraChannel from all the messages and use it in NetworkReceivers. (As a side effect this also gives a way to GC old keys from LibraChannel). Testing: Added a unit test for the LibraChannel. --- common/channel/src/libra_channel.rs | 9 ++++++ common/channel/src/message_queues.rs | 6 ++++ common/channel/src/message_queues_test.rs | 31 ++++++++++++++++++++ consensus/src/chained_bft/chained_bft_smr.rs | 2 ++ consensus/src/chained_bft/network.rs | 10 +++++++ 5 files changed, 58 insertions(+) diff --git a/common/channel/src/libra_channel.rs b/common/channel/src/libra_channel.rs index 742cd70b62a..9430b8b5edd 100644 --- a/common/channel/src/libra_channel.rs +++ b/common/channel/src/libra_channel.rs @@ -73,6 +73,15 @@ pub struct Receiver { shared_state: Arc>>, } +impl Receiver { + /// Removes all the previously sent transactions that have not been consumed yet and cleans up + /// the internal queue structure (GC of the previous keys). + pub fn clear(&mut self) { + let mut shared_state = self.shared_state.lock().unwrap(); + shared_state.internal_queue.clear(); + } +} + impl Drop for Receiver { fn drop(&mut self) { let mut shared_state = self.shared_state.lock().unwrap(); diff --git a/common/channel/src/message_queues.rs b/common/channel/src/message_queues.rs index a08acc69833..7c3f8529cd4 100644 --- a/common/channel/src/message_queues.rs +++ b/common/channel/src/message_queues.rs @@ -134,4 +134,10 @@ impl PerKeyQueue { } message } + + /// Clears all the pending messages and cleans up the queue from the previous metadata. + pub(crate) fn clear(&mut self) { + self.per_key_queue.clear(); + self.round_robin_queue.clear(); + } } diff --git a/common/channel/src/message_queues_test.rs b/common/channel/src/message_queues_test.rs index 6bd2bd70e30..a3d788faa93 100644 --- a/common/channel/src/message_queues_test.rs +++ b/common/channel/src/message_queues_test.rs @@ -273,3 +273,34 @@ fn test_lifo_round_robin() { ); assert_eq!(q.pop(), None); } + +#[test] +fn test_message_queue_clear() { + let mut q = PerKeyQueue::new(QueueStyle::LIFO, 3, None); + let validator = AccountAddress::new([0u8; ADDRESS_LENGTH]); + + q.push( + validator, + ProposalMsg { + msg: "msg1".to_string(), + }, + ); + q.push( + validator, + ProposalMsg { + msg: "msg2".to_string(), + }, + ); + assert_eq!(q.pop().unwrap().msg, "msg2".to_string()); + + q.clear(); + assert_eq!(q.pop(), None); + + q.push( + validator, + ProposalMsg { + msg: "msg3".to_string(), + }, + ); + assert_eq!(q.pop().unwrap().msg, "msg3".to_string()); +} diff --git a/consensus/src/chained_bft/chained_bft_smr.rs b/consensus/src/chained_bft/chained_bft_smr.rs index 053f6668e3f..e6cae28c8a9 100644 --- a/consensus/src/chained_bft/chained_bft_smr.rs +++ b/consensus/src/chained_bft/chained_bft_smr.rs @@ -125,6 +125,8 @@ impl ChainedBftSMR { ledger_info = network_receivers.epoch_change.select_next_some() => { idle_duration = pre_select_instant.elapsed(); event_processor = epoch_manager.start_new_epoch(ledger_info); + // clean up all the previous messages from the old epochs + network_receivers.clear_prev_epoch_msgs(); } future_epoch_and_peer = network_receivers.future_epoch.select_next_some() => { idle_duration = pre_select_instant.elapsed(); diff --git a/consensus/src/chained_bft/network.rs b/consensus/src/chained_bft/network.rs index 1aa5daabf70..b084f43d2b3 100644 --- a/consensus/src/chained_bft/network.rs +++ b/consensus/src/chained_bft/network.rs @@ -57,6 +57,16 @@ pub struct NetworkReceivers { pub epoch_retrieval: libra_channel::Receiver, } +impl NetworkReceivers { + pub fn clear_prev_epoch_msgs(&mut self) { + // clear all the channels that are relevant for the previous epoch event processor + self.proposals.clear(); + self.votes.clear(); + self.block_retrieval.clear(); + self.sync_info_msgs.clear(); + } +} + /// Implements the actual networking support for all consensus messaging. #[derive(Clone)] pub struct NetworkSender {