diff --git a/bookkeeper-server/build.gradle b/bookkeeper-server/build.gradle
index 18a43d2d5fb..11d0b2a88f6 100644
--- a/bookkeeper-server/build.gradle
+++ b/bookkeeper-server/build.gradle
@@ -31,6 +31,9 @@ dependencies {
implementation project(':circe-checksum')
implementation project(':cpu-affinity')
implementation project(':stats:bookkeeper-stats-api')
+ implementation project(':bookkeeper-slogger:api')
+ implementation project(':bookkeeper-slogger:slf4j')
+ implementation project(':bookkeeper-stats')
compileOnly depLibs.lombok
compileOnly depLibs.spotbugsAnnotations
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index bc7886d7bfa..f9ab427bfa4 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -40,6 +40,16 @@
bookkeeper-proto
${project.parent.version}
+
+ org.apache.bookkeeper
+ bookkeeper-slogger-slf4j
+ ${project.parent.version}
+
+
+ org.apache.bookkeeper
+ bookkeeper-slogger-api
+ ${project.parent.version}
+
org.apache.bookkeeper
bookkeeper-tools-framework
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java
index 832cb57d1a6..1798391fc76 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java
@@ -22,17 +22,11 @@
package org.apache.bookkeeper.bookie;
import java.io.IOException;
-import org.apache.bookkeeper.bookie.storage.EntryLogger;
/**
* Interface that identifies LedgerStorage implementations using EntryLogger and running periodic entries compaction.
*/
public interface CompactableLedgerStorage extends LedgerStorage {
- /**
- * @return the EntryLogger used by the ledger storage
- */
- EntryLogger getEntryLogger();
-
/**
* Get an iterator over a range of ledger ids stored in the bookie.
*
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
index dc90811e839..1beae25370d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
@@ -47,6 +47,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -68,6 +69,7 @@
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.BiConsumerLong;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -448,38 +450,30 @@ public BufferedReadChannel getFromChannels(long logId) {
return logid2Channel.get().get(logId);
}
- /**
- * Get the least unflushed log id. Garbage collector thread should not process
- * unflushed entry log file.
- *
- * @return least unflushed log id.
- */
- @Override
- public long getLeastUnflushedLogId() {
+ @VisibleForTesting
+ long getLeastUnflushedLogId() {
return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId();
}
- /**
- * Get the last log id created so far. If entryLogPerLedger is enabled, the Garbage Collector
- * process needs to look beyond the least unflushed entry log file, as there may be entry logs
- * ready to be garbage collected.
- *
- * @return last entry log id created.
- */
- @Override
- public long getLastLogId() {
- return recentlyCreatedEntryLogsStatus.getLastLogId();
- }
-
- /**
- * Returns whether the current log id exists and has been rotated already.
- *
- * @param entryLogId EntryLog id to check.
- * @return Whether the given entryLogId exists and has been rotated.
- */
@Override
- public boolean isFlushedEntryLog(Long entryLogId) {
- return recentlyCreatedEntryLogsStatus.isFlushedEntryLog(entryLogId);
+ public Set getFlushedLogIds() {
+ Set logIds = new HashSet<>();
+ synchronized (recentlyCreatedEntryLogsStatus) {
+ for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
+ if (dir.exists() && dir.isDirectory()) {
+ File[] files = dir.listFiles(file -> file.getName().endsWith(".log"));
+ if (files != null && files.length > 0) {
+ for (File f : files) {
+ long logId = fileName2LogId(f.getName());
+ if (recentlyCreatedEntryLogsStatus.isFlushedLogId(logId)) {
+ logIds.add(logId);
+ }
+ }
+ }
+ }
+ }
+ }
+ return logIds;
}
long getPreviousAllocatedEntryLogId() {
@@ -1249,7 +1243,7 @@ static class RecentEntryLogsStatus {
private long leastUnflushedLogId;
RecentEntryLogsStatus(long leastUnflushedLogId) {
- entryLogsStatusMap = new TreeMap();
+ entryLogsStatusMap = new TreeMap<>();
this.leastUnflushedLogId = leastUnflushedLogId;
}
@@ -1270,13 +1264,8 @@ synchronized long getLeastUnflushedLogId() {
return leastUnflushedLogId;
}
- synchronized long getLastLogId() {
- return !entryLogsStatusMap.isEmpty() ? entryLogsStatusMap.lastKey() : 0;
- }
-
- synchronized boolean isFlushedEntryLog(Long entryLogId) {
- return entryLogsStatusMap.containsKey(entryLogId) && entryLogsStatusMap.get(entryLogId)
- || entryLogId < leastUnflushedLogId;
+ synchronized boolean isFlushedLogId(long entryLogId) {
+ return entryLogsStatusMap.getOrDefault(entryLogId, Boolean.FALSE) || entryLogId < leastUnflushedLogId;
}
}
@@ -1354,7 +1343,7 @@ public void makeAvailable() throws IOException {
}
}
@Override
- public void cleanup() {
+ public void finalizeAndCleanup() {
if (compactedLogFile.exists()) {
if (!compactedLogFile.delete()) {
LOG.warn("Could not delete file: {}", compactedLogFile);
@@ -1368,11 +1357,11 @@ public void cleanup() {
}
@Override
- public long getLogId() {
+ public long getDstLogId() {
return compactionLogId;
}
@Override
- public long getCompactedLogId() {
+ public long getSrcLogId() {
return logIdToCompact;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java
index 88f6ce53986..adaf8330a29 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java
@@ -22,7 +22,6 @@
package org.apache.bookkeeper.bookie;
import java.io.Closeable;
-import java.io.IOException;
import java.util.function.BiConsumer;
import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException;
@@ -37,7 +36,7 @@ public interface EntryLogMetadataMap extends Closeable {
*
* @param entryLogId
* @return
- * @throws IOException
+ * @throws EntryLogMetadataMapException
*/
boolean containsKey(long entryLogId) throws EntryLogMetadataMapException;
@@ -46,7 +45,7 @@ public interface EntryLogMetadataMap extends Closeable {
*
* @param entryLogId
* @param entryLogMeta
- * @throws IOException
+ * @throws EntryLogMetadataMapException
*/
void put(long entryLogId, EntryLogMetadata entryLogMeta) throws EntryLogMetadataMapException;
@@ -55,7 +54,7 @@ public interface EntryLogMetadataMap extends Closeable {
* have been processed or the action throws an exception.
*
* @param action
- * @throws IOException
+ * @throws EntryLogMetadataMapException
*/
void forEach(BiConsumer action) throws EntryLogMetadataMapException;
@@ -63,7 +62,7 @@ public interface EntryLogMetadataMap extends Closeable {
* Removes entryLogMetadata record from the map.
*
* @param entryLogId
- * @throws IOException
+ * @throws EntryLogMetadataMapException
*/
void remove(long entryLogId) throws EntryLogMetadataMapException;
@@ -71,8 +70,24 @@ public interface EntryLogMetadataMap extends Closeable {
* Returns number of entryLogMetadata records presents into the map.
*
* @return
- * @throws IOException
+ * @throws EntryLogMetadataMapException
*/
int size() throws EntryLogMetadataMapException;
+ /**
+ * Returns true if there are no elements in the map.
+ *
+ * @return
+ */
+ default boolean isEmpty() throws EntryLogMetadataMapException {
+ return size() == 0;
+ }
+
+ /**
+ * Clear all records from the map.
+ * For unit tests.
+ *
+ * @throws EntryLogMetadataMapException
+ */
+ void clear() throws EntryLogMetadataMapException;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index fba0bec90fa..1279f2dc885 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -33,9 +33,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
import lombok.Getter;
@@ -111,9 +109,6 @@ public class GarbageCollectorThread extends SafeRunnable {
volatile boolean running = true;
- // track the last scanned successfully log id
- long scannedLogId = 0;
-
// Boolean to trigger a forced GC.
final AtomicBoolean forceGarbageCollection = new AtomicBoolean(false);
// Boolean to disable major compaction, when disk is almost full
@@ -139,8 +134,9 @@ public class GarbageCollectorThread extends SafeRunnable {
public GarbageCollectorThread(ServerConfiguration conf, LedgerManager ledgerManager,
final LedgerDirsManager ledgerDirsManager,
final CompactableLedgerStorage ledgerStorage,
+ EntryLogger entryLogger,
StatsLogger statsLogger) throws IOException {
- this(conf, ledgerManager, ledgerDirsManager, ledgerStorage, statsLogger,
+ this(conf, ledgerManager, ledgerDirsManager, ledgerStorage, entryLogger, statsLogger,
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollectorThread")));
}
@@ -155,6 +151,7 @@ public GarbageCollectorThread(ServerConfiguration conf,
LedgerManager ledgerManager,
final LedgerDirsManager ledgerDirsManager,
final CompactableLedgerStorage ledgerStorage,
+ EntryLogger entryLogger,
StatsLogger statsLogger,
ScheduledExecutorService gcExecutor)
throws IOException {
@@ -162,7 +159,7 @@ public GarbageCollectorThread(ServerConfiguration conf,
this.conf = conf;
this.ledgerDirsManager = ledgerDirsManager;
- this.entryLogger = ledgerStorage.getEntryLogger();
+ this.entryLogger = entryLogger;
this.entryLogMetaMap = createEntryLogMetadataMap();
this.ledgerStorage = ledgerStorage;
this.gcWaitTime = conf.getGcWaitTime();
@@ -214,8 +211,7 @@ public void removeEntryLog(long logToRemove) {
}
};
if (conf.getUseTransactionalCompaction()) {
- this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage,
- ledgerDirsManager, remover);
+ this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
} else {
this.compactor = new EntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
}
@@ -678,15 +674,7 @@ protected void compactEntryLog(EntryLogMetadata entryLogMeta) {
* @throws EntryLogMetadataMapException
*/
protected void extractMetaFromEntryLogs() throws EntryLogMetadataMapException {
- // Entry Log ID's are just a long value that starts at 0 and increments by 1 when the log fills up and we roll
- // to a new one. We scan entry logs as follows:
- // - entryLogPerLedgerEnabled is false: Extract it for every entry log except for the current one (un-flushed).
- // - entryLogPerLedgerEnabled is true: Scan all flushed entry logs up to the highest known id.
- Supplier finalEntryLog = () -> conf.isEntryLogPerLedgerEnabled() ? entryLogger.getLastLogId() :
- entryLogger.getLeastUnflushedLogId();
- boolean hasExceptionWhenScan = false;
- boolean increaseScannedLogId = true;
- for (long entryLogId = scannedLogId; entryLogId < finalEntryLog.get(); entryLogId++) {
+ for (long entryLogId : entryLogger.getFlushedLogIds()) {
// Comb the current entry log file if it has not already been extracted.
if (entryLogMetaMap.containsKey(entryLogId)) {
continue;
@@ -698,15 +686,6 @@ protected void extractMetaFromEntryLogs() throws EntryLogMetadataMapException {
continue;
}
- // If entryLogPerLedgerEnabled is true, we will look for entry log files beyond getLeastUnflushedLogId()
- // that have been explicitly rotated or below getLeastUnflushedLogId().
- if (conf.isEntryLogPerLedgerEnabled() && !entryLogger.isFlushedEntryLog(entryLogId)) {
- LOG.info("Entry log {} not flushed (entryLogPerLedgerEnabled). Starting next iteration at this point.",
- entryLogId);
- increaseScannedLogId = false;
- continue;
- }
-
LOG.info("Extracting entry log meta from entryLogId: {}", entryLogId);
try {
@@ -722,18 +701,9 @@ protected void extractMetaFromEntryLogs() throws EntryLogMetadataMapException {
entryLogMetaMap.put(entryLogId, entryLogMeta);
}
} catch (IOException e) {
- hasExceptionWhenScan = true;
LOG.warn("Premature exception when processing " + entryLogId
+ " recovery will take care of the problem", e);
}
-
- // if scan failed on some entry log, we don't move 'scannedLogId' to next id
- // if scan succeed, we don't need to scan it again during next gc run,
- // we move 'scannedLogId' to next id (unless entryLogPerLedgerEnabled is true
- // and we have found and un-flushed entry log already).
- if (!hasExceptionWhenScan && (!conf.isEntryLogPerLedgerEnabled() || increaseScannedLogId)) {
- ++scannedLogId;
- }
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java
index 106a382f543..4949c16441f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java
@@ -57,6 +57,16 @@ public int size() {
return entryLogMetaMap.size();
}
+ @Override
+ public boolean isEmpty() {
+ return entryLogMetaMap.isEmpty();
+ }
+
+ @Override
+ public void clear() {
+ entryLogMetaMap.clear();
+ }
+
@Override
public void close() throws IOException {
entryLogMetaMap.clear();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index a071fd5f40d..82ee5ca6098 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -190,7 +190,7 @@ public void initializeWithEntryLogger(ServerConfiguration conf,
ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager,
- this, statsLogger.scope("gc"));
+ this, entryLogger, statsLogger.scope("gc"));
ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
// Expose Stats
getOffsetStats = statsLogger.getOpStatsLogger(STORAGE_GET_OFFSET);
@@ -512,8 +512,7 @@ public void flushEntriesLocationsIndex() throws IOException {
ledgerCache.flushLedger(true);
}
- @Override
- public EntryLogger getEntryLogger() {
+ public DefaultEntryLogger getEntryLogger() {
return entryLogger;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 89a429b7627..f79cf0ef927 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -37,7 +37,6 @@
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
-import org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
@@ -261,8 +260,7 @@ public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
@Override
public void checkpoint(final Checkpoint checkpoint) throws IOException {
long numBytesFlushed = memTable.flush(this, checkpoint);
- ((DefaultEntryLogger) interleavedLedgerStorage.getEntryLogger())
- .prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
+ interleavedLedgerStorage.getEntryLogger().prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
interleavedLedgerStorage.checkpoint(checkpoint);
}
@@ -317,9 +315,9 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException {
public void run() {
try {
LOG.info("Started flushing mem table.");
- ((DefaultEntryLogger) interleavedLedgerStorage.getEntryLogger()).prepareEntryMemTableFlush();
+ interleavedLedgerStorage.getEntryLogger().prepareEntryMemTableFlush();
memTable.flush(SortedLedgerStorage.this);
- if (((DefaultEntryLogger) interleavedLedgerStorage.getEntryLogger()).commitEntryMemTableFlush()) {
+ if (interleavedLedgerStorage.getEntryLogger().commitEntryMemTableFlush()) {
interleavedLedgerStorage.checkpointer.startCheckpoint(cp);
}
} catch (Exception e) {
@@ -342,8 +340,7 @@ BookieStateManager getStateManager(){
return (BookieStateManager) stateManager;
}
- @Override
- public EntryLogger getEntryLogger() {
+ public DefaultEntryLogger getEntryLogger() {
return interleavedLedgerStorage.getEntryLogger();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
index 7c7307c3714..f1f3ab747bb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
@@ -48,24 +48,21 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
final EntryLogger entryLogger;
final CompactableLedgerStorage ledgerStorage;
- final LedgerDirsManager ledgerDirsManager;
final List offsets = new ArrayList<>();
// compaction log file suffix
- static final String COMPACTING_SUFFIX = ".log.compacting";
+ public static final String COMPACTING_SUFFIX = ".log.compacting";
// flushed compaction log file suffix
- static final String COMPACTED_SUFFIX = ".compacted";
+ public static final String COMPACTED_SUFFIX = ".compacted";
public TransactionalEntryLogCompactor(
ServerConfiguration conf,
EntryLogger entryLogger,
CompactableLedgerStorage ledgerStorage,
- LedgerDirsManager ledgerDirsManager,
LogRemovalListener logRemover) {
super(conf, logRemover);
this.entryLogger = entryLogger;
this.ledgerStorage = ledgerStorage;
- this.ledgerDirsManager = ledgerDirsManager;
}
/**
@@ -300,8 +297,8 @@ boolean complete() {
// When index is flushed, and entry log is removed,
// delete the ".compacted" file to indicate this phase is completed.
offsets.clear();
- compactionLog.cleanup();
- logRemovalListener.removeEntryLog(compactionLog.getCompactedLogId());
+ compactionLog.finalizeAndCleanup();
+ logRemovalListener.removeEntryLog(compactionLog.getSrcLogId());
return true;
}
@@ -330,11 +327,11 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio
ledgerId, lid, entryId, offset);
throw new IOException("Invalid entry found @ offset " + offset);
}
- long location = (compactionLog.getLogId() << 32L) | (offset + 4);
+ long location = (compactionLog.getDstLogId() << 32L) | (offset + 4);
offsets.add(new EntryLocation(lid, entryId, location));
}
});
- LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactionLog.getLogId());
+ LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactionLog.getDstLogId());
}
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/CompactionEntryLog.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/CompactionEntryLog.java
index 60806add86d..fcbfcf702f2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/CompactionEntryLog.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/CompactionEntryLog.java
@@ -75,17 +75,16 @@ public interface CompactionEntryLog {
/**
* Clean up any temporary resources that were used by the compaction process.
- * At this point, there
*/
- void cleanup();
+ void finalizeAndCleanup();
/**
* Get the log ID of the entrylog to which compacted entries are being written.
*/
- long getLogId();
+ long getDstLogId();
/**
* Get the log ID of the entrylog which is being compacted.
*/
- long getCompactedLogId();
+ long getSrcLogId();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java
index cab72cd587c..a4aa93ed266 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java
@@ -89,21 +89,12 @@ ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
Collection incompleteCompactionLogs();
/**
- * Get the last log id created so far. If entryLogPerLedger is enabled, the Garbage Collector
- * process needs to look beyond the least unflushed entry log file, as there may be entry logs
- * ready to be garbage collected.
- *
- * @return last entry log id created.
- */
- long getLastLogId();
-
- /**
- * Get the lowest entrylog ID which has not had all its data persisted to
+ * Get the log ids for the set of logs which have been completely flushed to
* disk.
- * This is used by compaction. Any entrylog ID higher or equal to this value
- * will not be considered for compaction.
+ * Only log ids in this set are considered for either compaction or garbage
+ * collection.
*/
- long getLeastUnflushedLogId();
+ Collection getFlushedLogIds();
/**
* Scan the given entrylog, returning all entries contained therein.
@@ -133,14 +124,6 @@ default EntryLogMetadata getEntryLogMetadata(long entryLogId) throws IOException
*/
boolean logExists(long logId);
- /**
- * Returns whether the current log id exists and has been rotated already.
- *
- * @param entryLogId EntryLog id to check.
- * @return Whether the given entryLogId exists and has been rotated.
- */
- boolean isFlushedEntryLog(Long entryLogId);
-
/**
* Delete the entrylog with the given ID.
* @return false if the entrylog doesn't exist.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java
index ec289fd16c2..35c53fe620a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java
@@ -196,6 +196,21 @@ public int size() throws EntryLogMetadataMapException {
}
}
+ @Override
+ public void clear() throws EntryLogMetadataMapException {
+ try {
+ try (KeyValueStorage.Batch b = metadataMapDB.newBatch();
+ CloseableIterator itr = metadataMapDB.keys()) {
+ while (itr.hasNext()) {
+ b.remove(itr.next());
+ }
+ b.flush();
+ }
+ } catch (IOException e) {
+ throw new EntryLogMetadataMapException(e);
+ }
+ }
+
@Override
public void close() throws IOException {
if (isClosed.compareAndSet(false, true)) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 63f2f5a1b7e..7aeea2bc6df 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -187,7 +187,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager, null, statsLogger, allocator);
- gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager, this, statsLogger);
+ gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager, this, entryLogger, statsLogger);
dbLedgerStorageStats = new DbLedgerStorageStats(
ledgerDirStatsLogger,
@@ -864,8 +864,8 @@ public void updateEntriesLocations(Iterable locations) throws IOE
entryLocationIndex.updateLocations(locations);
}
- @Override
- public EntryLogger getEntryLogger() {
+ @VisibleForTesting
+ EntryLogger getEntryLogger() {
return entryLogger;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
index ec1d232ab1b..6db4e32ad7c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
@@ -891,4 +891,20 @@ private static void checkBiggerEqualZero(long n) {
throw new IllegalArgumentException("Keys and values must be >= 0");
}
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("ConcurrentLongLongHashMap{");
+
+ int headerLen = sb.length();
+ forEach((k, v) -> {
+ sb.append(k).append(" => ").append(v).append(", ");
+ });
+ if (sb.length() > headerLen) {
+ sb.setLength(sb.length() - 2);
+ }
+ sb.append("}");
+ return sb.toString();
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 9005fef748b..99fdabe23d4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -45,10 +45,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -87,7 +85,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* This class tests the entry log compaction functionality.
*/
@@ -723,7 +720,7 @@ public void testMinorCompactionWithEntryLogPerLedgerEnabled() throws Exception {
bkc.deleteLedger(lhs[2].getId());
// Need to wait until entry log 3 gets flushed before initiating GC to satisfy assertions.
- while (!getGCThread().entryLogger.isFlushedEntryLog(3L)) {
+ while (!getGCThread().entryLogger.getFlushedLogIds().contains(3L)) {
TimeUnit.MILLISECONDS.sleep(100);
}
@@ -1509,80 +1506,6 @@ public void checkpointComplete(Checkpoint checkpoint,
storage.gcThread.doCompactEntryLogs(threshold, limit);
}
- /**
- * Test extractMetaFromEntryLogs optimized method to avoid excess memory usage.
- */
- public void testExtractMetaFromEntryLogs() throws Exception {
- // restart bookies
- restartBookies(c -> {
- // Always run this test with Throttle enabled.
- c.setIsThrottleByBytes(true);
- return c;
- });
- ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
- File tmpDir = tmpDirs.createNew("bkTest", ".dir");
- File curDir = BookieImpl.getCurrentDirectory(tmpDir);
- BookieImpl.checkDirectoryStructure(curDir);
- conf.setLedgerDirNames(new String[] { tmpDir.toString() });
-
- LedgerDirsManager dirs = new LedgerDirsManager(conf, conf.getLedgerDirs(),
- new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
- final Set ledgers = Collections
- .newSetFromMap(new ConcurrentHashMap());
-
- LedgerManager manager = getLedgerManager(ledgers);
-
- CheckpointSource checkpointSource = new CheckpointSource() {
-
- @Override
- public Checkpoint newCheckpoint() {
- return null;
- }
-
- @Override
- public void checkpointComplete(Checkpoint checkpoint,
- boolean compact) throws IOException {
- }
- };
- InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
- storage.initialize(conf, manager, dirs, dirs,
- NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
- storage.setCheckpointSource(checkpointSource);
- storage.setCheckpointer(Checkpointer.NULL);
-
-
- for (long ledger = 0; ledger <= 10; ledger++) {
- ledgers.add(ledger);
- for (int entry = 1; entry <= 50; entry++) {
- try {
- storage.addEntry(genEntry(ledger, entry, ENTRY_SIZE));
- } catch (IOException e) {
- //ignore exception on failure to add entry.
- }
- }
- }
-
- storage.flush();
- storage.shutdown();
-
- storage = new InterleavedLedgerStorage();
- storage.initialize(conf, manager, dirs, dirs, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
- storage.setCheckpointSource(checkpointSource);
- storage.setCheckpointer(Checkpointer.NULL);
-
- long startingEntriesCount = storage.gcThread.entryLogger.getLeastUnflushedLogId()
- - storage.gcThread.scannedLogId;
- LOG.info("The old Log Entry count is: " + startingEntriesCount);
-
- Map entryLogMetaData = new HashMap<>();
- long finalEntriesCount = storage.gcThread.entryLogger.getLeastUnflushedLogId()
- - storage.gcThread.scannedLogId;
- LOG.info("The latest Log Entry count is: " + finalEntriesCount);
-
- assertTrue("The GC did not clean up entries...", startingEntriesCount != finalEntriesCount);
- assertTrue("Entries Count is zero", finalEntriesCount == 0);
- }
-
private ByteBuf genEntry(long ledger, long entry, int size) {
ByteBuf bb = Unpooled.buffer(size);
bb.writeLong(ledger);
@@ -1885,7 +1808,6 @@ public MockTransactionalEntryLogCompactor(GarbageCollectorThread gcThread) {
super(gcThread.conf,
gcThread.entryLogger,
gcThread.ledgerStorage,
- gcThread.ledgerDirsManager,
(long entry) -> {
try {
gcThread.removeEntryLog(entry);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java
index 9ed271ecfca..923cf1c1ae6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java
@@ -20,18 +20,38 @@
*/
package org.apache.bookkeeper.bookie;
+import static org.apache.bookkeeper.bookie.storage.EntryLogTestUtils.logIdFromLocation;
+import static org.apache.bookkeeper.bookie.storage.EntryLogTestUtils.makeEntry;
+import static org.apache.bookkeeper.bookie.storage.EntryLogTestUtils.newDirsManager;
+import static org.apache.bookkeeper.bookie.storage.EntryLogTestUtils.newLegacyEntryLogger;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.openMocks;
+import java.io.File;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.MockLedgerManager;
+import org.apache.bookkeeper.slogger.Slogger;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.test.TmpDirs;
+
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -39,15 +59,16 @@
import org.mockito.Mock;
import org.mockito.Spy;
import org.powermock.reflect.Whitebox;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Unit test for {@link GarbageCollectorThread}.
*/
@SuppressWarnings("deprecation")
public class GarbageCollectorThreadTest {
- private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorThreadTest.class);
+ private static final Slogger slog = Slogger.CONSOLE;
+
+ private final TmpDirs tmpDirs = new TmpDirs();
+
@InjectMocks
@Spy
private GarbageCollectorThread mockGCThread;
@@ -64,10 +85,15 @@ public class GarbageCollectorThreadTest {
@Before
public void setUp() throws Exception {
- when(ledgerStorage.getEntryLogger()).thenReturn(mock(DefaultEntryLogger.class));
+ conf.setAllowLoopback(true);
openMocks(this);
}
+ @After
+ public void cleanup() throws Exception {
+ tmpDirs.cleanup();
+ }
+
@Test
public void testCompactEntryLogWithException() throws Exception {
AbstractLogCompactor mockCompactor = mock(AbstractLogCompactor.class);
@@ -88,25 +114,121 @@ public void testCalculateUsageBucket() {
// Valid range for usage is [0.0 to 1.0]
final int numBuckets = 10;
int[] usageBuckets = new int[numBuckets];
+ String[] bucketNames = new String[numBuckets];
for (int i = 0; i < numBuckets; i++) {
usageBuckets[i] = 0;
+ bucketNames[i] = String.format("%d%%", (i + 1) * 10);
}
int items = 10000;
+
for (int item = 0; item <= items; item++) {
double usage = ((double) item / (double) items);
int index = mockGCThread.calculateUsageIndex(numBuckets, usage);
- Assert.assertFalse("Boundary condition exceeded", index < 0 || index >= numBuckets);
- LOG.debug("Mapped {} usage to {}}\n", usage, index);
+ assertFalse("Boundary condition exceeded", index < 0 || index >= numBuckets);
+ slog.kv("usage", usage)
+ .kv("index", index)
+ .info("Mapped usage to index");
usageBuckets[index]++;
}
- LOG.info(
- "Compaction: entry log usage buckets[10% 20% 30% 40% 50% 60% 70% 80% 90% 100%] = {}",
- usageBuckets);
+
+ Slogger sl = slog.ctx();
+ for (int i = 0; i < numBuckets; i++) {
+ sl = sl.kv(bucketNames[i], usageBuckets[i]);
+ }
+ sl.info("Compaction: entry log usage buckets");
+
int sum = 0;
for (int i = 0; i < numBuckets; i++) {
sum += usageBuckets[i];
}
Assert.assertEquals("Incorrect number of items", items + 1, sum);
}
+
+ @Test
+ public void testExtractMetaFromEntryLogsLegacy() throws Exception {
+ File ledgerDir = tmpDirs.createNew("testExtractMeta", "ledgers");
+ testExtractMetaFromEntryLogs(
+ newLegacyEntryLogger(20000, ledgerDir), ledgerDir);
+ }
+
+ private void testExtractMetaFromEntryLogs(EntryLogger entryLogger, File ledgerDir)
+ throws Exception {
+
+ MockLedgerStorage storage = new MockLedgerStorage();
+ MockLedgerManager lm = new MockLedgerManager();
+
+ GarbageCollectorThread gcThread = new GarbageCollectorThread(
+ TestBKConfiguration.newServerConfiguration(), lm,
+ newDirsManager(ledgerDir),
+ storage, entryLogger,
+ NullStatsLogger.INSTANCE);
+
+ // Add entries.
+ // Ledger 1 is on first entry log
+ // Ledger 2 spans first, second and third entry log
+ // Ledger 3 is on the third entry log (which is still active when extract meta)
+ long loc1 = entryLogger.addEntry(1L, makeEntry(1L, 1L, 5000));
+ long loc2 = entryLogger.addEntry(2L, makeEntry(2L, 1L, 5000));
+ assertThat(logIdFromLocation(loc2), equalTo(logIdFromLocation(loc1)));
+ long loc3 = entryLogger.addEntry(2L, makeEntry(2L, 1L, 15000));
+ assertThat(logIdFromLocation(loc3), greaterThan(logIdFromLocation(loc2)));
+ long loc4 = entryLogger.addEntry(2L, makeEntry(2L, 1L, 15000));
+ assertThat(logIdFromLocation(loc4), greaterThan(logIdFromLocation(loc3)));
+ long loc5 = entryLogger.addEntry(3L, makeEntry(3L, 1L, 1000));
+ assertThat(logIdFromLocation(loc5), equalTo(logIdFromLocation(loc4)));
+
+ long logId1 = logIdFromLocation(loc2);
+ long logId2 = logIdFromLocation(loc3);
+ long logId3 = logIdFromLocation(loc5);
+ entryLogger.flush();
+
+ storage.setMasterKey(1L, new byte[0]);
+ storage.setMasterKey(2L, new byte[0]);
+ storage.setMasterKey(3L, new byte[0]);
+
+ assertThat(entryLogger.getFlushedLogIds(), containsInAnyOrder(logId1, logId2));
+ assertTrue(entryLogger.logExists(logId3));
+
+ // all ledgers exist, nothing should disappear
+ final EntryLogMetadataMap entryLogMetaMap = gcThread.getEntryLogMetaMap();
+ gcThread.extractMetaFromEntryLogs();
+
+ assertThat(entryLogger.getFlushedLogIds(), containsInAnyOrder(logId1, logId2));
+ assertTrue(entryLogMetaMap.containsKey(logId1));
+ assertTrue(entryLogMetaMap.containsKey(logId2));
+ assertTrue(entryLogger.logExists(logId3));
+
+ // log 2 is 100% ledger 2, so it should disappear if ledger 2 is deleted
+ entryLogMetaMap.clear();
+ storage.deleteLedger(2L);
+ gcThread.extractMetaFromEntryLogs();
+
+ assertThat(entryLogger.getFlushedLogIds(), containsInAnyOrder(logId1));
+ assertTrue(entryLogMetaMap.containsKey(logId1));
+ assertTrue(entryLogger.logExists(logId3));
+
+ // delete all ledgers, all logs except the current should be deleted
+ entryLogMetaMap.clear();
+ storage.deleteLedger(1L);
+ storage.deleteLedger(3L);
+ gcThread.extractMetaFromEntryLogs();
+
+ assertThat(entryLogger.getFlushedLogIds(), empty());
+ assertTrue(entryLogMetaMap.isEmpty());
+ assertTrue(entryLogger.logExists(logId3));
+
+ // add enough entries to roll log, log 3 can not be GC'd
+ long loc6 = entryLogger.addEntry(3L, makeEntry(3L, 1L, 25000));
+ assertThat(logIdFromLocation(loc6), greaterThan(logIdFromLocation(loc5)));
+ entryLogger.flush();
+ assertThat(entryLogger.getFlushedLogIds(), containsInAnyOrder(logId3));
+
+ entryLogMetaMap.clear();
+ gcThread.extractMetaFromEntryLogs();
+
+ assertThat(entryLogger.getFlushedLogIds(), empty());
+ assertTrue(entryLogMetaMap.isEmpty());
+ assertFalse(entryLogger.logExists(logId3));
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
index 7afa816af99..e71750d5104 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
@@ -371,7 +371,7 @@ conf, new TestBookieImpl(conf),
}
handle.close();
// simulate rolling entrylog
- ((EntryLogManagerBase) ((DefaultEntryLogger) ledgerStorage.getEntryLogger()).getEntryLogManager())
+ ((EntryLogManagerBase) ledgerStorage.getEntryLogger().getEntryLogManager())
.createNewLog(ledgerId);
// sleep for a bit for checkpoint to do its task
executorController.advance(Duration.ofMillis(500));
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java
index 06703966f25..f6ed19f6fed 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java
@@ -41,7 +41,7 @@
/**
* A mock for running tests that require ledger storage.
*/
-public class MockLedgerStorage implements LedgerStorage {
+public class MockLedgerStorage implements CompactableLedgerStorage {
private static class LedgerInfo {
boolean limbo = false;
@@ -242,7 +242,7 @@ public void checkpoint(Checkpoint checkpoint) throws IOException {
@Override
public void deleteLedger(long ledgerId) throws IOException {
- throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
+ ledgers.remove(ledgerId);
}
@Override
@@ -262,32 +262,32 @@ public ByteBuf getExplicitLac(long ledgerId) throws IOException {
@Override
public LedgerStorage getUnderlyingLedgerStorage() {
- return LedgerStorage.super.getUnderlyingLedgerStorage();
+ return CompactableLedgerStorage.super.getUnderlyingLedgerStorage();
}
@Override
public void forceGC() {
- LedgerStorage.super.forceGC();
+ CompactableLedgerStorage.super.forceGC();
}
@Override
public void forceGC(Boolean forceMajor, Boolean forceMinor) {
- LedgerStorage.super.forceGC(forceMajor, forceMinor);
+ CompactableLedgerStorage.super.forceGC(forceMajor, forceMinor);
}
@Override
public List localConsistencyCheck(Optional rateLimiter) throws IOException {
- return LedgerStorage.super.localConsistencyCheck(rateLimiter);
+ return CompactableLedgerStorage.super.localConsistencyCheck(rateLimiter);
}
@Override
public boolean isInForceGC() {
- return LedgerStorage.super.isInForceGC();
+ return CompactableLedgerStorage.super.isInForceGC();
}
@Override
public List getGarbageCollectionStatus() {
- return LedgerStorage.super.getGarbageCollectionStatus();
+ return CompactableLedgerStorage.super.getGarbageCollectionStatus();
}
@Override
@@ -295,6 +295,22 @@ public PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws I
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
}
+ @Override
+ public Iterable getActiveLedgersInRange(long firstLedgerId, long lastLedgerId)
+ throws IOException {
+ throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
+ }
+
+ @Override
+ public void updateEntriesLocations(Iterable locations) throws IOException {
+ throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
+ }
+
+ @Override
+ public void flushEntriesLocationsIndex() throws IOException {
+ throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
+ }
+
@Override
public EnumSet getStorageStateFlags() throws IOException {
return storageStateFlags;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index 07b0018683e..6ad6737b634 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -224,12 +224,11 @@ public void testCheckpointAfterEntryLogRotated() throws Exception {
});
// simulate entry log is rotated (due to compaction)
+ DefaultEntryLogger elogger = storage.getEntryLogger();
EntryLogManagerForSingleEntryLog entryLogManager =
- (EntryLogManagerForSingleEntryLog) ((DefaultEntryLogger) storage.getEntryLogger()).getEntryLogManager();
+ (EntryLogManagerForSingleEntryLog) elogger.getEntryLogManager();
entryLogManager.createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID);
- long leastUnflushedLogId = storage.getEntryLogger().getLeastUnflushedLogId();
long currentLogId = entryLogManager.getCurrentLogId();
- log.info("Least unflushed entry log : current = {}, leastUnflushed = {}", currentLogId, leastUnflushedLogId);
readyLatch.countDown();
assertNull(checkpoints.poll());
@@ -246,8 +245,8 @@ public void testCheckpointAfterEntryLogRotated() throws Exception {
assertEquals(0, storage.memTable.kvmap.size());
assertTrue(
"current log " + currentLogId + " contains entries added from memtable should be forced to disk"
- + " but least unflushed log is " + storage.getEntryLogger().getLeastUnflushedLogId(),
- storage.getEntryLogger().getLeastUnflushedLogId() > currentLogId);
+ + " but flushed logs are " + elogger.getFlushedLogIds(),
+ elogger.getFlushedLogIds().contains(currentLogId));
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/EntryLogTestUtils.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/EntryLogTestUtils.java
new file mode 100644
index 00000000000..22e607a2236
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/EntryLogTestUtils.java
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.util.Arrays;
+import org.apache.bookkeeper.bookie.DefaultEntryLogger;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.slogger.Slogger;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+
+/**
+ * EntryLogTestUtils.
+ */
+public class EntryLogTestUtils {
+ private static final Slogger slog = Slogger.CONSOLE;
+
+ public static LedgerDirsManager newDirsManager(File... ledgerDir) throws Exception {
+ return new LedgerDirsManager(
+ new ServerConfiguration(), ledgerDir, new DiskChecker(0.999f, 0.999f));
+ }
+
+ public static EntryLogger newLegacyEntryLogger(int logSizeLimit, File... ledgerDir) throws Exception {
+ ServerConfiguration conf = new ServerConfiguration();
+ conf.setEntryLogSizeLimit(logSizeLimit);
+ return new DefaultEntryLogger(conf, newDirsManager(ledgerDir), null,
+ NullStatsLogger.INSTANCE, ByteBufAllocator.DEFAULT);
+ }
+
+ public static int logIdFromLocation(long location) {
+ return (int) (location >> 32);
+ }
+
+ public static ByteBuf makeEntry(long ledgerId, long entryId, int size) {
+ ByteBuf buf = Unpooled.buffer(size);
+ buf.writeLong(ledgerId).writeLong(entryId);
+ byte[] random = new byte[buf.writableBytes()];
+ Arrays.fill(random, (byte) 0xdd);
+ buf.writeBytes(random);
+ return buf;
+ }
+}
+
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index d3e628fc0ba..9258a155296 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -62,7 +62,6 @@
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
import org.apache.bookkeeper.bookie.StateManager;
-import org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.client.api.DigestType;
@@ -672,11 +671,6 @@ public Iterable getActiveLedgersInRange(long firstLedgerId, long lastLedge
return subBkActiveLedgers.keySet();
}
- @Override
- public EntryLogger getEntryLogger() {
- return null;
- }
-
@Override
public void updateEntriesLocations(Iterable locations) throws IOException {
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 362ff50b15d..c534c6b6ed8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -43,7 +43,6 @@
import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.StateManager;
-import org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -264,11 +263,6 @@ public Iterable getActiveLedgersInRange(long firstLedgerId, long lastLedge
return subBkActiveLedgers.keySet();
}
- @Override
- public EntryLogger getEntryLogger() {
- return null;
- }
-
@Override
public void updateEntriesLocations(Iterable locations) throws IOException {
}
diff --git a/bookkeeper-slogger/api/build.gradle b/bookkeeper-slogger/api/build.gradle
index 0264913e640..2c232565fad 100644
--- a/bookkeeper-slogger/api/build.gradle
+++ b/bookkeeper-slogger/api/build.gradle
@@ -26,8 +26,3 @@ dependencies {
}
jar.archiveBaseName = 'bookkeeper-slogger-api'
-
-jar {
- dependsOn tasks.named("writeClasspath")
-}
-
diff --git a/bookkeeper-slogger/slf4j/build.gradle b/bookkeeper-slogger/slf4j/build.gradle
index 85653428b0d..60ed7b1fef2 100644
--- a/bookkeeper-slogger/slf4j/build.gradle
+++ b/bookkeeper-slogger/slf4j/build.gradle
@@ -28,8 +28,3 @@ dependencies {
}
jar.archiveBaseName = 'bookkeeper-slogger-slf4j'
-
-jar {
- dependsOn tasks.named("writeClasspath")
-}
-
diff --git a/settings.gradle b/settings.gradle
index a9114e1280d..040f4f86015 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -52,6 +52,9 @@ include(':bookkeeper-benchmark',
'bookkeeper-server',
'shaded:bookkeeper-server-shaded',
'shaded:bookkeeper-server-tests-shaded',
+ ':bookkeeper-slogger:api',
+ ':bookkeeper-slogger:slf4j',
+ 'bookkeeper-stats',
'buildtools',
'circe-checksum',
'circe-checksum:src:main:circe',