Skip to content

Commit

Permalink
[FLINK-5465] [streaming] Wait for pending timer threads to finish or …
Browse files Browse the repository at this point in the history
…to exceed a time limit in exceptional stream task shutdown
  • Loading branch information
StefanRRichter committed Nov 23, 2017
1 parent 200612e commit 3c6f0f3
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 2 deletions.
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* Defines the current processing time and handles all related actions,
Expand Down Expand Up @@ -93,4 +94,15 @@ public abstract class ProcessingTimeService {
* will result in a hard exception.
*/
public abstract void shutdownService();

/**
* Shuts down and clean up the timer service provider hard and immediately. This does wait
* for all timer to complete or until the time limit is exceeded. Any call to
* {@link #registerTimer(long, ProcessingTimeCallback)} will result in a hard exception after calling this method.
* @param time time to wait for termination.
* @param timeUnit time unit of parameter time.
* @return {@code true} if this timer service and all pending timers are terminated and
* {@code false} if the timeout elapsed before this happened.
*/
public abstract boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException;
}
Expand Up @@ -69,6 +69,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -122,6 +123,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
/** The logger used by the StreamTask and its subclasses. */
private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);

private static final long TIMER_SERVICE_TERMINATION_AWAIT_MS = 7500L;

// ------------------------------------------------------------------------

/**
Expand Down Expand Up @@ -319,9 +322,17 @@ public final void invoke() throws Exception {
isRunning = false;

// stop all timers and threads
if (timerService != null) {
if (timerService != null && !timerService.isTerminated()) {
try {
timerService.shutdownService();

// wait for a reasonable time for all pending timer threads to finish
boolean timerShutdownComplete =
timerService.shutdownAndAwaitPending(TIMER_SERVICE_TERMINATION_AWAIT_MS, TimeUnit.MILLISECONDS);

if (!timerShutdownComplete) {
LOG.warn("Timer service shutdown exceeded time limit while waiting for pending timers. " +
"Will continue with shutdown procedure.");
}
}
catch (Throwable t) {
// catch and log the exception to not replace the original exception
Expand Down
Expand Up @@ -191,6 +191,12 @@ public void shutdownService() {
}
}

@Override
public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
shutdownService();
return timerService.awaitTermination(time, timeUnit);
}

// safety net to destroy the thread pool
@Override
protected void finalize() throws Throwable {
Expand Down
Expand Up @@ -134,6 +134,12 @@ public void shutdownService() {
this.isTerminated = true;
}

@Override
public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
shutdownService();
return true;
}

public int getNumActiveTimers() {
int count = 0;

Expand Down
Expand Up @@ -22,11 +22,13 @@
import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -442,4 +444,67 @@ public void onProcessingTime(long timestamp) throws Exception {
latch.await();
assertTrue(exceptionWasThrown.get());
}

@Test
public void testShutdownAndWaitPending() {

final Object lock = new Object();
final OneShotLatch waitUntilTimerStarted = new OneShotLatch();
final OneShotLatch blockUntilTerminationInterrupts = new OneShotLatch();
final OneShotLatch blockUntilTriggered = new OneShotLatch();
final AtomicBoolean check = new AtomicBoolean(true);

final SystemProcessingTimeService timeService = new SystemProcessingTimeService(
(message, exception) -> {
},
lock);

timeService.scheduleAtFixedRate(
timestamp -> {

waitUntilTimerStarted.trigger();

try {
blockUntilTerminationInterrupts.await();
check.set(false);
} catch (InterruptedException ignore) {
}

try {
blockUntilTriggered.await();
} catch (InterruptedException ignore) {
check.set(false);
}
},
0L,
10L);

try {
waitUntilTimerStarted.await();
} catch (InterruptedException e) {
Assert.fail();
}

Assert.assertFalse(timeService.isTerminated());

// Check that we wait for the timer to terminate. As the timer blocks on the second latch, this should time out.
try {
Assert.assertFalse(timeService.shutdownAndAwaitPending(1, TimeUnit.SECONDS));
} catch (InterruptedException e) {
Assert.fail("Unexpected interruption.");
}

// Let the timer proceed.
blockUntilTriggered.trigger();

// Now we should succeed in terminating the timer.
try {
Assert.assertTrue(timeService.shutdownAndAwaitPending(60, TimeUnit.SECONDS));
} catch (InterruptedException e) {
Assert.fail("Unexpected interruption.");
}

Assert.assertTrue(check.get());
Assert.assertTrue(timeService.isTerminated());
}
}

0 comments on commit 3c6f0f3

Please sign in to comment.