From 52fb9fee6319f17c8528fa42e7f5d8442b2853da Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Wed, 27 May 2026 15:43:47 +0800 Subject: [PATCH] [refactor](be) Remove scanner eos flag (#63578) ### What changed Remove the extra `_alreay_eos` scanner state from the scan projection path. When `get_block()` reports eos while both `_padding_block` and the final `_origin_block` contain data, the projection path now merges them directly and returns `eos=true` in the same call. As a result, only the final output block can exceed the normal batch target, and its size stays bounded because each source block is already bounded by the lower scanner. ### Why `_alreay_eos` existed only to carry the final eos handoff across calls, and it also carried a typo in the member name. Merging the final padding/origin blocks at eos removes that extra state and keeps the block lifecycle simpler without changing query results. ### Validation - Formatted modified C++ files with `build-support/run_clang_format.py` using clang-format 16. - `git diff --check` - `ninja -C be/ut_build_ASAN -j 1 src/exec/CMakeFiles/Exec.dir/scan/scanner.cpp.o test/CMakeFiles/doris_be_test.dir/exec/scan/scanner_late_arrival_rf_test.cpp.o` - Attempted `./run-be-ut.sh --run -j 1 --filter=ScannerProjectionTest.merges_padding_block_when_limit_eos_without_extra_flag:ScannerLateArrivalRfTest.applied_rf_num_advances_after_late_arrival`; the full BE UT target did not complete locally because it started a broad rebuild on the shared host and process-resource pressure was observed (`fork: Resource temporarily unavailable`). 
### Release note None --- be/src/exec/scan/scanner.cpp | 53 +++++++++---------- be/src/exec/scan/scanner.h | 1 - .../scan/scanner_late_arrival_rf_test.cpp | 52 +++++++++++++++++- 3 files changed, 75 insertions(+), 31 deletions(-) diff --git a/be/src/exec/scan/scanner.cpp b/be/src/exec/scan/scanner.cpp index 4fc0d44561673e..9304fc6860a107 100644 --- a/be/src/exec/scan/scanner.cpp +++ b/be/src/exec/scan/scanner.cpp @@ -81,39 +81,36 @@ Status Scanner::get_block_after_projects(RuntimeState* state, Block* block, bool SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().vscanner_get_block); auto& row_descriptor = _local_state->_parent->row_descriptor(); if (_output_row_descriptor) { - if (_alreay_eos) { - *eos = true; - _padding_block.swap(_origin_block); - } else { - _origin_block.clear_column_data(row_descriptor.num_materialized_slots()); - const auto min_batch_size = std::max(state->batch_size() / 2, 1); - const auto block_max_bytes = state->preferred_block_size_bytes(); - while (_padding_block.rows() < min_batch_size && - _padding_block.bytes() < block_max_bytes && !*eos) { - RETURN_IF_ERROR(get_block(state, &_origin_block, eos)); - if (_origin_block.rows() >= min_batch_size) { - break; - } + _origin_block.clear_column_data(row_descriptor.num_materialized_slots()); + const auto min_batch_size = std::max(state->batch_size() / 2, 1); + const auto block_max_bytes = state->preferred_block_size_bytes(); + while (_padding_block.rows() < min_batch_size && _padding_block.bytes() < block_max_bytes && + !*eos) { + RETURN_IF_ERROR(get_block(state, &_origin_block, eos)); + if (*eos) { + // For the final block, merge any padding directly and return eos in this call. + // The merged tail can be larger than the target batch, but each source block is + // already bounded by the lower scanner. 
+ RETURN_IF_ERROR(_merge_padding_block()); + _origin_block.clear_column_data(row_descriptor.num_materialized_slots()); + break; + } + if (_origin_block.rows() >= min_batch_size) { + break; + } - if (_origin_block.rows() + _padding_block.rows() <= state->batch_size() && - _origin_block.bytes() + _padding_block.bytes() <= block_max_bytes) { - RETURN_IF_ERROR(_merge_padding_block()); - _origin_block.clear_column_data(row_descriptor.num_materialized_slots()); - } else { - if (_origin_block.rows() < _padding_block.rows()) { - _padding_block.swap(_origin_block); - } - break; + if (_origin_block.rows() + _padding_block.rows() <= state->batch_size() && + _origin_block.bytes() + _padding_block.bytes() <= block_max_bytes) { + RETURN_IF_ERROR(_merge_padding_block()); + _origin_block.clear_column_data(row_descriptor.num_materialized_slots()); + } else { + if (_origin_block.rows() < _padding_block.rows()) { + _padding_block.swap(_origin_block); } + break; } } - // first output the origin block change eos = false, next time output padding block - // set the eos to true - if (*eos && !_padding_block.empty() && !_origin_block.empty()) { - _alreay_eos = true; - *eos = false; - } if (_origin_block.empty() && !_padding_block.empty()) { _padding_block.swap(_origin_block); } diff --git a/be/src/exec/scan/scanner.h b/be/src/exec/scan/scanner.h index 4a8a7739ff89fa..0f5b7ffa850ac4 100644 --- a/be/src/exec/scan/scanner.h +++ b/be/src/exec/scan/scanner.h @@ -216,7 +216,6 @@ class Scanner { std::vector _intermediate_projections; Block _origin_block; Block _padding_block; - bool _alreay_eos = false; VExprContextSPtrs _common_expr_ctxs_push_down; diff --git a/be/test/exec/scan/scanner_late_arrival_rf_test.cpp b/be/test/exec/scan/scanner_late_arrival_rf_test.cpp index f1e21ebc4c34dd..0d31b69495187e 100644 --- a/be/test/exec/scan/scanner_late_arrival_rf_test.cpp +++ b/be/test/exec/scan/scanner_late_arrival_rf_test.cpp @@ -18,6 +18,9 @@ #include #include +#include + +#include 
"common/object_pool.h" #include "core/data_type/data_type_factory.hpp" #include "core/data_type/data_type_number.h" #include "exec/operator/mock_scan_operator.h" @@ -28,6 +31,10 @@ #include "exec/scan/scanner.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" +#include "testutil/column_helper.h" +#include "testutil/mock/mock_descriptors.h" +#include "testutil/mock/mock_runtime_state.h" +#include "testutil/mock/mock_slot_ref.h" namespace doris { @@ -40,11 +47,22 @@ class TestScanner final : public Scanner { RuntimeProfile* profile) : Scanner(state, local_state, limit, profile) {} + void add_block(Block block) { _blocks.push_back(std::move(block)); } + protected: - Status _get_block_impl(RuntimeState* /*state*/, Block* /*block*/, bool* eof) override { - *eof = true; + Status _get_block_impl(RuntimeState* /*state*/, Block* block, bool* eof) override { + if (_blocks.empty()) { + *eof = true; + return Status::OK(); + } + *eof = false; + block->swap(_blocks.front()); + _blocks.pop_front(); return Status::OK(); } + +private: + std::list _blocks; }; class ScannerLateArrivalRfTest : public RuntimeFilterTest { @@ -117,4 +135,34 @@ TEST_F(ScannerLateArrivalRfTest, applied_rf_num_advances_after_late_arrival) { ASSERT_TRUE(scanner->_conjuncts.empty()); } +TEST(ScannerProjectionTest, merges_padding_block_when_limit_eos_without_extra_flag) { + ObjectPool pool; + auto data_type = std::make_shared(); + auto row_descriptor = MockRowDescriptor({data_type}, &pool); + + MockRuntimeState state; + state._batch_size = 6; + + auto op = std::make_shared(); + op->_row_descriptor = row_descriptor; + op->_output_row_descriptor = + std::make_unique(std::vector {data_type}, &pool); + op->_output_tuple_desc = op->_output_row_descriptor->tuple_descriptors()[0]; + + auto local_state = std::make_shared(&state, op.get()); + local_state->_projections = MockSlotRef::create_mock_contexts(0, data_type); + + RuntimeProfile profile("scanner"); + TestScanner scanner(&state, local_state.get(), 
7, &profile); + ASSERT_TRUE(scanner.init(&state, {}).ok()); + scanner.add_block(ColumnHelper::create_block({0, 1})); + scanner.add_block(ColumnHelper::create_block({2, 3, 4, 5, 6})); + + Block first_output; + bool eos = false; + ASSERT_TRUE(scanner.get_block_after_projects(&state, &first_output, &eos).ok()); + EXPECT_TRUE(eos); + EXPECT_EQ(first_output.rows(), 7); +} + } // namespace doris