From ff455be54132a5b08977d91ef5bf67e6264d2c80 Mon Sep 17 00:00:00 2001 From: Aleksey Proshutisnkiy Date: Fri, 31 Mar 2023 13:21:40 +0400 Subject: [PATCH] fix: bug with repeated alias for service [NET-434] (#1536) --- particle-modules/src/error.rs | 6 - particle-services/src/app_service.rs | 52 ++---- particle-services/src/app_services.rs | 254 +++++++++++++++++--------- particle-services/src/error.rs | 17 ++ particle-services/src/persistence.rs | 38 +--- 5 files changed, 210 insertions(+), 157 deletions(-) diff --git a/particle-modules/src/error.rs b/particle-modules/src/error.rs index 1602584479..03ca47ac9b 100644 --- a/particle-modules/src/error.rs +++ b/particle-modules/src/error.rs @@ -48,12 +48,6 @@ pub enum ModuleError { err: toml::ser::Error, blueprint: Blueprint, }, - #[error("Error serializing persisted service config to toml: {err} {config:?}")] - SerializePersistedService { - #[source] - err: toml::ser::Error, - config: Box, - }, #[error("Error saving config to {path:?}: {err}")] WriteConfig { path: PathBuf, diff --git a/particle-services/src/app_service.rs b/particle-services/src/app_service.rs index 376fc1dd1b..0b019659b3 100644 --- a/particle-services/src/app_service.rs +++ b/particle-services/src/app_service.rs @@ -15,13 +15,11 @@ */ use crate::error::ServiceError; -use crate::persistence::{persist_service, PersistedService}; use crate::{Result, VIRTUAL_PARTICLE_VAULT_PREFIX}; use fluence_app_service::{ AppService, AppServiceConfig, MarineConfig, MarineWASIConfig, ModuleDescriptor, }; -use fluence_libp2p::PeerId; use particle_modules::ModuleRepository; use peer_metrics::ServicesMetrics; use server_config::ServicesConfig; @@ -34,43 +32,29 @@ pub fn create_app_service( modules: &ModuleRepository, blueprint_id: String, service_id: String, - aliases: Vec, - owner_id: PeerId, - worker_id: PeerId, metrics: Option<&ServicesMetrics>, ) -> Result { - try { - let mut modules_config = modules.resolve_blueprint(&blueprint_id)?; - modules_config - .iter_mut() - .for_each(|module| inject_vault(&config.particles_vault_dir, module)); + let mut modules_config = modules.resolve_blueprint(&blueprint_id)?; + modules_config + .iter_mut() + .for_each(|module| inject_vault(&config.particles_vault_dir, module)); - if let Some(metrics) = metrics.as_ref() { - metrics.observe_service_config(config.max_heap_size.as_u64(), &modules_config); - } - - let modules = AppServiceConfig { - service_working_dir: config.workdir.join(&service_id), - service_base_dir: config.workdir, - marine_config: MarineConfig { - modules_dir: Some(config.modules_dir), - modules_config, - default_modules_config: None, - }, - }; - - log::debug!("Creating service {}, envs: {:?}", service_id, config.envs); - - let service = AppService::new(modules, service_id.clone(), config.envs) - .map_err(ServiceError::Engine)?; + if let Some(metrics) = metrics.as_ref() { + metrics.observe_service_config(config.max_heap_size.as_u64(), &modules_config); + } - // Save created service to disk, so it is recreated on restart - let persisted = - PersistedService::new(service_id, blueprint_id, aliases, owner_id, worker_id); - persist_service(&config.services_dir, persisted)?; + let modules = AppServiceConfig { + service_working_dir: config.workdir.join(&service_id), + service_base_dir: config.workdir, + marine_config: MarineConfig { + modules_dir: Some(config.modules_dir), + modules_config, + default_modules_config: None, + }, + }; - service - } + log::debug!("Creating service {}, envs: {:?}", service_id, config.envs); + AppService::new(modules, service_id, config.envs).map_err(ServiceError::Engine) } /// Map `vault_dir` to `/tmp/vault` inside the service. diff --git a/particle-services/src/app_services.rs b/particle-services/src/app_services.rs index 4cfa366edf..810e470710 100644 --- a/particle-services/src/app_services.rs +++ b/particle-services/src/app_services.rs @@ -14,6 +14,7 @@ * limitations under the License. */ use std::ops::Deref; +use std::path::Path; use std::time::{Duration, Instant}; use std::{collections::HashMap, sync::Arc}; @@ -43,6 +44,7 @@ use crate::error::ServiceError::{AliasAsServiceId, Forbidden, NoSuchAlias}; use crate::persistence::{ load_persisted_services, persist_service, remove_persisted_service, PersistedService, }; +use crate::ServiceError::{ForbiddenAliasRoot, ForbiddenAliasWorker, NoSuchService}; type ServiceId = String; type ServiceAlias = String; @@ -65,6 +67,7 @@ pub struct ServiceInfo { pub struct Service { #[derivative(Debug(format_with = "fmt_service"))] pub service: Mutex, + pub service_id: String, pub blueprint_id: String, pub owner_id: PeerId, pub aliases: Vec, @@ -72,8 +75,31 @@ pub struct Service { } impl Service { + pub fn new( + service: Mutex, + service_id: String, + blueprint_id: String, + owner_id: PeerId, + aliases: Vec, + worker_id: PeerId, + ) -> Self { + Self { + service, + service_id, + blueprint_id, + owner_id, + aliases, + worker_id, + } + } + pub fn persist(&self, services_dir: &Path) -> Result<(), ServiceError> { + persist_service(services_dir, PersistedService::from_service(self)) + } + pub fn remove_alias(&mut self, alias: &str) { - self.aliases.retain(|a| a.ne(alias)); + if let Some(pos) = self.aliases.iter().position(|x| *x == alias) { + self.aliases.remove(pos); + } } pub fn add_alias(&mut self, alias: String) { @@ -151,7 +177,7 @@ pub fn get_service<'l>( worker_id: PeerId, local_peer_id: PeerId, id_or_alias: String, -) -> Result<(&'l Service, String), String> { +) -> Result<(&'l Service, String), ServiceError> { // retrieve service by service id if let Some(service) = services.get(&id_or_alias) { return Ok((service, id_or_alias)); @@ -164,7 +190,27 @@ pub fn get_service<'l>( (service, resolved_id.clone()) }; - by_alias.ok_or(id_or_alias) + by_alias.ok_or(NoSuchService(id_or_alias)) +} + +fn get_service_mut<'l>( + services: &'l mut Services, + worker_id: PeerId, + service_id: &str, +) -> Result<&'l mut Service, ServiceError> { + let service = services + .get_mut(service_id) + .ok_or(NoSuchService(service_id.to_string()))?; + + if service.worker_id != worker_id { + // service is deployed on another worker_id + Err(ServiceError::AliasWrongWorkerId { + service_id: service_id.to_string(), + worker_id: service.worker_id, + }) + } else { + Ok(service) + } } impl ParticleAppServices { @@ -209,6 +255,10 @@ impl ParticleAppServices { Ok(service_id) } + pub fn service_exists(&self, service_id: &str) -> bool { + self.services.read().get(service_id).is_some() + } + pub fn get_service_info( &self, worker_id: PeerId, @@ -221,8 +271,7 @@ impl ParticleAppServices { worker_id, self.config.local_peer_id, service_id_or_alias, - ) - .map_err(ServiceError::NoSuchService)?; + )?; Ok(json!(service.get_info(&service_id))) } @@ -272,8 +321,7 @@ impl ParticleAppServices { worker_id, self.config.local_peer_id, service_id_or_alias, - ) - .map_err(ServiceError::NoSuchService)?; + )?; // tmp hack to forbid spell removal via srv.remove let blueprint_name = self @@ -341,11 +389,7 @@ impl ParticleAppServices { Ok(()) } - pub fn call_service( - &self, - mut function_args: Args, - particle: ParticleParams, - ) -> FunctionOutcome { + pub fn call_service(&self, function_args: Args, particle: ParticleParams) -> FunctionOutcome { let call_time_start = Instant::now(); let services = self.services.read(); let aliases = self.aliases.read(); @@ -357,15 +401,13 @@ impl ParticleAppServices { &aliases, worker_id, self.config.local_peer_id, - function_args.service_id, + function_args.service_id.clone(), ); let (service, service_id) = match service { Ok(found) => found, // If service is not found, report it - Err(service_id) => { - // move field back - function_args.service_id = service_id; + Err(_err) => { return FunctionOutcome::NotDefined { args: function_args, params: particle, @@ -495,6 +537,38 @@ impl ParticleAppServices { self.call_service(args, particle) } + fn add_alias_inner( + &self, + alias: String, + worker_id: PeerId, + service_id: ServiceId, + ) -> Result<(), ServiceError> { + let mut services = self.services.write(); + let service = get_service_mut(&mut services, worker_id, &service_id)?; + service.add_alias(alias); + service.persist(&self.config.services_dir) + } + + fn get_service_id(&self, worker_id: PeerId, alias: &str) -> Option { + self.aliases + .read() + .get(&worker_id) + .and_then(|aliases| aliases.get(alias)) + .cloned() + } + + fn remove_alias( + &self, + alias: String, + worker_id: PeerId, + service_id: &str, + ) -> Result<(), ServiceError> { + let mut services = self.services.write(); + let service = get_service_mut(&mut services, worker_id, service_id)?; + service.remove_alias(&alias); + service.persist(&self.config.services_dir) + } + pub fn add_alias( &self, alias: String, @@ -505,77 +579,34 @@ impl ParticleAppServices { let is_management = init_peer_id == self.management_peer_id || init_peer_id == self.builtins_management_peer_id; let is_root_scope = worker_id == self.config.local_peer_id; + let is_worker = init_peer_id == worker_id; if is_root_scope && !is_management { - return Err(Forbidden { - user: init_peer_id, - function: "add_alias", - reason: "only management peer id can add top-level aliases", - }); - } else if init_peer_id != worker_id && !is_management { - return Err(Forbidden { - user: init_peer_id, - function: "add_alias", - reason: "only worker and management peer id can add worker-level aliases", - }); + return Err(ForbiddenAliasRoot(init_peer_id)); + } else if !is_worker && !is_management { + return Err(ForbiddenAliasWorker(init_peer_id)); } - // if a client trying to add an alias that equals some created service id - // return an error - if self.services.read().get(&alias).is_some() { + // alias can't be equal to any existent service id + if self.service_exists(&alias) { return Err(AliasAsServiceId(alias)); } - let mut services = self.services.write(); - - let service = services - .get_mut(&service_id) - .ok_or_else(|| ServiceError::NoSuchService(service_id.clone()))?; - - if service.worker_id != worker_id { - // service is deployed on another worker_id - return Err(ServiceError::AliasWrongWorkerId { - service_id, - worker_id: service.worker_id, - }); + if !self.service_exists(&service_id) { + return Err(NoSuchService(service_id)); } - service.add_alias(alias.clone()); - - let persisted_new = PersistedService::from_service(service_id.clone(), service); - - // Find a service with the same alias if any - let previous_owner_id: Option<_> = try { - self.aliases - .read() - .get(&service.worker_id)? - .get(&alias) - .cloned()? - }; - - // If there is such a service remove the alias from its list of aliases - let previous_owner = try { - let previous_owner_id = previous_owner_id?; - let previous_owner_service = services.get_mut(&previous_owner_id)?; - - previous_owner_service.remove_alias(&alias); - - PersistedService::from_service(previous_owner_id, previous_owner_service) - }; - - drop(services); - if let Some(previous_owner) = previous_owner { - // Save the updated aliases list of the previous owner of the alias on disk - persist_service(&self.config.services_dir, previous_owner)?; + let prev_srv_id = self.get_service_id(worker_id, &alias); + if let Some(srv_id) = prev_srv_id { + self.remove_alias(alias.clone(), worker_id, &srv_id)?; } - // Save the target service with the new alias on disk - persist_service(&self.config.services_dir, persisted_new)?; + self.add_alias_inner(alias.clone(), worker_id, service_id.clone())?; self.aliases .write() .entry(worker_id) .or_default() - .insert(alias, service_id.clone()); + .insert(alias, service_id); Ok(()) } @@ -603,8 +634,7 @@ impl ParticleAppServices { worker_id, self.config.local_peer_id, service_id_or_alias, - ) - .map_err(ServiceError::NoSuchService)?; + )?; Ok(service_id) } @@ -620,8 +650,7 @@ impl ParticleAppServices { worker_id, self.config.local_peer_id, id_or_alias, - ) - .map_err(ServiceError::NoSuchService)?; + )?; Ok(service.owner_id) } @@ -638,8 +667,7 @@ impl ParticleAppServices { worker_id, self.config.local_peer_id, id_or_alias.clone(), - ) - .map_err(ServiceError::NoSuchService)?; + )?; if service.worker_id != worker_id { Err(ServiceError::CallServiceFailedWrongWorker { @@ -663,8 +691,7 @@ impl ParticleAppServices { worker_id, self.config.local_peer_id, service_id, - ) - .map_err(ServiceError::NoSuchService)?; + )?; Ok(self.modules.get_facade_interface(&service.blueprint_id)?) } @@ -700,8 +727,7 @@ impl ParticleAppServices { worker_id, self.config.local_peer_id, service_id, - ) - .map_err(ServiceError::NoSuchService)?; + )?; let lock = service.service.lock(); let stats = lock.module_memory_stats(); @@ -793,9 +819,6 @@ impl ParticleAppServices { &self.modules, blueprint_id.clone(), service_id.clone(), - aliases.clone(), - owner_id, - worker_id, self.metrics.as_ref(), ) .inspect_err(|_| { @@ -808,13 +831,16 @@ impl ParticleAppServices { let stats = service.module_memory_stats(); let stats = ServiceMemoryStat::new(&stats); let service_type = ServiceType::Service(aliases.first().cloned()); - let service = Service { - service: Mutex::new(service), + let service = Service::new( + Mutex::new(service), + service_id.clone(), blueprint_id, owner_id, aliases, worker_id, - }; + ); + // Save created service to disk, so it is recreated on restart + service.persist(&self.config.services_dir)?; let replaced = self.services.write().insert(service_id.clone(), service); let creation_end_time = creation_start_time.elapsed().as_secs(); @@ -943,7 +969,7 @@ mod tests { assert!(resp.is_err()); assert!(matches!( resp.err().unwrap(), - ServiceError::Forbidden { .. } + ServiceError::ForbiddenAliasRoot { .. } )) } @@ -1130,6 +1156,56 @@ mod tests { assert_eq!(persisted_service_2.aliases, vec![alias.to_string()]); } + #[test] + fn test_add_alias_twice() { + let base_dir = TempDir::new("test4").unwrap(); + let local_pid = create_pid(); + let management_pid = create_pid(); + let pas = create_pas(local_pid, management_pid, base_dir.into_path()); + + let module_name = "tetra".to_string(); + let hash = upload_tetra_service(&pas, module_name.clone()); + + let service_id = create_service(&pas, module_name.clone(), &hash, local_pid).unwrap(); + + let alias = "alias"; + // add an alias to a service + pas.add_alias( + alias.to_string(), + local_pid, + service_id.clone(), + management_pid, + ) + .unwrap(); + // give the alias to service again + pas.add_alias( + alias.to_string(), + local_pid, + service_id.clone(), + management_pid, + ) + .unwrap(); + + let services = pas.services.read(); + let service = services.get(&service_id).unwrap(); + + // the service's alias list must contain only 1 alias + assert_eq!(service.aliases, vec![alias.to_string()]); + + let persisted_services: Vec<_> = + load_persisted_services(&pas.config.services_dir, local_pid) + .into_iter() + .collect::>() + .unwrap(); + let persisted_service = persisted_services + .iter() + .find(|s| s.service_id == service_id) + .unwrap(); + + // the persisted service's alias list must contain only one alias + assert_eq!(persisted_service.aliases, vec![alias.to_string()]); + } + #[test] fn test_persisted_service() { let base_dir = TempDir::new("test4").unwrap(); diff --git a/particle-services/src/error.rs b/particle-services/src/error.rs index d1f3fd7c82..bc2cb02e7c 100644 --- a/particle-services/src/error.rs +++ b/particle-services/src/error.rs @@ -14,6 +14,7 @@ * limitations under the License. */ +use std::fmt::Debug; use std::path::PathBuf; use fluence_app_service::AppServiceError; @@ -45,6 +46,10 @@ pub enum ServiceError { function: &'static str, reason: &'static str, }, + #[error("Forbidden. User id '{0}' cannot call function 'add_alias': only management peer id can add top-level aliases")] + ForbiddenAliasRoot(PeerId), + #[error("Forbidden. User id '{0}' cannot call function 'add_alias': only worker and management peer id can add worker-level aliases")] + ForbiddenAliasWorker(PeerId), #[error("Cannot add alias '{0}' because there is a service with that id")] AliasAsServiceId(String), #[error( @@ -89,6 +94,18 @@ pub enum ServiceError { }, #[error(transparent)] VaultError(#[from] VaultError), + #[error("Error serializing persisted service config to toml: {err} {config:?}")] + SerializePersistedService { + #[source] + err: toml::ser::Error, + config: Box, + }, + #[error("Error saving persisted service to {path:?}: {err}")] + WritePersistedService { + path: PathBuf, + #[source] + err: std::io::Error, + }, } impl From for ServiceError { diff --git a/particle-services/src/persistence.rs b/particle-services/src/persistence.rs index b08ad9cc13..5f3ad83a73 100644 --- a/particle-services/src/persistence.rs +++ b/particle-services/src/persistence.rs @@ -22,9 +22,9 @@ use crate::error::ServiceError::{ use fluence_libp2p::{peerid_serializer, peerid_serializer_opt, PeerId, RandomPeerId}; use fs_utils::{create_dir, list_files}; -use particle_modules::ModuleError; use service_modules::{is_service, service_file_name}; +use crate::ServiceError::{SerializePersistedService, WritePersistedService}; use serde::{Deserialize, Serialize}; use std::path::Path; @@ -47,46 +47,28 @@ pub struct PersistedService { } impl PersistedService { - pub fn new( - service_id: String, - blueprint_id: String, - aliases: Vec, - owner_id: PeerId, - worker_id: PeerId, - ) -> Self { - Self { - service_id, - blueprint_id, - aliases, - owner_id, - worker_id: Some(worker_id), + pub fn from_service(service: &Service) -> Self { + PersistedService { + service_id: service.service_id.clone(), + blueprint_id: service.blueprint_id.clone(), + aliases: service.aliases.clone(), + owner_id: service.owner_id, + worker_id: Some(service.worker_id), } } - - pub fn from_service(service_id: String, service: &Service) -> Self { - PersistedService::new( - service_id, - service.blueprint_id.clone(), - service.aliases.clone(), - service.owner_id, - service.worker_id, - ) - } } /// Persist service info to disk, so it is recreated after restart pub fn persist_service( services_dir: &Path, persisted_service: PersistedService, -) -> Result<(), ModuleError> { - use ModuleError::*; - +) -> Result<(), ServiceError> { let path = services_dir.join(service_file_name(&persisted_service.service_id)); let bytes = toml::to_vec(&persisted_service).map_err(|err| SerializePersistedService { err, config: Box::new(persisted_service.clone()), })?; - std::fs::write(&path, bytes).map_err(|err| WriteConfig { path, err }) + std::fs::write(&path, bytes).map_err(|err| WritePersistedService { path, err }) } /// Load info about persisted services from disk, and create `AppService` for each of them