Skip to content

Commit

Permalink
feature(CON-1150): ic-boundary controlled by orchestrator
Browse files Browse the repository at this point in the history
  • Loading branch information
Sawchord committed Dec 19, 2023
1 parent 7373766 commit 58613a1
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 5 deletions.
1 change: 1 addition & 0 deletions ic-os/guestos/defs.bzl
Expand Up @@ -38,6 +38,7 @@ def image_deps(mode, malicious = False):
"//publish/binaries:ic-regedit": "/opt/ic/bin/ic-regedit:0755",
"//publish/binaries:ic-recovery": "/opt/ic/bin/ic-recovery:0755",
"//publish/binaries:orchestrator": "/opt/ic/bin/orchestrator:0755",
"//publish/binaries:ic-boundary": "/opt/ic/bin/ic-boundary:0755",
("//publish/malicious:replica" if malicious else "//publish/binaries:replica"): "/opt/ic/bin/replica:0755", # Install the malicious replica if set
"//publish/binaries:sandbox_launcher": "/opt/ic/bin/sandbox_launcher:0755",
"//publish/binaries:state-tool": "/opt/ic/bin/state-tool:0755",
Expand Down
152 changes: 152 additions & 0 deletions rs/orchestrator/src/boundary_node.rs
@@ -0,0 +1,152 @@
use crate::{
error::{OrchestratorError, OrchestratorResult},
metrics::OrchestratorMetrics,
process_manager::{Process, ProcessManager},
registry_helper::RegistryHelper,
};
use ic_logger::{info, warn, ReplicaLogger};
use ic_types::{NodeId, ReplicaVersion};
use std::{
path::PathBuf,
sync::{Arc, Mutex},
};

struct BoundaryNodeProcess {
version: ReplicaVersion,
binary: String,
args: Vec<String>,
}

impl Process for BoundaryNodeProcess {
const NAME: &'static str = "Boundary Node";

type Version = ReplicaVersion;

fn get_version(&self) -> &Self::Version {
&self.version
}

fn get_binary(&self) -> &str {
&self.binary
}

fn get_args(&self) -> &[String] {
&self.args
}
}

pub(crate) struct BoundaryNodeManager {
registry: Arc<RegistryHelper>,
_metrics: Arc<OrchestratorMetrics>,
process: Arc<Mutex<ProcessManager<BoundaryNodeProcess>>>,
ic_binary_dir: PathBuf,
version: ReplicaVersion,
logger: ReplicaLogger,
node_id: NodeId,
}

impl BoundaryNodeManager {
pub(crate) fn new(
registry: Arc<RegistryHelper>,
metrics: Arc<OrchestratorMetrics>,
version: ReplicaVersion,
node_id: NodeId,
ic_binary_dir: PathBuf,
logger: ReplicaLogger,
) -> Self {
Self {
registry,
_metrics: metrics,
process: Arc::new(Mutex::new(ProcessManager::new(
logger.clone().inner_logger.root,
))),
ic_binary_dir,
version,
logger,
node_id,
}
}

pub(crate) async fn check(&mut self) {
let registry_version = self.registry.get_latest_version();

match self
.registry
.get_api_boundary_node_version(self.node_id, registry_version)
{
Ok(replica_version) => {
// BN manager is waiting for Upgrade to be performed
if replica_version != self.version {
warn!(
every_n_seconds => 60,
self.logger, "Boundary node runs outdated version ({:?}), expecting upgrade to {:?}", self.version, replica_version
);
// NOTE: We could also shutdown the boundary node here. However, it makes sense to continue
// serving requests while the orchestrator is downloading the new image in most cases.
} else if let Err(err) = self.ensure_boundary_node_running(&self.version) {
warn!(self.logger, "Failed to start Boundary Node: {}", err);
}
}
// BN should not be active
Err(OrchestratorError::ApiBoundaryNodeMissingError(_, _)) => {
if let Err(err) = self.ensure_boundary_node_stopped() {
warn!(self.logger, "Failed to stop Boundary Node: {}", err);
}
}
// Failing to read the registry
Err(err) => warn!(
self.logger,
"Failed to fetch Boundary Node version: {}", err
),
}
}

/// Start the current boundary node process
fn ensure_boundary_node_running(&self, version: &ReplicaVersion) -> OrchestratorResult<()> {
let mut process = self.process.lock().unwrap();

if process.is_running() {
return Ok(());
}
info!(self.logger, "Starting new boundary node process");

let binary = self
.ic_binary_dir
.join("ic-boundary")
.as_path()
.display()
.to_string();
// TODO: Should these values be settable via config?
let args = vec![
format!("--local-store-path=/var/lib/ic/data/ic_registry_local_store"),
format!("--http-port=4444"),
format!("--metrics-addr=[::]:9324"),
format!("--disable-registry-replicator"),
];

process
.start(BoundaryNodeProcess {
version: version.clone(),
binary,
args,
})
.map_err(|e| {
OrchestratorError::IoError(
"Error when attempting to start new boundary node".into(),
e,
)
})
}

/// Stop the current boundary node process.
fn ensure_boundary_node_stopped(&self) -> OrchestratorResult<()> {
let mut process = self.process.lock().unwrap();
if process.is_running() {
return process.stop().map_err(|e| {
OrchestratorError::IoError("Error when attempting to stop boundary node".into(), e)
});
}

Ok(())
}
}
9 changes: 9 additions & 0 deletions rs/orchestrator/src/error.rs
Expand Up @@ -20,6 +20,10 @@ pub enum OrchestratorError {
/// version
SubnetMissingError(SubnetId, RegistryVersion),

/// The given node id does not map to an `ApiBoundaryNodeRecord` at the
/// given version
ApiBoundaryNodeMissingError(NodeId, RegistryVersion),

/// An error occurred when querying the Registry that prevents Orchestrator
/// from making progress
RegistryClientError(RegistryClientError),
Expand Down Expand Up @@ -120,6 +124,11 @@ impl fmt::Display for OrchestratorError {
"Subnet ID {:?} does not exist in the Registry at registry version {:?}",
subnet_id, registry_version
),
OrchestratorError::ApiBoundaryNodeMissingError(node_id, registry_version) => write!(
f,
"Api Boundary Node ID {:?} does not exist in the Registry at registry version {:?}",
node_id, registry_version
),
OrchestratorError::ReplicaVersionParseError(e) => {
write!(f, "Failed to parse replica version: {}", e)
}
Expand Down
1 change: 1 addition & 0 deletions rs/orchestrator/src/lib.rs
Expand Up @@ -33,6 +33,7 @@
//! system to read.

pub mod args;
mod boundary_node;
mod catch_up_package_provider;
mod dashboard;
pub mod error;
Expand Down
39 changes: 38 additions & 1 deletion rs/orchestrator/src/orchestrator.rs
@@ -1,4 +1,5 @@
use crate::args::OrchestratorArgs;
use crate::boundary_node::BoundaryNodeManager;
use crate::catch_up_package_provider::CatchUpPackageProvider;
use crate::dashboard::{Dashboard, OrchestratorDashboard};
use crate::firewall::Firewall;
Expand Down Expand Up @@ -38,6 +39,7 @@ pub struct Orchestrator {
_metrics_runtime: MetricsHttpEndpoint,
upgrade: Option<Upgrade>,
hostos_upgrade: Option<HostosUpgrader>,
boundary_node_manager: Option<BoundaryNodeManager>,
firewall: Option<Firewall>,
ssh_access_manager: Option<SshAccessManager>,
orchestrator_dashboard: Option<OrchestratorDashboard>,
Expand Down Expand Up @@ -215,7 +217,7 @@ impl Orchestrator {
replica_version.clone(),
args.replica_config_file.clone(),
node_id,
ic_binary_directory,
ic_binary_directory.clone(),
registry_replicator,
args.replica_binary_dir.clone(),
logger.clone(),
Expand Down Expand Up @@ -248,6 +250,15 @@ impl Orchestrator {
),
};

let boundary_node = BoundaryNodeManager::new(
Arc::clone(&registry),
Arc::clone(&metrics),
replica_version.clone(),
node_id,
ic_binary_directory,
logger.clone(),
);

let firewall = Firewall::new(
node_id,
Arc::clone(&registry),
Expand Down Expand Up @@ -283,6 +294,7 @@ impl Orchestrator {
_metrics_runtime,
upgrade,
hostos_upgrade,
boundary_node_manager: Some(boundary_node),
firewall: Some(firewall),
ssh_access_manager: Some(ssh_access_manager),
orchestrator_dashboard,
Expand Down Expand Up @@ -359,6 +371,22 @@ impl Orchestrator {
info!(log, "Shut down the HostOS upgrade loop");
}

async fn boundary_node_check(
mut boundary_node_manager: BoundaryNodeManager,
mut exit_signal: Receiver<bool>,
log: ReplicaLogger,
) {
while !*exit_signal.borrow() {
boundary_node_manager.check().await;

tokio::select! {
_ = tokio::time::sleep(CHECK_INTERVAL_SECS) => {}
_ = exit_signal.changed() => {}
}
}
info!(log, "Shut down the boundary node management loop");
}

async fn tecdsa_key_rotation_check(
maybe_subnet_id: Arc<RwLock<Option<SubnetId>>>,
registration: NodeRegistration,
Expand Down Expand Up @@ -430,6 +458,15 @@ impl Orchestrator {
)));
}

if let Some(boundary_node) = self.boundary_node_manager.take() {
info!(self.logger, "Spawning boundary node management loop");
self.task_handles.push(tokio::spawn(boundary_node_check(
boundary_node,
self.exit_signal.clone(),
self.logger.clone(),
)));
}

if let (Some(ssh), Some(firewall)) = (self.ssh_access_manager.take(), self.firewall.take())
{
info!(
Expand Down
29 changes: 29 additions & 0 deletions rs/orchestrator/src/registry_helper.rs
Expand Up @@ -2,10 +2,12 @@ use crate::error::{OrchestratorError, OrchestratorResult};
use ic_consensus::dkg::make_registry_cup;
use ic_interfaces_registry::RegistryClient;
use ic_logger::ReplicaLogger;
use ic_protobuf::registry::api_boundary_node::v1::ApiBoundaryNodeRecord;
use ic_protobuf::registry::firewall::v1::FirewallRuleSet;
use ic_protobuf::registry::hostos_version::v1::HostosVersionRecord;
use ic_protobuf::registry::replica_version::v1::ReplicaVersionRecord;
use ic_protobuf::registry::subnet::v1::SubnetRecord;
use ic_registry_client_helpers::api_boundary_node::ApiBoundaryNodeRegistry;
use ic_registry_client_helpers::firewall::FirewallRegistry;
use ic_registry_client_helpers::hostos_version::HostosRegistry;
use ic_registry_client_helpers::node::NodeRegistry;
Expand Down Expand Up @@ -92,6 +94,23 @@ impl RegistryHelper {
}
}

pub(crate) fn get_api_boundary_node_record(
&self,
node_id: NodeId,
version: RegistryVersion,
) -> OrchestratorResult<ApiBoundaryNodeRecord> {
match self
.registry_client
.get_api_boundary_node_record(node_id, version)
.map_err(|err| OrchestratorError::RegistryClientError(err))?
{
Some(record) => Ok(record),
_ => Err(OrchestratorError::ApiBoundaryNodeMissingError(
node_id, version,
)),
}
}

/// Return the `ReplicaVersionRecord` for the given replica version
pub(crate) fn get_replica_version_record(
&self,
Expand Down Expand Up @@ -219,6 +238,16 @@ impl RegistryHelper {
}
}

pub(crate) fn get_api_boundary_node_version(
&self,
node_id: NodeId,
version: RegistryVersion,
) -> OrchestratorResult<ReplicaVersion> {
let api_boundary_node_record = self.get_api_boundary_node_record(node_id, version)?;
ReplicaVersion::try_from(api_boundary_node_record.version.as_ref())
.map_err(OrchestratorError::ReplicaVersionParseError)
}

/// Return the DC ID where the current replica is located.
pub fn dc_id(&self) -> Option<String> {
let registry_version = self.get_latest_version();
Expand Down
11 changes: 10 additions & 1 deletion rs/orchestrator/src/upgrade.rs
Expand Up @@ -326,9 +326,18 @@ impl Upgrade {

async fn check_for_upgrade_as_unassigned(&mut self) -> OrchestratorResult<()> {
let registry_version = self.registry.get_latest_version();

// If the node is a boundary node, we upgrade to that version, otherwise we upgrade to the unassigned version
let replica_version = self
.registry
.get_unassigned_replica_version(registry_version)?;
.get_api_boundary_node_version(self.node_id, registry_version)
.or_else(|err| match err {
OrchestratorError::NodeUnassignedError(_, _) => self
.registry
.get_unassigned_replica_version(registry_version),
err => Err(err),
})?;

if self.replica_version == replica_version {
return Ok(());
}
Expand Down

0 comments on commit 58613a1

Please sign in to comment.