Skip to content
Permalink
Browse files
FLUME-2595. Add option to checkpoint on file channel shutdown
(Roshan Naik via Hari)
  • Loading branch information
harishreedharan committed Apr 13, 2015
1 parent fc03456 commit be4ae294ca549648f785e7eea7564ee95112130a
Showing 6 changed files with 102 additions and 24 deletions.
@@ -100,6 +100,7 @@ public class FileChannel extends BasicChannelSemantics {
private boolean compressBackupCheckpoint;
private boolean fsyncPerTransaction;
private int fsyncInterval;
private boolean checkpointOnClose = true;

@Override
public synchronized void setName(String name) {
@@ -251,6 +252,9 @@ public void configure(Context context) {
fsyncInterval = context.getInteger(FileChannelConfiguration
.FSYNC_INTERVAL, FileChannelConfiguration.DEFAULT_FSYNC_INTERVAL);

checkpointOnClose = context.getBoolean(FileChannelConfiguration
.CHKPT_ONCLOSE, FileChannelConfiguration.DEFAULT_CHKPT_ONCLOSE);

if(queueRemaining == null) {
queueRemaining = new Semaphore(capacity, true);
}
@@ -286,6 +290,7 @@ public synchronized void start() {
builder.setBackupCheckpointDir(backupCheckpointDir);
builder.setFsyncPerTransaction(fsyncPerTransaction);
builder.setFsyncInterval(fsyncInterval);
builder.setCheckpointOnClose(checkpointOnClose);
log = builder.build();
log.replay();
open = true;
@@ -98,5 +98,6 @@ public class FileChannelConfiguration {
public static final String FSYNC_INTERVAL = "fsyncInterval";
public static final int DEFAULT_FSYNC_INTERVAL = 5; // seconds.


public static final String CHKPT_ONCLOSE = "checkpointOnClose";
public static final Boolean DEFAULT_CHKPT_ONCLOSE = true;
}
@@ -128,6 +128,7 @@ public class Log {

private final boolean fsyncPerTransaction;
private final int fsyncInterval;
private final boolean checkpointOnClose;

private int readCount;
private int putCount;
@@ -158,6 +159,8 @@ static class Builder {
private boolean fsyncPerTransaction = true;
private int fsyncInterval;

private boolean checkpointOnClose = true;

boolean isFsyncPerTransaction() {
return fsyncPerTransaction;
}
@@ -254,13 +257,18 @@ Builder setBackupCheckpointDir(File backupCheckpointDir) {
return this;
}

Builder setCheckpointOnClose(boolean enableCheckpointOnClose) {
this.checkpointOnClose = enableCheckpointOnClose;
return this;
}

Log build() throws IOException {
return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
bUseDualCheckpoints, bCompressBackupCheckpoint,bCheckpointDir,
bBackupCheckpointDir, bName, useLogReplayV1, useFastReplay,
bMinimumRequiredSpace, bEncryptionKeyProvider, bEncryptionKeyAlias,
bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
fsyncPerTransaction, fsyncInterval, bLogDirs);
fsyncPerTransaction, fsyncInterval, checkpointOnClose, bLogDirs);
}
}

@@ -272,7 +280,7 @@ private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
@Nullable String encryptionKeyAlias,
@Nullable String encryptionCipherProvider,
long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
int fsyncInterval, File... logDirs)
int fsyncInterval, boolean checkpointOnClose, File... logDirs)
throws IOException {
Preconditions.checkArgument(checkpointInterval > 0,
"checkpointInterval <= 0");
@@ -352,6 +360,8 @@ private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
this.logDirs = logDirs;
this.fsyncPerTransaction = fsyncPerTransaction;
this.fsyncInterval = fsyncInterval;
this.checkpointOnClose = checkpointOnClose;

logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
workerExecutor = Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
@@ -791,6 +801,14 @@ void close() throws IOException{
lockExclusive();
try {
open = false;
try {
if(checkpointOnClose) {
writeCheckpoint(true); // do this before acquiring exclusive lock
}
} catch (Exception err) {
LOGGER.warn("Failed creating checkpoint on close of channel " + channelNameDescriptor +
"Replay will take longer next time channel is started.", err);
}
shutdownWorker();
if (logFiles != null) {
for (int index = 0; index < logFiles.length(); index++) {
@@ -22,6 +22,8 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.*;
import java.util.Collection;
import java.util.List;

import org.apache.commons.io.FileUtils;
@@ -35,6 +37,8 @@
import com.google.common.collect.Lists;
import com.google.common.io.Files;

import javax.ws.rs.Path;

public class TestLog {
private static final Logger LOGGER = LoggerFactory.getLogger(TestLog.class);
private static final long MAX_FILE_SIZE = 1000;
@@ -56,7 +60,7 @@ public void setup() throws IOException {
}
log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
checkpointDir).setLogDirs(dataDirs)
checkpointDir).setLogDirs(dataDirs).setCheckpointOnClose(false)
.setChannelName("testlog").build();
log.replay();
}
@@ -465,6 +469,34 @@ public void testCachedFSUsableSpace() throws Exception {
Long.MAX_VALUE - 1L);
}

@Test
public void testCheckpointOnClose() throws Exception {
log.close();
log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
checkpointDir).setLogDirs(dataDirs).setCheckpointOnClose(true)
.setChannelName("testLog").build();
log.replay();


// 1 Write One Event
FlumeEvent eventIn = TestUtils.newPersistableEvent();
log.put(transactionID, eventIn);
log.commitPut(transactionID);

// 2 Check state of checkpoint before close
File checkPointMetaFile =
FileUtils.listFiles(checkpointDir,new String[]{"meta"},false).iterator().next();
long before = FileUtils.checksumCRC32( checkPointMetaFile );

// 3 Close Log
log.close();

// 4 Verify that checkpoint was modified on close
long after = FileUtils.checksumCRC32( checkPointMetaFile );
Assert.assertFalse( before == after );
}

private void takeAndVerify(FlumeEventPointer eventPointerIn,
FlumeEvent eventIn)
throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
@@ -479,4 +511,5 @@ private void takeAndVerify(FlumeEventPointer eventPointerIn,
Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody());
}

}
@@ -2638,6 +2638,7 @@ capacity 1000000
keep-alive 3 Amount of time (in sec) to wait for a put operation
use-log-replay-v1 false Expert: Use old replay logic
use-fast-replay false Expert: Replay without using queue
checkpointOnClose true Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay.
encryption.activeKey -- Key name used to encrypt new data
encryption.cipherProvider -- Cipher provider type, supported types: AESCTRNOPADDING
encryption.keyProvider -- Key provider type, supported types: JCEKSFILE
@@ -97,10 +97,21 @@ public static void tearDownClass() throws Exception {
FileUtils.deleteDirectory(origDataDir);
}

@Test
public void testFixCorruptRecordsWithCheckpoint() throws Exception {
doTestFixCorruptEvents(true);
}

@Test
public void testFixCorruptRecords() throws Exception {
doTestFixCorruptEvents(false);
}

@Test
public void testFixInvalidRecords() throws Exception {
doTestFixInvalidEvents(false, DummyEventVerifier.Builder.class.getName());
}

@Test
public void testFixInvalidRecordsWithCheckpoint() throws Exception {
doTestFixInvalidEvents(true, DummyEventVerifier.Builder.class.getName());
@@ -111,15 +122,24 @@ public void doTestFixInvalidEvents(boolean withCheckpoint, String eventHandler)
tool.run(new String[] {"-l", dataDir.toString(), "-e", eventHandler, "-DvalidatorValue=0"});
FileChannel channel = new FileChannel();
channel.setName("channel");
String cp;
if(withCheckpoint) {
cp = origCheckpointDir.toString();
if (withCheckpoint) {
File[] cpFiles = origCheckpointDir.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
if (name.contains("lock") || name.contains("queueset")) {
return false;
}
return true;
}
});
for (File cpFile : cpFiles) {
Serialization.copyFile(cpFile, new File(checkpointDir, cpFile.getName()));
}
} else {
FileUtils.deleteDirectory(checkpointDir);
Assert.assertTrue(checkpointDir.mkdirs());
cp = checkpointDir.toString();
}
ctx.put(FileChannelConfiguration.CHECKPOINT_DIR,cp);
ctx.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointDir.toString());
ctx.put(FileChannelConfiguration.DATA_DIRS, dataDir.toString());
channel.configure(ctx);
channel.start();
@@ -136,15 +156,6 @@ public void doTestFixInvalidEvents(boolean withCheckpoint, String eventHandler)
Assert.assertEquals(25 - invalidEvent, i);
}

@Test
public void testFixCorruptRecords() throws Exception {
doTestFixCorruptEvents(false);
}
@Test
public void testFixCorruptRecordsWithCheckpoint() throws Exception {
doTestFixCorruptEvents(true);
}

public void doTestFixCorruptEvents(boolean withCheckpoint) throws Exception {
Set<String> corruptFiles = new HashSet<String>();
File[] files = dataDir.listFiles(new FilenameFilter() {
@@ -193,18 +204,27 @@ public boolean accept(File dir, String name) {

}
FileChannelIntegrityTool tool = new FileChannelIntegrityTool();
tool.run(new String[] {"-l", dataDir.toString()});
tool.run(new String[]{"-l", dataDir.toString()});
FileChannel channel = new FileChannel();
channel.setName("channel");
String cp;
if(withCheckpoint) {
cp = origCheckpointDir.toString();
if (withCheckpoint) {
File[] cpFiles = origCheckpointDir.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
if (name.contains("lock") || name.contains("queueset")) {
return false;
}
return true;
}
});
for (File cpFile : cpFiles) {
Serialization.copyFile(cpFile, new File(checkpointDir, cpFile.getName()));
}
} else {
FileUtils.deleteDirectory(checkpointDir);
Assert.assertTrue(checkpointDir.mkdirs());
cp = checkpointDir.toString();
}
ctx.put(FileChannelConfiguration.CHECKPOINT_DIR,cp);
ctx.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointDir.toString());
ctx.put(FileChannelConfiguration.DATA_DIRS, dataDir.toString());
channel.configure(ctx);
channel.start();

0 comments on commit be4ae29

Please sign in to comment.