Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-2314] Make Streaming File Sources Persistent #2020

Closed
wants to merge 8 commits into from

Conversation

kl0u
Copy link
Contributor

@kl0u kl0u commented May 23, 2016

This PR solves FLINK-2314 and combines a number of sub-tasks. In addition, it solves FLINK-3896 which was introduced as part of this task.

The way File Input sources are now processed is the following:
* One task monitors (parallelism 1) a user-specified path for new files/data
* The above task assigns FileInputSplits to downstream (parallel) readers to actually read the data

The monitoring entity scans the path, splits the files to be processed in splits, and assigns them downstream. For now, two modes are supported. These are the PROCESS_ONCE which just processes the current contents of the path and exits, and the REPROCESS_WITH_APPENDED which periodically monitors the path and reprocesses new files and (the entire contents of) files with new data.

In addition, these sources are checkpointed, i.e. in the case of a task failure the job will resume from where it left off.

Finally, some changes were introduced in the way we are handling FileInputFormats after discussions with @aljoscha .

* A default implementation is the {@link DefaultFilter} which excludes files starting with ".", "_", or
* contain the "_COPYING_" in their names. This can be retrieved by {@link DefaultFilter#getInstance()}.
* */
public interface FilePathFilter extends Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably make this @PublicEvolving, just to be on the save site. I can fix it up when merging.

@aljoscha
Copy link
Contributor

All in all, very good work!

One thing I'd like to change is the order of parameters in the readFile methods. For these telescoping methods is usual to append new parameters to the end. For example in these two methods, where you add the additional filter method the filter would go to the end on the second method because it is an additional parameter. This way, users can just append additional parameters to existing ones without changing the order.

public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
                                                String filePath,
                                                WatchType watchType,
                                                long interval)
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
                                                String filePath,
                                                WatchType watchType,
                                                FilePathFilter filter,
                                                long interval)

@kl0u
Copy link
Contributor Author

kl0u commented May 23, 2016

@aljoscha Thanks a lot for the comments!
I integrated them already.

@aljoscha
Copy link
Contributor

Thanks, the changes look good.

R: @StephanEwen for taking a look at the API, you would only look at StreamExecutionEnvironment, for this.

@aljoscha
Copy link
Contributor

CC: @StephanEwen

By the way, it might not look like it but the only additional methods this introduces on StreamExecutionEnvironment are are these three:

public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
                                                String filePath,
                                                WatchType watchType,
                                                long interval)

public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
                                                String filePath,
                                                WatchType watchType,
                                                long interval,
                                                FilePathFilter filter)

public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
                                                String filePath,
                                                WatchType watchType,
                                                long interval,
                                                FilePathFilter filter,
                                                TypeInformation<OUT> typeInformation)

The rest are unfortunately public methods and we can't remove them, even though some should probably be removed.

try {
postSubmit();
} catch (Exception e1) {
e1.printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not forward the exception here? You introduced this block so that postSubmit() also runs when the SuccessException was thrown, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @aljoscha ! Done.

private transient FileInputSplit currSplit;

private transient FileInputSplit restoredSplit;
private transient Long restoredOffset;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use a primitive type here

@kl0u kl0u force-pushed the api_ft_files branch 10 times, most recently from e78457e to fb9c949 Compare May 31, 2016 10:14
@@ -82,6 +82,8 @@
*/
private static int MAX_SAMPLE_LEN;

private boolean restoring = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's probable left from a previous change

@aljoscha
Copy link
Contributor

The changes look good, we just have to figure out what to do about the methods on StreamExecutionEnvironment. One thing I'd like to get rid of is CheckpointableInputFormat.getCurrentState returns a Tuple that contains the split. The split is never used internally, it's only stored to return it from this method but the read operator itself also stores the split so I think it is redundant.

kl0u added 2 commits June 1, 2016 01:06
It adds a method failExternally() to the StreamTask, so that custom Operators
can make their containing task fail when needed.
This adds a new interface called CheckpointableInputFormat
which describes input formats whose state is queryable,
i.e. getCurrentChannelState() returns where the reader is
in the underlying source, and they can resume reading from
a user-specified position.

This functionality is not yet leveraged by current readers.
kl0u added 3 commits June 1, 2016 01:06
This is meant to replace the different file
reading sources in Flink streaming. Now there is
one monitoring source with DOP 1 monitoring a
directory and assigning input split to downstream
readers.

In addition, it makes the new features added by
FLINK-3717 work together with the aforementioned entities
(the monitor and the readers) in order to have
fault tolerant file sources and exactly once guarantees.

This does not replace the old API calls. This
will be done in a future commit.
This commit takes the changes from the previous
commits and wires them into the API, both Java and Scala.

While doing so, some changes were introduced to the
classes actually doing the work, either as bug fixes, or
as new design choices.
@kl0u kl0u force-pushed the api_ft_files branch 3 times, most recently from aef24e8 to 3903562 Compare May 31, 2016 23:34
@aljoscha
Copy link
Contributor

aljoscha commented Jun 1, 2016

The input formats still have a leftover field that stores the split.

After that, the only thing that remains is the API methods. Also what was the reason for the new code in StreamFaultToleranceTestBase.java.

@kl0u kl0u force-pushed the api_ft_files branch 5 times, most recently from 2ef1593 to e1dac4b Compare June 6, 2016 14:09
@kl0u kl0u closed this Jun 6, 2016
@kl0u kl0u deleted the api_ft_files branch June 6, 2016 14:12
@kl0u kl0u restored the api_ft_files branch June 6, 2016 14:12
@kl0u kl0u reopened this Jun 6, 2016
@kl0u kl0u force-pushed the api_ft_files branch 3 times, most recently from a96f4de to b25ef87 Compare June 10, 2016 15:16
@fhueske
Copy link
Contributor

fhueske commented Jun 15, 2016

Hi @kl0u, this PR is merged, right?
Can you close it? Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants