@@ -231,15 +231,27 @@ impl StreamHandler for StreamHandlerImpl {
231
231
} ) ;
232
232
}
233
233
234
+ // A lower bound running estimate of the subnet's avaliable message memory. It
235
+ // accurately reflects all memory allocated by inducted and rejected messages
236
+ // and released by inducting responses; but not the changes to
237
+ //`Streams::responses_size_bytes` (the size of responses already routed to
238
+ // streams), as some of its entries may refer to deleted or migrated canisters.
239
+ let mut subnet_available_memory = self . subnet_available_memory ( & state) ;
240
+
234
241
// Induct our own loopback stream first, if one exists and has any messages.
235
- state = self . induct_loopback_stream ( state) ;
242
+ state = self . induct_loopback_stream ( state, & mut subnet_available_memory) ;
243
+ debug_assert ! ( self . subnet_available_memory( & state) >= subnet_available_memory) ;
236
244
237
245
// Garbage collect our stream state based on the contents of the slices.
238
246
state = self . garbage_collect_local_state ( state, & stream_slices) ;
247
+ debug_assert ! ( self . subnet_available_memory( & state) >= subnet_available_memory) ;
239
248
self . observe_backlog_durations ( & stream_slices) ;
240
249
241
250
// Induct the messages in `stream_slices`, updating signals as appropriate.
242
- self . induct_stream_slices ( state, stream_slices)
251
+ let state = self . induct_stream_slices ( state, stream_slices, & mut subnet_available_memory) ;
252
+ debug_assert ! ( self . subnet_available_memory( & state) >= subnet_available_memory) ;
253
+
254
+ state
243
255
}
244
256
}
245
257
@@ -250,6 +262,9 @@ impl StreamHandlerImpl {
250
262
/// responses or rerouted responses and no signals. All initial messages and
251
263
/// corresponding signals will have been garbage collected.
252
264
///
265
+ /// Updates `subnet_available_memory` to reflect change in memory usage, in such
266
+ /// a way that it remains a lower-bound estimate of the actual available memory.
267
+ ///
253
268
/// The sequence of steps is as follows:
254
269
///
255
270
/// 1. All messages in the loopback stream ("initial messages"; cloned,
@@ -262,7 +277,11 @@ impl StreamHandlerImpl {
262
277
/// loopback stream.
263
278
/// 4. Any rejected `Responses` collected at step (2) are rerouted into the
264
279
/// appropriate streams as per the routing table.
265
- fn induct_loopback_stream ( & self , mut state : ReplicatedState ) -> ReplicatedState {
280
+ fn induct_loopback_stream (
281
+ & self ,
282
+ mut state : ReplicatedState ,
283
+ subnet_available_memory : & mut i64 ,
284
+ ) -> ReplicatedState {
266
285
let loopback_stream = state. get_stream ( & self . subnet_id ) ;
267
286
268
287
// All done if the loopback stream does not exist or is empty.
@@ -285,7 +304,7 @@ impl StreamHandlerImpl {
285
304
// 1. Induct all messages. This will add signals to the loopback stream.
286
305
let mut stream_slices = BTreeMap :: new ( ) ;
287
306
stream_slices. insert ( self . subnet_id , loopback_stream_slice) ;
288
- state = self . induct_stream_slices ( state, stream_slices) ;
307
+ state = self . induct_stream_slices ( state, stream_slices, subnet_available_memory ) ;
289
308
290
309
let mut streams = state. take_streams ( ) ;
291
310
// We know for sure that the loopback stream exists, so it is safe to unwrap.
@@ -515,15 +534,15 @@ impl StreamHandlerImpl {
515
534
///
516
535
/// See [`Self::induct_message`] for the possible outcomes of inducting a
517
536
/// message.
537
+ ///
538
+ /// Updates `subnet_available_memory` to reflect change in memory usage, in such
539
+ /// a way that it remains a lower-bound estimate of the actual available memory.
518
540
fn induct_stream_slices (
519
541
& self ,
520
542
mut state : ReplicatedState ,
521
543
stream_slices : BTreeMap < SubnetId , StreamSlice > ,
544
+ subnet_available_memory : & mut i64 ,
522
545
) -> ReplicatedState {
523
- let memory_taken = state. memory_taken ( ) ;
524
- let message_memory_taken = memory_taken. messages ( ) ;
525
- let mut subnet_available_memory =
526
- self . subnet_message_memory_capacity . get ( ) as i64 - message_memory_taken. get ( ) as i64 ;
527
546
let mut streams = state. take_streams ( ) ;
528
547
529
548
for ( remote_subnet_id, mut stream_slice) in stream_slices {
@@ -538,7 +557,7 @@ impl StreamHandlerImpl {
538
557
stream_index,
539
558
& mut state,
540
559
& mut stream,
541
- & mut subnet_available_memory,
560
+ subnet_available_memory,
542
561
) ;
543
562
}
544
563
}
@@ -596,43 +615,45 @@ impl StreamHandlerImpl {
596
615
let payload_size = msg. payload_size_bytes ( ) . get ( ) ;
597
616
match receiver_host_subnet {
598
617
// Matching receiver subnet, try inducting message.
599
- Some ( host_subnet) if host_subnet == self . subnet_id => match state
600
- . push_input ( msg, subnet_available_memory)
601
- {
602
- // Message successfully inducted, all done.
603
- Ok ( ( ) ) => {
604
- self . observe_inducted_message_status ( msg_type, LABEL_VALUE_SUCCESS ) ;
605
- self . observe_inducted_payload_size ( payload_size) ;
606
- }
618
+ Some ( host_subnet) if host_subnet == self . subnet_id => {
619
+ match state. push_input ( msg, subnet_available_memory) {
620
+ // Message successfully inducted, all done.
621
+ Ok ( ( ) ) => {
622
+ self . observe_inducted_message_status ( msg_type, LABEL_VALUE_SUCCESS ) ;
623
+ self . observe_inducted_payload_size ( payload_size) ;
624
+ }
607
625
608
- // Message not inducted.
609
- Err ( ( err, msg) ) => {
610
- self . observe_inducted_message_status ( msg_type, err. to_label_value ( ) ) ;
626
+ // Message not inducted.
627
+ Err ( ( err, msg) ) => {
628
+ self . observe_inducted_message_status ( msg_type, err. to_label_value ( ) ) ;
611
629
612
- match msg {
613
- RequestOrResponse :: Request ( _) => {
614
- debug ! (
630
+ match msg {
631
+ RequestOrResponse :: Request ( _) => {
632
+ debug ! (
615
633
self . log,
616
634
"Induction failed with error '{}', generating reject Response for {:?}" ,
617
635
& err,
618
636
& msg
619
637
) ;
620
- let code = reject_code_for_state_error ( & err) ;
621
- stream. push ( generate_reject_response ( msg, code, err. to_string ( ) ) )
622
- }
623
- RequestOrResponse :: Response ( response) => {
624
- // Critical error, responses should always be inducted successfully.
625
- error ! (
626
- self . log,
627
- "{}: Inducting response failed: {:?}" ,
628
- CRITICAL_ERROR_INDUCT_RESPONSE_FAILED ,
629
- response
630
- ) ;
631
- self . metrics . critical_error_induct_response_failed . inc ( )
638
+ let code = reject_code_for_state_error ( & err) ;
639
+ * subnet_available_memory -= stream
640
+ . push ( generate_reject_response ( msg, code, err. to_string ( ) ) )
641
+ as i64 ;
642
+ }
643
+ RequestOrResponse :: Response ( response) => {
644
+ // Critical error, responses should always be inducted successfully.
645
+ error ! (
646
+ self . log,
647
+ "{}: Inducting response failed: {:?}" ,
648
+ CRITICAL_ERROR_INDUCT_RESPONSE_FAILED ,
649
+ response
650
+ ) ;
651
+ self . metrics . critical_error_induct_response_failed . inc ( )
652
+ }
632
653
}
633
654
}
634
655
}
635
- } ,
656
+ }
636
657
637
658
// Receiver canister is migrating to/from this subnet.
638
659
Some ( host_subnet) if self . should_reroute_message_to ( & msg, host_subnet, state) => {
@@ -646,11 +667,11 @@ impl StreamHandlerImpl {
646
667
host_subnet
647
668
) ;
648
669
debug ! ( self . log, "Canister {} is being migrated, generating reject response for {:?}" , msg. receiver( ) , msg) ;
649
- stream. push ( generate_reject_response (
670
+ * subnet_available_memory -= stream. push ( generate_reject_response (
650
671
msg,
651
672
RejectCode :: SysTransient ,
652
673
reject_message,
653
- ) ) ;
674
+ ) ) as i64 ;
654
675
}
655
676
656
677
RequestOrResponse :: Response ( _) => {
@@ -787,6 +808,14 @@ impl StreamHandlerImpl {
787
808
. unwrap_or ( false )
788
809
}
789
810
811
+ /// Computes the subnet's available message memory, as the difference
812
+ /// between the subnet's message memory capacity and its current usage.
813
+ fn subnet_available_memory ( & self , state : & ReplicatedState ) -> i64 {
814
+ let memory_taken = state. memory_taken ( ) ;
815
+ let message_memory_taken = memory_taken. messages ( ) ;
816
+ self . subnet_message_memory_capacity . get ( ) as i64 - message_memory_taken. get ( ) as i64
817
+ }
818
+
790
819
/// Observes "time in backlog" (since learning about their existence from
791
820
/// the stream header) for each of the given inducted messages.
792
821
fn observe_backlog_durations ( & self , stream_slices : & BTreeMap < SubnetId , StreamSlice > ) {
0 commit comments