fix(flink): fix the write handle close for append write#18756
Conversation
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR hardens append-writer shutdown so async buffer flushes finish before sort buffers are released and any surviving BulkInsertWriterHelper is closed during task shutdown. The ordering (shutdown → wait for in-flight async → awaitTermination → release pools → super.close → close writerHelper) correctly avoids the race between background writes and resource cleanup, and the tests cover both new behaviors. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One small naming suggestion on the hardcoded termination timeout; the rest of the changes look clean.
cc @yihua
| } | ||
| // Do not release the sort buffers while an already-submitted flush is still using them. | ||
| waitForAsyncWriteCompletion(); | ||
| if (!asyncWriteExecutor.awaitTermination(10, TimeUnit.MINUTES)) { |
There was a problem hiding this comment.
🤖 nit: the 10 here is a magic number — could you extract it into a named constant like ASYNC_EXECUTOR_TERMINATION_TIMEOUT_MINUTES so a future reader knows this value is intentional rather than arbitrary?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
There was a problem hiding this comment.
Extracting to constant is a good suggestion. Also, should it be configurable? I think that's unnecessary but will defer to your judgement.
| } | ||
| // Do not release the sort buffers while an already-submitted flush is still using them. | ||
| waitForAsyncWriteCompletion(); | ||
| if (!asyncWriteExecutor.awaitTermination(10, TimeUnit.MINUTES)) { |
There was a problem hiding this comment.
Extracting to constant is a good suggestion. Also, should it be configurable? I think that's unnecessary but will defer to your judgement.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18756 +/- ##
============================================
+ Coverage 68.14% 68.16% +0.01%
- Complexity 29094 29121 +27
============================================
Files 2517 2518 +1
Lines 141113 141229 +116
Branches 17508 17533 +25
============================================
+ Hits 96160 96265 +105
+ Misses 37046 37041 -5
- Partials 7907 7923 +16
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
|
+1 |
Describe the issue this Pull Request addresses
AppendWriteFunctionWithBIMBufferSortcan have an in-flight async flush when the Flink task is being closed. The existing shutdown path stopped accepting new async work but did not wait for already-submitted work before releasing the sort buffers, which could allow background writes to overlap with resource cleanup during failover or cancellation.In addition,
AppendWriteFunctiondid not close a survivingBulkInsertWriterHelperfrom itsclose()path, leaving helper-owned write resources open when a task shuts down before the normal checkpoint flush path closes them.Summary and Changelog
This change tightens append-writer shutdown behavior so async buffer flushing completes before dependent resources are released, and any surviving writer helper is closed during function shutdown.
Working tree: harden append writer shutdown cleanup
AppendWriteFunction.close()to closewriterHelperwhen it is still present during task shutdown.AppendWriteFunctionWithBIMBufferSort.close()to shut down the async executor, wait for in-flight async flush work to finish, and only then release memory segment pools.TestAppendWriteFunctionwithtestCloseClosesWriterHelper.TestAppendWriteFunctionWithBIMBufferSortwithtestCloseWaitsForAsyncWriteBeforeClosingWriterHelperto verify close ordering when async work is still running.TestAppendWriteFunctionTestAppendWriteFunctionWithBIMBufferSortTestAppendWriteFunctionWithBufferSortImpact
This change affects Flink append-write task shutdown behavior only. It does not change public APIs, configuration, storage format, or normal write semantics.
The main user-facing effect is safer cleanup during cancellation/failover for append writers using bounded in-memory buffer sort, reducing the chance of resource cleanup racing with already-submitted async write work.
Risk Level
low
The change is localized to append-writer shutdown paths and adds explicit waiting during close, so the primary risk is a longer close path if async work is still in progress. This is mitigated by targeted unit coverage and successful validation with:
mvn -pl hudi-flink-datasource/hudi-flink -DskipITs -Dtest=TestAppendWriteFunction,TestAppendWriteFunctionWithBIMBufferSort,TestAppendWriteFunctionWithBufferSort testResult:
16tests run,0failures,0errors.Documentation Update
none
No user-facing configuration, API, or behavior contract changed; this is an internal lifecycle and cleanup fix.
Contributor's checklist