Skip to content

Commit

Permalink
AWS SQS: Keep message order (#1214)
Browse files Browse the repository at this point in the history
This fixes messages arriving order when pulling from a FIFO queue and adds test coverage for that particular scenario.

Cheers!
  • Loading branch information
nebtrx authored and 2m committed Sep 17, 2018
1 parent fe62bd1 commit 26fe3d3
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ final class SqsSourceStage(queueUrl: String, settings: SqsSourceSettings)(implic
currentRequests = currentRequests - 1
maxCurrentConcurrency = if (result.getMessages.isEmpty) 1 else maxConcurrency

val receivedMessages = result.getMessages.asScala.reverse
val receivedMessages = result.getMessages.asScala
receivedMessages.foreach(buffer.offer)

if (receivedMessages.isEmpty && settings.closeOnEmptyReceive) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.stream.alpakka.sqs.scaladsl
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import com.amazonaws.services.sqs.AmazonSQSAsync
import com.amazonaws.services.sqs.model.CreateQueueRequest
import org.elasticmq.rest.sqs.{SQSLimits, SQSRestServer, SQSRestServerBuilder}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite, Tag}
Expand All @@ -33,6 +34,12 @@ trait DefaultTestContext extends BeforeAndAfterAll with BeforeAndAfterEach with

def randomQueueUrl(): String = sqsClient.createQueue(s"queue-${Random.nextInt}").getQueueUrl

val fifoQueueRequest = new CreateQueueRequest(s"queue-${Random.nextInt}.fifo")
.addAttributesEntry("FifoQueue", "true")
.addAttributesEntry("ContentBasedDeduplication", "true")

def randomFifoQueueUrl(): String = sqsClient.createQueue(fifoQueueRequest).getQueueUrl

override protected def beforeEach(): Unit =
sqsServer = SQSRestServerBuilder.withActorSystem(system).withSQSLimits(SQSLimits.Relaxed).withDynamicPort().start()

Expand Down Expand Up @@ -62,5 +69,4 @@ trait DefaultTestContext extends BeforeAndAfterAll with BeforeAndAfterEach with
//#init-client
awsSqsClient
}

}
20 changes: 20 additions & 0 deletions sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,26 @@ class SqsSpec extends FlatSpec with Matchers with DefaultTestContext {
probe.cancel()
}

it should "pull messages from a fifo queue following the same production order" taggedAs Integration in {
val queue = randomFifoQueueUrl()
implicit val awsSqsClient = sqsClient

for (i <- 0 until 10) {
val msg = s"Message - $i"
awsSqsClient.sendMessage(new SendMessageRequest(queue, msg).withMessageGroupId("group1"))
}

val probe = SqsSource(queue, sqsSourceSettings)
.runWith(TestSink.probe[Message])

for (i <- 0 until 10) {
val result = probe.requestNext()
result.getBody shouldBe s"Message - $i"
}

probe.cancel()
}

it should "publish batch of messages and pull them" taggedAs Integration in {
val queue = randomQueueUrl()
implicit val awsSqsClient = sqsClient
Expand Down

0 comments on commit 26fe3d3

Please sign in to comment.