Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support keypairs for spells [NET-237 NET-239 NET-281 NET-283] #1382

Merged
merged 30 commits into from
Jan 16, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b51b547
WIP update plumber for virtual peers
justprosh Dec 8, 2022
98b703c
fix tests
justprosh Dec 9, 2022
4228616
wip
justprosh Dec 15, 2022
ee92afd
wip
justprosh Dec 20, 2022
d657c8c
sign particles, refactor key manager
justprosh Dec 23, 2022
d3f52e3
fix bugs
justprosh Dec 28, 2022
ae4fe6b
Merge branch 'master' into virtual_peers
justprosh Dec 29, 2022
7d28d5a
gather interpretation stats independently
justprosh Jan 3, 2023
57ef32d
chore(deps): update rust crate ctrlc to 3.2.4 (#1380)
renovate[bot] Jan 9, 2023
caf9e9c
feat(spells): add spell config update [NET-234 NET-315] (#1381)
kmd-fl Jan 9, 2023
241e1b0
pr fixes
justprosh Jan 10, 2023
8dfce97
Merge remote-tracking branch 'origin/master' into virtual_peers
justprosh Jan 12, 2023
c337c56
pr fixes
justprosh Jan 12, 2023
af85e10
test fixes
justprosh Jan 12, 2023
3c52637
fix linter errors
justprosh Jan 12, 2023
fcfc39a
fix plumber test
justprosh Jan 12, 2023
1a95c3e
pr fixes
justprosh Jan 12, 2023
38675b1
add enum for errors
justprosh Jan 12, 2023
9401a99
Merge remote-tracking branch 'origin/master' into virtual_peers
justprosh Jan 13, 2023
768f31d
refactor sorcerer custom service creation
justprosh Jan 13, 2023
3c0b6c3
pr fixes
justprosh Jan 13, 2023
bee99d8
fix lint warning
justprosh Jan 13, 2023
855c1d4
pr fixes
justprosh Jan 13, 2023
2e43b73
remove debug logging
justprosh Jan 13, 2023
7f69ffd
Update crates/key-manager/src/key_manager.rs
justprosh Jan 16, 2023
bd11c1e
Update crates/key-manager/src/key_manager.rs
justprosh Jan 16, 2023
e4571ba
pr fixes
justprosh Jan 16, 2023
ad52071
Merge remote-tracking branch 'origin/master' into virtual_peers
justprosh Jan 16, 2023
82ef032
fix tests
justprosh Jan 16, 2023
a43a0d0
Merge branch 'master' into virtual_peers
justprosh Jan 16, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ connected-client = { path = "crates/connected-client" }
test-constants = { path = "crates/test-constants" }
peer-metrics = { path = "crates/peer-metrics" }
spell-event-bus = { path = "crates/spell-event-bus" }
key-manager = { path = "crates/key-manager" }
sorcerer = { path = "sorcerer" }
builtins = { path = "particle-node/tests/builtins" }
particle-node = { path = "particle-node" }
Expand Down Expand Up @@ -113,3 +114,5 @@ futures = "0.3.24"
thiserror = "1.0.37"
serde = "1.0.145"
toml = "0.5.9"
itertools = "0.10.5"
humantime-serde = "1.1.1"
18 changes: 9 additions & 9 deletions aquamarine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ particle-protocol = { workspace = true }
particle-services = { workspace = true }
particle-builtins = { workspace = true }

now-millis = { path = "../crates/now-millis" }
fluence-libp2p = { path = "../crates/libp2p" }
config-utils = { path = "../crates/config-utils" }
particle-args = { path = "../crates/particle-args" }
control-macro = { path = "../crates/control-macro" }
fs-utils = { path = "../crates/fs-utils" }
peer-metrics = { path = "../crates/peer-metrics" }
particle-execution = { path = "../particle-execution" }
key-manager = { path = "../crates/key-manager" }
now-millis = { workspace = true }
fluence-libp2p = { workspace = true }
config-utils = { workspace = true }
particle-args = { workspace = true }
control-macro = { workspace = true }
fs-utils = { workspace = true }
peer-metrics = { workspace = true }
particle-execution = { workspace = true }
key-manager = { workspace = true}

avm-server = { workspace = true }
libp2p = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion aquamarine/src/aquamarine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
out: EffectsChannel,
plumber_metrics: Option<ParticleExecutorMetrics>,
vm_pool_metrics: Option<VmPoolMetrics>,
host_peer_id: PeerId,
key_manager: KeyManager,
justprosh marked this conversation as resolved.
Show resolved Hide resolved
) -> (Self, AquamarineApi) {
// TODO: make `100` configurable
let (outlet, inlet) = mpsc::channel(100);
let sender = AquamarineApi::new(outlet, config.execution_timeout);
let vm_pool = VmPool::new(config.pool_size, runtime_config, vm_pool_metrics);
let host_peer_id = key_manager.get_host_peer_id();
let plumber = Plumber::new(vm_pool, builtins, plumber_metrics, key_manager);
let this = Self {
inlet,
Expand All @@ -80,6 +80,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
Poll::Ready(Some(Ingest { particle, function })) => {
wake = true;
// set new particle to be executed
// every particle that comes from the connection pool first executed on the host peer id
self.plumber.ingest(particle, function, self.host_peer_id);
justprosh marked this conversation as resolved.
Show resolved Hide resolved
}
Poll::Ready(Some(AddService {
Expand Down
15 changes: 5 additions & 10 deletions aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,12 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
let key_manager = self.key_manager.clone();
for actor in self.actors.values_mut() {
if let Poll::Ready(result) = actor.poll_completed(cx) {
// TODO: filter result.effects and filter out local effects

interpretation_stats.push(result.stats);
let (local_peers, remote_peers): (Vec<_>, Vec<_>) = result
.effects
.next_peers
.into_iter()
.partition(|p| key_manager.is_local_peer_id(&p.to_base58()));
.partition(|p| key_manager.is_scope_peer_id(*p));

if !remote_peers.is_empty() {
remote_effects.push(RoutingEffects {
Expand All @@ -155,7 +153,6 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
}

if !local_peers.is_empty() {
log::trace!("Local peers: {:?}", local_peers);
local_effects.push(RoutingEffects {
justprosh marked this conversation as resolved.
Show resolved Hide resolved
particle: result.effects.particle,
next_peers: local_peers,
Expand Down Expand Up @@ -235,16 +232,14 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
}
});

for effect in local_effects.iter() {
for effect in local_effects.into_iter() {
for local_peer in effect.next_peers.iter() {
self.ingest(effect.particle.clone(), None, local_peer.clone());
self.ingest(effect.particle.clone(), None, *local_peer);
justprosh marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Turn effects into events, and buffer them
for effect in remote_effects {
self.events.push_back(Ok(effect));
}
self.events.extend(remote_effects.into_iter().map(Ok));

// Return a new event if there is some
if let Some(event) = self.events.pop_front() {
Expand Down Expand Up @@ -380,7 +375,7 @@ mod tests {
// Pool is of size 1 so it's easier to control tests
let vm_pool = VmPool::new(1, (), None);
let builtin_mock = Arc::new(MockF);
let key_manager = KeyManager::new("keypair".into());
let key_manager = KeyManager::new("keypair".into(), RandomPeerId::random());
Plumber::new(vm_pool, builtin_mock, None, key_manager)
}

Expand Down
2 changes: 1 addition & 1 deletion connection-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ futures = { workspace = true }
log = { workspace = true }
serde = { workspace = true }
async-std = { workspace = true }
itertools = "0.10.5"
itertools = { workspace = true }

[dev-dependencies]
parking_lot = { workspace = true }
2 changes: 1 addition & 1 deletion crates/connected-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ futures = { workspace = true }
serde = { version = "1.0.145", features = ["derive"] }
serde_json = { workspace = true }
log = { workspace = true }
derivative = "2.2.0"
derivative = { workspace = true }
eyre = { workspace = true }
parking_lot = { workspace = true }

Expand Down
2 changes: 1 addition & 1 deletion crates/created-swarm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ libp2p = { workspace = true }

async-std = { workspace = true }
futures = { workspace = true }
derivative = "2.2.0"
derivative = { workspace = true }
serde_json = { workspace = true }
base64 = { workspace = true }
serde = { workspace = true }
Expand Down
7 changes: 6 additions & 1 deletion crates/created-swarm/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,12 @@ pub fn create_swarm_with_runtime<RT: AquaRuntime>(
let mut node = Node::new(resolved, vm_config, "some version").expect("create node");
node.listen(vec![config.listen_on.clone()]).expect("listen");

(node.local_peer_id, node, management_kp, config)
(
node.key_manager.get_host_peer_id(),
node,
management_kp,
config,
)
}

pub fn create_swarm(config: SwarmConfig) -> (PeerId, Box<Node<AVM>>, KeyPair, SwarmConfig) {
Expand Down
6 changes: 4 additions & 2 deletions crates/key-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
fs-utils = { path = "../fs-utils" }

fs-utils = { workspace = true }
fluence-libp2p = { workspace = true}
fluence-keypair = { workspace = true }

parking_lot = { workspace = true }
eyre = { workspace = true }
thiserror = { workspace = true }
serde = { workspace = true }
toml = { workspace = true }
log = { workspace = true }
libp2p = { workspace = true }
2 changes: 2 additions & 0 deletions crates/key-manager/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use thiserror::Error;

#[derive(Debug, Error)]
pub enum PersistedKeypairError {
#[error("Failed to persist keypair: RSA is not supported")]
CannotExtractRSASecretKey,
#[error("Error reading persisted keypair from {path:?}: {err}")]
ReadPersistedKeypair {
path: PathBuf,
Expand Down
88 changes: 47 additions & 41 deletions crates/key-manager/src/key_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,33 @@
*/

use fluence_keypair::{KeyFormat, KeyPair};
use libp2p::PeerId;
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;

use crate::error::PersistedKeypairError;
use crate::persistence::{load_persisted_keypairs, persist_keypair, PersistedKeypair};
use parking_lot::RwLock;

#[derive(Clone)]
pub struct KeyManager {
local_peer_ids: Arc<RwLock<HashMap<String, Arc<KeyPair>>>>,
remote_peer_ids: Arc<RwLock<HashMap<String, Arc<KeyPair>>>>,
/// scope_peer_id -> scope_keypair
scope_keypairs: Arc<RwLock<HashMap<PeerId, KeyPair>>>,
/// remote_peer_id -> scope_peer_id
scope_peer_ids: Arc<RwLock<HashMap<PeerId, PeerId>>>,
keypairs_dir: PathBuf,
host_peer_id: PeerId,
}

impl KeyManager {
pub fn new(keypairs_dir: PathBuf) -> Self {
pub fn new(keypairs_dir: PathBuf, host_peer_id: PeerId) -> Self {
let this = Self {
local_peer_ids: Arc::new(Default::default()),
remote_peer_ids: Arc::new(Default::default()),
scope_keypairs: Arc::new(Default::default()),
scope_peer_ids: Arc::new(Default::default()),
keypairs_dir,
host_peer_id,
};

this.load_persisted_keypairs();
Expand All @@ -48,18 +54,16 @@ impl KeyManager {
for pkp in persisted_keypairs {
let res: eyre::Result<()> = try {
let persisted_kp = pkp?;
let keypair = Arc::new(KeyPair::from_vec(
persisted_kp.keypair_bytes,
let keypair = KeyPair::from_secret_key(
persisted_kp.private_key_bytes,
KeyFormat::from_str(&persisted_kp.key_format)?,
)?);
let peer_id = keypair.get_peer_id().to_base58();
self.remote_peer_ids
)?;
let peer_id = keypair.get_peer_id();
self.scope_peer_ids
.write()
.insert(persisted_kp.remote_peer_id, keypair.clone());
.insert(persisted_kp.remote_peer_id, keypair.get_peer_id());

self.local_peer_ids.write().insert(peer_id, keypair.clone());

()
self.scope_keypairs.write().insert(peer_id, keypair);
};

if let Err(e) = res {
Expand All @@ -68,54 +72,56 @@ impl KeyManager {
}
}

pub fn has_keypair(&self, remote_peer_id: &str) -> bool {
self.remote_peer_ids.read().contains_key(remote_peer_id)
pub fn get_host_peer_id(&self) -> PeerId {
self.host_peer_id
}

pub fn is_local_peer_id(&self, local_peer_id: &str) -> bool {
self.local_peer_ids.read().contains_key(local_peer_id)
pub fn has_keypair(&self, remote_peer_id: PeerId) -> bool {
self.scope_peer_ids.read().contains_key(&remote_peer_id)
}

pub fn get_keypair_by_remote_peer_id(
&self,
remote_peer_id: &str,
) -> eyre::Result<Arc<KeyPair>> {
if let Some(k) = self.remote_peer_ids.read().get(remote_peer_id).cloned() {
pub fn is_scope_peer_id(&self, scope_peer_id: PeerId) -> bool {
self.scope_keypairs.read().contains_key(&scope_peer_id)
}

pub fn get_scope_peer_id(&self, remote_peer_id: PeerId) -> eyre::Result<PeerId> {
if let Some(k) = self.scope_peer_ids.read().get(&remote_peer_id).cloned() {
Ok(k)
} else {
Err(eyre::eyre!(
"Keypair for peer id {} not exists",
"Scope peer id for peer id {} not exists",
justprosh marked this conversation as resolved.
Show resolved Hide resolved
remote_peer_id
))
}
}

pub fn get_keypair_by_local_peer_id(&self, local_peer_id: &str) -> eyre::Result<Arc<KeyPair>> {
if let Some(k) = self.local_peer_ids.read().get(local_peer_id).cloned() {
Ok(k)
} else {
Err(eyre::eyre!(
"Keypair for peer id {} not exists",
local_peer_id
))
}
pub fn get_scope_keypair(&self, scope_peer_id: PeerId) -> eyre::Result<KeyPair> {
self.scope_keypairs
.read()
.get(&scope_peer_id)
.cloned()
.ok_or_else(|| eyre::eyre!("Keypair for peer id {} does not exist", scope_peer_id))
justprosh marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn generate_keypair(&self) -> Arc<KeyPair> {
Arc::new(KeyPair::generate_ed25519())
pub fn generate_keypair(&self) -> KeyPair {
KeyPair::generate_ed25519()
}

pub fn store_keypair(&self, remote_peer_id: &str, keypair: Arc<KeyPair>) -> eyre::Result<()> {
pub fn store_keypair(
&self,
remote_peer_id: PeerId,
keypair: KeyPair,
) -> Result<(), PersistedKeypairError> {
persist_keypair(
&self.keypairs_dir,
PersistedKeypair::new(remote_peer_id.to_string(), &keypair),
PersistedKeypair::new(remote_peer_id, &keypair)?,
)?;
let peer_id = keypair.get_peer_id().to_base58();
self.remote_peer_ids
let scope_peer_id = keypair.get_peer_id();
self.scope_peer_ids
.write()
.insert(remote_peer_id.to_string(), keypair.clone());
.insert(remote_peer_id, scope_peer_id);

self.local_peer_ids.write().insert(peer_id, keypair.clone());
self.scope_keypairs.write().insert(scope_peer_id, keypair);

Ok(())
}
Expand Down
Loading