diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 3e283874b..b40f64618 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -57,8 +57,10 @@ jobs: (echo "Cargo.toml is out of date. Please run: python ./tools/build_cargo_manifest.py" && exit 1) - name: Compile & test with cargo run: | - docker run --rm -e CC=clang -v $PWD:/root/turbo-cache allada/turbo-cache:test sh -c ' \ - apt-get update && apt-get install cargo -y && \ + docker run --rm -e CC=clang -v $PWD:/root/turbo-cache allada/turbo-cache:test bash -c ' \ + apt update && apt install -y curl libssl-dev gcc pkg-config && \ + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain=1.70.0 && \ + . $HOME/.cargo/env && \ cargo build --all && \ cargo test --all \ ' diff --git a/Cargo.lock b/Cargo.lock index 20f6af8ab..3953f4666 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -53,6 +53,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "ac_utils_test" +version = "0.0.0" +dependencies = [ + "ac_utils", + "common", + "config", + "error", + "memory_store", + "pretty_assertions", + "rand", + "store", + "tokio", +] + [[package]] name = "action_messages" version = "0.0.0" @@ -1227,6 +1242,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "fs_test" +version = "0.0.0" +dependencies = [ + "common", + "error", + "pretty_assertions", + "rand", + "tokio", +] + [[package]] name = "futures" version = "0.3.28" diff --git a/Cargo.toml b/Cargo.toml index 465cdd5d6..db82b6567 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "gencargo/ac_server", "gencargo/ac_server_test", "gencargo/ac_utils", + "gencargo/ac_utils_test", "gencargo/action_messages", "gencargo/action_messages_test", "gencargo/async_fixed_buffer", @@ -46,6 +47,7 @@ members = [ "gencargo/filesystem_store", "gencargo/filesystem_store_test", "gencargo/fs", + "gencargo/fs_test", "gencargo/gen_protos_tool", "gencargo/grpc_scheduler", "gencargo/grpc_store", @@ -149,6 +151,7 @@ uuid = { version = "1.4.0", features = ["v4"] } ac_server = { path = "gencargo/ac_server" } ac_server_test = { path = "gencargo/ac_server_test" } ac_utils = { path = "gencargo/ac_utils" } +ac_utils_test = { path = "gencargo/ac_utils_test" } action_messages = { path = "gencargo/action_messages" } action_messages_test = { path = "gencargo/action_messages_test" } async_fixed_buffer = { path = "gencargo/async_fixed_buffer" } @@ -182,6 +185,7 @@ fastcdc_test = { path = "gencargo/fastcdc_test" } filesystem_store = { path = "gencargo/filesystem_store" } filesystem_store_test = { path = "gencargo/filesystem_store_test" } fs = { path = "gencargo/fs" } +fs_test = { path = "gencargo/fs_test" } gen_protos_tool = { path = "gencargo/gen_protos_tool" } grpc_scheduler = { path = "gencargo/grpc_scheduler" } grpc_store = { path = "gencargo/grpc_store" } diff --git a/README.md b/README.md index 588187266..5d1674173 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,7 @@ bazel build -c opt //cas These will place an executable in `./bazel-bin/cas/cas` that will start the service. 
### Cargo requirements -* Cargo 1.66.0+ +* Cargo 1.70.0+ * `libssl-dev` package installed (ie: `apt install libssl-dev` or `yum install libssl-dev`) #### Cargo building for deployment ```sh diff --git a/cas/cas_main.rs b/cas/cas_main.rs index 566c67ab6..a792aff7a 100644 --- a/cas/cas_main.rs +++ b/cas/cas_main.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::collections::HashSet; use std::net::SocketAddr; use std::sync::Arc; +use std::time::Duration; use axum::Router; use clap::Parser; @@ -34,7 +35,7 @@ use ac_server::AcServer; use bytestream_server::ByteStreamServer; use capabilities_server::CapabilitiesServer; use cas_server::CasServer; -use common::fs::set_open_file_limit; +use common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit}; use common::log; use config::cas_server::{CasConfig, CompressionAlgorithm, GlobalConfig, ServerConfig, WorkerConfig}; use default_scheduler_factory::scheduler_factory; @@ -461,14 +462,21 @@ fn main() -> Result<(), Box<dyn std::error::Error>> { // Note: If the default changes make sure you update the documentation in // `config/cas_server.rs`. const DEFAULT_MAX_OPEN_FILES: usize = 512; + // Note: If the default changes make sure you update the documentation in + // `config/cas_server.rs`. + const DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS: u64 = 1000; let global_cfg = if let Some(global_cfg) = &mut cfg.global { if global_cfg.max_open_files == 0 { global_cfg.max_open_files = DEFAULT_MAX_OPEN_FILES; } + if global_cfg.idle_file_descriptor_timeout_millis == 0 { + global_cfg.idle_file_descriptor_timeout_millis = DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS; + } *global_cfg } else { GlobalConfig { max_open_files: DEFAULT_MAX_OPEN_FILES, + idle_file_descriptor_timeout_millis: DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS, disable_metrics: cfg.servers.iter().all(|v| { let Some(service) = &v.services else { return true; @@ -478,6 +486,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> { } }; set_open_file_limit(global_cfg.max_open_files); + set_idle_file_descriptor_timeout(Duration::from_millis(global_cfg.idle_file_descriptor_timeout_millis))?; !global_cfg.disable_metrics }; // Override metrics enabled if the environment variable is set. diff --git a/cas/store/BUILD b/cas/store/BUILD index 6597ac504..fe3758d56 100644 --- a/cas/store/BUILD +++ b/cas/store/BUILD @@ -440,3 +440,19 @@ rust_test( ], proc_macro_deps = ["@crate_index//:async-trait"], ) + +rust_test( + name = "ac_utils_test", + srcs = ["tests/ac_utils_test.rs"], + deps = [ + ":ac_utils", + ":memory_store", + ":store", + "//config", + "//util:common", + "//util:error", + "@crate_index//:pretty_assertions", + "@crate_index//:rand", + "@crate_index//:tokio", + ], +) diff --git a/cas/store/ac_utils.rs b/cas/store/ac_utils.rs index cb6aacd4c..cb42067cb 100644 --- a/cas/store/ac_utils.rs +++ b/cas/store/ac_utils.rs @@ -21,10 +21,12 @@ use futures::{future::try_join, Future, FutureExt, TryFutureExt}; use prost::Message; use sha2::{Digest, Sha256}; use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio::time::timeout; -use buf_channel::make_buf_channel_pair; -use common::DigestInfo; +use buf_channel::{make_buf_channel_pair, DropCloserWriteHalf}; +use common::{fs, DigestInfo}; use error::{Code, Error, ResultExt}; +use fs::idle_file_descriptor_timeout; use store::{Store, UploadSizeInfo}; // NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than @@ -36,6 +38,9 @@ pub const ESTIMATED_DIGEST_SIZE: usize = 2048; /// to use up all the memory on this machine.
const MAX_ACTION_MSG_SIZE: usize = 10 << 20; // 10mb. +/// Default read buffer size for reading from an AsyncReader. +const DEFAULT_READ_BUFF_SIZE: usize = 4096; + /// Attempts to fetch the digest contents from a store into the associated proto. pub async fn get_and_decode_digest<T: Message + Default>( store: Pin<&dyn Store>, @@ -77,30 +82,36 @@ /// Given a bytestream computes the digest for the data. /// Note: This will happen in a new spawn since computing digests can be thread intensive. -pub fn compute_digest<R: AsyncRead + Unpin + Send + 'static>( - mut reader: R, -) -> impl Future<Output = Result<(DigestInfo, R), Error>> { - tokio::spawn(async move { - const DEFAULT_READ_BUFF_SIZE: usize = 4096; - let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE); - let mut hasher = Sha256::new(); - let mut digest_size = 0; - loop { - reader - .read_buf(&mut chunk) - .await - .err_tip(|| "Could not read chunk during compute_digest")?; - if chunk.is_empty() { - break; // EOF. - } - digest_size += chunk.len(); - hasher.update(&chunk); - chunk.clear(); +pub async fn compute_digest<R: AsyncRead + Unpin + Send>(mut reader: R) -> Result<(DigestInfo, R), Error> { + let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE); + let mut hasher = Sha256::new(); + let mut digest_size = 0; + loop { + reader + .read_buf(&mut chunk) + .await + .err_tip(|| "Could not read chunk during compute_digest")?; + if chunk.is_empty() { + break; // EOF. } + digest_size += chunk.len(); + hasher.update(&chunk); + chunk.clear(); + } - Ok((DigestInfo::new(hasher.finalize().into(), digest_size as i64), reader)) - }) - .map(|r| r.err_tip(|| "Failed to launch spawn")?) + Ok((DigestInfo::new(hasher.finalize().into(), digest_size as i64), reader)) +} + +fn inner_upload_file_to_store<'a, Fut: Future<Output = Result<(), Error>> + 'a>( + cas_store: Pin<&'a dyn Store>, + digest: DigestInfo, + read_data_fn: impl FnOnce(DropCloserWriteHalf) -> Fut, +) -> impl Future<Output = Result<(), Error>> + 'a { + let (tx, rx) = make_buf_channel_pair(); + let upload_file_to_store_fut = cas_store + .update(digest, rx, UploadSizeInfo::ExactSize(digest.size_bytes as usize)) + .map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store")); + try_join(read_data_fn(tx), upload_file_to_store_fut).map_ok(|(_, _)| ()) } /// Uploads data to our store for given digest. @@ -114,13 +125,8 @@ pub fn upload_to_store<'a, R: AsyncRead + Unpin>( digest: DigestInfo, reader: &'a mut R, ) -> impl Future<Output = Result<(), Error>> + 'a { - let (mut tx, rx) = make_buf_channel_pair(); - let upload_to_store_fut = cas_store - .update(digest, rx, UploadSizeInfo::ExactSize(digest.size_bytes as usize)) - .map(|r| r.err_tip(|| "Could not upload data to store in upload_to_store")); - let read_data_fut = async move { + inner_upload_file_to_store(cas_store, digest, move |mut tx| async move { loop { - const DEFAULT_READ_BUFF_SIZE: usize = 4096; let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE); reader .read_buf(&mut chunk) .await .err_tip(|| "Could not read chunk during upload_to_store")?; if chunk.is_empty() { break; // EOF. } tx.send(chunk.freeze()) .await .err_tip(|| "Could not send buffer data to store in upload_to_store")?; } @@ -137,6 +143,50 @@ tx.send_eof() .await .err_tip(|| "Could not send EOF to store in upload_to_store")?; Ok(()) - }; - try_join(read_data_fut, upload_to_store_fut).map_ok(|(_, _)| ()) + }) +} + +/// Same as `upload_to_store`, however it specializes in dealing with a `ResumeableFileSlot`. +/// This will close the reading file if writing the data takes a while.
+pub fn upload_file_to_store<'a>( + cas_store: Pin<&'a dyn Store>, + digest: DigestInfo, + mut file_reader: fs::ResumeableFileSlot<'a>, +) -> impl Future<Output = Result<(), Error>> + 'a { + inner_upload_file_to_store(cas_store, digest, move |mut tx| async move { + loop { + let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE); + file_reader + .as_reader() + .await + .err_tip(|| "Could not get reader from file slot in upload_file_to_store")? + .read_buf(&mut chunk) + .await + .err_tip(|| "Could not read chunk during upload_file_to_store")?; + if chunk.is_empty() { + break; // EOF. + } + let send_fut = tx.send(chunk.freeze()); + tokio::pin!(send_fut); + loop { + match timeout(idle_file_descriptor_timeout(), &mut send_fut).await { + Ok(Ok(())) => break, + Ok(Err(err)) => { + return Err(err).err_tip(|| "Could not send buffer data to store in upload_file_to_store") + } + Err(_) => { + file_reader + .close_file() + .await + .err_tip(|| "Could not close file due to timeout in upload_file_to_store")?; + continue; + } + } + } + } + tx.send_eof() + .await + .err_tip(|| "Could not send EOF to store in upload_file_to_store")?; + Ok(()) + }) } diff --git a/cas/store/filesystem_store.rs b/cas/store/filesystem_store.rs index 4291c9130..71201b49a 100644 --- a/cas/store/filesystem_store.rs +++ b/cas/store/filesystem_store.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ffi::OsString; use std::fmt::{Debug, Formatter}; use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; @@ -23,9 +24,10 @@ use async_trait::async_trait; use bytes::BytesMut; use filetime::{set_file_atime, FileTime}; use futures::stream::{StreamExt, TryStreamExt}; -use futures::Future; -use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom, Take}; +use futures::{Future, TryFutureExt}; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; use tokio::task::spawn_blocking; +use tokio::time::timeout; use tokio_stream::wrappers::ReadDirStream; use buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; @@ -69,13 +71,13 @@ pub struct EncodedFilePath { impl EncodedFilePath { #[inline] - fn get_file_path(&self) -> String { + fn get_file_path(&self) -> OsString { get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.digest) } } #[inline] -fn get_file_path_raw(path_type: &PathType, shared_context: &SharedContext, digest: &DigestInfo) -> String { +fn get_file_path_raw(path_type: &PathType, shared_context: &SharedContext, digest: &DigestInfo) -> OsString { let folder = match path_type { PathType::Content => &shared_context.content_path, PathType::Temp => &shared_context.temp_path, @@ -95,12 +97,12 @@ impl Drop for EncodedFilePath { let shared_context = self.shared_context.clone(); shared_context.active_drop_spawns.fetch_add(1, Ordering::Relaxed); tokio::spawn(async move { - log::info!("\x1b[0;31mFilesystem Store\x1b[0m: Deleting: {}", &file_path); + log::info!("\x1b[0;31mFilesystem Store\x1b[0m: Deleting: {:?}", file_path); let result = fs::remove_file(&file_path) .await - .err_tip(|| format!("Failed to remove file {}", file_path)); + .err_tip(|| format!("Failed to remove file {:?}", file_path)); if let Err(err) = result { - log::warn!("\x1b[0;31mFilesystem Store\x1b[0m: {:?}", err); + log::info!("\x1b[0;31mFilesystem Store\x1b[0m: {:?}", err); } shared_context.active_drop_spawns.fetch_sub(1, Ordering::Relaxed); }); @@ -108,8 +110,8 @@ } #[inline] -fn to_full_path_from_digest(folder: &str,
digest: &DigestInfo) -> String { - format!("{}/{}-{}", folder, digest.hash_str(), digest.size_bytes) +fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString { + format!("{}/{}-{}", folder, digest.hash_str(), digest.size_bytes).into() } #[async_trait] @@ -120,7 +122,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static { /// Creates a (usually) temp file, opens it and returns the path to the temp file. async fn make_and_open_file( encoded_file_path: EncodedFilePath, - ) -> Result<(Self, fs::FileSlot<'static>, String), Error> + ) -> Result<(Self, fs::ResumeableFileSlot<'static>, OsString), Error> where Self: Sized; @@ -131,7 +133,7 @@ fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath>; /// Returns a reader that will read part of the underlying file. - async fn read_file_part(&self, offset: u64, length: u64) -> Result<Take<fs::FileSlot<'_>>, Error>; + async fn read_file_part<'a>(&'a self, offset: u64, length: u64) -> Result<fs::ResumeableFileSlot<'a>, Error>; /// This function is a safe way to extract the file name of the underlying file. To protect users from /// accidentally creating undefined behavior we encourage users to do the logic they need to do with /// the filename inside this function instead of extracting the filename and doing the logic outside. /// This is because the filename is not guaranteed to exist after this function returns, however inside /// the callback the file is always guaranteed to exist and immutable. /// DO NOT USE THIS FUNCTION TO EXTRACT THE FILENAME AND STORE IT FOR LATER USE. - async fn get_file_path_locked<T, Fut: Future<Output = Result<T, Error>> + Send, F: FnOnce(String) -> Fut + Send>( + async fn get_file_path_locked<T, Fut: Future<Output = Result<T, Error>> + Send, F: FnOnce(OsString) -> Fut + Send>( &self, handler: F, ) -> Result<T, Error>; @@ -170,34 +172,29 @@ impl FileEntry for FileEntryImpl { /// try to cleanup the file as well during drop().
async fn make_and_open_file( encoded_file_path: EncodedFilePath, - ) -> Result<(FileEntryImpl, fs::FileSlot<'static>, String), Error> { + ) -> Result<(FileEntryImpl, fs::ResumeableFileSlot<'static>, OsString), Error> { let temp_full_path = encoded_file_path.get_file_path(); - let temp_file_result = fs::create_file(&temp_full_path) - .await - .err_tip(|| format!("Failed to create {} in filesystem store", temp_full_path)); - - match temp_file_result { - Ok(file) => { - Ok(( - <FileEntryImpl as FileEntry>::create( - 0, /* Unknown yet, we will fill it in later */ - RwLock::new(encoded_file_path), - ), - file, - temp_full_path, - )) - } - Err(mut err) => { + let temp_file_result = fs::create_file(temp_full_path.clone()) + .or_else(|mut err| async { let remove_result = fs::remove_file(&temp_full_path) .await - .err_tip(|| format!("Failed to remove file {} in filesystem store", temp_full_path)); + .err_tip(|| format!("Failed to remove file {:?} in filesystem store", temp_full_path)); if let Err(remove_err) = remove_result { err = err.merge(remove_err); } log::warn!("\x1b[0;31mFilesystem Store\x1b[0m: {:?}", err); - Err(err) - } - } + Err(err).err_tip(|| format!("Failed to create {:?} in filesystem store", temp_full_path)) + }) + .await?; + + Ok(( + <FileEntryImpl as FileEntry>::create( + 0, /* Unknown yet, we will fill it in later */ + RwLock::new(encoded_file_path), + ), + temp_file_result, + temp_full_path, + )) } fn get_file_size(&mut self) -> &mut u64 { @@ -208,23 +205,31 @@ &self.encoded_file_path } - async fn read_file_part(&self, offset: u64, length: u64) -> Result<Take<fs::FileSlot<'_>>, Error> { + async fn read_file_part<'a>(&'a self, offset: u64, length: u64) -> Result<fs::ResumeableFileSlot<'a>, Error> { let (mut file, full_content_path_for_debug_only) = self .get_file_path_locked(|full_content_path| async move { - let file = fs::open_file(&full_content_path) + let file = fs::open_file(full_content_path.clone(), length) .await - .err_tip(|| format!("Failed to open file in filesystem store {}", full_content_path))?; + .err_tip(|| format!("Failed to open file in filesystem store {:?}", full_content_path))?; Ok((file, full_content_path)) }) .await?; - file.seek(SeekFrom::Start(offset)) + file.as_reader() .await - .err_tip(|| format!("Failed to seek file: {}", full_content_path_for_debug_only))?; - Ok(file.take(length)) + .err_tip(|| "Could not seek file in read_file_part()")?
+ .get_mut() + .seek(SeekFrom::Start(offset)) + .await + .err_tip(|| format!("Failed to seek file: {:?}", full_content_path_for_debug_only))?; + Ok(file) } - async fn get_file_path_locked<T, Fut: Future<Output = Result<T, Error>> + Send, F: FnOnce(String) -> Fut + Send>( + async fn get_file_path_locked< + T, + Fut: Future<Output = Result<T, Error>> + Send, + F: FnOnce(OsString) -> Fut + Send, + >( &self, handler: F, ) -> Result<T, Error> { @@ -264,7 +269,7 @@ impl LenEntry for FileEntryImpl { .get_file_path_locked(move |full_content_path| async move { spawn_blocking(move || { set_file_atime(&full_content_path, FileTime::now()) - .err_tip(|| format!("Failed to touch file in filesystem store {}", full_content_path)) + .err_tip(|| format!("Failed to touch file in filesystem store {:?}", full_content_path)) }) .await .map_err(|e| { @@ -301,13 +306,18 @@ let to_path = to_full_path_from_digest(&encoded_file_path.shared_context.temp_path, &new_digest); log::info!( - "\x1b[0;31mFilesystem Store\x1b[0m: Unref {}, moving file {} to {}", + "\x1b[0;31mFilesystem Store\x1b[0m: Unref {}, moving file {:?} to {:?}", encoded_file_path.digest.hash_str(), - &from_path, - &to_path + from_path, + to_path ); if let Err(err) = fs::rename(&from_path, &to_path).await { - log::warn!("Failed to rename file from {} to {} : {:?}", from_path, to_path, err); + log::warn!( + "Failed to rename file from {:?} to {:?} : {:?}", + from_path, + to_path, + err + ); } else { encoded_file_path.path_type = PathType::Temp; encoded_file_path.digest = new_digest; @@ -476,36 +486,49 @@ impl FilesystemStore { } async fn update_file<'a>( - self: Pin<&Self>, + self: Pin<&'a Self>, mut entry: Fe, - mut temp_file: fs::FileSlot<'a>, + mut resumeable_temp_file: fs::ResumeableFileSlot<'a>, final_digest: DigestInfo, mut reader: DropCloserReadHalf, ) -> Result<(), Error> { let mut file_size = 0; loop { - let mut data = reader - .recv() - .await - .err_tip(|| "Failed to receive data in filesystem store")?; + let Ok(data_result) = timeout(fs::idle_file_descriptor_timeout(), reader.recv()).await else { + // In the event we timeout, we want to close the writing file, to prevent + // the file descriptor from being left open for long periods of time. + // This is needed because we wrap `fs` so only a fixed number of file + // descriptors may be open at any given time. If we are streaming from + // File -> File, it can cause a deadlock if the writing side is not sending + // data because it is waiting for a file descriptor to open before sending data. + resumeable_temp_file.close_file().await.err_tip(|| "Could not close file due to timeout in FileSystemStore::update_file")?; + continue; + }; + let mut data = data_result.err_tip(|| "Failed to receive data in filesystem store")?; let data_len = data.len(); if data_len == 0 { break; // EOF. } - temp_file + resumeable_temp_file + .as_writer() + .await + .err_tip(|| "in filesystem_store::update_file")? .write_all_buf(&mut data) .await .err_tip(|| "Failed to write data into filesystem store")?; file_size += data_len as u64; } - temp_file + resumeable_temp_file + .as_writer() + .await + .err_tip(|| "in filesystem_store::update_file")?
.as_ref() .sync_all() .await .err_tip(|| "Failed to sync_data in filesystem store")?; - drop(temp_file); + drop(resumeable_temp_file); *entry.get_file_size() = file_size; let entry = Arc::new(entry); @@ -539,7 +562,7 @@ impl FilesystemStore { let result = fs::rename(encoded_file_path.get_file_path(), &final_path) .await - .err_tip(|| format!("Failed to rename temp file to final path {}", final_path)); + .err_tip(|| format!("Failed to rename temp file to final path {:?}", final_path)); // In the event our move from temp file to final file fails we need to ensure we remove // the entry from our map. @@ -593,7 +616,7 @@ impl StoreTrait for FilesystemStore { self.update_file(entry, temp_file, digest, reader) .await - .err_tip(|| format!("While processing with temp file {}", temp_full_path)) + .err_tip(|| format!("While processing with temp file {:?}", temp_full_path)) } async fn get_part( self: Pin<&Self>, digest: DigestInfo, writer: &mut DropCloserWriteHalf, offset: usize, length: Option<usize>, ) -> Result<(), Error> { @@ -608,22 +631,42 @@ let entry = self .evicting_map .get(&digest) .await .ok_or_else(|| make_err!(Code::NotFound, "not found in filesystem store"))?; - let mut file = entry - .read_file_part(offset as u64, length.unwrap_or(usize::MAX) as u64) - .await?; + let read_limit = length.unwrap_or(usize::MAX) as u64; + let mut resumeable_temp_file = entry.read_file_part(offset as u64, read_limit).await?; let mut buf = BytesMut::with_capacity(length.unwrap_or(self.read_buffer_size)); loop { - file.read_buf(&mut buf) + resumeable_temp_file + .as_reader() + .await + .err_tip(|| "In FileSystemStore::get_part()")? + .read_buf(&mut buf) .await .err_tip(|| "Failed to read data in filesystem store")?; if buf.is_empty() { break; // EOF. } - writer - .send(buf.split().freeze()) - .await - .err_tip(|| "Failed to send chunk in filesystem store get_part")?; + // In the event it takes a while to send the data to the client, we want to close the + // reading file, to prevent the file descriptor from being left open for long periods of time. + // Failing to do so might cause deadlocks if the receiver is unable to receive data + // because it is waiting for a file descriptor to open before receiving data. + // Using `ResumeableFileSlot` will re-open the file on the next iteration in the + // event it gets closed. + loop { + match timeout(fs::idle_file_descriptor_timeout(), writer.send(buf.split().freeze())).await { + Ok(Ok(())) => break, + Ok(Err(err)) => { + return Err(err).err_tip(|| "Failed to send chunk in filesystem store get_part"); + } + Err(_) => { + resumeable_temp_file + .close_file() + .await + .err_tip(|| "Could not close file due to timeout in FileSystemStore::get_part")?; + continue; + } + } + } } writer .send_eof() diff --git a/cas/store/tests/ac_utils_test.rs b/cas/store/tests/ac_utils_test.rs new file mode 100644 index 000000000..f56cfcfef --- /dev/null +++ b/cas/store/tests/ac_utils_test.rs @@ -0,0 +1,81 @@ +// Copyright 2023 The Turbo Cache Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +use rand::{thread_rng, Rng}; +use std::env; +use std::ffi::OsString; +use std::pin::Pin; +use std::sync::Arc; + +use tokio::io::AsyncWriteExt; + +use ac_utils::upload_file_to_store; +use common::{fs, DigestInfo}; +use error::{Error, ResultExt}; +use memory_store::MemoryStore; +use store::Store; + +/// Get temporary path from either `TEST_TMPDIR` or best effort temp directory if +/// not set. +async fn make_temp_path(data: &str) -> OsString { + let dir = format!( + "{}/{}", + env::var("TEST_TMPDIR").unwrap_or(env::temp_dir().to_str().unwrap().to_string()), + thread_rng().gen::<u64>(), + ); + fs::create_dir_all(&dir).await.unwrap(); + OsString::from(format!("{}/{}", dir, data)) +} + +#[cfg(test)] +mod ac_utils_tests { + use super::*; + use pretty_assertions::assert_eq; // Must be declared in every module. + + const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef"; + const HASH1_SIZE: i64 = 147; + + // Regression test for a bug created when implementing ResumeableFileSlot, + // where the timeout() success condition was breaking out of the outer + // loop, resulting in the file always being created with <= 4096 bytes. + #[tokio::test] + async fn upload_file_to_store_with_large_file() -> Result<(), Error> { + let filepath = make_temp_path("test.txt").await; + let expected_data = vec![0x88; 1024 * 1024]; // 1MB. + let store = Arc::new(MemoryStore::new(&config::stores::MemoryStore::default())); + let store_pin = Pin::new(store.as_ref()); + let digest = DigestInfo::try_new(HASH1, HASH1_SIZE)?; // Dummy hash data. + { + // Write 1MB of 0x88s to the file. + let mut file = tokio::fs::File::create(&filepath) + .await + .err_tip(|| "Could not open file")?; + file.write_all(&expected_data) + .await + .err_tip(|| "Could not write to file")?; + } + { + // Upload our file. + let resumeable_file = fs::open_file(filepath, u64::MAX).await?; + upload_file_to_store(store_pin, digest, resumeable_file).await?; + } + { + // Check to make sure the file was saved correctly to the store.
+ let store_data = store_pin.get_part_unchunked(digest, 0, None, None).await?; + assert_eq!(store_data.len(), expected_data.len()); + assert_eq!(store_data, expected_data); + } + Ok(()) + } +} diff --git a/cas/store/tests/filesystem_store_test.rs b/cas/store/tests/filesystem_store_test.rs index 4e03bfb8d..cbb941bf0 100644 --- a/cas/store/tests/filesystem_store_test.rs +++ b/cas/store/tests/filesystem_store_test.rs @@ -14,6 +14,7 @@ use std::cell::RefCell; use std::env; +use std::ffi::{OsStr, OsString}; use std::fmt::{Debug, Formatter}; use std::marker::PhantomData; use std::ops::DerefMut; @@ -33,7 +34,7 @@ use futures::task::Poll; use futures::{poll, Future}; use lazy_static::lazy_static; use rand::{thread_rng, Rng}; -use tokio::io::{AsyncReadExt, AsyncWriteExt, Take}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::Barrier; use tokio_stream::{wrappers::ReadDirStream, StreamExt}; use traits::UploadSizeInfo; @@ -71,7 +72,7 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> FileEntry for TestFileEntry<Hooks> { async fn make_and_open_file( encoded_file_path: EncodedFilePath, - ) -> Result<(Self, fs::FileSlot<'static>, String), Error> { + ) -> Result<(Self, fs::ResumeableFileSlot<'static>, OsString), Error> { let (inner, file_slot, path) = FileEntryImpl::make_and_open_file(encoded_file_path).await?; Ok(( Self { @@ -91,11 +92,15 @@ self.inner.as_ref().unwrap().get_encoded_file_path() } - async fn read_file_part(&self, offset: u64, length: u64) -> Result<Take<fs::FileSlot<'_>>, Error> { + async fn read_file_part<'a>(&'a self, offset: u64, length: u64) -> Result<fs::ResumeableFileSlot<'a>, Error> { self.inner.as_ref().unwrap().read_file_part(offset, length).await } - async fn get_file_path_locked<T, Fut: Future<Output = Result<T, Error>> + Send, F: FnOnce(String) -> Fut + Send>( + async fn get_file_path_locked< + T, + Fut: Future<Output = Result<T, Error>> + Send, + F: FnOnce(OsString) -> Fut + Send, + >( &self, handler: F, ) -> Result<T, Error> { @@ -159,22 +164,24 @@ fn make_temp_path(data: &str) -> String { ) } -async fn read_file_contents(file_name: &str) -> Result<Vec<u8>, Error> { - let mut file = fs::open_file(&file_name) +async fn read_file_contents(file_name: &OsStr) -> Result<Vec<u8>, Error> { + let mut file = fs::open_file(file_name, u64::MAX) .await - .err_tip(|| format!("Failed to open file: {}", file_name))?; + .err_tip(|| format!("Failed to open file: {file_name:?}"))?; let mut data = vec![]; - file.read_to_end(&mut data) + file.as_reader() + .await? + .read_to_end(&mut data) .await .err_tip(|| "Error reading file to end")?; Ok(data) } -async fn write_file(file_name: &str, data: &[u8]) -> Result<(), Error> { - let mut file = fs::create_file(&file_name) +async fn write_file(file_name: &OsStr, data: &[u8]) -> Result<(), Error> { + let mut file = fs::create_file(file_name) .await - .err_tip(|| format!("Failed to create file: {}", file_name))?; - Ok(file.write_all(data).await?) + .err_tip(|| format!("Failed to create file: {file_name:?}"))?; + Ok(file.as_writer().await?.write_all(data).await?) } #[cfg(test)] @@ -261,7 +268,12 @@ mod filesystem_store_tests { // Insert data into store. store.as_ref().update_oneshot(digest1, VALUE1.into()).await?; - let expected_file_name = format!("{}/{}-{}", content_path, digest1.hash_str(), digest1.size_bytes); + let expected_file_name = OsString::from(format!( + "{}/{}-{}", + content_path, + digest1.hash_str(), + digest1.size_bytes + )); { // Check to ensure our file exists where it should and content matches.
let data = read_file_contents(&expected_file_name).await?; @@ -362,7 +374,7 @@ mod filesystem_store_tests { while let Some(temp_dir_entry) = read_dir_stream.next().await { num_files += 1; let path = temp_dir_entry?.path(); - let data = read_file_contents(path.to_str().unwrap()).await?; + let data = read_file_contents(path.as_os_str()).await?; assert_eq!(&data[..], VALUE1.as_bytes(), "Expected file content to match"); } assert_eq!(num_files, 1, "There should only be one file in the temp directory"); @@ -463,7 +475,7 @@ mod filesystem_store_tests { while let Some(temp_dir_entry) = read_dir_stream.next().await { num_files += 1; let path = temp_dir_entry?.path(); - let data = read_file_contents(path.to_str().unwrap()).await?; + let data = read_file_contents(path.as_os_str()).await?; assert_eq!(&data[..], VALUE1.as_bytes(), "Expected file content to match"); } assert_eq!(num_files, 1, "There should only be one file in the temp directory"); @@ -551,8 +563,18 @@ mod filesystem_store_tests { fs::create_dir_all(&content_path).await?; // Make the two files on disk before loading the store. - let file1 = format!("{}/{}-{}", content_path, digest1.hash_str(), digest1.size_bytes); - let file2 = format!("{}/{}-{}", content_path, digest2.hash_str(), digest2.size_bytes); + let file1 = OsString::from(format!( + "{}/{}-{}", + content_path, + digest1.hash_str(), + digest1.size_bytes + )); + let file2 = OsString::from(format!( + "{}/{}-{}", + content_path, + digest2.hash_str(), + digest2.size_bytes + )); write_file(&file1, VALUE1.as_bytes()).await?; write_file(&file2, VALUE2.as_bytes()).await?; set_file_atime(&file1, FileTime::from_unix_time(0, 0))?; @@ -651,7 +673,7 @@ mod filesystem_store_tests { // The file contents should equal our initial data. let mut reader = file_entry.read_file_part(0, u64::MAX).await?; let mut file_contents = String::new(); - reader.read_to_string(&mut file_contents).await?; + reader.as_reader().await?.read_to_string(&mut file_contents).await?; assert_eq!(file_contents, VALUE1); } @@ -662,7 +684,7 @@ mod filesystem_store_tests { // The file contents still equal our old data. let mut reader = file_entry.read_file_part(0, u64::MAX).await?; let mut file_contents = String::new(); - reader.read_to_string(&mut file_contents).await?; + reader.as_reader().await?.read_to_string(&mut file_contents).await?; assert_eq!(file_contents, VALUE1); } @@ -776,11 +798,11 @@ mod filesystem_store_tests { let dir_entry = dir_entry?; { // Some filesystems won't sync automatically, so force it. - let file_handle = fs::open_file(dir_entry.path().to_str().unwrap()) + let mut file_handle = fs::open_file(dir_entry.path().into_os_string(), u64::MAX) .await .err_tip(|| "Failed to open temp file")?; // We don't care if it fails, this is only best attempt. - let _ = file_handle.as_ref().sync_all().await; + let _ = file_handle.as_reader().await?.get_ref().as_ref().sync_all().await; } // Ensure we have written to the file too. This ensures we have an open file handle. 
// Failing to do this may result in the file existing, but the `update_fut` not actually diff --git a/cas/worker/running_actions_manager.rs b/cas/worker/running_actions_manager.rs index 54142bd0f..5ef4e74fd 100644 --- a/cas/worker/running_actions_manager.rs +++ b/cas/worker/running_actions_manager.rs @@ -14,6 +14,7 @@ use std::collections::{vec_deque::VecDeque, HashMap}; use std::ffi::OsStr; +use std::ffi::OsString; use std::fmt::Debug; #[cfg(target_family = "unix")] use std::fs::Permissions; @@ -38,11 +39,13 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio::process; use tokio::sync::oneshot; use tokio::task::spawn_blocking; +use tokio::time::timeout; use tokio_stream::wrappers::ReadDirStream; use tonic::Request; use ac_utils::{ - compute_digest, get_and_decode_digest, serialize_and_upload_message, upload_to_store, ESTIMATED_DIGEST_SIZE, + compute_digest, get_and_decode_digest, serialize_and_upload_message, upload_file_to_store, upload_to_store, + ESTIMATED_DIGEST_SIZE, }; use action_messages::{ActionInfo, ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, SymlinkInfo}; use async_trait::async_trait; @@ -178,7 +181,7 @@ pub fn download_to_directory<'a>( } #[cfg(target_family = "windows")] -async fn is_executable(_file_handle: &fs::FileSlot<'_>, _full_path: &impl AsRef<Path>) -> Result<bool, Error> { +async fn is_executable(_file_handle: &fs::FileSlot, _full_path: &impl AsRef<Path>) -> Result<bool, Error> { static EXECUTABLE_EXTENSIONS: &[&str] = &["exe", "bat", "com"]; Ok(EXECUTABLE_EXTENSIONS .iter() @@ -186,7 +189,7 @@ } #[cfg(target_family = "unix")] -async fn is_executable(file_handle: &fs::FileSlot<'_>, full_path: &impl AsRef<Path>) -> Result<bool, Error> { +async fn is_executable(file_handle: &fs::FileSlot, full_path: &impl AsRef<Path>) -> Result<bool, Error> { let metadata = file_handle .as_ref() .metadata() @@ -195,16 +198,40 @@ } -async fn upload_file<'a>( - file_handle: fs::FileSlot<'static>, - cas_store: Pin<&'a dyn Store>, +async fn upload_file( + mut resumeable_file: fs::ResumeableFileSlot<'static>, + cas_store: Pin<&dyn Store>, full_path: impl AsRef<Path> + Debug, ) -> Result<FileInfo, Error> { - let (digest, mut file_handle) = compute_digest(file_handle) + let (digest, is_executable, resumeable_file) = { + let (digest, mut resumeable_file) = JoinHandleDropGuard::new(tokio::spawn(async move { + let file_handle = resumeable_file + .as_reader() + .await + .err_tip(|| "Could not get reader from file slot in RunningActionsManager::upload_file()")?; + let digest = compute_digest(file_handle).await?.0; + Ok::<_, Error>((digest, resumeable_file)) + })) .await + .err_tip(|| "Failed to launch spawn")? + .err_tip(|| format!("for {full_path:?}"))?; - file_handle.rewind().await.err_tip(|| "Could not rewind file")?; - upload_to_store(cas_store, digest, &mut file_handle) + + // Sadly we have to reacquire a `reader` from `resumeable_file` because `tokio::spawn` requires an owned + // version of the struct. Luckily acquiring a reader is cheap, but it does make this code a bit messier.
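+ // In sketch form (ignoring `JoinHandleDropGuard`), the ownership round-trip above is:
+ //     let (digest, file) = tokio::spawn(async move {
+ //         let digest = compute_digest(file.as_reader().await?).await?.0;
+ //         Ok::<_, Error>((digest, file))
+ //     })
+ //     .await??;
+ // The slot moves into the spawned task and is handed back through the `JoinHandle`.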
+ let file_handle = resumeable_file + .as_reader() + .await + .err_tip(|| "Could not get reader from file slot in RunningActionsManager::upload_file()")?; + let is_executable = is_executable(file_handle.get_ref(), &full_path).await?; + file_handle + .get_mut() + .rewind() + .await + .err_tip(|| "Could not rewind file")?; + (digest, is_executable, resumeable_file) + }; + upload_file_to_store(cas_store, digest, resumeable_file) .await .err_tip(|| format!("for {full_path:?}"))?; @@ -216,8 +243,6 @@ .err_tip(|| make_err!(Code::Internal, "Could not convert {:?} to string", full_path))? .to_string(); - let is_executable = is_executable(&file_handle, &full_path).await?; - Ok(FileInfo { name_or_path: NameOrPath::Name(name), digest, @@ -323,10 +348,10 @@ fn upload_directory<'a, P: AsRef<Path> + Debug + Send + Sync + Clone + 'a>( ); } else if file_type.is_file() { file_futures.push(async move { - let file_handle = fs::open_file(&full_path) + let file_handle = fs::open_file(full_path.as_os_str().to_os_string(), u64::MAX) .await .err_tip(|| format!("Could not open file {full_path:?}"))?; - upload_file(file_handle, cas_store, full_path) + upload_file(file_handle, cas_store, &full_path) .map_ok(|v| v.into()) .await }); @@ -573,7 +598,7 @@ impl RunningActionImpl { #[cfg(target_family = "windows")] let envs = { let mut envs = command_proto.environment_variables.clone(); - if envs.iter().find(|v| v.name == "SystemRoot").is_none() { + if !envs.iter().any(|v| v.name == "SystemRoot") { envs.push( proto::build::bazel::remote::execution::v2::command::EnvironmentVariable { name: "SystemRoot".to_string(), @@ -745,29 +770,42 @@ output_paths.append(&mut command_proto.output_directories); } for entry in output_paths { - let full_path = format!("{}/{}", self.work_directory, entry); + let full_path = OsString::from(format!("{}/{}", self.work_directory, entry)); let work_directory = &self.work_directory; output_path_futures.push(async move { let metadata = { - let file_handle = match fs::open_file(&full_path).await { - Ok(handle) => handle, + let mut resumeable_file = match fs::open_file(full_path.clone(), u64::MAX).await { + Ok(file) => file, Err(e) => { if e.code == Code::NotFound { // In the event our output does not exist, according to the bazel remote // execution spec, we simply ignore it and continue. return Result::<OutputType, Error>::Ok(OutputType::None); } - return Err(e).err_tip(|| format!("Could not open file {full_path}")); + return Err(e).err_tip(|| format!("Could not open file {full_path:?}")); } }; // We cannot rely on the file_handle's metadata, because it follows symlinks, so // we need to instead use `symlink_metadata`. - let metadata = fs::symlink_metadata(&full_path) - .await - .err_tip(|| format!("While querying symlink metadata for {entry}"))?; + let metadata_fut = fs::symlink_metadata(&full_path); + tokio::pin!(metadata_fut); + + // Just in case we are starved for open file descriptors, we time out the metadata + // call and close the file, then try again.
+ let metadata = match timeout(fs::idle_file_descriptor_timeout(), &mut metadata_fut).await { + Ok(result) => result, + Err(_) => { + resumeable_file + .close_file() + .await + .err_tip(|| "In inner_upload_results()")?; + (&mut metadata_fut).await + } + } + .err_tip(|| format!("While querying symlink metadata for {entry}"))?; if metadata.is_file() { return Ok(OutputType::File( - upload_file(file_handle, cas_store, full_path) + upload_file(resumeable_file, cas_store, &full_path) .await .map(|mut file_info| { file_info.name_or_path = NameOrPath::Path(entry); @@ -813,7 +851,7 @@ impl RunningActionImpl { Err(e) => { if e.code != Code::NotFound { return Err(e) - .err_tip(|| format!("While querying target symlink metadata for {full_path}")); + .err_tip(|| format!("While querying target symlink metadata for {full_path:?}")); } // If the file doesn't exist, we consider it a file. Even though the // file doesn't exist we still need to populate an entry. @@ -823,8 +861,7 @@ impl RunningActionImpl { } else { Err(make_err!( Code::Internal, - "{} was not a file, folder or symlink. Must be one.", - full_path + "{full_path:?} was not a file, folder or symlink. Must be one.", )) } }); diff --git a/cas/worker/tests/local_worker_test.rs b/cas/worker/tests/local_worker_test.rs index b4f80450e..d74991449 100644 --- a/cas/worker/tests/local_worker_test.rs +++ b/cas/worker/tests/local_worker_test.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::env; +use std::ffi::OsString; #[cfg(target_family = "unix")] use std::fs::Permissions; #[cfg(target_family = "unix")] @@ -357,8 +358,8 @@ mod local_worker_tests { let ac_store = Arc::new(MemoryStore::new(&config::stores::MemoryStore::default())); let work_directory = make_temp_path("foo"); fs::create_dir_all(format!("{}/{}", work_directory, "another_dir")).await?; - let mut file = fs::create_file(format!("{}/{}", work_directory, "foo.txt")).await?; - file.write_all(b"Hello, world!").await?; + let mut file = fs::create_file(OsString::from(format!("{}/{}", work_directory, "foo.txt"))).await?; + file.as_writer().await?.write_all(b"Hello, world!").await?; new_local_worker( Arc::new(LocalWorkerConfig { work_directory: work_directory.clone(), @@ -386,16 +387,16 @@ mod local_worker_tests { #[cfg(target_family = "unix")] let precondition_script = { let precondition_script = format!("{}/precondition.sh", temp_path); - let mut file = fs::create_file(precondition_script.clone()).await?; - file.write_all(b"#!/bin/sh\nexit 1\n").await?; + let mut file = fs::create_file(OsString::from(&precondition_script)).await?; + file.as_writer().await?.write_all(b"#!/bin/sh\nexit 1\n").await?; fs::set_permissions(&precondition_script, Permissions::from_mode(0o777)).await?; precondition_script }; #[cfg(target_family = "windows")] let precondition_script = { let precondition_script = format!("{}/precondition.bat", temp_path); - let mut file = fs::create_file(precondition_script.clone()).await?; - file.write_all(b"@echo off\r\nexit 1").await?; + let mut file = fs::create_file(OsString::from(&precondition_script)).await?; + file.as_writer().await?.write_all(b"@echo off\r\nexit 1").await?; precondition_script }; let local_worker_config = LocalWorkerConfig { diff --git a/cas/worker/tests/running_actions_manager_test.rs b/cas/worker/tests/running_actions_manager_test.rs index 743e16d7f..973599a43 100644 --- a/cas/worker/tests/running_actions_manager_test.rs +++ b/cas/worker/tests/running_actions_manager_test.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::env; +use 
std::ffi::OsString; #[cfg(target_family = "unix")] use std::fs::Permissions; use std::io::Cursor; @@ -1003,11 +1004,13 @@ exit 0 let test_wrapper_dir = make_temp_path("wrapper_dir"); fs::create_dir_all(&test_wrapper_dir).await?; #[cfg(target_family = "unix")] - let test_wrapper_script = test_wrapper_dir + "/test_wrapper_script.sh"; + let test_wrapper_script = OsString::from(test_wrapper_dir + "/test_wrapper_script.sh"); #[cfg(target_family = "windows")] - let test_wrapper_script = test_wrapper_dir + "\\test_wrapper_script.bat"; + let test_wrapper_script = OsString::from(test_wrapper_dir + "\\test_wrapper_script.bat"); let mut test_wrapper_script_handle = fs::create_file(&test_wrapper_script).await?; test_wrapper_script_handle + .as_writer() + .await? .write_all(TEST_WRAPPER_SCRIPT_CONTENT.as_bytes()) .await?; #[cfg(target_family = "unix")] diff --git a/config/cas_server.rs b/config/cas_server.rs index 633f4bb8f..50d3f9281 100644 --- a/config/cas_server.rs +++ b/config/cas_server.rs @@ -341,6 +341,21 @@ pub struct GlobalConfig { #[serde(deserialize_with = "convert_numeric_with_shellexpand")] pub max_open_files: usize, + /// If a file descriptor is idle for this many milliseconds, it will be closed. + /// In the event a client or store takes a long time to send or receive data + /// the file descriptor will be closed, and since `max_open_files` blocks new + /// open_file requests until a slot opens up, it will allow new requests to be + /// processed. If a read or write is attempted on a closed file descriptor, the + /// file will be reopened and the operation will continue. + /// + /// On services where worker(s) and scheduler(s) live in the same process, this + /// also prevents deadlocks if a file->file copy is happening, but cannot open + /// a new file descriptor because the limit has been reached. + /// + /// Default: 1000 (1 second) + #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")] + pub idle_file_descriptor_timeout_millis: u64, + /// This flag can be used to prevent metrics from being collected at runtime. /// Metrics are still able to be collected, but this flag prevents metrics that /// are collected at runtime (performance metrics) from being tallied. The diff --git a/gencargo/ac_utils_test/Cargo.toml b/gencargo/ac_utils_test/Cargo.toml new file mode 100644 index 000000000..671f7b255 --- /dev/null +++ b/gencargo/ac_utils_test/Cargo.toml @@ -0,0 +1,32 @@ +# This file is automatically generated from `tools/build_cargo_manifest.py`. +# If you want to add a dependency add it to `tools/cargo_shared.bzl` +# then run `python tools/build_cargo_manifest.py`. +# Do not edit this file directly. + +[package] +name = "ac_utils_test" +version = "0.0.0" +edition = "2021" +autobins = false +autoexamples = false +autotests = false +autobenches = false + +[[test]] +name = "ac_utils_test" +path = "../../cas/store/tests/ac_utils_test.rs" +# TODO(allada) We should support doctests. +doctest = false + +[dependencies] +pretty_assertions = { workspace = true } +rand = { workspace = true } +tokio = { workspace = true } + +# Local libraries. 
+ac_utils = { workspace = true } +memory_store = { workspace = true } +store = { workspace = true } +config = { workspace = true } +common = { workspace = true } +error = { workspace = true } diff --git a/gencargo/fs_test/Cargo.toml b/gencargo/fs_test/Cargo.toml new file mode 100644 index 000000000..19d28be34 --- /dev/null +++ b/gencargo/fs_test/Cargo.toml @@ -0,0 +1,28 @@ +# This file is automatically generated from `tools/build_cargo_manifest.py`. +# If you want to add a dependency add it to `tools/cargo_shared.bzl` +# then run `python tools/build_cargo_manifest.py`. +# Do not edit this file directly. + +[package] +name = "fs_test" +version = "0.0.0" +edition = "2021" +autobins = false +autoexamples = false +autotests = false +autobenches = false + +[[test]] +name = "fs_test" +path = "../../util/tests/fs_test.rs" +# TODO(allada) We should support doctests. +doctest = false + +[dependencies] +pretty_assertions = { workspace = true } +rand = { workspace = true } +tokio = { workspace = true } + +# Local libraries. +common = { workspace = true } +error = { workspace = true } diff --git a/util/BUILD b/util/BUILD index d73c84d5c..2fb1817ed 100644 --- a/util/BUILD +++ b/util/BUILD @@ -233,3 +233,15 @@ rust_test( "@crate_index//:tokio", ], ) + +rust_test( + name = "fs_test", + srcs = ["tests/fs_test.rs"], + deps = [ + ":common", + ":error", + "@crate_index//:rand", + "@crate_index//:pretty_assertions", + "@crate_index//:tokio", + ], +) diff --git a/util/buf_channel.rs b/util/buf_channel.rs index bdbd37134..d91f22b49 100644 --- a/util/buf_channel.rs +++ b/util/buf_channel.rs @@ -136,7 +136,7 @@ impl Drop for DropCloserWriteHalf { /// This will notify the reader of an error if we did not send an EOF. fn drop(&mut self) { if tokio::runtime::Handle::try_current().is_err() { - println!("No tokio runtime active. Tx was dropped but can't send error."); + eprintln!("No tokio runtime active. Tx was dropped but can't send error."); return; // Cant send error, no runtime. } if let Some(tx) = self.tx.take() { diff --git a/util/fs.rs b/util/fs.rs index bd63ffb7e..68e797437 100644 --- a/util/fs.rs +++ b/util/fs.rs @@ -12,15 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Cow; +use std::ffi::OsStr; use std::fs::Metadata; use std::io::IoSlice; use std::path::{Path, PathBuf}; use std::pin::Pin; -use std::task::Context; -use std::task::Poll; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::OnceLock; +use std::task::{Context, Poll}; +use std::time::Duration; use error::{make_err, Code, Error, ResultExt}; -use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, ReadBuf, SeekFrom, Take}; use tokio::sync::{Semaphore, SemaphorePermit}; /// We wrap all tokio::fs items in our own wrapper so we can limit the number of outstanding /// open files at any given time. This will greatly reduce the chance we'll hit open file limit /// issues. pub use tokio::fs::DirEntry; +type StreamPosition = u64; +type BytesRemaining = u64; + +#[derive(Debug)] +enum MaybeFileSlot { + Open(Take<FileSlot>), + Closed((StreamPosition, BytesRemaining)), +} + +/// A wrapper around a generic FileSlot. This gives us the ability to +/// close a file and then resume it later. This is specifically useful when +/// piping data from one location to another and one side is slow at reading +/// or writing the data: we can set a timeout, close the file, and then +/// reopen it later.
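+///
+/// An illustrative sketch of the intended flow (hedged: `path` and `buf` are
+/// placeholder values, and the enclosing function is assumed to return
+/// `Result<_, Error>`):
+/// ```ignore
+/// let mut slot = fs::open_file(path, u64::MAX).await?;
+/// slot.as_reader().await?.read_buf(&mut buf).await?;
+/// slot.close_file().await?; // Frees the file descriptor early...
+/// slot.as_reader().await?; // ...and this reopens the file and seeks back.
+/// ```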
+/// +/// Note: This wraps both files opened for read and write, so we always +/// need to know how the original file was opened and the location of +/// the file. To simplify the code significantly we always require the +/// file to be a Take. +#[derive(Debug)] +pub struct ResumeableFileSlot<'a> { + maybe_file_slot: MaybeFileSlot, + path: Cow<'a, OsStr>, + is_write: bool, +} + +impl<'a> ResumeableFileSlot<'a> { + pub fn new(file: FileSlot, path: impl Into<Cow<'a, OsStr>>, is_write: bool) -> Self { + Self { + maybe_file_slot: MaybeFileSlot::Open(file.take(u64::MAX)), + path: path.into(), + is_write, + } + } + + pub fn new_with_take(file: Take<FileSlot>, path: impl Into<Cow<'a, OsStr>>, is_write: bool) -> Self { + Self { + maybe_file_slot: MaybeFileSlot::Open(file), + path: path.into(), + is_write, + } + } + + pub async fn close_file(&mut self) -> Result<(), Error> { + let MaybeFileSlot::Open(file_slot) = &mut self.maybe_file_slot else { + return Ok(()); + }; + let position = file_slot + .get_mut() + .inner + .stream_position() + .await + .err_tip(|| format!("Failed to get file position {:?}", self.path))?; + self.maybe_file_slot = MaybeFileSlot::Closed((position, file_slot.limit())); + Ok(()) + } + + #[inline] + pub async fn as_reader(&mut self) -> Result<&mut Take<FileSlot>, Error> { + let (stream_position, bytes_remaining) = match self.maybe_file_slot { + MaybeFileSlot::Open(ref mut file_slot) => return Ok(file_slot), + MaybeFileSlot::Closed(pos) => pos, + }; + let permit = OPEN_FILE_SEMAPHORE + .acquire() + .await + .map_err(|e| make_err!(Code::Internal, "Open file semaphore closed {:?}", e))?; + let inner = tokio::fs::OpenOptions::new() + .write(self.is_write) + .read(!self.is_write) + .open(&self.path) + .await + .err_tip(|| format!("Could not open after resume {:?}", self.path))?; + let mut file_slot = FileSlot { _permit: permit, inner }; + file_slot + .inner + .seek(SeekFrom::Start(stream_position)) + .await + .err_tip(|| format!("Failed to seek to position {:?} {:?}", stream_position, self.path))?; + + self.maybe_file_slot = MaybeFileSlot::Open(file_slot.take(bytes_remaining)); + match &mut self.maybe_file_slot { + MaybeFileSlot::Open(file_slot) => Ok(file_slot), + MaybeFileSlot::Closed(_) => unreachable!(), + } + } + + #[inline] + pub async fn as_writer(&mut self) -> Result<&mut FileSlot, Error> { + Ok(self.as_reader().await?.get_mut()) + } +} + #[derive(Debug)] -pub struct FileSlot<'a> { +pub struct FileSlot { // We hold the permit because once it is dropped it goes back into the queue.
- _permit: SemaphorePermit<'a>, + _permit: SemaphorePermit<'static>, inner: tokio::fs::File, } -impl<'a> AsRef<tokio::fs::File> for FileSlot<'a> { +impl AsRef<tokio::fs::File> for FileSlot { fn as_ref(&self) -> &tokio::fs::File { &self.inner } } -impl<'a> AsMut<tokio::fs::File> for FileSlot<'a> { +impl AsMut<tokio::fs::File> for FileSlot { fn as_mut(&mut self) -> &mut tokio::fs::File { &mut self.inner } } -impl<'a> AsyncRead for FileSlot<'a> { +impl AsyncRead for FileSlot { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -57,7 +154,7 @@ } } -impl<'a> AsyncSeek for FileSlot<'a> { +impl AsyncSeek for FileSlot { fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> Result<(), tokio::io::Error> { Pin::new(&mut self.inner).start_seek(position) } @@ -67,7 +164,7 @@ } } -impl<'a> AsyncWrite for FileSlot<'a> { +impl AsyncWrite for FileSlot { fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, tokio::io::Error>> { Pin::new(&mut self.inner).poll_write(cx, buf) } @@ -94,44 +191,73 @@ } const DEFAULT_OPEN_FILE_PERMITS: usize = 10; -static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FILE_PERMITS); +static TOTAL_FILE_SEMAPHORES: AtomicUsize = AtomicUsize::new(DEFAULT_OPEN_FILE_PERMITS); +pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FILE_PERMITS); pub fn set_open_file_limit(limit: usize) { - if limit < DEFAULT_OPEN_FILE_PERMITS { - log::error!( - "set_open_file_limit({}) must be greater than {}", - limit, - DEFAULT_OPEN_FILE_PERMITS - ); + let current_total = TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire); + if limit < current_total { + log::error!("set_open_file_limit({}) must be greater than {}", limit, current_total); return; } - OPEN_FILE_SEMAPHORE.add_permits(limit - DEFAULT_OPEN_FILE_PERMITS); + TOTAL_FILE_SEMAPHORES.fetch_add(limit - current_total, Ordering::Release); + OPEN_FILE_SEMAPHORE.add_permits(limit - current_total); +} + +pub fn get_open_files_for_test() -> usize { + TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire) - OPEN_FILE_SEMAPHORE.available_permits() } -pub async fn open_file(path: impl AsRef<Path> + std::fmt::Debug) -> Result<FileSlot<'static>, Error> { +/// How long a file descriptor can be open without being used before it is closed. +static IDLE_FILE_DESCRIPTOR_TIMEOUT: OnceLock<Duration> = OnceLock::new(); + +pub fn idle_file_descriptor_timeout() -> Duration { + *IDLE_FILE_DESCRIPTOR_TIMEOUT.get_or_init(|| Duration::MAX) +} + +/// Set the idle file descriptor timeout. This is the amount of time +/// a file descriptor can be open without being used before it is closed.
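+///
+/// For example, `cas_main.rs` above applies the configured value once at
+/// startup (sketch; 1000ms mirrors `DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS`,
+/// and a second `set` call would return an `Err` since the `OnceLock` may only
+/// be written once):
+/// ```ignore
+/// fs::set_idle_file_descriptor_timeout(Duration::from_millis(1000))?;
+/// assert_eq!(fs::idle_file_descriptor_timeout(), Duration::from_millis(1000));
+/// ```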
+pub fn set_idle_file_descriptor_timeout(timeout: Duration) -> Result<(), Error> { + IDLE_FILE_DESCRIPTOR_TIMEOUT + .set(timeout) + .map_err(|_| make_err!(Code::Internal, "idle_file_descriptor_timeout already set")) +} + +pub async fn open_file<'a>(path: impl Into<Cow<'a, OsStr>>, limit: u64) -> Result<ResumeableFileSlot<'a>, Error> { let permit = OPEN_FILE_SEMAPHORE .acquire() .await .map_err(|e| make_err!(Code::Internal, "Open file semaphore closed {:?}", e))?; - Ok(FileSlot { - _permit: permit, - inner: tokio::fs::File::open(&path) - .await - .err_tip(|| format!("Could not open {:?}", path))?, - }) + let path = path.into(); + Ok(ResumeableFileSlot::new_with_take( + FileSlot { + _permit: permit, + inner: tokio::fs::File::open(&path) + .await + .err_tip(|| format!("Could not open {:?}", path))?, + } + .take(limit), + path, + false, /* is_write */ + )) } -pub async fn create_file(path: impl AsRef<Path> + std::fmt::Debug) -> Result<FileSlot<'static>, Error> { +pub async fn create_file<'a>(path: impl Into<Cow<'a, OsStr>>) -> Result<ResumeableFileSlot<'a>, Error> { let permit = OPEN_FILE_SEMAPHORE .acquire() .await .map_err(|e| make_err!(Code::Internal, "Open file semaphore closed {:?}", e))?; - Ok(FileSlot { - _permit: permit, - inner: tokio::fs::File::create(&path) - .await - .err_tip(|| format!("Could not open {:?}", path))?, - }) + let path = path.into(); + Ok(ResumeableFileSlot::new( + FileSlot { + _permit: permit, + inner: tokio::fs::File::create(&path) + .await + .err_tip(|| format!("Could not open {:?}", path))?, + }, + path, + true, /* is_write */ + )) } pub async fn hard_link(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> { diff --git a/util/tests/fs_test.rs b/util/tests/fs_test.rs new file mode 100644 index 000000000..0a5a14961 --- /dev/null +++ b/util/tests/fs_test.rs @@ -0,0 +1,169 @@ +// Copyright 2023 The Turbo Cache Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::env; +use std::ffi::OsString; +use std::io::SeekFrom; +use std::str::from_utf8; + +use rand::{thread_rng, Rng}; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; +use tokio::sync::Semaphore; + +use common::fs; +use error::Error; + +/// Get temporary path from either `TEST_TMPDIR` or best effort temp directory if +/// not set. +async fn make_temp_path(data: &str) -> OsString { + let dir = format!( + "{}/{}", + env::var("TEST_TMPDIR").unwrap_or(env::temp_dir().to_str().unwrap().to_string()), + thread_rng().gen::<u64>(), + ); + fs::create_dir_all(&dir).await.unwrap(); + OsString::from(format!("{}/{}", dir, data)) +} + +static TEST_EXCLUSIVE_SEMAPHORE: Semaphore = Semaphore::const_new(1); + +#[cfg(test)] +mod fs_tests { + use super::*; + use pretty_assertions::assert_eq; // Must be declared in every module. + + #[tokio::test] + async fn resumeable_file_slot_write_close_write_test() -> Result<(), Error> { + let _permit = TEST_EXCLUSIVE_SEMAPHORE.acquire().await; // One test at a time.
+ let filename = make_temp_path("test_file.txt").await; + { + let mut file = fs::create_file(&filename).await?; + file.as_writer().await?.write_all(b"Hello").await?; + file.close_file().await?; + assert_eq!(fs::get_open_files_for_test(), 0); + file.as_writer().await?.write_all(b"Goodbye").await?; + assert_eq!(fs::get_open_files_for_test(), 1); + file.as_writer().await?.as_mut().sync_all().await?; + } + assert_eq!(fs::get_open_files_for_test(), 0); + { + let mut file = fs::open_file(&filename, u64::MAX).await?; + let mut contents = String::new(); + file.as_reader().await?.read_to_string(&mut contents).await?; + assert_eq!(contents, "HelloGoodbye"); + } + Ok(()) + } + + #[tokio::test] + async fn resumeable_file_slot_read_close_read_test() -> Result<(), Error> { + let _permit = TEST_EXCLUSIVE_SEMAPHORE.acquire().await; // One test at a time. + const DUMMYDATA: &str = "DummyDataTest"; + let filename = make_temp_path("test_file.txt").await; + { + let mut file = fs::create_file(&filename).await?; + file.as_writer().await?.write_all(DUMMYDATA.as_bytes()).await?; + file.as_writer().await?.as_mut().sync_all().await?; + } + { + let mut file = fs::open_file(&filename, u64::MAX).await?; + let mut contents = [0u8; 5]; + { + assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5); + assert_eq!(from_utf8(&contents[..]).unwrap(), "Dummy"); + } + file.close_file().await?; + { + assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5); + assert_eq!(from_utf8(&contents[..]).unwrap(), "DataT"); + } + file.close_file().await?; + { + assert_eq!(file.as_reader().await?.read(&mut contents).await?, 3); + assert_eq!(from_utf8(&contents[..3]).unwrap(), "est"); + } + } + Ok(()) + } + + #[tokio::test] + async fn resumeable_file_slot_read_close_read_with_take_test() -> Result<(), Error> { + let _permit = TEST_EXCLUSIVE_SEMAPHORE.acquire().await; // One test at a time. + const DUMMYDATA: &str = "DummyDataTest"; + let filename = make_temp_path("test_file.txt").await; + { + let mut file = fs::create_file(&filename).await?; + file.as_writer().await?.write_all(DUMMYDATA.as_bytes()).await?; + file.as_writer().await?.as_mut().sync_all().await?; + } + { + let mut file = fs::open_file(&filename, 11).await?; + let mut contents = [0u8; 5]; + { + assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5); + assert_eq!(from_utf8(&contents[..]).unwrap(), "Dummy"); + } + assert_eq!(fs::get_open_files_for_test(), 1); + file.close_file().await?; + assert_eq!(fs::get_open_files_for_test(), 0); + { + assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5); + assert_eq!(from_utf8(&contents[..]).unwrap(), "DataT"); + } + assert_eq!(fs::get_open_files_for_test(), 1); + file.close_file().await?; + assert_eq!(fs::get_open_files_for_test(), 0); + { + assert_eq!(file.as_reader().await?.read(&mut contents).await?, 1); + assert_eq!(from_utf8(&contents[..1]).unwrap(), "e"); + } + } + Ok(()) + } + + #[tokio::test] + async fn resumeable_file_slot_read_close_read_with_take_and_seek_test() -> Result<(), Error> { + let _permit = TEST_EXCLUSIVE_SEMAPHORE.acquire().await; // One test at a time. 
+ const DUMMYDATA: &str = "DummyDataTest"; + let filename = make_temp_path("test_file.txt").await; + { + let mut file = fs::create_file(&filename).await?; + file.as_writer().await?.write_all(DUMMYDATA.as_bytes()).await?; + file.as_writer().await?.as_mut().sync_all().await?; + } + { + let mut file = fs::open_file(&filename, 11).await?; + file.as_reader().await?.get_mut().seek(SeekFrom::Start(2)).await?; + let mut contents = [0u8; 5]; + { + assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5); + assert_eq!(from_utf8(&contents[..]).unwrap(), "mmyDa"); + } + assert_eq!(fs::get_open_files_for_test(), 1); + file.close_file().await?; + assert_eq!(fs::get_open_files_for_test(), 0); + { + assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5); + assert_eq!(from_utf8(&contents[..]).unwrap(), "taTes"); + } + file.close_file().await?; + assert_eq!(fs::get_open_files_for_test(), 0); + { + assert_eq!(file.as_reader().await?.read(&mut contents).await?, 1); + assert_eq!(from_utf8(&contents[..1]).unwrap(), "t"); + } + } + Ok(()) + } +}
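For reference, the pieces above compose roughly as follows — a minimal sketch modeled on `ac_utils_test.rs` (the `MemoryStore`, the digest, and the file path are illustrative assumptions, not part of this diff):

```rust
use std::ffi::OsString;
use std::pin::Pin;
use std::sync::Arc;

use ac_utils::upload_file_to_store;
use common::{fs, DigestInfo};
use error::Error;
use memory_store::MemoryStore;

async fn upload_example(path: OsString, digest: DigestInfo) -> Result<(), Error> {
    let store = Arc::new(MemoryStore::new(&config::stores::MemoryStore::default()));
    // Open with no read limit. If the store is slow to accept data, the slot is
    // closed after idle_file_descriptor_timeout() and transparently reopened.
    let file = fs::open_file(path, u64::MAX).await?;
    upload_file_to_store(Pin::new(store.as_ref()), digest, file).await
}
```

Because `ResumeableFileSlot` records the stream position and the remaining `Take` limit when it closes, callers never need to distinguish the "file was closed while idle" case from the ordinary one.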