Skip to content

Commit

Permalink
feat(workers): Create a ThreadPool for each worker [fixes NET-688] (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu committed Jan 15, 2024
1 parent 717d190 commit fa5a17a
Show file tree
Hide file tree
Showing 22 changed files with 544 additions and 167 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions aquamarine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ log = { workspace = true }

tokio = { workspace = true, features = ["fs", "rt"] }
tokio-stream = { workspace = true }
tokio-util = { workspace = true, features = ["rt"] }
tracing = { workspace = true }
serde_json = { workspace = true }
parking_lot = { workspace = true }
Expand All @@ -41,6 +42,7 @@ bytesize = "1.3.0"
async-trait = "0.1.77"
health = { workspace = true }
config = { version = "0.13.4", features = [] }
enum_dispatch = "0.3.12"

[dev-dependencies]
tempfile = { workspace = true }
57 changes: 35 additions & 22 deletions aquamarine/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ use std::{
};
use tracing::{instrument, Instrument, Span};

use fluence_keypair::KeyPair;
use fluence_libp2p::PeerId;
use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::{ExtendedParticle, Particle};

use crate::deadline::Deadline;
use crate::particle_effects::RoutingEffects;
use crate::particle_executor::{FutResult, ParticleExecutor};
use crate::particle_functions::{Functions, SingleCallStat};
use crate::spawner::{SpawnFunctions, Spawner};
use crate::{AquaRuntime, InterpretationStats, ParticleDataStore, ParticleEffects};
use fluence_keypair::KeyPair;
use fluence_libp2p::PeerId;
use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::{ExtendedParticle, Particle};

struct Reusables<RT> {
vm_id: usize,
Expand Down Expand Up @@ -65,6 +65,7 @@ pub struct Actor<RT, F> {
current_peer_id: PeerId,
key_pair: KeyPair,
data_store: Arc<ParticleDataStore>,
spawner: Spawner,
deal_id: Option<String>,
}

Expand All @@ -80,6 +81,7 @@ where
key_pair: KeyPair,
data_store: Arc<ParticleDataStore>,
deal_id: Option<String>,
spawner: Spawner,
) -> Self {
Self {
deadline: Deadline::from(particle),
Expand All @@ -95,6 +97,7 @@ where
current_peer_id,
key_pair,
data_store,
spawner,
deal_id,
}
}
Expand Down Expand Up @@ -156,9 +159,11 @@ where

self.future.take();

let spawner = self.spawner.clone();
let waker = cx.waker().clone();
// Schedule execution of functions
self.functions.execute(
spawner,
self.particle.id.clone(),
effects.call_requests,
waker,
Expand Down Expand Up @@ -227,24 +232,32 @@ where
let (async_span, linking_span) =
self.create_spans(call_spans, ext_particle, particle.id.as_str());

let spawner = self.spawner.clone();
self.future = Some(
async move {
let res = vm
.execute(data_store, (particle.clone(), calls), peer_id, key_pair)
.in_current_span()
.await;

waker.wake();

let reusables = Reusables {
vm_id,
vm: res.runtime,
};

(reusables, res.effects, res.stats, linking_span)
}
.instrument(async_span)
.boxed(),
self.spawner
.wrap(async move {
let res = vm
.execute(
spawner,
data_store,
(particle.clone(), calls),
peer_id,
key_pair,
)
.in_current_span()
.await;

waker.wake();

let reusables = Reusables {
vm_id,
vm: res.runtime,
};

(reusables, res.effects, res.stats, linking_span)
})
.instrument(async_span)
.boxed(),
);
self.wake();

Expand Down
4 changes: 2 additions & 2 deletions aquamarine/src/aquamarine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use health::HealthCheckRegistry;
use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::ExtendedParticle;
use peer_metrics::{ParticleExecutorMetrics, VmPoolMetrics};
use workers::{Scope, Workers};
use workers::{PeerScope, Workers};

use crate::aqua_runtime::AquaRuntime;
use crate::command::Command;
Expand Down Expand Up @@ -61,7 +61,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
vm_pool_metrics: Option<VmPoolMetrics>,
health_registry: Option<&mut HealthCheckRegistry>,
workers: Arc<Workers>,
scope: Scope,
scope: PeerScope,
) -> eyre::Result<(Self, AquamarineApi)> {
// TODO: make `100` configurable
let (outlet, inlet) = mpsc::channel(100);
Expand Down
1 change: 1 addition & 0 deletions aquamarine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ mod particle_effects;
mod particle_executor;
mod particle_functions;
mod plumber;
mod spawner;
mod vm_pool;
90 changes: 50 additions & 40 deletions aquamarine/src/particle_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use fluence_libp2p::PeerId;
use particle_protocol::Particle;

use crate::aqua_runtime::AquaRuntime;
use crate::spawner::SpawnFunctions;
use crate::spawner::Spawner;
use crate::{InterpretationStats, ParticleDataStore, ParticleEffects};

pub(super) type AVMRes<RT> = FutResult<Option<RT>, ParticleEffects, InterpretationStats>;
Expand All @@ -39,6 +41,7 @@ pub trait ParticleExecutor {
type Particle;
async fn execute(
mut self,
spawner: Spawner,
data_store: Arc<ParticleDataStore>,
p: Self::Particle,
current_peer_id: PeerId,
Expand Down Expand Up @@ -73,6 +76,7 @@ impl<RT: AquaRuntime> ParticleExecutor for RT {
#[instrument(level = tracing::Level::INFO, skip_all)]
async fn execute(
mut self,
spawner: Spawner,
data_store: Arc<ParticleDataStore>,
p: Self::Particle,
current_peer_id: PeerId,
Expand All @@ -83,12 +87,14 @@ impl<RT: AquaRuntime> ParticleExecutor for RT {
tracing::trace!(target: "execution", particle_id = particle_id, "Executing particle");

let prev_data = data_store
.clone()
.read_data(particle_id.as_str(), current_peer_id.to_base58().as_str())
.await;

if let Ok(prev_data) = prev_data {
execute_with_prev_data(
self,
spawner,
data_store,
current_peer_id,
key_pair,
Expand All @@ -110,6 +116,7 @@ impl<RT: AquaRuntime> ParticleExecutor for RT {
#[instrument(level = tracing::Level::INFO, skip_all)]
async fn execute_with_prev_data<RT: AquaRuntime>(
vm: RT,
spawner: Spawner,
data_store: Arc<ParticleDataStore>,
current_peer_id: PeerId,
key_pair: KeyPair,
Expand All @@ -121,6 +128,7 @@ async fn execute_with_prev_data<RT: AquaRuntime>(
let prev_data_len = prev_data.len();

let avm_result = avm_call(
spawner,
vm,
current_peer_id,
key_pair,
Expand Down Expand Up @@ -234,52 +242,54 @@ where

#[instrument(level = tracing::Level::INFO, skip_all)]
async fn avm_call<'a, RT: AquaRuntime>(
spawner: Spawner,
mut vm: RT,
current_peer_id: PeerId,
key_pair: KeyPair,
particle: Particle,
call_results: CallResults,
prev_data: Vec<u8>,
) -> Result<AVMCallResult<'a, RT>, JoinError> {
tokio::task::spawn_blocking(move || {
let particle_id = particle.id.clone();
let now = Instant::now();
let memory_size_before = vm.memory_stats().memory_size;
let particle_params = ParticleParameters {
current_peer_id: Cow::Owned(current_peer_id.to_string()),
init_peer_id: Cow::Owned(particle.init_peer_id.to_string()),
particle_id: Cow::Owned(particle_id),
timestamp: particle.timestamp,
ttl: particle.ttl,
};
let current_data = &particle.data[..];
let avm_outcome = vm.call(
&particle.script,
prev_data,
current_data,
particle_params.clone(),
call_results.clone(),
&key_pair,
);
let memory_size_after = vm.memory_stats().memory_size;
spawner
.spawn_avm_call(move || {
let particle_id = particle.id.clone();
let now = Instant::now();
let memory_size_before = vm.memory_stats().memory_size;
let particle_params = ParticleParameters {
current_peer_id: Cow::Owned(current_peer_id.to_string()),
init_peer_id: Cow::Owned(particle.init_peer_id.to_string()),
particle_id: Cow::Owned(particle_id),
timestamp: particle.timestamp,
ttl: particle.ttl,
};
let current_data = &particle.data[..];
let avm_outcome = vm.call(
&particle.script,
prev_data,
current_data,
particle_params.clone(),
call_results.clone(),
&key_pair,
);
let memory_size_after = vm.memory_stats().memory_size;

let interpretation_time = now.elapsed();
let new_data_len = avm_outcome.as_ref().map(|e| e.data.len()).ok();
let memory_delta = memory_size_after - memory_size_before;
let stats = InterpretationStats {
memory_delta,
interpretation_time,
new_data_len,
success: avm_outcome.is_ok(),
};
AVMCallResult {
avm_outcome,
stats,
particle,
call_results,
particle_params,
vm,
}
})
.await
let interpretation_time = now.elapsed();
let new_data_len = avm_outcome.as_ref().map(|e| e.data.len()).ok();
let memory_delta = memory_size_after - memory_size_before;
let stats = InterpretationStats {
memory_delta,
interpretation_time,
new_data_len,
success: avm_outcome.is_ok(),
};
AVMCallResult {
avm_outcome,
stats,
particle,
call_results,
particle_params,
vm,
}
})
.await
}
Loading

0 comments on commit fa5a17a

Please sign in to comment.