Skip to content

Commit

Permalink
chore: Fix tests thread count (#1868)
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu committed Nov 1, 2023
1 parent ad3cdeb commit 5999ea5
Show file tree
Hide file tree
Showing 14 changed files with 475 additions and 397 deletions.
1 change: 0 additions & 1 deletion .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ retries = { backoff = "exponential", count = 3, delay = "5s", jitter = true }

[profile.ci]
fail-fast = false
test-threads = 10

[profile.ci.junit]
path = "junit.xml"
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 27 additions & 37 deletions crates/connected-client/src/connected_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@
*/

use core::ops::Deref;
use std::{cell::LazyCell, collections::HashMap, ops::DerefMut, time::Duration};
use std::{collections::HashMap, ops::DerefMut, time::Duration};

use eyre::Result;
use eyre::{bail, eyre, WrapErr};
use fluence_keypair::KeyPair;
use fluence_libp2p::Transport;
use libp2p::{core::Multiaddr, PeerId};
use local_vm::{make_particle, make_vm, read_args, DataStoreError};
use parking_lot::Mutex;
use particle_protocol::Particle;
use serde_json::{Value as JValue, Value};
use test_constants::{KAD_TIMEOUT, PARTICLE_TTL, SHORT_TIMEOUT, TIMEOUT, TRANSPORT_TIMEOUT};
Expand All @@ -41,7 +40,7 @@ pub struct ConnectedClient {
pub timeout: Duration,
pub short_timeout: Duration,
pub kad_timeout: Duration,
pub local_vm: LazyCell<Mutex<AVM>, Box<dyn FnOnce() -> Mutex<AVM>>>,
pub local_vm: tokio::sync::OnceCell<tokio::sync::Mutex<AVM>>,
pub particle_ttl: Duration,
}

Expand Down Expand Up @@ -142,16 +141,19 @@ impl ConnectedClient {
Ok(result)
}

pub async fn get_local_vm(&self) -> &tokio::sync::Mutex<AVM> {
let peer_id = self.client.peer_id;
self.local_vm
.get_or_init(|| async { tokio::sync::Mutex::new(make_vm(peer_id)) })
.await
}
pub fn new(
client: Client,
node: PeerId,
node_address: Multiaddr,
particle_ttl: Option<Duration>,
) -> Self {
let peer_id = client.peer_id;
let f: Box<dyn FnOnce() -> Mutex<AVM>> = Box::new(move || Mutex::new(make_vm(peer_id)));
let local_vm = LazyCell::new(f);

let local_vm = tokio::sync::OnceCell::const_new();
Self {
client,
node,
Expand All @@ -168,24 +170,24 @@ impl ConnectedClient {
self.client.send(particle, self.node)
}

pub fn send_particle(
pub async fn send_particle(
&mut self,
script: impl Into<String>,
data: HashMap<&str, JValue>,
) -> String {
self.send_particle_ext(script, data, false)
self.send_particle_ext(script, data, false).await
}

pub async fn execute_particle(
&mut self,
script: impl Into<String>,
data: HashMap<&str, JValue>,
) -> Result<Vec<JValue>> {
let particle_id = self.send_particle_ext(script, data, false);
let particle_id = self.send_particle_ext(script, data, false).await;
self.wait_particle_args(particle_id.clone()).await
}

pub fn send_particle_ext(
pub async fn send_particle_ext(
&mut self,
script: impl Into<String>,
data: HashMap<&str, JValue>,
Expand All @@ -195,16 +197,18 @@ impl ConnectedClient {
.into_iter()
.map(|(key, value)| (key.to_string(), value))
.collect();
let mut guard = self.get_local_vm().await.lock().await;
let particle = make_particle(
self.peer_id,
&data,
script.into(),
self.node,
&mut self.local_vm.lock(),
&mut guard,
generated,
self.particle_ttl(),
&self.key_pair,
);
)
.await;
let id = particle.id.clone();
self.send(particle);
id
Expand Down Expand Up @@ -247,12 +251,8 @@ impl ConnectedClient {

pub async fn receive_args(&mut self) -> Result<Vec<JValue>> {
let particle = self.receive().await.wrap_err("receive_args")?;
let result = read_args(
particle,
self.peer_id,
&mut self.local_vm.lock(),
&self.key_pair,
);
let mut guard = self.get_local_vm().await.lock().await;
let result = read_args(particle, self.peer_id, &mut guard, &self.key_pair).await;
match result {
Some(result) => result.map_err(|args| eyre!("AIR caught an error: {:?}", args)),
None => Err(eyre!("Received a particle, but it didn't return anything")),
Expand All @@ -272,12 +272,9 @@ impl ConnectedClient {
match head {
Some(index) => {
let particle = self.fetched.remove(index);
let result = read_args(
particle,
self.peer_id,
&mut self.local_vm.lock(),
&self.key_pair,
);
let mut guard = self.get_local_vm().await.lock().await;
let result = read_args(particle, self.peer_id, &mut guard, &self.key_pair).await;
drop(guard);
if let Some(result) = result {
result.map_err(|args| eyre!("AIR caught an error: {:?}", args))
} else {
Expand All @@ -298,12 +295,9 @@ impl ConnectedClient {
let particle = self.raw_receive().await.ok();
if let Some(particle) = particle {
if particle.id == particle_id.as_ref() {
let result = read_args(
particle,
self.peer_id,
&mut self.local_vm.lock(),
&self.key_pair,
);
let mut guard = self.get_local_vm().await.lock().await;
let result =
read_args(particle, self.peer_id, &mut guard, &self.key_pair).await;
if let Some(result) = result {
break result.map_err(|args| eyre!("AIR caught an error: {:?}", args));
}
Expand All @@ -327,12 +321,8 @@ impl ConnectedClient {

let particle = self.receive().await.ok();
if let Some(particle) = particle {
let args = read_args(
particle,
self.peer_id,
&mut self.local_vm.lock(),
&self.key_pair,
);
let mut guard = self.get_local_vm().await.lock().await;
let args = read_args(particle, self.peer_id, &mut guard, &self.key_pair).await;
if let Some(args) = args {
return f(args);
}
Expand Down
1 change: 1 addition & 0 deletions crates/local-vm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ parking_lot = { workspace = true }
maplit = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
7 changes: 4 additions & 3 deletions crates/local-vm/src/local_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ pub fn wrap_script(
}

#[allow(clippy::too_many_arguments)]
pub fn make_particle(
pub async fn make_particle(
peer_id: PeerId,
service_in: &HashMap<String, JValue>,
script: String,
Expand Down Expand Up @@ -299,6 +299,7 @@ pub fn make_particle(
let result = host_call(service_in, args);
call_results.insert(id, result.0);
}
tokio::task::yield_now().await;
}

tracing::info!(particle_id = id, "Made a particle");
Expand All @@ -318,7 +319,7 @@ pub fn make_particle(
particle
}

pub fn read_args(
pub async fn read_args(
particle: Particle,
peer_id: PeerId,
local_vm: &mut AVM<DataStoreError>,
Expand Down Expand Up @@ -347,7 +348,6 @@ pub fn read_args(
key_pair,
)
.expect("execute & make particle");

particle_data = data;
call_results = <_>::default();

Expand All @@ -363,5 +363,6 @@ pub fn read_args(
return returned;
}
}
tokio::task::yield_now().await;
}
}
Loading

0 comments on commit 5999ea5

Please sign in to comment.