
FLUME-1516: FileChannel Write Dual Checkpoints to avoid replays

(Hari Shreedharan via Brock Noland)
1 parent df7a197 commit 6ca616800ec897551fbb14959ce3a5f0c1d69aed Brock Noland committed Apr 5, 2013
Showing with 1,104 additions and 225 deletions.
  1. +2 −0 ...annels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
  2. +16 −5 ...flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
  3. +155 −1 ...ls/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
  4. +14 −3 .../flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
  5. +64 −53 flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
  6. +10 −0 ...nels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
  7. +0 −18 ...e-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
  8. +144 −39 flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
  9. +37 −23 flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
  10. +25 −16 flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
  11. +42 −27 flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
  12. +101 −4 flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
  13. +131 −15 ...-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
  14. +2 −0 flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
  15. +5 −2 ...-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
  16. +341 −15 ...annels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
  17. +13 −4 flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
  18. +2 −0 flume-ng-doc/sphinx/FlumeUserGuide.rst
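For orientation before the per-file diffs: the feature is driven by two new channel properties, useDualCheckpoints and backupCheckpointDir, defined in FileChannelConfiguration below. A minimal sketch of enabling them programmatically, with hypothetical paths and channel name (the same keys apply in an agent's properties file, per the FlumeUserGuide.rst change in this commit):

    import org.apache.flume.Context;
    import org.apache.flume.channel.file.FileChannel;
    import org.apache.flume.channel.file.FileChannelConfiguration;

    public class DualCheckpointExample {
      public static void main(String[] args) {
        // Paths below are hypothetical; substitute real directories.
        Context ctx = new Context();
        ctx.put(FileChannelConfiguration.CHECKPOINT_DIR, "/var/flume/checkpoint");
        ctx.put(FileChannelConfiguration.BACKUP_CHECKPOINT_DIR,
            "/var/flume/checkpoint-backup");
        ctx.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true");
        ctx.put(FileChannelConfiguration.DATA_DIRS, "/var/flume/data");

        FileChannel channel = new FileChannel();
        channel.setName("fileChannel-1");
        channel.configure(ctx); // fails fast if backup dir equals checkpoint dir
        channel.start();        // replay() restores from the backup if needed
      }
    }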
2 ...lume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
@@ -29,6 +29,8 @@
private long logWriteOrderID;
private final int capacity;
private final String name;
+ public static final String BACKUP_COMPLETE_FILENAME = "backupComplete";
+ protected Boolean slowdownBackup = false;
protected EventQueueBackingStore(int capacity, String name) {
this.capacity = capacity;
21 ...le-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
@@ -35,8 +35,14 @@ static EventQueueBackingStore get(File checkpointFile, int capacity,
String name) throws Exception {
return get(checkpointFile, capacity, name, true);
}
+
static EventQueueBackingStore get(File checkpointFile, int capacity,
String name, boolean upgrade) throws Exception {
+ return get(checkpointFile, null, capacity, name, upgrade, false);
+ }
+ static EventQueueBackingStore get(File checkpointFile,
+ File backupCheckpointDir, int capacity, String name,
+ boolean upgrade, boolean shouldBackup) throws Exception {
File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
RandomAccessFile checkpointFileHandle = null;
try {
@@ -61,17 +67,20 @@ static EventQueueBackingStore get(File checkpointFile, int capacity,
if(!checkpointFile.createNewFile()) {
throw new IOException("Cannot create " + checkpointFile);
}
- return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name);
+ return new EventQueueBackingStoreFileV3(checkpointFile,
+ capacity, name, backupCheckpointDir, shouldBackup);
}
// v3 due to meta file, version will be checked by backing store
if(metaDataExists) {
- return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name);
+ return new EventQueueBackingStoreFileV3(checkpointFile, capacity,
+ name, backupCheckpointDir, shouldBackup);
}
checkpointFileHandle = new RandomAccessFile(checkpointFile, "r");
int version = (int)checkpointFileHandle.readLong();
if(Serialization.VERSION_2 == version) {
if(upgrade) {
- return upgrade(checkpointFile, capacity, name);
+ return upgrade(checkpointFile, capacity, name, backupCheckpointDir,
+ shouldBackup);
}
return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name);
}
@@ -91,7 +100,8 @@ static EventQueueBackingStore get(File checkpointFile, int capacity,
}
private static EventQueueBackingStore upgrade(File checkpointFile,
- int capacity, String name)
+ int capacity, String name, File backupCheckpointDir,
+ boolean shouldBackup)
throws Exception {
LOG.info("Attempting upgrade of " + checkpointFile + " for " + name);
EventQueueBackingStoreFileV2 backingStoreV2 =
@@ -103,7 +113,8 @@ private static EventQueueBackingStore upgrade(File checkpointFile,
File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
EventQueueBackingStoreFileV3.upgrade(backingStoreV2, checkpointFile,
metaDataFile);
- return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name);
+ return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name,
+ backupCheckpointDir, shouldBackup);
}
}
156 ...-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -28,8 +28,14 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,11 +62,25 @@
protected final MappedByteBuffer mappedBuffer;
protected final RandomAccessFile checkpointFileHandle;
protected final File checkpointFile;
+ private final Semaphore backupCompletedSema = new Semaphore(1);
+ protected final boolean shouldBackup;
+ private final File backupDir;
+ private final ExecutorService checkpointBackUpExecutor;
protected EventQueueBackingStoreFile(int capacity, String name,
- File checkpointFile) throws IOException, BadCheckpointException {
+ File checkpointFile) throws IOException,
+ BadCheckpointException {
+ this(capacity, name, checkpointFile, null, false);
+ }
+
+ protected EventQueueBackingStoreFile(int capacity, String name,
+ File checkpointFile, File checkpointBackupDir,
+ boolean backupCheckpoint) throws IOException,
+ BadCheckpointException {
super(capacity, name);
this.checkpointFile = checkpointFile;
+ this.shouldBackup = backupCheckpoint;
+ this.backupDir = checkpointBackupDir;
checkpointFileHandle = new RandomAccessFile(checkpointFile, "rw");
long totalBytes = (capacity + HEADER_SIZE) * Serialization.SIZE_OF_LONG;
if(checkpointFileHandle.length() == 0) {
@@ -95,6 +115,13 @@ protected EventQueueBackingStoreFile(int capacity, String name,
+ " probably because the agent stopped while the channel was"
+ " checkpointing.");
}
+ if (shouldBackup) {
+ checkpointBackUpExecutor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat(
+ getName() + " - CheckpointBackUpThread").build());
+ } else {
+ checkpointBackUpExecutor = null;
+ }
}
protected long getCheckpointLogWriteOrderID() {
@@ -103,11 +130,104 @@ protected long getCheckpointLogWriteOrderID() {
protected abstract void writeCheckpointMetaData() throws IOException;
+ /**
+ * Backs up the checkpoint and its metadata files. This method is called
+ * once the checkpoint has been completely written, from a separate thread
+ * which runs in the background while the file channel continues normal
+ * operation.
+ *
+ * @param backupDirectory - the directory to which the backup files should be
+ * copied.
+ * @throws IOException - if the copy failed, or if there is not enough disk
+ * space to copy the checkpoint files over.
+ */
+ protected void backupCheckpoint(File backupDirectory) throws IOException {
+ int availablePermits = backupCompletedSema.drainPermits();
+ Preconditions.checkState(availablePermits == 0,
+ "Expected no permits to be available in the backup semaphore, " +
+ "but " + availablePermits + " permits were available.");
+ if (slowdownBackup) {
+ try {
+ TimeUnit.SECONDS.sleep(10);
+ } catch (Exception ex) {
+ Throwables.propagate(ex);
+ }
+ }
+ File backupFile = new File(backupDirectory, BACKUP_COMPLETE_FILENAME);
+ if (backupExists(backupDirectory)) {
+ if (!backupFile.delete()) {
+ throw new IOException("Error while doing backup of checkpoint. Could " +
+ "not remove" + backupFile.toString() + ".");
+ }
+ }
+ Serialization.deleteAllFiles(backupDirectory, Log.EXCLUDES);
+ File checkpointDir = checkpointFile.getParentFile();
+ File[] checkpointFiles = checkpointDir.listFiles();
+ Preconditions.checkNotNull(checkpointFiles, "Could not retrieve files " +
+ "from the checkpoint directory. Cannot complete backup of the " +
+ "checkpoint.");
+ for (File origFile : checkpointFiles) {
+ if(origFile.getName().equals(Log.FILE_LOCK)) {
+ continue;
+ }
+ Serialization.copyFile(origFile, new File(backupDirectory,
+ origFile.getName()));
+ }
+ Preconditions.checkState(!backupFile.exists(), "The backup file exists " +
+ "while it is not supposed to. Are multiple channels configured to use " +
+ "this directory: " + backupDirectory.toString() + " as backup?");
+ if (!backupFile.createNewFile()) {
+ LOG.error("Could not create backup file. Backup of checkpoint will " +
+ "not be used during replay even if checkpoint is bad.");
+ }
+ }
+
+ /**
+ * Restores the checkpoint from a backup, if the checkpoint is found to
+ * be bad.
+ * @param checkpointDir - the directory the checkpoint is restored into.
+ * @param backupDir - the directory containing the backed-up checkpoint.
+ * @return true if the previous backup completed successfully and the
+ * restore completed successfully.
+ * @throws IOException - if the restore failed due to an IOException
+ */
+ public static boolean restoreBackup(File checkpointDir, File backupDir)
+ throws IOException {
+ if (!backupExists(backupDir)) {
+ return false;
+ }
+ Serialization.deleteAllFiles(checkpointDir, Log.EXCLUDES);
+ File[] backupFiles = backupDir.listFiles();
+ if (backupFiles == null) {
+ return false;
+ } else {
+ for (File backupFile : backupFiles) {
+ String fileName = backupFile.getName();
+ if (!fileName.equals(BACKUP_COMPLETE_FILENAME) &&
+ !fileName.equals(Log.FILE_LOCK)) {
+ Serialization.copyFile(backupFile, new File(checkpointDir, fileName));
+ }
+ }
+ return true;
+ }
+ }
+
@Override
void beginCheckpoint() throws IOException {
LOG.info("Start checkpoint for " + checkpointFile +
", elements to sync = " + overwriteMap.size());
+ if (shouldBackup) {
+ int permits = backupCompletedSema.drainPermits();
+ Preconditions.checkState(permits <= 1, "Expected at most one " +
+ "permit to checkpoint, but got " + String.valueOf(permits) +
+ " permits");
+ if(permits < 1) {
+ // Force the checkpoint to not happen by throwing an exception.
+ throw new IOException("Previous backup of checkpoint files is still " +
+ "in progress. Will attempt to checkpoint only at the end of the " +
+ "next checkpoint interval. Try increasing the checkpoint interval " +
+ "if this error happens often.");
+ }
+ }
// Start checkpoint
elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE);
mappedBuffer.force();
@@ -141,8 +261,38 @@ void checkpoint() throws IOException {
// Finish checkpoint
elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE);
mappedBuffer.force();
+ if (shouldBackup) {
+ startBackupThread();
+ }
}
+ /**
+ * This method starts backing up the checkpoint in the background.
+ */
+ private void startBackupThread() {
+ Preconditions.checkNotNull(checkpointBackUpExecutor,
+ "Expected the checkpoint backup exector to be non-null, " +
+ "but it is null. Checkpoint will not be backed up.");
+ LOG.info("Attempting to back up checkpoint.");
+ checkpointBackUpExecutor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ boolean error = false;
+ try {
+ backupCheckpoint(backupDir);
+ } catch (Throwable throwable) {
+ error = true;
+ LOG.error("Backing up of checkpoint directory failed.", throwable);
+ } finally {
+ backupCompletedSema.release();
+ }
+ if (!error) {
+ LOG.info("Checkpoint backup completed.");
+ }
+ }
+ });
+ }
@Override
void close() {
@@ -242,6 +392,10 @@ protected static void allocate(File file, long totalBytes) throws IOException {
}
}
+ public static boolean backupExists(File backupDir) {
+ return new File(backupDir, BACKUP_COMPLETE_FILENAME).exists();
+ }
+
public static void main(String[] args) throws Exception {
File file = new File(args[0]);
File inflightTakesFile = new File(args[1]);
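To see how the static helpers above fit together, here is a minimal sketch of the recovery decision, assuming it lives in the org.apache.flume.channel.file package (it mirrors the BadCheckpointException handler added to Log.replay() later in this commit; the class and method names are hypothetical):

    package org.apache.flume.channel.file; // Serialization is package-private

    import java.io.File;
    import java.io.IOException;

    class RecoverySketch {
      /** Mirrors the BadCheckpointException handler in Log.replay(). */
      static boolean recoverCheckpoint(File checkpointDir, File backupDir)
          throws IOException {
        boolean restored = false;
        if (EventQueueBackingStoreFile.backupExists(backupDir)) {
          // Copies every backed-up file (minus the excludes) into checkpointDir.
          restored = EventQueueBackingStoreFile.restoreBackup(
              checkpointDir, backupDir);
        }
        if (!restored) {
          // No usable backup: wipe the checkpoint and force a full replay.
          if (!Serialization.deleteAllFiles(checkpointDir, Log.EXCLUDES)) {
            throw new IOException("Could not delete files in checkpoint " +
                "directory");
          }
        }
        return restored;
      }
    }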
17 ...ile-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
@@ -38,9 +38,15 @@
.getLogger(EventQueueBackingStoreFileV3.class);
private final File metaDataFile;
- EventQueueBackingStoreFileV3(File checkpointFile, int capacity, String name)
- throws IOException, BadCheckpointException {
- super(capacity, name, checkpointFile);
+ EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
+ String name) throws IOException, BadCheckpointException {
+ this(checkpointFile, capacity, name, null, false);
+ }
+
+ EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
+ String name, File checkpointBackupDir,
+ boolean backupCheckpoint) throws IOException, BadCheckpointException {
+ super(capacity, name, checkpointFile, checkpointBackupDir, backupCheckpoint);
Preconditions.checkArgument(capacity > 0,
"capacity must be greater than 0 " + capacity);
metaDataFile = Serialization.getMetaDataFile(checkpointFile);
@@ -89,6 +95,11 @@
}
}
} else {
+ if(backupExists(checkpointBackupDir) && shouldBackup) {
+ // If a backup exists, then throw an exception to recover checkpoint
+ throw new BadCheckpointException("The checkpoint metadata file does " +
+ "not exist, but a backup exists");
+ }
ProtosFactory.Checkpoint.Builder checkpointBuilder =
ProtosFactory.Checkpoint.newBuilder();
checkpointBuilder.setVersion(getVersion());
117 ...-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -19,22 +19,17 @@
package org.apache.flume.channel.file;
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
+import org.apache.flume.annotations.Disposable;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
-import org.apache.flume.annotations.Disposable;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.channel.file.Log.Builder;
@@ -45,8 +40,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
/**
* <p>
@@ -83,6 +82,7 @@
private long maxFileSize;
private long minimumRequiredSpace;
private File checkpointDir;
+ private File backupCheckpointDir;
private File[] dataDirs;
private Log log;
private volatile boolean open;
@@ -99,6 +99,7 @@
private KeyProvider encryptionKeyProvider;
private String encryptionActiveKey;
private String encryptionCipherProvider;
+ private boolean useDualCheckpoints;
@Override
public synchronized void setName(String name) {
@@ -109,60 +110,51 @@ public synchronized void setName(String name) {
@Override
public void configure(Context context) {
+ useDualCheckpoints = context.getBoolean(
+ FileChannelConfiguration.USE_DUAL_CHECKPOINTS,
+ FileChannelConfiguration.DEFAULT_USE_DUAL_CHECKPOINTS);
String homePath = System.getProperty("user.home").replace('\\', '/');
String strCheckpointDir =
context.getString(FileChannelConfiguration.CHECKPOINT_DIR,
homePath + "/.flume/file-channel/checkpoint");
+ String strBackupCheckpointDir = context.getString
+ (FileChannelConfiguration.BACKUP_CHECKPOINT_DIR, "").trim();
+
String[] strDataDirs = context.getString(FileChannelConfiguration.DATA_DIRS,
homePath + "/.flume/file-channel/data").split(",");
- if(checkpointDir == null) {
- checkpointDir = new File(strCheckpointDir);
- } else if(!checkpointDir.getAbsolutePath().
- equals(new File(strCheckpointDir).getAbsolutePath())) {
- LOG.warn("An attempt was made to change the checkpoint " +
- "directory after start, this is not supported.");
+ checkpointDir = new File(strCheckpointDir);
+
+ if (useDualCheckpoints) {
+ Preconditions.checkState(!strBackupCheckpointDir.isEmpty(),
+ "Dual checkpointing is enabled, but the backup directory is not set. " +
+ "Please set " + FileChannelConfiguration.BACKUP_CHECKPOINT_DIR + " " +
+ "to enable dual checkpointing");
+ backupCheckpointDir = new File(strBackupCheckpointDir);
+ /*
+ * If the backup directory is the same as the checkpoint directory,
+ * then throw an exception and force the config system to ignore this
+ * channel.
+ */
+ Preconditions.checkState(!backupCheckpointDir.equals(checkpointDir),
+ "Could not configure " + getName() + ". The checkpoint backup " +
+ "directory and the checkpoint directory are " +
+ "configured to be the same.");
}
- if(dataDirs == null) {
- dataDirs = new File[strDataDirs.length];
- for (int i = 0; i < strDataDirs.length; i++) {
- dataDirs[i] = new File(strDataDirs[i]);
- }
- } else {
- boolean changed = false;
- if(dataDirs.length != strDataDirs.length) {
- changed = true;
- } else {
- for (int i = 0; i < strDataDirs.length; i++) {
- if(!dataDirs[i].getAbsolutePath().
- equals(new File(strDataDirs[i]).getAbsolutePath())) {
- changed = true;
- break;
- }
- }
- }
- if(changed) {
- LOG.warn("An attempt was made to change the data " +
- "directories after start, this is not supported.");
- }
+
+ dataDirs = new File[strDataDirs.length];
+ for (int i = 0; i < strDataDirs.length; i++) {
+ dataDirs[i] = new File(strDataDirs[i]);
}
- int newCapacity = context.getInteger(FileChannelConfiguration.CAPACITY,
+ capacity = context.getInteger(FileChannelConfiguration.CAPACITY,
FileChannelConfiguration.DEFAULT_CAPACITY);
- if(newCapacity <= 0 && capacity == 0) {
- newCapacity = FileChannelConfiguration.DEFAULT_CAPACITY;
+ if(capacity <= 0) {
+ capacity = FileChannelConfiguration.DEFAULT_CAPACITY;
LOG.warn("Invalid capacity specified, initializing channel to "
- + "default capacity of {}", newCapacity);
- }
- if(capacity > 0 && newCapacity != capacity) {
- LOG.warn("Capacity of this channel cannot be sized on the fly due " +
- "the requirement we have enough DirectMemory for the queue and " +
- "downsizing of the queue cannot be guranteed due to the " +
- "fact there maybe more items on the queue than the new capacity.");
- } else {
- capacity = newCapacity;
+ + "default capacity of {}", capacity);
}
keepAlive =
@@ -181,8 +173,8 @@ public void configure(Context context) {
}
Preconditions.checkState(transactionCapacity <= capacity,
- "File Channel transaction capacity cannot be greater than the " +
- "capacity of the channel.");
+ "File Channel transaction capacity cannot be greater than the " +
+ "capacity of the channel.");
checkpointInterval =
context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL,
@@ -303,6 +295,8 @@ public synchronized void start() {
builder.setEncryptionKeyProvider(encryptionKeyProvider);
builder.setEncryptionKeyAlias(encryptionActiveKey);
builder.setEncryptionCipherProvider(encryptionCipherProvider);
+ builder.setUseDualCheckpoints(useDualCheckpoints);
+ builder.setBackupCheckpointDir(backupCheckpointDir);
log = builder.build();
log.replay();
open = true;
@@ -402,6 +396,23 @@ public boolean isOpen() {
}
/**
+ * Did this channel use a backup of the checkpoint to restart?
+ * @return true if the channel recovered using a backup.
+ */
+ @VisibleForTesting
+ boolean checkpointBackupRestored() {
+ if(log != null) {
+ return log.backupRestored();
+ }
+ return false;
+ }
+
+ @VisibleForTesting
+ Log getLog() {
+ return log;
+ }
+
+ /**
* Transaction backed by a file. This transaction supports either puts
* or takes but not both.
*/
@@ -462,7 +473,7 @@ protected void doPut(Event event) throws InterruptedException {
}
FlumeEventPointer ptr = log.put(transactionID, event);
Preconditions.checkState(putList.offer(ptr), "putList offer failed "
- + channelNameDescriptor);
+ + channelNameDescriptor);
queue.addWithoutCommit(ptr, transactionID);
success = true;
} catch (IOException e) {
10 ...me-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
@@ -23,6 +23,12 @@
* Directory Checkpoints will be written in
*/
public static final String CHECKPOINT_DIR = "checkpointDir";
+
+ /**
+ * The directory to which the checkpoint must be backed up
+ */
+ public static final String BACKUP_CHECKPOINT_DIR = "backupCheckpointDir";
+
/**
* Directories data files will be written in. Multiple directories
* can be specified as comma separated values. Writes will
@@ -90,4 +96,8 @@
public static final String USE_FAST_REPLAY = "use-fast-replay";
public static final boolean DEFAULT_USE_FAST_REPLAY = false;
+
+ public static final String USE_DUAL_CHECKPOINTS = "useDualCheckpoints";
+ public static final boolean DEFAULT_USE_DUAL_CHECKPOINTS = false;
+
}
18 ...nnels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -30,12 +30,7 @@
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -394,19 +389,6 @@ public void addEvent(Long transactionID, Long pointer){
* asynchronously written to disk.
*/
public void serializeAndWrite() throws Exception {
- //Check if there is a current write happening, if there is abort it.
- if (future != null) {
- try {
- future.cancel(true);
- } catch (Exception e) {
- LOG.warn("Interrupted a write to inflights "
- + "file: " + inflightEventsFile.getName()
- + " to start a new write.");
- }
- while (!future.isDone()) {
- TimeUnit.MILLISECONDS.sleep(100);
- }
- }
Collection<Long> values = inflightEvents.values();
if(!fileChannel.isOpen()){
file = new RandomAccessFile(inflightEventsFile, "rw");
183 flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -18,6 +18,23 @@
*/
package org.apache.flume.channel.file;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.channel.file.encryption.KeyProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -31,6 +48,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Executors;
@@ -42,23 +60,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flume.ChannelException;
-import org.apache.flume.Event;
-import org.apache.flume.annotations.InterfaceAudience;
-import org.apache.flume.annotations.InterfaceStability;
-import org.apache.flume.channel.file.encryption.KeyProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
/**
* Stores FlumeEvents on disk and pointers to the events in an in-memory queue.
* Once a log object is created the replay method should be called to reconcile
@@ -76,12 +77,13 @@
public static final String PREFIX = "log-";
private static final Logger LOGGER = LoggerFactory.getLogger(Log.class);
private static final int MIN_NUM_LOGS = 2;
- private static final String FILE_LOCK = "in_use.lock";
+ public static final String FILE_LOCK = "in_use.lock";
// for reader
private final Map<Integer, LogFile.RandomReader> idLogFileMap = Collections
.synchronizedMap(new HashMap<Integer, LogFile.RandomReader>());
private final AtomicInteger nextFileID = new AtomicInteger(0);
private final File checkpointDir;
+ private final File backupCheckpointDir;
private final File[] logDirs;
private final int queueCapacity;
private final AtomicReferenceArray<LogFile.Writer> logFiles;
@@ -97,6 +99,11 @@
private final Map<String, FileLock> locks;
private final ReentrantReadWriteLock checkpointLock =
new ReentrantReadWriteLock(true);
+
+ /**
+ * Set of files that should be excluded from backups and restores.
+ */
+ public static final Set<String> EXCLUDES = Sets.newHashSet(FILE_LOCK);
/**
* Shared lock
*/
@@ -115,6 +122,16 @@
private Key encryptionKey;
private final long usableSpaceRefreshInterval;
private boolean didFastReplay = false;
+ private final boolean useDualCheckpoints;
+ private volatile boolean backupRestored = false;
+
+ private int readCount;
+ private int putCount;
+ private int takeCount;
+ private int committedCount;
+ private int rollbackCount;
+
+ private final List<File> pendingDeletes = Lists.newArrayList();
static class Builder {
private long bCheckpointInterval;
@@ -134,6 +151,8 @@
private String bEncryptionKeyAlias;
private String bEncryptionCipherProvider;
private long bUsableSpaceRefreshInterval = 15L * 1000L;
+ private boolean bUseDualCheckpoints = false;
+ private File bBackupCheckpointDir = null;
Builder setUsableSpaceRefreshInterval(long usableSpaceRefreshInterval) {
bUsableSpaceRefreshInterval = usableSpaceRefreshInterval;
@@ -210,9 +229,20 @@ Builder setEncryptionCipherProvider(String encryptionCipherProvider) {
return this;
}
+ Builder setUseDualCheckpoints(boolean useDualCheckpoints) {
+ this.bUseDualCheckpoints = useDualCheckpoints;
+ return this;
+ }
+
+ Builder setBackupCheckpointDir(File backupCheckpointDir) {
+ this.bBackupCheckpointDir = backupCheckpointDir;
+ return this;
+ }
+
Log build() throws IOException {
return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
- bLogWriteTimeout, bCheckpointWriteTimeout, bCheckpointDir, bName,
+ bLogWriteTimeout, bCheckpointWriteTimeout, bUseDualCheckpoints,
+ bCheckpointDir, bBackupCheckpointDir, bName,
useLogReplayV1, useFastReplay, bMinimumRequiredSpace,
bEncryptionKeyProvider, bEncryptionKeyAlias,
bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
@@ -221,23 +251,32 @@ Log build() throws IOException {
}
private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
- int logWriteTimeout, int checkpointWriteTimeout, File checkpointDir,
+ int logWriteTimeout, int checkpointWriteTimeout,
+ boolean useDualCheckpoints, File checkpointDir, File backupCheckpointDir,
String name, boolean useLogReplayV1, boolean useFastReplay,
long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
@Nullable String encryptionKeyAlias,
@Nullable String encryptionCipherProvider,
long usableSpaceRefreshInterval, File... logDirs)
throws IOException {
Preconditions.checkArgument(checkpointInterval > 0,
- "checkpointInterval <= 0");
+ "checkpointInterval <= 0");
Preconditions.checkArgument(queueCapacity > 0, "queueCapacity <= 0");
Preconditions.checkArgument(maxFileSize > 0, "maxFileSize <= 0");
Preconditions.checkNotNull(checkpointDir, "checkpointDir");
Preconditions.checkArgument(usableSpaceRefreshInterval > 0,
"usableSpaceRefreshInterval <= 0");
Preconditions.checkArgument(
- checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir "
- + checkpointDir + " could not be created");
+ checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir "
+ + checkpointDir + " could not be created");
+ if (useDualCheckpoints) {
+ Preconditions.checkNotNull(backupCheckpointDir, "backupCheckpointDir is" +
+ " null while dual checkpointing is enabled.");
+ Preconditions.checkArgument(
+ backupCheckpointDir.isDirectory() || backupCheckpointDir.mkdirs(),
+ "Backup CheckpointDir " + backupCheckpointDir +
+ " could not be created");
+ }
Preconditions.checkNotNull(logDirs, "logDirs");
Preconditions.checkArgument(logDirs.length > 0, "logDirs empty");
Preconditions.checkArgument(name != null && !name.trim().isEmpty(),
@@ -255,6 +294,9 @@ private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
locks = Maps.newHashMap();
try {
lock(checkpointDir);
+ if(useDualCheckpoints) {
+ lock(backupCheckpointDir);
+ }
for (File logDir : logDirs) {
lock(logDir);
}
@@ -288,13 +330,15 @@ private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
this.checkpointInterval = Math.max(checkpointInterval, 1000);
this.maxFileSize = maxFileSize;
this.queueCapacity = queueCapacity;
+ this.useDualCheckpoints = useDualCheckpoints;
this.checkpointDir = checkpointDir;
+ this.backupCheckpointDir = backupCheckpointDir;
this.logDirs = logDirs;
this.logWriteTimeout = logWriteTimeout;
this.checkpointWriteTimeout = checkpointWriteTimeout;
logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
workerExecutor = Executors.newSingleThreadScheduledExecutor(new
- ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
+ ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
.build());
workerExecutor.scheduleWithFixedDelay(new BackgroundWorker(this),
this.checkpointInterval, this.checkpointInterval,
@@ -365,8 +409,9 @@ void replay() throws IOException {
try {
backingStore =
- EventQueueBackingStoreFactory.get(checkpointFile, queueCapacity,
- channelNameDescriptor);
+ EventQueueBackingStoreFactory.get(checkpointFile,
+ backupCheckpointDir, queueCapacity, channelNameDescriptor,
+ true, this.useDualCheckpoints);
queue = new FlumeEventQueue(backingStore, inflightTakesFile,
inflightPutsFile);
LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified())
@@ -383,14 +428,26 @@ void replay() throws IOException {
*/
doReplay(queue, dataFiles, encryptionKeyProvider, shouldFastReplay);
} catch (BadCheckpointException ex) {
- LOGGER.warn("Checkpoint may not have completed successfully. "
- + "Forcing full replay, this may take a while.", ex);
- if(!Serialization.deleteAllFiles(checkpointDir)) {
- throw new IOException("Could not delete files in checkpoint " +
- "directory to recover from a corrupt or incomplete checkpoint");
+ backupRestored = false;
+ if (useDualCheckpoints) {
+ LOGGER.warn("Checkpoint may not have completed successfully. "
+ + "Restoring checkpoint and starting up.", ex);
+ if (EventQueueBackingStoreFile.backupExists(backupCheckpointDir)) {
+ backupRestored = EventQueueBackingStoreFile.restoreBackup(
+ checkpointDir, backupCheckpointDir);
+ }
+ }
+ if (!backupRestored) {
+ LOGGER.warn("Checkpoint may not have completed successfully. "
+ + "Forcing full replay, this may take a while.", ex);
+ if (!Serialization.deleteAllFiles(checkpointDir, EXCLUDES)) {
+ throw new IOException("Could not delete files in checkpoint " +
+ "directory to recover from a corrupt or incomplete checkpoint");
+ }
}
backingStore = EventQueueBackingStoreFactory.get(checkpointFile,
- queueCapacity, channelNameDescriptor);
+ backupCheckpointDir,
+ queueCapacity, channelNameDescriptor, true, useDualCheckpoints);
queue = new FlumeEventQueue(backingStore, inflightTakesFile,
inflightPutsFile);
// If the checkpoint was deleted due to BadCheckpointException, then
@@ -441,13 +498,48 @@ private void doReplay(FlumeEventQueue queue, List<File> dataFiles,
LOGGER.info("Replaying logs with v2 replay logic");
replayHandler.replayLog(dataFiles);
}
+ readCount = replayHandler.getReadCount();
+ putCount = replayHandler.getPutCount();
+ takeCount = replayHandler.getTakeCount();
+ rollbackCount = replayHandler.getRollbackCount();
+ committedCount = replayHandler.getCommitCount();
}
}
@VisibleForTesting
boolean didFastReplay() {
return didFastReplay;
}
+ @VisibleForTesting
+ public int getReadCount() {
+ return readCount;
+ }
+ @VisibleForTesting
+ public int getPutCount() {
+ return putCount;
+ }
+
+ @VisibleForTesting
+ public int getTakeCount() {
+ return takeCount;
+ }
+ @VisibleForTesting
+ public int getCommittedCount() {
+ return committedCount;
+ }
+ @VisibleForTesting
+ public int getRollbackCount() {
+ return rollbackCount;
+ }
+
+ /**
+ * Was a checkpoint backup used to replay?
+ * @return true if a checkpoint backup was used to replay.
+ */
+ @VisibleForTesting
+ boolean backupRestored() {
+ return backupRestored;
+ }
int getNextFileID() {
Preconditions.checkState(open, "Log is closed");
@@ -704,6 +796,13 @@ void close() throws IOException{
} catch (IOException ex) {
LOGGER.warn("Error unlocking " + checkpointDir, ex);
}
+ if (useDualCheckpoints) {
+ try {
+ unlock(backupCheckpointDir);
+ } catch (IOException ex) {
+ LOGGER.warn("Error unlocking " + checkpointDir, ex);
+ }
+ }
for (File logDir : logDirs) {
try {
unlock(logDir);
@@ -942,6 +1041,17 @@ private Boolean writeCheckpoint(Boolean force) throws Exception {
private void removeOldLogs(SortedSet<Integer> fileIDs) {
Preconditions.checkState(open, "Log is closed");
+ // To maintain a single code path for deletes, whether or not checkpoint
+ // backup is enabled, we track the files which can be deleted after the
+ // current checkpoint (since the checkpoint which just got backed up still
+ // needs these files) and delete them only after the next one (since the
+ // current checkpoint will have become the backup by then, and these
+ // files are no longer needed).
+ for(File fileToDelete : pendingDeletes) {
+ LOGGER.info("Removing old file: " + fileToDelete);
+ FileUtils.deleteQuietly(fileToDelete);
+ }
+ pendingDeletes.clear();
// we will find the smallest fileID currently in use and
// won't delete any files with an id larger than the min
int minFileID = fileIDs.first();
@@ -960,14 +1070,9 @@ private void removeOldLogs(SortedSet<Integer> fileIDs) {
if(reader != null) {
reader.close();
}
- LOGGER.info("Removing old log " + logFile +
- ", result = " + logFile.delete() + ", minFileID "
- + minFileID);
File metaDataFile = Serialization.getMetaDataFile(logFile);
- if(metaDataFile.exists() && !metaDataFile.delete()) {
- LOGGER.warn("Could not remove metadata file "
- + metaDataFile + " for " + logFile);
- }
+ pendingDeletes.add(logFile);
+ pendingDeletes.add(metaDataFile);
}
}
}
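The deferred-delete bookkeeping in removeOldLogs() above is easiest to see in isolation. A distilled sketch of the pattern, not the actual Log internals (which also close readers and honor the minimum in-use file ID):

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.commons.io.FileUtils;

    class DeferredDeleteSketch {
      private final List<File> pendingDeletes = new ArrayList<File>();

      /** Called once per checkpoint with the files that just became unreferenced. */
      void onCheckpoint(List<File> newlyUnreferenced) {
        // Files queued at checkpoint N are deleted at checkpoint N+1, because
        // only then does the backup no longer reference them.
        for (File f : pendingDeletes) {
          FileUtils.deleteQuietly(f);
        }
        pendingDeletes.clear();
        pendingDeletes.addAll(newlyUnreferenced);
      }
    }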
60 ...e-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
@@ -18,6 +18,17 @@
*/
package org.apache.flume.channel.file;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.flume.channel.file.encryption.CipherProvider;
+import org.apache.flume.channel.file.encryption.KeyProvider;
+import org.apache.flume.tools.DirectMemoryUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
@@ -29,19 +40,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nullable;
-
-import org.apache.flume.channel.file.encryption.CipherProvider;
-import org.apache.flume.channel.file.encryption.KeyProvider;
-import org.apache.flume.tools.DirectMemoryUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
abstract class LogFile {
private static final Logger LOG = LoggerFactory
@@ -420,6 +418,8 @@ private static void close(RandomAccessFile fileHandle, File file) {
private int logFileID;
private long lastCheckpointPosition;
private long lastCheckpointWriteOrderID;
+ private long backupCheckpointPosition;
+ private long backupCheckpointWriteOrderID;
/**
* Construct a Sequential Log Reader object
@@ -444,6 +444,14 @@ protected void setLastCheckpointPosition(long lastCheckpointPosition) {
protected void setLastCheckpointWriteOrderID(long lastCheckpointWriteOrderID) {
this.lastCheckpointWriteOrderID = lastCheckpointWriteOrderID;
}
+ protected void setPreviousCheckpointPosition(
+ long backupCheckpointPosition) {
+ this.backupCheckpointPosition = backupCheckpointPosition;
+ }
+ protected void setPreviousCheckpointWriteOrderID(
+ long backupCheckpointWriteOrderID) {
+ this.backupCheckpointWriteOrderID = backupCheckpointWriteOrderID;
+ }
protected void setLogFileID(int logFileID) {
this.logFileID = logFileID;
Preconditions.checkArgument(logFileID >= 0, "LogFileID is not positive: "
@@ -459,18 +467,24 @@ protected RandomAccessFile getFileHandle() {
int getLogFileID() {
return logFileID;
}
+
void skipToLastCheckpointPosition(long checkpointWriteOrderID)
- throws IOException {
- if (lastCheckpointPosition > 0L
- && lastCheckpointWriteOrderID <= checkpointWriteOrderID) {
- LOG.info("fast-forward to checkpoint position: "
- + lastCheckpointPosition);
- fileChannel.position(lastCheckpointPosition);
+ throws IOException {
+ if (lastCheckpointPosition > 0L) {
+ long position = 0;
+ if (lastCheckpointWriteOrderID <= checkpointWriteOrderID) {
+ position = lastCheckpointPosition;
+ } else if (backupCheckpointWriteOrderID <= checkpointWriteOrderID
+ && backupCheckpointPosition > 0) {
+ position = backupCheckpointPosition;
+ }
+ fileChannel.position(position);
+ LOG.info("fast-forward to checkpoint position: " + position);
} else {
- LOG.warn("Checkpoint for file(" + file.getAbsolutePath() + ") "
- + "is: " + lastCheckpointWriteOrderID + ", which is beyond the "
- + "requested checkpoint time: " + checkpointWriteOrderID
- + " and position " + lastCheckpointPosition);
+ LOG.info("Checkpoint for file(" + file.getAbsolutePath() + ") "
+ + "is: " + lastCheckpointWriteOrderID + ", which is beyond the "
+ + "requested checkpoint time: " + checkpointWriteOrderID
+ + " and position " + lastCheckpointPosition);
}
}
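The fast-forward change above boils down to a pure position-selection rule. A distilled sketch of that rule, with hypothetical names (the real method also seeks the file channel and logs the outcome):

    class ReplayPositionSketch {
      // Distilled from SequentialReader.skipToLastCheckpointPosition().
      static long resolveReplayPosition(long lastPos, long lastWriteOrderID,
          long backupPos, long backupWriteOrderID, long requestedWriteOrderID) {
        if (lastPos <= 0L) {
          return 0L; // no checkpoint recorded for this file: replay from the start
        }
        if (lastWriteOrderID <= requestedWriteOrderID) {
          return lastPos; // the current checkpoint is usable
        }
        if (backupPos > 0L && backupWriteOrderID <= requestedWriteOrderID) {
          return backupPos; // fall back to the backup checkpoint's position
        }
        return 0L; // neither is usable: replay the whole file
      }
    }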
41 ...ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -18,6 +18,17 @@
*/
package org.apache.flume.channel.file;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
+import org.apache.flume.channel.file.encryption.CipherProvider;
+import org.apache.flume.channel.file.encryption.CipherProviderFactory;
+import org.apache.flume.channel.file.encryption.KeyProvider;
+import org.apache.flume.channel.file.proto.ProtosFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
@@ -28,19 +39,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
-import javax.annotation.Nullable;
-
-import org.apache.flume.channel.file.proto.ProtosFactory;
-import org.apache.flume.channel.file.encryption.CipherProvider;
-import org.apache.flume.channel.file.encryption.CipherProviderFactory;
-import org.apache.flume.channel.file.encryption.KeyProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.GeneratedMessage;
-
/**
* Represents a single data file on disk. Has methods to write,
* read sequentially (replay), and read randomly (channel takes).
@@ -81,9 +79,15 @@ void markCheckpoint(long currentPosition, long logWriteOrderID)
ProtosFactory.LogFileMetaData.newBuilder(logFileMetaData);
metaDataBuilder.setCheckpointPosition(currentPosition);
metaDataBuilder.setCheckpointWriteOrderID(logWriteOrderID);
+ /*
+ * Set the previous checkpoint position and write order id so that it
+ * is possible to recover from a backup of the checkpoint.
+ */
+ metaDataBuilder.setBackupCheckpointPosition(logFileMetaData
+ .getCheckpointPosition());
+ metaDataBuilder.setBackupCheckpointWriteOrderID(logFileMetaData
+ .getCheckpointWriteOrderID());
logFileMetaData = metaDataBuilder.build();
- LOGGER.info("Updating " + metaDataFile.getName() + " currentPosition = "
- + currentPosition + ", logWriteOrderID = " + logWriteOrderID);
writeDelimitedTo(logFileMetaData, metaDataFile);
}
}
@@ -101,7 +105,7 @@ protected MetaDataReader(File logFile, int logFileID) throws IOException {
FileInputStream inputStream = new FileInputStream(metaDataFile);
try {
ProtosFactory.LogFileMetaData metaData = Preconditions.checkNotNull(
- ProtosFactory.LogFileMetaData.
+ ProtosFactory.LogFileMetaData.
parseDelimitedFrom(inputStream), "Metadata cannot be null");
if (metaData.getLogFileID() != logFileID) {
throw new IOException("The file id of log file: "
@@ -193,6 +197,8 @@ public static void writeDelimitedTo(GeneratedMessage msg, File file)
metaDataBuilder.setLogFileID(logFileID);
metaDataBuilder.setCheckpointPosition(0L);
metaDataBuilder.setCheckpointWriteOrderID(0L);
+ metaDataBuilder.setBackupCheckpointPosition(0L);
+ metaDataBuilder.setBackupCheckpointWriteOrderID(0L);
File metaDataFile = Serialization.getMetaDataFile(file);
writeDelimitedTo(metaDataBuilder.build(), metaDataFile);
}
@@ -322,6 +328,9 @@ protected TransactionEventRecord doGet(RandomAccessFile fileHandle)
setLogFileID(metaData.getLogFileID());
setLastCheckpointPosition(metaData.getCheckpointPosition());
setLastCheckpointWriteOrderID(metaData.getCheckpointWriteOrderID());
+ setPreviousCheckpointPosition(metaData.getBackupCheckpointPosition());
+ setPreviousCheckpointWriteOrderID(
+ metaData.getBackupCheckpointWriteOrderID());
} finally {
try {
inputStream.close();
69 ...hannels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
@@ -18,6 +18,19 @@
*/
package org.apache.flume.channel.file;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import org.apache.commons.collections.MultiMap;
+import org.apache.commons.collections.map.MultiValueMap;
+import org.apache.flume.channel.file.encryption.KeyProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
@@ -28,20 +41,6 @@
import java.util.PriorityQueue;
import java.util.Set;
-import javax.annotation.Nullable;
-
-import org.apache.commons.collections.MultiMap;
-import org.apache.commons.collections.map.MultiValueMap;
-import org.apache.flume.channel.file.encryption.KeyProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
-
/**
* Processes a set of data logs, replaying said logs into the queue.
*/
@@ -69,6 +68,33 @@
* finding the put and commit in logdir2.
*/
private final List<Long> pendingTakes;
+ int readCount = 0;
+ int putCount = 0;
+ int takeCount = 0;
+ int rollbackCount = 0;
+ int commitCount = 0;
+ int skipCount = 0;
+
+ @VisibleForTesting
+ public int getReadCount() {
+ return readCount;
+ }
+ @VisibleForTesting
+ public int getPutCount() {
+ return putCount;
+ }
+ @VisibleForTesting
+ public int getTakeCount() {
+ return takeCount;
+ }
+ @VisibleForTesting
+ public int getCommitCount() {
+ return commitCount;
+ }
+ @VisibleForTesting
+ public int getRollbackCount() {
+ return rollbackCount;
+ }
ReplayHandler(FlumeEventQueue queue,
@Nullable KeyProvider encryptionKeyProvider) {
@@ -110,12 +136,7 @@ void replayLogv1(List<File> logs) throws Exception {
// for puts the fileId is the fileID of the file they exist in
// for takes the fileId and offset are pointers to a put
int fileId = reader.getLogFileID();
- int readCount = 0;
- int putCount = 0;
- int takeCount = 0;
- int rollbackCount = 0;
- int commitCount = 0;
- int skipCount = 0;
+
while ((entry = reader.next()) != null) {
int offset = entry.getOffset();
TransactionEventRecord record = entry.getEvent();
@@ -160,7 +181,7 @@ void replayLogv1(List<File> logs) throws Exception {
}
} else {
Preconditions.checkArgument(false, "Unknown record type: "
- + Integer.toHexString(type));
+ + Integer.toHexString(type));
}
} else {
@@ -255,12 +276,6 @@ void replayLog(List<File> logs) throws Exception {
}
LogRecord entry = null;
FlumeEventPointer ptr = null;
- int readCount = 0;
- int putCount = 0;
- int takeCount = 0;
- int rollbackCount = 0;
- int commitCount = 0;
- int skipCount = 0;
while ((entry = next()) != null) {
// for puts the fileId is the fileID of the file they exist in
// for takes the fileId and offset are pointers to a put
105 ...hannels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
@@ -18,11 +18,20 @@
*/
package org.apache.flume.channel.file;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+import java.io.BufferedInputStream;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Collections;
+import java.util.Set;
class Serialization {
private Serialization() {}
@@ -38,6 +47,9 @@ private Serialization() {}
static final String METADATA_TMP_FILENAME = ".tmp";
static final String OLD_METADATA_FILENAME = METADATA_FILENAME + ".old";
+ // 64 K buffer to copy files.
+ private static final int FILE_COPY_BUFFER_SIZE = 64 * 1024;
+
public static final Logger LOG = LoggerFactory.getLogger(Serialization.class);
static File getMetaDataTempFile(File metaDataFile) {
@@ -60,20 +72,39 @@ static File getOldMetaDataFile(File file) {
/**
* Deletes all files in the given directory.
* @param checkpointDir - The directory whose files are to be deleted
+ * @param excludes - Names of files which should not be deleted from this
+ * directory.
* @return - true if all files were successfully deleted, false otherwise.
*/
- static boolean deleteAllFiles(File checkpointDir) {
+ static boolean deleteAllFiles(File checkpointDir,
+ @Nullable Set<String> excludes) {
if (!checkpointDir.isDirectory()) {
return false;
}
- StringBuilder builder = new StringBuilder("Deleted the following files from"
- + " the checkpoint directory: ");
+
File[] files = checkpointDir.listFiles();
+ if(files == null) {
+ return false;
+ }
+ StringBuilder builder;
+ if (files.length == 0) {
+ return true;
+ } else {
+ builder = new StringBuilder("Deleted the following files: ");
+ }
+ if(excludes == null) {
+ excludes = Collections.emptySet();
+ }
for (File file : files) {
+ if(excludes.contains(file.getName())) {
+ LOG.info("Skipping " + file.getName() + " because it is in excludes " +
+ "set");
+ continue;
+ }
if (!FileUtils.deleteQuietly(file)) {
LOG.info(builder.toString());
LOG.error("Error while attempting to delete: " +
- file.getName());
+ file.getAbsolutePath());
return false;
}
builder.append(", ").append(file.getName());
@@ -82,4 +113,70 @@ static boolean deleteAllFiles(File checkpointDir) {
LOG.info(builder.toString());
return true;
}
+
+ /**
+ * Copy a file using a 64K buffer. This method copies the file and then
+ * fsyncs it to disk.
+ * @param from File to copy - this file should exist
+ * @param to Destination file - this file should not exist
+ * @return true if the copy was successful
+ */
+ static boolean copyFile(File from, File to) throws IOException {
+ Preconditions.checkNotNull(from, "Source file is null, file copy failed.");
+ Preconditions.checkNotNull(to, "Destination file is null, " +
+ "file copy failed.");
+ Preconditions.checkState(from.exists(), "Source file: " + from.toString() +
+ " does not exist.");
+ Preconditions.checkState(!to.exists(), "Destination file: "
+ + to.toString() + " unexpectedly exists.");
+
+ BufferedInputStream in = null;
+ RandomAccessFile out = null; //use a RandomAccessFile for easy fsync
+ try {
+ in = new BufferedInputStream(new FileInputStream(from));
+ out = new RandomAccessFile(to, "rw");
+ byte[] buf = new byte[FILE_COPY_BUFFER_SIZE];
+ int total = 0;
+ while(true) {
+ int read = in.read(buf);
+ if (read == -1) {
+ break;
+ }
+ out.write(buf, 0, read);
+ total += read;
+ }
+ out.getFD().sync();
+ Preconditions.checkState(total == from.length(),
+ "The size of the origin file and destination file are not equal.");
+ return true;
+ } catch (Exception ex) {
+ LOG.error("Error while attempting to copy " + from.toString() + " to "
+ + to.toString() + ".", ex);
+ Throwables.propagate(ex);
+ } finally {
+ Throwable th = null;
+ try {
+ if (in != null) {
+ in.close();
+ }
+ } catch (Throwable ex) {
+ LOG.error("Error while closing input file.", ex);
+ th = ex;
+ }
+ try {
+ if (out != null) {
+ out.close();
+ }
+ } catch (IOException ex) {
+ LOG.error("Error while closing output file.", ex);
+ Throwables.propagate(ex);
+ }
+ if (th != null) {
+ Throwables.propagate(th);
+ }
+ }
+ // Should never reach here.
+ throw new IOException("Copying file: " + from.toString() + " to: " + to
+ .toString() + " may have failed.");
+ }
}
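Note that copyFile() refuses to overwrite an existing destination, which is why backupCheckpoint() clears the backup directory before copying. A minimal sketch of a single-file backup step under that constraint (the helper name is hypothetical, and it must live in the same package because copyFile() is package-private):

    static void backupOneFile(File src, File dst) throws IOException {
      if (dst.exists() && !dst.delete()) {
        // copyFile() checks that the destination does not already exist.
        throw new IOException("Could not remove stale backup copy " + dst);
      }
      Serialization.copyFile(src, dst); // 64K buffered copy followed by an fsync
    }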
146 ...s/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
@@ -1286,6 +1286,14 @@ public Builder clearCount() {
boolean hasEncryption();
org.apache.flume.channel.file.proto.ProtosFactory.LogFileEncryption getEncryption();
org.apache.flume.channel.file.proto.ProtosFactory.LogFileEncryptionOrBuilder getEncryptionOrBuilder();
+
+ // optional sfixed64 backupCheckpointPosition = 6;
+ boolean hasBackupCheckpointPosition();
+ long getBackupCheckpointPosition();
+
+ // optional sfixed64 backupCheckpointWriteOrderID = 7;
+ boolean hasBackupCheckpointWriteOrderID();
+ long getBackupCheckpointWriteOrderID();
}
public static final class LogFileMetaData extends
com.google.protobuf.GeneratedMessage
@@ -1369,12 +1377,34 @@ public boolean hasEncryption() {
return encryption_;
}
+ // optional sfixed64 backupCheckpointPosition = 6;
+ public static final int BACKUPCHECKPOINTPOSITION_FIELD_NUMBER = 6;
+ private long backupCheckpointPosition_;
+ public boolean hasBackupCheckpointPosition() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public long getBackupCheckpointPosition() {
+ return backupCheckpointPosition_;
+ }
+
+ // optional sfixed64 backupCheckpointWriteOrderID = 7;
+ public static final int BACKUPCHECKPOINTWRITEORDERID_FIELD_NUMBER = 7;
+ private long backupCheckpointWriteOrderID_;
+ public boolean hasBackupCheckpointWriteOrderID() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ public long getBackupCheckpointWriteOrderID() {
+ return backupCheckpointWriteOrderID_;
+ }
+
private void initFields() {
version_ = 0;
logFileID_ = 0;
checkpointPosition_ = 0L;
checkpointWriteOrderID_ = 0L;
encryption_ = org.apache.flume.channel.file.proto.ProtosFactory.LogFileEncryption.getDefaultInstance();
+ backupCheckpointPosition_ = 0L;
+ backupCheckpointWriteOrderID_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -1425,6 +1455,12 @@ public void writeTo(com.google.protobuf.CodedOutputStream output)
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeMessage(5, encryption_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeSFixed64(6, backupCheckpointPosition_);
+ }
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ output.writeSFixed64(7, backupCheckpointWriteOrderID_);
+ }
getUnknownFields().writeTo(output);
}
@@ -1454,6 +1490,14 @@ public int getSerializedSize() {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(5, encryption_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeSFixed64Size(6, backupCheckpointPosition_);
+ }
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeSFixed64Size(7, backupCheckpointWriteOrderID_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -1593,6 +1637,10 @@ public Builder clear() {
encryptionBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000010);
+ backupCheckpointPosition_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000020);
+ backupCheckpointWriteOrderID_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -1655,6 +1703,14 @@ public Builder clone() {
} else {
result.encryption_ = encryptionBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.backupCheckpointPosition_ = backupCheckpointPosition_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000040;
+ }
+ result.backupCheckpointWriteOrderID_ = backupCheckpointWriteOrderID_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -1686,6 +1742,12 @@ public Builder mergeFrom(org.apache.flume.channel.file.proto.ProtosFactory.LogFi
if (other.hasEncryption()) {
mergeEncryption(other.getEncryption());
}
+ if (other.hasBackupCheckpointPosition()) {
+ setBackupCheckpointPosition(other.getBackupCheckpointPosition());
+ }
+ if (other.hasBackupCheckpointWriteOrderID()) {
+ setBackupCheckpointWriteOrderID(other.getBackupCheckpointWriteOrderID());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -1768,6 +1830,16 @@ public Builder mergeFrom(
setEncryption(subBuilder.buildPartial());
break;
}
+ case 49: {
+ bitField0_ |= 0x00000020;
+ backupCheckpointPosition_ = input.readSFixed64();
+ break;
+ }
+ case 57: {
+ bitField0_ |= 0x00000040;
+ backupCheckpointWriteOrderID_ = input.readSFixed64();
+ break;
+ }
}
}
}
@@ -1948,6 +2020,48 @@ public Builder clearEncryption() {
return encryptionBuilder_;
}
+ // optional sfixed64 backupCheckpointPosition = 6;
+ private long backupCheckpointPosition_ ;
+ public boolean hasBackupCheckpointPosition() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public long getBackupCheckpointPosition() {
+ return backupCheckpointPosition_;
+ }
+ public Builder setBackupCheckpointPosition(long value) {
+ bitField0_ |= 0x00000020;
+ backupCheckpointPosition_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearBackupCheckpointPosition() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ backupCheckpointPosition_ = 0L;
+ onChanged();
+ return this;
+ }
+
+ // optional sfixed64 backupCheckpointWriteOrderID = 7;
+ private long backupCheckpointWriteOrderID_ ;
+ public boolean hasBackupCheckpointWriteOrderID() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ public long getBackupCheckpointWriteOrderID() {
+ return backupCheckpointWriteOrderID_;
+ }
+ public Builder setBackupCheckpointWriteOrderID(long value) {
+ bitField0_ |= 0x00000040;
+ backupCheckpointWriteOrderID_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearBackupCheckpointWriteOrderID() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ backupCheckpointWriteOrderID_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:LogFileMetaData)
}
@@ -5921,23 +6035,25 @@ void setValue(com.google.protobuf.ByteString value) {
"sion\030\001 \002(\017\022\024\n\014writeOrderID\030\002 \002(\020\022\021\n\tqueu" +
"eSize\030\003 \002(\017\022\021\n\tqueueHead\030\004 \002(\017\022\036\n\nactive" +
"Logs\030\005 \003(\0132\n.ActiveLog\"-\n\tActiveLog\022\021\n\tl" +
- "ogFileID\030\001 \002(\017\022\r\n\005count\030\002 \002(\017\"\231\001\n\017LogFil" +
+ "ogFileID\030\001 \002(\017\022\r\n\005count\030\002 \002(\017\"\341\001\n\017LogFil" +
"eMetaData\022\017\n\007version\030\001 \002(\017\022\021\n\tlogFileID\030" +
"\002 \002(\017\022\032\n\022checkpointPosition\030\003 \002(\020\022\036\n\026che" +
"ckpointWriteOrderID\030\004 \002(\020\022&\n\nencryption\030" +
- "\005 \001(\0132\022.LogFileEncryption\"Q\n\021LogFileEncr" +
- "yption\022\026\n\016cipherProvider\030\001 \002(\t\022\020\n\010keyAli",
- "as\030\002 \002(\t\022\022\n\nparameters\030\003 \001(\014\"S\n\026Transact" +
- "ionEventHeader\022\014\n\004type\030\001 \002(\017\022\025\n\rtransact" +
- "ionID\030\002 \002(\020\022\024\n\014writeOrderID\030\003 \002(\020\"!\n\003Put" +
- "\022\032\n\005event\030\001 \002(\0132\013.FlumeEvent\"&\n\004Take\022\016\n\006" +
- "fileID\030\001 \002(\017\022\016\n\006offset\030\002 \002(\017\"\n\n\010Rollback" +
- "\"\026\n\006Commit\022\014\n\004type\030\001 \002(\017\"\030\n\026TransactionE" +
- "ventFooter\">\n\nFlumeEvent\022\"\n\007headers\030\001 \003(" +
- "\0132\021.FlumeEventHeader\022\014\n\004body\030\002 \002(\014\".\n\020Fl" +
- "umeEventHeader\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002" +
- "(\tB4\n#org.apache.flume.channel.file.prot",
- "oB\rProtosFactory"
+ "\005 \001(\0132\022.LogFileEncryption\022 \n\030backupCheck" +
+ "pointPosition\030\006 \001(\020\022$\n\034backupCheckpointW",
+ "riteOrderID\030\007 \001(\020\"Q\n\021LogFileEncryption\022\026" +
+ "\n\016cipherProvider\030\001 \002(\t\022\020\n\010keyAlias\030\002 \002(\t" +
+ "\022\022\n\nparameters\030\003 \001(\014\"S\n\026TransactionEvent" +
+ "Header\022\014\n\004type\030\001 \002(\017\022\025\n\rtransactionID\030\002 " +
+ "\002(\020\022\024\n\014writeOrderID\030\003 \002(\020\"!\n\003Put\022\032\n\005even" +
+ "t\030\001 \002(\0132\013.FlumeEvent\"&\n\004Take\022\016\n\006fileID\030\001" +
+ " \002(\017\022\016\n\006offset\030\002 \002(\017\"\n\n\010Rollback\"\026\n\006Comm" +
+ "it\022\014\n\004type\030\001 \002(\017\"\030\n\026TransactionEventFoot" +
+ "er\">\n\nFlumeEvent\022\"\n\007headers\030\001 \003(\0132\021.Flum" +
+ "eEventHeader\022\014\n\004body\030\002 \002(\014\".\n\020FlumeEvent",
+ "Header\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\tB4\n#or" +
+ "g.apache.flume.channel.file.protoB\rProto" +
+ "sFactory"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
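Editor's note: the only change to the embedded serialized descriptor is the LogFileMetaData entry, whose varint length prefix grows from \231\001 (153) to \341\001 (225). A quick check that the 72 extra bytes are exactly the two new FieldDescriptorProto entries (34 and 38 bytes serialized):

    int oldLen = (0x99 & 0x7F) | (0x01 << 7);  // \231\001 decodes to 153
    int newLen = (0xE1 & 0x7F) | (0x01 << 7);  // \341\001 decodes to 225
    // 225 - 153 == 72 == 34 + 38, the serialized descriptors for
    // backupCheckpointPosition and backupCheckpointWriteOrderID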
@@ -5965,7 +6081,7 @@ void setValue(com.google.protobuf.ByteString value) {
internal_static_LogFileMetaData_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_LogFileMetaData_descriptor,
- new java.lang.String[] { "Version", "LogFileID", "CheckpointPosition", "CheckpointWriteOrderID", "Encryption", },
+ new java.lang.String[] { "Version", "LogFileID", "CheckpointPosition", "CheckpointWriteOrderID", "Encryption", "BackupCheckpointPosition", "BackupCheckpointWriteOrderID", },
org.apache.flume.channel.file.proto.ProtosFactory.LogFileMetaData.class,
org.apache.flume.channel.file.proto.ProtosFactory.LogFileMetaData.Builder.class);
internal_static_LogFileEncryption_descriptor =
View
2 flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
@@ -38,6 +38,8 @@ message LogFileMetaData {
required sfixed64 checkpointPosition = 3;
required sfixed64 checkpointWriteOrderID = 4;
optional LogFileEncryption encryption = 5;
+ optional sfixed64 backupCheckpointPosition = 6;
+ optional sfixed64 backupCheckpointWriteOrderID = 7;
}
message LogFileEncryption {
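Editor's note: both new fields are optional, so metadata files written before this change still parse; readers should gate on the has* accessors rather than rely on the 0 default, since 0 is also a legal position. A hedged reader-side sketch (variable names are illustrative):

    if (metaData.hasBackupCheckpointPosition()
        && metaData.hasBackupCheckpointWriteOrderID()) {
      long backupPosition = metaData.getBackupCheckpointPosition();
      long backupWriteOrderID = metaData.getBackupCheckpointWriteOrderID();
      // fall back to the backup checkpoint during replay
    }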
View
7 ...s/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
@@ -37,12 +37,15 @@
protected File checkpointDir;
protected File[] dataDirs;
protected String dataDir;
+ protected File backupDir;
@Before
public void setup() throws Exception {
baseDir = Files.createTempDir();
checkpointDir = new File(baseDir, "chkpt");
+ backupDir = new File(baseDir, "backup");
Assert.assertTrue(checkpointDir.mkdirs() || checkpointDir.isDirectory());
+ Assert.assertTrue(backupDir.mkdirs() || backupDir.isDirectory());
dataDirs = new File[3];
dataDir = "";
for (int i = 0; i < dataDirs.length; i++) {
@@ -68,7 +71,7 @@ protected Context createContext() {
protected Context createContext(Map<String, String> overrides) {
return TestUtils.createFileChannelContext(checkpointDir.getAbsolutePath(),
- dataDir, overrides);
+ dataDir, backupDir.getAbsolutePath(), overrides);
}
protected FileChannel createFileChannel() {
@@ -77,6 +80,6 @@ protected FileChannel createFileChannel() {
protected FileChannel createFileChannel(Map<String, String> overrides) {
return TestUtils.createFileChannel(checkpointDir.getAbsolutePath(),
- dataDir, overrides);
+ dataDir, backupDir.getAbsolutePath(), overrides);
}
}
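Editor's note: the four-argument TestUtils overloads are not shown in this view; presumably they thread the backup directory into the channel context roughly as below. The BACKUP_CHECKPOINT_DIR key is an assumption based on FileChannelConfiguration, whose body is also elided from this diff:

    Context context = new Context();
    context.put(FileChannelConfiguration.CHECKPOINT_DIR,
        checkpointDir.getAbsolutePath());
    // assumed key; defined in the (elided) FileChannelConfiguration change
    context.put(FileChannelConfiguration.BACKUP_CHECKPOINT_DIR,
        backupDir.getAbsolutePath());
    context.put(FileChannelConfiguration.DATA_DIRS, dataDir);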
View
356 ...lume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -18,25 +18,37 @@
*/
package org.apache.flume.channel.file;
-import static org.apache.flume.channel.file.TestUtils.*;
-
-import java.io.File;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.channel.file.proto.ProtosFactory;
+import org.fest.reflect.exception.ReflectionError;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
+import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
import java.io.RandomAccessFile;
+import java.util.Map;
import java.util.Random;
-import org.apache.flume.channel.file.proto.ProtosFactory;
+import java.util.Set;
+
+import static org.apache.flume.channel.file.TestUtils.compareInputAndOut;
+import static org.apache.flume.channel.file.TestUtils.consumeChannel;
+import static org.apache.flume.channel.file.TestUtils.fillChannel;
+import static org.apache.flume.channel.file.TestUtils.forceCheckpoint;
+import static org.apache.flume.channel.file.TestUtils.putEvents;
+import static org.apache.flume.channel.file.TestUtils.takeEvents;
+import static org.fest.reflect.core.Reflection.*;
public class TestFileChannelRestart extends TestFileChannelBase {
protected static final Logger LOG = LoggerFactory
@@ -119,16 +131,32 @@ public void doTestRestart(boolean useLogReplayV1,
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
+
+ @Test
+ public void testRestartWhenMetaDataExistsButCheckpointDoesNot() throws
+ Exception {
+ doTestRestartWhenMetaDataExistsButCheckpointDoesNot(false);
+ }
+
@Test
- public void testRestartWhenMetaDataExistsButCheckpointDoesNot()
+ public void testRestartWhenMetaDataExistsButCheckpointDoesNotWithBackup()
throws Exception {
+ doTestRestartWhenMetaDataExistsButCheckpointDoesNot(true);
+ }
+
+ private void doTestRestartWhenMetaDataExistsButCheckpointDoesNot(
+ boolean backup) throws Exception {
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
Assert.assertTrue(checkpoint.delete());
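Editor's note: the Thread.sleep(2000) after forceCheckpoint in these tests exists because the backup checkpoint is evidently taken asynchronously after the primary checkpoint completes. A less timing-sensitive alternative would be to poll for the backupComplete marker (BACKUP_COMPLETE_FILENAME from EventQueueBackingStore), assuming the marker is written into the backup checkpoint directory:

    // Assumption: the backing store drops its completion marker in backupDir.
    File backupComplete = new File(backupDir,
        EventQueueBackingStore.BACKUP_COMPLETE_FILENAME);
    long deadline = System.currentTimeMillis() + 5000;
    while (!backupComplete.exists() && System.currentTimeMillis() < deadline) {
      Thread.sleep(100);
    }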
@@ -139,19 +167,36 @@ public void testRestartWhenMetaDataExistsButCheckpointDoesNot()
Assert.assertTrue(channel.isOpen());
Assert.assertTrue(checkpoint.exists());
Assert.assertTrue(checkpointMetaData.exists());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
+
+ @Test
+ public void testRestartWhenCheckpointExistsButMetaDoesNot() throws Exception{
+ doTestRestartWhenCheckpointExistsButMetaDoesNot(false);
+ }
+
@Test
- public void testRestartWhenCheckpointExistsButMetaDoesNot()
+ public void testRestartWhenCheckpointExistsButMetaDoesNotWithBackup() throws
+ Exception{
+ doTestRestartWhenCheckpointExistsButMetaDoesNot(true);
+ }
+
+
+ private void doTestRestartWhenCheckpointExistsButMetaDoesNot(boolean backup)
throws Exception {
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
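Editor's note: the assertion Assert.assertTrue(!backup || channel.checkpointBackupRestored()) used throughout these tests encodes an implication: when the backup is enabled, the channel must have restored from it; when it is disabled, the clause is vacuously true. Written out explicitly:

    if (backup) {
      Assert.assertTrue(channel.checkpointBackupRestored());
    }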
@@ -162,19 +207,34 @@ public void testRestartWhenCheckpointExistsButMetaDoesNot()
Assert.assertTrue(channel.isOpen());
Assert.assertTrue(checkpoint.exists());
Assert.assertTrue(checkpointMetaData.exists());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
@Test
public void testRestartWhenNoCheckpointExists() throws Exception {
+ doTestRestartWhenNoCheckpointExists(false);
+ }
+
+ @Test
+ public void testRestartWhenNoCheckpointExistsWithBackup() throws Exception {
+ doTestRestartWhenNoCheckpointExists(true);
+ }
+
+ private void doTestRestartWhenNoCheckpointExists(boolean backup) throws
+ Exception {
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
@@ -185,19 +245,33 @@ public void testRestartWhenNoCheckpointExists() throws Exception {
Assert.assertTrue(channel.isOpen());
Assert.assertTrue(checkpoint.exists());
Assert.assertTrue(checkpointMetaData.exists());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
@Test
- public void testBadCheckpointVersion() throws Exception{
+ public void testBadCheckpointVersion() throws Exception {
+ doTestBadCheckpointVersion(false);
+ }
+
+ @Test
+ public void testBadCheckpointVersionWithBackup() throws Exception {
+ doTestBadCheckpointVersion(true);
+ }
+
+ private void doTestBadCheckpointVersion(boolean backup) throws Exception{
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
@@ -209,19 +283,34 @@ public void testBadCheckpointVersion() throws Exception{
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
@Test
public void testBadCheckpointMetaVersion() throws Exception {
+ doTestBadCheckpointMetaVersion(false);
+ }
+
+ @Test
+ public void testBadCheckpointMetaVersionWithBackup() throws Exception {
+ doTestBadCheckpointMetaVersion(true);
+ }
+
+ private void doTestBadCheckpointMetaVersion(boolean backup) throws
+ Exception {
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
FileInputStream is = new FileInputStream(Serialization.getMetaDataFile(checkpoint));
@@ -235,19 +324,35 @@ public void testBadCheckpointMetaVersion() throws Exception {
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
@Test
public void testDifferingOrderIDCheckpointAndMetaVersion() throws Exception {
+ doTestDifferingOrderIDCheckpointAndMetaVersion(false);
+ }
+
+ @Test
+ public void testDifferingOrderIDCheckpointAndMetaVersionWithBackup() throws
+ Exception {
+ doTestDifferingOrderIDCheckpointAndMetaVersion(true);
+ }
+
+ private void doTestDifferingOrderIDCheckpointAndMetaVersion(boolean backup)
+ throws Exception {
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
FileInputStream is = new FileInputStream(Serialization.getMetaDataFile(checkpoint));
@@ -261,19 +366,33 @@ public void testDifferingOrderIDCheckpointAndMetaVersion() throws Exception {
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
@Test
- public void testIncompleteCheckpoint() throws Exception {
+ public void testIncompleteCheckpoint() throws Exception{
+ doTestIncompleteCheckpoint(false);
+ }
+
+ @Test
+  public void testIncompleteCheckpointWithBackup() throws Exception {
+ doTestIncompleteCheckpoint(true);
+ }
+
+ private void doTestIncompleteCheckpoint(boolean backup) throws Exception {
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
@@ -285,18 +404,29 @@ public void testIncompleteCheckpoint() throws Exception {
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
@Test
public void testCorruptInflightPuts() throws Exception {
- testCorruptInflights("inflightPuts");