Account for block size in filesystem store for eviction purposes (#661)
- `eviction_store` will now treat `FileEntry`s as if they consume the worst-case
amount of space on disk based on the filesystem block size, to prevent out-of-space
errors (see the sketch below). Prior behavior can be restored by setting `"block_size": 1`
under `filesystem` in the configuration file
- Add `FileEntry::size_on_disk` - the worst-case disk space consumed by an entry
- Add test cases to `filesystem_store_test` ensuring correct `size_on_disk`
- Add a field to the filesystem config for setting the block size
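
A minimal standalone sketch (not part of this diff) of the worst-case rounding described
above; the helper name `worst_case_size_on_disk` is hypothetical and exists only to
illustrate the arithmetic, which mirrors the `div_ceil`-based `FileEntry::size_on_disk`
implementation added below:

fn worst_case_size_on_disk(data_size: u64, block_size: u64) -> u64 {
    // Round the logical data size up to the next multiple of the filesystem block size.
    data_size.div_ceil(block_size) * block_size
}

fn main() {
    // With the 4096-byte default block size, a 1-byte file is charged one full block
    // and a 5KB file is charged two blocks.
    assert_eq!(worst_case_size_on_disk(1, 4096), 4096);
    assert_eq!(worst_case_size_on_disk(5 * 1024, 4096), 8192);
    // A block size of 1 restores the prior behavior of charging exactly the data size.
    assert_eq!(worst_case_size_on_disk(5 * 1024, 1), 5 * 1024);
}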
zbirenbaum committed Feb 16, 2024
1 parent f8044e6 commit 0639a59
Showing 3 changed files with 123 additions and 29 deletions.
7 changes: 7 additions & 0 deletions nativelink-config/src/stores.rs
@@ -242,6 +242,13 @@ pub struct FilesystemStore {
/// value will cause items to never be removed from the store causing
/// infinite memory usage.
pub eviction_policy: Option<EvictionPolicy>,

/// The block size of the filesystem for the running machine. This value is
/// used to determine an entry's actual size on disk. For a filesystem with a
/// 4KB block size, a 1B file actually consumes 4KB of disk space.
/// Default: 4096
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
pub block_size: u64,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
85 changes: 61 additions & 24 deletions nativelink-store/src/filesystem_store.rs
@@ -43,6 +43,8 @@ use crate::cas_utils::is_zero_digest;

// Default buffer size to allocate when reading files.
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
// Default block size of all major filesystems is 4KB
const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;

#[derive(Debug)]
pub struct SharedContext {
@@ -121,17 +123,21 @@ fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString {
#[async_trait]
pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
/// Responsible for creating the underlying FileEntry.
fn create(file_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;
fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;

/// Creates a (usually) temp file, opens it and returns the path to the temp file.
async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> Result<(Self, fs::ResumeableFileSlot<'static>, OsString), Error>
where
Self: Sized;

/// Returns the underlying reference to where the filesize is stored.
fn get_file_size(&mut self) -> &mut u64;
/// Returns the underlying reference to the size of the data in bytes
fn data_size_mut(&mut self) -> &mut u64;

/// Returns the actual size of the underlying file on the disk after accounting for filesystem block size.
fn size_on_disk(&self) -> u64;

/// Gets the underlying EncodedFilePath.
fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath>;
@@ -152,7 +158,8 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
}

pub struct FileEntryImpl {
file_size: u64,
data_size: u64,
block_size: u64,
encoded_file_path: RwLock<EncodedFilePath>,
}

@@ -164,9 +171,10 @@ impl FileEntryImpl {

#[async_trait]
impl FileEntry for FileEntryImpl {
fn create(file_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
Self {
file_size,
data_size,
block_size,
encoded_file_path,
}
}
@@ -175,6 +183,7 @@ impl FileEntry for FileEntryImpl {
/// the cleanup of the file is handled without creating a FileEntry, which would
/// try to cleanup the file as well during drop().
async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> Result<(FileEntryImpl, fs::ResumeableFileSlot<'static>, OsString), Error> {
let temp_full_path = encoded_file_path.get_file_path();
@@ -194,15 +203,20 @@ impl FileEntry for FileEntryImpl {
Ok((
<FileEntryImpl as FileEntry>::create(
0, /* Unknown yet, we will fill it in later */
block_size,
RwLock::new(encoded_file_path),
),
temp_file_result,
temp_full_path,
))
}

fn get_file_size(&mut self) -> &mut u64 {
&mut self.file_size
fn data_size_mut(&mut self) -> &mut u64 {
&mut self.data_size
}

fn size_on_disk(&self) -> u64 {
self.data_size.div_ceil(self.block_size) * self.block_size
}

fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
Expand Down Expand Up @@ -245,7 +259,7 @@ impl FileEntry for FileEntryImpl {
impl Debug for FileEntryImpl {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
f.debug_struct("FileEntryImpl")
.field("file_size", &self.file_size)
.field("data_size", &self.data_size)
.field("encoded_file_path", &"<behind mutex>")
.finish()
}
@@ -260,11 +274,11 @@ fn make_temp_digest(digest: &mut DigestInfo) {
impl LenEntry for FileEntryImpl {
#[inline]
fn len(&self) -> usize {
self.file_size as usize
self.size_on_disk() as usize
}

fn is_empty(&self) -> bool {
self.file_size == 0
self.data_size == 0
}

#[inline]
@@ -345,19 +359,22 @@ async fn add_files_to_cache<Fe: FileEntry>(
evicting_map: &EvictingMap<Arc<Fe>, SystemTime>,
anchor_time: &SystemTime,
shared_context: &Arc<SharedContext>,
block_size: u64,
) -> Result<(), Error> {
async fn process_entry<Fe: FileEntry>(
evicting_map: &EvictingMap<Arc<Fe>, SystemTime>,
file_name: &str,
atime: SystemTime,
file_size: u64,
data_size: u64,
block_size: u64,
anchor_time: &SystemTime,
shared_context: &Arc<SharedContext>,
) -> Result<(), Error> {
let digest = digest_from_filename(file_name)?;

let file_entry = Fe::create(
file_size,
data_size,
block_size,
RwLock::new(EncodedFilePath {
shared_context: shared_context.clone(),
path_type: PathType::Content,
@@ -409,8 +426,17 @@ async fn add_files_to_cache<Fe: FileEntry>(
};

file_infos.sort_by(|a, b| a.1.cmp(&b.1));
for (file_name, atime, file_size) in file_infos {
let result = process_entry(evicting_map, &file_name, atime, file_size, anchor_time, shared_context).await;
for (file_name, atime, data_size) in file_infos {
let result = process_entry(
evicting_map,
&file_name,
atime,
data_size,
block_size,
anchor_time,
shared_context,
)
.await;
if let Err(err) = result {
warn!(
"Could not add file to eviction cache, so deleting: {} - {:?}",
@@ -442,6 +468,7 @@ async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
shared_context: Arc<SharedContext>,
evicting_map: Arc<EvictingMap<Arc<Fe>, SystemTime>>,
block_size: u64,
read_buffer_size: usize,
sleep_fn: fn(Duration) -> Sleep,
rename_fn: fn(&OsString, &OsString) -> Result<(), std::io::Error>,
@@ -475,7 +502,13 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
temp_path: config.temp_path.clone(),
content_path: config.content_path.clone(),
});
add_files_to_cache(evicting_map.as_ref(), &now, &shared_context).await?;

let block_size = if config.block_size == 0 {
DEFAULT_BLOCK_SIZE
} else {
config.block_size
};
add_files_to_cache(evicting_map.as_ref(), &now, &shared_context, block_size).await?;
prune_temp_path(&shared_context.temp_path).await?;

let read_buffer_size = if config.read_buffer_size == 0 {
@@ -486,6 +519,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
let store = Self {
shared_context,
evicting_map,
block_size,
read_buffer_size,
sleep_fn,
rename_fn,
@@ -507,7 +541,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
final_digest: DigestInfo,
mut reader: DropCloserReadHalf,
) -> Result<(), Error> {
let mut file_size = 0;
let mut data_size = 0;
loop {
let Ok(data_result) = timeout(fs::idle_file_descriptor_timeout(), reader.recv()).await else {
// In the event we timeout, we want to close the writing file, to prevent
@@ -534,7 +568,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
.write_all_buf(&mut data)
.await
.err_tip(|| "Failed to write data into filesystem store")?;
file_size += data_len as u64;
data_size += data_len as u64;
}

resumeable_temp_file
@@ -548,7 +582,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
drop(resumeable_temp_file);
*entry.get_file_size() = file_size;
*entry.data_size_mut() = data_size;
let entry = Arc::new(entry);
// This sequence of events is quite tricky to understand due to the number of triggers that
@@ -654,11 +688,14 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
let mut temp_digest = digest;
make_temp_digest(&mut temp_digest);

let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(EncodedFilePath {
shared_context: self.shared_context.clone(),
path_type: PathType::Temp,
digest: temp_digest,
})
let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(
self.block_size,
EncodedFilePath {
shared_context: self.shared_context.clone(),
path_type: PathType::Temp,
digest: temp_digest,
},
)
.await?;

self.update_file(entry, temp_file, digest, reader)
60 changes: 55 additions & 5 deletions nativelink-store/tests/filesystem_store_test.rs
@@ -67,17 +67,18 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> Debug for TestFileEntry<Hook

#[async_trait]
impl<Hooks: FileEntryHooks + 'static + Sync + Send> FileEntry for TestFileEntry<Hooks> {
fn create(file_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
Self {
inner: Some(FileEntryImpl::create(file_size, encoded_file_path)),
inner: Some(FileEntryImpl::create(data_size, block_size, encoded_file_path)),
_phantom: PhantomData,
}
}

async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> Result<(Self, fs::ResumeableFileSlot<'static>, OsString), Error> {
let (inner, file_slot, path) = FileEntryImpl::make_and_open_file(encoded_file_path).await?;
let (inner, file_slot, path) = FileEntryImpl::make_and_open_file(block_size, encoded_file_path).await?;
Ok((
Self {
inner: Some(inner),
@@ -88,8 +89,12 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> FileEntry for TestFileEntry<
))
}

fn get_file_size(&mut self) -> &mut u64 {
self.inner.as_mut().unwrap().get_file_size()
fn data_size_mut(&mut self) -> &mut u64 {
self.inner.as_mut().unwrap().data_size_mut()
}

fn size_on_disk(&self) -> u64 {
self.inner.as_ref().unwrap().size_on_disk()
}

fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
@@ -216,13 +221,15 @@ mod filesystem_store_tests {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
eviction_policy: None,
block_size: 1,
..Default::default()
})
.await?,
);

// Insert dummy value into store.
store.as_ref().update_oneshot(digest, VALUE1.into()).await?;

assert_eq!(
store.as_ref().has(digest).await,
Ok(Some(VALUE1.len())),
@@ -345,6 +352,7 @@ mod filesystem_store_tests {
max_count: 3,
..Default::default()
}),
block_size: 1,
read_buffer_size: 1,
})
.await?,
@@ -449,6 +457,7 @@ mod filesystem_store_tests {
max_count: 1,
..Default::default()
}),
block_size: 1,
read_buffer_size: 1,
})
.await?,
@@ -729,6 +738,7 @@ mod filesystem_store_tests {
max_bytes: 5,
..Default::default()
}),
block_size: 1,
..Default::default()
})
.await?,
@@ -1135,4 +1145,44 @@ mod filesystem_store_tests {

Ok(())
}

// Ensure that size_on_disk() returns the correct value:
// ceil(data size / block size) * block size.
// Assuming the default 4KB block size:
// 1B data size -> 4KB size on disk
// 5KB data size -> 8KB size on disk
#[tokio::test]
async fn get_file_size_uses_block_size() -> Result<(), Error> {
let content_path = make_temp_path("content_path");
let temp_path = make_temp_path("temp_path");

let value_1kb: String = "x".repeat(1024);
let value_5kb: String = "xabcd".repeat(1024);

let digest_1kb = DigestInfo::try_new(HASH1, value_1kb.len())?;
let digest_5kb = DigestInfo::try_new(HASH2, value_5kb.len())?;

let store = Box::pin(
FilesystemStore::<FileEntryImpl>::new_with_timeout_and_rename_fn(
&nativelink_config::stores::FilesystemStore {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
read_buffer_size: 1,
..Default::default()
},
|_| sleep(Duration::ZERO),
|from, to| std::fs::rename(from, to),
)
.await?,
);

store.as_ref().update_oneshot(digest_1kb, value_1kb.into()).await?;
let short_entry = store.as_ref().get_file_entry_for_digest(&digest_1kb).await?;
assert_eq!(short_entry.size_on_disk(), 4 * 1024);

store.as_ref().update_oneshot(digest_5kb, value_5kb.into()).await?;
let long_entry = store.as_ref().get_file_entry_for_digest(&digest_5kb).await?;
assert_eq!(long_entry.size_on_disk(), 8 * 1024);
Ok(())
}
}
