Skip to content

Commit

Permalink
Jobs now honor timeout in action
Browse files Browse the repository at this point in the history
fixes: #211
  • Loading branch information
allada committed Jul 26, 2023
1 parent 875b3ca commit fdc6d9b
Show file tree
Hide file tree
Showing 12 changed files with 639 additions and 162 deletions.
73 changes: 42 additions & 31 deletions cas/scheduler/action_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,23 @@ pub struct ExecutionMetadata {
pub output_upload_completed_timestamp: SystemTime,
}

impl Default for ExecutionMetadata {
fn default() -> Self {
Self {
worker: "".to_string(),
queued_timestamp: SystemTime::UNIX_EPOCH,
worker_start_timestamp: SystemTime::UNIX_EPOCH,
worker_completed_timestamp: SystemTime::UNIX_EPOCH,
input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
execution_start_timestamp: SystemTime::UNIX_EPOCH,
execution_completed_timestamp: SystemTime::UNIX_EPOCH,
output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
}
}
}

impl From<ExecutionMetadata> for ExecutedActionMetadata {
fn from(val: ExecutionMetadata) -> Self {
Self {
Expand Down Expand Up @@ -558,6 +575,7 @@ pub struct ActionResult {
pub stderr_digest: DigestInfo,
pub execution_metadata: ExecutionMetadata,
pub server_logs: HashMap<String, DigestInfo>,
pub error: Option<Error>,
}

impl Default for ActionResult {
Expand All @@ -583,12 +601,16 @@ impl Default for ActionResult {
output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
},
server_logs: Default::default(),
error: None,
}
}
}

// TODO(allada) Remove the need for clippy argument by making the ActionResult and ProtoActionResult
// a Box.
/// The execution status/stage. This should match `ExecutionStage::Value` in `remote_execution.proto`.
#[derive(PartialEq, Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum ActionStage {
/// Stage is unknown.
Unknown,
Expand All @@ -604,19 +626,13 @@ pub enum ActionStage {
Completed(ActionResult),
/// Result was found from cache, don't decode the proto just to re-encode it.
CompletedFromCache(ProtoActionResult),
/// Error or action failed with an exit code on the worker.
/// This means that the job might have finished executing, but the worker had an
/// internal error. This might have happened if the worker timed out, crashed,
/// action cleanup failure, out of memory or other kind of errors that are not
/// related to the action, but rather the environment.
Error((Error, ActionResult)),
}

impl ActionStage {
pub const fn has_action_result(&self) -> bool {
match self {
Self::Unknown | Self::CacheCheck | Self::Queued | Self::Executing => false,
Self::Completed(_) | Self::CompletedFromCache(_) | Self::Error(_) => true,
Self::Completed(_) | Self::CompletedFromCache(_) => true,
}
}

Expand All @@ -635,7 +651,7 @@ impl From<&ActionStage> for execution_stage::Value {
ActionStage::CacheCheck => Self::CacheCheck,
ActionStage::Queued => Self::Queued,
ActionStage::Executing => Self::Executing,
ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) | ActionStage::Error(_) => Self::Completed,
ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => Self::Completed,
}
}
}
Expand Down Expand Up @@ -664,21 +680,16 @@ impl From<ActionStage> for ExecuteResponse {
ActionStage::Unknown | ActionStage::CacheCheck | ActionStage::Queued | ActionStage::Executing => {
Self::default()
}

ActionStage::Completed(action_result) => Self {
server_logs: logs_from(action_result.server_logs.clone()),
result: Some(action_result.into()),
cached_result: false,
status: Some(Status::default()),
message: RESPONSE_MESSAGE.to_string(),
},
ActionStage::Error((error, action_result)) => Self {
server_logs: logs_from(action_result.server_logs.clone()),
result: Some(action_result.into()),
cached_result: false,
status: Some(error.into()),
message: RESPONSE_MESSAGE.to_string(),
},
ActionStage::Completed(action_result) => {
let status = Some(action_result.error.clone().map_or_else(Status::default, |v| v.into()));
Self {
server_logs: logs_from(action_result.server_logs.clone()),
result: Some(action_result.into()),
cached_result: false,
status,
message: RESPONSE_MESSAGE.to_string(),
}
}
// Handled separately as there are no server logs and the action
// result is already in Proto format.
ActionStage::CompletedFromCache(proto_action_result) => Self {
Expand Down Expand Up @@ -748,15 +759,12 @@ impl TryFrom<ExecuteResponse> for ActionStage {
.err_tip(|| "Expected digest to be set on LogFile msg")?
.try_into()
})?,
error: execute_response
.status
.clone()
.and_then(|v| if v.code == 0 { None } else { Some(v.into()) }),
};

let status = execute_response
.status
.err_tip(|| "Expected status to be set on ExecuteResponse")?;
if status.code != tonic::Code::Ok as i32 {
return Ok(Self::Error((status.into(), action_result)));
}

if execute_response.cached_result {
return Ok(Self::CompletedFromCache(action_result.into()));
}
Expand Down Expand Up @@ -826,7 +834,10 @@ impl TryFrom<Operation> for ActionState {
.result
.err_tip(|| "No result data for completed upstream action")?;
match execute_response {
LongRunningResult::Error(error) => ActionStage::Error((error.into(), ActionResult::default())),
LongRunningResult::Error(error) => ActionStage::Completed(ActionResult {
error: Some(error.into()),
..ActionResult::default()
}),
LongRunningResult::Response(response) => {
// Could be Completed, CompletedFromCache or Error.
from_any::<ExecuteResponse>(&response)
Expand Down
5 changes: 4 additions & 1 deletion cas/scheduler/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,10 @@ impl ActionScheduler for CacheLookupScheduler {
}
}
Err(err) => {
Arc::make_mut(&mut current_state).stage = ActionStage::Error((err, ActionResult::default()));
Arc::make_mut(&mut current_state).stage = ActionStage::Completed(ActionResult {
error: Some(err),
..Default::default()
});
let _ = tx.send(current_state);
}
}
Expand Down
80 changes: 43 additions & 37 deletions cas/scheduler/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::sync::{watch, Notify};
use tokio::task::JoinHandle;
use tokio::time::Duration;

use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState};
use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata};
use common::log;
use error::{error_if, make_err, make_input_err, Code, Error, ResultExt};
use platform_property_manager::PlatformPropertyManager;
Expand Down Expand Up @@ -308,22 +308,22 @@ impl SimpleSchedulerImpl {
.map(Self::subscribe_to_channel)
}

fn retry_action(&mut self, action_info: &Arc<ActionInfo>, worker_id: &WorkerId) {
fn retry_action(&mut self, action_info: &Arc<ActionInfo>, worker_id: &WorkerId, err: Error) {
match self.active_actions.remove(action_info) {
Some(running_action) => {
let mut awaited_action = running_action.action;
let send_result = if awaited_action.attempts >= self.max_job_retries {
let mut default_action_result = ActionResult::default();
default_action_result.execution_metadata.worker = format!("{worker_id}");
Arc::make_mut(&mut awaited_action.current_state).stage = ActionStage::Error((
awaited_action.last_error.unwrap_or_else(|| {
make_err!(
Code::Internal,
"Job cancelled because it attempted to execute too many times and failed"
)
}),
default_action_result,
));
Arc::make_mut(&mut awaited_action.current_state).stage = ActionStage::Completed(ActionResult {
execution_metadata: ExecutionMetadata {
worker: format!("{worker_id}"),
..ExecutionMetadata::default()
},
error: Some(err.merge(make_err!(
Code::Internal,
"Job cancelled because it attempted to execute too many times and failed"
))),
..ActionResult::default()
});
awaited_action.notify_channel.send(awaited_action.current_state.clone())
// Do not put the action back in the queue here, as this action attempted to run too many
// times.
Expand Down Expand Up @@ -352,14 +352,14 @@ impl SimpleSchedulerImpl {
}

/// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
fn immediate_evict_worker(&mut self, worker_id: &WorkerId) {
fn immediate_evict_worker(&mut self, worker_id: &WorkerId, err: Error) {
if let Some(mut worker) = self.workers.remove_worker(worker_id) {
// We don't care if we fail to send message to worker, this is only a best attempt.
let _ = worker.notify_update(WorkerUpdate::Disconnect);
// We create a temporary Vec to avoid doubt about a possible code
// path touching the worker.running_action_infos elsewhere.
for action_info in worker.running_action_infos.drain() {
self.retry_action(&action_info, worker_id);
self.retry_action(&action_info, worker_id, err.clone());
}
}
// Note: Calling this many time is very cheap, it'll only trigger `do_try_match` once.
Expand Down Expand Up @@ -396,8 +396,9 @@ impl SimpleSchedulerImpl {
let notify_worker_result = worker.notify_update(WorkerUpdate::RunAction(action_info.clone()));
if notify_worker_result.is_err() {
// Remove worker, as it is no longer receiving messages and let it try to find another worker.
log::warn!("Worker command failed, removing worker {}", worker_id);
self.immediate_evict_worker(&worker_id);
let err = make_err!(Code::Internal, "Worker command failed, removing worker {}", worker_id);
log::warn!("{:?}", err);
self.immediate_evict_worker(&worker_id, err);
return;
}

Expand Down Expand Up @@ -449,7 +450,7 @@ impl SimpleSchedulerImpl {
if running_action.worker_id == *worker_id {
// Don't set the error on an action that's running somewhere else.
log::warn!("Internal error for worker {}: {}", worker_id, err);
running_action.action.last_error = Some(err);
running_action.action.last_error = Some(err.clone());
} else {
log::error!(
"Got a result from a worker that should not be running the action, Removing worker. Expected worker {} got worker {}",
Expand All @@ -473,7 +474,7 @@ impl SimpleSchedulerImpl {
}

// Re-queue the action or fail on max attempts.
self.retry_action(&action_info, worker_id);
self.retry_action(&action_info, worker_id, err);
}

fn update_action(
Expand All @@ -483,10 +484,13 @@ impl SimpleSchedulerImpl {
action_stage: ActionStage,
) -> Result<(), Error> {
if !action_stage.has_action_result() {
let msg = format!("Worker '{worker_id}' set the action_stage of running action {action_info_hash_key:?} to {action_stage:?}. Removing worker.");
log::error!("{}", msg);
self.immediate_evict_worker(worker_id);
return Err(make_input_err!("{}", msg));
let err = make_err!(
Code::Internal,
"Worker '{worker_id}' set the action_stage of running action {action_info_hash_key:?} to {action_stage:?}. Removing worker.",
);
log::error!("{:?}", err);
self.immediate_evict_worker(worker_id, err.clone());
return Err(err);
}

let (action_info, mut running_action) = self
Expand All @@ -495,18 +499,16 @@ impl SimpleSchedulerImpl {
.err_tip(|| format!("Could not find action info in active actions : {action_info_hash_key:?}"))?;

if running_action.worker_id != *worker_id {
let msg = format!(
"Got a result from a worker that should not be running the action, {}",
format_args!(
"Removing worker. Expected worker {} got worker {}",
running_action.worker_id, worker_id
)
let err = make_err!(
Code::Internal,
"Got a result from a worker that should not be running the action, Removing worker. Expected worker {} got worker {worker_id}",
running_action.worker_id,
);
log::error!("{}", msg);
log::error!("{:?}", err);
// First put it back in our active_actions or we will drop the task.
self.active_actions.insert(action_info, running_action);
self.immediate_evict_worker(worker_id);
return Err(make_input_err!("{}", msg));
self.immediate_evict_worker(worker_id, err.clone());
return Err(err);
}

Arc::make_mut(&mut running_action.action.current_state).stage = action_stage;
Expand Down Expand Up @@ -694,8 +696,8 @@ impl WorkerScheduler for SimpleScheduler {
.workers
.add_worker(worker)
.err_tip(|| "Error while adding worker, removing from pool");
if res.is_err() {
inner.immediate_evict_worker(&worker_id);
if let Err(err) = &res {
inner.immediate_evict_worker(&worker_id, err.clone());
}
inner.tasks_or_workers_change_notify.notify_one();
res
Expand Down Expand Up @@ -731,7 +733,10 @@ impl WorkerScheduler for SimpleScheduler {

async fn remove_worker(&self, worker_id: WorkerId) {
let mut inner = self.inner.lock();
inner.immediate_evict_worker(&worker_id);
inner.immediate_evict_worker(
&worker_id,
make_err!(Code::Internal, "Received request to remove worker"),
);
}

async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
Expand All @@ -752,8 +757,9 @@ impl WorkerScheduler for SimpleScheduler {
})
.collect();
for worker_id in &worker_ids_to_remove {
log::warn!("Worker {} timed out, removing from pool", worker_id);
inner.immediate_evict_worker(worker_id);
let err = make_err!(Code::Internal, "Worker {worker_id} timed out, removing from pool",);
log::warn!("{:?}", err);
inner.immediate_evict_worker(worker_id, err);
}
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions cas/scheduler/tests/action_messages_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ mod action_messages_tests {
output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
},
server_logs: HashMap::default(),
error: None,
})
.into();

Expand Down
Loading

0 comments on commit fdc6d9b

Please sign in to comment.