Skip to content

Commit

Permalink
Account for block size in filesystem store for eviction purposes
Browse files Browse the repository at this point in the history
  • Loading branch information
zbirenbaum committed Feb 15, 2024
1 parent 4e7d68b commit 62f13e8
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 24 deletions.
7 changes: 7 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ pub struct FilesystemStore {
/// value will cause items to never be removed from the store causing
/// infinite memory usage.
pub eviction_policy: Option<EvictionPolicy>,

/// The block size of the filesystem for the running machine
/// value is used to determine an entry's actual size on disk consumed
/// For a 4KB block size filesystem, a 1B file actually consumes 4KB
/// Default: 4096
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
pub block_size: u64,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down
64 changes: 45 additions & 19 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ use crate::cas_utils::is_zero_digest;

// Default size to allocate memory of the buffer when reading files.
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
// Default block size of all major filesystems is 4KB
const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;

#[derive(Debug)]
pub struct SharedContext {
Expand Down Expand Up @@ -119,17 +121,21 @@ fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString {
#[async_trait]
pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
/// Responsible for creating the underlying FileEntry.
fn create(file_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;
fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;

/// Creates a (usually) temp file, opens it and returns the path to the temp file.
async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> Result<(Self, fs::ResumeableFileSlot<'static>, OsString), Error>
where
Self: Sized;

/// Returns the underlying reference to where the filesize is stored.
fn get_file_size(&mut self) -> &mut u64;
fn get_data_size(&mut self) -> &mut u64;

/// Returns the actual size of the underlying file on the disk after accounting for filesystem block size
fn get_file_size(&self) -> u64;

/// Gets the underlying EncodedfilePath.
fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath>;
Expand All @@ -150,7 +156,8 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
}

pub struct FileEntryImpl {
file_size: u64,
data_size: u64,
block_size: u64,
encoded_file_path: RwLock<EncodedFilePath>,
}

Expand All @@ -162,17 +169,20 @@ impl FileEntryImpl {

#[async_trait]
impl FileEntry for FileEntryImpl {
fn create(file_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
Self {
file_size,
data_size,
block_size,
encoded_file_path,
}
}


/// This encapsolates the logic for the edge case of if the file fails to create
/// the cleanup of the file is handled without creating a FileEntry, which would
/// try to cleanup the file as well during drop().
async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> Result<(FileEntryImpl, fs::ResumeableFileSlot<'static>, OsString), Error> {
let temp_full_path = encoded_file_path.get_file_path();
Expand All @@ -192,15 +202,20 @@ impl FileEntry for FileEntryImpl {
Ok((
<FileEntryImpl as FileEntry>::create(
0, /* Unknown yet, we will fill it in later */
block_size,
RwLock::new(encoded_file_path),
),
temp_file_result,
temp_full_path,
))
}

fn get_file_size(&mut self) -> &mut u64 {
&mut self.file_size
fn get_data_size(&mut self) -> &mut u64 {
&mut self.data_size
}

fn get_file_size(&self) -> u64 {
self.data_size.div_ceil(self.block_size) * self.block_size
}

fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
Expand Down Expand Up @@ -243,7 +258,7 @@ impl FileEntry for FileEntryImpl {
impl Debug for FileEntryImpl {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
f.debug_struct("FileEntryImpl")
.field("file_size", &self.file_size)
.field("data_size", &self.data_size)
.field("encoded_file_path", &"<behind mutex>")
.finish()
}
Expand All @@ -258,11 +273,11 @@ fn make_temp_digest(digest: &mut DigestInfo) {
impl LenEntry for FileEntryImpl {
#[inline]
fn len(&self) -> usize {
self.file_size as usize
self.get_file_size() as usize
}

fn is_empty(&self) -> bool {
self.file_size == 0
self.data_size == 0
}

#[inline]
Expand Down Expand Up @@ -343,19 +358,22 @@ async fn add_files_to_cache<Fe: FileEntry>(
evicting_map: &EvictingMap<Arc<Fe>, SystemTime>,
anchor_time: &SystemTime,
shared_context: &Arc<SharedContext>,
block_size: u64,
) -> Result<(), Error> {
async fn process_entry<Fe: FileEntry>(
evicting_map: &EvictingMap<Arc<Fe>, SystemTime>,
file_name: &str,
atime: SystemTime,
file_size: u64,
data_size: u64,
block_size: u64,
anchor_time: &SystemTime,
shared_context: &Arc<SharedContext>,
) -> Result<(), Error> {
let digest = digest_from_filename(file_name)?;

let file_entry = Fe::create(
file_size,
data_size,
block_size,
RwLock::new(EncodedFilePath {
shared_context: shared_context.clone(),
path_type: PathType::Content,
Expand Down Expand Up @@ -407,8 +425,8 @@ async fn add_files_to_cache<Fe: FileEntry>(
};

file_infos.sort_by(|a, b| a.1.cmp(&b.1));
for (file_name, atime, file_size) in file_infos {
let result = process_entry(evicting_map, &file_name, atime, file_size, anchor_time, shared_context).await;
for (file_name, atime, data_size) in file_infos {
let result = process_entry(evicting_map, &file_name, atime, data_size, block_size, anchor_time, shared_context).await;
if let Err(err) = result {
warn!(
"Could not add file to eviction cache, so deleting: {} - {:?}",
Expand Down Expand Up @@ -440,6 +458,7 @@ async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
shared_context: Arc<SharedContext>,
evicting_map: Arc<EvictingMap<Arc<Fe>, SystemTime>>,
block_size: u64,
read_buffer_size: usize,
sleep_fn: fn(Duration) -> Sleep,
rename_fn: fn(&OsString, &OsString) -> Result<(), std::io::Error>,
Expand Down Expand Up @@ -473,7 +492,13 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
temp_path: config.temp_path.clone(),
content_path: config.content_path.clone(),
});
add_files_to_cache(evicting_map.as_ref(), &now, &shared_context).await?;

let block_size = if config.block_size == 0 {
DEFAULT_BLOCK_SIZE
} else {
config.block_size
};
add_files_to_cache(evicting_map.as_ref(), &now, &shared_context, block_size).await?;
prune_temp_path(&shared_context.temp_path).await?;

let read_buffer_size = if config.read_buffer_size == 0 {
Expand All @@ -484,6 +509,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
let store = Self {
shared_context,
evicting_map,
block_size,
read_buffer_size,
sleep_fn,
rename_fn,
Expand All @@ -505,7 +531,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
final_digest: DigestInfo,
mut reader: DropCloserReadHalf,
) -> Result<(), Error> {
let mut file_size = 0;
let mut data_size = 0;
loop {
let Ok(data_result) = timeout(fs::idle_file_descriptor_timeout(), reader.recv()).await else {
// In the event we timeout, we want to close the writing file, to prevent
Expand All @@ -532,7 +558,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
.write_all_buf(&mut data)
.await
.err_tip(|| "Failed to write data into filesystem store")?;
file_size += data_len as u64;
data_size += data_len as u64;
}

resumeable_temp_file
Expand All @@ -546,7 +572,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
drop(resumeable_temp_file);
*entry.get_file_size() = file_size;
*entry.get_data_size() = data_size;
let entry = Arc::new(entry);
// This sequence of events is quite ticky to understand due to the amount of triggers that
Expand Down Expand Up @@ -652,7 +678,7 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
let mut temp_digest = digest;
make_temp_digest(&mut temp_digest);

let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(EncodedFilePath {
let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(self.block_size, EncodedFilePath {
shared_context: self.shared_context.clone(),
path_type: PathType::Temp,
digest: temp_digest,
Expand Down
64 changes: 59 additions & 5 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,18 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> Debug for TestFileEntry<Hook

#[async_trait]
impl<Hooks: FileEntryHooks + 'static + Sync + Send> FileEntry for TestFileEntry<Hooks> {
fn create(file_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
Self {
inner: Some(FileEntryImpl::create(file_size, encoded_file_path)),
inner: Some(FileEntryImpl::create(data_size, block_size, encoded_file_path)),
_phantom: PhantomData,
}
}

async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> Result<(Self, fs::ResumeableFileSlot<'static>, OsString), Error> {
let (inner, file_slot, path) = FileEntryImpl::make_and_open_file(encoded_file_path).await?;
let (inner, file_slot, path) = FileEntryImpl::make_and_open_file(block_size, encoded_file_path).await?;
Ok((
Self {
inner: Some(inner),
Expand All @@ -88,8 +89,12 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> FileEntry for TestFileEntry<
))
}

fn get_file_size(&mut self) -> &mut u64 {
self.inner.as_mut().unwrap().get_file_size()
fn get_data_size(&mut self) -> &mut u64 {
self.inner.as_mut().unwrap().get_data_size()
}

fn get_file_size(&self) -> u64 {
self.inner.as_ref().unwrap().get_file_size()
}

fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
Expand Down Expand Up @@ -216,13 +221,15 @@ mod filesystem_store_tests {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
eviction_policy: None,
block_size: 1,
..Default::default()
})
.await?,
);

// Insert dummy value into store.
store.as_ref().update_oneshot(digest, VALUE1.into()).await?;

assert_eq!(
store.as_ref().has(digest).await,
Ok(Some(VALUE1.len())),
Expand Down Expand Up @@ -345,6 +352,7 @@ mod filesystem_store_tests {
max_count: 3,
..Default::default()
}),
block_size: 1,
read_buffer_size: 1,
})
.await?,
Expand Down Expand Up @@ -449,6 +457,7 @@ mod filesystem_store_tests {
max_count: 1,
..Default::default()
}),
block_size: 1,
read_buffer_size: 1,
})
.await?,
Expand Down Expand Up @@ -729,6 +738,7 @@ mod filesystem_store_tests {
max_bytes: 5,
..Default::default()
}),
block_size: 1,
..Default::default()
})
.await?,
Expand Down Expand Up @@ -1135,4 +1145,48 @@ mod filesystem_store_tests {

Ok(())
}

// Ensure that get_file_size() returns the correct number
// block_size if content length is < block_size
// ceil(content length / block_size) * block_size if content length is > block_size
#[tokio::test]
async fn get_file_size_uses_block_size() -> Result<(), Error> {
let short_digest = DigestInfo::try_new(HASH1, VALUE1.len())?;
let long_digest = DigestInfo::try_new(HASH2, 2*VALUE1.len())?;
let long_value = format!("{VALUE1}{VALUE1}");

let content_path = make_temp_path("content_path");
let temp_path = make_temp_path("temp_path");
let block_size = VALUE1.len() as u64 + 1;

let store = Box::pin(
FilesystemStore::<FileEntryImpl>::new_with_timeout_and_rename_fn(
&nativelink_config::stores::FilesystemStore {
content_path: content_path.clone(),
block_size,
temp_path: temp_path.clone(),
read_buffer_size: 1,
..Default::default()
},
|_| sleep(Duration::ZERO),
|from, to| std::fs::rename(from, to),
)
.await?,
);

store.as_ref().update_oneshot(short_digest, VALUE1.into()).await?;
let short_entry = store.as_ref().get_file_entry_for_digest(&short_digest).await?;
assert_eq!(
short_entry.get_file_size(),
block_size
);

store.as_ref().update_oneshot(long_digest, long_value.into()).await?;
let long_entry = store.as_ref().get_file_entry_for_digest(&long_digest).await?;
assert_eq!(
long_entry.get_file_size(),
2*block_size
);
Ok(())
}
}

0 comments on commit 62f13e8

Please sign in to comment.