New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-17587][filesystem] Filesystem streaming sink support commit success file #12062
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit e0a38b8 (Fri Oct 16 10:31:47 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JingsongLi Thanks for the great efforts. I have left some comments.
...rc/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
Outdated
Show resolved
Hide resolved
...k-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
Outdated
Show resolved
Hide resolved
...ime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
Show resolved
Hide resolved
...ime-blink/src/main/java/org/apache/flink/table/filesystem/stream/PartitionCommitManager.java
Outdated
Show resolved
Hide resolved
...time-blink/src/main/java/org/apache/flink/table/filesystem/stream/PartitionCommitPolicy.java
Outdated
Show resolved
Hide resolved
...ime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
Show resolved
Hide resolved
} | ||
|
||
@Override | ||
public void close() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that there's some pending data between close
and the last notifyCheckpointComplete
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll modify to endInput
and support commit pending data.
...me-blink/src/main/java/org/apache/flink/table/filesystem/stream/SuccessFileCommitPolicy.java
Outdated
Show resolved
Hide resolved
7c0c835
to
e7040a4
Compare
* | ||
* <p>See {@link StreamingFileCommitter}. | ||
*/ | ||
public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage> implements |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class is copy pasting quite a bit of code/fields from StreamingFileSink
. Can not we extract a common abstraction or re-use one in the another?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've discussed with @gaoyunhaii offline, mainly because feel that there is too little code, so don't reuse it.
But +1 for abstraction. I've extracted a helper from StreamingFileSink
. See
36fa3d2
0d278fa
to
e2b2ede
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @JingsongLi for the update. Left some minor comments.
" 'partition-time': extract time from partition," + | ||
" if 'watermark' > 'partition-time' + 'delay', will commit the partition." + | ||
" 'process-time': use processing time, if 'current processing time' > " + | ||
"'partition creation time' + 'delay', will commit the partition."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should explain how "partition creation time" is determined.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"partition directory creation time"
key("sink.partition-commit.policy.class") | ||
.stringType() | ||
.noDefaultValue() | ||
.withDescription("The partition commit policy class for implement PartitionCommitPolicy interface."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this only work custom commit policy? If so it should be mentioned in the description and reflected in the config name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add comment, it is OK, has been mentioned in policy kind.
/** | ||
* Policy for commit a partition. | ||
* | ||
* <p>The implemented commit method needs to be reentrant because the same partition may be |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* <p>The implemented commit method needs to be reentrant because the same partition may be | |
* <p>The implemented commit method needs to be idempotent because the same partition may be |
.withDescription("Policy to commit a partition is to notify the downstream" + | ||
" application that the partition has finished writing, the partition" + | ||
" is ready to be read." + | ||
" metastore: add partition to metastore." + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If users choose metastore policy, don't they need to specify/provide a TableMetaStoreFactory
implementation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add comment: Only work with hive table, it is empty implementation for file system table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
What is the purpose of the change
Committing a partition is to notify the downstream application that the partition has finished writing, the partition is ready to be read.
Add “.succes” file to directory (success file name is configurable too)
Brief change log
Verifying this change
StreamFileSystemTestCsvITCase
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation