From 2a78a64b12fb96e471d0e5db8fdfec0fea7deded Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Thu, 20 May 2021 23:39:20 -0700 Subject: [PATCH 1/7] Fix issues in advanceNonDurableCursors --- .../mledger/impl/ManagedLedgerImpl.java | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 106fcc9268bc9..10be1818817cf 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2458,23 +2458,26 @@ private void advanceNonDurableCursors(List ledgersToDelete) { return; } - long firstNonDeletedLedger = ledgers - .higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId()); - PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1); + LedgerInfo highestLedgerToDelete = ledgersToDelete.get(ledgersToDelete.size() - 1); + PositionImpl highestPositionToDelete = new PositionImpl(highestLedgerToDelete.getLedgerId(), highestLedgerToDelete.getEntries() - 1); cursors.forEach(cursor -> { - if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) { - cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() { - @Override - public void markDeleteComplete(Object ctx) { - } + if (!cursor.isDurable()) { + // Advance mark delete position if the highest position that can be deleted to is greater than the current mark deletion position + // if + if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) { + cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + } - @Override - public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - log.warn("[{}] Failed to mark delete while trimming data ledgers: {}", name, - exception.getMessage()); - } - }, null); + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + log.warn("[{}] Failed to mark delete while trimming data ledgers: {}", name, + exception.getMessage()); + } + }, null); + } } }); } From 08f506cd4bec77d646a373620b3496ae898a5ede Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Fri, 21 May 2021 12:41:37 -0700 Subject: [PATCH 2/7] cleaning up --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 10be1818817cf..5a493a06a0a3e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2463,8 +2463,6 @@ private void advanceNonDurableCursors(List ledgersToDelete) { cursors.forEach(cursor -> { if (!cursor.isDurable()) { - // Advance mark delete position if the highest position that can be deleted to is greater than the current mark deletion position - // if if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) { cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() { @Override From 074e28d38f6fdb5aba37abd291ac691adc4412af Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Fri, 21 May 2021 16:36:48 -0700 Subject: [PATCH 3/7] fixing --- .../mledger/impl/ManagedLedgerImpl.java | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 5a493a06a0a3e..ad840c0981f17 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2458,12 +2458,17 @@ private void advanceNonDurableCursors(List ledgersToDelete) { return; } - LedgerInfo highestLedgerToDelete = ledgersToDelete.get(ledgersToDelete.size() - 1); - PositionImpl highestPositionToDelete = new PositionImpl(highestLedgerToDelete.getLedgerId(), highestLedgerToDelete.getEntries() - 1); + // need to move mark delete to the first non deleted ledger for non durable cursor since ledgers before it will be deleted + // calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return incorrect results + long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId()); + PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1); cursors.forEach(cursor -> { if (!cursor.isDurable()) { - if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) { + // move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed + // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed + if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0 + && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) { cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { @@ -3013,6 +3018,7 @@ private void cleanupOffloaded(long ledgerId, UUID uuid, String offloadDriverName * @return the count of entries */ long getNumberOfEntries(Range range) { + log.info("!-----------getNumberOfEntries {} ---------------!", range); PositionImpl fromPosition = range.lowerEndpoint(); boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED; PositionImpl toPosition = range.upperEndpoint(); @@ -3029,21 +3035,32 @@ long getNumberOfEntries(Range range) { // If the from & to are pointing to different ledgers, then we need to : // 1. Add the entries in the ledger pointed by toPosition count += toPosition.getEntryId(); + log.info("count: {}", count); count += toIncluded ? 1 : 0; + log.info("count: {}", count); // 2. Add the entries in the ledger pointed by fromPosition LedgerInfo li = ledgers.get(fromPosition.getLedgerId()); + log.info("li: {}", li); if (li != null) { count += li.getEntries() - (fromPosition.getEntryId() + 1); count += fromIncluded ? 1 : 0; } + log.info("count: {}", count); + + // 3. Add the whole ledgers entries in between + log.info("ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false): {}", ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false)); for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false) .values()) { count += ls.getEntries(); } + log.info("count: {}", count); + + log.info("!-----------end getNumberOfEntries ---------------!"); + return count; } } From f677ceb411b33774a455d73f0fff903fb59ef5de Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Fri, 21 May 2021 16:43:11 -0700 Subject: [PATCH 4/7] fix comment --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ad840c0981f17..29079cfd66e4b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2458,7 +2458,7 @@ private void advanceNonDurableCursors(List ledgersToDelete) { return; } - // need to move mark delete to the first non deleted ledger for non durable cursor since ledgers before it will be deleted + // need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion // calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return incorrect results long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId()); PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1); From 4a88ad5e216a1b55d98261dc46cd83e642dbb525 Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Mon, 24 May 2021 18:27:37 -0700 Subject: [PATCH 5/7] renaming method --- .../mledger/impl/ManagedLedgerImpl.java | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 29079cfd66e4b..f450d25a8eda5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2381,7 +2381,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { return; } - advanceNonDurableCursors(ledgersToDelete); + advanceCursorsIfNecessary(ledgersToDelete); PositionImpl currentLastConfirmedEntry = lastConfirmedEntry; // Update metadata @@ -2450,10 +2450,15 @@ public void operationFailed(MetaStoreException e) { /** * Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data. + * This method also addresses a corner case for durable cursors in which the cursor is caught up, i.e. the mark delete position + * happens to be the last entry in a ledger. If the ledger is deleted, then subsequent calculations for backlog + * size may not be accurate since the method getNumberOfEntries we use in backlog calculation will not be able to fetch + * the ledger info of a deleted ledger. Thus, we need to update the mark delete position to the "-1" entry of the first ledger + * that is not marked for deletion. * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped * entries and the stats are reported correctly. */ - private void advanceNonDurableCursors(List ledgersToDelete) { + private void advanceCursorsIfNecessary(List ledgersToDelete) { if (ledgersToDelete.isEmpty()) { return; } @@ -2464,23 +2469,21 @@ private void advanceNonDurableCursors(List ledgersToDelete) { PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1); cursors.forEach(cursor -> { - if (!cursor.isDurable()) { - // move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed - // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed - if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0 - && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) { - cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() { - @Override - public void markDeleteComplete(Object ctx) { - } + // move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed + // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed + if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0 + && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) { + cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + } - @Override - public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - log.warn("[{}] Failed to mark delete while trimming data ledgers: {}", name, - exception.getMessage()); - } - }, null); - } + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + log.warn("[{}] Failed to mark delete while trimming data ledgers: {}", name, + exception.getMessage()); + } + }, null); } }); } From 9e92a7eda679f6719afe6ec12c0ff3cb02dfd4c2 Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Mon, 24 May 2021 18:28:37 -0700 Subject: [PATCH 6/7] clean up --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f450d25a8eda5..c651566d26d15 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3021,7 +3021,6 @@ private void cleanupOffloaded(long ledgerId, UUID uuid, String offloadDriverName * @return the count of entries */ long getNumberOfEntries(Range range) { - log.info("!-----------getNumberOfEntries {} ---------------!", range); PositionImpl fromPosition = range.lowerEndpoint(); boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED; PositionImpl toPosition = range.upperEndpoint(); @@ -3038,32 +3037,21 @@ long getNumberOfEntries(Range range) { // If the from & to are pointing to different ledgers, then we need to : // 1. Add the entries in the ledger pointed by toPosition count += toPosition.getEntryId(); - log.info("count: {}", count); count += toIncluded ? 1 : 0; - log.info("count: {}", count); // 2. Add the entries in the ledger pointed by fromPosition LedgerInfo li = ledgers.get(fromPosition.getLedgerId()); - log.info("li: {}", li); if (li != null) { count += li.getEntries() - (fromPosition.getEntryId() + 1); count += fromIncluded ? 1 : 0; } - log.info("count: {}", count); - - // 3. Add the whole ledgers entries in between - log.info("ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false): {}", ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false)); for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false) .values()) { count += ls.getEntries(); } - - log.info("count: {}", count); - - log.info("!-----------end getNumberOfEntries ---------------!"); - + return count; } } From 70b577c31ffdbcf3a904e3207d19b4a5c81dfe69 Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Mon, 24 May 2021 18:29:09 -0700 Subject: [PATCH 7/7] cleaning up --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index c651566d26d15..0f56b5765896b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3051,7 +3051,7 @@ long getNumberOfEntries(Range range) { .values()) { count += ls.getEntries(); } - + return count; } }