-
Notifications
You must be signed in to change notification settings - Fork 1.3k
[FLINK-29297] Group Table Store file writers into SingleFileWriter and RollingFileWriter #295
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
JingsongLi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution, left comments.
| LOG.warn( | ||
| "Failed to open the bulk writer, closing the output stream and throw the error.", | ||
| e); | ||
| IOUtils.closeQuietly(out); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to close out here, outside will invoke abort.
| // Abort this writer to clear uncommitted files. | ||
| writer.abort(); | ||
|
|
||
| writer.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to use close and return result.
We can just make RecordWriter.close returns void.
| import java.util.function.Supplier; | ||
|
|
||
| /** A {@link RollingFileWriter} to write {@link KeyValue}s into several rolling data files. */ | ||
| public class KeyValueDataRollingFileWriter extends RollingFileWriter<KeyValue, DataFileMeta> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to create separate class? I think there is no logical.
| + (currentWriter == null ? null : currentWriter.path()) | ||
| + ". Cleaning up.", | ||
| e); | ||
| abort(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add document to FileWriter.abort, Implementation needs to be reentrant
| } | ||
|
|
||
| try { | ||
| if (writer == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a single file writer, the file should be single instead of zero.
We can create writer in the constructor. This can avoid various inconsistencies caused by not producing files.
| + (currentWriter == null ? null : currentWriter.path()) | ||
| + ". Cleaning up.", | ||
| e); | ||
| abort(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add document to FileWriter.write, clear file by itself now when exception in write.
JingsongLi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me!
Currently we have two types of files to write:
However, current writers are all based on
MetricFileWriter, which always produces statistics.We'd like to refactor the writers and group them into
SingleFileWriterandRollingFileWriter.StatsCollectingSingleFileWritershould be a subclass ofSingleFileWriterwhich additionally produces statistics, and data file writers should be a subclass ofStatsCollectingSingleFileWriterorRollingFileWriterbased on their level. For extra file writers, extending fromSingleFileWriteris enough.