From e212370741d93454100aa442d309a68bb42f7052 Mon Sep 17 00:00:00 2001 From: Otavio Rodolfo Piske Date: Mon, 25 Jan 2021 12:47:16 +0100 Subject: [PATCH] Added AWS v2 IAM sink test case --- tests/itests-aws-v2/pom.xml | 5 + .../iam/sink/CamelAWSIAMPropertyFactory.java | 73 +++++++++ .../v2/iam/sink/CamelSinkAWSIAMITCase.java | 140 ++++++++++++++++++ .../aws/v2/iam/sink/TestIAMConfiguration.java | 35 +++++ 4 files changed, 253 insertions(+) create mode 100644 tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelAWSIAMPropertyFactory.java create mode 100644 tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java create mode 100644 tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/TestIAMConfiguration.java diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml index 8669c50773..48f40f9805 100644 --- a/tests/itests-aws-v2/pom.xml +++ b/tests/itests-aws-v2/pom.xml @@ -78,6 +78,11 @@ camel-aws2-ec2 + + org.apache.camel + camel-aws2-iam + + diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelAWSIAMPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelAWSIAMPropertyFactory.java new file mode 100644 index 0000000000..1bfc871d7d --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelAWSIAMPropertyFactory.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.aws.v2.iam.sink; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils; +import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; +import org.apache.camel.test.infra.aws.common.AWSConfigs; + +public class CamelAWSIAMPropertyFactory extends SinkConnectorPropertyFactory { + public static final Map SPRING_STYLE = new HashMap<>(); + public static final Map KAFKA_STYLE = new HashMap<>(); + + static { + SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-iam.accessKey"); + SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-iam.secretKey"); + SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-iam.region"); + + KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-iam.access-key"); + KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-iam.secret-key"); + KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-iam.region"); + } + + public CamelAWSIAMPropertyFactory withSinkPathLabel(String value) { + return setProperty("camel.sink.path.label", value); + } + + public CamelAWSIAMPropertyFactory withSinkEndpointOperation(String value) { + return setProperty("camel.sink.endpoint.operation", value); + } + + public CamelAWSIAMPropertyFactory withConfiguration(String value) { + return setProperty("camel.component.aws2-iam.configuration", classRef(value)); + } + + public CamelAWSIAMPropertyFactory withAmazonConfig(Properties amazonConfigs) { + return withAmazonConfig(amazonConfigs, this.SPRING_STYLE); + } + + public CamelAWSIAMPropertyFactory withAmazonConfig(Properties amazonConfigs, Map style) { + AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this); + + return this; + } + + public static CamelAWSIAMPropertyFactory basic() { + return new CamelAWSIAMPropertyFactory() + .withTasksMax(1) + .withName("CamelAws2iamSinkConnector") + .withConnectorClass("org.apache.camel.kafkaconnector.aws2iam.CamelAws2iamSinkConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + + } +} diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java new file mode 100644 index 0000000000..3627addfd4 --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.aws.v2.iam.sink; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.kafkaconnector.CamelSinkTask; +import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport; +import org.apache.camel.kafkaconnector.aws.v2.cw.sink.TestCloudWatchConfiguration; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.test.infra.aws.common.services.AWSService; +import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; +import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.iam.IamClient; +import software.amazon.awssdk.services.iam.model.ListUsersResponse; +import software.amazon.awssdk.services.iam.model.User; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +public class CamelSinkAWSIAMITCase extends CamelSinkAWSTestSupport { + @RegisterExtension + public static AWSService awsService = AWSServiceFactory.createIAMService(); + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSIAMITCase.class); + + private IamClient client; + private String logicalName; + + private volatile int received; + private final int expect = 10; + + @Override + protected Map messageHeaders(String text, int current) { + Map headers = new HashMap<>(); + + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsIAMUsername", + "username-" + current); + + return headers; + } + + @Override + protected void consumeMessages(CountDownLatch latch) { + try { + while (true) { + ListUsersResponse response = client.listUsers(); + + List users = response.users(); + + received = users.size(); + for (User user : users) { + LOG.info("Received user: {}", user.userName()); + + if (received >= expect) { + return; + } + } + + if (!waitForData()) { + return; + } + } + } finally { + latch.countDown(); + } + } + + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + if (latch.await(110, TimeUnit.SECONDS)) { + assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect); + } else { + fail(String.format("Failed to receive the messages within the specified time: received %d of %d", + received, expect)); + } + } + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-aws2-iam-kafka-connector"}; + } + + @BeforeEach + public void setUp() { + client = AWSSDKClientUtils.newIAMClient(); + logicalName = "iam-" + TestUtils.randomWithRange(1, 100); + + received = 0; + } + + @Test + @Timeout(90) + public void testBasicSendReceive() { + try { + Properties amazonProperties = awsService.getConnectionProperties(); + String topicName = TestUtils.getDefaultTestTopic(this.getClass()); + + ConnectorPropertyFactory testProperties = CamelAWSIAMPropertyFactory + .basic() + .withTopics(topicName) + .withConfiguration(TestCloudWatchConfiguration.class.getName()) + .withAmazonConfig(amazonProperties) + .withSinkPathLabel(logicalName) + .withConfiguration(TestIAMConfiguration.class.getName()) + .withSinkEndpointOperation("createUser"); + + runTest(testProperties, topicName, expect); + } catch (Exception e) { + LOG.error("Amazon IAM test failed: {}", e.getMessage(), e); + fail(e.getMessage()); + } + } +} diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/TestIAMConfiguration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/TestIAMConfiguration.java new file mode 100644 index 0000000000..55c26cce0e --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/TestIAMConfiguration.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.aws.v2.iam.sink; + +import org.apache.camel.component.aws2.iam.IAM2Configuration; +import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; +import software.amazon.awssdk.services.iam.IamClient; + +public class TestIAMConfiguration extends IAM2Configuration { + private IamClient client; + + @Override + public IamClient getIamClient() { + if (client == null) { + client = AWSSDKClientUtils.newIAMClient(); + } + + return client; + } +}