[FLINK-3889] Refactor File Monitoring Source
This is meant to replace the different file
reading sources in Flink streaming. Now there is
a single monitoring source with DOP 1 that monitors
a directory and assigns input splits to downstream
readers.

In addition, it makes the new features added by
FLINK-3717 work together with the aforementioned entities
(the monitor and the readers) in order to provide
fault-tolerant file sources with exactly-once guarantees.

This does not replace the old API calls. This
will be done in a future commit.
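
As an illustration of the split-assignment idea described above, here is a minimal sketch of a DOP-1 monitoring source that periodically computes input splits for a directory and hands them to parallel downstream readers. This is not the code introduced by this commit; the class name DirectoryMonitorSketch, the fixed scan interval, and the omission of modified-file tracking and checkpointing are simplifications made for illustration only.

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Illustrative sketch only: a parallelism-1 source that periodically computes
// input splits for a directory and forwards them to the downstream readers.
// A real implementation would also have to track which files were already
// processed and participate in checkpointing.
public class DirectoryMonitorSketch implements SourceFunction<FileInputSplit> {

	private final FileInputFormat<?> format;   // configured with the monitored directory
	private final long scanIntervalMillis;
	private volatile boolean running = true;

	public DirectoryMonitorSketch(FileInputFormat<?> format, long scanIntervalMillis) {
		this.format = format;
		this.scanIntervalMillis = scanIntervalMillis;
	}

	@Override
	public void run(SourceContext<FileInputSplit> ctx) throws Exception {
		while (running) {
			// Compute splits for the directory's current contents and emit them;
			// the readers downstream open and consume each split.
			for (FileInputSplit split : format.createInputSplits(1)) {
				ctx.collect(split);
			}
			Thread.sleep(scanIntervalMillis);
		}
	}

	@Override
	public void cancel() {
		running = false;
	}
}

Per the description above, only this monitoring task runs with parallelism 1; the readers that open and read the assigned splits run with the job's full parallelism.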
kl0u committed May 17, 2016
1 parent c9682b7 commit c2c18e3
Showing 19 changed files with 1,912 additions and 23 deletions.
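
As background for the reader-side changes below, the following is a minimal sketch (again, not part of the commit) of the snapshot/restore contract exposed by CheckpointableInputFormat: take the reader's current (split, offset) state with getCurrentChannelState(), and on recovery call restore() before open() so that reading resumes from the stored position rather than the split start. TextInputFormat, the file path, and the class name are illustrative assumptions; the method names follow the interfaces as they appear in this commit.

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

// Illustrative sketch only: snapshot a reader's position and resume from it,
// the way a recovering file-reading task would.
public class CheckpointedReadSketch {

	public static void main(String[] args) throws Exception {
		TextInputFormat format = new TextInputFormat(new Path("file:///tmp/monitored-dir/part-0"));  // hypothetical path
		format.configure(new Configuration());

		FileInputSplit[] splits = format.createInputSplits(1);
		format.open(splits[0]);

		// Read a few records, then snapshot where we are in the split.
		String record = null;
		for (int i = 0; i < 10 && !format.reachedEnd(); i++) {
			record = format.nextRecord(record);
		}

		// The snapshot is the (split, offset) pair; a null split signals that
		// the split was fully consumed.
		Tuple2<FileInputSplit, Long> snapshot = format.getCurrentChannelState();
		format.close();

		if (snapshot.f0 != null) {
			// On recovery: restore() must be called before open() on the same split,
			// so the format seeks to the stored offset instead of the split start.
			TextInputFormat restored = new TextInputFormat(new Path("file:///tmp/monitored-dir/part-0"));
			restored.configure(new Configuration());
			restored.restore(snapshot.f0, snapshot.f1);
			restored.open(snapshot.f0);

			while (!restored.reachedEnd()) {
				record = restored.nextRecord(record);
			}
			restored.close();
		}
	}
}

The guards added in the hunks below return a null split once reachedEnd() is true, presumably so that a caller can distinguish a finished split from one that still has a resumable position.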
@@ -138,7 +138,7 @@ public void open(FileInputSplit split) throws IOException {
end = split.getStart() + split.getLength();
recordsReadSinceLastSync = 0;

- if(this.restoredState == null) {
+ if(this.restoredSplit == null) {
dataFileReader.sync(split.getStart());
lastSync = dataFileReader.previousSync();
} else {
@@ -203,12 +203,16 @@ public E nextRecord(E reuseValue) throws IOException {

@Override
public Tuple2<FileInputSplit, Tuple2<Long, Long>> getCurrentChannelState() throws IOException {
+ if (this.reachedEnd()) {
+ return new Tuple2<>(null, new Tuple2<>(0L, 0L));
+ }

Tuple2<Long, Long> state = new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
return new Tuple2<>(currSplit, state);
}

@Override
- public void restore(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
+ public void restore(FileInputSplit split, Tuple2<Long, Long> state) {
this.restoredSplit = split;
this.restoredState = state;
}
@@ -279,7 +279,7 @@ public void open(FileInputSplit split) throws IOException {
// We set the size of the BlockBasedInput to splitLength as each split contains one block.
// After reading the block info, we seek in the file to the correct position.

- if(this.restoredState == null) {
+ if(this.restoredSplit == null) {
this.readRecords = 0;
this.stream.seek(this.splitStart + this.blockInfo.getFirstRecordStart());
this.blockBasedInput = new BlockBasedInput(this.stream,
@@ -390,6 +390,10 @@ public int read(byte[] b, int off, int len) throws IOException {

@Override
public Tuple2<FileInputSplit, Tuple2<Long, Long>> getCurrentChannelState() throws IOException {
+ if (this.reachedEnd()) {
+ return new Tuple2<>(null, new Tuple2<>(0L, 0L));
+ }

Tuple2<Long, Long> state = new Tuple2<>(
this.blockBasedInput.getCurrBlockPos(), // the last read index in the block
this.readRecords // the number of records read
@@ -398,7 +402,7 @@ public Tuple2<FileInputSplit, Tuple2<Long, Long>> getCurrentChannelState() throw
}

@Override
- public void restore(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
+ public void restore(FileInputSplit split, Tuple2<Long, Long> state) {
this.restoredSplit = split;
this.restoredState = state;
}
@@ -25,12 +25,12 @@
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

- @Public
/**
* A block of 24 bytes written at the <i>end</i> of a block in a binary file, and containing
* i) the number of records in the block, ii) the accumulated number of records, and
* iii) the offset of the first record in the block.
* */
+ @Public
public class BlockInfo implements IOReadableWritable {

private long recordCount;
@@ -55,7 +55,6 @@ public interface CheckpointableInputFormat<S extends InputSplit, T extends Seria
* @param split The split to be opened.
* @param state The state from which to start from. This can contain the offset,
* but also other data, depending on the input format.
- * @throws IOException Thrown, if the spit could not be opened due to an I/O problem.
*/
- void restore(S split, T state) throws IOException;
+ void restore(S split, T state);
}
@@ -434,7 +434,7 @@ public void open(FileInputSplit split) throws IOException {
this.overLimit = false;
this.end = false;

- if (this.splitStart != 0 && this.restoredOffset == null) {
+ if (this.splitStart != 0 && this.restoredSplit == null) {
this.stream.seek(this.splitStart);
this.offset = this.splitStart;
readLine();
@@ -444,7 +444,7 @@
if (this.overLimit) {
this.end = true;
}
- } else if (this.restoredOffset != null) {
+ } else if (this.restoredSplit != null) {

if (!this.restoredSplit.equals(split)) {
throw new RuntimeException("Tried to open at the wrong split after recovery.");
@@ -653,11 +653,14 @@ private boolean fillBuffer() throws IOException {

@Override
public Tuple2<FileInputSplit, Long> getCurrentChannelState() throws IOException {
+ if (reachedEnd()) {
+ return new Tuple2<>(null, 0l);
+ }
return new Tuple2<>(this.currSplit, this.offset);
}

@Override
- public void restore(FileInputSplit split, Long state) throws IOException {
+ public void restore(FileInputSplit split, Long state) {
this.restoredSplit = split;
this.restoredOffset = state;
}
@@ -144,12 +144,18 @@ public void checkRead() throws Exception {
if (input.nextRecord(record) != null) {
this.checkEquals(this.getRecord(readCount), record);

- Tuple2<FileInputSplit, Tuple2<Long, Long>> state = input.getCurrentChannelState();
- Assert.assertEquals(state.f0, inputSplit);
-
- input = this.createInputFormat();
- input.restore(state.f0, state.f1);
- input.open(state.f0);
+ if (!input.reachedEnd()) {
+ Tuple2<FileInputSplit, Tuple2<Long, Long>> state = input.getCurrentChannelState();
+ Assert.assertEquals(state.f0, inputSplit);
+
+ input = this.createInputFormat();
+ input.restore(state.f0, state.f1);
+ input.open(state.f0);
+ } else {
+ Tuple2<FileInputSplit, Tuple2<Long, Long>> state = input.getCurrentChannelState();
+ Assert.assertEquals(state.f0, null);
+
+ }
readCount++;
}
}
11 changes: 10 additions & 1 deletion flink-fs-tests/pom.xml
@@ -78,7 +78,15 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>


+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
@@ -94,5 +102,6 @@
<type>test-jar</type>
<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
</dependency>

</dependencies>
</project>
