diff --git a/build.sbt b/build.sbt index a2b1476..8935010 100644 --- a/build.sbt +++ b/build.sbt @@ -32,8 +32,8 @@ libraryDependencies ++= Seq( "com.google.code.findbugs" % "jsr305" % "3.0.0", "org.scalatest" %% "scalatest" % "2.2.3" % "test", // for TCK "com.google.inject" % "guice" % "3.0" % "test", // to make sbt happy - "org.reactivestreams" % "reactive-streams-tck" % "1.0.0.RC1" % "test", - "com.typesafe.akka" %% "akka-stream-experimental" % "1.0-M4" % "test" + "org.reactivestreams" % "reactive-streams-tck" % "1.0.0.RC3" % "test", + "com.typesafe.akka" %% "akka-stream-experimental" % "1.0-M5" % "test" ) publishMavenStyle := true diff --git a/src/main/scala/io/scalac/amqp/impl/CanceledSubscription.scala b/src/main/scala/io/scalac/amqp/impl/CanceledSubscription.scala new file mode 100644 index 0000000..eb0443b --- /dev/null +++ b/src/main/scala/io/scalac/amqp/impl/CanceledSubscription.scala @@ -0,0 +1,9 @@ +package io.scalac.amqp.impl + +import org.reactivestreams.Subscription + + +private[impl] object CanceledSubscription extends Subscription { + override def request(n: Long) = () + override def cancel() = () +} diff --git a/src/main/scala/io/scalac/amqp/impl/ExchangeSubscriber.scala b/src/main/scala/io/scalac/amqp/impl/ExchangeSubscriber.scala index 74f9b5b..cc5e26b 100644 --- a/src/main/scala/io/scalac/amqp/impl/ExchangeSubscriber.scala +++ b/src/main/scala/io/scalac/amqp/impl/ExchangeSubscriber.scala @@ -2,6 +2,7 @@ package io.scalac.amqp.impl import java.util.concurrent.atomic.AtomicReference +import com.google.common.base.Preconditions.checkNotNull import com.rabbitmq.client.Channel import io.scalac.amqp.Routed @@ -31,7 +32,10 @@ private[amqp] class ExchangeSubscriber(channel: Channel, exchange: String) } /** Our life cycle is bounded to underlying `Channel`. */ - override def onError(t: Throwable) = channel.close() + override def onError(t: Throwable) = { + checkNotNull(t) + channel.close() + } /** Our life cycle is bounded to underlying `Channel`. */ override def onComplete() = channel.close() diff --git a/src/main/scala/io/scalac/amqp/impl/QueuePublisher.scala b/src/main/scala/io/scalac/amqp/impl/QueuePublisher.scala index 85a5e83..f230c98 100644 --- a/src/main/scala/io/scalac/amqp/impl/QueuePublisher.scala +++ b/src/main/scala/io/scalac/amqp/impl/QueuePublisher.scala @@ -1,6 +1,7 @@ package io.scalac.amqp.impl import scala.concurrent.stm.Ref +import scala.util.{Success, Failure, Try} import scala.util.control.NonFatal import com.rabbitmq.client.{ShutdownSignalException, ShutdownListener, Connection} @@ -30,18 +31,24 @@ private[amqp] class QueuePublisher( subscribers.single.getAndTransform(_ + subscriber) match { case ss if ss.contains(subscriber) ⇒ throw new IllegalStateException(s"Rule 1.10: Subscriber=$subscriber is already subscribed to this publisher.") - case _ ⇒ try { - val channel = connection.createChannel() - channel.addShutdownListener(newShutdownListener(subscriber)) - - val subscription = new QueueSubscription(channel, queue, subscriber) - subscriber.onSubscribe(subscription) - - channel.basicQos(prefetch) - channel.basicConsume(queue, false, subscription) - } catch { - case NonFatal(exception) ⇒ subscriber.onError(exception) - } + case _ ⇒ + Try(connection.createChannel()) match { + case Success(channel) ⇒ + channel.addShutdownListener(newShutdownListener(subscriber)) + val subscription = new QueueSubscription(channel, queue, subscriber) + + try { + subscriber.onSubscribe(subscription) + channel.basicQos(prefetch) + channel.basicConsume(queue, false, subscription) + } catch { + case NonFatal(exception) ⇒ subscriber.onError(exception) + } + + case Failure(cause) ⇒ + subscriber.onSubscribe(CanceledSubscription) + subscriber.onError(cause) + } } def newShutdownListener(subscriber: Subscriber[_ >: Delivery]) = new ShutdownListener { diff --git a/src/main/scala/io/scalac/amqp/impl/QueueSubscription.scala b/src/main/scala/io/scalac/amqp/impl/QueueSubscription.scala index d2ff778..acfc800 100644 --- a/src/main/scala/io/scalac/amqp/impl/QueueSubscription.scala +++ b/src/main/scala/io/scalac/amqp/impl/QueueSubscription.scala @@ -36,8 +36,8 @@ private[amqp] class QueueSubscription(channel: Channel, queue: String, subscribe def deliver(delivery: Delivery): Unit = try { if(channel.isOpen()) { - subscriber.onNext(delivery) channel.basicAck(delivery.deliveryTag.underlying, false) + subscriber.onNext(delivery) } } catch { case NonFatal(exception) ⇒ diff --git a/src/test/scala/io/scalac/amqp/impl/ExchangeSubscriberBlackboxSpec.scala b/src/test/scala/io/scalac/amqp/impl/ExchangeSubscriberBlackboxSpec.scala index 0b9bf5e..817bad3 100644 --- a/src/test/scala/io/scalac/amqp/impl/ExchangeSubscriberBlackboxSpec.scala +++ b/src/test/scala/io/scalac/amqp/impl/ExchangeSubscriberBlackboxSpec.scala @@ -30,7 +30,7 @@ class ExchangeSubscriberBlackboxSpec(defaultTimeout: FiniteDuration) extends Sub def createHelperSource(elements: Long): Source[Routed, Unit] = elements match { /** if `elements` is 0 the `Publisher` should signal `onComplete` immediately. */ - case 0 ⇒ Source.empty() + case 0 ⇒ Source.empty /** if `elements` is [[Long.MaxValue]] the produced stream must be infinite. */ case Long.MaxValue ⇒ Source(() ⇒ Iterator.continually(message)) /** It must create a `Publisher` for a stream with exactly the given number of elements. */ @@ -39,5 +39,6 @@ class ExchangeSubscriberBlackboxSpec(defaultTimeout: FiniteDuration) extends Sub case n ⇒ sys.error("n > Int.MaxValue") } - override def createHelperPublisher(elements: Long) = createHelperSource(elements).runWith(Sink.publisher()) + override def createHelperPublisher(elements: Long) = createHelperSource(elements).runWith(Sink.publisher) + override def createElement(element: Int) = message } diff --git a/src/test/scala/io/scalac/amqp/impl/QueuePublisherSpec.scala b/src/test/scala/io/scalac/amqp/impl/QueuePublisherSpec.scala index f357b42..160fbde 100644 --- a/src/test/scala/io/scalac/amqp/impl/QueuePublisherSpec.scala +++ b/src/test/scala/io/scalac/amqp/impl/QueuePublisherSpec.scala @@ -5,6 +5,8 @@ import scala.concurrent.duration._ import java.util.concurrent.atomic.AtomicLong +import akka.stream.scaladsl.{Source, Sink} + import com.rabbitmq.client.AMQP import io.scalac.amqp.{Connection, Delivery} @@ -25,8 +27,6 @@ class QueuePublisherSpec(defaultTimeout: FiniteDuration, publisherShutdownTimeou /** Calls a function after passing n messages. */ def callAfterN(delegate: Publisher[Delivery], n: Long)(f: () ⇒ Unit) = new Publisher[Delivery] { - require(n > 0) - override def subscribe(subscriber: Subscriber[_ >: Delivery]) = delegate.subscribe(new Subscriber[Delivery] { val counter = new AtomicLong() @@ -79,7 +79,7 @@ class QueuePublisherSpec(defaultTimeout: FiniteDuration, publisherShutdownTimeou conn.consume("whatever") } - override def spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() = { + override def untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() = { val queue = declareQueue() val publisher = connection.consume(queue)