diff --git a/README.md b/README.md index 851e03ae..aa1cee4a 100644 --- a/README.md +++ b/README.md @@ -101,7 +101,7 @@ will not acknowledge the message automatically when the method executes without The [Spring Cloud AWS Messaging](https://github.com/spring-cloud/spring-cloud-aws/tree/master/spring-cloud-aws-messaging) `@SqsListener` works by requesting a set of messages from the SQS and when they are done it will request some more. There is one disadvantage with this approach in that if 9/10 of the messages finish in 10 milliseconds but one takes 10 seconds no other messages will be picked up until that last message is complete. The -[@BatchingQueueListener](./java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListener.java) +[@QueueListener](./java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java) provides the same basic functionality but it also provides a timeout where eventually it will request for more messages even for the threads that are ready for another message. It will also batch the removal of messages from the queue and therefore with a concurrency level of 10, if there are a lot messages on the queue, only 2 requests would be made to SQS for retrieval and deletion of messages. The usage is something like this: @@ -109,7 +109,7 @@ on the queue, only 2 requests would be made to SQS for retrieval and deletion of ```java @Service public class MyMessageListener { - @BatchingQueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 10, maxPeriodBetweenBatchesInMs = 2000) + @QueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 10, maxPeriodBetweenBatchesInMs = 2000) public void processMessage(@Payload final String payload) { // process the message payload here } @@ -121,7 +121,7 @@ before requesting messages for threads waiting for another message. ### Setting up a queue listener that prefetches messages When the amount of messages for a service is extremely high, prefetching messages may be a way to optimise the throughput of the application. The -[@PrefetchingQueueListener](./java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListener.java) +[@PrefetchingQueueListener](./java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListener.java) annotation can be used to pretech messages in a background thread while messages are currently being processed. The usage is something like this: ```java @@ -236,6 +236,12 @@ mvn clean install -DskipTests (cd examples/java-dynamic-sqs-listener-core-examples && mvn exec:java) ``` +### Connecting to multiple AWS Accounts using the Spring Starter +If the Spring Boot application needs to connect to SQS queues across multiple AWS Accounts, you will need to provide a +[SqsAsyncClientProvider](./java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/src/main/java/com/jashmore/sqs/spring/client/SqsAsyncClientProvider.java) +which will be able to obtain a specific `SqsAsyncClient` based on an identifier. For more information on how to do this, take a look at the documentation +at [How To Connect to Multiple AWS Accounts](doc/how-to-guides/spring/spring-how-to-connect-to-multiple-aws-accounts.md) + ### Comparing Libraries If you want to see the difference between this library and others like the [Spring Cloud AWS Messaging](https://github.com/spring-cloud/spring-cloud-aws/tree/master/spring-cloud-aws-messaging) and diff --git a/doc/documentation.md b/doc/documentation.md index 9dd3f80a..389baab0 100644 --- a/doc/documentation.md +++ b/doc/documentation.md @@ -15,6 +15,7 @@ more in depth understanding take a look at the JavaDoc for the API. extending the visibility of a message in the case of long processing so it does not get put back on the queue while processing 1. [How to manually acknowledge message](how-to-guides/core/core-how-to-manually-acknowledge-message.md): useful for when you want to mark the message as successfully processed before the method has finished executing + 1. [How to Connect to an AWS SQS Queue](how-to-guides/how-to-connect-to-aws-sqs-queue.md): necessary for actually using this framework in live environments 1. Spring How To Guides 1. [How to add a custom ArgumentResolver to a Spring application](how-to-guides/spring/spring-how-to-add-custom-argument-resolver.md): useful for integrating custom argument resolution code to be included in a Spring Application. See [How to implement a custom ArgumentResolver](how-to-guides/core/core-how-to-implement-a-custom-argument-resolver.md) @@ -27,7 +28,8 @@ more in depth understanding take a look at the JavaDoc for the API. writing right? 1. [How to Start/Stop Queue Listeners](how-to-guides/spring/spring-how-to-start-stop-queue-listeners.md): guide for starting and stopping the processing of messages for specific queue listeners - 1. [How to Connect to an AWS SQS Queue](how-to-guides/how-to-connect-to-aws-sqs-queue.md): necessary for actually using this framework in live environments + 1. [How to connect to multiple AWS Accounts](how-to-guides/spring/spring-how-to-connect-to-multiple-aws-accounts.md): guide for listening to queues + across multiple AWS Accounts 1. Local Development: 1. [Setting up IntelliJ](local-development/setting-up-intellij.md): steps for setting IntelliJ up for development, e.g. configuring checkstyle, Lombok, etc diff --git a/doc/how-to-guides/spring/spring-how-to-connect-to-multiple-aws-accounts.md b/doc/how-to-guides/spring/spring-how-to-connect-to-multiple-aws-accounts.md new file mode 100644 index 00000000..d12ce3c9 --- /dev/null +++ b/doc/how-to-guides/spring/spring-how-to-connect-to-multiple-aws-accounts.md @@ -0,0 +1,60 @@ +# Spring - How to Connect to multiple AWS Accounts +There may be a scenario where you need to connect to multiple queues across multiple AWS Accounts. In this scenario you would +need to provide multiple `SqsAsyncClients` and for each queue listener you will need to indicate which one is desired. For a full +example take a look at the [Multiple AWS Accounts Example](../../../examples/multiple-aws-account-example) which shows you how to +connect to two locally running ElasticMQ servers. + +## Steps +1. Create some queues that will use specific `SqsAsyncClient`s identified by an id. + ```java + public class MyMessageListeners { + // This uses the "default" SqsAsyncClient which may not be present + @QueueListener("queueNameForDefaultListener") + public void listenerForDefaultClient(@Payload String messageBody) { + + } + + // This uses the "firstClient" SqsAsyncClient + @QueueListener(value = "queueNameForFirstClient", sqsClient = "firstClient") + public void queueNameListenerForFirstClient(@Payload String messageBody) { + + } + + // This uses the "firstClient" SqsAsyncClient + @QueueListener(value = "anotherQueueNameForFirstClient", sqsClient = "firstClient") + public void anotherQueueNameListenerForFirstClient(@Payload String messageBody) { + + } + + // This uses the "secondClient" SqsAsyncClient + @QueueListener(value = "queueNameForSecondClient", sqsClient = "secondClient") + public void queueNameListenerForSecondClient(@Payload String messageBody) { + + } + } + ``` +1. You will need to add to your `@Configuration` a bean of type +[SqsAsyncClientProvider](../../../java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/src/main/java/com/jashmore/sqs/spring/client/SqsAsyncClientProvider.java) +which will provide all of the `SqsAsyncClient`s for the queues above. + ```java + @Configuration + public class MyConfig { + @Bean + public SqsAsyncClientProvider sqsAsyncClientProvider() { + // this client will be used if there is no client identifier for the listener. Note that this can be null + // and in this case listenerForDefaultClient above will fail to wrap + final SqsAsyncClient defaultClient = ...; + + final SqsAsyncClient firstClient = ...; + final SqsAsyncClient secondClient = ...; + + return new DefaultSqsAsyncClientProvider( + defaultClient, + ImmutableMap.of( + "firstClient", firstClient, + "secondClient", secondClient + ) + ); + } + } + ``` diff --git a/examples/java-dynamic-sqs-listener-core-examples/src/main/java/com/jashmore/sqs/examples/ConcurrentBrokerExample.java b/examples/java-dynamic-sqs-listener-core-examples/src/main/java/com/jashmore/sqs/examples/ConcurrentBrokerExample.java index 06a242ba..4d4c1681 100644 --- a/examples/java-dynamic-sqs-listener-core-examples/src/main/java/com/jashmore/sqs/examples/ConcurrentBrokerExample.java +++ b/examples/java-dynamic-sqs-listener-core-examples/src/main/java/com/jashmore/sqs/examples/ConcurrentBrokerExample.java @@ -110,8 +110,9 @@ public static void main(final String[] args) throws Exception { .bufferingTimeInMs(5000) .build()); final MessageProcessor messageProcessor = new DefaultMessageProcessor( - argumentResolverService(sqsAsyncClient), + argumentResolverService(), queueProperties, + sqsAsyncClient, messageResolver, messageReceivedMethod, messageConsumer @@ -184,12 +185,11 @@ private static SqsAsyncClient startElasticMqServer() throws URISyntaxException { /** * Builds the {@link ArgumentResolverService} that will be used to parse the messages into arguments for the {@link MessageConsumer}. * - * @param sqsAsyncClient the client to communicate with the SQS queue * @return the service to resolve arguments for the message consumer */ - private static ArgumentResolverService argumentResolverService(final SqsAsyncClient sqsAsyncClient) { + private static ArgumentResolverService argumentResolverService() { final PayloadMapper payloadMapper = new JacksonPayloadMapper(OBJECT_MAPPER); - return new CoreArgumentResolverService(payloadMapper, sqsAsyncClient, OBJECT_MAPPER); + return new CoreArgumentResolverService(payloadMapper, OBJECT_MAPPER); } /** diff --git a/examples/multiple-aws-account-example/pom.xml b/examples/multiple-aws-account-example/pom.xml new file mode 100644 index 00000000..503070f9 --- /dev/null +++ b/examples/multiple-aws-account-example/pom.xml @@ -0,0 +1,129 @@ + + + + examples + com.jashmore + 2.3.0-SNAPSHOT + + 4.0.0 + + multiple-aws-account-example + + Java Dynamic SQS Listener - Multiple AWS Accounts Example + Contains examples for listening to SQS queues on multiple AWS Accounts + + + ../../configuration/findbugs/bugsExcludeFilter.xml + + + + + org.springframework.boot + spring-boot-starter + ${spring.boot.version} + + + + org.springframework.boot + spring-boot-starter-web + ${spring.boot.version} + + + + org.projectlombok + lombok + provided + + + + com.jashmore + java-dynamic-sqs-listener-spring-starter + ${project.version} + + + + com.jashmore + local-amazon-sqs + ${project.version} + + + + org.elasticmq + elasticmq-rest-sqs_2.11 + 0.13.9 + + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring.boot.version} + pom + import + + + + + + + + ${basedir}/src/main/resources + true + + **/*.yml + **/application*.properties + + + + ${basedir}/src/main/resources + + **/*.yml + **/application*.properties + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + com.jashmore.sqs.examples.Application + + + + + repackage + + + + + + org.apache.maven.plugins + maven-resources-plugin + + + ${resource.delimiter} + + false + + + + + org.apache.maven.plugins + maven-compiler-plugin + + ${java.version} + ${java.version} + + + + + + + \ No newline at end of file diff --git a/examples/multiple-aws-account-example/src/main/java/com/jashmore/sqs/examples/Application.java b/examples/multiple-aws-account-example/src/main/java/com/jashmore/sqs/examples/Application.java new file mode 100644 index 00000000..32f528ff --- /dev/null +++ b/examples/multiple-aws-account-example/src/main/java/com/jashmore/sqs/examples/Application.java @@ -0,0 +1,80 @@ +package com.jashmore.sqs.examples; + +import com.google.common.collect.ImmutableMap; + +import akka.http.scaladsl.Http; +import com.jashmore.sqs.spring.client.DefaultSqsAsyncClientProvider; +import com.jashmore.sqs.spring.client.SqsAsyncClientProvider; +import lombok.extern.slf4j.Slf4j; +import org.elasticmq.rest.sqs.SQSRestServer; +import org.elasticmq.rest.sqs.SQSRestServerBuilder; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.EnableScheduling; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; + +import java.net.URI; +import java.util.concurrent.ExecutionException; + +@SpringBootApplication +@EnableScheduling +@Slf4j +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class); + } + + /** + * Creates the {@link SqsAsyncClientProvider} that will allow multiple AWS Account SQS queues to be queried. + * + * @return the provider to use + * @throws InterruptedException if it was interrupted while building the clients + */ + @Bean + public SqsAsyncClientProvider sqsAsyncClientProvider() throws InterruptedException { + return new DefaultSqsAsyncClientProvider(null, ImmutableMap.of( + "firstClient", buildClient("firstClientQueue"), + "secondClient", buildClient("secondClientQueue") + )); + } + + /** + * Starts an in-memory ElasticMQ server and returns a {@link SqsAsyncClient} pointing to it, each call to this represents a different AWS Account. + * + *

Note that the region and credentials are hardcoded to fake values as they are not checked but connecting to an actual AWS account here will involve + * reading environment variables or other properties. + * + * @param queueName the name of a queue to create in this SQS Server + * @return the {@link SqsAsyncClient} that points to this SQS Server + * @throws InterruptedException if it was interrupted while creating the queue + */ + private static SqsAsyncClient buildClient(String queueName) throws InterruptedException { + log.info("Starting Local ElasticMQ SQS Server"); + final SQSRestServer sqsRestServer = SQSRestServerBuilder + .withInterface("localhost") + .withDynamicPort() + .start(); + + final Http.ServerBinding serverBinding = sqsRestServer.waitUntilStarted(); + final SqsAsyncClient client = SqsAsyncClient.builder() + .endpointOverride(URI.create("http://localhost:" + serverBinding.localAddress().getPort())) + .region(Region.of("localstack")) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("accessKeyId", "secretAccessKey"))) + .build(); + try { + client.createQueue(CreateQueueRequest.builder() + .queueName(queueName) + .build()) + .get(); + } catch (ExecutionException executionException) { + throw new RuntimeException(executionException.getCause()); + } + return client; + } +} diff --git a/examples/multiple-aws-account-example/src/main/java/com/jashmore/sqs/examples/MessageListeners.java b/examples/multiple-aws-account-example/src/main/java/com/jashmore/sqs/examples/MessageListeners.java new file mode 100644 index 00000000..5f092ef9 --- /dev/null +++ b/examples/multiple-aws-account-example/src/main/java/com/jashmore/sqs/examples/MessageListeners.java @@ -0,0 +1,31 @@ +package com.jashmore.sqs.examples; + +import com.jashmore.sqs.argument.payload.Payload; +import com.jashmore.sqs.spring.container.basic.QueueListener; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Component +@SuppressWarnings("unused") +@Slf4j +public class MessageListeners { + /** + * Basic queue listener. + * + * @param payload the payload of the SQS Message + */ + @QueueListener(value = "firstClientQueue", sqsClient = "firstClient") + public void firstClientMessageProcessing(@Payload final String payload) { + log.info("Message Received from firstClient#firstClientQueue: {}", payload); + } + + /** + * Basic queue listener. + * + * @param payload the payload of the SQS Message + */ + @QueueListener(value = "secondClientQueue", sqsClient = "secondClient") + public void secondClientMessageProcessing(@Payload final String payload) { + log.info("Message Received from secondClient#secondClientQueue: {}", payload); + } +} diff --git a/examples/multiple-aws-account-example/src/main/java/com/jashmore/sqs/examples/ScheduledMessageProducer.java b/examples/multiple-aws-account-example/src/main/java/com/jashmore/sqs/examples/ScheduledMessageProducer.java new file mode 100644 index 00000000..2c16d275 --- /dev/null +++ b/examples/multiple-aws-account-example/src/main/java/com/jashmore/sqs/examples/ScheduledMessageProducer.java @@ -0,0 +1,62 @@ +package com.jashmore.sqs.examples; + +import com.jashmore.sqs.spring.client.SqsAsyncClientProvider; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Helper scheduled task that will place 10 messages onto each queue for processing by the message listeners. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class ScheduledMessageProducer { + private final SqsAsyncClientProvider sqsAsyncClientProvider; + private final AtomicInteger count = new AtomicInteger(); + + /** + * Scheduled job that sends messages to the queue for testing the listener. + * + * @throws Exception if there was an error placing messages on the queue + */ + @Scheduled(initialDelay = 1000, fixedDelay = 1000) + public void addMessages() throws Exception { + log.info("Putting 10 messages onto each queue"); + final int currentValue = count.incrementAndGet(); + + sendMessagesToQueue(getSqsAsyncClient("firstClient"), "firstClientQueue", currentValue); + sendMessagesToQueue(getSqsAsyncClient("secondClient"), "secondClientQueue", currentValue); + } + + private SqsAsyncClient getSqsAsyncClient(final String clientId) { + return sqsAsyncClientProvider.getClient(clientId) + .orElseThrow(() -> new RuntimeException("Unknown client: " + clientId)); + } + + private void sendMessagesToQueue(final SqsAsyncClient sqsAsyncClient, + final String queueName, + final int currentValue) throws ExecutionException, InterruptedException { + final String queueUrl = sqsAsyncClient.getQueueUrl((request) -> request.queueName(queueName)).get().queueUrl(); + + final SendMessageBatchRequest.Builder batchRequestBuilder = SendMessageBatchRequest.builder().queueUrl(queueUrl); + batchRequestBuilder.entries(IntStream.range(0, 10) + .mapToObj(i -> { + final String messageId = "" + currentValue + "-" + i; + final String messageContent = "Message, loop: " + currentValue + " id: " + i; + return SendMessageBatchRequestEntry.builder().id(messageId).messageBody(messageContent).build(); + }) + .collect(Collectors.toSet())); + + sqsAsyncClient.sendMessageBatch(batchRequestBuilder.build()); + } +} diff --git a/examples/pom.xml b/examples/pom.xml index 36f6a007..325f4979 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -24,6 +24,7 @@ java-dynamic-sqs-listener-spring-aws-example java-dynamic-sqs-listener-spring-integration-test-example sqs-listener-library-comparison + multiple-aws-account-example diff --git a/java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/processor/MessageProcessor.java b/java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/processor/MessageProcessor.java index 30c0c8bf..1f5b32ad 100644 --- a/java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/processor/MessageProcessor.java +++ b/java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/processor/MessageProcessor.java @@ -3,6 +3,7 @@ import com.jashmore.sqs.argument.ArgumentResolverService; import com.jashmore.sqs.broker.MessageBroker; import com.jashmore.sqs.processor.argument.Acknowledge; +import com.jashmore.sqs.processor.argument.VisibilityExtender; import com.jashmore.sqs.resolver.MessageResolver; import software.amazon.awssdk.services.sqs.model.Message; @@ -34,6 +35,7 @@ *

  • {@link Acknowledge}: this argument can be used to acknowledge that the messages has been successfully processed and can be deleted from the * queue. If no {@link Acknowledge} argument is included in the argument list of the method, the message will be deleted from the queue if the * method processing the message completes without an exception being thrown.
  • + *
  • {@link VisibilityExtender}: this argument can be used to extend the visibility of a message if it is taking a long time to process.
  • * * *

    If you were to consider this library as similar to a pub-sub system, this could be considered the subscriber in that it will take messages provided diff --git a/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/visibility/VisibilityExtender.java b/java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/processor/argument/VisibilityExtender.java similarity index 97% rename from java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/visibility/VisibilityExtender.java rename to java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/processor/argument/VisibilityExtender.java index 5026e87d..0bfd58c2 100644 --- a/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/visibility/VisibilityExtender.java +++ b/java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/processor/argument/VisibilityExtender.java @@ -1,4 +1,4 @@ -package com.jashmore.sqs.argument.visibility; +package com.jashmore.sqs.processor.argument; import java.util.concurrent.Future; diff --git a/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/CoreArgumentResolverService.java b/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/CoreArgumentResolverService.java index ed82efbd..344578eb 100644 --- a/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/CoreArgumentResolverService.java +++ b/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/CoreArgumentResolverService.java @@ -9,9 +9,7 @@ import com.jashmore.sqs.argument.messageid.MessageIdArgumentResolver; import com.jashmore.sqs.argument.payload.PayloadArgumentResolver; import com.jashmore.sqs.argument.payload.mapper.PayloadMapper; -import com.jashmore.sqs.argument.visibility.VisibilityExtenderArgumentResolver; import lombok.experimental.Delegate; -import software.amazon.awssdk.services.sqs.SqsAsyncClient; import java.util.Set; @@ -26,14 +24,12 @@ public class CoreArgumentResolverService implements ArgumentResolverService { private final DelegatingArgumentResolverService delegatingArgumentResolverService; public CoreArgumentResolverService(final PayloadMapper payloadMapper, - final SqsAsyncClient sqsAsyncClient, final ObjectMapper objectMapper) { final Set> argumentResolvers = ImmutableSet.of( new PayloadArgumentResolver(payloadMapper), new MessageIdArgumentResolver(), new MessageAttributeArgumentResolver(objectMapper), new MessageSystemAttributeArgumentResolver(), - new VisibilityExtenderArgumentResolver(sqsAsyncClient), new MessageArgumentResolver() ); this.delegatingArgumentResolverService = new DelegatingArgumentResolverService(argumentResolvers); diff --git a/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/visibility/DefaultVisibilityExtender.java b/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/visibility/DefaultVisibilityExtender.java index ac1daf99..8203c787 100644 --- a/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/visibility/DefaultVisibilityExtender.java +++ b/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/visibility/DefaultVisibilityExtender.java @@ -1,6 +1,7 @@ package com.jashmore.sqs.argument.visibility; import com.jashmore.sqs.QueueProperties; +import com.jashmore.sqs.processor.argument.VisibilityExtender; import lombok.AllArgsConstructor; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; diff --git a/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/visibility/VisibilityExtenderArgumentResolver.java b/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/visibility/VisibilityExtenderArgumentResolver.java deleted file mode 100644 index faafe1f6..00000000 --- a/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/visibility/VisibilityExtenderArgumentResolver.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.jashmore.sqs.argument.visibility; - -import com.jashmore.sqs.QueueProperties; -import com.jashmore.sqs.argument.ArgumentResolutionException; -import com.jashmore.sqs.argument.ArgumentResolver; -import com.jashmore.sqs.argument.MethodParameter; -import com.jashmore.sqs.argument.payload.Payload; -import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.model.Message; - -/** - * Resolves message consumer's with a parameter of type {@link VisibilityExtender} to an implementation that can be used - * for the consumer to request the increase of visibility of the message. - * - *

    Message consumer parameters with the {@link VisibilityExtender} type should not have any annotations that would result in another - * {@link ArgumentResolver} from attempting to resolve the parameter. For example you should not have a method of type {@link VisibilityExtender} and also - * with the {@link Payload} annotation. - */ -public class VisibilityExtenderArgumentResolver implements ArgumentResolver { - private final SqsAsyncClient sqsAsyncClient; - - public VisibilityExtenderArgumentResolver(final SqsAsyncClient sqsAsyncClient) { - this.sqsAsyncClient = sqsAsyncClient; - } - - @Override - public boolean canResolveParameter(final MethodParameter methodParameter) { - return methodParameter.getParameter().getType() == VisibilityExtender.class; - } - - @Override - public VisibilityExtender resolveArgumentForParameter(final QueueProperties queueProperties, - final MethodParameter methodParameter, - final Message message) throws ArgumentResolutionException { - return new DefaultVisibilityExtender(sqsAsyncClient, queueProperties, message); - } -} diff --git a/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/processor/DefaultMessageProcessor.java b/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/processor/DefaultMessageProcessor.java index c88c695d..f5b8e7c0 100644 --- a/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/processor/DefaultMessageProcessor.java +++ b/java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/processor/DefaultMessageProcessor.java @@ -7,9 +7,12 @@ import com.jashmore.sqs.argument.ArgumentResolverService; import com.jashmore.sqs.argument.DefaultMethodParameter; import com.jashmore.sqs.argument.MethodParameter; +import com.jashmore.sqs.argument.visibility.DefaultVisibilityExtender; import com.jashmore.sqs.processor.argument.Acknowledge; +import com.jashmore.sqs.processor.argument.VisibilityExtender; import com.jashmore.sqs.resolver.MessageResolver; import lombok.AllArgsConstructor; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.model.Message; import java.lang.reflect.InvocationTargetException; @@ -30,6 +33,7 @@ @AllArgsConstructor public class DefaultMessageProcessor implements MessageProcessor { private final QueueProperties queueProperties; + private final SqsAsyncClient sqsAsyncClient; private final MessageResolver messageResolver; private final Method messageConsumerMethod; private final Object messageConsumerBean; @@ -39,10 +43,12 @@ public class DefaultMessageProcessor implements MessageProcessor { public DefaultMessageProcessor(final ArgumentResolverService argumentResolverService, final QueueProperties queueProperties, + final SqsAsyncClient sqsAsyncClient, final MessageResolver messageResolver, final Method messageConsumerMethod, final Object messageConsumerBean) { this.queueProperties = queueProperties; + this.sqsAsyncClient = sqsAsyncClient; this.messageResolver = messageResolver; this.messageConsumerMethod = messageConsumerMethod; this.messageConsumerBean = messageConsumerBean; @@ -117,6 +123,10 @@ private List getArgumentResolvers(final ArgumentResolv return message -> (Acknowledge) () -> messageResolver.resolveMessage(message); } + if (isVisibilityExtenderParameter(parameter)) { + return message -> new DefaultVisibilityExtender(sqsAsyncClient, queueProperties, message); + } + final ArgumentResolver argumentResolver = argumentResolverService.getArgumentResolver(methodParameter); return message -> argumentResolver.resolveArgumentForParameter(queueProperties, methodParameter, message); }) @@ -132,6 +142,10 @@ private static boolean isAcknowledgeParameter(final Parameter parameter) { return Acknowledge.class.isAssignableFrom(parameter.getType()); } + private static boolean isVisibilityExtenderParameter(final Parameter parameter) { + return VisibilityExtender.class.isAssignableFrom(parameter.getType()); + } + /** * Internal resolver for resolving the argument given the message. */ diff --git a/java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/argument/CoreArgumentResolverServiceTest.java b/java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/argument/CoreArgumentResolverServiceTest.java index f6b0cbc9..bcd0d733 100644 --- a/java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/argument/CoreArgumentResolverServiceTest.java +++ b/java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/argument/CoreArgumentResolverServiceTest.java @@ -17,10 +17,8 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; -import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; @@ -35,14 +33,11 @@ public class CoreArgumentResolverServiceTest { private PayloadMapper payloadMapper = new JacksonPayloadMapper(objectMapper); - @Mock - private SqsAsyncClient sqsAsyncClient; - private CoreArgumentResolverService service; @Before public void setUp() { - service = new CoreArgumentResolverService(payloadMapper, sqsAsyncClient, objectMapper); + service = new CoreArgumentResolverService(payloadMapper, objectMapper); } @Test diff --git a/java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/argument/visibility/DefaultVisibilityExtenderTest.java b/java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/argument/visibility/DefaultVisibilityExtenderTest.java index 708b3f74..d65eb070 100644 --- a/java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/argument/visibility/DefaultVisibilityExtenderTest.java +++ b/java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/argument/visibility/DefaultVisibilityExtenderTest.java @@ -1,6 +1,6 @@ package com.jashmore.sqs.argument.visibility; -import static com.jashmore.sqs.argument.visibility.VisibilityExtender.DEFAULT_VISIBILITY_EXTENSION_IN_SECONDS; +import static com.jashmore.sqs.processor.argument.VisibilityExtender.DEFAULT_VISIBILITY_EXTENSION_IN_SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; diff --git a/java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/argument/visibility/VisibilityExtenderArgumentResolverTest.java b/java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/argument/visibility/VisibilityExtenderArgumentResolverTest.java deleted file mode 100644 index 4c48558e..00000000 --- a/java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/argument/visibility/VisibilityExtenderArgumentResolverTest.java +++ /dev/null @@ -1,92 +0,0 @@ -package com.jashmore.sqs.argument.visibility; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.jashmore.sqs.QueueProperties; -import com.jashmore.sqs.argument.DefaultMethodParameter; -import com.jashmore.sqs.argument.MethodParameter; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; -import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.model.Message; - -import java.lang.reflect.Method; -import java.lang.reflect.Parameter; - -public class VisibilityExtenderArgumentResolverTest { - @Rule - public MockitoRule mockitoRule = MockitoJUnit.rule(); - - @Mock - private SqsAsyncClient sqsAsyncClient; - - @Mock - private QueueProperties queueProperties; - - private final Message message = Message.builder().build(); - - private VisibilityExtenderArgumentResolver visibilityExtenderArgumentResolver; - - @Before - public void setUp() { - visibilityExtenderArgumentResolver = new VisibilityExtenderArgumentResolver(sqsAsyncClient); - } - - @Test - public void canResolveParametersWithVisibilityExtenderType() { - // arrange - final MethodParameter parameter = getParameter(0); - - // act - final boolean canResolveParameter = visibilityExtenderArgumentResolver.canResolveParameter(parameter); - - // assert - assertThat(canResolveParameter).isTrue(); - } - - @Test - public void canNotResolveParametersThatIsNotAVisibilityExtenderType() { - // arrange - final MethodParameter parameter = getParameter(1); - - // act - final boolean canResolveParameter = visibilityExtenderArgumentResolver.canResolveParameter(parameter); - - // assert - assertThat(canResolveParameter).isFalse(); - } - - @Test - public void resolvingParameterReturnsVisibilityExtenderObject() { - // arrange - final MethodParameter parameter = getParameter(0); - - // act - final Object resolvedArgument = visibilityExtenderArgumentResolver.resolveArgumentForParameter(queueProperties, parameter, message); - - // assert - assertThat(resolvedArgument).isInstanceOf(VisibilityExtender.class); - } - - @SuppressWarnings( {"WeakerAccess", "unused"}) - public void method(final VisibilityExtender visibilityExtender, final String string) { - - } - - private MethodParameter getParameter(final int index) { - try { - final Method method = VisibilityExtenderArgumentResolverTest.class.getMethod("method", VisibilityExtender.class, String.class); - return DefaultMethodParameter.builder() - .method(method) - .parameter(method.getParameters()[index]) - .parameterIndex(index) - .build(); - } catch (final NoSuchMethodException exception) { - throw new RuntimeException("Unable to find method for testing against", exception); - } - } -} diff --git a/java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/processor/DefaultMessageProcessorTest.java b/java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/processor/DefaultMessageProcessorTest.java index 6201af16..2e00c71c 100644 --- a/java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/processor/DefaultMessageProcessorTest.java +++ b/java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/processor/DefaultMessageProcessorTest.java @@ -27,6 +27,7 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.model.Message; import java.lang.reflect.Method; @@ -54,6 +55,9 @@ public class DefaultMessageProcessorTest { @Mock private MessageResolver messageResolver; + @Mock + private SqsAsyncClient sqsAsyncClient; + @Mock private ArgumentResolver mockArgumentResolver; @@ -66,7 +70,8 @@ public void forEachParameterInMethodTheArgumentIsResolved() { final Method method = getMethodWithAcknowledge(); final Message message = Message.builder().build(); doReturn(mockArgumentResolver).when(argumentResolverService).getArgumentResolver(any(MethodParameter.class)); - final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN); + final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, + sqsAsyncClient, messageResolver, method, BEAN); // act processor.processMessage(message); @@ -84,7 +89,7 @@ public void anyParameterUnableToBeResolvedWillThrowAnError() { expectedException.expect(UnsupportedArgumentResolutionException.class); // act - new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN); + new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, sqsAsyncClient, messageResolver, method, BEAN); } @Test @@ -97,7 +102,8 @@ public void methodWillBeInvokedWithArgumentsResolved() { when(mockArgumentResolver.resolveArgumentForParameter(eq(QUEUE_PROPERTIES), any(), eq(message))) .thenReturn("payload") .thenReturn("payload2"); - final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, mockProcessor); + final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, + sqsAsyncClient, messageResolver, method, mockProcessor); // act processor.processMessage(message); @@ -114,7 +120,8 @@ public void methodWithAcknowledgeParameterWillNotDeleteMessageOnSuccess() { doReturn(mockArgumentResolver).when(argumentResolverService).getArgumentResolver(any(MethodParameter.class)); when(mockArgumentResolver.resolveArgumentForParameter(eq(QUEUE_PROPERTIES), any(), eq(message))) .thenReturn("payload"); - final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN); + final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, + sqsAsyncClient, messageResolver, method, BEAN); // act processor.processMessage(message); @@ -131,7 +138,8 @@ public void methodWithoutAcknowledgeParameterWillDeleteMessageOnSuccess() { doReturn(mockArgumentResolver).when(argumentResolverService).getArgumentResolver(any(MethodParameter.class)); when(mockArgumentResolver.resolveArgumentForParameter(eq(QUEUE_PROPERTIES), any(), eq(message))) .thenReturn("payload"); - final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN); + final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, + sqsAsyncClient, messageResolver, method, BEAN); // act processor.processMessage(message); @@ -149,7 +157,8 @@ public void methodWithoutAcknowledgeThatThrowsExceptionDoesNotDeleteMessage() { doReturn(mockArgumentResolver).when(argumentResolverService).getArgumentResolver(any(MethodParameter.class)); when(mockArgumentResolver.resolveArgumentForParameter(eq(QUEUE_PROPERTIES), any(), eq(message))) .thenReturn("payload"); - final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN); + final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, + sqsAsyncClient, messageResolver, method, BEAN); // act try { @@ -170,7 +179,8 @@ public void methodReturningCompletableFutureWillResolveMessageWhenFutureResolved final CompletableFuture future = new CompletableFuture<>(); when(completableFutureArgumentResolver.resolveArgumentForParameter(eq(QUEUE_PROPERTIES), any(MethodParameter.class), eq(message))) .thenReturn(future); - final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN); + final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, + sqsAsyncClient, messageResolver, method, BEAN); // act CompletableFuture.runAsync(() -> processor.processMessage(message)); @@ -190,7 +200,8 @@ public void methodReturningCompletableFutureWillNotResolveMessageWhenFutureRejec final CompletableFuture future = new CompletableFuture<>(); when(completableFutureArgumentResolver.resolveArgumentForParameter(eq(QUEUE_PROPERTIES), any(MethodParameter.class), eq(message))) .thenReturn(future); - final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN); + final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, + sqsAsyncClient, messageResolver, method, BEAN); // act CompletableFuture.runAsync(() -> processor.processMessage(message)); @@ -209,7 +220,8 @@ public void methodReturningCompletableFutureThatReturnsNullWillThrowMessageProce doReturn(completableFutureArgumentResolver).when(argumentResolverService).getArgumentResolver(any()); when(completableFutureArgumentResolver.resolveArgumentForParameter(eq(QUEUE_PROPERTIES), any(MethodParameter.class), eq(message))) .thenReturn(null); - final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN); + final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, + sqsAsyncClient, messageResolver, method, BEAN); expectedException.expect(MessageProcessingException.class); // act @@ -225,7 +237,8 @@ public void threadInterruptedWhileGettingMessageWillThrowException() throws Exce final CompletableFuture future = new CompletableFuture<>(); when(completableFutureArgumentResolver.resolveArgumentForParameter(eq(QUEUE_PROPERTIES), any(MethodParameter.class), eq(message))) .thenReturn(future); - final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN); + final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, + sqsAsyncClient, messageResolver, method, BEAN); // act final AtomicBoolean exceptionThrown = new AtomicBoolean(false); diff --git a/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/argument/MessageArgumentResolutionIntegrationTest.java b/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/argument/MessageArgumentResolutionIntegrationTest.java index 16d8baf5..b83e89ed 100644 --- a/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/argument/MessageArgumentResolutionIntegrationTest.java +++ b/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/argument/MessageArgumentResolutionIntegrationTest.java @@ -35,19 +35,18 @@ public class MessageArgumentResolutionIntegrationTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final PayloadMapper PAYLOAD_MAPPER = new JacksonPayloadMapper(OBJECT_MAPPER); + private static final ArgumentResolverService ARGUMENT_RESOLVER_SERVICE = new CoreArgumentResolverService(PAYLOAD_MAPPER, OBJECT_MAPPER); @Rule public LocalSqsRule localSqsRule = new LocalSqsRule(); private String queueUrl; private QueueProperties queueProperties; - private ArgumentResolverService argumentResolverService; @Before public void setUp() { queueUrl = localSqsRule.createRandomQueue(); queueProperties = QueueProperties.builder().queueUrl(queueUrl).build(); - argumentResolverService = new CoreArgumentResolverService(PAYLOAD_MAPPER, localSqsRule.getLocalAmazonSqsAsync(), OBJECT_MAPPER); } @Test @@ -66,8 +65,9 @@ public void messageAttributesCanBeConsumedInMessageProcessingMethods() throws Ex final MessageConsumer messageConsumer = new MessageConsumer(messageProcessedLatch, messageAttributeReference); final MessageResolver messageResolver = new BlockingMessageResolver(new IndividualMessageResolver(queueProperties, sqsAsyncClient)); final MessageProcessor messageProcessor = new DefaultMessageProcessor( - argumentResolverService, + ARGUMENT_RESOLVER_SERVICE, queueProperties, + sqsAsyncClient, messageResolver, MessageConsumer.class.getMethod("consume", Message.class), messageConsumer diff --git a/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/argument/MessageAttributeIntegrationTest.java b/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/argument/MessageAttributeIntegrationTest.java index 20d97d44..dc41bf40 100644 --- a/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/argument/MessageAttributeIntegrationTest.java +++ b/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/argument/MessageAttributeIntegrationTest.java @@ -41,19 +41,18 @@ public class MessageAttributeIntegrationTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final PayloadMapper PAYLOAD_MAPPER = new JacksonPayloadMapper(OBJECT_MAPPER); + private static final ArgumentResolverService ARGUMENT_RESOLVER_SERVICE = new CoreArgumentResolverService(PAYLOAD_MAPPER, OBJECT_MAPPER); @Rule public LocalSqsRule localSqsRule = new LocalSqsRule(); private String queueUrl; private QueueProperties queueProperties; - private ArgumentResolverService argumentResolverService; @Before public void setUp() { queueUrl = localSqsRule.createRandomQueue(); queueProperties = QueueProperties.builder().queueUrl(queueUrl).build(); - argumentResolverService = new CoreArgumentResolverService(PAYLOAD_MAPPER, localSqsRule.getLocalAmazonSqsAsync(), OBJECT_MAPPER); } @Test @@ -72,8 +71,9 @@ public void messageAttributesCanBeConsumedInMessageProcessingMethods() throws Ex final MessageConsumer messageConsumer = new MessageConsumer(messageProcessedLatch, messageAttributeReference); final MessageResolver messageResolver = new BlockingMessageResolver(new IndividualMessageResolver(queueProperties, sqsAsyncClient)); final MessageProcessor messageProcessor = new DefaultMessageProcessor( - argumentResolverService, + ARGUMENT_RESOLVER_SERVICE, queueProperties, + sqsAsyncClient, messageResolver, MessageConsumer.class.getMethod("consume", String.class), messageConsumer diff --git a/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/argument/MessageSystemAttributeIntegrationTest.java b/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/argument/MessageSystemAttributeIntegrationTest.java index 383607ed..4464c59e 100644 --- a/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/argument/MessageSystemAttributeIntegrationTest.java +++ b/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/argument/MessageSystemAttributeIntegrationTest.java @@ -41,19 +41,18 @@ public class MessageSystemAttributeIntegrationTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final PayloadMapper PAYLOAD_MAPPER = new JacksonPayloadMapper(OBJECT_MAPPER); + private static final ArgumentResolverService ARGUMENT_RESOLVER_SERVICE = new CoreArgumentResolverService(PAYLOAD_MAPPER, OBJECT_MAPPER); @Rule public LocalSqsRule localSqsRule = new LocalSqsRule(); private String queueUrl; private QueueProperties queueProperties; - private ArgumentResolverService argumentResolverService; @Before public void setUp() { queueUrl = localSqsRule.createRandomQueue(); queueProperties = QueueProperties.builder().queueUrl(queueUrl).build(); - argumentResolverService = new CoreArgumentResolverService(PAYLOAD_MAPPER, localSqsRule.getLocalAmazonSqsAsync(), OBJECT_MAPPER); } @Test @@ -72,8 +71,9 @@ public void messageAttributesCanBeConsumedInMessageProcessingMethods() throws Ex final MessageConsumer messageConsumer = new MessageConsumer(messageProcessedLatch, messageAttributeReference); final MessageResolver messageResolver = new BlockingMessageResolver(new IndividualMessageResolver(queueProperties, sqsAsyncClient)); final MessageProcessor messageProcessor = new DefaultMessageProcessor( - argumentResolverService, + ARGUMENT_RESOLVER_SERVICE, queueProperties, + sqsAsyncClient, messageResolver, MessageConsumer.class.getMethod("consume", OffsetDateTime.class), messageConsumer diff --git a/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/argument/VisibilityExtenderIntegrationTest.java b/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/argument/VisibilityExtenderIntegrationTest.java new file mode 100644 index 00000000..0e58c434 --- /dev/null +++ b/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/argument/VisibilityExtenderIntegrationTest.java @@ -0,0 +1,127 @@ +package it.com.jashmore.sqs.argument; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.collect.ImmutableList; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.jashmore.sqs.QueueProperties; +import com.jashmore.sqs.argument.ArgumentResolverService; +import com.jashmore.sqs.argument.CoreArgumentResolverService; +import com.jashmore.sqs.argument.payload.mapper.JacksonPayloadMapper; +import com.jashmore.sqs.argument.payload.mapper.PayloadMapper; +import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBroker; +import com.jashmore.sqs.broker.concurrent.StaticConcurrentMessageBrokerProperties; +import com.jashmore.sqs.container.SimpleMessageListenerContainer; +import com.jashmore.sqs.processor.DefaultMessageProcessor; +import com.jashmore.sqs.processor.MessageProcessor; +import com.jashmore.sqs.processor.argument.VisibilityExtender; +import com.jashmore.sqs.resolver.MessageResolver; +import com.jashmore.sqs.resolver.blocking.BlockingMessageResolver; +import com.jashmore.sqs.resolver.individual.IndividualMessageResolver; +import com.jashmore.sqs.retriever.MessageRetriever; +import com.jashmore.sqs.retriever.individual.IndividualMessageRetriever; +import com.jashmore.sqs.retriever.individual.IndividualMessageRetrieverProperties; +import com.jashmore.sqs.test.LocalSqsRule; +import com.jashmore.sqs.util.SqsQueuesConfig; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +@Slf4j +public class VisibilityExtenderIntegrationTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final PayloadMapper PAYLOAD_MAPPER = new JacksonPayloadMapper(OBJECT_MAPPER); + private static final ArgumentResolverService ARGUMENT_RESOLVER_SERVICE = new CoreArgumentResolverService(PAYLOAD_MAPPER, OBJECT_MAPPER); + private static final int ORIGINAL_MESSAGE_VISIBILITY = 5; + + @Rule + public LocalSqsRule localSqsRule = new LocalSqsRule(ImmutableList.of( + SqsQueuesConfig.QueueConfig.builder() + .queueName("VisibilityExtenderIntegrationTest") + .visibilityTimeout(ORIGINAL_MESSAGE_VISIBILITY) + .maxReceiveCount(2) // make sure it will try multiple times + .build() + )); + + private String queueUrl; + private QueueProperties queueProperties; + + @Before + public void setUp() { + queueUrl = localSqsRule.createRandomQueue(); + queueProperties = QueueProperties.builder().queueUrl(queueUrl).build(); + } + + @Test + public void messageAttributesCanBeConsumedInMessageProcessingMethods() throws Exception { + // arrange + final SqsAsyncClient sqsAsyncClient = localSqsRule.getLocalAmazonSqsAsync(); + final MessageRetriever messageRetriever = new IndividualMessageRetriever( + sqsAsyncClient, + queueProperties, + IndividualMessageRetrieverProperties.builder() + .visibilityTimeoutForMessagesInSeconds(ORIGINAL_MESSAGE_VISIBILITY) + .build() + ); + final CountDownLatch messageProcessedLatch = new CountDownLatch(1); + final AtomicInteger numberTimesMessageProcesed = new AtomicInteger(0); + final MessageConsumer messageConsumer = new MessageConsumer(messageProcessedLatch, numberTimesMessageProcesed); + final MessageResolver messageResolver = new BlockingMessageResolver(new IndividualMessageResolver(queueProperties, sqsAsyncClient)); + final MessageProcessor messageProcessor = new DefaultMessageProcessor( + ARGUMENT_RESOLVER_SERVICE, + queueProperties, + sqsAsyncClient, + messageResolver, + MessageConsumer.class.getMethod("consume", VisibilityExtender.class), + messageConsumer + ); + final ConcurrentMessageBroker messageBroker = new ConcurrentMessageBroker( + messageRetriever, + messageProcessor, + StaticConcurrentMessageBrokerProperties.builder() + .concurrencyLevel(2) + .build() + ); + final SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer( + messageRetriever, messageBroker, messageResolver + ); + simpleMessageListenerContainer.start(); + + // act + sqsAsyncClient.sendMessage(SendMessageRequest.builder() + .queueUrl(queueUrl) + .messageBody("test") + .build()) + .get(2, SECONDS); + + assertThat(messageProcessedLatch.await(ORIGINAL_MESSAGE_VISIBILITY * 3, SECONDS)).isTrue(); + simpleMessageListenerContainer.stop(); + + // assert + assertThat(numberTimesMessageProcesed).hasValue(1); + } + + @AllArgsConstructor + public static class MessageConsumer { + private final CountDownLatch latch; + private final AtomicInteger numberTimesEntered; + + @SuppressWarnings("WeakerAccess") + public void consume(VisibilityExtender visibilityExtender) throws Exception { + numberTimesEntered.incrementAndGet(); + // Extend it past what the current available visibility is + visibilityExtender.extend(3 * ORIGINAL_MESSAGE_VISIBILITY).get(); + Thread.sleep(2 * ORIGINAL_MESSAGE_VISIBILITY * 1000); + latch.countDown(); + } + } +} diff --git a/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/listener/concurrent/ConcurrentMessageBrokerIntegrationTest.java b/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/listener/concurrent/ConcurrentMessageBrokerIntegrationTest.java index 270fdab8..1bfbe87e 100644 --- a/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/listener/concurrent/ConcurrentMessageBrokerIntegrationTest.java +++ b/java-dynamic-sqs-listener-core/src/test/java/it/com/jashmore/sqs/listener/concurrent/ConcurrentMessageBrokerIntegrationTest.java @@ -20,6 +20,8 @@ import com.jashmore.sqs.resolver.individual.IndividualMessageResolver; import com.jashmore.sqs.retriever.AsyncMessageRetriever; import com.jashmore.sqs.retriever.MessageRetriever; +import com.jashmore.sqs.retriever.batching.BatchingMessageRetriever; +import com.jashmore.sqs.retriever.batching.StaticBatchingMessageRetrieverProperties; import com.jashmore.sqs.retriever.individual.IndividualMessageRetriever; import com.jashmore.sqs.retriever.individual.IndividualMessageRetrieverProperties; import com.jashmore.sqs.retriever.prefetch.PrefetchingMessageRetriever; @@ -50,7 +52,7 @@ public class ConcurrentMessageBrokerIntegrationTest { public void setUp() { queueUrl = localSqsRule.createRandomQueue(); queueProperties = QueueProperties.builder().queueUrl(queueUrl).build(); - argumentResolverService = new CoreArgumentResolverService(PAYLOAD_MAPPER, localSqsRule.getLocalAmazonSqsAsync(), OBJECT_MAPPER); + argumentResolverService = new CoreArgumentResolverService(PAYLOAD_MAPPER, OBJECT_MAPPER); } @Test(timeout = 30_000) @@ -72,6 +74,7 @@ public void allMessagesSentIntoQueueAreProcessed() throws Exception { final MessageProcessor messageProcessor = new DefaultMessageProcessor( argumentResolverService, queueProperties, + sqsAsyncClient, messageResolver, MessageConsumer.class.getMethod("consume", String.class), messageConsumer @@ -122,6 +125,58 @@ public void usingPrefetchingMessageRetrieverCanConsumeAllMessages() throws Excep final MessageProcessor messageProcessor = new DefaultMessageProcessor( argumentResolverService, queueProperties, + sqsAsyncClient, + messageResolver, + MessageConsumer.class.getMethod("consume", String.class), + messageConsumer + ); + final ConcurrentMessageBroker messageBroker = new ConcurrentMessageBroker( + messageRetriever, + messageProcessor, + StaticConcurrentMessageBrokerProperties.builder() + .concurrencyLevel(concurrencyLevel) + .build() + ); + final SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer( + messageRetriever, messageBroker, messageResolver + ); + + SqsIntegrationTestUtils.sendNumberOfMessages(numberOfMessages, sqsAsyncClient, queueUrl); + + // act + simpleMessageListenerContainer.start(); + + // assert + messageReceivedLatch.await(1, MINUTES); + + // cleanup + simpleMessageListenerContainer.stop(); + SqsIntegrationTestUtils.assertNoMessagesInQueue(sqsAsyncClient, queueUrl); + } + + + @Test(timeout = 30_000) + public void usingBatchingMessageRetrieverCanConsumeAllMessages() throws Exception { + // arrange + final int concurrencyLevel = 10; + final int numberOfMessages = 100; + final SqsAsyncClient sqsAsyncClient = localSqsRule.getLocalAmazonSqsAsync(); + final AsyncMessageRetriever messageRetriever = new BatchingMessageRetriever( + queueProperties, + sqsAsyncClient, + StaticBatchingMessageRetrieverProperties.builder() + .numberOfThreadsWaitingTrigger(10) + .messageRetrievalPollingPeriodInMs(3000L) + .visibilityTimeoutInSeconds(60) + .build() + ); + final CountDownLatch messageReceivedLatch = new CountDownLatch(numberOfMessages); + final MessageConsumer messageConsumer = new MessageConsumer(messageReceivedLatch); + final MessageResolver messageResolver = new BlockingMessageResolver(new IndividualMessageResolver(queueProperties, sqsAsyncClient)); + final MessageProcessor messageProcessor = new DefaultMessageProcessor( + argumentResolverService, + queueProperties, + sqsAsyncClient, messageResolver, MessageConsumer.class.getMethod("consume", String.class), messageConsumer diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/src/main/java/com/jashmore/sqs/spring/QueueWrapperInitialisationException.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/src/main/java/com/jashmore/sqs/spring/QueueWrapperInitialisationException.java new file mode 100644 index 00000000..089025af --- /dev/null +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/src/main/java/com/jashmore/sqs/spring/QueueWrapperInitialisationException.java @@ -0,0 +1,10 @@ +package com.jashmore.sqs.spring; + +/** + * Exception that is thrown when their is a known error wrapping a queue listener. + */ +public class QueueWrapperInitialisationException extends RuntimeException { + public QueueWrapperInitialisationException(final String message) { + super(message); + } +} diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/src/main/java/com/jashmore/sqs/spring/client/SqsAsyncClientProvider.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/src/main/java/com/jashmore/sqs/spring/client/SqsAsyncClientProvider.java new file mode 100644 index 00000000..b414271f --- /dev/null +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/src/main/java/com/jashmore/sqs/spring/client/SqsAsyncClientProvider.java @@ -0,0 +1,36 @@ +package com.jashmore.sqs.spring.client; + +import software.amazon.awssdk.services.sqs.SqsAsyncClient; + +import java.util.Optional; + +/** + * Used to provide a specific {@link SqsAsyncClient} that can be used to communicate with the SQS Queue. + * + *

    As the framework allows for multiple {@link SqsAsyncClient}s to be used, due to the possibility of queues existing across multiple accounts, this provides + * the ability to identify certain clients for usage by certain queues. For example, you may have two AWS Accounts (accountOne, accountTwo) which SQS queues in + * each. The specific Queue Listeners will need to listen to a SQS Queue on each account and therefore a single {@link SqsAsyncClient} should be provided + * for the entire application. + */ +public interface SqsAsyncClientProvider { + /** + * The default {@link SqsAsyncClient} that can be used if there is no identifier provided. + * + *

    This is more useful for the case that you only have a single AWS Account and don't need to identify it. + * + * @return the default client if it exists + */ + Optional getDefaultClient(); + + /** + * Get the specific client that has the provided identifier. + * + *

    This map is useful when you have those duplicate accounts and certain {@link SqsAsyncClient}s need to be used on certain queues. + * + *

    If an identifier is provided that does not match a known {@link SqsAsyncClient}, an {@link Optional#empty()} is returned. + * + * @param clientIdentifier the identifier of the client to get + * @return the client if it exists, an {@link Optional#empty()} otherwise + */ + Optional getClient(String clientIdentifier); +} diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/src/main/java/com/jashmore/sqs/spring/queue/QueueResolverService.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/src/main/java/com/jashmore/sqs/spring/queue/QueueResolverService.java index 232c3101..4568d757 100644 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/src/main/java/com/jashmore/sqs/spring/queue/QueueResolverService.java +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/src/main/java/com/jashmore/sqs/spring/queue/QueueResolverService.java @@ -1,5 +1,7 @@ package com.jashmore.sqs.spring.queue; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; + /** * Service that is injected into the applications dependency injection framework that can be used to resolve parameterised strings to a queue url if there is * a queue that exists. @@ -19,9 +21,10 @@ public interface QueueResolverService { *

  • http://localhost:4576/q/myQueueName passed in will be returned as is
  • * * + * @param client the client that can be used to get information about the queue * @param queueNameOrUrl queueName or queueUrl that may have parameterised placeholders in it. * @return the resolved url of the queue if it exists * @throws QueueResolutionException if there was an error resolving the queue URL */ - String resolveQueueUrl(String queueNameOrUrl) throws QueueResolutionException; + String resolveQueueUrl(SqsAsyncClient client, String queueNameOrUrl) throws QueueResolutionException; } diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/client/DefaultSqsAsyncClientProvider.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/client/DefaultSqsAsyncClientProvider.java new file mode 100644 index 00000000..71d39433 --- /dev/null +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/client/DefaultSqsAsyncClientProvider.java @@ -0,0 +1,43 @@ +package com.jashmore.sqs.spring.client; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; + +import software.amazon.awssdk.services.sqs.SqsAsyncClient; + +import java.util.Map; +import java.util.Optional; + +/** + * Default implementation that stores the default {@link SqsAsyncClient}, if it exists, and the map of available clients. + */ +public class DefaultSqsAsyncClientProvider implements SqsAsyncClientProvider { + private final SqsAsyncClient defaultClient; + private final Map clientMap; + + public DefaultSqsAsyncClientProvider(final SqsAsyncClient defaultClient) { + this.defaultClient = defaultClient; + this.clientMap = ImmutableMap.of(); + } + + public DefaultSqsAsyncClientProvider(final Map clientMap) { + this(null, clientMap); + } + + public DefaultSqsAsyncClientProvider(final SqsAsyncClient defaultClient, final Map clientMap) { + Preconditions.checkNotNull(clientMap, "clientMap should not be null"); + + this.defaultClient = defaultClient; + this.clientMap = ImmutableMap.copyOf(clientMap); + } + + @Override + public Optional getDefaultClient() { + return Optional.ofNullable(defaultClient); + } + + @Override + public Optional getClient(final String clientIdentifier) { + return Optional.ofNullable(clientMap.get(clientIdentifier)); + } +} diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/config/QueueListenerConfiguration.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/config/QueueListenerConfiguration.java index bf962bb8..457ba2e3 100644 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/config/QueueListenerConfiguration.java +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/config/QueueListenerConfiguration.java @@ -1,5 +1,7 @@ package com.jashmore.sqs.spring.config; +import com.google.common.collect.ImmutableMap; + import com.fasterxml.jackson.databind.ObjectMapper; import com.jashmore.sqs.argument.ArgumentResolver; import com.jashmore.sqs.argument.ArgumentResolverService; @@ -10,13 +12,13 @@ import com.jashmore.sqs.argument.messageid.MessageIdArgumentResolver; import com.jashmore.sqs.argument.payload.PayloadArgumentResolver; import com.jashmore.sqs.argument.payload.mapper.JacksonPayloadMapper; -import com.jashmore.sqs.argument.visibility.VisibilityExtenderArgumentResolver; import com.jashmore.sqs.container.MessageListenerContainer; import com.jashmore.sqs.spring.DefaultQueueContainerService; import com.jashmore.sqs.spring.QueueContainerService; import com.jashmore.sqs.spring.QueueWrapper; +import com.jashmore.sqs.spring.client.DefaultSqsAsyncClientProvider; +import com.jashmore.sqs.spring.client.SqsAsyncClientProvider; import com.jashmore.sqs.spring.container.basic.QueueListenerWrapper; -import com.jashmore.sqs.spring.container.batching.BatchingQueueListenerWrapper; import com.jashmore.sqs.spring.container.prefetch.PrefetchingQueueListenerWrapper; import com.jashmore.sqs.spring.queue.DefaultQueueResolverService; import com.jashmore.sqs.spring.queue.QueueResolverService; @@ -50,11 +52,30 @@ public class QueueListenerConfiguration { * @see SqsAsyncClient#create() for more details about how to use this default client */ @Bean(destroyMethod = "close") - @ConditionalOnMissingBean(SqsAsyncClient.class) + @ConditionalOnMissingBean({SqsAsyncClient.class, SqsAsyncClientProvider.class}) public SqsAsyncClient sqsAsyncClient() { return SqsAsyncClient.create(); } + /** + * Provides the {@link SqsAsyncClientProvider} which is used to provide the relevant {@link SqsAsyncClient} as there could be multiple AWS + * Accounts/Credentials being used. + * + *

    When a user provides their own bean of this class they provide all of the {@link SqsAsyncClient}s that will be used, such as defining their + * own default {@link SqsAsyncClient} and all other identifier clients, see {@link SqsAsyncClientProvider#getClient(String)}. + * + *

    The user may define their own {@link SqsAsyncClient} which will be used instead of the one provided by {@link #sqsAsyncClient()} if they only + * want to use the default client and don't want to be able to pick one of multiple clients. + * + * @param defaultClient the default client + * @return the provider for obtains {@link SqsAsyncClient}s, in this case only the default client + */ + @Bean + @ConditionalOnMissingBean({SqsAsyncClientProvider.class}) + public SqsAsyncClientProvider sqsAsyncClientProvider(final SqsAsyncClient defaultClient) { + return new DefaultSqsAsyncClientProvider(defaultClient, ImmutableMap.of()); + } + /** * Contains all of the configuration that needs to supplied if there is no {@link ArgumentResolverService} class defined. * @@ -113,11 +134,6 @@ public MessageIdArgumentResolver messageIdArgumentResolver() { return new MessageIdArgumentResolver(); } - @Bean - public VisibilityExtenderArgumentResolver visibilityExtenderArgumentResolver(final SqsAsyncClient sqsAsyncClient) { - return new VisibilityExtenderArgumentResolver(sqsAsyncClient); - } - @Bean public MessageSystemAttributeArgumentResolver messageSystemAttributeArgumentResolver() { return new MessageSystemAttributeArgumentResolver(); @@ -137,16 +153,15 @@ public MessageArgumentResolver messageArgumentResolver() { } /** - * The default provided {@link QueueResolverService} that can be used if it not overriden by a user defined bean. + * The default provided {@link QueueResolverService} that can be used if it not overridden by a user defined bean. * - * @param sqsAsyncClient client to communicate with the SQS queues * @param environment the environment for this spring application * @return the default service used for queue resolution */ @Bean @ConditionalOnMissingBean(QueueResolverService.class) - public QueueResolverService queueResolverService(final SqsAsyncClient sqsAsyncClient, final Environment environment) { - return new DefaultQueueResolverService(sqsAsyncClient, environment); + public QueueResolverService queueResolverService(final Environment environment) { + return new DefaultQueueResolverService(environment); } /** @@ -183,26 +198,18 @@ public QueueContainerService queueContainerService(final List queu public static class QueueWrapperConfiguration { @Bean public QueueWrapper coreProvidedQueueListenerWrapper(final ArgumentResolverService argumentResolverService, - final SqsAsyncClient sqsAsyncClient, + final SqsAsyncClientProvider sqsAsyncClientProvider, final QueueResolverService queueResolverService, final Environment environment) { - return new QueueListenerWrapper(argumentResolverService, sqsAsyncClient, queueResolverService, environment); + return new QueueListenerWrapper(argumentResolverService, sqsAsyncClientProvider, queueResolverService, environment); } @Bean public QueueWrapper coreProvidedPrefetchingQueueListenerWrapper(final ArgumentResolverService argumentResolverService, - final SqsAsyncClient sqsAsyncClient, + final SqsAsyncClientProvider sqsAsyncClientProvider, final QueueResolverService queueResolverService, final Environment environment) { - return new PrefetchingQueueListenerWrapper(argumentResolverService, sqsAsyncClient, queueResolverService, environment); - } - - @Bean - public QueueWrapper coreProvidedBatchingQueueListenerWrapper(final ArgumentResolverService argumentResolverService, - final SqsAsyncClient sqsAsyncClient, - final QueueResolverService queueResolverService, - final Environment environment) { - return new BatchingQueueListenerWrapper(argumentResolverService, sqsAsyncClient, queueResolverService, environment); + return new PrefetchingQueueListenerWrapper(argumentResolverService, sqsAsyncClientProvider, queueResolverService, environment); } } } diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java index f603f5cf..2f83c210 100644 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java @@ -12,7 +12,9 @@ import com.jashmore.sqs.processor.DefaultMessageProcessor; import com.jashmore.sqs.retriever.batching.BatchingMessageRetriever; import com.jashmore.sqs.retriever.batching.BatchingMessageRetrieverProperties; +import com.jashmore.sqs.spring.client.SqsAsyncClientProvider; import org.springframework.core.env.Environment; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; import java.lang.annotation.Retention; import java.lang.annotation.Target; @@ -61,6 +63,19 @@ */ String identifier() default ""; + /** + * The unique identifier for the {@link SqsAsyncClient} that should be used for this queue. + * + *

    As queues can be set up across multiple AWS Accounts there can be multiple {@link SqsAsyncClient}s being + * provided by the {@link SqsAsyncClientProvider}. When this identifier is set, it will obtain the client to be used + * via the {@link SqsAsyncClientProvider#getClient(String)} method. + * + *

    If this value is not set (empty), the default client will be provided by a call to {@link SqsAsyncClientProvider#getDefaultClient()}. + * + * @return the identifier for the client to use or empty if the default should be used + */ + String sqsClient() default ""; + /** * The number of threads that will be processing messages. * diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListenerWrapper.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListenerWrapper.java index 566dc009..6aba8ab7 100644 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListenerWrapper.java +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListenerWrapper.java @@ -10,7 +10,9 @@ import com.jashmore.sqs.processor.DefaultMessageProcessor; import com.jashmore.sqs.processor.MessageProcessor; import com.jashmore.sqs.resolver.MessageResolver; -import com.jashmore.sqs.resolver.individual.IndividualMessageResolver; +import com.jashmore.sqs.resolver.batching.BatchingMessageResolver; +import com.jashmore.sqs.resolver.batching.BatchingMessageResolverProperties; +import com.jashmore.sqs.resolver.batching.StaticBatchingMessageResolverProperties; import com.jashmore.sqs.retriever.MessageRetriever; import com.jashmore.sqs.retriever.batching.BatchingMessageRetriever; import com.jashmore.sqs.retriever.batching.BatchingMessageRetrieverProperties; @@ -18,6 +20,8 @@ import com.jashmore.sqs.spring.AbstractQueueAnnotationWrapper; import com.jashmore.sqs.spring.IdentifiableMessageListenerContainer; import com.jashmore.sqs.spring.QueueWrapper; +import com.jashmore.sqs.spring.QueueWrapperInitialisationException; +import com.jashmore.sqs.spring.client.SqsAsyncClientProvider; import com.jashmore.sqs.spring.queue.QueueResolverService; import com.jashmore.sqs.spring.util.IdentifierUtils; import lombok.AllArgsConstructor; @@ -36,7 +40,7 @@ @AllArgsConstructor public class QueueListenerWrapper extends AbstractQueueAnnotationWrapper { private final ArgumentResolverService argumentResolverService; - private final SqsAsyncClient sqsAsyncClient; + private final SqsAsyncClientProvider sqsAsyncClientProvider; private final QueueResolverService queueResolverService; private final Environment environment; @@ -47,18 +51,19 @@ protected Class getAnnotationClass() { @Override protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(final Object bean, final Method method, final QueueListener annotation) { - final QueueProperties queueProperties = QueueProperties - .builder() - .queueUrl(queueResolverService.resolveQueueUrl(annotation.value())) + final SqsAsyncClient sqsAsyncClient = getSqsAsyncClient(annotation.sqsClient()); + + final QueueProperties queueProperties = QueueProperties.builder() + .queueUrl(queueResolverService.resolveQueueUrl(sqsAsyncClient, annotation.value())) .build(); final int concurrencyLevel = getConcurrencyLevel(annotation); - final MessageRetriever messageRetriever = buildMessageRetriever(annotation, queueProperties); + final MessageRetriever messageRetriever = buildMessageRetriever(annotation, queueProperties, sqsAsyncClient); - final MessageResolver messageResolver = new IndividualMessageResolver(queueProperties, sqsAsyncClient); + final MessageResolver messageResolver = buildMessageResolver(annotation, queueProperties, sqsAsyncClient); - final MessageProcessor messageProcessor = new DefaultMessageProcessor(argumentResolverService, queueProperties, + final MessageProcessor messageProcessor = new DefaultMessageProcessor(argumentResolverService, queueProperties, sqsAsyncClient, messageResolver, method, bean); final String identifier; @@ -83,9 +88,10 @@ protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(fi .build(); } - private MessageRetriever buildMessageRetriever(final QueueListener annotation, final QueueProperties queueProperties) { - return new BatchingMessageRetriever( - queueProperties, sqsAsyncClient, batchingMessageRetrieverProperties(annotation)); + private MessageRetriever buildMessageRetriever(final QueueListener annotation, + final QueueProperties queueProperties, + final SqsAsyncClient sqsAsyncClient) { + return new BatchingMessageRetriever(queueProperties, sqsAsyncClient, batchingMessageRetrieverProperties(annotation)); } @VisibleForTesting @@ -97,6 +103,22 @@ BatchingMessageRetrieverProperties batchingMessageRetrieverProperties(final Queu .build(); } + private MessageResolver buildMessageResolver(final QueueListener annotation, + final QueueProperties queueProperties, + final SqsAsyncClient sqsAsyncClient) { + final BatchingMessageResolverProperties batchingMessageResolverProperties = batchingMessageResolverProperties(annotation); + + return new BatchingMessageResolver(queueProperties, sqsAsyncClient, batchingMessageResolverProperties); + } + + @VisibleForTesting + BatchingMessageResolverProperties batchingMessageResolverProperties(final QueueListener annotation) { + return StaticBatchingMessageResolverProperties.builder() + .bufferingSizeLimit(getBatchSize(annotation)) + .bufferingTimeInMs(getMaxPeriodBetweenBatchesInMs(annotation)) + .build(); + } + private int getConcurrencyLevel(final QueueListener annotation) { if (StringUtils.isEmpty(annotation.concurrencyLevelString())) { return annotation.concurrencyLevel(); @@ -129,4 +151,14 @@ private int getMessageVisibilityTimeoutInSeconds(final QueueListener annotation) return Integer.parseInt(environment.resolvePlaceholders(annotation.messageVisibilityTimeoutInSecondsString())); } + + private SqsAsyncClient getSqsAsyncClient(final String sqsClient) { + if (StringUtils.isEmpty(sqsClient)) { + return sqsAsyncClientProvider.getDefaultClient() + .orElseThrow(() -> new QueueWrapperInitialisationException("Expected the default SQS Client but there is none")); + } + + return sqsAsyncClientProvider.getClient(sqsClient) + .orElseThrow(() -> new QueueWrapperInitialisationException("Expected a client with id '" + sqsClient + "' but none were found")); + } } diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListener.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListener.java deleted file mode 100644 index b26822e3..00000000 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListener.java +++ /dev/null @@ -1,153 +0,0 @@ -package com.jashmore.sqs.spring.container.batching; - -import static java.lang.annotation.ElementType.METHOD; -import static java.lang.annotation.RetentionPolicy.RUNTIME; - -import com.jashmore.sqs.QueueProperties; -import com.jashmore.sqs.aws.AwsConstants; -import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBroker; -import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerProperties; -import com.jashmore.sqs.container.MessageListenerContainer; -import com.jashmore.sqs.processor.DefaultMessageProcessor; -import com.jashmore.sqs.resolver.batching.BatchingMessageResolver; -import com.jashmore.sqs.retriever.batching.BatchingMessageRetriever; -import com.jashmore.sqs.retriever.batching.BatchingMessageRetrieverProperties; -import com.jashmore.sqs.spring.container.basic.QueueListener; -import org.springframework.core.env.Environment; - -import java.lang.annotation.Retention; -import java.lang.annotation.Target; - -/** - * Wrap a method with a {@link MessageListenerContainer} that will execute the method whenever a message is received on the provided queue. The goal of this - * listener compared to others, like the {@link QueueListener @QueueListener} is to reduce the amount of requests - * that are made to the SQS Server, e.g. by batching requests for messages and requests to delete the messages from the queue. - * - *

    This is a simplified annotation that uses the {@link ConcurrentMessageBroker}, {@link BatchingMessageRetriever} and a {@link DefaultMessageProcessor} - * with a {@link BatchingMessageResolver} as the components of the library. Not all of the properties for each implementation are available to simplify - * this usage. - * - * @see BatchingQueueListenerWrapper for what processes this annotation - */ -@Retention(RUNTIME) -@Target(METHOD) -public @interface BatchingQueueListener { - /** - * The queue name or url for the queue to listen to messages on, this may contain placeholders that can be resolved from the Spring Environment. - * - *

    Examples of this field can be: - *

      - *
    • "${my.queue.prop}" which would be resolved to "http://localhost:4576/q/myQueue" if the application.yml contains - * my.queue.prop=http://localhost:4576/q/myQueue
    • - *
    • "http://localhost:4576/q/myQueue" which will be used as is
    • - *
    • "myQueue" which could be resolved to something like "http://localhost:4576/q/myQueue" by getting the URL from SQS
    • - *
    - * - * @return the queue name or URL of the queue - * @see Environment#resolveRequiredPlaceholders(String) for how the placeholders are resolved - * @see QueueProperties#getQueueUrl() for how the URL of the queue is resolved if a queue name is supplied here - */ - String value(); - - /** - * The unique identifier for this listener. - * - *

    This can be used if you need to access the {@link MessageListenerContainer} for this queue listener specifically to start/stop it - * specifically. - * - *

    If no value is provided for the identifier the class path and method name is used as the unique identifier. For example, the method - *

    com.company.queues.MyQueue#method(String, String)
    would result in the following identifier
    my-queue-method
    . - * - *

    The identifier for the queue will also be used to name the threads that will be executing the message processing. For example if your identifier - * is

    'my-queue-method'
    the threads that will be created will be named like
    'my-queue-method-0'
    , etc. - * - * @return the unique identifier for this queue listener - */ - String identifier() default ""; - - /** - * The number of threads that will be processing messages. - * - * @return the total number of threads processing messages - * @see ConcurrentMessageBrokerProperties#getConcurrencyLevel() for more details and constraints - */ - int concurrencyLevel() default 5; - - /** - * The number of threads that will be processing messages converted from a string representation. - * - *

    This can be used when you need to load the value from Spring properties for example

    concurrencyLevelString = "${my.profile.property}"
    - * instead of having it hardcoded in {@link #concurrencyLevel()}. - * - *

    If this value is not empty, the value set by {@link #concurrencyLevel()} will be ignored. - * - * @return the total number of threads processing messages as a string - * @see ConcurrentMessageBrokerProperties#getConcurrencyLevel() for more details and constraints - */ - String concurrencyLevelString() default ""; - - /** - * The total number of threads requesting messages that will result in the the background thread to actually request the messages. - * - *

    This number should be positive but smaller than {@link AwsConstants#MAX_NUMBER_OF_MESSAGES_FROM_SQS} as it does not make sense to have a batch size - * greater than what AWS can provide. - * - * @return the total number of threads requesting messages for trigger a batch of messages to be retrieved - * @see BatchingMessageRetrieverProperties#getNumberOfThreadsWaitingTrigger() for more details about this parameter - */ - int batchSize() default 5; - - /** - * The total number of threads requesting messages that will result in the the background thread to actually request the messages. - * - *

    This number should be positive but smaller than {@link AwsConstants#MAX_NUMBER_OF_MESSAGES_FROM_SQS} as it does not make sense to have a batch size - * greater than what AWS can provide. - * - *

    This can be used when you need to load the value from Spring properties for example - *

    batchSizeString = "${my.profile.property}"
    instead of having it hardcoded in {@link #batchSize()}. - * - * @return the total number of threads requesting messages for trigger a batch of messages to be retrieved - * @see BatchingMessageRetrieverProperties#getNumberOfThreadsWaitingTrigger() for more details about this parameter - */ - String batchSizeString() default ""; - - /** - * The maximum period of time that the {@link BatchingMessageRetriever} will wait for all threads to be ready before retrieving messages. - * - * @return the period in ms that threads will wait for messages to be requested from SQS - * @see BatchingMessageRetrieverProperties#getMessageRetrievalPollingPeriodInMs() for more details about this parameter - */ - long maxPeriodBetweenBatchesInMs() default 2000L; - - /** - * The maximum period of time that the {@link BatchingMessageRetriever} will wait for all threads to be ready before retrieving messages converted - * from a string representation. - * - *

    This can be used when you need to load the value from Spring properties for example - *

    maxPeriodBetweenBatchesInMsString = "${my.profile.property}"
    instead of having it hardcoded in {@link #maxPeriodBetweenBatchesInMs()}. - * - * @return the period in ms that threads will wait for messages to be requested from SQS - * @see BatchingMessageRetrieverProperties#getMessageRetrievalPollingPeriodInMs() for more details - * @see #maxPeriodBetweenBatchesInMs() for more information about this field - */ - String maxPeriodBetweenBatchesInMsString() default ""; - - /** - * The message visibility that will be used for messages obtained from the queue. - * - * @return the message visibility for messages fetched from the queue - * @see BatchingMessageRetrieverProperties#getVisibilityTimeoutInSeconds() for more details and constraints - */ - int messageVisibilityTimeoutInSeconds() default 30; - - /** - * The message visibility that will be used for messages obtained from the queue converted from a string representation. - * - *

    This can be used when you need to load the value from Spring properties for example - *

    messageVisibilityTimeoutInSeconds = "${my.profile.property}"
    instead of having it hardcoded in {@link #messageVisibilityTimeoutInSeconds()}. - * - * @return the message visibility for messages fetched from the queue - * @see BatchingMessageRetrieverProperties#getVisibilityTimeoutInSeconds() for more details and constraints - */ - String messageVisibilityTimeoutInSecondsString() default ""; -} diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListenerWrapper.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListenerWrapper.java deleted file mode 100644 index e8b68199..00000000 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListenerWrapper.java +++ /dev/null @@ -1,147 +0,0 @@ -package com.jashmore.sqs.spring.container.batching; - -import com.google.common.annotations.VisibleForTesting; - -import com.jashmore.sqs.QueueProperties; -import com.jashmore.sqs.argument.ArgumentResolverService; -import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBroker; -import com.jashmore.sqs.broker.concurrent.StaticConcurrentMessageBrokerProperties; -import com.jashmore.sqs.container.SimpleMessageListenerContainer; -import com.jashmore.sqs.processor.DefaultMessageProcessor; -import com.jashmore.sqs.processor.MessageProcessor; -import com.jashmore.sqs.resolver.MessageResolver; -import com.jashmore.sqs.resolver.batching.BatchingMessageResolver; -import com.jashmore.sqs.resolver.batching.BatchingMessageResolverProperties; -import com.jashmore.sqs.resolver.batching.StaticBatchingMessageResolverProperties; -import com.jashmore.sqs.retriever.MessageRetriever; -import com.jashmore.sqs.retriever.batching.BatchingMessageRetriever; -import com.jashmore.sqs.retriever.batching.BatchingMessageRetrieverProperties; -import com.jashmore.sqs.retriever.batching.StaticBatchingMessageRetrieverProperties; -import com.jashmore.sqs.spring.AbstractQueueAnnotationWrapper; -import com.jashmore.sqs.spring.IdentifiableMessageListenerContainer; -import com.jashmore.sqs.spring.QueueWrapper; -import com.jashmore.sqs.spring.container.basic.QueueListener; -import com.jashmore.sqs.spring.queue.QueueResolverService; -import com.jashmore.sqs.spring.util.IdentifierUtils; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.core.env.Environment; -import org.springframework.util.StringUtils; -import software.amazon.awssdk.services.sqs.SqsAsyncClient; - -import java.lang.reflect.Method; - -/** - * {@link QueueWrapper} that will wrap methods annotated with {@link QueueListener @QueueListener} with some predefined - * implementations of the framework. - */ -@SuppressWarnings("Duplicates") -@Slf4j -@RequiredArgsConstructor -public class BatchingQueueListenerWrapper extends AbstractQueueAnnotationWrapper { - private final ArgumentResolverService argumentResolverService; - private final SqsAsyncClient sqsAsyncClient; - private final QueueResolverService queueResolverService; - private final Environment environment; - - @Override - protected Class getAnnotationClass() { - return BatchingQueueListener.class; - } - - @Override - protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(final Object bean, final Method method, - final BatchingQueueListener annotation) { - final QueueProperties queueProperties = QueueProperties.builder() - .queueUrl(queueResolverService.resolveQueueUrl(annotation.value())) - .build(); - - final MessageRetriever messageRetriever = buildMessageRetriever(annotation, queueProperties); - - final MessageResolver messageResolver = buildMessageResolver(annotation, queueProperties); - - final MessageProcessor messageProcessor = new DefaultMessageProcessor(argumentResolverService, queueProperties, messageResolver, method, bean); - - final String identifier; - if (StringUtils.isEmpty(annotation.identifier().trim())) { - identifier = IdentifierUtils.buildIdentifierForMethod(bean.getClass(), method); - } else { - identifier = annotation.identifier().trim(); - } - - final ConcurrentMessageBroker messageBroker = new ConcurrentMessageBroker( - messageRetriever, - messageProcessor, - StaticConcurrentMessageBrokerProperties.builder() - .concurrencyLevel(getConcurrencyLevel(annotation)) - .threadNameFormat(identifier + "-%d") - .build() - ); - - return IdentifiableMessageListenerContainer.builder() - .identifier(identifier) - .container(new SimpleMessageListenerContainer(messageRetriever, messageBroker, messageResolver)) - .build(); - } - - private MessageRetriever buildMessageRetriever(final BatchingQueueListener annotation, - final QueueProperties queueProperties) { - return new BatchingMessageRetriever(queueProperties, sqsAsyncClient, batchingMessageRetrieverProperties(annotation)); - } - - @VisibleForTesting - BatchingMessageRetrieverProperties batchingMessageRetrieverProperties(final BatchingQueueListener annotation) { - return StaticBatchingMessageRetrieverProperties.builder() - .visibilityTimeoutInSeconds(getMessageVisibilityTimeoutInSeconds(annotation)) - .messageRetrievalPollingPeriodInMs(getMaxPeriodBetweenBatchesInMs(annotation)) - .numberOfThreadsWaitingTrigger(getBatchSize(annotation)) - .build(); - } - - private MessageResolver buildMessageResolver(final BatchingQueueListener annotation, - final QueueProperties queueProperties) { - final BatchingMessageResolverProperties batchingMessageResolverProperties = batchingMessageResolverProperties(annotation); - - return new BatchingMessageResolver(queueProperties, sqsAsyncClient, batchingMessageResolverProperties); - } - - @VisibleForTesting - BatchingMessageResolverProperties batchingMessageResolverProperties(final BatchingQueueListener annotation) { - return StaticBatchingMessageResolverProperties.builder() - .bufferingSizeLimit(getBatchSize(annotation)) - .bufferingTimeInMs(getMaxPeriodBetweenBatchesInMs(annotation)) - .build(); - } - - private int getConcurrencyLevel(final BatchingQueueListener annotation) { - if (StringUtils.isEmpty(annotation.concurrencyLevelString())) { - return annotation.concurrencyLevel(); - } - - return Integer.parseInt(environment.resolvePlaceholders(annotation.concurrencyLevelString())); - } - - private int getBatchSize(final BatchingQueueListener annotation) { - if (StringUtils.isEmpty(annotation.batchSizeString())) { - return annotation.batchSize(); - } - - return Integer.parseInt(environment.resolvePlaceholders(annotation.batchSizeString())); - } - - private long getMaxPeriodBetweenBatchesInMs(final BatchingQueueListener annotation) { - if (StringUtils.isEmpty(annotation.maxPeriodBetweenBatchesInMsString())) { - return annotation.maxPeriodBetweenBatchesInMs(); - } - - return Long.parseLong(environment.resolvePlaceholders(annotation.maxPeriodBetweenBatchesInMsString())); - } - - private int getMessageVisibilityTimeoutInSeconds(final BatchingQueueListener annotation) { - if (StringUtils.isEmpty(annotation.messageVisibilityTimeoutInSecondsString())) { - return annotation.messageVisibilityTimeoutInSeconds(); - } - - return Integer.parseInt(environment.resolvePlaceholders(annotation.messageVisibilityTimeoutInSecondsString())); - } -} diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListener.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListener.java index 015614e8..6deb0a0e 100644 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListener.java +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListener.java @@ -12,7 +12,9 @@ import com.jashmore.sqs.retriever.batching.BatchingMessageRetrieverProperties; import com.jashmore.sqs.retriever.prefetch.PrefetchingMessageRetriever; import com.jashmore.sqs.retriever.prefetch.StaticPrefetchingMessageRetrieverProperties; +import com.jashmore.sqs.spring.client.SqsAsyncClientProvider; import org.springframework.core.env.Environment; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; import java.lang.annotation.Retention; import java.lang.annotation.Target; @@ -61,6 +63,19 @@ */ String identifier() default ""; + /** + * The unique identifier for the {@link SqsAsyncClient} that should be used for this queue. + * + *

    As queues can be set up across multiple AWS Accounts there can be multiple {@link SqsAsyncClient}s being + * provided by the {@link SqsAsyncClientProvider}. When this identifier is set, it will obtain the client to be used + * via the {@link SqsAsyncClientProvider#getClient(String)} method. + * + *

    If this value is not set (empty), the default client will be provided by a call to {@link SqsAsyncClientProvider#getDefaultClient()}. + * + * @return the identifier for the client to use or empty if the default should be used + */ + String sqsClient() default ""; + /** * The number of threads that will be processing messages. * diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListenerWrapper.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListenerWrapper.java index 98000262..7388fd16 100644 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListenerWrapper.java +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListenerWrapper.java @@ -20,6 +20,8 @@ import com.jashmore.sqs.spring.AbstractQueueAnnotationWrapper; import com.jashmore.sqs.spring.IdentifiableMessageListenerContainer; import com.jashmore.sqs.spring.QueueWrapper; +import com.jashmore.sqs.spring.QueueWrapperInitialisationException; +import com.jashmore.sqs.spring.client.SqsAsyncClientProvider; import com.jashmore.sqs.spring.queue.QueueResolverService; import com.jashmore.sqs.spring.util.IdentifierUtils; import lombok.RequiredArgsConstructor; @@ -38,7 +40,7 @@ @RequiredArgsConstructor public class PrefetchingQueueListenerWrapper extends AbstractQueueAnnotationWrapper { private final ArgumentResolverService argumentResolverService; - private final SqsAsyncClient sqsAsyncClient; + private final SqsAsyncClientProvider sqsAsyncClientProvider; private final QueueResolverService queueResolverService; private final Environment environment; @@ -50,14 +52,17 @@ protected Class getAnnotationClass() { @Override protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(final Object bean, final Method method, final PrefetchingQueueListener annotation) { + final SqsAsyncClient sqsAsyncClient = getSqsAsyncClient(annotation.sqsClient()); + final QueueProperties queueProperties = QueueProperties .builder() - .queueUrl(queueResolverService.resolveQueueUrl(annotation.value())) + .queueUrl(queueResolverService.resolveQueueUrl(sqsAsyncClient, annotation.value())) .build(); - final MessageRetriever messageRetriever = buildMessageRetriever(annotation, queueProperties); + final MessageRetriever messageRetriever = buildMessageRetriever(annotation, queueProperties, sqsAsyncClient); final MessageResolver messageResolver = new IndividualMessageResolver(queueProperties, sqsAsyncClient); - final MessageProcessor messageProcessor = new DefaultMessageProcessor(argumentResolverService, queueProperties, messageResolver, method, bean); + final MessageProcessor messageProcessor = new DefaultMessageProcessor(argumentResolverService, queueProperties, + sqsAsyncClient, messageResolver, method, bean); final String identifier; if (StringUtils.isEmpty(annotation.identifier().trim())) { @@ -124,7 +129,19 @@ PrefetchingMessageRetrieverProperties buildMessageRetrieverProperties(final Pref .build(); } - private MessageRetriever buildMessageRetriever(final PrefetchingQueueListener annotation, final QueueProperties queueProperties) { + private MessageRetriever buildMessageRetriever(final PrefetchingQueueListener annotation, + final QueueProperties queueProperties, + final SqsAsyncClient sqsAsyncClient) { return new PrefetchingMessageRetriever(sqsAsyncClient, queueProperties, buildMessageRetrieverProperties(annotation)); } + + private SqsAsyncClient getSqsAsyncClient(final String sqsClient) { + if (StringUtils.isEmpty(sqsClient)) { + return sqsAsyncClientProvider.getDefaultClient() + .orElseThrow(() -> new QueueWrapperInitialisationException("Expected the default SQS Client but there is none")); + } + + return sqsAsyncClientProvider.getClient(sqsClient) + .orElseThrow(() -> new QueueWrapperInitialisationException("Expected a client with id '" + sqsClient + "' but none were found")); + } } diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/queue/DefaultQueueResolverService.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/queue/DefaultQueueResolverService.java index 83cdd06b..ba280b0b 100644 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/queue/DefaultQueueResolverService.java +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/queue/DefaultQueueResolverService.java @@ -14,11 +14,10 @@ @Service @AllArgsConstructor public class DefaultQueueResolverService implements QueueResolverService { - private final SqsAsyncClient sqsAsyncClient; private final Environment environment; @Override - public String resolveQueueUrl(final String queueNameOrUrl) { + public String resolveQueueUrl(final SqsAsyncClient sqsAsyncClient, final String queueNameOrUrl) { final String resolvedQueueNameOrUrl = environment.resolveRequiredPlaceholders(queueNameOrUrl); if (resolvedQueueNameOrUrl.startsWith("http")) { diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/client/DefaultSqsAsyncClientProviderTest.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/client/DefaultSqsAsyncClientProviderTest.java new file mode 100644 index 00000000..95e17ec2 --- /dev/null +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/client/DefaultSqsAsyncClientProviderTest.java @@ -0,0 +1,103 @@ +package com.jashmore.sqs.spring.client; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; + +import java.util.Map; +import java.util.Optional; + +public class DefaultSqsAsyncClientProviderTest { + @Rule + public MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Mock + private SqsAsyncClient client; + + @Test + public void whenNoDefaultSqsAsyncClientProvidedGettingDefaultReturnsEmptyOptional() { + // arrange + final SqsAsyncClientProvider sqsAsyncClientProvider = new DefaultSqsAsyncClientProvider((SqsAsyncClient)null); + + // act + final Optional optionalDefaultClient = sqsAsyncClientProvider.getDefaultClient(); + + // assert + assertThat(optionalDefaultClient).isEmpty(); + } + + @Test + public void whenDefaultSqsAsyncClientProvidedGettingDefaultReturnThatClient() { + // arrange + final SqsAsyncClientProvider sqsAsyncClientProvider = new DefaultSqsAsyncClientProvider(client); + + // act + final Optional optionalDefaultClient = sqsAsyncClientProvider.getDefaultClient(); + + // assert + assertThat(optionalDefaultClient).contains(client); + } + + @Test(expected = NullPointerException.class) + public void nullClientMapThrowsException() { + new DefaultSqsAsyncClientProvider(null, null); + } + + @Test + public void whenClientMapProvidedOneCanBeObtainedViaTheIdentifier() { + // arrange + final SqsAsyncClientProvider sqsAsyncClientProvider = new DefaultSqsAsyncClientProvider(null, ImmutableMap.of("id", client)); + + // act + final Optional optionalDefaultClient = sqsAsyncClientProvider.getClient("id"); + + // assert + assertThat(optionalDefaultClient).contains(client); + } + + @Test + public void whenNoDefaultClientProvidedWithClientMapDefaultClientWillBeEmptyOptional() { + // arrange + final SqsAsyncClientProvider sqsAsyncClientProvider = new DefaultSqsAsyncClientProvider(ImmutableMap.of("id", client)); + + // act + final Optional optionalDefaultClient = sqsAsyncClientProvider.getDefaultClient(); + + // assert + assertThat(optionalDefaultClient).isEmpty(); + } + + @Test + public void whenClientMapProvidedUsingAnIdentifierThatDoesNotExistReturnsEmptyOptional() { + // arrange + final SqsAsyncClientProvider sqsAsyncClientProvider = new DefaultSqsAsyncClientProvider(null, ImmutableMap.of("id", client)); + + // act + final Optional optionalDefaultClient = sqsAsyncClientProvider.getClient("unknownid"); + + // assert + assertThat(optionalDefaultClient).isEmpty(); + } + + @Test + public void clientMapCannotBeUpdatedAfterConstruction() { + // arrange + final Map clientMap = Maps.newHashMap(); + final SqsAsyncClientProvider sqsAsyncClientProvider = new DefaultSqsAsyncClientProvider(null, clientMap); + + // act + clientMap.put("id", client); + + // assert + final Optional optionalDefaultClient = sqsAsyncClientProvider.getClient("id"); + assertThat(optionalDefaultClient).isEmpty(); + } +} \ No newline at end of file diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/config/QueueListenerConfigurationTest.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/config/QueueListenerConfigurationTest.java index a0bd673c..49f8622d 100644 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/config/QueueListenerConfigurationTest.java +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/config/QueueListenerConfigurationTest.java @@ -13,12 +13,11 @@ import com.jashmore.sqs.argument.message.MessageArgumentResolver; import com.jashmore.sqs.argument.messageid.MessageIdArgumentResolver; import com.jashmore.sqs.argument.payload.PayloadArgumentResolver; -import com.jashmore.sqs.argument.visibility.VisibilityExtenderArgumentResolver; import com.jashmore.sqs.spring.QueueWrapper; import com.jashmore.sqs.spring.DefaultQueueContainerService; import com.jashmore.sqs.spring.QueueContainerService; +import com.jashmore.sqs.spring.client.SqsAsyncClientProvider; import com.jashmore.sqs.spring.container.basic.QueueListenerWrapper; -import com.jashmore.sqs.spring.container.batching.BatchingQueueListenerWrapper; import com.jashmore.sqs.spring.container.prefetch.PrefetchingQueueListenerWrapper; import org.junit.Rule; import org.junit.Test; @@ -180,7 +179,7 @@ public void allCoreArgumentResolversAreIncludedInContextIfNoCustomArgumentResolv .collect(toSet()); assertThat(argumentResolvers).containsExactlyInAnyOrder( - PayloadArgumentResolver.class, MessageIdArgumentResolver.class, VisibilityExtenderArgumentResolver.class, + PayloadArgumentResolver.class, MessageIdArgumentResolver.class, MessageAttributeArgumentResolver.class, MessageSystemAttributeArgumentResolver.class, MessageArgumentResolver.class ); }); @@ -198,7 +197,7 @@ public void userDefinedArgumentResolversAreIncludedInDefaultDelegatingArgumentRe argumentResolversField.setAccessible(true); assertThat(((Set) argumentResolversField.get(argumentResolverService))) .containsExactlyElementsOf(argumentResolvers); - assertThat(argumentResolvers).hasSize(7); + assertThat(argumentResolvers).hasSize(6); }); } @@ -221,9 +220,7 @@ public void allCoreDefinedQueueWrappersAreIncludedInContext() { .map(QueueWrapper::getClass) .collect(toSet()); - assertThat(queueWrapperClasses).containsExactlyInAnyOrder( - QueueListenerWrapper.class, PrefetchingQueueListenerWrapper.class, BatchingQueueListenerWrapper.class - ); + assertThat(queueWrapperClasses).containsExactlyInAnyOrder(QueueListenerWrapper.class, PrefetchingQueueListenerWrapper.class); }); } @@ -252,7 +249,7 @@ public void userDefinedQueueWrapperIsIncludedInDefaultQueueContainerService() { argumentResolversField.setAccessible(true); assertThat(((List) argumentResolversField.get(service))) .containsExactlyElementsOf(queueWrappers); - assertThat(queueWrappers).hasSize(4); + assertThat(queueWrappers).hasSize(3); }); } @@ -274,6 +271,56 @@ public void userDefinedQueueContainerServiceWillResultInNoCoreQueueWrappersInCon .run((context) -> assertThat(context).doesNotHaveBean(QueueWrapper.class)); } + @Test + public void whenNoSqsAsyncClientProviderADefaultImplementationIsCreated() { + this.contextRunner + .withSystemProperties("aws.region:localstack") + .run((context) -> { + assertThat(context).hasSingleBean(SqsAsyncClientProvider.class); + assertThat(context.getBean(SqsAsyncClientProvider.class)).isSameAs( + context.getBean(QueueListenerConfiguration.class).sqsAsyncClientProvider(null)); + }); + } + + @Test + public void whenNoCustomSqsAsyncClientProviderAndSqsAsyncClientDefaultSqsAsyncClientIsTheSqsAsyncClientProvidersDefault() { + this.contextRunner + .withSystemProperties("aws.region:localstack") + .run((context) -> { + final SqsAsyncClientProvider sqsAsyncClientProvider = context.getBean(SqsAsyncClientProvider.class); + final SqsAsyncClient expectedDefault = context.getBean(SqsAsyncClient.class); + assertThat(sqsAsyncClientProvider.getDefaultClient()).contains(expectedDefault); + }); + } + + @Test + public void whenCustomSqsAsyncClientProvidedTheDefaultSqsAsyncClientProviderUsesThisAsTheDefault() { + this.contextRunner + .withUserConfiguration(UserConfigurationWithSqsClient.class) + .run((context) -> { + final SqsAsyncClientProvider sqsAsyncClientProvider = context.getBean(SqsAsyncClientProvider.class); + final SqsAsyncClient userDefinedSqsAsyncClient = context.getBean(UserConfigurationWithSqsClient.class).userDefinedSqsAsyncClient(); + assertThat(sqsAsyncClientProvider.getDefaultClient()).contains(userDefinedSqsAsyncClient); + }); + } + + @Test + public void whenCustomSqsAsyncClientProviderProvidedNoSqsAsyncClientBeanIsBuilt() { + this.contextRunner + .withUserConfiguration(UserConfigurationWithSqsClientProvider.class) + .run((context) -> assertThat(context).doesNotHaveBean(SqsAsyncClient.class)); + } + + @Test + public void whenCustomSqsAsyncClientProviderThatIsUsedInsteadOfTheDefault() { + this.contextRunner + .withUserConfiguration(UserConfigurationWithSqsClientProvider.class) + .run((context) -> { + assertThat(context.getBean(SqsAsyncClientProvider.class)) + .isSameAs(context.getBean(UserConfigurationWithSqsClientProvider.class).userDefinedSqsAsyncClientProvider()); + }); + } + @Configuration static class UserConfigurationWithSqsClient { @Bean @@ -344,4 +391,12 @@ public QueueWrapper customQueueWrapper() { return mock(QueueWrapper.class); } } + + @Configuration + static class UserConfigurationWithSqsClientProvider { + @Bean + public SqsAsyncClientProvider userDefinedSqsAsyncClientProvider() { + return mock(SqsAsyncClientProvider.class); + } + } } \ No newline at end of file diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/container/basic/QueueListenerWrapperTest.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/container/basic/QueueListenerWrapperTest.java index 6b252aa9..e11c88e8 100644 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/container/basic/QueueListenerWrapperTest.java +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/container/basic/QueueListenerWrapperTest.java @@ -2,6 +2,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -10,6 +11,8 @@ import com.jashmore.sqs.retriever.batching.BatchingMessageRetrieverProperties; import com.jashmore.sqs.retriever.batching.StaticBatchingMessageRetrieverProperties; import com.jashmore.sqs.spring.IdentifiableMessageListenerContainer; +import com.jashmore.sqs.spring.QueueWrapperInitialisationException; +import com.jashmore.sqs.spring.client.SqsAsyncClientProvider; import com.jashmore.sqs.spring.queue.QueueResolverService; import org.junit.Before; import org.junit.Rule; @@ -22,6 +25,7 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient; import java.lang.reflect.Method; +import java.util.Optional; /** * Class is hard to test as it is the one building all of the dependencies internally using new constructors. Don't really know a better way to do this @@ -39,7 +43,10 @@ public class QueueListenerWrapperTest { private ArgumentResolverService argumentResolverService; @Mock - private SqsAsyncClient sqsAsyncClient; + private SqsAsyncClientProvider sqsAsyncClientProvider; + + @Mock + private SqsAsyncClient defaultSqsAsyncClient; @Mock private QueueResolverService queueResolver; @@ -51,7 +58,9 @@ public class QueueListenerWrapperTest { @Before public void setUp() { - queueListenerWrapper = new QueueListenerWrapper(argumentResolverService, sqsAsyncClient, queueResolver, environment); + queueListenerWrapper = new QueueListenerWrapper(argumentResolverService, sqsAsyncClientProvider, queueResolver, environment); + + when(sqsAsyncClientProvider.getDefaultClient()).thenReturn(Optional.of(defaultSqsAsyncClient)); } @Test @@ -106,7 +115,7 @@ public void queueIsResolvedViaTheQueueResolverService() throws NoSuchMethodExcep queueListenerWrapper.wrapMethod(bean, method); // assert - verify(queueResolver).resolveQueueUrl("test"); + verify(queueResolver).resolveQueueUrl(defaultSqsAsyncClient, "test"); } @Test @@ -203,6 +212,46 @@ public void batchingMessageRetrieverPropertiesBuiltFromSpringValues() throws Exc ); } + @Test + public void whenNoDefaultSqsClientAvailableAndItIsRequestedTheListenerWillNotBeWrapped() throws Exception { + // arrange + final Object bean = new QueueListenerWrapperTest(); + final Method method = QueueListenerWrapperTest.class.getMethod("myMethod"); + when(sqsAsyncClientProvider.getDefaultClient()).thenReturn(Optional.empty()); + expectedException.expect(QueueWrapperInitialisationException.class); + expectedException.expectMessage("Expected the default SQS Client but there is none"); + + // act + queueListenerWrapper.wrapMethod(bean, method); + } + + @Test + public void whenSpecificSqsClientRequestButNoneAvailableAnExceptionIsThrown() throws Exception { + // arrange + final Object bean = new QueueListenerWrapperTest(); + final Method method = QueueListenerWrapperTest.class.getMethod("methodUsingSpecificSqsAsyncClient"); + when(sqsAsyncClientProvider.getClient("clientId")).thenReturn(Optional.empty()); + expectedException.expect(QueueWrapperInitialisationException.class); + expectedException.expectMessage("Expected a client with id 'clientId' but none were found"); + + // act + queueListenerWrapper.wrapMethod(bean, method); + } + + @Test + public void whenSpecificSqsClientRequestWhichCanBeFoundTheContainerCanBeBuilt() throws Exception { + // arrange + final Object bean = new QueueListenerWrapperTest(); + final Method method = QueueListenerWrapperTest.class.getMethod("methodUsingSpecificSqsAsyncClient"); + when(sqsAsyncClientProvider.getClient("clientId")).thenReturn(Optional.of(mock(SqsAsyncClient.class))); + + // act + final IdentifiableMessageListenerContainer container = queueListenerWrapper.wrapMethod(bean, method); + + // assert + assertThat(container).isNotNull(); + } + @QueueListener("test") public void myMethod() { @@ -223,4 +272,9 @@ public void methodWithFieldsUsingEnvironmentProperties() { public void methodWithFields() { } + + @QueueListener(value = "test2", sqsClient = "clientId") + public void methodUsingSpecificSqsAsyncClient() { + + } } diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListenerWrapperTest.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListenerWrapperTest.java deleted file mode 100644 index 56dfe4df..00000000 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListenerWrapperTest.java +++ /dev/null @@ -1,272 +0,0 @@ -package com.jashmore.sqs.spring.container.batching; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.jashmore.sqs.argument.ArgumentResolverService; -import com.jashmore.sqs.container.SimpleMessageListenerContainer; -import com.jashmore.sqs.resolver.batching.BatchingMessageResolverProperties; -import com.jashmore.sqs.resolver.batching.StaticBatchingMessageResolverProperties; -import com.jashmore.sqs.retriever.batching.BatchingMessageRetrieverProperties; -import com.jashmore.sqs.retriever.batching.StaticBatchingMessageRetrieverProperties; -import com.jashmore.sqs.spring.IdentifiableMessageListenerContainer; -import com.jashmore.sqs.spring.queue.QueueResolverService; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; -import org.springframework.core.env.Environment; -import software.amazon.awssdk.services.sqs.SqsAsyncClient; - -import java.lang.reflect.Method; - -@SuppressWarnings("WeakerAccess") -public class BatchingQueueListenerWrapperTest { - @Rule - public MockitoRule mockitoRule = MockitoJUnit.rule(); - - @Rule - public ExpectedException expectedException = ExpectedException.none(); - - @Mock - private ArgumentResolverService argumentResolverService; - - @Mock - private SqsAsyncClient sqsAsyncClient; - - @Mock - private QueueResolverService queueResolver; - - @Mock - private Environment environment; - - private BatchingQueueListenerWrapper batchingQueueListenerWrapper; - - @Before - public void setUp() { - batchingQueueListenerWrapper = new BatchingQueueListenerWrapper(argumentResolverService, sqsAsyncClient, queueResolver, environment); - } - - @Test - public void queueListenerWrapperCanBuildMessageListenerContainer() throws NoSuchMethodException { - // arrange - final Object bean = new BatchingQueueListenerWrapperTest(); - final Method method = BatchingQueueListenerWrapperTest.class.getMethod("myMethod"); - - // act - final IdentifiableMessageListenerContainer messageListenerContainer = batchingQueueListenerWrapper.wrapMethod(bean, method); - - // assert - assertThat(messageListenerContainer).isNotNull(); - assertThat(messageListenerContainer.getContainer()).isInstanceOf(SimpleMessageListenerContainer.class); - } - - @Test - public void queueListenerWrapperWithoutIdentifierWillConstructOneByDefault() throws NoSuchMethodException { - // arrange - final Object bean = new BatchingQueueListenerWrapperTest(); - final Method method = BatchingQueueListenerWrapperTest.class.getMethod("myMethod"); - - // act - final IdentifiableMessageListenerContainer messageListenerContainer = batchingQueueListenerWrapper.wrapMethod(bean, method); - - // assert - assertThat(messageListenerContainer).isNotNull(); - assertThat(messageListenerContainer.getIdentifier()).isEqualTo("batching-queue-listener-wrapper-test-my-method"); - } - - @Test - public void queueListenerWrapperWithIdentifierWillUseThatForTheMessageListenerContainer() throws NoSuchMethodException { - // arrange - final Object bean = new BatchingQueueListenerWrapperTest(); - final Method method = BatchingQueueListenerWrapperTest.class.getMethod("myMethodWithIdentifier"); - - // act - final IdentifiableMessageListenerContainer messageListenerContainer = batchingQueueListenerWrapper.wrapMethod(bean, method); - - // assert - assertThat(messageListenerContainer).isNotNull(); - assertThat(messageListenerContainer.getIdentifier()).isEqualTo("identifier"); - } - - @Test - public void queueIsResolvedViaTheQueueResolverService() throws NoSuchMethodException { - // arrange - final Object bean = new BatchingQueueListenerWrapperTest(); - final Method method = BatchingQueueListenerWrapperTest.class.getMethod("myMethod"); - - // act - batchingQueueListenerWrapper.wrapMethod(bean, method); - - // assert - verify(queueResolver).resolveQueueUrl("test"); - } - - @Test - public void invalidConcurrencyLevelStringFailsToWrapMessageListener() throws Exception { - // arrange - when(environment.resolvePlaceholders(anyString())).thenReturn("1"); - when(environment.resolvePlaceholders("${prop.concurrency}")).thenReturn("Test Invalid"); - final Object bean = new BatchingQueueListenerWrapperTest(); - final Method method = BatchingQueueListenerWrapperTest.class.getMethod("methodWithFieldsUsingEnvironmentProperties"); - expectedException.expect(NumberFormatException.class); - - // act - batchingQueueListenerWrapper.wrapMethod(bean, method); - } - - @Test - public void invalidMessageVisibilityTimeoutInSecondsStringFailsToWrapMessageListener() throws Exception { - // arrange - when(environment.resolvePlaceholders(anyString())).thenReturn("1"); - when(environment.resolvePlaceholders("${prop.visibility}")).thenReturn("Test Invalid"); - final Object bean = new BatchingQueueListenerWrapperTest(); - final Method method = BatchingQueueListenerWrapperTest.class.getMethod("methodWithFieldsUsingEnvironmentProperties"); - expectedException.expect(NumberFormatException.class); - - // act - batchingQueueListenerWrapper.wrapMethod(bean, method); - } - - @Test - public void invalidMaxPeriodBetweenBatchesInMsStringFailsToWrapMessageListener() throws Exception { - // arrange - when(environment.resolvePlaceholders(anyString())).thenReturn("1"); - when(environment.resolvePlaceholders("${prop.period}")).thenReturn("Test Invalid"); - final Object bean = new BatchingQueueListenerWrapperTest(); - final Method method = BatchingQueueListenerWrapperTest.class.getMethod("methodWithFieldsUsingEnvironmentProperties"); - expectedException.expect(NumberFormatException.class); - - // act - batchingQueueListenerWrapper.wrapMethod(bean, method); - } - - @Test - public void invalidBatchSizeStringFailsToWrapMessageListener() throws Exception { - // arrange - when(environment.resolvePlaceholders(anyString())).thenReturn("1"); - when(environment.resolvePlaceholders("${prop.batchSize}")).thenReturn("Test Invalid"); - final Object bean = new BatchingQueueListenerWrapperTest(); - final Method method = BatchingQueueListenerWrapperTest.class.getMethod("methodWithFieldsUsingEnvironmentProperties"); - expectedException.expect(NumberFormatException.class); - - // act - batchingQueueListenerWrapper.wrapMethod(bean, method); - } - - @Test - public void validStringFieldsWillCorrectlyBuildMessageListener() throws Exception { - // arrange - when(environment.resolvePlaceholders(anyString())).thenReturn("1"); - final Object bean = new BatchingQueueListenerWrapperTest(); - final Method method = BatchingQueueListenerWrapperTest.class.getMethod("methodWithFieldsUsingEnvironmentProperties"); - - // act - final IdentifiableMessageListenerContainer messageListenerContainer = batchingQueueListenerWrapper.wrapMethod(bean, method); - - // assert - assertThat(messageListenerContainer).isNotNull(); - } - - @Test - public void batchingMessageRetrieversBuiltFromAnnotationProperties() throws Exception { - // arrange - final Method method = BatchingQueueListenerWrapperTest.class.getMethod("methodWithFields"); - final BatchingQueueListener annotation = method.getAnnotation(BatchingQueueListener.class); - - // act - final BatchingMessageRetrieverProperties properties = batchingQueueListenerWrapper.batchingMessageRetrieverProperties(annotation); - - // assert - assertThat(properties).isEqualTo(StaticBatchingMessageRetrieverProperties.builder() - .visibilityTimeoutInSeconds(300) - .messageRetrievalPollingPeriodInMs(40L) - .numberOfThreadsWaitingTrigger(10) - .build() - ); - } - - @Test - public void batchingMessageRetrieversBuiltFromAnnotationStringProperties() throws Exception { - // arrange - final Method method = BatchingQueueListenerWrapperTest.class.getMethod("methodWithFieldsUsingEnvironmentProperties"); - final BatchingQueueListener annotation = method.getAnnotation(BatchingQueueListener.class); - when(environment.resolvePlaceholders("${prop.batchSize}")).thenReturn("8"); - when(environment.resolvePlaceholders("${prop.period}")).thenReturn("30"); - when(environment.resolvePlaceholders("${prop.visibility}")).thenReturn("40"); - - // act - final BatchingMessageRetrieverProperties properties = batchingQueueListenerWrapper.batchingMessageRetrieverProperties(annotation); - - // assert - assertThat(properties).isEqualTo(StaticBatchingMessageRetrieverProperties.builder() - .numberOfThreadsWaitingTrigger(8) - .visibilityTimeoutInSeconds(40) - .messageRetrievalPollingPeriodInMs(30L) - .build() - ); - } - - @Test - public void batchingMessageResolverBuiltFromAnnotationProperties() throws Exception { - // arrange - final Method method = BatchingQueueListenerWrapperTest.class.getMethod("methodWithFields"); - final BatchingQueueListener annotation = method.getAnnotation(BatchingQueueListener.class); - - // act - final BatchingMessageResolverProperties properties = batchingQueueListenerWrapper.batchingMessageResolverProperties(annotation); - - // assert - assertThat(properties).isEqualTo(StaticBatchingMessageResolverProperties.builder() - .bufferingSizeLimit(10) - .bufferingTimeInMs(40L) - .build() - ); - } - - @Test - public void batchingMessageResolverBuiltFromAnnotationStringProperties() throws Exception { - // arrange - final Method method = BatchingQueueListenerWrapperTest.class.getMethod("methodWithFieldsUsingEnvironmentProperties"); - final BatchingQueueListener annotation = method.getAnnotation(BatchingQueueListener.class); - when(environment.resolvePlaceholders("${prop.batchSize}")).thenReturn("8"); - when(environment.resolvePlaceholders("${prop.period}")).thenReturn("30"); - when(environment.resolvePlaceholders("${prop.visibility}")).thenReturn("40"); - - // act - final BatchingMessageResolverProperties properties = batchingQueueListenerWrapper.batchingMessageResolverProperties(annotation); - - // assert - assertThat(properties).isEqualTo(StaticBatchingMessageResolverProperties.builder() - .bufferingSizeLimit(8) - .bufferingTimeInMs(30L) - .build() - ); - } - - @BatchingQueueListener("test") - public void myMethod() { - - } - - @BatchingQueueListener(value = "test2", identifier = "identifier") - public void myMethodWithIdentifier() { - - } - - @BatchingQueueListener(value = "test2", concurrencyLevelString = "${prop.concurrency}", batchSizeString = "${prop.batchSize}", - messageVisibilityTimeoutInSecondsString = "${prop.visibility}", maxPeriodBetweenBatchesInMsString = "${prop.period}") - public void methodWithFieldsUsingEnvironmentProperties() { - - } - - @BatchingQueueListener(value = "test2", concurrencyLevel = 20, batchSize = 10, messageVisibilityTimeoutInSeconds = 300, maxPeriodBetweenBatchesInMs = 40) - public void methodWithFields() { - - } -} \ No newline at end of file diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListenerWrapperTest.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListenerWrapperTest.java index 8b965351..f23c6bf6 100644 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListenerWrapperTest.java +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListenerWrapperTest.java @@ -3,6 +3,7 @@ import static com.jashmore.sqs.aws.AwsConstants.MAX_SQS_RECEIVE_WAIT_TIME_IN_SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -11,6 +12,8 @@ import com.jashmore.sqs.retriever.prefetch.PrefetchingMessageRetrieverProperties; import com.jashmore.sqs.retriever.prefetch.StaticPrefetchingMessageRetrieverProperties; import com.jashmore.sqs.spring.IdentifiableMessageListenerContainer; +import com.jashmore.sqs.spring.QueueWrapperInitialisationException; +import com.jashmore.sqs.spring.client.SqsAsyncClientProvider; import com.jashmore.sqs.spring.queue.QueueResolverService; import org.junit.Before; import org.junit.Rule; @@ -23,6 +26,7 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient; import java.lang.reflect.Method; +import java.util.Optional; /** * Class is hard to test as it is the one building all of the dependencies internally using new constructors. Don't really know a better way to do this @@ -40,7 +44,10 @@ public class PrefetchingQueueListenerWrapperTest { private ArgumentResolverService argumentResolverService; @Mock - private SqsAsyncClient sqsAsyncClient; + private SqsAsyncClientProvider sqsAsyncClientProvider; + + @Mock + private SqsAsyncClient defaultClient; @Mock private QueueResolverService queueResolver; @@ -52,7 +59,9 @@ public class PrefetchingQueueListenerWrapperTest { @Before public void setUp() { - prefetchingQueueListenerWrapper = new PrefetchingQueueListenerWrapper(argumentResolverService, sqsAsyncClient, queueResolver, environment); + prefetchingQueueListenerWrapper = new PrefetchingQueueListenerWrapper(argumentResolverService, sqsAsyncClientProvider, queueResolver, environment); + + when(sqsAsyncClientProvider.getDefaultClient()).thenReturn(Optional.of(defaultClient)); } @Test @@ -108,7 +117,7 @@ public void queueIsResolvedViaTheQueueResolverService() throws NoSuchMethodExcep prefetchingQueueListenerWrapper.wrapMethod(bean, method); // assert - verify(queueResolver).resolveQueueUrl("test"); + verify(queueResolver).resolveQueueUrl(defaultClient, "test"); } @Test @@ -220,6 +229,46 @@ public void prefetchingQueueListenerCanBeBuiltFromProperties() throws Exception ); } + @Test + public void whenNoDefaultSqsClientAvailableAndItIsRequestedTheListenerWillNotBeWrapped() throws Exception { + // arrange + final Object bean = new PrefetchingQueueListenerWrapperTest(); + final Method method = PrefetchingQueueListenerWrapperTest.class.getMethod("myMethod"); + when(sqsAsyncClientProvider.getDefaultClient()).thenReturn(Optional.empty()); + expectedException.expect(QueueWrapperInitialisationException.class); + expectedException.expectMessage("Expected the default SQS Client but there is none"); + + // act + prefetchingQueueListenerWrapper.wrapMethod(bean, method); + } + + @Test + public void whenSpecificSqsClientRequestButNoneAvailableAnExceptionIsThrown() throws Exception { + // arrange + final Object bean = new PrefetchingQueueListenerWrapperTest(); + final Method method = PrefetchingQueueListenerWrapperTest.class.getMethod("methodUsingSpecificSqsAsyncClient"); + when(sqsAsyncClientProvider.getClient("clientId")).thenReturn(Optional.empty()); + expectedException.expect(QueueWrapperInitialisationException.class); + expectedException.expectMessage("Expected a client with id 'clientId' but none were found"); + + // act + prefetchingQueueListenerWrapper.wrapMethod(bean, method); + } + + @Test + public void whenSpecificSqsClientRequestWhichCanBeFoundTheContainerCanBeBuilt() throws Exception { + // arrange + final Object bean = new PrefetchingQueueListenerWrapperTest(); + final Method method = PrefetchingQueueListenerWrapperTest.class.getMethod("methodUsingSpecificSqsAsyncClient"); + when(sqsAsyncClientProvider.getClient("clientId")).thenReturn(Optional.of(mock(SqsAsyncClient.class))); + + // act + final IdentifiableMessageListenerContainer container = prefetchingQueueListenerWrapper.wrapMethod(bean, method); + + // assert + assertThat(container).isNotNull(); + } + @PrefetchingQueueListener("test") public void myMethod() { @@ -244,4 +293,9 @@ public void methodWithFieldsUsingEnvironmentProperties() { public void methodWithFieldsUsingProperties() { } + + @PrefetchingQueueListener(value = "test2", sqsClient = "clientId") + public void methodUsingSpecificSqsAsyncClient() { + + } } diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/queue/DefaultQueueResolverServiceTest.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/queue/DefaultQueueResolverServiceTest.java index 73a1a204..a5203359 100644 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/queue/DefaultQueueResolverServiceTest.java +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/queue/DefaultQueueResolverServiceTest.java @@ -40,7 +40,7 @@ public class DefaultQueueResolverServiceTest { @Before public void setUp() { - defaultQueueResolverService = new DefaultQueueResolverService(sqsAsyncClient, environment); + defaultQueueResolverService = new DefaultQueueResolverService(environment); } @After @@ -56,7 +56,7 @@ public void resolvedValueThatIsAUrlIsReturned() { when(environment.resolveRequiredPlaceholders("${variable}")).thenReturn("http://url"); // act - final String queueUrl = defaultQueueResolverService.resolveQueueUrl("${variable}"); + final String queueUrl = defaultQueueResolverService.resolveQueueUrl(sqsAsyncClient, "${variable}"); // assert assertThat(queueUrl).isEqualTo("http://url"); @@ -70,7 +70,7 @@ public void resolvedValueThatIsNotAUrlCallsOutToAmazonForUrl() { .thenReturn(CompletableFuture.completedFuture(GetQueueUrlResponse.builder().queueUrl("http://url").build())); // act - final String queueUrl = defaultQueueResolverService.resolveQueueUrl("${variable}"); + final String queueUrl = defaultQueueResolverService.resolveQueueUrl(sqsAsyncClient, "${variable}"); // assert assertThat(queueUrl).isEqualTo("http://url"); @@ -92,7 +92,7 @@ public GetQueueUrlResponse get() throws ExecutionException { expectedException.expectCause(equalTo(exceptionCause)); // act - defaultQueueResolverService.resolveQueueUrl("${variable}"); + defaultQueueResolverService.resolveQueueUrl(sqsAsyncClient, "${variable}"); } @Test @@ -111,7 +111,7 @@ public GetQueueUrlResponse get() throws ExecutionException { expectedException.expectCause(equalTo(exceptionCause)); // act - defaultQueueResolverService.resolveQueueUrl("${variable}"); + defaultQueueResolverService.resolveQueueUrl(sqsAsyncClient, "${variable}"); } @Test @@ -129,7 +129,7 @@ public GetQueueUrlResponse get() throws InterruptedException { // act try { - defaultQueueResolverService.resolveQueueUrl("${variable}"); + defaultQueueResolverService.resolveQueueUrl(sqsAsyncClient, "${variable}"); } catch (final QueueResolutionException exception) { // assert assertThat(Thread.currentThread().isInterrupted()).isTrue(); diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/it/com/jashmore/sqs/client/MultipleSqsAsyncClientIntegrationTest.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/it/com/jashmore/sqs/client/MultipleSqsAsyncClientIntegrationTest.java new file mode 100644 index 00000000..8b8d4cbb --- /dev/null +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/it/com/jashmore/sqs/client/MultipleSqsAsyncClientIntegrationTest.java @@ -0,0 +1,93 @@ +package it.com.jashmore.sqs.client; + +import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import com.jashmore.sqs.spring.client.DefaultSqsAsyncClientProvider; +import com.jashmore.sqs.spring.client.SqsAsyncClientProvider; +import com.jashmore.sqs.spring.container.basic.QueueListener; +import com.jashmore.sqs.test.LocalSqsRule; +import com.jashmore.sqs.util.LocalSqsAsyncClient; +import com.jashmore.sqs.util.SqsQueuesConfig; +import it.com.jashmore.example.Application; +import lombok.extern.slf4j.Slf4j; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Service; +import org.springframework.test.context.junit4.SpringRunner; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; + +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; + +@SuppressWarnings("unused") +@Slf4j +@SpringBootTest(classes = {Application.class, MultipleSqsAsyncClientIntegrationTest.TestConfig.class}, webEnvironment = RANDOM_PORT) +@RunWith(SpringRunner.class) +public class MultipleSqsAsyncClientIntegrationTest { + private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(3); + + @ClassRule + public static final LocalSqsRule FIRST_CLIENT_LOCAL_SQS_RULE = new LocalSqsRule(ImmutableList.of( + SqsQueuesConfig.QueueConfig.builder().queueName("firstClientQueueName").build() + )); + + @ClassRule + public static final LocalSqsRule SECOND_CLIENT_LOCAL_SQS_RULE = new LocalSqsRule(ImmutableList.of( + SqsQueuesConfig.QueueConfig.builder().queueName("secondClientQueueName").build() + )); + + @Configuration + public static class TestConfig { + @Bean + public SqsAsyncClientProvider sqsAsyncClientProvider() { + final LocalSqsAsyncClient firstClient = FIRST_CLIENT_LOCAL_SQS_RULE.getLocalAmazonSqsAsync(); + firstClient.buildQueues(); + final LocalSqsAsyncClient secondClient = SECOND_CLIENT_LOCAL_SQS_RULE.getLocalAmazonSqsAsync(); + secondClient.buildQueues(); + return new DefaultSqsAsyncClientProvider(ImmutableMap.of( + "firstClient", firstClient, + "secondClient", secondClient + )); + } + + @Service + public static class MessageListeners { + @QueueListener(value = "firstClientQueueName", sqsClient = "firstClient") + public void firstClientMessageListener(final Message message) throws BrokenBarrierException, InterruptedException { + log.info("Obtained first client message: {}", message); + CYCLIC_BARRIER.await(); + } + + @QueueListener(value = "secondClientQueueName", sqsClient = "secondClient") + public void secondClientMessageListener(final Message message) throws BrokenBarrierException, InterruptedException { + log.info("Obtained second client message: {}", message); + CYCLIC_BARRIER.await(); + } + } + } + + @Test + public void shouldBeAbleToProcessMessagesFromMultipleAwsAccounts() throws Exception { + // arrange + FIRST_CLIENT_LOCAL_SQS_RULE.getLocalAmazonSqsAsync().sendMessageToLocalQueue("firstClientQueueName", SendMessageRequest.builder() + .messageBody("message") + .build()) + .get(5, TimeUnit.SECONDS); + SECOND_CLIENT_LOCAL_SQS_RULE.getLocalAmazonSqsAsync().sendMessageToLocalQueue("secondClientQueueName", SendMessageRequest.builder() + .messageBody("message") + .build()) + .get(5, TimeUnit.SECONDS); + + // act + CYCLIC_BARRIER.await(20, TimeUnit.SECONDS); + } +} diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/it/com/jashmore/sqs/container/batching/BatchingQueueListenerEnvironmentIntegrationTest.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/it/com/jashmore/sqs/container/batching/BatchingQueueListenerEnvironmentIntegrationTest.java deleted file mode 100644 index 9b311d37..00000000 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/it/com/jashmore/sqs/container/batching/BatchingQueueListenerEnvironmentIntegrationTest.java +++ /dev/null @@ -1,92 +0,0 @@ -package it.com.jashmore.sqs.container.batching; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; - -import com.google.common.collect.ImmutableList; - -import com.jashmore.sqs.argument.payload.Payload; -import com.jashmore.sqs.spring.container.batching.BatchingQueueListener; -import com.jashmore.sqs.test.LocalSqsRule; -import com.jashmore.sqs.test.PurgeQueuesRule; -import com.jashmore.sqs.util.LocalSqsAsyncClient; -import com.jashmore.sqs.util.SqsQueuesConfig; -import it.com.jashmore.example.Application; -import lombok.extern.slf4j.Slf4j; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.stereotype.Service; -import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.junit4.SpringRunner; - -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.IntStream; - -@Slf4j -@SpringBootTest(classes = {Application.class, BatchingQueueListenerEnvironmentIntegrationTest.TestConfig.class}, webEnvironment = RANDOM_PORT) -@RunWith(SpringRunner.class) -@TestPropertySource(properties = { - "prop.concurrency=5" -}) -public class BatchingQueueListenerEnvironmentIntegrationTest { - private static final String QUEUE_NAME = "BatchingQueueListenerEnvironmentIntegrationTest"; - - private static final int NUMBER_OF_MESSAGES_TO_SEND = 5; - private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(NUMBER_OF_MESSAGES_TO_SEND + 1); - private static final AtomicInteger messagesProcessed = new AtomicInteger(0); - - @ClassRule - public static final LocalSqsRule LOCAL_SQS_RULE = new LocalSqsRule(ImmutableList.of( - SqsQueuesConfig.QueueConfig.builder().queueName(QUEUE_NAME).build() - )); - - @Rule - public final PurgeQueuesRule purgeQueuesRule = new PurgeQueuesRule(LOCAL_SQS_RULE.getLocalAmazonSqsAsync()); - - @Autowired - private LocalSqsAsyncClient localSqsAsyncClient; - - @Configuration - public static class TestConfig { - @Bean - public LocalSqsAsyncClient localSqsAsyncClient() { - return LOCAL_SQS_RULE.getLocalAmazonSqsAsync(); - } - - @SuppressWarnings("unused") - @Service - public static class MessageListener { - @BatchingQueueListener(value = QUEUE_NAME, concurrencyLevelString = "${prop.concurrency}") - public void listenToMessage(@Payload final String payload) { - try { - log.info("Received message: {}", payload); - messagesProcessed.incrementAndGet(); - CYCLIC_BARRIER.await(10, TimeUnit.SECONDS); - } catch (final Exception e) { - // do nothing - } - } - } - } - - @Test - public void allMessagesAreProcessedByListeners() throws Exception { - // arrange - IntStream.range(0, NUMBER_OF_MESSAGES_TO_SEND) - .forEach(i -> localSqsAsyncClient.sendMessageToLocalQueue(QUEUE_NAME, "message: " + i)); - - // act - CYCLIC_BARRIER.await(10, TimeUnit.SECONDS); - - // assert - assertThat(messagesProcessed.get()).isEqualTo(NUMBER_OF_MESSAGES_TO_SEND); - } -} diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/it/com/jashmore/sqs/container/batching/BatchingQueueListenerIntegrationTest.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/it/com/jashmore/sqs/container/batching/BatchingQueueListenerIntegrationTest.java deleted file mode 100644 index b41dd6b3..00000000 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/it/com/jashmore/sqs/container/batching/BatchingQueueListenerIntegrationTest.java +++ /dev/null @@ -1,96 +0,0 @@ -package it.com.jashmore.sqs.container.batching; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; - -import com.google.common.collect.ImmutableList; - -import com.jashmore.sqs.argument.payload.Payload; -import com.jashmore.sqs.spring.container.batching.BatchingQueueListener; -import com.jashmore.sqs.test.LocalSqsRule; -import com.jashmore.sqs.test.PurgeQueuesRule; -import com.jashmore.sqs.util.LocalSqsAsyncClient; -import com.jashmore.sqs.util.SqsQueuesConfig; -import it.com.jashmore.example.Application; -import lombok.extern.slf4j.Slf4j; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.stereotype.Service; -import org.springframework.test.context.junit4.SpringRunner; -import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest; -import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse; -import software.amazon.awssdk.services.sqs.model.QueueAttributeName; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.stream.IntStream; - -@Slf4j -@SpringBootTest(classes = {Application.class, BatchingQueueListenerIntegrationTest.TestConfig.class}, webEnvironment = RANDOM_PORT) -@RunWith(SpringRunner.class) -public class BatchingQueueListenerIntegrationTest { - private static final String QUEUE_NAME = "BatchingQueueListenerIntegrationTest"; - private static final int NUMBER_OF_MESSAGES_TO_SEND = 100; - private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(NUMBER_OF_MESSAGES_TO_SEND); - private static final int MESSAGE_VISIBILITY_IN_SECONDS = 2; - - @ClassRule - public static final LocalSqsRule LOCAL_SQS_RULE = new LocalSqsRule(ImmutableList.of( - SqsQueuesConfig.QueueConfig.builder().queueName(QUEUE_NAME).build() - )); - - @Rule - public final PurgeQueuesRule purgeQueuesRule = new PurgeQueuesRule(LOCAL_SQS_RULE.getLocalAmazonSqsAsync()); - - @Autowired - private LocalSqsAsyncClient localSqsAsyncClient; - - @Configuration - public static class TestConfig { - @Service - public static class MessageListener { - @SuppressWarnings("unused") - @BatchingQueueListener(value = QUEUE_NAME, messageVisibilityTimeoutInSeconds = MESSAGE_VISIBILITY_IN_SECONDS) - public void listenToMessage(@Payload final String payload) { - log.info("Obtained message: {}", payload); - COUNT_DOWN_LATCH.countDown(); - } - } - - @Bean - public LocalSqsAsyncClient localSqsAsyncClient() { - return LOCAL_SQS_RULE.getLocalAmazonSqsAsync(); - } - } - - @Test - public void allMessagesAreProcessedByListeners() throws InterruptedException, ExecutionException { - // arrange - IntStream.range(0, NUMBER_OF_MESSAGES_TO_SEND) - .forEach(i -> { - log.info("Sending message: " + i); - localSqsAsyncClient.sendMessageToLocalQueue(QUEUE_NAME, "message: " + i); - }); - - // act - COUNT_DOWN_LATCH.await(); - // Wait the visibility timeout to make sure that all messages were processed and deleted from the queue - Thread.sleep(MESSAGE_VISIBILITY_IN_SECONDS * 1000 * 2); - - // assert - final CompletableFuture queueAttributes = localSqsAsyncClient.getQueueAttributes(GetQueueAttributesRequest.builder() - .queueUrl(localSqsAsyncClient.getQueueUrl(QUEUE_NAME)) - .attributeNames(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES, QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE) - .build()); - final GetQueueAttributesResponse getQueueAttributesResponse = queueAttributes.get(); - assertThat(getQueueAttributesResponse.attributes().get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)).isEqualTo("0"); - assertThat(getQueueAttributesResponse.attributes().get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE)).isEqualTo("0"); - } -} diff --git a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/it/com/jashmore/sqs/queue/EnvironmentQueueResolverServiceIntegrationTest.java b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/it/com/jashmore/sqs/queue/EnvironmentQueueResolverServiceIntegrationTest.java index d8e7ff0f..d145426d 100644 --- a/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/it/com/jashmore/sqs/queue/EnvironmentQueueResolverServiceIntegrationTest.java +++ b/java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/it/com/jashmore/sqs/queue/EnvironmentQueueResolverServiceIntegrationTest.java @@ -46,7 +46,7 @@ public LocalSqsAsyncClient localSqsAsyncClient() { @Test public void queueResolverResolvesVariablesFromEnvironmentProperties() { // act - final String queueUrl = queueResolver.resolveQueueUrl("${property.with.queue.url}"); + final String queueUrl = queueResolver.resolveQueueUrl(LOCAL_SQS_RULE.getLocalAmazonSqsAsync(), "${property.with.queue.url}"); // assert assertThat(queueUrl).isEqualTo("http://sqs.some.url"); @@ -55,7 +55,7 @@ public void queueResolverResolvesVariablesFromEnvironmentProperties() { @Test public void queueResolverForQueueNameObtainsQueueUrlFromSqs() { // act - final String queueUrl = queueResolver.resolveQueueUrl("${property.with.queue.name}"); + final String queueUrl = queueResolver.resolveQueueUrl(LOCAL_SQS_RULE.getLocalAmazonSqsAsync(), "${property.with.queue.name}"); // assert assertThat(queueUrl).startsWith(LOCAL_SQS_RULE.getServerUrl());