Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jms passthrough envelope #1212

Merged
merged 4 commits into from
Sep 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/src/main/paradox/jms.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,29 @@ Java

When no destination is defined on the message, the destination given in the producer settings is used.

### Passing context through the producer

In some use cases, it is useful to pass through context information when producing (e.g. for acknowledging or committing
messages after sending to Jms). For this, the `JmsProducer.flexiFlow` accepts implementations of `JmsProducerEnvelope`,
which it will pass through.

Scala
: @@snip [snip](/jms/src/test/scala/akka/stream/alpakka/jms/scaladsl/JmsConnectorsSpec.scala) { #run-flexi-flow-producer }

Java
: @@snip [snip](/jms/src/test/java/akka/stream/alpakka/jms/javadsl/JmsConnectorsTest.java) { #run-flexi-flow-producer }

There are two implementations: One envelope type containing a messages to send to Jms, and one
envelope type containing only values to pass through. This allows messages to flow without producing any new messages
to Jms. This is primarily useful when committing offsets back to Kakfa, or when acknowledging Jms messages after sending
the outcome of processing them back to Jms.

Scala
: @@snip [snip](/jms/src/test/scala/akka/stream/alpakka/jms/scaladsl/JmsConnectorsSpec.scala) { #run-flexi-flow-pass-through-producer }

Java
: @@snip [snip](/jms/src/test/java/akka/stream/alpakka/jms/javadsl/JmsConnectorsTest.java) { #run-flexi-flow-pass-through-producer }

### Configuring the Producer

Scala
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.jms

object JmsProducerMessage {

sealed trait Envelope[+M <: JmsMessage, +PassThrough] {
def passThrough: PassThrough
}

def message[M <: JmsMessage, PassThrough](message: M, passThrough: PassThrough): Envelope[M, PassThrough] =
Message(message, passThrough)

def passThroughMessage[M <: JmsMessage, PassThrough](passThrough: PassThrough): Envelope[M, PassThrough] =
PassThroughMessage(passThrough)

final case class Message[+M <: JmsMessage, +PassThrough](message: M, passThrough: PassThrough)
extends Envelope[M, PassThrough]

final case class PassThroughMessage[+M <: JmsMessage, +PassThrough](passThrough: PassThrough)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to document the use case for PassThroughMessage similar to the Scaladocs in the Kafka case, or document it in the main documentation. Otherwise it may not be clear when and why to use this class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I'll add a section in the docs explaining that this allows to ack messages consumed without producing, and without the need for fan-out flows.

extends Envelope[M, PassThrough]
}
57 changes: 35 additions & 22 deletions jms/src/main/scala/akka/stream/alpakka/jms/JmsProducerStage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.stream.alpakka.jms
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream._
import akka.stream.alpakka.jms.JmsProducerStage._
import akka.stream.alpakka.jms.JmsProducerMessage._
import akka.stream.impl.{Buffer, ReactiveStreamsCompliance}
import akka.stream.stage._
import akka.util.OptionVal
Expand All @@ -15,13 +16,14 @@ import scala.concurrent.Future
import scala.util.control.{NoStackTrace, NonFatal}
import scala.util.{Failure, Success, Try}

private[jms] final class JmsProducerStage[A <: JmsMessage](settings: JmsProducerSettings)
extends GraphStage[FlowShape[A, A]] {
private[jms] final class JmsProducerStage[A <: JmsMessage, PassThrough](settings: JmsProducerSettings)
extends GraphStage[FlowShape[Envelope[A, PassThrough], Envelope[A, PassThrough]]] {

private val in = Inlet[A]("JmsProducer.in")
private val out = Outlet[A]("JmsProducer.out")
private type E = Envelope[A, PassThrough]
private val in = Inlet[E]("JmsProducer.in")
private val out = Outlet[E]("JmsProducer.out")

override def shape: FlowShape[A, A] = FlowShape.of(in, out)
override def shape: FlowShape[E, E] = FlowShape.of(in, out)

override protected def initialAttributes: Attributes =
ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher")
Expand All @@ -42,7 +44,8 @@ private[jms] final class JmsProducerStage[A <: JmsMessage](settings: JmsProducer
private val jmsProducers: Buffer[JmsMessageProducer] = Buffer(settings.sessionCount, settings.sessionCount)

// in-flight messages with the producers that were used to send them.
private val inFlightMessagesWithProducer: Buffer[Holder[A]] = Buffer(settings.sessionCount, settings.sessionCount)
private val inFlightMessagesWithProducer: Buffer[Holder[E]] =
Buffer(settings.sessionCount, settings.sessionCount)

protected def jmsSettings: JmsProducerSettings = settings

Expand Down Expand Up @@ -72,17 +75,24 @@ private[jms] final class JmsProducerStage[A <: JmsMessage](settings: JmsProducer
override def onUpstreamFinish(): Unit = if (inFlightMessagesWithProducer.isEmpty) completeStage()

override def onPush(): Unit = {
val elem: A = grab(in)
// fetch a jms producer from the pool, and create a holder object to capture the in-flight message.
val jmsProducer = jmsProducers.dequeue()
val holder = new Holder(NotYetThere, futureCB, jmsProducer)
inFlightMessagesWithProducer.enqueue(holder)

// send the element asynchronously, notifying the holder of (successful or failed) completion.
Future {
jmsProducer.send(elem)
elem
}.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
val elem: E = grab(in)
elem match {
case m: Message[_, _] =>
// fetch a jms producer from the pool, and create a holder object to capture the in-flight message.
val jmsProducer = jmsProducers.dequeue()
val holder = new Holder[E](NotYetThere, futureCB, Some(jmsProducer))
inFlightMessagesWithProducer.enqueue(holder)

// send the element asynchronously, notifying the holder of (successful or failed) completion.
Future {
jmsProducer.send(m.message)
elem
}.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
case p: PassThroughMessage[_, _] =>
val holder = new Holder[E](NotYetThere, futureCB, None)
inFlightMessagesWithProducer.enqueue(holder)
holder(Success(elem))
}

// immediately ask for the next element if producers are available.
pullIfNeeded()
Expand All @@ -95,7 +105,7 @@ private[jms] final class JmsProducerStage[A <: JmsMessage](settings: JmsProducer
jmsConnection.foreach(_.close)
}

private val futureCB = getAsyncCallback[Holder[A]](
private val futureCB = getAsyncCallback[Holder[E]](
holder =>
holder.elem match {
case Success(_) => pushNextIfPossible() // on success, try to push out the new element.
Expand All @@ -104,7 +114,10 @@ private[jms] final class JmsProducerStage[A <: JmsMessage](settings: JmsProducer
)

private def pullIfNeeded(): Unit =
if (jmsProducers.nonEmpty && !hasBeenPulled(in)) tryPull(in) // only pull if a producer is available in the pool.
if (jmsProducers.nonEmpty // only pull if a producer is available in the pool.
&& !inFlightMessagesWithProducer.isFull // and a place is available in the in-flight queue.
&& !hasBeenPulled(in))
tryPull(in)

private def pushNextIfPossible(): Unit =
if (inFlightMessagesWithProducer.isEmpty) {
Expand All @@ -115,7 +128,7 @@ private[jms] final class JmsProducerStage[A <: JmsMessage](settings: JmsProducer
pullIfNeeded()
} else if (isAvailable(out)) {
val holder = inFlightMessagesWithProducer.dequeue()
jmsProducers.enqueue(holder.jmsProducer) // put back jms producer to the pool.
holder.jmsProducer.foreach(jmsProducers.enqueue) // put back jms producer to the pool.
holder.elem match {
case Success(elem) =>
push(out, elem)
Expand All @@ -125,7 +138,7 @@ private[jms] final class JmsProducerStage[A <: JmsMessage](settings: JmsProducer
}
}

private def handleFailure(ex: Throwable, holder: Holder[A]): Unit =
private def handleFailure(ex: Throwable, holder: Holder[E]): Unit =
holder.supervisionDirectiveFor(decider, ex) match {
case Supervision.Stop => failStage(ex) // fail only if supervision asks for it.
case _ => pushNextIfPossible()
Expand All @@ -142,7 +155,7 @@ private[jms] object JmsProducerStage {
*
* To get a condensed view of what the Holder is about, have a look there too.
*/
class Holder[A <: JmsMessage](var elem: Try[A], val cb: AsyncCallback[Holder[A]], val jmsProducer: JmsMessageProducer)
class Holder[A](var elem: Try[A], val cb: AsyncCallback[Holder[A]], val jmsProducer: Option[JmsMessageProducer])
extends (Try[A] => Unit) {

// To support both fail-fast when the supervision directive is Stop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.stream.alpakka.jms.javadsl
import java.util.concurrent.CompletionStage

import akka.stream.alpakka.jms.{JmsMessage, JmsProducerSettings}
import akka.stream.alpakka.jms.JmsProducerMessage._
import akka.stream.scaladsl.{Flow, Keep}
import akka.{Done, NotUsed}

Expand All @@ -23,6 +24,14 @@ object JmsProducer {
): akka.stream.javadsl.Flow[R, R, NotUsed] =
akka.stream.alpakka.jms.scaladsl.JmsProducer.flow(settings).asJava

/**
* Java API: Creates an [[JmsProducer]] for [[Envelope]]s
*/
def flexiFlow[R <: JmsMessage, PassThrough](
settings: JmsProducerSettings
): akka.stream.javadsl.Flow[Envelope[R, PassThrough], Envelope[R, PassThrough], NotUsed] =
akka.stream.alpakka.jms.scaladsl.JmsProducer.flexiFlow[R, PassThrough](settings).asJava

/**
* Java API: Creates an [[JmsProducer]] for [[JmsMessage]]s
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.stream.alpakka.jms.scaladsl
import akka.{Done, NotUsed}
import akka.stream.alpakka.jms._
import akka.stream.scaladsl.{Flow, Keep, Sink}
import akka.stream.alpakka.jms.JmsProducerMessage.Envelope

import scala.concurrent.Future

Expand All @@ -16,6 +17,20 @@ object JmsProducer {
* Scala API: Creates an [[JmsProducer]] for [[JmsMessage]]s
*/
def flow[T <: JmsMessage](settings: JmsProducerSettings): Flow[T, T, NotUsed] = {
require(settings.destination.isDefined, "Producer destination must be defined in producer flow")
Flow[T]
.map(m => JmsProducerMessage.Message(m, NotUsed))
.via(Flow.fromGraph(new JmsProducerStage(settings)))
.collectType[JmsProducerMessage.Message[T, NotUsed]]
.map(_.message)
}

/**
* Scala API: Creates an [[JmsProducer]] for [[Envelope]]s
*/
def flexiFlow[T <: JmsMessage, PassThrough](
settings: JmsProducerSettings
): Flow[Envelope[T, PassThrough], Envelope[T, PassThrough], NotUsed] = {
require(settings.destination.isDefined, "Producer destination must be defined in producer flow")
Flow.fromGraph(new JmsProducerStage(settings))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.KillSwitch;
import akka.stream.Materializer;
import akka.stream.alpakka.jms.*;
import akka.stream.alpakka.jms.JmsProducerMessage.*;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.javadsl.TestKit;
Expand Down Expand Up @@ -778,6 +781,75 @@ public void failAfterRetry() throws Exception {
});
}

@Test
public void passThroughMessageEnvelopes() throws Exception {
withServer(
ctx -> {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ctx.url);
// #run-flexi-flow-producer
Flow<Envelope<JmsTextMessage, String>, Envelope<JmsTextMessage, String>, NotUsed>
jmsProducer =
JmsProducer.flexiFlow(
JmsProducerSettings.create(connectionFactory).withQueue("test"));

List<String> data = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
List<Envelope<JmsTextMessage, String>> input = new ArrayList<>();
for (String s : data) {
input.add(JmsProducerMessage.message(JmsTextMessage.create(s), s));
}

CompletionStage<List<String>> result =
Source.from(input)
.via(jmsProducer)
.map(Envelope::passThrough)
.runWith(Sink.seq(), materializer);
// #run-flexi-flow-producer
assertEquals(data, result.toCompletableFuture().get());
});
}

@Test
public void passThroughEmptyMessageEnvelopes() throws Exception {
withServer(
ctx -> {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ctx.url);

Pair<KillSwitch, CompletionStage<List<String>>> switchAndItems =
JmsConsumer.textSource(
JmsConsumerSettings.create(connectionFactory)
.withBufferSize(10)
.withQueue("test"))
.toMat(Sink.seq(), Keep.both())
.run(materializer);

// #run-flexi-flow-pass-through-producer
Flow<Envelope<JmsTextMessage, String>, Envelope<JmsTextMessage, String>, NotUsed>
jmsProducer =
JmsProducer.flexiFlow(
JmsProducerSettings.create(connectionFactory).withQueue("test"));

List<String> data = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
List<Envelope<JmsTextMessage, String>> input = new ArrayList<>();
for (String s : data) {
input.add(JmsProducerMessage.passThroughMessage(s));
}

CompletionStage<List<String>> result =
Source.from(input)
.via(jmsProducer)
.map(Envelope::passThrough)
.runWith(Sink.seq(), materializer);
// #run-flexi-flow-pass-through-producer

assertEquals(data, result.toCompletableFuture().get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to assert that none of these pass-through envelopes are published?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll add an assertion for that


Thread.sleep(500);

switchAndItems.first().shutdown();
assertTrue(switchAndItems.second().toCompletableFuture().get().isEmpty());
});
}

private static ActorSystem system;
private static Materializer materializer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,60 @@ class JmsConnectorsSpec extends JmsSpec with MockitoSugar {
// Due to buffering, it will finish re-connecting before stream finishes.
connectCount shouldBe 5
}

"pass through message envelopes" in withServer() { ctx =>
val connectionFactory = new ActiveMQConnectionFactory(ctx.url)

//#run-flexi-flow-producer
val jmsProducer = JmsProducer.flexiFlow[JmsTextMessage, String](
JmsProducerSettings(connectionFactory).withQueue("test")
)

val data = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
val in = data.map(t => JmsProducerMessage.Message(JmsTextMessage(t), t))

val result = Source(in).via(jmsProducer).map(_.passThrough).runWith(Sink.seq)
//#run-flexi-flow-producer

result.futureValue shouldEqual data

val sentData =
JmsConsumer
.textSource(JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("test"))
.take(data.size)
.runWith(Sink.seq)

sentData.futureValue shouldEqual data
}

"pass through empty envelopes" in {
val connectionFactory = mock[ConnectionFactory]
val connection = mock[Connection]
val session = mock[Session]
val producer = mock[MessageProducer]
val message = mock[TextMessage]

when(connectionFactory.createConnection()).thenReturn(connection)
when(connection.createSession(anyBoolean(), anyInt())).thenReturn(session)
when(session.createProducer(any[javax.jms.Destination])).thenReturn(producer)
when(session.createTextMessage(any[String])).thenReturn(message)

//#run-flexi-flow-pass-through-producer
val jmsProducer = JmsProducer.flexiFlow[JmsTextMessage, String](
JmsProducerSettings(connectionFactory).withQueue("topic")
)

val data = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
val in = data.map(t => JmsProducerMessage.PassThroughMessage(t))

val result = Source(in).via(jmsProducer).map(_.passThrough).runWith(Sink.seq)
//#run-flexi-flow-pass-through-producer

result.futureValue shouldEqual data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to mock this one instead so you can actually collect the messages that are published vs the messages that are not published. You can achieve the same with creating a consumer to validate your messages as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean mocking the connection factory (and session etc) and asserting that send was never invoked?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I'm thinking. Thanks!


verify(session, never()).createTextMessage(any[String])
verify(producer, never()).send(any[javax.jms.Destination], any[Message], any[Int], any[Int], any[Long])
}
}

"publish and subscribe with a durable subscription" in withServer() { ctx =>
Expand Down