Skip to content

Commit a2038d0

Browse files
committed
feat(CON-1242): QueryStats aggregation that is robust against malicious nodes
1 parent eb54bb5 commit a2038d0

File tree

14 files changed

+649
-224
lines changed

14 files changed

+649
-224
lines changed

rs/ingress_manager/src/ingress_selector.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1517,6 +1517,11 @@ mod tests {
15171517
None,
15181518
Some(
15191519
ReplicatedStateBuilder::default()
1520+
.with_node_ids(
1521+
(1..=SMALL_APP_SUBNET_MAX_SIZE as u64)
1522+
.map(node_test_id)
1523+
.collect(),
1524+
)
15201525
.with_canister(
15211526
CanisterStateBuilder::default()
15221527
.with_canister_id(canister_test_id(0))

rs/interfaces/src/query_stats.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@ pub enum QueryStatsPermanentValidationError {
1010
InvalidNodeId { expected: NodeId, reported: NodeId },
1111
/// The epoch is lower than the aggregated height in the state manager
1212
EpochAlreadyAggregated {
13-
expected: QueryStatsEpoch,
14-
reported: QueryStatsEpoch,
13+
highest_aggregated_epoch: QueryStatsEpoch,
14+
payload_epoch: QueryStatsEpoch,
1515
},
1616
/// The epoch is higher than the certified height would allow for
1717
EpochTooHigh {
18-
expected: QueryStatsEpoch,
19-
reported: QueryStatsEpoch,
18+
max_valid_epoch: QueryStatsEpoch,
19+
payload_epoch: QueryStatsEpoch,
2020
},
2121
/// Stats for a [`CanisterId`] have been send twice
2222
DuplicateCanisterId(CanisterId),

rs/messaging/src/message_routing.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,6 @@ impl BatchProcessorImpl {
564564
stream_builder,
565565
log.clone(),
566566
metrics.clone(),
567-
hypervisor_config.query_stats_epoch_length,
568567
));
569568

570569
Self {

rs/messaging/src/scheduling/valid_set_rule/test.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use ic_test_utilities_state::{
1919
MockIngressHistory, ReplicatedStateBuilder,
2020
};
2121
use ic_test_utilities_types::{
22-
ids::{canister_test_id, message_test_id, subnet_test_id, user_test_id},
22+
ids::{canister_test_id, message_test_id, node_test_id, subnet_test_id, user_test_id},
2323
messages::SignedIngressBuilder,
2424
};
2525
use ic_types::{
@@ -464,6 +464,11 @@ fn canister_on_application_subnet_charges_for_ingress() {
464464
let own_subnet_type = SubnetType::Application;
465465
let own_subnet_id = subnet_test_id(0);
466466
let mut state = ReplicatedStateBuilder::new()
467+
.with_node_ids(
468+
(1..=SMALL_APP_SUBNET_MAX_SIZE as u64)
469+
.map(node_test_id)
470+
.collect(),
471+
)
467472
.with_subnet_type(own_subnet_type)
468473
.with_canister(
469474
CanisterStateBuilder::new()

rs/messaging/src/state_machine.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ pub(crate) struct StateMachineImpl {
3636
stream_builder: Box<dyn StreamBuilder>,
3737
log: ReplicaLogger,
3838
metrics: MessageRoutingMetrics,
39-
query_stats_epoch_length: u64,
4039
}
4140

4241
impl StateMachineImpl {
@@ -46,15 +45,13 @@ impl StateMachineImpl {
4645
stream_builder: Box<dyn StreamBuilder>,
4746
log: ReplicaLogger,
4847
metrics: MessageRoutingMetrics,
49-
query_stats_epoch_length: u64,
5048
) -> Self {
5149
Self {
5250
scheduler,
5351
demux,
5452
stream_builder,
5553
log,
5654
metrics,
57-
query_stats_epoch_length,
5855
}
5956
}
6057

@@ -86,9 +83,7 @@ impl StateMachine for StateMachineImpl {
8683
deliver_query_stats(
8784
query_stats,
8885
&mut state,
89-
batch.batch_number,
9086
&self.log,
91-
self.query_stats_epoch_length,
9287
&self.metrics.query_stats_metrics,
9388
);
9489
}

rs/messaging/src/state_machine/tests.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ fn state_machine_populates_network_topology() {
156156
fixture.stream_builder,
157157
log,
158158
fixture.metrics,
159-
ic_config::execution_environment::QUERY_STATS_EPOCH_LENGTH,
160159
));
161160

162161
assert_ne!(
@@ -191,7 +190,6 @@ fn test_delivered_batch(provided_batch: Batch) -> ReplicatedState {
191190
fixture.stream_builder,
192191
log,
193192
fixture.metrics,
194-
ic_config::execution_environment::QUERY_STATS_EPOCH_LENGTH,
195193
));
196194

197195
state_machine.execute_round(
@@ -292,7 +290,6 @@ fn test_batch_time_impl(
292290
fixture.stream_builder,
293291
log,
294292
fixture.metrics,
295-
ic_config::execution_environment::QUERY_STATS_EPOCH_LENGTH,
296293
);
297294

298295
assert_eq!(

rs/protobuf/def/state/stats/v1/stats.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@ message Stats {
88
}
99

1010
message QueryStats {
11-
uint64 epoch = 1;
11+
uint64 highest_aggregated_epoch = 1;
1212
repeated QueryStatsInner query_stats = 2;
1313
}
1414

1515
message QueryStatsInner {
1616
types.v1.NodeId proposer = 1;
17+
uint64 epoch = 7;
1718
types.v1.CanisterId canister = 2;
1819
uint32 num_calls = 3;
1920
uint64 num_instructions = 4;

rs/protobuf/src/gen/state/state.stats.v1.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ pub struct Stats {
88
#[derive(Clone, PartialEq, ::prost::Message)]
99
pub struct QueryStats {
1010
#[prost(uint64, tag = "1")]
11-
pub epoch: u64,
11+
pub highest_aggregated_epoch: u64,
1212
#[prost(message, repeated, tag = "2")]
1313
pub query_stats: ::prost::alloc::vec::Vec<QueryStatsInner>,
1414
}
@@ -17,6 +17,8 @@ pub struct QueryStats {
1717
pub struct QueryStatsInner {
1818
#[prost(message, optional, tag = "1")]
1919
pub proposer: ::core::option::Option<super::super::super::types::v1::NodeId>,
20+
#[prost(uint64, tag = "7")]
21+
pub epoch: u64,
2022
#[prost(message, optional, tag = "2")]
2123
pub canister: ::core::option::Option<super::super::super::types::v1::CanisterId>,
2224
#[prost(uint32, tag = "3")]

rs/query_stats/src/metrics.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use ic_metrics::{buckets::decimal_buckets, MetricsRegistry};
2-
use prometheus::{HistogramVec, IntGauge};
2+
use prometheus::{HistogramVec, IntCounter, IntGauge};
3+
4+
pub(crate) const CRITICAL_ERROR_AGGREGATION_FAILURE: &str = "query_stats_aggregator_failure";
35

46
/// Metrics for query stats collector
57
///
@@ -77,6 +79,10 @@ pub struct QueryStatsAggregatorMetrics {
7779
/// This is lower than the epoch for which we collect stats, as there is
7880
/// a delay for propagating local query stats via consensus blocks.
7981
pub query_stats_aggregator_current_epoch: IntGauge,
82+
/// The number of records stored in the unaggregateed state
83+
pub query_stats_aggregator_num_records: IntGauge,
84+
/// Critical error occuring in aggregator
85+
pub query_stats_critical_error_aggregator_failure: IntCounter,
8086
}
8187

8288
impl QueryStatsAggregatorMetrics {
@@ -86,6 +92,12 @@ impl QueryStatsAggregatorMetrics {
8692
"query_stats_aggregator_current_epoch",
8793
"Current epoch of the query stats aggregator",
8894
),
95+
query_stats_aggregator_num_records: metrics_registry.int_gauge(
96+
"query_stats_aggregator_num_records",
97+
"The number of records stored in the unaggregateed state",
98+
),
99+
query_stats_critical_error_aggregator_failure: metrics_registry
100+
.error_counter(CRITICAL_ERROR_AGGREGATION_FAILURE),
89101
}
90102
}
91103
}

rs/query_stats/src/payload_builder.rs

Lines changed: 72 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::metrics::QueryStatsPayloadBuilderMetrics;
1+
use crate::{
2+
metrics::QueryStatsPayloadBuilderMetrics, state_machine::get_stats_for_node_id_and_epoch,
3+
};
24
use crossbeam_channel::{Receiver, TryRecvError};
35
use ic_interfaces::{
46
batch_payload::{BatchPayloadBuilder, PastPayload, ProposalContext},
@@ -237,8 +239,8 @@ impl QueryStatsPayloadBuilderImpl {
237239
if payload.epoch > max_valid_epoch {
238240
return Err(permanent_error(
239241
QueryStatsPermanentValidationError::EpochTooHigh {
240-
expected: max_valid_epoch,
241-
reported: payload.epoch,
242+
max_valid_epoch,
243+
payload_epoch: payload.epoch,
242244
},
243245
));
244246
}
@@ -321,33 +323,36 @@ impl QueryStatsPayloadBuilderImpl {
321323
let mut previous_ids = BTreeSet::<CanisterId>::new();
322324

323325
// Check that the epoch we are requesting has not been aggregated yet
324-
if epoch < state_stats.epoch.unwrap_or(0.into()) {
326+
// If there is no `highest_aggregated_epoch` in the state, we have not aggregated
327+
// any epochs, therefore we unwrap to `false`
328+
if state_stats
329+
.highest_aggregated_epoch
330+
.map(|highest_aggregated_epoch| epoch <= highest_aggregated_epoch)
331+
.unwrap_or(false)
332+
{
325333
warn!(
326334
every_n_seconds => 5,
327335
self.log,
328336
"QueryStats: requesting previous_ids for epoch {:?} that is below aggregated epoch {:?}",
329337
epoch,
330-
state_stats.epoch
338+
state_stats.highest_aggregated_epoch
331339
);
332340

333341
return Err(permanent_error(
334342
QueryStatsPermanentValidationError::EpochAlreadyAggregated {
335-
expected: state_stats.epoch.unwrap_or(0.into()),
336-
reported: epoch,
343+
highest_aggregated_epoch: state_stats
344+
.highest_aggregated_epoch
345+
.unwrap_or(0.into()),
346+
payload_epoch: epoch,
337347
},
338348
));
339349
}
340350

341-
// Check the certified state for stats already sent
342-
// Skip if certified state is not the same epoch
343-
if state_stats.epoch == Some(epoch) {
344-
previous_ids.extend(
345-
state_stats
346-
.stats
347-
.iter()
348-
.filter(|(_, stat_map)| stat_map.contains_key(&node_id))
349-
.map(|(canister_id, _)| canister_id),
350-
);
351+
// Check the certified state for stats that we have already sent
352+
if let Some(state_stats) = get_stats_for_node_id_and_epoch(state_stats, &node_id, &epoch)
353+
.map(|record| record.iter().map(|(canister_id, _)| canister_id))
354+
{
355+
previous_ids.extend(state_stats);
351356
}
352357

353358
// Check past payloads for stats already sent
@@ -475,7 +480,12 @@ mod tests {
475480
#[test]
476481
fn past_payload_test() {
477482
let test_stats = test_epoch_stats(0, 500);
478-
let state = test_state(epoch_stats_for_state(&test_stats, 0..200, node_test_id(1)));
483+
let state = test_state(epoch_stats_for_state(
484+
&test_stats,
485+
0..200,
486+
node_test_id(1),
487+
None,
488+
));
479489
let payload_builder = setup_payload_builder_impl(state, test_stats.clone());
480490
let validation_context = test_validation_context();
481491
let proposal_context = test_proposal_context(&validation_context);
@@ -517,8 +527,8 @@ mod tests {
517527
fn node_id_check_test() {
518528
let test_stats = test_epoch_stats(0, 500);
519529

520-
let stats1 = epoch_stats_for_state(&test_stats, 0..100, node_test_id(1));
521-
let stats2 = epoch_stats_for_state(&test_stats, 100..200, node_test_id(2));
530+
let stats1 = epoch_stats_for_state(&test_stats, 0..100, node_test_id(1), None);
531+
let stats2 = epoch_stats_for_state(&test_stats, 100..200, node_test_id(2), None);
522532
let stats = merge_raw_query_stats(stats1, stats2);
523533
let state = test_state(stats);
524534

@@ -594,7 +604,12 @@ mod tests {
594604
#[test]
595605
fn epoch_too_low_test() {
596606
let test_stats = test_epoch_stats(1234, 100);
597-
let state = test_state(epoch_stats_for_state(&test_stats, 0..100, node_test_id(1)));
607+
let state = test_state(epoch_stats_for_state(
608+
&test_stats,
609+
0..100,
610+
node_test_id(1),
611+
Some(1234),
612+
));
598613
let payload_builder = setup_payload_builder_impl(state, test_stats);
599614
let validation_context = test_validation_context();
600615
let proposal_context = test_proposal_context(&validation_context);
@@ -616,12 +631,12 @@ mod tests {
616631
Err(ValidationError::Permanent(
617632
PayloadPermanentError::QueryStatsPayloadValidationError(
618633
QueryStatsPermanentValidationError::EpochAlreadyAggregated {
619-
expected,
620-
reported,
634+
highest_aggregated_epoch,
635+
payload_epoch,
621636
},
622637
),
623-
)) if expected == QueryStatsEpoch::new(1234) && reported == QueryStatsEpoch::new(0) => {
624-
}
638+
)) if highest_aggregated_epoch == QueryStatsEpoch::new(1234)
639+
&& payload_epoch == QueryStatsEpoch::new(0) => {}
625640
Err(err) => panic!(
626641
"QueryStatsPayload had epoch too low, yet instead got error {:?}",
627642
err
@@ -655,7 +670,10 @@ mod tests {
655670
match validation_result {
656671
Err(ValidationError::Permanent(
657672
PayloadPermanentError::QueryStatsPayloadValidationError(
658-
QueryStatsPermanentValidationError::EpochTooHigh { expected, reported },
673+
QueryStatsPermanentValidationError::EpochTooHigh {
674+
max_valid_epoch: expected,
675+
payload_epoch: reported,
676+
},
659677
),
660678
)) if expected == QueryStatsEpoch::new(0) && reported == QueryStatsEpoch::new(1234) => {
661679
}
@@ -676,7 +694,12 @@ mod tests {
676694
#[test]
677695
fn duplicate_id_test() {
678696
let test_stats = test_epoch_stats(0, 4);
679-
let state = test_state(epoch_stats_for_state(&test_stats, 0..1, node_test_id(1)));
697+
let state = test_state(epoch_stats_for_state(
698+
&test_stats,
699+
0..1,
700+
node_test_id(1),
701+
None,
702+
));
680703
let payload_builder = setup_payload_builder_impl(state, test_stats.clone());
681704
let validation_context = test_validation_context();
682705
let proposal_context = test_proposal_context(&validation_context);
@@ -800,23 +823,34 @@ mod tests {
800823
query_stats: &LocalQueryStats,
801824
range: Range<usize>,
802825
node: NodeId,
826+
highest_aggregated_epoch: Option<u64>,
803827
) -> RawQueryStats {
828+
let stats = vec![(
829+
node,
830+
vec![(
831+
query_stats.epoch,
832+
query_stats.stats[range]
833+
.iter()
834+
.map(|stat| (stat.canister_id, stat.stats.clone()))
835+
.collect(),
836+
)]
837+
.into_iter()
838+
.collect(),
839+
)]
840+
.into_iter()
841+
.collect();
842+
804843
RawQueryStats {
805-
epoch: Some(query_stats.epoch),
806-
stats: query_stats.stats[range]
807-
.iter()
808-
.map(|stat| {
809-
(
810-
stat.canister_id,
811-
[(node, stat.stats.clone())].into_iter().collect(),
812-
)
813-
})
814-
.collect(),
844+
highest_aggregated_epoch: highest_aggregated_epoch.map(QueryStatsEpoch::new),
845+
stats,
815846
}
816847
}
817848

818849
fn merge_raw_query_stats(mut stats1: RawQueryStats, stats2: RawQueryStats) -> RawQueryStats {
819-
assert_eq!(stats1.epoch, stats2.epoch);
850+
assert_eq!(
851+
stats1.highest_aggregated_epoch,
852+
stats2.highest_aggregated_epoch
853+
);
820854

821855
for (canister_id, stat2) in stats2.stats {
822856
stats1

0 commit comments

Comments
 (0)