Skip to content

Commit

Permalink
fix(datastore): remove particles (#1965)
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu committed Dec 18, 2023
1 parent 72ebb4d commit 17e5e2e
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 7 deletions.
2 changes: 1 addition & 1 deletion aquamarine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fluence-keypair = { workspace = true }
futures = { workspace = true }
log = { workspace = true }

tokio = { workspace = true, features = ["fs"] }
tokio = { workspace = true, features = ["fs", "rt"] }
tokio-stream = { workspace = true }
tracing = { workspace = true }
serde_json = { workspace = true }
Expand Down
152 changes: 146 additions & 6 deletions aquamarine/src/particle_data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use fluence_libp2p::PeerId;
use thiserror::Error;
use tracing::instrument;

use crate::DataStoreError::SerializeAnomaly;
use now_millis::now_ms;
use particle_execution::{ParticleVault, VaultError};

Expand Down Expand Up @@ -130,7 +129,7 @@ impl ParticleDataStore {
async fn cleanup_data(&self, particle_id: &str, current_peer_id: &str) -> Result<()> {
tracing::debug!(target: "particle_reap", particle_id = particle_id, "Cleaning up particle data for particle");
let path = self.data_file(particle_id, current_peer_id);
match tokio::fs::remove_dir_all(&path).await {
match tokio::fs::remove_file(&path).await {
Ok(_) => Ok(()),
// ignore NotFound
Err(err) if err.kind() == ErrorKind::NotFound => Ok(()),
Expand Down Expand Up @@ -172,9 +171,12 @@ impl ParticleDataStore {
)
.await?;

let ser_particle = serde_json::to_vec(particle_parameters).map_err(SerializeAnomaly)?;
let ser_call_results = serde_json::to_vec(call_results).map_err(SerializeAnomaly)?;
let ser_avm_outcome = serde_json::to_vec(outcome).map_err(SerializeAnomaly)?;
let ser_particle =
serde_json::to_vec(particle_parameters).map_err(DataStoreError::SerializeAnomaly)?;
let ser_call_results =
serde_json::to_vec(call_results).map_err(DataStoreError::SerializeAnomaly)?;
let ser_avm_outcome =
serde_json::to_vec(outcome).map_err(DataStoreError::SerializeAnomaly)?;

let anomaly_data = AnomalyData {
air_script: Cow::Borrowed(air_script),
Expand Down Expand Up @@ -210,7 +212,7 @@ impl ParticleDataStore {
let data = serde_json::to_vec(&anomaly_data).map_err(DataStoreError::SerializeAnomaly)?;
tokio::fs::write(&file, data)
.await
.map_err(|err| DataStoreError::ReadData(err, file))?;
.map_err(|err| DataStoreError::WriteAnomaly(err, file))?;

Ok(())
}
Expand Down Expand Up @@ -239,3 +241,141 @@ pub enum DataStoreError {
/// Builds the key under which a particle's data is stored, combining the
/// particle id and the peer id it executes on: `particle_{id}-peer_{peer}`.
fn store_key_from_components(particle_id: &str, current_peer_id: &str) -> String {
    // Pre-size the buffer: fixed markup plus both id components.
    let mut key =
        String::with_capacity("particle_-peer_".len() + particle_id.len() + current_peer_id.len());
    key.push_str("particle_");
    key.push_str(particle_id);
    key.push_str("-peer_");
    key.push_str(current_peer_id);
    key
}

#[cfg(test)]
mod tests {
    use crate::ParticleDataStore;
    use avm_server::avm_runner::RawAVMOutcome;
    use avm_server::CallRequests;
    use std::path::{Path, PathBuf};
    use std::time::Duration;

    /// Builds a `ParticleDataStore` with its three directories
    /// (`particle_data_store`, `vault`, `anomaly_data_store`) rooted under
    /// `base`. Shared by the tests below to avoid repeating the path wiring.
    fn store_at(base: &Path) -> ParticleDataStore {
        ParticleDataStore::new(
            base.join("particle_data_store"),
            base.join("vault"),
            base.join("anomaly_data_store"),
        )
    }

    /// `initialize` must succeed and create the particle-data directory.
    #[tokio::test]
    async fn test_initialize() {
        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
        let particle_data_dir = temp_dir.path().join("particle_data_store");

        let store = store_at(temp_dir.path());
        let result = store.initialize().await;

        assert!(result.is_ok());
        assert!(particle_data_dir.exists());
    }

    /// Bytes read back via `read_data` must be exactly the bytes that were
    /// stored via `store_data` for the same (particle, peer) pair.
    #[tokio::test]
    async fn test_store_and_read_data() {
        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
        let store = store_at(temp_dir.path());
        store.initialize().await.expect("Failed to initialize");

        let particle_id = "test_particle";
        let current_peer_id = "test_peer";
        let data = b"test_data";

        store
            .store_data(data, particle_id, current_peer_id)
            .await
            .expect("Failed to store data");
        let read_result = store.read_data(particle_id, current_peer_id).await;

        assert!(read_result.is_ok());
        assert_eq!(read_result.unwrap(), data);
    }

    /// `detect_anomaly` flags slow, memory-heavy, failed executions and
    /// passes fast, light, successful ones.
    ///
    /// NOTE: `detect_anomaly` is called without `.await`, so it is
    /// synchronous; a plain `#[test]` suffices — the original used
    /// `#[tokio::test]`, which spins up an async runtime for nothing.
    #[test]
    fn test_detect_anomaly() {
        // Paths are never touched here, so dummy values are fine.
        let store = ParticleDataStore::new(
            PathBuf::from("dummy"),
            PathBuf::from("dummy"),
            PathBuf::from("dummy"),
        );

        let time_below_threshold = Duration::from_millis(400);
        let time_above_threshold = Duration::from_millis(600);
        let memory_below_threshold = 5 * bytesize::MB as usize;
        let memory_above_threshold = 15 * bytesize::MB as usize;

        let outcome_success = RawAVMOutcome {
            ret_code: 0,
            error_message: "".to_string(),
            data: vec![],
            call_requests: CallRequests::new(),
            next_peer_pks: vec![],
        };
        let outcome_failure = RawAVMOutcome {
            ret_code: 1,
            error_message: "".to_string(),
            data: vec![],
            call_requests: CallRequests::new(),
            next_peer_pks: vec![],
        };

        // Everything under its threshold + success => not an anomaly.
        let anomaly_below_threshold = store.detect_anomaly(
            time_below_threshold,
            memory_below_threshold,
            &outcome_success,
        );
        // Everything over its threshold + failure => anomaly.
        let anomaly_above_threshold = store.detect_anomaly(
            time_above_threshold,
            memory_above_threshold,
            &outcome_failure,
        );

        assert!(!anomaly_below_threshold);
        assert!(anomaly_above_threshold);
    }

    /// `cleanup_data` must remove both the particle data file and the
    /// particle's vault directory, and report success.
    #[tokio::test]
    async fn test_cleanup_data() {
        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
        let base = temp_dir.path();
        let store = store_at(base);
        store.initialize().await.expect("Failed to initialize");

        let particle_id = "test_particle";
        let current_peer_id = "test_peer";

        store
            .store_data(b"test_data", particle_id, current_peer_id)
            .await
            .expect("Failed to store data");

        // Simulate a vault created for this particle so cleanup has
        // something to remove besides the data file.
        let data_file_path = store.data_file(particle_id, current_peer_id);
        let vault_path = base.join("vault").join(particle_id);
        tokio::fs::create_dir_all(&vault_path)
            .await
            .expect("Failed to create vault dir");
        assert!(data_file_path.exists());
        assert!(vault_path.exists());

        let cleanup_result = store.cleanup_data(particle_id, current_peer_id).await;

        assert!(cleanup_result.is_ok());
        assert!(!data_file_path.exists());
        assert!(!vault_path.exists());
    }
}

0 comments on commit 17e5e2e

Please sign in to comment.