Skip to content

Commit

Permalink
Add support for message attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
AL333Z committed May 28, 2024
1 parent 46f5792 commit 35f6841
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class SQSPuller[F[_], T](
.queueUrl(queueUrl)
.maxNumberOfMessages(batchSize)
.waitTimeSeconds(waitingTime.toSeconds.toInt)
.attributeNamesWithStrings(MessageSystemAttributeName.SENT_TIMESTAMP.toString())
.messageAttributeNames(".*")
.attributeNamesWithStrings(MessageSystemAttributeName.SENT_TIMESTAMP.toString)
.build(): @nowarn("msg=method attributeNamesWithStrings in trait Builder is deprecated")
)
}
Expand All @@ -72,7 +73,12 @@ class SQSPuller[F[_], T](
.attributes()
.get(MessageSystemAttributeName.SENT_TIMESTAMP)
.toLong),
metadata = message.attributesAsStrings().asScala.toMap,
metadata = message
.messageAttributes()
.asScala
.view
.collect { case (k, v) if v.dataType() == "String" => (k, v.stringValue()) }
.toMap,
receiptHandle = message.receiptHandle(),
messageId = message.messageId(),
lockTTL = lockTTL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import cats.syntax.monadError._
import cats.syntax.traverse._
import com.commercetools.queue.{QueuePusher, Serializer}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest}
import software.amazon.awssdk.services.sqs.model.{MessageAttributeValue, SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest}

import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
Expand All @@ -36,32 +36,34 @@ class SQSPusher[F[_], T](
F: Async[F])
extends QueuePusher[F, T] {

override def push(message: T, delay: Option[FiniteDuration]): F[Unit] =
override def push(message: T, metadata: Map[String, String], delay: Option[FiniteDuration]): F[Unit] =
F.fromCompletableFuture {
F.delay {
client.sendMessage(
SendMessageRequest
.builder()
.queueUrl(queueUrl)
.messageBody(serializer.serialize(message))
.messageAttributes(metadata.view.mapValues(mkStringAttributeValue).toMap.asJava)
.delaySeconds(delay.fold(0)(_.toSeconds.toInt))
.build())
}
}.void
.adaptError(makePushQueueException(_, queueName))

override def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit] =
override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): F[Unit] =
F.fromCompletableFuture {
F.delay {
val delaySeconds = delay.fold(0)(_.toSeconds.toInt)
client.sendMessageBatch(
SendMessageBatchRequest
.builder()
.queueUrl(queueUrl)
.entries(messages.mapWithIndex { (message, idx) =>
.entries(messages.mapWithIndex { case ((payload, metadata), idx) =>
SendMessageBatchRequestEntry
.builder()
.messageBody(serializer.serialize(message))
.messageBody(serializer.serialize(payload))
.messageAttributes(metadata.view.mapValues(mkStringAttributeValue).toMap.asJava)
.delaySeconds(delaySeconds)
.id(idx.toString())
.build()
Expand All @@ -71,4 +73,7 @@ class SQSPusher[F[_], T](
}.void
.adaptError(makePushQueueException(_, queueName))

private def mkStringAttributeValue(s: String): MessageAttributeValue =
MessageAttributeValue.builder().dataType("String").stringValue(s).build()

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.azure.messaging.servicebus.{ServiceBusReceivedMessage, ServiceBusRece
import com.commercetools.queue.MessageContext

import java.time.Instant
import scala.jdk.CollectionConverters.MapHasAsScala

class ServiceBusMessageContext[F[_], T](
val payload: F[T],
Expand All @@ -35,7 +36,9 @@ class ServiceBusMessageContext[F[_], T](
override def enqueuedAt: Instant = underlying.getEnqueuedTime().toInstant()

override def metadata: Map[String, String] =
Map.empty
underlying.getRawAmqpMessage.getApplicationProperties.asScala.view.collect {
case (k, v) if v.isInstanceOf[String] => (k, v.asInstanceOf[String])
}.toMap

override def ack(): F[Unit] =
F.blocking(receiver.complete(underlying)).void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ class ServiceBusPusher[F[_], Data](
F: Async[F])
extends QueuePusher[F, Data] {

override def push(message: Data, delay: Option[FiniteDuration]): F[Unit] = {
override def push(message: Data, metadata: Map[String, String], delay: Option[FiniteDuration]): F[Unit] = {
val sbMessage = new ServiceBusMessage(serializer.serialize(message))
sbMessage.getApplicationProperties.putAll(metadata.asJava)
delay.traverse_ { delay =>
F.realTimeInstant
.map(now => sbMessage.setScheduledEnqueueTime(now.plusMillis(delay.toMillis).atOffset(ZoneOffset.UTC)))
Expand All @@ -44,8 +45,12 @@ class ServiceBusPusher[F[_], Data](
.adaptError(makePushQueueException(_, queueName))
}

override def push(messages: List[Data], delay: Option[FiniteDuration]): F[Unit] = {
val sbMessages = messages.map(msg => new ServiceBusMessage(serializer.serialize(msg)))
override def push(messages: List[(Data, Map[String, String])], delay: Option[FiniteDuration]): F[Unit] = {
val sbMessages = messages.map { case (payload, metadata) =>
val sbm = new ServiceBusMessage(serializer.serialize(payload))
sbm.getApplicationProperties.putAll(metadata.asJava)
sbm
}
delay.traverse_ { delay =>
F.realTimeInstant.map { now =>
sbMessages.foreach { msg =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwable]) {
* produced data to the queue. The messages are published in batches, according
* to the `batchSize` parameter.
*/
def sink(batchSize: Int = 10)(upstream: Stream[F, T]): Stream[F, Nothing] =
def sink(batchSize: Int = 10)(upstream: Stream[F, (T, Map[String, String])]): Stream[F, Nothing] =
Stream.resource(pusher).flatMap { pusher =>
upstream.chunkN(batchSize).foreach { chunk =>
pusher.push(chunk.toList, None)
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/com/commercetools/queue/QueuePusher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ trait QueuePusher[F[_], T] {
/**
* Publishes a single message to the queue, with an optional delay.
*/
def push(message: T, delay: Option[FiniteDuration]): F[Unit]
def push(message: T, metadata: Map[String, String], delay: Option[FiniteDuration]): F[Unit]

/**
* Publishes a bunch of messages to the queue, with an optional delay.
*/
def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit]
def push(messages: (List[(T, Map[String, String])]), delay: Option[FiniteDuration]): F[Unit]

}
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,17 @@ class TestQueue[T](
Chunk.chain(newlyLocked.map(_._2)))
}

def enqeueMessages(messages: List[T], delay: Option[FiniteDuration]) =
def enqeueMessages(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]) =
state.evalUpdate { state =>
for {
now <- IO.realTimeInstant
state <- update(state)
} yield delay match {
case None =>
state.copy(available = state.available.addAll(messages.map(TestMessage(_, now))))
state.copy(available = state.available.addAll(messages.map(x => TestMessage(x._1, now))))
case Some(delay) =>
val delayed = now.plusMillis(delay.toMillis)
state.copy(delayed = messages.map(TestMessage(_, delayed)) reverse_::: state.delayed)
state.copy(delayed = messages.map(x => TestMessage(x._1, delayed)) reverse_::: state.delayed)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class TestQueuePusher[T](queue: TestQueue[T]) extends QueuePusher[IO, T] {

override val queueName: String = queue.name

override def push(message: T, delay: Option[FiniteDuration]): IO[Unit] =
queue.enqeueMessages(message :: Nil, delay)
override def push(message: T, metadata: Map[String, String], delay: Option[FiniteDuration]): IO[Unit] =
queue.enqeueMessages((message, metadata) :: Nil, delay)

override def push(messages: List[T], delay: Option[FiniteDuration]): IO[Unit] =
override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): IO[Unit] =
queue.enqeueMessages(messages, delay)

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,18 @@ class PubSubPusher[F[_], T](
serializer: Serializer[T])
extends QueuePusher[F, T] {

private def makeMessage(payload: T, waitUntil: Option[Instant]): F[PubsubMessage] =
private def makeMessage(payload: T, metadata: Map[String, String], waitUntil: Option[Instant]): F[PubsubMessage] =
F.delay {
val builder = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(serializer.serialize(payload)))
builder.putAllAttributes(metadata.asJava)
waitUntil.foreach(waitUntil => builder.putAttributes(delayAttribute, waitUntil.toString()))
builder.build
}

override def push(message: T, delay: Option[FiniteDuration]): F[Unit] =
override def push(message: T, metadata: Map[String, String], delay: Option[FiniteDuration]): F[Unit] =
(for {
waitUntil <- delay.traverse(delay => F.realTimeInstant.map(_.plusMillis(delay.toMillis)))
msg <- makeMessage(message, waitUntil)
msg <- makeMessage(message, metadata, waitUntil)
_ <- wrapFuture(
F.delay(
publisher
Expand All @@ -58,10 +59,10 @@ class PubSubPusher[F[_], T](
} yield ())
.adaptError(makePushQueueException(_, queueName))

override def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit] =
override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): F[Unit] =
(for {
waitUntil <- delay.traverse(delay => F.realTimeInstant.map(_.plusMillis(delay.toMillis)))
msgs <- messages.traverse(makeMessage(_, waitUntil))
msgs <- messages.traverse { case (payload, metadata) => makeMessage(payload, metadata, waitUntil) }
_ <- wrapFuture(
F.delay(publisher
.publishCallable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ class MeasuringQueuePusher[F[_], T](

override def queueName: String = underlying.queueName

override def push(message: T, delay: Option[FiniteDuration]): F[Unit] =
override def push(message: T, metadata: Map[String, String], delay: Option[FiniteDuration]): F[Unit] =
tracer
.span("queue.pushMessage")
.surround {
underlying
.push(message, delay)
.push(message, metadata, delay)
}
.guaranteeCase(metrics.send)

override def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit] =
override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): F[Unit] =
tracer
.span("queue.pushMessages")
.surround {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ class MeasuringPusherSuite extends CatsEffectSuite {

override def queueName: String = self.queueName

override def push(message: String, delay: Option[FiniteDuration]): IO[Unit] = result
override def push(message: String, metadata: Map[String, String], delay: Option[FiniteDuration]): IO[Unit] =
result

override def push(messages: List[String], delay: Option[FiniteDuration]): IO[Unit] = result
override def push(messages: List[(String, Map[String, String])], delay: Option[FiniteDuration]): IO[Unit] =
result

}

Expand All @@ -47,7 +49,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
val measuringPusher =
new MeasuringQueuePusher[IO, String](pusher(IO.unit), new QueueMetrics(queueName, counter), Tracer.noop)
for {
fiber <- measuringPusher.push("msg", None).start
fiber <- measuringPusher.push("msg", Map.empty, None).start
_ <- assertIO(fiber.join.map(_.isSuccess), true)
_ <- assertIO(
counter.records.get,
Expand All @@ -61,7 +63,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
val measuringPusher =
new MeasuringQueuePusher[IO, String](pusher(IO.unit), new QueueMetrics(queueName, counter), Tracer.noop)
for {
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3"), None).start
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3").map(x => (x, Map.empty)), None).start
_ <- assertIO(fiber.join.map(_.isSuccess), true)
_ <- assertIO(
counter.records.get,
Expand All @@ -78,7 +80,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
new QueueMetrics(queueName, counter),
Tracer.noop)
for {
fiber <- measuringPusher.push("msg", None).start
fiber <- measuringPusher.push("msg", Map.empty, None).start
_ <- assertIO(fiber.join.map(_.isError), true)
_ <- assertIO(
counter.records.get,
Expand All @@ -95,7 +97,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
new QueueMetrics(queueName, counter),
Tracer.noop)
for {
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3"), None).start
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3").map(x => (x, Map.empty)), None).start
_ <- assertIO(fiber.join.map(_.isError), true)
_ <- assertIO(
counter.records.get,
Expand All @@ -109,7 +111,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
val measuringPusher =
new MeasuringQueuePusher[IO, String](pusher(IO.canceled), new QueueMetrics(queueName, counter), Tracer.noop)
for {
fiber <- measuringPusher.push("msg", None).start
fiber <- measuringPusher.push("msg", Map.empty, None).start
_ <- assertIO(fiber.join.map(_.isCanceled), true)
_ <- assertIO(
counter.records.get,
Expand All @@ -123,7 +125,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
val measuringPusher =
new MeasuringQueuePusher[IO, String](pusher(IO.canceled), new QueueMetrics(queueName, counter), Tracer.noop)
for {
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3"), None).start
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3").map(x => (x, Map.empty)), None).start
_ <- assertIO(fiber.join.map(_.isCanceled), true)
_ <- assertIO(
counter.records.get,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,26 @@ abstract class QueueClientSuite extends CatsEffectSuite {
for {
random <- Random.scalaUtilRandom[IO]
size <- random.nextLongBounded(30L)
messages = List.range(0L, size).map(_.toString())
received <- Ref[IO].of(List.empty[String])
messages = List
.range(0L, size)
.map(i => (i.toString, Map(s"metadata-$i-key" -> s"$i-value", s"metadata-$i-another-key" -> "another-value")))
received <- Ref[IO].of(List.empty[(String, Map[String, String])])
client = clientFixture()
_ <- Stream
.emits(messages)
.through(client.publish(queueName).sink(batchSize = 10))
.merge(
client
.subscribe(queueName)
.processWithAutoAck(batchSize = 10, waitingTime = 20.seconds)(msg => received.update(msg.rawPayload :: _))
.processWithAutoAck(batchSize = 10, waitingTime = 20.seconds)(msg =>
received.update((msg.rawPayload, msg.metadata) :: _))
.take(size)
)
.compile
.drain
_ <- assertIO(received.get.map(_.toSet), messages.toSet)
_ <- assertIO(
received.get.map(_.map(x => (x._1, keepMetadataWithPrefix(x._2, "metadata"))).toSet),
messages.toSet)
} yield ()
}

Expand All @@ -79,12 +84,17 @@ abstract class QueueClientSuite extends CatsEffectSuite {
withQueue.test("delayed messages should not be pulled before deadline") { queueName =>
val client = clientFixture()
client.publish(queueName).pusher.use { pusher =>
pusher.push("delayed message", Some(2.seconds))
pusher.push("delayed message", Map("metadata-key" -> "value"), Some(2.seconds))
} *> client.subscribe(queueName).puller.use { puller =>
for {
_ <- assertIO(puller.pullBatch(1, 1.second), Chunk.empty)
_ <- IO.sleep(2.seconds)
_ <- assertIO(puller.pullBatch(1, 1.second).map(_.map(_.rawPayload)), Chunk("delayed message"))
_ <- assertIO(
puller
.pullBatch(1, 1.second)
.map(_.map(x => (x.rawPayload, keepMetadataWithPrefix(x.metadata, "metadata")))),
Chunk(("delayed message", Map("metadata-key" -> "value")))
)
} yield ()

}
Expand All @@ -103,4 +113,8 @@ abstract class QueueClientSuite extends CatsEffectSuite {
} yield ()
}

// to only keep metadata entries we are interested in, in not the ones set by the provider
private def keepMetadataWithPrefix(metadata: Map[String, String], prefix: String): Map[String, String] =
metadata.filter(_._1.startsWith(prefix))

}

0 comments on commit 35f6841

Please sign in to comment.