Skip to content

Commit

Permalink
Upgraded up to ZIO 2.0.0-RC6
Browse files Browse the repository at this point in the history
  • Loading branch information
dobrynya committed May 6, 2022
1 parent 9ccb35c commit 8c9598c
Show file tree
Hide file tree
Showing 13 changed files with 419 additions and 455 deletions.
28 changes: 0 additions & 28 deletions .circleci/config.yml

This file was deleted.

40 changes: 19 additions & 21 deletions README.md
Expand Up @@ -14,12 +14,11 @@ zio-jms is available for Scala 2 & 3
For receiving messages it needs to create a message consumer using utility methods in JmsConsumer object

```scala
import zio.{ZIO, Has, Chunk}
import zio.blocking._
import zio.{ZIO, Chunk}
import javax.jms.{Connection, Message, JMSException}
import io.github.dobrynya.zio.jms._

val received: ZIO[Has[Connection] with Blocking, JMSException, Chunk[Message]] =
val received: ZIO[Connection, JMSException, Chunk[Message]] =
JmsConsumer.consume(Queue("test-queue"))
.take(5)
.collect(onlyText)
Expand All @@ -29,14 +28,13 @@ val received: ZIO[Has[Connection] with Blocking, JMSException, Chunk[Message]] =
You can process a stream of messages transactionally like follows

```scala
import zio.{ZIO, Has, Chunk, UIO}
import zio.blocking._
import zio.{ZIO, Chunk, UIO}
import javax.jms.{Connection, Message, JMSException}
import io.github.dobrynya.zio.jms._

def messageProcessor(message: Message): UIO[Unit] = ???

val received: ZIO[Has[Connection] with Blocking, JMSException, Unit] =
val received: ZIO[Connection, JMSException, Unit] =
JmsConsumer.consumeTx(Queue("test-queue"))
.take(5)
.tap(transactionalMessage => messageProcessor(transactionalMessage.message) <* transactionalMessage.commit)
Expand All @@ -47,32 +45,31 @@ Another ability to process input messages without using ZIO streams is more conc
successful processing

```scala
import zio.{ZIO, Has}
import zio.blocking._
import zio.console._
import javax.jms.{Connection, Message, JMSException}
import zio._

import javax.jms.{Connection, JMSException, Message}
import io.github.dobrynya.zio.jms._
import zio.Console.printLine

def someMessageProcessor(message: Message): ZIO[Console, Exception, Unit] =
putStrLn(s"Received message $message")
printLine(s"Received message $message")

val processing: ZIO[Console with Blocking with Has[Connection], Exception, Unit] =
val processing: ZIO[Console with Connection, Exception, Unit] =
JmsConsumer.consumeWith(Topic("test-topic"), someMessageProcessor)
```

In case of possible failures during processing a message I recommend using a transactional consumer which commits a message
when it is processed successfully and rolls back when it fails

```scala
import zio.{ZIO, IO, Has}
import zio.blocking._
import zio.{ZIO, IO}
import javax.jms.{Connection, Message, JMSException}
import io.github.dobrynya.zio.jms._

def someMessageProcessor(message: Message): IO[String, Unit] =
IO.fail(s"Error occurred during processing a message $message")

val processing: ZIO[Blocking with Has[Connection], Any, Unit] =
val processing: ZIO[Connection, Any, Unit] =
JmsConsumer.consumeTxWith(Topic("test-topic"), someMessageProcessor)
```

Expand All @@ -94,18 +91,19 @@ ZStream.fromIterable(messages).run(JmsProducer.sink(Queue("test-queue"), textMes
The last thing is to provide a connection like follows

```scala
import zio.{ZLayer, Has}
import zio.blocking._
import zio._
import javax.jms.{Connection, ConnectionFactory, JMSException}
import io.github.dobrynya.zio.jms._

def connectionFactory: ConnectionFactory = ???

val connectionLayer: ZLayer[Blocking, JMSException, Blocking with Has[Connection]] =
ZLayer.fromManaged(connection(connectionFactory)).passthrough
val connectionLayer: Layer[JMSException, Connection] =
ZLayer.fromManaged(connection(connectionFactory))

val consuming = JmsConsumer.consume(Queue("test-queue")).runDrain
.provideSomeLayer(connectionLayer)
val consuming = JmsConsumer
.consume(Queue("test-queue"))
.runDrain
.provideSomeLayer(connectionLayer)
```

# Request - Reply integration pattern
Expand Down
15 changes: 8 additions & 7 deletions build.sbt
Expand Up @@ -15,18 +15,19 @@ crossPaths := true
publishMavenStyle := true
publishTo := Some("releases" at "https://s01.oss.sonatype.org/service/local/staging/deploy/maven2")

scalaVersion := "3.1.0"
crossScalaVersions := List("2.13.7", "3.1.0")
scalaVersion := "3.1.2"
crossScalaVersions := List("2.13.7", "3.1.2")

libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "1.0.12",
"dev.zio" %% "zio-streams" % "1.0.12",
"dev.zio" %% "zio-test" % "1.0.12" % Test,
"dev.zio" %% "zio-test-sbt" % "1.0.12" % Test,
"dev.zio" %% "zio" % "2.0.0-RC6",
"dev.zio" %% "zio-streams" % "2.0.0-RC6",
"dev.zio" %% "zio-test" % "2.0.0-RC6" % Test,
"dev.zio" %% "zio-test-sbt" % "2.0.0-RC6" % Test,
"org.apache.geronimo.specs" % "geronimo-jms_1.1_spec" % "1.1.1" % Provided,
"org.apache.activemq" % "activemq-broker" % "5.16.2" % Test,
"org.apache.activemq" % "activemq-kahadb-store" % "5.16.2" % Test,
"ch.qos.logback" % "logback-classic" % "1.2.6" % Test
"ch.qos.logback" % "logback-classic" % "1.2.6" % Test,
"io.github.sullis" %% "jms-testkit" % "1.0.4" % Test
)

testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version = 1.5.5
sbt.version = 1.6.2
129 changes: 74 additions & 55 deletions src/main/scala/io/github/dobrynya/zio/jms/JmsConsumer.scala
Expand Up @@ -2,95 +2,109 @@ package io.github.dobrynya.zio.jms

import javax.jms.{ JMSException, Message, MessageConsumer, Session, Connection => JMSConnection }
import zio._
import zio.blocking._
import zio.stream.ZStream
import zio.stream._

class JmsConsumer[T](session: Session, consumer: MessageConsumer, semaphore: Semaphore) {
class JmsConsumer[T] private[jms] (session: Session,
consumer: MessageConsumer,
semaphore: Semaphore,
timeout: Option[Long] = None) {

/**
* Consumes the specified destination and emits received message with this consumer to provide helpful operations.
* @return a stream of received messages and a session to commit/rollback messages when working transactionally
*/
def consume(enrich: (Message, JmsConsumer[T]) => T): ZStream[Blocking, JMSException, T] =
ZStream.repeatEffect(semaphore.withPermit(
effectBlockingInterrupt(enrich(consumer.receive(), this)).refineToOrDie
))
def consume(enrich: (Message, JmsConsumer[T]) => T): Stream[JMSException, T] =
ZStream
.repeatZIO(
semaphore.withPermit(
ZIO.attemptBlocking(timeout.map(consumer.receive).getOrElse(consumer.receive()))
.tapError(th => ZIO.logWarning(s"An error occurred during receiving a message: ${th.getMessage}!"))
.refineToOrDie
)
)
.filter(_ != null)
.map(enrich(_, this))

def commitSession: ZIO[Blocking, JMSException, Unit] = semaphore.withPermit(commit(session))
def commitSession: IO[JMSException, Unit] = semaphore.withPermit(commit(session))

def rollbackSession: ZIO[Blocking, JMSException, Unit] = semaphore.withPermit(rollback(session))
def rollbackSession: IO[JMSException, Unit] = semaphore.withPermit(rollback(session))
}

class TxMessage(val message: Message, consumer: JmsConsumer[TxMessage]) {
def commit: ZIO[Blocking, JMSException, Unit] = consumer.commitSession
def rollback: ZIO[Blocking, JMSException, Unit] = consumer.rollbackSession
def commit: IO[JMSException, Unit] = consumer.commitSession
def rollback: IO[JMSException, Unit] = consumer.rollbackSession
}

object JmsConsumer {

def consume(destination: DestinationFactory,
acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE): ZStream[BlockingConnection, JMSException, Message] =
ZStream
.managed(make[Message](destination, transacted = false, acknowledgementMode))
.flatMap(_.consume((m, _) => m))

def make[A](destination: DestinationFactory,
transacted: Boolean,
acknowledgementMode: Int): ZManaged[BlockingConnection, JMSException, JmsConsumer[A]] =
acknowledgementMode: Int,
timeout: Option[Long] = None): ZIO[Scope & JMSConnection, JMSException, JmsConsumer[A]] =
for {
connection <- ZIO.service[JMSConnection].toManaged_
connection <- ZIO.service[JMSConnection]
session <- session(connection, transacted, acknowledgementMode)
d <- destination(session).toManaged_
d <- destination(session)
mc <- consumer(session, d)
semaphore <- Semaphore.make(1).toManaged_
} yield new JmsConsumer(session, mc, semaphore)
semaphore <- Semaphore.make(1)
} yield new JmsConsumer(session, mc, semaphore, timeout)

def consume(
destination: DestinationFactory,
acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE,
timeout: Option[Long] = Some(300)
): ZStream[JMSConnection, JMSException, Message] =
ZStream
.scoped[JMSConnection](make[Message](destination, transacted = false, acknowledgementMode, timeout))
.flatMap(_.consume((m, _) => m))

def consumeTx(destination: DestinationFactory): ZStream[BlockingConnection, JMSException, TxMessage] =
def consumeTx(destination: DestinationFactory, timeout: Option[Long] = Some(300)): ZStream[JMSConnection, JMSException, TxMessage] =
ZStream
.managed(make[TxMessage](destination, transacted = true, Session.SESSION_TRANSACTED))
.scoped[JMSConnection](make[TxMessage](destination, transacted = true, Session.SESSION_TRANSACTED, timeout))
.flatMap(_.consume(new TxMessage(_, _)))

def consumeAndReplyWith[R, E >: JMSException](
destination: DestinationFactory,
responder: (Message, Session) => ZIO[R, E, Option[Message]],
transacted: Boolean = false,
acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE
): ZIO[R with Blocking with Has[JMSConnection], Any, Unit] =
createPipeline(destination, responder, transacted, acknowledgementMode)
acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE,
timeout: Option[Long] = Some(300)
): ZIO[R & JMSConnection, Any, Unit] =
createPipeline(destination, responder, transacted, acknowledgementMode, timeout)

private[jms] def createPipeline[R, E](
destination: DestinationFactory,
responder: (Message, Session) => ZIO[R, E, Option[Message]],
transacted: Boolean,
acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE
): ZIO[R with BlockingConnection, Any, Unit] = {
acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE,
timeout: Option[Long] = Some(300)
): ZIO[R & JMSConnection, Any, Unit] = {
val consumerAndProducer = for {
connection <- ZIO.service[JMSConnection].toManaged_
connection <- ZIO.service[JMSConnection]
session <- session(connection, transacted, acknowledgementMode)
d <- destination(session).toManaged_
d <- destination(session)
mc <- consumer(session, d)
mp <- producer(session)
semaphore <- Semaphore.make(1).toManaged_
semaphore <- Semaphore.make(1)
} yield (session, mc, mp, semaphore)

ZStream
.managed(consumerAndProducer)
.scoped(consumerAndProducer)
.flatMap {
case (session, mc, mp, semaphore) =>
new JmsConsumer[Message](session, mc, semaphore)
new JmsConsumer[Message](session, mc, semaphore, timeout)
.consume((m, _) => m)
.mapM { request =>
.mapZIO { request =>
for {
response <- responder(request, session)
_ <- response
.filter(_ => request.getJMSReplyTo != null)
.map(
response =>
Task {
response.setJMSCorrelationID(request.getJMSCorrelationID)
mp.send(request.getJMSReplyTo, response)
}.tapError(_ => rollback(session).when(transacted))
)
.map { response =>
ZIO.attemptBlocking {
response.setJMSCorrelationID(request.getJMSCorrelationID)
mp.send(request.getJMSReplyTo, response)
}.tapError(_ => rollback(session).when(transacted))
}
.getOrElse(ZIO.unit)
_ <- acknowledge(request).unless(transacted) *> commit(session).when(transacted)
} yield ()
Expand All @@ -102,10 +116,13 @@ object JmsConsumer {
def consumeWith[R, E >: JMSException](
destination: DestinationFactory,
processor: Message => ZIO[R, E, Any],
acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE
): ZIO[R with BlockingConnection, E, Unit] =
make[Message](destination, transacted = false, acknowledgementMode)
.use(_.consume((m, _) => m).foreach(m => processor(m) *> acknowledge(m)).unit)
acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE,
timeout: Option[Long] = Some(300)
): ZIO[R with JMSConnection, E, Unit] =
ZStream
.scoped[R with JMSConnection](make[Message](destination, transacted = false, acknowledgementMode, timeout))
.flatMap(_.consume((m, _) => m))
.foreach(m => processor(m) *> acknowledge(m))

/**
* Consumes the specified destination and provides the processor with received message.
Expand All @@ -116,14 +133,16 @@ object JmsConsumer {
* @tparam E specifies possible error type
* @return unit
*/
def consumeTxWith[R, E >: JMSException](
destination: DestinationFactory,
processor: Message => ZIO[R, E, Any],
): ZIO[R with BlockingConnection, E, Unit] =
make[TxMessage](destination, transacted = true, Session.SESSION_TRANSACTED)
.use(
_.consume(new TxMessage(_, _)).foreach { tm =>
processor(tm.message).tapBoth(_ => tm.rollback, _ => tm.commit)
}.unit
def consumeTxWith[R, E >: JMSException](destination: DestinationFactory,
processor: Message => ZIO[R, E, Any],
timeout: Option[Long] = Some(300)): ZIO[R & JMSConnection, E, Unit] =
ZStream
.scoped[R with JMSConnection](
make[TxMessage](destination, transacted = true, Session.SESSION_TRANSACTED, timeout)
)
.flatMap(_.consume(new TxMessage(_, _)))
.foreach { tm =>
processor(tm.message).tapBoth(e => ZIO.logDebug(s"Rolling back ${tm.message} because of $e!") *> tm.rollback,
_ => ZIO.logDebug("Committing a message") *> tm.commit)
}
}

0 comments on commit 8c9598c

Please sign in to comment.