As the core library can be quite verbose to configure a MessageListenerContainer, the Core Kotlin DSL tool can be used to easily set up a message listener.
-
Depend on the Core Kotlin DSL module:
implementation("com.jashmore:core-kotlin-dsl:${version}")
or
<dependency> <groupId>com.jashmore</groupId> <artifactId>core-kotlin-dsl</artifactId> <version>${version}</version> </dependency>
-
Create the MessageListenerContainer using the Kotlin DSL
val container = coreMessageListener("identifier", sqsAsyncClient, queueUrl) { retriever = prefetchingMessageRetriever { desiredPrefetchedMessages = 10 maxPrefetchedMessages = 20 } processor = coreProcessor { argumentResolverService = coreArgumentResolverService(objectMapper) bean = MessageListener() method = MessageListener::class.java.getMethod("listen", String::class.java) } broker = concurrentBroker { concurrencyLevel = { 10 } concurrencyPollingRate = { Duration.ofSeconds(30) } } resolver = batchingResolver { bufferingSizeLimit = { 5 } bufferingTime = { Duration.ofSeconds(2) } } }
-
Start the container as normal
container.start()
Check out the Core Kotlin DSL for more details about the internals of this module and what you can use.
val container = coreMessageListener("identifier", sqsAsyncClient, queueUrl) {
processor = lambdaProcessor {
method { message ->
log.info("Message received: {}", message.body())
}
}
// other configuration
}
This is equivalent to the @QueueListener annotation used in a Spring Boot application which will set up a container that will request for messages in batches.
val container = batchingMessageListener("identifier", sqsAsyncClient, "url") {
concurrencyLevel = { 10 }
batchSize = { 5 }
batchingPeriod = { Duration.ofSeconds(5) }
processor = lambdaProcessor {
method { message ->
log.info("Message: {}", message.body())
}
}
}
This is equivalent to the @PrefetchingQueueListener annotation used in a Spring Boot application which will set up a container that will prefetch messages for processing.
val container = prefetchingMessageListener("identifier", sqsAsyncClient, "url") {
concurrencyLevel = { 2 }
desiredPrefetchedMessages = 5
maxPrefetchedMessages = 10
processor = lambdaProcessor {
method { message ->
log.info("Message: {}", message.body())
}
}
}
A full example of using the Kotlin DSL can be found in the core-kotlin-example.