New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FLINK-3808: Refactor the whole file monitoring source to take a fileInputFormat as an argument. #1929
Conversation
Addresses issue FLINK-3808 . |
What's with the PR title? 😕 |
import java.util.Set; | ||
|
||
/** | ||
* This is the single (non-parallel) task, that monitors a user-procided path, and assigns splits |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: procided -> provided
// /** | ||
// * The default file path filtering method and is used | ||
// * if no other such function is provided. This filter leaves out | ||
// * files starting with ".", "_", and "_COPYING_". |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leftover code?
Can you provide a rough description as to how the FileSourceMonitoringFunction works and how it interacts with the actual formats? |
* This is the single (non-parallel) task, that monitors a user-procided path, and assigns splits | ||
* to downstream tasks for further reading and processing, depending on the user-provided {@link FileSplitMonitoringFunction.WatchType}. | ||
* | ||
* This method keeps track of which splits have already being processed by which task, and at which point |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
method? This is the class javadoc!
f19d8f7
to
7da7e88
Compare
* This method keeps track of which splits have already being processed by which task, and at which point | ||
* in the file we are currently processing, at the granularity of the split. In addition, it keeps track | ||
* of the last modification time for each file, so that it can detect new data. | ||
* This is the single (non-parallel) task, that monitors a user-provided path, and assigns splits |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
both commas don't belong in this sentence.
aa2ceb6
to
534b7cd
Compare
@Override | ||
public void close() throws Exception { | ||
super.close(); | ||
this.reader.cancel(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should add a reader.interrupt()
here. What do you think @StephanEwen ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added the call.
30de4e7
to
e692fe3
Compare
/** | ||
* The testing {@link FileInputFormat} to be used throughout the tests. | ||
* */ | ||
private static class StringFileFormat extends FileInputFormat<String> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be replaced by using TextInputFormat
, both in this ITCase and the Test.
Thank you for the pull request. This is a much needed feature!
I agree with @zentol here! Its important to put the right JIRA id into the PR title so that comments are properly mirrored into JIRA (this has also legal reasons. GitHub is not an entity controlled by the ASF, so all discussions must be mirrored to some apache systems) For future pull requests, please make sure to follow the PR checklist (the JIRA reference is the first item there!) BTW: you can still edit the PR title. |
|
||
private volatile boolean isRunning = true; | ||
|
||
private Configuration configuration; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should explain why we have the Configuration here. That the Configuration that we get in open() is not valid. Same in the FileSplitReadOperator.
|
||
LOG.info("Split Reader terminated, and exiting normally."); | ||
} catch (IOException e) { | ||
throw new RuntimeException("Unable to open split: " + split, e.getCause()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should wrap this in an AsyncException
and store it, so that processElement
can rethrow it. Please have a look at StreamTask.asyncException
, we can basically copy the stuff from there.
This is not completely fool-proof, since we only recognize the exception if new data arrives in processElement
but it is better than nothing.
Thanks for the comments @aljoscha |
I'm very happy with this now. 😃 Looks good to merge. Now for the checkpointable |
One related test failure; the hadoop dependency could not be found for PROFILE="-Dhadoop.profile=1" |
Ah man, you're right, the test needs to be moved to the |
This is now included in #1984 |
This PR is for issue FLINK-3808 . It refactor the FileMonitoring sources to take as an argument a FileInputFormat and work at the granularity of a split, and not at that of a file. It still does not change the API calls, which still call the old code. This will come in a following PR.