Skip to content

Commit

Permalink
[FLINK-3889][FLINK-3808] Refactor File Monitoring Source
Browse files Browse the repository at this point in the history
This is meant to replace the different file
reading sources in Flink streaming. Now there is
one monitoring source with DOP 1 monitoring a
directory and assigning input split to downstream
readers.

In addition, it makes the new features added by
FLINK-3717 and FLINK-3808 work together. Now we have
a file monitoring source that is also fault tolerant
and can guarantee exactly once semantics.

This does not replace the old API calls. This
will be done in a future commit.
  • Loading branch information
kl0u committed May 11, 2016
1 parent c9682b7 commit cbbfd8d
Show file tree
Hide file tree
Showing 17 changed files with 1,918 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void open(FileInputSplit split) throws IOException {
end = split.getStart() + split.getLength();
recordsReadSinceLastSync = 0;

if(this.restoredState == null) {
if(this.restoredSplit == null) {
dataFileReader.sync(split.getStart());
lastSync = dataFileReader.previousSync();
} else {
Expand Down Expand Up @@ -203,6 +203,10 @@ public E nextRecord(E reuseValue) throws IOException {

@Override
public Tuple2<FileInputSplit, Tuple2<Long, Long>> getCurrentChannelState() throws IOException {
if (this.reachedEnd()) {
return new Tuple2<>(null, new Tuple2<>(0L, 0L));
}

Tuple2<Long, Long> state = new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
return new Tuple2<>(currSplit, state);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public void open(FileInputSplit split) throws IOException {
// We set the size of the BlockBasedInput to splitLength as each split contains one block.
// After reading the block info, we seek in the file to the correct position.

if(this.restoredState == null) {
if(this.restoredSplit == null) {
this.readRecords = 0;
this.stream.seek(this.splitStart + this.blockInfo.getFirstRecordStart());
this.blockBasedInput = new BlockBasedInput(this.stream,
Expand Down Expand Up @@ -390,6 +390,10 @@ public int read(byte[] b, int off, int len) throws IOException {

@Override
public Tuple2<FileInputSplit, Tuple2<Long, Long>> getCurrentChannelState() throws IOException {
if (this.reachedEnd()) {
return new Tuple2<>(null, new Tuple2<>(0L, 0L));
}

Tuple2<Long, Long> state = new Tuple2<>(
this.blockBasedInput.getCurrBlockPos(), // the last read index in the block
this.readRecords // the number of records read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ public void open(FileInputSplit split) throws IOException {
this.overLimit = false;
this.end = false;

if (this.splitStart != 0 && this.restoredOffset == null) {
if (this.splitStart != 0 && this.restoredSplit == null) {
this.stream.seek(this.splitStart);
this.offset = this.splitStart;
readLine();
Expand All @@ -444,7 +444,7 @@ public void open(FileInputSplit split) throws IOException {
if (this.overLimit) {
this.end = true;
}
} else if (this.restoredOffset != null) {
} else if (this.restoredSplit != null) {

if (!this.restoredSplit.equals(split)) {
throw new RuntimeException("Tried to open at the wrong split after recovery.");
Expand Down Expand Up @@ -653,6 +653,9 @@ private boolean fillBuffer() throws IOException {

@Override
public Tuple2<FileInputSplit, Long> getCurrentChannelState() throws IOException {
if (reachedEnd()) {
return new Tuple2<>(null, 0l);
}
return new Tuple2<>(this.currSplit, this.offset);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,18 @@ public void checkRead() throws Exception {
if (input.nextRecord(record) != null) {
this.checkEquals(this.getRecord(readCount), record);

Tuple2<FileInputSplit, Tuple2<Long, Long>> state = input.getCurrentChannelState();
Assert.assertEquals(state.f0, inputSplit);

input = this.createInputFormat();
input.restore(state.f0, state.f1);
input.open(state.f0);
if (!input.reachedEnd()) {
Tuple2<FileInputSplit, Tuple2<Long, Long>> state = input.getCurrentChannelState();
Assert.assertEquals(state.f0, inputSplit);

input = this.createInputFormat();
input.restore(state.f0, state.f1);
input.open(state.f0);
} else {
Tuple2<FileInputSplit, Tuple2<Long, Long>> state = input.getCurrentChannelState();
Assert.assertEquals(state.f0, null);

}
readCount++;
}
}
Expand Down
11 changes: 10 additions & 1 deletion flink-fs-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,15 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
Expand All @@ -94,5 +102,6 @@ under the License.
<type>test-jar</type>
<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
</dependency>

</dependencies>
</project>

0 comments on commit cbbfd8d

Please sign in to comment.