Skip to content

Commit

Permalink
feat(http): add healthcheck endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu committed Jul 26, 2023
1 parent 655d3ec commit 0f11ca4
Show file tree
Hide file tree
Showing 19 changed files with 399 additions and 40 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ members = [
"crates/peer-metrics",
"crates/spell-event-bus",
"crates/key-manager",
"crates/health",
"sorcerer",
"crates/nox-tests",
"nox",
Expand Down Expand Up @@ -86,6 +87,7 @@ script-storage = { path = "script-storage" }
spell-storage = { path = "spell-storage" }
particle-execution = { path = "particle-execution" }
system-services = { path = "crates/system-services" }
health = { path = "crates/health" }

# spell
fluence-spell-dtos = "=0.5.16"
Expand Down
1 change: 1 addition & 0 deletions aquamarine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ anyhow = "1.0.72"
eyre = { workspace = true }
bytesize = "1.2.0"
async-trait = "0.1.71"
health = { workspace = true }
9 changes: 8 additions & 1 deletion aquamarine/src/aquamarine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use fluence_libp2p::PeerId;
use health::HealthCheckRegistry;
use key_manager::KeyManager;
use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::Particle;
Expand Down Expand Up @@ -53,12 +54,18 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
out: EffectsChannel,
plumber_metrics: Option<ParticleExecutorMetrics>,
vm_pool_metrics: Option<VmPoolMetrics>,
health_registry: Option<&mut HealthCheckRegistry>,
key_manager: KeyManager,
) -> (Self, AquamarineApi) {
// TODO: make `100` configurable
let (outlet, inlet) = mpsc::channel(100);
let sender = AquamarineApi::new(outlet, config.execution_timeout);
let vm_pool = VmPool::new(config.pool_size, runtime_config, vm_pool_metrics);
let vm_pool = VmPool::new(
config.pool_size,
runtime_config,
vm_pool_metrics,
health_registry,
);
let host_peer_id = key_manager.get_host_peer_id();
let plumber = Plumber::new(vm_pool, builtins, plumber_metrics, key_manager);
let this = Self {
Expand Down
53 changes: 52 additions & 1 deletion aquamarine/src/vm_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
* limitations under the License.
*/

use std::sync::Arc;
use std::task::{Context, Poll};

use futures::{future::BoxFuture, FutureExt};
use health::{HealthCheck, HealthCheckRegistry};
use parking_lot::Mutex;

use peer_metrics::VmPoolMetrics;

Expand All @@ -39,6 +42,7 @@ pub struct VmPool<RT: AquaRuntime> {
runtime_config: RT::Config,
pool_size: usize,
metrics: Option<VmPoolMetrics>,
health: Option<VMPoolHealCheck>,
}

impl<RT: AquaRuntime> VmPool<RT> {
Expand All @@ -47,13 +51,21 @@ impl<RT: AquaRuntime> VmPool<RT> {
pool_size: usize,
runtime_config: RT::Config,
metrics: Option<VmPoolMetrics>,
health_registry: Option<&mut HealthCheckRegistry>,
) -> Self {
let health = health_registry.map(|registry| {
let vm_pool = VMPoolHealCheck::new(pool_size);
registry.register("vm_pool", vm_pool.clone());
vm_pool
});

let mut this = Self {
runtimes: Vec::with_capacity(pool_size),
creating_runtimes: None,
runtime_config,
pool_size,
metrics,
health,
};

this.runtimes.resize_with(pool_size, || None);
Expand Down Expand Up @@ -161,7 +173,10 @@ impl<RT: AquaRuntime> VmPool<RT> {

// Put created vm to self.vms
match vm {
Ok(vm) => vms[id] = Some(vm),
Ok(vm) => {
vms[id] = Some(vm);
self.health.as_ref().map(|h| h.increment_count());
}
Err(err) => log::error!("Failed to create vm: {:?}", err), // TODO: don't panic
}

Expand All @@ -175,3 +190,39 @@ impl<RT: AquaRuntime> VmPool<RT> {
}
}
}

/// Health check that compares the number of successfully created VMs with the
/// number the pool is expected to hold.
///
/// The type is `Clone`, and because the counter lives behind an `Arc`, every
/// clone observes the same count: `VmPool::new` registers one clone in the
/// health registry and keeps another to update.
// NOTE(review): the name reads like a typo of `VMPoolHealthCheck`; renaming
// would be an interface change, so it is left as is.
#[derive(Clone)]
pub struct VMPoolHealCheck {
/// Number of VMs the pool should contain (set from `pool_size` by `VmPool::new`).
expected_count: usize,
/// Number of VMs created so far; shared between all clones of this check.
current_count: Arc<Mutex<usize>>,
}

impl VMPoolHealCheck {
    /// Builds a check that expects `expected_count` VMs and starts with a
    /// created-VM counter of zero.
    pub fn new(expected_count: usize) -> Self {
        let current_count = Arc::new(Mutex::new(0));
        Self {
            expected_count,
            current_count,
        }
    }

    /// Records one more created VM.
    ///
    /// The counter is shared through an `Arc`, so the increment is visible to
    /// every clone of this check.
    pub fn increment_count(&self) {
        // The guard is a temporary, so the lock is released at the end of the statement.
        *self.current_count.lock() += 1;
    }
}

impl HealthCheck for VMPoolHealCheck {
    /// Succeeds only when the number of created VMs equals the expected pool
    /// size; otherwise returns an error reporting both numbers.
    fn check(&self) -> eyre::Result<()> {
        // Copy the counter out so the lock is held only for the read.
        let current = *self.current_count.lock();

        if current == self.expected_count {
            Ok(())
        } else {
            Err(eyre::eyre!(
                "VM pool isn't full. Current: {}, Expected: {}",
                current,
                self.expected_count
            ))
        }
    }
}
10 changes: 10 additions & 0 deletions crates/health/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "health"
version = "0.1.0"
authors = ["Fluence Labs"]
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
eyre = { workspace = true }
51 changes: 51 additions & 0 deletions crates/health/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/// A single named-by-the-registry health probe.
///
/// Implementors must be `Send + Sync + 'static` so they can be stored as boxed
/// trait objects inside a `HealthCheckRegistry`.
pub trait HealthCheck: Send + Sync + 'static {
/// Returns `Ok(())` when the component is healthy and an error otherwise.
///
/// `HealthCheckRegistry::check` only looks at success vs. failure; the error
/// value itself is discarded there.
fn check(&self) -> eyre::Result<()>;
}

/// Collection of health checks, each stored together with the name it was
/// registered under.
pub struct HealthCheckRegistry {
/// `(name, check)` pairs in registration order.
checks: Vec<(String, Box<dyn HealthCheck>)>,
}

/// Aggregated outcome of running every check of a `HealthCheckRegistry`.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so results can be logged,
/// copied and compared (e.g. in tests) by callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthCheckResult {
    /// No check failed. Carries the names of the checks that passed
    /// (empty when no checks are registered).
    Ok(Vec<String>),
    /// Some, but not all, checks failed. Carries the names of the passed
    /// checks first and the names of the failed checks second.
    Warning(Vec<String>, Vec<String>),
    /// Every registered check failed. Carries the names of the failed checks.
    Fail(Vec<String>),
}

impl HealthCheckRegistry {
    /// Creates a registry with no checks.
    pub fn new() -> Self {
        Self { checks: Vec::new() }
    }

    /// Appends `check` to the registry under `name`.
    ///
    /// Names are not deduplicated; checks are kept in registration order.
    pub fn register(&mut self, name: &str, check: impl HealthCheck) {
        let boxed: Box<dyn HealthCheck> = Box::new(check);
        self.checks.push((name.to_string(), boxed));
    }

    /// Runs every registered check once, in registration order, and
    /// summarises the outcome:
    ///
    /// * no failures (including an empty registry) -> `HealthCheckResult::Ok`
    /// * every check failed -> `HealthCheckResult::Fail`
    /// * anything in between -> `HealthCheckResult::Warning`
    ///
    /// Only the names of the checks are reported; error values are dropped.
    pub fn check(&self) -> HealthCheckResult {
        let mut passed = Vec::new();
        let mut failed = Vec::new();

        for (name, probe) in self.checks.iter() {
            let bucket = if probe.check().is_ok() {
                &mut passed
            } else {
                &mut failed
            };
            bucket.push(name.clone());
        }

        let all_failed = failed.len() == self.checks.len();
        match (failed.is_empty(), all_failed) {
            (true, _) => HealthCheckResult::Ok(passed),
            (false, true) => HealthCheckResult::Fail(failed),
            (false, false) => HealthCheckResult::Warning(passed, failed),
        }
    }
}

impl Default for HealthCheckRegistry {
fn default() -> Self {
HealthCheckRegistry::new()
}
}
4 changes: 4 additions & 0 deletions crates/server-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ pub fn default_metrics_enabled() -> bool {
true
}

/// Default for the `health_check_enabled` config field (referenced by name in
/// its `#[serde(default = ...)]` attribute): health checks are on.
pub fn default_health_check_enabled() -> bool {
true
}

/// Default services-metrics timer resolution: 60 seconds.
pub fn default_services_metrics_timer_resolution() -> Duration {
Duration::from_secs(60)
}
Expand Down
1 change: 0 additions & 1 deletion crates/server-config/src/network_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ pub struct NetworkConfig {
pub bootstrap_frequency: usize,
pub connectivity_metrics: Option<ConnectivityMetrics>,
pub connection_pool_metrics: Option<ConnectionPoolMetrics>,
#[allow(deprecated)]
pub connection_limits: ConnectionLimits,
}

Expand Down
13 changes: 13 additions & 0 deletions crates/server-config/src/node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ pub struct UnresolvedNodeConfig {
#[serde(flatten)]
pub metrics_config: MetricsConfig,

#[serde(flatten)]
pub health_config: HealthConfig,

#[serde(flatten)]
pub http_config: Option<HttpConfig>,

Expand Down Expand Up @@ -178,6 +181,7 @@ impl UnresolvedNodeConfig {
external_address: self.external_address,
external_multiaddresses: self.external_multiaddresses,
metrics_config: self.metrics_config,
health_config: self.health_config,
bootstrap_config: self.bootstrap_config,
root_weights: self.root_weights,
services_envs: self.services_envs,
Expand Down Expand Up @@ -304,6 +308,8 @@ pub struct NodeConfig {

pub metrics_config: MetricsConfig,

pub health_config: HealthConfig,

pub bootstrap_config: BootstrapConfig,

pub root_weights: HashMap<PeerIdSerializable, u32>,
Expand Down Expand Up @@ -399,6 +405,13 @@ pub struct MetricsConfig {
pub max_builtin_metrics_storage_size: usize,
}

/// Health-check settings of the node configuration.
///
/// Embedded in `UnresolvedNodeConfig` with `#[serde(flatten)]`, so its fields
/// are read from the same level as the parent's own fields.
#[derive(Clone, Deserialize, Serialize, Derivative)]
#[derivative(Debug)]
pub struct HealthConfig {
/// Falls back to `default_health_check_enabled()` when the key is absent
/// from the deserialized input.
// NOTE(review): presumably toggles creation of the health registry/endpoint;
// the consumer of this flag is not visible here, so confirm at the use site.
#[serde(default = "default_health_check_enabled")]
pub health_check_enabled: bool,
}

#[derive(Clone, Deserialize, Serialize, Derivative)]
#[derivative(Debug)]
pub struct ListenConfig {
Expand Down
1 change: 1 addition & 0 deletions nox/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ connection-pool = { workspace = true }
script-storage = { workspace = true }
aquamarine = { workspace = true }
sorcerer = { workspace = true }
health = { workspace = true }
dhat = { version = "0.3.2", optional = true }

serde_json = { workspace = true }
Expand Down
18 changes: 16 additions & 2 deletions nox/src/behaviour/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ use libp2p::{
use tokio::sync::mpsc;

use connection_pool::ConnectionPoolBehaviour;
use health::HealthCheckRegistry;
use kademlia::{Kademlia, KademliaConfig};
use particle_protocol::{Particle, PROTOCOL_NAME};
use server_config::NetworkConfig;

use crate::connectivity::Connectivity;
use crate::connectivity::{BootstrapNodesHealthCheck, Connectivity, ConnectivityHealthChecks};

/// Coordinates protocols, so they can cooperate
#[derive(NetworkBehaviour)]
Expand All @@ -40,7 +41,10 @@ pub struct FluenceNetworkBehaviour {
}

impl FluenceNetworkBehaviour {
pub fn new(cfg: NetworkConfig) -> (Self, Connectivity, mpsc::Receiver<Particle>) {
pub fn new(
cfg: NetworkConfig,
health_registry: Option<&mut HealthCheckRegistry>,
) -> (Self, Connectivity, mpsc::Receiver<Particle>) {
let local_public_key = cfg.key_pair.public();
let identify = Identify::new(
IdentifyConfig::new(PROTOCOL_NAME.into(), local_public_key)
Expand Down Expand Up @@ -71,13 +75,23 @@ impl FluenceNetworkBehaviour {
ping,
};

let bootstrap_nodes = cfg.bootstrap_nodes.clone();

let health = health_registry.map(|registry| {
let bootstrap_nodes = BootstrapNodesHealthCheck::new(bootstrap_nodes);
registry.register("bootstrap_nodes", bootstrap_nodes.clone());

ConnectivityHealthChecks { bootstrap_nodes }
});

let connectivity = Connectivity {
peer_id: cfg.local_peer_id,
kademlia: kademlia_api,
connection_pool: connection_pool_api,
bootstrap_nodes: cfg.bootstrap_nodes.into_iter().collect(),
bootstrap_frequency: cfg.bootstrap_frequency,
metrics: cfg.connectivity_metrics,
health,
};

(this, connectivity, particle_stream)
Expand Down
Loading

0 comments on commit 0f11ca4

Please sign in to comment.