Skip to content

Commit

Permalink
fix(particle-data-store): use signature in particle data store path n…
Browse files Browse the repository at this point in the history
…ame [NET-632] (#2052)

* use signature in data store name in base58 format
  • Loading branch information
kmd-fl committed Feb 2, 2024
1 parent e61b546 commit 3065a95
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 39 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aquamarine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ serde_json = { workspace = true }
parking_lot = { workspace = true }
chrono = "0.4.33"
base64 = { workspace = true }
bs58 = { workspace = true }
thiserror = { workspace = true }
humantime = "2.1.0"
anyhow = "1.0.79"
Expand Down
5 changes: 3 additions & 2 deletions aquamarine/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,10 @@ where
self.future.is_some()
}

pub fn cleanup_key(&self) -> (String, PeerId) {
pub fn cleanup_key(&self) -> (String, PeerId, Vec<u8>) {
let particle_id = self.particle.id.clone();
(particle_id, self.current_peer_id)
let signature = self.particle.signature.clone();
(particle_id, self.current_peer_id, signature)
}

pub fn mailbox_size(&self) -> usize {
Expand Down
69 changes: 49 additions & 20 deletions aquamarine/src/particle_data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,19 @@ impl ParticleDataStore {
}
}

pub fn data_file(&self, particle_id: &str, current_peer_id: &str) -> PathBuf {
let key = store_key_from_components(particle_id, current_peer_id);
pub fn data_file(&self, particle_id: &str, current_peer_id: &str, signature: &[u8]) -> PathBuf {
let key = store_key_from_components(particle_id, current_peer_id, signature);
self.particle_data_store.join(key)
}

/// Returns $ANOMALY_DATA_STORE/$particle_id/$timestamp
pub fn anomaly_dir(&self, particle_id: &str, current_peer_id: &str) -> PathBuf {
let key = store_key_from_components(particle_id, current_peer_id);
pub fn anomaly_dir(
&self,
particle_id: &str,
current_peer_id: &str,
signature: &[u8],
) -> PathBuf {
let key = store_key_from_components(particle_id, current_peer_id, signature);
[
self.anomaly_data_store.as_path(),
Path::new(&key),
Expand Down Expand Up @@ -94,9 +99,10 @@ impl ParticleDataStore {
data: &[u8],
particle_id: &str,
current_peer_id: &str,
signature: &[u8],
) -> Result<()> {
tracing::trace!(target: "particle_reap", particle_id = particle_id, "Storing data for particle");
let data_path = self.data_file(particle_id, current_peer_id);
let data_path = self.data_file(particle_id, current_peer_id, signature);
tokio::fs::write(&data_path, data)
.await
.map_err(|err| DataStoreError::StoreData(err, data_path))?;
Expand All @@ -105,16 +111,21 @@ impl ParticleDataStore {
}

#[instrument(level = tracing::Level::INFO)]
pub async fn read_data(&self, particle_id: &str, current_peer_id: &str) -> Result<Vec<u8>> {
let data_path = self.data_file(particle_id, current_peer_id);
pub async fn read_data(
&self,
particle_id: &str,
current_peer_id: &str,
signature: &[u8],
) -> Result<Vec<u8>> {
let data_path = self.data_file(particle_id, current_peer_id, signature);
let data = tokio::fs::read(&data_path).await.unwrap_or_default();
Ok(data)
}

pub async fn batch_cleanup_data(&self, cleanup_keys: Vec<(String, PeerId)>) {
pub async fn batch_cleanup_data(&self, cleanup_keys: Vec<(String, PeerId, Vec<u8>)>) {
let futures: FuturesUnordered<_> = cleanup_keys
.into_iter()
.map(|(particle_id, peer_id)| async move {
.map(|(particle_id, peer_id, signature)| async move {
let peer_id = peer_id.to_string();
tracing::debug!(
target: "particle_reap",
Expand All @@ -123,7 +134,7 @@ impl ParticleDataStore {
);

if let Err(err) = self
.cleanup_data(particle_id.as_str(), peer_id.as_str())
.cleanup_data(particle_id.as_str(), peer_id.as_str(), &signature)
.await
{
tracing::warn!(
Expand All @@ -137,9 +148,14 @@ impl ParticleDataStore {
let _results: Vec<_> = futures.collect().await;
}

async fn cleanup_data(&self, particle_id: &str, current_peer_id: &str) -> Result<()> {
async fn cleanup_data(
&self,
particle_id: &str,
current_peer_id: &str,
signature: &[u8],
) -> Result<()> {
tracing::debug!(target: "particle_reap", particle_id = particle_id, "Cleaning up particle data for particle");
let path = self.data_file(particle_id, current_peer_id);
let path = self.data_file(particle_id, current_peer_id, signature);
match tokio::fs::remove_file(&path).await {
Ok(_) => Ok(()),
// ignore NotFound
Expand Down Expand Up @@ -186,6 +202,7 @@ impl ParticleDataStore {
current_data: &[u8],
call_results: &CallResults,
particle_parameters: &ParticleParameters<'_>,
particle_signature: &[u8],
outcome: &RawAVMOutcome,
execution_time: Duration,
memory_delta: usize,
Expand All @@ -194,6 +211,7 @@ impl ParticleDataStore {
.read_data(
&particle_parameters.particle_id,
&particle_parameters.current_peer_id,
particle_signature,
)
.await?;

Expand All @@ -217,6 +235,7 @@ impl ParticleDataStore {
self.collect_anomaly_data(
&particle_parameters.particle_id,
&particle_parameters.current_peer_id,
particle_signature,
anomaly_data,
)
.await?;
Expand All @@ -227,9 +246,10 @@ impl ParticleDataStore {
&self,
particle_id: &str,
current_peer_id: &str,
signature: &[u8],
anomaly_data: AnomalyData<'_>,
) -> std::result::Result<(), DataStoreError> {
let path = self.anomaly_dir(particle_id, current_peer_id);
let path = self.anomaly_dir(particle_id, current_peer_id, signature);
tokio::fs::create_dir_all(&path)
.await
.map_err(DataStoreError::CreateAnomalyDir)?;
Expand Down Expand Up @@ -279,8 +299,15 @@ pub enum DataStoreError {
ReadData(#[source] std::io::Error, PathBuf),
}

fn store_key_from_components(particle_id: &str, current_peer_id: &str) -> String {
format!("particle_{particle_id}-peer_{current_peer_id}")
fn store_key_from_components(particle_id: &str, current_peer_id: &str, signature: &[u8]) -> String {
format!(
"particle_{particle_id}-peer_{current_peer_id}-sig_{}",
format_signature(signature)
)
}

fn format_signature(signature: &[u8]) -> String {
bs58::encode(signature).into_string()
}

#[cfg(test)]
Expand Down Expand Up @@ -327,14 +354,15 @@ mod tests {

let particle_id = "test_particle";
let current_peer_id = "test_peer";
let signature: &[u8] = &[0];
let data = b"test_data";

particle_data_store
.store_data(data, particle_id, current_peer_id)
.store_data(data, particle_id, current_peer_id, signature)
.await
.expect("Failed to store data");
let read_result = particle_data_store
.read_data(particle_id, current_peer_id)
.read_data(particle_id, current_peer_id, signature)
.await;

assert!(read_result.is_ok());
Expand Down Expand Up @@ -474,14 +502,15 @@ mod tests {

let particle_id = "test_particle";
let current_peer_id = "test_peer";
let signature: &[u8] = &[];
let data = b"test_data";

particle_data_store
.store_data(data, particle_id, current_peer_id)
.store_data(data, particle_id, current_peer_id, signature)
.await
.expect("Failed to store data");

let data_file_path = particle_data_store.data_file(particle_id, current_peer_id);
let data_file_path = particle_data_store.data_file(particle_id, current_peer_id, signature);
let vault_path = temp_dir_path.join("vault").join(particle_id);
tokio::fs::create_dir_all(&vault_path)
.await
Expand All @@ -490,7 +519,7 @@ mod tests {
assert!(vault_path.exists());

let cleanup_result = particle_data_store
.cleanup_data(particle_id, current_peer_id)
.cleanup_data(particle_id, current_peer_id, signature)
.await;

assert!(cleanup_result.is_ok());
Expand Down
8 changes: 7 additions & 1 deletion aquamarine/src/particle_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ impl<RT: AquaRuntime> ParticleExecutor for RT {

let prev_data = data_store
.clone()
.read_data(particle_id.as_str(), current_peer_id.to_base58().as_str())
.read_data(
particle_id.as_str(),
current_peer_id.to_base58().as_str(),
&particle.signature,
)
.await;

if let Ok(prev_data) = prev_data {
Expand Down Expand Up @@ -195,6 +199,7 @@ where
&avm_result.particle.data,
&avm_result.call_results,
&avm_result.particle_params,
&avm_result.particle.signature,
outcome,
stats.interpretation_time,
stats.memory_delta,
Expand All @@ -214,6 +219,7 @@ where
&outcome.data,
particle_id.as_str(),
current_peer_id.to_base58().as_str(),
&avm_result.particle.signature,
)
.await;
if let Err(err) = store_result {
Expand Down
3 changes: 2 additions & 1 deletion aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
// do not schedule task if another in progress
if self.cleanup_future.is_none() {
// Remove expired actors
let mut cleanup_keys: Vec<(String, PeerId)> = Vec::with_capacity(MAX_CLEANUP_KEYS_SIZE);
let mut cleanup_keys: Vec<(String, PeerId, Vec<u8>)> =
Vec::with_capacity(MAX_CLEANUP_KEYS_SIZE);
let now = now_ms();
self.actors.retain(|_, actor| {
// TODO: this code isn't optimal we continue iterate over actors if cleanup keys is full
Expand Down
50 changes: 35 additions & 15 deletions crates/local-vm/src/local_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,27 @@ pub async fn make_particle(
let timestamp = now_ms() as u64;
let ttl = particle_ttl.as_millis() as u32;

let mut particle = Particle {
id: id.clone(),
init_peer_id: peer_id,
timestamp,
ttl,
script: script.clone(),
signature: vec![],
data: vec![],
};
// We can sign at this point since the `data` which is evaluated below isn't part of the signature
particle.sign(key_pair).expect("sign particle");

let mut call_results: CallResults = <_>::default();
let mut particle_data = vec![];
loop {
let prev_data = data_store
.read_data(id.as_str(), peer_id.to_base58().as_str())
.read_data(
id.as_str(),
peer_id.to_base58().as_str(),
&particle.signature,
)
.await
.expect("Could not load prev data");

Expand All @@ -276,7 +292,12 @@ pub async fn make_particle(
.expect("execute & make particle");

data_store
.store_data(&data, id.as_str(), peer_id.to_base58().as_str())
.store_data(
&data,
id.as_str(),
peer_id.to_base58().as_str(),
&particle.signature,
)
.await
.expect("local vm could not store particle.data data");

Expand All @@ -294,17 +315,7 @@ pub async fn make_particle(
tokio::task::yield_now().await;
}

let mut particle = Particle {
id: id.clone(),
init_peer_id: peer_id,
timestamp,
ttl,
script,
signature: vec![],
data: particle_data,
};

particle.sign(key_pair).expect("sign particle");
particle.data = particle_data;

tracing::info!(
particle_id = id,
Expand All @@ -327,7 +338,11 @@ pub async fn read_args(
let mut particle_data = particle.data;
loop {
let prev_data = data_store
.read_data(particle.id.as_str(), peer_id.to_base58().as_str())
.read_data(
particle.id.as_str(),
peer_id.to_base58().as_str(),
&particle.signature,
)
.await
.expect("Could not load prev data");

Expand All @@ -351,7 +366,12 @@ pub async fn read_args(
.expect("execute & make particle");

data_store
.store_data(&data, particle.id.as_str(), peer_id.to_base58().as_str())
.store_data(
&data,
particle.id.as_str(),
peer_id.to_base58().as_str(),
&particle.signature,
)
.await
.expect("local vm could not store particle.data data");

Expand Down

0 comments on commit 3065a95

Please sign in to comment.