
feat(worker): add worker.remove [fixes NET-354 NET-376] #1499

Merged · 14 commits · Mar 14, 2023
1 change: 1 addition & 0 deletions Cargo.lock (generated file, diff not rendered)

6 changes: 6 additions & 0 deletions crates/key-manager/src/error.rs
@@ -45,6 +45,12 @@ pub enum KeyManagerError {
#[source]
err: std::io::Error,
},
#[error("Error removing persisted keypair {path:?}: {err}")]
RemoveErrorPersistedKeypair {
path: PathBuf,
#[source]
err: std::io::Error,
},
#[error("Error creating directory for persisted keypairs {path:?}: {err}")]
CreateKeypairsDir {
path: PathBuf,
74 changes: 60 additions & 14 deletions crates/key-manager/src/key_manager.rs
@@ -23,22 +23,29 @@ use std::str::FromStr;
use std::sync::Arc;

use crate::error::KeyManagerError;
use crate::persistence::{load_persisted_keypairs, persist_keypair, PersistedKeypair};
use crate::persistence::{
load_persisted_keypairs, persist_keypair, remove_keypair, PersistedKeypair,
};
use crate::KeyManagerError::{WorkerAlreadyExists, WorkerNotFound, WorkerNotFoundByDeal};
use parking_lot::RwLock;

pub const INSECURE_KEYPAIR_SEED: Range<u8> = 0..32;

type DealId = String;
type WorkerId = PeerId;

#[derive(Clone)]
pub struct WorkerInfo {
pub deal_id: String,
pub creator: PeerId,
}

#[derive(Clone)]
pub struct KeyManager {
/// worker_id -> worker_keypair
worker_keypairs: Arc<RwLock<HashMap<PeerId, KeyPair>>>,
/// deal_id -> worker_id
worker_ids: Arc<RwLock<HashMap<DealId, PeerId>>>,
/// worker_id -> init_peer_id of worker creator
worker_creators: Arc<RwLock<HashMap<PeerId, PeerId>>>,
worker_keypairs: Arc<RwLock<HashMap<WorkerId, KeyPair>>>,
worker_ids: Arc<RwLock<HashMap<DealId, WorkerId>>>,
worker_infos: Arc<RwLock<HashMap<WorkerId, WorkerInfo>>>,
keypairs_dir: PathBuf,
host_peer_id: PeerId,
// temporary public, will refactor
@@ -58,7 +65,7 @@ impl KeyManager {
let this = Self {
worker_keypairs: Arc::new(Default::default()),
worker_ids: Arc::new(Default::default()),
worker_creators: Arc::new(Default::default()),
worker_infos: Arc::new(Default::default()),
keypairs_dir,
host_peer_id: root_keypair.get_peer_id(),
insecure_keypair: KeyPair::from_secret_key(
@@ -85,12 +92,20 @@
persisted_kp.private_key_bytes,
KeyFormat::from_str(&persisted_kp.key_format)?,
)?;
let peer_id = keypair.get_peer_id();
let worker_id = keypair.get_peer_id();
self.worker_ids
.write()
.insert(persisted_kp.deal_id, keypair.get_peer_id());
.insert(persisted_kp.deal_id.clone(), keypair.get_peer_id());

self.worker_keypairs.write().insert(peer_id, keypair);
self.worker_keypairs.write().insert(worker_id, keypair);

self.worker_infos.write().insert(
worker_id,
WorkerInfo {
deal_id: persisted_kp.deal_id,
creator: persisted_kp.deal_creator,
},
);
};

if let Err(e) = res {
@@ -142,14 +157,38 @@
}
}

pub fn get_worker_id(&self, deal_id: String) -> Result<PeerId, KeyManagerError> {
pub fn get_worker_id(
&self,
deal_id: Option<String>,
init_peer_id: PeerId,
) -> Result<PeerId, KeyManagerError> {
// if deal_id is not provided, we associate it with init_peer_id
let deal_id = deal_id.unwrap_or(Self::generate_deal_id(init_peer_id));
self.worker_ids
.read()
.get(&deal_id)
.cloned()
.ok_or(WorkerNotFoundByDeal(deal_id))
}

pub fn get_deal_id(&self, worker_id: PeerId) -> Result<DealId, KeyManagerError> {
self.worker_infos
.read()
.get(&worker_id)
.ok_or_else(|| KeyManagerError::WorkerNotFound(worker_id))
.map(|info| info.deal_id.clone())
}

pub fn remove_worker(&self, worker_id: PeerId) -> Result<(), KeyManagerError> {
let deal_id = self.get_deal_id(worker_id)?;
remove_keypair(&self.keypairs_dir, &deal_id)?;
self.worker_ids.write().remove(&deal_id);
self.worker_infos.write().remove(&worker_id);
self.worker_keypairs.write().remove(&worker_id);

Ok(())
}

pub fn get_worker_keypair(&self, worker_id: PeerId) -> Result<KeyPair, KeyManagerError> {
if self.is_host(worker_id) {
Ok(self.root_keypair.clone())
@@ -166,11 +205,12 @@
if self.is_host(worker_id) {
Ok(worker_id)
} else {
self.worker_creators
self.worker_infos
.read()
.get(&worker_id)
.cloned()
.ok_or(WorkerNotFound(worker_id))
.map(|i| i.creator)
}
}

@@ -189,8 +229,14 @@
PersistedKeypair::new(deal_creator, &keypair, deal_id.clone())?,
)?;
let worker_id = keypair.get_peer_id();
self.worker_ids.write().insert(deal_id, worker_id);
self.worker_creators.write().insert(worker_id, deal_creator);
self.worker_ids.write().insert(deal_id.clone(), worker_id);
self.worker_infos.write().insert(
worker_id,
WorkerInfo {
deal_id,
creator: deal_creator,
},
);
self.worker_keypairs.write().insert(worker_id, keypair);

Ok(())
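Taken together, the new WorkerInfo map and remove_worker give KeyManager a full create/look-up/remove lifecycle for worker keypairs. A minimal sketch of that flow follows; create_worker's signature (an optional deal id plus the creator's PeerId, returning the new worker id) is only inferred from the builtin removed at the end of this diff, and the crate paths are guesses.

// Sketch only: crate paths and create_worker's exact signature are inferred from this diff.
use key_manager::{KeyManager, KeyManagerError};
use libp2p::PeerId;

fn worker_lifecycle(km: &KeyManager, creator: PeerId) -> Result<(), KeyManagerError> {
    // Create a worker bound to an explicit deal id; its keypair is persisted on disk.
    let worker_id = km.create_worker(Some("deal-123".to_string()), creator)?;

    // Lookups resolve through worker_ids (deal_id -> worker_id)
    // and worker_infos (worker_id -> WorkerInfo { deal_id, creator }).
    assert_eq!(km.get_worker_id(Some("deal-123".to_string()), creator)?, worker_id);
    assert_eq!(km.get_deal_id(worker_id)?, "deal-123");

    // remove_worker deletes the persisted keypair file and clears all three maps,
    // so later lookups fail with WorkerNotFoundByDeal / WorkerNotFound
    // (assuming worker_id is not the host peer id).
    km.remove_worker(worker_id)?;
    assert!(km.get_worker_id(Some("deal-123".to_string()), creator).is_err());
    assert!(km.get_worker_keypair(worker_id).is_err());
    Ok(())
}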
10 changes: 8 additions & 2 deletions crates/key-manager/src/persistence.rs
@@ -22,6 +22,7 @@ use crate::error::KeyManagerError::{
ReadPersistedKeypair, SerializePersistedKeypair, WriteErrorPersistedKeypair,
};
use crate::KeyManager;
use crate::KeyManagerError::RemoveErrorPersistedKeypair;
use fluence_keypair::KeyPair;
use fluence_libp2p::peerid_serializer;
use libp2p::PeerId;
@@ -54,8 +55,8 @@ impl PersistedKeypair {
}
}

pub fn keypair_file_name(remote_peer_id: &str) -> String {
format!("{remote_peer_id}_keypair.toml")
pub fn keypair_file_name(deal_id: &str) -> String {
format!("{deal_id}_keypair.toml")
}

pub fn is_keypair(path: &Path) -> bool {
@@ -117,3 +118,8 @@ pub fn load_persisted_keypairs(
})
.collect()
}

pub fn remove_keypair(keypairs_dir: &Path, deal_id: &str) -> Result<(), KeyManagerError> {
let path = keypairs_dir.join(keypair_file_name(deal_id));
std::fs::remove_file(path.clone()).map_err(|err| RemoveErrorPersistedKeypair { path, err })
}
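
One consequence of the new helper: remove_keypair propagates the underlying io::Error, so removing a worker whose keypair file is already gone fails with RemoveErrorPersistedKeypair instead of being a silent no-op, and remove_worker surfaces that error. A sketch of a test capturing this, assuming the persistence module and the error variant are publicly reachable at the paths shown:

use std::io::ErrorKind;

use key_manager::persistence::remove_keypair; // path assumed
use key_manager::KeyManagerError;

#[test]
fn remove_missing_keypair_is_an_error() {
    // A directory that does not exist, so no keypair file can be found under it.
    let dir = std::env::temp_dir().join("no-such-keypairs-dir");
    match remove_keypair(&dir, "deal-123") {
        Err(KeyManagerError::RemoveErrorPersistedKeypair { err, .. }) => {
            assert_eq!(err.kind(), ErrorKind::NotFound)
        }
        _ => panic!("expected RemoveErrorPersistedKeypair"),
    }
}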
@@ -314,7 +314,7 @@ fn explore_services() {
}

#[test]
fn explore_services_fixed() {
fn explore_services_fixed_flaky() {
let swarms = make_swarms(5);
sleep(KAD_TIMEOUT);

81 changes: 80 additions & 1 deletion crates/particle-node-tests/tests/spells.rs
@@ -27,7 +27,7 @@ use created_swarm::{make_swarms, make_swarms_with_builtins};
use fluence_spell_dtos::trigger_config::TriggerConfig;
use service_modules::load_module;
use spell_event_bus::api::{TriggerInfo, TriggerInfoAqua, MAX_PERIOD_SEC};
use test_utils::create_service;
use test_utils::{create_service, create_service_worker};

type SpellId = String;
type WorkerPeerId = String;
@@ -1476,3 +1476,82 @@ fn spell_create_worker_same_deal_id_different_peer() {
let error_msg = response[0].as_str().unwrap().to_string();
assert!(error_msg.contains("Worker for deal_id already exists"));
}

#[test]
fn create_remove_worker() {
let swarms = make_swarms(1);
let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.wrap_err("connect client")
.unwrap();

let script = r#"(call %init_peer_id% ("getDataSrv" "spell_id") [] spell_id)"#;
let mut config = TriggerConfig::default();
config.clock.period_sec = 0;
config.clock.start_sec = 1;

let (spell_id, worker_id) = create_spell(&mut client, &script, config, json!({}));
let service = create_service_worker(
&mut client,
"file_share",
load_module("tests/file_share/artifacts", "file_share").expect("load module"),
worker_id.clone(),
);
let data = hashmap! {
"client" => json!(client.peer_id.to_string()),
"relay" => json!(client.node.to_string()),
"worker_id" => json!(worker_id.clone()),
"spell_id" => json!(spell_id.clone()),
"srv_id" => json!(service.id.clone()),
};
client.send_particle(
r#"
(xor
(seq
(seq
(call relay ("op" "noop") [])
(call worker_id ("srv" "list") [] before)
)
(seq
(seq
(call relay ("worker" "remove") [worker_id])
(xor
(call relay ("srv" "info") [spell_id] info1)
(call relay ("op" "identity") [%last_error%.$.message] err1)
)
)
(seq
(xor
(call relay ("srv" "info") [srv_id] info2)
(call relay ("op" "identity") [%last_error%.$.message] err2)
)
(call client ("return" "") [before err1 err2])
)
)
)
(call client ("return" "") [%last_error%.$.message])
)
"#,
data.clone(),
);

if let [JValue::Array(before), JValue::String(spell_err), JValue::String(srv_err)] = client
.receive_args()
.wrap_err("receive")
.unwrap()
.as_slice()
{
assert_eq!(before.len(), 2);
log::info!("RESULT {:?}", before);

let before: Vec<String> = before
.into_iter()
.map(|s| s.get("id").unwrap().as_str().unwrap().to_string())
.collect();
assert!(before.contains(&spell_id));
assert!(before.contains(&service.id));
assert!(spell_err.contains(&format!("Service with id '{spell_id}' not found")));
assert!(srv_err.contains(&format!("Service with id '{}' not found", service.id)));
} else {
panic!("expected an array and two strings")
}
}
12 changes: 11 additions & 1 deletion crates/test-utils/src/service.rs
@@ -30,6 +30,15 @@ pub fn create_service(
client: &mut ConnectedClient,
module_name: &str,
module_bytes: Vec<u8>,
) -> CreatedService {
create_service_worker(client, module_name, module_bytes, client.node.to_string())
}

pub fn create_service_worker(
client: &mut ConnectedClient,
module_name: &str,
module_bytes: Vec<u8>,
worker_id: String,
) -> CreatedService {
let script = f!(r#"
(seq
@@ -43,7 +52,7 @@
(call relay ("dist" "add_blueprint") [blueprint] blueprint_id)
)
(seq
(call relay ("srv" "create") [blueprint_id] service_id)
(call worker_id ("srv" "create") [blueprint_id] service_id)
(call client ("return" "") [service_id] client_result)
)
)
@@ -53,6 +62,7 @@
let data = hashmap! {
"client" => json!(client.peer_id.to_string()),
"relay" => json!(client.node.to_string()),
"worker_id" => json!(worker_id),
"module_name" => json!(module_name),
"module_bytes" => json!(base64.encode(module_bytes)),
"name" => json!("blueprint"),
23 changes: 0 additions & 23 deletions particle-builtins/src/builtins.rs
@@ -266,9 +266,6 @@ where
("json", "obj_pairs") => unary(args, |vs: Vec<(String, JValue)>| -> R<JValue, _> { json::obj_from_pairs(vs) }),
("json", "puts_pairs") => binary(args, |obj: JValue, vs: Vec<(String, JValue)>| -> R<JValue, _> { json::puts_from_pairs(obj, vs) }),

("worker", "create") => wrap(self.create_worker(args, particle)),
("worker", "get_peer_id") => wrap(self.get_worker_peer_id(args, particle)),

("run-console", "print") => wrap_unit(Ok(log::debug!(target: "run-console", "{}", json!(args.function_args)))),

_ => FunctionOutcome::NotDefined { args, params: particle },
@@ -1028,26 +1025,6 @@
self.key_manager.insecure_keypair.get_peer_id().to_base58(),
))
}

fn create_worker(&self, args: Args, params: ParticleParams) -> Result<JValue, JError> {
let mut args = args.function_args.into_iter();
let deal_id: Option<String> = Args::next_opt("deal_id", &mut args)?;
Ok(JValue::String(
self.key_manager
.create_worker(deal_id, params.init_peer_id)?
.to_base58(),
))
}

fn get_worker_peer_id(&self, args: Args, params: ParticleParams) -> Result<JValue, JError> {
let mut args = args.function_args.into_iter();
let deal_id: Option<String> = Args::next_opt("deal_id", &mut args)?;
let deal_id = deal_id.unwrap_or(KeyManager::generate_deal_id(params.init_peer_id));

Ok(JValue::String(
self.key_manager.get_worker_id(deal_id)?.to_base58(),
))
}
}

fn make_module_config(args: Args) -> Result<JValue, JError> {
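The create_worker and get_worker_peer_id handlers leave particle-builtins here, and this excerpt does not show where the worker builtins, including the ("worker" "remove") call exercised in the test above, are registered instead. Purely as a hypothetical sketch, a remove handler modeled on the removed create_worker could look like the following; Args::next, JError::new, and the crate paths are assumptions, not confirmed API.

// Hypothetical sketch only; modeled on the removed create_worker handler above.
use key_manager::KeyManager; // crate paths are guesses
use libp2p::PeerId;
use particle_args::{Args, JError};

fn remove_worker(key_manager: &KeyManager, args: Args) -> Result<(), JError> {
    let mut args = args.function_args.into_iter();
    // First positional argument: the worker's peer id, base58-encoded.
    // Args::next is assumed to exist alongside the Args::next_opt used above.
    let worker_id: String = Args::next("worker_id", &mut args)?;
    let worker_id: PeerId = worker_id
        .parse()
        .map_err(|e| JError::new(format!("invalid worker_id: {e}")))?; // JError::new is assumed
    key_manager.remove_worker(worker_id)?;
    Ok(())
}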