Skip to content

Commit

Permalink
Remote: Async upload (Part 5)
Browse files Browse the repository at this point in the history
Deduplicate calls to findMissingBlobs and file uploads. This is a different implementation to #13166.

Part of #13655.

Fixes #12113.

Closes #13166.

PiperOrigin-RevId: 395015753
  • Loading branch information
coeuvre authored and Copybara-Service committed Sep 6, 2021
1 parent 3ada002 commit db15e47
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import build.bazel.remote.execution.v2.Digest;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
Expand All @@ -31,7 +32,6 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.concurrent.ThreadSafety;
import com.google.devtools.build.lib.exec.SpawnProgressEvent;
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
import com.google.devtools.build.lib.remote.common.LazyFileOutputStream;
import com.google.devtools.build.lib.remote.common.OutputDigestMismatchException;
import com.google.devtools.build.lib.remote.common.ProgressStatusListener;
Expand All @@ -40,14 +40,17 @@
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CachedActionResult;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxFutures;
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution;
import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution.Code;
import com.google.devtools.build.lib.util.io.OutErr;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import io.reactivex.rxjava3.core.Completable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -62,15 +65,11 @@
public class RemoteCache implements AutoCloseable {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

/** See {@link SpawnExecutionContext#lockOutputFiles()}. */
@FunctionalInterface
interface OutputFilesLocker {
void lock() throws InterruptedException;
}

private static final ListenableFuture<Void> COMPLETED_SUCCESS = immediateFuture(null);
private static final ListenableFuture<byte[]> EMPTY_BYTES = immediateFuture(new byte[0]);

protected final AsyncTaskCache.NoResult<Digest> casUploadCache = AsyncTaskCache.NoResult.create();

protected final RemoteCacheClient cacheProtocol;
protected final RemoteOptions options;
protected final DigestUtil digestUtil;
Expand All @@ -88,11 +87,19 @@ public CachedActionResult downloadActionResult(
return getFromFuture(cacheProtocol.downloadActionResult(context, actionKey, inlineOutErr));
}

/**
* Returns a set of digests that the remote cache does not know about. The returned set is
* guaranteed to be a subset of {@code digests}.
*/
public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(
RemoteActionExecutionContext context, Iterable<Digest> digests) {
if (Iterables.isEmpty(digests)) {
return Futures.immediateFuture(ImmutableSet.of());
}
return cacheProtocol.findMissingDigests(context, digests);
}

/** Upload the action result to the remote cache. */
public ListenableFuture<Void> uploadActionResult(
RemoteActionExecutionContext context, ActionKey actionKey, ActionResult actionResult) {
return cacheProtocol.uploadActionResult(context, actionKey, actionResult);
Expand All @@ -101,6 +108,9 @@ public ListenableFuture<Void> uploadActionResult(
/**
* Upload a local file to the remote cache.
*
* <p>Trying to upload the same file multiple times concurrently, results in only one upload being
* performed.
*
* @param context the context for the action.
* @param digest the digest of the file.
* @param file the file to upload.
Expand All @@ -111,12 +121,20 @@ public final ListenableFuture<Void> uploadFile(
return COMPLETED_SUCCESS;
}

return cacheProtocol.uploadFile(context, digest, file);
Completable upload =
casUploadCache.executeIfNot(
digest,
RxFutures.toCompletable(
() -> cacheProtocol.uploadFile(context, digest, file), directExecutor()));
return RxFutures.toListenableFuture(upload);
}

/**
* Upload sequence of bytes to the remote cache.
*
* <p>Trying to upload the same BLOB multiple times concurrently, results in only one upload being
* performed.
*
* @param context the context for the action.
* @param digest the digest of the file.
* @param data the BLOB to upload.
Expand All @@ -127,7 +145,12 @@ public final ListenableFuture<Void> uploadBlob(
return COMPLETED_SUCCESS;
}

return cacheProtocol.uploadBlob(context, digest, data);
Completable upload =
casUploadCache.executeIfNot(
digest,
RxFutures.toCompletable(
() -> cacheProtocol.uploadBlob(context, digest, data), directExecutor()));
return RxFutures.toListenableFuture(upload);
}

public static void waitForBulkTransfer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.Directory;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxFutures;
import com.google.protobuf.Message;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.subjects.AsyncSubject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** A {@link RemoteCache} with additional functionality needed for remote execution. */
public class RemoteExecutionCache extends RemoteCache {
Expand All @@ -57,16 +61,56 @@ public RemoteExecutionCache(
public void ensureInputsPresent(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs)
Map<Digest, Message> additionalInputs,
boolean force)
throws IOException, InterruptedException {
Iterable<Digest> allDigests =
Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet());
ImmutableSet<Digest> missingDigests =
getFromFuture(cacheProtocol.findMissingDigests(context, allDigests));
ImmutableSet<Digest> allDigests =
ImmutableSet.<Digest>builder()
.addAll(merkleTree.getAllDigests())
.addAll(additionalInputs.keySet())
.build();

// Collect digests that are not being or already uploaded
ConcurrentHashMap<Digest, AsyncSubject<Boolean>> missingDigestSubjects =
new ConcurrentHashMap<>();

List<ListenableFuture<Void>> uploadFutures = new ArrayList<>();
for (Digest missingDigest : missingDigests) {
uploadFutures.add(uploadBlob(context, missingDigest, merkleTree, additionalInputs));
for (Digest digest : allDigests) {
Completable upload =
casUploadCache.execute(
digest,
Completable.defer(
() -> {
// The digest hasn't been processed, add it to the collection which will be used
// later for findMissingDigests call
AsyncSubject<Boolean> missingDigestSubject = AsyncSubject.create();
missingDigestSubjects.put(digest, missingDigestSubject);

return missingDigestSubject.flatMapCompletable(
missing -> {
if (!missing) {
return Completable.complete();
}
return RxFutures.toCompletable(
() -> uploadBlob(context, digest, merkleTree, additionalInputs),
MoreExecutors.directExecutor());
});
}),
force);
uploadFutures.add(RxFutures.toListenableFuture(upload));
}

ImmutableSet<Digest> missingDigests =
getFromFuture(findMissingDigests(context, missingDigestSubjects.keySet()));
for (Map.Entry<Digest, AsyncSubject<Boolean>> entry : missingDigestSubjects.entrySet()) {
AsyncSubject<Boolean> missingSubject = entry.getValue();
if (missingDigests.contains(entry.getKey())) {
missingSubject.onNext(true);
} else {
// The digest is already existed in the remote cache, skip the upload.
missingSubject.onNext(false);
}
missingSubject.onComplete();
}

waitForBulkTransfer(uploadFutures, /* cancelRemainingOnInterrupt=*/ false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult)
*
* <p>Must be called before calling {@link #executeRemotely}.
*/
public void uploadInputsIfNotPresent(RemoteAction action)
public void uploadInputsIfNotPresent(RemoteAction action, boolean force)
throws IOException, InterruptedException {
checkState(mayBeExecutedRemotely(action.spawn), "spawn can't be executed remotely");

Expand All @@ -1049,7 +1049,7 @@ public void uploadInputsIfNotPresent(RemoteAction action)
additionalInputs.put(action.actionKey.getDigest(), action.action);
additionalInputs.put(action.commandHash, action.command);
remoteExecutionCache.ensureInputsPresent(
action.remoteActionExecutionContext, action.merkleTree, additionalInputs);
action.remoteActionExecutionContext, action.merkleTree, additionalInputs, force);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public ExecutionResult execute(
additionalInputs.put(actionDigest, action);
additionalInputs.put(commandHash, command);

remoteCache.ensureInputsPresent(context, merkleTree, additionalInputs);
remoteCache.ensureInputsPresent(context, merkleTree, additionalInputs, /*force=*/ true);
}

try (SilentCloseable c =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,18 @@ public SpawnResult exec(Spawn spawn, SpawnExecutionContext context)
}

AtomicBoolean useCachedResult = new AtomicBoolean(acceptCachedResult);
AtomicBoolean forceUploadInput = new AtomicBoolean(false);
try {
return retrier.execute(
() -> {
// Upload the command and all the inputs into the remote cache.
try (SilentCloseable c = prof.profile(UPLOAD_TIME, "upload missing inputs")) {
Duration networkTimeStart = action.getNetworkTime().getDuration();
Stopwatch uploadTime = Stopwatch.createStarted();
remoteExecutionService.uploadInputsIfNotPresent(action);
// Upon retry, we force upload inputs
remoteExecutionService.uploadInputsIfNotPresent(
action, forceUploadInput.getAndSet(true));

// subtract network time consumed here to ensure wall clock during upload is not
// double
// counted, and metrics time computation does not exceed total time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public void onError(Throwable t) {
});

// Upload all missing inputs (that is, the virtual action input from above)
client.ensureInputsPresent(context, merkleTree, ImmutableMap.of());
client.ensureInputsPresent(context, merkleTree, ImmutableMap.of(), /*force=*/ true);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.io.IOException;
import java.util.Map;

class InMemoryRemoteCache extends RemoteCache {
class InMemoryRemoteCache extends RemoteExecutionCache {

InMemoryRemoteCache(
Map<Digest, byte[]> casEntries, RemoteOptions options, DigestUtil digestUtil) {
Expand Down Expand Up @@ -74,6 +74,10 @@ int getNumFailedDownloads() {
return ((InMemoryCacheClient) cacheProtocol).getNumFailedDownloads();
}

Map<Digest, Integer> getNumFindMissingDigests() {
return ((InMemoryCacheClient) cacheProtocol).getNumFindMissingDigests();
}

@Override
public void close() {
cacheProtocol.close();
Expand Down
Loading

0 comments on commit db15e47

Please sign in to comment.