diff --git a/Cargo.lock b/Cargo.lock
index 7ef6919fbd..bb99e7adc1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -243,6 +243,7 @@ dependencies = [
  "fluence-libp2p",
  "fs-utils",
  "futures",
+ "health",
  "humantime 2.1.0",
  "key-manager",
  "libp2p",
@@ -2142,6 +2143,13 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "health"
+version = "0.1.0"
+dependencies = [
+ "eyre",
+]
+
 [[package]]
 name = "heck"
 version = "0.3.3"
@@ -4061,6 +4069,7 @@ dependencies = [
  "fs-utils",
  "fstrings",
  "futures",
+ "health",
  "humantime-serde",
  "hyper",
  "itertools 0.11.0",
@@ -4369,6 +4378,7 @@ dependencies = [
  "fluence-app-service",
  "fluence-keypair",
  "futures",
+ "health",
  "humantime-serde",
  "itertools 0.11.0",
  "ivalue-utils",
@@ -4481,6 +4491,7 @@ dependencies = [
  "fluence-app-service",
  "fluence-libp2p",
  "fs-utils",
+ "health",
  "humantime-serde",
  "json-utils",
  "libp2p-identity",
diff --git a/Cargo.toml b/Cargo.toml
index f86cf2aeec..0533c71278 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,6 +28,7 @@ members = [
     "crates/peer-metrics",
     "crates/spell-event-bus",
     "crates/key-manager",
+    "crates/health",
     "sorcerer",
     "crates/nox-tests",
     "nox",
@@ -86,6 +87,7 @@ script-storage = { path = "script-storage" }
 spell-storage = { path = "spell-storage" }
 particle-execution = { path = "particle-execution" }
 system-services = { path = "crates/system-services" }
+health = { path = "crates/health" }
 
 # spell
 fluence-spell-dtos = "=0.5.16"
diff --git a/aquamarine/Cargo.toml b/aquamarine/Cargo.toml
index 694533a9e3..340d0125d3 100644
--- a/aquamarine/Cargo.toml
+++ b/aquamarine/Cargo.toml
@@ -39,3 +39,4 @@ anyhow = "1.0.72"
 eyre = { workspace = true }
 bytesize = "1.2.0"
 async-trait = "0.1.71"
+health = { workspace = true }
diff --git a/aquamarine/src/aquamarine.rs b/aquamarine/src/aquamarine.rs
index 0e42720cf3..6b6b962caf 100644
--- a/aquamarine/src/aquamarine.rs
+++ b/aquamarine/src/aquamarine.rs
@@ -23,6 +23,7 @@ use tokio::sync::mpsc;
 use tokio::task::JoinHandle;
 
 use fluence_libp2p::PeerId;
+use health::HealthCheckRegistry;
 use key_manager::KeyManager;
 use particle_execution::{ParticleFunctionStatic, ServiceFunction};
 use particle_protocol::Particle;
@@ -53,12 +54,18 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
         out: EffectsChannel,
         plumber_metrics: Option<ParticleExecutorMetrics>,
         vm_pool_metrics: Option<VmPoolMetrics>,
+        health_registry: Option<&mut HealthCheckRegistry>,
         key_manager: KeyManager,
     ) -> (Self, AquamarineApi) {
         // TODO: make `100` configurable
         let (outlet, inlet) = mpsc::channel(100);
         let sender = AquamarineApi::new(outlet, config.execution_timeout);
-        let vm_pool = VmPool::new(config.pool_size, runtime_config, vm_pool_metrics);
+        let vm_pool = VmPool::new(
+            config.pool_size,
+            runtime_config,
+            vm_pool_metrics,
+            health_registry,
+        );
         let host_peer_id = key_manager.get_host_peer_id();
         let plumber = Plumber::new(vm_pool, builtins, plumber_metrics, key_manager);
         let this = Self {
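The wiring pattern above repeats throughout the diff: the registry is threaded into constructors as `Option<&mut HealthCheckRegistry>`, so when health checks are disabled nothing is registered and no state is kept. A minimal sketch of the idiom, with a hypothetical `MyCheck` standing in for the concrete checks added later in this diff:

```rust
use health::{HealthCheck, HealthCheckRegistry};

#[derive(Clone)]
struct MyCheck; // hypothetical; see VMPoolHealthCheck below for a real one

impl HealthCheck for MyCheck {
    fn check(&self) -> eyre::Result<()> {
        Ok(())
    }
}

fn new_subsystem(health_registry: Option<&mut HealthCheckRegistry>) -> Option<MyCheck> {
    // Register only when the registry exists; keep a clone so the
    // subsystem can update the check's shared state later.
    health_registry.map(|registry| {
        let check = MyCheck;
        registry.register("my_subsystem", check.clone());
        check
    })
}
```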
diff --git a/aquamarine/src/vm_pool.rs b/aquamarine/src/vm_pool.rs
index c85779d6f0..6959675b52 100644
--- a/aquamarine/src/vm_pool.rs
+++ b/aquamarine/src/vm_pool.rs
@@ -14,9 +14,12 @@
  * limitations under the License.
  */
 
+use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use futures::{future::BoxFuture, FutureExt};
+use health::{HealthCheck, HealthCheckRegistry};
+use parking_lot::Mutex;
 
 use peer_metrics::VmPoolMetrics;
 
@@ -39,6 +42,7 @@ pub struct VmPool<RT: AquaRuntime> {
     runtime_config: RT::Config,
     pool_size: usize,
     metrics: Option<VmPoolMetrics>,
+    health: Option<VMPoolHealthCheck>,
 }
 
 impl<RT: AquaRuntime> VmPool<RT> {
@@ -47,13 +51,21 @@
         pool_size: usize,
         runtime_config: RT::Config,
         metrics: Option<VmPoolMetrics>,
+        health_registry: Option<&mut HealthCheckRegistry>,
     ) -> Self {
+        let health = health_registry.map(|registry| {
+            let vm_pool = VMPoolHealthCheck::new(pool_size);
+            registry.register("vm_pool", vm_pool.clone());
+            vm_pool
+        });
+
         let mut this = Self {
             runtimes: Vec::with_capacity(pool_size),
             creating_runtimes: None,
             runtime_config,
             pool_size,
             metrics,
+            health,
         };
 
         this.runtimes.resize_with(pool_size, || None);
@@ -161,7 +173,10 @@
 
             // Put created vm to self.vms
             match vm {
-                Ok(vm) => vms[id] = Some(vm),
+                Ok(vm) => {
+                    vms[id] = Some(vm);
+                    self.health.as_ref().map(|h| h.increment_count());
+                }
                 Err(err) => log::error!("Failed to create vm: {:?}", err), // TODO: don't panic
             }
 
@@ -175,3 +190,39 @@
         }
     }
 }
+
+#[derive(Clone)]
+pub struct VMPoolHealthCheck {
+    expected_count: usize,
+    current_count: Arc<Mutex<usize>>,
+}
+
+impl VMPoolHealthCheck {
+    pub fn new(expected_count: usize) -> Self {
+        Self {
+            expected_count,
+            current_count: Arc::new(Mutex::new(0)),
+        }
+    }
+
+    pub fn increment_count(&self) {
+        let mut guard = self.current_count.lock();
+        *guard += 1;
+    }
+}
+
+impl HealthCheck for VMPoolHealthCheck {
+    fn check(&self) -> eyre::Result<()> {
+        let guard = self.current_count.lock();
+        let current = *guard;
+        if self.expected_count != current {
+            return Err(eyre::eyre!(
+                "VM pool isn't full. Current: {}, Expected: {}",
+                current,
+                self.expected_count
+            ));
+        }
+
+        Ok(())
+    }
+}
diff --git a/crates/health/Cargo.toml b/crates/health/Cargo.toml
new file mode 100644
index 0000000000..abfa08ffe5
--- /dev/null
+++ b/crates/health/Cargo.toml
@@ -0,0 +1,10 @@
+[package]
+name = "health"
+version = "0.1.0"
+authors = ["Fluence Labs"]
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+eyre = { workspace = true }
diff --git a/crates/health/src/lib.rs b/crates/health/src/lib.rs
new file mode 100644
index 0000000000..dd21a77a7c
--- /dev/null
+++ b/crates/health/src/lib.rs
@@ -0,0 +1,51 @@
+pub trait HealthCheck: Send + Sync + 'static {
+    fn check(&self) -> eyre::Result<()>;
+}
+
+pub struct HealthCheckRegistry {
+    checks: Vec<(String, Box<dyn HealthCheck>)>,
+}
+
+pub enum HealthCheckResult {
+    Ok(Vec<String>),
+    Warning(Vec<String>, Vec<String>),
+    Fail(Vec<String>),
+}
+
+impl HealthCheckRegistry {
+    pub fn new() -> Self {
+        HealthCheckRegistry { checks: Vec::new() }
+    }
+
+    pub fn register(&mut self, name: &str, check: impl HealthCheck) {
+        self.checks.push((name.to_string(), Box::new(check)));
+    }
+
+    pub fn check(&self) -> HealthCheckResult {
+        let mut fails = Vec::new();
+        let mut oks = Vec::new();
+
+        for (name, check) in &self.checks {
+            match check.check() {
+                Ok(_) => oks.push(name.clone()),
+                Err(_) => {
+                    fails.push(name.clone());
+                }
+            }
+        }
+
+        if fails.is_empty() {
+            HealthCheckResult::Ok(oks)
+        } else if fails.len() == self.checks.len() {
+            HealthCheckResult::Fail(fails)
+        } else {
+            HealthCheckResult::Warning(oks, fails)
+        }
+    }
+}
+
+impl Default for HealthCheckRegistry {
+    fn default() -> Self {
+        HealthCheckRegistry::new()
+    }
+}
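The registry aggregates individual check results into three overall states: `Ok` when every check passes, `Fail` when every check fails, and `Warning` for a mix. A hedged sketch of a consumer (function name illustrative):

```rust
use health::{HealthCheckRegistry, HealthCheckResult};

fn status_line(registry: &HealthCheckRegistry) -> &'static str {
    match registry.check() {
        HealthCheckResult::Ok(_) => "healthy",
        // Some checks pass and some fail: degraded but still serving.
        HealthCheckResult::Warning(_, _) => "degraded",
        HealthCheckResult::Fail(_) => "unhealthy",
    }
}
```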
diff --git a/crates/server-config/src/defaults.rs b/crates/server-config/src/defaults.rs
index 7c4c17ec8e..3235157d34 100644
--- a/crates/server-config/src/defaults.rs
+++ b/crates/server-config/src/defaults.rs
@@ -71,6 +71,10 @@ pub fn default_metrics_enabled() -> bool {
     true
 }
 
+pub fn default_health_check_enabled() -> bool {
+    true
+}
+
 pub fn default_services_metrics_timer_resolution() -> Duration {
     Duration::from_secs(60)
 }
diff --git a/crates/server-config/src/network_config.rs b/crates/server-config/src/network_config.rs
index 1da3d49baf..e7ba4edda2 100644
--- a/crates/server-config/src/network_config.rs
+++ b/crates/server-config/src/network_config.rs
@@ -38,7 +38,6 @@ pub struct NetworkConfig {
     pub bootstrap_frequency: usize,
     pub connectivity_metrics: Option<ConnectivityMetrics>,
     pub connection_pool_metrics: Option<ConnectionPoolMetrics>,
-    #[allow(deprecated)]
     pub connection_limits: ConnectionLimits,
 }
diff --git a/crates/server-config/src/node_config.rs b/crates/server-config/src/node_config.rs
index f4971be1b7..925146d72b 100644
--- a/crates/server-config/src/node_config.rs
+++ b/crates/server-config/src/node_config.rs
@@ -71,6 +71,9 @@ pub struct UnresolvedNodeConfig {
     #[serde(flatten)]
     pub metrics_config: MetricsConfig,
 
+    #[serde(flatten)]
+    pub health_config: HealthConfig,
+
     #[serde(flatten)]
     pub http_config: Option<HttpConfig>,
 
@@ -178,6 +181,7 @@ impl UnresolvedNodeConfig {
             external_address: self.external_address,
             external_multiaddresses: self.external_multiaddresses,
             metrics_config: self.metrics_config,
+            health_config: self.health_config,
             bootstrap_config: self.bootstrap_config,
             root_weights: self.root_weights,
             services_envs: self.services_envs,
@@ -304,6 +308,8 @@ pub struct NodeConfig {
 
     pub metrics_config: MetricsConfig,
 
+    pub health_config: HealthConfig,
+
     pub bootstrap_config: BootstrapConfig,
 
     pub root_weights: HashMap<PeerIdSerializable, u32>,
@@ -399,6 +405,13 @@ pub struct MetricsConfig {
     pub max_builtin_metrics_storage_size: usize,
 }
 
+#[derive(Clone, Deserialize, Serialize, Derivative)]
+#[derivative(Debug)]
+pub struct HealthConfig {
+    #[serde(default = "default_health_check_enabled")]
+    pub health_check_enabled: bool,
+}
+
 #[derive(Clone, Deserialize, Serialize, Derivative)]
 #[derivative(Debug)]
 pub struct ListenConfig {
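Because `HealthConfig` is `#[serde(flatten)]`-ed into the node config and its only field carries a serde default, the checks are on unless explicitly disabled. In the node's TOML config the knob would look roughly like this (key name taken from the struct above; surrounding file layout assumed):

```toml
# Health checks (and the /health endpoint's registry) are enabled by default.
health_check_enabled = false
```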
diff --git a/nox/Cargo.toml b/nox/Cargo.toml
index 6e507e81ac..b2ee6f1ac8 100644
--- a/nox/Cargo.toml
+++ b/nox/Cargo.toml
@@ -16,6 +16,7 @@ connection-pool = { workspace = true }
 script-storage = { workspace = true }
 aquamarine = { workspace = true }
 sorcerer = { workspace = true }
+health = { workspace = true }
 dhat = { version = "0.3.2", optional = true }
 serde_json = { workspace = true }
diff --git a/nox/src/behaviour/network.rs b/nox/src/behaviour/network.rs
index d1c4e4bb9a..d89e8af28a 100644
--- a/nox/src/behaviour/network.rs
+++ b/nox/src/behaviour/network.rs
@@ -23,11 +23,12 @@ use libp2p::{
 use tokio::sync::mpsc;
 
 use connection_pool::ConnectionPoolBehaviour;
+use health::HealthCheckRegistry;
 use kademlia::{Kademlia, KademliaConfig};
 use particle_protocol::{Particle, PROTOCOL_NAME};
 use server_config::NetworkConfig;
 
-use crate::connectivity::Connectivity;
+use crate::connectivity::{BootstrapNodesHealthCheck, Connectivity, ConnectivityHealthChecks};
 
 /// Coordinates protocols, so they can cooperate
 #[derive(NetworkBehaviour)]
@@ -40,7 +41,10 @@ pub struct FluenceNetworkBehaviour {
 }
 
 impl FluenceNetworkBehaviour {
-    pub fn new(cfg: NetworkConfig) -> (Self, Connectivity, mpsc::Receiver<Particle>) {
+    pub fn new(
+        cfg: NetworkConfig,
+        health_registry: Option<&mut HealthCheckRegistry>,
+    ) -> (Self, Connectivity, mpsc::Receiver<Particle>) {
         let local_public_key = cfg.key_pair.public();
         let identify = Identify::new(
             IdentifyConfig::new(PROTOCOL_NAME.into(), local_public_key)
@@ -71,6 +75,15 @@ impl FluenceNetworkBehaviour {
             ping,
         };
 
+        let bootstrap_nodes = cfg.bootstrap_nodes.clone();
+
+        let health = health_registry.map(|registry| {
+            let bootstrap_nodes = BootstrapNodesHealthCheck::new(bootstrap_nodes);
+            registry.register("bootstrap_nodes", bootstrap_nodes.clone());
+
+            ConnectivityHealthChecks { bootstrap_nodes }
+        });
+
         let connectivity = Connectivity {
             peer_id: cfg.local_peer_id,
             kademlia: kademlia_api,
@@ -78,6 +91,7 @@ impl FluenceNetworkBehaviour {
             bootstrap_nodes: cfg.bootstrap_nodes.into_iter().collect(),
             bootstrap_frequency: cfg.bootstrap_frequency,
             metrics: cfg.connectivity_metrics,
+            health,
         };
 
         (this, connectivity, particle_stream)
diff --git a/nox/src/connectivity.rs b/nox/src/connectivity.rs
index 5bfbb140f1..cca3ae7f64 100644
--- a/nox/src/connectivity.rs
+++ b/nox/src/connectivity.rs
@@ -15,19 +15,21 @@
  */
 
 use std::cmp::min;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
 use std::time::Duration;
 
-use futures::{stream::iter, StreamExt};
-use humantime_serde::re::humantime::format_duration as pretty;
-use libp2p::Multiaddr;
-use tokio::time::sleep;
-
 use connection_pool::{ConnectionPoolApi, ConnectionPoolT, LifecycleEvent};
 use fluence_libp2p::PeerId;
+use futures::{stream::iter, StreamExt};
+use health::HealthCheck;
+use humantime_serde::re::humantime::format_duration as pretty;
 use kademlia::{KademliaApi, KademliaApiT, KademliaError};
+use libp2p::Multiaddr;
+use parking_lot::Mutex;
 use particle_protocol::{Contact, Particle, SendStatus};
 use peer_metrics::{ConnectivityMetrics, Resolution};
+use tokio::time::sleep;
 
 use crate::tasks::Tasks;
 
@@ -44,6 +46,7 @@
     /// This setting specifies that N.
     pub bootstrap_frequency: usize,
     pub metrics: Option<ConnectivityMetrics>,
+    pub health: Option<ConnectivityHealthChecks>,
 }
 
 impl Connectivity {
@@ -207,6 +210,7 @@
         let kademlia = self.kademlia;
         let bootstrap_nodes = self.bootstrap_nodes;
         let metrics = self.metrics.as_ref();
+        let health = self.health.as_ref();
 
         let disconnections = {
             use tokio_stream::StreamExt as stream;
@@ -215,11 +219,17 @@
             let events = pool.lifecycle_events();
             stream::filter_map(events, move |e| {
                 if let LifecycleEvent::Disconnected(Contact { addresses, .. }) = e {
-                    metrics.map(|m| m.bootstrap_disconnected.inc());
                     let addresses = addresses.into_iter();
                     let addresses = addresses.filter(|addr| bootstrap_nodes.contains(addr));
-                    let addresses = iter(addresses.collect::<Vec<_>>());
-                    return Some(addresses);
+                    let addresses = addresses.collect::<Vec<_>>();
+                    if !addresses.is_empty() {
+                        metrics.map(|m| m.bootstrap_disconnected.inc());
+                        health.map(|h| {
+                            h.bootstrap_nodes
+                                .on_bootstrap_disconnected(addresses.clone())
+                        });
+                    };
+                    return Some(iter(addresses));
                 }
                 None
             })
@@ -240,6 +250,7 @@
                 let ok = kademlia.add_contact(contact);
                 debug_assert!(ok, "kademlia.add_contact");
                 metrics.map(|m| m.bootstrap_connected.inc());
+                health.map(|h| h.bootstrap_nodes.on_bootstrap_connected(addr));
 
                 break;
             }
@@ -268,3 +279,49 @@ impl AsRef<ConnectionPoolApi> for Connectivity {
         &self.connection_pool
     }
 }
+
+#[derive(Clone)]
+pub struct ConnectivityHealthChecks {
+    pub bootstrap_nodes: BootstrapNodesHealthCheck,
+}
+
+#[derive(Clone)]
+pub struct BootstrapNodesHealthCheck {
+    bootstrap_nodes_statuses: Arc<Mutex<HashMap<Multiaddr, bool>>>,
+}
+
+impl BootstrapNodesHealthCheck {
+    pub fn new(bootstrap_nodes: Vec<Multiaddr>) -> Self {
+        let bootstrap_nodes_statuses = bootstrap_nodes
+            .into_iter()
+            .map(|addr| (addr, false))
+            .collect::<HashMap<_, _>>();
+        Self {
+            bootstrap_nodes_statuses: Arc::new(Mutex::new(bootstrap_nodes_statuses)),
+        }
+    }
+
+    fn on_bootstrap_disconnected(&self, addresses: Vec<Multiaddr>) {
+        let mut guard = self.bootstrap_nodes_statuses.lock();
+        for addr in addresses {
+            guard.insert(addr, false);
+        }
+    }
+
+    fn on_bootstrap_connected(&self, addr: Multiaddr) {
+        let mut guard = self.bootstrap_nodes_statuses.lock();
+        guard.insert(addr, true);
+    }
+}
+
+impl HealthCheck for BootstrapNodesHealthCheck {
+    fn check(&self) -> eyre::Result<()> {
+        let guard = self.bootstrap_nodes_statuses.lock();
+        for (addr, connected) in guard.iter() {
+            if !connected {
+                return Err(eyre::eyre!("Bootstrap {} is not connected", addr));
+            }
+        }
+        Ok(())
+    }
+}
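Note the asymmetry in `BootstrapNodesHealthCheck`: every bootstrap address starts out marked disconnected, so the check fails until the first successful bootstrap connection, and a single disconnected bootstrap is enough to fail it again. A unit-test-style sketch of those transitions, written as if it lived inside nox/src/connectivity.rs since the transition methods are private (this test is not part of the diff):

```rust
#[cfg(test)]
mod bootstrap_health_sketch {
    use super::*;

    #[test]
    fn bootstrap_check_transitions() {
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/7777".parse().unwrap();
        let check = BootstrapNodesHealthCheck::new(vec![addr.clone()]);

        // All bootstrap addresses start out disconnected: the check fails.
        assert!(check.check().is_err());

        // A successful bootstrap connection marks the address healthy.
        check.on_bootstrap_connected(addr.clone());
        assert!(check.check().is_ok());

        // Any bootstrap dropping makes the check fail again.
        check.on_bootstrap_disconnected(vec![addr]);
        assert!(check.check().is_err());
    }
}
```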
diff --git a/nox/src/http.rs b/nox/src/http.rs
index 1ac541d814..1f71a0a615 100644
--- a/nox/src/http.rs
+++ b/nox/src/http.rs
@@ -1,6 +1,7 @@
 use crate::Versions;
 use axum::body::Body;
 use axum::http::header::CONTENT_TYPE;
+use axum::response::ErrorResponse;
 use axum::{
     extract::State,
     http::StatusCode,
@@ -8,10 +9,11 @@ use axum::{
     routing::get,
     Json, Router,
 };
+use health::{HealthCheckRegistry, HealthCheckResult};
 use libp2p::PeerId;
 use prometheus_client::encoding::text::encode;
 use prometheus_client::registry::Registry;
-use serde_json::json;
+use serde_json::{json, Value};
 use std::net::SocketAddr;
 use std::sync::Arc;
 use tokio::sync::Notify;
@@ -20,14 +22,17 @@ async fn handler_404() -> impl IntoResponse {
     (StatusCode::NOT_FOUND, "nothing to see here")
 }
 
-async fn handle_metrics(State(state): State<RouteState>) -> Response {
+async fn handle_metrics(State(state): State<RouteState>) -> axum::response::Result<Response<Body>> {
     let mut buf = String::new();
     let registry = state
         .0
-        .registry
+        .metric_registry
         .as_ref()
-        .expect("Registry is not initialized");
-    encode(&mut buf, registry).unwrap(); //TODO: fix unwrap
+        .ok_or((StatusCode::NOT_FOUND, "nothing to see here"))?;
+    encode(&mut buf, registry).map_err(|e| {
+        tracing::warn!("Could not encode metrics: {}", e);
+        ErrorResponse::from(StatusCode::INTERNAL_SERVER_ERROR)
+    })?;
 
     let body = Body::from(buf);
     Response::builder()
@@ -36,7 +41,10 @@
             "application/openmetrics-text; version=1.0.0; charset=utf-8",
         )
         .body(body)
-        .unwrap() //TODO: fix unwrap
+        .map_err(|e| {
+            tracing::warn!("Could not create response: {}", e);
+            ErrorResponse::from(StatusCode::INTERNAL_SERVER_ERROR)
+        })
 }
 
 async fn handle_peer_id(State(state): State<RouteState>) -> Response {
@@ -61,25 +69,56 @@ async fn handle_versions(State(state): State<RouteState>) -> Response {
     .into_response()
 }
 
-#[derive(Debug, Clone)]
+async fn handle_health(State(state): State<RouteState>) -> axum::response::Result<Response> {
+    fn make_json(keys: Vec<String>, status: &str) -> Vec<Value> {
+        keys.into_iter().map(|k| json!({k: status})).collect()
+    }
+
+    let registry = state
+        .0
+        .health_registry
+        .as_ref()
+        .ok_or((StatusCode::NOT_FOUND, "nothing to see here"))?;
+    let result = match registry.check() {
+        HealthCheckResult::Ok(keys) => {
+            (StatusCode::OK, Json(make_json(keys, "Ok"))).into_response()
+        }
+        HealthCheckResult::Warning(ok, fail) => {
+            let mut result = make_json(ok, "Ok");
+            let mut fail = make_json(fail, "Fail");
+            result.append(&mut fail);
+            (StatusCode::TOO_MANY_REQUESTS, Json(result)).into_response()
+        }
+        HealthCheckResult::Fail(keys) => (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(make_json(keys, "Fail")),
+        )
+            .into_response(),
+    };
+    Ok(result)
+}
+
+#[derive(Clone)]
 struct RouteState(Arc<Inner>);
 
-#[derive(Debug)]
 struct Inner {
-    registry: Option<Registry>,
+    metric_registry: Option<Registry>,
+    health_registry: Option<HealthCheckRegistry>,
     peer_id: PeerId,
     versions: Versions,
 }
 
 pub async fn start_http_endpoint(
     listen_addr: SocketAddr,
-    registry: Option<Registry>,
+    metric_registry: Option<Registry>,
+    health_registry: Option<HealthCheckRegistry>,
     peer_id: PeerId,
     versions: Versions,
     notify: Arc<Notify>,
 ) {
     let state = RouteState(Arc::new(Inner {
-        registry,
+        metric_registry,
+        health_registry,
         peer_id,
         versions,
     }));
@@ -87,6 +126,7 @@ pub async fn start_http_endpoint(
         .route("/metrics", get(handle_metrics))
         .route("/peer_id", get(handle_peer_id))
         .route("/versions", get(handle_versions))
+        .route("/health", get(handle_health))
         .fallback(handler_404)
         .with_state(state);
@@ -134,7 +174,15 @@ mod tests {
         let notify = Arc::new(Notify::new());
         let cloned_notify = notify.clone();
         tokio::spawn(async move {
-            start_http_endpoint(addr, None, PeerId::random(), test_versions(), cloned_notify).await;
+            start_http_endpoint(
+                addr,
+                None,
+                None,
+                PeerId::random(),
+                test_versions(),
+                cloned_notify,
+            )
+            .await;
         });
 
         notify.notified().await;
@@ -166,7 +214,7 @@ mod tests {
         let notify = Arc::new(Notify::new());
         let cloned_notify = notify.clone();
         tokio::spawn(async move {
-            start_http_endpoint(addr, None, peer_id, test_versions(), cloned_notify).await;
+            start_http_endpoint(addr, None, None, peer_id, test_versions(), cloned_notify).await;
         });
 
         notify.notified().await;
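The `/health` route maps the registry's tri-state result onto HTTP status codes: 200 when all checks pass, 429 (`TOO_MANY_REQUESTS`) when only some fail, 500 when all fail, and 404 when the node runs with health checks disabled. Assuming the three checks this diff registers, a healthy node would answer roughly like this (listen address and JSON ordering illustrative):

```
$ curl -i http://127.0.0.1:18080/health
HTTP/1.1 200 OK
content-type: application/json

[{"bootstrap_nodes":"Ok"},{"persisted_services":"Ok"},{"vm_pool":"Ok"}]
```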
diff --git a/nox/src/node.rs b/nox/src/node.rs
index c8e05385c5..64473a2ad4 100644
--- a/nox/src/node.rs
+++ b/nox/src/node.rs
@@ -25,6 +25,7 @@ use config_utils::to_peer_id;
 use connection_pool::{ConnectionPoolApi, ConnectionPoolT};
 use fluence_libp2p::build_transport;
 use futures::{stream::StreamExt, FutureExt};
+use health::HealthCheckRegistry;
 use key_manager::KeyManager;
 use libp2p::swarm::SwarmEvent;
 use libp2p::{
@@ -32,7 +33,6 @@ use libp2p::{
     identity::Keypair,
     PeerId, Swarm, TransportError,
 };
-#[allow(deprecated)]
 use libp2p_connection_limits::ConnectionLimits;
 use libp2p_metrics::{Metrics, Recorder};
 use libp2p_swarm::SwarmBuilder;
@@ -80,7 +80,8 @@ pub struct Node<RT: AquaRuntime> {
     spell_events_receiver: mpsc::UnboundedReceiver<TriggerEvent>,
     sorcerer: Sorcerer,
 
-    registry: Option<Registry>,
+    metrics_registry: Option<Registry>,
+    health_registry: Option<HealthCheckRegistry>,
     libp2p_metrics: Option<Arc<Metrics>>,
     services_metrics_backend: ServicesMetricsBackend,
 
@@ -133,6 +134,13 @@
         } else {
             None
         };
+
+        let mut health_registry = if config.health_config.health_check_enabled {
+            Some(HealthCheckRegistry::default())
+        } else {
+            None
+        };
+
         let libp2p_metrics = metrics_registry.as_mut().map(|r| Arc::new(Metrics::new(r)));
         let connectivity_metrics = metrics_registry.as_mut().map(ConnectivityMetrics::new);
         let connection_pool_metrics = metrics_registry.as_mut().map(ConnectionPoolMetrics::new);
@@ -172,6 +180,7 @@
             network_config,
             transport,
             config.external_addresses(),
+            health_registry.as_mut(),
         );
 
         let (particle_failures_out, particle_failures_in) = mpsc::unbounded_channel();
@@ -213,6 +222,7 @@
             script_storage_api,
             services_metrics,
             key_manager.clone(),
+            health_registry.as_mut(),
         ));
 
         let (effects_out, effects_in) = mpsc::unbounded_channel();
@@ -226,6 +236,7 @@
             effects_out,
             plumber_metrics,
             vm_pool_metrics,
+            health_registry.as_mut(),
             key_manager.clone(),
         );
         let effectors = Effectors::new(connectivity.clone());
@@ -328,6 +339,7 @@
             spell_events_receiver,
             sorcerer,
             metrics_registry,
+            health_registry,
             libp2p_metrics,
             services_metrics_backend,
             config.http_listen_addr(),
@@ -343,13 +355,14 @@
         network_config: NetworkConfig,
         transport: Boxed<(PeerId, StreamMuxerBox)>,
         external_addresses: Vec<Multiaddr>,
+        health_registry: Option<&mut HealthCheckRegistry>,
     ) -> (
         Swarm<FluenceNetworkBehaviour>,
         Connectivity,
         mpsc::Receiver<Particle>,
     ) {
         let (behaviour, connectivity, particle_stream) =
-            FluenceNetworkBehaviour::new(network_config);
+            FluenceNetworkBehaviour::new(network_config, health_registry);
         let mut swarm =
             SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();
@@ -367,6 +380,7 @@
         script_storage_api: ScriptStorageApi,
         services_metrics: ServicesMetrics,
         key_manager: KeyManager,
+        health_registry: Option<&mut HealthCheckRegistry>,
     ) -> Builtins<Connectivity> {
         Builtins::new(
             connectivity,
@@ -374,6 +388,7 @@
             services_config,
             services_metrics,
             key_manager,
+            health_registry,
         )
     }
 }
@@ -394,7 +409,8 @@
         spell_event_bus: SpellEventBus,
         spell_events_receiver: mpsc::UnboundedReceiver<TriggerEvent>,
         sorcerer: Sorcerer,
-        registry: Option<Registry>,
+        metrics_registry: Option<Registry>,
+        health_registry: Option<HealthCheckRegistry>,
         libp2p_metrics: Option<Arc<Metrics>>,
         services_metrics_backend: ServicesMetricsBackend,
         http_listen_addr: Option<SocketAddr>,
@@ -419,7 +435,8 @@
             spell_events_receiver,
             sorcerer,
 
-            registry,
+            metrics_registry,
+            health_registry,
             libp2p_metrics,
             services_metrics_backend,
             http_listen_addr,
@@ -447,7 +464,8 @@
         let spell_event_bus = self.spell_event_bus;
         let spell_events_receiver = self.spell_events_receiver;
         let sorcerer = self.sorcerer;
-        let registry = self.registry;
+        let metrics_registry = self.metrics_registry;
+        let health_registry = self.health_registry;
         let services_metrics_backend = self.services_metrics_backend;
         let http_listen_addr = self.http_listen_addr;
         let task_name = format!("node-{peer_id}");
@@ -458,7 +476,7 @@
         task::Builder::new().name(&task_name.clone()).spawn(async move {
             let mut http_server = if let Some(http_listen_addr) = http_listen_addr{
                 log::info!("Starting http endpoint at {}", http_listen_addr);
-                start_http_endpoint(http_listen_addr, registry, peer_id, versions, Arc::new(Notify::new())).boxed()
+                start_http_endpoint(http_listen_addr, metrics_registry, health_registry, peer_id, versions, Arc::new(Notify::new())).boxed()
             } else {
                 futures::future::pending().boxed()
             };
diff --git a/particle-builtins/Cargo.toml b/particle-builtins/Cargo.toml
index 2998d70765..4499ad313c 100644
--- a/particle-builtins/Cargo.toml
+++ b/particle-builtins/Cargo.toml
@@ -47,6 +47,7 @@
 fluence-app-service = { workspace = true }
 async-trait = { version = "0.1.71" }
 eyre = { workspace = true }
 base64 = { workspace = true }
+health = { workspace = true }
 
 [dev-dependencies]
 proptest = "1.2.0"
diff --git a/particle-builtins/src/builtins.rs b/particle-builtins/src/builtins.rs
index 246c055c00..2e537d08ee 100644
--- a/particle-builtins/src/builtins.rs
+++ b/particle-builtins/src/builtins.rs
@@ -34,6 +34,7 @@ use tokio::sync::RwLock;
 use JValue::Array;
 
 use connection_pool::{ConnectionPoolApi, ConnectionPoolT};
+use health::HealthCheckRegistry;
 use kademlia::{KademliaApi, KademliaApiT};
 use key_manager::KeyManager;
 use now_millis::{now_ms, now_sec};
@@ -107,6 +108,7 @@
         config: ServicesConfig,
         services_metrics: ServicesMetrics,
         key_manager: KeyManager,
+        health_registry: Option<&mut HealthCheckRegistry>,
     ) -> Self {
         let modules_dir = &config.modules_dir;
         let blueprint_dir = &config.blueprint_dir;
@@ -123,7 +125,12 @@
         let management_peer_id = config.management_peer_id;
         let builtins_management_peer_id = config.builtins_management_peer_id;
         let local_peer_id = config.local_peer_id;
-        let services = ParticleAppServices::new(config, modules.clone(), Some(services_metrics));
+        let services = ParticleAppServices::new(
+            config,
+            modules.clone(),
+            Some(services_metrics),
+            health_registry,
+        );
 
         Self {
             connectivity,
diff --git a/particle-services/Cargo.toml b/particle-services/Cargo.toml
index 769c7a50d6..0474f43da4 100644
--- a/particle-services/Cargo.toml
+++ b/particle-services/Cargo.toml
@@ -31,6 +31,7 @@ thiserror = { workspace = true }
 derivative = { workspace = true }
 eyre = { workspace = true }
 humantime-serde = { workspace = true }
+health = { workspace = true }
 
 [dev-dependencies]
 tempdir = "0.3.7"
diff --git a/particle-services/src/app_services.rs b/particle-services/src/app_services.rs
index 06e6e41ed6..6b81ebacaf 100644
--- a/particle-services/src/app_services.rs
+++ b/particle-services/src/app_services.rs
@@ -29,6 +29,7 @@ use serde::{Deserialize, Serialize};
 use serde_json::{json, Value as JValue};
 
 use fluence_libp2p::{peerid_serializer, PeerId};
+use health::{HealthCheck, HealthCheckRegistry};
 use now_millis::now_ms;
 use particle_args::{Args, JError};
 use particle_execution::{FunctionOutcome, ParticleParams, ParticleVault};
@@ -174,6 +175,7 @@ pub struct ParticleAppServices {
     management_peer_id: PeerId,
     builtins_management_peer_id: PeerId,
     pub metrics: Option<ServicesMetrics>,
+    health: Option<PersistedServiceHealthCheck>,
 }
 
 /// firstly, try to find by alias in worker scope, secondly, in root scope
@@ -254,11 +256,17 @@
         config: ServicesConfig,
         modules: ModuleRepository,
         metrics: Option<ServicesMetrics>,
+        health_registry: Option<&mut HealthCheckRegistry>,
     ) -> Self {
         let vault = ParticleVault::new(config.particles_vault_dir.clone());
         let management_peer_id = config.management_peer_id;
         let builtins_management_peer_id = config.builtins_management_peer_id;
-        let this = Self {
+        let health = health_registry.map(|registry| {
+            let persisted_services = PersistedServiceHealthCheck::new();
+            registry.register("persisted_services", persisted_services.clone());
+            persisted_services
+        });
+        let mut this = Self {
             config,
             vault,
             services: <_>::default(),
@@ -267,6 +275,7 @@
             management_peer_id,
             builtins_management_peer_id,
             metrics,
+            health,
         };
 
         this.create_persisted_services();
@@ -814,10 +823,15 @@
         Ok(stats)
     }
 
-    fn create_persisted_services(&self) {
+    fn create_persisted_services(&mut self) {
         let services =
-            load_persisted_services(&self.config.services_dir, self.config.local_peer_id)
-                .into_iter();
+            load_persisted_services(&self.config.services_dir, self.config.local_peer_id);
+        let service_count = services.len();
+        if let Some(h) = self.health.as_mut() {
+            h.start_loading()
+        }
+        let services = services.into_iter();
+
         let services = services.filter_map(|r| match r {
             Ok(service) => service.into(),
             Err(err) => {
@@ -826,6 +840,7 @@
             }
         });
 
+        let mut loaded_service_count = 0;
         for s in services {
             let worker_id = s.worker_id.expect("every service must have worker id");
             let start = Instant::now();
@@ -881,7 +896,7 @@
                 replaced.is_none(),
                 "shouldn't replace any existing services"
             );
-
+            loaded_service_count += 1;
             log::info!(
                 "Persisted service {} created in {}, aliases: {:?}",
                 s.service_id,
@@ -889,6 +904,11 @@
                 s.aliases
             );
         }
+        if loaded_service_count != service_count {
+            if let Some(h) = self.health.as_mut() {
+                h.mark_has_errors()
+            }
+        }
     }
 
     fn create_service_inner(
@@ -1384,3 +1404,46 @@
     // - list_services
     // - test on service persisting
 }
+
+#[derive(Debug, Clone)]
+pub struct PersistedServiceHealthCheck {
+    started: Arc<Mutex<bool>>,
+    has_errors: Arc<Mutex<bool>>,
+}
+
+impl PersistedServiceHealthCheck {
+    pub fn new() -> Self {
+        PersistedServiceHealthCheck {
+            started: Arc::new(Mutex::new(false)),
+            has_errors: Arc::new(Mutex::new(false)),
+        }
+    }
+
+    pub fn start_loading(&mut self) {
+        let mut guard = self.started.lock();
+        *guard = true;
+    }
+
+    pub fn mark_has_errors(&mut self) {
+        let mut guard = self.has_errors.lock();
+        *guard = true;
+    }
+}
+
+impl HealthCheck for PersistedServiceHealthCheck {
+    fn check(&self) -> eyre::Result<()> {
+        let started_guard = self.started.lock();
+        let errors_guard = self.has_errors.lock();
+        let started = *started_guard;
+        if started {
+            let has_errors = *errors_guard;
+            if has_errors {
+                Err(eyre::eyre!("Persisted services loading failed"))
+            } else {
+                Ok(())
+            }
+        } else {
+            Err(eyre::eyre!("Not loaded yet"))
+        }
+    }
+}
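`PersistedServiceHealthCheck` is deliberately three-phase: it reports "Not loaded yet" until `create_persisted_services` calls `start_loading`, then stays healthy unless the loaded count fell short of the persisted count, in which case `mark_has_errors` latches a permanent failure. A unit-test-style sketch of those transitions, written as if it lived next to the impl above (not part of the diff):

```rust
#[cfg(test)]
mod persisted_health_sketch {
    use super::*;

    #[test]
    fn persisted_check_transitions() {
        let mut health = PersistedServiceHealthCheck::new();

        // Before loading starts, the node is not ready to serve.
        assert!(health.check().is_err());

        // Loading began and nothing has failed so far: healthy.
        health.start_loading();
        assert!(health.check().is_ok());

        // At least one persisted service failed to load: latched failure.
        health.mark_has_errors();
        assert!(health.check().is_err());
    }
}
```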