Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 25 additions & 28 deletions be/src/exec/scan/scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,39 +87,36 @@ Status Scanner::get_block_after_projects(RuntimeState* state, Block* block, bool
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().vscanner_get_block);
auto& row_descriptor = _local_state->_parent->row_descriptor();
if (_output_row_descriptor) {
if (_alreay_eos) {
*eos = true;
_padding_block.swap(_origin_block);
} else {
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
const auto min_batch_size = std::max(state->batch_size() / 2, 1);
const auto block_max_bytes = state->preferred_block_size_bytes();
while (_padding_block.rows() < min_batch_size &&
_padding_block.bytes() < block_max_bytes && !*eos) {
RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
if (_origin_block.rows() >= min_batch_size) {
break;
}
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
const auto min_batch_size = std::max(state->batch_size() / 2, 1);
const auto block_max_bytes = state->preferred_block_size_bytes();
while (_padding_block.rows() < min_batch_size && _padding_block.bytes() < block_max_bytes &&
!*eos) {
RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
if (*eos) {
// For the final block, merge any padding directly and return eos in this call.
// The merged tail can be larger than the target batch, but each source block is
// already bounded by the lower scanner.
RETURN_IF_ERROR(_merge_padding_block());
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
break;
}
if (_origin_block.rows() >= min_batch_size) {
break;
}

if (_origin_block.rows() + _padding_block.rows() <= state->batch_size() &&
_origin_block.bytes() + _padding_block.bytes() <= block_max_bytes) {
RETURN_IF_ERROR(_merge_padding_block());
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
} else {
if (_origin_block.rows() < _padding_block.rows()) {
_padding_block.swap(_origin_block);
}
break;
if (_origin_block.rows() + _padding_block.rows() <= state->batch_size() &&
_origin_block.bytes() + _padding_block.bytes() <= block_max_bytes) {
RETURN_IF_ERROR(_merge_padding_block());
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
} else {
if (_origin_block.rows() < _padding_block.rows()) {
_padding_block.swap(_origin_block);
}
break;
}
}

// first output the origin block change eos = false, next time output padding block
// set the eos to true
if (*eos && !_padding_block.empty() && !_origin_block.empty()) {
_alreay_eos = true;
*eos = false;
}
if (_origin_block.empty() && !_padding_block.empty()) {
_padding_block.swap(_origin_block);
}
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/scan/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ class Scanner {
std::vector<VExprContextSPtrs> _intermediate_projections;
Block _origin_block;
Block _padding_block;
bool _alreay_eos = false;

VExprContextSPtrs _common_expr_ctxs_push_down;

Expand Down
52 changes: 50 additions & 2 deletions be/test/exec/scan/scanner_late_arrival_rf_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include <glog/logging.h>
#include <gtest/gtest.h>

#include <list>

#include "common/object_pool.h"
#include "core/data_type/data_type_factory.hpp"
#include "core/data_type/data_type_number.h"
#include "exec/operator/mock_scan_operator.h"
Expand All @@ -28,6 +31,10 @@
#include "exec/scan/scanner.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "testutil/column_helper.h"
#include "testutil/mock/mock_descriptors.h"
#include "testutil/mock/mock_runtime_state.h"
#include "testutil/mock/mock_slot_ref.h"

namespace doris {

Expand All @@ -40,11 +47,22 @@ class TestScanner final : public Scanner {
RuntimeProfile* profile)
: Scanner(state, local_state, limit, profile) {}

void add_block(Block block) { _blocks.push_back(std::move(block)); }

protected:
Status _get_block_impl(RuntimeState* /*state*/, Block* /*block*/, bool* eof) override {
*eof = true;
Status _get_block_impl(RuntimeState* /*state*/, Block* block, bool* eof) override {
if (_blocks.empty()) {
*eof = true;
return Status::OK();
}
*eof = false;
block->swap(_blocks.front());
_blocks.pop_front();
return Status::OK();
}

private:
std::list<Block> _blocks;
};

class ScannerLateArrivalRfTest : public RuntimeFilterTest {
Expand Down Expand Up @@ -117,4 +135,34 @@ TEST_F(ScannerLateArrivalRfTest, applied_rf_num_advances_after_late_arrival) {
ASSERT_TRUE(scanner->_conjuncts.empty());
}

TEST(ScannerProjectionTest, merges_padding_block_when_limit_eos_without_extra_flag) {
ObjectPool pool;
auto data_type = std::make_shared<DataTypeInt32>();
auto row_descriptor = MockRowDescriptor({data_type}, &pool);

MockRuntimeState state;
state._batch_size = 6;

auto op = std::make_shared<MockScanOperatorX>();
op->_row_descriptor = row_descriptor;
op->_output_row_descriptor =
std::make_unique<MockRowDescriptor>(std::vector<DataTypePtr> {data_type}, &pool);
op->_output_tuple_desc = op->_output_row_descriptor->tuple_descriptors()[0];

auto local_state = std::make_shared<MockScanLocalState>(&state, op.get());
local_state->_projections = MockSlotRef::create_mock_contexts(0, data_type);

RuntimeProfile profile("scanner");
TestScanner scanner(&state, local_state.get(), 7, &profile);
ASSERT_TRUE(scanner.init(&state, {}).ok());
scanner.add_block(ColumnHelper::create_block<DataTypeInt32>({0, 1}));
scanner.add_block(ColumnHelper::create_block<DataTypeInt32>({2, 3, 4, 5, 6}));

Block first_output;
bool eos = false;
ASSERT_TRUE(scanner.get_block_after_projects(&state, &first_output, &eos).ok());
EXPECT_TRUE(eos);
EXPECT_EQ(first_output.rows(), 7);
}

} // namespace doris
Loading