-
Notifications
You must be signed in to change notification settings - Fork 645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
JMS producer multi session #1148
JMS producer multi session #1148
Conversation
case v: Byte => message.setByteProperty(key, v) | ||
case v: Short => message.setShortProperty(key, v) | ||
case v: Long => message.setLongProperty(key, v) | ||
case v: Double => message.setDoubleProperty(key, v) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the properties contain a value which is not of any of these types, would it make sense to explicitly throw an exception instead of the default MatchError?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, I would say yes.
However, the Jms producer crashes on any exception, and does not distinguish between retriable errors and non-retriable ones - it doesn't perform any retries. So in the end, the behavior would still be very similar if we introduced a custom exception here. It would make much more sense to introduce a custom exception if the Jms producer stage did retries, and would not try to do those retries if the property value type is of an unexpected type.
(please note: this is part of the code I moved out of the stage into a separate class - I didn't modify it)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks better!
I had some comments about the exception traits though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 I've followed your suggestions :)
Test compilation fails for Scala 2.11. |
@ennru thanks for notifying and sorry for not seeing this myself. I will take care of fixing the issues. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work!
I think your solution with the Holder and its callbacks deserves a description in comments so that others can understand it more easily.
@@ -40,11 +40,23 @@ private[jms] trait JmsConnector { this: GraphStageLogic => | |||
onSessionOpened(session) | |||
} | |||
|
|||
private[jms] def initSessionAsync(dispatcher: Dispatcher): Future[Unit] = { | |||
ec = materializer match { | |||
private[jms] def executionContext(attributes: Attributes): ExecutionContext = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This and a few other things in this class should be just regular private
or protected
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed; I wanted to follow the style of the existing implementation, I'll change that to simpler visibility restrictions.
|
||
private val jmsProducers: Buffer[JmsMessageProducer] = Buffer(settings.sessionCount, settings.sessionCount) | ||
|
||
private val buffer: Buffer[Holder[A]] = Buffer(settings.sessionCount, settings.sessionCount) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would producerWithMessage
describe better what it holds?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about inFlightMessagesWithProducer
?
@@ -782,5 +785,92 @@ class JmsConnectorsSpec extends JmsSpec { | |||
|
|||
result.futureValue should ===(input) | |||
} | |||
|
|||
"publish and consume strings through a queue with multiple sessions" in withServer() { ctx => | |||
//#connection-factory |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to remove the copied //#
as that marks what Paradox cuts out for the documentation.
It would be very useful if you added a section on the producer settings in the documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, I wasn't sure what //#
does. I'll add producer settings documentation as well.
/** | ||
* Marker trait indicating that the exception thrown is intermittent. The failed operation might succeed if tried again. | ||
*/ | ||
trait RetriableException extends Exception |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It makes some sense to mark some exception cases as NonRetriableException
. However the other way around RetriableException
does not make as much sense.
The essence is that you want to point out for some specific exceptions that a retry makes no sense. However for all other exceptions, a retry might make sense (so there is no real need for the marker trait RetriableException
especially since the code does not even use this).
/** | ||
* Marker trait indicating that the exception thrown is persistent. The operation will always fail when retried. | ||
*/ | ||
trait NonRetriableException extends Exception |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is a jms library, maybe we should call this a NonRetriableJmsException
.
Travis failed because an AMQP test failed: https://travis-ci.org/akka/alpakka/jobs/420470777#L680 |
number of parallel sessions increases throughput at the cost of message ordering. While the messages may arrive | ||
out of order at the Jms broker, the producer flow outputs messages in the order they are received. | ||
* `timeToLive` (optional) the time messages should be kept on the Jms broker. This setting can be overridden on | ||
individual messages. If not set, messages will never expire. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that I didn't document acknowledgeMode
. This is because I read various threads on Jms stating that this setting is not considered when producing records.
Looks like it is always the same AMQP test that is consistently failing: |
Don't worry for the AMQP test, it is not your fault. See #1167. |
The build failure is fixed in master, please re-base. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just another tiny comment. LGTM, otherwise.
case settings: JmsConsumerSettings => | ||
settings.sessionCount | ||
case settings: JmsConsumerSettings => settings.sessionCount | ||
case settings: JmsProducerSettings => settings.sessionCount |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You may push up sessionCount
to JmsSettings
to simplify this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do :) I try to find some time today to finish things up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, the JmsBrowseSettings
would then need to implement sessionCount
as well, which would be hard-coded to 1. Should we go for that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, makes sense to me.
- created a separate JmsMessageProducer class to keep the stage logic more focussed on coordinating elements within the stage. - grouped the logic for creating the execution context to use in the producer stage as well as in the consumer stage.
- JmsProducerStage now supprots multiple sessions by behaving similarly to MapAsync - Producers are pooled and parallelism is controlled by how producers are returned to the pool. Fixes akka#1144
- Introduces traits to separate recoverable and non-recoverable errors - Introduces exceptions for populate map message and populate message properties - Refactors JmsMessageProducer to produce new exceptions
In order to explain the intent, added some brief comments to the marker traits.
- Adding tests for new exception types - Deleting retriable exception marker trait - renaming NonRetriableException to NonRetriableJmsException - Adding docs on producer settings - Adding comments to producer graph stage logic -
The number of sessions created did not reflect the desired sessionCount. Now it does.
The contributor advice states that case classes should not be used as settings objects. Therefore, this commit changes the producer config docs example from using the case class constructor to using the withXxx methods.
The previous implementation of the test was flaky, as it did assume that on failure of one send operation, the producer stage would emit all in-flight messages before the failure and then fail the stage. The actual behavior is fail-fast: failure of one send operation causes immediate failure of the stage. This commit adjusts the test to this behavior (that corresponds to MapAsync's behavior).
799d93d
to
17be020
Compare
Test failure is https://travis-ci.org/akka/alpakka/jobs/422226062#L715 |
The tests "fail fast on the first failing send" and "sink disconnect exceptional completion" were racy. This commit fixes these tests by introducing explicit count down latches to wait for execution progress.
I've changed the failing test and my test (that was actually racy too) in the hope of improving things. |
well, now I am at a loss at how to stabilize the failing test - any ideas welcome :) |
We've seen failure on that from time to time: #1031 |
Now, it's an ftp test that fails: https://travis-ci.org/akka/alpakka/jobs/422635814#L644 |
Thank you for this great addition to the JMS connector! |
- JmsProducerStage now supports multiple sessions by behaving similarly to MapAsync - Producers are pooled and parallelism is controlled by how producers are returned to the pool - Created a separate JmsMessageProducer class to keep the stage logic more focussed on coordinating elements within the stage. - Grouped the logic for creating the execution context to use in the producer stage as well as in the consumer stage. Fixes akka#1144
- JmsProducerStage now supports multiple sessions by behaving similarly to MapAsync - Producers are pooled and parallelism is controlled by how producers are returned to the pool - Created a separate JmsMessageProducer class to keep the stage logic more focussed on coordinating elements within the stage. - Grouped the logic for creating the execution context to use in the producer stage as well as in the consumer stage. Fixes akka#1144
similarly to MapAsync
are returned to the pool.
more focussed on coordinating elements within the stage.
producer stage as well as in the consumer stage.
Fixes #1144