Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu committed Oct 30, 2023
1 parent a54a032 commit 91a17c0
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 15 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 14 additions & 12 deletions crates/connected-client/src/connected_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

use core::ops::Deref;
use std::{cell::LazyCell, collections::HashMap, ops::DerefMut, time::Duration};
use std::{collections::HashMap, ops::DerefMut, time::Duration};

use eyre::Result;
use eyre::{bail, eyre, WrapErr};
Expand All @@ -40,7 +40,7 @@ pub struct ConnectedClient {
pub timeout: Duration,
pub short_timeout: Duration,
pub kad_timeout: Duration,
pub local_vm: LazyCell<tokio::sync::Mutex<AVM>, Box<dyn FnOnce() -> tokio::sync::Mutex<AVM>>>,
pub local_vm: tokio::sync::OnceCell<tokio::sync::Mutex<AVM>>,
pub particle_ttl: Duration,
}

Expand Down Expand Up @@ -141,17 +141,19 @@ impl ConnectedClient {
Ok(result)
}

pub async fn get_local_vm(&self) -> &tokio::sync::Mutex<AVM> {
let peer_id = self.client.peer_id;
self.local_vm
.get_or_init(|| async { tokio::sync::Mutex::new(make_vm(peer_id)) })
.await
}
pub fn new(
client: Client,
node: PeerId,
node_address: Multiaddr,
particle_ttl: Option<Duration>,
) -> Self {
let peer_id = client.peer_id;
let f: Box<dyn FnOnce() -> tokio::sync::Mutex<AVM>> =
Box::new(move || tokio::sync::Mutex::new(make_vm(peer_id)));
let local_vm = LazyCell::new(f);

let local_vm = tokio::sync::OnceCell::const_new();
Self {
client,
node,
Expand Down Expand Up @@ -195,7 +197,7 @@ impl ConnectedClient {
.into_iter()
.map(|(key, value)| (key.to_string(), value))
.collect();
let mut guard = self.local_vm.lock().await;
let mut guard = self.get_local_vm().await.lock().await;
let particle = make_particle(
self.peer_id,
&data,
Expand Down Expand Up @@ -249,7 +251,7 @@ impl ConnectedClient {

pub async fn receive_args(&mut self) -> Result<Vec<JValue>> {
let particle = self.receive().await.wrap_err("receive_args")?;
let mut guard = self.local_vm.lock().await;
let mut guard = self.get_local_vm().await.lock().await;
let result = read_args(particle, self.peer_id, &mut guard, &self.key_pair).await;
match result {
Some(result) => result.map_err(|args| eyre!("AIR caught an error: {:?}", args)),
Expand All @@ -270,7 +272,7 @@ impl ConnectedClient {
match head {
Some(index) => {
let particle = self.fetched.remove(index);
let mut guard = self.local_vm.lock().await;
let mut guard = self.get_local_vm().await.lock().await;
let result = read_args(particle, self.peer_id, &mut guard, &self.key_pair).await;
drop(guard);
if let Some(result) = result {
Expand All @@ -293,7 +295,7 @@ impl ConnectedClient {
let particle = self.raw_receive().await.ok();
if let Some(particle) = particle {
if particle.id == particle_id.as_ref() {
let mut guard = self.local_vm.lock().await;
let mut guard = self.get_local_vm().await.lock().await;
let result =
read_args(particle, self.peer_id, &mut guard, &self.key_pair).await;
if let Some(result) = result {
Expand All @@ -319,7 +321,7 @@ impl ConnectedClient {

let particle = self.receive().await.ok();
if let Some(particle) = particle {
let mut guard = self.local_vm.lock().await;
let mut guard = self.get_local_vm().await.lock().await;
let args = read_args(particle, self.peer_id, &mut guard, &self.key_pair).await;
if let Some(args) = args {
return f(args);
Expand Down
1 change: 1 addition & 0 deletions crates/nox-tests/tests/network/loop_topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ async fn fold_fold_fold_seq_two_par_null_folds_flaky() {

#[tokio::test]
async fn fold_par_same_node_stream() {
log_utils::enable_console();
let swarms = make_swarms(3).await;

let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
Expand Down
2 changes: 1 addition & 1 deletion crates/nox-tests/tests/network/network_explore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ async fn explore_services_fixed_flaky() {
if let Ok(Some(event)) = timeout(Duration::from_secs(1), receive_task).await {
match event {
ClientEvent::Particle { particle, .. } => {
let mut guard = client.local_vm.lock().await;
let mut guard = client.get_local_vm().await.lock().await;
let args = read_args(particle, client.peer_id, &mut guard, &client.key_pair)
.await
.expect("read args")
Expand Down

0 comments on commit 91a17c0

Please sign in to comment.