From fc97fcb1f85131997a9db7068134973116486f6a Mon Sep 17 00:00:00 2001
From: Chris Staite
Date: Thu, 15 Jun 2023 10:53:48 +0000
Subject: [PATCH] Add support for backpressure from workers.

---
 cas/scheduler/simple_scheduler.rs             |  38 ++++---
 cas/scheduler/worker.rs                       |  11 +-
 cas/worker/local_worker.rs                    |  45 +++++++-
 cas/worker/tests/local_worker_test.rs         | 103 +++++++++++++++++-
 .../tests/utils/local_worker_test_utils.rs    |  24 ++--
 config/cas_server.rs                          |  10 ++
 6 files changed, 201 insertions(+), 30 deletions(-)

diff --git a/cas/scheduler/simple_scheduler.rs b/cas/scheduler/simple_scheduler.rs
index ae2eafc25..1934bd9c4 100644
--- a/cas/scheduler/simple_scheduler.rs
+++ b/cas/scheduler/simple_scheduler.rs
@@ -127,16 +127,15 @@ impl Workers {
         assert!(matches!(awaited_action.current_state.stage, ActionStage::Queued));
         let action_properties = &awaited_action.action_info.platform_properties;
         let mut workers_iter = self.workers.iter_mut();
-        let workers_iter = match self.allocation_strategy {
-            // Use rfind to get the least recently used that satisfies the properties.
-            config::schedulers::WorkerAllocationStrategy::LeastRecentlyUsed => {
-                workers_iter.rfind(|(_, w)| action_properties.is_satisfied_by(&w.platform_properties))
-            }
-            // Use find to get the most recently used that satisfies the properties.
-            config::schedulers::WorkerAllocationStrategy::MostRecentlyUsed => {
-                workers_iter.find(|(_, w)| action_properties.is_satisfied_by(&w.platform_properties))
-            }
-        };
+        let workers_iter =
+            match self.allocation_strategy {
+                // Use rfind to get the least recently used that satisfies the properties.
+                config::schedulers::WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter
+                    .rfind(|(_, w)| !w.is_paused && action_properties.is_satisfied_by(&w.platform_properties)),
+                // Use find to get the most recently used that satisfies the properties.
+                config::schedulers::WorkerAllocationStrategy::MostRecentlyUsed => workers_iter
+                    .find(|(_, w)| !w.is_paused && action_properties.is_satisfied_by(&w.platform_properties)),
+            };
         let worker_id = workers_iter.map(|(_, w)| &w.id);
         // We need to "touch" the worker to ensure it gets re-ordered in the LRUCache, since it was selected.
         if let Some(&worker_id) = worker_id {
@@ -247,10 +246,14 @@ impl SimpleSchedulerImpl {
         Ok(rx)
     }
 
-    fn retry_action(&mut self, action_info: &Arc<ActionInfo>, worker_id: &WorkerId) {
+    fn retry_action(&mut self, action_info: &Arc<ActionInfo>, worker_id: &WorkerId, due_to_backpressure: bool) {
         match self.active_actions.remove(action_info) {
             Some(running_action) => {
                 let mut awaited_action = running_action.action;
+                // Don't count a backpressure failure as an attempt for an action.
+                if due_to_backpressure {
+                    awaited_action.attempts -= 1;
+                }
                 let send_result = if awaited_action.attempts >= self.max_job_retries {
                     let mut default_action_result = ActionResult::default();
                     default_action_result.execution_metadata.worker = format!("{}", worker_id);
@@ -298,7 +301,7 @@ impl SimpleSchedulerImpl {
             // We create a temporary Vec to avoid doubt about a possible code
             // path touching the worker.running_action_infos elsewhere.
             for action_info in worker.running_action_infos.drain() {
-                self.retry_action(&action_info, &worker_id);
+                self.retry_action(&action_info, &worker_id, false);
             }
         }
         // Note: Calling this many time is very cheap, it'll only trigger `do_try_match` once.
@@ -390,6 +393,8 @@ impl SimpleSchedulerImpl {
             }
         };
 
+        let due_to_backpressure = err.code == Code::ResourceExhausted;
+
         if running_action.worker_id != *worker_id {
             log::error!(
                 "Got a result from a worker that should not be running the action, Removing worker. Expected worker {} got worker {}",
@@ -406,11 +411,18 @@ impl SimpleSchedulerImpl {
 
         // Clear this action from the current worker.
         if let Some(worker) = self.workers.workers.get_mut(worker_id) {
+            let was_paused = worker.is_paused;
+            // This unpauses, but since we're completing with an error, don't
+            // unpause unless all actions have completed.
             worker.complete_action(&action_info);
+            // Only pause if there's an action still waiting that will unpause.
+            if (was_paused || due_to_backpressure) && worker.has_actions() {
+                worker.is_paused = true;
+            }
         }
 
         // Re-queue the action or fail on max attempts.
-        self.retry_action(&action_info, &worker_id);
+        self.retry_action(&action_info, &worker_id, due_to_backpressure);
     }
 
     fn update_action(
diff --git a/cas/scheduler/worker.rs b/cas/scheduler/worker.rs
index 250b241da..663a3e916 100644
--- a/cas/scheduler/worker.rs
+++ b/cas/scheduler/worker.rs
@@ -89,6 +89,9 @@ pub struct Worker {
     // Warning: Do not update this timestamp without updating the placement of the worker in
     // the LRUCache in the Workers struct.
     pub last_update_timestamp: WorkerTimestamp,
+
+    /// Whether the worker rejected the last action due to back pressure.
+    pub is_paused: bool,
 }
 
 impl Worker {
@@ -104,6 +107,7 @@ impl Worker {
             tx,
             running_action_infos: HashSet::new(),
             last_update_timestamp: timestamp,
+            is_paused: false,
         }
     }
 
@@ -148,7 +152,12 @@ impl Worker {
 
     pub fn complete_action(&mut self, action_info: &Arc<ActionInfo>) {
         self.running_action_infos.remove(action_info);
-        self.restore_platform_properties(&action_info.platform_properties)
+        self.restore_platform_properties(&action_info.platform_properties);
+        self.is_paused = false;
+    }
+
+    pub fn has_actions(&self) -> bool {
+        !self.running_action_infos.is_empty()
     }
 
     /// Reduces the platform properties available on the worker based on the platform properties provided.
diff --git a/cas/worker/local_worker.rs b/cas/worker/local_worker.rs
index 18ecb233d..02f2febe6 100644
--- a/cas/worker/local_worker.rs
+++ b/cas/worker/local_worker.rs
@@ -13,10 +13,13 @@
 // limitations under the License.
 
 use std::pin::Pin;
+use std::process::Stdio;
+use std::str;
 use std::sync::Arc;
 use std::time::Duration;
 
 use futures::{future::BoxFuture, select, stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt};
+use tokio::process;
 use tokio::sync::mpsc;
 use tokio::time::sleep;
 use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -52,6 +55,38 @@ struct LocalWorkerImpl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> {
     running_actions_manager: Arc<U>,
 }
 
+async fn preconditions_met(precondition_script: Option<String>) -> Result<(), Error> {
+    let Some(precondition_script) = &precondition_script else {
+        // No script means we are always ok to proceed.
+        return Ok(());
+    };
+    // TODO: Might want to pass some information about the command to the
+    // script, but at this point it's not even been downloaded yet,
+    // so that's not currently possible. Perhaps we'll move this in
+    // future to pass useful information through? Or perhaps we'll
+    // have a pre-condition and a pre-execute script instead, although
+    // arguably entrypoint_cmd already gives us that.
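+    // The script runs with a cleared environment, stdin detached and stdout
+    // captured; its output is folded into the ResourceExhausted error below so
+    // the result sent back to the scheduler explains why the action was refused.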
+    let precondition_process = process::Command::new(precondition_script)
+        .kill_on_drop(true)
+        .stdin(Stdio::null())
+        .stdout(Stdio::piped())
+        .stderr(Stdio::null())
+        .env_clear()
+        .spawn()
+        .err_tip(|| format!("Could not execute command {:?}", precondition_script))?;
+    let output = precondition_process.wait_with_output().await?;
+    if output.status.code() == Some(0) {
+        Ok(())
+    } else {
+        Err(make_err!(
+            Code::ResourceExhausted,
+            "Preconditions script returned status {} - {}",
+            output.status,
+            str::from_utf8(&output.stdout).unwrap_or("")
+        ))
+    }
+}
+
 impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, T, U> {
     fn new(config: &'a LocalWorkerConfig, grpc_client: T, worker_id: String, running_actions_manager: Arc<U>) -> Self {
         Self {
@@ -134,10 +169,12 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a,
                     let salt = start_execute.salt;
                     let worker_id = self.worker_id.clone();
                     let action_digest = start_execute.execute_request.as_ref().and_then(|v| v.action_digest.clone());
-                    let start_action_fut = self
-                        .running_actions_manager
-                        .clone()
-                        .create_and_add_action(worker_id.clone(), start_execute)
+                    let running_actions_manager = self.running_actions_manager.clone();
+                    let worker_id_clone = worker_id.clone();
+                    let start_action_fut = preconditions_met(self.config.precondition_script.clone())
+                        .and_then(|_| async move {
+                            running_actions_manager.create_and_add_action(worker_id_clone, start_execute).await
+                        })
+                        .and_then(|action|
                             action
                                 .clone()
diff --git a/cas/worker/tests/local_worker_test.rs b/cas/worker/tests/local_worker_test.rs
index bdba8c786..f07e4f504 100644
--- a/cas/worker/tests/local_worker_test.rs
+++ b/cas/worker/tests/local_worker_test.rs
@@ -14,6 +14,8 @@
 
 use std::collections::HashMap;
 use std::env;
+use std::fs::Permissions;
+use std::os::unix::fs::PermissionsExt;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
@@ -26,11 +28,11 @@ use tonic::Response;
 use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ExecutionMetadata};
 use common::{encode_stream_proto, fs, DigestInfo};
 use config::cas_server::{LocalWorkerConfig, WrokerProperty};
-use error::{make_input_err, Error};
+use error::{make_err, make_input_err, Code, Error};
 use fast_slow_store::FastSlowStore;
 use filesystem_store::FilesystemStore;
 use local_worker::new_local_worker;
-use local_worker_test_utils::{setup_grpc_stream, setup_local_worker};
+use local_worker_test_utils::{setup_grpc_stream, setup_local_worker, setup_local_worker_with_config};
 use memory_store::MemoryStore;
 use mock_running_actions_manager::MockRunningAction;
 use platform_property_manager::PlatformProperties;
@@ -367,4 +369,101 @@ mod local_worker_tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn precondition_script_fails() -> Result<(), Box<dyn std::error::Error>> {
+        let temp_path = make_temp_path("scripts");
+        fs::create_dir_all(temp_path.clone()).await?;
+        let precondition_script = format!("{}/precondition.sh", temp_path);
+        {
+            let mut file = fs::create_file(precondition_script.clone()).await?;
+            file.write_all(b"#!/bin/sh\nexit 1\n").await?;
+        }
+        fs::set_permissions(&precondition_script, Permissions::from_mode(0o777)).await?;
+        let local_worker_config = LocalWorkerConfig {
+            precondition_script: Some(precondition_script),
+            ..Default::default()
+        };
+
+        let mut test_context = setup_local_worker_with_config(local_worker_config).await;
+        let streaming_response = test_context.maybe_streaming_response.take().unwrap();
+
+        {
+            // Ensure our worker connects and properties were sent.
+            let props = test_context.client.expect_connect_worker(Ok(streaming_response)).await;
+            assert_eq!(props, SupportedProperties::default());
+        }
+
+        let expected_worker_id = "foobar".to_string();
+
+        let mut tx_stream = test_context.maybe_tx_stream.take().unwrap();
+        {
+            // First initialize our worker by sending the response to the connection request.
+            tx_stream
+                .send_data(encode_stream_proto(&UpdateForWorker {
+                    update: Some(Update::ConnectionResult(ConnectionResult {
+                        worker_id: expected_worker_id.clone(),
+                    })),
+                })?)
+                .await
+                .map_err(|e| make_input_err!("Could not send : {:?}", e))?;
+        }
+
+        const SALT: u64 = 1000;
+        let action_digest = DigestInfo::new([3u8; 32], 10);
+        let action_info = ActionInfo {
+            instance_name: "foo".to_string(),
+            command_digest: DigestInfo::new([1u8; 32], 10),
+            input_root_digest: DigestInfo::new([2u8; 32], 10),
+            timeout: Duration::from_secs(1),
+            platform_properties: PlatformProperties::default(),
+            priority: 0,
+            load_timestamp: SystemTime::UNIX_EPOCH,
+            insert_timestamp: SystemTime::UNIX_EPOCH,
+            unique_qualifier: ActionInfoHashKey {
+                digest: action_digest,
+                salt: SALT,
+            },
+            skip_cache_lookup: true,
+        };
+
+        {
+            // Send execution request.
+            tx_stream
+                .send_data(encode_stream_proto(&UpdateForWorker {
+                    update: Some(Update::StartAction(StartExecute {
+                        execute_request: Some(action_info.into()),
+                        salt: SALT,
+                        queued_timestamp: None,
+                    })),
+                })?)
+                .await
+                .map_err(|e| make_input_err!("Could not send : {:?}", e))?;
+        }
+
+        // Now our client should be notified that our runner finished.
+        let execution_response = test_context
+            .client
+            .expect_execution_response(Ok(Response::new(())))
+            .await;
+
+        // Now ensure the final results match our expectations.
+        assert_eq!(
+            execution_response,
+            ExecuteResult {
+                worker_id: expected_worker_id,
+                action_digest: Some(action_digest.into()),
+                salt: SALT,
+                result: Some(execute_result::Result::InternalError(
+                    make_err!(
+                        Code::ResourceExhausted,
+                        "Preconditions script returned status exit status: 1 - "
+                    )
+                    .into()
+                )),
+            }
+        );
+
+        Ok(())
+    }
 }
diff --git a/cas/worker/tests/utils/local_worker_test_utils.rs b/cas/worker/tests/utils/local_worker_test_utils.rs
index fff1f5c09..cd1314f44 100644
--- a/cas/worker/tests/utils/local_worker_test_utils.rs
+++ b/cas/worker/tests/utils/local_worker_test_utils.rs
@@ -41,19 +41,10 @@ pub fn setup_grpc_stream() -> (HyperSender, Response<Streaming<UpdateForWorker>>) {
     (tx, Response::new(stream))
 }
 
-pub async fn setup_local_worker(platform_properties: HashMap<String, WrokerProperty>) -> TestContext {
+pub async fn setup_local_worker_with_config(local_worker_config: LocalWorkerConfig) -> TestContext {
     let mock_worker_api_client = MockWorkerApiClient::new();
     let mock_worker_api_client_clone = mock_worker_api_client.clone();
     let actions_manager = Arc::new(MockRunningActionsManager::new());
-    const ARBITRARY_LARGE_TIMEOUT: f32 = 10000.;
-    let local_worker_config = LocalWorkerConfig {
-        platform_properties,
-        worker_api_endpoint: EndpointConfig {
-            timeout: Some(ARBITRARY_LARGE_TIMEOUT),
-            ..Default::default()
-        },
-        ..Default::default()
-    };
     let worker = LocalWorker::new_with_connection_factory_and_actions_manager(
         Arc::new(local_worker_config),
         actions_manager.clone(),
@@ -77,6 +68,19 @@ pub async fn setup_local_worker(platform_properties: HashMap<String, WrokerProperty>) -> TestContext {
+    const ARBITRARY_LARGE_TIMEOUT: f32 = 10000.;
+    let local_worker_config = LocalWorkerConfig {
+        platform_properties,
+        worker_api_endpoint: EndpointConfig {
+            timeout: Some(ARBITRARY_LARGE_TIMEOUT),
+            ..Default::default()
+        },
+        ..Default::default()
+    };
+    setup_local_worker_with_config(local_worker_config).await
+}
+
 pub struct TestContext {
     pub client: MockWorkerApiClient,
     pub actions_manager: Arc<MockRunningActionsManager>,
diff --git a/config/cas_server.rs b/config/cas_server.rs
index bc6dd0bf9..12e4550ef 100644
--- a/config/cas_server.rs
+++ b/config/cas_server.rs
@@ -265,6 +265,16 @@ pub struct LocalWorkerConfig {
     /// and used to tell the scheduler to restrict what should be executed on this
     /// worker.
     pub platform_properties: HashMap<String, WrokerProperty>,
+
+    /// An optional script to run before every action is processed on the worker.
+    /// The value should be the full path to the script to execute, and the worker
+    /// will pause (accept no new actions) if the script exits with a non-zero code.
+    /// If not set, the worker never pauses and continues to accept jobs according
+    /// to the scheduler configuration.
+    /// This is useful, for example, if the worker should not take any more
+    /// actions until there is enough resource available on the machine to
+    /// handle them.
+    pub precondition_script: Option<String>,
 }
 
 #[allow(non_camel_case_types)]
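
Illustration (not part of the patch): a minimal precondition script, assuming a Linux worker (/proc/meminfo) and a hypothetical 1 GiB free-memory threshold. The worker only inspects the exit code and captures stdout, folding it into the ResourceExhausted error it reports back, so the script prints its reason before exiting non-zero:

    #!/bin/sh
    # Hypothetical gate: refuse new actions while available memory is low.
    available_kb=$(awk '/MemAvailable/ { print $2 }' /proc/meminfo)
    if [ "${available_kb:-0}" -lt 1048576 ]; then
        echo "only ${available_kb} kB of memory available"
        exit 1
    fi
    exit 0

With a script like this in place, a non-zero exit is treated as backpressure rather than a failure: the scheduler re-queues the action without counting it against max_job_retries, and a worker that still has actions in flight is marked is_paused so the !w.is_paused checks above skip it until one of those actions completes.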