Skip to content

Commit

Permalink
Garbage Collection support for DirectIO entrylogger
Browse files Browse the repository at this point in the history
GC support requires that the entrylogger provides a way to retrieve
all entrylogs which have been completely flushed to disk. Previously
this was done by returning the least unflushed log id. However, this
is problematic as it doesn't support the log ids wrapping around. It
also means that GC has to start checking for log id existence from
zero every time it boots.

This change replaces getLeastUnflushedLogId() with getFlushedLogIds(),
to give the entrylogger full control of which logs should be
considered for GC.

It also changes the CompactableLedgerStorage interface, removing
getEntryLogger() and adding injection of the entrylogger to the
GarbageCollectorThread. This makes testing easier.
  • Loading branch information
Ivan Kelly authored and mauricebarnum committed Apr 7, 2022
1 parent 9c0e026 commit 853dd84
Show file tree
Hide file tree
Showing 21 changed files with 338 additions and 241 deletions.
2 changes: 2 additions & 0 deletions bookkeeper-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ dependencies {
implementation project(':bookkeeper-common-allocator')
implementation project(':bookkeeper-http:http-server')
implementation project(':bookkeeper-proto')
implementation project(':bookkeeper-slogger:api')
implementation project(':bookkeeper-slogger:slf4j')
implementation project(':bookkeeper-stats')
implementation project(':bookkeeper-tools-framework')
implementation project(':circe-checksum')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,12 @@
package org.apache.bookkeeper.bookie;

import java.io.IOException;
import org.apache.bookkeeper.bookie.storage.EntryLoggerIface;

/**
* Interface that identifies LedgerStorage implementations using EntryLogger and running periodic entries compaction.
*/
public interface CompactableLedgerStorage extends LedgerStorage {

/**
* @return the EntryLogger used by the ledger storage
*/
EntryLoggerIface getEntryLogger();

/**
* Get an iterator over a range of ledger ids stored in the bookie.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
package org.apache.bookkeeper.bookie;

import java.io.Closeable;
import java.io.IOException;
import java.util.function.BiConsumer;

import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException;
Expand All @@ -37,7 +36,7 @@ public interface EntryLogMetadataMap extends Closeable {
*
* @param entryLogId
* @return
* @throws IOException
* @throws EntryLogMetadataMapException
*/
boolean containsKey(long entryLogId) throws EntryLogMetadataMapException;

Expand All @@ -46,7 +45,7 @@ public interface EntryLogMetadataMap extends Closeable {
*
* @param entryLogId
* @param entryLogMeta
* @throws IOException
* @throws EntryLogMetadataMapException
*/
void put(long entryLogId, EntryLogMetadata entryLogMeta) throws EntryLogMetadataMapException;

Expand All @@ -55,24 +54,40 @@ public interface EntryLogMetadataMap extends Closeable {
* have been processed or the action throws an exception.
*
* @param action
* @throws IOException
* @throws EntryLogMetadataMapException
*/
void forEach(BiConsumer<Long, EntryLogMetadata> action) throws EntryLogMetadataMapException;

/**
* Removes entryLogMetadata record from the map.
*
* @param entryLogId
* @throws IOException
* @throws EntryLogMetadataMapException
*/
void remove(long entryLogId) throws EntryLogMetadataMapException;

/**
* Returns the number of entryLogMetadata records present in the map.
*
* @return
* @throws IOException
* @throws EntryLogMetadataMapException
*/
int size() throws EntryLogMetadataMapException;

/**
 * Reports whether the map currently holds no entryLogMetadata records.
 *
 * <p>This default implementation is derived from {@link #size()}.
 *
 * @return {@code true} if {@link #size()} is zero, {@code false} otherwise
 * @throws EntryLogMetadataMapException if {@link #size()} throws it
 */
default boolean isEmpty() throws EntryLogMetadataMapException {
    final int recordCount = size();
    return recordCount == 0;
}

/**
* Clear all records from the map.
* For unit tests.
*
* @throws EntryLogMetadataMapException
*/
void clear() throws EntryLogMetadataMapException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -68,6 +69,7 @@
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.BiConsumerLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -451,38 +453,25 @@ public BufferedReadChannel getFromChannels(long logId) {
return logid2Channel.get().get(logId);
}

/**
* Get the least unflushed log id. Garbage collector thread should not process
* unflushed entry log file.
*
* @return least unflushed log id.
*/
@Override
public long getLeastUnflushedLogId() {
@VisibleForTesting
long getLeastUnflushedLogId() {
return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId();
}

/**
* Get the last log id created so far. If entryLogPerLedger is enabled, the Garbage Collector
* process needs to look beyond the least unflushed entry log file, as there may be entry logs
* ready to be garbage collected.
*
* @return last entry log id created.
*/
@Override
public long getLastLogId() {
return recentlyCreatedEntryLogsStatus.getLastLogId();
}

/**
* Returns whether the current log id exists and has been rotated already.
*
* @param entryLogId EntryLog id to check.
* @return Whether the given entryLogId exists and has been rotated.
*/
@Override
public boolean isFlushedEntryLog(Long entryLogId) {
return recentlyCreatedEntryLogsStatus.isFlushedEntryLog(entryLogId);
/**
 * Collects the ids of the entry logs that exist on disk and are considered flushed.
 *
 * <p>Every directory returned by {@code ledgerDirsManager.getAllLedgerDirs()} is scanned for
 * files whose name ends with ".log". The id parsed from each file name is reported only if
 * {@code recentlyCreatedEntryLogsStatus.isFlushedLogId(logId)} accepts it. The whole scan runs
 * while holding the monitor of {@code recentlyCreatedEntryLogsStatus}.
 *
 * <p>Directories that cannot be listed (missing, not a directory, or an I/O error) contribute
 * no ids instead of failing the whole call.
 *
 * @return a newly allocated set with the flushed log ids; empty if none qualify
 */
public Set<Long> getFlushedLogIds() {
    Set<Long> logIds = new HashSet<>();
    synchronized (recentlyCreatedEntryLogsStatus) {
        for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
            // File.listFiles returns null (not an empty array) when the path is not a
            // directory or an I/O error occurs; iterating over it would throw an NPE.
            File[] logFiles = dir.listFiles(file -> file.getName().endsWith(".log"));
            if (logFiles == null) {
                continue;
            }
            for (File f : logFiles) {
                long logId = fileName2LogId(f.getName());
                if (recentlyCreatedEntryLogsStatus.isFlushedLogId(logId)) {
                    logIds.add(logId);
                }
            }
        }
    }
    return logIds;
}

long getPreviousAllocatedEntryLogId() {
Expand Down Expand Up @@ -1269,13 +1258,8 @@ synchronized long getLeastUnflushedLogId() {
return leastUnflushedLogId;
}

synchronized long getLastLogId() {
return !entryLogsStatusMap.isEmpty() ? entryLogsStatusMap.lastKey() : 0;
}

synchronized boolean isFlushedEntryLog(Long entryLogId) {
return entryLogsStatusMap.containsKey(entryLogId) && entryLogsStatusMap.get(entryLogId)
|| entryLogId < leastUnflushedLogId;
synchronized boolean isFlushedLogId(long entryLogId) {
    // First consult the status map; ids without an entry default to "not flushed".
    final boolean markedFlushed = entryLogsStatusMap.getOrDefault(entryLogId, Boolean.FALSE);
    if (markedFlushed) {
        return true;
    }
    // Otherwise the id still counts as flushed when it lies below leastUnflushedLogId.
    return entryLogId < leastUnflushedLogId;
}
}

Expand Down Expand Up @@ -1351,7 +1335,7 @@ public void makeAvailable() throws IOException {
}
}
@Override
public void cleanup() {
public void finalizeAndCleanup() {
if (compactedLogFile.exists()) {
if (!compactedLogFile.delete()) {
LOG.warn("Could not delete file: " + compactedLogFile);
Expand All @@ -1365,11 +1349,11 @@ public void cleanup() {
}

@Override
public long getLogId() {
public long getDstLogId() {
return compactionLogId;
}
@Override
public long getCompactedLogId() {
public long getSrcLogId() {
return logIdToCompact;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import lombok.Getter;

Expand Down Expand Up @@ -111,9 +109,6 @@ public class GarbageCollectorThread extends SafeRunnable {

volatile boolean running = true;

// track the last scanned successfully log id
long scannedLogId = 0;

// Boolean to trigger a forced GC.
final AtomicBoolean forceGarbageCollection = new AtomicBoolean(false);
// Boolean to disable major compaction, when disk is almost full
Expand All @@ -139,8 +134,9 @@ public class GarbageCollectorThread extends SafeRunnable {
public GarbageCollectorThread(ServerConfiguration conf, LedgerManager ledgerManager,
final LedgerDirsManager ledgerDirsManager,
final CompactableLedgerStorage ledgerStorage,
EntryLoggerIface entryLogger,
StatsLogger statsLogger) throws IOException {
this(conf, ledgerManager, ledgerDirsManager, ledgerStorage, statsLogger,
this(conf, ledgerManager, ledgerDirsManager, ledgerStorage, entryLogger, statsLogger,
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollectorThread")));
}

Expand All @@ -155,14 +151,15 @@ public GarbageCollectorThread(ServerConfiguration conf,
LedgerManager ledgerManager,
final LedgerDirsManager ledgerDirsManager,
final CompactableLedgerStorage ledgerStorage,
EntryLoggerIface entryLogger,
StatsLogger statsLogger,
ScheduledExecutorService gcExecutor)
throws IOException {
this.gcExecutor = gcExecutor;
this.conf = conf;

this.ledgerDirsManager = ledgerDirsManager;
this.entryLogger = ledgerStorage.getEntryLogger();
this.entryLogger = entryLogger;
this.entryLogMetaMap = createEntryLogMetadataMap();
this.ledgerStorage = ledgerStorage;
this.gcWaitTime = conf.getGcWaitTime();
Expand Down Expand Up @@ -214,8 +211,7 @@ public void removeEntryLog(long logToRemove) {
}
};
if (conf.getUseTransactionalCompaction()) {
this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage,
ledgerDirsManager, remover);
this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
} else {
this.compactor = new EntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
}
Expand Down Expand Up @@ -669,15 +665,7 @@ protected void compactEntryLog(EntryLogMetadata entryLogMeta) {
* @throws EntryLogMetadataMapException
*/
protected void extractMetaFromEntryLogs() throws EntryLogMetadataMapException {
// Entry Log ID's are just a long value that starts at 0 and increments by 1 when the log fills up and we roll
// to a new one. We scan entry logs as follows:
// - entryLogPerLedgerEnabled is false: Extract it for every entry log except for the current one (un-flushed).
// - entryLogPerLedgerEnabled is true: Scan all flushed entry logs up to the highest known id.
Supplier<Long> finalEntryLog = () -> conf.isEntryLogPerLedgerEnabled() ? entryLogger.getLastLogId() :
entryLogger.getLeastUnflushedLogId();
boolean hasExceptionWhenScan = false;
boolean increaseScannedLogId = true;
for (long entryLogId = scannedLogId; entryLogId < finalEntryLog.get(); entryLogId++) {
for (long entryLogId : entryLogger.getFlushedLogIds()) {
// Comb the current entry log file if it has not already been extracted.
if (entryLogMetaMap.containsKey(entryLogId)) {
continue;
Expand All @@ -689,15 +677,6 @@ protected void extractMetaFromEntryLogs() throws EntryLogMetadataMapException {
continue;
}

// If entryLogPerLedgerEnabled is true, we will look for entry log files beyond getLeastUnflushedLogId()
// that have been explicitly rotated or below getLeastUnflushedLogId().
if (conf.isEntryLogPerLedgerEnabled() && !entryLogger.isFlushedEntryLog(entryLogId)) {
LOG.info("Entry log {} not flushed (entryLogPerLedgerEnabled). Starting next iteration at this point.",
entryLogId);
increaseScannedLogId = false;
continue;
}

LOG.info("Extracting entry log meta from entryLogId: {}", entryLogId);

try {
Expand All @@ -713,18 +692,10 @@ protected void extractMetaFromEntryLogs() throws EntryLogMetadataMapException {
entryLogMetaMap.put(entryLogId, entryLogMeta);
}
} catch (IOException e) {
hasExceptionWhenScan = true;
LOG.warn("Premature exception when processing " + entryLogId
+ " recovery will take care of the problem", e);
}

// if scan failed on some entry log, we don't move 'scannedLogId' to next id
// if scan succeed, we don't need to scan it again during next gc run,
// we move 'scannedLogId' to next id (unless entryLogPerLedgerEnabled is true
// and we have found and un-flushed entry log already).
if (!hasExceptionWhenScan && (!conf.isEntryLogPerLedgerEnabled() || increaseScannedLogId)) {
++scannedLogId;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ public int size() {
return entryLogMetaMap.size();
}

@Override
public boolean isEmpty() {
    // Delegates to the backing map's own isEmpty().
    final boolean noRecords = entryLogMetaMap.isEmpty();
    return noRecords;
}

@Override
public void clear() {
// Drops every record held in the backing map.
entryLogMetaMap.clear();
}

@Override
public void close() throws IOException {
entryLogMetaMap.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.storage.EntryLoggerIface;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
Expand Down Expand Up @@ -196,7 +195,7 @@ public void initializeWithEntryLogger(ServerConfiguration conf,
ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager,
this, statsLogger.scope("gc"));
this, entryLogger, statsLogger.scope("gc"));
pageSize = conf.getPageSize();
ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
// Expose Stats
Expand Down Expand Up @@ -514,8 +513,7 @@ public void flushEntriesLocationsIndex() throws IOException {
ledgerCache.flushLedger(true);
}

@Override
public EntryLoggerIface getEntryLogger() {
public EntryLogger getEntryLogger() {
return entryLogger;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.storage.EntryLoggerIface;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
Expand Down Expand Up @@ -341,8 +340,7 @@ BookieStateManager getStateManager(){
return (BookieStateManager) stateManager;
}

@Override
public EntryLoggerIface getEntryLogger() {
public EntryLogger getEntryLogger() {
return interleavedLedgerStorage.getEntryLogger();
}

Expand Down
Loading

0 comments on commit 853dd84

Please sign in to comment.