-
Notifications
You must be signed in to change notification settings - Fork 645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow opt-out of using the ack-timeout. #1831
Conversation
@@ -134,7 +136,7 @@ object JmsConsumerSettings { | |||
val selector = getStringOption("selector") | |||
val acknowledgeMode = | |||
getOption("acknowledge-mode", c => AcknowledgeMode.from(c.getString("acknowledge-mode"))) | |||
val ackTimeout = c.getDuration("ack-timeout").asScala | |||
val ackTimeout = if (c.hasPath("ack-timeout")) c.getDuration("ack-timeout").asScala else Duration.Inf |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ackTimeout
is used as a parameter in Await.result
:
val action = Await.result(envelope.commitFuture, settings.ackTimeout) |
Passing Duration.Inf
to Await.result
should be avoided, as that could potentially block forever. Therefore Await.result
should not be called at all, when the timeout is set to infinity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know the reason why Await.result is called at all at this point, is this because the commit/rollback can only be executed on the thread where the message was received? If that is the case then there is no other way. If not, then some other mechanism could be used to handle the timeout
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did an experiment as follow with the following code instead of the Await.result
val commitFutureOrTimeout: Future[() => Unit] = settings.ackTimeout match {
case finiteAckTimeout: FiniteDuration =>
Source.fromFuture(envelope.commitFuture)
.idleTimeout(finiteAckTimeout)
.runWith(Sink.head)(materializer)
case _ => envelope.commitFuture
}
commitFutureOrTimeout.map(action => action())
.recover {
case _: TimeoutException => session.session.rollback()
case e => handleError.invoke(e)
}
Unfortunately this does not seem to work, it seems that the commit action can not be executed anymore after the onMessage() method has returned...
The error I am getting in this case is:
javax.jms.JMSException: COMMIT FAILED: Transaction marked rollback only xaErrorCode:100
Caused by: javax.transaction.xa.XAException: COMMIT FAILED: Transaction marked rollback only xaErrorCode:100
Caused by: javax.jms.TransactionRolledBackException: COMMIT FAILED: Transaction marked rollback only xaErrorCode:100
Caused by: javax.jms.JMSException: Unmatched acknowledge: MessageAck {commandId = 27, responseRequired = false, ackType = 2, consumerId = ID:MBP-50251-1564251273506-171:2:4:1, firstMessageId = ID:MBP-50251-1564251273506-171:1:1:1:7, lastMessageId = ID:MBP-50251-1564251273506-171:1:1:1:16, destination = queue://numbers, transactionId = TX:ID:MBP-50251-1564251273506-171:2:7, messageCount = 3, poisonCause = null}; Could not find Message-ID ID:MBP-50251-1564251273506-171:1:1:1:7 in dispatched-list (start of ack)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Paging JMS gurus @andreas-schroeder and @akara
Is it the case, that commit/rollback can only be performed from the same thread?
If so, then a timeout larger than a second but still finite would work for you @WellingR, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, it works for me because I can configure the timeout to a large-enough value. However in my opinion there is no "good" default value for this timeout, as alpakka-jms cannot know about the performance/timing characteristics of the application using the library.
An alternative improvment would be to throw an Exception when a commit is attempted on a message which has already been rolled back. (currently the commit action is ignored because no one is waiting for the commitFuture
anymore....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@2m yes, the JMS spec defines that commit/rollback must be done on the same thread. While some JMS implementations work without this constraint, IBM MQ is very strict about the spec and throws exceptions when called from a different thread.
To default to infinity won't be acceptable, and I believe the reporting of rollbacks should be improved instead. |
Thank you for bringing this to our attention. We'll merge #1857 to improve on this problem. |
The ack-timeout introduced in alpakka 1.0.0 is set to 1 second by default for TX Consumers.
This means that if no different timeout is configured and handling the message takes more than one second, then rollback is automatically called. Which surprised me (as I did not know of this default configuration)
In this PR I have disabled the ack-timeout by default. I could understand that this might lead to some discussion. And I would be happy to implement a better solution or different timeout.
One of the problems (which maybe could be addressed separately) is that calling
commit()
onTxEnvelope
does NOT throw an exception if the session was already rolled back due to the ack-timeout. This seems to be because thecommitFuture
of theTxEnvelope
is not completed.