Skip to content

Commit

Permalink
chore: Ensure strictly monotonic batch times
Browse files Browse the repository at this point in the history
  • Loading branch information
alin-at-dfinity committed Feb 9, 2024
1 parent f9d1abe commit b7c2aa9
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 31 deletions.
20 changes: 10 additions & 10 deletions rs/messaging/src/message_routing.rs
Expand Up @@ -102,7 +102,7 @@ const CRITICAL_ERROR_MISSING_OR_INVALID_API_BOUNDARY_NODES: &str =
"mr_missing_or_invalid_api_boundary_nodes";
const CRITICAL_ERROR_NO_CANISTER_ALLOCATION_RANGE: &str = "mr_empty_canister_allocation_range";
const CRITICAL_ERROR_FAILED_TO_READ_REGISTRY: &str = "mr_failed_to_read_registry_error";
pub const CRITICAL_ERROR_BATCH_TIME_REGRESSION: &str = "mr_batch_time_regression";
pub const CRITICAL_ERROR_NON_INCREASING_BATCH_TIME: &str = "mr_non_increasing_batch_time";

/// Records the timestamp when all messages before the given index (down to the
/// previous `MessageTime`) were first added to / learned about in a stream.
Expand Down Expand Up @@ -309,9 +309,9 @@ pub(crate) struct MessageRoutingMetrics {
critical_error_no_canister_allocation_range: IntCounter,
/// Critical error: reading from the registry failed during processing a batch.
critical_error_failed_to_read_registry: IntCounter,
/// Critical error: the batch times of successive batches regressed (when they
/// are supposed to be monotonically increasing).
critical_error_batch_time_regression: IntCounter,
/// Critical error: the batch times of successive batches were not strictly
/// monotonically increasing.
critical_error_non_increasing_batch_time: IntCounter,

/// Metrics for query stats aggregator
pub query_stats_metrics: QueryStatsAggregatorMetrics,
Expand Down Expand Up @@ -396,8 +396,8 @@ impl MessageRoutingMetrics {
.error_counter(CRITICAL_ERROR_NO_CANISTER_ALLOCATION_RANGE),
critical_error_failed_to_read_registry: metrics_registry
.error_counter(CRITICAL_ERROR_FAILED_TO_READ_REGISTRY),
critical_error_batch_time_regression: metrics_registry
.error_counter(CRITICAL_ERROR_BATCH_TIME_REGRESSION),
critical_error_non_increasing_batch_time: metrics_registry
.error_counter(CRITICAL_ERROR_NON_INCREASING_BATCH_TIME),

query_stats_metrics: QueryStatsAggregatorMetrics::new(metrics_registry),
}
Expand All @@ -413,18 +413,18 @@ impl MessageRoutingMetrics {
);
}

pub fn observe_batch_time_regression(
pub fn observe_non_increasing_batch_time(
&self,
log: &ReplicaLogger,
state_time: Time,
batch_time: Time,
batch_height: Height,
) {
self.critical_error_batch_time_regression.inc();
self.critical_error_non_increasing_batch_time.inc();
warn!(
log,
"{}: Batch time regressed at height {}: state_time = {}, batch_time = {}.",
CRITICAL_ERROR_BATCH_TIME_REGRESSION,
"{}: Non-increasing batch time at height {}: state_time = {}, batch_time = {}.",
CRITICAL_ERROR_NON_INCREASING_BATCH_TIME,
batch_height,
state_time,
batch_time
Expand Down
6 changes: 3 additions & 3 deletions rs/messaging/src/state_machine.rs
Expand Up @@ -93,11 +93,11 @@ impl StateMachine for StateMachineImpl {
);
}

if batch.time >= state.metadata.batch_time {
if batch.time > state.metadata.batch_time {
state.metadata.batch_time = batch.time;
} else {
// Batch time regressed. This is a bug. (Implicitly) retain the old batch time.
self.metrics.observe_batch_time_regression(
// Batch time did not advance. This is a bug. (Implicitly) retain the old batch time.
self.metrics.observe_non_increasing_batch_time(
&self.log,
state.metadata.batch_time,
batch.time,
Expand Down
68 changes: 50 additions & 18 deletions rs/messaging/src/state_machine/tests.rs
@@ -1,5 +1,5 @@
use super::*;
use crate::message_routing::CRITICAL_ERROR_BATCH_TIME_REGRESSION;
use crate::message_routing::CRITICAL_ERROR_NON_INCREASING_BATCH_TIME;
use crate::{
routing::demux::MockDemux, routing::stream_builder::MockStreamBuilder,
state_machine::StateMachineImpl,
Expand Down Expand Up @@ -236,15 +236,53 @@ fn test_delivered_batch_interface() {

/// A batch whose time lies behind the state's `batch_time`: the test expects
/// the state's `batch_time` to be retained and the non-increasing batch time
/// critical error count to be 1.
#[test]
fn test_batch_time_regression() {
    let state_batch_time = Time::from_nanos_since_unix_epoch(2);
    // The batch carries a time of 1, i.e. earlier than the state's.
    let batch_time = Time::from_nanos_since_unix_epoch(1);
    let expected_batch_time = Time::from_nanos_since_unix_epoch(2);

    test_batch_time_impl(state_batch_time, batch_time, expected_batch_time, 1);
}

/// A batch whose time equals the state's `batch_time`: the test expects the
/// state's `batch_time` to stay as it was and the non-increasing batch time
/// critical error count to be 1.
#[test]
fn test_batch_time_same() {
    let state_batch_time = Time::from_nanos_since_unix_epoch(2);
    let batch_time = Time::from_nanos_since_unix_epoch(2);
    let expected_batch_time = Time::from_nanos_since_unix_epoch(2);

    test_batch_time_impl(state_batch_time, batch_time, expected_batch_time, 1);
}

/// A batch whose time is ahead of the state's `batch_time`: the test expects
/// the state to adopt the batch's time and the non-increasing batch time
/// critical error count to remain 0.
#[test]
fn test_batch_time_advance() {
    let state_batch_time = Time::from_nanos_since_unix_epoch(2);
    let batch_time = Time::from_nanos_since_unix_epoch(3);
    let expected_batch_time = Time::from_nanos_since_unix_epoch(3);

    test_batch_time_impl(state_batch_time, batch_time, expected_batch_time, 0);
}

/// Executes a batch with the given `batch_time` on a state with the given
/// `state_batch_time`. Tests the resulting state's `batch_time` against
/// `expected_batch_time`, as well as the `mr_non_increasing_batch_time`
/// critical error counter.
fn test_batch_time_impl(
state_batch_time: Time,
batch_time: Time,
expected_batch_time: Time,
expected_regression_count: u64,
) {
// Batch with the provided `batch_time`.
let provided_batch = BatchBuilder::new()
.batch_number(Height::new(1))
.time(Time::from_nanos_since_unix_epoch(1))
.time(batch_time)
.build();

// Fixture wrapping a state with a batch_time 2 (ahead of the batch).
// Fixture wrapping a state with the given `state_batch_time` as its `batch_time`.
let mut fixture = test_fixture(&provided_batch);
fixture.initial_state.metadata.batch_time = Time::from_nanos_since_unix_epoch(2);
fixture.initial_state.metadata.batch_time = state_batch_time;

with_test_replica_logger(|log| {
let _ = &fixture;
Expand All @@ -259,12 +297,9 @@ fn test_batch_time_regression() {

assert_eq!(
Some(0),
fetch_critical_error_batch_time_regression_count(&fixture.metrics_registry)
);
assert_eq!(
Time::from_nanos_since_unix_epoch(2),
fixture.initial_state.metadata.batch_time,
fetch_critical_error_non_increasing_batch_time_count(&fixture.metrics_registry)
);
assert_eq!(state_batch_time, fixture.initial_state.metadata.batch_time,);

let state = state_machine.execute_round(
fixture.initial_state,
Expand All @@ -277,20 +312,17 @@ fn test_batch_time_regression() {
);

assert_eq!(
Some(1),
fetch_critical_error_batch_time_regression_count(&fixture.metrics_registry)
);
assert_eq!(
Time::from_nanos_since_unix_epoch(2),
state.metadata.batch_time,
Some(expected_regression_count),
fetch_critical_error_non_increasing_batch_time_count(&fixture.metrics_registry)
);
assert_eq!(expected_batch_time, state.metadata.batch_time,);
});
}

fn fetch_critical_error_batch_time_regression_count(
fn fetch_critical_error_non_increasing_batch_time_count(
metrics_registry: &MetricsRegistry,
) -> Option<u64> {
fetch_int_counter_vec(metrics_registry, "critical_errors")
.get(&btreemap! { "error".to_string() => CRITICAL_ERROR_BATCH_TIME_REGRESSION.to_string() })
.get(&btreemap! { "error".to_string() => CRITICAL_ERROR_NON_INCREASING_BATCH_TIME.to_string() })
.cloned()
}

0 comments on commit b7c2aa9

Please sign in to comment.