Skip to content

Commit

Permalink
Stop blocking in publishers
Browse files Browse the repository at this point in the history
STM protected publish buffer for ordered and asynchronous delivery.
Double close protection for channel.
  • Loading branch information
LGLO authored and mkiedys committed Jan 27, 2016
1 parent 4e28148 commit 09072f7
Showing 1 changed file with 63 additions and 11 deletions.
74 changes: 63 additions & 11 deletions src/main/scala/io/scalac/amqp/impl/ExchangeSubscriber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,88 @@ import com.rabbitmq.client.Channel
import io.scalac.amqp.Routed
import org.reactivestreams.{Subscriber, Subscription}

import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.stm.{Ref, atomic}
import scala.util.control.NonFatal

private[amqp] class ExchangeSubscriber(channel: Channel, exchange: String)
extends Subscriber[Routed] {
require(exchange.length <= 255, "exchange.length > 255")

val active = new AtomicReference[Subscription]()
val publishingThreadRunning = Ref(false)
val buffer = Ref(Queue[Routed]())
val closeRequested = Ref(false)

override def onSubscribe(subscription: Subscription) =
override def onSubscribe(subscription: Subscription): Unit =
active.compareAndSet(null, subscription) match {
case true subscription.request(1)
case false subscription.cancel() // 2.5: cancel
}

override def onNext(routed: Routed) = {
channel.basicPublish(
exchange,
routed.routingKey,
Conversions.toBasicProperties(routed.message),
routed.message.body.toArray)
active.get().request(1)
override def onNext(routed: Routed): Unit = {
requireNonNull(routed) // 2.13
val running = atomic { implicit txn =>
buffer.transform(_ :+ routed)
publishingThreadRunning.getAndTransform(_ => true)
}
if (!running) {
Future(publishFromBuffer())
}
}

@tailrec
private def publishFromBuffer(): Unit = {
val headOpt = buffer.single.transformAndExtract(q => (q.tail, q.headOption))
headOpt.foreach(publish)
val continue = atomic { implicit txn =>
publishingThreadRunning.transformAndGet(_ => buffer().nonEmpty)
}
if (continue) {
publishFromBuffer()
}
}

private def publish(routed: Routed): Unit = {
try {
channel.basicPublish(
exchange,
routed.routingKey,
Conversions.toBasicProperties(routed.message),
routed.message.body.toArray)
active.get().request(1)
} catch {
case NonFatal(exception) => // 2.6
active.get().cancel()
closeChannel()
}
}

/** Double check before calling `close`. Second `close` on channel kills connection.*/
private def closeChannel(): Unit = {
if (closeRequested.single.compareAndSet(false, true) && channel.isOpen()) {
channel.close()
}
}

/** Our life cycle is bounded to underlying `Channel`. */
override def onError(t: Throwable) = {
override def onError(t: Throwable): Unit = {
requireNonNull(t)
channel.close()
shutdownWhenFinished()
}

/** Our life cycle is bounded to underlying `Channel`. */
override def onComplete() = channel.close()
override def onComplete(): Unit = shutdownWhenFinished()

private def shutdownWhenFinished(): Unit = {
Future {
publishingThreadRunning.single.await(!_)
closeChannel()
}
}

override def toString = s"ExchangeSubscriber(channel=$channel, exchange=$exchange)"
}

0 comments on commit 09072f7

Please sign in to comment.