Skip to content

Commit

Permalink
fix(cleanup): max cleanup size
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu committed Dec 18, 2023
1 parent 24a7e78 commit ce563f1
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,17 @@ use crate::particle_functions::Functions;
use crate::vm_pool::VmPool;
use crate::ParticleDataStore;

/// particle signature is used as a particle id
#[derive(PartialEq, Hash, Eq)]
struct ParticleId(Vec<u8>);
struct ActorKey {
signature: Vec<u8>,
peer_id: PeerId,
}

const MAX_CLEANUP_KEYS_SIZE: usize = 1024;

pub struct Plumber<RT: AquaRuntime, F> {
events: VecDeque<Result<RoutingEffects, AquamarineApiError>>,
actors: HashMap<(ParticleId, PeerId), Actor<RT, F>>,
actors: HashMap<ActorKey, Actor<RT, F>>,
vm_pool: VmPool<RT>,
data_store: Arc<ParticleDataStore>,
builtins: F,
Expand Down Expand Up @@ -126,7 +130,10 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
}

let builtins = &self.builtins;
let key = (ParticleId(particle.particle.signature.clone()), worker_id);
let key = ActorKey {
signature: particle.particle.signature.clone(),
peer_id: worker_id,
};
let entry = self.actors.entry(key);

let actor = match entry {
Expand Down Expand Up @@ -258,12 +265,16 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
// do not schedule task if another in progress
if self.cleanup_future.is_none() {
// Remove expired actors
let mut cleanup_keys: Vec<(String, PeerId)> = vec![];
let mut cleanup_keys: Vec<(String, PeerId)> = Vec::with_capacity(MAX_CLEANUP_KEYS_SIZE);
let now = now_ms();
self.actors.retain(|_, actor| {
// TODO: this code isn't optimal we continue iterate over actors if cleanup keys is full
// should be simpler to optimize it after fixing NET-632
// also delete fn actor.cleanup_key()
if cleanup_keys.len() >= MAX_CLEANUP_KEYS_SIZE {
return true;
}
// if actor hasn't yet expired or is still executing, keep it
// TODO: if actor is expired, cancel execution and return VM back to pool
// https://github.com/fluencelabs/fluence/issues/1212
if !actor.is_expired(now) || actor.is_executing() {
return true; // keep actor
}
Expand Down

0 comments on commit ce563f1

Please sign in to comment.