Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e998ce8
[Improvement](runtime-filter) support sync join node build side's siz…
BiteTheDDDDt Apr 8, 2024
ac6cefa
[Fix](schema change) Fix schema change fault when add complex type co…
Yukang-Lian Apr 8, 2024
b47348f
[Improvement](executor)cancel query when a query is queued (#33339)
wangbo Apr 8, 2024
c00e26e
[Fix]fix insert overwrite non-partition table null pointer exception …
feiniaofeiafei Apr 8, 2024
5732226
[fix](partial-update) remove unnecessary DECHEK on IndexChannel::num_…
hust-hhb Apr 8, 2024
3a35f76
[bugfix](profile) should use backend ip:heartbeat port as key during …
yiguolei Apr 8, 2024
1684591
[chore](test) let some case suitable for legacy planner and nereids (…
morrySnow Apr 9, 2024
bcda1d8
[Bug](load) fix stream load file on hll type mv column (#33373)
BiteTheDDDDt Apr 9, 2024
1d0076e
[improve](serde) support complex type in write/read pb serde (#33124)
zhangstar333 Apr 9, 2024
35d5c9c
[fix](Nereids): add order for constraint test (#33323)
keanji-x Apr 9, 2024
6bfeb0c
[feature](debug point) add macro DBUG_RUN_CALLBACK (#33407)
yujun777 Apr 9, 2024
4b60265
[feature](expr) add type check when expr prepare (#33330)
Mryange Apr 9, 2024
d427106
Fix alter column stats without min max value deserialize failure. (#3…
Jibing-Li Apr 9, 2024
0f34cc0
[log](chore) print isBad in Replica::toString() (#33427)
zhannngchen Apr 9, 2024
0a2a035
[fix](merge-cloud) fix possible data leak when creating tables (#33416)
luwei16 Apr 9, 2024
4b1cb17
[fix](cases) Add check status timeout for backup/restore cases (#32975)
w41ter Apr 9, 2024
9d93520
[fix](mtmv)debug log for tvf (#33431)
zddr Apr 10, 2024
afdb060
[bug](mem_tracker) fix mem_tracker dcheck failed as not used correctl…
zhangstar333 Apr 10, 2024
d4b7b74
[fix](memory) Fix memory tracker destructor deadlock (#33497)
xinyiZzz Apr 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ void Daemon::memory_maintenance_thread() {
// Update and print memory stat when the memory changes by 256M.
if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) {
last_print_proc_mem = PerfCounters::get_vm_rss();
doris::MemTrackerLimiter::clean_tracker_limiter_group();
doris::MemTrackerLimiter::enable_print_log_process_usage();

// Refresh mem tracker each type counter.
Expand Down
12 changes: 6 additions & 6 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ Status ExecNode::prepare(RuntimeState* state) {

RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, projections_row_desc()));

if (has_output_row_descriptor()) {
RETURN_IF_ERROR(
vectorized::VExpr::check_expr_output_type(_projections, *_output_row_descriptor));
}

for (auto& i : _children) {
RETURN_IF_ERROR(i->prepare(state));
}
Expand Down Expand Up @@ -582,12 +587,7 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo

auto& mutable_columns = mutable_block.mutable_columns();

if (mutable_columns.size() != _projections.size()) {
return Status::InternalError(
"Logical error during processing {}, output of projections {} mismatches with "
"exec node output {}",
this->get_name(), _projections.size(), mutable_columns.size());
}
DCHECK_EQ(mutable_columns.size(), _projections.size());

for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
Expand Down
4 changes: 3 additions & 1 deletion be/src/exprs/bloom_filter_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase {

Status init_with_fixed_length() { return init_with_fixed_length(_bloom_filter_length); }

Status init_with_cardinality(const size_t build_bf_cardinality, int id = 0) {
bool get_build_bf_cardinality() const { return _build_bf_exactly; }

Status init_with_cardinality(const size_t build_bf_cardinality) {
if (_build_bf_exactly) {
// Use the same algorithm as org.apache.doris.planner.RuntimeFilter#calculateFilterSize
constexpr double fpp = 0.05;
Expand Down
457 changes: 263 additions & 194 deletions be/src/exprs/runtime_filter.cpp

Large diffs are not rendered by default.

25 changes: 21 additions & 4 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ struct SharedRuntimeFilterContext;

namespace pipeline {
class RuntimeFilterTimer;
struct CountedFinishDependency;
} // namespace pipeline

enum class RuntimeFilterType {
Expand Down Expand Up @@ -221,7 +222,7 @@ class IRuntimeFilter {
const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
bool build_bf_exactly = false, bool need_local_merge = false);

vectorized::SharedRuntimeFilterContext& get_shared_context_ref();
SharedRuntimeFilterContext& get_shared_context_ref();

// insert data to build filter
void insert_batch(vectorized::ColumnPtr column, size_t start);
Expand All @@ -230,6 +231,8 @@ class IRuntimeFilter {
// push filter to remote node or push down it to scan_node
Status publish(bool publish_local = false);

Status send_filter_size(uint64_t local_filter_size);

RuntimeFilterType type() const { return _runtime_filter_type; }

PrimitiveType column_type() const;
Expand Down Expand Up @@ -294,10 +297,13 @@ class IRuntimeFilter {
void update_filter(RuntimePredicateWrapper* filter_wrapper, int64_t merge_time,
int64_t start_apply);

void set_ignored(const std::string& msg);
void set_ignored();

bool get_ignored();

// for ut
bool is_bloomfilter();
RuntimeFilterType get_real_type();

bool need_sync_filter_size();

// async push runtimefilter to remote node
Status push_to_remote(const TNetworkAddress* addr, bool opt_remote_rf);
Expand Down Expand Up @@ -358,6 +364,14 @@ class IRuntimeFilter {
void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);
std::string formatted_state() const;

void set_synced_size(uint64_t global_size);

void set_dependency(pipeline::CountedFinishDependency* dependency);

int64_t get_synced_size() const { return _synced_size; }

bool isset_synced_size() const { return _synced_size != -1; }

protected:
// serialize _wrapper to protobuf
void to_protobuf(PInFilter* filter);
Expand Down Expand Up @@ -437,6 +451,9 @@ class IRuntimeFilter {
bool _need_local_merge = false;

std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;

int64_t _synced_size = -1;
pipeline::CountedFinishDependency* _dependency = nullptr;
};

// avoid expose RuntimePredicateWrapper
Expand Down
167 changes: 58 additions & 109 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,130 +34,77 @@ class VRuntimeFilterSlots {
public:
VRuntimeFilterSlots(
const std::vector<std::shared_ptr<vectorized::VExprContext>>& build_expr_ctxs,
const std::vector<IRuntimeFilter*>& runtime_filters, bool need_local_merge = false)
: _build_expr_context(build_expr_ctxs),
_runtime_filters(runtime_filters),
_need_local_merge(need_local_merge) {}

Status init(RuntimeState* state, int64_t hash_table_size) {
// runtime filter effect strategy
// 1. we will ignore IN filter when hash_table_size is too big
// 2. we will ignore BLOOM filter and MinMax filter when hash_table_size
// is too small and IN filter has effect
std::map<int, bool> has_in_filter;

auto ignore_local_filter = [&](int filter_id) {
auto runtime_filter_mgr = _need_local_merge ? state->global_runtime_filter_mgr()
: state->local_runtime_filter_mgr();

std::vector<IRuntimeFilter*> filters;
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, filters));
if (filters.empty()) {
throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, filter_id={}",
filter_id);
}
for (auto* filter : filters) {
filter->set_ignored("");
filter->signal();
}
return Status::OK();
};
const std::vector<IRuntimeFilter*>& runtime_filters)
: _build_expr_context(build_expr_ctxs), _runtime_filters(runtime_filters) {
for (auto* runtime_filter : _runtime_filters) {
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
}
}

auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, std::string& msg) {
runtime_filter->set_ignored(msg);
RETURN_IF_ERROR(runtime_filter->publish());
Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, bool publish_local,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'send_filter_size' can be made static [readability-convert-member-functions-to-static]

Suggested change
Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, bool publish_local,
static Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, bool publish_local,

pipeline::CountedFinishDependency* dependency) {
if (_runtime_filters.empty() || publish_local) {
return Status::OK();
};

// ordered vector: IN, IN_OR_BLOOM, others.
// so we can ignore other filter if IN Predicate exists.
auto compare_desc = [](IRuntimeFilter* d1, IRuntimeFilter* d2) {
if (d1->type() == d2->type()) {
return false;
} else if (d1->type() == RuntimeFilterType::IN_FILTER) {
return true;
} else if (d2->type() == RuntimeFilterType::IN_FILTER) {
return false;
} else if (d1->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
return true;
} else if (d2->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
return false;
} else {
return d1->type() < d2->type();
}
for (auto* runtime_filter : _runtime_filters) {
if (runtime_filter->need_sync_filter_size()) {
runtime_filter->set_dependency(dependency);
}
};
std::sort(_runtime_filters.begin(), _runtime_filters.end(), compare_desc);

// do not create 'in filter' when hash_table size over limit
const auto max_in_num = state->runtime_filter_max_in_num();
const bool over_max_in_num = (hash_table_size >= max_in_num);
}

// send_filter_size may call dependency->sub(), so we call set_dependency firstly for all rf to avoid dependency set_ready repeatedly
for (auto* runtime_filter : _runtime_filters) {
if (runtime_filter->expr_order() < 0 ||
runtime_filter->expr_order() >= _build_expr_context.size()) {
return Status::InternalError(
"runtime_filter meet invalid expr_order, expr_order={}, "
"_build_expr_context.size={}",
runtime_filter->expr_order(), _build_expr_context.size());
if (runtime_filter->need_sync_filter_size()) {
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
}
}
return Status::OK();
}

bool is_in_filter = (runtime_filter->type() == RuntimeFilterType::IN_FILTER);
// use synced size when this rf has global merged
static uint64_t get_real_size(IRuntimeFilter* runtime_filter, uint64_t hash_table_size) {
return runtime_filter->isset_synced_size() ? runtime_filter->get_synced_size()
: hash_table_size;
}

if (over_max_in_num &&
runtime_filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
RETURN_IF_ERROR(runtime_filter->change_to_bloom_filter());
Status ignore_filters(RuntimeState* state) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'ignore_filters' can be made static [readability-convert-member-functions-to-static]

Suggested change
Status ignore_filters(RuntimeState* state) {
static Status ignore_filters(RuntimeState* state) {

// process ignore duplicate IN_FILTER
std::unordered_set<int> has_in_filter;
for (auto* filter : _runtime_filters) {
if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) {
continue;
}

if (runtime_filter->is_bloomfilter()) {
RETURN_IF_ERROR(runtime_filter->init_bloom_filter(hash_table_size));
if (has_in_filter.contains(filter->expr_order())) {
filter->set_ignored();
continue;
}
has_in_filter.insert(filter->expr_order());
}

// Note:
// In the case that exist *remote target* and in filter and other filter,
// we must merge other filter whatever in filter is over the max num in current node,
// because:
// case 1: (in filter >= max num) in current node, so in filter will be ignored,
// and then other filter can be used
// case 2: (in filter < max num) in current node, we don't know whether the in filter
// will be ignored in merge node, so we must transfer other filter to merge node
if (!runtime_filter->has_remote_target()) {
bool exists_in_filter = has_in_filter[runtime_filter->expr_order()];
if (is_in_filter && over_max_in_num) {
VLOG_DEBUG << "fragment instance " << print_id(state->fragment_instance_id())
<< " ignore runtime filter(in filter id "
<< runtime_filter->filter_id() << ") because: in_num("
<< hash_table_size << ") >= max_in_num(" << max_in_num << ")";
RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id()));
continue;
} else if (!is_in_filter && exists_in_filter) {
// do not create 'bloom filter' and 'minmax filter' when 'in filter' has created
// because in filter is exactly filter, so it is enough to filter data
VLOG_DEBUG << "fragment instance " << print_id(state->fragment_instance_id())
<< " ignore runtime filter("
<< IRuntimeFilter::to_string(runtime_filter->type()) << " id "
<< runtime_filter->filter_id()
<< ") because: already exists in filter";
RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id()));
continue;
}
} else if (is_in_filter && over_max_in_num) {
std::string msg = fmt::format(
"fragment instance {} ignore runtime filter(in filter id {}) because: "
"in_num({}) >= max_in_num({})",
print_id(state->fragment_instance_id()), runtime_filter->filter_id(),
hash_table_size, max_in_num);
RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, msg));
// process ignore filter when it has IN_FILTER on same expr, and init bloom filter size
for (auto* filter : _runtime_filters) {
if (filter->get_real_type() == RuntimeFilterType::IN_FILTER ||
!has_in_filter.contains(filter->expr_order())) {
continue;
}
filter->set_ignored();
}
return Status::OK();
}

if ((runtime_filter->type() == RuntimeFilterType::IN_FILTER) ||
(runtime_filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
!over_max_in_num)) {
has_in_filter[runtime_filter->expr_order()] = true;
Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'init_filters' can be made static [readability-convert-member-functions-to-static]

Suggested change
Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
static Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {

// process IN_OR_BLOOM_FILTER's real type
for (auto* filter : _runtime_filters) {
if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
get_real_size(filter, local_hash_table_size) > state->runtime_filter_max_in_num()) {
RETURN_IF_ERROR(filter->change_to_bloom_filter());
}
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
}

if (filter->get_real_type() == RuntimeFilterType::BLOOM_FILTER) {
RETURN_IF_ERROR(
filter->init_bloom_filter(get_real_size(filter, local_hash_table_size)));
}
}
return Status::OK();
}

Expand All @@ -171,6 +118,9 @@ class VRuntimeFilterSlots {
int result_column_id = _build_expr_context[i]->get_last_result_column_id();
const auto& column = block->get_by_position(result_column_id).column;
for (auto* filter : iter->second) {
if (filter->get_ignored()) {
continue;
}
filter->insert_batch(column, 1);
}
}
Expand Down Expand Up @@ -213,7 +163,6 @@ class VRuntimeFilterSlots {
private:
const std::vector<std::shared_ptr<vectorized::VExprContext>>& _build_expr_context;
std::vector<IRuntimeFilter*> _runtime_filters;
const bool _need_local_merge = false;
// prob_contition index -> [IRuntimeFilter]
std::map<int, std::list<IRuntimeFilter*>> _runtime_filters_map;
};
Expand Down
1 change: 1 addition & 0 deletions be/src/http/action/checksum_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ int64_t ChecksumAction::do_checksum(int64_t tablet_id, int64_t version, int32_t
Status res = Status::OK();
uint32_t checksum;
EngineChecksumTask engine_task(tablet_id, schema_hash, version, &checksum);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
res = engine_task.execute();
if (!res.ok()) {
LOG(WARNING) << "checksum failed. status: " << res << ", signature: " << tablet_id;
Expand Down
Loading