Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return token with an error when Publish has failed due to lost connection #450

Closed
wants to merge 1 commit into from
Closed

Return token with an error when Publish has failed due to lost connection #450

wants to merge 1 commit into from

Conversation

oleksmir
Copy link

@oleksmir oleksmir commented Aug 31, 2020

After the Publish method is invoked, it is possible a situation when the paho client can be in a connecting or reconnecting state. Thus, the returned token will never be completed because nobody invokes the flowComplete method. When a connection is reestablished, this token is not revoked or recreated internally.

On the consumer/client side it will be a problem if the client invokes token.Wait(). Even with WriteTimeout client settings this invocation will never end and lead client code to be stuck. One of the possible solutions is to use token.WaitTimeout() that forces our client to manually check token status until reconnect is finished but even with this approach, you cannot get the completed flow.

Still, my code wants to have guarantees about message delivery and handling. In this case, I want to continue working with a token.Wait() and be possible to interrupt invocation by tuning up the WaitTimeout setting.

This PR proposes to complete token flow if the current connection status is connecting or reconnecting. To notify the client about publishing problems it uses custom errors that can be checked by the invocation side. If an error has occurred, we guarantee that the outbound message was persisted inside the internal storage and will be delivered as soon as the connection is reestablished. Thus, this PR does not modify the logic of the Publish method, just fixes an issue with uncompleted token and Publish unlock.

…tion

Signed-off-by: oleksmir <olexiy.miroshnik@gmail.com>
@oleksmir
Copy link
Author

@alsm @MattBrittan Please, take a look.

@MattBrittan
Copy link
Contributor

When a connection is reestablished, this token is not revoked or recreated internally.

Unfortunately this library has a lot of options and the logic can be somewhat convoluted...

In many instances (on connection if c.options.CleanSession is false and on all reconnections) resume should be called when the connection comes up (initially or upon reconnection) and that will call mids.claimID() which then calls old.flowComplete() which will close the token (without reporting an error). If this is not happening then we have a logic error somewhere (would not surprise me!).

In some other circumstances internalConnLost has if c.options.CleanSession && !c.options.AutoReconnect {c.messageIds.cleanUp()} which calls flowComplete for anything in the store. Note that Publish will already return an error if c.options.AutoReconnect is false and the connection is down.

In terms of Publish() this currently attempts to add the message to the outbound channel and fails (with token.setError() which, to my view, is incorrect when AutoReconnect is true). As this is an unbuffered channel that means that the messages should not be picked up if the connection is down (so you should get the error if the connection does not come up in time to transmit the message).

Thus, this PR does not modify the logic of the Publish method, just fixes an issue with uncompleted token and Publish unlock.

My concern is that I think that this does change the logic somewhat. Generally something that goes wrong after the outgoing message is persisted is not considered to be an error (this predates my involvement but I guess the logic is that the message will be delivered eventually). So (with c.options.CleanSession = false) the token should be completed by flowComplete (in messageIds.cleanUp()) whereas with your change an error will now be returned. Whilst it is possible to detect that error doing so requires a change to existing code so I would consider this to be a breaking change (not a huge deal but something that does require consideration because this code is quite widely used; I would certainly need to update my code to avoid logging these non-errors). I don't see any issue with calling flowComplete in Publish because that just means the call is made a bit earlier than it otherwise would (so should not break any existing code) but that might not accomplish your goals?

I suspect that I have missed something in the above (or not fully understood what you are trying to accomplish) but hopefully the above helps set the scene a bit.

Note: I do not have commit rights to this project (believe that @alsm is currently the only active committer).

@oleksmir
Copy link
Author

oleksmir commented Sep 1, 2020

Thank you @MattBrittan for the quick response.

which then calls old.flowComplete() which will close the token (without reporting an error)

The problem here is that token is never persisted to the store. According to the source code of the publish method, we create a token besides the publish packet. The binding between packet and token is taking place on this line:

case c.obound <- &PacketAndToken{p: pub, t: token}:

and works for the connected case only. In other cases, returning token does not make any sense.

If the connection in the connecting or reconnecting status we will return a token that will never be stored anywhere. No one will complete its flow. Also, the Publish method does not guarantee connection status to be valid in the time we do the second status check. Since we use atomics we can lead to a situation when we have connected status at the beginning of the method but then reconnecting status at the end. In this case, no error will be returned at all.

Maybe I am missing something but I did not get the way token can be restored and completed as 'old'. Even if the connection is reestablished, the code is continue being locked on the Wait().

@MattBrittan
Copy link
Contributor

If the connection in the connecting or reconnecting status we will return a token that will never be stored anywhere.

The token should be persisted here :

persistOutbound(c.persist, pub)

As the message has also been allocated an id the token is in the message id map and, as such, flowComplete() should be called when the connection comes up (see step-by-step process below).

As you say if the connection status is connecting or reconnecting at the time Publish() is called then the message will not be put on the queue (sorry - I was not clear in my initial response; when I said "In terms of Publish() this currently attempts..." I was referring to calls to publish that were already in progress at the time the connection was lost).

Maybe I am missing something but I did not get the way token can be restored and completed as 'old'. Even if the connection is reestablished, the code is continue being locked on the Wait().

Here is a run through (doing this to ensure I understand it as much as to communicate it to you); note that some of this depends upon your options which can make diagnosis without sample code difficult:

  • In Publish() the message is added to the message ID map and stored to whatever store you have setup (see above).
  • When the connection comes back up resume() is called here
  • Resume will find the packets.PublishPacket in the store and call c.claimID(token, details.MessageID) here
  • claimID() uses the message ID to check the message ID map and, if a token exists, calls flowComplete as follows:
mids.Lock()
defer mids.Unlock()
if _, ok := mids.index[id]; !ok {
	mids.index[id] = token
} else {
	old := mids.index[id]
	old.flowComplete()
	mids.index[id] = token
}
if id > mids.lastIssuedID {
	mids.lastIssuedID = id
}
  • The message is then resubmitted (an internal token is used so there is no way for you to get a further update).

So I think that this covers off storing the message, re-sending after the reconnection and freeing existing tokens. However I do have a niggling feeling that there is a bug in there somewhere (We probably should be doing the c.claimID() thing for PubrelPacket but as I'm not using QOS=2 I have not encountered issues with that).

Note: The whole token system is somewhat problematic which is why @alsm has dropped it in the MQTT v5 client.

@oleksmir
Copy link
Author

oleksmir commented Sep 1, 2020

@MattBrittan Thank you for the detailed explanation. Much appreciate.

@oleksmir oleksmir closed this Sep 13, 2020
@oleksmir oleksmir deleted the fix/publish-token-complete-flow branch September 13, 2020 21:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants