From f23a699f990ee5f86a12b2b5249a055c27ff1caf Mon Sep 17 00:00:00 2001 From: Gordon Tai Date: Wed, 13 Jul 2016 15:36:56 +0800 Subject: [PATCH] [hotfix][kinesis-connector] Remove duplicate info in KinesisDeserializationSchema --- .../streaming/connectors/kinesis/internals/ShardConsumer.java | 3 --- .../kinesis/serialization/KinesisDeserializationSchema.java | 3 +-- .../serialization/KinesisDeserializationSchemaWrapper.java | 2 +- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index dbd97f8e2ec9a..3f0d2bb32c633 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -209,12 +209,9 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record) byte[] dataBytes = new byte[recordData.remaining()]; recordData.get(dataBytes); - byte[] keyBytes = record.getPartitionKey().getBytes(); - final long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime(); final T value = deserializer.deserialize( - keyBytes, dataBytes, record.getPartitionKey(), record.getSequenceNumber(), diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java index c94df20f93422..0effdd807c253 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java @@ -35,7 +35,6 @@ public interface KinesisDeserializationSchema extends Serializable, ResultTyp /** * Deserializes a Kinesis record's bytes * - * @param recordKey the records's key as a byte array (null if no key has been set for the record) * @param recordValue the record's value as a byte array * @param partitionKey the record's partition key at the time of writing * @param seqNum the sequence number of this record in the Kinesis shard @@ -45,7 +44,7 @@ public interface KinesisDeserializationSchema extends Serializable, ResultTyp * @return the deserialized message as an Java object * @throws IOException */ - T deserialize(byte[] recordKey, byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException; + T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException; /** * Method to decide whether the element signals the end of the stream. If diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java index 86fb72b978452..6e66038a9c2b5 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java @@ -37,7 +37,7 @@ public KinesisDeserializationSchemaWrapper(DeserializationSchema deserializat } @Override - public T deserialize(byte[] recordKey, byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) + public T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException { return deserializationSchema.deserialize(recordValue); }