From bf16d69b9133ff4dfab8e785f343068298c94193 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sat, 6 Jun 2026 16:52:54 +0800 Subject: [PATCH] fix tvf read --- .../data_type_datetimev2_serde.cpp | 6 +- .../data_type_datev2_serde.cpp | 6 +- .../data_type_decimal_serde.cpp | 22 ++ .../data_type_nullable_serde.cpp | 1 - .../data_type_number_serde.cpp | 15 +- .../data_type_string_serde.cpp | 6 +- .../data_type_serde/data_type_time_serde.cpp | 6 +- .../data_type_serde/decoded_column_view.h | 18 ++ be/src/exec/scan/file_scanner_v2.cpp | 11 +- be/src/exprs/vexpr.cpp | 9 + be/src/exprs/vexpr.h | 1 + be/src/format/reader/column_mapper.cpp | 258 +++++++++++++----- be/src/format/reader/column_mapper.h | 22 +- be/src/format/reader/schema_projection.cpp | 1 + be/src/format/reader/table/hive_reader.h | 2 +- be/src/format/reader/table/paimon_reader.cpp | 125 +++++++++ be/src/format/reader/table/paimon_reader.h | 7 + be/src/format/reader/table_reader.cpp | 26 +- be/src/format/reader/table_reader.h | 19 +- be/src/format/table/iceberg_reader_v2.h | 17 +- .../data_type_serde_decoded_values_test.cpp | 40 ++- be/test/format/reader/expr/cast_test.cpp | 101 +++++-- 22 files changed, 597 insertions(+), 122 deletions(-) diff --git a/be/src/core/data_type_serde/data_type_datetimev2_serde.cpp b/be/src/core/data_type_serde/data_type_datetimev2_serde.cpp index ce0599080c6b2a..2012830f1ac340 100644 --- a/be/src/core/data_type_serde/data_type_datetimev2_serde.cpp +++ b/be/src/core/data_type_serde/data_type_datetimev2_serde.cpp @@ -457,7 +457,7 @@ Status DataTypeDateTimeV2SerDe::read_column_from_decoded_values( if (view.value_kind != DecodedValueKind::INT64) { return Status::NotSupported("DATETIMEV2 decoded reader expects INT64 source"); } - if (view.values == nullptr && view.row_count > 0) { + if (view.values == nullptr && decoded_column_view_has_non_null_value(view)) { return Status::Corruption("Decoded value buffer is null for {}", column.get_name()); } auto& data = assert_cast(column).get_data(); @@ -465,6 +465,10 @@ Status DataTypeDateTimeV2SerDe::read_column_from_decoded_values( static const cctz::time_zone utc_time_zone = cctz::utc_time_zone(); const int64_t second_mask = view.time_unit == DecodedTimeUnit::MILLIS ? 1000 : 1000000; for (int64_t row = 0; row < view.row_count; ++row) { + if (decoded_column_view_row_is_null(view, row)) { + data.push_back(DateV2Value()); + continue; + } int64_t epoch_seconds = values[row] / second_mask; int64_t sub_second = values[row] % second_mask; if (sub_second < 0) { diff --git a/be/src/core/data_type_serde/data_type_datev2_serde.cpp b/be/src/core/data_type_serde/data_type_datev2_serde.cpp index 94b86312d61d3a..f4e6d503d331f5 100644 --- a/be/src/core/data_type_serde/data_type_datev2_serde.cpp +++ b/be/src/core/data_type_serde/data_type_datev2_serde.cpp @@ -131,12 +131,16 @@ Status DataTypeDateV2SerDe::read_column_from_decoded_values(IColumn& column, if (view.value_kind != DecodedValueKind::INT32) { return Status::NotSupported("DATEV2 decoded reader expects INT32 source"); } - if (view.values == nullptr && view.row_count > 0) { + if (view.values == nullptr && decoded_column_view_has_non_null_value(view)) { return Status::Corruption("Decoded value buffer is null for {}", column.get_name()); } auto& data = assert_cast(column).get_data(); const auto* values = reinterpret_cast(view.values); for (int64_t row = 0; row < view.row_count; ++row) { + if (decoded_column_view_row_is_null(view, row)) { + data.push_back(DateV2Value()); + continue; + } DateV2Value date_v2; date_v2.get_date_from_daynr(values[row] + date_threshold); data.push_back(date_v2); diff --git a/be/src/core/data_type_serde/data_type_decimal_serde.cpp b/be/src/core/data_type_serde/data_type_decimal_serde.cpp index 9fa2e0c6ebd9cc..5beef40e64d775 100644 --- a/be/src/core/data_type_serde/data_type_decimal_serde.cpp +++ b/be/src/core/data_type_serde/data_type_decimal_serde.cpp @@ -83,8 +83,30 @@ typename PrimitiveTypeTraits::CppType read_decimal_decoded_value(const Decode template Status read_decimal_decoded_values(IColumn& column, const DecodedColumnView& view) { + if (view.value_kind == DecodedValueKind::INT32 || view.value_kind == DecodedValueKind::INT64) { + if (view.values == nullptr && decoded_column_view_has_non_null_value(view)) { + return Status::Corruption("Decoded value buffer is null for {}", column.get_name()); + } + } else if (view.binary_values == nullptr && decoded_column_view_has_non_null_value(view)) { + return Status::Corruption("Decoded binary values are null for {}", column.get_name()); + } auto& data = assert_cast&>(column).get_data(); for (int64_t row = 0; row < view.row_count; ++row) { + if (decoded_column_view_row_is_null(view, row)) { + data.push_back(typename PrimitiveTypeTraits::CppType()); + continue; + } + if (view.value_kind == DecodedValueKind::BINARY || + view.value_kind == DecodedValueKind::FIXED_BINARY) { + const auto& value = (*view.binary_values)[row]; + const auto length = view.value_kind == DecodedValueKind::FIXED_BINARY + ? view.fixed_length + : cast_set(value.size); + if (value.data == nullptr && length > 0) { + return Status::Corruption("Decoded decimal binary value is null for {} at row {}", + column.get_name(), row); + } + } data.push_back(read_decimal_decoded_value(view, row)); } return Status::OK(); diff --git a/be/src/core/data_type_serde/data_type_nullable_serde.cpp b/be/src/core/data_type_serde/data_type_nullable_serde.cpp index b02c8606332b92..4cdce029ef84fd 100644 --- a/be/src/core/data_type_serde/data_type_nullable_serde.cpp +++ b/be/src/core/data_type_serde/data_type_nullable_serde.cpp @@ -363,7 +363,6 @@ Status DataTypeNullableSerDe::read_column_from_decoded_values(IColumn& column, memcpy(dst, view.null_map, view.row_count); } DecodedColumnView nested_view = view; - nested_view.null_map = nullptr; return nested_serde->read_column_from_decoded_values(nullable_column.get_nested_column(), nested_view); } diff --git a/be/src/core/data_type_serde/data_type_number_serde.cpp b/be/src/core/data_type_serde/data_type_number_serde.cpp index a38b574be37e40..442b99a6d4b702 100644 --- a/be/src/core/data_type_serde/data_type_number_serde.cpp +++ b/be/src/core/data_type_serde/data_type_number_serde.cpp @@ -53,7 +53,7 @@ const NativeType* decoded_values_as(const DecodedColumnView& view) { template Status read_number_decoded_values(IColumn& column, const DecodedColumnView& view) { - if (view.values == nullptr && view.row_count > 0) { + if (view.values == nullptr && decoded_column_view_has_non_null_value(view)) { return Status::Corruption("Decoded value buffer is null for {}", column.get_name()); } auto& data = @@ -61,6 +61,10 @@ Status read_number_decoded_values(IColumn& column, const DecodedColumnView& view const auto* values = decoded_values_as(view); for (int64_t row = 0; row < view.row_count; ++row) { using DorisCppType = typename PrimitiveTypeTraits::CppType; + if (decoded_column_view_row_is_null(view, row)) { + data.push_back(DorisCppType()); + continue; + } data.push_back(static_cast(values[row])); } return Status::OK(); @@ -188,13 +192,14 @@ Status DataTypeNumberSerDe::read_column_from_decoded_values( if (view.value_kind == DecodedValueKind::BOOL) { return read_number_decoded_values(column, view); } - } else if constexpr (T == TYPE_INT) { + } else if constexpr (T == TYPE_TINYINT || T == TYPE_SMALLINT || T == TYPE_INT) { if (view.value_kind == DecodedValueKind::INT32) { - return read_number_decoded_values(column, view); + return read_number_decoded_values(column, view); } - } else if constexpr (T == TYPE_BIGINT) { + } else if constexpr (T == TYPE_TINYINT || T == TYPE_SMALLINT || T == TYPE_INT || + T == TYPE_BIGINT) { if (view.value_kind == DecodedValueKind::INT64) { - return read_number_decoded_values(column, view); + return read_number_decoded_values(column, view); } } else if constexpr (T == TYPE_FLOAT) { if (view.value_kind == DecodedValueKind::FLOAT) { diff --git a/be/src/core/data_type_serde/data_type_string_serde.cpp b/be/src/core/data_type_serde/data_type_string_serde.cpp index 2715f74ded989a..814360ecdd7d55 100644 --- a/be/src/core/data_type_serde/data_type_string_serde.cpp +++ b/be/src/core/data_type_serde/data_type_string_serde.cpp @@ -29,11 +29,15 @@ namespace { template Status read_string_decoded_values(IColumn& column, const DecodedColumnView& view) { - if (view.binary_values == nullptr && view.row_count > 0) { + if (view.binary_values == nullptr && decoded_column_view_has_non_null_value(view)) { return Status::Corruption("Decoded binary values are null for {}", column.get_name()); } auto& string_column = assert_cast(column); for (int64_t row = 0; row < view.row_count; ++row) { + if (decoded_column_view_row_is_null(view, row)) { + string_column.insert_default(); + continue; + } const auto& value = (*view.binary_values)[row]; string_column.insert_data(value.data, value.size); } diff --git a/be/src/core/data_type_serde/data_type_time_serde.cpp b/be/src/core/data_type_serde/data_type_time_serde.cpp index a40a8d217c9bd4..de59373f7c5347 100644 --- a/be/src/core/data_type_serde/data_type_time_serde.cpp +++ b/be/src/core/data_type_serde/data_type_time_serde.cpp @@ -177,11 +177,15 @@ Status DataTypeTimeV2SerDe::read_column_from_decoded_values(IColumn& column, if (view.value_kind != DecodedValueKind::INT32 && view.value_kind != DecodedValueKind::INT64) { return Status::NotSupported("TIMEV2 decoded reader expects INT32 or INT64 source"); } - if (view.values == nullptr && view.row_count > 0) { + if (view.values == nullptr && decoded_column_view_has_non_null_value(view)) { return Status::Corruption("Decoded value buffer is null for {}", column.get_name()); } auto& data = assert_cast(column).get_data(); for (int64_t row = 0; row < view.row_count; ++row) { + if (decoded_column_view_row_is_null(view, row)) { + data.push_back(TimeValue::TimeType()); + continue; + } data.push_back(read_time_decoded_value(view, row)); } return Status::OK(); diff --git a/be/src/core/data_type_serde/decoded_column_view.h b/be/src/core/data_type_serde/decoded_column_view.h index 9b0b14b17c777d..c0eb42cfe6d114 100644 --- a/be/src/core/data_type_serde/decoded_column_view.h +++ b/be/src/core/data_type_serde/decoded_column_view.h @@ -59,4 +59,22 @@ struct DecodedColumnView { const std::vector* binary_values = nullptr; }; +inline bool decoded_column_view_row_is_null(const DecodedColumnView& view, int64_t row) { + return view.null_map != nullptr && view.null_map[row] != 0; +} + +inline bool decoded_column_view_has_non_null_value(const DecodedColumnView& view) { + if (view.null_map == nullptr) { + return view.row_count > 0; + } + + // TODO(gabriel): optimize null map check with SIMD or bitset if needed. + for (int64_t row = 0; row < view.row_count; ++row) { + if (view.null_map[row] == 0) { + return true; + } + } + return false; +} + } // namespace doris diff --git a/be/src/exec/scan/file_scanner_v2.cpp b/be/src/exec/scan/file_scanner_v2.cpp index e1b64f44e71e93..822d5c8969a70d 100644 --- a/be/src/exec/scan/file_scanner_v2.cpp +++ b/be/src/exec/scan/file_scanner_v2.cpp @@ -50,6 +50,7 @@ #include "exprs/vexpr_context.h" #include "exprs/vslot_ref.h" #include "format/format_common.h" +#include "format/reader/column_mapper.h" #include "format/reader/expr/slot_ref.h" #include "format/reader/table/hive_reader.h" #include "format/reader/table/paimon_reader.h" @@ -544,12 +545,14 @@ Status rewrite_slot_refs_to_global_index( *expr = TableSlotRef::create_shared(cast_set(global_index.value()), cast_set(global_index.value()), -1, slot_ref->data_type(), slot_ref->column_name()); + RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr)); return Status::OK(); } const auto global_index = global_index_it->second; *expr = TableSlotRef::create_shared(cast_set(global_index.value()), cast_set(global_index.value()), -1, slot_ref->data_type(), slot_ref->column_name()); + RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr)); return Status::OK(); } auto children = (*expr)->children(); @@ -876,13 +879,11 @@ Status FileScannerV2::_build_table_conjuncts(VExprContextSPtrs* conjuncts) const conjuncts->clear(); conjuncts->reserve(_conjuncts.size()); for (const auto& conjunct : _conjuncts) { - VExprContextSPtr table_conjunct; - RETURN_IF_ERROR(conjunct->clone(_state, table_conjunct)); - auto root = table_conjunct->root(); + VExprSPtr root; + RETURN_IF_ERROR(reader::clone_table_expr_tree(conjunct->root(), &root)); RETURN_IF_ERROR( rewrite_slot_refs_to_global_index(&root, _column_unique_id_to_global_index)); - table_conjunct->set_root(root); - conjuncts->push_back(std::move(table_conjunct)); + conjuncts->push_back(VExprContext::create_shared(std::move(root))); } return Status::OK(); } diff --git a/be/src/exprs/vexpr.cpp b/be/src/exprs/vexpr.cpp index a61baaa908d0a6..9617f5c7c59fef 100644 --- a/be/src/exprs/vexpr.cpp +++ b/be/src/exprs/vexpr.cpp @@ -397,6 +397,15 @@ Status VExpr::open(RuntimeState* state, VExprContext* context, return Status::OK(); } +void VExpr::reset_prepare_state() { + _prepared = false; + _prepare_finished = false; + _open_finished = false; + for (auto& child : _children) { + child->reset_prepare_state(); + } +} + void VExpr::close(VExprContext* context, FunctionContext::FunctionStateScope scope) { for (auto& i : _children) { i->close(context, scope); diff --git a/be/src/exprs/vexpr.h b/be/src/exprs/vexpr.h index 8b7246e71a538a..dec871f601651b 100644 --- a/be/src/exprs/vexpr.h +++ b/be/src/exprs/vexpr.h @@ -262,6 +262,7 @@ class VExpr { virtual const VExprSPtrs& children() const { return _children; } void set_children(const VExprSPtrs& children) { _children = children; } void set_children(VExprSPtrs&& children) { _children = std::move(children); } + void reset_prepare_state(); virtual std::string debug_string() const; static std::string debug_string(const VExprSPtrs& exprs); static std::string debug_string(const VExprContextSPtrs& ctxs); diff --git a/be/src/format/reader/column_mapper.cpp b/be/src/format/reader/column_mapper.cpp index 8d1a18db344499..85deabd3246c00 100644 --- a/be/src/format/reader/column_mapper.cpp +++ b/be/src/format/reader/column_mapper.cpp @@ -32,7 +32,11 @@ #include "core/data_type/data_type_nullable.h" #include "core/data_type/data_type_struct.h" #include "exprs/create_predicate_function.h" +#include "exprs/vcompound_pred.h" +#include "exprs/vdirect_in_predicate.h" +#include "exprs/vexpr_context.h" #include "exprs/vin_predicate.h" +#include "exprs/vectorized_fn_call.h" #include "format/reader/expr/cast.h" #include "format/reader/expr/literal.h" #include "format/reader/expr/slot_ref.h" @@ -154,6 +158,30 @@ std::string data_type_debug_string(const DataTypePtr& type) { return type == nullptr ? "null" : type->get_name(); } +std::string field_debug_string(const Field& field) { + std::ostringstream out; + out << "Field{type=" << type_to_string(field.get_type()) << ", value="; + switch (field.get_type()) { + case TYPE_NULL: + out << "null"; + break; + case TYPE_INT: + out << field.get(); + break; + case TYPE_BIGINT: + out << field.get(); + break; + case TYPE_STRING: + out << field.get(); + break; + default: + out << field.to_debug_string(0); + break; + } + out << "}"; + return out.str(); +} + template std::string join_debug_strings(const std::vector& values, Formatter formatter) { std::ostringstream out; @@ -190,6 +218,27 @@ struct FileSlotRewriteInfo { std::string file_column_name; }; +struct RewriteContext { + RuntimeState* runtime_state = nullptr; + std::vector created_exprs {}; + + void add_created_expr(VExprSPtr expr) { created_exprs.push_back(std::move(expr)); } + + Status prepare_created_exprs(VExprContext* context) const { + DORIS_CHECK(context != nullptr); + RowDescriptor row_desc; + for (const auto& expr : created_exprs) { + if (dynamic_cast(expr.get()) != nullptr && runtime_state == nullptr) { + return Status::InvalidArgument( + "RuntimeState is required to prepare rewritten cast expression {}", + expr->expr_name()); + } + RETURN_IF_ERROR(expr->prepare(runtime_state, row_desc, context)); + } + return Status::OK(); + } +}; + struct StructChildSelector { bool by_name = true; std::string name; @@ -201,12 +250,8 @@ struct NestedStructPath { std::vector selectors; }; -// A split-local literal produced by slot-literal predicate localization. -// -// TableColumnMapper currently rewrites VExpr trees in-place because VExpr has no generic deep -// clone API. The same table-level conjunct can therefore be localized repeatedly for different -// splits. This wrapper keeps the original table literal so the next split can restore table -// semantics before trying its own file-type literal rewrite. +// A split-local literal produced by slot-literal predicate localization. This wrapper keeps the +// original table literal so a cloned conjunct can be localized again for another split. class SplitLocalFileLiteral final : public TableLiteral { public: SplitLocalFileLiteral(const DataTypePtr& file_type, const Field& file_field, @@ -214,7 +259,6 @@ class SplitLocalFileLiteral final : public TableLiteral { : TableLiteral(file_type, file_field), _original_type(std::move(original_type)), _original_field(std::move(original_field)) {} - const DataTypePtr& original_type() const { return _original_type; } const Field& original_field() const { return _original_field; } @@ -224,10 +268,13 @@ class SplitLocalFileLiteral final : public TableLiteral { }; static VExprSPtr create_file_slot_ref(const VSlotRef& slot_ref, - const FileSlotRewriteInfo& rewrite_info) { - return TableSlotRef::create_shared(slot_ref.slot_id(), - cast_set(rewrite_info.block_position), -1, - rewrite_info.file_type, rewrite_info.file_column_name); + const FileSlotRewriteInfo& rewrite_info, + RewriteContext* rewrite_context) { + auto ref = TableSlotRef::create_shared(slot_ref.slot_id(), + cast_set(rewrite_info.block_position), -1, + rewrite_info.file_type, rewrite_info.file_column_name); + rewrite_context->add_created_expr(ref); + return ref; } static bool is_cast_expr(const VExprSPtr& expr) { @@ -263,8 +310,8 @@ std::string TableColumnMapperOptions::debug_string() const { std::string TableColumnMapper::debug_string(const ColumnDefinition& column) { std::ostringstream out; - out << "ColumnDefinition{name=" << column.name - << ", identifier_type=" << static_cast(column.identifier.get_type()) + out << "ColumnDefinition{name=" << column.name << ", identifier=" + << field_debug_string(column.identifier) << ", local_id=" << column.local_id << ", type=" << data_type_debug_string(column.type) << ", children=" << join_debug_strings(column.children, @@ -463,15 +510,73 @@ static Field literal_field(const VExprSPtr& literal_expr) { return field; } -static VExprSPtr original_table_literal(const VExprSPtr& literal_expr) { +Status clone_table_expr_tree(const VExprSPtr& expr, VExprSPtr* cloned_expr) { + DORIS_CHECK(cloned_expr != nullptr); + if (expr == nullptr) { + *cloned_expr = nullptr; + return Status::OK(); + } + + VExprSPtr cloned; + if (const auto* table_slot_ref = dynamic_cast(expr.get())) { + cloned = TableSlotRef::create_shared( + table_slot_ref->slot_id(), table_slot_ref->column_id(), + table_slot_ref->column_uniq_id(), table_slot_ref->data_type(), + table_slot_ref->column_name()); + } else if (const auto* vslot_ref = dynamic_cast(expr.get())) { + cloned = TableSlotRef::create_shared(vslot_ref->slot_id(), vslot_ref->column_id(), + vslot_ref->column_uniq_id(), vslot_ref->data_type(), + vslot_ref->column_name()); + } else if (const auto* split_literal = dynamic_cast(expr.get())) { + cloned = std::make_shared( + split_literal->data_type(), literal_field(expr), split_literal->original_type(), + split_literal->original_field()); + } else if (dynamic_cast(expr.get()) != nullptr) { + cloned = TableLiteral::create_shared(expr->data_type(), literal_field(expr)); + } else if (expr->is_literal()) { + cloned = TableLiteral::create_shared(expr->data_type(), literal_field(expr)); + } else if (const auto* cast_expr = dynamic_cast(expr.get())) { + cloned = std::make_shared(*cast_expr); + } else if (const auto* in_pred = dynamic_cast(expr.get())) { + cloned = std::make_shared(*in_pred); + } else if (const auto* direct_in_pred = dynamic_cast(expr.get())) { + cloned = std::make_shared(*direct_in_pred); + } else if (const auto* compound_pred = dynamic_cast(expr.get())) { + cloned = std::make_shared(*compound_pred); + } else if (const auto* fn_call = dynamic_cast(expr.get())) { + cloned = std::make_shared(*fn_call); + } else { + return Status::NotSupported("Cannot clone expression {} for file-local rewrite", + expr->expr_name()); + } + + VExprSPtrs cloned_children; + cloned_children.reserve(expr->children().size()); + for (const auto& child : expr->children()) { + VExprSPtr cloned_child; + RETURN_IF_ERROR(clone_table_expr_tree(child, &cloned_child)); + cloned_children.push_back(std::move(cloned_child)); + } + cloned->set_children(std::move(cloned_children)); + cloned->reset_prepare_state(); + *cloned_expr = std::move(cloned); + return Status::OK(); +} + +static VExprSPtr original_table_literal(const VExprSPtr& literal_expr, + RewriteContext* rewrite_context = nullptr) { DORIS_CHECK(literal_expr != nullptr); DORIS_CHECK(literal_expr->is_literal()); const auto* rewritten_literal = dynamic_cast(literal_expr.get()); if (rewritten_literal == nullptr) { return literal_expr; } - return TableLiteral::create_shared(rewritten_literal->original_type(), - rewritten_literal->original_field()); + auto literal = TableLiteral::create_shared(rewritten_literal->original_type(), + rewritten_literal->original_field()); + if (rewrite_context != nullptr) { + rewrite_context->add_created_expr(literal); + } + return literal; } static bool is_struct_element_expr(const VExprSPtr& expr) { @@ -1101,10 +1206,11 @@ static Status build_projected_type_from_projection(const DataTypePtr& file_type, } static VExprSPtr rewrite_literal_to_file_type(const VExprSPtr& literal_expr, - const FileSlotRewriteInfo& rewrite_info) { + const FileSlotRewriteInfo& rewrite_info, + RewriteContext* rewrite_context) { DORIS_CHECK(literal_expr != nullptr); DORIS_CHECK(literal_expr->is_literal()); - const auto original_literal = original_table_literal(literal_expr); + const auto original_literal = original_table_literal(literal_expr, rewrite_context); const Field original_field = literal_field(original_literal); if (rewrite_info.file_type->equals(*original_literal->data_type())) { return original_literal; @@ -1119,13 +1225,16 @@ static VExprSPtr rewrite_literal_to_file_type(const VExprSPtr& literal_expr, if (file_field.is_null()) { return nullptr; } - return std::make_shared(rewrite_info.file_type, file_field, - original_literal->data_type(), original_field); + auto literal = std::make_shared( + rewrite_info.file_type, file_field, original_literal->data_type(), original_field); + rewrite_context->add_created_expr(literal); + return literal; } static bool rewrite_binary_slot_literal_predicate( const VExprSPtr& expr, - const std::map& global_to_file_slot) { + const std::map& global_to_file_slot, + RewriteContext* rewrite_context) { if (!is_binary_comparison_predicate(expr)) { return false; } @@ -1149,14 +1258,15 @@ static bool rewrite_binary_slot_literal_predicate( return false; } - auto rewritten_literal = rewrite_literal_to_file_type(literal_expr, *rewrite_info); + auto rewritten_literal = rewrite_literal_to_file_type(literal_expr, *rewrite_info, + rewrite_context); if (rewritten_literal == nullptr) { - children[literal_child_idx] = original_table_literal(literal_expr); + children[literal_child_idx] = original_table_literal(literal_expr, rewrite_context); expr->set_children(std::move(children)); return false; } - children[slot_child_idx] = create_file_slot_ref(*slot_ref, *rewrite_info); + children[slot_child_idx] = create_file_slot_ref(*slot_ref, *rewrite_info, rewrite_context); children[literal_child_idx] = std::move(rewritten_literal); expr->set_children(std::move(children)); return true; @@ -1164,7 +1274,8 @@ static bool rewrite_binary_slot_literal_predicate( static bool rewrite_in_slot_literal_predicate( const VExprSPtr& expr, - const std::map& global_to_file_slot) { + const std::map& global_to_file_slot, + RewriteContext* rewrite_context) { if (expr->node_type() != TExprNodeType::IN_PRED || expr->get_num_children() < 2) { return false; } @@ -1184,13 +1295,15 @@ static bool rewrite_in_slot_literal_predicate( if (literal_expr == nullptr) { return false; } - auto rewritten_literal = rewrite_literal_to_file_type(literal_expr, *rewrite_info); + auto rewritten_literal = rewrite_literal_to_file_type(literal_expr, *rewrite_info, + rewrite_context); if (rewritten_literal == nullptr) { for (size_t restore_idx = 1; restore_idx < children.size(); ++restore_idx) { auto restore_literal = unwrap_literal_for_file_cast(children[restore_idx], rewrite_info->table_type); if (restore_literal != nullptr) { - children[restore_idx] = original_table_literal(restore_literal); + children[restore_idx] = original_table_literal(restore_literal, + rewrite_context); } } expr->set_children(std::move(children)); @@ -1199,7 +1312,7 @@ static bool rewrite_in_slot_literal_predicate( rewritten_literals.push_back(std::move(rewritten_literal)); } - children[0] = create_file_slot_ref(*slot_ref, *rewrite_info); + children[0] = create_file_slot_ref(*slot_ref, *rewrite_info, rewrite_context); for (size_t literal_idx = 0; literal_idx < rewritten_literals.size(); ++literal_idx) { children[literal_idx + 1] = std::move(rewritten_literals[literal_idx]); } @@ -1209,14 +1322,16 @@ static bool rewrite_in_slot_literal_predicate( static VExprSPtr rewrite_table_expr_to_file_expr( const VExprSPtr& expr, - const std::map& global_to_file_slot) { + const std::map& global_to_file_slot, + RewriteContext* rewrite_context) { if (expr == nullptr) { return nullptr; } - if (rewrite_binary_slot_literal_predicate(expr, global_to_file_slot)) { + DORIS_CHECK(rewrite_context != nullptr); + if (rewrite_binary_slot_literal_predicate(expr, global_to_file_slot, rewrite_context)) { return expr; } - if (rewrite_in_slot_literal_predicate(expr, global_to_file_slot)) { + if (rewrite_in_slot_literal_predicate(expr, global_to_file_slot, rewrite_context)) { return expr; } if (is_struct_element_expr(expr)) { @@ -1229,12 +1344,14 @@ static VExprSPtr rewrite_table_expr_to_file_expr( // struct_element must see the actual file struct layout. Casting the parent struct // to the output projection can hide filter-only children such as `s.id` in // `SELECT s.name WHERE s.id > 5`. - children[0] = create_file_slot_ref(*slot_ref, rewrite_it->second); + children[0] = create_file_slot_ref(*slot_ref, rewrite_it->second, + rewrite_context); expr->set_children(std::move(children)); return expr; } } - children[0] = rewrite_table_expr_to_file_expr(children[0], global_to_file_slot); + children[0] = + rewrite_table_expr_to_file_expr(children[0], global_to_file_slot, rewrite_context); expr->set_children(std::move(children)); return expr; } @@ -1244,21 +1361,20 @@ static VExprSPtr rewrite_table_expr_to_file_expr( global_to_file_slot.find(GlobalIndex(cast_set(slot_ref->slot_id()))); if (rewrite_it != global_to_file_slot.end()) { const auto& rewrite_info = rewrite_it->second; - auto file_slot = create_file_slot_ref(*slot_ref, rewrite_info); + auto file_slot = create_file_slot_ref(*slot_ref, rewrite_info, rewrite_context); if (rewrite_info.file_type->equals(*rewrite_info.table_type)) { return file_slot; } auto cast_expr = Cast::create_shared(rewrite_info.table_type); cast_expr->add_child(std::move(file_slot)); + rewrite_context->add_created_expr(cast_expr); return cast_expr; } return expr; } - // rewrite_table_expr_to_file_expr localizes the expression tree in-place because VExpr does - // not provide a generic deep-clone API. A previous split may already have inserted Cast(slot) - // for the same table-level conjunct. Keep that rewrite idempotent: rewrite the cast child - // from table slot to the current split's file slot, and drop the cast when the current split - // no longer needs it. + // The input is a split-local cloned tree. A previous split-local clone may already have + // inserted Cast(slot). Keep that rewrite idempotent: rewrite the cast child from table slot to + // the current split's file slot, and drop the cast when the current split no longer needs it. if (is_cast_expr(expr) && expr->get_num_children() == 1) { const auto& child = expr->children()[0]; if (child->is_slot_ref()) { @@ -1267,7 +1383,8 @@ static VExprSPtr rewrite_table_expr_to_file_expr( global_to_file_slot.find(GlobalIndex(cast_set(slot_ref->slot_id()))); if (rewrite_it != global_to_file_slot.end() && expr->data_type()->equals(*rewrite_it->second.table_type)) { - auto rewritten_child = create_file_slot_ref(*slot_ref, rewrite_it->second); + auto rewritten_child = create_file_slot_ref(*slot_ref, rewrite_it->second, + rewrite_context); if (rewrite_it->second.file_type->equals(*rewrite_it->second.table_type)) { return rewritten_child; } @@ -1277,13 +1394,11 @@ static VExprSPtr rewrite_table_expr_to_file_expr( } } - // VExpr currently does not provide a generic deep-clone API for arbitrary expression types. - // Keep all slot-localization mutation inside ColumnMapper and rebuild it for every split - // before the localized expression is prepared/opened by TableReader. VExprSPtrs rewritten_children; rewritten_children.reserve(expr->children().size()); for (const auto& child : expr->children()) { - rewritten_children.push_back(rewrite_table_expr_to_file_expr(child, global_to_file_slot)); + rewritten_children.push_back( + rewrite_table_expr_to_file_expr(child, global_to_file_slot, rewrite_context)); } expr->set_children(std::move(rewritten_children)); return expr; @@ -1309,6 +1424,8 @@ static bool complex_projection_has_pruned_children(const ColumnMapping& mapping) return true; } for (const auto& child_mapping : mapping.child_mappings) { + // `child_mapping.table_column_name != child_mapping.file_column_name` means this column is renamed + // `!child_mapping.file_local_id.has_value()` means this column is miss in file if (child_mapping.table_column_name != child_mapping.file_column_name || !child_mapping.file_local_id.has_value() || complex_projection_has_pruned_children(child_mapping)) { @@ -1453,10 +1570,12 @@ static Status add_scan_column(FileScanRequest* file_request, ColumnMapping* mapp if (mapping->has_complex_projection || complex_projection_has_pruned_children(*mapping)) { if (!mapping->has_complex_projection) { RETURN_IF_ERROR(rebuild_projected_file_type(mapping)); + DCHECK(scan_columns == &file_request->predicate_columns); } RETURN_IF_ERROR(build_complex_projection(*mapping, &projection)); } if (scan_columns == &file_request->predicate_columns) { + DCHECK(filter_projections != nullptr); RETURN_IF_ERROR(merge_filter_projection(filter_projections, &projection)); } RETURN_IF_ERROR(apply_projection_to_mapping_file_type(projection, mapping)); @@ -1599,13 +1718,8 @@ static const ColumnDefinition* find_file_child_by_table_column( static const ColumnDefinition* find_file_child_for_complex_wrapper( const ColumnDefinition& table_child, const ColumnDefinition& file_field, TableColumnMappingMode mode) { - const auto primitive_type = remove_nullable(file_field.type)->get_primitive_type(); - if (primitive_type == TYPE_ARRAY || primitive_type == TYPE_MAP) { - if (file_field.children.empty()) { - return nullptr; - } - DORIS_CHECK(file_field.children.size() == 1); - return &file_field.children[0]; + if (file_field.children.empty()) { + return nullptr; } return find_file_child_by_table_column(table_child, file_field.children, mode); } @@ -1627,15 +1741,16 @@ Status TableColumnMapper::create_mapping(const std::vector& pr mapping.table_type, partition_values.at(table_column.name)))); } else if (_options.mode == TableColumnMappingMode::BY_INDEX && !table_column.is_partition_key) { + // 2. BY_INDEX mapping, use the file column at the position specified by `ColumnDefinition::identifier` as a direct mapping. This mode is only used by Hive. RETURN_IF_ERROR(_create_by_index_mapping(table_column, file_schema, &mapping)); } else if (const auto* file_field = _find_file_field(table_column, file_schema)) { - // 2. Table column has a matching file column, use it as a direct mapping. + // 3. Table column has a matching file column, use it as a direct mapping. RETURN_IF_ERROR(_create_direct_mapping(table_column, *file_field, &mapping)); } else if (table_column.default_expr != nullptr) { - // 3. Table column does not exist in file (column adding by schema evolution), which has a default expression, use it as a constant mapping. + // 4. Table column does not exist in file (column adding by schema evolution), which has a default expression, use it as a constant mapping. _set_constant_mapping(&mapping, table_column.default_expr); } else if (table_column.name == ROW_LINEAGE_ROW_ID) { - // 4. Virtual column, use special mapping to indicate it should be materialized by table reader instead of read from file or evaluated from expression. + // 5. Virtual column, use special mapping to indicate it should be materialized by table reader instead of read from file or evaluated from expression. mapping.virtual_column_type = TableVirtualColumnType::ROW_ID; } else if (table_column.name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) { mapping.virtual_column_type = TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER; @@ -1755,7 +1870,8 @@ Status TableColumnMapper::_build_result_column_mapping(const FileScanRequest& fi Status TableColumnMapper::create_scan_request( const std::vector& table_filters, const TableColumnPredicates& table_column_predicates, - const std::vector& projected_columns, FileScanRequest* file_request) { + const std::vector& projected_columns, FileScanRequest* file_request, + RuntimeState* runtime_state) { // FileReader evaluates expressions against a file-local block. This mapper owns the // table-column to file-column conversion, so it also owns the file-local block positions. file_request->predicate_columns.clear(); @@ -1790,7 +1906,8 @@ Status TableColumnMapper::create_scan_request( } } // 2. Build referenced predicate columns - RETURN_IF_ERROR(localize_filters(table_filters, table_column_predicates, file_request)); + RETURN_IF_ERROR( + localize_filters(table_filters, table_column_predicates, file_request, runtime_state)); // 3. Re-build projections for all referenced file columns to point to the correct file-local block positions. for (auto& mapping : _mappings) { if (!mapping.file_local_id.has_value()) { @@ -1826,7 +1943,8 @@ const ColumnMapping* TableColumnMapper::_find_mapping(GlobalIndex global_index) Status TableColumnMapper::localize_filters(const std::vector& table_filters, const TableColumnPredicates& table_column_predicates, - FileScanRequest* file_request) { + FileScanRequest* file_request, + RuntimeState* runtime_state) { FilterProjectionMap filter_projections; RETURN_IF_ERROR(build_filter_projection_map(table_filters, &_mappings, &filter_projections)); for (const auto& table_filter : table_filters) { @@ -1848,10 +1966,19 @@ Status TableColumnMapper::localize_filters(const std::vector& table for (const auto& table_filter : table_filters) { if (table_filter.conjunct != nullptr && table_filter_has_only_local_entries(table_filter, _filter_entries)) { - file_request->conjuncts.push_back( - VExprContext::create_shared(rewrite_table_expr_to_file_expr( - table_filter.conjunct->root(), global_to_file_slot))); - table_filter.conjunct->clone_fn_contexts(file_request->conjuncts.back().get()); + RewriteContext rewrite_context {.runtime_state = runtime_state}; + VExprSPtr rewrite_root; + const auto clone_status = + clone_table_expr_tree(table_filter.conjunct->root(), &rewrite_root); + if (!clone_status.ok()) { + continue; + } + auto localized_root = + rewrite_table_expr_to_file_expr(rewrite_root, global_to_file_slot, + &rewrite_context); + auto localized_conjunct = VExprContext::create_shared(std::move(localized_root)); + RETURN_IF_ERROR(rewrite_context.prepare_created_exprs(localized_conjunct.get())); + file_request->conjuncts.push_back(std::move(localized_conjunct)); } } for (const auto& [global_index, predicates] : table_column_predicates) { @@ -1899,7 +2026,7 @@ Status TableColumnMapper::_create_direct_mapping(const ColumnDefinition& table_c mapping->original_file_type = file_field.type; mapping->original_file_children = file_field.children; mapping->file_type = file_field.type; - mapping->is_trivial = _is_same_type(mapping->table_type, mapping->file_type); + mapping->is_trivial = mapping->table_type->equals(*mapping->file_type); mapping->filter_conversion = mapping->is_trivial ? FilterConversionType::COPY_DIRECTLY : FilterConversionType::CAST_FILTER; mapping->child_mappings.clear(); @@ -1933,10 +2060,13 @@ Status TableColumnMapper::_create_direct_mapping(const ColumnDefinition& table_c } if (complex_projection_has_pruned_children(*mapping)) { mapping->has_complex_projection = true; + // If complex projection prunes some children, we have to rebuild the projected file type to make sure the reader expression can find the correct child types by name. RETURN_IF_ERROR(build_projected_child_type(mapping->file_type, mapping->child_mappings, &mapping->file_type)); - mapping->is_trivial = mapping->table_type != nullptr && - mapping->table_type->equals(*mapping->file_type); + DCHECK(!complex_projection_has_pruned_children(*mapping)); + DCHECK(mapping->table_type != nullptr); + mapping->is_trivial = mapping->table_type->equals(*mapping->file_type); + // TODO: ? READER_EXPRESSION mapping->filter_conversion = mapping->is_trivial ? FilterConversionType::COPY_DIRECTLY : FilterConversionType::READER_EXPRESSION; diff --git a/be/src/format/reader/column_mapper.h b/be/src/format/reader/column_mapper.h index 101d78db2f58bf..7378017927054a 100644 --- a/be/src/format/reader/column_mapper.h +++ b/be/src/format/reader/column_mapper.h @@ -34,6 +34,7 @@ namespace doris { class ColumnPredicate; +class RuntimeState; } // namespace doris namespace doris::reader { @@ -64,10 +65,10 @@ enum TableVirtualColumnType { }; enum class FilterConversionType { - COPY_DIRECTLY, - CAST_FILTER, + COPY_DIRECTLY, // filter can be copied directly from file layer without any change, e.g. column type and table type are the same and no complex nested projection is involved. + CAST_FILTER, // filter can be converted from file layer by adding a cast, e.g. column type is nullable but table type is not, or file column has a trivial nested projection but table column has a complex nested projection. READER_EXPRESSION, - FINALIZE_ONLY, + FINALIZE_ONLY, // filter cannot be converted to file layer and should be evaluated at table reader finalize phase, e.g. a child column of a nested column is null in file schema. CONSTANT, }; @@ -157,6 +158,8 @@ struct TableColumnMapperOptions { std::string debug_string() const; }; +Status clone_table_expr_tree(const VExprSPtr& expr, VExprSPtr* cloned_expr); + // 通用 table schema 到 file schema 映射层。 // Iceberg 会使用 BY_FIELD_ID;普通 by-name 场景可以复用该组件,但不应把它命名成 // Iceberg-only 组件。 @@ -179,14 +182,16 @@ class TableColumnMapper { virtual Status create_scan_request(const std::vector& table_filters, const TableColumnPredicates& table_column_predicates, const std::vector& projected_columns, - FileScanRequest* file_request); + FileScanRequest* file_request, + RuntimeState* runtime_state = nullptr); // 将 table-level filter 定位到文件 schema。 // trivial mapping 可以直接复制结构化谓词;类型变化时可以尝试安全 cast;无法安全 // 下推的表达式应通过 reader_expression_map 或 table-level finalize/filter fallback 处理。 virtual Status localize_filters(const std::vector& table_filters, const TableColumnPredicates& table_column_predicates, - FileScanRequest* file_request); + FileScanRequest* file_request, + RuntimeState* runtime_state = nullptr); void clear() { _mappings.clear(); _constant_map.clear(); @@ -226,13 +231,8 @@ class TableColumnMapper { ColumnMapping* _find_mapping(GlobalIndex global_index); const ColumnMapping* _find_mapping(GlobalIndex global_index) const; - bool _is_same_type(const DataTypePtr& table_type, const DataTypePtr& file_type) const { - DORIS_CHECK(table_type != nullptr); - DORIS_CHECK(file_type != nullptr); - return table_type->equals(*file_type); - } - TableColumnMapperOptions _options; + // Column mapping for each projected column, in the same order as projected_columns. Each entry describes how to get one table/global column from file-local sources, and carries metadata for filter localization and result finalize. std::vector _mappings; std::map _filter_entries; std::map _column_map_results; diff --git a/be/src/format/reader/schema_projection.cpp b/be/src/format/reader/schema_projection.cpp index aff99d7d1d3be3..7523c731e7ee88 100644 --- a/be/src/format/reader/schema_projection.cpp +++ b/be/src/format/reader/schema_projection.cpp @@ -48,6 +48,7 @@ Status rebuild_projected_type(const DataTypePtr& original_type, nested_projected_type = std::make_shared(child_types[0]); break; case TYPE_MAP: { + // TODO: ? DORIS_CHECK(child_types.size() == 1); DORIS_CHECK(remove_nullable(child_types[0])->get_primitive_type() == TYPE_STRUCT); DORIS_CHECK(remove_nullable(original_type)->get_primitive_type() == TYPE_MAP); diff --git a/be/src/format/reader/table/hive_reader.h b/be/src/format/reader/table/hive_reader.h index 477fc1c01d640f..098951107afbcb 100644 --- a/be/src/format/reader/table/hive_reader.h +++ b/be/src/format/reader/table/hive_reader.h @@ -28,7 +28,7 @@ class HiveReader final : public reader::TableReader { ~HiveReader() final = default; Status init(reader::TableReadOptions&& options) override; - reader::TableColumnMappingMode default_mapping_mode() const override { return _mode; } + reader::TableColumnMappingMode mapping_mode() const override { return _mode; } private: reader::TableColumnMappingMode _mode = reader::TableColumnMappingMode::BY_NAME; diff --git a/be/src/format/reader/table/paimon_reader.cpp b/be/src/format/reader/table/paimon_reader.cpp index d5c450b2c0172b..f821bfeb90116d 100644 --- a/be/src/format/reader/table/paimon_reader.cpp +++ b/be/src/format/reader/table/paimon_reader.cpp @@ -21,8 +21,133 @@ #include #include "format/table/deletion_vector_reader.h" +#include "util/string_util.h" namespace doris::paimon { +namespace { + +const schema::external::TField* get_field_ptr(const schema::external::TFieldPtr& field_ptr) { + if (!field_ptr.__isset.field_ptr || field_ptr.field_ptr == nullptr) { + return nullptr; + } + return field_ptr.field_ptr.get(); +} + +const schema::external::TSchema* find_schema(const TFileScanRangeParams* params, + int64_t schema_id) { + if (params == nullptr || !params->__isset.history_schema_info) { + return nullptr; + } + for (const auto& schema : params->history_schema_info) { + if (schema.__isset.schema_id && schema.schema_id == schema_id) { + return &schema; + } + } + return nullptr; +} + +const schema::external::TField* find_child_field_by_name( + const std::vector& fields, const std::string& name) { + for (const auto& field_ptr : fields) { + const auto* field = get_field_ptr(field_ptr); + if (field != nullptr && field->__isset.name && to_lower(field->name) == to_lower(name)) { + return field; + } + } + return nullptr; +} + +void annotate_column_from_field(reader::ColumnDefinition* column, + const schema::external::TField& field); + +void annotate_struct_children(reader::ColumnDefinition* column, + const schema::external::TStructField& struct_field) { + DORIS_CHECK(column != nullptr); + if (!struct_field.__isset.fields) { + return; + } + for (auto& child : column->children) { + const auto* child_field = find_child_field_by_name(struct_field.fields, child.name); + if (child_field != nullptr) { + annotate_column_from_field(&child, *child_field); + } + } +} + +void annotate_column_from_field(reader::ColumnDefinition* column, + const schema::external::TField& field) { + DORIS_CHECK(column != nullptr); + if (field.__isset.id) { + column->identifier = Field::create_field(field.id); + } + if (!field.__isset.nestedField) { + return; + } + if (field.nestedField.__isset.struct_field) { + annotate_struct_children(column, field.nestedField.struct_field); + } else if (field.nestedField.__isset.array_field) { + if (column->children.empty() || !field.nestedField.array_field.__isset.item_field) { + return; + } + const auto* item_field = get_field_ptr(field.nestedField.array_field.item_field); + if (item_field != nullptr) { + annotate_column_from_field(&column->children.front(), *item_field); + } + } else if (field.nestedField.__isset.map_field) { + if (!column->children.empty() && field.nestedField.map_field.__isset.key_field) { + const auto* key_field = get_field_ptr(field.nestedField.map_field.key_field); + if (key_field != nullptr) { + annotate_column_from_field(&column->children.front(), *key_field); + } + } + if (column->children.size() > 1 && field.nestedField.map_field.__isset.value_field) { + const auto* value_field = get_field_ptr(field.nestedField.map_field.value_field); + if (value_field != nullptr) { + annotate_column_from_field(&column->children[1], *value_field); + } + } + } +} + +} // namespace + +Status PaimonReader::prepare_split(const reader::SplitReadOptions& options) { + _split_schema_id = -1; + const auto& paimon_params = options.current_range.table_format_params.paimon_params; + if (paimon_params.__isset.schema_id) { + _split_schema_id = paimon_params.schema_id; + } + return reader::TableReader::prepare_split(options); +} + +reader::TableColumnMappingMode PaimonReader::mapping_mode() const { + if (_split_schema_id < 0 || _scan_params == nullptr || !_scan_params->__isset.current_schema_id || + !_scan_params->__isset.history_schema_info) { + return reader::TableColumnMappingMode::BY_NAME; + } + return find_schema(_scan_params, _split_schema_id) == nullptr + ? reader::TableColumnMappingMode::BY_NAME + : reader::TableColumnMappingMode::BY_FIELD_ID; +} + +Status PaimonReader::annotate_file_schema(std::vector* file_schema) { + DORIS_CHECK(file_schema != nullptr); + if (mapping_mode() != reader::TableColumnMappingMode::BY_FIELD_ID) { + return Status::OK(); + } + const auto* schema = find_schema(_scan_params, _split_schema_id); + DORIS_CHECK(schema != nullptr); + if (!schema->__isset.root_field || !schema->root_field.__isset.fields) { + return Status::OK(); + } + for (auto& column : *file_schema) { + const auto* field = find_child_field_by_name(schema->root_field.fields, column.name); + if (field != nullptr) { + annotate_column_from_field(&column, *field); + } + } + return Status::OK(); +} Status PaimonReader::_parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc* desc, bool* has_delete_file) { diff --git a/be/src/format/reader/table/paimon_reader.h b/be/src/format/reader/table/paimon_reader.h index ce386460a6e681..feb5f5ed956cbd 100644 --- a/be/src/format/reader/table/paimon_reader.h +++ b/be/src/format/reader/table/paimon_reader.h @@ -28,10 +28,17 @@ class PaimonReader final : public reader::TableReader { public: ENABLE_FACTORY_CREATOR(PaimonReader); ~PaimonReader() final = default; + Status prepare_split(const reader::SplitReadOptions& options) override; protected: + reader::TableColumnMappingMode mapping_mode() const override; + Status annotate_file_schema(std::vector* file_schema) override; + Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc* desc, bool* has_delete_file) override; + +private: + int64_t _split_schema_id = -1; }; } // namespace doris::paimon diff --git a/be/src/format/reader/table_reader.cpp b/be/src/format/reader/table_reader.cpp index aba4640d5b05cb..b04c6ca23d3289 100644 --- a/be/src/format/reader/table_reader.cpp +++ b/be/src/format/reader/table_reader.cpp @@ -30,6 +30,7 @@ #include "common/status.h" #include "core/assert_cast.h" #include "exec/common/endian.h" +#include "exprs/vexpr_context.h" #include "exprs/vslot_ref.h" #include "format/new_parquet/parquet_reader.h" #include "format/reader/column_mapper.h" @@ -109,9 +110,24 @@ std::string partition_values_debug_string(const std::map& pa return out.str(); } +std::string expr_context_debug_string(const VExprContextSPtr& context) { + if (context == nullptr) { + return "null"; + } + const auto root = context->root(); + if (root == nullptr) { + return "VExprContext{root=null}"; + } + std::ostringstream out; + out << "VExprContext{root_name=" << root->expr_name() + << ", root_debug=" << root->debug_string() << "}"; + return out.str(); +} + std::string table_filter_debug_string(const TableFilter& filter) { std::ostringstream out; - out << "TableFilter{has_conjunct=" << (filter.conjunct != nullptr) << ", global_indices=" + out << "TableFilter{conjunct=" << expr_context_debug_string(filter.conjunct) + << ", global_indices=" << join_table_reader_debug_strings( filter.global_indices, [](GlobalIndex global_index) { return std::to_string(global_index.value()); }) @@ -268,7 +284,13 @@ std::string TableReader::debug_string() const { [](const TableFilter& filter) { return table_filter_debug_string(filter); }) << ", table_column_predicates=" << table_column_predicates_debug_string(_table_column_predicates) - << ", conjunct_count=" << _conjuncts.size() << ", file_schema=" + << ", conjunct_count=" << _conjuncts.size() << ", conjuncts=" + << join_table_reader_debug_strings( + _conjuncts, + [](const VExprContextSPtr& conjunct) { + return expr_context_debug_string(conjunct); + }) + << ", file_schema=" << join_table_reader_debug_strings(_data_reader.file_schema, [](const ColumnDefinition& field) { return TableColumnMapper::debug_string(field); diff --git a/be/src/format/reader/table_reader.h b/be/src/format/reader/table_reader.h index cb37bf2746c92b..0f356628abd79d 100644 --- a/be/src/format/reader/table_reader.h +++ b/be/src/format/reader/table_reader.h @@ -215,9 +215,13 @@ class TableReader { // 切换到下一个 reader 的通用流程。 // 该方法先关闭当前 reader,再打开下一个具体 reader;子类不应重复实现这个循环。 Status create_next_reader(bool* eos); - virtual TableColumnMappingMode default_mapping_mode() const { + virtual TableColumnMappingMode mapping_mode() const { return TableColumnMappingMode::BY_NAME; } + virtual Status annotate_file_schema(std::vector* file_schema) { + DORIS_CHECK(file_schema != nullptr); + return Status::OK(); + } // 打开当前具体 reader。 // 子类在这里基于当前 split/task 初始化底层 FileReader。 @@ -225,13 +229,9 @@ class TableReader { // 1. Get file schema and create column mapping. std::vector file_schema; RETURN_IF_ERROR(_data_reader.reader->get_schema(&file_schema)); - // TODO: It's different for paimon/hudi/iceberg - bool has_field_id = !file_schema.empty() && file_schema.front().has_identifier_field_id(); + RETURN_IF_ERROR(annotate_file_schema(&file_schema)); _data_reader.file_schema = file_schema; - _mapper_options.mode = default_mapping_mode() == TableColumnMappingMode::BY_FIELD_ID - ? (has_field_id ? TableColumnMappingMode::BY_FIELD_ID - : TableColumnMappingMode::BY_NAME) - : default_mapping_mode(); + _mapper_options.mode = mapping_mode(); _data_reader.column_mapper = TableColumnMapper(_mapper_options); RETURN_IF_ERROR(_data_reader.column_mapper.create_mapping(_projected_columns, @@ -247,7 +247,8 @@ class TableReader { // are pruning hints. auto file_request = std::make_unique(); RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request( - _table_filters, _table_column_predicates, _projected_columns, file_request.get())); + _table_filters, _table_column_predicates, _projected_columns, file_request.get(), + _runtime_state)); bool constant_filter_pruned_split = false; RETURN_IF_ERROR(_evaluate_constant_filters(&constant_filter_pruned_split)); if (constant_filter_pruned_split) { @@ -303,6 +304,7 @@ class TableReader { } RETURN_IF_ERROR(_data_reader.reader->open(file_request)); RETURN_IF_ERROR(_open_mapping_exprs()); + LOG(WARNING) << "==========1 " << debug_string(); return Status::OK(); } @@ -558,6 +560,7 @@ class TableReader { const size_t rows, ColumnPtr* column) { if (mapping.has_complex_projection && mapping.file_local_id.has_value() && !mapping.child_mappings.empty()) { + DCHECK(mapping.projection != nullptr); int res_id; RETURN_IF_ERROR(mapping.projection->execute(current_block, &res_id)); RETURN_IF_ERROR(_materialize_complex_mapping_column( diff --git a/be/src/format/table/iceberg_reader_v2.h b/be/src/format/table/iceberg_reader_v2.h index d3b6bee81051ee..f80efa22fb1c4b 100644 --- a/be/src/format/table/iceberg_reader_v2.h +++ b/be/src/format/table/iceberg_reader_v2.h @@ -52,8 +52,10 @@ class IcebergTableReader : public reader::TableReader { } Status prepare_split(const reader::SplitReadOptions& options) override; - reader::TableColumnMappingMode default_mapping_mode() const override { - return reader::TableColumnMappingMode::BY_FIELD_ID; + reader::TableColumnMappingMode mapping_mode() const override { + return !_data_reader.file_schema.empty() && _has_field_id(_data_reader.file_schema) + ? reader::TableColumnMappingMode::BY_FIELD_ID + : reader::TableColumnMappingMode::BY_NAME; } protected: @@ -69,6 +71,17 @@ class IcebergTableReader : public reader::TableReader { Status _init_delete_predicates(const TTableFormatFileDesc& t_desc); private: + bool _has_field_id(const std::vector& schema) const { + for (const auto& field : schema) { + if (!field.has_identifier_field_id()) { + return false; + } + if (!_has_field_id(field.children)) { + return false; + } + } + return true; + } static constexpr int MIN_SUPPORT_DELETE_FILES_VERSION = 2; static constexpr int POSITION_DELETE = 1; static constexpr int EQUALITY_DELETE = 2; diff --git a/be/test/core/data_type_serde/data_type_serde_decoded_values_test.cpp b/be/test/core/data_type_serde/data_type_serde_decoded_values_test.cpp index 1622775b6a871a..47a5b5765db02d 100644 --- a/be/test/core/data_type_serde/data_type_serde_decoded_values_test.cpp +++ b/be/test/core/data_type_serde/data_type_serde_decoded_values_test.cpp @@ -245,9 +245,45 @@ TEST(DataTypeSerDeDecodedValuesTest, ReadNullableInt32Values) { EXPECT_FALSE(nullable_column.is_null_at(2)); EXPECT_TRUE(nullable_column.is_null_at(3)); EXPECT_EQ(nested_column.get_element(0), 1); - EXPECT_EQ(nested_column.get_element(1), 2); + EXPECT_EQ(nested_column.get_element(1), 0); EXPECT_EQ(nested_column.get_element(2), 3); - EXPECT_EQ(nested_column.get_element(3), 4); + EXPECT_EQ(nested_column.get_element(3), 0); +} + +TEST(DataTypeSerDeDecodedValuesTest, ReadNullableDecimalBinaryValues) { + auto type = std::make_shared(std::make_shared(18, 2)); + auto column = type->create_column(); + const uint8_t positive[] = {0x30, 0x39}; + const uint8_t negative[] = {0xff, 0xbd}; + std::vector values = { + StringRef(reinterpret_cast(positive), 2), + StringRef(static_cast(nullptr), 0), + StringRef(reinterpret_cast(negative), 2), + }; + const uint8_t null_map[] = {0, 1, 0}; + + DecodedColumnView view; + view.value_kind = DecodedValueKind::FIXED_BINARY; + view.row_count = values.size(); + view.binary_values = &values; + view.fixed_length = 2; + view.decimal_precision = 18; + view.decimal_scale = 2; + view.null_map = null_map; + + auto st = type->get_serde()->read_column_from_decoded_values(*column, view); + ASSERT_TRUE(st.ok()) << st; + + const auto& nullable_column = assert_cast(*column); + const auto& decimal_column = + assert_cast(nullable_column.get_nested_column()); + ASSERT_EQ(nullable_column.size(), 3); + EXPECT_FALSE(nullable_column.is_null_at(0)); + EXPECT_TRUE(nullable_column.is_null_at(1)); + EXPECT_FALSE(nullable_column.is_null_at(2)); + EXPECT_EQ(decimal_column.get_element(0), Decimal128V3(12345)); + EXPECT_EQ(decimal_column.get_element(1), Decimal128V3(0)); + EXPECT_EQ(decimal_column.get_element(2), Decimal128V3(-67)); } TEST(DataTypeSerDeDecodedValuesTest, RejectMismatchedValueKind) { diff --git a/be/test/format/reader/expr/cast_test.cpp b/be/test/format/reader/expr/cast_test.cpp index 8261ba8f2fbf0c..7acc0bf225dcec 100644 --- a/be/test/format/reader/expr/cast_test.cpp +++ b/be/test/format/reader/expr/cast_test.cpp @@ -343,7 +343,8 @@ TEST_F(CastTest, ColumnMapperBuildsCastFilterForTypeMismatch) { reader::FileScanRequest file_request; ASSERT_TRUE( - mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request).ok()); + mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request, &state) + .ok()); ASSERT_EQ(file_request.conjuncts.size(), 1); ASSERT_EQ(projection_ids(file_request.predicate_columns), std::vector({0})); const auto& localized_expr = file_request.conjuncts[0]->root(); @@ -376,6 +377,54 @@ TEST_F(CastTest, ColumnMapperBuildsCastFilterForTypeMismatch) { file_request.conjuncts[0]->close(); } +TEST_F(CastTest, ColumnMapperRepreparesRewrittenPreparedFilter) { + reader::TableColumnMapper mapper; + reader::ColumnDefinition table_column; + table_column.identifier = Field::create_field(7); + table_column.name = "value"; + table_column.type = std::make_shared(); + std::vector projected_columns {table_column}; + + reader::ColumnDefinition file_field; + file_field.identifier = Field::create_field(0); + file_field.name = "value"; + file_field.type = std::make_shared(); + std::vector file_schema {file_field}; + + auto status = mapper.create_mapping(projected_columns, {}, file_schema); + ASSERT_TRUE(status.ok()) << status; + + auto cast = Cast::create_shared(table_column.type); + cast->add_child(TableSlotRef::create_shared(0, 0, -1, table_column.type, "value")); + reader::TableFilter table_filter; + table_filter.conjunct = VExprContext::create_shared(cast); + table_filter.global_indices = {reader::GlobalIndex(0)}; + status = table_filter.conjunct->prepare(&state, RowDescriptor()); + ASSERT_TRUE(status.ok()) << status; + status = table_filter.conjunct->open(&state); + ASSERT_TRUE(status.ok()) << status; + + reader::FileScanRequest file_request; + ASSERT_TRUE( + mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request, &state) + .ok()); + ASSERT_EQ(file_request.conjuncts.size(), 1); + const auto& localized_expr = file_request.conjuncts[0]->root(); + ASSERT_NE(dynamic_cast(localized_expr.get()), nullptr); + ASSERT_EQ(localized_expr->get_num_children(), 1); + const auto* localized_slot = + assert_cast(localized_expr->children()[0].get()); + EXPECT_EQ(localized_slot->column_id(), 0); + EXPECT_TRUE(localized_slot->data_type()->equals(*file_field.type)); + + status = file_request.conjuncts[0]->prepare(&state, RowDescriptor()); + ASSERT_TRUE(status.ok()) << status; + status = file_request.conjuncts[0]->open(&state); + ASSERT_TRUE(status.ok()) << status; + + file_request.conjuncts[0]->close(); +} + TEST_F(CastTest, ColumnMapperCastsLiteralForSlotLiteralPredicateTypeMismatch) { reader::TableColumnMapper mapper; reader::ColumnDefinition table_column; @@ -403,7 +452,8 @@ TEST_F(CastTest, ColumnMapperCastsLiteralForSlotLiteralPredicateTypeMismatch) { reader::FileScanRequest file_request; ASSERT_TRUE( - mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request).ok()); + mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request, &state) + .ok()); ASSERT_EQ(file_request.conjuncts.size(), 1); ASSERT_EQ(projection_ids(file_request.predicate_columns), std::vector({0})); const auto& localized_expr = file_request.conjuncts[0]->root(); @@ -462,7 +512,8 @@ TEST_F(CastTest, ColumnMapperCastsLiteralForLiteralSlotPredicateTypeMismatch) { reader::FileScanRequest file_request; ASSERT_TRUE( - mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request).ok()); + mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request, &state) + .ok()); ASSERT_EQ(file_request.conjuncts.size(), 1); const auto& localized_expr = file_request.conjuncts[0]->root(); ASSERT_EQ(localized_expr->get_num_children(), 2); @@ -522,7 +573,8 @@ TEST_F(CastTest, ColumnMapperCastsInPredicateLiteralsForTypeMismatch) { reader::FileScanRequest file_request; ASSERT_TRUE( - mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request).ok()); + mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request, &state) + .ok()); ASSERT_EQ(file_request.conjuncts.size(), 1); ASSERT_EQ(projection_ids(file_request.predicate_columns), std::vector({0})); const auto& localized_expr = file_request.conjuncts[0]->root(); @@ -566,7 +618,8 @@ TEST_F(CastTest, ColumnMapperFallsBackToSlotCastWhenInPredicateLiteralRewriteFai reader::FileScanRequest file_request; ASSERT_TRUE( - mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request).ok()); + mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request, &state) + .ok()); ASSERT_EQ(file_request.conjuncts.size(), 1); const auto& localized_expr = file_request.conjuncts[0]->root(); ASSERT_EQ(localized_expr->get_num_children(), 3); @@ -608,7 +661,9 @@ TEST_F(CastTest, ColumnMapperDoesNotLeakRewrittenInPredicateLiteralAcrossSplits) reader::TableColumnMapper int_mapper; ASSERT_TRUE(int_mapper.create_mapping(projected_columns, {}, {int_file_field}).ok()); reader::FileScanRequest int_request; - ASSERT_TRUE(int_mapper.create_scan_request({table_filter}, {}, projected_columns, &int_request) + ASSERT_TRUE(int_mapper + .create_scan_request({table_filter}, {}, projected_columns, &int_request, + &state) .ok()); ASSERT_EQ(int_request.conjuncts.size(), 1); const auto& int_localized_expr = int_request.conjuncts[0]->root(); @@ -626,7 +681,8 @@ TEST_F(CastTest, ColumnMapperDoesNotLeakRewrittenInPredicateLiteralAcrossSplits) ASSERT_TRUE(bigint_mapper.create_mapping(projected_columns, {}, {bigint_file_field}).ok()); reader::FileScanRequest bigint_request; ASSERT_TRUE(bigint_mapper - .create_scan_request({table_filter}, {}, projected_columns, &bigint_request) + .create_scan_request({table_filter}, {}, projected_columns, + &bigint_request, &state) .ok()); ASSERT_EQ(bigint_request.conjuncts.size(), 1); const auto& bigint_localized_expr = bigint_request.conjuncts[0]->root(); @@ -668,7 +724,8 @@ TEST_F(CastTest, ColumnMapperFallsBackToSlotCastWhenLiteralRewriteFails) { reader::FileScanRequest file_request; ASSERT_TRUE( - mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request).ok()); + mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request, &state) + .ok()); ASSERT_EQ(file_request.conjuncts.size(), 1); const auto& localized_expr = file_request.conjuncts[0]->root(); ASSERT_EQ(localized_expr->get_num_children(), 2); @@ -706,7 +763,9 @@ TEST_F(CastTest, ColumnMapperDoesNotLeakRewrittenLiteralAcrossSplits) { reader::TableColumnMapper int_mapper; ASSERT_TRUE(int_mapper.create_mapping(projected_columns, {}, {int_file_field}).ok()); reader::FileScanRequest int_request; - ASSERT_TRUE(int_mapper.create_scan_request({table_filter}, {}, projected_columns, &int_request) + ASSERT_TRUE(int_mapper + .create_scan_request({table_filter}, {}, projected_columns, &int_request, + &state) .ok()); ASSERT_EQ(int_request.conjuncts.size(), 1); const auto& int_localized_expr = int_request.conjuncts[0]->root(); @@ -722,7 +781,8 @@ TEST_F(CastTest, ColumnMapperDoesNotLeakRewrittenLiteralAcrossSplits) { ASSERT_TRUE(bigint_mapper.create_mapping(projected_columns, {}, {bigint_file_field}).ok()); reader::FileScanRequest bigint_request; ASSERT_TRUE(bigint_mapper - .create_scan_request({table_filter}, {}, projected_columns, &bigint_request) + .create_scan_request({table_filter}, {}, projected_columns, + &bigint_request, &state) .ok()); ASSERT_EQ(bigint_request.conjuncts.size(), 1); const auto& bigint_localized_expr = bigint_request.conjuncts[0]->root(); @@ -764,7 +824,8 @@ TEST_F(CastTest, ColumnMapperKeepsExplicitSlotCastInSlotLiteralPredicate) { reader::FileScanRequest file_request; ASSERT_TRUE( - mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request).ok()); + mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request, &state) + .ok()); ASSERT_EQ(file_request.conjuncts.size(), 1); const auto& localized_expr = file_request.conjuncts[0]->root(); ASSERT_EQ(localized_expr->get_num_children(), 2); @@ -804,9 +865,12 @@ TEST_F(CastTest, ColumnMapperDoesNotNestCastFilterAcrossScanRequests) { reader::FileScanRequest first_request; ASSERT_TRUE( - mapper.create_scan_request({table_filter}, {}, projected_columns, &first_request).ok()); + mapper.create_scan_request({table_filter}, {}, projected_columns, &first_request, &state) + .ok()); reader::FileScanRequest second_request; - ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, projected_columns, &second_request) + ASSERT_TRUE(mapper + .create_scan_request({table_filter}, {}, projected_columns, + &second_request, &state) .ok()); ASSERT_EQ(second_request.conjuncts.size(), 1); @@ -841,7 +905,9 @@ TEST_F(CastTest, ColumnMapperRewritesPreviousCastFilterToMatchingSplitType) { reader::TableColumnMapper int_mapper; ASSERT_TRUE(int_mapper.create_mapping(projected_columns, {}, {int_file_field}).ok()); reader::FileScanRequest int_request; - ASSERT_TRUE(int_mapper.create_scan_request({table_filter}, {}, projected_columns, &int_request) + ASSERT_TRUE(int_mapper + .create_scan_request({table_filter}, {}, projected_columns, &int_request, + &state) .ok()); const auto& int_localized_expr = int_request.conjuncts[0]->root(); @@ -857,7 +923,8 @@ TEST_F(CastTest, ColumnMapperRewritesPreviousCastFilterToMatchingSplitType) { ASSERT_TRUE(bigint_mapper.create_mapping(projected_columns, {}, {bigint_file_field}).ok()); reader::FileScanRequest bigint_request; ASSERT_TRUE(bigint_mapper - .create_scan_request({table_filter}, {}, projected_columns, &bigint_request) + .create_scan_request({table_filter}, {}, projected_columns, + &bigint_request, &state) .ok()); const auto& bigint_localized_expr = bigint_request.conjuncts[0]->root(); @@ -907,7 +974,7 @@ TEST_F(CastTest, ColumnMapperKeepsTableSlotIdWhenFileBlockPositionChanges) { table_filter.global_indices = {reader::GlobalIndex(0)}; reader::FileScanRequest first_request; - ASSERT_TRUE(mapper.localize_filters({table_filter}, {}, &first_request).ok()); + ASSERT_TRUE(mapper.localize_filters({table_filter}, {}, &first_request, &state).ok()); ASSERT_EQ(first_request.conjuncts.size(), 1); const auto* first_slot = assert_cast( first_request.conjuncts[0]->root()->children()[0].get()); @@ -918,7 +985,7 @@ TEST_F(CastTest, ColumnMapperKeepsTableSlotIdWhenFileBlockPositionChanges) { second_request.local_positions.emplace(reader::LocalColumnId(9), reader::LocalIndex(0)); second_request.local_positions.emplace(reader::LocalColumnId(10), reader::LocalIndex(1)); second_request.non_predicate_columns.push_back(field_projection(9)); - ASSERT_TRUE(mapper.localize_filters({table_filter}, {}, &second_request).ok()); + ASSERT_TRUE(mapper.localize_filters({table_filter}, {}, &second_request, &state).ok()); ASSERT_EQ(second_request.conjuncts.size(), 1); const auto* second_slot = assert_cast( second_request.conjuncts[0]->root()->children()[0].get());