[fix][broker] Move pending acks cleanup to selected mark-delete callbacks#25592
[fix][broker] Move pending acks cleanup to selected mark-delete callbacks#25592nodece wants to merge 1 commit intoapache:masterfrom
Conversation
| Position mdPos = cursor.getMarkDeletedPosition(); | ||
| if (mdPos != null) { | ||
| dispatcher.prunePendingAcksUpToPosition(mdPos.getLedgerId(), mdPos.getEntryId()); | ||
| } | ||
| // If the subscription is a Key_Shared subscription, we should to trigger message dispatch. | ||
| if (subscription.getType() == SubType.Key_Shared) { | ||
| dispatcher.markDeletePositionMoveForward(); | ||
| } |
There was a problem hiding this comment.
instead of adding a separate prunePendingAcksUpToPosition, leave the pruning as an implementation of the dispatcher. Modify the existing markDeletePositionMoveForward so that it contains the markDeletedPosition argument. In the implementation, the pruning can be handled for Key_Shared.
| /** | ||
| * Prune pending-ack entries up to the specified position and updates the | ||
| * consumer unacked-message counters accordingly. | ||
| * | ||
| * <p>This hook is invoked after the cursor mark-delete operation completes | ||
| * (for example, during message expiry, skip, or clear backlog). Since the | ||
| * cursor ack set may no longer be available after mark-delete, the counter | ||
| * adjustment relies on the remaining unacked count stored in the | ||
| * {@code PendingAcksMap} entries. | ||
| * | ||
| * @param ledgerId the ledger ID of the inclusive upper bound position | ||
| * @param entryId the entry ID of the inclusive upper bound position | ||
| */ | ||
| default void prunePendingAcksUpToPosition(long ledgerId, long entryId) { | ||
| // No-op by default | ||
| } |
There was a problem hiding this comment.
instead of adding a separate prunePendingAcksUpToPosition, leave the pruning as an implementation of the dispatcher. Modify the existing markDeletePositionMoveForward so that it contains the markDeletedPosition argument. In the implementation, the pruning can be handled for Key_Shared.
|
|
||
| // remove possible expired messages from redelivery tracker and pending acks | ||
| Position markDeletePosition = cursor.getMarkDeletedPosition(); | ||
| if (lastMarkDeletePositionBeforeReadMoreEntries != markDeletePosition) { |
There was a problem hiding this comment.
Another detail is that trimming (based on retention) doesn't currently trigger the callbacks at all. Trimming is handled by the org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl#internalTrimLedgers method which calls advanceCursorsIfNecessary (btw. The javadoc of the advanceCursorsIfNecessary method is misleading. For some reason, the javadoc added in https://github.com/apache/pulsar/pull/10667/changes hasn't made it to the code).
Trimming based on retention should also be handled before we can remove this code.
There was a problem hiding this comment.
Even the current code doesn't currently handle trimming correctly since trimming should trigger state cleanup proactively. However, it would cleanup state correctly. (In some cases that's sufficient, in some cases it's not)
There was a problem hiding this comment.
There should be a callback for ManagedCursor for deletions that happen by retention trimming.
Motivation
Pending acks cleanup was previously executed in the
readMoreEntriesdispatch path. This introduces unnecessary per-dispatch overhead in a hot path and also risks concurrent access to the same pending-ack entries from both dispatch and ack threads.In addition, ack-triggered mark-delete operations are very frequent and should not carry extra cleanup work. Cleanup is only required when messages are removed due to expiry, skip, or clear-backlog operations, where mark-delete completion semantics are more appropriate.
Modifications
readMoreEntrieshot path in dispatcher.prunePendingAcksUpToPosition()hook in dispatcher.PendingAcksMapto storeremainingUnackedinstead of batch size.remainingUnackeddirectly for accurate counter updates without relying on cursor state after mark-delete.