-
Notifications
You must be signed in to change notification settings - Fork 645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MQTT streaming: Fixup the continuation of a session #1414
Conversation
Prior to this commit, if a server side session was publishing to a client and the client disconnected, and then subsequently reconnected, its publications would be lost. This is because I was closing over some actor state within a future. Whoops. I’ve now written a comprehensive test for this scenario, along with some additional subscription tests along the way.
f37c191
to
e0cb8a5
Compare
reply.future.foreach(command => context.self ! ReceivedProducerPublishingCommand(command)) | ||
reply.future.foreach { | ||
_.runForeach(command => context.self ! ReceivedProducerPublishingCommand(command)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file contains the main change
case Failure(_: WatchedActorTerminatedException) => | ||
case _ => | ||
clientConnector ! ClientConnector.ConnectionLost | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Slight optimisation to reduce log messages. We are explicitly watching an actor so don't send it a message when terminated. :-)
@@ -1233,6 +1349,146 @@ class MqttSessionSpec | |||
server.offer(Command(pubAck)) | |||
client.expectMsg(pubAckBytes) | |||
} | |||
|
|||
"produce a duplicate publish on the server given two client connections" in { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the test that confirms the PR fix.
@@ -167,6 +167,51 @@ class MqttSessionSpec | |||
result.futureValue shouldBe Right(Event(subAck)) | |||
} | |||
|
|||
"Subscribe and stash any subsequent subscriptions" in { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Additional test - nothing directly to do with the problem being solved by the PR
@@ -900,6 +944,78 @@ class MqttSessionSpec | |||
} | |||
} | |||
|
|||
"receive two subscriptions for the same topic" in { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Additional test - nothing directly to do with the problem being solved by the PR
@@ -1165,7 +1281,7 @@ class MqttSessionSpec | |||
"re-connect given connect, subscribe, connect again, publish" in | |||
reconnectTest(explicitDisconnect = false) | |||
|
|||
"receive a duplicate publish" in { | |||
"consume a duplicate publish on the server" in { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clarification of this test
So that the logs are clearer, but also to ensure no untoward behaviour, we now cancel timers when transitioning to another behaviour.
To facilitate debugging, we now log MQTT events as they pass through the session. These events are available only when the akka.loglevel is set to debug. We also take care to avoid the output of sensitive information.
Ready for review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Prior to this commit, if a server-side session was publishing to a client and the client disconnected, and then subsequently reconnected, its publications would be lost. This is because I was closing over some actor state within a future. Whoops.
I’ve now written a comprehensive test for this scenario, along with some additional, unrelated, subscription tests along the way.
I also cancel timers explicitly to be tidy (and to avoid potential misfiring issues, although I've not seen any), and the logging has been improved so that debug log levels outputs MQTT events sans sensitive info (passwords, payloads); the latter being useful toward logging.