Skip to content

Commit

Permalink
feat: add service type to distinguish services and spells (#1567)
Browse files Browse the repository at this point in the history
* set service type on service creation
* use service-type in spell storage
  • Loading branch information
kmd-fl committed Apr 14, 2023
1 parent a62e8a7 commit d80f89b
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 36 deletions.
11 changes: 7 additions & 4 deletions particle-builtins/src/builtins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use particle_modules::{
AddBlueprint, ModuleConfig, ModuleRepository, NamedModuleConfig, WASIConfig,
};
use particle_protocol::Contact;
use particle_services::{ParticleAppServices, VIRTUAL_PARTICLE_VAULT_PREFIX};
use particle_services::{ParticleAppServices, ServiceType, VIRTUAL_PARTICLE_VAULT_PREFIX};
use peer_metrics::ServicesMetrics;
use script_storage::ScriptStorageApi;
use server_config::ServicesConfig;
Expand Down Expand Up @@ -819,9 +819,12 @@ where
let mut args = args.function_args.into_iter();
let blueprint_id: String = Args::next("blueprint_id", &mut args)?;

let service_id =
self.services
.create_service(blueprint_id, params.init_peer_id, params.host_id)?;
let service_id = self.services.create_service(
ServiceType::Service,
blueprint_id,
params.init_peer_id,
params.host_id,
)?;

Ok(JValue::String(service_id))
}
Expand Down
43 changes: 30 additions & 13 deletions particle-services/src/app_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use fluence_app_service::{
};
use humantime_serde::re::humantime::format_duration as pretty;
use parking_lot::{Mutex, RwLock};
use serde::Serialize;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value as JValue};

use fluence_libp2p::{peerid_serializer, PeerId};
Expand Down Expand Up @@ -52,14 +52,15 @@ type ServiceAlias = String;
type Services = HashMap<ServiceId, Service>;
type Aliases = HashMap<PeerId, HashMap<ServiceAlias, ServiceId>>;

#[derive(Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
pub enum ServiceType {
Service,
Spell,
}

impl ServiceType {
fn is_spell(&self) -> bool {
pub fn is_spell(&self) -> bool {
matches!(self, ServiceType::Spell)
}
}
Expand All @@ -68,6 +69,7 @@ impl ServiceType {
pub struct ServiceInfo {
pub id: String,
pub blueprint_id: String,
pub service_type: ServiceType,
#[serde(with = "peerid_serializer")]
pub owner_id: PeerId,
pub aliases: Vec<ServiceAlias>,
Expand All @@ -82,7 +84,6 @@ pub struct Service {
pub service: Mutex<AppService>,
pub service_id: String,
pub blueprint_id: String,
// temp hack to detect if the service is a spell
pub service_type: ServiceType,
pub owner_id: PeerId,
pub aliases: Vec<ServiceAlias>,
Expand All @@ -109,6 +110,7 @@ impl Service {
worker_id,
}
}

pub fn persist(&self, services_dir: &Path) -> Result<(), ServiceError> {
persist_service(services_dir, PersistedService::from_service(self))
}
Expand All @@ -127,6 +129,7 @@ impl Service {
ServiceInfo {
id: id.to_string(),
blueprint_id: self.blueprint_id.clone(),
service_type: self.service_type.clone(),
owner_id: self.owner_id,
aliases: self.aliases.clone(),
worker_id: self.worker_id,
Expand Down Expand Up @@ -257,12 +260,14 @@ impl ParticleAppServices {

pub fn create_service(
&self,
service_type: ServiceType,
blueprint_id: String,
owner_id: PeerId,
worker_id: PeerId,
) -> Result<String, ServiceError> {
let service_id = uuid::Uuid::new_v4().to_string();
self.create_service_inner(
service_type,
blueprint_id,
owner_id,
worker_id,
Expand Down Expand Up @@ -775,7 +780,25 @@ impl ParticleAppServices {
for s in services {
let worker_id = s.worker_id.expect("every service must have worker id");
let start = Instant::now();
// If the service_type doesn't set in PersitedService, will try to find out if it's a spell by blueprint name
// This is mostly done for migration from the old detection method to the new.
let service_type = s.service_type.unwrap_or_else(|| {
let is_spell: Option<_> = try {
let blueprint_name = self
.modules
.get_blueprint_from_cache(&s.blueprint_id)
.ok()?
.name;
blueprint_name == "spell"
};
if is_spell.unwrap_or(false) {
ServiceType::Spell
} else {
ServiceType::Service
}
});
let result = self.create_service_inner(
service_type,
s.blueprint_id,
s.owner_id,
worker_id,
Expand Down Expand Up @@ -821,6 +844,7 @@ impl ParticleAppServices {

fn create_service_inner(
&self,
service_type: ServiceType,
blueprint_id: String,
owner_id: PeerId,
worker_id: PeerId,
Expand All @@ -843,14 +867,6 @@ impl ParticleAppServices {
let stats = service.module_memory_stats();
let stats = ServiceMemoryStat::new(&stats);

// Would be nice to determine that we create a spell service, but we don't have a reliable way to detect it yet
// so for now temp hack
let blueprint_name = self.modules.get_blueprint_from_cache(&blueprint_id)?.name;
let service_type = if blueprint_name == "spell" {
ServiceType::Spell
} else {
ServiceType::Service
};
let service = Service::new(
Mutex::new(service),
service_id.clone(),
Expand Down Expand Up @@ -923,6 +939,7 @@ mod tests {
use service_modules::load_module;
use service_modules::{Dependency, Hash};

use crate::app_services::ServiceType;
use crate::persistence::load_persisted_services;
use crate::{ParticleAppServices, ServiceError};

Expand Down Expand Up @@ -1001,7 +1018,7 @@ mod tests {
.add_blueprint(AddBlueprint::new(module_name, vec![dep]))
.unwrap();

pas.create_service(bp, RandomPeerId::random(), worker_id)
pas.create_service(ServiceType::Service, bp, RandomPeerId::random(), worker_id)
.map_err(|e| e.to_string())
}

Expand Down
1 change: 1 addition & 0 deletions particle-services/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
pub use fluence_app_service::{IType, IValue};

pub use app_services::ParticleAppServices;
pub use app_services::ServiceType;

pub use crate::error::ServiceError;

Expand Down
3 changes: 3 additions & 0 deletions particle-services/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ use fs_utils::{create_dir, list_files};
use service_modules::{is_service, service_file_name};

use crate::ServiceError::{SerializePersistedService, WritePersistedService};
use crate::ServiceType;
use serde::{Deserialize, Serialize};
use std::path::Path;

// TODO: all fields could be references, but I don't know how to achieve that
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PersistedService {
pub service_id: String,
pub service_type: Option<ServiceType>,
pub blueprint_id: String,
#[serde(default)]
// Old versions of PersistedService may omit `aliases` field, tolerate that
Expand All @@ -50,6 +52,7 @@ impl PersistedService {
pub fn from_service(service: &Service) -> Self {
PersistedService {
service_id: service.service_id.clone(),
service_type: Some(service.service_type.clone()),
blueprint_id: service.blueprint_id.clone(),
aliases: service.aliases.clone(),
owner_id: service.owner_id,
Expand Down
9 changes: 7 additions & 2 deletions sorcerer/src/spells.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use key_manager::KeyManager;
use libp2p::PeerId;
use particle_args::{Args, JError};
use particle_execution::ParticleParams;
use particle_services::ParticleAppServices;
use particle_services::{ParticleAppServices, ServiceType};
use spell_event_bus::api::EventBusError;
use spell_event_bus::{api, api::SpellEventBusApi};
use spell_storage::SpellStorage;
Expand Down Expand Up @@ -78,7 +78,12 @@ pub(crate) async fn spell_install(
return Err(JError::new(format!("Failed to install spell on {worker_id}, spell can be installed by worker creator {worker_creator}, worker itself {worker_id} or peer manager; init_peer_id={init_peer_id}")));
}

let spell_id = services.create_service(spell_storage.get_blueprint(), worker_id, worker_id)?;
let spell_id = services.create_service(
ServiceType::Spell,
spell_storage.get_blueprint(),
worker_id,
worker_id,
)?;
spell_storage.register_spell(worker_id, spell_id.clone());

// TODO: refactor these service calls
Expand Down
22 changes: 5 additions & 17 deletions spell-storage/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;

Expand All @@ -20,7 +20,7 @@ type SpellId = String;
#[derive(Derivative)]
#[derivative(Debug, Clone)]
pub struct SpellStorage {
// The blueprint for the latest spell service.
// The blueprint for the latest spell service. It's used to create new spells
spell_blueprint_id: String,
// All currently existing spells
registered_spells: Arc<RwLock<HashMap<WorkerId, Vec<SpellId>>>>,
Expand All @@ -39,7 +39,7 @@ impl SpellStorage {
} else {
Self::load_spell_service_from_crate(modules)?
};
let registered_spells = Self::restore_spells(services, modules);
let registered_spells = Self::restore_spells(services);

Ok((
Self {
Expand Down Expand Up @@ -108,23 +108,11 @@ impl SpellStorage {
))
}

fn restore_spells(
services: &ParticleAppServices,
modules: &ModuleRepository,
) -> HashMap<WorkerId, Vec<SpellId>> {
// Find blueprint ids of the already existing spells. They might be of older versions of the spell service.
// These blueprint ids marked with name "spell" to differ from other blueprints.
let all_spell_blueprint_ids = modules
.get_blueprints()
.into_iter()
.filter(|blueprint| blueprint.name == "spell")
.map(|x| x.id)
.collect::<HashSet<_>>();
// Find already created spells by corresponding blueprint_ids.
fn restore_spells(services: &ParticleAppServices) -> HashMap<WorkerId, Vec<SpellId>> {
services
.list_services_with_info()
.into_iter()
.filter(|s| all_spell_blueprint_ids.contains(&s.blueprint_id))
.filter(|s| s.service_type.is_spell())
.map(|s| (s.worker_id, s.id))
.into_group_map()
}
Expand Down

0 comments on commit d80f89b

Please sign in to comment.