Performance enhancements related to caching #269

Merged
merged 14 commits into from Nov 24, 2022
Changes from 1 commit
8 changes: 8 additions & 0 deletions akd/Cargo.toml
@@ -21,6 +21,13 @@ public-tests = ["rand", "bincode", "colored", "once_cell", "serde_serialization"]
public_auditing = ["protobuf", "thiserror"]
default = []
serde_serialization = ["serde", "ed25519-dalek/serde"]
+# Indicates that we're going to support a higher level of parallelism, and therefore should avoid
+# usages of a RwLock around a HashMap, which doesn't scale well for parallel access (i.e. in the storage
+# caches)
+high_parallelism = ["dashmap"]
+# Adds support for handling memory pressure on the caches. This costs some performance, but
+# provides the ability to protect against OOMing
+memory_pressure = []

[dependencies]
## Required dependencies ##
@@ -35,6 +42,7 @@ winter-math = "0.2"
keyed_priority_queue = "0.3"

## Optional Dependencies ##
+dashmap = { version = "5", optional = true }
bincode = { version = "1", optional = true }
serde = { version = "1", features = ["derive"], optional = true }
rand = { version = "0.7", optional = true }
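The feature comments above describe the motivation for `high_parallelism`; the practical difference between the two map types can be sketched as follows. This is an illustrative sketch only (the type aliases and helper are hypothetical, not the crate's actual internals), assuming `tokio` and `dashmap = "5"` are available:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Without `high_parallelism`: every reader and writer funnels through a
// single RwLock, so one writer stalls all concurrent readers.
type LockedCache = Arc<tokio::sync::RwLock<HashMap<Vec<u8>, Vec<u8>>>>;

// With `high_parallelism`: DashMap shards the key space internally and
// locks per shard, so operations on different shards don't contend.
type ShardedCache = Arc<dashmap::DashMap<Vec<u8>, Vec<u8>>>;

fn sharded_insert(cache: &ShardedCache, key: Vec<u8>, value: Vec<u8>) {
    // No explicit guard to acquire: DashMap briefly locks only the shard
    // that owns `key`.
    cache.insert(key, value);
}
```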
8 changes: 7 additions & 1 deletion akd/src/append_only_zks.rs
@@ -9,7 +9,7 @@
use crate::{
errors::TreeNodeError,
proof_structs::{AppendOnlyProof, MembershipProof, NonMembershipProof, SingleAppendOnlyProof},
-storage::{Storable, Storage},
+storage::{SizeOf, Storable, Storage},
tree_node::*,
};

@@ -44,6 +44,12 @@ pub struct Azks {
pub num_nodes: u64, // The size of the tree
}

+impl SizeOf for Azks {
+fn size_of(&self) -> usize {
+std::mem::size_of::<u64>() * 2
+}
+}

impl Storable for Azks {
type StorageKey = u8;

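The `SizeOf` trait imported here is not itself part of this diff, but from the impls it is presumably a one-method trait along these lines (a sketch of the presumed shape, not the crate's verbatim definition):

```rust
/// Report an approximate in-memory footprint in bytes, so the cache's
/// memory-pressure accounting can total up what it is holding.
pub trait SizeOf {
    fn size_of(&self) -> usize;
}
```

Under that reading, the `Azks` impl above reports its two `u64` fields (the latest epoch and `num_nodes`), i.e. 2 * 8 = 16 bytes.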
7 changes: 7 additions & 0 deletions akd/src/helper_structs.rs
@@ -11,6 +11,7 @@
#[cfg(feature = "serde_serialization")]
use crate::serialization::{digest_deserialize, digest_serialize};

+use winter_crypto::Digest;
use winter_crypto::Hasher;

use crate::{storage::types::ValueState, NodeLabel};
@@ -36,6 +37,12 @@ pub struct Node<H: Hasher> {
pub hash: H::Digest,
}

+impl<H: Hasher> crate::storage::SizeOf for Node<H> {
+fn size_of(&self) -> usize {
+self.label.size_of() + self.hash.as_bytes().len()
+}
+}

// can't use #derive because it doesn't bind correctly
// #derive and generics are not friendly; might make Debug weird too ...
// see also:
6 changes: 6 additions & 0 deletions akd/src/node_label.rs
@@ -45,6 +45,12 @@ pub struct NodeLabel {
pub label_len: u32,
}

+impl crate::storage::SizeOf for NodeLabel {
+fn size_of(&self) -> usize {
+32 + std::mem::size_of::<u32>()
+}
+}

impl PartialOrd for NodeLabel {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
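These fixed-size impls compose: a `Node<H>` reports its label's bytes plus the hasher's digest length. A toy check of the arithmetic, assuming for illustration a hasher with 32-byte digests:

```rust
fn main() {
    // NodeLabel::size_of() above: a 32-byte label value plus a u32 length.
    let label_bytes = 32 + std::mem::size_of::<u32>(); // 36
    // Assumed for illustration: the hasher emits 32-byte digests.
    let digest_bytes = 32;
    // Node<H>::size_of() sums the two parts.
    assert_eq!(label_bytes + digest_bytes, 68);
}
```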
65 changes: 28 additions & 37 deletions akd/src/storage/timed_cache.rs → akd/src/storage/caches/basic.rs
@@ -5,32 +5,28 @@
// License, Version 2.0 found in the LICENSE-APACHE file in the root directory
// of this source tree.

-//! This module implements a basic async timed cache
+//! This module implements a basic async timed cache. It additionally counts some
+//! metrics related to access counts which can be helpful for profiling/debugging

+use super::{CachedItem, CACHE_CLEAN_FREQUENCY_MS, DEFAULT_ITEM_LIFETIME_MS};
use crate::storage::DbRecord;
+#[cfg(feature = "memory_pressure")]
+use crate::storage::SizeOf;
use crate::storage::Storable;
use log::{debug, error, info, trace, warn};
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
+use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
-use std::time::{Duration, Instant};
-
-// items live for 30s
-const DEFAULT_ITEM_LIFETIME_MS: u64 = 30000;
-// clean the cache every 15s
-const CACHE_CLEAN_FREQUENCY_MS: u64 = 15000;
-
-struct CachedItem {
-expiration: Instant,
-data: DbRecord,
-}
+use std::time::Duration;
+use std::time::Instant;

/// Implements a basic cache with timing information which automatically flushes
/// expired entries and removes them
pub struct TimedCache {
azks: Arc<tokio::sync::RwLock<Option<DbRecord>>>,
map: Arc<tokio::sync::RwLock<HashMap<Vec<u8>, CachedItem>>>,
last_clean: Arc<tokio::sync::RwLock<Instant>>,
-can_clean: Arc<tokio::sync::RwLock<bool>>,
+can_clean: Arc<AtomicBool>,
item_lifetime: Duration,
hit_count: Arc<tokio::sync::RwLock<u64>>,
}
@@ -72,8 +68,8 @@ impl Clone for TimedCache {

impl TimedCache {
async fn clean(&self) {
-let can_clean_guard = self.can_clean.read().await;
-if !*can_clean_guard {
+let can_clean = self.can_clean.load(Ordering::Relaxed);
+if !can_clean {
// cleaning is disabled
return;
}
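Swapping the async `RwLock<bool>` for an `AtomicBool` works here because the flag is a standalone advisory switch: there is no critical section to protect, and an atomic load needs no `.await`. A minimal sketch of the pattern:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

fn demo() {
    let can_clean = AtomicBool::new(true);
    // Relaxed ordering suffices: no other memory accesses have to be
    // sequenced around reads or writes of this single flag.
    can_clean.store(false, Ordering::Relaxed);
    if !can_clean.load(Ordering::Relaxed) {
        // cleaning is disabled
    }
}
```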
@@ -86,26 +82,24 @@ impl TimedCache {
if do_clean {
debug!("BEGIN clean cache");
let now = Instant::now();
-let mut keys_to_flush = HashSet::new();

let mut write = self.map.write().await;
-for (key, value) in write.iter() {
-if value.expiration < now {
-keys_to_flush.insert(key.clone());
-}
-}
-if !keys_to_flush.is_empty() {
-for key in keys_to_flush.into_iter() {
-write.remove(&key);
-}
-}
+write.retain(|_, v| v.expiration >= now);
debug!("END clean cache");

// update last clean time
*(self.last_clean.write().await) = Instant::now();
}
}

+#[cfg(feature = "memory_pressure")]
+/// Measure the size of the underlying hashmap and storage utilized
+pub async fn measure(&self) -> usize {
+let read = self.map.read().await;
+read.iter()
+.map(|(key, item)| key.len() + item.size_of())
+.sum()
+}

/// Create a new timed cache instance. You can supply an optional item lifetime parameter
/// or take the default (30s)
pub fn new(o_lifetime: Option<Duration>) -> Self {
@@ -117,7 +111,7 @@
azks: Arc::new(tokio::sync::RwLock::new(None)),
map: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
last_clean: Arc::new(tokio::sync::RwLock::new(Instant::now())),
-can_clean: Arc::new(tokio::sync::RwLock::new(true)),
+can_clean: Arc::new(AtomicBool::new(true)),
item_lifetime: lifetime,
hit_count: Arc::new(tokio::sync::RwLock::new(0)),
}
@@ -153,8 +147,7 @@ impl TimedCache {
debug!("END cache retrieve");
if let Some(result) = ptr.get(&full_key) {
*(self.hit_count.write().await) += 1;

-let ignore_clean = !*self.can_clean.read().await;
+let ignore_clean = !self.can_clean.load(Ordering::Relaxed);
// if we've disabled cache cleaning, we're in the middle
// of an in-memory transaction and should ignore expiration
// of cache items until this flag is disabled again
@@ -221,16 +214,14 @@ }
}

/// Disable cache-cleaning (i.e. during a transaction)
-pub async fn disable_clean(&self) {
+pub fn disable_clean(&self) {
debug!("Disabling MySQL object cache cleaning");
-let mut guard = self.can_clean.write().await;
-(*guard) = false;
+self.can_clean.store(false, Ordering::Relaxed);
}

/// Re-enable cache cleaning (i.e. when a transaction is over)
-pub async fn enable_clean(&self) {
+pub fn enable_clean(&self) {
debug!("Enabling MySQL object cache cleaning");
-let mut guard = self.can_clean.write().await;
-(*guard) = true;
+self.can_clean.store(true, Ordering::Relaxed);
}
}
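Putting the pieces together, a caller-side sketch of the API after this change (the transaction body is elided, the size threshold is a hypothetical policy, and `measure()` only exists when the `memory_pressure` feature is enabled):

```rust
use std::time::Duration;

// Construction: a custom 60s item lifetime instead of the 30s default.
fn build_cache() -> TimedCache {
    TimedCache::new(Some(Duration::from_secs(60)))
}

async fn example(cache: &TimedCache) {
    // disable_clean/enable_clean are now synchronous: they just flip an
    // AtomicBool, so there is no lock to await.
    cache.disable_clean();
    // ... perform an in-memory transaction; expired entries stay visible
    // until cleaning is re-enabled ...
    cache.enable_clean();

    #[cfg(feature = "memory_pressure")]
    {
        // Roughly how many bytes the cached keys and values occupy.
        let bytes = cache.measure().await;
        if bytes > 100 * 1024 * 1024 {
            // hypothetical policy: react once the cache exceeds ~100 MiB
        }
    }
}
```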