diff --git a/Cargo.lock b/Cargo.lock index 837fbbf2c8..0b5e5eae15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -256,6 +256,16 @@ version = "1.0.70" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4" +[[package]] +name = "aqua-ipfs-distro" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e7f5f52ae024519186876aadb631e97386f28187e76815bb764c596511101dc" +dependencies = [ + "built 0.5.2", + "maplit", +] + [[package]] name = "aquamarine" version = "0.2.0" @@ -782,63 +792,6 @@ dependencies = [ "cargo-lock", ] -[[package]] -name = "builtins-deployer" -version = "0.1.0" -dependencies = [ - "aquamarine", - "base64 0.21.0", - "eyre", - "fluence-libp2p", - "fs-utils", - "fstrings", - "futures", - "humantime 2.1.0", - "local-vm", - "log", - "maplit", - "now-millis", - "parking_lot 0.12.1", - "particle-args", - "particle-execution", - "particle-modules", - "particle-protocol", - "regex", - "serde_json", - "service-modules", - "tokio", - "tracing", - "uuid-utils", -] - -[[package]] -name = "builtins-tests" -version = "0.1.0" -dependencies = [ - "builtins-deployer", - "connected-client", - "control-macro", - "created-swarm", - "eyre", - "fluence-keypair", - "fs-utils", - "fstrings", - "json-utils", - "libp2p", - "local-vm", - "log", - "log-utils", - "maplit", - "now-millis", - "particle-modules", - "serde", - "serde_json", - "service-modules", - "test-constants", - "test-utils", - "tokio", -] - [[package]] name = "bumpalo" version = "3.11.1" @@ -1435,7 +1388,6 @@ dependencies = [ "air-interpreter-fs", "aquamarine", "base64 0.21.0", - "builtins-deployer", "config-utils", "connection-pool", "derivative", @@ -1760,6 +1712,18 @@ dependencies = [ "syn 1.0.105", ] +[[package]] +name = "decider-distro" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4032fc22844d3ed3f66ce16c81ce226dde08f4a53a63b671c5fbf47e03875ffb" +dependencies = [ + "built 0.5.2", + "fluence-spell-dtos", + "maplit", + "serde_json", +] + [[package]] name = "der" version = "0.6.1" @@ -4453,7 +4417,6 @@ dependencies = [ "base64 0.21.0", "blake3", "bs58", - "builtins-deployer", "config-utils", "connected-client", "connection-pool", @@ -4492,6 +4455,7 @@ dependencies = [ "server-config", "sorcerer", "spell-event-bus", + "system-services", "tokio", "tokio-stream", "tracing", @@ -4542,6 +4506,7 @@ dependencies = [ "script-storage", "serde", "serde_json", + "server-config", "service-modules", "sorcerer", "spell-event-bus", @@ -5657,6 +5622,16 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c" +[[package]] +name = "registry-distro" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c85639be0ec692599f96709399f7441fa7a53034909081239624810b56d44cec" +dependencies = [ + "built 0.5.2", + "maplit", +] + [[package]] name = "remove_dir_all" version = "0.5.3" @@ -6510,6 +6485,36 @@ dependencies = [ "winx", ] +[[package]] +name = "system-services" +version = "0.1.0" +dependencies = [ + "aqua-ipfs-distro", + "decider-distro", + "eyre", + "fluence-app-service", + "fluence-spell-dtos", + "libp2p", + "log", + "maplit", + "now-millis", + "particle-args", + "particle-execution", + "particle-modules", + "particle-services", + "registry-distro", + "serde", + "serde_json", + "server-config", + "service-modules", + 
"sorcerer", + "spell-event-bus", + "spell-storage", + "toml 0.5.10", + "tracing", + "trust-graph-distro", +] + [[package]] name = "target-lexicon" version = "0.12.6" @@ -7060,6 +7065,19 @@ dependencies = [ "trust-dns-proto", ] +[[package]] +name = "trust-graph-distro" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d84f24ea1f10b42a6803f9e846cf7b93b021f56a7dd6a05ee0b2360a9b69144d" +dependencies = [ + "built 0.5.2", + "lazy_static", + "maplit", + "serde", + "serde_json", +] + [[package]] name = "try-lock" version = "0.2.4" diff --git a/Cargo.toml b/Cargo.toml index 682f17b733..64e6b41b5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,6 @@ members = [ "crates/now-millis", "crates/toml-utils", "crates/air-interpreter-fs", - "crates/builtins-deployer", "crates/created-swarm", "crates/toy-vms", "crates/connected-client", @@ -29,7 +28,6 @@ members = [ "crates/spell-event-bus", "crates/key-manager", "sorcerer", - "crates/builtins-tests", "crates/nox-tests", "nox", "aquamarine", @@ -41,6 +39,7 @@ members = [ "script-storage", "spell-storage", "particle-execution", + "crates/system-services" ] exclude = [ "nox/tests/tetraplets", @@ -66,7 +65,6 @@ async-unlock = { path = "crates/async-unlock" } now-millis = { path = "crates/now-millis" } toml-utils = { path = "crates/toml-utils" } air-interpreter-fs = { path = "crates/air-interpreter-fs" } -builtins-deployer = { path = "crates/builtins-deployer" } created-swarm = { path = "crates/created-swarm" } toy-vms = { path = "crates/toy-vms" } connected-client = { path = "crates/connected-client" } @@ -76,7 +74,6 @@ spell-event-bus = { path = "crates/spell-event-bus" } key-manager = { path = "crates/key-manager" } cid-utils = { path = "crates/cid-utils" } sorcerer = { path = "sorcerer" } -builtins-tests = { path = "crates/builtins-tests" } nox = { path = "nox" } aquamarine = { path = "aquamarine" } particle-protocol = { path = "particle-protocol" } @@ -87,6 +84,7 @@ connection-pool = { path = "connection-pool" } script-storage = { path = "script-storage" } spell-storage = { path = "spell-storage" } particle-execution = { path = "particle-execution" } +system-services = { path = "crates/system-services" } # spell fluence-spell-dtos = "=0.5.11" diff --git a/Makefile b/Makefile index bb36a75747..dc09463827 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ test: cargo test --release server: - RUST_LOG="info,tide=off,tracing=off,avm_server=off,run-console=debug" \ + RUST_LOG="info,tide=off,tracing=off,avm_server=off,run-console=debug,system_services=debug,sorcerer::spell_builtins=debug,sorcerer=debug" \ cargo run --release -p nox server-debug: diff --git a/crates/builtins-deployer/Cargo.toml b/crates/builtins-deployer/Cargo.toml deleted file mode 100644 index e748c78e95..0000000000 --- a/crates/builtins-deployer/Cargo.toml +++ /dev/null @@ -1,31 +0,0 @@ -[package] -name = "builtins-deployer" -version = "0.1.0" -authors = ["Fluence Labs"] -edition = "2021" - -[dependencies] -particle-modules = { workspace = true } -particle-protocol = { workspace = true } -particle-execution = { workspace = true } -particle-args = { workspace = true } -aquamarine = { workspace = true } -service-modules = { workspace = true } -local-vm = { workspace = true } -fs-utils = { workspace = true } -uuid-utils = { workspace = true } -now-millis = { workspace = true } -fluence-libp2p = { workspace = true } -tracing = { workspace = true } - -futures = { workspace = true } -tokio = { workspace = true } -parking_lot = { workspace = true 
} -eyre = { workspace = true } -maplit = { workspace = true } -serde_json = { workspace = true } -log = { workspace = true } -base64 = { workspace = true } -regex = "1.8.1" -fstrings = { workspace = true } -humantime = "2.1.0" diff --git a/crates/builtins-deployer/src/builtin.rs b/crates/builtins-deployer/src/builtin.rs deleted file mode 100644 index 80c32771f7..0000000000 --- a/crates/builtins-deployer/src/builtin.rs +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2021 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use particle_modules::{AddBlueprint, NamedModuleConfig}; - -#[derive(Debug)] -pub struct ScheduledScript { - pub name: String, - pub data: String, - pub interval_sec: u64, -} - -#[derive(Debug)] -pub struct Module { - // .wasm data - pub data: Vec, - // parsed json module config - pub config: NamedModuleConfig, -} - -#[derive(Debug)] -pub struct Builtin { - // builtin alias - pub name: String, - // list of dependencies - pub modules: Vec, - pub blueprint: AddBlueprint, - pub blueprint_id: String, - pub on_start_script: Option, - pub on_start_data: Option, - pub scheduled_scripts: Vec, -} diff --git a/crates/builtins-deployer/src/builtins_deployer.rs b/crates/builtins-deployer/src/builtins_deployer.rs deleted file mode 100644 index a9c30a0ce0..0000000000 --- a/crates/builtins-deployer/src/builtins_deployer.rs +++ /dev/null @@ -1,498 +0,0 @@ -/* - * Copyright 2021 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -use std::path::PathBuf; -use std::time::{Duration, Instant}; -use std::{collections::HashMap, fs}; - -use aquamarine::AquamarineApi; -use base64::{engine::general_purpose::STANDARD as base64, Engine}; -use eyre::{eyre, ErrReport, Result, WrapErr}; -use fluence_libp2p::PeerId; -use fs_utils::list_files; -use fs_utils::{file_name, to_abs_path}; -use futures::FutureExt; -use humantime::format_duration as pretty; -use local_vm::{client_functions, wrap_script}; -use maplit::hashmap; -use now_millis::now_ms; -use particle_args::Args; -use particle_execution::ServiceFunction; -use particle_protocol::Particle; -use serde_json::{json, Value as JValue}; -use service_modules::Blueprint; -use tokio::sync::oneshot::channel; -use uuid_utils::uuid; - -use crate::builtin::{Builtin, Module}; -use crate::utils::{ - assert_ok, load_blueprint, load_modules, load_scheduled_scripts, resolve_env_variables, -}; - -pub struct BuiltinsDeployer { - startup_peer_id: PeerId, - node_peer_id: PeerId, - aquamarine: AquamarineApi, - builtins_base_dir: PathBuf, - particle_ttl: Duration, - // if set to true, remove existing builtins before deploying - force_redeploy: bool, - // the number of ping attempts to check the readiness of the vm pool - retry_attempts_count: u16, -} - -impl BuiltinsDeployer { - pub fn new( - startup_peer_id: PeerId, - node_peer_id: PeerId, - aquamarine: AquamarineApi, - base_dir: PathBuf, - particle_ttl: Duration, - force_redeploy: bool, - retry_attempts_count: u16, - ) -> Self { - Self { - startup_peer_id, - node_peer_id, - aquamarine, - builtins_base_dir: base_dir, - particle_ttl, - force_redeploy, - retry_attempts_count, - } - } - - async fn send_particle( - &mut self, - script: String, - mut data: HashMap, - ) -> eyre::Result> { - data.insert("node".to_string(), json!(self.node_peer_id.to_string())); - data.insert("relay".to_string(), json!(self.node_peer_id.to_string())); - - // TODO: set to true if AIR script is generated from Aqua - let script = wrap_script(script, &data, None, false, Some(self.node_peer_id)); - let (outlet, inlet) = channel(); - - let mut outlet = Some(outlet); - let closure = move |args: Args, _| { - let result = client_functions(&data, args); - if let Some(returned) = result.returned { - if let Some(outlet) = outlet.take() { - outlet.send(returned).expect("send response back") - } else { - log::info!("outlet is None") - } - } - - let outcome = result.outcome; - async { outcome }.boxed() - }; - let aquamarine = self.aquamarine.clone(); - - let particle = Particle { - id: uuid(), - init_peer_id: self.startup_peer_id, - timestamp: now_ms() as u64, - ttl: self.particle_ttl.as_millis() as u32, - script, - signature: vec![], - data: vec![], - }; - let sent = Instant::now(); - - let result = try { - aquamarine - .execute( - particle.clone(), - Some(ServiceFunction::Mut(tokio::sync::Mutex::new(Box::new( - closure, - )))), - ) - .await?; - - let result = inlet.await; - - result - .map_err(|err| { - let failed = sent.elapsed(); - eyre!( - "error reading from inlet: {:?} (TTL = {}, duration = {})", - err, - pretty(self.particle_ttl), - pretty(failed) - ) - })? - .map_err(|args| eyre!("AIR caught an error on args: {:?}", args))? 
- }; - - let finished = sent.elapsed(); - tracing::debug!(target: "execution", particle_id = particle.id, "sending particle took {}", pretty(finished)); - result - } - - async fn add_module(&mut self, module: &Module) -> eyre::Result<()> { - let script = r#" - (xor - (seq - (call node ("dist" "add_module") [module_bytes module_config]) - (call node ("op" "return") [true]) - ) - (call node ("op" "return") [%last_error%.$.instruction]) - ) - "# - .to_string(); - - let data = hashmap! { - "module_bytes".to_string() => json!(base64.encode(&module.data)), - "module_config".to_string() => json!(module.config), - }; - - let result = self - .send_particle(script, data) - .await - .wrap_err("add_module call failed")?; - - assert_ok(result, "add_module call failed") - } - - async fn remove_service(&mut self, name: String) -> eyre::Result<()> { - let script = r#" - (xor - (seq - (call node ("srv" "remove") [name]) - (call node ("op" "return") [true]) - ) - (call node ("op" "return") [%last_error%.$.instruction]) - ) - "# - .to_string(); - - let result = self - .send_particle(script, hashmap! {"name".to_string() => json!(name)}) - .await - .wrap_err(format!("remove_service call failed, service {name}"))?; - - assert_ok(result, "remove_service call failed") - } - - async fn create_service(&mut self, builtin: &Builtin) -> eyre::Result<()> { - let script = r#" - (xor - (seq - (call node ("dist" "add_blueprint") [blueprint] blueprint_id) - (seq - (call node ("srv" "create") [blueprint_id] service_id) - (seq - (call node ("srv" "add_alias") [alias service_id] result) - (call node ("op" "return") [true]) - ) - ) - ) - (call node ("op" "return") [%last_error%.$.instruction]) - ) - "# - .to_string(); - - let data = hashmap! { - "blueprint".to_string() => json!(builtin.blueprint.to_string()?), - "alias".to_string() => json!(builtin.name), - }; - - let result = self.send_particle(script, data).await.wrap_err(format!( - "send_particle in create_service call failed, service {}", - builtin.name - ))?; - - assert_ok( - result, - format!("create_service for {} call failed", builtin.name).as_str(), - )?; - - log::info!("service {} was created!", builtin.name); - - Ok(()) - } - - // TODO: right now, if AIR in on_start.air is invalid, everything just hangs - // https://github.com/fluencelabs/fluence/issues/1214 - async fn run_on_start(&mut self, builtin: &Builtin) -> eyre::Result<()> { - if builtin.on_start_script.is_some() && builtin.on_start_data.is_some() { - let data: HashMap = serde_json::from_str(&resolve_env_variables( - builtin.on_start_data.as_ref().unwrap(), - &builtin.name, - )?)?; - - let res = self - .send_particle(builtin.on_start_script.as_ref().unwrap().to_string(), data) - .await - .wrap_err("on_start send_particle failed")?; - return assert_ok(res, "on_start call failed"); - } - - Ok(()) - } - - async fn run_scheduled_scripts(&mut self, builtin: &Builtin) -> eyre::Result<()> { - for scheduled_script in builtin.scheduled_scripts.iter() { - let script = r#" - (xor - (seq - (call node ("script" "add") [script interval_sec]) - (call node ("op" "return") [true]) - ) - (call node ("op" "return") [%last_error%.$.instruction]) - ) - "# - .to_string(); - - let data = hashmap! 
{ - "script".to_string() => json!(scheduled_script.data), - "interval_sec".to_string() => json!(scheduled_script.interval_sec), - }; - - let res = self.send_particle(script, data).await.wrap_err(format!( - "scheduled script {} run failed", - scheduled_script.name - ))?; - - assert_ok( - res, - &format!("scheduled script {} run failed", scheduled_script.name), - )?; - } - - Ok(()) - } - - #[tracing::instrument(skip(self))] - async fn wait_for_vm_pool(&mut self) -> Result<()> { - let mut attempt = 0u16; - loop { - attempt += 1; - - let result: eyre::Result<()> = try { - let script = r#" - (seq - (null) - (call node ("op" "return") [true]) - ) - "# - .to_string(); - - let res = self - .send_particle(script, hashmap! {}) - .await - .map_err(|e| eyre::eyre!("ping send_particle #{} failed: {}", attempt, e))?; - - assert_ok(res, &format!("ping call #{attempt} failed"))? - }; - - if let Err(err) = result { - log::warn!("Attempt to ping vm pool failed: {}", err); - - if attempt > self.retry_attempts_count { - return Err(eyre::eyre!( - "Attempts limit exceeded. Can't connect to vm pool: {}", - err - )); - } - } else { - break; - } - } - - Ok(()) - } - - #[tracing::instrument(skip(self))] - pub async fn deploy_builtin_services(&mut self) -> Result<()> { - let from_disk = self.list_builtins()?; - if from_disk.is_empty() { - log::info!("No builtin services found at {:?}", self.builtins_base_dir); - return Ok(()); - } - - log::info!( - "{} builtin services found at {:?}", - from_disk.len(), - self.builtins_base_dir - ); - - self.wait_for_vm_pool().await?; - let mut local_services = self.get_service_blueprints().await?; - let mut to_create = vec![]; - let mut to_start = vec![]; - - // if force_redeploy is set, then first remove all builtins - if self.force_redeploy { - for builtin in from_disk.iter() { - if local_services.contains_key(&builtin.name) { - self.remove_service(builtin.name.clone()).await?; - local_services.remove(&builtin.name); - } - } - } - - for builtin in from_disk.iter() { - // check if builtin is already deployed - match local_services.get(&builtin.name) { - // already deployed - // if blueprint_id has changed, then redeploy builtin - Some(bp_id) if *bp_id != builtin.blueprint_id => { - self.remove_service(builtin.name.clone()).await?; - to_create.push(builtin) - } - // already deployed with expected blueprint_id - Some(_) => { - to_start.push(builtin); - } - // isn't deployed yet - None => to_create.push(builtin), - } - } - - for builtin in to_create { - let result: Result<()> = try { - self.upload_modules(builtin).await?; - self.create_service(builtin).await?; - to_start.push(builtin); - }; - - if let Err(err) = result { - log::error!("Builtin service {} init is failed: {}", builtin.name, err); - return Err(err); - } - } - - for builtin in to_start.into_iter() { - self.run_on_start(builtin).await?; - self.run_scheduled_scripts(builtin).await?; - - log::info!("Builtin service {} successfully started", builtin.name); - } - - Ok(()) - } - - async fn upload_modules(&mut self, builtin: &Builtin) -> Result<()> { - for module in builtin.modules.iter() { - self.add_module(module) - .await - .wrap_err(format!("builtin {} module upload failed", builtin.name))?; - } - - Ok(()) - } - - #[tracing::instrument(skip(self))] - fn list_builtins(&self) -> Result> { - let builtins_dir = to_abs_path(self.builtins_base_dir.clone()); - let builtins = list_files(&builtins_dir) - .ok_or_else(|| eyre!("{:#?} directory not found", builtins_dir))? 
- .filter(|p| p.is_dir()); - - let (successful, failed): (Vec, Vec) = builtins.fold( - (vec![], vec![]), - |(mut successful, mut failed): (Vec, Vec), path| { - let result = try { - let name = file_name(&path)?; - let add_blueprint = load_blueprint(&path)?; - let modules = load_modules(&path, &add_blueprint.dependencies)?; - let blueprint = Blueprint::new(add_blueprint.clone())?; - let scheduled_scripts = load_scheduled_scripts(&path)?; - - Builtin { - name, - modules, - blueprint: add_blueprint, - blueprint_id: blueprint.id, - on_start_script: fs::read_to_string(path.join("on_start.air")).ok(), - on_start_data: fs::read_to_string(path.join("on_start.json")).ok(), - scheduled_scripts, - } - }; - - match result { - Ok(builtin) => successful.push(builtin), - Err(err) => failed.push(err), - } - (successful, failed) - }, - ); - - failed - .iter() - .map(|err| { - log::error!("builtin load failed: {:#}", err); - }) - .for_each(drop); - - if !failed.is_empty() { - Err(eyre!( - "failed to load builtins from disk {:?}", - builtins_dir - )) - } else { - Ok(successful) - } - } - - async fn get_service_blueprints(&mut self) -> Result> { - let script = r#" - (xor - (seq - (call node ("srv" "list") [] list) - (call node ("op" "return") [list]) - ) - (call node ("op" "return") [%last_error%.$.instruction]) - ) - "# - .to_string(); - let result = self - .send_particle(script, hashmap! {}) - .await - .wrap_err("srv list call failed")?; - let result = match result.get(0) { - Some(JValue::Array(result)) => result, - _ => return Err(eyre!("list_services call failed")), - }; - - let mut blueprint_ids = hashmap! {}; - - for p in result.iter() { - let blueprint_id = match p.get("blueprint_id") { - Some(JValue::String(id)) => id, - _ => return Err(eyre!("list_services call failed")), - }; - - let aliases = match p.get("aliases") { - Some(JValue::Array(aliases)) => aliases, - _ => return Err(eyre!("list_services call failed")), - }; - - for alias in aliases.iter() { - let alias = alias - .as_str() - .ok_or_else(|| eyre!("list_services call failed"))? - .to_string(); - blueprint_ids.insert(alias, blueprint_id.clone()); - } - } - - Ok(blueprint_ids) - } -} diff --git a/crates/builtins-deployer/src/lib.rs b/crates/builtins-deployer/src/lib.rs deleted file mode 100644 index 9fd3656b95..0000000000 --- a/crates/builtins-deployer/src/lib.rs +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2021 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#![feature(try_blocks)] -#![warn(rust_2018_idioms)] -#![deny( - dead_code, - nonstandard_style, - unused_imports, - unused_mut, - unused_variables, - unused_unsafe, - unreachable_patterns -)] - -#[macro_use] -extern crate fstrings; - -pub use crate::builtins_deployer::BuiltinsDeployer; - -mod builtin; -mod builtins_deployer; -mod utils; - -pub static ALLOWED_ENV_PREFIX: &str = "$FLUENCE_ENV"; diff --git a/crates/builtins-deployer/src/utils.rs b/crates/builtins-deployer/src/utils.rs deleted file mode 100644 index 1402e85ed0..0000000000 --- a/crates/builtins-deployer/src/utils.rs +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2021 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::path::Path; -use std::{env, fs}; - -use eyre::{eyre, Result, WrapErr}; -use regex::Regex; -use serde_json::Value as JValue; - -use fs_utils::{file_stem, list_files}; -use particle_modules::AddBlueprint; -use service_modules::{module_config_name_json, module_file_name_hash, Hash}; - -use crate::builtin::{Module, ScheduledScript}; -use crate::ALLOWED_ENV_PREFIX; - -pub fn assert_ok(result: Vec, err_msg: &str) -> eyre::Result<()> { - match &result[..] { - [JValue::String(s)] if s == "ok" => Ok(()), - [JValue::Bool(true)] => Ok(()), - _ => Err(eyre!("{}: {:?}", err_msg.to_string(), result)), - } -} - -pub fn load_modules(path: &Path, dependencies: &[Hash]) -> Result> { - let mut modules: Vec = vec![]; - for hash in dependencies.iter() { - let config = path.join(&module_config_name_json(hash)); - let module = path.join(&module_file_name_hash(hash)); - - modules.push(Module { - data: fs::read(module.clone()).wrap_err(eyre!("module {:?} not found", module))?, - config: serde_json::from_str( - &fs::read_to_string(config.clone()) - .wrap_err(eyre!("config {:?} not found", config))?, - )?, - }); - } - - Ok(modules) -} - -pub fn load_blueprint(path: &Path) -> Result { - AddBlueprint::decode( - &fs::read(path.join("blueprint.json")).wrap_err(eyre!("blueprint {:?} not found", path))?, - ) -} - -pub fn load_scheduled_scripts(path: &Path) -> Result> { - let mut scripts = vec![]; - if let Some(files) = list_files(&path.join("scheduled")) { - for path in files { - let data = fs::read_to_string(&path)?; - let name = file_stem(&path)?; - - let mut script_info = name.split('_'); - let name = script_info - .next() - .ok_or_else(|| { - eyre!( - "invalid script name {}, should be in %name%_%interval_in_sec%.air form", - name - ) - })? - .to_string(); - let interval_sec: u64 = script_info - .next() - .ok_or_else(|| { - eyre!( - "invalid script name {}, should be in %name%_%interval_in_sec%.air form", - name - ) - })? 
- .parse()?; - - scripts.push(ScheduledScript { - name, - data, - interval_sec, - }); - } - } - - Ok(scripts) -} - -pub fn resolve_env_variables(data: &str, service_name: &str) -> Result { - let mut result = data.to_string(); - let env_prefix = format!( - "{}_{}", - ALLOWED_ENV_PREFIX, - service_name.to_uppercase().replace('-', "_") - ); - - let re = Regex::new(&f!(r"(\{env_prefix}_\w+)"))?; - for elem in re.captures_iter(data) { - result = result.replace( - &elem[0], - &env::var(&elem[0][1..]).map_err(|e| eyre!("{}: {}", e.to_string(), &elem[0][1..]))?, - ); - } - - Ok(result) -} diff --git a/crates/builtins-tests/.gitignore b/crates/builtins-tests/.gitignore deleted file mode 100644 index 3133a90483..0000000000 --- a/crates/builtins-tests/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -services/** -services/* diff --git a/crates/builtins-tests/Cargo.toml b/crates/builtins-tests/Cargo.toml deleted file mode 100644 index c442054a4a..0000000000 --- a/crates/builtins-tests/Cargo.toml +++ /dev/null @@ -1,31 +0,0 @@ -[package] -name = "builtins-tests" -version = "0.1.0" -edition = "2021" -publish = false - -[dev-dependencies] -fstrings = { workspace = true } -eyre = { workspace = true } -maplit = { workspace = true } -serde_json = "1.0.96" -serde = { workspace = true } -log = "0.4.17" -libp2p = { workspace = true } - -fluence-keypair = { workspace = true } -tokio = { workspace = true } - -test-utils = { path = "../test-utils" } -log-utils = { path = "../log-utils" } -fs-utils = { path = "../fs-utils" } -service-modules = { path = "../service-modules" } -builtins-deployer = { path = "../builtins-deployer" } -particle-modules = { path = "../../particle-modules" } -created-swarm = { path = "../created-swarm" } -connected-client = { path = "../connected-client" } -test-constants = { path = "../test-constants" } -now-millis = { path = "../now-millis" } -local-vm = { path = "../local-vm" } -json-utils = { path = "../json-utils" } -control-macro = { path = "../control-macro" } diff --git a/crates/builtins-tests/build.rs b/crates/builtins-tests/build.rs deleted file mode 100644 index 5469e5631c..0000000000 --- a/crates/builtins-tests/build.rs +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2021 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -use std::process::Command; - -fn main() { - let status = Command::new("./download_builtins.sh").status().unwrap(); - assert!(status.success(), "download_builtins failed with {status:?}"); - println!("cargo:rerun-if-changed=download_builtins.sh"); -} diff --git a/crates/builtins-tests/download_builtins.sh b/crates/builtins-tests/download_builtins.sh deleted file mode 100755 index eec066d634..0000000000 --- a/crates/builtins-tests/download_builtins.sh +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env bash -set -o pipefail -o nounset -o errexit - -# get script directory -BUILTINS_DIR="$(dirname "$0")" -SERVICES_DIR="${BUILTINS_DIR}/../nox-tests/tests/builtins/services" -TAR="registry.tar.gz" - -echo "*** downloading $TAR ***" -URL="https://github.com/fluencelabs/registry/releases/download/registry-v0.8.5/registry.tar.gz" -curl --fail -L "$URL" -o "$TAR" - -echo "*** extracting $TAR ***" -mkdir -p "$SERVICES_DIR" -tar -C "$SERVICES_DIR" -xf "$TAR" - -rm "$TAR" - -echo "*** done ***" diff --git a/crates/builtins-tests/src/lib.rs b/crates/builtins-tests/src/lib.rs deleted file mode 100644 index 8b13789179..0000000000 --- a/crates/builtins-tests/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/crates/builtins-tests/tests/src/builtins_deployer.rs b/crates/builtins-tests/tests/src/builtins_deployer.rs deleted file mode 100644 index 0591733f69..0000000000 --- a/crates/builtins-tests/tests/src/builtins_deployer.rs +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Copyright 2021 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::time::Duration; -use std::{env, fs, path::Path}; - -use eyre::WrapErr; -use fluence_keypair::KeyPair; -use fstrings::f; -use maplit::hashmap; -use serde::Deserialize; -use serde_json::json; - -use builtins_deployer::ALLOWED_ENV_PREFIX; -use connected_client::ConnectedClient; -use created_swarm::{make_swarms_with_builtins, make_swarms_with_keypair}; -use fs_utils::copy_dir_all; -use fs_utils::list_files; -use service_modules::load_module; -use test_utils::create_service; - -use crate::{SERVICES, SPELL}; - -async fn check_registry_builtin(client: &mut ConnectedClient) { - // TODO: get rid of FIVE SECONDS sleep - tokio::time::sleep(Duration::from_millis(5000)).await; - - let mut result = client - .execute_particle( - r#"(xor - (seq - (seq - (call relay ("srv" "resolve_alias") [alias] service_id) - (seq - (call relay ("peer" "timestamp_sec") [] timestamp) - (call relay (service_id "get_key_id") [label %init_peer_id%] result) - ) - ) - (call %init_peer_id% ("op" "return") [result]) - ) - (call %init_peer_id% ("op" "return") [%last_error%]) - ) - "#, - hashmap! 
{ - "relay" => json!(client.node.to_string()), - "alias" => json!("registry"), - "label" => json!("some_label"), - }, - ) - .await - .unwrap(); - match result.pop() { - Some(serde_json::Value::String(s)) => assert!(!s.contains("error")), - other => panic!("expected json string, got {:?}", other), - } -} - -#[tokio::test] -async fn builtins_test() { - let swarms = - make_swarms_with_builtins(1, Path::new(SERVICES), None, Some(SPELL.to_string())).await; - - let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone()) - .await - .wrap_err("connect client") - .unwrap(); - - check_registry_builtin(&mut client).await; -} - -#[tokio::test] -async fn builtins_replace_old() { - let keypair = KeyPair::generate_ed25519(); - let swarms = make_swarms_with_keypair(1, keypair.clone(), Some(SPELL.to_string())).await; - - let mut client = ConnectedClient::connect_with_keypair( - swarms[0].multiaddr.clone(), - Some(swarms[0].management_keypair.clone()), - ) - .await - .wrap_err("connect client") - .unwrap(); - - // use tetraplets as aqua-dht to emulate old builtin being replaced by a new version - let tetraplets_service = create_service( - &mut client, - "tetraplets", - load_module("../nox-tests/tests/tetraplets/artifacts", "tetraplets").expect("load module"), - ) - .await; - - let result = client - .execute_particle( - r#" - (xor - (seq - (call relay ("srv" "add_alias") [alias service]) - (call %init_peer_id% ("op" "return") ["ok"]) - ) - (call %init_peer_id% ("op" "return") [%last_error%.$.instruction]) - ) - "#, - hashmap! { - "relay" => json!(client.node.to_string()), - "service" => json!(tetraplets_service.id), - "alias" => json!("aqua-dht".to_string()), - }, - ) - .await - .unwrap(); - - let result = result[0].as_str().unwrap(); - assert_eq!(result, "ok"); - - // stop swarm - swarms.into_iter().map(|s| s.outlet.send(())).for_each(drop); - - // restart with same keypair - let swarms = make_swarms_with_builtins( - 1, - Path::new(SERVICES), - Some(keypair), - Some(SPELL.to_string()), - ) - .await; - - let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone()) - .await - .wrap_err("connect client") - .unwrap(); - - check_registry_builtin(&mut client).await; -} - -#[tokio::test] -async fn builtins_scheduled_scripts() { - let swarms = - make_swarms_with_builtins(1, Path::new(SERVICES), None, Some(SPELL.to_string())).await; - - let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone()) - .await - .wrap_err("connect client") - .unwrap(); - - let result = client - .execute_particle( - r#"(xor - (seq - (call relay ("script" "list") [] result) - (call %init_peer_id% ("op" "return") [result]) - ) - (call %init_peer_id% ("op" "return") [%last_error%.$.instruction]) - ) - "#, - hashmap! 
{ - "relay" => json!(client.node.to_string()), - }, - ) - .await - .unwrap(); - - let result = result[0].as_array().unwrap(); - - let mut scripts_count = 0; - for dir in list_files(Path::new(SERVICES)).unwrap() { - scripts_count += list_files(&dir.join("scheduled")).unwrap().count(); - } - assert_eq!(result.len(), scripts_count) -} - -#[tokio::test] -#[ignore] -async fn builtins_resolving_env_variables() { - copy_dir_all(SERVICES, "./builtins_test_env").unwrap(); - let key = "some_key".to_string(); - let on_start_script = f!(r#" - (xor - (seq - (seq - (call relay ("peer" "timestamp_sec") [] timestamp0) - (call relay ("aqua-dht" "register_key") [key timestamp0 false 0]) - ) - (call relay ("op" "return") ["ok"]) - ) - (call relay ("op" "return") [%last_error%.$.instruction]) - ) - "#); - let env_variable_name = format!("{}_AQUA_DHT_{}", ALLOWED_ENV_PREFIX, "KEY"); - let on_start_data = json!({ "key": env_variable_name }); - env::set_var(&env_variable_name[1..], key.clone()); - fs::write("./builtins_test_env/aqua-dht/on_start.air", on_start_script).unwrap(); - fs::write( - "./builtins_test_env/aqua-dht/on_start.json", - on_start_data.to_string(), - ) - .unwrap(); - - let swarms = make_swarms_with_builtins( - 1, - Path::new("./builtins_test_env"), - None, - Some(SPELL.to_string()), - ) - .await; - let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone()) - .await - .wrap_err("connect client") - .unwrap(); - - let result = client - .execute_particle( - f!(r#"(xor - (seq - (seq - (call relay ("peer" "timestamp_sec") [] timestamp1) - (call relay ("aqua-dht" "get_key_metadata") ["{key}" timestamp1] result) - ) - (call %init_peer_id% ("op" "return") [result]) - ) - (call %init_peer_id% ("op" "return") [%last_error%.$.instruction]) - ) - "#), - hashmap! { - "relay" => json!(client.node.to_string()), - }, - ) - .await - .unwrap(); - - #[derive(Deserialize)] - pub struct Key { - pub key: String, - pub peer_id: String, - pub timestamp_created: u64, - pub pinned: bool, - pub weight: u32, - } - - #[derive(Deserialize)] - pub struct GetKeyMetadataResult { - pub success: bool, - pub error: String, - pub key: Key, - } - - let result = result.into_iter().next().unwrap(); - let result: GetKeyMetadataResult = serde_json::from_value(result).unwrap(); - - assert!(result.success); - assert_eq!(key, result.key.key); - fs::remove_dir_all("./builtins_test_env").unwrap(); -} diff --git a/crates/builtins-tests/tests/tests.rs b/crates/builtins-tests/tests/tests.rs deleted file mode 100644 index d131953b1e..0000000000 --- a/crates/builtins-tests/tests/tests.rs +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2021 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#[macro_use] -extern crate fstrings; - -pub const SERVICES: &'static str = "../nox-tests/tests/builtins/services"; -pub const SPELL: &'static str = "../nox-tests/spell"; - -mod src { - mod builtins_deployer; -} diff --git a/crates/created-swarm/Cargo.toml b/crates/created-swarm/Cargo.toml index f256dfbb34..e580547cb8 100644 --- a/crates/created-swarm/Cargo.toml +++ b/crates/created-swarm/Cargo.toml @@ -18,7 +18,6 @@ connection-pool = { workspace = true } script-storage = { workspace = true } fs-utils = { workspace = true } air-interpreter-fs = { workspace = true } -builtins-deployer = { workspace = true } toy-vms = { workspace = true } fluence-keypair = { workspace = true } diff --git a/crates/created-swarm/src/swarm.rs b/crates/created-swarm/src/swarm.rs index 85de49be92..4fba52a8df 100644 --- a/crates/created-swarm/src/swarm.rs +++ b/crates/created-swarm/src/swarm.rs @@ -15,7 +15,6 @@ */ use std::convert::identity; -use std::path::Path; use std::{path::PathBuf, time::Duration}; use derivative::Derivative; @@ -133,23 +132,6 @@ pub async fn make_swarms_with_keypair( .await } -pub async fn make_swarms_with_builtins( - n: usize, - path: &Path, - keypair: Option, - spell_base_dir: Option, -) -> Vec { - make_swarms_with_cfg(n, |mut cfg| { - if let Some(keypair) = &keypair { - cfg.keypair = keypair.clone(); - } - cfg.builtins_dir = Some(to_abs_path(path.into())); - cfg.spell_base_dir = spell_base_dir.clone().map(PathBuf::from); - cfg - }) - .await -} - pub async fn make_swarms_with( n: usize, mut create_node: F, @@ -243,6 +225,7 @@ pub struct SwarmConfig { pub spell_base_dir: Option, pub timer_resolution: Duration, pub allowed_binaries: Vec, + pub enabled_system_services: Vec, } impl SwarmConfig { @@ -262,7 +245,8 @@ impl SwarmConfig { builtins_dir: None, spell_base_dir: None, timer_resolution: default_script_storage_timer_resolution(), - allowed_binaries: vec![], + allowed_binaries: vec!["/usr/bin/ipfs".to_string(), "/usr/bin/curl".to_string()], + enabled_system_services: vec![], } } } @@ -354,6 +338,7 @@ pub fn create_swarm_with_runtime( resolved.node_config.script_storage_timer_resolution = config.timer_resolution; resolved.node_config.allowed_binaries = config.allowed_binaries.clone(); + resolved.system_services.enable = config.enabled_system_services.clone(); let management_kp = fluence_keypair::KeyPair::generate_ed25519(); let management_peer_id = libp2p::identity::Keypair::from(management_kp.clone()) diff --git a/crates/log-utils/src/lib.rs b/crates/log-utils/src/lib.rs index 02879d16cf..74d7da8d55 100644 --- a/crates/log-utils/src/lib.rs +++ b/crates/log-utils/src/lib.rs @@ -26,7 +26,6 @@ pub fn enable_logs() { tracing_subscriber::EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) .from_env_lossy() - .add_directive("builtins_deployer=trace".parse().unwrap()) .add_directive("script_storage=trace".parse().unwrap()) .add_directive("run-console=trace".parse().unwrap()) .add_directive("sorcerer=trace".parse().unwrap()) diff --git a/crates/nox-tests/Cargo.toml b/crates/nox-tests/Cargo.toml index 819c0817ab..f858739dc8 100644 --- a/crates/nox-tests/Cargo.toml +++ b/crates/nox-tests/Cargo.toml @@ -20,6 +20,7 @@ now-millis = { path = "../now-millis" } local-vm = { path = "../local-vm" } control-macro = { path = "../control-macro" } json-utils = { path = "../json-utils" } +server-config = { workspace = true } log-utils = { workspace = true } fluence-spell-dtos = { workspace = true } diff --git a/crates/nox-tests/tests/builtin.rs b/crates/nox-tests/tests/builtin.rs 
index 6b6b6a6ff2..12dc9c83f7 100644 --- a/crates/nox-tests/tests/builtin.rs +++ b/crates/nox-tests/tests/builtin.rs @@ -19,7 +19,7 @@ extern crate fstrings; use connected_client::ConnectedClient; use created_swarm::{ - make_swarms, make_swarms_with_builtins, make_swarms_with_keypair, + make_swarms, make_swarms_with_cfg, make_swarms_with_keypair, make_swarms_with_transport_and_mocked_vm, }; use eyre::{Report, WrapErr}; @@ -37,6 +37,7 @@ use now_millis::now_ms; use particle_protocol::Particle; use serde::Deserialize; use serde_json::{json, Value as JValue}; +use server_config::system_services_config::ServiceKey::Registry; use service_modules::load_module; use std::collections::HashMap; use std::str::FromStr; @@ -1466,15 +1467,17 @@ async fn service_stats_uninitialized() { } } +// Since KeyPair isn't in use for builtins anymore, we can't use this check as it is +// TODO: Need to ask Loysha to fix this test properly +/* #[tokio::test] async fn sign_verify() { let kp = KeyPair::generate_ed25519(); - let swarms = make_swarms_with_builtins( - 1, - "tests/builtins/services".as_ref(), - Some(kp.clone()), - None, - ) + let swarms = make_swarms_with_cfg(1, |mut cfg| { + cfg.disabled_system_services + .retain(|service| service != "registry"); + cfg + }) .await; let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone()) @@ -1512,7 +1515,10 @@ async fn sign_verify() { { let data: Vec<_> = data.iter().map(|n| n.as_u64().unwrap() as u8).collect(); - assert!(sig_result["success"].as_bool().unwrap()); + assert!( + sig_result["success"].as_bool().unwrap(), + "result isn't success" + ); let signature = sig_result["signature"].as_array().unwrap()[0] .as_array() .unwrap() @@ -1520,16 +1526,24 @@ async fn sign_verify() { .map(|n| n.as_u64().unwrap() as u8) .collect(); let signature = Signature::from_bytes(kp.public().get_key_format(), signature); - assert!(result); - assert!(kp.public().verify(&data, &signature).is_ok()); + assert!(result, "sig.verify result is false"); + assert!( + kp.public().verify(&data, &signature).is_ok(), + "kp verify failed" + ); } else { panic!("incorrect args: expected three arguments") } } +*/ #[tokio::test] async fn sign_invalid_tetraplets() { - let swarms = make_swarms_with_builtins(2, "tests/builtins/services".as_ref(), None, None).await; + let swarms = make_swarms_with_cfg(2, |mut cfg| { + cfg.enabled_system_services = vec![Registry]; + cfg + }) + .await; let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone()) .await @@ -1594,9 +1608,11 @@ async fn sign_invalid_tetraplets() { #[tokio::test] async fn sig_verify_invalid_signature() { - let kp = KeyPair::generate_ed25519(); - let swarms = - make_swarms_with_builtins(1, "tests/builtins/services".as_ref(), Some(kp), None).await; + let swarms = make_swarms_with_cfg(1, |mut cfg| { + cfg.enabled_system_services = vec![Registry]; + cfg + }) + .await; let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone()) .await @@ -1783,7 +1799,11 @@ async fn json_builtins() { #[tokio::test] async fn insecure_sign_verify() { let kp = KeyPair::from_secret_key(INSECURE_KEYPAIR_SEED.collect(), KeyFormat::Ed25519).unwrap(); - let swarms = make_swarms_with_builtins(1, "tests/builtins/services".as_ref(), None, None).await; + let swarms = make_swarms_with_cfg(1, |mut cfg| { + cfg.enabled_system_services = vec![Registry]; + cfg + }) + .await; let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone()) .await diff --git a/crates/nox-tests/tests/spells.rs b/crates/nox-tests/tests/spells.rs index 
0f7a37319d..df02f829c5 100644 --- a/crates/nox-tests/tests/spells.rs +++ b/crates/nox-tests/tests/spells.rs @@ -24,7 +24,7 @@ use maplit::hashmap; use serde_json::{json, Value as JValue}; use connected_client::ConnectedClient; -use created_swarm::{make_swarms, make_swarms_with_builtins}; +use created_swarm::{make_swarms, make_swarms_with_cfg}; use fluence_spell_dtos::trigger_config::{ClockConfig, TriggerConfig}; use service_modules::load_module; use spell_event_bus::api::{TriggerInfo, TriggerInfoAqua, MAX_PERIOD_SEC}; @@ -407,11 +407,13 @@ async fn spell_install_fail_large_period() { let script = r#"(call %init_peer_id% ("peer" "identify") [] x)"#; let empty: HashMap = HashMap::new(); + let worker_id = create_worker(&mut client, None).await; // Note that when period is 0, the spell is executed only once let config = make_clock_config(MAX_PERIOD_SEC + 1, 1, 0); let data = hashmap! { + "worker_id" => json!(worker_id), "script" => json!(script.to_string()), "config" => json!(config), "client" => json!(client.peer_id.to_string()), @@ -422,7 +424,10 @@ async fn spell_install_fail_large_period() { .execute_particle( r#" (xor - (call relay ("spell" "install") [script data config] spell_id) + (seq + (call relay ("op" "noop") []) + (call worker_id ("spell" "install") [script data config] spell_id) + ) (call client ("return" "") [%last_error%.$.message]) )"#, data, @@ -451,26 +456,31 @@ async fn spell_install_fail_end_sec_past() { // Note that when period is 0, the spell is executed only once let config = make_clock_config(0, 10, 1); + let worker_id = create_worker(&mut client, None).await; let data = hashmap! { + "worker_id" => json!(worker_id), "script" => json!(script.to_string()), "config" => json!(config), "client" => json!(client.peer_id.to_string()), "relay" => json!(client.node.to_string()), "data" => json!(json!(empty).to_string()), }; + let result = client .execute_particle( r#" (xor - (call relay ("spell" "install") [script data config] spell_id) + (seq + (call relay ("op" "noop") []) + (call worker_id ("spell" "install") [script data config] spell_id) + ) (call client ("return" "") [%last_error%.$.message]) )"#, data.clone(), ) .await .unwrap(); - if let [JValue::String(error_msg)] = result.as_slice() { let expected = "Local service error, ret_code is 1, error message is '\"Error: invalid config: end_sec is less than start_sec or in the past"; assert!( @@ -500,8 +510,10 @@ async fn spell_install_fail_end_sec_before_start() { // Note that when period is 0, the spell is executed only once let config = make_clock_config(0, now as u32 + 100, now as u32 + 90); + let worker_id = create_worker(&mut client, None).await; let data = hashmap! 
{
+        "worker_id" => json!(worker_id),
         "script" => json!(script.to_string()),
         "config" => json!(config),
         "client" => json!(client.peer_id.to_string()),
@@ -512,7 +524,10 @@
         .execute_particle(
             r#"
     (xor
-        (call relay ("spell" "install") [script data config] spell_id)
+        (seq
+            (call relay ("op" "noop") [])
+            (call worker_id ("spell" "install") [script data config] spell_id)
+        )
         (call client ("return" "") [%last_error%.$.message])
     )"#,
             data.clone(),
@@ -1339,7 +1354,12 @@ async fn resolve_global_alias() {
 
 #[tokio::test]
 async fn worker_sig_test() {
-    let swarms = make_swarms_with_builtins(1, "tests/builtins/services".as_ref(), None, None).await;
+    let swarms = make_swarms_with_cfg(1, |mut cfg| {
+        cfg.enabled_system_services =
+            vec![server_config::system_services_config::ServiceKey::Registry];
+        cfg
+    })
+    .await;
 
     let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
         .await
diff --git a/crates/server-config/src/defaults.rs b/crates/server-config/src/defaults.rs
index 50a08c8ad5..e849a5fa1d 100644
--- a/crates/server-config/src/defaults.rs
+++ b/crates/server-config/src/defaults.rs
@@ -27,6 +27,7 @@ use libp2p::PeerId;
 use fluence_libp2p::Transport;
 
 use crate::node_config::{KeypairConfig, PathOrValue};
+use crate::system_services_config::ServiceKey;
 
 const CONFIG_VERSION: usize = 1;
 
@@ -204,3 +205,33 @@ pub fn default_max_builtin_metrics_storage_size() -> usize {
 pub fn default_allowed_binaries() -> Vec<String> {
     vec!["/usr/bin/curl".to_string(), "/usr/bin/ipfs".to_string()]
 }
+
+pub fn default_system_services() -> Vec<ServiceKey> {
+    ServiceKey::all_values()
+}
+
+pub fn default_ipfs_multiaddr() -> String {
+    "/dns4/ipfs.fluence.dev/tcp/5001".to_string()
+}
+
+// 15 minutes
+pub fn default_worker_spell_period_sec() -> u32 {
+    900
+}
+
+// 2 minutes
+pub fn default_decider_spell_period_sec() -> u32 {
+    120
+}
+
+pub fn default_deal_network_api_endpoint() -> String {
+    "https://testnet.aurora.dev".to_string()
+}
+
+pub fn default_deal_contract_address_hex() -> String {
+    "0xb497e025D3095A197E30Ca84DEc36a637E649868".to_string()
+}
+
+pub fn default_deal_contract_block_hex() -> String {
+    "latest".to_string()
+}
diff --git a/crates/server-config/src/lib.rs b/crates/server-config/src/lib.rs
index 2ffa2c4cac..ee970e378e 100644
--- a/crates/server-config/src/lib.rs
+++ b/crates/server-config/src/lib.rs
@@ -38,6 +38,7 @@ mod network_config;
 mod node_config;
 mod resolved_config;
 mod services_config;
+pub mod system_services_config;
 
 pub use defaults::{builtins_base_dir, *};
 pub use resolved_config::load_config;
@@ -54,3 +55,4 @@ pub use resolved_config::LogFormat;
 pub use resolved_config::TracingConfig;
 pub use resolved_config::{ResolvedConfig, UnresolvedConfig};
 pub use services_config::ServicesConfig;
+pub use system_services_config::{AquaIpfsConfig, DeciderConfig, SystemServicesConfig};
diff --git a/crates/server-config/src/node_config.rs b/crates/server-config/src/node_config.rs
index 91bb3ab4db..98cfd160fb 100644
--- a/crates/server-config/src/node_config.rs
+++ b/crates/server-config/src/node_config.rs
@@ -19,6 +19,7 @@ use fs_utils::to_abs_path;
 use particle_protocol::ProtocolConfig;
 
 use crate::keys::{decode_key, decode_secret_key, load_key};
+use crate::system_services_config::{ServiceKey, SystemServicesConfig};
 use crate::{BootstrapConfig, KademliaConfig};
 
 use super::defaults::*;
@@ -140,10 +141,15 @@ pub struct UnresolvedNodeConfig {
     #[serde(default = "default_allowed_binaries")]
     pub allowed_binaries: Vec<String>,
+
+    #[serde(default)]
+    pub system_services: SystemServicesConfig,
 }
 
 impl UnresolvedNodeConfig {
-    pub fn resolve(self) -> eyre::Result<NodeConfig> {
+    pub fn resolve(mut self) -> eyre::Result<NodeConfig> {
+        self.load_system_services_envs();
+
         let bootstrap_nodes = match self.local {
             Some(true) => vec![],
             _ => self.bootstrap_nodes,
@@ -192,10 +198,69 @@
             transport_config: self.transport_config,
             listen_config: self.listen_config,
             allowed_binaries: self.allowed_binaries,
+            system_services: self.system_services,
         };
 
         Ok(result)
     }
+
+    // This is a temporary solution to preserve backward compatibility for some time
+    // Couldn't figure out how to use layered configs for this
+    // Print a warning as a reminder to fix this in the future
+    fn load_system_services_envs(&mut self) {
+        if let Ok(aqua_ipfs_external_addr) =
+            std::env::var("FLUENCE_ENV_AQUA_IPFS_EXTERNAL_API_MULTIADDR")
+        {
+            self.system_services.aqua_ipfs.external_api_multiaddr = aqua_ipfs_external_addr;
+            log::warn!(
+                "Override configuration of aqua-ipfs system service (external multiaddr) from ENV"
+            );
+        }
+
+        if let Ok(aqua_ipfs_local_addr) = std::env::var("FLUENCE_ENV_AQUA_IPFS_LOCAL_API_MULTIADDR")
+        {
+            self.system_services.aqua_ipfs.local_api_multiaddr = aqua_ipfs_local_addr;
+            log::warn!(
+                "Override configuration of aqua-ipfs system service (local multiaddr) from ENV"
+            );
+        }
+
+        if let Ok(enable_decider) = std::env::var("FLUENCE_ENV_CONNECTOR_JOIN_ALL_DEALS") {
+            match enable_decider.as_str() {
+                "true" => {
+                    self.system_services.enable.push(ServiceKey::Decider);
+                    log::warn!(
+                        "Override configuration of system services (enable decider) from ENV"
+                    );
+                }
+                "false" => {
+                    self.system_services
+                        .enable
+                        .retain(|key| *key != ServiceKey::Decider);
+                    log::warn!(
+                        "Override configuration of system services (disable decider) from ENV"
+                    );
+                }
+                _ => {}
+            }
+        }
+        if let Ok(decider_api_endpoint) = std::env::var("FLUENCE_ENV_CONNECTOR_API_ENDPOINT") {
+            self.system_services.decider.network_api_endpoint = decider_api_endpoint;
+            log::warn!("Override configuration of decider system spell (api endpoint) from ENV");
+        }
+
+        if let Ok(decider_contract_addr) = std::env::var("FLUENCE_ENV_CONNECTOR_CONTRACT_ADDRESS") {
+            self.system_services.decider.contract_address_hex = decider_contract_addr;
+            log::warn!(
+                "Override configuration of decider system spell (contract address) from ENV"
+            );
+        }
+
+        if let Ok(decider_from_block) = std::env::var("FLUENCE_ENV_CONNECTOR_FROM_BLOCK") {
+            self.system_services.decider.contract_block_hex = decider_from_block;
+            log::warn!("Override configuration of decider system spell (from block) from ENV");
+        }
+    }
 }
 
 #[derive(Clone, Derivative)]
@@ -275,6 +340,8 @@ pub struct NodeConfig {
     pub management_peer_id: PeerId,
 
     pub allowed_binaries: Vec<String>,
+
+    pub system_services: SystemServicesConfig,
 }
 
 #[derive(Clone, Deserialize, Serialize, Derivative, Copy)]
diff --git a/crates/server-config/src/system_services_config.rs b/crates/server-config/src/system_services_config.rs
new file mode 100644
index 0000000000..e48045903b
--- /dev/null
+++ b/crates/server-config/src/system_services_config.rs
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2023 Fluence Labs Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::defaults::*; +use serde::{Deserialize, Serialize}; +use std::fmt::Formatter; + +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash)] +#[serde(rename_all = "kebab-case")] +pub enum ServiceKey { + AquaIpfs, + TrustGraph, + Registry, + Decider, +} + +impl ServiceKey { + pub fn all_values() -> Vec { + vec![ + Self::AquaIpfs, + Self::TrustGraph, + Self::Registry, + Self::Decider, + ] + } +} + +impl std::fmt::Display for ServiceKey { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::AquaIpfs => write!(f, "aqua-ipfs"), + Self::TrustGraph => write!(f, "trust-graph"), + Self::Registry => write!(f, "registry"), + Self::Decider => write!(f, "decider"), + } + } +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct SystemServicesConfig { + #[serde(default = "default_system_services")] + pub enable: Vec, + #[serde(default)] + pub aqua_ipfs: AquaIpfsConfig, + #[serde(default)] + pub decider: DeciderConfig, +} + +impl Default for SystemServicesConfig { + fn default() -> Self { + Self { + enable: default_system_services(), + aqua_ipfs: Default::default(), + decider: Default::default(), + } + } +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct AquaIpfsConfig { + #[serde(default = "default_ipfs_multiaddr")] + pub external_api_multiaddr: String, + #[serde(default = "default_ipfs_multiaddr")] + pub local_api_multiaddr: String, +} + +impl Default for AquaIpfsConfig { + fn default() -> Self { + Self { + external_api_multiaddr: default_ipfs_multiaddr(), + local_api_multiaddr: default_ipfs_multiaddr(), + } + } +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct DeciderConfig { + #[serde(default = "default_decider_spell_period_sec")] + pub decider_period_sec: u32, + #[serde(default = "default_worker_spell_period_sec")] + pub worker_period_sec: u32, + #[serde(default = "default_ipfs_multiaddr")] + pub worker_ipfs_multiaddr: String, + #[serde(default = "default_deal_network_api_endpoint")] + pub network_api_endpoint: String, + #[serde(default = "default_deal_contract_address_hex")] + pub contract_address_hex: String, + #[serde(default = "default_deal_contract_block_hex")] + pub contract_block_hex: String, +} + +impl Default for DeciderConfig { + fn default() -> Self { + Self { + decider_period_sec: default_decider_spell_period_sec(), + worker_period_sec: default_worker_spell_period_sec(), + worker_ipfs_multiaddr: default_ipfs_multiaddr(), + network_api_endpoint: default_deal_network_api_endpoint(), + contract_address_hex: default_deal_contract_address_hex(), + contract_block_hex: default_deal_contract_block_hex(), + } + } +} diff --git a/crates/system-services/Cargo.toml b/crates/system-services/Cargo.toml new file mode 100644 index 0000000000..b367020fb6 --- /dev/null +++ b/crates/system-services/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "system-services" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +aqua-ipfs-distro = "=0.5.14" +decider-distro = "=0.4.15" +registry-distro = 
"=0.8.5" +trust-graph-distro = "=0.4.5" + +maplit = { workspace = true } +toml = { workspace = true } +fluence-app-service = { workspace = true } +particle-modules = { workspace = true } +particle-services = { workspace = true } +eyre = {workspace = true} +libp2p = { workspace = true } +service-modules = { workspace = true } +log = { workspace = true } +serde_json = { workspace = true } +serde = { workspace = true } +fluence-spell-dtos = { workspace = true } +sorcerer = { workspace = true } +particle-execution = { workspace = true } +particle-args = { workspace = true } +spell-storage = { workspace = true } +spell-event-bus = { workspace = true } +now-millis = { workspace = true } +server-config = { workspace = true } +tracing = { workspace = true } + + diff --git a/crates/system-services/src/deployer.rs b/crates/system-services/src/deployer.rs new file mode 100644 index 0000000000..e1c023190b --- /dev/null +++ b/crates/system-services/src/deployer.rs @@ -0,0 +1,573 @@ +use eyre::eyre; +use fluence_app_service::TomlMarineConfig; +use fluence_spell_dtos::trigger_config::TriggerConfig; +use libp2p::PeerId; +use particle_execution::FunctionOutcome; +use particle_modules::{AddBlueprint, ModuleRepository}; +use particle_services::{ParticleAppServices, ServiceError, ServiceType}; +use serde_json::{json, Value as JValue}; +use server_config::system_services_config::ServiceKey::*; +use server_config::{system_services_config::ServiceKey, DeciderConfig, SystemServicesConfig}; +use sorcerer::{get_spell_info, install_spell, remove_spell}; +use spell_event_bus::api::SpellEventBusApi; +use spell_storage::SpellStorage; +use std::collections::{HashMap, HashSet}; +use std::time::Duration; + +const DEPLOYER_TTL: u64 = 60_000; + +const DEPLOYER_PARTICLE_ID: &str = "system-services-deployment"; + +// A status of a service/spell after deployment +enum ServiceStatus { + // Id of a newly created service + Created(String), + // Id of a already existing service + Existing(String), +} + +// Status of the service or spell before deployment +enum ServiceUpdateStatus { + // A service is found and we need to update it + NeedUpdate(String), + // A service is found and it's up to date + NoUpdate(String), + // A service isn't found + NotFound, +} + +// This is supposed to be in a separate lib for all system services crates +struct ServiceDistro { + modules: HashMap<&'static str, &'static [u8]>, + config: &'static [u8], + name: String, +} + +struct SpellDistro { + name: String, + air: &'static str, + kv: HashMap<&'static str, JValue>, + trigger_config: TriggerConfig, +} + +pub struct Deployer { + // These fields are used for deploying system services + services: ParticleAppServices, + modules_repo: ModuleRepository, + // These fields are used for deploying system spells + spell_storage: SpellStorage, + spell_event_bus_api: SpellEventBusApi, + // These fields are used for deploying services and spells from the node name + root_worker_id: PeerId, + management_id: PeerId, + + config: SystemServicesConfig, +} + +impl Deployer { + pub fn new( + services: ParticleAppServices, + modules_repo: ModuleRepository, + spell_storage: SpellStorage, + spell_event_bus_api: SpellEventBusApi, + root_worker_id: PeerId, + management_id: PeerId, + config: SystemServicesConfig, + ) -> Self { + Self { + services, + modules_repo, + spell_storage, + spell_event_bus_api, + root_worker_id, + management_id, + config, + } + } + + async fn deploy_system_service(&self, key: &ServiceKey) -> eyre::Result<()> { + match key { + AquaIpfs => 
self.deploy_aqua_ipfs(), + TrustGraph => self.deploy_trust_graph(), + Registry => self.deploy_registry().await, + Decider => { + self.deploy_connector()?; + self.deploy_decider().await + } + } + } + + pub async fn deploy_system_services(&self) -> eyre::Result<()> { + let services = &self.config.enable.iter().collect::>(); + for service in services { + self.deploy_system_service(service).await?; + } + Ok(()) + } + + fn deploy_aqua_ipfs(&self) -> eyre::Result<()> { + let aqua_ipfs_distro = Self::get_ipfs_service_distro(); + let service_name = aqua_ipfs_distro.name.clone(); + let service_id = match self.deploy_service_common(aqua_ipfs_distro)? { + ServiceStatus::Existing(_id) => { + return Ok(()); + } + ServiceStatus::Created(id) => id, + }; + + let set_local_result = self.call_service( + &service_name, + "set_local_api_multiaddr", + vec![json!(self.config.aqua_ipfs.local_api_multiaddr)], + ); + + let set_external_result = self.call_service( + &service_name, + "set_external_api_multiaddr", + vec![json!(self.config.aqua_ipfs.external_api_multiaddr)], + ); + + // try to set local and external api multiaddrs, and only then produce an error + set_local_result?; + set_external_result?; + + log::info!("initialized `aqua-ipfs` [{}] service", service_id); + + Ok(()) + } + + fn deploy_connector(&self) -> eyre::Result<()> { + let connector_distro = Self::get_connector_distro(); + self.deploy_service_common(connector_distro)?; + Ok(()) + } + + async fn deploy_decider(&self) -> eyre::Result<()> { + let decider_distro = Self::get_decider_distro(self.config.decider.clone()); + self.deploy_system_spell(decider_distro).await?; + Ok(()) + } + + async fn deploy_registry(&self) -> eyre::Result<()> { + let (registry_distro, registry_spell_distros) = Self::get_registry_distro(); + let _deployed = self.deploy_service_common(registry_distro)?; + + for spell_distro in registry_spell_distros { + self.deploy_system_spell(spell_distro).await?; + } + Ok(()) + } + + fn deploy_trust_graph(&self) -> eyre::Result<()> { + let service_distro = Self::get_trust_graph_distro(); + let service_name = service_distro.name.clone(); + let service_id = match self.deploy_service_common(service_distro)? 
{ + ServiceStatus::Existing(_id) => { + return Ok(()); + } + ServiceStatus::Created(id) => id, + }; + + let certs = &trust_graph_distro::KRAS_CERTS; + + self.call_service( + &service_name, + "set_root", + vec![json!(certs.root_node), json!(certs.max_chain_length)], + )?; + + let timestamp = now_millis::now_sec(); + for cert_chain in &certs.certs { + self.call_service( + &service_name, + "insert_cert", + vec![json!(cert_chain), json!(timestamp)], + )?; + } + log::info!("initialized `{service_name}` [{service_id}] service"); + Ok(()) + } + + // The plan is to move this to the corresponding crates + fn get_trust_graph_distro() -> ServiceDistro { + use trust_graph_distro::*; + ServiceDistro { + modules: modules(), + config: CONFIG, + name: TrustGraph.to_string(), + } + } + + fn get_registry_distro() -> (ServiceDistro, Vec<SpellDistro>) { + use registry_distro::*; + + let distro = ServiceDistro { + modules: modules(), + config: CONFIG, + name: Registry.to_string(), + }; + let spells_distro = scripts() + .into_iter() + .map(|script| { + let mut trigger_config = TriggerConfig::default(); + trigger_config.clock.start_sec = 1; + trigger_config.clock.period_sec = script.period_sec; + SpellDistro { + name: script.name.to_string(), + air: script.air, + kv: HashMap::new(), + trigger_config, + } + }) + .collect::<_>(); + (distro, spells_distro) + } + + fn get_ipfs_service_distro() -> ServiceDistro { + use aqua_ipfs_distro::*; + ServiceDistro { + modules: modules(), + config: CONFIG, + name: AquaIpfs.to_string(), + } + } + + fn get_connector_distro() -> ServiceDistro { + let connector_service_distro = decider_distro::connector_service_modules(); + ServiceDistro { + modules: connector_service_distro.modules, + config: connector_service_distro.config, + name: connector_service_distro.name.to_string(), + } + } + + fn get_decider_distro(decider_settings: DeciderConfig) -> SpellDistro { + let decider_config = decider_distro::DeciderConfig { + worker_period_sec: decider_settings.worker_period_sec, + worker_ipfs_multiaddr: decider_settings.worker_ipfs_multiaddr, + chain_network: decider_settings.network_api_endpoint, + chain_contract_addr: decider_settings.contract_address_hex, + chain_contract_block_hex: decider_settings.contract_block_hex, + }; + let decider_spell_distro = decider_distro::decider_spell(decider_config); + let mut decider_trigger_config = TriggerConfig::default(); + decider_trigger_config.clock.start_sec = 1; + decider_trigger_config.clock.period_sec = decider_settings.decider_period_sec; + SpellDistro { + name: Decider.to_string(), + air: decider_spell_distro.air, + kv: decider_spell_distro.kv, + trigger_config: decider_trigger_config, + } + } + + fn call_service( + &self, + service_id: &str, + function_name: &'static str, + args: Vec<JValue>, + ) -> eyre::Result<()> { + let result = self.services.call_function( + self.root_worker_id, + service_id, + function_name, + args, + None, + self.root_worker_id, + Duration::from_millis(DEPLOYER_TTL), + ); + // similar to process_func_outcome in sorcerer/src/utils.rs, but that func is + // too spell-specific to reuse here + match result { + FunctionOutcome::Ok(result) => { + let call_result: Option<eyre::Result<()>> = try { + let result = result.as_object()?; + let is_success = result["success"].as_bool()?; + if !is_success { + if let Some(error) = result["error"].as_str() { + Err(eyre!( + "Call {service_id}.{function_name} returned error: {}", + error + )) + } else { + Err(eyre!("Call {service_id}.{function_name} returned error")) + } + } else { + Ok(()) + } + }; + 
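+ // `call_result` is `None` when the outcome is not the expected JSON object with a boolean `success` field; the `unwrap_or_else` below maps that case to an "invalid result" error.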
call_result.unwrap_or_else(|| { + Err(eyre!( + "Call {service_id}.{function_name} returned an invalid result: {result}" + )) + }) + } + FunctionOutcome::NotDefined { .. } => { + Err(eyre!("Service {service_id} ({function_name}) not found")) + } + FunctionOutcome::Empty => Err(eyre!( + "Call {service_id}.{function_name} didn't return any result" + )), + FunctionOutcome::Err(err) => Err(eyre!(err)), + } + } + + async fn deploy_system_spell(&self, spell_distro: SpellDistro) -> eyre::Result<ServiceStatus> { + let spell_name = spell_distro.name.clone(); + match self.find_same_spell(&spell_distro) { + ServiceUpdateStatus::NeedUpdate(spell_id) => { + tracing::debug!( + spell_name, + spell_id, + "found an existing spell that needs to be updated; will remove the old spell and deploy a new one", + ); + self.clean_old_spell(&spell_name, spell_id).await; + } + ServiceUpdateStatus::NoUpdate(spell_id) => { + tracing::debug!( + spell_name, + spell_id, + "found an existing spell that doesn't need to be updated; will not update", + ); + return Ok(ServiceStatus::Existing(spell_id)); + } + ServiceUpdateStatus::NotFound => {} + } + let spell_id = self.deploy_spell_common(spell_distro).await?; + tracing::info!(spell_name, spell_id, "deployed a system spell"); + Ok(ServiceStatus::Created(spell_id)) + } + + async fn clean_old_spell(&self, spell_name: &str, spell_id: String) { + let result = remove_spell( + DEPLOYER_PARTICLE_ID, + &self.spell_storage, + &self.services, + &self.spell_event_bus_api, + spell_id.clone(), + self.root_worker_id, + ) + .await; + if let Err(err) = result { + tracing::error!( + spell_name, + spell_id, + "Failed to remove the old spell (will try to stop it instead): {err}", + ); + + let empty_config = TriggerConfig::default(); + // Stop the old spell + let result: eyre::Result<_> = try { + // Stop the spell to avoid re-subscription + self.call_service(&spell_id, "set_trigger_config", vec![json!(empty_config)])?; + + // Unsubscribe spell from execution + self.spell_event_bus_api.unsubscribe(spell_id.clone()).await + }; + if let Err(err) = result { + tracing::error!( + spell_name, + spell_id, + "couldn't stop the old spell (will install the new spell nevertheless): {err}", + ); + } + } + } + + async fn deploy_spell_common(&self, spell_distro: SpellDistro) -> eyre::Result<String> { + let spell_id = install_spell( + &self.services, + &self.spell_storage, + &self.spell_event_bus_api, + self.root_worker_id, + DEPLOYER_PARTICLE_ID.to_string(), + DEPLOYER_TTL, + spell_distro.trigger_config, + spell_distro.air.to_string(), + json!(spell_distro.kv), + ) + .await + .map_err(|e| eyre!(e))?; + self.services.add_alias( + spell_distro.name.to_string(), + self.root_worker_id, + spell_id.clone(), + self.management_id, + )?; + Ok(spell_id) + } + + // Two spells are the same if + // - they have the same alias + // + // Need to redeploy (stop the old one, create a new one) a spell if + // - the script is different + // - the trigger config is different + fn find_same_spell(&self, new_spell: &SpellDistro) -> ServiceUpdateStatus { + let existing_spell = + self.services + .get_service_info("", self.root_worker_id, new_spell.name.to_string()); + let spell = match existing_spell { + Ok(spell) => spell, + Err(ServiceError::NoSuchService(_)) => { + log::debug!("no existing spell found for {}", new_spell.name); + return ServiceUpdateStatus::NotFound; + } + Err(err) => { + log::error!( + "can't obtain details on a spell `{}` (will create a new one): {err}", + new_spell.name + ); + return ServiceUpdateStatus::NotFound; + } + }; + if spell.service_type != ServiceType::Spell { 
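+ // The alias is already taken by a regular service rather than a spell; warn and report `NotFound` so a fresh spell is deployed, leaving the existing service untouched.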
+ log::warn!( + "alias `{}` already used for a service [{}]; it will be used for a spell, the service won't be removed", + new_spell.name, + spell.id + ); + return ServiceUpdateStatus::NotFound; + } + + // Request a script and a trigger config from the spell + let spell_info = get_spell_info( + &self.services, + self.root_worker_id, + DEPLOYER_TTL, + spell.id.clone(), + ); + let spell_info = match spell_info { + Err(err) => { + log::error!( + "can't obtain details on the existing spell {} (will try to update nevertheless): {err}", + new_spell.name + ); + return ServiceUpdateStatus::NeedUpdate(spell.id); + } + Ok(s) => s, + }; + + if spell_info.script != new_spell.air { + log::debug!( + "found old {} spell but with a different script; updating the spell", + new_spell.name + ); + return ServiceUpdateStatus::NeedUpdate(spell.id); + } + if spell_info.trigger_config != new_spell.trigger_config { + log::debug!( + "found old {} spell but with a different trigger config; updating the spell", + new_spell.name + ); + return ServiceUpdateStatus::NeedUpdate(spell.id); + } + + ServiceUpdateStatus::NoUpdate(spell.id) + } + + fn deploy_service_common(&self, service_distro: ServiceDistro) -> eyre::Result<ServiceStatus> { + let service_name = service_distro.name.clone(); + let blueprint_id = self.add_modules(service_distro)?; + + match self.find_same_service(service_name.to_string(), &blueprint_id) { + ServiceUpdateStatus::NeedUpdate(service_id) => { + tracing::debug!(service_name, service_id, "found an existing service that needs to be updated; will remove the old service and deploy a new one"); + let result = self.services.remove_service( + DEPLOYER_PARTICLE_ID, + self.root_worker_id, + service_id.clone(), + self.root_worker_id, + false, + ); + if let Err(err) = result { + tracing::error!( + service_name, service_id, + "couldn't remove the old service (will install the new service nevertheless): {err}", + ); + } + } + ServiceUpdateStatus::NoUpdate(service_id) => { + tracing::debug!( + service_name, + service_id, + "found an existing service that doesn't need to be updated; will skip the update" + ); + return Ok(ServiceStatus::Existing(service_id)); + } + ServiceUpdateStatus::NotFound => {} + } + + let service_id = self.services.create_service( + ServiceType::Service, + blueprint_id, + self.root_worker_id, + self.root_worker_id, + )?; + self.services.add_alias( + service_name.to_string(), + self.root_worker_id, + service_id.clone(), + self.management_id, + )?; + tracing::info!(service_name, service_id, "deployed a new service"); + Ok(ServiceStatus::Created(service_id)) + } + + fn find_same_service(&self, service_name: String, blueprint_id: &str) -> ServiceUpdateStatus { + // Check that the service exists and has the same blueprint. + // If so, we don't create a new one. 
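+ // The blueprint id is derived from the distro's module hashes (see `add_modules` below), so a matching `blueprint_id` means the deployed service already runs the same code.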
+ let existing_service = + self.services + .get_service_info("", self.root_worker_id, service_name.to_string()); + if let Ok(service) = existing_service { + if service.service_type == ServiceType::Spell { + log::warn!( + "alias `{}` already used for a spell [{}]; it will be used for a new service, the spell won't be removed", + service_name, + service.id + ); + return ServiceUpdateStatus::NotFound; + } + + if service.blueprint_id == blueprint_id { + ServiceUpdateStatus::NoUpdate(service.id) + } else { + ServiceUpdateStatus::NeedUpdate(service.id) + } + } else { + ServiceUpdateStatus::NotFound + } + } + + fn add_modules(&self, service_distro: ServiceDistro) -> eyre::Result { + let marine_config: TomlMarineConfig = toml::from_slice(service_distro.config)?; + let mut hashes = Vec::new(); + for config in marine_config.module { + let name = config.name.clone(); + // TODO: introduce nice errors for this + let module = service_distro + .modules + .get(name.as_str()) + .ok_or(eyre!(format!( + "there's no module `{name}` in the given modules map for system service {}", + service_distro.name + )))?; + let hash = self + .modules_repo + .add_module(module.to_vec(), config) + .map_err(|e| { + eyre!( + "error while adding module {name} of service `{}`: {:?}", + service_distro.name, + e + ) + })?; + hashes.push(hash) + } + let blueprint_id = self + .modules_repo + .add_blueprint(AddBlueprint::new(service_distro.name, hashes))?; + Ok(blueprint_id) + } +} diff --git a/crates/system-services/src/lib.rs b/crates/system-services/src/lib.rs new file mode 100644 index 0000000000..b95f75d0b5 --- /dev/null +++ b/crates/system-services/src/lib.rs @@ -0,0 +1,5 @@ +#![feature(try_blocks)] + +mod deployer; + +pub use deployer::Deployer; diff --git a/nox/Cargo.toml b/nox/Cargo.toml index 7fd96941a2..6208d9e158 100644 --- a/nox/Cargo.toml +++ b/nox/Cargo.toml @@ -24,11 +24,11 @@ server-config = { workspace = true } config-utils = { workspace = true } kademlia = { workspace = true } air-interpreter-fs = { workspace = true } -builtins-deployer = { workspace = true } fs-utils = { workspace = true } peer-metrics = { workspace = true } spell-event-bus = { workspace = true } key-manager = { workspace = true } +system-services = { workspace = true } fluence-keypair = { workspace = true } diff --git a/nox/src/node.rs b/nox/src/node.rs index 5f5077135b..1b4fdee49c 100644 --- a/nox/src/node.rs +++ b/nox/src/node.rs @@ -14,16 +14,15 @@ * limitations under the License. 
*/ +use eyre::WrapErr; use std::sync::Arc; use std::{io, net::SocketAddr}; use aquamarine::{ AquaRuntime, AquamarineApi, AquamarineApiError, AquamarineBackend, RoutingEffects, VmPoolConfig, }; -use builtins_deployer::BuiltinsDeployer; use config_utils::to_peer_id; use connection_pool::{ConnectionPoolApi, ConnectionPoolT}; -use eyre::WrapErr; use fluence_libp2p::build_transport; use futures::{stream::StreamExt, FutureExt}; use key_manager::KeyManager; @@ -51,6 +50,7 @@ use server_config::{NetworkConfig, ResolvedConfig, ServicesConfig}; use sorcerer::Sorcerer; use spell_event_bus::api::{PeerEvent, SpellEventBusApi, TriggerEvent}; use spell_event_bus::bus::SpellEventBus; +use system_services::Deployer; use tokio::sync::{mpsc, oneshot}; use tokio::task; @@ -74,7 +74,8 @@ pub struct Node { pub dispatcher: Dispatcher, aquavm_pool: AquamarineBackend>>, script_storage: ScriptStorageBackend, - builtins_deployer: BuiltinsDeployer, + system_service_deployer: Deployer, + spell_event_bus_api: SpellEventBusApi, spell_event_bus: SpellEventBus, spell_events_receiver: mpsc::UnboundedReceiver, @@ -243,16 +244,6 @@ impl Node { ) }; - let builtins_deployer = BuiltinsDeployer::new( - builtins_peer_id, - key_manager.get_host_peer_id(), - aquamarine_api.clone(), - config.dir_config.builtins_base_dir.clone(), - config.node_config.autodeploy_particle_ttl, - config.node_config.force_builtins_redeploy, - config.node_config.autodeploy_retry_attempts, - ); - let recv_connection_pool_events = connectivity.connection_pool.lifecycle_events(); let sources = vec![recv_connection_pool_events.map(PeerEvent::from).boxed()]; @@ -286,6 +277,9 @@ impl Node { } custom_service_functions.extend_one(make_peer_builtin(node_info)); + let services = builtins.services.clone(); + let modules = builtins.modules.clone(); + custom_service_functions.into_iter().for_each( move |( service_id, @@ -303,6 +297,17 @@ impl Node { }, ); + let system_services_config = config.system_services.clone(); + let system_services_deployer = Deployer::new( + services, + modules, + sorcerer.spell_storage.clone(), + spell_event_bus_api.clone(), + key_manager.get_host_peer_id(), + builtins_peer_id, + system_services_config, + ); + Ok(Self::with( particle_stream, effects_in, @@ -312,7 +317,7 @@ impl Node { dispatcher, aquavm_pool, script_storage_backend, - builtins_deployer, + system_services_deployer, spell_event_bus_api, spell_event_bus, spell_events_receiver, @@ -380,7 +385,7 @@ impl Node { dispatcher: Dispatcher, aquavm_pool: AquamarineBackend>>, script_storage: ScriptStorageBackend, - builtins_deployer: BuiltinsDeployer, + system_service_deployer: Deployer, spell_event_bus_api: SpellEventBusApi, spell_event_bus: SpellEventBus, spell_events_receiver: mpsc::UnboundedReceiver, @@ -403,7 +408,7 @@ impl Node { dispatcher, aquavm_pool, script_storage, - builtins_deployer, + system_service_deployer, spell_event_bus_api, spell_event_bus, spell_events_receiver, @@ -494,16 +499,18 @@ impl Node { pool.abort(); }).expect("Could not spawn task"); - let mut builtins_deployer = self.builtins_deployer; - builtins_deployer - .deploy_builtin_services() + // Note: need to be after the start of the node to be able to subscribe spells + let deployer = self.system_service_deployer; + deployer + .deploy_system_services() .await - .wrap_err("builtins deploy failed")?; + .context("deploying system services failed")?; - let result = self.spell_event_bus_api.start_scheduling().await; - if let Err(e) = result { - log::error!("running spell event bus failed: {}", e); - } + 
self.spell_event_bus_api + .start_scheduling() + .await + .map_err(|e| eyre::eyre!("{e}")) + .context("running spell event bus failed")?; Ok(exit_outlet) } @@ -553,6 +560,7 @@ mod tests { .expect("Could not resolve config"); config.aquavm_pool_size = 1; config.dir_config.spell_base_dir = to_abs_path(PathBuf::from("spell")); + config.system_services.enable = vec![]; let vm_config = VmConfig::new( to_peer_id(&config.root_key_pair.clone().into()), config.dir_config.avm_base_dir.clone(), diff --git a/particle-builtins/src/builtins.rs b/particle-builtins/src/builtins.rs index db8951f014..423b5215a5 100644 --- a/particle-builtins/src/builtins.rs +++ b/particle-builtins/src/builtins.rs @@ -841,7 +841,7 @@ where } fn list_services(&self, params: ParticleParams) -> JValue { - JValue::Array(self.services.list_services(params.host_id)) + Array(self.services.list_services(params.host_id)) } fn call_service(&self, function_args: Args, particle: ParticleParams) -> FunctionOutcome { @@ -883,7 +883,7 @@ where self.services .get_service_info(¶ms.id, params.host_id, service_id_or_alias)?; - Ok(info) + Ok(json!(info)) } fn kademlia(&self) -> &KademliaApi { diff --git a/particle-services/src/app_services.rs b/particle-services/src/app_services.rs index 69bccffd41..daf1cdc102 100644 --- a/particle-services/src/app_services.rs +++ b/particle-services/src/app_services.rs @@ -54,7 +54,7 @@ type ServiceAlias = String; type Services = HashMap; type Aliases = HashMap>; -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum ServiceType { Service, @@ -302,7 +302,7 @@ impl ParticleAppServices { particle_id: &str, worker_id: PeerId, service_id_or_alias: String, - ) -> Result { + ) -> Result { let services_read = self.services.read(); let (service, service_id) = get_service( particle_id, @@ -313,7 +313,7 @@ impl ParticleAppServices { service_id_or_alias, )?; - Ok(json!(service.get_info(&service_id))) + Ok(service.get_info(&service_id)) } pub fn remove_services(&self, worker_id: PeerId) -> Result<(), ServiceError> { diff --git a/sorcerer/src/lib.rs b/sorcerer/src/lib.rs index e1c9c50ec9..434b10c6d5 100644 --- a/sorcerer/src/lib.rs +++ b/sorcerer/src/lib.rs @@ -1,6 +1,7 @@ #![feature(try_blocks)] #![feature(extend_one)] pub use sorcerer::Sorcerer; +pub use spell_builtins::{get_spell_info, install_spell, remove_spell, SpellInfo}; #[macro_use] extern crate fstrings; @@ -8,6 +9,6 @@ extern crate fstrings; mod error; mod script_executor; mod sorcerer; -mod spells; +mod spell_builtins; mod utils; mod worker_builins; diff --git a/sorcerer/src/sorcerer.rs b/sorcerer/src/sorcerer.rs index e572f2efad..84e70b438d 100644 --- a/sorcerer/src/sorcerer.rs +++ b/sorcerer/src/sorcerer.rs @@ -23,7 +23,7 @@ use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; -use crate::spells::{ +use crate::spell_builtins::{ get_spell_arg, get_spell_id, spell_install, spell_list, spell_remove, spell_update_config, store_error, store_response, }; diff --git a/sorcerer/src/spells.rs b/sorcerer/src/spell_builtins.rs similarity index 81% rename from sorcerer/src/spells.rs rename to sorcerer/src/spell_builtins.rs index 8a6723318d..4b988470a1 100644 --- a/sorcerer/src/spells.rs +++ b/sorcerer/src/spell_builtins.rs @@ -13,11 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -use fluence_spell_dtos::value::{StringValue, UnitValue}; -use serde_json::{json, Value as JValue, Value::Array}; +use fluence_spell_dtos::value::{ScriptValue, SpellValueT, StringValue, UnitValue}; +use serde_json::{json, Value as JValue, Value, Value::Array}; use crate::utils::{parse_spell_id_from, process_func_outcome}; -use fluence_spell_dtos::trigger_config::TriggerConfig; +use fluence_spell_dtos::trigger_config::{TriggerConfig, TriggerConfigValue}; use key_manager::KeyManager; use libp2p::PeerId; use particle_args::{Args, JError}; @@ -28,7 +28,7 @@ use spell_event_bus::{api, api::SpellEventBusApi}; use spell_storage::SpellStorage; use std::time::Duration; -pub(crate) async fn remove_spell( +pub async fn remove_spell( particle_id: &str, spell_storage: &SpellStorage, services: &ParticleAppServices, @@ -50,34 +50,19 @@ pub(crate) async fn remove_spell( Ok(()) } -pub(crate) async fn spell_install( - sargs: Args, - params: ParticleParams, - spell_storage: SpellStorage, - services: ParticleAppServices, - spell_event_bus_api: SpellEventBusApi, - key_manager: KeyManager, -) -> Result { - let mut args = sargs.function_args.clone().into_iter(); - let script: String = Args::next("script", &mut args)?; - let init_data: JValue = Args::next("data", &mut args)?; - let user_config: TriggerConfig = Args::next("config", &mut args)?; +#[allow(clippy::too_many_arguments)] +pub async fn install_spell( + services: &ParticleAppServices, + spell_storage: &SpellStorage, + spell_event_bus_api: &SpellEventBusApi, + worker_id: PeerId, + particle_id: String, + ttl: u64, + user_config: TriggerConfig, + script: String, + init_data: Value, +) -> Result { let config = api::from_user_config(user_config.clone())?; - let init_peer_id = params.init_peer_id; - - let is_management = key_manager.is_management(init_peer_id); - if key_manager.is_host(params.host_id) && !is_management { - return Err(JError::new("Failed to install spell in the root scope, only management peer id can install top-level spells")); - } - - let worker_id = params.host_id; - let worker_creator = key_manager.get_worker_creator(params.host_id)?; - - let is_worker = init_peer_id == worker_id; - let is_worker_creator = init_peer_id == worker_creator; - if !is_management && !is_worker && !is_worker_creator { - return Err(JError::new(format!("Failed to install spell on {worker_id}, spell can be installed by worker creator {worker_creator}, worker itself {worker_id} or peer manager; init_peer_id={init_peer_id}"))); - } let spell_id = services.create_service( ServiceType::Spell, @@ -97,7 +82,7 @@ pub(crate) async fn spell_install( vec![json!(script)], None, worker_id, - Duration::from_millis(params.ttl as u64), + Duration::from_millis(ttl), ), &spell_id, "set_script_source_to_file", @@ -112,7 +97,7 @@ pub(crate) async fn spell_install( vec![json!(init_data.to_string())], None, worker_id, - Duration::from_millis(params.ttl as u64), + Duration::from_millis(ttl), ), &spell_id, "set_json_fields", @@ -127,7 +112,7 @@ pub(crate) async fn spell_install( vec![json!(user_config)], None, worker_id, - Duration::from_millis(params.ttl as u64), + Duration::from_millis(ttl), ), &spell_id, "set_trigger_config", @@ -142,13 +127,7 @@ pub(crate) async fn spell_install( log::warn!("can't subscribe a spell {} to triggers {:?} via spell-event-bus-api: {}. 
Removing created spell service...", spell_id, config, err); spell_storage.unregister_spell(worker_id, &spell_id); - services.remove_service( - &params.id, - params.host_id, - spell_id, - params.init_peer_id, - true, - )?; + services.remove_service(&particle_id, worker_id, spell_id, worker_id, true)?; return Err(JError::new(format!( "can't install a spell due to an internal error while subscribing to the triggers: {err}" @@ -156,12 +135,111 @@ } } else { tracing::trace!( - particle_id = params.id, + particle_id = particle_id, "empty config given for spell {}", spell_id ); } + Ok(spell_id) +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct SpellInfo { + pub script: String, + pub trigger_config: TriggerConfig, +} + +pub fn get_spell_info( + services: &ParticleAppServices, + worker_id: PeerId, + ttl: u64, + spell_id: String, +) -> Result<SpellInfo, JError> { + let trigger_config_value = process_func_outcome::<TriggerConfigValue>( + services.call_function( + worker_id, + &spell_id, + "get_trigger_config", + vec![], + None, + worker_id, + Duration::from_millis(ttl), + ), + &spell_id, + "get_trigger_config", + ) + .map_err(|e| JError::new(f!("Failed to get trigger_config for spell {spell_id}: {e}")))?; + let trigger_config = if trigger_config_value.is_success() { + trigger_config_value.config + } else { + return Err(JError::new(trigger_config_value.error)); + }; + + let script_value = process_func_outcome::<ScriptValue>( + services.call_function( + worker_id, + &spell_id, + "get_script_source_from_file", + vec![], + None, + worker_id, + Duration::from_millis(ttl), + ), + &spell_id, + "get_script_source_from_file", + ) + .map_err(|e| JError::new(f!("Failed to get script for spell {spell_id}: {e}")))?; + if script_value.is_success() { + Ok(SpellInfo { + script: script_value.source_code, + trigger_config, + }) + } else { + Err(JError::new(script_value.error)) + } +} + +pub(crate) async fn spell_install( + sargs: Args, + params: ParticleParams, + spell_storage: SpellStorage, + services: ParticleAppServices, + spell_event_bus_api: SpellEventBusApi, + key_manager: KeyManager, +) -> Result<JValue, JError> { + let mut args = sargs.function_args.clone().into_iter(); + let script: String = Args::next("script", &mut args)?; + let init_data: JValue = Args::next("data", &mut args)?; + let user_config: TriggerConfig = Args::next("config", &mut args)?; + let init_peer_id = params.init_peer_id; + + let is_management = key_manager.is_management(init_peer_id); + if key_manager.is_host(params.host_id) && !is_management { + return Err(JError::new("Failed to install spell in the root scope, only management peer id can install top-level spells")); + } + + let worker_id = params.host_id; + let worker_creator = key_manager.get_worker_creator(params.host_id)?; + + let is_worker = init_peer_id == worker_id; + let is_worker_creator = init_peer_id == worker_creator; + if !is_management && !is_worker && !is_worker_creator { + return Err(JError::new(format!("Failed to install spell on {worker_id}, spell can be installed by worker creator {worker_creator}, worker itself {worker_id} or peer manager; init_peer_id={init_peer_id}"))); + } + + let spell_id = install_spell( + &services, + &spell_storage, + &spell_event_bus_api, + worker_id, + params.id, + params.ttl as u64, + user_config, + script, + init_data, + ) + .await?; Ok(JValue::String(spell_id)) } diff --git a/sorcerer/src/utils.rs b/sorcerer/src/utils.rs index 325a9f3b53..14c79628ab 100644 --- a/sorcerer/src/utils.rs +++ b/sorcerer/src/utils.rs @@ -22,7 +22,7 @@ use 
particle_execution::{FunctionOutcome, ParticleParams}; // TODO: change the function name to a better one /// Return Ok(T) if result.success is true, return Err(T.error) otherwise -pub(crate) fn process_func_outcome( +pub fn process_func_outcome( func_outcome: FunctionOutcome, spell_id: &str, function_name: &str, diff --git a/sorcerer/src/worker_builins.rs b/sorcerer/src/worker_builins.rs index 95df747e53..976e273bb2 100644 --- a/sorcerer/src/worker_builins.rs +++ b/sorcerer/src/worker_builins.rs @@ -17,7 +17,7 @@ use fluence_libp2p::PeerId; use serde_json::Value as JValue; use std::str::FromStr; -use crate::spells::remove_spell; +use crate::spell_builtins::remove_spell; use key_manager::KeyManager; use particle_args::{Args, JError}; use particle_execution::ParticleParams;
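Note for reviewers: a minimal sketch (not part of the diff) of how the new system-services configuration deserializes with the types added in crates/server-config/src/system_services_config.rs. Only SystemServicesConfig and ServiceKey come from this change; the toml/eyre usage, the TOML snippet and the example multiaddr are illustrative assumptions, and the FLUENCE_ENV_* variables handled in load_system_services_envs would still override these values after parsing.

// Illustrative sketch only: shows the serde behaviour of SystemServicesConfig.
use server_config::system_services_config::{ServiceKey, SystemServicesConfig};

fn main() -> eyre::Result<()> {
    // ServiceKey uses #[serde(rename_all = "kebab-case")], so variants are spelled
    // "aqua-ipfs", "trust-graph", "registry" and "decider" in config files.
    let toml_src = r#"
        enable = ["aqua-ipfs", "decider"]

        [aqua_ipfs]
        external_api_multiaddr = "/dns4/ipfs.example.com/tcp/5001"

        [decider]
        decider_period_sec = 120
    "#;

    let cfg: SystemServicesConfig = toml::from_str(toml_src)?;
    assert_eq!(cfg.enable, vec![ServiceKey::AquaIpfs, ServiceKey::Decider]);
    // Keys omitted above fall back to the #[serde(default = "...")] functions,
    // e.g. local_api_multiaddr and worker_period_sec keep their defaults.
    println!("decider polls every {}s", cfg.decider.decider_period_sec);
    Ok(())
}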