diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java index 53e37a263cf..21b87e2be96 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java @@ -27,6 +27,7 @@ import java.nio.file.FileSystems; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.EntryLocation; @@ -188,6 +189,8 @@ public void delete(long ledgerId) throws IOException { deletedLedgers.add(ledgerId); } + private static final int DELETE_ENTRIES_BATCH_SIZE = 100000; + public void removeOffsetFromDeletedLedgers() throws IOException { LongPairWrapper firstKeyWrapper = LongPairWrapper.get(-1, -1); LongPairWrapper lastKeyWrapper = LongPairWrapper.get(-1, -1); @@ -200,6 +203,10 @@ public void removeOffsetFromDeletedLedgers() throws IOException { } log.info("Deleting indexes for ledgers: {}", ledgersToDelete); + long startTime = System.nanoTime(); + long deletedEntries = 0; + long deletedEntriesInBatch = 0; + Batch batch = locationsDb.newBatch(); try { @@ -211,20 +218,58 @@ public void removeOffsetFromDeletedLedgers() throws IOException { firstKeyWrapper.set(ledgerId, 0); lastKeyWrapper.set(ledgerId, Long.MAX_VALUE); - batch.deleteRange(firstKeyWrapper.array, lastKeyWrapper.array); - } + Entry firstKeyRes = locationsDb.getCeil(firstKeyWrapper.array); + if (firstKeyRes == null || ArrayUtil.getLong(firstKeyRes.getKey(), 0) != ledgerId) { + // No entries found for ledger + if (log.isDebugEnabled()) { + log.debug("No entries found for ledger {}", ledgerId); + } + continue; + } - batch.flush(); + long firstEntryId = ArrayUtil.getLong(firstKeyRes.getKey(), 8); + long lastEntryId = getLastEntryInLedgerInternal(ledgerId); + if (log.isDebugEnabled()) { + log.debug("Deleting index for ledger {} entries ({} -> {})", + ledgerId, firstEntryId, lastEntryId); + } - // Removed from pending set - for (long ledgerId : ledgersToDelete) { - deletedLedgers.remove(ledgerId); + // Iterate over all the keys and remove each of them + for (long entryId = firstEntryId; entryId <= lastEntryId; entryId++) { + keyToDelete.set(ledgerId, entryId); + if (log.isDebugEnabled()) { + log.debug("Deleting index for ({}, {})", keyToDelete.getFirst(), keyToDelete.getSecond()); + } + batch.remove(keyToDelete.array); + ++deletedEntriesInBatch; + ++deletedEntries; + } + + if (deletedEntriesInBatch > DELETE_ENTRIES_BATCH_SIZE) { + batch.flush(); + batch.clear(); + deletedEntriesInBatch = 0; + } } } finally { - firstKeyWrapper.recycle(); - lastKeyWrapper.recycle(); - keyToDelete.recycle(); - batch.close(); + try { + batch.flush(); + batch.clear(); + } finally { + + firstKeyWrapper.recycle(); + lastKeyWrapper.recycle(); + keyToDelete.recycle(); + batch.close(); + } + } + + log.info("Deleted indexes for {} entries from {} ledgers in {} seconds", deletedEntries, ledgersToDelete.size(), + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) / 1000.0); + + // Removed from pending set + for (long ledgerId : ledgersToDelete) { + deletedLedgers.remove(ledgerId); } }