Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ will not acknowledge the message automatically when the method executes without
The [Spring Cloud AWS Messaging](https://github.com/spring-cloud/spring-cloud-aws/tree/master/spring-cloud-aws-messaging) `@SqsListener` works by requesting
a set of messages from the SQS and when they are done it will request some more. There is one disadvantage with this approach in that if 9/10 of the messages
finish in 10 milliseconds but one takes 10 seconds no other messages will be picked up until that last message is complete. The
[@BatchingQueueListener](./java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListener.java)
[@QueueListener](./java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java)
provides the same basic functionality but it also provides a timeout where eventually it will request for more messages even for the threads that are
ready for another message. It will also batch the removal of messages from the queue and therefore with a concurrency level of 10, if there are a lot messages
on the queue, only 2 requests would be made to SQS for retrieval and deletion of messages. The usage is something like this:

```java
@Service
public class MyMessageListener {
@BatchingQueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 10, maxPeriodBetweenBatchesInMs = 2000)
@QueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 10, maxPeriodBetweenBatchesInMs = 2000)
public void processMessage(@Payload final String payload) {
// process the message payload here
}
Expand All @@ -121,7 +121,7 @@ before requesting messages for threads waiting for another message.

### Setting up a queue listener that prefetches messages
When the amount of messages for a service is extremely high, prefetching messages may be a way to optimise the throughput of the application. The
[@PrefetchingQueueListener](./java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListener.java)
[@PrefetchingQueueListener](./java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListener.java)
annotation can be used to pretech messages in a background thread while messages are currently being processed. The usage is something like this:

```java
Expand Down Expand Up @@ -236,6 +236,12 @@ mvn clean install -DskipTests
(cd examples/java-dynamic-sqs-listener-core-examples && mvn exec:java)
```

### Connecting to multiple AWS Accounts using the Spring Starter
If the Spring Boot application needs to connect to SQS queues across multiple AWS Accounts, you will need to provide a
[SqsAsyncClientProvider](./java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/src/main/java/com/jashmore/sqs/spring/client/SqsAsyncClientProvider.java)
which will be able to obtain a specific `SqsAsyncClient` based on an identifier. For more information on how to do this, take a look at the documentation
at [How To Connect to Multiple AWS Accounts](doc/how-to-guides/spring/spring-how-to-connect-to-multiple-aws-accounts.md)

### Comparing Libraries
If you want to see the difference between this library and others like the
[Spring Cloud AWS Messaging](https://github.com/spring-cloud/spring-cloud-aws/tree/master/spring-cloud-aws-messaging) and
Expand Down
4 changes: 3 additions & 1 deletion doc/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ more in depth understanding take a look at the JavaDoc for the API.
extending the visibility of a message in the case of long processing so it does not get put back on the queue while processing
1. [How to manually acknowledge message](how-to-guides/core/core-how-to-manually-acknowledge-message.md): useful for when you want to mark the
message as successfully processed before the method has finished executing
1. [How to Connect to an AWS SQS Queue](how-to-guides/how-to-connect-to-aws-sqs-queue.md): necessary for actually using this framework in live environments
1. Spring How To Guides
1. [How to add a custom ArgumentResolver to a Spring application](how-to-guides/spring/spring-how-to-add-custom-argument-resolver.md): useful for
integrating custom argument resolution code to be included in a Spring Application. See [How to implement a custom ArgumentResolver](how-to-guides/core/core-how-to-implement-a-custom-argument-resolver.md)
Expand All @@ -27,7 +28,8 @@ more in depth understanding take a look at the JavaDoc for the API.
writing right?
1. [How to Start/Stop Queue Listeners](how-to-guides/spring/spring-how-to-start-stop-queue-listeners.md): guide for starting and stopping the
processing of messages for specific queue listeners
1. [How to Connect to an AWS SQS Queue](how-to-guides/how-to-connect-to-aws-sqs-queue.md): necessary for actually using this framework in live environments
1. [How to connect to multiple AWS Accounts](how-to-guides/spring/spring-how-to-connect-to-multiple-aws-accounts.md): guide for listening to queues
across multiple AWS Accounts
1. Local Development:
1. [Setting up IntelliJ](local-development/setting-up-intellij.md): steps for setting IntelliJ up for development,
e.g. configuring checkstyle, Lombok, etc
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Spring - How to Connect to multiple AWS Accounts
There may be a scenario where you need to connect to multiple queues across multiple AWS Accounts. In this scenario you would
need to provide multiple `SqsAsyncClients` and for each queue listener you will need to indicate which one is desired. For a full
example take a look at the [Multiple AWS Accounts Example](../../../examples/multiple-aws-account-example) which shows you how to
connect to two locally running ElasticMQ servers.

## Steps
1. Create some queues that will use specific `SqsAsyncClient`s identified by an id.
```java
public class MyMessageListeners {
// This uses the "default" SqsAsyncClient which may not be present
@QueueListener("queueNameForDefaultListener")
public void listenerForDefaultClient(@Payload String messageBody) {

}

// This uses the "firstClient" SqsAsyncClient
@QueueListener(value = "queueNameForFirstClient", sqsClient = "firstClient")
public void queueNameListenerForFirstClient(@Payload String messageBody) {

}

// This uses the "firstClient" SqsAsyncClient
@QueueListener(value = "anotherQueueNameForFirstClient", sqsClient = "firstClient")
public void anotherQueueNameListenerForFirstClient(@Payload String messageBody) {

}

// This uses the "secondClient" SqsAsyncClient
@QueueListener(value = "queueNameForSecondClient", sqsClient = "secondClient")
public void queueNameListenerForSecondClient(@Payload String messageBody) {

}
}
```
1. You will need to add to your `@Configuration` a bean of type
[SqsAsyncClientProvider](../../../java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-api/src/main/java/com/jashmore/sqs/spring/client/SqsAsyncClientProvider.java)
which will provide all of the `SqsAsyncClient`s for the queues above.
```java
@Configuration
public class MyConfig {
@Bean
public SqsAsyncClientProvider sqsAsyncClientProvider() {
// this client will be used if there is no client identifier for the listener. Note that this can be null
// and in this case listenerForDefaultClient above will fail to wrap
final SqsAsyncClient defaultClient = ...;

final SqsAsyncClient firstClient = ...;
final SqsAsyncClient secondClient = ...;

return new DefaultSqsAsyncClientProvider(
defaultClient,
ImmutableMap.of(
"firstClient", firstClient,
"secondClient", secondClient
)
);
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ public static void main(final String[] args) throws Exception {
.bufferingTimeInMs(5000)
.build());
final MessageProcessor messageProcessor = new DefaultMessageProcessor(
argumentResolverService(sqsAsyncClient),
argumentResolverService(),
queueProperties,
sqsAsyncClient,
messageResolver,
messageReceivedMethod,
messageConsumer
Expand Down Expand Up @@ -184,12 +185,11 @@ private static SqsAsyncClient startElasticMqServer() throws URISyntaxException {
/**
* Builds the {@link ArgumentResolverService} that will be used to parse the messages into arguments for the {@link MessageConsumer}.
*
* @param sqsAsyncClient the client to communicate with the SQS queue
* @return the service to resolve arguments for the message consumer
*/
private static ArgumentResolverService argumentResolverService(final SqsAsyncClient sqsAsyncClient) {
private static ArgumentResolverService argumentResolverService() {
final PayloadMapper payloadMapper = new JacksonPayloadMapper(OBJECT_MAPPER);
return new CoreArgumentResolverService(payloadMapper, sqsAsyncClient, OBJECT_MAPPER);
return new CoreArgumentResolverService(payloadMapper, OBJECT_MAPPER);
}

/**
Expand Down
129 changes: 129 additions & 0 deletions examples/multiple-aws-account-example/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>examples</artifactId>
<groupId>com.jashmore</groupId>
<version>2.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>multiple-aws-account-example</artifactId>

<name>Java Dynamic SQS Listener - Multiple AWS Accounts Example</name>
<description>Contains examples for listening to SQS queues on multiple AWS Accounts</description>

<properties>
<findbugs.config.location>../../configuration/findbugs/bugsExcludeFilter.xml</findbugs.config.location>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${spring.boot.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring.boot.version}</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.jashmore</groupId>
<artifactId>java-dynamic-sqs-listener-spring-starter</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.jashmore</groupId>
<artifactId>local-amazon-sqs</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.elasticmq</groupId>
<artifactId>elasticmq-rest-sqs_2.11</artifactId>
<version>0.13.9</version>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<resources>
<resource>
<directory>${basedir}/src/main/resources</directory>
<filtering>true</filtering>
<includes>
<include>**/*.yml</include>
<include>**/application*.properties</include>
</includes>
</resource>
<resource>
<directory>${basedir}/src/main/resources</directory>
<excludes>
<exclude>**/*.yml</exclude>
<exclude>**/application*.properties</exclude>
</excludes>
</resource>
</resources>

<pluginManagement>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.jashmore.sqs.examples.Application</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<configuration>
<delimiters>
<delimiter>${resource.delimiter}</delimiter>
</delimiters>
<useDefaultDelimiters>false</useDefaultDelimiters>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.jashmore.sqs.examples;

import com.google.common.collect.ImmutableMap;

import akka.http.scaladsl.Http;
import com.jashmore.sqs.spring.client.DefaultSqsAsyncClientProvider;
import com.jashmore.sqs.spring.client.SqsAsyncClientProvider;
import lombok.extern.slf4j.Slf4j;
import org.elasticmq.rest.sqs.SQSRestServer;
import org.elasticmq.rest.sqs.SQSRestServerBuilder;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;

import java.net.URI;
import java.util.concurrent.ExecutionException;

@SpringBootApplication
@EnableScheduling
@Slf4j
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class);
}

/**
* Creates the {@link SqsAsyncClientProvider} that will allow multiple AWS Account SQS queues to be queried.
*
* @return the provider to use
* @throws InterruptedException if it was interrupted while building the clients
*/
@Bean
public SqsAsyncClientProvider sqsAsyncClientProvider() throws InterruptedException {
return new DefaultSqsAsyncClientProvider(null, ImmutableMap.of(
"firstClient", buildClient("firstClientQueue"),
"secondClient", buildClient("secondClientQueue")
));
}

/**
* Starts an in-memory ElasticMQ server and returns a {@link SqsAsyncClient} pointing to it, each call to this represents a different AWS Account.
*
* <p>Note that the region and credentials are hardcoded to fake values as they are not checked but connecting to an actual AWS account here will involve
* reading environment variables or other properties.
*
* @param queueName the name of a queue to create in this SQS Server
* @return the {@link SqsAsyncClient} that points to this SQS Server
* @throws InterruptedException if it was interrupted while creating the queue
*/
private static SqsAsyncClient buildClient(String queueName) throws InterruptedException {
log.info("Starting Local ElasticMQ SQS Server");
final SQSRestServer sqsRestServer = SQSRestServerBuilder
.withInterface("localhost")
.withDynamicPort()
.start();

final Http.ServerBinding serverBinding = sqsRestServer.waitUntilStarted();
final SqsAsyncClient client = SqsAsyncClient.builder()
.endpointOverride(URI.create("http://localhost:" + serverBinding.localAddress().getPort()))
.region(Region.of("localstack"))
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("accessKeyId", "secretAccessKey")))
.build();
try {
client.createQueue(CreateQueueRequest.builder()
.queueName(queueName)
.build())
.get();
} catch (ExecutionException executionException) {
throw new RuntimeException(executionException.getCause());
}
return client;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.jashmore.sqs.examples;

import com.jashmore.sqs.argument.payload.Payload;
import com.jashmore.sqs.spring.container.basic.QueueListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("unused")
@Slf4j
public class MessageListeners {
/**
* Basic queue listener.
*
* @param payload the payload of the SQS Message
*/
@QueueListener(value = "firstClientQueue", sqsClient = "firstClient")
public void firstClientMessageProcessing(@Payload final String payload) {
log.info("Message Received from firstClient#firstClientQueue: {}", payload);
}

/**
* Basic queue listener.
*
* @param payload the payload of the SQS Message
*/
@QueueListener(value = "secondClientQueue", sqsClient = "secondClient")
public void secondClientMessageProcessing(@Payload final String payload) {
log.info("Message Received from secondClient#secondClientQueue: {}", payload);
}
}
Loading