Skip to content

Commit

Permalink
feat(worker): add worker.remove [fixes NET-354 NET-376] (#1499)
Browse files Browse the repository at this point in the history
* feat(worker): add worker.remove

* restore worker info after restart

* fix

* fix clippy warn, mark 1 test as flaky

* Update crates/particle-node-tests/tests/spells.rs

* refactoring

* pr fixes

* fix clippy warning

* Update spell-storage/src/storage.rs

Co-authored-by: folex <0xdxdy@gmail.com>

* pr fixes

---------

Co-authored-by: folex <0xdxdy@gmail.com>
  • Loading branch information
justprosh and folex committed Mar 14, 2023
1 parent 9cfe617 commit 97f552f
Show file tree
Hide file tree
Showing 23 changed files with 541 additions and 206 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions aquamarine/src/aquamarine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
Poll::Ready(Some(AddService {
service,
functions,
unhandled,
})) => self.plumber.add_service(service, functions, unhandled),
fallback,
})) => self.plumber.add_service(service, functions, fallback),

Poll::Ready(Some(RemoveService { service })) => {
self.plumber.remove_service(service)
Expand Down Expand Up @@ -158,7 +158,7 @@ impl AquamarineApi {
AddService {
service,
functions,
unhandled: None,
fallback: None,
},
None,
)
Expand Down
2 changes: 1 addition & 1 deletion aquamarine/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub enum Command {
AddService {
service: String,
functions: HashMap<String, ServiceFunction>,
unhandled: Option<ServiceFunction>,
fallback: Option<ServiceFunction>,
},
RemoveService {
service: String,
Expand Down
6 changes: 3 additions & 3 deletions aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
&self,
service: String,
functions: HashMap<String, ServiceFunction>,
unhandled: Option<ServiceFunction>,
fallback: Option<ServiceFunction>,
) {
self.builtins.extend(service, functions, unhandled)
self.builtins.extend(service, functions, fallback)
}

pub fn remove_service(&self, service: String) {
Expand Down Expand Up @@ -310,7 +310,7 @@ mod tests {
&self,
_service: String,
_functions: HashMap<String, ServiceFunction>,
_unhandled: Option<ServiceFunction>,
_fallback: Option<ServiceFunction>,
) {
todo!()
}
Expand Down
6 changes: 6 additions & 0 deletions crates/key-manager/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ pub enum KeyManagerError {
#[source]
err: std::io::Error,
},
#[error("Error removing persisted keypair {path:?}: {err}")]
RemoveErrorPersistedKeypair {
path: PathBuf,
#[source]
err: std::io::Error,
},
#[error("Error creating directory for persisted keypairs {path:?}: {err}")]
CreateKeypairsDir {
path: PathBuf,
Expand Down
78 changes: 64 additions & 14 deletions crates/key-manager/src/key_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,29 @@ use std::str::FromStr;
use std::sync::Arc;

use crate::error::KeyManagerError;
use crate::persistence::{load_persisted_keypairs, persist_keypair, PersistedKeypair};
use crate::persistence::{
load_persisted_keypairs, persist_keypair, remove_keypair, PersistedKeypair,
};
use crate::KeyManagerError::{WorkerAlreadyExists, WorkerNotFound, WorkerNotFoundByDeal};
use parking_lot::RwLock;

pub const INSECURE_KEYPAIR_SEED: Range<u8> = 0..32;

type DealId = String;
type WorkerId = PeerId;

#[derive(Clone)]
pub struct WorkerInfo {
pub deal_id: String,
pub creator: PeerId,
}

#[derive(Clone)]
pub struct KeyManager {
/// worker_id -> worker_keypair
worker_keypairs: Arc<RwLock<HashMap<PeerId, KeyPair>>>,
/// deal_id -> worker_id
worker_ids: Arc<RwLock<HashMap<DealId, PeerId>>>,
/// worker_id -> init_peer_id of worker creator
worker_creators: Arc<RwLock<HashMap<PeerId, PeerId>>>,
worker_keypairs: Arc<RwLock<HashMap<WorkerId, KeyPair>>>,
worker_ids: Arc<RwLock<HashMap<DealId, WorkerId>>>,
worker_infos: Arc<RwLock<HashMap<WorkerId, WorkerInfo>>>,
keypairs_dir: PathBuf,
host_peer_id: PeerId,
// temporary public, will refactor
Expand All @@ -58,7 +65,7 @@ impl KeyManager {
let this = Self {
worker_keypairs: Arc::new(Default::default()),
worker_ids: Arc::new(Default::default()),
worker_creators: Arc::new(Default::default()),
worker_infos: Arc::new(Default::default()),
keypairs_dir,
host_peer_id: root_keypair.get_peer_id(),
insecure_keypair: KeyPair::from_secret_key(
Expand All @@ -85,12 +92,20 @@ impl KeyManager {
persisted_kp.private_key_bytes,
KeyFormat::from_str(&persisted_kp.key_format)?,
)?;
let peer_id = keypair.get_peer_id();
let worker_id = keypair.get_peer_id();
self.worker_ids
.write()
.insert(persisted_kp.deal_id, keypair.get_peer_id());
.insert(persisted_kp.deal_id.clone(), keypair.get_peer_id());

self.worker_keypairs.write().insert(peer_id, keypair);
self.worker_keypairs.write().insert(worker_id, keypair);

self.worker_infos.write().insert(
worker_id,
WorkerInfo {
deal_id: persisted_kp.deal_id,
creator: persisted_kp.deal_creator,
},
);
};

if let Err(e) = res {
Expand Down Expand Up @@ -142,14 +157,42 @@ impl KeyManager {
}
}

pub fn get_worker_id(&self, deal_id: String) -> Result<PeerId, KeyManagerError> {
pub fn get_worker_id(
&self,
deal_id: Option<String>,
init_peer_id: PeerId,
) -> Result<PeerId, KeyManagerError> {
// if deal_id is not provided, we associate it with init_peer_id
let deal_id = deal_id.unwrap_or(Self::generate_deal_id(init_peer_id));
self.worker_ids
.read()
.get(&deal_id)
.cloned()
.ok_or(WorkerNotFoundByDeal(deal_id))
}

pub fn get_deal_id(&self, worker_id: PeerId) -> Result<DealId, KeyManagerError> {
self.worker_infos
.read()
.get(&worker_id)
.ok_or_else(|| KeyManagerError::WorkerNotFound(worker_id))
.map(|info| info.deal_id.clone())
}

pub fn remove_worker(&self, worker_id: PeerId) -> Result<(), KeyManagerError> {
let deal_id = self.get_deal_id(worker_id)?;
remove_keypair(&self.keypairs_dir, &deal_id)?;
let removed_worker_id = self.worker_ids.write().remove(&deal_id);
let removed_worker_info = self.worker_infos.write().remove(&worker_id);
let removed_worker_kp = self.worker_keypairs.write().remove(&worker_id);

debug_assert!(removed_worker_id.is_some(), "worker_id does not exist");
debug_assert!(removed_worker_info.is_some(), "worker info does not exist");
debug_assert!(removed_worker_kp.is_some(), "worker kp does not exist");

Ok(())
}

pub fn get_worker_keypair(&self, worker_id: PeerId) -> Result<KeyPair, KeyManagerError> {
if self.is_host(worker_id) {
Ok(self.root_keypair.clone())
Expand All @@ -166,11 +209,12 @@ impl KeyManager {
if self.is_host(worker_id) {
Ok(worker_id)
} else {
self.worker_creators
self.worker_infos
.read()
.get(&worker_id)
.cloned()
.ok_or(WorkerNotFound(worker_id))
.map(|i| i.creator)
}
}

Expand All @@ -189,8 +233,14 @@ impl KeyManager {
PersistedKeypair::new(deal_creator, &keypair, deal_id.clone())?,
)?;
let worker_id = keypair.get_peer_id();
self.worker_ids.write().insert(deal_id, worker_id);
self.worker_creators.write().insert(worker_id, deal_creator);
self.worker_ids.write().insert(deal_id.clone(), worker_id);
self.worker_infos.write().insert(
worker_id,
WorkerInfo {
deal_id,
creator: deal_creator,
},
);
self.worker_keypairs.write().insert(worker_id, keypair);

Ok(())
Expand Down
10 changes: 8 additions & 2 deletions crates/key-manager/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::error::KeyManagerError::{
ReadPersistedKeypair, SerializePersistedKeypair, WriteErrorPersistedKeypair,
};
use crate::KeyManager;
use crate::KeyManagerError::RemoveErrorPersistedKeypair;
use fluence_keypair::KeyPair;
use fluence_libp2p::peerid_serializer;
use libp2p::PeerId;
Expand Down Expand Up @@ -54,8 +55,8 @@ impl PersistedKeypair {
}
}

pub fn keypair_file_name(remote_peer_id: &str) -> String {
format!("{remote_peer_id}_keypair.toml")
pub fn keypair_file_name(deal_id: &str) -> String {
format!("{deal_id}_keypair.toml")
}

pub fn is_keypair(path: &Path) -> bool {
Expand Down Expand Up @@ -117,3 +118,8 @@ pub fn load_persisted_keypairs(
})
.collect()
}

pub fn remove_keypair(keypairs_dir: &Path, deal_id: &str) -> Result<(), KeyManagerError> {
let path = keypairs_dir.join(keypair_file_name(deal_id));
std::fs::remove_file(path.clone()).map_err(|err| RemoveErrorPersistedKeypair { path, err })
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ fn explore_services() {
}

#[test]
fn explore_services_fixed() {
fn explore_services_fixed_flaky() {
let swarms = make_swarms(5);
sleep(KAD_TIMEOUT);

Expand Down
80 changes: 79 additions & 1 deletion crates/particle-node-tests/tests/spells.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use created_swarm::{make_swarms, make_swarms_with_builtins};
use fluence_spell_dtos::trigger_config::TriggerConfig;
use service_modules::load_module;
use spell_event_bus::api::{TriggerInfo, TriggerInfoAqua, MAX_PERIOD_SEC};
use test_utils::create_service;
use test_utils::{create_service, create_service_worker};

type SpellId = String;
type WorkerPeerId = String;
Expand Down Expand Up @@ -1476,3 +1476,81 @@ fn spell_create_worker_same_deal_id_different_peer() {
let error_msg = response[0].as_str().unwrap().to_string();
assert!(error_msg.contains("Worker for deal_id already exists"));
}

#[test]
fn create_remove_worker() {
let swarms = make_swarms(1);
let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.wrap_err("connect client")
.unwrap();

let script = r#"(call %init_peer_id% ("getDataSrv" "spell_id") [] spell_id)"#;
let mut config = TriggerConfig::default();
config.clock.period_sec = 0;
config.clock.start_sec = 1;

let (spell_id, worker_id) = create_spell(&mut client, &script, config, json!({}));
let service = create_service_worker(
&mut client,
"file_share",
load_module("tests/file_share/artifacts", "file_share").expect("load module"),
worker_id.clone(),
);
let data = hashmap! {
"client" => json!(client.peer_id.to_string()),
"relay" => json!(client.node.to_string()),
"worker_id" => json!(worker_id.clone()),
"spell_id" => json!(spell_id.clone()),
"srv_id" => json!(service.id.clone()),
};
client.send_particle(
r#"
(xor
(seq
(seq
(call relay ("op" "noop") [])
(call worker_id ("srv" "list") [] before)
)
(seq
(seq
(call relay ("worker" "remove") [worker_id])
(xor
(call relay ("srv" "info") [spell_id] info1)
(call relay ("op" "identity") [%last_error%.$.message] err1)
)
)
(seq
(xor
(call relay ("srv" "info") [srv_id] info2)
(call relay ("op" "identity") [%last_error%.$.message] err2)
)
(call client ("return" "") [before err1 err2])
)
)
)
(call client ("return" "") [%last_error%.$.message])
)
"#,
data.clone(),
);

if let [JValue::Array(before), JValue::String(spell_err), JValue::String(srv_err)] = client
.receive_args()
.wrap_err("receive")
.unwrap()
.as_slice()
{
assert_eq!(before.len(), 2);

let before: Vec<String> = before
.into_iter()
.map(|s| s.get("id").unwrap().as_str().unwrap().to_string())
.collect();
assert!(before.contains(&spell_id));
assert!(before.contains(&service.id));
assert!(spell_err.contains(&format!("Service with id '{spell_id}' not found")));
assert!(srv_err.contains(&format!("Service with id '{}' not found", service.id)));
} else {
panic!("expected one string result")
}
}
12 changes: 11 additions & 1 deletion crates/test-utils/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ pub fn create_service(
client: &mut ConnectedClient,
module_name: &str,
module_bytes: Vec<u8>,
) -> CreatedService {
create_service_worker(client, module_name, module_bytes, client.node.to_string())
}

pub fn create_service_worker(
client: &mut ConnectedClient,
module_name: &str,
module_bytes: Vec<u8>,
worker_id: String,
) -> CreatedService {
let script = f!(r#"
(seq
Expand All @@ -43,7 +52,7 @@ pub fn create_service(
(call relay ("dist" "add_blueprint") [blueprint] blueprint_id)
)
(seq
(call relay ("srv" "create") [blueprint_id] service_id)
(call worker_id ("srv" "create") [blueprint_id] service_id)
(call client ("return" "") [service_id] client_result)
)
)
Expand All @@ -53,6 +62,7 @@ pub fn create_service(
let data = hashmap! {
"client" => json!(client.peer_id.to_string()),
"relay" => json!(client.node.to_string()),
"worker_id" => json!(worker_id),
"module_name" => json!(module_name),
"module_bytes" => json!(base64.encode(module_bytes)),
"name" => json!("blueprint"),
Expand Down
Loading

0 comments on commit 97f552f

Please sign in to comment.