Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1155,31 +1155,31 @@ public int getPriorityLevel() {
}

public void redeliverUnacknowledgedMessages(long consumerEpoch) {
// cleanup unackedMessage bucket and redeliver those unack-msgs again
clearUnAckedMsgs();
blockedConsumerOnUnackedMsgs = false;
log.debug("Consumer received redelivery");

if (pendingAcks != null) {
List<Position> pendingPositions = new ArrayList<>((int) pendingAcks.size());
MutableInt totalRedeliveryMessages = new MutableInt(0);
pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
pendingAcks.forEachAndClear((ledgerId, entryId, batchSize, stickyKeyHash) -> {
int unAckedCount =
(int) getUnAckedCountForBatchIndexLevelEnabled(PositionFactory.create(ledgerId, entryId),
batchSize);
totalRedeliveryMessages.add(unAckedCount);
pendingPositions.add(PositionFactory.create(ledgerId, entryId));
});

for (Position p : pendingPositions) {
pendingAcks.remove(p.getLedgerId(), p.getEntryId());
if (totalRedeliveryMessages.intValue() > 0) {
addAndGetUnAckedMsgs(this, -totalRedeliveryMessages.intValue());
}
blockedConsumerOnUnackedMsgs = false;

msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(), totalRedeliveryMessages.intValue());
msgRedeliverCounter.add(totalRedeliveryMessages.intValue());

subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
} else {
clearUnAckedMsgs();
blockedConsumerOnUnackedMsgs = false;
subscription.redeliverUnacknowledgedMessages(this, consumerEpoch);
}

Expand All @@ -1191,10 +1191,9 @@ public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
List<Position> pendingPositions = new ArrayList<>();
for (MessageIdData msg : messageIds) {
Position position = PositionFactory.create(msg.getLedgerId(), msg.getEntryId());
IntIntPair pendingAck = pendingAcks.get(position.getLedgerId(), position.getEntryId());
IntIntPair pendingAck = pendingAcks.removeAndGet(position.getLedgerId(), position.getEntryId());
if (pendingAck != null) {
int unAckedCount = (int) getUnAckedCountForBatchIndexLevelEnabled(position, pendingAck.leftInt());
pendingAcks.remove(position.getLedgerId(), position.getEntryId());
totalRedeliveryMessages += unAckedCount;
pendingPositions.add(position);
}
Expand All @@ -1212,16 +1211,7 @@ public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);
msgRedeliverCounter.add(totalRedeliveryMessages);

int numberOfBlockedPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);

// if permitsReceivedWhileConsumerBlocked has been accumulated then pass it to Dispatcher to flow messages
if (numberOfBlockedPermits > 0) {
MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
log.debug()
.attr("numberOfBlockedPermits", numberOfBlockedPermits)
.log("Added blockedPermits to consumer's messagePermits");
subscription.consumerFlow(this, numberOfBlockedPermits);
}
flowConsumerBlockedPermits(this);
}

public Subscription getSubscription() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,26 @@ private void processPendingAcks(PendingAcksConsumer processor) {
* @param processor the processor to handle each pending ack
*/
public void forEachAndClose(PendingAcksConsumer processor) {
internalForEachAndClear(processor, true);
}

/**
* Iterate over all the pending acks and clear the map.
* Unlike {@link #forEachAndClose(PendingAcksConsumer)}, this method does not close the map,
* so new entries can still be added after this method returns.
*
* @param processor the processor to handle each pending ack
*/
public void forEachAndClear(PendingAcksConsumer processor) {
internalForEachAndClear(processor, false);
}

private void internalForEachAndClear(PendingAcksConsumer processor, boolean close) {
try {
writeLock.lock();
closed = true;
if (close) {
closed = true;
}
PendingAcksRemoveHandler pendingAcksRemoveHandler = pendingAcksRemoveHandlerSupplier.get();
if (pendingAcksRemoveHandler != null) {
try {
Expand Down Expand Up @@ -323,6 +340,35 @@ public boolean remove(long ledgerId, long entryId) {
}
}

/**
* Atomically remove and return the pending ack for the given ledger ID and entry ID.
* Unlike {@link #remove(long, long)}, this method returns the removed entry so the caller
* can access the batch size and sticky key hash without a separate get operation.
*
* @param ledgerId the ledger ID
* @param entryId the entry ID
* @return the removed entry as an IntIntPair (batchSize, stickyKeyHash), or null if not found
*/
public IntIntPair removeAndGet(long ledgerId, long entryId) {
try {
writeLock.lock();
TreeMap<Long, IntIntPair> ledgerMap = pendingAcks.get(ledgerId);
if (ledgerMap == null) {
return null;
}
IntIntPair removedEntry = ledgerMap.remove(entryId);
if (removedEntry != null) {
handleRemovePendingAck(ledgerId, entryId, removedEntry.rightInt());
}
if (removedEntry != null && ledgerMap.isEmpty()) {
pendingAcks.remove(ledgerId);
}
return removedEntry;
} finally {
writeLock.unlock();
}
}

/**
* Remove all pending acks up to the given ledger ID and entry ID, invoking a callback for each removed entry.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import org.apache.pulsar.common.util.collections.IntIntPair;
import org.testng.annotations.Test;

public class PendingAcksMapTest {
Expand Down Expand Up @@ -218,4 +219,84 @@ public void size_ReturnsCorrectSize() {

assertEquals(pendingAcksMap.size(), 3);
}

@Test
public void forEachAndClear_ProcessesAndClearsAllPendingAcks() {
Consumer consumer = createMockConsumer("consumer1");
PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null);
pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123);
pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);

List<Long> processedEntries = new ArrayList<>();
pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize, stickyKeyHash) -> processedEntries.add(entryId));

assertEquals(processedEntries, List.of(1L, 2L));
assertEquals(pendingAcksMap.size(), 0);
}

@Test
public void forEachAndClear_AllowsAddingAfterClear() {
Consumer consumer = createMockConsumer("consumer1");
PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null);
pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123);

pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize, stickyKeyHash) -> {});

// Unlike forEachAndClose, forEachAndClear should allow new additions
boolean result = pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
assertTrue(result);
assertTrue(pendingAcksMap.contains(1L, 2L));
}

@Test
public void forEachAndClear_InvokesRemoveHandler() {
Consumer consumer = createMockConsumer("consumer1");
PendingAcksMap.PendingAcksRemoveHandler removeHandler = mock(PendingAcksMap.PendingAcksRemoveHandler.class);
PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> removeHandler);
pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123);
pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);

pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize, stickyKeyHash) -> {});

verify(removeHandler).startBatch();
verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false);
verify(removeHandler).handleRemoving(consumer, 1L, 2L, 124, false);
verify(removeHandler).endBatch();
}

@Test
public void removeAndGet_RemovesAndReturnsEntry() {
Consumer consumer = createMockConsumer("consumer1");
PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null);
pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 5, 123);

IntIntPair result = pendingAcksMap.removeAndGet(1L, 1L);

assertTrue(result != null);
assertEquals(result.leftInt(), 5);
assertEquals(result.rightInt(), 123);
assertFalse(pendingAcksMap.contains(1L, 1L));
}

@Test
public void removeAndGet_ReturnsNullWhenNotFound() {
Consumer consumer = createMockConsumer("consumer1");
PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null);

IntIntPair result = pendingAcksMap.removeAndGet(1L, 1L);

assertTrue(result == null);
}

@Test
public void removeAndGet_InvokesRemoveHandler() {
Consumer consumer = createMockConsumer("consumer1");
PendingAcksMap.PendingAcksRemoveHandler removeHandler = mock(PendingAcksMap.PendingAcksRemoveHandler.class);
PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> removeHandler);
pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 5, 123);

pendingAcksMap.removeAndGet(1L, 1L);

verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false);
}
}
Loading