Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Migration-5-6.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ Changes in Scala API:
1. The whole API is _finally tagless_ - all methods return `F[_]`. See [related section](README.md#scala-usage) in docs.
1. The API now uses type-conversions - provide type and related converter when creating producer/consumer.
See [related section](README.md#providing-converters-for-producer/consumer) in docs.
1. The `Delivery` is now generic - e.g. `Delivery[Bytes]` (depends on type-conversion).
1. The `Delivery` is now sealed trait - there are `Delivery.Ok[A]` (e.g. `Delivery[Bytes]`, depends on type-conversion) and `Delivery.MalformedContent`.
After getting the `Delivery[A]` you should pattern-match it.
1. The API now requires an implicit `monix.execution.Scheduler` instead of `ExecutionContext`.
1. Methods like `RabbitMQConnection.declareQueue` now return `F[Unit]` (was `Try[Done]` before).
1. Possibility to pass manually created configurations (`ProducerConfig` etc.) is now gone. The only option is to use TypeSafe config.
1. There is no `RabbitMQConsumer.bindTo` method anymore. Use [additional declarations](README.md#additional-declarations-and-bindings) for such thing.
1. There are new methods in [`RabbitMQConnection`](core/src/main/scala/com/avast/clients/rabbitmq/RabbitMQConnection.scala): `newChannel` and `withChannel`.
1. [`RabbitMQPullConsumer`](README.md#pull-consumer) was added

Changes in Java API:

Expand All @@ -27,3 +29,4 @@ Changes in Java API:
1. Method `RabbitMQProducer.send` now returns `CompletableFuture[Void]` (was `void` before) - ***it's not blocking anymore!***
1. `RabbitMQConsumer` and `RabbitMQProducer` (`api` module) are now traits and have their `Default*` counterparts in `core` module
1. There is no `RabbitMQConsumer.bindTo` method anymore. Use [additional declarations](README.md#additional-declarations-and-bindings) for such thing.
1. `RabbitMQPullConsumer` was added
31 changes: 20 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,13 @@ val monitor: Monitor = ???
// if you expect very high load, you can use separate connections for each producer/consumer, but it's usually not needed
val rabbitConnection = RabbitMQConnection.fromConfig[Task](config, blockingExecutor) // DefaultRabbitMQConnection[Task]

val consumer = rabbitConnection.newConsumer[Bytes]("consumer", monitor) { (delivery: Delivery[Bytes]) =>
println(delivery)
Task.now(DeliveryResult.Ack)
val consumer = rabbitConnection.newConsumer[Bytes]("consumer", monitor) {
case delivery: Delivery.Ok[Bytes] =>
println(delivery)
Task.now(DeliveryResult.Ack)

case _: Delivery.MalformedContent =>
Task.now(DeliveryResult.Reject)
} // DefaultRabbitMQConsumer

val sender = rabbitConnection.newProducer("producer", monitor) // DefaultRabbitMQProducer[Task]
Expand Down Expand Up @@ -295,6 +299,9 @@ public class ExampleJava {
}
```

The Java API has some limitations compared to the Scala one - mainly it does not support [types conversions](#providing-converters-for-producer/consumer)
and it offers only asynchronous version with `CompletableFuture` as result of all operations.

See [full example](core/src/test/java/ExampleJava.java)

## Notes
Expand Down Expand Up @@ -360,7 +367,12 @@ Check [reference.conf](core/src/main/resources/reference.conf) for all options o
Sometimes your use-case just doesn't fit the _normal_ consumer scenario. Here you can use the _pull consumer_ which gives you much more
control over the received messages. You _pull_ new message from the queue and acknowledge (reject, ...) it somewhere in the future.

The pull consumer operates with `Option` which is used for expressing either getting the delivery _or_ detecting an empty queue.
The pull consumer uses `PullResult` as return type:
* Ok - contains `DeliveryWithHandle` instance
* EmptyQueue - there was no message in the queue available

Additionally you can call `.toOption` method on the `PullResult`.


A simplified example:
```scala
Expand All @@ -377,14 +389,14 @@ val consumer = connection.newPullConsumer[Bytes](???, ???)


// receive "up to" 100 deliveries
val deliveries: Future[Seq[Option[DeliveryWithHandle[Future, Bytes]]]] = Future.sequence { (1 to 100).map(_ => consumer.pull()) }
val deliveries: Future[Seq[PullResult[Future, Bytes]]] = Future.sequence { (1 to 100).map(_ => consumer.pull()) }

// do your stuff!

???

// "handle" all deliveries
val handleResult: Future[Unit] = deliveries.flatMap(s => Future.sequence(s.flatten.map(_.handle(DeliveryResult.Ack))).map(_ => Unit))
// "handle" all deliveries, ignore failures and "empty queue" results
val handleResult: Future[Unit] = deliveries.flatMap(s => Future.sequence(s.flatMap(_.toOption).map(_.handle(DeliveryResult.Ack))).map(_ => Unit))

consumer.close()
connection.close()
Expand Down Expand Up @@ -425,10 +437,7 @@ case class NewFileSourceAdded(fileSources: Seq[FileSource])
val consumer = MultiFormatConsumer.forType[Future, NewFileSourceAdded](
JsonDeliveryConverter.derive(), // requires implicit `io.circe.Decoder[NewFileSourceAdded]`
GpbDeliveryConverter[NewFileSourceAddedGpb].derive() // requires implicit `com.avast.cactus.Converter[NewFileSourceAddedGpb, NewFileSourceAdded]`
)(
businessLogic.processMessage,
failureHandler
)
)(businessLogic.processMessage)
```
(see [unit test](core/src/test/scala/com/avast/clients/rabbitmq/MultiFormatConsumerTest.scala) for full example)

Expand Down
20 changes: 19 additions & 1 deletion api/src/main/scala/com/avast/clients/rabbitmq/api/Delivery.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
package com.avast.clients.rabbitmq.api

case class Delivery[A](body: A, properties: MessageProperties, routingKey: String)
import com.avast.bytes.Bytes

sealed trait Delivery[+A] {
def properties: MessageProperties

def routingKey: String
}

object Delivery {

case class Ok[+A](body: A, properties: MessageProperties, routingKey: String) extends Delivery[A]

case class MalformedContent(body: Bytes, properties: MessageProperties, routingKey: String, ce: ConversionException)
extends Delivery[Nothing]

def apply[A](body: A, properties: MessageProperties, routingKey: String): Delivery.Ok[A] = {
Delivery.Ok(body, properties, routingKey)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,30 @@ import scala.language.higherKinds
trait RabbitMQPullConsumer[F[_], A] {

/** Retrieves one message from the queue, if there is any.
*
* @return Some(DeliveryHandle[A]) when there was a message available; None otherwise.
*/
def pull(): F[Option[DeliveryWithHandle[F, A]]]
def pull(): F[PullResult[F, A]]
}

/** Trait which contains `Delivery` and it's _handle through which it can be *acked*, *rejected* etc.
*/
trait DeliveryWithHandle[F[_], A] {
trait DeliveryWithHandle[+F[_], +A] {
def delivery: Delivery[A]

def handle(result: DeliveryResult): F[Unit]
}

sealed trait PullResult[+F[_], +A] {
def toOption: Option[DeliveryWithHandle[F, A]]
}

object PullResult {

case class Ok[F[_], A](deliveryWithHandle: DeliveryWithHandle[F, A]) extends PullResult[F, A] {
override def toOption: Option[DeliveryWithHandle[F, A]] = Some(deliveryWithHandle)
}

case object EmptyQueue extends PullResult[Nothing, Nothing] {
override def toOption: Option[DeliveryWithHandle[Nothing, Nothing]] = None
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.avast.clients.rabbitmq.api

case class ConversionException(desc: String, cause: Throwable = null) extends RuntimeException(desc, cause)
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,40 @@ trait RabbitMQPullConsumer extends AutoCloseable {

/**
* Retrieves one message from the queue, if there is any.
*
* @return Some(DeliveryHandle[A]) when there was a message available; None otherwise.
*/
def pull(): CompletableFuture[Optional[DeliveryWithHandle]]
def pull(): CompletableFuture[PullResult]
}

trait DeliveryWithHandle {
def delivery: Delivery

def handle(result: DeliveryResult): CompletableFuture[Void]
}

sealed trait PullResult {
def toOptional: Optional[DeliveryWithHandle]

def isOk: Boolean

def isEmptyQueue: Boolean
}

object PullResult {

/* These two are not _case_ intentionally - it pollutes the API for Java users */

class Ok(deliveryWithHandle: DeliveryWithHandle) extends PullResult {
def getDeliveryWithHandle: DeliveryWithHandle = deliveryWithHandle

override val toOptional: Optional[DeliveryWithHandle] = Optional.of(deliveryWithHandle)
override val isOk: Boolean = true
override val isEmptyQueue: Boolean = false
}

object EmptyQueue extends PullResult {
override val toOptional: Optional[DeliveryWithHandle] = Optional.empty()
override val isOk: Boolean = false
override val isEmptyQueue: Boolean = true
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,14 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {

object Consumer {

def fromConfig[F[_]: ToTask, A: DeliveryConverter](providedConfig: Config,
channel: ServerChannel,
channelFactoryInfo: RabbitMQConnectionInfo,
blockingScheduler: Scheduler,
monitor: Monitor,
consumerListener: ConsumerListener)(readAction: DeliveryReadAction[F, A])(
implicit scheduler: Scheduler): DefaultRabbitMQConsumer = {
def fromConfig[F[_]: ToTask, A: DeliveryConverter](
providedConfig: Config,
channel: ServerChannel,
channelFactoryInfo: RabbitMQConnectionInfo,
blockingScheduler: Scheduler,
monitor: Monitor,
consumerListener: ConsumerListener,
readAction: DeliveryReadAction[F, A])(implicit scheduler: Scheduler): DefaultRabbitMQConsumer = {

val mergedConfig = providedConfig.withFallback(ConsumerDefaultConfig)

Expand All @@ -140,24 +141,25 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {

val consumerConfig = updatedConfig.wrapped.as[ConsumerConfig]("root")

create[F, A](consumerConfig, channel, channelFactoryInfo, blockingScheduler, monitor, consumerListener)(readAction)
create[F, A](consumerConfig, channel, channelFactoryInfo, blockingScheduler, monitor, consumerListener, readAction)
}

def create[F[_]: ToTask, A: DeliveryConverter](consumerConfig: ConsumerConfig,
channel: ServerChannel,
channelFactoryInfo: RabbitMQConnectionInfo,
blockingScheduler: Scheduler,
monitor: Monitor,
consumerListener: ConsumerListener)(readAction: DeliveryReadAction[F, A])(
implicit scheduler: Scheduler): DefaultRabbitMQConsumer = {
def create[F[_]: ToTask, A: DeliveryConverter](
consumerConfig: ConsumerConfig,
channel: ServerChannel,
channelFactoryInfo: RabbitMQConnectionInfo,
blockingScheduler: Scheduler,
monitor: Monitor,
consumerListener: ConsumerListener,
readAction: DeliveryReadAction[F, A])(implicit scheduler: Scheduler): DefaultRabbitMQConsumer = {

prepareConsumer(consumerConfig, readAction, channelFactoryInfo, channel, consumerListener, blockingScheduler, monitor)
}
}

object PullConsumer {

def fromConfig[F[_]: FromTask, A: DeliveryConverter](
def fromConfig[F[_]: FromTask: ToTask, A: DeliveryConverter](
providedConfig: Config,
channel: ServerChannel,
channelFactoryInfo: RabbitMQConnectionInfo,
Expand All @@ -182,11 +184,12 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
create[F, A](consumerConfig, channel, channelFactoryInfo, blockingScheduler, monitor)
}

def create[F[_]: FromTask, A: DeliveryConverter](consumerConfig: PullConsumerConfig,
channel: ServerChannel,
channelFactoryInfo: RabbitMQConnectionInfo,
blockingScheduler: Scheduler,
monitor: Monitor)(implicit scheduler: Scheduler): DefaultRabbitMQPullConsumer[F, A] = {
def create[F[_]: FromTask: ToTask, A: DeliveryConverter](
consumerConfig: PullConsumerConfig,
channel: ServerChannel,
channelFactoryInfo: RabbitMQConnectionInfo,
blockingScheduler: Scheduler,
monitor: Monitor)(implicit scheduler: Scheduler): DefaultRabbitMQPullConsumer[F, A] = {

preparePullConsumer(consumerConfig, channelFactoryInfo, channel, blockingScheduler, monitor)
}
Expand Down Expand Up @@ -326,7 +329,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
prepareConsumer(consumerConfig, channelFactoryInfo, channel, readAction, consumerListener, blockingScheduler, monitor)
}

private def preparePullConsumer[F[_]: FromTask, A: DeliveryConverter](
private def preparePullConsumer[F[_]: FromTask: ToTask, A: DeliveryConverter](
consumerConfig: PullConsumerConfig,
channelFactoryInfo: RabbitMQConnectionInfo,
channel: ServerChannel,
Expand Down Expand Up @@ -435,15 +438,16 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
}

val readAction: DefaultDeliveryReadAction = {
val convAction: DefaultDeliveryReadAction = { (d: Delivery[Bytes]) =>
val convAction: DefaultDeliveryReadAction = { d: Delivery[Bytes] =>
try {
implicitly[DeliveryConverter[A]].convert(d.body) match {
case Right(a) =>
val devA = d.copy(body = a)
implicitly[ToTask[F]].apply(userReadAction(devA))

case Left(ce) => Task.raiseError(ce)
val devA = d.flatMap { d =>
implicitly[DeliveryConverter[A]].convert(d.body) match {
case Right(a) => d.mapBody(_ => a)
case Left(ce) => Delivery.MalformedContent(d.body, d.properties, d.routingKey, ce)
}
}

implicitly[ToTask[F]].apply(userReadAction(devA))
} catch {
case NonFatal(e) =>
Task.raiseError(e)
Expand All @@ -468,7 +472,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
implicit callbackScheduler: Scheduler): DefaultDeliveryReadAction = {
import consumerConfig._

(delivery: Delivery[Bytes]) =>
delivery: Delivery[Bytes] =>
try {
// we try to catch also long-lasting synchronous work on the thread
val action = Task.deferFuture {
Expand All @@ -481,18 +485,18 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {

action
.timeout(ScalaDuration(processTimeout.toMillis, TimeUnit.MILLISECONDS))
.onErrorRecover {
.onErrorRecoverWith {
case e: TimeoutException =>
traceId.foreach(Kluzo.setTraceId)

logger.warn(s"[$name] Task timed-out, applying DeliveryResult.${consumerConfig.timeoutAction}", e)
consumerConfig.timeoutAction
Task.now(consumerConfig.timeoutAction)

case NonFatal(e) =>
traceId.foreach(Kluzo.setTraceId)

logger.warn(s"[$name] Error while executing callback, applying DeliveryResult.${consumerConfig.failureAction}", e)
consumerConfig.failureAction
Task.now(consumerConfig.failureAction)
}
.executeOn(callbackScheduler)
} catch {
Expand All @@ -503,6 +507,23 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {

}

private def executeParsingFailureAction[A](consumerConfig: ConsumerConfig,
ce: ConversionException,
delivery: Delivery[Bytes],
parsingFailureAction: ParsingFailureAction[Task]): Task[DeliveryResult] = {
import consumerConfig._

// logging on DEBUG, user should provide own higher-level logging in the callback
logger.debug(s"[$name] ${ce.getMessage}, executing parsingFailureAction", ce)

parsingFailureAction(name, delivery, ce)
.onErrorRecover {
case NonFatal(ex) =>
logger.warn(s"[$name] Error while executing parsingFailureAction, applying DeliveryResult.${consumerConfig.failureAction}", ex)
consumerConfig.failureAction
}
}

implicit class WrapConfig(val c: Config) extends AnyVal {
def wrapped: Config = {
// we need to wrap it with one level, to be able to parse it with Ficus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class DefaultRabbitMQConnection[F[_]: FromTask: ToTask](connection: ServerConnec
implicit scheduler: Scheduler): DefaultRabbitMQConsumer = {
addAutoCloseable {
DefaultRabbitMQClientFactory.Consumer
.fromConfig[F, A](config.getConfig(configName), createChannel(), info, blockingScheduler, monitor, consumerListener)(readAction)
.fromConfig[F, A](config.getConfig(configName), createChannel(), info, blockingScheduler, monitor, consumerListener, readAction)
}
}

Expand Down Expand Up @@ -141,3 +141,5 @@ class DefaultRabbitMQConnection[F[_]: FromTask: ToTask](connection: ServerConnec
}

}

object DefaultRabbitMQConnection extends StrictLogging {}
Loading