Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,18 @@ public Future<Void> sendMessages(final List<? extends Entry> entries,
} else {
stickyKeyHash = stickyKeyHashes.get(i);
}
boolean sendingAllowed =
pendingAcks.addPendingAckIfAllowed(entry.getLedgerId(), entry.getEntryId(), batchSize,
stickyKeyHash);
boolean sendingAllowed;
long[] ackSet = batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i);
int remainingUnacked;
if (ackSet != null) {
remainingUnacked = BitSet.valueOf(ackSet).cardinality();
unackedMessages -= (batchSize - remainingUnacked);
} else {
remainingUnacked = batchSize;
}
sendingAllowed =
pendingAcks.addPendingAckIfAllowed(entry.getLedgerId(), entry.getEntryId(),
remainingUnacked, stickyKeyHash);
if (!sendingAllowed) {
// sending isn't allowed when pending acks doesn't accept adding the entry
// this happens when Key_Shared draining hashes contains the stickyKeyHash
Expand All @@ -401,10 +410,6 @@ public Future<Void> sendMessages(final List<? extends Entry> entries,
.attr("batchSize", batchSize)
.log("Skipping sending of entry since adding to pending acks failed");
} else {
long[] ackSet = batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i);
if (ackSet != null) {
unackedMessages -= (batchSize - BitSet.valueOf(ackSet).cardinality());
}
log.debug()
.attr("ledgerId", entry.getLedgerId())
.attr("entryId", entry.getEntryId())
Expand Down Expand Up @@ -596,6 +601,10 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
.syncBatchPositionBitSetForPendingAck(position);
}
}
if (ackedCount > 0) {
ackOwnerConsumer.getPendingAcks().updateRemainingUnacked(
position.getLedgerId(), position.getEntryId(), (int) ackedCount);
}
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
} else {
position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId());
Expand Down Expand Up @@ -679,6 +688,12 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
}
AckSetStateUtil.getAckSetState(position).setAckSet(ackSets);
ackedCount = getAckedCountForTransactionAck(batchSize, ackSets);
// Update the stored remaining unacked count for this entry so that
// removal paths (expire, skip, redeliver) can use it directly.
if (ackedCount > 0) {
ackOwnerConsumer.getPendingAcks().updateRemainingUnacked(
position.getLedgerId(), position.getEntryId(), (int) ackedCount);
}
}

addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
Expand Down Expand Up @@ -747,19 +762,6 @@ private long getAckedCountForTransactionAck(int batchSize, long[] ackSets) {
return ackedCount;
}

private long getUnAckedCountForBatchIndexLevelEnabled(Position position, int batchSize) {
long unAckedCount = batchSize;
if (isAcknowledgmentAtBatchIndexLevelEnabled) {
long[] cursorAckSet = getCursorAckSet(position);
if (cursorAckSet != null) {
BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet);
unAckedCount = cursorBitSet.cardinality();
cursorBitSet.recycle();
}
}
return unAckedCount;
}

private void checkAckValidationError(CommandAck ack, Position position) {
if (ack.hasValidationError()) {
log.warn()
Expand Down Expand Up @@ -1139,9 +1141,8 @@ public void removePendingAcksUpToPositionAndDecrementUnacked(long markDeleteLedg

MutableInt mutableTotalUnacked = new MutableInt(0);
pendingAcks.removeAllUpTo(markDeleteLedgerId, markDeleteEntryId,
(ledgerId, entryId, batchSize, stickyKeyHash) -> {
mutableTotalUnacked.add((int) getUnAckedCountForBatchIndexLevelEnabled(
PositionFactory.create(ledgerId, entryId), batchSize));
(ledgerId, entryId, remainingUnacked, stickyKeyHash) -> {
mutableTotalUnacked.add(remainingUnacked);
});
int totalUnacked = mutableTotalUnacked.intValue();
if (totalUnacked > 0) {
Expand All @@ -1160,11 +1161,8 @@ public void redeliverUnacknowledgedMessages(long consumerEpoch) {
if (pendingAcks != null) {
List<Position> pendingPositions = new ArrayList<>((int) pendingAcks.size());
MutableInt totalRedeliveryMessages = new MutableInt(0);
pendingAcks.forEachAndClear((ledgerId, entryId, batchSize, stickyKeyHash) -> {
int unAckedCount =
(int) getUnAckedCountForBatchIndexLevelEnabled(PositionFactory.create(ledgerId, entryId),
batchSize);
totalRedeliveryMessages.add(unAckedCount);
pendingAcks.forEachAndClear((ledgerId, entryId, remainingUnacked, stickyKeyHash) -> {
totalRedeliveryMessages.add(remainingUnacked);
pendingPositions.add(PositionFactory.create(ledgerId, entryId));
});

Expand Down Expand Up @@ -1193,8 +1191,7 @@ public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
Position position = PositionFactory.create(msg.getLedgerId(), msg.getEntryId());
IntIntPair pendingAck = pendingAcks.removeAndGet(position.getLedgerId(), position.getEntryId());
if (pendingAck != null) {
int unAckedCount = (int) getUnAckedCountForBatchIndexLevelEnabled(position, pendingAck.leftInt());
totalRedeliveryMessages += unAckedCount;
totalRedeliveryMessages += pendingAck.leftInt();
pendingPositions.add(position);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,23 @@ default boolean checkAndUnblockIfStuck() {
return false;
}

/**
* Prune pending-ack entries up to the specified position and updates the
* consumer unacked-message counters accordingly.
*
* <p>This hook is invoked after the cursor mark-delete operation completes
* (for example, during message expiry, skip, or clear backlog). Since the
* cursor ack set may no longer be available after mark-delete, the counter
* adjustment relies on the remaining unacked count stored in the
* {@code PendingAcksMap} entries.
*
* @param ledgerId the ledger ID of the inclusive upper bound position
* @param entryId the entry ID of the inclusive upper bound position
*/
default void prunePendingAcksUpToPosition(long ledgerId, long entryId) {
// No-op by default
}
Comment on lines +145 to +160
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of adding a separate prunePendingAcksUpToPosition, leave the pruning as an implementation of the dispatcher. Modify the existing markDeletePositionMoveForward so that it contains the markDeletedPosition argument. In the implementation, the pruning can be handled for Key_Shared.


/**
* A callback hook after acknowledge messages.
* @param exOfDeletion the ex of {@link org.apache.bookkeeper.mledger.ManagedCursor#asyncDelete},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,13 @@ public interface PendingAcksConsumer {
/**
* Accept a pending acknowledgment.
*
* @param ledgerId the ledger ID
* @param entryId the entry ID
* @param batchSize the batch size
* @param stickyKeyHash the sticky key hash
* @param ledgerId the ledger ID
* @param entryId the entry ID
* @param remainingUnacked the number of remaining unacked messages in this entry
* (accounts for batch index level acknowledgments)
* @param stickyKeyHash the sticky key hash
*/
void accept(long ledgerId, long entryId, int batchSize, int stickyKeyHash);
void accept(long ledgerId, long entryId, int remainingUnacked, int stickyKeyHash);
}

private final Consumer consumer;
Expand Down Expand Up @@ -122,11 +123,12 @@ public interface PendingAcksConsumer {
*
* @param ledgerId the ledger ID
* @param entryId the entry ID
* @param batchSize the batch size
* @param remainingUnacked the number of remaining unacked messages in this entry
* (for batch entries with some indexes already acked, this may be less than batchSize)
* @param stickyKeyHash the sticky key hash
* @return true if the pending ack was added, and it's allowed to send a message, false otherwise
*/
public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int batchSize, int stickyKeyHash) {
public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remainingUnacked, int stickyKeyHash) {
try {
writeLock.lock();
// prevent adding sticky hash to pending acks if the PendingAcksMap has already been closed
Expand All @@ -143,7 +145,7 @@ public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int batchSize
}
TreeMap<Long, IntIntPair> ledgerPendingAcks =
pendingAcks.computeIfAbsent(ledgerId, k -> new TreeMap<>());
ledgerPendingAcks.put(entryId, IntIntPair.of(batchSize, stickyKeyHash));
ledgerPendingAcks.put(entryId, IntIntPair.of(remainingUnacked, stickyKeyHash));
return true;
} finally {
writeLock.unlock();
Expand Down Expand Up @@ -311,6 +313,34 @@ public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyH
}
}

/**
* Atomically update the remaining unacked count for a pending ack entry by subtracting the given delta.
* Called from the ack handler after computing the number of batch indexes acknowledged in a partial ack.
*
* @param ledgerId the ledger ID
* @param entryId the entry ID
* @param ackedDelta the number of batch indexes that were just acknowledged
* @return true if the entry was found and updated, false otherwise
*/
public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelta) {
try {
writeLock.lock();
TreeMap<Long, IntIntPair> ledgerMap = pendingAcks.get(ledgerId);
if (ledgerMap == null) {
return false;
}
IntIntPair current = ledgerMap.get(entryId);
if (current == null) {
return false;
}
int newRemaining = current.leftInt() - ackedDelta;
ledgerMap.put(entryId, IntIntPair.of(newRemaining, current.rightInt()));
return true;
} finally {
writeLock.unlock();
}
}

/**
* Remove the pending ack for the given ledger ID and entry ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis
protected enum ReadType {
Normal, Replay
}
private Position lastMarkDeletePositionBeforeReadMoreEntries;
private volatile long readMoreEntriesCallCount;

public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Expand Down Expand Up @@ -362,17 +361,6 @@ public synchronized void readMoreEntries() {
// increment the counter for readMoreEntries calls, to track the number of times readMoreEntries is called
readMoreEntriesCallCount++;

// remove possible expired messages from redelivery tracker and pending acks
Position markDeletePosition = cursor.getMarkDeletedPosition();
if (lastMarkDeletePositionBeforeReadMoreEntries != markDeletePosition) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another detail is that trimming (based on retention) doesn't currently trigger the callbacks at all. Trimming is handled by the org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl#internalTrimLedgers method which calls advanceCursorsIfNecessary (btw. The javadoc of the advanceCursorsIfNecessary method is misleading. For some reason, the javadoc added in https://github.com/apache/pulsar/pull/10667/changes hasn't made it to the code).

Trimming based on retention should also be handled before we can remove this code.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even the current code doesn't currently handle trimming correctly since trimming should trigger state cleanup proactively. However, it would cleanup state correctly. (In some cases that's sufficient, in some cases it's not)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a callback for ManagedCursor for deletions that happen by retention trimming.

redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId());
for (Consumer consumer : consumerList) {
consumer.removePendingAcksUpToPositionAndDecrementUnacked(
markDeletePosition.getLedgerId(), markDeletePosition.getEntryId());
}
lastMarkDeletePositionBeforeReadMoreEntries = markDeletePosition;
}

// totalAvailablePermits may be updated by other threads
int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
Expand Down Expand Up @@ -600,6 +588,14 @@ public CopyOnWriteArrayList<Consumer> getConsumers() {
return consumerList;
}

@Override
public void prunePendingAcksUpToPosition(long ledgerId, long entryId) {
redeliveryMessages.removeAllUpTo(ledgerId, entryId);
for (Consumer consumer : consumerList) {
consumer.removePendingAcksUpToPositionAndDecrementUnacked(ledgerId, entryId);
}
}

@Override
public synchronized boolean canUnsubscribe(Consumer consumer) {
return consumerList.size() == 1 && consumerSet.contains(consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
Expand Down Expand Up @@ -211,9 +212,18 @@ public void markDeleteComplete(Object ctx) {
long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog(false);
msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value stats */);
totalMsgExpired.add(numMessagesExpired);
// If the subscription is a Key_Shared subscription, we should to trigger message dispatch.
if (subscription != null && subscription.getType() == SubType.Key_Shared) {
subscription.getDispatcher().markDeletePositionMoveForward();
if (subscription != null) {
Dispatcher dispatcher = subscription.getDispatcher();
if (dispatcher != null) {
Position mdPos = cursor.getMarkDeletedPosition();
if (mdPos != null) {
dispatcher.prunePendingAcksUpToPosition(mdPos.getLedgerId(), mdPos.getEntryId());
}
// If the subscription is a Key_Shared subscription, we should to trigger message dispatch.
if (subscription.getType() == SubType.Key_Shared) {
dispatcher.markDeletePositionMoveForward();
}
Comment on lines +218 to +225
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of adding a separate prunePendingAcksUpToPosition, leave the pruning as an implementation of the dispatcher. Modify the existing markDeletePositionMoveForward so that it contains the markDeletedPosition argument. In the implementation, the pruning can be handled for Key_Shared.

}
}
expirationCheckInProgress = FALSE;
log.debug()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,10 @@ public void clearBacklogComplete(Object ctx) {
future.complete(null);
}
});
Position mdPos = cursor.getMarkDeletedPosition();
if (mdPos != null) {
dispatcher.prunePendingAcksUpToPosition(mdPos.getLedgerId(), mdPos.getEntryId());
}
dispatcher.afterAckMessages(null, ctx);
} else {
future.complete(null);
Expand Down Expand Up @@ -837,6 +841,10 @@ public void skipEntriesComplete(Object ctx) {
.log("Skipped messages");
future.complete(null);
if (dispatcher != null) {
Position mdPos = cursor.getMarkDeletedPosition();
if (mdPos != null) {
dispatcher.prunePendingAcksUpToPosition(mdPos.getLedgerId(), mdPos.getEntryId());
}
dispatcher.afterAckMessages(null, ctx);
}
}
Expand Down
Loading