Fix bug in evicting_map where unref() was improperly called, and improve readability
Fixes a bug where unref() was called on an entry when it was being
replaced. This could cause issues in stores like filesystem_store,
where unref() deletes the backing file, but a replacement reuses that
same file, so the newly written data would be deleted along with the old.
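
A minimal, hypothetical sketch of the failure mode (not the store's actual API; the names are illustrative): in a filesystem-backed store, the old entry and its replacement for a given digest resolve to the same content path, so unref()-ing the replaced entry would delete the file the new entry has just written.

use std::collections::HashMap;

// Hypothetical stand-in for a filesystem-backed entry; content_path is
// derived from the digest, so an old entry and its replacement share it.
struct Entry {
    content_path: String,
}

impl Entry {
    fn unref(&self) {
        // In a filesystem-backed store, unref() would remove the backing file.
        println!("would remove {}", self.content_path);
    }
}

fn insert(map: &mut HashMap<String, Entry>, digest: String, new_entry: Entry) {
    if let Some(old_entry) = map.insert(digest, new_entry) {
        // Pre-fix behavior: calling old_entry.unref() here would delete the very
        // file the replacement entry now points at. The fix drops old_entry
        // without unref()-ing it.
        let _ = old_entry;
    }
}

fn main() {
    let mut map = HashMap::new();
    insert(&mut map, "digest".into(), Entry { content_path: "/cas/abc".into() });
    // Re-inserting the same digest replaces the entry; no unref() happens.
    insert(&mut map, "digest".into(), Entry { content_path: "/cas/abc".into() });
}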

Also includes minor changes to improve debuggability in evicting_map
and filesystem_store.
allada committed Apr 4, 2022
1 parent 2edf514 commit ea393a5
Showing 5 changed files with 141 additions and 39 deletions.
79 changes: 50 additions & 29 deletions cas/store/filesystem_store.rs
@@ -8,7 +8,7 @@ use std::time::SystemTime;
use async_trait::async_trait;
use bytes::BytesMut;
use filetime::{set_file_atime, FileTime};
use futures::stream::StreamExt;
use futures::stream::{StreamExt, TryStreamExt};
use rand::{thread_rng, Rng};
use tokio::fs::{self, File};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom, Take};
@@ -25,6 +25,7 @@ use traits::{StoreTrait, UploadSizeInfo};
// Default size to allocate memory of the buffer when reading files.
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;

#[derive(Debug)]
struct FileEntry {
digest: DigestInfo,
file_size: u64,
@@ -58,9 +59,16 @@ impl LenEntry for FileEntry {
#[inline]
async fn touch(&self) {
let full_content_path = to_full_path_from_digest(&self.content_path, &self.digest);
let res = match spawn_blocking(move || set_file_atime(&full_content_path, FileTime::now())).await {
Ok(res) => res.err_tip(|| "Failed to touch file in filesystem store"),
Err(_) => Err(make_err!(Code::Internal, "Failed to change atime of file")),
let set_atime_fut = spawn_blocking(move || {
set_file_atime(&full_content_path, FileTime::now())
.err_tip(|| format!("Failed to touch file in filesystem store {}", full_content_path))
});
let res = match set_atime_fut.await {
Ok(res) => res,
Err(_) => Err(make_err!(
Code::Internal,
"Failed to change atime of file due to spawn failing"
)),
};
if let Err(err) = res {
log::error!("{:?}", err);
@@ -73,6 +81,7 @@ impl LenEntry for FileEntry {
.store(thread_rng().gen::<u64>(), Ordering::Relaxed);
let from_path = to_full_path_from_digest(&self.content_path, &self.digest);
let to_path = to_full_path(&self.temp_path, &self.temp_path.to_string());
log::info!("\x1b[0;31mFilesystem Store\x1b[0m: Store evicting: {}", &from_path);
// It is possible (although extremely unlikely) that another thread is reading
// this file while we want to delete it here. To prevent errors in either case
// we rename the file (since that other thread would have an open file handle)
@@ -95,8 +104,9 @@ impl Drop for FileEntry {
}
let full_temp_path = to_full_path(&self.temp_path, &pending_delete_file_name.to_string());
tokio::spawn(async move {
log::info!("\x1b[0;31mFilesystem Store\x1b[0m: Store deleting: {}", &full_temp_path);
if let Err(err) = fs::remove_file(&full_temp_path).await {
log::info!(
log::warn!(
"\x1b[0;31mFilesystem Store\x1b[0m: Failed to remove file {} {:?}",
full_temp_path,
err
@@ -131,37 +141,23 @@ async fn add_files_to_cache(
async fn process_entry(
evicting_map: &EvictingMap<Arc<FileEntry>, SystemTime>,
file_name: &str,
dir_entry: &fs::DirEntry,
atime: SystemTime,
file_size: u64,
anchor_time: &SystemTime,
temp_path: &Arc<String>,
content_path: &Arc<String>,
) -> Result<(), Error> {
let digest = make_digest(&file_name)?;

let metadata = dir_entry.metadata().await.err_tip(|| "Failed to get metadata")?;
let atime = match metadata.accessed() {
Ok(atime) => atime,
Err(err) => {
panic!(
"{}{}{} : {} {:?}",
"It appears this filesystem does not support access time. ",
"Please configure this program to run on a drive that supports ",
"atime",
file_name,
err
);
}
};

let file_entry = FileEntry {
digest: digest.clone(),
file_size: metadata.len(),
file_size,
temp_path: temp_path.clone(),
content_path: content_path.clone(),
pending_delete_file_name: AtomicU64::new(0),
};
let time_since_anchor = anchor_time
.duration_since(atime.clone())
.duration_since(atime)
.map_err(|_| make_input_err!("File access time newer than now"))?;
evicting_map
.insert_with_time(digest, Arc::new(file_entry), time_since_anchor.as_secs() as i32)
@@ -173,14 +169,38 @@ async fn add_files_to_cache(
.await
.err_tip(|| "Failed opening content directory for iterating in filesystem store")?;

let mut read_dir_stream = ReadDirStream::new(dir_handle);
while let Some(dir_entry) = read_dir_stream.next().await {
let dir_entry = dir_entry.unwrap();
let file_name = dir_entry.file_name().into_string().unwrap();
let mut file_infos: Vec<(String, SystemTime, u64)> = ReadDirStream::new(dir_handle)
.then(|dir_entry| async move {
let dir_entry = dir_entry.unwrap();
let file_name = dir_entry.file_name().into_string().unwrap();
let metadata = dir_entry
.metadata()
.await
.err_tip(|| "Failed to get metadata in filesystem store")?;
let atime = match metadata.accessed() {
Ok(atime) => atime,
Err(err) => {
panic!(
"{}{}{} : {} {:?}",
"It appears this filesystem does not support access time. ",
"Please configure this program to run on a drive that supports ",
"atime",
file_name,
err
);
}
};
Result::<(String, SystemTime, u64), Error>::Ok((file_name, atime, metadata.len()))
})
.try_collect()
.await?;
file_infos.sort_by(|a, b| a.1.cmp(&b.1));
for (file_name, atime, file_size) in file_infos {
let result = process_entry(
&evicting_map,
&file_name,
&dir_entry,
atime,
file_size,
&anchor_time,
&temp_path,
&content_path,
@@ -192,7 +212,8 @@
file_name,
err
);
let _ = fs::remove_file(dir_entry.path()).await; // Ignore result.
// Ignore result.
let _ = fs::remove_file(format!("{}/{}", &content_path, &file_name)).await;
}
}
Ok(())
7 changes: 7 additions & 0 deletions cas/store/memory_store.rs
@@ -1,5 +1,6 @@
// Copyright 2020-2021 Nathan (Blaise) Bruer. All rights reserved.

use std::fmt::Debug;
use std::pin::Pin;
use std::time::SystemTime;

@@ -16,6 +17,12 @@ use traits::{StoreTrait, UploadSizeInfo};
#[derive(Clone)]
pub struct BytesWrapper(Bytes);

impl Debug for BytesWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("BytesWrapper { -- Binary data -- }")
}
}

impl LenEntry for BytesWrapper {
#[inline]
fn len(&self) -> usize {
1 change: 1 addition & 0 deletions util/BUILD
@@ -143,6 +143,7 @@ rust_test(
":error",
":evicting_map",
],
proc_macro_deps = ["//third_party:async_trait"],
)

rust_test(
22 changes: 14 additions & 8 deletions util/evicting_map.rs
@@ -1,5 +1,6 @@
// Copyright 2021 Nathan (Blaise) Bruer. All rights reserved.

use std::fmt::Debug;
use std::ops::DerefMut;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -40,7 +41,8 @@ impl InstantWrapper for SystemTime {
}
}

struct EvictionItem<T: LenEntry> {
#[derive(Debug)]
struct EvictionItem<T: LenEntry + Debug> {
seconds_since_anchor: i32,
data: T,
}
@@ -85,12 +87,12 @@ impl<T: LenEntry + Send + Sync> LenEntry for Arc<T> {
}
}

struct State<T: LenEntry> {
struct State<T: LenEntry + Debug> {
lru: LruCache<DigestInfo, EvictionItem<T>>,
sum_store_size: u64,
}

pub struct EvictingMap<T: LenEntry, I: InstantWrapper> {
pub struct EvictingMap<T: LenEntry + Debug, I: InstantWrapper> {
state: Mutex<State<T>>,
anchor_time: I,
max_bytes: u64,
@@ -100,7 +102,7 @@ pub struct EvictingMap<T: LenEntry, I: InstantWrapper> {

impl<T, I> EvictingMap<T, I>
where
T: LenEntry + Clone + Send + Sync,
T: LenEntry + Debug + Clone + Send + Sync,
I: InstantWrapper,
{
pub fn new(config: &EvictionPolicy, anchor_time: I) -> Self {
@@ -158,9 +160,8 @@
fn should_evict(&self, lru_len: usize, peek_entry: &EvictionItem<T>, sum_store_size: u64) -> bool {
let is_over_size = self.max_bytes != 0 && sum_store_size >= self.max_bytes;

let evict_before_seconds =
(self.anchor_time.elapsed().as_secs() as i32).max(self.max_seconds) - self.max_seconds;
let old_item_exists = self.max_seconds != 0 && peek_entry.seconds_since_anchor < evict_before_seconds;
let evict_older_than_seconds = (self.anchor_time.elapsed().as_secs() as i32) - self.max_seconds;
let old_item_exists = self.max_seconds != 0 && peek_entry.seconds_since_anchor < evict_older_than_seconds;

let is_over_count = self.max_count != 0 && (lru_len as u64) > self.max_count;

@@ -227,7 +228,12 @@
let mut state = self.state.lock().await;
if let Some(old_item) = state.lru.put(digest.into(), eviction_item) {
state.sum_store_size -= old_item.data.len() as u64;
old_item.data.unref().await;
// We do not want to unref here because if we are on a filesystem-backed
// store (or similar) the name of the newly inserted item will be the same
// as the name of the old item. If we were to unref, it might trigger the
// updated file to be deleted. Unref is also unnecessary here, since at this
// point we are always updating the underlying data rather than
// evicting/deleting it.
}
state.sum_store_size += new_item_size;
self.evict_items(state.deref_mut()).await;
71 changes: 69 additions & 2 deletions util/tests/evicting_map_test.rs
@@ -1,7 +1,10 @@
// Copyright 2021 Nathan (Blaise) Bruer. All rights reserved.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use bytes::Bytes;
use mock_instant::{Instant as MockInstant, MockClock};

@@ -224,7 +227,7 @@ mod evicting_map_tests {
MockClock::advance(Duration::from_secs(2));
evicting_map
.insert(DigestInfo::try_new(HASH3, 0)?, Bytes::from(DATA).into())
.await;
.await; // This will trigger an eviction.

assert_eq!(
evicting_map.size_for_key(&DigestInfo::try_new(HASH1, 0)?).await,
@@ -245,6 +248,70 @@
Ok(())
}

#[tokio::test]
async fn unref_not_called_on_replace() -> Result<(), Error> {
#[derive(Debug)]
struct MockEntry {
data: Bytes,
unref_called: AtomicBool,
}

#[async_trait]
impl LenEntry for MockEntry {
fn len(&self) -> usize {
// Note: We are not testing this functionality.
return 0;
}

async fn touch(&self) {
// Do nothing. We are not testing this functionality.
}

async fn unref(&self) {
self.unref_called.store(true, Ordering::Relaxed);
}
}

let evicting_map = EvictingMap::<Arc<MockEntry>, MockInstantWrapped>::new(
&EvictionPolicy {
max_count: 1,
max_seconds: 0,
max_bytes: 0,
},
MockInstantWrapped(MockInstant::now()),
);

const DATA1: &str = "12345678";
const DATA2: &str = "87654321";

let (entry1, entry2) = {
let entry1 = Arc::new(MockEntry {
data: Bytes::from(DATA1),
unref_called: AtomicBool::new(false),
});
evicting_map
.insert(DigestInfo::try_new(HASH1, 0)?, entry1.clone())
.await;

let entry2 = Arc::new(MockEntry {
data: Bytes::from(DATA2),
unref_called: AtomicBool::new(false),
});
evicting_map
.insert(DigestInfo::try_new(HASH1, 0)?, entry2.clone())
.await;
(entry1, entry2)
};

let existing_entry = evicting_map.get(&DigestInfo::try_new(HASH1, 0)?).await.unwrap();
assert_eq!(existing_entry.data, DATA2);

assert_eq!(entry1.unref_called.load(Ordering::Relaxed), false);
assert_eq!(entry2.unref_called.load(Ordering::Relaxed), false);

Ok(())
}

#[tokio::test]
async fn contains_key_refreshes_time() -> Result<(), Error> {
let evicting_map = EvictingMap::<BytesWrapper, MockInstantWrapped>::new(
@@ -269,7 +336,7 @@
MockClock::advance(Duration::from_secs(2));
evicting_map
.insert(DigestInfo::try_new(HASH3, 0)?, Bytes::from(DATA).into())
.await;
.await; // This will trigger an eviction.

assert_eq!(
evicting_map.size_for_key(&DigestInfo::try_new(HASH1, 0)?).await,
