From 1063eae0211bb4803db29c75a104bdd5bce0121a Mon Sep 17 00:00:00 2001 From: Nick Date: Tue, 19 Dec 2023 22:27:06 +0300 Subject: [PATCH] fix(cleanup): cleanup par + max batch size (#1967) * add tests * add tests * fix(cleanup): max cleanup size * run cleanup in parallel * review fixes * review fixes * review fixes * review fixes --------- Co-authored-by: folex <0xdxdy@gmail.com> --- aquamarine/src/actor.rs | 1 - aquamarine/src/particle_data_store.rs | 43 ++++++++++++++++----------- aquamarine/src/plumber.rs | 25 +++++++++++----- nox/src/effectors.rs | 2 +- 4 files changed, 44 insertions(+), 27 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index 31b46e519f..09aadf622a 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -81,7 +81,6 @@ where data_store: Arc, deal_id: Option, ) -> Self { - let particle = particle; Self { deadline: Deadline::from(particle), functions, diff --git a/aquamarine/src/particle_data_store.rs b/aquamarine/src/particle_data_store.rs index f0f57ce4d4..25cd560c6a 100644 --- a/aquamarine/src/particle_data_store.rs +++ b/aquamarine/src/particle_data_store.rs @@ -22,6 +22,8 @@ use std::time::Duration; use avm_server::avm_runner::RawAVMOutcome; use avm_server::{AnomalyData, CallResults, ParticleParameters}; use fluence_libp2p::PeerId; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use thiserror::Error; use tracing::instrument; @@ -105,25 +107,30 @@ impl ParticleDataStore { Ok(data) } - pub async fn batch_cleanup_data(&self, data: Vec<(String, PeerId)>) { - for (particle_id, peer_id) in data { - tracing::debug!( - target: "particle_reap", - particle_id = particle_id, worker_id = peer_id.to_string(), - "Reaping particle's actor" - ); - - if let Err(err) = self - .cleanup_data(particle_id.as_str(), peer_id.to_string().as_str()) - .await - { - tracing::warn!( - particle_id = particle_id, - "Error cleaning up after particle {:?}", - err + pub async fn batch_cleanup_data(&self, cleanup_keys: Vec<(String, PeerId)>) { + let futures: FuturesUnordered<_> = cleanup_keys + .into_iter() + .map(|(particle_id, peer_id)| async move { + let peer_id = peer_id.to_string(); + tracing::debug!( + target: "particle_reap", + particle_id = particle_id, worker_id = peer_id, + "Reaping particle's actor" ); - } - } + + if let Err(err) = self + .cleanup_data(particle_id.as_str(), peer_id.as_str()) + .await + { + tracing::warn!( + particle_id = particle_id, + "Error cleaning up after particle {:?}", + err + ); + } + }) + .collect(); + let _results: Vec<_> = futures.collect().await; } async fn cleanup_data(&self, particle_id: &str, current_peer_id: &str) -> Result<()> { diff --git a/aquamarine/src/plumber.rs b/aquamarine/src/plumber.rs index 052f8b4990..88bfe5fccc 100644 --- a/aquamarine/src/plumber.rs +++ b/aquamarine/src/plumber.rs @@ -49,13 +49,17 @@ use crate::particle_functions::Functions; use crate::vm_pool::VmPool; use crate::ParticleDataStore; -/// particle signature is used as a particle id #[derive(PartialEq, Hash, Eq)] -struct ParticleId(Vec); +struct ActorKey { + signature: Vec, + worker_id: PeerId, +} + +const MAX_CLEANUP_KEYS_SIZE: usize = 1024; pub struct Plumber { events: VecDeque>, - actors: HashMap<(ParticleId, PeerId), Actor>, + actors: HashMap>, vm_pool: VmPool, data_store: Arc, builtins: F, @@ -129,7 +133,10 @@ impl Plumber { } let builtins = &self.builtins; - let key = (ParticleId(particle.particle.signature.clone()), worker_id); + let key = ActorKey { + signature: particle.particle.signature.clone(), + worker_id, + }; let entry = self.actors.entry(key); let actor = match entry { @@ -261,12 +268,16 @@ impl Plumber { // do not schedule task if another in progress if self.cleanup_future.is_none() { // Remove expired actors - let mut cleanup_keys: Vec<(String, PeerId)> = vec![]; + let mut cleanup_keys: Vec<(String, PeerId)> = Vec::with_capacity(MAX_CLEANUP_KEYS_SIZE); let now = now_ms(); self.actors.retain(|_, actor| { + // TODO: this code isn't optimal we continue iterate over actors if cleanup keys is full + // should be simpler to optimize it after fixing NET-632 + // also delete fn actor.cleanup_key() + if cleanup_keys.len() >= MAX_CLEANUP_KEYS_SIZE { + return true; + } // if actor hasn't yet expired or is still executing, keep it - // TODO: if actor is expired, cancel execution and return VM back to pool - // https://github.com/fluencelabs/fluence/issues/1212 if !actor.is_expired(now) || actor.is_executing() { return true; // keep actor } diff --git a/nox/src/effectors.rs b/nox/src/effectors.rs index d4533c394f..553326f109 100644 --- a/nox/src/effectors.rs +++ b/nox/src/effectors.rs @@ -51,7 +51,7 @@ impl Effectors { async move { // resolve contact if let Some(contact) = connectivity - .resolve_contact(target, &particle.as_ref()) + .resolve_contact(target, particle.as_ref()) .await { // forward particle