Skip to content

Commit

Permalink
Scheduler will retry on internal errors
Browse files Browse the repository at this point in the history
Internal errors from the worker (as opposed to exit-code errors) are now
properly forwarded up to the client, but only after the scheduler has
retried the job a configurable number of times.
  • Loading branch information
allada committed Jun 21, 2022
1 parent 7325ffb commit 2be02e2
Show file tree
Hide file tree
Showing 10 changed files with 377 additions and 149 deletions.
116 changes: 57 additions & 59 deletions cas/grpc_service/tests/worker_api_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use proto::build::bazel::remote::execution::v2::{
OutputSymlink,
};
use proto::com::github::allada::turbo_cache::remote_execution::{
execute_result, update_for_worker, worker_api_server::WorkerApi, ExecuteFinishedResult, ExecuteResult,
KeepAliveRequest, SupportedProperties,
execute_result, update_for_worker, worker_api_server::WorkerApi, ExecuteResult, KeepAliveRequest,
SupportedProperties,
};
use proto::google::rpc::Status as ProtoStatus;
use scheduler::Scheduler;
Expand Down Expand Up @@ -287,62 +287,60 @@ pub mod execution_response_tests {
},
);
let result = ExecuteResult {
response: Some(execute_result::Response::Result(ExecuteFinishedResult {
worker_id: test_context.worker_id.to_string(),
action_digest: Some(action_digest.clone().into()),
salt: SALT,
execute_response: Some(ExecuteResponse {
result: Some(ProtoActionResult {
output_files: vec![OutputFile {
path: "some path1".to_string(),
digest: Some(DigestInfo::new([8u8; 32], 124).into()),
is_executable: true,
contents: Default::default(), // We don't implement this.
node_properties: None,
}],
output_file_symlinks: vec![OutputSymlink {
path: "some path3".to_string(),
target: "some target3".to_string(),
node_properties: None,
}],
output_symlinks: vec![OutputSymlink {
path: "some path3".to_string(),
target: "some target3".to_string(),
node_properties: None,
}],
output_directories: vec![OutputDirectory {
path: "some path4".to_string(),
tree_digest: Some(DigestInfo::new([12u8; 32], 124).into()),
}],
output_directory_symlinks: Default::default(), // Bazel deprecated this.
exit_code: 5,
stdout_raw: Default::default(), // We don't implement this.
stdout_digest: Some(DigestInfo::new([10u8; 32], 124).into()),
stderr_raw: Default::default(), // We don't implement this.
stderr_digest: Some(DigestInfo::new([11u8; 32], 124).into()),
execution_metadata: Some(ExecutedActionMetadata {
worker: test_context.worker_id.to_string(),
queued_timestamp: Some(make_system_time(1).into()),
worker_start_timestamp: Some(make_system_time(2).into()),
worker_completed_timestamp: Some(make_system_time(3).into()),
input_fetch_start_timestamp: Some(make_system_time(4).into()),
input_fetch_completed_timestamp: Some(make_system_time(5).into()),
execution_start_timestamp: Some(make_system_time(6).into()),
execution_completed_timestamp: Some(make_system_time(7).into()),
output_upload_start_timestamp: Some(make_system_time(8).into()),
output_upload_completed_timestamp: Some(make_system_time(9).into()),
auxiliary_metadata: vec![],
}),
}),
cached_result: false,
status: Some(ProtoStatus {
code: 9,
message: "foo".to_string(),
details: Default::default(),
worker_id: test_context.worker_id.to_string(),
action_digest: Some(action_digest.clone().into()),
salt: SALT,
result: Some(execute_result::Result::ExecuteResponse(ExecuteResponse {
result: Some(ProtoActionResult {
output_files: vec![OutputFile {
path: "some path1".to_string(),
digest: Some(DigestInfo::new([8u8; 32], 124).into()),
is_executable: true,
contents: Default::default(), // We don't implement this.
node_properties: None,
}],
output_file_symlinks: vec![OutputSymlink {
path: "some path3".to_string(),
target: "some target3".to_string(),
node_properties: None,
}],
output_symlinks: vec![OutputSymlink {
path: "some path3".to_string(),
target: "some target3".to_string(),
node_properties: None,
}],
output_directories: vec![OutputDirectory {
path: "some path4".to_string(),
tree_digest: Some(DigestInfo::new([12u8; 32], 124).into()),
}],
output_directory_symlinks: Default::default(), // Bazel deprecated this.
exit_code: 5,
stdout_raw: Default::default(), // We don't implement this.
stdout_digest: Some(DigestInfo::new([10u8; 32], 124).into()),
stderr_raw: Default::default(), // We don't implement this.
stderr_digest: Some(DigestInfo::new([11u8; 32], 124).into()),
execution_metadata: Some(ExecutedActionMetadata {
worker: test_context.worker_id.to_string(),
queued_timestamp: Some(make_system_time(1).into()),
worker_start_timestamp: Some(make_system_time(2).into()),
worker_completed_timestamp: Some(make_system_time(3).into()),
input_fetch_start_timestamp: Some(make_system_time(4).into()),
input_fetch_completed_timestamp: Some(make_system_time(5).into()),
execution_start_timestamp: Some(make_system_time(6).into()),
execution_completed_timestamp: Some(make_system_time(7).into()),
output_upload_start_timestamp: Some(make_system_time(8).into()),
output_upload_completed_timestamp: Some(make_system_time(9).into()),
auxiliary_metadata: vec![],
}),
server_logs: server_logs,
message: "TODO(blaise.bruer) We should put a reference something like bb_browser".to_string(),
}),
cached_result: false,
status: Some(ProtoStatus {
code: 9,
message: "foo".to_string(),
details: Default::default(),
}),
server_logs: server_logs,
message: "TODO(blaise.bruer) We should put a reference something like bb_browser".to_string(),
})),
};
{
Expand All @@ -362,10 +360,10 @@ pub mod execution_response_tests {
// Check the result that the client would have received.
client_action_state_receiver.changed().await?;
let client_given_state = client_action_state_receiver.borrow();
let execute_response = if let execute_result::Response::Result(v) = result.response.unwrap() {
v.execute_response.unwrap()
let execute_response = if let execute_result::Result::ExecuteResponse(v) = result.result.unwrap() {
v
} else {
panic!("Expected type to be Result");
panic!("Expected type to be ExecuteResponse");
};

assert_eq!(client_given_state.stage, execute_response.clone().try_into()?);
Expand Down
42 changes: 23 additions & 19 deletions cas/grpc_service/worker_api_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,31 +134,35 @@ impl WorkerApiServer {
}

async fn inner_execution_response(&self, execute_result: ExecuteResult) -> Result<Response<()>, Error> {
let finished_result = match execute_result
.response
.err_tip(|| "Expected result to exist in ExecuteResult")?
{
execute_result::Response::Result(finished_result) => finished_result,
execute_result::Response::InternalError(e) => return Err(e.into()),
};
let worker_id: WorkerId = finished_result.worker_id.try_into()?;
let action_digest: DigestInfo = finished_result
let worker_id: WorkerId = execute_result.worker_id.try_into()?;
let action_digest: DigestInfo = execute_result
.action_digest
.err_tip(|| "Expected action_digest to exist")?
.try_into()?;
let action_info_hash_key = ActionInfoHashKey {
digest: action_digest.clone(),
salt: finished_result.salt,
salt: execute_result.salt,
};
let action_stage = finished_result
.execute_response
.err_tip(|| "Expected execute_response to exist in ExecuteResult")?
.try_into()
.err_tip(|| "Failed to convert ExecuteResponse into an ActionStage")?;
self.scheduler
.update_action(&worker_id, &action_info_hash_key, action_stage)
.await
.err_tip(|| format!("Failed to update_action {:?}", action_digest))?;

match execute_result
.result
.err_tip(|| "Expected result to exist in ExecuteResult")?
{
execute_result::Result::ExecuteResponse(finished_result) => {
let action_stage = finished_result
.try_into()
.err_tip(|| "Failed to convert ExecuteResponse into an ActionStage")?;
self.scheduler
.update_action(&worker_id, &action_info_hash_key, action_stage)
.await
.err_tip(|| format!("Failed to update_action {:?}", action_digest))?;
}
execute_result::Result::InternalError(e) => {
self.scheduler
.update_worker_with_internal_error(&worker_id, &action_info_hash_key, e.into())
.await;
}
}
Ok(Response::new(()))
}
}
Expand Down
121 changes: 114 additions & 7 deletions cas/scheduler/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,42 @@
use std::cmp;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::SystemTime;

use fast_async_mutex::mutex::Mutex;
use lru::LruCache;
use rand::{thread_rng, Rng};
use tokio::sync::watch;

use action_messages::{ActionInfo, ActionInfoHashKey, ActionStage, ActionState};
use common::log;
use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata};
use common::{log, DigestInfo};
use config::cas_server::SchedulerConfig;
use error::{error_if, make_input_err, Error, ResultExt};
use error::{error_if, make_err, make_input_err, Code, Error, ResultExt};
use platform_property_manager::PlatformPropertyManager;
use worker::{Worker, WorkerId, WorkerTimestamp, WorkerUpdate};

/// Default timeout for workers in seconds.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;

/// Default number of times a job may be retried before failing.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_MAX_JOB_RETRIES: usize = 3;

/// Exit code sent if there is an internal error.
pub const INTERNAL_ERROR_EXIT_CODE: i32 = -178;

/// An action that is being awaited on and last known state.
struct AwaitedAction {
action_info: Arc<ActionInfo>,
current_state: Arc<ActionState>,
notify_channel: watch::Sender<Arc<ActionState>>,

/// Number of times this job has been attempted.
attempts: usize,
/// Last error reported by a worker, if any. If this is `None` while `attempts` is
/// non-zero, the failure was likely caused by something like a worker timeout.
last_error: Option<Error>,
}

/// Holds the relationship of a worker that is executing a specific action.
Expand Down Expand Up @@ -125,6 +140,8 @@ struct SchedulerImpl {
active_actions: HashMap<Arc<ActionInfo>, RunningAction>,
/// Timeout of how long to evict workers if no response in this given amount of time in seconds.
worker_timeout_s: u64,
/// Maximum number of times a job may be retried before failing.
max_job_retries: usize,
}

impl SchedulerImpl {
Expand Down Expand Up @@ -195,6 +212,8 @@ impl SchedulerImpl {
action_info,
current_state,
notify_channel: tx,
attempts: 0,
last_error: None,
},
);

Expand All @@ -212,10 +231,48 @@ impl SchedulerImpl {
match self.active_actions.remove(&action_info) {
Some(running_action) => {
let mut awaited_action = running_action.action;
Arc::make_mut(&mut awaited_action.current_state).stage = ActionStage::Queued;
let send_result = awaited_action.notify_channel.send(awaited_action.current_state.clone());
self.queued_actions_set.insert(action_info.clone());
self.queued_actions.insert(action_info.clone(), awaited_action);
let send_result = if awaited_action.attempts >= self.max_job_retries {
Arc::make_mut(&mut awaited_action.current_state).stage = ActionStage::Error((
awaited_action.last_error.unwrap_or_else(|| {
make_err!(
Code::Internal,
"Job cancelled because it attempted to execute too many times and failed"
)
}),
ActionResult {
output_files: Default::default(),
output_folders: Default::default(),
output_directory_symlinks: Default::default(),
output_file_symlinks: Default::default(),
exit_code: INTERNAL_ERROR_EXIT_CODE,
stdout_digest: DigestInfo::empty_digest(),
stderr_digest: DigestInfo::empty_digest(),
execution_metadata: ExecutionMetadata {
worker: format!("{}", worker_id),
queued_timestamp: SystemTime::UNIX_EPOCH,
worker_start_timestamp: SystemTime::UNIX_EPOCH,
worker_completed_timestamp: SystemTime::UNIX_EPOCH,
input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
execution_start_timestamp: SystemTime::UNIX_EPOCH,
execution_completed_timestamp: SystemTime::UNIX_EPOCH,
output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
},
server_logs: Default::default(),
},
));
awaited_action.notify_channel.send(awaited_action.current_state.clone())
// Do not put the action back in the queue here, as this action attempted to run too many
// times.
} else {
Arc::make_mut(&mut awaited_action.current_state).stage = ActionStage::Queued;
let send_result = awaited_action.notify_channel.send(awaited_action.current_state.clone());
self.queued_actions_set.insert(action_info.clone());
self.queued_actions.insert(action_info.clone(), awaited_action);
send_result
};

if send_result.is_err() {
// Don't remove this task, instead we keep them around for a bit just in case
// the client disconnected and will reconnect and ask for same job to be executed
Expand Down Expand Up @@ -290,6 +347,7 @@ impl SchedulerImpl {
awaited_action.action_info.digest().str()
);
}
awaited_action.attempts += 1;
self.active_actions.insert(
action_info.clone(),
RunningAction {
Expand All @@ -303,6 +361,37 @@ impl SchedulerImpl {
should_run_again
}

async fn update_worker_with_internal_error(
&mut self,
worker_id: &WorkerId,
action_info_hash_key: &ActionInfoHashKey,
err: Error,
) {
let (action_info, mut running_action) = match self.active_actions.remove_entry(action_info_hash_key) {
Some((action_info, running_action)) => (action_info, running_action),
None => {
log::error!(
"Could not find action info in active actions : {:?}",
action_info_hash_key
);
return;
}
};

if running_action.worker_id != *worker_id {
log::error!(
"Got a result from a worker that should not be running the action, Removing worker. Expected worker {} got worker {}",
running_action.worker_id, worker_id
);
}
running_action.action.last_error = Some(err);

// Now put it back. immediate_evict_worker() needs it to be there to send errors properly.
self.active_actions.insert(action_info, running_action);

self.immediate_evict_worker(worker_id);
}

async fn update_action(
&mut self,
worker_id: &WorkerId,
Expand Down Expand Up @@ -398,13 +487,19 @@ impl Scheduler {
worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S;
}

let mut max_job_retries = scheduler_cfg.max_job_retries;
if max_job_retries == 0 {
max_job_retries = DEFAULT_MAX_JOB_RETRIES;
}

Self {
inner: Mutex::new(SchedulerImpl {
queued_actions_set: HashSet::new(),
queued_actions: BTreeMap::new(),
workers: Workers::new(),
active_actions: HashMap::new(),
worker_timeout_s,
max_job_retries,
}),
platform_property_manager,
}
Expand Down Expand Up @@ -436,6 +531,18 @@ impl Scheduler {
inner.add_action(action_info)
}

pub async fn update_worker_with_internal_error(
&self,
worker_id: &WorkerId,
action_info_hash_key: &ActionInfoHashKey,
err: Error,
) {
let mut inner = self.inner.lock().await;
inner
.update_worker_with_internal_error(worker_id, action_info_hash_key, err)
.await
}

/// Adds an action to the scheduler for remote execution.
pub async fn update_action(
&self,
Expand Down
Loading

0 comments on commit 2be02e2

Please sign in to comment.