diff --git a/src/main/java/com/google/devtools/build/lib/actions/FileUploadEvent.java b/src/main/java/com/google/devtools/build/lib/actions/FileUploadEvent.java new file mode 100644 index 00000000000000..1da2fbf0bc9a1f --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/actions/FileUploadEvent.java @@ -0,0 +1,29 @@ +// Copyright 2021 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.actions; + +import com.google.auto.value.AutoValue; +import com.google.devtools.build.lib.events.ExtendedEventHandler.ProgressLike; + +/** Notifications for the upload of a file to the remote server. */ +@AutoValue +public abstract class FileUploadEvent implements ProgressLike { + + public static FileUploadEvent create(boolean finished) { + return new AutoValue_FileUploadEvent(finished); + } + + /** Whether the upload progress reported about is finished already. */ + public abstract boolean finished(); +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java index 9599434575de60..c6f2f8f3a27e82 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java @@ -137,6 +137,7 @@ private RemoteExecutionService getRemoteExecutionService() { cache, executor, filesToDownload); + env.getEventBus().register(remoteExecutionService); } return remoteExecutionService; @@ -196,7 +197,7 @@ void setFilesToDownload(ImmutableSet topLevelOutputs) { public void afterCommand() { if (remoteExecutionService != null) { - remoteExecutionService.close(); + remoteExecutionService.shutdown(); } } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java index 02ad2dbbc21f41..2ad9f88a9f88e1 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java @@ -106,7 +106,16 @@ import java.util.stream.StreamSupport; import javax.annotation.Nullable; -/** A cache for storing artifacts (input and output) as well as the output of running an action. */ +/** + * A cache for storing artifacts (input and output) as well as the output of running an action. + * + *

The cache is reference counted. Initially, the reference count is 1. Use {@link #retain()} to + * increase and {@link #release()} to decrease the reference count respectively. Once the reference + * count is reached to 0, the underlying resources will be released (after network I/Os finished). + * + *

Use {@link #awaitTermination()} to wait for the underlying network I/Os to finish. Use {@link + * #shutdownNow()} to cancel all active network I/Os and reject new requests. + */ @ThreadSafety.ThreadSafe public class RemoteCache extends AbstractReferenceCounted { private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); @@ -953,10 +962,19 @@ protected void deallocate() { cacheProtocol.close(); } + public boolean isClosed() { + return closed.get(); + } + + /** Cancels all active network I/Os and rejects new requests. */ public void shutdownNow() { uploadCache.shutdownNow(); } + /** + * Waits the cache to terminate. Only returns if a) the internal reference count is reached to 0 + * and b) All network I/Os are finished. + */ public void awaitTermination() throws InterruptedException { try { uploadCache.awaitTermination().blockingAwait(); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java index e0a5779f9d918c..ad7b165f723a8d 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java @@ -43,14 +43,17 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; +import com.google.common.eventbus.Subscribe; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.Artifact; import com.google.devtools.build.lib.actions.ExecException; +import com.google.devtools.build.lib.actions.FileUploadEvent; import com.google.devtools.build.lib.actions.ForbiddenActionInputException; import com.google.devtools.build.lib.actions.Spawn; import com.google.devtools.build.lib.actions.Spawns; import com.google.devtools.build.lib.actions.UserExecException; import com.google.devtools.build.lib.analysis.platform.PlatformUtils; +import com.google.devtools.build.lib.buildtool.buildevent.BuildInterruptedEvent; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext; @@ -105,7 +108,7 @@ * cache and execution with spawn specific types. */ public class RemoteExecutionService { - private final AtomicBoolean closed = new AtomicBoolean(false); + private final AtomicBoolean shutdown = new AtomicBoolean(false); private final Reporter reporter; private final boolean verboseFailures; private final Path execRoot; @@ -393,7 +396,7 @@ public ExecuteResponse getResponse() { @Nullable public RemoteActionResult lookupCache(RemoteAction action) throws IOException, InterruptedException { - checkState(!closed.get(), "closed"); + checkState(!shutdown.get(), "shutdown"); checkNotNull(remoteCache, "remoteCache can't be null"); ActionResult actionResult = remoteCache.downloadActionResult( @@ -410,7 +413,7 @@ public RemoteActionResult lookupCache(RemoteAction action) @Nullable public InMemoryOutput downloadOutputs(RemoteAction action, RemoteActionResult result) throws InterruptedException, IOException, ExecException { - checkState(!closed.get(), "closed"); + checkState(!shutdown.get(), "shutdown"); checkNotNull(remoteCache, "remoteCache can't be null"); RemoteOutputsMode remoteOutputsMode = remoteOptions.remoteOutputsMode; @@ -721,7 +724,7 @@ private Completable uploadOutputs(RemoteActionExecutionContext context, UploadMa /** Upload outputs of a remote action which was executed locally to remote cache. */ public void uploadOutputs(RemoteAction action) throws IOException, ExecException { - checkState(!closed.get(), "closed"); + checkState(!shutdown.get(), "shutdown"); checkNotNull(remoteCache, "remoteCache can't be null"); Collection outputFiles = action.spawn.getOutputFiles().stream() @@ -765,7 +768,16 @@ public void uploadOutputs(RemoteAction action) Completable completable = Completable.concatArray(uploadOutputsCompletable, uploadActionResultCompletable); - Completable.using(remoteCache::retain, r -> completable, RemoteCache::release) + Completable.using( + () -> { + reporter.post(FileUploadEvent.create(false)); + return remoteCache.retain(); + }, + r -> completable, + remoteCache -> { + reporter.post(FileUploadEvent.create(true)); + remoteCache.release(); + }) .subscribeOn(Schedulers.io()) .subscribe(reportUploadErrorObserver); } @@ -788,7 +800,7 @@ public void onError(@NonNull Throwable e) { private void reportUploadError(Throwable error) { if (remoteCacheInterrupted) { - // Ignore errors that are caused by manually interrupt + // If we interrupt manually, ignore cancellation errors. if (error instanceof CancellationException) { return; } @@ -819,7 +831,7 @@ private void reportUploadError(Throwable error) { */ public void uploadInputsIfNotPresent(RemoteAction action) throws IOException, InterruptedException { - checkState(!closed.get(), "closed"); + checkState(!shutdown.get(), "shutdown"); checkNotNull(remoteCache, "remoteCache can't be null"); checkState(remoteCache instanceof RemoteExecutionCache); @@ -841,7 +853,7 @@ public void uploadInputsIfNotPresent(RemoteAction action) public RemoteActionResult execute( RemoteAction action, boolean acceptCachedResult, OperationObserver observer) throws IOException, InterruptedException { - checkState(!closed.get(), "closed"); + checkState(!shutdown.get(), "shutdown"); checkNotNull(remoteExecutor, "remoteExecutor can't be null"); ExecuteRequest.Builder requestBuilder = @@ -876,7 +888,7 @@ public static class ServerLogs { /** Downloads server logs from a remotely executed action if any. */ public ServerLogs maybeDownloadServerLogs(RemoteAction action, ExecuteResponse resp, Path logDir) throws InterruptedException, IOException { - checkState(!closed.get(), "closed"); + checkState(!shutdown.get(), "shutdown"); checkNotNull(remoteCache, "remoteCache can't be null"); ServerLogs serverLogs = new ServerLogs(); @@ -901,20 +913,35 @@ public ServerLogs maybeDownloadServerLogs(RemoteAction action, ExecuteResponse r return serverLogs; } - public void close() { - if (!closed.compareAndSet(false, true)) { + @Subscribe + public void buildInterrupted(BuildInterruptedEvent event) { + remoteCacheInterrupted = true; + } + + /** + * Shuts the service down. Wait for active network I/O to finish but new requests are rejected. + */ + public void shutdown() { + if (!shutdown.compareAndSet(false, true)) { return; } if (remoteCache != null) { remoteCache.release(); + + if (remoteCacheInterrupted) { + Thread.currentThread().interrupt(); + } + try { remoteCache.awaitTermination(); } catch (InterruptedException e) { remoteCacheInterrupted = true; - reporter.handle(Event.warn("Upload interrupted")); + reporter.handle(Event.warn("remote cache interrupted")); remoteCache.shutdownNow(); } + + checkState(remoteCache.isClosed(), "remote cache is not closed properly."); } if (remoteExecutor != null) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/AsyncTaskCache.java b/src/main/java/com/google/devtools/build/lib/remote/util/AsyncTaskCache.java index 465cf9fb858037..eccf0ab26d6812 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/AsyncTaskCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/AsyncTaskCache.java @@ -304,7 +304,7 @@ public void onError(@NonNull Throwable e) { } /** - * Shuts the cache down. Any in progress tasks will continue running while new requests will be + * Shuts the cache down. Any in progress tasks will continue running while new tasks will be * injected with {@link CancellationException}. */ public void shutdown() { @@ -340,7 +340,8 @@ public Completable awaitTermination() { } /** - * Shuts the cache down. Any in progress tasks will be cancelled with {@link CancellationException}. + * Shuts the cache down. All in progress and new tasks will be cancelled with {@link + * CancellationException}. */ public void shutdownNow() { shutdown(); @@ -405,14 +406,25 @@ public int getSubscriberCount(KeyT key) { return cache.getSubscriberCount(key); } + /** + * Shuts the cache down. Any in progress tasks will continue running while new tasks will be + * injected with {@link CancellationException}. + */ public void shutdown() { cache.shutdown(); } + /** + * Returns a {@link Completable} which will complete once all the in progress tasks finished. + */ public Completable awaitTermination() { return cache.awaitTermination(); } + /** + * Shuts the cache down. All in progress and active tasks will be cancelled with {@link + * CancellationException}. + */ public void shutdownNow() { cache.shutdownNow(); } diff --git a/src/main/java/com/google/devtools/build/lib/runtime/UiEventHandler.java b/src/main/java/com/google/devtools/build/lib/runtime/UiEventHandler.java index 1c56e04ed90ca8..80d3c654c86dc4 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/UiEventHandler.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/UiEventHandler.java @@ -24,6 +24,7 @@ import com.google.devtools.build.lib.actions.ActionScanningCompletedEvent; import com.google.devtools.build.lib.actions.ActionStartedEvent; import com.google.devtools.build.lib.actions.CachingActionEvent; +import com.google.devtools.build.lib.actions.FileUploadEvent; import com.google.devtools.build.lib.actions.RunningActionEvent; import com.google.devtools.build.lib.actions.ScanningActionEvent; import com.google.devtools.build.lib.actions.SchedulingActionEvent; @@ -567,9 +568,8 @@ public void buildComplete(BuildCompleteEvent event) { ignoreRefreshLimitOnce(); refresh(); - // After a build has completed, only stop updating the UI if there is no more BEP - // upload happening. - if (stateTracker.pendingTransports() == 0) { + // After a build has completed, only stop updating the UI if there is no more activities. + if (stateTracker.shouldStopUpdateProgressBar()) { buildRunning = false; done = true; } @@ -721,6 +721,13 @@ public void testFilteringComplete(TestFilteringCompleteEvent event) { refresh(); } + @Subscribe + @AllowConcurrentEvents + public void fileUpload(FileUploadEvent event) { + stateTracker.fileUpload(event); + refreshSoon(); + } + /** * Return true, if the test summary provides information that is both worth being shown in the * scroll-back buffer and new with respect to the alreay shown failure messages. @@ -789,7 +796,7 @@ public void buildEventTransportClosed(BuildEventTransportClosedEvent event) { this.handle(Event.info(null, "Transport " + event.transport().name() + " closed")); } - if (stateTracker.pendingTransports() == 0) { + if (stateTracker.shouldStopUpdateProgressBar()) { stopUpdateThread(); flushStdOutStdErrBuffers(); ignoreRefreshLimitOnce(); diff --git a/src/main/java/com/google/devtools/build/lib/runtime/UiStateTracker.java b/src/main/java/com/google/devtools/build/lib/runtime/UiStateTracker.java index c709eb36316840..9cd3579f05bf2a 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/UiStateTracker.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/UiStateTracker.java @@ -30,6 +30,7 @@ import com.google.devtools.build.lib.actions.ActionStartedEvent; import com.google.devtools.build.lib.actions.Artifact; import com.google.devtools.build.lib.actions.CachingActionEvent; +import com.google.devtools.build.lib.actions.FileUploadEvent; import com.google.devtools.build.lib.actions.RunningActionEvent; import com.google.devtools.build.lib.actions.ScanningActionEvent; import com.google.devtools.build.lib.actions.SchedulingActionEvent; @@ -383,6 +384,9 @@ synchronized String describe() { private final Set bepOpenTransports = new HashSet<>(); // The point in time when closing of BEP transports was started. private long bepTransportClosingStartTimeMillis; + private long remoteCacheWaitStartTimeMillis; + private final AtomicInteger remoteCacheUploads = new AtomicInteger(0); + private final AtomicInteger remoteCacheDownloads = new AtomicInteger(0); UiStateTracker(Clock clock, int targetWidth) { this.activeActions = new ConcurrentHashMap<>(); @@ -464,6 +468,7 @@ void buildComplete(BuildCompleteEvent event) { buildComplete = true; // Build event protocol transports are closed right after the build complete event. bepTransportClosingStartTimeMillis = clock.currentTimeMillis(); + remoteCacheWaitStartTimeMillis = clock.currentTimeMillis(); if (event.getResult().getSuccess()) { status = "INFO"; @@ -963,6 +968,14 @@ synchronized void testFilteringComplete(TestFilteringCompleteEvent event) { } } + void fileUpload(FileUploadEvent event) { + if (event.finished()) { + remoteCacheUploads.decrementAndGet(); + } else { + remoteCacheUploads.incrementAndGet(); + } + } + public synchronized void testSummary(TestSummary summary) { completedTests++; mostRecentTest = summary; @@ -980,8 +993,11 @@ synchronized void buildEventTransportClosed(BuildEventTransportClosedEvent event bepOpenTransports.remove(event.transport()); } - synchronized int pendingTransports() { - return bepOpenTransports.size(); + synchronized boolean shouldStopUpdateProgressBar() { + return buildComplete + && bepOpenTransports.size() == 0 + && remoteCacheUploads.get() == 0 + && remoteCacheDownloads.get() == 0; } /** @@ -996,6 +1012,9 @@ boolean progressBarTimeDependent() { if (runningDownloads.size() >= 1) { return true; } + if (buildComplete && (remoteCacheUploads.get() != 0 || remoteCacheDownloads.get() != 0)) { + return true; + } if (buildComplete && !bepOpenTransports.isEmpty()) { return true; } @@ -1158,6 +1177,55 @@ private void maybeReportBepTransports(PositionAwareAnsiTerminalWriter terminalWr } } + /** + * Display any remote cache network I/Os that are still active after the build. Most likely, + * because uploading/downloading takes longer than the build itself. + */ + private void maybeReportActiveRemoteCacheIOs(PositionAwareAnsiTerminalWriter terminalWriter) + throws IOException { + if (!buildComplete) { + return; + } + + int uploads = remoteCacheUploads.get(); + int downloads = remoteCacheDownloads.get(); + + if (uploads == 0 && downloads == 0) { + return; + } + + long sinceSeconds = + MILLISECONDS.toSeconds(clock.currentTimeMillis() - remoteCacheWaitStartTimeMillis); + if (sinceSeconds == 0) { + // Special case for when bazel was interrupted, in which case we don't want to have a message. + return; + } + String waitSecs = "; " + sinceSeconds + "s"; + + String message = "Waiting for remote cache: "; + if (uploads != 0) { + if (uploads == 1) { + message += "1 upload"; + } else { + message += uploads + " uploads"; + } + } + + if (downloads != 0) { + if (uploads != 0) { + message += ", "; + } + + if (downloads == 1) { + message += "1 download"; + } else { + message += downloads + " downloads"; + } + } + + terminalWriter.newline().append(message + waitSecs); + } + synchronized void writeProgressBar( AnsiTerminalWriter rawTerminalWriter, boolean shortVersion, String timestamp) throws IOException { @@ -1187,6 +1255,7 @@ synchronized void writeProgressBar( } if (!shortVersion) { reportOnDownloads(terminalWriter); + maybeReportActiveRemoteCacheIOs(terminalWriter); maybeReportBepTransports(terminalWriter); } return; @@ -1259,6 +1328,7 @@ synchronized void writeProgressBar( } if (!shortVersion) { reportOnDownloads(terminalWriter); + maybeReportActiveRemoteCacheIOs(terminalWriter); maybeReportBepTransports(terminalWriter); } }