diff --git a/cas/scheduler/action_messages.rs b/cas/scheduler/action_messages.rs index 64db1e9ab..56b91b81f 100644 --- a/cas/scheduler/action_messages.rs +++ b/cas/scheduler/action_messages.rs @@ -16,8 +16,8 @@ use prost::Message; use prost_types::Any; use proto::build::bazel::remote::execution::v2::{ execution_stage, Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata, ExecuteRequest, - ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, OutputDirectory, OutputFile, OutputSymlink, - SymlinkNode, + ExecuteResponse, ExecutedActionMetadata, ExecutionPolicy, FileNode, LogFile, OutputDirectory, OutputFile, + OutputSymlink, Platform, SymlinkNode, }; use proto::google::longrunning::{operation::Result as LongRunningResult, Operation}; @@ -112,16 +112,13 @@ impl ActionInfo { .try_into()?, timeout: action .timeout - .err_tip(|| "Expected timeout to exist on Action")? + .unwrap_or(prost_types::Duration::default()) .try_into() .map_err(|_| make_input_err!("Failed convert proto duration to system duration"))?, - platform_properties: action - .platform - .err_tip(|| "Expected platform to exist on Action")? - .try_into()?, + platform_properties: action.platform.unwrap_or(Platform::default()).try_into()?, priority: execute_request .execution_policy - .err_tip(|| "Expected execution_policy to exist on ExecuteRequest")? + .unwrap_or(ExecutionPolicy::default()) .priority, insert_timestamp: SystemTime::UNIX_EPOCH, // We can't know it at this point. unique_qualifier: ActionInfoHashKey { @@ -223,12 +220,26 @@ impl Eq for ActionInfoHashKey {} /// This is in order to be able to reuse the same struct instead of building different /// structs when converting `FileInfo` -> {`OutputFile`, `FileNode`} and other similar /// structs. -#[derive(Eq, PartialEq, Debug, Clone)] +#[derive(Eq, PartialEq, PartialOrd, Debug, Clone)] pub enum NameOrPath { Name(String), Path(String), } +impl Ord for NameOrPath { + fn cmp(&self, other: &Self) -> Ordering { + let self_lexical_name = match self { + NameOrPath::Name(name) => name, + NameOrPath::Path(path) => path, + }; + let other_lexical_name = match other { + NameOrPath::Name(name) => name, + NameOrPath::Path(path) => path, + }; + self_lexical_name.cmp(other_lexical_name) + } +} + /// Represents an individual file and associated metadata. /// This struct must be 100% compatible with `OutputFile` and `FileNode` structs in /// remote_execution.proto. diff --git a/cas/store/BUILD b/cas/store/BUILD index a8dd7e393..8f3ed168e 100644 --- a/cas/store/BUILD +++ b/cas/store/BUILD @@ -54,7 +54,12 @@ rust_library( name = "ac_utils", srcs = ["ac_utils.rs"], deps = [ + "//third_party:bytes", + "//third_party:futures", "//third_party:prost", + "//third_party:sha2", + "//third_party:tokio", + "//util:buf_channel", "//util:common", "//util:error", ":store", diff --git a/cas/store/ac_utils.rs b/cas/store/ac_utils.rs index fc745c54e..52f109afc 100644 --- a/cas/store/ac_utils.rs +++ b/cas/store/ac_utils.rs @@ -1,12 +1,19 @@ // Copyright 2021 Nathan (Blaise) Bruer. All rights reserved. 
use std::default::Default; +use std::io::Cursor; use std::pin::Pin; +use bytes::BytesMut; +use futures::{future::try_join, Future, FutureExt, TryFutureExt}; +use prost::Message; +use sha2::{Digest, Sha256}; +use tokio::io::{AsyncRead, AsyncReadExt}; + +use buf_channel::make_buf_channel_pair; use common::DigestInfo; use error::{Code, Error, ResultExt}; -use prost::Message; -use store::Store; +use store::{Store, UploadSizeInfo}; // NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than // 1.2k. Giving a bit more just in case to reduce allocs. @@ -42,3 +49,93 @@ pub async fn get_and_decode_digest( T::decode(store_data).err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e))) } + +/// Takes a proto message and will serialize it and upload it to the provided store. +pub fn serialize_and_upload_message<'a, T: Message>( + message: &'a T, + cas_store: Pin<&'a dyn Store>, +) -> impl Future> + 'a { + async move { + let mut buffer = BytesMut::new(); + let digest = { + message + .encode(&mut buffer) + .err_tip(|| "Could not encode directory proto")?; + let mut hasher = Sha256::new(); + hasher.update(&buffer); + DigestInfo::new(hasher.finalize().into(), buffer.len() as i64) + }; + upload_to_store(cas_store, digest.clone(), &mut Cursor::new(buffer)).await?; + Ok(digest) + } +} + +/// Given a bytestream computes the digest for the data. +/// Note: This will happen in a new spawn since computing digests can be thread intensive. +pub fn compute_digest( + mut reader: R, +) -> impl Future> { + tokio::spawn(async move { + const DEFAULT_READ_BUFF_SIZE: usize = 4096; + let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE); + let mut hasher = Sha256::new(); + let mut digest_size = 0; + loop { + reader + .read_buf(&mut chunk) + .await + .err_tip(|| "Could not read chunk during compute_digest")?; + if chunk.is_empty() { + break; // EOF. + } + digest_size += chunk.len(); + hasher.update(&chunk); + chunk.clear(); + } + + Ok((DigestInfo::new(hasher.finalize().into(), digest_size as i64), reader)) + }) + .map(|r| r.err_tip(|| "Failed to launch spawn")?) +} + +/// Uploads data to our store for given digest. +/// Sadly we cannot upload our data while computing our hash, this means that we often +/// will need to read the file two times, one to hash the file and the other to upload +/// it. In the future we could possibly upload to store while computing the hash and +/// then "finish" the upload by giving the digest, but not all stores will support this +/// for now we will just always read twice. +pub fn upload_to_store<'a, R: AsyncRead + Unpin>( + cas_store: Pin<&'a dyn Store>, + digest: DigestInfo, + reader: &'a mut R, +) -> impl Future> + 'a { + let (mut tx, rx) = make_buf_channel_pair(); + let upload_to_store_fut = cas_store + .update( + digest.clone(), + rx, + UploadSizeInfo::ExactSize(digest.size_bytes as usize), + ) + .map(|r| r.err_tip(|| "Could not upload data to store in upload_to_store")); + let read_data_fut = async move { + loop { + const DEFAULT_READ_BUFF_SIZE: usize = 4096; + let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE); + reader + .read_buf(&mut chunk) + .await + .err_tip(|| "Could not read chunk during upload_to_store")?; + if chunk.len() == 0 { + break; // EOF. 
+ } + tx.send(chunk.freeze()) + .await + .err_tip(|| "Could not send buffer data to store in upload_to_store")?; + } + tx.send_eof() + .await + .err_tip(|| "Could not send EOF to store in upload_to_store")?; + Ok(()) + }; + try_join(read_data_fut, upload_to_store_fut).map_ok(|(_, _)| ()) +} diff --git a/cas/store/compression_store.rs b/cas/store/compression_store.rs index 5c77ffb09..6e9087ce0 100644 --- a/cas/store/compression_store.rs +++ b/cas/store/compression_store.rs @@ -560,7 +560,7 @@ impl StoreTrait for CompressionStore { Ok(()) } - fn as_any(self: Arc) -> Arc { - self + fn as_any(self: Arc) -> Box { + Box::new(self) } } diff --git a/cas/store/dedup_store.rs b/cas/store/dedup_store.rs index 393f79a35..94d9c97bb 100644 --- a/cas/store/dedup_store.rs +++ b/cas/store/dedup_store.rs @@ -325,7 +325,7 @@ impl StoreTrait for DedupStore { Ok(()) } - fn as_any(self: Arc) -> Arc { - self + fn as_any(self: Arc) -> Box { + Box::new(self) } } diff --git a/cas/store/fast_slow_store.rs b/cas/store/fast_slow_store.rs index e372c6fee..7156fa73b 100644 --- a/cas/store/fast_slow_store.rs +++ b/cas/store/fast_slow_store.rs @@ -33,7 +33,7 @@ impl FastSlowStore { Self { fast_store, slow_store } } - pub fn fast_slow<'a>(&'a self) -> &'a Arc { + pub fn fast_store<'a>(&'a self) -> &'a Arc { &self.fast_store } @@ -211,7 +211,7 @@ impl StoreTrait for FastSlowStore { Ok(()) } - fn as_any(self: Arc) -> Arc { - self + fn as_any(self: Arc) -> Box { + Box::new(self) } } diff --git a/cas/store/filesystem_store.rs b/cas/store/filesystem_store.rs index 59217b5ad..a93fd7f4f 100644 --- a/cas/store/filesystem_store.rs +++ b/cas/store/filesystem_store.rs @@ -317,6 +317,12 @@ impl FilesystemStore { file_size += data_len as u64; } + temp_file + .as_ref() + .sync_data() + .await + .err_tip(|| format!("Failed to sync_data in filesystem store {}", temp_loc))?; + let entry = Arc::new(FileEntry { digest: digest.clone(), file_size, @@ -437,7 +443,7 @@ impl StoreTrait for FilesystemStore { Ok(()) } - fn as_any(self: Arc) -> Arc { - self + fn as_any(self: Arc) -> Box { + Box::new(self) } } diff --git a/cas/store/memory_store.rs b/cas/store/memory_store.rs index bddd9385a..0d633ca3a 100644 --- a/cas/store/memory_store.rs +++ b/cas/store/memory_store.rs @@ -112,7 +112,7 @@ impl StoreTrait for MemoryStore { Ok(()) } - fn as_any(self: Arc) -> Arc { - self + fn as_any(self: Arc) -> Box { + Box::new(self) } } diff --git a/cas/store/ref_store.rs b/cas/store/ref_store.rs index ee9aa0e6c..c341a3363 100644 --- a/cas/store/ref_store.rs +++ b/cas/store/ref_store.rs @@ -105,7 +105,7 @@ impl StoreTrait for RefStore { Pin::new(store.as_ref()).get_part(digest, writer, offset, length).await } - fn as_any(self: Arc) -> Arc { - self + fn as_any(self: Arc) -> Box { + Box::new(self) } } diff --git a/cas/store/s3_store.rs b/cas/store/s3_store.rs index 734de3497..aad9c3db7 100644 --- a/cas/store/s3_store.rs +++ b/cas/store/s3_store.rs @@ -483,7 +483,7 @@ impl StoreTrait for S3Store { .await } - fn as_any(self: Arc) -> Arc { - self + fn as_any(self: Arc) -> Box { + Box::new(self) } } diff --git a/cas/store/size_partitioning_store.rs b/cas/store/size_partitioning_store.rs index 72b0f33be..aa9ff695b 100644 --- a/cas/store/size_partitioning_store.rs +++ b/cas/store/size_partitioning_store.rs @@ -73,7 +73,7 @@ impl StoreTrait for SizePartitioningStore { .await } - fn as_any(self: Arc) -> Arc { - self + fn as_any(self: Arc) -> Box { + Box::new(self) } } diff --git a/cas/store/store_trait.rs b/cas/store/store_trait.rs index ab3c4439a..c2720c843 100644 --- 
a/cas/store/store_trait.rs +++ b/cas/store/store_trait.rs @@ -95,5 +95,6 @@ pub trait StoreTrait: Sync + Send + Unpin { .merge(data_res.err_tip(|| "Failed to read stream to completion in get_part_unchunked")) } - fn as_any(self: Arc) -> Arc; + /// Expect the returned Any to be `Arc`. + fn as_any(self: Arc) -> Box; } diff --git a/cas/store/verify_store.rs b/cas/store/verify_store.rs index 4d5f22a73..615a0d6a9 100644 --- a/cas/store/verify_store.rs +++ b/cas/store/verify_store.rs @@ -132,7 +132,7 @@ impl StoreTrait for VerifyStore { self.pin_inner().get_part(digest, writer, offset, length).await } - fn as_any(self: Arc) -> Arc { - self + fn as_any(self: Arc) -> Box { + Box::new(self) } } diff --git a/cas/worker/BUILD b/cas/worker/BUILD index 2b96feb60..0a2381df5 100644 --- a/cas/worker/BUILD +++ b/cas/worker/BUILD @@ -6,6 +6,7 @@ rust_library( name = "local_worker", srcs = ["local_worker.rs"], deps = [ + "//cas/scheduler:action_messages", "//cas/store", "//cas/store:fast_slow_store", "//config", @@ -28,14 +29,20 @@ rust_library( srcs = ["running_actions_manager.rs"], deps = [ "//cas/scheduler:action_messages", + "//cas/store", "//cas/store:ac_utils", "//cas/store:fast_slow_store", "//cas/store:filesystem_store", "//proto", + "//third_party:bytes", "//third_party:fast_async_mutex", + "//third_party:filetime", "//third_party:futures", + "//third_party:hex", + "//third_party:relative_path", "//third_party:tokio", - "//third_party:filetime", + "//third_party:tokio_stream", + "//third_party:tokio_util", "//util:common", "//util:error", ], @@ -106,6 +113,7 @@ rust_library( srcs = ["tests/utils/mock_running_actions_manager.rs"], testonly = True, deps = [ + "//cas/scheduler:action_messages", "//proto", "//third_party:fast_async_mutex", "//third_party:tokio", @@ -119,12 +127,15 @@ rust_test( name = "running_actions_manager_test", srcs = ["tests/running_actions_manager_test.rs"], deps = [ + "//cas/scheduler:action_messages", "//cas/store", + "//cas/store:ac_utils", "//cas/store:fast_slow_store", "//cas/store:filesystem_store", "//cas/store:memory_store", "//config", "//proto", + "//third_party:futures", "//third_party:pretty_assertions", "//third_party:prost", "//third_party:rand", diff --git a/cas/worker/local_worker.rs b/cas/worker/local_worker.rs index 678f4fd64..cbe214934 100644 --- a/cas/worker/local_worker.rs +++ b/cas/worker/local_worker.rs @@ -10,6 +10,7 @@ use tokio::time::sleep; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{transport::Channel as TonicChannel, Streaming}; +use action_messages::{ActionResult, ActionStage}; use common::log; use config::cas_server::LocalWorkerConfig; use error::{make_err, make_input_err, Code, Error, ResultExt}; @@ -119,22 +120,44 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, Update::StartAction(start_execute) => { let add_future_channel = add_future_channel.clone(); let mut grpc_client = self.grpc_client.clone(); + let salt = start_execute.salt.clone(); + let worker_id = self.worker_id.clone(); + let action_digest = start_execute.execute_request.as_ref().map_or(None, |v| v.action_digest.clone()); let start_action_fut = self .running_actions_manager .clone() - .create_and_add_action(start_execute) - .and_then(|action| action.prepare_action()) - .and_then(|action| action.execute()) - .and_then(|action| action.upload_results()) - .and_then(|action| action.cleanup()) - .and_then(|action| action.get_finished_result()); + .create_and_add_action(worker_id.clone(), start_execute) + .and_then(|action| + action + 
.clone() + .prepare_action() + .and_then(|action| action.execute()) + .and_then(|action| action.upload_results()) + .and_then(|action| action.get_finished_result()) + // Note: We need ensure we run cleanup even if one of the other steps fail. + .then(|result| async move { + if let Err(e) = action.cleanup().await { + return Result::::Err(e).merge(result); + } + result + }) + ); - let make_publish_future = move |res: Result| async move { + let make_publish_future = move |res: Result| async move { match res { - Ok(finished_result) => { - grpc_client.execution_response(ExecuteResult{ - response: Some(execute_result::Response::Result(finished_result)), - }).await.err_tip(|| "Error while calling execution_response")?; + Ok(action_result) => { + grpc_client.execution_response( + ExecuteResult{ + response: Some(execute_result::Response::Result(ExecuteFinishedResult{ + worker_id, + action_digest, + salt, + execute_response: Some(ActionStage::Completed(action_result).into()), + })), + } + ) + .await + .err_tip(|| "Error while calling execution_response")?; }, Err(e) => { grpc_client.execution_response(ExecuteResult{ @@ -189,7 +212,11 @@ pub fn new_local_worker( .downcast_ref::>() .err_tip(|| "Expected store for LocalWorker's store to be a FastSlowStore")? .clone(); - let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(fast_slow_store)?).clone(); + let running_actions_manager = Arc::new(RunningActionsManagerImpl::new( + config.work_directory.clone(), + fast_slow_store, + )?) + .clone(); Ok(LocalWorker::new_with_connection_factory_and_actions_manager( config.clone(), running_actions_manager, diff --git a/cas/worker/running_actions_manager.rs b/cas/worker/running_actions_manager.rs index f241f056b..6b763f796 100644 --- a/cas/worker/running_actions_manager.rs +++ b/cas/worker/running_actions_manager.rs @@ -1,26 +1,49 @@ // Copyright 2022 Nathan (Blaise) Bruer. All rights reserved. 
-use std::collections::HashMap; +use std::collections::{vec_deque::VecDeque, HashMap}; +use std::fmt::Debug; use std::fs::Permissions; -use std::os::unix::fs::PermissionsExt; +use std::io::Cursor; +use std::os::unix::fs::{MetadataExt, PermissionsExt}; +use std::path::Path; use std::pin::Pin; -use std::sync::{Arc, Weak}; +use std::process::Stdio; +use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc, Weak}; +use std::time::SystemTime; +use bytes::{BufMut, Bytes, BytesMut}; use fast_async_mutex::mutex::Mutex; use filetime::{set_file_mtime, FileTime}; -use futures::future::{BoxFuture, FutureExt, TryFutureExt}; -use futures::stream::{FuturesUnordered, TryStreamExt}; +use futures::future::{try_join, try_join3, try_join_all, BoxFuture, FutureExt, TryFutureExt}; +use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt}; +use hex; +use relative_path::RelativePath; +use tokio::io::AsyncSeekExt; +use tokio::process; +use tokio::sync::oneshot; use tokio::task::spawn_blocking; +use tokio_stream::wrappers::ReadDirStream; +use tokio_util::io::ReaderStream; -use ac_utils::get_and_decode_digest; -use action_messages::ActionInfo; +use ac_utils::{compute_digest, get_and_decode_digest, serialize_and_upload_message, upload_to_store}; +use action_messages::{ActionInfo, ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, SymlinkInfo}; use async_trait::async_trait; -use common::{fs, DigestInfo}; -use error::{make_err, Code, Error, ResultExt}; +use common::{fs, log, DigestInfo, JoinHandleDropGuard}; +use error::{make_err, make_input_err, Code, Error, ResultExt}; use fast_slow_store::FastSlowStore; use filesystem_store::FilesystemStore; -use proto::build::bazel::remote::execution::v2::{Action, Directory as ProtoDirectory}; -use proto::com::github::allada::turbo_cache::remote_execution::{ExecuteFinishedResult, StartExecute}; +use proto::build::bazel::remote::execution::v2::{ + Action, Command as ProtoCommand, Directory as ProtoDirectory, Directory, DirectoryNode, FileNode, SymlinkNode, + Tree as ProtoTree, +}; +use proto::com::github::allada::turbo_cache::remote_execution::StartExecute; +use store::Store; + +pub type ActionId = [u8; 32]; + +/// For simplicity we use a fixed exit code for cases when our program is terminated +/// due to a signal. +const EXIT_CODE_FOR_SIGNAL: i32 = 9; /// Aggressively download the digests of files and make a local folder from it. This function /// will spawn unbounded number of futures to try and get these downloaded. 
The store itself @@ -106,13 +129,13 @@ pub fn download_to_directory<'a>( ); } - for symlink in directory.symlinks { - let dest = format!("{}/{}", current_directory, symlink.name); + for symlink_node in directory.symlinks { + let dest = format!("{}/{}", current_directory, symlink_node.name); futures.push( async move { - fs::symlink(&symlink.target, &dest) + fs::symlink(&symlink_node.target, &dest) .await - .err_tip(|| format!("Could not create symlink {} -> {}", symlink.target, dest))?; + .err_tip(|| format!("Could not create symlink {} -> {}", symlink_node.target, dest))?; Ok(()) } .boxed(), @@ -125,6 +148,185 @@ pub fn download_to_directory<'a>( .boxed() } +async fn upload_file<'a>( + file_handle: fs::FileSlot<'static>, + cas_store: Pin<&'a dyn Store>, + full_path: impl AsRef + Debug, +) -> Result { + let (digest, mut file_handle) = compute_digest(file_handle) + .await + .err_tip(|| format!("for {:?}", full_path))?; + file_handle.rewind().await.err_tip(|| "Could not rewind file")?; + upload_to_store(cas_store, digest.clone(), &mut file_handle) + .await + .err_tip(|| format!("for {:?}", full_path))?; + + let name = full_path + .as_ref() + .file_name() + .err_tip(|| format!("Expected file_name to exist on {:?}", full_path))? + .to_str() + .err_tip(|| make_err!(Code::Internal, "Could not convert {:?} to string", full_path))? + .to_string(); + let metadata = file_handle + .as_ref() + .metadata() + .await + .err_tip(|| format!("While reading metadata for {:?}", full_path))?; + let is_executable = (metadata.mode() & 0o001) != 0; + Ok(FileInfo { + name_or_path: NameOrPath::Name(name), + digest, + is_executable, + }) +} + +async fn upload_symlink( + full_path: impl AsRef + Debug, + full_work_directory_path: impl AsRef, +) -> Result { + let full_target_path = fs::read_link(full_path.as_ref()) + .await + .err_tip(|| format!("Could not get read_link path of {:?}", full_path))?; + + // Detect if our symlink is inside our work directory, if it is find the + // relative path otherwise use the absolute path. + let target = if full_target_path.starts_with(full_work_directory_path.as_ref()) { + let full_target_path = RelativePath::from_path(&full_target_path) + .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?; + RelativePath::from_path(full_work_directory_path.as_ref()) + .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))? + .relative(full_target_path) + .normalize() + .into_string() + } else { + full_target_path + .to_str() + .err_tip(|| make_err!(Code::Internal, "Could not convert '{:?}' to string", full_target_path))? + .to_string() + }; + + let name = full_path + .as_ref() + .file_name() + .err_tip(|| format!("Expected file_name to exist on {:?}", full_path))? + .to_str() + .err_tip(|| make_err!(Code::Internal, "Could not convert {:?} to string", full_path))? + .to_string(); + + Ok(SymlinkInfo { + name_or_path: NameOrPath::Name(name), + target, + }) +} + +fn upload_directory<'a, P: AsRef + Debug + Send + Sync + Clone + 'a>( + cas_store: Pin<&'a dyn Store>, + full_dir_path: P, + full_work_directory: &'a str, +) -> BoxFuture<'a, Result<(Directory, VecDeque), Error>> { + Box::pin(async move { + let file_futures = FuturesUnordered::new(); + let dir_futures = FuturesUnordered::new(); + let symlink_futures = FuturesUnordered::new(); + { + let (_permit, dir_handle) = fs::read_dir(&full_dir_path) + .await + .err_tip(|| format!("Error reading dir for reading {:?}", full_dir_path))? 
+ .into_inner(); + let mut dir_stream = ReadDirStream::new(dir_handle); + // Note: Try very hard to not leave file descriptors open. Try to keep them as short + // lived as possible. This is why we iterate the directory and then build a bunch of + // futures with all the work we are wanting to do then execute it. It allows us to + // close the directory iterator file descriptor, then open the child files/folders. + while let Some(entry) = dir_stream.next().await { + let entry = match entry { + Ok(entry) => entry, + Err(e) => return Err(e).err_tip(|| "Error while iterating directory")?, + }; + let file_type = entry + .file_type() + .await + .err_tip(|| format!("Error running file_type() on {:?}", entry))?; + let full_path = full_dir_path.as_ref().join(entry.path()); + if file_type.is_dir() { + let full_dir_path = full_dir_path.clone(); + dir_futures.push( + upload_directory(cas_store, full_path.clone(), &full_work_directory) + .and_then(|(dir, all_dirs)| async move { + let directory_name = full_path + .file_name() + .err_tip(|| format!("Expected file_name to exist on {:?}", full_dir_path))? + .to_str() + .err_tip(|| { + make_err!(Code::Internal, "Could not convert {:?} to string", full_dir_path) + })? + .to_string(); + + let digest = serialize_and_upload_message(&dir, cas_store) + .await + .err_tip(|| format!("for {:?}", full_path))?; + + Result::<(DirectoryNode, VecDeque), Error>::Ok(( + DirectoryNode { + name: directory_name, + digest: Some(digest.into()), + }, + all_dirs, + )) + }) + .boxed(), + ); + } else if file_type.is_file() { + file_futures.push(async move { + let file_handle = fs::open_file(&full_path) + .await + .err_tip(|| format!("Could not open file {:?}", full_path))?; + upload_file(file_handle, cas_store, full_path) + .map_ok(|v| v.into()) + .await + }); + } else if file_type.is_symlink() { + symlink_futures.push(upload_symlink(full_path, &full_work_directory).map_ok(|v| v.into())); + } + } + } + + let (mut file_nodes, dir_entries, mut symlinks) = try_join3( + file_futures.try_collect::>(), + dir_futures.try_collect::)>>(), + symlink_futures.try_collect::>(), + ) + .await?; + + let mut directory_nodes = Vec::with_capacity(dir_entries.len()); + // For efficiency we use a deque because it allows cheap concat of Vecs. + // We make the assumption here that when performance is important it is because + // our directory is quite large. This allows us to cheaply merge large amounts of + // directories into one VecDeque. Then after we are done we need to collapse it + // down into a single Vec. + let mut all_child_directories = VecDeque::with_capacity(dir_entries.len()); + for (directory_node, mut recursive_child_directories) in dir_entries { + directory_nodes.push(directory_node); + all_child_directories.append(&mut recursive_child_directories); + } + + file_nodes.sort_unstable_by(|a, b| a.name.cmp(&b.name)); + directory_nodes.sort_unstable_by(|a, b| a.name.cmp(&b.name)); + symlinks.sort_unstable_by(|a, b| a.name.cmp(&b.name)); + + let directory = Directory { + files: file_nodes, + directories: directory_nodes, + symlinks, + node_properties: None, // We don't support file properties. + }; + all_child_directories.push_back(directory.clone()); + + Ok((directory, all_child_directories)) + }) +} + #[async_trait] pub trait RunningAction: Sync + Send + Sized + Unpin + 'static { /// Anything that needs to execute before the actions is actually executed should happen here. @@ -142,45 +344,387 @@ pub trait RunningAction: Sync + Send + Sized + Unpin + 'static { /// Returns the final result. 
As a general rule this action should be thought of as /// a consumption of `self`, meaning once a return happens here the lifetime of `Self` /// is over and any action performed on it after this call is undefined behavior. - async fn get_finished_result(self: Arc) -> Result; + async fn get_finished_result(self: Arc) -> Result; +} + +struct RunningActionImplExecutionResult { + stdout: Bytes, + stderr: Bytes, + exit_code: i32, +} + +struct RunningActionImplState { + command_proto: Option, + // TODO(allada) Kill is not implemented yet, but is instrumented. + _kill_channel_tx: Option>, + kill_channel_rx: Option>, + execution_result: Option, + action_result: Option, } pub struct RunningActionImpl { - _action_info: ActionInfo, - _cas_store: Pin>, - _filesystem_store: Pin>, + worker_id: String, + action_id: ActionId, + work_directory: String, + action_info: ActionInfo, + running_actions_manager: Arc, + state: Mutex, + did_cleanup: AtomicBool, } impl RunningActionImpl { - fn new(action_info: ActionInfo, cas_store: Arc, filesystem_store: Arc) -> Self { + fn new( + worker_id: String, + action_id: ActionId, + work_directory: String, + action_info: ActionInfo, + running_actions_manager: Arc, + ) -> Self { + let (kill_channel_tx, kill_channel_rx) = oneshot::channel(); Self { - _action_info: action_info, - _cas_store: Pin::new(cas_store), - _filesystem_store: Pin::new(filesystem_store), + worker_id, + action_id, + work_directory, + action_info, + running_actions_manager, + state: Mutex::new(RunningActionImplState { + command_proto: None, + kill_channel_rx: Some(kill_channel_rx), + _kill_channel_tx: Some(kill_channel_tx), + execution_result: None, + action_result: None, + }), + did_cleanup: AtomicBool::new(false), } } } +impl Drop for RunningActionImpl { + fn drop(&mut self) { + assert!( + self.did_cleanup.load(Ordering::Relaxed), + "RunningActionImpl did not cleanup. This is a violation of how RunningActionImpl's requirements" + ); + } +} + #[async_trait] impl RunningAction for RunningActionImpl { + /// Prepares any actions needed to execution this action. This action will do the following: + /// * Download any files needed to execute the action + /// * Build a folder with all files needed to execute the action. + /// This function will aggressively download and spawn potentially thousands of futures. It is + /// up to the stores to rate limit if needed. async fn prepare_action(self: Arc) -> Result, Error> { - unimplemented!(); + let command = { + // Download and build out our input files/folders. Also fetch and decode our Command. + let cas_store_pin = Pin::new(self.running_actions_manager.cas_store.as_ref()); + let command_fut = async { + Ok( + get_and_decode_digest::(cas_store_pin, &self.action_info.command_digest) + .await + .err_tip(|| "Converting command_digest to Command")?, + ) + }; + let filesystem_store_pin = Pin::new(self.running_actions_manager.filesystem_store.as_ref()); + // Download the input files/folder and place them into the temp directory. + let download_to_directory_fut = download_to_directory( + cas_store_pin, + filesystem_store_pin, + &self.action_info.input_root_digest, + &self.work_directory, + ); + let (command, _) = try_join(command_fut, download_to_directory_fut).await?; + command + }; + { + // Create all directories needed for our output paths. This is required by the bazel spec. 
+ let full_work_directory = format!("{}/{}", self.work_directory, command.working_directory); + let prepare_output_directories = move |output_file| { + let full_output_path = format!("{}/{}", full_work_directory, output_file); + async move { + let full_parent_path = Path::new(&full_output_path) + .parent() + .err_tip(|| format!("Parent path for {} has no parent", full_output_path))?; + fs::create_dir_all(full_parent_path) + .await + .err_tip(|| format!("Error creating output directory {} (file)", full_parent_path.display()))?; + Result::<(), Error>::Ok(()) + } + }; + try_join_all(command.output_paths.iter().map(prepare_output_directories)).await?; + } + { + let mut state = self.state.lock().await; + state.command_proto = Some(command); + } + Ok(self) } async fn execute(self: Arc) -> Result, Error> { - unimplemented!(); + let (command_proto, mut kill_channel_rx) = { + let mut state = self.state.lock().await; + ( + state + .command_proto + .take() + .err_tip(|| "Expected state to have command_proto in execute()")?, + state + .kill_channel_rx + .take() + .err_tip(|| "Expected state to have kill_channel_rx in execute()")?, + ) + }; + let args = &command_proto.arguments[..]; + if args.len() < 1 { + return Err(make_input_err!("No arguments provided in Command proto")); + } + let mut command_builder = process::Command::new(&args[0]); + command_builder + .args(&args[1..]) + .kill_on_drop(true) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir(format!("{}/{}", self.work_directory, command_proto.working_directory)) + .env_clear(); + for environment_variable in &command_proto.environment_variables { + command_builder.env(&environment_variable.name, &environment_variable.value); + } + + let mut child_process = command_builder + .spawn() + .err_tip(|| format!("Could not execute command {:?}", command_proto.arguments))?; + let mut stdout_stream = ReaderStream::new( + child_process + .stdout + .take() + .err_tip(|| "Expected stdout to exist on command this should never happen")?, + ); + let mut stderr_stream = ReaderStream::new( + child_process + .stderr + .take() + .err_tip(|| "Expected stderr to exist on command this should never happen")?, + ); + + let all_stdout_fut = JoinHandleDropGuard::new(tokio::spawn(async move { + let mut all_stdout = BytesMut::new(); + while let Some(chunk) = stdout_stream.next().await { + all_stdout.put(chunk.err_tip(|| "Error reading stdout stream")?); + } + Result::::Ok(all_stdout.freeze()) + })); + let all_stderr_fut = JoinHandleDropGuard::new(tokio::spawn(async move { + let mut all_stderr = BytesMut::new(); + while let Some(chunk) = stderr_stream.next().await { + all_stderr.put(chunk.err_tip(|| "Error reading stderr stream")?); + } + Result::::Ok(all_stderr.freeze()) + })); + loop { + tokio::select! { + maybe_exit_status = child_process.wait() => { + let exit_status = maybe_exit_status.err_tip(|| "Failed to collect exit code of process")?; + // TODO(allada) We should implement stderr/stdout streaming to client here. 
+ let stdout = all_stdout_fut.await.err_tip(|| "Internal error reading from stdout of worker task")??; + let stderr = all_stderr_fut.await.err_tip(|| "Internal error reading from stderr of worker task")??; + { + let mut state = self.state.lock().await; + state.command_proto = Some(command_proto); + state.execution_result = Some(RunningActionImplExecutionResult{ + stdout, + stderr, + exit_code: exit_status.code().unwrap_or(EXIT_CODE_FOR_SIGNAL), + }); + } + return Ok(self); + }, + _ = &mut kill_channel_rx => { + if let Err(e) = child_process.start_kill() { + log::error!("Could kill process in RunningActionsManager : {:?}", e); + } + }, + } + } + // Unreachable. } async fn upload_results(self: Arc) -> Result, Error> { - unimplemented!(); + let (command_proto, execution_result) = { + let mut state = self.state.lock().await; + ( + state + .command_proto + .take() + .err_tip(|| "Expected state to have command_proto in execute()")?, + state + .execution_result + .take() + .err_tip(|| "Execution result does not exist at upload_results stage")?, + ) + }; + let cas_store = Pin::new(self.running_actions_manager.cas_store.as_ref()); + let (stdout_digest, stderr_digest) = { + // Upload our stdout/stderr to our CAS store. + try_join( + async { + let cursor = Cursor::new(execution_result.stdout); + let (digest, mut cursor) = compute_digest(cursor).await?; + cursor.rewind().await.err_tip(|| "Could not rewind cursor")?; + upload_to_store(cas_store, digest.clone(), &mut cursor).await?; + Result::::Ok(digest) + }, + async { + let cursor = Cursor::new(execution_result.stderr); + let (digest, mut cursor) = compute_digest(cursor).await?; + cursor.rewind().await.err_tip(|| "Could not rewind cursor")?; + upload_to_store(cas_store, digest.clone(), &mut cursor).await?; + Result::::Ok(digest) + }, + ) + .await? + }; + + enum OutputType { + None, + File(FileInfo), + Directory(DirectoryInfo), + Symlink(SymlinkInfo), + } + let full_work_directory = format!("{}/{}", self.work_directory, command_proto.working_directory); + + let mut output_path_futures = FuturesUnordered::new(); + for entry in command_proto.output_paths.into_iter() { + let full_work_directory = &full_work_directory; // This ensures we don't move the value. + let full_path = format!("{}/{}", full_work_directory, entry); + output_path_futures.push(async move { + let metadata = { + let file_handle = match fs::open_file(&full_path).await { + Ok(handle) => handle, + Err(e) => { + if e.code == Code::NotFound { + // In the event our output does not exist, according to the bazel remote + // execution spec, we simply ignore it continue. + return Result::::Ok(OutputType::None); + } + return Err(e).err_tip(|| format!("Could not open file {}", full_path)); + } + }; + // We cannot rely on the file_handle's metadata, because it follows symlinks, so + // we need to instead use `symlink_metadata`. 
+ let metadata = fs::symlink_metadata(&full_path) + .await + .err_tip(|| format!("While querying symlink metadata for {}", entry))?; + if metadata.is_file() { + return Ok(OutputType::File( + upload_file(file_handle, cas_store, full_path) + .await + .map(|mut file_info| { + file_info.name_or_path = NameOrPath::Path(entry); + file_info + })?, + )); + } + metadata + }; + if metadata.is_dir() { + Ok(OutputType::Directory( + upload_directory(cas_store, full_path, full_work_directory) + .and_then(|(root_dir, children)| async move { + let tree = ProtoTree { + root: Some(root_dir), + children: children.into(), + }; + let tree_digest = serialize_and_upload_message(&tree, cas_store) + .await + .err_tip(|| format!("While processing {}", entry))?; + Ok(DirectoryInfo { + path: entry, + tree_digest, + }) + }) + .await?, + )) + } else if metadata.is_symlink() { + Ok(OutputType::Symlink( + upload_symlink(full_path, full_work_directory) + .await + .map(|mut symlink_info| { + symlink_info.name_or_path = NameOrPath::Path(entry); + symlink_info + })?, + )) + } else { + Err(make_err!( + Code::Internal, + "{} was not a file, folder or symlink. Must be one.", + full_path + )) + } + }); + } + let mut output_files = vec![]; + let mut output_folders = vec![]; + let mut output_symlinks = vec![]; + while let Some(output_type) = output_path_futures.try_next().await? { + match output_type { + OutputType::File(output_file) => output_files.push(output_file), + OutputType::Directory(output_folder) => output_folders.push(output_folder), + OutputType::Symlink(output_symlink) => output_symlinks.push(output_symlink), + OutputType::None => { /* Safe to ignore */ } + } + } + drop(output_path_futures); + output_files.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path)); + output_folders.sort_unstable_by(|a, b| a.path.cmp(&b.path)); + output_symlinks.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path)); + { + let mut state = self.state.lock().await; + state.action_result = Some(ActionResult { + output_files, + output_folders, + output_symlinks, + exit_code: execution_result.exit_code, + stdout_digest: stdout_digest.into(), + stderr_digest: stderr_digest.into(), + // TODO(allada) We should implement the timing info here. + execution_metadata: ExecutionMetadata { + worker: self.worker_id.to_string(), + queued_timestamp: SystemTime::UNIX_EPOCH, + worker_start_timestamp: SystemTime::UNIX_EPOCH, + worker_completed_timestamp: SystemTime::UNIX_EPOCH, + input_fetch_start_timestamp: SystemTime::UNIX_EPOCH, + input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH, + execution_start_timestamp: SystemTime::UNIX_EPOCH, + execution_completed_timestamp: SystemTime::UNIX_EPOCH, + output_upload_start_timestamp: SystemTime::UNIX_EPOCH, + output_upload_completed_timestamp: SystemTime::UNIX_EPOCH, + }, + server_logs: Default::default(), // TODO(allada) Not implemented. + }); + } + Ok(self) } async fn cleanup(self: Arc) -> Result, Error> { - unimplemented!(); + // Note: We need to be careful to keep trying to cleanup even if one of the steps fails. 
+ let remove_dir_result = fs::remove_dir_all(&self.work_directory) + .await + .err_tip(|| format!("Could not remove working directory {}", self.work_directory)); + self.did_cleanup.store(true, Ordering::Relaxed); + if let Err(e) = self.running_actions_manager.cleanup_action(&self.action_id).await { + return Result::, Error>::Err(e).merge(remove_dir_result.map(|_| self)); + } + remove_dir_result.map(|_| self) } - async fn get_finished_result(self: Arc) -> Result { - unimplemented!(); + async fn get_finished_result(self: Arc) -> Result { + let mut state = self.state.lock().await; + state + .action_result + .take() + .err_tip(|| "Expected action_result to exist in get_finished_result") } } @@ -190,37 +734,48 @@ pub trait RunningActionsManager: Sync + Send + Sized + Unpin + 'static { async fn create_and_add_action( self: Arc, + worker_id: String, start_execute: StartExecute, ) -> Result, Error>; -} -type ActionId = [u8; 32]; + async fn get_action(&self, action_id: &ActionId) -> Result, Error>; +} /// Holds state info about what is being executed and the interface for interacting /// with actions while they are running. pub struct RunningActionsManagerImpl { + root_work_directory: String, cas_store: Arc, filesystem_store: Arc, running_actions: Mutex>>, } impl RunningActionsManagerImpl { - pub fn new(cas_store: Arc) -> Result { + pub fn new(root_work_directory: String, cas_store: Arc) -> Result { // Sadly because of some limitations of how Any works we need to clone more times than optimal. let filesystem_store = cas_store - .fast_slow() + .fast_store() .clone() .as_any() .downcast_ref::>() .err_tip(|| "Expected fast slow store for cas_store in RunningActionsManagerImpl")? .clone(); Ok(Self { + root_work_directory, cas_store, filesystem_store, running_actions: Mutex::new(HashMap::new()), }) } + async fn make_work_directory(&self, action_id: &ActionId) -> Result { + let work_directory = format!("{}/{}", self.root_work_directory, hex::encode(action_id)); + fs::create_dir(&work_directory) + .await + .err_tip(|| format!("Error creating work directory {}", work_directory))?; + Ok(work_directory) + } + async fn create_action_info(&self, start_execute: StartExecute) -> Result { let execute_request = start_execute .execute_request @@ -238,6 +793,17 @@ impl RunningActionsManagerImpl { .err_tip(|| "Could not create ActionInfo in create_and_add_action()")?, ) } + + async fn cleanup_action(&self, action_id: &ActionId) -> Result<(), Error> { + let mut running_actions = self.running_actions.lock().await; + running_actions.remove(action_id).err_tip(|| { + format!( + "Expected action id '{:?}' to exist in RunningActionsManagerImpl", + action_id + ) + })?; + Ok(()) + } } #[async_trait] @@ -246,14 +812,18 @@ impl RunningActionsManager for RunningActionsManagerImpl { async fn create_and_add_action( self: Arc, + worker_id: String, start_execute: StartExecute, ) -> Result, Error> { let action_info = self.create_action_info(start_execute).await?; let action_id = action_info.unique_qualifier.get_hash(); + let work_directory = self.make_work_directory(&action_id).await?; let running_action = Arc::new(RunningActionImpl::new( + worker_id, + action_id, + work_directory, action_info, - self.cas_store.clone(), - self.filesystem_store.clone(), + self.clone(), )); { let mut running_actions = self.running_actions.lock().await; @@ -261,4 +831,13 @@ impl RunningActionsManager for RunningActionsManagerImpl { } Ok(running_action) } + + async fn get_action(&self, action_id: &ActionId) -> Result, Error> { + let running_actions = 
self.running_actions.lock().await; + Ok(running_actions + .get(action_id) + .err_tip(|| format!("Action '{:?}' not found", action_id))? + .upgrade() + .err_tip(|| "Could not upgrade RunningAction Arc")?) + } } diff --git a/cas/worker/tests/local_worker_test.rs b/cas/worker/tests/local_worker_test.rs index 670496053..7251e68fa 100644 --- a/cas/worker/tests/local_worker_test.rs +++ b/cas/worker/tests/local_worker_test.rs @@ -7,7 +7,7 @@ use std::time::{Duration, SystemTime}; use prost::Message; use tonic::Response; -use action_messages::{ActionInfo, ActionInfoHashKey}; +use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ExecutionMetadata}; use common::{encode_stream_proto, DigestInfo}; use config::cas_server::WrokerProperty; use error::{make_input_err, Error}; @@ -131,6 +131,7 @@ mod local_worker_tests { } const SALT: u64 = 1000; + let action_digest = DigestInfo::new([03u8; 32], 10); let action_info = ActionInfo { instance_name: "foo".to_string(), command_digest: DigestInfo::new([01u8; 32], 10), @@ -140,7 +141,7 @@ mod local_worker_tests { priority: 0, insert_timestamp: SystemTime::UNIX_EPOCH, unique_qualifier: ActionInfoHashKey { - digest: DigestInfo::new([03u8; 32], 10), + digest: action_digest.clone(), salt: SALT, }, }; @@ -157,11 +158,26 @@ mod local_worker_tests { .await .map_err(|e| make_input_err!("Could not send : {:?}", e))?; } - let execute_finished_result = ExecuteFinishedResult { - worker_id: "1234".to_string(), - action_digest: Some(DigestInfo::new([11u8; 32], 10).into()), - salt: 123, - execute_response: None, + let action_result = ActionResult { + output_files: vec![], + output_folders: vec![], + output_symlinks: vec![], + exit_code: 5, + stdout_digest: DigestInfo::new([21u8; 32], 10), + stderr_digest: DigestInfo::new([22u8; 32], 10), + execution_metadata: ExecutionMetadata { + worker: expected_worker_id.clone(), + queued_timestamp: SystemTime::UNIX_EPOCH, + worker_start_timestamp: SystemTime::UNIX_EPOCH, + worker_completed_timestamp: SystemTime::UNIX_EPOCH, + input_fetch_start_timestamp: SystemTime::UNIX_EPOCH, + input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH, + execution_start_timestamp: SystemTime::UNIX_EPOCH, + execution_completed_timestamp: SystemTime::UNIX_EPOCH, + output_upload_start_timestamp: SystemTime::UNIX_EPOCH, + output_upload_completed_timestamp: SystemTime::UNIX_EPOCH, + }, + server_logs: HashMap::new(), }; let running_action = Arc::new(MockRunningAction::new()); @@ -174,7 +190,7 @@ mod local_worker_tests { // Now the RunningAction needs to send a series of state updates. This shortcuts them // into a single call (shortcut for prepare, execute, upload, collect_results, cleanup). running_action - .simple_expect_get_finished_result(Ok(execute_finished_result.clone())) + .simple_expect_get_finished_result(Ok(action_result.clone())) .await?; // Now our client should be notified that our runner finished. 
@@ -187,7 +203,12 @@ mod local_worker_tests { assert_eq!( execution_response, ExecuteResult { - response: Some(execute_result::Response::Result(execute_finished_result)) + response: Some(execute_result::Response::Result(ExecuteFinishedResult { + worker_id: expected_worker_id, + action_digest: Some(action_digest.into()), + salt: SALT, + execute_response: Some(ActionStage::Completed(action_result).into()), + })) } ); diff --git a/cas/worker/tests/running_actions_manager_test.rs b/cas/worker/tests/running_actions_manager_test.rs index 744ebdc5f..28288913b 100644 --- a/cas/worker/tests/running_actions_manager_test.rs +++ b/cas/worker/tests/running_actions_manager_test.rs @@ -1,22 +1,30 @@ // Copyright 2022 Nathan (Blaise) Bruer. All rights reserved. +use std::collections::HashMap; use std::env; use std::os::unix::fs::MetadataExt; use std::pin::Pin; +use std::str::from_utf8; use std::sync::Arc; use std::time::{Duration, SystemTime}; +use futures::{FutureExt, TryFutureExt}; +use prost::Message; use rand::{thread_rng, Rng}; +use ac_utils::{get_and_decode_digest, serialize_and_upload_message}; +use action_messages::{ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, SymlinkInfo}; use common::{fs, DigestInfo}; use config; use error::{Error, ResultExt}; use fast_slow_store::FastSlowStore; use filesystem_store::FilesystemStore; use memory_store::MemoryStore; -use prost::Message; -use proto::build::bazel::remote::execution::v2::{Directory, DirectoryNode, FileNode, NodeProperties, SymlinkNode}; -use running_actions_manager::download_to_directory; +use proto::build::bazel::remote::execution::v2::{ + Action, Command, Directory, DirectoryNode, ExecuteRequest, FileNode, NodeProperties, SymlinkNode, Tree, +}; +use proto::com::github::allada::turbo_cache::remote_execution::StartExecute; +use running_actions_manager::{download_to_directory, RunningAction, RunningActionsManager, RunningActionsManagerImpl}; use store::Store; /// Get temporary path from either `TEST_TMPDIR` or best effort temp directory if @@ -315,4 +323,355 @@ mod running_actions_manager_tests { } Ok(()) } + + #[tokio::test] + async fn upload_files_test() -> Result<(), Box> { + let (_, slow_store, cas_store) = setup_stores().await?; + let root_work_directory = make_temp_path("root_work_directory"); + fs::create_dir_all(&root_work_directory).await?; + let running_actions_manager = Arc::new(RunningActionsManagerImpl::new( + root_work_directory, + Pin::into_inner(cas_store.clone()), + )?); + const WORKER_ID: &str = "foo_worker_id"; + let action_result = { + const SALT: u64 = 55; + let command = Command { + arguments: vec![ + "sh".to_string(), + "-c".to_string(), + "echo -n 123 > test.txt; echo -n foo-stdout; >&2 echo -n bar-stderr".to_string(), + ], + output_paths: vec!["test.txt".to_string()], + working_directory: ".".to_string(), + ..Default::default() + }; + let command_digest = serialize_and_upload_message(&command, cas_store.as_ref()).await?; + let input_root_digest = serialize_and_upload_message(&Directory::default(), cas_store.as_ref()).await?; + let action = Action { + command_digest: Some(command_digest.into()), + input_root_digest: Some(input_root_digest.into()), + ..Default::default() + }; + let action_digest = serialize_and_upload_message(&action, cas_store.as_ref()).await?; + + let running_action_impl = running_actions_manager + .create_and_add_action( + WORKER_ID.to_string(), + StartExecute { + execute_request: Some(ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }), + salt: 
SALT, + }, + ) + .await?; + + running_action_impl + .clone() + .prepare_action() + .and_then(|action| action.execute()) + .and_then(|action| action.upload_results()) + .and_then(|action| action.get_finished_result()) + .then(|result| async move { + running_action_impl.cleanup().await?; + result + }) + .await? + }; + let file_content = slow_store + .as_ref() + .get_part_unchunked(action_result.output_files[0].digest.clone(), 0, None, None) + .await?; + assert_eq!(from_utf8(&file_content)?, "123"); + let stdout_content = slow_store + .as_ref() + .get_part_unchunked(action_result.stdout_digest.clone(), 0, None, None) + .await?; + assert_eq!(from_utf8(&stdout_content)?, "foo-stdout"); + let stderr_content = slow_store + .as_ref() + .get_part_unchunked(action_result.stderr_digest.clone(), 0, None, None) + .await?; + assert_eq!(from_utf8(&stderr_content)?, "bar-stderr"); + assert_eq!( + action_result, + ActionResult { + output_files: vec![FileInfo { + name_or_path: NameOrPath::Path("test.txt".to_string()), + digest: DigestInfo::try_new("a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae3", 3)?, + is_executable: false, + }], + stdout_digest: DigestInfo::try_new( + "426afaf613d8cfdd9fa8addcc030ae6c95a7950ae0301164af1d5851012081d5", + 10 + )?, + stderr_digest: DigestInfo::try_new( + "7b2e400d08b8e334e3172d105be308b506c6036c62a9bde5c509d7808b28b213", + 10 + )?, + exit_code: 0, + output_folders: vec![], + output_symlinks: vec![], + server_logs: HashMap::new(), + execution_metadata: ExecutionMetadata { + worker: WORKER_ID.to_string(), + queued_timestamp: SystemTime::UNIX_EPOCH, + worker_start_timestamp: SystemTime::UNIX_EPOCH, + worker_completed_timestamp: SystemTime::UNIX_EPOCH, + input_fetch_start_timestamp: SystemTime::UNIX_EPOCH, + input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH, + execution_start_timestamp: SystemTime::UNIX_EPOCH, + execution_completed_timestamp: SystemTime::UNIX_EPOCH, + output_upload_start_timestamp: SystemTime::UNIX_EPOCH, + output_upload_completed_timestamp: SystemTime::UNIX_EPOCH, + } + } + ); + Ok(()) + } + + #[tokio::test] + async fn upload_dir_and_symlink_test() -> Result<(), Box> { + let (_, slow_store, cas_store) = setup_stores().await?; + let root_work_directory = make_temp_path("root_work_directory"); + fs::create_dir_all(&root_work_directory).await?; + let running_actions_manager = Arc::new(RunningActionsManagerImpl::new( + root_work_directory, + Pin::into_inner(cas_store.clone()), + )?); + const WORKER_ID: &str = "foo_worker_id"; + let action_result = { + const SALT: u64 = 55; + let command = Command { + arguments: vec![ + "sh".to_string(), + "-c".to_string(), + concat!( + "mkdir -p dir1/dir2 && ", + "echo foo > dir1/file && ", + "touch dir1/file2 && ", + "ln -s ../file dir1/dir2/sym &&", + "ln -s /dev/null empty_sym", + ) + .to_string(), + ], + output_paths: vec!["dir1".to_string(), "empty_sym".to_string()], + working_directory: ".".to_string(), + ..Default::default() + }; + let command_digest = serialize_and_upload_message(&command, cas_store.as_ref()).await?; + let input_root_digest = serialize_and_upload_message(&Directory::default(), cas_store.as_ref()).await?; + let action = Action { + command_digest: Some(command_digest.into()), + input_root_digest: Some(input_root_digest.into()), + ..Default::default() + }; + let action_digest = serialize_and_upload_message(&action, cas_store.as_ref()).await?; + + let running_action_impl = running_actions_manager + .create_and_add_action( + WORKER_ID.to_string(), + StartExecute { + execute_request: 
Some(ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }), + salt: SALT, + }, + ) + .await?; + + running_action_impl + .clone() + .prepare_action() + .and_then(|action| action.execute()) + .and_then(|action| action.upload_results()) + .and_then(|action| action.get_finished_result()) + .then(|result| async move { + running_action_impl.cleanup().await?; + result + }) + .await? + }; + let tree = + get_and_decode_digest::(slow_store.as_ref(), &action_result.output_folders[0].tree_digest).await?; + let root_directory = Directory { + files: vec![ + FileNode { + name: "file".to_string(), + digest: Some( + DigestInfo::try_new("b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c", 4)? + .into(), + ), + ..Default::default() + }, + FileNode { + name: "file2".to_string(), + digest: Some( + DigestInfo::try_new("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 0)? + .into(), + ), + ..Default::default() + }, + ], + directories: vec![DirectoryNode { + name: "dir2".to_string(), + digest: Some( + DigestInfo::try_new("cce0098e0b0f1d785edb0da50beedb13e27dcd459b091b2f8f82543cb7cd0527", 16)?.into(), + ), + }], + ..Default::default() + }; + assert_eq!( + tree, + Tree { + root: Some(root_directory.clone()), + children: vec![ + Directory { + symlinks: vec![SymlinkNode { + name: "sym".to_string(), + target: "../file".to_string(), + ..Default::default() + }], + ..Default::default() + }, + root_directory + ], + ..Default::default() + } + ); + assert_eq!( + action_result, + ActionResult { + output_files: vec![], + stdout_digest: DigestInfo::try_new( + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + 0 + )?, + stderr_digest: DigestInfo::try_new( + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + 0 + )?, + exit_code: 0, + output_folders: vec![DirectoryInfo { + path: "dir1".to_string(), + tree_digest: DigestInfo::try_new( + "adbb04fa6e166e663c1310bbf8ba494e468b1b6c33e1e5346e2216b6904c9917", + 490 + )?, + }], + output_symlinks: vec![SymlinkInfo { + name_or_path: NameOrPath::Path("empty_sym".to_string()), + target: "/dev/null".to_string(), + }], + server_logs: HashMap::new(), + execution_metadata: ExecutionMetadata { + worker: WORKER_ID.to_string(), + queued_timestamp: SystemTime::UNIX_EPOCH, + worker_start_timestamp: SystemTime::UNIX_EPOCH, + worker_completed_timestamp: SystemTime::UNIX_EPOCH, + input_fetch_start_timestamp: SystemTime::UNIX_EPOCH, + input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH, + execution_start_timestamp: SystemTime::UNIX_EPOCH, + execution_completed_timestamp: SystemTime::UNIX_EPOCH, + output_upload_start_timestamp: SystemTime::UNIX_EPOCH, + output_upload_completed_timestamp: SystemTime::UNIX_EPOCH, + } + } + ); + Ok(()) + } + + #[tokio::test] + async fn cleanup_happens_on_job_failure() -> Result<(), Box> { + let (_, _, cas_store) = setup_stores().await?; + let root_work_directory = make_temp_path("root_work_directory"); + fs::create_dir_all(&root_work_directory).await?; + let running_actions_manager = Arc::new(RunningActionsManagerImpl::new( + root_work_directory.clone(), + Pin::into_inner(cas_store.clone()), + )?); + const WORKER_ID: &str = "foo_worker_id"; + let action_result = { + const SALT: u64 = 55; + let command = Command { + arguments: vec!["sh".to_string(), "-c".to_string(), "exit 33".to_string()], + output_paths: vec![], + working_directory: ".".to_string(), + ..Default::default() + }; + let command_digest = serialize_and_upload_message(&command, cas_store.as_ref()).await?; 
+ let input_root_digest = serialize_and_upload_message(&Directory::default(), cas_store.as_ref()).await?; + let action = Action { + command_digest: Some(command_digest.into()), + input_root_digest: Some(input_root_digest.into()), + ..Default::default() + }; + let action_digest = serialize_and_upload_message(&action, cas_store.as_ref()).await?; + + let running_action_impl = running_actions_manager + .create_and_add_action( + WORKER_ID.to_string(), + StartExecute { + execute_request: Some(ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }), + salt: SALT, + }, + ) + .await?; + + running_action_impl + .clone() + .prepare_action() + .and_then(|action| action.execute()) + .and_then(|action| action.upload_results()) + .and_then(|action| action.get_finished_result()) + .then(|result| async move { + running_action_impl.cleanup().await?; + result + }) + .await? + }; + assert_eq!( + action_result, + ActionResult { + output_files: vec![], + stdout_digest: DigestInfo::try_new( + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + 0 + )?, + stderr_digest: DigestInfo::try_new( + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + 0 + )?, + exit_code: 33, + output_folders: vec![], + output_symlinks: vec![], + server_logs: HashMap::new(), + execution_metadata: ExecutionMetadata { + worker: WORKER_ID.to_string(), + queued_timestamp: SystemTime::UNIX_EPOCH, + worker_start_timestamp: SystemTime::UNIX_EPOCH, + worker_completed_timestamp: SystemTime::UNIX_EPOCH, + input_fetch_start_timestamp: SystemTime::UNIX_EPOCH, + input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH, + execution_start_timestamp: SystemTime::UNIX_EPOCH, + execution_completed_timestamp: SystemTime::UNIX_EPOCH, + output_upload_start_timestamp: SystemTime::UNIX_EPOCH, + output_upload_completed_timestamp: SystemTime::UNIX_EPOCH, + } + } + ); + let mut dir_stream = fs::read_dir(&root_work_directory).await?; + assert!( + dir_stream.as_mut().next_entry().await?.is_none(), + "Expected empty directory at {}", + root_work_directory + ); + Ok(()) + } } diff --git a/cas/worker/tests/utils/mock_running_actions_manager.rs b/cas/worker/tests/utils/mock_running_actions_manager.rs index 7a2e8fc90..334dbcd71 100644 --- a/cas/worker/tests/utils/mock_running_actions_manager.rs +++ b/cas/worker/tests/utils/mock_running_actions_manager.rs @@ -6,13 +6,14 @@ use async_trait::async_trait; use fast_async_mutex::mutex::Mutex; use tokio::sync::mpsc; +use action_messages::ActionResult; use error::{make_input_err, Error}; -use proto::com::github::allada::turbo_cache::remote_execution::{ExecuteFinishedResult, StartExecute}; -use running_actions_manager::{RunningAction, RunningActionsManager}; +use proto::com::github::allada::turbo_cache::remote_execution::StartExecute; +use running_actions_manager::{ActionId, RunningAction, RunningActionsManager}; #[derive(Debug)] enum RunningActionManagerCalls { - CreateAndAddAction(StartExecute), + CreateAndAddAction((String, StartExecute)), } enum RunningActionManagerReturns { @@ -41,7 +42,10 @@ impl MockRunningActionsManager { } impl MockRunningActionsManager { - pub async fn expect_create_and_add_action(&self, result: Result, Error>) -> StartExecute { + pub async fn expect_create_and_add_action( + &self, + result: Result, Error>, + ) -> (String, StartExecute) { let mut rx_call_lock = self.rx_call.lock().await; let req = match rx_call_lock.recv().await.expect("Could not receive msg in mpsc") { RunningActionManagerCalls::CreateAndAddAction(req) => req, @@ 
-60,16 +64,24 @@ impl RunningActionsManager for MockRunningActionsManager { async fn create_and_add_action( self: Arc, + worker_id: String, start_execute: StartExecute, ) -> Result, Error> { self.tx_call - .send(RunningActionManagerCalls::CreateAndAddAction(start_execute)) + .send(RunningActionManagerCalls::CreateAndAddAction(( + worker_id, + start_execute, + ))) .expect("Could not send request to mpsc"); let mut rx_resp_lock = self.rx_resp.lock().await; match rx_resp_lock.recv().await.expect("Could not receive msg in mpsc") { RunningActionManagerReturns::CreateAndAddAction(result) => result, } } + + async fn get_action(&self, _action_id: &ActionId) -> Result, Error> { + unimplemented!("get_action not implemented"); + } } #[derive(Debug)] @@ -87,7 +99,7 @@ enum RunningActionReturns { Execute(Result, Error>), UploadResults(Result, Error>), Cleanup(Result, Error>), - GetFinishedResult(Result), + GetFinishedResult(Result), } #[derive(Debug)] @@ -113,13 +125,14 @@ impl MockRunningAction { pub async fn simple_expect_get_finished_result( self: &Arc, - result: Result, + result: Result, ) -> Result<(), Error> { self.expect_prepare_action(Ok(())).await?; self.expect_execute(Ok(())).await?; self.upload_results(Ok(())).await?; + let result = self.get_finished_result(result).await; self.cleanup(Ok(())).await?; - self.get_finished_result(result).await + result } pub async fn expect_prepare_action(self: &Arc, result: Result<(), Error>) -> Result<(), Error> { @@ -186,10 +199,7 @@ impl MockRunningAction { Ok(req) } - pub async fn get_finished_result( - self: &Arc, - result: Result, - ) -> Result<(), Error> { + pub async fn get_finished_result(self: &Arc, result: Result) -> Result<(), Error> { let mut rx_call_lock = self.rx_call.lock().await; let req = match rx_call_lock.recv().await.expect("Could not receive msg in mpsc") { RunningActionCalls::GetFinishedResult => (), @@ -254,7 +264,7 @@ impl RunningAction for MockRunningAction { } } - async fn get_finished_result(self: Arc) -> Result { + async fn get_finished_result(self: Arc) -> Result { self.tx_call .send(RunningActionCalls::GetFinishedResult) .expect("Could not send request to mpsc"); diff --git a/util/fs.rs b/util/fs.rs index 68ba99a58..25c393c6a 100644 --- a/util/fs.rs +++ b/util/fs.rs @@ -244,3 +244,11 @@ pub async fn symlink_metadata(path: impl AsRef) -> Result .map_err(|e| make_err!(Code::Internal, "Open file semaphore closed {:?}", e))?; tokio::fs::symlink_metadata(path).await.map_err(|e| e.into()) } + +pub async fn remove_dir_all(path: impl AsRef) -> Result<(), Error> { + let _permit = OPEN_FILE_SEMAPHORE + .acquire() + .await + .map_err(|e| make_err!(Code::Internal, "Open file semaphore closed {:?}", e))?; + tokio::fs::remove_dir_all(path).await.map_err(|e| e.into()) +}
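
Note on the as_any() change: every store's as_any() now returns a boxed Any that owns the Arc itself (previously it returned the Arc coerced to an Any), which is what lets new_local_worker() and RunningActionsManagerImpl::new() recover a concrete typed store via downcast_ref() followed by clone(). Below is a minimal standalone sketch of that pattern, assuming a stripped-down StoreTrait and a placeholder MyStore type; neither is the real trait or store from cas/store, they only illustrate the downcast mechanics.

use std::any::Any;
use std::sync::Arc;

// Stripped-down stand-in for the real StoreTrait; only as_any() is shown.
trait StoreTrait: Sync + Send {
    fn as_any(self: Arc<Self>) -> Box<dyn Any>;
}

// Placeholder concrete store (illustrative only).
struct MyStore;

impl StoreTrait for MyStore {
    fn as_any(self: Arc<Self>) -> Box<dyn Any> {
        // The Box owns the Arc itself, so the downcast target below is Arc<MyStore>.
        Box::new(self)
    }
}

fn main() {
    let store: Arc<dyn StoreTrait> = Arc::new(MyStore);
    let any_store: Box<dyn Any> = store.as_any();
    let typed: Arc<MyStore> = any_store
        .downcast_ref::<Arc<MyStore>>()
        .expect("store was not a MyStore")
        .clone();
    // One Arc still lives inside `any_store`, one is the owned handle in `typed`.
    assert_eq!(Arc::strong_count(&typed), 2);
}

Boxing the Arc (rather than coercing it into the Any) means the downcast target is the Arc itself, so callers get back an owned, cloneable handle without needing Arc::downcast.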
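
Note on the new ac_utils helpers: upload_file() and upload_results() hash the data first with compute_digest(), rewind, and then stream it to the store with upload_to_store(), so each output is read twice (the comment on upload_to_store() in the diff calls this out). A rough, self-contained sketch of the hashing half of that loop is below, assuming only tokio, bytes, sha2, and hex as dependencies; digest_of_reader is an illustrative name, not part of this change.

use bytes::BytesMut;
use sha2::{Digest, Sha256};
use tokio::io::{AsyncRead, AsyncReadExt};

/// Illustrative, standalone version of the hashing loop: read fixed-size chunks,
/// feed them to the hasher, and track how many bytes went by.
async fn digest_of_reader<R: AsyncRead + Unpin>(mut reader: R) -> std::io::Result<([u8; 32], i64)> {
    const READ_BUFF_SIZE: usize = 4096;
    let mut chunk = BytesMut::with_capacity(READ_BUFF_SIZE);
    let mut hasher = Sha256::new();
    let mut size: i64 = 0;
    loop {
        if reader.read_buf(&mut chunk).await? == 0 {
            break; // EOF.
        }
        size += chunk.len() as i64;
        hasher.update(&chunk);
        chunk.clear();
    }
    Ok((hasher.finalize().into(), size))
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (hash, size) = digest_of_reader(std::io::Cursor::new(b"hello".to_vec())).await?;
    println!("{} bytes, sha256 = {}", size, hex::encode(hash));
    Ok(())
}

The real compute_digest() additionally runs this loop inside tokio::spawn and hands the reader back alongside the DigestInfo so it can be rewound and re-read for the upload pass.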