Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jms connection status #1252

Merged
merged 9 commits into from
Oct 30, 2018
8 changes: 4 additions & 4 deletions doc-examples/src/main/java/jms/javasamples/JmsToFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.IOResult;
import akka.stream.KillSwitch;
import akka.stream.Materializer;
import akka.stream.alpakka.jms.JmsConsumerSettings;
import akka.stream.alpakka.jms.JmsProducerSettings;
import akka.stream.alpakka.jms.javadsl.JmsConsumer;
import akka.stream.alpakka.jms.javadsl.JmsConsumerControl;
import akka.stream.alpakka.jms.javadsl.JmsProducer;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Keep;
Expand Down Expand Up @@ -57,22 +57,22 @@ private void run() throws Exception {
enqueue(connectionFactory, "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
// #sample

Source<String, KillSwitch> jmsSource = // (1)
Source<String, JmsConsumerControl> jmsSource = // (1)
JmsConsumer.textSource(
JmsConsumerSettings.create(connectionFactory).withBufferSize(10).withQueue("test"));

Sink<ByteString, CompletionStage<IOResult>> fileSink =
FileIO.toPath(Paths.get("target/out.txt")); // (2)

Pair<KillSwitch, CompletionStage<IOResult>> pair =
Pair<JmsConsumerControl, CompletionStage<IOResult>> pair =
jmsSource // : String
.map(ByteString::fromString) // : ByteString (3)
.toMat(fileSink, Keep.both())
.run(materializer);

// #sample

KillSwitch runningSource = pair.first();
JmsConsumerControl runningSource = pair.first();
CompletionStage<IOResult> streamCompletion = pair.second();

runningSource.shutdown();
Expand Down
14 changes: 7 additions & 7 deletions doc-examples/src/main/java/jms/javasamples/JmsToHttpGet.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,23 @@
package jms.javasamples;

// #sample

import akka.Done;
import akka.actor.ActorSystem;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.KillSwitch;
import akka.stream.Materializer;
import akka.stream.alpakka.jms.JmsConsumerSettings;
import akka.stream.alpakka.jms.JmsProducerSettings;
import akka.stream.alpakka.jms.javadsl.JmsConsumer;
import akka.stream.alpakka.jms.javadsl.JmsConsumerControl;
import akka.stream.alpakka.jms.javadsl.JmsProducer;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

// #sample

import playground.ActiveMqBroker;
import playground.WebServer;
import scala.concurrent.ExecutionContext;
Expand All @@ -32,6 +30,8 @@
import java.util.Arrays;
import java.util.concurrent.CompletionStage;

// #sample

public class JmsToHttpGet {

public static void main(String[] args) throws Exception {
Expand Down Expand Up @@ -62,12 +62,12 @@ private void run() throws Exception {

final Http http = Http.get(system);

Source<String, KillSwitch> jmsSource = // (1)
Source<String, JmsConsumerControl> jmsSource = // (1)
JmsConsumer.textSource(
JmsConsumerSettings.create(connectionFactory).withBufferSize(10).withQueue("test"));

int parallelism = 4;
Pair<KillSwitch, CompletionStage<Done>> pair =
Pair<JmsConsumerControl, CompletionStage<Done>> pair =
jmsSource // : String
.map(ByteString::fromString) // : ByteString (2)
.map(
Expand All @@ -79,7 +79,7 @@ private void run() throws Exception {
.run(materializer);
// #sample
Thread.sleep(5 * 1000);
KillSwitch runningSource = pair.first();
JmsConsumerControl runningSource = pair.first();
CompletionStage<Done> streamCompletion = pair.second();

runningSource.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import akka.stream.alpakka.jms.JmsConsumerSettings;
import akka.stream.alpakka.jms.JmsProducerSettings;
import akka.stream.alpakka.jms.javadsl.JmsConsumer;
import akka.stream.alpakka.jms.javadsl.JmsConsumerControl;
import akka.stream.alpakka.jms.javadsl.JmsProducer;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Keep;
Expand Down Expand Up @@ -56,12 +57,12 @@ private void run() throws Exception {
enqueue(connectionFactory, "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
// #sample

Source<String, KillSwitch> jmsConsumer = // (1)
Source<String, JmsConsumerControl> jmsConsumer = // (1)
JmsConsumer.textSource(
JmsConsumerSettings.create(connectionFactory).withBufferSize(10).withQueue("test"));

int parallelism = 5;
Pair<KillSwitch, CompletionStage<Done>> pair =
Pair<JmsConsumerControl, CompletionStage<Done>> pair =
jmsConsumer // : String
.map(ByteString::fromString) // : ByteString (2)
.zipWithIndex() // : Pair<ByteString, Long> (3)
Expand Down
51 changes: 25 additions & 26 deletions doc-examples/src/main/java/jms/javasamples/JmsToWebSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,27 @@
package jms.javasamples;

// #sample

import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.Pair;

import akka.http.javadsl.Http;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.model.ws.WebSocketRequest;
import akka.http.javadsl.model.ws.WebSocketUpgradeResponse;

import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.KillSwitch;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import akka.stream.alpakka.jms.JmsConsumerSettings;
import akka.stream.alpakka.jms.JmsProducerSettings;
import akka.stream.alpakka.jms.javadsl.JmsConsumer;
import akka.stream.alpakka.jms.javadsl.JmsConsumerControl;
import akka.stream.alpakka.jms.javadsl.JmsProducer;

import java.util.concurrent.CompletionStage;
// #sample
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import playground.ActiveMqBroker;
import playground.WebServer;
import scala.concurrent.ExecutionContext;
Expand All @@ -39,6 +34,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// #sample

public class JmsToWebSocket {

Expand Down Expand Up @@ -70,28 +68,29 @@ private void run() throws Exception {

final Http http = Http.get(system);

Source<String, KillSwitch> jmsSource = // (1)
Source<String, JmsConsumerControl> jmsSource = // (1)
JmsConsumer.textSource(
JmsConsumerSettings.create(connectionFactory).withBufferSize(10).withQueue("test"));

Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> webSocketFlow = // (2)
http.webSocketClientFlow(WebSocketRequest.create("ws://localhost:8080/webSocket/ping"));

int parallelism = 4;
Pair<Pair<KillSwitch, CompletionStage<WebSocketUpgradeResponse>>, CompletionStage<Done>> pair =
jmsSource // : String
.map(
s -> {
Message msg = TextMessage.create(s);
return msg;
}) // : Message (3)
.viaMat(webSocketFlow, Keep.both()) // : Message (4)
.mapAsync(parallelism, this::wsMessageToString) // : String (5)
.map(s -> "client received: " + s) // : String (6)
.toMat(Sink.foreach(System.out::println), Keep.both()) // (7)
.run(materializer);
Pair<Pair<JmsConsumerControl, CompletionStage<WebSocketUpgradeResponse>>, CompletionStage<Done>>
pair =
jmsSource // : String
.map(
s -> {
Message msg = TextMessage.create(s);
return msg;
}) // : Message (3)
.viaMat(webSocketFlow, Keep.both()) // : Message (4)
.mapAsync(parallelism, this::wsMessageToString) // : String (5)
.map(s -> "client received: " + s) // : String (6)
.toMat(Sink.foreach(System.out::println), Keep.both()) // (7)
.run(materializer);
// #sample
KillSwitch runningSource = pair.first().first();
JmsConsumerControl runningSource = pair.first().first();
CompletionStage<WebSocketUpgradeResponse> wsUpgradeResponse = pair.first().second();
CompletionStage<Done> streamCompletion = pair.second();

Expand Down
8 changes: 4 additions & 4 deletions doc-examples/src/main/scala/jms/JmsToFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ package jms
// #sample
import java.nio.file.Paths

import akka.stream.IOResult
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.JmsConsumer
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.stream.{IOResult, KillSwitch}
import akka.util.ByteString

import scala.concurrent.Future
Expand All @@ -28,15 +28,15 @@ object JmsToFile extends JmsSampleBase with App {
// format: off
// #sample

val jmsSource: Source[String, KillSwitch] = // (1)
val jmsSource: Source[String, JmsConsumerControl] = // (1)
JmsConsumer.textSource(
JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("test")
)

val fileSink: Sink[ByteString, Future[IOResult]] = // (2)
FileIO.toPath(Paths.get("target/out.txt"))

val (runningSource, finished): (KillSwitch, Future[IOResult]) =
val (runningSource, finished): (JmsConsumerControl, Future[IOResult]) =
// stream element type
jmsSource //: String
.map(ByteString(_)) //: ByteString (3)
Expand Down
7 changes: 3 additions & 4 deletions doc-examples/src/main/scala/jms/JmsToHttpGet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ package jms
import akka.Done
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.KillSwitch
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.JmsConsumer
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString

Expand All @@ -30,12 +29,12 @@ object JmsToHttpGet extends JmsSampleBase with App {

// format: off
// #sample
val jmsSource: Source[String, KillSwitch] = // (1)
val jmsSource: Source[String, JmsConsumerControl] = // (1)
JmsConsumer.textSource(
JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("test")
)

val (runningSource, finished): (KillSwitch, Future[Done]) =
val (runningSource, finished): (JmsConsumerControl, Future[Done]) =
jmsSource //: String
.map(ByteString(_)) //: ByteString (2)
.map { bs =>
Expand Down
5 changes: 2 additions & 3 deletions doc-examples/src/main/scala/jms/JmsToOneFilePerMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ package jms
// #sample
import java.nio.file.Paths

import akka.stream.KillSwitch
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.JmsConsumer
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.util.ByteString

Expand All @@ -27,7 +26,7 @@ object JmsToOneFilePerMessage extends JmsSampleBase with App {
// format: off
// #sample

val jmsSource: Source[String, KillSwitch] = // (1)
val jmsSource: Source[String, JmsConsumerControl] = // (1)
JmsConsumer.textSource(
JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("test")
)
Expand Down
14 changes: 6 additions & 8 deletions doc-examples/src/main/scala/jms/JmsToWebSocket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,16 @@ import akka.Done
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws.{WebSocketRequest, WebSocketUpgradeResponse}

import akka.stream.KillSwitch
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.JmsConsumer
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Future
// #sample
import scala.concurrent.duration.DurationInt
import playground.{ActiveMqBroker, WebServer}

import scala.concurrent.duration.DurationInt

object JmsToWebSocket extends JmsSampleBase with App {

ActiveMqBroker.start()
Expand All @@ -32,15 +30,15 @@ object JmsToWebSocket extends JmsSampleBase with App {
// format: off
// #sample

val jmsSource: Source[String, KillSwitch] =
val jmsSource: Source[String, JmsConsumerControl] =
JmsConsumer.textSource( // (1)
JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("test")
)

val webSocketFlow: Flow[ws.Message, ws.Message, Future[WebSocketUpgradeResponse]] = // (2)
Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080/webSocket/ping"))

val ((runningSource, wsUpgradeResponse), streamCompletion): ((KillSwitch, Future[WebSocketUpgradeResponse]), Future[Done]) =
val ((runningSource, wsUpgradeResponse), streamCompletion): ((JmsConsumerControl, Future[WebSocketUpgradeResponse]), Future[Done]) =
// stream element type
jmsSource //: String
.map(ws.TextMessage(_)) //: ws.TextMessage (3)
Expand Down
16 changes: 14 additions & 2 deletions docs/src/main/paradox/jms.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,12 @@ Java
If a send operation finally fails, the stage also fails unless a different supervision strategy is applied. The
producer stage honours stream supervision.

### Observing connectivity and state of a JMS producer

All JMS producer's materialized values are of type `JmsProducerStatus`. This provides a `connectorState` method returning
a `Source` of `JmsConnectorState` updates that publishes connection attempts, disconnections, completions and failures.
The source is completed after the JMS producer completes or fails.

## Receiving messages from a JMS provider

@java[@scaladoc[JmsConsumer](akka.stream.alpakka.jms.javadsl.JmsConsumer$)]@scala[@scaladoc[JmsConsumer](akka.stream.alpakka.jms.scaladsl.JmsConsumer$)] contains factory methods to facilitate
Expand Down Expand Up @@ -618,9 +624,15 @@ Java

### Stopping a JMS Source

All JMS sources materialize to a `KillSwitch` to allow safely stopping consumption without message loss for transactional and acknowledged messages, and with minimal message loss for the simple JMS source.
All JMS sources materialize to a `JmsConsumerControl` to allow safely stopping consumption without message loss for transactional and acknowledged messages, and with minimal message loss for the simple JMS source.

To stop consumption safely, call `shutdown()` on the `JmsConsumerControl` that is the materialized value of the source. To abruptly abort consumption (without concerns for message loss), call `abort(Throwable)` on the `JmsConsumerControl`.

### Observing connectivity and state of a JMS Source

To stop consumption safely, call `shutdown()` on the `KillSwitch` that is the materialized value of the source. To abruptly abort consumption (without concerns for message loss), call `abort(Throwable)` on the `KillSwitch`.
All JMS sources' materialized values are `JmsConsumerControl`s. This provides a `connectorState` method returning
a `Source` of `JmsConnectorState` updates that publishes connection attempts, disconnections, completions and failures.
The source is completed after the JMS source completes or fails.

## Using IBM MQ

Expand Down
2 changes: 0 additions & 2 deletions jms/src/main/scala/akka/stream/alpakka/jms/Jms.scala
Original file line number Diff line number Diff line change
Expand Up @@ -206,5 +206,3 @@ final case class JmsBrowseSettings(connectionFactory: ConnectionFactory,
def withAcknowledgeMode(acknowledgeMode: AcknowledgeMode): JmsBrowseSettings =
copy(acknowledgeMode = Option(acknowledgeMode))
}

final case class StopMessageListenerException() extends Exception("Stopping MessageListener.")
Loading