Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

fix: Watchdog does not does not wait for executor to shutdown on awaitTermination #1884

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 25 additions & 4 deletions gax/src/main/java/com/google/api/gax/rpc/Watchdog.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
import java.util.Map.Entry;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -66,7 +68,7 @@ public final class Watchdog implements Runnable, BackgroundResource {
// Dummy value to convert the ConcurrentHashMap into a Set
private static Object PRESENT = new Object();
private final ConcurrentHashMap<WatchdogStream, Object> openStreams = new ConcurrentHashMap<>();

private final Phaser phaser;
private final ApiClock clock;
private final Duration scheduleInterval;
private final ScheduledExecutorService executor;
Expand All @@ -84,6 +86,8 @@ private Watchdog(ApiClock clock, Duration scheduleInterval, ScheduledExecutorSer
this.clock = Preconditions.checkNotNull(clock, "clock can't be null");
this.scheduleInterval = scheduleInterval;
this.executor = executor;
// Register the main thread
this.phaser = new Phaser(1);
}

private void start() {
Expand Down Expand Up @@ -113,10 +117,15 @@ public <ResponseT> ResponseObserver<ResponseT> watch(

@Override
public void run() {
// Register the current thread
phaser.register();
try {
runUnsafe();
} catch (Throwable t) {
LOG.log(Level.SEVERE, "Caught throwable in periodic Watchdog run. Continuing.", t);
} finally {
// Unregister the current thread
phaser.arriveAndDeregister();
}
}

Expand All @@ -134,26 +143,37 @@ private void runUnsafe() {
@Override
public void shutdown() {
future.cancel(false);
// Unregister the main thread
phaser.arriveAndDeregister();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to avoid double calls:

if (future.cancel(true)) {
phaser.arriveAndDeregister();

}

}

@Override
public boolean isShutdown() {
return executor.isShutdown();
return future.isCancelled();
}

@Override
public boolean isTerminated() {
return executor.isTerminated();
return phaser.isTerminated();
}

@Override
public void shutdownNow() {
future.cancel(true);
// Unregister the main thread
phaser.arriveAndDeregister();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to avoid double calls:

if (future.cancel(true)) {
phaser.arriveAndDeregister();

}

}

@Override
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
return executor.awaitTermination(duration, unit);
try {
// Default phase is 0, this method wait until all parties arrive and unregister, then
// terminate the Phaser
phaser.awaitAdvanceInterruptibly(0, duration, unit);
return true;
} catch (TimeoutException e) {
return false;
}
}

@Override
Expand All @@ -173,6 +193,7 @@ enum State {
}

class WatchdogStream<ResponseT> extends StateCheckingResponseObserver<ResponseT> {

private final Object lock = new Object();

private final Duration waitTimeout;
Expand Down
20 changes: 20 additions & 0 deletions gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,26 @@ public void testTimedOutBeforeStart() throws InterruptedException {
assertThat(error).isInstanceOf(WatchdogTimeoutException.class);
}

@Test
public void awaitTermination_shouldReturnFalseIfShutDownIsNotCalledFirst() throws Exception {
watchdog.watch(new AccumulatingObserver<>(), waitTime, idleTime);
watchdog.watch(new AccumulatingObserver<>(), waitTime, idleTime);
boolean awaitTermination = watchdog.awaitTermination(1000, TimeUnit.MILLISECONDS);
assertThat(awaitTermination).isFalse();
}

@Test
public void awaitTermination_shouldReturnTrue() throws Exception {
watchdog.watch(new AccumulatingObserver<>(), waitTime, idleTime);
watchdog.watch(new AccumulatingObserver<>(), waitTime, idleTime);
// Make sure the run() method is run before calling shutdown()
Thread.sleep(2000);
watchdog.shutdown();
boolean awaitTermination = watchdog.awaitTermination(1000, TimeUnit.MILLISECONDS);
assertThat(awaitTermination).isTrue();
assertThat(watchdog.isTerminated()).isTrue();
}

@Test
public void testMultiple() throws Exception {
// Start stream1
Expand Down