bring back deleteRange for RocksDB to improve location delete performance (#3653)

Currently, entry log location index entries are deleted in batches, one key at a time, which performs poorly. Refer to: #3646

Matteo introduced the deleteRange API a few years ago, but it was rolled back due to a RocksDB delete-range bug (#1620). The RocksDB bug has been fixed since 5.18.0 (https://github.com/facebook/rocksdb/blob/main/HISTORY.md#5180-2018-11-30), so we can bring the `deleteRange` API back to improve entry log location deletion performance.

Bring the `deleteRange` API back for entry log location deletion.
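
For readers unfamiliar with the primitive: a range delete writes a single range tombstone covering a whole key interval, instead of one tombstone per key. BookKeeper reaches RocksDB through its own KeyValueStorage/Batch wrapper (as in the diff below); the following is a minimal standalone sketch of the same pattern against RocksJava directly. `WriteBatch.deleteRange` is the real RocksJava API, but the `key` helper, class name, and database path are illustrative, not BookKeeper code; the 16-byte big-endian (ledgerId, entryId) key layout is assumed to mirror what `LongPairWrapper` produces.

```java
import java.nio.ByteBuffer;

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public class DeleteRangeSketch {
    // Illustrative helper: pack (ledgerId, entryId) as two big-endian longs,
    // the assumed 16-byte layout of location index keys.
    static byte[] key(long ledgerId, long entryId) {
        return ByteBuffer.allocate(16).putLong(ledgerId).putLong(entryId).array();
    }

    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/locations-example"); // example path
             WriteBatch batch = new WriteBatch();
             WriteOptions writeOptions = new WriteOptions()) {

            long ledgerId = 42L;

            // Old approach: one tombstone per entry.
            // for (long entryId = first; entryId <= last; entryId++) {
            //     batch.delete(key(ledgerId, entryId));
            // }

            // New approach: one range tombstone for the whole ledger.
            // The end key is exclusive, so entryId == Long.MAX_VALUE itself is
            // not covered -- fine, since entry ids never reach that value.
            batch.deleteRange(key(ledgerId, 0L), key(ledgerId, Long.MAX_VALUE));

            db.write(writeOptions, batch);
        }
    }
}
```

The payoff is visible in the diff below: the old loop issued one delete per entry (plus a `getCeil` read to find the first entry id and a lookup for the last), while the new code writes a single range tombstone per ledger.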

(cherry picked from commit 696919c)
hangc0276 committed Nov 21, 2022
1 parent d621e5d commit e159510
Showing 2 changed files with 73 additions and 60 deletions.
EntryLocationIndex.java

@@ -190,7 +190,6 @@ public void delete(long ledgerId) throws IOException {
     public void removeOffsetFromDeletedLedgers() throws IOException {
         LongPairWrapper firstKeyWrapper = LongPairWrapper.get(-1, -1);
         LongPairWrapper lastKeyWrapper = LongPairWrapper.get(-1, -1);
-        LongPairWrapper keyToDelete = LongPairWrapper.get(-1, -1);
 
         Set<Long> ledgersToDelete = deletedLedgers.items();
 
@@ -200,12 +199,8 @@ public void removeOffsetFromDeletedLedgers() throws IOException {
 
         log.info("Deleting indexes for ledgers: {}", ledgersToDelete);
         long startTime = System.nanoTime();
-        long deletedEntries = 0;
-        long deletedEntriesInBatch = 0;
 
-        Batch batch = locationsDb.newBatch();
-
-        try {
+        try (Batch batch = locationsDb.newBatch()) {
             for (long ledgerId : ledgersToDelete) {
                 if (log.isDebugEnabled()) {
                     log.debug("Deleting indexes from ledger {}", ledgerId);
@@ -214,66 +209,20 @@ public void removeOffsetFromDeletedLedgers() throws IOException {
                 firstKeyWrapper.set(ledgerId, 0);
                 lastKeyWrapper.set(ledgerId, Long.MAX_VALUE);
 
-                Entry<byte[], byte[]> firstKeyRes = locationsDb.getCeil(firstKeyWrapper.array);
-                if (firstKeyRes == null || ArrayUtil.getLong(firstKeyRes.getKey(), 0) != ledgerId) {
-                    // No entries found for ledger
-                    if (log.isDebugEnabled()) {
-                        log.debug("No entries found for ledger {}", ledgerId);
-                    }
-                    continue;
-                }
-
-                long firstEntryId = ArrayUtil.getLong(firstKeyRes.getKey(), 8);
-                long lastEntryId;
-                try {
-                    lastEntryId = getLastEntryInLedgerInternal(ledgerId);
-                } catch (Bookie.NoEntryException nee) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("No last entry id found for ledger {}", ledgerId);
-                    }
-                    continue;
-                }
-                if (log.isDebugEnabled()) {
-                    log.debug("Deleting index for ledger {} entries ({} -> {})",
-                            ledgerId, firstEntryId, lastEntryId);
-                }
-
-                // Iterate over all the keys and remove each of them
-                for (long entryId = firstEntryId; entryId <= lastEntryId; entryId++) {
-                    keyToDelete.set(ledgerId, entryId);
-                    if (log.isDebugEnabled()) {
-                        log.debug("Deleting index for ({}, {})", keyToDelete.getFirst(), keyToDelete.getSecond());
-                    }
-                    batch.remove(keyToDelete.array);
-                    ++deletedEntriesInBatch;
-                    ++deletedEntries;
-                }
-
-                if (deletedEntriesInBatch > DELETE_ENTRIES_BATCH_SIZE) {
-                    batch.flush();
-                    batch.clear();
-                    deletedEntriesInBatch = 0;
-                }
+                batch.deleteRange(firstKeyWrapper.array, lastKeyWrapper.array);
             }
+
+            batch.flush();
+            for (long ledgerId : ledgersToDelete) {
+                deletedLedgers.remove(ledgerId);
+            }
         } finally {
-            try {
-                batch.flush();
-                batch.clear();
-            } finally {
-                firstKeyWrapper.recycle();
-                lastKeyWrapper.recycle();
-                keyToDelete.recycle();
-                batch.close();
-            }
+            firstKeyWrapper.recycle();
+            lastKeyWrapper.recycle();
         }
 
-        log.info("Deleted indexes for {} entries from {} ledgers in {} seconds", deletedEntries, ledgersToDelete.size(),
+        log.info("Deleted indexes from {} ledgers in {} seconds", ledgersToDelete.size(),
                 TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) / 1000.0);
-
-        // Removed from pending set
-        for (long ledgerId : ledgersToDelete) {
-            deletedLedgers.remove(ledgerId);
-        }
     }
 
     private static final Logger log = LoggerFactory.getLogger(EntryLocationIndex.class);
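A note on why a single half-open range per ledger is safe here: index keys are 16-byte (ledgerId, entryId) pairs packed big-endian (the removed code reads the two longs back at offsets 0 and 8 via `ArrayUtil.getLong`), and RocksDB's default comparator orders keys by unsigned lexicographic byte order, which for non-negative big-endian longs coincides with numeric order. A quick self-contained check of that ordering claim, with an illustrative `key` helper standing in for `LongPairWrapper`:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class KeyOrderCheck {
    // Illustrative stand-in for LongPairWrapper: two big-endian longs in 16 bytes.
    static byte[] key(long ledgerId, long entryId) {
        return ByteBuffer.allocate(16).putLong(ledgerId).putLong(entryId).array();
    }

    public static void main(String[] args) {
        byte[] begin = key(5, 0);                  // firstKeyWrapper equivalent
        byte[] inside = key(5, 123_456);           // some entry of ledger 5
        byte[] end = key(5, Long.MAX_VALUE);       // lastKeyWrapper equivalent
        byte[] nextLedger = key(6, 0);             // first possible key of ledger 6

        // Unsigned lexicographic order (RocksDB's default comparator) agrees
        // with numeric (ledgerId, entryId) order for non-negative ids, so the
        // range [begin, end) covers ledger 5's entries and none of ledger 6's.
        System.out.println(Arrays.compareUnsigned(begin, inside) < 0);   // true
        System.out.println(Arrays.compareUnsigned(inside, end) < 0);     // true
        System.out.println(Arrays.compareUnsigned(end, nextLedger) < 0); // true
    }
}
```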
EntryLocationIndexTest.java

@@ -80,6 +80,70 @@ public void deleteLedgerTest() throws Exception {
         idx.close();
     }
 
+    @Test
+    public void deleteBatchLedgersTest() throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        tmpDir.deleteOnExit();
+
+        EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory,
+                tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE);
+
+        int numLedgers = 1000;
+        int numEntriesPerLedger = 100;
+
+        int location = 0;
+        KeyValueStorage.Batch batch = idx.newBatch();
+        for (int entryId = 0; entryId < numEntriesPerLedger; ++entryId) {
+            for (int ledgerId = 0; ledgerId < numLedgers; ++ledgerId) {
+                idx.addLocation(batch, ledgerId, entryId, location);
+                location++;
+            }
+        }
+        batch.flush();
+        batch.close();
+
+        int expectedLocation = 0;
+        for (int entryId = 0; entryId < numEntriesPerLedger; ++entryId) {
+            for (int ledgerId = 0; ledgerId < numLedgers; ++ledgerId) {
+                assertEquals(expectedLocation, idx.getLocation(ledgerId, entryId));
+                expectedLocation++;
+            }
+        }
+
+        for (int ledgerId = 0; ledgerId < numLedgers; ++ledgerId) {
+            if (ledgerId % 2 == 0) {
+                idx.delete(ledgerId);
+            }
+        }
+
+        expectedLocation = 0;
+        for (int entryId = 0; entryId < numEntriesPerLedger; ++entryId) {
+            for (int ledgerId = 0; ledgerId < numLedgers; ++ledgerId) {
+                assertEquals(expectedLocation, idx.getLocation(ledgerId, entryId));
+                expectedLocation++;
+            }
+        }
+
+        idx.removeOffsetFromDeletedLedgers();
+
+        expectedLocation = 0;
+        for (int entryId = 0; entryId < numEntriesPerLedger; ++entryId) {
+            for (int ledgerId = 0; ledgerId < numLedgers; ++ledgerId) {
+                if (ledgerId % 2 == 0) {
+                    assertEquals(0, idx.getLocation(ledgerId, entryId));
+                } else {
+                    assertEquals(expectedLocation, idx.getLocation(ledgerId, entryId));
+                }
+                expectedLocation++;
+            }
+        }
+
+        idx.close();
+    }
+
     // this tests if a ledger is added after it has been deleted
     @Test
     public void addLedgerAfterDeleteTest() throws Exception {
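The new test also pins down the visibility semantics: locations stay readable after `delete()` (which only queues the ledger for index cleanup) and `getLocation` returns 0 only once `removeOffsetFromDeletedLedgers()` has written the range tombstones. A standalone sketch of the underlying RocksDB behavior the index relies on, using plain RocksJava (`RocksDB.deleteRange` is the real API; the database path and string keys are illustrative):

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RangeTombstoneCheck {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options opts = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(opts, "/tmp/range-tombstone-check")) { // example path
            db.put("a".getBytes(), "1".getBytes());
            db.put("b".getBytes(), "2".getBytes());
            db.put("c".getBytes(), "3".getBytes());

            // One range tombstone; the end key is exclusive.
            db.deleteRange("a".getBytes(), "c".getBytes());

            System.out.println(db.get("a".getBytes()));             // null -- deleted
            System.out.println(db.get("b".getBytes()));             // null -- deleted
            System.out.println(new String(db.get("c".getBytes()))); // "3" -- kept
        }
    }
}
```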
