Skip to content

Commit

Permalink
feature(ipv4-for-nodes): make orchestrator pick up IPv4 config
Browse files Browse the repository at this point in the history
  • Loading branch information
r-birkner committed Jan 31, 2024
1 parent 97d3f03 commit 86c70df
Show file tree
Hide file tree
Showing 12 changed files with 559 additions and 82 deletions.
5 changes: 5 additions & 0 deletions rs/orchestrator/src/dashboard.rs
Expand Up @@ -22,6 +22,7 @@ pub(crate) struct OrchestratorDashboard {
node_id: NodeId,
last_applied_ssh_parameters: Arc<RwLock<SshAccessParameters>>,
last_applied_firewall_version: Arc<RwLock<RegistryVersion>>,
last_applied_ipv4_config_version: Arc<RwLock<RegistryVersion>>,
replica_process: Arc<Mutex<ProcessManager<ReplicaProcess>>>,
subnet_id: Arc<RwLock<Option<SubnetId>>>,
replica_version: ReplicaVersion,
Expand All @@ -48,6 +49,7 @@ impl Dashboard for OrchestratorDashboard {
scheduled upgrade: {}\n\
{}\n\
firewall config registry version: {}\n\
ipv4 config registry version: {}\n\
{}\n\
readonly keys: {}\n\
backup keys: {}\n\
Expand All @@ -65,6 +67,7 @@ impl Dashboard for OrchestratorDashboard {
self.get_scheduled_upgrade().await,
self.get_local_cup_info(),
*self.last_applied_firewall_version.read().await,
*self.last_applied_ipv4_config_version.read().await,
self.display_last_applied_ssh_parameters().await,
self.get_authorized_keys("readonly"),
self.get_authorized_keys("backup"),
Expand All @@ -83,6 +86,7 @@ impl OrchestratorDashboard {
node_id: NodeId,
last_applied_ssh_parameters: Arc<RwLock<SshAccessParameters>>,
last_applied_firewall_version: Arc<RwLock<RegistryVersion>>,
last_applied_ipv4_config_version: Arc<RwLock<RegistryVersion>>,
replica_process: Arc<Mutex<ProcessManager<ReplicaProcess>>>,
subnet_id: Arc<RwLock<Option<SubnetId>>>,
replica_version: ReplicaVersion,
Expand All @@ -95,6 +99,7 @@ impl OrchestratorDashboard {
node_id,
last_applied_ssh_parameters,
last_applied_firewall_version,
last_applied_ipv4_config_version,
replica_process,
subnet_id,
replica_version,
Expand Down
6 changes: 6 additions & 0 deletions rs/orchestrator/src/error.rs
Expand Up @@ -61,6 +61,9 @@ pub enum OrchestratorError {

/// SNP error while registering a SEV-SNP node
SnpError(String),

/// Network configuration error
NetworkConfigurationError(String),
}

impl OrchestratorError {
Expand Down Expand Up @@ -139,6 +142,9 @@ impl fmt::Display for OrchestratorError {
),
OrchestratorError::UpgradeError(msg) => write!(f, "Failed to upgrade: {}", msg),
OrchestratorError::SnpError(msg) => write!(f, "SEV-SNP Error: {}", msg),
OrchestratorError::NetworkConfigurationError(msg) => {
write!(f, "Failed to apply network configuration: {}", msg)
}
}
}
}
Expand Down
164 changes: 164 additions & 0 deletions rs/orchestrator/src/ipv4_network.rs
@@ -0,0 +1,164 @@
use crate::{
error::{OrchestratorError, OrchestratorResult},
metrics::OrchestratorMetrics,
registry_helper::RegistryHelper,
};

use ic_logger::{debug, info, warn, ReplicaLogger};
use ic_protobuf::registry::node::v1::IPv4InterfaceConfig;
use ic_types::RegistryVersion;
use std::{path::PathBuf, sync::Arc};
use tokio::process::Command;
use tokio::sync::RwLock;

/// Checks the registry for a change in this node's IPv4 config and, if there
/// is one, updates the node's network configuration accordingly.
///
/// The configurator remembers the configuration it applied last so that the
/// network configuration is only regenerated when the registry entry differs.
pub(crate) struct Ipv4Configurator {
// Source of the latest registry version and of the node's IPv4 config.
registry: Arc<RegistryHelper>,
// Holds the `ipv4_registry_version` gauge that is set after a config has
// been applied successfully.
metrics: Arc<OrchestratorMetrics>,
logger: ReplicaLogger,
// Registry version of the last check that completed without an error;
// handed out to other components via `get_last_applied_version`.
last_applied_version: Arc<RwLock<RegistryVersion>>,
// Directory containing the `guestos_tool` binary.
ic_binary_dir: PathBuf,
// The IPv4 config that was applied last; `None` until a config has been
// applied, or when the last applied state was "no IPv4 configuration".
configuration: Option<IPv4InterfaceConfig>,
}

impl Ipv4Configurator {
    /// Creates a configurator that has not applied any IPv4 configuration yet.
    ///
    /// `ic_binary_dir` is the directory in which the `guestos_tool` binary is
    /// looked up when a configuration has to be applied.
    pub(crate) fn new(
        registry: Arc<RegistryHelper>,
        metrics: Arc<OrchestratorMetrics>,
        ic_binary_dir: PathBuf,
        logger: ReplicaLogger,
    ) -> Self {
        Self {
            registry,
            metrics,
            logger,
            last_applied_version: Default::default(),
            ic_binary_dir,
            configuration: None,
        }
    }

    /// Logs `config` at info level, prefixed with `label` (e.g. "Current").
    fn log_config(&self, label: &str, config: Option<&IPv4InterfaceConfig>) {
        match config {
            Some(config) => info!(
                self.logger,
                "{} IPv4 config: address={}/{}, gateway={}.",
                label,
                config.ip_addr,
                config.prefix_length,
                config
                    .gateway_ip_addr
                    .first()
                    .map(String::as_str)
                    .unwrap_or("n/a")
            ),
            None => info!(self.logger, "{} IPv4 config: no configuration.", label),
        }
    }

    /// Applies the given IPv4 configuration to the node by invoking
    /// `guestos_tool regenerate-network-config` via `sudo`.
    ///
    /// Passing `None` regenerates the network configuration without any IPv4
    /// arguments. On success the given configuration is remembered as the
    /// currently applied one.
    ///
    /// # Errors
    ///
    /// Returns [`OrchestratorError::NetworkConfigurationError`] if the
    /// configuration does not contain a gateway, if the command cannot be
    /// spawned, or if it exits with a non-zero status. In all of these cases
    /// the remembered configuration is left untouched.
    async fn apply_ipv4_config_change(
        &mut self,
        ipv4_config: Option<IPv4InterfaceConfig>,
    ) -> OrchestratorResult<()> {
        info!(
            self.logger,
            "Attempting to apply the new IPv4 configuration."
        );
        self.log_config("Current", self.configuration.as_ref());
        self.log_config("New", ipv4_config.as_ref());

        let script = self.ic_binary_dir.join("guestos_tool");
        let mut cmd = Command::new("sudo");

        cmd.arg(script.into_os_string())
            .arg("regenerate-network-config");

        if let Some(config) = &ipv4_config {
            // The config comes from the registry: a missing gateway must be
            // reported as an error instead of panicking, because a panic
            // would take down the whole monitoring task.
            let gateway = config.gateway_ip_addr.first().ok_or_else(|| {
                OrchestratorError::NetworkConfigurationError(
                    "the IPv4 config does not specify a gateway".to_string(),
                )
            })?;

            cmd.arg(format!("--ipv4-address={}", config.ip_addr))
                .arg(format!("--ipv4-prefix-length={}", config.prefix_length))
                .arg(format!("--ipv4-gateway={}", gateway));
        }

        let out = cmd
            .output()
            .await
            .map_err(|e| OrchestratorError::NetworkConfigurationError(e.to_string()))?;

        if !out.status.success() {
            return Err(OrchestratorError::NetworkConfigurationError(format!(
                "guestos_tool failed to regenerate the network config: {:?} - stdout: {} - stderr: {}",
                out.status,
                String::from_utf8_lossy(&out.stdout).trim(),
                String::from_utf8_lossy(&out.stderr).trim()
            )));
        }

        info!(
            self.logger,
            "successfully applied the new network config - stdout: {} - stderr: {}",
            String::from_utf8_lossy(&out.stdout).trim(),
            String::from_utf8_lossy(&out.stderr).trim()
        );

        // after successfully applying the configuration, update the state
        self.configuration = ipv4_config;

        Ok(())
    }

    /// Checks for a change in the IPv4 configuration, and if found, updates the
    /// local network configuration.
    ///
    /// Failures to read the registry or to apply the configuration are logged
    /// as warnings; in that case the last applied version is not advanced.
    pub async fn check_and_update(&mut self) {
        let registry_version = self.registry.get_latest_version();
        debug!(
            self.logger,
            "Checking IPv4 config at registry version: {}", registry_version
        );

        // fetch the IPv4 config from the registry
        let ipv4_config = match self.registry.get_node_ipv4_config(registry_version) {
            Ok(config) => config,
            Err(e) => {
                warn!(
                    self.logger,
                    "Failed to fetch the IPv4 config from the registry at version {}: {}",
                    registry_version,
                    e
                );
                return;
            }
        };

        // check if the configuration changed and if so, apply the changes
        if self.configuration != ipv4_config {
            if let Err(e) = self.apply_ipv4_config_change(ipv4_config).await {
                warn!(
                    self.logger,
                    "Failed to apply the IPv4 config at version {}: {}", registry_version, e
                );
                return;
            }
            self.metrics
                .ipv4_registry_version
                .set(registry_version.get() as i64);
        }

        // keep track of the last successfully applied registry version (even if
        // there was no change in the IPv4 config)
        *self.last_applied_version.write().await = registry_version;
    }

    /// Returns a shared handle to the registry version recorded by the last
    /// check that completed without an error.
    pub fn get_last_applied_version(&self) -> Arc<RwLock<RegistryVersion>> {
        Arc::clone(&self.last_applied_version)
    }
}
1 change: 1 addition & 0 deletions rs/orchestrator/src/lib.rs
Expand Up @@ -39,6 +39,7 @@ mod dashboard;
pub mod error;
mod firewall;
mod hostos_upgrade;
mod ipv4_network;
mod metrics;
pub mod orchestrator;
mod process_manager;
Expand Down
5 changes: 5 additions & 0 deletions rs/orchestrator/src/metrics.rs
Expand Up @@ -8,6 +8,7 @@ pub const PROMETHEUS_HTTP_PORT: u16 = 9091;
pub struct OrchestratorMetrics {
pub ssh_access_registry_version: IntGauge,
pub firewall_registry_version: IntGauge,
pub ipv4_registry_version: IntGauge,
pub reboot_duration: IntGauge,
pub orchestrator_info: IntGaugeVec,
pub key_rotation_status: IntGaugeVec,
Expand Down Expand Up @@ -49,6 +50,10 @@ impl OrchestratorMetrics {
"firewall_registry_version",
"Latest registry version used for firewall configuration",
),
ipv4_registry_version: metrics_registry.int_gauge(
"ipv4_registry_version",
"Latest registry version used for the IPv4 configuration",
),
reboot_duration: metrics_registry.int_gauge(
"reboot_duration_seconds",
"The time it took for the node to reboot",
Expand Down
38 changes: 30 additions & 8 deletions rs/orchestrator/src/orchestrator.rs
Expand Up @@ -4,6 +4,7 @@ use crate::catch_up_package_provider::CatchUpPackageProvider;
use crate::dashboard::{Dashboard, OrchestratorDashboard};
use crate::firewall::Firewall;
use crate::hostos_upgrade::HostosUpgrader;
use crate::ipv4_network::Ipv4Configurator;
use crate::metrics::OrchestratorMetrics;
use crate::process_manager::ProcessManager;
use crate::registration::NodeRegistration;
Expand Down Expand Up @@ -51,6 +52,7 @@ pub struct Orchestrator {
subnet_id: Arc<RwLock<Option<SubnetId>>>,
// Handles of async tasks used to wait for their completion
task_handles: Vec<JoinHandle<()>>,
ipv4_configurator: Option<Ipv4Configurator>,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -257,7 +259,7 @@ impl Orchestrator {
Arc::clone(&metrics),
replica_version.clone(),
node_id,
ic_binary_directory,
ic_binary_directory.clone(),
logger.clone(),
);

Expand All @@ -270,6 +272,13 @@ impl Orchestrator {
logger.clone(),
);

let ipv4_configurator = Ipv4Configurator::new(
Arc::clone(&registry),
Arc::clone(&metrics),
ic_binary_directory,
logger.clone(),
);

let ssh_access_manager =
SshAccessManager::new(Arc::clone(&registry), Arc::clone(&metrics), logger.clone());

Expand All @@ -280,6 +289,7 @@ impl Orchestrator {
node_id,
ssh_access_manager.get_last_applied_parameters(),
firewall.get_last_applied_version(),
ipv4_configurator.get_last_applied_version(),
replica_process,
Arc::clone(&subnet_id),
replica_version,
Expand All @@ -305,6 +315,7 @@ impl Orchestrator {
exit_signal,
subnet_id,
task_handles: Default::default(),
ipv4_configurator: Some(ipv4_configurator),
})
}

Expand Down Expand Up @@ -416,10 +427,11 @@ impl Orchestrator {
info!(log, "Shut down the tECDSA key rotation loop");
}

async fn ssh_key_and_firewall_rules_checks(
async fn ssh_key_and_firewall_rules_and_ipv4_config_checks(
maybe_subnet_id: Arc<RwLock<Option<SubnetId>>>,
mut ssh_access_manager: SshAccessManager,
mut firewall: Firewall,
mut ipv4_configurator: Ipv4Configurator,
mut exit_signal: Receiver<bool>,
log: ReplicaLogger,
) {
Expand All @@ -430,12 +442,17 @@ impl Orchestrator {
.await;
// Check and update the firewall rules
firewall.check_and_update().await;
// Check and update the network configuration
ipv4_configurator.check_and_update().await;
tokio::select! {
_ = tokio::time::sleep(CHECK_INTERVAL_SECS) => {}
_ = exit_signal.changed() => {}
}
}
info!(log, "Shut down the ssh keys & firewall monitoring loop");
info!(
log,
"Shut down the ssh keys, firewall, and IPv4 config monitoring loop"
);
}

async fn serve_dashboard(
Expand Down Expand Up @@ -475,20 +492,25 @@ impl Orchestrator {
)));
}

if let (Some(ssh), Some(firewall)) = (self.ssh_access_manager.take(), self.firewall.take())
{
if let (Some(ssh), Some(firewall), Some(ipv4_configurator)) = (
self.ssh_access_manager.take(),
self.firewall.take(),
self.ipv4_configurator.take(),
) {
info!(
self.logger,
"Spawning the ssh-key and firewall rules check loop"
);
self.task_handles
.push(tokio::spawn(ssh_key_and_firewall_rules_checks(
self.task_handles.push(tokio::spawn(
ssh_key_and_firewall_rules_and_ipv4_config_checks(
Arc::clone(&self.subnet_id),
ssh,
firewall,
ipv4_configurator,
self.exit_signal.clone(),
self.logger.clone(),
)));
),
));
}
if let Some(dashboard) = self.orchestrator_dashboard.take() {
info!(self.logger, "Spawning the orchestrator dashboard");
Expand Down
16 changes: 16 additions & 0 deletions rs/orchestrator/src/registry_helper.rs
Expand Up @@ -5,6 +5,7 @@ use ic_logger::ReplicaLogger;
use ic_protobuf::registry::api_boundary_node::v1::ApiBoundaryNodeRecord;
use ic_protobuf::registry::firewall::v1::FirewallRuleSet;
use ic_protobuf::registry::hostos_version::v1::HostosVersionRecord;
use ic_protobuf::registry::node::v1::IPv4InterfaceConfig;
use ic_protobuf::registry::replica_version::v1::ReplicaVersionRecord;
use ic_protobuf::registry::subnet::v1::SubnetRecord;
use ic_registry_client_helpers::api_boundary_node::ApiBoundaryNodeRegistry;
Expand Down Expand Up @@ -291,4 +292,19 @@ impl RegistryHelper {
})
.transpose()
}

/// Returns the public IPv4 config stored in this node's record at the given
/// registry version.
///
/// Yields `Ok(None)` when there is no node record at that version or when the
/// record carries no IPv4 config. A failing registry lookup is reported as
/// `OrchestratorError::RegistryClientError`.
pub(crate) fn get_node_ipv4_config(
    &self,
    version: RegistryVersion,
) -> OrchestratorResult<Option<IPv4InterfaceConfig>> {
    match self.registry_client.get_node_record(self.node_id, version) {
        Ok(maybe_record) => Ok(maybe_record.and_then(|record| record.public_ipv4_config)),
        Err(err) => Err(OrchestratorError::RegistryClientError(err)),
    }
}
}

0 comments on commit 86c70df

Please sign in to comment.