Skip to content

Commit

Permalink
Added an ability to implement Request - Reply integration pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitry Dobrynin committed Jun 28, 2020
1 parent f200598 commit b1055ed
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 59 deletions.
53 changes: 47 additions & 6 deletions README.md
Expand Up @@ -5,6 +5,8 @@ ZIO-JMS adapts JMS API to ZIO streams and makes it working more conveniently and

---

# Receive messages

For receiving messages it needs to create a message consumer using utility methods in JmsConsumer object

```scala
Expand Down Expand Up @@ -37,7 +39,8 @@ val received: ZIO[Has[Connection] with Blocking, JMSException, Unit] =
.runDrain
```

Another ability to process input messages without using ZIO streams is more concise
Another ability to process input messages without using ZIO streams is more concise. A message is being acknowledged after
successful processing

```scala
import zio.{ZIO, Has}
Expand All @@ -49,7 +52,7 @@ import com.gh.dobrynya.zio.jms._
def someMessageProcessor(message: Message): ZIO[Console, Exception, Unit] =
putStrLn(s"Received message $message")

val processing: ZIO[Console with Blocking with Has[Connection]] =
val processing: ZIO[Console with Blocking with Has[Connection], Exception, Unit] =
JmsConsumer.consumeWith(Topic("test-topic"), someMessageProcessor)
```

Expand All @@ -69,6 +72,8 @@ val processing: ZIO[Blocking with Has[Connection], Any, Unit] =
JmsConsumer.consumeTxWith(Topic("test-topic"), someMessageProcessor)
```

# Send messages

For sending messages it needs to create sinks providing a destination and a message encoder as follows

```scala
Expand All @@ -80,16 +85,52 @@ val messages = (1 to 100).map(i => s"Message $i")
ZStream.fromIterable(messages).run(JmsProducer.sink(Queue("test-queue"), textMessageEncoder))
```

# Connection

The last thing is to provide a connection like follows

```scala
import zio.{ZLayer, Has}
import zio.blocking._
import javax.jms.{ConnectionFactory, Connection, JMSException}
import com.gh.dobrynya.zio.jms.connection
import javax.jms.{Connection, ConnectionFactory, JMSException}
import com.gh.dobrynya.zio.jms._

def connectionFactory: ConnectionFactory = ???

val connection: ZLayer[Blocking, JMSException, Blocking with Has[Connection]] =
val connectionLayer: ZLayer[Blocking, JMSException, Blocking with Has[Connection]] =
ZLayer.fromManaged(connection(connectionFactory)).passthrough
```

val consuming = JmsConsumer.consume(Queue("test-queue")).runDrain
.provideSomeLayer(connectionLayer)
```

# Request - Reply integration pattern

## From a client's perspective

```scala
import com.gh.dobrynya.zio.jms._
import zio.stream._

val request = Queue("request")
val response = Queue("response")

val messages: List[String] = ???

ZStream.fromIterable(messages)
.run(JmsProducer.requestSink(request, response, textMessageEncoder))
```

## From a server's perspective

```scala
import com.gh.dobrynya.zio.jms._
import zio.stream._

val request = Queue("request")
JmsConsumer.consumeAndReplyWith(request,
(message, session) => textMessageEncoder(onlyText(message).toUpperCase, session).map(Some.apply))
```

Here is a responder handling input messages and optionally responding to them to a destination specified in
`JMSReplyTo` header. It automatically copy `JMSCorellationID` header before sending.
49 changes: 41 additions & 8 deletions src/main/scala/com/gh/dobrynya/zio/jms/JmsConsumer.scala
@@ -1,6 +1,6 @@
package com.gh.dobrynya.zio.jms

import javax.jms.{JMSException, Message, MessageConsumer, Session, Connection => JMSConnection}
import javax.jms.{ JMSException, Message, MessageConsumer, Session, Connection => JMSConnection }
import zio._
import zio.blocking._
import zio.stream.ZStream
Expand Down Expand Up @@ -48,13 +48,49 @@ object JmsConsumer {
.managed(make[TransactionalMessage](destination, transacted = true, Session.SESSION_TRANSACTED))
.flatMap(_.consume(new TransactionalMessage(_, _)))

def consumeAndReplyWith[R, E >: JMSException](
destination: DestinationFactory,
responder: (Message, Session) => ZIO[R, E, Option[Message]],
acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE
): ZIO[R with Blocking with Has[JMSConnection], E, Unit] = {
val consumerAndProducer = for {
connection <- ZIO.service[JMSConnection].toManaged_
session <- session(connection, transacted = false, acknowledgementMode)
mc <- consumer(session, destination(session))
mp <- producer(session)
} yield (session, mc, mp)

ZStream
.managed(consumerAndProducer)
.flatMap {
case (session, mc, mp) =>
new JmsConsumer[Message](session, mc)
.consume((m, _) => m)
.mapM(message => responder(message, session).map(_.map((message, _))))
.collect {
case Some(requestResponse) if requestResponse._1.getJMSReplyTo != null =>
requestResponse
}
.mapM {
case (request, response) =>
Task {
response.setJMSCorrelationID(request.getJMSCorrelationID)
mp.send(request.getJMSReplyTo, response)
request.acknowledge()
println(s"Sent $response")
}.refineToOrDie
}
}
.runDrain
}

def consumeWith[R, E >: JMSException](
destination: DestinationFactory,
processor: Message => ZIO[R, E, Any],
acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE
): ZIO[R with BlockingConnection, E, Unit] =
make[Message](destination, transacted = false, acknowledgementMode)
.use(_.consume((m, _) => m).foreach(processor).unit)
.use(_.consume((m, _) => m).foreach(m => processor(m) *> acknowledge(m)).unit)

/**
* Consumes the specified destination and provides the processor with received message.
Expand All @@ -71,11 +107,8 @@ object JmsConsumer {
): ZIO[R with BlockingConnection, E, Unit] =
make[TransactionalMessage](destination, transacted = true, Session.AUTO_ACKNOWLEDGE)
.use(
_.consume(new TransactionalMessage(_, _))
.foreach { tm =>
processor(tm.message)
.tapBoth(_ => tm.rollback, _ => tm.commit)
}
.unit
_.consume(new TransactionalMessage(_, _)).foreach { tm =>
processor(tm.message).tapBoth(_ => tm.rollback, _ => tm.commit)
}.unit
)
}
43 changes: 42 additions & 1 deletion src/main/scala/com/gh/dobrynya/zio/jms/JmsProducer.scala
Expand Up @@ -11,6 +11,7 @@ class JmsProducer[R, E >: JMSException, A](session: Session, sender: A => ZIO[R,
}

object JmsProducer {

def sink[R, E >: JMSException, A](
destination: DestinationFactory,
encoder: (A, Session) => ZIO[R, E, Message],
Expand All @@ -32,7 +33,7 @@ object JmsProducer {
for {
connection <- ZIO.service[JMSConnection].toManaged_
session <- session(connection, transacted, acknowledgementMode)
d <- Task(destination(session)).refineToOrDie.toManaged_
d = destination(session)
mp <- producer(session)
} yield
new JmsProducer[R, E, A](session,
Expand Down Expand Up @@ -63,4 +64,44 @@ object JmsProducer {
) { jmsProducer =>
ZSink.foreach(jmsProducer.produce)
}

/**
* Creates a sink for implementing Request - Reply integration pattern.
* It sends messages to the specified destination and enriches JMSReplyTo header with provided response destination.
* You need to specify JMSCorrelationID manually if required by provided encoder.
* @param destination specifies destination
* @param replyTo specifies response destination
* @param encoder converts a message to an appropriate JMS message
* @param transacted specifies session transactionality
* @param acknowledgementMode specifies session acknowledgement mode
* @tparam R specifies dependencies
* @tparam E specifies possible errors
* @tparam A message type
* @return a new sink
*/
def requestSink[R, E >: JMSException, A](
destination: DestinationFactory,
replyTo: DestinationFactory,
encoder: (A, Session) => ZIO[R, E, Message],
transacted: Boolean = false,
acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE
): ZSink[R with BlockingConnection, E, A, A, Unit] =
ZSink.managed[R with BlockingConnection, E, A, JmsProducer[R, E, A], A, Unit](
for {
connection <- ZIO.service[JMSConnection].toManaged_
session <- session(connection, transacted, acknowledgementMode)
mp <- producer(session)
d = destination(session)
replyHeader = replyTo(session)
} yield
new JmsProducer[R, E, A](session,
message =>
encoder(message, session).map { encoded =>
encoded.setJMSReplyTo(replyHeader)
mp.send(d, encoded)
encoded
})
) { jmsProducer =>
ZSink.foreach(jmsProducer.produce)
}
}
63 changes: 19 additions & 44 deletions src/test/scala/com/gh/dobrynya/zio/jms/JmsComponentsSpec.scala
Expand Up @@ -94,8 +94,8 @@ object JmsComponentsSpec extends DefaultRunnableSpec with ConnectionAware {
_ <- send(messages, dest)
collector <- ZQueue.unbounded[String]
_ <- JmsConsumer
.consumeTxWith(dest, message => ZIO.fail(s"Some processing error: ${onlyText(message)}!"))
.ignore
.consumeTxWith(dest, message => ZIO.fail(s"Some processing error: ${onlyText(message)}!"))
.ignore
_ <- JmsConsumer
.consumeTxWith(dest, message => collector.offer(onlyText(message)))
.fork
Expand All @@ -105,49 +105,24 @@ object JmsComponentsSpec extends DefaultRunnableSpec with ConnectionAware {
}
},
testM("Client requires a response to be sent to a dedicated queue via JMSReplyTo header") {
val source = Queue("JmsComponentsSpec-6")
val replyQueue = Queue("JmsComponentsSpec-7")
val requestDestination = Queue("JmsComponentsSpec-6")
val replyDestination = Queue("JmsComponentsSpec-7")

val messages = List("1", "2", "3")

val produceWithReplyTo = (for {
jmsProducer <- JmsProducer.make(
source,
(message: String, session: Session) =>
Task {
val replyTo = replyQueue(session)
val msg = session.createTextMessage(message)
msg.setJMSReplyTo(replyTo)
msg.setJMSCorrelationID(message)
msg
}.refineToOrDie
)
} yield jmsProducer).use(
p =>
ZStream
.fromIterable(messages)
.foreach(p.produce)
)

(for {
_ <- produceWithReplyTo
_ <- JmsConsumer
.consume(source)
.tap(s => putStrLn(s"Received request $s"))
.take(messages.size)
.map(m => onlyText(m) -> m.getJMSReplyTo)
.run(
JmsProducer.routerSink(
(message, session) =>
textMessageEncoder(message._1.toUpperCase, session)
.tap(m => UIO(m.setJMSCorrelationID(message._1)))
.map(message._2 -> _)
.tap(p => putStrLn(s"Responding to ${p._1}"))
)
)
received <- JmsConsumer.consume(replyQueue).take(messages.size).collect(onlyText).runCollect
} yield assert(received.toList)(equalTo(messages)))
.provideSomeLayer[Console with Blocking](connectionLayer)
checkM(Gen.listOf(Gen.alphaNumericString).filter(_.nonEmpty)) {
messages =>
(for {
_ <- ZStream
.fromIterable(messages)
.run(JmsProducer.requestSink(requestDestination, replyDestination, textMessageEncoder))
_ <- JmsConsumer
.consumeAndReplyWith(requestDestination,
(message, session) =>
textMessageEncoder(onlyText(message).toUpperCase, session).map(Some.apply))
.fork
received <- JmsConsumer.consume(replyDestination).take(messages.size).collect(onlyText).runCollect
} yield assert(received.toList)(equalTo(messages.map(_.toUpperCase))))
.provideSomeLayer[Console with Blocking](connectionLayer)
}
}
) @@ timeout(3.minute) @@ timed @@ sequential @@ around(brokerService)(stopBroker)

Expand Down

0 comments on commit b1055ed

Please sign in to comment.