Skip to content

Commit

Permalink
Remote: Fixes a confusion that background upload counter could increa…
Browse files Browse the repository at this point in the history
…se after build finished.

At the end of a build, the number of files waiting to be uploaded could increase as other ones finished. This PR fixes that.

Also, changes to only emit profile block `upload outputs` for blocking uploads.

Fixes #13655 (comment).

Closes #13954.

PiperOrigin-RevId: 398161750
  • Loading branch information
coeuvre authored and Copybara-Service committed Sep 22, 2021
1 parent de0c6bd commit 003e2d0
Show file tree
Hide file tree
Showing 16 changed files with 158 additions and 193 deletions.
82 changes: 13 additions & 69 deletions src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
import com.google.devtools.build.lib.concurrent.ThreadSafety;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.exec.SpawnProgressEvent;
import com.google.devtools.build.lib.remote.common.BulkTransferException;
import com.google.devtools.build.lib.remote.common.LazyFileOutputStream;
Expand Down Expand Up @@ -66,7 +62,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

/**
* A cache for storing artifacts (input and output) as well as the output of running an action.
Expand All @@ -85,7 +80,6 @@ public class RemoteCache extends AbstractReferenceCounted {
private static final ListenableFuture<Void> COMPLETED_SUCCESS = immediateFuture(null);
private static final ListenableFuture<byte[]> EMPTY_BYTES = immediateFuture(new byte[0]);

private final ExtendedEventHandler reporter;
private final CountDownLatch closeCountDownLatch = new CountDownLatch(1);
protected final AsyncTaskCache.NoResult<Digest> casUploadCache = AsyncTaskCache.NoResult.create();

Expand All @@ -94,11 +88,9 @@ public class RemoteCache extends AbstractReferenceCounted {
protected final DigestUtil digestUtil;

public RemoteCache(
ExtendedEventHandler reporter,
RemoteCacheClient cacheProtocol,
RemoteOptions options,
DigestUtil digestUtil) {
this.reporter = reporter;
this.cacheProtocol = cacheProtocol;
this.options = options;
this.digestUtil = digestUtil;
Expand All @@ -110,23 +102,6 @@ public CachedActionResult downloadActionResult(
return getFromFuture(cacheProtocol.downloadActionResult(context, actionKey, inlineOutErr));
}

private void postUploadStartedEvent(@Nullable ActionExecutionMetadata action, String resourceId) {
if (action == null) {
return;
}

reporter.post(ActionUploadStartedEvent.create(action, resourceId));
}

private void postUploadFinishedEvent(
@Nullable ActionExecutionMetadata action, String resourceId) {
if (action == null) {
return;
}

reporter.post(ActionUploadFinishedEvent.create(action, resourceId));
}

/**
* Returns a set of digests that the remote cache does not know about. The returned set is
* guaranteed to be a subset of {@code digests}.
Expand All @@ -143,38 +118,14 @@ public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(
public ListenableFuture<Void> uploadActionResult(
RemoteActionExecutionContext context, ActionKey actionKey, ActionResult actionResult) {

ActionExecutionMetadata action = context.getSpawnOwner();

Completable upload =
Completable.using(
() -> {
String resourceId = "ac/" + actionKey.getDigest().getHash();
postUploadStartedEvent(action, resourceId);
return resourceId;
},
resourceId ->
RxFutures.toCompletable(
() -> cacheProtocol.uploadActionResult(context, actionKey, actionResult),
directExecutor()),
resourceId -> postUploadFinishedEvent(action, resourceId));
RxFutures.toCompletable(
() -> cacheProtocol.uploadActionResult(context, actionKey, actionResult),
directExecutor());

return RxFutures.toListenableFuture(upload);
}

private Completable doUploadFile(RemoteActionExecutionContext context, Digest digest, Path file) {
ActionExecutionMetadata action = context.getSpawnOwner();
return Completable.using(
() -> {
String resourceId = "cas/" + digest.getHash();
postUploadStartedEvent(action, resourceId);
return resourceId;
},
resourceId ->
RxFutures.toCompletable(
() -> cacheProtocol.uploadFile(context, digest, file), directExecutor()),
resourceId -> postUploadFinishedEvent(action, resourceId));
}

/**
* Upload a local file to the remote cache.
*
Expand All @@ -191,26 +142,15 @@ public final ListenableFuture<Void> uploadFile(
return COMPLETED_SUCCESS;
}

Completable upload = casUploadCache.executeIfNot(digest, doUploadFile(context, digest, file));
Completable upload =
casUploadCache.executeIfNot(
digest,
RxFutures.toCompletable(
() -> cacheProtocol.uploadFile(context, digest, file), directExecutor()));

return RxFutures.toListenableFuture(upload);
}

private Completable doUploadBlob(
RemoteActionExecutionContext context, Digest digest, ByteString data) {
ActionExecutionMetadata action = context.getSpawnOwner();
return Completable.using(
() -> {
String resourceId = "cas/" + digest.getHash();
postUploadStartedEvent(action, resourceId);
return resourceId;
},
resourceId ->
RxFutures.toCompletable(
() -> cacheProtocol.uploadBlob(context, digest, data), directExecutor()),
resourceId -> postUploadFinishedEvent(action, resourceId));
}

/**
* Upload sequence of bytes to the remote cache.
*
Expand All @@ -227,7 +167,11 @@ public final ListenableFuture<Void> uploadBlob(
return COMPLETED_SUCCESS;
}

Completable upload = casUploadCache.executeIfNot(digest, doUploadBlob(context, digest, data));
Completable upload =
casUploadCache.executeIfNot(
digest,
RxFutures.toCompletable(
() -> cacheProtocol.uploadBlob(context, digest, data), directExecutor()));

return RxFutures.toListenableFuture(upload);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
Expand All @@ -43,11 +42,10 @@
public class RemoteExecutionCache extends RemoteCache {

public RemoteExecutionCache(
ExtendedEventHandler reporter,
RemoteCacheClient protocolImpl,
RemoteOptions options,
DigestUtil digestUtil) {
super(reporter, protocolImpl, options, digestUtil);
super(protocolImpl, options, digestUtil);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.DirectoryMetadata;
import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.FileMetadata;
Expand Down Expand Up @@ -1070,7 +1071,8 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult)
Single.using(
remoteCache::retain,
remoteCache ->
manifest.uploadAsync(action.getRemoteActionExecutionContext(), remoteCache),
manifest.uploadAsync(
action.getRemoteActionExecutionContext(), remoteCache, reporter),
RemoteCache::release)
.subscribeOn(scheduler)
.subscribe(
Expand All @@ -1087,7 +1089,10 @@ public void onError(@NonNull Throwable e) {
}
});
} else {
manifest.upload(action.getRemoteActionExecutionContext(), remoteCache);
try (SilentCloseable c =
Profiler.instance().profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) {
manifest.upload(action.getRemoteActionExecutionContext(), remoteCache, reporter);
}
}
} catch (IOException e) {
reportUploadError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,7 @@ private void initHttpAndDiskCache(
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
}
RemoteCache remoteCache =
new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteCaching(
executorService, env, remoteCache, /* retryScheduler= */ null, digestUtil);
Expand Down Expand Up @@ -573,7 +572,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
}
execChannel.release();
RemoteExecutionCache remoteCache =
new RemoteExecutionCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
new RemoteExecutionCache(cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteExecution(
executorService,
Expand Down Expand Up @@ -609,8 +608,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
}
}

RemoteCache remoteCache =
new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteCaching(
executorService, env, remoteCache, retryScheduler, digestUtil);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,7 @@ public void store(SpawnResult result) throws ExecException, InterruptedException
}
}

try (SilentCloseable c = prof.profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) {
remoteExecutionService.uploadOutputs(action, result);
}
remoteExecutionService.uploadOutputs(action, result);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,7 @@ SpawnResult execLocallyAndUpload(
}
}

try (SilentCloseable c = Profiler.instance().profile(UPLOAD_TIME, "upload outputs")) {
remoteExecutionService.uploadOutputs(action, result);
}
remoteExecutionService.uploadOutputs(action, result);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,20 @@
import build.bazel.remote.execution.v2.Tree;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import com.google.devtools.build.lib.remote.common.RemotePathResolver;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxUtils;
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution;
import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution.Code;
Expand All @@ -56,6 +62,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/** UploadManifest adds output metadata to a {@link ActionResult}. */
Expand Down Expand Up @@ -341,10 +348,11 @@ ActionResult getActionResult() {
}

/** Uploads outputs and action result (if exit code is 0) to remote cache. */
public ActionResult upload(RemoteActionExecutionContext context, RemoteCache remoteCache)
public ActionResult upload(
RemoteActionExecutionContext context, RemoteCache remoteCache, ExtendedEventHandler reporter)
throws IOException, InterruptedException {
try {
return uploadAsync(context, remoteCache).blockingGet();
return uploadAsync(context, remoteCache, reporter).blockingGet();
} catch (RuntimeException e) {
throwIfInstanceOf(e.getCause(), InterruptedException.class);
throwIfInstanceOf(e.getCause(), IOException.class);
Expand All @@ -368,29 +376,91 @@ private Completable upload(
return toCompletable(() -> remoteCache.uploadBlob(context, digest, blob), directExecutor());
}

private static void reportUploadStarted(
ExtendedEventHandler reporter,
@Nullable ActionExecutionMetadata action,
String prefix,
Iterable<Digest> digests) {
if (action != null) {
for (Digest digest : digests) {
reporter.post(ActionUploadStartedEvent.create(action, prefix + digest.getHash()));
}
}
}

private static void reportUploadFinished(
ExtendedEventHandler reporter,
@Nullable ActionExecutionMetadata action,
String resourceIdPrefix,
Iterable<Digest> digests) {
if (action != null) {
for (Digest digest : digests) {
reporter.post(
ActionUploadFinishedEvent.create(action, resourceIdPrefix + digest.getHash()));
}
}
}

/**
* Returns a {@link Single} which upon subscription will upload outputs and action result (if exit
* code is 0) to remote cache.
*/
public Single<ActionResult> uploadAsync(
RemoteActionExecutionContext context, RemoteCache remoteCache) {
RemoteActionExecutionContext context,
RemoteCache remoteCache,
ExtendedEventHandler reporter) {
Collection<Digest> digests = new ArrayList<>();
digests.addAll(digestToFile.keySet());
digests.addAll(digestToBlobs.keySet());

Completable uploadOutputs =
mergeBulkTransfer(
toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor())
.flatMapPublisher(Flowable::fromIterable)
.flatMapSingle(digest -> toTransferResult(upload(context, remoteCache, digest))));
ActionExecutionMetadata action = context.getSpawnOwner();

String outputPrefix = "cas/";
Flowable<RxUtils.TransferResult> bulkTransfers =
toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor())
.doOnSubscribe(d -> reportUploadStarted(reporter, action, outputPrefix, digests))
.doOnError(error -> reportUploadFinished(reporter, action, outputPrefix, digests))
.doOnDispose(() -> reportUploadFinished(reporter, action, outputPrefix, digests))
.doOnSuccess(
missingDigests -> {
List<Digest> existedDigests =
digests.stream()
.filter(digest -> !missingDigests.contains(digest))
.collect(Collectors.toList());
reportUploadFinished(reporter, action, outputPrefix, existedDigests);
})
.flatMapPublisher(Flowable::fromIterable)
.flatMapSingle(
digest ->
toTransferResult(upload(context, remoteCache, digest))
.doFinally(
() ->
reportUploadFinished(
reporter, action, outputPrefix, ImmutableList.of(digest))));
Completable uploadOutputs = mergeBulkTransfer(bulkTransfers);

ActionResult actionResult = result.build();
Completable uploadActionResult = Completable.complete();
if (actionResult.getExitCode() == 0 && actionKey != null) {
String actionResultPrefix = "ac/";
uploadActionResult =
toCompletable(
() -> remoteCache.uploadActionResult(context, actionKey, actionResult),
directExecutor());
() -> remoteCache.uploadActionResult(context, actionKey, actionResult),
directExecutor())
.doOnSubscribe(
d ->
reportUploadStarted(
reporter,
action,
actionResultPrefix,
ImmutableList.of(actionKey.getDigest())))
.doFinally(
() ->
reportUploadFinished(
reporter,
action,
actionResultPrefix,
ImmutableList.of(actionKey.getDigest())));
}

return Completable.concatArray(uploadOutputs, uploadActionResult).toSingleDefault(actionResult);
Expand Down
Loading

0 comments on commit 003e2d0

Please sign in to comment.