diff --git a/bookkeeper-proto/pom.xml b/bookkeeper-proto/pom.xml index cc9169da737..388bdcb7b2d 100644 --- a/bookkeeper-proto/pom.xml +++ b/bookkeeper-proto/pom.xml @@ -45,6 +45,7 @@ **/DataFormats.java **/BookkeeperProtocol.java + **/DbLedgerStorageDataFormats.java diff --git a/bookkeeper-proto/src/main/proto/DbLedgerStorageDataFormats.proto b/bookkeeper-proto/src/main/proto/DbLedgerStorageDataFormats.proto new file mode 100644 index 00000000000..e68b2d0d51c --- /dev/null +++ b/bookkeeper-proto/src/main/proto/DbLedgerStorageDataFormats.proto @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto2"; + +option java_package = "org.apache.bookkeeper.bookie.storage.ldb"; +option optimize_for = SPEED; + +/** + * Ledger metadata stored in the bookie + */ +message LedgerData { + required bool exists = 1; + required bool fenced = 2; + required bytes masterKey = 3; +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java new file mode 100644 index 00000000000..fa800167776 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -0,0 +1,794 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; + +import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.DefaultThreadFactory; + +import java.io.IOException; +import java.util.Observable; +import java.util.Observer; +import java.util.SortedMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.Bookie.NoEntryException; +import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.bookie.CheckpointSource; +import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; +import org.apache.bookkeeper.bookie.Checkpointer; +import org.apache.bookkeeper.bookie.CompactableLedgerStorage; +import org.apache.bookkeeper.bookie.EntryLocation; +import org.apache.bookkeeper.bookie.EntryLogger; +import org.apache.bookkeeper.bookie.GarbageCollectorThread; +import org.apache.bookkeeper.bookie.LedgerDirsManager; +import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData; +import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.util.MathUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of LedgerStorage that uses RocksDB to keep the indexes for + * entries stored in EntryLogs. + */ +public class DbLedgerStorage implements CompactableLedgerStorage { + + private EntryLogger entryLogger; + + private LedgerMetadataIndex ledgerIndex; + private EntryLocationIndex entryLocationIndex; + + private GarbageCollectorThread gcThread; + + // Write cache where all new entries are inserted into + protected WriteCache writeCache; + + // Write cache that is used to swap with writeCache during flushes + protected WriteCache writeCacheBeingFlushed; + + // Cache where we insert entries for speculative reading + private ReadCache readCache; + + private final ReentrantReadWriteLock writeCacheMutex = new ReentrantReadWriteLock(); + private final Condition flushWriteCacheCondition = writeCacheMutex.writeLock().newCondition(); + + protected final ReentrantLock flushMutex = new ReentrantLock(); + + protected final AtomicBoolean hasFlushBeenTriggered = new AtomicBoolean(false); + private final AtomicBoolean isFlushOngoing = new AtomicBoolean(false); + + private final ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("db-storage")); + + // Executor used to for db index cleanup + private final ExecutorService cleanupExecutor = Executors + .newSingleThreadExecutor(new DefaultThreadFactory("db-storage-cleanup")); + + static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb"; + static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize"; + static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb"; + + private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = 16; + private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = 16; + private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100; + + private static final int MB = 1024 * 1024; + + private final CopyOnWriteArrayList ledgerDeletionListeners = Lists + .newCopyOnWriteArrayList(); + + private long writeCacheMaxSize; + + private CheckpointSource checkpointSource = null; + private Checkpoint lastCheckpoint = Checkpoint.MIN; + + private long readCacheMaxSize; + private int readAheadCacheBatchSize; + + private StatsLogger stats; + + private OpStatsLogger addEntryStats; + private OpStatsLogger readEntryStats; + private OpStatsLogger readCacheHitStats; + private OpStatsLogger readCacheMissStats; + private OpStatsLogger readAheadBatchCountStats; + private OpStatsLogger readAheadBatchSizeStats; + private OpStatsLogger flushStats; + private OpStatsLogger flushSizeStats; + + @Override + public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, + LedgerDirsManager indexDirsManager, CheckpointSource checkpointSource, Checkpointer checkpointer, + StatsLogger statsLogger) throws IOException { + checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1, + "Db implementation only allows for one storage dir"); + + String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString(); + + writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB; + + writeCache = new WriteCache(writeCacheMaxSize / 2); + writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2); + + this.checkpointSource = checkpointSource; + + readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB; + readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE); + + readCache = new ReadCache(readCacheMaxSize); + + this.stats = statsLogger; + + log.info("Started Db Ledger Storage"); + log.info(" - Write cache size: {} MB", writeCacheMaxSize / MB); + log.info(" - Read Cache: {} MB", readCacheMaxSize / MB); + log.info(" - Read Ahead Batch size: : {}", readAheadCacheBatchSize); + + ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats); + entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats); + + entryLogger = new EntryLogger(conf, ledgerDirsManager); + gcThread = new GarbageCollectorThread(conf, ledgerManager, this); + + registerStats(); + } + + public void registerStats() { + stats.registerGauge("write-cache-size", new Gauge() { + @Override + public Long getDefaultValue() { + return 0L; + } + + @Override + public Long getSample() { + return writeCache.size() + writeCacheBeingFlushed.size(); + } + }); + stats.registerGauge("write-cache-count", new Gauge() { + @Override + public Long getDefaultValue() { + return 0L; + } + + @Override + public Long getSample() { + return writeCache.count() + writeCacheBeingFlushed.count(); + } + }); + stats.registerGauge("read-cache-size", new Gauge() { + @Override + public Long getDefaultValue() { + return 0L; + } + + @Override + public Long getSample() { + return readCache.size(); + } + }); + stats.registerGauge("read-cache-count", new Gauge() { + @Override + public Long getDefaultValue() { + return 0L; + } + + @Override + public Long getSample() { + return readCache.count(); + } + }); + + addEntryStats = stats.getOpStatsLogger("add-entry"); + readEntryStats = stats.getOpStatsLogger("read-entry"); + readCacheHitStats = stats.getOpStatsLogger("read-cache-hits"); + readCacheMissStats = stats.getOpStatsLogger("read-cache-misses"); + readAheadBatchCountStats = stats.getOpStatsLogger("readahead-batch-count"); + readAheadBatchSizeStats = stats.getOpStatsLogger("readahead-batch-size"); + flushStats = stats.getOpStatsLogger("flush"); + flushSizeStats = stats.getOpStatsLogger("flush-size"); + } + + @Override + public void start() { + gcThread.start(); + } + + @Override + public void shutdown() throws InterruptedException { + try { + flush(); + + gcThread.shutdown(); + entryLogger.shutdown(); + + cleanupExecutor.shutdown(); + cleanupExecutor.awaitTermination(1, TimeUnit.SECONDS); + + ledgerIndex.close(); + entryLocationIndex.close(); + + writeCache.close(); + writeCacheBeingFlushed.close(); + readCache.close(); + executor.shutdown(); + + } catch (IOException e) { + log.error("Error closing db storage", e); + } + } + + @Override + public boolean ledgerExists(long ledgerId) throws IOException { + try { + LedgerData ledgerData = ledgerIndex.get(ledgerId); + if (log.isDebugEnabled()) { + log.debug("Ledger exists. ledger: {} : {}", ledgerId, ledgerData.getExists()); + } + return ledgerData.getExists(); + } catch (Bookie.NoLedgerException nle) { + // ledger does not exist + return false; + } + } + + @Override + public boolean isFenced(long ledgerId) throws IOException { + if (log.isDebugEnabled()) { + log.debug("isFenced. ledger: {}", ledgerId); + } + return ledgerIndex.get(ledgerId).getFenced(); + } + + @Override + public boolean setFenced(long ledgerId) throws IOException { + if (log.isDebugEnabled()) { + log.debug("Set fenced. ledger: {}", ledgerId); + } + return ledgerIndex.setFenced(ledgerId); + } + + @Override + public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException { + if (log.isDebugEnabled()) { + log.debug("Set master key. ledger: {}", ledgerId); + } + ledgerIndex.setMasterKey(ledgerId, masterKey); + } + + @Override + public byte[] readMasterKey(long ledgerId) throws IOException, BookieException { + if (log.isDebugEnabled()) { + log.debug("Read master key. ledger: {}", ledgerId); + } + return ledgerIndex.get(ledgerId).getMasterKey().toByteArray(); + } + + @Override + public long addEntry(ByteBuf entry) throws IOException { + long startTime = MathUtils.nowInNano(); + + long ledgerId = entry.readLong(); + long entryId = entry.readLong(); + entry.resetReaderIndex(); + + if (log.isDebugEnabled()) { + log.debug("Add entry. {}@{}", ledgerId, entryId); + } + + // Waits if the write cache is being switched for a flush + writeCacheMutex.readLock().lock(); + boolean inserted; + try { + inserted = writeCache.put(ledgerId, entryId, entry); + } finally { + writeCacheMutex.readLock().unlock(); + } + + if (!inserted) { + triggerFlushAndAddEntry(ledgerId, entryId, entry); + } + + recordSuccessfulEvent(addEntryStats, startTime); + return entryId; + } + + private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry) throws IOException { + // Write cache is full, we need to trigger a flush so that it gets rotated + writeCacheMutex.writeLock().lock(); + + try { + // If the flush has already been triggered or flush has already switched the + // cache, we don't need to + // trigger another flush + if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) { + // Trigger an early flush in background + log.info("Write cache is full, triggering flush"); + executor.execute(() -> { + try { + flush(); + } catch (IOException e) { + log.error("Error during flush", e); + } + }); + } + + long timeoutNs = TimeUnit.MILLISECONDS.toNanos(100); + while (hasFlushBeenTriggered.get()) { + if (timeoutNs <= 0L) { + throw new IOException("Write cache was not trigger within the timeout, cannot add entry " + ledgerId + + "@" + entryId); + } + timeoutNs = flushWriteCacheCondition.awaitNanos(timeoutNs); + } + + if (!writeCache.put(ledgerId, entryId, entry)) { + // Still wasn't able to cache entry + throw new IOException("Error while inserting entry in write cache" + ledgerId + "@" + entryId); + } + + } catch (InterruptedException e) { + throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId); + } finally { + writeCacheMutex.writeLock().unlock(); + } + } + + @Override + public ByteBuf getEntry(long ledgerId, long entryId) throws IOException { + long startTime = MathUtils.nowInNano(); + if (log.isDebugEnabled()) { + log.debug("Get Entry: {}@{}", ledgerId, entryId); + } + + if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) { + return getLastEntry(ledgerId); + } + + writeCacheMutex.readLock().lock(); + try { + // First try to read from the write cache of recent entries + ByteBuf entry = writeCache.get(ledgerId, entryId); + if (entry != null) { + recordSuccessfulEvent(readCacheHitStats, startTime); + recordSuccessfulEvent(readEntryStats, startTime); + return entry; + } + + // If there's a flush going on, the entry might be in the flush buffer + entry = writeCacheBeingFlushed.get(ledgerId, entryId); + if (entry != null) { + recordSuccessfulEvent(readCacheHitStats, startTime); + recordSuccessfulEvent(readEntryStats, startTime); + return entry; + } + } finally { + writeCacheMutex.readLock().unlock(); + } + + // Try reading from read-ahead cache + ByteBuf entry = readCache.get(ledgerId, entryId); + if (entry != null) { + recordSuccessfulEvent(readCacheHitStats, startTime); + recordSuccessfulEvent(readEntryStats, startTime); + return entry; + } + + // Read from main storage + long entryLocation; + try { + entryLocation = entryLocationIndex.getLocation(ledgerId, entryId); + if (entryLocation == 0) { + throw new NoEntryException(ledgerId, entryId); + } + entry = entryLogger.readEntry(ledgerId, entryId, entryLocation); + } catch (NoEntryException e) { + recordFailedEvent(readEntryStats, startTime); + throw e; + } + + readCache.put(ledgerId, entryId, entry); + + // Try to read more entries + long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes(); + fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation); + + recordSuccessfulEvent(readCacheMissStats, startTime); + recordSuccessfulEvent(readEntryStats, startTime); + return entry; + } + + private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long firstEntryLocation) { + try { + long firstEntryLogId = (firstEntryLocation >> 32); + long currentEntryLogId = firstEntryLogId; + long currentEntryLocation = firstEntryLocation; + int count = 0; + long size = 0; + + while (count < readAheadCacheBatchSize && currentEntryLogId == firstEntryLogId) { + ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, -1, currentEntryLocation); + + try { + long currentEntryLedgerId = entry.getLong(0); + long currentEntryId = entry.getLong(8); + + if (currentEntryLedgerId != orginalLedgerId) { + // Found an entry belonging to a different ledger, stopping read-ahead + entry.release(); + return; + } + + // Insert entry in read cache + readCache.put(orginalLedgerId, currentEntryId, entry); + + count++; + size += entry.readableBytes(); + + currentEntryLocation += 4 + entry.readableBytes(); + currentEntryLogId = currentEntryLocation >> 32; + } finally { + entry.release(); + } + } + + readAheadBatchCountStats.registerSuccessfulValue(count); + readAheadBatchSizeStats.registerSuccessfulValue(size); + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug("Exception during read ahead for ledger: {}: e", orginalLedgerId, e); + } + } + } + + public ByteBuf getLastEntry(long ledgerId) throws IOException { + long startTime = MathUtils.nowInNano(); + + writeCacheMutex.readLock().lock(); + try { + // First try to read from the write cache of recent entries + ByteBuf entry = writeCache.getLastEntry(ledgerId); + if (entry != null) { + if (log.isDebugEnabled()) { + long foundLedgerId = entry.readLong(); // ledgedId + long entryId = entry.readLong(); + entry.resetReaderIndex(); + if (log.isDebugEnabled()) { + log.debug("Found last entry for ledger {} in write cache: {}@{}", ledgerId, foundLedgerId, + entryId); + } + } + + recordSuccessfulEvent(readCacheHitStats, startTime); + recordSuccessfulEvent(readEntryStats, startTime); + return entry; + } + + // If there's a flush going on, the entry might be in the flush buffer + entry = writeCacheBeingFlushed.getLastEntry(ledgerId); + if (entry != null) { + if (log.isDebugEnabled()) { + entry.readLong(); // ledgedId + long entryId = entry.readLong(); + entry.resetReaderIndex(); + if (log.isDebugEnabled()) { + log.debug("Found last entry for ledger {} in write cache being flushed: {}", ledgerId, entryId); + } + } + + recordSuccessfulEvent(readCacheHitStats, startTime); + recordSuccessfulEvent(readEntryStats, startTime); + return entry; + } + } finally { + writeCacheMutex.readLock().unlock(); + } + + // Search the last entry in storage + long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId); + if (log.isDebugEnabled()) { + log.debug("Found last entry for ledger {} in db: {}", ledgerId, lastEntryId); + } + + long entryLocation = entryLocationIndex.getLocation(ledgerId, lastEntryId); + ByteBuf content = entryLogger.readEntry(ledgerId, lastEntryId, entryLocation); + + recordSuccessfulEvent(readCacheMissStats, startTime); + recordSuccessfulEvent(readEntryStats, startTime); + return content; + } + + @VisibleForTesting + boolean isFlushRequired() { + writeCacheMutex.readLock().lock(); + try { + return !writeCache.isEmpty(); + } finally { + writeCacheMutex.readLock().unlock(); + } + } + + @Override + public void checkpoint(Checkpoint checkpoint) throws IOException { + Checkpoint thisCheckpoint = checkpointSource.newCheckpoint(); + if (lastCheckpoint.compareTo(checkpoint) > 0) { + return; + } + + long startTime = MathUtils.nowInNano(); + + // Only a single flush operation can happen at a time + flushMutex.lock(); + + try { + // Swap the write cache so that writes can continue to happen while the flush is + // ongoing + swapWriteCache(); + + long sizeToFlush = writeCacheBeingFlushed.size(); + if (log.isDebugEnabled()) { + log.debug("Flushing entries. count: {} -- size {} Mb", writeCacheBeingFlushed.count(), + sizeToFlush / 1024.0 / 1024); + } + + // Write all the pending entries into the entry logger and collect the offset + // position for each entry + + Batch batch = entryLocationIndex.newBatch(); + writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> { + try { + long location = entryLogger.addEntry(ledgerId, entry, true); + entryLocationIndex.addLocation(batch, ledgerId, entryId, location); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + entryLogger.flush(); + + long batchFlushStarTime = System.nanoTime(); + batch.flush(); + batch.close(); + if (log.isDebugEnabled()) { + log.debug("DB batch flushed time : {} s", + MathUtils.elapsedNanos(batchFlushStarTime) / (double) TimeUnit.SECONDS.toNanos(1)); + } + + ledgerIndex.flush(); + + cleanupExecutor.execute(() -> { + // There can only be one single cleanup task running because the cleanupExecutor + // is single-threaded + try { + if (log.isDebugEnabled()) { + log.debug("Removing deleted ledgers from db indexes"); + } + + entryLocationIndex.removeOffsetFromDeletedLedgers(); + ledgerIndex.removeDeletedLedgers(); + } catch (Throwable t) { + log.warn("Failed to cleanup db indexes", t); + } + }); + + lastCheckpoint = thisCheckpoint; + + // Discard all the entry from the write cache, since they're now persisted + writeCacheBeingFlushed.clear(); + + double flushTimeSeconds = MathUtils.elapsedNanos(startTime) / (double) TimeUnit.SECONDS.toNanos(1); + double flushThroughput = sizeToFlush / 1024.0 / 1024.0 / flushTimeSeconds; + + if (log.isDebugEnabled()) { + log.debug("Flushing done time {} s -- Written {} MB/s", flushTimeSeconds, flushThroughput); + } + + recordSuccessfulEvent(flushStats, startTime); + flushSizeStats.registerSuccessfulValue(sizeToFlush); + } catch (IOException e) { + // Leave IOExecption as it is + throw e; + } catch (RuntimeException e) { + // Wrap unchecked exceptions + throw new IOException(e); + } finally { + try { + isFlushOngoing.set(false); + } finally { + flushMutex.unlock(); + } + } + } + + /** + * Swap the current write cache with the replacement cache. + */ + private void swapWriteCache() { + writeCacheMutex.writeLock().lock(); + try { + // First, swap the current write-cache map with an empty one so that writes will + // go on unaffected. Only a single flush is happening at the same time + WriteCache tmp = writeCacheBeingFlushed; + writeCacheBeingFlushed = writeCache; + writeCache = tmp; + + // since the cache is switched, we can allow flush to be triggered + hasFlushBeenTriggered.set(false); + flushWriteCacheCondition.signalAll(); + } finally { + try { + isFlushOngoing.set(true); + } finally { + writeCacheMutex.writeLock().unlock(); + } + } + } + + @Override + public void flush() throws IOException { + checkpoint(Checkpoint.MAX); + } + + @Override + public void deleteLedger(long ledgerId) throws IOException { + if (log.isDebugEnabled()) { + log.debug("Deleting ledger {}", ledgerId); + } + + // Delete entries from this ledger that are still in the write cache + writeCacheMutex.readLock().lock(); + try { + writeCache.deleteLedger(ledgerId); + } finally { + writeCacheMutex.readLock().unlock(); + } + + entryLocationIndex.delete(ledgerId); + ledgerIndex.delete(ledgerId); + + for (int i = 0, size = ledgerDeletionListeners.size(); i < size; i++) { + LedgerDeletionListener listener = ledgerDeletionListeners.get(i); + listener.ledgerDeleted(ledgerId); + } + } + + @Override + public Iterable getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) throws IOException { + return ledgerIndex.getActiveLedgersInRange(firstLedgerId, lastLedgerId); + } + + @Override + public void updateEntriesLocations(Iterable locations) throws IOException { + // Trigger a flush to have all the entries being compacted in the db storage + flush(); + + entryLocationIndex.updateLocations(locations); + } + + @Override + public EntryLogger getEntryLogger() { + return entryLogger; + } + + @Override + public long getLastAddConfirmed(long ledgerId) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getExplicitLac(long ledgerId) { + throw new UnsupportedOperationException(); + } + + @Override + public void flushEntriesLocationsIndex() throws IOException { + // No-op. Location index is already flushed in updateEntriesLocations() call + } + + /** + * Add an already existing ledger to the index. + * + *

This method is only used as a tool to help the migration from + * InterleaveLedgerStorage to DbLedgerStorage + * + * @param ledgerId + * the ledger id + * @param entries + * a map of entryId -> location + * @return the number of + */ + public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey, + Iterable> entries) throws Exception { + LedgerData ledgerData = LedgerData.newBuilder().setExists(true).setFenced(isFenced) + .setMasterKey(ByteString.copyFrom(masterKey)).build(); + ledgerIndex.set(ledgerId, ledgerData); + AtomicLong numberOfEntries = new AtomicLong(); + + // Iterate over all the entries pages + Batch batch = entryLocationIndex.newBatch(); + entries.forEach(map -> { + map.forEach((entryId, location) -> { + try { + entryLocationIndex.addLocation(batch, ledgerId, entryId, location); + } catch (IOException e) { + throw new RuntimeException(e); + } + + numberOfEntries.incrementAndGet(); + }); + }); + + batch.flush(); + batch.close(); + + return numberOfEntries.get(); + } + + @Override + public void registerLedgerDeletionListener(LedgerDeletionListener listener) { + ledgerDeletionListeners.add(listener); + } + + public EntryLocationIndex getEntryLocationIndex() { + return entryLocationIndex; + } + + private void recordSuccessfulEvent(OpStatsLogger logger, long startTimeNanos) { + logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + } + + private void recordFailedEvent(OpStatsLogger logger, long startTimeNanos) { + logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + } + + private static final Logger log = LoggerFactory.getLogger(DbLedgerStorage.class); +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java new file mode 100644 index 00000000000..53e37a263cf --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java @@ -0,0 +1,232 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import com.google.common.collect.Iterables; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.FileSystems; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.EntryLocation; +import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch; +import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintains an index of the entry locations in the EntryLogger. + * + *

For each ledger multiple entries are stored in the same "record", represented + * by the {@link LedgerIndexPage} class. + */ +public class EntryLocationIndex implements Closeable { + + private final KeyValueStorage locationsDb; + private final ConcurrentLongHashSet deletedLedgers = new ConcurrentLongHashSet(); + + private StatsLogger stats; + + public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath, + StatsLogger stats) throws IOException { + String locationsDbPath = FileSystems.getDefault().getPath(basePath, "locations").toFile().toString(); + locationsDb = storageFactory.newKeyValueStorage(locationsDbPath, DbConfigType.Huge, conf); + + this.stats = stats; + registerStats(); + } + + public void registerStats() { + stats.registerGauge("entries-count", new Gauge() { + @Override + public Long getDefaultValue() { + return 0L; + } + + @Override + public Long getSample() { + try { + return locationsDb.count(); + } catch (IOException e) { + return -1L; + } + } + }); + } + + @Override + public void close() throws IOException { + locationsDb.close(); + } + + public long getLocation(long ledgerId, long entryId) throws IOException { + LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId); + LongWrapper value = LongWrapper.get(); + + try { + if (locationsDb.get(key.array, value.array) < 0) { + if (log.isDebugEnabled()) { + log.debug("Entry not found {}@{} in db index", ledgerId, entryId); + } + return 0; + } + + return value.getValue(); + } finally { + key.recycle(); + value.recycle(); + } + } + + public long getLastEntryInLedger(long ledgerId) throws IOException { + if (deletedLedgers.contains(ledgerId)) { + // Ledger already deleted + return -1; + } + + return getLastEntryInLedgerInternal(ledgerId); + } + + private long getLastEntryInLedgerInternal(long ledgerId) throws IOException { + LongPairWrapper maxEntryId = LongPairWrapper.get(ledgerId, Long.MAX_VALUE); + + // Search the last entry in storage + Entry entry = locationsDb.getFloor(maxEntryId.array); + maxEntryId.recycle(); + + if (entry == null) { + throw new Bookie.NoEntryException(ledgerId, -1); + } else { + long foundLedgerId = ArrayUtil.getLong(entry.getKey(), 0); + long lastEntryId = ArrayUtil.getLong(entry.getKey(), 8); + + if (foundLedgerId == ledgerId) { + if (log.isDebugEnabled()) { + log.debug("Found last page in storage db for ledger {} - last entry: {}", ledgerId, lastEntryId); + } + return lastEntryId; + } else { + throw new Bookie.NoEntryException(ledgerId, -1); + } + } + } + + public void addLocation(long ledgerId, long entryId, long location) throws IOException { + Batch batch = locationsDb.newBatch(); + addLocation(batch, ledgerId, entryId, location); + batch.flush(); + batch.close(); + } + + public Batch newBatch() { + return locationsDb.newBatch(); + } + + public void addLocation(Batch batch, long ledgerId, long entryId, long location) throws IOException { + LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId); + LongWrapper value = LongWrapper.get(location); + + if (log.isDebugEnabled()) { + log.debug("Add location - ledger: {} -- entry: {} -- location: {}", ledgerId, entryId, location); + } + + try { + batch.put(key.array, value.array); + } finally { + key.recycle(); + value.recycle(); + } + } + + public void updateLocations(Iterable newLocations) throws IOException { + if (log.isDebugEnabled()) { + log.debug("Update locations -- {}", Iterables.size(newLocations)); + } + + Batch batch = newBatch(); + // Update all the ledger index pages with the new locations + for (EntryLocation e : newLocations) { + if (log.isDebugEnabled()) { + log.debug("Update location - ledger: {} -- entry: {}", e.ledger, e.entry); + } + + addLocation(batch, e.ledger, e.entry, e.location); + } + + batch.flush(); + batch.close(); + } + + public void delete(long ledgerId) throws IOException { + // We need to find all the LedgerIndexPage records belonging to one specific + // ledgers + deletedLedgers.add(ledgerId); + } + + public void removeOffsetFromDeletedLedgers() throws IOException { + LongPairWrapper firstKeyWrapper = LongPairWrapper.get(-1, -1); + LongPairWrapper lastKeyWrapper = LongPairWrapper.get(-1, -1); + LongPairWrapper keyToDelete = LongPairWrapper.get(-1, -1); + + Set ledgersToDelete = deletedLedgers.items(); + + if (ledgersToDelete.isEmpty()) { + return; + } + + log.info("Deleting indexes for ledgers: {}", ledgersToDelete); + Batch batch = locationsDb.newBatch(); + + try { + for (long ledgerId : ledgersToDelete) { + if (log.isDebugEnabled()) { + log.debug("Deleting indexes from ledger {}", ledgerId); + } + + firstKeyWrapper.set(ledgerId, 0); + lastKeyWrapper.set(ledgerId, Long.MAX_VALUE); + + batch.deleteRange(firstKeyWrapper.array, lastKeyWrapper.array); + } + + batch.flush(); + + // Removed from pending set + for (long ledgerId : ledgersToDelete) { + deletedLedgers.remove(ledgerId); + } + } finally { + firstKeyWrapper.recycle(); + lastKeyWrapper.recycle(); + keyToDelete.recycle(); + batch.close(); + } + } + + private static final Logger log = LoggerFactory.getLogger(EntryLocationIndex.class); +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java new file mode 100644 index 00000000000..04bf32dba71 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java @@ -0,0 +1,262 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.protobuf.ByteString; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.FileSystems; +import java.util.AbstractMap.SimpleEntry; +import java.util.Arrays; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData; +import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.CloseableIterator; +import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintains an index for the ledgers metadata. + * + *

The key is the ledgerId and the value is the {@link LedgerData} content. + */ +public class LedgerMetadataIndex implements Closeable { + // Contains all ledgers stored in the bookie + private final ConcurrentLongHashMap ledgers; + private final AtomicInteger ledgersCount; + + private final KeyValueStorage ledgersDb; + private StatsLogger stats; + + // Holds ledger modifications applied in memory map, and pending to be flushed on db + private final ConcurrentLinkedQueue> pendingLedgersUpdates; + + // Holds ledger ids that were delete from memory map, and pending to be flushed on db + private final ConcurrentLinkedQueue pendingDeletedLedgers; + + public LedgerMetadataIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath, + StatsLogger stats) throws IOException { + String ledgersPath = FileSystems.getDefault().getPath(basePath, "ledgers").toFile().toString(); + ledgersDb = storageFactory.newKeyValueStorage(ledgersPath, DbConfigType.Small, conf); + + ledgers = new ConcurrentLongHashMap<>(); + ledgersCount = new AtomicInteger(); + + // Read all ledgers from db + CloseableIterator> iterator = ledgersDb.iterator(); + try { + while (iterator.hasNext()) { + Entry entry = iterator.next(); + long ledgerId = ArrayUtil.getLong(entry.getKey(), 0); + LedgerData ledgerData = LedgerData.parseFrom(entry.getValue()); + ledgers.put(ledgerId, ledgerData); + ledgersCount.incrementAndGet(); + } + } finally { + iterator.close(); + } + + this.pendingLedgersUpdates = new ConcurrentLinkedQueue>(); + this.pendingDeletedLedgers = new ConcurrentLinkedQueue(); + + this.stats = stats; + registerStats(); + } + + public void registerStats() { + stats.registerGauge("ledgers-count", new Gauge() { + @Override + public Long getDefaultValue() { + return 0L; + } + + @Override + public Long getSample() { + return (long) ledgersCount.get(); + } + }); + } + + @Override + public void close() throws IOException { + ledgersDb.close(); + } + + public LedgerData get(long ledgerId) throws IOException { + LedgerData ledgerData = ledgers.get(ledgerId); + if (ledgerData == null) { + if (log.isDebugEnabled()) { + log.debug("Ledger not found {}", ledgerId); + } + throw new Bookie.NoLedgerException(ledgerId); + } + + return ledgerData; + } + + public void set(long ledgerId, LedgerData ledgerData) throws IOException { + ledgerData = LedgerData.newBuilder(ledgerData).setExists(true).build(); + + if (ledgers.put(ledgerId, ledgerData) == null) { + if (log.isDebugEnabled()) { + log.debug("Added new ledger {}", ledgerId); + } + ledgersCount.incrementAndGet(); + } + + pendingLedgersUpdates.add(new SimpleEntry(ledgerId, ledgerData)); + pendingDeletedLedgers.remove(ledgerId); + } + + public void delete(long ledgerId) throws IOException { + if (ledgers.remove(ledgerId) != null) { + if (log.isDebugEnabled()) { + log.debug("Removed ledger {}", ledgerId); + } + ledgersCount.decrementAndGet(); + } + + pendingDeletedLedgers.add(ledgerId); + pendingLedgersUpdates.removeIf(e -> e.getKey() == ledgerId); + } + + public Iterable getActiveLedgersInRange(final long firstLedgerId, final long lastLedgerId) + throws IOException { + return Iterables.filter(ledgers.keys(), new Predicate() { + @Override + public boolean apply(Long ledgerId) { + return ledgerId >= firstLedgerId && ledgerId < lastLedgerId; + } + }); + } + + public boolean setFenced(long ledgerId) throws IOException { + LedgerData ledgerData = get(ledgerId); + if (ledgerData.getFenced()) { + return false; + } + + LedgerData newLedgerData = LedgerData.newBuilder(ledgerData).setFenced(true).build(); + + if (ledgers.put(ledgerId, newLedgerData) == null) { + // Ledger had been deleted + if (log.isDebugEnabled()) { + log.debug("Re-inserted fenced ledger {}", ledgerId); + } + ledgersCount.incrementAndGet(); + } else { + if (log.isDebugEnabled()) { + log.debug("Set fenced ledger {}", ledgerId); + } + } + + pendingLedgersUpdates.add(new SimpleEntry(ledgerId, newLedgerData)); + pendingDeletedLedgers.remove(ledgerId); + return true; + } + + public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException { + LedgerData ledgerData = ledgers.get(ledgerId); + if (ledgerData == null) { + // New ledger inserted + ledgerData = LedgerData.newBuilder().setExists(true).setFenced(false) + .setMasterKey(ByteString.copyFrom(masterKey)).build(); + if (log.isDebugEnabled()) { + log.debug("Inserting new ledger {}", ledgerId); + } + } else { + byte[] storedMasterKey = ledgerData.getMasterKey().toByteArray(); + if (ArrayUtil.isArrayAllZeros(storedMasterKey)) { + // update master key of the ledger + ledgerData = LedgerData.newBuilder(ledgerData).setMasterKey(ByteString.copyFrom(masterKey)).build(); + if (log.isDebugEnabled()) { + log.debug("Replace old master key {} with new master key {}", storedMasterKey, masterKey); + } + } else if (!Arrays.equals(storedMasterKey, masterKey) && !ArrayUtil.isArrayAllZeros(masterKey)) { + log.warn("Ledger {} masterKey in db can only be set once.", ledgerId); + throw new IOException(BookieException.create(BookieException.Code.IllegalOpException)); + } + } + + if (ledgers.put(ledgerId, ledgerData) == null) { + ledgersCount.incrementAndGet(); + } + + pendingLedgersUpdates.add(new SimpleEntry(ledgerId, ledgerData)); + pendingDeletedLedgers.remove(ledgerId); + } + + /** + * Flushes all pending changes. + */ + public void flush() throws IOException { + LongWrapper key = LongWrapper.get(); + + int updatedLedgers = 0; + while (!pendingLedgersUpdates.isEmpty()) { + Entry entry = pendingLedgersUpdates.poll(); + key.set(entry.getKey()); + byte[] value = entry.getValue().toByteArray(); + ledgersDb.put(key.array, value); + ++updatedLedgers; + } + + if (log.isDebugEnabled()) { + log.debug("Persisting updates to {} ledgers", updatedLedgers); + } + + ledgersDb.sync(); + key.recycle(); + } + + public void removeDeletedLedgers() throws IOException { + LongWrapper key = LongWrapper.get(); + + int deletedLedgers = 0; + while (!pendingDeletedLedgers.isEmpty()) { + long ledgerId = pendingDeletedLedgers.poll(); + key.set(ledgerId); + ledgersDb.delete(key.array); + deletedLedgers++; + } + + if (log.isDebugEnabled()) { + log.debug("Persisting deletes of ledgers {}", deletedLedgers); + } + + ledgersDb.sync(); + key.recycle(); + } + + private static final Logger log = LoggerFactory.getLogger(LedgerMetadataIndex.class); +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongPairWrapper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongPairWrapper.java new file mode 100644 index 00000000000..993297c2b36 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongPairWrapper.java @@ -0,0 +1,69 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; + +/** + * Recyclable wrapper that holds a pair of longs. + */ +class LongPairWrapper { + + final byte[] array = new byte[16]; + + public void set(long first, long second) { + ArrayUtil.setLong(array, 0, first); + ArrayUtil.setLong(array, 8, second); + } + + public long getFirst() { + return ArrayUtil.getLong(array, 0); + } + + public long getSecond() { + return ArrayUtil.getLong(array, 8); + } + + public static LongPairWrapper get(long first, long second) { + LongPairWrapper lp = RECYCLER.get(); + ArrayUtil.setLong(lp.array, 0, first); + ArrayUtil.setLong(lp.array, 8, second); + return lp; + } + + public void recycle() { + handle.recycle(this); + } + + private static final Recycler RECYCLER = new Recycler() { + @Override + protected LongPairWrapper newObject(Handle handle) { + return new LongPairWrapper(handle); + } + }; + + private final Handle handle; + + private LongPairWrapper(Handle handle) { + this.handle = handle; + } +} \ No newline at end of file diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongWrapper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongWrapper.java new file mode 100644 index 00000000000..144b9ee75a1 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongWrapper.java @@ -0,0 +1,67 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; + +/** + * Wrapper for a long serialized into a byte array. + */ +class LongWrapper { + + final byte[] array = new byte[8]; + + public void set(long value) { + ArrayUtil.setLong(array, 0, value); + } + + public long getValue() { + return ArrayUtil.getLong(array, 0); + } + + public static LongWrapper get() { + return RECYCLER.get(); + } + + public static LongWrapper get(long value) { + LongWrapper lp = RECYCLER.get(); + ArrayUtil.setLong(lp.array, 0, value); + return lp; + } + + public void recycle() { + handle.recycle(this); + } + + private static final Recycler RECYCLER = new Recycler() { + @Override + protected LongWrapper newObject(Handle handle) { + return new LongWrapper(handle); + } + }; + + private final Handle handle; + + private LongWrapper(Handle handle) { + this.handle = handle; + } +} \ No newline at end of file diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java new file mode 100644 index 00000000000..da7f32a8d06 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java @@ -0,0 +1,52 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import static org.junit.Assert.assertEquals; + +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.junit.Test; + +/** + * Unit test for {@link DbLedgerStorageBookieTest}. + */ +public class DbLedgerStorageBookieTest extends BookKeeperClusterTestCase { + + public DbLedgerStorageBookieTest() { + super(1); + baseConf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + baseConf.setFlushInterval(60000); + baseConf.setGcWaitTime(60000); + } + + @Test + public void testRecoveryEmptyLedger() throws Exception { + LedgerHandle lh1 = bkc.createLedger(1, 1, DigestType.MAC, new byte[0]); + + // Force ledger close & recovery + LedgerHandle lh2 = bkc.openLedger(lh1.getId(), DigestType.MAC, new byte[0]); + + assertEquals(0, lh2.getLength()); + assertEquals(-1, lh2.getLastAddConfirmed()); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java new file mode 100644 index 00000000000..1e91c2ce831 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java @@ -0,0 +1,423 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.Bookie.NoEntryException; +import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.bookie.EntryLocation; +import org.apache.bookkeeper.bookie.EntryLogger; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test for {@link DbLedgerStorage}. + */ +public class DbLedgerStorageTest { + + private DbLedgerStorage storage; + private File tmpDir; + + @Before + public void setup() throws Exception { + tmpDir = File.createTempFile("bkTest", ".dir"); + tmpDir.delete(); + tmpDir.mkdir(); + File curDir = Bookie.getCurrentDirectory(tmpDir); + Bookie.checkDirectoryStructure(curDir); + + int gcWaitTime = 1000; + ServerConfiguration conf = new ServerConfiguration(); + conf.setGcWaitTime(gcWaitTime); + conf.setAllowLoopback(true); + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + conf.setLedgerDirNames(new String[] { tmpDir.toString() }); + Bookie bookie = new Bookie(conf); + + storage = (DbLedgerStorage) bookie.getLedgerStorage(); + } + + @After + public void teardown() throws Exception { + storage.shutdown(); + tmpDir.delete(); + } + + @Test + public void simple() throws Exception { + assertEquals(false, storage.ledgerExists(3)); + try { + storage.isFenced(3); + fail("should have failed"); + } catch (Bookie.NoLedgerException nle) { + // OK + } + assertEquals(false, storage.ledgerExists(3)); + try { + storage.setFenced(3); + fail("should have failed"); + } catch (Bookie.NoLedgerException nle) { + // OK + } + storage.setMasterKey(3, "key".getBytes()); + try { + storage.setMasterKey(3, "other-key".getBytes()); + fail("should have failed"); + } catch (IOException ioe) { + assertTrue(ioe.getCause() instanceof BookieException.BookieIllegalOpException); + } + // setting the same key is NOOP + storage.setMasterKey(3, "key".getBytes()); + assertEquals(true, storage.ledgerExists(3)); + assertEquals(true, storage.setFenced(3)); + assertEquals(true, storage.isFenced(3)); + assertEquals(false, storage.setFenced(3)); + + storage.setMasterKey(4, "key".getBytes()); + assertEquals(false, storage.isFenced(4)); + assertEquals(true, storage.ledgerExists(4)); + + assertEquals("key", new String(storage.readMasterKey(4))); + + assertEquals(Lists.newArrayList(4L, 3L), Lists.newArrayList(storage.getActiveLedgersInRange(0, 100))); + assertEquals(Lists.newArrayList(4L, 3L), Lists.newArrayList(storage.getActiveLedgersInRange(3, 100))); + assertEquals(Lists.newArrayList(3L), Lists.newArrayList(storage.getActiveLedgersInRange(0, 4))); + + // Add / read entries + ByteBuf entry = Unpooled.buffer(1024); + entry.writeLong(4); // ledger id + entry.writeLong(1); // entry id + entry.writeBytes("entry-1".getBytes()); + + assertEquals(false, ((DbLedgerStorage) storage).isFlushRequired()); + + assertEquals(1, storage.addEntry(entry)); + + assertEquals(true, ((DbLedgerStorage) storage).isFlushRequired()); + + // Read from write cache + ByteBuf res = storage.getEntry(4, 1); + assertEquals(entry, res); + + storage.flush(); + + assertEquals(false, ((DbLedgerStorage) storage).isFlushRequired()); + + // Read from db + res = storage.getEntry(4, 1); + assertEquals(entry, res); + + try { + storage.getEntry(4, 2); + fail("Should have thrown exception"); + } catch (NoEntryException e) { + // ok + } + + ByteBuf entry2 = Unpooled.buffer(1024); + entry2.writeLong(4); // ledger id + entry2.writeLong(2); // entry id + entry2.writeBytes("entry-2".getBytes()); + + storage.addEntry(entry2); + + // Read last entry in ledger + res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED); + assertEquals(entry2, res); + + ByteBuf entry3 = Unpooled.buffer(1024); + entry3.writeLong(4); // ledger id + entry3.writeLong(3); // entry id + entry3.writeBytes("entry-3".getBytes()); + storage.addEntry(entry3); + + ByteBuf entry4 = Unpooled.buffer(1024); + entry4.writeLong(4); // ledger id + entry4.writeLong(4); // entry id + entry4.writeBytes("entry-4".getBytes()); + storage.addEntry(entry4); + + res = storage.getEntry(4, 4); + assertEquals(entry4, res); + + // Delete + assertEquals(true, storage.ledgerExists(4)); + storage.deleteLedger(4); + assertEquals(false, storage.ledgerExists(4)); + + // Should not throw exception event if the ledger was deleted + storage.getEntry(4, 4); + + storage.addEntry(Unpooled.wrappedBuffer(entry2)); + res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED); + assertEquals(entry4, res); + + // Get last entry from storage + storage.flush(); + + try { + storage.getEntry(4, 4); + fail("Should have thrown exception since the ledger was deleted"); + } catch (NoEntryException e) { + // ok + } + } + + @Test + public void testBookieCompaction() throws Exception { + storage.setMasterKey(4, "key".getBytes()); + + ByteBuf entry3 = Unpooled.buffer(1024); + entry3.writeLong(4); // ledger id + entry3.writeLong(3); // entry id + entry3.writeBytes("entry-3".getBytes()); + storage.addEntry(entry3); + + // Simulate bookie compaction + EntryLogger entryLogger = ((DbLedgerStorage) storage).getEntryLogger(); + // Rewrite entry-3 + ByteBuf newEntry3 = Unpooled.buffer(1024); + newEntry3.writeLong(4); // ledger id + newEntry3.writeLong(3); // entry id + newEntry3.writeBytes("new-entry-3".getBytes()); + long location = entryLogger.addEntry(4, newEntry3, false); + + List locations = Lists.newArrayList(new EntryLocation(4, 3, location)); + storage.updateEntriesLocations(locations); + + ByteBuf res = storage.getEntry(4, 3); + System.out.println("res: " + ByteBufUtil.hexDump(res)); + System.out.println("newEntry3: " + ByteBufUtil.hexDump(newEntry3)); + assertEquals(newEntry3, res); + } + + @Test + public void doubleDirectoryError() throws Exception { + int gcWaitTime = 1000; + ServerConfiguration conf = new ServerConfiguration(); + conf.setGcWaitTime(gcWaitTime); + conf.setAllowLoopback(true); + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + conf.setLedgerDirNames(new String[] { "dir1", "dir2" }); + + try { + new Bookie(conf); + fail("Should have failed because of the 2 directories"); + } catch (IllegalArgumentException e) { + // ok + } + + } + + @Test + public void testRewritingEntries() throws Exception { + storage.setMasterKey(1, "key".getBytes()); + + try { + storage.getEntry(1, -1); + fail("Should throw exception"); + } catch (Bookie.NoEntryException e) { + // ok + } + + ByteBuf entry1 = Unpooled.buffer(1024); + entry1.writeLong(1); // ledger id + entry1.writeLong(1); // entry id + entry1.writeBytes("entry-1".getBytes()); + + storage.addEntry(entry1); + storage.flush(); + + ByteBuf newEntry1 = Unpooled.buffer(1024); + newEntry1.writeLong(1); // ledger id + newEntry1.writeLong(1); // entry id + newEntry1.writeBytes("new-entry-1".getBytes()); + + storage.addEntry(newEntry1); + storage.flush(); + + ByteBuf response = storage.getEntry(1, 1); + assertEquals(newEntry1, response); + } + + @Test + public void testEntriesOutOfOrder() throws Exception { + storage.setMasterKey(1, "key".getBytes()); + + ByteBuf entry2 = Unpooled.buffer(1024); + entry2.writeLong(1); // ledger id + entry2.writeLong(2); // entry id + entry2.writeBytes("entry-2".getBytes()); + + storage.addEntry(entry2); + + try { + storage.getEntry(1, 1); + fail("Entry doesn't exist"); + } catch (NoEntryException e) { + // Ok, entry doesn't exist + } + + ByteBuf res = storage.getEntry(1, 2); + assertEquals(entry2, res); + + ByteBuf entry1 = Unpooled.buffer(1024); + entry1.writeLong(1); // ledger id + entry1.writeLong(1); // entry id + entry1.writeBytes("entry-1".getBytes()); + + storage.addEntry(entry1); + + res = storage.getEntry(1, 1); + assertEquals(entry1, res); + + res = storage.getEntry(1, 2); + assertEquals(entry2, res); + + storage.flush(); + + res = storage.getEntry(1, 1); + assertEquals(entry1, res); + + res = storage.getEntry(1, 2); + assertEquals(entry2, res); + } + + @Test + public void testEntriesOutOfOrderWithFlush() throws Exception { + storage.setMasterKey(1, "key".getBytes()); + + ByteBuf entry2 = Unpooled.buffer(1024); + entry2.writeLong(1); // ledger id + entry2.writeLong(2); // entry id + entry2.writeBytes("entry-2".getBytes()); + + storage.addEntry(entry2); + + try { + storage.getEntry(1, 1); + fail("Entry doesn't exist"); + } catch (NoEntryException e) { + // Ok, entry doesn't exist + } + + ByteBuf res = storage.getEntry(1, 2); + assertEquals(entry2, res); + res.release(); + + storage.flush(); + + try { + storage.getEntry(1, 1); + fail("Entry doesn't exist"); + } catch (NoEntryException e) { + // Ok, entry doesn't exist + } + + res = storage.getEntry(1, 2); + assertEquals(entry2, res); + res.release(); + + ByteBuf entry1 = Unpooled.buffer(1024); + entry1.writeLong(1); // ledger id + entry1.writeLong(1); // entry id + entry1.writeBytes("entry-1".getBytes()); + + storage.addEntry(entry1); + + res = storage.getEntry(1, 1); + assertEquals(entry1, res); + res.release(); + + res = storage.getEntry(1, 2); + assertEquals(entry2, res); + res.release(); + + storage.flush(); + + res = storage.getEntry(1, 1); + assertEquals(entry1, res); + res.release(); + + res = storage.getEntry(1, 2); + assertEquals(entry2, res); + res.release(); + } + + @Test + public void testAddEntriesAfterDelete() throws Exception { + storage.setMasterKey(1, "key".getBytes()); + + ByteBuf entry0 = Unpooled.buffer(1024); + entry0.writeLong(1); // ledger id + entry0.writeLong(0); // entry id + entry0.writeBytes("entry-0".getBytes()); + + ByteBuf entry1 = Unpooled.buffer(1024); + entry1.writeLong(1); // ledger id + entry1.writeLong(1); // entry id + entry1.writeBytes("entry-1".getBytes()); + + storage.addEntry(entry0); + storage.addEntry(entry1); + + storage.flush(); + + storage.deleteLedger(1); + + storage.setMasterKey(1, "key".getBytes()); + + entry0 = Unpooled.buffer(1024); + entry0.writeLong(1); // ledger id + entry0.writeLong(0); // entry id + entry0.writeBytes("entry-0".getBytes()); + + entry1 = Unpooled.buffer(1024); + entry1.writeLong(1); // ledger id + entry1.writeLong(1); // entry id + entry1.writeBytes("entry-1".getBytes()); + + storage.addEntry(entry0); + storage.addEntry(entry1); + + assertEquals(entry0, storage.getEntry(1, 0)); + assertEquals(entry1, storage.getEntry(1, 1)); + + storage.flush(); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java new file mode 100644 index 00000000000..0d3f5bbc86e --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java @@ -0,0 +1,137 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.io.File; +import java.io.IOException; + +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test for {@link DbLedgerStorage}. + */ +public class DbLedgerStorageWriteCacheTest { + + private DbLedgerStorage storage; + private File tmpDir; + + private static class MockedDbLedgerStorage extends DbLedgerStorage { + + @Override + public void flush() throws IOException { + flushMutex.lock(); + try { + // Swap the write caches and block indefinitely to simulate a slow disk + WriteCache tmp = writeCacheBeingFlushed; + writeCacheBeingFlushed = writeCache; + writeCache = tmp; + + // since the cache is switched, we can allow flush to be triggered + hasFlushBeenTriggered.set(false); + + // Block the flushing thread + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + return; + } + } finally { + flushMutex.unlock(); + } + } + + } + + @Before + public void setup() throws Exception { + tmpDir = File.createTempFile("bkTest", ".dir"); + tmpDir.delete(); + tmpDir.mkdir(); + File curDir = Bookie.getCurrentDirectory(tmpDir); + Bookie.checkDirectoryStructure(curDir); + + int gcWaitTime = 1000; + ServerConfiguration conf = new ServerConfiguration(); + conf.setGcWaitTime(gcWaitTime); + conf.setAllowLoopback(true); + conf.setLedgerStorageClass(MockedDbLedgerStorage.class.getName()); + conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 1); + conf.setLedgerDirNames(new String[] { tmpDir.toString() }); + Bookie bookie = new Bookie(conf); + + storage = (DbLedgerStorage) bookie.getLedgerStorage(); + } + + @After + public void teardown() throws Exception { + storage.shutdown(); + tmpDir.delete(); + } + + @Test + public void writeCacheFull() throws Exception { + storage.setMasterKey(4, "key".getBytes()); + assertEquals(false, storage.isFenced(4)); + assertEquals(true, storage.ledgerExists(4)); + + assertEquals("key", new String(storage.readMasterKey(4))); + + // Add enough entries to fill the 1st write cache + for (int i = 0; i < 5; i++) { + ByteBuf entry = Unpooled.buffer(100 * 1024 + 2 * 8); + entry.writeLong(4); // ledger id + entry.writeLong(i); // entry id + entry.writeZero(100 * 1024); + storage.addEntry(entry); + } + + for (int i = 0; i < 5; i++) { + ByteBuf entry = Unpooled.buffer(100 * 1024 + 2 * 8); + entry.writeLong(4); // ledger id + entry.writeLong(5 + i); // entry id + entry.writeZero(100 * 1024); + storage.addEntry(entry); + } + + // Next add should fail for cache full + ByteBuf entry = Unpooled.buffer(100 * 1024 + 2 * 8); + entry.writeLong(4); // ledger id + entry.writeLong(22); // entry id + entry.writeZero(100 * 1024); + + try { + storage.addEntry(entry); + fail("Should have thrown exception"); + } catch (IOException e) { + // Expected + } + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java new file mode 100644 index 00000000000..6e83ffd58a4 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java @@ -0,0 +1,111 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import static org.junit.Assert.assertEquals; + +import java.io.File; + +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.junit.Test; + +/** + * Unit test for {@link EntryLocationIndex}. + */ +public class EntryLocationIndexTest { + + private final ServerConfiguration serverConfiguration = new ServerConfiguration(); + + @Test + public void deleteLedgerTest() throws Exception { + File tmpDir = File.createTempFile("bkTest", ".dir"); + tmpDir.delete(); + tmpDir.mkdir(); + tmpDir.deleteOnExit(); + + EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory, + tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE); + + // Add some dummy indexes + idx.addLocation(40312, 0, 1); + idx.addLocation(40313, 10, 2); + idx.addLocation(40320, 0, 3); + + // Add more indexes in a different batch + idx.addLocation(40313, 11, 5); + idx.addLocation(40313, 12, 6); + idx.addLocation(40320, 1, 7); + idx.addLocation(40312, 3, 4); + + idx.delete(40313); + + assertEquals(1, idx.getLocation(40312, 0)); + assertEquals(4, idx.getLocation(40312, 3)); + assertEquals(3, idx.getLocation(40320, 0)); + assertEquals(7, idx.getLocation(40320, 1)); + + assertEquals(2, idx.getLocation(40313, 10)); + assertEquals(5, idx.getLocation(40313, 11)); + assertEquals(6, idx.getLocation(40313, 12)); + + idx.removeOffsetFromDeletedLedgers(); + + // After flush the keys will be removed + assertEquals(0, idx.getLocation(40313, 10)); + assertEquals(0, idx.getLocation(40313, 11)); + assertEquals(0, idx.getLocation(40313, 12)); + + idx.close(); + } + + // this tests if a ledger is added after it has been deleted + @Test + public void addLedgerAfterDeleteTest() throws Exception { + File tmpDir = File.createTempFile("bkTest", ".dir"); + tmpDir.delete(); + tmpDir.mkdir(); + tmpDir.deleteOnExit(); + + EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory, + tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE); + + // Add some dummy indexes + idx.addLocation(40312, 0, 1); + idx.addLocation(40313, 10, 2); + idx.addLocation(40320, 0, 3); + + idx.delete(40313); + + // Add more indexes in a different batch + idx.addLocation(40313, 11, 5); + idx.addLocation(40313, 12, 6); + idx.addLocation(40320, 1, 7); + idx.addLocation(40312, 3, 4); + + idx.removeOffsetFromDeletedLedgers(); + + assertEquals(0, idx.getLocation(40313, 11)); + assertEquals(0, idx.getLocation(40313, 12)); + + idx.close(); + } +} diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml index 2cc5ce766cd..736c88b11d4 100644 --- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml +++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml @@ -24,6 +24,10 @@ + + + +