[FLINK-1967] Introduce (Event)time in Streaming
This introduces an additional timestamp field in StreamRecord. When a
SourceFunction is used and an auto-timestamp interval is set using the
ExecutionConfig, the timestamp is automatically set to
System.currentTimeMillis() upon element emission. The timestamp can
also be set manually by using an EventTimeSourceFunction.

This also changes the signature of the StreamOperators: they now get
a StreamRecord instead of the unwrapped value, which is necessary for
them to access the timestamp. Before, the StreamTask would unwrap the
value from the StreamRecord; this unwrapping now happens one layer
higher.
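
For illustration, an operator against the new interface could look
like the following minimal sketch (the class and its logic are made up
for this example; a StreamRecord constructor taking a value and a
timestamp is an assumption, while the interface and accessor names are
the ones used in this commit):

    // Hypothetical operator sketch: processElement() receives the
    // StreamRecord wrapper, so both the value and its timestamp are
    // accessible. Assumes StreamRecord has a (value, timestamp)
    // constructor; the actual API may differ.
    public class UpperCaseOperator extends AbstractStreamOperator<String>
            implements OneInputStreamOperator<String, String> {

        @Override
        public void processElement(StreamRecord<String> element) throws Exception {
            // Emit the transformed value with the input element's timestamp.
            output.collect(new StreamRecord<String>(
                    element.getValue().toUpperCase(), element.getTimestamp()));
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            // Like all operators in this commit, simply forward watermarks.
            output.emitWatermark(mark);
        }
    }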

This also introduces watermarks to keep track of the progress of event
time. At a configurable interval the sources will emit watermarks that
signify that no records with a lower timestamp will be emitted in the
future by this source. The watermarks are broadcast; stream inputs
wait for watermark events on all input channels and forward the
watermark to the StreamOperator once the watermark advances on all
inputs. Operators are responsible for forwarding the watermark once
they know that no elements with a previous timestamp will be emitted
(this is mostly relevant for buffering operations such as windows).
Right now, all operators simply forward the watermark they receive.
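
Schematically, the per-input tracking amounts to a minimum over all
channels (an illustrative sketch only, not the actual reader code;
every name below is made up):

    import java.util.Arrays;

    // Illustrative sketch of watermark alignment across input channels.
    class WatermarkAlignerSketch {
        private final long[] channelWatermarks; // last watermark seen per channel
        private long lastForwarded = Long.MIN_VALUE;

        WatermarkAlignerSketch(int numChannels) {
            channelWatermarks = new long[numChannels];
            Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        }

        /** Returns the watermark to forward to the operator, or null if it did not advance. */
        Long onWatermark(int channel, long timestamp) {
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
            long min = Long.MAX_VALUE;
            for (long wm : channelWatermarks) {
                min = Math.min(min, wm);
            }
            if (min > lastForwarded) {
                lastForwarded = min; // the watermark advanced on all inputs
                return min;
            }
            return null;
        }
    }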

When using an EventTimeSourceFunction the system does not
automatically emit timestamps; the user is required to assign
timestamps and emit watermarks manually using the SourceContext.
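
Such a source could look like the following sketch
(collectWithTimestamp() and emitWatermark() are the SourceContext
methods added in this commit; the run()/cancel() signatures and the
Watermark(long) constructor are assumptions of this example):

    // Hypothetical event-time source: assigns timestamps itself and
    // emits a watermark after each element, promising that no element
    // with a lower timestamp will follow.
    public class CountingSource implements EventTimeSourceFunction<Long> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long ts = 0;
            while (running && ts < 1000) {
                ctx.collectWithTimestamp(ts, ts);     // element plus its event timestamp
                ctx.emitWatermark(new Watermark(ts)); // nothing below ts will follow
                ts++;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }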

No watermarks will be emitted unless
ExecutionConfig.setAutoWatermarkInterval is used to set a watermark
interval.

By default, timestamps and watermarks are completely disabled; there
is a switch in ExecutionConfig (enableTimestamps()) to enable them.
This means that, out of the box, the performance is not changed by
this new feature if it is not used.
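
For example (a usage sketch; the two methods are the ones added to
ExecutionConfig in this commit, while env being a
StreamExecutionEnvironment with getConfig() is an assumption of this
example):

    // Opt in to timestamps and automatic watermarks; with neither
    // call, the new code paths stay dormant.
    env.getConfig().enableTimestamps();            // records get a timestamp on emission
    env.getConfig().setAutoWatermarkInterval(200); // sources emit a watermark every 200 ms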

This commit also contains fixes for other issues that were discovered
while implementing the feature. See the Jira issue numbers and
descriptions below.

[FLINK-2290/2295] Fix CoReader Event Handling and Checkpointing

This changes the CoReader (now CoStreamingRecordReader) to reuse the
UnionGate for input multiplexing. This way it does not lock onto one
specific input side and reads events from both input sides.

This also enables an event listener for checkpoint barriers so that
the TwoInputStreamTask now reacts to them and correctly forwards them.

Then, this adds CoStreamCheckpointingITCase to verify that
checkpointing works in topologies with TwoInputStreamTasks.

This also adds tests for OneInputStreamTask and TwoInputStreamTask
that check whether:
 - open()/close() of RichFunctions are correctly called
 - watermarks are correctly forwarded
 - supersteps/checkpoint barriers are correctly forwarded and the
   blocking of inputs works correctly

Add proper tests for Stream Operators

These tests check whether:
 - open()/close() on RichFunctions are called
 - Timestamps of emitted elements match the timestamp of the input
   element
 - Watermarks are correctly forwarded

[FLINK-2301] Fix Behaviour of BarrierBuffer and add Tests

Before, a CheckpointBarrier from a more recent checkpoint would also
trigger unblocking while the buffer was still waiting on an older
CheckpointBarrier. Now, a CheckpointBarrier from a newer checkpoint
unblocks all channels and starts a new wait on the new checkpoint.
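
Schematically, the corrected logic can be sketched as follows (an
illustrative sketch, not the actual BarrierBuffer code; every name
below is made up):

    import java.util.HashSet;
    import java.util.Set;

    // Sketch of barrier alignment: a barrier from a newer checkpoint
    // aborts the current alignment instead of completing it.
    class BarrierAlignmentSketch {
        private final int totalChannels;
        private final Set<Integer> blockedChannels = new HashSet<Integer>();
        private long currentCheckpointId = -1;

        BarrierAlignmentSketch(int totalChannels) {
            this.totalChannels = totalChannels;
        }

        void onBarrier(long checkpointId, int channel) {
            if (checkpointId > currentCheckpointId && !blockedChannels.isEmpty()) {
                // Newer checkpoint while still waiting on an older one:
                // unblock all channels and realign on the new checkpoint.
                // (Before the fix, this wrongly completed the old wait.)
                blockedChannels.clear();
            }
            currentCheckpointId = Math.max(currentCheckpointId, checkpointId);
            blockedChannels.add(channel); // input on this channel is now buffered
            if (blockedChannels.size() == totalChannels) {
                // Barrier received on every channel: alignment complete.
                System.out.println("checkpoint " + currentCheckpointId + " aligned");
                blockedChannels.clear(); // release all channels again
            }
        }
    }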

The tests for OneInputStreamTask and TwoInputStreamTask check whether
the buffer behaves correctly when receiving CheckpointBarriers from more
recent checkpoints while still waiting on an older CheckpointBarrier.
aljoscha committed Jul 20, 2015
1 parent 2d191ab commit a2eb6cc
Showing 150 changed files with 6,913 additions and 2,997 deletions.
@@ -23,6 +23,7 @@
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.util.Collector;

 import java.util.Collection;
 import java.util.List;
@@ -34,8 +35,8 @@
  */
 class StormBoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector {

-    /** The Flink output object */
-    private final Output<OUT> flinkOutput;
+    /** The Flink output Collector */
+    private final Collector<OUT> flinkOutput;

     /**
      * Instantiates a new {@link StormBoltCollector} that emits Flink tuples to the given Flink
@@ -50,7 +51,7 @@ class StormBoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector {
      * @throws UnsupportedOperationException
      *             if the specified number of attributes is not in the valid range of [0,25]
      */
-    public StormBoltCollector(final int numberOfAttributes, final Output<OUT> flinkOutput) throws UnsupportedOperationException {
+    public StormBoltCollector(final int numberOfAttributes, final Collector<OUT> flinkOutput) throws UnsupportedOperationException {
         super(numberOfAttributes);
         assert (flinkOutput != null);
         this.flinkOutput = flinkOutput;
@@ -26,6 +26,9 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
@@ -51,6 +54,12 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
     /** Number of attributes of the bolt's output tuples */
     private final int numberOfAttributes;

+    /**
+     * We have to use this because Operators must output
+     * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
+     */
+    private TimestampedCollector<OUT> flinkCollector;
+
     /**
      * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt}
      * such that it can be used within a Flink streaming program. The output type will be one of
@@ -93,11 +102,12 @@ public void open(final Configuration parameters) throws Exception {

         final TopologyContext topologyContext = StormWrapperSetupHelper.convertToTopologyContext(
                 (StreamingRuntimeContext)super.runtimeContext, false);
+        flinkCollector = new TimestampedCollector<OUT>(output);
         OutputCollector stormCollector = null;

         if (this.numberOfAttributes != -1) {
             stormCollector = new OutputCollector(new StormBoltCollector<OUT>(
-                    this.numberOfAttributes, super.output));
+                    this.numberOfAttributes, flinkCollector));
         }

         this.bolt.prepare(null, topologyContext, stormCollector);
@@ -110,8 +120,13 @@ public void close() throws Exception {
     }

     @Override
-    public void processElement(final IN element) throws Exception {
-        this.bolt.execute(new StormTuple<IN>(element));
+    public void processElement(final StreamRecord<IN> element) throws Exception {
+        flinkCollector.setTimestamp(element.getTimestamp());
+        this.bolt.execute(new StormTuple<IN>(element.getValue()));
     }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        output.emitWatermark(mark);
+    }
 }
@@ -114,9 +114,9 @@ private void testWrapper(final int numberOfAttributes) throws Exception {

         final StreamRecord record = mock(StreamRecord.class);
         if (numberOfAttributes == 0) {
-            when(record.getObject()).thenReturn(rawTuple);
+            when(record.getValue()).thenReturn(rawTuple);
         } else {
-            when(record.getObject()).thenReturn(flinkTuple);
+            when(record.getValue()).thenReturn(flinkTuple);
         }

         final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
@@ -129,8 +129,9 @@ private void testWrapper(final int numberOfAttributes) throws Exception {

         final StormBoltWrapper wrapper = new StormBoltWrapper(bolt);
         wrapper.setup(mock(Output.class), taskContext);
+        wrapper.open(new Configuration());

-        wrapper.processElement(record.getObject());
+        wrapper.processElement(record);
         if (numberOfAttributes == 0) {
             verify(bolt).execute(eq(new StormTuple<String>(rawTuple)));
         } else {
@@ -19,6 +19,8 @@

 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
 import java.util.LinkedList;

 class TestContext implements SourceContext<Tuple1<Integer>> {
@@ -32,9 +34,23 @@ public void collect(final Tuple1<Integer> record) {
         this.result.add(record.copy());
     }

+    @Override
+    public void collectWithTimestamp(Tuple1<Integer> element, long timestamp) {
+        this.result.add(element.copy());
+    }
+
+    @Override
+    public void emitWatermark(Watermark mark) {
+        // ignore it
+    }
+
     @Override
     public Object getCheckpointLock() {
         return null;
     }
+
+    @Override
+    public void close() {
+    }
 }
3 changes: 2 additions & 1 deletion flink-core/pom.xml
@@ -63,7 +63,8 @@ under the License.
             <artifactId>guava</artifactId>
             <version>${guava.version}</version>
         </dependency>
-    </dependencies>
+    </dependencies>
+
     <build>
         <plugins>
@@ -89,6 +89,10 @@ public class ExecutionConfig implements Serializable {

     private GlobalJobParameters globalJobParameters = null;

+    private long autoWatermarkInterval = 0;
+
+    private boolean timestampsEnabled = false;
+
     // Serializers and types registered with Kryo and the PojoSerializer
     // we store them in lists to ensure they are registered in order in all kryo instances.

@@ -140,6 +144,62 @@ public boolean isClosureCleanerEnabled() {
         return useClosureCleaner;
     }

+    /**
+     * Sets the interval of the automatic watermark emission. Watermarks are used throughout
+     * the streaming system to keep track of the progress of time. They are used, for example,
+     * for time based windowing.
+     *
+     * @param interval The interval between watermarks in milliseconds.
+     */
+    public ExecutionConfig setAutoWatermarkInterval(long interval) {
+        this.autoWatermarkInterval = interval;
+        return this;
+    }
+
+    /**
+     * Enables streaming timestamps. When this is enabled all records that are emitted
+     * from a source have a timestamp attached. This is required if a topology contains
+     * operations that rely on watermarks and timestamps to perform operations, such as
+     * event-time windows.
+     *
+     * <p>
+     * This is automatically enabled if you enable automatic watermarks.
+     *
+     * @see #setAutoWatermarkInterval(long)
+     */
+    public ExecutionConfig enableTimestamps() {
+        this.timestampsEnabled = true;
+        return this;
+    }
+
+    /**
+     * Disables streaming timestamps.
+     *
+     * @see #enableTimestamps()
+     */
+    public ExecutionConfig disableTimestamps() {
+        this.timestampsEnabled = false;
+        return this;
+    }
+
+    /**
+     * Returns true when timestamps are enabled.
+     *
+     * @see #enableTimestamps()
+     */
+    public boolean areTimestampsEnabled() {
+        return timestampsEnabled;
+    }
+
+    /**
+     * Returns the interval of the automatic watermark emission.
+     *
+     * @see #setAutoWatermarkInterval(long)
+     */
+    public long getAutoWatermarkInterval() {
+        return this.autoWatermarkInterval;
+    }
+
     /**
      * Gets the parallelism with which operation are executed by default. Operations can
      * individually override this value to use a specific parallelism.
@@ -637,6 +697,8 @@ public String toString() {
      * getRuntimeContext().getExecutionConfig().getUserConfig()
      */
     public static class GlobalJobParameters implements Serializable {
+        private static final long serialVersionUID = 1L;
+
         /**
          * Convert UserConfig into a Map<String, String> representation.
         * This can be used by the runtime, for example for presenting the user config in the web frontend.
@@ -114,6 +114,32 @@ public void emit(T record) throws IOException, InterruptedException {
         }
     }

+    /**
+     * This is used to broadcast Streaming Watermarks in-band with records. This ignores
+     * the {@link ChannelSelector}.
+     */
+    public void broadcastEmit(T record) throws IOException, InterruptedException {
+        for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
+            // serialize with corresponding serializer and send full buffer
+            RecordSerializer<T> serializer = serializers[targetChannel];
+
+            synchronized (serializer) {
+                SerializationResult result = serializer.addRecord(record);
+                while (result.isFullBuffer()) {
+                    Buffer buffer = serializer.getCurrentBuffer();
+
+                    if (buffer != null) {
+                        writer.writeBuffer(buffer, targetChannel);
+                        serializer.clearCurrentBuffer();
+                    }
+
+                    buffer = writer.getBufferProvider().requestBufferBlocking();
+                    result = serializer.setNextBuffer(buffer);
+                }
+            }
+        }
+    }
+
     public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
         for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
             RecordSerializer<T> serializer = serializers[targetChannel];
9 changes: 8 additions & 1 deletion flink-staging/flink-streaming/flink-streaming-core/pom.xml
@@ -78,7 +78,14 @@ under the License.
             <artifactId>guava</artifactId>
             <version>${guava.version}</version>
         </dependency>
-    </dependencies>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+            <version>3.5</version>
+        </dependency>
+
+    </dependencies>
+
     <build>
         <plugins>

This file was deleted.
