diff --git a/aquamarine/src/vm_pool.rs b/aquamarine/src/vm_pool.rs index 703ba0aab5..afa49b218d 100644 --- a/aquamarine/src/vm_pool.rs +++ b/aquamarine/src/vm_pool.rs @@ -162,6 +162,7 @@ impl VmPool { pub fn poll(&mut self, cx: &mut Context<'_>) { let creating_vms = match &mut self.creating_runtimes { None => { + tracing::debug!("Starting creation {} AVMs", self.pool_size); self.creating_runtimes = Some( (0..self.pool_size) .map(|id| (id, self.create_avm(cx))) diff --git a/crates/connected-client/src/connected_client.rs b/crates/connected-client/src/connected_client.rs index c61ae9ef72..5287d9f5a0 100644 --- a/crates/connected-client/src/connected_client.rs +++ b/crates/connected-client/src/connected_client.rs @@ -28,7 +28,7 @@ use particle_protocol::Particle; use serde_json::{Value as JValue, Value}; use tempfile::TempDir; use test_constants::{ - IDLE_CONNECTION_TIMEOUT, KAD_TIMEOUT, PARTICLE_TTL, SHORT_TIMEOUT, TIMEOUT, TRANSPORT_TIMEOUT, + IDLE_CONNECTION_TIMEOUT, PARTICLE_TTL, SHORT_TIMEOUT, TIMEOUT, TRANSPORT_TIMEOUT, }; use crate::client::Client; @@ -43,7 +43,6 @@ pub struct ConnectedClient { pub node_address: Multiaddr, pub timeout: Duration, pub short_timeout: Duration, - pub kad_timeout: Duration, pub local_vm: tokio::sync::OnceCell>, pub data_store: Arc, pub particle_ttl: Duration, @@ -59,10 +58,6 @@ impl ConnectedClient { self.short_timeout } - pub fn kad_timeout(&self) -> Duration { - self.kad_timeout - } - pub fn particle_ttl(&self) -> Duration { self.particle_ttl } @@ -201,7 +196,6 @@ impl ConnectedClient { node_address, timeout: TIMEOUT, short_timeout: SHORT_TIMEOUT, - kad_timeout: KAD_TIMEOUT, local_vm, data_store, particle_ttl: particle_ttl.unwrap_or(Duration::from_millis(PARTICLE_TTL as u64)), diff --git a/crates/created-swarm/src/swarm.rs b/crates/created-swarm/src/swarm.rs index 76f86d955d..8b94333910 100644 --- a/crates/created-swarm/src/swarm.rs +++ b/crates/created-swarm/src/swarm.rs @@ -17,11 +17,12 @@ use std::convert::identity; use 
std::net::SocketAddr; use std::sync::Arc; +use std::thread::available_parallelism; use std::{path::PathBuf, time::Duration}; use derivative::Derivative; use fluence_keypair::KeyPair; -use futures::{FutureExt, StreamExt}; +use futures::{stream, FutureExt, StreamExt}; use libp2p::core::multiaddr::Protocol; use libp2p::{core::Multiaddr, PeerId}; use serde::Deserialize; @@ -33,7 +34,7 @@ use base64::{engine::general_purpose::STANDARD as base64, Engine}; use fluence_libp2p::random_multiaddr::{create_memory_maddr, create_tcp_maddr}; use fluence_libp2p::Transport; use fs_utils::to_abs_path; -use futures::future::{join_all, BoxFuture}; +use futures::future::BoxFuture; use futures::stream::iter; use nox::{Connectivity, Node}; use particle_protocol::ProtocolConfig; @@ -48,6 +49,8 @@ use tracing::{Instrument, Span}; const HEALTH_CHECK_POLLING_INTERVAL: Duration = Duration::from_millis(100); +// default bound on the number of computations it can perform simultaneously +const DEFAULT_PARALLELISM: usize = 2; #[allow(clippy::upper_case_acronyms)] type AVM = aquamarine::AVMRunner; @@ -77,11 +80,11 @@ pub async fn make_swarms(n: usize) -> Vec { pub async fn make_swarms_with_cfg(n: usize, mut update_cfg: F) -> Vec where - F: FnMut(SwarmConfig) -> SwarmConfig, + F: (FnMut(SwarmConfig) -> SwarmConfig) + 'static + Send, { make_swarms_with( n, - |bs, maddr| create_swarm(update_cfg(SwarmConfig::new(bs, maddr))).boxed(), + move |bs, maddr| create_swarm(update_cfg(SwarmConfig::new(bs, maddr))).boxed(), create_memory_maddr, identity, true, @@ -96,7 +99,7 @@ pub async fn make_swarms_with_transport_and_mocked_vm( make_swarms_with::( n, |bs, maddr| create_swarm_with_runtime(SwarmConfig::new(bs, maddr), |_| None).boxed(), - || match transport { + move || match transport { Transport::Memory => create_memory_maddr(), Transport::Network => create_tcp_maddr(), }, @@ -113,12 +116,12 @@ pub async fn make_swarms_with_mocked_vm( bootstraps: B, ) -> Vec where - F: FnMut(SwarmConfig) -> SwarmConfig, - B: 
FnMut(Vec) -> Vec, + F: (FnMut(SwarmConfig) -> SwarmConfig) + 'static + Send, + B: (FnMut(Vec) -> Vec) + 'static + Send, { make_swarms_with::( n, - |bs, maddr| { + move |bs, maddr| { create_swarm_with_runtime(update_cfg(SwarmConfig::new(bs, maddr)), move |_| delay) .boxed() }, @@ -134,7 +137,7 @@ pub async fn make_swarms_with_keypair( keypair: KeyPair, spell_base_dir: Option, ) -> Vec { - make_swarms_with_cfg(n, |mut cfg| { + make_swarms_with_cfg(n, move |mut cfg| { cfg.keypair = keypair.clone(); cfg.spell_base_dir = spell_base_dir.clone().map(PathBuf::from); cfg @@ -150,65 +153,65 @@ pub async fn make_swarms_with( wait_connected: bool, ) -> Vec where - F: FnMut( - Vec, - Multiaddr, - ) -> BoxFuture<'static, (PeerId, Box>, KeyPair, SwarmConfig, Span)>, - M: FnMut() -> Multiaddr, - B: FnMut(Vec) -> Vec, + F: (FnMut( + Vec, + Multiaddr, + ) -> BoxFuture<'static, (PeerId, Box>, KeyPair, SwarmConfig, Span)>) + + 'static + + Send, + M: (FnMut() -> Multiaddr) + 'static + Send, + B: (FnMut(Vec) -> Vec) + 'static + Send, { let addrs = (0..n).map(|_| create_maddr()).collect::>(); - let nodes = addrs - .iter() + + let parallelism = available_parallelism() + .map(|x| x.get()) + .unwrap_or(DEFAULT_PARALLELISM); + + let nodes: Vec = stream::iter(addrs.clone()) .map(|addr| { - let addrs = addrs - .iter() - .filter(|&a| a != addr) - .cloned() - .collect::>(); + let addrs: Vec = addrs.clone().into_iter().filter(|a| a != &addr).collect(); let bootstraps = bootstraps(addrs); - create_node(bootstraps, addr.clone()) + let create_node_future = create_node(bootstraps, addr.clone()); + async move { + let (peer_id, node, management_keypair, config, span) = create_node_future.await; + let connectivity = node.connectivity.clone(); + let aquamarine_api = node.aquamarine_api.clone(); + let started_node = node + .start(peer_id) + .instrument(span) + .await + .expect("node start"); + let http_listen_addr = started_node + .http_listen_addr + .expect("could not take http listen addr"); + 
CreatedSwarm { + peer_id, + multiaddr: config.listen_on, + tmp_dir: config.tmp_dir.clone(), + management_keypair, + exit_outlet: started_node.exit_outlet, + connectivity, + aquamarine_api, + http_listen_addr, + } + } + .boxed() }) - .collect::>(); + .buffer_unordered(parallelism) + .collect() + .await; // start all nodes - let infos = join_all(nodes.into_iter().map(move |tasks| { - async { - let (peer_id, node, management_keypair, config, span) = tasks.await; - let connectivity = node.connectivity.clone(); - let aquamarine_api = node.aquamarine_api.clone(); - let started_node = node - .start(peer_id) - .instrument(span) - .await - .expect("node start"); - let http_listen_addr = started_node - .http_listen_addr - .expect("could not take http listen addr"); - CreatedSwarm { - peer_id, - multiaddr: config.listen_on, - tmp_dir: config.tmp_dir.clone(), - management_keypair, - exit_outlet: started_node.exit_outlet, - connectivity, - aquamarine_api, - http_listen_addr, - } - } - .boxed() - })) - .await; - if wait_connected { - let addrs = infos + let addrs = nodes .iter() .map(|info| info.http_listen_addr) .collect::>(); wait_connected_on_addrs(addrs).await; } - infos + nodes } async fn wait_connected_on_addrs(addrs: Vec) { @@ -323,12 +326,15 @@ pub async fn create_swarm_with_runtime( let peer_id = libp2p::identity::Keypair::from(config.keypair.clone()) .public() .to_peer_id(); - let spawn = tracing::info_span!("Node", peer_id = peer_id.to_base58()); + let parent_span = tracing::info_span!("Node", peer_id = peer_id.to_base58()); + let config_apply_span = tracing::info_span!(parent: &parent_span, "config"); + let node_listen_span = tracing::info_span!(parent: &parent_span, "listen"); + let node_creation_span = tracing::info_span!(parent: &parent_span, "node_creation"); - let tmp_dir = config.tmp_dir.path().to_path_buf(); - let _enter = spawn.enter(); + let (node, management_kp) = config_apply_span.in_scope(||{ + let tmp_dir = config.tmp_dir.path().to_path_buf(); - let node_config 
= json!({ + let node_config = json!({ "base_dir": tmp_dir.to_string_lossy(), "root_key_pair": { "format": format, @@ -349,87 +355,93 @@ pub async fn create_swarm_with_runtime( "cc_events_dir": config.cc_events_dir, }); - let node_config: UnresolvedConfig = - UnresolvedConfig::deserialize(node_config).expect("created_swarm: deserialize config"); - - let mut resolved = node_config.resolve().expect("failed to resolve config"); + let node_config: UnresolvedConfig = + UnresolvedConfig::deserialize(node_config).expect("created_swarm: deserialize config"); - resolved.node_config.transport_config.transport = Transport::Memory; - resolved.node_config.transport_config.socket_timeout = TRANSPORT_TIMEOUT; - resolved.node_config.protocol_config = - ProtocolConfig::new(TRANSPORT_TIMEOUT, TRANSPORT_TIMEOUT); + let mut resolved = node_config.resolve().expect("failed to resolve config"); - resolved.node_config.bootstrap_nodes = config.bootstraps.clone(); - resolved.node_config.bootstrap_config = BootstrapConfig::zero(); - resolved.node_config.bootstrap_frequency = 1; + resolved.node_config.transport_config.transport = Transport::Memory; + resolved.node_config.transport_config.socket_timeout = TRANSPORT_TIMEOUT; + resolved.node_config.protocol_config = + ProtocolConfig::new(TRANSPORT_TIMEOUT, TRANSPORT_TIMEOUT); - resolved.metrics_config.metrics_enabled = false; + resolved.node_config.bootstrap_nodes = config.bootstraps.clone(); + resolved.node_config.bootstrap_config = BootstrapConfig::zero(); + resolved.node_config.bootstrap_frequency = 1; - resolved.node_config.allow_local_addresses = true; + resolved.metrics_config.metrics_enabled = false; - resolved.node_config.aquavm_pool_size = config.pool_size.unwrap_or(1); - resolved.node_config.particle_execution_timeout = EXECUTION_TIMEOUT; + resolved.node_config.allow_local_addresses = true; - resolved.node_config.allowed_binaries = config.allowed_binaries.clone(); + resolved.node_config.aquavm_pool_size = config.pool_size.unwrap_or(1); 
+ resolved.node_config.particle_execution_timeout = EXECUTION_TIMEOUT; - if let Some(config) = config.override_system_services_config.clone() { - resolved.system_services = config; - } - // `enable_system_services` has higher priority then `enable` field of the SystemServicesConfig - resolved.system_services.enable = config - .enabled_system_services - .iter() - .map(|service| { - system_services_config::ServiceKey::from_string(service) - .unwrap_or_else(|| panic!("service {service} doesn't exist")) - }) - .collect(); + resolved.node_config.allowed_binaries = config.allowed_binaries.clone(); - if let Some(endpoint) = config.connector_api_endpoint.clone() { - resolved.system_services.decider.network_api_endpoint = endpoint; - } + if let Some(config) = config.override_system_services_config.clone() { + resolved.system_services = config; + } + // `enable_system_services` has higher priority then `enable` field of the SystemServicesConfig + resolved.system_services.enable = config + .enabled_system_services + .iter() + .map(|service| { + system_services_config::ServiceKey::from_string(service) + .unwrap_or_else(|| panic!("service {service} doesn't exist")) + }) + .collect(); + + if let Some(endpoint) = config.connector_api_endpoint.clone() { + resolved.system_services.decider.network_api_endpoint = endpoint; + } - let management_kp = fluence_keypair::KeyPair::generate_ed25519(); - let management_peer_id = libp2p::identity::Keypair::from(management_kp.clone()) - .public() - .to_peer_id(); - resolved.node_config.management_peer_id = management_peer_id; - resolved.chain_listener_config = config.chain_listener.clone(); - - let vm_config = vm_config(BaseVmConfig { - peer_id, - tmp_dir: tmp_dir.clone(), - listen_on: config.listen_on.clone(), - manager: management_peer_id, + let management_kp = fluence_keypair::KeyPair::generate_ed25519(); + let management_peer_id = libp2p::identity::Keypair::from(management_kp.clone()) + .public() + .to_peer_id(); + 
resolved.node_config.management_peer_id = management_peer_id; + resolved.chain_listener_config = config.chain_listener.clone(); + + let vm_config = vm_config(BaseVmConfig { + peer_id, + tmp_dir: tmp_dir.clone(), + listen_on: config.listen_on.clone(), + manager: management_peer_id, + }); + + let data_store_config = DataStoreConfig::new(tmp_dir.clone()); + + let system_services_config = resolved.system_services.clone(); + let system_service_distros = + system_services::SystemServiceDistros::default_from(system_services_config) + .expect("Failed to get default system service distros") + .extend(config.extend_system_services.clone()); + let node = Node::new( + resolved, + vm_config, + data_store_config, + "some version", + "some version", + system_service_distros, + ); + (node, management_kp) }); - let data_store_config = DataStoreConfig::new(tmp_dir.clone()); - - let system_services_config = resolved.system_services.clone(); - let system_service_distros = - system_services::SystemServiceDistros::default_from(system_services_config) - .expect("Failed to get default system service distros") - .extend(config.extend_system_services.clone()); - - let mut node = Node::new( - resolved, - vm_config, - data_store_config, - "some version", - "some version", - system_service_distros, - ) - .await - .expect("create node"); - node.listen(vec![config.listen_on.clone()]).expect("listen"); - - ( - node.scope.get_host_peer_id(), - node, - management_kp, - config, - spawn.clone(), - ) + let mut node = node + .instrument(node_creation_span) + .await + .expect("create node"); + + node_listen_span.in_scope(|| { + node.listen(vec![config.listen_on.clone()]).expect("listen"); + ( + node.scope.get_host_peer_id(), + node, + management_kp, + config, + parent_span.clone(), + ) + }) } pub async fn create_swarm( diff --git a/crates/nox-tests/tests/builtin.rs b/crates/nox-tests/tests/builtin.rs index ad12d131a4..944c2b20da 100644 --- a/crates/nox-tests/tests/builtin.rs +++ 
b/crates/nox-tests/tests/builtin.rs @@ -2279,7 +2279,7 @@ async fn aliases_restart() { .map(|s| s.exit_outlet.send(())) .for_each(drop); - let swarms = make_swarms_with_cfg(1, |mut cfg| { + let swarms = make_swarms_with_cfg(1, move |mut cfg| { cfg.keypair = kp.clone(); cfg.tmp_dir = tmp_dir.clone(); cfg @@ -2370,7 +2370,7 @@ async fn subnet_resolve() { .with_body("invalid mock was hit. Check that request body matches 'match_body' clause'") .create(); - let swarms = make_swarms_with_cfg(1, |mut cfg| { + let swarms = make_swarms_with_cfg(1, move |mut cfg| { cfg.connector_api_endpoint = Some(url.clone()); cfg }) diff --git a/crates/nox-tests/tests/chain_listener.rs b/crates/nox-tests/tests/chain_listener.rs index d7b775f039..3774d883a4 100644 --- a/crates/nox-tests/tests/chain_listener.rs +++ b/crates/nox-tests/tests/chain_listener.rs @@ -83,13 +83,14 @@ async fn test_chain_listener_cc() { let (addr, server) = run_server().await.unwrap(); let url = format!("ws://{}", addr); let events_dir = TempDir::new().unwrap(); - let _swarm = make_swarms_with_cfg(1, |mut cfg| { + let cc_events_dir = events_dir.path().to_path_buf(); + let _swarm = make_swarms_with_cfg(1, move |mut cfg| { cfg.chain_listener = Some(ChainListenerConfig { ws_endpoint: url.clone(), cc_contract_address: "".to_string(), }); - cfg.cc_events_dir = Some(events_dir.path().to_path_buf()); + cfg.cc_events_dir = Some(cc_events_dir.clone()); cfg }) .await; diff --git a/crates/nox-tests/tests/network/network_explore.rs b/crates/nox-tests/tests/network/network_explore.rs index d85ca0ae31..bbae59c171 100644 --- a/crates/nox-tests/tests/network/network_explore.rs +++ b/crates/nox-tests/tests/network/network_explore.rs @@ -14,7 +14,6 @@ * limitations under the License. 
*/ use std::str::FromStr; -use std::thread::sleep; use std::time::{Duration, Instant}; use base64::{engine::general_purpose::STANDARD as base64, Engine}; @@ -30,7 +29,6 @@ use serde::Deserialize; use serde_json::json; use serde_json::Value as JValue; use service_modules::{load_module, module_config, Hash}; -use test_constants::KAD_TIMEOUT; use test_utils::{create_service, timeout}; use crate::network::join_stream; @@ -252,8 +250,6 @@ async fn explore_services_heavy() { enable_logs(); let swarms = make_swarms(5).await; - tokio::time::sleep(KAD_TIMEOUT).await; - let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone()) .await .wrap_err("connect client") @@ -339,7 +335,6 @@ async fn explore_services_heavy() { async fn explore_services_fixed_heavy() { enable_logs(); let swarms = make_swarms(5).await; - sleep(KAD_TIMEOUT); // language=Clojure let script = r#" diff --git a/crates/nox-tests/tests/spells.rs b/crates/nox-tests/tests/spells.rs index 7eb3165914..ec8f2e68e4 100644 --- a/crates/nox-tests/tests/spells.rs +++ b/crates/nox-tests/tests/spells.rs @@ -2088,17 +2088,19 @@ async fn set_alias_by_worker_creator() { async fn test_decider_api_endpoint_rewrite() { let expected_endpoint = "test1".to_string(); let swarm_keypair = KeyPair::generate_ed25519(); - let swarms = make_swarms_with_cfg(1, |mut cfg| { + let inner_keypair = swarm_keypair.clone(); + let inner_endpoint = expected_endpoint.clone(); + let swarms = make_swarms_with_cfg(1, move |mut cfg| { let tmp_dir = tempfile::tempdir().expect("Could not create temp dir"); let tmp_dir = Arc::new(tmp_dir); - cfg.keypair = swarm_keypair.clone(); + cfg.keypair = inner_keypair.clone(); cfg.tmp_dir = tmp_dir; cfg.enabled_system_services = vec!["decider".to_string()]; cfg.override_system_services_config = Some(SystemServicesConfig { enable: vec![], aqua_ipfs: Default::default(), decider: DeciderConfig { - network_api_endpoint: expected_endpoint.clone(), + network_api_endpoint: inner_endpoint.clone(), 
..Default::default() }, registry: Default::default(), @@ -2146,17 +2148,19 @@ async fn test_decider_api_endpoint_rewrite() { .for_each(drop); let another_endpoint = "another_endpoint_test".to_string(); - let swarms = make_swarms_with_cfg(1, |mut cfg| { + let inner_keypair = swarm_keypair.clone(); + let inner_endpoint = another_endpoint.clone(); + let swarms = make_swarms_with_cfg(1, move |mut cfg| { let tmp_dir = tempfile::tempdir().expect("Could not create temp dir"); let tmp_dir = Arc::new(tmp_dir); - cfg.keypair = swarm_keypair.clone(); + cfg.keypair = inner_keypair.clone(); cfg.tmp_dir = tmp_dir; cfg.enabled_system_services = vec!["decider".to_string()]; cfg.override_system_services_config = Some(SystemServicesConfig { enable: vec![], aqua_ipfs: Default::default(), decider: DeciderConfig { - network_api_endpoint: another_endpoint.clone(), + network_api_endpoint: inner_endpoint.clone(), ..Default::default() }, registry: Default::default(), @@ -2201,7 +2205,7 @@ async fn test_decider_api_endpoint_rewrite() { #[tokio::test] async fn test_activate_deactivate() { let worker_period_sec = 120u32; - let swarms = make_swarms_with_cfg(1, |mut cfg| { + let swarms = make_swarms_with_cfg(1, move |mut cfg| { cfg.override_system_services_config = Some(SystemServicesConfig { enable: vec![], aqua_ipfs: Default::default(), diff --git a/crates/nox-tests/tests/topology.rs b/crates/nox-tests/tests/topology.rs index 4682dac5f9..7873b5070c 100644 --- a/crates/nox-tests/tests/topology.rs +++ b/crates/nox-tests/tests/topology.rs @@ -25,7 +25,6 @@ use connected_client::ConnectedClient; use created_swarm::make_swarms; use log_utils::enable_logs; use network::join::join_stream; -use test_constants::KAD_TIMEOUT; pub mod network { pub mod join; @@ -35,7 +34,6 @@ pub mod network { async fn identity_heavy() { enable_logs(); let swarms = make_swarms(3).await; - tokio::time::sleep(KAD_TIMEOUT).await; let mut a = ConnectedClient::connect_to(swarms[0].multiaddr.clone()) .await diff --git 
a/crates/test-constants/src/lib.rs b/crates/test-constants/src/lib.rs index 4c25d071e2..94fdd1634a 100644 --- a/crates/test-constants/src/lib.rs +++ b/crates/test-constants/src/lib.rs @@ -34,7 +34,6 @@ pub static TIMEOUT: Duration = Duration::from_secs(150); pub static TIMEOUT: Duration = Duration::from_secs(15); pub static SHORT_TIMEOUT: Duration = Duration::from_millis(300); -pub static KAD_TIMEOUT: Duration = Duration::from_millis(500); pub static TRANSPORT_TIMEOUT: Duration = Duration::from_millis(500); pub static IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30); pub static EXECUTION_TIMEOUT: Duration = Duration::from_millis(5000); diff --git a/nox/src/behaviour/network.rs b/nox/src/behaviour/network.rs index ea0faae2f9..601a12df46 100644 --- a/nox/src/behaviour/network.rs +++ b/nox/src/behaviour/network.rs @@ -29,7 +29,7 @@ use particle_protocol::{ExtendedParticle, PROTOCOL_NAME}; use server_config::NetworkConfig; use crate::connectivity::Connectivity; -use crate::health::{BootstrapNodesHealth, ConnectivityHealth}; +use crate::health::{BootstrapNodesHealth, ConnectivityHealth, KademliaBootstrapHealth}; /// Coordinates protocols, so they can cooperate #[derive(NetworkBehaviour)] @@ -80,9 +80,14 @@ impl FluenceNetworkBehaviour { let health = health_registry.map(|registry| { let bootstrap_nodes = BootstrapNodesHealth::new(bootstrap_nodes); + let kademlia_bootstrap = KademliaBootstrapHealth::default(); registry.register("bootstrap_nodes", bootstrap_nodes.clone()); + registry.register("kademlia_bootstrap", kademlia_bootstrap.clone()); - ConnectivityHealth { bootstrap_nodes } + ConnectivityHealth { + bootstrap_nodes, + kademlia_bootstrap, + } }); let connectivity = Connectivity { diff --git a/nox/src/connectivity.rs b/nox/src/connectivity.rs index 6cb7a5e7eb..5c080bf9ab 100644 --- a/nox/src/connectivity.rs +++ b/nox/src/connectivity.rs @@ -175,38 +175,52 @@ impl Connectivity { let pool = self.connection_pool; let bootstrap_nodes = self.bootstrap_nodes; 
let frequency = self.bootstrap_frequency; + let health = self.health.as_ref(); - // Count connected (and reconnected) bootstrap nodes - let connections = { - use tokio_stream::StreamExt as stream; - let events = pool.lifecycle_events(); - stream::filter_map(events, move |e| { - log::trace!(target: "network", "Connection pool event: {:?}", e); - if let LifecycleEvent::Connected(c) = e { - let mut addresses = c.addresses.iter(); - addresses.find(|addr| bootstrap_nodes.contains(addr))?; - return Some(c); - } - None - }) - } - .enumerate(); + if !bootstrap_nodes.is_empty() { + // Count connected (and reconnected) bootstrap nodes + let connections = { + use tokio_stream::StreamExt as stream; + let events = pool.lifecycle_events(); + stream::filter_map(events, move |e| { + log::trace!(target: "network", "Connection pool event: {:?}", e); + if let LifecycleEvent::Connected(c) = e { + let mut addresses = c.addresses.iter(); + addresses.find(|addr| bootstrap_nodes.contains(addr))?; + return Some(c); + } + None + }) + } + .enumerate(); - connections - .for_each(move |(n, contact)| { - let kademlia = kademlia.clone(); - async move { - if n % frequency == 0 { - kademlia.add_contact(contact); - if let Err(err) = kademlia.bootstrap().await { - log::warn!("Kademlia bootstrap failed: {}", err) - } else { - log::info!("Kademlia bootstrap finished"); + connections + .for_each(move |(n, contact)| { + let kademlia = kademlia.clone(); + async move { + if n % frequency == 0 { + kademlia.add_contact(contact); + if let Err(err) = kademlia.bootstrap().await { + log::warn!("Kademlia bootstrap failed: {}", err); + if let Some(h) = health { + h.kademlia_bootstrap.on_bootstrap_failed() + } + } else { + log::info!("Kademlia bootstrap finished"); + if let Some(h) = health { + h.kademlia_bootstrap.on_bootstrap_finished() + } + } } } - } - }) - .await; + }) + .await; + } else { + // there is no bootstrap nodes, that means bootstrap process is finished + if let Some(h) = health { + 
h.kademlia_bootstrap.on_bootstrap_finished(); + } + } } /// Dial bootstraps, and then re-dial on each disconnection diff --git a/nox/src/health.rs b/nox/src/health.rs index 5f5b98d885..01b6f66639 100644 --- a/nox/src/health.rs +++ b/nox/src/health.rs @@ -2,11 +2,13 @@ use health::HealthCheck; use libp2p::Multiaddr; use parking_lot::RwLock; use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; #[derive(Clone)] pub struct ConnectivityHealth { pub bootstrap_nodes: BootstrapNodesHealth, + pub kademlia_bootstrap: KademliaBootstrapHealth, } #[derive(Clone)] @@ -50,6 +52,40 @@ impl HealthCheck for BootstrapNodesHealth { } } +#[derive(Clone)] +pub struct KademliaBootstrapHealth { + status: Arc, +} + +impl KademliaBootstrapHealth { + pub fn on_bootstrap_finished(&self) { + self.status.store(true, Ordering::Release) + } + + pub fn on_bootstrap_failed(&self) { + self.status.store(false, Ordering::Release) + } +} + +impl Default for KademliaBootstrapHealth { + fn default() -> Self { + Self { + status: Arc::new(AtomicBool::default()), + } + } +} + +impl HealthCheck for KademliaBootstrapHealth { + fn status(&self) -> eyre::Result<()> { + let status = self.status.load(Ordering::Acquire); + if status { + Ok(()) + } else { + Err(eyre::eyre!("Kademlia bootstrap not finished")) + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -112,4 +148,49 @@ mod tests { let status = bootstrap_health.status(); assert!(status.is_ok()); } + + #[test] + fn new_health_instance_should_have_default_status_false() { + let health = KademliaBootstrapHealth::default(); + assert_eq!(health.status.load(Ordering::Acquire), false); + } + + #[test] + fn on_bootstrap_finished_should_set_status_to_true() { + let health = KademliaBootstrapHealth::default(); + health.on_bootstrap_finished(); + assert_eq!(health.status.load(Ordering::Acquire), true); + } + + #[test] + fn on_bootstrap_failed_should_set_status_to_false() { + let health = KademliaBootstrapHealth::default(); + 
health.on_bootstrap_failed(); + assert_eq!(health.status.load(Ordering::Acquire), false); + } + + #[test] + fn status_should_return_ok_if_bootstrap_finished() { + let health = KademliaBootstrapHealth::default(); + health.on_bootstrap_finished(); + assert!(health.status().is_ok()); + } + + #[test] + fn status_should_return_error_if_bootstrap_failed() { + let health = KademliaBootstrapHealth::default(); + health.on_bootstrap_failed(); + assert!(health.status().is_err()); + } + + #[test] + fn status_error_should_contain_expected_message() { + let health = KademliaBootstrapHealth::default(); + health.on_bootstrap_failed(); + let result = health.status(); + assert_eq!( + result.err().unwrap().to_string(), + "Kademlia bootstrap not finished" + ); + } }