[Issue 5969] prevent redelivery of acked batch message at the client api #5990
Since this is on the client side I assume you will want to do it in the clients for other languages as well, right?
 * @param messageId batchMessageIdImpl
 * @return boolean isAllMsgAcked for the batch
 */
public boolean ack (BatchMessageIdImpl messageId) {
Can this be called from multiple threads? If so I think there are some potential concurrency issues with this code.
Concurrent access is managed by the underlying batchedAcked concurrent map and synchronized set. I try to manage the locks at the lowest data structure level possible.
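A minimal sketch of the concern raised above, not the PR's code: `AckedBatchSketch`, the String batch key, and the int batch index are all assumptions made for illustration. A concurrent map plus a synchronized set protects each structure individually, but "add the index, then check whether the batch is complete" is a compound step. One way to make that step atomic per batch is `ConcurrentHashMap.compute`:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch; names are illustrative and not taken from the PR.
final class AckedBatchSketch {
    // batch key -> indexes of messages already acked in that batch
    private final ConcurrentMap<String, Set<Integer>> acked = new ConcurrentHashMap<>();

    /**
     * Records an ack for one message of a batch.
     * Returns true once every message of the batch has been acked.
     */
    boolean ack(String batchKey, int batchIndex, int batchSize) {
        boolean[] complete = {false};
        // compute() runs atomically per key, so the add and the size check cannot interleave
        acked.compute(batchKey, (key, existing) -> {
            Set<Integer> indexes = (existing == null) ? new HashSet<>() : existing;
            indexes.add(batchIndex);
            if (indexes.size() == batchSize) {
                complete[0] = true;
                return null; // returning null removes the entry for a fully acked batch
            }
            return indexes;
        });
        return complete[0];
    }

    // Number of partially acked batches currently held in memory
    int trackedBatches() {
        return acked.size();
    }
}
```

Because the inner set is only touched inside `compute`, it does not need its own synchronization in this sketch.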
// a map of partial acked batch and its messages already acked
@VisibleForTesting
Map<String, Set<BatchMessageIdImpl>> ackedBatches = new HashMap<String, Set<BatchMessageIdImpl>>();
If there is a missing ack, it can result in the client using a lot of memory for storing acked batches. Why can't we leverage the existing AcknowledgmentGroupingTracker for that job? It already has information about acked messages.
Currently it uses a Set of integers to track the acked messages of a batch, so the memory footprint is small. I think it could even use a bitset like the BatchAcker. I am trying to keep this isolated from AcknowledgmentGroupingTracker and testable on its own. What do you think?
- I think we should go with a bitset. Otherwise, the memory usage can get out of control.
- Since the logic here seems to duplicate the functionality of AcknowledgmentGroupingTracker, I would prefer not to add duplicated classes. It is confusing and hard to maintain.
@sijie Updated the code with BitSet.
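For readers following the Set-versus-BitSet discussion, here is a rough sketch of per-batch ack state held in a `java.util.BitSet`. It is an illustration under assumptions, not the updated PR code: the class name `BatchBitsSketch` and its methods are hypothetical. A BitSet needs roughly one bit per message in the batch, whereas a set of boxed integers costs an object per acked index.

```java
import java.util.BitSet;

// Hypothetical sketch of per-batch ack state held in a BitSet; not the PR's code.
final class BatchBitsSketch {
    private final int batchSize;
    private final BitSet ackedBits; // bit i set means message i of the batch was acked

    BatchBitsSketch(int batchSize) {
        this.batchSize = batchSize;
        this.ackedBits = new BitSet(batchSize);
    }

    /** Marks one message as acked; returns true when the whole batch is acked. */
    synchronized boolean ack(int batchIndex) {
        ackedBits.set(batchIndex);
        // cardinality() counts set bits, so a repeated ack of the same index is harmless
        return ackedBits.cardinality() == batchSize;
    }

    /** Tells whether a given message of the batch was already acked. */
    synchronized boolean isAcked(int batchIndex) {
        return ackedBits.get(batchIndex);
    }
}
```

The methods are synchronized here because `BitSet` itself is not safe for concurrent use without external synchronization.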
Yes. The Java client is used to validate the logic in this PR. It will be extended to the C++ client in another PR.
@zzzming In shared subscription mode, this will cause more confusion for users, because messages can be redelivered to other consumers and the state of each consumer is not synchronized. If this approach only works with single-active-consumer subscription modes, I think we can solve it better on the server side, with the client filtering messages based on information provided by the broker. That keeps the client side simple, which is better for clients in different languages. And if we solve this problem on the server side in the future (I prefer this solution: #5969 (comment)), the mechanism in this PR will be discarded, so we should consider whether it is necessary.
@codelipenghui Thanks for the update. I'll look forward to the broker-side implementation. Do we have an ETA on this?
@zzzming Thanks for your contribution. For this PR, do we need to update the docs?
Closed by #6052
Motivation
To address the redelivery of acked batch messages described in #5969.
This is a client-side change only. It prevents already-acked messages of a batch from being delivered again, and only on the client side. Because it tracks the acked messages of partially acked batches in memory, restarting the client loses the tracking state.
Modifications
A new BatchAckedTracker is added to track all acked messages in any partially acked batch. A redelivered batch is evaluated against this tracker so that ConsumerImpl passes only the unacked messages to the client. When the client has acked all messages of the redelivered batch, ConsumerImpl sends the batch ack to the broker.
This only addresses the issue in the Java client. Updates to the C++ and standalone Go clients will be tracked in a separate PR.
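The filtering step described in this section can be sketched roughly as follows. This is a hedged illustration only: `RedeliveryFilterSketch` and `unackedIndexes` are hypothetical names, and the sketch assumes the tracker exposes the acked positions of a batch as a `BitSet`. The actual BatchAckedTracker and ConsumerImpl code is not reproduced here.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical sketch of filtering a redelivered batch; not the PR's implementation.
final class RedeliveryFilterSketch {
    /**
     * Returns the indexes of a redelivered batch that were not acked before
     * and therefore still need to be handed to the application.
     */
    static List<Integer> unackedIndexes(BitSet acked, int batchSize) {
        List<Integer> result = new ArrayList<>();
        // nextClearBit walks the bits that are NOT set, i.e. the unacked messages
        for (int i = acked.nextClearBit(0); i < batchSize; i = acked.nextClearBit(i + 1)) {
            result.add(i);
        }
        return result;
    }
}
```

In this sketch, an empty result means every message of the batch has been acked, which corresponds to the point where the description says the batch ack is sent to the broker.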
Verifying this change
Three new unit tests are introduced.
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Documentation