Skip to content

Commit

Permalink
Remote: Async upload (Part 6)
Browse files Browse the repository at this point in the history
Add RxUtils#mergeBulkTransfer which is similar to waitForBulkTransfer but is used with Rx.

Add UploadManifest#uploadAsync and update UploadManifest#upload to use it.

Part of #13655.

PiperOrigin-RevId: 395018504
  • Loading branch information
coeuvre authored and Copybara-Service committed Sep 6, 2021
1 parent db15e47 commit 862fd5e
Show file tree
Hide file tree
Showing 14 changed files with 507 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.google.devtools.build.lib.remote;

import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.devtools.build.lib.remote.common.BulkTransferException;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.Durations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.BulkTransferException;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.concurrent.ThreadSafety;
import com.google.devtools.build.lib.exec.SpawnProgressEvent;
import com.google.devtools.build.lib.remote.common.BulkTransferException;
import com.google.devtools.build.lib.remote.common.LazyFileOutputStream;
import com.google.devtools.build.lib.remote.common.OutputDigestMismatchException;
import com.google.devtools.build.lib.remote.common.ProgressStatusListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.DirectoryMetadata;
import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.FileMetadata;
import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.SymlinkMetadata;
import com.google.devtools.build.lib.remote.common.BulkTransferException;
import com.google.devtools.build.lib.remote.common.NetworkTime;
import com.google.devtools.build.lib.remote.common.OperationObserver;
import com.google.devtools.build.lib.remote.common.OutputDigestMismatchException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.RemoteExecutionService.RemoteAction;
import com.google.devtools.build.lib.remote.RemoteExecutionService.RemoteActionResult;
import com.google.devtools.build.lib.remote.common.BulkTransferException;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.Utils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.google.devtools.build.lib.remote.RemoteExecutionService.RemoteAction;
import com.google.devtools.build.lib.remote.RemoteExecutionService.RemoteActionResult;
import com.google.devtools.build.lib.remote.RemoteExecutionService.ServerLogs;
import com.google.devtools.build.lib.remote.common.BulkTransferException;
import com.google.devtools.build.lib.remote.common.OperationObserver;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.Utils;
Expand Down
105 changes: 61 additions & 44 deletions src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.devtools.build.lib.remote.RemoteCache.waitForBulkTransfer;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle;
import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult;

import build.bazel.remote.execution.v2.Action;
import build.bazel.remote.execution.v2.ActionResult;
Expand All @@ -24,9 +28,6 @@
import build.bazel.remote.execution.v2.Tree;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
Expand All @@ -45,6 +46,9 @@
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.Symlinks;
import com.google.protobuf.ByteString;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -54,9 +58,7 @@
import java.util.Map;
import javax.annotation.Nullable;

/**
* UploadManifest adds output metadata to a {@link ActionResult}.
*/
/** UploadManifest adds output metadata to a {@link ActionResult}. */
public class UploadManifest {

private final DigestUtil digestUtil;
Expand Down Expand Up @@ -140,7 +142,7 @@ private void setStdoutStderr(FileOutErr outErr) throws IOException {
* non-directory descendant files.
*/
@VisibleForTesting
public void addFiles(Collection<Path> files) throws ExecException, IOException {
void addFiles(Collection<Path> files) throws ExecException, IOException {
for (Path file : files) {
// TODO(ulfjack): Maybe pass in a SpawnResult here, add a list of output files to that, and
// rely on the local spawn runner to stat the files, instead of statting here.
Expand Down Expand Up @@ -199,9 +201,7 @@ private void addAction(RemoteCacheClient.ActionKey actionKey, Action action, Com
digestToBlobs.put(action.getCommandDigest(), command.toByteString());
}

/**
* Map of digests to file paths to upload.
*/
/** Map of digests to file paths to upload. */
public Map<Digest, Path> getDigestToFile() {
return digestToFile;
}
Expand Down Expand Up @@ -292,15 +292,11 @@ private Directory computeDirectory(Path path, Tree.Builder tree)
FileStatus statFollow = child.statIfFound(Symlinks.FOLLOW);
if (statFollow == null) {
throw new IOException(
String.format(
"Action output %s is a dangling symbolic link to %s ", child, target));
String.format("Action output %s is a dangling symbolic link to %s ", child, target));
}
if (statFollow.isFile() && !statFollow.isSpecialFile()) {
Digest digest = digestUtil.compute(child);
b.addFilesBuilder()
.setName(name)
.setDigest(digest)
.setIsExecutable(child.isExecutable());
b.addFilesBuilder().setName(name).setDigest(digest).setIsExecutable(child.isExecutable());
digestToFile.put(digest, child);
} else if (statFollow.isDirectory()) {
Directory dir = computeDirectory(child, tree);
Expand Down Expand Up @@ -330,10 +326,11 @@ private void illegalOutput(Path what) throws ExecException {
+ "Change the file type or use --remote_allow_symlink_upload.",
remotePathResolver.localPathToOutputPath(what), kind);

FailureDetail failureDetail = FailureDetail.newBuilder()
.setMessage(message)
.setRemoteExecution(RemoteExecution.newBuilder().setCode(Code.ILLEGAL_OUTPUT))
.build();
FailureDetail failureDetail =
FailureDetail.newBuilder()
.setMessage(message)
.setRemoteExecution(RemoteExecution.newBuilder().setCode(Code.ILLEGAL_OUTPUT))
.build();
throw new UserExecException(failureDetail);
}

Expand All @@ -345,36 +342,56 @@ ActionResult getActionResult() {
/** Uploads outputs and action result (if exit code is 0) to remote cache. */
public ActionResult upload(RemoteActionExecutionContext context, RemoteCache remoteCache)
throws IOException, InterruptedException {
Map<Digest, Path> digestToFile = getDigestToFile();
Map<Digest, ByteString> digestToBlobs = getDigestToBlobs();
try {
return uploadAsync(context, remoteCache).blockingGet();
} catch (RuntimeException e) {
throwIfInstanceOf(e.getCause(), InterruptedException.class);
throwIfInstanceOf(e.getCause(), IOException.class);
throw e;
}
}

private Completable upload(
RemoteActionExecutionContext context, RemoteCache remoteCache, Digest digest) {
Path file = digestToFile.get(digest);
if (file != null) {
return toCompletable(() -> remoteCache.uploadFile(context, digest, file), directExecutor());
}

ByteString blob = digestToBlobs.get(digest);
if (blob == null) {
String message = "FindMissingBlobs call returned an unknown digest: " + digest;
return Completable.error(new IOException(message));
}

return toCompletable(() -> remoteCache.uploadBlob(context, digest, blob), directExecutor());
}

/**
* Returns a {@link Single} which upon subscription will upload outputs and action result (if exit
* code is 0) to remote cache.
*/
public Single<ActionResult> uploadAsync(
RemoteActionExecutionContext context, RemoteCache remoteCache) {
Collection<Digest> digests = new ArrayList<>();
digests.addAll(digestToFile.keySet());
digests.addAll(digestToBlobs.keySet());

ImmutableSet<Digest> digestsToUpload =
getFromFuture(remoteCache.findMissingDigests(context, digests));
ImmutableList.Builder<ListenableFuture<Void>> uploads = ImmutableList.builder();
for (Digest digest : digestsToUpload) {
Path file = digestToFile.get(digest);
if (file != null) {
uploads.add(remoteCache.uploadFile(context, digest, file));
} else {
ByteString blob = digestToBlobs.get(digest);
if (blob == null) {
String message = "FindMissingBlobs call returned an unknown digest: " + digest;
throw new IOException(message);
}
uploads.add(remoteCache.uploadBlob(context, digest, blob));
}
}

waitForBulkTransfer(uploads.build(), /* cancelRemainingOnInterrupt=*/ false);
Completable uploadOutputs =
mergeBulkTransfer(
toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor())
.flatMapPublisher(Flowable::fromIterable)
.flatMapSingle(digest -> toTransferResult(upload(context, remoteCache, digest))));

ActionResult actionResult = result.build();
Completable uploadActionResult = Completable.complete();
if (actionResult.getExitCode() == 0 && actionKey != null) {
getFromFuture(remoteCache.uploadActionResult(context, actionKey, actionResult));
uploadActionResult =
toCompletable(
() -> remoteCache.uploadActionResult(context, actionKey, actionResult),
directExecutor());
}

return actionResult;
return Completable.concatArray(uploadOutputs, uploadActionResult).toSingleDefault(actionResult);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;
package com.google.devtools.build.lib.remote.common;

import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import java.io.IOException;

/**
Expand All @@ -22,13 +21,13 @@
* a trace point for the actual transfer, so that the intended operation can be observed in a stack,
* with all constituent exceptions available for observation.
*/
class BulkTransferException extends IOException {
public class BulkTransferException extends IOException {
// true since no empty BulkTransferException is ever thrown
private boolean allCacheNotFoundException = true;

BulkTransferException() {}
public BulkTransferException() {}

BulkTransferException(IOException e) {
public BulkTransferException(IOException e) {
add(e);
}

Expand All @@ -38,16 +37,16 @@ class BulkTransferException extends IOException {
* <p>The Java standard addSuppressed is final and this method stands in its place to selectively
* filter and record whether all suppressed exceptions are CacheNotFoundExceptions
*/
void add(IOException e) {
public void add(IOException e) {
allCacheNotFoundException &= e instanceof CacheNotFoundException;
super.addSuppressed(e);
}

boolean onlyCausedByCacheNotFoundException() {
public boolean onlyCausedByCacheNotFoundException() {
return allCacheNotFoundException;
}

static boolean isOnlyCausedByCacheNotFoundException(Exception e) {
public static boolean isOnlyCausedByCacheNotFoundException(Exception e) {
return e instanceof BulkTransferException
&& ((BulkTransferException) e).onlyCausedByCacheNotFoundException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.util;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.util.concurrent.AbstractFuture;
Expand All @@ -24,6 +25,9 @@
import io.reactivex.rxjava3.core.CompletableEmitter;
import io.reactivex.rxjava3.core.CompletableObserver;
import io.reactivex.rxjava3.core.CompletableOnSubscribe;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleEmitter;
import io.reactivex.rxjava3.core.SingleOnSubscribe;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import java.util.concurrent.Callable;
Expand All @@ -42,7 +46,7 @@ private RxFutures() {}
* Returns a {@link Completable} that is complete once the supplied {@link ListenableFuture} has
* completed.
*
* <p>A {@link ListenableFuture>} represents some computation that is already in progress. We use
* <p>A {@link ListenableFuture} represents some computation that is already in progress. We use
* {@link Callable} here to defer the execution of the thing that produces ListenableFuture until
* there is subscriber.
*
Expand Down Expand Up @@ -110,6 +114,77 @@ public void onFailure(Throwable throwable) {
}
}

/**
* Returns a {@link Single} that is complete once the supplied {@link ListenableFuture} has
* completed.
*
* <p>A {@link ListenableFuture} represents some computation that is already in progress. We use
* {@link Callable} here to defer the execution of the thing that produces ListenableFuture until
* there is subscriber.
*
* <p>Errors are also propagated except for certain "fatal" exceptions defined by rxjava. Multiple
* subscriptions are not allowed.
*
* <p>Disposes the Single to cancel the underlying ListenableFuture.
*/
public static <T> Single<T> toSingle(Callable<ListenableFuture<T>> callable, Executor executor) {
return Single.create(new OnceSingleOnSubscribe<>(callable, executor));
}

private static class OnceSingleOnSubscribe<T> implements SingleOnSubscribe<T> {
private final AtomicBoolean subscribed = new AtomicBoolean(false);

private final Callable<ListenableFuture<T>> callable;
private final Executor executor;

private OnceSingleOnSubscribe(Callable<ListenableFuture<T>> callable, Executor executor) {
this.callable = callable;
this.executor = executor;
}

@Override
public void subscribe(@NonNull SingleEmitter<T> emitter) throws Throwable {
try {
checkState(!subscribed.getAndSet(true), "This single cannot be subscribed to twice");
ListenableFuture<T> future = callable.call();
Futures.addCallback(
future,
new FutureCallback<T>() {
@Override
public void onSuccess(@Nullable T t) {
checkNotNull(t, "value in future onSuccess callback is null");
emitter.onSuccess(t);
}

@Override
public void onFailure(Throwable throwable) {
/*
* CancellationException can be thrown in two cases:
* 1. The ListenableFuture itself is cancelled.
* 2. Single is disposed by downstream.
*
* This check is used to prevent propagating CancellationException to downstream
* when it has already disposed the Single.
*/
if (throwable instanceof CancellationException && emitter.isDisposed()) {
return;
}

emitter.onError(throwable);
}
},
executor);
emitter.setCancellable(() -> future.cancel(true));
} catch (Throwable t) {
// We failed to construct and listen to the LF. Following RxJava's own behaviour, prefer
// to pass RuntimeExceptions and Errors down to the subscriber except for certain
// "fatal" exceptions.
Exceptions.throwIfFatal(t);
executor.execute(() -> emitter.onError(t));
}
}
}

/**
* Returns a {@link ListenableFuture} that is complete once the {@link Completable} has completed.
*
Expand Down
Loading

0 comments on commit 862fd5e

Please sign in to comment.