Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2381,7 +2381,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
return;
}

advanceNonDurableCursors(ledgersToDelete);
advanceCursorsIfNecessary(ledgersToDelete);

PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
// Update metadata
Expand Down Expand Up @@ -2450,20 +2450,29 @@ public void operationFailed(MetaStoreException e) {

/**
* Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data.
* This method also addresses a corner case for durable cursors in which the cursor is caught up, i.e. the mark delete position
* happens to be the last entry in a ledger. If the ledger is deleted, then subsequent calculations for backlog
* size may not be accurate since the method getNumberOfEntries we use in backlog calculation will not be able to fetch
* the ledger info of a deleted ledger. Thus, we need to update the mark delete position to the "-1" entry of the first ledger
* that is not marked for deletion.
* This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
* entries and the stats are reported correctly.
*/
private void advanceNonDurableCursors(List<LedgerInfo> ledgersToDelete) {
private void advanceCursorsIfNecessary(List<LedgerInfo> ledgersToDelete) {
if (ledgersToDelete.isEmpty()) {
return;
}

long firstNonDeletedLedger = ledgers
.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
// need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion
// calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return incorrect results
long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);

cursors.forEach(cursor -> {
if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) {
// move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
// to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed
if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0
&& highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) {
cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
Expand Down