-
Notifications
You must be signed in to change notification settings - Fork 647
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MQTT streaming: Fixes wrong Exception thrown when client ping timeouts #1642
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice one!
@@ -1514,7 +1515,9 @@ class MqttSessionSpec | |||
server.watchCompletion().foreach(_ => session.shutdown()) | |||
} | |||
|
|||
"close when no ping request received" ignore { // assertAllStagesStopped { // https://github.com/akka/alpakka/issues/1563 | |||
"close when no ping request received" in assertAllStagesStopped { | |||
implicit val patienceConfig = PatienceConfig(scaled(Span(3000, Millis)), scaled(Span(15, Millis))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please explain why we need this patience config? Is there a way to avoid it? I’m not overly concerned, but we’ve not had to use such config so far.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason is that the test is using 1 second keep alive parameter so using default PatienceConfig of 150 milliseconds as max value is not enough for the ping timeout to be triggered. Therefore I'm adding 3 seconds max time before I see the stream failing because of the ping timeout or failing otherwise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about reducing the ping timeout instead?
Oh I just remembered that the MQTT timeout is the granularity of seconds... so, don’t worry. A comment on the patience config as per your previous explanation would be great. Also, you may want to use “.dilated” on the patience config time. |
I'll add a little comment on that. Regarding patience config, I've used the same pattern that the default one which is used everywhere else, meaning the implicit one defined in
where Default values for PatienceConfig are:
Instead of using Does it make sense? |
@jcroig I've just had a look for other uses of patience config within Alpakka. This appears to fall in line. I don't think we need to do any more, and thanks for putting the comment in. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great!
LGTM.
Thank you for fixing the small details to make it even better! |
Purpose
When mqtt client doesn't send ping request within the expected period (given by the connect keep alive parameter) stream should failed with
ServerConnector.PingFailed
exception but it fails withClientConnection.PingFailed
exception. That makes that ActorMqttServerSession command flow doesn't translate the failure to the public api error ActorMqttServerSession.PingFailed (MqttSession.scala::482)References
References #xxxx
Changes
Background Context
There was an ignored failing unit test due to this bug that however was pointing to a different issue. After this PR this unit test is not failing anymore so I activate it again.