From bfe22cf7ea72941056fbbb657094ec75e58c74c4 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 16 Feb 2016 10:37:59 +0100 Subject: [PATCH 1/2] [FLINK-3379] [FLINK-3415] [streaming] Refactor TimestampExtractor into two separate classes - one class handled periodic watermarks - the other class handled watermarks triggered by elements This also makes sure that any timestamp assigner / watermark generators cannot generate negative watermarks --- .../streaming/examples/join/WindowJoin.java | 18 +- .../ml/IncrementalLearningSkeleton.java | 16 +- .../examples/windowing/TopSpeedWindowing.java | 21 +- .../java/org/apache/flink/cep/CEPITCase.java | 60 ++---- .../streaming/api/datastream/DataStream.java | 107 +++++++++- .../AssignerWithPeriodicWatermarks.java | 71 +++++++ .../AssignerWithPunctuatedWatermarks.java | 77 +++++++ .../api/functions/TimestampAssigner.java | 53 +++++ .../api/functions/TimestampExtractor.java | 16 +- .../operators/AbstractUdfStreamOperator.java | 2 +- .../api/operators/TimestampedCollector.java | 4 +- .../operators/ExtractTimestampsOperator.java | 10 +- ...mestampsAndPeriodicWatermarksOperator.java | 89 ++++++++ ...stampsAndPunctuatedWatermarksOperator.java | 75 +++++++ .../api/complex/ComplexIntegrationTest.java | 64 ++---- ...ampsAndPeriodicWatermarksOperatorTest.java | 154 ++++++++++++++ ...psAndPunctuatedWatermarksOperatorTest.java | 95 +++++++++ .../windowing/CoGroupJoinITCase.java | 104 ++++----- .../operators/windowing/WindowFoldITCase.java | 65 +++--- .../streaming/timestamp/TimestampITCase.java | 201 ++++++++++-------- .../OneInputStreamOperatorTestHarness.java | 39 +++- .../streaming/api/scala/DataStream.scala | 80 ++++++- .../api/scala/CoGroupJoinITCase.scala | 62 +++--- .../api/scala/WindowFoldITCase.scala | 24 +-- 24 files changed, 1123 insertions(+), 384 deletions(-) create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampAssigner.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index 2bb153cd0e090..2afccc8b9b704 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import 
org.apache.flink.streaming.api.functions.TimestampExtractor; +import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; @@ -51,6 +51,7 @@ *
  • write a simple streaming program. * */ +@SuppressWarnings("serial") public class WindowJoin { // ************************************************************************* @@ -211,23 +212,12 @@ public Tuple3 join(Tuple3 first } } - private static class MyTimestampExtractor implements TimestampExtractor> { - private static final long serialVersionUID = 1L; + private static class MyTimestampExtractor extends AscendingTimestampExtractor> { @Override - public long extractTimestamp(Tuple3 element, long currentTimestamp) { + public long extractAscendingTimestamp(Tuple3 element, long currentTimestamp) { return element.f0; } - - @Override - public long extractWatermark(Tuple3 element, long currentTimestamp) { - return element.f0 - 1; - } - - @Override - public long getCurrentWatermark() { - return Long.MIN_VALUE; - } } private static class NameKeySelector implements KeySelector, String> { diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java index 3f38312d28d2d..405a58659f5a0 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java @@ -20,7 +20,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.TimestampExtractor; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; @@ -69,7 +69,7 @@ public static void main(String[] args) throws Exception { // build new model on every second of new data DataStream model = trainingData - .assignTimestamps(new LinearTimestamp()) + .assignTimestampsAndWatermarks(new LinearTimestamp()) .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS)) .apply(new PartialModelBuilder()); @@ -145,26 +145,20 @@ private Integer getTrainingData() throws InterruptedException { } } - public static class LinearTimestamp implements TimestampExtractor { + public static class LinearTimestamp implements AssignerWithPunctuatedWatermarks { private static final long serialVersionUID = 1L; private long counter = 0L; @Override - public long extractTimestamp(Integer element, long currentTimestamp) { + public long extractTimestamp(Integer element, long previousElementTimestamp) { return counter += 10L; } @Override - public long extractWatermark(Integer element, long currentTimestamp) { + public long checkAndGetNextWatermark(Integer lastElement, long extractedTimestamp) { return counter - 1; } - - @Override - public long getCurrentWatermark() { - return Long.MIN_VALUE; - } - } /** diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java index 5a56a40771d57..b0d1462ec1895 100644 --- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java @@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.TimestampExtractor; +import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; @@ -150,8 +150,7 @@ public void cancel() { } } - private static class ParseCarData extends - RichMapFunction> { + private static class ParseCarData extends RichMapFunction> { private static final long serialVersionUID = 1L; @Override @@ -162,25 +161,13 @@ public Tuple4 map(String record) { } } - private static class CarTimestamp implements TimestampExtractor> { + private static class CarTimestamp extends AscendingTimestampExtractor> { private static final long serialVersionUID = 1L; @Override - public long extractTimestamp(Tuple4 element, - long currentTimestamp) { + public long extractAscendingTimestamp(Tuple4 element, long previous) { return element.f3; } - - @Override - public long extractWatermark(Tuple4 element, - long currentTimestamp) { - return element.f3 - 1; - } - - @Override - public long getCurrentWatermark() { - return Long.MIN_VALUE; - } } // ************************************************************************* diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index 7dcda4c5cedb7..b29dd92b6edde 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -27,8 +27,9 @@ import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.TimestampExtractor; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; + import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -37,6 +38,7 @@ import java.util.Map; +@SuppressWarnings("serial") public class CEPITCase extends StreamingMultipleProgramsTestBase { private String resultPath; @@ -77,7 +79,6 @@ public void testSimplePatternCEP() throws Exception { ); Pattern pattern = Pattern.begin("start").where(new FilterFunction() { - private static final long serialVersionUID = 5681493970790509488L; @Override public boolean filter(Event value) throws Exception { @@ -86,7 +87,6 @@ public boolean filter(Event value) throws Exception { }) .followedBy("middle").subtype(SubEvent.class).where( new FilterFunction() { - private static final long serialVersionUID = 448591738315698540L; @Override public boolean filter(SubEvent value) throws Exception { @@ -95,7 +95,6 @@ public boolean filter(SubEvent value) throws Exception { } ) .followedBy("end").where(new FilterFunction() { - private static final long 
serialVersionUID = 6080276591060431966L; @Override public boolean filter(Event value) throws Exception { @@ -104,7 +103,6 @@ public boolean filter(Event value) throws Exception { }); DataStream result = CEP.pattern(input, pattern).select(new PatternSelectFunction() { - private static final long serialVersionUID = 1447462674590806097L; @Override public String select(Map pattern) { @@ -148,7 +146,6 @@ public void testSimpleKeyedPatternCEP() throws Exception { new Event(2, "end", 1.0), new Event(42, "end", 42.0) ).keyBy(new KeySelector() { - private static final long serialVersionUID = -2112041392652797483L; @Override public Integer getKey(Event value) throws Exception { @@ -157,7 +154,6 @@ public Integer getKey(Event value) throws Exception { }); Pattern pattern = Pattern.begin("start").where(new FilterFunction() { - private static final long serialVersionUID = 5681493970790509488L; @Override public boolean filter(Event value) throws Exception { @@ -166,7 +162,6 @@ public boolean filter(Event value) throws Exception { }) .followedBy("middle").subtype(SubEvent.class).where( new FilterFunction() { - private static final long serialVersionUID = 448591738315698540L; @Override public boolean filter(SubEvent value) throws Exception { @@ -175,7 +170,6 @@ public boolean filter(SubEvent value) throws Exception { } ) .followedBy("end").where(new FilterFunction() { - private static final long serialVersionUID = 6080276591060431966L; @Override public boolean filter(Event value) throws Exception { @@ -184,7 +178,6 @@ public boolean filter(Event value) throws Exception { }); DataStream result = CEP.pattern(input, pattern).select(new PatternSelectFunction() { - private static final long serialVersionUID = 1447462674590806097L; @Override public String select(Map pattern) { @@ -218,31 +211,21 @@ public void testSimplePatternEventTime() throws Exception { Tuple2.of(new Event(3, "end", 3.0), 3L), Tuple2.of(new Event(4, "end", 4.0), 10L), Tuple2.of(new Event(5, "middle", 5.0), 7L) - ).assignTimestamps(new TimestampExtractor>() { - private static final long serialVersionUID = 878281782188702293L; + ).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks>() { - private Long currentMaxTimestamp = Long.MIN_VALUE; + private long currentMaxTimestamp = -1; @Override - public long extractTimestamp(Tuple2 element, long currentTimestamp) { - if (currentMaxTimestamp < element.f1) { - currentMaxTimestamp = element.f1; - } - + public long extractTimestamp(Tuple2 element, long previousTimestamp) { + currentMaxTimestamp = Math.max(currentMaxTimestamp, element.f1); return element.f1; } - @Override - public long extractWatermark(Tuple2 element, long currentTimestamp) { - return currentMaxTimestamp - 5; - } - @Override public long getCurrentWatermark() { - return Long.MIN_VALUE; + return currentMaxTimestamp - 5; } }).map(new MapFunction, Event>() { - private static final long serialVersionUID = -5288731103938665328L; @Override public Event map(Tuple2 value) throws Exception { @@ -251,21 +234,18 @@ public Event map(Tuple2 value) throws Exception { }); Pattern pattern = Pattern.begin("start").where(new FilterFunction() { - private static final long serialVersionUID = 2601494641888389648L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } }).followedBy("middle").where(new FilterFunction() { - private static final long serialVersionUID = -3133506934766766660L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } 
}).followedBy("end").where(new FilterFunction() { - private static final long serialVersionUID = -8528031731858936269L; @Override public boolean filter(Event value) throws Exception { @@ -275,7 +255,6 @@ public boolean filter(Event value) throws Exception { DataStream result = CEP.pattern(input, pattern).select( new PatternSelectFunction() { - private static final long serialVersionUID = 1447462674590806097L; @Override public String select(Map pattern) { @@ -317,38 +296,27 @@ public void testSimpleKeyedPatternEventTime() throws Exception { Tuple2.of(new Event(1, "middle", 5.0), 7L), Tuple2.of(new Event(3, "middle", 6.0), 9L), Tuple2.of(new Event(3, "end", 7.0), 7L) - ).assignTimestamps(new TimestampExtractor>() { - private static final long serialVersionUID = 878281782188702293L; + ).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks>() { - private Long currentMaxTimestamp = Long.MIN_VALUE; + private long currentMaxTimestamp = -1L; @Override public long extractTimestamp(Tuple2 element, long currentTimestamp) { - if (currentMaxTimestamp < element.f1) { - currentMaxTimestamp = element.f1; - } - + currentMaxTimestamp = Math.max(element.f1, currentMaxTimestamp); return element.f1; } - @Override - public long extractWatermark(Tuple2 element, long currentTimestamp) { - return currentMaxTimestamp - 5; - } - @Override public long getCurrentWatermark() { - return Long.MIN_VALUE; + return currentMaxTimestamp - 5; } }).map(new MapFunction, Event>() { - private static final long serialVersionUID = -5288731103938665328L; @Override public Event map(Tuple2 value) throws Exception { return value.f0; } }).keyBy(new KeySelector() { - private static final long serialVersionUID = -3282946957177720879L; @Override public Integer getKey(Event value) throws Exception { @@ -357,21 +325,18 @@ public Integer getKey(Event value) throws Exception { }); Pattern pattern = Pattern.begin("start").where(new FilterFunction() { - private static final long serialVersionUID = 2601494641888389648L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } }).followedBy("middle").where(new FilterFunction() { - private static final long serialVersionUID = -3133506934766766660L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } }).followedBy("end").where(new FilterFunction() { - private static final long serialVersionUID = -8528031731858936269L; @Override public boolean filter(Event value) throws Exception { @@ -381,7 +346,6 @@ public boolean filter(Event value) throws Exception { DataStream result = CEP.pattern(input, pattern).select( new PatternSelectFunction() { - private static final long serialVersionUID = 1447462674590806097L; @Override public String select(Map pattern) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 5d0c627d33fa3..d20d34700e260 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -47,6 +47,8 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.TimestampExtractor; import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; @@ -73,6 +75,8 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator; +import org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator; +import org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; @@ -735,6 +739,10 @@ public AllWindowedStream windowAll(WindowAssigner(this, assigner); } + // ------------------------------------------------------------------------ + // Timestamps and watermarks + // ------------------------------------------------------------------------ + /** * Extracts a timestamp from an element and assigns it as the internal timestamp of that element. * The internal timestamps are, for example, used to to event-time window operations. @@ -745,11 +753,15 @@ public AllWindowedStream windowAll(WindowAssigner assignTimestamps(TimestampExtractor extractor) { // match parallelism to input, otherwise dop=1 sources could lead to some strange // behaviour: the watermark will creep along very slowly because the elements @@ -760,6 +772,95 @@ public AllWindowedStream windowAll(WindowAssignerThis method creates watermarks periodically (for example every second), based + * on the watermarks indicated by the given watermark generator. Even when no new elements + * in the stream arrive, the given watermark generator will be periodically checked for + * new watermarks. The interval in which watermarks are generated is defined in + * {@link ExecutionConfig#setAutoWatermarkInterval(long)}. + * + *

    Use this method for the common cases, where some characteristic over all elements + * should generate the watermarks, or where watermarks are simply trailing behind the + * wall clock time by a certain amount. + * + *

    For cases where watermarks should be created in an irregular fashion, for example + * based on certain markers that some element carry, use the + * {@link AssignerWithPunctuatedWatermarks}. + * + * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and + * watermark generator. + * @return The stream after the transformation, with assigned timestamps and watermarks. + * + * @see AssignerWithPeriodicWatermarks + * @see AssignerWithPunctuatedWatermarks + * @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks) + */ + public SingleOutputStreamOperator assignTimestampsAndWatermarks( + AssignerWithPeriodicWatermarks timestampAndWatermarkAssigner) { + + // match parallelism to input, otherwise dop=1 sources could lead to some strange + // behaviour: the watermark will creep along very slowly because the elements + // from the source go to each extraction operator round robin. + final int inputParallelism = getTransformation().getParallelism(); + final AssignerWithPeriodicWatermarks cleanedAssigner = clean(timestampAndWatermarkAssigner); + + TimestampsAndPeriodicWatermarksOperator operator = + new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner); + + return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator) + .setParallelism(inputParallelism); + } + + /** + * Assigns timestamps to the elements in the data stream and periodically creates + * watermarks to signal event time progress. + * + *

    This method creates watermarks based purely on stream elements. For each element + * that is handled via {@link AssignerWithPunctuatedWatermarks#extractTimestamp(Object, long)}, + * the {@link AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark(Object, long)} + * method is called, and a new watermark is emitted, if the returned watermark value is + * non-negative and greater than the previous watermark. + * + *

    This method is useful when the data stream embeds watermark elements, or certain elements + * carry a marker that can be used to determine the current event time watermark. + * This operation gives the programmer full control over the watermark generation. Users + * should be aware that too aggressive watermark generation (i.e., generating hundreds of + * watermarks every second) can cost some performance. + * + *

    For cases where watermarks should be created in a regular fashion, for example + * every x milliseconds, use the {@link AssignerWithPeriodicWatermarks}. + * + * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and + * watermark generator. + * @return The stream after the transformation, with assigned timestamps and watermarks. + * + * @see AssignerWithPunctuatedWatermarks + * @see AssignerWithPeriodicWatermarks + * @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks) + */ + public SingleOutputStreamOperator assignTimestampsAndWatermarks( + AssignerWithPunctuatedWatermarks timestampAndWatermarkAssigner) { + + // match parallelism to input, otherwise dop=1 sources could lead to some strange + // behaviour: the watermark will creep along very slowly because the elements + // from the source go to each extraction operator round robin. + final int inputParallelism = getTransformation().getParallelism(); + final AssignerWithPunctuatedWatermarks cleanedAssigner = clean(timestampAndWatermarkAssigner); + + TimestampsAndPunctuatedWatermarksOperator operator = + new TimestampsAndPunctuatedWatermarksOperator<>(cleanedAssigner); + + return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator) + .setParallelism(inputParallelism); + } + + // ------------------------------------------------------------------------ + // Data sinks + // ------------------------------------------------------------------------ + /** * Writes a DataStream to the standard output stream (stdout). * diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java new file mode 100644 index 0000000000000..821dd94cbd737 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.api.common.ExecutionConfig; + +/** + * The {@code AssignerWithPeriodicWatermarks} assigns event time timestamps to elements, + * and generates low watermarks that signal event time progress within the stream. + * These timestamps and watermarks are used by functions and operators that operate + * on event time, for example event time windows. + * + *

    This class is used to generate watermarks at a periodic interval. + * At most every {@code i} milliseconds (configured via + * {@link ExecutionConfig#getAutoWatermarkInterval()}), the system will call the + * {@link #getCurrentWatermark()} method to probe for the next watermark value. + * The system will generate a new watermark if the probed value is larger than + * zero and larger than the previous watermark. + + *

    The system may call the {@link #getCurrentWatermark()} method less often than every + * {@code i} milliseconds, if no new elements have arrived since the last call to the + * method. + + *

    Timestamps and watermarks are defined as {@code longs} that represent the + * milliseconds since the Epoch (midnight, January 1, 1970 UTC). + * A watermark with a certain value {@code t} indicates that no elements with event + * timestamps {@code x}, where {@code x} is lower or equal to {@code t}, will occur any more. + * + * @param <T> The type of the elements to which this assigner assigns timestamps. + * + * @see org.apache.flink.streaming.api.watermark.Watermark + */ +public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> { + + /** + * Returns the current watermark. This method is periodically called by the + * system to retrieve the current watermark. + * + *

    The current watermark will be emitted only if it is larger than the previously + * emitted watermark. If the current watermark is still identical to the previous + * one, no progress in event time has happened since the previous call to this method. + * + *

    If this method returns a value that is smaller than the previously returned watermark, + * then the implementation does not properly handle the event stream timestamps. + * In that case, the returned watermark will not be emitted (to preserve the contract of + * ascending watermarks), and the violation will be logged and registered in the metrics. + * + *

    The interval in which this method is called and Watermarks are generated + * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}. + * + * @see org.apache.flink.streaming.api.watermark.Watermark + * @see ExecutionConfig#getAutoWatermarkInterval() + */ + long getCurrentWatermark(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java new file mode 100644 index 0000000000000..39f3bb712a85c --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions; + +/** + * The {@code AssignerWithPunctuatedWatermarks} assigns event time timestamps to elements, + * and generates low watermarks that signal event time progress within the stream. + * + *

    Use this class if certain special elements act as markers that signify event time + * progress, and when you want to emit watermarks specifically at certain events. + + *

    For use cases that should periodically emit watermarks based on element timestamps, + * use the {@link AssignerWithPeriodicWatermarks} instead. + * + *

    The following example illustrates how to use this timestamp extractor and watermark + * generator. It assumes elements carry a timestamp that describes when they were created, + * and that some elements carry a flag, marking them as the end of a sequence such that no + * elements with smaller timestamps can come any more. + * + *

+ * <pre>{@code
+ * public class WatermarkOnFlagAssigner implements AssignerWithPunctuatedWatermarks<MyElement> {
+ *
+ *     public long extractTimestamp(MyElement element, long previousElementTimestamp) {
+ *         return element.getSequenceTimestamp();
+ *     }
+ *
+ *     public long checkAndGetNextWatermark(MyElement lastElement, long extractedTimestamp) {
+ *         return lastElement.isEndOfSequence() ? extractedTimestamp : -1L;
+ *     }
+ * }
+ * }</pre>
+ *

    Timestamps and watermarks are defined as {@code longs} that represent the + * milliseconds since the Epoch (midnight, January 1, 1970 UTC). + * A watermark with a certain value {@code t} indicates that no elements with event + * timestamps {@code x}, where {@code x} is lower or equal to {@code t}, will occur any more. + * + * @param <T> The type of the elements to which this assigner assigns timestamps. + * + * @see org.apache.flink.streaming.api.watermark.Watermark + */ +public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> { + + /** + * Asks this implementation if it wants to emit a watermark. This method is called right after + * the {@link #extractTimestamp(Object, long)} method. If the method returns a positive + * value, a new watermark should be emitted. If a negative value is returned, no new watermark + * will be generated. + + *

    Note that whenever this method returns a positive value that is larger than the previous + * value, a new watermark is generated. Hence, the implementation has full control over how often + * watermarks are generated. + + *

    For an example how to use this method, see the documentation of + * {@link AssignerWithPunctuatedWatermarks this class}. + * + * @return A negative value, if no watermark should be emitted, positive value for + * emitting this value as a watermark. + */ + long checkAndGetNextWatermark(T lastElement, long extractedTimestamp); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampAssigner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampAssigner.java new file mode 100644 index 0000000000000..50fb0b4fec103 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampAssigner.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.api.common.functions.Function; + +/** + * A {@code TimestampAssigner} assigns event time timestamps to elements. + * These timestamps are used by all functions that operate on event time, + * for example event time windows. + * + *

    Timestamps are represented in milliseconds since the Epoch + * (midnight, January 1, 1970 UTC). + * + *

    A timestamp assigner that assigns to each element a timestamp via + * {@link System#currentTimeMillis()} effectively realizes "ingestion time" + * semantics. + + * @param <T> The type of the elements to which this assigner assigns timestamps. + */ +public interface TimestampAssigner<T> extends Function { + + /** + * Assigns a timestamp to an element, in milliseconds since the Epoch. + + *

    The method gets the previously assigned timestamp of the element. + * That previous timestamp may have been assigned from a previous assigner, + * by ingestion time, or be simply uninitialized. In the latter case, the + * timestamp will be a negative value. + * + * @param element The element that the timestamp is wil be assigned to. + * @param previousElementTimestamp The previous internal timestamp of the element, + * or a negative value, if no timestamp has been assigned, yet. + * @return The new timestamp. + */ + long extractTimestamp(T element, long previousElementTimestamp); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java index 6cc9d35bb3323..cb59b6a6c27b5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.Function; /** @@ -32,11 +32,15 @@ * {@link org.apache.flink.streaming.api.functions.AscendingTimestampExtractor}. This will * keep track of watermarks. 
* - * @see org.apache.flink.streaming.api.watermark.Watermark - * * @param The type of the elements that this function can extract timestamps from + * + * @deprecated This class has been replaced by {@link AssignerWithPeriodicWatermarks} and + * {@link AssignerWithPunctuatedWatermarks} + * + * @see AssignerWithPeriodicWatermarks + * @see AssignerWithPunctuatedWatermarks */ -@PublicEvolving +@Deprecated public interface TimestampExtractor extends Function { /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 200bf9518fcd6..86b07d64656cd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -187,8 +187,8 @@ public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception { @Override public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { if (userFunction instanceof OutputTypeConfigurable) { + @SuppressWarnings("unchecked") OutputTypeConfigurable outputTypeConfigurable = (OutputTypeConfigurable) userFunction; - outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java index a5e9ef1a5d3a9..05b65c52aa1a0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java index 8d30e5a361d92..a4815dc831f08 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.runtime.operators; -import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.functions.TimestampExtractor; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -30,17 +29,20 @@ * from user elements and assigning them as the internal timestamp of the {@link StreamRecord}. 
* * @param The type of the input elements + * + * @deprecated Subsumed by {@link TimestampsAndPeriodicWatermarksOperator} and + * {@link TimestampsAndPunctuatedWatermarksOperator}. */ -@Internal +@Deprecated public class ExtractTimestampsOperator extends AbstractUdfStreamOperator> implements OneInputStreamOperator, Triggerable { private static final long serialVersionUID = 1L; - transient long watermarkInterval; + private transient long watermarkInterval; - transient long currentWatermark; + private transient long currentWatermark; public ExtractTimestampsOperator(TimestampExtractor extractor) { super(extractor); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java new file mode 100644 index 0000000000000..c7a14c7b35352 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** + * A stream operator that extracts timestamps from stream elements and + * generates periodic watermarks. 
+ * + * @param The type of the input elements + */ +public class TimestampsAndPeriodicWatermarksOperator + extends AbstractUdfStreamOperator> + implements OneInputStreamOperator, Triggerable { + + private static final long serialVersionUID = 1L; + + private transient long watermarkInterval; + + private transient long currentWatermark; + + + public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks assigner) { + super(assigner); + chainingStrategy = ChainingStrategy.ALWAYS; + } + + @Override + public void open() throws Exception { + super.open(); + + currentWatermark = -1L; + watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); + + if (watermarkInterval > 0) { + registerTimer(System.currentTimeMillis() + watermarkInterval, this); + } + } + + @Override + public void processElement(StreamRecord element) throws Exception { + long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.getTimestamp()); + output.collect(element.replace(element.getValue(), newTimestamp)); + } + + @Override + public void trigger(long timestamp) throws Exception { + // register next timer + long newWatermark = userFunction.getCurrentWatermark(); + if (newWatermark > currentWatermark) { + currentWatermark = newWatermark; + // emit watermark + output.emitWatermark(new Watermark(currentWatermark)); + } + + registerTimer(System.currentTimeMillis() + watermarkInterval, this); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + // if we receive a Long.MAX_VALUE watermark we forward it since it is used + // to signal the end of input and to not block watermark progress downstream + if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) { + currentWatermark = Long.MAX_VALUE; + output.emitWatermark(mark); + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java new file mode 100644 index 0000000000000..75649ebe144c6 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** + * A stream operator that extracts timestamps from stream elements and + * generates watermarks based on punctuation elements. + * + * @param The type of the input elements + */ +public class TimestampsAndPunctuatedWatermarksOperator + extends AbstractUdfStreamOperator> + implements OneInputStreamOperator { + + private static final long serialVersionUID = 1L; + + private transient long currentWatermark; + + + public TimestampsAndPunctuatedWatermarksOperator(AssignerWithPunctuatedWatermarks assigner) { + super(assigner); + chainingStrategy = ChainingStrategy.ALWAYS; + } + + @Override + public void open() throws Exception { + super.open(); + currentWatermark = -1L; + } + + @Override + public void processElement(StreamRecord element) throws Exception { + final T value = element.getValue(); + final long newTimestamp = userFunction.extractTimestamp(value, element.getTimestamp()); + output.collect(element.replace(element.getValue(), newTimestamp)); + + final long nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp); + if (nextWatermark >= 0 && nextWatermark > currentWatermark) { + currentWatermark = nextWatermark; + output.emitWatermark(new Watermark(nextWatermark)); + } + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + // if we receive a Long.MAX_VALUE watermark we forward it since it is used + // to signal the end of input and to not block watermark progress downstream + if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) { + currentWatermark = Long.MAX_VALUE; + output.emitWatermark(mark); + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java index 9297ae65b2d3f..8c29a156f4a9c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java @@ -37,7 +37,8 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.TimestampExtractor; +import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -51,6 +52,7 @@ import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; + import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -279,16 
+281,8 @@ public Tuple2 map(Long value) throws Exception { .window(GlobalWindows.create()) .trigger(PurgingTrigger.of(CountTrigger.of(10_000))) .sum(1) - -// .filter(new FilterFunction>() { -// -// @Override -// public boolean filter(Tuple2 value) throws Exception { -// return value.f0 < 100 || value.f0 > 19900; -// } -// }) + .print(); -// .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE); env.execute(); } @@ -483,7 +477,7 @@ public void complexIntegrationTest6() throws Exception { DataStream>> sourceStream6 = env.fromCollection(sales); sourceStream6 - .assignTimestamps(new Timestamp6()) + .assignTimestampsAndWatermarks(new Timestamp6()) .timeWindowAll(Time.of(1, TimeUnit.MILLISECONDS)) .reduce(new SalesReduceFunction()) .flatMap(new FlatMapFunction6()) @@ -558,23 +552,12 @@ public Tuple2> map(Tuple2> } } - private static class MyTimestampExtractor implements TimestampExtractor> { - private static final long serialVersionUID = 1L; - - @Override - public long extractTimestamp(Tuple5 value, long currentTimestamp) { - return (long) value.f0; - } + private static class MyTimestampExtractor extends AscendingTimestampExtractor> { - @Override - public long extractWatermark(Tuple5 value, - long currentTimestamp) { - return (long) value.f0 - 1; - } @Override - public long getCurrentWatermark() { - return Long.MIN_VALUE; + public long extractAscendingTimestamp(Tuple5 element, long currentTimestamp) { + return (long) element.f0; } } @@ -710,37 +693,28 @@ public void flatMap(Long value, Collector out) throws Exception { } } - private static class Timestamp6 implements TimestampExtractor>> { - + private static class Timestamp6 implements AssignerWithPunctuatedWatermarks>> { + @Override - public long extractTimestamp(Tuple2> value, - long currentTimestamp) { + public long extractTimestamp(Tuple2> value, long previousTimestamp) { + Calendar cal = Calendar.getInstance(); cal.setTime(value.f0); return 12 * (cal.get(Calendar.YEAR)) + cal.get(Calendar.MONTH); } @Override - public long extractWatermark(Tuple2> value, - long currentTimestamp) { - Calendar cal = Calendar.getInstance(); - cal.setTime(value.f0); - return 12 * (cal.get(Calendar.YEAR)) + cal.get(Calendar.MONTH) - 1; - } - - @Override - public long getCurrentWatermark() { - return 0; + public long checkAndGetNextWatermark(Tuple2> lastElement, long extractedTimestamp) { + return extractedTimestamp - 1; } } private static class SalesReduceFunction implements ReduceFunction>> { - private static final long serialVersionUID = 1L; @Override public Tuple2> reduce(Tuple2> value1, - Tuple2> value2) throws Exception { + Tuple2> value2) throws Exception { + HashMap map1 = value1.f1; HashMap map2 = value2.f1; for (Character key : map2.keySet()) { @@ -755,8 +729,8 @@ public Tuple2> reduce(Tuple2>, Tuple2>> { + private static class FlatMapFunction6 implements FlatMapFunction>, + Tuple2>> { @Override public void flatMap(Tuple2> value, Collector operator = + new TimestampsAndPeriodicWatermarksOperator(new LongExtractor()); + + final ExecutionConfig config = new ExecutionConfig(); + config.setAutoWatermarkInterval(50); + + OneInputStreamOperatorTestHarness testHarness = + new OneInputStreamOperatorTestHarness(operator, config); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1L, 1)); + testHarness.processElement(new StreamRecord<>(2L, 1)); + testHarness.processWatermark(new Watermark(2)); // this watermark should be ignored + testHarness.processElement(new StreamRecord<>(3L, 3)); + testHarness.processElement(new StreamRecord<>(4L, 
3)); + + // validate first part of the sequence. we poll elements until our + // watermark updates to "3", which must be the result of the "4" element. + { + ConcurrentLinkedQueue output = testHarness.getOutput(); + long nextElementValue = 1L; + long lastWatermark = -1L; + + while (lastWatermark < 3) { + if (output.size() > 0) { + Object next = output.poll(); + assertNotNull(next); + Tuple2 update = validateElement(next, nextElementValue, lastWatermark); + nextElementValue = update.f0; + lastWatermark = update.f1; + + // check the invariant + assertTrue(lastWatermark < nextElementValue); + } else { + Thread.sleep(10); + } + } + + output.clear(); + } + + testHarness.processElement(new StreamRecord<>(4L, 4)); + testHarness.processElement(new StreamRecord<>(5L, 4)); + testHarness.processElement(new StreamRecord<>(6L, 4)); + testHarness.processElement(new StreamRecord<>(7L, 4)); + testHarness.processElement(new StreamRecord<>(8L, 4)); + + // validate the next part of the sequence. we poll elements until our + // watermark updates to "7", which must be the result of the "8" element. + { + ConcurrentLinkedQueue output = testHarness.getOutput(); + long nextElementValue = 4L; + long lastWatermark = 2L; + + while (lastWatermark < 7) { + if (output.size() > 0) { + Object next = output.poll(); + assertNotNull(next); + Tuple2 update = validateElement(next, nextElementValue, lastWatermark); + nextElementValue = update.f0; + lastWatermark = update.f1; + + // check the invariant + assertTrue(lastWatermark < nextElementValue); + } else { + Thread.sleep(10); + } + } + + output.clear(); + } + + testHarness.processWatermark(new Watermark(Long.MAX_VALUE)); + assertEquals(Long.MAX_VALUE, ((Watermark) testHarness.getOutput().poll()).getTimestamp()); + } + + // ------------------------------------------------------------------------ + + private Tuple2 validateElement(Object element, long nextElementValue, long currentWatermark) { + if (element instanceof StreamRecord) { + @SuppressWarnings("unchecked") + StreamRecord record = (StreamRecord) element; + assertEquals(nextElementValue, record.getValue().longValue()); + assertEquals(nextElementValue, record.getTimestamp()); + return new Tuple2<>(nextElementValue + 1, currentWatermark); + } + else if (element instanceof Watermark) { + long wt = ((Watermark) element).getTimestamp(); + assertTrue(wt > currentWatermark); + return new Tuple2<>(nextElementValue, wt); + } + else { + throw new IllegalArgumentException("unrecognized element: " + element); + } + } + + // ------------------------------------------------------------------------ + + private static class LongExtractor implements AssignerWithPeriodicWatermarks { + private static final long serialVersionUID = 1L; + + private long currentTimestamp = -1L; + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + currentTimestamp = element; + return element; + } + + @Override + public long getCurrentWatermark() { + return currentTimestamp - 1; + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java new file mode 100644 index 0000000000000..5a5fce65832f5 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; + +public class TimestampsAndPunctuatedWatermarksOperatorTest { + + @Test + @SuppressWarnings("unchecked") + public void testTimestampsAndPeriodicWatermarksOperator() throws Exception { + + final TimestampsAndPunctuatedWatermarksOperator> operator = + new TimestampsAndPunctuatedWatermarksOperator<>(new PunctuatedExtractor()); + + OneInputStreamOperatorTestHarness, Tuple2> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>(3L, true), 0L)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>(5L, false), 0L)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>(4L, false), 0L)); + testHarness.processWatermark(new Watermark(10)); // this watermark should be ignored + testHarness.processElement(new StreamRecord<>(new Tuple2<>(4L, false), 0L)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>(4L, true), 0L)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>(9L, false), 0L)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>(5L, false), 0L)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>(7L, true), 0L)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>(10L, false), 0L)); + + testHarness.processWatermark(new Watermark(Long.MAX_VALUE)); + + ConcurrentLinkedQueue output = testHarness.getOutput(); + + assertEquals(3L, ((StreamRecord>) output.poll()).getTimestamp()); + assertEquals(3L, ((Watermark) output.poll()).getTimestamp()); + + assertEquals(5L, ((StreamRecord>) output.poll()).getTimestamp()); + assertEquals(4L, ((StreamRecord>) output.poll()).getTimestamp()); + assertEquals(4L, ((StreamRecord>) output.poll()).getTimestamp()); + assertEquals(4L, ((StreamRecord>) output.poll()).getTimestamp()); + assertEquals(4L, ((Watermark) output.poll()).getTimestamp()); + + assertEquals(9L, ((StreamRecord>) output.poll()).getTimestamp()); + assertEquals(5L, ((StreamRecord>) output.poll()).getTimestamp()); + assertEquals(7L, ((StreamRecord>) output.poll()).getTimestamp()); + assertEquals(7L, ((Watermark) output.poll()).getTimestamp()); + + assertEquals(10L, ((StreamRecord>) output.poll()).getTimestamp()); + assertEquals(Long.MAX_VALUE, ((Watermark) 
output.poll()).getTimestamp()); + } + + // ------------------------------------------------------------------------ + + private static class PunctuatedExtractor implements AssignerWithPunctuatedWatermarks> { + private static final long serialVersionUID = 1L; + + @Override + public long extractTimestamp(Tuple2 element, long previousTimestamp) { + return element.f0; + } + + @Override + public long checkAndGetNextWatermark(Tuple2 lastElement, long extractedTimestamp) { + return lastElement.f1 ? extractedTimestamp : -1L; + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java index cfae0263b9b44..72ea8fb9125e3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java @@ -1,23 +1,22 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.flink.streaming.runtime.operators.windowing; -import com.google.common.collect.Lists; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.functions.KeySelector; @@ -26,20 +25,25 @@ import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.TimestampExtractor; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; + import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +@SuppressWarnings("serial") public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { private static List testResults; @@ -47,7 +51,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { @Test public void testCoGroup() throws Exception { - testResults = Lists.newArrayList(); + testResults = new ArrayList<>(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); @@ -74,10 +78,9 @@ public void run(SourceContext> ctx) throws Exception { @Override public void cancel() { } - }).assignTimestamps(new Tuple2TimestampExtractor()); + }).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor()); DataStream> source2 = env.addSource(new SourceFunction>() { - private static final long serialVersionUID = 1L; @Override public void run(SourceContext> ctx) throws Exception { @@ -94,7 +97,7 @@ public void run(SourceContext> ctx) throws Exception { @Override public void cancel() { } - }).assignTimestamps(new Tuple2TimestampExtractor()); + }).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor()); source1.coGroup(source2) @@ -127,7 +130,7 @@ public void invoke(String value) throws Exception { env.execute("CoGroup Test"); - List expectedResult = Lists.newArrayList( + List expectedResult = Arrays.asList( "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)", "F:(b,3)(b,4)(b,5) S:(b,3)", "F:(a,6)(a,7)(a,8) S:", @@ -142,14 +145,13 @@ public void invoke(String value) throws Exception { @Test public void testJoin() throws Exception { - testResults = Lists.newArrayList(); + testResults = new ArrayList<>(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); DataStream> source1 = env.addSource(new SourceFunction>() { - private static final long serialVersionUID = 1L; @Override public void run(SourceContext> ctx) throws Exception { @@ -166,12 +168,11 @@ public void run(SourceContext> ctx) throws Excep } @Override - public void cancel() { - } - }).assignTimestamps(new Tuple3TimestampExtractor()); + public void cancel() {} + + }).assignTimestampsAndWatermarks(new 
Tuple3TimestampExtractor()); DataStream> source2 = env.addSource(new SourceFunction>() { - private static final long serialVersionUID = 1L; @Override public void run(SourceContext> ctx) throws Exception { @@ -186,9 +187,9 @@ public void run(SourceContext> ctx) throws Excep } @Override - public void cancel() { - } - }).assignTimestamps(new Tuple3TimestampExtractor()); + public void cancel() {} + + }).assignTimestampsAndWatermarks(new Tuple3TimestampExtractor()); source1.join(source2) @@ -210,7 +211,7 @@ public void invoke(String value) throws Exception { env.execute("Join Test"); - List expectedResult = Lists.newArrayList( + List expectedResult = Arrays.asList( "(a,x,0):(a,u,0)", "(a,x,0):(a,w,1)", "(a,y,1):(a,u,0)", @@ -237,7 +238,7 @@ public void invoke(String value) throws Exception { @Test public void testSelfJoin() throws Exception { - testResults = Lists.newArrayList(); + testResults = new ArrayList<>(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); @@ -263,7 +264,7 @@ public void run(SourceContext> ctx) throws Excep @Override public void cancel() { } - }).assignTimestamps(new Tuple3TimestampExtractor()); + }).assignTimestampsAndWatermarks(new Tuple3TimestampExtractor()); source1.join(source1) .where(new Tuple3KeyExtractor()) @@ -284,7 +285,7 @@ public void invoke(String value) throws Exception { env.execute("Self-Join Test"); - List expectedResult = Lists.newArrayList( + List expectedResult = Arrays.asList( "(a,x,0):(a,x,0)", "(a,x,0):(a,y,1)", "(a,x,0):(a,z,2)", @@ -314,46 +315,36 @@ public void invoke(String value) throws Exception { Assert.assertEquals(expectedResult, testResults); } - private static class Tuple2TimestampExtractor implements TimestampExtractor> { - private static final long serialVersionUID = 1L; - + private static class Tuple2TimestampExtractor implements AssignerWithPunctuatedWatermarks> { + @Override - public long extractTimestamp(Tuple2 element, long currentTimestamp) { + public long extractTimestamp(Tuple2 element, long previousTimestamp) { return element.f1; } @Override - public long extractWatermark(Tuple2 element, long currentTimestamp) { - return element.f1 - 1; - } - - @Override - public long getCurrentWatermark() { - return Long.MIN_VALUE; + public long checkAndGetNextWatermark(Tuple2 element, long extractedTimestamp) { + return extractedTimestamp - 1; } } - private static class Tuple3TimestampExtractor implements TimestampExtractor> { - private static final long serialVersionUID = 1L; + private static class Tuple3TimestampExtractor implements AssignerWithPeriodicWatermarks> { + private long currentTimestamp; + @Override - public long extractTimestamp(Tuple3 element, long currentTimestamp) { + public long extractTimestamp(Tuple3 element, long previousTimestamp) { + currentTimestamp = element.f2; return element.f2; } - @Override - public long extractWatermark(Tuple3 element, long currentTimestamp) { - return element.f2 - 1; - } - @Override public long getCurrentWatermark() { - return Long.MIN_VALUE; + return currentTimestamp - 1; } } private static class Tuple2KeyExtractor implements KeySelector, String> { - private static final long serialVersionUID = 1L; @Override public String getKey(Tuple2 value) throws Exception { @@ -362,7 +353,6 @@ public String getKey(Tuple2 value) throws Exception { } private static class Tuple3KeyExtractor implements KeySelector, String> { - private static final long serialVersionUID = 1L; @Override public String 
getKey(Tuple3 value) throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java index fb7142bbd090c..5cc75d5f2c035 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java @@ -1,37 +1,39 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.flink.streaming.runtime.operators.windowing; -import com.google.common.collect.Lists; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.TimestampExtractor; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; + import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -40,6 +42,7 @@ * Tests for Folds over windows. These also test whether OutputTypeConfigurable functions * work for windows, because FoldWindowFunction is OutputTypeConfigurable. 
*/ +@SuppressWarnings("serial") public class WindowFoldITCase extends StreamingMultipleProgramsTestBase { private static List testResults; @@ -47,7 +50,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase { @Test public void testFoldWindow() throws Exception { - testResults = Lists.newArrayList(); + testResults = new ArrayList<>(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); @@ -74,7 +77,7 @@ public void run(SourceContext> ctx) throws Exception { @Override public void cancel() { } - }).assignTimestamps(new Tuple2TimestampExtractor()); + }).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor()); source1 .keyBy(0) @@ -97,7 +100,7 @@ public void invoke(Tuple2 value) throws Exception { env.execute("Fold Window Test"); - List expectedResult = Lists.newArrayList( + List expectedResult = Arrays.asList( "(R:aaa,3)", "(R:aaa,21)", "(R:bbb,12)"); @@ -111,14 +114,13 @@ public void invoke(Tuple2 value) throws Exception { @Test public void testFoldAllWindow() throws Exception { - testResults = Lists.newArrayList(); + testResults = new ArrayList<>(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); DataStream> source1 = env.addSource(new SourceFunction>() { - private static final long serialVersionUID = 1L; @Override public void run(SourceContext> ctx) throws Exception { @@ -138,7 +140,7 @@ public void run(SourceContext> ctx) throws Exception { @Override public void cancel() { } - }).assignTimestamps(new Tuple2TimestampExtractor()); + }).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor()); source1 .windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) @@ -160,7 +162,7 @@ public void invoke(Tuple2 value) throws Exception { env.execute("Fold All-Window Test"); - List expectedResult = Lists.newArrayList( + List expectedResult = Arrays.asList( "(R:aaa,3)", "(R:bababa,24)"); @@ -170,22 +172,19 @@ public void invoke(Tuple2 value) throws Exception { Assert.assertEquals(expectedResult, testResults); } - private static class Tuple2TimestampExtractor implements TimestampExtractor> { - private static final long serialVersionUID = 1L; + private static class Tuple2TimestampExtractor implements AssignerWithPeriodicWatermarks> { + private long currentTimestamp = -1; + @Override - public long extractTimestamp(Tuple2 element, long currentTimestamp) { + public long extractTimestamp(Tuple2 element, long previousTimestamp) { + currentTimestamp = element.f1; return element.f1; } - @Override - public long extractWatermark(Tuple2 element, long currentTimestamp) { - return element.f1 - 1; - } - @Override public long getCurrentWatermark() { - return Long.MIN_VALUE; + return currentTimestamp - 1; } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java index eed455c463b51..020ef4eb3b16a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.timestamp; import org.apache.flink.api.common.functions.MapFunction; @@ -27,7 +28,8 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor; -import org.apache.flink.streaming.api.functions.TimestampExtractor; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -38,6 +40,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.NoOpSink; import org.apache.flink.test.util.ForkableFlinkMiniCluster; + import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -245,8 +248,8 @@ public void testTimestampExtractorWithAutoInterval() throws Exception { DataStream source1 = env.addSource(new SourceFunction() { @Override public void run(SourceContext ctx) throws Exception { - int index = 0; - while (index < NUM_ELEMENTS) { + int index = 1; + while (index <= NUM_ELEMENTS) { ctx.collect(index); latch.await(); index++; @@ -254,9 +257,7 @@ public void run(SourceContext ctx) throws Exception { } @Override - public void cancel() { - - } + public void cancel() {} }); DataStream extractOp = source1.assignTimestamps( @@ -280,7 +281,7 @@ public long extractAscendingTimestamp(Integer element, long currentTimestamp) { // verify that we get NUM_ELEMENTS watermarks for (int j = 0; j < NUM_ELEMENTS; j++) { - if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) { + if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) { Assert.fail("Wrong watermark."); } } @@ -307,8 +308,8 @@ public void testTimestampExtractorWithCustomWatermarkEmit() throws Exception { DataStream source1 = env.addSource(new SourceFunction() { @Override public void run(SourceContext ctx) throws Exception { - int index = 0; - while (index < NUM_ELEMENTS) { + int index = 1; + while (index <= NUM_ELEMENTS) { ctx.collect(index); latch.await(); index++; @@ -316,27 +317,22 @@ public void run(SourceContext ctx) throws Exception { } @Override - public void cancel() { - - } + public void cancel() {} }); - source1.assignTimestamps(new TimestampExtractor() { - @Override - public long extractTimestamp(Integer element, long currentTimestamp) { - return element; - } - - @Override - public long extractWatermark(Integer element, long currentTimestamp) { - return element - 1; - } + source1 + .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks() { + + @Override + public long extractTimestamp(Integer element, long currentTimestamp) { + return element; + 
} - @Override - public long getCurrentWatermark() { - return Long.MIN_VALUE; - } - }) + @Override + public long checkAndGetNextWatermark(Integer element, long extractedTimestamp) { + return extractedTimestamp - 1; + } + }) .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)) .transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator()); @@ -345,7 +341,7 @@ public long getCurrentWatermark() { // verify that we get NUM_ELEMENTS watermarks for (int j = 0; j < NUM_ELEMENTS; j++) { - if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) { + if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) { Assert.fail("Wrong watermark."); } } @@ -372,8 +368,8 @@ public void testTimestampExtractorWithDecreasingCustomWatermarkEmit() throws Exc DataStream source1 = env.addSource(new SourceFunction() { @Override public void run(SourceContext ctx) throws Exception { - int index = 0; - while (index < NUM_ELEMENTS) { + int index = 1; + while (index <= NUM_ELEMENTS) { ctx.collect(index); Thread.sleep(100); ctx.collect(index - 1); @@ -383,27 +379,22 @@ public void run(SourceContext ctx) throws Exception { } @Override - public void cancel() { - - } + public void cancel() {} }); - source1.assignTimestamps(new TimestampExtractor() { - @Override - public long extractTimestamp(Integer element, long currentTimestamp) { - return element; - } + source1 + .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks() { - @Override - public long extractWatermark(Integer element, long currentTimestamp) { - return element - 1; - } + @Override + public long extractTimestamp(Integer element, long previousTimestamp) { + return element; + } - @Override - public long getCurrentWatermark() { - return Long.MIN_VALUE; - } - }) + @Override + public long checkAndGetNextWatermark(Integer element, long extractedTimestamp) { + return extractedTimestamp - 1; + } + }) .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)) .transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator()); @@ -412,7 +403,7 @@ public long getCurrentWatermark() { // verify that we get NUM_ELEMENTS watermarks for (int j = 0; j < NUM_ELEMENTS; j++) { - if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) { + if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) { Assert.fail("Wrong watermark."); } } @@ -453,30 +444,84 @@ public void run(SourceContext ctx) throws Exception { } @Override - public void cancel() { - - } + public void cancel() {} }); - source1.assignTimestamps(new TimestampExtractor() { - @Override - public long extractTimestamp(Integer element, long currentTimestamp) { - return element; - } + source1 + .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks() { + + @Override + public long extractTimestamp(Integer element, long currentTimestamp) { + return element; + } + @Override + public long checkAndGetNextWatermark(Integer element, long extractedTimestamp) { + return -1L; + } + }) + .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)); + + + env.execute(); + + Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1); + Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE); + } + + /** + * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks. 
+ * + * Same test as before, but using a different timestamp extractor + */ + @Test + public void testTimestampExtractorWithLongMaxWatermarkFromSource2() throws Exception { + final int NUM_ELEMENTS = 10; + + StreamExecutionEnvironment env = StreamExecutionEnvironment + .createRemoteEnvironment("localhost", cluster.getLeaderRPCPort()); + + env.setParallelism(2); + env.getConfig().disableSysoutLogging(); + env.getConfig().enableTimestamps(); + env.getConfig().setAutoWatermarkInterval(10); + + DataStream source1 = env.addSource(new EventTimeSourceFunction() { @Override - public long extractWatermark(Integer element, long currentTimestamp) { - return Long.MIN_VALUE; + public void run(SourceContext ctx) throws Exception { + int index = 0; + while (index < NUM_ELEMENTS) { + ctx.collectWithTimestamp(index, index); + ctx.collectWithTimestamp(index - 1, index - 1); + index++; + ctx.emitWatermark(new Watermark(index-2)); + } + + // emit the final Long.MAX_VALUE watermark, do it twice and verify that + // we only see one in the result + ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); + ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); } @Override - public long getCurrentWatermark() { - return Long.MIN_VALUE; - } - }) - .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)); + public void cancel() {} + }); + + source1 + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks() { + @Override + public long extractTimestamp(Integer element, long currentTimestamp) { + return element; + } + @Override + public long getCurrentWatermark() { + return -1L; + } + }) + .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)); + env.execute(); Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1); @@ -625,8 +670,7 @@ public void processElement(StreamRecord element) throws Exception { } @Override - public void processWatermark(Watermark mark) throws Exception { - } + public void processWatermark(Watermark mark) throws Exception {} } public static class DisabledTimestampCheckingOperator extends AbstractStreamOperator implements OneInputStreamOperator { @@ -640,8 +684,7 @@ public void processElement(StreamRecord element) throws Exception { } @Override - public void processWatermark(Watermark mark) throws Exception { - } + public void processWatermark(Watermark mark) throws Exception {} } public static class IdentityCoMap implements CoMapFunction { @@ -682,9 +725,7 @@ public void run(SourceContext ctx) throws Exception { } @Override - public void cancel() { - - } + public void cancel() {} } public static class MyNonWatermarkingSource implements SourceFunction { @@ -703,9 +744,7 @@ public void run(SourceContext ctx) throws Exception { } @Override - public void cancel() { - - } + public void cancel() {} } // This is a event-time source. This should only emit elements with timestamps. The test should @@ -720,9 +759,7 @@ public void run(SourceContext ctx) throws Exception { } @Override - public void cancel() { - - } + public void cancel() {} } // This is a normal source. This should only emit elements without timestamps. The test should @@ -737,9 +774,7 @@ public void run(SourceContext ctx) throws Exception { } @Override - public void cancel() { - - } + public void cancel() {} } // This is a normal source. This should only emit elements without timestamps. 
This also @@ -755,8 +790,6 @@ public void run(SourceContext ctx) throws Exception { } @Override - public void cancel() { - - } + public void cancel() {} } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 1e2d3ece6335f..c484eae9f27e4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.mockito.invocation.InvocationOnMock; @@ -42,6 +43,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -67,15 +69,17 @@ public class OneInputStreamOperatorTestHarness { final Object checkpointLock; StreamTask mockTask; - - AbstractStateBackend stateBackend; public OneInputStreamOperatorTestHarness(OneInputStreamOperator operator) { + this(operator, new ExecutionConfig()); + } + + public OneInputStreamOperatorTestHarness(OneInputStreamOperator operator, ExecutionConfig executionConfig) { this.operator = operator; this.outputList = new ConcurrentLinkedQueue(); this.config = new StreamConfig(new Configuration()); - this.executionConfig = new ExecutionConfig(); + this.executionConfig = executionConfig; this.checkpointLock = new Object(); final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024); @@ -100,6 +104,35 @@ public AbstractStateBackend answer(InvocationOnMock invocationOnMock) throws Thr } catch (Exception e) { e.printStackTrace(); } + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + final long execTime = (Long) invocation.getArguments()[0]; + final Triggerable target = (Triggerable) invocation.getArguments()[1]; + + Thread caller = new Thread() { + @Override + public void run() { + final long delay = execTime - System.currentTimeMillis(); + if (delay > 0) { + try { + Thread.sleep(delay); + } catch (InterruptedException ignored) {} + } + + synchronized (checkpointLock) { + try { + target.trigger(execTime); + } catch (Exception ignored) {} + } + } + }; + caller.start(); + + return null; + } + }).when(mockTask).registerTimer(anyLong(), any(Triggerable.class)); } public void configureForKeyedStream(KeySelector keySelector, TypeInformation keyType) { diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index f66644ee19f01..e6caaaa4653cd 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -30,7 +30,7 @@ import 
org.apache.flink.core.fs.{FileSystem, Path} import org.apache.flink.streaming.api.collector.selector.OutputSelector import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _} import org.apache.flink.streaming.api.functions.sink.SinkFunction -import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor} +import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, AssignerWithPeriodicWatermarks, AscendingTimestampExtractor, TimestampExtractor} import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window} @@ -630,6 +630,7 @@ class DataStream[T](stream: JavaStream[T]) { def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W] = { new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](stream, assigner)) } + /** * Extracts a timestamp from an element and assigns it as the internal timestamp of that element. * The internal timestamps are, for example, used to to event-time window operations. @@ -641,21 +642,82 @@ class DataStream[T](stream: JavaStream[T]) { * * @see org.apache.flink.streaming.api.watermark.Watermark */ - @PublicEvolving + @deprecated def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] = { stream.assignTimestamps(clean(extractor)) } /** - * Extracts a timestamp from an element and assigns it as the internal timestamp of that element. - * The internal timestamps are, for example, used to to event-time window operations. + * Assigns timestamps to the elements in the data stream and periodically creates + * watermarks to signal event time progress. * - * If you know that the timestamps are strictly increasing you can use an - * [[org.apache.flink.streaming.api.functions.AscendingTimestampExtractor]]. Otherwise, - * you should provide a [[TimestampExtractor]] that also implements - * [[TimestampExtractor#getCurrentWatermark]] to keep track of watermarks. + * This method creates watermarks periodically (for example every second), based + * on the watermarks indicated by the given watermark generator. Even when no new elements + * in the stream arrive, the given watermark generator will be periodically checked for + * new watermarks. The interval in which watermarks are generated is defined in + * [[org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)]]. * - * @see org.apache.flink.streaming.api.watermark.Watermark + * Use this method for the common cases, where some characteristic over all elements + * should generate the watermarks, or where watermarks are simply trailing behind the + * wall clock time by a certain amount. + * + * For cases where watermarks should be created in an irregular fashion, for example + * based on certain markers that some element carry, use the + * [[AssignerWithPunctuatedWatermarks]]. + * + * @see AssignerWithPeriodicWatermarks + * @see AssignerWithPunctuatedWatermarks + * @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks) + */ + @PublicEvolving + def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]) + : DataStream[T] = { + + stream.assignTimestampsAndWatermarks(assigner) + } + + /** + * Assigns timestamps to the elements in the data stream and periodically creates + * watermarks to signal event time progress. 
+ * + * This method creates watermarks based purely on stream elements. For each element + * that is handled via [[AssignerWithPunctuatedWatermarks#extractTimestamp(Object, long)]], + * the [[AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark()]] method is called, + * and a new watermark is emitted, if the returned watermark value is larger than the previous + * watermark. + * + * This method is useful when the data stream embeds watermark elements, or certain elements + * carry a marker that can be used to determine the current event time watermark. + * This operation gives the programmer full control over the watermark generation. Users + * should be aware that too aggressive watermark generation (i.e., generating hundreds of + * watermarks every second) can cost some performance. + * + * For cases where watermarks should be created in a regular fashion, for example + * every x milliseconds, use the [[AssignerWithPeriodicWatermarks]]. + * + * @see AssignerWithPunctuatedWatermarks + * @see AssignerWithPeriodicWatermarks + * @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks) + */ + @PublicEvolving + def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]) + : DataStream[T] = { + + stream.assignTimestampsAndWatermarks(assigner) + } + + /** + * Assigns timestamps to the elements in the data stream and periodically creates + * watermarks to signal event time progress. + * + * This method is a shortcut for data streams where the element timestamp are known + * to be monotonously ascending within each parallel stream. + * In that case, the system can generate watermarks automatically and perfectly + * by tracking the ascending timestamps. + * + * For cases where the timestamps are not monotonously increasing, use the more + * general methods [[assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)]] + * and [[assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)]]. 
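+ *
+ * A minimal usage sketch (illustrative only, not part of this patch), assuming a stream of
+ * (String, Long) pairs whose second field is a monotonously ascending event timestamp:
+ *
+ * {{{
+ *   val withTimestamps = stream.assignAscendingTimestamps( _._2 )
+ * }}}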
*/ @PublicEvolving def assignAscendingTimestamps(extractor: T => Long): DataStream[T] = { diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala index 3c1e9c3aba2c3..5f10eacb534a6 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala @@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala import java.util.concurrent.TimeUnit import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.functions.TimestampExtractor +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows @@ -55,9 +55,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { ctx.collect(("a", 8)) } - def cancel() { - } - }).assignTimestamps(new CoGroupJoinITCase.Tuple2TimestampExtractor) + def cancel() {} + + }).assignTimestampsAndWatermarks(new CoGroupJoinITCase.Tuple2TimestampExtractor) val source2 = env.addSource(new SourceFunction[(String, Int)]() { def run(ctx: SourceFunction.SourceContext[(String, Int)]) { @@ -71,7 +71,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { def cancel() { } - }).assignTimestamps(new CoGroupJoinITCase.Tuple2TimestampExtractor) + }).assignTimestampsAndWatermarks(new CoGroupJoinITCase.Tuple2TimestampExtractor) source1.coGroup(source2) .where(_._1) @@ -119,9 +119,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { ctx.collect(("a", "k", 8)) } - def cancel() { - } - }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor) + def cancel() {} + + }).assignTimestampsAndWatermarks(new CoGroupJoinITCase.Tuple3TimestampExtractor) val source2 = env.addSource(new SourceFunction[(String, String, Int)]() { def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) { @@ -135,9 +135,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { ctx.collect(("a", "z", 8)) } - def cancel() { - } - }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor) + def cancel() {} + + }).assignTimestampsAndWatermarks(new CoGroupJoinITCase.Tuple3TimestampExtractor) source1.join(source2) .where(_._1) @@ -195,9 +195,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { ctx.collect(("a", "k", 8)) } - def cancel() { - } - }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor) + def cancel() {} + + }).assignTimestampsAndWatermarks(new CoGroupJoinITCase.Tuple3TimestampExtractor) source1.join(source1) .where(_._1) @@ -245,31 +245,25 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { object CoGroupJoinITCase { private var testResults: mutable.MutableList[String] = null - private class Tuple2TimestampExtractor extends TimestampExtractor[(String, Int)] { - def extractTimestamp(element: (String, Int), currentTimestamp: Long): Long = { + private class Tuple2TimestampExtractor extends AssignerWithPunctuatedWatermarks[(String, Int)] { + + override def extractTimestamp(element: (String, Int), previousTimestamp: Long): Long = { element._2 } - def extractWatermark(element: (String, Int), currentTimestamp: 
Long): Long = { - element._2 - 1 - } - - def getCurrentWatermark: Long = { - Long.MinValue - } + override def checkAndGetNextWatermark( + lastElement: (String, Int), + extractedTimestamp: Long): Long = extractedTimestamp - 1 } - private class Tuple3TimestampExtractor extends TimestampExtractor[(String, String, Int)] { - def extractTimestamp(element: (String, String, Int), currentTimestamp: Long): Long = { - element._3 - } + private class Tuple3TimestampExtractor extends + AssignerWithPunctuatedWatermarks[(String, String, Int)] { + + override def extractTimestamp(element: (String, String, Int), previousTimestamp: Long): Long + = element._3 - def extractWatermark(element: (String, String, Int), currentTimestamp: Long): Long = { - element._3 - 1 - } - - def getCurrentWatermark: Long = { - Long.MinValue - } + override def checkAndGetNextWatermark( + lastElement: (String, String, Int), + extractedTimestamp: Long): Long = extractedTimestamp - 1 } } diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala index d4e8bb256df01..f358ac683b749 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala @@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala import java.util.concurrent.TimeUnit import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.functions.TimestampExtractor +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows @@ -61,7 +61,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase { def cancel() { } - }).assignTimestamps(new WindowFoldITCase.Tuple2TimestampExtractor) + }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor) source1 .keyBy(0) @@ -106,7 +106,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase { def cancel() { } - }).assignTimestamps(new WindowFoldITCase.Tuple2TimestampExtractor) + }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor) source1 .windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) @@ -132,17 +132,17 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase { object WindowFoldITCase { private var testResults: mutable.MutableList[String] = null - private class Tuple2TimestampExtractor extends TimestampExtractor[(String, Int)] { - def extractTimestamp(element: (String, Int), currentTimestamp: Long): Long = { - element._2 + private class Tuple2TimestampExtractor extends AssignerWithPeriodicWatermarks[(String, Int)] { + + private var currentTimestamp = -1L + + override def extractTimestamp(element: (String, Int), previousTimestamp: Long): Long = { + currentTimestamp = element._2 + currentTimestamp } - def extractWatermark(element: (String, Int), currentTimestamp: Long): Long = { - element._2 - 1 - } - - def getCurrentWatermark: Long = { - Long.MinValue + override def getCurrentWatermark(): Long = { + currentTimestamp - 1 } } } From 6dcdc3e3a82024c066d73b2d25b27a723bbccdd3 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Mon, 15 Feb 2016 18:41:15 +0100 Subject: [PATCH 2/2] 
[FLINK-3401] [streaming] [api breaking] AscendingTimestampExtractor only logs violations of ascending timestamp order. The user can also explicitly set an 'IgnoringHandler' or a 'FailingHandler', which do nothing on violations, respectively fail hard. --- .../streaming/examples/join/WindowJoin.java | 4 +- .../examples/windowing/TopSpeedWindowing.java | 2 +- .../AscendingTimestampExtractor.java | 124 ++++++++++++++---- .../api/complex/ComplexIntegrationTest.java | 2 +- .../AscendingTimestampExtractorTest.java | 100 ++++++++++++++ .../streaming/timestamp/TimestampITCase.java | 2 +- .../streaming/api/scala/DataStream.scala | 2 +- 7 files changed, 208 insertions(+), 28 deletions(-) create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index 2afccc8b9b704..7c64482c14ce9 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -74,8 +74,8 @@ public static void main(String[] args) throws Exception { DataStream> salaries = input.f1; // extract the timestamps - grades = grades.assignTimestamps(new MyTimestampExtractor()); - salaries = salaries.assignTimestamps(new MyTimestampExtractor()); + grades = grades.assignTimestampsAndWatermarks(new MyTimestampExtractor()); + salaries = salaries.assignTimestampsAndWatermarks(new MyTimestampExtractor()); // apply a temporal join over the two stream based on the names over one // second windows diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java index b0d1462ec1895..9104416df0fd5 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java @@ -71,7 +71,7 @@ public static void main(String[] args) throws Exception { int evictionSec = 10; double triggerMeters = 50; DataStream> topSpeeds = carData - .assignTimestamps(new CarTimestamp()) + .assignTimestampsAndWatermarks(new CarTimestamp()) .keyBy(0) .window(GlobalWindows.create()) .evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS))) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java index 97f0095a1527c..60216b6c310ee 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,48 +15,128 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions; import org.apache.flink.annotation.PublicEvolving; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + /** - * Interface for user functions that extract timestamps from elements. The extracting timestamps - * must be monotonically increasing. + * A timestamp assigner and watermark generator for streams where timestamps are monotonously + * ascending. In this case, the local watermarks for the streams are easy to generate, because + * they strictly follow the timestamps. * * @param The type of the elements that this function can extract timestamps from */ @PublicEvolving -public abstract class AscendingTimestampExtractor implements TimestampExtractor { - - long currentTimestamp = 0; +public abstract class AscendingTimestampExtractor implements AssignerWithPeriodicWatermarks { + + private static final long serialVersionUID = 1L; + + /** The current timestamp */ + private long currentTimestamp = 0; + /** Handler that is called when timestamp monotony is violated */ + private MonotonyViolationHandler violationHandler = new LoggingHandler(); + /** - * Extracts a timestamp from an element. The timestamp must be monotonically increasing. + * Extracts the timestamp from the given element. The timestamp must be monotonically increasing. * * @param element The element that the timestamp is extracted from. - * @param currentTimestamp The current internal timestamp of the element. + * @param previousElementTimestamp The current internal timestamp of the element. + * * @return The new timestamp. */ - public abstract long extractAscendingTimestamp(T element, long currentTimestamp); + public abstract long extractAscendingTimestamp(T element, long previousElementTimestamp); - @Override - public final long extractTimestamp(T element, long currentTimestamp) { - long newTimestamp = extractAscendingTimestamp(element, currentTimestamp); - if (newTimestamp < this.currentTimestamp) { - throw new RuntimeException("Timestamp is lower than previously extracted timestamp. " + - "You should implement a custom TimestampExtractor."); - } - this.currentTimestamp = newTimestamp; - return this.currentTimestamp; + /** + * Sets the handler for violations to the ascending timestamp order. + * + * @param handler The violation handler to use. + * @return This extractor. 
+ */ + public AscendingTimestampExtractor withViolationHandler(MonotonyViolationHandler handler) { + this.violationHandler = requireNonNull(handler); + return this; } - + + // ------------------------------------------------------------------------ + @Override - public final long extractWatermark(T element, long currentTimestamp) { - return Long.MIN_VALUE; + public final long extractTimestamp(T element, long elementPrevTimestamp) { + final long newTimestamp = extractAscendingTimestamp(element, elementPrevTimestamp); + if (newTimestamp >= this.currentTimestamp) { + this.currentTimestamp = newTimestamp; + return newTimestamp; + } else { + violationHandler.handleViolation(newTimestamp, this.currentTimestamp); + return newTimestamp; + } } @Override public final long getCurrentWatermark() { return currentTimestamp - 1; } + + // ------------------------------------------------------------------------ + // Handling violations of monotonous timestamps + // ------------------------------------------------------------------------ + + /** + * Interface for handlers that handle violations of the monotonous ascending timestamps + * property. + */ + public interface MonotonyViolationHandler extends java.io.Serializable { + + /** + * Called when the property of monotonously ascending timestamps is violated, i.e., + * when {@code elementTimestamp < lastTimestamp}. + * + * @param elementTimestamp The timestamp of the current element. + * @param lastTimestamp The last timestamp. + */ + void handleViolation(long elementTimestamp, long lastTimestamp); + } + + /** + * Handler that does nothing when timestamp monotony is violated. + */ + public static final class IgnoringHandler implements MonotonyViolationHandler { + private static final long serialVersionUID = 1L; + + @Override + public void handleViolation(long elementTimestamp, long lastTimestamp) {} + } + + /** + * Handler that fails the program when timestamp monotony is violated. + */ + public static final class FailingHandler implements MonotonyViolationHandler { + private static final long serialVersionUID = 1L; + + @Override + public void handleViolation(long elementTimestamp, long lastTimestamp) { + throw new RuntimeException("Ascending timestamps condition violated. Element timestamp " + + elementTimestamp + " is smaller than last timestamp " + lastTimestamp); + } + } + + /** + * Handler that only logs violations of timestamp monotony, on WARN log level. 
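+ * This is the handler installed by default; a different handler can be set via
+ * {@link AscendingTimestampExtractor#withViolationHandler(MonotonyViolationHandler)}.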
+ */ + public static final class LoggingHandler implements MonotonyViolationHandler { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(AscendingTimestampExtractor.class); + + @Override + public void handleViolation(long elementTimestamp, long lastTimestamp) { + LOG.warn("Timestamp monotony violated: {} < {}", elementTimestamp, lastTimestamp); + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java index 8c29a156f4a9c..41df4e6753cb6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java @@ -205,7 +205,7 @@ public void complexIntegrationTest2() throws Exception { DataStream sourceStream22 = env.addSource(new PojoSource()); sourceStream21 - .assignTimestamps(new MyTimestampExtractor()) + .assignTimestampsAndWatermarks(new MyTimestampExtractor()) .keyBy(2, 2) .timeWindow(Time.of(10, TimeUnit.MILLISECONDS), Time.of(4, TimeUnit.MILLISECONDS)) .maxBy(3) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java new file mode 100644 index 0000000000000..4f1eeb98d0e99 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class AscendingTimestampExtractorTest { + + @Test + public void testWithFailingHandler() { + AscendingTimestampExtractor extractor = new LongExtractor() + .withViolationHandler(new AscendingTimestampExtractor.FailingHandler()); + + runValidTests(extractor); + try { + runInvalidTest(extractor); + fail("should fail with an exception"); + } catch (Exception ignored) {} + } + + @Test + public void testWithIgnoringHandler() { + AscendingTimestampExtractor extractor = new LongExtractor() + .withViolationHandler(new AscendingTimestampExtractor.IgnoringHandler()); + + runValidTests(extractor); + runInvalidTest(extractor); + } + + @Test + public void testWithLoggingHandler() { + AscendingTimestampExtractor extractor = new LongExtractor() + .withViolationHandler(new AscendingTimestampExtractor.LoggingHandler()); + + runValidTests(extractor); + runInvalidTest(extractor); + } + + @Test + public void testWithDefaultHandler() { + AscendingTimestampExtractor extractor = new LongExtractor(); + + runValidTests(extractor); + runInvalidTest(extractor); + } + + // ------------------------------------------------------------------------ + + private void runValidTests(AscendingTimestampExtractor extractor) { + assertEquals(13L, extractor.extractTimestamp(13L, -1L)); + assertEquals(13L, extractor.extractTimestamp(13L, 0L)); + assertEquals(14L, extractor.extractTimestamp(14L, 0L)); + assertEquals(20L, extractor.extractTimestamp(20L, 0L)); + assertEquals(20L, extractor.extractTimestamp(20L, 0L)); + assertEquals(20L, extractor.extractTimestamp(20L, 0L)); + assertEquals(500L, extractor.extractTimestamp(500L, 0L)); + + assertEquals(Long.MAX_VALUE - 1, extractor.extractTimestamp(Long.MAX_VALUE - 1, 99999L)); + + + } + + private void runInvalidTest(AscendingTimestampExtractor extractor) { + assertEquals(1000L, extractor.extractTimestamp(1000L, 100)); + assertEquals(1000L, extractor.extractTimestamp(1000L, 100)); + + // violation + assertEquals(999L, extractor.extractTimestamp(999L, 100)); + } + + // ------------------------------------------------------------------------ + + private static class LongExtractor extends AscendingTimestampExtractor { + private static final long serialVersionUID = 1L; + + @Override + public long extractAscendingTimestamp(Long element, long currentTimestamp) { + return element; + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java index 020ef4eb3b16a..b9674d6ef8345 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java @@ -260,7 +260,7 @@ public void run(SourceContext ctx) throws Exception { public void cancel() {} }); - DataStream extractOp = source1.assignTimestamps( + DataStream extractOp = source1.assignTimestampsAndWatermarks( new AscendingTimestampExtractor() { @Override public long extractAscendingTimestamp(Integer element, long currentTimestamp) { diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index e6caaaa4653cd..64c6d082db0b6 100644 --- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -727,7 +727,7 @@ class DataStream[T](stream: JavaStream[T]) { cleanExtractor(element) } } - stream.assignTimestamps(extractorFunction) + stream.assignTimestampsAndWatermarks(extractorFunction) } /**
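
A minimal usage sketch of the reworked extractor (illustrative only, not part of the patch): it wires an
AscendingTimestampExtractor into assignTimestampsAndWatermarks and swaps the default logging behavior for the
IgnoringHandler. The MyEvent type, its getEventTime() accessor, and the `events` stream are assumptions made
up for this example.

    // Assumed imports:
    //   org.apache.flink.streaming.api.datastream.DataStream
    //   org.apache.flink.streaming.api.functions.AscendingTimestampExtractor
    DataStream<MyEvent> withTimestamps = events
        .assignTimestampsAndWatermarks(
            new AscendingTimestampExtractor<MyEvent>() {
                private static final long serialVersionUID = 1L;

                @Override
                public long extractAscendingTimestamp(MyEvent element, long previousElementTimestamp) {
                    // expected to be monotonously ascending within each parallel source instance
                    return element.getEventTime();
                }
            }
            // silently accept out-of-order timestamps instead of the default WARN logging
            .withViolationHandler(new AscendingTimestampExtractor.IgnoringHandler()));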