BP-47 (task5): Garbage collection support direct IO entrylogger #3256

Merged
3 changes: 3 additions & 0 deletions bookkeeper-server/build.gradle
@@ -31,6 +31,9 @@ dependencies {
     implementation project(':circe-checksum')
     implementation project(':cpu-affinity')
     implementation project(':stats:bookkeeper-stats-api')
+    implementation project(':bookkeeper-slogger:api')
+    implementation project(':bookkeeper-slogger:slf4j')
+    implementation project(':bookkeeper-stats')
 
     compileOnly depLibs.lombok
     compileOnly depLibs.spotbugsAnnotations
10 changes: 10 additions & 0 deletions bookkeeper-server/pom.xml
@@ -40,6 +40,16 @@
       <artifactId>bookkeeper-proto</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-slogger-slf4j</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-slogger-api</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>bookkeeper-tools-framework</artifactId>
CompactableLedgerStorage.java
@@ -22,17 +22,11 @@
 package org.apache.bookkeeper.bookie;
 
 import java.io.IOException;
-import org.apache.bookkeeper.bookie.storage.EntryLogger;
 /**
  * Interface that identifies LedgerStorage implementations using EntryLogger and running periodic entries compaction.
  */
 public interface CompactableLedgerStorage extends LedgerStorage {
 
-    /**
-     * @return the EntryLogger used by the ledger storage
-     */
-    EntryLogger getEntryLogger();
-
     /**
      * Get an iterator over a range of ledger ids stored in the bookie.
      *
DefaultEntryLogger.java
@@ -47,6 +47,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -68,6 +69,7 @@
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.BiConsumerLong;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -448,38 +450,30 @@ public BufferedReadChannel getFromChannels(long logId) {
         return logid2Channel.get().get(logId);
     }
 
-    /**
-     * Get the least unflushed log id. Garbage collector thread should not process
-     * unflushed entry log file.
-     *
-     * @return least unflushed log id.
-     */
-    @Override
-    public long getLeastUnflushedLogId() {
+    @VisibleForTesting
+    long getLeastUnflushedLogId() {
         return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId();
     }
 
-    /**
-     * Get the last log id created so far. If entryLogPerLedger is enabled, the Garbage Collector
-     * process needs to look beyond the least unflushed entry log file, as there may be entry logs
-     * ready to be garbage collected.
-     *
-     * @return last entry log id created.
-     */
-    @Override
-    public long getLastLogId() {
-        return recentlyCreatedEntryLogsStatus.getLastLogId();
-    }
-
-    /**
-     * Returns whether the current log id exists and has been rotated already.
-     *
-     * @param entryLogId EntryLog id to check.
-     * @return Whether the given entryLogId exists and has been rotated.
-     */
     @Override
-    public boolean isFlushedEntryLog(Long entryLogId) {
-        return recentlyCreatedEntryLogsStatus.isFlushedEntryLog(entryLogId);
+    public Set<Long> getFlushedLogIds() {
+        Set<Long> logIds = new HashSet<>();
+        synchronized (recentlyCreatedEntryLogsStatus) {
+            for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
+                if (dir.exists() && dir.isDirectory()) {
+                    File[] files = dir.listFiles(file -> file.getName().endsWith(".log"));
+                    if (files != null && files.length > 0) {
+                        for (File f : files) {
+                            long logId = fileName2LogId(f.getName());
+                            if (recentlyCreatedEntryLogsStatus.isFlushedLogId(logId)) {
+                                logIds.add(logId);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        return logIds;
     }
 
     long getPreviousAllocatedEntryLogId() {
@@ -1249,7 +1243,7 @@ static class RecentEntryLogsStatus {
         private long leastUnflushedLogId;
 
         RecentEntryLogsStatus(long leastUnflushedLogId) {
-            entryLogsStatusMap = new TreeMap<Long, Boolean>();
+            entryLogsStatusMap = new TreeMap<>();
             this.leastUnflushedLogId = leastUnflushedLogId;
         }

@@ -1270,13 +1264,8 @@ synchronized long getLeastUnflushedLogId() {
             return leastUnflushedLogId;
         }
 
-        synchronized long getLastLogId() {
-            return !entryLogsStatusMap.isEmpty() ? entryLogsStatusMap.lastKey() : 0;
-        }
-
-        synchronized boolean isFlushedEntryLog(Long entryLogId) {
-            return entryLogsStatusMap.containsKey(entryLogId) && entryLogsStatusMap.get(entryLogId)
-                    || entryLogId < leastUnflushedLogId;
+        synchronized boolean isFlushedLogId(long entryLogId) {
+            return entryLogsStatusMap.getOrDefault(entryLogId, Boolean.FALSE) || entryLogId < leastUnflushedLogId;
         }
     }
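The renamed isFlushedLogId(long) keeps the semantics of the old isFlushedEntryLog(Long), but getOrDefault avoids the double map lookup and the primitive parameter avoids autoboxing. A self-contained sketch of the contract, with simplified names (this is not the actual BookKeeper class):

```java
import java.util.TreeMap;

// Sketch of the RecentEntryLogsStatus contract shown above (names simplified).
class RecentLogsStatusSketch {
    private final TreeMap<Long, Boolean> statusMap = new TreeMap<>();
    private long leastUnflushedLogId;

    RecentLogsStatusSketch(long leastUnflushedLogId) {
        this.leastUnflushedLogId = leastUnflushedLogId;
    }

    synchronized void createdEntryLog(long logId) {
        statusMap.put(logId, false); // new log, not flushed yet
    }

    synchronized void flushRotatedEntryLog(long logId) {
        statusMap.replace(logId, true);
        // advance the watermark past the contiguous flushed prefix
        while (!statusMap.isEmpty() && statusMap.firstEntry().getValue()) {
            leastUnflushedLogId = statusMap.firstKey() + 1;
            statusMap.remove(statusMap.firstKey());
        }
    }

    synchronized boolean isFlushedLogId(long logId) {
        // flushed if explicitly marked, or already below the watermark
        return statusMap.getOrDefault(logId, Boolean.FALSE) || logId < leastUnflushedLogId;
    }
}
```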

@@ -1354,7 +1343,7 @@ public void makeAvailable() throws IOException {
             }
         }
         @Override
-        public void cleanup() {
+        public void finalizeAndCleanup() {
            if (compactedLogFile.exists()) {
                if (!compactedLogFile.delete()) {
                    LOG.warn("Could not delete file: {}", compactedLogFile);
@@ -1368,11 +1357,11 @@ public void cleanup() {
         }
 
         @Override
-        public long getLogId() {
+        public long getDstLogId() {
             return compactionLogId;
         }
         @Override
-        public long getCompactedLogId() {
+        public long getSrcLogId() {
             return logIdToCompact;
         }

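The renames make the data flow of a compaction explicit: entries are copied out of a source log (the one being compacted away) into a destination log (the freshly written, compacted one). A hedged usage sketch; CompactionEntryLog is assumed to be the BP-47 interface these overrides implement:

```java
// Illustrative only: reads the two ids off a compaction log to report progress.
static void reportCompaction(CompactionEntryLog compactionLog) {
    long srcLogId = compactionLog.getSrcLogId(); // log being compacted away
    long dstLogId = compactionLog.getDstLogId(); // compacted entries land here
    System.out.printf("compacting entry log %d into %d%n", srcLogId, dstLogId);
}
```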
EntryLogMetadataMap.java
@@ -22,7 +22,6 @@
 package org.apache.bookkeeper.bookie;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.function.BiConsumer;
 
 import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException;
@@ -37,7 +36,7 @@ public interface EntryLogMetadataMap extends Closeable {
      *
      * @param entryLogId
      * @return
-     * @throws IOException
+     * @throws EntryLogMetadataMapException
      */
     boolean containsKey(long entryLogId) throws EntryLogMetadataMapException;

@@ -46,7 +45,7 @@ public interface EntryLogMetadataMap extends Closeable {
      *
      * @param entryLogId
      * @param entryLogMeta
-     * @throws IOException
+     * @throws EntryLogMetadataMapException
      */
     void put(long entryLogId, EntryLogMetadata entryLogMeta) throws EntryLogMetadataMapException;

@@ -55,24 +54,40 @@ public interface EntryLogMetadataMap extends Closeable {
      * have been processed or the action throws an exception.
      *
      * @param action
-     * @throws IOException
+     * @throws EntryLogMetadataMapException
      */
     void forEach(BiConsumer<Long, EntryLogMetadata> action) throws EntryLogMetadataMapException;
 
     /**
      * Removes entryLogMetadata record from the map.
      *
      * @param entryLogId
-     * @throws IOException
+     * @throws EntryLogMetadataMapException
      */
     void remove(long entryLogId) throws EntryLogMetadataMapException;
 
     /**
      * Returns the number of entryLogMetadata records present in the map.
      *
      * @return
-     * @throws IOException
+     * @throws EntryLogMetadataMapException
      */
     int size() throws EntryLogMetadataMapException;
 
+    /**
+     * Returns true if there are no elements in the map.
+     *
+     * @return
+     */
+    default boolean isEmpty() throws EntryLogMetadataMapException {
+        return size() == 0;
+    }
+
+    /**
+     * Clear all records from the map.
+     * For unit tests.
+     *
+     * @throws EntryLogMetadataMapException
+     */
+    void clear() throws EntryLogMetadataMapException;
 }
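A test-style sketch of the new default isEmpty() and the clear() hook (assumptions: the InMemoryEntryLogMetadataMap implementation shown further down, JUnit 4 asserts, and a single-argument EntryLogMetadata constructor):

```java
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public void sketchIsEmptyAndClear() throws Exception {
    EntryLogMetadataMap map = new InMemoryEntryLogMetadataMap();
    assertTrue(map.isEmpty());             // default method: size() == 0

    map.put(0L, new EntryLogMetadata(0L)); // assumed constructor
    assertFalse(map.isEmpty());
    assertEquals(1, map.size());

    map.clear();                           // clear() is meant for unit tests
    assertTrue(map.isEmpty());
}
```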
GarbageCollectorThread.java
@@ -33,9 +33,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
 
 import lombok.Getter;
@@ -111,9 +109,6 @@ public class GarbageCollectorThread extends SafeRunnable {
 
     volatile boolean running = true;
 
-    // track the last scanned successfully log id
-    long scannedLogId = 0;
-
     // Boolean to trigger a forced GC.
     final AtomicBoolean forceGarbageCollection = new AtomicBoolean(false);
     // Boolean to disable major compaction, when disk is almost full
@@ -139,8 +134,9 @@ public class GarbageCollectorThread extends SafeRunnable {
     public GarbageCollectorThread(ServerConfiguration conf, LedgerManager ledgerManager,
                                   final LedgerDirsManager ledgerDirsManager,
                                   final CompactableLedgerStorage ledgerStorage,
+                                  EntryLogger entryLogger,
                                   StatsLogger statsLogger) throws IOException {
-        this(conf, ledgerManager, ledgerDirsManager, ledgerStorage, statsLogger,
+        this(conf, ledgerManager, ledgerDirsManager, ledgerStorage, entryLogger, statsLogger,
                 Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollectorThread")));
     }
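With the new parameter, callers inject the entry logger instead of the thread pulling it out of the ledger storage; that indirection is what lets a direct-IO EntryLogger be garbage-collected through the same thread. A wiring sketch mirroring the InterleavedLedgerStorage change at the bottom of this diff:

```java
GarbageCollectorThread gcThread = new GarbageCollectorThread(
        conf, ledgerManager, ledgerDirsManager,
        ledgerStorage,                  // CompactableLedgerStorage
        entryLogger,                    // any EntryLogger: default or direct IO
        statsLogger.scope("gc"));
```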

@@ -155,14 +151,15 @@ public GarbageCollectorThread(ServerConfiguration conf,
                                   LedgerManager ledgerManager,
                                   final LedgerDirsManager ledgerDirsManager,
                                   final CompactableLedgerStorage ledgerStorage,
+                                  EntryLogger entryLogger,
                                   StatsLogger statsLogger,
                                   ScheduledExecutorService gcExecutor)
             throws IOException {
         this.gcExecutor = gcExecutor;
         this.conf = conf;
 
         this.ledgerDirsManager = ledgerDirsManager;
-        this.entryLogger = ledgerStorage.getEntryLogger();
+        this.entryLogger = entryLogger;
         this.entryLogMetaMap = createEntryLogMetadataMap();
         this.ledgerStorage = ledgerStorage;
         this.gcWaitTime = conf.getGcWaitTime();
@@ -214,8 +211,7 @@ public void removeEntryLog(long logToRemove) {
             }
         };
         if (conf.getUseTransactionalCompaction()) {
-            this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage,
-                    ledgerDirsManager, remover);
+            this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
         } else {
             this.compactor = new EntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
         }
@@ -678,15 +674,7 @@ protected void compactEntryLog(EntryLogMetadata entryLogMeta) {
      * @throws EntryLogMetadataMapException
      */
     protected void extractMetaFromEntryLogs() throws EntryLogMetadataMapException {
-        // Entry Log ID's are just a long value that starts at 0 and increments by 1 when the log fills up and we roll
-        // to a new one. We scan entry logs as follows:
-        // - entryLogPerLedgerEnabled is false: Extract it for every entry log except for the current one (un-flushed).
-        // - entryLogPerLedgerEnabled is true: Scan all flushed entry logs up to the highest known id.
-        Supplier<Long> finalEntryLog = () -> conf.isEntryLogPerLedgerEnabled() ? entryLogger.getLastLogId() :
-            entryLogger.getLeastUnflushedLogId();
-        boolean hasExceptionWhenScan = false;
-        boolean increaseScannedLogId = true;
-        for (long entryLogId = scannedLogId; entryLogId < finalEntryLog.get(); entryLogId++) {
+        for (long entryLogId : entryLogger.getFlushedLogIds()) {
             // Comb the current entry log file if it has not already been extracted.
             if (entryLogMetaMap.containsKey(entryLogId)) {
                 continue;
@@ -698,15 +686,6 @@ protected void extractMetaFromEntryLogs() throws EntryLogMetadataMapException {
                 continue;
             }
 
-            // If entryLogPerLedgerEnabled is true, we will look for entry log files beyond getLeastUnflushedLogId()
-            // that have been explicitly rotated or below getLeastUnflushedLogId().
-            if (conf.isEntryLogPerLedgerEnabled() && !entryLogger.isFlushedEntryLog(entryLogId)) {
-                LOG.info("Entry log {} not flushed (entryLogPerLedgerEnabled). Starting next iteration at this point.",
-                        entryLogId);
-                increaseScannedLogId = false;
-                continue;
-            }
-
             LOG.info("Extracting entry log meta from entryLogId: {}", entryLogId);
 
             try {
@@ -722,18 +701,9 @@ protected void extractMetaFromEntryLogs() throws EntryLogMetadataMapException {
                     entryLogMetaMap.put(entryLogId, entryLogMeta);
                 }
             } catch (IOException e) {
-                hasExceptionWhenScan = true;
                 LOG.warn("Premature exception when processing " + entryLogId
                         + " recovery will take care of the problem", e);
             }
-
-            // if scan failed on some entry log, we don't move 'scannedLogId' to next id
-            // if scan succeed, we don't need to scan it again during next gc run,
-            // we move 'scannedLogId' to next id (unless entryLogPerLedgerEnabled is true
-            // and we have found and un-flushed entry log already).
-            if (!hasExceptionWhenScan && (!conf.isEntryLogPerLedgerEnabled() || increaseScannedLogId)) {
-                ++scannedLogId;
-            }
         }
     }
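Dropping the scannedLogId cursor simplifies the scan: every GC pass re-derives its candidate set from getFlushedLogIds() and skips logs whose metadata is already cached, so a failed extraction is naturally retried on the next pass instead of being tracked by cursor bookkeeping. A condensed sketch of the resulting flow (extractMetaFromEntryLog is a hypothetical stand-in for the real extraction call; error handling elided):

```java
for (long entryLogId : entryLogger.getFlushedLogIds()) {
    if (entryLogMetaMap.containsKey(entryLogId)) {
        continue; // already extracted on a previous GC pass
    }
    try {
        EntryLogMetadata meta = extractMetaFromEntryLog(entryLogId); // hypothetical helper
        if (!meta.isEmpty()) {
            entryLogMetaMap.put(entryLogId, meta);
        }
    } catch (IOException e) {
        // no cursor to advance: this log is simply revisited on the next pass
    }
}
```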

InMemoryEntryLogMetadataMap.java
@@ -57,6 +57,16 @@ public int size() {
         return entryLogMetaMap.size();
     }
 
+    @Override
+    public boolean isEmpty() {
+        return entryLogMetaMap.isEmpty();
+    }
+
+    @Override
+    public void clear() {
+        entryLogMetaMap.clear();
+    }
+
     @Override
     public void close() throws IOException {
         entryLogMetaMap.clear();
InterleavedLedgerStorage.java
@@ -190,7 +190,7 @@ public void initializeWithEntryLogger(ServerConfiguration conf,
         ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
                 null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager,
-                this, statsLogger.scope("gc"));
+                this, entryLogger, statsLogger.scope("gc"));
         ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
         // Expose Stats
         getOffsetStats = statsLogger.getOpStatsLogger(STORAGE_GET_OFFSET);
@@ -512,8 +512,7 @@ public void flushEntriesLocationsIndex() throws IOException {
         ledgerCache.flushLedger(true);
     }
 
-    @Override
-    public EntryLogger getEntryLogger() {
+    public DefaultEntryLogger getEntryLogger() {
         return entryLogger;
     }
