From 5999ea54eff63308a28ad40c77f3c9f3067ad711 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 1 Nov 2023 13:38:42 +0300 Subject: [PATCH] chore: Fix tests thread count (#1868) --- .config/nextest.toml | 1 - Cargo.lock | 1 + .../connected-client/src/connected_client.rs | 64 ++-- crates/local-vm/Cargo.toml | 1 + crates/local-vm/src/local_vm.rs | 7 +- crates/nox-tests/tests/builtin.rs | 346 ++++++++++-------- crates/nox-tests/tests/local_vm.rs | 8 +- .../nox-tests/tests/network/loop_topology.rs | 120 +++--- .../tests/network/network_explore.rs | 57 +-- crates/nox-tests/tests/services.rs | 4 +- crates/nox-tests/tests/spells.rs | 144 ++++---- crates/nox-tests/tests/tetraplets.rs | 2 +- crates/nox-tests/tests/topology.rs | 45 ++- crates/nox-tests/tests/vault.rs | 72 ++-- 14 files changed, 475 insertions(+), 397 deletions(-) diff --git a/.config/nextest.toml b/.config/nextest.toml index c490634380..58b969139c 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -22,7 +22,6 @@ retries = { backoff = "exponential", count = 3, delay = "5s", jitter = true } [profile.ci] fail-fast = false -test-threads = 10 [profile.ci.junit] path = "junit.xml" diff --git a/Cargo.lock b/Cargo.lock index ec3bea9018..2a4e643eb3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3727,6 +3727,7 @@ dependencies = [ "particle-execution", "particle-protocol", "serde_json", + "tokio", "tracing", "uuid-utils", ] diff --git a/crates/connected-client/src/connected_client.rs b/crates/connected-client/src/connected_client.rs index d75013f0a0..8f909a1d8c 100644 --- a/crates/connected-client/src/connected_client.rs +++ b/crates/connected-client/src/connected_client.rs @@ -15,7 +15,7 @@ */ use core::ops::Deref; -use std::{cell::LazyCell, collections::HashMap, ops::DerefMut, time::Duration}; +use std::{collections::HashMap, ops::DerefMut, time::Duration}; use eyre::Result; use eyre::{bail, eyre, WrapErr}; @@ -23,7 +23,6 @@ use fluence_keypair::KeyPair; use fluence_libp2p::Transport; use libp2p::{core::Multiaddr, PeerId}; use local_vm::{make_particle, make_vm, read_args, DataStoreError}; -use parking_lot::Mutex; use particle_protocol::Particle; use serde_json::{Value as JValue, Value}; use test_constants::{KAD_TIMEOUT, PARTICLE_TTL, SHORT_TIMEOUT, TIMEOUT, TRANSPORT_TIMEOUT}; @@ -41,7 +40,7 @@ pub struct ConnectedClient { pub timeout: Duration, pub short_timeout: Duration, pub kad_timeout: Duration, - pub local_vm: LazyCell, Box Mutex>>, + pub local_vm: tokio::sync::OnceCell>, pub particle_ttl: Duration, } @@ -142,16 +141,19 @@ impl ConnectedClient { Ok(result) } + pub async fn get_local_vm(&self) -> &tokio::sync::Mutex { + let peer_id = self.client.peer_id; + self.local_vm + .get_or_init(|| async { tokio::sync::Mutex::new(make_vm(peer_id)) }) + .await + } pub fn new( client: Client, node: PeerId, node_address: Multiaddr, particle_ttl: Option, ) -> Self { - let peer_id = client.peer_id; - let f: Box Mutex> = Box::new(move || Mutex::new(make_vm(peer_id))); - let local_vm = LazyCell::new(f); - + let local_vm = tokio::sync::OnceCell::const_new(); Self { client, node, @@ -168,12 +170,12 @@ impl ConnectedClient { self.client.send(particle, self.node) } - pub fn send_particle( + pub async fn send_particle( &mut self, script: impl Into, data: HashMap<&str, JValue>, ) -> String { - self.send_particle_ext(script, data, false) + self.send_particle_ext(script, data, false).await } pub async fn execute_particle( @@ -181,11 +183,11 @@ impl ConnectedClient { script: impl Into, data: HashMap<&str, JValue>, ) -> Result> { - let particle_id = self.send_particle_ext(script, data, false); + let particle_id = self.send_particle_ext(script, data, false).await; self.wait_particle_args(particle_id.clone()).await } - pub fn send_particle_ext( + pub async fn send_particle_ext( &mut self, script: impl Into, data: HashMap<&str, JValue>, @@ -195,16 +197,18 @@ impl ConnectedClient { .into_iter() .map(|(key, value)| (key.to_string(), value)) .collect(); + let mut guard = self.get_local_vm().await.lock().await; let particle = make_particle( self.peer_id, &data, script.into(), self.node, - &mut self.local_vm.lock(), + &mut guard, generated, self.particle_ttl(), &self.key_pair, - ); + ) + .await; let id = particle.id.clone(); self.send(particle); id @@ -247,12 +251,8 @@ impl ConnectedClient { pub async fn receive_args(&mut self) -> Result> { let particle = self.receive().await.wrap_err("receive_args")?; - let result = read_args( - particle, - self.peer_id, - &mut self.local_vm.lock(), - &self.key_pair, - ); + let mut guard = self.get_local_vm().await.lock().await; + let result = read_args(particle, self.peer_id, &mut guard, &self.key_pair).await; match result { Some(result) => result.map_err(|args| eyre!("AIR caught an error: {:?}", args)), None => Err(eyre!("Received a particle, but it didn't return anything")), @@ -272,12 +272,9 @@ impl ConnectedClient { match head { Some(index) => { let particle = self.fetched.remove(index); - let result = read_args( - particle, - self.peer_id, - &mut self.local_vm.lock(), - &self.key_pair, - ); + let mut guard = self.get_local_vm().await.lock().await; + let result = read_args(particle, self.peer_id, &mut guard, &self.key_pair).await; + drop(guard); if let Some(result) = result { result.map_err(|args| eyre!("AIR caught an error: {:?}", args)) } else { @@ -298,12 +295,9 @@ impl ConnectedClient { let particle = self.raw_receive().await.ok(); if let Some(particle) = particle { if particle.id == particle_id.as_ref() { - let result = read_args( - particle, - self.peer_id, - &mut self.local_vm.lock(), - &self.key_pair, - ); + let mut guard = self.get_local_vm().await.lock().await; + let result = + read_args(particle, self.peer_id, &mut guard, &self.key_pair).await; if let Some(result) = result { break result.map_err(|args| eyre!("AIR caught an error: {:?}", args)); } @@ -327,12 +321,8 @@ impl ConnectedClient { let particle = self.receive().await.ok(); if let Some(particle) = particle { - let args = read_args( - particle, - self.peer_id, - &mut self.local_vm.lock(), - &self.key_pair, - ); + let mut guard = self.get_local_vm().await.lock().await; + let args = read_args(particle, self.peer_id, &mut guard, &self.key_pair).await; if let Some(args) = args { return f(args); } diff --git a/crates/local-vm/Cargo.toml b/crates/local-vm/Cargo.toml index 0a344036a3..c225c9b869 100644 --- a/crates/local-vm/Cargo.toml +++ b/crates/local-vm/Cargo.toml @@ -28,3 +28,4 @@ parking_lot = { workspace = true } maplit = { workspace = true } log = { workspace = true } tracing = { workspace = true } +tokio = { workspace = true } diff --git a/crates/local-vm/src/local_vm.rs b/crates/local-vm/src/local_vm.rs index 584ab1143f..9d889ad9ed 100644 --- a/crates/local-vm/src/local_vm.rs +++ b/crates/local-vm/src/local_vm.rs @@ -247,7 +247,7 @@ pub fn wrap_script( } #[allow(clippy::too_many_arguments)] -pub fn make_particle( +pub async fn make_particle( peer_id: PeerId, service_in: &HashMap, script: String, @@ -299,6 +299,7 @@ pub fn make_particle( let result = host_call(service_in, args); call_results.insert(id, result.0); } + tokio::task::yield_now().await; } tracing::info!(particle_id = id, "Made a particle"); @@ -318,7 +319,7 @@ pub fn make_particle( particle } -pub fn read_args( +pub async fn read_args( particle: Particle, peer_id: PeerId, local_vm: &mut AVM, @@ -347,7 +348,6 @@ pub fn read_args( key_pair, ) .expect("execute & make particle"); - particle_data = data; call_results = <_>::default(); @@ -363,5 +363,6 @@ pub fn read_args( return returned; } } + tokio::task::yield_now().await; } } diff --git a/crates/nox-tests/tests/builtin.rs b/crates/nox-tests/tests/builtin.rs index e7ae077f90..67ee94f9ce 100644 --- a/crates/nox-tests/tests/builtin.rs +++ b/crates/nox-tests/tests/builtin.rs @@ -60,18 +60,20 @@ async fn identify() { .wrap_err("connect client") .unwrap(); - client.send_particle( - r#" + client + .send_particle( + r#" (seq (call relay ("peer" "identify") [] info) (call client ("op" "return") [info]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "client" => json!(client.peer_id.to_string()), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "client" => json!(client.peer_id.to_string()), + }, + ) + .await; let info = client .receive_args() @@ -119,8 +121,9 @@ async fn remove_service() { ) .await; - client.send_particle( - r#" + client + .send_particle( + r#" (seq (seq (call relay ("srv" "list") [] list_before) @@ -132,11 +135,12 @@ async fn remove_service() { ) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + }, + ) + .await; use serde_json::Value::Array; @@ -165,8 +169,9 @@ async fn remove_service_restart() { ) .await; - client.send_particle( - r#" + client + .send_particle( + r#" (seq (seq (call relay ("srv" "list") [] list_before) @@ -178,11 +183,12 @@ async fn remove_service_restart() { ) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + }, + ) + .await; use serde_json::Value::Array; @@ -204,18 +210,20 @@ async fn remove_service_restart() { .wrap_err("connect client") .unwrap(); - client.send_particle( - r#" + client + .send_particle( + r#" (seq (call relay ("srv" "list") [] list_after) (call %init_peer_id% ("op" "return") [list_after]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + }, + ) + .await; if let [Array(after)] = client.receive_args().await.unwrap().as_slice() { assert_eq!(after.len(), 0); @@ -243,8 +251,9 @@ async fn remove_service_by_alias() { ) .await; - client.send_particle( - r#" + client + .send_particle( + r#" (seq (seq (call relay ("srv" "add_alias") [alias service]) @@ -259,12 +268,13 @@ async fn remove_service_by_alias() { ) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - "alias" => json!("some_alias".to_string()), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + "alias" => json!("some_alias".to_string()), + }, + ) + .await; use serde_json::Value::Array; @@ -297,8 +307,9 @@ async fn non_owner_remove_service() { ) .await; - client2.send_particle( - r#" + let args = client2 + .execute_particle( + r#" (seq (seq (call relay ("srv" "list") [] list_before) @@ -313,15 +324,16 @@ async fn non_owner_remove_service() { ) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + }, + ) + .await + .unwrap(); use serde_json::Value::{Array, String}; - let args = client2.receive_args().await.unwrap(); if let [Array(before), Array(after), String(error)] = args.as_slice() { assert_eq!(before.len(), 1); assert_eq!(after.len(), 1); @@ -352,8 +364,9 @@ async fn resolve_alias() { ) .await; - client.send_particle( - r#" + client + .send_particle( + r#" (seq (seq (call relay ("srv" "add_alias") [alias service]) @@ -362,12 +375,13 @@ async fn resolve_alias() { (call %init_peer_id% ("op" "return") [result]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - "alias" => json!("some_alias".to_string()), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + "alias" => json!("some_alias".to_string()), + }, + ) + .await; let service_id = client .receive_args() @@ -389,8 +403,9 @@ async fn resolve_alias_not_exists() { .wrap_err("connect client") .unwrap(); - client.send_particle( - r#" + client + .send_particle( + r#" (xor (seq (call relay ("srv" "resolve_alias") [alias] result) @@ -399,11 +414,12 @@ async fn resolve_alias_not_exists() { (call %init_peer_id% ("op" "return") [%last_error%.$.instruction]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "alias" => json!("some_alias".to_string()), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "alias" => json!("some_alias".to_string()), + }, + ) + .await; let error = client .receive_args() @@ -437,8 +453,9 @@ async fn resolve_alias_opt() { ) .await; - client.send_particle( - r#" + client + .send_particle( + r#" (seq (seq (call relay ("srv" "add_alias") [alias service]) @@ -447,12 +464,13 @@ async fn resolve_alias_opt() { (call %init_peer_id% ("op" "return") [result.$.[0]!]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - "alias" => json!("some_alias".to_string()), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + "alias" => json!("some_alias".to_string()), + }, + ) + .await; let service_id = client .receive_args() @@ -474,8 +492,9 @@ async fn resolve_alias_opt_not_exists() { .wrap_err("connect client") .unwrap(); - client.send_particle( - r#" + client + .send_particle( + r#" (xor (seq (call relay ("srv" "resolve_alias_opt") [alias] result) @@ -484,11 +503,12 @@ async fn resolve_alias_opt_not_exists() { (call %init_peer_id% ("op" "return") [%last_error%.$.instruction]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "alias" => json!("some_alias".to_string()), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "alias" => json!("some_alias".to_string()), + }, + ) + .await; let result = client .receive_args() @@ -520,8 +540,9 @@ async fn resolve_alias_removed() { ) .await; - client.send_particle( - r#" + client + .send_particle( + r#" (xor (seq (seq @@ -533,12 +554,13 @@ async fn resolve_alias_removed() { (call %init_peer_id% ("op" "return") [%last_error%.$.instruction]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - "alias" => json!("some_alias".to_string()), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + "alias" => json!("some_alias".to_string()), + }, + ) + .await; let error = client .receive_args() @@ -562,18 +584,20 @@ async fn timestamp_ms() { .wrap_err("connect client") .unwrap(); - client.send_particle( - r#" + client + .send_particle( + r#" (seq (call relay ("peer" "timestamp_ms") [] result) (call client ("op" "return") [result]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "client" => json!(client.peer_id.to_string()), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "client" => json!(client.peer_id.to_string()), + }, + ) + .await; let result = client .receive_args() @@ -593,18 +617,20 @@ async fn timestamp_sec() { .wrap_err("connect client") .unwrap(); - client.send_particle( - r#" + client + .send_particle( + r#" (seq (call relay ("peer" "timestamp_sec") [] result) (call client ("op" "return") [result]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "client" => json!(client.peer_id.to_string()), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "client" => json!(client.peer_id.to_string()), + }, + ) + .await; let result = client .receive_args() @@ -1384,18 +1410,20 @@ async fn service_mem() { ) .await; - client.send_particle( - r#" + client + .send_particle( + r#" (seq (call relay ("stat" "service_memory") [service] memory_stat) (call %init_peer_id% ("op" "return") [memory_stat]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + }, + ) + .await; use serde_json::Value::Array; @@ -1422,8 +1450,9 @@ async fn service_stats() { ) .await; - let particle_id = client.send_particle( - r#" + let particle_id = client + .send_particle( + r#" (seq (seq (call relay (service "not") [true] result) @@ -1435,30 +1464,33 @@ async fn service_stats() { (call %init_peer_id% ("op" "return") []) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - "key" => json!("keeeyyy"), - "bigstring" => json!("a".repeat(100_000)), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + "key" => json!("keeeyyy"), + "bigstring" => json!("a".repeat(100_000)), + }, + ) + .await; client .wait_particle_args(particle_id) .await .expect("receive particle"); - client.send_particle( - r#" + client + .send_particle( + r#" (seq (call relay ("stat" "service_stat") [service] stat) (call %init_peer_id% ("op" "return") [stat]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + }, + ) + .await; if let Ok([result]) = client.receive_args().await.as_deref() { assert_eq!(result.get("error"), Some(&json!(""))); @@ -1522,18 +1554,20 @@ async fn service_stats_uninitialized() { ) .await; - client.send_particle( - r#" + client + .send_particle( + r#" (seq (call relay ("stat" "service_stat") [service] stat) (call %init_peer_id% ("op" "return") [stat]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + }, + ) + .await; use serde_json::Value::Object; @@ -1675,7 +1709,7 @@ async fn sign_invalid_tetraplets() { "wrong_peer" => json!(wrong_peer), "array" => json!(vec![1u8, 2u8, 3u8]) }, - ); + ).await; use serde_json::Value::String; @@ -1724,7 +1758,7 @@ async fn sig_verify_invalid_signature() { "invalid_signature" => json!(vec![1u8, 2u8, 3u8]), "invalid_data" => json!(vec![3u8, 2u8, 1u8]) }, - ); + ).await; use serde_json::Value::Bool; @@ -1910,7 +1944,7 @@ async fn insecure_sign_verify() { hashmap! { "relay" => json!(client.node.to_string()), }, - ); + ).await; use serde_json::Value::Array; use serde_json::Value::Bool; @@ -2043,8 +2077,9 @@ async fn add_alias_list() { .await; let alias = "tetraplets".to_string(); - client.send_particle( - r#" + client + .send_particle( + r#" (seq (seq (call relay ("srv" "add_alias") [alias service]) @@ -2053,12 +2088,13 @@ async fn add_alias_list() { (call %init_peer_id% ("op" "return") [list]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - "alias" => json!(alias.clone()) - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + "alias" => json!(alias.clone()) + }, + ) + .await; use serde_json::Value::Array; @@ -2101,8 +2137,9 @@ async fn aliases_restart() { .await; let alias = "tetraplets".to_string(); - client.send_particle( - r#" + client + .send_particle( + r#" (xor (seq (call relay ("srv" "add_alias") [alias service]) @@ -2111,12 +2148,13 @@ async fn aliases_restart() { (call %init_peer_id% ("op" "return") [%last_error%.$.instruction]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - "alias" => json!(alias.clone()) - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + "alias" => json!(alias.clone()) + }, + ) + .await; if let [JValue::String(result)] = client.receive_args().await.unwrap().as_slice() { assert_eq!(*result, "ok"); @@ -2138,18 +2176,20 @@ async fn aliases_restart() { .wrap_err("connect client") .unwrap(); - client.send_particle( - r#" + client + .send_particle( + r#" (seq (call relay ("srv" "list") [] list_after) (call %init_peer_id% ("op" "return") [list_after]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + }, + ) + .await; if let [Array(after)] = client.receive_args().await.unwrap().as_slice() { assert_eq!(after.len(), 1); @@ -2243,7 +2283,7 @@ async fn subnet_resolve() { hashmap! { "relay" => json!(client.node.to_string()), }, - ); + ).await; let mut result = client.receive_args().await.unwrap(); diff --git a/crates/nox-tests/tests/local_vm.rs b/crates/nox-tests/tests/local_vm.rs index d75f3eb9cb..20c050d4e2 100644 --- a/crates/nox-tests/tests/local_vm.rs +++ b/crates/nox-tests/tests/local_vm.rs @@ -26,8 +26,8 @@ use serde_json::json; use local_vm::{make_particle, make_vm, read_args}; -#[test] -fn make() { +#[tokio::test] +async fn make() { let keypair_a = KeyPair::generate_ed25519(); let keypair_b = KeyPair::generate_ed25519(); let client_a = keypair_a.get_peer_id(); @@ -58,9 +58,11 @@ fn make() { false, Duration::from_secs(20), &keypair_a, - ); + ) + .await; let args = read_args(particle, client_b, &mut local_vm_b, &keypair_b) + .await .expect("read args") .expect("read args"); assert_eq!(data["a"], args[0]); diff --git a/crates/nox-tests/tests/network/loop_topology.rs b/crates/nox-tests/tests/network/loop_topology.rs index f41df33cf9..f607632fc0 100644 --- a/crates/nox-tests/tests/network/loop_topology.rs +++ b/crates/nox-tests/tests/network/loop_topology.rs @@ -69,14 +69,16 @@ async fn abuse_fold(air: &str) -> Abuse { println!("elems {}", json!(elems)); - client.send_particle( - air, - hashmap! { - "relay" => json!(client.node.to_string()), - "client" => json!(client.peer_id.to_string()), - "permutations" => json!(elems), - }, - ); + client + .send_particle( + air, + hashmap! { + "relay" => json!(client.node.to_string()), + "client" => json!(client.peer_id.to_string()), + "permutations" => json!(elems), + }, + ) + .await; client.timeout = Duration::from_secs(1); @@ -286,9 +288,10 @@ async fn fold_par_same_node_stream() { client.timeout = Duration::from_secs(200); client.particle_ttl = Duration::from_secs(400); - client.send_particle( - format!( - r#" + client + .send_particle( + format!( + r#" (seq (seq (null) @@ -332,16 +335,17 @@ async fn fold_par_same_node_stream() { ) ) "#, - join_stream("result", "relay", "flat_length", "joined_result") + join_stream("result", "relay", "flat_length", "joined_result") + ) + .as_str(), + hashmap! { + "relay" => json!(client.node.to_string()), + "client" => json!(client.peer_id.to_string()), + "permutations" => json!(permutations), + "flat_length" => json!(flat.len()) + }, ) - .as_str(), - hashmap! { - "relay" => json!(client.node.to_string()), - "client" => json!(client.peer_id.to_string()), - "permutations" => json!(permutations), - "flat_length" => json!(flat.len()) - }, - ); + .await; let mut args = client .receive_args() @@ -383,8 +387,9 @@ async fn fold_fold_seq_join() { let flat: Vec<_> = array.iter().flatten().copied().collect(); - client.send_particle( - r#" + client + .send_particle( + r#" (seq (seq (fold array chars @@ -430,12 +435,13 @@ async fn fold_fold_seq_join() { ) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "array" => json!(array), - "flat_length" => json!(flat.len()) - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "array" => json!(array), + "flat_length" => json!(flat.len()) + }, + ) + .await; let mut args = client.receive_args().await.expect("receive args"); let can = args.remove(0); @@ -525,7 +531,7 @@ async fn fold_fold_pairs_seq_join() { "array" => json!(array), "flat_length" => json!(flat.len()) }, - ); + ).await; let mut args = client.receive_args().await.expect("receive args"); let can = args.remove(0); @@ -548,8 +554,9 @@ async fn fold_seq_join() { let array: Vec<_> = (1..10).collect(); - client.send_particle( - r#" + client + .send_particle( + r#" (seq (seq (fold array e @@ -576,12 +583,13 @@ async fn fold_seq_join() { ) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "array" => json!(array), - "array_length" => json!(array.len()) - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "array" => json!(array), + "array_length" => json!(array.len()) + }, + ) + .await; let arg = client.receive_args().await.expect("receive args").remove(0); let can: Vec = serde_json::from_value(arg).unwrap(); @@ -728,7 +736,7 @@ async fn fold_null_seq_same_node_stream() { "permutations" => json!(permutations), "flat_length" => json!(flat.len()) }, - ); + ).await; let mut args = client .receive_args() @@ -760,8 +768,9 @@ async fn fold_via() { .wrap_err("connect client") .unwrap(); - client.send_particle_ext( - r#" + client + .send_particle_ext( + r#" (xor (seq (seq @@ -821,13 +830,14 @@ async fn fold_via() { (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3]) ) "#, - hashmap! { - "-relay-" => json!(client.node.to_base58()), - "node_id" => json!(client.node.to_base58()), - "viaAr" => json!(swarms.iter().map(|s| s.peer_id.to_string()).collect::>()), - }, - true, - ); + hashmap! { + "-relay-" => json!(client.node.to_base58()), + "node_id" => json!(client.node.to_base58()), + "viaAr" => json!(swarms.iter().map(|s| s.peer_id.to_string()).collect::>()), + }, + true, + ) + .await; client.receive().await.unwrap(); } @@ -841,8 +851,9 @@ async fn join_empty_stream() { .wrap_err("connect client") .unwrap(); - client.send_particle( - r#" + client + .send_particle( + r#" (seq (xor (call relay ("op" "noop") []) @@ -854,11 +865,12 @@ async fn join_empty_stream() { ) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "nodes" => json!(swarms.iter().map(|s| s.peer_id.to_base58()).collect::>()), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "nodes" => json!(swarms.iter().map(|s| s.peer_id.to_base58()).collect::>()), + }, + ) + .await; let err = client.receive_args().await.expect_err("receive error"); assert_eq!( diff --git a/crates/nox-tests/tests/network/network_explore.rs b/crates/nox-tests/tests/network/network_explore.rs index 442769900e..4d1f5780c8 100644 --- a/crates/nox-tests/tests/network/network_explore.rs +++ b/crates/nox-tests/tests/network/network_explore.rs @@ -85,8 +85,9 @@ async fn get_interfaces() { ) .await; - client.send_particle( - r#" + client + .send_particle( + r#" (seq (seq (call relay ("srv" "list") [] services) @@ -103,11 +104,12 @@ async fn get_interfaces() { ) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "client" => json!(client.peer_id.to_string()), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "client" => json!(client.peer_id.to_string()), + }, + ) + .await; let args = client .receive_args() @@ -163,7 +165,7 @@ async fn get_modules() { "relay" => json!(client.node.to_string()), "client" => json!(client.peer_id.to_string()), }, - ); + ).await; let value = client .receive_args() @@ -195,8 +197,9 @@ async fn list_blueprints() { let bytes = b"module"; let module_hash = Hash::new(bytes).unwrap().to_string(); - client.send_particle( - r#" + client + .send_particle( + r#" (seq (call relay ("dist" "add_module") [module_bytes module_config] module_hash) (seq @@ -211,15 +214,16 @@ async fn list_blueprints() { ) ) "#, - hashmap! { - "module_bytes" => json!(base64.encode(bytes)), - "module_config" => json!(module_config("module")), - "relay" => json!(client.node.to_string()), - "client" => json!(client.peer_id.to_string()), - "name" => json!("blueprint"), - "dependencies" => json!(vec![module_hash.clone()]) , - }, - ); + hashmap! { + "module_bytes" => json!(base64.encode(bytes)), + "module_config" => json!(module_config("module")), + "relay" => json!(client.node.to_string()), + "client" => json!(client.peer_id.to_string()), + "name" => json!("blueprint"), + "dependencies" => json!(vec![module_hash.clone()]) , + }, + ) + .await; let args = client .receive_args() @@ -389,7 +393,7 @@ async fn explore_services_fixed_flaky() { "relayId" => json!(client.node.to_string()), }; - client.send_particle(script, data); + client.send_particle(script, data).await; let now = Instant::now(); let tout = Duration::from_secs(10); @@ -400,14 +404,11 @@ async fn explore_services_fixed_flaky() { if let Ok(Some(event)) = timeout(Duration::from_secs(1), receive_task).await { match event { ClientEvent::Particle { particle, .. } => { - let args = read_args( - particle, - client.peer_id, - &mut client.local_vm.lock(), - &client.key_pair, - ) - .expect("read args") - .expect("no error"); + let mut guard = client.get_local_vm().await.lock().await; + let args = read_args(particle, client.peer_id, &mut guard, &client.key_pair) + .await + .expect("read args") + .expect("no error"); received.push(args); } ClientEvent::NewConnection { .. } => {} diff --git a/crates/nox-tests/tests/services.rs b/crates/nox-tests/tests/services.rs index 23e16caca6..8921fef7ca 100644 --- a/crates/nox-tests/tests/services.rs +++ b/crates/nox-tests/tests/services.rs @@ -269,7 +269,7 @@ async fn create_service_from_config() { "config" => config, "module_bytes" => json!(base64.encode(module)), }; - client.send_particle_ext(script, data, true); + client.send_particle_ext(script, data, true).await; let result = client.receive_args().await.expect("receive"); if let [JValue::String(service_id)] = &result[..] { let result = client @@ -436,7 +436,7 @@ async fn handle_same_dir_in_preopens_and_mapped_dirs() { "config" => config, "module_bytes" => json!(base64.encode(module)), }; - client.send_particle_ext(script, data, true); + client.send_particle_ext(script, data, true).await; let result = client.receive_args().await; if result.is_ok() { panic!("expected error for module with invalid config") diff --git a/crates/nox-tests/tests/spells.rs b/crates/nox-tests/tests/spells.rs index 4e875e4ab4..85f853b301 100644 --- a/crates/nox-tests/tests/spells.rs +++ b/crates/nox-tests/tests/spells.rs @@ -1389,13 +1389,15 @@ async fn resolve_global_alias() { ) .await; - client.send_particle( - r#"(call relay ("srv" "add_alias") ["alias" service])"#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - }, - ); + client + .send_particle( + r#"(call relay ("srv" "add_alias") ["alias" service])"#, + hashmap! { + "relay" => json!(client.node.to_string()), + "service" => json!(tetraplets_service.id), + }, + ) + .await; let script = format!( r#" @@ -1518,8 +1520,9 @@ async fn spell_create_worker_twice() { "client" => json!(client.peer_id.to_string()), "relay" => json!(client.node.to_string()), }; - client.send_particle( - r#" + client + .send_particle( + r#" (xor (seq (seq @@ -1533,8 +1536,9 @@ async fn spell_create_worker_twice() { ) (call client ("return" "") [%last_error%.$.message worker_peer_id get_worker_peer_id]) )"#, - data.clone(), - ); + data.clone(), + ) + .await; let response = client.receive_args().await.wrap_err("receive").unwrap(); let error_msg = response[0].as_str().unwrap().to_string(); @@ -1669,8 +1673,9 @@ async fn create_remove_worker() { "spell_id" => json!(spell_id.clone()), "srv_id" => json!(service.id.clone()), }; - client.send_particle( - r#" + client + .send_particle( + r#" (xor (seq (seq @@ -1697,8 +1702,9 @@ async fn create_remove_worker() { (call client ("return" "") [%last_error%.$.message]) ) "#, - data.clone(), - ); + data.clone(), + ) + .await; if let [JValue::Array(before), JValue::String(spell_err), JValue::String(srv_err)] = client .receive_args() @@ -1754,8 +1760,9 @@ async fn spell_update_trigger_by_alias() { new_config.connections.connect = true; new_config.connections.disconnect = true; - let id = client.send_particle( - r#"(seq + let id = client + .send_particle( + r#"(seq (seq (call relay ("op" "noop") []) (call worker ("srv" "add_alias") ["alias" spell_id]) @@ -1765,13 +1772,14 @@ async fn spell_update_trigger_by_alias() { (call %init_peer_id% ("return" "") ["ok"]) ) )"#, - hashmap! { - "relay" => json!(client.node.to_string()), - "worker" => json!(worker), - "spell_id" => json!(spell_id.clone()), - "config" => json!(new_config) - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "worker" => json!(worker), + "spell_id" => json!(spell_id.clone()), + "config" => json!(new_config) + }, + ) + .await; client.wait_particle_args(id).await.unwrap(); @@ -1820,16 +1828,18 @@ async fn test_worker_list() { let worker_id1 = create_worker(&mut client, Some("deal_id1".to_string())).await; let worker_id2 = create_worker(&mut client, None).await; - client.send_particle( - r#"(seq + client + .send_particle( + r#"(seq (call relay ("worker" "list") [] result) (call client ("return" "") [result]) )"#, - hashmap! { - "relay" => json!(client.node.to_string()), - "client" => json!(client.peer_id.to_string()) - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "client" => json!(client.peer_id.to_string()) + }, + ) + .await; if let [JValue::Array(workers)] = client .receive_args() @@ -1889,8 +1899,9 @@ async fn test_spell_list() { ) .await; - client.send_particle( - r#"(seq + client + .send_particle( + r#"(seq (seq (call relay ("op" "noop") []) (seq @@ -1900,13 +1911,14 @@ async fn test_spell_list() { ) (call client ("return" "") [worker1_spells worker2_spells]) )"#, - hashmap! { - "relay" => json!(client.node.to_string()), - "client" => json!(client.peer_id.to_string()), - "worker1" => json!(worker_id1), - "worker2" => json!(worker_id2), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "client" => json!(client.peer_id.to_string()), + "worker1" => json!(worker_id1), + "worker2" => json!(worker_id2), + }, + ) + .await; if let [JValue::Array(worker1_spells), JValue::Array(worker2_spells)] = client .receive_args() @@ -2053,8 +2065,9 @@ async fn set_alias_by_worker_creator() { ) .await; - client.send_particle( - r#"(seq + client + .send_particle( + r#"(seq (seq (call relay ("op" "noop") []) (call worker ("srv" "add_alias") ["alias" service]) @@ -2064,13 +2077,14 @@ async fn set_alias_by_worker_creator() { (call client ("return" "") [resolved.$.[0]!]) ) )"#, - hashmap! { - "relay" => json!(client.node.to_string()), - "client" => json!(client.peer_id.to_string()), - "service" => json!(tetraplets_service.id), - "worker" => json!(worker_id), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "client" => json!(client.peer_id.to_string()), + "service" => json!(tetraplets_service.id), + "worker" => json!(worker_id), + }, + ) + .await; if let [JValue::String(resolved)] = client .receive_args() @@ -2111,19 +2125,21 @@ async fn test_decider_api_endpoint_rewrite() { .wrap_err("connect client") .unwrap(); - client.send_particle( - r#"(seq + client + .send_particle( + r#"(seq (call relay ("decider" "get_string") ["chain"] chain_info_str) (seq (call relay ("json" "parse") [chain_info_str.$.str] chain_info) (call client ("return" "") [chain_info.$.api_endpoint]) ) )"#, - hashmap! { - "relay" => json!(client.node.to_string()), - "client" => json!(client.peer_id.to_string()), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "client" => json!(client.peer_id.to_string()), + }, + ) + .await; if let [JValue::String(endpoint)] = client .receive_args() @@ -2165,19 +2181,21 @@ async fn test_decider_api_endpoint_rewrite() { .wrap_err("connect client") .unwrap(); - client.send_particle( - r#"(seq + client + .send_particle( + r#"(seq (call relay ("decider" "get_string") ["chain"] chain_info_str) (seq (call relay ("json" "parse") [chain_info_str.$.str] chain_info) (call client ("return" "") [chain_info.$.api_endpoint]) ) )"#, - hashmap! { - "relay" => json!(client.node.to_string()), - "client" => json!(client.peer_id.to_string()), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "client" => json!(client.peer_id.to_string()), + }, + ) + .await; if let [JValue::String(endpoint)] = client .receive_args() diff --git a/crates/nox-tests/tests/tetraplets.rs b/crates/nox-tests/tests/tetraplets.rs index b1c9947cea..d5e161f594 100644 --- a/crates/nox-tests/tests/tetraplets.rs +++ b/crates/nox-tests/tests/tetraplets.rs @@ -82,7 +82,7 @@ async fn test_tetraplets() { "service_id" => json!(tetraplets_service.id), }; - client.send_particle(script, data.clone()); + client.send_particle(script, data.clone()).await; let args = client.receive_args().await.wrap_err("receive").unwrap(); let mut args = args.into_iter(); diff --git a/crates/nox-tests/tests/topology.rs b/crates/nox-tests/tests/topology.rs index a5a1077f95..35ef94abb8 100644 --- a/crates/nox-tests/tests/topology.rs +++ b/crates/nox-tests/tests/topology.rs @@ -66,7 +66,8 @@ async fn identity() { "node_c" => json!(swarms[2].peer_id.to_string()), "client_b" => json!(b.peer_id.to_string()), }, - ); + ) + .await; b.receive().await.wrap_err("receive").unwrap(); } @@ -79,8 +80,9 @@ async fn init_peer_id() { .wrap_err("connect client") .unwrap(); - client.send_particle( - r#" + client + .send_particle( + r#" (seq (call relay ("kad" "neighborhood") [client] peers) (seq @@ -89,11 +91,12 @@ async fn init_peer_id() { ) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "client" => json!(client.peer_id.to_string()), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "client" => json!(client.peer_id.to_string()), + }, + ) + .await; client.receive().await.wrap_err("receive").unwrap(); } @@ -107,9 +110,10 @@ async fn join() { .wrap_err("connect client") .unwrap(); - client.send_particle( - format!( - r#" + client + .send_particle( + format!( + r#" (seq (seq (call relay ("op" "noop") []) @@ -132,15 +136,16 @@ async fn join() { ) ) "#, - join_stream("results", "%init_peer_id%", "len", "results"), - ), - hashmap! { - "nodes" => json!(swarms.iter().map(|s| s.peer_id.to_base58()).collect::>()), - "client" => json!(client.peer_id.to_string()), - "relay" => json!(client.node.to_string()), - "len" => json!(swarms.len()), - }, - ); + join_stream("results", "%init_peer_id%", "len", "results"), + ), + hashmap! { + "nodes" => json!(swarms.iter().map(|s| s.peer_id.to_base58()).collect::>()), + "client" => json!(client.peer_id.to_string()), + "relay" => json!(client.node.to_string()), + "len" => json!(swarms.len()), + }, + ) + .await; let received = client .listen_for_n(4, |peer_ids| { diff --git a/crates/nox-tests/tests/vault.rs b/crates/nox-tests/tests/vault.rs index 0b76869067..a4b2ded5c3 100644 --- a/crates/nox-tests/tests/vault.rs +++ b/crates/nox-tests/tests/vault.rs @@ -45,8 +45,9 @@ async fn share_file() { let first = create_file_share(&mut client).await; let second = create_file_share(&mut client).await; - client.send_particle( - r#" + client + .send_particle( + r#" (seq (call relay ("srv" "get_interface") [first] interface) (xor @@ -61,13 +62,14 @@ async fn share_file() { ) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "first" => json!(first.id), - "second" => json!(second.id), - "input_content" => json!("Hello!") - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "first" => json!(first.id), + "second" => json!(second.id), + "input_content" => json!("Hello!") + }, + ) + .await; use serde_json::Value::String; @@ -131,7 +133,7 @@ async fn deploy_from_vault() { "module" => json!(base64.encode(&module)), "q" => json!("\""), }, - ); + ).await; use serde_json::Value::String; @@ -154,8 +156,9 @@ async fn load_blueprint_from_vault() { // upload module let module = load_module("tests/file_share/artifacts", "file_share").expect("load module"); - client.send_particle( - r#" + client + .send_particle( + r#" (seq (seq (call relay ("dist" "default_module_config") ["file_share"] config) @@ -164,11 +167,12 @@ async fn load_blueprint_from_vault() { (call %init_peer_id% ("op" "return") [hash]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "module" => json!(base64.encode(module)), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "module" => json!(base64.encode(module)), + }, + ) + .await; let args = client.receive_args().await.unwrap(); let module_hash = args[0].as_str().expect("single string"); @@ -182,8 +186,9 @@ async fn load_blueprint_from_vault() { ) .to_string() .unwrap(); - client.send_particle( - r#" + client + .send_particle( + r#" (seq (seq (call relay (first_service "create_vault_file") [blueprint_string] filename) @@ -201,12 +206,13 @@ async fn load_blueprint_from_vault() { ) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "first_service" => json!(file_share.id), - "blueprint_string" => json!(blueprint_string), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "first_service" => json!(file_share.id), + "blueprint_string" => json!(blueprint_string), + }, + ) + .await; use serde_json::Value::String; @@ -229,8 +235,9 @@ async fn put_cat_vault() { let payload = "test-test-test".to_string(); - client.send_particle( - r#" + client + .send_particle( + r#" (seq (seq (call relay ("vault" "put") [payload] filename) @@ -239,11 +246,12 @@ async fn put_cat_vault() { (call %init_peer_id% ("op" "return") [output_content]) ) "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "payload" => json!(payload.clone()), - }, - ); + hashmap! { + "relay" => json!(client.node.to_string()), + "payload" => json!(payload.clone()), + }, + ) + .await; use serde_json::Value::String;