Skip to content

Commit

Permalink
feat(tests): Wait for healtcheck in nox tests (#1739)
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu committed Aug 3, 2023
1 parent 7a268b2 commit 2c4748c
Show file tree
Hide file tree
Showing 14 changed files with 168 additions and 123 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ fluence-spell-dtos = "=0.5.17"
fluence-spell-distro = "=0.5.17"

# marine
fluence-app-service = {version = "=0.26.4-feat-wasm-backend-interface-update-b3cf050-636-1.0", registry = "fluence"}
fluence-app-service = { version = "=0.26.4-feat-wasm-backend-interface-update-b3cf050-636-1.0", registry = "fluence" }
marine-utils = "0.5.0"
marine-it-parser = "0.12.1"

Expand Down Expand Up @@ -131,6 +131,7 @@ maplit = "1.0.2"
log = "0.4.19"
tracing = { version = "0.1.37", default-features = false, features = ["log"] }
tracing-subscriber = "0.3.17"
console-subscriber = "0.1.10"
futures = "0.3.28"
thiserror = "1.0.44"
serde = "1.0.177"
Expand All @@ -140,6 +141,7 @@ humantime-serde = "1.1.1"
cid = "0.10.1"
libipld = "0.16.0"
axum = "0.6.19"
hyper = "0.14.27"

# Enable a small amount of optimization in debug mode
[profile.dev]
Expand Down
1 change: 1 addition & 0 deletions crates/created-swarm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ serde_json = { workspace = true }
base64 = { workspace = true }
serde = { workspace = true }
maplit = { workspace = true }
hyper = { workspace = true }
89 changes: 58 additions & 31 deletions crates/created-swarm/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
*/

use std::convert::identity;
use std::net::SocketAddr;
use std::{path::PathBuf, time::Duration};

use derivative::Derivative;
use fluence_keypair::KeyPair;
use futures::{stream::iter, FutureExt, StreamExt};
use futures::{FutureExt, StreamExt};
use libp2p::core::multiaddr::Protocol;
use libp2p::{core::Multiaddr, PeerId};
use serde::Deserialize;
Expand All @@ -28,18 +29,21 @@ use air_interpreter_fs::{air_interpreter_path, write_default_air_interpreter};
use aquamarine::{AquaRuntime, VmConfig};
use aquamarine::{AquamarineApi, DataStoreError};
use base64::{engine::general_purpose::STANDARD as base64, Engine};
use connection_pool::{ConnectionPoolApi, ConnectionPoolT};
use fluence_libp2p::random_multiaddr::{create_memory_maddr, create_tcp_maddr};
use fluence_libp2p::Transport;
use fs_utils::{create_dir, make_tmp_dir_peer_id, to_abs_path};
use futures::future::join_all;
use futures::stream::iter;
use hyper::{Body, Request, StatusCode};
use nox::{Connectivity, Node};
use particle_protocol::ProtocolConfig;
use server_config::{default_script_storage_timer_resolution, BootstrapConfig, UnresolvedConfig};
use test_constants::{EXECUTION_TIMEOUT, KEEP_ALIVE_TIMEOUT, TRANSPORT_TIMEOUT};
use tokio::sync::oneshot;
use toy_vms::EasyVM;

const HEALTH_CHECK_POLLING_INTERVAL: Duration = Duration::from_millis(100);

#[allow(clippy::upper_case_acronyms)]
type AVM = aquamarine::AVM<DataStoreError>;

Expand All @@ -54,12 +58,13 @@ pub struct CreatedSwarm {
#[derivative(Debug = "ignore")]
pub management_keypair: KeyPair,
// stop signal
pub outlet: oneshot::Sender<()>,
pub exit_outlet: oneshot::Sender<()>,
// node connectivity
#[derivative(Debug = "ignore")]
pub connectivity: Connectivity,
#[derivative(Debug = "ignore")]
pub aquamarine_api: AquamarineApi,
http_listen_addr: SocketAddr,
}

#[tracing::instrument]
Expand Down Expand Up @@ -148,50 +153,38 @@ where
let nodes = addrs
.iter()
.map(|addr| {
#[rustfmt::skip]
let addrs = addrs.iter().filter(|&a| a != addr).cloned().collect::<Vec<_>>();
let addrs = addrs
.iter()
.filter(|&a| a != addr)
.cloned()
.collect::<Vec<_>>();
let bootstraps = bootstraps(addrs);
let bootstraps_num = bootstraps.len();
let (id, node, m_kp, config) = create_node(bootstraps, addr.clone());
((id, m_kp, config), node, bootstraps_num)
((id, m_kp, config), node)
})
.collect::<Vec<_>>();

let pools = iter(
nodes
.iter()
.map(|(_, n, bootstraps_num)| (n.connectivity.clone(), *bootstraps_num))
.collect::<Vec<_>>(),
);
let connected = pools.for_each_concurrent(None, |(pool, bootstraps_num)| async move {
let pool = AsRef::<ConnectionPoolApi>::as_ref(&pool);
let mut events = pool.lifecycle_events();
loop {
let num = pool.count_connections().await;
if num >= bootstraps_num {
break;
}
// wait until something changes
events.next().await;
}
});

// start all nodes
let infos = join_all(nodes.into_iter().map(
move |((peer_id, management_keypair, config), node, _)| {
move |((peer_id, management_keypair, config), node)| {
let connectivity = node.connectivity.clone();
let aquamarine_api = node.aquamarine_api.clone();
async move {
let outlet = node.start(peer_id).await.expect("node start");
let started_node = node.start(peer_id).await.expect("node start");

let http = started_node
.http_bind_inlet
.await
.expect("Could not bind http");
CreatedSwarm {
peer_id,
multiaddr: config.listen_on,
tmp_dir: config.tmp_dir.unwrap(),
management_keypair,
outlet,
exit_outlet: started_node.exit_outlet,
connectivity,
aquamarine_api,
http_listen_addr: http.listen_addr,
}
}
.boxed()
Expand All @@ -200,12 +193,42 @@ where
.await;

if wait_connected {
connected.await;
let addrs = infos
.iter()
.map(|info| info.http_listen_addr)
.collect::<Vec<_>>();
wait_connected_on_addrs(addrs).await;
}

infos
}

async fn wait_connected_on_addrs(addrs: Vec<SocketAddr>) {
let http_client = &hyper::Client::new();

let healthcheck = iter(addrs).for_each_concurrent(None, |addr| async move {
loop {
let response = http_client
.request(
Request::builder()
.uri(format!("http://{}/health", addr))
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();

if response.status() == StatusCode::OK {
break;
}

tokio::time::sleep(HEALTH_CHECK_POLLING_INTERVAL).await
}
});

healthcheck.await;
}

#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct SwarmConfig {
Expand All @@ -223,6 +246,7 @@ pub struct SwarmConfig {
pub timer_resolution: Duration,
pub allowed_binaries: Vec<String>,
pub enabled_system_services: Vec<server_config::system_services_config::ServiceKey>,
pub http_port: u16,
}

impl SwarmConfig {
Expand All @@ -244,6 +268,7 @@ impl SwarmConfig {
timer_resolution: default_script_storage_timer_resolution(),
allowed_binaries: vec!["/usr/bin/ipfs".to_string(), "/usr/bin/curl".to_string()],
enabled_system_services: vec![],
http_port: 0,
}
}
}
Expand Down Expand Up @@ -306,10 +331,12 @@ pub fn create_swarm_with_runtime<RT: AquaRuntime>(
"generate_on_absence": false,
"value": base64.encode(config.builtins_keypair.to_vec()),
},

"builtins_base_dir": config.builtins_dir,
"external_multiaddresses": [config.listen_on],
"spell_base_dir": Some(config.spell_base_dir.clone().unwrap_or(to_abs_path(PathBuf::from("spell")))),
"http_port": null
"http_port": config.http_port,
"listen_ip": "127.0.0.1"
});

let node_config: UnresolvedConfig =
Expand Down
4 changes: 2 additions & 2 deletions crates/log-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ edition = "2021"
[dependencies]
log = { workspace = true }
tracing = { workspace = true, features = ["async-await", "log"] }
tracing-subscriber.workspace = true
console-subscriber = { version = "0.1.10", features = ["parking_lot"] }
tracing-subscriber = { workspace = true, features = ["parking_lot", "env-filter"] }
console-subscriber = { workspace = true, features = ["parking_lot"] }
tracing-logfmt = "0.3.2"
1 change: 1 addition & 0 deletions crates/nox-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,4 @@ base64 = { workspace = true }

tokio = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
15 changes: 12 additions & 3 deletions crates/nox-tests/tests/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,10 @@ async fn remove_service_restart() {
}

// stop swarm
swarms.into_iter().map(|s| s.outlet.send(())).for_each(drop);
swarms
.into_iter()
.map(|s| s.exit_outlet.send(()))
.for_each(drop);
let swarms = make_swarms_with_keypair(1, kp, None).await;
let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.await
Expand Down Expand Up @@ -1930,7 +1933,10 @@ async fn exec_script_as_admin<'a>(
)
.await;

swarms.into_iter().map(|s| s.outlet.send(())).for_each(drop);
swarms
.into_iter()
.map(|s| s.exit_outlet.send(()))
.for_each(drop);

result
}
Expand Down Expand Up @@ -2036,7 +2042,10 @@ async fn aliases_restart() {
use serde_json::Value::Array;

// stop swarm
swarms.into_iter().map(|s| s.outlet.send(())).for_each(drop);
swarms
.into_iter()
.map(|s| s.exit_outlet.send(()))
.for_each(drop);
let swarms = make_swarms_with_keypair(1, kp, None).await;
let mut client = ConnectedClient::connect_with_keypair(
swarms[0].multiaddr.clone(),
Expand Down
33 changes: 16 additions & 17 deletions crates/nox-tests/tests/network/network_explore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,10 @@ async fn explore_services() {
// N - 1 neighborhood each with N - 1 elements.
let total_neighs = (swarms.len() - 1) * (swarms.len() - 1);

client.send_particle(
format!(
r#"
let args = client
.execute_particle(
format!(
r#"
(seq
(seq
(call relay ("kad" "neighborhood") [relay] neighs_top)
Expand Down Expand Up @@ -294,25 +295,23 @@ async fn explore_services() {
)
)
"#,
join_stream(
"external_addresses",
"relay",
&total_neighs.to_string(),
"joined_addresses",
join_stream(
"external_addresses",
"relay",
&total_neighs.to_string(),
"joined_addresses",
)
)
.as_str(),
hashmap! {
"relay" => json!(client.node.to_string()),
"client" => json!(client.peer_id.to_string()),
},
)
.as_str(),
hashmap! {
"relay" => json!(client.node.to_string()),
"client" => json!(client.peer_id.to_string()),
},
);

let args = client
.receive_args()
.await
.wrap_err("receive args")
.unwrap();

let external_addrs = args.into_iter().next().unwrap();
let external_addrs = external_addrs.as_array().unwrap();
let mut external_addrs = external_addrs
Expand Down
5 changes: 4 additions & 1 deletion crates/nox-tests/tests/spells.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,10 @@ async fn spell_error_handling_test() {

assert_eq!(result.len(), 1);

swarms.into_iter().map(|s| s.outlet.send(())).for_each(drop);
swarms
.into_iter()
.map(|s| s.exit_outlet.send(()))
.for_each(drop);
}

#[tokio::test]
Expand Down
6 changes: 3 additions & 3 deletions nox/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ humantime-serde = { workspace = true }

log = { workspace = true }
tracing-log = { version = "0.1.3" }
console-subscriber = { version = "0.1.10", features = ["parking_lot"] }
console-subscriber = { workspace = true, features = ["parking_lot"] }
axum = { workspace = true, features = ["macros"] }
itertools = { workspace = true }
eyre = { workspace = true }
base64 = { workspace = true }
tracing = { workspace = true, features = ["async-await", "log"] }
tracing-subscriber.workspace = true
tracing-subscriber = { workspace = true, features = ["parking_lot", "env-filter"] }
tracing-logfmt = "0.3.2"
tracing-opentelemetry = "0.19.0"
opentelemetry = { version = "0.19.0", features = ["rt-tokio-current-thread"] }
Expand All @@ -76,7 +76,7 @@ rand = "0.8.5"
bs58 = { workspace = true }
connected-client = { path = "../crates/connected-client" }
log-utils = { workspace = true }
hyper = "0.14.27"
hyper = { workspace = true }

[[bench]]
name = "network_api_bench"
Expand Down
Loading

0 comments on commit 2c4748c

Please sign in to comment.