From edaa6b57cdc7fb1fbdb25570f8be4b32e3564bea Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 14 Dec 2016 14:53:11 +0100 Subject: [PATCH] [FLINK-4973] Let LatencyMarksEmitter use StreamTask's ProcessingTimeService The LatencyMarksEmitter class uses now the StreamTask's ProcessingTimeService to schedule latency mark emission. For that the ProcessingTimeService was extended to have the method scheduleAtFixedRate to schedule repeated tasks. The latency mark emission is such a repeated task. This closes #3008. --- .../streaming/api/operators/StreamSource.java | 48 +++-- .../runtime/tasks/ProcessingTimeService.java | 10 + .../tasks/SystemProcessingTimeService.java | 68 +++++++ .../tasks/TestProcessingTimeService.java | 156 +++++++++------ .../HeapInternalTimerServiceTest.java | 36 ++-- .../operators/StreamSourceOperatorTest.java | 85 +++++--- .../TestProcessingTimeServiceTest.java | 6 +- .../SystemProcessingTimeServiceTest.java | 188 +++++++++++++++++- 8 files changed, 474 insertions(+), 123 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 5a16db0f3a1be..84330b6142468 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -23,11 +23,10 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; /** * {@link StreamOperator} for streaming sources. @@ -62,8 +61,12 @@ public void run(final Object lockingObject, final Output> coll LatencyMarksEmitter latencyEmitter = null; if(getExecutionConfig().isLatencyTrackingEnabled()) { - latencyEmitter = new LatencyMarksEmitter<>(lockingObject, collector, getExecutionConfig().getLatencyTrackingInterval(), - getOperatorConfig().getVertexID(), getRuntimeContext().getIndexOfThisSubtask()); + latencyEmitter = new LatencyMarksEmitter<>( + getProcessingTimeService(), + collector, + getExecutionConfig().getLatencyTrackingInterval(), + getOperatorConfig().getVertexID(), + getRuntimeContext().getIndexOfThisSubtask()); } final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(); @@ -121,28 +124,35 @@ protected boolean isCanceledOrStopped() { } private static class LatencyMarksEmitter { - private final ScheduledExecutorService scheduleExecutor; private final ScheduledFuture latencyMarkTimer; - public LatencyMarksEmitter(final Object lockingObject, final Output> output, long latencyTrackingInterval, final int vertexID, final int subtaskIndex) { - this.scheduleExecutor = Executors.newScheduledThreadPool(1); - this.latencyMarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - synchronized (lockingObject) { - output.emitLatencyMarker(new LatencyMarker(System.currentTimeMillis(), vertexID, subtaskIndex)); + public LatencyMarksEmitter( + final ProcessingTimeService processingTimeService, + final Output> output, + long latencyTrackingInterval, + final int vertexID, + final int subtaskIndex) { + + latencyMarkTimer = processingTimeService.scheduleAtFixedRate( + new ProcessingTimeCallback() { + @Override + public void onProcessingTime(long timestamp) throws Exception { + try { + // ProcessingTimeService callbacks are executed under the checkpointing lock + output.emitLatencyMarker(new LatencyMarker(timestamp, vertexID, subtaskIndex)); + } catch (Throwable t) { + // we catch the Throwables here so that we don't trigger the processing + // timer services async exception handler + LOG.warn("Error while emitting latency marker.", t); } - } catch (Throwable t) { - LOG.warn("Error while emitting latency marker", t); } - } - }, 0, latencyTrackingInterval, TimeUnit.MILLISECONDS); + }, + 0L, + latencyTrackingInterval); } public void close() { latencyMarkTimer.cancel(true); - scheduleExecutor.shutdownNow(); } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java index f64bead0c8225..240aba8026eb9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java @@ -55,6 +55,16 @@ public abstract class ProcessingTimeService { */ public abstract ScheduledFuture registerTimer(long timestamp, ProcessingTimeCallback target); + /** + * Registers a task to be executed repeatedly at a fixed rate. + * + * @param callback to be executed after the initial delay and then after each period + * @param initialDelay initial delay to start executing callback + * @param period after the initial delay after which the callback is executed + * @return Scheduled future representing the task to be executed repeatedly + */ + public abstract ScheduledFuture scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period); + /** * Returns true if the service has been shut down, false otherwise. */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java index 071dbcead4fa0..abcb19bc9a6d2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; import java.util.concurrent.BlockingQueue; @@ -123,6 +124,33 @@ else if (status == STATUS_SHUTDOWN) { } } + @Override + public ScheduledFuture scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) { + long nextTimestamp = getCurrentProcessingTime() + initialDelay; + + // we directly try to register the timer and only react to the status on exception + // that way we save unnecessary volatile accesses for each timer + try { + return timerService.scheduleAtFixedRate( + new RepeatedTriggerTask(task, checkpointLock, callback, nextTimestamp, period), + initialDelay, + period, + TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + final int status = this.status.get(); + if (status == STATUS_QUIESCED) { + return new NeverCompleteFuture(initialDelay); + } + else if (status == STATUS_SHUTDOWN) { + throw new IllegalStateException("Timer service is shut down"); + } + else { + // something else happened, so propagate the exception + throw e; + } + } + } + @Override public boolean isTerminated() { return status.get() == STATUS_SHUTDOWN; @@ -196,6 +224,46 @@ public void run() { } } + /** + * Internal task which is repeatedly called by the processing time service. + */ + private static final class RepeatedTriggerTask implements Runnable { + private final Object lock; + private final ProcessingTimeCallback target; + private final long period; + private final AsyncExceptionHandler exceptionHandler; + + private long nextTimestamp; + + private RepeatedTriggerTask( + AsyncExceptionHandler exceptionHandler, + Object lock, + ProcessingTimeCallback target, + long nextTimestamp, + long period) { + this.lock = Preconditions.checkNotNull(lock); + this.target = Preconditions.checkNotNull(target); + this.period = period; + this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler); + + this.nextTimestamp = nextTimestamp; + } + + @Override + public void run() { + try { + synchronized (lock) { + target.onProcessingTime(nextTimestamp); + } + + nextTimestamp += period; + } catch (Throwable t) { + TimerException asyncException = new TimerException(t); + exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException); + } + } + } + // ------------------------------------------------------------------------ private static final class NeverCompleteFuture implements ScheduledFuture { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java index 2ca287ad4ff42..3c33ad394ffcd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java @@ -17,18 +17,19 @@ package org.apache.flink.streaming.runtime.tasks; -import java.util.ArrayList; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Preconditions; + +import java.util.Comparator; import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; -import java.util.TreeMap; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; /** * This is a {@link ProcessingTimeService} used strictly for testing the @@ -36,38 +37,38 @@ * */ public class TestProcessingTimeService extends ProcessingTimeService { - private volatile long currentTime = 0; + private volatile long currentTime = 0L; private volatile boolean isTerminated; private volatile boolean isQuiesced; // sorts the timers by timestamp so that they are processed in the correct order. - private final Map> registeredTasks = new TreeMap<>(); + private final PriorityQueue> priorityQueue; + public TestProcessingTimeService() { + this.priorityQueue = new PriorityQueue<>(16, new Comparator>() { + @Override + public int compare(Tuple2 o1, Tuple2 o2) { + return Long.compare(o1.f0, o2.f0); + } + }); + } public void setCurrentTime(long timestamp) throws Exception { this.currentTime = timestamp; if (!isQuiesced) { - // decide which timers to fire and put them in a list - // we do not fire them here to be able to accommodate timers - // that register other timers. - - Iterator>> it = registeredTasks.entrySet().iterator(); - List>> toRun = new ArrayList<>(); - while (it.hasNext()) { - Map.Entry> t = it.next(); - if (t.getKey() <= this.currentTime) { - toRun.add(t); - it.remove(); - } - } - - // now do the actual firing. - for (Map.Entry> tasks: toRun) { - long now = tasks.getKey(); - for (ScheduledTimerFuture task: tasks.getValue()) { - task.getProcessingTimeCallback().onProcessingTime(now); + while (!priorityQueue.isEmpty() && currentTime >= priorityQueue.peek().f0) { + Tuple2 entry = priorityQueue.poll(); + + CallbackTask callbackTask = entry.f1; + + if (!callbackTask.isDone()) { + callbackTask.onProcessingTime(entry.f0); + + if (callbackTask instanceof PeriodicCallbackTask) { + priorityQueue.offer(Tuple2.of(((PeriodicCallbackTask)callbackTask).nextTimestamp(entry.f0), callbackTask)); + } } } } @@ -84,27 +85,38 @@ public ScheduledFuture registerTimer(long timestamp, ProcessingTimeCallback t throw new IllegalStateException("terminated"); } if (isQuiesced) { - return new ScheduledTimerFuture(null, -1); + return new CallbackTask(null); } + CallbackTask callbackTask = new CallbackTask(target); + if (timestamp <= currentTime) { try { - target.onProcessingTime(timestamp); + callbackTask.onProcessingTime(timestamp); } catch (Exception e) { throw new RuntimeException(e); } + } else { + priorityQueue.offer(Tuple2.of(timestamp, callbackTask)); } - ScheduledTimerFuture result = new ScheduledTimerFuture(target, timestamp); + return callbackTask; + } - List tasks = registeredTasks.get(timestamp); - if (tasks == null) { - tasks = new ArrayList<>(); - registeredTasks.put(timestamp, tasks); + @Override + public ScheduledFuture scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) { + if (isTerminated) { + throw new IllegalStateException("terminated"); } - tasks.add(result); + if (isQuiesced) { + return new CallbackTask(null); + } + + PeriodicCallbackTask periodicCallbackTask = new PeriodicCallbackTask(callback, period); - return result; + priorityQueue.offer(Tuple2.of(currentTime + initialDelay, periodicCallbackTask)); + + return periodicCallbackTask; } @Override @@ -116,7 +128,7 @@ public boolean isTerminated() { public void quiesceAndAwaitPending() { if (!isTerminated) { isQuiesced = true; - registeredTasks.clear(); + priorityQueue.clear(); } } @@ -125,35 +137,46 @@ public void shutdownService() { this.isTerminated = true; } - public int getNumRegisteredTimers() { + public int getNumActiveTimers() { int count = 0; - for (List tasks: registeredTasks.values()) { - count += tasks.size(); + + for (Tuple2 entry : priorityQueue) { + if (!entry.f1.isDone()) { + count++; + } } + return count; } - public Set getRegisteredTimerTimestamps() { + public Set getActiveTimerTimestamps() { Set actualTimestamps = new HashSet<>(); - for (List timerFutures : registeredTasks.values()) { - for (ScheduledTimerFuture timer : timerFutures) { - actualTimestamps.add(timer.getTimestamp()); + + for (Tuple2 entry : priorityQueue) { + if (!entry.f1.isDone()) { + actualTimestamps.add(entry.f0); } } + return actualTimestamps; } // ------------------------------------------------------------------------ - private class ScheduledTimerFuture implements ScheduledFuture { + private static class CallbackTask implements ScheduledFuture { - private final ProcessingTimeCallback processingTimeCallback; + protected final ProcessingTimeCallback processingTimeCallback; - private final long timestamp; + private AtomicReference state = new AtomicReference<>(CallbackTaskState.CREATED); - public ScheduledTimerFuture(ProcessingTimeCallback processingTimeCallback, long timestamp) { + private CallbackTask(ProcessingTimeCallback processingTimeCallback) { this.processingTimeCallback = processingTimeCallback; - this.timestamp = timestamp; + } + + public void onProcessingTime(long timestamp) throws Exception { + processingTimeCallback.onProcessingTime(timestamp); + + state.compareAndSet(CallbackTaskState.CREATED, CallbackTaskState.DONE); } @Override @@ -168,21 +191,17 @@ public int compareTo(Delayed o) { @Override public boolean cancel(boolean mayInterruptIfRunning) { - List scheduledTimerFutures = registeredTasks.get(timestamp); - if (scheduledTimerFutures != null) { - scheduledTimerFutures.remove(this); - } - return true; + return state.compareAndSet(CallbackTaskState.CREATED, CallbackTaskState.CANCELLED); } @Override public boolean isCancelled() { - throw new UnsupportedOperationException(); + return state.get() == CallbackTaskState.CANCELLED; } @Override public boolean isDone() { - throw new UnsupportedOperationException(); + return state.get() != CallbackTaskState.CREATED; } @Override @@ -195,12 +214,31 @@ public Object get(long timeout, TimeUnit unit) throws InterruptedException, Exec throw new UnsupportedOperationException(); } - public ProcessingTimeCallback getProcessingTimeCallback() { - return processingTimeCallback; + enum CallbackTaskState { + CREATED, + CANCELLED, + DONE + } + } + + private static class PeriodicCallbackTask extends CallbackTask { + + private final long period; + + private PeriodicCallbackTask(ProcessingTimeCallback processingTimeCallback, long period) { + super(processingTimeCallback); + Preconditions.checkArgument(period > 0L, "The period must be greater than 0."); + + this.period = period; + } + + @Override + public void onProcessingTime(long timestamp) throws Exception { + processingTimeCallback.onProcessingTime(timestamp); } - public long getTimestamp() { - return timestamp; + public long nextTimestamp(long currentTimestamp) { + return currentTimestamp + period; } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java index d753e4ede10b6..680f2accb7dff 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java @@ -176,8 +176,8 @@ public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception { assertEquals(2, timerService.numProcessingTimeTimers("hello")); assertEquals(3, timerService.numProcessingTimeTimers("ciao")); - assertEquals(1, processingTimeService.getNumRegisteredTimers()); - assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L)); + assertEquals(1, processingTimeService.getNumActiveTimers()); + assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(10L)); processingTimeService.setCurrentTime(10); @@ -185,8 +185,8 @@ public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception { assertEquals(1, timerService.numProcessingTimeTimers("hello")); assertEquals(2, timerService.numProcessingTimeTimers("ciao")); - assertEquals(1, processingTimeService.getNumRegisteredTimers()); - assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L)); + assertEquals(1, processingTimeService.getNumActiveTimers()); + assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(20L)); processingTimeService.setCurrentTime(20); @@ -194,18 +194,18 @@ public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception { assertEquals(0, timerService.numProcessingTimeTimers("hello")); assertEquals(1, timerService.numProcessingTimeTimers("ciao")); - assertEquals(1, processingTimeService.getNumRegisteredTimers()); - assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(30L)); + assertEquals(1, processingTimeService.getNumActiveTimers()); + assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(30L)); processingTimeService.setCurrentTime(30); assertEquals(0, timerService.numProcessingTimeTimers()); - assertEquals(0, processingTimeService.getNumRegisteredTimers()); + assertEquals(0, processingTimeService.getNumActiveTimers()); timerService.registerProcessingTimeTimer("ciao", 40); - assertEquals(1, processingTimeService.getNumRegisteredTimers()); + assertEquals(1, processingTimeService.getNumActiveTimers()); } /** @@ -233,15 +233,15 @@ public void testRegisterEarlierProcessingTimerMovesPhysicalProcessingTimer() thr assertEquals(1, timerService.numProcessingTimeTimers()); - assertEquals(1, processingTimeService.getNumRegisteredTimers()); - assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L)); + assertEquals(1, processingTimeService.getNumActiveTimers()); + assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(20L)); timerService.registerProcessingTimeTimer("ciao", 10); assertEquals(2, timerService.numProcessingTimeTimers()); - assertEquals(1, processingTimeService.getNumRegisteredTimers()); - assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L)); + assertEquals(1, processingTimeService.getNumActiveTimers()); + assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(10L)); } /** @@ -266,8 +266,8 @@ public void testRegisteringProcessingTimeTimerInOnProcessingTimeDoesNotLeakPhysi assertEquals(1, timerService.numProcessingTimeTimers()); - assertEquals(1, processingTimeService.getNumRegisteredTimers()); - assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L)); + assertEquals(1, processingTimeService.getNumActiveTimers()); + assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(10L)); doAnswer(new Answer() { @Override @@ -279,8 +279,8 @@ public Object answer(InvocationOnMock invocation) throws Exception { processingTimeService.setCurrentTime(10); - assertEquals(1, processingTimeService.getNumRegisteredTimers()); - assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L)); + assertEquals(1, processingTimeService.getNumActiveTimers()); + assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(20L)); doAnswer(new Answer() { @Override @@ -294,8 +294,8 @@ public Object answer(InvocationOnMock invocation) throws Exception { assertEquals(1, timerService.numProcessingTimeTimers()); - assertEquals(1, processingTimeService.getNumRegisteredTimers()); - assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(30L)); + assertEquals(1, processingTimeService.getNumActiveTimers()); + assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(30L)); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java index e6004202ee857..b153de9a138a3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java @@ -33,7 +33,6 @@ import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamSourceContexts; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -47,6 +46,7 @@ import org.mockito.stubbing.Answer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -181,42 +181,52 @@ public void run() { */ @Test public void testLatencyMarkEmission() throws Exception { - final long now = System.currentTimeMillis(); - final List output = new ArrayList<>(); + final long maxProcessingTime = 100L; + final long latencyMarkInterval = 10L; + + final TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService(); + testProcessingTimeService.setCurrentTime(0L); + final List processingTimes = Arrays.asList(1L, 10L, 11L, 21L, maxProcessingTime); + // regular stream source operator - final StoppableStreamSource> operator = - new StoppableStreamSource<>(new InfiniteSource()); + final StreamSource operator = + new StreamSource<>(new ProcessingTimeServiceSource(testProcessingTimeService, processingTimes)); // emit latency marks every 10 milliseconds. - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 10); - - // trigger an async cancel in a bit - new Thread("canceler") { - @Override - public void run() { - try { - Thread.sleep(200); - } catch (InterruptedException ignored) {} - operator.stop(); - } - }.start(); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, latencyMarkInterval, testProcessingTimeService); // run and wait to be stopped - operator.run(new Object(), new CollectorOutput(output)); + operator.run(new Object(), new CollectorOutput(output)); + + int numberLatencyMarkers = (int) (maxProcessingTime / latencyMarkInterval) + 1; + + assertEquals( + numberLatencyMarkers + 1, // + 1 is the final watermark element + output.size()); - // ensure that there has been some output - assertTrue(output.size() > 0); - // and that its only latency markers - for(StreamElement se: output) { + long timestamp = 0L; + + int i = 0; + // and that its only latency markers + a final watermark + for (; i < output.size() - 1; i++) { + StreamElement se = output.get(i); Assert.assertTrue(se.isLatencyMarker()); Assert.assertEquals(-1, se.asLatencyMarker().getVertexID()); Assert.assertEquals(0, se.asLatencyMarker().getSubtaskIndex()); - Assert.assertTrue(se.asLatencyMarker().getMarkedTime() >= now); + Assert.assertTrue(se.asLatencyMarker().getMarkedTime() == timestamp); + + timestamp += latencyMarkInterval; } + + Assert.assertTrue(output.get(i).isWatermark()); } + @Test + public void testLatencyMarksEmitterLifecycleIntegration() { + + } @Test public void testAutomaticWatermarkContext() throws Exception { @@ -341,4 +351,33 @@ public void stop() { running = false; } } + + private static final class ProcessingTimeServiceSource implements SourceFunction { + + private final TestProcessingTimeService processingTimeService; + private final List processingTimes; + + private boolean cancelled = false; + + private ProcessingTimeServiceSource(TestProcessingTimeService processingTimeService, List processingTimes) { + this.processingTimeService = processingTimeService; + this.processingTimes = processingTimes; + } + + @Override + public void run(SourceContext ctx) throws Exception { + for (Long processingTime : processingTimes) { + if (cancelled) { + break; + } + + processingTimeService.setCurrentTime(processingTime); + } + } + + @Override + public void cancel() { + cancelled = true; + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java index cd1f253a6d22e..29037586e7b7e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java @@ -76,13 +76,13 @@ public void onProcessingTime(long timestamp) { } }); - assertEquals(2, tp.getNumRegisteredTimers()); + assertEquals(2, tp.getNumActiveTimers()); tp.setCurrentTime(35); - assertEquals(1, tp.getNumRegisteredTimers()); + assertEquals(1, tp.getNumActiveTimers()); tp.setCurrentTime(40); - assertEquals(0, tp.getNumRegisteredTimers()); + assertEquals(0, tp.getNumActiveTimers()); tp.shutdownService(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java index 766b3130e8d0c..50e438c796a96 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java @@ -21,8 +21,10 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler; +import org.apache.flink.util.TestLogger; import org.junit.Test; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -34,7 +36,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class SystemProcessingTimeServiceTest { +public class SystemProcessingTimeServiceTest extends TestLogger { @Test public void testTriggerHoldsLock() throws Exception { @@ -70,6 +72,134 @@ public void onProcessingTime(long timestamp) { } } + /** + * Tests that the schedule at fixed rate callback is called under the given lock + */ + @Test + public void testScheduleAtFixedRateHoldsLock() throws Exception { + + final Object lock = new Object(); + final AtomicReference errorRef = new AtomicReference<>(); + + final SystemProcessingTimeService timer = new SystemProcessingTimeService( + new ReferenceSettingExceptionHandler(errorRef), lock); + + final OneShotLatch awaitCallback = new OneShotLatch(); + + try { + assertEquals(0, timer.getNumTasksScheduled()); + + // schedule something + ScheduledFuture future = timer.scheduleAtFixedRate( + new ProcessingTimeCallback() { + @Override + public void onProcessingTime(long timestamp) { + assertTrue(Thread.holdsLock(lock)); + + awaitCallback.trigger(); + } + }, + 0L, + 100L); + + // wait until the first execution is active + awaitCallback.await(); + + // cancel periodic callback + future.cancel(true); + + // check that no asynchronous error was reported + if (errorRef.get() != null) { + throw new Exception(errorRef.get()); + } + } + finally { + timer.shutdownService(); + } + } + + /** + * Tests that SystemProcessingTimeService#scheduleAtFixedRate is actually triggered multiple + * times. + */ + @Test(timeout=10000) + public void testScheduleAtFixedRate() throws Exception { + final Object lock = new Object(); + final AtomicReference errorRef = new AtomicReference<>(); + final long period = 10L; + final int countDown = 3; + + final SystemProcessingTimeService timer = new SystemProcessingTimeService( + new ReferenceSettingExceptionHandler(errorRef), lock); + + final CountDownLatch countDownLatch = new CountDownLatch(countDown); + + try { + timer.scheduleAtFixedRate(new ProcessingTimeCallback() { + @Override + public void onProcessingTime(long timestamp) throws Exception { + countDownLatch.countDown(); + } + }, 0L, period); + + countDownLatch.await(); + + if (errorRef.get() != null) { + throw new Exception(errorRef.get()); + } + + } finally { + timer.shutdownService(); + } + } + + /** + * Tests that shutting down the SystemProcessingTimeService will also cancel the scheduled at + * fix rate future. + */ + @Test + public void testQuiesceAndAwaitingCancelsScheduledAtFixRateFuture() throws Exception { + final Object lock = new Object(); + final AtomicReference errorRef = new AtomicReference<>(); + final long period = 10L; + + final SystemProcessingTimeService timer = new SystemProcessingTimeService( + new ReferenceSettingExceptionHandler(errorRef), lock); + + try { + ScheduledFuture scheduledFuture = timer.scheduleAtFixedRate(new ProcessingTimeCallback() { + @Override + public void onProcessingTime(long timestamp) throws Exception { + } + }, 0L, period); + + assertFalse(scheduledFuture.isDone()); + + // this should cancel our future + timer.quiesceAndAwaitPending(); + + assertTrue(scheduledFuture.isCancelled()); + + scheduledFuture = timer.scheduleAtFixedRate(new ProcessingTimeCallback() { + @Override + public void onProcessingTime(long timestamp) throws Exception { + throw new Exception("Test exception."); + } + }, 0L, 100L); + + assertNotNull(scheduledFuture); + + assertEquals(0, timer.getNumTasksScheduled()); + + if (errorRef.get() != null) { + throw new Exception(errorRef.get()); + } + + } finally { + timer.shutdownService(); + } + } + @Test public void testImmediateShutdown() throws Exception { @@ -114,6 +244,21 @@ public void onProcessingTime(long timestamp) {} // expected } + try { + timer.scheduleAtFixedRate( + new ProcessingTimeCallback() { + @Override + public void onProcessingTime(long timestamp) {} + }, + 0L, + 100L); + + fail("should result in an exception"); + } + catch (IllegalStateException e) { + // expected + } + // obviously, we have an asynchronous interrupted exception assertNotNull(errorRef.get()); assertTrue(errorRef.get().getCause() instanceof InterruptedException); @@ -206,6 +351,18 @@ public void onProcessingTime(long timestamp) {} assertEquals(0, timer.getNumTasksScheduled()); + future = timer.scheduleAtFixedRate( + new ProcessingTimeCallback() { + @Override + public void onProcessingTime(long timestamp) throws Exception {} + }, 10000000000L, 50L); + + assertEquals(1, timer.getNumTasksScheduled()); + + future.cancel(false); + + assertEquals(0, timer.getNumTasksScheduled()); + // check that no asynchronous error was reported if (errorRef.get() != null) { throw new Exception(errorRef.get()); @@ -241,4 +398,33 @@ public void onProcessingTime(long timestamp) throws Exception { latch.await(); assertTrue(exceptionWasThrown.get()); } + + @Test + public void testExceptionReportingScheduleAtFixedRate() throws InterruptedException { + final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false); + final OneShotLatch latch = new OneShotLatch(); + final Object lock = new Object(); + + ProcessingTimeService timeServiceProvider = new SystemProcessingTimeService( + new AsyncExceptionHandler() { + @Override + public void handleAsyncException(String message, Throwable exception) { + exceptionWasThrown.set(true); + latch.trigger(); + } + }, lock); + + timeServiceProvider.scheduleAtFixedRate( + new ProcessingTimeCallback() { + @Override + public void onProcessingTime(long timestamp) throws Exception { + throw new Exception("Exception in Timer"); + } + }, + 0L, + 100L ); + + latch.await(); + assertTrue(exceptionWasThrown.get()); + } }