Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIVE-4704: Allow sender workers to poll SQS queues for messages #814

Merged
merged 2 commits into from
Nov 9, 2022

Conversation

frankie297
Copy link
Contributor

@frankie297 frankie297 commented Nov 8, 2022

What does this change?

As part of the work to create an EC2-based sender worker we want the ability to allow our sender workers to poll multiple SQS queues for received messages, and to clean up these messages after processing.

This PR includes:

  • Adding an async SQS client, so we can asynchronously process messages
  • Adds a pollQueue function that will use the sqs client to call the receive messages function
  • Adds a long polling wait time of 20 seconds, AWS say this helps with empty responses and false empty responses. It also helps reduce costs
  • We then parse the received message and delete the message from the queue after the message has successfully processed
  • in the event of no messages on the queue, it will poll for 20 seconds before completing.
  • Removes the previously hard coded test notification

How to test

  • When testing locally, this PR updates the script to include the new sqsEc2Url parameter, for the EC2 SQS queue arns.

I tested locally, adding a message manually onto a queue (the iOS sender) and triggering the sender worker. Scenarios tested were:

  • Receiving and processing a message from the queue
  • Polling an empty queue
  • Polling a queue with multiple messages (in this scenario only one message was processed)
  • Deleting messages from the queue

This PR sets up the ability to poll, and the configuration. In a following PR will be the functionality to actually poll the queues on some sort of loop.

With the help of @DavidLawes

Copy link
Contributor

@DavidLawes DavidLawes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given this is a step towards continuous polling and further development work, it looks good to me :)


def pollQueue(queue: String, handleChunkTokens: (SQSEvent, Context) => Unit): Future[Unit] = Future {
logger.info(s"About to poll queue - $queue")
val receiveMessages = sqsClient.receiveMessage(new ReceiveMessageRequest().withQueueUrl(queue).withWaitTimeSeconds(20)).getMessages.asScala.toList
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wait time might be something we want to experiment with later on, there might be some tradeoff we'd be willing to make, eg make more api calls but more quickly get messages off the queue

def pollQueue(queue: String, handleChunkTokens: (SQSEvent, Context) => Unit): Future[Unit] = Future {
logger.info(s"About to poll queue - $queue")
val receiveMessages = sqsClient.receiveMessage(new ReceiveMessageRequest().withQueueUrl(queue).withWaitTimeSeconds(20)).getMessages.asScala.toList

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ReceiveMessageRequest can accept a getMaxNumberOfMessages, which would allow us to get more than 1 (up to 10) messages from the queue for each api call. This might be a setting we experiment with later, but probably not something we need to add now


handleChunkTokens(parsedEvent, null)

parsedEvent.getRecords.forEach(m => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment handleChunkTokens returns Unit and the method itself handles all failures resulting from processing. Without changing some of the internal logic of this method (used by both lambdas and ec2) we didn't think we could conditionally delete messages from the source queue and/or put onto a dlq.

We decided for now that, as long as the ec2 service has started correctly and has begun polling, we will delete any messages we receive.

We should check what happens if any of the messages fail to send (I thought we logged errors from handleChunkTokens and also triggered alerts)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants