Skip to content

Commit

Permalink
[aws-sqs] Fix tests to work with aws credentials
Browse files Browse the repository at this point in the history
  • Loading branch information
Vratislav Hais authored and zbendhiba committed Oct 15, 2021
1 parent 1bc47aa commit d9d117a
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 49 deletions.
Expand Up @@ -17,6 +17,8 @@
package org.apache.camel.quarkus.component.aws2.sqs.it;

import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;

import javax.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -112,7 +114,7 @@ public Response purgeQueue(@PathParam("queueName") String queueName) throws Exce
public String sqsReceive(@PathParam("queueName") String queueName, @PathParam("deleteMessage") String deleteMessage)
throws Exception {
return consumerTemplate.receiveBody(componentUri(queueName)
+ "?deleteAfterRead=" + deleteMessage + "&deleteIfFiltered=" + deleteMessage + "&defaultVisibilityTimeout=1",
+ "?deleteAfterRead=" + deleteMessage + "&deleteIfFiltered=" + deleteMessage + "&defaultVisibilityTimeout=0",
10000,
String.class);
}
Expand Down Expand Up @@ -158,7 +160,7 @@ public Response deleteMessage(@PathParam("queueName") String queueName, @PathPar
producerTemplate.sendBodyAndHeader(componentUri(queueName) + "?operation=deleteMessage",
null,
Sqs2Constants.RECEIPT_HANDLE,
receipt);
URLDecoder.decode(receipt, StandardCharsets.UTF_8));
return Response.ok().build();
}

Expand All @@ -179,11 +181,13 @@ public Response deleteQueue(@PathParam("queueName") String queueName) throws Exc
public List<String> autoCreateDelayedQueue(@PathParam("queueName") String queueName, @PathParam("delay") String delay)
throws Exception {
// queue creation without any operation resulted in 405 status code
return producerTemplate.requestBody(
"aws2-sqs://" + queueName
+ "?autoCreateQueue=true&delayQueue=true&delaySeconds=" + delay + "&operation=listQueues",
null,
ListQueuesResponse.class).queueUrls();
return producerTemplate
.requestBody(
"aws2-sqs://" + queueName
+ "?autoCreateQueue=true&delayQueue=true&delaySeconds=" + delay + "&operation=listQueues",
null,
ListQueuesResponse.class)
.queueUrls();
}

private String componentUri() {
Expand Down
Expand Up @@ -16,11 +16,14 @@
*/
package org.apache.camel.quarkus.component.aws2.sqs.it;

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
Expand All @@ -30,10 +33,12 @@
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import org.apache.camel.quarkus.test.support.aws2.Aws2TestResource;
import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.ConfigProvider;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.anyOf;
Expand All @@ -43,6 +48,25 @@
@QuarkusTestResource(Aws2TestResource.class)
class Aws2SqsSnsTest {

@AfterEach
public void purgeQueueAndWait() {
String qName = getPredefinedQueueName();
purgeQueue(qName);
// purge takes up to 60 seconds
// all messages delivered within those 60 seconds might get deleted
try {
TimeUnit.SECONDS.sleep(60);
} catch (InterruptedException ignored) {
}
Assertions.assertEquals(receiveMessageFromQueue(qName, false), "");
}

private void purgeQueue(String queueName) {
RestAssured.delete("/aws2-sqs-sns/sqs/purge/queue/" + queueName)
.then()
.statusCode(200);
}

@Test
public void sqs() {
final String queueName = getPredefinedQueueName();
Expand All @@ -65,10 +89,15 @@ private String[] listQueues() {
@Test
public void sqsDeleteMessage() {
final String qName = getPredefinedQueueName();
final String msg = sendSingleMessageToQueue(qName);
sendSingleMessageToQueue(qName);
final String receipt = receiveReceiptOfMessageFromQueue(qName);
final String msg = sendSingleMessageToQueue(qName);
deleteMessageFromQueue(qName, receipt);
Assertions.assertNotEquals(receiveMessageFromQueue(qName), msg);
// assertion is here twice because in case delete wouldn't work in our queue would be two messages
// it's possible that the first retrieval would retrieve the correct message and therefore the test
// would incorrectly pass. By receiving message twice we check if the message was really deleted.
Assertions.assertEquals(receiveMessageFromQueue(qName, false), msg);
Assertions.assertEquals(receiveMessageFromQueue(qName, false), msg);
}

private String getPredefinedQueueName() {
Expand All @@ -87,7 +116,8 @@ private String sendSingleMessageToQueue(String queueName) {
}

private String receiveReceiptOfMessageFromQueue(String queueName) {
return RestAssured.get("/aws2-sqs-sns/sqs/receive/receipt/" + queueName)
return RestAssured
.get("/aws2-sqs-sns/sqs/receive/receipt/" + queueName)
.then()
.statusCode(200)
.extract()
Expand All @@ -96,34 +126,47 @@ private String receiveReceiptOfMessageFromQueue(String queueName) {
}

private void deleteMessageFromQueue(String queueName, String receipt) {
RestAssured.delete("/aws2-sqs-sns/sqs/delete/message/" + queueName + "/" + receipt)
RestAssured
.delete("/aws2-sqs-sns/sqs/delete/message/" + queueName + "/"
+ URLEncoder.encode(receipt, StandardCharsets.UTF_8))
.then()
.statusCode(200);
}

@Test
@Disabled
public void sqsAutoCreateDelayedQueue() {
final String qName = "delayQueue";
final int delay = 10;
createDelayQueueAndVerifyExistence("delayQueue", delay);
final String msgSent = sendSingleMessageToQueue(qName);
Instant start = Instant.now();
awaitMessageWithExpectedContentFromQueue(msgSent, qName);
Assertions.assertTrue(Duration.between(start, Instant.now()).getSeconds() >= delay);
deleteQueue(qName);
final String qName = "delayQueue-" + RandomStringUtils.randomAlphanumeric(49).toLowerCase(Locale.ROOT);
final int delay = 20;
try {
createDelayQueueAndVerifyExistence(qName, delay);
Instant start = Instant.now();
final String msgSent = sendSingleMessageToQueue(qName);
awaitMessageWithExpectedContentFromQueue(msgSent, qName);
System.out.println(Duration.between(start, Instant.now()).getSeconds());
Assertions.assertTrue(Duration.between(start, Instant.now()).getSeconds() >= delay);
} catch (AssertionError e) {
e.printStackTrace();
Assertions.fail();
} finally {
deleteQueue(qName);
}
}

private void createDelayQueueAndVerifyExistence(String queueName, int delay) {
RestAssured.post("/aws2-sqs-sns/sqs/queue/autocreate/delayed/" + queueName + "/" + delay)
.then()
.statusCode(200);
Assertions.assertTrue(Stream.of(listQueues()).anyMatch(url -> url.contains(queueName)));
.statusCode(200)
.extract()
.body()
.as(String[].class);
Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until(
() -> Stream.of(listQueues()).anyMatch(url -> url.contains(queueName)));
}

private void awaitMessageWithExpectedContentFromQueue(String expectedContent, String queueName) {
Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until(
() -> receiveMessageFromQueue(queueName),
Matchers.is(expectedContent));
() -> receiveMessageFromQueue(queueName, false).equals(expectedContent));
}

private void deleteQueue(String queueName) {
Expand All @@ -138,10 +181,6 @@ private void awaitQueueDeleted(String queueName) {
() -> Stream.of(listQueues()).noneMatch(url -> url.contains(queueName)));
}

private String receiveMessageFromQueue(String queueName) {
return receiveMessageFromQueue(queueName, true);
}

private String receiveMessageFromQueue(String queueName, boolean deleteMessage) {
return RestAssured.get("/aws2-sqs-sns/sqs/receive/" + queueName + "/" + deleteMessage)
.then()
Expand Down Expand Up @@ -174,27 +213,6 @@ private int sendMessageBatchAndRetrieveSuccessCount(List<String> batch) {
.asString());
}

@Test
public void sqsPurgeQueue() {
final String qName = getPredefinedQueueName();
sendSingleMessageToQueue(qName);
purgeQueue(qName);
awaitAllMessagesDeletedFromQueue(qName);
}

private void purgeQueue(String queueName) {
RestAssured.delete("/aws2-sqs-sns/sqs/purge/queue/" + queueName)
.then()
.statusCode(200);
}

private void awaitAllMessagesDeletedFromQueue(String queueName) {
// it can take up to 60 seconds to purge all messages in queue as stated in documentation
Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(60, TimeUnit.SECONDS).until(
() -> receiveMessageFromQueue(queueName, false),
Matchers.emptyOrNullString());
}

@Test
void sns() {
final String snsMsg = "sns" + UUID.randomUUID().toString().replace("-", "");
Expand Down
Expand Up @@ -18,6 +18,7 @@

import java.util.Collections;
import java.util.Locale;
import java.util.Map;

import org.apache.camel.quarkus.test.support.aws2.Aws2TestEnvContext;
import org.apache.camel.quarkus.test.support.aws2.Aws2TestEnvCustomizer;
Expand All @@ -36,6 +37,8 @@
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest;

import static java.util.Map.entry;

public class Aws2SqsSnsTestEnvCustomizer implements Aws2TestEnvCustomizer {

@Override
Expand All @@ -55,6 +58,7 @@ public void customize(Aws2TestEnvContext envContext) {
final String queueUrl = sqsClient.createQueue(
CreateQueueRequest.builder()
.queueName(queueName)
.attributes(Map.ofEntries(entry(QueueAttributeName.VISIBILITY_TIMEOUT, "0")))
.build())
.queueUrl();
envContext.closeable(() -> sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(queueUrl).build()));
Expand Down

0 comments on commit d9d117a

Please sign in to comment.