Skip to content

Commit

Permalink
Adds ability in proto for worker to return internal errors
Browse files Browse the repository at this point in the history
Updates the worker_api.proto so it can return error codes for
internal errors in the event that an internal error occurred.
  • Loading branch information
allada committed Apr 18, 2022
1 parent d935d7f commit 576aff4
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 73 deletions.
127 changes: 67 additions & 60 deletions cas/grpc_service/tests/worker_api_server_test.rs
Expand Up @@ -17,7 +17,8 @@ use proto::build::bazel::remote::execution::v2::{
OutputDirectory, OutputFile, OutputSymlink,
};
use proto::com::github::allada::turbo_cache::remote_execution::{
update_for_worker, worker_api_server::WorkerApi, ExecuteResult, KeepAliveRequest, SupportedProperties,
execute_result, update_for_worker, worker_api_server::WorkerApi, ExecuteFinishedResult, ExecuteResult,
KeepAliveRequest, SupportedProperties,
};
use proto::google::rpc::Status as ProtoStatus;
use scheduler::Scheduler;
Expand Down Expand Up @@ -285,66 +286,68 @@ pub mod execution_response_tests {
human_readable: false, // We only support non-human readable.
},
);
let execute_result = ExecuteResult {
worker_id: test_context.worker_id.to_string(),
action_digest: Some(action_digest.clone().into()),
salt: SALT,
execute_response: Some(ExecuteResponse {
result: Some(ProtoActionResult {
output_files: vec![OutputFile {
path: "some path1".to_string(),
digest: Some(DigestInfo::new([8u8; 32], 124).into()),
is_executable: true,
contents: Default::default(), // We don't implement this.
node_properties: Some(NodeProperties {
properties: Default::default(), // We don't implement this.
mtime: Some(make_system_time(99).into()),
unix_mode: Some(12),
}),
}],
output_file_symlinks: Default::default(), // Bazel deprecated this.
output_symlinks: vec![OutputSymlink {
path: "some path3".to_string(),
target: "some target3".to_string(),
node_properties: Some(NodeProperties {
properties: Default::default(), // We don't implement this.
mtime: Some(make_system_time(97).into()),
unix_mode: Some(10),
let result = ExecuteResult {
response: Some(execute_result::Response::Result(ExecuteFinishedResult {
worker_id: test_context.worker_id.to_string(),
action_digest: Some(action_digest.clone().into()),
salt: SALT,
execute_response: Some(ExecuteResponse {
result: Some(ProtoActionResult {
output_files: vec![OutputFile {
path: "some path1".to_string(),
digest: Some(DigestInfo::new([8u8; 32], 124).into()),
is_executable: true,
contents: Default::default(), // We don't implement this.
node_properties: Some(NodeProperties {
properties: Default::default(), // We don't implement this.
mtime: Some(make_system_time(99).into()),
unix_mode: Some(12),
}),
}],
output_file_symlinks: Default::default(), // Bazel deprecated this.
output_symlinks: vec![OutputSymlink {
path: "some path3".to_string(),
target: "some target3".to_string(),
node_properties: Some(NodeProperties {
properties: Default::default(), // We don't implement this.
mtime: Some(make_system_time(97).into()),
unix_mode: Some(10),
}),
}],
output_directories: vec![OutputDirectory {
path: "some path4".to_string(),
tree_digest: Some(DigestInfo::new([12u8; 32], 124).into()),
}],
output_directory_symlinks: Default::default(), // Bazel deprecated this.
exit_code: 5,
stdout_raw: Default::default(), // We don't implement this.
stdout_digest: Some(DigestInfo::new([10u8; 32], 124).into()),
stderr_raw: Default::default(), // We don't implement this.
stderr_digest: Some(DigestInfo::new([11u8; 32], 124).into()),
execution_metadata: Some(ExecutedActionMetadata {
worker: test_context.worker_id.to_string(),
queued_timestamp: Some(make_system_time(1).into()),
worker_start_timestamp: Some(make_system_time(2).into()),
worker_completed_timestamp: Some(make_system_time(3).into()),
input_fetch_start_timestamp: Some(make_system_time(4).into()),
input_fetch_completed_timestamp: Some(make_system_time(5).into()),
execution_start_timestamp: Some(make_system_time(6).into()),
execution_completed_timestamp: Some(make_system_time(7).into()),
output_upload_start_timestamp: Some(make_system_time(8).into()),
output_upload_completed_timestamp: Some(make_system_time(9).into()),
auxiliary_metadata: vec![],
}),
}],
output_directories: vec![OutputDirectory {
path: "some path4".to_string(),
tree_digest: Some(DigestInfo::new([12u8; 32], 124).into()),
}],
output_directory_symlinks: Default::default(), // Bazel deprecated this.
exit_code: 5,
stdout_raw: Default::default(), // We don't implement this.
stdout_digest: Some(DigestInfo::new([10u8; 32], 124).into()),
stderr_raw: Default::default(), // We don't implement this.
stderr_digest: Some(DigestInfo::new([11u8; 32], 124).into()),
execution_metadata: Some(ExecutedActionMetadata {
worker: test_context.worker_id.to_string(),
queued_timestamp: Some(make_system_time(1).into()),
worker_start_timestamp: Some(make_system_time(2).into()),
worker_completed_timestamp: Some(make_system_time(3).into()),
input_fetch_start_timestamp: Some(make_system_time(4).into()),
input_fetch_completed_timestamp: Some(make_system_time(5).into()),
execution_start_timestamp: Some(make_system_time(6).into()),
execution_completed_timestamp: Some(make_system_time(7).into()),
output_upload_start_timestamp: Some(make_system_time(8).into()),
output_upload_completed_timestamp: Some(make_system_time(9).into()),
auxiliary_metadata: vec![],
}),
cached_result: false,
status: Some(ProtoStatus {
code: 9,
message: "foo".to_string(),
details: Default::default(),
}),
server_logs: server_logs,
message: "TODO(blaise.bruer) We should put a reference something like bb_browser".to_string(),
}),
cached_result: false,
status: Some(ProtoStatus {
code: 9,
message: "foo".to_string(),
details: Default::default(),
}),
server_logs: server_logs,
message: "TODO(blaise.bruer) We should put a reference something like bb_browser".to_string(),
}),
})),
};
{
// Ensure our client thinks we are executing.
Expand All @@ -356,14 +359,18 @@ pub mod execution_response_tests {
// Now send the result of our execution to the scheduler.
test_context
.worker_api_server
.execution_response(Request::new(execute_result.clone()))
.execution_response(Request::new(result.clone()))
.await?;

{
// Check the result that the client would have received.
client_action_state_receiver.changed().await?;
let client_given_state = client_action_state_receiver.borrow();
let execute_response = execute_result.execute_response.unwrap();
let execute_response = if let execute_result::Response::Result(v) = result.response.unwrap() {
v.execute_response.unwrap()
} else {
panic!("Expected type to be Result");
};

assert_eq!(client_given_state.stage, execute_response.clone().try_into()?);

Expand Down
19 changes: 13 additions & 6 deletions cas/grpc_service/worker_api_server.rs
Expand Up @@ -17,8 +17,8 @@ use config::cas_server::WorkerApiConfig;
use error::{make_err, Code, Error, ResultExt};
use platform_property_manager::PlatformProperties;
use proto::com::github::allada::turbo_cache::remote_execution::{
worker_api_server::WorkerApi, worker_api_server::WorkerApiServer as Server, ExecuteResult, GoingAwayRequest,
KeepAliveRequest, SupportedProperties, UpdateForWorker,
execute_result, worker_api_server::WorkerApi, worker_api_server::WorkerApiServer as Server, ExecuteResult,
GoingAwayRequest, KeepAliveRequest, SupportedProperties, UpdateForWorker,
};
use scheduler::Scheduler;
use worker::{Worker, WorkerId};
Expand Down Expand Up @@ -134,16 +134,23 @@ impl WorkerApiServer {
}

async fn inner_execution_response(&self, execute_result: ExecuteResult) -> Result<Response<()>, Error> {
let worker_id: WorkerId = execute_result.worker_id.try_into()?;
let action_digest: DigestInfo = execute_result
let finished_result = match execute_result
.response
.err_tip(|| "Expected result to exist in ExecuteResult")?
{
execute_result::Response::Result(finished_result) => finished_result,
execute_result::Response::InternalError(e) => return Err(e.into()),
};
let worker_id: WorkerId = finished_result.worker_id.try_into()?;
let action_digest: DigestInfo = finished_result
.action_digest
.err_tip(|| "Expected action_digest to exist")?
.try_into()?;
let action_info_hash_key = ActionInfoHashKey {
digest: action_digest.clone(),
salt: execute_result.salt,
salt: finished_result.salt,
};
let action_stage = execute_result
let action_stage = finished_result
.execute_response
.err_tip(|| "Expected execute_response to exist in ExecuteResult")?
.try_into()
Expand Down
Expand Up @@ -6,6 +6,7 @@ package com.github.allada.turbo_cache.remote_execution;

import "build/bazel/remote/execution/v2/remote_execution.proto";
import "google/protobuf/empty.proto";
import "google/rpc/status.proto";

/// This API describes how schedulers communicate with Worker nodes.
///
Expand All @@ -17,7 +18,8 @@ import "google/protobuf/empty.proto";
service WorkerApi {
/// Registers this worker and informs the scheduler what properties
/// this worker supports. The response must be listened on the client
/// side for updates from the server.
/// side for updates from the server. The first item sent will always be
/// a ConnectionResult, after that it is undefined.
rpc ConnectWorker(SupportedProperties) returns (stream UpdateForWorker);

/// Message used to let the scheduler know that it is still alive as
Expand Down Expand Up @@ -75,8 +77,23 @@ message SupportedProperties {
reserved 2; // NextId.
}

/// Represents the result of an execution.

/// The result of an ExecutionRequest.
message ExecuteResult {
oneof response {
/// Result of an execution request if there were not detectable internal errors.
ExecuteFinishedResult result = 1;

/// An internal error. This is only present when an internal error happened that
/// was not recoverable. If the execution job failed but at no fault of the worker
/// it should not use this field and should send the error via ExecuteFinishedResult.
google.rpc.Status internal_error = 2;
}
reserved 3; // NextId.
}

/// Represents the result of an execution.
message ExecuteFinishedResult {
/// ID of the worker making the request.
string worker_id = 1;

Expand Down Expand Up @@ -132,7 +149,7 @@ message StartExecute {
/// The action information used to execute job.
build.bazel.remote.execution.v2.ExecuteRequest execute_request = 1;

/// See documentation in ExecuteResult::salt.
/// See documentation in ExecuteFinishedResult::salt.
uint64 salt = 2;
reserved 3; // NextId.
}
Expand Up @@ -32,9 +32,29 @@ pub struct SupportedProperties {
super::super::super::super::super::build::bazel::remote::execution::v2::platform::Property,
>,
}
//// Represents the result of an execution.
//// The result of an ExecutionRequest.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecuteResult {
#[prost(oneof = "execute_result::Response", tags = "1, 2")]
pub response: ::core::option::Option<execute_result::Response>,
}
/// Nested message and enum types in `ExecuteResult`.
pub mod execute_result {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Response {
//// Result of an execution request if there were not detectable internal errors.
#[prost(message, tag = "1")]
Result(super::ExecuteFinishedResult),
//// An internal error. This is only present when an internal error happened that
//// was not recoverable. If the execution job failed but at no fault of the worker
//// it should not use this field and should send the error via ExecuteFinishedResult.
#[prost(message, tag = "2")]
InternalError(super::super::super::super::super::super::google::rpc::Status),
}
}
//// Represents the result of an execution.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecuteFinishedResult {
//// ID of the worker making the request.
#[prost(string, tag = "1")]
pub worker_id: ::prost::alloc::string::String,
Expand Down Expand Up @@ -102,7 +122,7 @@ pub struct StartExecute {
pub execute_request: ::core::option::Option<
super::super::super::super::super::build::bazel::remote::execution::v2::ExecuteRequest,
>,
//// See documentation in ExecuteResult::salt.
//// See documentation in ExecuteFinishedResult::salt.
#[prost(uint64, tag = "2")]
pub salt: u64,
}
Expand Down Expand Up @@ -175,7 +195,8 @@ pub mod worker_api_client {
}
#[doc = "/ Registers this worker and informs the scheduler what properties"]
#[doc = "/ this worker supports. The response must be listened on the client"]
#[doc = "/ side for updates from the server."]
#[doc = "/ side for updates from the server. The first item sent will always be"]
#[doc = "/ a ConnectionResult, after that it is undefined."]
pub async fn connect_worker(
&mut self,
request: impl tonic::IntoRequest<super::SupportedProperties>,
Expand Down Expand Up @@ -275,7 +296,8 @@ pub mod worker_api_server {
+ 'static;
#[doc = "/ Registers this worker and informs the scheduler what properties"]
#[doc = "/ this worker supports. The response must be listened on the client"]
#[doc = "/ side for updates from the server."]
#[doc = "/ side for updates from the server. The first item sent will always be"]
#[doc = "/ a ConnectionResult, after that it is undefined."]
async fn connect_worker(
&self,
request: tonic::Request<super::SupportedProperties>,
Expand Down

0 comments on commit 576aff4

Please sign in to comment.