From 0639a5973b9bc4fb81e5d53668f43de508aa2b35 Mon Sep 17 00:00:00 2001 From: Zach Birenbaum Date: Fri, 16 Feb 2024 14:18:57 -0800 Subject: [PATCH] Account for block size in filesystem store for eviction purposes (#661) - eviction_store will now treat FileEntries as if they consume the worst case amount of space on disk based on block size to prevent out of space errors. Prior behavior can be restored by setting `"block_size": 1` under filesystem in configuration file - Add `FileEntry::size_on_disk` - worst case disk space consumed by entry - Add test cases to `filesystem_store_test` ensuring correct `size_on_disk` - Add field in filesystem config for setting block size --- nativelink-config/src/stores.rs | 7 ++ nativelink-store/src/filesystem_store.rs | 85 +++++++++++++------ .../tests/filesystem_store_test.rs | 60 +++++++++++-- 3 files changed, 123 insertions(+), 29 deletions(-) diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index 30c1449ac..babb3abd6 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -242,6 +242,13 @@ pub struct FilesystemStore { /// value will cause items to never be removed from the store causing /// infinite memory usage. pub eviction_policy: Option, + + /// The block size of the filesystem on the running machine. + /// This value is used to determine the actual space an entry consumes on disk. + /// For a filesystem with a 4KB block size, a 1B file actually consumes 4KB. + /// Default: 4096 (also used when the value is 0) + #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")] + pub block_size: u64, } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 4f057ca6d..a58802eb3 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -43,6 +43,8 @@ use crate::cas_utils::is_zero_digest; // Default size to allocate memory of the buffer when reading files. 
const DEFAULT_BUFF_SIZE: usize = 32 * 1024; +// Default block size of all major filesystems is 4KB +const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024; #[derive(Debug)] pub struct SharedContext { @@ -121,17 +123,21 @@ fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString { #[async_trait] pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static { /// Responsible for creating the underlying FileEntry. - fn create(file_size: u64, encoded_file_path: RwLock) -> Self; + fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock) -> Self; /// Creates a (usually) temp file, opens it and returns the path to the temp file. async fn make_and_open_file( + block_size: u64, encoded_file_path: EncodedFilePath, ) -> Result<(Self, fs::ResumeableFileSlot<'static>, OsString), Error> where Self: Sized; - /// Returns the underlying reference to where the filesize is stored. - fn get_file_size(&mut self) -> &mut u64; + /// Returns the underlying reference to the size of the data in bytes + fn data_size_mut(&mut self) -> &mut u64; + + /// Returns the actual size of the underlying file on the disk after accounting for filesystem block size. + fn size_on_disk(&self) -> u64; /// Gets the underlying EncodedfilePath. fn get_encoded_file_path(&self) -> &RwLock; @@ -152,7 +158,8 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static { } pub struct FileEntryImpl { - file_size: u64, + data_size: u64, + block_size: u64, encoded_file_path: RwLock, } @@ -164,9 +171,10 @@ impl FileEntryImpl { #[async_trait] impl FileEntry for FileEntryImpl { - fn create(file_size: u64, encoded_file_path: RwLock) -> Self { + fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock) -> Self { Self { - file_size, + data_size, + block_size, encoded_file_path, } } @@ -175,6 +183,7 @@ impl FileEntry for FileEntryImpl { /// the cleanup of the file is handled without creating a FileEntry, which would /// try to cleanup the file as well during drop(). 
async fn make_and_open_file( + block_size: u64, encoded_file_path: EncodedFilePath, ) -> Result<(FileEntryImpl, fs::ResumeableFileSlot<'static>, OsString), Error> { let temp_full_path = encoded_file_path.get_file_path(); @@ -194,6 +203,7 @@ impl FileEntry for FileEntryImpl { Ok(( ::create( 0, /* Unknown yet, we will fill it in later */ + block_size, RwLock::new(encoded_file_path), ), temp_file_result, @@ -201,8 +211,12 @@ impl FileEntry for FileEntryImpl { )) } - fn get_file_size(&mut self) -> &mut u64 { - &mut self.file_size + fn data_size_mut(&mut self) -> &mut u64 { + &mut self.data_size + } + + fn size_on_disk(&self) -> u64 { + self.data_size.div_ceil(self.block_size) * self.block_size } fn get_encoded_file_path(&self) -> &RwLock { @@ -245,7 +259,7 @@ impl FileEntry for FileEntryImpl { impl Debug for FileEntryImpl { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { f.debug_struct("FileEntryImpl") - .field("file_size", &self.file_size) + .field("data_size", &self.data_size) .field("encoded_file_path", &"") .finish() } @@ -260,11 +274,11 @@ fn make_temp_digest(digest: &mut DigestInfo) { impl LenEntry for FileEntryImpl { #[inline] fn len(&self) -> usize { - self.file_size as usize + self.size_on_disk() as usize } fn is_empty(&self) -> bool { - self.file_size == 0 + self.data_size == 0 } #[inline] @@ -345,19 +359,22 @@ async fn add_files_to_cache( evicting_map: &EvictingMap, SystemTime>, anchor_time: &SystemTime, shared_context: &Arc, + block_size: u64, ) -> Result<(), Error> { async fn process_entry( evicting_map: &EvictingMap, SystemTime>, file_name: &str, atime: SystemTime, - file_size: u64, + data_size: u64, + block_size: u64, anchor_time: &SystemTime, shared_context: &Arc, ) -> Result<(), Error> { let digest = digest_from_filename(file_name)?; let file_entry = Fe::create( - file_size, + data_size, + block_size, RwLock::new(EncodedFilePath { shared_context: shared_context.clone(), path_type: PathType::Content, @@ -409,8 +426,17 @@ async fn 
add_files_to_cache( }; file_infos.sort_by(|a, b| a.1.cmp(&b.1)); - for (file_name, atime, file_size) in file_infos { - let result = process_entry(evicting_map, &file_name, atime, file_size, anchor_time, shared_context).await; + for (file_name, atime, data_size) in file_infos { + let result = process_entry( + evicting_map, + &file_name, + atime, + data_size, + block_size, + anchor_time, + shared_context, + ) + .await; if let Err(err) = result { warn!( "Could not add file to eviction cache, so deleting: {} - {:?}", @@ -442,6 +468,7 @@ async fn prune_temp_path(temp_path: &str) -> Result<(), Error> { pub struct FilesystemStore { shared_context: Arc, evicting_map: Arc, SystemTime>>, + block_size: u64, read_buffer_size: usize, sleep_fn: fn(Duration) -> Sleep, rename_fn: fn(&OsString, &OsString) -> Result<(), std::io::Error>, @@ -475,7 +502,13 @@ impl FilesystemStore { temp_path: config.temp_path.clone(), content_path: config.content_path.clone(), }); - add_files_to_cache(evicting_map.as_ref(), &now, &shared_context).await?; + + let block_size = if config.block_size == 0 { + DEFAULT_BLOCK_SIZE + } else { + config.block_size + }; + add_files_to_cache(evicting_map.as_ref(), &now, &shared_context, block_size).await?; prune_temp_path(&shared_context.temp_path).await?; let read_buffer_size = if config.read_buffer_size == 0 { @@ -486,6 +519,7 @@ impl FilesystemStore { let store = Self { shared_context, evicting_map, + block_size, read_buffer_size, sleep_fn, rename_fn, @@ -507,7 +541,7 @@ impl FilesystemStore { final_digest: DigestInfo, mut reader: DropCloserReadHalf, ) -> Result<(), Error> { - let mut file_size = 0; + let mut data_size = 0; loop { let Ok(data_result) = timeout(fs::idle_file_descriptor_timeout(), reader.recv()).await else { // In the event we timeout, we want to close the writing file, to prevent @@ -534,7 +568,7 @@ impl FilesystemStore { .write_all_buf(&mut data) .await .err_tip(|| "Failed to write data into filesystem store")?; - file_size += data_len as u64; 
+ data_size += data_len as u64; } resumeable_temp_file @@ -548,7 +582,7 @@ impl FilesystemStore { drop(resumeable_temp_file); - *entry.get_file_size() = file_size; + *entry.data_size_mut() = data_size; let entry = Arc::new(entry); // This sequence of events is quite ticky to understand due to the amount of triggers that @@ -654,11 +688,14 @@ impl Store for FilesystemStore { let mut temp_digest = digest; make_temp_digest(&mut temp_digest); - let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(EncodedFilePath { - shared_context: self.shared_context.clone(), - path_type: PathType::Temp, - digest: temp_digest, - }) + let (entry, temp_file, temp_full_path) = Fe::make_and_open_file( + self.block_size, + EncodedFilePath { + shared_context: self.shared_context.clone(), + path_type: PathType::Temp, + digest: temp_digest, + }, + ) .await?; self.update_file(entry, temp_file, digest, reader) diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index 1bbb08779..1c727107e 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -67,17 +67,18 @@ impl Debug for TestFileEntry FileEntry for TestFileEntry { - fn create(file_size: u64, encoded_file_path: RwLock) -> Self { + fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock) -> Self { Self { - inner: Some(FileEntryImpl::create(file_size, encoded_file_path)), + inner: Some(FileEntryImpl::create(data_size, block_size, encoded_file_path)), _phantom: PhantomData, } } async fn make_and_open_file( + block_size: u64, encoded_file_path: EncodedFilePath, ) -> Result<(Self, fs::ResumeableFileSlot<'static>, OsString), Error> { - let (inner, file_slot, path) = FileEntryImpl::make_and_open_file(encoded_file_path).await?; + let (inner, file_slot, path) = FileEntryImpl::make_and_open_file(block_size, encoded_file_path).await?; Ok(( Self { inner: Some(inner), @@ -88,8 +89,12 @@ impl FileEntry for 
TestFileEntry< )) } - fn get_file_size(&mut self) -> &mut u64 { - self.inner.as_mut().unwrap().get_file_size() + fn data_size_mut(&mut self) -> &mut u64 { + self.inner.as_mut().unwrap().data_size_mut() + } + + fn size_on_disk(&self) -> u64 { + self.inner.as_ref().unwrap().size_on_disk() } fn get_encoded_file_path(&self) -> &RwLock { @@ -216,6 +221,7 @@ mod filesystem_store_tests { content_path: content_path.clone(), temp_path: temp_path.clone(), eviction_policy: None, + block_size: 1, ..Default::default() }) .await?, @@ -223,6 +229,7 @@ mod filesystem_store_tests { // Insert dummy value into store. store.as_ref().update_oneshot(digest, VALUE1.into()).await?; + assert_eq!( store.as_ref().has(digest).await, Ok(Some(VALUE1.len())), @@ -345,6 +352,7 @@ mod filesystem_store_tests { max_count: 3, ..Default::default() }), + block_size: 1, read_buffer_size: 1, }) .await?, @@ -449,6 +457,7 @@ mod filesystem_store_tests { max_count: 1, ..Default::default() }), + block_size: 1, read_buffer_size: 1, }) .await?, @@ -729,6 +738,7 @@ mod filesystem_store_tests { max_bytes: 5, ..Default::default() }), + block_size: 1, ..Default::default() }) .await?, @@ -1135,4 +1145,44 @@ mod filesystem_store_tests { Ok(()) } + + // Ensure that size_on_disk() returns the correct number + // ceil(content length / block_size) * block_size + // assume block size 4K + // 1K data size = 4K size on disk + // 5K data size = 8K size on disk + #[tokio::test] + async fn get_file_size_uses_block_size() -> Result<(), Error> { + let content_path = make_temp_path("content_path"); + let temp_path = make_temp_path("temp_path"); + + let value_1kb: String = "x".repeat(1024); + let value_5kb: String = "xabcd".repeat(1024); + + let digest_1kb = DigestInfo::try_new(HASH1, value_1kb.len())?; + let digest_5kb = DigestInfo::try_new(HASH2, value_5kb.len())?; + + let store = Box::pin( + FilesystemStore::::new_with_timeout_and_rename_fn( + &nativelink_config::stores::FilesystemStore { + content_path: content_path.clone(), 
+ temp_path: temp_path.clone(), + read_buffer_size: 1, + ..Default::default() + }, + |_| sleep(Duration::ZERO), + |from, to| std::fs::rename(from, to), + ) + .await?, + ); + + store.as_ref().update_oneshot(digest_1kb, value_1kb.into()).await?; + let short_entry = store.as_ref().get_file_entry_for_digest(&digest_1kb).await?; + assert_eq!(short_entry.size_on_disk(), 4 * 1024); + + store.as_ref().update_oneshot(digest_5kb, value_5kb.into()).await?; + let long_entry = store.as_ref().get_file_entry_for_digest(&digest_5kb).await?; + assert_eq!(long_entry.size_on_disk(), 8 * 1024); + Ok(()) + } }