Skip to content

Commit

Permalink
Fixing #4585: broken re-execution of orphaned actions.
Browse files Browse the repository at this point in the history
This is an important regression; we will want to patch the fix into 0.10.

TESTED=fixed unit test, with A/B testing
RELNOTES: Resolved an issue where a failure in the remote cache would not trigger local re-execution of an action.
PiperOrigin-RevId: 184991670
  • Loading branch information
olaola authored and Copybara-Service committed Feb 8, 2018
1 parent cfdeb4d commit 56aeb04
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,6 @@ public void ensureInputsPresent(
* allow throwing such an exception. Any caller must make sure to catch the
* {@link StatusRuntimeException}. Note that the retrier implicitly catches it, so if this is used
* in the context of {@link RemoteRetrier#execute}, that's perfectly safe.
*
* <p>This method also converts any NOT_FOUND code returned from the server into a
* {@link CacheNotFoundException}. TODO(olaola): this is not enough. NOT_FOUND can also be raised
* by execute, in which case the server should return the missing digest in the Status.details
* field. This should be part of the API.
*/
private void readBlob(Digest digest, OutputStream stream)
throws IOException, StatusRuntimeException {
Expand All @@ -191,42 +186,49 @@ private void readBlob(Digest digest, OutputStream stream)
resourceName += options.remoteInstanceName + "/";
}
resourceName += "blobs/" + digest.getHash() + "/" + digest.getSizeBytes();
try {
Iterator<ReadResponse> replies = bsBlockingStub()
.read(ReadRequest.newBuilder().setResourceName(resourceName).build());
while (replies.hasNext()) {
replies.next().getData().writeTo(stream);
}
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
throw new CacheNotFoundException(digest);
}
throw e;
Iterator<ReadResponse> replies = bsBlockingStub()
.read(ReadRequest.newBuilder().setResourceName(resourceName).build());
while (replies.hasNext()) {
replies.next().getData().writeTo(stream);
}
}

@Override
protected void downloadBlob(Digest digest, Path dest) throws IOException, InterruptedException {
retrier.execute(
() -> {
try (OutputStream stream = dest.getOutputStream()) {
readBlob(digest, stream);
}
return null;
});
try {
retrier.execute(
() -> {
try (OutputStream stream = dest.getOutputStream()) {
readBlob(digest, stream);
}
return null;
});
} catch (RetryException e) {
if (RemoteRetrierUtils.causedByStatus(e, Status.Code.NOT_FOUND)) {
throw new CacheNotFoundException(digest);
}
throw e;
}
}

@Override
protected byte[] downloadBlob(Digest digest) throws IOException, InterruptedException {
if (digest.getSizeBytes() == 0) {
return new byte[0];
}
return retrier.execute(
() -> {
ByteArrayOutputStream stream = new ByteArrayOutputStream((int) digest.getSizeBytes());
readBlob(digest, stream);
return stream.toByteArray();
});
try {
return retrier.execute(
() -> {
ByteArrayOutputStream stream = new ByteArrayOutputStream((int) digest.getSizeBytes());
readBlob(digest, stream);
return stream.toByteArray();
});
} catch (RetryException e) {
if (RemoteRetrierUtils.causedByStatus(e, Status.Code.NOT_FOUND)) {
throw new CacheNotFoundException(digest);
}
throw e;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ private SpawnResult handleError(IOException exception, FileOutErr outErr) throws
if (exception instanceof RetryException
&& RemoteRetrierUtils.causedByStatus((RetryException) exception, Code.UNAVAILABLE)) {
status = Status.EXECUTION_FAILED_CATASTROPHICALLY;
} else if (cause instanceof CacheNotFoundException) {
} else if (exception instanceof CacheNotFoundException
|| cause instanceof CacheNotFoundException) {
status = Status.REMOTE_CACHE_FAILED;
} else {
status = Status.EXECUTION_FAILED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ public PathFragment getExecPath() {
/** Shuts the fake server down after each test and then waits on its termination. */
@After
public void tearDown() throws Exception {
fakeServer.shutdownNow();
// NOTE(review): presumably blocks until the shutdown has completed, so the server is fully gone
// before the next test starts -- confirm against the type of fakeServer.
fakeServer.awaitTermination();
}

@Test
Expand Down Expand Up @@ -791,6 +792,66 @@ public void read(ReadRequest request, StreamObserver<ReadResponse> responseObser
}
}

/**
 * Checks that a result whose stdout blob can never be read is reported as a
 * {@link SpawnExecException} with status {@code REMOTE_CACHE_FAILED}, and that the exception
 * message carries both the missing digest's hash and a stack trace naming this test.
 *
 * <p>The fake action cache and the fake execution service return the same {@code ActionResult},
 * which references {@code stdOutDigest}; every byte stream read fails with {@code NOT_FOUND}.
 * NOTE(review): presumably this makes the cache miss recur even after the client re-executes the
 * action remotely -- confirm against the client implementation.
 */
@Test
public void passRepeatedOrphanedCacheMissErrorWithStackTrace() throws Exception {
// Digest of the stdout blob that the fake byte stream service below never serves.
final Digest stdOutDigest = DIGEST_UTIL.computeAsUtf8("bloo");
final ActionResult actionResult =
ActionResult.newBuilder().setStdoutDigest(stdOutDigest).build();
// Action cache: every lookup answers with the result that references stdOutDigest.
serviceRegistry.addService(
new ActionCacheImplBase() {
@Override
public void getActionResult(
GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
responseObserver.onNext(actionResult);
responseObserver.onCompleted();
}
});
// Execution: answers with an already-done operation wrapping the same result, so it points at
// the same blob.
serviceRegistry.addService(
new ExecutionImplBase() {
@Override
public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
responseObserver.onNext(
Operation.newBuilder()
.setDone(true)
.setResponse(
Any.pack(ExecuteResponse.newBuilder().setResult(actionResult).build()))
.build());
responseObserver.onCompleted();
}
});
// CAS: replies with the default FindMissingBlobsResponse instance.
serviceRegistry.addService(
new ContentAddressableStorageImplBase() {
@Override
public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
responseObserver.onCompleted();
}
});
// Byte stream: checks that the read targets the stdout digest, then always fails with NOT_FOUND.
serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
assertThat(request.getResourceName().contains(stdOutDigest.getHash())).isTrue();
responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
}
});

// exec is expected to throw; reaching fail() means no exception was raised.
try {
client.exec(simpleSpawn, simplePolicy);
fail("Expected an exception");
} catch (SpawnExecException expected) {
assertThat(expected.getSpawnResult().status())
.isEqualTo(SpawnResult.Status.REMOTE_CACHE_FAILED);
// The message must identify which blob was missing.
assertThat(expected).hasMessageThat().contains(stdOutDigest.getHash());
// Ensure we also got back the stack trace.
assertThat(expected)
.hasMessageThat()
.contains("passRepeatedOrphanedCacheMissErrorWithStackTrace");
}
}

@Test
public void remotelyReExecuteOrphanedCachedActions() throws Exception {
final Digest stdOutDigest = DIGEST_UTIL.computeAsUtf8("stdout");
Expand All @@ -807,10 +868,19 @@ public void getActionResult(
});
serviceRegistry.addService(
new ByteStreamImplBase() {
private boolean first = true;

@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
// All reads are a cache miss.
responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
// First read is a cache miss, next read succeeds.
if (first) {
first = false;
responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
} else {
responseObserver.onNext(
ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("stdout")).build());
responseObserver.onCompleted();
}
}

@Override
Expand Down Expand Up @@ -858,14 +928,9 @@ public void findMissingBlobs(
}
});

try {
client.exec(simpleSpawn, simplePolicy);
fail("Expected an exception");
} catch (ExecException expected) {
assertThat(expected).hasMessageThat().contains("Missing digest");
assertThat(expected)
.hasMessageThat()
.contains(DIGEST_UTIL.computeAsUtf8("stdout").toString());
}
SpawnResult result = client.exec(simpleSpawn, simplePolicy);
assertThat(result.setupSuccess()).isTrue();
assertThat(result.exitCode()).isEqualTo(0);
assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
}
}

0 comments on commit 56aeb04

Please sign in to comment.