Skip to content

Commit

Permalink
Auto-configure direct-io buffer size based on available memory
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored and mauricebarnum committed Jan 10, 2022
1 parent 4eddb6b commit 13d98ae
Show file tree
Hide file tree
Showing 13 changed files with 301 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.io.EOFException;
Expand All @@ -47,7 +48,6 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.stream.Collectors;

import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
import org.apache.bookkeeper.bookie.EntryLogMetadata;
import org.apache.bookkeeper.bookie.storage.CompactionEntryLog;
Expand All @@ -74,8 +74,6 @@ public class DirectEntryLogger implements EntryLoggerIface {
private final ByteBufAllocator allocator;
private final BufferPool writeBuffers;
private final int readBufferSize;
private final int maxOpenReadFilesPerThread;
private final int maxFdCacheTimeSeconds;
private final int maxSaneEntrySize;
private final Set<Integer> unflushedLogs;

Expand All @@ -84,25 +82,9 @@ public class DirectEntryLogger implements EntryLoggerIface {
private List<Future<?>> pendingFlushes;
private final NativeIO nativeIO;
private final List<Cache<?, ?>> allCaches = new CopyOnWriteArrayList<>();
private final ThreadLocal<Cache<Integer, LogReader>> caches = new ThreadLocal<Cache<Integer, LogReader>>() {
@Override
public Cache<Integer, LogReader> initialValue() {
RemovalListener<Integer, LogReader> rl = (notification) -> {
try {
notification.getValue().close();
} catch (IOException ioe) {
slog.kv("logID", notification.getKey()).error(Events.READER_CLOSE_ERROR);
}
};
Cache<Integer, LogReader> cache = CacheBuilder.newBuilder()
.maximumSize(maxOpenReadFilesPerThread)
.removalListener(rl)
.expireAfterWrite(maxFdCacheTimeSeconds, TimeUnit.SECONDS)
.build();
allCaches.add(cache);
return cache;
}
};
private final ThreadLocal<Cache<Integer, LogReader>> caches;

private static final int NUMBER_OF_WRITE_BUFFERS = 8;

public DirectEntryLogger(File ledgerDir,
EntryLogIds ids,
Expand All @@ -112,9 +94,10 @@ public DirectEntryLogger(File ledgerDir,
ExecutorService flushExecutor,
long maxFileSize,
int maxSaneEntrySize,
int writeBufferSize,
long totalWriteBufferSize,
long totalReadBufferSize,
int readBufferSize,
int maxOpenReadFilesPerThread,
int numReadThreads,
int maxFdCacheTimeSeconds,
Slogger slogParent,
StatsLogger stats) throws IOException {
Expand All @@ -127,23 +110,67 @@ public DirectEntryLogger(File ledgerDir,

this.maxFileSize = maxFileSize;
this.maxSaneEntrySize = maxSaneEntrySize;
this.readBufferSize = readBufferSize;
this.maxOpenReadFilesPerThread = maxOpenReadFilesPerThread;
this.maxFdCacheTimeSeconds = maxFdCacheTimeSeconds;
this.readBufferSize = Buffer.nextAlignment(readBufferSize);
this.ids = ids;
this.slog = slogParent.kv("directory", ledgerDir).ctx();

this.stats = new DirectEntryLoggerStats(stats);

this.allocator = allocator;
this.writeBuffers = new BufferPool(nativeIO, writeBufferSize, 8);

int singleWriteBufferSize = Buffer.nextAlignment((int) (totalWriteBufferSize / NUMBER_OF_WRITE_BUFFERS));
this.writeBuffers = new BufferPool(nativeIO, singleWriteBufferSize, NUMBER_OF_WRITE_BUFFERS);

// The total read buffer memory needs to get split across all the read threads, since the caches
// are thread-specific and we want to ensure we don't pass the total memory limit.
long perThreadBufferSize = totalReadBufferSize / numReadThreads;

// if the amount of total read buffer size is too low, and/or the number of read threads is too high
// then the perThreadBufferSize can be lower than the readBufferSize causing immediate eviction of readers
// from the cache
if (perThreadBufferSize < readBufferSize) {
slog.kv("reason", "perThreadBufferSize lower than readBufferSize (causes immediate reader cache eviction)")
.kv("totalReadBufferSize", totalReadBufferSize)
.kv("totalNumReadThreads", numReadThreads)
.kv("readBufferSize", readBufferSize)
.kv("perThreadBufferSize", perThreadBufferSize)
.error(Events.ENTRYLOGGER_MISCONFIGURED);
}

long maxCachedReadersPerThread = perThreadBufferSize / readBufferSize;
long maxCachedReaders = maxCachedReadersPerThread * numReadThreads;

this.slog
.kv("maxFileSize", maxFileSize)
.kv("maxSaneEntrySize", maxSaneEntrySize)
.kv("writeBufferSize", writeBufferSize)
.kv("totalWriteBufferSize", totalWriteBufferSize)
.kv("singleWriteBufferSize", singleWriteBufferSize)
.kv("totalReadBufferSize", totalReadBufferSize)
.kv("readBufferSize", readBufferSize)
.kv("perThreadBufferSize", perThreadBufferSize)
.kv("maxCachedReadersPerThread", maxCachedReadersPerThread)
.kv("maxCachedReaders", maxCachedReaders)
.info(Events.ENTRYLOGGER_CREATED);

this.caches = ThreadLocal.withInitial(() -> {
RemovalListener<Integer, LogReader> rl = (notification) -> {
try {
notification.getValue().close();
this.stats.getCloseReaderCounter().inc();
} catch (IOException ioe) {
slog.kv("logID", notification.getKey()).error(Events.READER_CLOSE_ERROR);
}
};
Cache<Integer, LogReader> cache = CacheBuilder.newBuilder()
.maximumWeight(perThreadBufferSize)
.weigher((key, value) -> readBufferSize)
.removalListener(rl)
.expireAfterAccess(maxFdCacheTimeSeconds, TimeUnit.SECONDS)
.concurrencyLevel(1) // important to avoid too aggressive eviction
.build();
allCaches.add(cache);
return cache;
});
}

@Override
Expand Down Expand Up @@ -188,7 +215,20 @@ public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
private LogReader getReader(int logId) throws IOException {
Cache<Integer, LogReader> cache = caches.get();
try {
return cache.get(logId, () -> newDirectReader(logId));
LogReader reader = cache.get(logId, () -> {
this.stats.getOpenReaderCounter().inc();
return newDirectReader(logId);
});

// it is possible though unlikely, that the cache has already cleaned up this cache entry
// during the get operation. This is more likely to happen when there is great demand
// for many separate readers in a low memory environment.
if (reader.isClosed()) {
this.stats.getCachedReadersServedClosedCounter().inc();
throw new IOException(exMsg("Cached reader already closed").kv("logId", logId).toString());
}

return reader;
} catch (ExecutionException ee) {
if (ee.getCause() instanceof IOException) {
throw (IOException) ee.getCause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;

import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
Expand All @@ -39,6 +40,9 @@ class DirectEntryLoggerStats {
private static final String FLUSH = "entrylog-flush";
private static final String WRITER_FLUSH = "entrylog-writer-flush";
private static final String READ_BLOCK = "entrylog-read-block";
private static final String READER_OPEN = "entrylog-open-reader";
private static final String READER_CLOSE = "entrylog-close-reader";
private static final String CACHED_READER_SERVED_CLOSED = "entrylog-cached-reader-closed";

@StatsDoc(
name = ADD_ENTRY,
Expand Down Expand Up @@ -72,6 +76,24 @@ class DirectEntryLoggerStats {
)
private final ThreadLocal<OpStatsLogger> readBlockStats;

@StatsDoc(
name = READER_OPEN,
help = "Stats for reader open operations"
)
private final ThreadLocal<Counter> openReaderStats;

@StatsDoc(
name = READER_CLOSE,
help = "Stats for reader close operations"
)
private final ThreadLocal<Counter> closeReaderStats;

@StatsDoc(
name = CACHED_READER_SERVED_CLOSED,
help = "Stats for cached readers being served closed"
)
private final ThreadLocal<Counter> cachedReadersServedClosed;

DirectEntryLoggerStats(StatsLogger stats) {
addEntryStats = stats.getOpStatsLogger(ADD_ENTRY);

Expand All @@ -92,6 +114,30 @@ public OpStatsLogger initialValue() {
.getOpStatsLogger(READ_BLOCK);
}
};

openReaderStats = new ThreadLocal<Counter>() {
@Override
public Counter initialValue() {
return stats.scopeLabel("thread", String.valueOf(Thread.currentThread().getId()))
.getCounter(READER_OPEN);
}
};

closeReaderStats = new ThreadLocal<Counter>() {
@Override
public Counter initialValue() {
return stats.scopeLabel("thread", String.valueOf(Thread.currentThread().getId()))
.getCounter(READER_CLOSE);
}
};

cachedReadersServedClosed = new ThreadLocal<Counter>() {
@Override
public Counter initialValue() {
return stats.scopeLabel("thread", String.valueOf(Thread.currentThread().getId()))
.getCounter(CACHED_READER_SERVED_CLOSED);
}
};
}

OpStatsLogger getAddEntryStats() {
Expand All @@ -107,11 +153,22 @@ OpStatsLogger getWriterFlushStats() {
}

OpStatsLogger getReadEntryStats() {
return readEntryStats.get();

return readEntryStats.get();
}

OpStatsLogger getReadBlockStats() {
return readBlockStats.get();
return readBlockStats.get();
}

Counter getOpenReaderCounter() {
return openReaderStats.get();
}

Counter getCloseReaderCounter() {
return closeReaderStats.get();
}

Counter getCachedReadersServedClosedCounter() {
return cachedReadersServedClosed.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class DirectReader implements LogReader {
private long currentBlock = -1;
private long currentBlockEnd = -1;
private long maxOffset;
private boolean closed;

DirectReader(int logId, String filename, ByteBufAllocator allocator,
NativeIO nativeIO, int bufferSize,
Expand All @@ -57,6 +58,7 @@ class DirectReader implements LogReader {
this.readBlockStats = readBlockStats;

nativeBuffer = new Buffer(nativeIO, bufferSize);
closed = false;

try {
fd = nativeIO.open(filename,
Expand Down Expand Up @@ -257,13 +259,19 @@ public void close() throws IOException {
try {
int ret = nativeIO.close(fd);
checkState(ret == 0, "Close should throw exception on non-zero return (%d)", ret);
closed = true;
} catch (NativeIOException ne) {
throw new IOException(exMsg(ne.getMessage())
.kv("file", filename)
.kv("errno", ne.getErrno()).toString());
}
}

@Override
public boolean isClosed() {
return closed;
}

@Override
public long maxOffset() {
return maxOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public enum Events {
*/
ENTRYLOGGER_CREATED,

/**
* The entrylogger has been configured in a way that will likely result in errors during operation.
*/
ENTRYLOGGER_MISCONFIGURED,

/**
* The entrylogger has started writing the a new log file. The previous log file may not
* be entirely flushed when this is called, though they will be after an explicit flush call.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@
* </pre>
*/
class Header {
static final int LOGFILE_HEADER_SIZE = 1024;
static final int LOGFILE_LEGACY_HEADER_SIZE = 1024;
static final int LOGFILE_DIRECT_HEADER_SIZE = Buffer.ALIGNMENT;
static final int HEADER_VERSION_OFFSET = 4;
static final int LEDGERS_MAP_OFFSET = HEADER_VERSION_OFFSET + Integer.BYTES;
static final int LEDGER_COUNT_OFFSET = LEDGERS_MAP_OFFSET + Long.BYTES;
static final int HEADER_V0 = 0; // Old log file format (no ledgers map index)
static final int HEADER_V1 = 1; // Introduced ledger map index
static final int HEADER_CURRENT_VERSION = HEADER_V1;

static final byte[] EMPTY_HEADER = new byte[Buffer.ALIGNMENT];
static final byte[] EMPTY_HEADER = new byte[LOGFILE_DIRECT_HEADER_SIZE];
static {
ByteBuf buf = Unpooled.wrappedBuffer(EMPTY_HEADER);
buf.setByte(0, 'B');
Expand All @@ -58,8 +59,8 @@ class Header {
// legacy header size is 1024, while direct is 4096 so that it can be written as a single block
// to avoid legacy failing when it encounters the header in direct, create a dummy entry, which
// skips to the start of the second block
buf.setInt(LOGFILE_HEADER_SIZE, (buf.capacity() - LOGFILE_HEADER_SIZE) - Integer.BYTES);
buf.setLong(LOGFILE_HEADER_SIZE + Integer.BYTES, LogMetadata.INVALID_LID);
buf.setInt(LOGFILE_LEGACY_HEADER_SIZE, (buf.capacity() - LOGFILE_LEGACY_HEADER_SIZE) - Integer.BYTES);
buf.setLong(LOGFILE_LEGACY_HEADER_SIZE + Integer.BYTES, LogMetadata.INVALID_LID);
};
static int extractVersion(ByteBuf header) throws IOException {
assertFingerPrint(header);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void accept(long ledgerId, long size) {
}

static EntryLogMetadata read(LogReader reader) throws IOException {
ByteBuf header = reader.readBufferAt(0, Header.LOGFILE_HEADER_SIZE);
ByteBuf header = reader.readBufferAt(0, Header.LOGFILE_LEGACY_HEADER_SIZE);
try {
int headerVersion = Header.extractVersion(header);
if (headerVersion < Header.HEADER_V1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,6 @@ public interface LogReader extends AutoCloseable {

@Override
void close() throws IOException;

boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

class LogReaderScan {
static void scan(LogReader reader, EntryLogScanner scanner) throws IOException {
int offset = Header.LOGFILE_HEADER_SIZE;
int offset = Header.LOGFILE_LEGACY_HEADER_SIZE;

ByteBuf entry = PooledByteBufAllocator.DEFAULT.directBuffer(16 * 1024 * 1024);

Expand Down
Loading

0 comments on commit 13d98ae

Please sign in to comment.