Skip to content

Commit

Permalink
Merge pull request #308 from orpiske/fix-aws-cases-remote-execution
Browse files Browse the repository at this point in the history
Fixes AWS test cases when running with remote AWS services
  • Loading branch information
oscerd committed Jun 26, 2020
2 parents 5d0eee5 + 9957652 commit a27894e
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 13 deletions.
Expand Up @@ -170,7 +170,7 @@ public void testBasicSendReceiveUsingUrl() throws ExecutionException, Interrupte
.withUrl(AWSCommon.DEFAULT_S3_BUCKET)
.append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
.append("proxyProtocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL))
.appendIfAvailable("proxyProtocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL))
.append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()))
.buildUrl();

Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,6 +57,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSQSITCase.class);

private AWSSQSClient awssqsClient;
private String queueName;

private volatile int received;
private final int expect = 10;
Expand All @@ -69,7 +71,9 @@ protected String[] getConnectorsInTest() {
public void setUp() {
awssqsClient = awsService.getClient();

String queueUrl = awssqsClient.getQueue(AWSCommon.DEFAULT_SQS_QUEUE);
queueName = AWSCommon.DEFAULT_SQS_QUEUE + "-" + TestUtils.randomWithRange(0, 1000);
String queueUrl = awssqsClient.getQueue(queueName);

LOG.debug("Using queue {} for the test", queueUrl);

received = 0;
Expand All @@ -78,7 +82,7 @@ public void setUp() {
@AfterEach
public void tearDown() {
deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
if (!awssqsClient.deleteQueue(AWSCommon.DEFAULT_SQS_QUEUE)) {
if (!awssqsClient.deleteQueue(queueName)) {
fail("Failed to delete queue");
}
}
Expand All @@ -100,7 +104,7 @@ private boolean checkMessages(List<Message> messages) {

private void consumeMessages(CountDownLatch latch) {
try {
awssqsClient.receive(AWSCommon.DEFAULT_SQS_QUEUE, this::checkMessages);
awssqsClient.receive(queueName, this::checkMessages);
} catch (Throwable t) {
LOG.error("Failed to consume messages: {}", t.getMessage(), t);
} finally {
Expand Down Expand Up @@ -147,7 +151,6 @@ public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws Ex

@Test
@Timeout(value = 120)
@RepeatedTest(3)
public void testBasicSendReceive() {
try {
Properties amazonProperties = awsService.getConnectionProperties();
Expand All @@ -157,7 +160,7 @@ public void testBasicSendReceive() {
.withName("CamelAwssqsSinkConnectorSpringBootStyle")
.withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
.withAmazonConfig(amazonProperties)
.withQueueNameOrArn(AWSCommon.DEFAULT_SQS_QUEUE);
.withQueueNameOrArn(queueName);

runTest(testProperties);

Expand All @@ -167,6 +170,7 @@ public void testBasicSendReceive() {
}
}

@DisabledIfSystemProperty(named = "aws-service.instance.type", matches = "remote")
@Test
@Timeout(value = 120)
@RepeatedTest(3)
Expand All @@ -179,7 +183,7 @@ public void testBasicSendReceiveUsingKafkaStyle() {
.withName("CamelAwssqsSinkConnectorKafkaStyle")
.withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
.withAmazonConfig(amazonProperties, CamelAWSSQSPropertyFactory.KAFKA_STYLE)
.withQueueNameOrArn(AWSCommon.DEFAULT_SQS_QUEUE);
.withQueueNameOrArn(queueName);

runTest(testProperties);

Expand All @@ -189,6 +193,7 @@ public void testBasicSendReceiveUsingKafkaStyle() {
}
}

@DisabledIfSystemProperty(named = "aws-service.instance.type", matches = "remote")
@Test
@Timeout(value = 120)
@RepeatedTest(3)
Expand All @@ -200,7 +205,7 @@ public void testBasicSendReceiveUsingUrl() {
.basic()
.withName("CamelAwssqsSinkConnectorUsingUrl")
.withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
.withUrl(AWSCommon.DEFAULT_SQS_QUEUE)
.withUrl(queueName)
.append("autoCreateQueue", "true")
.append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
Expand Down
Expand Up @@ -52,6 +52,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class);

private AWSSQSClient awssqsClient;
private String queueName;

private volatile int received;
private final int expect = 10;
Expand All @@ -64,14 +65,15 @@ protected String[] getConnectorsInTest() {
@BeforeEach
public void setUp() {
awssqsClient = service.getClient();
queueName = AWSCommon.DEFAULT_SQS_QUEUE + "-" + TestUtils.randomWithRange(0, 1000);
received = 0;
}

@AfterEach
public void tearDown() {
deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));

if (!awssqsClient.deleteQueue(AWSCommon.DEFAULT_SQS_QUEUE)) {
if (!awssqsClient.deleteQueue(queueName)) {
fail("Failed to delete queue");
}
}
Expand All @@ -93,7 +95,7 @@ public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws Ex

LOG.debug("Sending SQS messages");
for (int i = 0; i < expect; i++) {
awssqsClient.send(AWSCommon.DEFAULT_SQS_QUEUE, "Source test message " + i);
awssqsClient.send(queueName, "Source test message " + i);
}
LOG.debug("Done sending SQS messages");

Expand All @@ -111,7 +113,7 @@ public void testBasicSendReceive() throws ExecutionException, InterruptedExcepti
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withQueueOrArn(AWSCommon.DEFAULT_SQS_QUEUE)
.withQueueOrArn(queueName)
.withAmazonConfig(service.getConnectionProperties());

runTest(connectorPropertyFactory);
Expand All @@ -126,7 +128,7 @@ public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, Inte
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withQueueOrArn(AWSCommon.DEFAULT_SQS_QUEUE)
.withQueueOrArn(queueName)
.withAmazonConfig(service.getConnectionProperties(), CamelAWSSQSPropertyFactory.KAFKA_STYLE);

runTest(connectorPropertyFactory);
Expand All @@ -143,7 +145,7 @@ public void testBasicSendReceiveUsingUrl() throws ExecutionException, Interrupte
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withUrl(AWSCommon.DEFAULT_SQS_QUEUE)
.withUrl(queueName)
.append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
.append("protocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL))
Expand Down

0 comments on commit a27894e

Please sign in to comment.