From 3065a950246d8a10116095aac64b06d4814e4bd4 Mon Sep 17 00:00:00 2001 From: Maria Kuklina <101095419+kmd-fl@users.noreply.github.com> Date: Fri, 2 Feb 2024 16:33:14 +0100 Subject: [PATCH] fix(particle-data-store): use signature in particle data store path name [NET-632] (#2052) * use signature in data store name in base58 format --- Cargo.lock | 1 + aquamarine/Cargo.toml | 1 + aquamarine/src/actor.rs | 5 +- aquamarine/src/particle_data_store.rs | 69 +++++++++++++++++++-------- aquamarine/src/particle_executor.rs | 8 +++- aquamarine/src/plumber.rs | 3 +- crates/local-vm/src/local_vm.rs | 50 +++++++++++++------ 7 files changed, 98 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6442d95ada..5552279a49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -245,6 +245,7 @@ dependencies = [ "async-trait", "avm-server", "base64 0.21.7", + "bs58", "bytesize", "chrono", "config", diff --git a/aquamarine/Cargo.toml b/aquamarine/Cargo.toml index cff9f024b4..8768f9a9c0 100644 --- a/aquamarine/Cargo.toml +++ b/aquamarine/Cargo.toml @@ -34,6 +34,7 @@ serde_json = { workspace = true } parking_lot = { workspace = true } chrono = "0.4.33" base64 = { workspace = true } +bs58 = { workspace = true } thiserror = { workspace = true } humantime = "2.1.0" anyhow = "1.0.79" diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index b27b8e5cc8..103a3b34ad 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -110,9 +110,10 @@ where self.future.is_some() } - pub fn cleanup_key(&self) -> (String, PeerId) { + pub fn cleanup_key(&self) -> (String, PeerId, Vec) { let particle_id = self.particle.id.clone(); - (particle_id, self.current_peer_id) + let signature = self.particle.signature.clone(); + (particle_id, self.current_peer_id, signature) } pub fn mailbox_size(&self) -> usize { diff --git a/aquamarine/src/particle_data_store.rs b/aquamarine/src/particle_data_store.rs index ba6450da14..8bb03e9544 100644 --- a/aquamarine/src/particle_data_store.rs +++ b/aquamarine/src/particle_data_store.rs @@ -53,14 +53,19 @@ impl ParticleDataStore { } } - pub fn data_file(&self, particle_id: &str, current_peer_id: &str) -> PathBuf { - let key = store_key_from_components(particle_id, current_peer_id); + pub fn data_file(&self, particle_id: &str, current_peer_id: &str, signature: &[u8]) -> PathBuf { + let key = store_key_from_components(particle_id, current_peer_id, signature); self.particle_data_store.join(key) } /// Returns $ANOMALY_DATA_STORE/$particle_id/$timestamp - pub fn anomaly_dir(&self, particle_id: &str, current_peer_id: &str) -> PathBuf { - let key = store_key_from_components(particle_id, current_peer_id); + pub fn anomaly_dir( + &self, + particle_id: &str, + current_peer_id: &str, + signature: &[u8], + ) -> PathBuf { + let key = store_key_from_components(particle_id, current_peer_id, signature); [ self.anomaly_data_store.as_path(), Path::new(&key), @@ -94,9 +99,10 @@ impl ParticleDataStore { data: &[u8], particle_id: &str, current_peer_id: &str, + signature: &[u8], ) -> Result<()> { tracing::trace!(target: "particle_reap", particle_id = particle_id, "Storing data for particle"); - let data_path = self.data_file(particle_id, current_peer_id); + let data_path = self.data_file(particle_id, current_peer_id, signature); tokio::fs::write(&data_path, data) .await .map_err(|err| DataStoreError::StoreData(err, data_path))?; @@ -105,16 +111,21 @@ impl ParticleDataStore { } #[instrument(level = tracing::Level::INFO)] - pub async fn read_data(&self, particle_id: &str, current_peer_id: &str) -> Result> { - let data_path = self.data_file(particle_id, current_peer_id); + pub async fn read_data( + &self, + particle_id: &str, + current_peer_id: &str, + signature: &[u8], + ) -> Result> { + let data_path = self.data_file(particle_id, current_peer_id, signature); let data = tokio::fs::read(&data_path).await.unwrap_or_default(); Ok(data) } - pub async fn batch_cleanup_data(&self, cleanup_keys: Vec<(String, PeerId)>) { + pub async fn batch_cleanup_data(&self, cleanup_keys: Vec<(String, PeerId, Vec)>) { let futures: FuturesUnordered<_> = cleanup_keys .into_iter() - .map(|(particle_id, peer_id)| async move { + .map(|(particle_id, peer_id, signature)| async move { let peer_id = peer_id.to_string(); tracing::debug!( target: "particle_reap", @@ -123,7 +134,7 @@ impl ParticleDataStore { ); if let Err(err) = self - .cleanup_data(particle_id.as_str(), peer_id.as_str()) + .cleanup_data(particle_id.as_str(), peer_id.as_str(), &signature) .await { tracing::warn!( @@ -137,9 +148,14 @@ impl ParticleDataStore { let _results: Vec<_> = futures.collect().await; } - async fn cleanup_data(&self, particle_id: &str, current_peer_id: &str) -> Result<()> { + async fn cleanup_data( + &self, + particle_id: &str, + current_peer_id: &str, + signature: &[u8], + ) -> Result<()> { tracing::debug!(target: "particle_reap", particle_id = particle_id, "Cleaning up particle data for particle"); - let path = self.data_file(particle_id, current_peer_id); + let path = self.data_file(particle_id, current_peer_id, signature); match tokio::fs::remove_file(&path).await { Ok(_) => Ok(()), // ignore NotFound @@ -186,6 +202,7 @@ impl ParticleDataStore { current_data: &[u8], call_results: &CallResults, particle_parameters: &ParticleParameters<'_>, + particle_signature: &[u8], outcome: &RawAVMOutcome, execution_time: Duration, memory_delta: usize, @@ -194,6 +211,7 @@ impl ParticleDataStore { .read_data( &particle_parameters.particle_id, &particle_parameters.current_peer_id, + particle_signature, ) .await?; @@ -217,6 +235,7 @@ impl ParticleDataStore { self.collect_anomaly_data( &particle_parameters.particle_id, &particle_parameters.current_peer_id, + particle_signature, anomaly_data, ) .await?; @@ -227,9 +246,10 @@ impl ParticleDataStore { &self, particle_id: &str, current_peer_id: &str, + signature: &[u8], anomaly_data: AnomalyData<'_>, ) -> std::result::Result<(), DataStoreError> { - let path = self.anomaly_dir(particle_id, current_peer_id); + let path = self.anomaly_dir(particle_id, current_peer_id, signature); tokio::fs::create_dir_all(&path) .await .map_err(DataStoreError::CreateAnomalyDir)?; @@ -279,8 +299,15 @@ pub enum DataStoreError { ReadData(#[source] std::io::Error, PathBuf), } -fn store_key_from_components(particle_id: &str, current_peer_id: &str) -> String { - format!("particle_{particle_id}-peer_{current_peer_id}") +fn store_key_from_components(particle_id: &str, current_peer_id: &str, signature: &[u8]) -> String { + format!( + "particle_{particle_id}-peer_{current_peer_id}-sig_{}", + format_signature(signature) + ) +} + +fn format_signature(signature: &[u8]) -> String { + bs58::encode(signature).into_string() } #[cfg(test)] @@ -327,14 +354,15 @@ mod tests { let particle_id = "test_particle"; let current_peer_id = "test_peer"; + let signature: &[u8] = &[0]; let data = b"test_data"; particle_data_store - .store_data(data, particle_id, current_peer_id) + .store_data(data, particle_id, current_peer_id, signature) .await .expect("Failed to store data"); let read_result = particle_data_store - .read_data(particle_id, current_peer_id) + .read_data(particle_id, current_peer_id, signature) .await; assert!(read_result.is_ok()); @@ -474,14 +502,15 @@ mod tests { let particle_id = "test_particle"; let current_peer_id = "test_peer"; + let signature: &[u8] = &[]; let data = b"test_data"; particle_data_store - .store_data(data, particle_id, current_peer_id) + .store_data(data, particle_id, current_peer_id, signature) .await .expect("Failed to store data"); - let data_file_path = particle_data_store.data_file(particle_id, current_peer_id); + let data_file_path = particle_data_store.data_file(particle_id, current_peer_id, signature); let vault_path = temp_dir_path.join("vault").join(particle_id); tokio::fs::create_dir_all(&vault_path) .await @@ -490,7 +519,7 @@ mod tests { assert!(vault_path.exists()); let cleanup_result = particle_data_store - .cleanup_data(particle_id, current_peer_id) + .cleanup_data(particle_id, current_peer_id, signature) .await; assert!(cleanup_result.is_ok()); diff --git a/aquamarine/src/particle_executor.rs b/aquamarine/src/particle_executor.rs index 65495ece77..b44aad0fc1 100644 --- a/aquamarine/src/particle_executor.rs +++ b/aquamarine/src/particle_executor.rs @@ -88,7 +88,11 @@ impl ParticleExecutor for RT { let prev_data = data_store .clone() - .read_data(particle_id.as_str(), current_peer_id.to_base58().as_str()) + .read_data( + particle_id.as_str(), + current_peer_id.to_base58().as_str(), + &particle.signature, + ) .await; if let Ok(prev_data) = prev_data { @@ -195,6 +199,7 @@ where &avm_result.particle.data, &avm_result.call_results, &avm_result.particle_params, + &avm_result.particle.signature, outcome, stats.interpretation_time, stats.memory_delta, @@ -214,6 +219,7 @@ where &outcome.data, particle_id.as_str(), current_peer_id.to_base58().as_str(), + &avm_result.particle.signature, ) .await; if let Err(err) = store_result { diff --git a/aquamarine/src/plumber.rs b/aquamarine/src/plumber.rs index 60c549616b..604e0cdeac 100644 --- a/aquamarine/src/plumber.rs +++ b/aquamarine/src/plumber.rs @@ -282,7 +282,8 @@ impl Plumber { // do not schedule task if another in progress if self.cleanup_future.is_none() { // Remove expired actors - let mut cleanup_keys: Vec<(String, PeerId)> = Vec::with_capacity(MAX_CLEANUP_KEYS_SIZE); + let mut cleanup_keys: Vec<(String, PeerId, Vec)> = + Vec::with_capacity(MAX_CLEANUP_KEYS_SIZE); let now = now_ms(); self.actors.retain(|_, actor| { // TODO: this code isn't optimal we continue iterate over actors if cleanup keys is full diff --git a/crates/local-vm/src/local_vm.rs b/crates/local-vm/src/local_vm.rs index 7ca618ad71..0374a0ad95 100644 --- a/crates/local-vm/src/local_vm.rs +++ b/crates/local-vm/src/local_vm.rs @@ -248,11 +248,27 @@ pub async fn make_particle( let timestamp = now_ms() as u64; let ttl = particle_ttl.as_millis() as u32; + let mut particle = Particle { + id: id.clone(), + init_peer_id: peer_id, + timestamp, + ttl, + script: script.clone(), + signature: vec![], + data: vec![], + }; + // We can sign at this point since the `data` which is evaluated below isn't part of the signature + particle.sign(key_pair).expect("sign particle"); + let mut call_results: CallResults = <_>::default(); let mut particle_data = vec![]; loop { let prev_data = data_store - .read_data(id.as_str(), peer_id.to_base58().as_str()) + .read_data( + id.as_str(), + peer_id.to_base58().as_str(), + &particle.signature, + ) .await .expect("Could not load prev data"); @@ -276,7 +292,12 @@ pub async fn make_particle( .expect("execute & make particle"); data_store - .store_data(&data, id.as_str(), peer_id.to_base58().as_str()) + .store_data( + &data, + id.as_str(), + peer_id.to_base58().as_str(), + &particle.signature, + ) .await .expect("local vm could not store particle.data data"); @@ -294,17 +315,7 @@ pub async fn make_particle( tokio::task::yield_now().await; } - let mut particle = Particle { - id: id.clone(), - init_peer_id: peer_id, - timestamp, - ttl, - script, - signature: vec![], - data: particle_data, - }; - - particle.sign(key_pair).expect("sign particle"); + particle.data = particle_data; tracing::info!( particle_id = id, @@ -327,7 +338,11 @@ pub async fn read_args( let mut particle_data = particle.data; loop { let prev_data = data_store - .read_data(particle.id.as_str(), peer_id.to_base58().as_str()) + .read_data( + particle.id.as_str(), + peer_id.to_base58().as_str(), + &particle.signature, + ) .await .expect("Could not load prev data"); @@ -351,7 +366,12 @@ pub async fn read_args( .expect("execute & make particle"); data_store - .store_data(&data, particle.id.as_str(), peer_id.to_base58().as_str()) + .store_data( + &data, + particle.id.as_str(), + peer_id.to_base58().as_str(), + &particle.signature, + ) .await .expect("local vm could not store particle.data data");