From 7c24abc3ea60d23f79fe8638c82b2c2f0edecdfa Mon Sep 17 00:00:00 2001 From: daidai Date: Thu, 28 May 2026 17:09:37 +0800 Subject: [PATCH] [fix](iceberg) fix wrong count(distinct) result on iceberg v3 row lineage columns --- .../format/parquet/vparquet_group_reader.cpp | 30 ++++----- be/src/format/parquet/vparquet_group_reader.h | 4 +- ...iceberg_v3_row_lineage_query_insert.groovy | 63 ++++++++++++++++++- 3 files changed, 74 insertions(+), 23 deletions(-) diff --git a/be/src/format/parquet/vparquet_group_reader.cpp b/be/src/format/parquet/vparquet_group_reader.cpp index 016fc4eb9d72b7..030793e2ca0a48 100644 --- a/be/src/format/parquet/vparquet_group_reader.cpp +++ b/be/src/format/parquet/vparquet_group_reader.cpp @@ -326,17 +326,13 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_ // Process external table query task that select columns are all from path. if (_read_table_columns.empty()) { - bool modify_row_ids = false; - RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof, &modify_row_ids)); + RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof)); DCHECK(_table_format_reader); RETURN_IF_ERROR(_table_format_reader->on_fill_partition_columns( block, *read_rows, _lazy_read_ctx.partition_col_names)); RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns( block, *read_rows, _lazy_read_ctx.missing_col_names)); - if (_table_format_reader->has_synthesized_column_handlers()) { - RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows)); - } RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, *read_rows)); RETURN_IF_ERROR(_table_format_reader->fill_generated_columns(block, *read_rows)); Status st = VExprContext::filter_block(_lazy_read_ctx.conjuncts, block, block->columns()); @@ -357,8 +353,7 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_ RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns( block, *read_rows, _lazy_read_ctx.missing_col_names)); - if 
(_table_format_reader->has_synthesized_column_handlers() || - _table_format_reader->has_generated_column_handlers()) { + if (_need_current_batch_row_positions()) { RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows)); } RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, *read_rows)); @@ -639,8 +634,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re block, pre_read_rows, _lazy_read_ctx.predicate_partition_col_names)); RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns( block, pre_read_rows, _lazy_read_ctx.predicate_missing_col_names)); - if (_table_format_reader->has_synthesized_column_handlers() || - _table_format_reader->has_generated_column_handlers()) { + if (_need_current_batch_row_positions()) { RETURN_IF_ERROR(_get_current_batch_row_id(pre_read_rows)); } RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, pre_read_rows)); @@ -949,9 +943,13 @@ Status RowGroupReader::_get_block_column_pos(const Block& block, const std::stri return Status::OK(); } -Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof, - bool* modify_row_ids) { - *modify_row_ids = false; +bool RowGroupReader::_need_current_batch_row_positions() const { + DCHECK(_table_format_reader); + return _table_format_reader->has_synthesized_column_handlers() || + _table_format_reader->has_generated_column_handlers(); +} + +Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof) { if (_position_delete_ctx.has_filter) { int64_t start_row_id = _position_delete_ctx.current_row_id; int64_t end_row_id = std::min(_position_delete_ctx.current_row_id + (int64_t)batch_size, @@ -975,9 +973,7 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, b _position_delete_ctx.current_row_id = end_row_id; *batch_eof = _position_delete_ctx.current_row_id == _position_delete_ctx.last_row_id; - if 
(_table_format_reader->has_synthesized_column_handlers() || - _table_format_reader->has_generated_column_handlers()) { - *modify_row_ids = true; + if (_need_current_batch_row_positions()) { _current_batch_row_ids.clear(); _current_batch_row_ids.resize(*read_rows); size_t idx = 0; @@ -1000,9 +996,7 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, b _remaining_rows = 0; *batch_eof = true; } - if (_table_format_reader->has_synthesized_column_handlers() || - _table_format_reader->has_generated_column_handlers()) { - *modify_row_ids = true; + if (_need_current_batch_row_positions()) { RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows)); } } diff --git a/be/src/format/parquet/vparquet_group_reader.h b/be/src/format/parquet/vparquet_group_reader.h index e9eb5370e02240..79b171ca064528 100644 --- a/be/src/format/parquet/vparquet_group_reader.h +++ b/be/src/format/parquet/vparquet_group_reader.h @@ -227,8 +227,7 @@ class RowGroupReader : public ProfileCollector, public RowPositionProvider { } private: - Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof, - bool* modify_row_ids); + Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof); Status _read_column_data(Block* block, const std::vector& columns, size_t batch_size, size_t* read_rows, bool* batch_eof, @@ -255,6 +254,7 @@ class RowGroupReader : public ProfileCollector, public RowPositionProvider { const IColumn::Filter& filter); bool _can_filter_by_dict(int slot_id, const tparquet::ColumnMetaData& column_metadata); + bool _need_current_batch_row_positions() const; bool is_dictionary_encoded(const tparquet::ColumnMetaData& column_metadata); Status _rewrite_dict_predicates(); Status _rewrite_dict_conjuncts(std::vector& dict_codes, int slot_id, bool is_nullable); diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy index 7276fadba76b2c..28e28bdd8870b8 100644 --- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy @@ -140,6 +140,56 @@ suite("test_iceberg_v3_row_lineage_query_insert", "p0,external,iceberg,external_ assertEquals(expectedIds[2], combinedPredicate[1][0].toString().toInteger()) } + def assertRowLineageOnlyAggregatesReadable = { tableName, expectedRowCount -> + def rowIdsWithPhysicalColumn = sql(""" + select _row_id, id + from ${tableName} + order by _row_id + """).collect { row -> row[0].toString().toLong() } + log.info("Checking row lineage only baseline for ${tableName}: rowIds=${rowIdsWithPhysicalColumn}") + assertEquals(expectedRowCount, rowIdsWithPhysicalColumn.size()) + assertEquals(expectedRowCount, rowIdsWithPhysicalColumn.toSet().size()) + + def distinctRowIdsWithPhysicalColumn = sql(""" + select distinct _row_id, id + from ${tableName} + order by _row_id + """).collect { row -> row[0].toString().toLong() } + log.info("""Checking distinct _row_id with physical column for ${tableName}: distinctRowIds=${distinctRowIdsWithPhysicalColumn}""") + assertEquals(rowIdsWithPhysicalColumn, distinctRowIdsWithPhysicalColumn) + + def distinctRowIds = sql(""" + select distinct _row_id + from ${tableName} + order by _row_id + """).collect { row -> row[0].toString().toLong() } + log.info("Checking distinct _row_id only for ${tableName}: distinctRowIds=${distinctRowIds}") + assertEquals(rowIdsWithPhysicalColumn, distinctRowIds) + + def groupRows = sql(""" + select _row_id, count(*) + from ${tableName} + group by _row_id + order by _row_id + """) + log.info("Checking group by _row_id only for ${tableName}: groupRows=${groupRows}") + assertEquals(expectedRowCount, groupRows.size()) + assertEquals(rowIdsWithPhysicalColumn, 
groupRows.collect { row -> row[0].toString().toLong() }) + groupRows.each { row -> + assertEquals(1, row[1].toString().toInteger()) + } + + def distinctAggRows = sql(""" + select count(*), count(distinct _row_id), ndv(_row_id) + from ${tableName} + """) + log.info("Checking distinct aggregate on _row_id only for ${tableName}: result=${distinctAggRows}") + assertEquals(1, distinctAggRows.size()) + assertEquals(expectedRowCount, distinctAggRows[0][0].toString().toInteger()) + assertEquals(expectedRowCount, distinctAggRows[0][1].toString().toInteger()) + assertEquals(expectedRowCount, distinctAggRows[0][2].toString().toInteger()) + } + sql """drop catalog if exists ${catalogName}""" sql """ create catalog if not exists ${catalogName} properties ( @@ -180,10 +230,11 @@ suite("test_iceberg_v3_row_lineage_query_insert", "p0,external,iceberg,external_ """ sql """ - insert into ${unpartitionedTable} values(1, 'Alice', 25); + insert into ${unpartitionedTable} values + (1, 'Alice', 25), + (2, 'Bob', 30), + (3, 'Charlie', 35) """ - sql """ insert into ${unpartitionedTable} values(2, 'Bob', 30) """ - sql """ insert into ${unpartitionedTable} values(3, 'Charlie', 35) """ log.info("Inserted initial rows into ${unpartitionedTable}") @@ -193,6 +244,9 @@ suite("test_iceberg_v3_row_lineage_query_insert", "p0,external,iceberg,external_ // 3. Explicit SELECT on row lineage columns returns non-null values. 
assertRowLineageHiddenColumns(unpartitionedTable, 3) assertExplicitRowLineageReadable(unpartitionedTable, [1, 2, 3]) + if (format == "parquet") { + assertRowLineageOnlyAggregatesReadable(unpartitionedTable, 3) + } test { sql """insert into ${unpartitionedTable}(_row_id, id, name, age) values (1, 9, 'BadRow', 99)""" @@ -216,6 +270,9 @@ suite("test_iceberg_v3_row_lineage_query_insert", "p0,external,iceberg,external_ unpartitionedTable, format, "Unpartitioned normal INSERT") + if (format == "parquet") { + assertRowLineageOnlyAggregatesReadable(unpartitionedTable, 4) + } sql """drop table if exists ${partitionedTable}""" sql """