Files will now close if held open too long
While testing Buck2, it turned out that Buck2 hammers the scheduler with jobs and assumes it can keep up (which is good). In local testing, where all services run on the same machine, this deadlocked because the worker was performing the following operations:
1. Open file1 for reading
2. Open file2 for writing
3. Stream file1 -> file2

Since we allow users to limit the number of files open at any given time, this deadlocked: file1 was held open while waiting for file2 to open, and file2 was waiting for some other file to close. Under Buck2's load, every open-file slot was held by a task stuck the same way, so no file ever closed and no task could make progress. A minimal model of the failure is sketched below.
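
Here is that model, treating the open-file limit as a plain semaphore. This is an illustration only: the real limit is enforced inside `common::fs`, and this helper function is hypothetical.

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Each concurrent copy needs two permits (two open files). With enough of
// these tasks running at once, every permit is held by a task parked on its
// *second* acquire, and no task ever finishes to release one: deadlock.
async fn stream_copy(open_files: Arc<Semaphore>) {
    let _src = open_files.acquire().await.unwrap(); // file1 held open here...
    let _dst = open_files.acquire().await.unwrap(); // ...while waiting on file2.
    // ... stream file1 -> file2, then both permits drop ...
}
```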

In most production systems this is not an issue because the CAS is separated from the workers, but it can still happen on a worker if `max_open_files` is set too low.

To get around this issue, `ResumeableFileSlot` is introduced. It allows callers to apply a timeout and call `.close_file()` on the slot; the next time the struct is used, it transparently re-opens the file. A usage sketch follows below.

related #222
closes #238
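
Roughly, the intended calling pattern looks like this. A sketch only: `as_reader()` and `close_file()` appear in this commit's diff, while `fs::open_file` and the project's `Error` type are assumptions about the surrounding API.

```rust
use tokio::io::AsyncReadExt;

// Sketch, not code from this commit. `fs::open_file` is a hypothetical
// constructor for a ResumeableFileSlot; `Error` is the project's error type.
async fn read_then_release(path: &str) -> Result<Vec<u8>, Error> {
    let mut slot = fs::open_file(path).await?;
    let mut buf = Vec::new();
    slot.as_reader().await?.read_to_end(&mut buf).await?;
    // Release the descriptor while doing slow work elsewhere; this frees an
    // open-file slot for other tasks instead of starving them.
    slot.close_file().await?;
    // The next `as_reader()` call re-opens the file under the hood.
    let _reader = slot.as_reader().await?;
    Ok(buf)
}
```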
allada committed Sep 3, 2023
1 parent 298bfb9 commit 67b90e2
Showing 20 changed files with 863 additions and 187 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/main.yml
@@ -57,8 +57,10 @@ jobs:
(echo "Cargo.toml is out of date. Please run: python ./tools/build_cargo_manifest.py" && exit 1)
- name: Compile & test with cargo
run: |
docker run --rm -e CC=clang -v $PWD:/root/turbo-cache allada/turbo-cache:test sh -c ' \
apt-get update && apt-get install cargo -y && \
docker run --rm -e CC=clang -v $PWD:/root/turbo-cache allada/turbo-cache:test bash -c ' \
apt update && apt install -y curl libssl-dev gcc pkg-config && \
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain=1.70.0 && \
. $HOME/.cargo/env && \
cargo build --all && \
cargo test --all \
'
26 changes: 26 additions & 0 deletions Cargo.lock

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -13,6 +13,7 @@ members = [
"gencargo/ac_server",
"gencargo/ac_server_test",
"gencargo/ac_utils",
"gencargo/ac_utils_test",
"gencargo/action_messages",
"gencargo/action_messages_test",
"gencargo/async_fixed_buffer",
@@ -46,6 +47,7 @@ members = [
"gencargo/filesystem_store",
"gencargo/filesystem_store_test",
"gencargo/fs",
"gencargo/fs_test",
"gencargo/gen_protos_tool",
"gencargo/grpc_scheduler",
"gencargo/grpc_store",
@@ -149,6 +151,7 @@ uuid = { version = "1.4.0", features = ["v4"] }
ac_server = { path = "gencargo/ac_server" }
ac_server_test = { path = "gencargo/ac_server_test" }
ac_utils = { path = "gencargo/ac_utils" }
ac_utils_test = { path = "gencargo/ac_utils_test" }
action_messages = { path = "gencargo/action_messages" }
action_messages_test = { path = "gencargo/action_messages_test" }
async_fixed_buffer = { path = "gencargo/async_fixed_buffer" }
@@ -182,6 +185,7 @@ fastcdc_test = { path = "gencargo/fastcdc_test" }
filesystem_store = { path = "gencargo/filesystem_store" }
filesystem_store_test = { path = "gencargo/filesystem_store_test" }
fs = { path = "gencargo/fs" }
fs_test = { path = "gencargo/fs_test" }
gen_protos_tool = { path = "gencargo/gen_protos_tool" }
grpc_scheduler = { path = "gencargo/grpc_scheduler" }
grpc_store = { path = "gencargo/grpc_store" }
2 changes: 1 addition & 1 deletion README.md
@@ -65,7 +65,7 @@ bazel build -c opt //cas
These will place an executable in `./bazel-bin/cas/cas` that will start the service.

### Cargo requirements
* Cargo 1.66.0+
* Cargo 1.70.0+
* `libssl-dev` package installed (ie: `apt install libssl-dev` or `yum install libssl-dev`)
#### Cargo building for deployment
```sh
11 changes: 10 additions & 1 deletion cas/cas_main.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use axum::Router;
use clap::Parser;
@@ -34,7 +35,7 @@ use ac_server::AcServer;
use bytestream_server::ByteStreamServer;
use capabilities_server::CapabilitiesServer;
use cas_server::CasServer;
use common::fs::set_open_file_limit;
use common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit};
use common::log;
use config::cas_server::{CasConfig, CompressionAlgorithm, GlobalConfig, ServerConfig, WorkerConfig};
use default_scheduler_factory::scheduler_factory;
@@ -461,14 +462,21 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// Note: If the default changes make sure you update the documentation in
// `config/cas_server.rs`.
const DEFAULT_MAX_OPEN_FILES: usize = 512;
// Note: If the default changes make sure you update the documentation in
// `config/cas_server.rs`.
const DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS: u64 = 1000;
let global_cfg = if let Some(global_cfg) = &mut cfg.global {
if global_cfg.max_open_files == 0 {
global_cfg.max_open_files = DEFAULT_MAX_OPEN_FILES;
}
if global_cfg.idle_file_descriptor_timeout_millis == 0 {
global_cfg.idle_file_descriptor_timeout_millis = DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS;
}
*global_cfg
} else {
GlobalConfig {
max_open_files: DEFAULT_MAX_OPEN_FILES,
idle_file_descriptor_timeout_millis: DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS,
disable_metrics: cfg.servers.iter().all(|v| {
let Some(service) = &v.services else {
return true;
@@ -478,6 +486,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
};
set_open_file_limit(global_cfg.max_open_files);
set_idle_file_descriptor_timeout(Duration::from_millis(global_cfg.idle_file_descriptor_timeout_millis))?;
!global_cfg.disable_metrics
};
// Override metrics enabled if the environment variable is set.
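For reference, here is how the knobs touched above might surface in a deployment config. The field names and defaults (`max_open_files` = 512, `idle_file_descriptor_timeout_millis` = 1000) come from this diff; the surrounding JSON structure is an assumption based on the project's config format.

```json5
{
  // Global settings applied before any stores or servers start.
  "global": {
    // Hard cap on concurrently open file descriptors (default 512).
    "max_open_files": 512,
    // How long a descriptor may sit idle before being closed and later
    // re-opened on demand (default 1000 ms; a value of 0 means "use default").
    "idle_file_descriptor_timeout_millis": 1000
  }
}
```
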
16 changes: 16 additions & 0 deletions cas/store/BUILD
@@ -440,3 +440,19 @@ rust_test(
],
proc_macro_deps = ["@crate_index//:async-trait"],
)

rust_test(
name = "ac_utils_test",
srcs = ["tests/ac_utils_test.rs"],
deps = [
":ac_utils",
":memory_store",
":store",
"//config",
"//util:common",
"//util:error",
"@crate_index//:pretty_assertions",
"@crate_index//:rand",
"@crate_index//:tokio",
],
)
114 changes: 82 additions & 32 deletions cas/store/ac_utils.rs
@@ -21,10 +21,12 @@ use futures::{future::try_join, Future, FutureExt, TryFutureExt};
use prost::Message;
use sha2::{Digest, Sha256};
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::time::timeout;

use buf_channel::make_buf_channel_pair;
use common::DigestInfo;
use buf_channel::{make_buf_channel_pair, DropCloserWriteHalf};
use common::{fs, DigestInfo};
use error::{Code, Error, ResultExt};
use fs::idle_file_descriptor_timeout;
use store::{Store, UploadSizeInfo};

// NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than
@@ -36,6 +38,9 @@ pub const ESTIMATED_DIGEST_SIZE: usize = 2048;
/// to use up all the memory on this machine.
const MAX_ACTION_MSG_SIZE: usize = 10 << 20; // 10mb.

/// Default read buffer size for reading from an AsyncReader.
const DEFAULT_READ_BUFF_SIZE: usize = 4096;

/// Attempts to fetch the digest contents from a store into the associated proto.
pub async fn get_and_decode_digest<T: Message + Default>(
store: Pin<&dyn Store>,
@@ -77,30 +82,36 @@ pub async fn serialize_and_upload_message<'a, T: Message>(

/// Given a bytestream computes the digest for the data.
/// Note: This will happen in a new spawn since computing digests can be thread intensive.
pub fn compute_digest<R: AsyncRead + Unpin + Send + 'static>(
mut reader: R,
) -> impl Future<Output = Result<(DigestInfo, R), Error>> {
tokio::spawn(async move {
const DEFAULT_READ_BUFF_SIZE: usize = 4096;
let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE);
let mut hasher = Sha256::new();
let mut digest_size = 0;
loop {
reader
.read_buf(&mut chunk)
.await
.err_tip(|| "Could not read chunk during compute_digest")?;
if chunk.is_empty() {
break; // EOF.
}
digest_size += chunk.len();
hasher.update(&chunk);
chunk.clear();
pub async fn compute_digest<R: AsyncRead + Unpin + Send>(mut reader: R) -> Result<(DigestInfo, R), Error> {
let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE);
let mut hasher = Sha256::new();
let mut digest_size = 0;
loop {
reader
.read_buf(&mut chunk)
.await
.err_tip(|| "Could not read chunk during compute_digest")?;
if chunk.is_empty() {
break; // EOF.
}
digest_size += chunk.len();
hasher.update(&chunk);
chunk.clear();
}

Ok((DigestInfo::new(hasher.finalize().into(), digest_size as i64), reader))
})
.map(|r| r.err_tip(|| "Failed to launch spawn")?)
Ok((DigestInfo::new(hasher.finalize().into(), digest_size as i64), reader))
}

fn inner_upload_file_to_store<'a, Fut: Future<Output = Result<(), Error>> + 'a>(
cas_store: Pin<&'a dyn Store>,
digest: DigestInfo,
read_data_fn: impl FnOnce(DropCloserWriteHalf) -> Fut,
) -> impl Future<Output = Result<(), Error>> + 'a {
let (tx, rx) = make_buf_channel_pair();
let upload_file_to_store_fut = cas_store
.update(digest, rx, UploadSizeInfo::ExactSize(digest.size_bytes as usize))
.map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store"));
try_join(read_data_fn(tx), upload_file_to_store_fut).map_ok(|(_, _)| ())
}

/// Uploads data to our store for given digest.
@@ -114,13 +125,8 @@ pub fn upload_to_store<'a, R: AsyncRead + Unpin>(
digest: DigestInfo,
reader: &'a mut R,
) -> impl Future<Output = Result<(), Error>> + 'a {
let (mut tx, rx) = make_buf_channel_pair();
let upload_to_store_fut = cas_store
.update(digest, rx, UploadSizeInfo::ExactSize(digest.size_bytes as usize))
.map(|r| r.err_tip(|| "Could not upload data to store in upload_to_store"));
let read_data_fut = async move {
inner_upload_file_to_store(cas_store, digest, move |mut tx| async move {
loop {
const DEFAULT_READ_BUFF_SIZE: usize = 4096;
let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE);
reader
.read_buf(&mut chunk)
@@ -137,6 +143,50 @@ pub fn upload_to_store<'a, R: AsyncRead + Unpin>(
.await
.err_tip(|| "Could not send EOF to store in upload_to_store")?;
Ok(())
};
try_join(read_data_fut, upload_to_store_fut).map_ok(|(_, _)| ())
})
}

/// Same as `upload_to_store`, however it specializes in dealing with a `ResumeableFileSlot`.
This will close the reading file if writing the data takes a while.
pub fn upload_file_to_store<'a>(
cas_store: Pin<&'a dyn Store>,
digest: DigestInfo,
mut file_reader: fs::ResumeableFileSlot<'a>,
) -> impl Future<Output = Result<(), Error>> + 'a {
inner_upload_file_to_store(cas_store, digest, move |mut tx| async move {
loop {
let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE);
file_reader
.as_reader()
.await
.err_tip(|| "Could not get reader from file slot in upload_file_to_store")?
.read_buf(&mut chunk)
.await
.err_tip(|| "Could not read chunk during upload_file_to_store")?;
if chunk.is_empty() {
break; // EOF.
}
let send_fut = tx.send(chunk.freeze());
tokio::pin!(send_fut);
loop {
match timeout(idle_file_descriptor_timeout(), &mut send_fut).await {
Ok(Ok(())) => break,
Ok(Err(err)) => {
return Err(err).err_tip(|| "Could not send buffer data to store in upload_file_to_store")
}
Err(_) => {
file_reader
.close_file()
.await
.err_tip(|| "Could not close file due to timeout in upload_file_to_store")?;
continue;
}
}
}
}
tx.send_eof()
.await
.err_tip(|| "Could not send EOF to store in upload_file_to_store")?;
Ok(())
})
}
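
A short usage sketch of the new helper. Illustration only: `fs::open_file` is a hypothetical constructor and the calling context is assumed; `upload_file_to_store` is the function added above.

```rust
use std::pin::Pin;

// Sketch only. Streams a local file into the CAS. If the store is slow to
// accept a chunk, the slot's descriptor is closed at the idle timeout and
// re-opened transparently on the next read, so the open-file limit cannot
// deadlock on a slow writer.
async fn upload_local_file(
    cas_store: Pin<&dyn Store>,
    digest: DigestInfo,
    path: &str,
) -> Result<(), Error> {
    let file = fs::open_file(path).await?; // hypothetical constructor
    upload_file_to_store(cas_store, digest, file).await
}
```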
