Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 12 additions & 18 deletions be/src/format/parquet/vparquet_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,17 +326,13 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_

// Process external table query task that select columns are all from path.
if (_read_table_columns.empty()) {
bool modify_row_ids = false;
RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof, &modify_row_ids));
RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof));

DCHECK(_table_format_reader);
RETURN_IF_ERROR(_table_format_reader->on_fill_partition_columns(
block, *read_rows, _lazy_read_ctx.partition_col_names));
RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
block, *read_rows, _lazy_read_ctx.missing_col_names));
if (_table_format_reader->has_synthesized_column_handlers()) {
RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
}
RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, *read_rows));
RETURN_IF_ERROR(_table_format_reader->fill_generated_columns(block, *read_rows));
Status st = VExprContext::filter_block(_lazy_read_ctx.conjuncts, block, block->columns());
Expand All @@ -357,8 +353,7 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
block, *read_rows, _lazy_read_ctx.missing_col_names));

if (_table_format_reader->has_synthesized_column_handlers() ||
_table_format_reader->has_generated_column_handlers()) {
if (_need_current_batch_row_positions()) {
RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
}
RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, *read_rows));
Expand Down Expand Up @@ -639,8 +634,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
block, pre_read_rows, _lazy_read_ctx.predicate_partition_col_names));
RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
block, pre_read_rows, _lazy_read_ctx.predicate_missing_col_names));
if (_table_format_reader->has_synthesized_column_handlers() ||
_table_format_reader->has_generated_column_handlers()) {
if (_need_current_batch_row_positions()) {
RETURN_IF_ERROR(_get_current_batch_row_id(pre_read_rows));
}
RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, pre_read_rows));
Expand Down Expand Up @@ -949,9 +943,13 @@ Status RowGroupReader::_get_block_column_pos(const Block& block, const std::stri
return Status::OK();
}

Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof,
bool* modify_row_ids) {
*modify_row_ids = false;
// Tells whether the row positions of the current batch have to be collected.
// That is the case when the table-format reader reports synthesized column
// handlers or generated column handlers. The reader must already be set.
bool RowGroupReader::_need_current_batch_row_positions() const {
    DCHECK(_table_format_reader);
    if (_table_format_reader->has_synthesized_column_handlers()) {
        return true;
    }
    return _table_format_reader->has_generated_column_handlers();
}

Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof) {
if (_position_delete_ctx.has_filter) {
int64_t start_row_id = _position_delete_ctx.current_row_id;
int64_t end_row_id = std::min(_position_delete_ctx.current_row_id + (int64_t)batch_size,
Expand All @@ -975,9 +973,7 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, b
_position_delete_ctx.current_row_id = end_row_id;
*batch_eof = _position_delete_ctx.current_row_id == _position_delete_ctx.last_row_id;

if (_table_format_reader->has_synthesized_column_handlers() ||
_table_format_reader->has_generated_column_handlers()) {
*modify_row_ids = true;
if (_need_current_batch_row_positions()) {
_current_batch_row_ids.clear();
_current_batch_row_ids.resize(*read_rows);
size_t idx = 0;
Expand All @@ -1000,9 +996,7 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, b
_remaining_rows = 0;
*batch_eof = true;
}
if (_table_format_reader->has_synthesized_column_handlers() ||
_table_format_reader->has_generated_column_handlers()) {
*modify_row_ids = true;
if (_need_current_batch_row_positions()) {
RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/format/parquet/vparquet_group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,7 @@ class RowGroupReader : public ProfileCollector, public RowPositionProvider {
}

private:
Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof,
bool* modify_row_ids);
Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof);

Status _read_column_data(Block* block, const std::vector<std::string>& columns,
size_t batch_size, size_t* read_rows, bool* batch_eof,
Expand All @@ -255,6 +254,7 @@ class RowGroupReader : public ProfileCollector, public RowPositionProvider {
const IColumn::Filter& filter);

bool _can_filter_by_dict(int slot_id, const tparquet::ColumnMetaData& column_metadata);
bool _need_current_batch_row_positions() const;
bool is_dictionary_encoded(const tparquet::ColumnMetaData& column_metadata);
Status _rewrite_dict_predicates();
Status _rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id, bool is_nullable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,56 @@ suite("test_iceberg_v3_row_lineage_query_insert", "p0,external,iceberg,external_
assertEquals(expectedIds[2], combinedPredicate[1][0].toString().toInteger())
}

// Verifies that _row_id can be read, de-duplicated, grouped and aggregated,
// both together with the physical column `id` and on its own.
//   tableName        - table to query; interpolated directly into each SQL statement.
//   expectedRowCount - number of rows (and of distinct _row_id values) the table should hold.
// Every check fails through assertEquals; nothing is returned.
def assertRowLineageOnlyAggregatesReadable = { tableName, expectedRowCount ->
// Baseline: _row_id selected next to a physical column, ordered by _row_id.
def rowIdsWithPhysicalColumn = sql("""
select _row_id, id
from ${tableName}
order by _row_id
""").collect { row -> row[0].toString().toLong() }
log.info("Checking row lineage only baseline for ${tableName}: rowIds=${rowIdsWithPhysicalColumn}")
// The baseline must have the expected number of rows and no repeated _row_id.
assertEquals(expectedRowCount, rowIdsWithPhysicalColumn.size())
assertEquals(expectedRowCount, rowIdsWithPhysicalColumn.toSet().size())

// DISTINCT over (_row_id, id) must produce the same _row_id list as the baseline.
def distinctRowIdsWithPhysicalColumn = sql("""
select distinct _row_id, id
from ${tableName}
order by _row_id
""").collect { row -> row[0].toString().toLong() }
log.info("""Checking distinct _row_id with physical column for ${tableName}: distinctRowIds=${distinctRowIdsWithPhysicalColumn}""")
assertEquals(rowIdsWithPhysicalColumn, distinctRowIdsWithPhysicalColumn)

// DISTINCT over _row_id alone (no physical column in the select list) must
// also match the baseline.
def distinctRowIds = sql("""
select distinct _row_id
from ${tableName}
order by _row_id
""").collect { row -> row[0].toString().toLong() }
log.info("Checking distinct _row_id only for ${tableName}: distinctRowIds=${distinctRowIds}")
assertEquals(rowIdsWithPhysicalColumn, distinctRowIds)

// GROUP BY _row_id: one group per row, same ids as the baseline, each with count 1.
def groupRows = sql("""
select _row_id, count(*)
from ${tableName}
group by _row_id
order by _row_id
""")
log.info("Checking group by _row_id only for ${tableName}: groupRows=${groupRows}")
assertEquals(expectedRowCount, groupRows.size())
assertEquals(rowIdsWithPhysicalColumn, groupRows.collect { row -> row[0].toString().toLong() })
groupRows.each { row ->
assertEquals(1, row[1].toString().toInteger())
}

// Single-row aggregate: total count, exact distinct count and ndv must all
// equal expectedRowCount.
// NOTE(review): ndv() is presumably an approximate distinct count; exact
// equality is assumed to hold for the small row counts used here - confirm.
def distinctAggRows = sql("""
select count(*), count(distinct _row_id), ndv(_row_id)
from ${tableName}
""")
log.info("Checking distinct aggregate on _row_id only for ${tableName}: result=${distinctAggRows}")
assertEquals(1, distinctAggRows.size())
assertEquals(expectedRowCount, distinctAggRows[0][0].toString().toInteger())
assertEquals(expectedRowCount, distinctAggRows[0][1].toString().toInteger())
assertEquals(expectedRowCount, distinctAggRows[0][2].toString().toInteger())
}

sql """drop catalog if exists ${catalogName}"""
sql """
create catalog if not exists ${catalogName} properties (
Expand Down Expand Up @@ -180,10 +230,11 @@ suite("test_iceberg_v3_row_lineage_query_insert", "p0,external,iceberg,external_
"""

sql """
insert into ${unpartitionedTable} values(1, 'Alice', 25);
insert into ${unpartitionedTable} values
(1, 'Alice', 25),
(2, 'Bob', 30),
(3, 'Charlie', 35)
"""
sql """ insert into ${unpartitionedTable} values(2, 'Bob', 30) """
sql """ insert into ${unpartitionedTable} values(3, 'Charlie', 35) """

log.info("Inserted initial rows into ${unpartitionedTable}")

Expand All @@ -193,6 +244,9 @@ suite("test_iceberg_v3_row_lineage_query_insert", "p0,external,iceberg,external_
// 3. Explicit SELECT on row lineage columns returns non-null values.
assertRowLineageHiddenColumns(unpartitionedTable, 3)
assertExplicitRowLineageReadable(unpartitionedTable, [1, 2, 3])
if (format == "parquet") {
assertRowLineageOnlyAggregatesReadable(unpartitionedTable, 3)
}

test {
sql """insert into ${unpartitionedTable}(_row_id, id, name, age) values (1, 9, 'BadRow', 99)"""
Expand All @@ -216,6 +270,9 @@ suite("test_iceberg_v3_row_lineage_query_insert", "p0,external,iceberg,external_
unpartitionedTable,
format,
"Unpartitioned normal INSERT")
if (format == "parquet") {
assertRowLineageOnlyAggregatesReadable(unpartitionedTable, 4)
}

sql """drop table if exists ${partitionedTable}"""
sql """
Expand Down
Loading