Skip to content

Commit

Permalink
Add integration tests
Browse files Browse the repository at this point in the history
Service Bus has no emulator yet, so only SQS tests are added.
  • Loading branch information
satabin committed May 7, 2024
1 parent 55bda1d commit 1c16601
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 3 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ jobs:
if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false'
run: sbt +update

- env:
SERVICES: sqs
uses: LocalStack/setup-localstack@main
with:
image-tag: latest

- name: Check that workflows are up to date
run: sbt githubWorkflowCheck

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.sqs

import cats.effect.{IO, Resource}
import com.commercetools.queue.QueueClient
import com.commercetools.queue.aws.sqs.SQSClient
import com.commercetools.queue.testkit.QueueClientSuite
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
import software.amazon.awssdk.regions.Region

import java.net.URI

class SqsClientSuite extends QueueClientSuite {

override def client: Resource[IO, QueueClient[IO]] =
SQSClient[IO](
Region.EU_WEST_1,
AnonymousCredentialsProvider.create(),
endpoint = Some(new URI("http://localhost:4566")))

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.commercetools.queue.aws.sqs
import cats.effect.Async
import cats.syntax.functor._
import cats.syntax.monadError._
import cats.syntax.traverse._
import com.commercetools.queue.{QueuePusher, Serializer}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest}
Expand Down Expand Up @@ -57,11 +58,12 @@ class SQSPusher[F[_], T](
SendMessageBatchRequest
.builder()
.queueUrl(queueUrl)
.entries(messages.map { message =>
.entries(messages.mapWithIndex { (message, idx) =>
SendMessageBatchRequestEntry
.builder()
.messageBody(serializer.serialize(message))
.delaySeconds(delaySeconds)
.id(idx.toString())
.build()
}.asJava)
.build())
Expand Down
33 changes: 31 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ val Scala213 = "2.13.12"
ThisBuild / crossScalaVersions := Seq(Scala213, "3.3.3")
ThisBuild / scalaVersion := Scala213

lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe, otel4s, unidocs)
lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, awsSqsIt, circe, otel4s, unidocs)

ThisBuild / tlSitePublishBranch := Some("main")

Expand All @@ -42,6 +42,29 @@ lazy val core = crossProject(JVMPlatform)
name := "fs2-queues-core"
)

lazy val testkit = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
.in(file("testkit"))
.enablePlugins(NoPublishPlugin)
.settings(commonSettings)
.settings(
name := "fs2-queues-testkit",
libraryDependencies ++= List(
"org.scalameta" %%% "munit" % Versions.munit,
"org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect
)
)
.dependsOn(core)

// for sqs integration test, start a localstack with sqs
ThisBuild / githubWorkflowBuildPreamble := List(
WorkflowStep.Use(
UseRef.Public(owner = "LocalStack", repo = "setup-localstack", ref = "main"),
params = Map("image-tag" -> "latest"),
env = Map("SERVICES" -> "sqs")
)
)

lazy val otel4s = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
.in(file("otel4s"))
Expand Down Expand Up @@ -80,7 +103,7 @@ lazy val azureServiceBus = crossProject(JVMPlatform)
"com.azure" % "azure-messaging-servicebus" % "7.15.1"
)
)
.dependsOn(core)
.dependsOn(core, testkit % Test)

lazy val awsSQS = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
Expand All @@ -95,6 +118,12 @@ lazy val awsSQS = crossProject(JVMPlatform)
)
.dependsOn(core)

lazy val awsSqsIt = project
.in(file("aws/sqs/integration"))
.enablePlugins(NoPublishPlugin)
.settings(commonSettings)
.dependsOn(awsSQS.jvm % Test, testkit.jvm % Test)

lazy val docs = project
.in(file("site"))
.enablePlugins(TypelevelSitePlugin)
Expand Down
10 changes: 10 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: "3.8"

services:
localstack:
container_name: sqs
image: localstack/localstack
ports:
- "127.0.0.1:4566:4566"
environment:
- SERVICES=sqs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.commercetools.queue.testkit

import cats.effect.std.Random
import cats.effect.{IO, Ref, Resource}
import com.commercetools.queue.QueueClient
import fs2.Stream
import munit.CatsEffectSuite

import scala.concurrent.duration._

/**
* This suite tests that the basic features of a [[com.commercetools.queue.QueueClient QueueClient]] are properly
* implemented for a concrete client.
* This is used in integration tests for the various implemented queue providers.
*/
abstract class QueueClientSuite extends CatsEffectSuite {

/** Provide a way to acquire a queue client for the provider under test. */
def client: Resource[IO, QueueClient[IO]]

val clientFixture = ResourceSuiteLocalFixture("queue-client", client)

override def munitFixtures = List(clientFixture)

val withQueue =
ResourceFixture(
Resource.make(
IO.randomUUID
.map(uuid => s"queue-$uuid")
.flatTap { queueName =>
clientFixture().administration
.create(queueName, 10.minutes, 2.minutes)
})(queueName => clientFixture().administration.delete(queueName)))

withQueue.test("published messages are received by a processor") { queueName =>
for {
random <- Random.scalaUtilRandom[IO]
size <- random.nextLongBounded(30L)
messages = List.range(0L, size).map(_.toString())
received <- Ref[IO].of(List.empty[String])
client = clientFixture()
_ <- Stream
.emits(messages)
.through(client.publish(queueName).sink(batchSize = 10))
.merge(
client
.subscribe(queueName)
.processWithAutoAck(batchSize = 10, waitingTime = 20.seconds)(msg => received.update(msg.payload :: _))
.take(size)
)
.compile
.drain
_ <- assertIO(received.get.map(_.toSet), messages.toSet)
} yield ()
}

}

0 comments on commit 1c16601

Please sign in to comment.