From 4675bcee600ef821321d3328da8f61a850f9652d Mon Sep 17 00:00:00 2001 From: Ivan Kelly Date: Fri, 15 Jan 2021 12:48:22 +0100 Subject: [PATCH 1/6] Garbage Collection support for DirectIO entrylogger GC support requires that the entrylogger provides a way to retrieve all entrylogs which have been completely flushed to disk. Previously this was done by returning the least unflushed log id. However, this is problematic as it doesn't support the log ids wrapping around. It also means that GC has to start checking for log id existence from zero every time it boots. This change replaces getLeastUnflushedLogId() with getFlushedLogIds(), to give the entrylogger full control of which logs should be considered for GC. It also changes the CompactableLedgerStorage interface, removing getEntryLogger() and adding injection of the entrylogger to the GarbageCollectionThread. This makes testing easier. (cherry picked from commit 853dd8490bc01a716eb919167d4c59a12bd1b768) --- bookkeeper-server/build.gradle | 3 + bookkeeper-server/pom.xml | 10 ++ .../bookie/CompactableLedgerStorage.java | 6 - .../bookkeeper/bookie/DefaultEntryLogger.java | 60 +++----- .../bookie/EntryLogMetadataMap.java | 27 +++- .../bookie/GarbageCollectorThread.java | 41 +---- .../bookie/InMemoryEntryLogMetadataMap.java | 10 ++ .../bookie/InterleavedLedgerStorage.java | 3 +- .../bookie/SortedLedgerStorage.java | 1 - .../TransactionalEntryLogCompactor.java | 15 +- .../bookie/storage/CompactionEntryLog.java | 7 +- .../bookie/storage/EntryLogger.java | 25 +--- .../ldb/PersistentEntryLogMetadataMap.java | 15 ++ .../ldb/SingleDirectoryDbLedgerStorage.java | 6 +- .../ConcurrentLongLongHashMap.java | 16 ++ .../bookkeeper/bookie/CompactionTest.java | 80 +--------- .../bookie/GarbageCollectorThreadTest.java | 140 ++++++++++++++++-- .../bookkeeper/bookie/MockLedgerStorage.java | 32 +++- .../SortedLedgerStorageCheckpointTest.java | 9 +- .../bookie/storage/EntryLogTestUtils.java | 66 +++++++++ .../apache/bookkeeper/meta/GcLedgersTest.java | 5 - .../meta/LedgerManagerTestCase.java | 5 - 22 files changed, 346 insertions(+), 236 deletions(-) create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/EntryLogTestUtils.java diff --git a/bookkeeper-server/build.gradle b/bookkeeper-server/build.gradle index 18a43d2d5fb..b6a315d5133 100644 --- a/bookkeeper-server/build.gradle +++ b/bookkeeper-server/build.gradle @@ -27,6 +27,9 @@ dependencies { implementation project(':bookkeeper-common-allocator') implementation project(':bookkeeper-http:http-server') implementation project(':bookkeeper-proto') + implementation project(':bookkeeper-slogger:api') + implementation project(':bookkeeper-slogger:slf4j') + implementation project(':bookkeeper-stats') implementation project(':bookkeeper-tools-framework') implementation project(':circe-checksum') implementation project(':cpu-affinity') diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml index bc7886d7bfa..f9ab427bfa4 100644 --- a/bookkeeper-server/pom.xml +++ b/bookkeeper-server/pom.xml @@ -40,6 +40,16 @@ bookkeeper-proto ${project.parent.version} + + org.apache.bookkeeper + bookkeeper-slogger-slf4j + ${project.parent.version} + + + org.apache.bookkeeper + bookkeeper-slogger-api + ${project.parent.version} + org.apache.bookkeeper bookkeeper-tools-framework diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java index 832cb57d1a6..1798391fc76 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java @@ -22,17 +22,11 @@ package org.apache.bookkeeper.bookie; import java.io.IOException; -import org.apache.bookkeeper.bookie.storage.EntryLogger; /** * Interface that identifies LedgerStorage implementations using EntryLogger and running periodic entries compaction. */ public interface CompactableLedgerStorage extends LedgerStorage { - /** - * @return the EntryLogger used by the ledger storage - */ - EntryLogger getEntryLogger(); - /** * Get an iterator over a range of ledger ids stored in the bookie. * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index dc90811e839..e2d5bdaf967 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -68,6 +69,7 @@ import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap; import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.BiConsumerLong; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -448,38 +450,25 @@ public BufferedReadChannel getFromChannels(long logId) { return logid2Channel.get().get(logId); } - /** - * Get the least unflushed log id. Garbage collector thread should not process - * unflushed entry log file. - * - * @return least unflushed log id. - */ - @Override - public long getLeastUnflushedLogId() { + @VisibleForTesting + long getLeastUnflushedLogId() { return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId(); } - /** - * Get the last log id created so far. If entryLogPerLedger is enabled, the Garbage Collector - * process needs to look beyond the least unflushed entry log file, as there may be entry logs - * ready to be garbage collected. - * - * @return last entry log id created. - */ - @Override - public long getLastLogId() { - return recentlyCreatedEntryLogsStatus.getLastLogId(); - } - - /** - * Returns whether the current log id exists and has been rotated already. - * - * @param entryLogId EntryLog id to check. - * @return Whether the given entryLogId exists and has been rotated. - */ @Override - public boolean isFlushedEntryLog(Long entryLogId) { - return recentlyCreatedEntryLogsStatus.isFlushedEntryLog(entryLogId); + public Set getFlushedLogIds() { + Set logIds = new HashSet<>(); + synchronized (recentlyCreatedEntryLogsStatus) { + for (File dir : ledgerDirsManager.getAllLedgerDirs()) { + for (File f : dir.listFiles(file -> file.getName().endsWith(".log"))) { + long logId = fileName2LogId(f.getName()); + if (recentlyCreatedEntryLogsStatus.isFlushedLogId(logId)) { + logIds.add(logId); + } + } + } + } + return logIds; } long getPreviousAllocatedEntryLogId() { @@ -1270,13 +1259,8 @@ synchronized long getLeastUnflushedLogId() { return leastUnflushedLogId; } - synchronized long getLastLogId() { - return !entryLogsStatusMap.isEmpty() ? entryLogsStatusMap.lastKey() : 0; - } - - synchronized boolean isFlushedEntryLog(Long entryLogId) { - return entryLogsStatusMap.containsKey(entryLogId) && entryLogsStatusMap.get(entryLogId) - || entryLogId < leastUnflushedLogId; + synchronized boolean isFlushedLogId(long entryLogId) { + return entryLogsStatusMap.getOrDefault(entryLogId, Boolean.FALSE) || entryLogId < leastUnflushedLogId; } } @@ -1354,7 +1338,7 @@ public void makeAvailable() throws IOException { } } @Override - public void cleanup() { + public void finalizeAndCleanup() { if (compactedLogFile.exists()) { if (!compactedLogFile.delete()) { LOG.warn("Could not delete file: {}", compactedLogFile); @@ -1368,11 +1352,11 @@ public void cleanup() { } @Override - public long getLogId() { + public long getDstLogId() { return compactionLogId; } @Override - public long getCompactedLogId() { + public long getSrcLogId() { return logIdToCompact; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java index 88f6ce53986..f9d41d2ff2a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java @@ -22,7 +22,6 @@ package org.apache.bookkeeper.bookie; import java.io.Closeable; -import java.io.IOException; import java.util.function.BiConsumer; import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException; @@ -37,7 +36,7 @@ public interface EntryLogMetadataMap extends Closeable { * * @param entryLogId * @return - * @throws IOException + * @throws EntryLogMetadataMapException */ boolean containsKey(long entryLogId) throws EntryLogMetadataMapException; @@ -46,7 +45,7 @@ public interface EntryLogMetadataMap extends Closeable { * * @param entryLogId * @param entryLogMeta - * @throws IOException + * @throws EntryLogMetadataMapException */ void put(long entryLogId, EntryLogMetadata entryLogMeta) throws EntryLogMetadataMapException; @@ -55,7 +54,7 @@ public interface EntryLogMetadataMap extends Closeable { * have been processed or the action throws an exception. * * @param action - * @throws IOException + * @throws EntryLogMetadataMapException */ void forEach(BiConsumer action) throws EntryLogMetadataMapException; @@ -63,7 +62,7 @@ public interface EntryLogMetadataMap extends Closeable { * Removes entryLogMetadata record from the map. * * @param entryLogId - * @throws IOException + * @throws EntryLogMetadataMapException */ void remove(long entryLogId) throws EntryLogMetadataMapException; @@ -71,8 +70,24 @@ public interface EntryLogMetadataMap extends Closeable { * Returns number of entryLogMetadata records presents into the map. * * @return - * @throws IOException + * @throws EntryLogMetadataMapException */ int size() throws EntryLogMetadataMapException; + /** + * Returns true iff there are no elements in the map. + * + * @return + */ + default boolean isEmpty() throws EntryLogMetadataMapException { + return size() == 0; + } + + /** + * Clear all records from the map. + * For unit tests. + * + * @throws EntryLogMetadataMapException + */ + void clear() throws EntryLogMetadataMapException; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index fba0bec90fa..0be7cbe2f3e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -33,9 +33,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; import lombok.Getter; @@ -111,9 +109,6 @@ public class GarbageCollectorThread extends SafeRunnable { volatile boolean running = true; - // track the last scanned successfully log id - long scannedLogId = 0; - // Boolean to trigger a forced GC. final AtomicBoolean forceGarbageCollection = new AtomicBoolean(false); // Boolean to disable major compaction, when disk is almost full @@ -139,8 +134,9 @@ public class GarbageCollectorThread extends SafeRunnable { public GarbageCollectorThread(ServerConfiguration conf, LedgerManager ledgerManager, final LedgerDirsManager ledgerDirsManager, final CompactableLedgerStorage ledgerStorage, + EntryLogger entryLogger, StatsLogger statsLogger) throws IOException { - this(conf, ledgerManager, ledgerDirsManager, ledgerStorage, statsLogger, + this(conf, ledgerManager, ledgerDirsManager, ledgerStorage, entryLogger, statsLogger, Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollectorThread"))); } @@ -155,6 +151,7 @@ public GarbageCollectorThread(ServerConfiguration conf, LedgerManager ledgerManager, final LedgerDirsManager ledgerDirsManager, final CompactableLedgerStorage ledgerStorage, + EntryLogger entryLogger, StatsLogger statsLogger, ScheduledExecutorService gcExecutor) throws IOException { @@ -162,7 +159,7 @@ public GarbageCollectorThread(ServerConfiguration conf, this.conf = conf; this.ledgerDirsManager = ledgerDirsManager; - this.entryLogger = ledgerStorage.getEntryLogger(); + this.entryLogger = entryLogger; this.entryLogMetaMap = createEntryLogMetadataMap(); this.ledgerStorage = ledgerStorage; this.gcWaitTime = conf.getGcWaitTime(); @@ -214,8 +211,7 @@ public void removeEntryLog(long logToRemove) { } }; if (conf.getUseTransactionalCompaction()) { - this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage, - ledgerDirsManager, remover); + this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage, remover); } else { this.compactor = new EntryLogCompactor(conf, entryLogger, ledgerStorage, remover); } @@ -678,15 +674,7 @@ protected void compactEntryLog(EntryLogMetadata entryLogMeta) { * @throws EntryLogMetadataMapException */ protected void extractMetaFromEntryLogs() throws EntryLogMetadataMapException { - // Entry Log ID's are just a long value that starts at 0 and increments by 1 when the log fills up and we roll - // to a new one. We scan entry logs as follows: - // - entryLogPerLedgerEnabled is false: Extract it for every entry log except for the current one (un-flushed). - // - entryLogPerLedgerEnabled is true: Scan all flushed entry logs up to the highest known id. - Supplier finalEntryLog = () -> conf.isEntryLogPerLedgerEnabled() ? entryLogger.getLastLogId() : - entryLogger.getLeastUnflushedLogId(); - boolean hasExceptionWhenScan = false; - boolean increaseScannedLogId = true; - for (long entryLogId = scannedLogId; entryLogId < finalEntryLog.get(); entryLogId++) { + for (long entryLogId : entryLogger.getFlushedLogIds()) { // Comb the current entry log file if it has not already been extracted. if (entryLogMetaMap.containsKey(entryLogId)) { continue; @@ -698,15 +686,6 @@ protected void extractMetaFromEntryLogs() throws EntryLogMetadataMapException { continue; } - // If entryLogPerLedgerEnabled is true, we will look for entry log files beyond getLeastUnflushedLogId() - // that have been explicitly rotated or below getLeastUnflushedLogId(). - if (conf.isEntryLogPerLedgerEnabled() && !entryLogger.isFlushedEntryLog(entryLogId)) { - LOG.info("Entry log {} not flushed (entryLogPerLedgerEnabled). Starting next iteration at this point.", - entryLogId); - increaseScannedLogId = false; - continue; - } - LOG.info("Extracting entry log meta from entryLogId: {}", entryLogId); try { @@ -722,18 +701,10 @@ protected void extractMetaFromEntryLogs() throws EntryLogMetadataMapException { entryLogMetaMap.put(entryLogId, entryLogMeta); } } catch (IOException e) { - hasExceptionWhenScan = true; LOG.warn("Premature exception when processing " + entryLogId + " recovery will take care of the problem", e); } - // if scan failed on some entry log, we don't move 'scannedLogId' to next id - // if scan succeed, we don't need to scan it again during next gc run, - // we move 'scannedLogId' to next id (unless entryLogPerLedgerEnabled is true - // and we have found and un-flushed entry log already). - if (!hasExceptionWhenScan && (!conf.isEntryLogPerLedgerEnabled() || increaseScannedLogId)) { - ++scannedLogId; - } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java index 106a382f543..4949c16441f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java @@ -57,6 +57,16 @@ public int size() { return entryLogMetaMap.size(); } + @Override + public boolean isEmpty() { + return entryLogMetaMap.isEmpty(); + } + + @Override + public void clear() { + entryLogMetaMap.clear(); + } + @Override public void close() throws IOException { entryLogMetaMap.clear(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java index a071fd5f40d..782c50ae5fc 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java @@ -190,7 +190,7 @@ public void initializeWithEntryLogger(ServerConfiguration conf, ledgerCache = new LedgerCacheImpl(conf, activeLedgers, null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger); gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager, - this, statsLogger.scope("gc")); + this, entryLogger, statsLogger.scope("gc")); ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener()); // Expose Stats getOffsetStats = statsLogger.getOpStatsLogger(STORAGE_GET_OFFSET); @@ -512,7 +512,6 @@ public void flushEntriesLocationsIndex() throws IOException { ledgerCache.flushLedger(true); } - @Override public EntryLogger getEntryLogger() { return entryLogger; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java index 89a429b7627..d68c5dce77b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java @@ -342,7 +342,6 @@ BookieStateManager getStateManager(){ return (BookieStateManager) stateManager; } - @Override public EntryLogger getEntryLogger() { return interleavedLedgerStorage.getEntryLogger(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java index 7c7307c3714..f1f3ab747bb 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java @@ -48,24 +48,21 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor { final EntryLogger entryLogger; final CompactableLedgerStorage ledgerStorage; - final LedgerDirsManager ledgerDirsManager; final List offsets = new ArrayList<>(); // compaction log file suffix - static final String COMPACTING_SUFFIX = ".log.compacting"; + public static final String COMPACTING_SUFFIX = ".log.compacting"; // flushed compaction log file suffix - static final String COMPACTED_SUFFIX = ".compacted"; + public static final String COMPACTED_SUFFIX = ".compacted"; public TransactionalEntryLogCompactor( ServerConfiguration conf, EntryLogger entryLogger, CompactableLedgerStorage ledgerStorage, - LedgerDirsManager ledgerDirsManager, LogRemovalListener logRemover) { super(conf, logRemover); this.entryLogger = entryLogger; this.ledgerStorage = ledgerStorage; - this.ledgerDirsManager = ledgerDirsManager; } /** @@ -300,8 +297,8 @@ boolean complete() { // When index is flushed, and entry log is removed, // delete the ".compacted" file to indicate this phase is completed. offsets.clear(); - compactionLog.cleanup(); - logRemovalListener.removeEntryLog(compactionLog.getCompactedLogId()); + compactionLog.finalizeAndCleanup(); + logRemovalListener.removeEntryLog(compactionLog.getSrcLogId()); return true; } @@ -330,11 +327,11 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio ledgerId, lid, entryId, offset); throw new IOException("Invalid entry found @ offset " + offset); } - long location = (compactionLog.getLogId() << 32L) | (offset + 4); + long location = (compactionLog.getDstLogId() << 32L) | (offset + 4); offsets.add(new EntryLocation(lid, entryId, location)); } }); - LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactionLog.getLogId()); + LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactionLog.getDstLogId()); } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/CompactionEntryLog.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/CompactionEntryLog.java index 60806add86d..fcbfcf702f2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/CompactionEntryLog.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/CompactionEntryLog.java @@ -75,17 +75,16 @@ public interface CompactionEntryLog { /** * Clean up any temporary resources that were used by the compaction process. - * At this point, there */ - void cleanup(); + void finalizeAndCleanup(); /** * Get the log ID of the entrylog to which compacted entries are being written. */ - long getLogId(); + long getDstLogId(); /** * Get the log ID of the entrylog which is being compacted. */ - long getCompactedLogId(); + long getSrcLogId(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java index cab72cd587c..a4aa93ed266 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java @@ -89,21 +89,12 @@ ByteBuf readEntry(long ledgerId, long entryId, long entryLocation) Collection incompleteCompactionLogs(); /** - * Get the last log id created so far. If entryLogPerLedger is enabled, the Garbage Collector - * process needs to look beyond the least unflushed entry log file, as there may be entry logs - * ready to be garbage collected. - * - * @return last entry log id created. - */ - long getLastLogId(); - - /** - * Get the lowest entrylog ID which has not had all its data persisted to + * Get the log ids for the set of logs which have been completely flushed to * disk. - * This is used by compaction. Any entrylog ID higher or equal to this value - * will not be considered for compaction. + * Only log ids in this set are considered for either compaction or garbage + * collection. */ - long getLeastUnflushedLogId(); + Collection getFlushedLogIds(); /** * Scan the given entrylog, returning all entries contained therein. @@ -133,14 +124,6 @@ default EntryLogMetadata getEntryLogMetadata(long entryLogId) throws IOException */ boolean logExists(long logId); - /** - * Returns whether the current log id exists and has been rotated already. - * - * @param entryLogId EntryLog id to check. - * @return Whether the given entryLogId exists and has been rotated. - */ - boolean isFlushedEntryLog(Long entryLogId); - /** * Delete the entrylog with the given ID. * @return false if the entrylog doesn't exist. diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java index ec289fd16c2..35c53fe620a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java @@ -196,6 +196,21 @@ public int size() throws EntryLogMetadataMapException { } } + @Override + public void clear() throws EntryLogMetadataMapException { + try { + try (KeyValueStorage.Batch b = metadataMapDB.newBatch(); + CloseableIterator itr = metadataMapDB.keys()) { + while (itr.hasNext()) { + b.remove(itr.next()); + } + b.flush(); + } + } catch (IOException e) { + throw new EntryLogMetadataMapException(e); + } + } + @Override public void close() throws IOException { if (isClosed.compareAndSet(false, true)) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 63f2f5a1b7e..7aeea2bc6df 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -187,7 +187,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES); entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager, null, statsLogger, allocator); - gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager, this, statsLogger); + gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager, this, entryLogger, statsLogger); dbLedgerStorageStats = new DbLedgerStorageStats( ledgerDirStatsLogger, @@ -864,8 +864,8 @@ public void updateEntriesLocations(Iterable locations) throws IOE entryLocationIndex.updateLocations(locations); } - @Override - public EntryLogger getEntryLogger() { + @VisibleForTesting + EntryLogger getEntryLogger() { return entryLogger; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java index ec1d232ab1b..6db4e32ad7c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java @@ -891,4 +891,20 @@ private static void checkBiggerEqualZero(long n) { throw new IllegalArgumentException("Keys and values must be >= 0"); } } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("ConcurrentLongLongHashMap{"); + + int headerLen = sb.length(); + forEach((k, v) -> { + sb.append(k).append(" => ").append(v).append(", "); + }); + if (sb.length() > headerLen) { + sb.setLength(sb.length() - 2); + } + sb.append("}"); + return sb.toString(); + } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java index 9005fef748b..99fdabe23d4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java @@ -45,10 +45,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -87,7 +85,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * This class tests the entry log compaction functionality. */ @@ -723,7 +720,7 @@ public void testMinorCompactionWithEntryLogPerLedgerEnabled() throws Exception { bkc.deleteLedger(lhs[2].getId()); // Need to wait until entry log 3 gets flushed before initiating GC to satisfy assertions. - while (!getGCThread().entryLogger.isFlushedEntryLog(3L)) { + while (!getGCThread().entryLogger.getFlushedLogIds().contains(3L)) { TimeUnit.MILLISECONDS.sleep(100); } @@ -1509,80 +1506,6 @@ public void checkpointComplete(Checkpoint checkpoint, storage.gcThread.doCompactEntryLogs(threshold, limit); } - /** - * Test extractMetaFromEntryLogs optimized method to avoid excess memory usage. - */ - public void testExtractMetaFromEntryLogs() throws Exception { - // restart bookies - restartBookies(c -> { - // Always run this test with Throttle enabled. - c.setIsThrottleByBytes(true); - return c; - }); - ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); - File tmpDir = tmpDirs.createNew("bkTest", ".dir"); - File curDir = BookieImpl.getCurrentDirectory(tmpDir); - BookieImpl.checkDirectoryStructure(curDir); - conf.setLedgerDirNames(new String[] { tmpDir.toString() }); - - LedgerDirsManager dirs = new LedgerDirsManager(conf, conf.getLedgerDirs(), - new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - final Set ledgers = Collections - .newSetFromMap(new ConcurrentHashMap()); - - LedgerManager manager = getLedgerManager(ledgers); - - CheckpointSource checkpointSource = new CheckpointSource() { - - @Override - public Checkpoint newCheckpoint() { - return null; - } - - @Override - public void checkpointComplete(Checkpoint checkpoint, - boolean compact) throws IOException { - } - }; - InterleavedLedgerStorage storage = new InterleavedLedgerStorage(); - storage.initialize(conf, manager, dirs, dirs, - NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT); - storage.setCheckpointSource(checkpointSource); - storage.setCheckpointer(Checkpointer.NULL); - - - for (long ledger = 0; ledger <= 10; ledger++) { - ledgers.add(ledger); - for (int entry = 1; entry <= 50; entry++) { - try { - storage.addEntry(genEntry(ledger, entry, ENTRY_SIZE)); - } catch (IOException e) { - //ignore exception on failure to add entry. - } - } - } - - storage.flush(); - storage.shutdown(); - - storage = new InterleavedLedgerStorage(); - storage.initialize(conf, manager, dirs, dirs, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT); - storage.setCheckpointSource(checkpointSource); - storage.setCheckpointer(Checkpointer.NULL); - - long startingEntriesCount = storage.gcThread.entryLogger.getLeastUnflushedLogId() - - storage.gcThread.scannedLogId; - LOG.info("The old Log Entry count is: " + startingEntriesCount); - - Map entryLogMetaData = new HashMap<>(); - long finalEntriesCount = storage.gcThread.entryLogger.getLeastUnflushedLogId() - - storage.gcThread.scannedLogId; - LOG.info("The latest Log Entry count is: " + finalEntriesCount); - - assertTrue("The GC did not clean up entries...", startingEntriesCount != finalEntriesCount); - assertTrue("Entries Count is zero", finalEntriesCount == 0); - } - private ByteBuf genEntry(long ledger, long entry, int size) { ByteBuf bb = Unpooled.buffer(size); bb.writeLong(ledger); @@ -1885,7 +1808,6 @@ public MockTransactionalEntryLogCompactor(GarbageCollectorThread gcThread) { super(gcThread.conf, gcThread.entryLogger, gcThread.ledgerStorage, - gcThread.ledgerDirsManager, (long entry) -> { try { gcThread.removeEntryLog(entry); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java index 9ed271ecfca..923cf1c1ae6 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java @@ -20,18 +20,38 @@ */ package org.apache.bookkeeper.bookie; +import static org.apache.bookkeeper.bookie.storage.EntryLogTestUtils.logIdFromLocation; +import static org.apache.bookkeeper.bookie.storage.EntryLogTestUtils.makeEntry; +import static org.apache.bookkeeper.bookie.storage.EntryLogTestUtils.newDirsManager; +import static org.apache.bookkeeper.bookie.storage.EntryLogTestUtils.newLegacyEntryLogger; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.openMocks; +import java.io.File; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.meta.MockLedgerManager; +import org.apache.bookkeeper.slogger.Slogger; +import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.test.TmpDirs; + +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -39,15 +59,16 @@ import org.mockito.Mock; import org.mockito.Spy; import org.powermock.reflect.Whitebox; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Unit test for {@link GarbageCollectorThread}. */ @SuppressWarnings("deprecation") public class GarbageCollectorThreadTest { - private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorThreadTest.class); + private static final Slogger slog = Slogger.CONSOLE; + + private final TmpDirs tmpDirs = new TmpDirs(); + @InjectMocks @Spy private GarbageCollectorThread mockGCThread; @@ -64,10 +85,15 @@ public class GarbageCollectorThreadTest { @Before public void setUp() throws Exception { - when(ledgerStorage.getEntryLogger()).thenReturn(mock(DefaultEntryLogger.class)); + conf.setAllowLoopback(true); openMocks(this); } + @After + public void cleanup() throws Exception { + tmpDirs.cleanup(); + } + @Test public void testCompactEntryLogWithException() throws Exception { AbstractLogCompactor mockCompactor = mock(AbstractLogCompactor.class); @@ -88,25 +114,121 @@ public void testCalculateUsageBucket() { // Valid range for usage is [0.0 to 1.0] final int numBuckets = 10; int[] usageBuckets = new int[numBuckets]; + String[] bucketNames = new String[numBuckets]; for (int i = 0; i < numBuckets; i++) { usageBuckets[i] = 0; + bucketNames[i] = String.format("%d%%", (i + 1) * 10); } int items = 10000; + for (int item = 0; item <= items; item++) { double usage = ((double) item / (double) items); int index = mockGCThread.calculateUsageIndex(numBuckets, usage); - Assert.assertFalse("Boundary condition exceeded", index < 0 || index >= numBuckets); - LOG.debug("Mapped {} usage to {}}\n", usage, index); + assertFalse("Boundary condition exceeded", index < 0 || index >= numBuckets); + slog.kv("usage", usage) + .kv("index", index) + .info("Mapped usage to index"); usageBuckets[index]++; } - LOG.info( - "Compaction: entry log usage buckets[10% 20% 30% 40% 50% 60% 70% 80% 90% 100%] = {}", - usageBuckets); + + Slogger sl = slog.ctx(); + for (int i = 0; i < numBuckets; i++) { + sl = sl.kv(bucketNames[i], usageBuckets[i]); + } + sl.info("Compaction: entry log usage buckets"); + int sum = 0; for (int i = 0; i < numBuckets; i++) { sum += usageBuckets[i]; } Assert.assertEquals("Incorrect number of items", items + 1, sum); } + + @Test + public void testExtractMetaFromEntryLogsLegacy() throws Exception { + File ledgerDir = tmpDirs.createNew("testExtractMeta", "ledgers"); + testExtractMetaFromEntryLogs( + newLegacyEntryLogger(20000, ledgerDir), ledgerDir); + } + + private void testExtractMetaFromEntryLogs(EntryLogger entryLogger, File ledgerDir) + throws Exception { + + MockLedgerStorage storage = new MockLedgerStorage(); + MockLedgerManager lm = new MockLedgerManager(); + + GarbageCollectorThread gcThread = new GarbageCollectorThread( + TestBKConfiguration.newServerConfiguration(), lm, + newDirsManager(ledgerDir), + storage, entryLogger, + NullStatsLogger.INSTANCE); + + // Add entries. + // Ledger 1 is on first entry log + // Ledger 2 spans first, second and third entry log + // Ledger 3 is on the third entry log (which is still active when extract meta) + long loc1 = entryLogger.addEntry(1L, makeEntry(1L, 1L, 5000)); + long loc2 = entryLogger.addEntry(2L, makeEntry(2L, 1L, 5000)); + assertThat(logIdFromLocation(loc2), equalTo(logIdFromLocation(loc1))); + long loc3 = entryLogger.addEntry(2L, makeEntry(2L, 1L, 15000)); + assertThat(logIdFromLocation(loc3), greaterThan(logIdFromLocation(loc2))); + long loc4 = entryLogger.addEntry(2L, makeEntry(2L, 1L, 15000)); + assertThat(logIdFromLocation(loc4), greaterThan(logIdFromLocation(loc3))); + long loc5 = entryLogger.addEntry(3L, makeEntry(3L, 1L, 1000)); + assertThat(logIdFromLocation(loc5), equalTo(logIdFromLocation(loc4))); + + long logId1 = logIdFromLocation(loc2); + long logId2 = logIdFromLocation(loc3); + long logId3 = logIdFromLocation(loc5); + entryLogger.flush(); + + storage.setMasterKey(1L, new byte[0]); + storage.setMasterKey(2L, new byte[0]); + storage.setMasterKey(3L, new byte[0]); + + assertThat(entryLogger.getFlushedLogIds(), containsInAnyOrder(logId1, logId2)); + assertTrue(entryLogger.logExists(logId3)); + + // all ledgers exist, nothing should disappear + final EntryLogMetadataMap entryLogMetaMap = gcThread.getEntryLogMetaMap(); + gcThread.extractMetaFromEntryLogs(); + + assertThat(entryLogger.getFlushedLogIds(), containsInAnyOrder(logId1, logId2)); + assertTrue(entryLogMetaMap.containsKey(logId1)); + assertTrue(entryLogMetaMap.containsKey(logId2)); + assertTrue(entryLogger.logExists(logId3)); + + // log 2 is 100% ledger 2, so it should disappear if ledger 2 is deleted + entryLogMetaMap.clear(); + storage.deleteLedger(2L); + gcThread.extractMetaFromEntryLogs(); + + assertThat(entryLogger.getFlushedLogIds(), containsInAnyOrder(logId1)); + assertTrue(entryLogMetaMap.containsKey(logId1)); + assertTrue(entryLogger.logExists(logId3)); + + // delete all ledgers, all logs except the current should be deleted + entryLogMetaMap.clear(); + storage.deleteLedger(1L); + storage.deleteLedger(3L); + gcThread.extractMetaFromEntryLogs(); + + assertThat(entryLogger.getFlushedLogIds(), empty()); + assertTrue(entryLogMetaMap.isEmpty()); + assertTrue(entryLogger.logExists(logId3)); + + // add enough entries to roll log, log 3 can not be GC'd + long loc6 = entryLogger.addEntry(3L, makeEntry(3L, 1L, 25000)); + assertThat(logIdFromLocation(loc6), greaterThan(logIdFromLocation(loc5))); + entryLogger.flush(); + assertThat(entryLogger.getFlushedLogIds(), containsInAnyOrder(logId3)); + + entryLogMetaMap.clear(); + gcThread.extractMetaFromEntryLogs(); + + assertThat(entryLogger.getFlushedLogIds(), empty()); + assertTrue(entryLogMetaMap.isEmpty()); + assertFalse(entryLogger.logExists(logId3)); + } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java index 06703966f25..f6ed19f6fed 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java @@ -41,7 +41,7 @@ /** * A mock for running tests that require ledger storage. */ -public class MockLedgerStorage implements LedgerStorage { +public class MockLedgerStorage implements CompactableLedgerStorage { private static class LedgerInfo { boolean limbo = false; @@ -242,7 +242,7 @@ public void checkpoint(Checkpoint checkpoint) throws IOException { @Override public void deleteLedger(long ledgerId) throws IOException { - throw new UnsupportedOperationException("Not supported in mock, implement if you need it"); + ledgers.remove(ledgerId); } @Override @@ -262,32 +262,32 @@ public ByteBuf getExplicitLac(long ledgerId) throws IOException { @Override public LedgerStorage getUnderlyingLedgerStorage() { - return LedgerStorage.super.getUnderlyingLedgerStorage(); + return CompactableLedgerStorage.super.getUnderlyingLedgerStorage(); } @Override public void forceGC() { - LedgerStorage.super.forceGC(); + CompactableLedgerStorage.super.forceGC(); } @Override public void forceGC(Boolean forceMajor, Boolean forceMinor) { - LedgerStorage.super.forceGC(forceMajor, forceMinor); + CompactableLedgerStorage.super.forceGC(forceMajor, forceMinor); } @Override public List localConsistencyCheck(Optional rateLimiter) throws IOException { - return LedgerStorage.super.localConsistencyCheck(rateLimiter); + return CompactableLedgerStorage.super.localConsistencyCheck(rateLimiter); } @Override public boolean isInForceGC() { - return LedgerStorage.super.isInForceGC(); + return CompactableLedgerStorage.super.isInForceGC(); } @Override public List getGarbageCollectionStatus() { - return LedgerStorage.super.getGarbageCollectionStatus(); + return CompactableLedgerStorage.super.getGarbageCollectionStatus(); } @Override @@ -295,6 +295,22 @@ public PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws I throw new UnsupportedOperationException("Not supported in mock, implement if you need it"); } + @Override + public Iterable getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) + throws IOException { + throw new UnsupportedOperationException("Not supported in mock, implement if you need it"); + } + + @Override + public void updateEntriesLocations(Iterable locations) throws IOException { + throw new UnsupportedOperationException("Not supported in mock, implement if you need it"); + } + + @Override + public void flushEntriesLocationsIndex() throws IOException { + throw new UnsupportedOperationException("Not supported in mock, implement if you need it"); + } + @Override public EnumSet getStorageStateFlags() throws IOException { return storageStateFlags; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java index 07b0018683e..3a049bd91de 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java @@ -224,12 +224,11 @@ public void testCheckpointAfterEntryLogRotated() throws Exception { }); // simulate entry log is rotated (due to compaction) + DefaultEntryLogger elogger = (DefaultEntryLogger) storage.getEntryLogger(); EntryLogManagerForSingleEntryLog entryLogManager = - (EntryLogManagerForSingleEntryLog) ((DefaultEntryLogger) storage.getEntryLogger()).getEntryLogManager(); + (EntryLogManagerForSingleEntryLog) elogger.getEntryLogManager(); entryLogManager.createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID); - long leastUnflushedLogId = storage.getEntryLogger().getLeastUnflushedLogId(); long currentLogId = entryLogManager.getCurrentLogId(); - log.info("Least unflushed entry log : current = {}, leastUnflushed = {}", currentLogId, leastUnflushedLogId); readyLatch.countDown(); assertNull(checkpoints.poll()); @@ -246,8 +245,8 @@ public void testCheckpointAfterEntryLogRotated() throws Exception { assertEquals(0, storage.memTable.kvmap.size()); assertTrue( "current log " + currentLogId + " contains entries added from memtable should be forced to disk" - + " but least unflushed log is " + storage.getEntryLogger().getLeastUnflushedLogId(), - storage.getEntryLogger().getLeastUnflushedLogId() > currentLogId); + + " but flushed logs are " + elogger.getFlushedLogIds(), + elogger.getFlushedLogIds().contains(currentLogId)); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/EntryLogTestUtils.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/EntryLogTestUtils.java new file mode 100644 index 00000000000..22e607a2236 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/EntryLogTestUtils.java @@ -0,0 +1,66 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import java.io.File; +import java.util.Arrays; +import org.apache.bookkeeper.bookie.DefaultEntryLogger; +import org.apache.bookkeeper.bookie.LedgerDirsManager; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.slogger.Slogger; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.util.DiskChecker; + +/** + * EntryLogTestUtils. + */ +public class EntryLogTestUtils { + private static final Slogger slog = Slogger.CONSOLE; + + public static LedgerDirsManager newDirsManager(File... ledgerDir) throws Exception { + return new LedgerDirsManager( + new ServerConfiguration(), ledgerDir, new DiskChecker(0.999f, 0.999f)); + } + + public static EntryLogger newLegacyEntryLogger(int logSizeLimit, File... ledgerDir) throws Exception { + ServerConfiguration conf = new ServerConfiguration(); + conf.setEntryLogSizeLimit(logSizeLimit); + return new DefaultEntryLogger(conf, newDirsManager(ledgerDir), null, + NullStatsLogger.INSTANCE, ByteBufAllocator.DEFAULT); + } + + public static int logIdFromLocation(long location) { + return (int) (location >> 32); + } + + public static ByteBuf makeEntry(long ledgerId, long entryId, int size) { + ByteBuf buf = Unpooled.buffer(size); + buf.writeLong(ledgerId).writeLong(entryId); + byte[] random = new byte[buf.writableBytes()]; + Arrays.fill(random, (byte) 0xdd); + buf.writeBytes(random); + return buf; + } +} + diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java index d3e628fc0ba..e1d9a4db9b3 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java @@ -672,11 +672,6 @@ public Iterable getActiveLedgersInRange(long firstLedgerId, long lastLedge return subBkActiveLedgers.keySet(); } - @Override - public EntryLogger getEntryLogger() { - return null; - } - @Override public void updateEntriesLocations(Iterable locations) throws IOException { } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java index 362ff50b15d..97ce537e9da 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java @@ -264,11 +264,6 @@ public Iterable getActiveLedgersInRange(long firstLedgerId, long lastLedge return subBkActiveLedgers.keySet(); } - @Override - public EntryLogger getEntryLogger() { - return null; - } - @Override public void updateEntriesLocations(Iterable locations) throws IOException { } From 4989bd57f3cebaf300e2084b7d9329bb897cc1ea Mon Sep 17 00:00:00 2001 From: chenhang Date: Tue, 3 May 2022 22:13:57 +0800 Subject: [PATCH 2/6] format code and fix some bug --- .../org/apache/bookkeeper/bookie/DefaultEntryLogger.java | 5 +++-- .../apache/bookkeeper/bookie/EntryLogMetadataMap.java | 2 +- .../apache/bookkeeper/bookie/GarbageCollectorThread.java | 1 - .../bookkeeper/bookie/InterleavedLedgerStorage.java | 2 +- .../apache/bookkeeper/bookie/SortedLedgerStorage.java | 9 ++++----- .../bookkeeper/bookie/LedgerStorageCheckpointTest.java | 2 +- .../bookie/SortedLedgerStorageCheckpointTest.java | 2 +- 7 files changed, 11 insertions(+), 12 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index e2d5bdaf967..69a049bac2b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -50,6 +50,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -460,7 +461,7 @@ public Set getFlushedLogIds() { Set logIds = new HashSet<>(); synchronized (recentlyCreatedEntryLogsStatus) { for (File dir : ledgerDirsManager.getAllLedgerDirs()) { - for (File f : dir.listFiles(file -> file.getName().endsWith(".log"))) { + for (File f : Objects.requireNonNull(dir.listFiles(file -> file.getName().endsWith(".log")))) { long logId = fileName2LogId(f.getName()); if (recentlyCreatedEntryLogsStatus.isFlushedLogId(logId)) { logIds.add(logId); @@ -1238,7 +1239,7 @@ static class RecentEntryLogsStatus { private long leastUnflushedLogId; RecentEntryLogsStatus(long leastUnflushedLogId) { - entryLogsStatusMap = new TreeMap(); + entryLogsStatusMap = new TreeMap<>(); this.leastUnflushedLogId = leastUnflushedLogId; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java index f9d41d2ff2a..adaf8330a29 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java @@ -75,7 +75,7 @@ public interface EntryLogMetadataMap extends Closeable { int size() throws EntryLogMetadataMapException; /** - * Returns true iff there are no elements in the map. + * Returns true if there are no elements in the map. * * @return */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 0be7cbe2f3e..1279f2dc885 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -704,7 +704,6 @@ protected void extractMetaFromEntryLogs() throws EntryLogMetadataMapException { LOG.warn("Premature exception when processing " + entryLogId + " recovery will take care of the problem", e); } - } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java index 782c50ae5fc..82ee5ca6098 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java @@ -512,7 +512,7 @@ public void flushEntriesLocationsIndex() throws IOException { ledgerCache.flushLedger(true); } - public EntryLogger getEntryLogger() { + public DefaultEntryLogger getEntryLogger() { return entryLogger; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java index d68c5dce77b..7d386441d01 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java @@ -261,8 +261,7 @@ public void cancelWaitForLastAddConfirmedUpdate(long ledgerId, @Override public void checkpoint(final Checkpoint checkpoint) throws IOException { long numBytesFlushed = memTable.flush(this, checkpoint); - ((DefaultEntryLogger) interleavedLedgerStorage.getEntryLogger()) - .prepareSortedLedgerStorageCheckpoint(numBytesFlushed); + interleavedLedgerStorage.getEntryLogger().prepareSortedLedgerStorageCheckpoint(numBytesFlushed); interleavedLedgerStorage.checkpoint(checkpoint); } @@ -317,9 +316,9 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException { public void run() { try { LOG.info("Started flushing mem table."); - ((DefaultEntryLogger) interleavedLedgerStorage.getEntryLogger()).prepareEntryMemTableFlush(); + interleavedLedgerStorage.getEntryLogger().prepareEntryMemTableFlush(); memTable.flush(SortedLedgerStorage.this); - if (((DefaultEntryLogger) interleavedLedgerStorage.getEntryLogger()).commitEntryMemTableFlush()) { + if (interleavedLedgerStorage.getEntryLogger().commitEntryMemTableFlush()) { interleavedLedgerStorage.checkpointer.startCheckpoint(cp); } } catch (Exception e) { @@ -342,7 +341,7 @@ BookieStateManager getStateManager(){ return (BookieStateManager) stateManager; } - public EntryLogger getEntryLogger() { + public DefaultEntryLogger getEntryLogger() { return interleavedLedgerStorage.getEntryLogger(); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java index 7afa816af99..e71750d5104 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java @@ -371,7 +371,7 @@ conf, new TestBookieImpl(conf), } handle.close(); // simulate rolling entrylog - ((EntryLogManagerBase) ((DefaultEntryLogger) ledgerStorage.getEntryLogger()).getEntryLogManager()) + ((EntryLogManagerBase) ledgerStorage.getEntryLogger().getEntryLogManager()) .createNewLog(ledgerId); // sleep for a bit for checkpoint to do its task executorController.advance(Duration.ofMillis(500)); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java index 3a049bd91de..6ad6737b634 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java @@ -224,7 +224,7 @@ public void testCheckpointAfterEntryLogRotated() throws Exception { }); // simulate entry log is rotated (due to compaction) - DefaultEntryLogger elogger = (DefaultEntryLogger) storage.getEntryLogger(); + DefaultEntryLogger elogger = storage.getEntryLogger(); EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) elogger.getEntryLogManager(); entryLogManager.createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID); From 1211b0aa02032c1a9193f16e3048d6bb4a67544e Mon Sep 17 00:00:00 2001 From: chenhang Date: Tue, 3 May 2022 22:45:16 +0800 Subject: [PATCH 3/6] format code --- bookkeeper-server/build.gradle | 6 +++--- .../org/apache/bookkeeper/bookie/SortedLedgerStorage.java | 1 - .../test/java/org/apache/bookkeeper/meta/GcLedgersTest.java | 1 - .../org/apache/bookkeeper/meta/LedgerManagerTestCase.java | 1 - bookkeeper-slogger/api/build.gradle | 5 ----- bookkeeper-slogger/slf4j/build.gradle | 5 ----- settings.gradle | 3 +++ 7 files changed, 6 insertions(+), 16 deletions(-) diff --git a/bookkeeper-server/build.gradle b/bookkeeper-server/build.gradle index b6a315d5133..11d0b2a88f6 100644 --- a/bookkeeper-server/build.gradle +++ b/bookkeeper-server/build.gradle @@ -27,13 +27,13 @@ dependencies { implementation project(':bookkeeper-common-allocator') implementation project(':bookkeeper-http:http-server') implementation project(':bookkeeper-proto') - implementation project(':bookkeeper-slogger:api') - implementation project(':bookkeeper-slogger:slf4j') - implementation project(':bookkeeper-stats') implementation project(':bookkeeper-tools-framework') implementation project(':circe-checksum') implementation project(':cpu-affinity') implementation project(':stats:bookkeeper-stats-api') + implementation project(':bookkeeper-slogger:api') + implementation project(':bookkeeper-slogger:slf4j') + implementation project(':bookkeeper-stats') compileOnly depLibs.lombok compileOnly depLibs.spotbugsAnnotations diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java index 7d386441d01..f79cf0ef927 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java @@ -37,7 +37,6 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; -import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java index e1d9a4db9b3..9258a155296 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java @@ -62,7 +62,6 @@ import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector; import org.apache.bookkeeper.bookie.StateManager; -import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerMetadataBuilder; import org.apache.bookkeeper.client.api.DigestType; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java index 97ce537e9da..c534c6b6ed8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java @@ -43,7 +43,6 @@ import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.StateManager; -import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; diff --git a/bookkeeper-slogger/api/build.gradle b/bookkeeper-slogger/api/build.gradle index 0264913e640..2c232565fad 100644 --- a/bookkeeper-slogger/api/build.gradle +++ b/bookkeeper-slogger/api/build.gradle @@ -26,8 +26,3 @@ dependencies { } jar.archiveBaseName = 'bookkeeper-slogger-api' - -jar { - dependsOn tasks.named("writeClasspath") -} - diff --git a/bookkeeper-slogger/slf4j/build.gradle b/bookkeeper-slogger/slf4j/build.gradle index 85653428b0d..60ed7b1fef2 100644 --- a/bookkeeper-slogger/slf4j/build.gradle +++ b/bookkeeper-slogger/slf4j/build.gradle @@ -28,8 +28,3 @@ dependencies { } jar.archiveBaseName = 'bookkeeper-slogger-slf4j' - -jar { - dependsOn tasks.named("writeClasspath") -} - diff --git a/settings.gradle b/settings.gradle index a9114e1280d..040f4f86015 100644 --- a/settings.gradle +++ b/settings.gradle @@ -52,6 +52,9 @@ include(':bookkeeper-benchmark', 'bookkeeper-server', 'shaded:bookkeeper-server-shaded', 'shaded:bookkeeper-server-tests-shaded', + ':bookkeeper-slogger:api', + ':bookkeeper-slogger:slf4j', + 'bookkeeper-stats', 'buildtools', 'circe-checksum', 'circe-checksum:src:main:circe', From da3911ac5131b9974cf1852a72d4f8b524e192f7 Mon Sep 17 00:00:00 2001 From: chenhang Date: Tue, 3 May 2022 23:01:26 +0800 Subject: [PATCH 4/6] fix findbugs failed --- .../apache/bookkeeper/bookie/DefaultEntryLogger.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index 69a049bac2b..46690300a53 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -461,11 +461,15 @@ public Set getFlushedLogIds() { Set logIds = new HashSet<>(); synchronized (recentlyCreatedEntryLogsStatus) { for (File dir : ledgerDirsManager.getAllLedgerDirs()) { - for (File f : Objects.requireNonNull(dir.listFiles(file -> file.getName().endsWith(".log")))) { - long logId = fileName2LogId(f.getName()); - if (recentlyCreatedEntryLogsStatus.isFlushedLogId(logId)) { - logIds.add(logId); + try { + for (File f : Objects.requireNonNull(dir.listFiles(file -> file.getName().endsWith(".log")))) { + long logId = fileName2LogId(f.getName()); + if (recentlyCreatedEntryLogsStatus.isFlushedLogId(logId)) { + logIds.add(logId); + } } + } catch (NullPointerException e) { + LOG.error("Failed get log files from {} ", dir, e); } } } From 37cba24ff4286e678643c6082a7febcbc0beb866 Mon Sep 17 00:00:00 2001 From: chenhang Date: Wed, 4 May 2022 00:28:11 +0800 Subject: [PATCH 5/6] fix findbugs failed --- .../bookkeeper/bookie/DefaultEntryLogger.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index 46690300a53..0dcedcc0a53 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -461,15 +461,16 @@ public Set getFlushedLogIds() { Set logIds = new HashSet<>(); synchronized (recentlyCreatedEntryLogsStatus) { for (File dir : ledgerDirsManager.getAllLedgerDirs()) { - try { - for (File f : Objects.requireNonNull(dir.listFiles(file -> file.getName().endsWith(".log")))) { - long logId = fileName2LogId(f.getName()); - if (recentlyCreatedEntryLogsStatus.isFlushedLogId(logId)) { - logIds.add(logId); + if (dir.exists() && dir.isDirectory()) { + File[] files = dir.listFiles(file -> file.getName().endsWith(".log")); + if (files != null && files.length > 0) { + for (File f : files) { + long logId = fileName2LogId(f.getName()); + if (recentlyCreatedEntryLogsStatus.isFlushedLogId(logId)) { + logIds.add(logId); + } } } - } catch (NullPointerException e) { - LOG.error("Failed get log files from {} ", dir, e); } } } From f54941864af1ea020ee058e75f413807d02ff2fb Mon Sep 17 00:00:00 2001 From: chenhang Date: Wed, 4 May 2022 09:07:37 +0800 Subject: [PATCH 6/6] format code --- .../java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java | 1 - 1 file changed, 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index 0dcedcc0a53..1beae25370d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -50,7 +50,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap;