Skip to content

Commit

Permalink
Introduce MessageHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
AL333Z committed May 30, 2024
1 parent b6be5e6 commit 08ecd06
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import software.amazon.awssdk.services.sqs.model.{GetQueueAttributesRequest, Que

class SQSSubscriber[F[_], T](
val queueName: String,
client: SqsAsyncClient,
asyncClient: SqsAsyncClient,
getQueueUrl: F[String]
)(implicit
F: Async[F],
Expand All @@ -34,7 +34,7 @@ class SQSSubscriber[F[_], T](
private def getLockTTL(queueUrl: String): F[Int] =
F.fromCompletableFuture {
F.delay {
client.getQueueAttributes(
asyncClient.getQueueAttributes(
GetQueueAttributesRequest
.builder()
.queueUrl(queueUrl)
Expand All @@ -49,7 +49,7 @@ class SQSSubscriber[F[_], T](
for {
queueUrl <- getQueueUrl
lockTTL <- getLockTTL(queueUrl)
} yield new SQSPuller(queueName, client, queueUrl, lockTTL)
} yield new SQSPuller(queueName, asyncClient, queueUrl, lockTTL)
}

}
13 changes: 9 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ lazy val core = crossProject(JVMPlatform)
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.Message.rawPayload"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueueAdministration.configuration"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.QueuePusher.push"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueuePusher.push")
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueuePusher.push"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.QueueSubscriber.this")
)
)

Expand Down Expand Up @@ -95,7 +96,8 @@ lazy val otel4s = crossProject(JVMPlatform)
),
// TODO: Remove once 0.2 is published
mimaBinaryIssueFilters ++= List(
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.otel4s.MeasuringQueuePusher.push")
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.otel4s.MeasuringQueuePusher.push"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.otel4s.MeasuringQueueSubscriber.this")
)
)
.dependsOn(core % "compile->compile;test->test")
Expand Down Expand Up @@ -124,7 +126,9 @@ lazy val azureServiceBus = crossProject(JVMPlatform)
// TODO: Remove once 0.2 is published
mimaBinaryIssueFilters ++= List(
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.commercetools.queue.azure.servicebus.ServiceBusPusher.push")
"com.commercetools.queue.azure.servicebus.ServiceBusPusher.push"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.commercetools.queue.azure.servicebus.ServiceBusQueueSubscriber.this")
)
)
.dependsOn(core, testkit % Test)
Expand All @@ -141,7 +145,8 @@ lazy val awsSQS = crossProject(JVMPlatform)
// TODO: Remove once 0.2 is published
mimaBinaryIssueFilters ++= List(
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.aws.sqs.SQSMessageContext.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.aws.sqs.SQSPusher.push")
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.aws.sqs.SQSPusher.push"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.aws.sqs.SQSSubscriber.this")
)
)
.dependsOn(core)
Expand Down
51 changes: 51 additions & 0 deletions core/src/main/scala/com/commercetools/queue/MessageHandler.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue

import cats.MonadThrow
import cats.syntax.applicativeError._
import cats.syntax.functor._

import scala.concurrent.duration.FiniteDuration

trait MessageHandler[F[_], T, Res] {
def handle(msg: Message[F, T]): F[Decision[Res]]
}

trait ImmediateDecisionMessageHandler[F[_], T, Res] {
def handle(msg: Message[F, T]): F[ImmediateDecision[Res]]
}

sealed trait Decision[+O]
sealed trait ImmediateDecision[+O] extends Decision[O]
object Decision {
case class Ok[O](res: O) extends ImmediateDecision[O]
case object Drop extends ImmediateDecision[Nothing]
case class Fail(t: Throwable, ack: Boolean) extends ImmediateDecision[Nothing]
case class Reenqueue(metadata: Option[Map[String, String]], delay: Option[FiniteDuration]) extends Decision[Nothing]
}

object MessageHandler {
// nack on any failure except for deserialization exception
def default[F[_]: MonadThrow, T, O](f: Message[F, T] => F[O]): ImmediateDecisionMessageHandler[F, T, O] =
msg =>
f(msg).attempt.map {
case Left(de: DeserializationException) => Decision.Fail(de, ack = true)
case Left(t) => Decision.Fail(t, ack = false)
case Right(a) => Decision.Ok(a)
}
}
56 changes: 56 additions & 0 deletions core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,60 @@ abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) {
case Left(_) => ctx.nack()
})

/**
* Processes the messages with the provided message handler.
* The messages are ack'ed, nack'ed, reenqueued or moved to a DLQ based on the decision returned from the handler.
* The stream emits results or errors down-stream and does not fail on business logic errors,
* allowing you to build error recovery logic.
*
* Messages in a batch are processed in parallel but result is emitted in order the messages were received,
* with the exclusion of the messages that have been reenqueued, dropped and DLQ'ed.
*/
final def process[Res, D[_] <: Decision[Res]](
batchSize: Int,
waitingTime: FiniteDuration,
publisherForReenqueue: QueuePublisher[F, T]
)(handler: MessageHandler[F, T, Res]
): Stream[F, Either[Throwable, Res]] =
Stream
.resource(publisherForReenqueue.pusher)
.flatMap { pusher =>
messages(batchSize, waitingTime)
.parEvalMap(batchSize) { ctx =>
handler.handle(ctx).flatMap[Option[Either[Throwable, Res]]] {
case Decision.Ok(res) => ctx.ack().as(res.asRight.some)
case Decision.Drop => ctx.ack().as(none)
case Decision.Fail(t, true) => ctx.ack().as(t.asLeft.some)
case Decision.Fail(t, false) => ctx.nack().as(t.asLeft.some)
case Decision.Reenqueue(metadata, delay) =>
ctx.payload.flatMap(pusher.push(_, metadata.getOrElse(ctx.metadata), delay)).as(none)
}
}
.flattenOption
}

/**
* Processes the messages with the provided message handler.
* The messages are ack'ed or nack'ed based on the decision returned from the handler.
* The stream emits results or errors down-stream and does not fail on business logic errors,
* allowing you to build error recovery logic.
*
* Messages in a batch are processed in parallel but result is emitted in order the messages were received,
* with the exclusion of the messages that have been dropped.
*/
final def processWithImmediateDecision[Res](
batchSize: Int,
waitingTime: FiniteDuration
)(handler: ImmediateDecisionMessageHandler[F, T, Res]
): Stream[F, Either[Throwable, Res]] =
messages(batchSize, waitingTime)
.parEvalMap(batchSize) { ctx =>
handler.handle(ctx).flatMap[Option[Either[Throwable, Res]]] {
case Decision.Ok(res) => ctx.ack().as(res.asRight.some)
case Decision.Drop => ctx.ack().as(none)
case Decision.Fail(t, true) => ctx.ack().as(t.asLeft.some)
case Decision.Fail(t, false) => ctx.nack().as(t.asLeft.some)
}
}
.flattenOption
}
39 changes: 36 additions & 3 deletions core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ class SubscriberSuite extends CatsEffectSuite {
.map { state =>
val queue =
new TestQueue[String](name = "test-queue", state = state, messageTTL = 15.minutes, lockTTL = 1.minute)
(queue, new TestQueueSubscriber(queue))
(queue, new TestQueueSubscriber(queue), new TestQueuePublisher(queue))
}
.toResource)

queueSub.test("Successful messages must be acked") { case (queue, subscriber) =>
queueSub.test("Successful messages must be acked") { case (queue, subscriber, _) =>
TestControl
.executeEmbed(for {
// first populate the queue
Expand Down Expand Up @@ -70,7 +70,7 @@ class SubscriberSuite extends CatsEffectSuite {
}

queueSub.test("Messages must be unack'ed if processing fails and emit everything up to failure") {
case (queue, subscriber) =>
case (queue, subscriber, _) =>
TestControl
.executeEmbed(for {
// first populate the queue
Expand Down Expand Up @@ -98,4 +98,37 @@ class SubscriberSuite extends CatsEffectSuite {
}
}

queueSub.test("Messages consumed with process must follow the decision") { case (queue, subscriber, publisher) =>
TestControl
.executeEmbed(for {
// first populate the queue
messages <- List.range(0, 100).traverse { i =>
IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(s"message-$i", _))
}
_ <- queue.setAvailableMessages(messages)
// then process messages in batches of 5
// processing is (virtually) instantaneous in this case,
// so messages are immediately acked, from the mocked time PoV
// however, receiving messages waits for the amount of provided `waitingTime`
// in the test queue implementation, event if enough messages are available
// so this step makes time progress in steps of `waitingTime`
result <- subscriber
.process(batchSize = 5, waitingTime = 40.millis, publisher)(new MessageHandler[IO, String, Int] {
override def handle(msg: Message[IO, String]): IO[Decision[Int]] =
IO.pure(Decision.Ok(1))
})
.interruptAfter(3.seconds)
.compile
.foldMonoid
} yield result)
.flatMap { result =>
for {
_ <- assertIO(IO.pure(result), 100.asRight)
_ <- assertIO(queue.getAvailableMessages, Nil)
_ <- assertIO(queue.getLockedMessages, Nil)
_ <- assertIO(queue.getDelayedMessages, Nil)
} yield ()
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class PubSubClient[F[_]: Async] private (
SubscriptionName.of(project, s"fs2-queue-$name"),
channelProvider,
credentials,
endpoint)
endpoint
)

}

Expand Down

0 comments on commit 08ecd06

Please sign in to comment.