fix(cleanup): cleanup par + max batch size (#1967)
* add tests

* add tests

* fix(cleanup): max cleanup size

* run cleanup in parallel

* review fixes

* review fixes

* review fixes

* review fixes

---------

Co-authored-by: folex <0xdxdy@gmail.com>
gurinderu and folex committed Dec 19, 2023
1 parent 2d13c8a commit 1063eae
Showing 4 changed files with 44 additions and 27 deletions.
aquamarine/src/actor.rs: 1 change (0 additions, 1 deletion)
@@ -81,7 +81,6 @@ where
         data_store: Arc<ParticleDataStore>,
         deal_id: Option<String>,
     ) -> Self {
-        let particle = particle;
         Self {
             deadline: Deadline::from(particle),
             functions,
aquamarine/src/particle_data_store.rs: 43 changes (25 additions, 18 deletions)
@@ -22,6 +22,8 @@ use std::time::Duration;
 use avm_server::avm_runner::RawAVMOutcome;
 use avm_server::{AnomalyData, CallResults, ParticleParameters};
 use fluence_libp2p::PeerId;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
 use thiserror::Error;
 use tracing::instrument;

@@ -105,25 +107,30 @@ impl ParticleDataStore {
         Ok(data)
     }
 
-    pub async fn batch_cleanup_data(&self, data: Vec<(String, PeerId)>) {
-        for (particle_id, peer_id) in data {
-            tracing::debug!(
-                target: "particle_reap",
-                particle_id = particle_id, worker_id = peer_id.to_string(),
-                "Reaping particle's actor"
-            );
-
-            if let Err(err) = self
-                .cleanup_data(particle_id.as_str(), peer_id.to_string().as_str())
-                .await
-            {
-                tracing::warn!(
-                    particle_id = particle_id,
-                    "Error cleaning up after particle {:?}",
-                    err
-                );
-            }
-        }
-    }
+    pub async fn batch_cleanup_data(&self, cleanup_keys: Vec<(String, PeerId)>) {
+        let futures: FuturesUnordered<_> = cleanup_keys
+            .into_iter()
+            .map(|(particle_id, peer_id)| async move {
+                let peer_id = peer_id.to_string();
+                tracing::debug!(
+                    target: "particle_reap",
+                    particle_id = particle_id, worker_id = peer_id,
+                    "Reaping particle's actor"
+                );
+
+                if let Err(err) = self
+                    .cleanup_data(particle_id.as_str(), peer_id.as_str())
+                    .await
+                {
+                    tracing::warn!(
+                        particle_id = particle_id,
+                        "Error cleaning up after particle {:?}",
+                        err
+                    );
+                }
+            })
+            .collect();
+        let _results: Vec<_> = futures.collect().await;
+    }
 
     async fn cleanup_data(&self, particle_id: &str, current_peer_id: &str) -> Result<()> {
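
Editor's note: the heart of this change is that batch_cleanup_data no longer awaits each deletion in sequence; it collects one future per key into a FuturesUnordered and drains them, so the removals overlap. Below is a minimal, self-contained sketch of that pattern, with a hypothetical cleanup function standing in for ParticleDataStore::cleanup_data (assumes the futures and tokio crates):

use futures::stream::FuturesUnordered;
use futures::StreamExt;

// Hypothetical stand-in for ParticleDataStore::cleanup_data.
async fn cleanup(particle_id: &str, peer_id: &str) -> Result<(), String> {
    println!("removing data for {particle_id} on {peer_id}");
    Ok(())
}

async fn batch_cleanup(keys: Vec<(String, String)>) {
    // One future per key; FuturesUnordered polls them all concurrently
    // and yields results in completion order, not submission order.
    let futures: FuturesUnordered<_> = keys
        .into_iter()
        .map(|(particle_id, peer_id)| async move {
            if let Err(err) = cleanup(&particle_id, &peer_id).await {
                eprintln!("cleanup of {particle_id} failed: {err}");
            }
        })
        .collect();
    // Drive the stream to completion; the unit results are discarded.
    let _: Vec<()> = futures.collect().await;
}

#[tokio::main]
async fn main() {
    let keys = vec![("p1".into(), "w1".into()), ("p2".into(), "w2".into())];
    batch_cleanup(keys).await;
}

FuturesUnordered gives single-task concurrency: the futures interleave at their await points without spawning extra tasks, which suits I/O-bound filesystem cleanup like this.
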
aquamarine/src/plumber.rs: 25 changes (18 additions, 7 deletions)
@@ -49,13 +49,17 @@ use crate::particle_functions::Functions;
 use crate::vm_pool::VmPool;
 use crate::ParticleDataStore;
 
 /// particle signature is used as a particle id
 #[derive(PartialEq, Hash, Eq)]
-struct ParticleId(Vec<u8>);
+struct ActorKey {
+    signature: Vec<u8>,
+    worker_id: PeerId,
+}
+
+const MAX_CLEANUP_KEYS_SIZE: usize = 1024;
 
 pub struct Plumber<RT: AquaRuntime, F> {
     events: VecDeque<Result<RoutingEffects, AquamarineApiError>>,
-    actors: HashMap<(ParticleId, PeerId), Actor<RT, F>>,
+    actors: HashMap<ActorKey, Actor<RT, F>>,
     vm_pool: VmPool<RT>,
     data_store: Arc<ParticleDataStore>,
     builtins: F,
@@ -129,7 +133,10 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
         }
 
         let builtins = &self.builtins;
-        let key = (ParticleId(particle.particle.signature.clone()), worker_id);
+        let key = ActorKey {
+            signature: particle.particle.signature.clone(),
+            worker_id,
+        };
         let entry = self.actors.entry(key);
 
         let actor = match entry {
@@ -261,12 +268,16 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
         // do not schedule task if another in progress
         if self.cleanup_future.is_none() {
             // Remove expired actors
-            let mut cleanup_keys: Vec<(String, PeerId)> = vec![];
+            let mut cleanup_keys: Vec<(String, PeerId)> = Vec::with_capacity(MAX_CLEANUP_KEYS_SIZE);
             let now = now_ms();
             self.actors.retain(|_, actor| {
+                // TODO: this code isn't optimal: we keep iterating over actors even when
+                // cleanup_keys is full; it should be simpler to optimize after fixing NET-632
+                // (also delete fn actor.cleanup_key())
+                if cleanup_keys.len() >= MAX_CLEANUP_KEYS_SIZE {
+                    return true;
+                }
                 // if actor hasn't yet expired or is still executing, keep it
                 // TODO: if actor is expired, cancel execution and return VM back to pool
                 // https://github.com/fluencelabs/fluence/issues/1212
                 if !actor.is_expired(now) || actor.is_executing() {
                     return true; // keep actor
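
Editor's note: for reference, here is a minimal sketch of the capped reap introduced above: retain walks the actor map once, keeps everything after the batch fills up, and hands back at most MAX_CLEANUP_KEYS_SIZE keys per pass. The Actor struct and String keys are simplified stand-ins for the real Actor and ActorKey types:

use std::collections::HashMap;

const MAX_CLEANUP_KEYS_SIZE: usize = 1024;

// Hypothetical minimal actor; the real one also tracks execution state.
struct Actor {
    expired: bool,
}

// Drain up to MAX_CLEANUP_KEYS_SIZE expired actors, returning their keys
// for a later batched cleanup pass.
fn reap_expired(actors: &mut HashMap<String, Actor>) -> Vec<String> {
    let mut cleanup_keys = Vec::with_capacity(MAX_CLEANUP_KEYS_SIZE);
    actors.retain(|key, actor| {
        // Batch is full: keep the rest; the next scheduled pass picks them up.
        if cleanup_keys.len() >= MAX_CLEANUP_KEYS_SIZE {
            return true;
        }
        if !actor.expired {
            return true; // keep live actors
        }
        cleanup_keys.push(key.clone());
        false // remove the expired actor from the map
    });
    cleanup_keys
}

fn main() {
    let mut actors = HashMap::new();
    actors.insert("particle-1".to_string(), Actor { expired: true });
    actors.insert("particle-2".to_string(), Actor { expired: false });
    let keys = reap_expired(&mut actors);
    assert_eq!(keys, vec!["particle-1".to_string()]);
    assert_eq!(actors.len(), 1);
}

Preallocating with Vec::with_capacity matches the cap, and the guard bounds how much work each pass hands to batch_cleanup_data; leftovers wait for the next scheduled pass, as the TODO in the diff notes.
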
nox/src/effectors.rs: 2 changes (1 addition, 1 deletion)
@@ -51,7 +51,7 @@ impl Effectors {
             async move {
                 // resolve contact
                 if let Some(contact) = connectivity
-                    .resolve_contact(target, &particle.as_ref())
+                    .resolve_contact(target, particle.as_ref())
                     .await
                 {
                     // forward particle
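
Editor's note: the effectors.rs change is a one-character cleanup: particle.as_ref() already yields a reference, so the extra borrow produced a needless double reference (this reads like a clippy::needless_borrow fix, though the commit message doesn't say). A tiny illustration with a hypothetical helper:

fn takes_str(s: &str) -> usize {
    s.len()
}

fn main() {
    let owned = String::from("particle");
    let s: &str = owned.as_ref();
    // `takes_str(&s)` would also compile via deref coercion (&&str -> &str),
    // but the extra borrow is noise; pass the reference directly.
    assert_eq!(takes_str(s), 8);
}
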
