Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Fix tests thread count #1868

Merged
merged 18 commits into from
Nov 1, 2023
1 change: 0 additions & 1 deletion .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ retries = { backoff = "exponential", count = 3, delay = "5s", jitter = true }

[profile.ci]
fail-fast = false
test-threads = 10

[profile.ci.junit]
path = "junit.xml"
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 27 additions & 37 deletions crates/connected-client/src/connected_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@
*/

use core::ops::Deref;
use std::{cell::LazyCell, collections::HashMap, ops::DerefMut, time::Duration};
use std::{collections::HashMap, ops::DerefMut, time::Duration};

use eyre::Result;
use eyre::{bail, eyre, WrapErr};
use fluence_keypair::KeyPair;
use fluence_libp2p::Transport;
use libp2p::{core::Multiaddr, PeerId};
use local_vm::{make_particle, make_vm, read_args, DataStoreError};
use parking_lot::Mutex;
use particle_protocol::Particle;
use serde_json::{Value as JValue, Value};
use test_constants::{KAD_TIMEOUT, PARTICLE_TTL, SHORT_TIMEOUT, TIMEOUT, TRANSPORT_TIMEOUT};
Expand All @@ -41,7 +40,7 @@ pub struct ConnectedClient {
pub timeout: Duration,
pub short_timeout: Duration,
pub kad_timeout: Duration,
pub local_vm: LazyCell<Mutex<AVM>, Box<dyn FnOnce() -> Mutex<AVM>>>,
pub local_vm: tokio::sync::OnceCell<tokio::sync::Mutex<AVM>>,
pub particle_ttl: Duration,
}

Expand Down Expand Up @@ -142,16 +141,19 @@ impl ConnectedClient {
Ok(result)
}

pub async fn get_local_vm(&self) -> &tokio::sync::Mutex<AVM> {
let peer_id = self.client.peer_id;
self.local_vm
.get_or_init(|| async { tokio::sync::Mutex::new(make_vm(peer_id)) })
.await
}
pub fn new(
client: Client,
node: PeerId,
node_address: Multiaddr,
particle_ttl: Option<Duration>,
) -> Self {
let peer_id = client.peer_id;
let f: Box<dyn FnOnce() -> Mutex<AVM>> = Box::new(move || Mutex::new(make_vm(peer_id)));
let local_vm = LazyCell::new(f);

let local_vm = tokio::sync::OnceCell::const_new();
Self {
client,
node,
Expand All @@ -168,24 +170,24 @@ impl ConnectedClient {
self.client.send(particle, self.node)
}

pub fn send_particle(
pub async fn send_particle(
&mut self,
script: impl Into<String>,
data: HashMap<&str, JValue>,
) -> String {
self.send_particle_ext(script, data, false)
self.send_particle_ext(script, data, false).await
}

pub async fn execute_particle(
&mut self,
script: impl Into<String>,
data: HashMap<&str, JValue>,
) -> Result<Vec<JValue>> {
let particle_id = self.send_particle_ext(script, data, false);
let particle_id = self.send_particle_ext(script, data, false).await;
self.wait_particle_args(particle_id.clone()).await
}

pub fn send_particle_ext(
pub async fn send_particle_ext(
&mut self,
script: impl Into<String>,
data: HashMap<&str, JValue>,
Expand All @@ -195,16 +197,18 @@ impl ConnectedClient {
.into_iter()
.map(|(key, value)| (key.to_string(), value))
.collect();
let mut guard = self.get_local_vm().await.lock().await;
let particle = make_particle(
self.peer_id,
&data,
script.into(),
self.node,
&mut self.local_vm.lock(),
&mut guard,
generated,
self.particle_ttl(),
&self.key_pair,
);
)
.await;
let id = particle.id.clone();
self.send(particle);
id
Expand Down Expand Up @@ -247,12 +251,8 @@ impl ConnectedClient {

pub async fn receive_args(&mut self) -> Result<Vec<JValue>> {
let particle = self.receive().await.wrap_err("receive_args")?;
let result = read_args(
particle,
self.peer_id,
&mut self.local_vm.lock(),
&self.key_pair,
);
let mut guard = self.get_local_vm().await.lock().await;
let result = read_args(particle, self.peer_id, &mut guard, &self.key_pair).await;
match result {
Some(result) => result.map_err(|args| eyre!("AIR caught an error: {:?}", args)),
None => Err(eyre!("Received a particle, but it didn't return anything")),
Expand All @@ -272,12 +272,9 @@ impl ConnectedClient {
match head {
Some(index) => {
let particle = self.fetched.remove(index);
let result = read_args(
particle,
self.peer_id,
&mut self.local_vm.lock(),
&self.key_pair,
);
let mut guard = self.get_local_vm().await.lock().await;
let result = read_args(particle, self.peer_id, &mut guard, &self.key_pair).await;
drop(guard);
if let Some(result) = result {
result.map_err(|args| eyre!("AIR caught an error: {:?}", args))
} else {
Expand All @@ -298,12 +295,9 @@ impl ConnectedClient {
let particle = self.raw_receive().await.ok();
if let Some(particle) = particle {
if particle.id == particle_id.as_ref() {
let result = read_args(
particle,
self.peer_id,
&mut self.local_vm.lock(),
&self.key_pair,
);
let mut guard = self.get_local_vm().await.lock().await;
let result =
read_args(particle, self.peer_id, &mut guard, &self.key_pair).await;
if let Some(result) = result {
break result.map_err(|args| eyre!("AIR caught an error: {:?}", args));
}
Expand All @@ -327,12 +321,8 @@ impl ConnectedClient {

let particle = self.receive().await.ok();
if let Some(particle) = particle {
let args = read_args(
particle,
self.peer_id,
&mut self.local_vm.lock(),
&self.key_pair,
);
let mut guard = self.get_local_vm().await.lock().await;
let args = read_args(particle, self.peer_id, &mut guard, &self.key_pair).await;
if let Some(args) = args {
return f(args);
}
Expand Down
1 change: 1 addition & 0 deletions crates/local-vm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ parking_lot = { workspace = true }
maplit = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
7 changes: 4 additions & 3 deletions crates/local-vm/src/local_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ pub fn wrap_script(
}

#[allow(clippy::too_many_arguments)]
pub fn make_particle(
pub async fn make_particle(
peer_id: PeerId,
service_in: &HashMap<String, JValue>,
script: String,
Expand Down Expand Up @@ -299,6 +299,7 @@ pub fn make_particle(
let result = host_call(service_in, args);
call_results.insert(id, result.0);
}
tokio::task::yield_now().await;
}

tracing::info!(particle_id = id, "Made a particle");
Expand All @@ -318,7 +319,7 @@ pub fn make_particle(
particle
}

pub fn read_args(
pub async fn read_args(
particle: Particle,
peer_id: PeerId,
local_vm: &mut AVM<DataStoreError>,
Expand Down Expand Up @@ -347,7 +348,6 @@ pub fn read_args(
key_pair,
)
.expect("execute & make particle");

particle_data = data;
call_results = <_>::default();

Expand All @@ -363,5 +363,6 @@ pub fn read_args(
return returned;
}
}
tokio::task::yield_now().await;
}
}
Loading
Loading