diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index 30c1449ac..babb3abd6 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -242,6 +242,13 @@ pub struct FilesystemStore { /// value will cause items to never be removed from the store causing /// infinite memory usage. pub eviction_policy: Option, + + /// The block size of the filesystem on the running machine. + /// This value is used to determine the actual size an entry consumes on disk. + /// For a filesystem with a 4KB block size, a 1B file actually consumes 4KB. + /// Default: 4096 + #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")] + pub block_size: u64, } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 94a2c342f..1047ca91d 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -41,6 +41,8 @@ use crate::cas_utils::is_zero_digest; // Default size to allocate memory of the buffer when reading files. const DEFAULT_BUFF_SIZE: usize = 32 * 1024; +// Default block size of most major filesystems is 4KB +const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024; #[derive(Debug)] pub struct SharedContext { @@ -119,17 +121,21 @@ fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString { #[async_trait] pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static { /// Responsible for creating the underlying FileEntry. - fn create(file_size: u64, encoded_file_path: RwLock) -> Self; + fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock) -> Self; /// Creates a (usually) temp file, opens it and returns the path to the temp file. async fn make_and_open_file( + block_size: u64, encoded_file_path: EncodedFilePath, ) -> Result<(Self, fs::ResumeableFileSlot<'static>, OsString), Error> where Self: Sized; - /// Returns the underlying reference to where the filesize is stored. 
- fn get_file_size(&mut self) -> &mut u64; + /// Returns the underlying reference to the size of the data in bytes + fn data_size_mut(&mut self) -> &mut u64; + + /// Returns the actual size of the underlying file on the disk after accounting for filesystem block size. + fn size_on_disk(&self) -> u64; /// Gets the underlying EncodedfilePath. fn get_encoded_file_path(&self) -> &RwLock; @@ -150,7 +156,8 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static { } pub struct FileEntryImpl { - file_size: u64, + data_size: u64, + block_size: u64, encoded_file_path: RwLock, } @@ -162,9 +169,10 @@ impl FileEntryImpl { #[async_trait] impl FileEntry for FileEntryImpl { - fn create(file_size: u64, encoded_file_path: RwLock) -> Self { + fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock) -> Self { Self { - file_size, + data_size, + block_size, encoded_file_path, } } @@ -173,6 +181,7 @@ impl FileEntry for FileEntryImpl { /// the cleanup of the file is handled without creating a FileEntry, which would /// try to cleanup the file as well during drop(). 
async fn make_and_open_file( + block_size: u64, encoded_file_path: EncodedFilePath, ) -> Result<(FileEntryImpl, fs::ResumeableFileSlot<'static>, OsString), Error> { let temp_full_path = encoded_file_path.get_file_path(); @@ -192,6 +201,7 @@ impl FileEntry for FileEntryImpl { Ok(( ::create( 0, /* Unknown yet, we will fill it in later */ + block_size, RwLock::new(encoded_file_path), ), temp_file_result, @@ -199,8 +209,12 @@ impl FileEntry for FileEntryImpl { )) } - fn get_file_size(&mut self) -> &mut u64 { - &mut self.file_size + fn data_size_mut(&mut self) -> &mut u64 { + &mut self.data_size + } + + fn size_on_disk(&self) -> u64 { + self.data_size.div_ceil(self.block_size) * self.block_size } fn get_encoded_file_path(&self) -> &RwLock { @@ -243,7 +257,7 @@ impl FileEntry for FileEntryImpl { impl Debug for FileEntryImpl { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { f.debug_struct("FileEntryImpl") - .field("file_size", &self.file_size) + .field("data_size", &self.data_size) .field("encoded_file_path", &"") .finish() } @@ -258,11 +272,11 @@ fn make_temp_digest(digest: &mut DigestInfo) { impl LenEntry for FileEntryImpl { #[inline] fn len(&self) -> usize { - self.file_size as usize + self.size_on_disk() as usize } fn is_empty(&self) -> bool { - self.file_size == 0 + self.data_size == 0 } #[inline] @@ -343,19 +357,22 @@ async fn add_files_to_cache( evicting_map: &EvictingMap, SystemTime>, anchor_time: &SystemTime, shared_context: &Arc, + block_size: u64, ) -> Result<(), Error> { async fn process_entry( evicting_map: &EvictingMap, SystemTime>, file_name: &str, atime: SystemTime, - file_size: u64, + data_size: u64, + block_size: u64, anchor_time: &SystemTime, shared_context: &Arc, ) -> Result<(), Error> { let digest = digest_from_filename(file_name)?; let file_entry = Fe::create( - file_size, + data_size, + block_size, RwLock::new(EncodedFilePath { shared_context: shared_context.clone(), path_type: PathType::Content, @@ -407,8 +424,17 @@ async fn 
add_files_to_cache( }; file_infos.sort_by(|a, b| a.1.cmp(&b.1)); - for (file_name, atime, file_size) in file_infos { - let result = process_entry(evicting_map, &file_name, atime, file_size, anchor_time, shared_context).await; + for (file_name, atime, data_size) in file_infos { + let result = process_entry( + evicting_map, + &file_name, + atime, + data_size, + block_size, + anchor_time, + shared_context, + ) + .await; if let Err(err) = result { warn!( "Could not add file to eviction cache, so deleting: {} - {:?}", @@ -440,6 +466,7 @@ async fn prune_temp_path(temp_path: &str) -> Result<(), Error> { pub struct FilesystemStore { shared_context: Arc, evicting_map: Arc, SystemTime>>, + block_size: u64, read_buffer_size: usize, sleep_fn: fn(Duration) -> Sleep, rename_fn: fn(&OsString, &OsString) -> Result<(), std::io::Error>, @@ -473,7 +500,13 @@ impl FilesystemStore { temp_path: config.temp_path.clone(), content_path: config.content_path.clone(), }); - add_files_to_cache(evicting_map.as_ref(), &now, &shared_context).await?; + + let block_size = if config.block_size == 0 { + DEFAULT_BLOCK_SIZE + } else { + config.block_size + }; + add_files_to_cache(evicting_map.as_ref(), &now, &shared_context, block_size).await?; prune_temp_path(&shared_context.temp_path).await?; let read_buffer_size = if config.read_buffer_size == 0 { @@ -484,6 +517,7 @@ impl FilesystemStore { let store = Self { shared_context, evicting_map, + block_size, read_buffer_size, sleep_fn, rename_fn, @@ -505,7 +539,7 @@ impl FilesystemStore { final_digest: DigestInfo, mut reader: DropCloserReadHalf, ) -> Result<(), Error> { - let mut file_size = 0; + let mut data_size = 0; loop { let Ok(data_result) = timeout(fs::idle_file_descriptor_timeout(), reader.recv()).await else { // In the event we timeout, we want to close the writing file, to prevent @@ -532,7 +566,7 @@ impl FilesystemStore { .write_all_buf(&mut data) .await .err_tip(|| "Failed to write data into filesystem store")?; - file_size += data_len as u64; 
+ data_size += data_len as u64; } resumeable_temp_file @@ -546,7 +580,7 @@ impl FilesystemStore { drop(resumeable_temp_file); - *entry.get_file_size() = file_size; + *entry.data_size_mut() = data_size; let entry = Arc::new(entry); // This sequence of events is quite ticky to understand due to the amount of triggers that @@ -652,11 +686,14 @@ impl Store for FilesystemStore { let mut temp_digest = digest; make_temp_digest(&mut temp_digest); - let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(EncodedFilePath { - shared_context: self.shared_context.clone(), - path_type: PathType::Temp, - digest: temp_digest, - }) + let (entry, temp_file, temp_full_path) = Fe::make_and_open_file( + self.block_size, + EncodedFilePath { + shared_context: self.shared_context.clone(), + path_type: PathType::Temp, + digest: temp_digest, + }, + ) .await?; self.update_file(entry, temp_file, digest, reader) diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index 1bbb08779..1c727107e 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -67,17 +67,18 @@ impl Debug for TestFileEntry FileEntry for TestFileEntry { - fn create(file_size: u64, encoded_file_path: RwLock) -> Self { + fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock) -> Self { Self { - inner: Some(FileEntryImpl::create(file_size, encoded_file_path)), + inner: Some(FileEntryImpl::create(data_size, block_size, encoded_file_path)), _phantom: PhantomData, } } async fn make_and_open_file( + block_size: u64, encoded_file_path: EncodedFilePath, ) -> Result<(Self, fs::ResumeableFileSlot<'static>, OsString), Error> { - let (inner, file_slot, path) = FileEntryImpl::make_and_open_file(encoded_file_path).await?; + let (inner, file_slot, path) = FileEntryImpl::make_and_open_file(block_size, encoded_file_path).await?; Ok(( Self { inner: Some(inner), @@ -88,8 +89,12 @@ impl FileEntry for 
TestFileEntry< )) } - fn get_file_size(&mut self) -> &mut u64 { - self.inner.as_mut().unwrap().get_file_size() + fn data_size_mut(&mut self) -> &mut u64 { + self.inner.as_mut().unwrap().data_size_mut() + } + + fn size_on_disk(&self) -> u64 { + self.inner.as_ref().unwrap().size_on_disk() } fn get_encoded_file_path(&self) -> &RwLock { @@ -216,6 +221,7 @@ mod filesystem_store_tests { content_path: content_path.clone(), temp_path: temp_path.clone(), eviction_policy: None, + block_size: 1, ..Default::default() }) .await?, @@ -223,6 +229,7 @@ mod filesystem_store_tests { // Insert dummy value into store. store.as_ref().update_oneshot(digest, VALUE1.into()).await?; + assert_eq!( store.as_ref().has(digest).await, Ok(Some(VALUE1.len())), @@ -345,6 +352,7 @@ mod filesystem_store_tests { max_count: 3, ..Default::default() }), + block_size: 1, read_buffer_size: 1, }) .await?, @@ -449,6 +457,7 @@ mod filesystem_store_tests { max_count: 1, ..Default::default() }), + block_size: 1, read_buffer_size: 1, }) .await?, @@ -729,6 +738,7 @@ mod filesystem_store_tests { max_bytes: 5, ..Default::default() }), + block_size: 1, ..Default::default() }) .await?, @@ -1135,4 +1145,44 @@ mod filesystem_store_tests { Ok(()) } + + // Ensure that size_on_disk() returns the correct number + // ceil(content length / block_size) * block_size + // assume block size 4K + // 1K data size = 4K size on disk + // 5K data size = 8K size on disk + #[tokio::test] + async fn get_file_size_uses_block_size() -> Result<(), Error> { + let content_path = make_temp_path("content_path"); + let temp_path = make_temp_path("temp_path"); + + let value_1kb: String = "x".repeat(1024); + let value_5kb: String = "xabcd".repeat(1024); + + let digest_1kb = DigestInfo::try_new(HASH1, value_1kb.len())?; + let digest_5kb = DigestInfo::try_new(HASH2, value_5kb.len())?; + + let store = Box::pin( + FilesystemStore::::new_with_timeout_and_rename_fn( + &nativelink_config::stores::FilesystemStore { + content_path: content_path.clone(), 
+ temp_path: temp_path.clone(), + read_buffer_size: 1, + ..Default::default() + }, + |_| sleep(Duration::ZERO), + |from, to| std::fs::rename(from, to), + ) + .await?, + ); + + store.as_ref().update_oneshot(digest_1kb, value_1kb.into()).await?; + let short_entry = store.as_ref().get_file_entry_for_digest(&digest_1kb).await?; + assert_eq!(short_entry.size_on_disk(), 4 * 1024); + + store.as_ref().update_oneshot(digest_5kb, value_5kb.into()).await?; + let long_entry = store.as_ref().get_file_entry_for_digest(&digest_5kb).await?; + assert_eq!(long_entry.size_on_disk(), 8 * 1024); + Ok(()) + } }