From 4ec377cdd4adcdcabe27109438b0074f27b236dd Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 26 Jun 2026 20:07:02 +0800 Subject: [PATCH 1/2] [feature](be) Add Remote Doris reader to file scanner v2 ### What problem does this PR solve? Issue Number: None Related PR: None Problem Summary: FileScannerV2 could not read Remote Doris Arrow Flight splits because FORMAT_ARROW was not routed to a v2 table reader and no v2-native Remote Doris reader existed. This change adds a Remote Doris TableReader/FileReader implementation for FileScannerV2 that opens Arrow Flight streams directly, builds the file-local schema from planned file slots, materializes Arrow RecordBatch data by column name into the v2 file-local block, applies localized filters through the v2 materialized-reader helper, validates protocol mismatches, and closes Flight resources. FORMAT_ARROW is enabled in FileScannerV2 only for table_format_type=remote_doris so ordinary Arrow stream files stay on the existing path. ### Release note Support Remote Doris scans in FileScannerV2 when FileScannerV2 is enabled. ### Check List (For Author) - Test: Manual test - BE unit test: attempted PARALLEL=1 ./run-be-ut.sh --run --filter='FileScannerV2Test.*:RemoteDorisV2ReaderTest.*', but the sandbox could not update .git/modules/contrib/datasketches-cpp and network fallback to github.com was unavailable; escalated retries timed out in approval review. - Manual test: python3 build-support/run_clang_format.py --clang-format-executable /usr/local/opt/llvm@16/bin/clang-format --style file --inplace false --extensions c,h,C,H,cpp,hpp,cc,hh,c++,h++,cxx,hxx --exclude none - Behavior changed: Yes. Remote Doris FORMAT_ARROW scan ranges can be routed to FileScannerV2. - Does this need documentation: No --- be/src/exec/scan/file_scanner_v2.cpp | 12 + be/src/format_v2/file_reader.h | 1 + .../format_v2/table/remote_doris_reader.cpp | 310 ++++++++++++++ be/src/format_v2/table/remote_doris_reader.h | 104 +++++ be/src/format_v2/table_reader.cpp | 2 + be/test/exec/scan/file_scanner_v2_test.cpp | 4 + .../table/remote_doris_reader_test.cpp | 387 ++++++++++++++++++ 7 files changed, 820 insertions(+) create mode 100644 be/src/format_v2/table/remote_doris_reader.cpp create mode 100644 be/src/format_v2/table/remote_doris_reader.h create mode 100644 be/test/format_v2/table/remote_doris_reader_test.cpp diff --git a/be/src/exec/scan/file_scanner_v2.cpp b/be/src/exec/scan/file_scanner_v2.cpp index 1e3c3479ca77be..7ed65fc92900b6 100644 --- a/be/src/exec/scan/file_scanner_v2.cpp +++ b/be/src/exec/scan/file_scanner_v2.cpp @@ -54,6 +54,7 @@ #include "format_v2/table/hudi_reader.h" #include "format_v2/table/iceberg_reader.h" #include "format_v2/table/paimon_reader.h" +#include "format_v2/table/remote_doris_reader.h" #include "format_v2/table_reader.h" #include "io/fs/file_meta_cache.h" #include "io/io_common.h" @@ -90,6 +91,10 @@ bool is_supported_table_format(const TFileRangeDesc& range) { table_format == "iceberg" || table_format == "paimon" || table_format == "hudi"; } +bool is_supported_arrow_table_format(const TFileRangeDesc& range) { + return table_format_name(range) == "remote_doris"; +} + bool is_supported_jni_table_format(const TFileRangeDesc& range) { const auto table_format = table_format_name(range); if (table_format == "paimon") { @@ -222,6 +227,8 @@ bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFile const auto format_type = get_range_format_type(params, range); if (format_type == TFileFormatType::FORMAT_PARQUET) { return is_supported_table_format(range); + } else if (format_type == TFileFormatType::FORMAT_ARROW) { + return is_supported_arrow_table_format(range); } else if (format_type == TFileFormatType::FORMAT_JNI) { return is_supported_jni_table_format(range); } else if (is_csv_format(format_type) || is_text_format(format_type) || @@ -379,6 +386,8 @@ Status FileScannerV2::_create_table_reader_for_format( *reader = std::make_unique(mc_desc); } else if (table_format == "trino_connector") { *reader = std::make_unique(); + } else if (table_format == "remote_doris") { + *reader = std::make_unique(); } else { return Status::NotSupported("FileScannerV2 does not support table format {}", table_format); } @@ -622,6 +631,9 @@ Status FileScannerV2::_to_file_format(TFileFormatType::type format_type, case TFileFormatType::FORMAT_NATIVE: *file_format = format::FileFormat::NATIVE; return Status::OK(); + case TFileFormatType::FORMAT_ARROW: + *file_format = format::FileFormat::ARROW; + return Status::OK(); default: return Status::NotSupported("FileScannerV2 does not support file format {}", to_string(format_type)); diff --git a/be/src/format_v2/file_reader.h b/be/src/format_v2/file_reader.h index 0cdd68c03fb3ef..6b990351aae928 100644 --- a/be/src/format_v2/file_reader.h +++ b/be/src/format_v2/file_reader.h @@ -118,6 +118,7 @@ enum class FileFormat { TEXT, JNI, NATIVE, + ARROW, }; // 通用文件层 scan 请求。 diff --git a/be/src/format_v2/table/remote_doris_reader.cpp b/be/src/format_v2/table/remote_doris_reader.cpp new file mode 100644 index 00000000000000..03fe669287101c --- /dev/null +++ b/be/src/format_v2/table/remote_doris_reader.cpp @@ -0,0 +1,310 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format_v2/table/remote_doris_reader.h" + +#include +#include + +#include +#include +#include +#include + +#include "common/cast_set.h" +#include "core/block/block.h" +#include "core/data_type/data_type.h" +#include "core/data_type_serde/data_type_serde.h" +#include "format/arrow/arrow_utils.h" +#include "format_v2/materialized_reader_util.h" +#include "runtime/descriptors.h" +#include "runtime/runtime_state.h" +#include "util/timezone_utils.h" + +namespace doris::format::remote_doris { +namespace { + +Status validate_remote_doris_range(const TFileRangeDesc& range) { + if (!range.__isset.table_format_params || + range.table_format_params.table_format_type != "remote_doris") { + return Status::InvalidArgument("Remote Doris v2 reader requires remote_doris table format"); + } + if (!range.table_format_params.__isset.remote_doris_params) { + return Status::InvalidArgument("Remote Doris v2 reader requires remote_doris_params"); + } + const auto& params = range.table_format_params.remote_doris_params; + if (!params.__isset.location_uri || params.location_uri.empty()) { + return Status::InvalidArgument("Remote Doris v2 reader requires location_uri"); + } + if (!params.__isset.ticket || params.ticket.empty()) { + return Status::InvalidArgument("Remote Doris v2 reader requires ticket"); + } + return Status::OK(); +} + +class FlightRemoteDorisStream final : public RemoteDorisStream { +public: + explicit FlightRemoteDorisStream(const TFileRangeDesc& range) : _range(range) {} + + Status open() { + RETURN_IF_ERROR(validate_remote_doris_range(_range)); + const auto& params = _range.table_format_params.remote_doris_params; + arrow::flight::Location location; + RETURN_DORIS_STATUS_IF_ERROR( + arrow::flight::Location::Parse(params.location_uri).Value(&location)); + arrow::flight::Ticket ticket; + RETURN_DORIS_STATUS_IF_ERROR( + arrow::flight::Ticket::Deserialize(params.ticket).Value(&ticket)); + RETURN_DORIS_STATUS_IF_ERROR( + arrow::flight::FlightClient::Connect(location).Value(&_flight_client)); + RETURN_DORIS_STATUS_IF_ERROR(_flight_client->DoGet(ticket).Value(&_stream)); + return Status::OK(); + } + + Status next(std::shared_ptr* batch) override { + DORIS_CHECK(batch != nullptr); + arrow::flight::FlightStreamChunk chunk; + RETURN_DORIS_STATUS_IF_ERROR(_stream->Next().Value(&chunk)); + *batch = chunk.data; + return Status::OK(); + } + + Status close() override { + _stream.reset(); + if (_flight_client != nullptr) { + RETURN_DORIS_STATUS_IF_ERROR(_flight_client->Close()); + _flight_client.reset(); + } + return Status::OK(); + } + +private: + const TFileRangeDesc _range; + std::unique_ptr _flight_client; + std::unique_ptr _stream; +}; + +Status create_flight_stream(const TFileRangeDesc& range, std::unique_ptr* out) { + DORIS_CHECK(out != nullptr); + auto stream = std::make_unique(range); + RETURN_IF_ERROR(stream->open()); + *out = std::move(stream); + return Status::OK(); +} + +} // namespace + +RemoteDorisFileReader::RemoteDorisFileReader( + std::shared_ptr& system_properties, + std::unique_ptr& file_description, + std::shared_ptr io_ctx, RuntimeProfile* profile, const TFileRangeDesc& range, + const std::vector& file_slot_descs, + RemoteDorisStreamFactory stream_factory) + : FileReader(system_properties, file_description, std::move(io_ctx), profile), + _range(range), + _file_slot_descs(file_slot_descs), + _stream_factory(std::move(stream_factory)) { + TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctz); +} + +RemoteDorisFileReader::~RemoteDorisFileReader() { + static_cast(close()); +} + +Status RemoteDorisFileReader::init(RuntimeState* state) { + (void)state; + RETURN_IF_ERROR(validate_remote_doris_range(_range)); + RETURN_IF_ERROR(_build_col_name_to_file_id()); + _eof = false; + return Status::OK(); +} + +Status RemoteDorisFileReader::get_schema(std::vector* file_schema) const { + DORIS_CHECK(file_schema != nullptr); + file_schema->clear(); + file_schema->reserve(_file_slot_descs.size()); + for (size_t idx = 0; idx < _file_slot_descs.size(); ++idx) { + const auto* slot = _file_slot_descs[idx]; + DORIS_CHECK(slot != nullptr); + file_schema->push_back({ + .identifier = Field::create_field(cast_set(idx)), + .local_id = cast_set(idx), + .name = slot->col_name(), + .type = slot->type(), + }); + } + return Status::OK(); +} + +Status RemoteDorisFileReader::open(std::shared_ptr request) { + RETURN_IF_ERROR(FileReader::open(std::move(request))); + RETURN_IF_ERROR(_open_stream()); + _eof = false; + return Status::OK(); +} + +Status RemoteDorisFileReader::get_block(Block* file_block, size_t* rows, bool* eof) { + DORIS_CHECK(file_block != nullptr); + DORIS_CHECK(rows != nullptr); + DORIS_CHECK(eof != nullptr); + if (_stream == nullptr) { + return Status::InternalError("Remote Doris v2 reader is not open"); + } + + *rows = 0; + *eof = false; + std::shared_ptr batch; + RETURN_IF_ERROR(_stream->next(&batch)); + if (batch == nullptr) { + *eof = true; + _eof = true; + return Status::OK(); + } + + RETURN_IF_ERROR(_materialize_record_batch(*batch, file_block, rows)); + RETURN_IF_ERROR( + apply_materialized_reader_filters(_request.get(), _io_ctx.get(), file_block, rows)); + _reader_statistics.read_rows += *rows; + return Status::OK(); +} + +Status RemoteDorisFileReader::close() { + if (_stream != nullptr) { + RETURN_IF_ERROR(_stream->close()); + _stream.reset(); + } + _request.reset(); + _eof = true; + return Status::OK(); +} + +Status RemoteDorisFileReader::_open_stream() { + DORIS_CHECK(_stream == nullptr); + if (_stream_factory) { + RETURN_IF_ERROR(_stream_factory(_range, &_stream)); + } else { + RETURN_IF_ERROR(create_flight_stream(_range, &_stream)); + } + DORIS_CHECK(_stream != nullptr); + return Status::OK(); +} + +Status RemoteDorisFileReader::_materialize_record_batch(const arrow::RecordBatch& batch, + Block* file_block, size_t* rows) const { + DORIS_CHECK(file_block != nullptr); + DORIS_CHECK(rows != nullptr); + if (_request == nullptr) { + return Status::InternalError("Remote Doris v2 reader is not open"); + } + + std::vector materialized_columns(file_block->columns(), false); + for (int arrow_idx = 0; arrow_idx < batch.num_columns(); ++arrow_idx) { + const std::string& column_name = batch.schema()->field(arrow_idx)->name(); + const auto file_id_it = _col_name_to_file_id.find(column_name); + if (file_id_it == _col_name_to_file_id.end()) { + return Status::InternalError("Remote Doris returned unknown column {}", column_name); + } + const auto block_position_it = _request->local_positions.find(file_id_it->second); + if (block_position_it == _request->local_positions.end()) { + continue; + } + RETURN_IF_ERROR(_materialize_arrow_column(batch, arrow_idx, file_id_it->second, + block_position_it->second, file_block)); + materialized_columns[block_position_it->second.value()] = true; + } + + for (const auto& [file_column_id, block_position] : _request->local_positions) { + if (block_position.value() >= materialized_columns.size()) { + return Status::InternalError( + "Remote Doris requested block position {} out of range, block columns {}", + block_position.value(), materialized_columns.size()); + } + if (!materialized_columns[block_position.value()]) { + return Status::InternalError("Remote Doris did not return requested file column id {}", + file_column_id.value()); + } + } + + *rows = cast_set(batch.num_rows()); + return Status::OK(); +} + +Status RemoteDorisFileReader::_materialize_arrow_column(const arrow::RecordBatch& batch, + int arrow_column_idx, + LocalColumnId file_column_id, + const LocalIndex& block_position, + Block* file_block) const { + DORIS_CHECK(file_block != nullptr); + if (block_position.value() >= file_block->columns()) { + return Status::InternalError( + "Remote Doris block position {} out of range, block columns {}", + block_position.value(), file_block->columns()); + } + const auto column_name = batch.schema()->field(arrow_column_idx)->name(); + auto columns_guard = file_block->mutate_columns_scoped(); + auto& columns = columns_guard.mutable_columns(); + try { + RETURN_IF_ERROR(columns_guard.get_datatype_by_position(block_position.value()) + ->get_serde() + ->read_column_from_arrow(*columns[block_position.value()], + batch.column(arrow_column_idx).get(), 0, + batch.num_rows(), _ctz)); + } catch (const Exception& e) { + return Status::InternalError( + "Failed to convert Remote Doris Arrow column '{}' (file_column_id={}) to Doris " + "block: {}", + column_name, file_column_id.value(), e.what()); + } + return Status::OK(); +} + +Status RemoteDorisFileReader::_build_col_name_to_file_id() { + _col_name_to_file_id.clear(); + _col_name_to_file_id.reserve(_file_slot_descs.size()); + for (size_t idx = 0; idx < _file_slot_descs.size(); ++idx) { + const auto* slot = _file_slot_descs[idx]; + DORIS_CHECK(slot != nullptr); + _col_name_to_file_id.emplace(slot->col_name(), LocalColumnId(cast_set(idx))); + } + return Status::OK(); +} + +RemoteDorisReader::RemoteDorisReader(RemoteDorisStreamFactory stream_factory) + : _stream_factory(std::move(stream_factory)) {} + +Status RemoteDorisReader::init(TableReadOptions&& options) { + if (options.file_slot_descs == nullptr) { + return Status::InvalidArgument("Remote Doris v2 reader requires file slot descriptors"); + } + return TableReader::init(std::move(options)); +} + +Status RemoteDorisReader::prepare_split(const SplitReadOptions& options) { + RETURN_IF_ERROR(validate_remote_doris_range(options.current_range)); + return TableReader::prepare_split(options); +} + +Status RemoteDorisReader::create_file_reader(std::unique_ptr* reader) { + DORIS_CHECK(reader != nullptr); + DORIS_CHECK(_file_slot_descs != nullptr); + *reader = std::make_unique( + _system_properties, _current_task->data_file, _io_ctx, _scanner_profile, + _current_file_range_desc, *_file_slot_descs, _stream_factory); + return Status::OK(); +} + +} // namespace doris::format::remote_doris diff --git a/be/src/format_v2/table/remote_doris_reader.h b/be/src/format_v2/table/remote_doris_reader.h new file mode 100644 index 00000000000000..b4dd2a505a95ad --- /dev/null +++ b/be/src/format_v2/table/remote_doris_reader.h @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "format_v2/file_reader.h" +#include "format_v2/table_reader.h" +#include "gen_cpp/PlanNodes_types.h" + +namespace doris { +class Block; +class RuntimeProfile; +class RuntimeState; +class SlotDescriptor; +} // namespace doris + +namespace doris::format::remote_doris { + +// Small abstraction around Arrow Flight to keep Remote Doris v2 reader unit-testable without +// starting a Flight server. Production code uses FlightRemoteDorisStream; tests can provide +// RecordBatch-backed streams that exercise the same FileReader block materialization path. +class RemoteDorisStream { +public: + virtual ~RemoteDorisStream() = default; + virtual Status next(std::shared_ptr* batch) = 0; + virtual Status close() = 0; +}; + +using RemoteDorisStreamFactory = + std::function*)>; + +class RemoteDorisFileReader final : public FileReader { +public: + RemoteDorisFileReader(std::shared_ptr& system_properties, + std::unique_ptr& file_description, + std::shared_ptr io_ctx, RuntimeProfile* profile, + const TFileRangeDesc& range, + const std::vector& file_slot_descs, + RemoteDorisStreamFactory stream_factory = {}); + ~RemoteDorisFileReader() override; + + Status init(RuntimeState* state) override; + Status get_schema(std::vector* file_schema) const override; + Status open(std::shared_ptr request) override; + Status get_block(Block* file_block, size_t* rows, bool* eof) override; + Status close() override; + +private: + Status _open_stream(); + Status _materialize_record_batch(const arrow::RecordBatch& batch, Block* file_block, + size_t* rows) const; + Status _materialize_arrow_column(const arrow::RecordBatch& batch, int arrow_column_idx, + LocalColumnId file_column_id, const LocalIndex& block_position, + Block* file_block) const; + Status _build_col_name_to_file_id(); + + const TFileRangeDesc _range; + const std::vector _file_slot_descs; + RemoteDorisStreamFactory _stream_factory; + cctz::time_zone _ctz; + std::unique_ptr _stream; + std::unordered_map _col_name_to_file_id; +}; + +class RemoteDorisReader final : public TableReader { +public: + explicit RemoteDorisReader(RemoteDorisStreamFactory stream_factory = {}); + + Status init(TableReadOptions&& options) override; + Status prepare_split(const SplitReadOptions& options) override; + +protected: + Status create_file_reader(std::unique_ptr* reader) override; + +private: + RemoteDorisStreamFactory _stream_factory; +}; + +} // namespace doris::format::remote_doris diff --git a/be/src/format_v2/table_reader.cpp b/be/src/format_v2/table_reader.cpp index 5343a5a56323b6..2013b8c14403c0 100644 --- a/be/src/format_v2/table_reader.cpp +++ b/be/src/format_v2/table_reader.cpp @@ -83,6 +83,8 @@ std::string file_format_to_string(FileFormat format) { return "JNI"; case FileFormat::NATIVE: return "NATIVE"; + case FileFormat::ARROW: + return "ARROW"; } return "UNKNOWN"; } diff --git a/be/test/exec/scan/file_scanner_v2_test.cpp b/be/test/exec/scan/file_scanner_v2_test.cpp index 43240cd7d17e49..c44cc70628b7d1 100644 --- a/be/test/exec/scan/file_scanner_v2_test.cpp +++ b/be/test/exec/scan/file_scanner_v2_test.cpp @@ -108,6 +108,9 @@ TEST(FileScannerV2Test, SupportedFormatMatrix) { {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_TEXT, true}, {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_JSON, true}, {"tvf", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_NATIVE, true}, + {"remote_doris", TFileFormatType::FORMAT_ARROW, std::nullopt, true}, + {"hive", TFileFormatType::FORMAT_ARROW, std::nullopt, false}, + {"", TFileFormatType::FORMAT_ARROW, std::nullopt, false}, {"", TFileFormatType::FORMAT_WAL, std::nullopt, false}, }; @@ -183,6 +186,7 @@ TEST(FileScannerV2Test, FileFormatConversionMatrix) { {TFileFormatType::FORMAT_TEXT, format::FileFormat::TEXT}, {TFileFormatType::FORMAT_JSON, format::FileFormat::JSON}, {TFileFormatType::FORMAT_NATIVE, format::FileFormat::NATIVE}, + {TFileFormatType::FORMAT_ARROW, format::FileFormat::ARROW}, {TFileFormatType::FORMAT_ORC, std::nullopt}, }; diff --git a/be/test/format_v2/table/remote_doris_reader_test.cpp b/be/test/format_v2/table/remote_doris_reader_test.cpp new file mode 100644 index 00000000000000..8762f41db83b2f --- /dev/null +++ b/be/test/format_v2/table/remote_doris_reader_test.cpp @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format_v2/table/remote_doris_reader.h" + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "common/object_pool.h" +#include "core/assert_cast.h" +#include "core/block/block.h" +#include "core/column/column_nullable.h" +#include "core/column/column_string.h" +#include "core/column/column_vector.h" +#include "core/data_type/data_type_nullable.h" +#include "core/data_type/data_type_number.h" +#include "core/data_type/data_type_string.h" +#include "exprs/vexpr.h" +#include "exprs/vexpr_context.h" +#include "format_v2/file_reader.h" +#include "gen_cpp/PlanNodes_types.h" +#include "io/file_factory.h" +#include "io/io_common.h" +#include "runtime/runtime_profile.h" +#include "runtime/runtime_state.h" +#include "testutil/desc_tbl_builder.h" + +namespace doris::format::remote_doris { +namespace { + +class BatchRemoteDorisStream final : public RemoteDorisStream { +public: + BatchRemoteDorisStream(std::vector> batches, + std::shared_ptr close_count) + : _batches(std::move(batches)), _close_count(std::move(close_count)) {} + + Status next(std::shared_ptr* batch) override { + DORIS_CHECK(batch != nullptr); + if (_next_batch >= _batches.size()) { + *batch = nullptr; + return Status::OK(); + } + *batch = _batches[_next_batch++]; + return Status::OK(); + } + + Status close() override { + ++(*_close_count); + return Status::OK(); + } + +private: + std::vector> _batches; + std::shared_ptr _close_count; + size_t _next_batch = 0; +}; + +TFileRangeDesc remote_doris_range() { + TRemoteDorisFileDesc remote_desc; + remote_desc.__set_location_uri("grpc://127.0.0.1:9050"); + remote_desc.__set_ticket("ticket-bytes"); + + TTableFormatFileDesc table_desc; + table_desc.__set_table_format_type("remote_doris"); + table_desc.__set_remote_doris_params(std::move(remote_desc)); + + TFileRangeDesc range; + range.__set_format_type(TFileFormatType::FORMAT_ARROW); + range.__set_path("/dummyPath"); + range.__set_table_format_params(std::move(table_desc)); + return range; +} + +std::vector remote_slots(ObjectPool* pool, DescriptorTbl** desc_tbl) { + DescriptorTblBuilder builder(pool); + builder.declare_tuple() << std::make_tuple(std::make_shared(), std::string("id")) + << std::make_tuple(std::make_shared(), + std::string("name")); + *desc_tbl = builder.build(); + return (*desc_tbl)->get_tuple_descriptor(0)->slots(); +} + +std::shared_ptr make_batch(const std::vector& names) { + arrow::Int32Builder id_builder; + EXPECT_TRUE(id_builder.Append(10).ok()); + EXPECT_TRUE(id_builder.Append(20).ok()); + std::shared_ptr id_array; + EXPECT_TRUE(id_builder.Finish(&id_array).ok()); + + arrow::StringBuilder name_builder; + EXPECT_TRUE(name_builder.Append("alice").ok()); + EXPECT_TRUE(name_builder.Append("bob").ok()); + std::shared_ptr name_array; + EXPECT_TRUE(name_builder.Finish(&name_array).ok()); + + std::vector> fields; + std::vector> arrays; + for (const auto& name : names) { + if (name == "id") { + fields.push_back(arrow::field("id", arrow::int32())); + arrays.push_back(id_array); + } else if (name == "name") { + fields.push_back(arrow::field("name", arrow::utf8())); + arrays.push_back(name_array); + } else { + fields.push_back(arrow::field(name, arrow::int32())); + arrays.push_back(id_array); + } + } + return arrow::RecordBatch::Make(arrow::schema(std::move(fields)), 2, std::move(arrays)); +} + +std::unique_ptr create_reader( + RuntimeProfile* profile, const TFileRangeDesc& range, + const std::vector& slots, + std::vector> batches, std::shared_ptr close_count, + std::shared_ptr io_ctx = nullptr) { + auto system_properties = std::make_shared(); + auto file_description = std::make_unique(); + file_description->path = "/dummyPath"; + auto factory = [batches = std::move(batches), close_count]( + const TFileRangeDesc&, + std::unique_ptr* stream) mutable { + *stream = std::make_unique(std::move(batches), close_count); + return Status::OK(); + }; + return std::make_unique(system_properties, file_description, + std::move(io_ctx), profile, range, slots, + std::move(factory)); +} + +Block make_request_block(const std::vector& schema, + const std::vector& local_ids) { + Block block; + for (const auto local_id : local_ids) { + const auto it = std::find_if(schema.begin(), schema.end(), [&](const auto& column) { + return column.local_id == local_id; + }); + DORIS_CHECK(it != schema.end()); + block.insert({it->type->create_column(), it->type, it->name}); + } + return block; +} + +int32_t nullable_int_at(const IColumn& column, size_t row) { + const auto& nullable = assert_cast(column); + const auto& nested = assert_cast(nullable.get_nested_column()); + return nested.get_data()[row]; +} + +std::string nullable_string_at(const IColumn& column, size_t row) { + const auto& nullable = assert_cast(column); + const auto& nested = assert_cast(nullable.get_nested_column()); + return nested.get_data_at(row).to_string(); +} + +class NullableIntGreaterThanExpr final : public VExpr { +public: + NullableIntGreaterThanExpr(size_t block_position, int32_t value) + : VExpr(std::make_shared(), false), + _block_position(block_position), + _value(value) {} + + const std::string& expr_name() const override { return _name; } + + bool is_constant() const override { return false; } + + Status execute_column_impl(VExprContext*, const Block* block, const Selector* selector, + size_t count, ColumnPtr& result_column) const override { + DORIS_CHECK(block != nullptr); + const auto& nullable = + assert_cast(*block->get_by_position(_block_position).column); + const auto& data = assert_cast(nullable.get_nested_column()); + + auto result = ColumnUInt8::create(); + auto& result_data = result->get_data(); + result_data.resize(count); + for (size_t row = 0; row < count; ++row) { + const auto source_row = selector == nullptr ? row : (*selector)[row]; + result_data[row] = + !nullable.is_null_at(source_row) && data.get_element(source_row) > _value; + } + result_column = std::move(result); + return Status::OK(); + } + + Status clone_node(VExprSPtr* cloned_expr) const override { + DORIS_CHECK(cloned_expr != nullptr); + *cloned_expr = std::make_shared(_block_position, _value); + return Status::OK(); + } + +private: + size_t _block_position; + int32_t _value; + const std::string _name = "NullableIntGreaterThanExpr"; +}; + +VExprContextSPtr prepared_conjunct(RuntimeState* state, const VExprSPtr& expr) { + auto context = VExprContext::create_shared(expr); + auto status = context->prepare(state, RowDescriptor()); + EXPECT_TRUE(status.ok()) << status; + status = context->open(state); + EXPECT_TRUE(status.ok()) << status; + return context; +} + +} // namespace + +TEST(RemoteDorisV2ReaderTest, BuildsSchemaFromSlotsAndProjectsRequestedColumns) { + ObjectPool pool; + DescriptorTbl* desc_tbl = nullptr; + const auto slots = remote_slots(&pool, &desc_tbl); + RuntimeState state; + RuntimeProfile profile("remote_doris_v2_reader_test"); + auto close_count = std::make_shared(0); + auto reader = create_reader(&profile, remote_doris_range(), slots, {make_batch({"id", "name"})}, + close_count); + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + ASSERT_EQ(schema.size(), 2); + EXPECT_EQ(schema[0].name, "id"); + EXPECT_EQ(schema[0].local_id, 0); + EXPECT_EQ(schema[1].name, "name"); + EXPECT_EQ(schema[1].local_id, 1); + + auto request = std::make_shared(); + FileScanRequestBuilder builder(request.get()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(1)).ok()); + ASSERT_TRUE(reader->open(request).ok()); + + auto block = make_request_block(schema, {1}); + size_t rows = 0; + bool eof = false; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + ASSERT_EQ(rows, 2); + EXPECT_FALSE(eof); + EXPECT_EQ(nullable_string_at(*block.get_by_position(0).column, 0), "alice"); + EXPECT_EQ(nullable_string_at(*block.get_by_position(0).column, 1), "bob"); + + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + EXPECT_EQ(rows, 0); + EXPECT_TRUE(eof); + ASSERT_TRUE(reader->close().ok()); + EXPECT_EQ(*close_count, 1); +} + +TEST(RemoteDorisV2ReaderTest, HandlesDifferentArrowColumnOrder) { + ObjectPool pool; + DescriptorTbl* desc_tbl = nullptr; + const auto slots = remote_slots(&pool, &desc_tbl); + RuntimeState state; + RuntimeProfile profile("remote_doris_v2_reader_reordered_test"); + auto close_count = std::make_shared(0); + auto reader = create_reader(&profile, remote_doris_range(), slots, {make_batch({"name", "id"})}, + close_count); + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + auto request = std::make_shared(); + FileScanRequestBuilder builder(request.get()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(1)).ok()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(0)).ok()); + ASSERT_TRUE(reader->open(request).ok()); + + auto block = make_request_block(schema, {1, 0}); + size_t rows = 0; + bool eof = false; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + ASSERT_EQ(rows, 2); + EXPECT_EQ(nullable_string_at(*block.get_by_position(0).column, 0), "alice"); + EXPECT_EQ(nullable_int_at(*block.get_by_position(1).column, 1), 20); +} + +TEST(RemoteDorisV2ReaderTest, AppliesConjunctsAndTracksPredicateFilteredRows) { + ObjectPool pool; + DescriptorTbl* desc_tbl = nullptr; + const auto slots = remote_slots(&pool, &desc_tbl); + RuntimeState state; + RuntimeProfile profile("remote_doris_v2_reader_filter_test"); + auto close_count = std::make_shared(0); + auto io_ctx = std::make_shared(); + auto reader = create_reader(&profile, remote_doris_range(), slots, {make_batch({"id", "name"})}, + close_count, io_ctx); + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + auto request = std::make_shared(); + FileScanRequestBuilder builder(request.get()); + ASSERT_TRUE(builder.add_predicate_column(LocalColumnId(0)).ok()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(1)).ok()); + request->conjuncts = { + prepared_conjunct(&state, std::make_shared(0, 10))}; + ASSERT_TRUE(reader->open(request).ok()); + + auto block = make_request_block(schema, {0, 1}); + size_t rows = 0; + bool eof = false; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + ASSERT_EQ(rows, 1); + EXPECT_FALSE(eof); + EXPECT_EQ(nullable_int_at(*block.get_by_position(0).column, 0), 20); + EXPECT_EQ(nullable_string_at(*block.get_by_position(1).column, 0), "bob"); + EXPECT_EQ(io_ctx->predicate_filtered_rows, 1); +} + +TEST(RemoteDorisV2ReaderTest, RejectsUnknownReturnedColumnAndMissingRequestedColumn) { + ObjectPool pool; + DescriptorTbl* desc_tbl = nullptr; + const auto slots = remote_slots(&pool, &desc_tbl); + RuntimeState state; + RuntimeProfile profile("remote_doris_v2_reader_error_test"); + + { + auto close_count = std::make_shared(0); + auto reader = create_reader(&profile, remote_doris_range(), slots, + {make_batch({"unknown"})}, close_count); + ASSERT_TRUE(reader->init(&state).ok()); + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + auto request = std::make_shared(); + FileScanRequestBuilder builder(request.get()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(0)).ok()); + ASSERT_TRUE(reader->open(request).ok()); + auto block = make_request_block(schema, {0}); + size_t rows = 0; + bool eof = false; + EXPECT_FALSE(reader->get_block(&block, &rows, &eof).ok()); + } + + { + auto close_count = std::make_shared(0); + auto reader = create_reader(&profile, remote_doris_range(), slots, {make_batch({"id"})}, + close_count); + ASSERT_TRUE(reader->init(&state).ok()); + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + auto request = std::make_shared(); + FileScanRequestBuilder builder(request.get()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(1)).ok()); + ASSERT_TRUE(reader->open(request).ok()); + auto block = make_request_block(schema, {1}); + size_t rows = 0; + bool eof = false; + EXPECT_FALSE(reader->get_block(&block, &rows, &eof).ok()); + } +} + +TEST(RemoteDorisV2ReaderTest, RejectsInvalidRemoteDorisRange) { + ObjectPool pool; + DescriptorTbl* desc_tbl = nullptr; + const auto slots = remote_slots(&pool, &desc_tbl); + RuntimeState state; + RuntimeProfile profile("remote_doris_v2_reader_bad_range_test"); + auto range = remote_doris_range(); + range.table_format_params.__isset.remote_doris_params = false; + auto close_count = std::make_shared(0); + auto reader = create_reader(&profile, range, slots, {}, close_count); + EXPECT_FALSE(reader->init(&state).ok()); +} + +} // namespace doris::format::remote_doris From 77fa4e722cac288d52614f31c1b59573f4c1216d Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 29 Jun 2026 09:48:10 +0800 Subject: [PATCH 2/2] [fix](be) Build Remote Doris complex file schema children ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: Remote Doris file scanner v2 exposed complex slot types as top-level file columns without structural children. TableColumnMapper validates complex file schemas before building mappings, so ARRAY/MAP/STRUCT columns failed with malformed schema errors such as expected one ARRAY child but actual children was zero. This change synthesizes semantic children from the Doris slot type for Remote Doris file schema entries, using element for ARRAY, key/value for MAP, and field names for STRUCT. It also adds unit coverage for array, map, and struct schema generation. ### Release note None ### Check List (For Author) - Test: Unit Test / Manual test - Added RemoteDorisV2ReaderTest.BuildsComplexSchemaChildrenFromSlots - Ran git diff --check for the modified files - Attempted ./run-be-ut.sh --run --filter='RemoteDorisV2ReaderTest.*' with PARALLEL=4, but the standard script could not run in the sandbox because submodule config writes were denied and the required elevated run approval timed out twice - Behavior changed: No - Does this need documentation: No --- .../format_v2/table/remote_doris_reader.cpp | 55 ++++++++++++++++++ .../table/remote_doris_reader_test.cpp | 57 +++++++++++++++++++ 2 files changed, 112 insertions(+) diff --git a/be/src/format_v2/table/remote_doris_reader.cpp b/be/src/format_v2/table/remote_doris_reader.cpp index 03fe669287101c..39580fd2561897 100644 --- a/be/src/format_v2/table/remote_doris_reader.cpp +++ b/be/src/format_v2/table/remote_doris_reader.cpp @@ -26,8 +26,13 @@ #include #include "common/cast_set.h" +#include "core/assert_cast.h" #include "core/block/block.h" #include "core/data_type/data_type.h" +#include "core/data_type/data_type_array.h" +#include "core/data_type/data_type_map.h" +#include "core/data_type/data_type_nullable.h" +#include "core/data_type/data_type_struct.h" #include "core/data_type_serde/data_type_serde.h" #include "format/arrow/arrow_utils.h" #include "format_v2/materialized_reader_util.h" @@ -106,6 +111,53 @@ Status create_flight_stream(const TFileRangeDesc& range, std::unique_ptr synthesize_remote_doris_children(const DataTypePtr& type) { + std::vector children; + DORIS_CHECK(type != nullptr); + const auto nested_type = remove_nullable(type); + switch (nested_type->get_primitive_type()) { + case TYPE_ARRAY: { + const auto* array_type = assert_cast(nested_type.get()); + children.push_back( + remote_doris_child_definition("element", array_type->get_nested_type(), 0)); + break; + } + case TYPE_MAP: { + const auto* map_type = assert_cast(nested_type.get()); + children.push_back(remote_doris_child_definition("key", map_type->get_key_type(), 0)); + children.push_back(remote_doris_child_definition("value", map_type->get_value_type(), 1)); + break; + } + case TYPE_STRUCT: { + const auto* struct_type = assert_cast(nested_type.get()); + children.reserve(struct_type->get_elements().size()); + for (size_t idx = 0; idx < struct_type->get_elements().size(); ++idx) { + children.push_back(remote_doris_child_definition(struct_type->get_element_name(idx), + struct_type->get_element(idx), + cast_set(idx))); + } + break; + } + default: + break; + } + return children; +} + +ColumnDefinition remote_doris_child_definition(const std::string& name, DataTypePtr type, + int32_t local_id) { + ColumnDefinition child; + child.identifier = Field::create_field(name); + child.local_id = local_id; + child.name = name; + child.type = std::move(type); + child.children = synthesize_remote_doris_children(child.type); + return child; +} + } // namespace RemoteDorisFileReader::RemoteDorisFileReader( @@ -145,6 +197,9 @@ Status RemoteDorisFileReader::get_schema(std::vector* file_sch .local_id = cast_set(idx), .name = slot->col_name(), .type = slot->type(), + // Remote Doris exposes table slots as file columns. Complex columns still need + // structural children so TableColumnMapper can validate and project them. + .children = synthesize_remote_doris_children(slot->type()), }); } return Status::OK(); diff --git a/be/test/format_v2/table/remote_doris_reader_test.cpp b/be/test/format_v2/table/remote_doris_reader_test.cpp index 8762f41db83b2f..c7f7beaaa984aa 100644 --- a/be/test/format_v2/table/remote_doris_reader_test.cpp +++ b/be/test/format_v2/table/remote_doris_reader_test.cpp @@ -33,9 +33,12 @@ #include "core/column/column_nullable.h" #include "core/column/column_string.h" #include "core/column/column_vector.h" +#include "core/data_type/data_type_array.h" +#include "core/data_type/data_type_map.h" #include "core/data_type/data_type_nullable.h" #include "core/data_type/data_type_number.h" #include "core/data_type/data_type_string.h" +#include "core/data_type/data_type_struct.h" #include "exprs/vexpr.h" #include "exprs/vexpr_context.h" #include "format_v2/file_reader.h" @@ -101,6 +104,23 @@ std::vector remote_slots(ObjectPool* pool, DescriptorTbl** desc return (*desc_tbl)->get_tuple_descriptor(0)->slots(); } +std::vector remote_complex_slots(ObjectPool* pool, DescriptorTbl** desc_tbl) { + const auto string_type = make_nullable(std::make_shared()); + const auto int_type = make_nullable(std::make_shared()); + const auto array_type = make_nullable(std::make_shared(string_type)); + const auto map_type = make_nullable(std::make_shared(string_type, int_type)); + const auto struct_type = make_nullable(std::make_shared( + DataTypes {int_type, make_nullable(std::make_shared()), string_type}, + Strings {"f1", "f2", "f3"})); + + DescriptorTblBuilder builder(pool); + builder.declare_tuple() << std::make_tuple(array_type, std::string("c_array_s")) + << std::make_tuple(map_type, std::string("c_map")) + << std::make_tuple(struct_type, std::string("c_struct")); + *desc_tbl = builder.build(); + return (*desc_tbl)->get_tuple_descriptor(0)->slots(); +} + std::shared_ptr make_batch(const std::vector& names) { arrow::Int32Builder id_builder; EXPECT_TRUE(id_builder.Append(10).ok()); @@ -268,6 +288,43 @@ TEST(RemoteDorisV2ReaderTest, BuildsSchemaFromSlotsAndProjectsRequestedColumns) EXPECT_EQ(*close_count, 1); } +TEST(RemoteDorisV2ReaderTest, BuildsComplexSchemaChildrenFromSlots) { + ObjectPool pool; + DescriptorTbl* desc_tbl = nullptr; + const auto slots = remote_complex_slots(&pool, &desc_tbl); + RuntimeState state; + RuntimeProfile profile("remote_doris_v2_reader_complex_schema_test"); + auto close_count = std::make_shared(0); + auto reader = create_reader(&profile, remote_doris_range(), slots, {}, close_count); + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + ASSERT_EQ(schema.size(), 3); + + ASSERT_EQ(schema[0].name, "c_array_s"); + ASSERT_EQ(schema[0].children.size(), 1); + EXPECT_EQ(schema[0].children[0].name, "element"); + EXPECT_EQ(schema[0].children[0].local_id, 0); + EXPECT_TRUE(schema[0].children[0].children.empty()); + + ASSERT_EQ(schema[1].name, "c_map"); + ASSERT_EQ(schema[1].children.size(), 2); + EXPECT_EQ(schema[1].children[0].name, "key"); + EXPECT_EQ(schema[1].children[0].local_id, 0); + EXPECT_EQ(schema[1].children[1].name, "value"); + EXPECT_EQ(schema[1].children[1].local_id, 1); + + ASSERT_EQ(schema[2].name, "c_struct"); + ASSERT_EQ(schema[2].children.size(), 3); + EXPECT_EQ(schema[2].children[0].name, "f1"); + EXPECT_EQ(schema[2].children[0].local_id, 0); + EXPECT_EQ(schema[2].children[1].name, "f2"); + EXPECT_EQ(schema[2].children[1].local_id, 1); + EXPECT_EQ(schema[2].children[2].name, "f3"); + EXPECT_EQ(schema[2].children[2].local_id, 2); +} + TEST(RemoteDorisV2ReaderTest, HandlesDifferentArrowColumnOrder) { ObjectPool pool; DescriptorTbl* desc_tbl = nullptr;