From 4336cd34fe016c7b34d64f80dc9b31526c8f9539 Mon Sep 17 00:00:00 2001
From: Robert Metzger
Date: Mon, 4 Apr 2016 12:09:56 +0200
Subject: [PATCH 1/2] [FLINK-3660] Measure latency and expose it via a metric

This commit adds the initial runtime support for measuring the latency of records going through the system. I therefore introduced a new StreamElement, called a LatencyMarker.

Similar to Watermarks, LatencyMarkers are emitted from the sources at a configured interval. The default value for the interval is 2000 ms. The emission of markers can be disabled by setting the interval to 0.

LatencyMarkers cannot "overtake" regular elements. This ensures that the measured latency approximates the end-to-end latency of regular stream elements.

Regular operators (excluding those participating in iterations) forward latency markers if they are not a sink. Operators with multiple outputs randomly select one output to forward the marker to. This ensures that every marker exists only once in the system and that repartitioning steps do not cause an explosion in the number of transferred markers.

If an operator is a sink, it maintains the most recent latencies from each known source instance (the history size is configurable via metrics.latency.history-size and defaults to 128). The min/max/mean/p50/p95/p99 of each known source are reported using a special LatencyGauge from the sink (every operator can be a sink, if it doesn't have any outputs).

This commit does not visualize the latency in the web interface. Also, there is currently no mechanism to ensure that the system clocks are in sync, so the latency measurements will be inaccurate if the hardware clocks are not correct.
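For illustration, this is roughly the shape of the value exposed by the sink-side LatencyGauge, one entry per known source subtask (the key format and the statistic names follow the implementation in AbstractStreamOperator; the numbers are invented and given in milliseconds):

    "LatencySourceDescriptor{vertexID=1, subtaskIndex=0}" -> {max=48.0, mean=7.2, min=1.0, p50=5.0, p95=22.0, p99=41.0}
    "LatencySourceDescriptor{vertexID=1, subtaskIndex=1}" -> {max=52.0, mean=8.1, min=1.0, p50=6.0, p95=25.0, p99=44.0}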
---
docs/monitoring/metrics.md | 113 ++++++-----
docs/setup/config.md | 3 +
.../flink/storm/wrappers/BoltWrapperTest.java | 16 +-
.../flink/api/common/ExecutionConfig.java | 39 ++++
.../flink/configuration/ConfigConstants.java | 9 +
.../flink/runtime/execution/Environment.java | 2 +-
.../io/network/api/writer/RecordWriter.java | 53 +++--
.../operators/testutils/DummyEnvironment.java | 2 +-
.../collector/selector/DirectedOutput.java | 11 +
.../api/operators/AbstractStreamOperator.java | 188 +++++++++++++++++-
.../operators/AbstractUdfStreamOperator.java | 4 +-
.../api/operators/OneInputStreamOperator.java | 3 +
.../flink/streaming/api/operators/Output.java | 3 +
.../api/operators/StreamCounter.java | 44 ----
.../api/operators/StreamGroupedReduce.java | 1 -
.../api/operators/StreamOperator.java | 2 +-
.../streaming/api/operators/StreamSink.java | 2 +-
.../streaming/api/operators/StreamSource.java | 42 ++++
.../api/operators/TwoInputStreamOperator.java | 25 ++-
.../runtime/io/RecordWriterOutput.java | 17 +-
.../runtime/io/StreamInputProcessor.java | 26 ++-
.../runtime/io/StreamRecordWriter.java | 9 +
.../runtime/io/StreamTwoInputProcessor.java | 37 ++--
.../runtime/streamrecord/LatencyMarker.java | 106 ++++++++++
.../MultiplexingStreamRecordSerializer.java | 23 ++-
.../runtime/streamrecord/StreamElement.java | 17 ++
.../runtime/tasks/OneInputStreamTask.java | 2 +-
.../runtime/tasks/OperatorChain.java | 55 +++--
.../runtime/tasks/StreamIterationTail.java | 12 +-
.../streaming/runtime/tasks/StreamTask.java | 8 +
.../runtime/tasks/TwoInputStreamTask.java | 2 +-
.../api/graph/StreamGraphGeneratorTest.java | 18 +-
.../api/operators/StreamCounterTest.java | 61 ------
.../operators/StreamOperatorChainingTest.java | 4 +-
.../operators/StreamSourceOperatorTest.java | 69 ++++++-
...ignedProcessingTimeWindowOperatorTest.java | 17 +-
...ignedProcessingTimeWindowOperatorTest.java | 18 +-
.../operators/windowing/CollectingOutput.java | 8 +-
.../flink/streaming/util/MockOutput.java | 6 +
.../OneInputStreamOperatorTestHarness.java | 8 +-
.../TwoInputStreamOperatorTestHarness.java | 8 +-
.../streaming/runtime/TimestampITCase.java | 1 +
42 files changed, 821 insertions(+), 273 deletions(-)
delete mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
delete mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 0e514078fffae..6de5b5e546168 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -376,7 +376,6 @@ Flink exposes the following system metrics: Description - JobManager
@@ -475,52 +474,76 @@ Flink exposes the following system metrics: - - Task - currentLowWatermark - The lowest watermark a task has received. - - - lastCheckpointDuration - The time it took to complete the last checkpoint. - - - lastCheckpointSize - The total size of the last checkpoint. - - - restartingTime - The time it took to restart the job. - - - numBytesInLocal - The total number of bytes this task has read from a local source. - - - numBytesInRemote - The total number of bytes this task has read from a remote source. - - - numBytesOut - The total number of bytes this task has emitted. - - - - - Operator - numRecordsIn - The total number of records this operator has received. - - - numRecordsOut - The total number of records this operator has emitted. - - - numSplitsProcessed - The total number of InputSplits this data source has processed. - + Task + currentLowWatermark + The lowest watermark a task has received. + + + lastCheckpointDuration + The time it took to complete the last checkpoint. + + + lastCheckpointSize + The total size of the last checkpoint. + + + restartingTime + The time it took to restart the job. + + + numBytesInLocal + The total number of bytes this task has read from a local source. + + + numBytesInRemote + The total number of bytes this task has read from a remote source. + + + numBytesOut + The total number of bytes this task has emitted. + + + Operator + numRecordsIn + The total number of records this operator has received. + + + numRecordsOut + The total number of records this operator has emitted. + + + numSplitsProcessed + The total number of InputSplits this data source has processed (if the operator is a data source). + + + latency + A latency gauge reporting the latency distribution from the different sources. +
+### Latency tracking
+
+Flink can track the latency of records traveling through the system. To enable latency tracking,
+set the `latencyTrackingInterval` (in milliseconds) to a positive value in the `ExecutionConfig`.
+
+At that interval, the sources periodically emit a special record called a `LatencyMarker`.
+The marker contains a timestamp taken when the marker was emitted at the source.
+Latency markers cannot overtake regular user records, so if records queue up in front of an operator,
+the queuing time is added to the latency tracked by the marker.
+
+Note that latency markers do not account for the time user records spend inside operators, because the
+markers bypass them. In particular, they do not account for the time records spend, for example, in window buffers.
+Only when operators are unable to accept new records, so that records queue up in front of them, is that
+delay reflected in the measured latency.
+
+All intermediate operators keep a list of the last `n` latencies from each source to compute
+a latency distribution.
+The sink operators keep such a list per source and per parallel source instance, which allows detecting
+latency issues caused by individual machines.
+
+Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting
+up an automated clock synchronisation service (like NTP) to avoid false latency results.
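+
+For example, the following sketch enables latency tracking with a one-second interval (the interval
+value is only illustrative; any positive number of milliseconds works, and the default is 2000 ms):
+
+{% highlight java %}
+// assumes a regular streaming program, e.g.
+// import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// emit a LatencyMarker from every source subtask once per second
+env.getConfig().setLatencyTrackingInterval(1000L);
+
+// a non-positive interval disables the markers again
+// env.getConfig().setLatencyTrackingInterval(0L);
+{% endhighlight %}
+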
 {% top %}
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 54ef39403303d..3f6b70502929e 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -378,6 +378,9 @@ Previously this key was named `recovery.mode` and the default value was `standal
- `metrics.scope.tm.operator`: (Default: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>) Defines the scope format string that is applied to all metrics scoped to an operator.
+- `metrics.latency.history-size`: (Default: 128) Defines the number of measured latencies to maintain at each operator.
+
+
## Background
### Configuring the Network Buffers
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index c15b5f6bf6f52..6bec01b1c4bd4 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.storm.util.AbstractTest; import org.apache.flink.storm.util.SplitStreamType; import org.apache.flink.storm.util.StormConfig;
@@ -154,7 +155,7 @@ private void testWrapper(final int numberOfAttributes) throws Exception { PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class), false); wrapper.open(); wrapper.processElement(record);
@@ -190,7 +191,7 @@ public void testMultipleOutputStreams() throws Exception { } final BoltWrapper wrapper = new BoltWrapper(bolt, null, raw); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), output); + wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), output, false); wrapper.open(); final SplitStreamType splitRecord = new SplitStreamType();
@@ -243,7 +244,7 @@ public void testOpen() throws Exception { final IRichBolt bolt = mock(IRichBolt.class); BoltWrapper wrapper = new BoltWrapper(bolt); - wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class), false); wrapper.open(); verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
@@ -256,7 +257,7 @@ public
void testOpen() throws Exception { final IRichBolt bolt = mock(IRichBolt.class); BoltWrapper wrapper = new BoltWrapper(bolt); - wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class), false); wrapper.open(); verify(bolt).prepare(same(stormConfig), any(TopologyContext.class), any(OutputCollector.class)); @@ -273,7 +274,7 @@ public void testOpen() throws Exception { TestDummyBolt testBolt = new TestDummyBolt(); BoltWrapper wrapper = new BoltWrapper(testBolt); - wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class), false); wrapper.open(); for (Entry entry : cfg.toMap().entrySet()) { @@ -300,7 +301,7 @@ public void testOpenSink() throws Exception { final IRichBolt bolt = mock(IRichBolt.class); BoltWrapper wrapper = new BoltWrapper(bolt); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class), false); wrapper.open(); verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNotNull(OutputCollector.class)); @@ -317,7 +318,7 @@ public void testClose() throws Exception { final BoltWrapper wrapper = new BoltWrapper(bolt); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class), false); wrapper.close(); wrapper.dispose(); @@ -369,6 +370,7 @@ public Map getComponentConfiguration() { when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 1, 0, 1, 0)); when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader()); when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup()); + when(env.getTaskManagerInfo()).thenReturn(new TaskManagerRuntimeInfo("foo", new Configuration(), "foo")); StreamTask mockTask = mock(StreamTask.class); when(mockTask.getCheckpointLock()).thenReturn(new Object()); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index aadf867930279..06e00f1aa23ab 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -120,6 +120,11 @@ public class ExecutionConfig implements Serializable { private long autoWatermarkInterval = 0; + /** + * Interval in milliseconds for sending latency tracking marks from the sources to the sinks. + */ + private long latencyTrackingInterval = 2000L; + /** * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration */ @@ -204,6 +209,40 @@ public long getAutoWatermarkInterval() { return this.autoWatermarkInterval; } + /** + * Interval for sending latency tracking marks from the sources to the sinks. + * Flink will send latency tracking marks from the sources at the specified interval. + * + * Recommended value: 2000 (2 seconds). + * + * Setting a tracking interval <= 0 disables the latency tracking. + * + * @param interval Interval in milliseconds. 
+ */ + @PublicEvolving + public ExecutionConfig setLatencyTrackingInterval(long interval) { + this.latencyTrackingInterval = interval; + return this; + } + + /** + * Returns the latency tracking interval. + * @return The latency tracking interval in milliseconds + */ + @PublicEvolving + public long getLatencyTrackingInterval() { + return latencyTrackingInterval; + } + + /** + * Returns if latency tracking is enabled + * @return True, if the tracking is enabled, false otherwise. + */ + @PublicEvolving + public boolean isLatencyTrackingEnabled() { + return latencyTrackingInterval > 0; + } + /** * Gets the parallelism with which operation are executed by default. Operations can * individually override this value to use a specific parallelism. diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 9e66e2aa74137..00f951a205dfb 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -868,6 +868,10 @@ public final class ConfigConstants { /** The scope format string that is applied to all metrics scoped to an operator. */ public static final String METRICS_SCOPE_NAMING_OPERATOR = "metrics.scope.operator"; + /** The number of measured latencies to maintain at each operator */ + public static final String METRICS_LATENCY_HISTORY_SIZE = "metrics.latency.history-size"; + + // ------------------------------------------------------------------------ // Default Values // ------------------------------------------------------------------------ @@ -1280,6 +1284,11 @@ public final class ConfigConstants { /** Default retry delay on location lookup failures. */ public static final int DEFAULT_QUERYABLE_STATE_CLIENT_LOOKUP_RETRY_DELAY = 1000; + // ----------------------------- Metrics ---------------------------- + + /** The default number of measured latencies to maintain at each operator */ + public static final int DEFAULT_METRICS_LATENCY_HISTORY_SIZE = 128; + // ----------------------------- Environment Variables ---------------------------- /** The environment variable name which contains the location of the configuration directory */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index cbbeec7387af2..f0ff918ad7a75 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -79,7 +79,7 @@ public interface Environment { ExecutionAttemptID getExecutionId(); /** - * Returns the task-wide configuration object, originally attache to the job vertex. + * Returns the task-wide configuration object, originally attached to the job vertex. 
* * @return The task-wide configuration */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index 49636982ef41e..422aa652e9c0c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -25,8 +25,10 @@ import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.util.XORShiftRandom; import java.io.IOException; +import java.util.Random; import static org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult; @@ -54,6 +56,8 @@ public class RecordWriter { /** {@link RecordSerializer} per outgoing channel */ private final RecordSerializer[] serializers; + private final Random RNG = new XORShiftRandom(); + public RecordWriter(ResultPartitionWriter writer) { this(writer, new RoundRobinChannelSelector()); } @@ -78,22 +82,7 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele public void emit(T record) throws IOException, InterruptedException { for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - // serialize with corresponding serializer and send full buffer - RecordSerializer serializer = serializers[targetChannel]; - - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - writeBuffer(buffer, targetChannel, serializer); - } - - buffer = writer.getBufferProvider().requestBufferBlocking(); - result = serializer.setNextBuffer(buffer); - } - } + sendToTarget(record, targetChannel); } } @@ -103,21 +92,31 @@ public void emit(T record) throws IOException, InterruptedException { */ public void broadcastEmit(T record) throws IOException, InterruptedException { for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - // serialize with corresponding serializer and send full buffer - RecordSerializer serializer = serializers[targetChannel]; + sendToTarget(record, targetChannel); + } + } - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); + /** + * This is used to send LatencyMarks to a random target channel + */ + public void randomEmit(T record) throws IOException, InterruptedException { + sendToTarget(record, RNG.nextInt(numChannels)); + } - if (buffer != null) { - writeBuffer(buffer, targetChannel, serializer); - } + private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { + RecordSerializer serializer = serializers[targetChannel]; - buffer = writer.getBufferProvider().requestBufferBlocking(); - result = serializer.setNextBuffer(buffer); + synchronized (serializer) { + SerializationResult result = serializer.addRecord(record); + while (result.isFullBuffer()) { + Buffer buffer = serializer.getCurrentBuffer(); + + if (buffer != null) { + writeBuffer(buffer, targetChannel, serializer); } + + buffer = writer.getBufferProvider().requestBufferBlocking(); + result = serializer.setNextBuffer(buffer); } } } diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index bb0712283a8c3..04ba4e520ba32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -92,7 +92,7 @@ public Configuration getTaskConfiguration() { @Override public TaskManagerRuntimeInfo getTaskManagerInfo() { - return null; + return new TaskManagerRuntimeInfo("foo", new Configuration(), "foo"); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java index 8346013b7cc3b..24f1c63bba157 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java @@ -23,13 +23,16 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.XORShiftRandom; public class DirectedOutput implements Output> { @@ -42,6 +45,8 @@ public class DirectedOutput implements Output> { protected final Output>[] allOutputs; + private final Random RNG = new XORShiftRandom(); + @SuppressWarnings({"unchecked", "rawtypes"}) public DirectedOutput( @@ -100,6 +105,12 @@ public void emitWatermark(Watermark mark) { } } + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + // randomly select an output + allOutputs[RNG.nextInt(allOutputs.length)].emitLatencyMarker(latencyMarker); + } + protected Set>> selectOutputs(StreamRecord record) { Set>> selectedOutputs = new HashSet<>(selectAllOutputs.length); Collections.addAll(selectedOutputs, selectAllOutputs); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 0ca89ef951e60..805ed1d5a8a34 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -19,13 +19,17 @@ package org.apache.flink.streaming.api.operators; import org.apache.commons.io.IOUtils; +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; +import 
org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -38,12 +42,16 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; +import java.util.ConcurrentModificationException; import java.util.Collection; import java.util.concurrent.RunnableFuture; @@ -103,26 +111,45 @@ public abstract class AbstractStreamOperator private transient Collection lazyRestoreStateHandles; - protected transient MetricGroup metrics; + + // --------------- Metrics --------------------------- + + /** Metric group for the operator */ + protected MetricGroup metrics; + + /** Flag indicating if this operator is a sink */ + protected transient boolean isSink = false; + + protected LatencyGauge latencyGauge; // ------------------------------------------------------------------------ // Life Cycle // ------------------------------------------------------------------------ @Override - public void setup(StreamTask containingTask, StreamConfig config, Output> output) { + public void setup(StreamTask containingTask, StreamConfig config, Output> output, boolean isSink) { this.container = containingTask; this.config = config; String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim(); this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName); this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut")); + this.isSink = isSink; + Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration(); + int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE); + if (historySize <= 0) { + LOG.warn("{} has been set to a value below 0: {}. 
Using default.", ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, historySize); + historySize = ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE; + } + + latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize, !isSink)); this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap()); stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader()); stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader()); } + @Override public MetricGroup getMetricGroup() { return metrics; } @@ -365,6 +392,158 @@ public final ChainingStrategy getChainingStrategy() { return chainingStrategy; } + + // ------------------------------------------------------------------------ + // Metrics + // ------------------------------------------------------------------------ + + // ------- One input stream + public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception { + reportOrForwardLatencyMarker(latencyMarker); + } + + // ------- Two input stream + public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception { + reportOrForwardLatencyMarker(latencyMarker); + } + + public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception { + reportOrForwardLatencyMarker(latencyMarker); + } + + + protected void reportOrForwardLatencyMarker(LatencyMarker maker) { + // all operators are tracking latencies + this.latencyGauge.reportLatency(maker); + if (!isSink) { + // everything except sinks forwards latency markers + this.output.emitLatencyMarker(maker); + } + } + + // ----------------------- Helper classes ----------------------- + + + /** + * The gauge uses a HashMap internally to avoid classloading issues when accessing + * the values using JMX. + */ + private static class LatencyGauge implements Gauge>> { + private final Map latencyStats = new HashMap<>(); + private final int historySize; + private final boolean ignoreSubtaskIndex; + + LatencyGauge(int historySize, boolean ignoreSubtaskIndex) { + this.historySize = historySize; + this.ignoreSubtaskIndex = ignoreSubtaskIndex; + } + + public void reportLatency(LatencyMarker marker) { + LatencySourceDescriptor sourceDescriptor = LatencySourceDescriptor.of(marker, this.ignoreSubtaskIndex); + DescriptiveStatistics sourceStats = latencyStats.get(sourceDescriptor); + if (sourceStats == null) { + // 512 element window (4 kb) + sourceStats = new DescriptiveStatistics(this.historySize); + latencyStats.put(sourceDescriptor, sourceStats); + } + long now = System.currentTimeMillis(); + sourceStats.addValue(now - marker.getMarkedTime()); + } + + @Override + public Map> getValue() { + while (true) { + try { + Map> ret = new HashMap<>(); + for (Map.Entry source : latencyStats.entrySet()) { + HashMap sourceStatistics = new HashMap<>(6); + sourceStatistics.put("max", source.getValue().getMax()); + sourceStatistics.put("mean", source.getValue().getMean()); + sourceStatistics.put("min", source.getValue().getMin()); + sourceStatistics.put("p50", source.getValue().getPercentile(50)); + sourceStatistics.put("p95", source.getValue().getPercentile(95)); + sourceStatistics.put("p99", source.getValue().getPercentile(99)); + ret.put(source.getKey().toString(), sourceStatistics); + } + return ret; + // Concurrent access onto the "latencyStats" map could cause + // ConcurrentModificationExceptions. To avoid unnecessary blocking + // of the reportLatency() method, we retry this operation until + // it succeeds. 
+ } catch(ConcurrentModificationException ignore) { + LOG.debug("Unable to report latency statistics", ignore); + } + } + } + } + + /** + * Identifier for a latency source + */ + private static class LatencySourceDescriptor { + /** + * A unique ID identifying a logical source in Flink + */ + private final int vertexID; + + /** + * Identifier for parallel subtasks of a logical source + */ + private final int subtaskIndex; + + /** + * + * @param marker The latency marker to extract the LatencySourceDescriptor from. + * @param ignoreSubtaskIndex Set to true to ignore the subtask index, to treat the latencies from all the parallel instances of a source as the same. + * @return A LatencySourceDescriptor for the given marker. + */ + public static LatencySourceDescriptor of(LatencyMarker marker, boolean ignoreSubtaskIndex) { + if (ignoreSubtaskIndex) { + return new LatencySourceDescriptor(marker.getVertexID(), -1); + } else { + return new LatencySourceDescriptor(marker.getVertexID(), marker.getSubtaskIndex()); + } + + } + + private LatencySourceDescriptor(int vertexID, int subtaskIndex) { + this.vertexID = vertexID; + this.subtaskIndex = subtaskIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + LatencySourceDescriptor that = (LatencySourceDescriptor) o; + + if (vertexID != that.vertexID) { + return false; + } + return subtaskIndex == that.subtaskIndex; + } + + @Override + public int hashCode() { + int result = vertexID; + result = 31 * result + subtaskIndex; + return result; + } + + @Override + public String toString() { + return "LatencySourceDescriptor{" + + "vertexID=" + vertexID + + ", subtaskIndex=" + subtaskIndex + + '}'; + } + } + public class CountingOutput implements Output> { private final Output> output; private final Counter numRecordsOut; @@ -379,6 +558,11 @@ public void emitWatermark(Watermark mark) { output.emitWatermark(mark); } + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + output.emitLatencyMarker(latencyMarker); + } + @Override public void collect(StreamRecord record) { numRecordsOut.inc(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 72f30b87f0db8..5cccaf790a61c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -89,8 +89,8 @@ public F getUserFunction() { @Override - public void setup(StreamTask containingTask, StreamConfig config, Output> output) { - super.setup(containingTask, config, output); + public void setup(StreamTask containingTask, StreamConfig config, Output> output, boolean isSink) { + super.setup(containingTask, config, output, isSink); FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext()); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java index 323feb52e0d95..d9de2302d85a6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; /** @@ -46,4 +47,6 @@ public interface OneInputStreamOperator extends StreamOperator { * @see org.apache.flink.streaming.api.watermark.Watermark */ void processWatermark(Watermark mark) throws Exception; + + void processLatencyMarker(LatencyMarker latencyMarker) throws Exception; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java index 4a7002fa11186..ec2409e15a9cf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java @@ -19,6 +19,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.util.Collector; /** @@ -39,4 +40,6 @@ public interface Output extends Collector { * timestamp will be emitted in the future. */ void emitWatermark(Watermark mark); + + void emitLatencyMarker(LatencyMarker latencyMarker); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java deleted file mode 100644 index 8835032921c65..0000000000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.operators; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - -@Internal -public class StreamCounter extends AbstractStreamOperator implements OneInputStreamOperator { - - private static final long serialVersionUID = 1L; - - private Long count = 0L; - - public StreamCounter() { - chainingStrategy = ChainingStrategy.ALWAYS; - } - - @Override - public void processElement(StreamRecord element) { - output.collect(element.replace(++count)); - } - - @Override - public void processWatermark(Watermark mark) throws Exception { - output.emitWatermark(mark); - } -} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java index b11e22c04fca2..229c25444eecf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java @@ -70,5 +70,4 @@ public void processWatermark(Watermark mark) throws Exception { output.emitWatermark(mark); } - } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index fae5fd063b599..61ef4b08b0794 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -54,7 +54,7 @@ public interface StreamOperator extends Serializable { /** * Initializes the operator. Sets access to the context and the output. 
*/ - void setup(StreamTask containingTask, StreamConfig config, Output> output); + void setup(StreamTask containingTask, StreamConfig config, Output> output, boolean isSink); /** * This method is called immediately before any elements are processed, it should contain the diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java index 9fa2039929a9b..3cf7602e5dc31 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java @@ -30,7 +30,7 @@ public class StreamSink extends AbstractUdfStreamOperator sinkFunction) { super(sinkFunction); - + isSink = true; chainingStrategy = ChainingStrategy.ALWAYS; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 1409ae4403b60..a07e6b77b4230 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -21,8 +21,14 @@ import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + /** * {@link StreamOperator} for streaming sources. 
* @@ -53,6 +59,13 @@ public void run(final Object lockingObject) throws Exception { public void run(final Object lockingObject, final Output> collector) throws Exception { final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic(); + + LatencyMarksEmitter latencyEmitter = null; + if(getExecutionConfig().isLatencyTrackingEnabled()) { + latencyEmitter = new LatencyMarksEmitter<>(lockingObject, collector, getExecutionConfig().getLatencyTrackingInterval(), + getOperatorConfig().getVertexID(), getRuntimeContext().getIndexOfThisSubtask()); + } + final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(); this.ctx = StreamSourceContexts.getSourceContext( @@ -70,6 +83,9 @@ public void run(final Object lockingObject, final Output> coll } finally { // make sure that the context is closed in any case ctx.close(); + if(latencyEmitter != null) { + latencyEmitter.close(); + } } } @@ -103,4 +119,30 @@ protected void markCanceledOrStopped() { protected boolean isCanceledOrStopped() { return canceledOrStopped; } + + private static class LatencyMarksEmitter { + private final ScheduledExecutorService scheduleExecutor; + private final ScheduledFuture latencyMarkTimer; + + public LatencyMarksEmitter(final Object lockingObject, final Output> output, long latencyTrackingInterval, final int vertexID, final int subtaskIndex) { + this.scheduleExecutor = Executors.newScheduledThreadPool(1); + this.latencyMarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + synchronized (lockingObject) { + output.emitLatencyMarker(new LatencyMarker(System.currentTimeMillis(), vertexID, subtaskIndex)); + } + } catch (Throwable t) { + LOG.warn("Error while emitting latency marker", t); + } + } + }, 0, latencyTrackingInterval, TimeUnit.MILLISECONDS); + } + + public void close() { + latencyMarkTimer.cancel(true); + scheduleExecutor.shutdownNow(); + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java index d22583daa1f3e..e45fedf801b87 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; /** @@ -38,13 +39,13 @@ public interface TwoInputStreamOperator extends StreamOperator element) throws Exception; + void processElement1(StreamRecord element) throws Exception; /** * Processes one element that arrived on the second input of this two-input operator. * This method is guaranteed to not be called concurrently with other methods of the operator. */ - public void processElement2(StreamRecord element) throws Exception; + void processElement2(StreamRecord element) throws Exception; /** * Processes a {@link Watermark} that arrived on the first input of this two-input operator. 
@@ -52,7 +53,7 @@ public interface TwoInputStreamOperator extends StreamOperator extends StreamOperator implements Output> { public RecordWriterOutput( StreamRecordWriter>> recordWriter, TypeSerializer outSerializer, - boolean enableWatermarkMultiplexing) { + boolean enableMultiplexing) { checkNotNull(recordWriter); @@ -58,7 +59,7 @@ public RecordWriterOutput( (StreamRecordWriter) recordWriter; TypeSerializer outRecordSerializer; - if (enableWatermarkMultiplexing) { + if (enableMultiplexing) { outRecordSerializer = new MultiplexingStreamRecordSerializer(outSerializer); } else { outRecordSerializer = (TypeSerializer) @@ -94,6 +95,18 @@ public void emitWatermark(Watermark mark) { } } + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + serializationDelegate.setInstance(latencyMarker); + + try { + recordWriter.randomEmit(serializationDelegate); + } + catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + public void broadcastEvent(AbstractEvent barrier) throws IOException, InterruptedException { recordWriter.broadcastEvent(barrier); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 2dbc6d4b841db..47e55dcc54c13 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -89,7 +89,7 @@ public StreamInputProcessor( StatefulTask checkpointedTask, CheckpointingMode checkpointMode, IOManager ioManager, - boolean enableWatermarkMultiplexing) throws IOException { + boolean enableMultiplexing) throws IOException { InputGate inputGate = InputGateUtil.createInputGate(inputGates); @@ -107,13 +107,13 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { this.barrierHandler.registerCheckpointEventHandler(checkpointedTask); } - if (enableWatermarkMultiplexing) { - MultiplexingStreamRecordSerializer ser = new MultiplexingStreamRecordSerializer(inputSerializer); - this.deserializationDelegate = new NonReusingDeserializationDelegate(ser); + if (enableMultiplexing) { + MultiplexingStreamRecordSerializer ser = new MultiplexingStreamRecordSerializer<>(inputSerializer); + this.deserializationDelegate = new NonReusingDeserializationDelegate<>(ser); } else { StreamRecordSerializer ser = new StreamRecordSerializer(inputSerializer); this.deserializationDelegate = (NonReusingDeserializationDelegate) - (NonReusingDeserializationDelegate) new NonReusingDeserializationDelegate>(ser); + (NonReusingDeserializationDelegate) new NonReusingDeserializationDelegate<>(ser); } // Initialize one deserializer per input channel @@ -150,14 +150,14 @@ public boolean processInput(OneInputStreamOperator streamOperator, final } if (result.isFullRecord()) { - StreamElement recordOrWatermark = deserializationDelegate.getInstance(); + StreamElement recordOrMark = deserializationDelegate.getInstance(); - if (recordOrWatermark.isWatermark()) { - long watermarkMillis = recordOrWatermark.asWatermark().getTimestamp(); + if (recordOrMark.isWatermark()) { + long watermarkMillis = recordOrMark.asWatermark().getTimestamp(); if (watermarkMillis > watermarks[currentChannel]) { watermarks[currentChannel] = watermarkMillis; long newMinWatermark = Long.MAX_VALUE; - for (long watermark : watermarks) { + for (long watermark: watermarks) { newMinWatermark = 
Math.min(watermark, newMinWatermark); } if (newMinWatermark > lastEmittedWatermark) { @@ -168,9 +168,15 @@ public boolean processInput(OneInputStreamOperator streamOperator, final } } continue; + } else if(recordOrMark.isLatencyMarker()) { + // handle latency marker + synchronized (lock) { + streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker()); + } + continue; } else { // now we can do the actual processing - StreamRecord record = recordOrWatermark.asRecord(); + StreamRecord record = recordOrMark.asRecord(); synchronized (lock) { numRecordsIn.inc(); streamOperator.setKeyContextElement1(record); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java index f46b366765685..6d5e89b3d5a2a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java @@ -98,6 +98,15 @@ public void broadcastEmit(T record) throws IOException, InterruptedException { } } + @Override + public void randomEmit(T record) throws IOException, InterruptedException { + checkErroneous(); + super.randomEmit(record); + if (flushAlways) { + flush(); + } + } + /** * Closes the writer. This stops the flushing thread (if there is one). */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 70ce783621487..a25c1a195e229 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -40,7 +40,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import java.io.IOException; @@ -97,7 +96,7 @@ public StreamTwoInputProcessor( StatefulTask checkpointedTask, CheckpointingMode checkpointMode, IOManager ioManager, - boolean enableWatermarkMultiplexing) throws IOException { + boolean enableMultiplexing) throws IOException { final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2); @@ -115,24 +114,24 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { this.barrierHandler.registerCheckpointEventHandler(checkpointedTask); } - if (enableWatermarkMultiplexing) { - MultiplexingStreamRecordSerializer ser = new MultiplexingStreamRecordSerializer(inputSerializer1); - this.deserializationDelegate1 = new NonReusingDeserializationDelegate(ser); + if (enableMultiplexing) { + MultiplexingStreamRecordSerializer ser = new MultiplexingStreamRecordSerializer<>(inputSerializer1); + this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(ser); } else { - StreamRecordSerializer ser = new StreamRecordSerializer(inputSerializer1); + StreamRecordSerializer ser = new StreamRecordSerializer<>(inputSerializer1); this.deserializationDelegate1 = (DeserializationDelegate) - (DeserializationDelegate) new NonReusingDeserializationDelegate>(ser); + 
(DeserializationDelegate) new NonReusingDeserializationDelegate<>(ser); } - if (enableWatermarkMultiplexing) { - MultiplexingStreamRecordSerializer ser = new MultiplexingStreamRecordSerializer(inputSerializer2); - this.deserializationDelegate2 = new NonReusingDeserializationDelegate(ser); + if (enableMultiplexing) { + MultiplexingStreamRecordSerializer ser = new MultiplexingStreamRecordSerializer<>(inputSerializer2); + this.deserializationDelegate2 = new NonReusingDeserializationDelegate<>(ser); } else { - StreamRecordSerializer ser = new StreamRecordSerializer(inputSerializer2); + StreamRecordSerializer ser = new StreamRecordSerializer<>(inputSerializer2); this.deserializationDelegate2 = (DeserializationDelegate) - (DeserializationDelegate) new NonReusingDeserializationDelegate>(ser); + (DeserializationDelegate) new NonReusingDeserializationDelegate<>(ser); } // Initialize one deserializer per input channel @@ -185,7 +184,13 @@ public boolean processInput(TwoInputStreamOperator streamOperator, if (currentChannel < numInputChannels1) { StreamElement recordOrWatermark = deserializationDelegate1.getInstance(); if (recordOrWatermark.isWatermark()) { - handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel, lock); + handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel, lock); + continue; + } + else if (recordOrWatermark.isLatencyMarker()) { + synchronized (lock) { + streamOperator.processLatencyMarker1(recordOrWatermark.asLatencyMarker()); + } continue; } else { @@ -203,6 +208,12 @@ public boolean processInput(TwoInputStreamOperator streamOperator, handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel, lock); continue; } + else if (recordOrWatermark.isLatencyMarker()) { + synchronized (lock) { + streamOperator.processLatencyMarker2(recordOrWatermark.asLatencyMarker()); + } + continue; + } else { synchronized (lock) { streamOperator.setKeyContextElement2(recordOrWatermark.asRecord()); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java new file mode 100644 index 0000000000000..714bdae35ec6e --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamrecord; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Special record type carrying a timestamp of its creation time at a source operator + * and the vertexId and subtask index of the operator. 
+ * + * At sinks, the marker can be used to approximate the time a record needs to travel + * through the dataflow. + */ +@PublicEvolving +public final class LatencyMarker extends StreamElement { + + // ------------------------------------------------------------------------ + + /** The time the latency mark is denoting */ + private final long markedTime; + + private final int vertexID; + + private final int subtaskIndex; + + /** + * Creates a latency mark with the given timestamp + */ + public LatencyMarker(long markedTime, int vertexID, int subtaskIndex) { + this.markedTime = markedTime; + this.vertexID = vertexID; + this.subtaskIndex = subtaskIndex; + } + + /** + * Returns the timestamp marked by the LatencyMarker + */ + public long getMarkedTime() { + return markedTime; + } + + public int getVertexID() { + return vertexID; + } + + public int getSubtaskIndex() { + return subtaskIndex; + } + + // ------------------------------------------------------------------------ + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()){ + return false; + } + + LatencyMarker that = (LatencyMarker) o; + + if (markedTime != that.markedTime) { + return false; + } + if (vertexID != that.vertexID) { + return false; + } + return subtaskIndex == that.subtaskIndex; + + } + + @Override + public int hashCode() { + int result = (int) (markedTime ^ (markedTime >>> 32)); + result = 31 * result + vertexID; + result = 31 * result + subtaskIndex; + return result; + } + + @Override + public String toString() { + return "LatencyMarker{" + + "markedTime=" + markedTime + + ", vertexID=" + vertexID + + ", subtaskIndex=" + subtaskIndex + + '}'; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java index 832c4b6660f67..95e3ebda69931 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java @@ -43,6 +43,7 @@ public final class MultiplexingStreamRecordSerializer extends TypeSerializer< private static final int TAG_REC_WITH_TIMESTAMP = 0; private static final int TAG_REC_WITHOUT_TIMESTAMP = 1; private static final int TAG_WATERMARK = 2; + private static final int TAG_LATENCY_MARKER = 3; private final TypeSerializer typeSerializer; @@ -95,7 +96,7 @@ public StreamElement copy(StreamElement from) { StreamRecord fromRecord = from.asRecord(); return fromRecord.copy(typeSerializer.copy(fromRecord.getValue())); } - else if (from.isWatermark()) { + else if (from.isWatermark() || from.isLatencyMarker()) { // is immutable return from; } @@ -114,7 +115,7 @@ public StreamElement copy(StreamElement from, StreamElement reuse) { fromRecord.copyTo(valueCopy, reuseRecord); return reuse; } - else if (from.isWatermark()) { + else if (from.isWatermark() || from.isLatencyMarker()) { // is immutable return from; } @@ -139,7 +140,11 @@ else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { else if (tag == TAG_WATERMARK) { target.writeLong(source.readLong()); } - else { + else if (tag == TAG_LATENCY_MARKER) { + target.writeLong(source.readLong()); + target.writeInt(source.readInt()); + target.writeInt(source.readInt()); + } else { throw new IOException("Corrupt 
stream, found tag: " + tag); } } @@ -161,6 +166,12 @@ else if (value.isWatermark()) { target.write(TAG_WATERMARK); target.writeLong(value.asWatermark().getTimestamp()); } + else if (value.isLatencyMarker()) { + target.write(TAG_LATENCY_MARKER); + target.writeLong(value.asLatencyMarker().getMarkedTime()); + target.writeInt(value.asLatencyMarker().getVertexID()); + target.writeInt(value.asLatencyMarker().getSubtaskIndex()); + } else { throw new RuntimeException(); } @@ -179,6 +190,9 @@ else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { else if (tag == TAG_WATERMARK) { return new Watermark(source.readLong()); } + else if (tag == TAG_LATENCY_MARKER) { + return new LatencyMarker(source.readLong(), source.readInt(), source.readInt()); + } else { throw new IOException("Corrupt stream, found tag: " + tag); } @@ -203,6 +217,9 @@ else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { else if (tag == TAG_WATERMARK) { return new Watermark(source.readLong()); } + else if (tag == TAG_LATENCY_MARKER) { + return new LatencyMarker(source.readLong(), source.readInt(), source.readInt()); + } else { throw new IOException("Corrupt stream, found tag: " + tag); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java index f6cccf72ca44b..62418bcd24f9e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java @@ -43,6 +43,14 @@ public final boolean isRecord() { return getClass() == StreamRecord.class; } + /** + * Checks whether this element is a record. + * @return True, if this element is a record, false otherwise. + */ + public final boolean isLatencyMarker() { + return getClass() == LatencyMarker.class; + } + /** * Casts this element into a StreamRecord. * @return This element as a stream record. @@ -61,4 +69,13 @@ public final StreamRecord asRecord() { public final Watermark asWatermark() { return (Watermark) this; } + + /** + * Casts this element into a LatencyMarker. + * @return This element as a LatencyMarker. + * @throws java.lang.ClassCastException Thrown, if this element is actually not a LatencyMarker. 
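To make the serializer changes above easier to follow: a latency marker travels through the multiplexed stream under its own tag byte, followed by the marked time (long), vertex id (int) and subtask index (int). The stand-alone sketch below re-creates just that byte layout with plain java.io; it is an illustration of the format, not the real MultiplexingStreamRecordSerializer:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class LatencyMarkerWireFormat {
        private static final int TAG_LATENCY_MARKER = 3; // same tag value as in the serializer above

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);

            // Encode: tag byte, marked time (long), vertex id (int), subtask index (int).
            out.write(TAG_LATENCY_MARKER);
            out.writeLong(System.currentTimeMillis());
            out.writeInt(42); // vertex id, arbitrary example value
            out.writeInt(0);  // subtask index

            // Decode it again, branching on the tag the same way deserialize() does.
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
            int tag = in.read();
            if (tag == TAG_LATENCY_MARKER) {
                System.out.printf("markedTime=%d, vertexID=%d, subtaskIndex=%d%n",
                        in.readLong(), in.readInt(), in.readInt());
            } else {
                throw new IOException("Corrupt stream, found tag: " + tag);
            }
        }
    }
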
+ */ + public final LatencyMarker asLatencyMarker() { + return (LatencyMarker) this; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 0a6534bc8b4dc..97546b83551da 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -46,7 +46,7 @@ public void init() throws Exception { this, configuration.getCheckpointMode(), getEnvironment().getIOManager(), - isSerializingTimestamps()); + isSerializingMixedStream()); // make sure that stream tasks report their I/O statistics AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 9e96f5d8fd5fa..5a978af76f8c9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -17,6 +17,13 @@ package org.apache.flink.streaming.runtime.tasks; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; @@ -29,24 +36,22 @@ import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput; import org.apache.flink.streaming.api.collector.selector.DirectedOutput; import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.StreamRecordWriter; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import org.apache.flink.util.XORShiftRandom; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; /** * The {@code OperatorChain} contains all operators that are executed as one chain within a single @@ -72,7 +77,7 @@ public OperatorChain(StreamTask containingTask, AccumulatorRegistry.Rep final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader(); final StreamConfig configuration = containingTask.getConfiguration(); - final boolean enableTimestamps = containingTask.isSerializingTimestamps(); + final boolean enableMultiplexing = containingTask.isSerializingMixedStream(); headOperator = 
configuration.getStreamOperator(userCodeClassloader); @@ -94,7 +99,7 @@ public OperatorChain(StreamTask containingTask, AccumulatorRegistry.Rep RecordWriterOutput streamOutput = createStreamOutput( outEdge, chainedConfigs.get(outEdge.getSourceId()), i, - containingTask.getEnvironment(), enableTimestamps, reporter, containingTask.getName()); + containingTask.getEnvironment(), enableMultiplexing, reporter, containingTask.getName()); this.streamOutputs[i] = streamOutput; streamOutputMap.put(outEdge, streamOutput); @@ -106,7 +111,7 @@ public OperatorChain(StreamTask containingTask, AccumulatorRegistry.Rep chainedConfigs, userCodeClassloader, streamOutputMap, allOps); if (headOperator != null) { - headOperator.setup(containingTask, configuration, getChainEntryPoint()); + headOperator.setup(containingTask, configuration, getChainEntryPoint(), streamOutputs.length == 0); } // add head operator to end of chain @@ -285,7 +290,7 @@ private static Output> createChainedOperator( // now create the operator and give it the output collector to write its output to OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader); - chainedOperator.setup(containingTask, operatorConfig, output); + chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0); allOperators.add(chainedOperator); @@ -300,7 +305,7 @@ private static Output> createChainedOperator( private static RecordWriterOutput createStreamOutput( StreamEdge edge, StreamConfig upStreamConfig, int outputIndex, - Environment taskEnvironment, boolean withTimestamps, + Environment taskEnvironment, boolean enableMultiplexing, AccumulatorRegistry.Reporter reporter, String taskName) { TypeSerializer outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader()); @@ -317,7 +322,7 @@ private static RecordWriterOutput createStreamOutput( output.setReporter(reporter); output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup()); - return new RecordWriterOutput(output, outSerializer, withTimestamps); + return new RecordWriterOutput<>(output, outSerializer, enableMultiplexing); } // ------------------------------------------------------------------------ @@ -356,6 +361,16 @@ public void emitWatermark(Watermark mark) { } } + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + try { + operator.processLatencyMarker(latencyMarker); + } + catch (Exception e) { + throw new ExceptionInChainedOperatorException(e); + } + } + @Override public void close() { try { @@ -393,6 +408,8 @@ public void collect(StreamRecord record) { private static class BroadcastingOutputCollector implements Output> { protected final Output>[] outputs; + + private final Random RNG = new XORShiftRandom(); public BroadcastingOutputCollector(Output>[] outputs) { this.outputs = outputs; @@ -405,6 +422,18 @@ public void emitWatermark(Watermark mark) { } } + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + if(outputs.length <= 0) { + // ignore + } else if(outputs.length == 1) { + outputs[0].emitLatencyMarker(latencyMarker); + } else { + // randomly select an output + outputs[RNG.nextInt(outputs.length)].emitLatencyMarker(latencyMarker); + } + } + @Override public void collect(StreamRecord record) { for (Output> output : outputs) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java index 
58e3cb808d9ea..1c2aba0ba3432 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +59,7 @@ public void init() throws Exception { LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID); this.headOperator = new RecordPusher<>(); - this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime)); + this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime), false); // TODO is 'false' here correct? } private static class RecordPusher extends AbstractStreamOperator implements OneInputStreamOperator { @@ -74,6 +75,11 @@ public void processElement(StreamRecord record) throws Exception { public void processWatermark(Watermark mark) { // ignore } + + @Override + public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception { + // ignore + } } private static class IterationTailOutput implements Output> { @@ -95,6 +101,10 @@ private static class IterationTailOutput implements Output> public void emitWatermark(Watermark mark) { } + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + } + @Override public void collect(StreamRecord record) { try { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 4893fedd4e0f3..2e6ebf3c077c5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -461,6 +461,14 @@ boolean isSerializingTimestamps() { return tc == TimeCharacteristic.EventTime | tc == TimeCharacteristic.IngestionTime; } + /** + * Check if the tasks is sending a mixed stream (of watermarks, latency marks and records) + * @return true if stream contains more than just records + */ + protected boolean isSerializingMixedStream() { + return isSerializingTimestamps() || getExecutionConfig().isLatencyTrackingEnabled(); + } + // ------------------------------------------------------------------------ // Access to properties and utilities // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index fb089590f9f03..bc80607baebc5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -71,7 +71,7 @@ public void init() throws Exception { this, configuration.getCheckpointMode(), getEnvironment().getIOManager(), - isSerializingTimestamps()); + isSerializingMixedStream()); // make sure that stream tasks report 
their I/O statistics AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index c93a439d85be9..ee5c41416f031 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -41,6 +41,7 @@ import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.util.EvenOddOutputSelector; @@ -453,7 +454,17 @@ public void processWatermark1(Watermark mark) throws Exception {} public void processWatermark2(Watermark mark) throws Exception {} @Override - public void setup(StreamTask containingTask, StreamConfig config, Output> output) {} + public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception { + // ignore + } + + @Override + public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception { + // ignore + } + + @Override + public void setup(StreamTask containingTask, StreamConfig config, Output> output, boolean isSink) {} } private static class OutputTypeConfigurableOperationWithOneInput @@ -475,6 +486,11 @@ public void processElement(StreamRecord element) { @Override public void processWatermark(Watermark mark) {} + @Override + public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception { + + } + @Override public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { tpeInformation = outTypeInfo; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java deleted file mode 100644 index dc8024cb6e579..0000000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.operators; - -import java.util.concurrent.ConcurrentLinkedQueue; - -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.TestHarnessUtil; -import org.junit.Test; - -/** - * Tests for {@link StreamCounter}. These test that: - * - *
- * <ul>
- *   <li>Timestamps of processed elements match the input timestamp</li>
- *   <li>Watermarks are correctly forwarded</li>
- * </ul>
- */ -public class StreamCounterTest { - - @Test - public void testCount() throws Exception { - StreamCounter operator = new StreamCounter(); - - OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator); - - long initialTime = 0L; - ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue(); - - testHarness.open(); - - testHarness.processElement(new StreamRecord("eins", initialTime + 1)); - testHarness.processElement(new StreamRecord("zwei", initialTime + 2)); - testHarness.processWatermark(new Watermark(initialTime + 2)); - testHarness.processElement(new StreamRecord("drei", initialTime + 3)); - - expectedOutput.add(new StreamRecord(1L, initialTime + 1)); - expectedOutput.add(new StreamRecord(2L, initialTime + 2)); - expectedOutput.add(new Watermark(initialTime + 2)); - expectedOutput.add(new StreamRecord(3L, initialTime + 3)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java index 32e8ea90dc2fb..e3bec1d77d86d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java @@ -153,7 +153,7 @@ public void invoke(String value) throws Exception { mockTask, mock(AccumulatorRegistry.Reporter.class)); - headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint()); + headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint(), false); for (StreamOperator operator : operatorChain.getAllOperators()) { if (operator != null) { @@ -295,7 +295,7 @@ public void invoke(String value) throws Exception { mockTask, mock(AccumulatorRegistry.Reporter.class)); - headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint()); + headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint(), false); for (StreamOperator operator : operatorChain.getAllOperators()) { if (operator != null) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java index 10b30d0b47a59..a98007192f8ff 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java @@ -33,12 +33,14 @@ import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamSourceContexts; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.junit.Assert; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import 
org.mockito.stubbing.Answer; @@ -65,7 +67,7 @@ public void testEmitMaxWatermarkForFiniteSource() throws Exception { final List output = new ArrayList<>(); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null); operator.run(new Object(), new CollectorOutput(output)); assertEquals(1, output.size()); @@ -82,7 +84,7 @@ public void testNoMaxWatermarkOnImmediateCancel() throws Exception { new StreamSource<>(new InfiniteSource()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null); operator.cancel(); // run and exit @@ -102,7 +104,7 @@ public void testNoMaxWatermarkOnAsyncCancel() throws Exception { new StreamSource<>(new InfiniteSource()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null); // trigger an async cancel in a bit new Thread("canceler") { @@ -135,7 +137,7 @@ public void testNoMaxWatermarkOnImmediateStop() throws Exception { new StoppableStreamSource<>(new InfiniteSource()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null); operator.stop(); // run and stop @@ -154,7 +156,7 @@ public void testNoMaxWatermarkOnAsyncStop() throws Exception { new StoppableStreamSource<>(new InfiniteSource()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null); // trigger an async cancel in a bit new Thread("canceler") { @@ -172,7 +174,49 @@ public void run() { assertTrue(output.isEmpty()); } - + + /** + * Test that latency marks are emitted + */ + @Test + public void testLatencyMarkEmission() throws Exception { + final long now = System.currentTimeMillis(); + + final List output = new ArrayList<>(); + + // regular stream source operator + final StoppableStreamSource> operator = + new StoppableStreamSource<>(new InfiniteSource()); + + // emit latency marks every 10 milliseconds. 
+ setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 10, null); + + // trigger an async cancel in a bit + new Thread("canceler") { + @Override + public void run() { + try { + Thread.sleep(200); + } catch (InterruptedException ignored) {} + operator.stop(); + } + }.start(); + + // run and wait to be stopped + operator.run(new Object(), new CollectorOutput(output)); + + // ensure that there has been some output + assertTrue(output.size() > 0); + // and that its only latency markers + for(StreamElement se: output) { + Assert.assertTrue(se.isLatencyMarker()); + Assert.assertEquals(-1, se.asLatencyMarker().getVertexID()); + Assert.assertEquals(0, se.asLatencyMarker().getSubtaskIndex()); + Assert.assertTrue(se.asLatencyMarker().getMarkedTime() >= now); + } + } + + @Test public void testAutomaticWatermarkContext() throws Exception { @@ -184,7 +228,7 @@ public void testAutomaticWatermarkContext() throws Exception { TestTimeServiceProvider timeProvider = new TestTimeServiceProvider(); timeProvider.setCurrentTime(0); - setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, timeProvider); + setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, 0, timeProvider); final List output = new ArrayList<>(); @@ -218,10 +262,12 @@ public void testAutomaticWatermarkContext() throws Exception { private static void setupSourceOperator(StreamSource operator, TimeCharacteristic timeChar, long watermarkInterval, - final TestTimeServiceProvider timeProvider) { + long latencyMarkInterval, + final TimeServiceProvider timeProvider) { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.setAutoWatermarkInterval(watermarkInterval); + executionConfig.setLatencyTrackingInterval(latencyMarkInterval); StreamConfig cfg = new StreamConfig(new Configuration()); cfg.setStateBackend(new MemoryStateBackend()); @@ -245,7 +291,7 @@ public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable } }).when(mockTask).getTimerService(); - operator.setup(mockTask, cfg, (Output>) mock(Output.class)); + operator.setup(mockTask, cfg, (Output>) mock(Output.class), false); } // ------------------------------------------------------------------------ @@ -299,6 +345,11 @@ public void emitWatermark(Watermark mark) { list.add(mark); } + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + list.add(latencyMarker); + } + @Override public void collect(StreamRecord record) { list.add(record); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 8a3d91945eedc..ccd1c69647272 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -193,7 +193,7 @@ public void testWindowTriggerTimeAlignment() throws Exception { op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000); - op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); + op.setup(mockTask, new StreamConfig(new Configuration()), mockOut, false); op.open(); 
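Earlier in this hunk, testLatencyMarkEmission drives the source through setupSourceOperator with a latency mark interval of 10 ms, while the other tests pass 0 and therefore see no markers in their output. In a user program the corresponding switch is the ExecutionConfig#setLatencyTrackingInterval setter this patch adds; a minimal hedged job sketch (the pipeline itself is an arbitrary example, not taken from the patch):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class LatencyTrackingJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Emit a LatencyMarker from every source subtask every 10 ms.
            // (The non-latency tests above pass 0, which keeps markers out of the stream.)
            env.getConfig().setLatencyTrackingInterval(10L);

            env.fromElements("a", "b", "c")
                    .map(new MapFunction<String, String>() {
                        @Override
                        public String map(String value) {
                            return value.toUpperCase();
                        }
                    })
                    .print();

            env.execute("latency tracking example");
        }
    }
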
assertTrue(op.getNextSlideTime() % 1000 == 0); assertTrue(op.getNextEvaluationTime() % 1000 == 0); @@ -201,7 +201,7 @@ public void testWindowTriggerTimeAlignment() throws Exception { op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000); - op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); + op.setup(mockTask, new StreamConfig(new Configuration()), mockOut, false); op.open(); assertTrue(op.getNextSlideTime() % 1000 == 0); assertTrue(op.getNextEvaluationTime() % 1000 == 0); @@ -209,7 +209,7 @@ public void testWindowTriggerTimeAlignment() throws Exception { op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000); - op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); + op.setup(mockTask, new StreamConfig(new Configuration()), mockOut, false); op.open(); assertTrue(op.getNextSlideTime() % 500 == 0); assertTrue(op.getNextEvaluationTime() % 1000 == 0); @@ -217,7 +217,7 @@ public void testWindowTriggerTimeAlignment() throws Exception { op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100); - op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); + op.setup(mockTask, new StreamConfig(new Configuration()), mockOut, false); op.open(); assertTrue(op.getNextSlideTime() % 100 == 0); assertTrue(op.getNextEvaluationTime() % 1100 == 0); @@ -249,7 +249,7 @@ public void testTumblingWindow() throws Exception { IntSerializer.INSTANCE, IntSerializer.INSTANCE, windowSize, windowSize); - op.setup(mockTask, new StreamConfig(new Configuration()), out); + op.setup(mockTask, new StreamConfig(new Configuration()), out, false); op.open(); final int numElements = 1000; @@ -312,7 +312,7 @@ public void testSlidingWindow() throws Exception { validatingIdentityFunction, identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50); - op.setup(mockTask, new StreamConfig(new Configuration()), out); + op.setup(mockTask, new StreamConfig(new Configuration()), out, false); op.open(); final int numElements = 1000; @@ -384,7 +384,7 @@ public void testTumblingWindowSingleElements() throws Exception { validatingIdentityFunction, identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50); - op.setup(mockTask, new StreamConfig(new Configuration()), out); + op.setup(mockTask, new StreamConfig(new Configuration()), out, false); op.open(); synchronized (lock) { @@ -451,7 +451,7 @@ public void testSlidingWindowSingleElements() throws Exception { validatingIdentityFunction, identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50); - op.setup(mockTask, new StreamConfig(new Configuration()), out); + op.setup(mockTask, new StreamConfig(new Configuration()), out, false); op.open(); synchronized (lock) { @@ -794,6 +794,7 @@ public void apply(Integer key, when(env.getTaskInfo()).thenReturn(new TaskInfo("Test task name", 1, 0, 1, 0)); when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader()); when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup()); + when(env.getTaskManagerInfo()).thenReturn(new TaskManagerRuntimeInfo("foo", new Configuration(), "foo")); when(task.getEnvironment()).thenReturn(env); return task; diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index 12a842f415c52..16f291a53d2cd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -201,7 +201,7 @@ public void testWindowTriggerTimeAlignment() throws Exception { op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000); - op.setup(mockTask, createTaskConfig(mockKeySelector, StringSerializer.INSTANCE, 10), mockOut); + op.setup(mockTask, createTaskConfig(mockKeySelector, StringSerializer.INSTANCE, 10), mockOut, false); op.open(); assertTrue(op.getNextSlideTime() % 1000 == 0); assertTrue(op.getNextEvaluationTime() % 1000 == 0); @@ -209,7 +209,7 @@ public void testWindowTriggerTimeAlignment() throws Exception { op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000); - op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); + op.setup(mockTask, new StreamConfig(new Configuration()), mockOut, false); op.open(); assertTrue(op.getNextSlideTime() % 1000 == 0); assertTrue(op.getNextEvaluationTime() % 1000 == 0); @@ -217,7 +217,7 @@ public void testWindowTriggerTimeAlignment() throws Exception { op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000); - op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); + op.setup(mockTask, new StreamConfig(new Configuration()), mockOut, false); op.open(); assertTrue(op.getNextSlideTime() % 500 == 0); assertTrue(op.getNextEvaluationTime() % 1000 == 0); @@ -225,7 +225,7 @@ public void testWindowTriggerTimeAlignment() throws Exception { op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100); - op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); + op.setup(mockTask, new StreamConfig(new Configuration()), mockOut, false); op.open(); assertTrue(op.getNextSlideTime() % 100 == 0); assertTrue(op.getNextEvaluationTime() % 1100 == 0); @@ -257,7 +257,7 @@ public void testTumblingWindowUniqueElements() throws Exception { final StreamTask mockTask = createMockTaskWithTimer(timerService, lock); - op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out); + op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out, false); op.open(); final int numElements = 1000; @@ -325,7 +325,7 @@ public void testTumblingWindowDuplicateElements() throws Exception { IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSize); - op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out); + op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out, false); op.open(); final int numWindows = 10; @@ -404,7 +404,7 @@ public void testSlidingWindow() throws Exception { IntSerializer.INSTANCE, tupleSerializer, 150, 50); - 
op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out); + op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out, false); op.open(); final int numElements = 1000; @@ -484,7 +484,7 @@ public void testSlidingWindowSingleElements() throws Exception { sumFunction, fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, 150, 50); - op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out); + op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out, false); op.open(); synchronized (lock) { @@ -558,7 +558,7 @@ public void testPropagateExceptionsFromProcessElement() throws Exception { IntSerializer.INSTANCE, tupleSerializer, hundredYears, hundredYears); - op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out); + op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out, false); op.open(); for (int i = 0; i < 100; i++) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java index 3c1c24b4e0334..42be1311a73a2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java @@ -20,6 +20,7 @@ import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.util.ArrayList; @@ -60,7 +61,12 @@ public void waitForNElements(int n, long timeout) throws InterruptedException { @Override public void emitWatermark(Watermark mark) { - throw new UnsupportedOperationException("the output should not emit watermarks"); + throw new UnsupportedOperationException("The output should not emit watermarks"); + } + + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + throw new UnsupportedOperationException("The output should not emit latency markers"); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java index 7abd2f9e4ca93..bf3a488740b39 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java @@ -23,6 +23,7 @@ import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; public class MockOutput implements Output> { @@ -43,6 +44,11 @@ public void emitWatermark(Watermark mark) { throw new RuntimeException("THIS MUST BE IMPLEMENTED"); } + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + throw new RuntimeException(); + } + @Override public void close() { } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 9d8e6a5ea42a8..1dda01992f813 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -202,7 +203,7 @@ public ConcurrentLinkedQueue getOutput() { * {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} */ public void setup() throws Exception { - operator.setup(mockTask, config, new MockOutput()); + operator.setup(mockTask, config, new MockOutput(), false); setupCalled = true; } @@ -290,6 +291,11 @@ public void emitWatermark(Watermark mark) { outputList.add(mark); } + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + outputList.add(latencyMarker); + } + @Override public void collect(StreamRecord element) { if (outputSerializer == null) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java index d848d2ae6d8ca..e016e81241afc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java @@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -73,7 +74,7 @@ public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator o when(mockTask.getEnvironment()).thenReturn(env); when(mockTask.getExecutionConfig()).thenReturn(executionConfig); - operator.setup(mockTask, new StreamConfig(new Configuration()), new MockOutput()); + operator.setup(mockTask, new StreamConfig(new Configuration()), new MockOutput(), false); } /** @@ -126,6 +127,11 @@ public void emitWatermark(Watermark mark) { outputList.add(mark); } + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + outputList.add(latencyMarker); + } + @Override @SuppressWarnings("unchecked") public void collect(StreamRecord element) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java index 2ed759d0b4e0f..5855214956f29 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java @@ -777,6 +777,7 @@ public void processElement(StreamRecord element) throws 
Exception { @Override public void processWatermark(Watermark mark) throws Exception {} + } public static class IdentityCoMap implements CoMapFunction { From 77a833860212c44846f4d47cad6d517f8b2f03a6 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Thu, 13 Oct 2016 16:05:31 +0200 Subject: [PATCH 2/2] Removed isSink --- .../flink/storm/wrappers/BoltWrapperTest.java | 14 +++++----- .../api/operators/AbstractStreamOperator.java | 27 +++++++------------ .../operators/AbstractUdfStreamOperator.java | 4 +-- .../api/operators/StreamOperator.java | 2 +- .../streaming/api/operators/StreamSink.java | 10 ++++++- .../runtime/tasks/OperatorChain.java | 4 +-- .../runtime/tasks/StreamIterationTail.java | 2 +- .../api/graph/StreamGraphGeneratorTest.java | 2 +- .../operators/StreamOperatorChainingTest.java | 4 +-- .../operators/StreamSourceOperatorTest.java | 2 +- ...ignedProcessingTimeWindowOperatorTest.java | 16 +++++------ ...ignedProcessingTimeWindowOperatorTest.java | 18 ++++++------- .../OneInputStreamOperatorTestHarness.java | 2 +- .../TwoInputStreamOperatorTestHarness.java | 2 +- 14 files changed, 55 insertions(+), 54 deletions(-) diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java index 6bec01b1c4bd4..e0659da8095b3 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java @@ -155,7 +155,7 @@ private void testWrapper(final int numberOfAttributes) throws Exception { PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class), false); + wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class)); wrapper.open(); wrapper.processElement(record); @@ -191,7 +191,7 @@ public void testMultipleOutputStreams() throws Exception { } final BoltWrapper wrapper = new BoltWrapper(bolt, null, raw); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), output, false); + wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), output); wrapper.open(); final SplitStreamType splitRecord = new SplitStreamType(); @@ -244,7 +244,7 @@ public void testOpen() throws Exception { final IRichBolt bolt = mock(IRichBolt.class); BoltWrapper wrapper = new BoltWrapper(bolt); - wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class), false); + wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class)); wrapper.open(); verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class)); @@ -257,7 +257,7 @@ public void testOpen() throws Exception { final IRichBolt bolt = mock(IRichBolt.class); BoltWrapper wrapper = new BoltWrapper(bolt); - wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class), false); + wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class)); wrapper.open(); verify(bolt).prepare(same(stormConfig), any(TopologyContext.class), any(OutputCollector.class)); @@ -274,7 +274,7 @@ public void testOpen() throws Exception { TestDummyBolt 
testBolt = new TestDummyBolt(); BoltWrapper wrapper = new BoltWrapper(testBolt); - wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class), false); + wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class)); wrapper.open(); for (Entry entry : cfg.toMap().entrySet()) { @@ -301,7 +301,7 @@ public void testOpenSink() throws Exception { final IRichBolt bolt = mock(IRichBolt.class); BoltWrapper wrapper = new BoltWrapper(bolt); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class), false); + wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class)); wrapper.open(); verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNotNull(OutputCollector.class)); @@ -318,7 +318,7 @@ public void testClose() throws Exception { final BoltWrapper wrapper = new BoltWrapper(bolt); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class), false); + wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class)); wrapper.close(); wrapper.dispose(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 805ed1d5a8a34..77e4d9acaa9d9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -117,9 +117,6 @@ public abstract class AbstractStreamOperator /** Metric group for the operator */ protected MetricGroup metrics; - /** Flag indicating if this operator is a sink */ - protected transient boolean isSink = false; - protected LatencyGauge latencyGauge; // ------------------------------------------------------------------------ @@ -127,14 +124,13 @@ public abstract class AbstractStreamOperator // ------------------------------------------------------------------------ @Override - public void setup(StreamTask containingTask, StreamConfig config, Output> output, boolean isSink) { + public void setup(StreamTask containingTask, StreamConfig config, Output> output) { this.container = containingTask; this.config = config; String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim(); this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName); this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut")); - this.isSink = isSink; Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration(); int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE); if (historySize <= 0) { @@ -142,7 +138,7 @@ public void setup(StreamTask containingTask, StreamConfig config, Output>> { + protected static class LatencyGauge implements Gauge>> { private final Map latencyStats = new HashMap<>(); private final int historySize; - private final boolean ignoreSubtaskIndex; - LatencyGauge(int historySize, boolean ignoreSubtaskIndex) { + LatencyGauge(int historySize) { this.historySize = historySize; - this.ignoreSubtaskIndex = ignoreSubtaskIndex; } - public void reportLatency(LatencyMarker marker) { - 
LatencySourceDescriptor sourceDescriptor = LatencySourceDescriptor.of(marker, this.ignoreSubtaskIndex); + public void reportLatency(LatencyMarker marker, boolean isSink) { + LatencySourceDescriptor sourceDescriptor = LatencySourceDescriptor.of(marker, !isSink); DescriptiveStatistics sourceStats = latencyStats.get(sourceDescriptor); if (sourceStats == null) { // 512 element window (4 kb) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 5cccaf790a61c..72f30b87f0db8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -89,8 +89,8 @@ public F getUserFunction() { @Override - public void setup(StreamTask containingTask, StreamConfig config, Output> output, boolean isSink) { - super.setup(containingTask, config, output, isSink); + public void setup(StreamTask containingTask, StreamConfig config, Output> output) { + super.setup(containingTask, config, output); FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext()); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index 61ef4b08b0794..fae5fd063b599 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -54,7 +54,7 @@ public interface StreamOperator extends Serializable { /** * Initializes the operator. Sets access to the context and the output. 
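Stepping back to the LatencyGauge above: it keeps one bounded DescriptiveStatistics window per known source (the in-line comment mentions a 512 element window) and derives summary statistics such as min, max, mean and percentiles from it. The following stand-alone sketch shows just that aggregation, assuming Commons Math 3 on the classpath; the real gauge additionally keys its statistics map by LatencySourceDescriptor:

    import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;

    public class LatencyWindowSketch {
        public static void main(String[] args) {
            // Bounded window of the most recent latencies, same size as in the gauge's comment.
            DescriptiveStatistics stats = new DescriptiveStatistics(512);

            long[] observedLatenciesMs = {5, 7, 6, 40, 8, 6, 7, 9, 5, 120};
            for (long latency : observedLatenciesMs) {
                stats.addValue(latency);
            }

            System.out.printf("min=%.0f max=%.0f mean=%.1f p50=%.1f p95=%.1f p99=%.1f%n",
                    stats.getMin(), stats.getMax(), stats.getMean(),
                    stats.getPercentile(50), stats.getPercentile(95), stats.getPercentile(99));
        }
    }
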
*/ - void setup(StreamTask containingTask, StreamConfig config, Output> output, boolean isSink); + void setup(StreamTask containingTask, StreamConfig config, Output> output); /** * This method is called immediately before any elements are processed, it should contain the diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java index 3cf7602e5dc31..bd0f574b43152 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @Internal @@ -30,7 +31,6 @@ public class StreamSink extends AbstractUdfStreamOperator sinkFunction) { super(sinkFunction); - isSink = true; chainingStrategy = ChainingStrategy.ALWAYS; } @@ -43,4 +43,12 @@ public void processElement(StreamRecord element) throws Exception { public void processWatermark(Watermark mark) throws Exception { // ignore it for now, we are a sink, after all } + + @Override + protected void reportOrForwardLatencyMarker(LatencyMarker maker) { + // all operators are tracking latencies + this.latencyGauge.reportLatency(maker, true); + + // sinks don't forward latency markers + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 5a978af76f8c9..7342b6d86988b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -111,7 +111,7 @@ public OperatorChain(StreamTask containingTask, AccumulatorRegistry.Rep chainedConfigs, userCodeClassloader, streamOutputMap, allOps); if (headOperator != null) { - headOperator.setup(containingTask, configuration, getChainEntryPoint(), streamOutputs.length == 0); + headOperator.setup(containingTask, configuration, getChainEntryPoint()); } // add head operator to end of chain @@ -290,7 +290,7 @@ private static Output> createChainedOperator( // now create the operator and give it the output collector to write its output to OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader); - chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0); + chainedOperator.setup(containingTask, operatorConfig, output); allOperators.add(chainedOperator); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java index 1c2aba0ba3432..a5f94ad6f59dc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java @@ -59,7 +59,7 @@ public void init() throws Exception { LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID); 
 		this.headOperator = new RecordPusher<>();
-		this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime), false); // TODO is 'false' here correct?
+		this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime));
 	}
 	private static class RecordPusher extends AbstractStreamOperator implements OneInputStreamOperator {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index ee5c41416f031..aa86304df0627 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -464,7 +464,7 @@ public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception
 		}
 		@Override
-		public void setup(StreamTask containingTask, StreamConfig config, Output> output, boolean isSink) {}
+		public void setup(StreamTask containingTask, StreamConfig config, Output> output) {}
 	}
 	private static class OutputTypeConfigurableOperationWithOneInput
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index e3bec1d77d86d..32e8ea90dc2fb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -153,7 +153,7 @@ public void invoke(String value) throws Exception {
 				mockTask, mock(AccumulatorRegistry.Reporter.class));
-		headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint(), false);
+		headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint());
 		for (StreamOperator operator : operatorChain.getAllOperators()) {
 			if (operator != null) {
@@ -295,7 +295,7 @@ public void invoke(String value) throws Exception {
 				mockTask, mock(AccumulatorRegistry.Reporter.class));
-		headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint(), false);
+		headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint());
 		for (StreamOperator operator : operatorChain.getAllOperators()) {
 			if (operator != null) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index a98007192f8ff..42087b4b55849 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -291,7 +291,7 @@ public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable
 			}
 		}).when(mockTask).getTimerService();
-		operator.setup(mockTask, cfg, (Output>) mock(Output.class), false);
+		operator.setup(mockTask, cfg, (Output>) mock(Output.class));
 	}
 	// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index ccd1c69647272..51e61a1ed9f9a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -193,7 +193,7 @@ public void testWindowTriggerTimeAlignment() throws Exception {
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut, false);
+			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
 			op.open();
 			assertTrue(op.getNextSlideTime() % 1000 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
@@ -201,7 +201,7 @@ public void testWindowTriggerTimeAlignment() throws Exception {
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut, false);
+			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
 			op.open();
 			assertTrue(op.getNextSlideTime() % 1000 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
@@ -209,7 +209,7 @@ public void testWindowTriggerTimeAlignment() throws Exception {
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut, false);
+			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
 			op.open();
 			assertTrue(op.getNextSlideTime() % 500 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
@@ -217,7 +217,7 @@ public void testWindowTriggerTimeAlignment() throws Exception {
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut, false);
+			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
 			op.open();
 			assertTrue(op.getNextSlideTime() % 100 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1100 == 0);
@@ -249,7 +249,7 @@ public void testTumblingWindow() throws Exception {
 					IntSerializer.INSTANCE, IntSerializer.INSTANCE, windowSize, windowSize);
-			op.setup(mockTask, new StreamConfig(new Configuration()), out, false);
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
 			op.open();
 			final int numElements = 1000;
@@ -312,7 +312,7 @@ public void testSlidingWindow() throws Exception {
 					validatingIdentityFunction, identitySelector,
 					IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
-			op.setup(mockTask, new StreamConfig(new Configuration()), out, false);
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
 			op.open();
 			final int numElements = 1000;
@@ -384,7 +384,7 @@ public void testTumblingWindowSingleElements() throws Exception {
 					validatingIdentityFunction, identitySelector,
 					IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
-			op.setup(mockTask, new StreamConfig(new Configuration()), out, false);
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
 			op.open();
 			synchronized (lock) {
@@ -451,7 +451,7 @@ public void testSlidingWindowSingleElements() throws Exception {
 					validatingIdentityFunction, identitySelector,
 					IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
-			op.setup(mockTask, new StreamConfig(new Configuration()), out, false);
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
 			op.open();
 			synchronized (lock) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 16f291a53d2cd..12a842f415c52 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -201,7 +201,7 @@ public void testWindowTriggerTimeAlignment() throws Exception {
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
-			op.setup(mockTask, createTaskConfig(mockKeySelector, StringSerializer.INSTANCE, 10), mockOut, false);
+			op.setup(mockTask, createTaskConfig(mockKeySelector, StringSerializer.INSTANCE, 10), mockOut);
 			op.open();
 			assertTrue(op.getNextSlideTime() % 1000 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
@@ -209,7 +209,7 @@ public void testWindowTriggerTimeAlignment() throws Exception {
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut, false);
+			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
 			op.open();
 			assertTrue(op.getNextSlideTime() % 1000 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
@@ -217,7 +217,7 @@ public void testWindowTriggerTimeAlignment() throws Exception {
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut, false);
+			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
 			op.open();
 			assertTrue(op.getNextSlideTime() % 500 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
@@ -225,7 +225,7 @@ public void testWindowTriggerTimeAlignment() throws Exception {
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut, false);
+			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
 			op.open();
 			assertTrue(op.getNextSlideTime() % 100 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1100 == 0);
@@ -257,7 +257,7 @@ public void testTumblingWindowUniqueElements() throws Exception {
 			final StreamTask mockTask = createMockTaskWithTimer(timerService, lock);
-			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out, false);
+			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
 			op.open();
 			final int numElements = 1000;
@@ -325,7 +325,7 @@ public void testTumblingWindowDuplicateElements() throws Exception {
 					IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSize);
-			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out, false);
+			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
 			op.open();
 			final int numWindows = 10;
@@ -404,7 +404,7 @@ public void testSlidingWindow() throws Exception {
 					IntSerializer.INSTANCE, tupleSerializer, 150, 50);
-			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out, false);
+			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
 			op.open();
 			final int numElements = 1000;
@@ -484,7 +484,7 @@ public void testSlidingWindowSingleElements() throws Exception {
 					sumFunction, fieldOneSelector,
 					IntSerializer.INSTANCE, tupleSerializer, 150, 50);
-			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out, false);
+			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
 			op.open();
 			synchronized (lock) {
@@ -558,7 +558,7 @@ public void testPropagateExceptionsFromProcessElement() throws Exception {
 					IntSerializer.INSTANCE, tupleSerializer, hundredYears, hundredYears);
-			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out, false);
+			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
 			op.open();
 			for (int i = 0; i < 100; i++) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 1dda01992f813..d1622ffaf3ca2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -203,7 +203,7 @@ public ConcurrentLinkedQueue getOutput() {
 	 * {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
 	 */
 	public void setup() throws Exception {
-		operator.setup(mockTask, config, new MockOutput(), false);
+		operator.setup(mockTask, config, new MockOutput());
 		setupCalled = true;
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index e016e81241afc..32b4c77b7fc78 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -74,7 +74,7 @@ public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator o
 		when(mockTask.getEnvironment()).thenReturn(env);
 		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-		operator.setup(mockTask, new StreamConfig(new Configuration()), new MockOutput(), false);
+		operator.setup(mockTask, new StreamConfig(new Configuration()), new MockOutput());
 	}
 	/**