Skip to content

Commit

Permalink
Merge branch 'tim/peer-manager3' into 'master'
Browse files Browse the repository at this point in the history
fix: [NET-1464] Tolerate invalid registry values in peer manager

In accordance to the current discovery [mechanism](https://sourcegraph.com/github.com/dfinity/ic/-/blob/rs/p2p/src/discovery.rs?L86), we should not return an error in the peer manager if a value is wrong or not present. The peer manager will try to parse all values, log a warning if unable and add any valid value to the topology. 

See merge request dfinity-lab/public/ic!12938
  • Loading branch information
tthebst committed Jun 14, 2023
2 parents f8bd0ac + d373d63 commit e0e5249
Showing 1 changed file with 69 additions and 101 deletions.
170 changes: 69 additions & 101 deletions rs/p2p/peer_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use std::{

use ic_interfaces::consensus_pool::ConsensusPoolCache;
use ic_interfaces_registry::RegistryClient;
use ic_logger::{error, ReplicaLogger};
use ic_logger::{warn, ReplicaLogger};
use ic_metrics::MetricsRegistry;
use ic_registry_client_helpers::subnet::{SubnetRegistry, SubnetTransportRegistry};
use ic_types::{registry::RegistryClientError, NodeId, RegistryVersion, SubnetId};
use ic_registry_client_helpers::subnet::SubnetTransportRegistry;
use ic_types::{NodeId, RegistryVersion, SubnetId};
use metrics::PeerManagerMetrics;
use tokio::{
runtime::Handle,
Expand Down Expand Up @@ -74,32 +74,23 @@ impl PeerManager {
let _ = interval.tick().await;
self.metrics.topology_updates.inc();

match self.get_latest_subnet_topology() {
Ok(ref mut topology) => {
let timer = self.metrics.topology_wachter_update_duration.start_timer();
// Update shared state with new topology. Only notify wachters if state actually changed.
self.topology_sender.send_if_modified(
move |old_topology: &mut SubnetTopology| {
if old_topology == topology {
false
} else {
std::mem::swap(old_topology, topology);
true
}
},
);
drop(timer)
}
Err(e) => {
self.metrics.topology_update_errors.inc();
error!(self.log, "Failed to update local subnet topology: {}", e);
}
}
let mut topology = self.get_latest_subnet_topology();
let _timer = self.metrics.topology_wachter_update_duration.start_timer();
// Update shared state with new topology. Only notify wachters if state actually changed.
self.topology_sender
.send_if_modified(move |old_topology: &mut SubnetTopology| {
if old_topology == &topology {
false
} else {
std::mem::swap(old_topology, &mut topology);
true
}
});
}
}

/// Get all nodes that are relevant for this subnet according to subnet memebership.
fn get_latest_subnet_topology(&self) -> Result<SubnetTopology, SubnetTopologyError> {
fn get_latest_subnet_topology(&self) -> SubnetTopology {
let _timer = self.metrics.topology_update_duration.start_timer();

let curr_registry_version = self.registry_client.get_latest_version();
Expand All @@ -117,61 +108,69 @@ impl PeerManager {
let latest_registry_version = consensus_registry_version.max(curr_registry_version);
for version in earliest_registry_version.get()..=latest_registry_version.get() {
let version = RegistryVersion::from(version);
let nodes_at_version = self
.registry_client
.get_node_ids_on_subnet(self.subnet_id, version)
.map_err(|e| SubnetTopologyError::RegistryError {
operation: "transport_infos".to_string(),
source: e,
})?
.ok_or(SubnetTopologyError::RegistryFieldEmpty {
field: "connection_endpoint".to_string(),
})?;

let transport_info = self

let transport_info = match self
.registry_client
.get_subnet_transport_infos(self.subnet_id, version)
.map_err(|e| SubnetTopologyError::RegistryError {
operation: "transport_infos".to_string(),
source: e,
})?
.ok_or(SubnetTopologyError::RegistryFieldEmpty {
field: "transport_infos".to_string(),
})?;
for node in nodes_at_version {
let flow_endpoint = transport_info
.iter()
.find(|&n| n.0 == node)
.ok_or(SubnetTopologyError::RegistryFieldEmpty {
field: "node_transport_info".to_string(),
})?
.1
{
Ok(Some(transport_info)) => transport_info,
Ok(None) => {
warn!(
self.log,
"Got transport infos but is empty. version {}", version
);
Vec::new()
}
Err(e) => {
warn!(
self.log,
"Failed to get transport information from registry at version {} : {}",
version,
e
);
Vec::new()
}
};

for (peer, info) in transport_info {
let maybe_endpoint = info
.p2p_flow_endpoints
.get(0)
.ok_or(SubnetTopologyError::RegistryFieldEmpty {
field: "flow_endpoints".to_string(),
})?
.endpoint
.as_ref()
.ok_or(SubnetTopologyError::RegistryFieldEmpty {
field: "connection_endpoint".to_string(),
})?;
let ip_addr = flow_endpoint.ip_addr.parse::<IpAddr>().map_err(|e| {
SubnetTopologyError::ParseError {
field: "flow_endpoint_ip_addr".to_string(),
reason: e.to_string(),
.and_then(|flow_endpoint| flow_endpoint.endpoint.as_ref());

match maybe_endpoint {
Some(flow_endpoint) => {
if let Ok(ip_addr) = flow_endpoint.ip_addr.parse::<IpAddr>() {
// Insert even if already present because we prefer to have the value
// with the highest registry version.
subnet_nodes
.insert(peer, SocketAddr::new(ip_addr, flow_endpoint.port as u16));
} else {
warn!(
self.log,
"Failed to get parse Ip addr {} for peer {} at registry version {}",
flow_endpoint.ip_addr,
peer,
version
);
}
}
None => {
warn!(
self.log,
"Failed to get flow endpoint for peer {} at registry version {}",
peer,
version
);
}
})?;
// Insert even if already present because we prefer to have the value
// with the highest registry version.
subnet_nodes.insert(node, SocketAddr::new(ip_addr, flow_endpoint.port as u16));
}
}
}
Ok(SubnetTopology {
SubnetTopology {
subnet_nodes,
earliest_registry_version,
latest_registry_version,
})
}
}
}

Expand Down Expand Up @@ -209,34 +208,3 @@ impl SubnetTopology {
self.subnet_nodes.keys().copied().collect()
}
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum SubnetTopologyError {
RegistryError {
operation: String,
source: RegistryClientError,
},
RegistryFieldEmpty {
field: String,
},
ParseError {
field: String,
reason: String,
},
}

impl std::fmt::Display for SubnetTopologyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::RegistryError { operation, source } => {
write!(f, "failed fetch {operation} from registry: {source}",)
}
Self::RegistryFieldEmpty { field } => {
write!(f, "registry field {field} was unexpectetly empty")
}
Self::ParseError { field, reason } => {
write!(f, "Failed to parse registry field {field}: {reason}")
}
}
}
}

0 comments on commit e0e5249

Please sign in to comment.