Add Blake3 digest support
Fully support the Blake3 digest upload format. This can significantly
increase performance in cases where hashing files is a bottleneck.

closes #395
allada committed Nov 20, 2023
1 parent 2fb59b6 commit f684028
Showing 33 changed files with 463 additions and 65 deletions.
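
For context on the commit message's performance claim: BLAKE3 is a SIMD-friendly, parallelizable hash, so replacing SHA-256 mostly pays off when digesting large blobs dominates. Below is a minimal standalone sketch using the public sha2 and blake3 crates; it is illustrative only and is not code from this commit. The rest of the commit threads a DigestHasherFunc choice (SHA-256 or BLAKE3) from config and from incoming ExecuteRequests through the scheduler and worker, as the diffs below show.

// Illustrative only: hash the same buffer with SHA-256 and BLAKE3.
// blake3::hash is SIMD-accelerated (and, with the "rayon" feature, large
// inputs can be hashed in parallel via Hasher::update_rayon), which is where
// the speedup comes from when digesting files is the bottleneck.
use sha2::{Digest, Sha256};

fn sha256_digest(data: &[u8]) -> [u8; 32] {
    let mut hasher = Sha256::new();
    hasher.update(data);
    hasher.finalize().into()
}

fn blake3_digest(data: &[u8]) -> [u8; 32] {
    *blake3::hash(data).as_bytes()
}

fn main() {
    let data = vec![0u8; 1 << 20];
    // Both produce 32-byte digests, so either can back a CAS key.
    assert_eq!(sha256_digest(&data).len(), 32);
    assert_eq!(blake3_digest(&data).len(), 32);
}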
1 change: 1 addition & 0 deletions cas/BUILD
@@ -16,6 +16,7 @@ rust_binary(
"//cas/worker:local_worker",
"//config",
"//util:common",
"//util:digest_hasher",
"//util:error",
"//util:metrics_utils",
"@crate_index//:async-lock",
11 changes: 10 additions & 1 deletion cas/cas_main.rs
@@ -42,9 +42,12 @@ use capabilities_server::CapabilitiesServer;
use cas_server::CasServer;
use common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit};
use common::log;
use config::cas_server::{CasConfig, CompressionAlgorithm, GlobalConfig, ServerConfig, WorkerConfig};
use config::cas_server::{
CasConfig, CompressionAlgorithm, ConfigDigestHashFunction, GlobalConfig, ServerConfig, WorkerConfig,
};
use default_scheduler_factory::scheduler_factory;
use default_store_factory::store_factory;
use digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc};
use error::{make_err, Code, Error, ResultExt};
use execution_server::ExecutionServer;
use local_worker::new_local_worker;
@@ -632,10 +635,16 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
};
service.prometheus.is_none()
}),
default_digest_hash_function: None,
}
};
set_open_file_limit(global_cfg.max_open_files);
set_idle_file_descriptor_timeout(Duration::from_millis(global_cfg.idle_file_descriptor_timeout_millis))?;
set_default_digest_hasher_func(DigestHasherFunc::from(
global_cfg
.default_digest_hash_function
.unwrap_or(ConfigDigestHashFunction::sha256),
))?;
!global_cfg.disable_metrics
};
// Override metrics enabled if the environment variable is set.
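
The hunk above reads global_cfg.default_digest_hash_function (falling back to sha256 when unset) and installs it process-wide via set_default_digest_hasher_func. A hedged sketch of what //util:digest_hasher's default-hasher plumbing might look like follows; the function and enum names come from this diff, but the OnceCell-based body is an assumption, not the repository's implementation.

// Hypothetical sketch only: a process-wide default digest hasher.
use once_cell::sync::OnceCell;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DigestHasherFunc {
    Sha256,
    Blake3,
}

static DEFAULT_DIGEST_HASHER: OnceCell<DigestHasherFunc> = OnceCell::new();

/// Called once at startup with the configured `default_digest_hash_function`
/// (SHA-256 when the config leaves it unset, as in the diff above).
pub fn set_default_digest_hasher_func(hasher: DigestHasherFunc) -> Result<(), String> {
    DEFAULT_DIGEST_HASHER
        .set(hasher)
        .map_err(|_| "default digest hasher was already set".to_string())
}

/// Read by e.g. the capabilities server to advertise the active digest function.
pub fn default_digest_hasher_func() -> DigestHasherFunc {
    *DEFAULT_DIGEST_HASHER.get().unwrap_or(&DigestHasherFunc::Sha256)
}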
4 changes: 4 additions & 0 deletions cas/grpc_service/BUILD
@@ -46,6 +46,7 @@ rust_library(
"//cas/store",
"//config",
"//proto",
"//util:digest_hasher",
"//util:error",
"@crate_index//:tonic",
],
@@ -85,6 +86,7 @@ rust_library(
"//config",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"@crate_index//:futures",
"@crate_index//:rand",
@@ -107,6 +109,7 @@ rust_library(
"//config",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"@crate_index//:futures",
"@crate_index//:tokio",
@@ -128,6 +131,7 @@ rust_test(
"//config",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"@crate_index//:pretty_assertions",
"@crate_index//:prost-types",
7 changes: 4 additions & 3 deletions cas/grpc_service/capabilities_server.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
use tonic::{Request, Response, Status};

use config::cas_server::{CapabilitiesConfig, InstanceName};
use digest_hasher::default_digest_hasher_func;
use error::{Error, ResultExt};
use proto::build::bazel::remote::execution::v2::{
capabilities_server::Capabilities, capabilities_server::CapabilitiesServer as Server,
@@ -85,7 +86,7 @@ impl Capabilities for CapabilitiesServer {
let instance_name = request.into_inner().instance_name;
let maybe_supported_node_properties = self.supported_node_properties_for_instance.get(&instance_name);
let execution_capabilities = maybe_supported_node_properties.map(|props_for_instance| ExecutionCapabilities {
digest_function: DigestFunction::Sha256.into(),
digest_function: default_digest_hasher_func().proto_digest_func().into(),
exec_enabled: true, // TODO(blaise.bruer) Make this configurable.
execution_priority_capabilities: Some(PriorityCapabilities {
priorities: vec![PriorityRange {
@@ -94,12 +95,12 @@
}],
}),
supported_node_properties: props_for_instance.clone(),
digest_functions: vec![DigestFunction::Sha256.into()],
digest_functions: vec![DigestFunction::Sha256.into(), DigestFunction::Blake3.into()],
});

let resp = ServerCapabilities {
cache_capabilities: Some(CacheCapabilities {
digest_functions: vec![DigestFunction::Sha256.into()],
digest_functions: vec![DigestFunction::Sha256.into(), DigestFunction::Blake3.into()],
action_cache_update_capabilities: Some(ActionCacheUpdateCapabilities { update_enabled: true }),
cache_priority_capabilities: None,
max_batch_total_size_bytes: MAX_BATCH_TOTAL_SIZE,
15 changes: 14 additions & 1 deletion cas/grpc_service/execution_server.rs
@@ -27,6 +27,7 @@ use ac_utils::get_and_decode_digest;
use action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
use common::{log, DigestInfo};
use config::cas_server::{ExecutionConfig, InstanceName};
use digest_hasher::DigestHasherFunc;
use error::{make_input_err, Error, ResultExt};
use platform_property_manager::PlatformProperties;
use proto::build::bazel::remote::execution::v2::{
@@ -54,6 +55,7 @@ impl InstanceInfo {
action: &Action,
priority: i32,
skip_cache_lookup: bool,
digest_function: DigestHasherFunc,
) -> Result<ActionInfo, Error> {
let command_digest = DigestInfo::try_from(
action
@@ -124,6 +126,7 @@ impl InstanceInfo {
},
},
skip_cache_lookup,
digest_function,
})
}
}
@@ -194,7 +197,17 @@ impl ExecutionServer {

let action = get_and_decode_digest::<Action>(instance_info.cas_pin(), &digest).await?;
let action_info = instance_info
.build_action_info(instance_name, digest, &action, priority, execute_req.skip_cache_lookup)
.build_action_info(
instance_name,
digest,
&action,
priority,
execute_req.skip_cache_lookup,
execute_req
.digest_function
.try_into()
.err_tip(|| "Could not convert digest function in inner_execute()")?,
)
.await?;

let rx = instance_info
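
inner_execute() above converts the request's digest_function field into a DigestHasherFunc with try_into(). A hedged sketch of the shape of that conversion follows, reusing the DigestHasherFunc enum sketched earlier; ProtoDigestFunction is a hypothetical stand-in for the generated REAPI enum, and the real mapping lives in //util:digest_hasher.

// Hypothetical sketch: only SHA-256 and BLAKE3 are accepted; everything else
// becomes an input error that err_tip() can annotate at the call site.
#[derive(Clone, Copy, Debug)]
pub enum ProtoDigestFunction {
    Unknown,
    Sha256,
    Blake3,
    // ...the other REAPI digest functions, not supported by this server.
}

impl TryFrom<ProtoDigestFunction> for DigestHasherFunc {
    type Error = String;

    fn try_from(value: ProtoDigestFunction) -> Result<Self, Self::Error> {
        match value {
            ProtoDigestFunction::Sha256 => Ok(DigestHasherFunc::Sha256),
            ProtoDigestFunction::Blake3 => Ok(DigestHasherFunc::Blake3),
            // Whether Unknown should fall back to the configured default is a
            // behavior question this diff does not show; the sketch rejects it.
            other => Err(format!("unsupported digest function: {other:?}")),
        }
    }
}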
2 changes: 2 additions & 0 deletions cas/grpc_service/tests/worker_api_server_test.rs
@@ -22,6 +22,7 @@ use tonic::Request;
use action_messages::{ActionInfo, ActionInfoHashKey, ActionStage};
use common::DigestInfo;
use config::cas_server::WorkerApiConfig;
use digest_hasher::DigestHasherFunc;
use error::{Error, ResultExt};
use platform_property_manager::PlatformProperties;
use proto::build::bazel::remote::execution::v2::{
@@ -269,6 +270,7 @@ pub mod execution_response_tests {
salt: SALT,
},
skip_cache_lookup: true,
digest_function: DigestHasherFunc::Sha256,
};
let mut client_action_state_receiver = test_context.scheduler.add_action(action_info).await?;

3 changes: 3 additions & 0 deletions cas/scheduler/BUILD
@@ -63,6 +63,7 @@ rust_library(
":action_messages",
":platform_property_manager",
"//util:common",
"//util:digest_hasher",
],
)

@@ -256,6 +257,7 @@ rust_library(
":platform_property_manager",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"//util:metrics_utils",
"@crate_index//:blake3",
@@ -273,6 +275,7 @@ rust_test(
":platform_property_manager",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"@crate_index//:pretty_assertions",
"@crate_index//:tokio",
18 changes: 12 additions & 6 deletions cas/scheduler/action_messages.rs
@@ -25,14 +25,15 @@ use prost::Message;
use prost_types::Any;

use common::{DigestInfo, HashMapExt, VecExt};
use digest_hasher::DigestHasherFunc;
use error::{error_if, make_input_err, Error, ResultExt};
use metrics_utils::{CollectorState, MetricsComponent};
use platform_property_manager::PlatformProperties;
use prost::bytes::Bytes;
use proto::build::bazel::remote::execution::v2::{
digest_function, execution_stage, Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata,
ExecuteRequest, ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, OutputDirectory, OutputFile,
OutputSymlink, SymlinkNode,
execution_stage, Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata, ExecuteRequest,
ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, OutputDirectory, OutputFile, OutputSymlink,
SymlinkNode,
};
use proto::google::longrunning::{operation::Result as LongRunningResult, Operation};
use proto::google::rpc::Status;
@@ -145,6 +146,9 @@ pub struct ActionInfo {

/// Whether to try looking up this action in the cache.
pub skip_cache_lookup: bool,

/// The digest function this action expects.
pub digest_function: DigestHasherFunc,
}

impl ActionInfo {
@@ -199,6 +203,8 @@ impl ActionInfo {
salt,
},
skip_cache_lookup: execute_request.skip_cache_lookup,
digest_function: DigestHasherFunc::try_from(execute_request.digest_function)
.err_tip(|| "Could not find digest_function in try_from_action_and_execute_request_with_salt")?,
})
}
}
@@ -212,7 +218,7 @@ impl From<ActionInfo> for ExecuteRequest {
skip_cache_lookup: true, // The worker should never cache lookup.
execution_policy: None, // Not used in the worker.
results_cache_policy: None, // Not used in the worker.
digest_function: digest_function::Value::Sha256.into(),
digest_function: val.digest_function.proto_digest_func().into(),
}
}
}
@@ -587,8 +593,8 @@ impl Default for ActionResult {
output_directory_symlinks: Default::default(),
output_file_symlinks: Default::default(),
exit_code: INTERNAL_ERROR_EXIT_CODE,
stdout_digest: DigestInfo::empty_digest(),
stderr_digest: DigestInfo::empty_digest(),
stdout_digest: DigestInfo::new([0u8; 32], 0),
stderr_digest: DigestInfo::new([0u8; 32], 0),
execution_metadata: ExecutionMetadata {
worker: "".to_string(),
queued_timestamp: SystemTime::UNIX_EPOCH,
5 changes: 5 additions & 0 deletions cas/scheduler/tests/action_messages_test.rs
@@ -18,6 +18,7 @@ use std::time::{Duration, SystemTime};

use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata};
use common::DigestInfo;
use digest_hasher::DigestHasherFunc;
use error::Error;
use platform_property_manager::PlatformProperties;
use proto::build::bazel::remote::execution::v2::ExecuteResponse;
@@ -118,6 +119,7 @@ mod action_messages_tests {
salt: 0,
},
skip_cache_lookup: true,
digest_function: DigestHasherFunc::Sha256,
});
let lowest_priority_action = Arc::new(ActionInfo {
command_digest: DigestInfo::new([0u8; 32], 0),
@@ -135,6 +137,7 @@
salt: 0,
},
skip_cache_lookup: true,
digest_function: DigestHasherFunc::Sha256,
});
let mut action_set = BTreeSet::<Arc<ActionInfo>>::new();
action_set.insert(lowest_priority_action.clone());
@@ -168,6 +171,7 @@
salt: 0,
},
skip_cache_lookup: true,
digest_function: DigestHasherFunc::Sha256,
});
let current_action = Arc::new(ActionInfo {
command_digest: DigestInfo::new([0u8; 32], 0),
@@ -185,6 +189,7 @@
salt: 0,
},
skip_cache_lookup: true,
digest_function: DigestHasherFunc::Sha256,
});
let mut action_set = BTreeSet::<Arc<ActionInfo>>::new();
action_set.insert(current_action.clone());
12 changes: 6 additions & 6 deletions cas/scheduler/tests/simple_scheduler_test.rs
@@ -231,7 +231,7 @@ mod scheduler_tests {

let unique_qualifier = ActionInfoHashKey {
instance_name: "".to_string(),
digest: DigestInfo::empty_digest(),
digest: DigestInfo::zero_digest(),
salt: 0,
};

Expand Down Expand Up @@ -421,7 +421,7 @@ mod scheduler_tests {
let mut expected_action_state = ActionState {
unique_qualifier: ActionInfoHashKey {
instance_name: "".to_string(),
digest: DigestInfo::empty_digest(),
digest: DigestInfo::zero_digest(),
salt: 0,
}, // Will be filled later.
stage: ActionStage::Queued,
@@ -565,7 +565,7 @@
// Name is a random string, so we ignore it and just make it the same.
unique_qualifier: ActionInfoHashKey {
instance_name: "".to_string(),
digest: DigestInfo::empty_digest(),
digest: DigestInfo::zero_digest(),
salt: 0,
},
stage: ActionStage::Executing,
@@ -931,7 +931,7 @@
let mut expected_action_state = ActionState {
unique_qualifier: ActionInfoHashKey {
instance_name: "".to_string(),
digest: DigestInfo::empty_digest(),
digest: DigestInfo::zero_digest(),
salt: 0,
}, // Will be filled later.
stage: ActionStage::Executing,
@@ -1313,8 +1313,8 @@
output_file_symlinks: Vec::default(),
output_directory_symlinks: Vec::default(),
exit_code: INTERNAL_ERROR_EXIT_CODE,
stdout_digest: DigestInfo::empty_digest(),
stderr_digest: DigestInfo::empty_digest(),
stdout_digest: DigestInfo::zero_digest(),
stderr_digest: DigestInfo::zero_digest(),
execution_metadata: ExecutionMetadata {
worker: WORKER_ID.to_string(),
queued_timestamp: SystemTime::UNIX_EPOCH,
2 changes: 2 additions & 0 deletions cas/scheduler/tests/utils/scheduler_utils.rs
@@ -17,6 +17,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};

use action_messages::{ActionInfo, ActionInfoHashKey};
use common::DigestInfo;
use digest_hasher::DigestHasherFunc;
use platform_property_manager::PlatformProperties;

pub const INSTANCE_NAME: &str = "foobar_instance_name";
@@ -38,5 +39,6 @@ pub fn make_base_action_info(insert_timestamp: SystemTime) -> ActionInfo {
salt: 0,
},
skip_cache_lookup: false,
digest_function: DigestHasherFunc::Sha256,
}
}
3 changes: 3 additions & 0 deletions cas/worker/BUILD
@@ -14,6 +14,7 @@ rust_library(
"//config",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"//util:metrics_utils",
"@crate_index//:async-lock",
@@ -126,6 +127,7 @@ rust_library(
"//cas/scheduler:action_messages",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"@crate_index//:async-lock",
"@crate_index//:tokio",
@@ -174,6 +176,7 @@ rust_test(
"//config",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"@crate_index//:env_logger",
"@crate_index//:pretty_assertions",
7 changes: 6 additions & 1 deletion cas/worker/local_worker.rs
@@ -29,6 +29,7 @@ use tonic::{transport::Channel as TonicChannel, Streaming};
use action_messages::{ActionResult, ActionStage};
use common::{fs, log};
use config::cas_server::LocalWorkerConfig;
use digest_hasher::DigestHasherFunc;
use error::{make_err, make_input_err, Code, Error, ResultExt};
use fast_slow_store::FastSlowStore;
use metrics_utils::{AsyncCounterWrapper, Collector, CollectorState, CounterWithTime, MetricsComponent, Registry};
@@ -207,6 +208,10 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a,
let salt = start_execute.salt;
let worker_id = self.worker_id.clone();
let action_digest = start_execute.execute_request.as_ref().and_then(|v| v.action_digest.clone());
let try_hasher = start_execute.execute_request.as_ref()
.ok_or(make_input_err!("Expected execute_request to be set"))
.and_then(|v| DigestHasherFunc::try_from(v.digest_function))
.err_tip(|| "In LocalWorkerImpl::new()");
let running_actions_manager = self.running_actions_manager.clone();
let worker_id_clone = worker_id.clone();
let precondition_script_cfg = self.config.precondition_script.clone();
@@ -245,7 +250,7 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a,
Ok(mut action_result) => {
// Save in the action cache before notifying the scheduler that we've completed.
if let Some(digest_info) = action_digest.clone().and_then(|action_digest| action_digest.try_into().ok()) {
if let Err(err) = running_actions_manager.cache_action_result(digest_info, &mut action_result).await {
if let Err(err) = running_actions_manager.cache_action_result(digest_info, &mut action_result, try_hasher?).await {
log::error!("\x1b[0;31mError saving action in store\x1b[0m: {} - {:?}", err, action_digest);
}
}
