From 8523d4bb283edf6974fc9b5ca58ed381459f1a11 Mon Sep 17 00:00:00 2001 From: eskabetxe Date: Wed, 13 Dec 2017 12:43:27 +0100 Subject: [PATCH 1/2] FLINK-8249: setting region on kinesisProducerConfiguration and testig --- .../connectors/kinesis/util/KinesisConfigUtil.java | 1 + .../connectors/kinesis/util/KinesisConfigUtilTest.java | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index cadde8da3ae29..6c91206044883 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -191,6 +191,7 @@ public static KinesisProducerConfiguration getValidatedProducerConfiguration(Pro } KinesisProducerConfiguration kpc = KinesisProducerConfiguration.fromProperties(config); + kpc.setRegion(config.getProperty(AWSConfigConstants.AWS_REGION)); kpc.setCredentialsProvider(AWSUtil.getCredentialsProvider(config)); diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java index b52dce2cb7bf6..2eddc612809b9 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java @@ -131,6 +131,16 @@ public void testReplaceDeprecatedKeys() { assertEquals("2", replacedConfig.getProperty(KinesisConfigUtil.COLLECTION_MAX_COUNT)); } + @Test + public void testCorrectlySetRegionInProducerConfiguration() { + String region = "us-east-1"; + Properties testConfig = new Properties(); + testConfig.setProperty(AWSConfigConstants.AWS_REGION, region); + KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig); + + assertEquals("region not expected", region, kpc.getRegion()); + } + // ---------------------------------------------------------------------- // validateAwsConfiguration() tests // ---------------------------------------------------------------------- From 72be9da2048d4e2cb6c855ce16c9652556e1f762 Mon Sep 17 00:00:00 2001 From: eskabetxe Date: Wed, 13 Dec 2017 13:17:38 +0100 Subject: [PATCH 2/2] change test message --- .../connectors/kinesis/util/KinesisConfigUtilTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java index 2eddc612809b9..053b0c3bfa4e8 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java @@ -138,7 +138,7 @@ public void testCorrectlySetRegionInProducerConfiguration() { testConfig.setProperty(AWSConfigConstants.AWS_REGION, region); KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig); - assertEquals("region not expected", region, kpc.getRegion()); + assertEquals("incorrect region", region, kpc.getRegion()); } // ----------------------------------------------------------------------