From f684028a56f99eb509ebadf668111b8f214a15d7 Mon Sep 17 00:00:00 2001 From: "Nathan (Blaise) Bruer" Date: Thu, 16 Nov 2023 23:13:27 -0600 Subject: [PATCH] Add Blake3 digest support Fully support the Blake3 digest function for uploads. This can significantly increase performance in cases where file hashing is a bottleneck. closes #395 --- cas/BUILD | 1 + cas/cas_main.rs | 11 +- cas/grpc_service/BUILD | 4 + cas/grpc_service/capabilities_server.rs | 7 +- cas/grpc_service/execution_server.rs | 15 +- .../tests/worker_api_server_test.rs | 2 + cas/scheduler/BUILD | 3 + cas/scheduler/action_messages.rs | 18 +- cas/scheduler/tests/action_messages_test.rs | 5 + cas/scheduler/tests/simple_scheduler_test.rs | 12 +- cas/scheduler/tests/utils/scheduler_utils.rs | 2 + cas/worker/BUILD | 3 + cas/worker/local_worker.rs | 7 +- cas/worker/running_actions_manager.rs | 63 ++++--- cas/worker/tests/local_worker_test.rs | 88 ++++++++- .../tests/running_actions_manager_test.rs | 176 +++++++++++++++++- .../utils/mock_running_actions_manager.rs | 7 +- config/cas_server.rs | 13 ++ gencargo/action_messages/Cargo.toml | 1 + gencargo/action_messages_test/Cargo.toml | 1 + gencargo/capabilities_server/Cargo.toml | 1 + gencargo/cas/Cargo.toml | 1 + gencargo/digest_hasher/Cargo.toml | 3 + gencargo/execution_server/Cargo.toml | 1 + gencargo/local_worker/Cargo.toml | 1 + gencargo/local_worker_test/Cargo.toml | 1 + .../mock_running_actions_manager/Cargo.toml | 1 + gencargo/scheduler_utils/Cargo.toml | 1 + gencargo/worker_api_server/Cargo.toml | 1 + gencargo/worker_api_server_test/Cargo.toml | 1 + util/BUILD | 3 + util/common.rs | 7 +- util/digest_hasher.rs | 67 +++++++ 33 files changed, 463 insertions(+), 65 deletions(-) diff --git a/cas/BUILD b/cas/BUILD index bda4060a0..30ff17417 100644 --- a/cas/BUILD +++ b/cas/BUILD @@ -16,6 +16,7 @@ rust_binary( "//cas/worker:local_worker", "//config", "//util:common", + "//util:digest_hasher", "//util:error", "//util:metrics_utils", "@crate_index//:async-lock", diff --git a/cas/cas_main.rs b/cas/cas_main.rs index abee8993e..5b9872dbb 100644 --- a/cas/cas_main.rs +++ b/cas/cas_main.rs @@ -42,9 +42,12 @@ use capabilities_server::CapabilitiesServer; use cas_server::CasServer; use common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit}; use common::log; -use config::cas_server::{CasConfig, CompressionAlgorithm, GlobalConfig, ServerConfig, WorkerConfig}; +use config::cas_server::{ + CasConfig, CompressionAlgorithm, ConfigDigestHashFunction, GlobalConfig, ServerConfig, WorkerConfig, +}; use default_scheduler_factory::scheduler_factory; use default_store_factory::store_factory; +use digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc}; use error::{make_err, Code, Error, ResultExt}; use execution_server::ExecutionServer; use local_worker::new_local_worker; @@ -632,10 +635,16 @@ fn main() -> Result<(), Box<dyn std::error::Error>> { }; service.prometheus.is_none() }), + default_digest_hash_function: None, } }; set_open_file_limit(global_cfg.max_open_files); set_idle_file_descriptor_timeout(Duration::from_millis(global_cfg.idle_file_descriptor_timeout_millis))?; + set_default_digest_hasher_func(DigestHasherFunc::from( + global_cfg + .default_digest_hash_function + .unwrap_or(ConfigDigestHashFunction::sha256), + ))?; !global_cfg.disable_metrics }; // Override metrics enabled if the environment variable is set.
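For context, here is a minimal sketch of how the new process-wide default behaves. It uses only the `util/digest_hasher.rs` API added by this patch; the standalone `main` wrapper is illustrative, not part of the change:

```rust
use digest_hasher::{default_digest_hasher_func, set_default_digest_hasher_func, DigestHasherFunc};
use error::Error;

fn main() -> Result<(), Error> {
    // Mirrors what cas_main.rs now does at startup: install the configured
    // default, falling back to sha256 when the config leaves it unset.
    set_default_digest_hasher_func(DigestHasherFunc::Blake3)?;
    assert_eq!(default_digest_hasher_func(), DigestHasherFunc::Blake3);

    // The default lives in a OnceLock, so a second call is rejected with
    // Code::Internal instead of silently overwriting the first value.
    assert!(set_default_digest_hasher_func(DigestHasherFunc::Sha256).is_err());
    Ok(())
}
```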
diff --git a/cas/grpc_service/BUILD b/cas/grpc_service/BUILD index ee0a3caa0..a31c02a10 100644 --- a/cas/grpc_service/BUILD +++ b/cas/grpc_service/BUILD @@ -46,6 +46,7 @@ rust_library( "//cas/store", "//config", "//proto", + "//util:digest_hasher", "//util:error", "@crate_index//:tonic", ], @@ -85,6 +86,7 @@ rust_library( "//config", "//proto", "//util:common", + "//util:digest_hasher", "//util:error", "@crate_index//:futures", "@crate_index//:rand", @@ -107,6 +109,7 @@ rust_library( "//config", "//proto", "//util:common", + "//util:digest_hasher", "//util:error", "@crate_index//:futures", "@crate_index//:tokio", @@ -128,6 +131,7 @@ rust_test( "//config", "//proto", "//util:common", + "//util:digest_hasher", "//util:error", "@crate_index//:pretty_assertions", "@crate_index//:prost-types", diff --git a/cas/grpc_service/capabilities_server.rs b/cas/grpc_service/capabilities_server.rs index bf29347b1..9afd7ec83 100644 --- a/cas/grpc_service/capabilities_server.rs +++ b/cas/grpc_service/capabilities_server.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use tonic::{Request, Response, Status}; use config::cas_server::{CapabilitiesConfig, InstanceName}; +use digest_hasher::default_digest_hasher_func; use error::{Error, ResultExt}; use proto::build::bazel::remote::execution::v2::{ capabilities_server::Capabilities, capabilities_server::CapabilitiesServer as Server, @@ -85,7 +86,7 @@ impl Capabilities for CapabilitiesServer { let instance_name = request.into_inner().instance_name; let maybe_supported_node_properties = self.supported_node_properties_for_instance.get(&instance_name); let execution_capabilities = maybe_supported_node_properties.map(|props_for_instance| ExecutionCapabilities { - digest_function: DigestFunction::Sha256.into(), + digest_function: default_digest_hasher_func().proto_digest_func().into(), exec_enabled: true, // TODO(blaise.bruer) Make this configurable. 
execution_priority_capabilities: Some(PriorityCapabilities { priorities: vec![PriorityRange { @@ -94,12 +95,12 @@ impl Capabilities for CapabilitiesServer { }], }), supported_node_properties: props_for_instance.clone(), - digest_functions: vec![DigestFunction::Sha256.into()], + digest_functions: vec![DigestFunction::Sha256.into(), DigestFunction::Blake3.into()], }); let resp = ServerCapabilities { cache_capabilities: Some(CacheCapabilities { - digest_functions: vec![DigestFunction::Sha256.into()], + digest_functions: vec![DigestFunction::Sha256.into(), DigestFunction::Blake3.into()], action_cache_update_capabilities: Some(ActionCacheUpdateCapabilities { update_enabled: true }), cache_priority_capabilities: None, max_batch_total_size_bytes: MAX_BATCH_TOTAL_SIZE, diff --git a/cas/grpc_service/execution_server.rs b/cas/grpc_service/execution_server.rs index fc5aa0be6..b361facac 100644 --- a/cas/grpc_service/execution_server.rs +++ b/cas/grpc_service/execution_server.rs @@ -27,6 +27,7 @@ use ac_utils::get_and_decode_digest; use action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY}; use common::{log, DigestInfo}; use config::cas_server::{ExecutionConfig, InstanceName}; +use digest_hasher::DigestHasherFunc; use error::{make_input_err, Error, ResultExt}; use platform_property_manager::PlatformProperties; use proto::build::bazel::remote::execution::v2::{ @@ -54,6 +55,7 @@ impl InstanceInfo { action: &Action, priority: i32, skip_cache_lookup: bool, + digest_function: DigestHasherFunc, ) -> Result<ActionInfo, Error> { let command_digest = DigestInfo::try_from( action @@ -124,6 +126,7 @@ impl InstanceInfo { }, }, skip_cache_lookup, + digest_function, }) } } @@ -194,7 +197,17 @@ impl ExecutionServer { let action = get_and_decode_digest::<Action>(instance_info.cas_pin(), &digest).await?; let action_info = instance_info - .build_action_info(instance_name, digest, &action, priority, execute_req.skip_cache_lookup) + .build_action_info( + instance_name, + digest, + &action, + priority, + execute_req.skip_cache_lookup, + execute_req + .digest_function + .try_into() + .err_tip(|| "Could not convert digest function in inner_execute()")?, + ) .await?; let rx = instance_info diff --git a/cas/grpc_service/tests/worker_api_server_test.rs b/cas/grpc_service/tests/worker_api_server_test.rs index 2d6a571a2..12c688327 100644 --- a/cas/grpc_service/tests/worker_api_server_test.rs +++ b/cas/grpc_service/tests/worker_api_server_test.rs @@ -22,6 +22,7 @@ use tonic::Request; use action_messages::{ActionInfo, ActionInfoHashKey, ActionStage}; use common::DigestInfo; use config::cas_server::WorkerApiConfig; +use digest_hasher::DigestHasherFunc; use error::{Error, ResultExt}; use platform_property_manager::PlatformProperties; use proto::build::bazel::remote::execution::v2::{ @@ -269,6 +270,7 @@ pub mod execution_response_tests { salt: SALT, }, skip_cache_lookup: true, + digest_function: DigestHasherFunc::Sha256, }; let mut client_action_state_receiver = test_context.scheduler.add_action(action_info).await?; diff --git a/cas/scheduler/BUILD b/cas/scheduler/BUILD index c77db6d8a..ee3de8d40 100644 --- a/cas/scheduler/BUILD +++ b/cas/scheduler/BUILD @@ -63,6 +63,7 @@ rust_library( ":action_messages", ":platform_property_manager", "//util:common", + "//util:digest_hasher", ], ) @@ -256,6 +257,7 @@ rust_library( ":platform_property_manager", "//proto", "//util:common", + "//util:digest_hasher", "//util:error", "//util:metrics_utils", "@crate_index//:blake3", @@ -273,6 +275,7 @@ rust_test( ":platform_property_manager",
"//proto", "//util:common", + "//util:digest_hasher", "//util:error", "@crate_index//:pretty_assertions", "@crate_index//:tokio", diff --git a/cas/scheduler/action_messages.rs b/cas/scheduler/action_messages.rs index d48e436cc..2c1703fb1 100644 --- a/cas/scheduler/action_messages.rs +++ b/cas/scheduler/action_messages.rs @@ -25,14 +25,15 @@ use prost::Message; use prost_types::Any; use common::{DigestInfo, HashMapExt, VecExt}; +use digest_hasher::DigestHasherFunc; use error::{error_if, make_input_err, Error, ResultExt}; use metrics_utils::{CollectorState, MetricsComponent}; use platform_property_manager::PlatformProperties; use prost::bytes::Bytes; use proto::build::bazel::remote::execution::v2::{ - digest_function, execution_stage, Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata, - ExecuteRequest, ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, OutputDirectory, OutputFile, - OutputSymlink, SymlinkNode, + execution_stage, Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata, ExecuteRequest, + ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, OutputDirectory, OutputFile, OutputSymlink, + SymlinkNode, }; use proto::google::longrunning::{operation::Result as LongRunningResult, Operation}; use proto::google::rpc::Status; @@ -145,6 +146,9 @@ pub struct ActionInfo { /// Whether to try looking up this action in the cache. pub skip_cache_lookup: bool, + + /// The digest function this action expects. + pub digest_function: DigestHasherFunc, } impl ActionInfo { @@ -199,6 +203,8 @@ impl ActionInfo { salt, }, skip_cache_lookup: execute_request.skip_cache_lookup, + digest_function: DigestHasherFunc::try_from(execute_request.digest_function) + .err_tip(|| "Could not find digest_function in try_from_action_and_execute_request_with_salt")?, }) } } @@ -212,7 +218,7 @@ impl From for ExecuteRequest { skip_cache_lookup: true, // The worker should never cache lookup. execution_policy: None, // Not used in the worker. results_cache_policy: None, // Not used in the worker. 
- digest_function: digest_function::Value::Sha256.into(), + digest_function: val.digest_function.proto_digest_func().into(), } } } @@ -587,8 +593,8 @@ impl Default for ActionResult { output_directory_symlinks: Default::default(), output_file_symlinks: Default::default(), exit_code: INTERNAL_ERROR_EXIT_CODE, - stdout_digest: DigestInfo::empty_digest(), - stderr_digest: DigestInfo::empty_digest(), + stdout_digest: DigestInfo::new([0u8; 32], 0), + stderr_digest: DigestInfo::new([0u8; 32], 0), execution_metadata: ExecutionMetadata { worker: "".to_string(), queued_timestamp: SystemTime::UNIX_EPOCH, diff --git a/cas/scheduler/tests/action_messages_test.rs b/cas/scheduler/tests/action_messages_test.rs index 5908277c4..4232e9920 100644 --- a/cas/scheduler/tests/action_messages_test.rs +++ b/cas/scheduler/tests/action_messages_test.rs @@ -18,6 +18,7 @@ use std::time::{Duration, SystemTime}; use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata}; use common::DigestInfo; +use digest_hasher::DigestHasherFunc; use error::Error; use platform_property_manager::PlatformProperties; use proto::build::bazel::remote::execution::v2::ExecuteResponse; @@ -118,6 +119,7 @@ mod action_messages_tests { salt: 0, }, skip_cache_lookup: true, + digest_function: DigestHasherFunc::Sha256, }); let lowest_priority_action = Arc::new(ActionInfo { command_digest: DigestInfo::new([0u8; 32], 0), @@ -135,6 +137,7 @@ mod action_messages_tests { salt: 0, }, skip_cache_lookup: true, + digest_function: DigestHasherFunc::Sha256, }); let mut action_set = BTreeSet::<Arc<ActionInfo>>::new(); action_set.insert(lowest_priority_action.clone()); @@ -168,6 +171,7 @@ mod action_messages_tests { salt: 0, }, skip_cache_lookup: true, + digest_function: DigestHasherFunc::Sha256, }); let current_action = Arc::new(ActionInfo { command_digest: DigestInfo::new([0u8; 32], 0), @@ -185,6 +189,7 @@ mod action_messages_tests { salt: 0, }, skip_cache_lookup: true, + digest_function: DigestHasherFunc::Sha256, }); let mut action_set = BTreeSet::<Arc<ActionInfo>>::new(); action_set.insert(current_action.clone()); diff --git a/cas/scheduler/tests/simple_scheduler_test.rs b/cas/scheduler/tests/simple_scheduler_test.rs index dbcf0deb0..3edb239c5 100644 --- a/cas/scheduler/tests/simple_scheduler_test.rs +++ b/cas/scheduler/tests/simple_scheduler_test.rs @@ -231,7 +231,7 @@ mod scheduler_tests { let unique_qualifier = ActionInfoHashKey { instance_name: "".to_string(), - digest: DigestInfo::empty_digest(), + digest: DigestInfo::zero_digest(), salt: 0, }; @@ -421,7 +421,7 @@ mod scheduler_tests { let mut expected_action_state = ActionState { unique_qualifier: ActionInfoHashKey { instance_name: "".to_string(), - digest: DigestInfo::empty_digest(), + digest: DigestInfo::zero_digest(), salt: 0, }, // Will be filled later. stage: ActionStage::Queued, @@ -565,7 +565,7 @@ mod scheduler_tests { // Name is a random string, so we ignore it and just make it the same. unique_qualifier: ActionInfoHashKey { instance_name: "".to_string(), - digest: DigestInfo::empty_digest(), + digest: DigestInfo::zero_digest(), salt: 0, }, stage: ActionStage::Executing, @@ -931,7 +931,7 @@ mod scheduler_tests { let mut expected_action_state = ActionState { unique_qualifier: ActionInfoHashKey { instance_name: "".to_string(), - digest: DigestInfo::empty_digest(), + digest: DigestInfo::zero_digest(), salt: 0, }, // Will be filled later.
stage: ActionStage::Executing, @@ -1313,8 +1313,8 @@ mod scheduler_tests { output_file_symlinks: Vec::default(), output_directory_symlinks: Vec::default(), exit_code: INTERNAL_ERROR_EXIT_CODE, - stdout_digest: DigestInfo::empty_digest(), - stderr_digest: DigestInfo::empty_digest(), + stdout_digest: DigestInfo::zero_digest(), + stderr_digest: DigestInfo::zero_digest(), execution_metadata: ExecutionMetadata { worker: WORKER_ID.to_string(), queued_timestamp: SystemTime::UNIX_EPOCH, diff --git a/cas/scheduler/tests/utils/scheduler_utils.rs b/cas/scheduler/tests/utils/scheduler_utils.rs index c7ebbfad0..bdafe221c 100644 --- a/cas/scheduler/tests/utils/scheduler_utils.rs +++ b/cas/scheduler/tests/utils/scheduler_utils.rs @@ -17,6 +17,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use action_messages::{ActionInfo, ActionInfoHashKey}; use common::DigestInfo; +use digest_hasher::DigestHasherFunc; use platform_property_manager::PlatformProperties; pub const INSTANCE_NAME: &str = "foobar_instance_name"; @@ -38,5 +39,6 @@ pub fn make_base_action_info(insert_timestamp: SystemTime) -> ActionInfo { salt: 0, }, skip_cache_lookup: false, + digest_function: DigestHasherFunc::Sha256, } } diff --git a/cas/worker/BUILD b/cas/worker/BUILD index 09d93fe42..80a199e88 100644 --- a/cas/worker/BUILD +++ b/cas/worker/BUILD @@ -14,6 +14,7 @@ rust_library( "//config", "//proto", "//util:common", + "//util:digest_hasher", "//util:error", "//util:metrics_utils", "@crate_index//:async-lock", @@ -126,6 +127,7 @@ rust_library( "//cas/scheduler:action_messages", "//proto", "//util:common", + "//util:digest_hasher", "//util:error", "@crate_index//:async-lock", "@crate_index//:tokio", @@ -174,6 +176,7 @@ rust_test( "//config", "//proto", "//util:common", + "//util:digest_hasher", "//util:error", "@crate_index//:env_logger", "@crate_index//:pretty_assertions", diff --git a/cas/worker/local_worker.rs b/cas/worker/local_worker.rs index 576f21080..67bd44f93 100644 --- a/cas/worker/local_worker.rs +++ b/cas/worker/local_worker.rs @@ -29,6 +29,7 @@ use tonic::{transport::Channel as TonicChannel, Streaming}; use action_messages::{ActionResult, ActionStage}; use common::{fs, log}; use config::cas_server::LocalWorkerConfig; +use digest_hasher::DigestHasherFunc; use error::{make_err, make_input_err, Code, Error, ResultExt}; use fast_slow_store::FastSlowStore; use metrics_utils::{AsyncCounterWrapper, Collector, CollectorState, CounterWithTime, MetricsComponent, Registry}; @@ -207,6 +208,10 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, let salt = start_execute.salt; let worker_id = self.worker_id.clone(); let action_digest = start_execute.execute_request.as_ref().and_then(|v| v.action_digest.clone()); + let try_hasher = start_execute.execute_request.as_ref() + .ok_or(make_input_err!("Expected execute_request to be set")) + .and_then(|v| DigestHasherFunc::try_from(v.digest_function)) + .err_tip(|| "In LocalWorkerImpl::new()"); let running_actions_manager = self.running_actions_manager.clone(); let worker_id_clone = worker_id.clone(); let precondition_script_cfg = self.config.precondition_script.clone(); @@ -245,7 +250,7 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, Ok(mut action_result) => { // Save in the action cache before notifying the scheduler that we've completed. 
if let Some(digest_info) = action_digest.clone().and_then(|action_digest| action_digest.try_into().ok()) { - if let Err(err) = running_actions_manager.cache_action_result(digest_info, &mut action_result).await { + if let Err(err) = running_actions_manager.cache_action_result(digest_info, &mut action_result, try_hasher?).await { log::error!("\x1b[0;31mError saving action in store\x1b[0m: {} - {:?}", err, action_digest); } } diff --git a/cas/worker/running_actions_manager.rs b/cas/worker/running_actions_manager.rs index 1c59c14ed..83f4475f5 100644 --- a/cas/worker/running_actions_manager.rs +++ b/cas/worker/running_actions_manager.rs @@ -63,10 +63,9 @@ use fast_slow_store::FastSlowStore; use filesystem_store::{FileEntry, FilesystemStore}; use grpc_store::GrpcStore; use proto::build::bazel::remote::execution::v2::{ - digest_function, Action, Command as ProtoCommand, Directory as ProtoDirectory, Directory, DirectoryNode, - ExecuteResponse, FileNode, SymlinkNode, Tree as ProtoTree, + Action, ActionResult as ProtoActionResult, Command as ProtoCommand, Directory as ProtoDirectory, Directory, + DirectoryNode, ExecuteResponse, FileNode, SymlinkNode, Tree as ProtoTree, UpdateActionResultRequest, }; -use proto::build::bazel::remote::execution::v2::{ActionResult as ProtoActionResult, UpdateActionResultRequest}; use proto::com::github::trace_machina::native_link::remote_execution::{HistoricalExecuteResponse, StartExecute}; use store::Store; @@ -233,6 +232,7 @@ async fn upload_file( mut resumeable_file: fs::ResumeableFileSlot<'static>, cas_store: Pin<&dyn Store>, full_path: impl AsRef<Path> + Debug, + hasher: DigestHasherFunc, ) -> Result<FileInfo, Error> { let (digest, is_executable, resumeable_file) = { let (digest, mut resumeable_file) = JoinHandleDropGuard::new(tokio::spawn(async move { .as_reader() .await .err_tip(|| "Could not get reader from file slot in RunningActionsManager::upload_file()")?; - let digest = compute_digest(file_handle, &mut DigestHasherFunc::Sha256.into()) - .await? - .0; + let digest = compute_digest(file_handle, &mut hasher.into()).await?.0; Ok::<_, Error>((digest, resumeable_file)) })) .await @@ -326,6 +324,7 @@ fn upload_directory<'a, P: AsRef<Path> + Debug + Send + Sync + Clone + 'a>( cas_store: Pin<&'a dyn Store>, full_dir_path: P, full_work_directory: &'a str, + hasher: DigestHasherFunc, ) -> BoxFuture<'a, Result<(Directory, VecDeque<ProtoDirectory>), Error>> { Box::pin(async move { let file_futures = FuturesUnordered::new(); @@ -354,7 +353,7 @@ fn upload_directory<'a, P: AsRef<Path> + Debug + Send + Sync + Clone + 'a>( if file_type.is_dir() { let full_dir_path = full_dir_path.clone(); dir_futures.push( - upload_directory(cas_store, full_path.clone(), full_work_directory) + upload_directory(cas_store, full_path.clone(), full_work_directory, hasher) .and_then(|(dir, all_dirs)| async move { let directory_name = full_path .file_name() .err_tip(|| { format!("Expected file_name to exist on {full_dir_path:?}") })?
.to_string(); - let digest = - serialize_and_upload_message(&dir, cas_store, &mut DigestHasherFunc::Sha256.into()) - .await - .err_tip(|| format!("for {full_path:?}"))?; + let digest = serialize_and_upload_message(&dir, cas_store, &mut hasher.into()) + .await + .err_tip(|| format!("for {full_path:?}"))?; Result::<(DirectoryNode, VecDeque<ProtoDirectory>), Error>::Ok(( DirectoryNode { @@ -385,7 +383,7 @@ fn upload_directory<'a, P: AsRef<Path> + Debug + Send + Sync + Clone + 'a>( let file_handle = fs::open_file(full_path.as_os_str().to_os_string(), u64::MAX) .await .err_tip(|| format!("Could not open file {full_path:?}"))?; - upload_file(file_handle, cas_store, &full_path) + upload_file(file_handle, cas_store, &full_path, hasher) .map_ok(|v| v.into()) .await }); @@ -897,6 +895,7 @@ impl RunningActionImpl { ) }; let cas_store = Pin::new(self.running_actions_manager.cas_store.as_ref()); + let hasher = self.action_info.digest_function; enum OutputType { None, File(FileInfo), @@ -952,7 +951,7 @@ impl RunningActionImpl { .err_tip(|| format!("While querying symlink metadata for {entry}"))?; if metadata.is_file() { return Ok(OutputType::File( - upload_file(resumeable_file, cas_store, &full_path) + upload_file(resumeable_file, cas_store, &full_path, hasher) .await .map(|mut file_info| { file_info.name_or_path = NameOrPath::Path(entry); @@ -965,19 +964,15 @@ impl RunningActionImpl { }; if metadata.is_dir() { Ok(OutputType::Directory( - upload_directory(cas_store, &full_path, work_directory) + upload_directory(cas_store, &full_path, work_directory, hasher) .and_then(|(root_dir, children)| async move { let tree = ProtoTree { root: Some(root_dir), children: children.into(), }; - let tree_digest = serialize_and_upload_message( - &tree, - cas_store, - &mut DigestHasherFunc::Sha256.into(), - ) - .await - .err_tip(|| format!("While processing {entry}"))?; + let tree_digest = serialize_and_upload_message(&tree, cas_store, &mut hasher.into()) + .await + .err_tip(|| format!("While processing {entry}"))?; Ok(DirectoryInfo { path: entry, tree_digest, @@ -1036,7 +1031,7 @@ impl RunningActionImpl { let stdout_digest_fut = self.metrics().upload_stdout.wrap(async { let data = execution_result.stdout; - let digest = compute_buf_digest(&data, &mut DigestHasherFunc::Sha256.into()) + let digest = compute_buf_digest(&data, &mut hasher.into()) .await .err_tip(|| "Computing stdout digest")?; upload_buf_to_store(cas_store, digest, data) @@ -1046,7 +1041,7 @@ impl RunningActionImpl { }); let stderr_digest_fut = self.metrics().upload_stderr.wrap(async { let data = execution_result.stderr; - let digest = compute_buf_digest(&data, &mut DigestHasherFunc::Sha256.into()) + let digest = compute_buf_digest(&data, &mut hasher.into()) .await .err_tip(|| "Computing stderr digest")?; upload_buf_to_store(cas_store, digest, data) @@ -1190,6 +1185,7 @@ pub trait RunningActionsManager: Sync + Send + Sized + Unpin + 'static { &self, action_digest: DigestInfo, action_result: &mut ActionResult, + hasher: DigestHasherFunc, ) -> Result<(), Error>; async fn kill_all(&self); @@ -1295,9 +1291,9 @@ impl UploadActionResults { mut template_str: Template, action_digest_info: DigestInfo, maybe_historical_digest_info: Option<DigestInfo>, + hasher: DigestHasherFunc, ) -> Result<String, Error> {
- template_str.replace("digest_function", digest_function::Value::Sha256.as_str_name()); + template_str.replace("digest_function", hasher.proto_digest_func().as_str_name()); template_str.replace("action_digest_hash", action_digest_info.hash_str()); template_str.replace("action_digest_size", action_digest_info.size_bytes); if let Some(historical_digest_info) = maybe_historical_digest_info { @@ -1316,6 +1312,7 @@ impl UploadActionResults { &self, action_digest: DigestInfo, action_result: ProtoActionResult, + hasher: DigestHasherFunc, ) -> Result<(), Error> { let Some(ac_store) = &self.ac_store else { return Ok(()) }; // If we are a GrpcStore we shortcut here, as this is a special store. @@ -1328,7 +1325,7 @@ impl UploadActionResults { action_digest: Some(action_digest.into()), action_result: Some(action_result), results_cache_policy: None, - digest_function: digest_function::Value::Sha256.into(), + digest_function: hasher.proto_digest_func().into(), }; return grpc_store .update_action_result(Request::new(update_action_request)) @@ -1353,6 +1350,7 @@ impl UploadActionResults { action_digest: DigestInfo, execute_response: ExecuteResponse, message_template: Template, + hasher: DigestHasherFunc, ) -> Result { let historical_digest_info = serialize_and_upload_message( &HistoricalExecuteResponse { @@ -1360,12 +1358,12 @@ impl UploadActionResults { execute_response: Some(execute_response.clone()), }, Pin::new(self.historical_store.as_ref()), - &mut DigestHasherFunc::Sha256.into(), + &mut hasher.into(), ) .await .err_tip(|| format!("Caching HistoricalExecuteResponse for digest: {action_digest:?}"))?; - Self::format_execute_response_message(message_template, action_digest, Some(historical_digest_info)) + Self::format_execute_response_message(message_template, action_digest, Some(historical_digest_info), hasher) .err_tip(|| "Could not format message in upload_historical_results_with_message") } @@ -1373,6 +1371,7 @@ impl UploadActionResults { &self, action_info: DigestInfo, action_result: &mut ActionResult, + hasher: DigestHasherFunc, ) -> Result<(), Error> { let should_upload_historical_results = Self::should_cache_result(self.upload_historical_results_strategy, action_result, true); @@ -1394,7 +1393,7 @@ impl UploadActionResults { let upload_historical_results_with_message_result = if should_upload_historical_results { let maybe_message = self - .upload_historical_results_with_message(action_info, execute_response.clone(), message_template) + .upload_historical_results_with_message(action_info, execute_response.clone(), message_template, hasher) .await; match maybe_message { Ok(message) => { @@ -1405,7 +1404,7 @@ impl UploadActionResults { Err(e) => Result::<(), Error>::Err(e), } } else { - match Self::format_execute_response_message(message_template, action_info, None) { + match Self::format_execute_response_message(message_template, action_info, None, hasher) { Ok(message) => { action_result.message = message.clone(); execute_response.message = message; @@ -1424,6 +1423,7 @@ impl UploadActionResults { execute_response .result .err_tip(|| "No result set in cache_action_result")?, + hasher, ) .await } else { @@ -1636,12 +1636,13 @@ impl RunningActionsManager for RunningActionsManagerImpl { &self, action_info: DigestInfo, action_result: &mut ActionResult, + hasher: DigestHasherFunc, ) -> Result<(), Error> { self.metrics .cache_action_result .wrap( self.upload_action_results - .cache_action_result(action_info, action_result), + .cache_action_result(action_info, action_result, hasher), ) .await } diff 
--git a/cas/worker/tests/local_worker_test.rs b/cas/worker/tests/local_worker_test.rs index ccd01a7ff..f3e56a050 100644 --- a/cas/worker/tests/local_worker_test.rs +++ b/cas/worker/tests/local_worker_test.rs @@ -32,6 +32,7 @@ use tonic::Response; use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ExecutionMetadata}; use common::{encode_stream_proto, fs, DigestInfo}; use config::cas_server::{LocalWorkerConfig, WorkerProperty}; +use digest_hasher::DigestHasherFunc; use error::{make_err, make_input_err, Code, Error}; use fast_slow_store::FastSlowStore; use filesystem_store::FilesystemStore; @@ -177,6 +178,87 @@ mod local_worker_tests { Ok(()) } + #[tokio::test] + async fn blake3_digest_function_registered_properly() -> Result<(), Box<dyn std::error::Error>> { + const SALT: u64 = 1000; + + let mut test_context = setup_local_worker(HashMap::new()).await; + let streaming_response = test_context.maybe_streaming_response.take().unwrap(); + + { + // Ensure our worker connects and properties were sent. + let props = test_context.client.expect_connect_worker(Ok(streaming_response)).await; + assert_eq!(props, SupportedProperties::default()); + } + + let expected_worker_id = "foobar".to_string(); + + let mut tx_stream = test_context.maybe_tx_stream.take().unwrap(); + { + // First initialize our worker by sending the response to the connection request. + tx_stream + .send_data(encode_stream_proto(&UpdateForWorker { + update: Some(Update::ConnectionResult(ConnectionResult { + worker_id: expected_worker_id.clone(), + })), + })?) + .await + .map_err(|e| make_input_err!("Could not send : {:?}", e))?; + } + + let action_digest = DigestInfo::new([3u8; 32], 10); + let action_info = ActionInfo { + command_digest: DigestInfo::new([1u8; 32], 10), + input_root_digest: DigestInfo::new([2u8; 32], 10), + timeout: Duration::from_secs(1), + platform_properties: PlatformProperties::default(), + priority: 0, + load_timestamp: SystemTime::UNIX_EPOCH, + insert_timestamp: SystemTime::UNIX_EPOCH, + unique_qualifier: ActionInfoHashKey { + instance_name: INSTANCE_NAME.to_string(), + digest: action_digest, + salt: SALT, + }, + skip_cache_lookup: true, + digest_function: DigestHasherFunc::Blake3, + }; + + { + // Send execution request. + tx_stream + .send_data(encode_stream_proto(&UpdateForWorker { + update: Some(Update::StartAction(StartExecute { + execute_request: Some(action_info.into()), + salt: SALT, + queued_timestamp: None, + })), + })?) + .await + .map_err(|e| make_input_err!("Could not send : {:?}", e))?; + } + let running_action = Arc::new(MockRunningAction::new()); + + // Send and wait for response from create_and_add_action to RunningActionsManager. + test_context + .actions_manager + .expect_create_and_add_action(Ok(running_action.clone())) + .await; + + // Now the RunningAction needs to send a series of state updates. This shortcuts them + // into a single call (shortcut for prepare, execute, upload, collect_results, cleanup). + running_action + .simple_expect_get_finished_result(Ok(ActionResult::default())) + .await?; + + // Expect the action to be updated in the action cache.
+ let (_stored_digest, _stored_result, digest_hasher) = + test_context.actions_manager.expect_cache_action_result().await; + assert_eq!(digest_hasher, DigestHasherFunc::Blake3); + + Ok(()) + } + #[tokio::test] async fn simple_worker_start_action_test() -> Result<(), Box<dyn std::error::Error>> { const SALT: u64 = 1000; @@ -220,6 +302,7 @@ mod local_worker_tests { salt: SALT, }, skip_cache_lookup: true, + digest_function: DigestHasherFunc::Sha256, }; { @@ -274,9 +357,11 @@ mod local_worker_tests { .await?; // Expect the action to be updated in the action cache. - let (stored_digest, stored_result) = test_context.actions_manager.expect_cache_action_result().await; + let (stored_digest, stored_result, digest_hasher) = + test_context.actions_manager.expect_cache_action_result().await; assert_eq!(stored_digest, action_digest); assert_eq!(stored_result, action_result.clone()); + assert_eq!(digest_hasher, DigestHasherFunc::Sha256); // Now our client should be notified that our runner finished. let execution_response = test_context @@ -458,6 +543,7 @@ mod local_worker_tests { salt: SALT, }, skip_cache_lookup: true, + digest_function: DigestHasherFunc::Sha256, }; { diff --git a/cas/worker/tests/running_actions_manager_test.rs b/cas/worker/tests/running_actions_manager_test.rs index 657a1da0f..e7b4711f2 100644 --- a/cas/worker/tests/running_actions_manager_test.rs +++ b/cas/worker/tests/running_actions_manager_test.rs @@ -44,8 +44,9 @@ use filesystem_store::FilesystemStore; use memory_store::MemoryStore; #[cfg_attr(target_family = "windows", allow(unused_imports))] use proto::build::bazel::remote::execution::v2::{ - platform::Property, Action, ActionResult as ProtoActionResult, Command, Directory, DirectoryNode, ExecuteRequest, - ExecuteResponse, FileNode, NodeProperties, Platform, SymlinkNode, Tree, + digest_function::Value as ProtoDigestFunction, platform::Property, Action, ActionResult as ProtoActionResult, + Command, Directory, DirectoryNode, ExecuteRequest, ExecuteResponse, FileNode, NodeProperties, Platform, + SymlinkNode, Tree, }; use proto::com::github::trace_machina::native_link::remote_execution::{HistoricalExecuteResponse, StartExecute}; use running_actions_manager::{ @@ -611,6 +612,165 @@ mod running_actions_manager_tests { Ok(()) } + #[tokio::test] + async fn blake3_upload_files() -> Result<(), Box<dyn std::error::Error>> { + const WORKER_ID: &str = "foo_worker_id"; + + fn test_monotonic_clock() -> SystemTime { + static CLOCK: AtomicU64 = AtomicU64::new(0); + monotonic_clock(&CLOCK) + } + + let (_, slow_store, cas_store, ac_store) = setup_stores().await?; + let root_work_directory = make_temp_path("root_work_directory"); + fs::create_dir_all(&root_work_directory).await?; + + let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( + RunningActionsManagerArgs { + root_work_directory, + execution_configuration: ExecutionConfiguration::default(), + cas_store: Pin::into_inner(cas_store.clone()), + ac_store: Some(Pin::into_inner(ac_store.clone())), + historical_store: Pin::into_inner(cas_store.clone()), + upload_action_result_config: &config::cas_server::UploadActionResultConfig { + upload_ac_results_strategy: config::cas_server::UploadCacheResultsStrategy::Never, + ..Default::default() + }, + max_action_timeout: Duration::MAX, + timeout_handled_externally: false, + }, + Callbacks { + now_fn: test_monotonic_clock, + sleep_fn: |_duration| Box::pin(futures::future::pending()), + }, + )?); + let action_result = { + const SALT: u64 = 55; + #[cfg(target_family = "unix")] + let arguments = vec![
"sh".to_string(), + "-c".to_string(), + "echo -n \"123 \" > ./test.txt; echo -n \"foo-stdout \"; >&2 echo -n \"bar-stderr \"".to_string(), + ]; + #[cfg(target_family = "windows")] + let arguments = vec![ + "cmd".to_string(), + "/C".to_string(), + // Note: Windows adds two spaces after 'set /p=XXX'. + "echo | set /p=123> ./test.txt & echo | set /p=foo-stdout & echo | set /p=bar-stderr 1>&2 & exit 0" + .to_string(), + ]; + let working_directory = "some_cwd"; + let command = Command { + arguments, + output_paths: vec!["test.txt".to_string()], + working_directory: working_directory.to_string(), + ..Default::default() + }; + let command_digest = + serialize_and_upload_message(&command, cas_store.as_ref(), &mut DigestHasherFunc::Blake3.into()) + .await?; + let input_root_digest = serialize_and_upload_message( + &Directory { + directories: vec![DirectoryNode { + name: working_directory.to_string(), + digest: Some( + serialize_and_upload_message( + &Directory::default(), + cas_store.as_ref(), + &mut DigestHasherFunc::Blake3.into(), + ) + .await? + .into(), + ), + }], + ..Default::default() + }, + cas_store.as_ref(), + &mut DigestHasherFunc::Blake3.into(), + ) + .await?; + let action = Action { + command_digest: Some(command_digest.into()), + input_root_digest: Some(input_root_digest.into()), + ..Default::default() + }; + let action_digest = + serialize_and_upload_message(&action, cas_store.as_ref(), &mut DigestHasherFunc::Blake3.into()).await?; + + let running_action_impl = running_actions_manager + .create_and_add_action( + WORKER_ID.to_string(), + StartExecute { + execute_request: Some(ExecuteRequest { + action_digest: Some(action_digest.into()), + digest_function: ProtoDigestFunction::Blake3.into(), + ..Default::default() + }), + salt: SALT, + queued_timestamp: None, + }, + ) + .await?; + + run_action(running_action_impl.clone()).await? 
+ }; let file_content = slow_store + .as_ref() + .get_part_unchunked(action_result.output_files[0].digest, 0, None, None) + .await?; + assert_eq!(from_utf8(&file_content)?, "123 "); let stdout_content = slow_store + .as_ref() + .get_part_unchunked(action_result.stdout_digest, 0, None, None) + .await?; + assert_eq!(from_utf8(&stdout_content)?, "foo-stdout "); let stderr_content = slow_store + .as_ref() + .get_part_unchunked(action_result.stderr_digest, 0, None, None) + .await?; + assert_eq!(from_utf8(&stderr_content)?, "bar-stderr "); let mut clock_time = make_system_time(0); + assert_eq!( + action_result, + ActionResult { + output_files: vec![FileInfo { + name_or_path: NameOrPath::Path("test.txt".to_string()), + digest: DigestInfo::try_new("3f488ba478fc6716c756922c9f34ebd7e84b85c3e03e33e22e7a3736cafdc6d8", 4)?, + is_executable: false, + }], + stdout_digest: DigestInfo::try_new( + "af1720193ae81515067a3ef39f0dfda3ad54a1a9d216e55d32fe5c1e178c6a7d", + 11 + )?, + stderr_digest: DigestInfo::try_new( + "65e0abbae32a3aedaf040b654c6f02ace03c7690c17a8415a90fc2ec9c809a16", + 12 + )?, + exit_code: 0, + output_folders: vec![], + output_file_symlinks: vec![], + output_directory_symlinks: vec![], + server_logs: HashMap::new(), + execution_metadata: ExecutionMetadata { + worker: WORKER_ID.to_string(), + queued_timestamp: SystemTime::UNIX_EPOCH, + worker_start_timestamp: increment_clock(&mut clock_time), + input_fetch_start_timestamp: increment_clock(&mut clock_time), + input_fetch_completed_timestamp: increment_clock(&mut clock_time), + execution_start_timestamp: increment_clock(&mut clock_time), + execution_completed_timestamp: increment_clock(&mut clock_time), + output_upload_start_timestamp: increment_clock(&mut clock_time), + output_upload_completed_timestamp: increment_clock(&mut clock_time), + worker_completed_timestamp: increment_clock(&mut clock_time), + }, + error: None, + message: String::new(), + } + ); + Ok(()) + } + #[tokio::test] async fn upload_files_from_above_cwd_test() -> Result<(), Box<dyn std::error::Error>> { const WORKER_ID: &str = "foo_worker_id"; @@ -1595,7 +1755,7 @@ exit 1 message: String::new(), }; running_actions_manager - .cache_action_result(action_digest, &mut action_result) + .cache_action_result(action_digest, &mut action_result, DigestHasherFunc::Sha256) .await?; let retrieved_result = get_and_decode_digest::<ProtoActionResult>(ac_store.as_ref(), &action_digest).await?; @@ -1654,7 +1814,7 @@ exit 1 message: String::new(), }; running_actions_manager - .cache_action_result(action_digest, &mut action_result) + .cache_action_result(action_digest, &mut action_result, DigestHasherFunc::Sha256) .await?; let retrieved_result = get_and_decode_digest::<ProtoActionResult>(ac_store.as_ref(), &action_digest).await?; @@ -1714,7 +1874,7 @@ exit 1 message: String::new(), }; running_actions_manager - .cache_action_result(action_digest, &mut action_result) + .cache_action_result(action_digest, &mut action_result, DigestHasherFunc::Sha256) .await?; assert!(!action_result.message.is_empty(), "Message should be set"); @@ -1770,7 +1930,7 @@ exit 1 ..Default::default() }; running_actions_manager - .cache_action_result(action_digest, &mut action_result) + .cache_action_result(action_digest, &mut action_result, DigestHasherFunc::Sha256) .await?; assert!(action_result.message.is_empty(), "Message should not be set"); @@ -1803,7 +1963,7 @@ exit 1 ..Default::default() }; running_actions_manager - .cache_action_result(action_digest, &mut action_result) + .cache_action_result(action_digest, &mut action_result, DigestHasherFunc::Sha256) .await?;
assert!(!action_result.message.is_empty(), "Message should be set"); @@ -1859,7 +2019,7 @@ exit 1 ..Default::default() }; running_actions_manager - .cache_action_result(action_digest, &mut action_result) + .cache_action_result(action_digest, &mut action_result, DigestHasherFunc::Sha256) .await?; assert!(!action_result.message.is_empty(), "Message should be set"); diff --git a/cas/worker/tests/utils/mock_running_actions_manager.rs b/cas/worker/tests/utils/mock_running_actions_manager.rs index 202adfc9e..ddd5c5133 100644 --- a/cas/worker/tests/utils/mock_running_actions_manager.rs +++ b/cas/worker/tests/utils/mock_running_actions_manager.rs @@ -20,6 +20,7 @@ use tokio::sync::mpsc; use action_messages::ActionResult; use common::DigestInfo; +use digest_hasher::DigestHasherFunc; use error::{make_input_err, Error}; use proto::com::github::trace_machina::native_link::remote_execution::StartExecute; use running_actions_manager::{Metrics, RunningAction, RunningActionsManager}; @@ -27,7 +28,7 @@ use running_actions_manager::{Metrics, RunningAction, RunningActionsManager}; #[derive(Debug)] enum RunningActionManagerCalls { CreateAndAddAction((String, StartExecute)), - CacheActionResult(Box<(DigestInfo, ActionResult)>), + CacheActionResult(Box<(DigestInfo, ActionResult, DigestHasherFunc)>), } enum RunningActionManagerReturns { @@ -87,7 +88,7 @@ impl MockRunningActionsManager { req } - pub async fn expect_cache_action_result(&self) -> (DigestInfo, ActionResult) { + pub async fn expect_cache_action_result(&self) -> (DigestInfo, ActionResult, DigestHasherFunc) { let mut rx_call_lock = self.rx_call.lock().await; match rx_call_lock.recv().await.expect("Could not receive msg in mpsc") { RunningActionManagerCalls::CacheActionResult(req) => *req, @@ -126,11 +127,13 @@ impl RunningActionsManager for MockRunningActionsManager { &self, action_digest: DigestInfo, action_result: &mut ActionResult, + digest_function: DigestHasherFunc, ) -> Result<(), Error> { self.tx_call .send(RunningActionManagerCalls::CacheActionResult(Box::new(( action_digest, action_result.clone(), + digest_function, )))) .expect("Could not send request to mpsc"); Ok(()) diff --git a/config/cas_server.rs b/config/cas_server.rs index 00f1cd23a..56798eef1 100644 --- a/config/cas_server.rs +++ b/config/cas_server.rs @@ -494,6 +494,13 @@ pub enum WorkerConfig { local(LocalWorkerConfig), } +#[allow(non_camel_case_types)] +#[derive(Deserialize, Debug, Clone, Copy)] +pub enum ConfigDigestHashFunction { + sha256, + blake3, +} + #[derive(Deserialize, Debug, Clone, Copy)] pub struct GlobalConfig { /// Maximum number of open files that can be opened at one time. @@ -536,6 +543,12 @@ pub struct GlobalConfig { /// Default: #[serde(default)] pub disable_metrics: bool, + + /// Default hash function to use while uploading blobs to the CAS when not set + /// by the client.
+ /// + /// Default: ConfigDigestHashFunction::sha256 + pub default_digest_hash_function: Option<ConfigDigestHashFunction>, } #[derive(Deserialize, Debug)] diff --git a/gencargo/action_messages/Cargo.toml b/gencargo/action_messages/Cargo.toml index d300c9a3a..defd0ada5 100644 --- a/gencargo/action_messages/Cargo.toml +++ b/gencargo/action_messages/Cargo.toml @@ -28,5 +28,6 @@ tonic = { workspace = true } platform_property_manager = { workspace = true } proto = { workspace = true } common = { workspace = true } +digest_hasher = { workspace = true } error = { workspace = true } metrics_utils = { workspace = true } diff --git a/gencargo/action_messages_test/Cargo.toml b/gencargo/action_messages_test/Cargo.toml index 69a68651c..5dadd1de6 100644 --- a/gencargo/action_messages_test/Cargo.toml +++ b/gencargo/action_messages_test/Cargo.toml @@ -27,4 +27,5 @@ action_messages = { workspace = true } platform_property_manager = { workspace = true } proto = { workspace = true } common = { workspace = true } +digest_hasher = { workspace = true } error = { workspace = true } diff --git a/gencargo/capabilities_server/Cargo.toml b/gencargo/capabilities_server/Cargo.toml index 2ead8397c..06230666b 100644 --- a/gencargo/capabilities_server/Cargo.toml +++ b/gencargo/capabilities_server/Cargo.toml @@ -26,4 +26,5 @@ scheduler = { workspace = true } store = { workspace = true } config = { workspace = true } proto = { workspace = true } +digest_hasher = { workspace = true } error = { workspace = true } diff --git a/gencargo/cas/Cargo.toml b/gencargo/cas/Cargo.toml index 1299dbeb2..f08834648 100644 --- a/gencargo/cas/Cargo.toml +++ b/gencargo/cas/Cargo.toml @@ -50,5 +50,6 @@ store = { workspace = true } local_worker = { workspace = true } config = { workspace = true } common = { workspace = true } +digest_hasher = { workspace = true } error = { workspace = true } metrics_utils = { workspace = true } diff --git a/gencargo/digest_hasher/Cargo.toml b/gencargo/digest_hasher/Cargo.toml index df46b0d98..4ded99e55 100644 --- a/gencargo/digest_hasher/Cargo.toml +++ b/gencargo/digest_hasher/Cargo.toml @@ -23,4 +23,7 @@ blake3 = { workspace = true } sha2 = { workspace = true } # Local libraries.
+config = { workspace = true } +proto = { workspace = true } common = { workspace = true } +error = { workspace = true } diff --git a/gencargo/execution_server/Cargo.toml b/gencargo/execution_server/Cargo.toml index 723f56e14..e33e5592c 100644 --- a/gencargo/execution_server/Cargo.toml +++ b/gencargo/execution_server/Cargo.toml @@ -35,4 +35,5 @@ store = { workspace = true } config = { workspace = true } proto = { workspace = true } common = { workspace = true } +digest_hasher = { workspace = true } error = { workspace = true } diff --git a/gencargo/local_worker/Cargo.toml b/gencargo/local_worker/Cargo.toml index 13f8f5692..177491841 100644 --- a/gencargo/local_worker/Cargo.toml +++ b/gencargo/local_worker/Cargo.toml @@ -35,5 +35,6 @@ worker_utils = { workspace = true } config = { workspace = true } proto = { workspace = true } common = { workspace = true } +digest_hasher = { workspace = true } error = { workspace = true } metrics_utils = { workspace = true } diff --git a/gencargo/local_worker_test/Cargo.toml b/gencargo/local_worker_test/Cargo.toml index d662c8dda..03df072cc 100644 --- a/gencargo/local_worker_test/Cargo.toml +++ b/gencargo/local_worker_test/Cargo.toml @@ -39,4 +39,5 @@ mock_running_actions_manager = { workspace = true } config = { workspace = true } proto = { workspace = true } common = { workspace = true } +digest_hasher = { workspace = true } error = { workspace = true } diff --git a/gencargo/mock_running_actions_manager/Cargo.toml b/gencargo/mock_running_actions_manager/Cargo.toml index bf0c29f15..9d85b6acc 100644 --- a/gencargo/mock_running_actions_manager/Cargo.toml +++ b/gencargo/mock_running_actions_manager/Cargo.toml @@ -28,4 +28,5 @@ action_messages = { workspace = true } running_actions_manager = { workspace = true } proto = { workspace = true } common = { workspace = true } +digest_hasher = { workspace = true } error = { workspace = true } diff --git a/gencargo/scheduler_utils/Cargo.toml b/gencargo/scheduler_utils/Cargo.toml index 87d489a21..491cbd481 100644 --- a/gencargo/scheduler_utils/Cargo.toml +++ b/gencargo/scheduler_utils/Cargo.toml @@ -25,3 +25,4 @@ async-trait = { workspace = true } action_messages = { workspace = true } platform_property_manager = { workspace = true } common = { workspace = true } +digest_hasher = { workspace = true } diff --git a/gencargo/worker_api_server/Cargo.toml b/gencargo/worker_api_server/Cargo.toml index be7832327..a2645a208 100644 --- a/gencargo/worker_api_server/Cargo.toml +++ b/gencargo/worker_api_server/Cargo.toml @@ -32,4 +32,5 @@ worker = { workspace = true } config = { workspace = true } proto = { workspace = true } common = { workspace = true } +digest_hasher = { workspace = true } error = { workspace = true } diff --git a/gencargo/worker_api_server_test/Cargo.toml b/gencargo/worker_api_server_test/Cargo.toml index 4e7078488..a4a3255ad 100644 --- a/gencargo/worker_api_server_test/Cargo.toml +++ b/gencargo/worker_api_server_test/Cargo.toml @@ -35,4 +35,5 @@ worker = { workspace = true } config = { workspace = true } proto = { workspace = true } common = { workspace = true } +digest_hasher = { workspace = true } error = { workspace = true } diff --git a/util/BUILD b/util/BUILD index a83cbb7d4..e7b8c1bc1 100644 --- a/util/BUILD +++ b/util/BUILD @@ -37,6 +37,9 @@ rust_library( visibility = ["//visibility:public"], deps = [ ":common", + "//config", + "//proto", + "//util:error", "@crate_index//:blake3", "@crate_index//:sha2", ], diff --git a/util/common.rs b/util/common.rs index 6b8db875f..a1a0209ed 100644 --- 
a/util/common.rs +++ b/util/common.rs @@ -68,14 +68,11 @@ impl DigestInfo { hex::encode(self.packed_hash) } - pub const fn empty_digest() -> DigestInfo { + pub const fn zero_digest() -> DigestInfo { DigestInfo { size_bytes: 0, // Magic hash of a sha256 of empty string. - packed_hash: [ - 0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, - 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55, - ], + packed_hash: [0u8; 32], } } } diff --git a/util/digest_hasher.rs b/util/digest_hasher.rs index e3b3a4746..ed27c0698 100644 --- a/util/digest_hasher.rs +++ b/util/digest_hasher.rs @@ -12,17 +12,84 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::OnceLock; + use blake3::Hasher as Blake3Hasher; use sha2::{Digest, Sha256}; use common::DigestInfo; +use config::cas_server::ConfigDigestHashFunction; +use error::{make_err, make_input_err, Code, Error}; +use proto::build::bazel::remote::execution::v2::digest_function::Value as ProtoDigestFunction; + +static DEFAULT_DIGEST_HASHER_FUNC: OnceLock<DigestHasherFunc> = OnceLock::new(); + +/// Get the default hasher. +pub fn default_digest_hasher_func() -> DigestHasherFunc { + *DEFAULT_DIGEST_HASHER_FUNC.get_or_init(|| DigestHasherFunc::Sha256) +} + +/// Sets the default hasher to use if no hasher was requested by the client. +pub fn set_default_digest_hasher_func(hasher: DigestHasherFunc) -> Result<(), Error> { + DEFAULT_DIGEST_HASHER_FUNC + .set(hasher) + .map_err(|_| make_err!(Code::Internal, "default_digest_hasher_func already set")) +} /// Supported digest hash functions. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub enum DigestHasherFunc { Sha256, Blake3, } +impl DigestHasherFunc { + pub fn proto_digest_func(&self) -> ProtoDigestFunction { + match self { + DigestHasherFunc::Sha256 => ProtoDigestFunction::Sha256, + DigestHasherFunc::Blake3 => ProtoDigestFunction::Blake3, + } + } +} + +impl From<ConfigDigestHashFunction> for DigestHasherFunc { + fn from(value: ConfigDigestHashFunction) -> Self { + match value { + ConfigDigestHashFunction::sha256 => DigestHasherFunc::Sha256, + ConfigDigestHashFunction::blake3 => DigestHasherFunc::Blake3, + } + } +} + +impl TryFrom<ProtoDigestFunction> for DigestHasherFunc { + type Error = Error; + + fn try_from(value: ProtoDigestFunction) -> Result<Self, Self::Error> { + match value { + ProtoDigestFunction::Sha256 => Ok(DigestHasherFunc::Sha256), + ProtoDigestFunction::Blake3 => Ok(DigestHasherFunc::Blake3), + v => Err(make_input_err!("Unknown or unsupported digest function {v:?}")), + } + } +} + +impl TryFrom<i32> for DigestHasherFunc { + type Error = Error; + + fn try_from(value: i32) -> Result<Self, Self::Error> { + match ProtoDigestFunction::from_i32(value) { + // Note: Unknown represents 0, which means non-set, so use default. + Some(ProtoDigestFunction::Unknown) => Ok(default_digest_hasher_func()), + Some(ProtoDigestFunction::Sha256) => Ok(DigestHasherFunc::Sha256), + Some(ProtoDigestFunction::Blake3) => Ok(DigestHasherFunc::Blake3), + value => Err(make_input_err!( + "Unknown or unsupported digest function: {:?}", + value.map(|v| v.as_str_name()) + )), + } + } +} + impl From<DigestHasherFunc> for DigestHasher { fn from(value: DigestHasherFunc) -> Self { match value {
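To make the wire-level behavior concrete, here is a small sketch of how a client's raw `digest_function` field resolves through the `TryFrom<i32>` impl above. The `resolve_hasher` helper and `demo` caller are hypothetical, written only to exercise the conversions in this file:

```rust
use digest_hasher::DigestHasherFunc;
use error::Error;
use proto::build::bazel::remote::execution::v2::digest_function::Value as ProtoDigestFunction;

// Hypothetical helper: map the raw i32 from an ExecuteRequest to a hasher.
fn resolve_hasher(raw: i32) -> Result<DigestHasherFunc, Error> {
    // Unknown (0) means the client did not specify a function, so the
    // TryFrom impl falls back to the process-wide default set at startup.
    DigestHasherFunc::try_from(raw)
}

fn demo() -> Result<(), Error> {
    assert_eq!(
        resolve_hasher(ProtoDigestFunction::Blake3 as i32)?,
        DigestHasherFunc::Blake3
    );
    // Assuming no default was configured, an unset field resolves to Sha256.
    assert_eq!(resolve_hasher(0)?, DigestHasherFunc::Sha256);
    Ok(())
}
```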