From 0883ab8912ca19cb28dc9b6b388c48d5e6aa7306 Mon Sep 17 00:00:00 2001 From: Aleksey Proshutinskiy Date: Mon, 27 Nov 2023 20:34:11 +0200 Subject: [PATCH] feat(workers): add activate/deactivate [NET-587] (#1889) --- Cargo.lock | 2 + aquamarine/src/error.rs | 6 + aquamarine/src/plumber.rs | 8 + crates/connected-client/src/behaviour.rs | 13 +- crates/key-manager/Cargo.toml | 2 + crates/key-manager/src/error.rs | 39 +++- crates/key-manager/src/key_manager.rs | 209 +++++++++++------- crates/key-manager/src/persistence.rs | 224 +++++++++++++++----- crates/nox-tests/tests/spells.rs | 147 ++++++++++++- crates/peer-metrics/src/lib.rs | 10 +- crates/server-config/src/dir_config.rs | 9 + crates/server-config/src/node_config.rs | 4 +- crates/server-config/src/resolved_config.rs | 2 +- crates/spell-service-api/src/lib.rs | 2 + crates/system-services/src/deployer.rs | 2 +- docker/Config.default.toml | 1 + nox/src/node.rs | 1 + particle-builtins/src/builtins.rs | 2 +- particle-services/src/app_services.rs | 2 + sorcerer/src/sorcerer.rs | 107 ++++++++-- sorcerer/src/spell_builtins.rs | 5 +- sorcerer/src/worker_builins.rs | 168 ++++++++++++--- spell-storage/src/storage.rs | 8 + 23 files changed, 763 insertions(+), 210 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 97798182e2..9a3d039544 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3221,7 +3221,9 @@ dependencies = [ "parking_lot", "serde", "thiserror", + "tokio", "toml 0.5.11", + "tracing", ] [[package]] diff --git a/aquamarine/src/error.rs b/aquamarine/src/error.rs index cd807bf4d3..84674c5065 100644 --- a/aquamarine/src/error.rs +++ b/aquamarine/src/error.rs @@ -51,6 +51,11 @@ pub enum AquamarineApiError { particle_id: String, err: ParticleError, }, + #[error("AquamarineApiError::WorkerIsNotActive: worker_id = {worker_id}, particle_id = {particle_id}")] + WorkerIsNotActive { + worker_id: String, + particle_id: String, + }, } impl AquamarineApiError { @@ -59,6 +64,7 @@ impl AquamarineApiError { AquamarineApiError::ParticleExpired { particle_id } => Some(particle_id), AquamarineApiError::OneshotCancelled { particle_id } => Some(particle_id), AquamarineApiError::ExecutionTimedOut { particle_id, .. } => Some(particle_id), + AquamarineApiError::WorkerIsNotActive { particle_id, .. } => Some(particle_id), // Should it be `None` considering usage of signature as particle id? // It can compromise valid particles into thinking they are invalid. // But still there can be a case when signature was generated wrong diff --git a/aquamarine/src/plumber.rs b/aquamarine/src/plumber.rs index 649f54d2c7..a6dad5372d 100644 --- a/aquamarine/src/plumber.rs +++ b/aquamarine/src/plumber.rs @@ -104,6 +104,13 @@ impl Plumber { return; } + if !self.key_manager.is_worker_active(worker_id) + && !self.key_manager.is_management(particle.init_peer_id) + { + tracing::trace!(target: "worker_inactive", particle_id = particle.id, worker_id = worker_id.to_string(), "Worker is not active"); + return; + } + let builtins = &self.builtins; let key = (ParticleId(particle.signature.clone()), worker_id); let entry = self.actors.entry(key); @@ -432,6 +439,7 @@ mod tests { let builtin_mock = Arc::new(MockF); let key_manager = KeyManager::new( "keypair".into(), + "workers".into(), KeyPair::generate_ed25519(), RandomPeerId::random(), RandomPeerId::random(), diff --git a/crates/connected-client/src/behaviour.rs b/crates/connected-client/src/behaviour.rs index d85224aae3..b61f89c246 100644 --- a/crates/connected-client/src/behaviour.rs +++ b/crates/connected-client/src/behaviour.rs @@ -241,14 +241,11 @@ impl NetworkBehaviour for ClientBehaviour { ) { use ClientEvent::Particle; - match event { - Ok(HandlerMessage::InParticle(particle)) => { - self.events.push_back(GenerateEvent(Particle { - particle, - sender: peer_id, - })) - } - _ => {} + if let Ok(HandlerMessage::InParticle(particle)) = event { + self.events.push_back(GenerateEvent(Particle { + particle, + sender: peer_id, + })) } } diff --git a/crates/key-manager/Cargo.toml b/crates/key-manager/Cargo.toml index 886df1ba61..ff8fa68db7 100644 --- a/crates/key-manager/Cargo.toml +++ b/crates/key-manager/Cargo.toml @@ -17,3 +17,5 @@ serde = { workspace = true } toml = { workspace = true } log = { workspace = true } libp2p = { workspace = true } +tracing = { workspace = true } +tokio = { workspace = true } diff --git a/crates/key-manager/src/error.rs b/crates/key-manager/src/error.rs index b6c4175999..e8b68a21a9 100644 --- a/crates/key-manager/src/error.rs +++ b/crates/key-manager/src/error.rs @@ -34,6 +34,18 @@ pub enum KeyManagerError { #[source] err: toml::de::Error, }, + #[error("Failed to decode keypair {path}: {err}")] + PersistedKeypairDecodingError { + path: PathBuf, + #[source] + err: fluence_keypair::error::DecodingError, + }, + #[error("Invalid key format {path}: {err}")] + PersistedKeypairInvalidKeyformat { + path: PathBuf, + #[source] + err: fluence_keypair::error::Error, + }, #[error("Error serializing persisted keypair: {err}")] SerializePersistedKeypair { #[source] @@ -45,9 +57,28 @@ pub enum KeyManagerError { #[source] err: std::io::Error, }, - #[error("Error removing persisted keypair {path:?}: {err}")] + #[error("Error removing persisted keypair {path:?} for worker {worker_id}: {err}")] RemoveErrorPersistedKeypair { path: PathBuf, + worker_id: PeerId, + #[source] + err: std::io::Error, + }, + #[error("Error serializing persisted worker: {err}")] + SerializePersistedWorker { + #[source] + err: toml::ser::Error, + }, + #[error("Error writing persisted worker to {path:?}: {err}")] + WriteErrorPersistedWorker { + path: PathBuf, + #[source] + err: std::io::Error, + }, + #[error("Error removing persisted worker {path:?} for worker {worker_id}: {err}")] + RemoveErrorPersistedWorker { + path: PathBuf, + worker_id: PeerId, #[source] err: std::io::Error, }, @@ -57,6 +88,12 @@ pub enum KeyManagerError { #[source] err: std::io::Error, }, + #[error("Error creating directory for persisted workers {path:?}: {err}")] + CreateWorkersDir { + path: PathBuf, + #[source] + err: std::io::Error, + }, #[error("Keypair for peer_id {0} not found")] KeypairNotFound(PeerId), #[error("Worker for {deal_id} already exists")] diff --git a/crates/key-manager/src/key_manager.rs b/crates/key-manager/src/key_manager.rs index d10e3fdb29..d7fd5eb539 100644 --- a/crates/key-manager/src/key_manager.rs +++ b/crates/key-manager/src/key_manager.rs @@ -14,16 +14,16 @@ * limitations under the License. */ -use fluence_keypair::{KeyFormat, KeyPair}; +use fluence_keypair::KeyPair; use libp2p::PeerId; use std::collections::HashMap; use std::path::PathBuf; -use std::str::FromStr; use std::sync::Arc; use crate::error::KeyManagerError; use crate::persistence::{ - load_persisted_keypairs, persist_keypair, remove_keypair, PersistedKeypair, + load_persisted_keypairs, load_persisted_workers, persist_keypair, persist_worker, + remove_keypair, remove_worker, PersistedWorker, }; use crate::KeyManagerError::{WorkerAlreadyExists, WorkerNotFound, WorkerNotFoundByDeal}; use parking_lot::RwLock; @@ -31,19 +31,22 @@ use parking_lot::RwLock; type DealId = String; type WorkerId = PeerId; -#[derive(Clone)] pub struct WorkerInfo { pub deal_id: String, pub creator: PeerId, + pub active: RwLock, } #[derive(Clone)] pub struct KeyManager { /// worker_id -> worker_keypair worker_keypairs: Arc>>, + /// deal_id -> worker_id worker_ids: Arc>>, + /// worker_id -> worker_info worker_infos: Arc>>, keypairs_dir: PathBuf, + workers_dir: PathBuf, host_peer_id: PeerId, pub root_keypair: KeyPair, management_peer_id: PeerId, @@ -53,6 +56,7 @@ pub struct KeyManager { impl KeyManager { pub fn new( keypairs_dir: PathBuf, + workers_dir: PathBuf, root_keypair: KeyPair, management_peer_id: PeerId, builtins_management_peer_id: PeerId, @@ -62,6 +66,7 @@ impl KeyManager { worker_ids: Arc::new(Default::default()), worker_infos: Arc::new(Default::default()), keypairs_dir, + workers_dir, host_peer_id: root_keypair.get_peer_id(), root_keypair, management_peer_id, @@ -69,38 +74,29 @@ impl KeyManager { }; this.load_persisted_keypairs(); + this.load_persisted_workers(); this } - pub fn load_persisted_keypairs(&self) { - let persisted_keypairs = load_persisted_keypairs(&self.keypairs_dir); + fn load_persisted_keypairs(&self) { + let mut worker_keypairs = self.worker_keypairs.write(); + let keypairs = load_persisted_keypairs(&self.keypairs_dir); - for pkp in persisted_keypairs { - let res: eyre::Result<()> = try { - let persisted_kp = pkp?; - let keypair = KeyPair::from_secret_key( - persisted_kp.private_key_bytes, - KeyFormat::from_str(&persisted_kp.key_format)?, - )?; - let worker_id = keypair.get_peer_id(); - self.worker_ids - .write() - .insert(persisted_kp.deal_id.clone(), keypair.get_peer_id()); - - self.worker_keypairs.write().insert(worker_id, keypair); - - self.worker_infos.write().insert( - worker_id, - WorkerInfo { - deal_id: persisted_kp.deal_id, - creator: persisted_kp.deal_creator, - }, - ); - }; - - if let Err(e) = res { - log::warn!("Failed to restore persisted keypair: {}", e); - } + for keypair in keypairs { + let worker_id = keypair.get_peer_id(); + worker_keypairs.insert(worker_id, keypair); + } + } + fn load_persisted_workers(&self) { + let workers = load_persisted_workers(&self.workers_dir); + let mut worker_ids = self.worker_ids.write(); + let mut worker_infos = self.worker_infos.write(); + + for w in workers { + let worker_id = w.worker_id; + let deal_id = w.deal_id.clone(); + worker_infos.insert(worker_id, w.into()); + worker_ids.insert(deal_id, worker_id); } } @@ -124,36 +120,52 @@ impl KeyManager { self.host_peer_id } - pub fn generate_deal_id(init_peer_id: PeerId) -> String { - format!("direct_hosting_{init_peer_id}") - } - pub fn create_worker( &self, - deal_id: Option, + deal_id: String, init_peer_id: PeerId, ) -> Result { - // if deal_id is not provided, we associate it with init_peer_id - let deal_id = deal_id.unwrap_or(Self::generate_deal_id(init_peer_id)); let worker_id = self.worker_ids.read().get(&deal_id).cloned(); match worker_id { Some(_) => Err(WorkerAlreadyExists { deal_id }), _ => { - let kp = self.generate_keypair(); - let worker_id = kp.get_peer_id(); - self.store_keypair(deal_id, init_peer_id, kp)?; + let mut worker_ids = self.worker_ids.write(); + let mut worker_infos = self.worker_infos.write(); + let mut worker_keypairs = self.worker_keypairs.write(); + + if worker_ids.contains_key(&deal_id) { + return Err(WorkerAlreadyExists { deal_id }); + } + + let keypair = KeyPair::generate_ed25519(); + let worker_id = keypair.get_peer_id(); + persist_keypair(&self.keypairs_dir, worker_id, (&keypair).try_into()?)?; + + match self.store_worker(worker_id, deal_id.clone(), init_peer_id) { + Ok(worker_info) => { + worker_keypairs.insert(worker_id, keypair); + worker_ids.insert(deal_id, worker_id); + worker_infos.insert(worker_id, worker_info); + } + Err(err) => { + tracing::warn!( + target = "key-manager", + "Failed to store worker info for {}: {}", + worker_id, + err + ); + + remove_keypair(&self.keypairs_dir, worker_id)?; + return Err(err); + } + } + Ok(worker_id) } } } - pub fn get_worker_id( - &self, - deal_id: Option, - init_peer_id: PeerId, - ) -> Result { - // if deal_id is not provided, we associate it with init_peer_id - let deal_id = deal_id.unwrap_or(Self::generate_deal_id(init_peer_id)); + pub fn get_worker_id(&self, deal_id: String) -> Result { self.worker_ids .read() .get(&deal_id) @@ -169,16 +181,20 @@ impl KeyManager { self.worker_infos .read() .get(&worker_id) - .ok_or_else(|| KeyManagerError::WorkerNotFound(worker_id)) + .ok_or(WorkerNotFound(worker_id)) .map(|info| info.deal_id.clone()) } pub fn remove_worker(&self, worker_id: PeerId) -> Result<(), KeyManagerError> { let deal_id = self.get_deal_id(worker_id)?; - remove_keypair(&self.keypairs_dir, &deal_id)?; - let removed_worker_id = self.worker_ids.write().remove(&deal_id); - let removed_worker_info = self.worker_infos.write().remove(&worker_id); - let removed_worker_kp = self.worker_keypairs.write().remove(&worker_id); + let mut worker_ids = self.worker_ids.write(); + let mut worker_infos = self.worker_infos.write(); + let mut worker_keypairs = self.worker_keypairs.write(); + remove_keypair(&self.keypairs_dir, worker_id)?; + remove_worker(&self.workers_dir, worker_id)?; + let removed_worker_id = worker_ids.remove(&deal_id); + let removed_worker_info = worker_infos.remove(&worker_id); + let removed_worker_kp = worker_keypairs.remove(&worker_id); debug_assert!(removed_worker_id.is_some(), "worker_id does not exist"); debug_assert!(removed_worker_info.is_some(), "worker info does not exist"); @@ -206,37 +222,76 @@ impl KeyManager { self.worker_infos .read() .get(&worker_id) - .cloned() - .ok_or(WorkerNotFound(worker_id)) .map(|i| i.creator) + .ok_or(WorkerNotFound(worker_id)) } } - pub fn generate_keypair(&self) -> KeyPair { - KeyPair::generate_ed25519() - } - - pub fn store_keypair( + fn store_worker( &self, - deal_id: DealId, - deal_creator: PeerId, - keypair: KeyPair, - ) -> Result<(), KeyManagerError> { - persist_keypair( - &self.keypairs_dir, - PersistedKeypair::new(deal_creator, &keypair, deal_id.clone())?, - )?; - let worker_id = keypair.get_peer_id(); - self.worker_ids.write().insert(deal_id.clone(), worker_id); - self.worker_infos.write().insert( + worker_id: PeerId, + deal_id: String, + creator: PeerId, + ) -> Result { + let worker_info = WorkerInfo { + deal_id: deal_id.clone(), + creator, + active: RwLock::new(true), + }; + + persist_worker( + &self.workers_dir, worker_id, - WorkerInfo { + PersistedWorker { + worker_id, + creator, deal_id, - creator: deal_creator, + active: true, }, - ); - self.worker_keypairs.write().insert(worker_id, keypair); + )?; + Ok(worker_info) + } - Ok(()) + fn set_worker_status(&self, worker_id: PeerId, status: bool) -> Result<(), KeyManagerError> { + let guard = self.worker_infos.read(); + let worker_info = guard.get(&worker_id).ok_or(WorkerNotFound(worker_id))?; + + let mut active = worker_info.active.write(); + *active = status; + persist_worker( + &self.workers_dir, + worker_id, + PersistedWorker { + worker_id, + creator: worker_info.creator, + deal_id: worker_info.deal_id.clone(), + active: *active, + }, + ) + } + pub fn activate_worker(&self, worker_id: PeerId) -> Result<(), KeyManagerError> { + self.set_worker_status(worker_id, true) + } + + pub fn deactivate_worker(&self, worker_id: PeerId) -> Result<(), KeyManagerError> { + self.set_worker_status(worker_id, false) + } + + pub fn is_worker_active(&self, worker_id: PeerId) -> bool { + // host is always active + if self.is_host(worker_id) { + return true; + } + + let guard = self.worker_infos.read(); + let worker_info = guard.get(&worker_id); + + match worker_info { + Some(worker_info) => *worker_info.active.read(), + None => { + tracing::warn!(target = "key-manager", "Worker {} not found", worker_id); + false + } + } } } diff --git a/crates/key-manager/src/persistence.rs b/crates/key-manager/src/persistence.rs index 878f8f4d01..7685502417 100644 --- a/crates/key-manager/src/persistence.rs +++ b/crates/key-manager/src/persistence.rs @@ -21,105 +21,227 @@ use crate::error::KeyManagerError::{ CannotExtractRSASecretKey, CreateKeypairsDir, DeserializePersistedKeypair, ReadPersistedKeypair, SerializePersistedKeypair, WriteErrorPersistedKeypair, }; -use crate::KeyManager; -use crate::KeyManagerError::RemoveErrorPersistedKeypair; -use fluence_keypair::KeyPair; +use crate::key_manager::WorkerInfo; +use crate::KeyManagerError::{ + CreateWorkersDir, PersistedKeypairDecodingError, PersistedKeypairInvalidKeyformat, + RemoveErrorPersistedKeypair, RemoveErrorPersistedWorker, SerializePersistedWorker, + WriteErrorPersistedWorker, +}; +use fluence_keypair::{KeyFormat, KeyPair}; use fluence_libp2p::peerid_serializer; use libp2p::PeerId; +use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use std::path::Path; +use std::str::FromStr; + +pub const fn default_bool() -> bool { + V +} #[derive(Serialize, Deserialize, Debug, Clone)] pub struct PersistedKeypair { - #[serde(with = "peerid_serializer")] - #[serde(alias = "remote_peer_id")] - pub deal_creator: PeerId, pub private_key_bytes: Vec, pub key_format: String, +} + +#[derive(Serialize, Deserialize)] +pub struct PersistedWorker { + #[serde(with = "peerid_serializer")] + pub worker_id: PeerId, + #[serde(with = "peerid_serializer")] + pub creator: PeerId, #[serde(default)] pub deal_id: String, + #[serde(default = "default_bool::")] + pub active: bool, +} + +impl From for WorkerInfo { + fn from(val: PersistedWorker) -> Self { + WorkerInfo { + deal_id: val.deal_id, + creator: val.creator, + active: RwLock::new(val.active), + } + } } -impl PersistedKeypair { - pub fn new( - deal_creator: PeerId, - keypair: &KeyPair, - deal_id: String, - ) -> Result { +impl TryFrom<&KeyPair> for PersistedKeypair { + type Error = KeyManagerError; + + fn try_from(keypair: &KeyPair) -> Result { Ok(Self { - deal_creator, private_key_bytes: keypair.secret().map_err(|_| CannotExtractRSASecretKey)?, key_format: keypair.public().get_key_format().into(), - deal_id, }) } } -pub fn keypair_file_name(deal_id: &str) -> String { - format!("{deal_id}_keypair.toml") +pub fn keypair_file_name(worker_id: PeerId) -> String { + format!("{}_keypair.toml", worker_id.to_base58()) } -pub fn is_keypair(path: &Path) -> bool { +pub fn worker_file_name(worker_id: PeerId) -> String { + format!("{}_info.toml", worker_id.to_base58()) +} + +fn is_keypair(path: &Path) -> bool { path.file_name() .and_then(|n| n.to_str()) .map_or(false, |n| n.ends_with("_keypair.toml")) } +fn is_worker(path: &Path) -> bool { + path.file_name() + .and_then(|n| n.to_str()) + .map_or(false, |n| n.ends_with("_info.toml")) +} + /// Persist keypair info to disk, so it is recreated after restart pub fn persist_keypair( keypairs_dir: &Path, + worker_id: PeerId, persisted_keypair: PersistedKeypair, ) -> Result<(), KeyManagerError> { - let path = keypairs_dir.join(keypair_file_name(&persisted_keypair.deal_id)); + let path = keypairs_dir.join(keypair_file_name(worker_id)); let bytes = toml::to_vec(&persisted_keypair).map_err(|err| SerializePersistedKeypair { err })?; std::fs::write(&path, bytes).map_err(|err| WriteErrorPersistedKeypair { path, err }) } +fn load_persisted_keypair(file: &Path) -> Result { + let bytes = std::fs::read(file).map_err(|err| ReadPersistedKeypair { + err, + path: file.to_path_buf(), + })?; + let keypair: PersistedKeypair = + toml::from_slice(bytes.as_slice()).map_err(|err| DeserializePersistedKeypair { + err, + path: file.to_path_buf(), + })?; + + KeyPair::from_secret_key( + keypair.private_key_bytes, + KeyFormat::from_str(&keypair.key_format).map_err(|err| { + PersistedKeypairInvalidKeyformat { + err, + path: file.to_path_buf(), + } + })?, + ) + .map_err(|err| PersistedKeypairDecodingError { + err, + path: file.to_path_buf(), + }) +} + +fn load_persisted_worker(file: &Path) -> Result { + let bytes = std::fs::read(file).map_err(|err| ReadPersistedKeypair { + err, + path: file.to_path_buf(), + })?; + + toml::from_slice(bytes.as_slice()).map_err(|err| DeserializePersistedKeypair { + err, + path: file.to_path_buf(), + }) +} + /// Load info about persisted keypairs from disk -pub fn load_persisted_keypairs( - keypairs_dir: &Path, -) -> Vec> { - // Load all persisted service file names +pub fn load_persisted_keypairs(keypairs_dir: &Path) -> Vec { let files = match list_files(keypairs_dir) { - Some(files) => files, + Some(files) => files.collect(), None => { - // Attempt to create directory and exit + // Attempt to create directory if let Err(err) = create_dir(keypairs_dir) { - return vec![Err(CreateKeypairsDir { - path: keypairs_dir.to_path_buf(), - err, - })]; + log::warn!( + "{}", + CreateKeypairsDir { + path: keypairs_dir.to_path_buf(), + err, + } + ); + } + vec![] + } + }; + + let mut keypairs = vec![]; + for file in files.iter() { + let res: eyre::Result<()> = try { + if is_keypair(file) { + keypairs.push(load_persisted_keypair(file)?); } + }; - return vec![]; + if let Err(err) = res { + log::warn!("{err}") + } + } + + keypairs +} + +/// Load info about persisted workers from disk +pub fn load_persisted_workers(workers_dir: &Path) -> Vec { + let files = match list_files(workers_dir) { + Some(files) => files.collect(), + None => { + // Attempt to create directory + if let Err(err) = create_dir(workers_dir) { + log::warn!( + "{}", + CreateWorkersDir { + path: workers_dir.to_path_buf(), + err, + } + ); + } + vec![] } }; - files - .filter(|p| is_keypair(p)) - .map(|file| { - // Load persisted keypair - let bytes = std::fs::read(&file).map_err(|err| ReadPersistedKeypair { - err, - path: file.to_path_buf(), - })?; - let mut keypair: PersistedKeypair = - toml::from_slice(bytes.as_slice()).map_err(|err| DeserializePersistedKeypair { - err, - path: file.to_path_buf(), - })?; - - if keypair.deal_id.is_empty() { - keypair.deal_id = KeyManager::generate_deal_id(keypair.deal_creator); + let mut workers = vec![]; + for file in files.iter() { + let res: eyre::Result<()> = try { + if is_worker(file) { + workers.push(load_persisted_worker(file)?); } + }; - Ok(keypair) - }) - .collect() + if let Err(err) = res { + log::warn!("{err}") + } + } + + workers +} + +pub fn remove_keypair(keypairs_dir: &Path, worker_id: PeerId) -> Result<(), KeyManagerError> { + let path = keypairs_dir.join(keypair_file_name(worker_id)); + std::fs::remove_file(path.clone()).map_err(|err| RemoveErrorPersistedKeypair { + path, + worker_id, + err, + }) +} + +pub fn persist_worker( + workers_dir: &Path, + worker_id: PeerId, + worker: PersistedWorker, +) -> Result<(), KeyManagerError> { + let path = workers_dir.join(worker_file_name(worker_id)); + let bytes = toml::to_vec(&worker).map_err(|err| SerializePersistedWorker { err })?; + std::fs::write(&path, bytes).map_err(|err| WriteErrorPersistedWorker { path, err }) } -pub fn remove_keypair(keypairs_dir: &Path, deal_id: &str) -> Result<(), KeyManagerError> { - let path = keypairs_dir.join(keypair_file_name(deal_id)); - std::fs::remove_file(path.clone()).map_err(|err| RemoveErrorPersistedKeypair { path, err }) +pub fn remove_worker(workers_dir: &Path, worker_id: PeerId) -> Result<(), KeyManagerError> { + let path = workers_dir.join(worker_file_name(worker_id)); + std::fs::remove_file(path.clone()).map_err(|err| RemoveErrorPersistedWorker { + path, + worker_id, + err, + }) } diff --git a/crates/nox-tests/tests/spells.rs b/crates/nox-tests/tests/spells.rs index 85f853b301..9336a700a6 100644 --- a/crates/nox-tests/tests/spells.rs +++ b/crates/nox-tests/tests/spells.rs @@ -38,7 +38,7 @@ type WorkerPeerId = String; async fn create_worker(client: &mut ConnectedClient, deal_id: Option) -> WorkerPeerId { let data = hashmap! { - "deal_id" => deal_id.map(JValue::String).unwrap_or(JValue::Null), + "deal_id" => deal_id.map(JValue::String).unwrap_or(JValue::String("default_deal".to_string())), "relay" => json!(client.node.to_string()), "client" => json!(client.peer_id.to_string()), }; @@ -48,7 +48,10 @@ async fn create_worker(client: &mut ConnectedClient, deal_id: Option) -> (seq (xor (call relay ("worker" "create") [deal_id] worker_peer_id) - (call relay ("worker" "get_peer_id") [deal_id] worker_peer_id) + (seq + (call relay ("worker" "get_worker_id") [deal_id] get_worker_peer_id) + (ap get_worker_peer_id.$.[0] worker_peer_id) + ) ) (call client ("return" "") [worker_peer_id]) )"#, @@ -915,11 +918,25 @@ async fn spell_trigger_connection_pool() { ); let mut config = TriggerConfig::default(); config.connections.connect = true; - let (spell_id1, _) = create_spell(&mut client, &script, config, json!({}), None).await; + let (spell_id1, _) = create_spell( + &mut client, + &script, + config, + json!({}), + Some("deal_id1".to_string()), + ) + .await; let mut config = TriggerConfig::default(); config.connections.disconnect = true; - let (spell_id2, _) = create_spell(&mut client, &script, config, json!({}), None).await; + let (spell_id2, _) = create_spell( + &mut client, + &script, + config, + json!({}), + Some("deal_id2".to_string()), + ) + .await; // This connect should trigger the spell let connect_num = 5; @@ -1527,14 +1544,14 @@ async fn spell_create_worker_twice() { (seq (seq (call relay ("worker" "create") ["deal_id"] worker_peer_id) - (call relay ("worker" "get_peer_id") ["deal_id"] get_worker_peer_id) + (call relay ("worker" "get_worker_id") ["deal_id"] get_worker_peer_id) ) (seq (call relay ("worker" "create") ["deal_id"] failed_create) (call client ("return" "") ["test failed"]) ) ) - (call client ("return" "") [%last_error%.$.message worker_peer_id get_worker_peer_id]) + (call client ("return" "") [%last_error%.$.message worker_peer_id get_worker_peer_id.$.[0]]) )"#, data.clone(), ) @@ -2207,3 +2224,121 @@ async fn test_decider_api_endpoint_rewrite() { assert_eq!(*endpoint, another_endpoint); } } + +#[tokio::test] +async fn test_activate_deactivate() { + let worker_period_sec = 120u32; + let swarms = make_swarms_with_cfg(1, |mut cfg| { + cfg.override_system_services_config = Some(SystemServicesConfig { + enable: vec![], + aqua_ipfs: Default::default(), + decider: DeciderConfig { + worker_period_sec, + ..Default::default() + }, + registry: Default::default(), + connector: Default::default(), + }); + cfg + }) + .await; + let mut client = ConnectedClient::connect_with_keypair( + swarms[0].multiaddr.clone(), + Some(swarms[0].management_keypair.clone()), + ) + .await + .wrap_err("connect client") + .unwrap(); + + let deal_id = "deal-id-1".to_string(); + + let config = make_clock_config(worker_period_sec, 1, 0); + let (_, worker_id) = create_spell_with_alias( + &mut client, + r#"(call %init_peer_id% ("op" "noop") [])"#, + config.clone(), + json!({}), + Some(deal_id.clone()), + "worker-spell".to_string(), + ) + .await; + + let (_, _) = create_spell_with_alias( + &mut client, + r#"(call %init_peer_id% ("op" "noop") [])"#, + config.clone(), + json!({}), + Some(deal_id.clone()), + "other-spell".to_string(), + ) + .await; + + client + .send_particle( + r#"(seq + (seq + (seq + (call relay ("worker" "is_active") [deal_id] is_active_before) + (seq + (call worker ("worker-spell" "get_trigger_config") [] worker_trigger_config_before) + (call worker ("other-spell" "get_trigger_config") [] spell_trigger_config_before) + ) + ) + (seq + (seq + (call relay ("worker" "deactivate") [deal_id]) + (seq + (call relay ("worker" "is_active") [deal_id] is_active_after) + (seq + (call worker ("worker-spell" "get_trigger_config") [] worker_trigger_config_after) + (call worker ("other-spell" "get_trigger_config") [] spell_trigger_config_after) + ) + ) + ) + (seq + (call relay ("worker" "activate") [deal_id]) + (seq + (call relay ("worker" "is_active") [deal_id] is_active_after_restart) + (call worker ("worker-spell" "get_trigger_config") [] worker_trigger_config_after_restart) + ) + ) + ) + ) + (call client ("return" "") [is_active_before is_active_after worker_trigger_config_before worker_trigger_config_after spell_trigger_config_before spell_trigger_config_after is_active_after_restart worker_trigger_config_after_restart]) + )"#, + hashmap! { + "relay" => json!(client.node.to_string()), + "client" => json!(client.peer_id.to_string()), + "worker" => json!(worker_id), + "deal_id" => json!(deal_id), + }, + ) + .await; + + if let [JValue::Bool(is_active_before), JValue::Bool(is_active_after), JValue::Object(worker_trigger_config_before), JValue::Object(worker_trigger_config_after), JValue::Object(spell_trigger_config_before), JValue::Object(spell_trigger_config_after), JValue::Bool(is_active_after_restart), JValue::Object(worker_trigger_config_after_restart)] = + client.receive_args().await.unwrap().as_slice() + { + assert!(*is_active_before); + assert!(!*is_active_after); + let worker_trigger_config_before: TriggerConfig = + serde_json::from_value(worker_trigger_config_before["config"].clone()).unwrap(); + let worker_trigger_config_after: TriggerConfig = + serde_json::from_value(worker_trigger_config_after["config"].clone()).unwrap(); + assert_eq!(worker_trigger_config_before, config); + assert_eq!(worker_trigger_config_after, TriggerConfig::default()); + + let spell_trigger_config_before: TriggerConfig = + serde_json::from_value(spell_trigger_config_before["config"].clone()).unwrap(); + let spell_trigger_config_after: TriggerConfig = + serde_json::from_value(spell_trigger_config_after["config"].clone()).unwrap(); + assert_eq!(spell_trigger_config_before, config); + assert_eq!(spell_trigger_config_after, TriggerConfig::default()); + + assert!(*is_active_after_restart); + let worker_trigger_config_after_restart: TriggerConfig = + serde_json::from_value(worker_trigger_config_after_restart["config"].clone()).unwrap(); + assert_eq!(worker_trigger_config_after_restart, config); + } else { + panic!("expected result") + } +} diff --git a/crates/peer-metrics/src/lib.rs b/crates/peer-metrics/src/lib.rs index d87c8e6133..2135331049 100644 --- a/crates/peer-metrics/src/lib.rs +++ b/crates/peer-metrics/src/lib.rs @@ -56,7 +56,7 @@ pub struct ParticleLabel { } /// from 100 microseconds to 120 seconds -pub(self) fn execution_time_buckets() -> std::vec::IntoIter { +fn execution_time_buckets() -> std::vec::IntoIter { vec![ 0.0001, 0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 4.0, 7.0, 15.0, 30.0, 60.0, 120.0, ] @@ -64,17 +64,17 @@ pub(self) fn execution_time_buckets() -> std::vec::IntoIter { } /// 1mib, 5mib, 10mib, 25mib, 50mib, 100mib, 200mib, 500mib, 1gib -pub(self) fn mem_buckets() -> std::vec::IntoIter { +fn mem_buckets() -> std::vec::IntoIter { to_mib(vec![1, 5, 10, 25, 50, 100, 200, 500, 1024].into_iter()) } /// 1mib, 5mib, 10mib, 25mib, 50mib, 100mib, 200mib, 500mib, 1gib, 2gib, 3gib, 4gib -pub(self) fn mem_buckets_4gib() -> std::vec::IntoIter { +fn mem_buckets_4gib() -> std::vec::IntoIter { to_mib(vec![1, 5, 10, 25, 50, 100, 200, 500, 1024, 2048, 3072, 4096].into_iter()) } /// 1mib, 5mib, 10mib, 25mib, 50mib, 100mib, 200mib, 500mib, 1gib, 2gib, 3gib, 4gib, 8gib -pub(self) fn mem_buckets_8gib() -> std::vec::IntoIter { +fn mem_buckets_8gib() -> std::vec::IntoIter { to_mib( vec![ 1, 5, 10, 25, 50, 100, 200, 500, 1024, 2048, 3072, 4096, 8192, @@ -90,7 +90,7 @@ fn to_mib(values: std::vec::IntoIter) -> std::vec::IntoIter { .into_iter() } -pub(self) fn register(registry: &mut Registry, metric: M, name: &str, help: &str) -> M +fn register(registry: &mut Registry, metric: M, name: &str, help: &str) -> M where M: 'static + EncodeMetric + Clone + Send + Sync + Debug, { diff --git a/crates/server-config/src/dir_config.rs b/crates/server-config/src/dir_config.rs index 0876ffcbe9..bab7478cfe 100644 --- a/crates/server-config/src/dir_config.rs +++ b/crates/server-config/src/dir_config.rs @@ -40,7 +40,11 @@ pub struct UnresolvedDirConfig { /// Path to spell service files (wasms, configs) pub spell_base_dir: Option, + /// Path to persisted worker keypairs pub keypairs_base_dir: Option, + + /// Path to persisted workers + pub workers_base_dir: Option, } impl UnresolvedDirConfig { @@ -54,6 +58,7 @@ impl UnresolvedDirConfig { .unwrap_or(air_interpreter_path(&base)); let spell_base_dir = self.spell_base_dir.unwrap_or(base.join("spell")); let keypairs_base_dir = self.keypairs_base_dir.unwrap_or(base.join("keypairs")); + let workers_base_dir = self.workers_base_dir.unwrap_or(base.join("workers")); create_dirs(&[ &base, @@ -61,6 +66,7 @@ impl UnresolvedDirConfig { &avm_base_dir, &spell_base_dir, &keypairs_base_dir, + &workers_base_dir, ]) .context("creating configured directories")?; @@ -69,6 +75,7 @@ impl UnresolvedDirConfig { let avm_base_dir = canonicalize(avm_base_dir)?; let spell_base_dir = canonicalize(spell_base_dir)?; let keypairs_base_dir = canonicalize(keypairs_base_dir)?; + let workers_base_dir = canonicalize(workers_base_dir)?; Ok(ResolvedDirConfig { base_dir: base, @@ -77,6 +84,7 @@ impl UnresolvedDirConfig { air_interpreter_path, spell_base_dir, keypairs_base_dir, + workers_base_dir, }) } } @@ -91,4 +99,5 @@ pub struct ResolvedDirConfig { pub air_interpreter_path: PathBuf, pub spell_base_dir: PathBuf, pub keypairs_base_dir: PathBuf, + pub workers_base_dir: PathBuf, } diff --git a/crates/server-config/src/node_config.rs b/crates/server-config/src/node_config.rs index 7a9b0582b8..ddf7c51f57 100644 --- a/crates/server-config/src/node_config.rs +++ b/crates/server-config/src/node_config.rs @@ -138,12 +138,12 @@ impl UnresolvedNodeConfig { let root_key_pair = self .root_key_pair - .unwrap_or(KeypairConfig::default()) + .unwrap_or_default() .get_keypair(default_keypair_path())?; let builtins_key_pair = self .builtins_key_pair - .unwrap_or(KeypairConfig::default()) + .unwrap_or_default() .get_keypair(default_builtins_keypair_path())?; let mut allowed_binaries = self.allowed_binaries; diff --git a/crates/server-config/src/resolved_config.rs b/crates/server-config/src/resolved_config.rs index a5ff282f4d..ad4b280b9f 100644 --- a/crates/server-config/src/resolved_config.rs +++ b/crates/server-config/src/resolved_config.rs @@ -227,7 +227,7 @@ pub fn load_config_with_args( .and_then(|str| str.into_string().ok()) .map(|str| { str.trim() - .split(",") + .split(',') .map(PathBuf::from) .map(|path| File::from(path.clone()).format(FileFormat::Toml)) .collect() diff --git a/crates/spell-service-api/src/lib.rs b/crates/spell-service-api/src/lib.rs index f35bad1e46..1cfe4d0bcb 100644 --- a/crates/spell-service-api/src/lib.rs +++ b/crates/spell-service-api/src/lib.rs @@ -300,9 +300,11 @@ mod tests { let startup_kp = Keypair::generate_ed25519(); let vault_dir = base_dir.join("..").join("vault"); let keypairs_dir = base_dir.join("..").join("keypairs"); + let workers_dir = base_dir.join("..").join("workers"); let key_manager = KeyManager::new( keypairs_dir, + workers_dir, startup_kp.clone().into(), management_pid, to_peer_id(&startup_kp), diff --git a/crates/system-services/src/deployer.rs b/crates/system-services/src/deployer.rs index de31e474a4..b2a4d0edbe 100644 --- a/crates/system-services/src/deployer.rs +++ b/crates/system-services/src/deployer.rs @@ -200,7 +200,7 @@ impl Deployer { &self.spell_storage, &self.services, &self.spell_event_bus_api, - &spell_id, + spell_id, self.root_worker_id, ) .await diff --git a/docker/Config.default.toml b/docker/Config.default.toml index 658a84d1b9..6c50fb3eda 100644 --- a/docker/Config.default.toml +++ b/docker/Config.default.toml @@ -4,6 +4,7 @@ # avm_base_dir = "/stepper" # spell_base_dir = "/spell" # keypairs_base_dir = "/keypairs" +# workers_base_dir = "/workers" # # Path to AIR interpreter .wasm is set to specific version by default # air_interpreter_path = "./aquamarine_${air_interpreter_wasm::VERSION}.wasm" diff --git a/nox/src/node.rs b/nox/src/node.rs index c3965805bb..633dfe924b 100644 --- a/nox/src/node.rs +++ b/nox/src/node.rs @@ -113,6 +113,7 @@ impl Node { let key_manager = KeyManager::new( config.dir_config.keypairs_base_dir.clone(), + config.dir_config.workers_base_dir.clone(), key_pair.clone().try_into()?, config.management_peer_id, builtins_peer_id, diff --git a/particle-builtins/src/builtins.rs b/particle-builtins/src/builtins.rs index 2febaf562f..0c94fafe6c 100644 --- a/particle-builtins/src/builtins.rs +++ b/particle-builtins/src/builtins.rs @@ -804,7 +804,7 @@ where .services .resolve_alias(¶ms.id, params.host_id, alias) .map(|id| vec![JValue::String(id)]) - .unwrap_or(vec![]); + .unwrap_or_default(); Ok(Array(service_id_opt)) } diff --git a/particle-services/src/app_services.rs b/particle-services/src/app_services.rs index 030056b48a..7ea48b1c72 100644 --- a/particle-services/src/app_services.rs +++ b/particle-services/src/app_services.rs @@ -1059,9 +1059,11 @@ mod tests { let startup_kp = Keypair::generate_ed25519(); let vault_dir = base_dir.join("..").join("vault"); let keypairs_dir = base_dir.join("..").join("keypairs"); + let workers_dir = base_dir.join("..").join("workers"); let max_heap_size = server_config::default_module_max_heap_size(); let key_manager = KeyManager::new( keypairs_dir, + workers_dir, root_keypair.clone().into(), management_pid, to_peer_id(&startup_kp), diff --git a/sorcerer/src/sorcerer.rs b/sorcerer/src/sorcerer.rs index 6fa0ea1eb4..30aa90b50e 100644 --- a/sorcerer/src/sorcerer.rs +++ b/sorcerer/src/sorcerer.rs @@ -26,7 +26,10 @@ use crate::spell_builtins::{ get_spell_arg, get_spell_id, spell_install, spell_list, spell_remove, spell_update_config, store_error, store_response, }; -use crate::worker_builins::{create_worker, get_worker_peer_id, remove_worker, worker_list}; +use crate::worker_builins::{ + activate_deal, create_worker, deactivate_deal, get_worker_peer_id, is_deal_active, + remove_worker, worker_list, +}; use aquamarine::AquamarineApi; use key_manager::KeyManager; use particle_args::JError; @@ -51,6 +54,7 @@ pub struct Sorcerer { pub key_manager: KeyManager, pub spell_service_api: SpellServiceApi, pub spell_metrics: Option, + pub worker_period_sec: u32, } impl Sorcerer { @@ -77,6 +81,7 @@ impl Sorcerer { key_manager, spell_service_api, spell_metrics, + worker_period_sec: config.system_services.decider.worker_period_sec, }; let mut builtin_functions = sorcerer.make_spell_builtins(); @@ -200,10 +205,12 @@ impl Sorcerer { CustomService::new( vec![ ("create", self.make_worker_create_closure()), - ("get_peer_id", self.make_worker_get_peer_id_closure()), // TODO: will be DEPRECATED soon ("get_worker_id", self.make_worker_get_worker_id_closure()), ("remove", self.make_worker_remove_closure()), ("list", self.make_worker_list_closure()), + ("activate", self.make_activate_deal_closure()), + ("deactivate", self.make_deactivate_deal_closure()), + ("is_active", self.make_is_deal_active_closure()), ], None, ), @@ -335,31 +342,19 @@ impl Sorcerer { let key_manager = self.key_manager.clone(); ServiceFunction::Immut(Box::new(move |args, params| { let key_manager = key_manager.clone(); - async move { wrap(create_worker(args, params, key_manager)) }.boxed() - })) - } - - // TODO: will be DEPRECATED soon - fn make_worker_get_peer_id_closure(&self) -> ServiceFunction { - let key_manager = self.key_manager.clone(); - ServiceFunction::Immut(Box::new(move |args, params| { - let key_manager = key_manager.clone(); - async move { wrap(get_worker_peer_id(args, params, key_manager)) }.boxed() + async move { + tokio::task::spawn_blocking(move || wrap(create_worker(args, params, key_manager))) + .await? + } + .boxed() })) } fn make_worker_get_worker_id_closure(&self) -> ServiceFunction { let key_manager = self.key_manager.clone(); - ServiceFunction::Immut(Box::new(move |args, params| { + ServiceFunction::Immut(Box::new(move |args, _| { let key_manager = key_manager.clone(); - async move { - wrap(crate::worker_builins::get_worker_peer_id_opt( - args, - params, - key_manager, - )) - } - .boxed() + async move { wrap(get_worker_peer_id(args, key_manager)) }.boxed() })) } @@ -388,4 +383,74 @@ impl Sorcerer { .boxed() })) } + + fn make_activate_deal_closure(&self) -> ServiceFunction { + let key_manager = self.key_manager.clone(); + let services = self.services.clone(); + let spell_event_bus_api = self.spell_event_bus_api.clone(); + let spells_api = self.spell_service_api.clone(); + let worker_period_sec = self.worker_period_sec; + ServiceFunction::Immut(Box::new(move |args, params| { + let services = services.clone(); + let spell_event_bus_api = spell_event_bus_api.clone(); + let key_manager = key_manager.clone(); + let spells_api = spells_api.clone(); + + async move { + wrap_unit( + activate_deal( + args, + params, + key_manager, + services, + spell_event_bus_api, + spells_api, + worker_period_sec, + ) + .await, + ) + } + .boxed() + })) + } + + fn make_deactivate_deal_closure(&self) -> ServiceFunction { + let key_manager = self.key_manager.clone(); + let spell_storage = self.spell_storage.clone(); + let spell_event_bus_api = self.spell_event_bus_api.clone(); + let spells_api = self.spell_service_api.clone(); + + ServiceFunction::Immut(Box::new(move |args, params| { + let key_manager = key_manager.clone(); + let spells_api = spells_api.clone(); + let spell_storage = spell_storage.clone(); + let spell_event_bus_api = spell_event_bus_api.clone(); + + async move { + wrap_unit( + deactivate_deal( + args, + params, + key_manager, + spell_storage, + spell_event_bus_api, + spells_api, + ) + .await, + ) + } + .boxed() + })) + } + + fn make_is_deal_active_closure(&self) -> ServiceFunction { + let key_manager = self.key_manager.clone(); + ServiceFunction::Immut(Box::new(move |args, _| { + let key_manager = key_manager.clone(); + async move { + tokio::task::spawn_blocking(move || wrap(is_deal_active(args, key_manager))).await? + } + .boxed() + })) + } } diff --git a/sorcerer/src/spell_builtins.rs b/sorcerer/src/spell_builtins.rs index 2982960199..cf19025e63 100644 --- a/sorcerer/src/spell_builtins.rs +++ b/sorcerer/src/spell_builtins.rs @@ -207,10 +207,7 @@ pub(crate) fn spell_list( ) -> Result { Ok(Array( spell_storage - .get_registered_spells() - .get(¶ms.host_id) - .cloned() - .unwrap_or_default() + .get_registered_spells_by(params.host_id) .into_iter() .map(JValue::String) .collect(), diff --git a/sorcerer/src/worker_builins.rs b/sorcerer/src/worker_builins.rs index d697667db9..45f14ae3b3 100644 --- a/sorcerer/src/worker_builins.rs +++ b/sorcerer/src/worker_builins.rs @@ -14,15 +14,19 @@ * limitations under the License. */ use fluence_libp2p::PeerId; +use fluence_spell_dtos::trigger_config::TriggerConfig; +use futures::TryFutureExt; use serde_json::Value as JValue; use std::str::FromStr; +use std::time::Duration; use crate::spell_builtins::remove_spell; use key_manager::KeyManager; use particle_args::{Args, JError}; use particle_execution::ParticleParams; use particle_services::ParticleAppServices; -use spell_event_bus::api::SpellEventBusApi; +use spell_event_bus::api::{from_user_config, SpellEventBusApi}; +use spell_service_api::{CallParams, SpellServiceApi}; use spell_storage::SpellStorage; pub(crate) fn create_worker( @@ -31,7 +35,7 @@ pub(crate) fn create_worker( key_manager: KeyManager, ) -> Result { let mut args = args.function_args.into_iter(); - let deal_id: Option = Args::next_opt("deal_id", &mut args)?; + let deal_id: String = Args::next("deal_id", &mut args)?; Ok(JValue::String( key_manager .create_worker(deal_id, params.init_peer_id)? @@ -39,32 +43,13 @@ pub(crate) fn create_worker( )) } -pub(crate) fn get_worker_peer_id( - args: Args, - params: ParticleParams, - key_manager: KeyManager, -) -> Result { +pub(crate) fn get_worker_peer_id(args: Args, key_manager: KeyManager) -> Result { let mut args = args.function_args.into_iter(); - let deal_id: Option = Args::next_opt("deal_id", &mut args)?; - - Ok(JValue::String( - key_manager - .get_worker_id(deal_id, params.init_peer_id)? - .to_base58(), - )) -} - -pub(crate) fn get_worker_peer_id_opt( - args: Args, - params: ParticleParams, - key_manager: KeyManager, -) -> Result { - let mut args = args.function_args.into_iter(); - let deal_id: Option = Args::next_opt("deal_id", &mut args)?; + let deal_id: String = Args::next("deal_id", &mut args)?; Ok(JValue::Array( key_manager - .get_worker_id(deal_id, params.init_peer_id) + .get_worker_id(deal_id) .map(|id| vec![JValue::String(id.to_base58())]) .unwrap_or_default(), )) @@ -87,13 +72,9 @@ pub(crate) async fn remove_worker( return Err(JError::new(format!("Worker {worker_id} can be removed only by worker creator {worker_creator} or worker itself"))); } - key_manager.remove_worker(worker_id)?; + tokio::task::spawn_blocking(move || key_manager.remove_worker(worker_id)).await??; - let spells: Vec<_> = spell_storage - .get_registered_spells() - .get(&worker_id) - .cloned() - .unwrap_or_default(); + let spells: Vec<_> = spell_storage.get_registered_spells_by(worker_id); for s in spells { remove_spell( @@ -104,12 +85,12 @@ pub(crate) async fn remove_worker( &s, worker_id, ) - .await .map_err(|e| { JError::new(format!( "Worker removing failed due to spell removing failure: {e}" )) - })?; + }) + .await?; } services.remove_services(worker_id)?; @@ -125,3 +106,126 @@ pub(crate) fn worker_list(key_manager: KeyManager) -> Result { .collect(), )) } + +pub(crate) async fn deactivate_deal( + args: Args, + params: ParticleParams, + key_manager: KeyManager, + spell_storage: SpellStorage, + spell_event_bus_api: SpellEventBusApi, + spell_service_api: SpellServiceApi, +) -> Result<(), JError> { + let mut args = args.function_args.into_iter(); + let deal_id: String = Args::next("deal_id", &mut args)?; + + if !key_manager.is_management(params.init_peer_id) && !key_manager.is_host(params.init_peer_id) + { + return Err(JError::new(format!( + "Only management or host peer can deactivate deal" + ))); + } + + let worker_id = key_manager.get_worker_id(deal_id)?; + + if !key_manager.is_worker_active(worker_id) { + return Err(JError::new("Deal has already been deactivated")); + } + + let spells = spell_storage.get_registered_spells_by(worker_id); + + for spell_id in spells.into_iter() { + spell_event_bus_api + .unsubscribe(spell_id.clone()) + .map_err(|e| { + JError::new(format!( + "Deal deactivation failed due to failure to stop spell {spell_id} : {e}" + )) + }) + .await?; + + spell_service_api + .set_trigger_config( + CallParams::local( + spell_id.clone(), + worker_id, + Duration::from_millis(params.ttl as u64), + ), + TriggerConfig::default(), + ) + .map_err(|e| { + JError::new(format!( + "Deal deactivation failed due to failure to stop spell {spell_id} : {e}" + )) + })?; + } + + tokio::task::spawn_blocking(move || key_manager.deactivate_worker(worker_id)).await??; + + Ok(()) +} + +pub(crate) async fn activate_deal( + args: Args, + params: ParticleParams, + key_manager: KeyManager, + services: ParticleAppServices, + spell_event_bus_api: SpellEventBusApi, + spell_service_api: SpellServiceApi, + worker_period_sec: u32, +) -> Result<(), JError> { + let mut args = args.function_args.into_iter(); + let deal_id: String = Args::next("deal_id", &mut args)?; + + if !key_manager.is_management(params.init_peer_id) && !key_manager.is_host(params.init_peer_id) + { + return Err(JError::new(format!( + "Only management or host peer can activate deal" + ))); + } + + let worker_id = key_manager.get_worker_id(deal_id)?; + + if key_manager.is_worker_active(worker_id) { + return Err(JError::new("Deal has already been activated")); + } + + let installation_spell_id = + services.resolve_alias(¶ms.id, worker_id, "worker-spell".to_string())?; + + // same as in decider-distro + let mut worker_config = TriggerConfig::default(); + worker_config.clock.start_sec = 1; + worker_config.clock.period_sec = worker_period_sec; + + spell_service_api.set_trigger_config( + CallParams::local( + installation_spell_id.clone(), + worker_id, + Duration::from_millis(params.ttl as u64), + ), + worker_config.clone(), + )?; + + let trigger_config = from_user_config(&worker_config)?.ok_or(JError::new(format!( + "Deal activation failed due to failure to parse trigger config" + )))?; + + spell_event_bus_api + .subscribe(installation_spell_id, trigger_config) + .map_err(|e| { + JError::new(format!( + "Deal activation failed due to failure to start worker spell : {e}" + )) + }) + .await?; + + tokio::task::spawn_blocking(move || key_manager.activate_worker(worker_id)).await??; + Ok(()) +} + +pub(crate) fn is_deal_active(args: Args, key_manager: KeyManager) -> Result { + let mut args = args.function_args.into_iter(); + let deal_id: String = Args::next("deal_id", &mut args)?; + let worker_id = key_manager.get_worker_id(deal_id)?; + Ok(JValue::Bool(key_manager.is_worker_active(worker_id))) +} diff --git a/spell-storage/src/storage.rs b/spell-storage/src/storage.rs index bb34cd1ab8..8ae9b8dc58 100644 --- a/spell-storage/src/storage.rs +++ b/spell-storage/src/storage.rs @@ -117,6 +117,14 @@ impl SpellStorage { self.registered_spells.read().clone() } + pub fn get_registered_spells_by(&self, worker_id: PeerId) -> Vec { + self.registered_spells + .read() + .get(&worker_id) + .cloned() + .unwrap_or_default() + } + pub fn get_blueprint(&self) -> String { self.spell_blueprint_id.clone() }