Skip to content

Commit

Permalink
Support injecting property values into worker command.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisstaite-menlo committed Sep 13, 2023
1 parent e0e0a88 commit 06c03de
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 40 deletions.
10 changes: 10 additions & 0 deletions cas/scheduler/platform_property_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::collections::HashMap;

use config::schedulers::PropertyType;
Expand Down Expand Up @@ -104,6 +105,15 @@ impl PlatformPropertyValue {
Self::Exact(_) | Self::Unknown(_) => false,
}
}

pub fn as_str(&self) -> Cow<str> {
match self {
Self::Exact(value) => Cow::Borrowed(value),
Self::Minimum(value) => Cow::Owned(value.to_string()),
Self::Priority(value) => Cow::Borrowed(value),
Self::Unknown(value) => Cow::Borrowed(value),
}
}
}

/// Helps manage known properties and conversion into `PlatformPropertyValue`.
Expand Down
10 changes: 7 additions & 3 deletions cas/worker/local_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ use proto::com::github::allada::turbo_cache::remote_execution::{
UpdateForWorker,
};
use running_actions_manager::{
Metrics as RunningActionManagerMetrics, RunningAction, RunningActionsManager, RunningActionsManagerImpl,
ExecutionConfiguration, Metrics as RunningActionManagerMetrics, RunningAction, RunningActionsManager,
RunningActionsManagerImpl,
};
use store::Store;
use worker_api_client_wrapper::{WorkerApiClientTrait, WorkerApiClientWrapper};
Expand Down Expand Up @@ -339,7 +340,7 @@ pub async fn new_local_worker(
let entrypoint_cmd = if config.entrypoint_cmd.is_empty() {
None
} else {
Some(Arc::new(config.entrypoint_cmd.clone()))
Some(config.entrypoint_cmd.clone())
};
let max_action_timeout = if config.max_action_timeout == 0 {
DEFAULT_MAX_ACTION_TIMEOUT
Expand All @@ -348,7 +349,10 @@ pub async fn new_local_worker(
};
let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(
config.work_directory.clone(),
entrypoint_cmd,
ExecutionConfiguration {
entrypoint_cmd,
additional_environment: config.additional_environment.clone(),
},
fast_slow_store,
ac_store,
config.ac_store_strategy,
Expand Down
73 changes: 56 additions & 17 deletions cas/worker/running_actions_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::collections::{vec_deque::VecDeque, HashMap};
use std::ffi::OsStr;
use std::ffi::OsString;
Expand Down Expand Up @@ -50,7 +51,7 @@ use ac_utils::{
use action_messages::{ActionInfo, ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, SymlinkInfo};
use async_trait::async_trait;
use common::{fs, log, DigestInfo, JoinHandleDropGuard};
use config::cas_server::UploadCacheResultsStrategy;
use config::cas_server::{EnvironmentSource, UploadCacheResultsStrategy};
use error::{make_err, make_input_err, Code, Error, ResultExt};
use fast_slow_store::FastSlowStore;
use filesystem_store::{FileEntry, FilesystemStore};
Expand Down Expand Up @@ -445,7 +446,6 @@ struct RunningActionImplState {
pub struct RunningActionImpl {
action_id: ActionId,
work_directory: String,
entrypoint_cmd: Option<Arc<String>>,
action_info: ActionInfo,
timeout: Duration,
running_actions_manager: Arc<RunningActionsManagerImpl>,
Expand All @@ -458,7 +458,6 @@ impl RunningActionImpl {
execution_metadata: ExecutionMetadata,
action_id: ActionId,
work_directory: String,
entrypoint_cmd: Option<Arc<String>>,
action_info: ActionInfo,
timeout: Duration,
running_actions_manager: Arc<RunningActionsManagerImpl>,
Expand All @@ -467,7 +466,6 @@ impl RunningActionImpl {
Self {
action_id,
work_directory,
entrypoint_cmd,
action_info,
timeout,
running_actions_manager,
Expand Down Expand Up @@ -578,13 +576,14 @@ impl RunningActionImpl {
if command_proto.arguments.is_empty() {
return Err(make_input_err!("No arguments provided in Command proto"));
}
let args: Vec<&OsStr> = if let Some(entrypoint_cmd) = &self.entrypoint_cmd {
std::iter::once(entrypoint_cmd.as_ref().as_ref())
.chain(command_proto.arguments.iter().map(AsRef::as_ref))
.collect()
} else {
command_proto.arguments.iter().map(AsRef::as_ref).collect()
};
let args: Vec<&OsStr> =
if let Some(entrypoint_cmd) = &self.running_actions_manager.execution_configuration.entrypoint_cmd {
std::iter::once(entrypoint_cmd.as_ref())
.chain(command_proto.arguments.iter().map(AsRef::as_ref))
.collect()
} else {
command_proto.arguments.iter().map(AsRef::as_ref).collect()
};
log::info!("\x1b[0;31mWorker Executing\x1b[0m: {:?}", &args);
let mut command_builder = process::Command::new(args[0]);
command_builder
Expand All @@ -596,6 +595,25 @@ impl RunningActionImpl {
.current_dir(format!("{}/{}", self.work_directory, command_proto.working_directory))
.env_clear();

if let Some(additional_environment) = &self
.running_actions_manager
.execution_configuration
.additional_environment
{
for (name, source) in additional_environment {
let value = match source {
EnvironmentSource::Property(property) => self
.action_info
.platform_properties
.properties
.get(property)
.map_or_else(|| Cow::Borrowed(""), |v| v.as_str()),
EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()),
};
command_builder.env(name, value.as_ref());
}
}

#[cfg(target_family = "unix")]
let envs = &command_proto.environment_variables;
// If SystemRoot is not set on windows we set it to default. Failing to do
Expand Down Expand Up @@ -1059,16 +1077,38 @@ pub trait RunningActionsManager: Sync + Send + Sized + Unpin + 'static {
type NowFn = fn() -> SystemTime;
type SleepFn = fn(Duration) -> BoxFuture<'static, ()>;

/// Functions that may be injected for testing purposes, during standard control
/// flows these are specified by the new function.
pub struct Callbacks {
/// A function that gets the current time.
pub now_fn: NowFn,
/// A function that sleeps for a given Duration.
pub sleep_fn: SleepFn,
}

/// The set of additional information for executing an action over and above
/// those given in the ActionInfo passed to the worker. This allows
/// modification of the action for execution on this particular worker. This
/// may be used to run the action with a particular set of additional
/// environment variables, or perhaps configure it to execute within a
/// container.
#[derive(Default)]
pub struct ExecutionConfiguration {
/// If set, will be executed instead of the first argument passed in the
/// ActionInfo with all of the arguments in the ActionInfo passed as
/// arguments to this command.
pub entrypoint_cmd: Option<String>,
/// The only environment variables that will be specified when the command
/// executes other than those in the ActionInfo. On Windows, SystemRoot
/// and PATH are also assigned (see inner_execute).
pub additional_environment: Option<HashMap<String, EnvironmentSource>>,
}

/// Holds state info about what is being executed and the interface for interacting
/// with actions while they are running.
pub struct RunningActionsManagerImpl {
root_work_directory: String,
entrypoint_cmd: Option<Arc<String>>,
execution_configuration: ExecutionConfiguration,
cas_store: Arc<FastSlowStore>,
filesystem_store: Arc<FilesystemStore>,
ac_store: Arc<dyn Store>,
Expand All @@ -1085,7 +1125,7 @@ pub struct RunningActionsManagerImpl {
impl RunningActionsManagerImpl {
pub fn new_with_callbacks(
root_work_directory: String,
entrypoint_cmd: Option<Arc<String>>,
execution_configuration: ExecutionConfiguration,
cas_store: Arc<FastSlowStore>,
ac_store: Arc<dyn Store>,
upload_strategy: UploadCacheResultsStrategy,
Expand All @@ -1103,7 +1143,7 @@ impl RunningActionsManagerImpl {
let (action_done_tx, _) = watch::channel(());
Ok(Self {
root_work_directory,
entrypoint_cmd,
execution_configuration,
cas_store,
filesystem_store,
ac_store,
Expand All @@ -1118,15 +1158,15 @@ impl RunningActionsManagerImpl {

pub fn new(
root_work_directory: String,
entrypoint_cmd: Option<Arc<String>>,
execution_configuration: ExecutionConfiguration,
cas_store: Arc<FastSlowStore>,
ac_store: Arc<dyn Store>,
upload_strategy: UploadCacheResultsStrategy,
max_action_timeout: Duration,
) -> Result<Self, Error> {
Self::new_with_callbacks(
root_work_directory,
entrypoint_cmd,
execution_configuration,
cas_store,
ac_store,
upload_strategy,
Expand Down Expand Up @@ -1253,7 +1293,6 @@ impl RunningActionsManager for RunningActionsManagerImpl {
execution_metadata,
action_id,
work_directory,
self.entrypoint_cmd.clone(),
action_info,
timeout,
self.clone(),
Expand Down
Loading

0 comments on commit 06c03de

Please sign in to comment.