Skip to content

Commit

Permalink
Future proof code from accidentially creating zombies
Browse files Browse the repository at this point in the history
Move to use scopeguard instead of drop_guard and move workers
so code changes make zombies significantly less likely.
  • Loading branch information
allada committed Sep 15, 2023
1 parent b730a90 commit 4bd986c
Show file tree
Hide file tree
Showing 14 changed files with 45 additions and 63 deletions.
47 changes: 12 additions & 35 deletions Cargo.Bazel.lock
@@ -1,5 +1,5 @@
{
"checksum": "7f4a98804faf2c78e55b85d2c5e5db10d44f6c9234bdf72c615910774c8bd192",
"checksum": "ab4d365f12cccf08d844992f1bece260d9d2fc782bbe6d68ed8b288ce9bfdf71",
"crates": {
"addr2line 0.20.0": {
"name": "addr2line",
Expand Down Expand Up @@ -2544,10 +2544,6 @@
"id": "clap 4.3.21",
"target": "clap"
},
{
"id": "drop_guard 0.3.0",
"target": "drop_guard"
},
{
"id": "env_logger 0.10.0",
"target": "env_logger"
Expand Down Expand Up @@ -2664,6 +2660,10 @@
"id": "rusoto_signature 0.48.0",
"target": "rusoto_signature"
},
{
"id": "scopeguard 1.2.0",
"target": "scopeguard"
},
{
"id": "serde 1.0.171",
"target": "serde"
Expand Down Expand Up @@ -2926,36 +2926,6 @@
},
"license": "MIT OR Apache-2.0"
},
"drop_guard 0.3.0": {
"name": "drop_guard",
"version": "0.3.0",
"repository": {
"Http": {
"url": "https://crates.io/api/v1/crates/drop_guard/0.3.0/download",
"sha256": "2c4a817d8b683f6e649aed359aab0c47a875377516bb5791d0f7e46d9066d209"
}
},
"targets": [
{
"Library": {
"crate_name": "drop_guard",
"crate_root": "src/lib.rs",
"srcs": [
"**/*.rs"
]
}
}
],
"library_target_name": "drop_guard",
"common_attrs": {
"compile_data_glob": [
"**"
],
"edition": "2018",
"version": "0.3.0"
},
"license": "MIT/Apache-2.0"
},
"dtoa 1.0.9": {
"name": "dtoa",
"version": "1.0.9",
Expand Down Expand Up @@ -9552,6 +9522,13 @@
"compile_data_glob": [
"**"
],
"crate_features": {
"common": [
"default",
"use_std"
],
"selects": {}
},
"edition": "2015",
"version": "1.2.0"
},
Expand Down
11 changes: 3 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -104,7 +104,6 @@ byteorder = "1.4.3"
bytes = "1.4.0"
clap = { version = "4.3.11", features = ["derive"] }
ctor = "0.2.3"
drop_guard = "0.3.0"
env_logger = "0.10.0"
filetime = "0.2.21"
fixed-buffer = "0.2.3"
Expand Down Expand Up @@ -134,6 +133,7 @@ rusoto_core = "0.48.0"
rusoto_mock = "=0.48.0"
rusoto_s3 = "0.48.0"
rusoto_signature = "0.48.0"
scopeguard = "1.2.0"
serde = "1.0.167"
sha2 = "0.10.7"
shellexpand = "3.1.0"
Expand Down
2 changes: 1 addition & 1 deletion cas/BUILD
Expand Up @@ -20,7 +20,7 @@ rust_binary(
"//util:metrics_utils",
"@crate_index//:async-lock",
"@crate_index//:clap",
"@crate_index//:drop_guard",
"@crate_index//:scopeguard",
"@crate_index//:env_logger",
"@crate_index//:futures",
"@crate_index//:hyper",
Expand Down
6 changes: 3 additions & 3 deletions cas/cas_main.rs
Expand Up @@ -21,11 +21,11 @@ use std::time::Duration;
use async_lock::Mutex as AsyncMutex;
use axum::Router;
use clap::Parser;
use drop_guard::guard;
use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt};
use hyper::server::conn::Http;
use hyper::{Body, Response};
use parking_lot::Mutex;
use scopeguard::guard;
use tokio::net::TcpListener;
use tokio::task::spawn_blocking;
use tonic::codec::CompressionEncoding;
Expand Down Expand Up @@ -378,13 +378,13 @@ async fn inner_main(cfg: CasConfig) -> Result<(), Box<dyn std::error::Error>> {
connected_clients_mux.inner.lock().insert(remote_addr);
// This is the safest way to guarantee that if our future
// is ever dropped we will cleanup our data.
let drop_guard = guard(connected_clients_mux.clone(), move |connected_clients_mux| {
let scope_guard = guard(connected_clients_mux.clone(), move |connected_clients_mux| {
connected_clients_mux.inner.lock().remove(&remote_addr);
});
let fut = Http::new().serve_connection(tcp_stream, svc.clone());
tokio::spawn(async move {
// Move it into our spawn, so if our spawn dies the cleanup happens.
let _drop_guard = drop_guard;
let _guard = scope_guard;
if let Err(e) = fut.await {
log::error!("Failed running service : {:?}", e);
}
Expand Down
2 changes: 1 addition & 1 deletion cas/scheduler/BUILD
Expand Up @@ -111,7 +111,7 @@ rust_library(
"//proto",
"//util:common",
"//util:error",
"@crate_index//:drop_guard",
"@crate_index//:scopeguard",
"@crate_index//:futures",
"@crate_index//:parking_lot",
"@crate_index//:tonic",
Expand Down
8 changes: 4 additions & 4 deletions cas/scheduler/cache_lookup_scheduler.rs
Expand Up @@ -17,8 +17,8 @@ use std::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use drop_guard::guard;
use futures::stream::StreamExt;
use scopeguard::guard;
use tokio::select;
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
Expand Down Expand Up @@ -158,14 +158,14 @@ impl ActionScheduler for CacheLookupScheduler {
});
let (tx, rx) = watch::channel(current_state.clone());
let tx = Arc::new(tx);
let drop_guard = {
let scope_guard = {
let mut cache_check_actions = self.cache_check_actions.lock();
// Check this isn't a duplicate request first.
if let Some(rx) = subscribe_to_existing_action(&cache_check_actions, &action_info.unique_qualifier) {
return Ok(rx);
}
cache_check_actions.insert(action_info.unique_qualifier.clone(), tx.clone());
// In the event we loose the reference to our `drop_guard`, it will remove
// In the event we loose the reference to our `scope_guard`, it will remove
// the action from the cache_check_actions map.
let cache_check_actions = self.cache_check_actions.clone();
let unique_qualifier = action_info.unique_qualifier.clone();
Expand All @@ -180,7 +180,7 @@ impl ActionScheduler for CacheLookupScheduler {
// We need this spawn because we are returning a stream and this spawn will populate the stream's data.
tokio::spawn(async move {
// If our spawn ever dies, we will remove the action from the cache_check_actions map.
let _drop_guard = drop_guard;
let _scope_guard = scope_guard;

// Perform cache check.
let action_digest = current_state.action_digest();
Expand Down
1 change: 1 addition & 0 deletions cas/worker/BUILD
Expand Up @@ -47,6 +47,7 @@ rust_library(
"@crate_index//:parking_lot",
"@crate_index//:prost",
"@crate_index//:relative-path",
"@crate_index//:scopeguard",
"@crate_index//:tokio",
"@crate_index//:tokio-stream",
"@crate_index//:tonic",
Expand Down
4 changes: 1 addition & 3 deletions cas/worker/local_worker.rs
Expand Up @@ -213,9 +213,7 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a,
let actions_in_transit = self.actions_in_transit.clone();
let start_action_fut = self.metrics.clone().wrap(move |metrics| async move {
metrics.preconditions.wrap(preconditions_met(precondition_script_cfg))
.and_then(|_| async move {
running_actions_manager.create_and_add_action(worker_id_clone, start_execute).await
})
.and_then(|_| running_actions_manager.create_and_add_action(worker_id_clone, start_execute))
.map(|r| {
// Now that we either failed or registered our action, we can
// consider the action to no longer be in transit.
Expand Down
16 changes: 13 additions & 3 deletions cas/worker/running_actions_manager.rs
Expand Up @@ -36,6 +36,7 @@ use metrics_utils::{AsyncCounterWrapper, CollectorState, CounterWithTime, Metric
use parking_lot::Mutex;
use prost::Message;
use relative_path::RelativePath;
use scopeguard::{guard, ScopeGuard};
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::process;
use tokio::sync::{oneshot, watch};
Expand Down Expand Up @@ -655,6 +656,13 @@ impl RunningActionImpl {
.take()
.err_tip(|| "Expected stderr to exist on command this should never happen")?;

let mut child_process_guard = guard(child_process, |mut child_process| {
log::error!(
"Child process was not cleaned up before dropping the call to execute(), killing in background spawn."
);
tokio::spawn(async move { child_process.kill().await });
});

let all_stdout_fut = JoinHandleDropGuard::new(tokio::spawn(async move {
let mut all_stdout = BytesMut::new();
loop {
Expand Down Expand Up @@ -690,7 +698,7 @@ impl RunningActionImpl {
_ = &mut sleep_fut => {
self.running_actions_manager.metrics.task_timeouts.inc();
killed_action = true;
if let Err(e) = child_process.start_kill() {
if let Err(e) = child_process_guard.start_kill() {
log::error!("Could not kill process in RunningActionsManager for timeout : {:?}", e);
}
{
Expand All @@ -705,7 +713,9 @@ impl RunningActionImpl {
)));
}
},
maybe_exit_status = child_process.wait() => {
maybe_exit_status = child_process_guard.wait() => {
// Defuse our guard so it does not try to cleanup and make nessless logs.
drop(ScopeGuard::<_, _>::into_inner(child_process_guard));
let exit_status = maybe_exit_status.err_tip(|| "Failed to collect exit code of process")?;
// TODO(allada) We should implement stderr/stdout streaming to client here.
// If we get killed before the stream is started, then these will lock up.
Expand Down Expand Up @@ -746,7 +756,7 @@ impl RunningActionImpl {
},
_ = &mut kill_channel_rx => {
killed_action = true;
if let Err(e) = child_process.start_kill() {
if let Err(e) = child_process_guard.start_kill() {
log::error!(
"Could not kill process in RunningActionsManager for action {} : {:?}",
hex::encode(self.action_id),
Expand Down
2 changes: 1 addition & 1 deletion gencargo/cache_lookup_scheduler/Cargo.toml
Expand Up @@ -20,9 +20,9 @@ doctest = false

[dependencies]
async-trait = { workspace = true }
drop_guard = { workspace = true }
futures = { workspace = true }
parking_lot = { workspace = true }
scopeguard = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion gencargo/cas/Cargo.toml
Expand Up @@ -22,13 +22,13 @@ doctest = false
async-lock = { workspace = true }
axum = { workspace = true }
clap = { workspace = true }
drop_guard = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
hyper = { workspace = true }
json5 = { workspace = true }
parking_lot = { workspace = true }
prometheus-client = { workspace = true }
scopeguard = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
tower = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions gencargo/running_actions_manager/Cargo.toml
Expand Up @@ -27,6 +27,7 @@ hex = { workspace = true }
parking_lot = { workspace = true }
prost = { workspace = true }
relative-path = { workspace = true }
scopeguard = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions tools/cargo_shared.bzl
Expand Up @@ -133,8 +133,8 @@ PACKAGES = {
"blake3": {
"version": "1.4.1",
},
"drop_guard": {
"version": "0.3.0",
"scopeguard": {
"version": "1.2.0",
},
"stdext": {
"version": "0.3.1",
Expand Down

0 comments on commit 4bd986c

Please sign in to comment.