[HUDI-5095] Flink: Stores a special watermark(flag) to identify the c… #7099
    return extractTimestamp((HoodieAvroRecord) value);
  }
  return extractTimestamp((RowData) value);
}
Can we move this logic into AbstractStreamWriteFunction.java?
The reason I put it here is that BulkInsertWriteFunction extends AbstractWriteFunction rather than AbstractStreamWriteFunction. Moving this code into AbstractStreamWriteFunction would require special handling for BulkInsertWriteFunction.
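To illustrate the point about the class hierarchy, here is a minimal sketch. It assumes the common base class is AbstractWriteFunction and uses simplified stand-in types; the `eventTime` field and the class `HierarchySketch` are hypothetical and are not the real Hudi or Flink classes.

```java
// Sketch only: simplified stand-ins, NOT the real Hudi/Flink classes.
class HierarchySketch {
  // Hypothetical stand-ins for HoodieAvroRecord and RowData, each carrying an event time.
  static class HoodieAvroRecord {
    final long eventTime;
    HoodieAvroRecord(long eventTime) { this.eventTime = eventTime; }
  }

  static class RowData {
    final long eventTime;
    RowData(long eventTime) { this.eventTime = eventTime; }
  }

  // Assumed common base class: logic placed here is inherited by both writers.
  abstract static class AbstractWriteFunction {
    long extractTimestamp(Object value) {
      if (value instanceof HoodieAvroRecord) {
        return extractTimestamp((HoodieAvroRecord) value);
      }
      return extractTimestamp((RowData) value);
    }

    long extractTimestamp(HoodieAvroRecord record) { return record.eventTime; }

    long extractTimestamp(RowData row) { return row.eventTime; }
  }

  // The streaming writers share this intermediate class.
  abstract static class AbstractStreamWriteFunction extends AbstractWriteFunction { }

  static class StreamWriteFunction extends AbstractStreamWriteFunction { }

  // The bulk insert writer skips the intermediate class, so code that lives only in
  // AbstractStreamWriteFunction would not reach it.
  static class BulkInsertWriteFunction extends AbstractWriteFunction { }

  public static void main(String[] args) {
    AbstractWriteFunction bulk = new BulkInsertWriteFunction();
    AbstractWriteFunction stream = new StreamWriteFunction();
    System.out.println(bulk.extractTimestamp((Object) new RowData(42L)));
    System.out.println(stream.extractTimestamp((Object) new HoodieAvroRecord(7L)));
  }
}
```

With the extraction logic on the shared base class, both writers get it without a separate code path for bulk insert.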
9c8daed to 427bc0a
@XuQianJin-Stars what do you think about this way of describing the progress of the streaming writer?
+1, this is also the implementation method we use in our internal production.
Please fix checkstyle to pass CI.
@yuzhaojing thanks for the review, I will fix the issue you mentioned.
@JerryYue-M Gentle ping to fix the checkstyle issues. This is very close to merging.
...datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
…urrent progress of writing data
427bc0a to f29644c
@codope @danny0405 @yuzhaojing @XuQianJin-Stars
@hudi-bot run azure
Can we fetch the event time from the payload class's event-time field when HoodieRecordPayload#getInsertValue is invoked? We could then finalize the max timestamp in the coordinator when committing the metadata.
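A rough sketch of that idea, assuming each write task tracks the largest event time it has seen and reports it to the coordinator, which then takes the maximum across tasks at commit time. The names `WatermarkSketch`, `WriteTask`, `Coordinator`, `onTaskEvent`, and `finalizeForCommit` are hypothetical and do not reflect the actual Hudi API.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: hypothetical classes, not the Hudi implementation.
class WatermarkSketch {
  // Each write task keeps the largest event time it has observed so far.
  static class WriteTask {
    private long maxEventTime = Long.MIN_VALUE;

    void onRecord(long eventTime) {
      maxEventTime = Math.max(maxEventTime, eventTime);
    }

    long maxEventTime() {
      return maxEventTime;
    }
  }

  // The coordinator collects per-task values and finalizes one value per commit.
  static class Coordinator {
    private final Map<Integer, Long> reported = new HashMap<>();

    void onTaskEvent(int taskId, long maxEventTime) {
      // Keep the largest value reported by each task.
      reported.merge(taskId, maxEventTime, Long::max);
    }

    // Returns the max across all tasks, or Long.MIN_VALUE if nothing was reported.
    long finalizeForCommit() {
      long result = Long.MIN_VALUE;
      for (long value : reported.values()) {
        result = Math.max(result, value);
      }
      return result;
    }
  }

  public static void main(String[] args) {
    WriteTask task0 = new WriteTask();
    task0.onRecord(100L);
    task0.onRecord(250L);
    WriteTask task1 = new WriteTask();
    task1.onRecord(180L);

    Coordinator coordinator = new Coordinator();
    coordinator.onTaskEvent(0, task0.maxEventTime());
    coordinator.onTaskEvent(1, task1.maxEventTime());
    System.out.println(coordinator.finalizeForCommit()); // prints 250
  }
}
```

Taking the maximum follows the wording of the comment above. Taking the minimum across tasks would be the more conservative choice if the stored value is meant to guarantee that all data up to that time has been written.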
Change Logs
Fixes #7098. Holds an event-time instance in each write task to evaluate the progress of written data.
Impact
No impact. Users can disable it via config.
Risk level (write none, low, medium or high below)
low
Documentation Update
No
Contributor's checklist