wip refactoring
justprosh committed Feb 13, 2023
1 parent be32509 commit a1aac28
Showing 10 changed files with 60 additions and 80 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

10 changes: 5 additions & 5 deletions aquamarine/src/plumber.rs
@@ -139,11 +139,11 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
         for actor in self.actors.values_mut() {
             if let Poll::Ready(result) = actor.poll_completed(cx) {
                 interpretation_stats.push(result.stats);
-                let (local_peers, remote_peers): (Vec<_>, Vec<_>) =
-                    result.effects.next_peers.into_iter().partition(|p| {
-                        key_manager.is_scope_peer_id(*p)
-                            || p.eq(&self.key_manager.get_host_peer_id())
-                    });
+                let (local_peers, remote_peers): (Vec<_>, Vec<_>) = result
+                    .effects
+                    .next_peers
+                    .into_iter()
+                    .partition(|p| key_manager.is_local(*p));

                 if !remote_peers.is_empty() {
                     remote_effects.push(RoutingEffects {
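Note: the new predicate folds the old two-clause check (known scope peer id, or the host's own peer id) into a single `KeyManager::is_local` call (see key_manager.rs below). A minimal, self-contained sketch of those semantics, with `u64` ids and a `HashSet` standing in for libp2p `PeerId` and the manager's worker map:

```rust
use std::collections::HashSet;

// Sketch of the is_local semantics this hunk relies on; u64 ids and a
// HashSet stand in for libp2p PeerId and KeyManager's internal state.
fn is_local(host: u64, workers: &HashSet<u64>, peer: u64) -> bool {
    // Equivalent to the old `is_scope_peer_id(p) || p == host_peer_id`.
    peer == host || workers.contains(&peer)
}

fn main() {
    let workers = HashSet::from([20, 30]);
    let host = 10;
    // partition preserves order: locally handled peers land in the first
    // slot, peers that need network routing in the second.
    let (local, remote): (Vec<_>, Vec<_>) = vec![10, 20, 99]
        .into_iter()
        .partition(|p| is_local(host, &workers, *p));
    assert_eq!(local, vec![10, 20]);
    assert_eq!(remote, vec![99]);
}
```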
1 change: 1 addition & 0 deletions crates/key-manager/Cargo.toml
@@ -9,6 +9,7 @@ edition = "2021"
 fs-utils = { workspace = true }
 fluence-libp2p = { workspace = true}
 fluence-keypair = { workspace = true }
+server-config = { workspace = true }

 parking_lot = { workspace = true }
 eyre = { workspace = true }
59 changes: 37 additions & 22 deletions crates/key-manager/src/key_manager.rs
@@ -26,29 +26,43 @@ use crate::error::{KeyManagerError, PersistedKeypairError};
 use crate::persistence::{load_persisted_keypairs, persist_keypair, PersistedKeypair};
 use parking_lot::RwLock;

-pub const INSECURE_KEYPAIR_SEED: Range<u8> = 0..32;
+pub const INSECURE_KEYPAIR_SEED: Range<u8> = 0..32;

 #[derive(Clone)]
 pub struct KeyManager {
-    /// scope_peer_id -> scope_keypair
-    scope_keypairs: Arc<RwLock<HashMap<PeerId, KeyPair>>>,
-    /// remote_peer_id -> scope_peer_id
-    scope_peer_ids: Arc<RwLock<HashMap<PeerId, PeerId>>>,
+    pub management_peer_id: PeerId,
+    pub builtins_management_peer_id: PeerId,
+    pub root_keypair: KeyPair,
+    /// worker_peer_id -> worker_keypair
+    worker_keypairs: Arc<RwLock<HashMap<PeerId, KeyPair>>>,
+    /// remote_peer_id -> worker_peer_id
+    worker_peer_ids: Arc<RwLock<HashMap<PeerId, PeerId>>>,
     keypairs_dir: PathBuf,
     host_peer_id: PeerId,
     // temporary public, will refactor
     pub insecure_keypair: KeyPair,
 }

 impl KeyManager {
-    pub fn new(keypairs_dir: PathBuf, host_peer_id: PeerId) -> Self {
+    pub fn new(
+        keypairs_dir: PathBuf,
+        root_keypair: KeyPair,
+        management_peer_id: PeerId,
+        builtins_kp: KeyPair,
+    ) -> Self {
         let this = Self {
-            scope_keypairs: Arc::new(Default::default()),
-            scope_peer_ids: Arc::new(Default::default()),
+            management_peer_id,
+            builtins_management_peer_id: builtins_kp.get_peer_id(),
+            host_peer_id: root_keypair.get_peer_id(),
+            root_keypair,
+            worker_keypairs: Arc::new(Default::default()),
+            worker_peer_ids: Arc::new(Default::default()),
             keypairs_dir,
-            host_peer_id,
-            insecure_keypair: KeyPair::from_secret_key(INSECURE_KEYPAIR_SEED.collect(), KeyFormat::Ed25519)
-                .expect("error creating insecure keypair"),
+            insecure_keypair: KeyPair::from_secret_key(
+                INSECURE_KEYPAIR_SEED.collect(),
+                KeyFormat::Ed25519,
+            )
+            .expect("error creating insecure keypair"),
         };

         this.load_persisted_keypairs();
@@ -66,11 +80,11 @@ impl KeyManager {
                 KeyFormat::from_str(&persisted_kp.key_format)?,
             )?;
             let peer_id = keypair.get_peer_id();
-            self.scope_peer_ids
+            self.worker_peer_ids
                 .write()
                 .insert(persisted_kp.remote_peer_id, keypair.get_peer_id());

-            self.scope_keypairs.write().insert(peer_id, keypair);
+            self.worker_keypairs.write().insert(peer_id, keypair);
         };

         if let Err(e) = res {
@@ -84,22 +98,23 @@
     }

     pub fn has_keypair(&self, remote_peer_id: PeerId) -> bool {
-        self.scope_peer_ids.read().contains_key(&remote_peer_id)
+        self.worker_peer_ids.read().contains_key(&remote_peer_id)
     }

-    pub fn is_scope_peer_id(&self, scope_peer_id: PeerId) -> bool {
-        self.scope_keypairs.read().contains_key(&scope_peer_id)
+    pub fn is_local(&self, worker_peer_id: PeerId) -> bool {
+        self.host_peer_id.eq(&worker_peer_id)
+            || self.worker_keypairs.read().contains_key(&worker_peer_id)
     }

     /// For local peer ids is identity,
     /// for remote returns associated peer id or generate a new one.
     pub fn get_scope_peer_id(&self, init_peer_id: PeerId) -> Result<PeerId, PersistedKeypairError> {
         // All "nested" spells share the same keypair.
         // "nested" means spells which are created by other spells
-        if self.is_scope_peer_id(init_peer_id) {
+        if self.is_local(init_peer_id) {
             Ok(init_peer_id)
         } else {
-            let scope_peer_id = self.scope_peer_ids.read().get(&init_peer_id).cloned();
+            let scope_peer_id = self.worker_peer_ids.read().get(&init_peer_id).cloned();
             match scope_peer_id {
                 Some(p) => Ok(p),
                 _ => {
@@ -112,8 +127,8 @@
         }
     }

-    pub fn get_scope_keypair(&self, scope_peer_id: PeerId) -> Result<KeyPair, KeyManagerError> {
-        self.scope_keypairs
+    pub fn get_worker_keypair(&self, scope_peer_id: PeerId) -> Result<KeyPair, KeyManagerError> {
+        self.worker_keypairs
             .read()
             .get(&scope_peer_id)
             .cloned()
@@ -134,11 +149,11 @@
             PersistedKeypair::new(remote_peer_id, &keypair)?,
         )?;
         let scope_peer_id = keypair.get_peer_id();
-        self.scope_peer_ids
+        self.worker_peer_ids
             .write()
             .insert(remote_peer_id, scope_peer_id);

-        self.scope_keypairs.write().insert(scope_peer_id, keypair);
+        self.worker_keypairs.write().insert(scope_peer_id, keypair);

         Ok(())
     }
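Note: the constructor now takes the identity material itself instead of a precomputed host peer id, and derives `host_peer_id` from the root keypair. A sketch of the new call shape; `generate_ed25519` and the keypairs path are illustrative assumptions, not code from this repository:

```rust
use fluence_keypair::KeyPair;
use key_manager::KeyManager;

// Illustrative wiring only. `generate_ed25519` is assumed to exist on
// fluence_keypair::KeyPair (as on libp2p's Keypair); the path is a placeholder.
fn build_key_manager() -> KeyManager {
    let root_keypair = KeyPair::generate_ed25519();
    let builtins_keypair = KeyPair::generate_ed25519();
    let management_peer_id = KeyPair::generate_ed25519().get_peer_id();

    let key_manager = KeyManager::new(
        "/tmp/keypairs".into(), // keypairs_dir
        root_keypair.clone(),   // host_peer_id becomes root_keypair.get_peer_id()
        management_peer_id,
        builtins_keypair,       // stored as builtins_management_peer_id
    );

    // The host itself now passes the is_local check.
    assert!(key_manager.is_local(root_keypair.get_peer_id()));
    key_manager
}
```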
13 changes: 0 additions & 13 deletions crates/server-config/src/services_config.rs
@@ -17,14 +17,11 @@
 use fs_utils::{create_dirs, set_write_only, to_abs_path};

 use bytesize::ByteSize;
-use libp2p::PeerId;
 use std::collections::HashMap;
 use std::path::PathBuf;

 #[derive(Debug, Clone)]
 pub struct ServicesConfig {
-    /// Peer id of the current node
-    pub local_peer_id: PeerId,
     /// Path of the blueprint directory containing blueprints and wasm modules
     pub blueprint_dir: PathBuf,
     /// Opaque environment variables to be passed on each service creation
@@ -39,10 +36,6 @@ pub struct ServicesConfig {
     /// Dir to store directories shared between services
     /// in the span of a single particle execution
     pub particles_vault_dir: PathBuf,
-    /// key that could manage services
-    pub management_peer_id: PeerId,
-    /// key to manage builtins services initialization
-    pub builtins_management_peer_id: PeerId,
     /// Maximum heap size in bytes available for the module.
     pub max_heap_size: ByteSize,
     /// Default heap size in bytes available for the module unless otherwise specified.
@@ -52,27 +45,21 @@
 impl ServicesConfig {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
-        local_peer_id: PeerId,
         base_dir: PathBuf,
         particles_vault_dir: PathBuf,
         envs: HashMap<Vec<u8>, Vec<u8>>,
-        management_peer_id: PeerId,
-        builtins_management_peer_id: PeerId,
         max_heap_size: ByteSize,
         default_heap_size: Option<ByteSize>,
     ) -> Result<Self, std::io::Error> {
         let base_dir = to_abs_path(base_dir);

         let this = Self {
-            local_peer_id,
             blueprint_dir: config_utils::blueprint_dir(&base_dir),
             workdir: config_utils::workdir(&base_dir),
             modules_dir: config_utils::modules_dir(&base_dir),
             services_dir: config_utils::services_dir(&base_dir),
             particles_vault_dir,
             envs,
-            management_peer_id,
-            builtins_management_peer_id,
             max_heap_size,
             default_heap_size,
         };
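Note: with the identity fields gone, `ServicesConfig::new` is down to paths, env vars, and heap limits. A sketch of the slimmed call; the crate path and all values are placeholders:

```rust
use bytesize::ByteSize;
use server_config::ServicesConfig;
use std::collections::HashMap;

// Sketch of the reduced constructor after this commit; paths and limits
// are illustrative, not taken from a real node config.
fn make_services_config() -> Result<ServicesConfig, std::io::Error> {
    ServicesConfig::new(
        "/var/fluence/services".into(), // base_dir
        "/var/fluence/vault".into(),    // particles_vault_dir
        HashMap::new(),                 // envs
        ByteSize::gib(4),               // max_heap_size
        None,                           // default_heap_size
    )
}
```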
34 changes: 9 additions & 25 deletions particle-builtins/src/builtins.rs
@@ -70,13 +70,6 @@ pub struct Builtins<C> {
     pub connectivity: C,
     pub script_storage: ScriptStorageApi,

-    // TODO: move all peer ids and keypairs to key manager
-    pub management_peer_id: PeerId,
-    pub builtins_management_peer_id: PeerId,
-    pub local_peer_id: PeerId,
-    #[derivative(Debug = "ignore")]
-    pub root_keypair: KeyPair,
-
     pub modules: ModuleRepository,
     pub services: ParticleAppServices,
     pub node_info: NodeInfo,
@@ -100,7 +93,6 @@
         node_info: NodeInfo,
         config: ServicesConfig,
         services_metrics: ServicesMetrics,
-        root_keypair: KeyPair,
         key_manager: KeyManager,
     ) -> Self {
         let modules_dir = &config.modules_dir;
@@ -114,18 +106,11 @@
             config.default_heap_size,
         );
         let particles_vault_dir = vault_dir.to_path_buf();
-        let management_peer_id = config.management_peer_id;
-        let builtins_management_peer_id = config.builtins_management_peer_id;
-        let local_peer_id = config.local_peer_id;
         let services = ParticleAppServices::new(config, modules.clone(), Some(services_metrics));

         Self {
             connectivity,
             script_storage,
-            management_peer_id,
-            builtins_management_peer_id,
-            local_peer_id,
-            root_keypair,
             modules,
             services,
             node_info,
@@ -896,11 +881,7 @@

         let tetraplet = tetraplets.get(0).map(|v| v.as_slice());
         if let Some([t]) = tetraplet {
-            if t.peer_pk != self.local_peer_id.to_base58()
-                && !self
-                    .key_manager
-                    .is_scope_peer_id(PeerId::from_str(&t.peer_pk)?)
-            {
+            if !self.key_manager.is_local(PeerId::from_str(&t.peer_pk)?) {
                 return Err(JError::new(format!(
                     "data is expected to be produced by service 'registry' on peer '{}', was from peer '{}'",
                     self.local_peer_id, t.peer_pk
@@ -931,7 +912,7 @@
                 json!(self.root_keypair.sign(&data)?.to_vec())
             } else {
                 // if this call is initiated by worker on these worker as host_id and init_peer_id
-                let keypair = self.key_manager.get_scope_keypair(params.init_peer_id)?;
+                let keypair = self.key_manager.get_worker_keypair(params.init_peer_id)?;
                 json!(keypair.sign(&data)?.to_vec())
             }
         };
@@ -966,7 +947,7 @@
             } else {
                 Ok(JValue::Bool(
                     self.key_manager
-                        .get_scope_keypair(params.host_id)?
+                        .get_worker_keypair(params.host_id)?
                         .public()
                         .verify(&data, &signature)
                         .is_ok(),
@@ -1012,11 +993,14 @@
         let mut args = args.function_args.into_iter();
         let signature: Vec<u8> = Args::next("signature", &mut args)?;
         let data: Vec<u8> = Args::next("data", &mut args)?;
-        let signature =
-            Signature::from_bytes(self.key_manager.insecure_keypair.public().get_key_format(), signature);
+        let signature = Signature::from_bytes(
+            self.key_manager.insecure_keypair.public().get_key_format(),
+            signature,
+        );

         Ok(JValue::Bool(
-            self.key_manager.insecure_keypair
+            self.key_manager
+                .insecure_keypair
                 .public()
                 .verify(&data, &signature)
                 .is_ok(),
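Note: after the rename, signing and verification for a worker-scoped peer id go through the same `get_worker_keypair` lookup. A round-trip sketch built only from calls visible in this diff; treating both error types as `eyre`-convertible is an assumption made for brevity:

```rust
use fluence_keypair::KeyPair;
use key_manager::KeyManager;
use libp2p::PeerId;

// Sign with the worker keypair resolved for a peer id, then verify with
// its public half; function names match this diff, error plumbing is assumed.
fn sign_and_verify(key_manager: &KeyManager, init_peer_id: PeerId) -> eyre::Result<()> {
    let data = b"particle payload".to_vec();

    let keypair: KeyPair = key_manager.get_worker_keypair(init_peer_id)?;
    let signature = keypair.sign(&data)?;

    assert!(keypair.public().verify(&data, &signature).is_ok());
    Ok(())
}
```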
10 changes: 3 additions & 7 deletions particle-node/src/node.rs
@@ -111,16 +111,15 @@ impl<RT: AquaRuntime> Node<RT> {

         let key_manager = KeyManager::new(
             config.dir_config.keypairs_base_dir.clone(),
-            to_peer_id(&key_pair),
+            config.node_config.root_key_pair,
+            config.management_peer_id,
+            config.node_config.builtins_key_pair,
         );

         let services_config = ServicesConfig::new(
-            key_manager.get_host_peer_id(),
             config.dir_config.services_base_dir.clone(),
             config_utils::particles_vault_dir(&config.dir_config.avm_base_dir),
             config.services_envs.clone(),
-            config.management_peer_id,
-            builtins_peer_id,
             config.node_config.module_max_heap_size,
             config.node_config.module_default_heap_size,
         )
@@ -201,7 +200,6 @@
             services_config,
             script_storage_api,
             services_metrics,
-            config.node_config.root_key_pair.clone(),
             key_manager.clone(),
         ));

@@ -317,7 +315,6 @@
         services_config: ServicesConfig,
         script_storage_api: ScriptStorageApi,
         services_metrics: ServicesMetrics,
-        root_keypair: KeyPair,
         key_manager: KeyManager,
     ) -> Builtins<Connectivity> {
         let node_info = NodeInfo {
@@ -332,7 +329,6 @@
             node_info,
             services_config,
             services_metrics,
-            root_keypair,
             key_manager,
         )
     }
1 change: 1 addition & 0 deletions particle-services/Cargo.toml
@@ -18,6 +18,7 @@ particle-execution = { workspace = true }
 peer-metrics = { workspace = true }
 uuid-utils = { workspace = true }
 now-millis = { workspace = true }
+key-manager = { workspace = true }

 fluence-app-service = { workspace = true }
8 changes: 1 addition & 7 deletions particle-services/src/app_services.rs
@@ -101,9 +101,6 @@ pub struct ParticleAppServices {
     services: Arc<RwLock<Services>>,
     modules: ModuleRepository,
     aliases: Arc<RwLock<Aliases>>,
-    // TODO: move these peer ids to key manager
-    management_peer_id: PeerId,
-    builtins_management_peer_id: PeerId,
     pub metrics: Option<ServicesMetrics>,
 }

@@ -152,16 +149,13 @@ impl ParticleAppServices {
         metrics: Option<ServicesMetrics>,
     ) -> Self {
         let vault = ParticleVault::new(config.particles_vault_dir.clone());
-        let management_peer_id = config.management_peer_id;
-        let builtins_management_peer_id = config.builtins_management_peer_id;

         let this = Self {
             config,
             vault,
             services: <_>::default(),
             modules,
             aliases: <_>::default(),
-            management_peer_id,
-            builtins_management_peer_id,
             metrics,
         };
2 changes: 1 addition & 1 deletion sorcerer/src/script_executor.rs
@@ -96,7 +96,7 @@ impl Sorcerer {
     ) -> Result<Particle, JError> {
         let spell_keypair = self
             .key_manager
-            .get_scope_keypair(scope_peer_id)
+            .get_worker_keypair(scope_peer_id)
             .map_err(|err| ScopeKeypairMissing {
                 err,
                 spell_id: spell_id.clone(),
