Skip to content

Commit

Permalink
Fixed various bugs in filesystem store
Browse files Browse the repository at this point in the history
Fixes a couple bugs regarding temp file storage during
file eviction. In addition fixes some newly found bugs.
  • Loading branch information
allada committed Apr 6, 2022
1 parent a451628 commit 7ba407d
Show file tree
Hide file tree
Showing 6 changed files with 374 additions and 32 deletions.
3 changes: 3 additions & 0 deletions cas/store/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ rust_library(
"//third_party:bytes",
"//third_party:filetime",
"//third_party:futures",
"//third_party:nix",
"//third_party:rand",
"//third_party:tokio",
"//third_party:tokio_stream",
Expand Down Expand Up @@ -285,6 +286,8 @@ rust_test(
"//third_party:pretty_assertions",
"//third_party:rand",
"//third_party:tokio",
"//third_party:tokio_stream",
"//util:buf_channel",
"//util:common",
"//util:error",
":filesystem_store",
Expand Down
89 changes: 68 additions & 21 deletions cas/store/filesystem_store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2021 Nathan (Blaise) Bruer. All rights reserved.

use std::path::Path;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
Expand All @@ -9,6 +10,7 @@ use async_trait::async_trait;
use bytes::BytesMut;
use filetime::{set_file_atime, FileTime};
use futures::stream::{StreamExt, TryStreamExt};
use nix::fcntl::{renameat2, RenameFlags};
use rand::{thread_rng, Rng};
use tokio::fs::{self, File};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom, Take};
Expand Down Expand Up @@ -47,6 +49,11 @@ impl FileEntry {
.err_tip(|| format!("Failed to seek file: {}", full_content_path))?;
Ok(file.take(length))
}

#[inline]
fn flag_moved_to_temp_file(&self, rand_file_name: u64) {
self.pending_delete_file_name.store(rand_file_name, Ordering::Relaxed);
}
}

#[async_trait]
Expand Down Expand Up @@ -77,18 +84,19 @@ impl LenEntry for FileEntry {

#[inline]
async fn unref(&self) {
self.pending_delete_file_name
.store(thread_rng().gen::<u64>(), Ordering::Relaxed);
let rand_file_name = thread_rng().gen::<u64>();

let from_path = to_full_path_from_digest(&self.content_path, &self.digest);
let to_path = to_full_path(&self.temp_path, &self.temp_path.to_string());
log::info!("\x1b[0;31mFilesystem Store\x1b[0m: Store evicting: {}", &from_path);
let to_path = to_full_path(&self.temp_path, &format!("{}", rand_file_name));
log::info!("\x1b[0;31mFilesystem Store\x1b[0m: Deleting: {}", &from_path);
// It is possible (although extremely unlikely) that another thread is reading
// this file while we want to delete it here. To prevent errors in either case
// we rename the file (since that other thread would have an open file handle)
// to the temp folder then delete it when the Arc reference is dropped.
if let Err(err) = fs::rename(&from_path, &to_path).await {
log::warn!("Failed to rename file from {} to {} : {:?}", from_path, to_path, err);
}
self.flag_moved_to_temp_file(rand_file_name);
}
}

Expand Down Expand Up @@ -238,6 +246,7 @@ pub struct FilesystemStore {
temp_path: Arc<String>,
content_path: Arc<String>,
evicting_map: EvictingMap<Arc<FileEntry>, SystemTime>,
read_buffer_size: usize,
}

impl FilesystemStore {
Expand All @@ -260,10 +269,16 @@ impl FilesystemStore {
add_files_to_cache(&evicting_map, &now, &temp_path, &content_path).await?;
prune_temp_path(&temp_path.as_ref()).await?;

let read_buffer_size = if config.read_buffer_size == 0 {
DEFAULT_BUFF_SIZE
} else {
config.read_buffer_size as usize
};
let store = Self {
temp_path: Arc::new(config.temp_path.clone()),
content_path: Arc::new(config.content_path.clone()),
evicting_map,
read_buffer_size,
};
Ok(store)
}
Expand All @@ -274,8 +289,9 @@ impl FilesystemStore {

async fn update_file(
self: Pin<&Self>,
file_path: &str,
file: &mut File,
temp_loc: &str,
temp_file: &mut File,
temp_name_num: u64,
digest: DigestInfo,
mut reader: DropCloserReadHalf,
) -> Result<(), Error> {
Expand All @@ -289,9 +305,10 @@ impl FilesystemStore {
if data_len == 0 {
break; // EOF.
}
file.write_all_buf(&mut data)
temp_file
.write_all_buf(&mut data)
.await
.err_tip(|| format!("Failed to write data into filesystem store {}", file_path))?;
.err_tip(|| format!("Failed to write data into filesystem store {}", temp_loc))?;
file_size += data_len as u64;
}

Expand All @@ -303,15 +320,42 @@ impl FilesystemStore {
pending_delete_file_name: AtomicU64::new(0),
});

let final_path = to_full_path_from_digest(&self.content_path, &digest);
fs::rename(&file_path, &final_path).await.err_tip(|| {
format!(
"Failed to move file from temp directory to content directory {}",
file_path
)
})?;
let final_loc = to_full_path_from_digest(&self.content_path, &digest);

self.evicting_map.insert(digest, entry).await;
let final_path = Path::new(&final_loc);
let current_path = Path::new(&temp_loc);
let rename_flags = if final_path.exists() {
RenameFlags::RENAME_EXCHANGE
} else {
RenameFlags::empty()
};

// TODO(allada) We should find another way to do this without needing to use this nix
// library. We need a way to atomically swap files, so if one location is downloading the
// file and another replaces the contents it doesn't error out the downloading one.
// We can't use two operations here because a `.has()/.get()` call might see it existing
// then try and download it, but if we get EXTREMELY unlucky we might move the file then
// another location might try to run `.get()` on it, and the file won't exist then the
// second rename might happen (finishing the stages), resulting in an error to the
// fetcher. By using the RENAME_EXCHANGE flag it solves our problem, but limits the
// filesystem and operating system.
renameat2(None, current_path, None, final_path, rename_flags)
.map_err(|e| {
make_err!(
Code::NotFound,
"Could not atomically swap files {} and {} in filesystem store {}",
temp_loc,
final_loc,
e
)
})
.err_tip(|| "This could be due to the filesystem not supporting RENAME_EXCHANGE")?;

if let Some(old_item) = self.evicting_map.insert(digest, entry).await {
// At this point `temp_name_num` will be the file containing the old content we
// are going to delete.
old_item.flag_moved_to_temp_file(temp_name_num);
}
Ok(())
}
}
Expand All @@ -328,14 +372,17 @@ impl StoreTrait for FilesystemStore {
reader: DropCloserReadHalf,
_upload_size: UploadSizeInfo,
) -> Result<(), Error> {
let file_name = thread_rng().gen::<u64>().to_string();
let temp_full_path = to_full_path(&self.temp_path, &file_name);
let temp_name_num = thread_rng().gen::<u64>();
let temp_full_path = to_full_path(&self.temp_path, &temp_name_num.to_string());

let mut file = fs::File::create(&temp_full_path)
let mut temp_file = fs::File::create(&temp_full_path)
.await
.err_tip(|| "Failed to create temp file in filesystem store")?;

if let Err(err) = self.update_file(&temp_full_path, &mut file, digest, reader).await {
if let Err(err) = self
.update_file(&temp_full_path, &mut temp_file, temp_name_num, digest, reader)
.await
{
let result = fs::remove_file(temp_full_path)
.await
.err_tip(|| "Failed to delete temp file in filesystem store");
Expand Down Expand Up @@ -364,7 +411,7 @@ impl StoreTrait for FilesystemStore {
.read_file_part(offset as u64, length.unwrap_or(usize::MAX) as u64)
.await?;

let mut buf = BytesMut::with_capacity(length.unwrap_or(DEFAULT_BUFF_SIZE));
let mut buf = BytesMut::with_capacity(length.unwrap_or(self.read_buffer_size));
loop {
file.read_buf(&mut buf)
.await
Expand Down
Loading

0 comments on commit 7ba407d

Please sign in to comment.