Skip to content

Commit 05ffe45

Browse files
feat(MR): [MR-662] Track cycles lost when dropping messages (#4331)
When:

* a best-effort message is timed out or shed without a reject response refunding the cycles; or
* a best-effort message is deduplicated; or
* a message of any class is dropped because it is obviously misrouted (existing error cases where we already bump a critical error counter);

add the cycles that had been attached to the message to the `DroppedMessages` counter of the `consumed_cycles_by_use_case` subnet metric.
1 parent 15cc473 commit 05ffe45

File tree

9 files changed

+528
-143
lines changed

9 files changed

+528
-143
lines changed

rs/messaging/src/routing/stream_handler.rs

Lines changed: 99 additions & 47 deletions
Large diffs are not rendered by default.

rs/messaging/src/routing/stream_handler/tests.rs

Lines changed: 104 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1406,7 +1406,7 @@ fn garbage_collect_local_state_with_legal_reject_signal_for_response_success() {
14061406
);
14071407
}
14081408

1409-
/// Tests that garbage collecting with a legal reject signal does raise a critical error.
1409+
/// Tests that garbage collecting with an illegal reject signal does raise a critical error.
14101410
#[test]
14111411
fn garbage_collect_local_state_with_illegal_reject_signal_for_response_success() {
14121412
garbage_collect_local_state_with_reject_signals_for_response_success_impl(
@@ -1504,6 +1504,16 @@ fn garbage_collect_local_state_with_reject_signals_for_request_from_absent_canis
15041504
..StreamConfig::default()
15051505
});
15061506
expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]);
1507+
// Cycles attached to the request / reject response are lost.
1508+
expected_state
1509+
.metadata
1510+
.subnet_metrics
1511+
.observe_consumed_cycles_with_use_case(
1512+
DroppedMessages,
1513+
message_in_stream(state.get_stream(&REMOTE_SUBNET), 21)
1514+
.cycles()
1515+
.into(),
1516+
);
15071517

15081518
// Act and compare to expected.
15091519
let mut available_guaranteed_response_memory =
@@ -1537,7 +1547,7 @@ fn garbage_collect_local_state_with_reject_signals_for_request_from_absent_canis
15371547

15381548
/// Tests that an incoming reject signal for a request to and from `LOCAL_CANISTER` in the stream
15391549
/// to `REMOTE_SUBNET` triggers locally generating a reject response that is successfully inducted
1540-
/// but the misrouted request (it should be in the loopback stream) raises a critical error.
1550+
/// but the misrouted request (it should have been in the loopback stream) raises a critical error.
15411551
#[test]
15421552
fn garbage_collect_local_state_with_reject_signals_for_misrouted_request() {
15431553
with_test_setup(
@@ -1674,7 +1684,7 @@ fn garbage_collect_local_state_with_reject_signals_for_request_from_migrating_ca
16741684
/// Calls `induct_stream_slices()` with one stream slice coming from `CANISTER_MIGRATION_SUBNET`
16751685
/// as input containing 3 messages:
16761686
/// - a reject response for a request sent to `REMOTE_CANISTER` from `LOCAL_CANISTER`,
1677-
/// - a data response with the same recipients.
1687+
/// - a reply with the same recipients.
16781688
/// - and a request with the same recipients.
16791689
///
16801690
/// `LOCAL_CANISTER` is marked as having migrated from `CANISTER_MIGRATION_SUBNET` to
@@ -1696,9 +1706,9 @@ fn induct_stream_slices_reject_response_from_old_host_subnet_is_accepted() {
16961706
messages: vec![
16971707
// ...a reject response for a request sent to `REMOTE_CANISTER` @0...
16981708
RejectResponse(*REMOTE_CANISTER, *LOCAL_CANISTER, RejectReason::QueueFull),
1699-
// ...a data response @1...
1709+
// ...a reply @1...
17001710
Response(*REMOTE_CANISTER, *LOCAL_CANISTER),
1701-
// ...and a request from @2.
1711+
// ...and a request @2.
17021712
Request(*REMOTE_CANISTER, *LOCAL_CANISTER),
17031713
],
17041714
..StreamSliceConfig::default()
@@ -1726,6 +1736,14 @@ fn induct_stream_slices_reject_response_from_old_host_subnet_is_accepted() {
17261736
});
17271737
expected_state.with_streams(btreemap![CANISTER_MIGRATION_SUBNET => expected_stream]);
17281738

1739+
// Cycles attached to the dropped reply and request are lost.
1740+
let cycles_lost = messages_in_slice(slices.get(&CANISTER_MIGRATION_SUBNET), 1..=2)
1741+
.fold(Cycles::zero(), |acc, msg| acc + msg.cycles());
1742+
expected_state
1743+
.metadata
1744+
.subnet_metrics
1745+
.observe_consumed_cycles_with_use_case(DroppedMessages, cycles_lost.into());
1746+
17291747
let mut available_guaranteed_response_memory =
17301748
stream_handler.available_guaranteed_response_memory(&state);
17311749
let inducted_state = stream_handler.induct_stream_slices(
@@ -1976,8 +1994,11 @@ fn check_stream_handler_generated_reject_signal_queue_full() {
19761994
let mut callback_id = 2;
19771995
while state
19781996
.push_input(
1979-
Request(*LOCAL_CANISTER, *LOCAL_CANISTER)
1980-
.build_with(CallbackId::new(callback_id), 0),
1997+
Request(*LOCAL_CANISTER, *LOCAL_CANISTER).build_with(
1998+
CallbackId::new(callback_id),
1999+
0,
2000+
Cycles::new(1),
2001+
),
19812002
&mut (i64::MAX / 2),
19822003
)
19832004
.is_ok()
@@ -2036,6 +2057,11 @@ fn duplicate_best_effort_response_is_dropped() {
20362057
..StreamConfig::default()
20372058
});
20382059
expected_state.with_streams(btreemap![LOCAL_SUBNET => loopback_stream]);
2060+
// Cycles of the duplicate response are lost.
2061+
expected_state
2062+
.metadata
2063+
.subnet_metrics
2064+
.observe_consumed_cycles_with_use_case(DroppedMessages, response.cycles().into());
20392065

20402066
// Push the clone of the best effort response onto the loopback stream.
20412067
state.modify_streams(|streams| streams.get_mut(&LOCAL_SUBNET).unwrap().push(response));
@@ -2071,6 +2097,7 @@ fn failing_to_induct_best_effort_response_does_not_raise_a_critical_error_impl(
20712097
}],
20722098
|stream_handler, mut state, metrics| {
20732099
prepare_state(&mut state);
2100+
let response = message_in_stream(state.get_stream(&LOCAL_SUBNET), 21).clone();
20742101

20752102
// Expecting an unchanged state...
20762103
let mut expected_state = state.clone();
@@ -2080,7 +2107,12 @@ fn failing_to_induct_best_effort_response_does_not_raise_a_critical_error_impl(
20802107
signals_end: 22,
20812108
..StreamConfig::default()
20822109
});
2083-
expected_state.with_streams(btreemap![LOCAL_SUBNET => loopback_stream]);
2110+
expected_state.with_streams(btreemap![LOCAL_SUBNET => loopback_stream.clone()]);
2111+
// Cycles attached to the dropped response are lost.
2112+
expected_state
2113+
.metadata
2114+
.subnet_metrics
2115+
.observe_consumed_cycles_with_use_case(DroppedMessages, response.cycles().into());
20842116

20852117
let inducted_state = stream_handler.induct_loopback_stream(state, &mut (i64::MAX / 2));
20862118
assert_eq!(expected_state, inducted_state);
@@ -2215,8 +2247,11 @@ fn legacy_check_stream_handler_generated_reject_response_queue_full() {
22152247
let mut callback_id = 2;
22162248
while state
22172249
.push_input(
2218-
Request(*LOCAL_CANISTER, *LOCAL_CANISTER)
2219-
.build_with(CallbackId::new(callback_id), 0),
2250+
Request(*LOCAL_CANISTER, *LOCAL_CANISTER).build_with(
2251+
CallbackId::new(callback_id),
2252+
0,
2253+
Cycles::new(1),
2254+
),
22202255
&mut (i64::MAX / 2),
22212256
)
22222257
.is_ok()
@@ -2275,7 +2310,7 @@ fn induct_stream_slices_partial_success() {
22752310
btreemap![REMOTE_SUBNET => StreamSliceConfig {
22762311
messages_begin: 43,
22772312
messages: vec![
2278-
// ...two incoming request @43 and @44...
2313+
// ...two incoming requests @43 and @44...
22792314
Request(*REMOTE_CANISTER, *LOCAL_CANISTER),
22802315
Request(*REMOTE_CANISTER, *LOCAL_CANISTER),
22812316
// ...an incoming response @45...
@@ -2325,6 +2360,16 @@ fn induct_stream_slices_partial_success() {
23252360
});
23262361
expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]);
23272362

2363+
// Cycles attached to the dropped requests from `LOCAL_CANISTER` and
2364+
// `UNKNOWN_CANISTER`; as well as those attached to the non-existent response to
2365+
// `OTHER_LOCAL_CANISTER` are lost.
2366+
let cycles_lost = messages_in_slice(slices.get(&REMOTE_SUBNET), 47..=49)
2367+
.fold(Cycles::zero(), |acc, msg| acc + msg.cycles());
2368+
expected_state
2369+
.metadata
2370+
.subnet_metrics
2371+
.observe_consumed_cycles_with_use_case(DroppedMessages, cycles_lost.into());
2372+
23282373
let initial_available_guaranteed_response_memory =
23292374
stream_handler.available_guaranteed_response_memory(&state);
23302375
let mut available_guaranteed_response_memory =
@@ -2451,6 +2496,16 @@ fn legacy_induct_stream_slices_partial_success() {
24512496
});
24522497
expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]);
24532498

2499+
// Cycles attached to the dropped requests from `LOCAL_CANISTER` and
2500+
// `UNKNOWN_CANISTER`; as well as those attached to the non-existent response to
2501+
// `OTHER_LOCAL_CANISTER` are lost.
2502+
let cycles_lost = messages_in_slice(slices.get(&REMOTE_SUBNET), 47..=49)
2503+
.fold(Cycles::zero(), |acc, msg| acc + msg.cycles());
2504+
expected_state
2505+
.metadata
2506+
.subnet_metrics
2507+
.observe_consumed_cycles_with_use_case(DroppedMessages, cycles_lost.into());
2508+
24542509
let initial_available_guaranteed_response_memory =
24552510
stream_handler.available_guaranteed_response_memory(&state);
24562511
let mut available_guaranteed_response_memory =
@@ -2564,6 +2619,14 @@ fn induct_stream_slices_receiver_subnet_mismatch() {
25642619
});
25652620
expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]);
25662621

2622+
// Cycles attached to all messages in the slice are lost.
2623+
let cycles_lost = messages_in_slice(slices.get(&REMOTE_SUBNET), 43..=46)
2624+
.fold(Cycles::zero(), |acc, msg| acc + msg.cycles());
2625+
expected_state
2626+
.metadata
2627+
.subnet_metrics
2628+
.observe_consumed_cycles_with_use_case(DroppedMessages, cycles_lost.into());
2629+
25672630
let mut available_guaranteed_response_memory =
25682631
stream_handler.available_guaranteed_response_memory(&state);
25692632
let inducted_state = stream_handler.induct_stream_slices(
@@ -3367,6 +3430,16 @@ fn process_stream_slices_with_reject_signals_partial_success() {
33673430
REMOTE_SUBNET => expected_outgoing_stream,
33683431
CANISTER_MIGRATION_SUBNET => rerouted_stream,
33693432
]);
3433+
// Cycles attached to the dropped request from `UNKNOWN_CANISTER` are lost.
3434+
expected_state
3435+
.metadata
3436+
.subnet_metrics
3437+
.observe_consumed_cycles_with_use_case(
3438+
DroppedMessages,
3439+
message_in_slice(slices.get(&REMOTE_SUBNET), 154)
3440+
.cycles()
3441+
.into(),
3442+
);
33703443

33713444
// Act.
33723445
let inducted_state = stream_handler.process_stream_slices(state, slices);
@@ -3978,6 +4051,7 @@ fn with_test_setup_and_config(
39784051
// For all other messages a `other_callback_id` is used, i.e. something that
39794052
// simulates callback IDs generated in a different canister.
39804053
let mut other_callback_id = 0_u64;
4054+
let mut cycles = Cycles::new(1);
39814055
let mut messages_from_builders = |builders: Vec<MessageBuilder>| -> Vec<RequestOrResponse> {
39824056
builders
39834057
.into_iter()
@@ -3993,6 +4067,9 @@ fn with_test_setup_and_config(
39934067
(respondent, originator, NO_DEADLINE)
39944068
}
39954069
};
4070+
// Attach a unique (bit mask-like) number of cycles to each message (assuming
4071+
// that there are fewer than 127 messages).
4072+
cycles = cycles + cycles;
39964073

39974074
// Register a callback and make an input queue reservation if `msg_config`
39984075
// corresponds to `LOCAL_CANISTER`; else use a dummy callback id.
@@ -4022,11 +4099,15 @@ fn with_test_setup_and_config(
40224099
// Empty output queues.
40234100
canister_state.output_into_iter().count();
40244101

4025-
builder.build_with(callback_id, payload_size_bytes)
4102+
builder.build_with(callback_id, payload_size_bytes, cycles)
40264103
} else {
40274104
// Message will not be inducted, use a replacement
40284105
other_callback_id += 1;
4029-
builder.build_with(CallbackId::new(other_callback_id), payload_size_bytes)
4106+
builder.build_with(
4107+
CallbackId::new(other_callback_id),
4108+
payload_size_bytes,
4109+
cycles,
4110+
)
40304111
}
40314112
})
40324113
.collect()
@@ -4334,27 +4415,35 @@ enum MessageBuilder {
43344415
}
43354416

43364417
impl MessageBuilder {
4337-
fn build_with(self, callback_id: CallbackId, payload_size_bytes: usize) -> RequestOrResponse {
4418+
fn build_with(
4419+
self,
4420+
callback_id: CallbackId,
4421+
payload_size_bytes: usize,
4422+
cycles: Cycles,
4423+
) -> RequestOrResponse {
43384424
match self {
43394425
Self::Request(sender, receiver) => RequestBuilder::new()
43404426
.sender(sender)
43414427
.receiver(receiver)
43424428
.sender_reply_callback(callback_id)
43434429
.method_payload(vec![0_u8; payload_size_bytes])
4430+
.payment(cycles)
43444431
.build()
43454432
.into(),
43464433
Self::Response(respondent, originator) => ResponseBuilder::new()
43474434
.respondent(respondent)
43484435
.originator(originator)
43494436
.originator_reply_callback(callback_id)
43504437
.response_payload(Payload::Data(vec![0_u8; payload_size_bytes]))
4438+
.refund(cycles)
43514439
.build()
43524440
.into(),
43534441
Self::BestEffortResponse(respondent, originator, deadline) => ResponseBuilder::new()
43544442
.respondent(respondent)
43554443
.originator(originator)
43564444
.originator_reply_callback(callback_id)
43574445
.response_payload(Payload::Data(vec![0_u8; payload_size_bytes]))
4446+
.refund(cycles)
43584447
.deadline(deadline)
43594448
.build()
43604449
.into(),
@@ -4364,6 +4453,7 @@ impl MessageBuilder {
43644453
.sender(originator)
43654454
.receiver(respondent)
43664455
.sender_reply_callback(callback_id)
4456+
.payment(cycles)
43674457
.build(),
43684458
),
43694459
}

rs/messaging/src/state_machine.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use ic_interfaces::time_source::system_time_now;
1111
use ic_logger::{error, fatal, ReplicaLogger};
1212
use ic_query_stats::deliver_query_stats;
1313
use ic_registry_subnet_features::SubnetFeatures;
14+
use ic_replicated_state::canister_state::system_state::CyclesUseCase::DroppedMessages;
1415
use ic_replicated_state::{NetworkTopology, ReplicatedState};
1516
use ic_types::batch::Batch;
1617
use ic_types::{ExecutionRound, NumBytes};
@@ -122,10 +123,14 @@ impl StateMachine for StateMachineImpl {
122123
}
123124

124125
// Time out expired messages.
125-
let timed_out_messages = state.time_out_messages();
126+
let (timed_out_messages, lost_cycles) = state.time_out_messages();
126127
self.metrics
127128
.timed_out_messages_total
128129
.inc_by(timed_out_messages as u64);
130+
state
131+
.metadata
132+
.subnet_metrics
133+
.observe_consumed_cycles_with_use_case(DroppedMessages, lost_cycles.into());
129134
self.observe_phase_duration(PHASE_TIME_OUT_MESSAGES, &since);
130135

131136
// Time out expired callbacks.
@@ -209,12 +214,16 @@ impl StateMachine for StateMachineImpl {
209214

210215
let since = Instant::now();
211216
// Shed enough messages to stay below the best-effort message memory limit.
212-
let (shed_messages, shed_message_bytes) = state_after_stream_builder
217+
let (shed_messages, shed_message_bytes, lost_cycles) = state_after_stream_builder
213218
.enforce_best_effort_message_limit(self.best_effort_message_memory_capacity);
214219
self.metrics.shed_messages_total.inc_by(shed_messages);
215220
self.metrics
216221
.shed_message_bytes_total
217222
.inc_by(shed_message_bytes.get());
223+
state_after_stream_builder
224+
.metadata
225+
.subnet_metrics
226+
.observe_consumed_cycles_with_use_case(DroppedMessages, lost_cycles.into());
218227
self.observe_phase_duration(PHASE_SHED_MESSAGES, &since);
219228

220229
state_after_stream_builder

0 commit comments

Comments
 (0)