Skip to content

Commit

Permalink
Move BEP upload timeout management to the BES module.
Browse files Browse the repository at this point in the history
Timeout management for the BEP transports is currently very ad-hoc. For the FileTransports we don't have a timeout at all and for the BES transport we create a separate future that cancels the upload in case bes_timeout is reached.

With this CL we unify the timeout management by wrapping the close Futures returned by the streamer with a Futures.withTimeout each. The timeout we use is the one specified by BuildEventTransport#getTimeout. Notice that this CL keeps the preserves the old behavior by setting the timeout of BES transport to bes_timeout and Duration.ZERO (e.g. no timeout) for FileTransport.

Another thing to notice when Futures.withTimeout times out it wraps the TimeoutException with an ExecutionException so we need to add some special-casing in the BES module to detect that case an print the right message in the terminal (instead of a cryptic TimeoutException stack trace).

PiperOrigin-RevId: 245325738
  • Loading branch information
lfpino authored and Copybara-Service committed Apr 25, 2019
1 parent 21401a2 commit 82f5090
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 83 deletions.
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.buildeventservice;


import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand All @@ -25,6 +26,7 @@
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
Expand Down Expand Up @@ -71,7 +73,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand All @@ -86,9 +87,6 @@ public abstract class BuildEventServiceModule<BESOptionsT extends BuildEventServ
private static final Logger logger = Logger.getLogger(BuildEventServiceModule.class.getName());
private static final GoogleLogger googleLogger = GoogleLogger.forEnclosingClass();

private final AtomicReference<AbruptExitException> pendingAbruptExitException =
new AtomicReference<>();

private BuildEventProtocolOptions bepOptions;
private AuthAndTLSOptions authTlsOptions;
private BuildEventStreamOptions besStreamOptions;
Expand Down Expand Up @@ -160,6 +158,10 @@ private void cancelPendingUploads() {
halfCloseFuturesMap = ImmutableMap.of();
}

private static boolean isTimeoutException(ExecutionException e) {
return e.getCause() instanceof TimeoutException;
}

private void waitForPreviousInvocation() {
if (closeFuturesMap.isEmpty()) {
return;
Expand All @@ -184,15 +186,19 @@ private void waitForPreviousInvocation() {
getMaxWaitForPreviousInvocation().getSeconds());
cmdLineReporter.handle(Event.warn(msg));
googleLogger.atWarning().withCause(exception).log(msg);
} catch (ExecutionException exception) {
} catch (ExecutionException e) {
// Futures.withTimeout wraps the TimeoutException in an ExecutionException when the future
// times out.
String previousExceptionMsg =
isTimeoutException(e) ? "The Build Event Protocol upload timed out" : e.getMessage();
String msg =
String.format(
"Previous invocation failed to finish Build Event Protocol upload "
+ "with the following exception: '%s'. "
+ "Ignoring the failure and starting a new invocation...",
exception.getMessage());
previousExceptionMsg);
cmdLineReporter.handle(Event.warn(msg));
googleLogger.atWarning().withCause(exception).log(msg);
googleLogger.atWarning().withCause(e).log(msg);
} finally {
cancelPendingUploads();
}
Expand Down Expand Up @@ -289,13 +295,12 @@ public OutErr getOutputListener() {

private void forceShutdownBuildEventStreamer() {
streamer.close(AbortReason.INTERNAL);

closeFuturesMap = constructCloseFuturesMapWithTimeouts(streamer.getCloseFuturesMap());
try {
// TODO(b/130148250): Uninterruptibles.getUninterruptibly waits forever if no timeout is
// passed. We should fix this by waiting at most the value set by bes_timeout.
googleLogger.atInfo().log("Closing pending build event transports");
Uninterruptibles.getUninterruptibly(
Futures.allAsList(streamer.getCloseFuturesMap().values()));
Uninterruptibles.getUninterruptibly(Futures.allAsList(closeFuturesMap.values()));
} catch (ExecutionException e) {
googleLogger.atSevere().withCause(e).log("Failed to close a build event transport");
LoggingUtil.logToRemote(Level.SEVERE, "Failed to close a build event transport", e);
Expand All @@ -314,12 +319,6 @@ public void blazeShutdownOnCrash() {

@Override
public void blazeShutdown() {
AbruptExitException pendingException = pendingAbruptExitException.getAndSet(null);
if (pendingException != null) {
cancelPendingUploads();
return;
}

if (closeFuturesMap.isEmpty()) {
return;
}
Expand Down Expand Up @@ -380,6 +379,15 @@ private void waitForBuildEventTransportsToClose() throws AbruptExitException {
Uninterruptibles.getUninterruptibly(Futures.allAsList(closeFuturesMap.values()));
}
} catch (ExecutionException e) {
// Futures.withTimeout wraps the TimeoutException in an ExecutionException when the future
// times out.
if (isTimeoutException(e)) {
throw new AbruptExitException(
"The Build Event Protocol upload timed out",
ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
e);
}

Throwables.throwIfInstanceOf(e.getCause(), AbruptExitException.class);
throw new RuntimeException(
String.format(
Expand All @@ -394,9 +402,42 @@ private void waitForBuildEventTransportsToClose() throws AbruptExitException {
}
}

private static ImmutableMap<BuildEventTransport, ListenableFuture<Void>>
constructCloseFuturesMapWithTimeouts(
ImmutableMap<BuildEventTransport, ListenableFuture<Void>> bepTransportToCloseFuturesMap) {
ImmutableMap.Builder<BuildEventTransport, ListenableFuture<Void>> builder =
ImmutableMap.builder();

bepTransportToCloseFuturesMap.forEach(
(bepTransport, closeFuture) -> {
final ListenableFuture<Void> closeFutureWithTimeout;
if (bepTransport.getTimeout().isZero() || bepTransport.getTimeout().isNegative()) {
closeFutureWithTimeout = closeFuture;
} else {
final ScheduledExecutorService timeoutExecutor =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("bes-close-" + bepTransport.name() + "-%d")
.build());

closeFutureWithTimeout =
Futures.withTimeout(
closeFuture,
bepTransport.getTimeout().toMillis(),
TimeUnit.MILLISECONDS,
timeoutExecutor);
closeFutureWithTimeout.addListener(
() -> timeoutExecutor.shutdown(), MoreExecutors.directExecutor());
}
builder.put(bepTransport, closeFutureWithTimeout);
});

return builder.build();
}

private void closeBepTransports() throws AbruptExitException {
closeFuturesMap = streamer.getCloseFuturesMap();
halfCloseFuturesMap = streamer.getHalfClosedMap();
closeFuturesMap = constructCloseFuturesMapWithTimeouts(streamer.getCloseFuturesMap());
halfCloseFuturesMap = constructCloseFuturesMapWithTimeouts(streamer.getHalfClosedMap());
switch (besOptions.besUploadMode) {
case WAIT_FOR_UPLOAD_COMPLETE:
waitForBuildEventTransportsToClose();
Expand Down
Expand Up @@ -34,6 +34,7 @@
/** A {@link BuildEventTransport} that streams {@link BuildEvent}s to BuildEventService. */
public class BuildEventServiceTransport implements BuildEventTransport {
private final BuildEventServiceUploader besUploader;
private final Duration besTimeout;

private BuildEventServiceTransport(
BuildEventServiceClient besClient,
Expand All @@ -46,6 +47,7 @@ private BuildEventServiceTransport(
EventBus eventBus,
Duration closeTimeout,
Sleeper sleeper) {
this.besTimeout = closeTimeout;
this.besUploader =
new BuildEventServiceUploader.Builder()
.besClient(besClient)
Expand All @@ -57,7 +59,6 @@ private BuildEventServiceTransport(
.sleeper(sleeper)
.artifactGroupNamer(artifactGroupNamer)
.eventBus(eventBus)
.closeTimeout(closeTimeout)
.build();
}

Expand All @@ -81,6 +82,11 @@ public void sendBuildEvent(BuildEvent event) {
besUploader.enqueueEvent(event);
}

@Override
public Duration getTimeout() {
return besTimeout;
}

/** A builder for {@link BuildEventServiceTransport}. */
public static class Builder {
private BuildEventServiceClient besClient;
Expand Down
Expand Up @@ -15,7 +15,6 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_FAILED;
import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_SUCCEEDED;
import static com.google.devtools.build.v1.BuildStatus.Result.UNKNOWN_STATUS;
Expand Down Expand Up @@ -51,7 +50,6 @@
import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.util.AbruptExitException;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.LoggingUtil;
import com.google.devtools.build.lib.util.Sleeper;
import com.google.devtools.build.v1.BuildStatus.Result;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
Expand All @@ -62,14 +60,11 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
Expand Down Expand Up @@ -100,7 +95,6 @@ public final class BuildEventServiceUploader implements Runnable {
private final BuildEventServiceProtoUtil besProtoUtil;
private final BuildEventProtocolOptions buildEventProtocolOptions;
private final boolean publishLifecycleEvents;
private final Duration closeTimeout;
private final Sleeper sleeper;
private final Clock clock;
private final ArtifactGroupNamer namer;
Expand Down Expand Up @@ -136,9 +130,6 @@ public final class BuildEventServiceUploader implements Runnable {
@GuardedBy("lock")
private Thread uploadThread;

@GuardedBy("lock")
private boolean interruptCausedByTimeout;

@GuardedBy("lock")
private boolean interruptCausedByCancel;

Expand All @@ -150,7 +141,6 @@ private BuildEventServiceUploader(
BuildEventServiceProtoUtil besProtoUtil,
BuildEventProtocolOptions buildEventProtocolOptions,
boolean publishLifecycleEvents,
Duration closeTimeout,
Sleeper sleeper,
Clock clock,
ArtifactGroupNamer namer,
Expand All @@ -160,7 +150,6 @@ private BuildEventServiceUploader(
this.besProtoUtil = besProtoUtil;
this.buildEventProtocolOptions = buildEventProtocolOptions;
this.publishLifecycleEvents = publishLifecycleEvents;
this.closeTimeout = closeTimeout;
this.sleeper = sleeper;
this.clock = clock;
this.namer = namer;
Expand Down Expand Up @@ -215,10 +204,6 @@ public ListenableFuture<Void> close() {
// Enqueue the last event which will terminate the upload.
eventQueue.addLast(new SendLastBuildEventCommand(nextSeqNum.getAndIncrement(), currentTime()));

if (!closeTimeout.isZero()) {
startCloseTimer(closeFuture, closeTimeout);
}

final SettableFuture<Void> finalCloseFuture = closeFuture;
closeFuture.addListener(
() -> {
Expand All @@ -232,13 +217,6 @@ public ListenableFuture<Void> close() {
return closeFuture;
}

private void closeOnTimeout() {
synchronized (lock) {
interruptCausedByTimeout = true;
closeNow();
}
}

private void closeOnCancel() {
synchronized (lock) {
interruptCausedByCancel = true;
Expand Down Expand Up @@ -288,14 +266,7 @@ public void run() {
logger.info("Aborting the BES upload due to having received an interrupt");
synchronized (lock) {
Preconditions.checkState(
interruptCausedByTimeout || interruptCausedByCancel,
"Unexpected interrupt on BES uploader thread");
if (interruptCausedByTimeout) {
logAndExitAbruptly(
"The Build Event Protocol upload timed out",
ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
e);
}
interruptCausedByCancel, "Unexpected interrupt on BES uploader thread");
}
} catch (StatusException e) {
logAndExitAbruptly(
Expand Down Expand Up @@ -605,32 +576,6 @@ private void ensureUploadThreadStarted() {
}
}

private void startCloseTimer(ListenableFuture<Void> closeFuture, Duration closeTimeout) {
Thread closeTimer =
new Thread(
() -> {
// Call closeOnTimeout() if the future does not complete within closeTimeout
try {
getUninterruptibly(closeFuture, closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
closeOnTimeout();
} catch (ExecutionException e) {
if (e.getCause() instanceof TimeoutException) {
// This is likely due to an internal timeout doing the local file uploading.
closeOnTimeout();
} else {
// This code only cares about calling closeOnTimeout() if the closeFuture does
// not complete within closeTimeout.
String failureMsg = "BES close failure";
logger.severe(failureMsg);
LoggingUtil.logToRemote(Level.SEVERE, failureMsg, e);
}
}
},
"bes-uploader-close-timer");
closeTimer.start();
}

private PathConverter waitForLocalFileUploads(SendRegularBuildEventCommand orderedBuildEvent)
throws LocalFileUploadException, InterruptedException {
try {
Expand Down Expand Up @@ -711,7 +656,6 @@ static class Builder {
private BuildEventServiceProtoUtil besProtoUtil;
private BuildEventProtocolOptions bepOptions;
private boolean publishLifecycleEvents;
private Duration closeTimeout;
private Sleeper sleeper;
private Clock clock;
private ArtifactGroupNamer artifactGroupNamer;
Expand Down Expand Up @@ -742,11 +686,6 @@ Builder publishLifecycleEvents(boolean value) {
return this;
}

Builder closeTimeout(Duration value) {
this.closeTimeout = value;
return this;
}

Builder clock(Clock value) {
this.clock = value;
return this;
Expand Down Expand Up @@ -774,12 +713,10 @@ BuildEventServiceUploader build() {
checkNotNull(besProtoUtil),
checkNotNull(bepOptions),
publishLifecycleEvents,
checkNotNull(closeTimeout),
checkNotNull(sleeper),
checkNotNull(clock),
checkNotNull(artifactGroupNamer),
checkNotNull(eventBus));
}
}
}

Expand Up @@ -15,6 +15,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import java.time.Duration;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -72,6 +73,14 @@ default ListenableFuture<Void> getHalfCloseFuture() {
return close();
}

/**
* Returns how long a caller should wait for the transport to finish uploading events and closing
* gracefully. Setting the timeout to {@link Duration#ZERO} means that there's no timeout.
*/
default Duration getTimeout() {
return Duration.ZERO;
}

@VisibleForTesting
@Nullable
BuildEventArtifactUploader getUploader();
Expand Down

0 comments on commit 82f5090

Please sign in to comment.