Skip to content

Commit

Permalink
Fixes flakey filesystem_store_test
Browse files Browse the repository at this point in the history
Fixes a case where we'd check if the temp directory is clean after
some tests, but the Drop condition it was checking against was
the one actually deleting the files in that directory (in a spawn).
  • Loading branch information
allada committed Apr 28, 2022
1 parent f207dfa commit 717d87a
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 41 deletions.
40 changes: 36 additions & 4 deletions cas/store/filesystem_store.rs
@@ -1,5 +1,6 @@
// Copyright 2021 Nathan (Blaise) Bruer. All rights reserved.

use std::fmt::{Debug, Formatter};
use std::path::Path;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
Expand All @@ -26,14 +27,14 @@ use traits::{StoreTrait, UploadSizeInfo};
// Default size to allocate memory of the buffer when reading files.
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;

#[derive(Debug)]
struct FileEntry {
digest: DigestInfo,
file_size: u64,
temp_path: Arc<String>,
content_path: Arc<String>,
// Will be the name of the file in the temp_path if it is flagged for deletion.
pending_delete_file_name: AtomicU64,
file_evicted_callback: Option<&'static (dyn Fn() + Sync)>,
}

impl FileEntry {
Expand All @@ -55,6 +56,18 @@ impl FileEntry {
}
}

impl Debug for FileEntry {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
f.debug_struct("FileEntry")
.field("digest", &self.digest)
.field("file_size", &self.file_size)
.field("temp_path", &self.temp_path)
.field("content_path", &self.content_path)
.field("pending_delete_file_name", &self.pending_delete_file_name)
.finish()
}
}

#[async_trait]
impl LenEntry for FileEntry {
#[inline]
Expand Down Expand Up @@ -109,6 +122,7 @@ impl Drop for FileEntry {
if pending_delete_file_name == 0 {
return;
}
let file_evicted_callback = self.file_evicted_callback.take();
let full_temp_path = to_full_path(&self.temp_path, &pending_delete_file_name.to_string());
tokio::spawn(async move {
log::info!("\x1b[0;31mFilesystem Store\x1b[0m: Store deleting: {}", &full_temp_path);
Expand All @@ -119,6 +133,9 @@ impl Drop for FileEntry {
err
);
}
if let Some(callback) = file_evicted_callback {
(callback)();
}
});
}
}
Expand Down Expand Up @@ -162,6 +179,7 @@ async fn add_files_to_cache(
temp_path: temp_path.clone(),
content_path: content_path.clone(),
pending_delete_file_name: AtomicU64::new(0),
file_evicted_callback: None,
};
let time_since_anchor = anchor_time
.duration_since(atime)
Expand Down Expand Up @@ -252,9 +270,19 @@ pub struct FilesystemStore {
content_path: Arc<String>,
evicting_map: EvictingMap<Arc<FileEntry>, SystemTime>,
read_buffer_size: usize,
file_evicted_callback: Option<&'static (dyn Fn() + Sync)>,
}

impl FilesystemStore {
pub async fn new_with_callback(
config: &config::backends::FilesystemStore,
file_evicted_callback: &'static (dyn Fn() + Sync),
) -> Result<Self, Error> {
let mut me = Self::new(config).await?;
me.file_evicted_callback = Some(file_evicted_callback);
Ok(me)
}

pub async fn new(config: &config::backends::FilesystemStore) -> Result<Self, Error> {
let now = SystemTime::now();

Expand Down Expand Up @@ -284,6 +312,7 @@ impl FilesystemStore {
content_path: Arc::new(config.content_path.clone()),
evicting_map,
read_buffer_size,
file_evicted_callback: None,
};
Ok(store)
}
Expand All @@ -295,7 +324,7 @@ impl FilesystemStore {
async fn update_file<'a>(
self: Pin<&Self>,
temp_loc: &str,
temp_file: &mut fs::FileSlot<'a>,
mut temp_file: fs::FileSlot<'a>,
temp_name_num: u64,
digest: DigestInfo,
mut reader: DropCloserReadHalf,
Expand Down Expand Up @@ -323,12 +352,15 @@ impl FilesystemStore {
.await
.err_tip(|| format!("Failed to sync_data in filesystem store {}", temp_loc))?;

drop(temp_file);

let entry = Arc::new(FileEntry {
digest: digest.clone(),
file_size,
temp_path: self.temp_path.clone(),
content_path: self.content_path.clone(),
pending_delete_file_name: AtomicU64::new(0),
file_evicted_callback: self.file_evicted_callback,
});

let final_loc = to_full_path_from_digest(&self.content_path, &digest);
Expand Down Expand Up @@ -386,12 +418,12 @@ impl StoreTrait for FilesystemStore {
let temp_name_num = thread_rng().gen::<u64>();
let temp_full_path = to_full_path(&self.temp_path, &temp_name_num.to_string());

let mut temp_file = fs::create_file(&temp_full_path)
let temp_file = fs::create_file(&temp_full_path)
.await
.err_tip(|| "Failed to create temp file in filesystem store")?;

if let Err(err) = self
.update_file(&temp_full_path, &mut temp_file, temp_name_num, digest, reader)
.update_file(&temp_full_path, temp_file, temp_name_num, digest, reader)
.await
{
let result = fs::remove_file(temp_full_path)
Expand Down
112 changes: 75 additions & 37 deletions cas/store/tests/filesystem_store_test.rs
Expand Up @@ -2,18 +2,20 @@

use std::env;
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::SystemTime;

use filetime::{set_file_atime, FileTime};
use rand::{thread_rng, Rng};
use tokio::io::AsyncReadExt;
use tokio_stream::{wrappers::ReadDirStream, StreamExt};

use buf_channel::{make_buf_channel_pair, DropCloserReadHalf};
use common::{fs, DigestInfo};
use config;
use error::{Error, ResultExt};
use filesystem_store::FilesystemStore;
use filetime::{set_file_atime, FileTime};
use rand::{thread_rng, Rng};
use tokio::io::AsyncReadExt;
use tokio_stream::{wrappers::ReadDirStream, StreamExt};
use traits::StoreTrait;

/// Get temporary path from either `TEST_TMPDIR` or best effort temp directory if
Expand Down Expand Up @@ -98,16 +100,24 @@ mod filesystem_store_tests {
let content_path = make_temp_path("content_path");
let temp_path = make_temp_path("temp_path");

static DELETES_FINISHED: AtomicU32 = AtomicU32::new(0);
fn on_file_delete() {
DELETES_FINISHED.fetch_add(1, Ordering::Relaxed);
}

let store = Box::pin(
FilesystemStore::new(&config::backends::FilesystemStore {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
eviction_policy: Some(config::backends::EvictionPolicy {
max_count: 3,
FilesystemStore::new_with_callback(
&config::backends::FilesystemStore {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
eviction_policy: Some(config::backends::EvictionPolicy {
max_count: 3,
..Default::default()
}),
..Default::default()
}),
..Default::default()
})
},
&on_file_delete,
)
.await?,
);

Expand All @@ -130,15 +140,19 @@ mod filesystem_store_tests {
assert_eq!(&data[..], VALUE2.as_bytes(), "Expected file content to match");
}

loop {
if DELETES_FINISHED.load(Ordering::Relaxed) == 1 {
break;
}
tokio::task::yield_now().await;
}

let (_permit, temp_dir_handle) = fs::read_dir(temp_path.clone())
.await
.err_tip(|| "Failed opening temp directory")?
.into_inner();
let mut read_dir_stream = ReadDirStream::new(temp_dir_handle);

// Ensure we let any background tasks finish.
tokio::task::yield_now().await;

while let Some(temp_dir_entry) = read_dir_stream.next().await {
let path = temp_dir_entry?.path();
assert!(false, "No files should exist in temp directory, found: {:?}", path);
Expand All @@ -156,16 +170,24 @@ mod filesystem_store_tests {
let content_path = make_temp_path("content_path");
let temp_path = make_temp_path("temp_path");

static DELETES_FINISHED: AtomicU32 = AtomicU32::new(0);
fn on_file_delete() {
DELETES_FINISHED.fetch_add(1, Ordering::Relaxed);
}

let store = Arc::new(
FilesystemStore::new(&config::backends::FilesystemStore {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
eviction_policy: Some(config::backends::EvictionPolicy {
max_count: 3,
..Default::default()
}),
read_buffer_size: 1,
})
FilesystemStore::new_with_callback(
&config::backends::FilesystemStore {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
eviction_policy: Some(config::backends::EvictionPolicy {
max_count: 3,
..Default::default()
}),
read_buffer_size: 1,
},
&on_file_delete,
)
.await?,
);

Expand Down Expand Up @@ -225,8 +247,12 @@ mod filesystem_store_tests {
"Expected file content to match"
);

// Ensure we let any background tasks finish.
tokio::task::yield_now().await;
loop {
if DELETES_FINISHED.load(Ordering::Relaxed) == 1 {
break;
}
tokio::task::yield_now().await;
}

{
// Now ensure our temp file was cleaned up.
Expand Down Expand Up @@ -254,16 +280,24 @@ mod filesystem_store_tests {
let content_path = make_temp_path("content_path");
let temp_path = make_temp_path("temp_path");

static DELETES_FINISHED: AtomicU32 = AtomicU32::new(0);
fn on_file_delete() {
DELETES_FINISHED.fetch_add(1, Ordering::Relaxed);
}

let store = Arc::new(
FilesystemStore::new(&config::backends::FilesystemStore {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
eviction_policy: Some(config::backends::EvictionPolicy {
max_count: 1,
..Default::default()
}),
read_buffer_size: 1,
})
FilesystemStore::new_with_callback(
&config::backends::FilesystemStore {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
eviction_policy: Some(config::backends::EvictionPolicy {
max_count: 1,
..Default::default()
}),
read_buffer_size: 1,
},
&on_file_delete,
)
.await?,
);

Expand Down Expand Up @@ -316,8 +350,12 @@ mod filesystem_store_tests {

assert_eq!(&reader_data, VALUE1, "Expected file content to match");

// Ensure we let any background tasks finish.
tokio::task::yield_now().await;
loop {
if DELETES_FINISHED.load(Ordering::Relaxed) == 1 {
break;
}
tokio::task::yield_now().await;
}

{
// Now ensure our temp file was cleaned up.
Expand Down

0 comments on commit 717d87a

Please sign in to comment.