[FLINK-5021] Make the ContinuousFileReaderOperator rescalable.
This is the last commit of the refactoring that makes the
ContinuousFileReaderOperator rescalable. With this change, the
reader can restart from a savepoint with a different parallelism
without compromising the provided exactly-once guarantees.
kl0u authored and aljoscha committed Nov 11, 2016
1 parent 5a90c6b commit b2e8792
Showing 4 changed files with 560 additions and 55 deletions.
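
For context, the rescalability rests on keeping the reader's pending splits in Flink's redistributable operator list state: on restore, each subtask receives a share of the checkpointed splits, whatever the new parallelism. Below is a minimal, hypothetical sketch of that pattern using the public CheckpointedFunction API — the class name RescalableSplitHolder and the state name "pending-splits" are illustrative, not the operator's actual internals, and the getListState call assumes a Flink release where OperatorStateStore exposes it:

import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;

// Hypothetical sketch, not the actual operator code.
public class RescalableSplitHolder implements CheckpointedFunction {

    // splits received but not yet fully read
    private final Queue<TimestampedFileInputSplit> pendingSplits = new ArrayDeque<>();

    private transient ListState<TimestampedFileInputSplit> checkpointedSplits;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // re-write the pending splits into list state; Flink may redistribute
        // the list elements across subtasks when the job is rescaled
        checkpointedSplits.clear();
        for (TimestampedFileInputSplit split : pendingSplits) {
            checkpointedSplits.add(split);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedSplits = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("pending-splits", TimestampedFileInputSplit.class));

        // on restore, this subtask receives its share of the checkpointed splits
        for (TimestampedFileInputSplit split : checkpointedSplits.get()) {
            pendingSplits.add(split);
        }
    }
}

This redistribution is what the test added below exercises: a snapshot taken from one reader is handed to a second, freshly created reader, which must pick up the pending splits and produce the same output.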
@@ -19,6 +19,7 @@
package org.apache.flink.hdfstests;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -36,7 +37,9 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -387,6 +390,149 @@ public int compare(String o1, String o2) {
}
}

@Test
public void testReaderSnapshotRestore() throws Exception {

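// splits with different modification times and split indices, used to populate the reader's state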
TimestampedFileInputSplit split1 =
new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);

TimestampedFileInputSplit split2 =
new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);

TimestampedFileInputSplit split3 =
new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);

TimestampedFileInputSplit split4 =
new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);


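// the latch keeps BlockingFileInputFormat blocked in reachedEnd(), so the splits stay pending across the snapshot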
final OneShotLatch latch = new OneShotLatch();

BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI));
TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);

ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
initReader.setOutputType(typeInfo, new ExecutionConfig());

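// wrap the reader in a test harness so we can feed it splits, snapshot it, and restore from the snapshot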
OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> initTestInstance =
new OneInputStreamOperatorTestHarness<>(initReader);
initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);
initTestInstance.open();

// create some state in the reader
initTestInstance.processElement(new StreamRecord<>(split1));
initTestInstance.processElement(new StreamRecord<>(split2));
initTestInstance.processElement(new StreamRecord<>(split3));
initTestInstance.processElement(new StreamRecord<>(split4));

// take a snapshot of the operator's state. This will be used
// to initialize another reader and compare the results of the
// two operators.

final OperatorStateHandles snapshot;
synchronized (initTestInstance.getCheckpointLock()) {
snapshot = initTestInstance.snapshot(0L, 0L);
}

ContinuousFileReaderOperator<FileInputSplit> restoredReader = new ContinuousFileReaderOperator<>(
new BlockingFileInputFormat(latch, new Path(hdfsURI)));
restoredReader.setOutputType(typeInfo, new ExecutionConfig());

OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> restoredTestInstance =
new OneInputStreamOperatorTestHarness<>(restoredReader);
restoredTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);

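// initialize the new reader with the first reader's snapshot before opening it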
restoredTestInstance.initializeState(snapshot);
restoredTestInstance.open();

// now let computation start
latch.trigger();

// ... and wait for the operators to close gracefully

synchronized (initTestInstance.getCheckpointLock()) {
initTestInstance.close();
}

synchronized (restoredTestInstance.getCheckpointLock()) {
restoredTestInstance.close();
}

FileInputSplit fsSplit1 = createSplitFromTimestampedSplit(split1);
FileInputSplit fsSplit2 = createSplitFromTimestampedSplit(split2);
FileInputSplit fsSplit3 = createSplitFromTimestampedSplit(split3);
FileInputSplit fsSplit4 = createSplitFromTimestampedSplit(split4);

// verify that the output contains the expected splits and that
// the two operators produced identical results.

Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit1)));
Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit2)));
Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit3)));
Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit4)));

Assert.assertArrayEquals(
initTestInstance.getOutput().toArray(),
restoredTestInstance.getOutput().toArray()
);
}

private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit split) {
Preconditions.checkNotNull(split);

return new FileInputSplit(
split.getSplitNumber(),
split.getPath(),
split.getStart(),
split.getLength(),
split.getHostnames()
);
}

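/**
 * A file input format that blocks in {@code reachedEnd()} until the given
 * latch is triggered, and then emits its split exactly once. This keeps
 * splits pending inside the reader while the test takes its snapshot.
 */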
private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {

private final OneShotLatch latch;

private FileInputSplit split;

private boolean reachedEnd;

BlockingFileInputFormat(OneShotLatch latch, Path filePath) {
super(filePath);
this.latch = latch;
this.reachedEnd = false;
}

@Override
public void open(FileInputSplit fileSplit) throws IOException {
this.split = fileSplit;
this.reachedEnd = false;
}

@Override
public boolean reachedEnd() throws IOException {
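// block until the test triggers the latch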
if (!latch.isTriggered()) {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return reachedEnd;
}

@Override
public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException {
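// emit the split once; the next call to reachedEnd() will return true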
this.reachedEnd = true;
return split;
}

@Override
public void close() {

}
}

//// Monitoring Function Tests //////

@Test
@@ -421,7 +567,7 @@ public boolean filterPath(Path filePath) {
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);

final FileVerifyingSourceContext context =
-    new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction, 0, -1);
+    new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction);

monitoringFunction.open(new Configuration());
monitoringFunction.run(context);
@@ -490,15 +636,21 @@ public void testProcessOnce() throws Exception {
new ContinuousFileMonitoringFunction<>(format, hdfsURI,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);

- final FileVerifyingSourceContext context =
-     new FileVerifyingSourceContext(latch, monitoringFunction, 1, -1);
+ final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch, monitoringFunction);

final Thread t = new Thread() {
@Override
public void run() {
try {
monitoringFunction.open(new Configuration());
monitoringFunction.run(context);

// we would never arrive here if we were in
// PROCESS_CONTINUOUSLY mode.

// this will trigger the latch
context.close();

} catch (Exception e) {
Assert.fail(e.getMessage());
}
@@ -599,10 +751,15 @@ private static class FileVerifyingSourceContext extends DummySourceContext {
private final ContinuousFileMonitoringFunction src;
private final OneShotLatch latch;
private final Set<String> seenFiles;
- private final int elementsBeforeNotifying;

+ private int elementsBeforeNotifying = -1;
+ private int elementsBeforeCanceling = -1;

FileVerifyingSourceContext(OneShotLatch latch,
ContinuousFileMonitoringFunction src) {
this(latch, src, -1, -1);
}

FileVerifyingSourceContext(OneShotLatch latch,
ContinuousFileMonitoringFunction src,
int elementsBeforeNotifying,
@@ -621,16 +778,27 @@ Set<String> getSeenFiles() {
@Override
public void collect(TimestampedFileInputSplit element) {
String seenFileName = element.getPath().getName();

this.seenFiles.add(seenFileName);
- if (seenFiles.size() == elementsBeforeNotifying) {
+ if (seenFiles.size() == elementsBeforeNotifying && !latch.isTriggered()) {
latch.trigger();
}

- if (elementsBeforeCanceling != -1 && seenFiles.size() == elementsBeforeCanceling) {
+ if (seenFiles.size() == elementsBeforeCanceling) {
src.cancel();
}
}

@Override
public void close() {
// the context was closed, so trigger the latch to
// un-block any threads that are still waiting on it.
if (!latch.isTriggered()) {
latch.trigger();
}
src.cancel();
}
}

private static class ModTimeVerifyingSourceContext extends DummySourceContext {
Expand Down
