Skip to content

Commit

Permalink
Remote: Async upload (Part 7)
Browse files Browse the repository at this point in the history
Add --experimental_remote_cache_async flag, which when enabled, makes the uploads happen in the background.

Part of #13655.

PiperOrigin-RevId: 395020967
  • Loading branch information
coeuvre authored and Copybara-Service committed Sep 6, 2021
1 parent 862fd5e commit 7f08b78
Show file tree
Hide file tree
Showing 15 changed files with 356 additions and 219 deletions.
2 changes: 0 additions & 2 deletions src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,12 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/authandtls",
"//src/main/java/com/google/devtools/build/lib/bazel/repository/downloader",
"//src/main/java/com/google/devtools/build/lib/buildeventstream",
"//src/main/java/com/google/devtools/build/lib/collect",
"//src/main/java/com/google/devtools/build/lib/collect/nestedset",
"//src/main/java/com/google/devtools/build/lib/concurrent",
"//src/main/java/com/google/devtools/build/lib/events",
"//src/main/java/com/google/devtools/build/lib/exec:abstract_spawn_strategy",
"//src/main/java/com/google/devtools/build/lib/exec:execution_options",
"//src/main/java/com/google/devtools/build/lib/exec:executor_builder",
"//src/main/java/com/google/devtools/build/lib/exec:executor_lifecycle_listener",
"//src/main/java/com/google/devtools/build/lib/exec:module_action_context_registry",
"//src/main/java/com/google/devtools/build/lib/exec:remote_local_fallback_registry",
"//src/main/java/com/google/devtools/build/lib/exec:spawn_cache",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.devtools.build.lib.actions.ActionGraph;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.exec.ExecutionOptions;
import com.google.devtools.build.lib.exec.ExecutorLifecycleListener;
import com.google.devtools.build.lib.exec.ModuleActionContextRegistry;
import com.google.devtools.build.lib.exec.SpawnCache;
import com.google.devtools.build.lib.exec.SpawnStrategyRegistry;
Expand All @@ -35,11 +32,10 @@
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.runtime.CommandEnvironment;
import com.google.devtools.build.lib.vfs.Path;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/** Provides a remote execution context. */
final class RemoteActionContextProvider implements ExecutorLifecycleListener {
final class RemoteActionContextProvider {

private final CommandEnvironment env;
@Nullable private final RemoteCache cache;
Expand Down Expand Up @@ -120,8 +116,12 @@ private RemoteExecutionService getRemoteExecutionService() {
workingDirectory.getRelative(remoteOptions.remoteCaptureCorruptedOutputs);
}

boolean verboseFailures =
checkNotNull(env.getOptions().getOptions(ExecutionOptions.class)).verboseFailures;
remoteExecutionService =
new RemoteExecutionService(
env.getReporter(),
verboseFailures,
env.getExecRoot(),
createRemotePathResolver(),
env.getBuildRequestId(),
Expand All @@ -132,6 +132,7 @@ private RemoteExecutionService getRemoteExecutionService() {
executor,
filesToDownload,
captureCorruptedOutputsDir);
env.getEventBus().register(remoteExecutionService);
}

return remoteExecutionService;
Expand Down Expand Up @@ -189,20 +190,16 @@ void setFilesToDownload(ImmutableSet<ActionInput> topLevelOutputs) {
this.filesToDownload = Preconditions.checkNotNull(topLevelOutputs, "filesToDownload");
}

@Override
public void executorCreated() {}

@Override
public void executionPhaseStarting(
ActionGraph actionGraph, Supplier<ImmutableSet<Artifact>> topLevelArtifacts) {}

@Override
public void executionPhaseEnding() {
if (cache != null) {
cache.close();
}
if (executor != null) {
executor.close();
public void afterCommand() {
if (remoteExecutionService != null) {
remoteExecutionService.shutdown();
} else {
if (cache != null) {
cache.release();
}
if (executor != null) {
executor.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,36 @@
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import io.netty.util.AbstractReferenceCounted;
import io.reactivex.rxjava3.core.Completable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** A cache for storing artifacts (input and output) as well as the output of running an action. */
/**
* A cache for storing artifacts (input and output) as well as the output of running an action.
*
* <p>The cache is reference counted. Initially, the reference count is 1. Use {@link #retain()} to
* increase and {@link #release()} to decrease the reference count respectively. Once the reference
* count is reached to 0, the underlying resources will be released (after network I/Os finished).
*
* <p>Use {@link #awaitTermination()} to wait for the underlying network I/Os to finish. Use {@link
* #shutdownNow()} to cancel all active network I/Os and reject new requests.
*/
@ThreadSafety.ThreadSafe
public class RemoteCache implements AutoCloseable {
public class RemoteCache extends AbstractReferenceCounted {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private static final ListenableFuture<Void> COMPLETED_SUCCESS = immediateFuture(null);
private static final ListenableFuture<byte[]> EMPTY_BYTES = immediateFuture(new byte[0]);

private final CountDownLatch closeCountDownLatch = new CountDownLatch(1);
protected final AsyncTaskCache.NoResult<Digest> casUploadCache = AsyncTaskCache.NoResult.create();

protected final RemoteCacheClient cacheProtocol;
Expand All @@ -95,7 +107,7 @@ public CachedActionResult downloadActionResult(
public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(
RemoteActionExecutionContext context, Iterable<Digest> digests) {
if (Iterables.isEmpty(digests)) {
return Futures.immediateFuture(ImmutableSet.of());
return immediateFuture(ImmutableSet.of());
}
return cacheProtocol.findMissingDigests(context, digests);
}
Expand Down Expand Up @@ -426,10 +438,34 @@ public final List<ListenableFuture<Void>> downloadOutErr(
return downloads;
}

/** Release resources associated with the cache. The cache may not be used after calling this. */
@Override
public void close() {
protected void deallocate() {
casUploadCache.shutdown();
cacheProtocol.close();

closeCountDownLatch.countDown();
}

@Override
public RemoteCache touch(Object o) {
return this;
}

@Override
public RemoteCache retain() {
super.retain();
return this;
}

/** Waits for active network I/Os to finish. */
public void awaitTermination() throws InterruptedException {
casUploadCache.awaitTermination();
closeCountDownLatch.await();
}

/** Shuts the cache down and cancels active network I/Os. */
public void shutdownNow() {
casUploadCache.shutdownNow();
}

public static FailureDetail createFailureDetail(String message, Code detailedCode) {
Expand Down
Loading

0 comments on commit 7f08b78

Please sign in to comment.