[Bug](sink) fix cancel_at_time not work at parallel_sink/parallel_outfile #61739
BiteTheDDDDt wants to merge 3 commits into apache:master
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
run buildall
Pull request overview
Fixes deferred result-buffer cancellation for parallel result sink / parallel outfile by using the same identifier for cancellation as was used when registering the buffer in ResultBufferMgr.
Changes:
- Update `ResultSinkLocalState::close()` to cancel by `query_id` when `enable_parallel_result_sink` is enabled; otherwise cancel by `fragment_instance_id`.
- Update `ResultFileSinkLocalState::close()` to cancel by `query_id` when `enable_parallel_outfile` is enabled; otherwise cancel by `fragment_instance_id`.
- Add clarifying comments explaining the key selection.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| be/src/exec/operator/result_sink_operator.cpp | Aligns deferred buffer cancellation key with parallel/non-parallel sender registration key for result sink. |
| be/src/exec/operator/result_file_sink_operator.cpp | Aligns deferred buffer cancellation key with parallel/non-parallel sender registration key for outfile sink. |
```cpp
// In parallel result sink mode, the buffer is registered under query_id; otherwise
// it is registered under fragment_instance_id. Pass the matching key so the
// deferred cancel actually finds and removes the buffer entry.
state->exec_env()->result_mgr()->cancel_at_time(
        time(nullptr) + config::result_buffer_cancelled_interval_time,
        state->query_options().enable_parallel_result_sink ? state->query_id()
                                                           : state->fragment_instance_id());
```
In parallel result sink mode, multiple fragment instances share the same ResultBlockBuffer keyed by query_id, and close() is invoked per fragment instance (removing one dependency each time). Scheduling cancel_at_time(..., query_id) on every instance close can cancel/erase the shared buffer after the interval even if other instances are still producing results (i.e., before the buffer is actually fully closed), which can break result fetching and/or cancel in-flight execution. Consider scheduling the deferred cancel only when the shared buffer transitions to fully closed (after the last dependency is removed), e.g., by moving this scheduling into ResultBlockBuffer::close when _result_sink_dependencies becomes empty, or by extending the ResultBlockBufferBase::close contract to indicate “final close” so callers can safely schedule cancel once.
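The reviewer's suggestion can be sketched as a minimal, self-contained model (the class and method names here are illustrative only, not the actual Doris types): a shared buffer counts its registered fragment instances and reports "fully closed" only when the last one calls `close()`, so the caller can schedule the deferred cancel exactly once.

```cpp
#include <mutex>
#include <set>
#include <string>

// Minimal sketch, assuming a shared buffer keyed by query_id that tracks its
// registered fragment instances. Only when the last instance closes does
// close() report is_fully_closed == true, which is the one safe point to
// schedule cancel_at_time on the shared key.
class SharedResultBuffer {
public:
    void register_instance(const std::string& instance_id) {
        std::lock_guard<std::mutex> guard(_lock);
        _open_instances.insert(instance_id);
    }

    // Sets |is_fully_closed| to true only when this call removed the last
    // registered instance; callers schedule deferred cleanup only then.
    void close(const std::string& instance_id, bool& is_fully_closed) {
        std::lock_guard<std::mutex> guard(_lock);
        _open_instances.erase(instance_id);
        is_fully_closed = _open_instances.empty();
    }

private:
    std::mutex _lock;
    std::set<std::string> _open_instances;
};
```

Because the flag is computed and returned under the same lock that guards the instance set, each closing instance sees a consistent answer, and exactly one of them observes the final close.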
```cpp
// In parallel outfile mode, the buffer is registered under query_id; otherwise
// it is registered under fragment_instance_id. Pass the matching key so the
// deferred cancel actually finds and removes the buffer entry.
state->exec_env()->result_mgr()->cancel_at_time(
        time(nullptr) + config::result_buffer_cancelled_interval_time,
        state->query_options().enable_parallel_outfile ? state->query_id()
                                                       : state->fragment_instance_id());
```
Same concern as result sink: in parallel outfile mode the buffer is shared under query_id and ResultBlockBuffer::close only fully closes once all fragment-instance dependencies have been removed. Calling cancel_at_time(..., query_id) from every fragment instance close can cancel the shared buffer after the interval even if some instances are still running, potentially breaking client fetch and/or canceling remaining producers. Consider deferring scheduling until the buffer is fully closed (e.g., schedule inside ResultBlockBuffer::close when the last dependency is removed, or change close() to return/indicate when it performed the final close so this can be scheduled once).
run buildall
/review
Code Review Summary
Overall Assessment
The fix correctly addresses the bug where cancel_at_time was not working in parallel result-sink / parallel outfile mode. The root cause was that in parallel mode, the buffer is registered under query_id but cancel_at_time was unconditionally using fragment_instance_id, so it would try to cancel a non-existent entry. The approach of using is_fully_closed + buffer_id() is clean and correct.
Critical Checkpoint Conclusions
- **Goal / correctness:** The goal is to fix deferred buffer cleanup in parallel sink modes. The production code accomplishes this correctly by (a) only scheduling cleanup when the last instance closes the shared buffer, and (b) using `buffer_id()`, which returns the correct registration key regardless of mode. However, existing unit tests will not compile due to the signature change (see inline comment).
- **Minimal / focused:** Yes, the change is focused and minimal.
- **Concurrency:** The `is_fully_closed` output parameter is set under `_lock` inside `ResultBlockBuffer::close()`, and the caller reads it after `close()` returns, so there is no concurrent access issue. The `_result_sink_dependencies` map is correctly used as the reference-counting mechanism under the same lock.
- **Lifecycle:** `buffer_id()` returns a const reference to `_fragment_id`, a member of the buffer object. The buffer remains alive (held by the `_sender` shared_ptr) when `buffer_id()` is called, so there is no dangling reference.
- **Parallel code paths:** Both `result_file_sink_operator.cpp` and `result_sink_operator.cpp` are updated symmetrically. Good.
- **Error handling:** If `_sender->close()` returns an error, `RETURN_IF_ERROR` skips `cancel_at_time`. This is acceptable because `create_sender()` already registers a safety-net timeout (execution_timeout + 5s) that will eventually clean up the buffer.
- **Test coverage:** Build-breaking issue: 6 call sites in `be/test/exec/sink/result_block_buffer_test.cpp` (lines 163, 292, 311) and `be/test/exec/sink/arrow_result_block_buffer_test.cpp` (lines 176, 308, 327) still call `close()` with the old 3-argument signature. These must be updated to pass the new `bool& is_fully_closed` parameter. Additionally, the tests should be extended to verify `is_fully_closed` is correctly set (false when dependencies remain, true when the last one closes).
- **Observability:** No new logging needed; existing logging in `get_batch` already covers the buffer lifecycle.
- **Configuration:** No new config items.
- **FE-BE compatibility:** No protocol changes.
- **Performance:** No concerns; the change adds a bool output parameter and a conditional branch, both trivial.
- **Other issues:** None found.
```cpp
// Close one fragment instance's contribution to this buffer. When the last
// registered instance calls close(), |is_fully_closed| is set to true,
// indicating that no more producers will write to this buffer and callers may
// safely schedule deferred cleanup. The buffer is keyed in ResultBufferMgr
```
[Build Break] The `close()` signature changed from 3 to 4 parameters, but the following test call sites were not updated and will fail to compile:
- `be/test/exec/sink/result_block_buffer_test.cpp` lines 163, 292, 311
- `be/test/exec/sink/arrow_result_block_buffer_test.cpp` lines 176, 308, 327
All 6 sites call `buffer.close(ins_id, Status::..., 0)` with only 3 arguments. They need to be updated to pass `bool& is_fully_closed`, e.g.:

```cpp
bool is_fully_closed = false;
EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0, is_fully_closed).ok());
```

Also consider adding test assertions on `is_fully_closed` itself, e.g., verify it returns false when other dependencies remain, and true when the last instance closes.
run buildall
BE Regression && UT Coverage Report: increment line coverage / increment coverage report
This pull request refines the lifecycle management of result buffers in the execution engine. It introduces a mechanism to determine when a result buffer is fully closed (i.e., all fragment instances have finished writing), enabling deferred cleanup to be scheduled only at the appropriate time. The changes propagate this logic through the buffer interface, operator implementations, and associated tests.
Lifecycle management improvements:
- The `close` method in `ResultBlockBufferBase` and its implementations now takes a `bool& is_fully_closed` parameter, which is set to `true` only when the last fragment instance has closed the buffer. This allows callers to safely schedule deferred cleanup only when the buffer is no longer needed. (`be/src/runtime/result_block_buffer.h`, `be/src/runtime/result_block_buffer.cpp`)
- In both `ResultFileSinkLocalState::close` and `ResultSinkLocalState::close`, deferred cleanup via `cancel_at_time` is now scheduled only when `is_fully_closed` is true, and the correct buffer ID is used as the registration key, ensuring proper cleanup in both parallel and non-parallel modes. (`be/src/exec/operator/result_file_sink_operator.cpp`, `be/src/exec/operator/result_sink_operator.cpp`)

Interface and API changes:
- The `ResultBlockBufferBase` interface now includes a `buffer_id()` method, which returns the correct ID for use in cleanup, abstracting away the difference between parallel and non-parallel modes. (`be/src/runtime/result_block_buffer.h`)

Test updates:
- Tests are updated to the new `close` signature and verify the value of `is_fully_closed`, ensuring correct behavior in both normal and error scenarios. (`be/test/exec/sink/arrow_result_block_buffer_test.cpp`, `be/test/exec/sink/result_block_buffer_test.cpp`)
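The caller-side pattern described in the summary can be modeled end to end in a small self-contained sketch. All names below (`Buffer`, `BufferMgr`, `close_instance`) are illustrative stand-ins, not the Doris API; the point is that cleanup is scheduled exactly once, on the final close, and keyed by `buffer_id()` so the manager entry is actually found regardless of parallel or non-parallel mode.

```cpp
#include <map>
#include <set>
#include <string>

// Simplified model: the buffer exposes buffer_id() (its registration key:
// query_id in parallel mode, fragment_instance_id otherwise) and reports via
// an out-parameter whether a close() call was the final one.
struct Buffer {
    std::string id;                        // registration key in the manager
    std::set<std::string> open_instances;  // fragment instances still producing

    const std::string& buffer_id() const { return id; }

    void close(const std::string& instance, bool& is_fully_closed) {
        open_instances.erase(instance);
        is_fully_closed = open_instances.empty();
    }
};

struct BufferMgr {
    std::map<std::string, Buffer> buffers;
    std::set<std::string> cancel_scheduled;  // stand-in for cancel_at_time's queue

    void cancel_at_time(const std::string& key) { cancel_scheduled.insert(key); }
};

// Caller side: schedule deferred cleanup only when the last instance closes,
// using buffer_id() as the key so the entry is found in either mode.
void close_instance(BufferMgr& mgr, const std::string& key,
                    const std::string& instance) {
    Buffer& buf = mgr.buffers[key];
    bool is_fully_closed = false;
    buf.close(instance, is_fully_closed);
    if (is_fully_closed) {
        mgr.cancel_at_time(buf.buffer_id());
    }
}
```

With two instances sharing one buffer, the first `close_instance` call schedules nothing; only the second, final close enqueues the cancel, which is the behavior the PR's reviewers converged on.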