diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index a997369637dde8..fd6a1dabd4e368 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -146,6 +146,17 @@ message(STATUS "build task executor simulator: ${BUILD_TASK_EXECUTOR_SIMULATOR}"
 option(BUILD_FILE_CACHE_LRU_TOOL "ON for building file cache lru tool or OFF for not" OFF)
 message(STATUS "build file cache lru tool: ${BUILD_FILE_CACHE_LRU_TOOL}")
 
+option(ENABLE_PAIMON_CPP "Enable Paimon C++ integration" ON)
+set(PAIMON_HOME "" CACHE PATH "Paimon install prefix")
+
+# Allow env to override when reconfiguring (avoid picking /usr/local); FORCE rewrites the entry, so repeat the help text.
+if (DEFINED ENV{ENABLE_PAIMON_CPP})
+    set(ENABLE_PAIMON_CPP "$ENV{ENABLE_PAIMON_CPP}" CACHE BOOL "Enable Paimon C++ integration" FORCE)
+endif()
+if (DEFINED ENV{PAIMON_HOME} AND NOT PAIMON_HOME)
+    set(PAIMON_HOME "$ENV{PAIMON_HOME}" CACHE PATH "Paimon install prefix" FORCE)
+endif()
+
 set(CMAKE_SKIP_RPATH TRUE)
 set(Boost_USE_STATIC_LIBS ON)
 set(Boost_USE_STATIC_RUNTIME ON)
@@ -544,6 +555,10 @@ set(COMMON_THIRDPARTY
     ${COMMON_THIRDPARTY}
 )
 
+if (ENABLE_PAIMON_CPP)
+    message(STATUS "Paimon C++ enabled: legacy thirdparty static linkage mode")
+endif()
+
 if ((ARCH_AMD64 OR ARCH_AARCH64) AND OS_LINUX)
     add_library(hadoop_hdfs STATIC IMPORTED)
     set_target_properties(hadoop_hdfs PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/hadoop_hdfs_3_4/native/libhdfs.a)
@@ -571,6 +586,13 @@ endif()
 if (absl_FOUND)
     set(COMMON_THIRDPARTY
         ${COMMON_THIRDPARTY}
+        absl::cord
+        absl::cord_internal
+        absl::cordz_functions
+        absl::cordz_info
+        absl::cordz_update_scope
+        absl::cordz_update_tracker
+        absl::crc_cord_state
         absl::flags
         absl::random_random
         absl::spinlock_wait
@@ -596,6 +618,48 @@ if (BUILD_BENCHMARK)
     )
 endif()
 
+# Import the first existing copy of <archive_name> as STATIC IMPORTED target
+# <target_name>. Candidates are probed in order: paimon-cpp's private deps dir,
+# then ${THIRDPARTY_DIR}/lib64 and ${THIRDPARTY_DIR}/lib. <out_var> is set in the
+# caller's scope to the target name, or to "" when no candidate exists.
+function(doris_import_paimon_arrow_lib target_name archive_name out_var)
+    set(${out_var} "" PARENT_SCOPE)
+    foreach(candidate IN ITEMS
+            "${THIRDPARTY_DIR}/paimon-cpp/lib64/paimon_deps/${archive_name}"
+            "${THIRDPARTY_DIR}/lib64/${archive_name}"
+            "${THIRDPARTY_DIR}/lib/${archive_name}")
+        if (EXISTS "${candidate}")
+            add_library(${target_name} STATIC IMPORTED)
+            set_target_properties(${target_name} PROPERTIES
+                IMPORTED_LOCATION "${candidate}")
+            set(${out_var} ${target_name} PARENT_SCOPE)
+            return()
+        endif()
+    endforeach()
+endfunction()
+
+set(PAIMON_FACTORY_REGISTRY_LIBS)
+set(PAIMON_ARROW_CORE_LIB)
+set(PAIMON_ARROW_FILESYSTEM_LIB)
+set(PAIMON_ARROW_DATASET_LIB)
+set(PAIMON_ARROW_ACERO_LIB)
+if (ENABLE_PAIMON_CPP)
+    doris_import_paimon_arrow_lib(paimon_arrow_core libarrow.a PAIMON_ARROW_CORE_LIB)
+    doris_import_paimon_arrow_lib(paimon_arrow_filesystem libarrow_filesystem.a PAIMON_ARROW_FILESYSTEM_LIB)
+    doris_import_paimon_arrow_lib(paimon_arrow_dataset libarrow_dataset.a PAIMON_ARROW_DATASET_LIB)
+    doris_import_paimon_arrow_lib(paimon_arrow_acero libarrow_acero.a PAIMON_ARROW_ACERO_LIB)
+    if (PAIMON_ARROW_DATASET_LIB)
+        # paimon_parquet_file_format depends on Arrow Dataset symbols.
+        # Force-link it only when arrow_dataset is available.
+        set(PAIMON_FACTORY_REGISTRY_LIBS
+            paimon_parquet_file_format
+        )
+        list(REMOVE_ITEM COMMON_THIRDPARTY ${PAIMON_FACTORY_REGISTRY_LIBS})
+    else()
+        message(STATUS "Paimon C++: libarrow_dataset.a not found, keep paimon_parquet_file_format as regular static lib")
+    endif()
+endif()
+
 set(DORIS_DEPENDENCIES
     ${DORIS_DEPENDENCIES}
     ${WL_START_GROUP}
@@ -613,6 +677,34 @@ set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} clucene-core-static)
 set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} clucene-shared-static)
 set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} clucene-contribs-lib)
 
+if (ENABLE_PAIMON_CPP)
+    if (PAIMON_FACTORY_REGISTRY_LIBS)
+        set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES}
+            -Wl,--whole-archive
+            ${PAIMON_FACTORY_REGISTRY_LIBS}
+            -Wl,--no-whole-archive)
+    endif()
+    if (PAIMON_ARROW_CORE_LIB)
+        set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} ${PAIMON_ARROW_CORE_LIB})
+    endif()
+    if (PAIMON_ARROW_FILESYSTEM_LIB)
+        set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} ${PAIMON_ARROW_FILESYSTEM_LIB})
+    endif()
+    if (PAIMON_ARROW_DATASET_LIB)
+        set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} ${PAIMON_ARROW_DATASET_LIB})
+    endif()
+    if (PAIMON_ARROW_ACERO_LIB)
+        set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} ${PAIMON_ARROW_ACERO_LIB})
+    endif()
+
+    # paimon-cpp internal dependencies (renamed with _paimon suffix)
+    # These must come after paimon libraries to resolve symbols.
+    set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} roaring_bitmap_paimon)
+    set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} xxhash_paimon)
+    set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} fmt_paimon)
+    set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} tbb_paimon)
+endif()
+
 set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} ${WL_END_GROUP})
 
 # Add all external dependencies. They should come after the palo libs.
diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake
index 9bb7b8ba748769..441ebe8dc738ba 100644
--- a/be/cmake/thirdparty.cmake
+++ b/be/cmake/thirdparty.cmake
@@ -179,3 +179,18 @@ add_thirdparty(icudata LIB64)
 
 
 add_thirdparty(pugixml LIB64)
+
+if (ENABLE_PAIMON_CPP)
+    add_thirdparty(paimon LIB64)
+    add_thirdparty(paimon_parquet_file_format LIB64)
+    add_thirdparty(paimon_orc_file_format LIB64)
+    add_thirdparty(paimon_blob_file_format LIB64)
+    add_thirdparty(paimon_local_file_system LIB64)
+    add_thirdparty(paimon_file_index LIB64)
+    add_thirdparty(paimon_global_index LIB64)
+
+    add_thirdparty(roaring_bitmap_paimon LIB64)
+    add_thirdparty(xxhash_paimon LIB64)
+    add_thirdparty(fmt_paimon LIB64)
+    add_thirdparty(tbb_paimon LIB64)
+endif()
diff --git a/be/src/vec/exec/format/table/paimon_cpp_reader.cpp b/be/src/vec/exec/format/table/paimon_cpp_reader.cpp
new file mode 100644
index 00000000000000..756c87c32f0717
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_cpp_reader.cpp
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "paimon_cpp_reader.h"
+
+#include
+#include
+#include
+
+#include "arrow/c/bridge.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "paimon/defs.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/read_context.h"
+#include "paimon/table/source/table_read.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "util/url_coding.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/exec/format/table/paimon_doris_file_system.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+namespace {
+constexpr const char* VALUE_KIND_FIELD = "_VALUE_KIND"; // Arrow field that get_next_block() drops instead of copying
+
+} // namespace
+
+PaimonCppReader::PaimonCppReader(const std::vector& file_slot_descs, // Binds slots/range by reference; caches the table-level row count
+                                 RuntimeState* state, RuntimeProfile* profile,
+                                 const TFileRangeDesc& range,
+                                 const TFileScanRangeParams* range_params)
+        : _file_slot_descs(file_slot_descs),
+          _state(state),
+          _profile(profile),
+          _range(range),
+          _range_params(range_params) {
+    TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz);
+    if (range.__isset.table_format_params &&
+        range.table_format_params.__isset.table_level_row_count) {
+        _remaining_table_level_row_count = range.table_format_params.table_level_row_count;
+    } else {
+        _remaining_table_level_row_count = -1; // -1: no table-level row count supplied
+    }
+}
+
+PaimonCppReader::~PaimonCppReader() = default;
+
+Status PaimonCppReader::init_reader() { // Skips creating the paimon reader when COUNT can be served from the row count
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count >= 0) {
+        return Status::OK();
+    }
+    return _init_paimon_reader();
+}
+
+Status PaimonCppReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { // COUNT fast path, else one Arrow batch per call
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count >= 0) {
+        auto rows = std::min(_remaining_table_level_row_count, // emit at most batch_size rows per call
+                             (int64_t)_state->query_options().batch_size);
+        _remaining_table_level_row_count -= rows;
+        auto mutate_columns = block->mutate_columns();
+        for (auto& col : mutate_columns) {
+            col->resize(rows); // only the row count matters here; no data is read
+        }
+        block->set_columns(std::move(mutate_columns));
+        *read_rows = rows;
+        *eof = false;
+        if (_remaining_table_level_row_count == 0) {
+            *eof = true;
+        }
+        return Status::OK();
+    }
+
+    if (!_batch_reader) {
+        return Status::InternalError("paimon-cpp reader is not initialized");
+    }
+
+    if (_col_name_to_block_idx.empty()) { // name -> position map is taken from the first block and reused
+        _col_name_to_block_idx = block->get_name_to_pos_map();
+    }
+
+    auto batch_result = _batch_reader->NextBatch();
+    if (!batch_result.ok()) {
+        return Status::InternalError("paimon-cpp read batch failed: {}",
+                                     batch_result.status().ToString());
+    }
+    auto batch = std::move(batch_result).value();
+    if (paimon::BatchReader::IsEofBatch(batch)) {
+        *read_rows = 0;
+        *eof = true;
+        return Status::OK();
+    }
+
+    arrow::Result> import_result = // batch.first / batch.second are handed to Arrow's C-data import
+            arrow::ImportRecordBatch(batch.first.get(), batch.second.get());
+    if (!import_result.ok()) {
+        return Status::InternalError("failed to import paimon-cpp arrow batch: {}",
+                                     import_result.status().message());
+    }
+
+    auto record_batch = std::move(import_result).ValueUnsafe();
+    const auto num_rows = static_cast(record_batch->num_rows());
+    const auto num_columns = record_batch->num_columns();
+    for (int c = 0; c < num_columns; ++c) { // copy each Arrow column into the block column of the same name
+        const auto& field = record_batch->schema()->field(c);
+        if (field->name() == VALUE_KIND_FIELD) {
+            continue;
+        }
+
+        auto it = _col_name_to_block_idx.find(field->name());
+        if (it == _col_name_to_block_idx.end()) {
+            // Skip columns that are not in the block (e.g., partition columns handled elsewhere)
+            continue;
+        }
+        const vectorized::ColumnWithTypeAndName& column_with_name =
+                block->get_by_position(it->second);
+        try {
+            RETURN_IF_ERROR(column_with_name.type->get_serde()->read_column_from_arrow(
+                    column_with_name.column->assume_mutable_ref(), record_batch->column(c).get(), 0,
+                    num_rows, _ctzz));
+        } catch (Exception& e) { // conversion exceptions are turned into an InternalError status
+            return Status::InternalError("Failed to convert from arrow to block: {}", e.what());
+        }
+    }
+
+    *read_rows = num_rows;
+    *eof = false;
+    return Status::OK();
+}
+
+Status PaimonCppReader::get_columns(std::unordered_map* name_to_type, // Reports every file slot as name -> type
+                                    std::unordered_set* missing_cols) { // missing_cols is not modified
+    for (const auto& slot : _file_slot_descs) {
+        name_to_type->emplace(slot->col_name(), slot->type());
+    }
+    return Status::OK();
+}
+
+Status PaimonCppReader::close() { // Closes the batch reader if one exists; always returns OK
+    if (_batch_reader) {
+        _batch_reader->Close();
+    }
+    return Status::OK();
+}
+
+Status PaimonCppReader::_init_paimon_reader() { // Decodes the split, builds a ReadContext, creates TableRead and the batch reader
+    register_paimon_doris_file_system();
+    RETURN_IF_ERROR(_decode_split(&_split));
+
+    auto table_path_opt = _resolve_table_path();
+    if (!table_path_opt.has_value()) {
+        return Status::InternalError(
+                "paimon-cpp missing paimon_table; cannot resolve paimon table root path");
+    }
+    auto options = _build_options();
+    auto read_columns = _build_read_columns();
+
+    // Avoid moving strings across module boundaries to prevent allocator mismatches in ASAN builds.
+    std::string table_path = table_path_opt.value();
+    static std::once_flag options_log_once; // function-local static: the summary below is logged once per process
+    std::call_once(options_log_once, [&]() {
+        auto has_key = [&](const char* key) { // credentials are reported only as "set"/"empty", never by value
+            auto it = options.find(key);
+            return (it != options.end() && !it->second.empty()) ? "set" : "empty";
+        };
+        auto value_or = [&](const char* key) { // value of key, or "" when absent
+            auto it = options.find(key);
+            return it != options.end() ? it->second : std::string("");
+        };
+        LOG(INFO) << "paimon-cpp options summary: table_path=" << table_path
+                  << " AWS_ACCESS_KEY=" << has_key("AWS_ACCESS_KEY")
+                  << " AWS_SECRET_KEY=" << has_key("AWS_SECRET_KEY")
+                  << " AWS_TOKEN=" << has_key("AWS_TOKEN")
+                  << " AWS_ENDPOINT=" << value_or("AWS_ENDPOINT")
+                  << " AWS_REGION=" << value_or("AWS_REGION")
+                  << " use_path_style=" << value_or("use_path_style")
+                  << " fs.oss.endpoint=" << value_or("fs.oss.endpoint")
+                  << " fs.s3a.endpoint=" << value_or("fs.s3a.endpoint");
+    });
+    paimon::ReadContextBuilder builder(table_path);
+    if (!read_columns.empty()) {
+        builder.SetReadSchema(read_columns);
+    }
+    if (!options.empty()) {
+        builder.SetOptions(options);
+    }
+    if (_predicate) { // predicate is optional; supplied via set_predicate()
+        builder.SetPredicate(_predicate);
+        builder.EnablePredicateFilter(true);
+    }
+
+    auto context_result = builder.Finish();
+    if (!context_result.ok()) {
+        return Status::InternalError("paimon-cpp build read context failed: {}",
+                                     context_result.status().ToString());
+    }
+    auto context = std::move(context_result).value();
+
+    auto table_read_result = paimon::TableRead::Create(std::move(context));
+    if (!table_read_result.ok()) {
+        return Status::InternalError("paimon-cpp create table read failed: {}",
+                                     table_read_result.status().ToString());
+    }
+    auto table_read = std::move(table_read_result).value();
+    auto reader_result = table_read->CreateReader(_split);
+    if (!reader_result.ok()) {
+        return Status::InternalError("paimon-cpp create reader failed: {}",
+                                     reader_result.status().ToString());
+    }
+    _table_read = std::move(table_read); // members are assigned only after every step succeeded
+    _batch_reader = std::move(reader_result).value();
+    return Status::OK();
+}
+
+Status PaimonCppReader::_decode_split(std::shared_ptr* split) { // Base64-decodes paimon_split from the scan range and deserializes it
+    if (!_range.__isset.table_format_params || !_range.table_format_params.__isset.paimon_params ||
+        !_range.table_format_params.paimon_params.__isset.paimon_split) {
+        return Status::InternalError("paimon-cpp missing paimon_split in scan range");
+    }
+    const auto& encoded_split = _range.table_format_params.paimon_params.paimon_split;
+    std::string decoded_split;
+    if (!base64_decode(encoded_split, &decoded_split)) {
+        return Status::InternalError("paimon-cpp base64 decode paimon_split failed");
+    }
+    auto pool = paimon::GetDefaultPool();
+    auto split_result =
+            paimon::Split::Deserialize(decoded_split.data(), decoded_split.size(), pool);
+    if (!split_result.ok()) {
+        return Status::InternalError("paimon-cpp deserialize split failed: {}",
+                                     split_result.status().ToString());
+    }
+    *split = std::move(split_result).value();
+    return Status::OK();
+}
+
+std::optional PaimonCppReader::_resolve_table_path() const { // paimon_table from the scan range; nullopt when unset or empty
+    if (_range.__isset.table_format_params && _range.table_format_params.__isset.paimon_params &&
+        _range.table_format_params.paimon_params.__isset.paimon_table &&
+        !_range.table_format_params.paimon_params.paimon_table.empty()) {
+        return _range.table_format_params.paimon_params.paimon_table;
+    }
+    return std::nullopt;
+}
+
+std::vector PaimonCppReader::_build_read_columns() const { // Column names of all file slots, in slot order
+    std::vector columns;
+    columns.reserve(_file_slot_descs.size());
+    for (const auto& slot : _file_slot_descs) {
+        columns.emplace_back(slot->col_name());
+    }
+    return columns;
+}
+
+std::map PaimonCppReader::_build_options() const { // paimon_options, then properties (else hadoop_conf), then key aliases
+    std::map options;
+    if (_range.__isset.table_format_params && _range.table_format_params.__isset.paimon_params &&
+        _range.table_format_params.paimon_params.__isset.paimon_options) {
+        options.insert(_range.table_format_params.paimon_params.paimon_options.begin(),
+                       _range.table_format_params.paimon_params.paimon_options.end());
+    }
+
+    if (_range_params && _range_params->__isset.properties && !_range_params->properties.empty()) {
+        for (const auto& kv : _range_params->properties) {
+            options[kv.first] = kv.second; // overwrites same-named paimon_options entries
+        }
+    } else if (_range.__isset.table_format_params && // hadoop_conf is consulted only when properties are absent/empty
+               _range.table_format_params.__isset.paimon_params &&
+               _range.table_format_params.paimon_params.__isset.hadoop_conf) {
+        for (const auto& kv : _range.table_format_params.paimon_params.hadoop_conf) {
+            options[kv.first] = kv.second;
+        }
+    }
+
+    auto copy_if_missing = [&](const char* from_key, const char* to_key) { // no-op if to_key exists or the source is missing/empty
+        if (options.find(to_key) != options.end()) {
+            return;
+        }
+        auto it = options.find(from_key);
+        if (it != options.end() && !it->second.empty()) {
+            options[to_key] = it->second;
+        }
+    };
+
+    // Map common OSS/S3 Hadoop configs to Doris S3 property keys.
+    copy_if_missing("fs.oss.accessKeyId", "AWS_ACCESS_KEY"); // fs.oss.* is tried first, so it wins over fs.s3a.* for a shared target key
+    copy_if_missing("fs.oss.accessKeySecret", "AWS_SECRET_KEY");
+    copy_if_missing("fs.oss.sessionToken", "AWS_TOKEN");
+    copy_if_missing("fs.oss.endpoint", "AWS_ENDPOINT");
+    copy_if_missing("fs.oss.region", "AWS_REGION");
+    copy_if_missing("fs.s3a.access.key", "AWS_ACCESS_KEY");
+    copy_if_missing("fs.s3a.secret.key", "AWS_SECRET_KEY");
+    copy_if_missing("fs.s3a.session.token", "AWS_TOKEN");
+    copy_if_missing("fs.s3a.endpoint", "AWS_ENDPOINT");
+    copy_if_missing("fs.s3a.region", "AWS_REGION");
+    copy_if_missing("fs.s3a.path.style.access", "use_path_style");
+
+    // FE currently does not pass paimon_options in scan ranges.
+    // Backfill file.format/manifest.format from split file_format to avoid
+    // paimon-cpp falling back to default manifest.format=avro.
+    if (_range.__isset.table_format_params && _range.table_format_params.__isset.paimon_params &&
+        _range.table_format_params.paimon_params.__isset.file_format &&
+        !_range.table_format_params.paimon_params.file_format.empty()) {
+        const auto& split_file_format = _range.table_format_params.paimon_params.file_format;
+        auto file_format_it = options.find(paimon::Options::FILE_FORMAT);
+        if (file_format_it == options.end() || file_format_it->second.empty()) {
+            options[paimon::Options::FILE_FORMAT] = split_file_format;
+        }
+        auto manifest_format_it = options.find(paimon::Options::MANIFEST_FORMAT);
+        if (manifest_format_it == options.end() || manifest_format_it->second.empty()) {
+            options[paimon::Options::MANIFEST_FORMAT] = split_file_format;
+        }
+    }
+
+    options[paimon::Options::FILE_SYSTEM] = "doris"; // set unconditionally, overriding any caller-provided value
+    return options;
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_cpp_reader.h b/be/src/vec/exec/format/table/paimon_cpp_reader.h
new file mode 100644
index 00000000000000..a1e8cfc19174e1
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_cpp_reader.h
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "cctz/time_zone.h"
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/table/source/split.h"
+#include "vec/exec/format/generic_reader.h"
+
+namespace paimon {
+class TableRead;
+class Predicate;
+} // namespace paimon
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+class SlotDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+class Block;
+
+class PaimonCppReader : public GenericReader { // GenericReader backed by a paimon BatchReader (_batch_reader) for one split (_split)
+    ENABLE_FACTORY_CREATOR(PaimonCppReader);
+
+public:
+    PaimonCppReader(const std::vector& file_slot_descs, RuntimeState* state, // file_slot_descs and range are kept by reference
+                    RuntimeProfile* profile, const TFileRangeDesc& range,
+                    const TFileScanRangeParams* range_params);
+    ~PaimonCppReader() override;
+
+    Status init_reader(); // not an override; call before get_next_block()
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+    Status get_columns(std::unordered_map* name_to_type,
+                       std::unordered_set* missing_cols) override;
+    Status close() override;
+    void set_predicate(std::shared_ptr predicate) { // NOTE(review): presumably must be called before init_reader() to take effect - confirm
+        _predicate = std::move(predicate);
+    }
+
+private:
+    Status _init_paimon_reader();
+    Status _decode_split(std::shared_ptr* split);
+    // Resolve paimon table root path for schema/manifest lookup.
+    std::optional _resolve_table_path() const;
+    std::vector _build_read_columns() const;
+    std::map _build_options() const;
+
+    const std::vector& _file_slot_descs; // NOTE(review): reference member - the caller's vector must outlive this reader; confirm
+    RuntimeState* _state = nullptr;
+    [[maybe_unused]] RuntimeProfile* _profile = nullptr;
+    const TFileRangeDesc& _range; // NOTE(review): reference member - same lifetime requirement as _file_slot_descs
+    const TFileScanRangeParams* _range_params = nullptr; // may be null
+
+    std::shared_ptr _split;
+    std::unique_ptr _table_read;
+    std::unique_ptr _batch_reader;
+    std::shared_ptr _predicate; // optional; empty when no predicate was set
+
+    std::unordered_map _col_name_to_block_idx; // column name -> position in the output block
+    int64_t _remaining_table_level_row_count = -1; // -1: no table-level row count available
+    cctz::time_zone _ctzz;
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_doris_file_system.cpp b/be/src/vec/exec/format/table/paimon_doris_file_system.cpp
new file mode 100644
index 00000000000000..abe6fb0c7cbb63
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_doris_file_system.cpp
@@ -0,0 +1,664 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "paimon_doris_file_system.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "common/status.h"
+#include "gen_cpp/Types_types.h"
+#include "io/file_factory.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_system.h"
+#include "io/fs/file_writer.h"
+#include "io/fs/local_file_system.h"
+#include "paimon/factories/factory.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+struct ParsedUri { // Result of parse_uri(): lower-cased scheme plus authority; both empty when no scheme is found
+    std::string scheme;
+    std::string authority;
+};
+
+std::string to_lower(std::string value) { // Returns value with every byte passed through std::tolower
+    std::ranges::transform(value, value.begin(),
+                           [](unsigned char c) { return static_cast(std::tolower(c)); });
+    return value;
+}
+
+ParsedUri parse_uri(const std::string& path) { // Accepts "://" or ":/" as scheme delimiter; authority is the text up to the next '/'
+    ParsedUri parsed;
+    size_t scheme_pos = path.find("://");
+    size_t delim_len = 3;
+    if (scheme_pos == std::string::npos) {
+        scheme_pos = path.find(":/");
+        delim_len = 2;
+    }
+    if (scheme_pos == std::string::npos || scheme_pos == 0) {
+        return parsed;
+    }
+    parsed.scheme = to_lower(path.substr(0, scheme_pos));
+    size_t authority_start = scheme_pos + delim_len;
+    if (authority_start >= path.size() || path[authority_start] == '/') { // e.g. "file:///x": scheme only, no authority
+        return parsed;
+    }
+    size_t next_slash = path.find('/', authority_start);
+    if (next_slash == std::string::npos) {
+        parsed.authority = path.substr(authority_start);
+    } else {
+        parsed.authority = path.substr(authority_start, next_slash - authority_start);
+    }
+    return parsed;
+}
+
+bool is_s3_scheme(const std::string& scheme) { // Schemes that map_scheme_to_file_type() treats as FILE_S3
+    return scheme == "s3" || scheme == "s3a" || scheme == "s3n" || scheme == "oss" ||
+           scheme == "obs" || scheme == "cos" || scheme == "cosn" || scheme == "gs" ||
+           scheme == "abfs" || scheme == "abfss" || scheme == "wasb" || scheme == "wasbs";
+}
+
+bool is_hdfs_scheme(const std::string& scheme) { // Schemes that map_scheme_to_file_type() treats as FILE_HDFS
+    return scheme == "hdfs" || scheme == "viewfs" || scheme == "local";
+}
+
+bool is_http_scheme(const std::string& scheme) {
+    return scheme == "http" || scheme == "https";
+}
+
+doris::TFileType::type map_scheme_to_file_type(const std::string& scheme) { // Empty or unrecognized schemes fall back to FILE_HDFS
+    if (scheme.empty()) {
+        return doris::TFileType::FILE_HDFS;
+    }
+    if (scheme == "file") {
+        return doris::TFileType::FILE_LOCAL;
+    }
+    if (is_hdfs_scheme(scheme)) {
+        return doris::TFileType::FILE_HDFS;
+    }
+    if (is_s3_scheme(scheme)) {
+        return doris::TFileType::FILE_S3;
+    }
+    if (is_http_scheme(scheme)) {
+        return doris::TFileType::FILE_HTTP;
+    }
+    if (scheme == "ofs" || scheme == "gfs" || scheme == "jfs") {
+        return doris::TFileType::FILE_BROKER;
+    }
+    return doris::TFileType::FILE_HDFS;
+}
+
+std::string replace_scheme(const std::string& path, const std::string& scheme) { // Rewrites the prefix to "<scheme>://"; scheme-less paths are returned unchanged
+    size_t scheme_pos = path.find("://");
+    size_t delim_len = 3;
+    if (scheme_pos == std::string::npos) {
+        scheme_pos = path.find(":/");
+        delim_len = 2;
+    }
+    if (scheme_pos == std::string::npos) {
+        return path;
+    }
+    return scheme + "://" + path.substr(scheme_pos + delim_len);
+}
+
+std::string normalize_local_path(const std::string& path) { // Strips "file:" and, for "file://host/...", the host part; other paths pass through
+    if (!path.starts_with("file:")) {
+        return path;
+    }
+    constexpr size_t file_prefix_len = 5;
+    size_t start = file_prefix_len;
+    if (path.compare(start, 2, "//") == 0 && path.size() - start > 2) {
+        size_t next_slash = path.find('/', start + 2);
+        if (next_slash == std::string::npos) { // "file://host" with no path component
+            return "";
+        }
+        start = next_slash;
+    }
+    return path.substr(start);
+}
+
+std::string normalize_path_for_type(const std::string& path, const std::string& scheme, // Local: drop "file:"; S3-family other than s3/http(s): rewrite to s3://
+                                    doris::TFileType::type type) {
+    if (type == doris::TFileType::FILE_LOCAL) {
+        return normalize_local_path(path);
+    }
+    if (type == doris::TFileType::FILE_S3 && scheme != "s3" && !is_http_scheme(scheme)) {
+        return replace_scheme(path, "s3");
+    }
+    return path;
+}
+
+std::string build_fs_cache_key(doris::TFileType::type type, const ParsedUri& uri, // Key depends on file type and authority only
+                               const std::string& default_fs_name) {
+    switch (type) {
+    case doris::TFileType::FILE_LOCAL:
+        return "local";
+    case doris::TFileType::FILE_S3:
+        return "s3://" + uri.authority;
+    case doris::TFileType::FILE_HTTP:
+        return "http://" + uri.authority;
+    case doris::TFileType::FILE_BROKER:
+        return "broker";
+    case doris::TFileType::FILE_HDFS:
+    default:
+        if (!uri.scheme.empty() || !uri.authority.empty()) {
+            return uri.scheme + "://" + uri.authority;
+        }
+        return default_fs_name; // scheme-less path: fall back to the configured default FS name
+    }
+}
+
+paimon::Status to_paimon_status(const doris::Status& status) { // Maps Doris error codes to paimon statuses; unlisted codes become IOError
+    if (status.ok()) {
+        return paimon::Status::OK();
+    }
+    switch (status.code()) {
+    case doris::ErrorCode::NOT_FOUND:
+    case doris::ErrorCode::DIR_NOT_EXIST:
+        return paimon::Status::NotExist(status.to_string());
+    case doris::ErrorCode::ALREADY_EXIST:
+    case doris::ErrorCode::FILE_ALREADY_EXIST:
+        return paimon::Status::Exist(status.to_string());
+    case doris::ErrorCode::INVALID_ARGUMENT:
+    case doris::ErrorCode::INVALID_INPUT_SYNTAX:
+        return paimon::Status::Invalid(status.to_string());
+    case doris::ErrorCode::NOT_IMPLEMENTED_ERROR:
+        return paimon::Status::NotImplemented(status.to_string());
+    default:
+        return paimon::Status::IOError(status.to_string());
+    }
+}
+
+std::string join_path(const std::string& base, const std::string& child) { // Appends child, inserting '/' unless base is empty or already ends with one
+    if (base.empty()) {
+        return child;
+    }
+    if (base.back() == '/') {
+        return base + child;
+    }
+    return base + "/" + child;
+}
+
+std::string parent_path_no_scheme(const std::string& path) { // Parent of a scheme-less path: "" when there is no '/', "/" when the parent is the root
+    if (path.empty()) {
+        return "";
+    }
+    size_t end = path.size();
+    while (end > 1 && path[end - 1] == '/') { // ignore trailing slashes
+        --end;
+    }
+    size_t pos = path.rfind('/', end - 1);
+    if (pos == std::string::npos) {
+        return "";
+    }
+    if (pos == 0) {
+        return "/";
+    }
+    return path.substr(0, pos);
+}
+
+std::string parent_path(const std::string& path) { // Parent of a possibly scheme-qualified path, rebuilt as scheme://authority + parent
+    ParsedUri uri = parse_uri(path);
+    if (uri.scheme.empty()) {
+        return parent_path_no_scheme(path);
+    }
+    size_t scheme_pos = path.find("://");
+    size_t delim_len = 3;
+    if (scheme_pos == std::string::npos) {
+        scheme_pos = path.find(":/");
+        delim_len = 2;
+    }
+    if (scheme_pos == std::string::npos) {
+        return parent_path_no_scheme(path);
+    }
+    size_t start = scheme_pos + delim_len;
+    size_t slash = path.find('/', start);
+    if (slash == std::string::npos) { // no path component after the authority
+        return "";
+    }
+    std::string path_part = path.substr(slash);
+    std::string parent_part = parent_path_no_scheme(path_part);
+    if (parent_part.empty()) {
+        return "";
+    }
+    std::string prefix = uri.scheme + "://";
+    if (!uri.authority.empty()) {
+        prefix += uri.authority;
+    }
+    return prefix + parent_part;
+}
+
+class DorisInputStream : public InputStream { // paimon InputStream over a Doris FileReader; keeps its own read position
+public:
+    DorisInputStream(doris::io::FileReaderSPtr reader, std::string path)
+            : reader_(std::move(reader)), path_(std::move(path)) {}
+
+    Status Seek(int64_t offset, SeekOrigin origin) override { // Only updates position_; issues no I/O
+        int64_t target = 0;
+        if (origin == SeekOrigin::FS_SEEK_SET) {
+            target = offset;
+        } else if (origin == SeekOrigin::FS_SEEK_CUR) {
+            target = position_ + offset;
+        } else if (origin == SeekOrigin::FS_SEEK_END) {
+            target = static_cast(reader_->size()) + offset;
+        } else {
+            return Status::Invalid("unknown seek origin");
+        }
+        if (target < 0) {
+            return Status::Invalid("seek position is negative");
+        }
+        position_ = target;
+        return Status::OK();
+    }
+
+    Result GetPos() const override { return position_; }
+
+    Result Read(char* buffer, uint32_t size) override { // Reads at position_ and advances it by the bytes actually read
+        size_t bytes_read = 0;
+        doris::Status status = reader_->read_at(position_, doris::Slice(buffer, size), &bytes_read);
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        position_ += static_cast(bytes_read);
+        return static_cast(bytes_read);
+    }
+
+    Result Read(char* buffer, uint32_t size, uint64_t offset) override { // Positional read; position_ is left untouched
+        size_t bytes_read = 0;
+        doris::Status status = reader_->read_at(offset, doris::Slice(buffer, size), &bytes_read);
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        return static_cast(bytes_read);
+    }
+
+    void ReadAsync(char* buffer, uint32_t size, uint64_t offset, // Performs the read synchronously and invokes callback before returning
+                   std::function&& callback) override {
+        Result result = Read(buffer, size, offset);
+        Status status = Status::OK();
+        if (!result.ok()) {
+            status = result.status();
+        }
+        callback(status);
+    }
+
+    Result GetUri() const override { return path_; }
+
+    Result Length() const override { return static_cast(reader_->size()); }
+
+    Status Close() override { return to_paimon_status(reader_->close()); }
+
+private:
+    doris::io::FileReaderSPtr reader_;
+    std::string path_;
+    int64_t position_ = 0;
+};
+
+class DorisOutputStream : public OutputStream { // paimon OutputStream that appends to a Doris FileWriter
+public:
+    DorisOutputStream(doris::io::FileWriterPtr writer, std::string path)
+            : writer_(std::move(writer)), path_(std::move(path)) {}
+
+    Result Write(const char* buffer, uint32_t size) override { // On success reports the full requested size as written
+        doris::Status status = writer_->append(doris::Slice(buffer, size));
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        return static_cast(size);
+    }
+
+    Status Flush() override { return Status::OK(); } // no-op: always returns OK
+
+    Result GetPos() const override {
+        return static_cast(writer_->bytes_appended());
+    }
+
+    Result GetUri() const override { return path_; }
+
+    Status Close() override { return to_paimon_status(writer_->close()); }
+
+private:
+    doris::io::FileWriterPtr writer_;
+    std::string path_;
+};
+
+class DorisBasicFileStatus : public BasicFileStatus { // Value holder for path and is-directory flag
+public:
+    DorisBasicFileStatus(std::string path, bool is_dir) : path_(std::move(path)), is_dir_(is_dir) {}
+
+    bool IsDir() const override { return is_dir_; }
+    std::string GetPath() const override { return path_; }
+
+private:
+    std::string path_;
+    bool is_dir_;
+};
+
+class DorisFileStatus : public FileStatus { // Value holder for path, is-directory flag, length and modification time
+public:
+    DorisFileStatus(std::string path, bool is_dir, uint64_t length, int64_t mtime)
+            : path_(std::move(path)), is_dir_(is_dir), length_(length), mtime_(mtime) {}
+
+    uint64_t GetLen() const override { return length_; }
+    bool IsDir() const override { return is_dir_; }
+    std::string GetPath() const override { return path_; }
+    int64_t GetModificationTime() const override { return mtime_; }
+
+private:
+    std::string path_;
+    bool is_dir_;
+    uint64_t length_;
+    int64_t mtime_;
+};
+
+class DorisFileSystem : public FileSystem { // paimon FileSystem that forwards each call to the (fs, path) pair from resolve_path()
+public:
+    explicit DorisFileSystem(std::map options) // Remembers "fs.defaultFS" from options when present
+            : options_(std::move(options)) {
+        auto it = options_.find("fs.defaultFS");
+        if (it != options_.end()) {
+            default_fs_name_ = it->second;
+        }
+    }
+
+    Result> Open(const std::string& path) const override { // Opens path with default reader options and wraps it in a DorisInputStream
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        auto& fs = resolved.first;
+        auto& normalized_path = resolved.second;
+        doris::io::FileReaderSPtr reader;
+        doris::io::FileReaderOptions reader_options = doris::io::FileReaderOptions::DEFAULT;
+        doris::Status status = fs->open_file(normalized_path, &reader, &reader_options);
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        return std::make_unique(std::move(reader), normalized_path);
+    }
+
+    Result> Create(const std::string& path, // Returns Exist when !overwrite and the file exists; creates the parent dir first
+                   bool overwrite) const override {
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        auto& fs = resolved.first;
+        auto& normalized_path = resolved.second;
+        if (!overwrite) {
+            bool exists = false;
+            doris::Status exists_status = fs->exists(normalized_path, &exists);
+            if (!exists_status.ok()) {
+                return to_paimon_status(exists_status);
+            }
+            if (exists) {
+                return Status::Exist("file already exists: ", normalized_path);
+            }
+        }
+        std::string parent = parent_path(normalized_path);
+        if (!parent.empty()) {
+            doris::Status mkdir_status = fs->create_directory(parent);
+            if (!mkdir_status.ok()) {
+                return to_paimon_status(mkdir_status);
+            }
+        }
+        doris::io::FileWriterPtr writer;
+        doris::Status status = fs->create_file(normalized_path, &writer);
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        return std::make_unique(std::move(writer), normalized_path);
+    }
+
+    Status Mkdirs(const std::string& path) const override {
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        doris::Status status = resolved.first->create_directory(resolved.second);
+        return to_paimon_status(status);
+    }
+
+    Status Rename(const std::string& src, const std::string& dst) const override { // The rename is issued on the file system resolved for src
+        PAIMON_ASSIGN_OR_RAISE(auto src_resolved, resolve_path(src));
+        PAIMON_ASSIGN_OR_RAISE(auto dst_resolved, resolve_path(dst));
+        doris::Status status = src_resolved.first->rename(src_resolved.second, dst_resolved.second);
+        return to_paimon_status(status);
+    }
+
+    Status Delete(const std::string& path, bool recursive = true) const override { // Missing path is OK; file if file_size succeeds, else directory when recursive
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        bool exists = false;
+        doris::Status exists_status = resolved.first->exists(resolved.second, &exists);
+        if (!exists_status.ok()) {
+            return to_paimon_status(exists_status);
+        }
+        if (!exists) {
+            return Status::OK();
+        }
+        int64_t size = 0;
+        doris::Status size_status = resolved.first->file_size(resolved.second, &size); // used as the file-vs-directory probe
+        if (size_status.ok()) {
+            return to_paimon_status(resolved.first->delete_file(resolved.second));
+        }
+        if (recursive) {
+            return to_paimon_status(resolved.first->delete_directory(resolved.second));
+        }
+        return to_paimon_status(size_status);
+    }
+
+    Result> GetFileStatus(const std::string& path) const override {
+        ParsedUri uri = parse_uri(path);
+        doris::TFileType::type type = map_scheme_to_file_type(uri.scheme);
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        bool exists = false;
+        doris::Status exists_status = resolved.first->exists(resolved.second, &exists);
+        if (!exists_status.ok()) {
+            return to_paimon_status(exists_status);
+        }
+        if (!exists) {
+            if (type != doris::TFileType::FILE_S3) {
+                return Status::NotExist("path not exists: ", resolved.second);
+            }
+            std::vector files;
+            bool list_exists = false;
+            doris::Status list_status =
+                    resolved.first->list(resolved.second, false, &files, &list_exists);
+            if (!list_status.ok()) {
+                return to_paimon_status(list_status);
+            }
+            if (!list_exists && files.empty()) {
+                return Status::NotExist("path not exists: ",
resolved.second); + } + return std::make_unique(resolved.second, true, 0, 0); + } + int64_t size = 0; + doris::Status size_status = resolved.first->file_size(resolved.second, &size); + if (size_status.ok()) { + return std::make_unique(resolved.second, false, + static_cast(size), 0); + } + std::vector files; + bool list_exists = false; + doris::Status list_status = + resolved.first->list(resolved.second, false, &files, &list_exists); + if (!list_status.ok()) { + return to_paimon_status(list_status); + } + if (!list_exists && files.empty()) { + return Status::NotExist("path not exists: ", resolved.second); + } + return std::make_unique(resolved.second, true, 0, 0); + } + + Status ListDir(const std::string& directory, + std::vector>* status_list) const override { + PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(directory)); + auto file_status = GetFileStatus(directory); + if (file_status.ok() && !file_status.value()->IsDir()) { + return Status::IOError("path is not a directory: ", directory); + } + std::vector files; + bool exists = false; + doris::Status status = resolved.first->list(resolved.second, false, &files, &exists); + if (!status.ok()) { + return to_paimon_status(status); + } + if (!exists) { + return Status::OK(); + } + status_list->reserve(status_list->size() + files.size()); + for (const auto& file : files) { + status_list->emplace_back(std::make_unique( + join_path(resolved.second, file.file_name), !file.is_file)); + } + return Status::OK(); + } + + Status ListFileStatus(const std::string& path, + std::vector>* status_list) const override { + PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path)); + auto self_status = GetFileStatus(path); + if (!self_status.ok()) { + if (self_status.status().IsNotExist()) { + return Status::OK(); + } + return self_status.status(); + } + if (!self_status.value()->IsDir()) { + status_list->emplace_back(std::move(self_status).value()); + return Status::OK(); + } + std::vector files; + bool exists = false; + 
doris::Status list_status = resolved.first->list(resolved.second, false, &files, &exists); + if (!list_status.ok()) { + return to_paimon_status(list_status); + } + if (!exists) { + return Status::OK(); + } + status_list->reserve(status_list->size() + files.size()); + for (const auto& file : files) { + uint64_t length = file.is_file ? static_cast(file.file_size) : 0; + status_list->emplace_back(std::make_unique( + join_path(resolved.second, file.file_name), !file.is_file, length, 0)); + } + return Status::OK(); + } + + Result Exists(const std::string& path) const override { + ParsedUri uri = parse_uri(path); + doris::TFileType::type type = map_scheme_to_file_type(uri.scheme); + PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path)); + bool exists = false; + doris::Status status = resolved.first->exists(resolved.second, &exists); + if (!status.ok()) { + return to_paimon_status(status); + } + if (!exists && type == doris::TFileType::FILE_S3) { + std::vector files; + bool list_exists = false; + doris::Status list_status = + resolved.first->list(resolved.second, false, &files, &list_exists); + if (!list_status.ok()) { + return to_paimon_status(list_status); + } + return list_exists || !files.empty(); + } + return exists; + } + +private: + Result> resolve_path( + const std::string& path) const { + auto uri = parse_uri(path); + doris::TFileType::type type = map_scheme_to_file_type(uri.scheme); + std::string normalized_path = normalize_path_for_type(path, uri.scheme, type); + if (type == doris::TFileType::FILE_LOCAL) { + doris::io::FileSystemSPtr fs = doris::io::global_local_filesystem(); + return std::make_pair(std::move(fs), normalized_path); + } + std::string fs_key = build_fs_cache_key(type, uri, default_fs_name_); + { + std::lock_guard lock(fs_lock_); + auto it = fs_cache_.find(fs_key); + if (it != fs_cache_.end()) { + return std::make_pair(it->second, normalized_path); + } + } + doris::io::FSPropertiesRef fs_properties(type); + const std::map* properties = 
&options_; + std::map properties_override; + if (type == doris::TFileType::FILE_HTTP && !options_.contains("uri") && + !uri.scheme.empty()) { + properties_override = options_; + properties_override["uri"] = uri.scheme + "://" + uri.authority; + properties = &properties_override; + } + fs_properties.properties = properties; + if (!broker_addresses_.empty()) { + fs_properties.broker_addresses = &broker_addresses_; + } + doris::io::FileDescription file_description = { + .path = normalized_path, .file_size = -1, .mtime = 0, .fs_name = default_fs_name_}; + auto fs_result = doris::FileFactory::create_fs(fs_properties, file_description); + if (!fs_result.has_value()) { + return to_paimon_status(fs_result.error()); + } + doris::io::FileSystemSPtr fs = std::move(fs_result).value(); + { + std::lock_guard lock(fs_lock_); + fs_cache_.emplace(std::move(fs_key), fs); + } + return std::make_pair(std::move(fs), std::move(normalized_path)); + } + + std::map options_; + std::vector broker_addresses_; + std::string default_fs_name_; + mutable std::mutex fs_lock_; + mutable std::unordered_map fs_cache_; +}; + +class DorisFileSystemFactory : public FileSystemFactory { +public: + static const char IDENTIFIER[]; + + const char* Identifier() const override { return IDENTIFIER; } + + Result> Create( + const std::string& path, + const std::map& options) const override { + return std::make_unique(options); + } +}; + +const char DorisFileSystemFactory::IDENTIFIER[] = "doris"; + +REGISTER_PAIMON_FACTORY(DorisFileSystemFactory); + +} // namespace paimon + +namespace doris::vectorized { + +void register_paimon_doris_file_system() {} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/paimon_doris_file_system.h b/be/src/vec/exec/format/table/paimon_doris_file_system.h new file mode 100644 index 00000000000000..22552c6eb6ff02 --- /dev/null +++ b/be/src/vec/exec/format/table/paimon_doris_file_system.h @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) 
under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +namespace doris::vectorized { + +// Force-link helper so the paimon-cpp file system factory registration is kept. +void register_paimon_doris_file_system(); + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/format/table/paimon_predicate_converter.cpp b/be/src/vec/exec/format/table/paimon_predicate_converter.cpp new file mode 100644 index 00000000000000..6c8251ddb431cc --- /dev/null +++ b/be/src/vec/exec/format/table/paimon_predicate_converter.cpp @@ -0,0 +1,659 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/table/paimon_predicate_converter.h" + +#include +#include +#include + +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/predicate/predicate_builder.h" +#include "runtime/decimalv2_value.h" +#include "runtime/descriptors.h" +#include "runtime/runtime_state.h" +#include "util/timezone_utils.h" +#include "vec/columns/column_const.h" +#include "vec/columns/column_nullable.h" +#include "vec/core/field.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/exprs/vcompound_pred.h" +#include "vec/exprs/vdirect_in_predicate.h" +#include "vec/exprs/vectorized_fn_call.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vin_predicate.h" +#include "vec/exprs/vliteral.h" +#include "vec/exprs/vslot_ref.h" +#include "vec/runtime/timestamptz_value.h" +#include "vec/runtime/vdatetime_value.h" + +namespace doris::vectorized { +#include "common/compile_check_begin.h" + +PaimonPredicateConverter::PaimonPredicateConverter( + const std::vector& file_slot_descs, RuntimeState* state) + : _state(state) { + _field_index_by_name.reserve(file_slot_descs.size()); + for (size_t i = 0; i < file_slot_descs.size(); ++i) { + const auto& name = file_slot_descs[i]->col_name(); + auto normalized = _normalize_name(name); + if (_field_index_by_name.find(normalized) == _field_index_by_name.end()) { + _field_index_by_name.emplace(std::move(normalized), static_cast(i)); + } + } + + if (!TimezoneUtils::find_cctz_time_zone("GMT", _gmt_tz)) { + TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _gmt_tz); + } +} + +std::shared_ptr PaimonPredicateConverter::build( + const VExprContextSPtrs& conjuncts) { + std::vector> predicates; + predicates.reserve(conjuncts.size()); + for (const auto& conjunct : conjuncts) { + if (!conjunct || 
!conjunct->root()) { + continue; + } + auto root = conjunct->root(); + if (root->is_rf_wrapper()) { + if (auto impl = root->get_impl()) { + root = impl; + } + } + auto predicate = _convert_expr(root); + if (predicate) { + predicates.emplace_back(std::move(predicate)); + } + } + + if (predicates.empty()) { + return nullptr; + } + if (predicates.size() == 1) { + return predicates.front(); + } + auto and_result = paimon::PredicateBuilder::And(predicates); + if (!and_result.ok()) { + return nullptr; + } + return std::move(and_result).value(); +} + +std::shared_ptr PaimonPredicateConverter::_convert_expr(const VExprSPtr& expr) { + if (!expr) { + return nullptr; + } + + auto uncast = VExpr::expr_without_cast(expr); + + if (auto* direct_in = dynamic_cast(uncast.get())) { + VExprSPtr in_expr; + if (direct_in->get_slot_in_expr(in_expr)) { + return _convert_in(in_expr); + } + return nullptr; + } + + if (dynamic_cast(uncast.get()) != nullptr) { + return _convert_in(uncast); + } + + switch (uncast->op()) { + case TExprOpcode::COMPOUND_AND: + case TExprOpcode::COMPOUND_OR: + return _convert_compound(uncast); + case TExprOpcode::COMPOUND_NOT: + return nullptr; + case TExprOpcode::EQ: + case TExprOpcode::EQ_FOR_NULL: + case TExprOpcode::NE: + case TExprOpcode::GE: + case TExprOpcode::GT: + case TExprOpcode::LE: + case TExprOpcode::LT: + return _convert_binary(uncast); + default: + break; + } + + if (auto* fn = dynamic_cast(uncast.get())) { + auto fn_name = _normalize_name(fn->function_name()); + if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") { + return _convert_is_null(uncast, fn_name); + } + if (fn_name == "like") { + return _convert_like_prefix(uncast); + } + } + + return nullptr; +} + +std::shared_ptr PaimonPredicateConverter::_convert_compound( + const VExprSPtr& expr) { + if (!expr || expr->get_num_children() != 2) { + return nullptr; + } + auto left = _convert_expr(expr->get_child(0)); + if (!left) { + return nullptr; + } + auto right = 
_convert_expr(expr->get_child(1)); + if (!right) { + return nullptr; + } + + if (expr->op() == TExprOpcode::COMPOUND_AND) { + auto and_result = paimon::PredicateBuilder::And({left, right}); + return and_result.ok() ? std::move(and_result).value() : nullptr; + } + if (expr->op() == TExprOpcode::COMPOUND_OR) { + auto or_result = paimon::PredicateBuilder::Or({left, right}); + return or_result.ok() ? std::move(or_result).value() : nullptr; + } + return nullptr; +} + +std::shared_ptr PaimonPredicateConverter::_convert_in(const VExprSPtr& expr) { + auto* in_pred = dynamic_cast(expr.get()); + if (!in_pred || expr->get_num_children() < 2) { + return nullptr; + } + auto field_meta = _resolve_field(expr->get_child(0)); + if (!field_meta) { + return nullptr; + } + + std::vector literals; + literals.reserve(expr->get_num_children() - 1); + for (uint16_t i = 1; i < expr->get_num_children(); ++i) { + auto literal = _convert_literal(expr->get_child(i), *field_meta->slot_desc, + field_meta->field_type); + if (!literal) { + return nullptr; + } + literals.emplace_back(std::move(*literal)); + } + + if (literals.empty()) { + return nullptr; + } + if (in_pred->is_not_in()) { + return paimon::PredicateBuilder::NotIn(field_meta->index, field_meta->slot_desc->col_name(), + field_meta->field_type, literals); + } + return paimon::PredicateBuilder::In(field_meta->index, field_meta->slot_desc->col_name(), + field_meta->field_type, literals); +} + +std::shared_ptr PaimonPredicateConverter::_convert_binary( + const VExprSPtr& expr) { + if (!expr || expr->get_num_children() != 2) { + return nullptr; + } + auto field_meta = _resolve_field(expr->get_child(0)); + if (!field_meta) { + return nullptr; + } + + if (expr->op() == TExprOpcode::EQ_FOR_NULL) { + return paimon::PredicateBuilder::IsNull( + field_meta->index, field_meta->slot_desc->col_name(), field_meta->field_type); + } + + auto literal = + _convert_literal(expr->get_child(1), *field_meta->slot_desc, field_meta->field_type); + if (!literal) 
{ + return nullptr; + } + + switch (expr->op()) { + case TExprOpcode::EQ: + return paimon::PredicateBuilder::Equal(field_meta->index, field_meta->slot_desc->col_name(), + field_meta->field_type, *literal); + case TExprOpcode::NE: + return paimon::PredicateBuilder::NotEqual(field_meta->index, + field_meta->slot_desc->col_name(), + field_meta->field_type, *literal); + case TExprOpcode::GE: + return paimon::PredicateBuilder::GreaterOrEqual(field_meta->index, + field_meta->slot_desc->col_name(), + field_meta->field_type, *literal); + case TExprOpcode::GT: + return paimon::PredicateBuilder::GreaterThan(field_meta->index, + field_meta->slot_desc->col_name(), + field_meta->field_type, *literal); + case TExprOpcode::LE: + return paimon::PredicateBuilder::LessOrEqual(field_meta->index, + field_meta->slot_desc->col_name(), + field_meta->field_type, *literal); + case TExprOpcode::LT: + return paimon::PredicateBuilder::LessThan(field_meta->index, + field_meta->slot_desc->col_name(), + field_meta->field_type, *literal); + default: + break; + } + return nullptr; +} + +std::shared_ptr PaimonPredicateConverter::_convert_is_null( + const VExprSPtr& expr, const std::string& fn_name) { + if (!expr || expr->get_num_children() != 1) { + return nullptr; + } + auto field_meta = _resolve_field(expr->get_child(0)); + if (!field_meta) { + return nullptr; + } + if (fn_name == "is_not_null_pred") { + return paimon::PredicateBuilder::IsNotNull( + field_meta->index, field_meta->slot_desc->col_name(), field_meta->field_type); + } + return paimon::PredicateBuilder::IsNull(field_meta->index, field_meta->slot_desc->col_name(), + field_meta->field_type); +} + +std::shared_ptr PaimonPredicateConverter::_convert_like_prefix( + const VExprSPtr& expr) { + if (!expr || expr->get_num_children() != 2) { + return nullptr; + } + auto field_meta = _resolve_field(expr->get_child(0)); + if (!field_meta || field_meta->field_type != paimon::FieldType::STRING) { + return nullptr; + } + + auto pattern_opt = 
_extract_string_literal(expr->get_child(1)); + if (!pattern_opt) { + return nullptr; + } + const std::string& pattern = *pattern_opt; + if (!pattern.empty() && pattern.front() == '%') { + return nullptr; + } + if (pattern.empty() || pattern.back() != '%') { + return nullptr; + } + + std::string prefix = pattern.substr(0, pattern.size() - 1); + paimon::Literal lower_literal(paimon::FieldType::STRING, prefix.data(), prefix.size()); + auto lower_pred = paimon::PredicateBuilder::GreaterOrEqual( + field_meta->index, field_meta->slot_desc->col_name(), field_meta->field_type, + lower_literal); + + auto upper_prefix = _next_prefix(prefix); + if (!upper_prefix) { + return lower_pred; + } + + paimon::Literal upper_literal(paimon::FieldType::STRING, upper_prefix->data(), + upper_prefix->size()); + auto upper_pred = + paimon::PredicateBuilder::LessThan(field_meta->index, field_meta->slot_desc->col_name(), + field_meta->field_type, upper_literal); + auto and_result = paimon::PredicateBuilder::And({lower_pred, upper_pred}); + return and_result.ok() ? 
std::move(and_result).value() : nullptr; +} + +std::optional PaimonPredicateConverter::_resolve_field( + const VExprSPtr& expr) const { + if (!_state || !expr) { + return std::nullopt; + } + auto slot_expr = VExpr::expr_without_cast(expr); + auto* slot_ref = dynamic_cast(slot_expr.get()); + if (!slot_ref) { + return std::nullopt; + } + auto* slot_desc = _state->desc_tbl().get_slot_descriptor(slot_ref->slot_id()); + if (!slot_desc) { + return std::nullopt; + } + auto normalized = _normalize_name(slot_desc->col_name()); + auto it = _field_index_by_name.find(normalized); + if (it == _field_index_by_name.end()) { + return std::nullopt; + } + auto slot_type = slot_desc->type(); + auto field_type = + _to_paimon_field_type(slot_type->get_primitive_type(), slot_type->get_precision()); + if (!field_type) { + return std::nullopt; + } + return FieldMeta {it->second, *field_type, slot_desc}; +} + +std::optional PaimonPredicateConverter::_convert_literal( + const VExprSPtr& expr, const SlotDescriptor& slot_desc, + paimon::FieldType field_type) const { + auto literal_expr = VExpr::expr_without_cast(expr); + auto* literal = dynamic_cast(literal_expr.get()); + if (!literal) { + return std::nullopt; + } + + auto literal_type = remove_nullable(literal->get_data_type()); + PrimitiveType literal_primitive = literal_type->get_primitive_type(); + PrimitiveType slot_primitive = slot_desc.type()->get_primitive_type(); + + ColumnPtr col = literal->get_column_ptr()->convert_to_full_column_if_const(); + if (const auto* nullable = check_and_get_column(*col)) { + if (nullable->is_null_at(0)) { + return std::nullopt; + } + col = nullable->get_nested_column_ptr(); + } + + Field field; + col->get(0, field); + + switch (slot_primitive) { + case TYPE_BOOLEAN: { + if (literal_primitive != TYPE_BOOLEAN) { + return std::nullopt; + } + return paimon::Literal(static_cast(field.get())); + } + case TYPE_TINYINT: + case TYPE_SMALLINT: + case TYPE_INT: + case TYPE_BIGINT: { + if 
(!_is_integer_type(literal_primitive)) { + return std::nullopt; + } + int64_t value = 0; + switch (literal_primitive) { + case TYPE_TINYINT: + value = field.get(); + break; + case TYPE_SMALLINT: + value = field.get(); + break; + case TYPE_INT: + value = field.get(); + break; + case TYPE_BIGINT: + value = field.get(); + break; + default: + return std::nullopt; + } + if (slot_primitive == TYPE_TINYINT) { + return paimon::Literal(static_cast(value)); + } + if (slot_primitive == TYPE_SMALLINT) { + return paimon::Literal(static_cast(value)); + } + if (slot_primitive == TYPE_INT) { + return paimon::Literal(static_cast(value)); + } + return paimon::Literal(static_cast(value)); + } + case TYPE_DOUBLE: { + if (literal_primitive != TYPE_DOUBLE && literal_primitive != TYPE_FLOAT) { + return std::nullopt; + } + double value = 0; + if (literal_primitive == TYPE_FLOAT) { + value = static_cast(field.get()); + } else { + value = field.get(); + } + return paimon::Literal(value); + } + case TYPE_DATE: + case TYPE_DATEV2: { + if (!_is_date_type(literal_primitive)) { + return std::nullopt; + } + int64_t seconds = 0; + if (literal_primitive == TYPE_DATE) { + const auto& dt = field.get(); + if (!dt.is_valid_date()) { + return std::nullopt; + } + dt.unix_timestamp(&seconds, _gmt_tz); + } else if (literal_primitive == TYPE_DATEV2) { + const auto& dt = field.get(); + if (!dt.is_valid_date()) { + return std::nullopt; + } + dt.unix_timestamp(&seconds, _gmt_tz); + } + int32_t days = _seconds_to_days(seconds); + return paimon::Literal(paimon::FieldType::DATE, days); + } + case TYPE_DATETIME: + case TYPE_DATETIMEV2: { + if (!_is_datetime_type(literal_primitive)) { + return std::nullopt; + } + if (literal_primitive == TYPE_DATETIME) { + const auto& dt = field.get(); + if (!dt.is_valid_date()) { + return std::nullopt; + } + int64_t seconds = 0; + dt.unix_timestamp(&seconds, _gmt_tz); + return paimon::Literal(paimon::Timestamp::FromEpochMillis(seconds * 1000)); + } + std::pair ts; + const auto& dt 
= field.get(); + if (!dt.is_valid_date()) { + return std::nullopt; + } + dt.unix_timestamp(&ts, _gmt_tz); + int64_t millis = ts.first * 1000 + ts.second / 1000; + return paimon::Literal(paimon::Timestamp::FromEpochMillis(millis)); + } + case TYPE_VARCHAR: + case TYPE_STRING: { + if (!_is_string_type(literal_primitive)) { + return std::nullopt; + } + const auto& value = field.get(); + return paimon::Literal(field_type, value.data(), value.size()); + } + case TYPE_DECIMALV2: + case TYPE_DECIMAL32: + case TYPE_DECIMAL64: + case TYPE_DECIMAL128I: + case TYPE_DECIMAL256: { + if (!_is_decimal_type(literal_primitive)) { + return std::nullopt; + } + int32_t precision = static_cast(literal_type->get_precision()); + int32_t scale = static_cast(literal_type->get_scale()); + if (precision <= 0 || precision > paimon::Decimal::MAX_PRECISION) { + return std::nullopt; + } + + paimon::Decimal::int128_t value = 0; + switch (literal_primitive) { + case TYPE_DECIMALV2: { + const auto& dec = field.get(); + value = dec.value(); + break; + } + case TYPE_DECIMAL32: { + const auto& dec = field.get(); + value = dec.value; + break; + } + case TYPE_DECIMAL64: { + const auto& dec = field.get(); + value = dec.value; + break; + } + case TYPE_DECIMAL128I: { + const auto& dec = field.get(); + value = dec.value; + break; + } + default: + return std::nullopt; + } + return paimon::Literal(paimon::Decimal(precision, scale, value)); + } + default: + break; + } + return std::nullopt; +} + +std::optional PaimonPredicateConverter::_extract_string_literal( + const VExprSPtr& expr) const { + auto literal_expr = VExpr::expr_without_cast(expr); + auto* literal = dynamic_cast(literal_expr.get()); + if (!literal) { + return std::nullopt; + } + auto literal_type = remove_nullable(literal->get_data_type()); + PrimitiveType literal_primitive = literal_type->get_primitive_type(); + if (!_is_string_type(literal_primitive)) { + return std::nullopt; + } + + ColumnPtr col = 
literal->get_column_ptr()->convert_to_full_column_if_const(); + if (const auto* nullable = check_and_get_column(*col)) { + if (nullable->is_null_at(0)) { + return std::nullopt; + } + col = nullable->get_nested_column_ptr(); + } + Field field; + col->get(0, field); + const auto& value = field.get(); + return value; +} + +std::string PaimonPredicateConverter::_normalize_name(std::string_view name) { + std::string out(name); + std::transform(out.begin(), out.end(), out.begin(), + [](unsigned char c) { return static_cast(std::tolower(c)); }); + return out; +} + +std::optional PaimonPredicateConverter::_next_prefix(const std::string& prefix) { + if (prefix.empty()) { + return std::nullopt; + } + std::string upper = prefix; + for (int i = static_cast(upper.size()) - 1; i >= 0; --i) { + auto c = static_cast(upper[i]); + if (c != 0xFF) { + upper[i] = static_cast(c + 1); + upper.resize(i + 1); + return upper; + } + } + return std::nullopt; +} + +int32_t PaimonPredicateConverter::_seconds_to_days(int64_t seconds) { + static constexpr int64_t kSecondsPerDay = 24 * 60 * 60; + int64_t days = seconds / kSecondsPerDay; + if (seconds < 0 && seconds % kSecondsPerDay != 0) { + --days; + } + return static_cast(days); +} + +bool PaimonPredicateConverter::_is_integer_type(PrimitiveType type) { + switch (type) { + case TYPE_TINYINT: + case TYPE_SMALLINT: + case TYPE_INT: + case TYPE_BIGINT: + return true; + default: + return false; + } +} + +bool PaimonPredicateConverter::_is_string_type(PrimitiveType type) { + return type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_STRING; +} + +bool PaimonPredicateConverter::_is_decimal_type(PrimitiveType type) { + switch (type) { + case TYPE_DECIMALV2: + case TYPE_DECIMAL32: + case TYPE_DECIMAL64: + case TYPE_DECIMAL128I: + case TYPE_DECIMAL256: + return true; + default: + return false; + } +} + +bool PaimonPredicateConverter::_is_date_type(PrimitiveType type) { + return type == TYPE_DATE || type == TYPE_DATEV2; +} + +bool 
PaimonPredicateConverter::_is_datetime_type(PrimitiveType type) { + return type == TYPE_DATETIME || type == TYPE_DATETIMEV2; +} + +std::optional PaimonPredicateConverter::_to_paimon_field_type( + PrimitiveType type, uint32_t precision) { + switch (type) { + case TYPE_BOOLEAN: + return paimon::FieldType::BOOLEAN; + case TYPE_TINYINT: + return paimon::FieldType::TINYINT; + case TYPE_SMALLINT: + return paimon::FieldType::SMALLINT; + case TYPE_INT: + return paimon::FieldType::INT; + case TYPE_BIGINT: + return paimon::FieldType::BIGINT; + case TYPE_DOUBLE: + return paimon::FieldType::DOUBLE; + case TYPE_VARCHAR: + case TYPE_STRING: + return paimon::FieldType::STRING; + case TYPE_DATE: + case TYPE_DATEV2: + return paimon::FieldType::DATE; + case TYPE_DATETIME: + case TYPE_DATETIMEV2: + return paimon::FieldType::TIMESTAMP; + case TYPE_DECIMALV2: + case TYPE_DECIMAL32: + case TYPE_DECIMAL64: + case TYPE_DECIMAL128I: + case TYPE_DECIMAL256: + if (precision > 0 && precision > paimon::Decimal::MAX_PRECISION) { + return std::nullopt; + } + return paimon::FieldType::DECIMAL; + case TYPE_FLOAT: + case TYPE_CHAR: + default: + return std::nullopt; + } +} + +#include "common/compile_check_end.h" +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/paimon_predicate_converter.h b/be/src/vec/exec/format/table/paimon_predicate_converter.h new file mode 100644 index 00000000000000..a844b497d52179 --- /dev/null +++ b/be/src/vec/exec/format/table/paimon_predicate_converter.h @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "cctz/time_zone.h" +#include "paimon/defs.h" +#include "paimon/predicate/literal.h" +#include "runtime/define_primitive_type.h" +#include "vec/exprs/vexpr_fwd.h" + +namespace paimon { +class Predicate; +} // namespace paimon + +namespace doris { +class RuntimeState; +class SlotDescriptor; +} // namespace doris + +namespace doris::vectorized { +#include "common/compile_check_begin.h" + +class PaimonPredicateConverter { +public: + PaimonPredicateConverter(const std::vector& file_slot_descs, + RuntimeState* state); + + std::shared_ptr build(const VExprContextSPtrs& conjuncts); + +private: + struct FieldMeta { + int32_t index = -1; + paimon::FieldType field_type = paimon::FieldType::UNKNOWN; + const SlotDescriptor* slot_desc = nullptr; + }; + + std::shared_ptr _convert_expr(const VExprSPtr& expr); + std::shared_ptr _convert_compound(const VExprSPtr& expr); + std::shared_ptr _convert_in(const VExprSPtr& expr); + std::shared_ptr _convert_binary(const VExprSPtr& expr); + std::shared_ptr _convert_is_null(const VExprSPtr& expr, + const std::string& fn_name); + std::shared_ptr _convert_like_prefix(const VExprSPtr& expr); + + std::optional _resolve_field(const VExprSPtr& expr) const; + std::optional _convert_literal(const VExprSPtr& expr, + const SlotDescriptor& slot_desc, + paimon::FieldType field_type) const; + std::optional _extract_string_literal(const VExprSPtr& expr) const; + + static std::string _normalize_name(std::string_view name); + static std::optional 
_next_prefix(const std::string& prefix); + static int32_t _seconds_to_days(int64_t seconds); + static bool _is_integer_type(PrimitiveType type); + static bool _is_string_type(PrimitiveType type); + static bool _is_decimal_type(PrimitiveType type); + static bool _is_date_type(PrimitiveType type); + static bool _is_datetime_type(PrimitiveType type); + static std::optional _to_paimon_field_type(PrimitiveType type, + uint32_t precision); + + std::unordered_map _field_index_by_name; + RuntimeState* _state = nullptr; + cctz::time_zone _gmt_tz; +}; + +#include "common/compile_check_end.h" +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp index 1b90c23b0b79bc..59ef0c80573d98 100644 --- a/be/src/vec/exec/scan/file_scanner.cpp +++ b/be/src/vec/exec/scan/file_scanner.cpp @@ -69,7 +69,9 @@ #include "vec/exec/format/table/iceberg_reader.h" #include "vec/exec/format/table/lakesoul_jni_reader.h" #include "vec/exec/format/table/max_compute_jni_reader.h" +#include "vec/exec/format/table/paimon_cpp_reader.h" #include "vec/exec/format/table/paimon_jni_reader.h" +#include "vec/exec/format/table/paimon_predicate_converter.h" #include "vec/exec/format/table/paimon_reader.h" #include "vec/exec/format/table/remote_doris_reader.h" #include "vec/exec/format/table/transactional_hive_reader.h" @@ -995,9 +997,25 @@ Status FileScanner::_get_next_reader() { _cur_reader = std::move(mc_reader); } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "paimon") { - _cur_reader = PaimonJniReader::create_unique(_file_slot_descs, _state, _profile, - range, _params); - init_status = ((PaimonJniReader*)(_cur_reader.get()))->init_reader(); + if (_state->query_options().__isset.enable_paimon_cpp_reader && + _state->query_options().enable_paimon_cpp_reader) { + auto cpp_reader = PaimonCppReader::create_unique(_file_slot_descs, _state, + _profile, range, _params); + 
cpp_reader->set_push_down_agg_type(_get_push_down_agg_type()); + if (!_is_load && !_push_down_conjuncts.empty()) { + PaimonPredicateConverter predicate_converter(_file_slot_descs, _state); + auto predicate = predicate_converter.build(_push_down_conjuncts); + if (predicate) { + cpp_reader->set_predicate(std::move(predicate)); + } + } + init_status = cpp_reader->init_reader(); + _cur_reader = std::move(cpp_reader); + } else { + _cur_reader = PaimonJniReader::create_unique(_file_slot_descs, _state, _profile, + range, _params); + init_status = ((PaimonJniReader*)(_cur_reader.get()))->init_reader(); + } } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "hudi") { _cur_reader = HudiJniReader::create_unique(*_params, @@ -1018,8 +1036,9 @@ Status FileScanner::_get_next_reader() { } // Set col_name_to_block_idx for JNI readers to avoid repeated map creation if (_cur_reader) { - static_cast(_cur_reader.get()) - ->set_col_name_to_block_idx(&_src_block_name_to_idx); + if (auto* jni_reader = dynamic_cast(_cur_reader.get())) { + jni_reader->set_col_name_to_block_idx(&_src_block_name_to_idx); + } } break; } diff --git a/be/test/vec/exec/format/table/paimon_cpp_reader_test.cpp b/be/test/vec/exec/format/table/paimon_cpp_reader_test.cpp new file mode 100644 index 00000000000000..95fb22d9b65d77 --- /dev/null +++ b/be/test/vec/exec/format/table/paimon_cpp_reader_test.cpp @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/table/paimon_cpp_reader.h" + +#include + +#include +#include + +#include "runtime/runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" + +namespace doris::vectorized { + +class PaimonCppReaderTest : public testing::Test { +protected: + void SetUp() override { + _query_options.__set_batch_size(3); + _runtime_state = std::make_unique(_query_options, _query_globals); + } + + TFileRangeDesc _build_range_with_table_level_row_count(int64_t row_count) { + TFileRangeDesc range; + range.__isset.table_format_params = true; + range.table_format_params.__isset.table_level_row_count = true; + range.table_format_params.table_level_row_count = row_count; + return range; + } + + TQueryOptions _query_options; + TQueryGlobals _query_globals; + std::unique_ptr _runtime_state; + RuntimeProfile _profile {"paimon_cpp_reader_test"}; + std::vector _file_slot_descs; +}; + +TEST_F(PaimonCppReaderTest, CountPushDownUsesTableLevelRowCount) { + auto range = _build_range_with_table_level_row_count(5); + PaimonCppReader reader(_file_slot_descs, _runtime_state.get(), &_profile, range, nullptr); + reader.set_push_down_agg_type(TPushAggOp::type::COUNT); + + auto init_status = reader.init_reader(); + ASSERT_TRUE(init_status.ok()) << init_status; + + Block block; + size_t read_rows = 0; + bool eof = false; + + auto first_status = reader.get_next_block(&block, &read_rows, &eof); + ASSERT_TRUE(first_status.ok()) << first_status; + EXPECT_EQ(3, read_rows); + EXPECT_FALSE(eof); + + auto second_status = reader.get_next_block(&block, 
&read_rows, &eof); + ASSERT_TRUE(second_status.ok()) << second_status; + EXPECT_EQ(2, read_rows); + EXPECT_TRUE(eof); + + auto third_status = reader.get_next_block(&block, &read_rows, &eof); + ASSERT_TRUE(third_status.ok()) << third_status; + EXPECT_EQ(0, read_rows); + EXPECT_TRUE(eof); +} + +TEST_F(PaimonCppReaderTest, InitReaderFailsWithoutPaimonSplit) { + TFileRangeDesc range; + range.__isset.table_format_params = true; + range.table_format_params.__isset.paimon_params = true; + range.table_format_params.paimon_params.__isset.paimon_table = true; + range.table_format_params.paimon_params.paimon_table = "s3://bucket/db.tbl"; + + PaimonCppReader reader(_file_slot_descs, _runtime_state.get(), &_profile, range, nullptr); + auto status = reader.init_reader(); + + ASSERT_FALSE(status.ok()); + EXPECT_NE(status.to_string().find("missing paimon_split"), std::string::npos); +} + +} // namespace doris::vectorized diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonUtils.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonUtils.java index 44ffb298c98c33..d1d6c2f9bdb579 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonUtils.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonUtils.java @@ -26,7 +26,8 @@ import java.util.stream.Collectors; public class PaimonUtils { - private static final Base64.Decoder DECODER = Base64.getUrlDecoder(); + private static final Base64.Decoder URL_DECODER = Base64.getUrlDecoder(); + private static final Base64.Decoder STD_DECODER = Base64.getDecoder(); public static List getFieldNames(RowType rowType) { return rowType.getFields().stream() @@ -37,9 +38,14 @@ public static List getFieldNames(RowType rowType) { public static T deserialize(String encodedStr) { try { - return InstantiationUtil.deserializeObject( - DECODER.decode(encodedStr.getBytes(java.nio.charset.StandardCharsets.UTF_8)), 
- PaimonUtils.class.getClassLoader()); + byte[] decoded; + try { + decoded = URL_DECODER.decode(encodedStr.getBytes(java.nio.charset.StandardCharsets.UTF_8)); + } catch (IllegalArgumentException e) { + // Fallback to standard Base64 for splits encoded by native Paimon serialization. + decoded = STD_DECODER.decode(encodedStr.getBytes(java.nio.charset.StandardCharsets.UTF_8)); + } + return InstantiationUtil.deserializeObject(decoded, PaimonUtils.class.getClassLoader()); } catch (Throwable e) { throw new RuntimeException(e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index 08358a3da99c65..5a8910fa00c363 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -50,6 +50,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.io.DataOutputViewStreamWrapper; import org.apache.paimon.options.ConfigOption; import org.apache.paimon.partition.Partition; import org.apache.paimon.predicate.Predicate; @@ -58,6 +59,7 @@ import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.tag.Tag; import org.apache.paimon.types.ArrayType; @@ -76,6 +78,7 @@ import org.apache.paimon.utils.Projection; import org.apache.paimon.utils.RowDataToObjectArrayConverter; +import java.io.ByteArrayOutputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.time.DateTimeException; @@ -451,6 +454,23 @@ public static String encodeObjectToString(T t) { } } + /** + * Serialize DataSplit using Paimon's native binary format. 
+ * This format is compatible with paimon-cpp reader. + * Uses standard Base64 encoding (not URL-safe) for BE compatibility. + */ + public static String encodeDataSplitToString(DataSplit split) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); + split.serialize(out); + byte[] bytes = baos.toByteArray(); + return Base64.getEncoder().encodeToString(bytes); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize DataSplit using Paimon native format", e); + } + } + public static Map getPartitionInfoMap(Table table, BinaryRow partitionValues, String timeZone) { Map partitionInfoMap = new HashMap<>(); List partitionKeys = table.partitionKeys(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 1783477b7ade34..8402f0a757e5ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -221,9 +221,19 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) String fileFormat = getFileFormat(paimonSplit.getPathString()); if (split != null) { - // use jni reader + // use jni reader or paimon-cpp reader rangeDesc.setFormatType(TFileFormatType.FORMAT_JNI); - fileDesc.setPaimonSplit(PaimonUtil.encodeObjectToString(split)); + // Use Paimon native serialization for paimon-cpp reader + if (sessionVariable.isEnablePaimonCppReader() && split instanceof DataSplit) { + fileDesc.setPaimonSplit(PaimonUtil.encodeDataSplitToString((DataSplit) split)); + } else { + fileDesc.setPaimonSplit(PaimonUtil.encodeObjectToString(split)); + } + // Set table location for paimon-cpp reader + String tableLocation = source.getTableLocation(); + if (tableLocation != null) { + 
fileDesc.setPaimonTable(tableLocation); + } rangeDesc.setSelfSplitWeight(paimonSplit.getSelfSplitWeight()); } else { // use native reader diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index 69ab8a7fbc2677..1f654394f9149c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -26,6 +26,7 @@ import org.apache.doris.thrift.TFileAttributes; import com.google.common.annotations.VisibleForTesting; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; @@ -70,4 +71,12 @@ public ExternalCatalog getCatalog() { public String getFileFormatFromTableProperties() { return originTable.options().getOrDefault("file.format", "parquet"); } + + public String getTableLocation() { + if (originTable instanceof FileStoreTable) { + return ((FileStoreTable) originTable).location().toString(); + } + // Fallback to path option + return originTable.options().get("path"); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 779991183542fa..17be8472bf0d58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -734,6 +734,8 @@ public class SessionVariable implements Serializable, Writable { public static final String FORCE_JNI_SCANNER = "force_jni_scanner"; + public static final String ENABLE_PAIMON_CPP_READER = "enable_paimon_cpp_reader"; + public static final String ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE = "enable_count_push_down_for_external_table"; public static final String FETCH_ALL_FE_FOR_SYSTEM_TABLE = "fetch_all_fe_for_system_table"; @@ -1289,13 +1291,14 @@ public void 
checkQuerySlotCount(String slotCnt) { public enum IgnoreSplitType { NONE, IGNORE_JNI, - IGNORE_NATIVE + IGNORE_NATIVE, + IGNORE_PAIMON_CPP } public static final String IGNORE_SPLIT_TYPE = "ignore_split_type"; @VariableMgr.VarAttr(name = IGNORE_SPLIT_TYPE, checker = "checkIgnoreSplitType", - options = {"NONE", "IGNORE_JNI", "IGNORE_NATIVE"}, + options = {"NONE", "IGNORE_JNI", "IGNORE_NATIVE", "IGNORE_PAIMON_CPP"}, description = {"忽略指定类型的 split", "Ignore splits of the specified type"}) public String ignoreSplitType = IgnoreSplitType.NONE.toString(); @@ -2722,6 +2725,11 @@ public boolean isEnableHboNonStrictMatchingMode() { description = {"强制使用 jni 方式读取外表", "Force the use of jni mode to read external table"}) private boolean forceJniScanner = false; + @VariableMgr.VarAttr(name = ENABLE_PAIMON_CPP_READER, + fuzzy = true, + description = {"Paimon 非原生文件读取使用 paimon-cpp", "Use paimon-cpp for non-native Paimon reads"}) + private boolean enablePaimonCppReader = false; + @VariableMgr.VarAttr(name = ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE, fuzzy = true, description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown optimization for external table"}) @@ -3491,6 +3499,7 @@ private void setFuzzyForCatalog(Random random) { // jni this.forceJniScanner = random.nextBoolean(); + this.enablePaimonCppReader = random.nextBoolean(); // statistics this.fetchHiveRowCountSync = random.nextBoolean(); @@ -5072,6 +5081,7 @@ public TQueryOptions toThrift() { tResult.setEnableParquetFilterByMinMax(enableParquetFilterByMinMax); tResult.setEnableParquetFilterByBloomFilter(enableParquetFilterByBloomFilter); tResult.setEnableOrcFilterByMinMax(enableOrcFilterByMinMax); + tResult.setEnablePaimonCppReader(enablePaimonCppReader); tResult.setCheckOrcInitSargsSuccess(checkOrcInitSargsSuccess); tResult.setTruncateCharOrVarcharColumns(truncateCharOrVarcharColumns); @@ -5806,6 +5816,10 @@ public boolean isForceJniScanner() { return forceJniScanner; } + public boolean isEnablePaimonCppReader() { + return 
enablePaimonCppReader; + } + public String getIgnoreSplitType() { return ignoreSplitType; } @@ -5814,7 +5828,8 @@ public void checkIgnoreSplitType(String value) { try { IgnoreSplitType.valueOf(value); } catch (Exception e) { - throw new UnsupportedOperationException("We only support `NONE`, `IGNORE_JNI` and `IGNORE_NATIVE`"); + throw new UnsupportedOperationException( + "We only support `NONE`, `IGNORE_JNI`, `IGNORE_NATIVE` and `IGNORE_PAIMON_CPP`"); } } @@ -5826,6 +5841,10 @@ public void setForceJniScanner(boolean force) { forceJniScanner = force; } + public void setEnablePaimonCppReader(boolean enable) { + enablePaimonCppReader = enable; + } + public boolean isEnableCountPushDownForExternalTable() { return enableCountPushDownForExternalTable; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 3217eeacb06874..f7512fc06b2f7b 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -431,6 +431,9 @@ struct TQueryOptions { 195: optional bool enable_left_semi_direct_return_opt; + 200: optional bool enable_adjust_conjunct_order_by_cost; + // Use paimon-cpp to read Paimon splits on BE + 201: optional bool enable_paimon_cpp_reader = false; // Whether all fragments of this query are assigned to a single backend. // When true, the streaming aggregation operator can use more aggressive // hash table expansion thresholds since all data is local. diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_cpp_reader.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_cpp_reader.groovy new file mode 100644 index 00000000000000..64ef323add6119 --- /dev/null +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_cpp_reader.groovy @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_paimon_cpp_reader", "p0,external") { + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disabled paimon test") + return + } + + String catalogName = "test_paimon_cpp_reader" + String hdfsPort = context.config.otherConfigs.get("hive2HdfsPort") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + try { + sql """drop catalog if exists ${catalogName}""" + sql """create catalog if not exists ${catalogName} properties ( + "type" = "paimon", + "paimon.catalog.type" = "filesystem", + "warehouse" = "hdfs://${externalEnvIp}:${hdfsPort}/user/doris/paimon1" + );""" + sql """switch ${catalogName}""" + sql """use db1""" + // Do not force JNI; keep default selection behavior. 
+ sql """set force_jni_scanner=false""" + + def testQueries = [ + """select c1 from complex_all order by c1""", + """select c1 from complex_all where c1 >= 2 order by c1""", + """select * from all_table order by c1""", + """select * from all_table_with_parquet where c13 like '13%' order by c1""", + """select * from complex_tab order by c1""", + """select c3['a_test'], c3['b_test'], c3['bbb'], c3['ccc'] from complex_tab order by c3['a_test'], c3['b_test']""", + """select array_max(c2) c from complex_tab order by c""", + """select c20[0] c from complex_all order by c""", + """select * from deletion_vector_orc""", + """select * from deletion_vector_parquet""" + ] + + // Default path is JNI when enable_paimon_cpp_reader=false. + sql """set enable_paimon_cpp_reader=false""" + def jniResults = testQueries.collect { query -> sql(query) } + + sql """set enable_paimon_cpp_reader=true""" + def cppResults = testQueries.collect { query -> sql(query) } + + assertTrue(cppResults[0].size() > 0) + for (int i = 0; i < testQueries.size(); i++) { + assertEquals(jniResults[i].toString(), cppResults[i].toString()) + } + } finally { + sql """set enable_paimon_cpp_reader=false""" + sql """set force_jni_scanner=false""" + sql """drop catalog if exists ${catalogName}""" + } +}