Fixes the entrypoint_cmd configuration
Fixes an oversight where the design called for an `entrypoint_cmd`
config option, but it was never implemented.
allada committed Jul 8, 2023
1 parent 6a72841 commit 096d7ea
Showing 10 changed files with 115 additions and 14 deletions.
1 change: 1 addition & 0 deletions cas/worker/BUILD
@@ -122,6 +122,7 @@ rust_library(
rust_test(
name = "running_actions_manager_test",
srcs = ["tests/running_actions_manager_test.rs"],
data = ["tests/wrapper_for_test.sh"],
deps = [
":running_actions_manager",
"//cas/scheduler:action_messages",
9 changes: 7 additions & 2 deletions cas/worker/local_worker.rs
@@ -235,9 +235,14 @@ pub async fn new_local_worker(
fs::create_dir_all(&config.work_directory)
.await
.err_tip(|| format!("Could not make work_directory : {}", config.work_directory))?;

let entrypoint_cmd = if config.entrypoint_cmd.is_empty() {
None
} else {
Some(Arc::new(config.entrypoint_cmd.clone()))
};
let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(
config.work_directory.to_string(),
config.work_directory.clone(),
entrypoint_cmd,
fast_slow_store,
)?)
.clone();
26 changes: 22 additions & 4 deletions cas/worker/running_actions_manager.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{vec_deque::VecDeque, HashMap};
use std::ffi::OsStr;
use std::fmt::Debug;
use std::fs::Permissions;
use std::io::Cursor;
@@ -391,6 +392,7 @@ struct RunningActionImplState {
pub struct RunningActionImpl {
action_id: ActionId,
work_directory: String,
entrypoint_cmd: Option<Arc<String>>,
action_info: ActionInfo,
running_actions_manager: Arc<RunningActionsManagerImpl>,
state: Mutex<RunningActionImplState>,
@@ -402,13 +404,15 @@ impl RunningActionImpl {
execution_metadata: ExecutionMetadata,
action_id: ActionId,
work_directory: String,
entrypoint_cmd: Option<Arc<String>>,
action_info: ActionInfo,
running_actions_manager: Arc<RunningActionsManagerImpl>,
) -> Self {
let (kill_channel_tx, kill_channel_rx) = oneshot::channel();
Self {
action_id,
work_directory,
entrypoint_cmd,
action_info,
running_actions_manager,
state: Mutex::new(RunningActionImplState {
@@ -509,10 +513,16 @@ impl RunningAction for RunningActionImpl {
.fuse(),
)
};
let args = &command_proto.arguments[..];
if args.len() < 1 {
if command_proto.arguments.len() < 1 {
return Err(make_input_err!("No arguments provided in Command proto"));
}
let args: Vec<&OsStr> = if let Some(entrypoint_cmd) = &self.entrypoint_cmd {
std::iter::once(entrypoint_cmd.as_ref().as_ref())
.chain(command_proto.arguments.iter().map(|v| v.as_ref()))
.collect()
} else {
command_proto.arguments.iter().map(|v| v.as_ref()).collect()
};
log::info!("\x1b[0;31mWorker Executing\x1b[0m: {:?}", &args);
let mut command_builder = process::Command::new(&args[0]);
command_builder
@@ -815,6 +825,7 @@ type NowFn = fn() -> SystemTime;
/// with actions while they are running.
pub struct RunningActionsManagerImpl {
root_work_directory: String,
entrypoint_cmd: Option<Arc<String>>,
cas_store: Arc<FastSlowStore>,
filesystem_store: Arc<FilesystemStore>,
running_actions: Mutex<HashMap<ActionId, Weak<RunningActionImpl>>>,
@@ -824,6 +835,7 @@ pub struct RunningActionsManagerImpl {
impl RunningActionsManagerImpl {
pub fn new_with_now_fn(
root_work_directory: String,
entrypoint_cmd: Option<Arc<String>>,
cas_store: Arc<FastSlowStore>,
now_fn: NowFn,
) -> Result<Self, Error> {
@@ -837,15 +849,20 @@ impl RunningActionsManagerImpl {
.clone();
Ok(Self {
root_work_directory,
entrypoint_cmd,
cas_store,
filesystem_store,
running_actions: Mutex::new(HashMap::new()),
now_fn,
})
}

pub fn new(root_work_directory: String, cas_store: Arc<FastSlowStore>) -> Result<Self, Error> {
Self::new_with_now_fn(root_work_directory, cas_store, SystemTime::now)
pub fn new(
root_work_directory: String,
entrypoint_cmd: Option<Arc<String>>,
cas_store: Arc<FastSlowStore>,
) -> Result<Self, Error> {
Self::new_with_now_fn(root_work_directory, entrypoint_cmd, cas_store, SystemTime::now)
}

async fn make_work_directory(&self, action_id: &ActionId) -> Result<String, Error> {
@@ -943,6 +960,7 @@ impl RunningActionsManager for RunningActionsManagerImpl {
execution_metadata,
action_id,
work_directory,
self.entrypoint_cmd.clone(),
action_info,
self.clone(),
));
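The hunks above are where the new option takes effect: when `entrypoint_cmd` is set, it is prepended to the job's arguments and becomes argv[0], with the job's own command following it. As a standalone illustration of that behavior (a hypothetical `build_argv` helper written for this note, not code from the commit), with `entrypoint_cmd = "run.sh"` and a job command of `sleep 5` the worker ends up spawning `run.sh sleep 5`:

use std::ffi::OsStr;
use std::sync::Arc;

// Hypothetical helper mirroring the argv construction in the hunk above.
fn build_argv<'a>(entrypoint_cmd: &'a Option<Arc<String>>, job_arguments: &'a [String]) -> Vec<&'a OsStr> {
    match entrypoint_cmd {
        // Wrapper configured: it becomes argv[0] and the job's own command follows it.
        Some(wrapper) => std::iter::once(OsStr::new(wrapper.as_str()))
            .chain(job_arguments.iter().map(OsStr::new))
            .collect(),
        // No wrapper configured: the job's command is spawned as-is.
        None => job_arguments.iter().map(OsStr::new).collect(),
    }
}

fn main() {
    let entrypoint_cmd = Some(Arc::new("run.sh".to_string()));
    let job = vec!["sleep".to_string(), "5".to_string()];
    let argv = build_argv(&entrypoint_cmd, &job);
    // process::Command::new(&argv[0]) with the remaining args would then run: run.sh sleep 5
    assert_eq!(argv, ["run.sh", "sleep", "5"].map(OsStr::new));
}
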
71 changes: 70 additions & 1 deletion cas/worker/tests/running_actions_manager_test.rs
@@ -14,6 +14,7 @@

use std::collections::HashMap;
use std::env;
use std::io::Cursor;
use std::os::unix::fs::MetadataExt;
use std::pin::Pin;
use std::str::from_utf8;
@@ -25,7 +26,7 @@ use futures::{FutureExt, TryFutureExt};
use prost::Message;
use rand::{thread_rng, Rng};

use ac_utils::{get_and_decode_digest, serialize_and_upload_message};
use ac_utils::{compute_digest, get_and_decode_digest, serialize_and_upload_message};
use action_messages::{ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, SymlinkInfo};
use common::{fs, DigestInfo};
use config;
@@ -386,6 +387,7 @@ mod running_actions_manager_tests {

let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_now_fn(
root_work_directory,
None,
Pin::into_inner(cas_store.clone()),
test_monotonic_clock,
)?);
@@ -464,6 +466,7 @@

let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_now_fn(
root_work_directory,
None,
Pin::into_inner(cas_store.clone()),
test_monotonic_clock,
)?);
@@ -586,6 +589,7 @@

let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_now_fn(
root_work_directory,
None,
Pin::into_inner(cas_store.clone()),
test_monotonic_clock,
)?);
@@ -739,6 +743,7 @@

let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_now_fn(
root_work_directory.clone(),
None,
Pin::into_inner(cas_store.clone()),
test_monotonic_clock,
)?);
@@ -826,6 +831,7 @@

let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(
root_work_directory.clone(),
None,
Pin::into_inner(cas_store.clone()),
)?);
const WORKER_ID: &str = "foo_worker_id";
@@ -868,4 +874,67 @@

Ok(())
}

// This test runs a command with `cas/worker/tests/wrapper_for_test.sh` set as the
// entrypoint_cmd in the config. The wrapper script prints a constant string to stderr,
// and the wrapped command prints to stdout. We then check both outputs to make sure the
// wrapper script was invoked and that the actual command ran under it.
#[tokio::test]
async fn entrypoint_cmd_does_invoke_if_set() -> Result<(), Box<dyn std::error::Error>> {
let (_, _, cas_store) = setup_stores().await?;
let root_work_directory = make_temp_path("root_work_directory");
fs::create_dir_all(&root_work_directory).await?;

const TEST_WRAPPER_SCRIPT: &str = "cas/worker/tests/wrapper_for_test.sh";
let mut full_wrapper_script_path = env::current_dir()?;
full_wrapper_script_path.push(TEST_WRAPPER_SCRIPT);
let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(
root_work_directory.clone(),
Some(Arc::new(
full_wrapper_script_path.into_os_string().into_string().unwrap(),
)),
Pin::into_inner(cas_store.clone()),
)?);
const WORKER_ID: &str = "foo_worker_id";
const SALT: u64 = 66;
const EXPECTED_STDOUT: &str = "Action did run";
let command = Command {
arguments: vec!["printf".to_string(), EXPECTED_STDOUT.to_string()],
working_directory: ".".to_string(),
..Default::default()
};
let command_digest = serialize_and_upload_message(&command, cas_store.as_ref()).await?;
let input_root_digest = serialize_and_upload_message(&Directory::default(), cas_store.as_ref()).await?;
let action = Action {
command_digest: Some(command_digest.into()),
input_root_digest: Some(input_root_digest.into()),
..Default::default()
};
let action_digest = serialize_and_upload_message(&action, cas_store.as_ref()).await?;

let running_action_impl = running_actions_manager
.clone()
.create_and_add_action(
WORKER_ID.to_string(),
StartExecute {
execute_request: Some(ExecuteRequest {
action_digest: Some(action_digest.into()),
..Default::default()
}),
salt: SALT,
queued_timestamp: Some(make_system_time(1000).into()),
},
)
.await?;

let result = run_action(running_action_impl).await?;

let expected_stdout = compute_digest(Cursor::new(EXPECTED_STDOUT)).await?.0;
// Note: This string should match what is in wrapper_for_test.sh
let expected_stderr = compute_digest(Cursor::new("Wrapper script did run")).await?.0;
assert_eq!(expected_stdout, result.stdout_digest);
assert_eq!(expected_stderr, result.stderr_digest);

Ok(())
}
}
11 changes: 11 additions & 0 deletions cas/worker/tests/wrapper_for_test.sh
@@ -0,0 +1,11 @@
#!/bin/bash

# This script is used only to make sure wrapper scripts work in
# running_actions_manager_test.rs.

# Print some static text to stderr. This is what the test uses to
# make sure the script did run.
>&2 printf "Wrapper script did run"

# Now run the real command.
exec "$@"
7 changes: 3 additions & 4 deletions config/cas_server.rs
@@ -216,11 +216,10 @@ pub struct LocalWorkerConfig {

/// The command to execute on every execution request. This will be parsed as
/// a command + arguments (not shell).
/// '$@' has a special meaning in that all the arguments will expand into this
/// location.
/// Example: "run.sh $@" and a job with command: "sleep 5" will result in a
/// Example: "run.sh" and a job with command: "sleep 5" will result in a
/// command like: "run.sh sleep 5".
#[serde(deserialize_with = "convert_string_with_shellexpand")]
/// Default: <Use the command from the job request>.
#[serde(default, deserialize_with = "convert_string_with_shellexpand")]
pub entrypoint_cmd: String,

/// Underlying CAS store that the worker will use to download CAS artifacts.
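With the `#[serde(default, ...)]` attribute above, `entrypoint_cmd` may now be omitted entirely: it deserializes to an empty string, which `new_local_worker` maps to `None`, i.e. "use the command from the job request". A minimal sketch of that behavior using plain serde_json and a pared-down, hypothetical stand-in for `LocalWorkerConfig` (the repo's `convert_string_with_shellexpand` deserializer is left out):

use serde::Deserialize;

// Hypothetical, trimmed stand-in for LocalWorkerConfig with only the field of interest.
#[derive(Deserialize)]
struct WorkerConfig {
    #[serde(default)]
    entrypoint_cmd: String,
}

fn main() -> Result<(), serde_json::Error> {
    // Field omitted: defaults to an empty string, which the worker treats as "no wrapper".
    let no_wrapper: WorkerConfig = serde_json::from_str("{}")?;
    assert!(no_wrapper.entrypoint_cmd.is_empty());

    // Field set: every job command will be run under this wrapper.
    let with_wrapper: WorkerConfig =
        serde_json::from_str(r#"{ "entrypoint_cmd": "./examples/worker/local_entrypoint.sh" }"#)?;
    assert_eq!(with_wrapper.entrypoint_cmd, "./examples/worker/local_entrypoint.sh");
    Ok(())
}

Note that the example configs below drop their old `entrypoint_cmd` entries outright: the `$@` placeholder syntax they relied on is gone, and leaving the field unset now means "run the job's command directly".
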
1 change: 0 additions & 1 deletion config/examples/basic_cas.json
@@ -64,7 +64,6 @@
"worker_api_endpoint": {
"uri": "grpc://127.0.0.1:50061",
},
"entrypoint_cmd": "./examples/worker/local_entrypoint.sh $@",
"cas_fast_slow_store": "WORKER_FAST_SLOW_STORE",
"ac_store": "AC_MAIN_STORE",
"work_directory": "/tmp/turbo_cache/work",
1 change: 0 additions & 1 deletion deployment-examples/docker-compose/worker.json
@@ -32,7 +32,6 @@
"worker_api_endpoint": {
"uri": "grpc://${SCHEDULER_ENDPOINT:-127.0.0.1}:50061",
},
"entrypoint_cmd": "$@",
"cas_fast_slow_store": "WORKER_FAST_SLOW_STORE",
"ac_store": "GRPC_LOCAL_STORE",
"work_directory": "~/.cache/turbo-cache/work",
1 change: 0 additions & 1 deletion deployment-examples/terraform/scripts/worker.json
@@ -88,7 +88,6 @@
"worker_api_endpoint": {
"uri": "grpc://${SCHEDULER_ENDPOINT:-127.0.0.1}:50061",
},
"entrypoint_cmd": "$@",
"cas_fast_slow_store": "WORKER_FAST_SLOW_STORE",
"ac_store": "AC_S3_STORE",
"work_directory": "/worker/work",
1 change: 1 addition & 0 deletions run_integration_tests.sh
@@ -93,6 +93,7 @@ for pattern in "${TEST_PATTERNS[@]}"; do
echo "$FILENAME passed"
else
echo "$FILENAME failed with exit code $EXIT_CODE"
docker-compose logs
exit $EXIT_CODE
fi
docker-compose down
