diff --git a/cas/worker/BUILD b/cas/worker/BUILD index 58e3005d9..cad082060 100644 --- a/cas/worker/BUILD +++ b/cas/worker/BUILD @@ -122,6 +122,7 @@ rust_library( rust_test( name = "running_actions_manager_test", srcs = ["tests/running_actions_manager_test.rs"], + data = ["tests/wrapper_for_test.sh"], deps = [ ":running_actions_manager", "//cas/scheduler:action_messages", diff --git a/cas/worker/local_worker.rs b/cas/worker/local_worker.rs index 313d51d76..02949f5f1 100644 --- a/cas/worker/local_worker.rs +++ b/cas/worker/local_worker.rs @@ -235,9 +235,14 @@ pub async fn new_local_worker( fs::create_dir_all(&config.work_directory) .await .err_tip(|| format!("Could not make work_directory : {}", config.work_directory))?; - + let entrypoint_cmd = if config.entrypoint_cmd.is_empty() { + None + } else { + Some(Arc::new(config.entrypoint_cmd.clone())) + }; let running_actions_manager = Arc::new(RunningActionsManagerImpl::new( - config.work_directory.to_string(), + config.work_directory.clone(), + entrypoint_cmd, fast_slow_store, )?) .clone(); diff --git a/cas/worker/running_actions_manager.rs b/cas/worker/running_actions_manager.rs index ef062c300..0264089c2 100644 --- a/cas/worker/running_actions_manager.rs +++ b/cas/worker/running_actions_manager.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{vec_deque::VecDeque, HashMap}; +use std::ffi::OsStr; use std::fmt::Debug; use std::fs::Permissions; use std::io::Cursor; @@ -391,6 +392,7 @@ struct RunningActionImplState { pub struct RunningActionImpl { action_id: ActionId, work_directory: String, + entrypoint_cmd: Option>, action_info: ActionInfo, running_actions_manager: Arc, state: Mutex, @@ -402,6 +404,7 @@ impl RunningActionImpl { execution_metadata: ExecutionMetadata, action_id: ActionId, work_directory: String, + entrypoint_cmd: Option>, action_info: ActionInfo, running_actions_manager: Arc, ) -> Self { @@ -409,6 +412,7 @@ impl RunningActionImpl { Self { action_id, work_directory, + entrypoint_cmd, action_info, running_actions_manager, state: Mutex::new(RunningActionImplState { @@ -509,10 +513,16 @@ impl RunningAction for RunningActionImpl { .fuse(), ) }; - let args = &command_proto.arguments[..]; - if args.len() < 1 { + if command_proto.arguments.len() < 1 { return Err(make_input_err!("No arguments provided in Command proto")); } + let args: Vec<&OsStr> = if let Some(entrypoint_cmd) = &self.entrypoint_cmd { + std::iter::once(entrypoint_cmd.as_ref().as_ref()) + .chain(command_proto.arguments.iter().map(|v| v.as_ref())) + .collect() + } else { + command_proto.arguments.iter().map(|v| v.as_ref()).collect() + }; log::info!("\x1b[0;31mWorker Executing\x1b[0m: {:?}", &args); let mut command_builder = process::Command::new(&args[0]); command_builder @@ -815,6 +825,7 @@ type NowFn = fn() -> SystemTime; /// with actions while they are running. pub struct RunningActionsManagerImpl { root_work_directory: String, + entrypoint_cmd: Option>, cas_store: Arc, filesystem_store: Arc, running_actions: Mutex>>, @@ -824,6 +835,7 @@ pub struct RunningActionsManagerImpl { impl RunningActionsManagerImpl { pub fn new_with_now_fn( root_work_directory: String, + entrypoint_cmd: Option>, cas_store: Arc, now_fn: NowFn, ) -> Result { @@ -837,6 +849,7 @@ impl RunningActionsManagerImpl { .clone(); Ok(Self { root_work_directory, + entrypoint_cmd, cas_store, filesystem_store, running_actions: Mutex::new(HashMap::new()), @@ -844,8 +857,12 @@ impl RunningActionsManagerImpl { }) } - pub fn new(root_work_directory: String, cas_store: Arc) -> Result { - Self::new_with_now_fn(root_work_directory, cas_store, SystemTime::now) + pub fn new( + root_work_directory: String, + entrypoint_cmd: Option>, + cas_store: Arc, + ) -> Result { + Self::new_with_now_fn(root_work_directory, entrypoint_cmd, cas_store, SystemTime::now) } async fn make_work_directory(&self, action_id: &ActionId) -> Result { @@ -943,6 +960,7 @@ impl RunningActionsManager for RunningActionsManagerImpl { execution_metadata, action_id, work_directory, + self.entrypoint_cmd.clone(), action_info, self.clone(), )); diff --git a/cas/worker/tests/running_actions_manager_test.rs b/cas/worker/tests/running_actions_manager_test.rs index 21e8a2b17..3ff36b569 100644 --- a/cas/worker/tests/running_actions_manager_test.rs +++ b/cas/worker/tests/running_actions_manager_test.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::env; +use std::io::Cursor; use std::os::unix::fs::MetadataExt; use std::pin::Pin; use std::str::from_utf8; @@ -25,7 +26,7 @@ use futures::{FutureExt, TryFutureExt}; use prost::Message; use rand::{thread_rng, Rng}; -use ac_utils::{get_and_decode_digest, serialize_and_upload_message}; +use ac_utils::{compute_digest, get_and_decode_digest, serialize_and_upload_message}; use action_messages::{ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, SymlinkInfo}; use common::{fs, DigestInfo}; use config; @@ -386,6 +387,7 @@ mod running_actions_manager_tests { let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_now_fn( root_work_directory, + None, Pin::into_inner(cas_store.clone()), test_monotonic_clock, )?); @@ -464,6 +466,7 @@ mod running_actions_manager_tests { let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_now_fn( root_work_directory, + None, Pin::into_inner(cas_store.clone()), test_monotonic_clock, )?); @@ -586,6 +589,7 @@ mod running_actions_manager_tests { let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_now_fn( root_work_directory, + None, Pin::into_inner(cas_store.clone()), test_monotonic_clock, )?); @@ -739,6 +743,7 @@ mod running_actions_manager_tests { let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_now_fn( root_work_directory.clone(), + None, Pin::into_inner(cas_store.clone()), test_monotonic_clock, )?); @@ -826,6 +831,7 @@ mod running_actions_manager_tests { let running_actions_manager = Arc::new(RunningActionsManagerImpl::new( root_work_directory.clone(), + None, Pin::into_inner(cas_store.clone()), )?); const WORKER_ID: &str = "foo_worker_id"; @@ -868,4 +874,67 @@ mod running_actions_manager_tests { Ok(()) } + + // This script runs a command under `cas/worker/tests/wrapper_for_test.sh` set in a config. + // The wrapper script will print a constant string to stderr, and the test itself will + // print to stdout. We then check the results of both to make sure the shell script was + // invoked and the actual command was invoked under the shell script. + #[tokio::test] + async fn entrypoint_cmd_does_invoke_if_set() -> Result<(), Box> { + let (_, _, cas_store) = setup_stores().await?; + let root_work_directory = make_temp_path("root_work_directory"); + fs::create_dir_all(&root_work_directory).await?; + + const TEST_WRAPPER_SCRIPT: &str = "cas/worker/tests/wrapper_for_test.sh"; + let mut full_wrapper_script_path = env::current_dir()?; + full_wrapper_script_path.push(TEST_WRAPPER_SCRIPT); + let running_actions_manager = Arc::new(RunningActionsManagerImpl::new( + root_work_directory.clone(), + Some(Arc::new( + full_wrapper_script_path.into_os_string().into_string().unwrap(), + )), + Pin::into_inner(cas_store.clone()), + )?); + const WORKER_ID: &str = "foo_worker_id"; + const SALT: u64 = 66; + const EXPECTED_STDOUT: &str = "Action did run"; + let command = Command { + arguments: vec!["printf".to_string(), EXPECTED_STDOUT.to_string()], + working_directory: ".".to_string(), + ..Default::default() + }; + let command_digest = serialize_and_upload_message(&command, cas_store.as_ref()).await?; + let input_root_digest = serialize_and_upload_message(&Directory::default(), cas_store.as_ref()).await?; + let action = Action { + command_digest: Some(command_digest.into()), + input_root_digest: Some(input_root_digest.into()), + ..Default::default() + }; + let action_digest = serialize_and_upload_message(&action, cas_store.as_ref()).await?; + + let running_action_impl = running_actions_manager + .clone() + .create_and_add_action( + WORKER_ID.to_string(), + StartExecute { + execute_request: Some(ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }), + salt: SALT, + queued_timestamp: Some(make_system_time(1000).into()), + }, + ) + .await?; + + let result = run_action(running_action_impl).await?; + + let expected_stdout = compute_digest(Cursor::new(EXPECTED_STDOUT)).await?.0; + // Note: This string should match what is in worker_for_test.sh + let expected_stderr = compute_digest(Cursor::new("Wrapper script did run")).await?.0; + assert_eq!(expected_stdout, result.stdout_digest); + assert_eq!(expected_stderr, result.stderr_digest); + + Ok(()) + } } diff --git a/cas/worker/tests/wrapper_for_test.sh b/cas/worker/tests/wrapper_for_test.sh new file mode 100755 index 000000000..aee91301a --- /dev/null +++ b/cas/worker/tests/wrapper_for_test.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +# This script is used only to make sure wrapper scripts work in +# running_actions_manager_test.rs. + +# Print some static text to stderr. This is what the test uses to +# make sure the script did run. +>&2 printf "Wrapper script did run" + +# Now run the real command. +exec "$@" diff --git a/config/cas_server.rs b/config/cas_server.rs index 306c9ff7b..8a55e4a56 100644 --- a/config/cas_server.rs +++ b/config/cas_server.rs @@ -216,11 +216,10 @@ pub struct LocalWorkerConfig { /// The command to execute on every execution request. This will be parsed as /// a command + arguments (not shell). - /// '$@' has a special meaning in that all the arguments will expand into this - /// location. - /// Example: "run.sh $@" and a job with command: "sleep 5" will result in a + /// Example: "run.sh" and a job with command: "sleep 5" will result in a /// command like: "run.sh sleep 5". - #[serde(deserialize_with = "convert_string_with_shellexpand")] + /// Default: . + #[serde(default, deserialize_with = "convert_string_with_shellexpand")] pub entrypoint_cmd: String, /// Underlying CAS store that the worker will use to download CAS artifacts. diff --git a/config/examples/basic_cas.json b/config/examples/basic_cas.json index adb5d2051..35c92f2e5 100644 --- a/config/examples/basic_cas.json +++ b/config/examples/basic_cas.json @@ -64,7 +64,6 @@ "worker_api_endpoint": { "uri": "grpc://127.0.0.1:50061", }, - "entrypoint_cmd": "./examples/worker/local_entrypoint.sh $@", "cas_fast_slow_store": "WORKER_FAST_SLOW_STORE", "ac_store": "AC_MAIN_STORE", "work_directory": "/tmp/turbo_cache/work", diff --git a/deployment-examples/docker-compose/worker.json b/deployment-examples/docker-compose/worker.json index 49f0c37d0..748dafb95 100644 --- a/deployment-examples/docker-compose/worker.json +++ b/deployment-examples/docker-compose/worker.json @@ -32,7 +32,6 @@ "worker_api_endpoint": { "uri": "grpc://${SCHEDULER_ENDPOINT:-127.0.0.1}:50061", }, - "entrypoint_cmd": "$@", "cas_fast_slow_store": "WORKER_FAST_SLOW_STORE", "ac_store": "GRPC_LOCAL_STORE", "work_directory": "~/.cache/turbo-cache/work", diff --git a/deployment-examples/terraform/scripts/worker.json b/deployment-examples/terraform/scripts/worker.json index 1a84b9d15..2e14e7946 100644 --- a/deployment-examples/terraform/scripts/worker.json +++ b/deployment-examples/terraform/scripts/worker.json @@ -88,7 +88,6 @@ "worker_api_endpoint": { "uri": "grpc://${SCHEDULER_ENDPOINT:-127.0.0.1}:50061", }, - "entrypoint_cmd": "$@", "cas_fast_slow_store": "WORKER_FAST_SLOW_STORE", "ac_store": "AC_S3_STORE", "work_directory": "/worker/work", diff --git a/run_integration_tests.sh b/run_integration_tests.sh index e09423b99..229a69cbf 100755 --- a/run_integration_tests.sh +++ b/run_integration_tests.sh @@ -93,6 +93,7 @@ for pattern in "${TEST_PATTERNS[@]}"; do echo "$FILENAME passed" else echo "$FILENAME failed with exit code $EXIT_CODE" + docker-compose logs exit $EXIT_CODE fi docker-compose down