This repository has been archived by the owner on Jan 17, 2020. It is now read-only.

Handle messages asynchronously #71

Closed
ivanovaleksey opened this issue Feb 1, 2018 · 10 comments

Comments

@ivanovaleksey

ivanovaleksey commented Feb 1, 2018

Hello,
thank you for the great library!

I am looking for a Rust MQTT library for my application.
I want it to handle incoming messages asynchronously, because handling them will involve DB interaction and other blocking operations.

I added a thread::sleep inside the on_message callback.
Message handling became sequential: while one message was being handled (i.e. waiting on the sleep), a new incoming message was queued and processed only after the sleep finished.
Maybe that is entirely my fault and I put the main (and only) thread to sleep.

What is the correct way to handle messages asynchronously?
Can I create some kind of thread pool within on_message and just push incoming messages to that pool?
Or will that affect QoS handling?
As far as I can see from the code, PUBACK is sent first and then the user's callback is called.
Does that mean it doesn't matter whether I handle an incoming message in the same thread or in a separate one, because PUBACK has already been sent?

Also, I have found the tokio2 branch. Maybe it is better suited for async handling?

Thank you.

@tekjar

tekjar commented Feb 1, 2018

@ivanovaleksey The tokio2 branch is still in the works (maybe it'll be ready in the next 10 days), but it's very ergonomic for handling incoming messages. You get all the incoming messages in a channel and can handle them in whatever way fits your needs. But it's your responsibility to pull messages out of the channel faster than they arrive, or else packets will be dropped.
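
To make the drop behavior concrete, here is a minimal sketch that uses the standard library's `std::sync::mpsc::sync_channel` as a stand-in for the channel on tokio2. That is an assumption: the branch's actual channel type and capacity are not shown in this thread. `deliver` and `count_dropped` are hypothetical helpers that model a producer which discards a message whenever the bounded queue is full.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Hypothetical helper: tries to enqueue a message without blocking.
/// Returns true if the message was queued, false if it was dropped.
fn deliver(tx: &SyncSender<String>, msg: String) -> bool {
    match tx.try_send(msg) {
        Ok(()) => true,
        Err(TrySendError::Full(_)) => false,         // consumer is too slow
        Err(TrySendError::Disconnected(_)) => false, // receiver is gone
    }
}

/// Sends `total` messages into a queue of size `capacity` that nobody
/// drains, and returns how many of them were dropped.
fn count_dropped(capacity: usize, total: usize) -> usize {
    // `_rx` keeps the receiving end alive but never reads from it.
    let (tx, _rx) = sync_channel::<String>(capacity);
    (0..total)
        .filter(|i| !deliver(&tx, format!("message {}", i)))
        .count()
}

fn main() {
    let dropped = count_dropped(2, 5);
    println!("dropped {} of 5 messages", dropped);
}
```

With a consumer that never reads, everything beyond the queue capacity is lost, which is why the reader side has to keep up or hand the work off quickly.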

@ivanovaleksey

ivanovaleksey commented Feb 1, 2018

Thank you for the reply, @tekjar.
How do I know this rate so that I can avoid dropping messages? As I understand it, it depends on how many messages my clients send and how often. But I can't predict that at any given moment.
Is there any way to ensure that every message will be handled and nothing will be dropped?

Do messages get dropped on the master branch? As I understand it, tokio2 will be merged into master, so dropping will happen on master anyway?

@tekjar

tekjar commented Feb 1, 2018

@ivanovaleksey The master branch uses a threadpool to execute callbacks. I've not tested master enough with incoming messages to understand how the threadpool behaves under high load. If the threadpool uses an unbounded channel, memory will just keep going up if the callbacks are heavy/blocking. One way to ensure that you don't drop messages in tokio2 is to not do heavy processing in the thread that receives data and to offload the work to a threadpool.
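
The offloading idea can be sketched with the standard library alone. This is only an illustration under stated assumptions, not the library's implementation: `process_in_pool` is a hypothetical function, the uppercase conversion is a placeholder for slow work such as a DB query, and the job queue here is unbounded, so memory can grow if the workers fall behind.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

/// Runs `workers` threads that pull payloads from a shared queue and
/// returns the processed results (in no particular order).
fn process_in_pool(payloads: Vec<String>, workers: usize) -> Vec<String> {
    let (job_tx, job_rx) = channel::<String>();
    let (done_tx, done_rx) = channel::<String>();
    // std's Receiver cannot be cloned, so the workers share it behind a mutex.
    let job_rx: Arc<Mutex<Receiver<String>>> = Arc::new(Mutex::new(job_rx));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let done_tx = done_tx.clone();
            thread::spawn(move || loop {
                // The lock is held only while taking the next job.
                let job = job_rx.lock().unwrap().recv();
                match job {
                    // Placeholder for slow work such as a DB query.
                    Ok(payload) => done_tx.send(payload.to_uppercase()).unwrap(),
                    Err(_) => break, // queue closed and drained
                }
            })
        })
        .collect();
    drop(done_tx);

    // The receiving side (e.g. a message callback) only enqueues and returns.
    for payload in payloads {
        job_tx.send(payload).unwrap();
    }
    drop(job_tx); // closing the queue lets the workers exit

    for handle in handles {
        handle.join().unwrap();
    }
    done_rx.iter().collect()
}

fn main() {
    let input = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    let mut results = process_in_pool(input, 3);
    results.sort();
    println!("{:?}", results);
}
```

The thread that receives data does nothing but a cheap `send`, so it stays free to accept the next message while the workers block.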

@ivanovaleksey

Yes, there is a threadpool on the master branch, but it has a capacity of 1, so it looks like sequential message handling.

"not do heavy processing in the thread that receives data and offloading work to a threadpool"

Yes, that was my initial idea too. It doesn't have any impact on PUBACK, since PUBACK is sent before the user's callback is called, does it? Is the same true for tokio2?

@tekjar

tekjar commented Feb 1, 2018

Yeah, PUBACK should be sent before handling the message, or else (most) brokers will keep resending the message if your callback is heavy.

@ivanovaleksey

Thanks once again @tekjar.
I am closing the issue and will try to use the library.

@tekjar

tekjar commented Feb 1, 2018

The reason for choosing a channel instead of a callback is to give users flexibility. These Receivers are crossbeam-channel receivers, which are very flexible. You can use them in multiple threads to load balance, or use a threadpool for other use cases.

Thinking about this, maybe I should provide a way to configure the size of the channel, or use unbounded channels.

@ivanovaleksey

Hello @tekjar, I have another question about async handling.
I forgot to mention (and now I am afraid it is a big problem) that I need not only to receive messages but also to respond to them.

Suppose I have a thread pool with a size of 3.
On receiving a new message I just push it to the pool to achieve async handling.
But then each handler, i.e. each of the 3 threads, needs to respond.
MqttClient::publish requires a &mut MqttClient, but it looks like I can't obtain a mutable reference in each thread.
It seems that responding to a message would be a bottleneck.

Is there any workaround here?
Or maybe I am missing something and there is no such problem?

Thank you.

@tekjar

tekjar commented Feb 10, 2018

@ivanovaleksey If you are using the tokio2 branch, you'll be able to clone MqttClient (not yet implemented). Can you open a separate issue for this?

@ivanovaleksey

Thanks @tekjar, I currently use version 0.10.1.
I am very new to Tokio, so I decided to start with the "plain" version and then switch to the Tokio-based one.
So publishing from multiple threads without locking is not possible in 0.10.1 and will be implemented on tokio2 later, right?
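
One possible workaround for a client whose publish method needs `&mut self` is to let a single thread own the client and have the handler threads send their replies to it over a channel. The sketch below assumes this pattern only; `FakeClient`, `reply_from_workers`, and the `replies/N` topics are hypothetical names for illustration and are not the library's types or API.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Hypothetical stand-in for an MQTT client whose `publish` needs `&mut self`.
/// It is NOT the library's type; it only records what would be sent.
struct FakeClient {
    sent: Vec<(String, String)>,
}

impl FakeClient {
    fn publish(&mut self, topic: &str, payload: &str) {
        self.sent.push((topic.to_string(), payload.to_string()));
    }
}

/// Spawns `workers` handler threads that each produce one reply, plus a
/// single publisher thread that owns the client and sends every reply.
fn reply_from_workers(workers: usize) -> Vec<(String, String)> {
    let (reply_tx, reply_rx) = channel::<(String, String)>();

    // Only this thread ever touches the client, so no lock is needed.
    let publisher = thread::spawn(move || {
        let mut client = FakeClient { sent: Vec::new() };
        for (topic, payload) in reply_rx {
            client.publish(&topic, &payload);
        }
        client.sent
    });

    let handlers: Vec<_> = (0..workers)
        .map(|id| {
            let reply_tx = reply_tx.clone(); // each handler gets its own Sender
            thread::spawn(move || {
                let reply = (format!("replies/{}", id), format!("done by worker {}", id));
                reply_tx.send(reply).unwrap();
            })
        })
        .collect();
    drop(reply_tx); // the publisher stops once every sender is gone

    for handler in handlers {
        handler.join().unwrap();
    }
    publisher.join().unwrap()
}

fn main() {
    let mut sent = reply_from_workers(3);
    sent.sort();
    for (topic, payload) in sent {
        println!("{} <- {}", topic, payload);
    }
}
```

Publishing is still serialized through one thread, but the handlers never wait on each other for a mutable reference. Wrapping the client in `Arc<Mutex<...>>` would be the locking alternative.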
