-
Notifications
You must be signed in to change notification settings - Fork 645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AWS SQS: Keep message order #1214
AWS SQS: Keep message order #1214
Conversation
Hi @nebtrx, Thank you for your contribution! We really value the time you've taken to put this together. Before we proceed with reviewing this pull request, please sign the Lightbend Contributors License Agreement: |
messages | ||
.map(msg => { | ||
awsSqsClient | ||
.sendMessage(new SendMessageRequest(queue, msg).withMessageGroupId("group1")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used AWS SQS SDK API here because AmazonSQSBufferedAsyncClient currently does not support FIFO Queues. So, I prefer this sync approach instead of mapping over the messages creating a bunch of futures and waiting for those to preserve the initial messages order
Lightbend CLA already signed 👍 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great to have a test, as well.
val messages = for (i <- 0 until 10) yield s"Message - $i" | ||
|
||
messages | ||
.map(msg => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this foreach
? or just adding it to the for
expression?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, indeed. Thanks for the observation. Suggestion applied!
@@ -75,7 +75,7 @@ final class SqsSourceStage(queueUrl: String, settings: SqsSourceSettings)(implic | |||
currentRequests = currentRequests - 1 | |||
maxCurrentConcurrency = if (result.getMessages.isEmpty) 1 else maxConcurrency | |||
|
|||
val receivedMessages = result.getMessages.asScala.reverse |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still wonder how it got there...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it was there from the very beginning of the SQS connector. Anyways, this library fixes more issues than it causes 😁😉
Fixes #1209 |
val queue = randomFifoQueueUrl() | ||
implicit val awsSqsClient = sqsClient | ||
|
||
for (i <- 0 until 10) yield { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The yield
doesn't do anything useful now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oppss, sorry about that. I little bit of context switching and too much time of for comprehensions instead of for loops. Fixed!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thank you for the fix!
You're welcome @2m. Glad to be able to help |
This fixes messages arriving order when pulling from a FIFO queue and adds test coverage for that particular scenario.
Feel free to make suggestions to this one. Otherwise, this is ready to be merged.
Cheers!