Skip to content

Commit

Permalink
Address reviewer comments
Browse files Browse the repository at this point in the history
- extended passthrough tests
- extended documentation of passthrough messages
  • Loading branch information
andreas-sa-schroeder committed Sep 17, 2018
1 parent 30ba5fc commit 9911170
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 6 deletions.
4 changes: 3 additions & 1 deletion docs/src/main/paradox/jms.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ Java
: @@snip [snip](/jms/src/test/java/akka/stream/alpakka/jms/javadsl/JmsConnectorsTest.java) { #run-flexi-flow-producer }

There are two implementations: One envelope type containing a messages to send to Jms, and one
envelope type containing only values to pass through:
envelope type containing only values to pass through. This allows messages to flow without producing any new messages
to Jms. This is primarily useful when committing offsets back to Kakfa, or when acknowledging Jms messages after sending
the outcome of processing them back to Jms.

Scala
: @@snip [snip](/jms/src/test/scala/akka/stream/alpakka/jms/scaladsl/JmsConnectorsSpec.scala) { #run-flexi-flow-pass-through-producer }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.KillSwitch;
import akka.stream.Materializer;
import akka.stream.alpakka.jms.*;
import akka.stream.alpakka.jms.JmsProducerMessage.*;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.javadsl.TestKit;
Expand Down Expand Up @@ -811,6 +813,15 @@ public void passThroughEmptyMessageEnvelopes() throws Exception {
withServer(
ctx -> {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ctx.url);

Pair<KillSwitch, CompletionStage<List<String>>> switchAndItems =
JmsConsumer.textSource(
JmsConsumerSettings.create(connectionFactory)
.withBufferSize(10)
.withQueue("test"))
.toMat(Sink.seq(), Keep.both())
.run(materializer);

// #run-flexi-flow-pass-through-producer
Flow<Envelope<JmsTextMessage, String>, Envelope<JmsTextMessage, String>, NotUsed>
jmsProducer =
Expand All @@ -831,6 +842,11 @@ public void passThroughEmptyMessageEnvelopes() throws Exception {
// #run-flexi-flow-pass-through-producer

assertEquals(data, result.toCompletableFuture().get());

Thread.sleep(500);

switchAndItems.first().shutdown();
assertTrue(switchAndItems.second().toCompletableFuture().get().isEmpty());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1128,11 +1128,11 @@ class JmsConnectorsSpec extends JmsSpec with MockitoSugar {
}

"pass through message envelopes" in withServer() { ctx =>
val producerConnectionFactory = new ActiveMQConnectionFactory(ctx.url)
val connectionFactory = new ActiveMQConnectionFactory(ctx.url)

//#run-flexi-flow-producer
val jmsProducer = JmsProducer.flexiFlow[JmsTextMessage, String](
JmsProducerSettings(producerConnectionFactory).withQueue("topic")
JmsProducerSettings(connectionFactory).withQueue("test")
)

val data = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
Expand All @@ -1142,14 +1142,31 @@ class JmsConnectorsSpec extends JmsSpec with MockitoSugar {
//#run-flexi-flow-producer

result.futureValue shouldEqual data

val sentData =
JmsConsumer
.textSource(JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("test"))
.take(data.size)
.runWith(Sink.seq)

sentData.futureValue shouldEqual data
}

"pass through empty envelopes" in withServer() { ctx =>
val producerConnectionFactory = new ActiveMQConnectionFactory(ctx.url)
"pass through empty envelopes" in {
val connectionFactory = mock[ConnectionFactory]
val connection = mock[Connection]
val session = mock[Session]
val producer = mock[MessageProducer]
val message = mock[TextMessage]

when(connectionFactory.createConnection()).thenReturn(connection)
when(connection.createSession(anyBoolean(), anyInt())).thenReturn(session)
when(session.createProducer(any[javax.jms.Destination])).thenReturn(producer)
when(session.createTextMessage(any[String])).thenReturn(message)

//#run-flexi-flow-pass-through-producer
val jmsProducer = JmsProducer.flexiFlow[JmsTextMessage, String](
JmsProducerSettings(producerConnectionFactory).withQueue("topic")
JmsProducerSettings(connectionFactory).withQueue("topic")
)

val data = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
Expand All @@ -1159,6 +1176,9 @@ class JmsConnectorsSpec extends JmsSpec with MockitoSugar {
//#run-flexi-flow-pass-through-producer

result.futureValue shouldEqual data

verify(session, never()).createTextMessage(any[String])
verify(producer, never()).send(any[javax.jms.Destination], any[Message], any[Int], any[Int], any[Long])
}
}

Expand Down

0 comments on commit 9911170

Please sign in to comment.