Skip to content

Commit

Permalink
[FLINK-3435] [streaming] Properly separate IngestionTime and EventTime
Browse files Browse the repository at this point in the history
  - IngestionTime now only auto-generates watermarks
  - EventTime does not auto-generate ingestion watermarks

This also removes the "ExecutionConfig.areTimestampsEnabled()" flag.

This closes #1699
  • Loading branch information
StephanEwen authored and rmetzger committed Feb 24, 2016
1 parent 004eb00 commit ceb6424
Show file tree
Hide file tree
Showing 72 changed files with 1,287 additions and 813 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public void dispose() {

@Override
public void processElement(final StreamRecord<IN> element) throws Exception {
this.flinkCollector.setTimestamp(element.getTimestamp());
this.flinkCollector.setTimestamp(element);

IN value = element.getValue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public MergedInputsBoltWrapper(final IRichBolt bolt, final String name, final Co

@Override
public void processElement(final StreamRecord<StormTuple<IN>> element) throws Exception {
this.flinkCollector.setTimestamp(element.getTimestamp());
this.flinkCollector.setTimestamp(element);
this.bolt.execute(element.getValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
import static org.mockito.Mockito.when;

@RunWith(PowerMockRunner.class)
@PrepareForTest({StreamRecordSerializer.class, WrapperSetupHelper.class})
@PrepareForTest({StreamRecordSerializer.class, WrapperSetupHelper.class, StreamRecord.class})
public class BoltWrapperTest extends AbstractTest {

@Test(expected = IllegalArgumentException.class)
Expand Down Expand Up @@ -197,7 +197,7 @@ public void testMultipleOutputStreams() throws Exception {
splitRecord.value = new Tuple1<Integer>(2);
}
wrapper.processElement(record);
verify(output).collect(new StreamRecord<SplitStreamType>(splitRecord, 0));
verify(output).collect(new StreamRecord<SplitStreamType>(splitRecord));

if (rawOutType2) {
splitRecord.streamId = "stream2";
Expand All @@ -207,7 +207,7 @@ public void testMultipleOutputStreams() throws Exception {
splitRecord.value = new Tuple1<Integer>(3);
}
wrapper.processElement(record);
verify(output, times(2)).collect(new StreamRecord<SplitStreamType>(splitRecord, 0));
verify(output, times(2)).collect(new StreamRecord<SplitStreamType>(splitRecord));
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,49 +168,10 @@ public boolean isClosureCleanerEnabled() {
*/
@PublicEvolving
public ExecutionConfig setAutoWatermarkInterval(long interval) {
enableTimestamps();
this.autoWatermarkInterval = interval;
return this;
}

/**
* Enables streaming timestamps. When this is enabled all records that are emitted
* from a source have a timestamp attached. This is required if a topology contains
* operations that rely on watermarks and timestamps to perform operations, such as
* event-time windows.
*
* <p>
* This is automatically enabled if you enable automatic watermarks.
*
* @see #setAutoWatermarkInterval(long)
*/
@PublicEvolving
public ExecutionConfig enableTimestamps() {
this.timestampsEnabled = true;
return this;
}

/**
* Disables streaming timestamps.
*
* @see #enableTimestamps()
*/
@PublicEvolving
public ExecutionConfig disableTimestamps() {
this.timestampsEnabled = false;
return this;
}

/**
* Returns true when timestamps are enabled.
*
* @see #enableTimestamps()
*/
@PublicEvolving
public boolean areTimestampsEnabled() {
return timestampsEnabled;
}

/**
* Returns the interval of the automatic watermark emission.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public Tuple3<String, Integer, Integer> join(Tuple3<Long, String, Integer> first
private static class MyTimestampExtractor extends AscendingTimestampExtractor<Tuple3<Long, String, Integer>> {

@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element, long currentTimestamp) {
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
return element.f0;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
Expand Down Expand Up @@ -158,8 +159,8 @@ public long extractTimestamp(Integer element, long previousElementTimestamp) {
}

@Override
public long checkAndGetNextWatermark(Integer lastElement, long extractedTimestamp) {
return counter - 1;
public Watermark checkAndGetNextWatermark(Integer lastElement, long extractedTimestamp) {
return new Watermark(counter - 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
Expand Down Expand Up @@ -63,7 +63,7 @@ public static void main(String[] args) throws Exception {
input.add(new Tuple3<>("c", 11L, 1));

DataStream<Tuple3<String, Long, Integer>> source = env
.addSource(new EventTimeSourceFunction<Tuple3<String,Long,Integer>>() {
.addSource(new SourceFunction<Tuple3<String,Long,Integer>>() {
private static final long serialVersionUID = 1L;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ private static class CarTimestamp extends AscendingTimestampExtractor<Tuple4<Int
private static final long serialVersionUID = 1L;

@Override
public long extractAscendingTimestamp(Tuple4<Integer, Integer, Double, Long> element, long previous) {
public long extractAscendingTimestamp(Tuple4<Integer, Integer, Double, Long> element) {
return element.f3;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ object WindowJoin {
println("Usage: WindowJoin --grades <path> --salaries <path> --output <path>")

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env.getConfig.setGlobalJobParameters(params)

// Create streams for grades and salaries by mapping the inputs to the corresponding objects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;

import org.junit.After;
Expand Down Expand Up @@ -222,9 +222,8 @@ public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp
}

@Override
public long checkAndGetNextWatermark(Tuple2<Event, Long> lastElement,
long extractedTimestamp) {
return lastElement.f1 - 5;
public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
return new Watermark(lastElement.f1 - 5);
}

}).map(new MapFunction<Tuple2<Event, Long>, Event>() {
Expand Down Expand Up @@ -308,9 +307,8 @@ public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp)
}

@Override
public long checkAndGetNextWatermark(Tuple2<Event, Long> lastElement,
long extractedTimestamp) {
return lastElement.f1 - 5;
public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
return new Watermark(lastElement.f1 - 5);
}

}).map(new MapFunction<Tuple2<Event, Long>, Event>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,7 @@ public void run(SourceContext<T> sourceContext) throws Exception {
else {
// this source never completes, so emit a Long.MAX_VALUE watermark
// to not block watermark forwarding
if (getRuntimeContext().getExecutionConfig().areTimestampsEnabled()) {
sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
}
sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));

final Object waitLock = new Object();
while (running) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,7 @@ public void run(SourceContext<T> sourceContext) throws Exception {
} else {
// this source never completes, so emit a Long.MAX_VALUE watermark
// to not block watermark forwarding
if (getRuntimeContext().getExecutionConfig().areTimestampsEnabled()) {
sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
}
sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));

final Object waitLock = new Object();
this.waitThread = Thread.currentThread();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public enum TimeCharacteristic {
* Event time means that the time of each individual element in the stream (also called event)
* is determined by the event's individual custom timestamp. These timestamps either exist in the
* elements from before they entered the Flink streaming dataflow, or are user-assigned at the sources.
* The big implication of this is that elements arrive in the sources and in all operators generally
* out of order, meaning that elements with earlier timestamps may arrive after elements with
* later timestamps.
* The big implication of this is that it allows for elements to arrive in the sources and in
* all operators out of order, meaning that elements with earlier timestamps may arrive after
* elements with later timestamps.
* <p>
* Operators that window or order data with respect to event time must buffer data until they can
* be sure that all timestamps for a certain time interval have been received. This is handled by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,17 +768,17 @@ public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? sup
return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
.setParallelism(inputParallelism);
}

/**
* Assigns timestamps to the elements in the data stream and creates watermarks to
* signal event time progress based on the elements themselves.
*
* <p>This method creates watermarks based purely on stream elements. For each element
* that is handled via {@link AssignerWithPunctuatedWatermarks#extractTimestamp(Object, long)},
* the {@link AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark(Object, long)}
* the {@link AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark(Object, long)}
* method is called, and a new watermark is emitted, if the returned watermark is
* non-null and greater than the previous watermark.
*
*
* <p>This method is useful when the data stream embeds watermark elements, or certain elements
* carry a marker that can be used to determine the current event time watermark.
* This operation gives the programmer full control over the watermark generation. Users
Expand All @@ -794,7 +794,7 @@ public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? sup
*
* @see AssignerWithPunctuatedWatermarks
* @see AssignerWithPeriodicWatermarks
* @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
* @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
*/
public SingleOutputStreamOperator<T, ?> assignTimestampsAndWatermarks(
AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,10 +596,8 @@ public void registerType(Class<?> type) {
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = requireNonNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
getConfig().disableTimestamps();
getConfig().setAutoWatermarkInterval(0);
} else {
getConfig().enableTimestamps();
getConfig().setAutoWatermarkInterval(200);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.PublicEvolving;

import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,21 +38,20 @@ public abstract class AscendingTimestampExtractor<T> implements AssignerWithPeri

private static final long serialVersionUID = 1L;

/** The current timestamp */
private long currentTimestamp = 0;
/** The current timestamp. */
private long currentTimestamp = Long.MIN_VALUE;

/** Handler that is called when timestamp monotony is violated */
private MonotonyViolationHandler violationHandler = new LoggingHandler();


/**
* Extracts the timestamp from the given element. The timestamp must be monotonically increasing.
*
* @param element The element that the timestamp is extracted from.
* @param previousElementTimestamp The current internal timestamp of the element.
*
* @return The new timestamp.
*/
public abstract long extractAscendingTimestamp(T element, long previousElementTimestamp);
public abstract long extractAscendingTimestamp(T element);

/**
* Sets the handler for violations to the ascending timestamp order.
Expand All @@ -68,7 +68,7 @@ public AscendingTimestampExtractor<T> withViolationHandler(MonotonyViolationHand

@Override
public final long extractTimestamp(T element, long elementPrevTimestamp) {
final long newTimestamp = extractAscendingTimestamp(element, elementPrevTimestamp);
final long newTimestamp = extractAscendingTimestamp(element);
if (newTimestamp >= this.currentTimestamp) {
this.currentTimestamp = newTimestamp;
return newTimestamp;
Expand All @@ -79,8 +79,8 @@ public final long extractTimestamp(T element, long elementPrevTimestamp) {
}

@Override
public final long getCurrentWatermark() {
return currentTimestamp - 1;
public final Watermark getCurrentWatermark() {
return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
* The {@code AssignerWithPeriodicWatermarks} assigns event time timestamps to elements,
Expand Down Expand Up @@ -50,11 +51,13 @@ public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T>

/**
* Returns the current watermark. This method is periodically called by the
* system to retrieve the current watermark.
* system to retrieve the current watermark. The method may return null to
* indicate that no new Watermark is available.
*
* <p>The current watermark will be emitted only if it is larger than the previously
* emitted watermark. If the current watermark is still identical to the previous
* one, no progress in event time has happened since the previous call to this method.
* <p>The returned watermark will be emitted only if it is non-null and larger
* than the previously emitted watermark. If the current watermark is still
* identical to the previous one, no progress in event time has happened since
* the previous call to this method.
*
* <p>If this method returns a value that is smaller than the previously returned watermark,
* then the implementation does not properly handle the event stream timestamps.
Expand All @@ -67,5 +70,5 @@ public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T>
* @see org.apache.flink.streaming.api.watermark.Watermark
* @see ExecutionConfig#getAutoWatermarkInterval()
*/
long getCurrentWatermark();
Watermark getCurrentWatermark();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.streaming.api.functions;

import org.apache.flink.streaming.api.watermark.Watermark;

/**
* The {@code AssignerWithPunctuatedWatermarks} assigns event time timestamps to elements,
* and generates low watermarks that signal event time progress within the stream.
Expand All @@ -40,8 +42,8 @@
* return element.getSequenceTimestamp();
* }
*
* public long checkAndGetNextWatermark(MyElement lastElement, long extractedTimestamp) {
* return lastElement.isEndOfSequence() ? extractedTimestamp : -1L;
* public Watermark checkAndGetNextWatermark(MyElement lastElement, long extractedTimestamp) {
* return lastElement.isEndOfSequence() ? new Watermark(extractedTimestamp) : null;
* }
* }
* }</pre>
Expand Down Expand Up @@ -70,8 +72,7 @@ public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T
* <p>For an example how to use this method, see the documentation of
* {@link AssignerWithPunctuatedWatermarks this class}.
*
* @return A negative value, if no watermark should be emitted, positive value for
* emitting this value as a watermark.
* @return {@code null}, if no watermark should be emitted, or the next watermark to emit.
*/
long checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
Loading

0 comments on commit ceb6424

Please sign in to comment.