diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 106fcc9268bc9..0f56b5765896b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2381,7 +2381,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { return; } - advanceNonDurableCursors(ledgersToDelete); + advanceCursorsIfNecessary(ledgersToDelete); PositionImpl currentLastConfirmedEntry = lastConfirmedEntry; // Update metadata @@ -2450,20 +2450,29 @@ public void operationFailed(MetaStoreException e) { /** * Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data. + * This method also addresses a corner case for durable cursors in which the cursor is caught up, i.e. the mark delete position + * happens to be the last entry in a ledger. If the ledger is deleted, then subsequent calculations for backlog + * size may not be accurate since the method getNumberOfEntries we use in backlog calculation will not be able to fetch + * the ledger info of a deleted ledger. Thus, we need to update the mark delete position to the "-1" entry of the first ledger + * that is not marked for deletion. * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped * entries and the stats are reported correctly. */ - private void advanceNonDurableCursors(List ledgersToDelete) { + private void advanceCursorsIfNecessary(List ledgersToDelete) { if (ledgersToDelete.isEmpty()) { return; } - long firstNonDeletedLedger = ledgers - .higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId()); + // need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion + // calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return incorrect results + long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId()); PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1); cursors.forEach(cursor -> { - if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) { + // move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed + // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed + if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0 + && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) { cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) {