Skip to content
Permalink
Browse files
FLUME-2181 - Optionally disable File Channel fsyncs (Hari via Brock)
  • Loading branch information
Brock Noland committed May 2, 2014
1 parent a94594d commit 6115e7d6d611d2b82dc2583b95a13d4c0886a93f
Showing 19 changed files with 314 additions and 89 deletions.
@@ -49,22 +49,26 @@ public class CheckpointRebuilder {
HashMultimap.create();
private final SetMultimap<Long, ComparableFlumeEventPointer>
uncommittedTakes = HashMultimap.create();
private final boolean fsyncPerTransaction;

private static Logger LOG =
LoggerFactory.getLogger(CheckpointRebuilder.class);

/**
 * Creates a rebuilder that reconstructs the checkpoint by scanning the
 * given data files into the supplied event queue.
 *
 * NOTE(review): the diff rendering had the pre-change signature line
 * interleaved here; this is the post-change (three-argument) constructor.
 *
 * @param logFiles data files to scan during the fast replay
 * @param queue queue that receives the rebuilt put/take state
 * @param fsyncPerTransaction passed through to the sequential readers;
 *        presumably controls whether corrupt records are fatal — confirm
 *        against LogFile reader semantics
 * @throws IOException declared for reader setup failures
 */
public CheckpointRebuilder(List<File> logFiles,
    FlumeEventQueue queue, boolean fsyncPerTransaction) throws
    IOException {
  this.logFiles = logFiles;
  this.queue = queue;
  this.fsyncPerTransaction = fsyncPerTransaction;
}

public boolean rebuild() throws IOException, Exception {
LOG.info("Attempting to fast replay the log files.");
List<LogFile.SequentialReader> logReaders = Lists.newArrayList();
for (File logFile : logFiles) {
try {
logReaders.add(LogFileFactory.getSequentialReader(logFile, null));
logReaders.add(LogFileFactory.getSequentialReader(logFile, null,
fsyncPerTransaction));
} catch(EOFException e) {
LOG.warn("Ignoring " + logFile + " due to EOF", e);
}
@@ -252,7 +256,8 @@ public static void main(String[] args) throws Exception {
new File(checkpointDir, "inflighttakes"),
new File(checkpointDir, "inflightputs"),
new File(checkpointDir, Log.QUEUE_SET));
CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles, queue);
CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles,
queue, true);
if(rebuilder.rebuild()) {
rebuilder.writeCheckpoint();
} else {
@@ -95,6 +95,8 @@ public class FileChannel extends BasicChannelSemantics {
private String encryptionActiveKey;
private String encryptionCipherProvider;
private boolean useDualCheckpoints;
private boolean fsyncPerTransaction;
private int fsyncInterval;

@Override
public synchronized void setName(String name) {
@@ -233,6 +235,12 @@ public void configure(Context context) {
"key provider name is not.");
}

fsyncPerTransaction = context.getBoolean(FileChannelConfiguration
.FSYNC_PER_TXN, FileChannelConfiguration.DEFAULT_FSYNC_PRE_TXN);

fsyncInterval = context.getInteger(FileChannelConfiguration
.FSYNC_INTERVAL, FileChannelConfiguration.DEFAULT_FSYNC_INTERVAL);

if(queueRemaining == null) {
queueRemaining = new Semaphore(capacity, true);
}
@@ -265,6 +273,8 @@ public synchronized void start() {
builder.setEncryptionCipherProvider(encryptionCipherProvider);
builder.setUseDualCheckpoints(useDualCheckpoints);
builder.setBackupCheckpointDir(backupCheckpointDir);
builder.setFsyncPerTransaction(fsyncPerTransaction);
builder.setFsyncInterval(fsyncInterval);
log = builder.build();
log.replay();
open = true;
@@ -328,8 +338,8 @@ protected BasicTransactionSemantics createTransaction() {
trans.getStateAsString() + channelNameDescriptor);
}
trans = new FileBackedTransaction(log, TransactionIDOracle.next(),
transactionCapacity, keepAlive, queueRemaining, getName(),
channelCounter);
transactionCapacity, keepAlive, queueRemaining, getName(),
fsyncPerTransaction, channelCounter);
transactions.set(trans);
return trans;
}
@@ -401,16 +411,19 @@ static class FileBackedTransaction extends BasicTransactionSemantics {
private final Semaphore queueRemaining;
private final String channelNameDescriptor;
private final ChannelCounter channelCounter;
private final boolean fsyncPerTransaction;
/**
 * Creates a transaction backed by the file channel's log.
 *
 * NOTE(review): the diff rendering had the pre-change signature line
 * interleaved here; this is the post-change signature that threads
 * {@code fsyncPerTransaction} through to the transaction.
 *
 * @param log the channel's write-ahead log; also supplies the event queue
 * @param transactionID unique id for this transaction
 * @param transCapacity bounds both the put and take lists
 * @param keepAlive keep-alive timeout (units defined by callers of doTake)
 * @param queueRemaining semaphore tracking remaining channel capacity
 * @param name channel name, used only for error messages
 * @param fsyncPerTransaction when true, corrupt events found during take
 *        are rethrown as ChannelException instead of being skipped
 * @param counter channel statistics counter
 */
public FileBackedTransaction(Log log, long transactionID,
    int transCapacity, int keepAlive, Semaphore queueRemaining,
    String name, boolean fsyncPerTransaction, ChannelCounter
    counter) {
  this.log = log;
  queue = log.getFlumeEventQueue();
  this.transactionID = transactionID;
  this.keepAlive = keepAlive;
  this.queueRemaining = queueRemaining;
  // Bounded deques so a transaction can never exceed its configured capacity.
  putList = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity);
  takeList = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity);
  this.fsyncPerTransaction = fsyncPerTransaction;
  channelNameDescriptor = "[channel=" + name + "]";
  this.channelCounter = counter;
}
@@ -500,6 +513,13 @@ protected Event doTake() throws InterruptedException {
LOG.warn("Corrupt record replaced by File Channel Integrity " +
"tool found. Will retrieve next event", e);
takeList.remove(ptr);
} catch (CorruptEventException ex) {
if (fsyncPerTransaction) {
throw new ChannelException(ex);
}
LOG.warn("Corrupt record found. Event will be " +
"skipped, and next event will be read.", ex);
takeList.remove(ptr);
}
}
}
@@ -87,4 +87,11 @@ public class FileChannelConfiguration {
public static final String USE_DUAL_CHECKPOINTS = "useDualCheckpoints";
public static final boolean DEFAULT_USE_DUAL_CHECKPOINTS = false;

// Property key: fsync on every transaction commit (durable but slower).
public static final String FSYNC_PER_TXN = "fsyncPerTransaction";
// NOTE(review): "PRE" is a typo for "PER"; the name is kept because it is
// already referenced (see FileChannel.configure). Prefer the alias below.
public static final boolean DEFAULT_FSYNC_PRE_TXN = true;
// Correctly spelled alias for DEFAULT_FSYNC_PRE_TXN.
public static final boolean DEFAULT_FSYNC_PER_TXN = DEFAULT_FSYNC_PRE_TXN;

// Property key: background fsync interval used when per-transaction fsync
// is disabled. Presumably consumed by Log's background worker — confirm.
public static final String FSYNC_INTERVAL = "fsyncInterval";
public static final int DEFAULT_FSYNC_INTERVAL = 5; // seconds.


}
@@ -125,6 +125,9 @@ public class Log {
private final boolean useDualCheckpoints;
private volatile boolean backupRestored = false;

private final boolean fsyncPerTransaction;
private final int fsyncInterval;

private int readCount;
private int putCount;
private int takeCount;
@@ -150,6 +153,25 @@ static class Builder {
private boolean bUseDualCheckpoints = false;
private File bBackupCheckpointDir = null;

// Durable default: fsync on every transaction commit unless disabled.
private boolean fsyncPerTransaction = true;
// Background fsync interval in seconds. Defaults to 0 here — callers such
// as FileChannel set it explicitly; NOTE(review): confirm Log handles an
// unset (0) interval sanely when fsyncPerTransaction is false.
private int fsyncInterval;

boolean isFsyncPerTransaction() {
  return fsyncPerTransaction;
}

/**
 * Sets whether every transaction commit forces an fsync.
 * Returns this builder for chaining, matching the Builder's other
 * fluent setters (e.g. setUsableSpaceRefreshInterval).
 */
Builder setFsyncPerTransaction(boolean fsyncPerTransaction) {
  this.fsyncPerTransaction = fsyncPerTransaction;
  return this;
}

int getFsyncInterval() {
  return fsyncInterval;
}

/**
 * Sets the background fsync interval in seconds.
 * Returns this builder for chaining.
 */
Builder setFsyncInterval(int fsyncInterval) {
  this.fsyncInterval = fsyncInterval;
  return this;
}

Builder setUsableSpaceRefreshInterval(long usableSpaceRefreshInterval) {
bUsableSpaceRefreshInterval = usableSpaceRefreshInterval;
return this;
@@ -231,7 +253,7 @@ Log build() throws IOException {
useLogReplayV1, useFastReplay, bMinimumRequiredSpace,
bEncryptionKeyProvider, bEncryptionKeyAlias,
bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
bLogDirs);
fsyncPerTransaction, fsyncInterval, bLogDirs);
}
}

@@ -241,7 +263,8 @@ private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
@Nullable String encryptionKeyAlias,
@Nullable String encryptionCipherProvider,
long usableSpaceRefreshInterval, File... logDirs)
long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
int fsyncInterval, File... logDirs)
throws IOException {
Preconditions.checkArgument(checkpointInterval > 0,
"checkpointInterval <= 0");
@@ -318,6 +341,8 @@ private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
this.checkpointDir = checkpointDir;
this.backupCheckpointDir = backupCheckpointDir;
this.logDirs = logDirs;
this.fsyncPerTransaction = fsyncPerTransaction;
this.fsyncInterval = fsyncInterval;
logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
workerExecutor = Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
@@ -354,7 +379,7 @@ void replay() throws IOException {
dataFiles.add(file);
nextFileID.set(Math.max(nextFileID.get(), id));
idLogFileMap.put(id, LogFileFactory.getRandomReader(new File(logDir,
PREFIX + id), encryptionKeyProvider));
PREFIX + id), encryptionKeyProvider, fsyncPerTransaction));
}
}
LOGGER.info("Found NextFileID " + nextFileID +
@@ -468,13 +493,13 @@ private void doReplay(FlumeEventQueue queue, List<File> dataFiles,
KeyProvider encryptionKeyProvider,
boolean useFastReplay) throws Exception {
CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles,
queue);
queue, fsyncPerTransaction);
if (useFastReplay && rebuilder.rebuild()) {
didFastReplay = true;
LOGGER.info("Fast replay successful.");
} else {
ReplayHandler replayHandler = new ReplayHandler(queue,
encryptionKeyProvider);
encryptionKeyProvider, fsyncPerTransaction);
if (useLogReplayV1) {
LOGGER.info("Replaying logs with v1 replay logic");
replayHandler.replayLogv1(dataFiles);
@@ -551,17 +576,20 @@ FlumeEventQueue getFlumeEventQueue() {
* @throws InterruptedException
*/
FlumeEvent get(FlumeEventPointer pointer) throws IOException,
InterruptedException, NoopRecordException {
InterruptedException, NoopRecordException, CorruptEventException {
Preconditions.checkState(open, "Log is closed");
int id = pointer.getFileID();
LogFile.RandomReader logFile = idLogFileMap.get(id);
Preconditions.checkNotNull(logFile, "LogFile is null for id " + id);
try {
return logFile.get(pointer.getOffset());
} catch (CorruptEventException ex) {
open = false;
throw new IOException("Corrupt event found. Please run File Channel " +
"Integrity tool.", ex);
if (fsyncPerTransaction) {
open = false;
throw new IOException("Corrupt event found. Please run File Channel " +
"Integrity tool.", ex);
}
throw ex;
}
}

@@ -906,9 +934,10 @@ private synchronized void roll(int index, ByteBuffer buffer)
File file = new File(logDirs[index], PREFIX + fileID);
LogFile.Writer writer = LogFileFactory.getWriter(file, fileID,
maxFileSize, encryptionKey, encryptionKeyAlias,
encryptionCipherProvider, usableSpaceRefreshInterval);
encryptionCipherProvider, usableSpaceRefreshInterval,
fsyncPerTransaction, fsyncInterval);
idLogFileMap.put(fileID, LogFileFactory.getRandomReader(file,
encryptionKeyProvider));
encryptionKeyProvider, fsyncPerTransaction));
// writer from this point on will get new reference
logFiles.set(index, writer);
// close out old log
@@ -991,7 +1020,8 @@ private Boolean writeCheckpoint(Boolean force) throws Exception {
} finally {
writer.close();
}
reader = LogFileFactory.getRandomReader(file, encryptionKeyProvider);
reader = LogFileFactory.getRandomReader(file,
encryptionKeyProvider, fsyncPerTransaction);
idLogFileMap.put(id, reader);
LOGGER.debug("Updated checkpoint for file: " + file
+ "logWriteOrderID " + logWriteOrderID);

0 comments on commit 6115e7d

Please sign in to comment.