From 1e3d98f89dc476fd29f1bf5e73be9c1dbb2103d1 Mon Sep 17 00:00:00 2001 From: Gordon Tai Date: Mon, 11 Jul 2016 22:41:48 +0800 Subject: [PATCH 1/2] [FLINK-4170] Simplify Kinesis connecter config keys to be less overly verbose --- docs/apis/streaming/connectors/kinesis.md | 129 ++++----- .../kinesis/FlinkKinesisConsumer.java | 2 +- .../kinesis/FlinkKinesisProducer.java | 15 +- .../kinesis/config/AWSConfigConstants.java | 67 +++++ .../config/ConsumerConfigConstants.java | 131 +++++++++ .../config/CredentialProviderType.java | 39 --- .../config/KinesisConfigConstants.java | 134 ---------- ...tion.java => ProducerConfigConstants.java} | 16 +- .../kinesis/examples/ConsumeFromKinesis.java | 8 +- .../kinesis/examples/ProduceIntoKinesis.java | 8 +- .../kinesis/internals/KinesisDataFetcher.java | 13 +- .../kinesis/internals/ShardConsumer.java | 10 +- .../kinesis/proxy/KinesisProxy.java | 46 ++-- .../connectors/kinesis/util/AWSUtil.java | 22 +- .../kinesis/util/KinesisConfigUtil.java | 182 +++++++------ .../kinesis/FlinkKinesisConsumerTest.java | 250 ++++++++++-------- .../ManualConsumerProducerTest.java | 15 +- .../manualtests/ManualExactlyOnceTest.java | 8 +- ...alExactlyOnceWithStreamReshardingTest.java | 10 +- .../manualtests/ManualProducerTest.java | 8 +- .../ExactlyOnceValidatingConsumerThread.java | 11 +- .../KinesisEventsGeneratorProducerThread.java | 8 +- 22 files changed, 589 insertions(+), 543 deletions(-) create mode 100644 flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java create mode 100644 flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java delete mode 100644 flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/CredentialProviderType.java delete mode 100644 flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java rename flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/{InitialPosition.java => ProducerConfigConstants.java} (62%) diff --git a/docs/apis/streaming/connectors/kinesis.md b/docs/apis/streaming/connectors/kinesis.md index e37272ece55c8..e4ded62a7ff76 100644 --- a/docs/apis/streaming/connectors/kinesis.md +++ b/docs/apis/streaming/connectors/kinesis.md @@ -69,51 +69,43 @@ Before consuming data from Kinesis streams, make sure that all streams are creat
{% highlight java %} -Properties kinesisConsumerConfig = new Properties(); -kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); -kinesisConsumerConfig.put( - KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, - "aws_access_key_id_here"); -kinesisConsumerConfig.put( - KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, - "aws_secret_key_here"); -kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); +Properties consumerConfig = new Properties(); +consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1"); +consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); +consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment(); DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>( - "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig)); + "kinesis_stream_name", new SimpleStringSchema(), consumerConfig)); {% endhighlight %}
{% highlight scala %} -val kinesisConsumerConfig = new Properties(); -kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); -kinesisConsumerConfig.put( - KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, - "aws_access_key_id_here"); -kinesisConsumerConfig.put( - KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, - "aws_secret_key_here"); -kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); +val consumerConfig = new Properties(); +consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1"); +consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); +consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); val env = StreamExecutionEnvironment.getEnvironment val kinesis = env.addSource(new FlinkKinesisConsumer[String]( - "kinesis_stream_name", new SimpleStringSchema, kinesisConsumerConfig)) + "kinesis_stream_name", new SimpleStringSchema, consumerConfig)) {% endhighlight %}
The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties` -instance, the setting keys for which are enumerated in `KinesisConfigConstants`. The example +instance, the configuration keys for which can be found in `ConsumerConfigConstants`. The example demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". The AWS credentials are supplied using the basic method in which -the AWS access key ID and secret key are directly supplied in the configuration (other options are setting -`KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE` to `ENV_VAR`, `SYS_PROP`, and `PROFILE`). Also, data is being consumed -from the newest position in the Kinesis stream (the other option will be setting `KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE` +the AWS access key ID and secret access key are directly supplied in the configuration (other options are setting +`ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, and `PROFILE`). Also, data is being consumed +from the newest position in the Kinesis stream (the other option will be setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible). -Other optional configuration keys can be found in `KinesisConfigConstants`. +Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`. #### Fault Tolerance for Exactly-Once User-Defined State Update Semantics @@ -214,7 +206,7 @@ on how to deal with any errors or warnings that the Flink Kinesis Consumer may h by a single thread in each parallel consumer subtask to discover any new shards as a result of stream resharding. By default, the consumer performs the shard discovery at an interval of 10 seconds, and will retry indefinitely until it gets a result from Kinesis. If this interferes with other non-Flink consuming applications, users can slow down the consumer of -calling this API by setting a value for `KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS` in the supplied +calling this API by setting a value for `ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` in the supplied configuration properties. This sets the discovery interval to a different value. Note that this setting directly impacts the maximum delay of discovering a new shard and starting to consume it, as shards will not be discovered during the interval. @@ -223,18 +215,18 @@ only once when per shard consuming threads are started, and will retry if Kinesi API has exceeded, up to a default of 3 attempts. Note that since the rate limit for this API is per shard (not per stream), the consumer itself should not exceed the limit. Usually, if this happens, users can either try to slow down any other non-Flink consuming applications of calling this API, or modify the retry behaviour of this API call in the consumer by -setting keys prefixed by `KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_*` in the supplied configuration properties. +setting keys prefixed by `ConsumerConfigConstants.SHARD_GETITERATOR_*` in the supplied configuration properties. - *[GetRecords](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)*: this is constantly called by per shard consuming threads to fetch records from Kinesis. When a shard has multiple concurrent consumers (when there are any other non-Flink consuming applications running), the per shard rate limit may be exceeded. By default, on each call of this API, the consumer will retry if Kinesis complains that the data size / transaction limit for the API has exceeded, up to a default of 3 attempts. Users can either try to slow down other non-Flink consuming applications, or adjust the throughput -of the consumer by setting the `KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_MAX` and -`KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_INTERVAL_MILLIS` keys in the supplied configuration properties. Setting the former +of the consumer by setting the `ConsumerConfigConstants.SHARD_GETRECORDS_MAX` and +`ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` keys in the supplied configuration properties. Setting the former adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 100), while the latter modifies the sleep interval between each fetch (there will be no sleep by default). The retry behaviour of the -consumer when calling this API can also be modified by using the other keys prefixed by `KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_*`. +consumer when calling this API can also be modified by using the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`. ### Kinesis Producer @@ -251,17 +243,12 @@ For the monitoring to work, the user accessing the stream needs access to the Cl
{% highlight java %} -Properties kinesisProducerConfig = new Properties(); -kinesisProducerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); -kinesisProducerConfig.put( - KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, - "aws_access_key_id_here"); -kinesisProducerConfig.put( - KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, - "aws_secret_key_here"); - -FlinkKinesisProducer kinesis = - new FlinkKinesisProducer<>(new SimpleStringSchema(), kinesisProducerConfig); +Properties producerConfig = new Properties(); +producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1"); +producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); + +FlinkKinesisProducer kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig); kinesis.setFailOnError(true); kinesis.setDefaultStream("kinesis_stream_name"); kinesis.setDefaultPartition("0"); @@ -272,16 +259,12 @@ simpleStringStream.addSink(kinesis);
{% highlight scala %} -val kinesisProducerConfig = new Properties(); -kinesisProducerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); -kinesisProducerConfig.put( - KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, - "aws_access_key_id_here"); -kinesisProducerConfig.put( - KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, - "aws_secret_key_here"); - -val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, kinesisProducerConfig); +val producerConfig = new Properties(); +producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1"); +producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); + +val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig); kinesis.setFailOnError(true); kinesis.setDefaultStream("kinesis_stream_name"); kinesis.setDefaultPartition("0"); @@ -299,46 +282,38 @@ Instead of a `SerializationSchema`, it also supports a `KinesisSerializationSche done using the `KinesisSerializationSchema.getTargetStream(T element)` method. Returning `null` there will instruct the producer to write the element to the default stream. Otherwise, the returned stream name is used. -Other optional configuration keys can be found in `KinesisConfigConstants`. +Other optional configuration keys for the producer can be found in `ProducerConfigConstants`. ### Using Non-AWS Kinesis Endpoints for Testing -It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as kinesalite; this is especially useful when performing functional testing of a Flink application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overriden via a configuration property. +It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as +[Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing functional testing of a Flink +application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overridden via a configuration property. -To override the AWS endpoint, set the `KinesisConfigConstants.CONFIG_AWS_ENDPOINT` property in the Flink configuration, in addition to the `KinesisConfigConstants.CONFIG_AWS_REGION` required by Flink. Although the region is required, it will not be used to determine the AWS endpoint URL. +To override the AWS endpoint, taking the producer for example, set the `ProducerConfigConstants.AWS_ENDPOINT` property in the +Flink configuration, in addition to the `ProducerConfigConstants.AWS_REGION` required by Flink. Although the region is +required, it will not be used to determine the AWS endpoint URL. -The following example shows how one might supply the `KinesisConfigConstants.CONFIG_AWS_ENDPOINT` configuration property: +The following example shows how one might supply the `ProducerConfigConstants.AWS_ENDPOINT` configuration property:
{% highlight java %} -Properties kinesisProducerConfig = new Properties(); -kinesisProducerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); -kinesisProducerConfig.put( - KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, - "aws_access_key_id_here"); -kinesisProducerConfig.put( - KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, - "aws_secret_key_here"); -kinesisProducerConfig.put( - KinesisConfigConstants.CONFIG_AWS_ENDPOINT, - "http://localhost:4567"); +Properties producerConfig = new Properties(); +producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1"); +producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); +producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567"); {% endhighlight %}
{% highlight scala %} -val kinesisProducerConfig = new Properties(); -kinesisProducerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); -kinesisProducerConfig.put( - KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, - "aws_access_key_id_here"); -kinesisProducerConfig.put( - KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, - "aws_secret_key_here"); -kinesisProducerConfig.put( - KinesisConfigConstants.CONFIG_AWS_ENDPOINT, - "http://localhost:4567"); +val producerConfig = new Properties(); +producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1"); +producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); +producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567"); {% endhighlight %}
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index 25561c3b88bd1..7b1f836153a25 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -152,7 +152,7 @@ public FlinkKinesisConsumer(List streams, KinesisDeserializationSchema schema, Properties c this.configProps = checkNotNull(configProps, "configProps can not be null"); // check the configuration properties for any conflicting settings - KinesisConfigUtil.validateConfiguration(this.configProps); + KinesisConfigUtil.validateProducerConfiguration(this.configProps); ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema)); this.schema = schema; @@ -171,15 +170,15 @@ public void open(Configuration parameters) throws Exception { KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration(); - producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION)); + producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION)); producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps)); - if (configProps.containsKey(KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT)) { + if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) { producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps, - KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG)); + ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG)); } - if (configProps.containsKey(KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT)) { + if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) { producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps, - KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG)); + ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG)); } producer = new KinesisProducer(producerConfig); diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java new file mode 100644 index 0000000000000..cd4eda3f72843 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.config; + +import com.amazonaws.auth.AWSCredentialsProvider; + +/** + * Configuration keys for AWS service usage + */ +public class AWSConfigConstants { + + /** + * Possible configuration values for the type of credential provider to use when accessing AWS Kinesis. + * Internally, a corresponding implementation of {@link AWSCredentialsProvider} will be used. + */ + public enum CredentialProvider { + + /** Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create AWS credentials */ + ENV_VAR, + + /** Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS credentials */ + SYS_PROP, + + /** Use a AWS credentials profile file to create the AWS credentials */ + PROFILE, + + /** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties */ + BASIC + } + + /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */ + public static final String AWS_REGION = "aws.region"; + + /** The AWS access key ID to use when setting credentials provider type to BASIC */ + public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid"; + + /** The AWS secret key to use when setting credentials provider type to BASIC */ + public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey"; + + /** The credential provider type to use when AWS credentials are required (BASIC is used if not set)*/ + public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider"; + + /** Optional configuration for profile path if credential provider type is set to be PROFILE */ + public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path"; + + /** Optional configuration for profile name if credential provider type is set to be PROFILE */ + public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name"; + + /** The AWS endpoint for Kinesis (derived from the AWS region setting if not set) */ + public static final String AWS_ENDPOINT = "aws.endpoint"; + +} diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java new file mode 100644 index 0000000000000..28ff3e49725c2 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.config; + +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; + +/** + * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer} + */ +public class ConsumerConfigConstants extends AWSConfigConstants { + + /** + * The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used + * when the consumer tasks retrieve the first shard iterator for each Kinesis shard. + */ + public enum InitialPosition { + + /** Start reading from the earliest possible record in the stream (excluding expired data records) */ + TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM), + + /** Start reading from the latest incoming record */ + LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM); + + private SentinelSequenceNumber sentinelSequenceNumber; + + InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) { + this.sentinelSequenceNumber = sentinelSequenceNumber; + } + + public SentinelSequenceNumber toSentinelSequenceNumber() { + return this.sentinelSequenceNumber; + } + } + + /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ + public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos"; + + /** The base backoff time between each describeStream attempt */ + public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; + + /** The maximum backoff time between each describeStream attempt */ + public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max"; + + /** The power constant for exponential backoff between each describeStream attempt */ + public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst"; + + /** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard */ + public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount"; + + /** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException */ + public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries"; + + /** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */ + public static final String SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base"; + + /** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */ + public static final String SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max"; + + /** The power constant for exponential backoff between each getRecords attempt */ + public static final String SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst"; + + /** The interval between each getRecords request to a AWS Kinesis shard in milliseconds */ + public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis"; + + /** The maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException */ + public static final String SHARD_GETITERATOR_RETRIES = "flink.shard.getiterator.maxretries"; + + /** The base backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */ + public static final String SHARD_GETITERATOR_BACKOFF_BASE = "flink.shard.getiterator.backoff.base"; + + /** The maximum backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */ + public static final String SHARD_GETITERATOR_BACKOFF_MAX = "flink.shard.getiterator.backoff.max"; + + /** The power constant for exponential backoff between each getShardIterator attempt */ + public static final String SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getiterator.backoff.expconst"; + + /** The interval between each attempt to discover new shards */ + public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis"; + + // ------------------------------------------------------------------------ + // Default values for consumer configuration + // ------------------------------------------------------------------------ + + public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString(); + + public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L; + + public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L; + + public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5; + + public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100; + + public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3; + + public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE = 300L; + + public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX = 1000L; + + public static final double DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5; + + public static final long DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS = 0L; + + public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3; + + public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE = 300L; + + public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX = 1000L; + + public static final double DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 1.5; + + public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L; + +} diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/CredentialProviderType.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/CredentialProviderType.java deleted file mode 100644 index d0561281c590e..0000000000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/CredentialProviderType.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kinesis.config; - -import com.amazonaws.auth.AWSCredentialsProvider; - -/** - * Possible configuration values for the type of credential provider to use when accessing AWS Kinesis. - * Internally, a corresponding implementation of {@link AWSCredentialsProvider} will be used. - */ -public enum CredentialProviderType { - - /** Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_KEY to create AWS credentials */ - ENV_VAR, - - /** Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS credentials */ - SYS_PROP, - - /** Use a AWS credentials profile file to create the AWS credentials */ - PROFILE, - - /** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties */ - BASIC -} diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java deleted file mode 100644 index 1090ee734d7b5..0000000000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kinesis.config; - -/** - * Keys and default values used to configure the Kinesis consumer. - */ -public class KinesisConfigConstants { - - // ------------------------------------------------------------------------ - // Configuration Keys - // ------------------------------------------------------------------------ - - /** The base backoff time between each describeStream attempt */ - public static final String CONFIG_STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; - - /** The maximum backoff time between each describeStream attempt */ - public static final String CONFIG_STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max"; - - /** The power constant for exponential backoff between each describeStream attempt */ - public static final String CONFIG_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst"; - - /** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard */ - public static final String CONFIG_SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount"; - - /** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException */ - public static final String CONFIG_SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries"; - - /** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */ - public static final String CONFIG_SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base"; - - /** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */ - public static final String CONFIG_SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max"; - - /** The power constant for exponential backoff between each getRecords attempt */ - public static final String CONFIG_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst"; - - /** The interval between each getRecords request to a AWS Kinesis shard in milliseconds */ - public static final String CONFIG_SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis"; - - /** The maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException */ - public static final String CONFIG_SHARD_GETITERATOR_RETRIES = "flink.shard.getiterator.maxretries"; - - /** The base backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */ - public static final String CONFIG_SHARD_GETITERATOR_BACKOFF_BASE = "flink.shard.getiterator.backoff.base"; - - /** The maximum backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */ - public static final String CONFIG_SHARD_GETITERATOR_BACKOFF_MAX = "flink.shard.getiterator.backoff.max"; - - /** The power constant for exponential backoff between each getShardIterator attempt */ - public static final String CONFIG_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getiterator.backoff.expconst"; - - /** The interval between each attempt to discover new shards */ - public static final String CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis"; - - /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ - public static final String CONFIG_STREAM_INIT_POSITION_TYPE = "flink.stream.initpos.type"; - - /** The credential provider type to use when AWS credentials are required (BASIC is used if not set)*/ - public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE = "aws.credentials.provider"; - - /** The AWS access key ID to use when setting credentials provider type to BASIC */ - public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID = "aws.credentials.provider.basic.accesskeyid"; - - /** The AWS secret key to use when setting credentials provider type to BASIC */ - public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY = "aws.credentials.provider.basic.secretkey"; - - /** Optional configuration for profile path if credential provider type is set to be PROFILE */ - public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_PATH = "aws.credentials.provider.profile.path"; - - /** Optional configuration for profile name if credential provider type is set to be PROFILE */ - public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_NAME = "aws.credentials.provider.profile.name"; - - /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */ - public static final String CONFIG_AWS_REGION = "aws.region"; - - /** The AWS endpoint for Kinesis (derived from the AWS region setting if not set) */ - public static final String CONFIG_AWS_ENDPOINT = "aws.endpoint"; - - /** Maximum number of items to pack into an PutRecords request. **/ - public static final String CONFIG_PRODUCER_COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount"; - - /** Maximum number of items to pack into an aggregated record. **/ - public static final String CONFIG_PRODUCER_AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount"; - - - // ------------------------------------------------------------------------ - // Default configuration values - // ------------------------------------------------------------------------ - - public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L; - - public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L; - - public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5; - - public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100; - - public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3; - - public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE = 300L; - - public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX = 1000L; - - public static final double DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5; - - public static final long DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS = 0; - - public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3; - - public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE = 300L; - - public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX = 1000L; - - public static final double DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 1.5; - - public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L; - -} diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/InitialPosition.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java similarity index 62% rename from flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/InitialPosition.java rename to flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java index 1b3cd10e26354..1edddfcfc9a7e 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/InitialPosition.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java @@ -17,17 +17,17 @@ package org.apache.flink.streaming.connectors.kinesis.config; -import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; /** - * The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used - * when the consumer tasks retrieve the first shard iterator for each Kinesis shard. + * Optional producer specific configuration keys for {@link FlinkKinesisProducer} */ -public enum InitialPosition { +public class ProducerConfigConstants extends AWSConfigConstants { - /** Start reading from the earliest possible record in the stream (excluding expired data records) */ - TRIM_HORIZON, + /** Maximum number of items to pack into an PutRecords request. **/ + public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount"; + + /** Maximum number of items to pack into an aggregated record. **/ + public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount"; - /** Start reading from the latest incoming record */ - LATEST } diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java index 8bae66587e23f..55668c697922f 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java @@ -20,7 +20,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import java.util.Properties; @@ -37,9 +37,9 @@ public static void main(String[] args) throws Exception { see.setParallelism(1); Properties kinesisConsumerConfig = new Properties(); - kinesisConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, pt.getRequired("region")); - kinesisConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, pt.getRequired("accesskey")); - kinesisConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, pt.getRequired("secretkey")); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region")); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accesskey")); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretkey")); DataStream kinesis = see.addSource(new FlinkKinesisConsumer<>( "flink-test", diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java index 258f8ccdcf737..d1781375f4471 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java @@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import java.util.Properties; @@ -41,9 +41,9 @@ public static void main(String[] args) throws Exception { DataStream simpleStringStream = see.addSource(new EventsGenerator()); Properties kinesisProducerConfig = new Properties(); - kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, pt.getRequired("region")); - kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, pt.getRequired("accessKey")); - kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, pt.getRequired("secretKey")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey")); FlinkKinesisProducer kinesis = new FlinkKinesisProducer<>( new SimpleStringSchema(), kinesisProducerConfig); diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index cb5812ecb09bc..d83ab06566319 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -20,7 +20,8 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; @@ -29,7 +30,6 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; -import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -242,9 +242,12 @@ public void runFetcher() throws Exception { // we are starting fresh (not restoring from a checkpoint); when we are starting fresh, this simply means // all existing shards of streams we are subscribing to are new shards; when we are restoring from checkpoint, // any new shards due to Kinesis resharding from the time of the checkpoint will be considered new shards. + InitialPosition initialPosition = InitialPosition.valueOf(configProps.getProperty( + ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)); + SentinelSequenceNumber startingStateForNewShard = (isRestoredFromFailure) ? SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM - : KinesisConfigUtil.getInitialPositionAsSentinelSequenceNumber(configProps); + : initialPosition.toSentinelSequenceNumber(); if (LOG.isInfoEnabled()) { String logFormat = (isRestoredFromFailure) @@ -305,8 +308,8 @@ public void runFetcher() throws Exception { final long discoveryIntervalMillis = Long.valueOf( configProps.getProperty( - KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS, - Long.toString(KinesisConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS))); + ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, + Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS))); while (running) { if (LOG.isDebugEnabled()) { diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index 3f0d2bb32c633..494f5deaf9f3c 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -22,7 +22,7 @@ import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.ShardIteratorType; import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; @@ -93,11 +93,11 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef, Properties consumerConfig = fetcherRef.getConsumerConfiguration(); this.kinesis = kinesis; this.maxNumberOfRecordsPerFetch = Integer.valueOf(consumerConfig.getProperty( - KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_MAX, - Integer.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX))); + ConsumerConfigConstants.SHARD_GETRECORDS_MAX, + Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX))); this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty( - KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_INTERVAL_MILLIS, - Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS))); + ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, + Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS))); } @SuppressWarnings("unchecked") diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java index 22f667e845164..906689f9e211f 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java @@ -28,7 +28,7 @@ import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.StreamStatus; import com.amazonaws.services.kinesis.model.Shard; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; import org.slf4j.Logger; @@ -119,50 +119,50 @@ private KinesisProxy(Properties configProps) { this.describeStreamBaseBackoffMillis = Long.valueOf( configProps.getProperty( - KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_BASE, - Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE))); + ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, + Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE))); this.describeStreamMaxBackoffMillis = Long.valueOf( configProps.getProperty( - KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_MAX, - Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX))); + ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, + Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX))); this.describeStreamExpConstant = Double.valueOf( configProps.getProperty( - KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, - Double.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT))); + ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, + Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT))); this.getRecordsBaseBackoffMillis = Long.valueOf( configProps.getProperty( - KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_BASE, - Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE))); + ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, + Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE))); this.getRecordsMaxBackoffMillis = Long.valueOf( configProps.getProperty( - KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_MAX, - Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX))); + ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, + Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX))); this.getRecordsExpConstant = Double.valueOf( configProps.getProperty( - KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, - Double.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT))); + ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, + Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT))); this.getRecordsMaxAttempts = Integer.valueOf( configProps.getProperty( - KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_RETRIES, - Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES))); + ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, + Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES))); this.getShardIteratorBaseBackoffMillis = Long.valueOf( configProps.getProperty( - KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_BASE, - Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE))); + ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, + Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE))); this.getShardIteratorMaxBackoffMillis = Long.valueOf( configProps.getProperty( - KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_MAX, - Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX))); + ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, + Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX))); this.getShardIteratorExpConstant = Double.valueOf( configProps.getProperty( - KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, - Double.toString(KinesisConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT))); + ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, + Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT))); this.getShardIteratorMaxAttempts = Integer.valueOf( configProps.getProperty( - KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_RETRIES, - Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES))); + ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, + Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES))); } diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java index 6446eef6dc26a..72fbdcb7352b6 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java @@ -29,8 +29,8 @@ import com.amazonaws.regions.Regions; import com.amazonaws.services.kinesis.AmazonKinesisClient; import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; -import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider; import java.util.Properties; @@ -52,9 +52,9 @@ public static AmazonKinesisClient createKinesisClient(Properties configProps) { AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials(), awsClientConfig); - client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION)))); - if (configProps.containsKey(KinesisConfigConstants.CONFIG_AWS_ENDPOINT)) { - client.setEndpoint(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_ENDPOINT)); + client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)))); + if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) { + client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT)); } return client; } @@ -66,8 +66,8 @@ public static AmazonKinesisClient createKinesisClient(Properties configProps) { * @return The corresponding AWS Credentials Provider instance */ public static AWSCredentialsProvider getCredentialsProvider(final Properties configProps) { - CredentialProviderType credentialProviderType = CredentialProviderType.valueOf(configProps.getProperty( - KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, CredentialProviderType.BASIC.toString())); + CredentialProvider credentialProviderType = CredentialProvider.valueOf(configProps.getProperty( + AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, CredentialProvider.BASIC.toString())); AWSCredentialsProvider credentialsProvider; @@ -80,9 +80,9 @@ public static AWSCredentialsProvider getCredentialsProvider(final Properties con break; case PROFILE: String profileName = configProps.getProperty( - KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_NAME, null); + AWSConfigConstants.AWS_PROFILE_NAME, null); String profileConfigPath = configProps.getProperty( - KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_PATH, null); + AWSConfigConstants.AWS_PROFILE_PATH, null); credentialsProvider = (profileConfigPath == null) ? new ProfileCredentialsProvider(profileName) : new ProfileCredentialsProvider(profileConfigPath, profileName); @@ -93,8 +93,8 @@ public static AWSCredentialsProvider getCredentialsProvider(final Properties con @Override public AWSCredentials getCredentials() { return new BasicAWSCredentials( - configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID), - configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY)); + configProps.getProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID), + configProps.getProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)); } @Override diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index cfc3b9c528c25..d9d553b56f1bf 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -18,10 +18,13 @@ package org.apache.flink.streaming.connectors.kinesis.util; import com.amazonaws.regions.Regions; -import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; -import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; -import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; import java.util.Properties; @@ -33,59 +36,15 @@ public class KinesisConfigUtil { /** - * Checks that the values specified for config keys in the properties config is recognizable. + * Validate configuration properties for {@link FlinkKinesisConsumer}. */ - public static void validateConfiguration(Properties config) { + public static void validateConsumerConfiguration(Properties config) { checkNotNull(config, "config can not be null"); - if (!config.containsKey(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE)) { - // if the credential provider type is not specified, it will default to BASIC later on, - // so the Access Key ID and Secret Key must be given - if (!config.containsKey(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID) - || !config.containsKey(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY)) { - throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID+"') " + - "and Secret Key ('" + KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY + "') when using the BASIC AWS credential provider type."); - } - } else { - String credentialsProviderType = config.getProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE); - - // value specified for KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE needs to be recognizable - CredentialProviderType providerType; - try { - providerType = CredentialProviderType.valueOf(credentialsProviderType); - } catch (IllegalArgumentException e) { - StringBuilder sb = new StringBuilder(); - for (CredentialProviderType type : CredentialProviderType.values()) { - sb.append(type.toString()).append(", "); - } - throw new IllegalArgumentException("Invalid AWS Credential Provider Type set in config. Valid values are: " + sb.toString()); - } - - // if BASIC type is used, also check that the Access Key ID and Secret Key is supplied - if (providerType == CredentialProviderType.BASIC) { - if (!config.containsKey(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID) - || !config.containsKey(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY)) { - throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID+"') " + - "and Secret Key ('" + KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY + "') when using the BASIC AWS credential provider type."); - } - } - } + validateAwsConfiguration(config); - if (!config.containsKey(KinesisConfigConstants.CONFIG_AWS_REGION)) { - throw new IllegalArgumentException("The AWS region ('" + KinesisConfigConstants.CONFIG_AWS_REGION + "') must be set in the config."); - } else { - // specified AWS Region name must be recognizable - if (!AWSUtil.isValidRegion(config.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION))) { - StringBuilder sb = new StringBuilder(); - for (Regions region : Regions.values()) { - sb.append(region.getName()).append(", "); - } - throw new IllegalArgumentException("Invalid AWS region set in config. Valid values are: " + sb.toString()); - } - } - - if (config.containsKey(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE)) { - String initPosType = config.getProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE); + if (config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_POSITION)) { + String initPosType = config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION); // specified initial position in stream must be either LATEST or TRIM_HORIZON try { @@ -99,65 +58,112 @@ public static void validateConfiguration(Properties config) { } } - validateOptionalPositiveIntProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_MAX, + validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "Invalid value given for maximum records per getRecords shard operation. Must be a valid non-negative integer value."); - validateOptionalPositiveIntProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_RETRIES, + validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "Invalid value given for maximum retry attempts for getRecords shard operation. Must be a valid non-negative integer value."); - validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_BASE, - "Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value."); + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, + "Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value"); - validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_MAX, - "Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value."); + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, + "Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value"); - validateOptionalPositiveDoubleProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, - "Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value."); + validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, + "Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value"); - validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_INTERVAL_MILLIS, + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "Invalid value given for getRecords sleep interval in milliseconds. Must be a valid non-negative long value."); - validateOptionalPositiveIntProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_RETRIES, + validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "Invalid value given for maximum retry attempts for getShardIterator shard operation. Must be a valid non-negative integer value."); - validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_BASE, - "Invalid value given for get shard iterator operation base backoff milliseconds. Must be a valid non-negative long value."); + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, + "Invalid value given for get shard iterator operation base backoff milliseconds. Must be a valid non-negative long value"); - validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_MAX, - "Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value."); + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, + "Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value"); - validateOptionalPositiveDoubleProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, - "Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value."); + validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, + "Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value"); - validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS, - "Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value."); + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, + "Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value"); - validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_BASE, - "Invalid value given for describe stream operation base backoff milliseconds. Must be a valid non-negative long value."); + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, + "Invalid value given for describe stream operation base backoff milliseconds. Must be a valid non-negative long value"); + + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, + "Invalid value given for describe stream operation max backoff milliseconds. Must be a valid non-negative long value"); + + validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, + "Invalid value given for describe stream operation backoff exponential constant. Must be a valid non-negative double value"); + } - validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_MAX, - "Invalid value given for describe stream operation max backoff milliseconds. Must be a valid non-negative long value."); + /** + * Validate configuration properties for {@link FlinkKinesisProducer}. + */ + public static void validateProducerConfiguration(Properties config) { + checkNotNull(config, "config can not be null"); - validateOptionalPositiveDoubleProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, - "Invalid value given for describe stream operation backoff exponential constant. Must be a valid non-negative double value."); + validateAwsConfiguration(config); - validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT, + validateOptionalPositiveLongProperty(config, ProducerConfigConstants.COLLECTION_MAX_COUNT, "Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid non-negative long value."); - validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT, + validateOptionalPositiveLongProperty(config, ProducerConfigConstants.AGGREGATION_MAX_COUNT, "Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid non-negative long value."); } - public static SentinelSequenceNumber getInitialPositionAsSentinelSequenceNumber(Properties config) { - InitialPosition initialPosition = InitialPosition.valueOf(config.getProperty( - KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, InitialPosition.LATEST.toString())); + /** + * Validate configuration properties related to Amazon AWS service + */ + public static void validateAwsConfiguration(Properties config) { + if (!config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) { + // if the credential provider type is not specified, it will default to BASIC later on, + // so the Access Key ID and Secret Key must be given + if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID) + || !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) { + throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " + + "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type."); + } + } else { + String credentialsProviderType = config.getProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER); + + // value specified for AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be recognizable + CredentialProvider providerType; + try { + providerType = CredentialProvider.valueOf(credentialsProviderType); + } catch (IllegalArgumentException e) { + StringBuilder sb = new StringBuilder(); + for (CredentialProvider type : CredentialProvider.values()) { + sb.append(type.toString()).append(", "); + } + throw new IllegalArgumentException("Invalid AWS Credential Provider Type set in config. Valid values are: " + sb.toString()); + } + + // if BASIC type is used, also check that the Access Key ID and Secret Key is supplied + if (providerType == CredentialProvider.BASIC) { + if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID) + || !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) { + throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " + + "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type."); + } + } + } - switch (initialPosition) { - case TRIM_HORIZON: - return SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM; - case LATEST: - default: - return SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM; + if (!config.containsKey(AWSConfigConstants.AWS_REGION)) { + throw new IllegalArgumentException("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config."); + } else { + // specified AWS Region name must be recognizable + if (!AWSUtil.isValidRegion(config.getProperty(AWSConfigConstants.AWS_REGION))) { + StringBuilder sb = new StringBuilder(); + for (Regions region : Regions.values()) { + sb.append(region.getName()).append(", "); + } + throw new IllegalArgumentException("Invalid AWS region set in config. Valid values are: " + sb.toString()); + } } } diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 33c6c3634ee54..e80b71dde934b 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -20,7 +20,9 @@ import com.amazonaws.services.kinesis.model.Shard; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; @@ -56,19 +58,19 @@ public class FlinkKinesisConsumerTest { private ExpectedException exception = ExpectedException.none(); // ---------------------------------------------------------------------- - // FlinkKinesisConsumer.validatePropertiesConfig() tests + // FlinkKinesisConsumer.validateAwsConfiguration() tests // ---------------------------------------------------------------------- @Test public void testMissingAwsRegionInConfig() { exception.expect(IllegalArgumentException.class); - exception.expectMessage("The AWS region ('" + KinesisConfigConstants.CONFIG_AWS_REGION + "') must be set in the config."); + exception.expectMessage("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config."); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey"); + testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateAwsConfiguration(testConfig); } @Test @@ -77,36 +79,36 @@ public void testUnrecognizableAwsRegionInConfig() { exception.expectMessage("Invalid AWS region"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "wrongRegionId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId"); + testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateAwsConfiguration(testConfig); } @Test public void testCredentialProviderTypeDefaultToBasicButNoCredentialsSetInConfig() { exception.expect(IllegalArgumentException.class); - exception.expectMessage("Please set values for AWS Access Key ID ('"+KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID+"') " + - "and Secret Key ('" + KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY + "') when using the BASIC AWS credential provider type."); + exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " + + "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type."); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); + testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateAwsConfiguration(testConfig); } @Test public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() { exception.expect(IllegalArgumentException.class); - exception.expectMessage("Please set values for AWS Access Key ID ('"+KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID+"') " + - "and Secret Key ('" + KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY + "') when using the BASIC AWS credential provider type."); + exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " + + "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type."); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC"); + testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateAwsConfiguration(testConfig); } @Test @@ -115,27 +117,31 @@ public void testUnrecognizableCredentialProviderTypeInConfig() { exception.expectMessage("Invalid AWS Credential Provider Type"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "wrongProviderType"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType"); + testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateAwsConfiguration(testConfig); } + // ---------------------------------------------------------------------- + // FlinkKinesisConsumer.validateConsumerConfiguration() tests + // ---------------------------------------------------------------------- + @Test public void testUnrecognizableStreamInitPositionTypeInConfig() { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid initial position in stream"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "wrongInitPosition"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -144,12 +150,12 @@ public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() { exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -158,12 +164,12 @@ public void testUnparsableLongForDescribeStreamBackoffMaxMillisInConfig() { exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -172,12 +178,12 @@ public void testUnparsableDoubleForDescribeStreamBackoffExponentialConstantInCon exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -186,12 +192,12 @@ public void testUnparsableIntForGetRecordsRetriesInConfig() { exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_RETRIES, "unparsableInt"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -200,12 +206,12 @@ public void testUnparsableIntForGetRecordsMaxCountInConfig() { exception.expectMessage("Invalid value given for maximum records per getRecords shard operation"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_MAX, "unparsableInt"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -214,12 +220,12 @@ public void testUnparsableLongForGetRecordsBackoffBaseMillisInConfig() { exception.expectMessage("Invalid value given for get records operation base backoff milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -228,12 +234,12 @@ public void testUnparsableLongForGetRecordsBackoffMaxMillisInConfig() { exception.expectMessage("Invalid value given for get records operation max backoff milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -242,12 +248,12 @@ public void testUnparsableDoubleForGetRecordsBackoffExponentialConstantInConfig( exception.expectMessage("Invalid value given for get records operation backoff exponential constant"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -256,12 +262,12 @@ public void testUnparsableLongForGetRecordsIntervalMillisInConfig() { exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateProducerConfiguration(testConfig); } @Test @@ -270,12 +276,12 @@ public void testUnparsableIntForGetShardIteratorRetriesInConfig() { exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_RETRIES, "unparsableInt"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -284,12 +290,12 @@ public void testUnparsableLongForGetShardIteratorBackoffBaseMillisInConfig() { exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -298,12 +304,12 @@ public void testUnparsableLongForGetShardIteratorBackoffMaxMillisInConfig() { exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -312,12 +318,12 @@ public void testUnparsableDoubleForGetShardIteratorBackoffExponentialConstantInC exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -326,12 +332,44 @@ public void testUnparsableLongForShardDiscoveryIntervalMillisInConfig() { exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong"); + + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + } + + // ---------------------------------------------------------------------- + // FlinkKinesisConsumer.validateProducerConfiguration() tests + // ---------------------------------------------------------------------- + + @Test + public void testUnparsableLongForCollectionMaxCountInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for maximum number of items to pack into a PutRecords request"); + + Properties testConfig = new Properties(); + testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "unparsableLong"); + + KinesisConfigUtil.validateProducerConfiguration(testConfig); + } + + @Test + public void testUnparsableLongForAggregationMaxCountInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for maximum number of items to pack into an aggregated record"); + + Properties testConfig = new Properties(); + testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateProducerConfiguration(testConfig); } // ---------------------------------------------------------------------- @@ -341,9 +379,9 @@ public void testUnparsableLongForShardDiscoveryIntervalMillisInConfig() { @Test public void testSnapshotStateShouldBeNullIfSourceNotOpened() throws Exception { Properties config = new Properties(); - config.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); + config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); FlinkKinesisConsumer consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); @@ -353,9 +391,9 @@ public void testSnapshotStateShouldBeNullIfSourceNotOpened() throws Exception { @Test public void testSnapshotStateShouldBeNullIfSourceNotRun() throws Exception { Properties config = new Properties(); - config.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); + config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); FlinkKinesisConsumer consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); consumer.open(new Configuration()); // only opened, not run diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java index 7239c3706ee99..6e02a5590d3b6 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java @@ -23,7 +23,8 @@ import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; @@ -53,9 +54,9 @@ public static void main(String[] args) throws Exception { DataStream simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator()); Properties kinesisProducerConfig = new Properties(); - kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, pt.getRequired("region")); - kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, pt.getRequired("accessKey")); - kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, pt.getRequired("secretKey")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey")); FlinkKinesisProducer kinesis = new FlinkKinesisProducer<>( new KinesisSerializationSchema() { @@ -91,9 +92,9 @@ public String getPartitionId(String element) { // consuming topology Properties consumerProps = new Properties(); - consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, pt.getRequired("accessKey")); - consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, pt.getRequired("secretKey")); - consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, pt.getRequired("region")); + consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey")); + consumerProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey")); + consumerProps.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region")); DataStream consuming = see.addSource(new FlinkKinesisConsumer<>("test-flink", new SimpleStringSchema(), consumerProps)); // validate consumed records for correctness consuming.flatMap(new FlatMapFunction() { diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java index 409124c6f7e60..37059437c39c2 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java @@ -21,7 +21,7 @@ import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread; import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread; import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; @@ -57,9 +57,9 @@ public static void main(String[] args) throws Exception { final String region = pt.getRequired("region"); Properties configProps = new Properties(); - configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, accessKey); - configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, secretKey); - configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, region); + configProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, accessKey); + configProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey); + configProps.setProperty(AWSConfigConstants.AWS_REGION, region); AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps); // create a stream for the test: diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java index 7bcc8064ea52e..934a7955735fe 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java @@ -27,7 +27,7 @@ import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread; import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; @@ -68,10 +68,10 @@ public static void main(String[] args) throws Exception { final String region = pt.getRequired("region"); final Properties configProps = new Properties(); - configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, accessKey); - configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, secretKey); - configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, region); - configProps.setProperty(KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS, "0"); + configProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, accessKey); + configProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey); + configProps.setProperty(ConsumerConfigConstants.AWS_REGION, region); + configProps.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "0"); final AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps); // the stream is first created with 1 shard diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java index 325686814c03e..35e9ef6c0c2bf 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java @@ -21,7 +21,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema; @@ -51,9 +51,9 @@ public static void main(String[] args) throws Exception { DataStream simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator()); Properties kinesisProducerConfig = new Properties(); - kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, pt.getRequired("region")); - kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, pt.getRequired("accessKey")); - kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, pt.getRequired("secretKey")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey")); FlinkKinesisProducer kinesis = new FlinkKinesisProducer<>( new KinesisSerializationSchema() { diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java index 911710f8f3b29..157964c505580 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java @@ -25,8 +25,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; -import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; @@ -71,11 +70,11 @@ public void run() { // consuming topology Properties consumerProps = new Properties(); - consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, awsAccessKey); - consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, awsSecretKey); - consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, awsRegion); + consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey); + consumerProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretKey); + consumerProps.setProperty(ConsumerConfigConstants.AWS_REGION, awsRegion); // start reading from beginning - consumerProps.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, InitialPosition.TRIM_HORIZON.name()); + consumerProps.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name()); DataStream consuming = see.addSource(new FlinkKinesisConsumer<>(kinesisStreamName, new SimpleStringSchema(), consumerProps)); consuming .flatMap(new ArtificialFailOnceFlatMapper(failAtRecordCount)) diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java index 696d9ca2b68d8..fdfdfe1e870b5 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java @@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,9 +58,9 @@ public void run() { DataStream simpleStringStream = see.addSource(new KinesisEventsGeneratorProducerThread.EventsGenerator(totalEventCount)).setParallelism(1); Properties producerProps = new Properties(); - producerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, awsAccessKey); - producerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, awsSecretKey); - producerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, awsRegion); + producerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey); + producerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretKey); + producerProps.setProperty(AWSConfigConstants.AWS_REGION, awsRegion); FlinkKinesisProducer kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerProps); From 0cbcebb97b5441fc79bc26d07ea3ca3a826f3c1b Mon Sep 17 00:00:00 2001 From: Gordon Tai Date: Wed, 13 Jul 2016 00:30:33 +0800 Subject: [PATCH 2/2] [FLINK-4170] Fix failing test --- .../streaming/connectors/kinesis/FlinkKinesisConsumerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index e80b71dde934b..dbf95f9eb2d3f 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -267,7 +267,7 @@ public void testUnparsableLongForGetRecordsIntervalMillisInConfig() { testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong"); - KinesisConfigUtil.validateProducerConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test