diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index a38b025e0f05..bdff9f27b2cf 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -272,6 +272,42 @@ void testDataWriterGenericWriteEngineRead(FileFormat fileFormat, DataGenerator d assertEquals(schema, convertToEngineRecords(genericRecords, schema), readRecords); } + /** Write with engine type T, read with engine type T */ + @ParameterizedTest + @FieldSource("FORMAT_AND_GENERATOR") + void testDataWriterEngineWriteEngineRead(FileFormat fileFormat, DataGenerator dataGenerator) + throws IOException { + Schema schema = dataGenerator.schema(); + FileWriterBuilder, Object> writerBuilder = + FormatModelRegistry.dataWriteBuilder(fileFormat, engineType(), encryptedFile); + + DataWriter writer = writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()).build(); + + List genericRecords = dataGenerator.generateRecords(); + List engineRecords = convertToEngineRecords(genericRecords, schema); + + try (writer) { + engineRecords.forEach(writer::write); + } + + DataFile dataFile = writer.toDataFile(); + + assertThat(dataFile).isNotNull(); + assertThat(dataFile.recordCount()).isEqualTo(engineRecords.size()); + assertThat(dataFile.format()).isEqualTo(fileFormat); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(schema) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertEquals(schema, engineRecords, readRecords); + } + /** Write with engine type T, read with Generic Record */ @ParameterizedTest @FieldSource("FORMAT_AND_GENERATOR") diff --git a/data/src/test/java/org/apache/iceberg/data/DataGenerators.java b/data/src/test/java/org/apache/iceberg/data/DataGenerators.java index 390c0949cb72..7c27ca6c55b8 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataGenerators.java +++ b/data/src/test/java/org/apache/iceberg/data/DataGenerators.java @@ -29,7 +29,8 @@ */ class DataGenerators { - static final DataGenerator[] ALL = new DataGenerator[] {new StructOfPrimitive()}; + static final DataGenerator[] ALL = + new DataGenerator[] {new StructOfPrimitive(), new StructWithDecimals()}; private DataGenerators() {} @@ -50,6 +51,20 @@ public Schema schema() { } } + static class StructWithDecimals implements DataGenerator { + private final Schema schema = + new Schema( + required(1, "row_id", Types.StringType.get()), + required(2, "dec_9_2", Types.DecimalType.of(9, 2)), + required(3, "dec_15_3", Types.DecimalType.of(15, 3)), + required(4, "dec_38_10", Types.DecimalType.of(38, 10))); + + @Override + public Schema schema() { + return schema; + } + } + static class DefaultSchema implements DataGenerator { private final Schema schema = new Schema( diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java index 3de64aa99865..9627258ea3a5 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -362,7 +362,6 @@ private static class BinaryDecimalReader public DecimalData read(DecimalData ignored) { Binary binary = column.nextBinary(); BigDecimal bigDecimal = new BigDecimal(new BigInteger(binary.getBytes()), scale); - // TODO: need a unit test to write-read-validate decimal via FlinkParquetWrite/Reader return DecimalData.fromBigDecimal(bigDecimal, precision, scale); } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java index 3de64aa99865..9627258ea3a5 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -362,7 +362,6 @@ private static class BinaryDecimalReader public DecimalData read(DecimalData ignored) { Binary binary = column.nextBinary(); BigDecimal bigDecimal = new BigDecimal(new BigInteger(binary.getBytes()), scale); - // TODO: need a unit test to write-read-validate decimal via FlinkParquetWrite/Reader return DecimalData.fromBigDecimal(bigDecimal, precision, scale); } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java index 0e6856daa67a..49d96d1325fd 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -379,7 +379,6 @@ private static class BinaryDecimalReader public DecimalData read(DecimalData ignored) { Binary binary = column.nextBinary(); BigDecimal bigDecimal = new BigDecimal(new BigInteger(binary.getBytes()), scale); - // TODO: need a unit test to write-read-validate decimal via FlinkParquetWrite/Reader return DecimalData.fromBigDecimal(bigDecimal, precision, scale); } }