From 67730a29e2c5272bcf7c3ad23e9ffa7309e1aede Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20Ali=C4=87?= Date: Fri, 2 Feb 2024 01:29:30 +0000 Subject: [PATCH] fix(consensus): Stop ingress selector prematurely removing canisters from selection --- rs/ingress_manager/src/ingress_selector.rs | 105 ++++++++++++++++++--- 1 file changed, 94 insertions(+), 11 deletions(-) diff --git a/rs/ingress_manager/src/ingress_selector.rs b/rs/ingress_manager/src/ingress_selector.rs index f3c4ed6e9c6..8163236ffec 100644 --- a/rs/ingress_manager/src/ingress_selector.rs +++ b/rs/ingress_manager/src/ingress_selector.rs @@ -30,6 +30,21 @@ use ic_types::{ use ic_validator::RequestValidationError; use std::{collections::BTreeMap, collections::HashMap, sync::Arc}; +/// Number of round-robin iterations that need to happen, before we weaken the selection +/// rule #2. This weakening helps the ingress selector progress when the quota is either +/// not increasing fast enough or in the worst case stuck. +/// +/// Strong inclusion rule: +/// The quota is a hard limit, with the exception of a canister's *first* message. +/// +/// Weak inclusion rule: +/// The quota is a hard limit, with the exception of a canister's first *n* +/// messages, where n is the current round-robin iteration count minus this constant. +/// +/// The weak rule compromises on fairness to ensure our ingress selector doesn't get +/// stuck. 
+const ITERATIONS_BEFORE_WEAKEN_INCLUDE_RULE: u32 = 4; + impl IngressSelector for IngressManager { fn get_ingress_payload( &self, @@ -91,6 +106,7 @@ impl IngressSelector for IngressManager { struct CanisterQueue<'a> { /// Number of bytes the canister's queue that was included in ingress bytes_included: usize, + msgs_included: u32, msgs: Vec<&'a ValidatedIngressArtifact>, } @@ -134,9 +150,12 @@ impl IngressSelector for IngressManager { let mut canisters: Vec<_> = canister_queues.keys().cloned().collect(); - // Iterate over all canisters until the payload is full or no messages are left + // Do round-robin iterations until the payload is full or no messages are left + let mut round_robin_iter: u32 = 0; 'outer: while !canister_queues.is_empty() { - // Iterate over canisters, selecting messages in a quota-based round-robin fashion + round_robin_iter += 1; + // Execute a single round-robin iteration, by looping through the canisters + // and selecting messages bounded by the per-canister quota and payload size. let mut i = 0; while i < canisters.len() { let canister_id = canisters[i]; @@ -154,6 +173,8 @@ impl IngressSelector for IngressManager { num_messages, &mut cycles_needed, ); + // Any message that generates validation errors gets removed from + // the canister's queue. match result { Ok(()) => (), Err(ValidationError::Permanent( @@ -175,14 +196,23 @@ impl IngressSelector for IngressManager { break 'outer; } - // Break criterion #2: canister with at least one included message - // crossed quota - if queue.bytes_included > 0 && (queue.bytes_included + ingress_size) > quota { + // Break criterion #2: canister with at least max(1, n) included + // messages crossed quota, where n is the number of round robin + // iterations - ITERATIONS_BEFORE_WEAKEN_INCLUDE_RULE. + // See documentation of [`ITERATIONS_BEFORE_WEAKEN_INCLUDE_RULE`]. 
+ if queue.msgs_included + >= std::cmp::max( + 1, + round_robin_iter.saturating_sub(ITERATIONS_BEFORE_WEAKEN_INCLUDE_RULE), + ) + && (queue.bytes_included + ingress_size) > quota + { break; } num_messages += 1; accumulated_size += ingress_size; + queue.msgs_included += 1; queue.bytes_included += ingress_size; // The quota is not a hard limit. We always include the first message // of each canister. This is why we check the third break criterion @@ -191,9 +221,8 @@ impl IngressSelector for IngressManager { queue.msgs.pop(); } - // If the canister didn't exceed the quota, we know that it has no or - // only invalid ingress messages left. Thus it's removed from the map. - if queue.bytes_included < quota { + // Swap-remove canisters with an empty queue. + if queue.msgs.is_empty() { canisters.swap_remove(i); // iterate again over current index because of swap_remove } else { @@ -2065,7 +2094,7 @@ mod tests { let msgs: Vec = payload.try_into().unwrap(); assert_eq!( - 1, + 2, msgs.iter() .filter(|m| m.canister_id() == canister_test_id(0)) .count() @@ -2076,11 +2105,65 @@ mod tests { .filter(|m| m.canister_id() == canister_test_id(1)) .count() ); - assert_eq!( - 2, + // Greater-equals, because we can't rely on the order in which canisters + // are iterated over. If the canister_id(0) is earlier in the iteration + // order, we'll include 2 messages from canister_id(2) - otherwise it's 3. 
+ assert!( msgs.iter() .filter(|m| m.canister_id() == canister_test_id(2)) .count() + >= 2 + ); + }, + ) + } + #[tokio::test] + async fn test_not_stuck() { + const MSG_SIZE: usize = 154; + const CANISTER_COUNT: usize = MSG_SIZE + 1; + const MAX_SIZE: usize = MSG_SIZE * (CANISTER_COUNT + 1); + let subnet_id = subnet_test_id(0); + let registry = setup_registry(subnet_id, MAX_SIZE); + let time = mock_time(); + + let mut small_payloads = Vec::new(); + + for i in 0..CANISTER_COUNT { + let (messages_0, canister_0) = generate_ingress_with_params( + canister_test_id(i as u64), + /* msg_count = */ 10, + /* bytes = */ 1, + time + Duration::from_secs(40), + ); + + small_payloads.push((messages_0, canister_0)); + } + + let mut replicated_state = ReplicatedStateBuilder::new().with_subnet_id(subnet_id); + for p in small_payloads.iter() { + replicated_state = replicated_state.with_canister(p.1.clone()); + } + + setup_with_params( + None, + Some((registry, subnet_id)), + None, + Some(replicated_state.build()), + |ingress_manager, ingress_pool| { + let validation_context = ValidationContext { + time, + registry_version: RegistryVersion::from(1), + certified_height: Height::from(0), + }; + for p in small_payloads.into_iter() { + let timestamp = p.0[0].expiry_time(); + insert_unvalidated_ingress_with_timestamp(p.0, &ingress_pool, timestamp); + } + // This should not get stuck. If it does, the ingress selector has a bug. + ingress_manager.get_ingress_payload( + &HashSet::new(), + &validation_context, + NumBytes::new(MAX_SIZE as u64), ); }, )