Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JMS transactions: fail stream or log on ack timeouts #1857

Merged
merged 3 commits into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions jms/src/main/mima-filters/1.1.1.backwards.excludes
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# PR #1857
# new field in settings
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.jms.JmsConsumerSettings.this")
3 changes: 3 additions & 0 deletions jms/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ alpakka.jms {
# Timeout for acknowledge.
# (Used by TX consumers.)
ack-timeout = 1 second
# For use with transactions, if true the stream fails if Alpakka rolls back the transaction
# when `ack-timeout` is hit.
fail-stream-on-ack-timeout = false
}
#consumer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ final class JmsConsumerSettings private (
val bufferSize: Int,
val selector: Option[String],
val acknowledgeMode: Option[AcknowledgeMode],
val ackTimeout: scala.concurrent.duration.Duration
val ackTimeout: scala.concurrent.duration.Duration,
val failStreamOnAckTimeout: Boolean
) extends akka.stream.alpakka.jms.JmsSettings {

/** Factory to use for creating JMS connections. */
Expand Down Expand Up @@ -72,6 +73,12 @@ final class JmsConsumerSettings private (
/** Java API: Timeout for acknowledge. (Used by TX consumers.) */
def withAckTimeout(value: java.time.Duration): JmsConsumerSettings = copy(ackTimeout = value.asScala)

/**
* For use with transactions, if true the stream fails if Alpakka rolls back the transaction when `ackTimeout` is hit.
*/
def withFailStreamOnAckTimeout(value: Boolean): JmsConsumerSettings =
if (failStreamOnAckTimeout == value) this else copy(failStreamOnAckTimeout = value)

private def copy(
connectionFactory: javax.jms.ConnectionFactory = connectionFactory,
connectionRetrySettings: ConnectionRetrySettings = connectionRetrySettings,
Expand All @@ -81,7 +88,8 @@ final class JmsConsumerSettings private (
bufferSize: Int = bufferSize,
selector: Option[String] = selector,
acknowledgeMode: Option[AcknowledgeMode] = acknowledgeMode,
ackTimeout: scala.concurrent.duration.Duration = ackTimeout
ackTimeout: scala.concurrent.duration.Duration = ackTimeout,
failStreamOnAckTimeout: Boolean = failStreamOnAckTimeout
): JmsConsumerSettings = new JmsConsumerSettings(
connectionFactory = connectionFactory,
connectionRetrySettings = connectionRetrySettings,
Expand All @@ -91,7 +99,8 @@ final class JmsConsumerSettings private (
bufferSize = bufferSize,
selector = selector,
acknowledgeMode = acknowledgeMode,
ackTimeout = ackTimeout
ackTimeout = ackTimeout,
failStreamOnAckTimeout = failStreamOnAckTimeout
)

override def toString =
Expand All @@ -104,7 +113,8 @@ final class JmsConsumerSettings private (
s"bufferSize=$bufferSize," +
s"selector=$selector," +
s"acknowledgeMode=${acknowledgeMode.map(m => AcknowledgeMode.asString(m))}," +
s"ackTimeout=${ackTimeout.toCoarsest}" +
s"ackTimeout=${ackTimeout.toCoarsest}," +
s"failStreamOnAckTimeout=$failStreamOnAckTimeout" +
")"
}

Expand Down Expand Up @@ -135,6 +145,7 @@ object JmsConsumerSettings {
val acknowledgeMode =
getOption("acknowledge-mode", c => AcknowledgeMode.from(c.getString("acknowledge-mode")))
val ackTimeout = c.getDuration("ack-timeout").asScala
val failStreamOnAckTimeout = c.getBoolean("fail-stream-on-ack-timeout")
new JmsConsumerSettings(
connectionFactory,
connectionRetrySettings,
Expand All @@ -144,7 +155,8 @@ object JmsConsumerSettings {
bufferSize,
selector,
acknowledgeMode,
ackTimeout
ackTimeout,
failStreamOnAckTimeout
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package akka.stream.alpakka.jms
import javax.jms

import scala.concurrent.TimeoutException
import scala.concurrent.duration.Duration
import scala.util.control.NoStackTrace

/**
Expand Down Expand Up @@ -56,3 +57,6 @@ final case class StopMessageListenerException() extends Exception("Stopping Mess
case object JmsNotConnected extends Exception("JmsConnector is not connected") with NoStackTrace

case class JmsConnectTimedOut(message: String) extends TimeoutException(message)

final class JmsTxAckTimeout(ackTimeout: Duration)
extends TimeoutException(s"The TxEnvelope didn't get committed or rolled back within ack-timeout ($ackTimeout)")
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package akka.stream.alpakka.jms.impl

import akka.annotation.InternalApi
import akka.stream.alpakka.jms.{AcknowledgeMode, Destination, JmsConsumerSettings, TxEnvelope}
import akka.stream.alpakka.jms.{AcknowledgeMode, Destination, JmsConsumerSettings, JmsTxAckTimeout, TxEnvelope}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue}
import akka.stream.{Attributes, Outlet, SourceShape}
import javax.jms
Expand Down Expand Up @@ -50,10 +50,23 @@ private[jms] final class JmsTxSourceStage(settings: JmsConsumerSettings, destina
try {
val envelope = TxEnvelope(message, session)
handleMessage.invoke(envelope)
val action = Await.result(envelope.commitFuture, settings.ackTimeout)
action()
try {
// JMS spec defines that commit/rollback must be done on the same thread.
// While some JMS implementations work without this constraint, IBM MQ is
// very strict about the spec and throws exceptions when called from a different thread.
val action = Await.result(envelope.commitFuture, settings.ackTimeout)
action()
} catch {
case _: TimeoutException =>
val exception = new JmsTxAckTimeout(settings.ackTimeout)
session.session.rollback()
if (settings.failStreamOnAckTimeout) {
handleError.invoke(exception)
} else {
log.warning(exception.getMessage)
}
}
} catch {
case _: TimeoutException => session.session.rollback()
case e: IllegalArgumentException => handleError.invoke(e) // Invalid envelope. Fail the stage.
case e: jms.JMSException => handleError.invoke(e)
}
Expand All @@ -63,7 +76,7 @@ private[jms] final class JmsTxSourceStage(settings: JmsConsumerSettings, destina

case _ =>
throw new IllegalArgumentException(
"Session must be of type JMSAckSession, it is a " +
"Session must be of type JmsAckSession, it is a " +
jmsSession.getClass.getName
)
}
Expand Down
49 changes: 48 additions & 1 deletion jms/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ class JmsTxConnectorsSpec extends JmsSpec {
Sink.foreach { env =>
val text = env.message.asInstanceOf[TextMessage].getText
if (r.nextInt(3) <= 1) {
// Artifially timing out this message
// Artificially timing out this message
Thread.sleep(20)
}
resultQueue.add(text)
Expand Down Expand Up @@ -571,5 +571,52 @@ class JmsTxConnectorsSpec extends JmsSpec {
// messages might get delivered more than once, use set to ignore duplicates
resultList.toSet should contain theSameElementsAs numsIn.map(_.toString)
}

"fail the stream when ack-timeout causes a rollback (and fail-stream-on-ack-timeout is true)" in {
withConnectionFactory() { connectionFactory =>
val jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer.sink(
JmsProducerSettings(producerConfig, connectionFactory).withQueue("numbers")
)

val publishKillSwitch = Source
.unfold(1)(n => Some(n + 1 -> n))
.throttle(15, 1.second, 2, ThrottleMode.shaping) // Higher than consumption rate.
.viaMat(KillSwitches.single)(Keep.right)
.alsoTo(Flow[Int].map(n => JmsTextMessage(n.toString).withProperty("Number", n)).to(jmsSink))
.toMat(Sink.ignore)(Keep.left)
.run()

val jmsSource: Source[TxEnvelope, JmsConsumerControl] = JmsConsumer.txSource(
JmsConsumerSettings(consumerConfig, connectionFactory)
.withSessionCount(5)
.withQueue("numbers")
.withAckTimeout(10.millis)
.withFailStreamOnAckTimeout(true)
)

val r = new java.util.Random

val (killSwitch, streamDone) = jmsSource
.throttle(10, 1.second, 2, ThrottleMode.shaping)
.toMat(
Sink.foreach { env =>
if (r.nextInt(3) <= 1) {
// Artificially timing out this message
Thread.sleep(20)
}
env.commit()
}
)(Keep.both)
.run()

// Need to wait for the stream to have started and running for sometime.
Thread.sleep(3000)

killSwitch.shutdown()

streamDone.failed.futureValue shouldBe a[JmsTxAckTimeout]
publishKillSwitch.shutdown()
}
}
}
}