Skip to content

Commit

Permalink
open up ttl backing for nested backings (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
ByteAlex committed Jan 23, 2022
1 parent dd09af1 commit 4fdf7ea
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 55 deletions.
2 changes: 1 addition & 1 deletion cache-loader-async-macros/src/lib.rs
Expand Up @@ -101,7 +101,7 @@ pub fn test_with_features(item: proc_macro::TokenStream) -> proc_macro::TokenStr
#[cfg(feature = "ttl-cache")]
#[tokio::test]
async fn #fn_ident_ttl() {
let #ident: LoadingCache<#key_type, #value_type, #error_type, TtlCacheBacking<_, _>> = LoadingCache::with_backing(TtlCacheBacking::new(Duration::from_secs(3)), move |key: #key_type| {
let #ident: LoadingCache<#key_type, #value_type, #error_type, TtlCacheBacking<_, _, _>> = LoadingCache::with_backing(TtlCacheBacking::new(Duration::from_secs(3)), move |key: #key_type| {
async move #loader
});

Expand Down
135 changes: 81 additions & 54 deletions src/backing.rs
Expand Up @@ -5,6 +5,7 @@ use lru::LruCache;
#[cfg(feature = "ttl-cache")]
use std::collections::VecDeque;
use std::fmt::Debug;
use std::marker::PhantomData;
use thiserror::Error;
#[cfg(feature = "ttl-cache")]
use std::ops::Add;
Expand All @@ -20,8 +21,8 @@ pub trait CacheBacking<K, V>
fn get(&mut self, key: &K) -> Result<Option<&V>, BackingError>;
fn set(&mut self, key: K, value: V, meta: Option<Self::Meta>) -> Result<Option<V>, BackingError>;
fn remove(&mut self, key: &K) -> Result<Option<V>, BackingError>;
fn contains_key(&self, key: &K) -> Result<bool, BackingError>;
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send + 'static>) -> Result<(), BackingError>;
fn contains_key(&mut self, key: &K) -> Result<bool, BackingError>;
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send + Sync + 'static>) -> Result<Vec<(K, V)>, BackingError>;
fn clear(&mut self) -> Result<(), BackingError>;
}

Expand Down Expand Up @@ -62,11 +63,12 @@ impl<
Ok(self.lru.pop(key))
}

fn contains_key(&self, key: &K) -> Result<bool, BackingError> {
fn contains_key(&mut self, key: &K) -> Result<bool, BackingError> {
Ok(self.lru.contains(&key.clone()))
}

fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send>) -> Result<(), BackingError> {
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send + Sync>) -> Result<Vec<(K, V)>, BackingError> {
let mut removed = Vec::new();
let keys = self.lru.iter()
.filter_map(|(key, value)| {
if predicate((key, value)) {
Expand All @@ -78,9 +80,10 @@ impl<
.cloned()
.collect::<Vec<K>>();
for key in keys.into_iter() {
self.lru.pop(&key);
let val = self.lru.pop(&key);
removed.push((key, val.expect("LRU value is empty")))
}
Ok(())
Ok(removed)
}

fn clear(&mut self) -> Result<(), BackingError> {
Expand Down Expand Up @@ -108,10 +111,15 @@ impl<
}

#[cfg(feature = "ttl-cache")]
pub struct TtlCacheBacking<K, V> {
pub struct TtlCacheBacking<
K: Clone + Eq + Hash + Send,
V: Clone + Sized + Send,
B: CacheBacking<K, (V, Instant)>
> {
phantom: PhantomData<V>,
ttl: Duration,
expiry_queue: VecDeque<TTlEntry<K>>,
map: HashMap<K, (V, Instant)>,
map: B,
}

#[cfg(feature = "ttl-cache")]
Expand Down Expand Up @@ -154,25 +162,26 @@ impl From<Duration> for TtlMeta {

#[cfg(feature = "ttl-cache")]
impl<
K: Eq + Hash + Sized + Clone + Send,
V: Sized + Clone + Send
> CacheBacking<K, V> for TtlCacheBacking<K, V> {
K: Clone + Eq + Hash + Send + 'static,
V: Clone + Sized + Send + 'static,
B: CacheBacking<K, (V, Instant)>
> CacheBacking<K, V> for TtlCacheBacking<K, V, B> {
type Meta = TtlMeta;

fn get_mut(&mut self, key: &K) -> Result<Option<&mut V>, BackingError> {
self.remove_old();
Ok(self.map.get_mut(key)
self.remove_old()?;
Ok(self.map.get_mut(key)?
.map(|(value, _)| value))
}

fn get(&mut self, key: &K) -> Result<Option<&V>, BackingError> {
self.remove_old();
Ok(self.map.get(key)
self.remove_old()?;
Ok(self.map.get(key)?
.map(|(value, _)| value))
}

fn set(&mut self, key: K, value: V, meta: Option<Self::Meta>) -> Result<Option<V>, BackingError> {
self.remove_old();
self.remove_old()?;
let ttl = if let Some(meta) = meta {
meta.ttl
} else {
Expand All @@ -184,66 +193,77 @@ impl<
}

fn remove(&mut self, key: &K) -> Result<Option<V>, BackingError> {
self.remove_old();
self.remove_old()?;
Ok(self.remove_key(key)?)
}

fn contains_key(&self, key: &K) -> Result<bool, BackingError> {
// we cant clean old keys on this, since the self ref is not mutable :(
Ok(self.map.get(key)
.filter(|(_, expiry)| Instant::now().lt(expiry))
.is_some())
fn contains_key(&mut self, key: &K) -> Result<bool, BackingError> {
self.remove_old()?;
Ok(self.map.get(key)?.is_some())
}

fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send>) -> Result<(), BackingError> {
let keys = self.map.iter()
.filter_map(|(key, (value, _))| {
if predicate((key, value)) {
Some(key)
} else {
None
}
})
.cloned()
.collect::<Vec<K>>();
for key in keys.into_iter() {
self.map.remove(&key);
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send + Sync>) -> Result<Vec<(K, V)>, BackingError> {
let values = self.map.remove_if(Box::new(move |(key, (value, _))| predicate((key, value))))?;
let mut mapped = Vec::with_capacity(values.len());
for (key, (value, _)) in values {
// optimize looping through expiry_queue multiple times?
self.expiry_queue.retain(|entry| entry.key.ne(&key))
self.expiry_queue.retain(|entry| entry.key.ne(&key));
mapped.push((key, value));
}
Ok(())
Ok(mapped)
}

fn clear(&mut self) -> Result<(), BackingError> {
self.expiry_queue.clear();
self.map.clear();
self.map.clear()?;
Ok(())
}
}

#[cfg(feature = "ttl-cache")]
impl<K: Eq + Hash + Sized + Clone + Send, V: Sized + Clone + Send> TtlCacheBacking<K, V> {
pub fn new(ttl: Duration) -> TtlCacheBacking<K, V> {
impl<
K: Eq + Hash + Sized + Clone + Send,
V: Sized + Clone + Send,
> TtlCacheBacking<K, V, HashMapBacking<K, (V, Instant)>> {
pub fn new(ttl: Duration) -> TtlCacheBacking<K, V, HashMapBacking<K, (V, Instant)>> {
TtlCacheBacking {
phantom: Default::default(),
ttl,
map: HashMapBacking::new(),
expiry_queue: VecDeque::new(),
}
}
}

#[cfg(feature = "ttl-cache")]
impl<
K: Eq + Hash + Sized + Clone + Send,
V: Sized + Clone + Send,
B: CacheBacking<K, (V, Instant)>
> TtlCacheBacking<K, V, B> {
pub fn with_backing(ttl: Duration, backing: B) -> TtlCacheBacking<K, V, B> {
TtlCacheBacking {
phantom: Default::default(),
ttl,
map: HashMap::new(),
map: backing,
expiry_queue: VecDeque::new(),
}
}

fn remove_old(&mut self) {
fn remove_old(&mut self) -> Result<(), BackingError> {
let now = Instant::now();
while let Some(entry) = self.expiry_queue.pop_front() {
if now.lt(&entry.expiry) {
self.expiry_queue.push_front(entry);
break;
}
self.map.remove(&entry.key);
self.map.remove(&entry.key)?;
}
Ok(())
}

fn replace(&mut self, key: K, value: V, expiry: Instant) -> Result<Option<V>, TtlError> {
let entry = self.map.insert(key.clone(), (value, expiry));
fn replace(&mut self, key: K, value: V, expiry: Instant) -> Result<Option<V>, BackingError> {
let entry = self.map.set(key.clone(), (value, expiry), None)?;
let res = self.cleanup_expiry(entry, &key);
match self.expiry_queue.binary_search_by_key(&expiry, |entry| entry.expiry) {
Ok(found) => {
Expand All @@ -256,24 +276,24 @@ impl<K: Eq + Hash + Sized + Clone + Send, V: Sized + Clone + Send> TtlCacheBacki
res
}

fn remove_key(&mut self, key: &K) -> Result<Option<V>, TtlError> {
let entry = self.map.remove(key);
fn remove_key(&mut self, key: &K) -> Result<Option<V>, BackingError> {
let entry = self.map.remove(key)?;
self.cleanup_expiry(entry, key)
}

fn cleanup_expiry(&mut self, entry: Option<(V, Instant)>, key: &K) -> Result<Option<V>, TtlError> {
fn cleanup_expiry(&mut self, entry: Option<(V, Instant)>, key: &K) -> Result<Option<V>, BackingError> {
if let Some((value, old_expiry)) = entry {
match self.expiry_queue.binary_search_by_key(&old_expiry, |entry| entry.expiry) {
Ok(found) => {
let index = self.expiry_index_on_key_eq(found, &old_expiry, key);
if let Some(index) = index {
self.expiry_queue.remove(index);
} else {
return Err(TtlError::ExpiryKeyNotFound);
return Err(TtlError::ExpiryKeyNotFound.into());
}
}
Err(_) => {
return Err(TtlError::ExpiryNotFound);
return Err(TtlError::ExpiryNotFound.into());
}
}
Ok(Some(value))
Expand Down Expand Up @@ -340,13 +360,20 @@ impl<
Ok(self.map.remove(key))
}

fn contains_key(&self, key: &K) -> Result<bool, BackingError> {
fn contains_key(&mut self, key: &K) -> Result<bool, BackingError> {
Ok(self.map.contains_key(key))
}

fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send>) -> Result<(), BackingError> {
self.map.retain(|k, v| !predicate((k, v)));
Ok(())
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send + Sync>) -> Result<Vec<(K, V)>, BackingError> {
let removed = self.map.iter()
.filter(|(k, v)| predicate((k, v)))
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<Vec<(K, V)>>();

for (k, _) in removed.iter() {
self.map.remove(k);
}
Ok(removed)
}

fn clear(&mut self) -> Result<(), BackingError> {
Expand Down
25 changes: 25 additions & 0 deletions src/test.rs
Expand Up @@ -396,4 +396,29 @@ async fn test_ttl_backing() {
tokio::time::sleep(Duration::from_secs(2)).await;

assert_eq!(cache.exists("key1".to_owned()).await.unwrap(), false);
}

#[cfg(all(feature = "ttl-cache", feature = "lru-cache"))]
#[tokio::test]
async fn test_ttl_lru_backing() {
let cache: LoadingCache<String, _, u8, _> = LoadingCache::with_meta_loader(TtlCacheBacking::with_backing(Duration::from_secs(1), LruCacheBacking::new(2)), move |key: String| {
async move {
if key.len() < 3 {
Ok(key.to_lowercase())
.with_meta(Some(TtlMeta::from(Duration::from_secs(15))))
} else {
Ok(key.to_lowercase())
.with_meta(None)
}
}
});
assert_eq!(cache.get("a".to_owned()).await.unwrap(), "a");
assert_eq!(cache.get("bbbbb".to_owned()).await.unwrap(), "bbbbb");
tokio::time::sleep(Duration::from_secs(2)).await;
assert!(cache.exists("a".to_owned()).await.unwrap());
assert!(!cache.exists("bbbbb".to_owned()).await.unwrap());
assert_eq!(cache.get("bbbbb".to_owned()).await.unwrap(), "bbbbb");
assert_eq!(cache.get("ccccc".to_owned()).await.unwrap(), "ccccc");
assert_eq!(cache.get("ddddd".to_owned()).await.unwrap(), "ddddd");
assert!(!cache.exists("a".to_owned()).await.unwrap());
}

0 comments on commit 4fdf7ea

Please sign in to comment.