Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add service type to distinguish services and spells #1567

Merged
merged 5 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions particle-builtins/src/builtins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use particle_modules::{
AddBlueprint, ModuleConfig, ModuleRepository, NamedModuleConfig, WASIConfig,
};
use particle_protocol::Contact;
use particle_services::{ParticleAppServices, VIRTUAL_PARTICLE_VAULT_PREFIX};
use particle_services::{ParticleAppServices, ServiceType, VIRTUAL_PARTICLE_VAULT_PREFIX};
use peer_metrics::ServicesMetrics;
use script_storage::ScriptStorageApi;
use server_config::ServicesConfig;
Expand Down Expand Up @@ -819,9 +819,12 @@ where
let mut args = args.function_args.into_iter();
let blueprint_id: String = Args::next("blueprint_id", &mut args)?;

let service_id =
self.services
.create_service(blueprint_id, params.init_peer_id, params.host_id)?;
let service_id = self.services.create_service(
ServiceType::Service,
blueprint_id,
params.init_peer_id,
params.host_id,
)?;

Ok(JValue::String(service_id))
}
Expand Down
43 changes: 30 additions & 13 deletions particle-services/src/app_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use fluence_app_service::{
};
use humantime_serde::re::humantime::format_duration as pretty;
use parking_lot::{Mutex, RwLock};
use serde::Serialize;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value as JValue};

use fluence_libp2p::{peerid_serializer, PeerId};
Expand Down Expand Up @@ -52,14 +52,15 @@ type ServiceAlias = String;
type Services = HashMap<ServiceId, Service>;
type Aliases = HashMap<PeerId, HashMap<ServiceAlias, ServiceId>>;

#[derive(Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
pub enum ServiceType {
Service,
Spell,
}

impl ServiceType {
fn is_spell(&self) -> bool {
pub fn is_spell(&self) -> bool {
matches!(self, ServiceType::Spell)
}
}
Expand All @@ -68,6 +69,7 @@ impl ServiceType {
pub struct ServiceInfo {
pub id: String,
pub blueprint_id: String,
pub service_type: ServiceType,
#[serde(with = "peerid_serializer")]
pub owner_id: PeerId,
pub aliases: Vec<ServiceAlias>,
Expand All @@ -82,7 +84,6 @@ pub struct Service {
pub service: Mutex<AppService>,
pub service_id: String,
pub blueprint_id: String,
// temp hack to detect if the service is a spell
pub service_type: ServiceType,
pub owner_id: PeerId,
pub aliases: Vec<ServiceAlias>,
Expand All @@ -109,6 +110,7 @@ impl Service {
worker_id,
}
}

pub fn persist(&self, services_dir: &Path) -> Result<(), ServiceError> {
persist_service(services_dir, PersistedService::from_service(self))
}
Expand All @@ -127,6 +129,7 @@ impl Service {
ServiceInfo {
id: id.to_string(),
blueprint_id: self.blueprint_id.clone(),
service_type: self.service_type.clone(),
owner_id: self.owner_id,
aliases: self.aliases.clone(),
worker_id: self.worker_id,
Expand Down Expand Up @@ -257,12 +260,14 @@ impl ParticleAppServices {

pub fn create_service(
&self,
service_type: ServiceType,
blueprint_id: String,
owner_id: PeerId,
worker_id: PeerId,
) -> Result<String, ServiceError> {
let service_id = uuid::Uuid::new_v4().to_string();
self.create_service_inner(
service_type,
blueprint_id,
owner_id,
worker_id,
Expand Down Expand Up @@ -775,7 +780,25 @@ impl ParticleAppServices {
for s in services {
let worker_id = s.worker_id.expect("every service must have worker id");
let start = Instant::now();
// If the service_type doesn't set in PersitedService, will try to find out if it's a spell by blueprint name
// This is mostly done for migration from the old detection method to the new.
let service_type = s.service_type.unwrap_or_else(|| {
let is_spell: Option<_> = try {
let blueprint_name = self
.modules
.get_blueprint_from_cache(&s.blueprint_id)
.ok()?
.name;
blueprint_name == "spell"
};
if is_spell.unwrap_or(false) {
ServiceType::Spell
} else {
ServiceType::Service
}
});
let result = self.create_service_inner(
service_type,
s.blueprint_id,
s.owner_id,
worker_id,
Expand Down Expand Up @@ -821,6 +844,7 @@ impl ParticleAppServices {

fn create_service_inner(
&self,
service_type: ServiceType,
blueprint_id: String,
owner_id: PeerId,
worker_id: PeerId,
Expand All @@ -843,14 +867,6 @@ impl ParticleAppServices {
let stats = service.module_memory_stats();
let stats = ServiceMemoryStat::new(&stats);

// Would be nice to determine that we create a spell service, but we don't have a reliable way to detect it yet
// so for now temp hack
let blueprint_name = self.modules.get_blueprint_from_cache(&blueprint_id)?.name;
let service_type = if blueprint_name == "spell" {
ServiceType::Spell
} else {
ServiceType::Service
};
let service = Service::new(
Mutex::new(service),
service_id.clone(),
Expand Down Expand Up @@ -923,6 +939,7 @@ mod tests {
use service_modules::load_module;
use service_modules::{Dependency, Hash};

use crate::app_services::ServiceType;
use crate::persistence::load_persisted_services;
use crate::{ParticleAppServices, ServiceError};

Expand Down Expand Up @@ -1001,7 +1018,7 @@ mod tests {
.add_blueprint(AddBlueprint::new(module_name, vec![dep]))
.unwrap();

pas.create_service(bp, RandomPeerId::random(), worker_id)
pas.create_service(ServiceType::Service, bp, RandomPeerId::random(), worker_id)
.map_err(|e| e.to_string())
}

Expand Down
1 change: 1 addition & 0 deletions particle-services/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
pub use fluence_app_service::{IType, IValue};

pub use app_services::ParticleAppServices;
pub use app_services::ServiceType;

pub use crate::error::ServiceError;

Expand Down
3 changes: 3 additions & 0 deletions particle-services/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ use fs_utils::{create_dir, list_files};
use service_modules::{is_service, service_file_name};

use crate::ServiceError::{SerializePersistedService, WritePersistedService};
use crate::ServiceType;
use serde::{Deserialize, Serialize};
use std::path::Path;

// TODO: all fields could be references, but I don't know how to achieve that
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PersistedService {
pub service_id: String,
pub service_type: Option<ServiceType>,
pub blueprint_id: String,
#[serde(default)]
// Old versions of PersistedService may omit `aliases` field, tolerate that
Expand All @@ -50,6 +52,7 @@ impl PersistedService {
pub fn from_service(service: &Service) -> Self {
PersistedService {
service_id: service.service_id.clone(),
service_type: Some(service.service_type.clone()),
blueprint_id: service.blueprint_id.clone(),
aliases: service.aliases.clone(),
owner_id: service.owner_id,
Expand Down
9 changes: 7 additions & 2 deletions sorcerer/src/spells.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use key_manager::KeyManager;
use libp2p::PeerId;
use particle_args::{Args, JError};
use particle_execution::ParticleParams;
use particle_services::ParticleAppServices;
use particle_services::{ParticleAppServices, ServiceType};
use spell_event_bus::api::EventBusError;
use spell_event_bus::{api, api::SpellEventBusApi};
use spell_storage::SpellStorage;
Expand Down Expand Up @@ -78,7 +78,12 @@ pub(crate) async fn spell_install(
return Err(JError::new(format!("Failed to install spell on {worker_id}, spell can be installed by worker creator {worker_creator}, worker itself {worker_id} or peer manager; init_peer_id={init_peer_id}")));
}

let spell_id = services.create_service(spell_storage.get_blueprint(), worker_id, worker_id)?;
let spell_id = services.create_service(
ServiceType::Spell,
spell_storage.get_blueprint(),
worker_id,
worker_id,
)?;
spell_storage.register_spell(worker_id, spell_id.clone());

// TODO: refactor these service calls
Expand Down
20 changes: 4 additions & 16 deletions spell-storage/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type SpellId = String;
#[derive(Derivative)]
#[derivative(Debug, Clone)]
pub struct SpellStorage {
// The blueprint for the latest spell service.
// The blueprint for the latest spell service. It's used to create new spells
spell_blueprint_id: String,
// All currently existing spells
registered_spells: Arc<RwLock<HashMap<WorkerId, Vec<SpellId>>>>,
Expand All @@ -39,7 +39,7 @@ impl SpellStorage {
} else {
Self::load_spell_service_from_crate(modules)?
};
let registered_spells = Self::restore_spells(services, modules);
let registered_spells = Self::restore_spells(services);

Ok((
Self {
Expand Down Expand Up @@ -108,23 +108,11 @@ impl SpellStorage {
))
}

fn restore_spells(
services: &ParticleAppServices,
modules: &ModuleRepository,
) -> HashMap<WorkerId, Vec<SpellId>> {
// Find blueprint ids of the already existing spells. They might be of older versions of the spell service.
// These blueprint ids marked with name "spell" to differ from other blueprints.
let all_spell_blueprint_ids = modules
.get_blueprints()
.into_iter()
.filter(|blueprint| blueprint.name == "spell")
.map(|x| x.id)
.collect::<HashSet<_>>();
// Find already created spells by corresponding blueprint_ids.
fn restore_spells(services: &ParticleAppServices) -> HashMap<WorkerId, Vec<SpellId>> {
services
.list_services_with_info()
.into_iter()
.filter(|s| all_spell_blueprint_ids.contains(&s.blueprint_id))
.filter(|s| s.service_type.is_spell())
.map(|s| (s.worker_id, s.id))
.into_group_map()
}
Expand Down