[fix][broker] Fix consumer receive individual acknowledged messages from compacted topic after reconnection#23495
[fix][broker] Fix consumer receive individual acknowledged messages from compacted topic after reconnection#23495summeriiii wants to merge 1 commit intoapache:masterfrom
Conversation
…rom compacted topic after reconnection
|
@coderzc PTAL~ |
| List<Entry> unAckedEntries = new ArrayList<>(); | ||
| long entriesSize = 0; | ||
| for (Entry entry : entries) { | ||
| entriesSize += entry.getLength(); | ||
| if (COMPACTION_CURSOR_NAME.equals(cursor.getName())) { | ||
| for (Entry entry : entries) { | ||
| entriesSize += entry.getLength(); | ||
| } | ||
| unAckedEntries = entries; | ||
| } else { | ||
| for (Entry entry : entries) { | ||
| Position position = entry.getPosition(); | ||
| if (!cursor.isMessageIndividualDeleted(position)) { | ||
| unAckedEntries.add(entry); | ||
| entriesSize += entry.getLength(); | ||
| } | ||
| } | ||
| } | ||
| cursor.updateReadStats(entries.size(), entriesSize); | ||
| cursor.updateReadStats(unAckedEntries.size(), entriesSize); | ||
|
|
||
| Entry lastEntry = entries.get(entries.size() - 1); | ||
| cursor.seek(lastEntry.getPosition().getNext(), true); | ||
| callback.readEntriesComplete(entries, readEntriesCtx); | ||
| callback.readEntriesComplete(unAckedEntries, readEntriesCtx); |
There was a problem hiding this comment.
This will cause these messages to be lost the next time compaction is run, and other consumers will not be able to read these messages. so why use individual ack for Exclusive mode?
There was a problem hiding this comment.
-
for the compaction_cursor_name
__compaction, we don't skip the individual acked position, this will not affect the compactionif (COMPACTION_CURSOR_NAME.equals(cursor.getName())) { for (Entry entry : entries) { entriesSize += entry.getLength(); } unAckedEntries = entries; }
-
The individual ack is not recommended in Exclusive mode? I don't know about this, I always use this before😂
There was a problem hiding this comment.
for the compaction_cursor_name __compaction , we don't skip the individual acked position, this will not affect the compaction
OK, I see
The individual ack is not recommended in Exclusive mode? I don't know about this, I always use this before😂
Yes, for Exclusive mode, suggest using acknowledgeCumulative acknowledge message. actually, if every message is acknowledged, then acknowledgeCumulative and individual acknowledgment have the same effect, for this cause, I think #21187 has fixed it. But if the acknowledged messages are discontinuous then we can receive acknowledged messages, due to the compaction delete some message, using individual acknowledged will make acknowledged messages are discontinuous
There was a problem hiding this comment.
@summeriiii I think for the compacted topic, we cannot use individual acknowledgment, otherwise the entry will not be deleted because the messages deleted by compact will never be acknowledged. so I suggest using acknowledgeCumulative acknowledge message for compacted topic.
|
|
||
| boolean isMessageDeleted(Position position); | ||
|
|
||
| boolean isMessageIndividualDeleted(Position position); |
There was a problem hiding this comment.
I think we can using isMessageDeleted instead
There was a problem hiding this comment.
I tried to use isMessageDeleted before, but the markDeletePosition is not correct in some cases, eg: expire messages(CompactionTest#testCompactionWithTTL), so I add a new method isMessageIndividualDeleted only to check individualDeletedMessages
Fixes #23494
Motivation
After consumed the compacted topic and individual acknowledged messages, if the topic unload and consumer reconnect, we will receive the individual acknowledged messages again.
Like the none-compacted topic, I think that consumer should not receive acknowledge message after reconnection.
Modifications
isMessageIndividualDeletedto check if this message has been individual acknowledgedCompactedTopicUtils#asyncReadCompactedEntries, use isMessageIndividualDeleted to filter out and skip unnecessary entryDocumentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: