Skip to content

Commit

Permalink
feat: add worker.create and worker.get_peer_id (#1475)
Browse files Browse the repository at this point in the history
  • Loading branch information
justprosh committed Feb 21, 2023
1 parent 8d21afa commit ddc2f90
Show file tree
Hide file tree
Showing 13 changed files with 408 additions and 181 deletions.
2 changes: 1 addition & 1 deletion aquamarine/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct Actor<RT, F> {
/// Particle's data is empty.
particle: Particle,
/// Particles and call results will be processed in the security scope of this peer id
/// It's either `host_peer_id` or owner-specific spell peer id
/// It's either `host_peer_id` or local worker peer id
current_peer_id: PeerId,
}

Expand Down
24 changes: 14 additions & 10 deletions aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
&mut self,
particle: Particle,
function: Option<ServiceFunction>,
scope_peer_id: PeerId,
worker_id: PeerId,
) {
self.wake();

Expand All @@ -92,11 +92,11 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
let builtins = &self.builtins;
let actor = self
.actors
.entry((particle.id.clone(), scope_peer_id))
.entry((particle.id.clone(), worker_id))
.or_insert_with(|| {
let params = ParticleParams::clone_from(&particle, scope_peer_id);
let params = ParticleParams::clone_from(&particle, worker_id);
let functions = Functions::new(params, builtins.clone());
Actor::new(&particle, functions, scope_peer_id)
Actor::new(&particle, functions, worker_id)
});

actor.ingest(particle);
Expand Down Expand Up @@ -139,11 +139,11 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
for actor in self.actors.values_mut() {
if let Poll::Ready(result) = actor.poll_completed(cx) {
interpretation_stats.push(result.stats);
let (local_peers, remote_peers): (Vec<_>, Vec<_>) =
result.effects.next_peers.into_iter().partition(|p| {
key_manager.is_scope_peer_id(*p)
|| p.eq(&self.key_manager.get_host_peer_id())
});
let (local_peers, remote_peers): (Vec<_>, Vec<_>) = result
.effects
.next_peers
.into_iter()
.partition(|p| key_manager.is_local(*p));

if !remote_peers.is_empty() {
remote_effects.push(RoutingEffects {
Expand Down Expand Up @@ -375,7 +375,11 @@ mod tests {
// Pool is of size 1 so it's easier to control tests
let vm_pool = VmPool::new(1, (), None);
let builtin_mock = Arc::new(MockF);
let key_manager = KeyManager::new("keypair".into(), RandomPeerId::random());
let key_manager = KeyManager::new(
"keypair".into(),
RandomPeerId::random(),
RandomPeerId::random(),
);
Plumber::new(vm_pool, builtin_mock, None, key_manager)
}

Expand Down
12 changes: 7 additions & 5 deletions crates/key-manager/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::path::PathBuf;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum PersistedKeypairError {
pub enum KeyManagerError {
#[error("Failed to persist keypair: RSA is not supported")]
CannotExtractRSASecretKey,
#[error("Error reading persisted keypair from {path:?}: {err}")]
Expand Down Expand Up @@ -51,10 +51,12 @@ pub enum PersistedKeypairError {
#[source]
err: std::io::Error,
},
}

#[derive(Debug, Error)]
pub enum KeyManagerError {
#[error("Keypair for peer_id {0} not found")]
KeypairNotFound(PeerId),
#[error("Worker for {deal_id} already exists")]
WorkerAlreadyExists { deal_id: String },
#[error("Worker for deal_id {0} not found")]
WorkerNotFoundByDeal(String),
#[error("Worker {0} not found")]
WorkerNotFound(PeerId),
}
127 changes: 80 additions & 47 deletions crates/key-manager/src/key_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,44 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;

use crate::error::{KeyManagerError, PersistedKeypairError};
use crate::error::KeyManagerError;
use crate::persistence::{load_persisted_keypairs, persist_keypair, PersistedKeypair};
use crate::KeyManagerError::{WorkerAlreadyExists, WorkerNotFound, WorkerNotFoundByDeal};
use parking_lot::RwLock;

pub const INSECURE_KEYPAIR_SEED: Range<u8> = 0..32;

type DealId = String;

#[derive(Clone)]
pub struct KeyManager {
/// scope_peer_id -> scope_keypair
scope_keypairs: Arc<RwLock<HashMap<PeerId, KeyPair>>>,
/// remote_peer_id -> scope_peer_id
scope_peer_ids: Arc<RwLock<HashMap<PeerId, PeerId>>>,
/// worker_id -> worker_keypair
worker_keypairs: Arc<RwLock<HashMap<PeerId, KeyPair>>>,
/// deal_id -> worker_id
worker_ids: Arc<RwLock<HashMap<DealId, PeerId>>>,
/// worker_id -> init_peer_id of worker creator
worker_creators: Arc<RwLock<HashMap<PeerId, PeerId>>>,
keypairs_dir: PathBuf,
host_peer_id: PeerId,
// temporary public, will refactor
pub insecure_keypair: KeyPair,
management_peer_id: PeerId,
}

impl KeyManager {
pub fn new(keypairs_dir: PathBuf, host_peer_id: PeerId) -> Self {
pub fn new(keypairs_dir: PathBuf, host_peer_id: PeerId, management_peer_id: PeerId) -> Self {
let this = Self {
scope_keypairs: Arc::new(Default::default()),
scope_peer_ids: Arc::new(Default::default()),
worker_keypairs: Arc::new(Default::default()),
worker_ids: Arc::new(Default::default()),
worker_creators: Arc::new(Default::default()),
keypairs_dir,
host_peer_id,
insecure_keypair: KeyPair::from_secret_key(
INSECURE_KEYPAIR_SEED.collect(),
KeyFormat::Ed25519,
)
.expect("error creating insecure keypair"),
management_peer_id,
};

this.load_persisted_keypairs();
Expand All @@ -69,11 +77,11 @@ impl KeyManager {
KeyFormat::from_str(&persisted_kp.key_format)?,
)?;
let peer_id = keypair.get_peer_id();
self.scope_peer_ids
self.worker_ids
.write()
.insert(persisted_kp.remote_peer_id, keypair.get_peer_id());
.insert(persisted_kp.deal_id, keypair.get_peer_id());

self.scope_keypairs.write().insert(peer_id, keypair);
self.worker_keypairs.write().insert(peer_id, keypair);
};

if let Err(e) = res {
Expand All @@ -82,45 +90,71 @@ impl KeyManager {
}
}

pub fn get_host_peer_id(&self) -> PeerId {
self.host_peer_id
pub fn is_local(&self, peer_id: PeerId) -> bool {
self.is_host(peer_id) || self.is_worker(peer_id)
}

pub fn has_keypair(&self, remote_peer_id: PeerId) -> bool {
self.scope_peer_ids.read().contains_key(&remote_peer_id)
pub fn is_host(&self, peer_id: PeerId) -> bool {
self.host_peer_id == peer_id
}

pub fn is_worker(&self, peer_id: PeerId) -> bool {
self.worker_keypairs.read().contains_key(&peer_id)
}

pub fn is_management(&self, peer_id: PeerId) -> bool {
self.management_peer_id == peer_id
}

pub fn get_host_peer_id(&self) -> PeerId {
self.host_peer_id
}

pub fn is_scope_peer_id(&self, scope_peer_id: PeerId) -> bool {
self.scope_keypairs.read().contains_key(&scope_peer_id)
pub fn generate_deal_id(init_peer_id: PeerId) -> String {
format!("direct_hosting_{init_peer_id}")
}

/// For local peer ids is identity,
/// for remote returns associated peer id or generate a new one.
pub fn get_scope_peer_id(&self, init_peer_id: PeerId) -> Result<PeerId, PersistedKeypairError> {
// All "nested" spells share the same keypair.
// "nested" means spells which are created by other spells
if self.is_scope_peer_id(init_peer_id) {
Ok(init_peer_id)
} else {
let scope_peer_id = self.scope_peer_ids.read().get(&init_peer_id).cloned();
match scope_peer_id {
Some(p) => Ok(p),
_ => {
let kp = self.generate_keypair();
let scope_peer_id = kp.get_peer_id();
self.store_keypair(init_peer_id, kp)?;
Ok(scope_peer_id)
}
pub fn create_worker(
&self,
deal_id: Option<String>,
init_peer_id: PeerId,
) -> Result<PeerId, KeyManagerError> {
// if deal_id is not provided, we associate it with init_peer_id
let deal_id = deal_id.unwrap_or(Self::generate_deal_id(init_peer_id));
let worker_id = self.worker_ids.read().get(&deal_id).cloned();
match worker_id {
Some(_) => Err(WorkerAlreadyExists { deal_id }),
_ => {
let kp = self.generate_keypair();
let worker_id = kp.get_peer_id();
self.store_keypair(deal_id, init_peer_id, kp)?;
Ok(worker_id)
}
}
}

pub fn get_scope_keypair(&self, scope_peer_id: PeerId) -> Result<KeyPair, KeyManagerError> {
self.scope_keypairs
pub fn get_worker_id(&self, deal_id: String) -> Result<PeerId, KeyManagerError> {
self.worker_ids
.read()
.get(&deal_id)
.cloned()
.ok_or(WorkerNotFoundByDeal(deal_id))
}

pub fn get_worker_keypair(&self, worker_id: PeerId) -> Result<KeyPair, KeyManagerError> {
self.worker_keypairs
.read()
.get(&scope_peer_id)
.get(&worker_id)
.cloned()
.ok_or(KeyManagerError::KeypairNotFound(scope_peer_id))
.ok_or(KeyManagerError::KeypairNotFound(worker_id))
}

pub fn get_worker_creator(&self, worker_id: PeerId) -> Result<PeerId, KeyManagerError> {
self.worker_creators
.read()
.get(&worker_id)
.cloned()
.ok_or(WorkerNotFound(worker_id))
}

pub fn generate_keypair(&self) -> KeyPair {
Expand All @@ -129,19 +163,18 @@ impl KeyManager {

pub fn store_keypair(
&self,
remote_peer_id: PeerId,
deal_id: DealId,
deal_creator: PeerId,
keypair: KeyPair,
) -> Result<(), PersistedKeypairError> {
) -> Result<(), KeyManagerError> {
persist_keypair(
&self.keypairs_dir,
PersistedKeypair::new(remote_peer_id, &keypair)?,
PersistedKeypair::new(deal_creator, &keypair, deal_id.clone())?,
)?;
let scope_peer_id = keypair.get_peer_id();
self.scope_peer_ids
.write()
.insert(remote_peer_id, scope_peer_id);

self.scope_keypairs.write().insert(scope_peer_id, keypair);
let worker_id = keypair.get_peer_id();
self.worker_ids.write().insert(deal_id, worker_id);
self.worker_creators.write().insert(worker_id, deal_creator);
self.worker_keypairs.write().insert(worker_id, keypair);

Ok(())
}
Expand Down
24 changes: 14 additions & 10 deletions crates/key-manager/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

use fs_utils::{create_dir, list_files};

use crate::error::PersistedKeypairError;
use crate::error::PersistedKeypairError::{
use crate::error::KeyManagerError;
use crate::error::KeyManagerError::{
CannotExtractRSASecretKey, CreateKeypairsDir, DeserializePersistedKeypair,
ReadPersistedKeypair, SerializePersistedKeypair, WriteErrorPersistedKeypair,
};
Expand All @@ -30,17 +30,23 @@ use std::path::Path;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PersistedKeypair {
#[serde(with = "peerid_serializer")]
pub remote_peer_id: PeerId,
pub deal_creator: PeerId,
pub private_key_bytes: Vec<u8>,
pub key_format: String,
pub deal_id: String,
}

impl PersistedKeypair {
pub fn new(owner_id: PeerId, keypair: &KeyPair) -> Result<Self, PersistedKeypairError> {
pub fn new(
deal_creator: PeerId,
keypair: &KeyPair,
deal_id: String,
) -> Result<Self, KeyManagerError> {
Ok(Self {
remote_peer_id: owner_id,
deal_creator,
private_key_bytes: keypair.secret().map_err(|_| CannotExtractRSASecretKey)?,
key_format: keypair.public().get_key_format().into(),
deal_id,
})
}
}
Expand All @@ -59,10 +65,8 @@ pub fn is_keypair(path: &Path) -> bool {
pub fn persist_keypair(
keypairs_dir: &Path,
persisted_keypair: PersistedKeypair,
) -> Result<(), PersistedKeypairError> {
let path = keypairs_dir.join(keypair_file_name(
&persisted_keypair.remote_peer_id.to_base58(),
));
) -> Result<(), KeyManagerError> {
let path = keypairs_dir.join(keypair_file_name(&persisted_keypair.deal_id));
let bytes =
toml::to_vec(&persisted_keypair).map_err(|err| SerializePersistedKeypair { err })?;
std::fs::write(&path, bytes).map_err(|err| WriteErrorPersistedKeypair { path, err })
Expand All @@ -71,7 +75,7 @@ pub fn persist_keypair(
/// Load info about persisted keypairs from disk
pub fn load_persisted_keypairs(
keypairs_dir: &Path,
) -> Vec<Result<PersistedKeypair, PersistedKeypairError>> {
) -> Vec<Result<PersistedKeypair, KeyManagerError>> {
// Load all persisted service file names
let files = match list_files(keypairs_dir) {
Some(files) => files,
Expand Down
7 changes: 1 addition & 6 deletions crates/particle-node-tests/tests/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1633,12 +1633,7 @@ fn json_builtins() {
#[test]
fn insecure_sign_verify() {
let kp = KeyPair::from_secret_key(INSECURE_KEYPAIR_SEED.collect(), KeyFormat::Ed25519).unwrap();
let swarms = make_swarms_with_builtins(
1,
"tests/builtins/services".as_ref(),
Some(kp.clone()),
None,
);
let swarms = make_swarms_with_builtins(1, "tests/builtins/services".as_ref(), None, None);

let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.wrap_err("connect client")
Expand Down
Loading

0 comments on commit ddc2f90

Please sign in to comment.