diff --git a/Cargo.lock b/Cargo.lock index 9b0cdecd85..39ae904abf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1229,6 +1229,7 @@ dependencies = [ "fluence-libp2p", "fs-utils", "futures", + "hyper", "libp2p", "log", "maplit", @@ -4158,6 +4159,7 @@ dependencies = [ "test-utils", "tokio", "toy-vms", + "tracing", "uuid-utils", ] diff --git a/Cargo.toml b/Cargo.toml index df08033a4f..7c7070ff25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,7 +94,7 @@ fluence-spell-dtos = "=0.5.17" fluence-spell-distro = "=0.5.17" # marine -fluence-app-service = {version = "=0.26.4-feat-wasm-backend-interface-update-b3cf050-636-1.0", registry = "fluence"} +fluence-app-service = { version = "=0.26.4-feat-wasm-backend-interface-update-b3cf050-636-1.0", registry = "fluence" } marine-utils = "0.5.0" marine-it-parser = "0.12.1" @@ -131,6 +131,7 @@ maplit = "1.0.2" log = "0.4.19" tracing = { version = "0.1.37", default-features = false, features = ["log"] } tracing-subscriber = "0.3.17" +console-subscriber = "0.1.10" futures = "0.3.28" thiserror = "1.0.44" serde = "1.0.177" @@ -140,6 +141,7 @@ humantime-serde = "1.1.1" cid = "0.10.1" libipld = "0.16.0" axum = "0.6.19" +hyper = "0.14.27" # Enable a small amount of optimization in debug mode [profile.dev] diff --git a/crates/created-swarm/Cargo.toml b/crates/created-swarm/Cargo.toml index e580547cb8..95e45586a7 100644 --- a/crates/created-swarm/Cargo.toml +++ b/crates/created-swarm/Cargo.toml @@ -35,3 +35,4 @@ serde_json = { workspace = true } base64 = { workspace = true } serde = { workspace = true } maplit = { workspace = true } +hyper = { workspace = true } diff --git a/crates/created-swarm/src/swarm.rs b/crates/created-swarm/src/swarm.rs index 776d745942..951a0bef4f 100644 --- a/crates/created-swarm/src/swarm.rs +++ b/crates/created-swarm/src/swarm.rs @@ -15,11 +15,12 @@ */ use std::convert::identity; +use std::net::SocketAddr; use std::{path::PathBuf, time::Duration}; use derivative::Derivative; use fluence_keypair::KeyPair; -use futures::{stream::iter, FutureExt, StreamExt}; +use futures::{FutureExt, StreamExt}; use libp2p::core::multiaddr::Protocol; use libp2p::{core::Multiaddr, PeerId}; use serde::Deserialize; @@ -28,11 +29,12 @@ use air_interpreter_fs::{air_interpreter_path, write_default_air_interpreter}; use aquamarine::{AquaRuntime, VmConfig}; use aquamarine::{AquamarineApi, DataStoreError}; use base64::{engine::general_purpose::STANDARD as base64, Engine}; -use connection_pool::{ConnectionPoolApi, ConnectionPoolT}; use fluence_libp2p::random_multiaddr::{create_memory_maddr, create_tcp_maddr}; use fluence_libp2p::Transport; use fs_utils::{create_dir, make_tmp_dir_peer_id, to_abs_path}; use futures::future::join_all; +use futures::stream::iter; +use hyper::{Body, Request, StatusCode}; use nox::{Connectivity, Node}; use particle_protocol::ProtocolConfig; use server_config::{default_script_storage_timer_resolution, BootstrapConfig, UnresolvedConfig}; @@ -40,6 +42,8 @@ use test_constants::{EXECUTION_TIMEOUT, KEEP_ALIVE_TIMEOUT, TRANSPORT_TIMEOUT}; use tokio::sync::oneshot; use toy_vms::EasyVM; +const HEALTH_CHECK_POLLING_INTERVAL: Duration = Duration::from_millis(100); + #[allow(clippy::upper_case_acronyms)] type AVM = aquamarine::AVM; @@ -54,12 +58,13 @@ pub struct CreatedSwarm { #[derivative(Debug = "ignore")] pub management_keypair: KeyPair, // stop signal - pub outlet: oneshot::Sender<()>, + pub exit_outlet: oneshot::Sender<()>, // node connectivity #[derivative(Debug = "ignore")] pub connectivity: Connectivity, #[derivative(Debug = "ignore")] pub aquamarine_api: AquamarineApi, + http_listen_addr: SocketAddr, } #[tracing::instrument] @@ -148,50 +153,38 @@ where let nodes = addrs .iter() .map(|addr| { - #[rustfmt::skip] - let addrs = addrs.iter().filter(|&a| a != addr).cloned().collect::>(); + let addrs = addrs + .iter() + .filter(|&a| a != addr) + .cloned() + .collect::>(); let bootstraps = bootstraps(addrs); - let bootstraps_num = bootstraps.len(); let (id, node, m_kp, config) = create_node(bootstraps, addr.clone()); - ((id, m_kp, config), node, bootstraps_num) + ((id, m_kp, config), node) }) .collect::>(); - let pools = iter( - nodes - .iter() - .map(|(_, n, bootstraps_num)| (n.connectivity.clone(), *bootstraps_num)) - .collect::>(), - ); - let connected = pools.for_each_concurrent(None, |(pool, bootstraps_num)| async move { - let pool = AsRef::::as_ref(&pool); - let mut events = pool.lifecycle_events(); - loop { - let num = pool.count_connections().await; - if num >= bootstraps_num { - break; - } - // wait until something changes - events.next().await; - } - }); - // start all nodes let infos = join_all(nodes.into_iter().map( - move |((peer_id, management_keypair, config), node, _)| { + move |((peer_id, management_keypair, config), node)| { let connectivity = node.connectivity.clone(); let aquamarine_api = node.aquamarine_api.clone(); async move { - let outlet = node.start(peer_id).await.expect("node start"); + let started_node = node.start(peer_id).await.expect("node start"); + let http = started_node + .http_bind_inlet + .await + .expect("Could not bind http"); CreatedSwarm { peer_id, multiaddr: config.listen_on, tmp_dir: config.tmp_dir.unwrap(), management_keypair, - outlet, + exit_outlet: started_node.exit_outlet, connectivity, aquamarine_api, + http_listen_addr: http.listen_addr, } } .boxed() @@ -200,12 +193,42 @@ where .await; if wait_connected { - connected.await; + let addrs = infos + .iter() + .map(|info| info.http_listen_addr) + .collect::>(); + wait_connected_on_addrs(addrs).await; } infos } +async fn wait_connected_on_addrs(addrs: Vec) { + let http_client = &hyper::Client::new(); + + let healthcheck = iter(addrs).for_each_concurrent(None, |addr| async move { + loop { + let response = http_client + .request( + Request::builder() + .uri(format!("http://{}/health", addr)) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + if response.status() == StatusCode::OK { + break; + } + + tokio::time::sleep(HEALTH_CHECK_POLLING_INTERVAL).await + } + }); + + healthcheck.await; +} + #[derive(Clone, Derivative)] #[derivative(Debug)] pub struct SwarmConfig { @@ -223,6 +246,7 @@ pub struct SwarmConfig { pub timer_resolution: Duration, pub allowed_binaries: Vec, pub enabled_system_services: Vec, + pub http_port: u16, } impl SwarmConfig { @@ -244,6 +268,7 @@ impl SwarmConfig { timer_resolution: default_script_storage_timer_resolution(), allowed_binaries: vec!["/usr/bin/ipfs".to_string(), "/usr/bin/curl".to_string()], enabled_system_services: vec![], + http_port: 0, } } } @@ -306,10 +331,12 @@ pub fn create_swarm_with_runtime( "generate_on_absence": false, "value": base64.encode(config.builtins_keypair.to_vec()), }, + "builtins_base_dir": config.builtins_dir, "external_multiaddresses": [config.listen_on], "spell_base_dir": Some(config.spell_base_dir.clone().unwrap_or(to_abs_path(PathBuf::from("spell")))), - "http_port": null + "http_port": config.http_port, + "listen_ip": "127.0.0.1" }); let node_config: UnresolvedConfig = diff --git a/crates/log-utils/Cargo.toml b/crates/log-utils/Cargo.toml index 08453e8cda..f3a5dc0923 100644 --- a/crates/log-utils/Cargo.toml +++ b/crates/log-utils/Cargo.toml @@ -7,6 +7,6 @@ edition = "2021" [dependencies] log = { workspace = true } tracing = { workspace = true, features = ["async-await", "log"] } -tracing-subscriber.workspace = true -console-subscriber = { version = "0.1.10", features = ["parking_lot"] } +tracing-subscriber = { workspace = true, features = ["parking_lot", "env-filter"] } +console-subscriber = { workspace = true, features = ["parking_lot"] } tracing-logfmt = "0.3.2" diff --git a/crates/nox-tests/Cargo.toml b/crates/nox-tests/Cargo.toml index d9cadaafc1..7be5e3163b 100644 --- a/crates/nox-tests/Cargo.toml +++ b/crates/nox-tests/Cargo.toml @@ -55,3 +55,4 @@ base64 = { workspace = true } tokio = { workspace = true } log = { workspace = true } +tracing = { workspace = true } diff --git a/crates/nox-tests/tests/builtin.rs b/crates/nox-tests/tests/builtin.rs index 24d68c4255..8fd632af67 100644 --- a/crates/nox-tests/tests/builtin.rs +++ b/crates/nox-tests/tests/builtin.rs @@ -194,7 +194,10 @@ async fn remove_service_restart() { } // stop swarm - swarms.into_iter().map(|s| s.outlet.send(())).for_each(drop); + swarms + .into_iter() + .map(|s| s.exit_outlet.send(())) + .for_each(drop); let swarms = make_swarms_with_keypair(1, kp, None).await; let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone()) .await @@ -1930,7 +1933,10 @@ async fn exec_script_as_admin<'a>( ) .await; - swarms.into_iter().map(|s| s.outlet.send(())).for_each(drop); + swarms + .into_iter() + .map(|s| s.exit_outlet.send(())) + .for_each(drop); result } @@ -2036,7 +2042,10 @@ async fn aliases_restart() { use serde_json::Value::Array; // stop swarm - swarms.into_iter().map(|s| s.outlet.send(())).for_each(drop); + swarms + .into_iter() + .map(|s| s.exit_outlet.send(())) + .for_each(drop); let swarms = make_swarms_with_keypair(1, kp, None).await; let mut client = ConnectedClient::connect_with_keypair( swarms[0].multiaddr.clone(), diff --git a/crates/nox-tests/tests/network/network_explore.rs b/crates/nox-tests/tests/network/network_explore.rs index 1427d2ef46..442769900e 100644 --- a/crates/nox-tests/tests/network/network_explore.rs +++ b/crates/nox-tests/tests/network/network_explore.rs @@ -259,9 +259,10 @@ async fn explore_services() { // N - 1 neighborhood each with N - 1 elements. let total_neighs = (swarms.len() - 1) * (swarms.len() - 1); - client.send_particle( - format!( - r#" + let args = client + .execute_particle( + format!( + r#" (seq (seq (call relay ("kad" "neighborhood") [relay] neighs_top) @@ -294,25 +295,23 @@ async fn explore_services() { ) ) "#, - join_stream( - "external_addresses", - "relay", - &total_neighs.to_string(), - "joined_addresses", + join_stream( + "external_addresses", + "relay", + &total_neighs.to_string(), + "joined_addresses", + ) ) + .as_str(), + hashmap! { + "relay" => json!(client.node.to_string()), + "client" => json!(client.peer_id.to_string()), + }, ) - .as_str(), - hashmap! { - "relay" => json!(client.node.to_string()), - "client" => json!(client.peer_id.to_string()), - }, - ); - - let args = client - .receive_args() .await .wrap_err("receive args") .unwrap(); + let external_addrs = args.into_iter().next().unwrap(); let external_addrs = external_addrs.as_array().unwrap(); let mut external_addrs = external_addrs diff --git a/crates/nox-tests/tests/spells.rs b/crates/nox-tests/tests/spells.rs index adb4d551e4..e9f174a533 100644 --- a/crates/nox-tests/tests/spells.rs +++ b/crates/nox-tests/tests/spells.rs @@ -197,7 +197,10 @@ async fn spell_error_handling_test() { assert_eq!(result.len(), 1); - swarms.into_iter().map(|s| s.outlet.send(())).for_each(drop); + swarms + .into_iter() + .map(|s| s.exit_outlet.send(())) + .for_each(drop); } #[tokio::test] diff --git a/nox/Cargo.toml b/nox/Cargo.toml index ce5454fc60..aa0413d604 100644 --- a/nox/Cargo.toml +++ b/nox/Cargo.toml @@ -52,13 +52,13 @@ humantime-serde = { workspace = true } log = { workspace = true } tracing-log = { version = "0.1.3" } -console-subscriber = { version = "0.1.10", features = ["parking_lot"] } +console-subscriber = { workspace = true, features = ["parking_lot"] } axum = { workspace = true, features = ["macros"] } itertools = { workspace = true } eyre = { workspace = true } base64 = { workspace = true } tracing = { workspace = true, features = ["async-await", "log"] } -tracing-subscriber.workspace = true +tracing-subscriber = { workspace = true, features = ["parking_lot", "env-filter"] } tracing-logfmt = "0.3.2" tracing-opentelemetry = "0.19.0" opentelemetry = { version = "0.19.0", features = ["rt-tokio-current-thread"] } @@ -76,7 +76,7 @@ rand = "0.8.5" bs58 = { workspace = true } connected-client = { path = "../crates/connected-client" } log-utils = { workspace = true } -hyper = "0.14.27" +hyper = { workspace = true } [[bench]] name = "network_api_bench" diff --git a/nox/src/http.rs b/nox/src/http.rs index 9ea2f534b3..f3c7c21def 100644 --- a/nox/src/http.rs +++ b/nox/src/http.rs @@ -16,7 +16,7 @@ use prometheus_client::registry::Registry; use serde_json::{json, Value}; use std::net::SocketAddr; use std::sync::Arc; -use tokio::sync::Notify; +use tokio::sync::oneshot; async fn handler_404() -> impl IntoResponse { (StatusCode::NOT_FOUND, "No such endpoint") @@ -106,6 +106,10 @@ struct Inner { peer_id: PeerId, versions: Versions, } +#[derive(Debug)] +pub struct StartedHttp { + pub listen_addr: SocketAddr, +} pub async fn start_http_endpoint( listen_addr: SocketAddr, @@ -113,7 +117,7 @@ pub async fn start_http_endpoint( health_registry: Option, peer_id: PeerId, versions: Versions, - notify: Arc, + notify: oneshot::Sender, ) { let state = RouteState(Arc::new(Inner { metric_registry, @@ -130,7 +134,11 @@ pub async fn start_http_endpoint( .with_state(state); let server = axum::Server::bind(&listen_addr).serve(app.into_make_service()); - notify.notify_one(); + notify + .send(StartedHttp { + listen_addr: server.local_addr(), + }) + .expect("Could not send http info"); server.await.expect("Could not make http endpoint") } @@ -139,17 +147,7 @@ mod tests { use super::*; use axum::http::Request; use health::HealthCheck; - use std::net::{SocketAddr, TcpListener}; - - fn get_free_tcp_port() -> u16 { - let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind to port 0"); - let socket_addr = listener - .local_addr() - .expect("Failed to retrieve local address"); - let port = socket_addr.port(); - drop(listener); - port - } + use std::net::SocketAddr; fn test_versions() -> Versions { Versions { @@ -168,11 +166,9 @@ mod tests { #[tokio::test] async fn test_version_route() { // Create a test server - let port = get_free_tcp_port(); - let addr = format!("127.0.0.1:{port}").parse::().unwrap(); + let addr = format!("127.0.0.1:0").parse::().unwrap(); - let notify = Arc::new(Notify::new()); - let cloned_notify = notify.clone(); + let (notify_sender, notify_receiver) = oneshot::channel(); tokio::spawn(async move { start_http_endpoint( addr, @@ -180,19 +176,19 @@ mod tests { None, PeerId::random(), test_versions(), - cloned_notify, + notify_sender, ) .await; }); - notify.notified().await; + let http_info = notify_receiver.await.unwrap(); let client = hyper::Client::new(); let response = client .request( Request::builder() - .uri(format!("http://{}/versions", addr)) + .uri(format!("http://{}/versions", http_info.listen_addr)) .body(Body::empty()) .unwrap(), ) @@ -207,24 +203,22 @@ mod tests { #[tokio::test] async fn test_peer_id_route() { // Create a test server - let port = get_free_tcp_port(); - let addr = format!("127.0.0.1:{port}").parse::().unwrap(); + let addr = format!("127.0.0.1:0").parse::().unwrap(); let peer_id = PeerId::random(); - let notify = Arc::new(Notify::new()); - let cloned_notify = notify.clone(); + let (notify_sender, notify_receiver) = oneshot::channel(); tokio::spawn(async move { - start_http_endpoint(addr, None, None, peer_id, test_versions(), cloned_notify).await; + start_http_endpoint(addr, None, None, peer_id, test_versions(), notify_sender).await; }); - notify.notified().await; + let http_info = notify_receiver.await.unwrap(); let client = hyper::Client::new(); let response = client .request( Request::builder() - .uri(format!("http://{}/peer_id", addr)) + .uri(format!("http://{}/peer_id", http_info.listen_addr)) .body(Body::empty()) .unwrap(), ) @@ -242,12 +236,10 @@ mod tests { #[tokio::test] async fn test_health_route_empty_registry() { // Create a test server - let port = get_free_tcp_port(); - let addr = format!("127.0.0.1:{port}").parse::().unwrap(); + let addr = format!("127.0.0.1:0").parse::().unwrap(); let peer_id = PeerId::random(); - let notify = Arc::new(Notify::new()); - let cloned_notify = notify.clone(); + let (notify_sender, notify_receiver) = oneshot::channel(); let health_registry = HealthCheckRegistry::new(); tokio::spawn(async move { start_http_endpoint( @@ -256,19 +248,19 @@ mod tests { Some(health_registry), peer_id, test_versions(), - cloned_notify, + notify_sender, ) .await; }); - notify.notified().await; + let http_info = notify_receiver.await.unwrap(); let client = hyper::Client::new(); let response = client .request( Request::builder() - .uri(format!("http://{}/health", addr)) + .uri(format!("http://{}/health", http_info.listen_addr)) .body(Body::empty()) .unwrap(), ) @@ -283,12 +275,10 @@ mod tests { #[tokio::test] async fn test_health_route_success_checks() { // Create a test server - let port = get_free_tcp_port(); - let addr = format!("127.0.0.1:{port}").parse::().unwrap(); + let addr = format!("127.0.0.1:0").parse::().unwrap(); let peer_id = PeerId::random(); - let notify = Arc::new(Notify::new()); - let cloned_notify = notify.clone(); + let (notify_sender, notify_receiver) = oneshot::channel(); let mut health_registry = HealthCheckRegistry::new(); struct SuccessHealthCheck {} impl HealthCheck for SuccessHealthCheck { @@ -305,19 +295,19 @@ mod tests { Some(health_registry), peer_id, test_versions(), - cloned_notify, + notify_sender, ) .await; }); - notify.notified().await; + let http_info = notify_receiver.await.unwrap(); let client = hyper::Client::new(); let response = client .request( Request::builder() - .uri(format!("http://{}/health", addr)) + .uri(format!("http://{}/health", http_info.listen_addr)) .body(Body::empty()) .unwrap(), ) @@ -332,12 +322,10 @@ mod tests { #[tokio::test] async fn test_health_route_warn_checks() { // Create a test server - let port = get_free_tcp_port(); - let addr = format!("127.0.0.1:{port}").parse::().unwrap(); + let addr = format!("127.0.0.1:0").parse::().unwrap(); let peer_id = PeerId::random(); - let notify = Arc::new(Notify::new()); - let cloned_notify = notify.clone(); + let (notify_sender, notify_receiver) = oneshot::channel(); let mut health_registry = HealthCheckRegistry::new(); struct SuccessHealthCheck {} impl HealthCheck for SuccessHealthCheck { @@ -362,19 +350,19 @@ mod tests { Some(health_registry), peer_id, test_versions(), - cloned_notify, + notify_sender, ) .await; }); - notify.notified().await; + let http_info = notify_receiver.await.unwrap(); let client = hyper::Client::new(); let response = client .request( Request::builder() - .uri(format!("http://{}/health", addr)) + .uri(format!("http://{}/health", http_info.listen_addr)) .body(Body::empty()) .unwrap(), ) @@ -392,12 +380,10 @@ mod tests { #[tokio::test] async fn test_health_route_fail_checks() { // Create a test server - let port = get_free_tcp_port(); - let addr = format!("127.0.0.1:{port}").parse::().unwrap(); + let addr = format!("127.0.0.1:0").parse::().unwrap(); let peer_id = PeerId::random(); - let notify = Arc::new(Notify::new()); - let cloned_notify = notify.clone(); + let (notify_sender, notify_receiver) = oneshot::channel(); let mut health_registry = HealthCheckRegistry::new(); struct FailHealthCheck {} impl HealthCheck for FailHealthCheck { @@ -414,19 +400,19 @@ mod tests { Some(health_registry), peer_id, test_versions(), - cloned_notify, + notify_sender, ) .await; }); - notify.notified().await; + let http_info = notify_receiver.await.unwrap(); let client = hyper::Client::new(); let response = client .request( Request::builder() - .uri(format!("http://{}/health", addr)) + .uri(format!("http://{}/health", http_info.listen_addr)) .body(Body::empty()) .unwrap(), ) diff --git a/nox/src/lib.rs b/nox/src/lib.rs index 4c259309e6..b2a5a1ce68 100644 --- a/nox/src/lib.rs +++ b/nox/src/lib.rs @@ -47,6 +47,7 @@ mod behaviour { } pub use behaviour::{FluenceNetworkBehaviour, FluenceNetworkBehaviourEvent}; +pub use http::StartedHttp; pub use node::Node; // to be available in benchmarks diff --git a/nox/src/main.rs b/nox/src/main.rs index 57939d1178..e8b8dcd425 100644 --- a/nox/src/main.rs +++ b/nox/src/main.rs @@ -33,6 +33,7 @@ use tokio::signal; use tokio::sync::oneshot; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::EnvFilter; use air_interpreter_fs::write_default_air_interpreter; use aquamarine::{VmConfig, AVM}; @@ -95,6 +96,8 @@ fn main() -> eyre::Result<()> { .expect("Could not make tokio runtime") .block_on(async { tracing_subscriber::registry() + //takes log levels from env variable RUST_LOG + .with(EnvFilter::from_default_env()) .with(log_layer(&config.log)) .with(tokio_console_layer(&config.console)?) .with(tracing_layer(&config.tracing)?) @@ -140,7 +143,7 @@ async fn start_fluence(config: ResolvedConfig) -> eyre::Result { .wrap_err("error create node instance")?; node.listen(listen_addrs).wrap_err("error on listen")?; - let node_exit_outlet = node.start(peer_id).await.wrap_err("node failed to start")?; + let started_node = node.start(peer_id).await.wrap_err("node failed to start")?; struct Fluence { node_exit_outlet: oneshot::Sender<()>, @@ -154,7 +157,9 @@ async fn start_fluence(config: ResolvedConfig) -> eyre::Result { } } - Ok(Fluence { node_exit_outlet }) + Ok(Fluence { + node_exit_outlet: started_node.exit_outlet, + }) } fn vm_config(config: &ResolvedConfig) -> VmConfig { diff --git a/nox/src/node.rs b/nox/src/node.rs index 64473a2ad4..a1cdf8dec7 100644 --- a/nox/src/node.rs +++ b/nox/src/node.rs @@ -50,7 +50,7 @@ use sorcerer::Sorcerer; use spell_event_bus::api::{PeerEvent, SpellEventBusApi, TriggerEvent}; use spell_event_bus::bus::SpellEventBus; use system_services::Deployer; -use tokio::sync::{mpsc, oneshot, Notify}; +use tokio::sync::{mpsc, oneshot}; use tokio::task; use crate::builtins::make_peer_builtin; @@ -60,7 +60,7 @@ use crate::{Connectivity, Versions}; use super::behaviour::FluenceNetworkBehaviour; use crate::behaviour::FluenceNetworkBehaviourEvent; -use crate::http::start_http_endpoint; +use crate::http::{start_http_endpoint, StartedHttp}; // TODO: documentation pub struct Node { @@ -393,6 +393,11 @@ impl Node { } } +pub struct StartedNode { + pub exit_outlet: oneshot::Sender<()>, + pub http_bind_inlet: oneshot::Receiver, +} + impl Node { #[allow(clippy::too_many_arguments)] pub fn with( @@ -451,8 +456,9 @@ impl Node { /// Starts node service #[allow(clippy::boxed_local)] // Mike said it should be boxed - pub async fn start(self: Box, peer_id: PeerId) -> eyre::Result> { + pub async fn start(self: Box, peer_id: PeerId) -> eyre::Result { let (exit_outlet, exit_inlet) = oneshot::channel(); + let (http_bind_outlet, http_bind_inlet) = oneshot::channel(); let particle_stream = self.particle_stream; let effects_stream = self.effects_stream; @@ -476,7 +482,7 @@ impl Node { task::Builder::new().name(&task_name.clone()).spawn(async move { let mut http_server = if let Some(http_listen_addr) = http_listen_addr{ log::info!("Starting http endpoint at {}", http_listen_addr); - start_http_endpoint(http_listen_addr, metrics_registry, health_registry, peer_id, versions, Arc::new(Notify::new())).boxed() + start_http_endpoint(http_listen_addr, metrics_registry, health_registry, peer_id, versions, http_bind_outlet).boxed() } else { futures::future::pending().boxed() }; @@ -532,7 +538,10 @@ impl Node { .map_err(|e| eyre::eyre!("{e}")) .context("running spell event bus failed")?; - Ok(exit_outlet) + Ok(StartedNode { + exit_outlet, + http_bind_inlet, + }) } /// Starts node service listener. @@ -594,7 +603,7 @@ mod tests { let listening_address: Multiaddr = "/ip4/127.0.0.1/tcp/7777".parse().unwrap(); node.listen(vec![listening_address.clone()]).unwrap(); let peer_id = PeerId::random(); - let exit_outlet = node.start(peer_id).await.expect("start node"); + let started_node = node.start(peer_id).await.expect("start node"); let mut client = ConnectedClient::connect_to(listening_address) .await @@ -617,6 +626,6 @@ mod tests { .await .unwrap(); - exit_outlet.send(()).unwrap(); + started_node.exit_outlet.send(()).unwrap(); } }