diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java index b16be3bbd4c0cf..c09934743d9036 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java @@ -142,6 +142,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; @@ -164,6 +165,7 @@ public class RemoteExecutionService { @Nullable private final Path captureCorruptedOutputsDir; private final Cache merkleTreeCache; private final Set reportedErrors = new HashSet<>(); + private final Phaser backgroundTaskPhaser = new Phaser(1); private final Scheduler scheduler; @@ -1160,13 +1162,18 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult) .subscribe( new SingleObserver() { @Override - public void onSubscribe(@NonNull Disposable d) {} + public void onSubscribe(@NonNull Disposable d) { + backgroundTaskPhaser.register(); + } @Override - public void onSuccess(@NonNull ActionResult actionResult) {} + public void onSuccess(@NonNull ActionResult actionResult) { + backgroundTaskPhaser.arriveAndDeregister(); + } @Override public void onError(@NonNull Throwable e) { + backgroundTaskPhaser.arriveAndDeregister(); reportUploadError(e); } }); @@ -1300,7 +1307,7 @@ public void shutdown() { remoteCache.release(); try { - remoteCache.awaitTermination(); + backgroundTaskPhaser.awaitAdvanceInterruptibly(backgroundTaskPhaser.arrive()); } catch (InterruptedException e) { buildInterrupted.set(true); remoteCache.shutdownNow();