Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: oneshot channel returns Pkid #806

Closed
wants to merge 17 commits into from
Closed

feat: oneshot channel returns Pkid #806

wants to merge 17 commits into from

Conversation

de-sh
Copy link
Member

@de-sh de-sh commented Feb 22, 2024

Fixes #349, implements #805

Type of change

New feature (non-breaking change which adds functionality)

Allows user access to pkid information once it is assigned by rumqttc internals during run.

Checklist:

  • Formatted with cargo fmt
  • Make an entry to CHANGELOG.md if it's relevant to the users of the library. If it's not relevant mention why.

Tests performed

Ran this against provided example async code, output looks as follows

Event = Incoming(ConnAck(ConnAck { session_present: false, code: Success }))
Event = Outgoing(Subscribe(1))
Pkid: 1
Event = Outgoing(Publish(2))
Pkid: 2
Event = Incoming(SubAck(SubAck { pkid: 1, return_codes: [Success(AtMostOnce)] }))
Event = Outgoing(PubRel(2))
Event = Incoming(PubRec(PubRec { pkid: 2 }))
Event = Incoming(Publish(Topic = hello/world, Qos = AtMostOnce, Retain = false, Pkid = 0, Payload Size = 1))
Event = Incoming(PubComp(PubComp { pkid: 2 }))
Event = Outgoing(Publish(3))
Pkid: 3
Event = Outgoing(PubRel(3))
Event = Incoming(PubRec(PubRec { pkid: 3 }))
Event = Incoming(Publish(Topic = hello/world, Qos = AtMostOnce, Retain = false, Pkid = 0, Payload Size = 2))
Event = Incoming(PubComp(PubComp { pkid: 3 }))
Event = Outgoing(Publish(4))
Pkid: 4
Event = Outgoing(PubRel(4))
Event = Incoming(PubRec(PubRec { pkid: 4 }))
Event = Incoming(Publish(Topic = hello/world, Qos = AtMostOnce, Retain = false, Pkid = 0, Payload Size = 3))
Event = Incoming(PubComp(PubComp { pkid: 4 }))
Event = Outgoing(Publish(5))
Pkid: 5
Event = Outgoing(PubRel(5))
Event = Incoming(PubRec(PubRec { pkid: 5 }))
Event = Incoming(Publish(Topic = hello/world, Qos = AtMostOnce, Retain = false, Pkid = 0, Payload Size = 4))
Event = Incoming(PubComp(PubComp { pkid: 5 }))
Event = Outgoing(Publish(6))
Pkid: 6
Event = Outgoing(PubRel(6))
Event = Incoming(PubRec(PubRec { pkid: 6 }))
Event = Incoming(Publish(Topic = hello/world, Qos = AtMostOnce, Retain = false, Pkid = 0, Payload Size = 5))
Event = Incoming(PubComp(PubComp { pkid: 6 }))
Event = Outgoing(PingReq)
Event = Outgoing(Publish(7))
Pkid: 7
Event = Incoming(PingResp)
Event = Outgoing(PubRel(7))
Event = Incoming(PubRec(PubRec { pkid: 7 }))
Event = Incoming(Publish(Topic = hello/world, Qos = AtMostOnce, Retain = false, Pkid = 0, Payload Size = 6))
Event = Incoming(PubComp(PubComp { pkid: 7 }))
Event = Outgoing(Publish(8))
Pkid: 8
Event = Outgoing(PubRel(8))
Event = Incoming(PubRec(PubRec { pkid: 8 }))
Event = Incoming(Publish(Topic = hello/world, Qos = AtMostOnce, Retain = false, Pkid = 0, Payload Size = 7))
Event = Incoming(PubComp(PubComp { pkid: 8 }))
Event = Outgoing(Publish(9))
Pkid: 9
Event = Outgoing(PubRel(9))
Event = Incoming(PubRec(PubRec { pkid: 9 }))
Event = Incoming(Publish(Topic = hello/world, Qos = AtMostOnce, Retain = false, Pkid = 0, Payload Size = 8))
Event = Incoming(PubComp(PubComp { pkid: 9 }))
Event = Outgoing(Publish(10))
Pkid: 10
Event = Outgoing(PubRel(10))
Event = Incoming(PubRec(PubRec { pkid: 10 }))
Event = Incoming(Publish(Topic = hello/world, Qos = AtMostOnce, Retain = false, Pkid = 0, Payload Size = 9))
Event = Incoming(PubComp(PubComp { pkid: 10 }))
Event = Outgoing(Publish(11))
Pkid: 11
Event = Outgoing(PubRel(11))
Event = Incoming(PubRec(PubRec { pkid: 11 }))
Event = Incoming(Publish(Topic = hello/world, Qos = AtMostOnce, Retain = false, Pkid = 0, Payload Size = 10))
Event = Incoming(PubComp(PubComp { pkid: 11 }))
Event = Outgoing(PingReq)
Event = Incoming(PingResp)

Thus the implementation is complete.

@de-sh de-sh changed the title fulfill promise on handling packets feat: oneshot channel returns Pkid Feb 22, 2024
@coveralls
Copy link

coveralls commented Feb 23, 2024

Pull Request Test Coverage Report for Build 8478057404

Details

  • 80 of 351 (22.79%) changed or added relevant lines in 6 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall first build on pkid at 36.187%

Changes Missing Coverage Covered Lines Changed/Added Lines %
rumqttc/src/state.rs 27 37 72.97%
rumqttc/src/v5/state.rs 26 38 68.42%
rumqttc/src/lib.rs 3 18 16.67%
rumqttc/src/v5/mod.rs 0 29 0.0%
rumqttc/src/client.rs 16 101 15.84%
rumqttc/src/v5/client.rs 8 128 6.25%
Totals Coverage Status
Change from base Build 8432798356: 36.2%
Covered Lines: 6020
Relevant Lines: 16636

💛 - Coveralls

@de-sh de-sh marked this pull request as ready for review February 23, 2024 02:28
Copy link
Member

@swanandx swanandx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea and also tried it out, with both sync & async, and it works nicely!

Here is my feedback:

  • we should avoid placing the pkid_tx directly in packets, mqttbytes should strictly be related to protocol parsing. We can maybe put it in request like Request::Publish(tx, publish) ?
  • just Pkid seems intuitive than PkidPromise
  • users don't need to know about the oneshot channel, thus instead of just type alias, we should impl / rename some methods. e.g. async client is fine, but when in sync, using pkid.blocking_recv() isn't. maybe we can rename that recv to something else?
  • if we fail to send the pkid from state, maybe we can log some debug lvl info? not sure how often we will fail though.
  • examples can be simplified, we just want to show how user can get pkid. So we can simply await right after getting Pkid. Using joinsets / delay queues in unnecessarily adding complexity. Also, users want to associate the publish with the pkid right? awaiting em later via joinset or something will be same as checking for Outgoing event.
  • as you suggested in RFC, having many open oneshot channels might addup perf impact which we are unsure of as of now. Whether to feature gate this or not is your call ( as you would know better haha )

@de-sh
Copy link
Member Author

de-sh commented Feb 25, 2024

as you suggested in RFC, having many open oneshot channels might addup perf impact which we are unsure of as of now. Whether to feature gate this or not is your call ( as you would know better haha )

It doesn't seem to have much of an impact on perf, I believe it is safe to ignore for now. Cost of feature gating is expensive on the code readability front.

will be same as checking for Outgoing event.

Not really, I got the idea from the user comment in the RFC, it's just to give users an idea of what is possible, they can choose to not go the route we illustrate, but we can try and educate them about the possibilities.

if we fail to send the pkid from state, maybe we can log some debug lvl info? not sure how often we will fail though.

This should be fine, the only reason we couldn't send was because of rx drop, which means the user doesn't want to do anything with the pkid.

just Pkid seems intuitive than PkidPromise

From a user perspective, you wouldn't want them to confuse between the awaiting and the occupied state.

  • we should avoid placing the pkid_tx directly in packets, mqttbytes should strictly be related to protocol parsing. We can maybe put it in request like Request::Publish(tx, publish) ?
  • users don't need to know about the oneshot channel, thus instead of just type alias, we should impl / rename some methods. e.g. async client is fine, but when in sync, using pkid.blocking_recv() isn't. maybe we can rename that recv to something else?

@@ -183,21 +186,21 @@ impl MqttState {
/// be put on to the network by the eventloop
pub fn handle_outgoing_packet(&mut self, request: Request) -> Result<(), StateError> {
match request {
Request::Publish(publish) => {
Request::Publish(tx, publish) => {
self.check_size(publish.size())?;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EventLoop drops the pkid_tx here if size is not valid

@de-sh
Copy link
Member Author

de-sh commented Feb 25, 2024

I have a few comments about the PkidError type, it needs to be more descriptive of why it failed to resolve into a valid number. This could be one of many:

  1. EventLoop was shutdown, hence the tx was dropped.
  2. EventLoop rejected the packet and hence did not assign it a pkid

We should probably handle this in a separate PR, I believe the current PR is a good to merge now.

@swanandx
Copy link
Member

it's just to give users an idea of what is possible, they can choose to not go the route we illustrate, but we can try and educate them about the possibilities.

my thinking was, the examples should show them how they can get the pkid in general. for simple examples sake, getting pkid one by one should be fine imo ( in async & in sync )

we can make separate examples tailored toward why they might want to use joinset ( or something else ). There we should also explain why it is used and when they should and shouldn't use it.

From a user perspective, you wouldn't want them to confuse between the awaiting and the occupied state.

I didn't get this completely, but i would like to differ. Adding Promise to name is too verbose(? idk correct word). e.g. the receiver is Receiver not ReceiverPromise.

@de-sh
Copy link
Member Author

de-sh commented Mar 5, 2024

e.g. the receiver is Receiver not ReceiverPromise.

That's because the receiver is a generic term for receiving data, here we are adding context that we are promising deliver of a Pkid specifically. Naming is the most contentious thing in software, believe me when I tell you that this is the right term 😂

@de-sh de-sh closed this May 22, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

rumqttc - Get packet id for publish
3 participants