Skip to content

Commit

Permalink
(BOUN-1018) Save Api Boundary Nodes in ReplicatedState metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
nikolay-komarevskiy committed Jan 29, 2024
1 parent 3a1dc30 commit ad2d9b3
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 24 deletions.
27 changes: 15 additions & 12 deletions rs/messaging/src/message_routing.rs
Expand Up @@ -1003,24 +1003,26 @@ impl BatchProcessorImpl {
continue;
};

let Ok(ipv6_address) = http.ip_addr.parse::<Ipv6Addr>() else {
let ipv6_address = http.ip_addr;
if ipv6_address.parse::<Ipv6Addr>().is_err() {
raise_critical_error_for_api_boundary_nodes(&format!(
"failed to parse ipv6 field in NodeRecord for node_id {api_bn_id}",
));
continue;
};
}

// ipv4 is not mandatory for the node record. No critical errors need to be raised if it is `None`.
let Ok(ipv4_address) = node_record
let ipv4_address = node_record
.public_ipv4_config
.map(|public_ipv4_config| public_ipv4_config.ip_addr.parse::<Ipv4Addr>())
.transpose()
else {
raise_critical_error_for_api_boundary_nodes(&format!(
"failed to parse ipv4 address of node {api_bn_id}",
));
continue;
};
.map(|ipv4_config| ipv4_config.ip_addr);
if let Some(ref ipv4) = ipv4_address {
if ipv4.parse::<Ipv4Addr>().is_err() {
raise_critical_error_for_api_boundary_nodes(&format!(
"failed to parse ipv4 address of node {api_bn_id}",
));
continue;
}
}

api_boundary_nodes.insert(
api_bn_id,
Expand Down Expand Up @@ -1095,7 +1097,7 @@ impl BatchProcessor for BatchProcessorImpl {
subnet_features,
registry_execution_settings,
node_public_keys,
_api_boundary_nodes,
api_boundary_nodes,
) = self.read_registry(registry_version, state.metadata.own_subnet_id);

self.metrics.blocks_proposed_total.inc();
Expand All @@ -1114,6 +1116,7 @@ impl BatchProcessor for BatchProcessorImpl {
subnet_features,
&registry_execution_settings,
node_public_keys,
api_boundary_nodes,
);
// Garbage collect empty canister queue pairs before checkpointing.
if certification_scope == CertificationScope::Full {
Expand Down
13 changes: 5 additions & 8 deletions rs/messaging/src/message_routing/tests.rs
Expand Up @@ -40,7 +40,6 @@ use ic_types::{
NodeId, PrincipalId, Randomness,
};
use maplit::{btreemap, btreeset};
use std::net::{Ipv4Addr, Ipv6Addr};
use std::{fmt::Debug, str::FromStr, sync::Arc, time::Duration};
use tempfile::TempDir;

Expand Down Expand Up @@ -487,10 +486,12 @@ impl StateMachine for FakeStateMachine {
subnet_features: SubnetFeatures,
registry_settings: &RegistryExecutionSettings,
node_public_keys: NodePublicKeys,
api_boundary_nodes: ApiBoundaryNodes,
) -> ReplicatedState {
state.metadata.network_topology = network_topology;
state.metadata.own_subnet_features = subnet_features;
state.metadata.node_public_keys = node_public_keys;
state.metadata.api_boundary_nodes = api_boundary_nodes;
let mut canister_states = BTreeMap::new();
canister_states.insert(
canister_test_id(1),
Expand Down Expand Up @@ -825,10 +826,8 @@ fn try_read_registry_succeeds_with_fully_specified_registry_records() {
entry_1,
&ApiBoundaryNodeEntry {
domain: "api-bn11.example.org".to_string(),
ipv4_address: Some("127.0.0.1".parse::<Ipv4Addr>().unwrap()),
ipv6_address: "2001:0db8:85a3:0000:0000:8a2e:0370:7334"
.parse::<Ipv6Addr>()
.unwrap(),
ipv4_address: Some("127.0.0.1".to_string()),
ipv6_address: "2001:0db8:85a3:0000:0000:8a2e:0370:7334".to_string(),
pubkey: None,
}
);
Expand All @@ -838,9 +837,7 @@ fn try_read_registry_succeeds_with_fully_specified_registry_records() {
&ApiBoundaryNodeEntry {
domain: "api-bn12.example.org".to_string(),
ipv4_address: None,
ipv6_address: "2001:0db8:85a3:0000:0000:8a2e:0370:7335"
.parse::<Ipv6Addr>()
.unwrap(),
ipv6_address: "2001:0db8:85a3:0000:0000:8a2e:0370:7335".to_string(),
pubkey: None,
}
);
Expand Down
5 changes: 4 additions & 1 deletion rs/messaging/src/state_machine.rs
@@ -1,4 +1,4 @@
use crate::message_routing::{MessageRoutingMetrics, NodePublicKeys};
use crate::message_routing::{ApiBoundaryNodes, MessageRoutingMetrics, NodePublicKeys};
use crate::routing::{demux::Demux, stream_builder::StreamBuilder};
use ic_interfaces::execution_environment::{
ExecutionRoundType, RegistryExecutionSettings, Scheduler,
Expand Down Expand Up @@ -27,6 +27,7 @@ pub(crate) trait StateMachine: Send {
subnet_features: SubnetFeatures,
registry_settings: &RegistryExecutionSettings,
node_public_keys: NodePublicKeys,
api_boundary_nodes: ApiBoundaryNodes,
) -> ReplicatedState;
}
pub(crate) struct StateMachineImpl {
Expand Down Expand Up @@ -76,6 +77,7 @@ impl StateMachine for StateMachineImpl {
subnet_features: SubnetFeatures,
registry_settings: &RegistryExecutionSettings,
node_public_keys: NodePublicKeys,
api_boundary_nodes: ApiBoundaryNodes,
) -> ReplicatedState {
let since = Instant::now();

Expand Down Expand Up @@ -106,6 +108,7 @@ impl StateMachine for StateMachineImpl {
state.metadata.network_topology = network_topology;
state.metadata.own_subnet_features = subnet_features;
state.metadata.node_public_keys = node_public_keys;
state.metadata.api_boundary_nodes = api_boundary_nodes;
if let Err(message) = state.metadata.init_allocation_ranges_if_empty() {
self.metrics
.observe_no_canister_allocation_range(&self.log, message);
Expand Down
3 changes: 3 additions & 0 deletions rs/messaging/src/state_machine/tests.rs
Expand Up @@ -171,6 +171,7 @@ fn state_machine_populates_network_topology() {
Default::default(),
&test_registry_settings(),
Default::default(),
Default::default(),
);

assert_eq!(state.metadata.network_topology, fixture.network_topology);
Expand Down Expand Up @@ -200,6 +201,7 @@ fn test_delivered_batch(provided_batch: Batch) -> ReplicatedState {
Default::default(),
&test_registry_settings(),
Default::default(),
Default::default(),
)
})
}
Expand Down Expand Up @@ -271,6 +273,7 @@ fn test_batch_time_regression() {
Default::default(),
&test_registry_settings(),
Default::default(),
Default::default(),
);

assert_eq!(
Expand Down
10 changes: 10 additions & 0 deletions rs/protobuf/def/state/metadata/v1/metadata.proto
Expand Up @@ -246,6 +246,14 @@ message NodePublicKeyEntry {
bytes public_key = 2;
}

message ApiBoundaryNodeEntry {
types.v1.NodeId node_id = 1;
string domain = 2;
optional string ipv4_address = 3;
string ipv6_address = 4;
optional bytes pubkey = 5;
}

message NodeBlockmakerStats {
types.v1.NodeId node_id = 1;
uint64 blocks_proposed_total = 2;
Expand Down Expand Up @@ -302,6 +310,8 @@ message SystemMetadata {
repeated NodePublicKeyEntry node_public_keys = 19;

BlockmakerMetricsTimeSeries blockmaker_metrics_time_series = 20;

repeated ApiBoundaryNodeEntry api_boundary_nodes = 21;
}

message StableMemory {
Expand Down
16 changes: 16 additions & 0 deletions rs/protobuf/src/gen/state/state.metadata.v1.rs
Expand Up @@ -381,6 +381,20 @@ pub struct NodePublicKeyEntry {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ApiBoundaryNodeEntry {
#[prost(message, optional, tag = "1")]
pub node_id: ::core::option::Option<super::super::super::types::v1::NodeId>,
#[prost(string, tag = "2")]
pub domain: ::prost::alloc::string::String,
#[prost(string, optional, tag = "3")]
pub ipv4_address: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, tag = "4")]
pub ipv6_address: ::prost::alloc::string::String,
#[prost(bytes = "vec", optional, tag = "5")]
pub pubkey: ::core::option::Option<::prost::alloc::vec::Vec<u8>>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct NodeBlockmakerStats {
#[prost(message, optional, tag = "1")]
pub node_id: ::core::option::Option<super::super::super::types::v1::NodeId>,
Expand Down Expand Up @@ -455,6 +469,8 @@ pub struct SystemMetadata {
pub node_public_keys: ::prost::alloc::vec::Vec<NodePublicKeyEntry>,
#[prost(message, optional, tag = "20")]
pub blockmaker_metrics_time_series: ::core::option::Option<BlockmakerMetricsTimeSeries>,
#[prost(message, repeated, tag = "21")]
pub api_boundary_nodes: ::prost::alloc::vec::Vec<ApiBoundaryNodeEntry>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
37 changes: 34 additions & 3 deletions rs/replicated_state/src/metadata_state.rs
Expand Up @@ -45,7 +45,6 @@ use ic_types::{
};
use ic_wasm_types::WasmHash;
use serde::{Deserialize, Serialize};
use std::net::{Ipv4Addr, Ipv6Addr};
use std::ops::Bound::{Included, Unbounded};
use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
Expand Down Expand Up @@ -97,6 +96,8 @@ pub struct SystemMetadata {
/// DER-encoded public keys of the subnet's nodes.
pub node_public_keys: BTreeMap<NodeId, Vec<u8>>,

pub api_boundary_nodes: BTreeMap<NodeId, ApiBoundaryNodeEntry>,

/// "Subnet split in progress" marker: `Some(original_subnet_id)` if this
/// replicated state is in the process of being split from `original_subnet_id`;
/// `None` otherwise.
Expand Down Expand Up @@ -201,9 +202,9 @@ pub struct ApiBoundaryNodeEntry {
/// Domain name, required field from NodeRecord
pub domain: String,
/// Ipv4, optional field from NodeRecord
pub ipv4_address: Option<Ipv4Addr>,
pub ipv4_address: Option<String>,
/// Ipv6, required field from NodeRecord
pub ipv6_address: Ipv6Addr,
pub ipv6_address: String,
pub pubkey: Option<Vec<u8>>,
}

Expand Down Expand Up @@ -587,6 +588,19 @@ impl From<&SystemMetadata> for pb_metadata::SystemMetadata {
public_key: public_key.clone(),
})
.collect(),
api_boundary_nodes: item
.api_boundary_nodes
.iter()
.map(
|(node_id, api_boundary_node_entry)| pb_metadata::ApiBoundaryNodeEntry {
node_id: Some(node_id_into_protobuf(*node_id)),
domain: api_boundary_node_entry.domain.clone(),
ipv4_address: api_boundary_node_entry.ipv4_address.clone(),
ipv6_address: api_boundary_node_entry.ipv6_address.clone(),
pubkey: api_boundary_node_entry.pubkey.clone(),
},
)
.collect(),
blockmaker_metrics_time_series: Some((&item.blockmaker_metrics_time_series).into()),
}
}
Expand Down Expand Up @@ -651,6 +665,19 @@ impl TryFrom<(pb_metadata::SystemMetadata, &dyn CheckpointLoadingMetrics)> for S
node_public_keys.insert(node_id_try_from_option(entry.node_id)?, entry.public_key);
}

let mut api_boundary_nodes = BTreeMap::<NodeId, ApiBoundaryNodeEntry>::new();
for entry in item.api_boundary_nodes {
api_boundary_nodes.insert(
node_id_try_from_option(entry.node_id)?,
ApiBoundaryNodeEntry {
domain: entry.domain,
ipv4_address: entry.ipv4_address,
ipv6_address: entry.ipv6_address,
pubkey: entry.pubkey,
},
);
}

Ok(Self {
own_subnet_id: subnet_id_try_from_protobuf(try_from_option_field(
item.own_subnet_id,
Expand All @@ -662,6 +689,7 @@ impl TryFrom<(pb_metadata::SystemMetadata, &dyn CheckpointLoadingMetrics)> for S
own_subnet_type: SubnetType::default(),
own_subnet_features: item.own_subnet_features.unwrap_or_default().into(),
node_public_keys,
api_boundary_nodes,
// Note: `load_checkpoint()` will set this to the contents of `split_marker.pbuf`,
// when present.
split_from: None,
Expand Down Expand Up @@ -722,6 +750,7 @@ impl SystemMetadata {
subnet_call_context_manager: Default::default(),
own_subnet_features: SubnetFeatures::default(),
node_public_keys: Default::default(),
api_boundary_nodes: Default::default(),
split_from: None,
// StateManager populates proper values of these fields before
// committing each state.
Expand Down Expand Up @@ -996,6 +1025,7 @@ impl SystemMetadata {
own_subnet_features: _,
// Overwritten as soon as the round begins, no explicit action needed.
node_public_keys: _,
api_boundary_nodes: _,
ref mut split_from,
subnet_call_context_manager: _,
// Set by `commit_and_certify()` at the end of the round. Not used before.
Expand Down Expand Up @@ -2282,6 +2312,7 @@ pub(crate) mod testing {
subnet_call_context_manager: Default::default(),
own_subnet_features: SubnetFeatures::default(),
node_public_keys: Default::default(),
api_boundary_nodes: Default::default(),
split_from: None,
prev_state_hash: Default::default(),
state_sync_version: CURRENT_STATE_SYNC_VERSION,
Expand Down
8 changes: 8 additions & 0 deletions rs/replicated_state/src/metadata_state/tests.rs
Expand Up @@ -454,6 +454,14 @@ fn roundtrip_encoding() {
system_metadata.node_public_keys = btreemap! {
node_test_id(1) => pk_der,
};
system_metadata.api_boundary_nodes = btreemap! {
node_test_id(1) => ApiBoundaryNodeEntry {
domain: "api-example.com".to_string(),
ipv4_address: Some("127.0.0.1".to_string()),
ipv6_address: "2001:0db8:85a3:0000:0000:8a2e:0370:7334".to_string(),
pubkey: None,
},
};
system_metadata.bitcoin_get_successors_follow_up_responses =
btreemap! { 10.into() => vec![vec![1], vec![2]] };

Expand Down

1 comment on commit ad2d9b3

@ilbertt
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nikolay-komarevskiy why are the API Boundary Nodes necessary in the replicated state? What do replica nodes use this information for?

Please sign in to comment.