diff --git a/Cargo.lock b/Cargo.lock index 3743692f3e..d6be6a6e42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -197,6 +197,7 @@ dependencies = [ "fs-utils", "futures", "humantime", + "key-manager", "libp2p", "log", "now-millis", @@ -2551,6 +2552,22 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "key-manager" +version = "0.1.0" +dependencies = [ + "eyre", + "fluence-keypair", + "fluence-libp2p", + "fs-utils", + "libp2p", + "log", + "parking_lot 0.12.1", + "serde", + "thiserror", + "toml", +] + [[package]] name = "kv-log-macro" version = "1.0.7" @@ -4087,6 +4104,7 @@ dependencies = [ "humantime-serde", "itertools", "kademlia", + "key-manager", "libp2p", "libp2p-metrics", "libp2p-swarm", @@ -4135,6 +4153,7 @@ dependencies = [ "libp2p", "local-vm", "log", + "log-utils", "maplit", "multihash", "now-millis", @@ -4166,6 +4185,7 @@ dependencies = [ "base64 0.20.0", "derivative", "eyre", + "fluence-keypair", "fluence-libp2p", "futures", "humantime-serde", @@ -4178,6 +4198,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", + "thiserror", ] [[package]] @@ -5320,14 +5341,15 @@ dependencies = [ "async-std", "connection-pool", "eyre", + "fluence-keypair", "fluence-libp2p", "fluence-spell-dtos", "fstrings", "futures", "kademlia", + "key-manager", "libp2p", "log", - "maplit", "now-millis", "parking_lot 0.12.1", "particle-args", diff --git a/Cargo.toml b/Cargo.toml index 8a3ec009f1..6d24290287 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ members = [ "crates/test-constants", "crates/peer-metrics", "crates/spell-event-bus", + "crates/key-manager", "sorcerer", "crates/builtins-tests", "crates/particle-node-tests", @@ -73,6 +74,7 @@ connected-client = { path = "crates/connected-client" } test-constants = { path = "crates/test-constants" } peer-metrics = { path = "crates/peer-metrics" } spell-event-bus = { path = "crates/spell-event-bus" } +key-manager = { path = "crates/key-manager" } sorcerer = { path = "sorcerer" } builtins-tests = { path = "crates/builtins-tests" } particle-node = { path = "particle-node" } @@ -113,3 +115,5 @@ futures = "0.3.24" thiserror = "1.0.37" serde = "1.0.145" toml = "0.5.9" +itertools = "0.10.5" +humantime-serde = "1.1.1" diff --git a/aquamarine/Cargo.toml b/aquamarine/Cargo.toml index 06911e17c0..55dbdc224b 100644 --- a/aquamarine/Cargo.toml +++ b/aquamarine/Cargo.toml @@ -17,6 +17,7 @@ control-macro = { workspace = true } fs-utils = { workspace = true } peer-metrics = { workspace = true } particle-execution = { workspace = true } +key-manager = { workspace = true} avm-server = { workspace = true } libp2p = { workspace = true } diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index e096b3e871..fba5f0f642 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -27,7 +27,7 @@ use particle_execution::{ParticleFunctionStatic, ServiceFunction}; use particle_protocol::Particle; use crate::deadline::Deadline; -use crate::particle_effects::NetworkEffects; +use crate::particle_effects::RoutingEffects; use crate::particle_executor::{Fut, FutResult, ParticleExecutor}; use crate::particle_functions::{Functions, SingleCallStat}; use crate::{AquaRuntime, InterpretationStats}; @@ -105,7 +105,7 @@ where pub fn poll_completed( &mut self, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { use Poll::Ready; self.waker = Some(cx.waker().clone()); @@ -120,7 +120,7 @@ where // Schedule execution of functions self.functions.execute(r.effects.call_requests, waker); - let effects = NetworkEffects { + let effects = RoutingEffects { particle: r.effects.particle, next_peers: r.effects.next_peers, }; diff --git a/aquamarine/src/aquamarine.rs b/aquamarine/src/aquamarine.rs index 59803d7190..018d9a9b8c 100644 --- a/aquamarine/src/aquamarine.rs +++ b/aquamarine/src/aquamarine.rs @@ -23,6 +23,7 @@ use futures::{channel::mpsc, SinkExt, StreamExt}; use fluence_libp2p::types::{BackPressuredInlet, BackPressuredOutlet, Outlet}; use fluence_libp2p::PeerId; +use key_manager::KeyManager; use particle_execution::{ParticleFunctionStatic, ServiceFunction}; use particle_protocol::Particle; use peer_metrics::{ParticleExecutorMetrics, VmPoolMetrics}; @@ -31,16 +32,17 @@ use crate::aqua_runtime::AquaRuntime; use crate::command::Command; use crate::command::Command::{AddService, Ingest, RemoveService}; use crate::error::AquamarineApiError; -use crate::particle_effects::NetworkEffects; +use crate::particle_effects::RoutingEffects; use crate::vm_pool::VmPool; use crate::{Plumber, VmPoolConfig}; -pub type EffectsChannel = Outlet>; +pub type EffectsChannel = Outlet>; pub struct AquamarineBackend { inlet: BackPressuredInlet, plumber: Plumber, out: EffectsChannel, + host_peer_id: PeerId, } impl AquamarineBackend { @@ -51,17 +53,19 @@ impl AquamarineBackend { out: EffectsChannel, plumber_metrics: Option, vm_pool_metrics: Option, - host_peer_id: PeerId, + key_manager: KeyManager, ) -> (Self, AquamarineApi) { // TODO: make `100` configurable let (outlet, inlet) = mpsc::channel(100); let sender = AquamarineApi::new(outlet, config.execution_timeout); let vm_pool = VmPool::new(config.pool_size, runtime_config, vm_pool_metrics); - let plumber = Plumber::new(vm_pool, builtins, plumber_metrics, host_peer_id); + let host_peer_id = key_manager.get_host_peer_id(); + let plumber = Plumber::new(vm_pool, builtins, plumber_metrics, key_manager); let this = Self { inlet, plumber, out, + host_peer_id, }; (this, sender) @@ -76,7 +80,8 @@ impl AquamarineBackend { Poll::Ready(Some(Ingest { particle, function })) => { wake = true; // set new particle to be executed - self.plumber.ingest(particle, function); + // every particle that comes from the connection pool first executed on the host peer id + self.plumber.ingest(particle, function, self.host_peer_id); } Poll::Ready(Some(AddService { service, diff --git a/aquamarine/src/lib.rs b/aquamarine/src/lib.rs index b62e7cee9c..ea95a95662 100644 --- a/aquamarine/src/lib.rs +++ b/aquamarine/src/lib.rs @@ -25,6 +25,7 @@ unused_unsafe, unreachable_patterns )] +#![feature(drain_filter)] pub use avm_server::AVM; // reexport @@ -34,7 +35,7 @@ pub use aqua_runtime::AquaRuntime; pub use config::{VmConfig, VmPoolConfig}; pub use error::AquamarineApiError; pub use particle_data_store::{DataStoreError, ParticleDataStore}; -pub use particle_effects::{InterpretationStats, NetworkEffects, ParticleEffects}; +pub use particle_effects::{InterpretationStats, ParticleEffects, RoutingEffects}; pub use plumber::Plumber; pub use crate::aquamarine::{AquamarineApi, AquamarineBackend}; diff --git a/aquamarine/src/particle_effects.rs b/aquamarine/src/particle_effects.rs index 60bc55a093..d3bf3171b0 100644 --- a/aquamarine/src/particle_effects.rs +++ b/aquamarine/src/particle_effects.rs @@ -49,10 +49,10 @@ pub struct InterpretationStats { pub success: bool, } -/// Network part of the [[ParticleEffects]. Can't be executed by Aquamarine layer, -/// thus delegated to outside. +/// Routing part of the [[ParticleEffects]. +/// Instruct to send particle to either virtual or remote peers. #[derive(Clone, Debug)] -pub struct NetworkEffects { +pub struct RoutingEffects { pub particle: Particle, pub next_peers: Vec, } diff --git a/aquamarine/src/plumber.rs b/aquamarine/src/plumber.rs index d107f1f1ea..cf8af7aef6 100644 --- a/aquamarine/src/plumber.rs +++ b/aquamarine/src/plumber.rs @@ -22,6 +22,7 @@ use std::{ use futures::task::Waker; use fluence_libp2p::PeerId; +use key_manager::KeyManager; /// For tests, mocked time is used #[cfg(test)] use mock_time::now_ms; @@ -36,18 +37,19 @@ use crate::actor::{Actor, ActorPoll}; use crate::aqua_runtime::AquaRuntime; use crate::deadline::Deadline; use crate::error::AquamarineApiError; -use crate::particle_effects::NetworkEffects; +use crate::particle_effects::RoutingEffects; use crate::particle_functions::Functions; use crate::vm_pool::VmPool; +type ParticleId = String; pub struct Plumber { - events: VecDeque>, - actors: HashMap>, + events: VecDeque>, + actors: HashMap<(ParticleId, PeerId), Actor>, vm_pool: VmPool, builtins: F, waker: Option, metrics: Option, - host_peer_id: PeerId, + key_manager: KeyManager, } impl Plumber { @@ -55,7 +57,7 @@ impl Plumber { vm_pool: VmPool, builtins: F, metrics: Option, - host_peer_id: PeerId, + key_manager: KeyManager, ) -> Self { Self { vm_pool, @@ -64,12 +66,17 @@ impl Plumber { actors: <_>::default(), waker: <_>::default(), metrics, - host_peer_id, + key_manager, } } /// Receives and ingests incoming particle: creates a new actor or forwards to the existing mailbox - pub fn ingest(&mut self, particle: Particle, function: Option) { + pub fn ingest( + &mut self, + particle: Particle, + function: Option, + scope_peer_id: PeerId, + ) { self.wake(); let deadline = Deadline::from(&particle); @@ -83,12 +90,14 @@ impl Plumber { } let builtins = &self.builtins; - let host_peer_id = self.host_peer_id; - let actor = self.actors.entry(particle.id.clone()).or_insert_with(|| { - let params = ParticleParams::clone_from(&particle); - let functions = Functions::new(params, builtins.clone()); - Actor::new(&particle, functions, host_peer_id) - }); + let actor = self + .actors + .entry((particle.id.clone(), scope_peer_id)) + .or_insert_with(|| { + let params = ParticleParams::clone_from(&particle); + let functions = Functions::new(params, builtins.clone()); + Actor::new(&particle, functions, scope_peer_id) + }); actor.ingest(particle); if let Some(function) = function { @@ -112,7 +121,7 @@ impl Plumber { pub fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { self.waker = Some(cx.waker().clone()); self.vm_pool.poll(cx); @@ -122,11 +131,33 @@ impl Plumber { } // Gather effects and put VMs back - let mut effects = vec![]; + let mut remote_effects = vec![]; + let mut local_effects: Vec = vec![]; + let mut interpretation_stats = vec![]; let mut mailbox_size = 0; + let key_manager = self.key_manager.clone(); for actor in self.actors.values_mut() { if let Poll::Ready(result) = actor.poll_completed(cx) { - effects.push((result.effects, result.stats)); + interpretation_stats.push(result.stats); + let (local_peers, remote_peers): (Vec<_>, Vec<_>) = result + .effects + .next_peers + .into_iter() + .partition(|p| key_manager.is_scope_peer_id(*p)); + + if !remote_peers.is_empty() { + remote_effects.push(RoutingEffects { + particle: result.effects.particle.clone(), + next_peers: remote_peers, + }) + } + + if !local_peers.is_empty() { + local_effects.push(RoutingEffects { + particle: result.effects.particle, + next_peers: local_peers, + }); + } let (vm_id, vm) = result.vm; self.vm_pool.put_vm(vm_id, vm); } @@ -136,7 +167,7 @@ impl Plumber { // Remove expired actors if let Some((vm_id, mut vm)) = self.vm_pool.get_vm() { let now = now_ms(); - self.actors.retain(|particle_id, actor| { + self.actors.retain(|(particle_id, _peer_id), actor| { // if actor hasn't yet expired or is still executing, keep it // TODO: if actor is expired, cancel execution and return VM back to pool // https://github.com/fluencelabs/fluence/issues/1212 @@ -181,7 +212,7 @@ impl Plumber { } self.meter(|m| { - for (_, stat) in &effects { + for stat in &interpretation_stats { // count particle interpretations if stat.success { m.interpretation_successes.inc(); @@ -201,11 +232,15 @@ impl Plumber { } }); - // Turn effects into events, and buffer them - for (effect, _) in effects { - self.events.push_back(Ok(effect)); + for effect in local_effects { + for local_peer in effect.next_peers { + self.ingest(effect.particle.clone(), None, local_peer); + } } + // Turn effects into events, and buffer them + self.events.extend(remote_effects.into_iter().map(Ok)); + // Return a new event if there is some if let Some(event) = self.events.pop_front() { return Poll::Ready(event); @@ -245,6 +280,7 @@ mod tests { use futures::future::BoxFuture; use futures::task::noop_waker_ref; use futures::FutureExt; + use key_manager::KeyManager; use particle_args::Args; use particle_execution::{ParticleFunction, ParticleParams, ServiceFunction}; @@ -339,7 +375,8 @@ mod tests { // Pool is of size 1 so it's easier to control tests let vm_pool = VmPool::new(1, (), None); let builtin_mock = Arc::new(MockF); - Plumber::new(vm_pool, builtin_mock, None, RandomPeerId::random()) + let key_manager = KeyManager::new("keypair".into(), RandomPeerId::random()); + Plumber::new(vm_pool, builtin_mock, None, key_manager) } fn particle(ts: u64, ttl: u32) -> Particle { @@ -366,7 +403,7 @@ mod tests { let deadline = Deadline::from(&particle); assert!(!deadline.is_expired(now_ms())); - plumber.ingest(particle, None); + plumber.ingest(particle, None, RandomPeerId::random()); assert_eq!(plumber.actors.len(), 1); let mut cx = context(); @@ -399,7 +436,7 @@ mod tests { let deadline = Deadline::from(&particle); assert!(deadline.is_expired(now_ms())); - plumber.ingest(particle.clone(), None); + plumber.ingest(particle.clone(), None, RandomPeerId::random()); assert_eq!(plumber.actors.len(), 0); diff --git a/connection-pool/Cargo.toml b/connection-pool/Cargo.toml index 2767024592..9e4a4b2244 100644 --- a/connection-pool/Cargo.toml +++ b/connection-pool/Cargo.toml @@ -15,7 +15,7 @@ futures = { workspace = true } log = { workspace = true } serde = { workspace = true } async-std = { workspace = true } -itertools = "0.10.5" +itertools = { workspace = true } [dev-dependencies] parking_lot = { workspace = true } diff --git a/crates/builtins-deployer/src/builtins_deployer.rs b/crates/builtins-deployer/src/builtins_deployer.rs index 6bd7dde618..7f9be74b37 100644 --- a/crates/builtins-deployer/src/builtins_deployer.rs +++ b/crates/builtins-deployer/src/builtins_deployer.rs @@ -27,11 +27,11 @@ use serde_json::{json, Value as JValue}; use aquamarine::AquamarineApi; use fluence_libp2p::PeerId; +use fs_utils::list_files; use fs_utils::{file_name, to_abs_path}; use humantime::format_duration as pretty; use local_vm::{client_functions, wrap_script}; use now_millis::now_ms; -use particle_modules::list_files; use particle_protocol::Particle; use uuid_utils::uuid; diff --git a/crates/builtins-deployer/src/utils.rs b/crates/builtins-deployer/src/utils.rs index 24112792fd..404731ba8f 100644 --- a/crates/builtins-deployer/src/utils.rs +++ b/crates/builtins-deployer/src/utils.rs @@ -21,8 +21,8 @@ use eyre::{eyre, Result, WrapErr}; use regex::Regex; use serde_json::Value as JValue; -use fs_utils::file_stem; -use particle_modules::{list_files, AddBlueprint}; +use fs_utils::{file_stem, list_files}; +use particle_modules::AddBlueprint; use service_modules::{ hash_dependencies, module_config_name_json, module_file_name, Dependency, Hash, }; diff --git a/crates/builtins-tests/tests/src/builtins_deployer.rs b/crates/builtins-tests/tests/src/builtins_deployer.rs index 1a9fc15671..0921bb571c 100644 --- a/crates/builtins-tests/tests/src/builtins_deployer.rs +++ b/crates/builtins-tests/tests/src/builtins_deployer.rs @@ -28,7 +28,7 @@ use builtins_deployer::ALLOWED_ENV_PREFIX; use connected_client::ConnectedClient; use created_swarm::{make_swarms_with_builtins, make_swarms_with_keypair}; use fs_utils::copy_dir_all; -use particle_modules::list_files; +use fs_utils::list_files; use service_modules::load_module; use test_utils::create_service; diff --git a/crates/connected-client/Cargo.toml b/crates/connected-client/Cargo.toml index 8601d3624f..efdd9dd5cc 100644 --- a/crates/connected-client/Cargo.toml +++ b/crates/connected-client/Cargo.toml @@ -18,7 +18,7 @@ futures = { workspace = true } serde = { version = "1.0.145", features = ["derive"] } serde_json = { workspace = true } log = { workspace = true } -derivative = "2.2.0" +derivative = { workspace = true } eyre = { workspace = true } parking_lot = { workspace = true } diff --git a/crates/created-swarm/Cargo.toml b/crates/created-swarm/Cargo.toml index 58ebfcbb1e..7f9124b202 100644 --- a/crates/created-swarm/Cargo.toml +++ b/crates/created-swarm/Cargo.toml @@ -27,7 +27,7 @@ libp2p = { workspace = true } async-std = { workspace = true } futures = { workspace = true } -derivative = "2.2.0" +derivative = { workspace = true } serde_json = { workspace = true } base64 = { workspace = true } serde = { workspace = true } diff --git a/crates/created-swarm/src/swarm.rs b/crates/created-swarm/src/swarm.rs index 2fdd42f53c..2d3eb33ed5 100644 --- a/crates/created-swarm/src/swarm.rs +++ b/crates/created-swarm/src/swarm.rs @@ -352,7 +352,12 @@ pub fn create_swarm_with_runtime( let mut node = Node::new(resolved, vm_config, "some version").expect("create node"); node.listen(vec![config.listen_on.clone()]).expect("listen"); - (node.local_peer_id, node, management_kp, config) + ( + node.key_manager.get_host_peer_id(), + node, + management_kp, + config, + ) } pub fn create_swarm(config: SwarmConfig) -> (PeerId, Box>, KeyPair, SwarmConfig) { diff --git a/crates/fs-utils/src/lib.rs b/crates/fs-utils/src/lib.rs index 2ff27a77fa..e56cb87d8a 100644 --- a/crates/fs-utils/src/lib.rs +++ b/crates/fs-utils/src/lib.rs @@ -167,3 +167,9 @@ pub fn canonicalize(path: impl AsRef) -> eyre::Result { path.as_ref().display() )) } + +/// List files in directory +pub fn list_files(dir: &Path) -> Option> { + let dir = std::fs::read_dir(dir).ok()?; + Some(dir.filter_map(|p| p.ok()?.path().into())) +} diff --git a/crates/key-manager/Cargo.toml b/crates/key-manager/Cargo.toml new file mode 100644 index 0000000000..886df1ba61 --- /dev/null +++ b/crates/key-manager/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "key-manager" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +fs-utils = { workspace = true } +fluence-libp2p = { workspace = true} +fluence-keypair = { workspace = true } + +parking_lot = { workspace = true } +eyre = { workspace = true } +thiserror = { workspace = true } +serde = { workspace = true } +toml = { workspace = true } +log = { workspace = true } +libp2p = { workspace = true } diff --git a/crates/key-manager/src/error.rs b/crates/key-manager/src/error.rs new file mode 100644 index 0000000000..ca3551e252 --- /dev/null +++ b/crates/key-manager/src/error.rs @@ -0,0 +1,53 @@ +/* + * Copyright 2022 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::path::PathBuf; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum PersistedKeypairError { + #[error("Failed to persist keypair: RSA is not supported")] + CannotExtractRSASecretKey, + #[error("Error reading persisted keypair from {path:?}: {err}")] + ReadPersistedKeypair { + path: PathBuf, + #[source] + err: std::io::Error, + }, + #[error("Error deserializing persisted keypair from {path:?}: {err}")] + DeserializePersistedKeypair { + path: PathBuf, + #[source] + err: toml::de::Error, + }, + #[error("Error serializing persisted keypair: {err}")] + SerializePersistedKeypair { + #[source] + err: toml::ser::Error, + }, + #[error("Error writing persisted keypair to {path:?}: {err}")] + WriteErrorPersistedKeypair { + path: PathBuf, + #[source] + err: std::io::Error, + }, + #[error("Error creating directory for persisted keypairs {path:?}: {err}")] + CreateKeypairsDir { + path: PathBuf, + #[source] + err: std::io::Error, + }, +} diff --git a/crates/key-manager/src/key_manager.rs b/crates/key-manager/src/key_manager.rs new file mode 100644 index 0000000000..f4989d890a --- /dev/null +++ b/crates/key-manager/src/key_manager.rs @@ -0,0 +1,138 @@ +/* + * Copyright 2021 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use fluence_keypair::{KeyFormat, KeyPair}; +use libp2p::PeerId; +use std::collections::HashMap; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; + +use crate::error::PersistedKeypairError; +use crate::persistence::{load_persisted_keypairs, persist_keypair, PersistedKeypair}; +use parking_lot::RwLock; + +#[derive(Clone)] +pub struct KeyManager { + /// scope_peer_id -> scope_keypair + scope_keypairs: Arc>>, + /// remote_peer_id -> scope_peer_id + scope_peer_ids: Arc>>, + keypairs_dir: PathBuf, + host_peer_id: PeerId, +} + +impl KeyManager { + pub fn new(keypairs_dir: PathBuf, host_peer_id: PeerId) -> Self { + let this = Self { + scope_keypairs: Arc::new(Default::default()), + scope_peer_ids: Arc::new(Default::default()), + keypairs_dir, + host_peer_id, + }; + + this.load_persisted_keypairs(); + this + } + + pub fn load_persisted_keypairs(&self) { + let persisted_keypairs = load_persisted_keypairs(&self.keypairs_dir); + + for pkp in persisted_keypairs { + let res: eyre::Result<()> = try { + let persisted_kp = pkp?; + let keypair = KeyPair::from_secret_key( + persisted_kp.private_key_bytes, + KeyFormat::from_str(&persisted_kp.key_format)?, + )?; + let peer_id = keypair.get_peer_id(); + self.scope_peer_ids + .write() + .insert(persisted_kp.remote_peer_id, keypair.get_peer_id()); + + self.scope_keypairs.write().insert(peer_id, keypair); + }; + + if let Err(e) = res { + log::warn!("Failed to restore persisted keypair: {}", e); + } + } + } + + pub fn get_host_peer_id(&self) -> PeerId { + self.host_peer_id + } + + pub fn has_keypair(&self, remote_peer_id: PeerId) -> bool { + self.scope_peer_ids.read().contains_key(&remote_peer_id) + } + + pub fn is_scope_peer_id(&self, scope_peer_id: PeerId) -> bool { + self.scope_keypairs.read().contains_key(&scope_peer_id) + } + + /// For local peer ids is identity, + /// for remote returns associated peer id or generate a new one. + pub fn get_scope_peer_id(&self, init_peer_id: PeerId) -> Result { + // All "nested" spells share the same keypair. + // "nested" means spells which are created by other spells + if self.is_scope_peer_id(init_peer_id) { + Ok(init_peer_id) + } else { + let scope_peer_id = self.scope_peer_ids.read().get(&init_peer_id).cloned(); + match scope_peer_id { + Some(p) => Ok(p), + _ => { + let kp = self.generate_keypair(); + let scope_peer_id = kp.get_peer_id(); + self.store_keypair(init_peer_id, kp)?; + Ok(scope_peer_id) + } + } + } + } + + pub fn get_scope_keypair(&self, scope_peer_id: PeerId) -> eyre::Result { + self.scope_keypairs + .read() + .get(&scope_peer_id) + .cloned() + .ok_or_else(|| eyre::eyre!("Keypair for peer id {} not found", scope_peer_id)) + } + + pub fn generate_keypair(&self) -> KeyPair { + KeyPair::generate_ed25519() + } + + pub fn store_keypair( + &self, + remote_peer_id: PeerId, + keypair: KeyPair, + ) -> Result<(), PersistedKeypairError> { + persist_keypair( + &self.keypairs_dir, + PersistedKeypair::new(remote_peer_id, &keypair)?, + )?; + let scope_peer_id = keypair.get_peer_id(); + self.scope_peer_ids + .write() + .insert(remote_peer_id, scope_peer_id); + + self.scope_keypairs.write().insert(scope_peer_id, keypair); + + Ok(()) + } +} diff --git a/crates/key-manager/src/lib.rs b/crates/key-manager/src/lib.rs new file mode 100644 index 0000000000..36676351e7 --- /dev/null +++ b/crates/key-manager/src/lib.rs @@ -0,0 +1,7 @@ +#![feature(try_blocks)] + +mod error; +mod key_manager; +mod persistence; + +pub use key_manager::KeyManager; diff --git a/crates/key-manager/src/persistence.rs b/crates/key-manager/src/persistence.rs new file mode 100644 index 0000000000..2536a6eb51 --- /dev/null +++ b/crates/key-manager/src/persistence.rs @@ -0,0 +1,108 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use fs_utils::{create_dir, list_files}; + +use crate::error::PersistedKeypairError; +use crate::error::PersistedKeypairError::{ + CannotExtractRSASecretKey, CreateKeypairsDir, DeserializePersistedKeypair, + ReadPersistedKeypair, SerializePersistedKeypair, WriteErrorPersistedKeypair, +}; +use fluence_keypair::KeyPair; +use fluence_libp2p::peerid_serializer; +use libp2p::PeerId; +use serde::{Deserialize, Serialize}; +use std::path::Path; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct PersistedKeypair { + #[serde(with = "peerid_serializer")] + pub remote_peer_id: PeerId, + pub private_key_bytes: Vec, + pub key_format: String, +} + +impl PersistedKeypair { + pub fn new(owner_id: PeerId, keypair: &KeyPair) -> Result { + Ok(Self { + remote_peer_id: owner_id, + private_key_bytes: keypair.secret().map_err(|_| CannotExtractRSASecretKey)?, + key_format: keypair.public().get_key_format().into(), + }) + } +} + +pub fn keypair_file_name(remote_peer_id: &str) -> String { + format!("{remote_peer_id}_keypair.toml") +} + +pub fn is_keypair(path: &Path) -> bool { + path.file_name() + .and_then(|n| n.to_str()) + .map_or(false, |n| n.ends_with("_keypair.toml")) +} + +/// Persist keypair info to disk, so it is recreated after restart +pub fn persist_keypair( + keypairs_dir: &Path, + persisted_keypair: PersistedKeypair, +) -> Result<(), PersistedKeypairError> { + let path = keypairs_dir.join(keypair_file_name( + &persisted_keypair.remote_peer_id.to_base58(), + )); + let bytes = + toml::to_vec(&persisted_keypair).map_err(|err| SerializePersistedKeypair { err })?; + std::fs::write(&path, bytes).map_err(|err| WriteErrorPersistedKeypair { path, err }) +} + +/// Load info about persisted keypairs from disk +pub fn load_persisted_keypairs( + keypairs_dir: &Path, +) -> Vec> { + // Load all persisted service file names + let files = match list_files(keypairs_dir) { + Some(files) => files, + None => { + // Attempt to create directory and exit + if let Err(err) = create_dir(keypairs_dir) { + return vec![Err(CreateKeypairsDir { + path: keypairs_dir.to_path_buf(), + err, + })]; + } + + return vec![]; + } + }; + + files + .filter(|p| is_keypair(p)) + .map(|file| { + // Load persisted keypair + let bytes = std::fs::read(&file).map_err(|err| ReadPersistedKeypair { + err, + path: file.to_path_buf(), + })?; + let keypair = + toml::from_slice(bytes.as_slice()).map_err(|err| DeserializePersistedKeypair { + err, + path: file.to_path_buf(), + })?; + + Ok(keypair) + }) + .collect() +} diff --git a/crates/log-utils/src/lib.rs b/crates/log-utils/src/lib.rs index 5f27127a3d..337319deab 100644 --- a/crates/log-utils/src/lib.rs +++ b/crates/log-utils/src/lib.rs @@ -27,6 +27,7 @@ pub fn enable_logs() { .filter(Some("script_storage"), Trace) .filter(Some("script_storage"), Trace) .filter(Some("sorcerer"), Trace) + .filter(Some("key_manager"), Trace) .filter(Some("spell_event_bus"), Trace) .filter(Some("aquamarine"), Trace) .filter(Some("network"), Trace) diff --git a/crates/particle-node-tests/Cargo.toml b/crates/particle-node-tests/Cargo.toml index 4ad262772e..6ae54b845e 100644 --- a/crates/particle-node-tests/Cargo.toml +++ b/crates/particle-node-tests/Cargo.toml @@ -20,6 +20,8 @@ now-millis = { path = "../now-millis" } local-vm = { path = "../local-vm" } control-macro = { path = "../control-macro" } json-utils = { path = "../json-utils" } + +log-utils = { workspace = true } fluence-spell-dtos = { workspace = true } fluence-app-service = { workspace = true } particle-protocol = { workspace = true } diff --git a/crates/particle-node-tests/tests/spells.rs b/crates/particle-node-tests/tests/spells.rs index d161fb1a7a..77a86f9fc0 100644 --- a/crates/particle-node-tests/tests/spells.rs +++ b/crates/particle-node-tests/tests/spells.rs @@ -13,27 +13,31 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + use std::collections::HashMap; use std::str::FromStr; use std::time::Duration; use eyre::Context; -use fluence_spell_dtos::trigger_config::TriggerConfig; use maplit::hashmap; use serde_json::{json, Value as JValue}; use connected_client::ConnectedClient; use created_swarm::make_swarms; +use fluence_spell_dtos::trigger_config::TriggerConfig; use service_modules::load_module; use spell_event_bus::api::MAX_PERIOD_SEC; use test_utils::create_service; +type SpellId = String; +type ScopePeerId = String; + fn create_spell( client: &mut ConnectedClient, script: &str, config: TriggerConfig, init_data: HashMap, -) -> String { +) -> (SpellId, ScopePeerId) { let data = hashmap! { "script" => json!(script.to_string()), "config" => json!(config), @@ -45,7 +49,10 @@ fn create_spell( r#" (seq (call relay ("spell" "install") [script data config] spell_id) - (call client ("return" "") [spell_id]) + (seq + (call relay ("scope" "get_peer_id") [] scope_peer_id) + (call client ("return" "") [spell_id scope_peer_id]) + ) )"#, data.clone(), ); @@ -53,57 +60,44 @@ fn create_spell( let response = client.receive_args().wrap_err("receive").unwrap(); let spell_id = response[0].as_str().unwrap().to_string(); assert_ne!(spell_id.len(), 0); + let scope_peer_id = response[1].as_str().unwrap().to_string(); + assert_ne!(script.len(), 0); - spell_id + (spell_id, scope_peer_id) } #[test] fn spell_simple_test() { let swarms = make_swarms(1); + let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone()) .wrap_err("connect client") .unwrap(); - let script = r#" + let script = format!( + r#" (seq (call %init_peer_id% ("getDataSrv" "spell_id") [] spell_id) (seq - (call %init_peer_id% (spell_id "get_script_source_from_file") [] script) - (call %init_peer_id% (spell_id "set_string") ["result" script.$.source_code]) + (seq + (call %init_peer_id% (spell_id "get_script_source_from_file") [] script) + (call %init_peer_id% (spell_id "get_u32") ["counter"] counter) + ) + (call "{}" ("return" "") [script.$.source_code counter]) ) - )"#; + )"#, + client.peer_id + ); let mut config = TriggerConfig::default(); config.clock.period_sec = 0; config.clock.start_sec = 1; - let spell_id = create_spell(&mut client, script, config, hashmap! {}); - - let mut result = "".to_string(); - let mut counter = 0; - for _ in 1..10 { - let data = hashmap! { - "spell_id" => json!(spell_id), - "client" => json!(client.peer_id.to_string()), - "relay" => json!(client.node.to_string()), - }; - client.send_particle( - r#" - (seq - (seq - (call relay (spell_id "get_string") ["result"] result) - (call relay (spell_id "get_u32") ["counter"] counter) - ) - (call client ("return" "") [result counter]) - )"#, - data.clone(), - ); + create_spell(&mut client, &script, config, hashmap! {}); - let response = client.receive_args().wrap_err("receive").unwrap(); - if response[0]["success"].as_bool().unwrap() && response[1]["success"].as_bool().unwrap() { - result = response[0]["str"].as_str().unwrap().to_string(); - counter = response[1]["num"].as_u64().unwrap(); - } - } + let response = client.receive_args().wrap_err("receive").unwrap(); + let result = response[0].as_str().unwrap().to_string(); + assert!(response[1]["success"].as_bool().unwrap()); + let counter = response[1]["num"].as_u64().unwrap(); assert_eq!(result, script); assert_ne!(counter, 0); @@ -126,7 +120,7 @@ fn spell_error_handling_test() { config.clock.period_sec = 1; config.clock.start_sec = 1; - let spell_id = create_spell(&mut client, failing_script, config, hashmap! {}); + let (spell_id, _) = create_spell(&mut client, failing_script, config, hashmap! {}); // let's retrieve error from the first spell particle let particle_id = format!("spell_{}_{}", spell_id, 0); @@ -259,7 +253,7 @@ fn spell_run_oneshot() { // Note that when period is 0, the spell is executed only once let mut config = TriggerConfig::default(); config.clock.start_sec = 1; - let spell_id = create_spell(&mut client, script, config, hashmap! {}); + let (spell_id, _) = create_spell(&mut client, script, config, hashmap! {}); let data = hashmap! { "spell_id" => json!(spell_id), @@ -292,7 +286,7 @@ fn spell_install_fail_empty_config() { .wrap_err("connect client") .unwrap(); - let script = r#"(call %init_peer_id% ("peer" "idenitfy") [] x)"#; + let script = r#"(call %init_peer_id% ("peer" "identify") [] x)"#; let empty: HashMap = HashMap::new(); // Note that when period is 0, the spell is executed only once @@ -332,7 +326,7 @@ fn spell_install_fail_large_period() { .wrap_err("connect client") .unwrap(); - let script = r#"(call %init_peer_id% ("peer" "idenitfy") [] x)"#; + let script = r#"(call %init_peer_id% ("peer" "identify") [] x)"#; let empty: HashMap = HashMap::new(); // Note that when period is 0, the spell is executed only once @@ -376,7 +370,7 @@ fn spell_install_fail_end_sec_past() { .wrap_err("connect client") .unwrap(); - let script = r#"(call %init_peer_id% ("peer" "idenitfy") [] x)"#; + let script = r#"(call %init_peer_id% ("peer" "identify") [] x)"#; let empty: HashMap = HashMap::new(); // Note that when period is 0, the spell is executed only once @@ -420,7 +414,7 @@ fn spell_install_fail_end_sec_before_start() { .wrap_err("connect client") .unwrap(); - let script = r#"(call %init_peer_id% ("peer" "idenitfy") [] x)"#; + let script = r#"(call %init_peer_id% ("peer" "identify") [] x)"#; let empty: HashMap = HashMap::new(); let now = std::time::SystemTime::now() @@ -467,11 +461,11 @@ fn spell_store_trigger_config() { .wrap_err("connect client") .unwrap(); - let script = r#"(call %init_peer_id% ("peer" "idenitfy") [] x)"#; + let script = r#"(call %init_peer_id% ("peer" "identify") [] x)"#; let mut config = TriggerConfig::default(); config.clock.period_sec = 13; config.clock.start_sec = 10; - let spell_id = create_spell(&mut client, script, config.clone(), hashmap! {}); + let (spell_id, _) = create_spell(&mut client, script, config.clone(), hashmap! {}); let data = hashmap! { "spell_id" => json!(spell_id), "client" => json!(client.peer_id.to_string()), @@ -496,15 +490,16 @@ fn spell_store_trigger_config() { #[test] fn spell_remove() { let swarms = make_swarms(1); + let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone()) .wrap_err("connect client") .unwrap(); - let script = r#"(call %init_peer_id% ("peer" "idenitfy") [] x)"#; + let script = r#"(call %init_peer_id% ("peer" "identify") [] x)"#; let mut config = TriggerConfig::default(); config.clock.period_sec = 2; config.clock.start_sec = 1; - let spell_id = create_spell(&mut client, script, config, hashmap! {}); + let (spell_id, scope) = create_spell(&mut client, script, config, hashmap! {}); let data = hashmap! { "spell_id" => json!(spell_id), @@ -533,7 +528,10 @@ fn spell_remove() { client.send_particle( r#" (seq - (call relay ("spell" "remove") [spell_id]) + (seq + (call relay ("scope" "get_peer_id") [] scope_id) + (call relay ("spell" "remove") [spell_id]) + ) (seq (call relay ("spell" "list") [] list) (call client ("return" "") [list]) @@ -545,7 +543,7 @@ fn spell_remove() { if let [JValue::Array(created_spells)] = client .receive_args() - .wrap_err("receive") + .wrap_err(format!("receive by {}, scope {}", client.peer_id, scope)) .unwrap() .as_slice() { @@ -560,12 +558,12 @@ fn spell_remove_spell_as_service() { .wrap_err("connect client") .unwrap(); - let script = r#"(call %init_peer_id% ("peer" "idenitfy") [] x)"#; + let script = r#"(call %init_peer_id% ("peer" "identify") [] x)"#; let mut config = TriggerConfig::default(); config.clock.period_sec = 2; config.clock.start_sec = 1; - let spell_id = create_spell(&mut client, script, config, hashmap! {}); + let (spell_id, _) = create_spell(&mut client, script, config, hashmap! {}); let data = hashmap! { "spell_id" => json!(spell_id), @@ -590,7 +588,10 @@ fn spell_remove_spell_as_service() { .as_slice() { let msg_end = "cannot call function 'remove_service': cannot remove a spell\"'"; - assert!(msg.ends_with(msg_end), "should end with `{msg_end}`"); + assert!( + msg.ends_with(msg_end), + "should end with `{msg_end}`, given msg `{msg}`" + ); } } @@ -607,6 +608,11 @@ fn spell_remove_service_as_spell() { load_module("tests/file_share/artifacts", "file_share").expect("load module"), ); + let script = r#"(call %init_peer_id% ("peer" "identify") [] x)"#; + let mut config = TriggerConfig::default(); + config.clock.start_sec = 1; + let _spell_id = create_spell(&mut client, script, config, hashmap! {}); + let data = hashmap! { "service_id" => json!(service.id), "relay" => json!(client.node.to_string()), @@ -630,7 +636,10 @@ fn spell_remove_service_as_spell() { .as_slice() { let msg_end = "cannot call function 'remove_spell': the service isn't a spell\"'"; - assert!(msg.ends_with(msg_end), "should end with `{msg_end}`"); + assert!( + msg.ends_with(msg_end), + "should end with `{msg_end}`, given msg `{msg}`" + ); } } @@ -655,11 +664,11 @@ fn spell_trigger_connection_pool() { ); let mut config = TriggerConfig::default(); config.connections.connect = true; - let spell_id1 = create_spell(&mut client, &script, config, hashmap! {}); + let (spell_id1, _) = create_spell(&mut client, &script, config, hashmap! {}); let mut config = TriggerConfig::default(); config.connections.disconnect = true; - let spell_id2 = create_spell(&mut client, &script, config, hashmap! {}); + let (spell_id2, _) = create_spell(&mut client, &script, config, hashmap! {}); // This connect should trigger the spell let connect_num = 5; @@ -713,7 +722,7 @@ fn spell_update_config() { let script = format!(r#"(call "{}" ("return" "") ["called"])"#, client.peer_id); let mut config = TriggerConfig::default(); config.connections.connect = true; - let spell_id = create_spell(&mut client, &script, config, hashmap! {}); + let (spell_id, _) = create_spell(&mut client, &script, config, hashmap! {}); let connected = ConnectedClient::connect_to(swarms[0].multiaddr.clone()).unwrap(); @@ -873,7 +882,7 @@ fn spell_set_u32() { let mut config = TriggerConfig::default(); config.connections.connect = true; - let spell_id = create_spell(&mut client, &script, config.clone(), hashmap! {}); + let (spell_id, _) = create_spell(&mut client, &script, config.clone(), hashmap! {}); let data = hashmap! { "spell_id" => json!(spell_id), @@ -912,3 +921,28 @@ fn spell_set_u32() { assert_eq!(two["absent"], json!(false)); assert_eq!(two["num"], json!(2)); } + +#[test] +fn spell_peer_id_test() { + let swarms = make_swarms(1); + let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone()) + .wrap_err("connect client") + .unwrap(); + + let script = format!( + r#" + (call "{}" ("return" "") [%init_peer_id%]) + "#, + client.peer_id + ); + + let mut config = TriggerConfig::default(); + config.clock.start_sec = 1; + let (_, scope_peer_id) = create_spell(&mut client, &script, config, hashmap! {}); + + let response = client.receive_args().wrap_err("receive").unwrap(); + + let result = response[0].as_str().unwrap().to_string(); + + assert_eq!(result, scope_peer_id); +} diff --git a/crates/server-config/Cargo.toml b/crates/server-config/Cargo.toml index 1fe0b1e3e8..30b3ab49e4 100644 --- a/crates/server-config/Cargo.toml +++ b/crates/server-config/Cargo.toml @@ -18,7 +18,7 @@ libp2p = { workspace = true } libp2p-metrics = { workspace = true } serde = { version = "1.0.145", features = ["derive"] } -humantime-serde = "1.1.1" +humantime-serde = { workspace = true } log = { workspace = true } rand = "0.8.5" @@ -28,6 +28,6 @@ bs58 = { workspace = true } base64 = { workspace = true } num_cpus = "1.13.1" eyre = { workspace = true } -derivative = "2.2.0" +derivative = { workspace = true } bytesize = {version = "1.1.0", features = ["serde"]} serde_with = "2.0.1" diff --git a/crates/server-config/src/dir_config.rs b/crates/server-config/src/dir_config.rs index 0cecea598d..f33dfde860 100644 --- a/crates/server-config/src/dir_config.rs +++ b/crates/server-config/src/dir_config.rs @@ -47,6 +47,9 @@ pub struct UnresolvedDirConfig { /// Path to spell service files (wasms, configs) #[serde(default)] pub spell_base_dir: Option, + + #[serde(default)] + pub keypairs_base_dir: Option, } impl UnresolvedDirConfig { @@ -60,6 +63,7 @@ impl UnresolvedDirConfig { .air_interpreter_path .unwrap_or(air_interpreter_path(&base)); let spell_base_dir = self.spell_base_dir.unwrap_or(base.join("spell")); + let keypairs_base_dir = self.keypairs_base_dir.unwrap_or(base.join("keypairs")); create_dirs(&[ &base, @@ -67,6 +71,7 @@ impl UnresolvedDirConfig { &avm_base_dir, &builtins_base_dir, &spell_base_dir, + &keypairs_base_dir, ]) .context("creating configured directories")?; @@ -75,6 +80,7 @@ impl UnresolvedDirConfig { let builtins_base_dir = canonicalize(builtins_base_dir)?; let avm_base_dir = canonicalize(avm_base_dir)?; let spell_base_dir = canonicalize(spell_base_dir)?; + let keypairs_base_dir = canonicalize(keypairs_base_dir)?; Ok(ResolvedDirConfig { base_dir: base, @@ -83,6 +89,7 @@ impl UnresolvedDirConfig { avm_base_dir, air_interpreter_path, spell_base_dir, + keypairs_base_dir, }) } } @@ -98,4 +105,5 @@ pub struct ResolvedDirConfig { /// Directory where interpreter's WASM module is stored pub air_interpreter_path: PathBuf, pub spell_base_dir: PathBuf, + pub keypairs_base_dir: PathBuf, } diff --git a/crates/toy-vms/Cargo.toml b/crates/toy-vms/Cargo.toml index 44498e961c..f00ac1b9a3 100644 --- a/crates/toy-vms/Cargo.toml +++ b/crates/toy-vms/Cargo.toml @@ -13,4 +13,4 @@ avm-server = { workspace = true } futures = { workspace = true } serde_json = { workspace = true } -itertools = "0.10.5" +itertools = { workspace = true } diff --git a/particle-builtins/Cargo.toml b/particle-builtins/Cargo.toml index cfd569f8b8..42bbdea4d6 100644 --- a/particle-builtins/Cargo.toml +++ b/particle-builtins/Cargo.toml @@ -33,10 +33,10 @@ log = { workspace = true } bs58 = { workspace = true } parking_lot = { workspace = true } thiserror = { workspace = true } -humantime-serde = "1.1.1" +humantime-serde = { workspace = true } rand = "0.8.5" futures = { workspace = true } -itertools = "0.10.5" +itertools = { workspace = true } bytesize = "1.1.0" derivative = { workspace = true } fluence-app-service = { workspace = true } diff --git a/particle-modules/src/files.rs b/particle-modules/src/files.rs index 898e4c57bd..9d31d37c0d 100644 --- a/particle-modules/src/files.rs +++ b/particle-modules/src/files.rs @@ -22,8 +22,8 @@ use service_modules::{ Blueprint, Hash, }; +use std::convert::TryInto; use std::path::Path; -use std::{convert::TryInto, path::PathBuf}; /// Load blueprint from disk pub fn load_blueprint(bp_dir: &Path, blueprint_id: &str) -> Result { @@ -67,12 +67,6 @@ pub fn load_config_by_path(path: &Path) -> Result { Ok(config) } -/// List files in directory -pub fn list_files(dir: &Path) -> Option> { - let dir = std::fs::read_dir(dir).ok()?; - Some(dir.filter_map(|p| p.ok()?.path().into())) -} - /// Adds a module to the filesystem, overwriting existing module. /// Also adds module config to the TomlMarineNamedModuleConfig pub fn add_module( diff --git a/particle-modules/src/lib.rs b/particle-modules/src/lib.rs index c91e06a38b..6fb9803adf 100644 --- a/particle-modules/src/lib.rs +++ b/particle-modules/src/lib.rs @@ -34,7 +34,7 @@ mod files; mod modules; pub use error::ModuleError; -pub use files::{list_files, load_blueprint, load_module_by_path, load_module_descriptor}; +pub use files::{load_blueprint, load_module_by_path, load_module_descriptor}; pub use modules::{AddBlueprint, ModuleRepository}; // reexport @@ -42,3 +42,4 @@ pub use fluence_app_service::{ TomlMarineModuleConfig as ModuleConfig, TomlMarineNamedModuleConfig as NamedModuleConfig, TomlWASIConfig as WASIConfig, }; +pub use fs_utils::list_files; diff --git a/particle-modules/src/modules.rs b/particle-modules/src/modules.rs index 0dfb49a22d..1661080cda 100644 --- a/particle-modules/src/modules.rs +++ b/particle-modules/src/modules.rs @@ -77,7 +77,7 @@ impl ModuleRepository { max_heap_size: ByteSize, default_heap_size: Option, ) -> Self { - let modules_by_name: HashMap<_, _> = files::list_files(modules_dir) + let modules_by_name: HashMap<_, _> = fs_utils::list_files(modules_dir) .into_iter() .flatten() .filter(|path| is_module_wasm(path)) @@ -300,7 +300,7 @@ impl ModuleRepository { pub fn list_modules(&self) -> std::result::Result { // TODO: refactor errors to enums - let modules = files::list_files(&self.modules_dir) + let modules = fs_utils::list_files(&self.modules_dir) .into_iter() .flatten() .filter_map(|path| { @@ -391,7 +391,7 @@ impl ModuleRepository { } fn load_blueprints(blueprints_dir: &Path) -> HashMap { - let blueprints: Vec = files::list_files(blueprints_dir) + let blueprints: Vec = fs_utils::list_files(blueprints_dir) .into_iter() .flatten() .filter_map(|path| { diff --git a/particle-node/Cargo.toml b/particle-node/Cargo.toml index 4af12b80e3..42c7e370ba 100644 --- a/particle-node/Cargo.toml +++ b/particle-node/Cargo.toml @@ -14,6 +14,7 @@ script-storage = { workspace = true } aquamarine = { workspace = true } sorcerer = { workspace = true } + fluence-libp2p = { workspace = true } ctrlc-adapter = { workspace = true } server-config = { workspace = true } @@ -24,6 +25,7 @@ builtins-deployer = { workspace = true } fs-utils = { workspace = true } peer-metrics = { workspace = true } spell-event-bus = { workspace = true } +key-manager = { workspace = true } fluence-keypair = { workspace = true } @@ -39,13 +41,13 @@ futures = { workspace = true } async-std = { workspace = true } parking_lot = { workspace = true } -humantime-serde = "1.1.1" +humantime-serde = { workspace = true } log = { workspace = true } env_logger = "0.9.1" clap = "3.2.23" tide = "0.17.0-beta.1" -itertools = "0.10.5" +itertools = { workspace = true } eyre = { workspace = true } base64 = { workspace = true } diff --git a/particle-node/src/dispatcher.rs b/particle-node/src/dispatcher.rs index 8f11f54c29..5a57361d3c 100644 --- a/particle-node/src/dispatcher.rs +++ b/particle-node/src/dispatcher.rs @@ -18,7 +18,7 @@ use async_std::task::spawn; use futures::{FutureExt, SinkExt, StreamExt}; use prometheus_client::registry::Registry; -use aquamarine::{AquamarineApi, AquamarineApiError, NetworkEffects}; +use aquamarine::{AquamarineApi, AquamarineApiError, RoutingEffects}; use fluence_libp2p::types::{BackPressuredInlet, Inlet, Outlet}; use fluence_libp2p::PeerId; use particle_protocol::Particle; @@ -27,8 +27,7 @@ use peer_metrics::DispatcherMetrics; use crate::effectors::Effectors; use crate::tasks::Tasks; -// TODO: move error into NetworkEffects -type Effects = Result; +type Effects = Result; #[derive(Clone)] pub struct Dispatcher { diff --git a/particle-node/src/effectors.rs b/particle-node/src/effectors.rs index f17c4b6488..500dd2628d 100644 --- a/particle-node/src/effectors.rs +++ b/particle-node/src/effectors.rs @@ -16,7 +16,7 @@ use futures::{stream::iter, SinkExt, StreamExt}; -use aquamarine::NetworkEffects; +use aquamarine::RoutingEffects; use fluence_libp2p::types::Outlet; use crate::connectivity::Connectivity; @@ -32,7 +32,7 @@ impl Effectors { } /// Perform effects that Aquamarine instructed us to - pub async fn execute(self, effects: NetworkEffects, particle_failures: Outlet) { + pub async fn execute(self, effects: RoutingEffects, particle_failures: Outlet) { if effects.particle.is_expired() { log::info!("Particle {} is expired", effects.particle.id); return; diff --git a/particle-node/src/node.rs b/particle-node/src/node.rs index 3f19144770..d81e71170a 100644 --- a/particle-node/src/node.rs +++ b/particle-node/src/node.rs @@ -40,13 +40,14 @@ use libp2p_metrics::{Metrics, Recorder}; use prometheus_client::registry::Registry; use aquamarine::{ - AquaRuntime, AquamarineApi, AquamarineApiError, AquamarineBackend, NetworkEffects, VmPoolConfig, + AquaRuntime, AquamarineApi, AquamarineApiError, AquamarineBackend, RoutingEffects, VmPoolConfig, }; use builtins_deployer::BuiltinsDeployer; use config_utils::to_peer_id; use connection_pool::{ConnectionPoolApi, ConnectionPoolT}; use fluence_libp2p::types::{BackPressuredInlet, Inlet}; use fluence_libp2p::{build_transport, types::OneshotOutlet}; +use key_manager::KeyManager; use particle_builtins::{Builtins, NodeInfo}; use particle_execution::{ParticleFunction, ParticleFunctionStatic}; use particle_protocol::Particle; @@ -56,7 +57,7 @@ use peer_metrics::{ }; use script_storage::{ScriptStorageApi, ScriptStorageBackend, ScriptStorageConfig}; use server_config::{NetworkConfig, ResolvedConfig, ServicesConfig}; -use sorcerer::Sorcerer; +use sorcerer::{Sorcerer, SpellBuiltin}; use spell_event_bus::api::{PeerEvent, SpellEventBusApi, TriggerEvent}; use spell_event_bus::bus::SpellEventBus; @@ -71,7 +72,7 @@ use crate::behaviour::FluenceNetworkBehaviourEvent; // TODO: documentation pub struct Node { particle_stream: BackPressuredInlet, - effects_stream: Inlet>, + effects_stream: Inlet>, pub swarm: Swarm, pub connectivity: Connectivity, @@ -89,8 +90,9 @@ pub struct Node { metrics_listen_addr: SocketAddr, - pub local_peer_id: PeerId, pub builtins_management_peer_id: PeerId, + + pub key_manager: KeyManager, } impl Node { @@ -100,7 +102,6 @@ impl Node { node_version: &'static str, ) -> eyre::Result> { let key_pair: Keypair = config.node_config.root_key_pair.clone().into(); - let local_peer_id = to_peer_id(&key_pair); let transport = config.transport_config.transport; let transport = build_transport( transport, @@ -110,8 +111,13 @@ impl Node { let builtins_peer_id = to_peer_id(&config.builtins_key_pair.clone().into()); + let key_manager = KeyManager::new( + config.dir_config.keypairs_base_dir.clone(), + to_peer_id(&key_pair), + ); + let services_config = ServicesConfig::new( - local_peer_id, + key_manager.get_host_peer_id(), config.dir_config.services_base_dir.clone(), config_utils::particles_vault_dir(&config.dir_config.avm_base_dir), config.services_envs.clone(), @@ -143,7 +149,7 @@ impl Node { ); let (swarm, connectivity, particle_stream) = Self::swarm( - local_peer_id, + key_manager.get_host_peer_id(), network_config, transport, config.external_addresses(), @@ -156,7 +162,7 @@ impl Node { timer_resolution: config.script_storage_timer_resolution, max_failures: config.script_storage_max_failures, particle_ttl: config.script_storage_particle_ttl, - peer_id: local_peer_id, + peer_id: key_manager.get_host_peer_id(), }; let pool: &ConnectionPoolApi = connectivity.as_ref(); @@ -196,14 +202,14 @@ impl Node { effects_out, plumber_metrics, vm_pool_metrics, - local_peer_id, + key_manager.clone(), ); let effectors = Effectors::new(connectivity.clone()); let dispatcher = { let failures = particle_failures_out; let parallelism = config.particle_processor_parallelism; Dispatcher::new( - local_peer_id, + key_manager.get_host_peer_id(), aquamarine_api.clone(), effectors, failures, @@ -214,7 +220,7 @@ impl Node { let builtins_deployer = BuiltinsDeployer::new( builtins_peer_id, - local_peer_id, + key_manager.get_host_peer_id(), aquamarine_api.clone(), config.dir_config.builtins_base_dir.clone(), config.node_config.autodeploy_particle_ttl, @@ -233,13 +239,17 @@ impl Node { builtins.modules.clone(), aquamarine_api.clone(), config.clone(), - local_peer_id, spell_event_bus_api, + key_manager.clone(), ); - spell_service_functions - .into_iter() - .for_each(|(srv_id, funcs, unhandled)| builtins.extend(srv_id, funcs, unhandled)); + spell_service_functions.into_iter().for_each( + |SpellBuiltin { + service_id, + functions, + unhandled, + }| builtins.extend(service_id, functions, unhandled), + ); Ok(Self::with( particle_stream, @@ -257,8 +267,8 @@ impl Node { metrics_registry, services_metrics_backend, config.metrics_listen_addr(), - local_peer_id, builtins_peer_id, + key_manager, )) } @@ -313,7 +323,7 @@ impl Node { #[allow(clippy::too_many_arguments)] pub fn with( particle_stream: BackPressuredInlet, - effects_stream: Inlet>, + effects_stream: Inlet>, swarm: Swarm, connectivity: Connectivity, @@ -330,8 +340,8 @@ impl Node { services_metrics_backend: ServicesMetricsBackend, metrics_listen_addr: SocketAddr, - local_peer_id: PeerId, builtins_management_peer_id: PeerId, + key_manager: KeyManager, ) -> Box { let node_service = Self { particle_stream, @@ -352,8 +362,8 @@ impl Node { services_metrics_backend, metrics_listen_addr, - local_peer_id, builtins_management_peer_id, + key_manager, }; Box::new(node_service) diff --git a/particle-protocol/Cargo.toml b/particle-protocol/Cargo.toml index 9f070b2983..39371cfed4 100644 --- a/particle-protocol/Cargo.toml +++ b/particle-protocol/Cargo.toml @@ -10,15 +10,18 @@ fluence-libp2p = { workspace = true } json-utils = { workspace = true } now-millis = { workspace = true } +fluence-keypair = { workspace = true } + serde_json = { workspace = true } futures = { workspace = true } eyre = { workspace = true } +thiserror = { workspace = true } serde = { workspace = true } serde_derive = "1.0.145" -humantime-serde = "1.1.1" +humantime-serde = { workspace = true } log = { workspace = true } -derivative = "2.2.0" -itertools = "0.10.5" +derivative = { workspace = true } +itertools = { workspace = true } base64 = { workspace = true } [dev-dependencies] diff --git a/particle-protocol/src/error.rs b/particle-protocol/src/error.rs new file mode 100644 index 0000000000..71cd5d7131 --- /dev/null +++ b/particle-protocol/src/error.rs @@ -0,0 +1,46 @@ +/* + * Copyright 2023 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use thiserror::Error; + +use fluence_keypair::error::{SigningError, VerificationError}; + +#[derive(Debug, Error)] +pub enum ParticleError { + #[error("Cannot sign particle {particle_id} with keypair not from init_peer_id {init_peer_id}: given {given_peer_id}")] + InvalidKeypair { + particle_id: String, + init_peer_id: String, + given_peer_id: String, + }, + #[error("Failed to sign particle {particle_id} signature: {err}")] + SigningFailed { + #[source] + err: SigningError, + particle_id: String, + }, + #[error("Failed to verify particle {particle_id} signature: {err}")] + SignatureVerificationFailed { + #[source] + err: VerificationError, + particle_id: String, + }, + #[error("Failed to decode public key from init_peer_id of particle {particle_id}: {err}")] + DecodingError { + #[source] + err: fluence_keypair::error::DecodingError, + particle_id: String, + }, +} diff --git a/particle-protocol/src/lib.rs b/particle-protocol/src/lib.rs index a046c6bcce..4508c29712 100644 --- a/particle-protocol/src/lib.rs +++ b/particle-protocol/src/lib.rs @@ -33,9 +33,11 @@ mod libp2p_protocol { } mod contact; +mod error; mod particle; pub use contact::Contact; +pub use error::ParticleError; pub use libp2p_protocol::message::CompletionChannel; pub use libp2p_protocol::message::SendStatus; pub use libp2p_protocol::message::{HandlerMessage, ProtocolMessage}; diff --git a/particle-protocol/src/particle.rs b/particle-protocol/src/particle.rs index 69f306d47c..653d73718d 100644 --- a/particle-protocol/src/particle.rs +++ b/particle-protocol/src/particle.rs @@ -14,12 +14,18 @@ * limitations under the License. */ +use std::convert::TryInto; use std::time::Duration; use derivative::Derivative; +use fluence_keypair::{KeyPair, PublicKey, Signature}; use libp2p::PeerId; use serde::{Deserialize, Serialize}; +use crate::error::ParticleError; +use crate::error::ParticleError::{ + DecodingError, InvalidKeypair, SignatureVerificationFailed, SigningFailed, +}; use fluence_libp2p::{peerid_serializer, RandomPeerId}; use json_utils::base64_serde; use now_millis::now_ms; @@ -80,6 +86,57 @@ impl Particle { Duration::default() } } + + /// return immutable particle fields in bytes for signing + /// concatenation of: + /// - id as bytes + /// - init_peer_id in base58 as bytes + /// - timestamp u64 as little-endian bytes + /// - ttl u32 as little-endian bytes + /// - script as bytes + fn as_bytes(&self) -> Vec { + let mut bytes = vec![]; + bytes.extend(self.id.as_bytes()); + bytes.extend(self.init_peer_id.to_base58().as_bytes()); + bytes.extend(self.timestamp.to_le_bytes()); + bytes.extend(self.ttl.to_le_bytes()); + bytes.extend(self.script.as_bytes()); + + bytes + } + + pub fn sign(&mut self, keypair: &KeyPair) -> Result<(), ParticleError> { + if self.init_peer_id != keypair.get_peer_id() { + return Err(InvalidKeypair { + particle_id: self.id.clone(), + init_peer_id: self.init_peer_id.to_base58(), + given_peer_id: keypair.get_peer_id().to_base58(), + }); + } + self.signature = keypair + .sign(self.as_bytes().as_slice()) + .map_err(|err| SigningFailed { + err, + particle_id: self.id.clone(), + })? + .to_vec() + .to_vec(); + + Ok(()) + } + + pub fn verify(&self) -> Result<(), ParticleError> { + let pk: PublicKey = self.init_peer_id.try_into().map_err(|err| DecodingError { + err, + particle_id: self.id.clone(), + })?; + let sig = Signature::from_bytes(pk.get_key_format(), self.signature.clone()); + pk.verify(&self.as_bytes(), &sig) + .map_err(|err| SignatureVerificationFailed { + err, + particle_id: self.id.clone(), + }) + } } #[allow(clippy::ptr_arg)] diff --git a/particle-services/Cargo.toml b/particle-services/Cargo.toml index f374820a14..c7e678ecc9 100644 --- a/particle-services/Cargo.toml +++ b/particle-services/Cargo.toml @@ -28,9 +28,9 @@ log = { workspace = true } uuid = { workspace = true } toml = "0.5.9" thiserror = { workspace = true } -derivative = "2.2.0" +derivative = { workspace = true } eyre = { workspace = true } -humantime-serde = "1.1.1" +humantime-serde = { workspace = true } [dev-dependencies] tempdir = "0.3.7" diff --git a/particle-services/src/app_services.rs b/particle-services/src/app_services.rs index 1a913b4d1b..089c554dc5 100644 --- a/particle-services/src/app_services.rs +++ b/particle-services/src/app_services.rs @@ -438,6 +438,13 @@ impl ParticleAppServices { Ok(service_id) } + pub fn get_service_owner(&self, service_id: String) -> Result { + let services_read = self.services.read(); + get_service(&services_read, &self.aliases.read(), service_id) + .map_err(ServiceError::NoSuchService) + .map(|(srv, _)| srv.owner_id) + } + pub fn get_interface(&self, service_id: String) -> Result { let services = self.services.read(); let (service, _) = get_service(&services, &self.aliases.read(), service_id) diff --git a/particle-services/src/persistence.rs b/particle-services/src/persistence.rs index d55b52f324..3fdba63f54 100644 --- a/particle-services/src/persistence.rs +++ b/particle-services/src/persistence.rs @@ -21,8 +21,8 @@ use crate::error::ServiceError::{ }; use fluence_libp2p::{peerid_serializer, PeerId, RandomPeerId}; -use fs_utils::create_dirs; -use particle_modules::{list_files, ModuleError}; +use fs_utils::{create_dir, list_files}; +use particle_modules::ModuleError; use service_modules::{is_service, service_file_name}; use serde::{Deserialize, Serialize}; @@ -89,15 +89,14 @@ pub fn load_persisted_services(services_dir: &Path) -> Vec files, None => { // Attempt to create directory and exit - return create_dirs(&[&services_dir]) - .map_err(|err| CreateServicesDir { + if let Err(err) = create_dir(services_dir) { + return vec![Err(CreateServicesDir { path: services_dir.to_path_buf(), err, - }) - .err() - .into_iter() - .map(Err) - .collect(); + })]; + } + + return vec![]; } }; diff --git a/sorcerer/Cargo.toml b/sorcerer/Cargo.toml index de4ddc883a..79bf140816 100644 --- a/sorcerer/Cargo.toml +++ b/sorcerer/Cargo.toml @@ -21,13 +21,14 @@ now-millis = { workspace = true } connection-pool = { workspace = true } kademlia = { workspace = true } fluence-libp2p = { workspace = true } +key-manager = { workspace = true } libp2p = { workspace = true } +fluence-keypair = { workspace = true } serde_json = { workspace = true } parking_lot = { workspace = true } log = { workspace = true } -maplit = { workspace = true } futures = { workspace = true } eyre = { workspace = true} fstrings = { workspace = true} diff --git a/sorcerer/src/error.rs b/sorcerer/src/error.rs new file mode 100644 index 0000000000..5d8d8f4311 --- /dev/null +++ b/sorcerer/src/error.rs @@ -0,0 +1,33 @@ +/* + * Copyright 2023 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use particle_protocol::ParticleError; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum SorcererError { + #[error("Failed to sign particle for spell {spell_id} : {err}")] + ParticleSigningFailed { + #[source] + err: ParticleError, + spell_id: String, + }, + #[error("Keypair for spell {spell_id} is missing: {err}")] + ScopeKeypairMissing { + #[source] + err: eyre::Report, + spell_id: String, + }, +} diff --git a/sorcerer/src/lib.rs b/sorcerer/src/lib.rs index d83e99e99c..c9dd118b2e 100644 --- a/sorcerer/src/lib.rs +++ b/sorcerer/src/lib.rs @@ -1,9 +1,11 @@ #![feature(try_blocks)] pub use sorcerer::Sorcerer; +pub use sorcerer::SpellBuiltin; #[macro_use] extern crate fstrings; +mod error; mod script_executor; mod sorcerer; mod spells; diff --git a/sorcerer/src/script_executor.rs b/sorcerer/src/script_executor.rs index 5cbc6a13e5..ceaa733dc0 100644 --- a/sorcerer/src/script_executor.rs +++ b/sorcerer/src/script_executor.rs @@ -13,9 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use fluence_libp2p::PeerId; use fluence_spell_dtos::value::{ScriptValue, U32Value, UnitValue}; use serde_json::json; +use crate::error::SorcererError::{ParticleSigningFailed, ScopeKeypairMissing}; use now_millis::now_ms; use particle_args::JError; use particle_protocol::Particle; @@ -25,13 +27,13 @@ use crate::utils::process_func_outcome; use crate::Sorcerer; impl Sorcerer { - fn get_spell_counter(&self, spell_id: String) -> Result { + fn get_spell_counter(&self, spell_id: String, scope_peer_id: PeerId) -> Result { let func_outcome = self.services.call_function( &spell_id, "get_u32", vec![json!("counter")], None, - self.node_peer_id, + scope_peer_id, self.spell_script_particle_ttl, ); @@ -48,26 +50,31 @@ impl Sorcerer { } } - fn set_spell_next_counter(&self, spell_id: String, next_counter: u32) -> Result<(), JError> { + fn set_spell_next_counter( + &self, + spell_id: String, + next_counter: u32, + scope_peer_id: PeerId, + ) -> Result<(), JError> { let func_outcome = self.services.call_function( &spell_id, "set_u32", vec![json!("counter"), json!(next_counter)], None, - self.node_peer_id, + scope_peer_id, self.spell_script_particle_ttl, ); process_func_outcome::(func_outcome, &spell_id, "set_u32").map(drop) } - fn get_spell_script(&self, spell_id: String) -> Result { + fn get_spell_script(&self, spell_id: String, scope_peer_id: PeerId) -> Result { let func_outcome = self.services.call_function( &spell_id, "get_script_source_from_file", vec![], None, - self.node_peer_id, + scope_peer_id, self.spell_script_particle_ttl, ); @@ -79,23 +86,44 @@ impl Sorcerer { .source_code) } - pub(crate) fn get_spell_particle(&self, spell_id: String) -> Result { - let spell_counter = self.get_spell_counter(spell_id.clone())?; - self.set_spell_next_counter(spell_id.clone(), spell_counter + 1)?; - let spell_script = self.get_spell_script(spell_id.clone())?; - - Ok(Particle { + pub(crate) fn make_spell_particle( + &self, + spell_id: String, + scope_peer_id: PeerId, + ) -> Result { + let spell_keypair = self + .key_manager + .get_scope_keypair(scope_peer_id) + .map_err(|err| ScopeKeypairMissing { + err, + spell_id: spell_id.clone(), + })?; + + let spell_counter = self.get_spell_counter(spell_id.clone(), scope_peer_id)?; + self.set_spell_next_counter(spell_id.clone(), spell_counter + 1, scope_peer_id)?; + let spell_script = self.get_spell_script(spell_id.clone(), scope_peer_id)?; + + let mut particle = Particle { id: f!("spell_{spell_id}_{spell_counter}"), - init_peer_id: self.node_peer_id, + init_peer_id: scope_peer_id, timestamp: now_ms() as u64, ttl: self.spell_script_particle_ttl.as_millis() as u32, script: spell_script, signature: vec![], data: vec![], - }) + }; + particle + .sign(&spell_keypair) + .map_err(|err| ParticleSigningFailed { err, spell_id })?; + + Ok(particle) } - pub(crate) fn store_trigger(&self, event: TriggerEvent) -> Result<(), JError> { + pub(crate) fn store_trigger( + &self, + event: TriggerEvent, + scope_peer_id: PeerId, + ) -> Result<(), JError> { let serialized_event = serde_json::to_string(&TriggerInfoAqua::from(event.info))?; let func_outcome = self.services.call_function( @@ -103,7 +131,7 @@ impl Sorcerer { "list_push_string", vec![json!("trigger_mailbox"), json!(serialized_event)], None, - self.node_peer_id, + scope_peer_id, self.spell_script_particle_ttl, ); @@ -113,9 +141,10 @@ impl Sorcerer { pub async fn execute_script(&self, event: TriggerEvent) { let error: Result<(), JError> = try { - let particle = self.get_spell_particle(event.spell_id.clone())?; + let scope_peer_id = self.services.get_service_owner(event.spell_id.clone())?; + let particle = self.make_spell_particle(event.spell_id.clone(), scope_peer_id)?; - self.store_trigger(event.clone())?; + self.store_trigger(event.clone(), scope_peer_id)?; self.aquamarine.clone().execute(particle, None).await?; }; diff --git a/sorcerer/src/sorcerer.rs b/sorcerer/src/sorcerer.rs index 4e5b1a8ecf..d3a6059026 100644 --- a/sorcerer/src/sorcerer.rs +++ b/sorcerer/src/sorcerer.rs @@ -20,11 +20,10 @@ use std::time::Duration; use async_std::task::{spawn, JoinHandle}; use fluence_spell_dtos::trigger_config::TriggerConfigValue; use futures::{FutureExt, StreamExt}; -use libp2p::PeerId; -use maplit::hashmap; use aquamarine::AquamarineApi; use fluence_libp2p::types::Inlet; +use key_manager::KeyManager; use particle_args::JError; use particle_builtins::{wrap, wrap_unit}; use particle_execution::ServiceFunction; @@ -35,8 +34,8 @@ use spell_event_bus::api::{from_user_config, SpellEventBusApi, TriggerEvent}; use spell_storage::SpellStorage; use crate::spells::{ - get_spell_arg, get_spell_id, spell_install, spell_list, spell_remove, spell_update_config, - store_error, store_response, + get_spell_arg, get_spell_id, scope_get_peer_id, spell_install, spell_list, spell_remove, + spell_update_config, store_error, store_response, }; use crate::utils::process_func_outcome; @@ -46,19 +45,34 @@ pub struct Sorcerer { pub services: ParticleAppServices, pub spell_storage: SpellStorage, pub spell_event_bus_api: SpellEventBusApi, - /// TODO: use owner-specific spell keypairs - pub node_peer_id: PeerId, pub spell_script_particle_ttl: Duration, + pub key_manager: KeyManager, } -type CustomService = ( - // service id - String, - // functions - HashMap, - // unhandled - Option, -); +pub struct SpellBuiltin { + pub service_id: String, + pub functions: HashMap, + pub unhandled: Option, +} + +impl SpellBuiltin { + pub fn new(service_id: &str) -> Self { + Self { + service_id: service_id.to_string(), + functions: Default::default(), + unhandled: None, + } + } + + pub fn append(&mut self, function_name: &str, service_function: ServiceFunction) { + self.functions + .insert(function_name.to_string(), service_function); + } + + pub fn set_unhandled(&mut self, unhandled: ServiceFunction) { + self.unhandled = Some(unhandled) + } +} impl Sorcerer { pub fn new( @@ -66,9 +80,9 @@ impl Sorcerer { modules: ModuleRepository, aquamarine: AquamarineApi, config: ResolvedConfig, - local_peer_id: PeerId, spell_event_bus_api: SpellEventBusApi, - ) -> (Self, Vec) { + key_manager: KeyManager, + ) -> (Self, Vec) { let spell_storage = SpellStorage::create(&config.dir_config.spell_base_dir, &services, &modules) .expect("Spell storage creation"); @@ -78,11 +92,11 @@ impl Sorcerer { services, spell_storage, spell_event_bus_api, - node_peer_id: local_peer_id, spell_script_particle_ttl: config.max_spell_particle_ttl, + key_manager, }; - let spell_service_functions = sorcerer.get_spell_service_functions(); + let spell_service_functions = sorcerer.make_spell_builtins(); (sorcerer, spell_service_functions) } @@ -91,13 +105,14 @@ impl Sorcerer { for spell_id in self.spell_storage.get_registered_spells() { log::info!("Rescheduling spell {}", spell_id); let result: Result<(), JError> = try { + let spell_owner = self.services.get_service_owner(spell_id.clone())?; let result = process_func_outcome::( self.services.call_function( &spell_id, "get_trigger_config", vec![], None, - self.node_peer_id, + spell_owner, self.spell_script_particle_ttl, ), &spell_id, @@ -132,92 +147,137 @@ impl Sorcerer { }) } - fn get_spell_service_functions(&self) -> Vec { - let mut service_functions: Vec = vec![]; + fn make_spell_builtins(&self) -> Vec { + let mut spell_builtins: Vec = vec![]; + + let mut spell_service = SpellBuiltin::new("spell"); + spell_service.append("install", self.make_spell_install_closure()); + spell_service.append("remove", self.make_spell_remove_closure()); + spell_service.append("list", self.make_spell_list_closure()); + spell_service.append( + "update_trigger_config", + self.make_spell_update_config_closure(), + ); + spell_builtins.push(spell_service); + + let mut get_data_srv = SpellBuiltin::new("getDataSrv"); + get_data_srv.append("spell_id", self.make_get_spell_id_closure()); + get_data_srv.set_unhandled(self.make_get_spell_arg_closure()); + spell_builtins.push(get_data_srv); + + let mut error_handler_srv = SpellBuiltin::new("errorHandlingSrv"); + error_handler_srv.append("error", self.make_error_handler_closure()); + spell_builtins.push(error_handler_srv); + + let mut callback_srv = SpellBuiltin::new("callbackSrv"); + callback_srv.append("response", self.make_response_handler_closure()); + spell_builtins.push(callback_srv); + + let mut scope_srv = SpellBuiltin::new("scope"); + scope_srv.append("get_peer_id", self.make_get_scope_peer_id_closure()); + spell_builtins.push(scope_srv); + + spell_builtins + } + + fn make_spell_install_closure(&self) -> ServiceFunction { let services = self.services.clone(); let storage = self.spell_storage.clone(); let spell_event_bus = self.spell_event_bus_api.clone(); - let install_closure: ServiceFunction = Box::new(move |args, params| { + let key_manager = self.key_manager.clone(); + Box::new(move |args, params| { let storage = storage.clone(); let services = services.clone(); let spell_event_bus_api = spell_event_bus.clone(); + let key_manager = key_manager.clone(); async move { - wrap(spell_install(args, params, storage, services, spell_event_bus_api).await) + wrap( + spell_install( + args, + params, + storage, + services, + spell_event_bus_api, + key_manager, + ) + .await, + ) } .boxed() - }); + }) + } + fn make_spell_remove_closure(&self) -> ServiceFunction { let services = self.services.clone(); let storage = self.spell_storage.clone(); let spell_event_bus_api = self.spell_event_bus_api.clone(); - let remove_closure: ServiceFunction = Box::new(move |args, params| { + let key_manager = self.key_manager.clone(); + + Box::new(move |args, params| { let storage = storage.clone(); let services = services.clone(); let api = spell_event_bus_api.clone(); - async move { wrap_unit(spell_remove(args, params, storage, services, api).await) } - .boxed() - }); + let key_manager = key_manager.clone(); + async move { + wrap_unit(spell_remove(args, params, storage, services, api, key_manager).await) + } + .boxed() + }) + } + fn make_spell_list_closure(&self) -> ServiceFunction { let storage = self.spell_storage.clone(); - let list_closure: ServiceFunction = Box::new(move |_args, _params| { + Box::new(move |_, _| { let storage = storage.clone(); async move { wrap(spell_list(storage)) }.boxed() - }); + }) + } + fn make_spell_update_config_closure(&self) -> ServiceFunction { let api = self.spell_event_bus_api.clone(); let services = self.services.clone(); - let update_closure: ServiceFunction = Box::new(move |args, params| { + let key_manager = self.key_manager.clone(); + Box::new(move |args, params| { let api = api.clone(); let services = services.clone(); - async move { wrap_unit(spell_update_config(args, params, services, api).await) }.boxed() - }); - - let functions = hashmap! { - "install".to_string() => install_closure, - "remove".to_string() => remove_closure, - "list".to_string() => list_closure, - "update_trigger_config".to_string() => update_closure, - }; - service_functions.push(("spell".to_string(), functions, None)); + let key_manager = key_manager.clone(); + async move { wrap_unit(spell_update_config(args, params, services, api, key_manager).await) }.boxed() + }) + } - let get_spell_id_closure: ServiceFunction = - Box::new(move |args, params| async move { wrap(get_spell_id(args, params)) }.boxed()); + fn make_get_spell_id_closure(&self) -> ServiceFunction { + Box::new(move |_, params| async move { wrap(get_spell_id(params)) }.boxed()) + } + fn make_get_spell_arg_closure(&self) -> ServiceFunction { let services = self.services.clone(); - let get_spell_arg_closure: ServiceFunction = Box::new(move |args, params| { + Box::new(move |args, params| { let services = services.clone(); async move { wrap(get_spell_arg(args, params, services)) }.boxed() - }); - service_functions.push(( - "getDataSrv".to_string(), - hashmap! {"spell_id".to_string() => get_spell_id_closure}, - Some(get_spell_arg_closure), - )); + }) + } + fn make_error_handler_closure(&self) -> ServiceFunction { let services = self.services.clone(); - let error_handler_closure: ServiceFunction = Box::new(move |args, params| { + Box::new(move |args, params| { let services = services.clone(); async move { wrap_unit(store_error(args, params, services)) }.boxed() - }); - - service_functions.push(( - "errorHandlingSrv".to_string(), - hashmap! {"error".to_string() => error_handler_closure}, - None, - )); + }) + } + fn make_response_handler_closure(&self) -> ServiceFunction { let services = self.services.clone(); - let response_handler_closure: ServiceFunction = Box::new(move |args, params| { + Box::new(move |args, params| { let services = services.clone(); async move { wrap_unit(store_response(args, params, services)) }.boxed() - }); - - service_functions.push(( - "callbackSrv".to_string(), - hashmap! {"response".to_string() => response_handler_closure}, - None, - )); + }) + } - service_functions + fn make_get_scope_peer_id_closure(&self) -> ServiceFunction { + let key_manager = self.key_manager.clone(); + Box::new(move |_, params| { + let key_manager = key_manager.clone(); + async move { wrap(scope_get_peer_id(params, key_manager)) }.boxed() + }) } } diff --git a/sorcerer/src/spells.rs b/sorcerer/src/spells.rs index f95e9c9c1c..8af2999d6a 100644 --- a/sorcerer/src/spells.rs +++ b/sorcerer/src/spells.rs @@ -18,6 +18,7 @@ use serde_json::{json, Value as JValue, Value::Array}; use crate::utils::{parse_spell_id_from, process_func_outcome}; use fluence_spell_dtos::trigger_config::TriggerConfig; +use key_manager::KeyManager; use particle_args::{Args, JError}; use particle_execution::ParticleParams; use particle_services::ParticleAppServices; @@ -31,16 +32,17 @@ pub(crate) async fn spell_install( spell_storage: SpellStorage, services: ParticleAppServices, spell_event_bus_api: SpellEventBusApi, + key_manager: KeyManager, ) -> Result { let mut args = sargs.function_args.clone().into_iter(); let script: String = Args::next("script", &mut args)?; let init_data: String = Args::next("data", &mut args)?; - log::info!("Init data: {}", json!(init_data)); let user_config: TriggerConfig = Args::next("config", &mut args)?; let config = api::from_user_config(user_config.clone())?; - // TODO: create service on behalf of spell keypair - let spell_id = services.create_service(spell_storage.get_blueprint(), params.init_peer_id)?; + let spell_peer_id = key_manager.get_scope_peer_id(params.init_peer_id)?; + + let spell_id = services.create_service(spell_storage.get_blueprint(), spell_peer_id)?; spell_storage.register_spell(spell_id.clone()); // TODO: refactor these service calls @@ -51,7 +53,7 @@ pub(crate) async fn spell_install( "set_script_source_to_file", vec![json!(script)], None, - params.init_peer_id, + spell_peer_id, Duration::from_millis(params.ttl as u64), ), &spell_id, @@ -65,7 +67,7 @@ pub(crate) async fn spell_install( "set_json_fields", vec![json!(init_data)], None, - params.init_peer_id, + spell_peer_id, Duration::from_millis(params.ttl as u64), ), &spell_id, @@ -79,7 +81,7 @@ pub(crate) async fn spell_install( "set_trigger_config", vec![json!(user_config)], None, - params.init_peer_id, + spell_peer_id, Duration::from_millis(params.ttl as u64), ), &spell_id, @@ -94,7 +96,7 @@ pub(crate) async fn spell_install( log::warn!("can't subscribe a spell {} to triggers {:?} via spell-event-bus-api: {}. Removing created spell service...", spell_id, config, err); spell_storage.unregister_spell(&spell_id); - services.remove_service(spell_id, params.init_peer_id, true)?; + services.remove_service(spell_id, spell_peer_id, true)?; return Err(JError::new(format!( "can't install a spell due to an internal error while subscribing to the triggers: {err}" @@ -120,9 +122,12 @@ pub(crate) async fn spell_remove( spell_storage: SpellStorage, services: ParticleAppServices, spell_event_bus_api: SpellEventBusApi, + key_manager: KeyManager, ) -> Result<(), JError> { let mut args = args.function_args.into_iter(); let spell_id: String = Args::next("spell_id", &mut args)?; + let spell_peer_id = key_manager.get_scope_peer_id(params.init_peer_id)?; + if let Err(err) = spell_event_bus_api.unsubscribe(spell_id.clone()).await { log::warn!( "can't unsubscribe a spell {} from its triggers via spell-event-bus-api: {}", @@ -136,7 +141,7 @@ pub(crate) async fn spell_remove( // TODO: remove spells by aliases too spell_storage.unregister_spell(&spell_id); - services.remove_service(spell_id, params.init_peer_id, true)?; + services.remove_service(spell_id, spell_peer_id, true)?; Ok(()) } @@ -145,22 +150,21 @@ pub(crate) async fn spell_update_config( params: ParticleParams, services: ParticleAppServices, spell_event_bus_api: SpellEventBusApi, + key_manager: KeyManager, ) -> Result<(), JError> { let mut args = args.function_args.into_iter(); let spell_id: String = Args::next("spell_id", &mut args)?; + let spell_peer_id = key_manager.get_scope_peer_id(params.init_peer_id)?; let user_config: TriggerConfig = Args::next("config", &mut args)?; let config = api::from_user_config(user_config.clone())?; - // TODO: implement proper permissions management: only the creator and the spell can modify the config - // Can't really do proper permission management here right now, so for now this call does the job. - // It fails for everyone who's not a spell owner, so only the spell owner can update spell's config. process_func_outcome::( services.call_function( &spell_id, "set_trigger_config", vec![json!(user_config)], None, - params.init_peer_id, + spell_peer_id, Duration::from_millis(params.ttl as u64), ), &spell_id, @@ -172,18 +176,15 @@ pub(crate) async fn spell_update_config( .await { log::warn!( - "save config to spell service {} SUCCESS; update trigger subscriptions FAIL: {}", - spell_id, - err + "save config to spell service {spell_id} SUCCESS; update trigger subscriptions FAIL: {err}" ); return Err(JError::new(format!( - "save config to spell service {spell_id} SUCCESS\nupdate trigger subscriptions FAIL: {err}\ncall update_config to try again" - ))); + "save config to spell service {spell_id} SUCCESS\nupdate trigger subscriptions FAIL: {err}\ncall update_config to try again"))); } Ok(()) } -pub(crate) fn get_spell_id(_args: Args, params: ParticleParams) -> Result { +pub(crate) fn get_spell_id(params: ParticleParams) -> Result { Ok(json!(parse_spell_id_from(¶ms.id)?)) } @@ -267,3 +268,13 @@ pub(crate) fn store_response( )) }) } + +// TODO: it's not quite right place for this builtin +pub(crate) fn scope_get_peer_id( + params: ParticleParams, + key_manager: KeyManager, +) -> Result { + Ok(json!(key_manager + .get_scope_peer_id(params.init_peer_id)? + .to_base58())) +}