Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 230 additions & 19 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

// use string iequal
#include <event2/buffer.h>
#include <event2/event.h>
#include <event2/http.h>
#include <gen_cpp/FrontendService.h>
#include <gen_cpp/FrontendService_types.h>
Expand Down Expand Up @@ -106,14 +107,62 @@ void StreamLoadAction::handle(HttpRequest* req) {
return;
}

// Mark handle() as called: all client data has been received at this point,
// so a response can now be sent without risking a broken pipe.
// The mutex guards this critical section against races with the response path.
std::optional<std::string> response_to_send;
{
std::lock_guard<std::mutex> lock(ctx->response_mutex);

ctx->handle_called = true;

// If using async mode, check if fragment execution has completed
// If fragment completed before handle was called, response info was saved
// Now that handle is called, we can safely send the response
if (ctx->event_base != nullptr && ctx->http_request != nullptr &&
ctx->stream_load_action != nullptr) {
// Check if there's a pending response to send
// This happens when fragment execution completed before handle was called
if (ctx->pending_response.has_value()) {
// Fragment execution completed before handle was called
// Now handle is called (data received), safe to send response
response_to_send = std::move(ctx->pending_response.value());
ctx->pending_response.reset();
}
}
}

// Send response outside of lock to avoid holding lock during I/O
if (response_to_send.has_value()) {
HttpChannel::send_reply(req, response_to_send.value());
_finalize_request_cleanup(ctx);
return;
}

// No pending response, fragment execution not completed yet
// Continue with normal handle logic, response will be sent in callback
if (ctx->event_base != nullptr && ctx->http_request != nullptr &&
ctx->stream_load_action != nullptr) {
// Continue with normal handle logic, response will be sent in callback
}

// status already set to fail
if (ctx->status.ok()) {
ctx->status = _handle(ctx);
ctx->status = _handle(ctx, req);
if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
<< ", errmsg=" << ctx->status;
}
}

// If using async mode and no pending response, response will be sent in callback
if (ctx->event_base != nullptr && ctx->http_request != nullptr &&
ctx->stream_load_action != nullptr && ctx->status.ok()) {
// Async mode, response will be sent in callback when fragment execution completes
return;
}

// Sync mode or fallback mode, continue with original logic
ctx->load_cost_millis = UnixMillis() - ctx->start_millis;

if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
Expand Down Expand Up @@ -160,7 +209,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
}
}

Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
Copy link

Copilot AI Dec 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spelling error: "recevie" should be spelled "receive".

Suggested change
LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
LOG(WARNING) << "receive body don't equal with body bytes, body_bytes=" << ctx->body_bytes

Copilot uses AI. Check for mistakes.
<< ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
Expand All @@ -176,26 +225,38 @@ Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked));
}

// wait stream load finish
RETURN_IF_ERROR(ctx->future.get());
// Check if event_base is set (should be set in _on_header)
// If not set, fallback to sync wait
if (ctx->event_base == nullptr || ctx->http_request == nullptr ||
ctx->stream_load_action == nullptr) {
// event_base not set (should not happen), fallback to sync wait
LOG(WARNING) << "event_base not set, fallback to sync wait, ctx=" << ctx->id.to_string();
RETURN_IF_ERROR(ctx->future.get());

if (ctx->group_commit) {
LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
// Continue with original logic
if (ctx->group_commit) {
LOG(INFO) << "skip commit because this is group commit, pipe_id="
<< ctx->id.to_string();
return Status::OK();
}

if (ctx->two_phase_commit) {
int64_t pre_commit_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
} else {
int64_t commit_and_publish_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
ctx->commit_and_publish_txn_cost_nanos =
MonotonicNanos() - commit_and_publish_start_time;
g_stream_load_commit_and_publish_latency_ms
<< ctx->commit_and_publish_txn_cost_nanos / 1000000;
}
return Status::OK();
}

if (ctx->two_phase_commit) {
int64_t pre_commit_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
} else {
// If put file success we need commit this load
int64_t commit_and_publish_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
g_stream_load_commit_and_publish_latency_ms
<< ctx->commit_and_publish_txn_cost_nanos / 1000000;
}
// event_base is set, use async mode, return directly
// Future completion will trigger callback in stream_load_executor
Copy link

Copilot AI Dec 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect comment: The comment states "Future completion will trigger callback in stream_load_executor" but this is misleading. The callback is not triggered by future completion - it's triggered immediately by event_base_once with a zero timeout. The future/promise mechanism is actually bypassed in async mode, as noted in the code itself (lines 197-198, 202, 204).

Suggested change
// Future completion will trigger callback in stream_load_executor
// stream_load_executor will be invoked via event_base_once (zero-timeout) callback

Copilot uses AI. Check for mistakes.
return Status::OK();
}

Expand Down Expand Up @@ -340,14 +401,30 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
if (!http_req->header(HTTP_COMMENT).empty()) {
ctx->load_comment = http_req->header(HTTP_COMMENT);
}

// Set event_base early before calling execute_plan_fragment to avoid race conditions
// For use_streaming=true, execute_plan_fragment is called in _process_put
// For use_streaming=false, execute_plan_fragment is called in _handle
// Setting event_base here ensures it's available when execute_plan_fragment is called
struct evhttp_request* ev_req = http_req->get_evhttp_request();
struct evhttp_connection* conn = evhttp_request_get_connection(ev_req);
if (conn != nullptr) {
struct event_base* base = evhttp_connection_get_base(conn);
if (base != nullptr) {
ctx->event_base = base;
ctx->http_request = http_req;
ctx->stream_load_action = this;
}
}

// begin transaction
if (!ctx->group_commit) {
int64_t begin_txn_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get()));
ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;
}

// process put file
// process put file (may call execute_plan_fragment for use_streaming=true)
return _process_put(http_req, ctx);
}

Expand Down Expand Up @@ -398,6 +475,12 @@ void StreamLoadAction::free_handler_ctx(std::shared_ptr<void> param) {
if (ctx == nullptr) {
return;
}

// Mark request as closed to prevent callback from accessing invalid pointers
ctx->http_request = nullptr;
ctx->stream_load_action = nullptr;
ctx->event_base = nullptr;

// sender is gone, make receiver know it
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel("sender is gone");
Expand Down Expand Up @@ -924,4 +1007,132 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
return Status::OK();
}

// Finishes a stream load once fragment execution has reported its result.
// Depending on the flags it rolls back or commits the transaction, stores the
// outcome in ctx->status, and every path ends by calling _finalize_request().
//
// ctx                 - load context; ctx->http_request is forwarded to _finalize_request().
// fragment_status     - result of fragment execution; a non-OK value cancels the body sink.
// need_rollback       - on failure outside the need_commit_self branch, roll the txn back here.
// need_commit_self    - together with a non-null body_sink, selects the branch that commits
//                       or rolls back directly and returns early.
// body_sink_cancelled - inside the need_commit_self branch, handled like a failed fragment.
//
// NOTE(review): ctx->http_request is passed on without a null check, while
// free_handler_ctx() resets it to nullptr — confirm the caller rules that interleaving out.
void StreamLoadAction::continue_handle_after_future(std::shared_ptr<StreamLoadContext> ctx,
Status fragment_status, bool need_rollback,
bool need_commit_self,
bool body_sink_cancelled) {
// Handle body_sink cancel if needed
if (!fragment_status.ok() && ctx->body_sink != nullptr) {
ctx->body_sink->cancel(fragment_status.to_string());
}

// Handle need_commit_self case
if (need_commit_self && ctx->body_sink != nullptr) {
if (body_sink_cancelled || !fragment_status.ok()) {
// NOTE(review): if only body_sink_cancelled is true, fragment_status may still be OK,
// so ctx->status would be OK even though the txn is rolled back — confirm.
ctx->status = fragment_status;
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
_finalize_request(ctx->http_request, ctx);
return;
} else {
// Execute commit in callback (this was originally in exec_fragment)
// NOTE(review): the Status returned by commit_txn() is discarded and ctx->status is
// not updated here, unlike the commit branch at the end of this function — confirm
// that a failed commit is meant to go unreported on this path.
static_cast<void>(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
_finalize_request(ctx->http_request, ctx);
return;
}
}

// Continue with original subsequent logic
if (!fragment_status.ok()) {
ctx->status = fragment_status;
if (need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
// Clear the flag so _finalize_request() does not roll back a second time.
ctx->need_rollback = false;
}
_finalize_request(ctx->http_request, ctx);
return;
}

// Group commit: no commit is issued from here, the load is reported as OK.
if (ctx->group_commit) {
LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
ctx->status = Status::OK();
_finalize_request(ctx->http_request, ctx);
return;
}

// Commit (or pre-commit for two-phase commit), timing the call and recording
// its Status as the final load status.
if (ctx->two_phase_commit) {
int64_t pre_commit_start_time = MonotonicNanos();
Status commit_st = _exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
ctx->status = commit_st;
} else {
int64_t commit_and_publish_start_time = MonotonicNanos();
Status commit_st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
// The latency metric is fed in milliseconds (nanos / 1000000).
g_stream_load_commit_and_publish_latency_ms
<< ctx->commit_and_publish_txn_cost_nanos / 1000000;
ctx->status = commit_st;
}

_finalize_request(ctx->http_request, ctx);
Comment on lines +1010 to +1066
Copy link

Copilot AI Dec 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential null pointer dereference: In continue_handle_after_future, ctx->http_request is accessed on lines 996, 1001, 1013, 1020, and 1038, but it can be set to nullptr in free_handler_ctx (line 441). Even though there's a check on line 74 of stream_load_executor.cpp, there's no guarantee that http_request won't become null between that check and when continue_handle_after_future is called, creating a potential crash.

Copilot uses AI. Check for mistakes.
}

// Builds the final response for a stream load and either sends it or parks it.
//
// Steps: record the total load cost; on a failure other than PUBLISH_TIMEOUT,
// roll back (if still flagged) and cancel the body sink; serialize ctx to JSON
// plus a trailing newline; then, under ctx->response_mutex, send the reply and
// run _finalize_request_cleanup() if handle() has already been called, or store
// the string in ctx->pending_response otherwise.
//
// req - request the reply is sent on when ctx->handle_called is set.
// ctx - load context providing status, timing fields and the response mutex.
void StreamLoadAction::_finalize_request(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx) {
ctx->load_cost_millis = UnixMillis() - ctx->start_millis;

// PUBLISH_TIMEOUT is excluded from the failure handling below.
if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
if (ctx->need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
// Clear the flag so the rollback is not repeated.
ctx->need_rollback = false;
}
Comment on lines +1073 to +1076
Copy link

Copilot AI Dec 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicated rollback logic: The rollback logic appears in multiple places - in continue_handle_after_future (lines 995, 1010) and in _finalize_request (lines 1046). This duplication could lead to maintenance issues if the rollback behavior needs to change. Additionally, there's a risk of double rollback if both code paths execute, though the need_rollback flag should prevent this in most cases.

Suggested change
if (ctx->need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
ctx->need_rollback = false;
}
auto rollback_if_needed = [this](const std::shared_ptr<StreamLoadContext>& c) {
if (c->need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(c.get());
c->need_rollback = false;
}
};
rollback_if_needed(ctx);

Copilot uses AI. Check for mistakes.
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(ctx->status.to_string());
}
}

// Response body: the context serialized as JSON, terminated by a newline.
auto str = ctx->to_json();
str = str + '\n';

// Check if handle() has been called (data received completely)
// If handle() has been called, safe to send response immediately
// Otherwise, save response string and wait for handle() to be called
// Use mutex to protect the critical section and avoid race conditions
// The lock_guard lives until the end of the function, so send_reply() and
// _finalize_request_cleanup() below run with response_mutex held.
std::lock_guard<std::mutex> lock(ctx->response_mutex);

if (ctx->handle_called) {
// handle() has been called, data received completely, safe to send response
HttpChannel::send_reply(req, str);
_finalize_request_cleanup(ctx);
} else {
// handle() not called yet, data still being received
// Save response string to send when handle() is called
ctx->pending_response = str;
LOG(INFO) << "Fragment execution completed before handle() was called, "
<< "saving response to send when handle() is called, ctx=" << ctx->id.to_string();
}
}

// Post-response bookkeeping for a finished stream load: optionally persists a
// stream load record, logs a one-line summary, bumps the request metrics and
// removes the temporary data file if one was saved for this load.
void StreamLoadAction::_finalize_request_cleanup(std::shared_ptr<StreamLoadContext> ctx) {
#ifndef BE_TEST
    // A record is written only when one of the record switches is on, the
    // request pointer is still set, and the skip header is empty.
    if ((config::enable_stream_load_record ||
         config::enable_stream_load_record_to_audit_log_table) &&
        ctx->http_request != nullptr &&
        ctx->http_request->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) {
        auto record = ctx->to_json();
        record = record + '\n';
        record = ctx->prepare_stream_load_record(record);
        _save_stream_load_record(ctx, record);
    }
#endif

    // Derived millisecond figures for the summary line (nanos / 1000000).
    const auto receive_data_cost_ms =
            (ctx->receive_and_read_data_cost_nanos - ctx->read_data_cost_nanos) / 1000000;
    const auto read_data_cost_ms = ctx->read_data_cost_nanos / 1000000;
    const auto write_data_cost_ms = ctx->write_data_cost_nanos / 1000000;
    const auto commit_and_publish_cost_ms = ctx->commit_and_publish_txn_cost_nanos / 1000000;

    LOG(INFO) << "finished to execute stream load. label=" << ctx->label
              << ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id
              << ", load_cost_ms=" << ctx->load_cost_millis
              << ", receive_data_cost_ms=" << receive_data_cost_ms
              << ", read_data_cost_ms=" << read_data_cost_ms
              << ", write_data_cost_ms=" << write_data_cost_ms
              << ", commit_and_publish_txn_cost_ms=" << commit_and_publish_cost_ms
              << ", number_total_rows=" << ctx->number_total_rows
              << ", number_loaded_rows=" << ctx->number_loaded_rows
              << ", receive_bytes=" << ctx->receive_bytes
              << ", loaded_bytes=" << ctx->loaded_bytes
              << ", error_url=" << ctx->error_url;

    // update statistics
    streaming_load_requests_total->increment(1);
    streaming_load_duration_ms->increment(ctx->load_cost_millis);

    // Nothing was saved to disk for this load, so there is nothing to clean.
    if (ctx->data_saved_path.empty()) {
        return;
    }
    _exec_env->load_path_mgr()->clean_tmp_files(ctx->data_saved_path);
}

} // namespace doris
13 changes: 11 additions & 2 deletions be/src/http/action/stream_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,24 @@ class StreamLoadAction : public HttpHandler {
void on_chunk_data(HttpRequest* req) override;
void free_handler_ctx(std::shared_ptr<void> ctx) override;

// Continue handling after future is ready (called in libevent thread)
void continue_handle_after_future(std::shared_ptr<StreamLoadContext> ctx,
Status fragment_status, bool need_rollback,
bool need_commit_self, bool body_sink_cancelled);

private:
Status _on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
Status _handle(std::shared_ptr<StreamLoadContext> ctx);
Status _handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
Status _data_saved_path(HttpRequest* req, std::string* file_path, int64_t file_bytes);
Status _process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str);
Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);

private:
// Finalize request and send response
void _finalize_request(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx);
// Cleanup after finalizing request (for statistics and logging)
void _finalize_request_cleanup(std::shared_ptr<StreamLoadContext> ctx);

ExecEnv* _exec_env;

std::shared_ptr<MetricEntity> _stream_load_entity;
Expand Down
19 changes: 19 additions & 0 deletions be/src/runtime/stream_load/stream_load_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <event2/event.h>
#include <gen_cpp/BackendService_types.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/PlanNodes_types.h>
Expand All @@ -27,6 +28,7 @@
#include <future>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
Expand All @@ -47,6 +49,10 @@ namespace io {
class StreamLoadPipe;
} // namespace io

// Forward declarations
class HttpRequest;
class StreamLoadAction;

// kafka related info
class KafkaLoadInfo {
public:
Expand Down Expand Up @@ -256,6 +262,19 @@ class StreamLoadContext {
std::string qualified_user;
std::string cloud_cluster;

// Fields for async processing (Scheme B: libevent deferred callback)
// These fields are set in _on_header() before execute_plan_fragment is called
// to avoid race conditions
struct event_base* event_base = nullptr; // libevent event loop
HttpRequest* http_request = nullptr; // HTTP request reference
StreamLoadAction* stream_load_action = nullptr; // StreamLoadAction instance pointer
Comment on lines +268 to +270
Copy link

Copilot AI Dec 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raw pointer members without ownership semantics: The StreamLoadContext class now contains raw pointers (event_base, http_request, stream_load_action) that don't express ownership. These pointers are set to nullptr in free_handler_ctx but there's no clear lifetime management. If the HttpRequest or StreamLoadAction is destroyed while the async callback is pending or executing, accessing these pointers will cause undefined behavior. Consider using weak_ptr or another mechanism to safely check if these objects are still valid.

Copilot uses AI. Check for mistakes.
bool handle_called {
false}; // Flag to indicate if handle() has been called (data received completely)
std::optional<std::string>
pending_response; // Pending response string to send when handle() is called
mutable std::mutex
response_mutex; // Mutex to protect pending_response and handle_called check/set

public:
ExecEnv* exec_env() { return _exec_env; }

Expand Down
Loading
Loading