Skip to content

Commit

Permalink
feat: support keypairs for spells [NET-237 NET-239 NET-281 NET-283] (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
justprosh committed Jan 16, 2023
1 parent f7c3b0a commit 63d0759
Show file tree
Hide file tree
Showing 48 changed files with 967 additions and 260 deletions.
24 changes: 23 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ members = [
"crates/test-constants",
"crates/peer-metrics",
"crates/spell-event-bus",
"crates/key-manager",
"sorcerer",
"crates/builtins-tests",
"crates/particle-node-tests",
Expand Down Expand Up @@ -73,6 +74,7 @@ connected-client = { path = "crates/connected-client" }
test-constants = { path = "crates/test-constants" }
peer-metrics = { path = "crates/peer-metrics" }
spell-event-bus = { path = "crates/spell-event-bus" }
key-manager = { path = "crates/key-manager" }
sorcerer = { path = "sorcerer" }
builtins-tests = { path = "crates/builtins-tests" }
particle-node = { path = "particle-node" }
Expand Down Expand Up @@ -113,3 +115,5 @@ futures = "0.3.24"
thiserror = "1.0.37"
serde = "1.0.145"
toml = "0.5.9"
itertools = "0.10.5"
humantime-serde = "1.1.1"
1 change: 1 addition & 0 deletions aquamarine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ control-macro = { workspace = true }
fs-utils = { workspace = true }
peer-metrics = { workspace = true }
particle-execution = { workspace = true }
key-manager = { workspace = true}

avm-server = { workspace = true }
libp2p = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions aquamarine/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::Particle;

use crate::deadline::Deadline;
use crate::particle_effects::NetworkEffects;
use crate::particle_effects::RoutingEffects;
use crate::particle_executor::{Fut, FutResult, ParticleExecutor};
use crate::particle_functions::{Functions, SingleCallStat};
use crate::{AquaRuntime, InterpretationStats};
Expand Down Expand Up @@ -105,7 +105,7 @@ where
pub fn poll_completed(
&mut self,
cx: &mut Context<'_>,
) -> Poll<FutResult<(usize, RT), NetworkEffects, InterpretationStats>> {
) -> Poll<FutResult<(usize, RT), RoutingEffects, InterpretationStats>> {
use Poll::Ready;

self.waker = Some(cx.waker().clone());
Expand All @@ -120,7 +120,7 @@ where
// Schedule execution of functions
self.functions.execute(r.effects.call_requests, waker);

let effects = NetworkEffects {
let effects = RoutingEffects {
particle: r.effects.particle,
next_peers: r.effects.next_peers,
};
Expand Down
15 changes: 10 additions & 5 deletions aquamarine/src/aquamarine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use futures::{channel::mpsc, SinkExt, StreamExt};

use fluence_libp2p::types::{BackPressuredInlet, BackPressuredOutlet, Outlet};
use fluence_libp2p::PeerId;
use key_manager::KeyManager;
use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::Particle;
use peer_metrics::{ParticleExecutorMetrics, VmPoolMetrics};
Expand All @@ -31,16 +32,17 @@ use crate::aqua_runtime::AquaRuntime;
use crate::command::Command;
use crate::command::Command::{AddService, Ingest, RemoveService};
use crate::error::AquamarineApiError;
use crate::particle_effects::NetworkEffects;
use crate::particle_effects::RoutingEffects;
use crate::vm_pool::VmPool;
use crate::{Plumber, VmPoolConfig};

pub type EffectsChannel = Outlet<Result<NetworkEffects, AquamarineApiError>>;
pub type EffectsChannel = Outlet<Result<RoutingEffects, AquamarineApiError>>;

pub struct AquamarineBackend<RT: AquaRuntime, F> {
inlet: BackPressuredInlet<Command>,
plumber: Plumber<RT, F>,
out: EffectsChannel,
host_peer_id: PeerId,
}

impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
Expand All @@ -51,17 +53,19 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
out: EffectsChannel,
plumber_metrics: Option<ParticleExecutorMetrics>,
vm_pool_metrics: Option<VmPoolMetrics>,
host_peer_id: PeerId,
key_manager: KeyManager,
) -> (Self, AquamarineApi) {
// TODO: make `100` configurable
let (outlet, inlet) = mpsc::channel(100);
let sender = AquamarineApi::new(outlet, config.execution_timeout);
let vm_pool = VmPool::new(config.pool_size, runtime_config, vm_pool_metrics);
let plumber = Plumber::new(vm_pool, builtins, plumber_metrics, host_peer_id);
let host_peer_id = key_manager.get_host_peer_id();
let plumber = Plumber::new(vm_pool, builtins, plumber_metrics, key_manager);
let this = Self {
inlet,
plumber,
out,
host_peer_id,
};

(this, sender)
Expand All @@ -76,7 +80,8 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
Poll::Ready(Some(Ingest { particle, function })) => {
wake = true;
// set new particle to be executed
self.plumber.ingest(particle, function);
// every particle that comes from the connection pool first executed on the host peer id
self.plumber.ingest(particle, function, self.host_peer_id);
}
Poll::Ready(Some(AddService {
service,
Expand Down
3 changes: 2 additions & 1 deletion aquamarine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
unused_unsafe,
unreachable_patterns
)]
#![feature(drain_filter)]

pub use avm_server::AVM;
// reexport
Expand All @@ -34,7 +35,7 @@ pub use aqua_runtime::AquaRuntime;
pub use config::{VmConfig, VmPoolConfig};
pub use error::AquamarineApiError;
pub use particle_data_store::{DataStoreError, ParticleDataStore};
pub use particle_effects::{InterpretationStats, NetworkEffects, ParticleEffects};
pub use particle_effects::{InterpretationStats, ParticleEffects, RoutingEffects};
pub use plumber::Plumber;

pub use crate::aquamarine::{AquamarineApi, AquamarineBackend};
Expand Down
6 changes: 3 additions & 3 deletions aquamarine/src/particle_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ pub struct InterpretationStats {
pub success: bool,
}

/// Network part of the [[ParticleEffects]. Can't be executed by Aquamarine layer,
/// thus delegated to outside.
/// Routing part of the [[ParticleEffects].
/// Instruct to send particle to either virtual or remote peers.
#[derive(Clone, Debug)]
pub struct NetworkEffects {
pub struct RoutingEffects {
pub particle: Particle,
pub next_peers: Vec<PeerId>,
}
Loading

0 comments on commit 63d0759

Please sign in to comment.