Feature: Connection status notifications and notifications drop handling #143
Conversation
Let's think about merging.
The test story here is bad - I just added a very basic one. Any thoughts about some kind of integration tests with a "real" broker? Especially for testing the connection state changes this is definitely more convenient than mocked unit tests...
One more note: I didn't increment the version, which is kinda needed because of the API change. This can be done when this branch is eventually merged.
src/client/connection.rs
Outdated
```rust
        Err(false) => break 'reconnection,
        Ok(_v) => continue 'reconnection,
    }
    Err(reconnect) => {
```
Do we need this handling? I intended to abstract all the reconnection behavior in the `mqtt_io` method, which tells whether to reconnect or not.
Did you observe any bug with that?
Ah. Probably not. I picked the commit from `pr-notifications` and didn't notice that this is not needed.
@flxo Thanks for working on that. We've just run into that issue and here is already a solution :-)
@tekjar Could we please merge this one and bump the crate version? This feature is quite important for our case.
Sorry for the delay on this. I'll try to close this today
```rust
    handle_notification(notification, &notification_tx);
    future::ok(reply)
    let mut mqtt_state = mqtt_state.borrow_mut();
    mqtt_state.handle_incoming_mqtt_packet(packet)
```
Any reason why you've moved a linear `and_then` to a nested `and_then`? IMO chaining promises one after the other is much more readable than nesting combinators, which can lead to callback-hell sort of code.
These two lines just remove the explicit `future::ok()`. The `Result<>` returned by `handle_incoming_mqtt_packet` can be used directly and converts because of the `IntoFuture` impl for `Result`. Not sure what you mean by linear and nested here.
Oh. Got it.
What I meant was that method 2 below will keep combinators like `and_then` one after the other instead of one inside the other:
```rust
.and_then(move |packet| {
    debug!("Incoming packet = {:?}", packet_info(&packet));
    let mut mqtt_state = mqtt_state.borrow_mut();
    mqtt_state.handle_incoming_mqtt_packet(packet)
        .and_then(|(notification, reply)| {
            handle_notification(notification);
            Ok(reply)
        })
})
.filter(should_forward_packet);
```
vs
```rust
.and_then(move |packet| {
    debug!("Incoming packet = {:?}", packet_info(&packet));
    let mut mqtt_state = mqtt_state.borrow_mut();
    let o = mqtt_state.handle_incoming_mqtt_packet(packet);
    future::result(o)
})
.map(|(notification, reply)| {
    handle_notification(notification);
    reply
})
.filter(should_forward_packet);
```
src/client/connection.rs
Outdated
```rust
    let mut mqtt_state = mqtt_state.borrow_mut();
    mqtt_state.handle_incoming_mqtt_packet(packet)
        .and_then(|(notification, reply)| {
            match notification {
```
Nit: Can we move this to a method? Helps in understanding the flow for newcomers as well as me when I visit the code after a break
Sure.
Ah. I got what you mean and updated the PR. I did not move it to a method since the signature would be quite weird, and with the non-nested `and_then` I think it's readable. I also remember why I did the nested `and_then`: to avoid the double clone of the `Rc<MqttState>`.
Feel free to refactor this.
Left some small nits. So AFAIU you are using a oneshot channel along with a crossbeam channel for synchronization. Given we anyway lose the convenience of crossbeam `select` with the wrapper, why not just use a futures channel?
Coming to your questions:

- We'll do this in the next PR.
- Integration tests with a real broker would be good, but I thought we should also simulate bad networks, disconnections and half-open connections to continuously test out corner cases. But we can start with any available broker and later work on rumqttd to simulate those conditions. Gave toxiproxy a shot but too many bugs.
- I'll increment when I merge this to master :)
I don't see any drawback to using the futures channel since in most of my projects I'm in a "future" context anyway. You can still use the
Cool. Let's merge this after those small changes. I can make those changes myself after merging if you are busy.
```rust
    match notification {
        Notification::None => (),
        // Ignore error on notification_tx send, since the receiver can be dropped at any time
        _ if mqtt_state.send_notifications() => drop(notification_tx.send(notification)),
```
Looks like this will block the event loop. What happens when the receiver is not dropped and the channel is full?
Yes. This blocks the loop. As far as I got it that's ok in order to apply backpressure on the broker.
An option would be to use an unbounded channel - but that is probably not what we want here.
I'm not fluent in the MQTT spec: what is a broker allowed to do with clients that apply backpressure?
Blocking the event loop is not ok just because the receiver is not able to catch up. Pings should still continue to happen to prevent the broker from disconnecting the client, and publishers should not be slowed down because the receiver is doing some heavy computation.
This is getting a little trickier than I anticipated. I'll give proper thought to this and ping you in a few days.
Hm. You're right. I forgot the pings...sorry. Will think about that too.
Cool :). Sorry for the back and forths on this
Options regarding the pings:

1. Accept that an overloaded client will stop sending pings. The broker will probably disconnect in such a situation (or has to, according to the spec?). The timeouts are relaxed enough to be fine with a slightly delayed ping. This is the behaviour on `notification_refactor` (this PR).
2. An unbounded notification queue: sorry - bad idea...
3. Discard notifications when the notification queue is full. Behaviour on `master`. I'd really prefer option 1 over 3 since at least you see what happened.
4. Another idea is to split the notification queue into several ones to give the client more flexibility in what to handle or which of the `Receiver`s to drop. E.g. `Notification::Puback` could be less interesting than `Notification::Connected` or `Notification::Disconnected(_)`. Here the decision whether a `Notification` can be discarded or not still has to be made.
5. Configuration options to choose between options 1 and 3. Should not be a `feature` but could be added to the config struct.

What do you think?
Hi @flxo. Sorry for the delay on this. Coming to the options that you've mentioned:

1. Blocking the event loop, in general, isn't a good idea. It's not just about pings and disconnects. If someone is doing publishes from a different thread and the receiver is doing heavy computations, the publishers won't progress. Just because the crossbeam channel is full, blocking the publisher doesn't make any sense. Also, the eventloop block will keep interfering with any future features we might need on the eventloop.
2. Agreed :)
3. Sounds good
Personally, I feel that we should provide an option for a futures channel along with the crossbeam channel for notifications. Futures channels won't have this problem of blocking the eventloop while doing a blocking send. Also we won't lose data silently. If people choose to use the crossbeam channel, they get the flexibility of `select` but might miss incoming data.
I'm open to discussion on this. Maybe in a new issue :)
Also, I'll be busy this month and won't be able to do a lot of rumqtt work. But we can try to reach a conclusion this month and implement it in the next.
Check channel state before sending notifications and distribute connection state changes. The notification `Receiver` is wrapped into a struct that contains a oneshot `Receiver` that allows the client to check whether the notification struct is dropped or not.
Might I inquire about any news on this? After skimming through this PR, it seems like it would give users such as me (with devices on bad networks) a chance to react to those network changes.
@TotalKrill Couldn't spend any time on this crate now. Will most probably resume the work in June
@TotalKrill As you might have noticed there are a couple of open points that are not super easy to solve or decide. I'm using this branch in an experimental project and it works so far in that specific scenario.
@flxo Alright, I might have to take a look into that. A semi-related question though: how are topic subscriptions handled during disconnect/connect? Right now I have noticed that long-running connections are not receiving subscription notifications after a while. I suspect this is due to a disconnect/reconnect. My preferred behavior would be to resubscribe to topics on reconnect as well.
As far as I know resubscription upon a reconnect is an open topic. See #85. Probably for now you have to do that on your own.
`clean_session(false)` seems to solve it as long as the broker doesn't die
I tried wrapping my head around this some. The problem is that the eventloop handles publish requests, and if we are handling other events during this time, a publish might stall since other events from the broker (such as received MQTT messages) might be in the way, and client-generated pings and events can get lost. And all these problems are due to the fact that the client is overloaded? Could we not return errors when trying to publish to an overloaded eventloop, or send an error notification when a ping failed to send due to overloading the eventloop? Or is this impossible due to the problem with blocking the eventloop?
Any updates on this subject? I am having trouble needing to resubscribe when the broker crashes.
@marcotuna I've implemented reconnection events in the master branch. But I'll have to add automatic resubscription to the eventloop (of course based on user configuration). You can use reconnection events to resubscribe manually, but the subscription will only happen after the publishes in the queue are transmitted by the eventloop
@flxo Reconnection notifications and notification drop handling are part of master now. Please feel free to comment on the issue if you don't feel the current behavior solves your usecase
@marcotuna Can you please head to issue #85. I'll expose the resubscription option today
This is a follow-up on #142 without any change to the reconnection behaviour.
Check channel state before sending notifications and distribute connection state changes. The notification `Receiver` is wrapped into a struct that contains a oneshot `Receiver` that allows the client to check whether the notification struct is dropped or not. Upon disconnections the corresponding `Error` is sent.