From 4fa6707259dfa6bb0414e54f2b13f0d6908864df Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Mon, 14 Nov 2016 10:53:18 +0800 Subject: [PATCH 1/5] [FLINK-5017] [streaming] Introduce StreamStatus to facilitate idle sources This commit is the first part of making idle streaming sources in Flink possible. It introduces a new element, StreamStatus, that flows with other records in streams. StreamStatus elements are generated at the sources, and affect how operators advance their watermarks with the presence of idle sources. Prior to this commit, when advancing watermarks at downstream operators, the new min watermark is found by simply determining if the min watermark across all input channels has advanced. This resulted in watermark-stalling downstream operators when there are idle sources. With this change, operators can now mark input channels to be idle, and ignore them when advancing their watermark. This commit also includes refactoring of previous watermark forwarding logic into a single class, StatusWatermarkValve. OneInputStreamTasks, TwoInputStreamTasks, and AbstractStreamOperator use valves to help them determine how watermarks and stream statuses are forwarded. --- .../runtime/io/RecordWriterOutput.java | 24 +- .../runtime/io/StreamInputProcessor.java | 125 ++++-- .../runtime/io/StreamTwoInputProcessor.java | 216 +++++++--- ...mestampsAndPeriodicWatermarksOperator.java | 5 + ...stampsAndPunctuatedWatermarksOperator.java | 5 + .../runtime/streamrecord/StreamElement.java | 18 + .../streamrecord/StreamElementSerializer.java | 18 +- .../streamstatus/StatusWatermarkValve.java | 197 +++++++++ .../runtime/streamstatus/StreamStatus.java | 128 ++++++ .../runtime/tasks/OneInputStreamTask.java | 27 +- .../runtime/tasks/OperatorChain.java | 83 +++- .../runtime/tasks/StreamIterationTail.java | 12 +- .../runtime/tasks/StreamStatusProvider.java | 25 ++ .../streaming/runtime/tasks/StreamTask.java | 2 +- .../runtime/tasks/TwoInputStreamTask.java | 20 +- ...ampsAndPeriodicWatermarksOperatorTest.java | 2 - .../StatusWatermarkValveTest.java | 398 ++++++++++++++++++ .../streamstatus/StreamStatusTest.java | 80 ++++ .../runtime/tasks/OneInputStreamTaskTest.java | 40 +- .../runtime/tasks/TwoInputStreamTaskTest.java | 43 +- 20 files changed, 1306 insertions(+), 162 deletions(-) create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamStatusProvider.java create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index 26250315e0f29..ed925e76bf697 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -27,9 +27,11 @@ import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.StreamStatusProvider; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -43,11 +45,13 @@ public class RecordWriterOutput implements Output> { private SerializationDelegate serializationDelegate; + private final StreamStatusProvider streamStatusProvider; @SuppressWarnings("unchecked") public RecordWriterOutput( StreamRecordWriter>> recordWriter, - TypeSerializer outSerializer) { + TypeSerializer outSerializer, + StreamStatusProvider streamStatusProvider) { checkNotNull(recordWriter); @@ -62,6 +66,8 @@ public RecordWriterOutput( if (outSerializer != null) { serializationDelegate = new SerializationDelegate(outRecordSerializer); } + + this.streamStatusProvider = streamStatusProvider; } @Override @@ -79,7 +85,19 @@ public void collect(StreamRecord record) { @Override public void emitWatermark(Watermark mark) { serializationDelegate.setInstance(mark); - + + if (streamStatusProvider.getStreamStatus().equals(StreamStatus.ACTIVE)) { + try { + recordWriter.broadcastEmit(serializationDelegate); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + } + + public void emitStreamStatus(StreamStatus streamStatus) { + serializationDelegate.setInstance(streamStatus); + try { recordWriter.broadcastEmit(serializationDelegate); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index b3257a530f194..e0a6ea13f5161 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -43,20 +43,26 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorChain; + +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}. * *

- * This also keeps track of {@link Watermark} events and forwards them to event subscribers - * once the {@link Watermark} from all inputs advances. + * This internally uses a {@link StatusWatermarkValve} to keep track of {@link Watermark} and {@link StreamStatus} events, + * and forwards them to event subscribers once the {@link StatusWatermarkValve} determines the {@link Watermark} from + * all inputs has advanced, or that a {@link StreamStatus} needs to be propagated downstream to denote a status change. * *

- * Forwarding elements or watermarks must be protected by synchronizing on the given lock + * Forwarding elements, watermarks, or status status elements must be protected by synchronizing on the given lock * object. This ensures that we don't call methods on a {@link OneInputStreamOperator} concurrently * with the timer callback or other things. * @@ -69,29 +75,49 @@ public class StreamInputProcessor { private RecordDeserializer> currentRecordDeserializer; + private final DeserializationDelegate deserializationDelegate; + private final CheckpointBarrierHandler barrierHandler; - // We need to keep track of the channel from which a buffer came, so that we can - // appropriately map the watermarks to input channels - private int currentChannel = -1; + private final Object lock; - private boolean isFinished; + // ---------------- Status and Watermark Valve ------------------ - private final long[] watermarks; - private long lastEmittedWatermark; + /** + * Valve that controls how watermarks and stream statuses are forwarded. + */ + private StatusWatermarkValve statusWatermarkValve; - private final DeserializationDelegate deserializationDelegate; + /** Number of input channels the valve needs to handle. */ + private final int numInputChannels; + + /** + * The channel from which a buffer came, tracked so that we can appropriately map + * the watermarks and watermark statuses to channel indexes of the valve. + */ + private int currentChannel = -1; + + private final OperatorChain> operatorChain; + + private final OneInputStreamOperator streamOperator; + // ---------------- Metrics ------------------ + + private long lastEmittedWatermark; private Counter numRecordsIn; + private boolean isFinished; + @SuppressWarnings("unchecked") public StreamInputProcessor( InputGate[] inputGates, TypeSerializer inputSerializer, StatefulTask checkpointedTask, CheckpointingMode checkpointMode, + Object lock, IOManager ioManager, - Configuration taskManagerConfig) throws IOException { + Configuration taskManagerConfig, + OperatorChain> operatorChain) throws IOException { InputGate inputGate = InputGateUtil.createInputGate(inputGates); @@ -114,6 +140,8 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { if (checkpointedTask != null) { this.barrierHandler.registerCheckpointEventHandler(checkpointedTask); } + + this.lock = checkNotNull(lock); StreamElementSerializer ser = new StreamElementSerializer<>(inputSerializer); this.deserializationDelegate = new NonReusingDeserializationDelegate<>(ser); @@ -126,15 +154,19 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { ioManager.getSpillingDirectoriesPaths()); } - watermarks = new long[inputGate.getNumberOfInputChannels()]; - for (int i = 0; i < inputGate.getNumberOfInputChannels(); i++) { - watermarks[i] = Long.MIN_VALUE; - } - lastEmittedWatermark = Long.MIN_VALUE; + this.numInputChannels = inputGate.getNumberOfInputChannels(); + + this.lastEmittedWatermark = Long.MIN_VALUE; + + this.operatorChain = operatorChain; + this.streamOperator = operatorChain.getHeadOperator(); + + this.statusWatermarkValve = new StatusWatermarkValve( + numInputChannels, + new ForwardingValveOutputHandler(streamOperator, lock)); } - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - public boolean processInput(OneInputStreamOperator streamOperator, final Object lock) throws Exception { + public boolean processInput() throws Exception { if (isFinished) { return false; } @@ -155,22 +187,14 @@ public boolean processInput(OneInputStreamOperator streamOperator, final StreamElement recordOrMark = deserializationDelegate.getInstance(); if (recordOrMark.isWatermark()) { - long watermarkMillis = recordOrMark.asWatermark().getTimestamp(); - if (watermarkMillis > watermarks[currentChannel]) { - watermarks[currentChannel] = watermarkMillis; - long newMinWatermark = Long.MAX_VALUE; - for (long watermark: watermarks) { - newMinWatermark = Math.min(watermark, newMinWatermark); - } - if (newMinWatermark > lastEmittedWatermark) { - lastEmittedWatermark = newMinWatermark; - synchronized (lock) { - streamOperator.processWatermark(new Watermark(lastEmittedWatermark)); - } - } - } + // handle watermark + statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel); continue; - } else if(recordOrMark.isLatencyMarker()) { + } else if (recordOrMark.isStreamStatus()) { + // handle stream status + statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel); + continue; + } else if (recordOrMark.isLatencyMarker()) { // handle latency marker synchronized (lock) { streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker()); @@ -247,4 +271,39 @@ public void cleanup() throws IOException { // cleanup the barrier handler resources barrierHandler.cleanup(); } + + private class ForwardingValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler { + private final OneInputStreamOperator operator; + private final Object lock; + + private ForwardingValveOutputHandler(final OneInputStreamOperator operator, final Object lock) { + this.operator = checkNotNull(operator); + this.lock = checkNotNull(lock); + } + + @Override + public void handleWatermark(Watermark watermark) { + try { + synchronized (lock) { + lastEmittedWatermark = watermark.getTimestamp(); + operator.processWatermark(watermark); + } + } catch (Exception e) { + throw new RuntimeException("Exception occurred while processing valve output watermark: ", e); + } + } + + @SuppressWarnings("unchecked") + @Override + public void handleStreamStatus(StreamStatus streamStatus) { + try { + synchronized (lock) { + operatorChain.setStreamStatus(streamStatus); + } + } catch (Exception e) { + throw new RuntimeException("Exception occurred while processing valve output stream status: ", e); + } + } + } + } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index e5aeec1ec5035..f0e22ed82af2b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -39,23 +39,29 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; +import org.apache.flink.streaming.runtime.tasks.OperatorChain; import java.io.IOException; -import java.util.Arrays; import java.util.Collection; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Input reader for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. * *

- * This also keeps track of {@link org.apache.flink.streaming.api.watermark.Watermark} events and forwards them to event subscribers - * once the {@link org.apache.flink.streaming.api.watermark.Watermark} from all inputs advances. + * This internally uses a {@link StatusWatermarkValve} to keep track of {@link Watermark} and {@link StreamStatus} events, + * and forwards them to event subscribers once the {@link StatusWatermarkValve} determines the {@link Watermark} from + * all inputs has advanced, or that a {@link StreamStatus} needs to be propagated downstream to denote a status change. * *

- * Forwarding elements or watermarks must be protected by synchronizing on the given lock + * Forwarding elements, watermarks, or status status elements must be protected by synchronizing on the given lock * object. This ensures that we don't call methods on a {@link TwoInputStreamOperator} concurrently * with the timer callback or other things. * @@ -69,26 +75,50 @@ public class StreamTwoInputProcessor { private RecordDeserializer> currentRecordDeserializer; - // We need to keep track of the channel from which a buffer came, so that we can - // appropriately map the watermarks to input channels - private int currentChannel = -1; - - private boolean isFinished; + private final DeserializationDelegate deserializationDelegate1; + private final DeserializationDelegate deserializationDelegate2; private final CheckpointBarrierHandler barrierHandler; - private final long[] watermarks1; - private long lastEmittedWatermark1; + private final Object lock; - private final long[] watermarks2; - private long lastEmittedWatermark2; + // ---------------- Status and Watermark Valves ------------------ + /** + * Stream status for the two inputs. We need to keep track for determining when + * to forward stream status changes downstream. + */ + private StreamStatus firstStatus = StreamStatus.ACTIVE; + private StreamStatus secondStatus = StreamStatus.ACTIVE; + + /** + * Valves that control how watermarks and stream statuses from the 2 inputs are forwarded. + */ + private StatusWatermarkValve statusWatermarkValve1; + private StatusWatermarkValve statusWatermarkValve2; + + /** Number of input channels the valves need to handle. */ private final int numInputChannels1; + private final int numInputChannels2; - private final DeserializationDelegate deserializationDelegate1; - private final DeserializationDelegate deserializationDelegate2; + /** + * The channel from which a buffer came, tracked so that we can appropriately map + * the watermarks and watermark statuses to the correct channel index of the correct valve. + */ + private int currentChannel = -1; - @SuppressWarnings({"unchecked", "rawtypes"}) + private final OperatorChain> operatorChain; + + private final TwoInputStreamOperator streamOperator; + + // ---------------- Metrics ------------------ + + private long lastEmittedWatermark1; + private long lastEmittedWatermark2; + + private boolean isFinished; + + @SuppressWarnings("unchecked") public StreamTwoInputProcessor( Collection inputGates1, Collection inputGates2, @@ -96,8 +126,10 @@ public StreamTwoInputProcessor( TypeSerializer inputSerializer2, StatefulTask checkpointedTask, CheckpointingMode checkpointMode, + Object lock, IOManager ioManager, - Configuration taskManagerConfig) throws IOException { + Configuration taskManagerConfig, + OperatorChain> operatorChain) throws IOException { final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2); @@ -120,6 +152,8 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { if (checkpointedTask != null) { this.barrierHandler.registerCheckpointEventHandler(checkpointedTask); } + + this.lock = checkNotNull(lock); StreamElementSerializer ser1 = new StreamElementSerializer<>(inputSerializer1); this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(ser1); @@ -142,19 +176,20 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { } this.numInputChannels1 = numInputChannels1; - int numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1; + this.numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1; + + this.lastEmittedWatermark1 = Long.MIN_VALUE; + this.lastEmittedWatermark2 = Long.MIN_VALUE; + + this.operatorChain = operatorChain; + this.streamOperator = checkNotNull(operatorChain.getHeadOperator()); - watermarks1 = new long[numInputChannels1]; - Arrays.fill(watermarks1, Long.MIN_VALUE); - lastEmittedWatermark1 = Long.MIN_VALUE; + this.statusWatermarkValve1 = new StatusWatermarkValve(numInputChannels1, new ForwardingValveOutputHandler1(streamOperator, lock)); + this.statusWatermarkValve2 = new StatusWatermarkValve(numInputChannels2, new ForwardingValveOutputHandler2(streamOperator, lock)); - watermarks2 = new long[numInputChannels2]; - Arrays.fill(watermarks2, Long.MIN_VALUE); - lastEmittedWatermark2 = Long.MIN_VALUE; } - @SuppressWarnings("unchecked") - public boolean processInput(TwoInputStreamOperator streamOperator, Object lock) throws Exception { + public boolean processInput() throws Exception { if (isFinished) { return false; } @@ -177,7 +212,11 @@ public boolean processInput(TwoInputStreamOperator streamOperator, if (currentChannel < numInputChannels1) { StreamElement recordOrWatermark = deserializationDelegate1.getInstance(); if (recordOrWatermark.isWatermark()) { - handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel, lock); + statusWatermarkValve1.inputWatermark(recordOrWatermark.asWatermark(), currentChannel); + continue; + } + else if (recordOrWatermark.isStreamStatus()) { + statusWatermarkValve1.inputStreamStatus(recordOrWatermark.asStreamStatus(), currentChannel); continue; } else if (recordOrWatermark.isLatencyMarker()) { @@ -187,9 +226,10 @@ else if (recordOrWatermark.isLatencyMarker()) { continue; } else { + StreamRecord record = recordOrWatermark.asRecord(); synchronized (lock) { - streamOperator.setKeyContextElement1(recordOrWatermark.asRecord()); - streamOperator.processElement1(recordOrWatermark.asRecord()); + streamOperator.setKeyContextElement1(record); + streamOperator.processElement1(record); } return true; @@ -198,7 +238,11 @@ else if (recordOrWatermark.isLatencyMarker()) { else { StreamElement recordOrWatermark = deserializationDelegate2.getInstance(); if (recordOrWatermark.isWatermark()) { - handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel, lock); + statusWatermarkValve2.inputWatermark(recordOrWatermark.asWatermark(), currentChannel - numInputChannels1); + continue; + } + else if (recordOrWatermark.isStreamStatus()) { + statusWatermarkValve2.inputStreamStatus(recordOrWatermark.asStreamStatus(), currentChannel - numInputChannels1); continue; } else if (recordOrWatermark.isLatencyMarker()) { @@ -208,9 +252,10 @@ else if (recordOrWatermark.isLatencyMarker()) { continue; } else { + StreamRecord record = recordOrWatermark.asRecord(); synchronized (lock) { - streamOperator.setKeyContextElement2(recordOrWatermark.asRecord()); - streamOperator.processElement2(recordOrWatermark.asRecord()); + streamOperator.setKeyContextElement2(record); + streamOperator.processElement2(record); } return true; } @@ -244,41 +289,6 @@ else if (recordOrWatermark.isLatencyMarker()) { } } - private void handleWatermark(TwoInputStreamOperator operator, Watermark mark, int channelIndex, Object lock) throws Exception { - if (channelIndex < numInputChannels1) { - long watermarkMillis = mark.getTimestamp(); - if (watermarkMillis > watermarks1[channelIndex]) { - watermarks1[channelIndex] = watermarkMillis; - long newMinWatermark = Long.MAX_VALUE; - for (long wm : watermarks1) { - newMinWatermark = Math.min(wm, newMinWatermark); - } - if (newMinWatermark > lastEmittedWatermark1) { - lastEmittedWatermark1 = newMinWatermark; - synchronized (lock) { - operator.processWatermark1(new Watermark(lastEmittedWatermark1)); - } - } - } - } else { - channelIndex = channelIndex - numInputChannels1; - long watermarkMillis = mark.getTimestamp(); - if (watermarkMillis > watermarks2[channelIndex]) { - watermarks2[channelIndex] = watermarkMillis; - long newMinWatermark = Long.MAX_VALUE; - for (long wm : watermarks2) { - newMinWatermark = Math.min(wm, newMinWatermark); - } - if (newMinWatermark > lastEmittedWatermark2) { - lastEmittedWatermark2 = newMinWatermark; - synchronized (lock) { - operator.processWatermark2(new Watermark(lastEmittedWatermark2)); - } - } - } - } - } - /** * Sets the metric group for this StreamTwoInputProcessor. * @@ -312,4 +322,76 @@ public void cleanup() throws IOException { // cleanup the barrier handler resources barrierHandler.cleanup(); } + + private class ForwardingValveOutputHandler1 implements StatusWatermarkValve.ValveOutputHandler { + private final TwoInputStreamOperator operator; + private final Object lock; + + private ForwardingValveOutputHandler1(final TwoInputStreamOperator operator, final Object lock) { + this.operator = checkNotNull(operator); + this.lock = checkNotNull(lock); + } + + @Override + public void handleWatermark(Watermark watermark) { + try { + synchronized (lock) { + lastEmittedWatermark1 = watermark.getTimestamp(); + operator.processWatermark1(watermark); + } + } catch (Exception e) { + throw new RuntimeException("Exception occurred while processing valve output watermark: ", e); + } + } + + @Override + public void handleStreamStatus(StreamStatus streamStatus) { + try { + synchronized (lock) { + if (!streamStatus.equals(firstStatus) && streamStatus.equals(secondStatus)) { + operatorChain.setStreamStatus(streamStatus); + } + firstStatus = streamStatus; + } + } catch (Exception e) { + throw new RuntimeException("Exception occurred while processing valve output stream status: ", e); + } + } + } + + private class ForwardingValveOutputHandler2 implements StatusWatermarkValve.ValveOutputHandler { + private final TwoInputStreamOperator operator; + private final Object lock; + + private ForwardingValveOutputHandler2(final TwoInputStreamOperator operator, final Object lock) { + this.operator = checkNotNull(operator); + this.lock = checkNotNull(lock); + } + + @Override + public void handleWatermark(Watermark watermark) { + try { + synchronized (lock) { + lastEmittedWatermark2 = watermark.getTimestamp(); + operator.processWatermark2(watermark); + } + } catch (Exception e) { + throw new RuntimeException("Exception occurred while processing valve output watermark: ", e); + } + } + + @Override + public void handleStreamStatus(StreamStatus streamStatus) { + try { + synchronized (lock) { + if (!streamStatus.equals(secondStatus) && streamStatus.equals(firstStatus)) { + operatorChain.setStreamStatus(streamStatus); + } + secondStatus = streamStatus; + } + } catch (Exception e) { + throw new RuntimeException("Exception occurred while processing valve output stream status: ", e); + } + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java index 4defb96f54dd4..086358034582e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java @@ -82,6 +82,11 @@ public void onProcessingTime(long timestamp) throws Exception { getProcessingTimeService().registerTimer(now + watermarkInterval, this); } + /** + * Override the base implementation to completely ignore watermarks propagated from + * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit + * watermarks from here). + */ @Override public void processWatermark(Watermark mark) throws Exception { // if we receive a Long.MAX_VALUE watermark we forward it since it is used diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java index ac85b8ad5eb58..3fc9f9dd18406 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java @@ -59,6 +59,11 @@ public void processElement(StreamRecord element) throws Exception { } } + /** + * Override the base implementation to completely ignore watermarks propagated from + * upstream (we rely only on the {@link AssignerWithPunctuatedWatermarks} to emit + * watermarks from here). + */ @Override public void processWatermark(Watermark mark) throws Exception { // if we receive a Long.MAX_VALUE watermark we forward it since it is used diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java index 62418bcd24f9e..643e240893d71 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; /** * An element in a data stream. Can be a record or a Watermark. @@ -35,6 +36,14 @@ public final boolean isWatermark() { return getClass() == Watermark.class; } + /** + * Checks whether this element is a stream status. + * @return True, if this element is a stream status, false otherwise. + */ + public final boolean isStreamStatus() { + return getClass() == StreamStatus.class; + } + /** * Checks whether this element is a record. * @return True, if this element is a record, false otherwise. @@ -70,6 +79,15 @@ public final Watermark asWatermark() { return (Watermark) this; } + /** + * Casts this element into a StreamStatus. + * @return This element as a StreamStatus. + * @throws java.lang.ClassCastException Thrown, if this element is actually not a Stream Status. + */ + public final StreamStatus asStreamStatus() { + return (StreamStatus) this; + } + /** * Casts this element into a LatencyMarker. * @return This element as a LatencyMarker. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java index 66d32da40f344..3db649ae724d7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -23,13 +23,14 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import java.io.IOException; import static java.util.Objects.requireNonNull; /** - * Serializer for {@link StreamRecord}, {@link Watermark} and {@link LatencyMarker}. + * Serializer for {@link StreamRecord}, {@link Watermark}, {@link LatencyMarker}, and {@link StreamStatus}. * *

* This does not behave like a normal {@link TypeSerializer}, instead, this is only used at the @@ -46,6 +47,7 @@ public final class StreamElementSerializer extends TypeSerializer typeSerializer; @@ -98,7 +100,7 @@ public StreamElement copy(StreamElement from) { StreamRecord fromRecord = from.asRecord(); return fromRecord.copy(typeSerializer.copy(fromRecord.getValue())); } - else if (from.isWatermark() || from.isLatencyMarker()) { + else if (from.isWatermark() || from.isStreamStatus() || from.isLatencyMarker()) { // is immutable return from; } @@ -117,7 +119,7 @@ public StreamElement copy(StreamElement from, StreamElement reuse) { fromRecord.copyTo(valueCopy, reuseRecord); return reuse; } - else if (from.isWatermark() || from.isLatencyMarker()) { + else if (from.isWatermark() || from.isStreamStatus() || from.isLatencyMarker()) { // is immutable return from; } @@ -142,6 +144,9 @@ else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { else if (tag == TAG_WATERMARK) { target.writeLong(source.readLong()); } + else if (tag == TAG_STREAM_STATUS) { + target.writeInt(source.readInt()); + } else if (tag == TAG_LATENCY_MARKER) { target.writeLong(source.readLong()); target.writeInt(source.readInt()); @@ -168,6 +173,10 @@ else if (value.isWatermark()) { target.write(TAG_WATERMARK); target.writeLong(value.asWatermark().getTimestamp()); } + else if (value.isStreamStatus()) { + target.write(TAG_STREAM_STATUS); + target.writeInt(value.asStreamStatus().getStatus()); + } else if (value.isLatencyMarker()) { target.write(TAG_LATENCY_MARKER); target.writeLong(value.asLatencyMarker().getMarkedTime()); @@ -192,6 +201,9 @@ else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { else if (tag == TAG_WATERMARK) { return new Watermark(source.readLong()); } + else if (tag == TAG_STREAM_STATUS) { + return new StreamStatus(source.readInt()); + } else if (tag == TAG_LATENCY_MARKER) { return new LatencyMarker(source.readLong(), source.readInt(), source.readInt()); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java new file mode 100644 index 0000000000000..349fcca295c18 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamstatus; + +import org.apache.flink.streaming.api.watermark.Watermark; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@code StatusWatermarkValve} embodies the logic of how {@link Watermark} and {@link StreamStatus} are propagated to + * downstream outputs, given a set of one or multiple input channels that continuously receive them. Usages of this + * class need to define the number of input channels that the valve needs to handle, as well as provide a customized + * implementation of {@link ValveOutputHandler}, which is called by the valve only when it determines a new watermark or + * stream status can be propagated. + */ +public class StatusWatermarkValve { + + /** + * Usages of {@code StatusWatermarkValve} should implement a {@code ValveOutputHandler} + * to handle watermark and stream status outputs from the valve. + */ + public interface ValveOutputHandler { + void handleWatermark(Watermark watermark); + void handleStreamStatus(StreamStatus streamStatus); + } + + private final ValveOutputHandler outputHandler; + + // ------------------------------------------------------------------------ + // Runtime state for watermark & stream status output determination + // ------------------------------------------------------------------------ + + /** Array of current status of all input channels. Changes as watermarks & stream statuses are fed into the valve */ + private final InputChannelStatus[] channelStatuses; + + /** The last watermark emitted from the valve */ + private long lastOutputWatermark; + + /** The last stream status emitted from the valve */ + private StreamStatus lastOutputStreamStatus; + + /** + * Returns a new {@code StatusWatermarkValve}. + * + * @param numInputChannels the number of input channels that this valve will need to handle + * @param outputHandler the customized output handler for the valve + */ + public StatusWatermarkValve(int numInputChannels, ValveOutputHandler outputHandler) { + checkArgument(numInputChannels > 0); + this.channelStatuses = new InputChannelStatus[numInputChannels]; + for (int i = 0; i < numInputChannels; i++) { + channelStatuses[i] = new InputChannelStatus(); + channelStatuses[i].watermark = Long.MIN_VALUE; + channelStatuses[i].streamStatus = StreamStatus.ACTIVE; + channelStatuses[i].isWatermarkAligned = true; + } + + this.outputHandler = checkNotNull(outputHandler); + + this.lastOutputWatermark = Long.MIN_VALUE; + this.lastOutputStreamStatus = StreamStatus.ACTIVE; + } + + /** + * Feed a {@link Watermark} into the valve. If the input triggers the valve to output a new Watermark, + * {@link ValveOutputHandler#handleWatermark(Watermark)} will be called to process the new Watermark. + * + * @param watermark the watermark to feed to the valve + * @param channelIndex the index of the channel that the fed watermark belongs to (index starting from 0) + */ + public void inputWatermark(Watermark watermark, int channelIndex) { + // ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle). + if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) { + long watermarkMillis = watermark.getTimestamp(); + + // if the input watermark's value is less than the last received watermark for its input channel, ignore it also. + if (watermarkMillis > channelStatuses[channelIndex].watermark) { + channelStatuses[channelIndex].watermark = watermarkMillis; + + // previously unaligned input channels are now aligned if its watermark has caught up + if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) { + channelStatuses[channelIndex].isWatermarkAligned = true; + } + + // now, attempt to find a new min watermark across all aligned channels + findAndOutputNewMinWatermarkAcrossAlignedChannels(); + } + } + } + + /** + * Feed a {@link StreamStatus} into the valve. This may trigger the valve to output either a new Stream Status, + * for which {@link ValveOutputHandler#handleStreamStatus(StreamStatus)} will be called, or a new Watermark, + * for which {@link ValveOutputHandler#handleWatermark(Watermark)} will be called. + * + * @param streamStatus the stream status to feed to the valve + * @param channelIndex the index of the channel that the fed stream status belongs to (index starting from 0) + */ + public void inputStreamStatus(StreamStatus streamStatus, int channelIndex) { + // only account for stream status inputs that will result in a status change for the input channel + if (streamStatus.isIdle() && channelStatuses[channelIndex].streamStatus.isActive()) { + // handle active -> idle toggle for the input channel + channelStatuses[channelIndex].streamStatus = StreamStatus.IDLE; + + // the channel is now idle, therefore not aligned + channelStatuses[channelIndex].isWatermarkAligned = false; + + // if all input channels of the valve is now idle, we need to output an idle stream + // status from the valve (this also marks the valve as idle) + if (!InputChannelStatus.hasActiveChannels(channelStatuses)) { + lastOutputStreamStatus = StreamStatus.IDLE; + outputHandler.handleStreamStatus(lastOutputStreamStatus); + } else if (channelStatuses[channelIndex].watermark == lastOutputWatermark) { + // if the watermark of the channel that just became idle equals the last output + // watermark (the previous overall min watermark), we may be able to find a new + // min watermark from the remaining aligned channels + findAndOutputNewMinWatermarkAcrossAlignedChannels(); + } + } else if (streamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isIdle()) { + // handle idle -> active toggle for the input channel + channelStatuses[channelIndex].streamStatus = StreamStatus.ACTIVE; + + // if the last watermark of the input channel, before it was marked idle, is still larger than + // the overall last output watermark of the valve, then we can set the channel to be aligned already. + if (channelStatuses[channelIndex].watermark >= lastOutputWatermark) { + channelStatuses[channelIndex].isWatermarkAligned = true; + } + + // if the valve was previously marked to be idle, mark it as active and output an active stream + // status because at least one of the input channels is now active + if (lastOutputStreamStatus.isIdle()) { + lastOutputStreamStatus = StreamStatus.ACTIVE; + outputHandler.handleStreamStatus(lastOutputStreamStatus); + } + } + } + + private void findAndOutputNewMinWatermarkAcrossAlignedChannels() { + long newMinWatermark = Long.MAX_VALUE; + + // determine new overall watermark by considering only watermark-aligned channels across all channels + for (InputChannelStatus channelStatus : channelStatuses) { + if (channelStatus.isWatermarkAligned) { + newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark); + } + } + + // we acknowledge and output the new overall watermark if it is larger than the last output watermark + if (newMinWatermark > lastOutputWatermark) { + lastOutputWatermark = newMinWatermark; + outputHandler.handleWatermark(new Watermark(lastOutputWatermark)); + } + } + + /** + * An {@code InputChannelStatus} keeps track of an input channel's last watermark, stream status, and whether or not + * the channel's current watermark is aligned with the overall watermark output from the valve. + * + * There are 2 situations where a channel's watermark is not considered aligned: + * - the current stream status of the channel is idle + * - the stream status has resumed to be active, but the watermark of the channel hasn't caught up to the + * last output watermark from the valve yet. + */ + private static class InputChannelStatus { + private long watermark; + private StreamStatus streamStatus; + private boolean isWatermarkAligned; + + /** Utility to check if at least one channel in a given array of input channels is active */ + private static boolean hasActiveChannels(InputChannelStatus[] channelStatuses) { + for (InputChannelStatus status : channelStatuses) { + if (status.streamStatus.isActive()) { + return true; + } + } + return false; + } + } + +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java new file mode 100644 index 0000000000000..efc2730a6015f --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamstatus; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; + +/** + * A Stream Status element informs operators whether or not they should continue to expect records and watermarks + * from the input stream that sent them. There are 2 kinds of status, namely {@link StreamStatus#IDLE} and + * {@link StreamStatus#ACTIVE}. Stream Status elements are generated at the sources, and may be propagated through + * the operators of the topology using {@link org.apache.flink.streaming.api.operators.Output#emitStreamStatus(StreamStatus)}. + * They directly infer the current status of the emitting source or operator; a source or operator emits a + * {@link StreamStatus#IDLE} if it will temporarily halt to emit any records or watermarks (i.e. is idle), and emits a + * {@link StreamStatus#ACTIVE} once it resumes to do so (i.e. is active). The cases that sources and downstream operators + * are considered either idle or active is explained below: + * + *

    + *
  • Sources: A source is considered to be idle if it will not emit records for an indefinite amount of time. This + * is the case, for example, for Flink's Kafka Consumer, where sources might initially have no assigned partitions + * to read from, or no records can be read from the assigned partitions. Once the source detects that it will + * resume emitting data, it is considered to be active. Sources are responsible for ensuring that no records (and + * possibly watermarks, in the case of Flink's Kafka Consumer which can generate watermarks directly within the + * source) will be emitted in between a consecutive {@link StreamStatus#IDLE} and {@link StreamStatus#ACTIVE}. + * This guarantee should be enforced on sources through + * {@link org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext} implementations.
  • + * + *
  • Downstream operators: a downstream operator is considered to be idle if all of its input streams are idle, + * i.e. the last received Stream Status element from all input streams is a {@link StreamStatus#IDLE}. As long + * as one of its input streams is active, i.e. the last received Stream Status element from the input stream is + * {@link StreamStatus#ACTIVE}, the operator is active. Operators are responsible for propagating their status + * further downstream once they toggle between being idle and active.
  • + *
+ * + *

+ * Stream Status elements received at downstream operators also affect and controls how they process and advance their + * watermarks. The below describes the effects (the logic is implemented as a {@link StatusWatermarkValve} which + * downstream operators should use for such purposes): + * + *

    + *
  • Since sources guarantee that no records will be emitted between a {@link StreamStatus#IDLE} and + * {@link StreamStatus#ACTIVE}, downstream operators can always safely process and propagate records when they + * receive them, without the need to check whether or not the operator is currently idle or active. However, for + * watermarks, since there may be {@link TimestampsAndPeriodicWatermarksOperator}s that might produce watermarks + * anywhere in the middle of topologies, regardless of whether there are input data at the operator, all + * downstream operators need to check whether or not they are actually active before processing a received + * watermark.
  • + * + *
  • For downstream operators with multiple input streams (ex. head operators of a + * {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask} or + * {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}, or any + * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator}), the watermarks of input streams + * that are temporarily idle, or has resumed to be active but its watermark is behind the overall min watermark + * of the operator, should not be accounted for when deciding whether or not to advance the operator's + * watermark.
  • + *
+ * + *

+ * Note that to notify downstream operators that a source is permanently closed and will no longer send any more elements, + * the source should still send a {@link org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK} instead of + * {@link StreamStatus#IDLE}. Stream Status elements only serve as markers for temporary status. + */ +@PublicEvolving +public final class StreamStatus extends StreamElement { + + public static final int IDLE_STATUS = -1; + public static final int ACTIVE_STATUS = 0; + + public static final StreamStatus IDLE = new StreamStatus(IDLE_STATUS); + public static final StreamStatus ACTIVE = new StreamStatus(ACTIVE_STATUS); + + public final int status; + + public StreamStatus(int status) { + if (status != IDLE_STATUS && status != ACTIVE_STATUS) { + throw new IllegalArgumentException("Invalid status value for StreamStatus; " + + "allowed values are " + ACTIVE_STATUS + " (for ACTIVE) and " + IDLE_STATUS + " (for IDLE)."); + } + + this.status = status; + } + + public boolean isIdle() { + return this.status == IDLE_STATUS; + } + + public boolean isActive() { + return !isIdle(); + } + + public int getStatus() { + return status; + } + + @Override + public boolean equals(Object o) { + return this == o || + o != null && o.getClass() == StreamStatus.class && ((StreamStatus) o).status == this.status; + } + + @Override + public int hashCode() { + return status; + } + + @Override + public String toString() { + String statusStr = (status == ACTIVE_STATUS) ? "ACTIVE" : "IDLE"; + return "StreamStatus(" + statusStr + ")"; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 0f41103d5fa49..8770b7e3d4a87 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.io.StreamInputProcessor; +import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor; @Internal public class OneInputStreamTask extends StreamTask> { @@ -35,18 +36,26 @@ public class OneInputStreamTask extends StreamTask inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader()); int numberOfInputs = configuration.getNumberOfInputs(); if (numberOfInputs > 0) { InputGate[] inputGates = getEnvironment().getAllInputGates(); - inputProcessor = new StreamInputProcessor( - inputGates, inSerializer, - this, + + @SuppressWarnings("unchecked") + OperatorChain> operatorChain = + (OperatorChain) this.operatorChain; + + inputProcessor = new StreamInputProcessor<>( + inputGates, + inSerializer, + this, configuration.getCheckpointMode(), + getCheckpointLock(), getEnvironment().getIOManager(), - getEnvironment().getTaskManagerInfo().getConfiguration()); + getEnvironment().getTaskManagerInfo().getConfiguration(), + operatorChain); // make sure that stream tasks report their I/O statistics inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); @@ -55,12 +64,10 @@ public void init() throws Exception { @Override protected void run() throws Exception { - // cache some references on the stack, to make the code more JIT friendly - final OneInputStreamOperator operator = this.headOperator; + // cache processor reference on the stack, to make the code more JIT friendly final StreamInputProcessor inputProcessor = this.inputProcessor; - final Object lock = getCheckpointLock(); - - while (running && inputProcessor.processInput(operator, lock)) { + + while (running && inputProcessor.processInput()) { // all the work happens in the "processInput" method } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 7771064e2e2ad..3aaea200a5992 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -30,13 +30,14 @@ import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput; import org.apache.flink.streaming.api.collector.selector.DirectedOutput; import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.StreamRecordWriter; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; @@ -60,7 +61,7 @@ * head operator. */ @Internal -public class OperatorChain> { +public class OperatorChain> implements StreamStatusProvider{ private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class); @@ -72,6 +73,15 @@ public class OperatorChain> { private final OP headOperator; + /** + * This output keeps track of the current status, in order to block + * any watermarks explicitly generated at concrete implementations + * when the operator is actually idle. This may happen, since + * timestamp assigner / watermark emitting operators will + * completely bypass the valve's watermark output logic. + */ + private StreamStatus streamStatus = StreamStatus.ACTIVE; + public OperatorChain(StreamTask containingTask) { final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader(); @@ -133,6 +143,22 @@ public OperatorChain(StreamTask containingTask) { } + @Override + public StreamStatus getStreamStatus() { + return streamStatus; + } + + public void setStreamStatus(StreamStatus status) throws IOException { + if (!status.equals(this.streamStatus)) { + this.streamStatus = status; + + // try and forward the stream status change to all outgoing connections + for (RecordWriterOutput streamOutput : streamOutputs) { + streamOutput.emitStreamStatus(status); + } + } + } + public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException { try { @@ -217,7 +243,7 @@ public int getChainLength() { // initialization utilities // ------------------------------------------------------------------------ - private static Output> createOutputCollector( + private Output> createOutputCollector( StreamTask containingTask, StreamConfig operatorConfig, Map chainedConfigs, @@ -268,9 +294,9 @@ private static Output> createOutputCollector( // If the chaining output does not copy we need to copy in the broadcast output, // otherwise multi-chaining would not work correctly. if (containingTask.getExecutionConfig().isObjectReuseEnabled()) { - return new CopyingBroadcastingOutputCollector<>(asArray); + return new CopyingBroadcastingOutputCollector<>(asArray, this); } else { - return new BroadcastingOutputCollector<>(asArray); + return new BroadcastingOutputCollector<>(asArray, this); } } } @@ -289,7 +315,7 @@ private static Output> createOutputCollector( } } - private static Output> createChainedOperator( + private Output> createChainedOperator( StreamTask containingTask, StreamConfig operatorConfig, Map chainedConfigs, @@ -308,15 +334,15 @@ private static Output> createChainedOperator( allOperators.add(chainedOperator); if (containingTask.getExecutionConfig().isObjectReuseEnabled()) { - return new ChainingOutput<>(chainedOperator); + return new ChainingOutput<>(chainedOperator, this); } else { TypeSerializer inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader); - return new CopyingChainingOutput<>(chainedOperator, inSerializer); + return new CopyingChainingOutput<>(chainedOperator, inSerializer, this); } } - private static RecordWriterOutput createStreamOutput( + private RecordWriterOutput createStreamOutput( StreamEdge edge, StreamConfig upStreamConfig, int outputIndex, Environment taskEnvironment, String taskName) @@ -334,7 +360,7 @@ private static RecordWriterOutput createStreamOutput( new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout()); output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup()); - return new RecordWriterOutput<>(output, outSerializer); + return new RecordWriterOutput<>(output, outSerializer, this); } // ------------------------------------------------------------------------ @@ -346,9 +372,12 @@ private static class ChainingOutput implements Output> { protected final OneInputStreamOperator operator; protected final Counter numRecordsIn; - public ChainingOutput(OneInputStreamOperator operator) { + protected final StreamStatusProvider streamStatusProvider; + + public ChainingOutput(OneInputStreamOperator operator, StreamStatusProvider streamStatusProvider) { this.operator = operator; this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + this.streamStatusProvider = streamStatusProvider; } @Override @@ -366,7 +395,9 @@ public void collect(StreamRecord record) { @Override public void emitWatermark(Watermark mark) { try { - operator.processWatermark(mark); + if (streamStatusProvider.getStreamStatus().equals(StreamStatus.ACTIVE)) { + operator.processWatermark(mark); + } } catch (Exception e) { throw new ExceptionInChainedOperatorException(e); @@ -398,8 +429,11 @@ private static final class CopyingChainingOutput extends ChainingOutput { private final TypeSerializer serializer; - public CopyingChainingOutput(OneInputStreamOperator operator, TypeSerializer serializer) { - super(operator); + public CopyingChainingOutput( + OneInputStreamOperator operator, + TypeSerializer serializer, + StreamStatusProvider streamStatusProvider) { + super(operator, streamStatusProvider); this.serializer = serializer; } @@ -422,15 +456,22 @@ private static class BroadcastingOutputCollector implements Output>[] outputs; private final Random RNG = new XORShiftRandom(); + + private final StreamStatusProvider streamStatusProvider; - public BroadcastingOutputCollector(Output>[] outputs) { + public BroadcastingOutputCollector( + Output>[] outputs, + StreamStatusProvider streamStatusProvider) { this.outputs = outputs; + this.streamStatusProvider = streamStatusProvider; } @Override public void emitWatermark(Watermark mark) { - for (Output> output : outputs) { - output.emitWatermark(mark); + if (streamStatusProvider.getStreamStatus().equals(StreamStatus.ACTIVE)) { + for (Output> output : outputs) { + output.emitWatermark(mark); + } } } @@ -467,8 +508,10 @@ public void close() { */ private static final class CopyingBroadcastingOutputCollector extends BroadcastingOutputCollector { - public CopyingBroadcastingOutputCollector(Output>[] outputs) { - super(outputs); + public CopyingBroadcastingOutputCollector( + Output>[] outputs, + StreamStatusProvider streamStatusProvider) { + super(outputs, streamStatusProvider); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java index a5f94ad6f59dc..a6150d60ea385 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,8 +39,7 @@ public class StreamIterationTail extends OneInputStreamTask { @Override public void init() throws Exception { - super.init(); - + final String iterationId = getConfiguration().getIterationId(); if (iterationId == null || iterationId.length() == 0) { throw new Exception("Missing iteration ID in the task configuration"); @@ -51,15 +51,17 @@ public void init() throws Exception { final long iterationWaitTime = getConfiguration().getIterationWaitTime(); LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID); - + @SuppressWarnings("unchecked") BlockingQueue> dataChannel = (BlockingQueue>) BlockingQueueBroker.INSTANCE.get(brokerID); - + LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID); - + this.headOperator = new RecordPusher<>(); this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime)); + + super.init(); } private static class RecordPusher extends AbstractStreamOperator implements OneInputStreamOperator { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamStatusProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamStatusProvider.java new file mode 100644 index 0000000000000..a7f4504c3cefa --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamStatusProvider.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; + +public interface StreamStatusProvider { + + StreamStatus getStreamStatus(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 7647fbbf8ac56..8015360cf1589 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -139,7 +139,7 @@ public abstract class StreamTask> protected OP headOperator; /** The chain of operators executed by this task */ - private OperatorChain operatorChain; + protected OperatorChain operatorChain; /** The configuration of this streaming task */ private StreamConfig configuration; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 233e9f10db0c8..d670f60ca16a9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -64,14 +64,20 @@ public void init() throws Exception { throw new RuntimeException("Invalid input type number: " + inputType); } } - - this.inputProcessor = new StreamTwoInputProcessor( + + @SuppressWarnings("unchecked") + OperatorChain> operatorChain = + (OperatorChain) this.operatorChain; + + this.inputProcessor = new StreamTwoInputProcessor<>( inputList1, inputList2, inputDeserializer1, inputDeserializer2, this, configuration.getCheckpointMode(), + getCheckpointLock(), getEnvironment().getIOManager(), - getEnvironment().getTaskManagerInfo().getConfiguration()); + getEnvironment().getTaskManagerInfo().getConfiguration(), + operatorChain); // make sure that stream tasks report their I/O statistics inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); @@ -79,12 +85,10 @@ public void init() throws Exception { @Override protected void run() throws Exception { - // cache some references on the stack, to make the code more JIT friendly - final TwoInputStreamOperator operator = this.headOperator; + // cache processor reference on the stack, to make the code more JIT friendly final StreamTwoInputProcessor inputProcessor = this.inputProcessor; - final Object lock = getCheckpointLock(); - - while (running && inputProcessor.processInput(operator, lock)) { + + while (running && inputProcessor.processInput()) { // all the work happens in the "processInput" method } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java index f84836b7e5dc6..9ddea8cb8f181 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java @@ -18,12 +18,10 @@ package org.apache.flink.streaming.runtime.operators; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.Test; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java new file mode 100644 index 0000000000000..564901f996fc4 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamstatus; + +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link StatusWatermarkValve}. While tests in {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest} + * and {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest} may also implicitly test {@link StatusWatermarkValve} + * and that valves are correctly used in the tasks' input processors, the unit tests here additionally makes sure that + * the watermarks and stream statuses to forward are generated from the valve at the exact correct times and in a + * deterministic behaviour. The unit tests here also test more complex stream status / watermark input cases. + * + *

+ * The tests are performed by a series of watermark and stream status inputs to the valve. On every input method call, + * the output is checked to contain only the expected watermark or stream status, and nothing else. This ensures that + * no redundant outputs are generated by the output logic of {@link StatusWatermarkValve}. The behaviours that a series of + * input calls to the valve is trying to test is explained as inline comments within the tests. + */ +public class StatusWatermarkValveTest { + + /** + * Tests that all input channels of a valve start as ACTIVE stream status. + */ + @Test + public void testAllInputChannelsStartAsActive() { + BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler(); + StatusWatermarkValve valve = new StatusWatermarkValve(4, valveOutput); + + // ------------------------------------------------------------------------ + // Ensure that the valve will output an IDLE stream status as soon as + // all input channels become IDLE; this also implicitly ensures that + // all input channels start as ACTIVE. + // ------------------------------------------------------------------------ + + valve.inputStreamStatus(StreamStatus.IDLE, 3); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputStreamStatus(StreamStatus.IDLE, 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputStreamStatus(StreamStatus.IDLE, 1); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputStreamStatus(StreamStatus.IDLE, 2); + assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + } + + /** + * Tests that valves work as expected when they handle only 1 input channel. + * Tested behaviours are explained as inline comments. + */ + @Test + public void testOneInputValve() { + BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler(); + StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput); + + // start off with an ACTIVE status; since the valve should initially start as ACTIVE, + // no state change is toggled, therefore no stream status should be emitted + valve.inputStreamStatus(StreamStatus.ACTIVE, 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // input some monotonously increasing watermarks while ACTIVE; + // the exact same watermarks should be emitted right after the inputs + valve.inputWatermark(new Watermark(0), 0); + assertEquals(new Watermark(0), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(25), 0); + assertEquals(new Watermark(25), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // decreasing watermarks should not result in any output + valve.inputWatermark(new Watermark(18), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(42), 0); + assertEquals(new Watermark(42), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // toggling ACTIVE to IDLE should result in an IDLE stream status output + valve.inputStreamStatus(StreamStatus.IDLE, 0); + assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // watermark inputs should be ignored while all input channels (only 1 in this case) are IDLE + valve.inputWatermark(new Watermark(52), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(60), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // no status change toggle while IDLE should result in stream status outputs + valve.inputStreamStatus(StreamStatus.IDLE, 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // toggling IDLE to ACTIVE should result in an ACTIVE stream status output + valve.inputStreamStatus(StreamStatus.ACTIVE, 0); + assertEquals(StreamStatus.ACTIVE, valveOutput.popLastOutputStreamStatus()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // the valve should remember the last watermark input channels received while they were ACTIVE (which was 42); + // decreasing watermarks should therefore still be ignored, even after a status toggle + valve.inputWatermark(new Watermark(40), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // monotonously increasing watermarks after resuming to be ACTIVE should be output normally + valve.inputWatermark(new Watermark(68), 0); + assertEquals(new Watermark(68), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(72), 0); + assertEquals(new Watermark(72), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + } + + /** + * Tests that valves work as expected when they handle multiple input channels (tested with 3). + * Tested behaviours are explained as inline comments. + */ + @Test + public void testMultipleInputValve() { + BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler(); + StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput); + + // ------------------------------------------------------------------------ + // Ensure that watermarks are output only when all + // channels have been input some watermark. + // ------------------------------------------------------------------------ + + valve.inputWatermark(new Watermark(0), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(0), 1); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(0), 2); + assertEquals(new Watermark(0), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // ------------------------------------------------------------------------ + // Ensure that watermarks are output as soon as the overall min + // watermark across all channels have advanced. + // ------------------------------------------------------------------------ + + valve.inputWatermark(new Watermark(12), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(8), 2); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(10), 2); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(15), 1); + // lowest watermark across all channels is now channel 2, with watermark @ 10 + assertEquals(new Watermark(10), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // ------------------------------------------------------------------------ + // Ensure that decreasing watermarks are ignored + // ------------------------------------------------------------------------ + + valve.inputWatermark(new Watermark(6), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // ------------------------------------------------------------------------ + // Ensure that when some input channel becomes idle, that channel will + // no longer be accounted for when advancing the watermark. + // ------------------------------------------------------------------------ + + // marking channel 2 as IDLE shouldn't result in overall status toggle for the valve, + // because there are still other active channels (0 and 1), so there should not be any + // stream status outputs; + // also, now that channel 2 is IDLE, the overall min watermark is 12 (from channel 0), + // so the valve should output that + valve.inputStreamStatus(StreamStatus.IDLE, 2); + assertEquals(new Watermark(12), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // from now on, since channel 2 is IDLE, the valve should use watermarks only from + // channel 0 and 1 to find the min watermark, even if channel 2 has the lowest watermark (10) + valve.inputWatermark(new Watermark(17), 0); + assertEquals(new Watermark(15), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(25), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(20), 1); + assertEquals(new Watermark(20), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // ------------------------------------------------------------------------ + // Ensure that after some channel resumes to be ACTIVE, it needs to + // catch up" with the current overall min watermark before it can be + // accounted for again when finding the min watermark across channels. + // Also tests that before the resumed channel catches up, the overall + // min watermark can still advance with watermarks of other channels. + // ------------------------------------------------------------------------ + + // resuming channel 2 to be ACTIVE shouldn't result in overall status toggle for the valve, + // because the valve wasn't overall IDLE, so there should not be any stream status outputs; + valve.inputStreamStatus(StreamStatus.ACTIVE, 2); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // although watermarks for channel 2 will now be accepted, it still + // hasn't caught up with the overall min watermark (20) + valve.inputWatermark(new Watermark(18), 2); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // since channel 2 hasn't caught up yet, it is still ignored when advancing new min watermarks + valve.inputWatermark(new Watermark(22), 1); + assertEquals(new Watermark(22), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(28), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(33), 1); + assertEquals(new Watermark(28), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // now, channel 2 has caught up with the overall min watermark + valve.inputWatermark(new Watermark(30), 2); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(31), 0); + // this acknowledges that channel 2's watermark is being accounted for again + assertEquals(new Watermark(30), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(34), 2); + assertEquals(new Watermark(31), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // ------------------------------------------------------------------------ + // Ensure that once all channels are IDLE, the valve should also + // determine itself to be IDLE output a IDLE stream status + // ------------------------------------------------------------------------ + + valve.inputStreamStatus(StreamStatus.IDLE, 0); + // this is because once channel 0 becomes IDLE, + // the new min watermark will be 33 (channel 1) + assertEquals(new Watermark(33), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputStreamStatus(StreamStatus.IDLE, 2); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputStreamStatus(StreamStatus.IDLE, 1); + assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // ------------------------------------------------------------------------ + // Ensure that channels gradually become ACTIVE again, the above behaviours + // still hold. Also ensure that as soon as one of the input channels + // become ACTIVE, the valve is ACTIVE again and outputs an ACTIVE stream status. + // ------------------------------------------------------------------------ + + // let channel 0 resume to be ACTIVE + valve.inputStreamStatus(StreamStatus.ACTIVE, 0); + assertEquals(StreamStatus.ACTIVE, valveOutput.popLastOutputStreamStatus()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // channel 0 is the only ACTIVE channel now, and is the only channel + // accounted for when advancing min watermark + valve.inputWatermark(new Watermark(36), 0); + assertEquals(new Watermark(36), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // new also let channel 1 become ACTIVE + valve.inputStreamStatus(StreamStatus.ACTIVE, 1); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // channel 1 is still behind overall min watermark + valve.inputWatermark(new Watermark(35), 1); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // since channel 1 is still behind, channel 0 remains to be the only + // channel used to advance min watermark + valve.inputWatermark(new Watermark(37), 0); + assertEquals(new Watermark(37), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // now, channel 1 has caught up with the overall min watermark + valve.inputWatermark(new Watermark(38), 1); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(40), 0); + // this acknowledges that channel 1's watermark is being accounted for again + assertEquals(new Watermark(38), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + } + + private class BufferedValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler { + private BlockingQueue outputWatermarks = new LinkedBlockingQueue<>(); + private BlockingQueue outputStreamStatuses = new LinkedBlockingQueue<>(); + + @Override + public void handleWatermark(Watermark watermark) { + outputWatermarks.add(watermark); + } + + @Override + public void handleStreamStatus(StreamStatus streamStatus) { + outputStreamStatuses.add(streamStatus); + } + + public Watermark popLastOutputWatermark() { + return outputWatermarks.poll(); + } + + public StreamStatus popLastOutputStreamStatus() { + return outputStreamStatuses.poll(); + } + + public boolean hasNoOutputWatermarks() { + return outputWatermarks.size() == 0; + } + + public boolean hasNoOutputStreamStatuses() { + return outputStreamStatuses.size() == 0; + } + } + +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java new file mode 100644 index 0000000000000..247dc8b0364d0 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamstatus; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +public class StreamStatusTest { + + @Test (expected = IllegalArgumentException.class) + public void testIllegalCreationThrowsException() { + new StreamStatus(32); + } + + @Test + public void testEquals() { + StreamStatus idleStatus = new StreamStatus(StreamStatus.IDLE_STATUS); + StreamStatus activeStatus = new StreamStatus(StreamStatus.ACTIVE_STATUS); + + assertEquals(StreamStatus.IDLE, idleStatus); + assertTrue(idleStatus.isIdle()); + assertFalse(idleStatus.isActive()); + + assertEquals(StreamStatus.ACTIVE, activeStatus); + assertTrue(activeStatus.isActive()); + assertFalse(activeStatus.isIdle()); + } + + @Test + public void testTypeCasting() { + StreamStatus status = StreamStatus.ACTIVE; + + assertTrue(status.isStreamStatus()); + assertFalse(status.isRecord()); + assertFalse(status.isWatermark()); + assertFalse(status.isLatencyMarker()); + + try { + status.asWatermark(); + fail("should throw an exception"); + } catch (Exception e) { + // expected + } + + try { + status.asRecord(); + fail("should throw an exception"); + } catch (Exception e) { + // expected + } + + try { + status.asLatencyMarker(); + fail("should throw an exception"); + } catch (Exception e) { + // expected + } + } + +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index be93f6af8b676..169fb596e94ec 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -47,6 +47,7 @@ import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; @@ -126,15 +127,19 @@ public void testOpenCloseAndTimestamps() throws Exception { } /** - * This test verifies that watermarks are correctly forwarded. This also checks whether + * This test verifies that watermarks and stream statuses are correctly forwarded. This also checks whether * watermarks are forwarded only when we have received watermarks from all inputs. The - * forwarded watermark must be the minimum of the watermarks of all inputs. + * forwarded watermark must be the minimum of the watermarks of all active inputs. */ @Test @SuppressWarnings("unchecked") - public void testWatermarkForwarding() throws Exception { + public void testWatermarkAndStreamStatusForwarding() throws Exception { final OneInputStreamTask mapTask = new OneInputStreamTask(); - final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + final OneInputStreamTaskTestHarness testHarness = + new OneInputStreamTaskTestHarness( + mapTask, 2, 2, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamMap mapOperator = new StreamMap(new IdentityMap()); @@ -181,7 +186,7 @@ public void testWatermarkForwarding() throws Exception { TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - // advance watermark from one of the inputs, now we should get a now one since the + // advance watermark from one of the inputs, now we should get a new one since the // minimum increases testHarness.processElement(new Watermark(initialTime + 4), 1, 1); testHarness.waitForInputProcessing(); @@ -196,6 +201,31 @@ public void testWatermarkForwarding() throws Exception { expectedOutput.add(new Watermark(initialTime + 4)); TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + // test whether idle input channels are acknowledged correctly when forwarding watermarks + testHarness.processElement(StreamStatus.IDLE, 0, 1); + testHarness.processElement(StreamStatus.IDLE, 1, 0); + testHarness.processElement(new Watermark(initialTime + 6), 0, 0); + testHarness.processElement(new Watermark(initialTime + 5), 1, 1); // this watermark should be advanced first + testHarness.processElement(StreamStatus.IDLE, 1, 1); // once this is acknowledged, + // watermark (initial + 6) should be forwarded + testHarness.waitForInputProcessing(); + expectedOutput.add(new Watermark(initialTime + 5)); + expectedOutput.add(new Watermark(initialTime + 6)); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + // make all input channels idle and check that the operator's idle status is forwarded + testHarness.processElement(StreamStatus.IDLE, 0, 0); + testHarness.waitForInputProcessing(); + expectedOutput.add(StreamStatus.IDLE); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + // make some input channels active again and check that the operator's active status is forwarded only once + testHarness.processElement(StreamStatus.ACTIVE, 1, 0); + testHarness.processElement(StreamStatus.ACTIVE, 0, 1); + testHarness.waitForInputProcessing(); + expectedOutput.add(StreamStatus.ACTIVE); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + testHarness.endInput(); testHarness.waitForTaskCompletion(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index 3cd9c9a0d5b8f..161c7757c3f50 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.operators.co.CoStreamMap; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.util.TestHarnessUtil; import org.junit.Assert; @@ -89,15 +90,20 @@ public void testOpenCloseAndTimestamps() throws Exception { } /** - * This test verifies that watermarks are correctly forwarded. This also checks whether + * This test verifies that watermarks and stream statuses are correctly forwarded. This also checks whether * watermarks are forwarded only when we have received watermarks from all inputs. The - * forwarded watermark must be the minimum of the watermarks of all inputs. + * forwarded watermark must be the minimum of the watermarks of all active inputs. */ @Test @SuppressWarnings("unchecked") - public void testWatermarkForwarding() throws Exception { + public void testWatermarkAndStreamStatusForwarding() throws Exception { final TwoInputStreamTask coMapTask = new TwoInputStreamTask(); - final TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + final TwoInputStreamTaskTestHarness testHarness = + new TwoInputStreamTaskTestHarness( + coMapTask, 2, 2, new int[] {1, 2}, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO); StreamConfig streamConfig = testHarness.getStreamConfig(); CoStreamMap coMapOperator = new CoStreamMap(new IdentityMap()); @@ -147,7 +153,7 @@ public void testWatermarkForwarding() throws Exception { TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - // advance watermark from one of the inputs, now we should get a now one since the + // advance watermark from one of the inputs, now we should get a new one since the // minimum increases testHarness.processElement(new Watermark(initialTime + 4), 1, 1); testHarness.waitForInputProcessing(); @@ -162,6 +168,33 @@ public void testWatermarkForwarding() throws Exception { expectedOutput.add(new Watermark(initialTime + 4)); TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + // test whether idle input channels are acknowledged correctly when forwarding watermarks + testHarness.processElement(StreamStatus.IDLE, 0, 1); + testHarness.processElement(StreamStatus.IDLE, 1, 0); + testHarness.processElement(new Watermark(initialTime + 6), 0, 0); + testHarness.processElement(new Watermark(initialTime + 5), 1, 1); // this watermark should be advanced first + testHarness.processElement(StreamStatus.IDLE, 1, 1); // once this is acknowledged, + // watermark (initial + 6) should be forwarded + testHarness.waitForInputProcessing(); + expectedOutput.add(new Watermark(initialTime + 5)); + // We don't expect to see Watermark(6) here because the idle status of one + // input doesn't propagate to the other input. That is, if input 1 is at WM 6 and input + // two was at WM 5 before going to IDLE then the output watermark will not jump to WM 6. + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + // make all input channels idle and check that the operator's idle status is forwarded + testHarness.processElement(StreamStatus.IDLE, 0, 0); + testHarness.waitForInputProcessing(); + expectedOutput.add(StreamStatus.IDLE); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + // make some input channels active again and check that the operator's active status is forwarded only once + testHarness.processElement(StreamStatus.ACTIVE, 1, 0); + testHarness.processElement(StreamStatus.ACTIVE, 0, 1); + testHarness.waitForInputProcessing(); + expectedOutput.add(StreamStatus.ACTIVE); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + testHarness.endInput(); testHarness.waitForTaskCompletion(); From fb2724d5d293529ddd2ded841c1407646e34e514 Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Mon, 6 Feb 2017 15:24:19 +0800 Subject: [PATCH 2/5] [FLINK-5017] General improvements to Aljoscha's changes --- .../runtime/io/RecordWriterOutput.java | 6 +- .../runtime/io/StreamInputProcessor.java | 8 +- .../runtime/io/StreamTwoInputProcessor.java | 41 +++++++--- .../streamstatus/StatusWatermarkValve.java | 2 + .../runtime/streamstatus/StreamStatus.java | 82 +++++++++---------- .../StreamStatusProvider.java | 13 ++- .../runtime/tasks/OneInputStreamTask.java | 1 - .../runtime/tasks/OperatorChain.java | 16 ++-- .../runtime/tasks/StreamIterationTail.java | 1 - 9 files changed, 98 insertions(+), 72 deletions(-) rename flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/{tasks => streamstatus}/StreamStatusProvider.java (76%) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index ed925e76bf697..51c6cd7058bc2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -31,7 +31,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.tasks.StreamStatusProvider; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -67,7 +67,7 @@ public RecordWriterOutput( serializationDelegate = new SerializationDelegate(outRecordSerializer); } - this.streamStatusProvider = streamStatusProvider; + this.streamStatusProvider = checkNotNull(streamStatusProvider); } @Override @@ -86,7 +86,7 @@ public void collect(StreamRecord record) { public void emitWatermark(Watermark mark) { serializationDelegate.setInstance(mark); - if (streamStatusProvider.getStreamStatus().equals(StreamStatus.ACTIVE)) { + if (streamStatusProvider.getStreamStatus().isActive()) { try { recordWriter.broadcastEmit(serializationDelegate); } catch (Exception e) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index e0a6ea13f5161..94d4c0c23fa2c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -83,9 +83,7 @@ public class StreamInputProcessor { // ---------------- Status and Watermark Valve ------------------ - /** - * Valve that controls how watermarks and stream statuses are forwarded. - */ + /** Valve that controls how watermarks and stream statuses are forwarded. */ private StatusWatermarkValve statusWatermarkValve; /** Number of input channels the valve needs to handle. */ @@ -158,8 +156,8 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { this.lastEmittedWatermark = Long.MIN_VALUE; - this.operatorChain = operatorChain; - this.streamOperator = operatorChain.getHeadOperator(); + this.operatorChain = checkNotNull(operatorChain); + this.streamOperator = checkNotNull(operatorChain.getHeadOperator()); this.statusWatermarkValve = new StatusWatermarkValve( numInputChannels, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index f0e22ed82af2b..1c0631c862b3d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -57,8 +57,8 @@ * *

* This internally uses a {@link StatusWatermarkValve} to keep track of {@link Watermark} and {@link StreamStatus} events, - * and forwards them to event subscribers once the {@link StatusWatermarkValve} determines the {@link Watermark} from - * all inputs has advanced, or that a {@link StreamStatus} needs to be propagated downstream to denote a status change. + * and forwards watermarks to event subscribers once the {@link StatusWatermarkValve} determines the watermarks from + * all inputs has advanced, or changes the task's {@link StreamStatus} once status change is toggled. * *

* Forwarding elements, watermarks, or status status elements must be protected by synchronizing on the given lock @@ -88,8 +88,8 @@ public class StreamTwoInputProcessor { * Stream status for the two inputs. We need to keep track for determining when * to forward stream status changes downstream. */ - private StreamStatus firstStatus = StreamStatus.ACTIVE; - private StreamStatus secondStatus = StreamStatus.ACTIVE; + private StreamStatus firstStatus; + private StreamStatus secondStatus; /** * Valves that control how watermarks and stream statuses from the 2 inputs are forwarded. @@ -181,7 +181,10 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { this.lastEmittedWatermark1 = Long.MIN_VALUE; this.lastEmittedWatermark2 = Long.MIN_VALUE; - this.operatorChain = operatorChain; + this.firstStatus = StreamStatus.ACTIVE; + this.secondStatus = StreamStatus.ACTIVE; + + this.operatorChain = checkNotNull(operatorChain); this.streamOperator = checkNotNull(operatorChain.getHeadOperator()); this.statusWatermarkValve1 = new StatusWatermarkValve(numInputChannels1, new ForwardingValveOutputHandler1(streamOperator, lock)); @@ -348,10 +351,18 @@ public void handleWatermark(Watermark watermark) { public void handleStreamStatus(StreamStatus streamStatus) { try { synchronized (lock) { - if (!streamStatus.equals(firstStatus) && streamStatus.equals(secondStatus)) { - operatorChain.setStreamStatus(streamStatus); - } firstStatus = streamStatus; + + // check if we need to toggle the task's stream status + if (!streamStatus.equals(operatorChain.getStreamStatus())) { + if (streamStatus.isActive()) { + // we're no longer idle if at least one input has become active + operatorChain.setStreamStatus(StreamStatus.ACTIVE); + } else if (secondStatus.isIdle()) { + // we're idle once both inputs are idle + operatorChain.setStreamStatus(StreamStatus.IDLE); + } + } } } catch (Exception e) { throw new RuntimeException("Exception occurred while processing valve output stream status: ", e); @@ -384,10 +395,18 @@ public void handleWatermark(Watermark watermark) { public void handleStreamStatus(StreamStatus streamStatus) { try { synchronized (lock) { - if (!streamStatus.equals(secondStatus) && streamStatus.equals(firstStatus)) { - operatorChain.setStreamStatus(streamStatus); - } secondStatus = streamStatus; + + // check if we need to toggle the task's stream status + if (!streamStatus.equals(operatorChain.getStreamStatus())) { + if (streamStatus.isActive()) { + // we're no longer idle if at least one input has become active + operatorChain.setStreamStatus(StreamStatus.ACTIVE); + } else if (firstStatus.isIdle()) { + // we're idle once both inputs are idle + operatorChain.setStreamStatus(StreamStatus.IDLE); + } + } } } catch (Exception e) { throw new RuntimeException("Exception occurred while processing valve output stream status: ", e); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java index 349fcca295c18..182757a78c90d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.streamstatus; +import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.watermark.Watermark; import static org.apache.flink.util.Preconditions.checkArgument; @@ -30,6 +31,7 @@ * implementation of {@link ValveOutputHandler}, which is called by the valve only when it determines a new watermark or * stream status can be propagated. */ +@Internal public class StatusWatermarkValve { /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java index efc2730a6015f..e82fad0420bf7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java @@ -18,66 +18,66 @@ package org.apache.flink.streaming.runtime.streamstatus; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator; +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamTask; /** - * A Stream Status element informs operators whether or not they should continue to expect records and watermarks + * A Stream Status element informs stream tasks whether or not they should continue to expect records and watermarks * from the input stream that sent them. There are 2 kinds of status, namely {@link StreamStatus#IDLE} and * {@link StreamStatus#ACTIVE}. Stream Status elements are generated at the sources, and may be propagated through - * the operators of the topology using {@link org.apache.flink.streaming.api.operators.Output#emitStreamStatus(StreamStatus)}. - * They directly infer the current status of the emitting source or operator; a source or operator emits a - * {@link StreamStatus#IDLE} if it will temporarily halt to emit any records or watermarks (i.e. is idle), and emits a - * {@link StreamStatus#ACTIVE} once it resumes to do so (i.e. is active). The cases that sources and downstream operators - * are considered either idle or active is explained below: + * the tasks of the topology. They directly infer the current status of the emitting task; a {@link SourceStreamTask} or + * {@link StreamTask} emits a {@link StreamStatus#IDLE} if it will temporarily halt to emit any records or watermarks + * (i.e. is idle), and emits a {@link StreamStatus#ACTIVE} once it resumes to do so (i.e. is active). Tasks are + * responsible for propagating their status further downstream once they toggle between being idle and active. The cases + * that source tasks and downstream tasks are considered either idle or active is explained below: * *

    - *
  • Sources: A source is considered to be idle if it will not emit records for an indefinite amount of time. This - * is the case, for example, for Flink's Kafka Consumer, where sources might initially have no assigned partitions - * to read from, or no records can be read from the assigned partitions. Once the source detects that it will - * resume emitting data, it is considered to be active. Sources are responsible for ensuring that no records (and - * possibly watermarks, in the case of Flink's Kafka Consumer which can generate watermarks directly within the - * source) will be emitted in between a consecutive {@link StreamStatus#IDLE} and {@link StreamStatus#ACTIVE}. - * This guarantee should be enforced on sources through + *
  • Source tasks: A source task is considered to be idle if its head operator, i.e. a {@link StreamSource}, will + * not emit records for an indefinite amount of time. This is the case, for example, for Flink's Kafka Consumer, + * where sources might initially have no assigned partitions to read from, or no records can be read from the + * assigned partitions. Once the head {@link StreamSource} operator detects that it will resume emitting data, + * the source task is considered to be active. {@link StreamSource}s are responsible for toggling the status + * of the containing source task and ensuring that no records (and possibly watermarks, in the case of Flink's + * Kafka Consumer which can generate watermarks directly within the source) will be emitted while the task is + * idle. This guarantee should be enforced on sources through * {@link org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext} implementations.
  • * - *
  • Downstream operators: a downstream operator is considered to be idle if all of its input streams are idle, - * i.e. the last received Stream Status element from all input streams is a {@link StreamStatus#IDLE}. As long - * as one of its input streams is active, i.e. the last received Stream Status element from the input stream is - * {@link StreamStatus#ACTIVE}, the operator is active. Operators are responsible for propagating their status - * further downstream once they toggle between being idle and active.
  • + *
  • Downstream tasks: a downstream task is considered to be idle if all its input streams are idle, i.e. the last + * received Stream Status element from all input streams is a {@link StreamStatus#IDLE}. As long as one of its + * input streams is active, i.e. the last received Stream Status element from the input stream is + * {@link StreamStatus#ACTIVE}, the task is active.
  • *
* *

- * Stream Status elements received at downstream operators also affect and controls how they process and advance their - * watermarks. The below describes the effects (the logic is implemented as a {@link StatusWatermarkValve} which - * downstream operators should use for such purposes): + * Stream Status elements received at downstream tasks also affect and control how their operators process and advance + * their watermarks. The below describes the effects (the logic is implemented as a {@link StatusWatermarkValve} which + * downstream tasks should use for such purposes): * *

    - *
  • Since sources guarantee that no records will be emitted between a {@link StreamStatus#IDLE} and - * {@link StreamStatus#ACTIVE}, downstream operators can always safely process and propagate records when they - * receive them, without the need to check whether or not the operator is currently idle or active. However, for - * watermarks, since there may be {@link TimestampsAndPeriodicWatermarksOperator}s that might produce watermarks - * anywhere in the middle of topologies, regardless of whether there are input data at the operator, all - * downstream operators need to check whether or not they are actually active before processing a received - * watermark.
  • + *
  • Since source tasks guarantee that no records will be emitted between a {@link StreamStatus#IDLE} and + * {@link StreamStatus#ACTIVE}, downstream tasks can always safely process and propagate records through their + * operator chain when they receive them, without the need to check whether or not the task is currently idle or + * active. However, for watermarks, since there may be watermark generators that might produce watermarks + * anywhere in the middle of topologies regardless of whether there are input data at the operator, the current + * status of the task must be checked before forwarding watermarks emitted from + * an operator. It the status is actually idle, the watermark must be blocked.
  • * - *
  • For downstream operators with multiple input streams (ex. head operators of a - * {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask} or - * {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}, or any - * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator}), the watermarks of input streams - * that are temporarily idle, or has resumed to be active but its watermark is behind the overall min watermark - * of the operator, should not be accounted for when deciding whether or not to advance the operator's - * watermark.
  • + *
  • For downstream tasks with multiple input streams, the watermarks of input streams that are temporarily idle, + * or has resumed to be active but its watermark is behind the overall min watermark of the operator, should not + * be accounted for when deciding whether or not to advance the watermark and propagated through the operator + * chain.
  • *
* *

- * Note that to notify downstream operators that a source is permanently closed and will no longer send any more elements, - * the source should still send a {@link org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK} instead of - * {@link StreamStatus#IDLE}. Stream Status elements only serve as markers for temporary status. + * Note that to notify downstream tasks that a source task is permanently closed and will no longer send any more + * elements, the source should still send a {@link Watermark#MAX_WATERMARK} instead of {@link StreamStatus#IDLE}. + * Stream Status elements only serve as markers for temporary status. */ -@PublicEvolving +@Internal public final class StreamStatus extends StreamElement { public static final int IDLE_STATUS = -1; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamStatusProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusProvider.java similarity index 76% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamStatusProvider.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusProvider.java index a7f4504c3cefa..ae8d9af1bc98c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamStatusProvider.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusProvider.java @@ -15,11 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.runtime.tasks; +package org.apache.flink.streaming.runtime.streamstatus; -import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.annotation.Internal; +/** + * Interface for retrieving the current {@link StreamStatus}. + */ +@Internal public interface StreamStatusProvider { + /** + * Returns the current stream status. + * + * @return current stream status. + */ StreamStatus getStreamStatus(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 8770b7e3d4a87..9f11bddaf9950 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -24,7 +24,6 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.io.StreamInputProcessor; -import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor; @Internal public class OneInputStreamTask extends StreamTask> { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 3aaea200a5992..e2ad6f5ccc885 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -42,6 +42,7 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider; import org.apache.flink.util.XORShiftRandom; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +62,7 @@ * head operator. */ @Internal -public class OperatorChain> implements StreamStatusProvider{ +public class OperatorChain> implements StreamStatusProvider { private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class); @@ -74,11 +75,10 @@ public class OperatorChain> implements Strea private final OP headOperator; /** - * This output keeps track of the current status, in order to block - * any watermarks explicitly generated at concrete implementations - * when the operator is actually idle. This may happen, since - * timestamp assigner / watermark emitting operators will - * completely bypass the valve's watermark output logic. + * Current status of the input stream of the operator chain. + * Watermarks explicitly generated by operators in the chain (i.e. timestamp + * assigner / watermark extractors), will be blocked and not forwarded if + * this value is {@link StreamStatus#IDLE}. */ private StreamStatus streamStatus = StreamStatus.ACTIVE; @@ -395,7 +395,7 @@ public void collect(StreamRecord record) { @Override public void emitWatermark(Watermark mark) { try { - if (streamStatusProvider.getStreamStatus().equals(StreamStatus.ACTIVE)) { + if (streamStatusProvider.getStreamStatus().isActive()) { operator.processWatermark(mark); } } @@ -468,7 +468,7 @@ public BroadcastingOutputCollector( @Override public void emitWatermark(Watermark mark) { - if (streamStatusProvider.getStreamStatus().equals(StreamStatus.ACTIVE)) { + if (streamStatusProvider.getStreamStatus().isActive()) { for (Output> output : outputs) { output.emitWatermark(mark); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java index a6150d60ea385..f68007e5a8a47 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java @@ -28,7 +28,6 @@ import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From fab0eb176d07880f4d9ff2cc110b108dc58257ac Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Mon, 6 Feb 2017 16:25:14 +0800 Subject: [PATCH 3/5] [FLINK-5017] Add test to verify watermarks are ignored when task is idle --- .../runtime/tasks/OneInputStreamTaskTest.java | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 169fb596e94ec..5e73d5051a566 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -234,6 +234,85 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception { assertEquals(2, resultElements.size()); } + /** + * This test verifies that generated watermarks are ignored and not forwarded when the task is idle. + */ + @Test + public void testIgnoresGeneratedWatermarksWhenIdle() throws Exception { + final OneInputStreamTask testTask = new OneInputStreamTask<>(); + final OneInputStreamTaskTestHarness testHarness = + new OneInputStreamTaskTestHarness( + testTask, 1, 1, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO); + + StreamConfig streamConfig = testHarness.getStreamConfig(); + + // this watermark generator simply emits long-value string values it receives as watermark + WatermarkGeneratingTestOperator watermarkGeneratingTestOperator = new WatermarkGeneratingTestOperator(); + streamConfig.setStreamOperator(watermarkGeneratingTestOperator); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue(); + + testHarness.invoke(); + testHarness.waitForTaskRunning(); + + // the task starts as active, so all generated watermarks should be forwarded + testHarness.processElement(new StreamRecord<>("10"), 0, 0); + testHarness.processElement(new StreamRecord<>("20"), 0, 0); + testHarness.processElement(new StreamRecord<>("30"), 0, 0); + testHarness.waitForInputProcessing(); + + expectedOutput.add(new StreamRecord<>("10")); + expectedOutput.add(new Watermark(10)); + expectedOutput.add(new StreamRecord<>("20")); + expectedOutput.add(new Watermark(20)); + expectedOutput.add(new StreamRecord<>("30")); + expectedOutput.add(new Watermark(30)); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + // now, toggle the task to be idle, and let the watermark generator produce some watermarks + testHarness.processElement(StreamStatus.IDLE); + + // NOTE: normally, tasks will not have records to process while idle; + // we're doing this here only to mimic watermark generating in operators + testHarness.processElement(new StreamRecord<>("40"), 0, 0); + testHarness.processElement(new StreamRecord<>("50"), 0, 0); + testHarness.processElement(new StreamRecord<>("60"), 0, 0); + testHarness.waitForInputProcessing(); + + // the 40 - 60 watermarks should not be forwarded, only the stream status toggle element and records + expectedOutput.add(StreamStatus.IDLE); + expectedOutput.add(new StreamRecord<>("40")); + expectedOutput.add(new StreamRecord<>("50")); + expectedOutput.add(new StreamRecord<>("60")); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + // re-toggle the task to be active and see if new watermarks are correctly forwarded again + testHarness.processElement(StreamStatus.ACTIVE); + + testHarness.processElement(new StreamRecord<>("70"), 0, 0); + testHarness.processElement(new StreamRecord<>("80"), 0, 0); + testHarness.processElement(new StreamRecord<>("90"), 0, 0); + testHarness.waitForInputProcessing(); + + expectedOutput.add(StreamStatus.ACTIVE); + expectedOutput.add(new StreamRecord<>("70")); + expectedOutput.add(new Watermark(70)); + expectedOutput.add(new StreamRecord<>("80")); + expectedOutput.add(new Watermark(80)); + expectedOutput.add(new StreamRecord<>("90")); + expectedOutput.add(new Watermark(90)); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.endInput(); + + testHarness.waitForTaskCompletion(); + + List resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput()); + assertEquals(9, resultElements.size()); + } + /** * This test verifies that checkpoint barriers are correctly forwarded. */ @@ -688,5 +767,19 @@ public String map(String value) throws Exception { return value; } } + + private static class WatermarkGeneratingTestOperator + extends AbstractStreamOperator + implements OneInputStreamOperator { + + private static final long serialVersionUID = -5064871833244157221L; + + @Override + public void processElement(StreamRecord element) throws Exception { + output.collect(element); + output.emitWatermark(new Watermark(Long.valueOf(element.getValue()))); + } + + } } From ef5fd94dadfc2bac5ba8da52c29b8b1ddbddc103 Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Mon, 6 Feb 2017 21:10:57 +0800 Subject: [PATCH 4/5] [FLINK-5017] Fix failing IterateITCase tests in flink-tests --- .../flink/streaming/runtime/io/StreamInputProcessor.java | 5 +++-- .../flink/streaming/runtime/io/StreamTwoInputProcessor.java | 5 +++-- .../flink/streaming/runtime/tasks/OneInputStreamTask.java | 3 ++- .../org/apache/flink/streaming/runtime/tasks/StreamTask.java | 1 - .../flink/streaming/runtime/tasks/TwoInputStreamTask.java | 3 ++- 5 files changed, 10 insertions(+), 7 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 94d4c0c23fa2c..e2061c31377d6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -115,7 +115,8 @@ public StreamInputProcessor( Object lock, IOManager ioManager, Configuration taskManagerConfig, - OperatorChain> operatorChain) throws IOException { + OperatorChain> operatorChain, + OneInputStreamOperator streamOperator) throws IOException { InputGate inputGate = InputGateUtil.createInputGate(inputGates); @@ -157,7 +158,7 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { this.lastEmittedWatermark = Long.MIN_VALUE; this.operatorChain = checkNotNull(operatorChain); - this.streamOperator = checkNotNull(operatorChain.getHeadOperator()); + this.streamOperator = checkNotNull(streamOperator); this.statusWatermarkValve = new StatusWatermarkValve( numInputChannels, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 1c0631c862b3d..a295395be7ecb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -129,7 +129,8 @@ public StreamTwoInputProcessor( Object lock, IOManager ioManager, Configuration taskManagerConfig, - OperatorChain> operatorChain) throws IOException { + OperatorChain> operatorChain, + TwoInputStreamOperator streamOperator) throws IOException { final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2); @@ -185,7 +186,7 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { this.secondStatus = StreamStatus.ACTIVE; this.operatorChain = checkNotNull(operatorChain); - this.streamOperator = checkNotNull(operatorChain.getHeadOperator()); + this.streamOperator = checkNotNull(streamOperator); this.statusWatermarkValve1 = new StatusWatermarkValve(numInputChannels1, new ForwardingValveOutputHandler1(streamOperator, lock)); this.statusWatermarkValve2 = new StatusWatermarkValve(numInputChannels2, new ForwardingValveOutputHandler2(streamOperator, lock)); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 9f11bddaf9950..e559ad0a719ce 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -54,7 +54,8 @@ public void init() throws Exception { getCheckpointLock(), getEnvironment().getIOManager(), getEnvironment().getTaskManagerInfo().getConfiguration(), - operatorChain); + operatorChain, + this.headOperator); // make sure that stream tasks report their I/O statistics inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 8015360cf1589..e6d0353154afe 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -646,7 +646,6 @@ private void initializeState() throws Exception { boolean restored = null != restoreStateHandles; if (restored) { - checkRestorePreconditions(operatorChain.getChainLength()); initializeOperators(true); restoreStateHandles = null; // free for GC diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index d670f60ca16a9..175bd76cbe01a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -77,7 +77,8 @@ public void init() throws Exception { getCheckpointLock(), getEnvironment().getIOManager(), getEnvironment().getTaskManagerInfo().getConfiguration(), - operatorChain); + operatorChain, + this.headOperator); // make sure that stream tasks report their I/O statistics inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); From 29aceadd437e7dbde32e2f4c7e40eb8000760e10 Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Thu, 9 Feb 2017 17:41:29 +0800 Subject: [PATCH 5/5] [FLINK-5017] Rewrite watermerk ignoring test to also verify not forwarded within chain --- .../state/RocksDBAsyncSnapshotTest.java | 2 + .../runtime/tasks/OperatorChain.java | 5 +- .../async/AsyncWaitOperatorTest.java | 3 + .../streamtask/StreamIterationHeadTest.java | 1 + .../operators/StreamTaskTimerTest.java | 2 + .../TestProcessingTimeServiceTest.java | 1 + .../runtime/tasks/OneInputStreamTaskTest.java | 182 ++++++++++++++++-- .../runtime/tasks/SourceStreamTaskTest.java | 2 + .../StreamTaskCancellationBarrierTest.java | 3 + .../runtime/tasks/StreamTaskTestHarness.java | 27 +-- .../runtime/tasks/TwoInputStreamTaskTest.java | 4 + 11 files changed, 198 insertions(+), 34 deletions(-) diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index 46a184a035057..b28e7b777b398 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -119,6 +119,7 @@ public void testFullyAsyncSnapshot() throws Exception { final OneInputStreamTask task = new OneInputStreamTask<>(); final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); testHarness.configureForKeyedStream(new KeySelector() { @Override @@ -219,6 +220,7 @@ public void testCancelFullyAsyncCheckpoints() throws Exception { final OneInputStreamTask task = new OneInputStreamTask<>(); final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); testHarness.configureForKeyedStream(new KeySelector() { @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index e2ad6f5ccc885..162721db375c1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -118,7 +118,8 @@ public OperatorChain(StreamTask containingTask) { chainedConfigs, userCodeClassloader, streamOutputMap, allOps); if (headOperator != null) { - headOperator.setup(containingTask, configuration, getChainEntryPoint()); + Output output = getChainEntryPoint(); + headOperator.setup(containingTask, configuration, output); } // add head operator to end of chain @@ -257,7 +258,7 @@ private Output> createOutputCollector( for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) { @SuppressWarnings("unchecked") RecordWriterOutput output = (RecordWriterOutput) streamOutputs.get(outputEdge); - + allOutputs.add(new Tuple2>, StreamEdge>(output, outputEdge)); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 10df6d7ddae62..6138dbbdb6acb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -355,6 +355,7 @@ public void testOperatorChainWithProcessingTime() throws Exception { final OneInputStreamTask task = new OneInputStreamTask<>(); final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); testHarness.taskConfig = chainedVertex.getConfiguration(); @@ -465,6 +466,7 @@ public void testStateSnapshotAndRestore() throws Exception { final OneInputStreamTask task = new OneInputStreamTask<>(); final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); AsyncWaitOperator operator = new AsyncWaitOperator<>( new LazyAsyncFunction(), @@ -517,6 +519,7 @@ public void testStateSnapshotAndRestore() throws Exception { final OneInputStreamTaskTestHarness restoredTaskHarness = new OneInputStreamTaskTestHarness<>(restoredTask, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO); + restoredTaskHarness.asSimpleOneOperatorHarness(); AsyncWaitOperator restoredOperator = new AsyncWaitOperator<>( new MyAsyncFunction(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java index 36cf53a1c4376..abe8c33287bf6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java @@ -33,6 +33,7 @@ public void testIterationHeadWatermarkEmission() throws Exception { StreamIterationHead head = new StreamIterationHead<>(); StreamTaskTestHarness harness = new StreamTaskTestHarness<>(head, BasicTypeInfo.INT_TYPE_INFO); + harness.asSimpleOneOperatorHarness(); harness.getStreamConfig().setIterationId("1"); harness.getStreamConfig().setIterationWaitTime(1); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java index e0e0e916559aa..9054cfcb0f1be 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java @@ -46,6 +46,7 @@ public void testOpenCloseAndTimestamps() throws Exception { final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>( mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); StreamConfig streamConfig = testHarness.getStreamConfig(); @@ -83,6 +84,7 @@ public void checkScheduledTimestampe() { try { final OneInputStreamTask mapTask = new OneInputStreamTask<>(); final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamMap mapOperator = new StreamMap<>(new DummyMapFunction()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java index 4d24b82b1cd97..8518ead458f89 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java @@ -44,6 +44,7 @@ public void testCustomTimeServiceProvider() throws Throwable { final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>( mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); StreamConfig streamConfig = testHarness.getStreamConfig(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 5e73d5051a566..13c13fe39a3a6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FSDataInputStream; @@ -38,14 +39,16 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.TaskStateHandles; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.operators.StreamMap; +import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.util.TestHarnessUtil; @@ -60,14 +63,7 @@ import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -99,6 +95,7 @@ public class OneInputStreamTaskTest extends TestLogger { public void testOpenCloseAndTimestamps() throws Exception { final OneInputStreamTask mapTask = new OneInputStreamTask(); final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamMap mapOperator = new StreamMap(new TestOpenCloseMapFunction()); @@ -140,6 +137,7 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception { mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamMap mapOperator = new StreamMap(new IdentityMap()); @@ -185,7 +183,6 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception { expectedOutput.add(new Watermark(initialTime + 2)); TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - // advance watermark from one of the inputs, now we should get a new one since the // minimum increases testHarness.processElement(new Watermark(initialTime + 4), 1, 1); @@ -235,10 +232,16 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception { } /** - * This test verifies that generated watermarks are ignored and not forwarded when the task is idle. + * This test verifies that watermarks are not forwarded when the task is idle. + * It also verifies that when task is idle, watermarks generated in the middle of chains are also blocked and + * never forwarded. + * + * The tested chain will be: (HEAD: normal operator) --> (watermark generating operator) --> (normal operator). + * The operators will throw an exception and fail the test if either of them were forwarded watermarks when + * the task is idle. */ @Test - public void testIgnoresGeneratedWatermarksWhenIdle() throws Exception { + public void testWatermarksNotForwardedWithinChainWhenIdle() throws Exception { final OneInputStreamTask testTask = new OneInputStreamTask<>(); final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness( @@ -246,11 +249,67 @@ public void testIgnoresGeneratedWatermarksWhenIdle() throws Exception { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); - StreamConfig streamConfig = testHarness.getStreamConfig(); - - // this watermark generator simply emits long-value string values it receives as watermark - WatermarkGeneratingTestOperator watermarkGeneratingTestOperator = new WatermarkGeneratingTestOperator(); - streamConfig.setStreamOperator(watermarkGeneratingTestOperator); + // ------------------ setup the chain ------------------ + + TriggerableFailOnWatermarkTestOperator headOperator = new TriggerableFailOnWatermarkTestOperator(); + StreamConfig headOperatorConfig = testHarness.getStreamConfig(); + + WatermarkGeneratingTestOperator watermarkOperator = new WatermarkGeneratingTestOperator(); + StreamConfig watermarkOperatorConfig = new StreamConfig(new Configuration()); + + TriggerableFailOnWatermarkTestOperator tailOperator = new TriggerableFailOnWatermarkTestOperator(); + StreamConfig tailOperatorConfig = new StreamConfig(new Configuration()); + + headOperatorConfig.setStreamOperator(headOperator); + headOperatorConfig.setChainStart(); + headOperatorConfig.setChainIndex(0); + headOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge( + new StreamNode(null, 0, null, null, null, null, null), + new StreamNode(null, 1, null, null, null, null, null), + 0, + Collections.emptyList(), + null + ))); + + watermarkOperatorConfig.setStreamOperator(watermarkOperator); + watermarkOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE); + watermarkOperatorConfig.setChainIndex(1); + watermarkOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge( + new StreamNode(null, 1, null, null, null, null, null), + new StreamNode(null, 2, null, null, null, null, null), + 0, + Collections.emptyList(), + null + ))); + + List outEdgesInOrder = new LinkedList(); + outEdgesInOrder.add(new StreamEdge( + new StreamNode(null, 2, null, null, null, null, null), + new StreamNode(null, 3, null, null, null, null, null), + 0, + Collections.emptyList(), + new BroadcastPartitioner())); + + tailOperatorConfig.setStreamOperator(tailOperator); + tailOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE); + tailOperatorConfig.setBufferTimeout(0); + tailOperatorConfig.setChainIndex(2); + tailOperatorConfig.setChainEnd(); + tailOperatorConfig.setOutputSelectors(Collections.>emptyList()); + tailOperatorConfig.setNumberOfOutputs(1); + tailOperatorConfig.setOutEdgesInOrder(outEdgesInOrder); + tailOperatorConfig.setNonChainedOutputs(outEdgesInOrder); + tailOperatorConfig.setTypeSerializerOut(StringSerializer.INSTANCE); + + Map chainedConfigs = new HashMap<>(2); + chainedConfigs.put(1, watermarkOperatorConfig); + chainedConfigs.put(2, tailOperatorConfig); + headOperatorConfig.setTransitiveChainedTaskConfigs(chainedConfigs); + headOperatorConfig.setOutEdgesInOrder(outEdgesInOrder); + + // ----------------------------------------------------- + + // --------------------- begin test --------------------- ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue(); @@ -258,11 +317,20 @@ public void testIgnoresGeneratedWatermarksWhenIdle() throws Exception { testHarness.waitForTaskRunning(); // the task starts as active, so all generated watermarks should be forwarded + testHarness.processElement(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER)); + testHarness.processElement(new StreamRecord<>("10"), 0, 0); + + // this watermark will be forwarded since the task is currently active, + // but should not be in the final output because it should be blocked by the watermark generator in the chain + testHarness.processElement(new Watermark(15)); + testHarness.processElement(new StreamRecord<>("20"), 0, 0); testHarness.processElement(new StreamRecord<>("30"), 0, 0); + testHarness.waitForInputProcessing(); + expectedOutput.add(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER)); expectedOutput.add(new StreamRecord<>("10")); expectedOutput.add(new Watermark(10)); expectedOutput.add(new StreamRecord<>("20")); @@ -274,15 +342,20 @@ public void testIgnoresGeneratedWatermarksWhenIdle() throws Exception { // now, toggle the task to be idle, and let the watermark generator produce some watermarks testHarness.processElement(StreamStatus.IDLE); + // after this, the operators will throw an exception if they are forwarded watermarks anywhere in the chain + testHarness.processElement(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER)); + // NOTE: normally, tasks will not have records to process while idle; // we're doing this here only to mimic watermark generating in operators testHarness.processElement(new StreamRecord<>("40"), 0, 0); testHarness.processElement(new StreamRecord<>("50"), 0, 0); testHarness.processElement(new StreamRecord<>("60"), 0, 0); + testHarness.processElement(new Watermark(65)); // the test will fail if any of the operators were forwarded this testHarness.waitForInputProcessing(); // the 40 - 60 watermarks should not be forwarded, only the stream status toggle element and records expectedOutput.add(StreamStatus.IDLE); + expectedOutput.add(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER)); expectedOutput.add(new StreamRecord<>("40")); expectedOutput.add(new StreamRecord<>("50")); expectedOutput.add(new StreamRecord<>("60")); @@ -290,6 +363,7 @@ public void testIgnoresGeneratedWatermarksWhenIdle() throws Exception { // re-toggle the task to be active and see if new watermarks are correctly forwarded again testHarness.processElement(StreamStatus.ACTIVE); + testHarness.processElement(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER)); testHarness.processElement(new StreamRecord<>("70"), 0, 0); testHarness.processElement(new StreamRecord<>("80"), 0, 0); @@ -297,6 +371,7 @@ public void testIgnoresGeneratedWatermarksWhenIdle() throws Exception { testHarness.waitForInputProcessing(); expectedOutput.add(StreamStatus.ACTIVE); + expectedOutput.add(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER)); expectedOutput.add(new StreamRecord<>("70")); expectedOutput.add(new Watermark(70)); expectedOutput.add(new StreamRecord<>("80")); @@ -310,7 +385,7 @@ public void testIgnoresGeneratedWatermarksWhenIdle() throws Exception { testHarness.waitForTaskCompletion(); List resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput()); - assertEquals(9, resultElements.size()); + assertEquals(12, resultElements.size()); } /** @@ -320,6 +395,7 @@ public void testIgnoresGeneratedWatermarksWhenIdle() throws Exception { public void testCheckpointBarriers() throws Exception { final OneInputStreamTask mapTask = new OneInputStreamTask(); final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamMap mapOperator = new StreamMap(new IdentityMap()); @@ -378,6 +454,7 @@ public void testCheckpointBarriers() throws Exception { public void testOvertakingCheckpointBarriers() throws Exception { final OneInputStreamTask mapTask = new OneInputStreamTask(); final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamMap mapOperator = new StreamMap(new IdentityMap()); @@ -448,6 +525,8 @@ public void testSnapshottingAndRestoring() throws Exception { final Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); final OneInputStreamTask streamTask = new OneInputStreamTask(); final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(streamTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); + IdentityKeySelector keySelector = new IdentityKeySelector<>(); testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO); @@ -768,18 +847,79 @@ public String map(String value) throws Exception { } } - private static class WatermarkGeneratingTestOperator + /** + * A {@link TriggerableFailOnWatermarkTestOperator} that generates watermarks. + */ + private static class WatermarkGeneratingTestOperator extends TriggerableFailOnWatermarkTestOperator { + + private static final long serialVersionUID = -5064871833244157221L; + + private long lastWatermark; + + @Override + protected void handleElement(StreamRecord element) { + long timestamp = Long.valueOf(element.getValue()); + if (timestamp > lastWatermark) { + output.emitWatermark(new Watermark(timestamp)); + lastWatermark = timestamp; + } + } + + @Override + protected void handleWatermark(Watermark mark) { + if (mark.equals(Watermark.MAX_WATERMARK)) { + output.emitWatermark(mark); + lastWatermark = Long.MAX_VALUE; + } + } + } + + /** + * An operator that can be triggered whether or not to expect watermarks forwarded to it, toggled + * by letting it process special trigger marker records. + * + * If it receives a watermark when it's not expecting one, it'll throw an exception and fail. + */ + private static class TriggerableFailOnWatermarkTestOperator extends AbstractStreamOperator implements OneInputStreamOperator { - private static final long serialVersionUID = -5064871833244157221L; + private static final long serialVersionUID = 2048954179291813243L; + + public final static String EXPECT_FORWARDED_WATERMARKS_MARKER = "EXPECT_WATERMARKS"; + public final static String NO_FORWARDED_WATERMARKS_MARKER = "NO_WATERMARKS"; + + protected boolean expectForwardedWatermarks; @Override public void processElement(StreamRecord element) throws Exception { output.collect(element); - output.emitWatermark(new Watermark(Long.valueOf(element.getValue()))); + + if (element.getValue().equals(EXPECT_FORWARDED_WATERMARKS_MARKER)) { + this.expectForwardedWatermarks = true; + } else if (element.getValue().equals(NO_FORWARDED_WATERMARKS_MARKER)) { + this.expectForwardedWatermarks = false; + } else { + handleElement(element); + } + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + if (!expectForwardedWatermarks) { + throw new Exception("Received a " + mark + ", but this operator should not be forwarded watermarks."); + } else { + handleWatermark(mark); + } } + protected void handleElement(StreamRecord element) { + // do nothing + } + + protected void handleWatermark(Watermark mark) { + output.emitWatermark(mark); + } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index dd1fe58aaec8a..1c50aec0d58f1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -57,6 +57,7 @@ public class SourceStreamTaskTest { public void testOpenClose() throws Exception { final SourceStreamTask, StreamSource>> sourceTask = new SourceStreamTask<>(); final StreamTaskTestHarness testHarness = new StreamTaskTestHarness(sourceTask, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamSource sourceOperator = new StreamSource<>(new OpenCloseTestSource()); @@ -99,6 +100,7 @@ public void testCheckpointing() throws Exception { final SourceStreamTask, SourceFunction>, StreamSource, SourceFunction>>> sourceTask = new SourceStreamTask<>(); final StreamTaskTestHarness> testHarness = new StreamTaskTestHarness>(sourceTask, typeInfo); + testHarness.asSimpleOneOperatorHarness(); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamSource, ?> sourceOperator = new StreamSource<>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY)); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java index ebe528574da69..be67a27afec10 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java @@ -50,6 +50,7 @@ public class StreamTaskCancellationBarrierTest { public void testEmitCancellationBarrierWhenNotReady() throws Exception { StreamTask task = new InitBlockingTask(); StreamTaskTestHarness testHarness = new StreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); // start the test - this cannot succeed across the 'init()' method testHarness.invoke(); @@ -80,6 +81,7 @@ public void testDeclineCallOnCancelBarrierOneInput() throws Exception { task, 1, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamMap mapOperator = new StreamMap<>(new IdentityMap()); @@ -124,6 +126,7 @@ public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception { TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness<>( task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); StreamConfig streamConfig = testHarness.getStreamConfig(); CoStreamMap op = new CoStreamMap<>(new UnionCoMap()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 8dc6afa3f335c..074a6c44f78f3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -106,9 +106,6 @@ public StreamTaskTestHarness(AbstractInvokable task, TypeInformation output this.executionConfig = new ExecutionConfig(); streamConfig = new StreamConfig(taskConfig); - streamConfig.setChainStart(); - streamConfig.setBufferTimeout(0); - streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime); outputSerializer = outputType.createSerializer(executionConfig); outputStreamRecordSerializer = new StreamElementSerializer(outputSerializer); @@ -129,11 +126,25 @@ protected void initializeInputs() throws IOException, InterruptedException {} @SuppressWarnings("unchecked") private void initializeOutput() { outputList = new ConcurrentLinkedQueue(); - mockEnv.addOutput(outputList, outputStreamRecordSerializer); + } + /** + * Users of the test harness can call this utility method to setup the stream config + * if there will only be a single operator to be tested. The method will setup the + * outgoing network connection for the operator. + * + * For more advanced test cases such as testing chains of multiple operators with the harness, + * please manually configure the stream config. + */ + public void asSimpleOneOperatorHarness() { + streamConfig.setChainStart(); + streamConfig.setBufferTimeout(0); + streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime); streamConfig.setOutputSelectors(Collections.>emptyList()); streamConfig.setNumberOfOutputs(1); + streamConfig.setTypeSerializerOut(outputSerializer); + streamConfig.setVertexID(0); StreamOperator dummyOperator = new AbstractStreamOperator() { private static final long serialVersionUID = 1L; @@ -142,13 +153,10 @@ private void initializeOutput() { List outEdgesInOrder = new LinkedList(); StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", dummyOperator, "source dummy", new LinkedList>(), SourceStreamTask.class); StreamNode targetVertexDummy = new StreamNode(null, 1, "group", dummyOperator, "target dummy", new LinkedList>(), SourceStreamTask.class); - outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList(), new BroadcastPartitioner())); + streamConfig.setOutEdgesInOrder(outEdgesInOrder); streamConfig.setNonChainedOutputs(outEdgesInOrder); - streamConfig.setTypeSerializerOut(outputSerializer); - streamConfig.setVertexID(0); - } public StreamMockEnvironment createEnvironment() { @@ -330,9 +338,6 @@ public void waitForInputProcessing() throws Exception { allEmpty = false; } } - try { - Thread.sleep(10); - } catch (InterruptedException ignored) {} if (allEmpty) { break; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index 161c7757c3f50..81dd26c5c61bd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -59,6 +59,7 @@ public class TwoInputStreamTaskTest { public void testOpenCloseAndTimestamps() throws Exception { final TwoInputStreamTask coMapTask = new TwoInputStreamTask(); final TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(coMapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); StreamConfig streamConfig = testHarness.getStreamConfig(); CoStreamMap coMapOperator = new CoStreamMap(new TestOpenCloseMapFunction()); @@ -104,6 +105,7 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); StreamConfig streamConfig = testHarness.getStreamConfig(); CoStreamMap coMapOperator = new CoStreamMap(new IdentityMap()); @@ -211,6 +213,7 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception { public void testCheckpointBarriers() throws Exception { final TwoInputStreamTask coMapTask = new TwoInputStreamTask(); final TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); StreamConfig streamConfig = testHarness.getStreamConfig(); CoStreamMap coMapOperator = new CoStreamMap(new IdentityMap()); @@ -291,6 +294,7 @@ public void testCheckpointBarriers() throws Exception { public void testOvertakingCheckpointBarriers() throws Exception { final TwoInputStreamTask coMapTask = new TwoInputStreamTask(); final TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.asSimpleOneOperatorHarness(); StreamConfig streamConfig = testHarness.getStreamConfig(); CoStreamMap coMapOperator = new CoStreamMap(new IdentityMap());