Skip to content

Commit

Permalink
Add support for backpressure from workers.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisstaite-menlo committed Jul 13, 2023
1 parent e16b45c commit fc97fcb
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 30 deletions.
38 changes: 25 additions & 13 deletions cas/scheduler/simple_scheduler.rs
Expand Up @@ -127,16 +127,15 @@ impl Workers {
assert!(matches!(awaited_action.current_state.stage, ActionStage::Queued));
let action_properties = &awaited_action.action_info.platform_properties;
let mut workers_iter = self.workers.iter_mut();
let workers_iter = match self.allocation_strategy {
// Use rfind to get the least recently used that satisfies the properties.
config::schedulers::WorkerAllocationStrategy::LeastRecentlyUsed => {
workers_iter.rfind(|(_, w)| action_properties.is_satisfied_by(&w.platform_properties))
}
// Use find to get the most recently used that satisfies the properties.
config::schedulers::WorkerAllocationStrategy::MostRecentlyUsed => {
workers_iter.find(|(_, w)| action_properties.is_satisfied_by(&w.platform_properties))
}
};
let workers_iter =
match self.allocation_strategy {
// Use rfind to get the least recently used that satisfies the properties.
config::schedulers::WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter
.rfind(|(_, w)| !w.is_paused && action_properties.is_satisfied_by(&w.platform_properties)),
// Use find to get the most recently used that satisfies the properties.
config::schedulers::WorkerAllocationStrategy::MostRecentlyUsed => workers_iter
.find(|(_, w)| !w.is_paused && action_properties.is_satisfied_by(&w.platform_properties)),
};
let worker_id = workers_iter.map(|(_, w)| &w.id);
// We need to "touch" the worker to ensure it gets re-ordered in the LRUCache, since it was selected.
if let Some(&worker_id) = worker_id {
Expand Down Expand Up @@ -247,10 +246,14 @@ impl SimpleSchedulerImpl {
Ok(rx)
}

fn retry_action(&mut self, action_info: &Arc<ActionInfo>, worker_id: &WorkerId) {
fn retry_action(&mut self, action_info: &Arc<ActionInfo>, worker_id: &WorkerId, due_to_backpressure: bool) {
match self.active_actions.remove(action_info) {
Some(running_action) => {
let mut awaited_action = running_action.action;
// Don't count a backpressure failure as an attempt for an action.
if due_to_backpressure {
awaited_action.attempts -= 1;
}
let send_result = if awaited_action.attempts >= self.max_job_retries {
let mut default_action_result = ActionResult::default();
default_action_result.execution_metadata.worker = format!("{}", worker_id);
Expand Down Expand Up @@ -298,7 +301,7 @@ impl SimpleSchedulerImpl {
// We create a temporary Vec to avoid doubt about a possible code
// path touching the worker.running_action_infos elsewhere.
for action_info in worker.running_action_infos.drain() {
self.retry_action(&action_info, &worker_id);
self.retry_action(&action_info, &worker_id, false);
}
}
// Note: Calling this many time is very cheap, it'll only trigger `do_try_match` once.
Expand Down Expand Up @@ -390,6 +393,8 @@ impl SimpleSchedulerImpl {
}
};

let due_to_backpressure = err.code == Code::ResourceExhausted;

if running_action.worker_id != *worker_id {
log::error!(
"Got a result from a worker that should not be running the action, Removing worker. Expected worker {} got worker {}",
Expand All @@ -406,11 +411,18 @@ impl SimpleSchedulerImpl {

// Clear this action from the current worker.
if let Some(worker) = self.workers.workers.get_mut(worker_id) {
let was_paused = worker.is_paused;
// This unpauses, but since we're completing with an error, don't
// unpause unless all actions have completed.
worker.complete_action(&action_info);
// Only pause if there's an action still waiting that will unpause.
if (was_paused || due_to_backpressure) && worker.has_actions() {
worker.is_paused = true;
}
}

// Re-queue the action or fail on max attempts.
self.retry_action(&action_info, &worker_id);
self.retry_action(&action_info, &worker_id, due_to_backpressure);
}

fn update_action(
Expand Down
11 changes: 10 additions & 1 deletion cas/scheduler/worker.rs
Expand Up @@ -89,6 +89,9 @@ pub struct Worker {
// Warning: Do not update this timestamp without updating the placement of the worker in
// the LRUCache in the Workers struct.
pub last_update_timestamp: WorkerTimestamp,

/// Whether the worker rejected the last action due to back pressure.
pub is_paused: bool,
}

impl Worker {
Expand All @@ -104,6 +107,7 @@ impl Worker {
tx,
running_action_infos: HashSet::new(),
last_update_timestamp: timestamp,
is_paused: false,
}
}

Expand Down Expand Up @@ -148,7 +152,12 @@ impl Worker {

pub fn complete_action(&mut self, action_info: &Arc<ActionInfo>) {
self.running_action_infos.remove(action_info);
self.restore_platform_properties(&action_info.platform_properties)
self.restore_platform_properties(&action_info.platform_properties);
self.is_paused = false;
}

pub fn has_actions(&self) -> bool {
!self.running_action_infos.is_empty()
}

/// Reduces the platform properties available on the worker based on the platform properties provided.
Expand Down
45 changes: 41 additions & 4 deletions cas/worker/local_worker.rs
Expand Up @@ -13,10 +13,13 @@
// limitations under the License.

use std::pin::Pin;
use std::process::Stdio;
use std::str;
use std::sync::Arc;
use std::time::Duration;

use futures::{future::BoxFuture, select, stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt};
use tokio::process;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tokio_stream::wrappers::UnboundedReceiverStream;
Expand Down Expand Up @@ -52,6 +55,38 @@ struct LocalWorkerImpl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> {
running_actions_manager: Arc<U>,
}

async fn preconditions_met(precondition_script: Option<String>) -> Result<(), Error> {
let Some(precondition_script) = &precondition_script else {
// No script means we are always ok to proceed.
return Ok(());
};
// TODO: Might want to pass some information about the command to the
// script, but at this point it's not even been downloaded yet,
// so that's not currently possible. Perhaps we'll move this in
// future to pass useful information through? Or perhaps we'll
// have a pre-condition and a pre-execute script instead, although
// arguably entrypoint_cmd already gives us that.
let precondition_process = process::Command::new(precondition_script)
.kill_on_drop(true)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::null())
.env_clear()
.spawn()
.err_tip(|| format!("Could not execute command {:?}", precondition_script))?;
let output = precondition_process.wait_with_output().await?;
if output.status.code() == Some(0) {
Ok(())
} else {
Err(make_err!(
Code::ResourceExhausted,
"Preconditions script returned status {} - {}",
output.status,
str::from_utf8(&output.stdout).unwrap_or("")
))
}
}

impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, T, U> {
fn new(config: &'a LocalWorkerConfig, grpc_client: T, worker_id: String, running_actions_manager: Arc<U>) -> Self {
Self {
Expand Down Expand Up @@ -134,10 +169,12 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a,
let salt = start_execute.salt;
let worker_id = self.worker_id.clone();
let action_digest = start_execute.execute_request.as_ref().and_then(|v| v.action_digest.clone());
let start_action_fut = self
.running_actions_manager
.clone()
.create_and_add_action(worker_id.clone(), start_execute)
let running_actions_manager = self.running_actions_manager.clone();
let worker_id_clone = worker_id.clone();
let start_action_fut = preconditions_met(self.config.precondition_script.clone())
.and_then(|_| async move {
running_actions_manager.create_and_add_action(worker_id_clone, start_execute).await
})
.and_then(|action|
action
.clone()
Expand Down
103 changes: 101 additions & 2 deletions cas/worker/tests/local_worker_test.rs
Expand Up @@ -14,6 +14,8 @@

use std::collections::HashMap;
use std::env;
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
Expand All @@ -26,11 +28,11 @@ use tonic::Response;
use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ExecutionMetadata};
use common::{encode_stream_proto, fs, DigestInfo};
use config::cas_server::{LocalWorkerConfig, WrokerProperty};
use error::{make_input_err, Error};
use error::{make_err, make_input_err, Code, Error};
use fast_slow_store::FastSlowStore;
use filesystem_store::FilesystemStore;
use local_worker::new_local_worker;
use local_worker_test_utils::{setup_grpc_stream, setup_local_worker};
use local_worker_test_utils::{setup_grpc_stream, setup_local_worker, setup_local_worker_with_config};
use memory_store::MemoryStore;
use mock_running_actions_manager::MockRunningAction;
use platform_property_manager::PlatformProperties;
Expand Down Expand Up @@ -367,4 +369,101 @@ mod local_worker_tests {

Ok(())
}

#[tokio::test]
async fn precondition_script_fails() -> Result<(), Box<dyn std::error::Error>> {
let temp_path = make_temp_path("scripts");
fs::create_dir_all(temp_path.clone()).await?;
let precondition_script = format!("{}/precondition.sh", temp_path);
{
let mut file = fs::create_file(precondition_script.clone()).await?;
file.write_all(b"#!/bin/sh\nexit 1\n").await?;
}
fs::set_permissions(&precondition_script, Permissions::from_mode(0o777)).await?;
let local_worker_config = LocalWorkerConfig {
precondition_script: Some(precondition_script),
..Default::default()
};

let mut test_context = setup_local_worker_with_config(local_worker_config).await;
let streaming_response = test_context.maybe_streaming_response.take().unwrap();

{
// Ensure our worker connects and properties were sent.
let props = test_context.client.expect_connect_worker(Ok(streaming_response)).await;
assert_eq!(props, SupportedProperties::default());
}

let expected_worker_id = "foobar".to_string();

let mut tx_stream = test_context.maybe_tx_stream.take().unwrap();
{
// First initialize our worker by sending the response to the connection request.
tx_stream
.send_data(encode_stream_proto(&UpdateForWorker {
update: Some(Update::ConnectionResult(ConnectionResult {
worker_id: expected_worker_id.clone(),
})),
})?)
.await
.map_err(|e| make_input_err!("Could not send : {:?}", e))?;
}

const SALT: u64 = 1000;
let action_digest = DigestInfo::new([3u8; 32], 10);
let action_info = ActionInfo {
instance_name: "foo".to_string(),
command_digest: DigestInfo::new([1u8; 32], 10),
input_root_digest: DigestInfo::new([2u8; 32], 10),
timeout: Duration::from_secs(1),
platform_properties: PlatformProperties::default(),
priority: 0,
load_timestamp: SystemTime::UNIX_EPOCH,
insert_timestamp: SystemTime::UNIX_EPOCH,
unique_qualifier: ActionInfoHashKey {
digest: action_digest,
salt: SALT,
},
skip_cache_lookup: true,
};

{
// Send execution request.
tx_stream
.send_data(encode_stream_proto(&UpdateForWorker {
update: Some(Update::StartAction(StartExecute {
execute_request: Some(action_info.into()),
salt: SALT,
queued_timestamp: None,
})),
})?)
.await
.map_err(|e| make_input_err!("Could not send : {:?}", e))?;
}

// Now our client should be notified that our runner finished.
let execution_response = test_context
.client
.expect_execution_response(Ok(Response::new(())))
.await;

// Now ensure the final results match our expectations.
assert_eq!(
execution_response,
ExecuteResult {
worker_id: expected_worker_id,
action_digest: Some(action_digest.into()),
salt: SALT,
result: Some(execute_result::Result::InternalError(
make_err!(
Code::ResourceExhausted,
"Preconditions script returned status exit status: 1 - "
)
.into()
)),
}
);

Ok(())
}
}
24 changes: 14 additions & 10 deletions cas/worker/tests/utils/local_worker_test_utils.rs
Expand Up @@ -41,19 +41,10 @@ pub fn setup_grpc_stream() -> (HyperSender, Response<Streaming<UpdateForWorker>>
(tx, Response::new(stream))
}

pub async fn setup_local_worker(platform_properties: HashMap<String, WrokerProperty>) -> TestContext {
pub async fn setup_local_worker_with_config(local_worker_config: LocalWorkerConfig) -> TestContext {
let mock_worker_api_client = MockWorkerApiClient::new();
let mock_worker_api_client_clone = mock_worker_api_client.clone();
let actions_manager = Arc::new(MockRunningActionsManager::new());
const ARBITRARY_LARGE_TIMEOUT: f32 = 10000.;
let local_worker_config = LocalWorkerConfig {
platform_properties,
worker_api_endpoint: EndpointConfig {
timeout: Some(ARBITRARY_LARGE_TIMEOUT),
..Default::default()
},
..Default::default()
};
let worker = LocalWorker::new_with_connection_factory_and_actions_manager(
Arc::new(local_worker_config),
actions_manager.clone(),
Expand All @@ -77,6 +68,19 @@ pub async fn setup_local_worker(platform_properties: HashMap<String, WrokerPrope
}
}

pub async fn setup_local_worker(platform_properties: HashMap<String, WrokerProperty>) -> TestContext {
const ARBITRARY_LARGE_TIMEOUT: f32 = 10000.;
let local_worker_config = LocalWorkerConfig {
platform_properties,
worker_api_endpoint: EndpointConfig {
timeout: Some(ARBITRARY_LARGE_TIMEOUT),
..Default::default()
},
..Default::default()
};
setup_local_worker_with_config(local_worker_config).await
}

pub struct TestContext {
pub client: MockWorkerApiClient,
pub actions_manager: Arc<MockRunningActionsManager>,
Expand Down
10 changes: 10 additions & 0 deletions config/cas_server.rs
Expand Up @@ -265,6 +265,16 @@ pub struct LocalWorkerConfig {
/// and used to tell the scheduler to restrict what should be executed on this
/// worker.
pub platform_properties: HashMap<String, WrokerProperty>,

/// An optional script to run before every action is processed on the worker.
/// The value should be the full path to the script to execute and will pause
/// all actions on the worker if it returns an exit code other than 0.
/// If not set, then the worker will never pause and will continue to accept
/// jobs according to the scheduler configuration.
/// This is useful, for example, if the worker should not take any more
/// actions until there is enough resource available on the machine to
/// handle them.
pub precondition_script: Option<String>,
}

#[allow(non_camel_case_types)]
Expand Down

0 comments on commit fc97fcb

Please sign in to comment.