Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(workers): separate AVM pools [fixes NET-753] #2125

Merged
merged 37 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ jobs:
uses: fluencelabs/aqua/.github/workflows/tests.yml@main
with:
nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}"
ref: renovate/fluencelabs-js-client-0.x

# registry:
# needs:
Expand Down
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ members = [
"crates/chain-data",
"crates/chain-types",
"crates/types",
"crates/core-manager"
]
"crates/core-manager",
]
exclude = [
"nox/tests/tetraplets",
]
Expand Down
40 changes: 36 additions & 4 deletions aquamarine/src/aqua_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
* limitations under the License.
*/

use std::str::FromStr;
use std::{error::Error, task::Waker};

use avm_server::avm_runner::{AVMRunner, RawAVMOutcome};
use avm_server::{AVMMemoryStats, AVMRuntimeLimits, CallResults, ParticleParameters, RunnerError};
use avm_server::{
AVMMemoryStats, AVMRuntimeLimits, CallRequests, CallResults, ParticleParameters, RunnerError,
};
use fluence_keypair::KeyPair;
use log::LevelFilter;
use libp2p::PeerId;
use tracing::Level;

use crate::config::VmConfig;
use crate::invoke::{parse_outcome, ExecutionError};
use crate::error::{ExecutionError, FieldError};
use crate::particle_effects::ParticleEffects;

pub trait AquaRuntime: Sized + Send + 'static {
Expand Down Expand Up @@ -98,10 +102,12 @@ impl AquaRuntime for AVMRunner {
particle_id,
"Executed particle, next_peer_pks is empty, no call requests. Nothing to do.",
);
if log::max_level() >= LevelFilter::Debug {

if tracing::enabled!(Level::DEBUG) {
let data = String::from_utf8_lossy(data.as_slice());
tracing::debug!(particle_id, "particle next_peer_pks = [], data: {}", data);
}

ParticleEffects::empty()
}
Err(ExecutionError::AquamarineError(err)) => {
Expand Down Expand Up @@ -144,3 +150,29 @@ impl AquaRuntime for AVMRunner {
self.memory_stats()
}
}

pub fn parse_outcome(
outcome: Result<RawAVMOutcome, RunnerError>,
) -> Result<(Vec<u8>, Vec<PeerId>, CallRequests), ExecutionError> {
let outcome = outcome.map_err(ExecutionError::AquamarineError)?;

let peer_ids = outcome
.next_peer_pks
.into_iter()
.map(|id| {
parse_peer_id(id.as_str()).map_err(|error| ExecutionError::InvalidResultField {
field: "next_peer_pks[..]",
error,
})
})
.collect::<Result<_, ExecutionError>>()?;

Ok((outcome.data, peer_ids, outcome.call_requests))
}

fn parse_peer_id(s: &str) -> Result<PeerId, FieldError> {
PeerId::from_str(s).map_err(|err| FieldError::InvalidPeerId {
peer_id: s.to_string(),
err: err.to_string(),
})
}
41 changes: 35 additions & 6 deletions aquamarine/src/aquamarine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,21 @@ use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::ExtendedParticle;
use particle_services::PeerScope;
use peer_metrics::{ParticleExecutorMetrics, VmPoolMetrics};
use workers::{KeyStorage, PeerScopes, Workers};
use workers::{Event, KeyStorage, PeerScopes, Receiver, Workers};

use crate::aqua_runtime::AquaRuntime;
use crate::command::Command;
use crate::command::Command::{AddService, Ingest, RemoveService};
use crate::error::AquamarineApiError;
use crate::particle_effects::RemoteRoutingEffects;
use crate::vm_pool::VmPool;
use crate::{DataStoreConfig, ParticleDataStore, Plumber, VmPoolConfig};
use crate::{
AquaRuntime, DataStoreConfig, ParticleDataStore, Plumber, RemoteRoutingEffects, VmPoolConfig,
};

pub type EffectsChannel = mpsc::Sender<Result<RemoteRoutingEffects, AquamarineApiError>>;

pub struct AquamarineBackend<RT: AquaRuntime, F> {
inlet: mpsc::Receiver<Command>,
worker_events: Receiver<Event>,
plumber: Plumber<RT, F>,
out: EffectsChannel,
data_store: Arc<ParticleDataStore>,
Expand All @@ -62,6 +63,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
workers: Arc<Workers>,
key_storage: Arc<KeyStorage>,
scopes: PeerScopes,
worker_events: Receiver<Event>,
) -> eyre::Result<(Self, AquamarineApi)> {
// TODO: make `100` configurable
let (outlet, inlet) = mpsc::channel(100);
Expand All @@ -75,11 +77,12 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
let data_store: Arc<ParticleDataStore> = Arc::new(data_store);
let vm_pool = VmPool::new(
config.pool_size,
runtime_config,
runtime_config.clone(),
vm_pool_metrics,
health_registry,
);
let plumber = Plumber::new(
runtime_config,
vm_pool,
data_store.clone(),
builtins,
Expand All @@ -90,6 +93,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
);
let this = Self {
inlet,
worker_events,
plumber,
out,
data_store,
Expand All @@ -99,7 +103,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
}

pub fn poll(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> {
let mut wake = false;
let mut wake = self.process_worker_events();

// check if there are new particles
loop {
Expand Down Expand Up @@ -143,6 +147,31 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
}
}

fn process_worker_events(&mut self) -> bool {
let mut wake = false;
loop {
let res = self.worker_events.try_recv();
match res {
Ok(event) => match event {
Event::WorkerCreated {
worker_id,
thread_count,
} => {
wake = true;
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
self.plumber.on_worker_created(worker_id, thread_count);
}
Event::WorkerRemoved { worker_id } => {
self.plumber.on_worker_removed(worker_id);
}
},
Err(_) => {
break;
}
}
}
wake
}

pub fn start(mut self) -> JoinHandle<()> {
let data_store = self.data_store.clone();
let mut stream = futures::stream::poll_fn(move |cx| self.poll(cx).map(|_| Some(()))).fuse();
Expand Down
30 changes: 15 additions & 15 deletions aquamarine/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ use libp2p::PeerId;
use std::path::PathBuf;
use std::time::Duration;

#[derive(Debug, Clone)]
pub struct VmPoolConfig {
/// Number of VMs to create
pub pool_size: usize,
/// Timeout of a particle execution
pub execution_timeout: Duration,
}

#[derive(Debug, Clone)]
pub struct VmConfig {
pub current_peer_id: PeerId,
Expand All @@ -44,13 +36,12 @@ pub struct VmConfig {
pub hard_limit_enabled: bool,
}

impl VmPoolConfig {
pub fn new(pool_size: usize, execution_timeout: Duration) -> Self {
Self {
pool_size,
execution_timeout,
}
}
#[derive(Debug, Clone)]
pub struct VmPoolConfig {
/// Number of VMs to create
pub pool_size: usize,
/// Timeout of a particle execution
pub execution_timeout: Duration,
}

impl VmConfig {
Expand All @@ -75,6 +66,15 @@ impl VmConfig {
}
}

impl VmPoolConfig {
pub fn new(pool_size: usize, execution_timeout: Duration) -> Self {
Self {
pool_size,
execution_timeout,
}
}
}

#[derive(Debug, Clone)]
pub struct DataStoreConfig {
/// Dir for the interpreter to persist particle data
Expand Down
54 changes: 51 additions & 3 deletions aquamarine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
* limitations under the License.
*/

use avm_server::RunnerError;
use humantime::FormattedDuration;
use std::error::Error;
use std::fmt::{Display, Formatter};
use thiserror::Error;

use particle_protocol::ParticleError;
Expand Down Expand Up @@ -42,9 +45,7 @@ pub enum AquamarineApiError {
particle_id: String,
timeout: FormattedDuration,
},
#[error(
"AquamarineApiError::AquamarineQueueFull: can't send particle {particle_id:?} to Aquamarine"
)]
#[error("AquamarineApiError::AquamarineQueueFull: can't send particle {particle_id:?} to Aquamarine")]
AquamarineQueueFull { particle_id: Option<String> },
#[error("AquamarineApiError::SignatureVerificationFailed: particle_id = {particle_id}, error = {err}")]
SignatureVerificationFailed {
Expand Down Expand Up @@ -75,3 +76,50 @@ impl AquamarineApiError {
}
}
}

impl std::error::Error for ExecutionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match &self {
ExecutionError::InvalidResultField { error, .. } => Some(error),
ExecutionError::AquamarineError(err) => Some(err),
}
}
}

impl Display for ExecutionError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ExecutionError::InvalidResultField { field, error } => {
write!(f, "Execution error: invalid result field {field}: {error}")
}
ExecutionError::AquamarineError(err) => {
write!(f, "Execution error: aquamarine error: {err}")
}
}
}
}

#[derive(Debug)]
pub enum FieldError {
InvalidPeerId { peer_id: String, err: String },
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
}

impl std::error::Error for FieldError {}
impl Display for FieldError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
FieldError::InvalidPeerId { peer_id, err } => {
write!(f, "invalid PeerId '{peer_id}': {err}")
}
}
}
}

#[derive(Debug)]
pub enum ExecutionError {
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
InvalidResultField {
field: &'static str,
error: FieldError,
},
AquamarineError(RunnerError),
}
1 change: 1 addition & 0 deletions aquamarine/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl HealthCheck for VMPoolHealth {
#[cfg(test)]
mod tests {
use super::*;
use crate::health::VMPoolHealth;
use std::thread;

#[test]
Expand Down
Loading
Loading