From 7004d994938e914561d8343a96d9d75ce98b2837 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Mon, 19 Sep 2022 08:59:57 -0700 Subject: [PATCH] Speed up the rebuildinding of RocksDB index (#3458) * Speed up the rebuildinding of RocksDB index * fix check style Co-authored-by: chenhang --- .../storage/ldb/LocationsIndexRebuildOp.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java index 4edbc2c3e91..37de152fb6f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java @@ -31,6 +31,8 @@ import java.util.Date; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.bookie.BookieImpl; import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.LedgerDirsManager; @@ -53,6 +55,8 @@ public LocationsIndexRebuildOp(ServerConfiguration conf) { this.conf = conf; } + private static final int BATCH_COMMIT_SIZE = 10_000; + public void initiate() throws IOException { LOG.info("Starting locations index rebuilding"); File[] indexDirs = conf.getIndexDirs(); @@ -90,6 +94,8 @@ public void initiate() throws IOException { int totalEntryLogs = entryLogs.size(); int completedEntryLogs = 0; LOG.info("Scanning {} entry logs", totalEntryLogs); + AtomicReference batch = new AtomicReference<>(newIndex.newBatch()); + AtomicInteger count = new AtomicInteger(); for (long entryLogId : entryLogs) { entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() { @@ -108,7 +114,15 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio // Update the ledger index page LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId); LongWrapper value = LongWrapper.get(location); - newIndex.put(key.array, value.array); + batch.get().put(key.array, value.array); + + if (count.incrementAndGet() > BATCH_COMMIT_SIZE) { + batch.get().flush(); + batch.get().close(); + + batch.set(newIndex.newBatch()); + count.set(0); + } } @Override @@ -122,6 +136,9 @@ public boolean accept(long ledgerId) { completedEntryLogs, totalEntryLogs); } + batch.get().flush(); + batch.get().close(); + newIndex.sync(); newIndex.close(); }