Skip to content

Commit

Permalink
Make access do deserialized payload effectful
Browse files Browse the repository at this point in the history
Deserializing the content might fail if the payload is malformed. The
idea is to make access to it effectful (with memoization) so that
failure occurs when the payload is accessed. The logic behind it is that
this is a business logic error to handle, not something that can be
handled at the library level.

This PR also introduces access to the `rawPayload` to handle malformed
body.
  • Loading branch information
satabin committed May 23, 2024
1 parent 528d984 commit d19d84e
Show file tree
Hide file tree
Showing 13 changed files with 66 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import software.amazon.awssdk.services.sqs.model.{ChangeMessageVisibilityRequest
import java.time.Instant

class SQSMessageContext[F[_], T](
val payload: T,
val payload: F[T],
val rawPayload: String,
val enqueuedAt: Instant,
val metadata: Map[String, String],
receiptHandle: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.commercetools.queue.aws.sqs

import cats.effect.Async
import cats.effect.syntax.concurrent._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
Expand Down Expand Up @@ -58,11 +59,14 @@ class SQSPuller[F[_], T](
Chunk
.iterator(response.messages().iterator().asScala)
.traverse { message =>
val body = message.body()
deserializer
.deserializeF(message.body())
.deserializeF(body)
.memoize
.map { payload =>
new SQSMessageContext(
payload = payload,
rawPayload = body,
enqueuedAt = Instant.ofEpochMilli(
message
.attributes()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import com.commercetools.queue.MessageContext
import java.time.Instant

class ServiceBusMessageContext[F[_], T](
val payload: T,
val payload: F[T],
val underlying: ServiceBusReceivedMessage,
receiver: ServiceBusReceiverClient
)(implicit F: Async[F])
extends MessageContext[F, T] {

override def rawPayload: String = underlying.getBody().toString()

override def enqueuedAt: Instant = underlying.getEnqueuedTime().toInstant()

override def metadata: Map[String, String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.commercetools.queue.azure.servicebus

import cats.effect.Async
import cats.effect.syntax.concurrent._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
Expand Down Expand Up @@ -45,6 +46,7 @@ class ServiceBusPuller[F[_], Data](
chunk.traverse { sbMessage =>
deserializer
.deserializeF(sbMessage.getBody().toString())
.memoize
.map { data =>
new ServiceBusMessageContext(data, sbMessage, receiver)
}
Expand Down
11 changes: 8 additions & 3 deletions core/src/main/scala/com/commercetools/queue/Message.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,22 @@ import java.time.Instant
/**
* Interface to access message data received from a queue.
*/
trait Message[T] {
trait Message[F[_], T] {

/**
* Unique message identifier
*/
def messageId: String

/**
* The message payload
* The deserialized message payload
*/
def payload: T
def payload: F[T]

/**
* The raw message content
*/
def rawPayload: String

/**
* When the message was put into the queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.concurrent.duration._
* Interface to interact with a message received from a queue.
* The messages must be explicitly aknowledged after having been processed.
*/
abstract class MessageContext[F[_], T](implicit F: Temporal[F]) extends Message[T] {
abstract class MessageContext[F[_], T](implicit F: Temporal[F]) extends Message[F, T] {

/**
* Acknowledges the message. It will be removed from the queue, so that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) {
* Messages in a batch are processed sequentially, stopping at the first error.
* All results up to the error will be emitted downstream before failing.
*/
final def processWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: Message[T] => F[Res])
final def processWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: Message[F, T] => F[Res])
: Stream[F, Res] = {
// to have full control over nacking things in time after a failure, and emitting
// results up to the error, we resort to a `Pull`, which allows this fine graind control
Expand Down Expand Up @@ -118,7 +118,7 @@ abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) {
* Messages in a batch are processed in parallel but result is emitted in
* order the messages were received.
*/
final def attemptProcessWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: Message[T] => F[Res])
final def attemptProcessWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: Message[F, T] => F[Res])
: Stream[F, Either[Throwable, Res]] =
messages(batchSize, waitingTime).parEvalMap(batchSize)(ctx =>
f(ctx).attempt.flatTap {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ class SubscriberSuite extends CatsEffectSuite {
result <- subscriber
// take all messages in one big batch
.processWithAutoAck(batchSize = 100, waitingTime = 40.millis)(m =>
IO.raiseWhen(m.payload == "message-43")(new Exception("BOOM")).as(m))
IO.raiseWhen(m.rawPayload == "message-43")(new Exception("BOOM")).as(m))
.attempt
.compile
.toList
} yield (messages, result))
.flatMap { case (originals, result) =>
for {
// check that all messages were consumed up to message #43
_ <- assertIO(IO.pure(result.init.map(_.map(_.payload))), originals.take(43).map(m => Right(m.payload)))
_ <- assertIO(IO.pure(result.init.map(_.map(_.rawPayload))), originals.take(43).map(m => Right(m.payload)))
_ <- assertIO(IO.pure(result.last.leftMap(_.getMessage())), Left("BOOM"))
_ <- assertIO(queue.getAvailableMessages, originals.drop(43))
_ <- assertIO(queue.getLockedMessages, Nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ final case class LockedTestMessage[T](

override def messageId: String = lock.toString

override def payload: T = msg.payload
override def payload: IO[T] = IO.pure(msg.payload)

override def rawPayload: String = msg.payload.toString()

override def enqueuedAt: Instant = msg.enqueuedAt

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ case class TestingMessageContext[T](

def noop: MessageContext[IO, T] = new MessageContext[IO, T] {
override def messageId: String = self.messageId
override def payload: T = self.payload
override def payload: IO[T] = IO.pure(self.payload)
override def rawPayload: String = self.payload.toString()
override def enqueuedAt: Instant = self.enqueuedAt
override def metadata: Map[String, String] = self.metadata
override def ack(): IO[Unit] = IO.unit
Expand All @@ -40,7 +41,8 @@ case class TestingMessageContext[T](

def failing(t: Exception): MessageContext[IO, T] = new MessageContext[IO, T] {
override def messageId: String = self.messageId
override def payload: T = self.payload
override def payload: IO[T] = IO.pure(self.payload)
override def rawPayload: String = self.payload.toString()
override def enqueuedAt: Instant = self.enqueuedAt
override def metadata: Map[String, String] = self.metadata
override def ack(): IO[Unit] = IO.raiseError(t)
Expand All @@ -50,7 +52,8 @@ case class TestingMessageContext[T](

def canceled: MessageContext[IO, T] = new MessageContext[IO, T] {
override def messageId: String = self.messageId
override def payload: T = self.payload
override def payload: IO[T] = IO.pure(self.payload)
override def rawPayload: String = self.payload.toString()
override def enqueuedAt: Instant = self.enqueuedAt
override def metadata: Map[String, String] = self.metadata
override def ack(): IO[Unit] = IO.canceled
Expand Down
41 changes: 28 additions & 13 deletions docs/getting-started/subscribing.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,20 @@ In the following, we explain what kind of control flow handling is provided by t

The @:api(com.commercetools.queue.QueueSubscriber) abstraction provides a `processWithAutoAck()` method, which automatically handles the control flow part for you. You only need to provide the processing function, allowing you to focus on your business logic.

@:callout(info)
The `payload` is effectful as it performs data deserialization, which can fail when a message payload is malformed.
You can access the raw data by using `rawPayload`.

The deserialized data is memoized so that subsequent accesses to it are not recomputed.
@:@

```scala mdoc:compile-only
import scala.concurrent.duration._

subscriber.processWithAutoAck(batchSize = 10, waitingTime = 20.seconds) { message =>
IO.println(s"Received ${message.payload}").as(message.messageId)
message.payload.flatMap { payload =>
IO.println(s"Received $payload").as(message.messageId)
}
}
```

Expand Down Expand Up @@ -71,12 +80,14 @@ import scala.concurrent.duration._
subscriber
.messages(batchSize = 10, waitingTime = 20.seconds)
.evalMap { context =>
IO.println(s"Received ${context.payload}")
.as(context.messageId)
.guaranteeCase {
case Outcome.Succeeded(_) => context.ack()
case _ => context.nack()
}
context.payload.flatMap { payload =>
IO.println(s"Received $payload")
.as(context.messageId)
}
.guaranteeCase {
case Outcome.Succeeded(_) => context.ack()
case _ => context.nack()
}
}
```

Expand Down Expand Up @@ -108,7 +119,9 @@ subscriber.puller.use { queuePuller =>
.pullBatch(batchSize = 10, waitingTime = 20.seconds)
.flatMap { chunk =>
chunk.traverse_ { context =>
IO.println(s"Received ${context.payload}").guaranteeCase {
context.payload.flatMap { payload =>
IO.println(s"Received $payload")
}.guaranteeCase {
case Outcome.Succeeded(_) => context.ack()
case _ => context.nack()
}
Expand Down Expand Up @@ -139,11 +152,13 @@ subscriber.puller.use { queuePuller =>
.flatMap { chunk =>
chunk.traverse_ { context =>
supervisor.supervise {
IO.println(s"Received ${context.payload}")
.guaranteeCase {
case Outcome.Succeeded(_) => context.ack()
case _ => context.nack()
}
context.payload.flatMap { payload =>
IO.println(s"Received $payload")
}
.guaranteeCase {
case Outcome.Succeeded(_) => context.ack()
case _ => context.nack()
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ class MeasuringMessageContext[F[_], T](

override def messageId: String = underlying.messageId

override def payload: T = underlying.payload
override def payload: F[T] = underlying.payload

override def rawPayload: String = underlying.rawPayload

override def enqueuedAt: Instant = underlying.enqueuedAt

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ abstract class QueueClientSuite extends CatsEffectSuite {
.merge(
client
.subscribe(queueName)
.processWithAutoAck(batchSize = 10, waitingTime = 20.seconds)(msg => received.update(msg.payload :: _))
.processWithAutoAck(batchSize = 10, waitingTime = 20.seconds)(msg => received.update(msg.rawPayload :: _))
.take(size)
)
.compile
Expand Down

0 comments on commit d19d84e

Please sign in to comment.