Commit 67730a2

fix(consensus): Stop ingress selector prematurely removing canisters from selection
dist1ll committed Feb 2, 2024
1 parent 0f7973a commit 67730a2
Showing 1 changed file with 94 additions and 11 deletions.
105 changes: 94 additions & 11 deletions rs/ingress_manager/src/ingress_selector.rs
@@ -30,6 +30,21 @@ use ic_types::{
use ic_validator::RequestValidationError;
use std::{collections::BTreeMap, collections::HashMap, sync::Arc};

/// Number of round-robin iterations that need to happen before we weaken selection
/// rule #2. This weakening helps the ingress selector make progress when the quota is
/// either not increasing fast enough or, in the worst case, stuck.
///
/// Strong inclusion rule:
/// The quota is a hard limit, with the exception of a canister's *first* message.
///
/// Weak inclusion rule:
/// The quota is a hard limit, with the exception of a canister's first *n*
/// messages, where n is the current round-robin iteration count.
///
/// The weak rule compromises on fairness to ensure our ingress selector doesn't get
/// stuck.
const ITERATIONS_BEFORE_WEAKEN_INCLUDE_RULE: u32 = 4;
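// Illustrative sketch, not part of this commit: the per-canister message
// allowance implied by break criterion #2 below. The helper name
// `quota_exception_allowance` is hypothetical; the selection loop inlines
// this expression. With ITERATIONS_BEFORE_WEAKEN_INCLUDE_RULE = 4, iterations
// 1 through 5 allow one message past the quota (strong rule); from iteration
// 6 on, the allowance grows by one per iteration (weak rule): 2, 3, 4, ...
fn quota_exception_allowance(round_robin_iter: u32) -> u32 {
    std::cmp::max(
        1,
        round_robin_iter.saturating_sub(ITERATIONS_BEFORE_WEAKEN_INCLUDE_RULE),
    )
}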

impl IngressSelector for IngressManager {
fn get_ingress_payload(
&self,
@@ -91,6 +106,7 @@ impl IngressSelector for IngressManager {
struct CanisterQueue<'a> {
/// Number of bytes from this canister's queue that were included in the
/// ingress payload.
bytes_included: usize,
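/// Number of messages from this canister's queue that were included in the
/// ingress payload.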
msgs_included: u32,
msgs: Vec<&'a ValidatedIngressArtifact>,
}

@@ -134,9 +150,12 @@ impl IngressSelector for IngressManager {

let mut canisters: Vec<_> = canister_queues.keys().cloned().collect();

// Iterate over all canisters until the payload is full or no messages are left
// Do round-robin iterations until the payload is full or no messages are left
let mut round_robin_iter: u32 = 0;
'outer: while !canister_queues.is_empty() {
// Iterate over canisters, selecting messages in a quota-based round-robin fashion
round_robin_iter += 1;
// Execute a single round-robin iteration by looping through the canisters
// and selecting messages, bounded by the per-canister quota and payload size.
let mut i = 0;
while i < canisters.len() {
let canister_id = canisters[i];
@@ -154,6 +173,8 @@
num_messages,
&mut cycles_needed,
);
// Any message that generates validation errors gets removed from
// the canister's queue.
match result {
Ok(()) => (),
Err(ValidationError::Permanent(
@@ -175,14 +196,23 @@
break 'outer;
}

// Break criterion #2: canister with at least one included message
// crossed quota
if queue.bytes_included > 0 && (queue.bytes_included + ingress_size) > quota {
massimoalbarello commented Feb 10, 2024:

Hello again @dist1ll :)

I'm trying to understand how the previous implementation could get stuck in the loop.

Does it happen because, after the first round-robin iteration finishes (and therefore each canister has included at least one message, satisfying the first part of the old condition #2), if the updated quota is not big enough to accommodate any other message in a canister's queue, break condition #2 will always be true, so no more messages are added -> no more canisters are removed -> no quota increase?

dist1ll (author) replied Feb 11, 2024:

Welcome back! :D

Actually, the previous implementation wouldn't have gotten stuck in a loop, because of the condition on line 196: if queue.bytes_included < quota {

If we reach a point where we trigger break condition #2, then we falsely consider the canister to be "done" and remove it from the canisters collection.

To fix that premature eviction, we change the if predicate to queue.msgs.is_empty(). But that change alone would get us stuck, exactly in the way you are describing.
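A minimal simulation of this dynamic, as a sketch: the function and its parameters below are made up for illustration, the quota stays fixed, and the total payload-size break is ignored. Passing weaken_after = u32::MAX models the strong rule alone - break criterion #2 fires before any queue can drain, so the msgs.is_empty() eviction never triggers and the loop spins forever - while weaken_after = 4 models this commit's weakened rule, under which every queue eventually drains.

fn selection_terminates(msg_size: usize, quota: usize, weaken_after: u32) -> bool {
    // Three canisters, five pending messages each.
    let mut queues = vec![5usize; 3];
    // Per-canister (bytes_included, msgs_included).
    let mut included = vec![(0usize, 0u32); 3];
    let mut iter: u32 = 0;
    while !queues.is_empty() {
        iter += 1;
        if iter > 1_000 {
            return false; // treat the selector as stuck
        }
        let mut i = 0;
        while i < queues.len() {
            while queues[i] > 0 {
                // Break criterion #2, including the weakening rule.
                let allowance = std::cmp::max(1, iter.saturating_sub(weaken_after));
                if included[i].1 >= allowance && included[i].0 + msg_size > quota {
                    break;
                }
                included[i].0 += msg_size;
                included[i].1 += 1;
                queues[i] -= 1;
            }
            // The fixed eviction predicate: only drop canisters whose queue is empty.
            if queues[i] == 0 {
                queues.swap_remove(i);
                included.swap_remove(i);
            } else {
                i += 1;
            }
        }
    }
    true
}

fn main() {
    assert!(!selection_terminates(10, 15, u32::MAX)); // strong rule only: stuck
    assert!(selection_terminates(10, 15, 4)); // weakened rule: terminates
}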

// Break criterion #2: canister with at least max(1, n) included
// messages crossed quota, where n is the number of round-robin
// iterations - ITERATIONS_BEFORE_WEAKEN_INCLUDE_RULE.
// See documentation of [`ITERATIONS_BEFORE_WEAKEN_INCLUDE_RULE`].
if queue.msgs_included
>= std::cmp::max(
1,
round_robin_iter.saturating_sub(ITERATIONS_BEFORE_WEAKEN_INCLUDE_RULE),
)
&& (queue.bytes_included + ingress_size) > quota
{
break;
}

num_messages += 1;
accumulated_size += ingress_size;
queue.msgs_included += 1;
queue.bytes_included += ingress_size;
// The quota is not a hard limit. We always include the first message
// of each canister. This is why we check the third break criterion
@@ -191,9 +221,8 @@
queue.msgs.pop();
}

// If the canister didn't exceed the quota, we know that it has no or
// only invalid ingress messages left. Thus it's removed from the map.
if queue.bytes_included < quota {
// Swap-remove canisters with an empty queue.
if queue.msgs.is_empty() {
canisters.swap_remove(i);
// iterate again over current index because of swap_remove
} else {
@@ -2065,7 +2094,7 @@ mod tests {
let msgs: Vec<SignedIngress> = payload.try_into().unwrap();

assert_eq!(
1,
2,
msgs.iter()
.filter(|m| m.canister_id() == canister_test_id(0))
.count()
@@ -2076,11 +2105,65 @@
.filter(|m| m.canister_id() == canister_test_id(1))
.count()
);
assert_eq!(
2,
// Greater-equals, because we can't rely on the order in which canisters
// are iterated over. If canister_id(0) comes earlier in the iteration
// order, we'll include 2 messages from canister_id(2) - otherwise it's 3.
assert!(
msgs.iter()
.filter(|m| m.canister_id() == canister_test_id(2))
.count()
>= 2
);
},
)
}
#[tokio::test]
async fn test_not_stuck() {
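// Many canisters, each with more pending messages than its share of the
// payload can hold: the regime in which the weakened inclusion rule must
// keep the selector from stalling.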
const MSG_SIZE: usize = 154;
const CANISTER_COUNT: usize = MSG_SIZE + 1;
const MAX_SIZE: usize = MSG_SIZE * (CANISTER_COUNT + 1);
let subnet_id = subnet_test_id(0);
let registry = setup_registry(subnet_id, MAX_SIZE);
let time = mock_time();

let mut small_payloads = Vec::new();

for i in 0..CANISTER_COUNT {
let (messages_0, canister_0) = generate_ingress_with_params(
canister_test_id(i as u64),
/* msg_count = */ 10,
/* bytes = */ 1,
time + Duration::from_secs(40),
);

small_payloads.push((messages_0, canister_0));
}

let mut replicated_state = ReplicatedStateBuilder::new().with_subnet_id(subnet_id);
for p in small_payloads.iter() {
replicated_state = replicated_state.with_canister(p.1.clone());
}

setup_with_params(
None,
Some((registry, subnet_id)),
None,
Some(replicated_state.build()),
|ingress_manager, ingress_pool| {
let validation_context = ValidationContext {
time,
registry_version: RegistryVersion::from(1),
certified_height: Height::from(0),
};
for p in small_payloads.into_iter() {
let timestamp = p.0[0].expiry_time();
insert_unvalidated_ingress_with_timestamp(p.0, &ingress_pool, timestamp);
}
// This should not get stuck. If it does, the ingress selector has a bug.
ingress_manager.get_ingress_payload(
&HashSet::new(),
&validation_context,
NumBytes::new(MAX_SIZE as u64),
);
},
)
