From f594d7f4c30dd3cc13ad482eb017689fbb9c0c43 Mon Sep 17 00:00:00 2001 From: Tony Wei Date: Thu, 30 Mar 2017 09:48:43 +0800 Subject: [PATCH 1/5] [FLINK-5625] Let Date format for timestamp-based start position in Kinesis consumer be configurable --- docs/dev/connectors/kinesis.md | 8 +- .../config/ConsumerConfigConstants.java | 3 + .../kinesis/internals/ShardConsumer.java | 20 ++++- .../kinesis/util/KinesisConfigUtil.java | 34 ++++++--- .../kinesis/FlinkKinesisConsumerTest.java | 76 +++++++++++++++++-- 5 files changed, 115 insertions(+), 26 deletions(-) diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md index 59f3d616e2f18..1fbdd679f82fa 100644 --- a/docs/dev/connectors/kinesis.md +++ b/docs/dev/connectors/kinesis.md @@ -121,9 +121,11 @@ one of the following values in the provided configuration properties (the naming - `LATEST`: read all shards of all streams starting from the latest record. - `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings). - `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. The timestamp must also be specified in the configuration -properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, either in the date pattern -`yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`), or a non-negative double value representing the number of seconds -that has elapsed since the Unix epoch (for example, `1459799926.480`). +properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following date pattern : + - `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`). + - a non-negative double value representing the number of seconds that has elapsed since the Unix epoch (for example, `1459799926.480`). + - a user defined pattern, which is a valid pattern for `SimpleDateFormat` provided by `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`, + (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user). #### Fault Tolerance for Exactly-Once User-Defined State Update Semantics diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 4ffe0ad7f308f..1387e99f0ea63 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -59,6 +59,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */ public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp"; + /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */ + public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format"; + /** The base backoff time between each describeStream attempt */ public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index f6c53ce4e81d0..31d838657d33a 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -38,6 +38,7 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import java.util.Properties; @@ -115,10 +116,21 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef, if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) { String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP); - try { - this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp); - } catch (ParseException e) { - this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000)); + + if (consumerConfig.containsKey(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT)) { + try { + String format = ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT; + SimpleDateFormat customDateFormat = new SimpleDateFormat(consumerConfig.getProperty(format)); + this.initTimestamp = customDateFormat.parse(timestamp); + } catch (Exception e) { + throw new IllegalArgumentException(e.getCause()); + } + } else { + try { + this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp); + } catch (ParseException e) { + this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000)); + } } } else { this.initTimestamp = null; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index 59b852974c57a..63a7769e8b358 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -28,6 +28,7 @@ import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Properties; import static org.apache.flink.util.Preconditions.checkArgument; @@ -67,7 +68,9 @@ public static void validateConsumerConfiguration(Properties config) { throw new IllegalArgumentException("Please set value for initial timestamp ('" + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position."); } - validateOptionalDateProperty(config, ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, + validateOptionalDateProperty(config, + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, + ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. " + "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 ."); } @@ -222,19 +225,28 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St } } - private static void validateOptionalDateProperty(Properties config, String key, String message) { - if (config.containsKey(key)) { - try { - initTimestampDateFormat.parse(config.getProperty(key)); - } catch (ParseException parseException) { + private static void validateOptionalDateProperty(Properties config, String timestampKey, String formatKey, String message) { + if (config.containsKey(timestampKey)) { + if (config.containsKey(formatKey)) { try { - double value = Double.parseDouble(config.getProperty(key)); - if (value < 0) { - throw new NumberFormatException(); - } - } catch (NumberFormatException numberFormatException){ + SimpleDateFormat customDateFormat = new SimpleDateFormat(config.getProperty(formatKey)); + customDateFormat.parse(config.getProperty(timestampKey)); + } catch (ParseException | IllegalArgumentException | NullPointerException exception) { throw new IllegalArgumentException(message); } + } else { + try { + initTimestampDateFormat.parse(config.getProperty(timestampKey)); + } catch (ParseException exception) { + try { + double value = Double.parseDouble(config.getProperty(timestampKey)); + if (value < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException numberFormatException) { + throw new IllegalArgumentException(message); + } + } } } } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 45eb1bdeb072b..93bfb058a26d3 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -41,6 +41,7 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -183,7 +184,7 @@ public void testIllegalValueForInitialTimestampInConfig() { } @Test - public void testDateStringInValidateOptionDatePropertyForInitialTimestampInConfig() { + public void testDateStringForValidateOptionDateProperty() { String timestamp = "2016-04-04T19:58:46.480-00:00"; Properties testConfig = new Properties(); @@ -194,18 +195,18 @@ public void testDateStringInValidateOptionDatePropertyForInitialTimestampInConfi testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp); - KinesisConfigUtil.validateConsumerConfiguration(testConfig); - try { + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + KinesisConfigUtil.initTimestampDateFormat.parse(timestamp); - } catch (ParseException e){ + } catch (Exception e) { e.printStackTrace(); fail(); } } @Test - public void testUnixTimestampInValidateOptionDatePropertyForInitialTimestampInConfig() { + public void testUnixTimestampForValidateOptionDateProperty() { String unixTimestamp = "1459799926.480"; Properties testConfig = new Properties(); @@ -216,14 +217,73 @@ public void testUnixTimestampInValidateOptionDatePropertyForInitialTimestampInCo testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp); - KinesisConfigUtil.validateConsumerConfiguration(testConfig); + try { + KinesisConfigUtil.validateConsumerConfiguration(testConfig); - try{ double value = Double.parseDouble(unixTimestamp); if (value < 0) { throw new NumberFormatException(); } - } catch (Exception e){ + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testInvalidPatternForInitialTimestampInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); + + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "InvalidPattern"); + + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + } + + @Test + public void testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); + + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "1459799926.480"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd"); + + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + } + + @Test + public void testDateStringForUserDefinedDateFormatForValidateOptionDateProperty() { + String unixTimestamp = "2016-04-04"; + String pattern = "yyyy-MM-dd"; + + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp); + testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern); + + try { + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + + SimpleDateFormat customDateFormat = new SimpleDateFormat(pattern); + customDateFormat.parse(unixTimestamp); + } catch (Exception e) { e.printStackTrace(); fail(); } From 3eb9ee71aa16d9745d1c65c577d0580dce21a581 Mon Sep 17 00:00:00 2001 From: Tony Wei Date: Thu, 30 Mar 2017 13:35:20 +0800 Subject: [PATCH 2/5] remove UnusedImports --- .../streaming/connectors/kinesis/util/KinesisConfigUtil.java | 1 - .../streaming/connectors/kinesis/FlinkKinesisConsumerTest.java | 1 - 2 files changed, 2 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index 63a7769e8b358..345334837d244 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -28,7 +28,6 @@ import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.Date; import java.util.Properties; import static org.apache.flink.util.Preconditions.checkArgument; diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 93bfb058a26d3..eec704633fa47 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -40,7 +40,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.Map; From 31e26832e929edfd1c2e9a40d1b306e767e1658c Mon Sep 17 00:00:00 2001 From: Tony Wei Date: Thu, 30 Mar 2017 14:44:25 +0800 Subject: [PATCH 3/5] address comments --- docs/dev/connectors/kinesis.md | 6 ++-- .../config/ConsumerConfigConstants.java | 2 ++ .../kinesis/internals/ShardConsumer.java | 22 ++++--------- .../kinesis/util/KinesisConfigUtil.java | 32 +++++++------------ .../kinesis/FlinkKinesisConsumerTest.java | 5 +-- 5 files changed, 26 insertions(+), 41 deletions(-) diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md index 1fbdd679f82fa..632c3897bc82a 100644 --- a/docs/dev/connectors/kinesis.md +++ b/docs/dev/connectors/kinesis.md @@ -122,10 +122,10 @@ one of the following values in the provided configuration properties (the naming - `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings). - `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. The timestamp must also be specified in the configuration properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following date pattern : - - `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`). - a non-negative double value representing the number of seconds that has elapsed since the Unix epoch (for example, `1459799926.480`). - - a user defined pattern, which is a valid pattern for `SimpleDateFormat` provided by `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`, - (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user). + - a user defined pattern, which is a valid pattern for `SimpleDateFormat` provided by `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`. + If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then ehe default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` + (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without given a pattern). #### Fault Tolerance for Exactly-Once User-Defined State Update Semantics diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 1387e99f0ea63..7a9699a3eb96f 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -110,6 +110,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString(); + public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; + public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L; public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index 31d838657d33a..9f14024dadbc0 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -30,7 +30,6 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; -import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,20 +116,13 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef, if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) { String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP); - if (consumerConfig.containsKey(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT)) { - try { - String format = ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT; - SimpleDateFormat customDateFormat = new SimpleDateFormat(consumerConfig.getProperty(format)); - this.initTimestamp = customDateFormat.parse(timestamp); - } catch (Exception e) { - throw new IllegalArgumentException(e.getCause()); - } - } else { - try { - this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp); - } catch (ParseException e) { - this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000)); - } + try { + String format = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, + ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT); + SimpleDateFormat customDateFormat = new SimpleDateFormat(format); + this.initTimestamp = customDateFormat.parse(timestamp); + } catch (ParseException | IllegalArgumentException | NullPointerException exception) { + this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000)); } } else { this.initTimestamp = null; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index 345334837d244..6042cb9784a00 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -37,8 +37,6 @@ * Utilities for Flink Kinesis connector configuration. */ public class KinesisConfigUtil { - public static SimpleDateFormat initTimestampDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); - /** * Validate configuration properties for {@link FlinkKinesisConsumer}. */ @@ -69,7 +67,7 @@ public static void validateConsumerConfiguration(Properties config) { } validateOptionalDateProperty(config, ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, - ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, + config.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT), "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. " + "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 ."); } @@ -224,27 +222,19 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St } } - private static void validateOptionalDateProperty(Properties config, String timestampKey, String formatKey, String message) { + private static void validateOptionalDateProperty(Properties config, String timestampKey, String format, String message) { if (config.containsKey(timestampKey)) { - if (config.containsKey(formatKey)) { - try { - SimpleDateFormat customDateFormat = new SimpleDateFormat(config.getProperty(formatKey)); - customDateFormat.parse(config.getProperty(timestampKey)); - } catch (ParseException | IllegalArgumentException | NullPointerException exception) { - throw new IllegalArgumentException(message); - } - } else { + try { + SimpleDateFormat customDateFormat = new SimpleDateFormat(format); + customDateFormat.parse(config.getProperty(timestampKey)); + } catch (ParseException | IllegalArgumentException | NullPointerException exception) { try { - initTimestampDateFormat.parse(config.getProperty(timestampKey)); - } catch (ParseException exception) { - try { - double value = Double.parseDouble(config.getProperty(timestampKey)); - if (value < 0) { - throw new NumberFormatException(); - } - } catch (NumberFormatException numberFormatException) { - throw new IllegalArgumentException(message); + double value = Double.parseDouble(config.getProperty(timestampKey)); + if (value < 0) { + throw new NumberFormatException(); } + } catch (NumberFormatException numberFormatException) { + throw new IllegalArgumentException(message); } } } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index eec704633fa47..9e6407ea7927b 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -197,7 +197,8 @@ public void testDateStringForValidateOptionDateProperty() { try { KinesisConfigUtil.validateConsumerConfiguration(testConfig); - KinesisConfigUtil.initTimestampDateFormat.parse(timestamp); + SimpleDateFormat customDateFormat = new SimpleDateFormat(ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT); + customDateFormat.parse(timestamp); } catch (Exception e) { e.printStackTrace(); fail(); @@ -257,7 +258,7 @@ public void testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfi testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); - testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "1459799926.480"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable"); testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd"); KinesisConfigUtil.validateConsumerConfiguration(testConfig); From 468b2174c144b89170463736ba9ace77eab66af4 Mon Sep 17 00:00:00 2001 From: Tony Wei Date: Thu, 30 Mar 2017 16:20:19 +0800 Subject: [PATCH 4/5] handle the exceptions more carefully --- docs/dev/connectors/kinesis.md | 2 +- .../connectors/kinesis/internals/ShardConsumer.java | 4 +++- .../connectors/kinesis/util/KinesisConfigUtil.java | 6 ++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md index 632c3897bc82a..ef1afca8fb4cc 100644 --- a/docs/dev/connectors/kinesis.md +++ b/docs/dev/connectors/kinesis.md @@ -124,7 +124,7 @@ one of the following values in the provided configuration properties (the naming properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following date pattern : - a non-negative double value representing the number of seconds that has elapsed since the Unix epoch (for example, `1459799926.480`). - a user defined pattern, which is a valid pattern for `SimpleDateFormat` provided by `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`. - If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then ehe default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` + If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without given a pattern). #### Fault Tolerance for Exactly-Once User-Defined State Update Semantics diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index 9f14024dadbc0..dfaee0c8fa860 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -121,7 +121,9 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef, ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT); SimpleDateFormat customDateFormat = new SimpleDateFormat(format); this.initTimestamp = customDateFormat.parse(timestamp); - } catch (ParseException | IllegalArgumentException | NullPointerException exception) { + } catch (IllegalArgumentException | NullPointerException exception) { + throw new IllegalArgumentException(exception.getCause()); + } catch (ParseException exception) { this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000)); } } else { diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index 6042cb9784a00..244f5a5e7a61d 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -227,11 +227,13 @@ private static void validateOptionalDateProperty(Properties config, String times try { SimpleDateFormat customDateFormat = new SimpleDateFormat(format); customDateFormat.parse(config.getProperty(timestampKey)); - } catch (ParseException | IllegalArgumentException | NullPointerException exception) { + } catch (IllegalArgumentException | NullPointerException exception) { + throw new IllegalArgumentException(message); + } catch (ParseException exception) { try { double value = Double.parseDouble(config.getProperty(timestampKey)); if (value < 0) { - throw new NumberFormatException(); + throw new IllegalArgumentException(message); } } catch (NumberFormatException numberFormatException) { throw new IllegalArgumentException(message); From 12bc549c96b022902de17f21bd65f66572d03797 Mon Sep 17 00:00:00 2001 From: Tony Wei Date: Thu, 30 Mar 2017 18:22:19 +0800 Subject: [PATCH 5/5] address other comments --- .../kinesis/config/ConsumerConfigConstants.java | 4 ++-- .../connectors/kinesis/internals/ShardConsumer.java | 2 +- .../connectors/kinesis/FlinkKinesisConsumerTest.java | 11 ----------- 3 files changed, 3 insertions(+), 14 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 7a9699a3eb96f..7c31af44ca71c 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -56,10 +56,10 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos"; - /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */ + /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */ public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp"; - /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */ + /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */ public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format"; /** The base backoff time between each describeStream attempt */ diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index dfaee0c8fa860..ca85854ea99fa 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -122,7 +122,7 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef, SimpleDateFormat customDateFormat = new SimpleDateFormat(format); this.initTimestamp = customDateFormat.parse(timestamp); } catch (IllegalArgumentException | NullPointerException exception) { - throw new IllegalArgumentException(exception.getCause()); + throw new IllegalArgumentException(exception); } catch (ParseException exception) { this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000)); } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 9e6407ea7927b..741f0ca689b64 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -196,9 +196,6 @@ public void testDateStringForValidateOptionDateProperty() { try { KinesisConfigUtil.validateConsumerConfiguration(testConfig); - - SimpleDateFormat customDateFormat = new SimpleDateFormat(ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT); - customDateFormat.parse(timestamp); } catch (Exception e) { e.printStackTrace(); fail(); @@ -219,11 +216,6 @@ public void testUnixTimestampForValidateOptionDateProperty() { try { KinesisConfigUtil.validateConsumerConfiguration(testConfig); - - double value = Double.parseDouble(unixTimestamp); - if (value < 0) { - throw new NumberFormatException(); - } } catch (Exception e) { e.printStackTrace(); fail(); @@ -280,9 +272,6 @@ public void testDateStringForUserDefinedDateFormatForValidateOptionDateProperty( try { KinesisConfigUtil.validateConsumerConfiguration(testConfig); - - SimpleDateFormat customDateFormat = new SimpleDateFormat(pattern); - customDateFormat.parse(unixTimestamp); } catch (Exception e) { e.printStackTrace(); fail();