Skip to content

Commit

Permalink
feat(worker_pool)!: Shift service creation to the worker pool [Fixes N…
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu authored Feb 2, 2024
1 parent 3065a95 commit d3e4855
Show file tree
Hide file tree
Showing 59 changed files with 1,836 additions and 1,453 deletions.
31 changes: 29 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ members = [
"crates/system-services",
"crates/chain-listener",
"crates/hex-utils",
"crates/chain-data"
"crates/chain-data",
"crates/types"
]
exclude = [
"nox/tests/tetraplets",
Expand Down Expand Up @@ -96,6 +97,7 @@ subnet-resolver = { path = "crates/subnet-resolver" }
hex-utils = { path = "crates/hex-utils"}
chain-data = { path = "crates/chain-data"}
chain-listener = { path = "crates/chain-listener"}
types = { path = "crates/types"}

# spell
fluence-spell-dtos = "=0.6.10"
Expand Down Expand Up @@ -157,10 +159,11 @@ ethabi = "18.0.0"
jsonrpsee = "0.21.0"
blake3 = "1.5.0"
rand = "0.8.5"
futures-util = "0.3.30"

# Enable a small amount of optimization in debug mode
[profile.dev]
opt-level = 1
opt-level = 0

# Enable high optimizations for dependencies, but not for our code:
[profile.dev.package."*"]
Expand Down
8 changes: 4 additions & 4 deletions aquamarine/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{
use tracing::{instrument, Instrument, Span};

use crate::deadline::Deadline;
use crate::particle_effects::RoutingEffects;
use crate::particle_effects::RawRoutingEffects;
use crate::particle_executor::{FutResult, ParticleExecutor};
use crate::particle_functions::{Functions, SingleCallStat};
use crate::spawner::{SpawnFunctions, Spawner};
Expand All @@ -39,7 +39,7 @@ struct Reusables<RT> {
vm: Option<RT>,
}

type AVMCallResult<RT> = FutResult<(usize, Option<RT>), RoutingEffects, InterpretationStats>;
type AVMCallResult<RT> = FutResult<(usize, Option<RT>), RawRoutingEffects, InterpretationStats>;
type AVMTask<RT> = BoxFuture<
'static,
(
Expand Down Expand Up @@ -134,7 +134,7 @@ where
pub fn poll_completed(
&mut self,
cx: &mut Context<'_>,
) -> Poll<FutResult<(usize, Option<RT>), RoutingEffects, InterpretationStats>> {
) -> Poll<FutResult<(usize, Option<RT>), RawRoutingEffects, InterpretationStats>> {
self.waker = Some(cx.waker().clone());

self.functions.poll(cx);
Expand Down Expand Up @@ -171,7 +171,7 @@ where
parent_span.clone(),
);

let effects = RoutingEffects {
let effects = RawRoutingEffects {
particle: ExtendedParticle::linked(
Particle {
data: effects.new_data,
Expand Down
19 changes: 9 additions & 10 deletions aquamarine/src/aquamarine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,27 @@ use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::{instrument, Instrument};

use fluence_libp2p::PeerId;
use health::HealthCheckRegistry;
use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::ExtendedParticle;
use particle_services::PeerScope;
use peer_metrics::{ParticleExecutorMetrics, VmPoolMetrics};
use workers::{PeerScope, Workers};
use workers::{KeyStorage, PeerScopes, Workers};

use crate::aqua_runtime::AquaRuntime;
use crate::command::Command;
use crate::command::Command::{AddService, Ingest, RemoveService};
use crate::error::AquamarineApiError;
use crate::particle_effects::RoutingEffects;
use crate::particle_effects::RemoteRoutingEffects;
use crate::vm_pool::VmPool;
use crate::{DataStoreConfig, ParticleDataStore, Plumber, VmPoolConfig};

pub type EffectsChannel = mpsc::Sender<Result<RoutingEffects, AquamarineApiError>>;
pub type EffectsChannel = mpsc::Sender<Result<RemoteRoutingEffects, AquamarineApiError>>;

pub struct AquamarineBackend<RT: AquaRuntime, F> {
inlet: mpsc::Receiver<Command>,
plumber: Plumber<RT, F>,
out: EffectsChannel,
host_peer_id: PeerId,
data_store: Arc<ParticleDataStore>,
}

Expand All @@ -61,7 +60,8 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
vm_pool_metrics: Option<VmPoolMetrics>,
health_registry: Option<&mut HealthCheckRegistry>,
workers: Arc<Workers>,
scope: PeerScope,
key_storage: Arc<KeyStorage>,
scopes: PeerScopes,
) -> eyre::Result<(Self, AquamarineApi)> {
// TODO: make `100` configurable
let (outlet, inlet) = mpsc::channel(100);
Expand All @@ -79,20 +79,19 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
vm_pool_metrics,
health_registry,
);
let host_peer_id = scope.get_host_peer_id();
let plumber = Plumber::new(
vm_pool,
data_store.clone(),
builtins,
plumber_metrics,
workers,
scope,
key_storage,
scopes,
);
let this = Self {
inlet,
plumber,
out,
host_peer_id,
data_store,
};

Expand All @@ -111,7 +110,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
let _guard = span.entered();
// set new particle to be executed
// every particle that comes from the connection pool first executed on the host peer id
self.plumber.ingest(particle, function, self.host_peer_id);
self.plumber.ingest(particle, function, PeerScope::Host);
}
Poll::Ready(Some(AddService {
service,
Expand Down
2 changes: 1 addition & 1 deletion aquamarine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub use aqua_runtime::AquaRuntime;
pub use config::{DataStoreConfig, VmConfig, VmPoolConfig};
pub use error::AquamarineApiError;
pub use particle_data_store::{DataStoreError, ParticleDataStore};
pub use particle_effects::{InterpretationStats, ParticleEffects, RoutingEffects};
pub use particle_effects::{InterpretationStats, ParticleEffects, RemoteRoutingEffects};
pub use plumber::Plumber;

pub use crate::aquamarine::{AquamarineApi, AquamarineBackend};
Expand Down
15 changes: 14 additions & 1 deletion aquamarine/src/particle_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use particle_protocol::ExtendedParticle;
use std::time::Duration;

use libp2p::PeerId;
use particle_services::PeerScope;

#[derive(Clone, Debug)]
/// Effects produced by particle execution. Currently the only effect is that of sending particles.
Expand Down Expand Up @@ -64,7 +65,19 @@ impl InterpretationStats {
/// Routing part of the [[ParticleEffects].
/// Instruct to send particle to either virtual or remote peers.
#[derive(Clone, Debug)]
pub struct RoutingEffects {
pub struct RawRoutingEffects {
pub particle: ExtendedParticle,
pub next_peers: Vec<PeerId>,
}

#[derive(Clone, Debug)]
pub struct RemoteRoutingEffects {
pub particle: ExtendedParticle,
pub next_peers: Vec<PeerId>,
}

#[derive(Clone, Debug)]
pub struct LocalRoutingEffects {
pub particle: ExtendedParticle,
pub next_peers: Vec<PeerScope>,
}
Loading

0 comments on commit d3e4855

Please sign in to comment.