Skip to content

Commit

Permalink
Remove unused operation wrapper (pantsbuild#7194)
Browse files Browse the repository at this point in the history
The tower migration made this obsolete
  • Loading branch information
illicitonion committed Feb 1, 2019
1 parent 884acc6 commit 9400024
Showing 1 changed file with 90 additions and 108 deletions.
198 changes: 90 additions & 108 deletions src/rust/engine/process_execution/src/remote.rs
Expand Up @@ -34,12 +34,6 @@ use tower_util::MakeService;
// CommandRunner. // CommandRunner.
const CACHE_KEY_GEN_VERSION_ENV_VAR_NAME: &str = "PANTS_CACHE_KEY_GEN_VERSION"; const CACHE_KEY_GEN_VERSION_ENV_VAR_NAME: &str = "PANTS_CACHE_KEY_GEN_VERSION";


#[derive(Debug)]
enum OperationOrStatus {
Operation(bazel_protos::google::longrunning::Operation),
Status(bazel_protos::google::rpc::Status),
}

type Connection = tower_http::add_origin::AddOrigin< type Connection = tower_http::add_origin::AddOrigin<
tower_h2::client::Connection<tokio::net::tcp::TcpStream, DefaultExecutor, tower_grpc::BoxBody>, tower_h2::client::Connection<tokio::net::tcp::TcpStream, DefaultExecutor, tower_grpc::BoxBody>,
>; >;
Expand Down Expand Up @@ -88,7 +82,7 @@ impl CommandRunner {
fn oneshot_execute( fn oneshot_execute(
&self, &self,
execute_request: bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest, execute_request: bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest,
) -> impl Future<Item = OperationOrStatus, Error = String> { ) -> impl Future<Item = bazel_protos::google::longrunning::Operation, Error = String> {
let command_runner = self.clone(); let command_runner = self.clone();
self self
.clients .clients
Expand All @@ -115,7 +109,6 @@ impl CommandRunner {
std::mem::drop(stream); std::mem::drop(stream);
resp.ok_or_else(|| "Didn't get response from remote process execution".to_owned()) resp.ok_or_else(|| "Didn't get response from remote process execution".to_owned())
}) })
.map(OperationOrStatus::Operation)
}) })
}) })
} }
Expand Down Expand Up @@ -281,7 +274,6 @@ impl super::CommandRunner for CommandRunner {
}) })
.map_err(towergrpcerror_to_string) .map_err(towergrpcerror_to_string)
}) })
.map(OperationOrStatus::Operation)
.map(move |operation| { .map(move |operation| {
future::Loop::Continue((history, operation, iter_num + 1)) future::Loop::Continue((history, operation, iter_num + 1))
}) })
Expand Down Expand Up @@ -419,108 +411,99 @@ impl CommandRunner {


fn extract_execute_response( fn extract_execute_response(
&self, &self,
operation_or_status: OperationOrStatus, operation: bazel_protos::google::longrunning::Operation,
attempts: &mut ExecutionHistory, attempts: &mut ExecutionHistory,
) -> BoxFuture<FallibleExecuteProcessResult, ExecutionError> { ) -> BoxFuture<FallibleExecuteProcessResult, ExecutionError> {
trace!("Got operation response: {:?}", operation_or_status); trace!("Got operation response: {:?}", operation);


let status = match operation_or_status { if !operation.done {
OperationOrStatus::Operation(operation) => { return future::err(ExecutionError::NotFinished(operation.name)).to_boxed();
if !operation.done { }
return future::err(ExecutionError::NotFinished(operation.name)).to_boxed(); let execute_response = if let Some(result) = operation.result {
} match result {
let execute_response = if let Some(result) = operation.result { bazel_protos::google::longrunning::operation::Result::Error(ref status) => {
match result { return future::err(ExecutionError::Fatal(format_error(status))).to_boxed();
bazel_protos::google::longrunning::operation::Result::Error(ref status) => {
return future::err(ExecutionError::Fatal(format_error(status))).to_boxed();
}
bazel_protos::google::longrunning::operation::Result::Response(ref any) => try_future!(
bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse::decode(
&any.value
)
.map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e)))
),
}
} else {
return future::err(ExecutionError::Fatal(
"Operation finished but no response supplied".to_string(),
))
.to_boxed();
};

trace!("Got (nested) execute response: {:?}", execute_response);

if let Some(ref result) = execute_response.result {
if let Some(ref metadata) = result.execution_metadata {
let enqueued = timespec_from(&metadata.queued_timestamp);
let worker_start = timespec_from(&metadata.worker_start_timestamp);
let input_fetch_start = timespec_from(&metadata.input_fetch_start_timestamp);
let input_fetch_completed = timespec_from(&metadata.input_fetch_completed_timestamp);
let execution_start = timespec_from(&metadata.execution_start_timestamp);
let execution_completed = timespec_from(&metadata.execution_completed_timestamp);
let output_upload_start = timespec_from(&metadata.output_upload_start_timestamp);
let output_upload_completed =
timespec_from(&metadata.output_upload_completed_timestamp);

match (worker_start - enqueued).to_std() {
Ok(duration) => attempts.current_attempt.remote_queue = Some(duration),
Err(err) => warn!("Got negative remote queue time: {}", err),
}
match (input_fetch_completed - input_fetch_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration),
Err(err) => warn!("Got negative remote input fetch time: {}", err),
}
match (execution_completed - execution_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_execution = Some(duration),
Err(err) => warn!("Got negative remote execution time: {}", err),
}
match (output_upload_completed - output_upload_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration),
Err(err) => warn!("Got negative remote output store time: {}", err),
}
attempts.current_attempt.was_cache_hit = execute_response.cached_result;
}
} }
bazel_protos::google::longrunning::operation::Result::Response(ref any) => try_future!(
bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse::decode(&any.value)
.map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e)))
),
}
} else {
return future::err(ExecutionError::Fatal(
"Operation finished but no response supplied".to_string(),
))
.to_boxed();
};


let mut execution_attempts = std::mem::replace(&mut attempts.attempts, vec![]); trace!("Got (nested) execute response: {:?}", execute_response);
execution_attempts.push(attempts.current_attempt);

if let Some(ref result) = execute_response.result {
let maybe_result = execute_response.result; if let Some(ref metadata) = result.execution_metadata {

let enqueued = timespec_from(&metadata.queued_timestamp);
let status = execute_response let worker_start = timespec_from(&metadata.worker_start_timestamp);
.status let input_fetch_start = timespec_from(&metadata.input_fetch_start_timestamp);
.unwrap_or_else(|| bazel_protos::google::rpc::Status { let input_fetch_completed = timespec_from(&metadata.input_fetch_completed_timestamp);
code: bazel_protos::google::rpc::Code::Ok.into(), let execution_start = timespec_from(&metadata.execution_start_timestamp);
message: String::new(), let execution_completed = timespec_from(&metadata.execution_completed_timestamp);
details: vec![], let output_upload_start = timespec_from(&metadata.output_upload_start_timestamp);
}); let output_upload_completed = timespec_from(&metadata.output_upload_completed_timestamp);
if status.code == bazel_protos::google::rpc::Code::Ok.into() {
if let Some(result) = maybe_result { match (worker_start - enqueued).to_std() {
return self Ok(duration) => attempts.current_attempt.remote_queue = Some(duration),
.extract_stdout(&result) Err(err) => warn!("Got negative remote queue time: {}", err),
.join(self.extract_stderr(&result)) }
.join(self.extract_output_files(&result)) match (input_fetch_completed - input_fetch_start).to_std() {
.and_then(move |((stdout, stderr), output_directory)| { Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration),
Ok(FallibleExecuteProcessResult { Err(err) => warn!("Got negative remote input fetch time: {}", err),
stdout: stdout,
stderr: stderr,
exit_code: result.exit_code,
output_directory: output_directory,
execution_attempts: execution_attempts,
})
})
.to_boxed();
} else {
return futures::future::err(ExecutionError::Fatal(
"No result found on ExecuteResponse".to_owned(),
))
.to_boxed();
}
} }
status match (execution_completed - execution_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_execution = Some(duration),
Err(err) => warn!("Got negative remote execution time: {}", err),
}
match (output_upload_completed - output_upload_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration),
Err(err) => warn!("Got negative remote output store time: {}", err),
}
attempts.current_attempt.was_cache_hit = execute_response.cached_result;
} }
OperationOrStatus::Status(status) => status, }
};
let mut execution_attempts = std::mem::replace(&mut attempts.attempts, vec![]);
execution_attempts.push(attempts.current_attempt);

let maybe_result = execute_response.result;

let status = execute_response
.status
.unwrap_or_else(|| bazel_protos::google::rpc::Status {
code: bazel_protos::google::rpc::Code::Ok.into(),
message: String::new(),
details: vec![],
});
if status.code == bazel_protos::google::rpc::Code::Ok.into() {
if let Some(result) = maybe_result {
return self
.extract_stdout(&result)
.join(self.extract_stderr(&result))
.join(self.extract_output_files(&result))
.and_then(move |((stdout, stderr), output_directory)| {
Ok(FallibleExecuteProcessResult {
stdout: stdout,
stderr: stderr,
exit_code: result.exit_code,
output_directory: output_directory,
execution_attempts: execution_attempts,
})
})
.to_boxed();
} else {
return futures::future::err(ExecutionError::Fatal(
"No result found on ExecuteResponse".to_owned(),
))
.to_boxed();
}
}


match bazel_protos::code_from_i32(status.code) { match bazel_protos::code_from_i32(status.code) {
bazel_protos::google::rpc::Code::Ok => unreachable!(), bazel_protos::google::rpc::Code::Ok => unreachable!(),
Expand Down Expand Up @@ -2625,10 +2608,9 @@ mod tests {
.build(); .build();
let mut runtime = tokio::runtime::Runtime::new().unwrap(); let mut runtime = tokio::runtime::Runtime::new().unwrap();
let command_runner = create_command_runner("127.0.0.1:0".to_owned(), &cas); let command_runner = create_command_runner("127.0.0.1:0".to_owned(), &cas);
let result = runtime.block_on(command_runner.extract_execute_response( let result = runtime.block_on(
super::OperationOrStatus::Operation(operation), command_runner.extract_execute_response(operation, &mut ExecutionHistory::default()),
&mut ExecutionHistory::default(), );
));


runtime.shutdown_now().wait().unwrap(); runtime.shutdown_now().wait().unwrap();
result result
Expand Down

0 comments on commit 9400024

Please sign in to comment.