Skip to content

Commit

Permalink
Suppress last-ditch download exceptions w/cleanup
Browse files Browse the repository at this point in the history
Create an encapsulating DownloadException to represent the aggregate
exception set of an ActionResult download. IOExceptions will be retained
through exception suppression, and the outer exception has a property to
indicate if it only represents a sequence of CacheNotFoundExceptions.

InterruptedExceptions interception is improved to cancel pending work
and wrap, through suppression, any DownloadException that also occurred
during the download. InterruptedException being thrown on the download
control thread, it does not require suppression of further interrupts,
and can represent an outer download exception. Thread interrupt status
is suppressed for cancellations, and conveyed on throw.

These exception wrapping efforts allow non-asynchronous frame
representation in stack traces, and much clearer identification of
sources within remote strategy execution which produce failures based on
remote errors.

Any DownloadException in the last-ditch output download under
handleError in RemoteSpawnRunner is added as suppressed to the
initiating exception. Other exceptions (likely local IO) present clear
immediate traces and do not require specialized treatment.

Closes #10029.

PiperOrigin-RevId: 306619678
  • Loading branch information
George Gensure authored and laurentlb committed Apr 17, 2020
1 parent 17ebbf1 commit 71fb56b
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 126 deletions.
@@ -0,0 +1,49 @@
// Copyright 2020 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;

import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import java.io.IOException;

/**
* Exception which represents a collection of IOExceptions for the purpose of distinguishing remote
* communication exceptions from those which occur on filesystems locally. This exception serves as
* a trace point for the actual transfer, so that the intended operation can be observed in a stack,
* with all constituent exceptions available for observation.
*/
class BulkTransferException extends IOException {
// true since no empty BulkTransferException is ever thrown
private boolean allCacheNotFoundException = true;

BulkTransferException() {}

BulkTransferException(IOException e) {
add(e);
}

/**
* Add an IOException to the suppressed list.
*
* <p>The Java standard addSuppressed is final and this method stands in its place to selectively
* filter and record whether all suppressed exceptions are CacheNotFoundExceptions
*/
void add(IOException e) {
allCacheNotFoundException &= e instanceof CacheNotFoundException;
super.addSuppressed(e);
}

boolean onlyCausedByCacheNotFoundException() {
return allCacheNotFoundException;
}
}
148 changes: 68 additions & 80 deletions src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
Expand Up @@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;

import build.bazel.remote.execution.v2.Action;
import build.bazel.remote.execution.v2.ActionResult;
Expand All @@ -27,9 +28,7 @@
import build.bazel.remote.execution.v2.OutputSymlink;
import build.bazel.remote.execution.v2.SymlinkNode;
import build.bazel.remote.execution.v2.Tree;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -57,7 +56,6 @@
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.remote.util.Utils.InMemoryOutput;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.util.io.OutErr;
Expand All @@ -80,7 +78,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -116,7 +113,7 @@ public RemoteCache(

public ActionResult downloadActionResult(ActionKey actionKey, boolean inlineOutErr)
throws IOException, InterruptedException {
return Utils.getFromFuture(cacheProtocol.downloadActionResult(actionKey, inlineOutErr));
return getFromFuture(cacheProtocol.downloadActionResult(actionKey, inlineOutErr));
}

/**
Expand Down Expand Up @@ -181,8 +178,7 @@ private void uploadOutputs(
digests.addAll(digestToFile.keySet());
digests.addAll(digestToBlobs.keySet());

ImmutableSet<Digest> digestsToUpload =
Utils.getFromFuture(cacheProtocol.findMissingDigests(digests));
ImmutableSet<Digest> digestsToUpload = getFromFuture(cacheProtocol.findMissingDigests(digests));
ImmutableList.Builder<ListenableFuture<Void>> uploads = ImmutableList.builder();
for (Digest digest : digestsToUpload) {
Path file = digestToFile.get(digest);
Expand All @@ -198,7 +194,7 @@ private void uploadOutputs(
}
}

waitForUploads(uploads.build());
waitForBulkTransfer(uploads.build(), /* cancelRemainingOnInterrupt=*/ false);

if (manifest.getStderrDigest() != null) {
result.setStderrDigest(manifest.getStderrDigest());
Expand All @@ -208,22 +204,45 @@ private void uploadOutputs(
}
}

private static void waitForUploads(List<ListenableFuture<Void>> uploads)
throws IOException, InterruptedException {
try {
for (ListenableFuture<Void> upload : uploads) {
upload.get();
protected static <T> void waitForBulkTransfer(
Iterable<ListenableFuture<T>> transfers, boolean cancelRemainingOnInterrupt)
throws BulkTransferException, InterruptedException {
BulkTransferException bulkTransferException = null;
InterruptedException interruptedException = null;
boolean interrupted = Thread.currentThread().isInterrupted();
for (ListenableFuture<T> transfer : transfers) {
try {
if (interruptedException == null) {
// Wait for all downloads to finish.
getFromFuture(transfer);
} else {
transfer.cancel(true);
}
} catch (IOException e) {
if (bulkTransferException == null) {
bulkTransferException = new BulkTransferException();
}
bulkTransferException.add(e);
} catch (InterruptedException e) {
interrupted = Thread.interrupted() || interrupted;
interruptedException = e;
if (!cancelRemainingOnInterrupt) {
// leave the rest of the transfers alone
break;
}
}
} catch (ExecutionException e) {
// TODO(buchgr): Add support for cancellation and factor this method out to be shared
// between ByteStreamUploader as well.
Throwable cause = e.getCause();
Throwables.throwIfInstanceOf(cause, IOException.class);
Throwables.throwIfInstanceOf(cause, InterruptedException.class);
if (cause != null) {
throw new IOException(cause);
}
if (interrupted) {
Thread.currentThread().interrupt();
}
if (interruptedException != null) {
if (bulkTransferException != null) {
interruptedException.addSuppressed(bulkTransferException);
}
throw new IOException(e);
throw interruptedException;
}
if (bulkTransferException != null) {
throw bulkTransferException;
}
}

Expand Down Expand Up @@ -299,40 +318,16 @@ public void download(
// Subsequently we need to wait for *every* download to finish, even if we already know that
// one failed. That's so that when exiting this method we can be sure that all downloads have
// finished and don't race with the cleanup routine.
// TODO(buchgr): Look into cancellation.

IOException downloadException = null;
InterruptedException interruptedException = null;
FileOutErr tmpOutErr = null;
try {
if (origOutErr != null) {
tmpOutErr = origOutErr.childOutErr();
}
downloads.addAll(downloadOutErr(result, tmpOutErr));
} catch (IOException e) {
downloadException = e;
}

for (ListenableFuture<FileMetadata> download : downloads) {
try {
// Wait for all downloads to finish.
getFromFuture(download);
} catch (IOException e) {
if (downloadException == null) {
downloadException = e;
} else if (e != downloadException) {
downloadException.addSuppressed(e);
}
} catch (InterruptedException e) {
if (interruptedException == null) {
interruptedException = e;
} else if (e != interruptedException) {
interruptedException.addSuppressed(e);
}
}
if (origOutErr != null) {
tmpOutErr = origOutErr.childOutErr();
}
downloads.addAll(downloadOutErr(result, tmpOutErr));

if (downloadException != null || interruptedException != null) {
try {
waitForBulkTransfer(downloads, /* cancelRemainingOnInterrupt=*/ true);
} catch (Exception e) {
try {
// Delete any (partially) downloaded output files.
for (OutputFile file : result.getOutputFilesList()) {
Expand All @@ -347,27 +342,18 @@ public void download(
tmpOutErr.clearOut();
tmpOutErr.clearErr();
}
} catch (IOException e) {
if (downloadException != null && e != downloadException) {
e.addSuppressed(downloadException);
}
if (interruptedException != null) {
e.addSuppressed(interruptedException);
}
} catch (IOException ioEx) {
ioEx.addSuppressed(e);

// If deleting of output files failed, we abort the build with a decent error message as
// any subsequent local execution failure would likely be incomprehensible.
throw new EnvironmentalExecException(
"Failed to delete output files after incomplete download", e);
ExecException execEx =
new EnvironmentalExecException(
"Failed to delete output files after incomplete download", ioEx);
execEx.addSuppressed(e);
throw execEx;
}
}

if (interruptedException != null) {
throw interruptedException;
}

if (downloadException != null) {
throw downloadException;
throw e;
}

if (tmpOutErr != null) {
Expand Down Expand Up @@ -487,12 +473,15 @@ public void onFailure(Throwable t) {
return outerF;
}

private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result, OutErr outErr)
throws IOException {
private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result, OutErr outErr) {
List<ListenableFuture<FileMetadata>> downloads = new ArrayList<>();
if (!result.getStdoutRaw().isEmpty()) {
result.getStdoutRaw().writeTo(outErr.getOutputStream());
outErr.getOutputStream().flush();
try {
result.getStdoutRaw().writeTo(outErr.getOutputStream());
outErr.getOutputStream().flush();
} catch (IOException e) {
downloads.add(Futures.immediateFailedFuture(e));
}
} else if (result.hasStdoutDigest()) {
downloads.add(
Futures.transform(
Expand All @@ -501,8 +490,12 @@ private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result,
directExecutor()));
}
if (!result.getStderrRaw().isEmpty()) {
result.getStderrRaw().writeTo(outErr.getErrorStream());
outErr.getErrorStream().flush();
try {
result.getStderrRaw().writeTo(outErr.getErrorStream());
outErr.getErrorStream().flush();
} catch (IOException e) {
downloads.add(Futures.immediateFailedFuture(e));
}
} else if (result.hasStderrDigest()) {
downloads.add(
Futures.transform(
Expand Down Expand Up @@ -1115,9 +1108,4 @@ public Collection<SymlinkMetadata> symlinks() {
return symlinks.values();
}
}

@VisibleForTesting
protected <T> T getFromFuture(ListenableFuture<T> f) throws IOException, InterruptedException {
return Utils.getFromFuture(f);
}
}
Expand Up @@ -13,11 +13,11 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static java.lang.String.format;

import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.Directory;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -26,7 +26,6 @@
import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
Expand All @@ -35,7 +34,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/** A {@link RemoteCache} with additional functionality needed for remote execution. */
public class RemoteExecutionCache extends RemoteCache {
Expand All @@ -57,21 +55,7 @@ private void uploadMissing(Map<Digest, Path> files, Map<Digest, ByteString> blob
uploads.add(cacheProtocol.uploadBlob(entry.getKey(), entry.getValue()));
}

try {
for (ListenableFuture<Void> upload : uploads) {
upload.get();
}
} catch (ExecutionException e) {
// Cancel remaining uploads.
for (ListenableFuture<Void> upload : uploads) {
upload.cancel(/* mayInterruptIfRunning= */ true);
}

Throwable cause = e.getCause();
Throwables.propagateIfPossible(cause, IOException.class);
Throwables.propagateIfPossible(cause, InterruptedException.class);
throw new IOException(cause);
}
waitForBulkTransfer(uploads, /* cancelRemainingOnInterrupt=*/ false);
}

/**
Expand All @@ -91,7 +75,7 @@ public void ensureInputsPresent(MerkleTree merkleTree, Map<Digest, Message> addi
Iterable<Digest> allDigests =
Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet());
ImmutableSet<Digest> missingDigests =
Utils.getFromFuture(cacheProtocol.findMissingDigests(allDigests));
getFromFuture(cacheProtocol.findMissingDigests(allDigests));
Map<Digest, Path> filesToUpload = new HashMap<>();
Map<Digest, ByteString> blobsToUpload = new HashMap<>();
for (Digest missingDigest : missingDigests) {
Expand Down

0 comments on commit 71fb56b

Please sign in to comment.