Integrating the PR Comments.
kl0u committed May 31, 2016
1 parent 974a37f commit fb9c949
Showing 18 changed files with 670 additions and 397 deletions.
AvroInputFormat.java
@@ -71,10 +71,6 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType

private transient long lastSync;

private transient FileInputSplit restoredSplit;

private transient Tuple2<Long, Long> restoredState;

public AvroInputFormat(Path filePath, Class<E> type) {
super(filePath);
this.avroValueType = type;
@@ -113,54 +109,35 @@ public TypeInformation<E> getProducedType() {
@Override
public void open(FileInputSplit split) throws IOException {
super.open(split);
dataFileReader = initReader(split);
dataFileReader.sync(split.getStart());
lastSync = dataFileReader.previousSync();
}

private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
DatumReader<E> datumReader;

if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
datumReader = new GenericDatumReader<E>();
} else {
datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
}

if (LOG.isInfoEnabled()) {
LOG.info("Opening split {}", split);
}

this.currSplit = split;
SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader);
DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader);

if (LOG.isDebugEnabled()) {
LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
}

end = split.getStart() + split.getLength();
recordsReadSinceLastSync = 0;

if(this.restoredSplit == null) {
dataFileReader.sync(split.getStart());
lastSync = dataFileReader.previousSync();
} else {

if (!this.restoredSplit.equals(split)) {
throw new RuntimeException("Tried to open at the wrong split after recovery.");
}

// go to the block we stopped
currSplit = this.restoredSplit;
lastSync = this.restoredState.f0;
dataFileReader.seek(lastSync);

// read until the record we were before the checkpoint
// and discard the values
for(int i = 0; i < this.restoredState.f1; i++) {
dataFileReader.next(null);
recordsReadSinceLastSync++;
}
}
this.restoredSplit = null;
this.restoredState = null;
return dataFileReader;
}

@Override
@@ -202,7 +179,7 @@ public E nextRecord(E reuseValue) throws IOException {
// --------------------------------------------------------------------------------------------

@Override
public Tuple2<FileInputSplit, Tuple2<Long, Long>> getCurrentChannelState() throws IOException {
public Tuple2<FileInputSplit, Tuple2<Long, Long>> getCurrentState() throws IOException {
if (this.reachedEnd()) {
return new Tuple2<>(null, new Tuple2<>(0L, 0L));
}
@@ -212,8 +189,26 @@ public Tuple2<FileInputSplit, Tuple2<Long, Long>> getCurrentChannelState() throw
}

@Override
public void restore(FileInputSplit split, Tuple2<Long, Long> state) {
this.restoredSplit = split;
this.restoredState = state;
public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
if (split == null) {
throw new RuntimeException("Called reopen() on a null split.");
}

this.open(split);
if (state != null) {
dataFileReader = initReader(split);

// go back to the block where we stopped
currSplit = split;
lastSync = state.f0;
dataFileReader.seek(lastSync);

// read and discard records until we reach the position we were at before the checkpoint
long recordsToDiscard = state.f1;
for(int i = 0; i < recordsToDiscard; i++) {
dataFileReader.next(null);
recordsReadSinceLastSync++;
}
}
}
}
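
Taken together, getCurrentState() and reopen() replace the previous restore()-then-open() sequence: the reader is re-positioned at the last Avro sync marker and the records already emitted before the checkpoint are read again and discarded. A minimal sketch of that round trip, mirroring the recovery test shown next (avroFile, split and the generated User class are illustrative assumptions, not part of this commit):

    // fragment; assumes org.apache.flink.core.fs.Path, org.apache.flink.core.fs.FileInputSplit
    // and org.apache.flink.api.java.tuple.Tuple2 are imported
    AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(avroFile.getAbsolutePath()), User.class);
    format.open(split);
    // ... consume some records via format.nextRecord(...) ...

    // f0 is the split being read, f1 is (last sync point, records read since that sync point)
    Tuple2<FileInputSplit, Tuple2<Long, Long>> state = format.getCurrentState();

    // after a failure, a fresh instance resumes with reopen() instead of open()
    AvroInputFormat<User> recovered = new AvroInputFormat<User>(new Path(avroFile.getAbsolutePath()), User.class);
    recovered.reopen(state.f0, state.f1);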
AvroInputFormat recovery test (testAvroRecovery)
@@ -216,15 +216,14 @@ public void testAvroRecovery() throws Exception {
if(format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {

// do the whole checkpoint-restore procedure and see if we pick up from where we left off.
Tuple2<FileInputSplit, Tuple2<Long, Long>> state = format.getCurrentChannelState();
Tuple2<FileInputSplit, Tuple2<Long, Long>> state = format.getCurrentState();

// create a fresh format so that no state is carried over from the previous one
// (as would be the case in an actual recovery)
format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
Assert.assertEquals(splits[i], state.f0);

format.restore(state.f0, state.f1);
format.open(state.f0);
format.reopen(state.f0, state.f1);
assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
}
elementsPerSplit[i]++;
BinaryInputFormat.java
@@ -75,10 +75,6 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T>

private transient FileInputSplit currSplit = null;

private transient FileInputSplit restoredSplit;

private transient Tuple2<Long, Long> restoredState;

/**
* The number of records already read from the block.
* This is used to decide if the end of the block has been
@@ -208,6 +204,20 @@ public BlockInfo createBlockInfo() {
return new BlockInfo();
}

private BlockInfo createAndReadBlockInfo() throws IOException {
BlockInfo blockInfo = new BlockInfo();
if (this.splitLength > blockInfo.getInfoSize()) {
// At first we go and read the block info containing the recordCount, the accumulatedRecordCount
// and the firstRecordStart offset in the current block. This is written at the end of the block and
// is of fixed size, currently 3 * Long.SIZE.

// TODO: seek not supported by compressed streams. Will throw exception
this.stream.seek(this.splitStart + this.splitLength - blockInfo.getInfoSize());
blockInfo.read(new DataInputViewStreamWrapper(this.stream));
}
return blockInfo;
}
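
// Illustrative note, not part of the commit: each split covers exactly one block, and the block's
// fixed-size BlockInfo trailer (recordCount, accumulatedRecordCount, firstRecordStart, i.e. three longs)
// is written at its very end. Hence the two offsets used here and in open():
//
//     trailer starts at:   splitStart + splitLength - blockInfo.getInfoSize()
//     first record is at:  splitStart + blockInfo.getFirstRecordStart()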

/**
* Fill in the statistics. The last modification time and the total input size are prefilled.
*
@@ -222,8 +232,7 @@ protected SequentialStatistics createStatistics(List<FileStatus> files, FileBase
return null;
}

BlockInfo blockInfo = this.createBlockInfo();

BlockInfo blockInfo = new BlockInfo();
long totalCount = 0;
for (FileStatus file : files) {
// invalid file
@@ -265,44 +274,16 @@ public void open(FileInputSplit split) throws IOException {
super.open(split);

this.currSplit = split;
this.blockInfo = this.createBlockInfo();
if (this.splitLength > this.blockInfo.getInfoSize()) {
// At first we go and read the block info containing the recordCount, the accumulatedRecordCount
// and the firstRecordStart offset in the current block. This is written at the end of the block and
// is of fixed size, currently 3 * Long.SIZE.

// TODO: seek not supported by compressed streams. Will throw exception
this.stream.seek(this.splitStart + this.splitLength - this.blockInfo.getInfoSize());
this.blockInfo.read(new DataInputViewStreamWrapper(this.stream));
}
this.blockInfo = this.createAndReadBlockInfo();

// We set the size of the BlockBasedInput to splitLength as each split contains one block.
// After reading the block info, we seek in the file to the correct position.

if(this.restoredSplit == null) {
this.readRecords = 0;
this.stream.seek(this.splitStart + this.blockInfo.getFirstRecordStart());
this.blockBasedInput = new BlockBasedInput(this.stream,
(int) blockInfo.getFirstRecordStart(), this.splitLength);
} else {

if (!this.restoredSplit.equals(split)) {
throw new RuntimeException("Tried to open at the wrong split after recovery.");
}

// go to the block we stopped
this.currSplit = this.restoredSplit;

long blockPos = this.restoredState.f0;
this.readRecords = this.restoredState.f1;

this.stream.seek(this.splitStart + blockPos);
this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
}
this.readRecords = 0;
this.stream.seek(this.splitStart + this.blockInfo.getFirstRecordStart());
this.blockBasedInput = new BlockBasedInput(this.stream,
(int) blockInfo.getFirstRecordStart(), this.splitLength);
this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
this.restoredSplit = null;
this.restoredState = null;

}

@Override
@@ -326,12 +307,18 @@ record = this.deserialize(record, this.dataInputStream);
* Reads the content of a block of data. The block contains its {@link BlockInfo}
* at the end, and this method takes this into account when reading the data.
*/
private class BlockBasedInput extends FilterInputStream {
protected class BlockBasedInput extends FilterInputStream {
private final int maxPayloadSize;

private int blockPos;

BlockBasedInput(FSDataInputStream in, int startPos, long length) {
public BlockBasedInput(FSDataInputStream in, int blockSize) {
super(in);
this.blockPos = (int) BinaryInputFormat.this.blockInfo.getFirstRecordStart();
this.maxPayloadSize = blockSize - BinaryInputFormat.this.blockInfo.getInfoSize();
}

public BlockBasedInput(FSDataInputStream in, int startPos, long length) {
super(in);
this.blockPos = startPos;
this.maxPayloadSize = (int) (length - BinaryInputFormat.this.blockInfo.getInfoSize());
@@ -389,7 +376,7 @@ public int read(byte[] b, int off, int len) throws IOException {
// --------------------------------------------------------------------------------------------

@Override
public Tuple2<FileInputSplit, Tuple2<Long, Long>> getCurrentChannelState() throws IOException {
public Tuple2<FileInputSplit, Tuple2<Long, Long>> getCurrentState() throws IOException {
if (this.reachedEnd()) {
return new Tuple2<>(null, new Tuple2<>(0L, 0L));
}
@@ -402,8 +389,22 @@ public Tuple2<FileInputSplit, Tuple2<Long, Long>> getCurrentChannelState() throw
}

@Override
public void restore(FileInputSplit split, Tuple2<Long, Long> state) {
this.restoredSplit = split;
this.restoredState = state;
public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
if (split == null) {
throw new RuntimeException("Called reopen() on a null split.");
}

this.open(split);
if (state != null) {
this.currSplit = split;
this.blockInfo = this.createAndReadBlockInfo();

long blockPos = state.f0;
this.readRecords = state.f1;

this.stream.seek(this.splitStart + blockPos);
this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
}
}
}
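
Note the contrast with the Avro format above: an Avro file can only be re-entered at a sync marker, so AvroInputFormat.reopen() seeks to the last sync point and then re-reads and discards the already-emitted records, while BinaryInputFormat seeks to the exact byte position it had reached and simply restores its readRecords counter. A hedged sketch of the same round trip (format stands for an instance of some concrete BinaryInputFormat subclass and split for one of its splits; neither name comes from this commit):

    format.open(split);
    // ... consume some records via format.nextRecord(...) ...

    Tuple2<FileInputSplit, Tuple2<Long, Long>> state = format.getCurrentState();
    // state.f1.f0: position within the split at which reading should resume
    // state.f1.f1: number of records already read from the current block

    // on recovery, on a freshly created instance of the same format:
    format.reopen(state.f0, state.f1);   // seeks to splitStart + state.f1.f0 and continues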
CheckpointableInputFormat.java
@@ -41,7 +41,7 @@ public interface CheckpointableInputFormat<S extends InputSplit, T extends Seria
*
* @throws IOException Thrown if the creation of the state object failed.
*/
Tuple2<S, T> getCurrentChannelState() throws IOException;
Tuple2<S, T> getCurrentState() throws IOException;

/**
* Restores the state of a parallel instance reading from an {@link InputFormat}.
@@ -56,5 +56,5 @@ public interface CheckpointableInputFormat<S extends InputSplit, T extends Seria
* @param state The state from which to start. This can contain the offset,
* but also other data, depending on the input format.
*/
void restore(S split, T state);
void reopen(S split, T state) throws IOException;
}
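
For a format that wants to participate in this mechanism the contract is: getCurrentState() is called while a split is open (typically when a checkpoint is taken), and reopen() is called on recovery in place of open(). A minimal illustrative skeleton under that assumption; the class name, the Long state type and the currentSplit/consumed fields are hypothetical and not part of this commit:

    import java.io.IOException;

    import org.apache.flink.api.common.io.CheckpointableInputFormat;
    import org.apache.flink.api.common.io.FileInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.fs.FileInputSplit;

    /** Hypothetical format whose checkpoint state is just the byte offset consumed within the split. */
    public abstract class OffsetCheckpointableFormat<T> extends FileInputFormat<T>
            implements CheckpointableInputFormat<FileInputSplit, Long> {

        private transient FileInputSplit currentSplit;   // the split this instance is reading
        private transient long consumed;                 // bytes already returned from that split

        @Override
        public void open(FileInputSplit split) throws IOException {
            super.open(split);
            this.currentSplit = split;
            this.consumed = 0L;
        }

        @Override
        public Tuple2<FileInputSplit, Long> getCurrentState() throws IOException {
            return new Tuple2<>(this.currentSplit, this.consumed);
        }

        @Override
        public void reopen(FileInputSplit split, Long state) throws IOException {
            this.open(split);                               // regular open first, as the formats above do
            if (state != null) {
                this.stream.seek(this.splitStart + state);  // then skip what was already emitted
                this.consumed = state;
            }
        }

        // reachedEnd() / nextRecord() are left to the concrete subclass
    }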
