Skip to content

Commit

Permalink
Speed up the rebuildinding of RocksDB index (#3458)
Browse files Browse the repository at this point in the history
* Speed up the rebuildinding of RocksDB index

* fix check style

Co-authored-by: chenhang <chenhang@apache.org>
  • Loading branch information
merlimat and hangc0276 committed Sep 19, 2022
1 parent 1313b8e commit 7004d99
Showing 1 changed file with 18 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.bookie.DefaultEntryLogger;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
Expand All @@ -53,6 +55,8 @@ public LocationsIndexRebuildOp(ServerConfiguration conf) {
this.conf = conf;
}

private static final int BATCH_COMMIT_SIZE = 10_000;

public void initiate() throws IOException {
LOG.info("Starting locations index rebuilding");
File[] indexDirs = conf.getIndexDirs();
Expand Down Expand Up @@ -90,6 +94,8 @@ public void initiate() throws IOException {
int totalEntryLogs = entryLogs.size();
int completedEntryLogs = 0;
LOG.info("Scanning {} entry logs", totalEntryLogs);
AtomicReference<KeyValueStorage.Batch> batch = new AtomicReference<>(newIndex.newBatch());
AtomicInteger count = new AtomicInteger();

for (long entryLogId : entryLogs) {
entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
Expand All @@ -108,7 +114,15 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio
// Update the ledger index page
LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId);
LongWrapper value = LongWrapper.get(location);
newIndex.put(key.array, value.array);
batch.get().put(key.array, value.array);

if (count.incrementAndGet() > BATCH_COMMIT_SIZE) {
batch.get().flush();
batch.get().close();

batch.set(newIndex.newBatch());
count.set(0);
}
}

@Override
Expand All @@ -122,6 +136,9 @@ public boolean accept(long ledgerId) {
completedEntryLogs, totalEntryLogs);
}

batch.get().flush();
batch.get().close();

newIndex.sync();
newIndex.close();
}
Expand Down

0 comments on commit 7004d99

Please sign in to comment.