Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jms per message destination #1181

Merged
merged 12 commits into from
Sep 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/src/main/paradox/jms.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,18 @@ Scala
Java
: @@snip [snip](/jms/src/test/java/akka/stream/alpakka/jms/javadsl/JmsConnectorsTest.java) { #run-flow-producer }

### Sending messages with per-message destinations

It is also possible to define message destinations per message:

Scala
: @@snip [snip](/jms/src/test/scala/akka/stream/alpakka/jms/scaladsl/JmsConnectorsSpec.scala) { #run-directed-flow-producer }

Java
: @@snip [snip](/jms/src/test/java/akka/stream/alpakka/jms/javadsl/JmsConnectorsTest.java) { #run-directed-flow-producer }

When no destination is defined on the message, the destination given in the producer settings is used.

### Configuring the Producer

Scala
Expand Down
12 changes: 6 additions & 6 deletions jms/src/main/scala/akka/stream/alpakka/jms/Jms.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ sealed trait JmsSettings {

sealed trait Destination {
val name: String
val create: (jms.Session) => jms.Destination
val create: jms.Session => jms.Destination
}
final case class Topic(override val name: String) extends Destination {
override val create: (jms.Session) => jms.Destination = session => session.createTopic(name)
override val create: jms.Session => jms.Destination = session => session.createTopic(name)
}
final case class DurableTopic(name: String, subscriberName: String) extends Destination {
override val create: (jms.Session) => jms.Destination = session => session.createTopic(name)
override val create: jms.Session => jms.Destination = session => session.createTopic(name)
}
final case class Queue(override val name: String) extends Destination {
override val create: (jms.Session) => jms.Destination = session => session.createQueue(name)
override val create: jms.Session => jms.Destination = session => session.createQueue(name)
}
final case class CustomDestination(override val name: String, override val create: (jms.Session) => jms.Destination)
final case class CustomDestination(override val name: String, override val create: jms.Session => jms.Destination)
extends Destination

final class AcknowledgeMode(val mode: Int)
Expand Down Expand Up @@ -78,7 +78,7 @@ final case class JmsConsumerSettings(connectionFactory: ConnectionFactory,
def withBufferSize(size: Int): JmsConsumerSettings = copy(bufferSize = size)
def withQueue(name: String): JmsConsumerSettings = copy(destination = Some(Queue(name)))
def withTopic(name: String): JmsConsumerSettings = copy(destination = Some(Topic(name)))
def withDurableTopic(name: String, subscriberName: String) =
def withDurableTopic(name: String, subscriberName: String): JmsConsumerSettings =
copy(destination = Some(DurableTopic(name, subscriberName)))
def withDestination(destination: Destination): JmsConsumerSettings = copy(destination = Some(destination))
def withSelector(selector: String): JmsConsumerSettings = copy(selector = Some(selector))
Expand Down
166 changes: 105 additions & 61 deletions jms/src/main/scala/akka/stream/alpakka/jms/JmsConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,38 @@ package akka.stream.alpakka.jms

import java.util.concurrent.ArrayBlockingQueue

import javax.jms
import javax.jms._
import akka.stream.{ActorAttributes, ActorMaterializer, Attributes}
import akka.stream.alpakka.jms.impl.SoftReferenceCache
import akka.stream.stage.{AsyncCallback, GraphStageLogic}
import akka.stream.{ActorAttributes, ActorMaterializer, Attributes}
import javax.jms
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great to keep the jms prefix to distinguish things.


import scala.concurrent.{ExecutionContext, Future}

/**
* Internal API
*/
private[jms] trait JmsConnector { this: GraphStageLogic =>
private[jms] trait JmsConnector[S <: JmsSession] { this: GraphStageLogic =>

implicit protected var ec: ExecutionContext = _

protected var jmsConnection: Option[Connection] = None
protected var jmsConnection: Option[jms.Connection] = None

protected var jmsSessions = Seq.empty[JmsSession]
protected var jmsSessions = Seq.empty[S]

protected def jmsSettings: JmsSettings

protected def onSessionOpened(jmsSession: JmsSession): Unit = {}
protected def onSessionOpened(jmsSession: S): Unit = {}

protected def fail: AsyncCallback[Throwable] = getAsyncCallback[Throwable](e => failStage(e))
protected val fail: AsyncCallback[Throwable] = getAsyncCallback[Throwable](e => failStage(e))

private def onConnection = getAsyncCallback[Connection] { c =>
private val onConnection: AsyncCallback[jms.Connection] = getAsyncCallback[jms.Connection] { c =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for cleaning these up.

jmsConnection = Some(c)
}

private def onSession =
getAsyncCallback[JmsSession] { session =>
jmsSessions :+= session
onSessionOpened(session)
}
private val onSession: AsyncCallback[S] = getAsyncCallback[S] { session =>
jmsSessions :+= session
onSessionOpened(session)
}

protected def executionContext(attributes: Attributes): ExecutionContext = {
val dispatcher = attributes.get[ActorAttributes.Dispatcher](
Expand All @@ -55,82 +54,119 @@ private[jms] trait JmsConnector { this: GraphStageLogic =>
}
}

protected def initSessionAsync(executionContext: ExecutionContext): Future[Unit] = {
protected def initSessionAsync(executionContext: ExecutionContext): Unit = {
ec = executionContext
val future = Future {
val sessions = openSessions()
sessions foreach { session =>
onSession.invoke(session)
}
}
future.onFailure {
case e: Exception => fail.invoke(e)
}
future
val sessions: Seq[Future[S]] = openSessions()
val allSessions = Future.sequence(sessions)
allSessions.failed.foreach(fail.invoke)
// wait for all sessions to successfully initialize before invoking the onSession callback.
// reduces flakiness (start, consume, then crash) at the cost of increased latency of startup.
allSessions.foreach(_.foreach(onSession.invoke))
}

protected def createSession(connection: Connection, createDestination: jms.Session => jms.Destination): JmsSession
def openSessions(): Seq[Future[S]]

protected def openSessions(): Seq[JmsSession] = {
def openConnection(): jms.Connection = {
val factory = jmsSettings.connectionFactory
val connection = jmsSettings.credentials match {
case Some(Credentials(username, password)) => factory.createConnection(username, password)
case _ => factory.createConnection()
}
connection.setExceptionListener(new ExceptionListener {
override def onException(exception: JMSException) =
connection.setExceptionListener(new jms.ExceptionListener {
override def onException(exception: jms.JMSException): Unit =
fail.invoke(exception)
})
connection.start()
onConnection.invoke(connection)
connection
}
}

private[jms] trait JmsConsumerConnector extends JmsConnector[JmsConsumerSession] { this: GraphStageLogic =>

protected def createSession(connection: jms.Connection,
createDestination: jms.Session => jms.Destination): JmsConsumerSession

def openSessions(): Seq[Future[JmsConsumerSession]] = {
val connection = openConnection()
connection.start()

val createDestination = jmsSettings.destination match {
case Some(destination) =>
destination.create
case Some(destination) => destination.create
case _ => throw new IllegalArgumentException("Destination is missing")
}

0 until jmsSettings.sessionCount map { _ =>
createSession(connection, createDestination)
for (_ <- 0 until jmsSettings.sessionCount)
yield Future(createSession(connection, createDestination))
}
}

private[jms] trait JmsProducerConnector extends JmsConnector[JmsProducerSession] { this: GraphStageLogic =>

private def createSession(connection: jms.Connection,
createDestination: jms.Session => jms.Destination): JmsProducerSession = {
val session = connection.createSession(false, AcknowledgeMode.AutoAcknowledge.mode)
new JmsProducerSession(connection, session, createDestination(session))
}

def openSessions(): Seq[Future[JmsProducerSession]] = {
val connection = openConnection()

val createDestination = jmsSettings.destination match {
case Some(destination) => destination.create
case _ => throw new IllegalArgumentException("Destination is missing")
}

for (_ <- 0 until jmsSettings.sessionCount)
yield Future(createSession(connection, createDestination))
}
}

private[jms] object JmsMessageProducer {
def apply(jmsSession: JmsSession, settings: JmsProducerSettings): JmsMessageProducer = {
val producer = jmsSession.session.createProducer(jmsSession.jmsDestination)
def apply(jmsSession: JmsProducerSession, settings: JmsProducerSettings): JmsMessageProducer = {
val producer = jmsSession.session.createProducer(null)
if (settings.timeToLive.nonEmpty) {
producer.setTimeToLive(settings.timeToLive.get.toMillis)
}
new JmsMessageProducer(producer, jmsSession)
}
}

private[jms] class JmsMessageProducer(jmsProducer: MessageProducer, jmsSession: JmsSession) {
private[jms] class JmsMessageProducer(jmsProducer: jms.MessageProducer, jmsSession: JmsProducerSession) {

private val defaultDestination = jmsSession.jmsDestination

private val destinationCache = new SoftReferenceCache[Destination, jms.Destination]()

def send(elem: JmsMessage): Unit = {
val message: Message = createMessage(elem)
val message: jms.Message = createMessage(elem)
populateMessageProperties(message, elem)

val (sendHeaders, headersBeforeSend: Set[JmsHeader]) = elem.headers.partition(_.usedDuringSend)
populateMessageHeader(message, headersBeforeSend)

val deliveryModeOption = findHeader(sendHeaders) { case x: JmsDeliveryMode => x.deliveryMode }
val priorityOption = findHeader(sendHeaders) { case x: JmsPriority => x.priority }
val timeToLiveInMillisOption = findHeader(sendHeaders) { case x: JmsTimeToLive => x.timeInMillis }
val deliveryMode = sendHeaders
.collectFirst { case x: JmsDeliveryMode => x.deliveryMode }
.getOrElse(jmsProducer.getDeliveryMode)

jmsProducer.send(
message,
deliveryModeOption.getOrElse(jmsProducer.getDeliveryMode),
priorityOption.getOrElse(jmsProducer.getPriority),
timeToLiveInMillisOption.getOrElse(jmsProducer.getTimeToLive)
)
val priority = sendHeaders
.collectFirst { case x: JmsPriority => x.priority }
.getOrElse(jmsProducer.getPriority)

val timeToLive = sendHeaders
.collectFirst { case x: JmsTimeToLive => x.timeInMillis }
.getOrElse(jmsProducer.getTimeToLive)

elem.destination match {
case Some(messageDestination) =>
jmsProducer.send(lookup(messageDestination), message, deliveryMode, priority, timeToLive)
case None =>
jmsProducer.send(defaultDestination, message, deliveryMode, priority, timeToLive)
}
}

private def findHeader[T](headersDuringSend: Set[JmsHeader])(f: PartialFunction[JmsHeader, T]): Option[T] =
headersDuringSend.collectFirst(f)
private def lookup(dest: Destination) = destinationCache.lookup(dest, dest.create(jmsSession.session))

private[jms] def createMessage(element: JmsMessage): Message =
private[jms] def createMessage(element: JmsMessage): jms.Message =
element match {

case textMessage: JmsTextMessage => jmsSession.session.createTextMessage(textMessage.body)
Expand All @@ -150,7 +186,7 @@ private[jms] class JmsMessageProducer(jmsProducer: MessageProducer, jmsSession:
}

private[jms] def populateMessageProperties(message: javax.jms.Message, jmsMessage: JmsMessage): Unit =
jmsMessage.properties().foreach {
jmsMessage.properties.foreach {
case (key, v) =>
v match {
case v: String => message.setStringProperty(key, v)
Expand Down Expand Up @@ -190,10 +226,11 @@ private[jms] class JmsMessageProducer(jmsProducer: MessageProducer, jmsSession:
}
}

private[jms] class JmsSession(val connection: jms.Connection,
val session: jms.Session,
val jmsDestination: jms.Destination,
val settingsDestination: Destination) {
private[jms] sealed trait JmsSession {

def connection: jms.Connection

def session: jms.Session

private[jms] def closeSessionAsync()(implicit ec: ExecutionContext): Future[Unit] = Future { closeSession() }

Expand All @@ -202,11 +239,18 @@ private[jms] class JmsSession(val connection: jms.Connection,
private[jms] def abortSessionAsync()(implicit ec: ExecutionContext): Future[Unit] = Future { abortSession() }

private[jms] def abortSession(): Unit = closeSession()
}

private[jms] def createProducer()(implicit ec: ExecutionContext): Future[jms.MessageProducer] =
Future {
session.createProducer(jmsDestination)
}
private[jms] class JmsProducerSession(val connection: jms.Connection,
val session: jms.Session,
val jmsDestination: jms.Destination)
extends JmsSession

private[jms] class JmsConsumerSession(val connection: jms.Connection,
val session: jms.Session,
val jmsDestination: jms.Destination,
val settingsDestination: Destination)
extends JmsSession {

private[jms] def createConsumer(
selector: Option[String]
Expand All @@ -233,7 +277,7 @@ private[jms] class JmsAckSession(override val connection: jms.Connection,
override val jmsDestination: jms.Destination,
override val settingsDestination: Destination,
val maxPendingAcks: Int)
extends JmsSession(connection, session, jmsDestination, settingsDestination) {
extends JmsConsumerSession(connection, session, jmsDestination, settingsDestination) {

private[jms] var pendingAck = 0
private[jms] val ackQueue = new ArrayBlockingQueue[() => Unit](maxPendingAcks + 1)
Expand All @@ -254,7 +298,7 @@ private[jms] class JmsTxSession(override val connection: jms.Connection,
override val session: jms.Session,
override val jmsDestination: jms.Destination,
override val settingsDestination: Destination)
extends JmsSession(connection, session, jmsDestination, settingsDestination) {
extends JmsConsumerSession(connection, session, jmsDestination, settingsDestination) {

private[jms] val commitQueue = new ArrayBlockingQueue[TxEnvelope => Unit](1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,18 @@ private[jms] final class JmsConsumerStage(settings: JmsConsumerSettings)
private val backpressure = new Semaphore(bufferSize)

protected def createSession(connection: Connection,
createDestination: Session => javax.jms.Destination): JmsSession = {
createDestination: Session => javax.jms.Destination): JmsConsumerSession = {
val session =
connection.createSession(false, settings.acknowledgeMode.getOrElse(AcknowledgeMode.AutoAcknowledge).mode)
new JmsSession(connection, session, createDestination(session), settings.destination.get)
new JmsConsumerSession(connection, session, createDestination(session), settings.destination.get)
}

protected def pushMessage(msg: Message): Unit = {
push(out, msg)
backpressure.release()
}

override protected def onSessionOpened(jmsSession: JmsSession): Unit =
override protected def onSessionOpened(jmsSession: JmsConsumerSession): Unit =
jmsSession
.createConsumer(settings.selector)
.onComplete {
Expand Down Expand Up @@ -90,7 +90,7 @@ final class JmsAckSourceStage(settings: JmsConsumerSettings)

protected def pushMessage(msg: AckEnvelope): Unit = push(out, msg)

override protected def onSessionOpened(jmsSession: JmsSession): Unit =
override protected def onSessionOpened(jmsSession: JmsConsumerSession): Unit =
jmsSession match {
case session: JmsAckSession =>
session.createConsumer(settings.selector).onComplete {
Expand Down Expand Up @@ -167,7 +167,7 @@ final class JmsTxSourceStage(settings: JmsConsumerSettings)

protected def pushMessage(msg: TxEnvelope): Unit = push(out, msg)

override protected def onSessionOpened(jmsSession: JmsSession): Unit =
override protected def onSessionOpened(jmsSession: JmsConsumerSession): Unit =
jmsSession match {
case session: JmsTxSession =>
session.createConsumer(settings.selector).onComplete {
Expand Down Expand Up @@ -210,7 +210,7 @@ abstract class SourceStageLogic[T](shape: SourceShape[T],
settings: JmsConsumerSettings,
attributes: Attributes)
extends GraphStageLogic(shape)
with JmsConnector
with JmsConsumerConnector
with StageLogging {

override protected def jmsSettings: JmsConsumerSettings = settings
Expand Down
Loading