From 8c9598c78fcf7405eadb6ac4c16fc268eaa6386b Mon Sep 17 00:00:00 2001 From: Dmitry Dobrynin Date: Sat, 18 Dec 2021 12:21:13 +0300 Subject: [PATCH] Upgraded up to ZIO 2.0.0-RC6 --- .circleci/config.yml | 28 --- README.md | 40 ++- build.sbt | 15 +- project/build.properties | 2 +- .../github/dobrynya/zio/jms/JmsConsumer.scala | 129 +++++----- .../github/dobrynya/zio/jms/JmsProducer.scala | 108 +++++---- .../dobrynya/zio/jms/destinations.scala | 10 +- .../io/github/dobrynya/zio/jms/package.scala | 65 ++--- .../dobrynya/zio/jms/ConnectionAware.scala | 26 -- .../dobrynya/zio/jms/JmsComponentsSpec.scala | 229 ------------------ .../dobrynya/zio/jms/JmsConsumerSpec.scala | 160 ++++++++++++ .../dobrynya/zio/jms/JmsProducerSpec.scala | 43 ++++ .../dobrynya/zio/jms/JmsTestKitAware.scala | 19 ++ 13 files changed, 419 insertions(+), 455 deletions(-) delete mode 100644 .circleci/config.yml delete mode 100644 src/test/scala/io/github/dobrynya/zio/jms/ConnectionAware.scala delete mode 100644 src/test/scala/io/github/dobrynya/zio/jms/JmsComponentsSpec.scala create mode 100644 src/test/scala/io/github/dobrynya/zio/jms/JmsConsumerSpec.scala create mode 100644 src/test/scala/io/github/dobrynya/zio/jms/JmsProducerSpec.scala create mode 100644 src/test/scala/io/github/dobrynya/zio/jms/JmsTestKitAware.scala diff --git a/.circleci/config.yml b/.circleci/config.yml deleted file mode 100644 index 579c4b3..0000000 --- a/.circleci/config.yml +++ /dev/null @@ -1,28 +0,0 @@ -version: 2 -jobs: - build: - docker: - - image: circleci/openjdk:8-jdk - - working_directory: ~/repo - - environment: - JVM_OPTS: -Xmx5120m - TERM: dumb - - steps: - - checkout - - - restore_cache: - keys: - - v1-dependencies-{{ checksum "build.sbt" }} - - v1-dependencies- - - - run: cat /dev/null | sbt test:compile - - - save_cache: - paths: - - ~/.m2 - key: v1-dependencies--{{ checksum "build.sbt" }} - - - run: cat /dev/null | sbt test:test \ No newline at end of file diff --git a/README.md b/README.md index ae14afe..670b385 100644 --- a/README.md +++ b/README.md @@ -14,12 +14,11 @@ zio-jms is available for Scala 2 & 3 For receiving messages it needs to create a message consumer using utility methods in JmsConsumer object ```scala -import zio.{ZIO, Has, Chunk} -import zio.blocking._ +import zio.{ZIO, Chunk} import javax.jms.{Connection, Message, JMSException} import io.github.dobrynya.zio.jms._ -val received: ZIO[Has[Connection] with Blocking, JMSException, Chunk[Message]] = +val received: ZIO[Connection, JMSException, Chunk[Message]] = JmsConsumer.consume(Queue("test-queue")) .take(5) .collect(onlyText) @@ -29,14 +28,13 @@ val received: ZIO[Has[Connection] with Blocking, JMSException, Chunk[Message]] = You can process a stream of messages transactionally like follows ```scala -import zio.{ZIO, Has, Chunk, UIO} -import zio.blocking._ +import zio.{ZIO, Chunk, UIO} import javax.jms.{Connection, Message, JMSException} import io.github.dobrynya.zio.jms._ def messageProcessor(message: Message): UIO[Unit] = ??? -val received: ZIO[Has[Connection] with Blocking, JMSException, Unit] = +val received: ZIO[Connection, JMSException, Unit] = JmsConsumer.consumeTx(Queue("test-queue")) .take(5) .tap(transactionalMessage => messageProcessor(transactionalMessage.message) <* transactionalMessage.commit) @@ -47,16 +45,16 @@ Another ability to process input messages without using ZIO streams is more conc successful processing ```scala -import zio.{ZIO, Has} -import zio.blocking._ -import zio.console._ -import javax.jms.{Connection, Message, JMSException} +import zio._ + +import javax.jms.{Connection, JMSException, Message} import io.github.dobrynya.zio.jms._ +import zio.Console.printLine def someMessageProcessor(message: Message): ZIO[Console, Exception, Unit] = - putStrLn(s"Received message $message") + printLine(s"Received message $message") -val processing: ZIO[Console with Blocking with Has[Connection], Exception, Unit] = +val processing: ZIO[Console with Connection, Exception, Unit] = JmsConsumer.consumeWith(Topic("test-topic"), someMessageProcessor) ``` @@ -64,15 +62,14 @@ In case of possible failures during processing a message I recommend using a tra when it is processed successfully and rolls back when it fails ```scala -import zio.{ZIO, IO, Has} -import zio.blocking._ +import zio.{ZIO, IO} import javax.jms.{Connection, Message, JMSException} import io.github.dobrynya.zio.jms._ def someMessageProcessor(message: Message): IO[String, Unit] = IO.fail(s"Error occurred during processing a message $message") -val processing: ZIO[Blocking with Has[Connection], Any, Unit] = +val processing: ZIO[Connection, Any, Unit] = JmsConsumer.consumeTxWith(Topic("test-topic"), someMessageProcessor) ``` @@ -94,18 +91,19 @@ ZStream.fromIterable(messages).run(JmsProducer.sink(Queue("test-queue"), textMes The last thing is to provide a connection like follows ```scala -import zio.{ZLayer, Has} -import zio.blocking._ +import zio._ import javax.jms.{Connection, ConnectionFactory, JMSException} import io.github.dobrynya.zio.jms._ def connectionFactory: ConnectionFactory = ??? -val connectionLayer: ZLayer[Blocking, JMSException, Blocking with Has[Connection]] = - ZLayer.fromManaged(connection(connectionFactory)).passthrough +val connectionLayer: Layer[JMSException, Connection] = + ZLayer.fromManaged(connection(connectionFactory)) -val consuming = JmsConsumer.consume(Queue("test-queue")).runDrain - .provideSomeLayer(connectionLayer) +val consuming = JmsConsumer + .consume(Queue("test-queue")) + .runDrain + .provideSomeLayer(connectionLayer) ``` # Request - Reply integration pattern diff --git a/build.sbt b/build.sbt index 209ba8a..b4948cb 100644 --- a/build.sbt +++ b/build.sbt @@ -15,18 +15,19 @@ crossPaths := true publishMavenStyle := true publishTo := Some("releases" at "https://s01.oss.sonatype.org/service/local/staging/deploy/maven2") -scalaVersion := "3.1.0" -crossScalaVersions := List("2.13.7", "3.1.0") +scalaVersion := "3.1.2" +crossScalaVersions := List("2.13.7", "3.1.2") libraryDependencies ++= Seq( - "dev.zio" %% "zio" % "1.0.12", - "dev.zio" %% "zio-streams" % "1.0.12", - "dev.zio" %% "zio-test" % "1.0.12" % Test, - "dev.zio" %% "zio-test-sbt" % "1.0.12" % Test, + "dev.zio" %% "zio" % "2.0.0-RC6", + "dev.zio" %% "zio-streams" % "2.0.0-RC6", + "dev.zio" %% "zio-test" % "2.0.0-RC6" % Test, + "dev.zio" %% "zio-test-sbt" % "2.0.0-RC6" % Test, "org.apache.geronimo.specs" % "geronimo-jms_1.1_spec" % "1.1.1" % Provided, "org.apache.activemq" % "activemq-broker" % "5.16.2" % Test, "org.apache.activemq" % "activemq-kahadb-store" % "5.16.2" % Test, - "ch.qos.logback" % "logback-classic" % "1.2.6" % Test + "ch.qos.logback" % "logback-classic" % "1.2.6" % Test, + "io.github.sullis" %% "jms-testkit" % "1.0.4" % Test ) testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework") diff --git a/project/build.properties b/project/build.properties index 215ddd2..b46cfa1 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version = 1.5.5 \ No newline at end of file +sbt.version = 1.6.2 \ No newline at end of file diff --git a/src/main/scala/io/github/dobrynya/zio/jms/JmsConsumer.scala b/src/main/scala/io/github/dobrynya/zio/jms/JmsConsumer.scala index 7efff4c..07a1f60 100644 --- a/src/main/scala/io/github/dobrynya/zio/jms/JmsConsumer.scala +++ b/src/main/scala/io/github/dobrynya/zio/jms/JmsConsumer.scala @@ -2,95 +2,109 @@ package io.github.dobrynya.zio.jms import javax.jms.{ JMSException, Message, MessageConsumer, Session, Connection => JMSConnection } import zio._ -import zio.blocking._ -import zio.stream.ZStream +import zio.stream._ -class JmsConsumer[T](session: Session, consumer: MessageConsumer, semaphore: Semaphore) { +class JmsConsumer[T] private[jms] (session: Session, + consumer: MessageConsumer, + semaphore: Semaphore, + timeout: Option[Long] = None) { /** * Consumes the specified destination and emits received message with this consumer to provide helpful operations. * @return a stream of received messages and a session to commit/rollback messages when working transactionally */ - def consume(enrich: (Message, JmsConsumer[T]) => T): ZStream[Blocking, JMSException, T] = - ZStream.repeatEffect(semaphore.withPermit( - effectBlockingInterrupt(enrich(consumer.receive(), this)).refineToOrDie - )) + def consume(enrich: (Message, JmsConsumer[T]) => T): Stream[JMSException, T] = + ZStream + .repeatZIO( + semaphore.withPermit( + ZIO.attemptBlocking(timeout.map(consumer.receive).getOrElse(consumer.receive())) + .tapError(th => ZIO.logWarning(s"An error occurred during receiving a message: ${th.getMessage}!")) + .refineToOrDie + ) + ) + .filter(_ != null) + .map(enrich(_, this)) - def commitSession: ZIO[Blocking, JMSException, Unit] = semaphore.withPermit(commit(session)) + def commitSession: IO[JMSException, Unit] = semaphore.withPermit(commit(session)) - def rollbackSession: ZIO[Blocking, JMSException, Unit] = semaphore.withPermit(rollback(session)) + def rollbackSession: IO[JMSException, Unit] = semaphore.withPermit(rollback(session)) } class TxMessage(val message: Message, consumer: JmsConsumer[TxMessage]) { - def commit: ZIO[Blocking, JMSException, Unit] = consumer.commitSession - def rollback: ZIO[Blocking, JMSException, Unit] = consumer.rollbackSession + def commit: IO[JMSException, Unit] = consumer.commitSession + def rollback: IO[JMSException, Unit] = consumer.rollbackSession } object JmsConsumer { - def consume(destination: DestinationFactory, - acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE): ZStream[BlockingConnection, JMSException, Message] = - ZStream - .managed(make[Message](destination, transacted = false, acknowledgementMode)) - .flatMap(_.consume((m, _) => m)) - def make[A](destination: DestinationFactory, transacted: Boolean, - acknowledgementMode: Int): ZManaged[BlockingConnection, JMSException, JmsConsumer[A]] = + acknowledgementMode: Int, + timeout: Option[Long] = None): ZIO[Scope & JMSConnection, JMSException, JmsConsumer[A]] = for { - connection <- ZIO.service[JMSConnection].toManaged_ + connection <- ZIO.service[JMSConnection] session <- session(connection, transacted, acknowledgementMode) - d <- destination(session).toManaged_ + d <- destination(session) mc <- consumer(session, d) - semaphore <- Semaphore.make(1).toManaged_ - } yield new JmsConsumer(session, mc, semaphore) + semaphore <- Semaphore.make(1) + } yield new JmsConsumer(session, mc, semaphore, timeout) + + def consume( + destination: DestinationFactory, + acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE, + timeout: Option[Long] = Some(300) + ): ZStream[JMSConnection, JMSException, Message] = + ZStream + .scoped[JMSConnection](make[Message](destination, transacted = false, acknowledgementMode, timeout)) + .flatMap(_.consume((m, _) => m)) - def consumeTx(destination: DestinationFactory): ZStream[BlockingConnection, JMSException, TxMessage] = + def consumeTx(destination: DestinationFactory, timeout: Option[Long] = Some(300)): ZStream[JMSConnection, JMSException, TxMessage] = ZStream - .managed(make[TxMessage](destination, transacted = true, Session.SESSION_TRANSACTED)) + .scoped[JMSConnection](make[TxMessage](destination, transacted = true, Session.SESSION_TRANSACTED, timeout)) .flatMap(_.consume(new TxMessage(_, _))) def consumeAndReplyWith[R, E >: JMSException]( destination: DestinationFactory, responder: (Message, Session) => ZIO[R, E, Option[Message]], transacted: Boolean = false, - acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE - ): ZIO[R with Blocking with Has[JMSConnection], Any, Unit] = - createPipeline(destination, responder, transacted, acknowledgementMode) + acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE, + timeout: Option[Long] = Some(300) + ): ZIO[R & JMSConnection, Any, Unit] = + createPipeline(destination, responder, transacted, acknowledgementMode, timeout) private[jms] def createPipeline[R, E]( destination: DestinationFactory, responder: (Message, Session) => ZIO[R, E, Option[Message]], transacted: Boolean, - acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE - ): ZIO[R with BlockingConnection, Any, Unit] = { + acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE, + timeout: Option[Long] = Some(300) + ): ZIO[R & JMSConnection, Any, Unit] = { val consumerAndProducer = for { - connection <- ZIO.service[JMSConnection].toManaged_ + connection <- ZIO.service[JMSConnection] session <- session(connection, transacted, acknowledgementMode) - d <- destination(session).toManaged_ + d <- destination(session) mc <- consumer(session, d) mp <- producer(session) - semaphore <- Semaphore.make(1).toManaged_ + semaphore <- Semaphore.make(1) } yield (session, mc, mp, semaphore) ZStream - .managed(consumerAndProducer) + .scoped(consumerAndProducer) .flatMap { case (session, mc, mp, semaphore) => - new JmsConsumer[Message](session, mc, semaphore) + new JmsConsumer[Message](session, mc, semaphore, timeout) .consume((m, _) => m) - .mapM { request => + .mapZIO { request => for { response <- responder(request, session) _ <- response .filter(_ => request.getJMSReplyTo != null) - .map( - response => - Task { - response.setJMSCorrelationID(request.getJMSCorrelationID) - mp.send(request.getJMSReplyTo, response) - }.tapError(_ => rollback(session).when(transacted)) - ) + .map { response => + ZIO.attemptBlocking { + response.setJMSCorrelationID(request.getJMSCorrelationID) + mp.send(request.getJMSReplyTo, response) + }.tapError(_ => rollback(session).when(transacted)) + } .getOrElse(ZIO.unit) _ <- acknowledge(request).unless(transacted) *> commit(session).when(transacted) } yield () @@ -102,10 +116,13 @@ object JmsConsumer { def consumeWith[R, E >: JMSException]( destination: DestinationFactory, processor: Message => ZIO[R, E, Any], - acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE - ): ZIO[R with BlockingConnection, E, Unit] = - make[Message](destination, transacted = false, acknowledgementMode) - .use(_.consume((m, _) => m).foreach(m => processor(m) *> acknowledge(m)).unit) + acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE, + timeout: Option[Long] = Some(300) + ): ZIO[R with JMSConnection, E, Unit] = + ZStream + .scoped[R with JMSConnection](make[Message](destination, transacted = false, acknowledgementMode, timeout)) + .flatMap(_.consume((m, _) => m)) + .foreach(m => processor(m) *> acknowledge(m)) /** * Consumes the specified destination and provides the processor with received message. @@ -116,14 +133,16 @@ object JmsConsumer { * @tparam E specifies possible error type * @return unit */ - def consumeTxWith[R, E >: JMSException]( - destination: DestinationFactory, - processor: Message => ZIO[R, E, Any], - ): ZIO[R with BlockingConnection, E, Unit] = - make[TxMessage](destination, transacted = true, Session.SESSION_TRANSACTED) - .use( - _.consume(new TxMessage(_, _)).foreach { tm => - processor(tm.message).tapBoth(_ => tm.rollback, _ => tm.commit) - }.unit + def consumeTxWith[R, E >: JMSException](destination: DestinationFactory, + processor: Message => ZIO[R, E, Any], + timeout: Option[Long] = Some(300)): ZIO[R & JMSConnection, E, Unit] = + ZStream + .scoped[R with JMSConnection]( + make[TxMessage](destination, transacted = true, Session.SESSION_TRANSACTED, timeout) ) + .flatMap(_.consume(new TxMessage(_, _))) + .foreach { tm => + processor(tm.message).tapBoth(e => ZIO.logDebug(s"Rolling back ${tm.message} because of $e!") *> tm.rollback, + _ => ZIO.logDebug("Committing a message") *> tm.commit) + } } diff --git a/src/main/scala/io/github/dobrynya/zio/jms/JmsProducer.scala b/src/main/scala/io/github/dobrynya/zio/jms/JmsProducer.scala index 6ece9bf..9161b47 100644 --- a/src/main/scala/io/github/dobrynya/zio/jms/JmsProducer.scala +++ b/src/main/scala/io/github/dobrynya/zio/jms/JmsProducer.scala @@ -9,10 +9,10 @@ class JmsProducer[R, E >: JMSException, A](session: Session, sender: A => ZIO[R, semaphore.withPermit(sender(message).map(message -> _)) def commit: IO[JMSException, Unit] = - semaphore.withPermit(Task(session.commit()).refineToOrDie) + semaphore.withPermit(ZIO.attemptBlocking(session.commit()).refineToOrDie) def rollback: IO[JMSException, Unit] = - semaphore.withPermit(Task(session.rollback()).refineToOrDie) + semaphore.withPermit(ZIO.attemptBlocking(session.rollback()).refineToOrDie) } object JmsProducer { @@ -33,25 +33,25 @@ object JmsProducer { encoder: (A, Session) => ZIO[R, E, Message], transacted: Boolean = false, acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE - ): ZSink[R with BlockingConnection, E, A, A, Unit] = - ZSink.managed[R with BlockingConnection, E, A, JmsProducer[R, E, A], A, Unit]( - make(destination, encoder, transacted, acknowledgementMode) - ) { jmsProducer => - ZSink.foreach(message => jmsProducer.produce(message) <* jmsProducer.commit.when(transacted)) - } + ): ZSink[R & JMSConnection & Scope, E, A, A, Unit] = + ZSink + .fromZIO(make(destination, encoder, transacted, acknowledgementMode)) + .flatMap { jmsProducer => + ZSink.foreach(message => jmsProducer.produce(message) <* jmsProducer.commit.when(transacted)) + } def make[R, E >: JMSException, A]( destination: DestinationFactory, encoder: (A, Session) => ZIO[R, E, Message], transacted: Boolean = false, acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE - ): ZManaged[R with BlockingConnection, JMSException, JmsProducer[R, E, A]] = + ): ZIO[R & JMSConnection & Scope, JMSException, JmsProducer[R, E, A]] = for { - connection <- ZIO.service[JMSConnection].toManaged_ + connection <- ZIO.service[JMSConnection] session <- session(connection, transacted, acknowledgementMode) - d <- destination(session).toManaged_ + d <- destination(session) mp <- producer(session) - semaphore <- Semaphore.make(1).toManaged_ + semaphore <- Semaphore.make(1) } yield new JmsProducer[R, E, A](session, message => @@ -65,25 +65,27 @@ object JmsProducer { encoderAndRouter: (A, Session) => ZIO[R, E, (Destination, Message)], transacted: Boolean = false, acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE - ): ZSink[R with BlockingConnection, E, A, A, Unit] = - ZSink.managed[R with BlockingConnection, E, A, JmsProducer[R, E, A], A, Unit]( - for { - connection <- ZIO.service[JMSConnection].toManaged_ - session <- session(connection, transacted, acknowledgementMode) - mp <- producer(session) - semaphore <- Semaphore.make(1).toManaged_ - } yield - new JmsProducer[R, E, A](session, - message => - encoderAndRouter(message, session).map { - case (d, encoded) => - mp.send(d, encoded) - encoded - }, - semaphore) - ) { jmsProducer => - ZSink.foreach(message => jmsProducer.produce(message) <* jmsProducer.commit.when(transacted)) - } + ): ZSink[R & JMSConnection & Scope, E, A, A, Unit] = + ZSink + .fromZIO[R & JMSConnection & Scope, E, JmsProducer[R, E, A]]( + for { + connection <- ZIO.service[JMSConnection] + session <- session(connection, transacted, acknowledgementMode) + mp <- producer(session) + semaphore <- Semaphore.make(1) + } yield + new JmsProducer[R, E, A](session, + message => + encoderAndRouter(message, session).map { + case (d, encoded) => + mp.send(d, encoded) + encoded + }, + semaphore) + ) + .flatMap { jmsProducer => + ZSink.foreach(message => jmsProducer.produce(message) <* jmsProducer.commit.when(transacted)) + } /** * Creates a sink for implementing Request - Reply integration pattern. @@ -105,25 +107,27 @@ object JmsProducer { encoder: (A, Session) => ZIO[R, E, Message], transacted: Boolean = false, acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE - ): ZSink[R with BlockingConnection, E, A, A, Unit] = - ZSink.managed[R with BlockingConnection, E, A, JmsProducer[R, E, A], A, Unit]( - for { - connection <- ZIO.service[JMSConnection].toManaged_ - session <- session(connection, transacted, acknowledgementMode) - mp <- producer(session) - semaphore <- Semaphore.make(1).toManaged_ - d <- destination(session).toManaged_ - replyHeader <- replyTo(session).toManaged_ - } yield - new JmsProducer[R, E, A](session, - message => - encoder(message, session).map { encoded => - encoded.setJMSReplyTo(replyHeader) - mp.send(d, encoded) - encoded - }, - semaphore) - ) { jmsProducer => - ZSink.foreach(message => jmsProducer.produce(message) <* jmsProducer.commit.when(transacted)) - } + ): ZSink[R & JMSConnection & Scope, E, A, A, Unit] = + ZSink + .fromZIO[R & JMSConnection & Scope, E, JmsProducer[R, E, A]]( + for { + connection <- ZIO.service[JMSConnection] + session <- session(connection, transacted, acknowledgementMode) + mp <- producer(session) + semaphore <- Semaphore.make(1) + d <- destination(session) + replyHeader <- replyTo(session) + } yield + new JmsProducer[R, E, A](session, + message => + encoder(message, session).map { encoded => + encoded.setJMSReplyTo(replyHeader) + mp.send(d, encoded) + encoded + }, + semaphore) + ) + .flatMap { jmsProducer => + ZSink.foreach(message => jmsProducer.produce(message) <* jmsProducer.commit.when(transacted)) + } } diff --git a/src/main/scala/io/github/dobrynya/zio/jms/destinations.scala b/src/main/scala/io/github/dobrynya/zio/jms/destinations.scala index 75e85a2..4e8db61 100644 --- a/src/main/scala/io/github/dobrynya/zio/jms/destinations.scala +++ b/src/main/scala/io/github/dobrynya/zio/jms/destinations.scala @@ -1,24 +1,24 @@ package io.github.dobrynya.zio.jms -import zio.{ IO, Task } +import zio.{ IO, ZIO } import javax.jms._ case class Queue(name: String) extends DestinationFactory { override def apply(session: Session): IO[JMSException, Destination] = - Task(session.createQueue(name)).refineToOrDie[JMSException] + ZIO.attempt(session.createQueue(name)).refineToOrDie[JMSException] } case class Topic(name: String) extends DestinationFactory { override def apply(session: Session): IO[JMSException, Destination] = - Task(session.createTopic(name)).refineToOrDie[JMSException] + ZIO.attempt(session.createTopic(name)).refineToOrDie[JMSException] } case object TemporaryQueue extends DestinationFactory { override def apply(session: Session): IO[JMSException, Destination] = - Task(session.createTemporaryQueue()).refineToOrDie[JMSException] + ZIO.attempt(session.createTemporaryQueue()).refineToOrDie[JMSException] } case object TemporaryTopic extends DestinationFactory { override def apply(session: Session): IO[JMSException, Destination] = - Task(session.createTemporaryQueue()).refineToOrDie[JMSException] + ZIO.attempt(session.createTemporaryQueue()).refineToOrDie[JMSException] } diff --git a/src/main/scala/io/github/dobrynya/zio/jms/package.scala b/src/main/scala/io/github/dobrynya/zio/jms/package.scala index a3685ae..125c449 100644 --- a/src/main/scala/io/github/dobrynya/zio/jms/package.scala +++ b/src/main/scala/io/github/dobrynya/zio/jms/package.scala @@ -2,54 +2,57 @@ package io.github.dobrynya.zio import javax.jms._ import zio._ -import zio.blocking._ package object jms { - type DestinationFactory = Session => IO[JMSException, Destination] - type BlockingConnection = Blocking with Has[Connection] def connection(connectionFactory: ConnectionFactory, clientId: Option[String] = None, - credentials: Option[(String, String)] = None): ZManaged[Blocking, JMSException, Connection] = - Managed.make { - effectBlockingInterrupt { - val connection = credentials - .map(creds => connectionFactory.createConnection(creds._1, creds._2)) - .getOrElse(connectionFactory.createConnection()) - clientId.foreach(connection.setClientID) - connection.start() - connection - } - }(c => Task(c.close()).ignore).refineToOrDie + credentials: Option[(String, String)] = None): ZIO[Scope, JMSException, Connection] = + ZIO.acquireRelease(ZIO.attemptBlocking { + val connection = credentials + .map(creds => connectionFactory.createConnection(creds._1, creds._2)) + .getOrElse(connectionFactory.createConnection()) + clientId.foreach(connection.setClientID) + connection.start() + connection + })(c => ZIO.succeedBlocking(c.close())).refineToOrDie + + def makeConnection(clientId: Option[String] = None, + credentials: Option[(String, String)] = None): ZLayer[ConnectionFactory, JMSException, Connection] = + ZLayer.scoped(ZIO.service[ConnectionFactory].flatMap(connection(_, clientId, credentials))) def textMessageEncoder: (String, Session) => IO[JMSException, TextMessage] = - (text: String, session: Session) => Task(session.createTextMessage(text)).refineToOrDie + (text: String, session: Session) => ZIO.attempt(session.createTextMessage(text)).refineToOrDie def onlyText: PartialFunction[Message, String] = { case text: TextMessage => text.getText } - def acknowledge(message: Message): ZIO[Blocking, JMSException, Unit] = - effectBlockingInterrupt(message.acknowledge()).refineToOrDie + def acknowledge(message: Message): IO[JMSException, Unit] = + ZIO.attemptBlocking(message.acknowledge()).refineToOrDie private[jms] def session(connection: Connection, transacted: Boolean, - acknowledgeMode: Int): ZManaged[Blocking, JMSException, Session] = - Managed - .make(effectBlockingInterrupt(connection.createSession(transacted, acknowledgeMode)))(s => Task(s.close()).orDie) - .refineToOrDie - - private[jms] def producer(session: Session): ZManaged[Blocking, JMSException, MessageProducer] = - Managed.make(effectBlockingInterrupt(session.createProducer(null)).refineToOrDie)(p => UIO(p.close())) + acknowledgeMode: Int): ZIO[Scope, JMSException, Session] = + ZIO.acquireRelease(ZIO.attemptBlocking(connection.createSession(transacted, acknowledgeMode)).refineToOrDie)(s => + ZIO.succeedBlocking(s.close())) + + private[jms] def producer(session: Session): ZIO[Scope, JMSException, MessageProducer] = { + val acquire: IO[JMSException, MessageProducer] = + ZIO.attemptBlocking(session.createProducer(null)).refineToOrDie + ZIO.acquireRelease(acquire)(p => ZIO.succeedBlocking(p.close())) + } - private[jms] def consumer(session: Session, - destination: Destination): ZManaged[Blocking, JMSException, MessageConsumer] = - Managed.make(effectBlockingInterrupt(session.createConsumer(destination)).refineToOrDie)(c => UIO(c.close())) + private[jms] def consumer(session: Session, destination: Destination): ZIO[Scope, JMSException, MessageConsumer] = { + def acquire: IO[JMSException, MessageConsumer] = + ZIO.attemptBlocking(session.createConsumer(destination)).refineToOrDie[JMSException] + ZIO.acquireRelease(acquire)(c => ZIO.succeedBlocking(c.close())) + } - private[jms] def commit(session: Session): ZIO[Blocking, JMSException, Unit] = - effectBlockingInterrupt(session.commit()).refineToOrDie + private[jms] def commit(session: Session): IO[JMSException, Unit] = + ZIO.attemptBlocking(session.commit()).refineToOrDie - private[jms] def rollback(session: Session): ZIO[Blocking, JMSException, Unit] = - effectBlockingInterrupt(session.rollback()).refineToOrDie + private[jms] def rollback(session: Session): IO[JMSException, Unit] = + ZIO.attemptBlocking(session.rollback()).refineToOrDie } diff --git a/src/test/scala/io/github/dobrynya/zio/jms/ConnectionAware.scala b/src/test/scala/io/github/dobrynya/zio/jms/ConnectionAware.scala deleted file mode 100644 index 3973bf8..0000000 --- a/src/test/scala/io/github/dobrynya/zio/jms/ConnectionAware.scala +++ /dev/null @@ -1,26 +0,0 @@ -package io.github.dobrynya.zio.jms - -import javax.jms.{Connection, JMSException} -import org.apache.activemq.broker.BrokerService -import org.apache.activemq.ActiveMQConnectionFactory -import zio._ -import zio.blocking.Blocking - -trait ConnectionAware { - val connectionFactory = new ActiveMQConnectionFactory("vm://localhost") - val managedConnection: ZManaged[Blocking, JMSException, Connection] = connection(connectionFactory) - - val brokerService: IO[Any, BrokerService] = Task { - val brokerService = new BrokerService() - brokerService.setUseJmx(false) - brokerService.setPersistent(false) - brokerService.setUseShutdownHook(true) - brokerService.start() - brokerService - } - - val stopBroker: BrokerService => UIO[Unit] = (brokerService: BrokerService) => UIO { - brokerService.stop() - brokerService.waitUntilStopped() - } -} diff --git a/src/test/scala/io/github/dobrynya/zio/jms/JmsComponentsSpec.scala b/src/test/scala/io/github/dobrynya/zio/jms/JmsComponentsSpec.scala deleted file mode 100644 index ff5bd40..0000000 --- a/src/test/scala/io/github/dobrynya/zio/jms/JmsComponentsSpec.scala +++ /dev/null @@ -1,229 +0,0 @@ -package io.github.dobrynya.zio.jms - -import javax.jms.{Queue => _, _} -import zio.{Queue => ZQueue, _} -import zio.blocking.Blocking -import zio.duration._ -import zio.stream._ -import zio.test._ -import zio.test.Assertion._ -import zio.test.TestAspect._ - -object JmsComponentsSpec extends DefaultRunnableSpec with ConnectionAware { - val connectionLayer: ZLayer[Blocking, JMSException, Blocking with Has[Connection]] = - ZLayer.fromManaged(managedConnection).passthrough - - def spec: ZSpec[environment.TestEnvironment, Any] = - suite("JMS components")( - testM("Messages should be sent and received in order of sending") { - checkM(Gen.listOf(Gen.anyString)) { messages => - val received = jmsObjects("JmsSpec-1").use { - case (s, p, mc, d) => - ZIO.foreach(messages)(send(p, s, d, _)) *> ZIO.foreach((1 to messages.size).toList)(_ => receiveText(mc)) - } - assertM(received)(equalTo(messages)) - } - }, - testM("Transactional producer should send messages transactionally") { - checkM(Gen.anyString) { - message => - for { - received <- jmsObjects("JmsSpec-2", transacted = true, Session.SESSION_TRANSACTED).use { - case (s, p, _, d) => - send(p, s, d, message) *> Task(s.rollback()) *> send(p, s, d, message) *> Task(s.commit()) - } *> - jmsObjects("JmsSpec-2").use { - case (_, _, mc, _) => - ZIO.collectAll(List(receiveText(mc, Some(100)), receiveText(mc, Some(100)))) - } - } yield assert(received)(equalTo(List(message, null))) - } - }, - testM("Transactional consumer should rollback session and get the same message twice") { - checkM(Gen.anyString) { message => - for { - received <- jmsObjects("JmsSpec-3").use { - case (s, p, _, d) => send(p, s, d, message) - } *> - ZIO.foreach(1 to 2) { i => - jmsObjects("JmsSpec-3", transacted = true, Session.SESSION_TRANSACTED).use { - case (s, _, mc, _) => receiveText(mc) <* Task(if (i == 1) s.rollback() else s.commit()) - } - } - } yield assert(received.toList)(equalTo(List(message, message))) - } - }, - testM("Acknowledging consumer should get the same message twice without acknowledgement") { - checkM(Gen.anyString) { - message => - for { - received <- jmsObjects("JmsSpec-4").use { - case (s, p, _, d) => send(p, s, d, message) - } *> - ZIO.foreach(List(1, 2)) { i => - jmsObjects("JmsSpec-4", transacted = false, Session.CLIENT_ACKNOWLEDGE).use { - case (s, _, mc, _) => - receive(mc) - .tap(m => Task(if (i == 2) m.acknowledge())) - .map(onlyText orElse nullableMessage) - } - } - } yield assert(received)(equalTo(List(message, message))) - } - }, - testM("All sent messages should be received with auto acknowledgements") { - val dest = Queue("JmsComponentsSpec-1") - - checkM(Gen.listOf(Gen.alphaNumericString)) { messages => - (for { - _ <- send(messages, dest) - received <- JmsConsumer - .consume(dest) - .take(messages.size) - .collect(onlyText) - .runCollect - } yield assert(received.toList)(equalTo(messages))) - .provideCustomLayer(connectionLayer) - } - }, - testM("Not committed messages should arrive twice") { - val dest = Queue("JmsComponentsSpec-2") - - checkM(Gen.listOf(Gen.alphaNumericString)) { messages => - (for { - _ <- send(messages, dest) - received <- ZIO.foreach((1 to 2).toList)( - i => - JmsConsumer - .consumeTx(dest) - .take(messages.size) - .tap(_.commit.when(i == 2)) - .map(_.message) - .collect(onlyText) - .runCollect - ) - } yield assert(received.flatten)(equalTo(messages ++ messages))) - .provideCustomLayer(connectionLayer) - } - }, - testM("Not acknowledged messages should arrive twice") { - val dest = Queue("JmsComponentsSpec-3") - - checkM(Gen.listOf(Gen.alphaNumericString)) { messages => - (for { - _ <- send(messages, dest) - received <- ZIO.foreach(List(1, 2))( - i => - JmsConsumer - .consume(dest, Session.CLIENT_ACKNOWLEDGE) - .take(messages.size) - .tap(acknowledge(_).when(i == 2)) - .collect(onlyText) - .runCollect - ) - } yield assert(received.flatten)(equalTo(messages ++ messages))) - .provideCustomLayer(connectionLayer) - } - }, - testM("Processor should collect all sent messages") { - val dest = Queue("JmsComponentsSpec-4") - - checkM(Gen.listOf(Gen.alphaNumericString)) { messages => - (for { - _ <- send(messages, dest) - collector <- ZQueue.unbounded[String] - _ <- JmsConsumer.consumeWith(dest, message => collector.offer(onlyText(message))).fork - received <- ZStream.fromQueue(collector).take(messages.size).runCollect - } yield assert(received.toList)(equalTo(messages))) - .provideCustomLayer(connectionLayer) - } - }, - testM("Failing processor should not lose any message") { - val dest = Queue("JmsComponentsSpec-5") - - checkM(Gen.listOf(Gen.alphaNumericString).filter(_.nonEmpty)) { - messages => - ( - for { - _ <- send(messages, dest) - collector <- ZQueue.unbounded[String] - _ <- JmsConsumer - .consumeTxWith(dest, message => ZIO.fail(s"Some processing error: ${onlyText(message)}!")) - .ignore - _ <- JmsConsumer - .consumeTxWith(dest, message => collector.offer(onlyText(message))) - .fork - received <- ZStream.fromQueue(collector).take(messages.size).runCollect.map(_.toList) - } yield assert(received)(equalTo(messages)) - ).provideCustomLayer(connectionLayer) - } - }, - testM("Client requires a response to be sent to a dedicated queue via JMSReplyTo header") { - val requestDestination = Queue("JmsComponentsSpec-6") - val replyDestination = Queue("JmsComponentsSpec-7") - - checkM(Gen.listOf(Gen.alphaNumericString).filter(_.nonEmpty)) { - messages => - (for { - _ <- ZStream - .fromIterable(messages) - .run(JmsProducer.requestSink(requestDestination, replyDestination, textMessageEncoder)) - _ <- JmsConsumer - .consumeAndReplyWith(requestDestination, - (message, session) => - textMessageEncoder(onlyText(message).toUpperCase, session).map(Some.apply)) - .fork - received <- JmsConsumer.consume(replyDestination).take(messages.size).collect(onlyText).runCollect - } yield assert(received.toList)(equalTo(messages.map(_.toUpperCase)))) - .provideCustomLayer(connectionLayer) - } - }, - testM("JmsProducer should commit messages automatically in case of a transactional session") { - checkM(Gen.listOf(Gen.alphaNumericString)) { - messages => - val destination = Queue("JmsComponentsSpec-8") - - (for { - _ <- ZStream - .fromIterable(messages) - .run( - JmsProducer.sink(destination, textMessageEncoder, transacted = true, Session.SESSION_TRANSACTED) - ) - received <- JmsConsumer.consume(destination).collect(onlyText).take(messages.size).runCollect - } yield assert(received.toList)(equalTo(messages))) - .provideCustomLayer(connectionLayer) - } - } - ) @@ timeout(3.minute) @@ timed @@ sequential @@ around(brokerService)(stopBroker) - - private def send(messages: List[String], destination: DestinationFactory) = - ZStream - .fromIterable(messages) - .run(JmsProducer.sink(destination, textMessageEncoder)) - - private def receive(consumer: MessageConsumer, timeout: Option[Long] = None): ZIO[Any, JMSException, Message] = - Task(timeout.map(consumer.receive).getOrElse(consumer.receive())).refineToOrDie - - private def receiveText(consumer: MessageConsumer, timeout: Option[Long] = None): ZIO[Any, JMSException, String] = - receive(consumer, timeout).map(onlyText orElse nullableMessage) - - private def send(p: MessageProducer, s: Session, d: Destination, message: String) = - Task(p.send(d, s.createTextMessage(message))) - - private def jmsObjects( - dest: String, - transacted: Boolean = false, - acknowledgementMode: Int = Session.AUTO_ACKNOWLEDGE - ): ZManaged[Blocking, Throwable, (Session, MessageProducer, MessageConsumer, Destination)] = - for { - c <- managedConnection - s <- session(c, transacted, acknowledgementMode) - d <- Queue(dest)(s).toManaged_ - p <- producer(s) - mc <- consumer(s, d) - } yield (s, p, mc, d) - - private def nullableMessage[T <: AnyRef]: PartialFunction[Message, T] = { - case null => null.asInstanceOf[T] - } -} diff --git a/src/test/scala/io/github/dobrynya/zio/jms/JmsConsumerSpec.scala b/src/test/scala/io/github/dobrynya/zio/jms/JmsConsumerSpec.scala new file mode 100644 index 0000000..406b03c --- /dev/null +++ b/src/test/scala/io/github/dobrynya/zio/jms/JmsConsumerSpec.scala @@ -0,0 +1,160 @@ +package io.github.dobrynya.zio.jms + +import zio.{ Queue => ZQueue, _ } +import stream._ +import test._ +import TestAspect._ +import JmsTestKitAware._ +import javax.jms.{ Connection, Message, Session } + +object JmsConsumerSpec extends ZIOSpecDefault { + private val randomMessages = Gen.listOf(Gen.alphaNumericString.filter(s => s != null && s.nonEmpty)).filter(_.nonEmpty) + + override def spec: Spec[TestEnvironment, Throwable] = + suite("JmsConsumer test suite")( + test("JmsConsumer should consume messages in order of sending") { + val q = Queue("test-1") + check(randomMessages) { messages => + for { + _ <- send(q, messages) + received <- JmsConsumer + .consume(q) + .take(messages.size) + .collect(onlyText) + .runCollect + .map(_.toList) + } yield assertTrue(received == messages) + } + }, + test("JmsConsumer should consume message transactionally") { + val q = Queue("test-2") + check(randomMessages) { messages => + for { + _ <- send(q, messages) + notComitted <- JmsConsumer + .consumeTx(q) + .take(messages.size) + .map(_.message) + .collect(onlyText) + .runCollect + .map(_.toList) + comitted <- JmsConsumer + .consumeTx(q) + .take(messages.size) + .tap(_.commit) + .map(_.message) + .collect(onlyText) + .runCollect + .map(_.toList) + } yield assertTrue(comitted == messages && notComitted == messages) + } + }, + test("JmsConsumer should consume all sent messages") { + val q = Queue("test-3") + check(randomMessages) { messages => + for { + _ <- send(q, messages) + queue <- ZQueue.unbounded[Message] + consumer <- JmsConsumer.consumeWith(q, m => queue.offer(m)).fork + received <- queue.takeN(messages.size).map(_.toList.collect(onlyText)) <* consumer.interrupt + } yield assertTrue(received == messages) + + } + }, + test("JmsConsumer should consume all sent messages transactionally") { + check(randomMessages, Gen.fromIterable(1 to 1005000).map(i => s"test-4-$i")) { + (messages, queueName) => + val q = Queue(queueName) + for { + _ <- send(q, messages) + collector <- ZQueue.unbounded[String] + _ <- JmsConsumer + .consumeTxWith(q, message => ZIO.fail(s"Expected: ${onlyText(message)}!")) + .ignore + consumer <- JmsConsumer + .consumeTxWith(q, message => collector.offer(onlyText(message))) + .fork + received <- collector.takeN(messages.size).map(_.toList) <* consumer.interrupt + } yield assertTrue(received == messages) + } + }, + test("Consuming a queue should be interruptable") { + val q = Queue("test-5") + for { + collector <- Ref.make(false) + consumer <- JmsConsumer.consumeWith(q, _ => collector.set(true)).fork + receivedMessage <- consumer.interrupt.delay(1.second) *> collector.get + } yield assertTrue(!receivedMessage) + }, + test("Transactional consuming a queue should be interruptable") { + val q = Queue("test-6") + for { + collector <- Ref.make(false) + consumer <- JmsConsumer.consumeTxWith(q, _ => collector.set(true)).fork + receivedMessage <- consumer.interrupt.delay(1.second) *> collector.get + } yield assertTrue(!receivedMessage) + }, + test("Closing a MessageConsumer should interrupt waiting for a message") { + val q = Queue("test-7") + ZIO.scoped { + for { + c <- ZIO.service[Connection] + s <- session(c, transacted = false, Session.AUTO_ACKNOWLEDGE) + d <- q(s) + mc <- consumer(s, d) + flag <- Ref.make(false) + _ <- ZIO.attemptBlocking(mc.receive()).flatMap(m => flag.set(m == null)).fork + _ <- ZIO.attemptBlocking(mc.close()).delay(1.second) + result <- flag.get.delay(1.second) + } yield assertTrue(result) + } + }, + test("JmsConsumer should reply to the dedicated destination when processing messages") { + val request = Queue("test-8-request") + val reply = Queue("test-8-reply") + check(randomMessages) { + messages => + for { + _ <- ZIO.scoped( + ZStream.fromIterable(messages).run(JmsProducer.requestSink(request, reply, textMessageEncoder)) + ) + consumer <- JmsConsumer + .consumeAndReplyWith( + request, + (message, session) => + textMessageEncoder(onlyText(message).toUpperCase, session).map(Some.apply) + ) + .fork + received <- JmsConsumer.consume(reply).take(messages.size).collect(onlyText).runCollect.map(_.toList) <* + consumer.interrupt + } yield assertTrue(received == messages.map(_.toUpperCase)) + } + }, + test("JmsConsumer should reply to the dedicated destination when processing messages transactionally") { + val request = Queue("test-9-request") + val reply = Queue("test-9-reply") + check(randomMessages) { + messages => + for { + _ <- ZIO.scoped( + ZStream.fromIterable(messages).run(JmsProducer.requestSink(request, reply, textMessageEncoder)) + ) + consumer <- JmsConsumer + .consumeAndReplyWith(request, + (message, session) => + textMessageEncoder(onlyText(message).toUpperCase, session) + .map(Some.apply), + transacted = true, + Session.SESSION_TRANSACTED) + .fork + received <- JmsConsumer.consume(reply).collect(onlyText).take(messages.size).runCollect.map(_.toList) <* + consumer.interrupt + } yield assertTrue(received == messages.map(_.toUpperCase)) + } + } + ).provideCustomShared(brokerLayer >>> connectionFactoryLayer >>> makeConnection()) @@ + withLiveEnvironment @@ timed @@ timeout(3.minutes) @@ sequential + + def send(destination: DestinationFactory, messages: List[String]): RIO[Connection, Unit] = + ZIO.scoped(ZStream.fromIterable(messages).run(JmsProducer.sink(destination, textMessageEncoder))) +} diff --git a/src/test/scala/io/github/dobrynya/zio/jms/JmsProducerSpec.scala b/src/test/scala/io/github/dobrynya/zio/jms/JmsProducerSpec.scala new file mode 100644 index 0000000..0a83ab3 --- /dev/null +++ b/src/test/scala/io/github/dobrynya/zio/jms/JmsProducerSpec.scala @@ -0,0 +1,43 @@ +package io.github.dobrynya.zio.jms + +import zio.{ durationInt, Scope, ZIO } +import zio.stream.ZStream +import zio.test._ +import TestAspect._ +import org.apache.activemq.command.ActiveMQQueue +import JmsTestKitAware._ + +object JmsProducerSpec extends ZIOSpecDefault { + override def spec: Spec[TestEnvironment with Scope, Any] = + suite("JmsProducer test suite")( + test("requestSink should set JMSReplyTo header") { + val request = Queue("test-jp-1") + val reply = Queue("test-jp-2") + check(Gen.alphaNumericString) { message => + for { + _ <- ZIO.scoped( + ZStream + .from(message) + .run(JmsProducer.requestSink(request, reply, textMessageEncoder)) + ) + received <- JmsConsumer.consume(request).runHead + } yield assertTrue(received.map(_.getJMSReplyTo).contains(new ActiveMQQueue(reply.name))) + } + }, + test("routerSink should send messages basing on message content") { + check(Gen.fromIterable(1 to 2)) { queueNumber => + for { + _ <- ZIO.scoped( + ZStream + .from(queueNumber.toString) + .run( + JmsProducer.routerSink((m: String, s) => Queue(s"test-jp-2-$m")(s).zip(textMessageEncoder(m, s))) + ) + ) + received <- JmsConsumer.consume(Queue(s"test-jp-2-$queueNumber")).collect(onlyText).runHead + } yield assertTrue(received.contains(s"$queueNumber")) + } + } + ).provideCustomShared(brokerLayer >>> connectionFactoryLayer >>> makeConnection()) @@ + withLiveEnvironment @@ timed @@ timeout(3.minutes) @@ sequential +} diff --git a/src/test/scala/io/github/dobrynya/zio/jms/JmsTestKitAware.scala b/src/test/scala/io/github/dobrynya/zio/jms/JmsTestKitAware.scala new file mode 100644 index 0000000..221dfd0 --- /dev/null +++ b/src/test/scala/io/github/dobrynya/zio/jms/JmsTestKitAware.scala @@ -0,0 +1,19 @@ +package io.github.dobrynya.zio.jms + +import jmstestkit.JmsBroker +import zio.{ Scope, ULayer, URIO, URLayer, ZIO, ZLayer } + +import javax.jms.ConnectionFactory + +object JmsTestKitAware { + val broker: URIO[Scope, JmsBroker] = + ZIO.acquireRelease(ZIO.succeed(JmsBroker()))(b => ZIO.succeed(b.stop())) + + val brokerLayer: ULayer[JmsBroker] = ZLayer.scoped(broker) + + val connectionFactory: URIO[JmsBroker, ConnectionFactory] = + ZIO.serviceWith[JmsBroker](_.createConnectionFactory) + + val connectionFactoryLayer: URLayer[JmsBroker, ConnectionFactory] = + ZLayer.fromZIO(connectionFactory) +}