diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf index 678018c0ed0..17c2ed58f9a 100644 --- a/bookkeeper-server/conf/bk_server.conf +++ b/bookkeeper-server/conf/bk_server.conf @@ -270,6 +270,13 @@ zkTimeout=10000 # The interval is specified in seconds. #auditorPeriodicBookieCheckInterval=86400 +# Entry log flush interval in bytes. +# Default is 0. 0 or less disables this feature and effectively flush +# happens on log rotation. +# Flushing in smaller chunks but more frequently reduces spikes in disk +# I/O. Flushing too frequently may also affect performance negatively. +#flushEntrylogBytes=0 + # How long to wait, in seconds, before starting auto recovery of a lost bookie #lostBookieRecoveryDelay=0 diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java index ecd0e0a829f..a2b21b3dc47 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java @@ -171,6 +171,10 @@ private static class Header { final static int MIN_SANE_ENTRY_SIZE = 8 + 8; final static long MB = 1024 * 1024; + private final long flushIntervalInBytes; + private final boolean doRegularFlushes; + private long bytesWrittenSinceLastFlush = 0; + final ServerConfiguration conf; /** * Scan entries in a entry log file. @@ -253,6 +257,9 @@ public EntryLogger(ServerConfiguration conf, this.leastUnflushedLogId = logId + 1; this.entryLoggerAllocator = new EntryLoggerAllocator(logId); this.conf = conf; + flushIntervalInBytes = conf.getFlushIntervalInBytes(); + doRegularFlushes = flushIntervalInBytes > 0; + initialize(); } @@ -737,6 +744,7 @@ void flush() throws IOException { synchronized void flushCurrentLog() throws IOException { if (logChannel != null) { logChannel.flush(true); + bytesWrittenSinceLastFlush = 0; LOG.debug("Flush and sync current entry logger {}.", logChannel.getLogId()); } } @@ -752,6 +760,9 @@ synchronized long addEntry(long ledger, ByteBuffer entry, boolean rollLog) throw // Create new log if logSizeLimit reached or current disk is full boolean createNewLog = shouldCreateNewEntryLog.get(); if (createNewLog || reachEntryLogLimit) { + if (doRegularFlushes) { + flushCurrentLog(); + } createNewLog(); // Reset the flag if (createNewLog) { @@ -766,10 +777,22 @@ synchronized long addEntry(long ledger, ByteBuffer entry, boolean rollLog) throw long pos = logChannel.position(); logChannel.write(entry); logChannel.registerWrittenEntry(ledger, entrySize); + + incrementBytesWrittenAndMaybeFlush(4L + entrySize); return (logChannel.getLogId() << 32L) | pos; } + private void incrementBytesWrittenAndMaybeFlush(long bytesWritten) throws IOException { + if (!doRegularFlushes) { + return; + } + bytesWrittenSinceLastFlush += bytesWritten; + if (bytesWrittenSinceLastFlush > flushIntervalInBytes) { + flushCurrentLog(); + } + } + static long logIdForOffset(long offset) { return offset >> 32L; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 92644dace50..236731e1288 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -56,6 +56,7 @@ public class ServerConfiguration extends AbstractConfiguration { protected final static String GC_OVERREPLICATED_LEDGER_WAIT_TIME = "gcOverreplicatedLedgerWaitTime"; // Sync Parameters protected final static String FLUSH_INTERVAL = "flushInterval"; + protected final static String FLUSH_ENTRYLOG_INTERVAL_BYTES = "flushEntrylogBytes"; // Bookie death watch interval protected final static String DEATH_WATCH_INTERVAL = "bookieDeathWatchInterval"; // Ledger Cache Parameters @@ -264,6 +265,33 @@ public ServerConfiguration setFlushInterval(int flushInterval) { return this; } + /** + * Set entry log flush interval in bytes. + * + * Default is 0. 0 or less disables this feature and effectively flush + * happens on log rotation. + * + * Flushing in smaller chunks but more frequently reduces spikes in disk + * I/O. Flushing too frequently may also affect performance negatively. + * + * @return Entry log flush interval in bytes + */ + public long getFlushIntervalInBytes() { + return this.getLong(FLUSH_ENTRYLOG_INTERVAL_BYTES, 0); + } + + /** + * Set entry log flush interval in bytes + * + * @param flushInterval in bytes + * @return server configuration + */ + public ServerConfiguration setFlushIntervalInBytes(long flushInterval) { + this.setProperty(FLUSH_ENTRYLOG_INTERVAL_BYTES, Long.toString(flushInterval)); + return this; + } + + /** * Get bookie death watch interval *