Skip to content

Commit

Permalink
remote: Fix a bug where local executed results would not be uploaded. F…
Browse files Browse the repository at this point in the history
…ixes #3379

Commit dc24004 broke the upload of locally executed action
results. Also, update our unit tests who did not catch this error.

P.S.: olaola@ is the author of this change, but due to time constraints
we had to merge it while she was asleep.

Change-Id: Ib150152c0bddc8311908c105aef208506d3b6a8d
PiperOrigin-RevId: 161954553
  • Loading branch information
Ola Rozenfeld authored and laszlocsomor committed Jul 14, 2017
1 parent 10f147e commit 688dbf7
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -320,9 +322,7 @@ public void upload(ActionKey actionKey, Path execRoot, Collection<Path> files, F

void upload(Path execRoot, Collection<Path> files, FileOutErr outErr, ActionResult.Builder result)
throws IOException, InterruptedException {
ArrayList<Digest> digests = new ArrayList<>();
ImmutableSet<Digest> digestsToUpload = getMissingDigests(digests);
List<Chunker> filesToUpload = new ArrayList<>(digestsToUpload.size());
Map<Digest, Path> digestToFile = new HashMap<>();
for (Path file : files) {
if (!file.exists()) {
// We ignore requested results that have not been generated by the action.
Expand All @@ -335,28 +335,29 @@ void upload(Path execRoot, Collection<Path> files, FileOutErr outErr, ActionResu
}

Digest digest = Digests.computeDigest(file);
digests.add(digest);
// TODO(olaola): inline small results here.
result
.addOutputFilesBuilder()
.setPath(file.relativeTo(execRoot).getPathString())
.setDigest(digest)
.setIsExecutable(file.isExecutable());
digestToFile.put(digest, file);
}

if (digestsToUpload.contains(digest)) {
Chunker chunker = new Chunker(file);
filesToUpload.add(chunker);
ImmutableSet<Digest> digestsToUpload = getMissingDigests(digestToFile.keySet());
List<Chunker> filesToUpload = new ArrayList<>();
for (Digest digest : digestsToUpload) {
Path file = digestToFile.get(digest);
if (file == null) {
String message = "FindMissingBlobs call returned an unknown digest: " + digest;
throw new IOException(message);
}
filesToUpload.add(new Chunker(file));
}

if (!filesToUpload.isEmpty()) {
uploader.uploadBlobs(filesToUpload);
}

int index = 0;
for (Path file : files) {
// Add to protobuf.
// TODO(olaola): inline small results here.
result
.addOutputFilesBuilder()
.setPath(file.relativeTo(execRoot).getPathString())
.setDigest(digests.get(index++))
.setIsExecutable(file.isExecutable());
}
// TODO(olaola): inline small stdout/stderr here.
if (outErr.getErrorPath().exists()) {
Digest stderr = uploadFileContents(outErr.getErrorPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,30 +347,6 @@ public StreamObserver<WriteRequest> answer(InvocationOnMock invocation) {
};
}

private Answer<StreamObserver<WriteRequest>> blobChunkedWriteAnswerError() {
return new Answer<StreamObserver<WriteRequest>>() {
@Override
@SuppressWarnings("unchecked")
public StreamObserver<WriteRequest> answer(final InvocationOnMock invocation) {
return new StreamObserver<WriteRequest>() {
@Override
public void onNext(WriteRequest request) {
((StreamObserver<WriteResponse>) invocation.getArguments()[0])
.onError(Status.UNAVAILABLE.asRuntimeException());
}

@Override
public void onCompleted() {}

@Override
public void onError(Throwable t) {
fail("An unexpected client-side error occurred: " + t);
}
};
}
};
}

@Test
public void testUploadBlobMultipleChunks() throws Exception {
final Digest digest = Digests.computeDigestUtf8("abcdef");
Expand All @@ -395,6 +371,8 @@ public void findMissingBlobs(
.thenAnswer(blobChunkedWriteAnswer("abcdef", chunkSize));
assertThat(client.uploadBlob("abcdef".getBytes(UTF_8))).isEqualTo(digest);
}
Mockito.verify(mockByteStreamImpl, Mockito.times(6))
.write(Mockito.<StreamObserver<WriteResponse>>anyObject());
}

@Test
Expand All @@ -413,12 +391,7 @@ public void testUploadCacheHits() throws Exception {
public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
assertThat(request)
.isEqualTo(
FindMissingBlobsRequest.newBuilder()
.addBlobDigests(fooDigest)
.addBlobDigests(barDigest)
.build());
assertThat(request.getBlobDigestsList()).containsExactly(fooDigest, barDigest);
// Nothing is missing.
responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
responseObserver.onCompleted();
Expand Down Expand Up @@ -460,7 +433,13 @@ public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
if (numErrors-- <= 0) {
responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
// Everything is missing.
responseObserver.onNext(
FindMissingBlobsResponse.newBuilder()
.addMissingBlobDigests(fooDigest)
.addMissingBlobDigests(barDigest)
.addMissingBlobDigests(bazDigest)
.build());
responseObserver.onCompleted();
} else {
responseObserver.onError(Status.UNAVAILABLE.asRuntimeException());
Expand Down Expand Up @@ -496,14 +475,59 @@ public void updateActionResult(
ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class);
serviceRegistry.addService(mockByteStreamImpl);
when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject()))
.thenAnswer(blobChunkedWriteAnswerError()) // Error out for foo.
.thenAnswer(blobChunkedWriteAnswer("x", 1)) // Upload bar successfully.
.thenAnswer(blobChunkedWriteAnswerError()) // Error out for baz.
.thenAnswer(blobChunkedWriteAnswer("xyz", 3)) // Retry foo successfully.
.thenAnswer(blobChunkedWriteAnswerError()) // Error out for baz again.
.thenAnswer(blobChunkedWriteAnswer("z", 1)); // Retry baz successfully.
.thenAnswer(
new Answer<StreamObserver<WriteRequest>>() {
private int numErrors = 4;

@Override
@SuppressWarnings("unchecked")
public StreamObserver<WriteRequest> answer(InvocationOnMock invocation) {
StreamObserver<WriteResponse> responseObserver =
(StreamObserver<WriteResponse>) invocation.getArguments()[0];
return new StreamObserver<WriteRequest>() {
@Override
public void onNext(WriteRequest request) {
numErrors--;
if (numErrors >= 0) {
responseObserver.onError(Status.UNAVAILABLE.asRuntimeException());
return;
}
assertThat(request.getFinishWrite()).isTrue();
String resourceName = request.getResourceName();
String dataStr = request.getData().toStringUtf8();
int size = 0;
if (resourceName.contains(fooDigest.getHash())) {
assertThat(dataStr).isEqualTo("xyz");
size = 3;
} else if (resourceName.contains(barDigest.getHash())) {
assertThat(dataStr).isEqualTo("x");
size = 1;
} else if (resourceName.contains(bazDigest.getHash())) {
assertThat(dataStr).isEqualTo("z");
size = 1;
} else {
fail("Unexpected resource name in upload: " + resourceName);
}
responseObserver.onNext(
WriteResponse.newBuilder().setCommittedSize(size).build());
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}

@Override
public void onError(Throwable t) {
fail("An error occurred: " + t);
}
};
}
});
client.upload(actionKey, execRoot, ImmutableList.<Path>of(fooFile, barFile, bazFile), outErr);
// 4 times for the errors, 3 times for the successful uploads.
Mockito.verify(mockByteStreamImpl, Mockito.times(7))
.write(Mockito.<StreamObserver<WriteResponse>>anyObject());
}

@Test
Expand Down

0 comments on commit 688dbf7

Please sign in to comment.