Implement execution_response() in WorkerApiServer
Implements the execution_response() function. This allows
workers to notify the scheduler, and thus the client, that a job's
state has changed (usually to completed or errored).

This required a few API changes, but nothing dramatic.
allada committed Apr 15, 2022
1 parent 0b68053 commit e26549a
Showing 15 changed files with 937 additions and 110 deletions.
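
For context, here is a minimal sketch of how a worker process might drive the new RPC from its side. It is an illustration only: the `WorkerApiClient` stub is assumed to be generated by tonic from the same proto that produces the `worker_api_server::WorkerApi` trait changed below (only the server side appears in this commit), and the `report_completion` helper is hypothetical. The `ExecuteResult` fields mirror the ones exercised in the new test.

```rust
// Hypothetical worker-side helper; WorkerApiClient is assumed to be the
// tonic-generated client for the WorkerApi service (not part of this commit).
use tonic::Request;

use proto::build::bazel::remote::execution::v2::{Digest, ExecuteResponse};
use proto::com::github::allada::turbo_cache::remote_execution::{
    worker_api_client::WorkerApiClient, ExecuteResult,
};

async fn report_completion(
    client: &mut WorkerApiClient<tonic::transport::Channel>,
    worker_id: String,
    action_digest: Digest,
    salt: u64,
    execute_response: ExecuteResponse,
) -> Result<(), tonic::Status> {
    // The scheduler matches (action_digest, salt) against the ActionInfoHashKey it
    // stored when the action was queued, then forwards the resulting ActionStage to
    // any client watching the action.
    let execute_result = ExecuteResult {
        worker_id,
        action_digest: Some(action_digest),
        salt,
        execute_response: Some(execute_response),
    };
    client
        .execution_response(Request::new(execute_result))
        .await?;
    Ok(())
}
```

The server-side counterpart, `inner_execution_response()` in worker_api_server.rs below, unpacks the same identifiers, converts the `ExecuteResponse` into an `ActionStage`, and hands it to `Scheduler::update_action()`.
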
4 changes: 4 additions & 0 deletions cas/grpc_service/BUILD
@@ -97,6 +97,7 @@ rust_library(
srcs = ["worker_api_server.rs"],
deps = [
"//cas/scheduler",
"//cas/scheduler:action_messages",
"//cas/scheduler:platform_property_manager",
"//cas/scheduler:worker",
"//config",
@@ -128,13 +129,16 @@ rust_test(
srcs = ["tests/worker_api_server_test.rs"],
deps = [
"//cas/scheduler",
"//cas/scheduler:action_messages",
"//cas/scheduler:platform_property_manager",
"//cas/scheduler:worker",
"//config",
"//proto",
"//third_party:pretty_assertions",
"//third_party:tokio",
"//third_party:tokio_stream",
"//third_party:tonic",
"//util:common",
"//util:error",
":worker_api_server",
],
14 changes: 8 additions & 6 deletions cas/grpc_service/execution_server.rs
@@ -12,7 +12,7 @@ use tokio_stream::wrappers::WatchStream;
use tonic::{Request, Response, Status};

use ac_utils::get_and_decode_digest;
use action_messages::ActionInfo;
use action_messages::{ActionInfo, ActionInfoHashKey};
use common::{log, DigestInfo};
use config::cas_server::{ExecutionConfig, InstanceName};
use error::{make_input_err, Error, ResultExt};
@@ -79,17 +79,19 @@ impl InstanceInfo {

Ok(ActionInfo {
instance_name,
digest: action_digest,
command_digest,
input_root_digest,
timeout,
platform_properties: PlatformProperties::new(platform_properties),
priority,
insert_timestamp: SystemTime::now(),
salt: if action.do_not_cache {
thread_rng().gen::<u64>()
} else {
0
unique_qualifier: ActionInfoHashKey {
digest: action_digest,
salt: if action.do_not_cache {
thread_rng().gen::<u64>()
} else {
0
},
},
})
}
5 changes: 1 addition & 4 deletions cas/grpc_service/tests/cas_server_test.rs
@@ -287,10 +287,7 @@ mod batch_read_blobs {
data: vec![].into(),
status: Some(GrpcStatus {
code: Code::NotFound as i32,
message: format!(
"Error: Error {{ code: NotFound, messages: [\"Hash {} not found\"] }}",
digest3.hash
),
message: format!("Hash {} not found", digest3.hash),
details: vec![],
}),
}
145 changes: 142 additions & 3 deletions cas/grpc_service/tests/worker_api_server_test.rs
@@ -2,16 +2,24 @@

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use tokio_stream::StreamExt;
use tonic::Request;

use action_messages::{ActionInfo, ActionInfoHashKey, ActionStage};
use common::DigestInfo;
use config::cas_server::{SchedulerConfig, WorkerApiConfig};
use error::{Error, ResultExt};
use platform_property_manager::PlatformProperties;
use proto::build::bazel::remote::execution::v2::{
ActionResult as ProtoActionResult, ExecuteResponse, ExecutedActionMetadata, LogFile, NodeProperties,
OutputDirectory, OutputFile, OutputSymlink,
};
use proto::com::github::allada::turbo_cache::remote_execution::{
update_for_worker, worker_api_server::WorkerApi, KeepAliveRequest, SupportedProperties,
update_for_worker, worker_api_server::WorkerApi, ExecuteResult, KeepAliveRequest, SupportedProperties,
};
use proto::google::rpc::Status as ProtoStatus;
use scheduler::Scheduler;
use worker::WorkerId;
use worker_api_server::{ConnectWorkerStream, NowFn, WorkerApiServer};
Expand Down Expand Up @@ -215,7 +223,7 @@ pub mod going_away_tests {
use super::*;

#[tokio::test]
pub async fn going_away_removes_worker() -> Result<(), Box<dyn std::error::Error>> {
pub async fn going_away_removes_worker_test() -> Result<(), Box<dyn std::error::Error>> {
let test_context = setup_api_server(BASE_WORKER_TIMEOUT_S, Box::new(static_now_fn)).await?;

let worker_exists = test_context
@@ -235,3 +243,134 @@ pub mod going_away_tests {
Ok(())
}
}

#[cfg(test)]
pub mod execution_response_tests {
use super::*;
use pretty_assertions::assert_eq; // Must be declared in every module.

fn make_system_time(time: u64) -> SystemTime {
UNIX_EPOCH.checked_add(Duration::from_secs(time)).unwrap()
}

#[tokio::test]
pub async fn execution_response_success_test() -> Result<(), Box<dyn std::error::Error>> {
let test_context = setup_api_server(BASE_WORKER_TIMEOUT_S, Box::new(static_now_fn)).await?;

const SALT: u64 = 5;
let action_digest = DigestInfo::new([7u8; 32], 123);

let action_info = ActionInfo {
instance_name: "instance_name".to_string(),
command_digest: DigestInfo::new([0u8; 32], 0),
input_root_digest: DigestInfo::new([0u8; 32], 0),
timeout: Duration::MAX,
platform_properties: PlatformProperties {
properties: HashMap::new(),
},
priority: 0,
insert_timestamp: make_system_time(0),
unique_qualifier: ActionInfoHashKey {
digest: action_digest.clone(),
salt: SALT,
},
};
let mut client_action_state_receiver = test_context.scheduler.add_action(action_info).await?;

let mut server_logs = HashMap::new();
server_logs.insert(
"log_name".to_string(),
LogFile {
digest: Some(DigestInfo::new([9u8; 32], 124).clone().into()),
human_readable: false, // We only support non-human readable.
},
);
let execute_result = ExecuteResult {
worker_id: test_context.worker_id.to_string(),
action_digest: Some(action_digest.clone().into()),
salt: SALT,
execute_response: Some(ExecuteResponse {
result: Some(ProtoActionResult {
output_files: vec![OutputFile {
path: "some path1".to_string(),
digest: Some(DigestInfo::new([8u8; 32], 124).into()),
is_executable: true,
contents: Default::default(), // We don't implement this.
node_properties: Some(NodeProperties {
properties: Default::default(), // We don't implement this.
mtime: Some(make_system_time(99).into()),
unix_mode: Some(12),
}),
}],
output_file_symlinks: Default::default(), // Bazel deprecated this.
output_symlinks: vec![OutputSymlink {
path: "some path3".to_string(),
target: "some target3".to_string(),
node_properties: Some(NodeProperties {
properties: Default::default(), // We don't implement this.
mtime: Some(make_system_time(97).into()),
unix_mode: Some(10),
}),
}],
output_directories: vec![OutputDirectory {
path: "some path4".to_string(),
tree_digest: Some(DigestInfo::new([12u8; 32], 124).into()),
}],
output_directory_symlinks: Default::default(), // Bazel deprecated this.
exit_code: 5,
stdout_raw: Default::default(), // We don't implement this.
stdout_digest: Some(DigestInfo::new([10u8; 32], 124).into()),
stderr_raw: Default::default(), // We don't implement this.
stderr_digest: Some(DigestInfo::new([11u8; 32], 124).into()),
execution_metadata: Some(ExecutedActionMetadata {
worker: test_context.worker_id.to_string(),
queued_timestamp: Some(make_system_time(1).into()),
worker_start_timestamp: Some(make_system_time(2).into()),
worker_completed_timestamp: Some(make_system_time(3).into()),
input_fetch_start_timestamp: Some(make_system_time(4).into()),
input_fetch_completed_timestamp: Some(make_system_time(5).into()),
execution_start_timestamp: Some(make_system_time(6).into()),
execution_completed_timestamp: Some(make_system_time(7).into()),
output_upload_start_timestamp: Some(make_system_time(8).into()),
output_upload_completed_timestamp: Some(make_system_time(9).into()),
auxiliary_metadata: vec![],
}),
}),
cached_result: false,
status: Some(ProtoStatus {
code: 9,
message: "foo".to_string(),
details: Default::default(),
}),
server_logs,
message: "TODO(blaise.bruer) We should put a reference something like bb_browser".to_string(),
}),
};
{
// Ensure our client thinks we are executing.
client_action_state_receiver.changed().await?;
let action_state = client_action_state_receiver.borrow();
assert_eq!(action_state.as_ref().stage, ActionStage::Executing);
}

// Now send the result of our execution to the scheduler.
test_context
.worker_api_server
.execution_response(Request::new(execute_result.clone()))
.await?;

{
// Check the result that the client would have received.
client_action_state_receiver.changed().await?;
let client_given_state = client_action_state_receiver.borrow();
let execute_response = execute_result.execute_response.unwrap();

assert_eq!(client_given_state.stage, execute_response.clone().try_into()?);

// We just checked that the conversion from ExecuteResponse into ActionStage was an exact match.
// Now check that converting the ActionStage back into an ExecuteResponse yields the exact same struct.
assert_eq!(execute_response, (&client_given_state.stage).into());
}
Ok(())
}
}
38 changes: 36 additions & 2 deletions cas/grpc_service/worker_api_server.rs
@@ -10,7 +10,9 @@ use tokio::sync::mpsc;
use tonic::{Request, Response, Status};
use uuid::Uuid;

use action_messages::ActionInfoHashKey;
use common::log;
use common::DigestInfo;
use config::cas_server::WorkerApiConfig;
use error::{make_err, Code, Error, ResultExt};
use platform_property_manager::PlatformProperties;
@@ -130,6 +132,28 @@ impl WorkerApiServer {
self.scheduler.remove_worker(worker_id).await;
Ok(Response::new(()))
}

async fn inner_execution_response(&self, execute_result: ExecuteResult) -> Result<Response<()>, Error> {
let worker_id: WorkerId = execute_result.worker_id.try_into()?;
let action_digest: DigestInfo = execute_result
.action_digest
.err_tip(|| "Expected action_digest to exist")?
.try_into()?;
let action_info_hash_key = ActionInfoHashKey {
digest: action_digest.clone(),
salt: execute_result.salt,
};
let action_stage = execute_result
.execute_response
.err_tip(|| "Expected execute_response to exist in ExecuteResult")?
.try_into()
.err_tip(|| "Failed to convert ExecuteResponse into an ActionStage")?;
self.scheduler
.update_action(&worker_id, &action_info_hash_key, action_stage)
.await
.err_tip(|| format!("Failed to update_action {:?}", action_digest))?;
Ok(Response::new(()))
}
}

#[tonic::async_trait]
@@ -180,7 +204,17 @@ impl WorkerApi for WorkerApiServer {
return resp.map_err(|e| e.into());
}

async fn execution_response(&self, _grpc_request: Request<ExecuteResult>) -> Result<Response<()>, Status> {
unimplemented!();
async fn execution_response(&self, grpc_request: Request<ExecuteResult>) -> Result<Response<()>, Status> {
let now = Instant::now();
log::info!("\x1b[0;31mexecution_response Req\x1b[0m: {:?}", grpc_request.get_ref());
let execute_result = grpc_request.into_inner();
let resp = self.inner_execution_response(execute_result).await;
let d = now.elapsed().as_secs_f32();
if let Err(err) = resp.as_ref() {
log::error!("\x1b[0;31mexecution_response Resp\x1b[0m: {} {:?}", d, err);
} else {
log::info!("\x1b[0;31mexecution_response Resp\x1b[0m: {}", d);
}
return resp.map_err(|e| e.into());
}
}
1 change: 1 addition & 0 deletions cas/scheduler/BUILD
@@ -51,6 +51,7 @@ rust_library(
"//proto",
"//third_party:prost",
"//third_party:prost_types",
"//third_party:tonic",
"//util:common",
"//util:error",
":platform_property_manager",