[fix][broker] Fix failover/exclusive consumer cumulative ack not removing messages from UnAckedMessageTracker #18407
Conversation
Codecov Report
@@ Coverage Diff @@
## master #18407 +/- ##
============================================
- Coverage 47.67% 46.60% -1.08%
+ Complexity 9333 7687 -1646
============================================
Files 618 463 -155
Lines 58586 51659 -6927
Branches 6098 5514 -584
============================================
- Hits 27933 24077 -3856
+ Misses 27623 24779 -2844
+ Partials 3030 2803 -227
Flags with carried forward coverage won't be shown.
@@ -458,7 +458,8 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
     Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
     if (individualConsumer != null) {
         MessageId innerId = topicMessageId.getInnerMessageId();
-        return individualConsumer.acknowledgeCumulativeAsync(innerId);
+        return individualConsumer.acknowledgeCumulativeAsync(innerId)
+                .thenAccept(__ -> unAckedMessageTracker.remove(topicMessageId));
Suggested change:
-                .thenAccept(__ -> unAckedMessageTracker.remove(topicMessageId));
+                .thenAccept(__ -> unAckedMessageTracker.removeMessagesTill(topicMessageId));
        .acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
        .subscribe();
final MessageId msgId = producer.send("1".getBytes());
Should we send more than one message and ack the last one, to ensure `unAckedMessageTracker.removeMessagesTill` works?
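The reviewer's point can be illustrated with a minimal, self-contained model of the tracker. Everything here (`MsgId`, `TrackerSketch`, the `TreeSet` backing) is an illustrative sketch, not Pulsar's actual implementation: with several outstanding ids, acking only the last one with a plain `remove` leaves the earlier ids behind to hit the ack timeout, while `removeMessagesTill` clears everything up to and including the acked id.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical, simplified model of the client-side UnAckedMessageTracker.
// Ids are ordered ledger-first, entry-second, mirroring MessageId ordering.
public class TrackerSketch {

    public record MsgId(long ledger, long entry) implements Comparable<MsgId> {
        public int compareTo(MsgId o) {
            int c = Long.compare(ledger, o.ledger);
            return c != 0 ? c : Long.compare(entry, o.entry);
        }
        public String toString() { return ledger + ":" + entry; }
    }

    private final NavigableSet<MsgId> tracked = new TreeSet<>();

    public void add(long ledger, long entry) { tracked.add(new MsgId(ledger, entry)); }

    // remove(): drops only the exact id that was acknowledged.
    public boolean remove(long ledger, long entry) { return tracked.remove(new MsgId(ledger, entry)); }

    // removeMessagesTill(): drops every id up to and including the acked one.
    public int removeMessagesTill(long ledger, long entry) {
        NavigableSet<MsgId> head = tracked.headSet(new MsgId(ledger, entry), true);
        int n = head.size();
        head.clear();
        return n;
    }

    public int size() { return tracked.size(); }

    public static void main(String[] args) {
        TrackerSketch t = new TrackerSketch();
        for (long e = 0; e < 5; e++) t.add(1, e);   // five unacked messages 1:0 .. 1:4

        t.remove(1, 4);                             // cumulative ack of only the last id...
        System.out.println(t.size());               // ...leaves 4 ids to be redelivered on timeout

        t.removeMessagesTill(1, 4);                 // cumulative semantics: sweep up to 1:4
        System.out.println(t.size());               // nothing left to redeliver
    }
}
```

A multi-message test (rather than a single `producer.send`) would exercise exactly this difference.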
What about the `doReconsumeLater` method? Also, I'm wondering whether this can work properly with a partitioned topic. Consider this condition:
- We have a topic with 4 partitions, and the ledgers are `ledger id: 1 -> partition 1`, `ledger id: 2 -> partition 2`, and so on.
- One of the partitions, for example partition-2, creates a new ledger with id=5.
- At that time, the producer is sending messages in round-robin mode, so the message id sequence in the consumer's incoming queue may look like [1:0, 1:1, 5:0, 5:1, 5:2, 5:3, 3:0, 3:1, 4:1, 4:2].
- If we try to cumulatively ack message 5:0, we will clear [3:0, 3:1, 4:1] in the `UnAckedMessageTracker`.

I'm not sure if I'm missing some other logic that avoids this condition, but I think I have to mention it here.
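The cross-partition concern above can be sketched with a tiny stand-alone model. This assumes a simplified ordering that compares ledger id first and entry id second (ignoring partition index), and all names here are illustrative, not Pulsar's actual classes: when one tracker holds ids from every partition, a `removeMessagesTill` on one partition's id also sweeps out lower-ordered ids belonging to other partitions.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Illustrative model: a single tracker shared by all partitions of a topic,
// with ids ordered ledger-first. Not Pulsar's real implementation.
public class PartitionModel {

    public record MsgId(long ledger, long entry) implements Comparable<MsgId> {
        public int compareTo(MsgId o) {
            int c = Long.compare(ledger, o.ledger);
            return c != 0 ? c : Long.compare(entry, o.entry);
        }
        public String toString() { return ledger + ":" + entry; }
    }

    public static NavigableSet<MsgId> tracker(long[][] ids) {
        NavigableSet<MsgId> t = new TreeSet<>();
        for (long[] id : ids) t.add(new MsgId(id[0], id[1]));
        return t;
    }

    // removeMessagesTill: drop every tracked id ordered at or below the acked one.
    public static int removeMessagesTill(NavigableSet<MsgId> t, long ledger, long entry) {
        NavigableSet<MsgId> head = t.headSet(new MsgId(ledger, entry), true);
        int n = head.size();
        head.clear();
        return n;
    }

    public static void main(String[] args) {
        // The incoming-queue sequence from the comment above:
        NavigableSet<MsgId> t = tracker(new long[][] {
            {1, 0}, {1, 1}, {5, 0}, {5, 1}, {5, 2}, {5, 3}, {3, 0}, {3, 1}, {4, 1}, {4, 2}
        });
        // Cumulatively acking 5:0 on one partition also sweeps out the ids of
        // ledgers 1, 3 and 4, which belong to other partitions.
        removeMessagesTill(t, 5, 0);
        System.out.println(t); // only 5:1, 5:2, 5:3 survive
    }
}
```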
return individualConsumer.acknowledgeCumulativeAsync(innerId)
        .thenAccept(__ -> unAckedMessageTracker.removeMessagesTill(topicMessageId));
The `unAckedMessageTracker` will have the message IDs from all partitions. If we use `removeMessagesTill` here, the message acknowledgment for partition-0 will also remove the message IDs of other partitions, right?
Force-pushed from 9c168e9 to 326b1cf.
@@ -461,7 +461,8 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
     Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
     if (individualConsumer != null) {
         MessageId innerId = topicMessageId.getInnerMessageId();
-        return individualConsumer.acknowledgeCumulativeAsync(innerId);
+        return individualConsumer.acknowledgeCumulativeAsync(innerId)
+                .thenAccept(__ -> unAckedMessageTracker.remove(topicMessageId));
With cumulative ack, not every message invokes an acknowledgment, so using `remove` can't remove all of them; but if we use `removeMessagesTill()`, message IDs from all the topic's partitions will be removed. It's better to find another way to handle this case.
Motivation
If the consumer subscription type is `Failover` or `Exclusive`, a cumulative ack does not remove the message from the `UnAckedMessageTracker`, so the ack timeout triggers a redelivery of the already-acknowledged message.
Modifications
Remove the message from the `UnAckedMessageTracker` when `acknowledgeCumulativeAsync` completes.
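The fix pattern is to chain the tracker cleanup onto the ack future, so the id is dropped only after the broker ack has completed. A minimal stand-alone sketch of that pattern, where `ackOnBroker` and `unAcked` are hypothetical stand-ins for the real consumer call and tracker:

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the fix pattern: remove the id from the tracker only once the
// asynchronous acknowledgment future completes. Names are illustrative.
public class FixSketch {

    public static final Set<String> unAcked = ConcurrentHashMap.newKeySet();

    // Stand-in for individualConsumer.acknowledgeCumulativeAsync(innerId).
    public static CompletableFuture<Void> ackOnBroker(String msgId) {
        return CompletableFuture.completedFuture(null);
    }

    public static CompletableFuture<Void> doAcknowledge(String msgId) {
        return ackOnBroker(msgId)
                // Only after the ack completes, stop tracking the message so
                // the ack-timeout task no longer schedules it for redelivery.
                .thenAccept(__ -> unAcked.remove(msgId));
    }

    public static void main(String[] args) {
        unAcked.add("1:0");
        doAcknowledge("1:0").join();
        System.out.println(unAcked.isEmpty()); // true: no spurious redelivery
    }
}
```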
Documentation
doc
doc-required
doc-not-needed
doc-complete