Skip to content

Latest commit

 

History

History
129 lines (103 loc) · 10.9 KB

core-implementations-overview.md

File metadata and controls

129 lines (103 loc) · 10.9 KB

Core Implementations

This document provides a quick overview of the core implementations of the API that can be used. Note that consumer can define their own implementations and are not required to use these core implementations.

Architecture

The following is the diagram for how a single message would be processed through the library.

Core Framework Architecture Diagram

Message Retriever

The MessageRetriever has a simple API in that it only exposes methods to obtain a message from the queue, and it makes no requirements on how it should get these messages from the SQS queue. This allows for ability to optimise the retrieval of messages such as batching requests for retrieving messages or pre-fetching messages.

Core implementations include:

  • PrefetchingMessageRetriever: this will prefetch messages from the queue so that new messages can be processed as soon as possible. This implementation is not appropriate if the time to process messages is long enough for the prefetched message's visibility timeout to expire before it can be processed. In this scenario, the message's re-drive policy may place the message back into the queue resulting in it being processed multiple times. This implementation is appropriate for high volumes of messages that take little time to process.
  • BatchingMessageRetriever: This will batch requests for messages from the consumer into a single call out to the SQS queue once reaching the threshold of messages, or the batching timeout expires. This reduces the number of calls out to the SQS queue but can reduce the performance as no messages processing will occur while waiting for the batch size to be reached.

Message Processor

The MessageProcessor has the responsibility of processing the message that has been retrieved by theMessageRetriever by calling the corresponding Java method that processes the message. This should resolve any arguments for the method by calling to the ArgumentResolverService and using the resulting values as the method arguments.

Core implementations include:

  • CoreMessageProcessor: default implementation that calls out to a ArgumentResolverService to resolve the arguments and calls the method.
  • DecoratingMessageProcessor: implementation that allows for the message processing to be decorated with MessageProcessingDecorator logic. This can be useful for adding tracing, metrics or other extra functionality in the message processing.
  • LambdaMessageProcessor: implementation that will use a lambda/functional synchronous methods to process the message. This does not support any argument resolution using an ArgumentResolverService.
  • AsyncLambdaMessageProcessor: implementation that will use a lambda/functional asynchronous methods (returns a CompletableFuture) to process the message. This does not support any argument resolution using an ArgumentResolverService.

ArgumentResolverService

The ArgumentResolverService is used to obtain the ArgumentResolver that can be used to populate an argument in a method when processing a message. For example, a parameter with the @Payload annotation will be resolved with the body of the message cast to that type.

The implementations of the ArgumentResolverService include:

The core arguments that be resolved include:

  • software.amazon.awssdk.services.sqs.model.Message: arguments that have the Message type will place the entire message that is being processed into this argument. This is useful if you need to forward this message to other services or want to manually extract information from the service. This is provided by the MessageArgumentResolver.
  • @Payload: arguments annotated with this will parse the message body into that object. If this is a String, the raw message body will be provided, otherwise if it is a Java Bean, an attempt to cast the message body to that bean will be used. This is provided by the PayloadArgumentResolver which uses a PayloadMapper, such as the JacksonPayloadMapper, to parse the message body.
  • @MessageId: string arguments annotated with this will place the message ID of the message into this argument. This is provided by the MessageIdArgumentResolver.
  • Acknowledge: arguments of this type will be injected with an implementation that allows for a message to be manually acknowledged when it is successfully processed. Note that if this is included, the MessageProcessor is not required to acknowledge the message after a successful execution and the consumer must acknowledge the message them self. The implementation of the Acknowledge should be provided by the MessageProcessor instead of an ArgumentResolver.
  • @MessageAttribute: arguments annotated with this will attempt to parse the contents of the message attribute into this field. For example, if the argument is a String then the attribute will be cast to a string where as if the argument is an integer it will try and parse the string into the number. This also works with POJOs in that the resolver will attempt to deserialised the message attribute into this POJO shape, e.g. via the Jackson Object Mapper. This is provided by the MessageAttributeArgumentResolver.
  • @MessageSystemAttribute: arguments annotated with this will attempt to parse the contents of a system message attribute into this field. For example, the SENT_TIMESTAMP of the message can be obtained by this annotation. This is provided by the MessageSystemAttributeArgumentResolver.
  • VisibilityExtender: arguments of this type will be injected with an implementation that extends the message visibility of the current message. These implementations should be provided by the MessageProcessor instead of an ArgumentResolver.

Message Broker

The MessageBroker is the main container that controls the whole flow of messages from the MessageRetriever to the MessageProcessor. It can provide logic like the rate of concurrency of messages processing or when messages should be processed.

Core implementation include:

  • ConcurrentMessageBroker: this implementation will run on multiple threads each processing messages. It allows the configuration to be changed dynamically, such as changing the rate of concurrency to change while the application is running.

Message Resolver

The MessageResolver is used when the message has been successfully processed, and it needs to be removed from the SQS queue.

Core implementation include:

  • BatchingMessageResolver: this implementation will batch calls to delete messages from the SQS queue into a batch that will go out together once asynchronously. This is useful if you are processing many messages at the same time, and it is desirable to reduce the number of calls out to SQS. A disadvantage is that the message may sit in the batch for enough time for the visibility timeout to expire, and it is placed onto the queue again. To mitigate this, a smaller batch timeout should be used or by increasing the visibility timeout. Note that you can configure this to always delete a message as soon as it is finished by setting the batch size of 1.