Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open up TTL backing for nested backings #13

Merged
merged 1 commit into from Jan 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cache-loader-async-macros/src/lib.rs
Expand Up @@ -101,7 +101,7 @@ pub fn test_with_features(item: proc_macro::TokenStream) -> proc_macro::TokenStr
#[cfg(feature = "ttl-cache")]
#[tokio::test]
async fn #fn_ident_ttl() {
let #ident: LoadingCache<#key_type, #value_type, #error_type, TtlCacheBacking<_, _>> = LoadingCache::with_backing(TtlCacheBacking::new(Duration::from_secs(3)), move |key: #key_type| {
let #ident: LoadingCache<#key_type, #value_type, #error_type, TtlCacheBacking<_, _, _>> = LoadingCache::with_backing(TtlCacheBacking::new(Duration::from_secs(3)), move |key: #key_type| {
async move #loader
});

Expand Down
135 changes: 81 additions & 54 deletions src/backing.rs
Expand Up @@ -5,6 +5,7 @@ use lru::LruCache;
#[cfg(feature = "ttl-cache")]
use std::collections::VecDeque;
use std::fmt::Debug;
use std::marker::PhantomData;
use thiserror::Error;
#[cfg(feature = "ttl-cache")]
use std::ops::Add;
Expand All @@ -20,8 +21,8 @@ pub trait CacheBacking<K, V>
fn get(&mut self, key: &K) -> Result<Option<&V>, BackingError>;
fn set(&mut self, key: K, value: V, meta: Option<Self::Meta>) -> Result<Option<V>, BackingError>;
fn remove(&mut self, key: &K) -> Result<Option<V>, BackingError>;
fn contains_key(&self, key: &K) -> Result<bool, BackingError>;
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send + 'static>) -> Result<(), BackingError>;
fn contains_key(&mut self, key: &K) -> Result<bool, BackingError>;
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send + Sync + 'static>) -> Result<Vec<(K, V)>, BackingError>;
fn clear(&mut self) -> Result<(), BackingError>;
}

Expand Down Expand Up @@ -62,11 +63,12 @@ impl<
Ok(self.lru.pop(key))
}

fn contains_key(&self, key: &K) -> Result<bool, BackingError> {
fn contains_key(&mut self, key: &K) -> Result<bool, BackingError> {
Ok(self.lru.contains(&key.clone()))
}

fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send>) -> Result<(), BackingError> {
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send + Sync>) -> Result<Vec<(K, V)>, BackingError> {
let mut removed = Vec::new();
let keys = self.lru.iter()
.filter_map(|(key, value)| {
if predicate((key, value)) {
Expand All @@ -78,9 +80,10 @@ impl<
.cloned()
.collect::<Vec<K>>();
for key in keys.into_iter() {
self.lru.pop(&key);
let val = self.lru.pop(&key);
removed.push((key, val.expect("LRU value is empty")))
}
Ok(())
Ok(removed)
}

fn clear(&mut self) -> Result<(), BackingError> {
Expand Down Expand Up @@ -108,10 +111,15 @@ impl<
}

#[cfg(feature = "ttl-cache")]
pub struct TtlCacheBacking<K, V> {
pub struct TtlCacheBacking<
K: Clone + Eq + Hash + Send,
V: Clone + Sized + Send,
B: CacheBacking<K, (V, Instant)>
> {
phantom: PhantomData<V>,
ttl: Duration,
expiry_queue: VecDeque<TTlEntry<K>>,
map: HashMap<K, (V, Instant)>,
map: B,
}

#[cfg(feature = "ttl-cache")]
Expand Down Expand Up @@ -154,25 +162,26 @@ impl From<Duration> for TtlMeta {

#[cfg(feature = "ttl-cache")]
impl<
K: Eq + Hash + Sized + Clone + Send,
V: Sized + Clone + Send
> CacheBacking<K, V> for TtlCacheBacking<K, V> {
K: Clone + Eq + Hash + Send + 'static,
V: Clone + Sized + Send + 'static,
B: CacheBacking<K, (V, Instant)>
> CacheBacking<K, V> for TtlCacheBacking<K, V, B> {
type Meta = TtlMeta;

fn get_mut(&mut self, key: &K) -> Result<Option<&mut V>, BackingError> {
self.remove_old();
Ok(self.map.get_mut(key)
self.remove_old()?;
Ok(self.map.get_mut(key)?
.map(|(value, _)| value))
}

fn get(&mut self, key: &K) -> Result<Option<&V>, BackingError> {
self.remove_old();
Ok(self.map.get(key)
self.remove_old()?;
Ok(self.map.get(key)?
.map(|(value, _)| value))
}

fn set(&mut self, key: K, value: V, meta: Option<Self::Meta>) -> Result<Option<V>, BackingError> {
self.remove_old();
self.remove_old()?;
let ttl = if let Some(meta) = meta {
meta.ttl
} else {
Expand All @@ -184,66 +193,77 @@ impl<
}

fn remove(&mut self, key: &K) -> Result<Option<V>, BackingError> {
self.remove_old();
self.remove_old()?;
Ok(self.remove_key(key)?)
}

fn contains_key(&self, key: &K) -> Result<bool, BackingError> {
// we cant clean old keys on this, since the self ref is not mutable :(
Ok(self.map.get(key)
.filter(|(_, expiry)| Instant::now().lt(expiry))
.is_some())
fn contains_key(&mut self, key: &K) -> Result<bool, BackingError> {
self.remove_old()?;
Ok(self.map.get(key)?.is_some())
}

fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send>) -> Result<(), BackingError> {
let keys = self.map.iter()
.filter_map(|(key, (value, _))| {
if predicate((key, value)) {
Some(key)
} else {
None
}
})
.cloned()
.collect::<Vec<K>>();
for key in keys.into_iter() {
self.map.remove(&key);
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send + Sync>) -> Result<Vec<(K, V)>, BackingError> {
let values = self.map.remove_if(Box::new(move |(key, (value, _))| predicate((key, value))))?;
let mut mapped = Vec::with_capacity(values.len());
for (key, (value, _)) in values {
// optimize looping through expiry_queue multiple times?
self.expiry_queue.retain(|entry| entry.key.ne(&key))
self.expiry_queue.retain(|entry| entry.key.ne(&key));
mapped.push((key, value));
}
Ok(())
Ok(mapped)
}

fn clear(&mut self) -> Result<(), BackingError> {
self.expiry_queue.clear();
self.map.clear();
self.map.clear()?;
Ok(())
}
}

#[cfg(feature = "ttl-cache")]
impl<K: Eq + Hash + Sized + Clone + Send, V: Sized + Clone + Send> TtlCacheBacking<K, V> {
pub fn new(ttl: Duration) -> TtlCacheBacking<K, V> {
impl<
K: Eq + Hash + Sized + Clone + Send,
V: Sized + Clone + Send,
> TtlCacheBacking<K, V, HashMapBacking<K, (V, Instant)>> {
pub fn new(ttl: Duration) -> TtlCacheBacking<K, V, HashMapBacking<K, (V, Instant)>> {
TtlCacheBacking {
phantom: Default::default(),
ttl,
map: HashMapBacking::new(),
expiry_queue: VecDeque::new(),
}
}
}

#[cfg(feature = "ttl-cache")]
impl<
K: Eq + Hash + Sized + Clone + Send,
V: Sized + Clone + Send,
B: CacheBacking<K, (V, Instant)>
> TtlCacheBacking<K, V, B> {
pub fn with_backing(ttl: Duration, backing: B) -> TtlCacheBacking<K, V, B> {
TtlCacheBacking {
phantom: Default::default(),
ttl,
map: HashMap::new(),
map: backing,
expiry_queue: VecDeque::new(),
}
}

fn remove_old(&mut self) {
fn remove_old(&mut self) -> Result<(), BackingError> {
let now = Instant::now();
while let Some(entry) = self.expiry_queue.pop_front() {
if now.lt(&entry.expiry) {
self.expiry_queue.push_front(entry);
break;
}
self.map.remove(&entry.key);
self.map.remove(&entry.key)?;
}
Ok(())
}

fn replace(&mut self, key: K, value: V, expiry: Instant) -> Result<Option<V>, TtlError> {
let entry = self.map.insert(key.clone(), (value, expiry));
fn replace(&mut self, key: K, value: V, expiry: Instant) -> Result<Option<V>, BackingError> {
let entry = self.map.set(key.clone(), (value, expiry), None)?;
let res = self.cleanup_expiry(entry, &key);
match self.expiry_queue.binary_search_by_key(&expiry, |entry| entry.expiry) {
Ok(found) => {
Expand All @@ -256,24 +276,24 @@ impl<K: Eq + Hash + Sized + Clone + Send, V: Sized + Clone + Send> TtlCacheBacki
res
}

fn remove_key(&mut self, key: &K) -> Result<Option<V>, TtlError> {
let entry = self.map.remove(key);
fn remove_key(&mut self, key: &K) -> Result<Option<V>, BackingError> {
let entry = self.map.remove(key)?;
self.cleanup_expiry(entry, key)
}

fn cleanup_expiry(&mut self, entry: Option<(V, Instant)>, key: &K) -> Result<Option<V>, TtlError> {
fn cleanup_expiry(&mut self, entry: Option<(V, Instant)>, key: &K) -> Result<Option<V>, BackingError> {
if let Some((value, old_expiry)) = entry {
match self.expiry_queue.binary_search_by_key(&old_expiry, |entry| entry.expiry) {
Ok(found) => {
let index = self.expiry_index_on_key_eq(found, &old_expiry, key);
if let Some(index) = index {
self.expiry_queue.remove(index);
} else {
return Err(TtlError::ExpiryKeyNotFound);
return Err(TtlError::ExpiryKeyNotFound.into());
}
}
Err(_) => {
return Err(TtlError::ExpiryNotFound);
return Err(TtlError::ExpiryNotFound.into());
}
}
Ok(Some(value))
Expand Down Expand Up @@ -340,13 +360,20 @@ impl<
Ok(self.map.remove(key))
}

fn contains_key(&self, key: &K) -> Result<bool, BackingError> {
fn contains_key(&mut self, key: &K) -> Result<bool, BackingError> {
Ok(self.map.contains_key(key))
}

fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send>) -> Result<(), BackingError> {
self.map.retain(|k, v| !predicate((k, v)));
Ok(())
fn remove_if(&mut self, predicate: Box<dyn Fn((&K, &V)) -> bool + Send + Sync>) -> Result<Vec<(K, V)>, BackingError> {
let removed = self.map.iter()
.filter(|(k, v)| predicate((k, v)))
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<Vec<(K, V)>>();

for (k, _) in removed.iter() {
self.map.remove(k);
}
Ok(removed)
}

fn clear(&mut self) -> Result<(), BackingError> {
Expand Down
25 changes: 25 additions & 0 deletions src/test.rs
Expand Up @@ -396,4 +396,29 @@ async fn test_ttl_backing() {
tokio::time::sleep(Duration::from_secs(2)).await;

assert_eq!(cache.exists("key1".to_owned()).await.unwrap(), false);
}

#[cfg(all(feature = "ttl-cache", feature = "lru-cache"))]
#[tokio::test]
async fn test_ttl_lru_backing() {
let cache: LoadingCache<String, _, u8, _> = LoadingCache::with_meta_loader(TtlCacheBacking::with_backing(Duration::from_secs(1), LruCacheBacking::new(2)), move |key: String| {
async move {
if key.len() < 3 {
Ok(key.to_lowercase())
.with_meta(Some(TtlMeta::from(Duration::from_secs(15))))
} else {
Ok(key.to_lowercase())
.with_meta(None)
}
}
});
assert_eq!(cache.get("a".to_owned()).await.unwrap(), "a");
assert_eq!(cache.get("bbbbb".to_owned()).await.unwrap(), "bbbbb");
tokio::time::sleep(Duration::from_secs(2)).await;
assert!(cache.exists("a".to_owned()).await.unwrap());
assert!(!cache.exists("bbbbb".to_owned()).await.unwrap());
assert_eq!(cache.get("bbbbb".to_owned()).await.unwrap(), "bbbbb");
assert_eq!(cache.get("ccccc".to_owned()).await.unwrap(), "ccccc");
assert_eq!(cache.get("ddddd".to_owned()).await.unwrap(), "ddddd");
assert!(!cache.exists("a".to_owned()).await.unwrap());
}