Skip to content

Commit

Permalink
Move trivial data access Mutex's to sync Mutex.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisstaite-menlo committed Jul 6, 2023
1 parent b354b25 commit 812b94d
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 68 deletions.
6 changes: 5 additions & 1 deletion Cargo.Bazel.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"checksum": "7ba4c619cdcf6bb43fbfadb39a3207ee2ef162a7907e632ed76ff7ab77b697e1",
"checksum": "1f509064a530945cea1a818bf8c8d06f3e18b9ccba980d2f785c071b7fd19b11",
"crates": {
"adler 1.0.2": {
"name": "adler",
Expand Down Expand Up @@ -2515,6 +2515,10 @@
"id": "nix 0.23.2",
"target": "nix"
},
{
"id": "parking_lot 0.12.1",
"target": "parking_lot"
},
{
"id": "pin-project-lite 0.2.9",
"target": "pin_project_lite"
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ clap = { version = "4.1.8", features = ["derive"] }
uuid = { version = "0.8.2", features = ["v4"] }
shlex = "1.1.0"
relative-path = "1.7.0"
parking_lot = "0.12.1"

[dev-dependencies]
stdext = "0.2.1"
Expand Down
2 changes: 1 addition & 1 deletion cas/grpc_service/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ rust_library(
"//util:error",
"//util:resource_info",
"//util:write_request_stream_wrapper",
"@crate_index//:async-lock",
"@crate_index//:futures",
"@crate_index//:parking_lot",
"@crate_index//:tokio",
"@crate_index//:tonic",
],
Expand Down
11 changes: 5 additions & 6 deletions cas/grpc_service/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use async_lock::Mutex;
use futures::{stream::unfold, Stream};
use parking_lot::Mutex;
use proto::google::bytestream::{
byte_stream_server::ByteStream, byte_stream_server::ByteStreamServer as Server, QueryWriteStatusRequest,
QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest, WriteResponse,
Expand Down Expand Up @@ -236,7 +236,7 @@ impl ByteStreamServer {
.ok_or_else(|| make_input_err!("UUID must be set if querying write status"))?;

{
let active_uploads = self.active_uploads.lock().await;
let active_uploads = self.active_uploads.lock();
if active_uploads.contains(uuid) {
return Ok(Response::new(QueryWriteStatusResponse {
// TODO(blaise.bruer) We currently don't support resuming a stream, so we always
Expand Down Expand Up @@ -300,13 +300,12 @@ impl ByteStream for ByteStreamServer {
.ok_or_else(|| Into::<Status>::into(make_input_err!("UUID must be set if writing data")))?;
{
// Check to see if request is already being uploaded and if it is error, otherwise insert entry.
let mut active_uploads = self.active_uploads.lock().await;
if active_uploads.contains(&uuid) {
let mut active_uploads = self.active_uploads.lock();
if !active_uploads.insert(uuid.clone()) {
return Err(Into::<Status>::into(make_input_err!(
"Cannot upload same UUID simultaneously"
)));
}
active_uploads.insert(uuid.clone());
}

log::info!("\x1b[0;31mWrite Req\x1b[0m: {:?}", hash);
Expand All @@ -318,7 +317,7 @@ impl ByteStream for ByteStreamServer {

{
// Remove the active upload request.
let mut active_uploads = self.active_uploads.lock().await;
let mut active_uploads = self.active_uploads.lock();
active_uploads.remove(&uuid);
}
let d = now.elapsed().as_secs_f32();
Expand Down
2 changes: 1 addition & 1 deletion cas/store/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ rust_library(
"//util:common",
"//util:error",
"//util:write_request_stream_wrapper",
"@crate_index//:async-lock",
"@crate_index//:futures",
"@crate_index//:parking_lot",
"@crate_index//:shellexpand",
"@crate_index//:tokio",
"@crate_index//:tonic",
Expand Down
6 changes: 3 additions & 3 deletions cas/store/grpc_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::pin::Pin;
use std::sync::Arc;

use async_lock::Mutex;
use async_trait::async_trait;
use futures::{stream::unfold, Stream};
use shellexpand;
Expand All @@ -26,6 +25,7 @@ use buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use common::{log, DigestInfo};
use config;
use error::{error_if, make_input_err, Error, ResultExt};
use parking_lot::Mutex;
use proto::build::bazel::remote::execution::v2::{
action_cache_client::ActionCacheClient, content_addressable_storage_client::ContentAddressableStorageClient,
ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse,
Expand Down Expand Up @@ -193,12 +193,12 @@ impl GrpcStore {
}
// TODO(allada) I'm sure there's a way to do this without a mutex, but rust can be super
// picky with borrowing through a stream await.
*local_state.error.lock().await = Some(maybe_message.unwrap_err());
*local_state.error.lock() = Some(maybe_message.unwrap_err());
None
});

let result = client.write(stream).await.err_tip(|| "in GrpcStore::write")?;
if let Some(err) = (error.lock().await).take() {
if let Some(err) = error.lock().take() {
return Err(err);
}
return Ok(result);
Expand Down
2 changes: 1 addition & 1 deletion cas/worker/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ rust_library(
"//proto",
"//util:common",
"//util:error",
"@crate_index//:async-lock",
"@crate_index//:bytes",
"@crate_index//:filetime",
"@crate_index//:futures",
"@crate_index//:hex",
"@crate_index//:parking_lot",
"@crate_index//:relative-path",
"@crate_index//:tokio",
"@crate_index//:tokio-stream",
Expand Down
71 changes: 34 additions & 37 deletions cas/worker/running_actions_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use std::process::Stdio;
use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc, Weak};
use std::time::SystemTime;

use async_lock::Mutex;
use bytes::{BufMut, Bytes, BytesMut};
use filetime::{set_file_mtime, FileTime};
use futures::future::{try_join, try_join3, try_join_all, BoxFuture, FutureExt, TryFutureExt};
use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
use hex;
use parking_lot::Mutex;
use relative_path::RelativePath;
use tokio::io::AsyncSeekExt;
use tokio::process;
Expand Down Expand Up @@ -442,7 +442,7 @@ impl RunningAction for RunningActionImpl {
/// up to the stores to rate limit if needed.
async fn prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> {
{
let mut state = self.state.lock().await;
let mut state = self.state.lock();
state.execution_metadata.input_fetch_start_timestamp = (self.running_actions_manager.now_fn)();
}
let command = {
Expand Down Expand Up @@ -485,7 +485,7 @@ impl RunningAction for RunningActionImpl {
}
log::info!("\x1b[0;31mWorker Received Command\x1b[0m: {:?}", command);
{
let mut state = self.state.lock().await;
let mut state = self.state.lock();
state.command_proto = Some(command);
state.execution_metadata.input_fetch_completed_timestamp = (self.running_actions_manager.now_fn)();
}
Expand All @@ -494,7 +494,7 @@ impl RunningAction for RunningActionImpl {

async fn execute(self: Arc<Self>) -> Result<Arc<Self>, Error> {
let (command_proto, mut kill_channel_rx) = {
let mut state = self.state.lock().await;
let mut state = self.state.lock();
state.execution_metadata.execution_start_timestamp = (self.running_actions_manager.now_fn)();
(
state
Expand Down Expand Up @@ -557,15 +557,20 @@ impl RunningAction for RunningActionImpl {
}
Result::<Bytes, Error>::Ok(all_stderr.freeze())
}));
let mut killed_action = false;
loop {
tokio::select! {
maybe_exit_status = child_process.wait() => {
let exit_status = maybe_exit_status.err_tip(|| "Failed to collect exit code of process")?;
// TODO(allada) We should implement stderr/stdout streaming to client here.
let stdout = all_stdout_fut.await.err_tip(|| "Internal error reading from stdout of worker task")??;
let stderr = all_stderr_fut.await.err_tip(|| "Internal error reading from stderr of worker task")??;
// If we get killed before the stream is started, then these will lock up.
let (stdout, stderr) = if killed_action {
(Bytes::new(), Bytes::new())
} else {
(all_stdout_fut.await.err_tip(|| "Internal error reading from stdout of worker task")??, all_stderr_fut.await.err_tip(|| "Internal error reading from stderr of worker task")??)
};
{
let mut state = self.state.lock().await;
let mut state = self.state.lock();
state.command_proto = Some(command_proto);
state.execution_result = Some(RunningActionImplExecutionResult{
stdout,
Expand All @@ -577,6 +582,7 @@ impl RunningAction for RunningActionImpl {
return Ok(self);
},
_ = &mut kill_channel_rx => {
killed_action = true;
if let Err(e) = child_process.start_kill() {
log::error!("Could not kill process in RunningActionsManager : {:?}", e);
}
Expand All @@ -589,7 +595,7 @@ impl RunningAction for RunningActionImpl {
async fn upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> {
log::info!("\x1b[0;31mWorker Uploading Results\x1b[0m");
let (mut command_proto, execution_result, mut execution_metadata) = {
let mut state = self.state.lock().await;
let mut state = self.state.lock();
state.execution_metadata.output_upload_start_timestamp = (self.running_actions_manager.now_fn)();
(
state
Expand Down Expand Up @@ -746,7 +752,7 @@ impl RunningAction for RunningActionImpl {
output_file_symlinks.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));
output_directory_symlinks.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));
{
let mut state = self.state.lock().await;
let mut state = self.state.lock();
execution_metadata.worker_completed_timestamp = (self.running_actions_manager.now_fn)();
state.action_result = Some(ActionResult {
output_files,
Expand All @@ -770,14 +776,14 @@ impl RunningAction for RunningActionImpl {
.await
.err_tip(|| format!("Could not remove working directory {}", self.work_directory));
self.did_cleanup.store(true, Ordering::Relaxed);
if let Err(e) = self.running_actions_manager.cleanup_action(&self.action_id).await {
if let Err(e) = self.running_actions_manager.cleanup_action(&self.action_id) {
return Result::<Arc<Self>, Error>::Err(e).merge(remove_dir_result.map(|_| self));
}
remove_dir_result.map(|_| self)
}

async fn get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> {
let mut state = self.state.lock().await;
let mut state = self.state.lock();
state
.action_result
.take()
Expand All @@ -799,8 +805,6 @@ pub trait RunningActionsManager: Sync + Send + Sized + Unpin + 'static {
start_execute: StartExecute,
) -> Result<Arc<Self::RunningAction>, Error>;

async fn get_action(&self, action_id: &ActionId) -> Result<Arc<Self::RunningAction>, Error>;

async fn kill_all(&self);
}

Expand Down Expand Up @@ -880,8 +884,8 @@ impl RunningActionsManagerImpl {
Ok(action_info)
}

async fn cleanup_action(&self, action_id: &ActionId) -> Result<(), Error> {
let mut running_actions = self.running_actions.lock().await;
fn cleanup_action(&self, action_id: &ActionId) -> Result<(), Error> {
let mut running_actions = self.running_actions.lock();
running_actions.remove(action_id).err_tip(|| {
format!(
"Expected action id '{:?}' to exist in RunningActionsManagerImpl",
Expand All @@ -892,8 +896,11 @@ impl RunningActionsManagerImpl {
}

async fn kill_action(action: Arc<RunningActionImpl>) {
let mut action_state = action.state.lock().await;
if let Some(kill_channel_tx) = action_state.kill_channel_tx.take() {
let kill_channel_tx = {
let mut action_state = action.state.lock();
action_state.kill_channel_tx.take()
};
if let Some(kill_channel_tx) = kill_channel_tx {
match kill_channel_tx.send(()) {
Err(_) => log::error!("Error sending kill to running action"),
_ => (),
Expand Down Expand Up @@ -940,32 +947,22 @@ impl RunningActionsManager for RunningActionsManagerImpl {
self.clone(),
));
{
let mut running_actions = self.running_actions.lock().await;
let mut running_actions = self.running_actions.lock();
running_actions.insert(action_id, Arc::downgrade(&running_action));
}
Ok(running_action)
}

async fn get_action(&self, action_id: &ActionId) -> Result<Arc<Self::RunningAction>, Error> {
let running_actions = self.running_actions.lock().await;
Ok(running_actions
.get(action_id)
.err_tip(|| format!("Action '{:?}' not found", action_id))?
.upgrade()
.err_tip(|| "Could not upgrade RunningAction Arc")?)
}

async fn kill_all(&self) {
let kill_actions = {
let running_actions = self.running_actions.lock().await;
futures::future::join_all(
running_actions
.iter()
.filter_map(|(_action_id, action)| action.upgrade())
.map(|action| Box::pin(Self::kill_action(action))),
)
let kill_actions: Vec<Arc<RunningActionImpl>> = {
let running_actions = self.running_actions.lock();
running_actions
.iter()
.filter_map(|(_action_id, action)| action.upgrade())
.collect()
};
// Await the actions after dropping the Mutex.
kill_actions.await;
for action in kill_actions {
Self::kill_action(action).await
}
}
}
6 changes: 1 addition & 5 deletions cas/worker/tests/utils/mock_running_actions_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio::sync::mpsc;
use action_messages::ActionResult;
use error::{make_input_err, Error};
use proto::com::github::allada::turbo_cache::remote_execution::StartExecute;
use running_actions_manager::{ActionId, RunningAction, RunningActionsManager};
use running_actions_manager::{RunningAction, RunningActionsManager};

#[derive(Debug)]
enum RunningActionManagerCalls {
Expand Down Expand Up @@ -102,10 +102,6 @@ impl RunningActionsManager for MockRunningActionsManager {
}
}

async fn get_action(&self, _action_id: &ActionId) -> Result<Arc<Self::RunningAction>, Error> {
unimplemented!("get_action not implemented");
}

async fn kill_all(&self) {
self.tx_kill_all.send(()).expect("Could not send request to mpsc");
}
Expand Down
2 changes: 1 addition & 1 deletion util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ rust_library(
srcs = ["async_fixed_buffer.rs"],
visibility = ["//visibility:public"],
deps = [
"@crate_index//:async-lock",
"@crate_index//:fixed-buffer",
"@crate_index//:futures",
"@crate_index//:parking_lot",
"@crate_index//:pin-project-lite",
"@crate_index//:tokio",
],
Expand Down
Loading

0 comments on commit 812b94d

Please sign in to comment.