Implement a GrpcScheduler to forward to an upstream scheduler
This implements an ActionScheduler that forwards requests to an upstream
scheduler over gRPC, allowing scheduler tasks to be proxied from one
instance to another. This is particularly useful when combined with a
CacheLookupScheduler.
chrisstaite-menlo committed Jul 11, 2023
1 parent 2fd1702 commit bf0b933
Showing 14 changed files with 327 additions and 72 deletions.
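
For orientation, here is the shape of the scheduler interface being proxied, reconstructed as a minimal sketch from call sites in the diffs below (`get_platform_property_manager(&instance_name).await` and `scheduler.add_action(action_info).await`). The exact bounds and return types are assumptions, and `PlatformPropertyManager`, `ActionInfo`, `ActionState`, and `Error` are the crate's own types:

```rust
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::watch;

/// Sketch only -- signatures are inferred from this commit's call sites,
/// not copied from the real trait definition.
#[async_trait]
pub trait ActionScheduler: Sync + Send {
    /// Async and keyed by instance name, so a forwarding implementation
    /// can ask its upstream scheduler for the known property set.
    async fn get_platform_property_manager(
        &self,
        instance_name: &str,
    ) -> Result<Arc<PlatformPropertyManager>, Error>;

    /// Queues an action and returns a watch channel of state updates,
    /// which a proxy can feed from an upstream operation stream.
    async fn add_action(
        &self,
        action_info: ActionInfo,
    ) -> Result<watch::Receiver<Arc<ActionState>>, Error>;
}
```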
55 changes: 31 additions & 24 deletions cas/cas_main.rs
@@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use clap::Parser;
use futures::future::{select_all, BoxFuture, TryFutureExt};
use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt};
use json5;
use runfiles::Runfiles;
use tonic::codec::CompressionEncoding;
@@ -262,29 +262,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.err_tip(|| "Could not create ByteStream service")?,
)
.add_optional_service(
services
.capabilities
.map_or(Ok(None), |cfg| {
CapabilitiesServer::new(&cfg, &action_schedulers).and_then(|v| {
let mut service = v.into_service();
let send_algo = &server_cfg.compression.send_compression_algorithm;
if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) {
service = service.send_compressed(encoding);
}
for encoding in server_cfg
.compression
.accepted_compression_algorithms
.iter()
.map(into_encoding)
// Filter None values.
.filter_map(|v| v)
{
service = service.accept_compressed(encoding);
}
Ok(Some(service))
})
})
.err_tip(|| "Could not create Capabilities service")?,
OptionFuture::from(
services
.capabilities
.as_ref()
// Borrow checker fighting here...
.map(|_| CapabilitiesServer::new(&services.capabilities.as_ref().unwrap(), &action_schedulers)),
)
.await
.map_or(Ok::<Option<CapabilitiesServer>, Error>(None), |server| {
Ok(Some(server?))
})
.err_tip(|| "Could not create Capabilities service")?
.and_then(|v| {
let mut service = v.into_service();
let send_algo = &server_cfg.compression.send_compression_algorithm;
if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) {
service = service.send_compressed(encoding);
}
for encoding in server_cfg
.compression
.accepted_compression_algorithms
.iter()
.map(into_encoding)
// Filter None values.
.filter_map(|v| v)
{
service = service.accept_compressed(encoding);
}
Some(service)
}),
)
.add_optional_service(
services
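
The `OptionFuture` used above is what lets the now-async, optional `CapabilitiesServer` construction be awaited in a single expression: it converts an `Option` holding a future into a future yielding an `Option`. A minimal illustration, separate from the diff:

```rust
use futures::future::OptionFuture;

#[tokio::main]
async fn main() {
    // Some(future) polls the inner future and yields Some(output)...
    let present: Option<i32> = OptionFuture::from(Some(async { 5 })).await;
    assert_eq!(present, Some(5));

    // ...while None resolves straight to None, with nothing to poll.
    let absent: Option<i32> = OptionFuture::from(None::<std::future::Ready<i32>>).await;
    assert_eq!(absent, None);
}
```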
9 changes: 7 additions & 2 deletions cas/grpc_service/capabilities_server.rs
@@ -36,7 +36,7 @@ pub struct CapabilitiesServer {
}

impl CapabilitiesServer {
pub fn new(
pub async fn new(
config: &HashMap<InstanceName, CapabilitiesConfig>,
scheduler_map: &HashMap<String, Arc<dyn ActionScheduler>>,
) -> Result<Self, Error> {
@@ -54,7 +54,12 @@ impl CapabilitiesServer {
})?
.clone();

for (platform_key, _) in scheduler.get_platform_property_manager().get_known_properties() {
for (platform_key, _) in scheduler
.get_platform_property_manager(&instance_name)
.await
.err_tip(|| format!("Failed to get platform properties for {}", instance_name))?
.get_known_properties()
{
properties.push(platform_key.clone());
}
}
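
Making `CapabilitiesServer::new` async follows directly from the scheduler change: with a gRPC-backed scheduler, the known platform properties for an instance may live on a remote machine, so they can no longer be read synchronously at construction time.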
17 changes: 10 additions & 7 deletions cas/grpc_service/execution_server.rs
@@ -23,7 +23,7 @@ use tokio_stream::wrappers::WatchStream;
use tonic::{Request, Response, Status};

use ac_utils::get_and_decode_digest;
use action_messages::{ActionInfo, ActionInfoHashKey};
use action_messages::{ActionInfo, ActionInfoHashKey, DEFAULT_EXECUTION_PRIORITY};
use common::{log, DigestInfo};
use config::cas_server::{ExecutionConfig, InstanceName};
use error::{make_input_err, Error, ResultExt};
@@ -36,9 +36,6 @@ use proto::google::longrunning::Operation;
use scheduler::ActionScheduler;
use store::{Store, StoreManager};

/// Default priority remote execution jobs will get when not provided.
const DEFAULT_EXECUTION_PRIORITY: i32 = 0;

struct InstanceInfo {
scheduler: Arc<dyn ActionScheduler>,
cas_store: Arc<dyn Store>,
@@ -55,6 +52,7 @@ impl InstanceInfo {
action_digest: DigestInfo,
action: &Action,
priority: i32,
skip_cache_lookup: bool,
) -> Result<ActionInfo, Error> {
let command_digest = DigestInfo::try_from(
action
@@ -81,7 +79,9 @@ impl InstanceInfo {
for property in &platform.properties {
let platform_property = self
.scheduler
.get_platform_property_manager()
.get_platform_property_manager(&instance_name)
.await
.err_tip(|| "Failed to get platform properties in build_action_info")?
.make_prop_value(&property.name, &property.value)
.err_tip(|| "Failed to convert platform property in build_action_info")?;
platform_properties.insert(property.name.clone(), platform_property);
@@ -95,7 +95,9 @@ impl InstanceInfo {
for property in &platform.properties {
let platform_property = self
.scheduler
.get_platform_property_manager()
.get_platform_property_manager(&instance_name)
.await
.err_tip(|| "Failed to get platform properties in build_action_info")?
.make_prop_value(&property.name, &property.value)
.err_tip(|| "Failed to convert command platform property in build_action_info")?;
platform_properties.insert(property.name.clone(), platform_property);
@@ -120,6 +122,7 @@ impl InstanceInfo {
0
},
},
skip_cache_lookup,
})
}
}
@@ -182,7 +185,7 @@ impl ExecutionServer {

let action = get_and_decode_digest::<Action>(instance_info.cas_pin(), &digest).await?;
let action_info = instance_info
.build_action_info(instance_name, digest, &action, priority)
.build_action_info(instance_name, digest, &action, priority, execute_req.skip_cache_lookup)
.await?;

let rx = instance_info
1 change: 1 addition & 0 deletions cas/grpc_service/tests/worker_api_server_test.rs
@@ -289,6 +289,7 @@ pub mod execution_response_tests {
digest: action_digest.clone(),
salt: SALT,
},
skip_cache_lookup: true,
};
let mut client_action_state_receiver = test_context.scheduler.add_action(action_info).await?;

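
The `skip_cache_lookup` plumbing in these two files carries the flag from the client's `ExecuteRequest` into `ActionInfo`, so a scheduler sitting in front of the action cache (such as the `CacheLookupScheduler` mentioned in the commit message) can tell when the client asked for a fresh execution rather than a cached result.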
23 changes: 23 additions & 0 deletions cas/scheduler/BUILD
@@ -55,6 +55,28 @@ rust_library(
],
)

rust_library(
name = "grpc_scheduler",
srcs = ["grpc_scheduler.rs"],
visibility = [
"//cas:__pkg__",
"//cas:__subpackages__",
],
proc_macro_deps = ["@crate_index//:async-trait"],
deps = [
":action_messages",
":platform_property_manager",
":scheduler",
"//config",
"//proto",
"//util:common",
"//util:error",
"@crate_index//:parking_lot",
"@crate_index//:tokio",
"@crate_index//:tonic",
],
)

rust_library(
name = "default_scheduler_factory",
srcs = ["default_scheduler_factory.rs"],
@@ -63,6 +85,7 @@ rust_library(
"//cas:__subpackages__",
],
deps = [
":grpc_scheduler",
":scheduler",
":simple_scheduler",
"//config",
92 changes: 91 additions & 1 deletion cas/scheduler/action_messages.rs
@@ -24,7 +24,7 @@ use prost_types::Any;
use sha2::{digest::Update as _, Digest as _, Sha256};

use common::{DigestInfo, HashMapExt, VecExt};
use error::{make_input_err, Error, ResultExt};
use error::{error_if, make_input_err, Error, ResultExt};
use platform_property_manager::PlatformProperties;
use prost::bytes::Bytes;
use proto::build::bazel::remote::execution::v2::{
@@ -35,6 +35,9 @@ use proto::build::bazel::remote::execution::v2::{
use proto::google::longrunning::{operation::Result as LongRunningResult, Operation};
use proto::google::rpc::Status;

/// Default priority remote execution jobs will get when not provided.
pub const DEFAULT_EXECUTION_PRIORITY: i32 = 0;

/// This is a utility struct used to make it easier to match `ActionInfos` in a
/// `HashMap` without needing to construct an entire `ActionInfo`.
/// Since the hashing only needs the digest and salt we can just alias them here
@@ -96,6 +99,9 @@ pub struct ActionInfo {
/// return a temporary reference we must have an object tied to ActionInfo's lifetime and
/// return its reference.
pub unique_qualifier: ActionInfoHashKey,

/// Whether to skip looking this action up in the action cache.
pub skip_cache_lookup: bool,
}

impl ActionInfo {
@@ -144,6 +150,7 @@ impl ActionInfo {
.try_into()?,
salt,
},
skip_cache_lookup: execute_request.skip_cache_lookup,
})
}
}
@@ -488,6 +495,9 @@ impl TryFrom<ExecutedActionMetadata> for ExecutionMetadata {
}
}

/// Exit code sent if there is an internal error.
pub const INTERNAL_ERROR_EXIT_CODE: i32 = -178;

/// Represents the results of an execution.
/// This struct must be 100% compatible with `ActionResult` in `remote_execution.proto`.
#[derive(Eq, PartialEq, Debug, Clone)]
@@ -503,6 +513,33 @@ pub struct ActionResult {
pub server_logs: HashMap<String, DigestInfo>,
}

impl Default for ActionResult {
fn default() -> Self {
ActionResult {
output_files: Default::default(),
output_folders: Default::default(),
output_directory_symlinks: Default::default(),
output_file_symlinks: Default::default(),
exit_code: INTERNAL_ERROR_EXIT_CODE,
stdout_digest: DigestInfo::empty_digest(),
stderr_digest: DigestInfo::empty_digest(),
execution_metadata: ExecutionMetadata {
worker: "".to_string(),
queued_timestamp: SystemTime::UNIX_EPOCH,
worker_start_timestamp: SystemTime::UNIX_EPOCH,
worker_completed_timestamp: SystemTime::UNIX_EPOCH,
input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
execution_start_timestamp: SystemTime::UNIX_EPOCH,
execution_completed_timestamp: SystemTime::UNIX_EPOCH,
output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
},
server_logs: Default::default(),
}
}
}
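
Note the deliberate sentinel in this `Default` impl: a defaulted `ActionResult` reports `INTERNAL_ERROR_EXIT_CODE` rather than `0`, so the placeholder result attached to the `ActionStage::Error` arm below can never be mistaken for a successful run.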

/// The execution status/stage. This should match `ExecutionStage::Value` in `remote_execution.proto`.
#[derive(PartialEq, Debug, Clone)]
pub enum ActionStage {
@@ -672,6 +709,59 @@ impl TryFrom<ExecuteResponse> for ActionStage {
}
}

impl TryFrom<Operation> for ActionState {
type Error = Error;

fn try_from(operation: Operation) -> Result<ActionState, Error> {
let metadata_data = operation.metadata.err_tip(|| "No metadata in upstream operation")?;
error_if!(
metadata_data.type_url != "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteOperationMetadata",
"Incorrect metadata structure in upstream operation. {} != type.googleapis.com/build.bazel.remote.execution.v2.ExecuteOperationMetadata",
metadata_data.type_url
);
let metadata = ExecuteOperationMetadata::decode(metadata_data.value.as_slice())
.err_tip(|| "Could not decode metadata in upstream operation")?;

let action_digest = metadata
.action_digest
.err_tip(|| "No action digest in upstream operation metadata")?
.try_into()
.err_tip(|| "Could not convert Digest to DigestInfo")?;

let stage = match execution_stage::Value::from_i32(metadata.stage)
.err_tip(|| format!("Could not convert {} to execution_stage::Value", metadata.stage))?
{
execution_stage::Value::Unknown => ActionStage::Unknown,
execution_stage::Value::CacheCheck => ActionStage::CacheCheck,
execution_stage::Value::Queued => ActionStage::Queued,
execution_stage::Value::Executing => ActionStage::Executing,
execution_stage::Value::Completed => {
let execute_response = operation
.result
.err_tip(|| "No result data for completed upstream action")?;
match execute_response {
LongRunningResult::Error(error) => ActionStage::Error((error.into(), ActionResult::default())),
LongRunningResult::Response(response) => {
// Could be Completed, CompletedFromCache or Error.
error_if!(
response.type_url != "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse",
"Incorrect result structure for completed upstream action. {} != type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse",
response.type_url
);
ExecuteResponse::decode(response.value.as_slice())?.try_into()?
}
}
}
};

Ok(Self {
name: operation.name,
action_digest,
stage,
})
}
}
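
This `TryFrom<Operation>` conversion is presumably the receiving half of the forwarding path: it lets the new `GrpcScheduler` translate `Operation` updates streamed from the upstream scheduler back into the local `ActionState` representation, including unwrapping a completed `ExecuteResponse`.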

/// Current state of the action.
/// This must be 100% compatible with `Operation` in `google/longrunning/operations.proto`.
#[derive(PartialEq, Debug, Clone)]
2 changes: 2 additions & 0 deletions cas/scheduler/default_scheduler_factory.rs
@@ -19,6 +19,7 @@ use futures::Future;

use config::schedulers::SchedulerConfig;
use error::Error;
use grpc_scheduler::GrpcScheduler;
use scheduler::{ActionScheduler, WorkerScheduler};
use simple_scheduler::SimpleScheduler;

@@ -33,6 +34,7 @@ pub fn scheduler_factory<'a>(
let scheduler = Arc::new(SimpleScheduler::new(&config));
(Some(scheduler.clone()), Some(scheduler))
}
SchedulerConfig::grpc(config) => (Some(Arc::new(GrpcScheduler::new(&config).await?)), None),
};
Ok(scheduler)
})
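
Note that the `grpc` arm fills only the `ActionScheduler` half of the returned tuple and leaves the `WorkerScheduler` half as `None`: workers cannot register against a forwarding scheduler, since actual execution is handled by whatever backs the upstream endpoint.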
