diff --git a/particle-builtins/src/builtins.rs b/particle-builtins/src/builtins.rs index 0a63a81dc2..dca20c8faa 100644 --- a/particle-builtins/src/builtins.rs +++ b/particle-builtins/src/builtins.rs @@ -42,7 +42,7 @@ use particle_modules::{ AddBlueprint, ModuleConfig, ModuleRepository, NamedModuleConfig, WASIConfig, }; use particle_protocol::Contact; -use particle_services::{ParticleAppServices, VIRTUAL_PARTICLE_VAULT_PREFIX}; +use particle_services::{ParticleAppServices, ServiceType, VIRTUAL_PARTICLE_VAULT_PREFIX}; use peer_metrics::ServicesMetrics; use script_storage::ScriptStorageApi; use server_config::ServicesConfig; @@ -819,9 +819,12 @@ where let mut args = args.function_args.into_iter(); let blueprint_id: String = Args::next("blueprint_id", &mut args)?; - let service_id = - self.services - .create_service(blueprint_id, params.init_peer_id, params.host_id)?; + let service_id = self.services.create_service( + ServiceType::Service, + blueprint_id, + params.init_peer_id, + params.host_id, + )?; Ok(JValue::String(service_id)) } diff --git a/particle-services/src/app_services.rs b/particle-services/src/app_services.rs index 1f156679ce..b838e21b5b 100644 --- a/particle-services/src/app_services.rs +++ b/particle-services/src/app_services.rs @@ -24,7 +24,7 @@ use fluence_app_service::{ }; use humantime_serde::re::humantime::format_duration as pretty; use parking_lot::{Mutex, RwLock}; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use serde_json::{json, Value as JValue}; use fluence_libp2p::{peerid_serializer, PeerId}; @@ -52,14 +52,15 @@ type ServiceAlias = String; type Services = HashMap; type Aliases = HashMap>; -#[derive(Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "lowercase")] pub enum ServiceType { Service, Spell, } impl ServiceType { - fn is_spell(&self) -> bool { + pub fn is_spell(&self) -> bool { matches!(self, ServiceType::Spell) } } @@ -68,6 +69,7 @@ impl ServiceType { pub struct ServiceInfo { pub id: String, pub blueprint_id: String, + pub service_type: ServiceType, #[serde(with = "peerid_serializer")] pub owner_id: PeerId, pub aliases: Vec, @@ -82,7 +84,6 @@ pub struct Service { pub service: Mutex, pub service_id: String, pub blueprint_id: String, - // temp hack to detect if the service is a spell pub service_type: ServiceType, pub owner_id: PeerId, pub aliases: Vec, @@ -109,6 +110,7 @@ impl Service { worker_id, } } + pub fn persist(&self, services_dir: &Path) -> Result<(), ServiceError> { persist_service(services_dir, PersistedService::from_service(self)) } @@ -127,6 +129,7 @@ impl Service { ServiceInfo { id: id.to_string(), blueprint_id: self.blueprint_id.clone(), + service_type: self.service_type.clone(), owner_id: self.owner_id, aliases: self.aliases.clone(), worker_id: self.worker_id, @@ -257,12 +260,14 @@ impl ParticleAppServices { pub fn create_service( &self, + service_type: ServiceType, blueprint_id: String, owner_id: PeerId, worker_id: PeerId, ) -> Result { let service_id = uuid::Uuid::new_v4().to_string(); self.create_service_inner( + service_type, blueprint_id, owner_id, worker_id, @@ -775,7 +780,25 @@ impl ParticleAppServices { for s in services { let worker_id = s.worker_id.expect("every service must have worker id"); let start = Instant::now(); + // If the service_type doesn't set in PersitedService, will try to find out if it's a spell by blueprint name + // This is mostly done for migration from the old detection method to the new. + let service_type = s.service_type.unwrap_or_else(|| { + let is_spell: Option<_> = try { + let blueprint_name = self + .modules + .get_blueprint_from_cache(&s.blueprint_id) + .ok()? + .name; + blueprint_name == "spell" + }; + if is_spell.unwrap_or(false) { + ServiceType::Spell + } else { + ServiceType::Service + } + }); let result = self.create_service_inner( + service_type, s.blueprint_id, s.owner_id, worker_id, @@ -821,6 +844,7 @@ impl ParticleAppServices { fn create_service_inner( &self, + service_type: ServiceType, blueprint_id: String, owner_id: PeerId, worker_id: PeerId, @@ -843,14 +867,6 @@ impl ParticleAppServices { let stats = service.module_memory_stats(); let stats = ServiceMemoryStat::new(&stats); - // Would be nice to determine that we create a spell service, but we don't have a reliable way to detect it yet - // so for now temp hack - let blueprint_name = self.modules.get_blueprint_from_cache(&blueprint_id)?.name; - let service_type = if blueprint_name == "spell" { - ServiceType::Spell - } else { - ServiceType::Service - }; let service = Service::new( Mutex::new(service), service_id.clone(), @@ -923,6 +939,7 @@ mod tests { use service_modules::load_module; use service_modules::{Dependency, Hash}; + use crate::app_services::ServiceType; use crate::persistence::load_persisted_services; use crate::{ParticleAppServices, ServiceError}; @@ -1001,7 +1018,7 @@ mod tests { .add_blueprint(AddBlueprint::new(module_name, vec![dep])) .unwrap(); - pas.create_service(bp, RandomPeerId::random(), worker_id) + pas.create_service(ServiceType::Service, bp, RandomPeerId::random(), worker_id) .map_err(|e| e.to_string()) } diff --git a/particle-services/src/lib.rs b/particle-services/src/lib.rs index ad9de7fad7..bac5de086e 100644 --- a/particle-services/src/lib.rs +++ b/particle-services/src/lib.rs @@ -30,6 +30,7 @@ pub use fluence_app_service::{IType, IValue}; pub use app_services::ParticleAppServices; +pub use app_services::ServiceType; pub use crate::error::ServiceError; diff --git a/particle-services/src/persistence.rs b/particle-services/src/persistence.rs index 5f3ad83a73..35899914ba 100644 --- a/particle-services/src/persistence.rs +++ b/particle-services/src/persistence.rs @@ -25,6 +25,7 @@ use fs_utils::{create_dir, list_files}; use service_modules::{is_service, service_file_name}; use crate::ServiceError::{SerializePersistedService, WritePersistedService}; +use crate::ServiceType; use serde::{Deserialize, Serialize}; use std::path::Path; @@ -32,6 +33,7 @@ use std::path::Path; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct PersistedService { pub service_id: String, + pub service_type: Option, pub blueprint_id: String, #[serde(default)] // Old versions of PersistedService may omit `aliases` field, tolerate that @@ -50,6 +52,7 @@ impl PersistedService { pub fn from_service(service: &Service) -> Self { PersistedService { service_id: service.service_id.clone(), + service_type: Some(service.service_type.clone()), blueprint_id: service.blueprint_id.clone(), aliases: service.aliases.clone(), owner_id: service.owner_id, diff --git a/sorcerer/src/spells.rs b/sorcerer/src/spells.rs index 8eb1ebce0f..257e7453ed 100644 --- a/sorcerer/src/spells.rs +++ b/sorcerer/src/spells.rs @@ -22,7 +22,7 @@ use key_manager::KeyManager; use libp2p::PeerId; use particle_args::{Args, JError}; use particle_execution::ParticleParams; -use particle_services::ParticleAppServices; +use particle_services::{ParticleAppServices, ServiceType}; use spell_event_bus::api::EventBusError; use spell_event_bus::{api, api::SpellEventBusApi}; use spell_storage::SpellStorage; @@ -78,7 +78,12 @@ pub(crate) async fn spell_install( return Err(JError::new(format!("Failed to install spell on {worker_id}, spell can be installed by worker creator {worker_creator}, worker itself {worker_id} or peer manager; init_peer_id={init_peer_id}"))); } - let spell_id = services.create_service(spell_storage.get_blueprint(), worker_id, worker_id)?; + let spell_id = services.create_service( + ServiceType::Spell, + spell_storage.get_blueprint(), + worker_id, + worker_id, + )?; spell_storage.register_spell(worker_id, spell_id.clone()); // TODO: refactor these service calls diff --git a/spell-storage/src/storage.rs b/spell-storage/src/storage.rs index 0b82a86097..57a50c8756 100644 --- a/spell-storage/src/storage.rs +++ b/spell-storage/src/storage.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -20,7 +20,7 @@ type SpellId = String; #[derive(Derivative)] #[derivative(Debug, Clone)] pub struct SpellStorage { - // The blueprint for the latest spell service. + // The blueprint for the latest spell service. It's used to create new spells spell_blueprint_id: String, // All currently existing spells registered_spells: Arc>>>, @@ -39,7 +39,7 @@ impl SpellStorage { } else { Self::load_spell_service_from_crate(modules)? }; - let registered_spells = Self::restore_spells(services, modules); + let registered_spells = Self::restore_spells(services); Ok(( Self { @@ -108,23 +108,11 @@ impl SpellStorage { )) } - fn restore_spells( - services: &ParticleAppServices, - modules: &ModuleRepository, - ) -> HashMap> { - // Find blueprint ids of the already existing spells. They might be of older versions of the spell service. - // These blueprint ids marked with name "spell" to differ from other blueprints. - let all_spell_blueprint_ids = modules - .get_blueprints() - .into_iter() - .filter(|blueprint| blueprint.name == "spell") - .map(|x| x.id) - .collect::>(); - // Find already created spells by corresponding blueprint_ids. + fn restore_spells(services: &ParticleAppServices) -> HashMap> { services .list_services_with_info() .into_iter() - .filter(|s| all_spell_blueprint_ids.contains(&s.blueprint_id)) + .filter(|s| s.service_type.is_spell()) .map(|s| (s.worker_id, s.id)) .into_group_map() }