diff --git a/util/BUILD b/util/BUILD index 5a02eec8c..d43aec4e1 100644 --- a/util/BUILD +++ b/util/BUILD @@ -82,6 +82,7 @@ rust_library( "//third_party:fast_async_mutex", "//third_party:lru", "//third_party:serde", + "//third_party:tokio", "//util:common", ], proc_macro_deps = ["//third_party:async_trait"], diff --git a/util/evicting_map.rs b/util/evicting_map.rs index 8bf790923..ec79aa73b 100644 --- a/util/evicting_map.rs +++ b/util/evicting_map.rs @@ -115,7 +115,7 @@ pub struct EvictingMap { impl EvictingMap where - T: LenEntry + Debug + Clone + Send + Sync, + T: LenEntry + Debug + Clone + Send + Sync + 'static, I: InstantWrapper, { pub fn new(config: &EvictionPolicy, anchor_time: I) -> Self { @@ -168,7 +168,7 @@ where ); } // Just in case we allow for some cleanup (eg: old items). - self.evict_items(state.deref_mut()).await; + self.evict_items(state.deref_mut()); } fn should_evict(&self, lru_len: usize, peek_entry: &EvictionItem, sum_store_size: u64, evicting: bool) -> bool { @@ -184,13 +184,10 @@ where let is_over_count = self.max_count != 0 && (lru_len as u64) > self.max_count; - if is_over_size || old_item_exists || is_over_count { - return true; - } - false + is_over_size || old_item_exists || is_over_count } - async fn evict_items(&self, state: &mut State) { + fn evict_items(&self, state: &mut State) { let mut peek_entry = if let Some((_, entry)) = state.lru.peek_lru() { entry } else { @@ -201,8 +198,13 @@ where evicting = true; let (key, eviction_item) = state.lru.pop_lru().expect("Tried to peek() then pop() but failed"); state.sum_store_size -= eviction_item.data.len() as u64; - eviction_item.data.unref().await; - log::info!("\x1b[0;31mevicting map\x1b[0m: Evicting {:?}", key); + + // Perform the eviction in a separate greenlet because this function + // cannot be async otherwise the lock Send trait does not hold. + tokio::spawn(async move { + eviction_item.data.unref().await; + log::info!("\x1b[0;31mevicting map\x1b[0m: Evicting {:?}", key); + }); peek_entry = if let Some((_, entry)) = state.lru.peek_lru() { entry @@ -212,28 +214,31 @@ where } } - pub async fn size_for_key(&self, digest: &DigestInfo) -> Option { + async fn get_item_for_key(&self, digest: &DigestInfo) -> Option { let mut state = self.state.lock().await; if let Some(mut entry) = state.lru.get_mut(digest) { entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32; - let data = entry.data.clone(); - drop(state); - data.touch().await; - return Some(data.len()); + Some(entry.data.clone()) + } else { + None + } + } + + pub async fn size_for_key(&self, digest: &DigestInfo) -> Option { + match self.get(digest).await { + Some(data) => Some(data.len()), + None => None, } - None } pub async fn get(&self, digest: &DigestInfo) -> Option { - let mut state = self.state.lock().await; - if let Some(mut entry) = state.lru.get_mut(digest) { - entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32; - let data = entry.data.clone(); - drop(state); - data.touch().await; - return Some(data); + match self.get_item_for_key(digest).await { + Some(data) => { + data.touch().await; + Some(data) + } + None => None, } - None } /// Returns the replaced item if any. @@ -264,7 +269,7 @@ where None }; state.sum_store_size += new_item_size; - self.evict_items(state.deref_mut()).await; + self.evict_items(state.deref_mut()); maybe_old_item }