From 41b0581bccd2f1d95cbfa2cddfa5fdc3bd0b448e Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Tue, 25 Mar 2025 14:11:52 +0800 Subject: [PATCH 1/4] support rf on set operator: be part fix update fix --- be/src/pipeline/exec/set_sink_operator.cpp | 63 +++++++++++------ be/src/pipeline/exec/set_sink_operator.h | 20 ++++-- be/src/runtime/fragment_mgr.cpp | 4 +- .../runtime_filter_producer_helper_set.h | 67 +++++++++++++++++++ 4 files changed, 128 insertions(+), 26 deletions(-) create mode 100644 be/src/runtime_filter/runtime_filter_producer_helper_set.h diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 4faeb975ef9df5..665796c29201c7 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -20,12 +20,35 @@ #include #include "pipeline/exec/operator.h" +#include "pipeline/pipeline_task.h" #include "vec/common/hash_table/hash_table_set_build.h" #include "vec/core/materialize_block.h" namespace doris::pipeline { #include "common/compile_check_begin.h" +template +Status SetSinkLocalState::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return Status::OK(); + } + + if (!_runtime_filter_producer_helper || state->is_cancelled() || !_eos) { + return Base::close(state, exec_status); + } + + try { + RETURN_IF_ERROR( + _runtime_filter_producer_helper->process(state, &_shared_state->build_block)); + } catch (Exception& e) { + return Status::InternalError( + "rf process meet error: {}, wake_up_early: {}, _finish_dependency: {}", + e.to_string(), state->get_task()->wake_up_early(), + _finish_dependency->debug_string()); + } + return Base::close(state, exec_status); +} + template Status SetSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) { @@ -57,23 +80,22 @@ Status SetSinkOperatorX::sink(RuntimeState* state, vectorized::Blo local_state._mutable_block.clear(); if (eos) { - if constexpr (is_intersect) { - valid_element_in_hash_tbl = 0; - } else { - std::visit( - [&](auto&& arg) { - using HashTableCtxType = std::decay_t; - if constexpr (!std::is_same_v) { - valid_element_in_hash_tbl = arg.hash_table->size(); - } - }, - local_state._shared_state->hash_table_variants->method_variant); - } + uint64_t hash_table_size = 0; + std::visit( + [&](auto&& arg) { + using HashTableCtxType = std::decay_t; + if constexpr (!std::is_same_v) { + hash_table_size = arg.hash_table->size(); + } + }, + local_state._shared_state->hash_table_variants->method_variant); + valid_element_in_hash_tbl = is_intersect ? 0 : hash_table_size; + local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1] ->set_ready(); - if (_child_quantity == 1) { - local_state._dependency->set_ready_to_read(); - } + RETURN_IF_ERROR(local_state._runtime_filter_producer_helper->send_filter_size( + state, hash_table_size, local_state._finish_dependency)); + local_state._eos = true; } } return Status::OK(); @@ -113,16 +135,16 @@ template Status SetSinkOperatorX::_extract_build_column( SetSinkLocalState& local_state, vectorized::Block& block, vectorized::ColumnRawPtrs& raw_ptrs, size_t& rows) { - std::vector result_locs(_child_exprs.size(), -1); + std::vector result_locs(local_state._child_exprs.size(), -1); bool is_all_const = true; - for (size_t i = 0; i < _child_exprs.size(); ++i) { - RETURN_IF_ERROR(_child_exprs[i]->execute(&block, &result_locs[i])); + for (size_t i = 0; i < local_state._child_exprs.size(); ++i) { + RETURN_IF_ERROR(local_state._child_exprs[i]->execute(&block, &result_locs[i])); is_all_const &= is_column_const(*block.get_by_position(result_locs[i]).column); } rows = is_all_const ? 1 : rows; - for (size_t i = 0; i < _child_exprs.size(); ++i) { + for (size_t i = 0; i < local_state._child_exprs.size(); ++i) { size_t result_col_id = result_locs[i]; if (is_all_const) { @@ -175,6 +197,9 @@ Status SetSinkLocalState::init(RuntimeState* state, LocalSinkState RETURN_IF_ERROR(_shared_state->update_build_not_ignore_null(_child_exprs)); + _runtime_filter_producer_helper = std::make_shared(profile()); + RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, _child_exprs, + parent._runtime_filter_descs)); return Status::OK(); } diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index b2795c23a5bb1b..6b307dc6528da7 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -17,10 +17,8 @@ #pragma once -#include - -#include "olap/olap_common.h" #include "operator.h" +#include "runtime_filter/runtime_filter_producer_helper_set.h" namespace doris { #include "common/compile_check_begin.h" @@ -46,6 +44,7 @@ class SetSinkLocalState final : public PipelineXSinkLocalState { Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override; + Status close(RuntimeState* state, Status exec_status) override; private: friend class SetSinkOperatorX; @@ -57,6 +56,9 @@ class SetSinkLocalState final : public PipelineXSinkLocalState { RuntimeProfile::Counter* _merge_block_timer = nullptr; RuntimeProfile::Counter* _build_timer = nullptr; + + std::shared_ptr _runtime_filter_producer_helper; + std::shared_ptr _finish_dependency; }; template @@ -71,14 +73,17 @@ class SetSinkOperatorX final : public DataSinkOperatorX _partition_exprs; using OperatorBase::_child; + + const std::vector _runtime_filter_descs; }; #include "common/compile_check_end.h" diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 3d81bee17d5453..f7980e41c91311 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1343,7 +1343,9 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, query_id.__set_lo(queryid.lo); if (auto q_ctx = get_query_ctx(query_id)) { SCOPED_ATTACH_TASK(q_ctx.get()); - std::shared_ptr filter_controller; + if (!q_ctx->get_merge_controller_handler()) { + return Status::InternalError("Merge filter failed: Merge controller handler is null"); + } return q_ctx->get_merge_controller_handler()->merge(q_ctx, request, attach_data); } else { return Status::EndOfFile( diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_set.h b/be/src/runtime_filter/runtime_filter_producer_helper_set.h new file mode 100644 index 00000000000000..7d2f5b73e17a91 --- /dev/null +++ b/be/src/runtime_filter/runtime_filter_producer_helper_set.h @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/status.h" +#include "pipeline/pipeline_task.h" +#include "runtime/runtime_state.h" +#include "runtime_filter/runtime_filter.h" +#include "runtime_filter/runtime_filter_mgr.h" +#include "runtime_filter/runtime_filter_producer_helper.h" +#include "vec/core/block.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris { +#include "common/compile_check_begin.h" +// this class used in set sink node +class RuntimeFilterProducerHelperSet : public RuntimeFilterProducerHelper { +public: + ~RuntimeFilterProducerHelperSet() override = default; + + RuntimeFilterProducerHelperSet(RuntimeProfile* profile) + : RuntimeFilterProducerHelper(profile, true, false) {} + + Status process(RuntimeState* state, const vectorized::Block* block) { + if (_skip_runtime_filters_process) { + return Status::OK(); + } + + bool wake_up_early = state->get_task()->wake_up_early(); + // Runtime filter is ignored partially which has no effect on correctness. + auto wrapper_state = wake_up_early ? RuntimeFilterWrapper::State::IGNORED + : RuntimeFilterWrapper::State::READY; + if (!wake_up_early) { + // Hash table is completed and runtime filter has a global size now. + uint64_t hash_table_size = block ? block->rows() : 0; + RETURN_IF_ERROR(_init_filters(state, hash_table_size)); + if (hash_table_size != 0) { + RETURN_IF_ERROR(_insert(block, 0)); + } + } + + for (const auto& filter : _producers) { + filter->set_wrapper_state_and_ready_to_publish(wrapper_state); + } + + RETURN_IF_ERROR(_publish(state)); + return Status::OK(); + } +}; +#include "common/compile_check_end.h" +} // namespace doris From 6ee7d53361dc2bcce44c5c99a8b5c04be9b08a55 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Tue, 1 Apr 2025 11:31:58 +0800 Subject: [PATCH 2/4] update --- be/src/pipeline/exec/set_sink_operator.cpp | 45 ++++++++++--------- .../runtime_filter_producer_helper_set.h | 8 ++-- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 665796c29201c7..5b0c123a19d22a 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -27,24 +27,36 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" +uint64_t get_hash_table_size(const auto& hash_table_variant) { + uint64_t hash_table_size = 0; + std::visit( + [&](auto&& arg) { + using HashTableCtxType = std::decay_t; + if constexpr (!std::is_same_v) { + hash_table_size = arg.hash_table->size(); + } + }, + hash_table_variant); + return hash_table_size; +} + template Status SetSinkLocalState::close(RuntimeState* state, Status exec_status) { if (_closed) { return Status::OK(); } - if (!_runtime_filter_producer_helper || state->is_cancelled() || !_eos) { - return Base::close(state, exec_status); - } - - try { - RETURN_IF_ERROR( - _runtime_filter_producer_helper->process(state, &_shared_state->build_block)); - } catch (Exception& e) { - return Status::InternalError( - "rf process meet error: {}, wake_up_early: {}, _finish_dependency: {}", - e.to_string(), state->get_task()->wake_up_early(), - _finish_dependency->debug_string()); + if (!state->is_cancelled() && _eos) { + try { + RETURN_IF_ERROR(_runtime_filter_producer_helper->process( + state, &_shared_state->build_block, + get_hash_table_size(_shared_state->hash_table_variants->method_variant))); + } catch (Exception& e) { + return Status::InternalError( + "rf process meet error: {}, wake_up_early: {}, _finish_dependency: {}", + e.to_string(), state->get_task()->wake_up_early(), + _finish_dependency->debug_string()); + } } return Base::close(state, exec_status); } @@ -80,14 +92,7 @@ Status SetSinkOperatorX::sink(RuntimeState* state, vectorized::Blo local_state._mutable_block.clear(); if (eos) { - uint64_t hash_table_size = 0; - std::visit( - [&](auto&& arg) { - using HashTableCtxType = std::decay_t; - if constexpr (!std::is_same_v) { - hash_table_size = arg.hash_table->size(); - } - }, + uint64_t hash_table_size = get_hash_table_size( local_state._shared_state->hash_table_variants->method_variant); valid_element_in_hash_tbl = is_intersect ? 0 : hash_table_size; diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_set.h b/be/src/runtime_filter/runtime_filter_producer_helper_set.h index 7d2f5b73e17a91..132538aed40b3f 100644 --- a/be/src/runtime_filter/runtime_filter_producer_helper_set.h +++ b/be/src/runtime_filter/runtime_filter_producer_helper_set.h @@ -37,7 +37,7 @@ class RuntimeFilterProducerHelperSet : public RuntimeFilterProducerHelper { RuntimeFilterProducerHelperSet(RuntimeProfile* profile) : RuntimeFilterProducerHelper(profile, true, false) {} - Status process(RuntimeState* state, const vectorized::Block* block) { + Status process(RuntimeState* state, const vectorized::Block* block, uint64_t cardinality) { if (_skip_runtime_filters_process) { return Status::OK(); } @@ -47,10 +47,8 @@ class RuntimeFilterProducerHelperSet : public RuntimeFilterProducerHelper { auto wrapper_state = wake_up_early ? RuntimeFilterWrapper::State::IGNORED : RuntimeFilterWrapper::State::READY; if (!wake_up_early) { - // Hash table is completed and runtime filter has a global size now. - uint64_t hash_table_size = block ? block->rows() : 0; - RETURN_IF_ERROR(_init_filters(state, hash_table_size)); - if (hash_table_size != 0) { + RETURN_IF_ERROR(_init_filters(state, cardinality)); + if (cardinality != 0) { RETURN_IF_ERROR(_insert(block, 0)); } } From 04227b89361afa8faf0c10c529374b5af4a59844 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Wed, 2 Apr 2025 20:57:59 +0800 Subject: [PATCH 3/4] update --- be/src/pipeline/exec/set_sink_operator.cpp | 17 +++++++++++++---- be/src/pipeline/exec/set_sink_operator.h | 1 + .../runtime_filter_producer_helper_set.h | 14 ++++---------- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 5b0c123a19d22a..b8e9542f377658 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -40,22 +40,31 @@ uint64_t get_hash_table_size(const auto& hash_table_variant) { return hash_table_size; } +template +Status SetSinkLocalState::terminate(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + if (_terminated) { + return Status::OK(); + } + RETURN_IF_ERROR(_runtime_filter_producer_helper->terminate(state)); + return Base::terminate(state); +} + template Status SetSinkLocalState::close(RuntimeState* state, Status exec_status) { if (_closed) { return Status::OK(); } - if (!state->is_cancelled() && _eos) { + if (!_terminated && _runtime_filter_producer_helper && !state->is_cancelled()) { try { RETURN_IF_ERROR(_runtime_filter_producer_helper->process( state, &_shared_state->build_block, get_hash_table_size(_shared_state->hash_table_variants->method_variant))); } catch (Exception& e) { return Status::InternalError( - "rf process meet error: {}, wake_up_early: {}, _finish_dependency: {}", - e.to_string(), state->get_task()->wake_up_early(), - _finish_dependency->debug_string()); + "rf process meet error: {}, _terminated: {}, _finish_dependency: {}", + e.to_string(), _terminated, _finish_dependency->debug_string()); } } return Base::close(state, exec_status); diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 6b307dc6528da7..aadfc7ee6d2638 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -44,6 +44,7 @@ class SetSinkLocalState final : public PipelineXSinkLocalState { Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override; + Status terminate(RuntimeState* state) override; Status close(RuntimeState* state, Status exec_status) override; private: diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_set.h b/be/src/runtime_filter/runtime_filter_producer_helper_set.h index 132538aed40b3f..2e4e5bfe86a126 100644 --- a/be/src/runtime_filter/runtime_filter_producer_helper_set.h +++ b/be/src/runtime_filter/runtime_filter_producer_helper_set.h @@ -42,19 +42,13 @@ class RuntimeFilterProducerHelperSet : public RuntimeFilterProducerHelper { return Status::OK(); } - bool wake_up_early = state->get_task()->wake_up_early(); - // Runtime filter is ignored partially which has no effect on correctness. - auto wrapper_state = wake_up_early ? RuntimeFilterWrapper::State::IGNORED - : RuntimeFilterWrapper::State::READY; - if (!wake_up_early) { - RETURN_IF_ERROR(_init_filters(state, cardinality)); - if (cardinality != 0) { - RETURN_IF_ERROR(_insert(block, 0)); - } + RETURN_IF_ERROR(_init_filters(state, cardinality)); + if (cardinality != 0) { + RETURN_IF_ERROR(_insert(block, 0)); } for (const auto& filter : _producers) { - filter->set_wrapper_state_and_ready_to_publish(wrapper_state); + filter->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); } RETURN_IF_ERROR(_publish(state)); From 08db6c48fd0a652ab713c2d48a373bf15a4c62fd Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Thu, 3 Apr 2025 11:21:17 +0800 Subject: [PATCH 4/4] remove task include --- be/src/pipeline/exec/set_sink_operator.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index b8e9542f377658..8d28fe1e36953d 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -20,7 +20,6 @@ #include #include "pipeline/exec/operator.h" -#include "pipeline/pipeline_task.h" #include "vec/common/hash_table/hash_table_set_build.h" #include "vec/core/materialize_block.h"