Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Blake3 digest support #403

Merged
merged 1 commit into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cas/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ rust_binary(
"//cas/worker:local_worker",
"//config",
"//util:common",
"//util:digest_hasher",
"//util:error",
"//util:metrics_utils",
"@crate_index//:async-lock",
Expand Down
11 changes: 10 additions & 1 deletion cas/cas_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ use capabilities_server::CapabilitiesServer;
use cas_server::CasServer;
use common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit};
use common::log;
use config::cas_server::{CasConfig, CompressionAlgorithm, GlobalConfig, ServerConfig, WorkerConfig};
use config::cas_server::{
CasConfig, CompressionAlgorithm, ConfigDigestHashFunction, GlobalConfig, ServerConfig, WorkerConfig,
};
use default_scheduler_factory::scheduler_factory;
use default_store_factory::store_factory;
use digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc};
use error::{make_err, Code, Error, ResultExt};
use execution_server::ExecutionServer;
use local_worker::new_local_worker;
Expand Down Expand Up @@ -632,10 +635,16 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
};
service.prometheus.is_none()
}),
default_digest_hash_function: None,
}
};
set_open_file_limit(global_cfg.max_open_files);
set_idle_file_descriptor_timeout(Duration::from_millis(global_cfg.idle_file_descriptor_timeout_millis))?;
set_default_digest_hasher_func(DigestHasherFunc::from(
global_cfg
.default_digest_hash_function
.unwrap_or(ConfigDigestHashFunction::sha256),
))?;
!global_cfg.disable_metrics
};
// Override metrics enabled if the environment variable is set.
Expand Down
4 changes: 4 additions & 0 deletions cas/grpc_service/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ rust_library(
"//cas/store",
"//config",
"//proto",
"//util:digest_hasher",
"//util:error",
"@crate_index//:tonic",
],
Expand Down Expand Up @@ -85,6 +86,7 @@ rust_library(
"//config",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"@crate_index//:futures",
"@crate_index//:rand",
Expand All @@ -107,6 +109,7 @@ rust_library(
"//config",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"@crate_index//:futures",
"@crate_index//:tokio",
Expand All @@ -128,6 +131,7 @@ rust_test(
"//config",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"@crate_index//:pretty_assertions",
"@crate_index//:prost-types",
Expand Down
7 changes: 4 additions & 3 deletions cas/grpc_service/capabilities_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use tonic::{Request, Response, Status};

use config::cas_server::{CapabilitiesConfig, InstanceName};
use digest_hasher::default_digest_hasher_func;
use error::{Error, ResultExt};
use proto::build::bazel::remote::execution::v2::{
capabilities_server::Capabilities, capabilities_server::CapabilitiesServer as Server,
Expand Down Expand Up @@ -85,7 +86,7 @@ impl Capabilities for CapabilitiesServer {
let instance_name = request.into_inner().instance_name;
let maybe_supported_node_properties = self.supported_node_properties_for_instance.get(&instance_name);
let execution_capabilities = maybe_supported_node_properties.map(|props_for_instance| ExecutionCapabilities {
digest_function: DigestFunction::Sha256.into(),
digest_function: default_digest_hasher_func().proto_digest_func().into(),
exec_enabled: true, // TODO(blaise.bruer) Make this configurable.
execution_priority_capabilities: Some(PriorityCapabilities {
priorities: vec![PriorityRange {
Expand All @@ -94,12 +95,12 @@ impl Capabilities for CapabilitiesServer {
}],
}),
supported_node_properties: props_for_instance.clone(),
digest_functions: vec![DigestFunction::Sha256.into()],
digest_functions: vec![DigestFunction::Sha256.into(), DigestFunction::Blake3.into()],
});

let resp = ServerCapabilities {
cache_capabilities: Some(CacheCapabilities {
digest_functions: vec![DigestFunction::Sha256.into()],
digest_functions: vec![DigestFunction::Sha256.into(), DigestFunction::Blake3.into()],
action_cache_update_capabilities: Some(ActionCacheUpdateCapabilities { update_enabled: true }),
cache_priority_capabilities: None,
max_batch_total_size_bytes: MAX_BATCH_TOTAL_SIZE,
Expand Down
15 changes: 14 additions & 1 deletion cas/grpc_service/execution_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use ac_utils::get_and_decode_digest;
use action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
use common::{log, DigestInfo};
use config::cas_server::{ExecutionConfig, InstanceName};
use digest_hasher::DigestHasherFunc;
use error::{make_input_err, Error, ResultExt};
use platform_property_manager::PlatformProperties;
use proto::build::bazel::remote::execution::v2::{
Expand Down Expand Up @@ -54,6 +55,7 @@ impl InstanceInfo {
action: &Action,
priority: i32,
skip_cache_lookup: bool,
digest_function: DigestHasherFunc,
) -> Result<ActionInfo, Error> {
let command_digest = DigestInfo::try_from(
action
Expand Down Expand Up @@ -124,6 +126,7 @@ impl InstanceInfo {
},
},
skip_cache_lookup,
digest_function,
})
}
}
Expand Down Expand Up @@ -194,7 +197,17 @@ impl ExecutionServer {

let action = get_and_decode_digest::<Action>(instance_info.cas_pin(), &digest).await?;
let action_info = instance_info
.build_action_info(instance_name, digest, &action, priority, execute_req.skip_cache_lookup)
.build_action_info(
instance_name,
digest,
&action,
priority,
execute_req.skip_cache_lookup,
execute_req
.digest_function
.try_into()
.err_tip(|| "Could not convert digest function in inner_execute()")?,
)
.await?;

let rx = instance_info
Expand Down
2 changes: 2 additions & 0 deletions cas/grpc_service/tests/worker_api_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tonic::Request;
use action_messages::{ActionInfo, ActionInfoHashKey, ActionStage};
use common::DigestInfo;
use config::cas_server::WorkerApiConfig;
use digest_hasher::DigestHasherFunc;
use error::{Error, ResultExt};
use platform_property_manager::PlatformProperties;
use proto::build::bazel::remote::execution::v2::{
Expand Down Expand Up @@ -269,6 +270,7 @@ pub mod execution_response_tests {
salt: SALT,
},
skip_cache_lookup: true,
digest_function: DigestHasherFunc::Sha256,
};
let mut client_action_state_receiver = test_context.scheduler.add_action(action_info).await?;

Expand Down
3 changes: 3 additions & 0 deletions cas/scheduler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ rust_library(
":action_messages",
":platform_property_manager",
"//util:common",
"//util:digest_hasher",
],
)

Expand Down Expand Up @@ -256,6 +257,7 @@ rust_library(
":platform_property_manager",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"//util:metrics_utils",
"@crate_index//:blake3",
Expand All @@ -273,6 +275,7 @@ rust_test(
":platform_property_manager",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"@crate_index//:pretty_assertions",
"@crate_index//:tokio",
Expand Down
18 changes: 12 additions & 6 deletions cas/scheduler/action_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ use prost::Message;
use prost_types::Any;

use common::{DigestInfo, HashMapExt, VecExt};
use digest_hasher::DigestHasherFunc;
use error::{error_if, make_input_err, Error, ResultExt};
use metrics_utils::{CollectorState, MetricsComponent};
use platform_property_manager::PlatformProperties;
use prost::bytes::Bytes;
use proto::build::bazel::remote::execution::v2::{
digest_function, execution_stage, Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata,
ExecuteRequest, ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, OutputDirectory, OutputFile,
OutputSymlink, SymlinkNode,
execution_stage, Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata, ExecuteRequest,
ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, OutputDirectory, OutputFile, OutputSymlink,
SymlinkNode,
};
use proto::google::longrunning::{operation::Result as LongRunningResult, Operation};
use proto::google::rpc::Status;
Expand Down Expand Up @@ -145,6 +146,9 @@ pub struct ActionInfo {

/// Whether to try looking up this action in the cache.
pub skip_cache_lookup: bool,

/// The digest function this action expects.
pub digest_function: DigestHasherFunc,
}

impl ActionInfo {
Expand Down Expand Up @@ -199,6 +203,8 @@ impl ActionInfo {
salt,
},
skip_cache_lookup: execute_request.skip_cache_lookup,
digest_function: DigestHasherFunc::try_from(execute_request.digest_function)
.err_tip(|| "Could not find digest_function in try_from_action_and_execute_request_with_salt")?,
})
}
}
Expand All @@ -212,7 +218,7 @@ impl From<ActionInfo> for ExecuteRequest {
skip_cache_lookup: true, // The worker should never cache lookup.
execution_policy: None, // Not used in the worker.
results_cache_policy: None, // Not used in the worker.
digest_function: digest_function::Value::Sha256.into(),
digest_function: val.digest_function.proto_digest_func().into(),
}
}
}
Expand Down Expand Up @@ -587,8 +593,8 @@ impl Default for ActionResult {
output_directory_symlinks: Default::default(),
output_file_symlinks: Default::default(),
exit_code: INTERNAL_ERROR_EXIT_CODE,
stdout_digest: DigestInfo::empty_digest(),
stderr_digest: DigestInfo::empty_digest(),
stdout_digest: DigestInfo::new([0u8; 32], 0),
stderr_digest: DigestInfo::new([0u8; 32], 0),
execution_metadata: ExecutionMetadata {
worker: "".to_string(),
queued_timestamp: SystemTime::UNIX_EPOCH,
Expand Down
5 changes: 5 additions & 0 deletions cas/scheduler/tests/action_messages_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::{Duration, SystemTime};

use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata};
use common::DigestInfo;
use digest_hasher::DigestHasherFunc;
use error::Error;
use platform_property_manager::PlatformProperties;
use proto::build::bazel::remote::execution::v2::ExecuteResponse;
Expand Down Expand Up @@ -118,6 +119,7 @@ mod action_messages_tests {
salt: 0,
},
skip_cache_lookup: true,
digest_function: DigestHasherFunc::Sha256,
});
let lowest_priority_action = Arc::new(ActionInfo {
command_digest: DigestInfo::new([0u8; 32], 0),
Expand All @@ -135,6 +137,7 @@ mod action_messages_tests {
salt: 0,
},
skip_cache_lookup: true,
digest_function: DigestHasherFunc::Sha256,
});
let mut action_set = BTreeSet::<Arc<ActionInfo>>::new();
action_set.insert(lowest_priority_action.clone());
Expand Down Expand Up @@ -168,6 +171,7 @@ mod action_messages_tests {
salt: 0,
},
skip_cache_lookup: true,
digest_function: DigestHasherFunc::Sha256,
});
let current_action = Arc::new(ActionInfo {
command_digest: DigestInfo::new([0u8; 32], 0),
Expand All @@ -185,6 +189,7 @@ mod action_messages_tests {
salt: 0,
},
skip_cache_lookup: true,
digest_function: DigestHasherFunc::Sha256,
});
let mut action_set = BTreeSet::<Arc<ActionInfo>>::new();
action_set.insert(current_action.clone());
Expand Down
12 changes: 6 additions & 6 deletions cas/scheduler/tests/simple_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ mod scheduler_tests {

let unique_qualifier = ActionInfoHashKey {
instance_name: "".to_string(),
digest: DigestInfo::empty_digest(),
digest: DigestInfo::zero_digest(),
salt: 0,
};

Expand Down Expand Up @@ -421,7 +421,7 @@ mod scheduler_tests {
let mut expected_action_state = ActionState {
unique_qualifier: ActionInfoHashKey {
instance_name: "".to_string(),
digest: DigestInfo::empty_digest(),
digest: DigestInfo::zero_digest(),
salt: 0,
}, // Will be filled later.
stage: ActionStage::Queued,
Expand Down Expand Up @@ -565,7 +565,7 @@ mod scheduler_tests {
// Name is a random string, so we ignore it and just make it the same.
unique_qualifier: ActionInfoHashKey {
instance_name: "".to_string(),
digest: DigestInfo::empty_digest(),
digest: DigestInfo::zero_digest(),
salt: 0,
},
stage: ActionStage::Executing,
Expand Down Expand Up @@ -931,7 +931,7 @@ mod scheduler_tests {
let mut expected_action_state = ActionState {
unique_qualifier: ActionInfoHashKey {
instance_name: "".to_string(),
digest: DigestInfo::empty_digest(),
digest: DigestInfo::zero_digest(),
salt: 0,
}, // Will be filled later.
stage: ActionStage::Executing,
Expand Down Expand Up @@ -1313,8 +1313,8 @@ mod scheduler_tests {
output_file_symlinks: Vec::default(),
output_directory_symlinks: Vec::default(),
exit_code: INTERNAL_ERROR_EXIT_CODE,
stdout_digest: DigestInfo::empty_digest(),
stderr_digest: DigestInfo::empty_digest(),
stdout_digest: DigestInfo::zero_digest(),
stderr_digest: DigestInfo::zero_digest(),
execution_metadata: ExecutionMetadata {
worker: WORKER_ID.to_string(),
queued_timestamp: SystemTime::UNIX_EPOCH,
Expand Down
2 changes: 2 additions & 0 deletions cas/scheduler/tests/utils/scheduler_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};

use action_messages::{ActionInfo, ActionInfoHashKey};
use common::DigestInfo;
use digest_hasher::DigestHasherFunc;
use platform_property_manager::PlatformProperties;

pub const INSTANCE_NAME: &str = "foobar_instance_name";
Expand All @@ -38,5 +39,6 @@ pub fn make_base_action_info(insert_timestamp: SystemTime) -> ActionInfo {
salt: 0,
},
skip_cache_lookup: false,
digest_function: DigestHasherFunc::Sha256,
}
}
3 changes: 3 additions & 0 deletions cas/worker/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ rust_library(
"//config",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"//util:metrics_utils",
"@crate_index//:async-lock",
Expand Down Expand Up @@ -126,6 +127,7 @@ rust_library(
"//cas/scheduler:action_messages",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"@crate_index//:async-lock",
"@crate_index//:tokio",
Expand Down Expand Up @@ -174,6 +176,7 @@ rust_test(
"//config",
"//proto",
"//util:common",
"//util:digest_hasher",
"//util:error",
"@crate_index//:env_logger",
"@crate_index//:pretty_assertions",
Expand Down
7 changes: 6 additions & 1 deletion cas/worker/local_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tonic::{transport::Channel as TonicChannel, Streaming};
use action_messages::{ActionResult, ActionStage};
use common::{fs, log};
use config::cas_server::LocalWorkerConfig;
use digest_hasher::DigestHasherFunc;
use error::{make_err, make_input_err, Code, Error, ResultExt};
use fast_slow_store::FastSlowStore;
use metrics_utils::{AsyncCounterWrapper, Collector, CollectorState, CounterWithTime, MetricsComponent, Registry};
Expand Down Expand Up @@ -207,6 +208,10 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a,
let salt = start_execute.salt;
let worker_id = self.worker_id.clone();
let action_digest = start_execute.execute_request.as_ref().and_then(|v| v.action_digest.clone());
let try_hasher = start_execute.execute_request.as_ref()
.ok_or(make_input_err!("Expected execute_request to be set"))
.and_then(|v| DigestHasherFunc::try_from(v.digest_function))
.err_tip(|| "In LocalWorkerImpl::new()");
let running_actions_manager = self.running_actions_manager.clone();
let worker_id_clone = worker_id.clone();
let precondition_script_cfg = self.config.precondition_script.clone();
Expand Down Expand Up @@ -245,7 +250,7 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a,
Ok(mut action_result) => {
// Save in the action cache before notifying the scheduler that we've completed.
if let Some(digest_info) = action_digest.clone().and_then(|action_digest| action_digest.try_into().ok()) {
if let Err(err) = running_actions_manager.cache_action_result(digest_info, &mut action_result).await {
if let Err(err) = running_actions_manager.cache_action_result(digest_info, &mut action_result, try_hasher?).await {
log::error!("\x1b[0;31mError saving action in store\x1b[0m: {} - {:?}", err, action_digest);
}
}
Expand Down
Loading