diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java index c08fb9c1d4f5f..1acbc824f9d29 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java @@ -148,6 +148,12 @@ private ByteBuf copyEntry(EntryImpl entry) { public void invalidateEntries(final PositionImpl lastPosition) { final PositionImpl firstPosition = PositionImpl.get(-1, 0); + if (firstPosition.compareTo(lastPosition) > 0) { + log.debug("Attempted to invalidate entries in an invalid range : {} ~ {}", + firstPosition, lastPosition); + return; + } + Pair removed = entries.removeRange(firstPosition, lastPosition, false); int entriesRemoved = removed.getLeft(); long sizeRemoved = removed.getRight(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java index 1f8dade9e96b2..7ca4ebcdefc11 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java @@ -45,7 +45,7 @@ * care about ledgers to be deleted. * */ -class ManagedCursorContainer implements Iterable { +public class ManagedCursorContainer implements Iterable { private static class Item { final ManagedCursor cursor; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 2183de26b881c..11047d9557c09 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -105,7 +105,7 @@ public class ManagedCursorImpl implements ManagedCursor { protected volatile PositionImpl markDeletePosition; protected volatile PositionImpl readPosition; - private volatile MarkDeleteEntry lastMarkDeleteEntry; + protected volatile MarkDeleteEntry lastMarkDeleteEntry; protected static final AtomicReferenceFieldUpdater WAITING_READ_OP_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOp"); @@ -178,7 +178,7 @@ public MarkDeleteEntry(PositionImpl newPosition, Map properties, } } - private final ArrayDeque pendingMarkDeleteOps = new ArrayDeque<>(); + protected final ArrayDeque pendingMarkDeleteOps = new ArrayDeque<>(); private static final AtomicIntegerFieldUpdater PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount"); @SuppressWarnings("unused") diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index c61f94c50eb33..905544b351ac0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -829,11 +829,9 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { @Override public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException { - checkManagedLedgerIsOpen(); - checkFenced(); - - return new NonDurableCursorImpl(bookKeeper, config, this, null, - (PositionImpl) startCursorPosition); + return newNonDurableCursor( + startCursorPosition, + "non-durable-cursor-" + UUID.randomUUID()); } @Override @@ -863,12 +861,12 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu } @Override - public Iterable getCursors() { + public ManagedCursorContainer getCursors() { return cursors; } @Override - public Iterable getActiveCursors() { + public ManagedCursorContainer getActiveCursors() { return activeCursors; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 43bab0f6296af..bc57f79c58c07 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -42,7 +42,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { if (startCursorPosition == null || startCursorPosition.getLedgerId() == PositionImpl.latest.getLedgerId()) { // Start from last entry initializeCursorPosition(ledger.getLastPositionAndCounter()); - } else if (startCursorPosition.equals(PositionImpl.earliest)) { + } else if (startCursorPosition.getLedgerId() == PositionImpl.earliest.getLedgerId()) { // Start from invalid ledger to read from first available entry recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition())); } else { @@ -83,6 +83,12 @@ void recover(final VoidCallback callback) { protected void internalAsyncMarkDelete(final PositionImpl newPosition, Map properties, final MarkDeleteCallback callback, final Object ctx) { // Bypass persistence of mark-delete position and individually deleted messages info + + MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx); + lastMarkDeleteEntry = mdEntry; + // it is important to advance cursor so the retention can kick in as expected. + ledger.updateCursor(NonDurableCursorImpl.this, mdEntry.newPosition); + callback.markDeleteComplete(ctx); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java index 1579a7a71d20e..957e63bfa3e6a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -80,7 +81,7 @@ void testZNodeBypassed() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger"); ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest); - assertTrue(Iterables.isEmpty(ledger.getCursors())); + assertFalse(Iterables.isEmpty(ledger.getCursors())); c1.close(); ledger.close(); @@ -610,6 +611,50 @@ void testCursorWithNameIsCachable() throws Exception { ledger.close(); } + @Test + public void testGetSlowestConsumer() throws Exception { + final String mlName = "test-get-slowest-consumer-ml"; + final String c1 = "cursor1"; + final String nc1 = "non-durable-cursor1"; + final String ncEarliest = "non-durable-cursor-earliest"; + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, new ManagedLedgerConfig()); + Position p1 = ledger.addEntry(c1.getBytes(UTF_8)); + log.info("write entry 1 : pos = {}", p1); + Position p2 = ledger.addEntry(nc1.getBytes(UTF_8)); + log.info("write entry 2 : pos = {}", p2); + Position p3 = ledger.addEntry(nc1.getBytes(UTF_8)); + log.info("write entry 3 : pos = {}", p3); + + ManagedCursor cursor1 = ledger.openCursor(c1); + cursor1.seek(p3); + assertEquals(p3, ledger.getCursors().getSlowestReaderPosition()); + + ManagedCursor nonCursor1 = ledger.newNonDurableCursor(p2, nc1); + assertEquals(p2, ledger.getCursors().getSlowestReaderPosition()); + + PositionImpl earliestPos = new PositionImpl(-1, -2); + + ManagedCursor nonCursorEarliest = ledger.newNonDurableCursor(earliestPos, ncEarliest); + PositionImpl expectedPos = new PositionImpl(((PositionImpl) p1).getLedgerId(), -1); + assertEquals(expectedPos, ledger.getCursors().getSlowestReaderPosition()); + + // move non-durable cursor should update the slowest reader position + nonCursorEarliest.markDelete(p1); + assertEquals(p1, ledger.getCursors().getSlowestReaderPosition()); + + nonCursorEarliest.markDelete(p2); + assertEquals(p2, ledger.getCursors().getSlowestReaderPosition()); + + nonCursorEarliest.markDelete(p3); + assertEquals(p2, ledger.getCursors().getSlowestReaderPosition()); + + nonCursor1.markDelete(p3); + assertEquals(p3, ledger.getCursors().getSlowestReaderPosition()); + + ledger.close(); + } + @Test(expectedExceptions = NullPointerException.class) void testCursorWithNameIsNotNull() throws Exception { final String p1CursorName = "entry-1"; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 178965f504def..995af6cec94dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -682,7 +682,8 @@ private CompletableFuture getNonDurableSubscription(Stri long ledgerId = msgId.getLedgerId(); long entryId = msgId.getEntryId(); - if (msgId instanceof BatchMessageIdImpl) { + if (ledgerId >= 0 + && msgId instanceof BatchMessageIdImpl) { // When the start message is relative to a batch, we need to take one step back on the previous message, // because the "batch" might not have been consumed in its entirety. // The client will then be able to discard the first messages if needed. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java index 0b480d25db3f4..84f4eac594894 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java @@ -83,7 +83,8 @@ public PersistentTransactionBuffer(String topic, ManagedLedger ledger, BrokerSer throws BrokerServiceException.NamingException, ManagedLedgerException { super(topic, ledger, brokerService); this.txnCursor = new TransactionCursorImpl(); - this.retentionCursor = ledger.newNonDurableCursor(PositionImpl.earliest); + this.retentionCursor = ledger.newNonDurableCursor( + PositionImpl.earliest, "txn-buffer-retention"); } @Override