diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index a9f7e30510413..2848ba2a0d24d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -245,8 +245,7 @@ public void findEntryComplete(Position position, Object ctx) { .attr("position", position) .log("Expiring all messages until position"); Position prevMarkDeletePos = cursor.getMarkDeletedPosition(); - cursor.asyncMarkDelete(position, cursor.getProperties(), markDeleteCallback, - cursor.getNumberOfEntriesInBacklog(false)); + cursor.asyncMarkDelete(position, null, markDeleteCallback, cursor.getNumberOfEntriesInBacklog(false)); if (!Objects.equals(cursor.getMarkDeletedPosition(), prevMarkDeletePos) && subscription != null) { subscription.updateLastMarkDeleteAdvancedTimestamp(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 8d9a2e4578012..5bf4e1f97d462 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -34,11 +35,14 @@ import io.netty.buffer.UnpooledByteBufAllocator; import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -1098,4 +1102,58 @@ public void testGetFindPositionRange_SingleClosedLedger() { assertNull(range.getRight()); assertEquals(range.getLeft(), PositionFactory.create(1, 9)); } + + @Test + @SuppressWarnings("unchecked") + void testExpireMessagesNeverLoseMarkDeleteProperties() throws Exception { + final String ledgerAndCursorName = "testExpireMessagesNeverLoseMarkDeleteProperties"; + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionSizeInMB(10); + config.setRetentionTime(1, TimeUnit.HOURS); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); + ManagedCursorImpl spyCursor = spy(cursor); + + Position pos1 = ledger.addEntry(createMessageWrittenToLedger("msg-1")); + Position pos2 = ledger.addEntry(createMessageWrittenToLedger("msg-2")); + + CountDownLatch expiryMarkDeleteEnteredLatch = new CountDownLatch(1); + CountDownLatch cursorMarkDeleteCompletedLatch = new CountDownLatch(1); + CountDownLatch expiryMarkDeleteCompletedLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + Map invocationProperties = invocation.getArgument(1); + // Pause the expiry-triggered mark-delete so the user markDelete() can complete first. + if (invocationProperties == null || invocationProperties.isEmpty()) { + expiryMarkDeleteEnteredLatch.countDown(); + assertTrue(cursorMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS)); + try { + return invocation.callRealMethod(); + } finally { + expiryMarkDeleteCompletedLatch.countDown(); + } + } + + return invocation.callRealMethod(); + }).when(spyCursor) + .asyncMarkDelete(any(Position.class), nullable(Map.class), any(AsyncCallbacks.MarkDeleteCallback.class), + nullable(Object.class)); + + PersistentTopic topic = mockPersistentTopic("topicname"); + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(topic, + spyCursor.getName(), spyCursor, null); + + CompletableFuture.runAsync(() -> monitor.findEntryComplete(pos2, null)); + assertTrue(expiryMarkDeleteEnteredLatch.await(5, TimeUnit.SECONDS)); + + Map properties = new HashMap<>(); + properties.put("test-property", 1L); + spyCursor.markDelete(pos1, properties); + cursorMarkDeleteCompletedLatch.countDown(); + + assertTrue(expiryMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS)); + assertEquals(spyCursor.getMarkDeletedPosition(), pos2); + assertEquals(spyCursor.getProperties(), properties); + } }