Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions bookkeeper-server/conf/bk_server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ zkTimeout=10000
# The interval is specified in seconds.
#auditorPeriodicBookieCheckInterval=86400

# Entry log flush interval in bytes.
# Default is 0. 0 or less disables this feature and effectively flush
# happens on log rotation.
# Flushing in smaller chunks but more frequently reduces spikes in disk
# I/O. Flushing too frequently may also affect performance negatively.
#flushEntrylogBytes=0

# How long to wait, in seconds, before starting auto recovery of a lost bookie
#lostBookieRecoveryDelay=0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ private static class Header {
final static int MIN_SANE_ENTRY_SIZE = 8 + 8;
final static long MB = 1024 * 1024;

private final long flushIntervalInBytes;
private final boolean doRegularFlushes;
private long bytesWrittenSinceLastFlush = 0;

final ServerConfiguration conf;
/**
* Scan entries in a entry log file.
Expand Down Expand Up @@ -253,6 +257,9 @@ public EntryLogger(ServerConfiguration conf,
this.leastUnflushedLogId = logId + 1;
this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
this.conf = conf;
flushIntervalInBytes = conf.getFlushIntervalInBytes();
doRegularFlushes = flushIntervalInBytes > 0;

initialize();
}

Expand Down Expand Up @@ -737,6 +744,7 @@ void flush() throws IOException {
synchronized void flushCurrentLog() throws IOException {
if (logChannel != null) {
logChannel.flush(true);
bytesWrittenSinceLastFlush = 0;
LOG.debug("Flush and sync current entry logger {}.", logChannel.getLogId());
}
}
Expand All @@ -752,6 +760,9 @@ synchronized long addEntry(long ledger, ByteBuffer entry, boolean rollLog) throw
// Create new log if logSizeLimit reached or current disk is full
boolean createNewLog = shouldCreateNewEntryLog.get();
if (createNewLog || reachEntryLogLimit) {
if (doRegularFlushes) {
flushCurrentLog();
}
createNewLog();
// Reset the flag
if (createNewLog) {
Expand All @@ -766,10 +777,22 @@ synchronized long addEntry(long ledger, ByteBuffer entry, boolean rollLog) throw
long pos = logChannel.position();
logChannel.write(entry);
logChannel.registerWrittenEntry(ledger, entrySize);

incrementBytesWrittenAndMaybeFlush(4L + entrySize);

return (logChannel.getLogId() << 32L) | pos;
}

private void incrementBytesWrittenAndMaybeFlush(long bytesWritten) throws IOException {
if (!doRegularFlushes) {
return;
}
bytesWrittenSinceLastFlush += bytesWritten;
if (bytesWrittenSinceLastFlush > flushIntervalInBytes) {
flushCurrentLog();
}
}

static long logIdForOffset(long offset) {
return offset >> 32L;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class ServerConfiguration extends AbstractConfiguration {
protected final static String GC_OVERREPLICATED_LEDGER_WAIT_TIME = "gcOverreplicatedLedgerWaitTime";
// Sync Parameters
protected final static String FLUSH_INTERVAL = "flushInterval";
protected final static String FLUSH_ENTRYLOG_INTERVAL_BYTES = "flushEntrylogBytes";
// Bookie death watch interval
protected final static String DEATH_WATCH_INTERVAL = "bookieDeathWatchInterval";
// Ledger Cache Parameters
Expand Down Expand Up @@ -264,6 +265,33 @@ public ServerConfiguration setFlushInterval(int flushInterval) {
return this;
}

/**
* Set entry log flush interval in bytes.
*
* Default is 0. 0 or less disables this feature and effectively flush
* happens on log rotation.
*
* Flushing in smaller chunks but more frequently reduces spikes in disk
* I/O. Flushing too frequently may also affect performance negatively.
*
* @return Entry log flush interval in bytes
*/
public long getFlushIntervalInBytes() {
return this.getLong(FLUSH_ENTRYLOG_INTERVAL_BYTES, 0);
}

/**
* Set entry log flush interval in bytes
*
* @param flushInterval in bytes
* @return server configuration
*/
public ServerConfiguration setFlushIntervalInBytes(long flushInterval) {
this.setProperty(FLUSH_ENTRYLOG_INTERVAL_BYTES, Long.toString(flushInterval));
return this;
}


/**
* Get bookie death watch interval
*
Expand Down