Skip to content

Commit

Permalink
Avoid usage of RocksDB deleteRange() in DbLedgerStorage
Browse files Browse the repository at this point in the history
### Motivation

There are a few issues that can be traced back to a performance degradation in RocksDB when using the deleteRange() feature (e.g. apache/pulsar#1737 and others).

There is an ongoing discussion in the RocksDB project about addressing this issue: facebook/rocksdb#3959

In the meantime, we should roll back the change and avoid using deleteRange() until these issues are resolved.

### Changes

This PR essentially reverts the commit YahooArchive@4b84990 from the Yahoo branch (which was squashed when merging back to Apache).
The only addition here is to use `DELETE_ENTRIES_BATCH_SIZE` to amortize the cost of `batch.flush()` when there are many ledgers with few entries.

Author: Matteo Merli <mmerli@apache.org>

Reviewers: Ivan Kelly <ivank@apache.org>, Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <sijie@apache.org>

This closes #1620 from merlimat/rollback-delete-range
  • Loading branch information
merlimat authored and sijie committed Aug 23, 2018
1 parent a5b4b93 commit dca471d
Showing 1 changed file with 55 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.file.FileSystems;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.EntryLocation;
Expand Down Expand Up @@ -188,6 +189,8 @@ public void delete(long ledgerId) throws IOException {
deletedLedgers.add(ledgerId);
}

private static final int DELETE_ENTRIES_BATCH_SIZE = 100000;

public void removeOffsetFromDeletedLedgers() throws IOException {
LongPairWrapper firstKeyWrapper = LongPairWrapper.get(-1, -1);
LongPairWrapper lastKeyWrapper = LongPairWrapper.get(-1, -1);
Expand All @@ -200,6 +203,10 @@ public void removeOffsetFromDeletedLedgers() throws IOException {
}

log.info("Deleting indexes for ledgers: {}", ledgersToDelete);
long startTime = System.nanoTime();
long deletedEntries = 0;
long deletedEntriesInBatch = 0;

Batch batch = locationsDb.newBatch();

try {
Expand All @@ -211,20 +218,58 @@ public void removeOffsetFromDeletedLedgers() throws IOException {
firstKeyWrapper.set(ledgerId, 0);
lastKeyWrapper.set(ledgerId, Long.MAX_VALUE);

batch.deleteRange(firstKeyWrapper.array, lastKeyWrapper.array);
}
Entry<byte[], byte[]> firstKeyRes = locationsDb.getCeil(firstKeyWrapper.array);
if (firstKeyRes == null || ArrayUtil.getLong(firstKeyRes.getKey(), 0) != ledgerId) {
// No entries found for ledger
if (log.isDebugEnabled()) {
log.debug("No entries found for ledger {}", ledgerId);
}
continue;
}

batch.flush();
long firstEntryId = ArrayUtil.getLong(firstKeyRes.getKey(), 8);
long lastEntryId = getLastEntryInLedgerInternal(ledgerId);
if (log.isDebugEnabled()) {
log.debug("Deleting index for ledger {} entries ({} -> {})",
ledgerId, firstEntryId, lastEntryId);
}

// Removed from pending set
for (long ledgerId : ledgersToDelete) {
deletedLedgers.remove(ledgerId);
// Iterate over all the keys and remove each of them
for (long entryId = firstEntryId; entryId <= lastEntryId; entryId++) {
keyToDelete.set(ledgerId, entryId);
if (log.isDebugEnabled()) {
log.debug("Deleting index for ({}, {})", keyToDelete.getFirst(), keyToDelete.getSecond());
}
batch.remove(keyToDelete.array);
++deletedEntriesInBatch;
++deletedEntries;
}

if (deletedEntriesInBatch > DELETE_ENTRIES_BATCH_SIZE) {
batch.flush();
batch.clear();
deletedEntriesInBatch = 0;
}
}
} finally {
firstKeyWrapper.recycle();
lastKeyWrapper.recycle();
keyToDelete.recycle();
batch.close();
try {
batch.flush();
batch.clear();
} finally {

firstKeyWrapper.recycle();
lastKeyWrapper.recycle();
keyToDelete.recycle();
batch.close();
}
}

log.info("Deleted indexes for {} entries from {} ledgers in {} seconds", deletedEntries, ledgersToDelete.size(),
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) / 1000.0);

// Removed from pending set
for (long ledgerId : ledgersToDelete) {
deletedLedgers.remove(ledgerId);
}
}

Expand Down

0 comments on commit dca471d

Please sign in to comment.