Skip to content

Commit

Permalink
Added AWS v2 SNS sink integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
orpiske committed Feb 15, 2021
1 parent e783715 commit 3cf956d
Show file tree
Hide file tree
Showing 4 changed files with 306 additions and 0 deletions.
5 changes: 5 additions & 0 deletions tests/itests-aws-v2/pom.xml
Expand Up @@ -87,6 +87,11 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws2-kms</artifactId>
</dependency>

<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws2-sns</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kafkaconnector.aws.v2.sns.sink;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
import org.apache.camel.test.infra.aws.common.AWSConfigs;
import software.amazon.awssdk.regions.Region;

/**
* Creates the set of properties used by a Camel JMS Sink Connector
*/
final class CamelAWSSNSPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSSNSPropertyFactory> {
public static final Map<String, String> SPRING_STYLE = new HashMap<>();
public static final Map<String, String> KAFKA_STYLE = new HashMap<>();

static {
SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-sns.accessKey");
SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-sns.secretKey");
SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-sns.region");

KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-sns.access-key");
KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-sns.secret-key");
KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-sns.region");
}

private CamelAWSSNSPropertyFactory() {
}

public EndpointUrlBuilder<CamelAWSSNSPropertyFactory> withUrl(String topicOrArn) {
String sinkUrl = String.format("aws2-sns:%s", topicOrArn);

return new EndpointUrlBuilder<>(this::withSinkUrl, sinkUrl);
}

public CamelAWSSNSPropertyFactory withTopicOrArn(String topicOrArn) {
return setProperty("camel.sink.path.topicNameOrArn", topicOrArn);
}

public CamelAWSSNSPropertyFactory withSubscribeSNStoSQS(String queue) {
return setProperty("camel.sink.endpoint.subscribeSNStoSQS", "true").setProperty("camel.sink.endpoint.queueUrl",
queue);
}

public CamelAWSSNSPropertyFactory withAmazonConfig(Properties amazonConfigs) {
return withAmazonConfig(amazonConfigs, this.SPRING_STYLE);
}

public CamelAWSSNSPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
String accessKeyKey = style.get(AWSConfigs.ACCESS_KEY);
String secretKeyKey = style.get(AWSConfigs.SECRET_KEY);
String regionKey = style.get(AWSConfigs.REGION);

setProperty(accessKeyKey, amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
setProperty(secretKeyKey, amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));
return setProperty(regionKey, amazonConfigs.getProperty(AWSConfigs.REGION, Region.US_EAST_1.id()));
}

public CamelAWSSNSPropertyFactory withConfiguration(String configurationClass) {
return setProperty("camel.component.aws2-sns.configuration", classRef(configurationClass));
}

public static CamelAWSSNSPropertyFactory basic() {
return new CamelAWSSNSPropertyFactory().withName("CamelAWS2SNSSinkConnector")
.withTasksMax(1)
.withConnectorClass("org.apache.camel.kafkaconnector.aws2sns.CamelAws2snsSinkConnector")
.withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
.withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
}
}
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kafkaconnector.aws.v2.sns.sink;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSQSClient;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.AWSConfigs;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.model.Message;

import static org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFactory.classRef;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
public class CamelSinkAWSSNSITCase extends CamelSinkTestSupport {
@RegisterExtension
public static AWSService service = AWSServiceFactory.createSNSService();

private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSNSITCase.class);

private AWSSQSClient awsSqsClient;
private String sqsQueueUrl;
private String queueName;

private volatile int received;
private final int expect = 10;

@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-aws2-sns-kafka-connector"};
}

@BeforeEach
public void setUp() {
awsSqsClient = new AWSSQSClient(AWSSDKClientUtils.newSQSClient());

queueName = AWSCommon.DEFAULT_SQS_QUEUE_FOR_SNS + "-" + TestUtils.randomWithRange(0, 1000);
sqsQueueUrl = awsSqsClient.createQueue(queueName);

LOG.info("Created SQS queue {}", sqsQueueUrl);

received = 0;
}

@Override
protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
if (latch.await(120, TimeUnit.SECONDS)) {
assertEquals(expect, received,
"Didn't process the expected amount of messages: " + received + " != " + expect);
} else {
fail("Failed to receive the messages within the specified time");
}
}

private boolean checkMessages(List<Message> messages) {
for (Message message : messages) {
LOG.info("Received: {}", message.body());

received++;
}

if (received == expect) {
return false;
}

return true;
}

protected void consumeMessages(CountDownLatch latch) {
try {
awsSqsClient.receive(sqsQueueUrl, this::checkMessages);
} catch (Throwable t) {
LOG.error("Failed to consume messages: {}", t.getMessage(), t);
fail(t.getMessage());
} finally {
latch.countDown();
}
}

@Test
@Timeout(value = 90)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = service.getConnectionProperties();
String topicName = getTopicForTest(this.getClass());

ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory
.basic()
.withName("CamelAWSSNSSinkConnectorDefault")
.withTopics(topicName)
.withTopicOrArn(queueName)
.withSubscribeSNStoSQS(sqsQueueUrl)
.withConfiguration(TestSnsConfiguration.class.getName())
.withAmazonConfig(amazonProperties);

runTest(connectorPropertyFactory, topicName, expect);
}

@Test
@Timeout(value = 90)
public void testBasicSendReceiveUsingKafkaStyle() throws Exception {
Properties amazonProperties = service.getConnectionProperties();
String topicName = getTopicForTest(this.getClass());

ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory
.basic()
.withName("CamelAWSSNSSinkKafkaStyleConnector")
.withTopics(topicName)
.withTopicOrArn(queueName)
.withSubscribeSNStoSQS(sqsQueueUrl)
.withConfiguration(TestSnsConfiguration.class.getName())
.withAmazonConfig(amazonProperties, CamelAWSSNSPropertyFactory.KAFKA_STYLE);

runTest(connectorPropertyFactory, topicName, expect);
}

@Test
@Timeout(value = 90)
public void testBasicSendReceiveUsingUrl() throws Exception {
Properties amazonProperties = service.getConnectionProperties();
String topicName = getTopicForTest(this.getClass());

ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory
.basic()
.withName("CamelAWSSNSSinkKafkaStyleConnectorWithUrl")
.withTopics(topicName)
.withUrl(queueName)
.append("queueUrl", sqsQueueUrl)
.append("subscribeSNStoSQS", "true")
.append("region", amazonProperties.getProperty(AWSConfigs.REGION, Region.US_EAST_1.id()))
.append("configuration", classRef(TestSnsConfiguration.class.getName()))
.buildUrl();

runTest(connectorPropertyFactory, topicName, expect);

}
}
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kafkaconnector.aws.v2.sns.sink;

import org.apache.camel.component.aws2.sns.Sns2Configuration;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import software.amazon.awssdk.services.sns.SnsClient;

public class TestSnsConfiguration extends Sns2Configuration {
private SnsClient snsClient;

public TestSnsConfiguration() {
snsClient = AWSSDKClientUtils.newSNSClient();
}

@Override
public void setAmazonSNSClient(SnsClient amazonSNSClient) {
// NO-OP
}

@Override
public SnsClient getAmazonSNSClient() {
return snsClient;
}
}

0 comments on commit 3cf956d

Please sign in to comment.