Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tests): Wait for healtcheck in nox tests #1739

Merged
merged 14 commits into from
Aug 3, 2023
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/created-swarm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ serde_json = { workspace = true }
base64 = { workspace = true }
serde = { workspace = true }
maplit = { workspace = true }
hyper = "0.14.27"
91 changes: 61 additions & 30 deletions crates/created-swarm/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
*/

use std::convert::identity;
use std::net::TcpListener;
use std::{path::PathBuf, time::Duration};

use derivative::Derivative;
use fluence_keypair::KeyPair;
use futures::{stream::iter, FutureExt, StreamExt};
use futures::{FutureExt, StreamExt};
use libp2p::core::multiaddr::Protocol;
use libp2p::{core::Multiaddr, PeerId};
use serde::Deserialize;
Expand All @@ -28,18 +29,22 @@ use air_interpreter_fs::{air_interpreter_path, write_default_air_interpreter};
use aquamarine::{AquaRuntime, VmConfig};
use aquamarine::{AquamarineApi, DataStoreError};
use base64::{engine::general_purpose::STANDARD as base64, Engine};
use connection_pool::{ConnectionPoolApi, ConnectionPoolT};
//use connection_pool::{ConnectionPoolApi, ConnectionPoolT};
folex marked this conversation as resolved.
Show resolved Hide resolved
use fluence_libp2p::random_multiaddr::{create_memory_maddr, create_tcp_maddr};
use fluence_libp2p::Transport;
use fs_utils::{create_dir, make_tmp_dir_peer_id, to_abs_path};
use futures::future::join_all;
use futures::stream::iter;
use hyper::{Body, Request, StatusCode};
use nox::{Connectivity, Node};
use particle_protocol::ProtocolConfig;
use server_config::{default_script_storage_timer_resolution, BootstrapConfig, UnresolvedConfig};
use test_constants::{EXECUTION_TIMEOUT, KEEP_ALIVE_TIMEOUT, TRANSPORT_TIMEOUT};
use tokio::sync::oneshot;
use toy_vms::EasyVM;

const POLLING_INTERVAL: Duration = Duration::from_millis(100);
folex marked this conversation as resolved.
Show resolved Hide resolved

#[allow(clippy::upper_case_acronyms)]
type AVM = aquamarine::AVM<DataStoreError>;

Expand Down Expand Up @@ -140,41 +145,26 @@ pub async fn make_swarms_with<RT: AquaRuntime, F, M, B>(
wait_connected: bool,
) -> Vec<CreatedSwarm>
where
F: FnMut(Vec<Multiaddr>, Multiaddr) -> (PeerId, Box<Node<RT>>, KeyPair, SwarmConfig),
F: FnMut(Vec<Multiaddr>, Multiaddr) -> (PeerId, Box<Node<RT>>, KeyPair, SwarmConfig, u16),
folex marked this conversation as resolved.
Show resolved Hide resolved
M: FnMut() -> Multiaddr,
B: FnMut(Vec<Multiaddr>) -> Vec<Multiaddr>,
{
let addrs = (0..n).map(|_| create_maddr()).collect::<Vec<_>>();
let nodes = addrs
.iter()
.map(|addr| {
#[rustfmt::skip]
let addrs = addrs.iter().filter(|&a| a != addr).cloned().collect::<Vec<_>>();
let addrs = addrs
.iter()
.filter(|&a| a != addr)
.cloned()
.collect::<Vec<_>>();
let bootstraps = bootstraps(addrs);
let bootstraps_num = bootstraps.len();
let (id, node, m_kp, config) = create_node(bootstraps, addr.clone());
((id, m_kp, config), node, bootstraps_num)
let (id, node, m_kp, config, http_port) = create_node(bootstraps, addr.clone());
((id, m_kp, config), node, http_port)
})
.collect::<Vec<_>>();

let pools = iter(
nodes
.iter()
.map(|(_, n, bootstraps_num)| (n.connectivity.clone(), *bootstraps_num))
.collect::<Vec<_>>(),
);
let connected = pools.for_each_concurrent(None, |(pool, bootstraps_num)| async move {
let pool = AsRef::<ConnectionPoolApi>::as_ref(&pool);
let mut events = pool.lifecycle_events();
loop {
let num = pool.count_connections().await;
if num >= bootstraps_num {
break;
}
// wait until something changes
events.next().await;
}
});
folex marked this conversation as resolved.
Show resolved Hide resolved
let http_ports: Vec<u16> = nodes.iter().map(|(_, _, http_port)| *http_port).collect();

// start all nodes
let infos = join_all(nodes.into_iter().map(
Expand All @@ -200,12 +190,38 @@ where
.await;

if wait_connected {
connected.await;
wait_connected_on_ports(http_ports).await;
}

infos
}

async fn wait_connected_on_ports(http_ports: Vec<u16>) {
let http_client = &hyper::Client::new();

let connected = iter(http_ports).for_each_concurrent(None, |http_port| async move {
folex marked this conversation as resolved.
Show resolved Hide resolved
loop {
let response = http_client
.request(
Request::builder()
.uri(format!("http://localhost:{}/health", http_port))
folex marked this conversation as resolved.
Show resolved Hide resolved
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();

if response.status() == StatusCode::OK {
break;
}

tokio::time::sleep(POLLING_INTERVAL).await
}
});

connected.await;
}

#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct SwarmConfig {
Expand Down Expand Up @@ -271,11 +287,21 @@ pub fn aqua_vm_config(
VmConfig::new(peer_id, avm_base_dir, air_interpreter, None)
}

fn get_free_tcp_port() -> u16 {
let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind to port 0");
let socket_addr = listener
.local_addr()
.expect("Failed to retrieve local address");
let port = socket_addr.port();
drop(listener);
port
}

#[tracing::instrument(skip(vm_config))]
pub fn create_swarm_with_runtime<RT: AquaRuntime>(
mut config: SwarmConfig,
vm_config: impl Fn(BaseVmConfig) -> RT::Config,
) -> (PeerId, Box<Node<RT>>, KeyPair, SwarmConfig) {
) -> (PeerId, Box<Node<RT>>, KeyPair, SwarmConfig, u16) {
use serde_json::json;

let format = match &config.keypair {
Expand All @@ -294,6 +320,8 @@ pub fn create_swarm_with_runtime<RT: AquaRuntime>(
let tmp_dir = config.tmp_dir.as_ref().unwrap();
create_dir(tmp_dir).expect("create tmp dir");

let http_port = get_free_tcp_port();
folex marked this conversation as resolved.
Show resolved Hide resolved

let node_config = json!({
"base_dir": tmp_dir.to_string_lossy(),
"root_key_pair": {
Expand All @@ -306,10 +334,12 @@ pub fn create_swarm_with_runtime<RT: AquaRuntime>(
"generate_on_absence": false,
"value": base64.encode(config.builtins_keypair.to_vec()),
},

"builtins_base_dir": config.builtins_dir,
"external_multiaddresses": [config.listen_on],
"spell_base_dir": Some(config.spell_base_dir.clone().unwrap_or(to_abs_path(PathBuf::from("spell")))),
"http_port": null
"http_port": http_port,
folex marked this conversation as resolved.
Show resolved Hide resolved
"listen_ip": "127.0.0.1"
});

let node_config: UnresolvedConfig =
Expand Down Expand Up @@ -358,10 +388,11 @@ pub fn create_swarm_with_runtime<RT: AquaRuntime>(
node,
management_kp,
config,
http_port,
)
}

#[tracing::instrument]
pub fn create_swarm(config: SwarmConfig) -> (PeerId, Box<Node<AVM>>, KeyPair, SwarmConfig) {
pub fn create_swarm(config: SwarmConfig) -> (PeerId, Box<Node<AVM>>, KeyPair, SwarmConfig, u16) {
create_swarm_with_runtime(config, aqua_vm_config)
}
2 changes: 2 additions & 0 deletions crates/nox-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,5 @@ base64 = { workspace = true }

tokio = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
dhat = "0.3.2"
33 changes: 16 additions & 17 deletions crates/nox-tests/tests/network/network_explore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,10 @@ async fn explore_services() {
// N - 1 neighborhood each with N - 1 elements.
let total_neighs = (swarms.len() - 1) * (swarms.len() - 1);

client.send_particle(
format!(
r#"
let args = client
.execute_particle(
format!(
r#"
(seq
(seq
(call relay ("kad" "neighborhood") [relay] neighs_top)
Expand Down Expand Up @@ -294,25 +295,23 @@ async fn explore_services() {
)
)
"#,
join_stream(
"external_addresses",
"relay",
&total_neighs.to_string(),
"joined_addresses",
join_stream(
"external_addresses",
"relay",
&total_neighs.to_string(),
"joined_addresses",
)
)
.as_str(),
hashmap! {
"relay" => json!(client.node.to_string()),
"client" => json!(client.peer_id.to_string()),
},
)
.as_str(),
hashmap! {
"relay" => json!(client.node.to_string()),
"client" => json!(client.peer_id.to_string()),
},
);

let args = client
.receive_args()
.await
.wrap_err("receive args")
.unwrap();

let external_addrs = args.into_iter().next().unwrap();
let external_addrs = external_addrs.as_array().unwrap();
let mut external_addrs = external_addrs
Expand Down
2 changes: 2 additions & 0 deletions nox/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use tokio::signal;
use tokio::sync::oneshot;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;

use air_interpreter_fs::write_default_air_interpreter;
use aquamarine::{VmConfig, AVM};
Expand Down Expand Up @@ -95,6 +96,7 @@ fn main() -> eyre::Result<()> {
.expect("Could not make tokio runtime")
.block_on(async {
tracing_subscriber::registry()
.with(EnvFilter::from_default_env())
folex marked this conversation as resolved.
Show resolved Hide resolved
.with(log_layer(&config.log))
.with(tokio_console_layer(&config.console)?)
.with(tracing_layer(&config.tracing)?)
Expand Down