Skip to content

Commit

Permalink
[FLINK-3889][FLINK-3808] Refactor File Monitoring Source
Browse files Browse the repository at this point in the history
This is meant to replace the different file
reading sources in Flink streaming. Now there is
one monitoring source with a degree of parallelism
(DOP) of 1 that monitors a directory and assigns
input splits to downstream readers.

In addition, it makes the new features added by
FLINK-3717 and FLINK-3808 work together. Now we have
a file monitoring source that is also fault-tolerant
and can guarantee exactly-once semantics.

This commit does not replace the old API calls;
that will be done in a future commit.
  • Loading branch information
kl0u committed May 17, 2016
1 parent c9682b7 commit 85688c5
Show file tree
Hide file tree
Showing 18 changed files with 1,904 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void open(FileInputSplit split) throws IOException {
end = split.getStart() + split.getLength();
recordsReadSinceLastSync = 0;

if(this.restoredState == null) {
if(this.restoredSplit == null) {
dataFileReader.sync(split.getStart());
lastSync = dataFileReader.previousSync();
} else {
Expand Down Expand Up @@ -203,6 +203,10 @@ public E nextRecord(E reuseValue) throws IOException {

/**
 * Reports the current reading state of this format as a pair of the split being read
 * and a nested pair of (last sync position, records read since that sync).
 *
 * <p>Once {@code reachedEnd()} is true, the returned split is {@code null} and both
 * counters are {@code 0}.
 *
 * @return the current split together with its (lastSync, recordsReadSinceLastSync) pair,
 *         or {@code (null, (0, 0))} when the end has been reached
 * @throws IOException declared by the signature; presumably propagated from
 *         {@code reachedEnd()} (not visible here)
 */
@Override
public Tuple2<FileInputSplit, Tuple2<Long, Long>> getCurrentChannelState() throws IOException {
	if (!this.reachedEnd()) {
		// Still reading: expose the split and how far past the last sync point we are.
		Tuple2<Long, Long> position = new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
		return new Tuple2<>(currSplit, position);
	}

	// Nothing left to read: no split, zeroed position.
	Tuple2<Long, Long> emptyPosition = new Tuple2<>(0L, 0L);
	return new Tuple2<>(null, emptyPosition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public void open(FileInputSplit split) throws IOException {
// We set the size of the BlockBasedInput to splitLength as each split contains one block.
// After reading the block info, we seek in the file to the correct position.

if(this.restoredState == null) {
if(this.restoredSplit == null) {
this.readRecords = 0;
this.stream.seek(this.splitStart + this.blockInfo.getFirstRecordStart());
this.blockBasedInput = new BlockBasedInput(this.stream,
Expand Down Expand Up @@ -390,6 +390,10 @@ public int read(byte[] b, int off, int len) throws IOException {

@Override
public Tuple2<FileInputSplit, Tuple2<Long, Long>> getCurrentChannelState() throws IOException {
if (this.reachedEnd()) {
return new Tuple2<>(null, new Tuple2<>(0L, 0L));
}

Tuple2<Long, Long> state = new Tuple2<>(
this.blockBasedInput.getCurrBlockPos(), // the last read index in the block
this.readRecords // the number of records read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

@Public
/**
* A block of 24 bytes written at the <i>end</i> of a block in a binary file, and containing
* i) the number of records in the block, ii) the accumulated number of records, and
* iii) the offset of the first record in the block.
* */
@Public
public class BlockInfo implements IOReadableWritable {

private long recordCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ public void open(FileInputSplit split) throws IOException {
this.overLimit = false;
this.end = false;

if (this.splitStart != 0 && this.restoredOffset == null) {
if (this.splitStart != 0 && this.restoredSplit == null) {
this.stream.seek(this.splitStart);
this.offset = this.splitStart;
readLine();
Expand All @@ -444,7 +444,7 @@ public void open(FileInputSplit split) throws IOException {
if (this.overLimit) {
this.end = true;
}
} else if (this.restoredOffset != null) {
} else if (this.restoredSplit != null) {

if (!this.restoredSplit.equals(split)) {
throw new RuntimeException("Tried to open at the wrong split after recovery.");
Expand Down Expand Up @@ -653,6 +653,9 @@ private boolean fillBuffer() throws IOException {

/**
 * Reports the current reading state of this format as a pair of the split being read
 * and the current offset.
 *
 * <p>Once {@code reachedEnd()} is true, the returned split is {@code null} and the
 * offset is {@code 0}.
 *
 * @return {@code (currSplit, offset)} while reading, or {@code (null, 0)} when the end
 *         has been reached
 * @throws IOException declared by the signature; no visible statement in this method
 *         throws it directly
 */
@Override
public Tuple2<FileInputSplit, Long> getCurrentChannelState() throws IOException {
	if (!reachedEnd()) {
		// Still reading: expose the split and the current offset.
		return new Tuple2<>(this.currSplit, this.offset);
	}
	// Nothing left to read: no split, zero offset.
	return new Tuple2<>(null, 0L);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,18 @@ public void checkRead() throws Exception {
if (input.nextRecord(record) != null) {
this.checkEquals(this.getRecord(readCount), record);

Tuple2<FileInputSplit, Tuple2<Long, Long>> state = input.getCurrentChannelState();
Assert.assertEquals(state.f0, inputSplit);

input = this.createInputFormat();
input.restore(state.f0, state.f1);
input.open(state.f0);
if (!input.reachedEnd()) {
Tuple2<FileInputSplit, Tuple2<Long, Long>> state = input.getCurrentChannelState();
Assert.assertEquals(state.f0, inputSplit);

input = this.createInputFormat();
input.restore(state.f0, state.f1);
input.open(state.f0);
} else {
Tuple2<FileInputSplit, Tuple2<Long, Long>> state = input.getCurrentChannelState();
Assert.assertEquals(state.f0, null);

}
readCount++;
}
}
Expand Down
11 changes: 10 additions & 1 deletion flink-fs-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,15 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
Expand All @@ -94,5 +102,6 @@ under the License.
<type>test-jar</type>
<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
</dependency>

</dependencies>
</project>

0 comments on commit 85688c5

Please sign in to comment.