This repository has been archived by the owner on Jan 17, 2020. It is now read-only.

WIP: Better story about in flight messages and rate limit #141

Merged
merged 1 commit into AtherEnergy:connection_refactor from flxo:pr-in-flight on Feb 28, 2019

Conversation

flxo
Contributor

@flxo flxo commented Feb 24, 2019

Hi.

The way rumqtt handles overload situations doesn't seem optimal to me. The current behaviour just blocks for a configured amount of time when the request channel is full. If the application sends any command during this delay, rumqtt will block again when polling next time. This leads to a rate of one request per configured delay. If the client keeps sending, you will stay at this rate.

I changed this implementation and replaced the fixed delay with a stream interception that kicks in when the publish queue reaches a configured level (MqttOptions::in_flight). The essential part is here:

    // Apply the outgoing queue limit (in-flight window) by answering the stream poll
    // with NotReady while the publish queue is full.
    fn limit_in_flight_request_stream(&self, requests: impl RequestStream) -> impl RequestStream {
        let mqtt_state = self.mqtt_state.clone();
        let in_flight = self.mqttoptions.in_flight();
        let mut stream = requests.peekable();
        poll_fn(move || -> Poll<Option<Request>, NetworkError> {
            if mqtt_state.borrow().publish_queue_len() >= in_flight {
                match stream.peek() {
                    // Propagate stream errors even while the window is full ...
                    Err(_) => stream.poll(),
                    // ... but hold back further requests until a slot frees up.
                    _ => Ok(Async::NotReady)
                }
            } else {
                stream.poll()
            }
        })
    }

This affects every command on the request queue. I'm not sure whether there is a definition of the in-flight window and whether it should include e.g. Subscribes or other packets. Personally, I think it doesn't matter if they're included.

Next, the stream throttling implementation is also a bit misleading. I replaced the sleep that happens when the configured rate is reached with a simple StreamExt::throttle, which is a perfect match.
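For reference, a minimal sketch of what that throttling looks like in isolation (assuming tokio 0.1's tokio::util::StreamExt, which is where a StreamExt::throttle would come from in this era; illustrative only, not the rumqtt code):

    extern crate futures;
    extern crate tokio;

    use std::time::Duration;
    use futures::{stream, Stream};
    use tokio::util::StreamExt;

    fn main() {
        // Three requests, let through at a rate of at most one per 100 ms.
        let throttled = stream::iter_ok::<_, ()>(vec![1u32, 2, 3])
            .throttle(Duration::from_millis(100))
            .map_err(|_| eprintln!("throttle error"));

        tokio::run(throttled.for_each(|item| {
            println!("request {}", item);
            Ok(())
        }));
    }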

This patch changes the client API.

What do you think? I consider this work in progress and appreciate any feedback!

cheers!

@tekjar

tekjar commented Feb 25, 2019

If the application sends any command during this delay, rumqtt will block again when polling next time.

I didn't completely understand what you mean here, but overall your solution looks much more elegant and clean than what I did :).

You seem to have cleaned up the Prepend stream as well. Can we make that a separate PR? That just makes testing and revisiting this merge in the future easier.

@tekjar

tekjar commented Feb 25, 2019

Or never mind. Prepend isn't a big change. We'll leave it there :)

@flxo
Contributor Author

flxo commented Feb 25, 2019

If the application sends any command during this delay, rumqtt will block again when polling next time.

I didn't completely understand what you mean here, but overall your solution looks much more elegant and clean than what I did :).

OK. Imagine a client continuously publishing messages while the connection or broker isn't fast enough to handle them, so the rumqtt-internal queue MqttState::outgoing_pub reaches MqttOptions::outgoing_queuelimit.0. The current implementation detects this and waits for MqttOptions::outgoing_queuelimit.1. If the client pushes another Publish during this period, rumqtt will do the same in the next iteration, since MqttState::outgoing_pub is (again) full. This results in processing one element per MqttOptions::outgoing_queuelimit.1, which is not very helpful for resolving the overload... Exactly the same happens for MqttOptions::outgoing_ratelimit.
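A toy illustration of that rate cap (plain Rust with made-up numbers, not rumqtt code): if the loop sleeps a fixed delay whenever the queue is full and the client immediately refills it, exactly one element is drained per delay.

    use std::collections::VecDeque;
    use std::time::{Duration, Instant};

    fn main() {
        let limit = 3usize;
        let delay = Duration::from_millis(50);
        let mut queue: VecDeque<u32> = (0..3).collect();
        let start = Instant::now();

        for i in 0..5u32 {
            if queue.len() >= limit {
                // Stand-in for the fixed sleep the old implementation performs on a full queue.
                std::thread::sleep(delay);
            }
            queue.pop_front();
            // The "client" keeps publishing, so the queue is full again right away.
            queue.push_back(100 + i);
            println!("{:?}: drained one element, queue len {}", start.elapsed(), queue.len());
        }
    }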

You seem to have cleaned up the Prepend stream as well. Can we make that a separate PR? That just makes testing and revisiting this merge in the future easier.

Yeah. During this refactoring I removed Prepend and used Stream::chain, but later on realised that we need some kind of buffer across reconnects. Then I renamed prepend::StreamExt since it collides with the futures::stream::StreamExt I wanted to use for Stream::throttle. I also renamed Prepend::merge_session to insert, since Prepend doesn't care about sessions and just inserts items between the buffer and the wrapped stream.
I will put this into a dedicated commit.

@flxo
Contributor Author

flxo commented Feb 25, 2019

The most crucial point to me is whether I did something stupid by returning NotReady. Could this lead to a deadlock? I'm not sure, but I think not: if the broker stops replying to our Publishes, the Ping will time out as well and we'll reconnect or just fail.

@tekjar

tekjar commented Feb 25, 2019

Could this lead to a deadlock

One way I think a deadlock could happen: the publish request queue is full (so the sender is blocked) and there is no one to notify the RequestStream to pull out an item once publish_queue_len drops below the in_flight limit. Select should take care of this as long as there is someone to wake up the event loop. I think it makes sense to write unit tests for these scenarios.
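To make that notification requirement concrete, here is a small self-contained sketch (futures 0.1 + tokio 0.1, purely illustrative names, not rumqtt code): a poll that returns Async::NotReady stalls forever unless somebody later calls notify() on the task it recorded.

    extern crate futures;
    extern crate tokio;

    use std::sync::{Arc, Mutex};
    use futures::{future::{lazy, poll_fn}, task, Async, Poll};

    fn main() {
        // A flag plus the task handle of whoever is currently parked on it.
        let state = Arc::new(Mutex::new((false, None::<task::Task>)));

        // Side A: behaves like the gated request stream and parks until the flag flips.
        let waiter = {
            let state = state.clone();
            poll_fn(move || -> Poll<(), ()> {
                let mut guard = state.lock().unwrap();
                if guard.0 {
                    Ok(Async::Ready(()))
                } else {
                    // Remember the current task so side B can wake us up again.
                    guard.1 = Some(task::current());
                    Ok(Async::NotReady)
                }
            })
        };

        // Side B: stands in for "a Puback freed a slot"; without notify() side A hangs.
        let notifier = {
            let state = state.clone();
            lazy(move || {
                let mut guard = state.lock().unwrap();
                guard.0 = true;
                if let Some(task) = guard.1.take() {
                    task.notify();
                }
                Ok::<(), ()>(())
            })
        };

        tokio::run(lazy(|| {
            tokio::spawn(waiter);
            tokio::spawn(notifier);
            Ok::<(), ()>(())
        }));
    }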

@tekjar tekjar changed the base branch from master to connection_refactor February 28, 2019 09:47
@tekjar

tekjar commented Feb 28, 2019

These changes already look good to me. I'll merge this into a different branch and experiment with some test cases that I already wrote. Thanks a lot for the contribution :)

@tekjar tekjar merged commit 5660643 into AtherEnergy:connection_refactor Feb 28, 2019
@flxo
Contributor Author

flxo commented Feb 28, 2019

OK, thanks. I didn't have time to work on this the last few days. Last week I ran a couple of load tests with the patch without any issues, but the tricky parts are definitely reconnections and connection flaws. Let me know if there's anything I can do.

@flxo flxo deleted the pr-in-flight branch February 28, 2019 17:19
@tekjar

tekjar commented Feb 28, 2019

I didn't have time to work on this the last few days

No problem. I'll finish this :)

Let me know if there's anything I can do.

I just have a few doubts about stream progress when the request channel is full, and about the stream returning NotReady during rate limiting. I'll confirm those and raise a PR against master. A review at that point would help.

@flxo
Contributor Author

flxo commented Feb 28, 2019

I just have a few doubts about stream progress when the request channel is full, and about the stream returning NotReady during rate limiting. I'll confirm those and raise a PR against master. A review at that point would help.

Yes, that's legitimate. I played around this evening and learnt that mixing crossbeam and futures channels isn't a good idea. Something needs to wake the reactor, but I don't have a deeper understanding of what happens inside.
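For what it's worth, a tiny self-contained sketch (futures 0.1 + tokio 0.1, illustrative only) of the wakeup that futures channels provide and a plain crossbeam channel does not: the send from an OS thread notifies the task parked on the receiver, so the reactor wakes up without any extra plumbing.

    extern crate futures;
    extern crate tokio;

    use std::thread;
    use std::time::Duration;
    use futures::sync::mpsc;
    use futures::Stream;

    fn main() {
        let (tx, rx) = mpsc::unbounded::<u32>();

        // A plain thread standing in for the application pushing a request later on.
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(100));
            // unbounded_send also notifies the task currently parked on `rx`.
            tx.unbounded_send(42).expect("receiver dropped");
        });

        // The event loop parks on the empty channel and is woken by the send above;
        // once the sender is dropped the stream ends and the runtime shuts down.
        tokio::run(rx.for_each(|item| {
            println!("woken with request {}", item);
            Ok(())
        }));
    }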

Looking forward to your final patch!
