Add in-memory cache for getLastEntryInLedger to reduce RocksDB getFloor CPU cost #4732

hangc0276 wants to merge 1 commit into apache:master from
Conversation
Pull request overview
This PR introduces a bounded in-memory cache in EntryLocationIndex to reduce expensive RocksDB getFloor() seeks when repeatedly querying the last entry in a ledger, targeting CPU saturation scenarios during high-concurrency ledger loads (e.g., broker failover).
Changes:
- Added a `ConcurrentLongLongHashMap`-backed `ledgerId -> lastEntryId` cache checked in `getLastEntryInLedgerInternal()`.
- Updated `addLocation()` and `delete()` to keep the cache warm/consistent with writes and invalidations.
- Added a unit test covering cache-hit behavior, update-on-write, and delete behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java` | Adds/configures the last-entry cache and integrates it into read/write/delete paths. |
| `bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java` | Adds a new test exercising the cache behavior. |
```java
// Populate cache for future lookups
if (lastEntryCache.size() >= lastEntryCacheMaxSize) {
    lastEntryCache.clear();
}
lastEntryCache.put(ledgerId, lastEntryId);
```
`ConcurrentLongLongHashMap` requires values to be >= 0 (it uses -1 as a "not found" sentinel). `lastEntryId` can be -1 when the last key for a ledger is the special entry (see the existing tests that write entryId -1), so `lastEntryCache.put(ledgerId, lastEntryId)` can throw `IllegalArgumentException` and break `getLastEntryInLedger()`. Consider guarding against a negative `lastEntryId`, or storing an encoded value (e.g. `entryId + 1`) so that -1 can be cached safely.
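As a rough illustration of the `entryId + 1` encoding suggested here, a minimal sketch follows. The class name `LastEntryCacheSketch` is hypothetical, and a boxed `ConcurrentHashMap` stands in for the primitive `ConcurrentLongLongHashMap`, which rejects negative values:

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the "+1" encoding: shift values up by one so that a last
// entry id of -1 (the special entry) becomes 0, which a map that rejects
// negative values can store safely.
public class LastEntryCacheSketch {
    private final ConcurrentHashMap<Long, Long> cache = new ConcurrentHashMap<>();

    public void put(long ledgerId, long lastEntryId) {
        cache.put(ledgerId, lastEntryId + 1); // -1 encodes to 0
    }

    /** Returns the cached last entry id, or Long.MIN_VALUE on a miss. */
    public long get(long ledgerId) {
        Long encoded = cache.get(ledgerId);
        return encoded == null ? Long.MIN_VALUE : encoded - 1;
    }
}
```

Decoding subtracts the 1 back out, so callers still see -1 for the special entry while the map itself only ever stores non-negative values.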
```java
stats.getGetLastEntryInLedgerStats()
        .registerSuccessfulEvent(0, TimeUnit.NANOSECONDS);
return cachedLastEntry;
```
The cache-hit path records a latency of 0 via `registerSuccessfulEvent(0, ...)`. This skews `getLastEntryInLedger` latency metrics and can make averages look artificially perfect under high cache-hit rates. Please record the actual elapsed time for cache hits (e.g. measured from method entry), or use a separate counter/metric for cache hits if you want to distinguish them.
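A minimal sketch of recording real elapsed time on the cache-hit path, as suggested. This is not the PR's code: the `OpStats` interface here is a hypothetical stand-in for BookKeeper's `OpStatsLogger`, and `lookupWithTiming` is an illustrative helper:

```java
import java.util.concurrent.TimeUnit;

// Sketch: measure from method entry and report the real elapsed time,
// instead of a hard-coded 0, so latency percentiles stay meaningful.
public class CacheHitLatencySketch {
    interface OpStats {
        void registerSuccessfulEvent(long duration, TimeUnit unit);
    }

    static long lookupWithTiming(long cachedLastEntry, OpStats stats) {
        long startNanos = System.nanoTime();
        // ... the cache lookup itself would happen here ...
        long elapsed = System.nanoTime() - startNanos;
        // Record actual elapsed nanos, not 0, even on the fast path.
        stats.registerSuccessfulEvent(elapsed, TimeUnit.NANOSECONDS);
        return cachedLastEntry;
    }
}
```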
```java
// Update the last entry cache if this entry is newer.
// ConcurrentLongLongHashMap.get() returns -1 if not found.
long cachedLastEntry = lastEntryCache.get(ledgerId);
if (cachedLastEntry < entryId) {
    // Clear the cache if it exceeds the max size to bound memory usage.
    // The cache will quickly repopulate with hot ledgers on subsequent reads.
    if (cachedLastEntry < 0 && lastEntryCache.size() >= lastEntryCacheMaxSize) {
        lastEntryCache.clear();
    }
    lastEntryCache.put(ledgerId, entryId);
}
```
`addLocation()` can repopulate `lastEntryCache` even after `delete(ledgerId)` has been called, because the cache update here happens unconditionally. That creates a correctness risk with existing behavior: `removeOffsetFromDeletedLedgers()` later removes `ledgerId` from `deletedLedgers` and deletes the DB range, but it does not clear `lastEntryCache` again. If entries were added after `delete()` (see `addLedgerAfterDeleteTest`), `getLastEntryInLedger()` may return a stale cached `entryId` for a ledger whose indexes were deleted. Consider skipping cache updates when `deletedLedgers.contains(ledgerId)` is true, and/or clearing cache entries for `ledgersToDelete` inside `removeOffsetFromDeletedLedgers()` before removing them from `deletedLedgers`.
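A hedged sketch of the guard suggested here, with boxed collections standing in for BookKeeper's primitive maps and the class name `DeleteAwareCacheSketch` being hypothetical:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: skip the cache update while a ledger is pending deletion, so
// addLocation() cannot resurrect a stale cache entry after delete().
public class DeleteAwareCacheSketch {
    final Set<Long> deletedLedgers = ConcurrentHashMap.newKeySet();
    final ConcurrentHashMap<Long, Long> lastEntryCache = new ConcurrentHashMap<>();

    public void delete(long ledgerId) {
        deletedLedgers.add(ledgerId);
        lastEntryCache.remove(ledgerId); // invalidate immediately
    }

    public void addLocation(long ledgerId, long entryId) {
        if (deletedLedgers.contains(ledgerId)) {
            return; // ledger pending deletion: do not repopulate the cache
        }
        // Keep only the highest entry id seen for the ledger.
        lastEntryCache.merge(ledgerId, entryId, Math::max);
    }
}
```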
```java
// Delete should invalidate cache
idx.delete(1);
try {
    idx.getLastEntryInLedger(1);
    fail("Should have thrown NoEntryException");
} catch (Bookie.NoEntryException e) {
    // expected
}
```
This test's "cache invalidation on delete" assertion currently relies on `deletedLedgers.contains(ledgerId)` throwing before the cache is consulted, so it doesn't actually verify that the cache entry was cleared (or that stale cache entries can't leak after `removeOffsetFromDeletedLedgers()` removes the ledger from `deletedLedgers`). To validate cache invalidation, consider calling `idx.removeOffsetFromDeletedLedgers()` after `idx.delete(1)` and then asserting that `getLastEntryInLedger(1)` throws because the DB entries are gone; that assertion would fail if the cache still returned a stale value.
```java
this.lastEntryCacheMaxSize = conf.getLong(LAST_ENTRY_CACHE_MAX_SIZE, DEFAULT_LAST_ENTRY_CACHE_MAX_SIZE);
this.lastEntryCache = ConcurrentLongLongHashMap.newBuilder()
        .expectedItems((int) Math.min(lastEntryCacheMaxSize, Integer.MAX_VALUE))
        .build();
```
`lastEntryCacheMaxSize` is used directly as `expectedItems(...)` with the default `ConcurrentLongLongHashMap` concurrency level (16). `ConcurrentLongLongHashMap` enforces `expectedItems > 0` and `expectedItems >= concurrencyLevel`, so setting `dbStorage_lastEntryCacheMaxSize` to a small value (e.g. 1-15) or 0 will throw `IllegalArgumentException` during `EntryLocationIndex` construction. Please clamp/validate the config and/or explicitly set the cache map's concurrency level (e.g. `concurrencyLevel(Math.min(16, expectedItems))`), and treat values <= 0 as "cache disabled".
Suggested change:

```diff
-this.lastEntryCacheMaxSize = conf.getLong(LAST_ENTRY_CACHE_MAX_SIZE, DEFAULT_LAST_ENTRY_CACHE_MAX_SIZE);
-this.lastEntryCache = ConcurrentLongLongHashMap.newBuilder()
-        .expectedItems((int) Math.min(lastEntryCacheMaxSize, Integer.MAX_VALUE))
-        .build();
+long configuredLastEntryCacheMaxSize =
+        conf.getLong(LAST_ENTRY_CACHE_MAX_SIZE, DEFAULT_LAST_ENTRY_CACHE_MAX_SIZE);
+if (configuredLastEntryCacheMaxSize <= 0) {
+    // Treat non-positive values as "cache disabled", but still create a minimal map
+    this.lastEntryCacheMaxSize = 0;
+    this.lastEntryCache = ConcurrentLongLongHashMap.newBuilder()
+            .expectedItems(1)
+            .concurrencyLevel(1)
+            .build();
+} else {
+    this.lastEntryCacheMaxSize = configuredLastEntryCacheMaxSize;
+    int expectedItems = (int) Math.min(this.lastEntryCacheMaxSize, Integer.MAX_VALUE);
+    int concurrencyLevel = Math.min(16, expectedItems);
+    this.lastEntryCache = ConcurrentLongLongHashMap.newBuilder()
+            .expectedItems(expectedItems)
+            .concurrencyLevel(concurrencyLevel)
+            .build();
+}
```
Motivation
`EntryLocationIndex.getLastEntryInLedgerInternal()` calls `locationsDb.getFloor()` on every invocation, which is an expensive RocksDB seek operation. When a large number of ledgers query their last entry concurrently — for example, during a Pulsar broker failover where thousands of topics are loaded by another broker and send get-LAC requests — this causes severe CPU saturation on the bookie.

In a production Pulsar cluster with 3000+ topics and a bookie configured with 1 CPU, a single broker OOM event triggered massive concurrent `getFloor()` calls that throttled the bookie CPU for over 1 hour.

The existing `WriteCache` already caches last entry IDs for recent unflushed entries, but once entries are flushed to RocksDB, every `getLastEntryInLedger()` call falls through to the database with no caching layer.

Changes
Add a bounded in-memory `ConcurrentLongLongHashMap` cache in `EntryLocationIndex` that maps `ledgerId → lastEntryId`:

- Read path (`getLastEntryInLedgerInternal`): Check the cache before calling `getFloor()`. On cache hit, return immediately without touching RocksDB. On cache miss, populate the cache after a successful `getFloor()` lookup.
- Write path (`addLocation`): Eagerly update the cache when a newer entry is added, so the cache stays current even with interleaved reads and writes.
- Delete path (`delete`): Remove the ledger from the cache.
- Bounded size via `dbStorage_lastEntryCacheMaxSize` (default: 10,000). When the cache exceeds this limit and a new ledger needs to be inserted, the cache is cleared and repopulates naturally with hot ledgers.
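The read path above can be sketched roughly as follows. This is a simplified illustration only: the `dbGetFloor` callback is a hypothetical stand-in for the RocksDB `getFloor()` seek, and a boxed `ConcurrentHashMap` stands in for `ConcurrentLongLongHashMap`:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongUnaryOperator;

// Simplified read path: consult the cache first; on a miss, fall back to
// the expensive floor lookup and populate the cache, clearing it first
// if the size bound was reached.
public class ReadPathSketch {
    private final ConcurrentHashMap<Long, Long> lastEntryCache = new ConcurrentHashMap<>();
    private final long lastEntryCacheMaxSize;
    private final LongUnaryOperator dbGetFloor; // stand-in for the RocksDB seek

    public ReadPathSketch(long maxSize, LongUnaryOperator dbGetFloor) {
        this.lastEntryCacheMaxSize = maxSize;
        this.dbGetFloor = dbGetFloor;
    }

    public long getLastEntryInLedger(long ledgerId) {
        Long cached = lastEntryCache.get(ledgerId);
        if (cached != null) {
            return cached; // cache hit: no RocksDB seek
        }
        long lastEntryId = dbGetFloor.applyAsLong(ledgerId);
        if (lastEntryCache.size() >= lastEntryCacheMaxSize) {
            lastEntryCache.clear(); // bound memory; hot ledgers repopulate
        }
        lastEntryCache.put(ledgerId, lastEntryId);
        return lastEntryId;
    }
}
```

Clearing wholesale rather than evicting per-entry keeps the hot path free of LRU bookkeeping, at the cost of a brief burst of misses after each clear.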
Using `ConcurrentLongLongHashMap` (a lock-striped open hash map with primitive long keys/values) ensures minimal overhead on the write-hot path: no boxing, no LRU bookkeeping, no timestamp computation per update.
Test plan
- Added `testGetLastEntryInLedgerCache` covering cache hit, update on new entry, and invalidation on delete
- All existing `EntryLocationIndexTest` tests pass