-
Notifications
You must be signed in to change notification settings - Fork 645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
jms: close sessions on connection loss #3041
Conversation
jmsSessions = Seq.empty | ||
} | ||
|
||
protected def closeSessionsAsync(): Future[Unit] = { | ||
val closing = Future | ||
.sequence { | ||
jmsSessions.map(s => Future(closeSession(s))) | ||
jmsSessions.map(s => Future { closeSession(s) }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a strong preference for curly braces around Future.apply
calls to make them visually stand out.
} | ||
.map(_ => ()) | ||
.flatMap(_ => Future.unit) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if this was written before Future.unit
existed, but since we're optimizing memory...
log.warning(exception.getMessage) | ||
} | ||
override protected def onSessionOpened(consumerSession: JmsConsumerSession): Unit = | ||
consumerSession |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clearer code through trusting the Scala compiler...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -135,7 +135,8 @@ private[jms] trait JmsConnector[S <: JmsSession] extends TimerGraphStageLogic wi | |||
} | |||
|
|||
private def handleRetriableException(ex: Throwable): Unit = { | |||
jmsSessions = Seq.empty | |||
closeSessions() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the bug fix
.onComplete(sessionOpenedCB.invoke) | ||
|
||
case _ => | ||
throw new IllegalArgumentException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was wondering why the compiler wasn't warning this was unreachable... turns out that a null
would be the only value which hits this... the new code would NPE on createConsumer
which seems to be preferable (you go straight to "why was the argument null").
Scala 3 should complain about this case being unreachable (with the warning saying that it's only reachable in the null case, so recommend explicitly matching null
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
* Converted from Alpakka JMS * Jakarta Messaging 3.1 * jakarta.jms package instead of javax.jms * fix: close sessions on connection loss, #3041 * add to autolabeler and check-build-test * sort build projects * extensive rewrite of tests to use ActiveMQ Artemis * because support for Jakarta Messaging was addd in ActiveMQ 6.0, which requires JDK 17 * Text improvements * ActiveMQ Artemis doc adjustments --------- Co-authored-by: Enno <458526+ennru@users.noreply.github.com>
References #3038.
In case of connection loss, we clear the list of current connections without closing them. In the consumer case, this results in the message processing callback holding onto the session because it closes over it. Closing the session will close the consumers and free the callback.