Skip to content

Commit

Permalink
chore: Assume that Callback::originator, respondent and prepayment_.*…
Browse files Browse the repository at this point in the history
… are always present
  • Loading branch information
alin-at-dfinity committed Jan 30, 2024
1 parent f3198ee commit e7c2f86
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 164 deletions.
19 changes: 2 additions & 17 deletions rs/execution_environment/src/execution/response.rs
Expand Up @@ -159,20 +159,13 @@ impl ResponseHelper {
// possible response when the request is being sent. Now that we
// have received the response, we can refund the cycles based on
// the actual size of the response.
let prepayment_for_response_transmission =
match original.callback.prepayment_for_response_transmission() {
Some(cycles) => cycles,
None => round
.cycles_account_manager
.prepayment_for_response_transmission(original.subnet_size),
};
let refund_for_response_transmission = round
.cycles_account_manager
.refund_for_response_transmission(
round.log,
round.counters.response_cycles_refund_error,
response,
prepayment_for_response_transmission,
original.callback.prepayment_for_response_transmission,
original.subnet_size,
);

Expand Down Expand Up @@ -536,19 +529,11 @@ impl ResponseHelper {
round.counters.ingress_with_cycles_error,
);

let prepayment_for_response_execution =
match original.callback.prepayment_for_response_execution() {
Some(cycles) => cycles,
None => round
.cycles_account_manager
.prepayment_for_response_execution(original.subnet_size),
};

round.cycles_account_manager.refund_unused_execution_cycles(
&mut self.canister.system_state,
instructions_left,
original.message_instruction_limit,
prepayment_for_response_execution,
original.callback.prepayment_for_response_execution,
round.counters.execution_refund_error,
original.subnet_size,
round.log,
Expand Down
19 changes: 0 additions & 19 deletions rs/execution_environment/src/scheduler.rs
Expand Up @@ -2054,8 +2054,6 @@ fn observe_replicated_state_metrics(
let mut canisters_not_in_routing_table = 0;
let mut canisters_with_old_open_call_contexts = 0;
let mut old_call_contexts_count = 0;
let mut callbacks_without_originator = 0;
let mut callbacks_without_prepayment = 0;
let mut num_stop_canister_calls_without_call_id = 0;

let canister_id_ranges = state.routing_table().ranges(own_subnet_id);
Expand Down Expand Up @@ -2129,17 +2127,6 @@ fn observe_replicated_state_metrics(
old_call_contexts_count += old_call_contexts.len();
canisters_with_old_open_call_contexts += 1;
}

for callback in manager.callbacks().values() {
if callback.originator().is_none() || callback.respondent().is_none() {
callbacks_without_originator += 1;
}
if callback.prepayment_for_response_execution().is_none()
|| callback.prepayment_for_response_transmission().is_none()
{
callbacks_without_prepayment += 1;
}
}
}
});
metrics
Expand All @@ -2150,12 +2137,6 @@ fn observe_replicated_state_metrics(
.canisters_with_old_open_call_contexts
.with_label_values(&[OLD_CALL_CONTEXT_LABEL_ONE_DAY])
.set(canisters_with_old_open_call_contexts as i64);
metrics
.callbacks_without_originator
.set(callbacks_without_originator as i64);
metrics
.callbacks_without_prepayment
.set(callbacks_without_prepayment as i64);
let streams_response_bytes = state
.metadata
.streams()
Expand Down
10 changes: 0 additions & 10 deletions rs/execution_environment/src/scheduler/scheduler_metrics.rs
Expand Up @@ -91,8 +91,6 @@ pub(super) struct SchedulerMetrics {
pub(super) canister_install_code_debits: Histogram,
pub(super) old_open_call_contexts: IntGaugeVec,
pub(super) canisters_with_old_open_call_contexts: IntGaugeVec,
pub(super) callbacks_without_originator: IntGauge,
pub(super) callbacks_without_prepayment: IntGauge,
pub(super) canister_invariants: IntCounter,
pub(super) scheduler_compute_allocation_invariant_broken: IntCounter,
pub(super) scheduler_cores_invariant_broken: IntCounter,
Expand Down Expand Up @@ -594,14 +592,6 @@ impl SchedulerMetrics {
"Number of canisters with call contexts that have been open for more than the given age.",
&["age"]
),
callbacks_without_originator: metrics_registry.int_gauge(
"scheduler_callbacks_without_originator",
"Number of callbacks (likely from before February 2022) without originator or respondent recorded."
),
callbacks_without_prepayment: metrics_registry.int_gauge(
"scheduler_callbacks_without_prepayment",
"Number of callbacks (likely from before February 2022) with no response prepayment amounts recorded."
),
canister_invariants: metrics_registry.error_counter(CANISTER_INVARIANT_BROKEN),
scheduler_compute_allocation_invariant_broken: metrics_registry.error_counter(SCHEDULER_COMPUTE_ALLOCATION_INVARIANT_BROKEN),
scheduler_cores_invariant_broken: metrics_registry.error_counter(SCHEDULER_CORES_INVARIANT_BROKEN),
Expand Down
Expand Up @@ -400,28 +400,19 @@ impl CallContextManager {
/// or if the response is not valid.
pub(crate) fn validate_response(&self, response: &Response) -> Result<(), StateError> {
match self.callback(response.originator_reply_callback) {
Some(callback) => {
// (EXC-877) Once this is deployed in production,
// it's safe to make `respondent` and `originator` non-optional.
// Currently optional to ensure backwards compatibility.
match (callback.respondent(), callback.originator()) {
(Some(respondent), Some(originator))
if response.respondent != respondent
|| response.originator != originator =>
{
Err(StateError::NonMatchingResponse {
err_str: format!(
"invalid details, expected => [originator => {}, respondent => {}], but got response with",
originator, respondent,
),
originator: response.originator,
callback_id: response.originator_reply_callback,
respondent: response.respondent,
})
}
_ => Ok(()),
}
Some(callback) if response.respondent != callback.respondent
|| response.originator != callback.originator => {
Err(StateError::NonMatchingResponse {
err_str: format!(
"invalid details, expected => [originator => {}, respondent => {}], but got response with",
callback.originator, callback.respondent,
),
originator: response.originator,
callback_id: response.originator_reply_callback,
respondent: response.respondent,
})
}
Some(_) => Ok(()),
None => {
// Received an unknown callback ID.
Err(StateError::NonMatchingResponse {
Expand Down
@@ -1,10 +1,7 @@
use super::*;
use ic_test_utilities::types::ids::canister_test_id;
use ic_test_utilities_time::mock_time;
use ic_types::{
messages::RequestMetadata,
methods::{WasmClosure, UNKNOWN_CANISTER_ID},
};
use ic_types::{messages::RequestMetadata, methods::WasmClosure};

#[test]
fn call_context_origin() {
Expand Down Expand Up @@ -78,8 +75,8 @@ fn call_context_handling() {
// First call (CallContext 1) makes two outgoing calls
let callback_id1 = call_context_manager.register_callback(Callback::new(
call_context_id1,
UNKNOWN_CANISTER_ID,
UNKNOWN_CANISTER_ID,
canister_test_id(1),
canister_test_id(2),
Cycles::zero(),
Cycles::new(42),
Cycles::new(84),
Expand All @@ -89,8 +86,8 @@ fn call_context_handling() {
));
let callback_id2 = call_context_manager.register_callback(Callback::new(
call_context_id1,
UNKNOWN_CANISTER_ID,
UNKNOWN_CANISTER_ID,
canister_test_id(1),
canister_test_id(2),
Cycles::zero(),
Cycles::new(43),
Cycles::new(85),
Expand All @@ -105,8 +102,8 @@ fn call_context_handling() {
// Second one (CallContext 2) has one outgoing call
let callback_id3 = call_context_manager.register_callback(Callback::new(
call_context_id2,
UNKNOWN_CANISTER_ID,
UNKNOWN_CANISTER_ID,
canister_test_id(1),
canister_test_id(2),
Cycles::zero(),
Cycles::new(44),
Cycles::new(86),
Expand Down Expand Up @@ -309,8 +306,8 @@ fn test_call_context_instructions_executed_is_updated() {
// Register a callback, so the call context is not deleted in `on_canister_result()` later.
let _callback_id = call_context_manager.register_callback(Callback::new(
call_context_id,
UNKNOWN_CANISTER_ID,
UNKNOWN_CANISTER_ID,
canister_test_id(1),
canister_test_id(2),
Cycles::zero(),
Cycles::new(42),
Cycles::new(84),
Expand Down
25 changes: 0 additions & 25 deletions rs/replicated_state/tests/canister_state.rs
Expand Up @@ -9,7 +9,6 @@ use ic_test_utilities::{
use ic_test_utilities_time::mock_time;
use ic_types::{
messages::{CallbackId, Request, RequestOrResponse},
methods::UNKNOWN_CANISTER_ID,
xnet::QueueId,
};
use std::sync::Arc;
Expand Down Expand Up @@ -255,27 +254,3 @@ fn validate_responses_against_callback_details() {
.push_input(input_response_from(canister_b_id, callback_id_1))
.unwrap();
}

#[test]
fn validate_responses_against_callback_without_originator_or_respondent() {
let mut fixture = CanisterFixture::running();

// A callback with no `originator` or `respondent` recorded.
let callback_id = CallbackId::from(CALLBACK_ID_RAW);
register_callback(
&mut fixture.canister_state,
UNKNOWN_CANISTER_ID,
UNKNOWN_CANISTER_ID,
callback_id,
);
fixture.with_input_reservation();

// Any response addressed to this canister is valid.
let response = ResponseBuilder::new()
.originator(CANISTER_ID)
.respondent(OTHER_CANISTER_ID)
.originator_reply_callback(callback_id)
.build()
.into();
fixture.push_input(response).unwrap();
}
67 changes: 7 additions & 60 deletions rs/types/types/src/methods.rs
Expand Up @@ -232,13 +232,9 @@ pub const UNKNOWN_CANISTER_ID: CanisterId =
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Callback {
pub call_context_id: CallContextId,
/// The request's sender ID.
///
/// `UNKNOWN_CANISTER_ID` if the `Callback` was created before February 2022.
/// The request sender's ID.
pub originator: CanisterId,
/// The ID of the principal that the request was addressed to.
///
/// `UNKNOWN_CANISTER_ID` if the `Callback` was created before February 2022.
pub respondent: CanisterId,
/// The number of cycles that were sent in the original request.
pub cycles_sent: Cycles,
Expand Down Expand Up @@ -283,68 +279,19 @@ impl Callback {
on_cleanup,
}
}

pub fn originator(&self) -> Option<CanisterId> {
if self.originator == UNKNOWN_CANISTER_ID {
None
} else {
Some(self.originator)
}
}

pub fn respondent(&self) -> Option<CanisterId> {
if self.respondent == UNKNOWN_CANISTER_ID {
None
} else {
Some(self.respondent)
}
}

pub fn prepayment_for_response_execution(&self) -> Option<Cycles> {
if self.prepayment_for_response_execution.is_zero() {
None
} else {
Some(self.prepayment_for_response_execution)
}
}

pub fn prepayment_for_response_transmission(&self) -> Option<Cycles> {
if self.prepayment_for_response_transmission.is_zero() {
None
} else {
Some(self.prepayment_for_response_transmission)
}
}
}

impl From<&Callback> for pb::Callback {
fn from(item: &Callback) -> Self {
Self {
call_context_id: item.call_context_id.get(),
originator: if item.originator == UNKNOWN_CANISTER_ID {
None
} else {
Some(pb_types::CanisterId::from(item.originator))
},
respondent: if item.respondent == UNKNOWN_CANISTER_ID {
None
} else {
Some(pb_types::CanisterId::from(item.respondent))
},
originator: Some(pb_types::CanisterId::from(item.originator)),
respondent: Some(pb_types::CanisterId::from(item.respondent)),
cycles_sent: Some(item.cycles_sent.into()),
prepayment_for_response_execution: if item.prepayment_for_response_execution.is_zero() {
None
} else {
Some(item.prepayment_for_response_execution.into())
},
prepayment_for_response_transmission: if item
.prepayment_for_response_transmission
.is_zero()
{
None
} else {
Some(item.prepayment_for_response_transmission.into())
},
prepayment_for_response_execution: Some(item.prepayment_for_response_execution.into()),
prepayment_for_response_transmission: Some(
item.prepayment_for_response_transmission.into(),
),
on_reply: Some(pb::WasmClosure {
func_idx: item.on_reply.func_idx,
env: item.on_reply.env,
Expand Down

0 comments on commit e7c2f86

Please sign in to comment.