From b34272ac0eccc832235c955a61a7bbc1e99f3957 Mon Sep 17 00:00:00 2001 From: Dragoljub Duric Date: Tue, 6 Feb 2024 13:38:04 +0000 Subject: [PATCH] fix: fairness of the Ingress Messages to Management Canister --- rs/execution_environment/tests/dts.rs | 26 ++- .../src/canister_state/queues.rs | 23 +- .../src/canister_state/queues/queue.rs | 199 ++++++++++++++---- .../src/canister_state/queues/queue/tests.rs | 195 +++++++++++++++-- .../src/canister_state/queues/tests.rs | 5 +- .../src/types/messages/ingress.rs | 6 + 6 files changed, 374 insertions(+), 80 deletions(-) diff --git a/rs/execution_environment/tests/dts.rs b/rs/execution_environment/tests/dts.rs index 711ec22862e..5e1323369b3 100644 --- a/rs/execution_environment/tests/dts.rs +++ b/rs/execution_environment/tests/dts.rs @@ -1,5 +1,6 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use assert_matches::assert_matches; use candid::Encode; use ic_config::{ embedders::{Config as EmbeddersConfig, MeteringType}, @@ -586,8 +587,8 @@ fn dts_pending_upgrade_with_heartbeat() { /// /// The expectations: /// - the install code messages run one by one. -/// - the canister status messages are blocked by the corresponding -/// install code messages. +/// - the canister status messages are completed immediately except for one +/// for the canister on which the code install is running. #[test] fn dts_scheduling_of_install_code() { if should_skip_test_due_to_disabled_dts() { @@ -658,7 +659,7 @@ fn dts_scheduling_of_install_code() { for _ in 0..5 { // With checkpoints enabled, the first install code will be repeatedly - // aborted, so there will be no progress. + // aborted, so there will be no progress for other install code messages. env.tick(); } @@ -672,7 +673,8 @@ fn dts_scheduling_of_install_code() { let mut status = vec![]; - // All other ingress messages are blocked by the first install code message. + // All other canister status messages are completed except for the canister + // on which the code install is running. for c in canister.iter().take(n - 1) { let id = env.send_ingress( user_id, @@ -685,7 +687,7 @@ fn dts_scheduling_of_install_code() { for _ in 0..5 { // With checkpoints enabled, the first install code will be repeatedly - // aborted, so there will be no progress. + // aborted, so there will be no progress for other install code messages. env.tick(); } @@ -696,10 +698,18 @@ fn dts_scheduling_of_install_code() { } } - for s in status.iter().take(n - 1) { - assert_eq!( + // The canister status ingress message for the canister on which + // the code is installing is blocked. + assert_eq!( + ingress_state(env.ingress_status(&status[0])), + Some(IngressState::Received) + ); + + // Canister status ingress messages for all other canisters are executed. + for s in status.iter().take(n - 1).skip(1) { + assert_matches!( ingress_state(env.ingress_status(s)), - Some(IngressState::Received) + Some(IngressState::Completed(..)) ); } diff --git a/rs/replicated_state/src/canister_state/queues.rs b/rs/replicated_state/src/canister_state/queues.rs index f37c379b88b..422c38b1fb3 100644 --- a/rs/replicated_state/src/canister_state/queues.rs +++ b/rs/replicated_state/src/canister_state/queues.rs @@ -41,7 +41,7 @@ pub const REQUEST_LIFETIME: Duration = Duration::from_secs(300); pub struct CanisterQueuesLoopDetector { pub local_queue_skip_count: usize, pub remote_queue_skip_count: usize, - pub skipped_ingress_queue: bool, + pub ingress_queue_skip_count: usize, } impl CanisterQueuesLoopDetector { @@ -53,16 +53,17 @@ impl CanisterQueuesLoopDetector { let skipped_all_local = self.local_queue_skip_count >= canister_queues.local_subnet_input_schedule.len(); - let ingress_is_empty = canister_queues.peek_ingress().is_none(); + let skipped_all_ingress = + self.ingress_queue_skip_count >= canister_queues.ingress_queue.ingress_schedule_size(); // An empty queue is skipped implicitly by `peek_input()` and `pop_input()`. // This means that no new messages can be consumed from an input source if // - either it is empty, // - or all its queues were explicitly skipped. - // Note that `skipped_all_remote` and `skipped_all_local` are trivially - // true if the corresponding input source is empty because empty queues - // are removed from the source. - skipped_all_remote && skipped_all_local && (ingress_is_empty || self.skipped_ingress_queue) + // Note that `skipped_all_remote`, `skipped_all_local`, and `skipped_all_ingress` + // are trivially true if the corresponding input source is empty because empty + // queues are removed from the source. + skipped_all_remote && skipped_all_local && skipped_all_ingress } } @@ -264,7 +265,7 @@ impl CanisterQueues { } /// Peeks the next ingress message from `ingress_queue`. - fn peek_ingress(&self) -> Option<&Arc> { + fn peek_ingress(&self) -> Option> { self.ingress_queue.peek() } @@ -468,10 +469,7 @@ impl CanisterQueues { // Try all 3 inputs: Ingress, Local, and Remote subnets for _ in 0..3 { let next_input = match self.next_input_queue { - NextInputQueue::Ingress => self - .peek_ingress() - .map(|ingress| CanisterMessage::Ingress(Arc::clone(ingress))), - + NextInputQueue::Ingress => self.peek_ingress().map(CanisterMessage::Ingress), NextInputQueue::RemoteSubnet => { self.peek_canister_input(InputQueueType::RemoteSubnet) } @@ -501,7 +499,8 @@ impl CanisterQueues { let current_input_queue = self.next_input_queue; match current_input_queue { NextInputQueue::Ingress => { - loop_detector.skipped_ingress_queue = true; + self.ingress_queue.skip_ingress_input(); + loop_detector.ingress_queue_skip_count += 1; self.next_input_queue = NextInputQueue::RemoteSubnet } diff --git a/rs/replicated_state/src/canister_state/queues/queue.rs b/rs/replicated_state/src/canister_state/queues/queue.rs index bcb3e5af671..aabae61cfbe 100644 --- a/rs/replicated_state/src/canister_state/queues/queue.rs +++ b/rs/replicated_state/src/canister_state/queues/queue.rs @@ -2,10 +2,12 @@ use crate::StateError; #[cfg(test)] mod tests; +use ic_base_types::CanisterId; use ic_protobuf::proxy::ProxyDecodeError; use ic_protobuf::state::{ingress::v1 as pb_ingress, queues::v1 as pb_queues}; use ic_types::messages::{Ingress, Request, RequestOrResponse, Response}; use ic_types::{CountBytes, Cycles, Time}; +use std::collections::BTreeMap; use std::{ collections::VecDeque, convert::{From, TryFrom, TryInto}, @@ -813,40 +815,117 @@ impl TryFrom for OutputQueue { } } -/// Representation of the Ingress queue. There is no upper bound on +/// Representation of the Ingress queue. There is no upper bound on /// the number of messages it can store. +/// +/// `IngressQueue` has a separate queue of Ingress messages for each +/// target canister based on `effective_canister_id`, and `schedule` +/// of executing target canisters with incoming Ingress messages. +/// +/// When the Ingress message is pushed to the `IngressQueue`, it is added +/// to the queue of Ingress messages of the target canister. If the target +/// canister does not have other incoming Ingress messages it is added to +/// the back of `schedule`. +/// +/// When `pop()` is called `IngressQueue` returns the first Ingress message +/// from the canister at the front of the `schedlue`. If that canister +/// has other incoming Ingress messages it is moved to the +/// back of `schedule`, otherwise it is removed from `schedule`. +/// +/// When `skip_ingress_input()` is called canister from the front of the +/// `schedule` is moved to its back. #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub(super) struct IngressQueue { - queue: VecDeque>, - + // Schedule of canisters that have Ingress messages to be processed. + // Because `effective_canister_id` of `Ingress` message has type Option, + // the same type is used for entries `schedule` and keys in `queues`. + schedule: VecDeque>, + // Per canister queue of Ingress messages. + queues: BTreeMap, VecDeque>>, + // Total number of Ingress messages that are waiting to be executed. + total_ingress_count: usize, /// Estimated size in bytes. size_bytes: usize, } +const PER_CANISTER_QUEUE_OVERHEAD_BYTES: usize = + size_of::>() + size_of::>>(); + impl IngressQueue { + /// Pushes a new ingress message to the back of the queue. pub(super) fn push(&mut self, msg: Ingress) { - self.size_bytes += Self::ingress_size_bytes(&msg); - self.queue.push_back(Arc::new(msg)); - debug_assert_eq!(Self::size_bytes(&self.queue), self.size_bytes); + let msg_size = Self::ingress_size_bytes(&msg); + let receiver_ingress_queue = self.queues.entry(msg.effective_canister_id).or_default(); + + if receiver_ingress_queue.is_empty() { + self.schedule.push_back(msg.effective_canister_id); + self.size_bytes += PER_CANISTER_QUEUE_OVERHEAD_BYTES; + } + + receiver_ingress_queue.push_back(Arc::new(msg)); + + self.size_bytes += msg_size; + debug_assert_eq!(Self::size_bytes(&self.queues), self.size_bytes); + + self.total_ingress_count += 1; } + /// Returns `None` if the queue is empty, otherwise removes the first Ingress + /// message of the first scheduled canister, returns it, and moves + /// that canister at the end of the schedule if it has more messages. pub(super) fn pop(&mut self) -> Option> { - let res = self.queue.pop_front(); - if let Some(msg) = res.as_ref() { - self.size_bytes -= Self::ingress_size_bytes(msg.as_ref()); - debug_assert_eq!(Self::size_bytes(&self.queue), self.size_bytes); + let canister_id = self.schedule.pop_front()?; + + let canister_ingress_queue = self.queues.get_mut(&canister_id).unwrap(); + + let res = canister_ingress_queue.pop_front(); + + if !canister_ingress_queue.is_empty() { + self.schedule.push_back(canister_id); + } else { + self.queues.remove(&canister_id); + self.size_bytes -= PER_CANISTER_QUEUE_OVERHEAD_BYTES; } - res + + let msg = res.unwrap(); + self.size_bytes -= Self::ingress_size_bytes(msg.as_ref()); + debug_assert_eq!(Self::size_bytes(&self.queues), self.size_bytes); + + self.total_ingress_count -= 1; + + Some(msg) } - pub(super) fn peek(&self) -> Option<&Arc> { - self.queue.front() + /// Skips the ingress messages for the currently scheduled canister, + /// and moves the canister to the end of scheduling queue. + pub(super) fn skip_ingress_input(&mut self) { + if let Some(canister_id) = self.schedule.pop_front() { + self.schedule.push_back(canister_id); + } } + /// Returns a reference to the ingress message at the front of the queue, + /// or `None` if the queue is empty. + pub(super) fn peek(&self) -> Option> { + let canister_id = self.schedule.front()?; + // It is safe to unwrap here since for every value in `self.schedule` + // we must have corresponding non-empty queue in `self.queues`. + let ingress = self.queues.get(canister_id).unwrap().front().unwrap(); + Some(Arc::clone(ingress)) + } + + /// Returns the number of Ingress messages in the queue. pub(super) fn size(&self) -> usize { - self.queue.len() + self.total_ingress_count + } + + /// Returns the number of canisters with incoming ingress messages. + pub(super) fn ingress_schedule_size(&self) -> usize { + self.schedule.len() } + /// Return true if there are no Ingress messages in the queue, + /// or false otherwise. pub(super) fn is_empty(&self) -> bool { self.size() == 0 } @@ -859,44 +938,70 @@ impl IngressQueue { where F: FnMut(&Arc) -> bool, { - // This method operates in place, visiting each element exactly once in the - // original order, and preserves the order of the dropped elements. let mut filtered_messages = vec![]; - self.queue.retain_mut(|item| { - if filter(item) { - true - } else { - filtered_messages.push(Arc::clone(item)); + for canister_ingress_queue in self.queues.values_mut() { + canister_ingress_queue.retain_mut(|item| { + if filter(item) { + true + } else { + // Empty `canister_ingress_queues` and their corresponding schedule entry + // are pruned below. + filtered_messages.push(Arc::clone(item)); + self.size_bytes -= Self::ingress_size_bytes(&(*item)); + self.total_ingress_count -= 1; + false + } + }); + } + + self.schedule.retain_mut(|canister_id| { + let canister_ingress_queue = self.queues.get(canister_id).unwrap(); + if canister_ingress_queue.is_empty() { + self.queues.remove(canister_id); + self.size_bytes -= PER_CANISTER_QUEUE_OVERHEAD_BYTES; false + } else { + true } }); - self.size_bytes = Self::size_bytes(&self.queue); + filtered_messages } + /// Returns an estimate of the size of an ingress message in bytes. + fn ingress_size_bytes(msg: &Ingress) -> usize { + size_of::>() + msg.count_bytes() + } + /// Calculates the size in bytes of an `IngressQueue` holding the given /// ingress messages. /// /// Time complexity: O(num_messages). - fn size_bytes(queue: &VecDeque>) -> usize { - size_of::() - + queue + fn size_bytes( + per_canister_queues: &BTreeMap, VecDeque>>, + ) -> usize { + let mut size = size_of::(); + for queue in per_canister_queues.values() { + size += queue .iter() .map(|i| Self::ingress_size_bytes(i)) .sum::() - } - - /// Returns an estimate of the size of an ingress message in bytes. - fn ingress_size_bytes(msg: &Ingress) -> usize { - size_of::>() + msg.count_bytes() + + PER_CANISTER_QUEUE_OVERHEAD_BYTES; + } + size } } impl Default for IngressQueue { fn default() -> Self { - let queue = Default::default(); - let size_bytes = Self::size_bytes(&queue); - Self { queue, size_bytes } + let queues = BTreeMap::new(); + let size_bytes = Self::size_bytes(&queues); + Self { + schedule: VecDeque::new(), + queues, + total_ingress_count: 0, + size_bytes, + } } } @@ -909,7 +1014,18 @@ impl CountBytes for IngressQueue { impl From<&IngressQueue> for Vec { fn from(item: &IngressQueue) -> Self { - item.queue.iter().map(|i| i.as_ref().into()).collect() + // When serializing the IngressQueue, we iterate over + // `schedule` and persist the queues in that order. + item.schedule + .iter() + .flat_map(|canister_id| { + item.queues + .get(canister_id) + .unwrap() + .iter() + .map(|v| pb_ingress::Ingress::from(&(**v))) + }) + .collect() } } @@ -917,12 +1033,15 @@ impl TryFrom> for IngressQueue { type Error = ProxyDecodeError; fn try_from(item: Vec) -> Result { - let queue = item - .into_iter() - .map(|i| i.try_into().map(Arc::new)) - .collect::, _>>()?; - let size_bytes = Self::size_bytes(&queue); + let mut res = Self::default(); - Ok(IngressQueue { queue, size_bytes }) + for ingress_pb in item { + // Because the contents of `Self::queues` were serialized in `Self::schedule` + // order, pushing the messages in that same order will implicitly reconstruct + // `Self::schedule`. + res.push(ingress_pb.try_into()?); + } + + Ok(res) } } diff --git a/rs/replicated_state/src/canister_state/queues/queue/tests.rs b/rs/replicated_state/src/canister_state/queues/queue/tests.rs index 3eba592fa56..889aedac5b5 100644 --- a/rs/replicated_state/src/canister_state/queues/queue/tests.rs +++ b/rs/replicated_state/src/canister_state/queues/queue/tests.rs @@ -718,6 +718,7 @@ fn msg_from_number(num: u64) -> Ingress { IngressBuilder::default() .source(user_test_id(num)) .receiver(canister_test_id(num)) + .effective_canister_id(Some(canister_test_id(num))) .method_name(num.to_string()) .message_id(message_test_id(num)) .build() @@ -738,24 +739,6 @@ fn empty_and_len_agree_on_non_empty() { assert!(!q.is_empty()); } -#[test] -fn order_is_fifo() { - let mut q = IngressQueue::default(); - let msg1 = msg_from_number(1); - let msg2 = msg_from_number(2); - q.push(msg1.clone()); - q.push(msg2.clone()); - - assert_eq!(q.size(), 2); - assert_eq!(q.pop(), Some(msg1.into())); - - assert_eq!(q.size(), 1); - assert_eq!(q.pop(), Some(msg2.into())); - - assert_eq!(q.size(), 0); - assert_eq!(q.pop(), None); -} - #[test] fn ingress_filter() { let mut queue = IngressQueue::default(); @@ -772,3 +755,179 @@ fn ingress_filter() { assert_eq!(queue.size(), 1); assert_eq!(queue.pop(), Some(msg3.into())); } + +#[test] +fn ingress_queue_empty() { + let mut queue = IngressQueue::default(); + assert_eq!(queue.peek(), None); + assert_eq!(queue.pop(), None); + assert_eq!(queue.size(), 0); + assert_eq!(queue.ingress_schedule_size(), 0); + assert!(queue.is_empty()); +} + +#[test] +fn ingress_queue_round_robin_order() { + let mut queue = IngressQueue::default(); + // First ingress for canister A + let mut msg11 = msg_from_number(1); + msg11.message_id = message_test_id(11); + queue.push(msg11.clone()); + // First ingress for canister B + let mut msg21 = msg_from_number(2); + msg21.message_id = message_test_id(21); + queue.push(msg21.clone()); + // Second ingress for canister A + let mut msg22 = msg_from_number(2); + msg22.message_id = message_test_id(22); + queue.push(msg22.clone()); + // Second ingress for canister B + let mut msg12 = msg_from_number(1); + msg12.message_id = message_test_id(12); + queue.push(msg12.clone()); + + // We have 4 ingress messages for 2 canisters in the queue. + assert_eq!(queue.size(), 4); + assert_eq!(queue.ingress_schedule_size(), 2); + assert!(!queue.is_empty()); + + // The message on the front of queue is first message for canister A. + assert_eq!(queue.peek(), Some(msg11.clone().into())); + assert_eq!(queue.pop(), Some(msg11.into())); + + // We have 3 ingress messages for 2 canisters in the queue. + assert_eq!(queue.size(), 3); + assert_eq!(queue.ingress_schedule_size(), 2); + assert!(!queue.is_empty()); + + // The message on the front of queue is first message for canister B. + assert_eq!(queue.peek(), Some(msg21.clone().into())); + assert_eq!(queue.pop(), Some(msg21.into())); + + // We have 2 ingress messages for 2 canisters in the queue. + assert_eq!(queue.size(), 2); + assert_eq!(queue.ingress_schedule_size(), 2); + assert!(!queue.is_empty()); + + // The message on the front of queue is second message for canister A. + assert_eq!(queue.peek(), Some(msg12.clone().into())); + assert_eq!(queue.pop(), Some(msg12.into())); + + // We have 1 ingress message for 1 canister in the queue. + assert_eq!(queue.size(), 1); + assert_eq!(queue.ingress_schedule_size(), 1); + assert!(!queue.is_empty()); + + // The message on the front of queue is second message for canister B. + assert_eq!(queue.peek(), Some(msg22.clone().into())); + assert_eq!(queue.pop(), Some(msg22.into())); + + // The queue is empty. + assert_eq!(queue.size(), 0); + assert_eq!(queue.ingress_schedule_size(), 0); + assert!(queue.is_empty()); + + assert_eq!(queue.peek(), None); + assert_eq!(queue.pop(), None); +} + +#[test] +fn ingress_queue_round_robin_order_with_skipping_ingress_input() { + let mut queue = IngressQueue::default(); + // First ingress for canister A + let mut msg11 = msg_from_number(1); + msg11.message_id = message_test_id(11); + queue.push(msg11.clone()); + // First ingress for canister B + let mut msg21 = msg_from_number(2); + msg21.message_id = message_test_id(21); + queue.push(msg21.clone()); + // Second ingress for canister A + let mut msg22 = msg_from_number(2); + msg22.message_id = message_test_id(22); + queue.push(msg22.clone()); + // Second ingress for canister B + let mut msg12 = msg_from_number(1); + msg12.message_id = message_test_id(12); + queue.push(msg12.clone()); + + // We have 4 ingress messages for 2 canisters in the queue. + assert_eq!(queue.size(), 4); + assert_eq!(queue.ingress_schedule_size(), 2); + assert!(!queue.is_empty()); + + // The message on the front of queue is first message for canister A. + assert_eq!(queue.peek(), Some(msg11.clone().into())); + assert_eq!(queue.pop(), Some(msg11.into())); + + // We have 3 ingress messages for 2 canisters in the queue. + assert_eq!(queue.size(), 3); + assert_eq!(queue.ingress_schedule_size(), 2); + assert!(!queue.is_empty()); + + // The message on the front of queue is first message for canister B. + assert_eq!(queue.peek(), Some(msg21.clone().into())); + assert_eq!(queue.pop(), Some(msg21.into())); + + // We have 2 ingress messages for 2 canisters in the queue. + assert_eq!(queue.size(), 2); + assert_eq!(queue.ingress_schedule_size(), 2); + assert!(!queue.is_empty()); + + // The message on the front of queue is second message for canister A. + assert_eq!(queue.peek(), Some(msg12.clone().into())); + + // We are skipping the canister A. + queue.skip_ingress_input(); + + // We still have 2 ingress messages for 2 canisters in the queue. + assert_eq!(queue.size(), 2); + assert_eq!(queue.ingress_schedule_size(), 2); + assert!(!queue.is_empty()); + + // The message on the front of queue is second message for canister B. + assert_eq!(queue.peek(), Some(msg22.clone().into())); + assert_eq!(queue.pop(), Some(msg22.into())); + + // We have 1 ingress message for 1 canister in the queue. + assert_eq!(queue.size(), 1); + assert_eq!(queue.ingress_schedule_size(), 1); + assert!(!queue.is_empty()); + + // The message on the front of queue is second message for canister A. + assert_eq!(queue.peek(), Some(msg12.clone().into())); + assert_eq!(queue.pop(), Some(msg12.into())); + + // The queue is empty. + assert_eq!(queue.size(), 0); + assert_eq!(queue.ingress_schedule_size(), 0); + assert!(queue.is_empty()); + + assert_eq!(queue.peek(), None); + assert_eq!(queue.pop(), None); +} + +#[test] +fn serialize_deserialize_ingress_queue() { + let mut queue = IngressQueue::default(); + + let number_of_messages_per_canister = 5; + let number_of_canisters = 10; + + for i in 0..number_of_messages_per_canister { + for j in 0..number_of_canisters { + let mut ingress = msg_from_number(j); + ingress.message_id = message_test_id(i * number_of_canisters + j); + queue.push(ingress); + } + } + + let pb_vec_ingress: Vec = (&queue.clone()).into(); + let mut queue_deserialized = IngressQueue::try_from(pb_vec_ingress).unwrap(); + + while !queue.is_empty() { + assert_eq!(queue.pop(), queue_deserialized.pop()); + } + + assert!(queue_deserialized.is_empty()); +} diff --git a/rs/replicated_state/src/canister_state/queues/tests.rs b/rs/replicated_state/src/canister_state/queues/tests.rs index 91f74181a0d..ea407eef5d2 100644 --- a/rs/replicated_state/src/canister_state/queues/tests.rs +++ b/rs/replicated_state/src/canister_state/queues/tests.rs @@ -725,7 +725,7 @@ fn test_skip_round_robin() { // Skip ingress. assert_eq!(queues.peek_input().unwrap(), ingress_input); queues.skip_input(&mut loop_detector); - assert!(loop_detector.skipped_ingress_queue); + assert_eq!(loop_detector.ingress_queue_skip_count, 1); assert!(!loop_detector.detected_loop(&queues)); let peeked_input = CanisterMessage::Request(Arc::new(local_requests.get(1).unwrap().clone())); @@ -736,12 +736,13 @@ fn test_skip_round_robin() { assert_eq!(queues.peek_input().unwrap(), ingress_input); queues.skip_input(&mut loop_detector); assert!(!loop_detector.detected_loop(&queues)); + assert_eq!(loop_detector.ingress_queue_skip_count, 2); // Skip local. let peeked_input = CanisterMessage::Request(Arc::new(local_requests.get(2).unwrap().clone())); assert_eq!(queues.peek_input().unwrap(), peeked_input); queues.skip_input(&mut loop_detector); - assert!(loop_detector.skipped_ingress_queue); + assert_eq!(loop_detector.ingress_queue_skip_count, 2); assert!(loop_detector.detected_loop(&queues)); } diff --git a/rs/test_utilities/src/types/messages/ingress.rs b/rs/test_utilities/src/types/messages/ingress.rs index fc236362cc5..4cc79a7289b 100644 --- a/rs/test_utilities/src/types/messages/ingress.rs +++ b/rs/test_utilities/src/types/messages/ingress.rs @@ -52,6 +52,12 @@ impl IngressBuilder { self } + /// Sets the `effective_canister_id` field. + pub fn effective_canister_id(mut self, effective_canister_id: Option) -> Self { + self.ingress.effective_canister_id = effective_canister_id; + self + } + /// Sets the `method_name` field. pub fn method_name(mut self, method_name: S) -> Self { self.ingress.method_name = method_name.to_string();