[opt](stream-load) Implement async future handling for stream load to avoid blocking libevent threads #59428
base: master
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
run buildall
Pull request overview
This PR implements asynchronous future handling for stream load operations to prevent blocking libevent threads. The implementation uses event_base_once to defer callback execution to the libevent thread, avoiding long-running operations in the fragment manager thread.
Key Changes:
- Introduced async callback mechanism using libevent's event_base_once to handle stream load completion without blocking
- Added event_base, http_request, and stream_load_action fields to StreamLoadContext for async processing coordination
- Refactored completion logic to support both sync (fallback) and async modes based on event_base availability
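The hand-off described above can be sketched without libevent. The following is a minimal illustration only, not the PR's code: `MiniLoop`, `post`, `run_pending`, and `on_fragment_done` are hypothetical names, and a mutex-protected queue stands in for the one-shot event that event_base_once would schedule. The point it shows is that the worker thread only enqueues the finalization step, and the reply is produced later on the loop thread.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical stand-in for an event loop: work posted from any thread is
// executed later by whichever thread calls run_pending().
class MiniLoop {
public:
    // Enqueue a one-shot task; safe to call from a worker thread.
    void post(std::function<void()> task) {
        assert(task && "task must be callable");
        std::lock_guard<std::mutex> lock(_mu);
        _tasks.push(std::move(task));
    }

    // Called on the loop thread; drains and runs queued tasks.
    // Returns the number of tasks executed.
    int run_pending() {
        int ran = 0;
        for (;;) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> lock(_mu);
                if (_tasks.empty()) break;
                task = std::move(_tasks.front());
                _tasks.pop();
            }
            task();  // run outside the lock so a task may post again
            ++ran;
        }
        return ran;
    }

private:
    std::mutex _mu;
    std::queue<std::function<void()>> _tasks;
};

// Hypothetical completion hook: the worker thread only enqueues the
// finalization step, so it never sends the reply itself.
inline void on_fragment_done(MiniLoop& loop, bool ok, int* replies_sent) {
    loop.post([ok, replies_sent] {
        (void)ok;  // a real handler would build the response from the status
        ++*replies_sent;
    });
}
```

In this sketch nothing is sent until the loop thread calls `run_pending()`, which mirrors the intent of returning from the handler early and completing the request in a deferred callback.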
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 13 comments.
| File | Description |
|---|---|
| be/src/runtime/stream_load/stream_load_executor.cpp | Implements async callback infrastructure with StreamLoadAsyncCallbackData and event_base_once integration for non-blocking fragment completion handling |
| be/src/runtime/stream_load/stream_load_context.h | Adds event_base, http_request, and stream_load_action pointers to support async callback coordination |
| be/src/http/action/stream_load.h | Adds continue_handle_after_future public method and _finalize_request private method to support async completion flow |
| be/src/http/action/stream_load.cpp | Refactors _handle to support async mode, implements continue_handle_after_future callback, and extracts common finalization logic into _finalize_request |
| if (ctx->need_rollback) { | ||
| _exec_env->stream_load_executor()->rollback_txn(ctx.get()); | ||
| ctx->need_rollback = false; | ||
| } |
Copilot
AI
Dec 28, 2025
Duplicated rollback logic: The rollback logic appears in multiple places: in continue_handle_after_future (lines 995, 1010) and in _finalize_request (line 1046). This duplication could lead to maintenance issues if the rollback behavior needs to change. Additionally, there's a risk of double rollback if both code paths execute, though the need_rollback flag should prevent this in most cases.
| if (ctx->need_rollback) { | |
| _exec_env->stream_load_executor()->rollback_txn(ctx.get()); | |
| ctx->need_rollback = false; | |
| } | |
| auto rollback_if_needed = [this](const std::shared_ptr<StreamLoadContext>& c) { | |
| if (c->need_rollback) { | |
| _exec_env->stream_load_executor()->rollback_txn(c.get()); | |
| c->need_rollback = false; | |
| } | |
| }; | |
| rollback_if_needed(ctx); |
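If the two code paths could ever run on different threads, a plain bool check followed by a reset does not by itself rule out a double rollback. One possible way to make the helper idempotent is an atomic exchange, sketched below. This is an assumption-laden illustration: `rollback_once` is a hypothetical name, the flag is modelled as a standalone `std::atomic<bool>` (the source does not show the real field's type), and the callback stands in for the real rollback_txn call.

```cpp
#include <atomic>
#include <cassert>
#include <functional>

// Runs do_rollback at most once per "armed" flag.
// Returns true if this call performed the rollback.
inline bool rollback_once(std::atomic<bool>& need_rollback,
                          const std::function<void()>& do_rollback) {
    assert(do_rollback && "rollback action must be callable");
    // exchange() clears the flag and returns its previous value in one
    // atomic step, so at most one caller observes `true`.
    if (!need_rollback.exchange(false)) {
        return false;
    }
    do_rollback();  // stand-in for the real rollback call
    return true;
}
```

Whether this is needed depends on whether both call sites are guaranteed to run on the same thread, which the review comment leaves open.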
| // Note: In async mode, no code is waiting for promise (_handle() has returned), | ||
| // so no need to set promise | ||
| LOG(ERROR) << "event_base_once failed, cannot send async callback, ctx=" | ||
| << ctx->id.to_string() << ", errno=" << errno; |
Copilot
AI
Dec 28, 2025
Memory leak: If event_base_once fails (line 195), the callback_data allocated on lines 181-186 is never deleted, causing a memory leak. The callback_data should be deleted in the error path.
| << ctx->id.to_string() << ", errno=" << errno; | |
| << ctx->id.to_string() << ", errno=" << errno; | |
| // Clean up callback_data since callback will never be invoked on failure | |
| delete callback_data; |
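An alternative to a manual delete in the error path is to hold the payload in a `std::unique_ptr` and release it only once scheduling has succeeded. The sketch below assumes a C-style scheduler that returns 0 on success and takes a `void*` argument; `ScheduleFn`, `CallbackData`, `live_count`, `on_scheduled`, and `schedule_completion` are all hypothetical names for illustration, not the PR's code or libevent's API.

```cpp
#include <cassert>
#include <memory>

// Counts live payloads so the absence of a leak can be observed.
inline int& live_count() {
    static int n = 0;
    return n;
}

// Hypothetical payload handed to the deferred callback.
struct CallbackData {
    int ctx_id;
    explicit CallbackData(int id) : ctx_id(id) { ++live_count(); }
    ~CallbackData() { --live_count(); }
};

// Hypothetical C-style scheduler: returns 0 on success, non-zero on failure.
using ScheduleFn = int (*)(void (*cb)(void*), void* arg);

// The callback takes ownership back and frees the payload when it returns.
inline void on_scheduled(void* arg) {
    std::unique_ptr<CallbackData> data(static_cast<CallbackData*>(arg));
    // ... a real callback would finalize the request here ...
}

// Returns true if the hand-off succeeded.
inline bool schedule_completion(ScheduleFn schedule, int ctx_id) {
    assert(schedule != nullptr);
    auto data = std::make_unique<CallbackData>(ctx_id);
    if (schedule(&on_scheduled, data.get()) != 0) {
        return false;  // unique_ptr frees the payload on this path
    }
    (void)data.release();  // ownership now belongs to the callback
    return true;
}
```

With this shape the failure path cannot leak, and the success path frees the payload exactly once inside the callback.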
| struct event_base* event_base = nullptr; // libevent event loop | ||
| HttpRequest* http_request = nullptr; // HTTP request reference | ||
| StreamLoadAction* stream_load_action = nullptr; // StreamLoadAction instance pointer |
Copilot
AI
Dec 28, 2025
Raw pointer members without ownership semantics: The StreamLoadContext class now contains raw pointers (event_base, http_request, stream_load_action) that don't express ownership. These pointers are set to nullptr in free_handler_ctx but there's no clear lifetime management. If the HttpRequest or StreamLoadAction is destroyed while the async callback is pending or executing, accessing these pointers will cause undefined behavior. Consider using weak_ptr or another mechanism to safely check if these objects are still valid.
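The weak_ptr idea can be sketched as follows. This assumes the request object is owned through a `std::shared_ptr`, which the source does not confirm for HttpRequest; `Request`, `PendingReply`, `make_pending`, and `deliver` are hypothetical names used only to show the validity check before use.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for the HTTP request object.
struct Request {
    std::string last_reply;
};

// State kept for the deferred callback: a non-owning reference plus the body.
struct PendingReply {
    std::weak_ptr<Request> request;
    std::string body;
};

inline PendingReply make_pending(const std::shared_ptr<Request>& req,
                                 std::string body) {
    assert(req != nullptr);
    return PendingReply{req, std::move(body)};
}

// Returns true if the reply was delivered, false if the request is gone.
inline bool deliver(const PendingReply& pending) {
    // lock() yields an owning pointer only while the request is still alive.
    std::shared_ptr<Request> req = pending.request.lock();
    if (!req) {
        return false;  // request destroyed (e.g. client disconnected)
    }
    req->last_reply = pending.body;
    return true;
}
```

If the request's lifetime is managed by the HTTP server rather than by shared ownership, a different validity mechanism would be needed; the sketch only illustrates the check-before-use pattern.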
be/src/http/action/stream_load.cpp
Outdated
| void StreamLoadAction::_finalize_request(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx) { | ||
| ctx->load_cost_millis = UnixMillis() - ctx->start_millis; | ||
|
|
||
| if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { | ||
| if (ctx->need_rollback) { | ||
| _exec_env->stream_load_executor()->rollback_txn(ctx.get()); | ||
| ctx->need_rollback = false; | ||
| } | ||
| if (ctx->body_sink != nullptr) { | ||
| ctx->body_sink->cancel(ctx->status.to_string()); | ||
| } | ||
| } | ||
|
|
||
| auto str = ctx->to_json(); | ||
| str = str + '\n'; | ||
| HttpChannel::send_reply(req, str); |
Copilot
AI
Dec 28, 2025
Potential use-after-free: In _finalize_request, ctx->http_request is passed as a parameter and used on line 1056 to send a reply. However, the http_request pointer in ctx may have been set to nullptr by free_handler_ctx (line 441) which could be called if the client disconnects. While the parameter is passed by value, if free_handler_ctx is called concurrently or the underlying HttpRequest object is destroyed, this will cause a crash.
| // event_base_once failed, log error | ||
| // Note: In async mode, no code is waiting for promise (_handle() has returned), | ||
| // so no need to set promise | ||
| LOG(ERROR) << "event_base_once failed, cannot send async callback, ctx=" | ||
| << ctx->id.to_string() << ", errno=" << errno; | ||
| // Note: Cannot send HTTP response in this case, request may be invalid | ||
| // No need to set promise, as no code is waiting in async mode |
Copilot
AI
Dec 28, 2025
Inconsistent error handling: When event_base_once fails (line 195), the error is logged but no corrective action is taken. The request will be left in an incomplete state - the fragment execution has finished but no response will be sent to the client, and the transaction state is unclear. At minimum, this should attempt to finalize the request or set an error status that will be handled elsewhere.
| // event_base_once failed, log error | |
| // Note: In async mode, no code is waiting for promise (_handle() has returned), | |
| // so no need to set promise | |
| LOG(ERROR) << "event_base_once failed, cannot send async callback, ctx=" | |
| << ctx->id.to_string() << ", errno=" << errno; | |
| // Note: Cannot send HTTP response in this case, request may be invalid | |
| // No need to set promise, as no code is waiting in async mode | |
| // event_base_once failed, log error and fall back to executing callback | |
| // synchronously in the current thread to avoid leaving the request incomplete. | |
| LOG(ERROR) << "event_base_once failed, cannot send async callback, ctx=" | |
| << ctx->id.to_string() << ", errno=" << errno; | |
| // Best-effort fallback: directly invoke the callback so that the normal | |
| // completion logic (including HTTP response / txn handling) still runs. | |
| // The callback will delete callback_data. | |
| stream_load_async_callback(-1, EV_TIMEOUT, callback_data); |
| Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) { | ||
| Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) { | ||
| if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { | ||
| LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes |
Copilot
AI
Dec 28, 2025
Spelling error: "recevie" should be spelled "receive".
| LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes | |
| LOG(WARNING) << "receive body don't equal with body bytes, body_bytes=" << ctx->body_bytes |
| // Save other state needed in callback | ||
| bool need_rollback = ctx->need_rollback; | ||
| bool need_commit_self = ctx->need_commit_self; | ||
| bool body_sink_cancelled = (ctx->body_sink != nullptr && ctx->body_sink->cancelled()); |
Copilot
AI
Dec 28, 2025
Double access to body_sink: The code reads ctx->body_sink and checks if it's cancelled on line 178, but this happens in the fragment manager thread. Later, the callback may access body_sink in a different thread (libevent thread). If body_sink is not thread-safe or can be modified/destroyed between these accesses, this could lead to undefined behavior. The body_sink_cancelled flag is captured but the sink itself may still be accessed in continue_handle_after_future.
… avoid blocking libevent threads
df91c1a to 73fd4a0
run buildall
TPC-H: Total hot run time: 34928 ms
TPC-DS: Total hot run time: 179035 ms
ClickBench: Total hot run time: 27.27 s
BE UT Coverage Report: Increment line coverage, Increment coverage report
BE Regression && UT Coverage Report: Increment line coverage, Increment coverage report
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)