Skip to content

Commit

Permalink
fix: fairness of the Ingress Messages to Management Canister
Browse files Browse the repository at this point in the history
  • Loading branch information
dragoljub-duric committed Feb 6, 2024
1 parent 293b694 commit b34272a
Show file tree
Hide file tree
Showing 6 changed files with 374 additions and 80 deletions.
26 changes: 18 additions & 8 deletions rs/execution_environment/tests/dts.rs
@@ -1,5 +1,6 @@
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use assert_matches::assert_matches;
use candid::Encode;
use ic_config::{
embedders::{Config as EmbeddersConfig, MeteringType},
Expand Down Expand Up @@ -586,8 +587,8 @@ fn dts_pending_upgrade_with_heartbeat() {
///
/// The expectations:
/// - the install code messages run one by one.
/// - the canister status messages are blocked by the corresponding
/// install code messages.
/// - the canister status messages are completed immediately, except for the
/// one addressed to the canister on which the install code is running.
#[test]
fn dts_scheduling_of_install_code() {
if should_skip_test_due_to_disabled_dts() {
Expand Down Expand Up @@ -658,7 +659,7 @@ fn dts_scheduling_of_install_code() {

for _ in 0..5 {
// With checkpoints enabled, the first install code will be repeatedly
// aborted, so there will be no progress.
// aborted, so there will be no progress for other install code messages.
env.tick();
}

Expand All @@ -672,7 +673,8 @@ fn dts_scheduling_of_install_code() {

let mut status = vec![];

// All other ingress messages are blocked by the first install code message.
// All canister status messages are completed, except for the one addressed
// to the canister on which the install code is running.
for c in canister.iter().take(n - 1) {
let id = env.send_ingress(
user_id,
Expand All @@ -685,7 +687,7 @@ fn dts_scheduling_of_install_code() {

for _ in 0..5 {
// With checkpoints enabled, the first install code will be repeatedly
// aborted, so there will be no progress.
// aborted, so there will be no progress for other install code messages.
env.tick();
}

Expand All @@ -696,10 +698,18 @@ fn dts_scheduling_of_install_code() {
}
}

for s in status.iter().take(n - 1) {
assert_eq!(
// The canister status ingress message for the canister on which
// the code is installing is blocked.
assert_eq!(
ingress_state(env.ingress_status(&status[0])),
Some(IngressState::Received)
);

// Canister status ingress messages for all other canisters are executed.
for s in status.iter().take(n - 1).skip(1) {
assert_matches!(
ingress_state(env.ingress_status(s)),
Some(IngressState::Received)
Some(IngressState::Completed(..))
);
}

Expand Down
23 changes: 11 additions & 12 deletions rs/replicated_state/src/canister_state/queues.rs
Expand Up @@ -41,7 +41,7 @@ pub const REQUEST_LIFETIME: Duration = Duration::from_secs(300);
pub struct CanisterQueuesLoopDetector {
pub local_queue_skip_count: usize,
pub remote_queue_skip_count: usize,
pub skipped_ingress_queue: bool,
pub ingress_queue_skip_count: usize,
}

impl CanisterQueuesLoopDetector {
Expand All @@ -53,16 +53,17 @@ impl CanisterQueuesLoopDetector {
let skipped_all_local =
self.local_queue_skip_count >= canister_queues.local_subnet_input_schedule.len();

let ingress_is_empty = canister_queues.peek_ingress().is_none();
let skipped_all_ingress =
self.ingress_queue_skip_count >= canister_queues.ingress_queue.ingress_schedule_size();

// An empty queue is skipped implicitly by `peek_input()` and `pop_input()`.
// This means that no new messages can be consumed from an input source if
// - either it is empty,
// - or all its queues were explicitly skipped.
// Note that `skipped_all_remote` and `skipped_all_local` are trivially
// true if the corresponding input source is empty because empty queues
// are removed from the source.
skipped_all_remote && skipped_all_local && (ingress_is_empty || self.skipped_ingress_queue)
// Note that `skipped_all_remote`, `skipped_all_local`, and `skipped_all_ingress`
// are trivially true if the corresponding input source is empty because empty
// queues are removed from the source.
skipped_all_remote && skipped_all_local && skipped_all_ingress
}
}

Expand Down Expand Up @@ -264,7 +265,7 @@ impl CanisterQueues {
}

/// Peeks the next ingress message from `ingress_queue`.
fn peek_ingress(&self) -> Option<&Arc<Ingress>> {
fn peek_ingress(&self) -> Option<Arc<Ingress>> {
self.ingress_queue.peek()
}

Expand Down Expand Up @@ -468,10 +469,7 @@ impl CanisterQueues {
// Try all 3 inputs: Ingress, Local, and Remote subnets
for _ in 0..3 {
let next_input = match self.next_input_queue {
NextInputQueue::Ingress => self
.peek_ingress()
.map(|ingress| CanisterMessage::Ingress(Arc::clone(ingress))),

NextInputQueue::Ingress => self.peek_ingress().map(CanisterMessage::Ingress),
NextInputQueue::RemoteSubnet => {
self.peek_canister_input(InputQueueType::RemoteSubnet)
}
Expand Down Expand Up @@ -501,7 +499,8 @@ impl CanisterQueues {
let current_input_queue = self.next_input_queue;
match current_input_queue {
NextInputQueue::Ingress => {
loop_detector.skipped_ingress_queue = true;
self.ingress_queue.skip_ingress_input();
loop_detector.ingress_queue_skip_count += 1;
self.next_input_queue = NextInputQueue::RemoteSubnet
}

Expand Down

0 comments on commit b34272a

Please sign in to comment.