diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 930c6adb79a..b0272cc5828 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -34,12 +34,6 @@ use tower_util::MakeService; // CommandRunner. const CACHE_KEY_GEN_VERSION_ENV_VAR_NAME: &str = "PANTS_CACHE_KEY_GEN_VERSION"; -#[derive(Debug)] -enum OperationOrStatus { - Operation(bazel_protos::google::longrunning::Operation), - Status(bazel_protos::google::rpc::Status), -} - type Connection = tower_http::add_origin::AddOrigin< tower_h2::client::Connection, >; @@ -88,7 +82,7 @@ impl CommandRunner { fn oneshot_execute( &self, execute_request: bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest, - ) -> impl Future { + ) -> impl Future { let command_runner = self.clone(); self .clients @@ -115,7 +109,6 @@ impl CommandRunner { std::mem::drop(stream); resp.ok_or_else(|| "Didn't get response from remote process execution".to_owned()) }) - .map(OperationOrStatus::Operation) }) }) } @@ -281,7 +274,6 @@ impl super::CommandRunner for CommandRunner { }) .map_err(towergrpcerror_to_string) }) - .map(OperationOrStatus::Operation) .map(move |operation| { future::Loop::Continue((history, operation, iter_num + 1)) }) @@ -419,108 +411,99 @@ impl CommandRunner { fn extract_execute_response( &self, - operation_or_status: OperationOrStatus, + operation: bazel_protos::google::longrunning::Operation, attempts: &mut ExecutionHistory, ) -> BoxFuture { - trace!("Got operation response: {:?}", operation_or_status); + trace!("Got operation response: {:?}", operation); - let status = match operation_or_status { - OperationOrStatus::Operation(operation) => { - if !operation.done { - return future::err(ExecutionError::NotFinished(operation.name)).to_boxed(); - } - let execute_response = if let Some(result) = operation.result { - match result { - bazel_protos::google::longrunning::operation::Result::Error(ref status) => { - return future::err(ExecutionError::Fatal(format_error(status))).to_boxed(); - } - bazel_protos::google::longrunning::operation::Result::Response(ref any) => try_future!( - bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse::decode( - &any.value - ) - .map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e))) - ), - } - } else { - return future::err(ExecutionError::Fatal( - "Operation finished but no response supplied".to_string(), - )) - .to_boxed(); - }; - - trace!("Got (nested) execute response: {:?}", execute_response); - - if let Some(ref result) = execute_response.result { - if let Some(ref metadata) = result.execution_metadata { - let enqueued = timespec_from(&metadata.queued_timestamp); - let worker_start = timespec_from(&metadata.worker_start_timestamp); - let input_fetch_start = timespec_from(&metadata.input_fetch_start_timestamp); - let input_fetch_completed = timespec_from(&metadata.input_fetch_completed_timestamp); - let execution_start = timespec_from(&metadata.execution_start_timestamp); - let execution_completed = timespec_from(&metadata.execution_completed_timestamp); - let output_upload_start = timespec_from(&metadata.output_upload_start_timestamp); - let output_upload_completed = - timespec_from(&metadata.output_upload_completed_timestamp); - - match (worker_start - enqueued).to_std() { - Ok(duration) => attempts.current_attempt.remote_queue = Some(duration), - Err(err) => warn!("Got negative remote queue time: {}", err), - } - match (input_fetch_completed - input_fetch_start).to_std() { - Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration), - Err(err) => warn!("Got negative remote input fetch time: {}", err), - } - match (execution_completed - execution_start).to_std() { - Ok(duration) => attempts.current_attempt.remote_execution = Some(duration), - Err(err) => warn!("Got negative remote execution time: {}", err), - } - match (output_upload_completed - output_upload_start).to_std() { - Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration), - Err(err) => warn!("Got negative remote output store time: {}", err), - } - attempts.current_attempt.was_cache_hit = execute_response.cached_result; - } + if !operation.done { + return future::err(ExecutionError::NotFinished(operation.name)).to_boxed(); + } + let execute_response = if let Some(result) = operation.result { + match result { + bazel_protos::google::longrunning::operation::Result::Error(ref status) => { + return future::err(ExecutionError::Fatal(format_error(status))).to_boxed(); } + bazel_protos::google::longrunning::operation::Result::Response(ref any) => try_future!( + bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse::decode(&any.value) + .map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e))) + ), + } + } else { + return future::err(ExecutionError::Fatal( + "Operation finished but no response supplied".to_string(), + )) + .to_boxed(); + }; - let mut execution_attempts = std::mem::replace(&mut attempts.attempts, vec![]); - execution_attempts.push(attempts.current_attempt); - - let maybe_result = execute_response.result; - - let status = execute_response - .status - .unwrap_or_else(|| bazel_protos::google::rpc::Status { - code: bazel_protos::google::rpc::Code::Ok.into(), - message: String::new(), - details: vec![], - }); - if status.code == bazel_protos::google::rpc::Code::Ok.into() { - if let Some(result) = maybe_result { - return self - .extract_stdout(&result) - .join(self.extract_stderr(&result)) - .join(self.extract_output_files(&result)) - .and_then(move |((stdout, stderr), output_directory)| { - Ok(FallibleExecuteProcessResult { - stdout: stdout, - stderr: stderr, - exit_code: result.exit_code, - output_directory: output_directory, - execution_attempts: execution_attempts, - }) - }) - .to_boxed(); - } else { - return futures::future::err(ExecutionError::Fatal( - "No result found on ExecuteResponse".to_owned(), - )) - .to_boxed(); - } + trace!("Got (nested) execute response: {:?}", execute_response); + + if let Some(ref result) = execute_response.result { + if let Some(ref metadata) = result.execution_metadata { + let enqueued = timespec_from(&metadata.queued_timestamp); + let worker_start = timespec_from(&metadata.worker_start_timestamp); + let input_fetch_start = timespec_from(&metadata.input_fetch_start_timestamp); + let input_fetch_completed = timespec_from(&metadata.input_fetch_completed_timestamp); + let execution_start = timespec_from(&metadata.execution_start_timestamp); + let execution_completed = timespec_from(&metadata.execution_completed_timestamp); + let output_upload_start = timespec_from(&metadata.output_upload_start_timestamp); + let output_upload_completed = timespec_from(&metadata.output_upload_completed_timestamp); + + match (worker_start - enqueued).to_std() { + Ok(duration) => attempts.current_attempt.remote_queue = Some(duration), + Err(err) => warn!("Got negative remote queue time: {}", err), + } + match (input_fetch_completed - input_fetch_start).to_std() { + Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration), + Err(err) => warn!("Got negative remote input fetch time: {}", err), } - status + match (execution_completed - execution_start).to_std() { + Ok(duration) => attempts.current_attempt.remote_execution = Some(duration), + Err(err) => warn!("Got negative remote execution time: {}", err), + } + match (output_upload_completed - output_upload_start).to_std() { + Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration), + Err(err) => warn!("Got negative remote output store time: {}", err), + } + attempts.current_attempt.was_cache_hit = execute_response.cached_result; } - OperationOrStatus::Status(status) => status, - }; + } + + let mut execution_attempts = std::mem::replace(&mut attempts.attempts, vec![]); + execution_attempts.push(attempts.current_attempt); + + let maybe_result = execute_response.result; + + let status = execute_response + .status + .unwrap_or_else(|| bazel_protos::google::rpc::Status { + code: bazel_protos::google::rpc::Code::Ok.into(), + message: String::new(), + details: vec![], + }); + if status.code == bazel_protos::google::rpc::Code::Ok.into() { + if let Some(result) = maybe_result { + return self + .extract_stdout(&result) + .join(self.extract_stderr(&result)) + .join(self.extract_output_files(&result)) + .and_then(move |((stdout, stderr), output_directory)| { + Ok(FallibleExecuteProcessResult { + stdout: stdout, + stderr: stderr, + exit_code: result.exit_code, + output_directory: output_directory, + execution_attempts: execution_attempts, + }) + }) + .to_boxed(); + } else { + return futures::future::err(ExecutionError::Fatal( + "No result found on ExecuteResponse".to_owned(), + )) + .to_boxed(); + } + } match bazel_protos::code_from_i32(status.code) { bazel_protos::google::rpc::Code::Ok => unreachable!(), @@ -2625,10 +2608,9 @@ mod tests { .build(); let mut runtime = tokio::runtime::Runtime::new().unwrap(); let command_runner = create_command_runner("127.0.0.1:0".to_owned(), &cas); - let result = runtime.block_on(command_runner.extract_execute_response( - super::OperationOrStatus::Operation(operation), - &mut ExecutionHistory::default(), - )); + let result = runtime.block_on( + command_runner.extract_execute_response(operation, &mut ExecutionHistory::default()), + ); runtime.shutdown_now().wait().unwrap(); result