From e75ff842d29c8c0767c9e6ca0bc98f734dfe97a0 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Fri, 8 May 2026 19:56:50 +0200 Subject: [PATCH] ORC: Backport add _row_id and _last_updated_sequence_number reader in Orc to support lineage --- .../iceberg/flink/data/FlinkOrcReader.java | 2 +- .../iceberg/flink/data/FlinkOrcReaders.java | 15 ++++-- .../maintenance/api/TestRewriteDataFiles.java | 51 +++++++++++-------- .../operator/OperatorTestBase.java | 35 ++++++++++--- .../iceberg/flink/data/FlinkOrcReader.java | 2 +- .../iceberg/flink/data/FlinkOrcReaders.java | 15 ++++-- .../maintenance/api/TestRewriteDataFiles.java | 51 +++++++++++-------- .../operator/OperatorTestBase.java | 35 ++++++++++--- 8 files changed, 144 insertions(+), 62 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java index 3e3a29112cf4..77f16bfdb2ab 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java @@ -70,7 +70,7 @@ public OrcValueReader record( TypeDescription record, List names, List> fields) { - return FlinkOrcReaders.struct(fields, iStruct, idToConstant); + return FlinkOrcReaders.struct(record, fields, iStruct, idToConstant); } @Override diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java index 7a4a15c7e600..c5c958fbdb04 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java @@ -39,6 +39,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; +import
org.apache.orc.TypeDescription; import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; import org.apache.orc.storage.ql.exec.vector.ColumnVector; import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; @@ -91,8 +92,11 @@ public static OrcValueReader map( } public static OrcValueReader struct( - List> readers, Types.StructType struct, Map idToConstant) { - return new StructReader(readers, struct, idToConstant); + TypeDescription record, + List> readers, + Types.StructType struct, + Map idToConstant) { + return new StructReader(record, readers, struct, idToConstant); } private static class StringReader implements OrcValueReader { @@ -265,8 +269,11 @@ private static class StructReader extends OrcValueReaders.StructReader private final int numFields; StructReader( - List> readers, Types.StructType struct, Map idToConstant) { - super(readers, struct, idToConstant); + TypeDescription record, + List> readers, + Types.StructType struct, + Map idToConstant) { + super(record, readers, struct, idToConstant); this.numFields = struct.fields().size(); } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java index 97b8b6786545..88b949a9a7f8 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.stream.StreamSupport; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; @@ -44,8 +45,14 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.FieldSource; class TestRewriteDataFiles extends MaintenanceTaskTestBase { + + private static final FileFormat[] FILE_FORMATS = + new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC}; + @Test void testRewriteUnpartitioned() throws Exception { Table table = createTable(); @@ -83,13 +90,14 @@ void testRewriteUnpartitioned() throws Exception { createRecord(4, "d"))); } - @Test - void testRewriteUnpartitionedPreserveLineage() throws Exception { - Table table = createTable(3); - insert(table, 1, "a"); - insert(table, 2, "b"); - insert(table, 3, "c"); - insert(table, 4, "d"); + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testRewriteUnpartitionedPreserveLineage(FileFormat fileFormat) throws Exception { + Table table = createTable(3, fileFormat); + insert(table, 1, "a", fileFormat); + insert(table, 2, "b", fileFormat); + insert(table, 3, "c", fileFormat); + insert(table, 4, "d", fileFormat); assertFileNum(table, 4, 0); @@ -123,15 +131,17 @@ void testRewriteUnpartitionedPreserveLineage() throws Exception { schema); } - @Test - void testRewriteTheSameFilePreserveLineage() throws Exception { - Table table = createTable(3); - insert(table, 1, "a"); - insert(table, 2, "b"); + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testRewriteTheSameFilePreserveLineage(FileFormat fileFormat) throws Exception { + Table table = createTable(3, fileFormat); + insert(table, 1, "a", fileFormat); + insert(table, 2, "b", fileFormat); // Create a file with two lines of data to verify that the rowid is read correctly. 
insert( table, - ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d"))); + ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d")), + fileFormat); assertFileNum(table, 3, 0); @@ -167,13 +177,14 @@ void testRewriteTheSameFilePreserveLineage() throws Exception { schema); } - @Test - void testRewritePartitionedPreserveLineage() throws Exception { - Table table = createPartitionedTable(3); - insertPartitioned(table, 1, "p1"); - insertPartitioned(table, 2, "p1"); - insertPartitioned(table, 3, "p2"); - insertPartitioned(table, 4, "p2"); + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testRewritePartitionedPreserveLineage(FileFormat fileFormat) throws Exception { + Table table = createPartitionedTable(3, fileFormat); + insertPartitioned(table, 1, "p1", fileFormat); + insertPartitioned(table, 2, "p1", fileFormat); + insertPartitioned(table, 3, "p2", fileFormat); + insertPartitioned(table, 4, "p2", fileFormat); assertFileNum(table, 4, 0); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index 93291e8cc29a..06ab7861c0f5 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -133,10 +133,14 @@ void after() throws IOException { } protected static Table createTable() { - return createTable(2); + return createTable(2, FileFormat.PARQUET); } protected static Table createTable(int formatVersion) { + return createTable(formatVersion, FileFormat.PARQUET); + } + + protected static Table createTable(int formatVersion, FileFormat fileFormat) { return CATALOG_EXTENSION .catalog() .createTable( @@ -145,6 +149,8 @@ protected static Table createTable(int formatVersion) {
PartitionSpec.unpartitioned(), null, ImmutableMap.of( + "write.format.default", + fileFormat.name(), TableProperties.FORMAT_VERSION, String.valueOf(formatVersion), "flink.max-continuous-empty-commits", @@ -182,7 +188,7 @@ protected static Table createTableWithDelete(int formatVersion) { "format-version", String.valueOf(formatVersion), "write.upsert.enabled", "true")); } - protected static Table createPartitionedTable(int formatVersion) { + protected static Table createPartitionedTable(int formatVersion, FileFormat fileFormat) { return CATALOG_EXTENSION .catalog() .createTable( @@ -191,6 +197,8 @@ protected static Table createPartitionedTable(int formatVersion) { PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(), null, ImmutableMap.of( + "write.format.default", + fileFormat.name(), "format-version", String.valueOf(formatVersion), "flink.max-continuous-empty-commits", @@ -198,17 +206,27 @@ protected static Table createPartitionedTable(int formatVersion) { } protected static Table createPartitionedTable() { - return createPartitionedTable(2); + return createPartitionedTable(2, FileFormat.PARQUET); } protected void insert(Table table, Integer id, String data) throws IOException { - new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) + insert(table, id, data, FileFormat.PARQUET); + } + + protected void insert(Table table, Integer id, String data, FileFormat fileFormat) + throws IOException { + new GenericAppenderHelper(table, fileFormat, warehouseDir) .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data))); table.refresh(); } protected void insert(Table table, List records) throws IOException { - new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir).appendToTable(records); + insert(table, records, FileFormat.PARQUET); + } + + protected void insert(Table table, List records, FileFormat fileFormat) + throws IOException { + new GenericAppenderHelper(table, fileFormat, warehouseDir).appendToTable(records); 
table.refresh(); } @@ -309,7 +327,12 @@ protected void update(Table table, Integer id, String oldData, String tempData, } protected void insertPartitioned(Table table, Integer id, String data) throws IOException { - new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) + insertPartitioned(table, id, data, FileFormat.PARQUET); + } + + protected void insertPartitioned(Table table, Integer id, String data, FileFormat fileFormat) + throws IOException { + new GenericAppenderHelper(table, fileFormat, warehouseDir) .appendToTable( TestHelpers.Row.of(data), Lists.newArrayList(SimpleDataUtil.createRecord(id, data))); table.refresh(); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java index 3e3a29112cf4..77f16bfdb2ab 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java @@ -70,7 +70,7 @@ public OrcValueReader record( TypeDescription record, List names, List> fields) { - return FlinkOrcReaders.struct(fields, iStruct, idToConstant); + return FlinkOrcReaders.struct(record, fields, iStruct, idToConstant); } @Override diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java index 7a4a15c7e600..c5c958fbdb04 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java @@ -39,6 +39,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; +import org.apache.orc.TypeDescription; import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; import 
org.apache.orc.storage.ql.exec.vector.ColumnVector; import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; @@ -91,8 +92,11 @@ public static OrcValueReader map( } public static OrcValueReader struct( - List> readers, Types.StructType struct, Map idToConstant) { - return new StructReader(readers, struct, idToConstant); + TypeDescription record, + List> readers, + Types.StructType struct, + Map idToConstant) { + return new StructReader(record, readers, struct, idToConstant); } private static class StringReader implements OrcValueReader { @@ -265,8 +269,11 @@ private static class StructReader extends OrcValueReaders.StructReader private final int numFields; StructReader( - List> readers, Types.StructType struct, Map idToConstant) { - super(readers, struct, idToConstant); + TypeDescription record, + List> readers, + Types.StructType struct, + Map idToConstant) { + super(record, readers, struct, idToConstant); this.numFields = struct.fields().size(); } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java index 97b8b6786545..88b949a9a7f8 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.stream.StreamSupport; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; @@ -44,8 +45,14 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; 
+import org.junit.jupiter.params.provider.FieldSource; class TestRewriteDataFiles extends MaintenanceTaskTestBase { + + private static final FileFormat[] FILE_FORMATS = + new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC}; + @Test void testRewriteUnpartitioned() throws Exception { Table table = createTable(); @@ -83,13 +90,14 @@ void testRewriteUnpartitioned() throws Exception { createRecord(4, "d"))); } - @Test - void testRewriteUnpartitionedPreserveLineage() throws Exception { - Table table = createTable(3); - insert(table, 1, "a"); - insert(table, 2, "b"); - insert(table, 3, "c"); - insert(table, 4, "d"); + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testRewriteUnpartitionedPreserveLineage(FileFormat fileFormat) throws Exception { + Table table = createTable(3, fileFormat); + insert(table, 1, "a", fileFormat); + insert(table, 2, "b", fileFormat); + insert(table, 3, "c", fileFormat); + insert(table, 4, "d", fileFormat); assertFileNum(table, 4, 0); @@ -123,15 +131,17 @@ void testRewriteUnpartitionedPreserveLineage() throws Exception { schema); } - @Test - void testRewriteTheSameFilePreserveLineage() throws Exception { - Table table = createTable(3); - insert(table, 1, "a"); - insert(table, 2, "b"); + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testRewriteTheSameFilePreserveLineage(FileFormat fileFormat) throws Exception { + Table table = createTable(3, fileFormat); + insert(table, 1, "a", fileFormat); + insert(table, 2, "b", fileFormat); // Create a file with two lines of data to verify that the rowid is read correctly. 
insert( table, - ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d"))); + ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d")), + fileFormat); assertFileNum(table, 3, 0); @@ -167,13 +177,14 @@ void testRewriteTheSameFilePreserveLineage() throws Exception { schema); } - @Test - void testRewritePartitionedPreserveLineage() throws Exception { - Table table = createPartitionedTable(3); - insertPartitioned(table, 1, "p1"); - insertPartitioned(table, 2, "p1"); - insertPartitioned(table, 3, "p2"); - insertPartitioned(table, 4, "p2"); + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testRewritePartitionedPreserveLineage(FileFormat fileFormat) throws Exception { + Table table = createPartitionedTable(3, fileFormat); + insertPartitioned(table, 1, "p1", fileFormat); + insertPartitioned(table, 2, "p1", fileFormat); + insertPartitioned(table, 3, "p2", fileFormat); + insertPartitioned(table, 4, "p2", fileFormat); assertFileNum(table, 4, 0); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index 6dd6cda84f27..d6563e782e43 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -133,10 +133,14 @@ void after() throws IOException { } protected static Table createTable() { - return createTable(2); + return createTable(2, FileFormat.PARQUET); } protected static Table createTable(int formatVersion) { + return createTable(formatVersion, FileFormat.PARQUET); + } + + protected static Table createTable(int formatVersion, FileFormat fileFormat) { return CATALOG_EXTENSION .catalog() .createTable( @@ -145,6 +149,8 @@ protected static Table createTable(int formatVersion) {
PartitionSpec.unpartitioned(), null, ImmutableMap.of( + "write.format.default", + fileFormat.name(), TableProperties.FORMAT_VERSION, String.valueOf(formatVersion), "flink.max-continuous-empty-commits", @@ -182,7 +188,7 @@ protected static Table createTableWithDelete(int formatVersion) { "format-version", String.valueOf(formatVersion), "write.upsert.enabled", "true")); } - protected static Table createPartitionedTable(int formatVersion) { + protected static Table createPartitionedTable(int formatVersion, FileFormat fileFormat) { return CATALOG_EXTENSION .catalog() .createTable( @@ -191,6 +197,8 @@ protected static Table createPartitionedTable(int formatVersion) { PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(), null, ImmutableMap.of( + "write.format.default", + fileFormat.name(), "format-version", String.valueOf(formatVersion), "flink.max-continuous-empty-commits", @@ -198,17 +206,27 @@ protected static Table createPartitionedTable(int formatVersion) { } protected static Table createPartitionedTable() { - return createPartitionedTable(2); + return createPartitionedTable(2, FileFormat.PARQUET); } protected void insert(Table table, Integer id, String data) throws IOException { - new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) + insert(table, id, data, FileFormat.PARQUET); + } + + protected void insert(Table table, Integer id, String data, FileFormat fileFormat) + throws IOException { + new GenericAppenderHelper(table, fileFormat, warehouseDir) .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data))); table.refresh(); } protected void insert(Table table, List records) throws IOException { - new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir).appendToTable(records); + insert(table, records, FileFormat.PARQUET); + } + + protected void insert(Table table, List records, FileFormat fileFormat) + throws IOException { + new GenericAppenderHelper(table, fileFormat, warehouseDir).appendToTable(records); 
table.refresh(); } @@ -309,7 +327,12 @@ protected void update(Table table, Integer id, String oldData, String tempData, } protected void insertPartitioned(Table table, Integer id, String data) throws IOException { - new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) + insertPartitioned(table, id, data, FileFormat.PARQUET); + } + + protected void insertPartitioned(Table table, Integer id, String data, FileFormat fileFormat) + throws IOException { + new GenericAppenderHelper(table, fileFormat, warehouseDir) .appendToTable( TestHelpers.Row.of(data), Lists.newArrayList(SimpleDataUtil.createRecord(id, data))); table.refresh();