Skip to content

Commit

Permalink
Account for block size in filesystem store for eviction purposes
Browse files Browse the repository at this point in the history
  • Loading branch information
zbirenbaum committed Feb 15, 2024
1 parent 4e7d68b commit 429acb5
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 29 deletions.
7 changes: 7 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ pub struct FilesystemStore {
/// value will cause items to never be removed from the store causing
/// infinite memory usage.
pub eviction_policy: Option<EvictionPolicy>,

/// The block size of the filesystem for the running machine
/// value is used to determine an entry's actual size on disk consumed
/// For a 4KB block size filesystem, a 1B file actually consumes 4KB
/// Default: 4096
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
pub block_size: u64,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down
85 changes: 61 additions & 24 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ use crate::cas_utils::is_zero_digest;

// Default size to allocate memory of the buffer when reading files.
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
// Default block size of all major filesystems is 4KB
const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;

#[derive(Debug)]
pub struct SharedContext {
Expand Down Expand Up @@ -119,17 +121,21 @@ fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString {
#[async_trait]
pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
/// Responsible for creating the underlying FileEntry.
fn create(file_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;
fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;

/// Creates a (usually) temp file, opens it and returns the path to the temp file.
async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> Result<(Self, fs::ResumeableFileSlot<'static>, OsString), Error>
where
Self: Sized;

/// Returns the underlying reference to where the filesize is stored.
fn get_file_size(&mut self) -> &mut u64;
/// Returns the underlying reference to the size of the data in bytes
fn data_size_mut(&mut self) -> &mut u64;

/// Returns the actual size of the underlying file on the disk after accounting for filesystem block size.
fn size_on_disk(&self) -> u64;

/// Gets the underlying EncodedfilePath.
fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath>;
Expand All @@ -150,7 +156,8 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
}

pub struct FileEntryImpl {
file_size: u64,
data_size: u64,
block_size: u64,
encoded_file_path: RwLock<EncodedFilePath>,
}

Expand All @@ -162,9 +169,10 @@ impl FileEntryImpl {

#[async_trait]
impl FileEntry for FileEntryImpl {
fn create(file_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
Self {
file_size,
data_size,
block_size,
encoded_file_path,
}
}
Expand All @@ -173,6 +181,7 @@ impl FileEntry for FileEntryImpl {
/// the cleanup of the file is handled without creating a FileEntry, which would
/// try to cleanup the file as well during drop().
async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> Result<(FileEntryImpl, fs::ResumeableFileSlot<'static>, OsString), Error> {
let temp_full_path = encoded_file_path.get_file_path();
Expand All @@ -192,15 +201,20 @@ impl FileEntry for FileEntryImpl {
Ok((
<FileEntryImpl as FileEntry>::create(
0, /* Unknown yet, we will fill it in later */
block_size,
RwLock::new(encoded_file_path),
),
temp_file_result,
temp_full_path,
))
}

fn get_file_size(&mut self) -> &mut u64 {
&mut self.file_size
fn data_size_mut(&mut self) -> &mut u64 {
&mut self.data_size
}

fn size_on_disk(&self) -> u64 {
self.data_size.div_ceil(self.block_size) * self.block_size
}

fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
Expand Down Expand Up @@ -243,7 +257,7 @@ impl FileEntry for FileEntryImpl {
impl Debug for FileEntryImpl {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
f.debug_struct("FileEntryImpl")
.field("file_size", &self.file_size)
.field("data_size", &self.data_size)
.field("encoded_file_path", &"<behind mutex>")
.finish()
}
Expand All @@ -258,11 +272,11 @@ fn make_temp_digest(digest: &mut DigestInfo) {
impl LenEntry for FileEntryImpl {
#[inline]
fn len(&self) -> usize {
self.file_size as usize
self.size_on_disk() as usize
}

fn is_empty(&self) -> bool {
self.file_size == 0
self.data_size == 0
}

#[inline]
Expand Down Expand Up @@ -343,19 +357,22 @@ async fn add_files_to_cache<Fe: FileEntry>(
evicting_map: &EvictingMap<Arc<Fe>, SystemTime>,
anchor_time: &SystemTime,
shared_context: &Arc<SharedContext>,
block_size: u64,
) -> Result<(), Error> {
async fn process_entry<Fe: FileEntry>(
evicting_map: &EvictingMap<Arc<Fe>, SystemTime>,
file_name: &str,
atime: SystemTime,
file_size: u64,
data_size: u64,
block_size: u64,
anchor_time: &SystemTime,
shared_context: &Arc<SharedContext>,
) -> Result<(), Error> {
let digest = digest_from_filename(file_name)?;

let file_entry = Fe::create(
file_size,
data_size,
block_size,
RwLock::new(EncodedFilePath {
shared_context: shared_context.clone(),
path_type: PathType::Content,
Expand Down Expand Up @@ -407,8 +424,17 @@ async fn add_files_to_cache<Fe: FileEntry>(
};

file_infos.sort_by(|a, b| a.1.cmp(&b.1));
for (file_name, atime, file_size) in file_infos {
let result = process_entry(evicting_map, &file_name, atime, file_size, anchor_time, shared_context).await;
for (file_name, atime, data_size) in file_infos {
let result = process_entry(
evicting_map,
&file_name,
atime,
data_size,
block_size,
anchor_time,
shared_context,
)
.await;
if let Err(err) = result {
warn!(
"Could not add file to eviction cache, so deleting: {} - {:?}",
Expand Down Expand Up @@ -440,6 +466,7 @@ async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
shared_context: Arc<SharedContext>,
evicting_map: Arc<EvictingMap<Arc<Fe>, SystemTime>>,
block_size: u64,
read_buffer_size: usize,
sleep_fn: fn(Duration) -> Sleep,
rename_fn: fn(&OsString, &OsString) -> Result<(), std::io::Error>,
Expand Down Expand Up @@ -473,7 +500,13 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
temp_path: config.temp_path.clone(),
content_path: config.content_path.clone(),
});
add_files_to_cache(evicting_map.as_ref(), &now, &shared_context).await?;

let block_size = if config.block_size == 0 {
DEFAULT_BLOCK_SIZE
} else {
config.block_size
};
add_files_to_cache(evicting_map.as_ref(), &now, &shared_context, block_size).await?;
prune_temp_path(&shared_context.temp_path).await?;

let read_buffer_size = if config.read_buffer_size == 0 {
Expand All @@ -484,6 +517,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
let store = Self {
shared_context,
evicting_map,
block_size,
read_buffer_size,
sleep_fn,
rename_fn,
Expand All @@ -505,7 +539,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
final_digest: DigestInfo,
mut reader: DropCloserReadHalf,
) -> Result<(), Error> {
let mut file_size = 0;
let mut data_size = 0;
loop {
let Ok(data_result) = timeout(fs::idle_file_descriptor_timeout(), reader.recv()).await else {
// In the event we timeout, we want to close the writing file, to prevent
Expand All @@ -532,7 +566,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
.write_all_buf(&mut data)
.await
.err_tip(|| "Failed to write data into filesystem store")?;
file_size += data_len as u64;
data_size += data_len as u64;
}

resumeable_temp_file
Expand All @@ -546,7 +580,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
drop(resumeable_temp_file);
*entry.get_file_size() = file_size;
*entry.data_size_mut() = data_size;
let entry = Arc::new(entry);
// This sequence of events is quite ticky to understand due to the amount of triggers that
Expand Down Expand Up @@ -652,11 +686,14 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
let mut temp_digest = digest;
make_temp_digest(&mut temp_digest);

let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(EncodedFilePath {
shared_context: self.shared_context.clone(),
path_type: PathType::Temp,
digest: temp_digest,
})
let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(
self.block_size,
EncodedFilePath {
shared_context: self.shared_context.clone(),
path_type: PathType::Temp,
digest: temp_digest,
},
)
.await?;

self.update_file(entry, temp_file, digest, reader)
Expand Down
62 changes: 57 additions & 5 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,18 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> Debug for TestFileEntry<Hook

#[async_trait]
impl<Hooks: FileEntryHooks + 'static + Sync + Send> FileEntry for TestFileEntry<Hooks> {
fn create(file_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
Self {
inner: Some(FileEntryImpl::create(file_size, encoded_file_path)),
inner: Some(FileEntryImpl::create(data_size, block_size, encoded_file_path)),
_phantom: PhantomData,
}
}

async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> Result<(Self, fs::ResumeableFileSlot<'static>, OsString), Error> {
let (inner, file_slot, path) = FileEntryImpl::make_and_open_file(encoded_file_path).await?;
let (inner, file_slot, path) = FileEntryImpl::make_and_open_file(block_size, encoded_file_path).await?;
Ok((
Self {
inner: Some(inner),
Expand All @@ -88,8 +89,12 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> FileEntry for TestFileEntry<
))
}

fn get_file_size(&mut self) -> &mut u64 {
self.inner.as_mut().unwrap().get_file_size()
fn data_size_mut(&mut self) -> &mut u64 {
self.inner.as_mut().unwrap().data_size_mut()
}

fn size_on_disk(&self) -> u64 {
self.inner.as_ref().unwrap().size_on_disk()
}

fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
Expand Down Expand Up @@ -216,13 +221,15 @@ mod filesystem_store_tests {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
eviction_policy: None,
block_size: 1,
..Default::default()
})
.await?,
);

// Insert dummy value into store.
store.as_ref().update_oneshot(digest, VALUE1.into()).await?;

assert_eq!(
store.as_ref().has(digest).await,
Ok(Some(VALUE1.len())),
Expand Down Expand Up @@ -345,6 +352,7 @@ mod filesystem_store_tests {
max_count: 3,
..Default::default()
}),
block_size: 1,
read_buffer_size: 1,
})
.await?,
Expand Down Expand Up @@ -449,6 +457,7 @@ mod filesystem_store_tests {
max_count: 1,
..Default::default()
}),
block_size: 1,
read_buffer_size: 1,
})
.await?,
Expand Down Expand Up @@ -729,6 +738,7 @@ mod filesystem_store_tests {
max_bytes: 5,
..Default::default()
}),
block_size: 1,
..Default::default()
})
.await?,
Expand Down Expand Up @@ -1135,4 +1145,46 @@ mod filesystem_store_tests {

Ok(())
}

// Ensure that get_file_size() returns the correct number
// ceil(content length / block_size) * block_size
// Content length is 1B and block_size is 4B: size on disk = 4B
// Content length is 5B and block_size is 4B: size on disk = 8B
#[tokio::test]
async fn get_file_size_uses_block_size() -> Result<(), Error> {
let content_path = make_temp_path("content_path");
let temp_path = make_temp_path("temp_path");

const VALUE_1B: &str = "x";
const VALUE_5B: &str = "xabcd";

let digest_1b = DigestInfo::try_new(HASH1, VALUE_1B.len())?;
let digest_5b = DigestInfo::try_new(HASH2, VALUE_5B.len())?;

let block_size = 4;

let store = Box::pin(
FilesystemStore::<FileEntryImpl>::new_with_timeout_and_rename_fn(
&nativelink_config::stores::FilesystemStore {
content_path: content_path.clone(),
block_size,
temp_path: temp_path.clone(),
read_buffer_size: 1,
..Default::default()
},
|_| sleep(Duration::ZERO),
|from, to| std::fs::rename(from, to),
)
.await?,
);

store.as_ref().update_oneshot(digest_1b, VALUE_1B.into()).await?;
let short_entry = store.as_ref().get_file_entry_for_digest(&digest_1b).await?;
assert_eq!(short_entry.size_on_disk(), 4);

store.as_ref().update_oneshot(digest_5b, VALUE_5B.into()).await?;
let long_entry = store.as_ref().get_file_entry_for_digest(&digest_5b).await?;
assert_eq!(long_entry.size_on_disk(), 8);
Ok(())
}
}

0 comments on commit 429acb5

Please sign in to comment.