Skip to content

Commit 396b461

Browse files
stiegercIDX GitHub Automationalin-at-dfinity
authored
feat: [MR-636] Add size limits as fields to the stream builder (#3885)
The driver behind this change is to make smaller streams possible in state machine tests and also unit or prop tests. But for state machine tests especially, having to fill up an entire stream with 10k filler requests just to test something at capacity takes a long time. Since this change makes the stream limit constants public, removing the duplicate [MAX_STREAM_MESSAGES](https://sourcegraph.com/github.com/dfinity/ic/-/blob/rs/xnet/payload_builder/src/lib.rs?L229) in the payload builder as a small additional change makes sense here (even though it's not strictly related). --------- Co-authored-by: IDX GitHub Automation <infra+github-automation@dfinity.org> Co-authored-by: Alin Sinpalean <58422065+alin-at-dfinity@users.noreply.github.com>
1 parent 5d0fcce commit 396b461

File tree

10 files changed

+146
-74
lines changed

10 files changed

+146
-74
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ go_deps.bzl @dfinity/idx
113113
/rs/config/ @dfinity/consensus
114114
/rs/config/src/embedders.rs @dfinity/execution
115115
/rs/config/src/execution_environment.rs @dfinity/execution
116+
/rs/config/src/message_routing.rs @dfinity/ic-message-routing-owners
116117
/rs/config/src/state_manager.rs @dfinity/ic-message-routing-owners
117118
/rs/config/src/subnet_config.rs @dfinity/execution
118119
/rs/consensus/ @dfinity/consensus

rs/config/src/message_routing.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
11
use serde::{Deserialize, Serialize};
22

3+
/// Desired byte size of an outgoing stream.
4+
///
5+
/// At most `MAX_STREAM_MESSAGES` are enqueued into a stream; but only until its
6+
/// `count_bytes()` is greater than or equal to `TARGET_STREAM_SIZE_BYTES`.
7+
pub const TARGET_STREAM_SIZE_BYTES: usize = 10 * 1024 * 1024;
8+
9+
/// Maximum number of messages in a stream.
10+
///
11+
/// At most `MAX_STREAM_MESSAGES` are enqueued into a stream; but only until its
12+
/// `count_bytes()` is greater than or equal to `TARGET_STREAM_SIZE_BYTES`.
13+
pub const MAX_STREAM_MESSAGES: usize = 10_000;
14+
315
#[derive(Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
416
#[serde(default)]
517
/// Message Routing replica config.

rs/messaging/src/message_routing.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::{
44
};
55
use ic_config::embedders::BestEffortResponsesFeature;
66
use ic_config::execution_environment::{BitcoinConfig, Config as HypervisorConfig};
7+
use ic_config::message_routing::{MAX_STREAM_MESSAGES, TARGET_STREAM_SIZE_BYTES};
78
use ic_cycles_account_manager::CyclesAccountManager;
89
use ic_interfaces::{crypto::ErrorReproducibility, execution_environment::ChainKeySettings};
910
use ic_interfaces::{
@@ -616,6 +617,7 @@ pub(crate) type NodePublicKeys = BTreeMap<NodeId, Vec<u8>>;
616617
pub(crate) type ApiBoundaryNodes = BTreeMap<NodeId, ApiBoundaryNodeEntry>;
617618

618619
impl BatchProcessorImpl {
620+
#[allow(clippy::too_many_arguments)]
619621
fn new(
620622
state_manager: Arc<dyn StateManager<State = ReplicatedState>>,
621623
certified_stream_store: Arc<dyn CertifiedStreamStore>,
@@ -624,6 +626,8 @@ impl BatchProcessorImpl {
624626
hypervisor_config: HypervisorConfig,
625627
cycles_account_manager: Arc<CyclesAccountManager>,
626628
subnet_id: SubnetId,
629+
max_stream_messages: usize,
630+
target_stream_size_bytes: usize,
627631
metrics: MessageRoutingMetrics,
628632
metrics_registry: &MetricsRegistry,
629633
log: ReplicaLogger,
@@ -657,6 +661,8 @@ impl BatchProcessorImpl {
657661
));
658662
let stream_builder = Box::new(routing::stream_builder::StreamBuilderImpl::new(
659663
subnet_id,
664+
max_stream_messages,
665+
target_stream_size_bytes,
660666
metrics_registry,
661667
&metrics,
662668
time_in_stream_metrics,
@@ -1491,6 +1497,11 @@ impl MessageRoutingImpl {
14911497
hypervisor_config,
14921498
cycles_account_manager,
14931499
subnet_id,
1500+
// Do NOT replace these constants. Stream limits must remain constant on mainnet,
1501+
// otherwise the payload builder might mistakenly identify subnets as dishonest.
1502+
// Changes must be carefully considered.
1503+
MAX_STREAM_MESSAGES,
1504+
TARGET_STREAM_SIZE_BYTES,
14941505
metrics.clone(),
14951506
metrics_registry,
14961507
log.clone(),
@@ -1512,6 +1523,8 @@ impl MessageRoutingImpl {
15121523
) -> Self {
15131524
let stream_builder = Box::new(routing::stream_builder::StreamBuilderImpl::new(
15141525
subnet_id,
1526+
MAX_STREAM_MESSAGES,
1527+
TARGET_STREAM_SIZE_BYTES,
15151528
metrics_registry,
15161529
&MessageRoutingMetrics::new(metrics_registry),
15171530
Arc::new(Mutex::new(LatencyMetrics::new_time_in_stream(
@@ -1597,8 +1610,7 @@ impl MessageRouting for MessageRoutingImpl {
15971610
}
15981611
}
15991612

1600-
/// An MessageRouting implementation that processes batches synchronously. Primarily used for
1601-
/// testing.
1613+
/// An MessageRouting implementation that processes batches synchronously. Used for state machine tests.
16021614
pub struct SyncMessageRouting {
16031615
batch_processor: Arc<Mutex<dyn BatchProcessor>>,
16041616
state_manager: Arc<dyn StateManager<State = ReplicatedState>>,
@@ -1616,6 +1628,8 @@ impl SyncMessageRouting {
16161628
hypervisor_config: HypervisorConfig,
16171629
cycles_account_manager: Arc<CyclesAccountManager>,
16181630
subnet_id: SubnetId,
1631+
max_stream_messages: usize,
1632+
target_stream_size_bytes: usize,
16191633
metrics_registry: &MetricsRegistry,
16201634
log: ReplicaLogger,
16211635
registry: Arc<dyn RegistryClient>,
@@ -1631,6 +1645,8 @@ impl SyncMessageRouting {
16311645
hypervisor_config,
16321646
cycles_account_manager,
16331647
subnet_id,
1648+
max_stream_messages,
1649+
target_stream_size_bytes,
16341650
metrics,
16351651
metrics_registry,
16361652
log.clone(),

rs/messaging/src/routing/stream_builder.rs

Lines changed: 22 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,6 @@ struct StreamBuilderMetrics {
5353
pub critical_error_induct_response_failed: IntCounter,
5454
}
5555

56-
/// Desired byte size of an outgoing stream.
57-
///
58-
/// At most `MAX_STREAM_MESSAGES` are enqueued into a stream; but only until its
59-
/// `count_bytes()` is greater than or equal to `TARGET_STREAM_SIZE_BYTES`.
60-
const TARGET_STREAM_SIZE_BYTES: usize = 10 * 1024 * 1024;
61-
62-
/// Maximum number of messages in a stream.
63-
///
64-
/// At most `MAX_STREAM_MESSAGES` are enqueued into a stream; but only until its
65-
/// `count_bytes()` is greater than or equal to `TARGET_STREAM_SIZE_BYTES`.
66-
const MAX_STREAM_MESSAGES: usize = 10_000;
67-
6856
const METRIC_STREAM_MESSAGES: &str = "mr_stream_messages";
6957
const METRIC_STREAM_BYTES: &str = "mr_stream_bytes";
7058
const METRIC_STREAM_BEGIN: &str = "mr_stream_begin";
@@ -173,8 +161,14 @@ pub(crate) trait StreamBuilder: Send {
173161
fn build_streams(&self, state: ReplicatedState) -> ReplicatedState;
174162
}
175163

164+
/// Routes messages from canister output queues into streams, up to the specified limits.
165+
///
166+
/// At most `max_stream_messages` are enqueued into a stream; but only until its
167+
/// `count_bytes()` is greater than or equal to `target_stream_size_bytes`.
176168
pub(crate) struct StreamBuilderImpl {
177169
subnet_id: SubnetId,
170+
max_stream_messages: usize,
171+
target_stream_size_bytes: usize,
178172
metrics: StreamBuilderMetrics,
179173
time_in_stream_metrics: Arc<Mutex<LatencyMetrics>>,
180174

@@ -187,6 +181,8 @@ pub(crate) struct StreamBuilderImpl {
187181
impl StreamBuilderImpl {
188182
pub(crate) fn new(
189183
subnet_id: SubnetId,
184+
max_stream_messages: usize,
185+
target_stream_size_bytes: usize,
190186
metrics_registry: &MetricsRegistry,
191187
message_routing_metrics: &MessageRoutingMetrics,
192188
time_in_stream_metrics: Arc<Mutex<LatencyMetrics>>,
@@ -195,6 +191,8 @@ impl StreamBuilderImpl {
195191
) -> Self {
196192
Self {
197193
subnet_id,
194+
max_stream_messages,
195+
target_stream_size_bytes,
198196
metrics: StreamBuilderMetrics::new(metrics_registry, message_routing_metrics),
199197
time_in_stream_metrics,
200198
best_effort_responses,
@@ -270,15 +268,8 @@ impl StreamBuilderImpl {
270268
.observe(msg.payload_size_bytes().get() as f64);
271269
}
272270

273-
/// Implementation of `StreamBuilder::build_streams()` that takes a
274-
/// `target_stream_size_bytes` argument to limit how many messages will be
275-
/// routed into each stream.
276-
fn build_streams_impl(
277-
&self,
278-
mut state: ReplicatedState,
279-
max_stream_messages: usize,
280-
target_stream_size_bytes: usize,
281-
) -> ReplicatedState {
271+
/// Implementation of `StreamBuilder::build_streams()`.
272+
fn build_streams_impl(&self, mut state: ReplicatedState) -> ReplicatedState {
282273
/// Pops the previously peeked message.
283274
///
284275
/// Panics:
@@ -294,23 +285,20 @@ impl StreamBuilderImpl {
294285
message
295286
}
296287

297-
/// Tests whether a stream is over the message count limit, byte limit or (if
298-
/// directed at a system subnet) over `2 * SYSTEM_SUBNET_STREAM_MSG_LIMIT`.
299-
fn is_at_limit(
300-
stream: &btree_map::Entry<SubnetId, Stream>,
301-
max_stream_messages: usize,
302-
target_stream_size_bytes: usize,
303-
is_local_message: bool,
304-
destination_subnet_type: SubnetType,
305-
) -> bool {
288+
// Tests whether a stream is over the message count limit, byte limit or (if
289+
// directed at a system subnet) over `2 * SYSTEM_SUBNET_STREAM_MSG_LIMIT`.
290+
let is_at_limit = |stream: &btree_map::Entry<SubnetId, Stream>,
291+
is_local_message: bool,
292+
destination_subnet_type: SubnetType|
293+
-> bool {
306294
let stream = match stream {
307295
btree_map::Entry::Occupied(occupied_entry) => occupied_entry.get(),
308296
btree_map::Entry::Vacant(_) => return false,
309297
};
310298
let stream_messages_len = stream.messages().len();
311299

312-
if stream_messages_len >= max_stream_messages
313-
|| stream.count_bytes() >= target_stream_size_bytes
300+
if stream_messages_len >= self.max_stream_messages
301+
|| stream.count_bytes() >= self.target_stream_size_bytes
314302
{
315303
// At limit if message count or byte size limits (enforced across all outgoing
316304
// streams) are hit.
@@ -323,7 +311,7 @@ impl StreamBuilderImpl {
323311
!is_local_message
324312
&& destination_subnet_type == SubnetType::System
325313
&& stream_messages_len >= 2 * SYSTEM_SUBNET_STREAM_MSG_LIMIT
326-
}
314+
};
327315

328316
let mut streams = state.take_streams();
329317
let routing_table = state.routing_table();
@@ -373,8 +361,6 @@ impl StreamBuilderImpl {
373361
let dst_stream_entry = streams.entry(dst_subnet_id);
374362
if is_at_limit(
375363
&dst_stream_entry,
376-
max_stream_messages,
377-
target_stream_size_bytes,
378364
self.subnet_id == dst_subnet_id,
379365
destination_subnet_type,
380366
) {
@@ -588,6 +574,6 @@ impl StreamBuilderImpl {
588574

589575
impl StreamBuilder for StreamBuilderImpl {
590576
fn build_streams(&self, state: ReplicatedState) -> ReplicatedState {
591-
self.build_streams_impl(state, MAX_STREAM_MESSAGES, TARGET_STREAM_SIZE_BYTES)
577+
self.build_streams_impl(state)
592578
}
593579
}

rs/messaging/src/routing/stream_builder/tests.rs

Lines changed: 46 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::message_routing::MessageRoutingMetrics;
22

33
use super::*;
44
use ic_base_types::NumSeconds;
5+
use ic_config::message_routing::{MAX_STREAM_MESSAGES, TARGET_STREAM_SIZE_BYTES};
56
use ic_error_types::RejectCode;
67
use ic_management_canister_types_private::Method;
78
use ic_registry_routing_table::{CanisterIdRange, RoutingTable};
@@ -336,9 +337,22 @@ fn build_streams_local_canisters() {
336337
}
337338

338339
#[test]
339-
fn build_streams_impl_at_limit_leaves_state_untouched() {
340+
fn build_streams_impl_at_message_limit_leaves_state_untouched() {
341+
build_streams_impl_at_limit_leaves_state_untouched_impl(0, usize::MAX);
342+
}
343+
344+
#[test]
345+
fn build_streams_impl_at_memory_limit_leaves_state_untouched() {
346+
build_streams_impl_at_limit_leaves_state_untouched_impl(usize::MAX, 0);
347+
}
348+
349+
fn build_streams_impl_at_limit_leaves_state_untouched_impl(
350+
max_stream_messages: usize,
351+
target_stream_size_bytes: usize,
352+
) {
340353
with_test_replica_logger(|log| {
341-
let (stream_builder, mut provided_state, metrics_registry) = new_fixture(&log);
354+
let (stream_builder, mut provided_state, metrics_registry) =
355+
new_fixture_with_limits(&log, max_stream_messages, target_stream_size_bytes);
342356
provided_state.metadata.network_topology.routing_table = Arc::new(RoutingTable::try_from(
343357
btreemap! {
344358
CanisterIdRange{ start: CanisterId::from(0), end: CanisterId::from(0xfff) } => REMOTE_SUBNET,
@@ -360,10 +374,7 @@ fn build_streams_impl_at_limit_leaves_state_untouched() {
360374
let expected_state = provided_state.clone();
361375

362376
// Act.
363-
let result_state = stream_builder.build_streams_impl(provided_state.clone(), usize::MAX, 0);
364-
assert_eq!(result_state, expected_state);
365-
366-
let result_state = stream_builder.build_streams_impl(provided_state, 0, usize::MAX);
377+
let result_state = stream_builder.build_streams_impl(provided_state.clone());
367378
assert_eq!(result_state, expected_state);
368379

369380
assert_eq!(
@@ -413,18 +424,26 @@ fn build_streams_impl_respects_limits(
413424
expected_messages: u64,
414425
) {
415426
with_test_replica_logger(|log| {
416-
let (stream_builder, mut provided_state, metrics_registry) = new_fixture(&log);
427+
let msgs = generate_messages_for_test(/* senders = */ 2, /* receivers = */ 2);
428+
let msg_count = msgs.len();
429+
// All messages returned by `generate_messages_for_test` are of the same size
430+
let msg_size = msgs.first().unwrap().count_bytes() as u64;
431+
432+
// Target stream size: stream struct plus `max_stream_messages_by_byte_size - 1`
433+
// messages plus 1 byte. Since this is a target / soft limit, it should ensure
434+
// that exactly `max_stream_messages_by_byte_size` messages (or
435+
// `max_stream_messages_by_byte_size * msg_size` bytes) are routed.
436+
let target_stream_size_bytes =
437+
size_of::<Stream>() + (max_stream_messages_by_byte_size - 1) * msg_size as usize + 1;
438+
439+
let (stream_builder, mut provided_state, metrics_registry) =
440+
new_fixture_with_limits(&log, max_stream_messages, target_stream_size_bytes);
417441
provided_state.metadata.network_topology.routing_table = Arc::new(RoutingTable::try_from(
418442
btreemap! {
419443
CanisterIdRange{ start: CanisterId::from(0), end: CanisterId::from(0xfff) } => REMOTE_SUBNET,
420444
},
421445
).unwrap());
422446

423-
let msgs = generate_messages_for_test(/* senders = */ 2, /* receivers = */ 2);
424-
let msg_count = msgs.len();
425-
// All messages returned by `generate_messages_for_test` are of the same size
426-
let msg_size = msgs.first().unwrap().count_bytes() as u64;
427-
428447
assert!(
429448
msg_count > expected_messages as usize,
430449
"Invalid test setup: msg_count ({}) must be greater than routed_messages ({})",
@@ -459,19 +478,8 @@ fn build_streams_impl_respects_limits(
459478
streams.insert(REMOTE_SUBNET, expected_stream);
460479
});
461480

462-
// Target stream size: stream struct plus `max_stream_messages_by_byte_size - 1`
463-
// messages plus 1 byte. Since this is a target / soft limit, it should ensure
464-
// that exactly `max_stream_messages_by_byte_size` messages (or
465-
// `max_stream_messages_by_byte_size * msg_size` bytes) are routed.
466-
let target_stream_size_bytes =
467-
size_of::<Stream>() + (max_stream_messages_by_byte_size - 1) * msg_size as usize + 1;
468-
469481
// Act.
470-
let result_state = stream_builder.build_streams_impl(
471-
provided_state,
472-
max_stream_messages,
473-
target_stream_size_bytes,
474-
);
482+
let result_state = stream_builder.build_streams_impl(provided_state);
475483

476484
assert_eq!(expected_state.canister_states, result_state.canister_states);
477485
assert_eq!(expected_state.metadata, result_state.metadata);
@@ -998,13 +1006,19 @@ fn build_streams_with_oversized_payloads() {
9981006
}
9991007

10001008
/// Sets up the `StreamHandlerImpl`, `ReplicatedState` and `MetricsRegistry` to
1001-
/// be used by a test.
1002-
fn new_fixture(log: &ReplicaLogger) -> (StreamBuilderImpl, ReplicatedState, MetricsRegistry) {
1009+
/// be used by a test using specific stream limits.
1010+
fn new_fixture_with_limits(
1011+
log: &ReplicaLogger,
1012+
max_stream_messages: usize,
1013+
target_stream_size_bytes: usize,
1014+
) -> (StreamBuilderImpl, ReplicatedState, MetricsRegistry) {
10031015
let mut state = ReplicatedState::new(LOCAL_SUBNET, SubnetType::Application);
10041016
state.metadata.batch_time = Time::from_nanos_since_unix_epoch(5);
10051017
let metrics_registry = MetricsRegistry::new();
10061018
let stream_builder = StreamBuilderImpl::new(
10071019
LOCAL_SUBNET,
1020+
max_stream_messages,
1021+
target_stream_size_bytes,
10081022
&metrics_registry,
10091023
&MessageRoutingMetrics::new(&metrics_registry),
10101024
Arc::new(Mutex::new(LatencyMetrics::new_time_in_stream(
@@ -1017,6 +1031,12 @@ fn new_fixture(log: &ReplicaLogger) -> (StreamBuilderImpl, ReplicatedState, Metr
10171031
(stream_builder, state, metrics_registry)
10181032
}
10191033

1034+
/// Sets up the `StreamHandlerImpl`, `ReplicatedState` and `MetricsRegistry` to
1035+
/// be used by a test using default stream limits.
1036+
fn new_fixture(log: &ReplicaLogger) -> (StreamBuilderImpl, ReplicatedState, MetricsRegistry) {
1037+
new_fixture_with_limits(log, MAX_STREAM_MESSAGES, TARGET_STREAM_SIZE_BYTES)
1038+
}
1039+
10201040
/// Simulates routing the given requests into a `StreamIndexedQueue` with the
10211041
/// given `start` index, until `byte_limit` is reached or exceeded.
10221042
///

0 commit comments

Comments
 (0)