From f594d7f4c30dd3cc13ad482eb017689fbb9c0c43 Mon Sep 17 00:00:00 2001 From: Tony Wei Date: Thu, 30 Mar 2017 09:48:43 +0800 Subject: [PATCH] [FLINK-5625] Let Date format for timestamp-based start position in Kinesis consumer be configurable --- docs/dev/connectors/kinesis.md | 8 +- .../config/ConsumerConfigConstants.java | 3 + .../kinesis/internals/ShardConsumer.java | 20 ++++- .../kinesis/util/KinesisConfigUtil.java | 34 ++++++--- .../kinesis/FlinkKinesisConsumerTest.java | 76 +++++++++++++++++-- 5 files changed, 115 insertions(+), 26 deletions(-) diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md index 59f3d616e2f18..1fbdd679f82fa 100644 --- a/docs/dev/connectors/kinesis.md +++ b/docs/dev/connectors/kinesis.md @@ -121,9 +121,11 @@ one of the following values in the provided configuration properties (the naming - `LATEST`: read all shards of all streams starting from the latest record. - `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings). - `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. The timestamp must also be specified in the configuration -properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, either in the date pattern -`yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`), or a non-negative double value representing the number of seconds -that has elapsed since the Unix epoch (for example, `1459799926.480`). +properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following formats: + - `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`). + - a non-negative double value representing the number of seconds that have elapsed since the Unix epoch (for example, `1459799926.480`). 
+ - a timestamp in a user-defined pattern, where the pattern is a valid `SimpleDateFormat` pattern supplied via `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` + (for example, the timestamp value `2016-04-04` with the pattern `yyyy-MM-dd`). #### Fault Tolerance for Exactly-Once User-Defined State Update Semantics diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 4ffe0ad7f308f..1387e99f0ea63 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -59,6 +59,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */ public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp"; + /** The date format of the initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */ + public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format"; + /** The base backoff time between each describeStream attempt */ public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index f6c53ce4e81d0..31d838657d33a 100644 --- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -38,6 +38,7 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import java.util.Properties; @@ -115,10 +116,21 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef, if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) { String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP); - try { - this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp); - } catch (ParseException e) { - this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000)); + + if (consumerConfig.containsKey(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT)) { + try { + String format = ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT; + SimpleDateFormat customDateFormat = new SimpleDateFormat(consumerConfig.getProperty(format)); + this.initTimestamp = customDateFormat.parse(timestamp); + } catch (Exception e) { + throw new IllegalArgumentException(e.getCause()); + } + } else { + try { + this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp); + } catch (ParseException e) { + this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000)); + } } } else { this.initTimestamp = null; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index 59b852974c57a..63a7769e8b358 100644 --- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -28,6 +28,7 @@ import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Properties; import static org.apache.flink.util.Preconditions.checkArgument; @@ -67,7 +68,9 @@ public static void validateConsumerConfiguration(Properties config) { throw new IllegalArgumentException("Please set value for initial timestamp ('" + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position."); } - validateOptionalDateProperty(config, ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, + validateOptionalDateProperty(config, + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, + ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. " + "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. 
For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 ."); } @@ -222,19 +225,28 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St } } - private static void validateOptionalDateProperty(Properties config, String key, String message) { - if (config.containsKey(key)) { - try { - initTimestampDateFormat.parse(config.getProperty(key)); - } catch (ParseException parseException) { + private static void validateOptionalDateProperty(Properties config, String timestampKey, String formatKey, String message) { + if (config.containsKey(timestampKey)) { + if (config.containsKey(formatKey)) { try { - double value = Double.parseDouble(config.getProperty(key)); - if (value < 0) { - throw new NumberFormatException(); - } - } catch (NumberFormatException numberFormatException){ + SimpleDateFormat customDateFormat = new SimpleDateFormat(config.getProperty(formatKey)); + customDateFormat.parse(config.getProperty(timestampKey)); + } catch (ParseException | IllegalArgumentException | NullPointerException exception) { throw new IllegalArgumentException(message); } + } else { + try { + initTimestampDateFormat.parse(config.getProperty(timestampKey)); + } catch (ParseException exception) { + try { + double value = Double.parseDouble(config.getProperty(timestampKey)); + if (value < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException numberFormatException) { + throw new IllegalArgumentException(message); + } + } } } } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 45eb1bdeb072b..93bfb058a26d3 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -41,6 +41,7 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -183,7 +184,7 @@ public void testIllegalValueForInitialTimestampInConfig() { } @Test - public void testDateStringInValidateOptionDatePropertyForInitialTimestampInConfig() { + public void testDateStringForValidateOptionDateProperty() { String timestamp = "2016-04-04T19:58:46.480-00:00"; Properties testConfig = new Properties(); @@ -194,18 +195,18 @@ public void testDateStringInValidateOptionDatePropertyForInitialTimestampInConfi testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp); - KinesisConfigUtil.validateConsumerConfiguration(testConfig); - try { + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + KinesisConfigUtil.initTimestampDateFormat.parse(timestamp); - } catch (ParseException e){ + } catch (Exception e) { e.printStackTrace(); fail(); } } @Test - public void testUnixTimestampInValidateOptionDatePropertyForInitialTimestampInConfig() { + public void testUnixTimestampForValidateOptionDateProperty() { String unixTimestamp = "1459799926.480"; Properties testConfig = new Properties(); @@ -216,14 +217,73 @@ public void testUnixTimestampInValidateOptionDatePropertyForInitialTimestampInCo testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp); - KinesisConfigUtil.validateConsumerConfiguration(testConfig); + try { + KinesisConfigUtil.validateConsumerConfiguration(testConfig); - try{ double value = Double.parseDouble(unixTimestamp); if (value < 0) { throw new 
NumberFormatException(); } - } catch (Exception e){ + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testInvalidPatternForInitialTimestampInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); + + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "InvalidPattern"); + + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + } + + @Test + public void testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); + + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "1459799926.480"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, 
"yyyy-MM-dd"); + + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + } + + @Test + public void testDateStringForUserDefinedDateFormatForValidateOptionDateProperty() { + String unixTimestamp = "2016-04-04"; + String pattern = "yyyy-MM-dd"; + + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp); + testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern); + + try { + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + + SimpleDateFormat customDateFormat = new SimpleDateFormat(pattern); + customDateFormat.parse(unixTimestamp); + } catch (Exception e) { e.printStackTrace(); fail(); }