Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance enhancements related to caching #269

Merged
merged 14 commits into from Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Expand Up @@ -36,6 +36,12 @@ jobs:
command: test
args: --package akd

- name: Test the base library, enabling runtime metrics processing
uses: actions-rs/cargo@v1
with:
command: test
args: --package akd --features runtime_metrics

- name: Test the local auditor, with default features
uses: actions-rs/cargo@v1
with:
Expand Down
3 changes: 3 additions & 0 deletions akd/Cargo.toml
Expand Up @@ -21,11 +21,14 @@ public-tests = ["rand", "bincode", "colored", "once_cell", "serde_serialization"
public_auditing = ["protobuf", "thiserror"]
default = []
serde_serialization = ["serde", "ed25519-dalek/serde"]
# Collect runtime metrics on db access calls + timing
runtime_metrics = []

[dependencies]
## Required dependencies ##
hex = "0.4"
async-trait = "0.1"
dashmap = { version = "5" }
tokio = { version = "1.10", features = ["full"] }
async-recursion = "0.3"
log = { version = "0.4.8", features = ["kv_unstable"] }
Expand Down
116 changes: 70 additions & 46 deletions akd/src/append_only_zks.rs
Expand Up @@ -6,21 +6,21 @@
// of this source tree.

//! An implementation of an append-only zero knowledge set
use crate::serialization::to_digest;
use crate::storage::manager::StorageManager;
use crate::{
errors::TreeNodeError,
proof_structs::{AppendOnlyProof, MembershipProof, NonMembershipProof, SingleAppendOnlyProof},
storage::{Storable, Storage},
storage::{Database, SizeOf, Storable},
tree_node::*,
};

use crate::serialization::to_digest;

use crate::storage::types::StorageType;
use crate::{errors::*, node_label::*, tree_node::TreeNode, ARITY, *};
use async_recursion::async_recursion;
use log::{debug, info};
use std::marker::{Send, Sync};
use tokio::time::Instant;
use std::time::Instant;
use winter_crypto::Hasher;

use keyed_priority_queue::{Entry, KeyedPriorityQueue};
Expand All @@ -44,6 +44,12 @@ pub struct Azks {
pub num_nodes: u64, // The size of the tree
}

impl SizeOf for Azks {
fn size_of(&self) -> usize {
std::mem::size_of::<u64>() * 2
slawlor marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl Storable for Azks {
type StorageKey = u8;

Expand Down Expand Up @@ -80,7 +86,9 @@ impl Clone for Azks {

impl Azks {
/// Creates a new azks
pub async fn new<S: Storage + Sync + Send, H: Hasher>(storage: &S) -> Result<Self, AkdError> {
pub async fn new<S: Database + Sync + Send, H: Hasher>(
storage: &StorageManager<S>,
) -> Result<Self, AkdError> {
create_empty_root::<H, S>(storage, Option::Some(0), Option::Some(0)).await?;
let azks = Azks {
latest_epoch: 0,
Expand All @@ -93,9 +101,9 @@ impl Azks {
/// Inserts a single leaf and is only used for testing, since batching is more efficient.
/// We just want to make sure batch insertions work correctly and this function is useful for that.
#[cfg(test)]
pub async fn insert_leaf<S: Storage + Sync + Send, H: Hasher>(
pub async fn insert_leaf<S: Database + Sync + Send, H: Hasher>(
&mut self,
storage: &S,
storage: &StorageManager<S>,
node: Node<H>,
epoch: u64,
) -> Result<(), AkdError> {
Expand Down Expand Up @@ -126,18 +134,18 @@ impl Azks {
}

/// Insert a batch of new leaves
pub async fn batch_insert_leaves<S: Storage + Sync + Send, H: Hasher>(
pub async fn batch_insert_leaves<S: Database + Sync + Send, H: Hasher>(
&mut self,
storage: &S,
storage: &StorageManager<S>,
insertion_set: Vec<Node<H>>,
) -> Result<(), AkdError> {
self.batch_insert_leaves_helper::<_, H>(storage, insertion_set, false)
.await
}

async fn preload_nodes_for_insertion<S: Storage + Sync + Send, H: Hasher>(
async fn preload_nodes_for_insertion<S: Database + Sync + Send, H: Hasher>(
&self,
storage: &S,
storage: &StorageManager<S>,
insertion_set: &[Node<H>],
) -> Result<u64, AkdError> {
let prefixes_set = crate::utils::build_prefixes_set(
Expand All @@ -152,9 +160,9 @@ impl Azks {
}

/// Preloads given nodes using breadth-first search.
pub async fn bfs_preload_nodes<S: Storage + Sync + Send, H: Hasher>(
pub async fn bfs_preload_nodes<S: Database + Sync + Send, H: Hasher>(
&self,
storage: &S,
storage: &StorageManager<S>,
nodes_to_load: HashSet<NodeLabel>,
) -> Result<u64, AkdError> {
let mut load_count: u64 = 0;
Expand Down Expand Up @@ -192,9 +200,9 @@ impl Azks {
/// An azks is built both by the [crate::directory::Directory] and the auditor.
/// However, both constructions have very minor differences, and the append_only_usage
/// bool keeps track of this.
pub async fn batch_insert_leaves_helper<S: Storage + Sync + Send, H: Hasher>(
pub async fn batch_insert_leaves_helper<S: Database + Sync + Send, H: Hasher>(
&mut self,
storage: &S,
storage: &StorageManager<S>,
insertion_set: Vec<Node<H>>,
append_only_exclude_usage: bool,
) -> Result<(), AkdError> {
Expand Down Expand Up @@ -276,9 +284,9 @@ impl Azks {

/// Returns the Merkle membership proof for the trie as it stood at epoch
// Assumes the verifier has access to the root at epoch
pub async fn get_membership_proof<S: Storage + Sync + Send, H: Hasher>(
pub async fn get_membership_proof<S: Database + Sync + Send, H: Hasher>(
&self,
storage: &S,
storage: &StorageManager<S>,
label: NodeLabel,
_epoch: u64,
) -> Result<MembershipProof<H>, AkdError> {
Expand All @@ -293,9 +301,9 @@ impl Azks {
/// In a compressed trie, the proof consists of the longest prefix
/// of the label that is included in the trie, as well as its children, to show that
/// none of the children is equal to the given label.
pub async fn get_non_membership_proof<S: Storage + Sync + Send, H: Hasher>(
pub async fn get_non_membership_proof<S: Database + Sync + Send, H: Hasher>(
&self,
storage: &S,
storage: &StorageManager<S>,
label: NodeLabel,
) -> Result<NonMembershipProof<H>, AkdError> {
let (longest_prefix_membership_proof, lcp_node_label) =
Expand Down Expand Up @@ -353,9 +361,9 @@ impl Azks {
/// **RESTRICTIONS**: Note that `start_epoch` and `end_epoch` are valid only when the following are true
/// * `start_epoch` <= `end_epoch`
/// * `start_epoch` and `end_epoch` are both existing epochs of this AZKS
pub async fn get_append_only_proof<S: Storage + Sync + Send, H: Hasher>(
pub async fn get_append_only_proof<S: Database + Sync + Send, H: Hasher>(
&self,
storage: &S,
storage: &StorageManager<S>,
start_epoch: u64,
end_epoch: u64,
) -> Result<AppendOnlyProof<H>, AkdError> {
Expand Down Expand Up @@ -423,10 +431,10 @@ impl Azks {
}
}

async fn gather_audit_proof_nodes<S: Storage + Sync + Send, H: Hasher>(
async fn gather_audit_proof_nodes<S: Database + Sync + Send, H: Hasher>(
&self,
nodes: Vec<TreeNode>,
storage: &S,
storage: &StorageManager<S>,
start_epoch: u64,
end_epoch: u64,
) -> Result<u64, AkdError> {
Expand Down Expand Up @@ -455,9 +463,9 @@ impl Azks {
}

#[async_recursion]
async fn get_append_only_proof_helper<S: Storage + Sync + Send, H: Hasher>(
async fn get_append_only_proof_helper<S: Database + Sync + Send, H: Hasher>(
&self,
storage: &S,
storage: &StorageManager<S>,
node: TreeNode,
start_epoch: u64,
end_epoch: u64,
Expand Down Expand Up @@ -519,9 +527,9 @@ impl Azks {

// FIXME: these functions below should be moved into higher-level API
/// Gets the root hash for this azks
pub async fn get_root_hash<S: Storage + Sync + Send, H: Hasher>(
pub async fn get_root_hash<S: Database + Sync + Send, H: Hasher>(
&self,
storage: &S,
storage: &StorageManager<S>,
) -> Result<H::Digest, AkdError> {
self.get_root_hash_at_epoch::<_, H>(storage, self.get_latest_epoch())
.await
Expand All @@ -530,9 +538,9 @@ impl Azks {
/// Gets the root hash of the tree at a epoch.
/// Since this is accessing the root node and the root node exists at all epochs that
/// the azks does, this would never be called at an epoch before the birth of the root node.
pub async fn get_root_hash_at_epoch<S: Storage + Sync + Send, H: Hasher>(
pub async fn get_root_hash_at_epoch<S: Database + Sync + Send, H: Hasher>(
&self,
storage: &S,
storage: &StorageManager<S>,
epoch: u64,
) -> Result<H::Digest, AkdError> {
if self.latest_epoch < epoch {
Expand Down Expand Up @@ -565,9 +573,9 @@ impl Azks {
/// This function returns the node label for the node whose label is the longest common
/// prefix for the queried label. It also returns a membership proof for said label.
/// This is meant to be used in both, getting membership proofs and getting non-membership proofs.
pub async fn get_membership_proof_and_node<S: Storage + Sync + Send, H: Hasher>(
pub async fn get_membership_proof_and_node<S: Database + Sync + Send, H: Hasher>(
&self,
storage: &S,
storage: &StorageManager<S>,
label: NodeLabel,
) -> Result<(MembershipProof<H>, NodeLabel), AkdError> {
let mut layer_proofs = Vec::new();
Expand Down Expand Up @@ -677,7 +685,8 @@ mod tests {
async fn test_batch_insert_basic() -> Result<(), AkdError> {
let mut rng = OsRng;
let num_nodes = 10;
let db = AsyncInMemoryDatabase::new();
let database = AsyncInMemoryDatabase::new();
let db = StorageManager::new_no_cache(&database);
let mut azks1 = Azks::new::<_, Blake3>(&db).await?;
azks1.increment_epoch();

Expand All @@ -693,7 +702,8 @@ mod tests {
azks1.insert_leaf::<_, Blake3>(&db, node, 1).await?;
}

let db2 = AsyncInMemoryDatabase::new();
let database2 = AsyncInMemoryDatabase::new();
let db2 = StorageManager::new_no_cache(&database2);
let mut azks2 = Azks::new::<_, Blake3>(&db2).await?;

azks2
Expand All @@ -713,7 +723,8 @@ mod tests {
async fn test_insert_permuted() -> Result<(), AkdError> {
let num_nodes = 10;
let mut rng = OsRng;
let db = AsyncInMemoryDatabase::new();
let database = AsyncInMemoryDatabase::new();
let db = StorageManager::new_no_cache(&database);
let mut azks1 = Azks::new::<_, Blake3>(&db).await?;
azks1.increment_epoch();
let mut insertion_set: Vec<Node<Blake3>> = vec![];
Expand All @@ -731,7 +742,8 @@ mod tests {
// Try randomly permuting
insertion_set.shuffle(&mut rng);

let db2 = AsyncInMemoryDatabase::new();
let database2 = AsyncInMemoryDatabase::new();
let db2 = StorageManager::new_no_cache(&database2);
let mut azks2 = Azks::new::<_, Blake3>(&db2).await?;

azks2
Expand Down Expand Up @@ -765,7 +777,8 @@ mod tests {

// Try randomly permuting
insertion_set.shuffle(&mut rng);
let db = AsyncInMemoryDatabase::new();
let database = AsyncInMemoryDatabase::new();
let db = StorageManager::new_no_cache(&database);
let mut azks = Azks::new::<_, Blake3>(&db).await?;
azks.batch_insert_leaves::<_, Blake3>(&db, insertion_set.clone())
.await?;
Expand Down Expand Up @@ -795,7 +808,8 @@ mod tests {
insertion_set.push(node);
}

let db = AsyncInMemoryDatabase::new();
let database = AsyncInMemoryDatabase::new();
let db = StorageManager::new_no_cache(&database);
let mut azks = Azks::new::<_, Blake3>(&db).await?;
azks.batch_insert_leaves::<_, Blake3>(&db, insertion_set.clone())
.await?;
Expand Down Expand Up @@ -827,7 +841,8 @@ mod tests {

// Try randomly permuting
insertion_set.shuffle(&mut rng);
let db = AsyncInMemoryDatabase::new();
let database = AsyncInMemoryDatabase::new();
let db = StorageManager::new_no_cache(&database);
let mut azks = Azks::new::<_, Blake3>(&db).await?;
azks.batch_insert_leaves::<_, Blake3>(&db, insertion_set.clone())
.await?;
Expand All @@ -852,7 +867,8 @@ mod tests {

#[tokio::test]
async fn test_membership_proof_intermediate() -> Result<(), AkdError> {
let db = AsyncInMemoryDatabase::new();
let database = AsyncInMemoryDatabase::new();
let db = StorageManager::new_no_cache(&database);

let mut insertion_set: Vec<Node<Blake3>> = vec![];
insertion_set.push(Node {
Expand Down Expand Up @@ -905,7 +921,8 @@ mod tests {
let node = Node::<Blake3> { label, hash };
insertion_set.push(node);
}
let db = AsyncInMemoryDatabase::new();
let database = AsyncInMemoryDatabase::new();
let db = StorageManager::new_no_cache(&database);
let mut azks = Azks::new::<_, Blake3>(&db).await?;
let search_label = insertion_set[0].label;
azks.batch_insert_leaves::<_, Blake3>(&db, insertion_set.clone()[1..2].to_vec())
Expand Down Expand Up @@ -934,7 +951,8 @@ mod tests {
let node = Node::<Blake3> { label, hash };
insertion_set.push(node);
}
let db = AsyncInMemoryDatabase::new();
let database = AsyncInMemoryDatabase::new();
let db = StorageManager::new_no_cache(&database);
let mut azks = Azks::new::<_, Blake3>(&db).await?;
let search_label = insertion_set[num_nodes - 1].label;
azks.batch_insert_leaves::<_, Blake3>(
Expand Down Expand Up @@ -964,7 +982,8 @@ mod tests {
let node = Node::<Blake3> { label, hash };
insertion_set.push(node);
}
let db = AsyncInMemoryDatabase::new();
let database = AsyncInMemoryDatabase::new();
let db = StorageManager::new_no_cache(&database);
let mut azks = Azks::new::<_, Blake3>(&db).await?;
let search_label = insertion_set[num_nodes - 1].label;
azks.batch_insert_leaves::<_, Blake3>(
Expand All @@ -981,7 +1000,8 @@ mod tests {

#[tokio::test]
async fn test_append_only_proof_very_tiny() -> Result<(), AkdError> {
let db = AsyncInMemoryDatabase::new();
let database = AsyncInMemoryDatabase::new();
let db = StorageManager::new_no_cache(&database);
let mut azks = Azks::new::<_, Blake3>(&db).await?;

let mut insertion_set_1: Vec<Node<Blake3>> = vec![];
Expand Down Expand Up @@ -1011,7 +1031,8 @@ mod tests {

#[tokio::test]
async fn test_append_only_proof_tiny() -> Result<(), AkdError> {
let db = AsyncInMemoryDatabase::new();
let database = AsyncInMemoryDatabase::new();
let db = StorageManager::new_no_cache(&database);
let mut azks = Azks::new::<_, Blake3>(&db).await?;

let mut insertion_set_1: Vec<Node<Blake3>> = vec![];
Expand Down Expand Up @@ -1063,7 +1084,8 @@ mod tests {
insertion_set_1.push(node);
}

let db = AsyncInMemoryDatabase::new();
let database = AsyncInMemoryDatabase::new();
let db = StorageManager::new_no_cache(&database);
let mut azks = Azks::new::<_, Blake3>(&db).await?;
azks.batch_insert_leaves::<_, Blake3>(&db, insertion_set_1.clone())
.await?;
Expand Down Expand Up @@ -1111,7 +1133,9 @@ mod tests {

#[tokio::test]
async fn future_epoch_throws_error() -> Result<(), AkdError> {
let db = AsyncInMemoryDatabase::new();
let database = AsyncInMemoryDatabase::new();

let db = StorageManager::new_no_cache(&database);
let azks = Azks::new::<_, Blake3>(&db).await?;

let out = azks.get_root_hash_at_epoch::<_, Blake3>(&db, 123).await;
Expand Down