From 6d97a9abc30d7dab8924f5ff05ce036d7575884a Mon Sep 17 00:00:00 2001 From: liulijia Date: Tue, 1 Aug 2023 13:57:06 +0800 Subject: [PATCH 01/20] Add try close profile --- be/src/pipeline/pipeline_task.cpp | 17 +++++++++++++++-- be/src/pipeline/pipeline_task.h | 12 +++++++++++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 0e2041c488f03a..6749ea1c4adc57 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -92,6 +92,8 @@ void PipelineTask::_init_profile() { _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time); _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", exec_time); _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time); + _try_close_source_timer = ADD_CHILD_TIMER(_task_profile, "TryCloseSourceTime", exec_time); + _try_close_sink_timer = ADD_CHILD_TIMER(_task_profile, "TryCloseSinkTime", exec_time); _wait_source_timer = ADD_TIMER(_task_profile, "WaitSourceTime"); _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime"); @@ -288,8 +290,19 @@ Status PipelineTask::try_close() { return Status::OK(); } _try_close_flag = true; - Status status1 = _sink->try_close(_state); - Status status2 = _source->try_close(_state); + if (!_prepared) { + return Status::OK(); + } + Status status1; + { + SCOPED_TIMER(_try_close_sink_timer); + status1 = _sink->try_close(_state); + } + Status status2; + { + SCOPED_TIMER(_try_close_source_timer); + status2 = _source->try_close(_state); + } return status1.ok() ? status2 : status1; } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 5cba2ef96ef631..be1c8e35acf8e1 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -144,11 +144,12 @@ class PipelineTask { bool sink_ret = _sink->is_pending_finish(); if (sink_ret) { + clear_dst_pending_finish_time(); return true; } else { this->set_dst_pending_finish_time(); + return false; } - return false; } bool source_can_read() { return _source->can_read(); } @@ -227,6 +228,13 @@ class PipelineTask { } } + void clear_dst_pending_finish_time() { + if (_is_dst_pending_finish_over) { + _dst_pending_finish_over_time = 0; + _is_dst_pending_finish_over = false; + } + } + void set_dst_pending_finish_time() { if (!_is_dst_pending_finish_over) { _dst_pending_finish_over_time = _pipeline_task_watcher.elapsed_time(); @@ -296,6 +304,8 @@ class PipelineTask { RuntimeProfile::Counter* _sink_timer; RuntimeProfile::Counter* _finalize_timer; RuntimeProfile::Counter* _close_timer; + RuntimeProfile::Counter* _try_close_sink_timer; + RuntimeProfile::Counter* _try_close_source_timer; RuntimeProfile::Counter* _block_counts; RuntimeProfile::Counter* _block_by_source_counts; RuntimeProfile::Counter* _block_by_sink_counts; From 96e1b3674aa1e70c0b9dd9ae54940023f938dadc Mon Sep 17 00:00:00 2001 From: liulijia Date: Tue, 1 Aug 2023 18:11:43 +0800 Subject: [PATCH 02/20] rpc callback profile --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 71 ++++++++++++++++--- be/src/pipeline/exec/exchange_sink_buffer.h | 9 ++- 2 files changed, 68 insertions(+), 12 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index bf43c3b860cc40..d38451fa7472f9 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -116,6 +116,8 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { 
_instance_to_rpc_ctx[low_id] = {}; _instance_to_receiver_eof[low_id] = false; _instance_to_rpc_time[low_id] = 0; + _instance_to_rpc_callback_time[low_id] = 0; + _instance_to_rpc_callback_exec_time[low_id] = 0; _construct_request(low_id, finst_id); } @@ -205,7 +207,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, const PTransmitDataResult& result, const int64_t& start_rpc_time) { - set_rpc_time(id, start_rpc_time, result.receive_time()); + auto callback_start_time = GetCurrentTimeNanos(); Status s(Status::create(result.status())); if (s.is()) { _set_receiver_eof(id); @@ -217,6 +219,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } else { _send_rpc(id); } + auto callback_end_time = GetCurrentTimeNanos(); + set_rpc_time(id, start_rpc_time, result.receive_time(), callback_start_time, + callback_end_time); }); { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); @@ -258,7 +263,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, const PTransmitDataResult& result, const int64_t& start_rpc_time) { - set_rpc_time(id, start_rpc_time, result.receive_time()); + auto callback_start_time = GetCurrentTimeNanos(); Status s(Status::create(result.status())); if (s.is()) { _set_receiver_eof(id); @@ -270,6 +275,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } else { _send_rpc(id); } + auto callback_end_time = GetCurrentTimeNanos(); + set_rpc_time(id, start_rpc_time, result.receive_time(), callback_start_time, + callback_end_time); }); { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); @@ -324,17 +332,41 @@ bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) { return _instance_to_receiver_eof[id]; } -void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) { +void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time, + int64_t* max_callback_time, + int64_t* min_callback_time, + int64_t* max_callback_exec_time, + int64_t* min_callback_exec_time) { int64_t local_max_time = 0; + int64_t local_max_callback_time = 0; + int64_t local_max_callback_exec_time = 0; int64_t local_min_time = INT64_MAX; + int64_t local_min_callback_time = INT64_MAX; + int64_t local_min_callback_exec_time = INT64_MAX; for (auto& [id, time] : _instance_to_rpc_time) { if (time != 0) { local_max_time = std::max(local_max_time, time); local_min_time = std::min(local_min_time, time); } + auto& callback_time = _instance_to_rpc_callback_time[id]; + if (callback_time !=0) { + local_max_callback_time = std::max(local_max_callback_time, callback_time); + local_min_callback_time = std::min(local_min_callback_time, callback_time); + } + auto& callback_exec_time = _instance_to_rpc_callback_exec_time[id]; + if (callback_exec_time !=0) { + local_max_callback_exec_time = + std::max(local_max_callback_exec_time, callback_exec_time); + local_min_callback_exec_time = + std::min(local_min_callback_exec_time, callback_exec_time); + } } *max_time = local_max_time; + *max_callback_time = local_max_callback_time; + *max_callback_exec_time = local_min_callback_time; *min_time = local_min_time; + *min_callback_time = local_min_callback_time; + *min_callback_exec_time = local_min_callback_exec_time; } int64_t ExchangeSinkBuffer::get_sum_rpc_time() { @@ -345,27 +377,46 @@ int64_t ExchangeSinkBuffer::get_sum_rpc_time() { return sum_time; } -void 
ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time, - int64_t receive_rpc_time) { +void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_time, + int64_t callback_start_time, int64_t callback_end_time) { _rpc_count++; - int64_t rpc_spend_time = receive_rpc_time - start_rpc_time; DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end()); - if (rpc_spend_time > 0) { - _instance_to_rpc_time[id] += rpc_spend_time; + int64_t rpc_forward_time = receive_time - start_rpc_time; + int64_t rpc_callback_time = receive_time - start_rpc_time; + int64_t callback_exec_time = receive_time - start_rpc_time; + if (rpc_forward_time > 0) { + _instance_to_rpc_time[id] += rpc_forward_time; + } + + if (rpc_callback_time > 0) { + _instance_to_rpc_callback_time[id] += rpc_callback_time; + } + if (callback_exec_time > 0) { + _instance_to_rpc_callback_exec_time[id] += callback_exec_time; } } void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) { auto* _max_rpc_timer = ADD_TIMER(profile, "RpcMaxTime"); + auto* _max_rpc_callback_timer = ADD_TIMER(profile, "RpcMaxCallbackTime"); + auto* _max_rpc_callback_exec_timer = ADD_TIMER(profile, "RpcMaxCallbackExecTime"); auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime"); + auto* _min_rpc_callback_timer = ADD_TIMER(profile, "RpcMinCallbackTime"); + auto* _min_rpc_callback_exec_timer = ADD_TIMER(profile, "RpcMinCallbackExecTime"); auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime"); auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT); auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime"); - int64_t max_rpc_time = 0, min_rpc_time = 0; - get_max_min_rpc_time(&max_rpc_time, &min_rpc_time); + int64_t max_rpc_time = 0, min_rpc_time = 0, max_callback_t = 0, min_callback_t = 0, + max_callback_exec_t = 0, min_callback_exec_t = 0; + get_max_min_rpc_time(&max_rpc_time, &min_rpc_time, &max_callback_t, &min_callback_t, + &max_callback_exec_t, &min_callback_exec_t); _max_rpc_timer->set(max_rpc_time); _min_rpc_timer->set(min_rpc_time); + _max_rpc_callback_timer->set(max_callback_t); + _min_rpc_callback_timer->set(min_callback_t); + _max_rpc_callback_exec_timer->set(max_callback_exec_t); + _min_rpc_callback_exec_timer->set(min_callback_exec_t); _count_rpc->set(_rpc_count); int64_t sum_time = get_sum_rpc_time(); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index c4636563108320..a19c175a67458a 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -170,7 +170,8 @@ class ExchangeSinkBuffer { bool can_write() const; bool is_pending_finish(); void close(); - void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time); + void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_time, + int64_t callback_start_time, int64_t callback_end_time); void update_profile(RuntimeProfile* profile); private: @@ -191,6 +192,8 @@ class ExchangeSinkBuffer { phmap::flat_hash_map _instance_to_sending_by_pipeline; phmap::flat_hash_map _instance_to_receiver_eof; phmap::flat_hash_map _instance_to_rpc_time; + phmap::flat_hash_map _instance_to_rpc_callback_time; + phmap::flat_hash_map _instance_to_rpc_callback_exec_time; phmap::flat_hash_map _instance_to_rpc_ctx; std::atomic _is_finishing; @@ -209,7 +212,9 @@ class ExchangeSinkBuffer { inline void _failed(InstanceLoId id, const std::string& err); inline void _set_receiver_eof(InstanceLoId id); inline bool 
_is_receiver_eof(InstanceLoId id); - void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time); + void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time, int64_t* max_callback_time, + int64_t* min_callback_time, int64_t* max_callback_exec_time, + int64_t* min_callback_exec_time); int64_t get_sum_rpc_time(); }; From 3b32fbad47f39bc9ea8d540cfdf5699463f03580 Mon Sep 17 00:00:00 2001 From: liulijia Date: Wed, 2 Aug 2023 11:28:18 +0800 Subject: [PATCH 03/20] Add pending profile --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 14 ++++++- be/src/pipeline/exec/exchange_sink_buffer.h | 1 + be/src/pipeline/pipeline_task.cpp | 7 ++++ be/src/pipeline/pipeline_task.h | 39 +++++++++++++++++-- be/src/vec/sink/vdata_stream_sender.cpp | 1 + 5 files changed, 57 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index d38451fa7472f9..ec5adde2abd046 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -114,6 +114,8 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { finst_id.set_lo(fragment_instance_id.lo); _instance_to_sending_by_pipeline[low_id] = true; _instance_to_rpc_ctx[low_id] = {}; + _instance_watcher[low_id] = {}; + _instance_watcher[low_id].start(); _instance_to_receiver_eof[low_id] = false; _instance_to_rpc_time[low_id] = 0; _instance_to_rpc_callback_time[low_id] = 0; @@ -313,6 +315,7 @@ void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id) void ExchangeSinkBuffer::_ended(InstanceLoId id) { std::unique_lock lock(*_instance_to_package_queue_mutex[id]); _instance_to_sending_by_pipeline[id] = true; + _instance_watcher[id].stop(); } void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) { @@ -349,12 +352,12 @@ void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_ti local_min_time = std::min(local_min_time, time); } auto& callback_time = _instance_to_rpc_callback_time[id]; - if (callback_time !=0) { + if (callback_time != 0) { local_max_callback_time = std::max(local_max_callback_time, callback_time); local_min_callback_time = std::min(local_min_callback_time, callback_time); } auto& callback_exec_time = _instance_to_rpc_callback_exec_time[id]; - if (callback_exec_time !=0) { + if (callback_exec_time != 0) { local_max_callback_exec_time = std::max(local_max_callback_exec_time, callback_exec_time); local_min_callback_exec_time = @@ -422,5 +425,12 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) { int64_t sum_time = get_sum_rpc_time(); _sum_rpc_timer->set(sum_time); _avg_rpc_timer->set(sum_time / std::max(static_cast(1), _rpc_count.load())); + + int64_t max_end_time = 0; + for (auto& [id, timer] : _instance_watcher) { + max_end_time = std::max(timer.elapsed_time(), max_end_time); + } + auto* _max_end_timer = ADD_TIMER(profile, "MaxRpcEndTime"); + _max_end_timer->set(max_end_time); } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index a19c175a67458a..c4e9f2516c01cc 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -190,6 +190,7 @@ class ExchangeSinkBuffer { phmap::flat_hash_map _instance_to_seq; phmap::flat_hash_map> _instance_to_request; phmap::flat_hash_map _instance_to_sending_by_pipeline; + phmap::flat_hash_map _instance_watcher; phmap::flat_hash_map _instance_to_receiver_eof; 
phmap::flat_hash_map _instance_to_rpc_time; phmap::flat_hash_map _instance_to_rpc_callback_time; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 6749ea1c4adc57..df4fe4670c337c 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -70,7 +70,9 @@ void PipelineTask::_fresh_profile_counter() { COUNTER_SET(_begin_execute_timer, _begin_execute_time); COUNTER_SET(_eos_timer, _eos_time); COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time); + COUNTER_SET(_src_pending_finish_over_timer1, _src_pending_finish_over_time1); COUNTER_SET(_dst_pending_finish_over_timer, _dst_pending_finish_over_time); + COUNTER_SET(_dst_pending_finish_over_timer1, _dst_pending_finish_over_time1); COUNTER_SET(_pip_task_total_timer, (int64_t)_pipeline_task_watcher.elapsed_time()); } @@ -109,8 +111,13 @@ void PipelineTask::_init_profile() { _begin_execute_timer = ADD_TIMER(_task_profile, "Task1BeginExecuteTime"); _eos_timer = ADD_TIMER(_task_profile, "Task2EosTime"); + _src_pending_finish_check_timer = ADD_TIMER(_task_profile, "Task3SrcPendingFinishCheckTime"); _src_pending_finish_over_timer = ADD_TIMER(_task_profile, "Task3SrcPendingFinishOverTime"); + _src_pending_finish_over_timer1 = ADD_TIMER(_task_profile, "Task3SrcPendingFinishOverTime1"); + + _dst_pending_finish_check_timer = ADD_TIMER(_task_profile, "Task4DstPendingFinishCheckTime"); _dst_pending_finish_over_timer = ADD_TIMER(_task_profile, "Task4DstPendingFinishOverTime"); + _dst_pending_finish_over_timer1 = ADD_TIMER(_task_profile, "Task4DstPendingFinishOverTime1"); _pip_task_total_timer = ADD_TIMER(_task_profile, "Task5TotalTime"); _close_pipeline_timer = ADD_TIMER(_task_profile, "Task6ClosePipelineTime"); } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index be1c8e35acf8e1..c08d9db9736a93 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -135,16 +135,25 @@ class PipelineTask { void set_state(PipelineTaskState state); bool is_pending_finish() { - bool source_ret = _source->is_pending_finish(); + bool source_ret; + { + SCOPED_TIMER(_src_pending_finish_check_timer) + source_ret = _source->is_pending_finish(); + } if (source_ret) { + this->clear_src_pending_finish_time(); return true; } else { this->set_src_pending_finish_time(); } - bool sink_ret = _sink->is_pending_finish(); + bool sink_ret; + { + SCOPED_TIMER(_dst_pending_finish_check_timer); + sink_ret = _sink->is_pending_finish(); + } if (sink_ret) { - clear_dst_pending_finish_time(); + this->clear_dst_pending_finish_time(); return true; } else { this->set_dst_pending_finish_time(); @@ -221,7 +230,18 @@ class PipelineTask { } } + void clear_src_pending_finish_time() { + if (!_is_src_pending_finish_over) { + _src_pending_finish_over_time = 0; + _is_src_pending_finish_over = false; + } + } + void set_src_pending_finish_time() { + if (!_is_src_pending_finish_over1) { + _src_pending_finish_over_time1 = _pipeline_task_watcher.elapsed_time(); + _is_src_pending_finish_over1 = true; + } if (!_is_src_pending_finish_over) { _src_pending_finish_over_time = _pipeline_task_watcher.elapsed_time(); _is_src_pending_finish_over = true; @@ -236,6 +256,11 @@ class PipelineTask { } void set_dst_pending_finish_time() { + if (!_is_dst_pending_finish_over1) { + _dst_pending_finish_over_time1 = _pipeline_task_watcher.elapsed_time(); + _is_dst_pending_finish_over1 = true; + } + if (!_is_dst_pending_finish_over) { _dst_pending_finish_over_time = 
_pipeline_task_watcher.elapsed_time(); _is_dst_pending_finish_over = true; @@ -342,12 +367,20 @@ class PipelineTask { int64_t _eos_time = 0; //time 3 bool _is_src_pending_finish_over = false; + bool _is_src_pending_finish_over1 = false; + RuntimeProfile::Counter* _src_pending_finish_check_timer; RuntimeProfile::Counter* _src_pending_finish_over_timer; + RuntimeProfile::Counter* _src_pending_finish_over_timer1; int64_t _src_pending_finish_over_time = 0; + int64_t _src_pending_finish_over_time1 = 0; // time 4 bool _is_dst_pending_finish_over = false; + bool _is_dst_pending_finish_over1 = false; + RuntimeProfile::Counter* _dst_pending_finish_check_timer; RuntimeProfile::Counter* _dst_pending_finish_over_timer; + RuntimeProfile::Counter* _dst_pending_finish_over_timer1; int64_t _dst_pending_finish_over_time = 0; + int64_t _dst_pending_finish_over_time1 = 0; // time 5 bool _is_close_pipeline = false; RuntimeProfile::Counter* _close_pipeline_timer; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 6e01aabb0f75ca..14d1cd8ce4d469 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -803,6 +803,7 @@ Status VDataStreamSender::_get_next_available_buffer(BroadcastPBlockHolder** hol void VDataStreamSender::registe_channels(pipeline::ExchangeSinkBuffer* buffer) { for (auto channel : _channels) { + // TODO local channel no need to register?? ((PipChannel*)channel)->registe(buffer); } } From 7a098d78a34782d93fc2b81ca9a2a3edb35538b6 Mon Sep 17 00:00:00 2001 From: liulijia Date: Fri, 4 Aug 2023 23:45:23 +0800 Subject: [PATCH 04/20] Move FragmentExecState to plan_fragment_executor --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 4 +- be/src/pipeline/pipeline_fragment_context.cpp | 141 +++------ be/src/pipeline/pipeline_fragment_context.h | 21 +- be/src/pipeline/pipeline_task.cpp | 3 +- be/src/pipeline/pipeline_task.h | 2 +- be/src/runtime/fragment_mgr.cpp | 271 ++---------------- be/src/runtime/fragment_mgr.h | 3 +- be/src/runtime/plan_fragment_executor.cpp | 129 +++++++++ be/src/runtime/plan_fragment_executor.h | 104 +++++++ 9 files changed, 309 insertions(+), 369 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index ec5adde2abd046..299454dada25bf 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -426,11 +426,11 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) { _sum_rpc_timer->set(sum_time); _avg_rpc_timer->set(sum_time / std::max(static_cast(1), _rpc_count.load())); - int64_t max_end_time = 0; + uint64_t max_end_time = 0; for (auto& [id, timer] : _instance_watcher) { max_end_time = std::max(timer.elapsed_time(), max_end_time); } auto* _max_end_timer = ADD_TIMER(profile, "MaxRpcEndTime"); - _max_end_timer->set(max_end_time); + _max_end_timer->set(static_cast(max_end_time)); } } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index d37410edb196c1..feffd8c8d02db4 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -124,14 +124,12 @@ PipelineFragmentContext::PipelineFragmentContext( _exec_env(exec_env), _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR), _query_ctx(std::move(query_ctx)), - _call_back(call_back), - _report_thread_active(false), + _finish_call_back(call_back), 
_report_status_cb(report_status_cb), _is_report_on_cancel(true) { if (_query_ctx->get_task_group()) { _task_group_entity = _query_ctx->get_task_group()->task_entity(); } - _report_thread_future = _report_thread_promise.get_future(); _fragment_watcher.start(); } @@ -139,12 +137,11 @@ PipelineFragmentContext::~PipelineFragmentContext() { if (_runtime_state != nullptr) { // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state. SCOPED_ATTACH_TASK(_runtime_state.get()); - _call_back(_runtime_state.get(), &_exec_status); + _finish_call_back(_runtime_state.get(), &_exec_status); _runtime_state.reset(); } else { - _call_back(_runtime_state.get(), &_exec_status); + _finish_call_back(_runtime_state.get(), &_exec_status); } - DCHECK(!_report_thread_active); } void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, @@ -156,6 +153,8 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, } if (reason != PPlanFragmentCancelReason::LIMIT_REACH) { _exec_status = Status::Cancelled(msg); + } else { + _is_report_on_cancel = false; } _runtime_state->set_is_cancelled(true, msg); @@ -187,7 +186,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, } } -PipelinePtr PipelineFragmentContext::add_pipeline() { +PipelinePtr PipelineFragmentContext::_add_pipeline() { // _prepared、_submitted, _canceled should do not add pipeline PipelineId id = _next_pipeline_id++; auto pipeline = std::make_shared(id, weak_from_this()); @@ -202,10 +201,13 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re } const auto& local_params = request.local_params[idx]; _runtime_profile.reset(new RuntimeProfile("PipelineContext")); - _start_timer = ADD_TIMER(_runtime_profile, "StartTime"); + _start_timer = ADD_TIMER(_runtime_profile, "PipCtx1StartTime"); COUNTER_UPDATE(_start_timer, _fragment_watcher.elapsed_time()); - _prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime"); - SCOPED_TIMER(_prepare_timer); + _prepare_timer = ADD_TIMER(_runtime_profile, "PipCtx2PrepareTime"); + _close_begin_timer = ADD_TIMER(_runtime_profile, "PipCtx3CloseTime"); + Defer prepare_defer {[&]() { + COUNTER_UPDATE(_prepare_timer, _fragment_watcher.elapsed_time()); + }}; auto* fragment_context = this; OpentelemetryTracer tracer = telemetry::get_noop_tracer(); @@ -314,7 +316,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re _runtime_state.get(), &_sink, *desc_tbl)); } - _root_pipeline = fragment_context->add_pipeline(); + _root_pipeline = fragment_context->_add_pipeline(); RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline)); if (_sink) { RETURN_IF_ERROR(_create_sink(request.local_params[idx].sender_id, @@ -362,74 +364,6 @@ Status PipelineFragmentContext::_build_pipeline_tasks( return Status::OK(); } -void PipelineFragmentContext::_stop_report_thread() { - if (!_report_thread_active) { - return; - } - - _report_thread_active = false; - - _stop_report_thread_cv.notify_one(); - // Wait infinitly to ensure that the report task is finished and the this variable - // is not used in report thread. 
- _report_thread_future.wait(); -} - -void PipelineFragmentContext::report_profile() { - SCOPED_ATTACH_TASK(_runtime_state.get()); - VLOG_FILE << "report_profile(): instance_id=" << _runtime_state->fragment_instance_id(); - - _report_thread_active = true; - - std::unique_lock l(_report_thread_lock); - // tell Open() that we started - _report_thread_started_cv.notify_one(); - - // Jitter the reporting time of remote fragments by a random amount between - // 0 and the report_interval. This way, the coordinator doesn't get all the - // updates at once so its better for contention as well as smoother progress - // reporting. - int report_fragment_offset = rand() % config::status_report_interval; - // We don't want to wait longer than it takes to run the entire fragment. - _stop_report_thread_cv.wait_for(l, std::chrono::seconds(report_fragment_offset)); - while (_report_thread_active) { - if (config::status_report_interval > 0) { - // wait_for can return because the timeout occurred or the condition variable - // was signaled. We can't rely on its return value to distinguish between the - // two cases (e.g. there is a race here where the wait timed out but before grabbing - // the lock, the condition variable was signaled). Instead, we will use an external - // flag, _report_thread_active, to coordinate this. - _stop_report_thread_cv.wait_for(l, - std::chrono::seconds(config::status_report_interval)); - } else { - LOG(WARNING) << "config::status_report_interval is equal to or less than zero, exiting " - "reporting thread."; - break; - } - - if (VLOG_FILE_IS_ON) { - VLOG_FILE << "Reporting " << (!_report_thread_active ? "final " : " ") - << "profile for instance " << _runtime_state->fragment_instance_id(); - std::stringstream ss; - _runtime_state->runtime_profile()->compute_time_in_profile(); - _runtime_state->runtime_profile()->pretty_print(&ss); - if (_runtime_state->load_channel_profile()) { - // _runtime_state->load_channel_profile()->compute_time_in_profile(); // TODO load channel profile add timer - _runtime_state->load_channel_profile()->pretty_print(&ss); - } - VLOG_FILE << ss.str(); - } - - if (!_report_thread_active) { - break; - } - - send_report(false); - } - - VLOG_FILE << "exiting reporting thread: instance_id=" << _runtime_state->fragment_instance_id(); -} - // TODO: use virtual function to do abstruct Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur_pipe) { auto node_type = node->type(); @@ -491,7 +425,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur int child_count = union_node->children_count(); auto data_queue = std::make_shared(child_count); for (int child_id = 0; child_id < child_count; ++child_id) { - auto new_child_pipeline = add_pipeline(); + auto new_child_pipeline = _add_pipeline(); RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), new_child_pipeline)); OperatorBuilderPtr child_sink_builder = std::make_shared( union_node->id(), child_id, union_node, data_queue); @@ -505,7 +439,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur } case TPlanNodeType::AGGREGATION_NODE: { auto* agg_node = dynamic_cast(node); - auto new_pipe = add_pipeline(); + auto new_pipe = _add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe)); if (agg_node->is_aggregate_evaluators_empty()) { auto data_queue = std::make_shared(1); @@ -539,7 +473,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur break; } case TPlanNodeType::SORT_NODE: 
{ - auto new_pipeline = add_pipeline(); + auto new_pipeline = _add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline)); OperatorBuilderPtr sort_sink = std::make_shared(node->id(), node); @@ -551,7 +485,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur break; } case TPlanNodeType::PARTITION_SORT_NODE: { - auto new_pipeline = add_pipeline(); + auto new_pipeline = _add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline)); OperatorBuilderPtr partition_sort_sink = @@ -564,7 +498,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur break; } case TPlanNodeType::ANALYTIC_EVAL_NODE: { - auto new_pipeline = add_pipeline(); + auto new_pipeline = _add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline)); OperatorBuilderPtr analytic_sink = @@ -598,7 +532,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur } case TPlanNodeType::HASH_JOIN_NODE: { auto* join_node = assert_cast(node); - auto new_pipe = add_pipeline(); + auto new_pipe = _add_pipeline(); if (join_node->should_build_hash_table()) { RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe)); } else { @@ -619,7 +553,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur break; } case TPlanNodeType::CROSS_JOIN_NODE: { - auto new_pipe = add_pipeline(); + auto new_pipe = _add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe)); OperatorBuilderPtr join_sink = std::make_shared(node->id(), node); @@ -657,14 +591,14 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur template Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode* node, PipelinePtr cur_pipe) { - auto build_pipeline = add_pipeline(); + auto build_pipeline = _add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(0), build_pipeline)); OperatorBuilderPtr sink_builder = std::make_shared>(node->id(), node); RETURN_IF_ERROR(build_pipeline->set_sink(sink_builder)); for (int child_id = 1; child_id < node->children_count(); ++child_id) { - auto probe_pipeline = add_pipeline(); + auto probe_pipeline = _add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(child_id), probe_pipeline)); OperatorBuilderPtr probe_sink_builder = std::make_shared>(node->id(), child_id, @@ -781,7 +715,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size(); _multi_cast_stream_sink_senders.resize(sender_size); for (int i = 0; i < sender_size; ++i) { - auto new_pipeline = add_pipeline(); + auto new_pipeline = _add_pipeline(); auto row_desc = !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty() @@ -824,9 +758,9 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr } void PipelineFragmentContext::_close_action() { - _runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time()); - send_report(true); - _stop_report_thread(); + _runtime_profile->total_time_counter()->update(close_end_time); + COUNTER_UPDATE(_close_timer, _fragment_watcher.elapsed_time()); + this->_send_report(); // all submitted tasks done _exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this()); } @@ -839,7 +773,7 @@ void PipelineFragmentContext::close_a_pipeline() { } } -void PipelineFragmentContext::send_report(bool done) { +void PipelineFragmentContext::_send_report() const { Status exec_status 
= Status::OK(); { std::lock_guard l(_status_lock); @@ -848,7 +782,7 @@ void PipelineFragmentContext::send_report(bool done) { // If plan is done successfully, but _is_report_success is false, // no need to send report. - if (!_is_report_success && done && exec_status.ok()) { + if (!_is_report_success && exec_status.ok()) { return; } @@ -860,14 +794,17 @@ void PipelineFragmentContext::send_report(bool done) { return; } - _report_status_cb( - {exec_status, _is_report_success ? _runtime_state->runtime_profile() : nullptr, - _is_report_success ? _runtime_state->load_channel_profile() : nullptr, - done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id, - _fragment_instance_id, _backend_num, _runtime_state.get(), - std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1), - std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1, - std::placeholders::_2)}); + auto share_this = shared_from_this(); + _exec_env->send_report_thread_pool()->submit_func([&, this] { + share_this->_report_status_cb( + {exec_status, _is_report_success ? _runtime_state->runtime_profile() : nullptr, + _is_report_success ? _runtime_state->load_channel_profile() : nullptr, + !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id, + _fragment_instance_id, _backend_num, _runtime_state.get(), + std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1), + std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1, + std::placeholders::_2)}); + }); } } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index cda6206d9b9e77..3c5737d143a846 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -67,8 +67,6 @@ class PipelineFragmentContext : public std::enable_shared_from_this l(_status_lock); @@ -123,13 +119,12 @@ class PipelineFragmentContext : public std::enable_shared_from_this Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr); void _close_action(); - void _stop_report_thread(); - void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; } // Id of this query TUniqueId _query_id; @@ -181,22 +176,14 @@ class PipelineFragmentContext : public std::enable_shared_from_this _call_back; + std::function _finish_call_back; std::once_flag _close_once_flag; - std::condition_variable _report_thread_started_cv; - // true if we started the thread - bool _report_thread_active; // profile reporting-related report_status_callback _report_status_cb; - std::promise _report_thread_promise; - std::future _report_thread_future; - std::mutex _report_thread_lock; - // Indicates that profile reporting thread should stop. - // Tied to _report_thread_lock. - std::condition_variable _stop_report_thread_cv; // If this is set to false, and '_is_report_success' is false as well, // This executor will not report status to FE on being cancelled. 
bool _is_report_on_cancel; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index df4fe4670c337c..da6da619a96778 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -115,8 +115,9 @@ void PipelineTask::_init_profile() { _src_pending_finish_over_timer = ADD_TIMER(_task_profile, "Task3SrcPendingFinishOverTime"); _src_pending_finish_over_timer1 = ADD_TIMER(_task_profile, "Task3SrcPendingFinishOverTime1"); - _dst_pending_finish_check_timer = ADD_TIMER(_task_profile, "Task4DstPendingFinishCheckTime"); _dst_pending_finish_over_timer = ADD_TIMER(_task_profile, "Task4DstPendingFinishOverTime"); + _dst_pending_finish_check_timer = + ADD_TIMER(_dst_pending_finish_over_timer, "Task4DstPendingFinishCheckTime"); _dst_pending_finish_over_timer1 = ADD_TIMER(_task_profile, "Task4DstPendingFinishOverTime1"); _pip_task_total_timer = ADD_TIMER(_task_profile, "Task5TotalTime"); _close_pipeline_timer = ADD_TIMER(_task_profile, "Task6ClosePipelineTime"); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index c08d9db9736a93..01f17598f97426 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -137,7 +137,7 @@ class PipelineTask { bool is_pending_finish() { bool source_ret; { - SCOPED_TIMER(_src_pending_finish_check_timer) + SCOPED_TIMER(_src_pending_finish_check_timer); source_ret = _source->is_pending_finish(); } if (source_ret) { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index c8fb7ac9ace4d8..648fc6f0fb57f3 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -63,7 +63,6 @@ #include "runtime/plan_fragment_executor.h" #include "runtime/primitive_type.h" #include "runtime/query_context.h" -#include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_state.h" #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_context.h" @@ -109,208 +108,6 @@ std::string to_load_error_http_path(const std::string& file_name) { using apache::thrift::TException; using apache::thrift::transport::TTransportException; -class FragmentExecState { -public: - using report_status_callback_impl = std::function; - // Constructor by using QueryContext - FragmentExecState(const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num, - ExecEnv* exec_env, std::shared_ptr query_ctx, - const report_status_callback_impl& report_status_cb_impl); - - Status prepare(const TExecPlanFragmentParams& params); - - Status execute(); - - Status cancel(const PPlanFragmentCancelReason& reason, const std::string& msg = ""); - bool is_canceled() { return _cancelled; } - - TUniqueId fragment_instance_id() const { return _fragment_instance_id; } - - TUniqueId query_id() const { return _query_id; } - - PlanFragmentExecutor* executor() { return &_executor; } - - const vectorized::VecDateTimeValue& start_time() const { return _start_time; } - - void set_merge_controller_handler( - std::shared_ptr& handler) { - _merge_controller_handler = handler; - } - - // Update status of this fragment execute - Status update_status(const Status& status) { - std::lock_guard l(_status_lock); - if (!status.ok() && _exec_status.ok()) { - _exec_status = status; - LOG(WARNING) << "query_id=" << print_id(_query_id) - << ", instance_id=" << print_id(_fragment_instance_id) - << " meet error status " << status; - } - return _exec_status; - } - - void set_group(const TResourceInfo& info) { - _set_rsc_info = true; - _user = info.user; 
- _group = info.group; - } - - bool is_timeout(const vectorized::VecDateTimeValue& now) const { - if (_timeout_second <= 0) { - return false; - } - if (now.second_diff(_start_time) > _timeout_second) { - return true; - } - return false; - } - - int get_timeout_second() const { return _timeout_second; } - - std::shared_ptr get_query_ctx() { return _query_ctx; } - - void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; } - -private: - void coordinator_callback(const Status& status, RuntimeProfile* profile, - RuntimeProfile* load_channel_profile, bool done); - - // Id of this query - TUniqueId _query_id; - // Id of this instance - TUniqueId _fragment_instance_id; - // Used to report to coordinator which backend is over - int _backend_num; - TNetworkAddress _coord_addr; - - // This context is shared by all fragments of this host in a query. - // _query_ctx should be the last one to be destructed, because _executor's - // destruct method will call close and it will depend on query context, - // for example runtime profile. - std::shared_ptr _query_ctx; - PlanFragmentExecutor _executor; - vectorized::VecDateTimeValue _start_time; - - std::mutex _status_lock; - Status _exec_status; - - bool _set_rsc_info = false; - std::string _user; - std::string _group; - - int _timeout_second; - std::atomic _cancelled {false}; - - std::shared_ptr _merge_controller_handler; - - // If set the true, this plan fragment will be executed only after FE send execution start rpc. - bool _need_wait_execution_trigger = false; - report_status_callback_impl _report_status_cb_impl; -}; - -FragmentExecState::FragmentExecState(const TUniqueId& query_id, - const TUniqueId& fragment_instance_id, int backend_num, - ExecEnv* exec_env, std::shared_ptr query_ctx, - const report_status_callback_impl& report_status_cb_impl) - : _query_id(query_id), - _fragment_instance_id(fragment_instance_id), - _backend_num(backend_num), - _query_ctx(std::move(query_ctx)), - _executor(exec_env, std::bind(std::mem_fn(&FragmentExecState::coordinator_callback), - this, std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3, std::placeholders::_4)), - _set_rsc_info(false), - _timeout_second(-1), - _report_status_cb_impl(report_status_cb_impl) { - _start_time = vectorized::VecDateTimeValue::local_time(); - _coord_addr = _query_ctx->coord_addr; -} - -Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) { - if (params.__isset.query_options) { - _timeout_second = params.query_options.execution_timeout; - } - - if (_query_ctx == nullptr) { - if (params.__isset.resource_info) { - set_group(params.resource_info); - } - } - - if (_query_ctx == nullptr) { - return _executor.prepare(params); - } else { - return _executor.prepare(params, _query_ctx.get()); - } -} - -Status FragmentExecState::execute() { - if (_need_wait_execution_trigger) { - // if _need_wait_execution_trigger is true, which means this instance - // is prepared but need to wait for the signal to do the rest execution. 
- if (!_query_ctx->wait_for_start()) { - return cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "wait fragment start timeout"); - } - } -#ifndef BE_TEST - if (_executor.runtime_state()->is_cancelled()) { - return Status::Cancelled("cancelled before execution"); - } -#endif - int64_t duration_ns = 0; - { - SCOPED_RAW_TIMER(&duration_ns); - opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment"); - Status st = _executor.open(); - WARN_IF_ERROR(st, - strings::Substitute("Got error while opening fragment $0, query id: $1", - print_id(_fragment_instance_id), print_id(_query_id))); - if (!st.ok()) { - cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, - fmt::format("PlanFragmentExecutor open failed, reason: {}", st.to_string())); - } - _executor.close(); - } - DorisMetrics::instance()->fragment_requests_total->increment(1); - DorisMetrics::instance()->fragment_request_duration_us->increment(duration_ns / 1000); - return Status::OK(); -} - -Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) { - if (!_cancelled) { - std::lock_guard l(_status_lock); - if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { - _executor.set_is_report_on_cancel(false); - } - _executor.cancel(reason, msg); -#ifndef BE_TEST - // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe - // For stream load the fragment's query_id == load id, it is set in FE. - auto stream_load_ctx = _query_ctx->exec_env()->new_load_stream_mgr()->get(_query_id); - if (stream_load_ctx != nullptr) { - stream_load_ctx->pipe->cancel(msg); - } -#endif - _cancelled = true; - } - return Status::OK(); -} - -// There can only be one of these callbacks in-flight at any moment, because -// it is only invoked from the executor's reporting thread. -// Also, the reported status will always reflect the most recent execution status, -// including the final status when execution finishes. 
-void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile, - RuntimeProfile* load_channel_profile, bool done) { - _report_status_cb_impl( - {status, profile, load_channel_profile, done, _coord_addr, _query_id, -1, - _fragment_instance_id, _backend_num, _executor.runtime_state(), - std::bind(&FragmentExecState::update_status, this, std::placeholders::_1), - std::bind(&PlanFragmentExecutor::cancel, &_executor, std::placeholders::_1, - std::placeholders::_2)}); - DCHECK(status.ok() || done); // if !status.ok() => done -} - FragmentMgr::FragmentMgr(ExecEnv* exec_env) : _exec_env(exec_env), _stop_background_threads_latch(1) { _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr"); @@ -518,42 +315,8 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { static void empty_function(RuntimeState*, Status*) {} -void FragmentMgr::_exec_actual(std::shared_ptr exec_state, - const FinishCallback& cb) { - std::string func_name {"PlanFragmentExecutor::_exec_actual"}; -#ifndef BE_TEST - SCOPED_ATTACH_TASK(exec_state->executor()->runtime_state()); -#endif - - LOG_INFO(func_name) - .tag("query_id", exec_state->query_id()) - .tag("instance_id", exec_state->fragment_instance_id()) - .tag("pthread_id", (uintptr_t)pthread_self()); - - Status st = exec_state->execute(); - if (!st.ok()) { - exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "exec_state execute failed"); - } - - std::shared_ptr query_ctx = exec_state->get_query_ctx(); - bool all_done = false; - if (query_ctx != nullptr) { - // decrease the number of unfinished fragments - all_done = query_ctx->countdown(); - } - - // remove exec state after this fragment finished - { - std::lock_guard lock(_lock); - _fragment_map.erase(exec_state->fragment_instance_id()); - if (all_done && query_ctx) { - _query_ctx_map.erase(query_ctx->query_id); - } - } - - // Callback after remove from this id - auto status = exec_state->executor()->status(); - cb(exec_state->executor()->runtime_state(), &status); +void FragmentMgr::_exec_actual(std::shared_ptr exec_state, const FinishCallback& cb) { + exec_state->execute(cb); } Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { @@ -645,18 +408,36 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r return Status::OK(); } -void FragmentMgr::remove_pipeline_context( - std::shared_ptr f_context) { +void FragmentMgr::remove_fragment_exec_state(std::shared_ptr f_state) { std::lock_guard lock(_lock); - auto query_id = f_context->get_query_id(); - auto* q_context = f_context->get_query_context(); - bool all_done = q_context->countdown(); - _pipeline_map.erase(f_context->get_fragment_instance_id()); + auto ins_id = f_state->fragment_instance_id(); + auto q_context = f_state->get_query_ctx(); + auto query_id = f_state->query_id(); + + bool all_done = q_context != nullptr && q_context->countdown(); + + // remove exec state after this fragment finished + _fragment_map.erase(ins_id); if (all_done) { _query_ctx_map.erase(query_id); } } +void FragmentMgr::remove_pipeline_context( + std::shared_ptr& f_context) { + auto share_pip_ctx = f_context; + _exec_env->send_report_thread_pool()->submit_func([&, this] { + std::lock_guard lock(_lock); + auto query_id = share_pip_ctx->get_query_id(); + auto* q_context = share_pip_ctx->get_query_context(); + bool all_done = q_context->countdown(); + this->_pipeline_map.erase(share_pip_ctx->get_fragment_instance_id()); + if (all_done) { + 
this->_query_ctx_map.erase(query_id); + } + }); +} + template Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline, std::shared_ptr& query_ctx) { diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 9820cf90456517..1832dac255e18a 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -93,8 +93,9 @@ class FragmentMgr : public RestMonitorIface { Status exec_plan_fragment(const TPipelineFragmentParams& params); + void remove_fragment_exec_state(std::shared_ptr f_state); void remove_pipeline_context( - std::shared_ptr pipeline_context); + std::shared_ptr& pipeline_context); // TODO(zc): report this is over Status exec_plan_fragment(const TExecPlanFragmentParams& params, const FinishCallback& cb); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index f4eb2dffa5f8e0..882a14020a3a01 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -44,6 +44,7 @@ #include "exec/scan_node.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/query_context.h" #include "runtime/query_statistics.h" @@ -569,4 +570,132 @@ void PlanFragmentExecutor::close() { _closed = true; } +///////////////// FragmentExecState ////////////// + +FragmentExecState::FragmentExecState(const TUniqueId& query_id, + const TUniqueId& fragment_instance_id, int backend_num, + ExecEnv* exec_env, std::shared_ptr query_ctx, + const report_status_callback_impl& report_status_cb_impl) + : _query_id(query_id), + _fragment_instance_id(fragment_instance_id), + _backend_num(backend_num), + _query_ctx(std::move(query_ctx)), + _executor(exec_env, std::bind(std::mem_fn(&FragmentExecState::coordinator_callback), + this, std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4)), + _set_rsc_info(false), + _timeout_second(-1), + _report_status_cb_impl(report_status_cb_impl) { + _start_time = vectorized::VecDateTimeValue::local_time(); + _coord_addr = _query_ctx->coord_addr; +} + +Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) { + if (params.__isset.query_options) { + _timeout_second = params.query_options.execution_timeout; + } + + if (_query_ctx == nullptr) { + if (params.__isset.resource_info) { + set_group(params.resource_info); + } + } + + if (_query_ctx == nullptr) { + return _executor.prepare(params); + } else { + return _executor.prepare(params, _query_ctx.get()); + } +} + +void FragmentExecState::execute(const FinishCallback& cb) { + std::string func_name {"FragmentExecState::execute"}; +#ifndef BE_TEST + SCOPED_ATTACH_TASK(_executor->runtime_state()); +#endif + + LOG_INFO(func_name) + .tag("query_id", query_id()) + .tag("instance_id", fragment_instance_id()) + .tag("pthread_id", (uintptr_t)pthread_self()); + + Status st = _execute(); + if (!st.ok()) { + cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "exec_state execute failed"); + } + + _executor->exec_env()->fragment_mgr()->remove_fragment_exec_state(shared_from_this()); + + // Callback after remove from this id + auto status = _executor->status(); + cb(_executor->runtime_state(), &status); +} + +Status FragmentExecState::_execute() const { + if (_need_wait_execution_trigger) { + // if _need_wait_execution_trigger is true, which means this instance + // is prepared but need to wait for the signal to do the rest execution. 
+ if (!_query_ctx->wait_for_start()) { + return cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "wait fragment start timeout"); + } + } +#ifndef BE_TEST + if (_executor.runtime_state()->is_cancelled()) { + return Status::Cancelled("cancelled before execution"); + } +#endif + int64_t duration_ns = 0; + { + SCOPED_RAW_TIMER(&duration_ns); + opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment"); + Status st = _executor.open(); + WARN_IF_ERROR(st, + strings::Substitute("Got error while opening fragment $0, query id: $1", + print_id(_fragment_instance_id), print_id(_query_id))); + if (!st.ok()) { + cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, + fmt::format("PlanFragmentExecutor open failed, reason: {}", st.to_string())); + } + _executor.close(); + } + DorisMetrics::instance()->fragment_requests_total->increment(1); + DorisMetrics::instance()->fragment_request_duration_us->increment(duration_ns / 1000); + return Status::OK(); +} + +Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) { + if (!_cancelled) { + std::lock_guard l(_status_lock); + if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { + _executor.set_is_report_on_cancel(false); + } + _executor.cancel(reason, msg); +#ifndef BE_TEST + // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe + // For stream load the fragment's query_id == load id, it is set in FE. + auto stream_load_ctx = _query_ctx->exec_env()->new_load_stream_mgr()->get(_query_id); + if (stream_load_ctx != nullptr) { + stream_load_ctx->pipe->cancel(msg); + } +#endif + _cancelled = true; + } + return Status::OK(); +} + +// There can only be one of these callbacks in-flight at any moment, because +// it is only invoked from the executor's reporting thread. +// Also, the reported status will always reflect the most recent execution status, +// including the final status when execution finishes. 
+void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile, + RuntimeProfile* load_channel_profile, bool done) { + _report_status_cb_impl( + {status, profile, load_channel_profile, done, _coord_addr, _query_id, -1, + _fragment_instance_id, _backend_num, _executor.runtime_state(), + std::bind(&FragmentExecState::update_status, this, std::placeholders::_1), + std::bind(&PlanFragmentExecutor::cancel, &_executor, std::placeholders::_1, + std::placeholders::_2)}); + DCHECK(status.ok() || done); // if !status.ok() => done +} + } // namespace doris diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index c95ddc75c17c03..c525c172de6fda 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -34,6 +34,7 @@ #include #include "common/status.h" +#include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_state.h" #include "util/runtime_profile.h" @@ -135,6 +136,8 @@ class PlanFragmentExecutor { void set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; } + ExecEnv* exec_env() const { return _exec_env; } + private: ExecEnv* _exec_env; // not owned ExecNode* _plan; // lives in _runtime_state->obj_pool() @@ -245,4 +248,105 @@ class PlanFragmentExecutor { void _collect_node_statistics(); }; +// used for not pipeline exec +class FragmentExecState : public std::enable_shared_from_this { +public: + using FinishCallback = std::function; + using report_status_callback_impl = std::function; + // Constructor by using QueryContext + FragmentExecState(const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num, + ExecEnv* exec_env, std::shared_ptr query_ctx, + const report_status_callback_impl& report_status_cb_impl); + Status prepare(const TExecPlanFragmentParams& params); + + void execute(const FinishCallback& cb); + + Status cancel(const PPlanFragmentCancelReason& reason, const std::string& msg = ""); + bool is_canceled() { return _cancelled; } + + TUniqueId fragment_instance_id() const { return _fragment_instance_id; } + + TUniqueId query_id() const { return _query_id; } + + PlanFragmentExecutor* executor() { return &_executor; } + + const vectorized::VecDateTimeValue& start_time() const { return _start_time; } + + void set_merge_controller_handler( + std::shared_ptr& handler) { + _merge_controller_handler = handler; + } + + // Update status of this fragment execute + Status update_status(const Status& status) { + std::lock_guard l(_status_lock); + if (!status.ok() && _exec_status.ok()) { + _exec_status = status; + LOG(WARNING) << "query_id=" << print_id(_query_id) + << ", instance_id=" << print_id(_fragment_instance_id) + << " meet error status " << status; + } + return _exec_status; + } + + void set_group(const TResourceInfo& info) { + _set_rsc_info = true; + _user = info.user; + _group = info.group; + } + + bool is_timeout(const vectorized::VecDateTimeValue& now) const { + if (_timeout_second <= 0) { + return false; + } + if (now.second_diff(_start_time) > _timeout_second) { + return true; + } + return false; + } + + int get_timeout_second() const { return _timeout_second; } + + std::shared_ptr get_query_ctx() { return _query_ctx; } + + void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; } + +private: + Status _execute() const; + void coordinator_callback(const Status& status, RuntimeProfile* profile, + RuntimeProfile* load_channel_profile, bool done); + + // Id of this query + TUniqueId _query_id; + // Id of this instance + 
TUniqueId _fragment_instance_id; + // Used to report to coordinator which backend is over + int _backend_num; + TNetworkAddress _coord_addr; + + // This context is shared by all fragments of this host in a query. + // _query_ctx should be the last one to be destructed, because _executor's + // destruct method will call close and it will depend on query context, + // for example runtime profile. + std::shared_ptr _query_ctx; + PlanFragmentExecutor _executor; + vectorized::VecDateTimeValue _start_time; + + std::mutex _status_lock; + Status _exec_status; + + bool _set_rsc_info = false; + std::string _user; + std::string _group; + + int _timeout_second; + std::atomic _cancelled {false}; + + std::shared_ptr _merge_controller_handler; + + // If set the true, this plan fragment will be executed only after FE send execution start rpc. + bool _need_wait_execution_trigger = false; + report_status_callback_impl _report_status_cb_impl; +}; + } // namespace doris From 981fe2db50ef463b18b2ed2d16fca6aeaa54466d Mon Sep 17 00:00:00 2001 From: liulijia Date: Sat, 5 Aug 2023 00:05:35 +0800 Subject: [PATCH 05/20] fix --- be/src/pipeline/pipeline_fragment_context.cpp | 31 +++++++++++-------- be/src/pipeline/pipeline_fragment_context.h | 2 +- be/src/runtime/fragment_mgr.cpp | 21 ++++++------- 3 files changed, 28 insertions(+), 26 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index feffd8c8d02db4..4f352b0580da2d 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -134,14 +134,7 @@ PipelineFragmentContext::PipelineFragmentContext( } PipelineFragmentContext::~PipelineFragmentContext() { - if (_runtime_state != nullptr) { - // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state. 
- SCOPED_ATTACH_TASK(_runtime_state.get()); - _finish_call_back(_runtime_state.get(), &_exec_status); - _runtime_state.reset(); - } else { - _finish_call_back(_runtime_state.get(), &_exec_status); - } + } void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, @@ -204,7 +197,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re _start_timer = ADD_TIMER(_runtime_profile, "PipCtx1StartTime"); COUNTER_UPDATE(_start_timer, _fragment_watcher.elapsed_time()); _prepare_timer = ADD_TIMER(_runtime_profile, "PipCtx2PrepareTime"); - _close_begin_timer = ADD_TIMER(_runtime_profile, "PipCtx3CloseTime"); + _close_timer = ADD_TIMER(_runtime_profile, "PipCtx3CloseTime"); Defer prepare_defer {[&]() { COUNTER_UPDATE(_prepare_timer, _fragment_watcher.elapsed_time()); }}; @@ -758,11 +751,23 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr } void PipelineFragmentContext::_close_action() { - _runtime_profile->total_time_counter()->update(close_end_time); - COUNTER_UPDATE(_close_timer, _fragment_watcher.elapsed_time()); + auto close_time = _fragment_watcher.elapsed_time() + _runtime_profile->total_time_counter()->update(close_time); + COUNTER_UPDATE(_close_timer, close_time); this->_send_report(); // all submitted tasks done - _exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this()); + auto share_this = shared_from_this(); + _exec_env->fragment_mgr()->remove_pipeline_context(share_this); + _exec_env->send_report_thread_pool()->submit_func([&, this] { + if (_runtime_state != nullptr) { + // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state. + SCOPED_ATTACH_TASK(_runtime_state.get()); + _finish_call_back(_runtime_state.get(), &_exec_status); + _runtime_state.reset(); + } else { + _finish_call_back(_runtime_state.get(), &_exec_status); + } + }) } void PipelineFragmentContext::close_a_pipeline() { @@ -773,7 +778,7 @@ void PipelineFragmentContext::close_a_pipeline() { } } -void PipelineFragmentContext::_send_report() const { +void PipelineFragmentContext::_send_report() { Status exec_status = Status::OK(); { std::lock_guard l(_status_lock); diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 3c5737d143a846..ef3af1ad617e8c 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -103,7 +103,7 @@ class PipelineFragmentContext : public std::enable_shared_from_this l(_status_lock); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 648fc6f0fb57f3..de910874b76cf0 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -424,18 +424,15 @@ void FragmentMgr::remove_fragment_exec_state(std::shared_ptr } void FragmentMgr::remove_pipeline_context( - std::shared_ptr& f_context) { - auto share_pip_ctx = f_context; - _exec_env->send_report_thread_pool()->submit_func([&, this] { - std::lock_guard lock(_lock); - auto query_id = share_pip_ctx->get_query_id(); - auto* q_context = share_pip_ctx->get_query_context(); - bool all_done = q_context->countdown(); - this->_pipeline_map.erase(share_pip_ctx->get_fragment_instance_id()); - if (all_done) { - this->_query_ctx_map.erase(query_id); - } - }); + std::shared_ptr& share_pip_ctx) { + std::lock_guard lock(_lock); + auto query_id = share_pip_ctx->get_query_id(); + auto* q_context = share_pip_ctx->get_query_context(); + bool all_done = q_context->countdown(); + 
this->_pipeline_map.erase(share_pip_ctx->get_fragment_instance_id()); + if (all_done) { + this->_query_ctx_map.erase(query_id); + } } template From 6189d06f81fd70c14ff0381450ec601b01a7c52f Mon Sep 17 00:00:00 2001 From: liulijia Date: Sat, 5 Aug 2023 00:20:44 +0800 Subject: [PATCH 06/20] fix 1 --- be/src/pipeline/pipeline_fragment_context.cpp | 8 ++++---- be/src/pipeline/pipeline_fragment_context.h | 2 +- be/src/pipeline/pipeline_task.cpp | 2 +- be/src/pipeline/task_scheduler.cpp | 4 ++-- be/src/runtime/plan_fragment_executor.cpp | 11 ++++++----- be/src/runtime/plan_fragment_executor.h | 3 ++- 6 files changed, 16 insertions(+), 14 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 4f352b0580da2d..8597e404e3bdf4 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -751,10 +751,10 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr } void PipelineFragmentContext::_close_action() { - auto close_time = _fragment_watcher.elapsed_time() + auto close_time = _fragment_watcher.elapsed_time(); _runtime_profile->total_time_counter()->update(close_time); COUNTER_UPDATE(_close_timer, close_time); - this->_send_report(); + this->send_report(); // all submitted tasks done auto share_this = shared_from_this(); _exec_env->fragment_mgr()->remove_pipeline_context(share_this); @@ -767,7 +767,7 @@ void PipelineFragmentContext::_close_action() { } else { _finish_call_back(_runtime_state.get(), &_exec_status); } - }) + }); } void PipelineFragmentContext::close_a_pipeline() { @@ -778,7 +778,7 @@ void PipelineFragmentContext::close_a_pipeline() { } } -void PipelineFragmentContext::_send_report() { +void PipelineFragmentContext::send_report() { Status exec_status = Status::OK(); { std::lock_guard l(_status_lock); diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index ef3af1ad617e8c..4d61d9e8cb16e7 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -103,7 +103,7 @@ class PipelineFragmentContext : public std::enable_shared_from_this l(_status_lock); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index da6da619a96778..3ccaeff70d8a37 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -117,7 +117,7 @@ void PipelineTask::_init_profile() { _dst_pending_finish_over_timer = ADD_TIMER(_task_profile, "Task4DstPendingFinishOverTime"); _dst_pending_finish_check_timer = - ADD_TIMER(_dst_pending_finish_over_timer, "Task4DstPendingFinishCheckTime"); + ADD_TIMER(_task_profile, "Task4DstPendingFinishCheckTime"); _dst_pending_finish_over_timer1 = ADD_TIMER(_task_profile, "Task4DstPendingFinishOverTime1"); _pip_task_total_timer = ADD_TIMER(_task_profile, "Task5TotalTime"); _close_pipeline_timer = ADD_TIMER(_task_profile, "Task6ClosePipelineTime"); diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 4f9e90f80e5e99..0e3b6e908a5c26 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -261,7 +261,7 @@ void TaskScheduler::_do_work(size_t index) { // If pipeline is canceled caused by memory limit, we should send report to FE in order // to cancel all pipeline tasks in this query - fragment_ctx->send_report(true); + fragment_ctx->send_report(); _try_close_task(task, PipelineTaskState::CANCELED); continue; } @@ 
-287,7 +287,7 @@ void TaskScheduler::_do_work(size_t index) { // exec failed,cancel all fragment instance fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.to_string()); - fragment_ctx->send_report(true); + fragment_ctx->send_report(); _try_close_task(task, PipelineTaskState::CANCELED); continue; } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 882a14020a3a01..c8b57d95e4a8d2 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -50,6 +50,7 @@ #include "runtime/query_statistics.h" #include "runtime/result_queue_mgr.h" #include "runtime/runtime_filter_mgr.h" +#include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/thread_context.h" #include "util/container_util.hpp" #include "util/defer_op.h" @@ -611,7 +612,7 @@ Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) { void FragmentExecState::execute(const FinishCallback& cb) { std::string func_name {"FragmentExecState::execute"}; #ifndef BE_TEST - SCOPED_ATTACH_TASK(_executor->runtime_state()); + SCOPED_ATTACH_TASK(_executor.runtime_state()); #endif LOG_INFO(func_name) @@ -624,14 +625,14 @@ void FragmentExecState::execute(const FinishCallback& cb) { cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "exec_state execute failed"); } - _executor->exec_env()->fragment_mgr()->remove_fragment_exec_state(shared_from_this()); + _executor.exec_env()->fragment_mgr()->remove_fragment_exec_state(shared_from_this()); // Callback after remove from this id - auto status = _executor->status(); - cb(_executor->runtime_state(), &status); + auto status = _executor.status(); + cb(_executor.runtime_state(), &status); } -Status FragmentExecState::_execute() const { +Status FragmentExecState::_execute() { if (_need_wait_execution_trigger) { // if _need_wait_execution_trigger is true, which means this instance // is prepared but need to wait for the signal to do the rest execution. 
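FragmentExecState::execute above follows a deregister-then-notify order: the state removes itself from the fragment manager before invoking the finish callback, so the callback never observes a stale manager entry. A hedged standalone sketch of that flow; the Registry/Task names and the callback signature are assumptions for illustration, not the Doris API:

    #include <functional>
    #include <mutex>
    #include <set>
    #include <string>

    // Reports the finished instance id and whether execution succeeded.
    using FinishCallback = std::function<void(const std::string& id, bool ok)>;

    // Minimal stand-in for the fragment manager's bookkeeping.
    class Registry {
    public:
        void add(const std::string& id) {
            std::lock_guard<std::mutex> l(_lock);
            _ids.insert(id);
        }
        void remove(const std::string& id) {
            std::lock_guard<std::mutex> l(_lock);
            _ids.erase(id);
        }
    private:
        std::mutex _lock;
        std::set<std::string> _ids;
    };

    struct Task {
        std::string id;
        Registry* registry = nullptr;  // not owned

        bool run() { return true; }    // placeholder for the real work

        void execute(const FinishCallback& cb) {
            bool ok = run();
            registry->remove(id);      // deregister first, as execute() does above
            cb(id, ok);                // then report completion to the caller
        }
    };

    int main() {
        Registry reg;
        Task t {"instance-1", &reg};
        reg.add(t.id);
        t.execute([](const std::string&, bool) { /* report to coordinator */ });
    }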
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index c525c172de6fda..f04ef46e9c15a7 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -36,6 +36,7 @@ #include "common/status.h" #include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_state.h" +#include "runtime/fragment_mgr.h" #include "util/runtime_profile.h" namespace doris { @@ -312,7 +313,7 @@ class FragmentExecState : public std::enable_shared_from_this void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; } private: - Status _execute() const; + Status _execute(); void coordinator_callback(const Status& status, RuntimeProfile* profile, RuntimeProfile* load_channel_profile, bool done); From 5c6bf068c4fba0aa91b9d146b9a70a5dd27fcdf3 Mon Sep 17 00:00:00 2001 From: liulijia Date: Sat, 5 Aug 2023 00:22:49 +0800 Subject: [PATCH 07/20] fix 2 --- be/src/runtime/plan_fragment_executor.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index c8b57d95e4a8d2..a876b74ae24f99 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -51,6 +51,7 @@ #include "runtime/result_queue_mgr.h" #include "runtime/runtime_filter_mgr.h" #include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/stream_load/stream_load_context.h" #include "runtime/thread_context.h" #include "util/container_util.hpp" #include "util/defer_op.h" From f32909bd7560b45e2aad8860c3a375b0665bf4cd Mon Sep 17 00:00:00 2001 From: liulijia Date: Sat, 5 Aug 2023 00:25:38 +0800 Subject: [PATCH 08/20] fix 3 --- be/src/runtime/plan_fragment_executor.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index a876b74ae24f99..fc5b0b28508350 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -42,6 +42,7 @@ #include "exec/data_sink.h" #include "exec/exec_node.h" #include "exec/scan_node.h" +#include "io/fs/stream_load_pipe.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" From 52b00d4badf48586136b34cf063e28337dd17fd1 Mon Sep 17 00:00:00 2001 From: liulijia Date: Thu, 10 Aug 2023 16:46:02 +0800 Subject: [PATCH 09/20] fix core & use btread::Mutex to replace std::mutex in exchagne_sink_buffer --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 17 +++++----- be/src/pipeline/exec/exchange_sink_buffer.h | 3 +- be/src/pipeline/pipeline_fragment_context.cpp | 31 +++++++++---------- be/src/pipeline/pipeline_task.cpp | 3 ++ be/src/pipeline/pipeline_task.h | 2 ++ be/src/runtime/fragment_mgr.cpp | 18 ++++++++--- 6 files changed, 43 insertions(+), 31 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 299454dada25bf..ac268f14494ec8 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -79,7 +79,8 @@ bool ExchangeSinkBuffer::is_pending_finish() { bool need_cancel = _context->is_canceled(); for (auto& pair : _instance_to_package_queue_mutex) { - std::unique_lock lock(*(pair.second)); + std::unique_lock lock(*(pair.second)); +// std::unique_lock lock(*(pair.second)); auto& id = pair.first; if (!_instance_to_sending_by_pipeline.at(id)) { // when pending finish, we need check whether current query is cancelled @@ -104,7 
+105,7 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { if (_instance_to_package_queue_mutex.count(low_id)) { return; } - _instance_to_package_queue_mutex[low_id] = std::make_unique(); + _instance_to_package_queue_mutex[low_id] = std::make_unique(); _instance_to_seq[low_id] = 0; _instance_to_package_queue[low_id] = std::queue>(); _instance_to_broadcast_package_queue[low_id] = @@ -130,7 +131,7 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { TUniqueId ins_id = request.channel->_fragment_instance_id; bool send_now = false; { - std::unique_lock lock(*_instance_to_package_queue_mutex[ins_id.lo]); + std::unique_lock lock(*_instance_to_package_queue_mutex[ins_id.lo]); // Do not have in process rpc, directly send if (_instance_to_sending_by_pipeline[ins_id.lo]) { send_now = true; @@ -156,7 +157,7 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { bool send_now = false; request.block_holder->ref(); { - std::unique_lock lock(*_instance_to_package_queue_mutex[ins_id.lo]); + std::unique_lock lock(*_instance_to_package_queue_mutex[ins_id.lo]); // Do not have in process rpc, directly send if (_instance_to_sending_by_pipeline[ins_id.lo]) { send_now = true; @@ -172,7 +173,7 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { } Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { - std::unique_lock lock(*_instance_to_package_queue_mutex[id]); + std::unique_lock lock(*_instance_to_package_queue_mutex[id]); DCHECK(_instance_to_sending_by_pipeline[id] == false); @@ -313,7 +314,7 @@ void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id) } void ExchangeSinkBuffer::_ended(InstanceLoId id) { - std::unique_lock lock(*_instance_to_package_queue_mutex[id]); + std::unique_lock lock(*_instance_to_package_queue_mutex[id]); _instance_to_sending_by_pipeline[id] = true; _instance_watcher[id].stop(); } @@ -325,13 +326,13 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) { } void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { - std::unique_lock lock(*_instance_to_package_queue_mutex[id]); + std::unique_lock lock(*_instance_to_package_queue_mutex[id]); _instance_to_receiver_eof[id] = true; _instance_to_sending_by_pipeline[id] = true; } bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) { - std::unique_lock lock(*_instance_to_package_queue_mutex[id]); + std::unique_lock lock(*_instance_to_package_queue_mutex[id]); return _instance_to_receiver_eof[id]; } diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index c4e9f2516c01cc..38ad6462b3459b 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -175,7 +176,7 @@ class ExchangeSinkBuffer { void update_profile(RuntimeProfile* profile); private: - phmap::flat_hash_map> + phmap::flat_hash_map> _instance_to_package_queue_mutex; // store data in non-broadcast shuffle phmap::flat_hash_map>> diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 8597e404e3bdf4..fe21b25005ba2e 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -134,7 +134,7 @@ PipelineFragmentContext::PipelineFragmentContext( } PipelineFragmentContext::~PipelineFragmentContext() { - + LOG(WARNING) << "~PFC " << print_id(_fragment_instance_id); } void 
PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, @@ -209,8 +209,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re } LOG_INFO("PipelineFragmentContext::prepare") - .tag("query_id", _query_id) - .tag("instance_id", local_params.fragment_instance_id) + .tag("query_id", print_id(_query_id)) + .tag("instance_id", print_id(local_params.fragment_instance_id)) .tag("backend_num", local_params.backend_num) .tag("pthread_id", (uintptr_t)pthread_self()); @@ -754,11 +754,9 @@ void PipelineFragmentContext::_close_action() { auto close_time = _fragment_watcher.elapsed_time(); _runtime_profile->total_time_counter()->update(close_time); COUNTER_UPDATE(_close_timer, close_time); - this->send_report(); // all submitted tasks done - auto share_this = shared_from_this(); - _exec_env->fragment_mgr()->remove_pipeline_context(share_this); _exec_env->send_report_thread_pool()->submit_func([&, this] { + this->send_report(); if (_runtime_state != nullptr) { // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state. SCOPED_ATTACH_TASK(_runtime_state.get()); @@ -767,6 +765,8 @@ void PipelineFragmentContext::_close_action() { } else { _finish_call_back(_runtime_state.get(), &_exec_status); } + auto share_this = shared_from_this(); + _exec_env->fragment_mgr()->remove_pipeline_context(share_this); }); } @@ -799,17 +799,14 @@ void PipelineFragmentContext::send_report() { return; } - auto share_this = shared_from_this(); - _exec_env->send_report_thread_pool()->submit_func([&, this] { - share_this->_report_status_cb( - {exec_status, _is_report_success ? _runtime_state->runtime_profile() : nullptr, - _is_report_success ? _runtime_state->load_channel_profile() : nullptr, - !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id, - _fragment_instance_id, _backend_num, _runtime_state.get(), - std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1), - std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1, - std::placeholders::_2)}); - }); + _report_status_cb( + {exec_status, _is_report_success ? _runtime_state->runtime_profile() : nullptr, + _is_report_success ? 
_runtime_state->load_channel_profile() : nullptr, + !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id, + _fragment_instance_id, _backend_num, _runtime_state.get(), + std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1), + std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1, + std::placeholders::_2)}); } } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 3ccaeff70d8a37..d4706f5c004fb5 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -111,6 +111,7 @@ void PipelineTask::_init_profile() { _begin_execute_timer = ADD_TIMER(_task_profile, "Task1BeginExecuteTime"); _eos_timer = ADD_TIMER(_task_profile, "Task2EosTime"); + _try_close_timer = ADD_TIMER(_task_profile, "Task23TryCloseTime"); _src_pending_finish_check_timer = ADD_TIMER(_task_profile, "Task3SrcPendingFinishCheckTime"); _src_pending_finish_over_timer = ADD_TIMER(_task_profile, "Task3SrcPendingFinishOverTime"); _src_pending_finish_over_timer1 = ADD_TIMER(_task_profile, "Task3SrcPendingFinishOverTime1"); @@ -298,9 +299,11 @@ Status PipelineTask::try_close() { return Status::OK(); } _try_close_flag = true; + _pipeline_task_watcher.elapsed_time(); if (!_prepared) { return Status::OK(); } + COUNTER_SET(_try_close_timer, (int64_t)_pipeline_task_watcher.elapsed_time()); Status status1; { SCOPED_TIMER(_try_close_sink_timer); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 01f17598f97426..55bafa3461ff2f 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -365,6 +365,8 @@ class PipelineTask { bool _is_eos = false; RuntimeProfile::Counter* _eos_timer; int64_t _eos_time = 0; + RuntimeProfile::Counter* _try_close_timer; + //time 3 bool _is_src_pending_finish_over = false; bool _is_src_pending_finish_over1 = false; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index de910874b76cf0..c3c3727f1879c3 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -424,14 +424,20 @@ void FragmentMgr::remove_fragment_exec_state(std::shared_ptr } void FragmentMgr::remove_pipeline_context( - std::shared_ptr& share_pip_ctx) { + std::shared_ptr& f_context) { std::lock_guard lock(_lock); - auto query_id = share_pip_ctx->get_query_id(); - auto* q_context = share_pip_ctx->get_query_context(); + auto query_id = f_context->get_query_id(); + LOG(INFO) << "remove 1 " << print_id(query_id); + auto* q_context = f_context->get_query_context(); + LOG(INFO) << "remove 2 " << q_context; bool all_done = q_context->countdown(); - this->_pipeline_map.erase(share_pip_ctx->get_fragment_instance_id()); + LOG(INFO) << "remove 3 " << all_done; + auto ins_id = f_context->get_fragment_instance_id(); + _pipeline_map.erase(f_context->get_fragment_instance_id()); + LOG(INFO) << "remove 4" << print_id(ins_id) << ", q_id " << print_id(query_id) << ", all_done " + << all_done; if (all_done) { - this->_query_ctx_map.erase(query_id); + _query_ctx_map.erase(query_id); } } @@ -698,7 +704,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, { std::lock_guard lock(_lock); + _pipeline_map.insert(std::make_pair(fragment_instance_id, context)); + LOG(INFO) << "insert " << print_id(fragment_instance_id); _cv.notify_all(); } From d6e58c96f259a27fdcfefd8b35db8a09b2df67d0 Mon Sep 17 00:00:00 2001 From: liulijia Date: Thu, 10 Aug 2023 22:11:49 +0800 Subject: [PATCH 10/20] fix rpc 
profile --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index ac268f14494ec8..c8de1a209a5b9c 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -367,7 +367,7 @@ void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_ti } *max_time = local_max_time; *max_callback_time = local_max_callback_time; - *max_callback_exec_time = local_min_callback_time; + *max_callback_exec_time = local_max_callback_exec_time; *min_time = local_min_time; *min_callback_time = local_min_callback_time; *min_callback_exec_time = local_min_callback_exec_time; @@ -386,8 +386,8 @@ void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time, i _rpc_count++; DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end()); int64_t rpc_forward_time = receive_time - start_rpc_time; - int64_t rpc_callback_time = receive_time - start_rpc_time; - int64_t callback_exec_time = receive_time - start_rpc_time; + int64_t rpc_callback_time = callback_start_time - receive_time; + int64_t callback_exec_time = callback_end_time - callback_start_time; if (rpc_forward_time > 0) { _instance_to_rpc_time[id] += rpc_forward_time; } From a79c5ddeec56d2453d8186034bee8bf0208b71e5 Mon Sep 17 00:00:00 2001 From: liulijia Date: Mon, 14 Aug 2023 19:36:21 +0800 Subject: [PATCH 11/20] more profile --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 85 ++++++++++++++----- be/src/pipeline/exec/exchange_sink_buffer.h | 11 ++- be/src/service/internal_service.cpp | 24 ++++-- be/src/service/internal_service.h | 2 +- be/src/vec/runtime/vdata_stream_mgr.cpp | 4 +- be/src/vec/runtime/vdata_stream_mgr.h | 3 +- be/src/vec/runtime/vdata_stream_recvr.cpp | 32 ++++++- be/src/vec/runtime/vdata_stream_recvr.h | 9 +- gensrc/proto/internal_service.proto | 2 + 9 files changed, 133 insertions(+), 39 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index c8de1a209a5b9c..23b474bbb4c360 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -119,6 +119,8 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { _instance_watcher[low_id].start(); _instance_to_receiver_eof[low_id] = false; _instance_to_rpc_time[low_id] = 0; + _instance_to_rpc_exec_delay_time[low_id] = 0; + _instance_to_rpc_exec_time[low_id] = 0; _instance_to_rpc_callback_time[low_id] = 0; _instance_to_rpc_callback_exec_time[low_id] = 0; _construct_request(low_id, finst_id); @@ -223,8 +225,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { _send_rpc(id); } auto callback_end_time = GetCurrentTimeNanos(); - set_rpc_time(id, start_rpc_time, result.receive_time(), callback_start_time, - callback_end_time); + set_rpc_time(id, start_rpc_time, result.receive_time(), result.exec_start_time(), + result.exec_end_time(), callback_start_time, callback_end_time); }); { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); @@ -279,8 +281,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { _send_rpc(id); } auto callback_end_time = GetCurrentTimeNanos(); - set_rpc_time(id, start_rpc_time, result.receive_time(), callback_start_time, - callback_end_time); + set_rpc_time(id, start_rpc_time, result.receive_time(), result.exec_start_time(), + 
result.exec_end_time(), callback_start_time, callback_end_time); }); { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); @@ -337,14 +339,22 @@ bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) { } void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time, + int64_t* max_exec_delay_time, + int64_t* min_exec_delay_time, + int64_t* max_exec_time, + int64_t* min_exec_time, int64_t* max_callback_time, int64_t* min_callback_time, int64_t* max_callback_exec_time, int64_t* min_callback_exec_time) { int64_t local_max_time = 0; + int64_t local_max_exec_delay_time = 0; + int64_t local_max_exec_time = 0; int64_t local_max_callback_time = 0; int64_t local_max_callback_exec_time = 0; int64_t local_min_time = INT64_MAX; + int64_t local_min_exec_delay_time = INT64_MAX; + int64_t local_min_exec_time = INT64_MAX; int64_t local_min_callback_time = INT64_MAX; int64_t local_min_callback_exec_time = INT64_MAX; for (auto& [id, time] : _instance_to_rpc_time) { @@ -352,6 +362,16 @@ void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_ti local_max_time = std::max(local_max_time, time); local_min_time = std::min(local_min_time, time); } + auto& exec_delay_time = _instance_to_rpc_exec_delay_time[id]; + if (exec_delay_time != 0) { + local_max_exec_delay_time = std::max(local_max_exec_delay_time, exec_delay_time); + local_min_exec_delay_time = std::min(local_min_exec_delay_time, exec_delay_time); + } + auto& exec_time = _instance_to_rpc_exec_time[id]; + if (exec_time != 0) { + local_max_exec_time = std::max(local_max_exec_time, exec_time); + local_min_exec_time = std::min(local_min_exec_time, exec_time); + } auto& callback_time = _instance_to_rpc_callback_time[id]; if (callback_time != 0) { local_max_callback_time = std::max(local_max_callback_time, callback_time); @@ -366,9 +386,13 @@ void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_ti } } *max_time = local_max_time; + *max_exec_delay_time = local_max_exec_delay_time; + *max_exec_time = local_max_exec_time; *max_callback_time = local_max_callback_time; *max_callback_exec_time = local_max_callback_exec_time; *min_time = local_min_time; + *min_exec_delay_time = local_min_exec_delay_time; + *min_exec_time = local_min_exec_time; *min_callback_time = local_min_callback_time; *min_callback_exec_time = local_min_callback_exec_time; } @@ -382,16 +406,24 @@ int64_t ExchangeSinkBuffer::get_sum_rpc_time() { } void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_time, - int64_t callback_start_time, int64_t callback_end_time) { + int64_t exec_start, int64_t exec_end, int64_t callback_start_time, + int64_t callback_end_time) { _rpc_count++; DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end()); int64_t rpc_forward_time = receive_time - start_rpc_time; - int64_t rpc_callback_time = callback_start_time - receive_time; + int64_t rpc_exec_delay_time = exec_start - receive_time; + int64_t rpc_exec_time = exec_end - exec_start; + int64_t rpc_callback_time = callback_start_time - exec_end; int64_t callback_exec_time = callback_end_time - callback_start_time; if (rpc_forward_time > 0) { _instance_to_rpc_time[id] += rpc_forward_time; } - + if (rpc_exec_delay_time > 0) { + _instance_to_rpc_exec_delay_time[id] += rpc_exec_delay_time; + } + if (rpc_exec_time > 0) { + _instance_to_rpc_exec_time[id] += rpc_exec_time; + } if (rpc_callback_time > 0) { _instance_to_rpc_callback_time[id] += rpc_callback_time; } @@ 
-401,22 +433,37 @@ void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time, i } void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) { - auto* _max_rpc_timer = ADD_TIMER(profile, "RpcMaxTime"); - auto* _max_rpc_callback_timer = ADD_TIMER(profile, "RpcMaxCallbackTime"); - auto* _max_rpc_callback_exec_timer = ADD_TIMER(profile, "RpcMaxCallbackExecTime"); - auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime"); - auto* _min_rpc_callback_timer = ADD_TIMER(profile, "RpcMinCallbackTime"); - auto* _min_rpc_callback_exec_timer = ADD_TIMER(profile, "RpcMinCallbackExecTime"); - auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime"); - auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT); - auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime"); - - int64_t max_rpc_time = 0, min_rpc_time = 0, max_callback_t = 0, min_callback_t = 0, + auto* _count_rpc = ADD_COUNTER(profile, "Rpc0Count", TUnit::UNIT); + auto* _max_rpc_timer = ADD_TIMER(profile, "Rpc1MaxTime"); + auto* _min_rpc_timer = ADD_TIMER(profile, "Rpc1MinTime"); + auto* _sum_rpc_timer = ADD_TIMER(profile, "Rpc1SumTime"); + auto* _avg_rpc_timer = ADD_TIMER(profile, "Rpc1AvgTime"); + + auto* _max_rpc_exec_delay_timer = ADD_TIMER(profile, "Rpc2MaxExecDelayTime"); + auto* _min_rpc_exec_delay_timer = ADD_TIMER(profile, "Rpc2MinExecDelayTime"); + + auto* _max_rpc_exec_timer = ADD_TIMER(profile, "Rpc2MaxExecTime"); + auto* _min_rpc_exec_timer = ADD_TIMER(profile, "Rpc2MinExecTime"); + + auto* _max_rpc_callback_timer = ADD_TIMER(profile, "Rpc3MaxCallbackTime"); + auto* _min_rpc_callback_timer = ADD_TIMER(profile, "Rpc3MinCallbackTime"); + + auto* _max_rpc_callback_exec_timer = ADD_TIMER(profile, "Rpc4MaxCallbackExecTime"); + auto* _min_rpc_callback_exec_timer = ADD_TIMER(profile, "Rpc4MinCallbackExecTime"); + + int64_t max_rpc_time = 0, min_rpc_time = 0, max_exec_delay_t = 0, min_exec_delay_t = 0, + max_exec_t, min_exec_t = 0, max_callback_t = 0, min_callback_t = 0, max_callback_exec_t = 0, min_callback_exec_t = 0; - get_max_min_rpc_time(&max_rpc_time, &min_rpc_time, &max_callback_t, &min_callback_t, + get_max_min_rpc_time(&max_rpc_time, &min_rpc_time, &max_exec_delay_t, &min_exec_delay_t, + &max_exec_t, &min_exec_t, &max_callback_t, &min_callback_t, &max_callback_exec_t, &min_callback_exec_t); _max_rpc_timer->set(max_rpc_time); _min_rpc_timer->set(min_rpc_time); + _max_rpc_exec_delay_timer->set(max_exec_delay_t); + _min_rpc_exec_delay_timer->set(min_exec_delay_t); + _max_rpc_exec_timer->set(max_exec_t); + _min_rpc_exec_timer->set(min_exec_t); + _max_rpc_callback_timer->set(max_callback_t); _min_rpc_callback_timer->set(min_callback_t); _max_rpc_callback_exec_timer->set(max_callback_exec_t); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 38ad6462b3459b..0e0f93ae35f7d2 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -171,8 +171,9 @@ class ExchangeSinkBuffer { bool can_write() const; bool is_pending_finish(); void close(); - void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_time, - int64_t callback_start_time, int64_t callback_end_time); + void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_time, int64_t, + exec_start, int64_t exec_end, int64_t callback_start_time, + int64_t callback_end_time); void update_profile(RuntimeProfile* profile); private: @@ -194,6 +195,8 @@ class ExchangeSinkBuffer { phmap::flat_hash_map _instance_watcher; 
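The set_rpc_time accounting above splits one transmit RPC into stages from timestamps taken on both ends: send to receive (network forward), receive to exec start (thread-pool queueing on the receiver), exec start to exec end (receiver work), exec end to callback start (response travel plus callback dispatch), and callback start to callback end (sender-side callback work). Deltas that come out negative are dropped because the two hosts' clocks are not synchronized. A standalone sketch of that bookkeeping; the struct and field names are illustrative, not the ExchangeSinkBuffer API:

    #include <cstdint>

    // Timestamps for one RPC, in nanoseconds. send/callback_* are taken on the
    // sender, receive/exec_* on the receiver, so cross-host deltas can be skewed.
    struct RpcTimestamps {
        int64_t send_ns;
        int64_t receive_ns;
        int64_t exec_start_ns;
        int64_t exec_end_ns;
        int64_t callback_start_ns;
        int64_t callback_end_ns;
    };

    // Accumulated stage times for one destination, mirroring the per-instance maps.
    struct RpcStageTimes {
        int64_t forward = 0;        // send -> receive
        int64_t exec_delay = 0;     // receive -> exec start
        int64_t exec = 0;           // exec start -> exec end
        int64_t callback = 0;       // exec end -> callback start
        int64_t callback_exec = 0;  // callback start -> callback end

        void add(const RpcTimestamps& t) {
            auto acc = [](int64_t& sum, int64_t delta) {
                if (delta > 0) sum += delta;  // skip negative deltas from clock skew
            };
            acc(forward, t.receive_ns - t.send_ns);
            acc(exec_delay, t.exec_start_ns - t.receive_ns);
            acc(exec, t.exec_end_ns - t.exec_start_ns);
            acc(callback, t.callback_start_ns - t.exec_end_ns);
            acc(callback_exec, t.callback_end_ns - t.callback_start_ns);
        }
    };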
phmap::flat_hash_map _instance_to_receiver_eof; phmap::flat_hash_map _instance_to_rpc_time; + phmap::flat_hash_map _instance_to_rpc_exec_delay_time; + phmap::flat_hash_map _instance_to_rpc_exec_time; phmap::flat_hash_map _instance_to_rpc_callback_time; phmap::flat_hash_map _instance_to_rpc_callback_exec_time; phmap::flat_hash_map _instance_to_rpc_ctx; @@ -214,7 +217,9 @@ class ExchangeSinkBuffer { inline void _failed(InstanceLoId id, const std::string& err); inline void _set_receiver_eof(InstanceLoId id); inline bool _is_receiver_eof(InstanceLoId id); - void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time, int64_t* max_callback_time, + void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time, int64_t* max_exec_delay_time, + int64_t* min_exec_delay_time, int64_t* max_exec_time, + int64_t* min_exec_time, int64_t* max_callback_time, int64_t* min_callback_time, int64_t* max_callback_exec_time, int64_t* min_callback_exec_time); int64_t get_sum_rpc_time(); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 610c45aa4da42b..49620c61e850d3 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1015,13 +1015,14 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr if (!request->has_block() && config::brpc_light_work_pool_threads == -1) { // under high concurrency, thread pool will have a lot of lock contention. - _transmit_block(controller, request, response, done, Status::OK()); + _transmit_block(controller, request, response, done, Status::OK(), receive_time, receive_time); return; } - FifoThreadPool& pool = request->has_block() ? _heavy_work_pool : _light_work_pool; bool ret = pool.try_offer([this, controller, request, response, done]() { - _transmit_block(controller, request, response, done, Status::OK()); + int64_t start_exec_time = GetCurrentTimeNanos(); + _transmit_block(controller, request, response, done, Status::OK(), receive_time, + start_exec_time); }); if (!ret) { offer_failed(response, done, pool); @@ -1032,14 +1033,17 @@ void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcControlle const PEmptyRequest* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - bool ret = _heavy_work_pool.try_offer([this, controller, response, done]() { + int64_t receive_time = GetCurrentTimeNanos(); + bool ret = _heavy_work_pool.try_offer([this, controller, response, done, receive_time]() { + int64_t start_exec_time = GetCurrentTimeNanos(); PTransmitDataParams* new_request = new PTransmitDataParams(); google::protobuf::Closure* new_done = new NewHttpClosure(new_request, done); brpc::Controller* cntl = static_cast(controller); Status st = attachment_extract_request_contain_block(new_request, cntl); - _transmit_block(controller, new_request, response, new_done, st); + _transmit_block(controller, new_request, response, new_done, st, receive_time, + start_exec_time); }); if (!ret) { offer_failed(response, done, _heavy_work_pool); @@ -1050,7 +1054,9 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cont const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done, - const Status& extract_st) { + const Status& extract_st, int64_t receive_time, + int64_t start_exec_time) { + response->set_exec_start_time(start_exec_time); std::string query_id; TUniqueId finst_id; if (request->has_query_id()) { @@ -1065,7 +1071,9 @@ void 
PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cont Status st; st.to_protobuf(response->mutable_status()); if (extract_st.ok()) { - st = _exec_env->vstream_mgr()->transmit_block(request, &done); + response->set_exec_end_time(start_exec_time); + st = _exec_env->vstream_mgr()->transmit_block(request, &done, + start_exec_time - receive_time); if (!st.ok()) { LOG(WARNING) << "transmit_block failed, message=" << st << ", fragment_instance_id=" << print_id(request->finst_id()) @@ -1075,6 +1083,8 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cont st = extract_st; } if (done != nullptr) { + int64_t exec_end_time = GetCurrentTimeNanos(); + response->set_exec_end_time(exec_end_time); st.to_protobuf(response->mutable_status()); done->Run(); } diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 55b51cf40a9ead..674139c13401a6 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -211,7 +211,7 @@ class PInternalServiceImpl : public PBackendService { void _transmit_block(::google::protobuf::RpcController* controller, const ::doris::PTransmitDataParams* request, ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done, - const Status& extract_st); + const Status& extract_st, int64_t receive_time, int64_t start_exec_time); void _tablet_writer_add_block(google::protobuf::RpcController* controller, const PTabletWriterAddBlockRequest* request, diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index eeee0723717627..fd06d7fc19643e 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -90,7 +90,7 @@ std::shared_ptr VDataStreamMgr::find_recvr(const TUniqueId& fr } Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request, - ::google::protobuf::Closure** done) { + ::google::protobuf::Closure** done, int64_t wait_exec_time) { const PUniqueId& finst_id = request->finst_id(); TUniqueId t_finst_id; t_finst_id.hi = finst_id.hi(); @@ -121,7 +121,7 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request, bool eos = request->eos(); if (request->has_block()) { recvr->add_block(request->block(), request->sender_id(), request->be_number(), - request->packet_seq(), eos ? nullptr : done); + request->packet_seq(), eos ? 
nullptr : done, wait_exec_time); } if (eos) { diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h index ca0e7ab4b741e2..5f65b4d98ddfb3 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.h +++ b/be/src/vec/runtime/vdata_stream_mgr.h @@ -61,7 +61,8 @@ class VDataStreamMgr { Status deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id); - Status transmit_block(const PTransmitDataParams* request, ::google::protobuf::Closure** done); + Status transmit_block(const PTransmitDataParams* request, ::google::protobuf::Closure** done, + int64_t wait_exec_time); void cancel(const TUniqueId& fragment_instance_id); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 2e0ffaf9a37bb3..589b8f1e0f9a7b 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -44,7 +44,8 @@ VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr* parent_recvr, int n : _recvr(parent_recvr), _is_cancelled(false), _num_remaining_senders(num_senders), - _received_first_batch(false) {} + _received_first_batch(false), + _wait_exec_time(0) {} VDataStreamRecvr::SenderQueue::~SenderQueue() { // Check pending closures, if it is not empty, should clear it here. but it should not happen. @@ -110,12 +111,14 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number, int64_t packet_seq, - ::google::protobuf::Closure** done) { + ::google::protobuf::Closure** done, + int64_t wait_exec_time) { { std::lock_guard l(_lock); if (_is_cancelled) { return; } + _wait_exec_time += wait_exec_time; auto iter = _packet_seq_map.find(be_number); if (iter != _packet_seq_map.end()) { if (iter->second >= packet_seq) { @@ -275,9 +278,17 @@ void VDataStreamRecvr::SenderQueue::close() { std::lock_guard l(_lock); _is_cancelled = true; + int64_t total_ = 0; + int64_t max_ = 0; for (auto closure_pair : _pending_closures) { closure_pair.first->Run(); + closure_pair.second.stop(); + auto t = closure_pair.second.elapsed_time(); + total_ += t; + max_ = std::max(max_, t); } + _recvr->_buffer_full_delay_total_timer->update(total_); + _recvr->_buffer_full_delay_max_timer->update(max_); _pending_closures.clear(); } @@ -334,6 +345,8 @@ VDataStreamRecvr::VDataStreamRecvr( _deserialize_row_batch_timer = ADD_TIMER(_profile, "DeserializeRowBatchTimer"); _data_arrival_timer = ADD_TIMER(_profile, "DataArrivalWaitTime"); _buffer_full_total_timer = ADD_TIMER(_profile, "SendersBlockedTotalTimer(*)"); + _buffer_full_delay_total_timer = ADD_TIMER(_profile, "SendersBlockedToCloseTotalTimer(*)"); + _buffer_full_delay_max_timer = ADD_TIMER(_profile, "SendersBlockedToCloseMaxTimer"); _first_batch_wait_total_timer = ADD_TIMER(_profile, "FirstBatchArrivalWaitTime"); _decompress_timer = ADD_TIMER(_profile, "DecompressTime"); _decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES); @@ -367,10 +380,11 @@ Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr, } void VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number, - int64_t packet_seq, ::google::protobuf::Closure** done) { + int64_t packet_seq, ::google::protobuf::Closure** done, + int64_t wait_exec_time) { SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id, _fragment_instance_id); int use_sender_id = _is_merging ? 
sender_id : 0; - _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done); + _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done, wait_exec_time); } void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) { @@ -418,9 +432,19 @@ void VDataStreamRecvr::close() { return; } _is_closed = true; + int64_t total_wait_exec_time = 0; + int64_t max_wait_exec_time = 0; for (int i = 0; i < _sender_queues.size(); ++i) { _sender_queues[i]->close(); + max_wait_exec_time = std::max(max_wait_exec_time, _sender_queues[i]->get_wait_exec_time()); + total_wait_exec_time += _sender_queues[i]->get_wait_exec_time(); } + auto* max_wait_exec_timer = ADD_TIMER(_profile, "WaitExecTimeMax"); + auto* total_wait_exec_timer = ADD_TIMER(_profile, "WaitExecTimeTotal"); + auto* avg_wait_exec_timer = ADD_TIMER(_profile, "WaitExecTimeAvg"); + max_wait_exec_timer->update(max_wait_exec_time); + total_wait_exec_timer->update(total_wait_exec_time); + avg_wait_exec_timer->update(total_wait_exec_time / _sender_queues.size()); // Remove this receiver from the DataStreamMgr that created it. // TODO: log error msg _mgr->deregister_recvr(fragment_instance_id(), dest_node_id()); diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 03bf6f9db28bc5..33df2a8609c9b9 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -76,7 +76,7 @@ class VDataStreamRecvr { size_t offset); void add_block(const PBlock& pblock, int sender_id, int be_number, int64_t packet_seq, - ::google::protobuf::Closure** done); + ::google::protobuf::Closure** done, int64_t wait_exec_time); void add_block(Block* block, int sender_id, bool use_move); @@ -149,6 +149,8 @@ class VDataStreamRecvr { RuntimeProfile::Counter* _deserialize_row_batch_timer; RuntimeProfile::Counter* _first_batch_wait_total_timer; RuntimeProfile::Counter* _buffer_full_total_timer; + RuntimeProfile::Counter* _buffer_full_delay_total_timer; + RuntimeProfile::Counter* _buffer_full_delay_max_timer; RuntimeProfile::Counter* _data_arrival_timer; RuntimeProfile::Counter* _decompress_timer; RuntimeProfile::Counter* _decompress_bytes; @@ -186,7 +188,7 @@ class VDataStreamRecvr::SenderQueue { virtual Status get_batch(Block* next_block, bool* eos); void add_block(const PBlock& pblock, int be_number, int64_t packet_seq, - ::google::protobuf::Closure** done); + ::google::protobuf::Closure** done, int64_t wait_exec_time); virtual void add_block(Block* block, bool use_move); @@ -201,6 +203,8 @@ class VDataStreamRecvr::SenderQueue { return _block_queue.empty(); } + int64_t get_wait_exec_time() const { return _wait_exec_time; } + protected: Status _inner_get_batch_without_lock(Block* block, bool* eos); @@ -220,6 +224,7 @@ class VDataStreamRecvr::SenderQueue { std::unordered_map _packet_seq_map; std::deque> _pending_closures; std::unordered_map> _local_closure; + int64_t _wait_exec_time; }; class VDataStreamRecvr::PipSenderQueue : public SenderQueue { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 830ed3c41a56f5..f452cf5214280b 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -54,6 +54,8 @@ message PTransmitDataParams { message PTransmitDataResult { optional PStatus status = 1; optional int64 receive_time = 2; + optional int64 exec_start_time = 3; + optional int64 exec_end_time = 4; }; message PTabletWithPartition { From 1904c3356c4cc2ab15a3a6344d4e2d54bf322dbf Mon Sep 
17 00:00:00 2001 From: liulijia Date: Mon, 14 Aug 2023 20:38:41 +0800 Subject: [PATCH 12/20] fix compile --- be/src/pipeline/exec/exchange_sink_buffer.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 0e0f93ae35f7d2..aff8084c4b067e 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -171,8 +171,8 @@ class ExchangeSinkBuffer { bool can_write() const; bool is_pending_finish(); void close(); - void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_time, int64_t, - exec_start, int64_t exec_end, int64_t callback_start_time, + void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_time, + int64_t exec_start, int64_t exec_end, int64_t callback_start_time, int64_t callback_end_time); void update_profile(RuntimeProfile* profile); From 4aa60e8cbdd583bc6e4845ae7a2cac027e7f1fcf Mon Sep 17 00:00:00 2001 From: liulijia Date: Mon, 14 Aug 2023 21:11:01 +0800 Subject: [PATCH 13/20] fix compile --- be/src/service/internal_service.cpp | 2 +- be/src/vec/runtime/vdata_stream_recvr.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 49620c61e850d3..363ad67d2d391b 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1019,7 +1019,7 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr return; } FifoThreadPool& pool = request->has_block() ? _heavy_work_pool : _light_work_pool; - bool ret = pool.try_offer([this, controller, request, response, done]() { + bool ret = pool.try_offer([this, controller, request, response, done, receive_time]() { int64_t start_exec_time = GetCurrentTimeNanos(); _transmit_block(controller, request, response, done, Status::OK(), receive_time, start_exec_time); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 589b8f1e0f9a7b..87ff24d94c9183 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -283,7 +283,7 @@ void VDataStreamRecvr::SenderQueue::close() { for (auto closure_pair : _pending_closures) { closure_pair.first->Run(); closure_pair.second.stop(); - auto t = closure_pair.second.elapsed_time(); + int64_t t = closure_pair.second.elapsed_time(); total_ += t; max_ = std::max(max_, t); } From 41df148ea4b5726f1c30803b89f1834979098ff1 Mon Sep 17 00:00:00 2001 From: liulijia Date: Mon, 14 Aug 2023 21:42:55 +0800 Subject: [PATCH 14/20] fix code style --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 11 ++---- be/src/pipeline/exec/exchange_sink_buffer.h | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 5 +-- be/src/pipeline/pipeline_task.cpp | 38 ++++++++++++------- be/src/runtime/fragment_mgr.cpp | 3 +- be/src/runtime/plan_fragment_executor.h | 2 +- be/src/service/internal_service.cpp | 3 +- 7 files changed, 36 insertions(+), 28 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 23b474bbb4c360..02db610c4046d0 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -80,7 +80,6 @@ bool ExchangeSinkBuffer::is_pending_finish() { for (auto& pair : _instance_to_package_queue_mutex) { std::unique_lock lock(*(pair.second)); -// std::unique_lock lock(*(pair.second)); auto& id = 
pair.first; if (!_instance_to_sending_by_pipeline.at(id)) { // when pending finish, we need check whether current query is cancelled @@ -340,10 +339,8 @@ bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) { void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time, int64_t* max_exec_delay_time, - int64_t* min_exec_delay_time, - int64_t* max_exec_time, - int64_t* min_exec_time, - int64_t* max_callback_time, + int64_t* min_exec_delay_time, int64_t* max_exec_time, + int64_t* min_exec_time, int64_t* max_callback_time, int64_t* min_callback_time, int64_t* max_callback_exec_time, int64_t* min_callback_exec_time) { @@ -406,8 +403,8 @@ int64_t ExchangeSinkBuffer::get_sum_rpc_time() { } void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_time, - int64_t exec_start, int64_t exec_end, int64_t callback_start_time, - int64_t callback_end_time) { + int64_t exec_start, int64_t exec_end, + int64_t callback_start_time, int64_t callback_end_time) { _rpc_count++; DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end()); int64_t rpc_forward_time = receive_time - start_rpc_time; diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index aff8084c4b067e..5b5e58ceb2b1fa 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include #include @@ -25,7 +26,6 @@ #include #include -#include #include #include #include diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index fe21b25005ba2e..efa0af7e7fb866 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -198,9 +198,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re COUNTER_UPDATE(_start_timer, _fragment_watcher.elapsed_time()); _prepare_timer = ADD_TIMER(_runtime_profile, "PipCtx2PrepareTime"); _close_timer = ADD_TIMER(_runtime_profile, "PipCtx3CloseTime"); - Defer prepare_defer {[&]() { - COUNTER_UPDATE(_prepare_timer, _fragment_watcher.elapsed_time()); - }}; + Defer prepare_defer { + [&]() { COUNTER_UPDATE(_prepare_timer, _fragment_watcher.elapsed_time()); }}; auto* fragment_context = this; OpentelemetryTracer tracer = telemetry::get_noop_tracer(); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index d4706f5c004fb5..c46e741d02fbd6 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -102,26 +102,36 @@ void PipelineTask::_init_profile() { _wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime"); _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime"); _wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime"); - _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT); - _block_by_source_counts = ADD_COUNTER(_task_profile, "NumBlockedBySrcTimes", TUnit::UNIT); - _block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT); + + static const char* num_blocked_times = "NumBlockedTimes"; + _block_counts = ADD_COUNTER(_task_profile, num_blocked_times, TUnit::UNIT); + _block_by_source_counts = ADD_CHILD_COUNTER(_task_profile, "NumBlockedBySrcTimes", TUnit::UNIT, + num_blocked_times); + _block_by_sink_counts = ADD_CHILD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT, + num_blocked_times); _schedule_counts = 
ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT); _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT); _core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT); _begin_execute_timer = ADD_TIMER(_task_profile, "Task1BeginExecuteTime"); _eos_timer = ADD_TIMER(_task_profile, "Task2EosTime"); - _try_close_timer = ADD_TIMER(_task_profile, "Task23TryCloseTime"); - _src_pending_finish_check_timer = ADD_TIMER(_task_profile, "Task3SrcPendingFinishCheckTime"); - _src_pending_finish_over_timer = ADD_TIMER(_task_profile, "Task3SrcPendingFinishOverTime"); - _src_pending_finish_over_timer1 = ADD_TIMER(_task_profile, "Task3SrcPendingFinishOverTime1"); - - _dst_pending_finish_over_timer = ADD_TIMER(_task_profile, "Task4DstPendingFinishOverTime"); - _dst_pending_finish_check_timer = - ADD_TIMER(_task_profile, "Task4DstPendingFinishCheckTime"); - _dst_pending_finish_over_timer1 = ADD_TIMER(_task_profile, "Task4DstPendingFinishOverTime1"); - _pip_task_total_timer = ADD_TIMER(_task_profile, "Task5TotalTime"); - _close_pipeline_timer = ADD_TIMER(_task_profile, "Task6ClosePipelineTime"); + _try_close_timer = ADD_TIMER(_task_profile, "Task3TryCloseTime"); + + static const char* src_pending_finish_over_time = "Task4SrcPendingFinishOverTime"; + _src_pending_finish_over_timer = ADD_TIMER(_task_profile, src_pending_finish_over_time); + _src_pending_finish_check_timer = ADD_CHILD_TIMER( + _task_profile, "Task4SrcPendingFinishCheckTime", src_pending_finish_over_time); + _src_pending_finish_over_timer1 = ADD_CHILD_TIMER( + _task_profile, "Task4SrcPendingFinishFirstOverTime", src_pending_finish_over_time); + + static const char* dst_pending_finish_over_time = "Task5DstPendingFinishOverTime"; + _dst_pending_finish_over_timer = ADD_TIMER(_task_profile, dst_pending_finish_over_time); + _dst_pending_finish_check_timer = ADD_CHILD_TIMER( + _task_profile, "Task4DstPendingFinishCheckTime", dst_pending_finish_over_time); + _dst_pending_finish_over_timer1 = ADD_CHILD_TIMER( + _task_profile, "Task5DstPendingFinishFirstOverTime", dst_pending_finish_over_time); + _pip_task_total_timer = ADD_TIMER(_task_profile, "Task6TotalTime"); + _close_pipeline_timer = ADD_TIMER(_task_profile, "Task7ClosePipelineTime"); } Status PipelineTask::prepare(RuntimeState* state) { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index c3c3727f1879c3..537a62c4bdc590 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -315,7 +315,8 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { static void empty_function(RuntimeState*, Status*) {} -void FragmentMgr::_exec_actual(std::shared_ptr exec_state, const FinishCallback& cb) { +void FragmentMgr::_exec_actual(std::shared_ptr exec_state, + const FinishCallback& cb) { exec_state->execute(cb); } diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index f04ef46e9c15a7..1a2c4586aa1c07 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -34,9 +34,9 @@ #include #include "common/status.h" +#include "runtime/fragment_mgr.h" #include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_state.h" -#include "runtime/fragment_mgr.h" #include "util/runtime_profile.h" namespace doris { diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 363ad67d2d391b..8f074e7ca0ab40 100644 --- a/be/src/service/internal_service.cpp +++ 
b/be/src/service/internal_service.cpp @@ -1015,7 +1015,8 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr if (!request->has_block() && config::brpc_light_work_pool_threads == -1) { // under high concurrency, thread pool will have a lot of lock contention. - _transmit_block(controller, request, response, done, Status::OK(), receive_time, receive_time); + _transmit_block(controller, request, response, done, Status::OK(), receive_time, + receive_time); return; } FifoThreadPool& pool = request->has_block() ? _heavy_work_pool : _light_work_pool; From 0caaa89bd5e8f3aa0658fef083c65bbaa19b9046 Mon Sep 17 00:00:00 2001 From: liulijia Date: Tue, 15 Aug 2023 10:55:22 +0800 Subject: [PATCH 15/20] fix be ut & ajust exchange sink buffer profile --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 12 ++++++------ be/test/vec/runtime/vdata_stream_test.cpp | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 02db610c4046d0..705dd4db711e88 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -439,14 +439,14 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) { auto* _max_rpc_exec_delay_timer = ADD_TIMER(profile, "Rpc2MaxExecDelayTime"); auto* _min_rpc_exec_delay_timer = ADD_TIMER(profile, "Rpc2MinExecDelayTime"); - auto* _max_rpc_exec_timer = ADD_TIMER(profile, "Rpc2MaxExecTime"); - auto* _min_rpc_exec_timer = ADD_TIMER(profile, "Rpc2MinExecTime"); + auto* _max_rpc_exec_timer = ADD_TIMER(profile, "Rpc3MaxExecTime"); + auto* _min_rpc_exec_timer = ADD_TIMER(profile, "Rpc3MinExecTime"); - auto* _max_rpc_callback_timer = ADD_TIMER(profile, "Rpc3MaxCallbackTime"); - auto* _min_rpc_callback_timer = ADD_TIMER(profile, "Rpc3MinCallbackTime"); + auto* _max_rpc_callback_timer = ADD_TIMER(profile, "Rpc4MaxCallbackTime"); + auto* _min_rpc_callback_timer = ADD_TIMER(profile, "Rpc4MinCallbackTime"); - auto* _max_rpc_callback_exec_timer = ADD_TIMER(profile, "Rpc4MaxCallbackExecTime"); - auto* _min_rpc_callback_exec_timer = ADD_TIMER(profile, "Rpc4MinCallbackExecTime"); + auto* _max_rpc_callback_exec_timer = ADD_TIMER(profile, "Rpc5MaxCallbackExecTime"); + auto* _min_rpc_callback_exec_timer = ADD_TIMER(profile, "Rpc5MinCallbackExecTime"); int64_t max_rpc_time = 0, min_rpc_time = 0, max_exec_delay_t = 0, min_exec_delay_t = 0, max_exec_t, min_exec_t = 0, max_callback_t = 0, min_callback_t = 0, diff --git a/be/test/vec/runtime/vdata_stream_test.cpp b/be/test/vec/runtime/vdata_stream_test.cpp index 6c10b89aa7d8ad..4c9baa8361a582 100644 --- a/be/test/vec/runtime/vdata_stream_test.cpp +++ b/be/test/vec/runtime/vdata_stream_test.cpp @@ -78,7 +78,7 @@ class LocalMockBackendService : public PBackendService { // give response a default value to avoid null pointers in high concurrency. 
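transmit_block above records receive_time when the RPC arrives and start_exec_time when a worker actually begins, so their difference is the time the request spent queued in the FifoThreadPool; that delay is then forwarded to the receiver as wait_exec_time. A minimal standalone sketch of measuring queueing delay this way; the clock helper and the async stand-in for the pool are generic, not the brpc/Doris utilities:

    #include <chrono>
    #include <cstdint>
    #include <functional>
    #include <future>

    static int64_t now_ns() {
        using namespace std::chrono;
        return duration_cast<nanoseconds>(steady_clock::now().time_since_epoch()).count();
    }

    // Wrap a work item so it learns how long it waited between submission and
    // the moment a worker picked it up.
    std::function<void()> with_queue_delay(std::function<void(int64_t queue_delay_ns)> work) {
        int64_t submit_ns = now_ns();          // analogous to receive_time
        return [submit_ns, work = std::move(work)]() {
            int64_t start_exec_ns = now_ns();  // analogous to start_exec_time
            work(start_exec_ns - submit_ns);
        };
    }

    int main() {
        auto task = with_queue_delay([](int64_t delay_ns) {
            (void)delay_ns;  // e.g. add to a profile counter, like wait_exec_time
        });
        auto fut = std::async(std::launch::async, task);  // stand-in for pool.try_offer
        fut.wait();
    }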
From 0caaa89bd5e8f3aa0658fef083c65bbaa19b9046 Mon Sep 17 00:00:00 2001
From: liulijia
Date: Tue, 15 Aug 2023 10:55:22 +0800
Subject: [PATCH 15/20] fix be ut & adjust exchange sink buffer profile

---
 be/src/pipeline/exec/exchange_sink_buffer.cpp | 12 ++++++------
 be/test/vec/runtime/vdata_stream_test.cpp     |  2 +-
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 02db610c4046d0..705dd4db711e88 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -439,14 +439,14 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
     auto* _max_rpc_exec_delay_timer = ADD_TIMER(profile, "Rpc2MaxExecDelayTime");
     auto* _min_rpc_exec_delay_timer = ADD_TIMER(profile, "Rpc2MinExecDelayTime");
 
-    auto* _max_rpc_exec_timer = ADD_TIMER(profile, "Rpc2MaxExecTime");
-    auto* _min_rpc_exec_timer = ADD_TIMER(profile, "Rpc2MinExecTime");
+    auto* _max_rpc_exec_timer = ADD_TIMER(profile, "Rpc3MaxExecTime");
+    auto* _min_rpc_exec_timer = ADD_TIMER(profile, "Rpc3MinExecTime");
 
-    auto* _max_rpc_callback_timer = ADD_TIMER(profile, "Rpc3MaxCallbackTime");
-    auto* _min_rpc_callback_timer = ADD_TIMER(profile, "Rpc3MinCallbackTime");
+    auto* _max_rpc_callback_timer = ADD_TIMER(profile, "Rpc4MaxCallbackTime");
+    auto* _min_rpc_callback_timer = ADD_TIMER(profile, "Rpc4MinCallbackTime");
 
-    auto* _max_rpc_callback_exec_timer = ADD_TIMER(profile, "Rpc4MaxCallbackExecTime");
-    auto* _min_rpc_callback_exec_timer = ADD_TIMER(profile, "Rpc4MinCallbackExecTime");
+    auto* _max_rpc_callback_exec_timer = ADD_TIMER(profile, "Rpc5MaxCallbackExecTime");
+    auto* _min_rpc_callback_exec_timer = ADD_TIMER(profile, "Rpc5MinCallbackExecTime");
 
     int64_t max_rpc_time = 0, min_rpc_time = 0, max_exec_delay_t = 0, min_exec_delay_t = 0,
             max_exec_t, min_exec_t = 0, max_callback_t = 0, min_callback_t = 0,
diff --git a/be/test/vec/runtime/vdata_stream_test.cpp b/be/test/vec/runtime/vdata_stream_test.cpp
index 6c10b89aa7d8ad..4c9baa8361a582 100644
--- a/be/test/vec/runtime/vdata_stream_test.cpp
+++ b/be/test/vec/runtime/vdata_stream_test.cpp
@@ -78,7 +78,7 @@ class LocalMockBackendService : public PBackendService {
         // give response a default value to avoid null pointers in high concurrency.
         Status st;
         st.to_protobuf(response->mutable_status());
-        st = stream_mgr->transmit_block(request, &done);
+        st = stream_mgr->transmit_block(request, &done, 0);
         if (!st.ok()) {
             LOG(WARNING) << "transmit_block failed, message=" << st
                          << ", fragment_instance_id=" << print_id(request->finst_id())
From 1d7d7b984187a80c242d518a47aa86e0ab89b297 Mon Sep 17 00:00:00 2001
From: liulijia
Date: Tue, 15 Aug 2023 16:29:34 +0800
Subject: [PATCH 16/20] fix 1

---
 be/src/pipeline/pipeline_task.cpp         | 2 +-
 be/src/runtime/plan_fragment_executor.cpp | 1 -
 be/src/runtime/plan_fragment_executor.h   | 1 -
 3 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index c46e741d02fbd6..477dce2268f993 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -127,7 +127,7 @@ void PipelineTask::_init_profile() {
     static const char* dst_pending_finish_over_time = "Task5DstPendingFinishOverTime";
     _dst_pending_finish_over_timer = ADD_TIMER(_task_profile, dst_pending_finish_over_time);
     _dst_pending_finish_check_timer = ADD_CHILD_TIMER(
-            _task_profile, "Task4DstPendingFinishCheckTime", dst_pending_finish_over_time);
+            _task_profile, "Task5DstPendingFinishCheckTime", dst_pending_finish_over_time);
     _dst_pending_finish_over_timer1 = ADD_CHILD_TIMER(
             _task_profile, "Task5DstPendingFinishFirstOverTime", dst_pending_finish_over_time);
     _pip_task_total_timer = ADD_TIMER(_task_profile, "Task6TotalTime");
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index fc5b0b28508350..b622020e104920 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -45,7 +45,6 @@
 #include "io/fs/stream_load_pipe.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
-#include "runtime/fragment_mgr.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/query_context.h"
 #include "runtime/query_statistics.h"
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index 1a2c4586aa1c07..c4fb90b7d6c33d 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -34,7 +34,6 @@
 #include 
 
 #include "common/status.h"
-#include "runtime/fragment_mgr.h"
 #include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
From 98aa8f86cfd7addd9fb256dd5595f4ad8b68bb4c Mon Sep 17 00:00:00 2001
From: liulijia
Date: Tue, 15 Aug 2023 17:27:49 +0800
Subject: [PATCH 17/20] fix 2

---
 be/src/runtime/plan_fragment_executor.cpp | 3 +++
 be/src/runtime/plan_fragment_executor.h   | 1 +
 2 files changed, 4 insertions(+)

diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index b622020e104920..7847b0088c7490 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -45,6 +45,7 @@
 #include "io/fs/stream_load_pipe.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/query_context.h"
 #include "runtime/query_statistics.h"
@@ -74,6 +75,7 @@ namespace doris {
 
 using namespace ErrorCode;
 
+#ifndef BE_TEST
 PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
                                            const report_status_callback& report_status_cb)
         : _exec_env(exec_env),
@@ -571,6 +573,7 @@ void PlanFragmentExecutor::close() {
     profile()->add_to_span(_span);
     _closed = true;
 }
+#endif
 
 ///////////////// FragmentExecState //////////////
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index c4fb90b7d6c33d..6fa1203e640ca1 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -48,6 +48,7 @@ class DescriptorTbl;
 class ExecEnv;
 class ObjectPool;
 class QueryStatistics;
+struct ReportStatusRequest;
 
 namespace vectorized {
 class Block;
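PATCH 16 and PATCH 17 together move the runtime/fragment_mgr.h include out of plan_fragment_executor.h: the header keeps only a forward declaration of ReportStatusRequest, the .cpp regains the full include, and the PlanFragmentExecutor definitions are fenced with #ifndef BE_TEST so unit-test builds skip them. A minimal sketch of that header/source split (the file and class names other than ReportStatusRequest are invented for illustration):

// example_reporter.h (hypothetical): the header only needs the type's name.
struct ReportStatusRequest;

class ExampleReporter {
public:
    void report(const ReportStatusRequest& req); // a declaration needs no complete type
};

// example_reporter.cpp (hypothetical): the heavy include lives only here.
// #include "example_reporter.h"
// #include "runtime/fragment_mgr.h"

#ifndef BE_TEST
void ExampleReporter::report(const ReportStatusRequest& /*req*/) {
    // Anything that touches the members of ReportStatusRequest needs the
    // complete type, which is why the include stays in the .cpp.
}
#endif

Presumably this also avoids a circular include, since fragment_mgr.h itself needs the declarations from plan_fragment_executor.h.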
From 1a10626b1bd8de63a967031b227a0c1ff813906a Mon Sep 17 00:00:00 2001
From: liulijia
Date: Tue, 15 Aug 2023 19:43:41 +0800
Subject: [PATCH 18/20] fix ut

---
 be/src/runtime/fragment_mgr.cpp           | 7 ++-----
 be/src/runtime/plan_fragment_executor.cpp | 6 ++++--
 be/src/runtime/plan_fragment_executor.h   | 4 +++-
 3 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 537a62c4bdc590..bdcecb6ba11a93 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -428,15 +428,12 @@ void FragmentMgr::remove_pipeline_context(
         std::shared_ptr& f_context) {
     std::lock_guard lock(_lock);
     auto query_id = f_context->get_query_id();
-    LOG(INFO) << "remove 1 " << print_id(query_id);
     auto* q_context = f_context->get_query_context();
-    LOG(INFO) << "remove 2 " << q_context;
     bool all_done = q_context->countdown();
-    LOG(INFO) << "remove 3 " << all_done;
     auto ins_id = f_context->get_fragment_instance_id();
     _pipeline_map.erase(f_context->get_fragment_instance_id());
-    LOG(INFO) << "remove 4" << print_id(ins_id) << ", q_id " << print_id(query_id) << ", all_done "
-              << all_done;
+    LOG(INFO) << "remove pip context " << print_id(ins_id) << ", q_id " << print_id(query_id)
+              << ", all_done " << all_done;
     if (all_done) {
         _query_ctx_map.erase(query_id);
     }
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 7847b0088c7490..cfef3562281501 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -579,11 +579,13 @@ void PlanFragmentExecutor::close() {
 
 FragmentExecState::FragmentExecState(const TUniqueId& query_id,
                                      const TUniqueId& fragment_instance_id, int backend_num,
-                                     ExecEnv* exec_env, std::shared_ptr query_ctx,
+                                     ExecEnv* exec_env, FragmentMgr* fragment_mgr,
+                                     std::shared_ptr query_ctx,
                                      const report_status_callback_impl& report_status_cb_impl)
         : _query_id(query_id),
           _fragment_instance_id(fragment_instance_id),
          _backend_num(backend_num),
+          _fragment_mgr(fragment_mgr),
          _query_ctx(std::move(query_ctx)),
          _executor(exec_env, std::bind(std::mem_fn(&FragmentExecState::coordinator_callback),
                                        this, std::placeholders::_1, std::placeholders::_2,
@@ -629,7 +631,7 @@ void FragmentExecState::execute(const FinishCallback& cb) {
         cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "exec_state execute failed");
     }
 
-    _executor.exec_env()->fragment_mgr()->remove_fragment_exec_state(shared_from_this());
+    _fragment_mgr->remove_fragment_exec_state(shared_from_this());
 
     // Callback after remove from this id
     auto status = _executor.status();
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index 6fa1203e640ca1..83b221ecfe0d62 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -256,7 +256,8 @@ class FragmentExecState : public std::enable_shared_from_this
     using report_status_callback_impl = std::function;
     // Constructor by using QueryContext
     FragmentExecState(const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num,
-                      ExecEnv* exec_env, std::shared_ptr query_ctx,
+                      ExecEnv* exec_env, FragmentMgr* fragment_mgr,
+                      std::shared_ptr query_ctx,
                       const report_status_callback_impl& report_status_cb_impl);
 
     Status prepare(const TExecPlanFragmentParams& params);
@@ -325,6 +326,7 @@ class FragmentExecState : public std::enable_shared_from_this
 
     int _backend_num;
     TNetworkAddress _coord_addr;
+    FragmentMgr* _fragment_mgr;
     // This context is shared by all fragments of this host in a query.
     // _query_ctx should be the last one to be destructed, because _executor's
     // destruct method will call close and it will depend on query context,
From e61c128e0351c069534bae523e0ba2b55e54d9c2 Mon Sep 17 00:00:00 2001
From: liulijia
Date: Tue, 15 Aug 2023 19:46:40 +0800
Subject: [PATCH 19/20] fix ut 1

---
 be/src/runtime/fragment_mgr.cpp         | 4 ++--
 be/src/runtime/plan_fragment_executor.h | 1 +
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index bdcecb6ba11a93..3d0bdff52cc197 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -584,7 +584,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
         exec_state.reset(
                 new FragmentExecState(query_ctx->query_id, params.params.fragment_instance_id,
-                                      params.backend_num, _exec_env, query_ctx,
+                                      params.backend_num, _exec_env, query_ctx, this,
                                       std::bind(std::mem_fn(&FragmentMgr::coordinator_callback),
                                                 this, std::placeholders::_1)));
     if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
@@ -664,7 +664,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
         std::shared_ptr exec_state(new FragmentExecState(
                 query_ctx->query_id, fragment_instance_id, local_params.backend_num, _exec_env,
-                query_ctx,
+                this, query_ctx,
                 std::bind(std::mem_fn(&FragmentMgr::coordinator_callback), this,
                           std::placeholders::_1)));
     if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index 83b221ecfe0d62..316af5d9599603 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -46,6 +46,7 @@ class RowDescriptor;
 class DataSink;
 class DescriptorTbl;
 class ExecEnv;
+class FragmentMgr;
 class ObjectPool;
 class QueryStatistics;
 struct ReportStatusRequest;
 
 namespace vectorized {
 class Block;
From d78ebd82dc869c82478bd5d6df96c55ed2298c40 Mon Sep 17 00:00:00 2001
From: liulijia
Date: Tue, 15 Aug 2023 19:51:30 +0800
Subject: [PATCH 20/20] fix ut 2

---
 be/src/runtime/fragment_mgr.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 3d0bdff52cc197..2767662ca0bf5f 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -584,7 +584,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
         exec_state.reset(
                 new FragmentExecState(query_ctx->query_id, params.params.fragment_instance_id,
-                                      params.backend_num, _exec_env, query_ctx, this,
+                                      params.backend_num, _exec_env, this, query_ctx,
                                       std::bind(std::mem_fn(&FragmentMgr::coordinator_callback),
                                                 this, std::placeholders::_1)));
     if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
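PATCH 18 through PATCH 20 converge on injecting the owning FragmentMgr into FragmentExecState: the constructor takes a FragmentMgr*, execute() calls it directly instead of going through _executor.exec_env()->fragment_mgr(), and the last two commits only straighten out the new argument order at the two call sites (FragmentMgr* and the query-context shared_ptr are different, non-convertible types, so a swapped order does not compile, which is presumably what the unit tests caught). A minimal sketch of the same injection pattern, with every name below invented for illustration:

#include <memory>
#include <utility>

class ExampleMgr; // only a pointer is stored, so a forward declaration suffices

struct ExampleQueryCtx {};

class ExampleExecState {
public:
    // The owner is handed in up front; every call site has to follow this
    // exact parameter order.
    ExampleExecState(ExampleMgr* mgr, std::shared_ptr<ExampleQueryCtx> query_ctx)
            : _mgr(mgr), _query_ctx(std::move(query_ctx)) {}

    void execute();

private:
    ExampleMgr* _mgr;                            // injected owner, no global lookup
    std::shared_ptr<ExampleQueryCtx> _query_ctx; // kept alive for the whole run
};

class ExampleMgr {
public:
    // Called back by the state when it finishes.
    void remove(ExampleExecState* state) { _last_removed = state; }

private:
    ExampleExecState* _last_removed = nullptr; // stand-in for real bookkeeping
};

void ExampleExecState::execute() {
    // ... run the fragment ...
    _mgr->remove(this); // direct call into the injected owner on completion
}

This mirrors the change from _executor.exec_env()->fragment_mgr() to _fragment_mgr-> in PATCH 18, and it appears to be what lets BE_TEST builds construct the state against a manager they control rather than a fully wired ExecEnv.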