From 765945b50ea199d3a096ea360bcfa0cbee1107e2 Mon Sep 17 00:00:00 2001 From: Gordon Tai Date: Fri, 8 Jul 2016 08:46:31 +0800 Subject: [PATCH 1/2] [FLINK-4019][kinesis-connector] Use Kinesis records' approximateArrivalTimestamp Used in the following: 1) Exposed through the KinesisDeserializationSchema for users to obtain 2) Attatched to records as the default event time --- docs/apis/streaming/connectors/kinesis.md | 23 +++++++++++++++++++ .../kinesis/internals/KinesisDataFetcher.java | 5 ++-- .../kinesis/internals/ShardConsumer.java | 17 ++++++++++++-- .../KinesisDeserializationSchema.java | 3 ++- .../KinesisDeserializationSchemaWrapper.java | 3 ++- .../FakeKinesisBehavioursFactory.java | 2 ++ .../testutils/TestableKinesisDataFetcher.java | 2 +- 7 files changed, 48 insertions(+), 7 deletions(-) diff --git a/docs/apis/streaming/connectors/kinesis.md b/docs/apis/streaming/connectors/kinesis.md index 1fc9961759794..ece38f9628db3 100644 --- a/docs/apis/streaming/connectors/kinesis.md +++ b/docs/apis/streaming/connectors/kinesis.md @@ -146,6 +146,29 @@ Also note that Flink can only restart the topology if enough processing slots ar Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers. +#### Event Time for Consumed Records + +
+
+{% highlight java %} +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); +{% endhighlight %} +
+
+{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +{% endhighlight %} +
+
+ +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record +timestamps, an *approximate arrival timestamp* will be used. This timestamp is attached to records by Kinesis once they +were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be +ascending). + #### Threading Model The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption. diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index ab43f63a6fad4..7dba66cea2baa 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -131,7 +131,7 @@ public class KinesisDataFetcher { * */ private final List subscribedShardsState; @@ -491,13 +491,14 @@ protected KinesisDeserializationSchema getClonedDeserializationSchema() { * This method is called by {@link ShardConsumer}s. * * @param record the record to collect + * @param recordTimestamp timestamp to attach to the collected record * @param shardStateIndex index of the shard to update in subscribedShardsState; * this index should be the returned value from * {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called * when the shard state was registered. * @param lastSequenceNumber the last sequence number value to update */ - protected void emitRecordAndUpdateState(T record, int shardStateIndex, SequenceNumber lastSequenceNumber) { + protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) { synchronized (checkpointLock) { sourceContext.collect(record); updateState(shardStateIndex, lastSequenceNumber); diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index 262c8898c5cdb..3b6834397fbd3 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -21,6 +21,7 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; @@ -185,6 +186,14 @@ private boolean isRunning() { return !Thread.interrupted(); } + /** + * Deserializes a record for collection, and accordingly updates the shard state in the fetcher. + * Note that the server-side Kinesis timestamp is attached to the record when collected. When the + * user programs uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default. + * + * @param record + * @throws IOException + */ private void deserializeRecordForCollectionAndUpdateState(UserRecord record) throws IOException { ByteBuffer recordData = record.getData(); @@ -194,17 +203,21 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record) byte[] keyBytes = record.getPartitionKey().getBytes(); - final T value = deserializer.deserialize(keyBytes, dataBytes, subscribedShard.getStreamName(), - record.getSequenceNumber()); + final long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime(); + + final T value = deserializer.deserialize( + keyBytes, dataBytes, subscribedShard.getStreamName(), record.getSequenceNumber(), approxArrivalTimestamp); if (record.isAggregated()) { fetcherRef.emitRecordAndUpdateState( value, + approxArrivalTimestamp, subscribedShardStateIndex, new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())); } else { fetcherRef.emitRecordAndUpdateState( value, + approxArrivalTimestamp, subscribedShardStateIndex, new SequenceNumber(record.getSequenceNumber())); } diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java index 1fd6bf58486c4..dfacc9a169de7 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java @@ -39,10 +39,11 @@ public interface KinesisDeserializationSchema extends Serializable, ResultTyp * @param recordValue the record's value as a byte array * @param stream the name of the Kinesis stream that this record was sent to * @param seqNum the sequence number of this record in the Kinesis shard + * @param approxArrivalTimestamp the server-side timestamp of when Kinesis received and stored the record * @return the deserialized message as an Java object * @throws IOException */ - T deserialize(byte[] recordKey, byte[] recordValue, String stream, String seqNum) throws IOException; + T deserialize(byte[] recordKey, byte[] recordValue, String stream, String seqNum, long approxArrivalTimestamp) throws IOException; /** * Method to decide whether the element signals the end of the stream. If diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java index 19c3e72390812..c534be7dace66 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java @@ -37,7 +37,8 @@ public KinesisDeserializationSchemaWrapper(DeserializationSchema deserializat } @Override - public T deserialize(byte[] recordKey, byte[] recordValue, String stream, String seqNu) throws IOException { + public T deserialize(byte[] recordKey, byte[] recordValue, String stream, String seqNum, long approxArrivalTimestamp) + throws IOException { return deserializationSchema.deserialize(recordValue); } diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java index 82509dab812bc..fc98fca4fbf19 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import java.nio.ByteBuffer; +import java.util.Date; import java.util.HashMap; import java.util.ArrayList; import java.util.LinkedList; @@ -139,6 +140,7 @@ public static List createRecordBatchWithRange(int min, int max) { new Record() .withData(ByteBuffer.wrap(String.valueOf(i).getBytes())) .withPartitionKey(UUID.randomUUID().toString()) + .withApproximateArrivalTimestamp(new Date(System.currentTimeMillis())) .withSequenceNumber(String.valueOf(i))); } return batch; diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java index 765ff7e670497..57886fe694fef 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java @@ -74,7 +74,7 @@ protected KinesisDeserializationSchema getClonedDeserializationSchema() } @Override - protected void emitRecordAndUpdateState(String record, int shardStateIndex, SequenceNumber lastSequenceNumber) { + protected void emitRecordAndUpdateState(String record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) { synchronized (fakeCheckpointLock) { this.numElementsCollected++; updateState(shardStateIndex, lastSequenceNumber); From 077624e38a751181a4d78434335b1048de4921db Mon Sep 17 00:00:00 2001 From: Gordon Tai Date: Fri, 8 Jul 2016 20:04:13 +0800 Subject: [PATCH 2/2] [FLINK-4019] Address PR review comments. --- docs/apis/streaming/connectors/kinesis.md | 25 +++++++++++++++++-- .../kinesis/internals/KinesisDataFetcher.java | 2 +- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/docs/apis/streaming/connectors/kinesis.md b/docs/apis/streaming/connectors/kinesis.md index ece38f9628db3..bb309be436793 100644 --- a/docs/apis/streaming/connectors/kinesis.md +++ b/docs/apis/streaming/connectors/kinesis.md @@ -83,7 +83,7 @@ kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYP StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment(); DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>( - "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig)) + "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig)); {% endhighlight %}
@@ -164,11 +164,32 @@ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record -timestamps, an *approximate arrival timestamp* will be used. This timestamp is attached to records by Kinesis once they +timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be ascending). +Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html), +or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so, +it can be passed to the consumer in the following way: + +
+
+{% highlight java %} +DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>( + "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig)); +kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner()); +{% endhighlight %} +
+
+{% highlight scala %} +val kinesis = env.addSource(new FlinkKinesisConsumer[String]( + "kinesis_stream_name", new SimpleStringSchema, kinesisConsumerConfig)) +kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner) +{% endhighlight %} +
+
+ #### Threading Model The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption. diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index 7dba66cea2baa..cb5812ecb09bc 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -500,7 +500,7 @@ protected KinesisDeserializationSchema getClonedDeserializationSchema() { */ protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) { synchronized (checkpointLock) { - sourceContext.collect(record); + sourceContext.collectWithTimestamp(record, recordTimestamp); updateState(shardStateIndex, lastSequenceNumber); } }