Minor changes in the FileSplitMonitoringFunction.
kl0u committed Apr 27, 2016
1 parent 8952eac commit e692fe3
Showing 4 changed files with 78 additions and 88 deletions.
@@ -16,6 +16,7 @@
*/
package org.apache.flink.streaming.api.functions.source;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
@@ -39,6 +40,7 @@
* to downstream tasks for further reading and processing. Which splits will be further processed
* depends on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
*/
@Internal
public class FileSplitMonitoringFunction<OUT>
extends RichSourceFunction<FileInputSplit> {

@@ -72,7 +74,7 @@ public enum WatchType {

private FilePathFilter pathFilter;

private volatile boolean isRunning = false;
private volatile boolean isRunning = true;

private Configuration configuration;

@@ -110,26 +112,8 @@ public void open(Configuration parameters) throws Exception {
format.configure(this.configuration);
}

/**
* Creates the input splits for the path to be assigned to the downstream tasks.
* Those are going to read their contents for further processing.
*/
private FileInputSplit[] createInputSplits() throws JobException {
FileInputSplit[] inputSplits;
try {
inputSplits = format.createInputSplits(readerParallelism);
} catch (Throwable t) {
throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
}
return inputSplits;
}

@Override
public void run(SourceFunction.SourceContext<FileInputSplit> context) throws Exception {
if(!isRunning) {
isRunning = true;
}

FileSystem fileSystem = FileSystem.get(new URI(path));
while (isRunning) {
monitor(fileSystem, context);
@@ -139,7 +123,7 @@

private void monitor(FileSystem fs, SourceContext<FileInputSplit> context) throws IOException, JobException {
List<FileStatus> files = listEligibleFiles(fs);
for (FileInputSplit split : getInputSplitsToForward(files)) {
for (FileInputSplit split : getInputSplits(files)) {
processSplit(split, context);
}
}
@@ -154,12 +138,18 @@ private void processSplit(FileInputSplit split, SourceContext<FileInputSplit> co
}
}

private Set<FileInputSplit> getInputSplitsToForward(List<FileStatus> files) throws JobException {
/**
* Creates the input splits for the path to be assigned to the downstream tasks.
* Those are going to read their contents for further processing. Splits belonging
* to files in the {@code files} list are ignored.
* @param files The files to ignore.
*/
private Set<FileInputSplit> getInputSplits(List<FileStatus> files) throws IOException {
if (files.isEmpty()) {
return new HashSet<>();
}

FileInputSplit[] inputSplits = createInputSplits();
FileInputSplit[] inputSplits = format.createInputSplits(readerParallelism);

Set<FileInputSplit> splitsToForward = new HashSet<>();
for (FileInputSplit split: inputSplits) {
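For context on the renamed getInputSplits() method, here is a minimal standalone sketch, not part of this commit, of the two building blocks it combines: asking a FileInputFormat for splits and filtering them against a set of file paths. The directory, the file name, the parallelism value of 4, and the selection criterion are made-up placeholders; TextInputFormat is used only as a convenient concrete FileInputFormat, and the real function decides which files matter based on its WatchType.

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class SplitFilterSketch {

    public static void main(String[] args) throws IOException {
        // Hypothetical directory to scan; it must exist and be readable.
        TextInputFormat format = new TextInputFormat(new Path("/tmp/monitored-dir"));
        format.configure(new Configuration());

        // Ask the format for splits, as the monitoring function does with readerParallelism.
        FileInputSplit[] allSplits = format.createInputSplits(4);

        // Hypothetical selection of files whose splits should be forwarded downstream.
        Set<Path> selectedFiles = new HashSet<>();
        selectedFiles.add(new Path("/tmp/monitored-dir/new-file.txt"));

        // Keep only the splits that belong to the selected files.
        Set<FileInputSplit> splitsToForward = new HashSet<>();
        for (FileInputSplit split : allSplits) {
            if (selectedFiles.contains(split.getPath())) {
                splitsToForward.add(split);
            }
        }
        System.out.println("Forwarding " + splitsToForward.size() + " of " + allSplits.length + " splits.");
    }
}

In the function above, the parallelism hint passed to createInputSplits comes from readerParallelism, and the eligible-file decision is driven by the user-provided WatchType.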
@@ -27,6 +27,7 @@
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -47,22 +48,17 @@ public class FileSplitReadOperator<OUT> extends AbstractStreamOperator<OUT> impl

private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);

private Queue<FileInputSplit> unReadSplits;

private transient SplitReader reader;
private transient Object checkpointLock;
private transient SplitReader<OUT> reader;
private transient TimestampedCollector<OUT> collector;

private Configuration configuration;
private FileInputFormat<OUT> format;
private TypeInformation<OUT> typeInfo;
private transient TypeSerializer<OUT> serializer;

public FileSplitReadOperator(FileInputFormat<OUT> format, TypeInformation<OUT> typeInfo, Configuration configuration) {
this.format = format;
this.typeInfo = typeInfo;
this.configuration = configuration;
this.unReadSplits = new ConcurrentLinkedQueue<>();

// this is for the extra thread that is reading,
// the tests were not terminating because the success
@@ -80,19 +76,18 @@ public void open() throws Exception {
super.open();

this.format.configure(configuration);
this.collector = new TimestampedCollector<OUT>(output);
this.checkpointLock = getContainingTask()
.getCheckpointLock();
this.serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
this.collector = new TimestampedCollector<>(output);

TypeSerializer<OUT> serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
Object checkpointLock = getContainingTask().getCheckpointLock();

this.reader = new SplitReader(unReadSplits, format, serializer, collector, checkpointLock);
this.reader = new SplitReader<>(format, serializer, collector, checkpointLock);
this.reader.start();
}

@Override
public void processElement(StreamRecord<FileInputSplit> element) throws Exception {
LOG.info("Reading Split: " + element.getValue());
this.unReadSplits.add(element.getValue());
this.reader.addSplit(element.getValue());
}

@Override
@@ -104,36 +99,41 @@ public void processWatermark(Watermark mark) throws Exception {
public void close() throws Exception {
super.close();
this.reader.cancel();
this.reader.interrupt();
this.collector.close();
this.format.close();
}

private class SplitReader<OUT> extends Thread {
private class SplitReader<OT> extends Thread {

private boolean isRunning;

private final Queue<FileInputSplit> splits;
private final Queue<FileInputSplit> pendingSplits;

private final FileInputFormat<OUT> format;
private final TypeSerializer<OUT> serializer;
private final FileInputFormat<OT> format;
private final TypeSerializer<OT> serializer;

private final Object checkpointLock;
private final TimestampedCollector<OUT> collector;
private final TimestampedCollector<OT> collector;

SplitReader(Queue<FileInputSplit> splits,
FileInputFormat<OUT> format,
TypeSerializer<OUT> serializer,
TimestampedCollector<OUT> collector,
SplitReader(FileInputFormat<OT> format,
TypeSerializer<OT> serializer,
TimestampedCollector<OT> collector,
Object checkpointLock) {
this.format = checkNotNull(format, "Unspecified FileInputFormat.");
this.serializer = checkNotNull(serializer, "Unspecified Serialized.");

this.splits = splits;
this.pendingSplits = new ConcurrentLinkedQueue<>();
this.collector = collector;
this.checkpointLock = checkpointLock;
this.isRunning = true;
}

void addSplit(FileInputSplit split) {
Preconditions.checkNotNull(split);
LOG.info("Adding split to read queue: " + split);
this.pendingSplits.add(split);
}

@Override
public void run() {
FileInputSplit split = null;
@@ -142,30 +142,38 @@ public void run() {
while(this.isRunning) {

// get the next split to read
split = this.splits.poll();
if(split == null) {
split = this.pendingSplits.poll();
if (split == null) {
Thread.sleep(50);
continue;
}

this.format.open(split);
OUT nextElement = serializer.createInstance();
do {
nextElement = format.nextRecord(nextElement);
if(nextElement != null) {
synchronized (checkpointLock) {
collector.collect(nextElement);
// checkpointing should be here.
boolean isOpen = false;
try {
this.format.open(split);
isOpen = true;
OT nextElement = serializer.createInstance();
do {
nextElement = format.nextRecord(nextElement);
if (nextElement != null) {
synchronized (checkpointLock) {
collector.collect(nextElement);
// checkpointing should be here.
}
}
} while (nextElement != null && !format.reachedEnd());
} finally {
if (isOpen) {
this.format.close();
}
} while(nextElement != null && !format.reachedEnd());
}
}

LOG.info("Split Reader terminated, and exiting normally.");
} catch (IOException e) {
throw new RuntimeException("Unable to open split: " + split, e.getCause());
} catch (InterruptedException e) {
LOG.info("Reader thread was interrupted.");
LOG.info("Reader thread was interrupted: ", e.getMessage());
}
}

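The SplitReader above is a small producer/consumer setup: processElement() enqueues splits via addSplit(), and a dedicated thread drains the queue, opens each split, and reads it to the end before polling for the next one. Below is a self-contained sketch of that pattern using only JDK types; the Split class, the sleep interval, and the "reading" step are invented stand-ins rather than the operator's actual types.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueReaderSketch {

    /** Stand-in for a FileInputSplit; purely illustrative. */
    static class Split {
        final String name;
        Split(String name) { this.name = name; }
    }

    static class Reader extends Thread {
        private final Queue<Split> pendingSplits = new ConcurrentLinkedQueue<>();
        private volatile boolean isRunning = true;

        void addSplit(Split split) {
            pendingSplits.add(split);
        }

        void cancel() {
            isRunning = false;
        }

        @Override
        public void run() {
            try {
                while (isRunning) {
                    Split split = pendingSplits.poll();
                    if (split == null) {
                        Thread.sleep(50); // nothing queued yet; back off briefly
                        continue;
                    }
                    // A real reader would open the input format here, loop over
                    // nextRecord() under the checkpoint lock, and close the format
                    // in a finally block, as the operator above does.
                    System.out.println("Reading " + split.name);
                }
            } catch (InterruptedException e) {
                // interrupt() from the caller wakes the sleep and ends the thread
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Reader reader = new Reader();
        reader.start();
        reader.addSplit(new Split("split-0"));
        reader.addSplit(new Split("split-1"));
        Thread.sleep(200);
        reader.cancel();
        reader.interrupt();
        reader.join();
    }
}

Running the read loop on its own thread keeps processElement() non-blocking: the operator can keep accepting new splits while earlier ones are still being read.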
@@ -28,7 +28,6 @@
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
@@ -37,9 +36,7 @@
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -49,17 +46,15 @@ public class FileSplitMonitoringFunctionITCase extends StreamingProgramTestBase
private static final int NO_OF_FILES = 10;
private static final int LINES_PER_FILE = 10;

private static final long INTERVAL = 200;
private static final long INTERVAL = 100;

private File baseDir;

private org.apache.hadoop.fs.FileSystem hdfs;
private String hdfsURI;
private MiniDFSCluster hdfsCluster;

private Set<Path> hdPaths = new HashSet<>();
private Set<String> hdPathNames = new HashSet<>();
private Map<Integer, String> hdPathContents = new HashMap<>();
private static Map<Integer, String> hdPathContents = new HashMap<>();

// PREPARING FOR THE TESTS

@@ -95,36 +90,29 @@ public void destroyHDFS() {

private static final Object lock = new Object();

private boolean[] finished;

@Override
protected void testProgram() throws Exception {
FileCreator fileCreator = new FileCreator(INTERVAL);
Thread t = new Thread(fileCreator);
t.start();
Thread.sleep(100);

StringFileFormat format = new StringFileFormat();
format.setFilePath(hdfsURI);

Configuration config = new Configuration();
config.setString("input.file.path", hdfsURI);

FileSplitMonitoringFunction<String> monitoringFunction = null;
FileSplitReadOperator<String> reader = null;
TestingSinkFunction sink = null;

try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

monitoringFunction = new FileSplitMonitoringFunction<String>(format, hdfsURI,
FileSplitMonitoringFunction<String> monitoringFunction =
new FileSplitMonitoringFunction<>(format, hdfsURI,
config, FileSplitMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED,
env.getParallelism(), INTERVAL);

TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
reader = new FileSplitReadOperator<String>(format, typeInfo, config);
sink = new TestingSinkFunction(monitoringFunction);
FileSplitReadOperator<String> reader = new FileSplitReadOperator<>(format, typeInfo, config);
TestingSinkFunction sink = new TestingSinkFunction(monitoringFunction);

DataStream<FileInputSplit> splits = env.addSource(monitoringFunction);
splits.transform("FileSplitReader", typeInfo, reader).addSink(sink).setParallelism(1);
@@ -138,19 +126,18 @@ protected void testProgram() throws Exception {
}
}

private static int elementCounter = 0;
private static Map<Integer, Integer> elementCounters = new HashMap<>();
private static Map<Integer, String> collectedContent = new HashMap<>();

private static class TestingSinkFunction extends RichSinkFunction<String> {

private final FileSplitMonitoringFunction src;

private int elementCounter = 0;
private Map<Integer, Integer> elementCounters = new HashMap<>();
private Map<Integer, String> collectedContent = new HashMap<>();

TestingSinkFunction(FileSplitMonitoringFunction monitoringFunction) {
this.src = monitoringFunction;
}


@Override
public void open(Configuration parameters) throws Exception {
// this sink can only work with DOP 1
@@ -159,6 +146,14 @@ public void open(Configuration parameters) throws Exception {

@Override
public void close() {

// check if the data that we collected are the ones they are supposed to be.
Assert.assertEquals(collectedContent.size(), hdPathContents.size());
for (Integer fileIdx: collectedContent.keySet()) {
Assert.assertEquals(collectedContent.get(fileIdx), hdPathContents.get(fileIdx));
}
hdPathContents.clear();

src.cancel();
try {
src.close();
@@ -176,9 +171,8 @@ public void invoke(String value) throws Exception {
if (counter == null) {
counter = 0;
} else if (counter == LINES_PER_FILE) {
// ignore duplicate files.
System.out.println("Ignoring: " + value);
return;
// ignore duplicate lines.
Assert.fail("Duplicate line: " + value);
}
elementCounters.put(fileIdx, ++counter);

@@ -226,7 +220,7 @@ public void run() {
}

/**
* Fill the file with content and put the content in the {@code hdPathContents} list.
* Fill the file with content.
* */
private void fillWithData(String base, String fileName, int fileIdx, String sampleLine) throws IOException {
assert (hdfs != null);
@@ -245,8 +239,6 @@ private void fillWithData(String base, String fileName, int fileIdx, String samp

hdfs.rename(tmp, file);

hdPaths.add(file);
hdPathNames.add(file.toString());
hdPathContents.put(fileIdx, str.toString());

Assert.assertTrue("No result file present", hdfs.exists(file));