Skip to content

Commit

Permalink
[Consensus] Clean up the pending messages of the previous epoch
Browse files Browse the repository at this point in the history
Summary:
When we start a new epoch there might be still pending messages in the LibraChannel between
networking and SMR event loop.
In this diff we introduce a capability of cleaning up the LibraChannel from all the messages and
use it in NetworkReceivers. (As a side effect this also gives a way to GC old keys from LibraChannel).

Testing:
Added a unit test for the LibraChannel.
  • Loading branch information
Dmitri Perelman committed Nov 5, 2019
1 parent 39ad3ae commit c8c9657
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 0 deletions.
9 changes: 9 additions & 0 deletions common/channel/src/libra_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ pub struct Receiver<K: Eq + Hash + Clone, M> {
shared_state: Arc<Mutex<SharedState<K, M>>>,
}

impl<K: Eq + Hash + Clone, M> Receiver<K, M> {
/// Removes all the previously sent transactions that have not been consumed yet and cleans up
/// the internal queue structure (GC of the previous keys).
pub fn clear(&mut self) {
let mut shared_state = self.shared_state.lock().unwrap();
shared_state.internal_queue.clear();
}
}

impl<K: Eq + Hash + Clone, M> Drop for Receiver<K, M> {
fn drop(&mut self) {
let mut shared_state = self.shared_state.lock().unwrap();
Expand Down
6 changes: 6 additions & 0 deletions common/channel/src/message_queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,10 @@ impl<K: Eq + Hash + Clone, T> PerKeyQueue<K, T> {
}
message
}

/// Clears all the pending messages and cleans up the queue from the previous metadata.
pub(crate) fn clear(&mut self) {
self.per_key_queue.clear();
self.round_robin_queue.clear();
}
}
31 changes: 31 additions & 0 deletions common/channel/src/message_queues_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,34 @@ fn test_lifo_round_robin() {
);
assert_eq!(q.pop(), None);
}

#[test]
fn test_message_queue_clear() {
let mut q = PerKeyQueue::new(QueueStyle::LIFO, 3, None);
let validator = AccountAddress::new([0u8; ADDRESS_LENGTH]);

q.push(
validator,
ProposalMsg {
msg: "msg1".to_string(),
},
);
q.push(
validator,
ProposalMsg {
msg: "msg2".to_string(),
},
);
assert_eq!(q.pop().unwrap().msg, "msg2".to_string());

q.clear();
assert_eq!(q.pop(), None);

q.push(
validator,
ProposalMsg {
msg: "msg3".to_string(),
},
);
assert_eq!(q.pop().unwrap().msg, "msg3".to_string());
}
2 changes: 2 additions & 0 deletions consensus/src/chained_bft/chained_bft_smr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ impl<T: Payload> ChainedBftSMR<T> {
ledger_info = network_receivers.epoch_change.select_next_some() => {
idle_duration = pre_select_instant.elapsed();
event_processor = epoch_manager.start_new_epoch(ledger_info);
// clean up all the previous messages from the old epochs
network_receivers.clear_prev_epoch_msgs();
}
future_epoch_and_peer = network_receivers.future_epoch.select_next_some() => {
idle_duration = pre_select_instant.elapsed();
Expand Down
10 changes: 10 additions & 0 deletions consensus/src/chained_bft/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ pub struct NetworkReceivers<T> {
pub epoch_retrieval: libra_channel::Receiver<AccountAddress, (u64, AccountAddress)>,
}

impl<T> NetworkReceivers<T> {
pub fn clear_prev_epoch_msgs(&mut self) {
// clear all the channels that are relevant for the previous epoch event processor
self.proposals.clear();
self.votes.clear();
self.block_retrieval.clear();
self.sync_info_msgs.clear();
}
}

/// Implements the actual networking support for all consensus messaging.
#[derive(Clone)]
pub struct NetworkSender {
Expand Down

0 comments on commit c8c9657

Please sign in to comment.