Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion be/src/exec/operator/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,6 @@ class OperatorXBase : public OperatorBase {
_resource_profile(tnode.resource_profile),
_limit(tnode.limit) {
if (tnode.__isset.output_tuple_id) {
_output_row_descriptor.reset(new RowDescriptor(descs, {tnode.output_tuple_id}));
_output_row_descriptor =
std::make_unique<RowDescriptor>(descs, std::vector {tnode.output_tuple_id});
}
Expand Down
210 changes: 159 additions & 51 deletions be/src/exec/operator/table_function_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "core/block/block.h"
#include "core/block/column_numbers.h"
#include "core/column/column_nullable.h"
#include "core/column/column_struct.h"
#include "core/column/column_vector.h"
#include "core/custom_allocator.h"
#include "exec/operator/operator.h"
#include "exprs/table_function/table_function_factory.h"
Expand Down Expand Up @@ -264,30 +266,156 @@ Status TableFunctionLocalState::_get_expanded_block_block_fast_path(
const auto& offsets = *_block_fast_path_ctx.offsets_ptr;
const auto child_rows = cast_set<int64_t>(offsets.size());

std::vector<uint32_t> row_ids;
row_ids.reserve(remaining_capacity);
uint64_t first_nested_idx = 0;
uint64_t expected_next_nested_idx = 0;
bool found_nested_range = false;

int64_t child_row = _block_fast_path_row;
uint64_t in_row_offset = _block_fast_path_in_row_offset;
int produced_rows = 0;

while (produced_rows < remaining_capacity && child_row < child_rows) {
if (_block_fast_path_ctx.array_nullmap_data &&
_block_fast_path_ctx.array_nullmap_data[child_row]) {
// NULL array row: skip it here. Slow path will handle output semantics if needed.
child_row++;
in_row_offset = 0;
continue;
const bool is_outer = _fns[0]->is_outer();
const bool is_posexplode = _block_fast_path_ctx.generate_row_index;
auto& out_col = columns[p._child_slots.size()];

// Decompose posexplode struct output column if needed
ColumnStruct* struct_col_ptr = nullptr;
ColumnUInt8* outer_struct_nullmap_ptr = nullptr;
IColumn* value_col_ptr = nullptr;
ColumnInt32* pos_col_ptr = nullptr;
if (is_posexplode) {
if (out_col->is_nullable()) {
auto* nullable = assert_cast<ColumnNullable*>(out_col.get());
struct_col_ptr = assert_cast<ColumnStruct*>(nullable->get_nested_column_ptr().get());
outer_struct_nullmap_ptr =
assert_cast<ColumnUInt8*>(nullable->get_null_map_column_ptr().get());
} else {
struct_col_ptr = assert_cast<ColumnStruct*>(out_col.get());
}
pos_col_ptr = assert_cast<ColumnInt32*>(&struct_col_ptr->get_column(0));
value_col_ptr = &struct_col_ptr->get_column(1);
}
// Segment tracking: accumulate contiguous nested ranges, flush on boundaries.
// Array column offsets are monotonically non-decreasing, so nested data across child rows
// is always contiguous (even with NULL/empty rows that contribute zero elements).
struct ExpandSegmentContext {
std::vector<uint32_t>
seg_row_ids; // row ids of non table-function columns to replicate for this segment
std::vector<int32_t>
seg_positions; // for posexplode, the position values to write for this segment
int64_t seg_nested_start = -1; // start offset in the nested column of this segment
int seg_nested_count =
0; // number of nested rows in this segment (can be > child row count due to multiple elements per row)
};
ExpandSegmentContext segment_ctx;
segment_ctx.seg_row_ids.reserve(remaining_capacity);
if (is_posexplode) {
segment_ctx.seg_positions.reserve(remaining_capacity);
}

auto reset_expand_segment_ctx = [&segment_ctx, is_posexplode]() {
segment_ctx.seg_nested_start = -1;
segment_ctx.seg_nested_count = 0;
segment_ctx.seg_row_ids.clear();
if (is_posexplode) {
segment_ctx.seg_positions.clear();
}
};

// Flush accumulated contiguous segment to output columns
auto flush_segment = [&]() {
if (segment_ctx.seg_nested_count == 0) {
return;
}

// Non-TF columns: replicate each child row for every output element
for (auto index : p._output_slot_indexs) {
auto src_column = _child_block->get_by_position(index).column;
columns[index]->insert_indices_from(
*src_column, segment_ctx.seg_row_ids.data(),
segment_ctx.seg_row_ids.data() + segment_ctx.seg_row_ids.size());
}

if (is_posexplode) {
// Write positions
pos_col_ptr->insert_many_raw_data(
reinterpret_cast<const char*>(segment_ctx.seg_positions.data()),
segment_ctx.seg_positions.size());
// Write nested values to the struct's value sub-column
DCHECK(value_col_ptr->is_nullable())
<< "posexplode fast path requires nullable value column";
auto* val_nullable = assert_cast<ColumnNullable*>(value_col_ptr);
val_nullable->get_nested_column_ptr()->insert_range_from(
*_block_fast_path_ctx.nested_col, segment_ctx.seg_nested_start,
segment_ctx.seg_nested_count);
auto* val_nullmap =
assert_cast<ColumnUInt8*>(val_nullable->get_null_map_column_ptr().get());
auto& val_nullmap_data = val_nullmap->get_data();
const size_t old_size = val_nullmap_data.size();
val_nullmap_data.resize(old_size + segment_ctx.seg_nested_count);
if (_block_fast_path_ctx.nested_nullmap_data != nullptr) {
memcpy(val_nullmap_data.data() + old_size,
_block_fast_path_ctx.nested_nullmap_data + segment_ctx.seg_nested_start,
segment_ctx.seg_nested_count * sizeof(UInt8));
} else {
memset(val_nullmap_data.data() + old_size, 0,
segment_ctx.seg_nested_count * sizeof(UInt8));
}
// Struct-level null map: these rows are not null
if (outer_struct_nullmap_ptr) {
outer_struct_nullmap_ptr->insert_many_defaults(segment_ctx.seg_nested_count);
}
} else if (out_col->is_nullable()) {
auto* out_nullable = assert_cast<ColumnNullable*>(out_col.get());
out_nullable->get_nested_column_ptr()->insert_range_from(
*_block_fast_path_ctx.nested_col, segment_ctx.seg_nested_start,
segment_ctx.seg_nested_count);
auto* nullmap_column =
assert_cast<ColumnUInt8*>(out_nullable->get_null_map_column_ptr().get());
auto& nullmap_data = nullmap_column->get_data();
const size_t old_size = nullmap_data.size();
nullmap_data.resize(old_size + segment_ctx.seg_nested_count);
if (_block_fast_path_ctx.nested_nullmap_data != nullptr) {
memcpy(nullmap_data.data() + old_size,
_block_fast_path_ctx.nested_nullmap_data + segment_ctx.seg_nested_start,
segment_ctx.seg_nested_count * sizeof(UInt8));
} else {
memset(nullmap_data.data() + old_size, 0,
segment_ctx.seg_nested_count * sizeof(UInt8));
}
} else {
out_col->insert_range_from(*_block_fast_path_ctx.nested_col,
segment_ctx.seg_nested_start, segment_ctx.seg_nested_count);
}
reset_expand_segment_ctx();
};

// Emit one NULL output row for an outer-null/empty child row
auto emit_outer_null = [&](int64_t cr) {
for (auto index : p._output_slot_indexs) {
auto src_column = _child_block->get_by_position(index).column;
columns[index]->insert_from(*src_column, cr);
}
out_col->insert_default();
};
// Walk through child rows, accumulating contiguous segments into the output,
// then when hitting a null/empty row or reaching the end,
// flush the segment using bulk operations.
// For outer-null rows, insert a NULL and copy the non-table-function columns directly.
// This naturally handles both outer and non-outer modes since non-outer mode
// just won't produce any null outputs.
// For posexplode, generate position indices alongside this.
while (produced_rows < remaining_capacity && child_row < child_rows) {
const bool is_null_row = _block_fast_path_ctx.array_nullmap_data &&
_block_fast_path_ctx.array_nullmap_data[child_row];

const uint64_t prev_off = child_row == 0 ? 0 : offsets[child_row - 1];
const uint64_t cur_off = offsets[child_row];
const uint64_t cur_off = is_null_row ? prev_off : offsets[child_row];
const uint64_t nested_len = cur_off - prev_off;

if (in_row_offset >= nested_len) {
if (is_null_row || in_row_offset >= nested_len) {
// for outer functions, emit null row for NULL or empty array rows
if (is_outer && in_row_offset == 0 && (is_null_row || nested_len == 0)) {
flush_segment();
emit_outer_null(child_row);
produced_rows++;
}
child_row++;
in_row_offset = 0;
continue;
Expand All @@ -301,57 +429,37 @@ Status TableFunctionLocalState::_get_expanded_block_block_fast_path(
DCHECK_LE(nested_start + take_count, cur_off);
DCHECK_LE(nested_start + take_count, _block_fast_path_ctx.nested_col->size());

if (!found_nested_range) {
found_nested_range = true;
first_nested_idx = nested_start;
expected_next_nested_idx = nested_start;
if (segment_ctx.seg_nested_count == 0) {
segment_ctx.seg_nested_start = nested_start;
} else {
// Nested data from an array column is always contiguous: offsets are monotonically
// non-decreasing, so skipping NULL/empty rows doesn't create gaps.
DCHECK_EQ(static_cast<uint64_t>(segment_ctx.seg_nested_start +
segment_ctx.seg_nested_count),
nested_start)
<< "nested data must be contiguous across child rows";
}
DCHECK_EQ(nested_start, expected_next_nested_idx);

// Map each produced output row back to its source child row for copying non-table-function
// columns via insert_indices_from().
for (int j = 0; j < take_count; ++j) {
row_ids.push_back(cast_set<uint32_t>(child_row));
segment_ctx.seg_row_ids.push_back(cast_set<uint32_t>(child_row));
if (is_posexplode) {
segment_ctx.seg_positions.push_back(cast_set<int32_t>(in_row_offset + j));
}
}

segment_ctx.seg_nested_count += take_count;
produced_rows += take_count;
expected_next_nested_idx += take_count;
in_row_offset += take_count;
if (in_row_offset >= nested_len) {
child_row++;
in_row_offset = 0;
}
}

if (produced_rows > 0) {
for (auto index : p._output_slot_indexs) {
auto src_column = _child_block->get_by_position(index).column;
columns[index]->insert_indices_from(*src_column, row_ids.data(),
row_ids.data() + produced_rows);
}

auto& out_col = columns[p._child_slots.size()];
if (out_col->is_nullable()) {
auto* out_nullable = assert_cast<ColumnNullable*>(out_col.get());
out_nullable->get_nested_column_ptr()->insert_range_from(
*_block_fast_path_ctx.nested_col, first_nested_idx, produced_rows);
auto* nullmap_column =
assert_cast<ColumnUInt8*>(out_nullable->get_null_map_column_ptr().get());
auto& nullmap_data = nullmap_column->get_data();
const size_t old_size = nullmap_data.size();
nullmap_data.resize(old_size + produced_rows);
if (_block_fast_path_ctx.nested_nullmap_data != nullptr) {
memcpy(nullmap_data.data() + old_size,
_block_fast_path_ctx.nested_nullmap_data + first_nested_idx,
produced_rows * sizeof(UInt8));
} else {
memset(nullmap_data.data() + old_size, 0, produced_rows * sizeof(UInt8));
}
} else {
out_col->insert_range_from(*_block_fast_path_ctx.nested_col, first_nested_idx,
produced_rows);
}
}
// Flush any remaining segment
flush_segment();

_block_fast_path_row = child_row;
_block_fast_path_in_row_offset = in_row_offset;
Expand Down
1 change: 1 addition & 0 deletions be/src/exprs/table_function/table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class TableFunction {
const IColumn::Offsets64* offsets_ptr = nullptr;
ColumnPtr nested_col = nullptr;
const UInt8* nested_nullmap_data = nullptr;
bool generate_row_index = false;
};

virtual Status prepare() { return Status::OK(); }
Expand Down
2 changes: 1 addition & 1 deletion be/src/exprs/table_function/vexplode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ Status VExplodeTableFunction::process_init(Block* block, RuntimeState* state) {
}

bool VExplodeTableFunction::support_block_fast_path() const {
return !_is_outer;
return true;
}

Status VExplodeTableFunction::prepare_block_fast_path(Block* /*block*/, RuntimeState* /*state*/,
Expand Down
3 changes: 2 additions & 1 deletion be/src/exprs/table_function/vexplode_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ Status VExplodeV2TableFunction::process_init(Block* block, RuntimeState* state)
}

bool VExplodeV2TableFunction::support_block_fast_path() const {
return !_is_outer && !_generate_row_index && _multi_detail.size() == 1;
return _multi_detail.size() == 1;
}

Status VExplodeV2TableFunction::prepare_block_fast_path(Block* /*block*/, RuntimeState* /*state*/,
Expand All @@ -123,6 +123,7 @@ Status VExplodeV2TableFunction::prepare_block_fast_path(Block* /*block*/, Runtim
ctx->offsets_ptr = detail.offsets_ptr;
ctx->nested_col = detail.nested_col;
ctx->nested_nullmap_data = detail.nested_nullmap_data;
ctx->generate_row_index = _generate_row_index;
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/test/exec/operator/analytic_sink_operator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ struct AnalyticSinkOperatorTest : public ::testing::Test {
sink = std::make_unique<AnalyticSinkOperatorX>(&pool);
source = std::make_unique<AnalyticSourceOperatorX>();
state = std::make_shared<MockRuntimeState>();
state->batsh_size = batch_size;
state->_batch_size = batch_size;
std::cout << "AnalyticSinkOperatorTest::SetUp() batch_size: " << batch_size << std::endl;
_child_op = std::make_unique<MockAnalyticSinkOperator>();
for (int i = 0; i < batch_size; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct DistinctStreamingAggOperatorTest : public ::testing::Test {
op = std::make_unique<DistinctStreamingAggOperatorX>();
mock_op = std::make_shared<MockOperatorX>();
state = std::make_shared<MockRuntimeState>();
state->batsh_size = 10;
state->_batch_size = 10;
op->_child = mock_op;
}

Expand Down
2 changes: 1 addition & 1 deletion be/test/exec/operator/exchange_source_operator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ struct MockExchangeSourceLocalState : public ExchangeLocalState {
struct ExchangeSourceOperatorXTest : public ::testing::Test {
void SetUp() override {
state = std::make_shared<MockRuntimeState>();
state->batsh_size = 10;
state->_batch_size = 10;
}

void create_op(int num_senders, bool is_merging, int offset, int limit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class PartitionSortOperatorMockOperator : public OperatorXBase {
struct PartitionSortOperatorTest : public ::testing::Test {
void SetUp() override {
state = std::make_shared<MockRuntimeState>();
state->batsh_size = 10;
state->_batch_size = 10;
_child_op = std::make_unique<PartitionSortOperatorMockOperator>();
}

Expand Down
2 changes: 1 addition & 1 deletion be/test/exec/operator/query_cache_operator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class QueryCacheMockChildOperator : public OperatorXBase {
struct QueryCacheOperatorTest : public ::testing::Test {
void SetUp() override {
state = std::make_shared<MockRuntimeState>();
state->batsh_size = 10;
state->_batch_size = 10;
child_op = std::make_unique<QueryCacheMockChildOperator>();
query_cache_uptr.reset(QueryCache::create_global_cache(1024 * 1024 * 1024));
query_cache = query_cache_uptr.get();
Expand Down
2 changes: 1 addition & 1 deletion be/test/exec/operator/repeat_operator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct RepeatOperatorTest : public ::testing::Test {
op = std::make_unique<RepeatOperatorX>();
mock_op = std::make_shared<MockOperatorX>();
state = std::make_shared<MockRuntimeState>();
state->batsh_size = 10;
state->_batch_size = 10;
op->_child = mock_op;
}

Expand Down
4 changes: 2 additions & 2 deletions be/test/exec/operator/set_operator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ template <bool is_intersect>
struct SetOperatorTest : public ::testing::Test {
void SetUp() override {
state = std::make_shared<MockRuntimeState>();
state->batsh_size = 5;
state->_batch_size = 5;
}

void init_op(int child_size, DataTypes output_type) {
Expand Down Expand Up @@ -352,7 +352,7 @@ TEST_F(ExceptOperatorTest, test_build_not_ignore_null) {
TEST_F(ExceptOperatorTest, test_output_null_batsh_size) {
init_op(2, {std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())});

state->batsh_size = 3; // set batch size to 3
state->_batch_size = 3; // set batch size to 3
sink_op->_child_exprs = MockSlotRef::create_mock_contexts(
DataTypes {std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())});
probe_sink_ops[0]->_child_exprs = MockSlotRef::create_mock_contexts(
Expand Down
2 changes: 1 addition & 1 deletion be/test/exec/operator/sort_operator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class MockOperator : public OperatorXBase {
struct SortOperatorTest : public ::testing::Test {
void SetUp() override {
state = std::make_shared<MockRuntimeState>();
state->batsh_size = 10;
state->_batch_size = 10;
_child_op = std::make_unique<MockOperator>();
}

Expand Down
Loading
Loading