Skip to content

Commit

Permalink
Fix potential race condition if worker disconnects
Browse files Browse the repository at this point in the history
In the event the worker disconnects from the scheduler, we now
wait until all actions are killed before we attempt to reconnect
to the scheduler. This prevents a race condition where the
scheduler does not know about an action the worker is still
running, which could lead the scheduler to send the worker the
same action before the worker has cleaned it up.

fixes #246
  • Loading branch information
allada committed Sep 11, 2023
1 parent c8e2ee8 commit b871a90
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 17 deletions.
51 changes: 46 additions & 5 deletions cas/worker/local_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::pin::Pin;
use std::process::Stdio;
use std::str;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use std::time::Duration;

Expand Down Expand Up @@ -42,6 +43,10 @@ use store::Store;
use worker_api_client_wrapper::{WorkerApiClientTrait, WorkerApiClientWrapper};
use worker_utils::make_supported_properties;

/// Maximum total time, in seconds, to wait for the number of in-transit
/// actions to reach zero before reporting an error.
const ACTIONS_IN_TRANSIT_TIMEOUT_S: f32 = 10.;

/// If we lose connection to the worker api server we will wait this many seconds
/// before trying to connect.
const CONNECTION_RETRY_DELAY_S: f32 = 0.5;
Expand All @@ -60,6 +65,11 @@ struct LocalWorkerImpl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> {
grpc_client: T,
worker_id: String,
running_actions_manager: Arc<U>,
// Number of actions that have been received in `Update::StartAction`, but
// not yet processed by running_actions_manager's spawn. This number should
// always be zero if there are no actions running and no actions being waited
// on by the scheduler.
actions_in_transit: Arc<AtomicU64>,
metrics: Arc<Metrics>,
}

Expand Down Expand Up @@ -108,6 +118,11 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a,
grpc_client,
worker_id,
running_actions_manager,
// Number of actions that have been received in `Update::StartAction`, but
// not yet processed by running_actions_manager's spawn. This number should
// always be zero if there are no actions running and no actions being waited
// on by the scheduler.
actions_in_transit: Arc::new(AtomicU64::new(0)),
metrics,
}
}
Expand Down Expand Up @@ -194,11 +209,18 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a,
let running_actions_manager = self.running_actions_manager.clone();
let worker_id_clone = worker_id.clone();
let precondition_script_cfg = self.config.precondition_script.clone();
let actions_in_transit = self.actions_in_transit.clone();
let start_action_fut = self.metrics.clone().wrap(move |metrics| async move {
metrics.preconditions.wrap(preconditions_met(precondition_script_cfg))
.and_then(|_| async move {
running_actions_manager.create_and_add_action(worker_id_clone, start_execute).await
})
.map(|r| {
// Now that we either failed or registered our action, we can
// consider the action to no longer be in transit.
actions_in_transit.fetch_sub(1, Ordering::Release);
r
})
.and_then(|action|
action
.clone()
Expand Down Expand Up @@ -254,24 +276,25 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a,
Ok(())
};

let mapped_fut = tokio::spawn(start_action_fut)
.map(move |res| {
self.actions_in_transit.fetch_add(1, Ordering::Release);
futures.push(
tokio::spawn(start_action_fut).map(move |res| {
let res = res.err_tip(|| "Failed to launch spawn")?;
add_future_channel
.send(make_publish_future(res).boxed())
.map_err(|_| make_err!(Code::Internal, "LocalWorker could not send future"))?;
Ok(())
})
.boxed();
futures.push(mapped_fut);
.boxed()
);
}
};
},
res = add_future_rx.next() => {
let fut = res.err_tip(|| "New future stream receives should never be closed")?;
futures.push(fut);
},
res = futures.next() => res.err_tip(|| "Keep-alive should always pending. This is an internal error")??,
res = futures.next() => res.err_tip(|| "Keep-alive should always pending. Likely unable to send data to scheduler")??,
};
}
// Unreachable.
Expand Down Expand Up @@ -450,6 +473,24 @@ impl<T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorker<T, U> {

// Now listen for connections and run all other services.
if let Err(e) = inner.run(update_for_worker_stream).await {
'no_more_actions: {
// Ensure there are no actions in transit before we try to kill
// all our actions.
const ITERATIONS: usize = 1_000;
let sleep_duration = ACTIONS_IN_TRANSIT_TIMEOUT_S / ITERATIONS as f32;
for _ in 0..ITERATIONS {
if inner.actions_in_transit.load(Ordering::Acquire) == 0 {
break 'no_more_actions;
}
(sleep_fn_pin)(Duration::from_secs_f32(sleep_duration)).await;
}
let e = make_err!(
Code::Internal,
"Actions in transit did not reach zero before we disconnected from the scheduler."
);
log::error!("{e:?}");
return Err(e);
}
// Kill off any existing actions because if we re-connect, we'll
// get some more and it might resource lock us.
self.running_actions_manager.kill_all().await;
Expand Down
59 changes: 49 additions & 10 deletions cas/worker/running_actions_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use prost::Message;
use relative_path::RelativePath;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::process;
use tokio::sync::oneshot;
use tokio::sync::{oneshot, watch};
use tokio::task::spawn_blocking;
use tokio::time::timeout;
use tokio_stream::wrappers::ReadDirStream;
Expand Down Expand Up @@ -602,14 +602,22 @@ impl RunningActionImpl {
#[cfg(target_family = "windows")]
let envs = {
let mut envs = command_proto.environment_variables.clone();
if envs.iter().any(|v| v.name == "SystemRoot") {
if !envs.iter().any(|v| v.name.to_uppercase() == "SYSTEMROOT") {
envs.push(
proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
name: "SystemRoot".to_string(),
value: "C:\\Windows".to_string(),
},
);
}
if !envs.iter().any(|v| v.name.to_uppercase() == "PATH") {
envs.push(
proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
name: "PATH".to_string(),
value: "C:\\Windows\\System32".to_string(),
},
);
}
envs
};
for environment_variable in envs {
Expand Down Expand Up @@ -720,9 +728,15 @@ impl RunningActionImpl {
_ = &mut kill_channel_rx => {
killed_action = true;
if let Err(e) = child_process.start_kill() {
log::error!("Could not kill process in RunningActionsManager : {:?}", e);
log::error!(
"Could not kill process in RunningActionsManager for action {} : {:?}",
hex::encode(self.action_id),
e);
} else {
log::error!("Could not get child process id, maybe already dead?");
log::error!(
"Could not get child process id, maybe already dead? for action {}",
hex::encode(self.action_id)
);
}
{
let mut state = self.state.lock();
Expand Down Expand Up @@ -944,9 +958,17 @@ impl RunningActionImpl {
.err_tip(|| format!("Could not remove working directory {}", self.work_directory));
self.did_cleanup.store(true, Ordering::Relaxed);
if let Err(e) = self.running_actions_manager.cleanup_action(&self.action_id) {
log::error!("Error cleaning up action: {e:?}");
return Result::<Arc<Self>, Error>::Err(e).merge(remove_dir_result.map(|_| self));
}
remove_dir_result.map(|_| self)
if let Err(e) = remove_dir_result {
log::error!(
"Error removing working for action {} directory: {e:?}",
hex::encode(self.action_id)
);
return Err(e);
}
Ok(self)
}

async fn inner_get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> {
Expand Down Expand Up @@ -1043,6 +1065,9 @@ pub struct RunningActionsManagerImpl {
upload_strategy: UploadCacheResultsStrategy,
max_action_timeout: Duration,
running_actions: Mutex<HashMap<ActionId, Weak<RunningActionImpl>>>,
// Note: We don't use Notify because we need to support a .wait_for()-like function, which
// Notify does not support.
action_done_tx: watch::Sender<()>,
callbacks: Callbacks,
metrics: Arc<Metrics>,
}
Expand All @@ -1065,6 +1090,7 @@ impl RunningActionsManagerImpl {
.downcast_ref::<Arc<FilesystemStore>>()
.err_tip(|| "Expected FilesystemStore store for .fast_store() in RunningActionsManagerImpl")?
.clone();
let (action_done_tx, _) = watch::channel(());
Ok(Self {
root_work_directory,
entrypoint_cmd,
Expand All @@ -1074,6 +1100,7 @@ impl RunningActionsManagerImpl {
upload_strategy,
max_action_timeout,
running_actions: Mutex::new(HashMap::new()),
action_done_tx,
callbacks,
metrics: Arc::new(Metrics::default()),
})
Expand Down Expand Up @@ -1143,21 +1170,24 @@ impl RunningActionsManagerImpl {

fn cleanup_action(&self, action_id: &ActionId) -> Result<(), Error> {
let mut running_actions = self.running_actions.lock();
running_actions
let result = running_actions
.remove(action_id)
.err_tip(|| format!("Expected action id '{action_id:?}' to exist in RunningActionsManagerImpl"))?;
Ok(())
.err_tip(|| format!("Expected action id '{action_id:?}' to exist in RunningActionsManagerImpl"));
// No need to copy anything, we are just telling the receivers an event happened.
self.action_done_tx.send_modify(|_| {});
result.map(|_| ())
}

// Note: We do not capture metrics on this call, only `.kill_all()`.
// Important: When the future returns the process may still be running.
async fn kill_action(action: Arc<RunningActionImpl>) {
let kill_channel_tx = {
let mut action_state = action.state.lock();
action_state.kill_channel_tx.take()
};
if let Some(kill_channel_tx) = kill_channel_tx {
if kill_channel_tx.send(()).is_err() {
log::error!("Error sending kill to running action");
log::error!("Error sending kill to running action {}", hex::encode(action.action_id));
}
}
}
Expand Down Expand Up @@ -1278,6 +1308,7 @@ impl RunningActionsManager for RunningActionsManagerImpl {
.await
}

// Note: When the future returns the process should be fully killed and cleaned up.
async fn kill_all(&self) {
self.metrics
.kill_all
Expand All @@ -1293,7 +1324,15 @@ impl RunningActionsManager for RunningActionsManagerImpl {
Self::kill_action(action).await;
}
})
.await
.await;
// Ignore error. If error happens it means there's no sender, which is not a problem.
// Note: Sanity check this API will always check current value then future values:
// https://play.rust-lang.org/?version=stable&edition=2021&gist=23103652cc1276a97e5f9938da87fdb2
let _ = self
.action_done_tx
.subscribe()
.wait_for(|_| self.running_actions.lock().is_empty())
.await;
}

#[inline]
Expand Down
Loading

0 comments on commit b871a90

Please sign in to comment.