Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 57 additions & 19 deletions be/src/pipeline/exec/set_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,49 @@
namespace doris::pipeline {
#include "common/compile_check_begin.h"

// Returns the number of entries in the hash table held by the active
// alternative of `hash_table_variant`. When the variant holds std::monostate
// (no hash table context), the result is 0.
uint64_t get_hash_table_size(const auto& hash_table_variant) {
    return std::visit(
            [](const auto& ctx) -> uint64_t {
                using Ctx = std::decay_t<decltype(ctx)>;
                if constexpr (std::is_same_v<Ctx, std::monostate>) {
                    // Empty alternative: there is no table to measure.
                    return 0;
                } else {
                    return ctx.hash_table->size();
                }
            },
            hash_table_variant);
}

// Terminates this sink's local state.
//
// Returns OK immediately if `_terminated` is already set. Otherwise the
// runtime filter producer helper (if one exists) is terminated first, and the
// call is then forwarded to Base::terminate(). Any error from the helper is
// returned as-is and Base::terminate() is not reached in that case.
template <bool is_intersect>
Status SetSinkLocalState<is_intersect>::terminate(RuntimeState* state) {
    SCOPED_TIMER(exec_time_counter());
    if (_terminated) {
        return Status::OK();
    }
    // close() treats the helper as possibly null, so guard it here as well
    // instead of dereferencing unconditionally.
    if (_runtime_filter_producer_helper) {
        RETURN_IF_ERROR(_runtime_filter_producer_helper->terminate(state));
    }
    return Base::terminate(state);
}

// Closes this sink's local state.
//
// Returns OK immediately if `_closed` is already set. If the state has not
// been terminated, a runtime filter producer helper exists and the query is
// not cancelled, the helper's process() is run on the shared build block with
// the current hash table size as cardinality. An error Status from process()
// is returned directly; a thrown doris Exception is converted into an
// InternalError Status. In every other case the call ends with Base::close().
template <bool is_intersect>
Status SetSinkLocalState<is_intersect>::close(RuntimeState* state, Status exec_status) {
    if (_closed) {
        return Status::OK();
    }

    if (!_terminated && _runtime_filter_producer_helper && !state->is_cancelled()) {
        try {
            RETURN_IF_ERROR(_runtime_filter_producer_helper->process(
                    state, &_shared_state->build_block,
                    get_hash_table_size(_shared_state->hash_table_variants->method_variant)));
        } catch (const Exception& e) {
            // Guard `_finish_dependency` so that building the error message
            // cannot itself dereference a null pointer.
            // NOTE(review): where `_finish_dependency` is assigned is not
            // visible in this hunk; confirm it is always set.
            return Status::InternalError(
                    "rf process meet error: {}, _terminated: {}, _finish_dependency: {}",
                    e.to_string(), _terminated,
                    _finish_dependency ? _finish_dependency->debug_string() : "null");
        }
    }
    return Base::close(state, exec_status);
}

template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Block* in_block,
bool eos) {
Expand Down Expand Up @@ -57,23 +100,15 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo
local_state._mutable_block.clear();

if (eos) {
if constexpr (is_intersect) {
valid_element_in_hash_tbl = 0;
} else {
std::visit(
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
valid_element_in_hash_tbl = arg.hash_table->size();
}
},
local_state._shared_state->hash_table_variants->method_variant);
}
uint64_t hash_table_size = get_hash_table_size(
local_state._shared_state->hash_table_variants->method_variant);
valid_element_in_hash_tbl = is_intersect ? 0 : hash_table_size;

local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
->set_ready();
if (_child_quantity == 1) {
local_state._dependency->set_ready_to_read();
}
RETURN_IF_ERROR(local_state._runtime_filter_producer_helper->send_filter_size(
state, hash_table_size, local_state._finish_dependency));
local_state._eos = true;
}
}
return Status::OK();
Expand Down Expand Up @@ -113,16 +148,16 @@ template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::_extract_build_column(
SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block,
vectorized::ColumnRawPtrs& raw_ptrs, size_t& rows) {
std::vector<int> result_locs(_child_exprs.size(), -1);
std::vector<int> result_locs(local_state._child_exprs.size(), -1);
bool is_all_const = true;

for (size_t i = 0; i < _child_exprs.size(); ++i) {
RETURN_IF_ERROR(_child_exprs[i]->execute(&block, &result_locs[i]));
for (size_t i = 0; i < local_state._child_exprs.size(); ++i) {
RETURN_IF_ERROR(local_state._child_exprs[i]->execute(&block, &result_locs[i]));
is_all_const &= is_column_const(*block.get_by_position(result_locs[i]).column);
}
rows = is_all_const ? 1 : rows;

for (size_t i = 0; i < _child_exprs.size(); ++i) {
for (size_t i = 0; i < local_state._child_exprs.size(); ++i) {
size_t result_col_id = result_locs[i];

if (is_all_const) {
Expand Down Expand Up @@ -175,6 +210,9 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSinkState

RETURN_IF_ERROR(_shared_state->update_build_not_ignore_null(_child_exprs));

_runtime_filter_producer_helper = std::make_shared<RuntimeFilterProducerHelperSet>(profile());
RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, _child_exprs,
parent._runtime_filter_descs));
return Status::OK();
}

Expand Down
21 changes: 15 additions & 6 deletions be/src/pipeline/exec/set_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@

#pragma once

#include <stdint.h>

#include "olap/olap_common.h"
#include "operator.h"
#include "runtime_filter/runtime_filter_producer_helper_set.h"

namespace doris {
#include "common/compile_check_begin.h"
Expand All @@ -46,6 +44,8 @@ class SetSinkLocalState final : public PipelineXSinkLocalState<SetSharedState> {

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status terminate(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;

private:
friend class SetSinkOperatorX<is_intersect>;
Expand All @@ -57,6 +57,9 @@ class SetSinkLocalState final : public PipelineXSinkLocalState<SetSharedState> {

RuntimeProfile::Counter* _merge_block_timer = nullptr;
RuntimeProfile::Counter* _build_timer = nullptr;

std::shared_ptr<RuntimeFilterProducerHelperSet> _runtime_filter_producer_helper;
std::shared_ptr<CountedFinishDependency> _finish_dependency;
};

template <bool is_intersect>
Expand All @@ -71,14 +74,17 @@ class SetSinkOperatorX final : public DataSinkOperatorX<SetSinkLocalState<is_int
SetSinkOperatorX(int child_id, int sink_id, int dest_id, ObjectPool* pool,
const TPlanNode& tnode, const DescriptorTbl& descs)
: Base(sink_id, tnode.node_id, dest_id),
_cur_child_id(child_id),
_child_quantity(tnode.node_type == TPlanNodeType::type::INTERSECT_NODE
? tnode.intersect_node.result_expr_lists.size()
: tnode.except_node.result_expr_lists.size()),
_is_colocate(is_intersect ? tnode.intersect_node.is_colocate
: tnode.except_node.is_colocate),
_partition_exprs(is_intersect ? tnode.intersect_node.result_expr_lists[child_id]
: tnode.except_node.result_expr_lists[child_id]) {}
: tnode.except_node.result_expr_lists[child_id]),
_runtime_filter_descs(tnode.runtime_filters) {
DCHECK_EQ(child_id, _cur_child_id);
DCHECK_GT(_child_quantity, 1);
}

#ifdef BE_TEST
SetSinkOperatorX(int _child_quantity)
Expand All @@ -87,6 +93,7 @@ class SetSinkOperatorX final : public DataSinkOperatorX<SetSinkLocalState<is_int
_is_colocate(false),
_partition_exprs() {}
#endif

~SetSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TDataSink",
Expand Down Expand Up @@ -115,13 +122,15 @@ class SetSinkOperatorX final : public DataSinkOperatorX<SetSinkLocalState<is_int
vectorized::Block& block, vectorized::ColumnRawPtrs& raw_ptrs,
size_t& rows);

const int _cur_child_id;
const int _cur_child_id = 0;
const size_t _child_quantity;
// every child has its result expr list
vectorized::VExprContextSPtrs _child_exprs;
const bool _is_colocate;
const std::vector<TExpr> _partition_exprs;
using OperatorBase::_child;

const std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
};
#include "common/compile_check_end.h"

Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1343,7 +1343,9 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
query_id.__set_lo(queryid.lo);
if (auto q_ctx = get_query_ctx(query_id)) {
SCOPED_ATTACH_TASK(q_ctx.get());
std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
if (!q_ctx->get_merge_controller_handler()) {
return Status::InternalError("Merge filter failed: Merge controller handler is null");
}
return q_ctx->get_merge_controller_handler()->merge(q_ctx, request, attach_data);
} else {
return Status::EndOfFile(
Expand Down
59 changes: 59 additions & 0 deletions be/src/runtime_filter/runtime_filter_producer_helper_set.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "common/status.h"
#include "pipeline/pipeline_task.h"
#include "runtime/runtime_state.h"
#include "runtime_filter/runtime_filter.h"
#include "runtime_filter/runtime_filter_mgr.h"
#include "runtime_filter/runtime_filter_producer_helper.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"

namespace doris {
#include "common/compile_check_begin.h"
// Runtime filter producer helper used by the set sink node.
class RuntimeFilterProducerHelperSet : public RuntimeFilterProducerHelper {
public:
    // NOTE(review): the meaning of the two boolean flags passed to the base
    // constructor is defined by RuntimeFilterProducerHelper and is not visible
    // from this file.
    RuntimeFilterProducerHelperSet(RuntimeProfile* profile)
            : RuntimeFilterProducerHelper(profile, true, false) {}

    ~RuntimeFilterProducerHelperSet() override = default;

    // Initializes, fills, marks ready and publishes the runtime filters.
    //
    // Does nothing and returns OK when `_skip_runtime_filters_process` is set.
    // Otherwise:
    //   1. initializes the filters with `cardinality`;
    //   2. inserts `block` into them, unless `cardinality` is zero;
    //   3. switches every producer's wrapper to the READY state;
    //   4. publishes the filters and returns the resulting status.
    // The first failing step aborts the sequence and its status is returned.
    Status process(RuntimeState* state, const vectorized::Block* block, uint64_t cardinality) {
        if (!_skip_runtime_filters_process) {
            RETURN_IF_ERROR(_init_filters(state, cardinality));

            // With a zero cardinality the insert step is skipped entirely.
            if (cardinality > 0) {
                RETURN_IF_ERROR(_insert(block, 0));
            }

            for (const auto& producer : _producers) {
                producer->set_wrapper_state_and_ready_to_publish(
                        RuntimeFilterWrapper::State::READY);
            }

            return _publish(state);
        }
        return Status::OK();
    }
};
#include "common/compile_check_end.h"
} // namespace doris
Loading