From b1055edcefed878fc3fae29bea9b69c5ab83208f Mon Sep 17 00:00:00 2001 From: Dmitry Dobrynin Date: Mon, 29 Jun 2020 02:04:17 +0300 Subject: [PATCH] Added an ability to implement Request - Reply integration pattern --- README.md | 53 ++++++++++++++-- .../com/gh/dobrynya/zio/jms/JmsConsumer.scala | 49 ++++++++++++--- .../com/gh/dobrynya/zio/jms/JmsProducer.scala | 43 ++++++++++++- .../dobrynya/zio/jms/JmsComponentsSpec.scala | 63 ++++++------------- 4 files changed, 149 insertions(+), 59 deletions(-) diff --git a/README.md b/README.md index a800b25..cea9179 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,8 @@ ZIO-JMS adapts JMS API to ZIO streams and makes it working more conveniently and --- +# Receive messages + For receiving messages it needs to create a message consumer using utility methods in JmsConsumer object ```scala @@ -37,7 +39,8 @@ val received: ZIO[Has[Connection] with Blocking, JMSException, Unit] = .runDrain ``` -Another ability to process input messages without using ZIO streams is more concise +Another ability to process input messages without using ZIO streams is more concise. A message is being acknowledged after +successful processing ```scala import zio.{ZIO, Has} @@ -49,7 +52,7 @@ import com.gh.dobrynya.zio.jms._ def someMessageProcessor(message: Message): ZIO[Console, Exception, Unit] = putStrLn(s"Received message $message") -val processing: ZIO[Console with Blocking with Has[Connection]] = +val processing: ZIO[Console with Blocking with Has[Connection], Exception, Unit] = JmsConsumer.consumeWith(Topic("test-topic"), someMessageProcessor) ``` @@ -69,6 +72,8 @@ val processing: ZIO[Blocking with Has[Connection], Any, Unit] = JmsConsumer.consumeTxWith(Topic("test-topic"), someMessageProcessor) ``` +# Send messages + For sending messages it needs to create sinks providing a destination and a message encoder as follows ```scala @@ -80,16 +85,52 @@ val messages = (1 to 100).map(i => s"Message $i") ZStream.fromIterable(messages).run(JmsProducer.sink(Queue("test-queue"), textMessageEncoder)) ``` +# Connection + The last thing is to provide a connection like follows ```scala import zio.{ZLayer, Has} import zio.blocking._ -import javax.jms.{ConnectionFactory, Connection, JMSException} -import com.gh.dobrynya.zio.jms.connection +import javax.jms.{Connection, ConnectionFactory, JMSException} +import com.gh.dobrynya.zio.jms._ def connectionFactory: ConnectionFactory = ??? -val connection: ZLayer[Blocking, JMSException, Blocking with Has[Connection]] = +val connectionLayer: ZLayer[Blocking, JMSException, Blocking with Has[Connection]] = ZLayer.fromManaged(connection(connectionFactory)).passthrough -``` \ No newline at end of file + +val consuming = JmsConsumer.consume(Queue("test-queue")).runDrain + .provideSomeLayer(connectionLayer) +``` + +# Request - Reply integration pattern + +## From a client's perspective + +```scala +import com.gh.dobrynya.zio.jms._ +import zio.stream._ + +val request = Queue("request") +val response = Queue("response") + +val messages: List[String] = ??? + +ZStream.fromIterable(messages) + .run(JmsProducer.requestSink(request, response, textMessageEncoder)) +``` + +## From a server's perspective + +```scala +import com.gh.dobrynya.zio.jms._ +import zio.stream._ + +val request = Queue("request") +JmsConsumer.consumeAndReplyWith(request, + (message, session) => textMessageEncoder(onlyText(message).toUpperCase, session).map(Some.apply)) +``` + +Here is a responder handling input messages and optionally responding to them to a destination specified in +`JMSReplyTo` header. It automatically copy `JMSCorellationID` header before sending. diff --git a/src/main/scala/com/gh/dobrynya/zio/jms/JmsConsumer.scala b/src/main/scala/com/gh/dobrynya/zio/jms/JmsConsumer.scala index 5304473..bfd5790 100644 --- a/src/main/scala/com/gh/dobrynya/zio/jms/JmsConsumer.scala +++ b/src/main/scala/com/gh/dobrynya/zio/jms/JmsConsumer.scala @@ -1,6 +1,6 @@ package com.gh.dobrynya.zio.jms -import javax.jms.{JMSException, Message, MessageConsumer, Session, Connection => JMSConnection} +import javax.jms.{ JMSException, Message, MessageConsumer, Session, Connection => JMSConnection } import zio._ import zio.blocking._ import zio.stream.ZStream @@ -48,13 +48,49 @@ object JmsConsumer { .managed(make[TransactionalMessage](destination, transacted = true, Session.SESSION_TRANSACTED)) .flatMap(_.consume(new TransactionalMessage(_, _))) + def consumeAndReplyWith[R, E >: JMSException]( + destination: DestinationFactory, + responder: (Message, Session) => ZIO[R, E, Option[Message]], + acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE + ): ZIO[R with Blocking with Has[JMSConnection], E, Unit] = { + val consumerAndProducer = for { + connection <- ZIO.service[JMSConnection].toManaged_ + session <- session(connection, transacted = false, acknowledgementMode) + mc <- consumer(session, destination(session)) + mp <- producer(session) + } yield (session, mc, mp) + + ZStream + .managed(consumerAndProducer) + .flatMap { + case (session, mc, mp) => + new JmsConsumer[Message](session, mc) + .consume((m, _) => m) + .mapM(message => responder(message, session).map(_.map((message, _)))) + .collect { + case Some(requestResponse) if requestResponse._1.getJMSReplyTo != null => + requestResponse + } + .mapM { + case (request, response) => + Task { + response.setJMSCorrelationID(request.getJMSCorrelationID) + mp.send(request.getJMSReplyTo, response) + request.acknowledge() + println(s"Sent $response") + }.refineToOrDie + } + } + .runDrain + } + def consumeWith[R, E >: JMSException]( destination: DestinationFactory, processor: Message => ZIO[R, E, Any], acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE ): ZIO[R with BlockingConnection, E, Unit] = make[Message](destination, transacted = false, acknowledgementMode) - .use(_.consume((m, _) => m).foreach(processor).unit) + .use(_.consume((m, _) => m).foreach(m => processor(m) *> acknowledge(m)).unit) /** * Consumes the specified destination and provides the processor with received message. @@ -71,11 +107,8 @@ object JmsConsumer { ): ZIO[R with BlockingConnection, E, Unit] = make[TransactionalMessage](destination, transacted = true, Session.AUTO_ACKNOWLEDGE) .use( - _.consume(new TransactionalMessage(_, _)) - .foreach { tm => - processor(tm.message) - .tapBoth(_ => tm.rollback, _ => tm.commit) - } - .unit + _.consume(new TransactionalMessage(_, _)).foreach { tm => + processor(tm.message).tapBoth(_ => tm.rollback, _ => tm.commit) + }.unit ) } diff --git a/src/main/scala/com/gh/dobrynya/zio/jms/JmsProducer.scala b/src/main/scala/com/gh/dobrynya/zio/jms/JmsProducer.scala index ff39fc4..9583bac 100644 --- a/src/main/scala/com/gh/dobrynya/zio/jms/JmsProducer.scala +++ b/src/main/scala/com/gh/dobrynya/zio/jms/JmsProducer.scala @@ -11,6 +11,7 @@ class JmsProducer[R, E >: JMSException, A](session: Session, sender: A => ZIO[R, } object JmsProducer { + def sink[R, E >: JMSException, A]( destination: DestinationFactory, encoder: (A, Session) => ZIO[R, E, Message], @@ -32,7 +33,7 @@ object JmsProducer { for { connection <- ZIO.service[JMSConnection].toManaged_ session <- session(connection, transacted, acknowledgementMode) - d <- Task(destination(session)).refineToOrDie.toManaged_ + d = destination(session) mp <- producer(session) } yield new JmsProducer[R, E, A](session, @@ -63,4 +64,44 @@ object JmsProducer { ) { jmsProducer => ZSink.foreach(jmsProducer.produce) } + + /** + * Creates a sink for implementing Request - Reply integration pattern. + * It sends messages to the specified destination and enriches JMSReplyTo header with provided response destination. + * You need to specify JMSCorrelationID manually if required by provided encoder. + * @param destination specifies destination + * @param replyTo specifies response destination + * @param encoder converts a message to an appropriate JMS message + * @param transacted specifies session transactionality + * @param acknowledgementMode specifies session acknowledgement mode + * @tparam R specifies dependencies + * @tparam E specifies possible errors + * @tparam A message type + * @return a new sink + */ + def requestSink[R, E >: JMSException, A]( + destination: DestinationFactory, + replyTo: DestinationFactory, + encoder: (A, Session) => ZIO[R, E, Message], + transacted: Boolean = false, + acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE + ): ZSink[R with BlockingConnection, E, A, A, Unit] = + ZSink.managed[R with BlockingConnection, E, A, JmsProducer[R, E, A], A, Unit]( + for { + connection <- ZIO.service[JMSConnection].toManaged_ + session <- session(connection, transacted, acknowledgementMode) + mp <- producer(session) + d = destination(session) + replyHeader = replyTo(session) + } yield + new JmsProducer[R, E, A](session, + message => + encoder(message, session).map { encoded => + encoded.setJMSReplyTo(replyHeader) + mp.send(d, encoded) + encoded + }) + ) { jmsProducer => + ZSink.foreach(jmsProducer.produce) + } } diff --git a/src/test/scala/com/gh/dobrynya/zio/jms/JmsComponentsSpec.scala b/src/test/scala/com/gh/dobrynya/zio/jms/JmsComponentsSpec.scala index 234ea2c..961d975 100644 --- a/src/test/scala/com/gh/dobrynya/zio/jms/JmsComponentsSpec.scala +++ b/src/test/scala/com/gh/dobrynya/zio/jms/JmsComponentsSpec.scala @@ -94,8 +94,8 @@ object JmsComponentsSpec extends DefaultRunnableSpec with ConnectionAware { _ <- send(messages, dest) collector <- ZQueue.unbounded[String] _ <- JmsConsumer - .consumeTxWith(dest, message => ZIO.fail(s"Some processing error: ${onlyText(message)}!")) - .ignore + .consumeTxWith(dest, message => ZIO.fail(s"Some processing error: ${onlyText(message)}!")) + .ignore _ <- JmsConsumer .consumeTxWith(dest, message => collector.offer(onlyText(message))) .fork @@ -105,49 +105,24 @@ object JmsComponentsSpec extends DefaultRunnableSpec with ConnectionAware { } }, testM("Client requires a response to be sent to a dedicated queue via JMSReplyTo header") { - val source = Queue("JmsComponentsSpec-6") - val replyQueue = Queue("JmsComponentsSpec-7") + val requestDestination = Queue("JmsComponentsSpec-6") + val replyDestination = Queue("JmsComponentsSpec-7") - val messages = List("1", "2", "3") - - val produceWithReplyTo = (for { - jmsProducer <- JmsProducer.make( - source, - (message: String, session: Session) => - Task { - val replyTo = replyQueue(session) - val msg = session.createTextMessage(message) - msg.setJMSReplyTo(replyTo) - msg.setJMSCorrelationID(message) - msg - }.refineToOrDie - ) - } yield jmsProducer).use( - p => - ZStream - .fromIterable(messages) - .foreach(p.produce) - ) - - (for { - _ <- produceWithReplyTo - _ <- JmsConsumer - .consume(source) - .tap(s => putStrLn(s"Received request $s")) - .take(messages.size) - .map(m => onlyText(m) -> m.getJMSReplyTo) - .run( - JmsProducer.routerSink( - (message, session) => - textMessageEncoder(message._1.toUpperCase, session) - .tap(m => UIO(m.setJMSCorrelationID(message._1))) - .map(message._2 -> _) - .tap(p => putStrLn(s"Responding to ${p._1}")) - ) - ) - received <- JmsConsumer.consume(replyQueue).take(messages.size).collect(onlyText).runCollect - } yield assert(received.toList)(equalTo(messages))) - .provideSomeLayer[Console with Blocking](connectionLayer) + checkM(Gen.listOf(Gen.alphaNumericString).filter(_.nonEmpty)) { + messages => + (for { + _ <- ZStream + .fromIterable(messages) + .run(JmsProducer.requestSink(requestDestination, replyDestination, textMessageEncoder)) + _ <- JmsConsumer + .consumeAndReplyWith(requestDestination, + (message, session) => + textMessageEncoder(onlyText(message).toUpperCase, session).map(Some.apply)) + .fork + received <- JmsConsumer.consume(replyDestination).take(messages.size).collect(onlyText).runCollect + } yield assert(received.toList)(equalTo(messages.map(_.toUpperCase)))) + .provideSomeLayer[Console with Blocking](connectionLayer) + } } ) @@ timeout(3.minute) @@ timed @@ sequential @@ around(brokerService)(stopBroker)