Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Control PUBACK/PUBREC transmission #53

Closed
MattBrittan opened this issue Apr 21, 2021 · 3 comments
Closed

Control PUBACK/PUBREC transmission #53

MattBrittan opened this issue Apr 21, 2021 · 3 comments

Comments

@MattBrittan
Copy link
Contributor

I would like a way to:

  • Prevent a PUBACK/PUBREC from being sent if something goes wrong whilst the message is being processed in my code (e.g. a service used to persist the message fails).
  • Delay the sending of the PUBACK/PUBREC so that data can be committed to storage in batches with all relevant messages being acknowledged when the batch has been successfully processed.
  • Potentially send a customised PUBACK (adding user properties or similar as discussed in issue How about adding hook for PUBXXX messages #40; this only goes part way towards delivering on that request). This would also enable the use of reason codes such as Topic Name invalid.
  • Potentially use this technique instead of implementing persistence on inbound messages (effectively using the broker as a store).

This issue comes out of a discussion on PR #52; that PR ensures that the message router is called in message order and that the relevant PUBACK/PUBREC is returned in order and delays the sending of the PUBACK/PUBREC until the router returns. The discussion was getting fairly long and is really separate from the PR so I felt it best to open a new issue to allow discussion on this point (it's something others may be interested in).

Relevant issues raised in the V3 Client:

The solution I'd like

Add an additional parameter to Router.Route(*packets.Publish); Router.Route(*packets.Publish, func Ack(io.WriterTo) err). The Ack function will send the default acknowledgment (if WriterTo is nil) or use the passed in custom WriterTo. If the Ack function is not called then no PUBACK/PUBREC will be sent (and the broker would resend the message at some point). Note that at QOS0 the ACK function would not do anything.

The benefits of this solution are:

  • Its a clear change to the Router interface so obvious that action is needed (some of the other options could lead to the behaviour being hidden). The default router could be changed to call Ack(nil) which would mean that most users would not notice the change.
  • The Publish packet remains simple and pass by value is not an issue (a mutex may be needed so using a wrapper around the publish packet might create issues for users who copy the struct).
  • It meets all of the requirements noted above.

Alternatives considered

  • Current Approach - Ensures PUBACK/PUBREC sent in order but message handlers called as go routine so order not guaranteed. PUBACK/PUBREC sent regardless of any issues encountered whilst handling the message (leading to potential data loss).
  • PR In-order routing and late ack #52 approach - Ensures PUBACK/PUBREC sent in order and creates a way to avoid sending them (close the connection before returning from the handler. However:
    • forcing a disconnect is likely to create issues
    • this does not provide a way to process multiple messages simultaneously AND control the ACK
    • a change would be needed if inbound persistence is implemented in the future (there is no way to say "don't send an ACK" at all).
  • Don't auto ACK and have a func (c *Client) Ack(pb *packets.Publish) error that users can call. This could be workable but I'm concerned about what will happen when the client disconnects/reconnects (meaning there is a new Client); it would be easy for users to introduce bugs into their code. There is also a potential issue if the function is called twice for the same packets.Publish (as the MID may have been reallocated to another message).
  • Wrap the PUBLISH in a PublishWithAck which provides func (p *PublishWithAck) Ack(io.WriterTo) err (as per the preferred option above). The downsides of this is that it may be a confusing change for users and complicates the struct passed in (we would probably need a mutex which would prevent pass by value).
  • Have the router take a 2nd parameter which would be a lambda function to ACK the packet.

Additional context

@alsm asked paho-dev@eclipse.org mailing list and @icraggs (Eclipse Paho project lead) responded:

if I understand you correctly, this is what I do in the C client. I send the PUBACK or PUBCOMP (old style QoS 2) after the message handler callback has been invoked, and has returned a good result.

I read that spec statement as referring to the receive maximum limit, making it clear that it applies to PUBLISH packets only, not PUBACKs, PUBRECs, PUBRELs or PUBCOMPs. I do think however there is a question of ordering, that the ack for a message shouldn't be delayed so long that the ack for a subsequently received PUBLISH is sent first, at least not routinely.

(An additional note: I have a request to have two callbacks for the receipt of QoS 2 messages, one for the PUBLISH and one for the PUBREL, to allow a 2-phase commit interaction with, for instance, a database. I haven't implemented it yet, but I intend to.)

@fracasula
Copy link
Contributor

Hi @MattBrittan, thanks for the write-up!

I put some more thought on this and I realized that there might be another way as well. I'd like to propose it so that we
can add it to the discussion.

The idea is to buffer the acknowledgments so that order wouldn't matter for the final user (the client would ensure the
order of the MQTT acknowledgments for 100% compatibility with the specs).

Here's how I thought it could work:

  1. the client records the order of the PUBLISH packets as it sees them coming from the broker (in a simple slice?)
  2. the client exposes an .Ack(pb *packets.Publish) error method to allow for manual acks
  3. a call to client.Ack() only buffers the acknowledgment. once there are enough acks in a row those are sent to the
    server in the expected order
  4. upon disconnection the "pending acks" are cleared up to avoid conflicts (according to the specs unacked packets are
    redelivered upon reconnection only)

4.4 Message delivery retry

When a Client reconnects with Clean Start set to 0 and a session is present, both the Client and Server MUST resend any unacknowledged PUBLISH packets (where QoS > 0) and PUBREL packets using their original Packet Identifiers. This is the only circumstance where a Client or Server is REQUIRED to resend messages. Clients and Servers MUST NOT resend messages at any other time [MQTT-4.4.0-1].

Optionally the client can automatically acknowledge all packets unless it's configured to expect manual
acknowledgements (e.g. ClientConfig { EnableManualAcknowledgment bool }) which also guarantees backwards compatibility.

I think this is the approach that I like the most because:

  1. maintains backwards compatibility
  2. gives you full control of acks
  3. maintains 100% compatibility with the specs
  4. easy to use for the final user
  5. copying packets by value wouldn't matter (e.g. mutexes issue described in first write-up)
  6. we leave the router untouched and with a single responsibility (just route)
  7. no abstractions needed, just good ol' code :D
  8. looks like other clients are using this approach in the industry and seems to have worked well for them
    • HiveMQ Java client
  9. acking multiple times is not an issue (unless we want it to?)
  10. acking an unknown packet just returns an error informing the user

See quick implementation here.

This should cover all QoSs:

  • 0 is ignored
  • 1 is at-least-once so upon reconnection if packets are redelivered it is fine
  • 2 is exaclty-once but we're allowing just for the PUBREC to be manually controlled

Concerns:

  1. behaviour upon failure
    • say that we're sending the acks for the first 10 messages but number 4 fails, we should handle retries and max
      number of retries, after a certain threshold we could return an error and disconnect
    • at the moment the client just logs the error and keeps going so retries or other approaches would be a plus
      considered what we have at the moment (i.e. c.errors.Printf("failed to send PUBACK for %d: %s", pb.PacketID, err))
  2. manual acks are always required, you can't skip one
    • even though this is perfectly fine and follows the spec, it could lead to hard-to-debug issues where the users
      may not realize that they have not sent an ack impeding the sending of other acks and the consequent
      redelivery of messages upon reconnection
    • this can be made clear via the documentation, just think that, in order to enable manual acks, you have to look at
      the ClientConfig and look at the EnableCustomAcknowledgment parameter, the perfect place where this should be
      documented properly
    • additional configuration could be exposed such as acks timeouts (after a given timeout we could ack anyway
      or interrupt the connection returning an error making clear that an ack has not been received for a long time) -
      this would be according to the user configuration, we could expose both options (1. auto ack on timeout - 2. error
      on timeout)

In regards to sending customised PUBACK properties, it might be as easy as allowing a variadic of key-value parameters
in the ack method:

type UserProp struct {
	Key   string
	Value string
}

func (c *Client) Ack(pb *packets.Publish, userProps ...UserProp) error {
	// merge userProps with other properties set by the client
	...
}

Of course it would work if we just want to add user properties. I don't think there is another use case though? 🤔
The advantage would be that the user won't be able to mess up with other packet attributes that they are not allowed to touch. Also they won't be able to accidentally override other props set by the client since they just pass the properties and then the client does the merging (maybe errors can be returned on conflicts).

@vishnureddy17
Copy link
Contributor

Should this issue be closed since #52 was merged in? Sorry for the basic question.

@MattBrittan
Copy link
Contributor Author

I suspect you meant to link PR #57 (Manual acknowledgments) which does appear to provide the functionality I was looking for so I'll close this off. I'm not currently using this library myself (need persistence so use the v3 library) and had not had a chance to look at the solution (this is complicated by the MQTT v5 requirement that ACKS be sent in order which does improve ordering guarantees, in very limited situations, but also limits the usefulness of this kind of technique).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants