Skip to content

Commit

Permalink
chore(backend): Separate and simplify the function for displaying pro…
Browse files Browse the repository at this point in the history
…posed subnet changes
  • Loading branch information
sasa-tomic committed Jul 26, 2023
1 parent 4c6834d commit 73109b3
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 104 deletions.
2 changes: 1 addition & 1 deletion rs/decentralization/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ic_base_types::PrincipalId;
use ic_management_types::NodeFeature;
use serde::{self, Deserialize, Serialize};

#[derive(Clone, Deserialize, Serialize, Default)]
#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub struct SubnetChangeResponse {
pub added: Vec<PrincipalId>,
pub removed: Vec<PrincipalId>,
Expand Down
2 changes: 1 addition & 1 deletion rs/decentralization/src/nakamoto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ mod tests {
let available_nodes = available_nodes
.iter()
.sorted_by(|a, b| a.principal.cmp(&b.principal))
.filter(|n| n.subnet.is_none() && n.proposal.is_none())
.filter(|n| n.subnet_id.is_none() && n.proposal.is_none())
.map(Node::from)
.map(|n| Node {
decentralized: true,
Expand Down
39 changes: 25 additions & 14 deletions rs/decentralization/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,14 @@ struct ReplacementCandidate {
impl DecentralizedSubnet {
/// Return a new instance of a DecentralizedSubnet that does not contain the
/// provided nodes.
pub fn without_nodes(&self, nodes: &[PrincipalId]) -> Result<Self, NetworkError> {
pub fn without_nodes(&self, nodes: Vec<Node>) -> Result<Self, NetworkError> {
let mut new_subnet_nodes = self.nodes.clone();
let mut removed = Vec::new();
for node in nodes {
if let Some(index) = new_subnet_nodes.iter().position(|n| n.id == *node) {
for node in &nodes {
if let Some(index) = new_subnet_nodes.iter().position(|n| n.id == node.id) {
removed.push(new_subnet_nodes.remove(index));
} else {
return Err(NetworkError::NodeNotFound(*node));
return Err(NetworkError::NodeNotFound(node.id));
}
}
let removed_is_empty = removed.is_empty();
Expand Down Expand Up @@ -740,7 +740,7 @@ pub trait AvailableNodesQuerier {
#[async_trait]
pub trait SubnetQuerier {
async fn subnet(&self, id: &PrincipalId) -> Result<DecentralizedSubnet, NetworkError>;
async fn subnet_of_nodes(&self, nodes: &[PrincipalId]) -> Result<DecentralizedSubnet, NetworkError>;
async fn subnet_of_nodes(&self, nodes: Vec<Node>) -> Result<DecentralizedSubnet, NetworkError>;
}

#[derive(Clone, Serialize, Deserialize, Debug, strum_macros::Display)]
Expand Down Expand Up @@ -773,10 +773,10 @@ pub trait TopologyManager: SubnetQuerier + AvailableNodesQuerier {
})
}

async fn replace_subnet_nodes(&self, nodes: &[PrincipalId]) -> Result<SubnetChangeRequest, NetworkError> {
async fn without_nodes(&self, nodes: Vec<Node>) -> Result<SubnetChangeRequest, NetworkError> {
Ok(SubnetChangeRequest {
available_nodes: self.available_nodes().await?,
subnet: self.subnet_of_nodes(nodes).await?,
subnet: self.subnet_of_nodes(nodes.clone()).await?,
..Default::default()
}
.without_nodes(nodes))
Expand All @@ -797,7 +797,7 @@ pub trait TopologyManager: SubnetQuerier + AvailableNodesQuerier {
}
.with_include_nodes(include_nodes.clone())
.with_exclude_nodes(exclude_nodes.clone())
.with_only_nodes(only_nodes.clone())
.with_only_nodes_that_have_features(only_nodes.clone())
.resize(size, 0)
}
}
Expand Down Expand Up @@ -839,12 +839,12 @@ impl SubnetChangeRequest {
}
}

/// Subnet without the listed nodes. The nodes are note added back into the
/// Subnet without the listed nodes. The nodes are not added back into the
/// available nodes.
pub fn without_nodes(&self, nodes: &[PrincipalId]) -> Self {
pub fn without_nodes(&self, nodes: Vec<Node>) -> Self {
let mut s = self.clone();
for node in nodes {
for node in s.subnet.nodes.clone().iter().filter(|n| n.id == *node) {
for node in s.subnet.nodes.clone().iter().filter(|n| n.id == node.id) {
s.removed_nodes.push(node.clone());
s.subnet.nodes.retain(|n| n.id != node.id);
}
Expand All @@ -863,7 +863,7 @@ impl SubnetChangeRequest {
}
}

pub fn with_only_nodes(self, only_nodes_or_features: Vec<String>) -> Self {
pub fn with_only_nodes_that_have_features(self, only_nodes_or_features: Vec<String>) -> Self {
let available_nodes = if only_nodes_or_features.is_empty() {
self.available_nodes.into_iter().collect()
} else {
Expand Down Expand Up @@ -897,10 +897,10 @@ impl SubnetChangeRequest {
pub fn optimize(
mut self,
optimize_count: usize,
replacements_unhealthy: &Vec<PrincipalId>,
replacements_unhealthy: &Vec<Node>,
) -> Result<SubnetChange, NetworkError> {
let old_nodes = self.subnet.nodes.clone();
self.subnet = self.subnet.without_nodes(replacements_unhealthy)?;
self.subnet = self.subnet.without_nodes(replacements_unhealthy.clone())?;
let result = self.resize(optimize_count + replacements_unhealthy.len(), optimize_count)?;
Ok(SubnetChange { old_nodes, ..result })
}
Expand Down Expand Up @@ -986,6 +986,17 @@ pub struct SubnetChange {
}

impl SubnetChange {
pub fn with_nodes(self, nodes: Vec<Node>) -> Self {
Self {
new_nodes: [self.new_nodes, nodes].concat(),
..self
}
}
pub fn without_nodes(mut self, nodes: Vec<Node>) -> Self {
self.new_nodes.retain(|n| !nodes.contains(n));
self
}

pub fn added(&self) -> Vec<Node> {
self.new_nodes
.clone()
Expand Down
2 changes: 1 addition & 1 deletion rs/ic-management-backend/src/endpoints/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn remove(
})
.filter(|(n, _)| n.proposal.is_none())
.filter_map(|(n, status)| {
if n.subnet.is_some() {
if n.subnet_id.is_some() {
return None;
}

Expand Down
15 changes: 11 additions & 4 deletions rs/ic-management-backend/src/endpoints/query_decentralization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ async fn get_decentralization_analysis(
registry: web::Data<Arc<RwLock<RegistryState>>>,
subnet: Option<PrincipalId>,
nodes_to_add: Option<Vec<PrincipalId>>,
nodes_to_remove: Option<Vec<PrincipalId>>,
node_ids_to_remove: Option<Vec<PrincipalId>>,
min_nakamoto_coefficients: Option<MinNakamotoCoefficients>,
) -> Result<HttpResponse, Error> {
let subnets = registry.read().await.subnets();
let nodes = registry.read().await.nodes();
let registry_nodes = registry.read().await.nodes();

let original_subnet = subnet
.map(|subnet_id| match subnets.get(&subnet_id) {
Expand Down Expand Up @@ -89,16 +89,23 @@ async fn get_decentralization_analysis(
run_log: Vec::new(),
});

let nodes_to_remove = node_ids_to_remove.map(|node_ids_to_remove| {
node_ids_to_remove
.iter()
.filter_map(|n| registry_nodes.get(n))
.map(|n| decentralization::network::Node::from(n))
.collect::<Vec<_>>()
});
let updated_subnet = match &nodes_to_remove {
Some(nodes_to_remove) => original_subnet.without_nodes(nodes_to_remove)?,
Some(nodes_to_remove) => original_subnet.without_nodes(nodes_to_remove.clone())?,
None => original_subnet.clone(),
};

let updated_subnet = match &nodes_to_add {
Some(nodes_to_add) => {
let nodes_to_add = nodes_to_add
.iter()
.map(|n| decentralization::network::Node::from(&nodes[n]))
.map(|n| decentralization::network::Node::from(&registry_nodes[n]))
.collect();
updated_subnet.with_nodes(nodes_to_add)
}
Expand Down
96 changes: 44 additions & 52 deletions rs/ic-management-backend/src/endpoints/subnet.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use super::*;
use crate::health;
use decentralization::{network::TopologyManager, SubnetChangeResponse};
use decentralization::network::TopologyManager;
use ic_base_types::PrincipalId;
use ic_management_backend::subnets::get_proposed_subnet_changes;
use ic_management_types::requests::{
MembershipReplaceRequest, ReplaceTarget, SubnetCreateRequest, SubnetResizeRequest,
};
use ic_management_types::Node;
use serde::Deserialize;
use std::collections::BTreeMap;

#[derive(Deserialize)]
struct SubnetRequest {
Expand Down Expand Up @@ -40,44 +43,16 @@ async fn change_preview(
request: web::Path<SubnetRequest>,
registry: web::Data<Arc<RwLock<RegistryState>>>,
) -> Result<HttpResponse, Error> {
let nodes = registry.read().await.nodes();
match registry.read().await.subnets_with_proposals().await {
Ok(subnets) => {
if let Some(subnet) = subnets.get(&request.subnet) {
if let Some(proposal) = &subnet.proposal {
let removed_nodes = subnet
.nodes
.iter()
.filter(|n| proposal.nodes.contains(&n.principal))
.map(|n| n.principal)
.collect::<Vec<_>>();
let change_request = registry
.read()
.await
.replace_subnet_nodes(&removed_nodes)
.await?
.with_custom_available_nodes(
nodes
.values()
.filter(|n| n.subnet.is_none() && proposal.nodes.contains(&n.principal))
.map(decentralization::network::Node::from)
.collect(),
);
let mut change = SubnetChangeResponse::from(&change_request.evaluate()?);
change.proposal_id = Some(proposal.id);
Ok(HttpResponse::Ok().json(change))
} else {
Err(error::ErrorBadRequest(anyhow::format_err!(
"subnet {} does not have open membership change proposals",
request.subnet
)))
}
} else {
Err(error::ErrorNotFound(anyhow::format_err!(
"subnet {} not found",
request.subnet
)))
}
let subnet = subnets
.get(&request.subnet)
.ok_or_else(|| error::ErrorNotFound(anyhow::format_err!("subnet {} not found", request.subnet)))?;
let registry_nodes: BTreeMap<PrincipalId, Node> = registry.read().await.nodes();

get_proposed_subnet_changes(&registry_nodes, subnet)
.map_err(|err| error::ErrorBadRequest(err))
.map(|r| HttpResponse::Ok().json(r))
}
Err(e) => Err(error::ErrorInternalServerError(format!(
"failed to fetch subnets: {}",
Expand All @@ -99,47 +74,64 @@ async fn replace(
registry: web::Data<Arc<RwLock<RegistryState>>>,
) -> Result<HttpResponse, Error> {
let registry = registry.read().await;
let all_nodes = registry.nodes();

let mut motivations: Vec<String> = vec![];

let change_request = match &request.target {
ReplaceTarget::Subnet(subnet) => registry.modify_subnet_nodes(*subnet).await?,
ReplaceTarget::Nodes { nodes, motivation } => {
motivations.push(motivation.clone());
registry.replace_subnet_nodes(nodes).await?
let nodes_to_replace = nodes
.iter()
.filter_map(|n| all_nodes.get(n))
.map(|n| decentralization::network::Node::from(n))
.collect::<Vec<_>>();
registry.without_nodes(nodes_to_replace).await?
}
}
.with_exclude_nodes(request.exclude.clone().unwrap_or_default())
.with_only_nodes(request.only.clone())
.with_only_nodes_that_have_features(request.only.clone())
.with_include_nodes(request.include.clone().unwrap_or_default())
.with_min_nakamoto_coefficients(request.min_nakamoto_coefficients.clone());

let mut replacements_unhealthy: Vec<PrincipalId> = Vec::new();
let mut replacements_unhealthy: Vec<decentralization::network::Node> = Vec::new();
if request.heal {
let subnet = change_request.subnet();
let health_client = health::HealthClient::new(registry.network());
let healths = health_client
.subnet(subnet.id)
.await
.map_err(|_| error::ErrorInternalServerError("failed to fetch subnet health".to_string()))?;
let unhealthy = &subnet
let unhealthy: Vec<decentralization::network::Node> = subnet
.nodes
.iter()
.filter(|n| {
healths
.get(&n.id)
// TODO: Add option to exclude degraded nodes from healing
.map(|s| !matches!(s, ic_management_types::Status::Healthy))
.unwrap_or(true)
.into_iter()
.filter_map(|n| match healths.get(&n.id) {
Some(health) => {
if *health == ic_management_types::Status::Healthy {
Some(n)
} else {
None
}
}
None => Some(n),
})
.map(|n| n.id)
.collect::<Vec<_>>();
if !unhealthy.is_empty() {
replacements_unhealthy.extend(unhealthy);
}
}
if let ReplaceTarget::Nodes { nodes, motivation: _ } = &request.target {
replacements_unhealthy.extend(nodes);
if let ReplaceTarget::Nodes {
nodes: req_replace_node_ids,
motivation: _,
} = &request.target
{
let req_replace_nodes = req_replace_node_ids
.iter()
.filter_map(|n| all_nodes.get(n))
.map(|n| decentralization::network::Node::from(n))
.collect::<Vec<_>>();
replacements_unhealthy.extend(req_replace_nodes);
};

let num_unhealthy = replacements_unhealthy.len();
Expand Down Expand Up @@ -200,7 +192,7 @@ async fn resize(
.await?
.with_exclude_nodes(request.exclude.clone().unwrap_or_default())
.with_include_nodes(request.include.clone().unwrap_or_default())
.with_only_nodes(request.only.clone().unwrap_or_default())
.with_only_nodes_that_have_features(request.only.clone().unwrap_or_default())
.resize(request.add, request.remove)?;

Ok(HttpResponse::Ok().json(decentralization::SubnetChangeResponse::from(&change)))
Expand Down
1 change: 1 addition & 0 deletions rs/ic-management-backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ pub mod prometheus;
pub mod proposal;
pub mod registry;
pub mod release;
pub mod subnets;
4 changes: 2 additions & 2 deletions rs/ic-management-backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ async fn main() -> std::io::Result<()> {
.service(rollout)
.service(subnets_release)
.service(version)
.service(subnets)
.service(list_subnets)
.service(nodes)
.service(available_nodes)
.service(missing_guests)
Expand Down Expand Up @@ -233,7 +233,7 @@ async fn version(registry: web::Data<Arc<RwLock<registry::RegistryState>>>) -> i
}

#[get("/subnets")]
async fn subnets(registry: web::Data<Arc<RwLock<registry::RegistryState>>>) -> impl Responder {
async fn list_subnets(registry: web::Data<Arc<RwLock<registry::RegistryState>>>) -> impl Responder {
let registry = registry.read().await;
response_from_result(registry.subnets_with_proposals().await)
}
Expand Down
Loading

0 comments on commit 73109b3

Please sign in to comment.