Skip to content

Commit

Permalink
fix(avm-server): update avm-server to one that user particle_id and p…
Browse files Browse the repository at this point in the history
…eer_id in data_store (#1494)

* try avm with data_store fix

* just use avm after fmt

* make clippy happy

* use new aquavm api

* update avm + make clippy happy

* update to match AquaVM API

* fmt

* use released aqmavm
  • Loading branch information
ValeryAntopol committed Feb 27, 2023
1 parent 2366265 commit 5ede2b0
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 36 deletions.
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ particle-execution = { path = "particle-execution" }
fluence-spell-dtos = "=0.5.2"
fluence-spell-distro = "=0.5.2"
fluence-app-service = "0.23.0"
avm-server = "0.28.1"
avm-server = "0.30.0"
air-interpreter-wasm = "=0.35.4"
libp2p = { version = "0.50.0", features = ["noise", "tcp", "dns", "websocket", "yamux", "mplex", "async-std", "kad", "ping", "identify", "macros"] }
libp2p-core = { version = "0.38.0", default-features = false, features = ["secp256k1"] }
Expand Down
9 changes: 7 additions & 2 deletions aquamarine/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,14 @@ where
self.future.is_some()
}

pub fn cleanup(&self, particle_id: &str, vm: &mut RT) -> eyre::Result<()> {
pub fn cleanup(
&self,
particle_id: &str,
current_peer_id: &str,
vm: &mut RT,
) -> eyre::Result<()> {
// TODO: remove dirs without using vm https://github.com/fluencelabs/fluence/issues/1216
vm.cleanup(particle_id)?;
vm.cleanup(particle_id, current_peer_id)?;
Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions aquamarine/src/aqua_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub trait AquaRuntime: Sized + Send + 'static {
call_results: CallResults,
) -> Result<AVMOutcome, Self::Error>;

fn cleanup(&mut self, particle_id: &str) -> Result<(), Self::Error>;
fn cleanup(&mut self, particle_id: &str, current_peer_id: &str) -> Result<(), Self::Error>;

/// Return current size of memory. Use only for diagnostics purposes.
fn memory_stats(&self) -> AVMMemoryStats;
Expand Down Expand Up @@ -133,8 +133,8 @@ impl AquaRuntime for AVM<DataStoreError> {
}

#[inline]
fn cleanup(&mut self, particle_id: &str) -> Result<(), Self::Error> {
AVM::cleanup_data(self, particle_id)
fn cleanup(&mut self, particle_id: &str, current_peer_id: &str) -> Result<(), Self::Error> {
AVM::cleanup_data(self, particle_id, current_peer_id)
}

fn memory_stats(&self) -> AVMMemoryStats {
Expand Down
43 changes: 29 additions & 14 deletions aquamarine/src/particle_data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use std::path::{Path, PathBuf};
use std::time::Duration;

use avm_server::avm_runner::RawAVMOutcome;
use avm_server::{AnomalyData, DataStore};
use thiserror::Error;

Expand Down Expand Up @@ -49,15 +50,17 @@ impl ParticleDataStore {
}
}

pub fn data_file(&self, key: &str) -> PathBuf {
pub fn data_file(&self, particle_id: &str, current_peer_id: &str) -> PathBuf {
let key = store_key_from_components(particle_id, current_peer_id);
self.particle_data_store.join(key)
}

/// Returns $ANOMALY_DATA_STORE/$particle_id/$timestamp
pub fn anomaly_dir(&self, key: &str) -> PathBuf {
pub fn anomaly_dir(&self, particle_id: &str, current_peer_id: &str) -> PathBuf {
let key = store_key_from_components(particle_id, current_peer_id);
[
self.anomaly_data_store.as_path(),
Path::new(key),
Path::new(&key),
Path::new(&now_ms().to_string()),
]
.iter()
Expand Down Expand Up @@ -85,36 +88,44 @@ impl DataStore for ParticleDataStore {
Ok(())
}

fn store_data(&mut self, data: &[u8], key: &str) -> Result<()> {
let data_path = self.data_file(key);
fn store_data(&mut self, data: &[u8], particle_id: &str, current_peer_id: &str) -> Result<()> {
let data_path = self.data_file(particle_id, current_peer_id);
std::fs::write(&data_path, data).map_err(|err| StoreData(err, data_path))?;

Ok(())
}

fn read_data(&mut self, key: &str) -> Result<Vec<u8>> {
let data_path = self.data_file(key);
fn read_data(&mut self, particle_id: &str, current_peer_id: &str) -> Result<Vec<u8>> {
let data_path = self.data_file(particle_id, current_peer_id);
let data = std::fs::read(data_path).unwrap_or_default();
Ok(data)
}

fn cleanup_data(&mut self, key: &str) -> Result<()> {
remove_file(&self.data_file(key)).map_err(CleanupData)?;
self.vault.cleanup(key)?;
fn cleanup_data(&mut self, particle_id: &str, current_peer_id: &str) -> Result<()> {
remove_file(&self.data_file(particle_id, current_peer_id)).map_err(CleanupData)?;
self.vault.cleanup(particle_id)?;

Ok(())
}

fn detect_anomaly(&self, execution_time: Duration, memory_delta: usize) -> bool {
execution_time > EXECUTION_TIME_THRESHOLD || memory_delta > MEMORY_DELTA_BYTES_THRESHOLD
fn detect_anomaly(
&self,
execution_time: Duration,
memory_delta: usize,
outcome: &RawAVMOutcome,
) -> bool {
execution_time > EXECUTION_TIME_THRESHOLD
|| memory_delta > MEMORY_DELTA_BYTES_THRESHOLD
|| outcome.ret_code != 0
}

fn collect_anomaly_data(
&mut self,
key: &str,
particle_id: &str,
current_peer_id: &str,
anomaly_data: AnomalyData<'_>,
) -> std::result::Result<(), Self::Error> {
let path = self.anomaly_dir(key);
let path = self.anomaly_dir(particle_id, current_peer_id);
create_dir(&path).map_err(DataStoreError::CreateAnomalyDir)?;

let file = path.join("data");
Expand Down Expand Up @@ -142,3 +153,7 @@ pub enum DataStoreError {
#[error("error serializing anomaly data")]
SerializeAnomaly(#[source] serde_json::error::Error),
}

fn store_key_from_components(particle_id: &str, current_peer_id: &str) -> String {
format!("particle_{particle_id}-peer_{current_peer_id}")
}
14 changes: 10 additions & 4 deletions aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,18 +167,20 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
// Remove expired actors
if let Some((vm_id, mut vm)) = self.vm_pool.get_vm() {
let now = now_ms();
self.actors.retain(|(particle_id, _peer_id), actor| {
self.actors.retain(|(particle_id, peer_id), actor| {
// if actor hasn't yet expired or is still executing, keep it
// TODO: if actor is expired, cancel execution and return VM back to pool
// https://github.com/fluencelabs/fluence/issues/1212
if !actor.is_expired(now) || actor.is_executing() {
return true; // keep actor
}

log::debug!("Reaping particle's actor {}", particle_id);
log::debug!(
"Reaping particle's actor particle_id={particle_id}, peer_id={peer_id})"
);
// cleanup files and dirs after particle processing (vault & prev_data)
// TODO: do not pass vm https://github.com/fluencelabs/fluence/issues/1216
if let Err(err) = actor.cleanup(particle_id, &mut vm) {
if let Err(err) = actor.cleanup(particle_id, &peer_id.to_string(), &mut vm) {
log::warn!(
"Error cleaning up after particle {}: {:?}",
particle_id,
Expand Down Expand Up @@ -360,7 +362,11 @@ mod tests {
})
}

fn cleanup(&mut self, _particle_id: &str) -> Result<(), Self::Error> {
fn cleanup(
&mut self,
_particle_id: &str,
_current_peer_id: &str,
) -> Result<(), Self::Error> {
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion crates/toy-vms/src/easy_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl AquaRuntime for EasyVM {
})
}

fn cleanup(&mut self, _particle_id: &str) -> Result<(), Self::Error> {
fn cleanup(&mut self, _particle_id: &str, _current_peer_id: &str) -> Result<(), Self::Error> {
// Nothing to cleanup in EasyVM
Ok(())
}
Expand Down

0 comments on commit 5ede2b0

Please sign in to comment.