From 787f941a62e4af0844c5f829dec108f8922978a7 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 17 May 2026 08:54:13 +0800 Subject: [PATCH 1/6] Fix expire messages may lose mark-delete properties --- .../PersistentMessageExpiryMonitor.java | 2 +- .../service/PersistentMessageFinderTest.java | 64 +++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index a9f7e30510413..c12e5c287e5a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -245,7 +245,7 @@ public void findEntryComplete(Position position, Object ctx) { .attr("position", position) .log("Expiring all messages until position"); Position prevMarkDeletePos = cursor.getMarkDeletedPosition(); - cursor.asyncMarkDelete(position, cursor.getProperties(), markDeleteCallback, + cursor.asyncMarkDelete(position, null, markDeleteCallback, cursor.getNumberOfEntriesInBacklog(false)); if (!Objects.equals(cursor.getMarkDeletedPosition(), prevMarkDeletePos) && subscription != null) { subscription.updateLastMarkDeleteAdvancedTimestamp(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 8d9a2e4578012..67496630ba877 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -34,11 +35,14 @@ import io.netty.buffer.UnpooledByteBufAllocator; import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -1098,4 +1102,64 @@ public void testGetFindPositionRange_SingleClosedLedger() { assertNull(range.getRight()); assertEquals(range.getLeft(), PositionFactory.create(1, 9)); } + + @Test + @SuppressWarnings("unchecked") + void testExpireMessagesNeverLoseMarkDeleteProperties() throws Exception { + final String ledgerAndCursorName = "testExpireMessagesNeverLoseMarkDeleteProperties"; + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionSizeInMB(10); + config.setRetentionTime(1, TimeUnit.HOURS); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); + ManagedCursorImpl spyCursor = spy(cursor); + + Position pos1 = ledger.addEntry(createMessageWrittenToLedger("msg-1")); + Position pos2 = ledger.addEntry(createMessageWrittenToLedger("msg-2")); + + CountDownLatch userMarkDeleteCompletedLatch = new CountDownLatch(1); + CountDownLatch expiryMarkDeleteCompletedLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + Map invocationProperties = invocation.getArgument(1); + + // Let the user-triggered markDelete() with properties complete first. + if (invocationProperties != null && invocationProperties.size() == 1) { + try { + return invocation.callRealMethod(); + } finally { + userMarkDeleteCompletedLatch.countDown(); + } + } + + // Then let the expiry-triggered mark-delete proceed with inherited properties. + if (invocationProperties == null || invocationProperties.isEmpty()) { + userMarkDeleteCompletedLatch.await(); + try { + return invocation.callRealMethod(); + } finally { + expiryMarkDeleteCompletedLatch.countDown(); + } + } + + return invocation.callRealMethod(); + }).when(spyCursor).asyncMarkDelete(any(Position.class), nullable(Map.class), any(AsyncCallbacks.MarkDeleteCallback.class), + nullable(Object.class)); + + PersistentTopic topic = mockPersistentTopic("topicname"); + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(topic, + spyCursor.getName(), spyCursor, null); + + // Start expiry first so it captures the current properties before the user mark-delete completes. + CompletableFuture.runAsync(() -> monitor.findEntryComplete(pos2, null)); + + Map properties = new HashMap<>(); + properties.put("test-property", 1L); + spyCursor.markDelete(pos1, properties); + + expiryMarkDeleteCompletedLatch.await(); + assertEquals(spyCursor.getMarkDeletedPosition(), pos2); + assertEquals(spyCursor.getProperties(), properties); + } } From 6a7ffe2d927350b0142aa8718a5ff964b5e83a0d Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 17 May 2026 16:28:27 +0800 Subject: [PATCH 2/6] Fix checkstyle --- .../service/persistent/PersistentMessageExpiryMonitor.java | 3 +-- .../pulsar/broker/service/PersistentMessageFinderTest.java | 5 +++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index c12e5c287e5a6..2848ba2a0d24d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -245,8 +245,7 @@ public void findEntryComplete(Position position, Object ctx) { .attr("position", position) .log("Expiring all messages until position"); Position prevMarkDeletePos = cursor.getMarkDeletedPosition(); - cursor.asyncMarkDelete(position, null, markDeleteCallback, - cursor.getNumberOfEntriesInBacklog(false)); + cursor.asyncMarkDelete(position, null, markDeleteCallback, cursor.getNumberOfEntriesInBacklog(false)); if (!Objects.equals(cursor.getMarkDeletedPosition(), prevMarkDeletePos) && subscription != null) { subscription.updateLastMarkDeleteAdvancedTimestamp(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 67496630ba877..705898df0ecf2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -1144,8 +1144,9 @@ void testExpireMessagesNeverLoseMarkDeleteProperties() throws Exception { } return invocation.callRealMethod(); - }).when(spyCursor).asyncMarkDelete(any(Position.class), nullable(Map.class), any(AsyncCallbacks.MarkDeleteCallback.class), - nullable(Object.class)); + }).when(spyCursor) + .asyncMarkDelete(any(Position.class), nullable(Map.class), any(AsyncCallbacks.MarkDeleteCallback.class), + nullable(Object.class)); PersistentTopic topic = mockPersistentTopic("topicname"); PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(topic, From 3a6bdf1acf802b7cdcfaec855b4076be219b9b6f Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 18 May 2026 11:32:20 +0800 Subject: [PATCH 3/6] Fix expire messages mark-delete execution order --- .../service/PersistentMessageFinderTest.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 705898df0ecf2..2f88e5f5652dd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -1118,24 +1118,21 @@ void testExpireMessagesNeverLoseMarkDeleteProperties() throws Exception { Position pos1 = ledger.addEntry(createMessageWrittenToLedger("msg-1")); Position pos2 = ledger.addEntry(createMessageWrittenToLedger("msg-2")); - CountDownLatch userMarkDeleteCompletedLatch = new CountDownLatch(1); + CountDownLatch expiryMarkDeleteEnteredLatch = new CountDownLatch(1); + CountDownLatch cursorMarkDeleteCompletedLatch = new CountDownLatch(1); CountDownLatch expiryMarkDeleteCompletedLatch = new CountDownLatch(1); doAnswer(invocation -> { Map invocationProperties = invocation.getArgument(1); - // Let the user-triggered markDelete() with properties complete first. if (invocationProperties != null && invocationProperties.size() == 1) { - try { - return invocation.callRealMethod(); - } finally { - userMarkDeleteCompletedLatch.countDown(); - } + return invocation.callRealMethod(); } - // Then let the expiry-triggered mark-delete proceed with inherited properties. + // Pause the expiry-triggered mark-delete so the user markDelete() can complete first. if (invocationProperties == null || invocationProperties.isEmpty()) { - userMarkDeleteCompletedLatch.await(); + expiryMarkDeleteEnteredLatch.countDown(); + assertTrue(cursorMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS)); try { return invocation.callRealMethod(); } finally { @@ -1152,14 +1149,15 @@ void testExpireMessagesNeverLoseMarkDeleteProperties() throws Exception { PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(topic, spyCursor.getName(), spyCursor, null); - // Start expiry first so it captures the current properties before the user mark-delete completes. CompletableFuture.runAsync(() -> monitor.findEntryComplete(pos2, null)); + assertTrue(expiryMarkDeleteEnteredLatch.await(5, TimeUnit.SECONDS)); Map properties = new HashMap<>(); properties.put("test-property", 1L); spyCursor.markDelete(pos1, properties); + cursorMarkDeleteCompletedLatch.countDown(); - expiryMarkDeleteCompletedLatch.await(); + assertTrue(expiryMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS)); assertEquals(spyCursor.getMarkDeletedPosition(), pos2); assertEquals(spyCursor.getProperties(), properties); } From 796b012eb4762c6e66837410b49a36593583fed5 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 18 May 2026 13:01:33 +0800 Subject: [PATCH 4/6] Optimize test --- .../pulsar/broker/service/PersistentMessageFinderTest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 2f88e5f5652dd..5bf4e1f97d462 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -1124,11 +1124,6 @@ void testExpireMessagesNeverLoseMarkDeleteProperties() throws Exception { doAnswer(invocation -> { Map invocationProperties = invocation.getArgument(1); - - if (invocationProperties != null && invocationProperties.size() == 1) { - return invocation.callRealMethod(); - } - // Pause the expiry-triggered mark-delete so the user markDelete() can complete first. if (invocationProperties == null || invocationProperties.isEmpty()) { expiryMarkDeleteEnteredLatch.countDown(); From f06153cafb32d59db26b12a128815cce66e824df Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 19 May 2026 15:40:38 +0800 Subject: [PATCH 5/6] Optimize test --- .../pulsar/broker/service/PersistentMessageFinderTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 5bf4e1f97d462..e66b19ee691a1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -1109,6 +1109,7 @@ void testExpireMessagesNeverLoseMarkDeleteProperties() throws Exception { final String ledgerAndCursorName = "testExpireMessagesNeverLoseMarkDeleteProperties"; ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(1); config.setRetentionSizeInMB(10); config.setRetentionTime(1, TimeUnit.HOURS); ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config); @@ -1144,7 +1145,9 @@ void testExpireMessagesNeverLoseMarkDeleteProperties() throws Exception { PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(topic, spyCursor.getName(), spyCursor, null); - CompletableFuture.runAsync(() -> monitor.findEntryComplete(pos2, null)); + // Wait until the new ledger is created so ManagedLedgerInfo.LedgerInfo is updated in the metastore. + Awaitility.await().untilAsserted(() -> assertEquals(ledger.getLedgersInfo().size(), 3)); + CompletableFuture.runAsync(() -> monitor.expireMessages(pos2)); assertTrue(expiryMarkDeleteEnteredLatch.await(5, TimeUnit.SECONDS)); Map properties = new HashMap<>(); From e258e870fd96979a728f935e747de2608d5701de Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 19 May 2026 19:30:20 +0800 Subject: [PATCH 6/6] Revert "Optimize test" This reverts commit f06153cafb32d59db26b12a128815cce66e824df. --- .../pulsar/broker/service/PersistentMessageFinderTest.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index e66b19ee691a1..5bf4e1f97d462 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -1109,7 +1109,6 @@ void testExpireMessagesNeverLoseMarkDeleteProperties() throws Exception { final String ledgerAndCursorName = "testExpireMessagesNeverLoseMarkDeleteProperties"; ManagedLedgerConfig config = new ManagedLedgerConfig(); - config.setMaxEntriesPerLedger(1); config.setRetentionSizeInMB(10); config.setRetentionTime(1, TimeUnit.HOURS); ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config); @@ -1145,9 +1144,7 @@ void testExpireMessagesNeverLoseMarkDeleteProperties() throws Exception { PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(topic, spyCursor.getName(), spyCursor, null); - // Wait until the new ledger is created so ManagedLedgerInfo.LedgerInfo is updated in the metastore. - Awaitility.await().untilAsserted(() -> assertEquals(ledger.getLedgersInfo().size(), 3)); - CompletableFuture.runAsync(() -> monitor.expireMessages(pos2)); + CompletableFuture.runAsync(() -> monitor.findEntryComplete(pos2, null)); assertTrue(expiryMarkDeleteEnteredLatch.await(5, TimeUnit.SECONDS)); Map properties = new HashMap<>();