Skip to content

Commit

Permalink
Merge branch 'maksym-queue-filtering-opt' into 'master'
Browse files Browse the repository at this point in the history
perf: [EXC-1356] speed up execute_round by optimising queue::filter_messages

Optimise `queue::filter_messages` for the case when number of dropped messages is small, less than a half of all the messages in the queue.

Current approach iterates over all the messages in the queue and splits them into two buckets on the fly (filtered, not filtered), which essentially means every time reallocating the whole queue even when there are no changes in it.

New approach first iterates over all the messages and collect indexes of dropped messages. If there's none, then stop early, without any extra memory allocations (presumably the most popular case). Otherwise split the initial queue into two buckets of a known size (which also saves some time on memory re-allocations).

On a synthetic test with 20k total and 20 heartbeat canisters this optimisation showed -5...-8% of a total `execute_round` time. 

See merge request dfinity-lab/public/ic!11252
  • Loading branch information
maksymar committed Mar 14, 2023
2 parents 71849f8 + 97a5c3e commit 2c2fb8e
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions rs/replicated_state/src/canister_state/queues/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,22 +852,22 @@ impl IngressQueue {
}

/// Calls `filter` on each ingress message in the queue, retaining the
/// messages for whom the filter returns `true` and dropping the rest.
/// messages for which the filter returns `true` and returning the rest.
pub(super) fn filter_messages<F>(&mut self, mut filter: F) -> Vec<Arc<Ingress>>
where
F: FnMut(&Arc<Ingress>) -> bool,
{
// This method operates in place, visiting each element exactly once in the
// original order, and preserves the order of the dropped elements.
let mut filtered_messages = vec![];
let mut new_queue = VecDeque::new();
for item in self.queue.iter() {
self.queue.retain_mut(|item| {
if filter(item) {
new_queue.push_back(Arc::clone(item));
true
} else {
filtered_messages.push(Arc::clone(item));
false
}
}

self.queue = new_queue;
});
self.size_bytes = Self::size_bytes(&self.queue);
filtered_messages
}
Expand Down

0 comments on commit 2c2fb8e

Please sign in to comment.