Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion be/src/exec/operator/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ PushDownType FileScanLocalState::_should_push_down_binary_predicate(
}
}

// Decides whether a TopN runtime predicate may be pushed down into this file scan.
//
// Returns false when:
//   - the predicate does not target a slot of this scan node,
//   - the targeted slot id has no (non-null) descriptor registered on the operator, or
//   - the operator rejects column predicates on that slot
//     (FileScanOperatorX::can_push_down_column_predicate).
// Otherwise returns true.
bool FileScanLocalState::_push_down_topn(const RuntimePredicate& predicate) {
    const auto node_id = _parent->node_id();
    if (!predicate.target_is_slot(node_id)) {
        return false;
    }

    const auto& p = _parent->cast<FileScanOperatorX>();
    // target_is_slot() passed, so the first expr node is read as the slot reference.
    const auto slot_id = predicate.get_texpr(node_id).nodes[0].slot_ref.slot_id;

    // Use find() rather than operator[]: with map semantics operator[] would default-insert a
    // null entry for an unknown id, mutating the parent operator's map as a side effect.
    const auto it = p._slot_id_to_slot_desc.find(slot_id);
    const bool slot_known = it != p._slot_id_to_slot_desc.end() && it->second != nullptr;
    // A missing slot is unexpected; fail loudly in debug builds ...
    DCHECK(slot_known);
    if (!slot_known) {
        // ... and conservatively decline the push-down in release builds instead of
        // dereferencing a null descriptor.
        return false;
    }
    return p.can_push_down_column_predicate(it->second);
}

int FileScanLocalState::max_scanners_concurrency(RuntimeState* state) const {
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
Expand Down Expand Up @@ -107,7 +118,7 @@ Status FileScanLocalState::_init_scanners(std::list<ScannerSPtr>* scanners) {
std::min(ScannerScheduler::default_remote_scan_thread_num() / p.parallelism(state()),
_max_scanners);
shard_num = std::max(shard_num, 1U);
_kv_cache.reset(new ShardedKVCache(shard_num));
_kv_cache = std::make_unique<ShardedKVCache>(shard_num);
for (int i = 0; i < _max_scanners; ++i) {
std::unique_ptr<FileScanner> scanner = FileScanner::create_unique(
state(), this, p._limit, _split_source, _scanner_profile.get(), _kv_cache.get(),
Expand Down Expand Up @@ -206,4 +217,9 @@ Status FileScanOperatorX::prepare(RuntimeState* state) {
return Status::OK();
}

// Tells the scan framework whether a column predicate on `slot` may be pushed down.
// Only VARBINARY slots are refused: external readers do not fully support VARBINARY
// column predicates yet.
bool FileScanOperatorX::can_push_down_column_predicate(const SlotDescriptor* slot) const {
    const bool is_varbinary = (slot->type()->get_primitive_type() == TYPE_VARBINARY);
    return !is_varbinary;
}

} // namespace doris
11 changes: 4 additions & 7 deletions be/src/exec/operator/file_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

#pragma once

#include <stdint.h>

#include <cstdint>
#include <string>

#include "common/logging.h"
Expand Down Expand Up @@ -63,11 +62,7 @@ class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {
// Push-down policy for TopN runtime filters on a file scan.
// Always reports PushDownType::PARTIAL_ACCEPTABLE.
PushDownType _should_push_down_topn_filter() const override {
return PushDownType::PARTIAL_ACCEPTABLE;
}
bool _push_down_topn(const RuntimePredicate& predicate) override {
// For external table/ file scan, first try push down the predicate,
// and then determine whether it can be pushed down within the (parquet/orc) reader.
return true;
}
bool _push_down_topn(const RuntimePredicate& predicate) override;

PushDownType _should_push_down_bitmap_filter() const override {
return PushDownType::UNACCEPTABLE;
Expand Down Expand Up @@ -123,6 +118,8 @@ class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
return column_id_counter;
}

bool can_push_down_column_predicate(const SlotDescriptor* slot) const override;

private:
friend class FileScanLocalState;

Expand Down
9 changes: 4 additions & 5 deletions be/src/exec/operator/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ bool ScanLocalState<Derived>::_is_predicate_acting_on_slot(const VExprSPtrs& chi
if (_slot_id_to_value_range.end() == sid_to_range) {
return false;
}
if (remove_nullable((*slot_desc)->type())->get_primitive_type() == TYPE_VARBINARY) {
if (!_parent->cast<typename Derived::Parent>().can_push_down_column_predicate(*slot_desc)) {
return false;
}
*range = &(sid_to_range->second);
Expand Down Expand Up @@ -1307,11 +1307,10 @@ Status ScanOperatorX<LocalStateType>::prepare(RuntimeState* state) {
.nodes[0]
.slot_ref.slot_id];
DCHECK(s != nullptr);
if (remove_nullable(s->type())->get_primitive_type() == TYPE_VARBINARY) {
continue;
if (can_push_down_column_predicate(s)) {
auto col_name = s->col_name();
cid = get_column_id(col_name);
}
auto col_name = s->col_name();
cid = get_column_id(col_name);
}
RETURN_IF_ERROR(state->get_query_ctx()->get_runtime_predicate(id).init_target(
node_id(), _slot_id_to_slot_desc, cid));
Expand Down
8 changes: 6 additions & 2 deletions be/src/exec/operator/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,9 @@ class ScanLocalState : public ScanLocalStateBase {
: ScanLocalStateBase(state, parent) {}
~ScanLocalState() override = default;

virtual Status init(RuntimeState* state, LocalStateInfo& info) override;
Status init(RuntimeState* state, LocalStateInfo& info) override;

virtual Status open(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status close(RuntimeState* state) override;
std::string debug_string(int indentation_level) const final;
Expand Down Expand Up @@ -388,6 +388,10 @@ class ScanOperatorX : public OperatorX<LocalStateType> {

// Resolves a column name to a column id. This base implementation ignores `col_name` and
// returns -1; the function is virtual so derived operators can supply a real mapping.
[[nodiscard]] virtual int get_column_id(const std::string& col_name) const { return -1; }

// Whether a column predicate on the given slot may be pushed down by this scan operator.
// The base default accepts every slot (the argument is unused); derived operators can
// override it to veto specific slots, as FileScanOperatorX does for VARBINARY columns.
[[nodiscard]] virtual bool can_push_down_column_predicate(const SlotDescriptor*) const {
return true;
}

// Returns the value stored in _push_down_agg_type.
TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }

DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
Expand Down
Loading