Skip to content

Commit

Permalink
Use fast-async-mutex for all mutex needs
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Nov 1, 2021
1 parent cd12d1d commit c5d450c
Show file tree
Hide file tree
Showing 18 changed files with 84 additions and 231 deletions.
20 changes: 2 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ path = "needed_only_to_make_cargo_tooling_happy.rs"
prost = "0.7.0"
prost-types = "0.7.0"
hex = "0.4.3"
async-mutex = "1.4.0"
async-trait = "0.1.51"
fixed-buffer = "0.2.3"
futures = "0.3.16"
Expand Down
12 changes: 7 additions & 5 deletions cas/grpc_service/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::time::Instant;

use async_fixed_buffer::AsyncFixedBuf;
use drop_guard::DropGuard;
use futures::{stream::unfold, Stream};
use futures::{stream::unfold, Stream, Future};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use tonic::{Request, Response, Status, Streaming};

Expand All @@ -35,7 +35,7 @@ pub struct ByteStreamServer {
struct ReaderState {
max_bytes_per_stream: usize,
was_shutdown: bool,
stream_closer: Box<dyn FnMut() + Sync + Send>,
stream_closer_fut: Option<Pin<Box<dyn Future<Output = ()> + Sync + Send>>>,
rx: Box<dyn AsyncRead + Sync + Send + Unpin>,
reading_future: Box<tokio::task::JoinHandle<Result<(), Error>>>,
}
Expand All @@ -44,7 +44,9 @@ impl ReaderState {
async fn shutdown(&mut self) -> Result<(), Error> {
self.was_shutdown = true;
// Close stream then wait for reader stream to finish.
(self.stream_closer)();
if let Some(stream_closer) = self.stream_closer_fut.take() {
stream_closer.await;
}
let mut drainer = tokio::io::sink();
// Note: We ignore errors here or else we will get "Sender Disconnected" errors.
let _ = tokio::io::copy(&mut self.rx, &mut drainer).await;
Expand Down Expand Up @@ -91,7 +93,7 @@ impl ByteStreamServer {
let digest = DigestInfo::try_new(&resource_info.hash, resource_info.expected_size)?;

let mut raw_fixed_buffer = AsyncFixedBuf::new(vec![0u8; self.read_buffer_stream_size].into_boxed_slice());
let stream_closer = raw_fixed_buffer.get_closer();
let stream_closer_fut = Some(raw_fixed_buffer.get_closer());
let (rx, mut tx) = tokio::io::split(raw_fixed_buffer);
let rx: Box<dyn tokio::io::AsyncRead + Sync + Send + Unpin> = if read_limit != 0 {
Box::new(rx.take(u64::try_from(read_limit).err_tip(|| "read_limit has is not convertable to u64")?))
Expand Down Expand Up @@ -119,7 +121,7 @@ impl ByteStreamServer {
// This allows us to call a destructor when the the object is dropped.
let state = Some(DropGuard::new(
ReaderState {
stream_closer: stream_closer,
stream_closer_fut,
rx: rx,
max_bytes_per_stream: self.max_bytes_per_stream,
reading_future: reading_future,
Expand Down
4 changes: 2 additions & 2 deletions cas/store/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ rust_library(
srcs = ["memory_store.rs"],
deps = [
"//config",
"//third_party:async_mutex",
"//third_party:fast_async_mutex",
"//third_party:tokio",
"//util:common",
"//util:error",
Expand All @@ -50,7 +50,7 @@ rust_library(
srcs = ["verify_store.rs"],
deps = [
"//config",
"//third_party:async_mutex",
"//third_party:fast_async_mutex",
"//third_party:hex",
"//third_party:sha2",
"//third_party:tokio",
Expand Down
2 changes: 1 addition & 1 deletion cas/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::marker::Send;
use std::sync::Arc;
use std::time::Instant;

use async_mutex::Mutex;
use fast_async_mutex::mutex::Mutex;
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

Expand Down
4 changes: 2 additions & 2 deletions cas/store/verify_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl StoreTrait for VerifyStore {
let expected_size = digest.size_bytes;
Box::pin(async move {
let mut raw_fixed_buffer = AsyncFixedBuf::new(vec![0u8; 1024 * 4].into_boxed_slice());
let mut stream_closer = raw_fixed_buffer.get_closer();
let stream_closer_fut = raw_fixed_buffer.get_closer();
let (rx, tx) = tokio::io::split(raw_fixed_buffer);

let hash_copy = digest.packed_hash;
Expand All @@ -109,7 +109,7 @@ impl StoreTrait for VerifyStore {
hasher = Some((hash_copy, Sha256::new()));
}
let result = inner_check_update(tx, reader, expected_size, self.verify_size, hasher).await;
stream_closer();
stream_closer_fut.await;
result.merge(
spawn_future
.await
Expand Down
29 changes: 24 additions & 5 deletions config/examples/basic_cas.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,36 @@
"CAS_MAIN_STORE": {
"verify": {
"backend": {
"memory": {}
"s3_store": {
"region": "us-west-1",
"bucket": "blaisebruer-cas-store",
"key_prefix": "test-prefix-cas",
"retry": {
"max_retries": 0,
"delay": 0.1,
"jitter": 0.0,
}
}
},
"verify_size": true,
"verify_hash": true
}
},
"AC_MAIN_STORE": {
"memory": {
"eviction_policy": {
// 100mb.
"max_bytes": 100000000,
// "memory": {
// "eviction_policy": {
// // 100mb.
// "max_bytes": 100000000,
// }
// }
"s3_store": {
"region": "us-west-1",
"bucket": "blaisebruer-cas-store",
"key_prefix": "test-prefix-ac",
"retry": {
"max_retries": 0,
"delay": 0.1,
"jitter": 0.0,
}
}
}
Expand Down
9 changes: 0 additions & 9 deletions third_party/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,6 @@ licenses([
])

# Aliased targets
alias(
name = "async_mutex",
actual = "@raze__async_mutex__1_4_0//:async_mutex",
tags = [
"cargo-raze",
"manual",
],
)

alias(
name = "async_trait",
actual = "@raze__async_trait__0_1_51//:async_trait",
Expand Down
30 changes: 5 additions & 25 deletions third_party/crates.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,6 @@ def raze_fetch_remote_crates():
build_file = Label("//third_party/remote:BUILD.arrayvec-0.5.2.bazel"),
)

maybe(
http_archive,
name = "raze__async_mutex__1_4_0",
url = "https://crates.io/api/v1/crates/async-mutex/1.4.0/download",
type = "tar.gz",
sha256 = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e",
strip_prefix = "async-mutex-1.4.0",
build_file = Label("//third_party/remote:BUILD.async-mutex-1.4.0.bazel"),
)

maybe(
http_archive,
name = "raze__async_stream__0_3_2",
Expand Down Expand Up @@ -601,16 +591,6 @@ def raze_fetch_remote_crates():
build_file = Label("//third_party/remote:BUILD.env_logger-0.9.0.bazel"),
)

maybe(
http_archive,
name = "raze__event_listener__2_5_1",
url = "https://crates.io/api/v1/crates/event-listener/2.5.1/download",
type = "tar.gz",
sha256 = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59",
strip_prefix = "event-listener-2.5.1",
build_file = Label("//third_party/remote:BUILD.event-listener-2.5.1.bazel"),
)

maybe(
http_archive,
name = "raze__fake_simd__0_1_2",
Expand Down Expand Up @@ -1313,12 +1293,12 @@ def raze_fetch_remote_crates():

maybe(
http_archive,
name = "raze__openssl_sys__0_9_69",
url = "https://crates.io/api/v1/crates/openssl-sys/0.9.69/download",
name = "raze__openssl_sys__0_9_70",
url = "https://crates.io/api/v1/crates/openssl-sys/0.9.70/download",
type = "tar.gz",
sha256 = "14276c7942cb12d5ffab976d5b69789b0510d052576b230fcde58d8c581b8d1d",
strip_prefix = "openssl-sys-0.9.69",
build_file = Label("//third_party/remote:BUILD.openssl-sys-0.9.69.bazel"),
sha256 = "c6517987b3f8226b5da3661dad65ff7f300cc59fb5ea8333ca191fc65fde3edf",
strip_prefix = "openssl-sys-0.9.70",
build_file = Label("//third_party/remote:BUILD.openssl-sys-0.9.70.bazel"),
)

maybe(
Expand Down
68 changes: 0 additions & 68 deletions third_party/remote/BUILD.async-mutex-1.4.0.bazel

This file was deleted.

57 changes: 0 additions & 57 deletions third_party/remote/BUILD.event-listener-2.5.1.bazel

This file was deleted.

Loading

0 comments on commit c5d450c

Please sign in to comment.