Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport sink test cleanups to camel-master #972

Merged
merged 22 commits into from Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0e38f6d
Make the AWS v2 sink test support reusable
orpiske Feb 3, 2021
46dd93b
Convert the AWS v1 tests to the new reusable sink test base class
orpiske Feb 3, 2021
e794076
Convert the Azure storage blob tests to the new reusable sink test ba…
orpiske Feb 3, 2021
0c9b89f
Convert the Azure storage queue tests to the new reusable sink test b…
orpiske Feb 3, 2021
46dd1fc
Convert the Cassandra tests to the new reusable sink test base class
orpiske Feb 3, 2021
3526aec
Convert the Couchbase tests to the new reusable sink test base class
orpiske Feb 3, 2021
c749baa
Convert the ElasticSearch tests to the new reusable sink test base class
orpiske Feb 3, 2021
5b3b4da
Convert the File tests to the new reusable sink test base class
orpiske Feb 3, 2021
88464ac
Convert the HDFS tests to the new reusable sink test base class
orpiske Feb 3, 2021
9c528b4
Convert the HTTP tests to the new reusable sink test base class
orpiske Feb 3, 2021
c87a85e
Convert the JDBC tests to the new reusable sink test base class
orpiske Feb 3, 2021
132f4a4
Convert the MongoDB tests to the new reusable sink test base class
orpiske Feb 3, 2021
4cbe252
Convert the RabbitMQ tests to the new reusable sink test base class
orpiske Feb 3, 2021
25a3463
Removed unused variables from SJMS2 DQL sink test
orpiske Feb 3, 2021
31bf18c
Convert the SJMS2 tests to the new reusable sink test base class
orpiske Feb 3, 2021
6ca77cf
Convert the SQL tests to the new reusable sink test base class
orpiske Feb 3, 2021
a84f8c7
Convert the SSH tests to the new reusable sink test base class
orpiske Feb 3, 2021
7824354
Convert the Syslog tests to the new reusable sink test base class
orpiske Feb 3, 2021
ddc799d
Fix error message check for SJMS2 startup on error test
orpiske Feb 4, 2021
55ea26a
Temporarily disabled a error handling test for SJMS2 due to failures …
orpiske Feb 4, 2021
ffbb576
Disabled MongoDB source test case due to GH issue #974
orpiske Feb 4, 2021
ce3abfb
Disabled Azure storage queue tests due to GH issue #976
orpiske Feb 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -18,19 +18,16 @@
package org.apache.camel.kafkaconnector.aws.v1.sns.sink;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.model.Message;
import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.clients.AWSClientUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
Expand All @@ -52,7 +49,7 @@

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
public class CamelSinkAWSSNSITCase extends AbstractKafkaTest {
public class CamelSinkAWSSNSITCase extends CamelSinkTestSupport {

@RegisterExtension
public static AWSService service = AWSServiceFactory.createSNSService();
Expand All @@ -61,6 +58,7 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest {
private AWSSQSClient awsSqsClient;
private String sqsQueueUrl;
private String queueName;
private String topicName;

private volatile int received;
private final int expect = 10;
Expand All @@ -72,16 +70,31 @@ protected String[] getConnectorsInTest() {

@BeforeEach
public void setUp() {
awsSqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient());
topicName = getTopicForTest(this);

awsSqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient());
queueName = AWSCommon.DEFAULT_SQS_QUEUE_FOR_SNS + "-" + TestUtils.randomWithRange(0, 1000);
sqsQueueUrl = awsSqsClient.getQueue(queueName);

LOG.info("Created SQS queue {}", sqsQueueUrl);

received = 0;
}

@Override
protected Map<String, String> messageHeaders(String text, int current) {
return null;
}

@Override
protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
if (latch.await(120, TimeUnit.SECONDS)) {
assertEquals(expect, received,
"Didn't process the expected amount of messages: " + received + " != " + expect);
} else {
fail("Failed to receive the messages within the specified time");
}
}

private boolean checkMessages(List<Message> messages) {
for (Message message : messages) {
LOG.info("Received: {}", message.getBody());
Expand All @@ -96,7 +109,8 @@ private boolean checkMessages(List<Message> messages) {
return true;
}

private void consumeMessages(CountDownLatch latch) {
@Override
protected void consumeMessages(CountDownLatch latch) {
try {
awsSqsClient.receiveFrom(sqsQueueUrl, this::checkMessages);
} catch (Throwable t) {
Expand All @@ -107,92 +121,53 @@ private void consumeMessages(CountDownLatch latch) {
}
}

public void runTest(ConnectorPropertyFactory connectorPropertyFactory)
throws ExecutionException, InterruptedException {
connectorPropertyFactory.log();

getKafkaConnectService().initializeConnector(connectorPropertyFactory);

ExecutorService service = Executors.newCachedThreadPool();

LOG.debug("Creating the consumer ...");
CountDownLatch latch = new CountDownLatch(1);
service.submit(() -> consumeMessages(latch));

KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());

for (int i = 0; i < expect; i++) {
kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
}
@Test
@Timeout(value = 90)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = service.getConnectionProperties();

LOG.debug("Created the consumer ... About to receive messages");
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
.withName("CamelAWSSNSSinkConnectorDefault")
.withTopics(topicName)
.withTopicOrArn(queueName)
.withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName())
.withAmazonConfig(amazonProperties);

if (latch.await(120, TimeUnit.SECONDS)) {
assertEquals(expect, received,
"Didn't process the expected amount of messages: " + received + " != " + expect);
} else {
fail("Failed to receive the messages within the specified time");
}
runTest(connectorPropertyFactory, topicName, expect);
}

@Test
@Timeout(value = 90)
public void testBasicSendReceive() {
try {
Properties amazonProperties = service.getConnectionProperties();

ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
.withName("CamelAWSSNSSinkConnectorDefault")
.withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withTopicOrArn(queueName)
.withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName())
.withAmazonConfig(amazonProperties);

runTest(connectorPropertyFactory);
} catch (Exception e) {
LOG.error("Amazon SNS test failed: {}", e.getMessage(), e);
fail(e.getMessage());
}
}
public void testBasicSendReceiveUsingKafkaStyle() throws Exception {
Properties amazonProperties = service.getConnectionProperties();

@Test
@Timeout(value = 90)
public void testBasicSendReceiveUsingKafkaStyle() {
try {
Properties amazonProperties = service.getConnectionProperties();

ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
.withName("CamelAWSSNSSinkKafkaStyleConnector")
.withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withTopicOrArn(queueName)
.withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName())
.withAmazonConfig(amazonProperties, CamelAWSSNSPropertyFactory.KAFKA_STYLE);

runTest(connectorPropertyFactory);
} catch (Exception e) {
LOG.error("Amazon SNS test failed: {}", e.getMessage(), e);
fail(e.getMessage());
}
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
.withName("CamelAWSSNSSinkKafkaStyleConnector")
.withTopics(topicName)
.withTopicOrArn(queueName)
.withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName())
.withAmazonConfig(amazonProperties, CamelAWSSNSPropertyFactory.KAFKA_STYLE);

runTest(connectorPropertyFactory, topicName, expect);
}

@Disabled("AWS SNS component is failing to parse the sink URL for this one")
@Test
@Timeout(value = 90)
public void testBasicSendReceiveUsingUrl() {
try {
Properties amazonProperties = service.getConnectionProperties();
public void testBasicSendReceiveUsingUrl() throws Exception {
Properties amazonProperties = service.getConnectionProperties();

ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
.withName("CamelAWSSNSSinkKafkaStyleConnector")
.withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withUrl(queueName)
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
.withName("CamelAWSSNSSinkKafkaStyleConnector")
.withTopics(topicName)
.withUrl(queueName)
.append("queueUrl", sqsQueueUrl).append("subscribeSNStoSQS", "true")
.append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
.append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()))
.append("configuration", "#class:" + TestSNSConfiguration.class.getName()).buildUrl();
.append("configuration", "#class:" + TestSNSConfiguration.class.getName())
.buildUrl();

runTest(connectorPropertyFactory);
} catch (Exception e) {
LOG.error("Amazon SNS test failed: {}", e.getMessage(), e);
fail(e.getMessage());
}
runTest(connectorPropertyFactory, topicName, expect);
}
}