-
Notifications
You must be signed in to change notification settings - Fork 647
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Jms connection status #1252
Jms connection status #1252
Conversation
This PR extends #1227, don't merge this before the former is merged/approved. |
Finally, travis failed with the test I expeced to fail (due to re-connection failing): https://travis-ci.org/akka/alpakka/jobs/435270299#L690 |
745a6ef
to
92b1192
Compare
rebased on top of master |
I am wondering if exposing a boolean as a connectivity status is useful enough. An alternative would be to expose a WDYT? |
The boolean connectivity status would be enough for my use case, as I don't need immediate connectivity updates - having a delay of 1 second would be fine for me. That said, I see having a stream of connectivity updates could also be beneficial if a fast response to connectivity changes is needed, so let's explore how this could look like. How would you suggest expose the |
92b1192
to
13b0456
Compare
I think it should be materialized value. Together with a KillSwitch, like you have connectivity status currently. |
Hi @2m, I've spiked the change and one challenge is that the source needs to be pre-materialized, and that can only happen during |
How are you building the status source? If it is, for example, a Alternatively, to convert from val fs = Future[Source[T, ...]]
val s: Source[T, ...] = Source.fromFuture(fs).flatMapConcat(identity) |
@2m the Future conversion is elegant, will do! 👍It will take me a couple to assemble something to show; the |
f59e84f
to
6896007
Compare
@2m here we go - I rebased on master and squashed everything into one commit. Have a look and tell me what you think. I will add a couple more tests for connection status, and for correct handling of reconnection failures on each stage (creating connection, session, consumer/producer, and destination). |
Here are some more tests. I was thinking to replace the status update |
So I went ahead and did the split of "stopping" into "failing" and "completing". I've also updated the docs and updated all factory methods in For the Java-API, this is a breaking change. This PR is complete from my side, please have a look. |
Stream-based connection status Connection status is now exposed as stream of updates. For the Java interface, a Java enum is exposed, for the Scala interface, a sealed trait with case objects and a case class is used. Fix reconnect attempts Now also retry to connect (for a limited amount of times and after delay) if creating sessions, consumers, producers or destinations fail Fix flaky 'disconnect exceptional completion' test Fixes akka#1231 Fixes akka#1031
- Add tests for connection status and reconnects on session creation failures - Connecting status attempt number was off-by-one for JmsConsumer - Session creation failure was not properly handled
- Also made sure that published states are natural: no intermediate disconnected state is published on failure - Also improved logic handling consumer downstream completion - Also ensured that access to state and state queue is safe - Added more tests for connection status
- Breaking change: using JmsConsumerControl and JmsProducerStatus in all JmsProducer and JmsConsumer factory methods - Adjusted and extended docs - Also renamed stage state source to connectorState - Also fixed flaky test "abort connection on security exception" - Also simplified openConnection method
- Reverted openConnection simplification, as it did not handle exceptions on connection.setExceptionListener - Added test that demonstrates the simplification was invalid - Removed unnecessary broker forced restart in Spec
e9d2889
to
82b235a
Compare
rebased once more on top of master. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Great work @andreas-schroeder!
Thanks @2m - also for your input, I think this PR wouldn't be in that shape without your ideas. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
source: stream.scaladsl.Source[jms.scaladsl.JmsConnectorState, NotUsed] | ||
): Source[JmsConnectorState, NotUsed] = | ||
source.map { | ||
case Disconnected => JmsConnectorState.Disconnected |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the Scala states could contain the Java state and offer asJava
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, that introduce a dependency from the scaladsl to the javadsl, would that be fine for you?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, even if the dependency goes the other direction for most cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implemented with 6a406b7
Thank you for your continued efforts with the JMS connector. |
when is the next Milestone release? |
This is my proposal for JMS connection status.
Fixes #1251