diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java index 4edbc2c3e91..37de152fb6f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java @@ -31,6 +31,8 @@ import java.util.Date; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.bookie.BookieImpl; import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.LedgerDirsManager; @@ -53,6 +55,8 @@ public LocationsIndexRebuildOp(ServerConfiguration conf) { this.conf = conf; } + private static final int BATCH_COMMIT_SIZE = 10_000; + public void initiate() throws IOException { LOG.info("Starting locations index rebuilding"); File[] indexDirs = conf.getIndexDirs(); @@ -90,6 +94,8 @@ public void initiate() throws IOException { int totalEntryLogs = entryLogs.size(); int completedEntryLogs = 0; LOG.info("Scanning {} entry logs", totalEntryLogs); + AtomicReference batch = new AtomicReference<>(newIndex.newBatch()); + AtomicInteger count = new AtomicInteger(); for (long entryLogId : entryLogs) { entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() { @@ -108,7 +114,15 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio // Update the ledger index page LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId); LongWrapper value = LongWrapper.get(location); - newIndex.put(key.array, value.array); + batch.get().put(key.array, value.array); + + if (count.incrementAndGet() > BATCH_COMMIT_SIZE) { + batch.get().flush(); + batch.get().close(); + + batch.set(newIndex.newBatch()); + count.set(0); + } } @Override @@ -122,6 +136,9 @@ public boolean accept(long ledgerId) { completedEntryLogs, totalEntryLogs); } + batch.get().flush(); + batch.get().close(); + newIndex.sync(); newIndex.close(); }