Skip to content
Permalink
Browse files
FLUME-1586: File Channel should support verifying integrity of individual events.

(Hari Shreedharan via Brock Noland)
  • Loading branch information
Brock Noland committed Jun 13, 2013
1 parent 791f443 commit c57ebd1d2296fd1c0c8f84eecb16799de2a73e35
Showing 28 changed files with 1,765 additions and 619 deletions.
@@ -26,6 +26,7 @@
FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"

CLEAN_FLAG=1
################################
@@ -261,6 +262,9 @@ case "$mode" in
avro-client)
opt_avro_client=1
;;
tool)
opt_tool=1
;;
version)
opt_version=1
CLEAN_FLAG=0
@@ -433,6 +437,8 @@ elif [ -n "$opt_avro_client" ] ; then
run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
run_flume $FLUME_TOOLS_CLASS $args
else
error "This message should never appear" 1
fi
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.channel.file;


/**
 * Checked exception used by the file channel to report that a corrupt event
 * was encountered.
 */
public class CorruptEventException extends Exception {

  private static final long serialVersionUID = -2986946303540798416L;

  /** Creates an exception with neither a detail message nor a cause. */
  public CorruptEventException() {
  }

  /**
   * Creates an exception carrying only a detail message.
   *
   * @param message description of the corruption
   */
  public CorruptEventException(String message) {
    super(message);
  }

  /**
   * Creates an exception carrying a detail message and the underlying cause.
   *
   * @param message description of the corruption
   * @param cause the failure that led to this exception
   */
  public CorruptEventException(String message, Throwable cause) {
    super(message, cause);
  }
}
@@ -100,6 +100,7 @@ public class FileChannel extends BasicChannelSemantics {
private String encryptionActiveKey;
private String encryptionCipherProvider;
private boolean useDualCheckpoints;
private boolean isTest = false;

@Override
public synchronized void setName(String name) {
@@ -451,6 +452,7 @@ private boolean isClosed() {
private String getStateAsString() {
return String.valueOf(getState());
}

@Override
protected void doPut(Event event) throws InterruptedException {
channelCounter.incrementEventPutAttemptCount();
@@ -511,23 +513,40 @@ protected Event doTake() throws InterruptedException {
+ "log. Try increasing the log write timeout value. " +
channelNameDescriptor);
}

/*
* 1. Take an event which is in the queue.
* 2. If getting that event does not throw NoopRecordException,
* then return it.
* 3. Else try to retrieve the next event from the queue
* 4. Repeat 2 and 3 until queue is empty or an event is returned.
*/

try {
FlumeEventPointer ptr = queue.removeHead(transactionID);
if(ptr != null) {
try {
// first add to takeList so that if write to disk
// fails rollback actually does it's work
Preconditions.checkState(takeList.offer(ptr), "takeList offer failed "
+ channelNameDescriptor);
log.take(transactionID, ptr); // write take to disk
Event event = log.get(ptr);
return event;
} catch (IOException e) {
throw new ChannelException("Take failed due to IO error "
+ channelNameDescriptor, e);
while (true) {
FlumeEventPointer ptr = queue.removeHead(transactionID);
if (ptr == null) {
return null;
} else {
try {
// first add to takeList so that if write to disk
// fails rollback actually does it's work
Preconditions.checkState(takeList.offer(ptr),
"takeList offer failed "
+ channelNameDescriptor);
log.take(transactionID, ptr); // write take to disk
Event event = log.get(ptr);
return event;
} catch (IOException e) {
throw new ChannelException("Take failed due to IO error "
+ channelNameDescriptor, e);
} catch (NoopRecordException e) {
LOG.warn("Corrupt record replaced by File Channel Integrity " +
"tool found. Will retrieve next event", e);
takeList.remove(ptr);
}
}
}
return null;
} finally {
log.unlockShared();
}
@@ -73,7 +73,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class Log {
public class Log {
public static final String PREFIX = "log-";
private static final Logger LOGGER = LoggerFactory.getLogger(Log.class);
private static final int MIN_NUM_LOGS = 2;
@@ -571,12 +571,18 @@ FlumeEventQueue getFlumeEventQueue() {
* @throws InterruptedException
*/
FlumeEvent get(FlumeEventPointer pointer) throws IOException,
InterruptedException {
InterruptedException, NoopRecordException {
// Refuse to read once the log has been marked closed.
Preconditions.checkState(open, "Log is closed");
// Find the random-access reader registered for the pointer's file id.
int id = pointer.getFileID();
LogFile.RandomReader logFile = idLogFileMap.get(id);
Preconditions.checkNotNull(logFile, "LogFile is null for id " + id);
return logFile.get(pointer.getOffset());
try {
return logFile.get(pointer.getOffset());
} catch (CorruptEventException ex) {
// Flag the log as closed; any later call then fails the "Log is closed"
// check at the top of this method.
open = false;
// Report the corruption as an IOException, keeping the original exception
// as the cause.
throw new IOException("Corrupt event found. Please run File Channel " +
"Integrity tool.", ex);
}
}

/**
@@ -22,6 +22,9 @@
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.flume.ChannelException;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.channel.file.encryption.CipherProvider;
import org.apache.flume.channel.file.encryption.KeyProvider;
import org.apache.flume.tools.DirectMemoryUtils;
@@ -31,6 +34,7 @@
import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
@@ -40,7 +44,9 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

abstract class LogFile {
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class LogFile {

private static final Logger LOG = LoggerFactory
.getLogger(LogFile.class);
@@ -54,15 +60,23 @@ abstract class LogFile {
private static final ByteBuffer FILL = DirectMemoryUtils.
allocate(1024 * 1024); // preallocation, 1MB

protected static final byte OP_RECORD = Byte.MAX_VALUE;
protected static final byte OP_EOF = Byte.MIN_VALUE;
public static final byte OP_RECORD = Byte.MAX_VALUE;
public static final byte OP_NOOP = (Byte.MAX_VALUE + Byte.MIN_VALUE)/2;
public static final byte OP_EOF = Byte.MIN_VALUE;

static {
for (int i = 0; i < FILL.capacity(); i++) {
FILL.put(OP_EOF);
}
}

/**
 * Positions {@code fileHandle} after the length-prefixed record that starts
 * at {@code offset}: seeks to {@code offset}, reads a 4-byte length, then
 * asks the handle to skip that many bytes.
 *
 * @param fileHandle open handle on the file containing the record
 * @param offset position of the record's length field
 * @throws IOException if seeking or reading the length fails
 */
protected static void skipRecord(RandomAccessFile fileHandle,
    int offset) throws IOException {
  fileHandle.seek(offset);
  final int recordLength = fileHandle.readInt();
  fileHandle.skipBytes(recordLength);
}

abstract static class MetaDataWriter {
private final File file;
private final int logFileID;
@@ -296,6 +310,48 @@ protected void preallocate(int size) throws IOException {
}
}

/**
* This is an class meant to be an internal Flume API,
* and can change at any time. Intended to be used only from File Channel Integrity
* test tool. Not to be used for any other purpose.
*/
public static class OperationRecordUpdater {
private final RandomAccessFile fileHandle;
private final File file;

public OperationRecordUpdater(File file) throws FileNotFoundException {
Preconditions.checkState(file.exists(), "File to update, " +
file.toString() + " does not exist.");
this.file = file;
fileHandle = new RandomAccessFile(file, "rw");
}

public void markRecordAsNoop(long offset) throws IOException {
// First ensure that the offset actually is an OP_RECORD. There is a
// small possibility that it still is OP_RECORD,
// but is not actually the beginning of a record. Is there anything we
// can do about it?
fileHandle.seek(offset);
byte byteRead = fileHandle.readByte();
Preconditions.checkState(byteRead == OP_RECORD || byteRead == OP_NOOP,
"Expected to read a record but the byte read indicates EOF");
fileHandle.seek(offset);
LOG.info("Marking event as " + OP_NOOP + " at " + offset + " for file " +
file.toString());
fileHandle.writeByte(OP_NOOP);
}

public void close() {
try {
fileHandle.getFD().sync();
fileHandle.close();
} catch (IOException e) {
LOG.error("Could not close file handle to file " +
fileHandle.toString(), e);
}
}
}

static abstract class RandomReader {
private final File file;
private final BlockingQueue<RandomAccessFile> readFileHandles =
@@ -311,7 +367,7 @@ public RandomReader(File file, @Nullable KeyProvider encryptionKeyProvider)
}

protected abstract TransactionEventRecord doGet(RandomAccessFile fileHandle)
throws IOException;
throws IOException, CorruptEventException;

abstract int getVersion();

@@ -323,13 +379,18 @@ protected KeyProvider getKeyProvider() {
return encryptionKeyProvider;
}

FlumeEvent get(int offset) throws IOException, InterruptedException {
FlumeEvent get(int offset) throws IOException, InterruptedException,
CorruptEventException, NoopRecordException {
Preconditions.checkState(open, "File closed");
RandomAccessFile fileHandle = checkOut();
boolean error = true;
try {
fileHandle.seek(offset);
byte operation = fileHandle.readByte();
if(operation == OP_NOOP) {
throw new NoopRecordException("No op record found. Corrupt record " +
"may have been repaired by File Channel Integrity tool");
}
Preconditions.checkState(operation == OP_RECORD,
Integer.toHexString(operation));
TransactionEventRecord record = doGet(fileHandle);
@@ -408,7 +469,7 @@ private static void close(RandomAccessFile fileHandle, File file) {
}
}

static abstract class SequentialReader {
public static abstract class SequentialReader {

private final RandomAccessFile fileHandle;
private final FileChannel fileChannel;
@@ -434,7 +495,7 @@ static abstract class SequentialReader {
fileHandle = new RandomAccessFile(file, "r");
fileChannel = fileHandle.getChannel();
}
abstract LogRecord doNext(int offset) throws IOException;
abstract LogRecord doNext(int offset) throws IOException, CorruptEventException;

abstract int getVersion();

@@ -488,7 +549,7 @@ void skipToLastCheckpointPosition(long checkpointWriteOrderID)
}
}

LogRecord next() throws IOException {
public LogRecord next() throws IOException, CorruptEventException {
int offset = -1;
try {
long position = fileChannel.position();
@@ -499,14 +560,26 @@ LogRecord next() throws IOException {
}
offset = (int) position;
Preconditions.checkState(offset >= 0);
byte operation = fileHandle.readByte();
if(operation != OP_RECORD) {
if(operation == OP_EOF) {
while (offset < fileHandle.length()) {
byte operation = fileHandle.readByte();
if (operation == OP_RECORD) {
break;
} else if (operation == OP_EOF) {
LOG.info("Encountered EOF at " + offset + " in " + file);
return null;
} else if (operation == OP_NOOP) {
LOG.info("No op event found in file: " + file.toString() +
" at " + offset + ". Skipping event.");
skipRecord(fileHandle, offset + 1);
offset = (int) fileHandle.getFilePointer();
continue;
} else {
LOG.error("Encountered non op-record at " + offset + " " +
Integer.toHexString(operation) + " in " + file);
Integer.toHexString(operation) + " in " + file);
return null;
}
}
if(offset >= fileHandle.length()) {
return null;
}
return doNext(offset);
@@ -518,7 +591,10 @@ LogRecord next() throws IOException {
}
}

void close() {
/**
 * Reports the position currently held by this reader's file channel.
 *
 * @return the value returned by {@code fileChannel.position()}
 * @throws IOException if the channel cannot report its position
 */
public long getPosition() throws IOException {
  final long currentPosition = fileChannel.position();
  return currentPosition;
}
public void close() {
if(fileHandle != null) {
try {
fileHandle.close();
@@ -540,7 +616,7 @@ protected static byte[] readDelimitedBuffer(RandomAccessFile fileHandle)
return buffer;
}

public static void main(String[] args) throws EOFException, IOException {
public static void main(String[] args) throws EOFException, IOException, CorruptEventException {
File file = new File(args[0]);
LogFile.SequentialReader reader = null;
try {

0 comments on commit c57ebd1

Please sign in to comment.