Skip to content

Commit

Permalink
feat(particle-vault)!: introduce new particle vault path format (#2098)
Browse files Browse the repository at this point in the history
  • Loading branch information
kmd-fl authored Feb 23, 2024
1 parent 79e7850 commit 3d68c85
Show file tree
Hide file tree
Showing 15 changed files with 233 additions and 83 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,40 @@ server:
particle_reap=debug" \
cargo run --release -p nox

spell:
WASM_LOG="trace" \
RUST_LOG="info,\
aquamarine::aqua_runtime=off,\
ipfs_effector=debug,\
ipfs_pure=debug,\
run-console=debug,\
system_services=debug,\
spell_even_bus=trace,\
marine_core::module::marine_module=info,\
aquamarine::log=debug,\
aquamarine=warn,\
tokio_threadpool=info,\
tokio_reactor=info,\
mio=info,\
tokio_io=info,\
soketto=info,\
yamux=info,\
multistream_select=info,\
libp2p_secio=info,\
libp2p_websocket::framed=info,\
libp2p_ping=info,\
libp2p_core::upgrade::apply=info,\
libp2p_kad::kbucket=info,\
cranelift_codegen=info,\
wasmer_wasi=info,\
cranelift_codegen=info,\
wasmer_wasi=info,\
wasmtime_cranelift=off,\
wasmtime_jit=off,\
particle_protocol=off,\
particle_reap=debug" \
cargo run --release -p nox

local-env:
docker compose -f docker-compose.yml up -d

Expand Down
11 changes: 9 additions & 2 deletions aquamarine/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ pub struct Actor<RT, F> {
/// Particles and call results will be processed in the security scope of this peer id
/// It's either `host_peer_id` or local worker peer id
current_peer_id: PeerId,
/// TODO: for the clean-up, I don't think we need it here!
particle_token: String,
key_pair: KeyPair,
data_store: Arc<ParticleDataStore>,
spawner: Spawner,
Expand All @@ -74,10 +76,13 @@ where
RT: AquaRuntime,
F: ParticleFunctionStatic,
{
// TODO: temporary (I hope), need to do smth clever with particle_token
#[allow(clippy::too_many_arguments)]
pub fn new(
particle: &Particle,
functions: Functions<F>,
current_peer_id: PeerId,
particle_token: String,
key_pair: KeyPair,
data_store: Arc<ParticleDataStore>,
deal_id: Option<String>,
Expand All @@ -95,6 +100,7 @@ where
..particle.clone()
},
current_peer_id,
particle_token,
key_pair,
data_store,
spawner,
Expand All @@ -110,10 +116,11 @@ where
self.future.is_some()
}

pub fn cleanup_key(&self) -> (String, PeerId, Vec<u8>) {
pub fn cleanup_key(&self) -> (String, PeerId, Vec<u8>, String) {
let particle_id = self.particle.id.clone();
let signature = self.particle.signature.clone();
(particle_id, self.current_peer_id, signature)
let token = self.particle_token.clone();
(particle_id, self.current_peer_id, signature, token)
}

pub fn mailbox_size(&self) -> usize {
Expand Down
71 changes: 44 additions & 27 deletions aquamarine/src/particle_data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,48 +122,57 @@ impl ParticleDataStore {
Ok(data)
}

pub async fn batch_cleanup_data(&self, cleanup_keys: Vec<(String, PeerId, Vec<u8>)>) {
pub async fn batch_cleanup_data(&self, cleanup_keys: Vec<(String, PeerId, Vec<u8>, String)>) {
let futures: FuturesUnordered<_> = cleanup_keys
.into_iter()
.map(|(particle_id, peer_id, signature)| async move {
let peer_id = peer_id.to_string();
tracing::debug!(
target: "particle_reap",
particle_id = particle_id, worker_id = peer_id,
"Reaping particle's actor"
);

if let Err(err) = self
.cleanup_data(particle_id.as_str(), peer_id.as_str(), &signature)
.await
{
tracing::warn!(
particle_id = particle_id,
"Error cleaning up after particle {:?}",
err
.map(
|(particle_id, peer_id, signature, particle_token)| async move {
tracing::debug!(
target: "particle_reap",
particle_id = particle_id, worker_id = peer_id.to_base58(),
"Reaping particle's actor"
);
}
})

if let Err(err) = self
.cleanup_data(
particle_id.as_str(),
peer_id,
&signature,
particle_token.as_str(),
)
.await
{
tracing::warn!(
particle_id = particle_id,
"Error cleaning up after particle {:?}",
err
);
}
},
)
.collect();
let _results: Vec<_> = futures.collect().await;
}

async fn cleanup_data(
&self,
particle_id: &str,
current_peer_id: &str,
current_peer_id: PeerId,
signature: &[u8],
particle_token: &str,
) -> Result<()> {
tracing::debug!(target: "particle_reap", particle_id = particle_id, "Cleaning up particle data for particle");
let path = self.data_file(particle_id, current_peer_id, signature);
let path = self.data_file(particle_id, &current_peer_id.to_base58(), signature);
match tokio::fs::remove_file(&path).await {
Ok(_) => Ok(()),
// ignore NotFound
Err(err) if err.kind() == ErrorKind::NotFound => Ok(()),
Err(err) => Err(DataStoreError::CleanupData(err)),
}?;

self.vault.cleanup(particle_id).await?;
self.vault
.cleanup(current_peer_id, particle_id, particle_token)
.await?;

Ok(())
}
Expand Down Expand Up @@ -318,6 +327,7 @@ mod tests {
use crate::ParticleDataStore;
use avm_server::avm_runner::RawAVMOutcome;
use avm_server::{CallRequests, CallResults, CallServiceResult};
use fluence_libp2p::PeerId;
use std::path::PathBuf;
use std::time::Duration;

Expand Down Expand Up @@ -501,25 +511,32 @@ mod tests {
.expect("Failed to initialize");

let particle_id = "test_particle";
let current_peer_id = "test_peer";
let particle_token = "test_token";
let current_peer_id = PeerId::random();
let current_peer_id_str = current_peer_id.to_base58();
let signature: &[u8] = &[];
let data = b"test_data";

particle_data_store
.store_data(data, particle_id, current_peer_id, signature)
.store_data(data, particle_id, &current_peer_id_str, signature)
.await
.expect("Failed to store data");

let data_file_path = particle_data_store.data_file(particle_id, current_peer_id, signature);
let vault_path = temp_dir_path.join("vault").join(particle_id);
let data_file_path =
particle_data_store.data_file(particle_id, &current_peer_id_str, signature);
let vault_path = particle_data_store.vault.real_particle_vault(
current_peer_id,
particle_id,
particle_token,
);
tokio::fs::create_dir_all(&vault_path)
.await
.expect("Failed to create vault dir");
assert!(data_file_path.exists());
assert!(vault_path.exists());

let cleanup_result = particle_data_store
.cleanup_data(particle_id, current_peer_id, signature)
.cleanup_data(particle_id, current_peer_id, signature, particle_token)
.await;

assert!(cleanup_result.is_ok());
Expand Down
10 changes: 7 additions & 3 deletions aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,11 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
&self.key_storage.root_key_pair,
&particle.particle.signature,
)?;
let params =
ParticleParams::clone_from(particle.as_ref(), peer_scope, particle_token);
let params = ParticleParams::clone_from(
particle.as_ref(),
peer_scope,
particle_token.clone(),
);
let functions = Functions::new(params, builtins.clone());
let key_pair = self
.key_storage
Expand Down Expand Up @@ -221,6 +224,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
particle.as_ref(),
functions,
current_peer_id,
particle_token,
key_pair,
data_store,
deal_id,
Expand Down Expand Up @@ -329,7 +333,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
// do not schedule task if another in progress
if self.cleanup_future.is_none() {
// Remove expired actors
let mut cleanup_keys: Vec<(String, PeerId, Vec<u8>)> =
let mut cleanup_keys: Vec<(String, PeerId, Vec<u8>, String)> =
Vec::with_capacity(MAX_CLEANUP_KEYS_SIZE);
let now = now_ms();
self.actors.retain(|_, actor| {
Expand Down
11 changes: 10 additions & 1 deletion crates/fs-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ use eyre::{eyre, Context};
use futures_util::StreamExt;
use std::fmt::Debug;
use std::fs;
use std::fs::Permissions;
use std::fs::{DirBuilder, Permissions};
use std::future::Future;
use std::io::ErrorKind;
use std::os::unix::fs::DirBuilderExt;
use std::path::{Path, PathBuf};
use std::thread::available_parallelism;
use thiserror::Error;
Expand Down Expand Up @@ -77,6 +78,14 @@ pub fn create_dir<P: AsRef<Path> + Debug>(dir: P) -> Result<(), std::io::Error>
.map_err(|err| std::io::Error::new(err.kind(), format!("{err:?}: {dir:?}")))
}

pub fn create_dir_write_only<P: AsRef<Path> + Debug>(dir: P) -> Result<(), std::io::Error> {
DirBuilder::new()
.recursive(true)
.mode(0o333)
.create(&dir)
.map_err(|err| std::io::Error::new(err.kind(), format!("{err:?}: {dir:?}")))
}

pub fn remove_dirs<Item>(dirs: &[Item]) -> Result<(), std::io::Error>
where
Item: AsRef<Path> + Debug,
Expand Down
Binary file modified crates/nox-tests/tests/file_share/artifacts/file_share.wasm
Binary file not shown.
4 changes: 3 additions & 1 deletion crates/nox-tests/tests/file_share/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ fn store_file(contents: impl AsRef<[u8]>) -> (String, String) {

fn vault_dir() -> PathBuf {
let particle_id = get_call_parameters().particle.id;
let vault = Path::new("/tmp").join("vault").join(particle_id);
let token = get_call_parameters().particle.token;
let path = format!("{}-{}", particle_id, token);
let vault = Path::new("/tmp").join("vault").join(path);

vault
}
2 changes: 1 addition & 1 deletion crates/system-services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
aqua-ipfs-distro = "=0.5.30"
aqua-ipfs-distro = "=0.6.0"
decider-distro = "=0.6.7"
registry-distro = "=0.9.4"
trust-graph-distro = "=0.4.11"
Expand Down
5 changes: 3 additions & 2 deletions crates/system-services/src/deployer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,11 @@ impl Deployer {
}

let trigger_config = spell_event_bus::api::from_user_config(&spell_distro.trigger_config)?;
let params = CallParams::local(
let params = CallParams::new(
self.host_peer_id,
PeerScope::Host,
spell_id.to_string(),
self.host_peer_id,
Some(format!("spell_{spell_id}_0")),
DEPLOYER_TTL,
);
// update trigger config
Expand Down
20 changes: 14 additions & 6 deletions particle-builtins/src/builtins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ where

let module_hash = self.modules.add_module_from_vault(
&self.services.vault,
self.scopes.to_peer_id(params.peer_scope),
module_path,
config,
params,
Expand Down Expand Up @@ -649,6 +650,7 @@ where

let config = ModuleRepository::load_module_config_from_vault(
&self.services.vault,
self.scopes.to_peer_id(params.peer_scope),
config_path,
params,
)?;
Expand Down Expand Up @@ -694,10 +696,11 @@ where
let mut args = args.function_args.into_iter();
let blueprint_path: String = Args::next("blueprint_path", &mut args)?;

let data = self
.services
.vault
.cat_slice(&params, Path::new(&blueprint_path))?;
let current_peer_id = self.scopes.to_peer_id(params.peer_scope);
let data =
self.services
.vault
.cat_slice(current_peer_id, &params, Path::new(&blueprint_path))?;
let blueprint = AddBlueprint::decode(&data).map_err(|err| {
JError::new(format!(
"Error parsing blueprint from vault {blueprint_path:?}: {err}"
Expand Down Expand Up @@ -986,17 +989,22 @@ where
let mut args = args.function_args.into_iter();
let data: String = Args::next("data", &mut args)?;
let name = uuid();
let virtual_path = self.services.vault.put(&params, name, &data)?;
let current_peer_id = self.scopes.to_peer_id(params.peer_scope);
let virtual_path = self
.services
.vault
.put(current_peer_id, &params, name, &data)?;

Ok(JValue::String(virtual_path.display().to_string()))
}

fn vault_cat(&self, args: Args, params: ParticleParams) -> Result<JValue, JError> {
let mut args = args.function_args.into_iter();
let path: String = Args::next("path", &mut args)?;
let current_peer_id = self.scopes.to_peer_id(params.peer_scope);
self.services
.vault
.cat(&params, Path::new(&path))
.cat(current_peer_id, &params, Path::new(&path))
.map(JValue::String)
.map_err(|_| JError::new(format!("Error reading vault file `{path}`")))
}
Expand Down
Loading

0 comments on commit 3d68c85

Please sign in to comment.