diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b0a0c7e..319a421 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,6 +52,12 @@ jobs: if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false' run: sbt +update + - env: + SERVICES: sqs + uses: LocalStack/setup-localstack@main + with: + image-tag: latest + - name: Check that workflows are up to date run: sbt githubWorkflowCheck diff --git a/aws/sqs/integration/src/test/scala/com/commercetools/queue/sqs/SqsClientSuite.scala b/aws/sqs/integration/src/test/scala/com/commercetools/queue/sqs/SqsClientSuite.scala new file mode 100644 index 0000000..5f8d0a0 --- /dev/null +++ b/aws/sqs/integration/src/test/scala/com/commercetools/queue/sqs/SqsClientSuite.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.sqs + +import cats.effect.{IO, Resource} +import com.commercetools.queue.QueueClient +import com.commercetools.queue.aws.sqs.SQSClient +import com.commercetools.queue.testkit.QueueClientSuite +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider +import software.amazon.awssdk.regions.Region + +import java.net.URI + +class SqsClientSuite extends QueueClientSuite { + + override def client: Resource[IO, QueueClient[IO]] = + SQSClient[IO]( + Region.EU_WEST_1, + AnonymousCredentialsProvider.create(), + endpoint = Some(new URI("http://localhost:4566"))) + +} diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala index 148b898..a7e7532 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala @@ -19,6 +19,7 @@ package com.commercetools.queue.aws.sqs import cats.effect.Async import cats.syntax.functor._ import cats.syntax.monadError._ +import cats.syntax.traverse._ import com.commercetools.queue.{QueuePusher, Serializer} import software.amazon.awssdk.services.sqs.SqsAsyncClient import software.amazon.awssdk.services.sqs.model.{SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest} @@ -57,11 +58,12 @@ class SQSPusher[F[_], T]( SendMessageBatchRequest .builder() .queueUrl(queueUrl) - .entries(messages.map { message => + .entries(messages.mapWithIndex { (message, idx) => SendMessageBatchRequestEntry .builder() .messageBody(serializer.serialize(message)) .delaySeconds(delaySeconds) + .id(idx.toString()) .build() }.asJava) .build()) diff --git a/build.sbt b/build.sbt index e3105e6..55a5b28 100644 --- a/build.sbt +++ b/build.sbt @@ -15,7 +15,7 @@ val Scala213 = "2.13.12" ThisBuild / crossScalaVersions := Seq(Scala213, "3.3.3") ThisBuild / scalaVersion := Scala213 -lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe, otel4s, unidocs) +lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, awsSqsIt, circe, otel4s, unidocs) ThisBuild / tlSitePublishBranch := Some("main") @@ -42,6 +42,29 @@ lazy val core = crossProject(JVMPlatform) name := "fs2-queues-core" ) +lazy val testkit = crossProject(JVMPlatform) + .crossType(CrossType.Pure) + .in(file("testkit")) + .enablePlugins(NoPublishPlugin) + .settings(commonSettings) + .settings( + name := "fs2-queues-testkit", + libraryDependencies ++= List( + "org.scalameta" %%% "munit" % Versions.munit, + "org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect + ) + ) + .dependsOn(core) + +// for sqs integration test, start a localstack with sqs +ThisBuild / githubWorkflowBuildPreamble := List( + WorkflowStep.Use( + UseRef.Public(owner = "LocalStack", repo = "setup-localstack", ref = "main"), + params = Map("image-tag" -> "latest"), + env = Map("SERVICES" -> "sqs") + ) +) + lazy val otel4s = crossProject(JVMPlatform) .crossType(CrossType.Pure) .in(file("otel4s")) @@ -80,7 +103,7 @@ lazy val azureServiceBus = crossProject(JVMPlatform) "com.azure" % "azure-messaging-servicebus" % "7.15.1" ) ) - .dependsOn(core) + .dependsOn(core, testkit % Test) lazy val awsSQS = crossProject(JVMPlatform) .crossType(CrossType.Pure) @@ -95,6 +118,12 @@ lazy val awsSQS = crossProject(JVMPlatform) ) .dependsOn(core) +lazy val awsSqsIt = project + .in(file("aws/sqs/integration")) + .enablePlugins(NoPublishPlugin) + .settings(commonSettings) + .dependsOn(awsSQS.jvm % Test, testkit.jvm % Test) + lazy val docs = project .in(file("site")) .enablePlugins(TypelevelSitePlugin) diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..db478a3 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,10 @@ +version: "3.8" + +services: + localstack: + container_name: sqs + image: localstack/localstack + ports: + - "127.0.0.1:4566:4566" + environment: + - SERVICES=sqs diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala new file mode 100644 index 0000000..7459db6 --- /dev/null +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala @@ -0,0 +1,57 @@ +package com.commercetools.queue.testkit + +import cats.effect.std.Random +import cats.effect.{IO, Ref, Resource} +import com.commercetools.queue.QueueClient +import fs2.Stream +import munit.CatsEffectSuite + +import scala.concurrent.duration._ + +/** + * This suite tests that the basic features of a [[com.commercetools.queue.QueueClient QueueClient]] are properly + * implemented for a concrete client. + * This is used in integration tests for the various implemented queue providers. + */ +abstract class QueueClientSuite extends CatsEffectSuite { + + /** Provide a way to acquire a queue client for the provider under test. */ + def client: Resource[IO, QueueClient[IO]] + + val clientFixture = ResourceSuiteLocalFixture("queue-client", client) + + override def munitFixtures = List(clientFixture) + + val withQueue = + ResourceFixture( + Resource.make( + IO.randomUUID + .map(uuid => s"queue-$uuid") + .flatTap { queueName => + clientFixture().administration + .create(queueName, 10.minutes, 2.minutes) + })(queueName => clientFixture().administration.delete(queueName))) + + withQueue.test("published messages are received by a processor") { queueName => + for { + random <- Random.scalaUtilRandom[IO] + size <- random.nextLongBounded(30L) + messages = List.range(0L, size).map(_.toString()) + received <- Ref[IO].of(List.empty[String]) + client = clientFixture() + _ <- Stream + .emits(messages) + .through(client.publish(queueName).sink(batchSize = 10)) + .merge( + client + .subscribe(queueName) + .processWithAutoAck(batchSize = 10, waitingTime = 20.seconds)(msg => received.update(msg.payload :: _)) + .take(size) + ) + .compile + .drain + _ <- assertIO(received.get.map(_.toSet), messages.toSet) + } yield () + } + +}