Jms producer retries #1227
Conversation
@@ -84,6 +84,30 @@ final case class ConnectionRetrySettings(connectTimeout: FiniteDuration = 10.sec
  def withMaxBackoff(maxBackoff: Long, unit: TimeUnit): ConnectionRetrySettings =
    copy(maxBackoff = Duration(maxBackoff, unit))
  def withMaxRetries(maxRetries: Int): ConnectionRetrySettings = copy(maxRetries = maxRetries)
  def withInfiniteRetries(): ConnectionRetrySettings = withMaxRetries(-1)
Added some helper methods for indefinite retries
Nice!
final case class SendRetrySettings(initialRetry: FiniteDuration = 50.millis,
                                   backoffFactor: Double = 2,
                                   maxBackoff: FiniteDuration = 10.seconds,
                                   maxRetries: Int = 10) {
I chose the default send retries to be much tighter than the reconnect retries. If other default settings make more sense, we should provide those.
@@ -154,7 +181,9 @@ final case class JmsProducerSettings(connectionFactory: ConnectionFactory,
    copy(acknowledgeMode = Option(acknowledgeMode))
  }

  final case class Credentials(username: String, password: String)
  final case class Credentials(username: String, password: String) {
    override def toString = s"Credentials($username,${"*" * password.length})"
Masking out passwords with `*`, since they would otherwise be printed in the error messages I added.
This is a good idea, masking out the credentials in logging. Thanks for doing this.
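For illustration, a minimal, self-contained sketch of how the masked `toString` behaves (the sample values are made up):

```scala
final case class Credentials(username: String, password: String) {
  override def toString = s"Credentials($username,${"*" * password.length})"
}

// The default case-class toString would print the password verbatim;
// the override replaces it with one '*' per character.
println(Credentials("alice", "secret")) // prints: Credentials(alice,******)
```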
private[jms] final class JmsBrowseStage(settings: JmsBrowseSettings) extends GraphStage[SourceShape[Message]] {
  private val queue = settings.destination.getOrElse { throw new IllegalArgumentException("Destination is missing") }

private[jms] final class JmsBrowseStage(settings: JmsBrowseSettings, queue: Destination)
Destination is now checked on flow creation, and passed explicitly into the stages
val allSessions = openSessions(failureHandler)
allSessions.failed.foreach(failureHandler)

val allSessions = openSessions()
allSessions.failed.foreach(connectionFailedCB.invoke)
here, I think we must switch back to the graph stage threads with the asyncCallback
onConnectionFailure(ex)

override def onException(ex: jms.JMSException): Unit = {
  Try(connection.close()) // best effort closing the connection.
  connectionFailedCB.invoke(ex)
Here also, I think we must switch back to the graph stage threads with the asyncCallback. This, however, changed the number of reconnects in the test.
Since it is future-based, I'm not too worried about switching back. But this also works. Thanks!
val createDestination = jmsSettings.destination match {
  case Some(destination) => destination.create
  case _ => throw new IllegalArgumentException("Destination is missing")
}
This check is now finally gone :)
Thanks for taking this out.
// wait for all sessions to successfully initialize before invoking the onSession callback.
// reduces flakiness (start, consume, then crash) at the cost of increased latency of startup.
allSessions.foreach(_.foreach(onSession.invoke))
}

def openSessions(onConnectionFailure: jms.JMSException => Unit): Future[Seq[S]]
def connectionFailed(ex: Throwable): Unit
JmsProducer needs to do a bit more work on connection failure (i.e. purge the old producers), therefore the abstract method.
jmsProducer.send(defaultDestination, message, deliveryMode, priority, timeToLive)

val destination = elem.destination match {
  case Some(messageDestination) => lookup(messageDestination)
  case None => defaultDestination
Here also, just simplified some code
    Future.sequence(sessionFutures)
  }(ExecutionContexts.sameThreadExecutionContext)
}

private[jms] object JmsMessageProducer {
  def apply(jmsSession: JmsProducerSession, settings: JmsProducerSettings): JmsMessageProducer = {
  def apply(jmsSession: JmsProducerSession, settings: JmsProducerSettings, epoch: Int): JmsMessageProducer = {
The epoch is used to track whether the producer is stale; on each disconnect, the epoch is incremented. By remembering the creation epoch, old producers can be discarded once the in-flight message lands.
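A rough sketch of the epoch idea (names and structure here are illustrative, not the PR's actual internals):

```scala
// Illustrative epoch-based staleness tracking, not the actual PR code.
final class ProducerPool {
  final case class PooledProducer(epoch: Int /* plus the underlying JMS producer */ )

  private var currentEpoch = 0 // bumped on every disconnect
  private var available = List.empty[PooledProducer]

  def create(): PooledProducer = PooledProducer(currentEpoch)

  def onDisconnect(): Unit = {
    currentEpoch += 1 // everything created before this moment is now stale
    available = Nil   // idle producers can be purged immediately
  }

  // Called when an in-flight message lands: a producer is returned to the
  // pool only if it was created in the current epoch; stale ones are dropped.
  def release(producer: PooledProducer): Unit =
    if (producer.epoch == currentEpoch) available ::= producer
}
```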
@@ -37,7 +37,7 @@ private[jms] final class JmsConsumerStage(settings: JmsConsumerSettings)
  createDestination: Session => javax.jms.Destination): JmsConsumerSession = {
  val session =
    connection.createSession(false, settings.acknowledgeMode.getOrElse(AcknowledgeMode.AutoAcknowledge).mode)
  new JmsConsumerSession(connection, session, createDestination(session), settings.destination.get)
  new JmsConsumerSession(connection, session, createDestination(session), destination)
No more `.get` necessary either :)
@@ -8,16 +8,19 @@ import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream._
import akka.stream.alpakka.jms.JmsProducerStage._
import akka.stream.alpakka.jms.JmsProducerMessage._
import akka.stream.impl.{Buffer, ReactiveStreamsCompliance}
import akka.stream.impl.Buffer
Produced messages can't be null, so there's no need for the reactive streams compliance check.
@@ -29,7 +32,7 @@ private[jms] final class JmsProducerStage[A <: JmsMessage, PassThrough](settings
  ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher")

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  new GraphStageLogic(shape) with JmsProducerConnector {
  new TimerGraphStageLogic(shape) with JmsProducerConnector {
We need some timers for scheduling retries
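For context, a `TimerGraphStageLogic` lets a stage schedule work back onto its own thread via `scheduleOnce`/`onTimer`. A toy, self-contained stage showing the mechanism (not the producer stage itself; completion handling is elided):

```scala
import scala.concurrent.duration._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic}

// Emits each element after a fixed delay, the same way the producer stage
// defers a retry attempt to a later point in time.
final class DelayEach[A](delay: FiniteDuration) extends GraphStage[FlowShape[A, A]] {
  private val in = Inlet[A]("DelayEach.in")
  private val out = Outlet[A]("DelayEach.out")
  override val shape = FlowShape(in, out)

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = scheduleOnce(grab(in), delay) // timer key carries the element
      override def onPull(): Unit = if (!hasBeenPulled(in)) pull(in)
      override protected def onTimer(key: Any): Unit = push(out, key.asInstanceOf[A])
      setHandlers(in, out, this)
    }
}
```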
// available producers for sending messages. Initially full, but might contain less elements if
// messages are currently in-flight.
private val jmsProducers: Buffer[JmsMessageProducer] = Buffer(settings.sessionCount, settings.sessionCount)

// in-flight messages with the producers that were used to send them.
private val inFlightMessagesWithProducer: Buffer[Holder[E]] =
private val inFlightMessages: Buffer[Holder[E]] =
The in-flight messages don't store the producers anymore. Producer handling is now decoupled from the messages.
jmsSessions = Seq.empty
jmsProducers.clear()
currentJmsProducerEpoch += 1
initSessionAsync()
On disconnect, purge all producers on the landing deck, and increase the epoch so that the in-flight producers get purged once they land.
inFlightMessagesWithProducer.enqueue(holder)

val holder = new Holder[E](NotYetThere)
inFlightMessages.enqueue(holder)
sendWithRetries(SendAttempt[E](m, holder))
Extracted the send logic into a separate method that we can call, storing the information we need for retrying in the `SendAttempt` class.
holder(Success(elem))
pushNextIfPossible()
As we're already in the graph stage thread, we can continue with handling the pass-through message directly. Also, the `holder.apply` method got stripped of the `futureCB`, as it was replaced by the `sendCompletedCB` async callback.
  case tried => sendCompletedCB.invoke((send, tried, jmsProducer))
  }
} else {
  nextTryOrFail(new RuntimeException("JmsProducer is not connected, skipping send attempt"), send)
Potentially, the send attempt can happen while the producer is disconnected. Here, this is handled as a regular attempt that counts towards the max retries.
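In pseudo-Scala, that decision might look like this (`SendAttempt` and `nextTryOrFail` appear in the diff; the body below is an assumed simplification, not the PR's exact code):

```scala
import scala.concurrent.duration._

final case class SendAttempt[E](element: E, attempt: Int = 0)

// Every failed attempt, including one made while disconnected, counts
// towards maxRetries; maxRetries < 0 means retry indefinitely.
def nextTryOrFail[E](error: Throwable,
                     send: SendAttempt[E],
                     maxRetries: Int,
                     scheduleRetry: (SendAttempt[E], FiniteDuration) => Unit,
                     giveUp: Throwable => Unit): Unit =
  if (maxRetries >= 0 && send.attempt >= maxRetries) giveUp(error)
  else scheduleRetry(send.copy(attempt = send.attempt + 1), 50.millis) // real code: exponential backoff
```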
case Failure(t) =>
  holder(Failure(t))
  handleFailure(t, holder)
}
This here is the replacement for the previous `futureCB`.
holder.elem match {
  case Success(elem) =>
    push(out, elem)
    pullIfNeeded() // Ask for the next element.

  case Failure(NonFatal(ex)) => handleFailure(ex, holder)
  case Failure(ex) => handleFailure(ex, holder)
Everything that is wrapped in a `Failure` is `NonFatal` (unless you really try hard).
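A quick illustration: `Try` only catches `NonFatal` throwables, so any `Failure` it produces already satisfies the `NonFatal` extractor.

```scala
import scala.util.{Failure, Try}
import scala.util.control.NonFatal

Try(throw new RuntimeException("boom")) match {
  case Failure(NonFatal(_)) => println("always taken for Try-produced failures")
  case _                    => println("only reachable for a hand-built Failure(fatalError)")
}

// Fatal errors are not wrapped at all; they propagate:
// Try(throw new OutOfMemoryError()) // throws instead of returning Failure(...)
```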
@@ -155,8 +206,7 @@ private[jms] object JmsProducerStage {
 *
 * To get a condensed view of what the Holder is about, have a look there too.
 */
class Holder[A](var elem: Try[A], val cb: AsyncCallback[Holder[A]], val jmsProducer: Option[JmsMessageProducer])
    extends (Try[A] => Unit) {
class Holder[A](var elem: Try[A]) extends (Try[A] => Unit) {
The future callback is not needed and the producer is handled outside of the message, so the holder got a lot simpler.
object JmsConsumer {

  /**
   * Java API: Creates an [[JmsConsumer]] for [[javax.jms.Message]]
   */
  def create(settings: JmsConsumerSettings): akka.stream.javadsl.Source[Message, KillSwitch] =
    akka.stream.javadsl.Source.fromGraph(new JmsConsumerStage(settings))
    akka.stream.alpakka.jms.scaladsl.JmsConsumer.apply(settings).asJava
Using the Scala code to avoid having to duplicate the `require`s.
Source.fromGraph(new JmsConsumerStage(settings))

def apply(settings: JmsConsumerSettings): Source[Message, KillSwitch] = {
  require(settings.destination.isDefined, noConsumerDestination(settings))
  Source.fromGraph(new JmsConsumerStage(settings, settings.destination.get))
Now, every time a consumer or producer is created, the destination is checked and explicitly passed to the stages.
Hmm, `settings.destination.get` might tick off some static analysis tools, as it is considered unsafe: the compiler does not check that we have just called `settings.destination.isDefined` on the line above. It may be safer to use extraction (and the static analysis tools definitely like it better), as follows:

```scala
settings.destination match {
  case None => throw new IllegalArgumentException(noConsumerDestination(settings))
  case Some(dest) => Source.fromGraph(new JmsConsumerStage(settings, dest))
}
```
def time(m: Map[String, Any]) = m("time").asInstanceOf[Long]
resultList.filter(b => time(b) > restartTime) shouldNot be(empty)
resultList.sliding(2).forall(pair => index(pair.head) + 1 == index(pair.last)) shouldBe true
}
This is for sure not the only test we need, more will follow.
One note: in this version the connection status callback logic of the consumer is racy (as the failing test luckily indicated). I've fixed that locally and can push that tonight. However, by now I think that exposing the connection status via a materialized value, and allowing to poll it, is the more straightforward solution.
Yes, exposing the connection status needs to be something other than just futures, as it may flip more than once. It could materialize a source streaming connect/disconnect events, but I guess that could be too much for most scenarios.
The exposed interface was obviously also not the best ;) the futures were created and invoked on each connect/disconnect. Concerning the intended use case of connectivity status: imagine you are running your app in Kubernetes, and want to signal readiness based on JMS connectivity status (Kubernetes readiness checks: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/#define-readiness-probes ). Before the re-connect facility, it was "easy" (i.e. possible) to create a responsive Kubernetes readiness check based on stream failures. With the re-connect facility, there is no way to know the connectivity status of the consumer or producer when using infinite reconnects. Similarly, there is no way to build a responsive readiness check that uses a couple of connect retries, as the failure will only propagate once the connect retries are exhausted. So to be able to use the re-connect feature and at the same time be able to build a readiness check, I need to expose / access the JMS connection status.
So, fixing the re-connect failure cases with retries is not as straightforward, as the failing tests indicate. We need to extend the scope of connect retries, and I think we won't get around handling the current state of the consumer as well (running vs. shutting down) in order to not retry connecting when the consumer stage is completing. So for this PR, I see two options to continue:

I would favour the second option. Note that introducing connectivity status in

What's your opinion?
Yes, please revert to the safe point you describe and continue on the re-connect and connectivity status in a separate PR. Regarding the API changes: we'll release Alpakka 1.0-M1 today, which will contain the
Which, finally, makes it ready for review.
- switching back to event loop on connection failure (that was missing)
- changed the logic of sending to attempt retries at configurable time intervals
- attempts while disconnected are counted towards max retries
- Invoke handleFailure when giving up on retries
- Fix retry count off-by-one error
- Improve send retry delays and max backoff
- Added more tests for send retry
- Removed println from test supervisor
- Fix produce elements in order test
Force-pushed from cd5305b to 854a7f4.
rebased on top of master
LGTM! Just one little nitpick regarding the comment.
@@ -72,26 +83,21 @@ private[jms] final class JmsProducerStage[A <: JmsMessage, PassThrough](settings
  in,
  new InHandler {

    override def onUpstreamFinish(): Unit = if (inFlightMessagesWithProducer.isEmpty) completeStage()
    override def onUpstreamFinish(): Unit = if (inFlightMessages.isEmpty) completeStage()

    override def onPush(): Unit = {
      val elem: E = grab(in)
      elem match {
        case m: Message[_, _] =>
          // fetch a jms producer from the pool, and create a holder object to capture the in-flight message.
Comment got out of date.
Concurrent tests can fail in oh so many ways.
@2m comment and test fixed.
LGTM, but I have not spent lots of time on this connector. @akara could you take a look at this PR as well?
Only have minor, mostly syntactic and code safety comments. Otherwise all good to me. Thanks for all the effort! We'll have a great JMS connector going forward.
Setting        | Description                                                                          | Default
---------------|--------------------------------------------------------------------------------------|--------
initialRetry   | Wait time before retrying the first time                                            | 20 ms
backoffFactor  | Back-off factor for subsequent retries                                              | 1.5
maxBackoff     | Maximum back-off time allowed, after which all retries will happen after this delay | 500 ms
Default sounds good to me.
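To make the defaults concrete, here is one plausible reading of the delay computation (a sketch; the exact formula in the PR may differ):

```scala
import scala.concurrent.duration._

val initialRetry = 20.millis
val backoffFactor = 1.5
val maxBackoff = 500.millis

// Delay before retry n (1-based), capped at maxBackoff.
def waitTime(retryNumber: Int): FiniteDuration =
  initialRetry * math.pow(retryNumber.toDouble, backoffFactor) match {
    case f: FiniteDuration => f.min(maxBackoff)
    case _                 => maxBackoff // infinite/NaN fallback
  }

// retry 1 -> 20 ms, retry 2 -> ~57 ms, retry 3 -> ~104 ms, ..., capped at 500 ms.
```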
protected def jmsSettings: JmsSettings

protected def onSessionOpened(jmsSession: S): Unit = {}

protected val fail: AsyncCallback[Throwable] = getAsyncCallback[Throwable](e => failStage(e))

private val connectionFailedCB: AsyncCallback[Throwable] = getAsyncCallback[Throwable](e => connectionFailed(e))
Just a style comment, not critical: `(e => connectionFailed(e))` can be better represented as just `(connectionFailed)`, as that is already a function of type `Throwable => Unit`.
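For example, in plain Scala (nothing connector-specific):

```scala
def connectionFailed(ex: Throwable): Unit = println(s"connection failed: $ex")

// Both values below are the same function of type Throwable => Unit;
// the second relies on automatic method-to-function lifting (eta-expansion).
val explicitLambda: Throwable => Unit = e => connectionFailed(e)
val lifted: Throwable => Unit = connectionFailed
```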
after(delay, system.scheduler) {
  openConnectionWithRetry(startConnection, nextN)
}

after(waitTime(nextN), system.scheduler) {
Did we take care of `waitTime(nextN)` rolling over, just in case we keep exponentially increasing the value?
The time is capped at `maxBackoff` in `waitTime`: https://github.com/akka/alpakka/pull/1227/files#diff-01235c136811eb7422e859a2a1190b2fR112 and, for the connect backoff, here: https://github.com/akka/alpakka/pull/1227/files#diff-01235c136811eb7422e859a2a1190b2fR90
Is this what you mean by rolling over?
I got what you mean: as soon as the duration exceeds `Long.MaxValue` nanoseconds, multiplication of a finite duration will fail with `java.lang.IllegalArgumentException`. `Math.pow(2, 64)` already exceeds `Long.MaxValue`, so we definitely need to take care of that. I'll fix this 👍
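One way to guard against that overflow is to do the exponentiation on plain `Double` nanoseconds and cap before constructing the duration (a sketch of the idea, not necessarily the fix that landed):

```scala
import scala.concurrent.duration._

val initialRetry = 50.millis
val backoffFactor = 2.0
val maxBackoff = 10.seconds

// FiniteDuration arithmetic throws IllegalArgumentException beyond
// Long.MaxValue nanoseconds, so compare as Double first and cap early.
def waitTime(retryNumber: Int): FiniteDuration = {
  val nanos = initialRetry.toNanos * math.pow(retryNumber.toDouble, backoffFactor)
  if (nanos >= maxBackoff.toNanos.toDouble) maxBackoff
  else nanos.toLong.nanos
}
```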
Source.fromGraph(new JmsTxSourceStage(settings))

def txSource(settings: JmsConsumerSettings): Source[TxEnvelope, KillSwitch] = {
  require(settings.destination.isDefined, noConsumerDestination(settings))
  Source.fromGraph(new JmsTxSourceStage(settings, settings.destination.get))
And here too.
Source.fromGraph(new JmsBrowseStage(settings))

def browse(settings: JmsBrowseSettings): Source[Message, NotUsed] = {
  require(settings.destination.isDefined, noBrowseDestination(settings))
  Source.fromGraph(new JmsBrowseStage(settings, settings.destination.get))
Ditto.
val in = (1 to 50).map(i => JmsTextMessage(i.toString))
val jmsFlow = JmsProducer.flow[JmsTextMessage](JmsProducerSettings(factory).withQueue("test").withSessionCount(8))

val result = Source(in).via(jmsFlow).toMat(Sink.seq)(Keep.right).run()

result.futureValue shouldEqual in
delays.get() shouldBe 50
`delays.get()` does not have any side effect, right? Should it rather be spelled `delays.get shouldBe 50`?
val result = Source(List("one")).runWith(jms)

result.failed.futureValue shouldBe a[JMSException]
sendAttempts.get() shouldBe 6
Similarly, `sendAttempts.get shouldBe 6`.
val result = Source(List("one")).runWith(jms)

result.failed.futureValue shouldBe a[JMSException]
sendAttempts.get() shouldBe 1
Ditto here.
- use .get instead of .get() as it is side-effect free
- avoid option.get entirely with pattern matching
- use automatic method to function lifting to avoid parameter naming
- use case instead of tuple addressing with _1, _2, ...
Thanks @akara for your review! I've addressed all your comments but one, where I'm not sure I understood properly (see #1227 (comment)).
When using exponential backoff, the retry delay can very quickly exceed the FiniteDuration limit of Long.MaxValue nanoseconds. This requires special treatment, as pointed out by Akara Sucharitakul.
Thank you everyone for the great work! Pressing the big green button.
This is my proposal on how to achieve producer retries with reconnects.
I also simplified some code along the way (which I hope is okay for everyone).
Also, I added a missing async callback on disconnect, which incidentally also changes the number of retries performed in the reconnect test (I will point out in annotations where).
Documentation is lacking for now; I will add some examples once we are fine with the design and the way it works.
Fixes #1226