Skip to content

Commit

Permalink
Evict on touch failure (#613)
Browse files Browse the repository at this point in the history
There's an unknown failure occurring under load where the filesystem
store ends up with an entry in the temp path that doesn't exist.  I
can't find that fault, but rather than letting the failure persist, it
makes sense to remove the broken entry from the evicting map
in that case.

Therefore, this change modifies the LenEntry trait such that
touch() returns a bool.  If it returns false then the entry is removed
from the EvictingMap immediately.
  • Loading branch information
chrisstaite-menlo committed Jan 29, 2024
1 parent f9f7908 commit 3037a66
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 48 deletions.
6 changes: 4 additions & 2 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl LenEntry for FileEntryImpl {
}

#[inline]
async fn touch(&self) {
async fn touch(&self) -> bool {
let result = self
.get_file_path_locked(move |full_content_path| async move {
spawn_blocking(move || {
Expand All @@ -284,8 +284,10 @@ impl LenEntry for FileEntryImpl {
})
.await;
if let Err(e) = result {
error!("{}", e);
error!("{e}");
return false;
}
true
}

// unref() only triggers when an item is removed from the eviction_map. It is possible
Expand Down
38 changes: 34 additions & 4 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> LenEntry for TestFileEntry<H
self.inner.as_ref().unwrap().is_empty()
}

async fn touch(&self) {
async fn touch(&self) -> bool {
self.inner.as_ref().unwrap().touch().await
}

Expand Down Expand Up @@ -1004,7 +1004,6 @@ mod filesystem_store_tests {

// Test that the empty digest file is created and contains an empty length.
if file_metadata.is_file() && file_metadata.len() == 0 {
println!("{}", file_metadata.is_file());
return Ok(());
}
}
Expand Down Expand Up @@ -1040,15 +1039,15 @@ mod filesystem_store_tests {
..Default::default()
},
|_| sleep(Duration::ZERO),
|_, _| {
|from, to| {
// If someone locked our mutex, it means we need to pause, so we
// simply request a lock on the same mutex.
if RENAME_REQUEST_PAUSE_MUX.try_lock().is_err() {
RENAME_IS_PAUSED.store(true, Ordering::Release);
let _lock = RENAME_REQUEST_PAUSE_MUX.lock();
RENAME_IS_PAUSED.store(false, Ordering::Release);
}
Ok(())
std::fs::rename(from, to)
},
)
.await?,
Expand Down Expand Up @@ -1105,4 +1104,35 @@ mod filesystem_store_tests {

Ok(())
}

/// Checks that an entry whose backing file was deleted from disk behind the
/// store's back is reported as missing by `has()`.
#[tokio::test]
async fn deleted_file_removed_from_store() -> Result<(), Error> {
    let digest = DigestInfo::try_new(HASH1, VALUE1.len())?;
    let content_path = make_temp_path("content_path");
    let temp_path = make_temp_path("temp_path");

    // Build the store configuration up front so the constructor call stays short.
    let config = nativelink_config::stores::FilesystemStore {
        content_path: content_path.clone(),
        temp_path: temp_path.clone(),
        read_buffer_size: 1,
        ..Default::default()
    };
    let store = Box::pin(
        FilesystemStore::<FileEntryImpl>::new_with_timeout_and_rename_fn(
            &config,
            |_| sleep(Duration::ZERO),
            |from, to| std::fs::rename(from, to),
        )
        .await?,
    );

    // Put one blob into the store.
    store.as_ref().update_oneshot(digest, VALUE1.into()).await?;

    // Delete the blob's file directly on disk, bypassing the store.
    let blob_path = OsString::from(format!("{}/{}-{}", content_path, digest.hash_str(), digest.size_bytes));
    std::fs::remove_file(blob_path)?;

    // The lookup itself must succeed, but it must no longer find the digest.
    let lookup = store.as_ref().has(digest).await.err_tip(|| "Failed to execute has")?;
    assert!(lookup.is_none());

    Ok(())
}
}
109 changes: 68 additions & 41 deletions nativelink-util/src/evicting_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use async_lock::Mutex;
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::{future, join, FutureExt, StreamExt};
use futures::{future, join, StreamExt};
use lru::LruCache;
use nativelink_config::stores::EvictionPolicy;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -71,9 +71,12 @@ pub trait LenEntry: 'static {
/// Returns `true` if `self` has zero length.
fn is_empty(&self) -> bool;

/// Called when an entry is touched.
/// Called when an entry is touched. On failure, will remove the entry
/// from the map.
#[inline]
async fn touch(&self) {}
async fn touch(&self) -> bool {
true
}

/// This will be called when object is removed from map.
/// Note: There may still be a reference to it held somewhere else, which
Expand Down Expand Up @@ -104,8 +107,8 @@ impl<T: LenEntry + Send + Sync> LenEntry for Arc<T> {
}

#[inline]
async fn touch(&self) {
self.as_ref().touch().await;
async fn touch(&self) -> bool {
self.as_ref().touch().await
}

#[inline]
Expand All @@ -128,6 +131,21 @@ struct State<T: LenEntry + Debug> {
lifetime_inserted_bytes: Counter,
}

impl<T: LenEntry + Debug + Sync> State<T> {
    /// Does the bookkeeping for `eviction_item` leaving the map and releases it.
    ///
    /// Subtracts the item's length from `sum_store_size`, bumps the "replaced"
    /// counters when `replaced` is `true` (the "evicted" counters otherwise),
    /// and then awaits `unref()` on the item's data. This method does not touch
    /// the LRU itself; the caller takes the item out of it.
    async fn remove(&mut self, eviction_item: &EvictionItem<T>, replaced: bool) {
        // Read the length once and reuse it for the size and byte counters.
        let item_bytes = eviction_item.data.len() as u64;
        self.sum_store_size -= item_bytes;

        if !replaced {
            self.evicted_items.inc();
            self.evicted_bytes.add(item_bytes);
        } else {
            self.replaced_items.inc();
            self.replaced_bytes.add(item_bytes);
        }

        // Note: See comment in `unref()` requiring global lock of insert/remove.
        eviction_item.data.unref().await;
    }
}

pub struct EvictingMap<T: LenEntry + Debug, I: InstantWrapper> {
state: Mutex<State<T>>,
anchor_time: I,
Expand Down Expand Up @@ -234,12 +252,8 @@ where

while self.should_evict(state.lru.len(), peek_entry, state.sum_store_size, max_bytes) {
let (key, eviction_item) = state.lru.pop_lru().expect("Tried to peek() then pop() but failed");
state.sum_store_size -= eviction_item.data.len() as u64;
state.evicted_items.inc();
state.evicted_bytes.add(eviction_item.data.len() as u64);
// Note: See comment in `unref()` requring global lock of insert/remove.
eviction_item.data.unref().await;
info!("\x1b[0;31mEvicting Map\x1b[0m: Evicting {}", key.hash_str());
state.remove(&eviction_item, false).await;

peek_entry = if let Some((_, entry)) = state.lru.peek_lru() {
entry
Expand All @@ -256,6 +270,21 @@ where
results[0]
}

/// Touches `data` and drops the matching map entry if the touch fails.
///
/// Returns `Some(data)` when `touch()` reports success. Otherwise the entry
/// currently stored under `digest` (if any) is popped from the LRU, accounted
/// for as an eviction, and `None` is returned.
async fn touch_or_remove(&self, digest: &DigestInfo, data: T) -> Option<T> {
    let touched = data.touch().await;
    if touched {
        return Some(data);
    }

    // NOTE(review): the entry is looked up again by digest only after the lock
    // is acquired here; presumably it is still the one that failed to touch.
    // Confirm a concurrent insert cannot replace it in between.
    let mut state = self.state.lock().await;
    if let Some((key, eviction_item)) = state.lru.pop_entry(digest) {
        info!(
            "\x1b[0;31mEvicting Map\x1b[0m: Touch failed, evicting {}",
            key.hash_str()
        );
        state.remove(&eviction_item, false).await;
    }
    None
}

/// Return the sizes of a collection of `DigestInfo`. Expects `results` collection
/// to be provided for storing the resulting `DigestInfo` size. Each index value in
/// `digests` maps directly to the size value of the `DigestInfo` in `results`.
Expand All @@ -264,25 +293,26 @@ where
let mut state = self.state.lock().await;
let mut remove_digests: Vec<&DigestInfo> = Vec::new();

let to_touch: Vec<T> = digests
let mut lru_len = state.lru.len();
let mut sum_store_size = state.sum_store_size;
let to_touch_or_remove: Vec<Option<T>> = digests
.iter()
.zip(results.iter_mut())
.flat_map(|(digest, result)| {
let lru_len = state.lru.len();
let sum_store_size = state.sum_store_size;
.map(|digest| {
// Determine if a digest should be evicted or data should be touched.
// Digests to be evicted are collected in a separate vector and chained
// in a single future.
if let Some(entry) = state.lru.get(digest) {
if self.should_evict(lru_len, entry, sum_store_size, self.max_bytes) {
// Important to track the eviction size, otherwise if we
// reach the maximum we end up evicting everything!
sum_store_size -= entry.data.len() as u64;
lru_len -= 1;
// Digest should be evicted.
remove_digests.push(digest);
None
} else {
// Extract data entry to be touched and slot length into results.
let data = entry.data.clone();
*result = Some(data.len());
Some(data)
// Extract data entry to be touched.
Some(entry.data.clone())
}
} else {
// Digest will be evicted if not in lru map, this is a pedantic case.
Expand All @@ -293,14 +323,23 @@ where
.collect();

join!(
to_touch
.iter()
.map(|data| data.touch().map(|_| ()))
to_touch_or_remove
.into_iter()
.zip(results.iter_mut())
.zip(digests.iter())
.filter_map(|((data, result), digest)| Some((data?, result, digest)))
.map(|(data, result, digest)| async move {
*result = self.touch_or_remove(digest, data).await.map(|data| data.len());
})
.collect::<FuturesUnordered<_>>()
.for_each(|_| future::ready(())),
async move {
for digest in remove_digests {
self.inner_remove(state.deref_mut(), digest).await;
// Do not use inner_remove as it calls evict_items, which
// is precisely what we're doing here.
if let Some(entry) = state.lru.pop(digest) {
state.remove(&entry, false).await;
}
}
}
);
Expand All @@ -310,13 +349,10 @@ where
let mut state = self.state.lock().await;
self.evict_items(state.deref_mut()).await;

if let Some(entry) = state.lru.get_mut(digest) {
let data = entry.data.clone();
drop(state);
data.touch().await;
return Some(data);
}
None
let entry = state.lru.get_mut(digest)?;
let data = entry.data.clone();
drop(state);
self.touch_or_remove(digest, data).await
}

/// Returns the replaced item if any.
Expand Down Expand Up @@ -362,11 +398,7 @@ where
};

if let Some(old_item) = state.lru.put(digest, eviction_item) {
state.sum_store_size -= old_item.data.len() as u64;
state.replaced_items.inc();
state.replaced_bytes.add(old_item.data.len() as u64);
// Note: See comment in `unref()` requring global lock of insert/remove.
old_item.data.unref().await;
state.remove(&old_item, true).await;
replaced_items.push(old_item.data);
}
state.sum_store_size += new_item_size;
Expand All @@ -384,12 +416,7 @@ where
async fn inner_remove(&self, mut state: &mut State<T>, digest: &DigestInfo) -> bool {
self.evict_items(state.deref_mut()).await;
if let Some(entry) = state.lru.pop(digest) {
let data_len = entry.data.len() as u64;
state.sum_store_size -= data_len;
state.removed_items.inc();
state.removed_bytes.add(data_len);
// Note: See comment in `unref()` requring global lock of insert/remove.
entry.data.unref().await;
state.remove(&entry, false).await;
return true;
}
false
Expand Down
3 changes: 2 additions & 1 deletion nativelink-util/tests/evicting_map_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,9 @@ mod evicting_map_tests {
unreachable!("We are not testing this functionality");
}

async fn touch(&self) {
async fn touch(&self) -> bool {
// Do nothing. We are not testing this functionality.
true
}

async fn unref(&self) {
Expand Down

0 comments on commit 3037a66

Please sign in to comment.