From d7f0df6170be505cfc935f1df860df1f9dff675c Mon Sep 17 00:00:00 2001 From: Maria Kuklina Date: Tue, 13 Jun 2023 14:40:15 +0200 Subject: [PATCH] refactor service names usage --- Cargo.lock | 1 + crates/created-swarm/src/swarm.rs | 11 +- crates/nox-tests/Cargo.toml | 1 + crates/nox-tests/tests/builtin.rs | 7 +- crates/server-config/src/lib.rs | 2 +- .../src/system_services_config.rs | 35 +++- crates/system-services/src/lib.rs | 183 +++++++----------- 7 files changed, 120 insertions(+), 120 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fb6bbdb08e..d3d30f5a44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4506,6 +4506,7 @@ dependencies = [ "script-storage", "serde", "serde_json", + "server-config", "service-modules", "sorcerer", "spell-event-bus", diff --git a/crates/created-swarm/src/swarm.rs b/crates/created-swarm/src/swarm.rs index 4d6c7de5a9..0c762a6f2c 100644 --- a/crates/created-swarm/src/swarm.rs +++ b/crates/created-swarm/src/swarm.rs @@ -225,11 +225,13 @@ pub struct SwarmConfig { pub spell_base_dir: Option, pub timer_resolution: Duration, pub allowed_binaries: Vec, - pub disabled_system_services: Vec, + pub disabled_system_services: Vec, } impl SwarmConfig { pub fn new(bootstraps: Vec, listen_on: Multiaddr) -> Self { + use server_config::system_services_config::ServiceKey; + let transport = match listen_on.iter().next() { Some(Protocol::Memory(_)) => Transport::Memory, _ => Transport::Network, @@ -246,12 +248,7 @@ impl SwarmConfig { spell_base_dir: None, timer_resolution: default_script_storage_timer_resolution(), allowed_binaries: vec!["/usr/bin/ipfs".to_string(), "/usr/bin/curl".to_string()], - disabled_system_services: vec![ - "decider".to_string(), - "aqua-ipfs".to_string(), - "registry".to_string(), - "trust-graph".to_string(), - ], + disabled_system_services: ServiceKey::all_values(), } } } diff --git a/crates/nox-tests/Cargo.toml b/crates/nox-tests/Cargo.toml index 819c0817ab..f858739dc8 100644 --- a/crates/nox-tests/Cargo.toml +++ b/crates/nox-tests/Cargo.toml @@ -20,6 +20,7 @@ now-millis = { path = "../now-millis" } local-vm = { path = "../local-vm" } control-macro = { path = "../control-macro" } json-utils = { path = "../json-utils" } +server-config = { workspace = true } log-utils = { workspace = true } fluence-spell-dtos = { workspace = true } diff --git a/crates/nox-tests/tests/builtin.rs b/crates/nox-tests/tests/builtin.rs index 0e2380fa13..a57f285d88 100644 --- a/crates/nox-tests/tests/builtin.rs +++ b/crates/nox-tests/tests/builtin.rs @@ -37,6 +37,7 @@ use now_millis::now_ms; use particle_protocol::Particle; use serde::Deserialize; use serde_json::{json, Value as JValue}; +use server_config::system_services_config::ServiceKey::Registry; use service_modules::load_module; use std::collections::HashMap; use std::str::FromStr; @@ -1540,7 +1541,7 @@ async fn sign_verify() { async fn sign_invalid_tetraplets() { let swarms = make_swarms_with_cfg(2, |mut cfg| { cfg.disabled_system_services - .retain(|service| service != "registry"); + .retain(|service| service != Registry); cfg }) .await; @@ -1610,7 +1611,7 @@ async fn sign_invalid_tetraplets() { async fn sig_verify_invalid_signature() { let swarms = make_swarms_with_cfg(1, |mut cfg| { cfg.disabled_system_services - .retain(|service| service != "registry"); + .retain(|service| service != Registry); cfg }) .await; @@ -1802,7 +1803,7 @@ async fn insecure_sign_verify() { let kp = KeyPair::from_secret_key(INSECURE_KEYPAIR_SEED.collect(), KeyFormat::Ed25519).unwrap(); let swarms = make_swarms_with_cfg(1, |mut cfg| { cfg.disabled_system_services - .retain(|service| service != "registry"); + .retain(|service| service != Registry); cfg }) .await; diff --git a/crates/server-config/src/lib.rs b/crates/server-config/src/lib.rs index a007a185e6..ee970e378e 100644 --- a/crates/server-config/src/lib.rs +++ b/crates/server-config/src/lib.rs @@ -38,7 +38,7 @@ mod network_config; mod node_config; mod resolved_config; mod services_config; -mod system_services_config; +pub mod system_services_config; pub use defaults::{builtins_base_dir, *}; pub use resolved_config::load_config; diff --git a/crates/server-config/src/system_services_config.rs b/crates/server-config/src/system_services_config.rs index a39d70a5e8..3de67e532e 100644 --- a/crates/server-config/src/system_services_config.rs +++ b/crates/server-config/src/system_services_config.rs @@ -16,11 +16,44 @@ use super::defaults::*; use serde::{Deserialize, Serialize}; +use std::fmt::Formatter; + +#[non_exhaustive] +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "kebab-case")] +pub enum ServiceKey { + AquaIpfs, + TrustGraph, + Registry, + Decider, +} + +impl ServiceKey { + pub fn all_values() -> Vec { + vec![ + Self::AquaIpfs, + Self::TrustGraph, + Self::Registry, + Self::Decider, + ] + } +} + +impl std::fmt::Display for ServiceKey { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::AquaIpfs => write!(f, "aqua-ipfs"), + Self::TrustGraph => write!(f, "trust-graph"), + Self::Registry => write!(f, "registry"), + Self::Decider => write!(f, "decider"), + } + } +} #[derive(Clone, Serialize, Deserialize, Debug, Default)] pub struct SystemServicesConfig { #[serde(default)] - pub disable: Vec, + pub disable: Vec, #[serde(default)] pub aqua_ipfs: AquaIpfsConfig, #[serde(default)] diff --git a/crates/system-services/src/lib.rs b/crates/system-services/src/lib.rs index d5ef208031..7e2359b21a 100644 --- a/crates/system-services/src/lib.rs +++ b/crates/system-services/src/lib.rs @@ -8,7 +8,8 @@ use particle_execution::FunctionOutcome; use particle_modules::{AddBlueprint, ModuleRepository}; use particle_services::{ParticleAppServices, ServiceError, ServiceType}; use serde_json::{json, Value as JValue}; -use server_config::{DeciderConfig, SystemServicesConfig}; +use server_config::system_services_config::ServiceKey::*; +use server_config::{system_services_config::ServiceKey, DeciderConfig, SystemServicesConfig}; use sorcerer::{get_spell_info, install_spell, remove_spell}; use spell_event_bus::api::SpellEventBusApi; use spell_storage::SpellStorage; @@ -41,11 +42,11 @@ enum ServiceUpdateStatus { struct ServiceDistro { modules: HashMap<&'static str, &'static [u8]>, config: &'static [u8], - name: &'static str, + name: String, } struct SpellDistro { - name: &'static str, + name: String, air: &'static str, kv: HashMap<&'static str, JValue>, trigger_config: TriggerConfig, @@ -86,66 +87,59 @@ impl Deployer { } } - pub async fn deploy_system_services(&self) -> Result<(), JError> { - /* - let deployers: HashMap<_, Box Result<(), JError>>> = maplit::hashmap! { - "aqua-ipfs" => Box::new(Self::deploy_aqua_ipfs), - "trust-graph" => Box::new(Self::deploy_trust_graph), - /* - "registry" => || self.deploy_registry(), - "decider" => || { self.deploy_connector()?; self.deploy_decider().await }, - */ - }; - - for (name, deployer) in deployers { - if !self.config.disable.contains(&name.to_string()) { - deployer()?; + async fn deploy_system_service(&self, key: &ServiceKey) -> Result<(), JError> { + match key { + AquaIpfs => self.deploy_aqua_ipfs(), + TrustGraph => self.deploy_trust_graph(), + Registry => self.deploy_registry().await, + Decider => { + self.deploy_connector()?; + self.deploy_decider().await + } + x => { + log::error!("installation of a service {x} is not implemented",); + Ok(()) } } - */ - - if !self.config.disable.contains(&"aqua-ipfs".to_string()) { - self.deploy_aqua_ipfs()?; - } - - if !self.config.disable.contains(&"trust-graph".to_string()) { - self.deploy_trust_graph()?; - } - if !self.config.disable.contains(&"registry".to_string()) { - self.deploy_registry().await?; - } + } - if !self.config.disable.contains(&"decider".to_string()) { - self.deploy_connector()?; - self.deploy_decider().await?; + pub async fn deploy_system_services(&self) -> Result<(), JError> { + let services_to_deploy = ServiceKey::all_values() + .into_iter() + .filter(|service| !self.config.disable.contains(&service)) + .collect::>(); + for service in services_to_deploy { + self.deploy_system_service(&service).await?; } Ok(()) } fn deploy_aqua_ipfs(&self) -> Result<(), JError> { let aqua_ipfs_distro = Self::get_ipfs_service_distro(); - let service_id = match self.deploy_system_service(aqua_ipfs_distro)? { - ServiceStatus::Existing(id) => { - log::info!("found `aqua-ipfs` [{id}] service, no need to redeploy"); + let service_name = aqua_ipfs_distro.name.clone(); + let service_id = match self.deploy_service_common(aqua_ipfs_distro)? { + ServiceStatus::Existing(_id) => { return Ok(()); } - ServiceStatus::Created(id) => { - log::info!("deployed `aqua-ipfs` [{id}] service"); - id - } + ServiceStatus::Created(id) => id, }; - self.call_service( - "aqua-ipfs", + let set_local_result = self.call_service( + &service_name, "set_local_api_multiaddr", vec![json!(self.config.aqua_ipfs.local_api_multiaddr)], - )?; + ); - self.call_service( - "aqua-ipfs", + let set_external_result = self.call_service( + &service_name, "set_external_api_multiaddr", vec![json!(self.config.aqua_ipfs.external_api_multiaddr)], - )?; + ); + + // try to set local and external api multiaddrs, and only then produce an error + set_local_result?; + set_external_result?; + log::info!("initialized `aqua-ipfs` [{}] service", service_id); Ok(()) @@ -153,77 +147,43 @@ impl Deployer { fn deploy_connector(&self) -> Result<(), JError> { let connector_distro = Self::get_connector_distro(); - let deployed = self.deploy_system_service(connector_distro)?; - match deployed { - ServiceStatus::Existing(id) => { - log::info!("found `fluence_aurora_connector` [{id}] service, no need to redeploy"); - } - ServiceStatus::Created(id) => { - log::info!("deployed `fluence_aurora_connector` [{id}] service"); - } - }; + let _deployed = self.deploy_service_common(connector_distro)?; Ok(()) } async fn deploy_decider(&self) -> Result<(), JError> { let decider_distro = Self::get_decider_distro(self.config.decider.clone()); - let deployed = self.deploy_system_spell(decider_distro).await?; - match deployed { - ServiceStatus::Existing(id) => { - log::info!("found `decider` [{id}] spell, no need to redeploy"); - } - ServiceStatus::Created(id) => { - log::info!("deployed `decider` [{id}] spell"); - } - }; + let _deployed = self.deploy_system_spell(decider_distro).await?; Ok(()) } async fn deploy_registry(&self) -> Result<(), JError> { let (registry_distro, registry_spell_distros) = Self::get_registry_distro(); - let deployed = self - .deploy_system_service(registry_distro) + let _deployed = self + .deploy_service_common(registry_distro) .map_err(|e| JError::new(e.to_string()))?; - match deployed { - ServiceStatus::Existing(id) => { - log::info!("found `registry` [{id}] service, no need to redeploy",); - } - ServiceStatus::Created(id) => { - log::info!("deployed `registry` [{id}] service"); - } - }; for spell_distro in registry_spell_distros { - let spell_name = spell_distro.name.to_string(); - let deployed = self.deploy_system_spell(spell_distro).await?; - match deployed { - ServiceStatus::Existing(id) => { - log::info!("found `{spell_name}` [{id}] spell, no need to redeploy"); - } - ServiceStatus::Created(id) => { - log::info!("deployed `{spell_name}` [{id}] spell"); - } - }; + let _deployed = self.deploy_system_spell(spell_distro).await?; } Ok(()) } fn deploy_trust_graph(&self) -> Result<(), JError> { let service_distro = Self::get_trust_graph_distro(); - let service_id = match self.deploy_system_service(service_distro)? { - ServiceStatus::Existing(id) => { - log::info!("found `trust-graph` [{id}] service, no need to redeploy",); + let service_name = service_distro.name.clone(); + let service_id = match self.deploy_service_common(service_distro)? { + ServiceStatus::Existing(_id) => { return Ok(()); } ServiceStatus::Created(id) => id, }; - log::info!("deployed `trust-graph` [{service_id}] service"); let certs = &trust_graph_distro::KRAS_CERTS; self.call_service( - "trust-graph", + &service_name, "set_root", vec![json!(certs.root_node), json!(certs.max_chain_length)], )?; @@ -231,12 +191,12 @@ impl Deployer { let timestamp = now_millis::now_sec(); for cert_chain in &certs.certs { self.call_service( - "trust-graph", + &service_name, "insert_cert", vec![json!(cert_chain), json!(timestamp)], )?; } - log::info!("initialized `trust-graph` [{service_id}] service"); + log::info!("initialized `{service_name}` [{service_id}] service"); Ok(()) } @@ -246,7 +206,7 @@ impl Deployer { ServiceDistro { modules: modules(), config: CONFIG, - name: "trust-graph", + name: TrustGraph.to_string(), } } @@ -256,7 +216,7 @@ impl Deployer { let distro = ServiceDistro { modules: modules(), config: CONFIG, - name: "registry", + name: Registry.to_string(), }; let spells_distro = scripts() .into_iter() @@ -265,7 +225,7 @@ impl Deployer { trigger_config.clock.start_sec = 1; trigger_config.clock.period_sec = script.period_sec; SpellDistro { - name: script.name, + name: script.name.to_string(), air: script.air, kv: HashMap::new(), trigger_config, @@ -280,7 +240,7 @@ impl Deployer { ServiceDistro { modules: modules(), config: CONFIG, - name: "aqua-ipfs", + name: AquaIpfs.to_string(), } } @@ -289,7 +249,7 @@ impl Deployer { ServiceDistro { modules: connector_service_distro.modules, config: connector_service_distro.config, - name: connector_service_distro.name, + name: connector_service_distro.name.to_string(), } } @@ -306,7 +266,7 @@ impl Deployer { decider_trigger_config.clock.start_sec = 1; decider_trigger_config.clock.period_sec = decider_settings.decider_period_sec; SpellDistro { - name: "decider", + name: Decider.to_string(), air: decider_spell_distro.air, kv: decider_spell_distro.kv, trigger_config: decider_trigger_config, @@ -370,17 +330,20 @@ impl Deployer { &self, spell_distro: SpellDistro, ) -> Result { + let spell_name = spell_distro.name.clone(); match self.find_same_spell(&spell_distro) { ServiceUpdateStatus::NeedUpdate(spell_id) => { - self.clean_old_spell(spell_distro.name, spell_id).await; + log::info!("found existing spell `{spell_name}` [{spell_id}]; will redeploy",); + self.clean_old_spell(&spell_name, spell_id).await; } ServiceUpdateStatus::NoUpdate(spell_id) => { + log::info!("found existing spell `{spell_name}` [{spell_id}]; no need to redeploy",); return Ok(ServiceStatus::Existing(spell_id)); } ServiceUpdateStatus::NotFound => {} } - let spell_id = self.deploy_new_system_spell(spell_distro).await?; - + let spell_id = self.deploy_spell_common(spell_distro).await?; + log::info!("deployed a system spell `{spell_name}` [{spell_id}]",); Ok(ServiceStatus::Created(spell_id)) } @@ -402,12 +365,11 @@ impl Deployer { let empty_config = TriggerConfig::default(); // Stop old spell let result: Result<_, JError> = try { - // Unsubscribe spell from execution - self.spell_event_bus_api - .unsubscribe(spell_id.clone()) - .await?; // Stop the spell to avoid re-subscription - self.call_service(&spell_id, "set_trigger_config", vec![json!(empty_config)]) + self.call_service(&spell_id, "set_trigger_config", vec![json!(empty_config)])?; + + // Unsubscribe spell from execution + self.spell_event_bus_api.unsubscribe(spell_id.clone()).await }; if let Err(err) = result { log::error!( @@ -417,7 +379,7 @@ impl Deployer { } } - async fn deploy_new_system_spell(&self, spell_distro: SpellDistro) -> Result { + async fn deploy_spell_common(&self, spell_distro: SpellDistro) -> Result { let spell_id = install_spell( &self.services, &self.spell_storage, @@ -508,17 +470,18 @@ impl Deployer { ServiceUpdateStatus::NoUpdate(spell.id) } - fn deploy_system_service( + fn deploy_service_common( &self, service_distro: ServiceDistro, ) -> Result { - let service_name = service_distro.name; + let service_name = service_distro.name.clone(); let blueprint_id = self .add_modules(service_distro) .map_err(|e| JError::new(e.to_string()))?; match self.find_same_service(service_name.to_string(), &blueprint_id) { ServiceUpdateStatus::NeedUpdate(service_id) => { + log::info!("found existing service {service_name} [{service_id}]; will redeploy"); let result = self.services.remove_service( DEPLOYER_PARTICLE_ID, self.root_worker_id, @@ -533,6 +496,9 @@ impl Deployer { } } ServiceUpdateStatus::NoUpdate(service_id) => { + log::info!( + "found existing service {service_name} [{service_id}]; no need to redeploy" + ); return Ok(ServiceStatus::Existing(service_id)); } ServiceUpdateStatus::NotFound => {} @@ -550,6 +516,7 @@ impl Deployer { service_id.clone(), self.management_id, )?; + log::info!("deployed service {service_name} [{service_id}]"); Ok(ServiceStatus::Created(service_id)) } @@ -562,7 +529,7 @@ impl Deployer { if let Ok(service) = existing_service { if service.service_type == ServiceType::Spell { log::warn!( - "alias `{}` already used for a spell [{}]; it will be used for a service, the spell won't be removed", + "alias `{}` already used for a spell [{}]; it will be used for a new service, the spell won't be removed", service_name, service.id );