Skip to content

Commit

Permalink
feat(workers): add activate/deactivate [NET-587] (#1889)
Browse files Browse the repository at this point in the history
  • Loading branch information
justprosh committed Nov 27, 2023
1 parent 63c13a8 commit 0883ab8
Show file tree
Hide file tree
Showing 23 changed files with 763 additions and 210 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions aquamarine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ pub enum AquamarineApiError {
particle_id: String,
err: ParticleError,
},
#[error("AquamarineApiError::WorkerIsNotActive: worker_id = {worker_id}, particle_id = {particle_id}")]
WorkerIsNotActive {
worker_id: String,
particle_id: String,
},
}

impl AquamarineApiError {
Expand All @@ -59,6 +64,7 @@ impl AquamarineApiError {
AquamarineApiError::ParticleExpired { particle_id } => Some(particle_id),
AquamarineApiError::OneshotCancelled { particle_id } => Some(particle_id),
AquamarineApiError::ExecutionTimedOut { particle_id, .. } => Some(particle_id),
AquamarineApiError::WorkerIsNotActive { particle_id, .. } => Some(particle_id),
// Should it be `None` considering usage of signature as particle id?
// It can compromise valid particles into thinking they are invalid.
// But still there can be a case when signature was generated wrong
Expand Down
8 changes: 8 additions & 0 deletions aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
return;
}

if !self.key_manager.is_worker_active(worker_id)
&& !self.key_manager.is_management(particle.init_peer_id)
{
tracing::trace!(target: "worker_inactive", particle_id = particle.id, worker_id = worker_id.to_string(), "Worker is not active");
return;
}

let builtins = &self.builtins;
let key = (ParticleId(particle.signature.clone()), worker_id);
let entry = self.actors.entry(key);
Expand Down Expand Up @@ -432,6 +439,7 @@ mod tests {
let builtin_mock = Arc::new(MockF);
let key_manager = KeyManager::new(
"keypair".into(),
"workers".into(),
KeyPair::generate_ed25519(),
RandomPeerId::random(),
RandomPeerId::random(),
Expand Down
13 changes: 5 additions & 8 deletions crates/connected-client/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,11 @@ impl NetworkBehaviour for ClientBehaviour {
) {
use ClientEvent::Particle;

match event {
Ok(HandlerMessage::InParticle(particle)) => {
self.events.push_back(GenerateEvent(Particle {
particle,
sender: peer_id,
}))
}
_ => {}
if let Ok(HandlerMessage::InParticle(particle)) = event {
self.events.push_back(GenerateEvent(Particle {
particle,
sender: peer_id,
}))
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/key-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ serde = { workspace = true }
toml = { workspace = true }
log = { workspace = true }
libp2p = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
39 changes: 38 additions & 1 deletion crates/key-manager/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ pub enum KeyManagerError {
#[source]
err: toml::de::Error,
},
#[error("Failed to decode keypair {path}: {err}")]
PersistedKeypairDecodingError {
path: PathBuf,
#[source]
err: fluence_keypair::error::DecodingError,
},
#[error("Invalid key format {path}: {err}")]
PersistedKeypairInvalidKeyformat {
path: PathBuf,
#[source]
err: fluence_keypair::error::Error,
},
#[error("Error serializing persisted keypair: {err}")]
SerializePersistedKeypair {
#[source]
Expand All @@ -45,9 +57,28 @@ pub enum KeyManagerError {
#[source]
err: std::io::Error,
},
#[error("Error removing persisted keypair {path:?}: {err}")]
#[error("Error removing persisted keypair {path:?} for worker {worker_id}: {err}")]
RemoveErrorPersistedKeypair {
path: PathBuf,
worker_id: PeerId,
#[source]
err: std::io::Error,
},
#[error("Error serializing persisted worker: {err}")]
SerializePersistedWorker {
#[source]
err: toml::ser::Error,
},
#[error("Error writing persisted worker to {path:?}: {err}")]
WriteErrorPersistedWorker {
path: PathBuf,
#[source]
err: std::io::Error,
},
#[error("Error removing persisted worker {path:?} for worker {worker_id}: {err}")]
RemoveErrorPersistedWorker {
path: PathBuf,
worker_id: PeerId,
#[source]
err: std::io::Error,
},
Expand All @@ -57,6 +88,12 @@ pub enum KeyManagerError {
#[source]
err: std::io::Error,
},
#[error("Error creating directory for persisted workers {path:?}: {err}")]
CreateWorkersDir {
path: PathBuf,
#[source]
err: std::io::Error,
},
#[error("Keypair for peer_id {0} not found")]
KeypairNotFound(PeerId),
#[error("Worker for {deal_id} already exists")]
Expand Down
Loading

0 comments on commit 0883ab8

Please sign in to comment.