JMS: Adding connection timeouts and retries (fixes #1192 and #1193) #1194
…at start and after failure.
getAsyncCallback[JmsSession] { session =>
  jmsSessions :+= session
  onSessionOpened(session)
}
Moved to SourceStageLogic in JmsConsumerStage.scala, since this function only applies to consumers.
-  case m: ActorMaterializer => m.system.dispatchers.lookup(dispatcher.dispatcher)
-  case x => throw new IllegalArgumentException(s"Stage only works with the ActorMaterializer, was: $x")
-}
+ActorMaterializerHelper.downcast(materializer).system.dispatchers.lookup(dispatcher.dispatcher)
Replaced with Akka library call providing the same functionality.
👍
Hmm, the build failure does not give me any indication what I could have messed up.
No worries, it's not related to your changes.
Thanks for the heads up! No worries, I'll rebase when time comes.
…connection exception issues using mock connections and artifacts
Added tests with mock connection failure and start issues. Note this change only affects consumers for now. A bit more work is required to make it work with producers.
The other JMS PR is now merged; you have compilation errors with Scala 2.11.
We have been focusing on the consumer ( Making the publisher safely deal with reconnects (without message losses) is not quite trivial, especially with session pooling and publishes in flight. I am contemplating whether publisher reconnects should be a separate task/PR altogether. Please voice your opinion.
Sounds good. Please focus on the consumer side and leave the producer changes out.
# Conflicts:
#   jms/src/main/scala/akka/stream/alpakka/jms/JmsConnector.scala
#   jms/src/main/scala/akka/stream/alpakka/jms/JmsProducerStage.scala
#   jms/src/test/java/akka/stream/alpakka/jms/javadsl/JmsConnectorsTest.java
@@ -552,12 +558,19 @@ class JmsConnectorsSpec extends JmsSpec with MockitoSugar {
    JmsProducerSettings(connectionFactory).withQueue("numbers")
  )

  val completionFuture: Future[Done] = Source
    .failed[JmsTextMessage](new RuntimeException("Simulated error"))
This test was unreliable. Since jmsConnection in JmsConnector is set through an async callback, a failure that arrives early can leave the jmsConnection (which is not yet referenced) unclosed. The failure needs to occur mid-stream rather than at the beginning for this test to be reliable.
case object Connected extends ConnectionStatus
case object TimedOut extends ConnectionStatus

protected def initSessionAsync(withReconnect: Boolean = true): Unit = {
withReconnect is set to false for the publisher at this point, until we can be certain that reconnects behave correctly with the publisher. Then even the if block below can be taken out.
# Conflicts:
#   jms/src/test/scala/akka/stream/alpakka/jms/scaladsl/JmsConnectorsSpec.scala
…e timing issues. The connection is ONLY used for shutdown. So no performance impact otherwise.
@@ -30,10 +35,6 @@ private[jms] trait JmsConnector[S <: JmsSession] { this: GraphStageLogic =>

  protected val fail: AsyncCallback[Throwable] = getAsyncCallback[Throwable](e => failStage(e))

  private val onConnection: AsyncCallback[jms.Connection] = getAsyncCallback[jms.Connection] { c =>
    jmsConnection = Some(c)
  }
Removed setting the connection through an AsyncCallback; it is now set directly instead. Some tests rely on the connection being available immediately after it is established and fail if it is not, so this minimizes the risk of such failures. Also, the connection is only accessed at startup and shutdown, so making it @volatile does not incur runtime overhead.
Hmm, apparently this is still not enough. That test is still failing despite tightening up the timeline.
OK. We need to guarantee the connection is closed in an async system. To do so, we need to hold the connection in a future, which ensures the connection will be closed even if it is resolved after postStop is called. This should settle it. However, this test has expanded the scope of the change.
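A minimal sketch of the idea described in this comment, not the code from this PR: if the connection is held as a Future, the shutdown path can register the close on that Future, so the connection is closed whether it resolves before or after shutdown. The names Conn, FakeConn and closeWhenAvailable are hypothetical stand-ins introduced for illustration (Conn stands in for javax.jms.Connection).

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object CloseOnStopSketch {
  // Hypothetical stand-in for javax.jms.Connection.
  trait Conn { def close(): Unit }

  // Hypothetical test double that records whether close() was called.
  final class FakeConn extends Conn {
    val closed = new AtomicBoolean(false)
    def close(): Unit = closed.set(true)
  }

  // Intended to be called from the shutdown path (e.g. postStop).
  // The close is attached to the Future, so it also runs when the
  // connection only becomes available after shutdown has started.
  def closeWhenAvailable(connection: Future[Conn])(implicit ec: ExecutionContext): Future[Unit] =
    connection.map(_.close())
}
```

With this shape, the order of "connection established" and "stage stopped" no longer matters for cleanup; a failed connection Future simply results in a failed close Future and nothing to close.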
Great work!
Yeah, these tests are hard to get super-stable.
if (status.compareAndSet(Connecting, TimedOut)) {
  connectionRef.get.foreach(_.close())
  connectionRef.set(None)
  Future.failed(new TimeoutException("Timed out trying to establish connection"))
You could possibly add the applicable values in the error messages.
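One way the suggestion above might look, as a sketch under assumptions rather than the change that was actually made: the timeout handler wins the race only if the status is still Connecting, and the exception message then carries the configured timeout. The onTimeout helper and its connectTimeout parameter are hypothetical names for illustration; the status objects mirror the ones in the diff.

```scala
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration._

object ConnectTimeoutSketch {
  sealed trait ConnectionStatus
  case object Connecting extends ConnectionStatus
  case object Connected extends ConnectionStatus
  case object TimedOut extends ConnectionStatus

  // Hypothetical helper: called when the timer fires. Returns an exception
  // only if the timeout won the race against a successful connect.
  def onTimeout(status: AtomicReference[ConnectionStatus],
                connectTimeout: FiniteDuration): Option[TimeoutException] =
    if (status.compareAndSet(Connecting, TimedOut))
      Some(new TimeoutException(s"Timed out after $connectTimeout trying to establish connection"))
    else None
}
```

Including the value makes it easier to tell from a log line alone whether the timeout setting itself is too tight.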
LGTM.
@andreas-schroeder Would you like to review, as well?
def maxWaitTime: Duration =
  if (maxRetries < 0) Duration.Inf
  else {
    val totalWaitTime = (0 to maxRetries).map(connectTimeout + waitTime(_)).reduce(_ + _)
super-cool idea to have that maxWaitTime on the settings to see how long it will wait in total!
How about
val totalWaitTime = (0 to maxRetries).map(connectTimeout + waitTime(_).min(maxBackoff)).reduce(_ + _)
to account for maxBackoff?
After thinking this through, we do not need maxWaitTime any longer. It was used to determine how long to block while waiting for the connection in JmsProducerStage. After rebasing onto your change, where the connection is handled asynchronously there, this method is no longer needed. Let me remove it.
}

case class ConnectionRetryException(message: String, cause: Throwable) extends Exception(message, cause)
How about moving that exception to JmsExceptions.scala? I think either all exceptions should live there, or all should live in Jms.scala.
Sure. Will do.
  else Future.failed(ConnectionRetryException(s"Could not establish connection after $n retries.", t))
} else {
  val delay = if (maxed) maxBackoff else waitTime(nextN)
  if (delay >= maxBackoff) {
why not just
val delay = waitTime(nextN).min(maxBackoff)
after(delay, system.scheduler) {
openConnectionWithRetry(startConnection, nextN)
}
? :)
Since waitTime is an exponential function, there is a very small chance that a very large backoff overflows, throwing this logic haywire. That's why I really want to stop calculating waitTime once the max is reached.
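The concern above can be illustrated with a small sketch. It is not the connector's implementation: the exponential formula (initial * factor^(n-1)), the nextDelay helper and its parameters are assumptions made for illustration. The point it shows is carrying a "cap already reached" flag forward, so the exponential is never evaluated again once it has hit maxBackoff.

```scala
import scala.concurrent.duration._

object CappedBackoffSketch {
  // Hypothetical helper. Returns the delay before retry n (1-based) and
  // whether the cap has been reached. prevMaxed is the flag returned by the
  // previous call; once it is true the exponential is no longer computed.
  def nextDelay(initial: FiniteDuration,
                factor: Double,
                maxBackoff: FiniteDuration,
                n: Int,
                prevMaxed: Boolean): (FiniteDuration, Boolean) =
    if (prevMaxed) (maxBackoff, true)
    else {
      // Computed as a Double in milliseconds, so a huge value becomes a large
      // number or Infinity and is caught by the comparison below.
      val rawMillis = initial.toMillis * math.pow(factor, n - 1)
      if (rawMillis >= maxBackoff.toMillis) (maxBackoff, true)
      else (rawMillis.toLong.millis, false)
    }
}
```

The simpler waitTime(nextN).min(maxBackoff) form is equivalent as long as waitTime itself cannot overflow or throw for large n; the flag only matters when that cannot be guaranteed.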
@@ -208,7 +208,7 @@ final class JmsTxSourceStage(settings: JmsConsumerSettings)
 abstract class SourceStageLogic[T](shape: SourceShape[T],
                                    out: Outlet[T],
                                    settings: JmsConsumerSettings,
-                                   attributes: Attributes)
+                                   inheritedAttributes: Attributes)
great rename, that one was bothering me as well 👍
Throwable exception = tryResult.failed().get();
assertTrue(
    "Did not fail with a ConnectionRetryException",
    ConnectionRetryException.class.isAssignableFrom(exception.getClass()));
Isn't that just exception instanceof ConnectionRetryException?
No, you're absolutely right. Was looking for a better way to express as well. Tells me my Java is getting a little rusty :-) Let me make this change, and the next one.
    ConnectionRetryException.class.isAssignableFrom(exception.getClass()));
assertTrue(
    "Cause of failure is not a JMSException",
    JMSException.class.isAssignableFrom(exception.getCause().getClass()));
exception.getCause() instanceof JMSException, or am I wrong?
        JMSException.class.isAssignableFrom(exception.getCause().getClass()));
  });
}
👍 for that test. Great idea to check for the total time passed.
I've commented on some (really) minor things I found while reading through the PR that could be either fixed right now, or in a subsequent change on the fly.
I would love to either build on top of that or help out on implementing the producer send retries.
Great work.
Thank you very much for the comments @andreas-schroeder! Let me get the changes in by this weekend, before @ennru merges. Yes, we have to look at how to invalidate the in-flight publishes in case of a reconnect. Your expertise is greatly appreciated.
…on failed (connection dropped after creation), plugging the time gap of potential session failure after connection established and is closed.
Thank you for investing your efforts into the JMS connector!
All good for review now.