[FLINK-2314] Make Streaming File Sources Persistent
This commit takes the changes from the previous
commits and wires them into the API, both Java and Scala.

While doing so, some changes were introduced to the
classes that actually do the work, either as bug fixes
or as new design choices.
kl0u committed May 24, 2016
1 parent 13db59f commit 60949ab
Showing 18 changed files with 517 additions and 454 deletions.
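
As context for the diffs that follow, the test changes in this commit show roughly how the reworked file source is now wired together on the Java side. The snippet below is only a sketch assembled from those test hunks, not code taken from the commit itself: the class name and the hdfsURI / INTERVAL constants are hypothetical stand-ins for the test's fields, and the step that attaches the read operator to the splits stream is not visible in the shown hunks.

// Sketch only: class name and the two constants are hypothetical stand-ins
// for the test's hdfsURI and INTERVAL fields.
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FilePathFilter;
import org.apache.flink.streaming.api.functions.source.FileSplitMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.FileSplitReadOperator;

public class FileSourceWiringSketch {

    private static final String hdfsURI = "hdfs://localhost:9000/tests/"; // hypothetical
    private static final long INTERVAL = 100;                             // hypothetical, in ms

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
        format.setFilePath(hdfsURI);

        // The old Configuration carrying "input.file.path" is gone: the path, a FilePathFilter,
        // and the WatchType are now passed to the constructor directly.
        FileSplitMonitoringFunction<String> monitoringFunction =
                new FileSplitMonitoringFunction<>(format, hdfsURI,
                        FilePathFilter.DefaultFilter.getInstance(),
                        FileSplitMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED,
                        env.getParallelism(), INTERVAL);

        // The split reader likewise only needs the input format.
        FileSplitReadOperator<String, ?> reader = new FileSplitReadOperator<>(format);

        // The monitoring function is the actual source; it emits the splits that the
        // reader operator then turns into records (that wiring is outside the visible hunks).
        DataStream<FileInputSplit> splits = env.addSource(monitoringFunction);
    }
}
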
@@ -26,6 +26,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.FilePathFilter;
import org.apache.flink.streaming.api.functions.source.FileSplitMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.FileSplitReadOperator;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
@@ -116,20 +117,18 @@ protected void testProgram() throws Exception {
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
format.setFilePath(hdfsURI);

Configuration config = new Configuration();
config.setString("input.file.path", hdfsURI);

try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

FileSplitMonitoringFunction<String> monitoringFunction =
new FileSplitMonitoringFunction<>(format, hdfsURI,
config, FileSplitMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED,
env.getParallelism(), INTERVAL);
FilePathFilter.DefaultFilter.getInstance(),
FileSplitMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED,
env.getParallelism(), INTERVAL);

TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
FileSplitReadOperator<String, ?> reader = new FileSplitReadOperator<>(format, config);
FileSplitReadOperator<String, ?> reader = new FileSplitReadOperator<>(format);
TestingSinkFunction sink = new TestingSinkFunction(monitoringFunction);

DataStream<FileInputSplit> splits = env.addSource(monitoringFunction);
@@ -115,36 +115,41 @@ public void testFileReadingOperator() throws Exception {
}

TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
Configuration config = new Configuration();
config.setString("input.file.path", hdfsURI);

TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
FileSplitReadOperator<String, ?> reader = new FileSplitReadOperator<>(format, config);
reader.setOutputType(typeInfo, new ExecutionConfig());

FileSplitReadOperator<String, ?> reader = new FileSplitReadOperator<>(format);
OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
new OneInputStreamOperatorTestHarness<>(reader);

reader.setOutputType(typeInfo, new ExecutionConfig());
tester.open();

// create the necessary splits for the test
FileInputSplit[] splits = format.createInputSplits(
reader.getRuntimeContext().getNumberOfParallelSubtasks());

// and feed them to the operator
for(FileInputSplit split: splits) {
tester.processElement(new StreamRecord<>(split));
}
// this will also call the reader.close()
tester.close();

// then close the reader gracefully
synchronized (tester.getCheckpointLock()) {
tester.close();
}

/*
* Given that the reader is multithreaded, the test finishes before the reader finishes
* reading. This results in files being deleted before they are read, thus throwing an exception.
* Given that the reader is multithreaded, the test finishes before the reader thread finishes
* reading. This results in files being deleted by the test before being read, thus throwing an exception.
* In addition, even if file deletion happens at the end, the results are not ready for testing.
* To faces this, we wait until all the output is collected or until the waiting time exceeds 1000 ms, or 1s.
* */
* To face this, we wait until all the output is collected or until the waiting time exceeds 1000 ms, or 1s.
*/

long start = System.currentTimeMillis();
Queue<Object> output;
do {
output = tester.getOutput();
Thread.sleep(50);
} while ((output == null || output.size() != NO_OF_FILES * LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000);

Map<Integer, List<String>> actualFileContents = new HashMap<>();
@@ -184,10 +189,12 @@ public int compare(String o1, String o2) {
}
}

private int getLineNo(String line) {
String[] tkns = line.split("\\s");
Assert.assertTrue(tkns.length == 6);
return Integer.parseInt(tkns[tkns.length - 1]);
private static class PathFilter implements FilePathFilter {

@Override
public boolean filterPath(Path filePath) {
return filePath.getName().startsWith("**");
}
}

@Test
@@ -208,15 +215,11 @@ public void testFilePathFiltering() throws Exception {
}

TextInputFormat format = new TextInputFormat(new Path(hdfsURI));

Configuration config = new Configuration();
config.setString("input.file.path", hdfsURI);

FileSplitMonitoringFunction<String> monitoringFunction =
new FileSplitMonitoringFunction<>(format, hdfsURI, config, new PathFilter(),
FileSplitMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED, 1, INTERVAL);
new FileSplitMonitoringFunction<>(format, hdfsURI, new PathFilter(),
FileSplitMonitoringFunction.WatchType.PROCESS_ONCE, 1, INTERVAL);

monitoringFunction.open(config);
monitoringFunction.open(new Configuration());
monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));

Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
@@ -230,74 +233,75 @@ public void testFilePathFiltering() throws Exception {
}
}

private static class PathFilter implements FilePathFilter {

@Override
public boolean filterPath(Path filePath) {
return filePath.getName().startsWith("**");
}
}

@Test
public void testFileSplitMonitoring() throws Exception {
public void testFileSplitMonitoringReprocessWithAppended() throws Exception {
Set<String> uniqFilesFound = new HashSet<>();

FileCreator fc = new FileCreator(INTERVAL);
FileCreator fc = new FileCreator(INTERVAL, NO_OF_FILES);
fc.start();

TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
Configuration config = new Configuration();
config.setString("input.file.path", hdfsURI);

FileSplitMonitoringFunction<String> monitoringFunction =
new FileSplitMonitoringFunction<>(format, hdfsURI, config,
new FileSplitMonitoringFunction<>(format, hdfsURI, FilePathFilter.DefaultFilter.getInstance(),
FileSplitMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED, 1, INTERVAL);
monitoringFunction.open(config);

monitoringFunction.open(new Configuration());
monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));

// wait until all the files are created
fc.join();
// wait until the sink also sees all the splits.
synchronized (uniqFilesFound) {
while (uniqFilesFound.size() < NO_OF_FILES) {
uniqFilesFound.wait(7 * INTERVAL);
}
}

Assert.assertTrue(fc.getFilesCreated().size() == NO_OF_FILES);
Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);

Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
Set<String> fileNamesCreated = new HashSet<>();
for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) {
fileNamesCreated.add(path.toString());
}

Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
for(String file: uniqFilesFound) {
Assert.assertTrue(fileNamesCreated.contains(file));
}

for(org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}

@Test
public void testFileSplitMonitoringProcessOnce() throws Exception {
Set<String> uniqFilesFound = new HashSet<>();

FileCreator fc = new FileCreator(INTERVAL);
FileCreator fc = new FileCreator(INTERVAL, 1);
fc.start();

// to make sure that at least one file is created
Set<org.apache.hadoop.fs.Path> created = fc.getFilesCreated();
synchronized (created) {
if (created.size() == 0) {
created.wait();
Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
synchronized (filesCreated) {
if (filesCreated.size() == 0) {
filesCreated.wait();
}
}
Assert.assertTrue(fc.getFilesCreated().size() >= 1);

TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
Configuration config = new Configuration();
config.setString("input.file.path", hdfsURI);

FileSplitMonitoringFunction<String> monitoringFunction =
new FileSplitMonitoringFunction<>(format, hdfsURI, config,
new FileSplitMonitoringFunction<>(format, hdfsURI, FilePathFilter.DefaultFilter.getInstance(),
FileSplitMonitoringFunction.WatchType.PROCESS_ONCE, 1, INTERVAL);
monitoringFunction.open(config);

monitoringFunction.open(new Configuration());
monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));

// wait until all the files are created
fc.join();

Assert.assertTrue(filesCreated.size() == NO_OF_FILES);

Set<String> fileNamesCreated = new HashSet<>();
for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) {
fileNamesCreated.add(path.toString());
@@ -307,10 +311,20 @@ public void testFileSplitMonitoringProcessOnce() throws Exception {
for(String file: uniqFilesFound) {
Assert.assertTrue(fileNamesCreated.contains(file));
}

for(org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}

// ------------- End of Tests

private int getLineNo(String line) {
String[] tkns = line.split("\\s");
Assert.assertTrue(tkns.length == 6);
return Integer.parseInt(tkns[tkns.length - 1]);
}

/**
* A separate thread creating {@link #NO_OF_FILES} files, one file every {@link #INTERVAL} milliseconds.
* It serves for testing the file monitoring functionality of the {@link FileSplitMonitoringFunction}.
@@ -319,11 +333,13 @@ public void testFileSplitMonitoringProcessOnce() throws Exception {
private class FileCreator extends Thread {

private final long interval;
private final int noOfFilesBeforeNotifying;

private final Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();

FileCreator(long interval) {
FileCreator(long interval, int notificationLim) {
this.interval = interval;
this.noOfFilesBeforeNotifying = notificationLim;
}

public void run() {
@@ -334,13 +350,13 @@ public void run() {

synchronized (filesCreated) {
filesCreated.add(file.f0);
filesCreated.notifyAll();
if (filesCreated.size() == noOfFilesBeforeNotifying) {
filesCreated.notifyAll();
}
}
Thread.sleep(interval);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
@@ -376,6 +392,9 @@ public void collect(FileInputSplit element) {
if (filesFound.size() == NO_OF_FILES) {
this.src.cancel();
this.src.close();
synchronized (filesFound) {
filesFound.notifyAll();
}
}
} catch (Exception e) {
e.printStackTrace();
@@ -407,6 +426,7 @@ private Tuple2<org.apache.hadoop.fs.Path, String> fillWithData(String base, Stri
assert (hdfs != null);

org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
Assert.assertTrue (!hdfs.exists(file));

org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
FSDataOutputStream stream = hdfs.create(tmp);
@@ -101,8 +101,8 @@ public RuntimeEnvironment(
InputGate[] inputGates,
ActorGateway jobManager,
TaskManagerRuntimeInfo taskManagerInfo,
Task containingTask,
TaskMetricGroup metrics) {
TaskMetricGroup metrics,
Task containingTask) {

this.jobId = checkNotNull(jobId);
this.jobVertexId = checkNotNull(jobVertexId);
@@ -526,7 +526,7 @@ else if (current == ExecutionState.CANCELING) {
userCodeClassLoader, memoryManager, ioManager,
broadcastVariableManager, accumulatorRegistry,
splitProvider, distributedCacheEntries,
writers, inputGates, jobManager, taskManagerConfig, this, metrics);
writers, inputGates, jobManager, taskManagerConfig, metrics, this);

// let the task code create its readers and writers
invokable.setEnvironment(env);
@@ -44,6 +44,11 @@ public DataStreamSource(StreamExecutionEnvironment environment,
}
}

public DataStreamSource(SingleOutputStreamOperator<T> operator) {
super(operator.environment, operator.getTransformation());
this.isParallel = true;
}

@Override
public DataStreamSource<T> setParallelism(int parallelism) {
if (parallelism > 1 && !isParallel) {