[Improvement](cte) Reduce cases where fragments in recursive CTEs are not released in a timely manner. #60313
base: master
run buildall
Pull request overview
This PR improves recursive CTE fragment rerun/release behavior by avoiding reliance on QueryContext lookup in FragmentMgr::rerun_fragment, reducing cases where fragments aren’t notified/released promptly.
Changes:
- Update `FragmentMgr::rerun_fragment` to operate purely via the pipeline fragment context map rather than `get_query_ctx()`.
- Add `SCOPED_ATTACH_TASK(_query_ctx.get())` inside several `PipelineFragmentContext` rerun-related methods to ensure proper task context attachment.
- Adjust the recursive CTE RPC rerun loop to continue across fragments and return a consolidated status.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| be/src/runtime/fragment_mgr.cpp | Removes dependency on QueryContext lookup for rerun operations, using _pipeline_map directly. |
| be/src/pipeline/pipeline_fragment_context.cpp | Attaches task context within wait/release/rebuild rerun operations to support the new caller behavior. |
| be/src/pipeline/exec/rec_cte_source_operator.h | Changes rerun-fragment RPC error handling to continue across targets and return a final status. |
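
To make the first change concrete, here is a minimal sketch of dispatching a rerun stage by looking the fragment context up directly in a map, with no query-level lookup in between. Every name in it (`FragmentRegistry`, `FragmentCtx`, `RerunStage`, the simplified `Status`) is a hypothetical stand-in for illustration, not the Doris implementation.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical, simplified status type (not doris::Status).
class Status {
public:
    static Status OK() { return Status(true, ""); }
    static Status NotFound(std::string m) { return Status(false, std::move(m)); }
    static Status InvalidArgument(std::string m) { return Status(false, std::move(m)); }
    bool ok() const { return _ok; }
    const std::string& msg() const { return _msg; }

private:
    Status(bool ok, std::string m) : _ok(ok), _msg(std::move(m)) {}
    bool _ok;
    std::string _msg;
};

// Hypothetical rerun stages, mirroring the four opcodes named in the review.
enum class RerunStage { WaitClose, SetToRerun, Rebuild, Submit };

// Hypothetical fragment context that only records which operation ran last.
struct FragmentCtx {
    std::string last_op;
    Status wait_close() { last_op = "wait_close"; return Status::OK(); }
    Status set_to_rerun() { last_op = "set_to_rerun"; return Status::OK(); }
    Status rebuild() { last_op = "rebuild"; return Status::OK(); }
    Status submit() { last_op = "submit"; return Status::OK(); }
};

// Simplified key: (query id, fragment id).
using FragmentKey = std::pair<int, int>;

class FragmentRegistry {
public:
    void insert(const FragmentKey& key, std::shared_ptr<FragmentCtx> ctx) {
        _map[key] = std::move(ctx);
    }

    // Look the fragment up directly; fail fast if it is already gone.
    Status rerun(const FragmentKey& key, RerunStage stage) {
        auto it = _map.find(key);
        if (it == _map.end()) {
            return Status::NotFound("fragment context not found");
        }
        FragmentCtx& ctx = *it->second;
        switch (stage) {
        case RerunStage::WaitClose: return ctx.wait_close();
        case RerunStage::SetToRerun: return ctx.set_to_rerun();
        case RerunStage::Rebuild: return ctx.rebuild();
        case RerunStage::Submit: return ctx.submit();
        default: return Status::InvalidArgument("unknown rerun stage");
        }
    }

private:
    std::map<FragmentKey, std::shared_ptr<FragmentCtx>> _map;
};
```

Because every branch returns, no trailing `return` is needed after the dispatch, and a missing fragment is reported as an error rather than silently skipped.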
            return fragment_ctx->set_to_rerun();
        } else if (stage == PRerunFragmentParams::rebuild) {
            return fragment_ctx->rebuild(_thread_pool.get());
        } else if (stage == PRerunFragmentParams::submit) {
Copilot
AI
Jan 28, 2026
`rerun_fragment()` no longer attaches a task context, and the submit stage now calls `fragment_ctx->submit()` without any `SCOPED_ATTACH_TASK`. The other stages (`wait_close`/`set_to_rerun`/`rebuild`) now attach inside the callee, but `submit()` does not, which makes thread context, signal task id, and memory tracking inconsistent on the submit path. Consider attaching in this caller for the submit branch via `SCOPED_ATTACH_TASK(fragment_ctx->get_query_ctx())` (or equivalent) before calling `submit()`.
Suggested change:

    -    } else if (stage == PRerunFragmentParams::submit) {
    +    } else if (stage == PRerunFragmentParams::submit) {
    +        SCOPED_ATTACH_TASK(fragment_ctx->get_query_ctx());
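
The inconsistency described in this comment can be illustrated with a small RAII sketch. It assumes a scoped-attach guard works by setting a thread-local "current task" pointer for the lifetime of the guard. `ScopedAttach`, `QueryCtx`, `Fragment`, and `current_task()` are hypothetical names for illustration, not the Doris macro or its types.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for a query context.
struct QueryCtx {
    std::string query_id;
};

// Thread-local slot recording which query the current thread works for.
inline QueryCtx*& current_task() {
    thread_local QueryCtx* ctx = nullptr;
    return ctx;
}

// RAII guard: attaches on construction, restores the previous value on exit.
class ScopedAttach {
public:
    explicit ScopedAttach(QueryCtx* ctx) : _prev(current_task()) { current_task() = ctx; }
    ~ScopedAttach() { current_task() = _prev; }
    ScopedAttach(const ScopedAttach&) = delete;
    ScopedAttach& operator=(const ScopedAttach&) = delete;

private:
    QueryCtx* _prev;
};

inline std::string attached_query_id() {
    return current_task() != nullptr ? current_task()->query_id : "<none>";
}

// Hypothetical fragment: rebuild() attaches inside the callee, submit() does not.
struct Fragment {
    QueryCtx* query_ctx;
    std::string seen_in_rebuild;
    std::string seen_in_submit;

    void rebuild() {
        ScopedAttach guard(query_ctx);
        seen_in_rebuild = attached_query_id();
    }
    void submit() { seen_in_submit = attached_query_id(); }
};
```

In this sketch, calling `submit()` with no guard active observes `"<none>"`, while wrapping the call in a `ScopedAttach` in the caller makes it observe the query id, which is the effect the suggested change aims for.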
be/src/runtime/fragment_mgr.cpp
Outdated
                         print_id(query_id));
            return Status::InvalidArgument("Unknown rerun fragment opcode: {}", stage);
        }
        return Status::OK();
Copilot
AI
Jan 28, 2026
This `return Status::OK();` is unreachable because every branch above returns. Removing it would avoid dead code and make the control flow clearer.
            st = Status::InternalError(controller.ErrorText());
        }

        auto rpc_st = Status::create(result.status());
        if (!rpc_st.ok()) {
Copilot
AI
Jan 28, 2026
`_send_rerun_fragments()` now continues on RPC failures but overwrites `st` on each error, so the final returned status depends on the last failing fragment and may lose the original root-cause error (and which fragment failed). Also, when `controller.Failed()` is true, `result.status()` is not meaningful. Consider skipping the `Status::create(result.status())` path for that iteration (e.g., `continue`) and preserving the first failure (or aggregating errors with fragment id/address) instead of overwriting.
Suggested change:

    -        st = Status::InternalError(controller.ErrorText());
    -    }
    -    auto rpc_st = Status::create(result.status());
    -    if (!rpc_st.ok()) {
    +        if (st.ok()) {
    +            st = Status::InternalError(controller.ErrorText());
    +        }
    +        // When the RPC controller reports failure, the result status is not reliable.
    +        continue;
    +    }
    +    auto rpc_st = Status::create(result.status());
    +    if (!rpc_st.ok() && st.ok()) {
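
The "continue across targets but keep the first failure" pattern suggested here can be sketched in isolation. The `Status` class, the `RpcOutcome` struct, and `collect_first_error` below are hypothetical simplifications, not the actual Doris or brpc types; the sketch assumes each RPC has already completed and its outcome has been recorded.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified status type (not doris::Status).
class Status {
public:
    static Status OK() { return Status(true, ""); }
    static Status InternalError(std::string m) { return Status(false, std::move(m)); }
    bool ok() const { return _ok; }
    const std::string& msg() const { return _msg; }

private:
    Status(bool ok, std::string m) : _ok(ok), _msg(std::move(m)) {}
    bool _ok;
    std::string _msg;
};

// Hypothetical record of one completed rerun RPC.
struct RpcOutcome {
    std::string target;           // address or fragment id, for diagnostics
    bool transport_failed;        // analogous to a failed RPC controller
    std::string transport_error;  // transport-level error text
    Status remote_status;         // only meaningful if the transport succeeded
};

// Visit every outcome, but remember only the first failure seen.
Status collect_first_error(const std::vector<RpcOutcome>& outcomes) {
    Status first = Status::OK();
    for (const auto& o : outcomes) {
        if (o.transport_failed) {
            if (first.ok()) {
                first = Status::InternalError(o.target + ": " + o.transport_error);
            }
            continue;  // the remote status is not reliable in this case
        }
        if (!o.remote_status.ok() && first.ok()) {
            first = Status::InternalError(o.target + ": " + o.remote_status.msg());
        }
    }
    return first;
}
```

Prefixing the message with the target keeps track of which fragment failed. Aggregating all failures into one message would be an alternative if later errors also matter for diagnosis.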
…timely manner. update update
8f08d33 to
81583c0