diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 15074f28cc2ea..3a0bf99eb6d63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -1155,15 +1155,12 @@ public int getPriorityLevel() { } public void redeliverUnacknowledgedMessages(long consumerEpoch) { - // cleanup unackedMessage bucket and redeliver those unack-msgs again - clearUnAckedMsgs(); - blockedConsumerOnUnackedMsgs = false; log.debug("Consumer received redelivery"); if (pendingAcks != null) { List pendingPositions = new ArrayList<>((int) pendingAcks.size()); MutableInt totalRedeliveryMessages = new MutableInt(0); - pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> { + pendingAcks.forEachAndClear((ledgerId, entryId, batchSize, stickyKeyHash) -> { int unAckedCount = (int) getUnAckedCountForBatchIndexLevelEnabled(PositionFactory.create(ledgerId, entryId), batchSize); @@ -1171,15 +1168,18 @@ public void redeliverUnacknowledgedMessages(long consumerEpoch) { pendingPositions.add(PositionFactory.create(ledgerId, entryId)); }); - for (Position p : pendingPositions) { - pendingAcks.remove(p.getLedgerId(), p.getEntryId()); + if (totalRedeliveryMessages.intValue() > 0) { + addAndGetUnAckedMsgs(this, -totalRedeliveryMessages.intValue()); } + blockedConsumerOnUnackedMsgs = false; msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(), totalRedeliveryMessages.intValue()); msgRedeliverCounter.add(totalRedeliveryMessages.intValue()); subscription.redeliverUnacknowledgedMessages(this, pendingPositions); } else { + clearUnAckedMsgs(); + blockedConsumerOnUnackedMsgs = false; subscription.redeliverUnacknowledgedMessages(this, consumerEpoch); } @@ -1191,10 +1191,9 @@ public void redeliverUnacknowledgedMessages(List messageIds) { List pendingPositions = new ArrayList<>(); for (MessageIdData msg : messageIds) { Position position = PositionFactory.create(msg.getLedgerId(), msg.getEntryId()); - IntIntPair pendingAck = pendingAcks.get(position.getLedgerId(), position.getEntryId()); + IntIntPair pendingAck = pendingAcks.removeAndGet(position.getLedgerId(), position.getEntryId()); if (pendingAck != null) { int unAckedCount = (int) getUnAckedCountForBatchIndexLevelEnabled(position, pendingAck.leftInt()); - pendingAcks.remove(position.getLedgerId(), position.getEntryId()); totalRedeliveryMessages += unAckedCount; pendingPositions.add(position); } @@ -1212,16 +1211,7 @@ public void redeliverUnacknowledgedMessages(List messageIds) { msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages); msgRedeliverCounter.add(totalRedeliveryMessages); - int numberOfBlockedPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0); - - // if permitsReceivedWhileConsumerBlocked has been accumulated then pass it to Dispatcher to flow messages - if (numberOfBlockedPermits > 0) { - MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits); - log.debug() - .attr("numberOfBlockedPermits", numberOfBlockedPermits) - .log("Added blockedPermits to consumer's messagePermits"); - subscription.consumerFlow(this, numberOfBlockedPermits); - } + flowConsumerBlockedPermits(this); } public Subscription getSubscription() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java index 46fb7f6e6c887..8be69aa7879e6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java @@ -202,9 +202,26 @@ private void processPendingAcks(PendingAcksConsumer processor) { * @param processor the processor to handle each pending ack */ public void forEachAndClose(PendingAcksConsumer processor) { + internalForEachAndClear(processor, true); + } + + /** + * Iterate over all the pending acks and clear the map. + * Unlike {@link #forEachAndClose(PendingAcksConsumer)}, this method does not close the map, + * so new entries can still be added after this method returns. + * + * @param processor the processor to handle each pending ack + */ + public void forEachAndClear(PendingAcksConsumer processor) { + internalForEachAndClear(processor, false); + } + + private void internalForEachAndClear(PendingAcksConsumer processor, boolean close) { try { writeLock.lock(); - closed = true; + if (close) { + closed = true; + } PendingAcksRemoveHandler pendingAcksRemoveHandler = pendingAcksRemoveHandlerSupplier.get(); if (pendingAcksRemoveHandler != null) { try { @@ -323,6 +340,35 @@ public boolean remove(long ledgerId, long entryId) { } } + /** + * Atomically remove and return the pending ack for the given ledger ID and entry ID. + * Unlike {@link #remove(long, long)}, this method returns the removed entry so the caller + * can access the batch size and sticky key hash without a separate get operation. + * + * @param ledgerId the ledger ID + * @param entryId the entry ID + * @return the removed entry as an IntIntPair (batchSize, stickyKeyHash), or null if not found + */ + public IntIntPair removeAndGet(long ledgerId, long entryId) { + try { + writeLock.lock(); + TreeMap ledgerMap = pendingAcks.get(ledgerId); + if (ledgerMap == null) { + return null; + } + IntIntPair removedEntry = ledgerMap.remove(entryId); + if (removedEntry != null) { + handleRemovePendingAck(ledgerId, entryId, removedEntry.rightInt()); + } + if (removedEntry != null && ledgerMap.isEmpty()) { + pendingAcks.remove(ledgerId); + } + return removedEntry; + } finally { + writeLock.unlock(); + } + } + /** * Remove all pending acks up to the given ledger ID and entry ID, invoking a callback for each removed entry. * diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java index 8db0e3a0f73e6..02bf098561c1d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java @@ -31,6 +31,7 @@ import static org.testng.Assert.assertTrue; import java.util.ArrayList; import java.util.List; +import org.apache.pulsar.common.util.collections.IntIntPair; import org.testng.annotations.Test; public class PendingAcksMapTest { @@ -218,4 +219,84 @@ public void size_ReturnsCorrectSize() { assertEquals(pendingAcksMap.size(), 3); } + + @Test + public void forEachAndClear_ProcessesAndClearsAllPendingAcks() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123); + pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124); + + List processedEntries = new ArrayList<>(); + pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize, stickyKeyHash) -> processedEntries.add(entryId)); + + assertEquals(processedEntries, List.of(1L, 2L)); + assertEquals(pendingAcksMap.size(), 0); + } + + @Test + public void forEachAndClear_AllowsAddingAfterClear() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123); + + pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize, stickyKeyHash) -> {}); + + // Unlike forEachAndClose, forEachAndClear should allow new additions + boolean result = pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124); + assertTrue(result); + assertTrue(pendingAcksMap.contains(1L, 2L)); + } + + @Test + public void forEachAndClear_InvokesRemoveHandler() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap.PendingAcksRemoveHandler removeHandler = mock(PendingAcksMap.PendingAcksRemoveHandler.class); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> removeHandler); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123); + pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124); + + pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize, stickyKeyHash) -> {}); + + verify(removeHandler).startBatch(); + verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false); + verify(removeHandler).handleRemoving(consumer, 1L, 2L, 124, false); + verify(removeHandler).endBatch(); + } + + @Test + public void removeAndGet_RemovesAndReturnsEntry() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 5, 123); + + IntIntPair result = pendingAcksMap.removeAndGet(1L, 1L); + + assertTrue(result != null); + assertEquals(result.leftInt(), 5); + assertEquals(result.rightInt(), 123); + assertFalse(pendingAcksMap.contains(1L, 1L)); + } + + @Test + public void removeAndGet_ReturnsNullWhenNotFound() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + + IntIntPair result = pendingAcksMap.removeAndGet(1L, 1L); + + assertTrue(result == null); + } + + @Test + public void removeAndGet_InvokesRemoveHandler() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap.PendingAcksRemoveHandler removeHandler = mock(PendingAcksMap.PendingAcksRemoveHandler.class); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> removeHandler); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 5, 123); + + pendingAcksMap.removeAndGet(1L, 1L); + + verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false); + } } \ No newline at end of file