Skip to content

Commit

Permalink
Improved DirectEntryLogger reader caching
Browse files Browse the repository at this point in the history
The perThreadBufferSize was being miscalculated, which caused reader eviction problems.
This change fixes that issue and also reduces the concurrency of the reader cache to 1
which avoids further reader eviction issues. When concurrency is left at the default
of 4 then the cache is composed of 4 segments and each segment limits its own weight to
that of maximumWeight / concurrencyLevel, which causes readers to be evicted before
reaching the maximumWeight, especially in low memory environments.

Additionally, extra metrics to measure the rate of reader creation and closing have been added.
  • Loading branch information
Jack Vanlightly authored and mauricebarnum committed Dec 16, 2021
1 parent 7e453ab commit ab84dda
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public class DirectEntryLogger implements EntryLoggerIface {
private final ByteBufAllocator allocator;
private final BufferPool writeBuffers;
private final int readBufferSize;
private final int maxFdCacheTimeSeconds;
private final int maxSaneEntrySize;
private final Set<Integer> unflushedLogs;

Expand Down Expand Up @@ -112,7 +111,6 @@ public DirectEntryLogger(File ledgerDir,
this.maxFileSize = maxFileSize;
this.maxSaneEntrySize = maxSaneEntrySize;
this.readBufferSize = Buffer.nextAlignment(readBufferSize);
this.maxFdCacheTimeSeconds = maxFdCacheTimeSeconds;
this.ids = ids;
this.slog = slogParent.kv("directory", ledgerDir).ctx();

Expand All @@ -123,23 +121,42 @@ public DirectEntryLogger(File ledgerDir,
int singleWriteBufferSize = Buffer.nextAlignment((int) (totalWriteBufferSize / NUMBER_OF_WRITE_BUFFERS));
this.writeBuffers = new BufferPool(nativeIO, singleWriteBufferSize, NUMBER_OF_WRITE_BUFFERS);

// The total read buffer memory needs to get split across all the read threads, since the caches
// are thread-specific and we want to ensure we don't pass the total memory limit.
long perThreadBufferSize = totalReadBufferSize / numReadThreads;

// if the amount of total read buffer size is too low, and/or the number of read threads is too high
// then the perThreadBufferSize can be lower than the readBufferSize causing immediate eviction of readers
// from the cache
if (perThreadBufferSize < readBufferSize) {
slog.kv("reason", "perThreadBufferSize lower than readBufferSize (causes immediate reader cache eviction)")
.kv("totalReadBufferSize", totalReadBufferSize)
.kv("totalNumReadThreads", numReadThreads)
.kv("readBufferSize", readBufferSize)
.kv("perThreadBufferSize", perThreadBufferSize)
.error(Events.ENTRYLOGGER_MISCONFIGURED);
}

long maxCachedReadersPerThread = perThreadBufferSize / readBufferSize;
long maxCachedReaders = maxCachedReadersPerThread * numReadThreads;

this.slog
.kv("maxFileSize", maxFileSize)
.kv("maxSaneEntrySize", maxSaneEntrySize)
.kv("totalWriteBufferSize", totalWriteBufferSize)
.kv("singleWriteBufferSize", singleWriteBufferSize)
.kv("totalReadBufferSize", totalReadBufferSize)
.kv("readBufferSize", readBufferSize)
.kv("perThreadBufferSize", perThreadBufferSize)
.kv("maxCachedReadersPerThread", maxCachedReadersPerThread)
.kv("maxCachedReaders", maxCachedReaders)
.info(Events.ENTRYLOGGER_CREATED);

// The total read buffer memory needs to get split across all the read threads, since the caches
// are thread-specific and we want to ensure we don't pass the total memory limit.
long perThreadBufferSize = totalReadBufferSize / numReadThreads;

this.caches = ThreadLocal.withInitial(() -> {
RemovalListener<Integer, LogReader> rl = (notification) -> {
try {
notification.getValue().close();
this.stats.getCloseReaderCounter().inc();
} catch (IOException ioe) {
slog.kv("logID", notification.getKey()).error(Events.READER_CLOSE_ERROR);
}
Expand All @@ -148,7 +165,8 @@ public DirectEntryLogger(File ledgerDir,
.maximumWeight(perThreadBufferSize)
.weigher((key, value) -> readBufferSize)
.removalListener(rl)
.expireAfterWrite(maxFdCacheTimeSeconds, TimeUnit.SECONDS)
.expireAfterAccess(maxFdCacheTimeSeconds, TimeUnit.SECONDS)
.concurrencyLevel(1) // important to avoid too aggressive eviction
.build();
allCaches.add(cache);
return cache;
Expand Down Expand Up @@ -197,7 +215,20 @@ public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
private LogReader getReader(int logId) throws IOException {
Cache<Integer, LogReader> cache = caches.get();
try {
return cache.get(logId, () -> newDirectReader(logId));
LogReader reader = cache.get(logId, () -> {
this.stats.getOpenReaderCounter().inc();
return newDirectReader(logId);
});

// it is possible though unlikely, that the cache has already cleaned up this cache entry
// during the get operation. This is more likely to happen when there is great demand
// for many separate readers in a low memory environment.
if (reader.isClosed()) {
this.stats.getCachedReadersServedClosedCounter().inc();
throw new IOException(exMsg("Cached reader already closed").kv("logId", logId).toString());
}

return reader;
} catch (ExecutionException ee) {
if (ee.getCause() instanceof IOException) {
throw (IOException) ee.getCause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;

import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
Expand All @@ -39,6 +40,9 @@ class DirectEntryLoggerStats {
private static final String FLUSH = "entrylog-flush";
private static final String WRITER_FLUSH = "entrylog-writer-flush";
private static final String READ_BLOCK = "entrylog-read-block";
private static final String READER_OPEN = "entrylog-open-reader";
private static final String READER_CLOSE = "entrylog-close-reader";
private static final String CACHED_READER_SERVED_CLOSED = "entrylog-cached-reader-closed";

@StatsDoc(
name = ADD_ENTRY,
Expand Down Expand Up @@ -72,6 +76,24 @@ class DirectEntryLoggerStats {
)
private final ThreadLocal<OpStatsLogger> readBlockStats;

@StatsDoc(
name = READER_OPEN,
help = "Stats for reader open operations"
)
private final ThreadLocal<Counter> openReaderStats;

@StatsDoc(
name = READER_CLOSE,
help = "Stats for reader close operations"
)
private final ThreadLocal<Counter> closeReaderStats;

@StatsDoc(
name = CACHED_READER_SERVED_CLOSED,
help = "Stats for cached readers being served closed"
)
private final ThreadLocal<Counter> cachedReadersServedClosed;

DirectEntryLoggerStats(StatsLogger stats) {
addEntryStats = stats.getOpStatsLogger(ADD_ENTRY);

Expand All @@ -92,6 +114,30 @@ public OpStatsLogger initialValue() {
.getOpStatsLogger(READ_BLOCK);
}
};

openReaderStats = new ThreadLocal<Counter>() {
@Override
public Counter initialValue() {
return stats.scopeLabel("thread", String.valueOf(Thread.currentThread().getId()))
.getCounter(READER_OPEN);
}
};

closeReaderStats = new ThreadLocal<Counter>() {
@Override
public Counter initialValue() {
return stats.scopeLabel("thread", String.valueOf(Thread.currentThread().getId()))
.getCounter(READER_CLOSE);
}
};

cachedReadersServedClosed = new ThreadLocal<Counter>() {
@Override
public Counter initialValue() {
return stats.scopeLabel("thread", String.valueOf(Thread.currentThread().getId()))
.getCounter(CACHED_READER_SERVED_CLOSED);
}
};
}

OpStatsLogger getAddEntryStats() {
Expand All @@ -107,11 +153,22 @@ OpStatsLogger getWriterFlushStats() {
}

OpStatsLogger getReadEntryStats() {
return readEntryStats.get();

return readEntryStats.get();
}

OpStatsLogger getReadBlockStats() {
return readBlockStats.get();
return readBlockStats.get();
}

/**
 * Returns the calling thread's counter for reader-open events
 * (backed by a thread-local, so each thread reports its own count).
 */
Counter getOpenReaderCounter() {
    final Counter perThreadCounter = openReaderStats.get();
    return perThreadCounter;
}

/**
 * Returns the calling thread's counter for reader-close events
 * (backed by a thread-local, so each thread reports its own count).
 */
Counter getCloseReaderCounter() {
    final Counter perThreadCounter = closeReaderStats.get();
    return perThreadCounter;
}

/**
 * Returns the calling thread's counter for cache hits that returned an
 * already-closed reader (backed by a thread-local, one counter per thread).
 */
Counter getCachedReadersServedClosedCounter() {
    final Counter perThreadCounter = cachedReadersServedClosed.get();
    return perThreadCounter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class DirectReader implements LogReader {
private long currentBlock = -1;
private long currentBlockEnd = -1;
private long maxOffset;
private boolean closed;

DirectReader(int logId, String filename, ByteBufAllocator allocator,
NativeIO nativeIO, int bufferSize,
Expand All @@ -57,6 +58,7 @@ class DirectReader implements LogReader {
this.readBlockStats = readBlockStats;

nativeBuffer = new Buffer(nativeIO, bufferSize);
closed = false;

try {
fd = nativeIO.open(filename,
Expand Down Expand Up @@ -257,13 +259,19 @@ public void close() throws IOException {
try {
int ret = nativeIO.close(fd);
checkState(ret == 0, "Close should throw exception on non-zero return (%d)", ret);
closed = true;
} catch (NativeIOException ne) {
throw new IOException(exMsg(ne.getMessage())
.kv("file", filename)
.kv("errno", ne.getErrno()).toString());
}
}

/**
 * Reports whether this reader has been closed (the flag is set after a
 * successful close of the underlying file descriptor).
 */
@Override
public boolean isClosed() {
    return this.closed;
}

@Override
public long maxOffset() {
return maxOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public enum Events {
*/
ENTRYLOGGER_CREATED,

/**
* The entrylogger has been configured in a way that will likely result in errors during operation.
*/
ENTRYLOGGER_MISCONFIGURED,

/**
* The entrylogger has started writing a new log file. The previous log file may not
* be entirely flushed when this is called, though they will be after an explicit flush call.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,6 @@ public interface LogReader extends AutoCloseable {

@Override
void close() throws IOException;

boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ public class DbLedgerStorage implements LedgerStorage {
/ MB;
private static final long DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB = 8;

private static final int DEFAULT_DIRECT_IO_MAX_READ_FDS_PER_THREAD = 200;
private static final int DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS = 300;

private int numberOfDirs;
Expand Down Expand Up @@ -193,10 +192,6 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
conf,
DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB,
DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB);
int perDirectoryMaxReadFds = (int) getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_MAX_READ_FDS_PER_THREAD,
DEFAULT_DIRECT_IO_MAX_READ_FDS_PER_THREAD) / numberOfDirs;
int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS,
Expand All @@ -211,6 +206,7 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
if (numReadThreads == 0) {
numReadThreads = conf.getServerNumIOThreads();
}

entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ledgerDirsManager, slog),
new NativeIOImpl(),
allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor,
Expand All @@ -219,7 +215,8 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
perDirectoryTotalWriteBufferSize,
perDirectoryTotalReadBufferSize,
readBufferSize,
perDirectoryMaxReadFds, maxFdCacheTimeSeconds,
numReadThreads,
maxFdCacheTimeSeconds,
slog, statsLogger);
} else {
entrylogger = new EntryLogger(conf, ldm, null, statsLogger, allocator);
Expand Down

0 comments on commit ab84dda

Please sign in to comment.