Skip to content

Commit

Permalink
Update to reactive-streams 1.0.0.RC3
Browse files Browse the repository at this point in the history
  • Loading branch information
mkiedys committed Mar 31, 2015
1 parent fc4766c commit fdba020
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 21 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Expand Up @@ -32,8 +32,8 @@ libraryDependencies ++= Seq(
"com.google.code.findbugs" % "jsr305" % "3.0.0",
"org.scalatest" %% "scalatest" % "2.2.3" % "test", // for TCK
"com.google.inject" % "guice" % "3.0" % "test", // to make sbt happy
"org.reactivestreams" % "reactive-streams-tck" % "1.0.0.RC1" % "test",
"com.typesafe.akka" %% "akka-stream-experimental" % "1.0-M4" % "test"
"org.reactivestreams" % "reactive-streams-tck" % "1.0.0.RC3" % "test",
"com.typesafe.akka" %% "akka-stream-experimental" % "1.0-M5" % "test"
)

publishMavenStyle := true
Expand Down
9 changes: 9 additions & 0 deletions src/main/scala/io/scalac/amqp/impl/CanceledSubscription.scala
@@ -0,0 +1,9 @@
package io.scalac.amqp.impl

import org.reactivestreams.Subscription


private[impl] object CanceledSubscription extends Subscription {
override def request(n: Long) = ()
override def cancel() = ()
}
6 changes: 5 additions & 1 deletion src/main/scala/io/scalac/amqp/impl/ExchangeSubscriber.scala
Expand Up @@ -2,6 +2,7 @@ package io.scalac.amqp.impl

import java.util.concurrent.atomic.AtomicReference

import com.google.common.base.Preconditions.checkNotNull
import com.rabbitmq.client.Channel

import io.scalac.amqp.Routed
Expand Down Expand Up @@ -31,7 +32,10 @@ private[amqp] class ExchangeSubscriber(channel: Channel, exchange: String)
}

/** Our life cycle is bounded to underlying `Channel`. */
override def onError(t: Throwable) = channel.close()
override def onError(t: Throwable) = {
checkNotNull(t)
channel.close()
}

/** Our life cycle is bounded to underlying `Channel`. */
override def onComplete() = channel.close()
Expand Down
31 changes: 19 additions & 12 deletions src/main/scala/io/scalac/amqp/impl/QueuePublisher.scala
@@ -1,6 +1,7 @@
package io.scalac.amqp.impl

import scala.concurrent.stm.Ref
import scala.util.{Success, Failure, Try}
import scala.util.control.NonFatal

import com.rabbitmq.client.{ShutdownSignalException, ShutdownListener, Connection}
Expand Down Expand Up @@ -30,18 +31,24 @@ private[amqp] class QueuePublisher(
subscribers.single.getAndTransform(_ + subscriber) match {
case ss if ss.contains(subscriber)
throw new IllegalStateException(s"Rule 1.10: Subscriber=$subscriber is already subscribed to this publisher.")
case _ try {
val channel = connection.createChannel()
channel.addShutdownListener(newShutdownListener(subscriber))

val subscription = new QueueSubscription(channel, queue, subscriber)
subscriber.onSubscribe(subscription)

channel.basicQos(prefetch)
channel.basicConsume(queue, false, subscription)
} catch {
case NonFatal(exception) subscriber.onError(exception)
}
case _
Try(connection.createChannel()) match {
case Success(channel)
channel.addShutdownListener(newShutdownListener(subscriber))
val subscription = new QueueSubscription(channel, queue, subscriber)

try {
subscriber.onSubscribe(subscription)
channel.basicQos(prefetch)
channel.basicConsume(queue, false, subscription)
} catch {
case NonFatal(exception) subscriber.onError(exception)
}

case Failure(cause)
subscriber.onSubscribe(CanceledSubscription)
subscriber.onError(cause)
}
}

def newShutdownListener(subscriber: Subscriber[_ >: Delivery]) = new ShutdownListener {
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/io/scalac/amqp/impl/QueueSubscription.scala
Expand Up @@ -36,8 +36,8 @@ private[amqp] class QueueSubscription(channel: Channel, queue: String, subscribe

def deliver(delivery: Delivery): Unit = try {
if(channel.isOpen()) {
subscriber.onNext(delivery)
channel.basicAck(delivery.deliveryTag.underlying, false)
subscriber.onNext(delivery)
}
} catch {
case NonFatal(exception)
Expand Down
Expand Up @@ -30,7 +30,7 @@ class ExchangeSubscriberBlackboxSpec(defaultTimeout: FiniteDuration) extends Sub

def createHelperSource(elements: Long): Source[Routed, Unit] = elements match {
/** if `elements` is 0 the `Publisher` should signal `onComplete` immediately. */
case 0 Source.empty()
case 0 Source.empty
/** if `elements` is [[Long.MaxValue]] the produced stream must be infinite. */
case Long.MaxValue Source(() Iterator.continually(message))
/** It must create a `Publisher` for a stream with exactly the given number of elements. */
Expand All @@ -39,5 +39,6 @@ class ExchangeSubscriberBlackboxSpec(defaultTimeout: FiniteDuration) extends Sub
case n sys.error("n > Int.MaxValue")
}

override def createHelperPublisher(elements: Long) = createHelperSource(elements).runWith(Sink.publisher())
override def createHelperPublisher(elements: Long) = createHelperSource(elements).runWith(Sink.publisher)
override def createElement(element: Int) = message
}
6 changes: 3 additions & 3 deletions src/test/scala/io/scalac/amqp/impl/QueuePublisherSpec.scala
Expand Up @@ -5,6 +5,8 @@ import scala.concurrent.duration._

import java.util.concurrent.atomic.AtomicLong

import akka.stream.scaladsl.{Source, Sink}

import com.rabbitmq.client.AMQP

import io.scalac.amqp.{Connection, Delivery}
Expand All @@ -25,8 +27,6 @@ class QueuePublisherSpec(defaultTimeout: FiniteDuration, publisherShutdownTimeou

/** Calls a function after passing n messages. */
def callAfterN(delegate: Publisher[Delivery], n: Long)(f: () Unit) = new Publisher[Delivery] {
require(n > 0)

override def subscribe(subscriber: Subscriber[_ >: Delivery]) =
delegate.subscribe(new Subscriber[Delivery] {
val counter = new AtomicLong()
Expand Down Expand Up @@ -79,7 +79,7 @@ class QueuePublisherSpec(defaultTimeout: FiniteDuration, publisherShutdownTimeou
conn.consume("whatever")
}

override def spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() = {
override def untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() = {
val queue = declareQueue()
val publisher = connection.consume(queue)

Expand Down

0 comments on commit fdba020

Please sign in to comment.