From ae7ae6100cd91113d99cb2e40ea37c59f9ddd9b9 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Wed, 27 May 2026 11:13:26 +0800 Subject: [PATCH 1/3] Data: Add TCK for Writer builder in FileFormat API --- .../iceberg/data/BaseFormatModelTests.java | 308 +++++++++++++----- 1 file changed, 232 insertions(+), 76 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index 5373749406bc..83dbc425202b 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -69,6 +70,7 @@ import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expression; @@ -197,51 +199,9 @@ void after() { void testDataWriterEngineWriteGenericRead(FileFormat fileFormat, DataGenerator dataGenerator) throws IOException { Schema schema = dataGenerator.schema(); - FileWriterBuilder, Object> writerBuilder = - FormatModelRegistry.dataWriteBuilder(fileFormat, engineType(), encryptedFile); - - DataWriter writer = writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()).build(); - List genericRecords = dataGenerator.generateRecords(); List engineRecords = convertToEngineRecords(genericRecords, schema); - - try (writer) { - engineRecords.forEach(writer::write); - } - - DataFile dataFile = writer.toDataFile(); - - assertThat(dataFile).isNotNull(); - assertThat(dataFile.recordCount()).isEqualTo(engineRecords.size()); - assertThat(dataFile.format()).isEqualTo(fileFormat); - - readAndAssertGenericRecords(fileFormat, schema, genericRecords); - } - - /** Write with engine type T without explicit engineSchema, read with Generic Record */ - @ParameterizedTest - @FieldSource("FORMAT_AND_GENERATOR") - void testDataWriterEngineWriteWithoutEngineSchema( - FileFormat fileFormat, DataGenerator dataGenerator) throws IOException { - Schema schema = dataGenerator.schema(); - FileWriterBuilder, Object> writerBuilder = - FormatModelRegistry.dataWriteBuilder(fileFormat, engineType(), encryptedFile); - - DataWriter writer = writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()).build(); - - List genericRecords = dataGenerator.generateRecords(); - List engineRecords = convertToEngineRecords(genericRecords, schema); - - try (writer) { - engineRecords.forEach(writer::write); - } - - DataFile dataFile = writer.toDataFile(); - - assertThat(dataFile).isNotNull(); - assertThat(dataFile.recordCount()).isEqualTo(engineRecords.size()); - assertThat(dataFile.format()).isEqualTo(fileFormat); - + writeEngineRecords(fileFormat, schema, engineRecords); readAndAssertGenericRecords(fileFormat, schema, genericRecords); } @@ -274,34 +234,10 @@ void testDataWriterGenericWriteEngineRead(FileFormat fileFormat, DataGenerator d void testDataWriterEngineWriteEngineRead(FileFormat fileFormat, DataGenerator dataGenerator) throws IOException { Schema schema = dataGenerator.schema(); - FileWriterBuilder, Object> writerBuilder = - FormatModelRegistry.dataWriteBuilder(fileFormat, engineType(), encryptedFile); - - DataWriter writer = writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()).build(); - List genericRecords = dataGenerator.generateRecords(); List engineRecords = convertToEngineRecords(genericRecords, schema); - - try (writer) { - engineRecords.forEach(writer::write); - } - - DataFile dataFile = writer.toDataFile(); - - assertThat(dataFile).isNotNull(); - assertThat(dataFile.recordCount()).isEqualTo(engineRecords.size()); - assertThat(dataFile.format()).isEqualTo(fileFormat); - - InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); - List readRecords; - try (CloseableIterable reader = - FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) - .project(schema) - .build()) { - readRecords = ImmutableList.copyOf(reader); - } - - assertEquals(schema, engineRecords, readRecords); + writeEngineRecords(fileFormat, schema, engineRecords); + readAndAssertEngineRecords(fileFormat, schema, genericRecords, Function.identity()); } /** Write with engine type T, read with Generic Record */ @@ -1662,6 +1598,69 @@ void testReadFileWithoutFieldIdsUsingNameMapping(FileFormat fileFormat) throws I assertEquals(icebergSchema, convertToEngineRecords(genericRecords, icebergSchema), readRecords); } + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testDataWriterOverwrite(FileFormat fileFormat) throws IOException { + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema schema = dataGenerator.schema(); + + List genericRecords = dataGenerator.generateRecords(); + List engineRecords = convertToEngineRecords(genericRecords, schema); + + writeEngineRecords(fileFormat, schema, engineRecords); + readAndAssertGenericRecords(fileFormat, schema, genericRecords); + + assertThatThrownBy(() -> writeEngineRecords(fileFormat, schema, engineRecords)) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageContaining("Already exists"); + + genericRecords = dataGenerator.generateRecords(); + writeEngineRecords(fileFormat, schema, convertToEngineRecords(genericRecords, schema), true); + readAndAssertGenericRecords(fileFormat, schema, genericRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testDataWriterSet(FileFormat fileFormat) throws IOException { + writeAndAssertDataWriterWithConfig( + fileFormat, + (writerBuilder, format) -> + writerBuilder.set(compressionProperty(format), compressionValue(format)), + format -> + assertThat(actualCompressionCodec(format)).isEqualTo(expectedCompressionCodec(format))); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testDataWriterSetAll(FileFormat fileFormat) throws IOException { + writeAndAssertDataWriterWithConfig( + fileFormat, + (writerBuilder, format) -> + writerBuilder.setAll(Map.of(compressionProperty(format), compressionValue(format))), + format -> + assertThat(actualCompressionCodec(format)).isEqualTo(expectedCompressionCodec(format))); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testDataWriterMeta(FileFormat fileFormat) throws IOException { + writeAndAssertDataWriterWithConfig( + fileFormat, + (writerBuilder, format) -> writerBuilder.meta("tck.meta.key", "tck-meta-value"), + format -> + assertThat(fileMetadataValue(format, "tck.meta.key")).isEqualTo("tck-meta-value")); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testDataWriterMetaMap(FileFormat fileFormat) throws IOException { + writeAndAssertDataWriterWithConfig( + fileFormat, + (writerBuilder, format) -> writerBuilder.meta(Map.of("tck.meta.key", "tck-meta-value")), + format -> + assertThat(fileMetadataValue(format, "tck.meta.key")).isEqualTo("tck-meta-value")); + } + private void readAndAssertGenericRecords( FileFormat fileFormat, Schema schema, List expected) throws IOException { InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); @@ -2205,13 +2204,7 @@ private void writeOrcWithoutFieldIds(Schema schema, List records) throws } InputFile inputFile = outputFile.toInputFile(); - OrcFile.ReaderOptions readerOptions = - OrcFile.readerOptions(conf) - .useUTCTimestamp(true) - .filesystem(OrcWritingTestUtils.inputFileSystem(inputFile)) - .maxLength(inputFile.getLength()); - - try (Reader reader = OrcFile.createReader(hadoopPath, readerOptions)) { + try (Reader reader = newOrcReader(inputFile, conf)) { assertThat(TestORCSchemaUtil.hasIds(reader.getSchema())).isFalse(); } } @@ -2257,4 +2250,167 @@ private void readAndAssertEngineRecords( assertEquals( readSchema, convertToEngineRecords(expectedGenericRecords, readSchema), readRecords); } + + private DataFile writeEngineRecords(FileFormat fileFormat, Schema schema, List records) + throws IOException { + return writeEngineRecords(fileFormat, schema, records, false); + } + + private DataFile writeEngineRecords( + FileFormat fileFormat, Schema schema, List records, boolean overwrite) throws IOException { + FileWriterBuilder, Object> writerBuilder = + FormatModelRegistry.dataWriteBuilder(fileFormat, engineType(), encryptedFile); + + writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()); + + if (overwrite) { + writerBuilder.overwrite(); + } + + DataWriter writer = writerBuilder.build(); + + try (writer) { + records.forEach(writer::write); + } + + DataFile dataFile = writer.toDataFile(); + assertThat(dataFile).isNotNull(); + assertThat(dataFile.recordCount()).isEqualTo(records.size()); + assertThat(dataFile.format()).isEqualTo(fileFormat); + + return dataFile; + } + + private static Reader newOrcReader(InputFile inputFile, Configuration conf) throws IOException { + Path hadoopPath = new Path(inputFile.location()); + OrcFile.ReaderOptions readerOptions = + OrcFile.readerOptions(conf) + .useUTCTimestamp(true) + .filesystem(OrcWritingTestUtils.inputFileSystem(inputFile)) + .maxLength(inputFile.getLength()); + + return OrcFile.createReader(hadoopPath, readerOptions); + } + + private static String compressionProperty(FileFormat fileFormat) { + return switch (fileFormat) { + case AVRO -> TableProperties.AVRO_COMPRESSION; + case PARQUET -> TableProperties.PARQUET_COMPRESSION; + case ORC -> TableProperties.ORC_COMPRESSION; + default -> + throw new UnsupportedOperationException( + "No compression property defined for format: " + fileFormat); + }; + } + + private static String compressionValue(FileFormat fileFormat) { + return switch (fileFormat) { + case AVRO, PARQUET -> "uncompressed"; + case ORC -> "none"; + default -> + throw new UnsupportedOperationException( + "No compression value defined for format: " + fileFormat); + }; + } + + private static String expectedCompressionCodec(FileFormat fileFormat) { + return switch (fileFormat) { + case AVRO -> "null"; + case PARQUET -> "UNCOMPRESSED"; + case ORC -> "NONE"; + default -> + throw new UnsupportedOperationException( + "No expected compression codec defined for format: " + fileFormat); + }; + } + + private String actualCompressionCodec(FileFormat fileFormat) throws IOException { + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + + return switch (fileFormat) { + case AVRO -> avroMetadataValue(inputFile, "avro.codec"); + case PARQUET -> parquetCompressionCodec(inputFile); + case ORC -> orcCompressionCodec(inputFile); + default -> throw new UnsupportedOperationException("Unsupported file format: " + fileFormat); + }; + } + + private static String orcCompressionCodec(InputFile inputFile) throws IOException { + try (Reader reader = newOrcReader(inputFile, new Configuration())) { + return reader.getCompressionKind().name(); + } + } + + private static String avroMetadataValue(InputFile inputFile, String key) throws IOException { + try (DataFileStream reader = + new DataFileStream<>(inputFile.newStream(), new GenericDatumReader<>())) { + return reader.getMetaString(key); + } + } + + private static String parquetCompressionCodec(InputFile inputFile) throws IOException { + try (ParquetFileReader reader = ParquetFileReader.open(ParquetFileTestUtils.file(inputFile))) { + return reader.getFooter().getBlocks().get(0).getColumns().get(0).getCodec().name(); + } + } + + private String fileMetadataValue(FileFormat fileFormat, String key) throws IOException { + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + + return switch (fileFormat) { + case AVRO -> avroMetadataValue(inputFile, key); + case PARQUET -> parquetMetadataValue(inputFile, key); + case ORC -> orcMetadataValue(inputFile, key); + default -> throw new UnsupportedOperationException("Unsupported file format: " + fileFormat); + }; + } + + private static String parquetMetadataValue(InputFile inputFile, String key) throws IOException { + try (ParquetFileReader reader = ParquetFileReader.open(ParquetFileTestUtils.file(inputFile))) { + return reader.getFooter().getFileMetaData().getKeyValueMetaData().get(key); + } + } + + private static String orcMetadataValue(InputFile inputFile, String key) throws IOException { + try (Reader reader = newOrcReader(inputFile, new Configuration())) { + ByteBuffer metadataValue = reader.getMetadataValue(key).duplicate(); + byte[] bytes = new byte[metadataValue.remaining()]; + metadataValue.get(bytes); + return new String(bytes, StandardCharsets.UTF_8); + } + } + + @FunctionalInterface + private interface DataWriterEffectAssertion { + void accept(FileFormat fileFormat) throws IOException; + } + + private void writeAndAssertDataWriterWithConfig( + FileFormat fileFormat, + BiConsumer, Object>, FileFormat> configureWriter, + DataWriterEffectAssertion assertWriterEffect) + throws IOException { + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema schema = dataGenerator.schema(); + List genericRecords = dataGenerator.generateRecords(); + List engineRecords = convertToEngineRecords(genericRecords, schema); + + FileWriterBuilder, Object> writerBuilder = + FormatModelRegistry.dataWriteBuilder(fileFormat, engineType(), encryptedFile); + writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()); + configureWriter.accept(writerBuilder, fileFormat); + + DataWriter writer = writerBuilder.build(); + + try (writer) { + engineRecords.forEach(writer::write); + } + + DataFile dataFile = writer.toDataFile(); + assertThat(dataFile).isNotNull(); + assertThat(dataFile.recordCount()).isEqualTo(genericRecords.size()); + assertThat(dataFile.format()).isEqualTo(fileFormat); + assertWriterEffect.accept(fileFormat); + readAndAssertGenericRecords(fileFormat, schema, genericRecords); + } } From e381ca519c33e2a7f6ead62b423492c337305258 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Thu, 28 May 2026 11:32:46 +0800 Subject: [PATCH 2/3] revert testDataWriterEngineWriteWithoutEngineSchema && fix generateRecords the same data --- .../apache/iceberg/data/BaseFormatModelTests.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index 83dbc425202b..7aa24c863ec5 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -205,6 +205,18 @@ void testDataWriterEngineWriteGenericRead(FileFormat fileFormat, DataGenerator d readAndAssertGenericRecords(fileFormat, schema, genericRecords); } + /** Write with engine type T without explicit engineSchema, read with Generic Record */ + @ParameterizedTest + @FieldSource("FORMAT_AND_GENERATOR") + void testDataWriterEngineWriteWithoutEngineSchema( + FileFormat fileFormat, DataGenerator dataGenerator) throws IOException { + Schema schema = dataGenerator.schema(); + List genericRecords = dataGenerator.generateRecords(); + List engineRecords = convertToEngineRecords(genericRecords, schema); + writeEngineRecords(fileFormat, schema, engineRecords); + readAndAssertGenericRecords(fileFormat, schema, genericRecords); + } + /** Write with Generic Record, read with engine type T */ @ParameterizedTest @FieldSource("FORMAT_AND_GENERATOR") @@ -1614,7 +1626,7 @@ void testDataWriterOverwrite(FileFormat fileFormat) throws IOException { .isInstanceOf(AlreadyExistsException.class) .hasMessageContaining("Already exists"); - genericRecords = dataGenerator.generateRecords(); + genericRecords = dataGenerator.generateRecords(20); writeEngineRecords(fileFormat, schema, convertToEngineRecords(genericRecords, schema), true); readAndAssertGenericRecords(fileFormat, schema, genericRecords); } From eca95aeeee0df319c8cd696950f8c484c722103a Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Thu, 28 May 2026 13:26:36 +0800 Subject: [PATCH 3/3] Add new interface FileFormatTestSupport --- .../iceberg/data/BaseFormatModelTests.java | 265 ++---------------- .../iceberg/data/FileFormatTestSupport.java | 76 +++++ .../apache/iceberg/data/avro/AvroFormat.java | 122 ++++++++ .../apache/iceberg/data/orc/OrcFormat.java | 147 ++++++++++ .../iceberg/data/parquet/ParquetFormat.java | 109 +++++++ 5 files changed, 475 insertions(+), 244 deletions(-) create mode 100644 data/src/test/java/org/apache/iceberg/data/FileFormatTestSupport.java create mode 100644 data/src/test/java/org/apache/iceberg/data/avro/AvroFormat.java create mode 100644 data/src/test/java/org/apache/iceberg/data/orc/OrcFormat.java create mode 100644 data/src/test/java/org/apache/iceberg/data/parquet/ParquetFormat.java diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index 7aa24c863ec5..f82426736df0 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -21,6 +21,13 @@ import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH; import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS; import static org.apache.iceberg.TestBase.SCHEMA; +import static org.apache.iceberg.data.FileFormatTestSupport.FEATURE_CASE_SENSITIVE; +import static org.apache.iceberg.data.FileFormatTestSupport.FEATURE_COLUMN_LEVEL_METRICS; +import static org.apache.iceberg.data.FileFormatTestSupport.FEATURE_COLUMN_METRICS_TRUNCATE_BINARY; +import static org.apache.iceberg.data.FileFormatTestSupport.FEATURE_FILTER; +import static org.apache.iceberg.data.FileFormatTestSupport.FEATURE_READER_DEFAULT; +import static org.apache.iceberg.data.FileFormatTestSupport.FEATURE_REUSE_CONTAINERS; +import static org.apache.iceberg.data.FileFormatTestSupport.FEATURE_SPLIT; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; @@ -28,9 +35,7 @@ import java.io.File; import java.io.IOException; -import java.io.OutputStream; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -40,14 +45,6 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.IntStream; -import org.apache.avro.file.DataFileStream; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.io.DatumWriter; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -62,8 +59,6 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TestTables; -import org.apache.iceberg.avro.AvroTestHelpers; -import org.apache.iceberg.data.orc.GenericOrcWriter; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; @@ -83,15 +78,8 @@ import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.mapping.MappingUtil; import org.apache.iceberg.mapping.NameMapping; -import org.apache.iceberg.orc.ORCSchemaUtil; -import org.apache.iceberg.orc.OrcRowWriter; -import org.apache.iceberg.orc.OrcWritingTestUtils; -import org.apache.iceberg.orc.TestORCSchemaUtil; -import org.apache.iceberg.parquet.ParquetFileTestUtils; -import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -99,14 +87,6 @@ import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; -import org.apache.orc.OrcFile; -import org.apache.orc.Reader; -import org.apache.orc.TypeDescription; -import org.apache.orc.Writer; -import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; -import org.apache.parquet.avro.AvroParquetWriter; -import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.ParquetWriter; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.io.TempDir; @@ -132,8 +112,7 @@ protected boolean supportsBatchReads() { @TempDir private File tableDir; - private static final FileFormat[] FILE_FORMATS = - new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC}; + private static final FileFormat[] FILE_FORMATS = FileFormatTestSupport.formats(); private static final List FORMAT_AND_GENERATOR = Arrays.stream(FILE_FORMATS) @@ -143,29 +122,6 @@ protected boolean supportsBatchReads() { .map(generator -> Arguments.of(format, generator))) .toList(); - static final String FEATURE_FILTER = "filter"; - static final String FEATURE_CASE_SENSITIVE = "caseSensitive"; - static final String FEATURE_SPLIT = "split"; - static final String FEATURE_READER_DEFAULT = "readerDefault"; - static final String FEATURE_REUSE_CONTAINERS = "reuseContainers"; - static final String FEATURE_COLUMN_LEVEL_METRICS = "columnLevelMetrics"; - static final String FEATURE_COLUMN_METRICS_TRUNCATE_BINARY = "columnMetricsTruncateBinary"; - - private static final Map MISSING_FEATURES = - Map.of( - FileFormat.AVRO, - new String[] { - FEATURE_FILTER, - FEATURE_CASE_SENSITIVE, - FEATURE_SPLIT, - FEATURE_COLUMN_LEVEL_METRICS, - FEATURE_COLUMN_METRICS_TRUNCATE_BINARY - }, - FileFormat.ORC, - new String[] { - FEATURE_REUSE_CONTAINERS, FEATURE_COLUMN_METRICS_TRUNCATE_BINARY, FEATURE_READER_DEFAULT - }); - private InMemoryFileIO fileIO; private EncryptedOutputFile encryptedFile; @@ -1722,14 +1678,13 @@ private List convertToEngineRecords(List records, Schema schema) { } private static void assumeSupports(FileFormat fileFormat, String feature) { - assumeThat(MISSING_FEATURES.getOrDefault(fileFormat, new String[] {})).doesNotContain(feature); + assumeThat(supportsFeature(fileFormat, feature)).isTrue(); } /** * Returns whether the given file format supports the specified feature. * - *

The check is based on {@link #MISSING_FEATURES}. Features not listed as missing for a format - * are treated as supported. + *

The check is based on {@link FileFormatTestSupport#supportsFeature(String)}. * *

Prefer this method over {@link #assumeSupports(FileFormat, String)} when only part of a test * should be skipped conditionally. Unlike {@code assumeSupports}, this method does not abort the @@ -1741,8 +1696,7 @@ private static void assumeSupports(FileFormat fileFormat, String feature) { * @return {@code true} if the feature is supported by the format; {@code false} otherwise */ private static boolean supportsFeature(FileFormat fileFormat, String feature) { - String[] missing = MISSING_FEATURES.getOrDefault(fileFormat, new String[] {}); - return !Arrays.asList(missing).contains(feature); + return FileFormatTestSupport.forFormat(fileFormat).supportsFeature(feature); } private DataFile writeRecordsForSplit(FileFormat fileFormat, Schema schema, List records) @@ -1775,13 +1729,7 @@ private DataFile writeRecordsForSplit(FileFormat fileFormat, Schema schema, List } private static String splitSizeProperty(FileFormat fileFormat) { - return switch (fileFormat) { - case PARQUET -> TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; - case ORC -> TableProperties.ORC_STRIPE_SIZE_BYTES; - default -> - throw new UnsupportedOperationException( - "No split size property defined for format: " + fileFormat); - }; + return FileFormatTestSupport.forFormat(fileFormat).splitSizeProperty(); } private static void assertCounts( @@ -2123,102 +2071,8 @@ private static Record copy(Record source, Schema sourceSchema, Schema targetSche private void writeRecordsWithoutFieldIds( FileFormat fileFormat, Schema schema, List records) throws IOException { - switch (fileFormat) { - case PARQUET -> writeParquetWithoutFieldIds(schema, records); - case AVRO -> writeAvroWithoutFieldIds(schema, records); - case ORC -> writeOrcWithoutFieldIds(schema, records); - default -> throw new UnsupportedOperationException("Unsupported file format: " + fileFormat); - } - } - - private void writeAvroWithoutFieldIds(Schema schema, List records) throws IOException { - org.apache.avro.Schema avroSchemaWithoutIds = AvroTestHelpers.removeIds(schema); - - OutputFile outputFile = encryptedFile.encryptingOutputFile(); - DatumWriter datumWriter = new GenericDatumWriter<>(avroSchemaWithoutIds); - try (OutputStream out = outputFile.create(); - DataFileWriter writer = new DataFileWriter<>(datumWriter)) { - writer.create(avroSchemaWithoutIds, out); - for (Record record : records) { - GenericData.Record avroRecord = new GenericData.Record(avroSchemaWithoutIds); - for (Types.NestedField field : schema.columns()) { - avroRecord.put(field.name(), record.getField(field.name())); - } - - writer.append(avroRecord); - } - } - - try (DataFileStream reader = - new DataFileStream<>(outputFile.toInputFile().newStream(), new GenericDatumReader<>())) { - assertThat(AvroTestHelpers.hasIds(reader.getSchema())).isFalse(); - } - } - - private void writeParquetWithoutFieldIds(Schema schema, List records) throws IOException { - org.apache.avro.Schema avroSchemaWithoutIds = AvroTestHelpers.removeIds(schema); - - OutputFile outputFile = encryptedFile.encryptingOutputFile(); - - try (ParquetWriter writer = - AvroParquetWriter.builder(ParquetFileTestUtils.file(outputFile)) - .withDataModel(GenericData.get()) - .withSchema(avroSchemaWithoutIds) - .withConf(new Configuration()) - .build()) { - for (Record record : records) { - GenericData.Record avroRecord = new GenericData.Record(avroSchemaWithoutIds); - for (Types.NestedField field : schema.columns()) { - avroRecord.put(field.name(), record.getField(field.name())); - } - - writer.write(avroRecord); - } - } - - try (ParquetFileReader reader = - ParquetFileReader.open(ParquetFileTestUtils.file(outputFile.toInputFile()))) { - assertThat(ParquetSchemaUtil.hasIds(reader.getFooter().getFileMetaData().getSchema())) - .isFalse(); - } - } - - private void writeOrcWithoutFieldIds(Schema schema, List records) throws IOException { - TypeDescription typeWithIds = ORCSchemaUtil.convert(schema); - TypeDescription typeWithoutIds = TestORCSchemaUtil.removeIds(typeWithIds); - - OutputFile outputFile = encryptedFile.encryptingOutputFile(); - Path hadoopPath = new Path(outputFile.location()); - - Configuration conf = new Configuration(); - OrcFile.WriterOptions options = - OrcFile.writerOptions(conf) - .useUTCTimestamp(true) - .setSchema(typeWithoutIds) - .fileSystem(OrcWritingTestUtils.outputFileSystem(outputFile)); - - OrcRowWriter rowWriter = GenericOrcWriter.buildWriter(schema, typeWithIds); - - try (Writer orcWriter = OrcFile.createWriter(hadoopPath, options)) { - VectorizedRowBatch batch = typeWithoutIds.createRowBatch(); - for (Record record : records) { - rowWriter.write(record, batch); - if (batch.size == batch.getMaxSize()) { - orcWriter.addRowBatch(batch); - batch.reset(); - } - } - - if (batch.size > 0) { - orcWriter.addRowBatch(batch); - batch.reset(); - } - } - - InputFile inputFile = outputFile.toInputFile(); - try (Reader reader = newOrcReader(inputFile, conf)) { - assertThat(TestORCSchemaUtil.hasIds(reader.getSchema())).isFalse(); - } + FileFormatTestSupport.forFormat(fileFormat) + .writeRecordsWithoutFieldIds(encryptedFile.encryptingOutputFile(), schema, records); } private void runTypePromotionCheck( @@ -2293,103 +2147,26 @@ private DataFile writeEngineRecords( return dataFile; } - private static Reader newOrcReader(InputFile inputFile, Configuration conf) throws IOException { - Path hadoopPath = new Path(inputFile.location()); - OrcFile.ReaderOptions readerOptions = - OrcFile.readerOptions(conf) - .useUTCTimestamp(true) - .filesystem(OrcWritingTestUtils.inputFileSystem(inputFile)) - .maxLength(inputFile.getLength()); - - return OrcFile.createReader(hadoopPath, readerOptions); - } - private static String compressionProperty(FileFormat fileFormat) { - return switch (fileFormat) { - case AVRO -> TableProperties.AVRO_COMPRESSION; - case PARQUET -> TableProperties.PARQUET_COMPRESSION; - case ORC -> TableProperties.ORC_COMPRESSION; - default -> - throw new UnsupportedOperationException( - "No compression property defined for format: " + fileFormat); - }; + return FileFormatTestSupport.forFormat(fileFormat).compressionProperty(); } private static String compressionValue(FileFormat fileFormat) { - return switch (fileFormat) { - case AVRO, PARQUET -> "uncompressed"; - case ORC -> "none"; - default -> - throw new UnsupportedOperationException( - "No compression value defined for format: " + fileFormat); - }; + return FileFormatTestSupport.forFormat(fileFormat).compressionValue(); } private static String expectedCompressionCodec(FileFormat fileFormat) { - return switch (fileFormat) { - case AVRO -> "null"; - case PARQUET -> "UNCOMPRESSED"; - case ORC -> "NONE"; - default -> - throw new UnsupportedOperationException( - "No expected compression codec defined for format: " + fileFormat); - }; + return FileFormatTestSupport.forFormat(fileFormat).expectedCompressionCodec(); } private String actualCompressionCodec(FileFormat fileFormat) throws IOException { - InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); - - return switch (fileFormat) { - case AVRO -> avroMetadataValue(inputFile, "avro.codec"); - case PARQUET -> parquetCompressionCodec(inputFile); - case ORC -> orcCompressionCodec(inputFile); - default -> throw new UnsupportedOperationException("Unsupported file format: " + fileFormat); - }; - } - - private static String orcCompressionCodec(InputFile inputFile) throws IOException { - try (Reader reader = newOrcReader(inputFile, new Configuration())) { - return reader.getCompressionKind().name(); - } - } - - private static String avroMetadataValue(InputFile inputFile, String key) throws IOException { - try (DataFileStream reader = - new DataFileStream<>(inputFile.newStream(), new GenericDatumReader<>())) { - return reader.getMetaString(key); - } - } - - private static String parquetCompressionCodec(InputFile inputFile) throws IOException { - try (ParquetFileReader reader = ParquetFileReader.open(ParquetFileTestUtils.file(inputFile))) { - return reader.getFooter().getBlocks().get(0).getColumns().get(0).getCodec().name(); - } + return FileFormatTestSupport.forFormat(fileFormat) + .actualCompressionCodec(encryptedFile.encryptingOutputFile().toInputFile()); } private String fileMetadataValue(FileFormat fileFormat, String key) throws IOException { - InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); - - return switch (fileFormat) { - case AVRO -> avroMetadataValue(inputFile, key); - case PARQUET -> parquetMetadataValue(inputFile, key); - case ORC -> orcMetadataValue(inputFile, key); - default -> throw new UnsupportedOperationException("Unsupported file format: " + fileFormat); - }; - } - - private static String parquetMetadataValue(InputFile inputFile, String key) throws IOException { - try (ParquetFileReader reader = ParquetFileReader.open(ParquetFileTestUtils.file(inputFile))) { - return reader.getFooter().getFileMetaData().getKeyValueMetaData().get(key); - } - } - - private static String orcMetadataValue(InputFile inputFile, String key) throws IOException { - try (Reader reader = newOrcReader(inputFile, new Configuration())) { - ByteBuffer metadataValue = reader.getMetadataValue(key).duplicate(); - byte[] bytes = new byte[metadataValue.remaining()]; - metadataValue.get(bytes); - return new String(bytes, StandardCharsets.UTF_8); - } + return FileFormatTestSupport.forFormat(fileFormat) + .metadataValue(encryptedFile.encryptingOutputFile().toInputFile(), key); } @FunctionalInterface diff --git a/data/src/test/java/org/apache/iceberg/data/FileFormatTestSupport.java b/data/src/test/java/org/apache/iceberg/data/FileFormatTestSupport.java new file mode 100644 index 000000000000..78479fa7786d --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/FileFormatTestSupport.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.avro.AvroFormat; +import org.apache.iceberg.data.orc.OrcFormat; +import org.apache.iceberg.data.parquet.ParquetFormat; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; + +public interface FileFormatTestSupport { + + String FEATURE_FILTER = "filter"; + String FEATURE_CASE_SENSITIVE = "caseSensitive"; + String FEATURE_SPLIT = "split"; + String FEATURE_READER_DEFAULT = "readerDefault"; + String FEATURE_REUSE_CONTAINERS = "reuseContainers"; + String FEATURE_COLUMN_LEVEL_METRICS = "columnLevelMetrics"; + String FEATURE_COLUMN_METRICS_TRUNCATE_BINARY = "columnMetricsTruncateBinary"; + + FileFormatTestSupport[] ALL = + new FileFormatTestSupport[] {new AvroFormat(), new OrcFormat(), new ParquetFormat()}; + + static FileFormat[] formats() { + return Arrays.stream(ALL).map(FileFormatTestSupport::format).toArray(FileFormat[]::new); + } + + static FileFormatTestSupport forFormat(FileFormat format) { + return Arrays.stream(ALL) + .filter(testSupportFormat -> testSupportFormat.format() == format) + .findFirst() + .orElseThrow(() -> new UnsupportedOperationException("Unsupported file format: " + format)); + } + + FileFormat format(); + + default boolean supportsFeature(String feature) { + return true; + } + + void writeRecordsWithoutFieldIds(OutputFile outputFile, Schema schema, List records) + throws IOException; + + String compressionProperty(); + + String compressionValue(); + + String expectedCompressionCodec(); + + String actualCompressionCodec(InputFile inputFile) throws IOException; + + String metadataValue(InputFile inputFile, String key) throws IOException; + + String splitSizeProperty(); +} diff --git a/data/src/test/java/org/apache/iceberg/data/avro/AvroFormat.java b/data/src/test/java/org/apache/iceberg/data/avro/AvroFormat.java new file mode 100644 index 000000000000..baf3e389b0b1 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/avro/AvroFormat.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.avro; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.avro.AvroTestHelpers; +import org.apache.iceberg.data.FileFormatTestSupport; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.types.Types; + +public class AvroFormat implements FileFormatTestSupport { + @Override + public FileFormat format() { + return FileFormat.AVRO; + } + + @Override + public boolean supportsFeature(String feature) { + return switch (feature) { + case FEATURE_FILTER, + FEATURE_CASE_SENSITIVE, + FEATURE_SPLIT, + FEATURE_COLUMN_LEVEL_METRICS, + FEATURE_COLUMN_METRICS_TRUNCATE_BINARY -> + false; + default -> true; + }; + } + + @Override + public void writeRecordsWithoutFieldIds( + OutputFile outputFile, Schema schema, List records) throws IOException { + org.apache.avro.Schema avroSchemaWithoutIds = AvroTestHelpers.removeIds(schema); + DatumWriter datumWriter = new GenericDatumWriter<>(avroSchemaWithoutIds); + try (OutputStream out = outputFile.create(); + DataFileWriter writer = new DataFileWriter<>(datumWriter)) { + writer.create(avroSchemaWithoutIds, out); + for (Record record : records) { + GenericData.Record avroRecord = new GenericData.Record(avroSchemaWithoutIds); + for (Types.NestedField field : schema.columns()) { + avroRecord.put(field.name(), record.getField(field.name())); + } + + writer.append(avroRecord); + } + } + + try (DataFileStream reader = + new DataFileStream<>(outputFile.toInputFile().newStream(), new GenericDatumReader<>())) { + assertThat(AvroTestHelpers.hasIds(reader.getSchema())).isFalse(); + } + } + + @Override + public String compressionProperty() { + return TableProperties.AVRO_COMPRESSION; + } + + @Override + public String compressionValue() { + return "uncompressed"; + } + + @Override + public String expectedCompressionCodec() { + return "null"; + } + + @Override + public String actualCompressionCodec(InputFile inputFile) throws IOException { + return avroMetadataValue(inputFile, "avro.codec"); + } + + @Override + public String metadataValue(InputFile inputFile, String key) throws IOException { + return avroMetadataValue(inputFile, key); + } + + @Override + public String splitSizeProperty() { + throw new UnsupportedOperationException( + "No split size property defined for format: " + format()); + } + + private static String avroMetadataValue(InputFile inputFile, String key) throws IOException { + try (DataFileStream reader = + new DataFileStream<>(inputFile.newStream(), new GenericDatumReader<>())) { + return reader.getMetaString(key); + } + } +} diff --git a/data/src/test/java/org/apache/iceberg/data/orc/OrcFormat.java b/data/src/test/java/org/apache/iceberg/data/orc/OrcFormat.java new file mode 100644 index 000000000000..841ad000f1e3 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/orc/OrcFormat.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.orc; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.FileFormatTestSupport; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.orc.ORCSchemaUtil; +import org.apache.iceberg.orc.OrcRowWriter; +import org.apache.iceberg.orc.OrcWritingTestUtils; +import org.apache.iceberg.orc.TestORCSchemaUtil; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +public class OrcFormat implements FileFormatTestSupport { + @Override + public FileFormat format() { + return FileFormat.ORC; + } + + @Override + public boolean supportsFeature(String feature) { + return switch (feature) { + case FEATURE_REUSE_CONTAINERS, + FEATURE_COLUMN_METRICS_TRUNCATE_BINARY, + FEATURE_READER_DEFAULT -> + false; + default -> true; + }; + } + + @Override + public void writeRecordsWithoutFieldIds( + OutputFile outputFile, Schema schema, List records) throws IOException { + TypeDescription typeWithIds = ORCSchemaUtil.convert(schema); + TypeDescription typeWithoutIds = TestORCSchemaUtil.removeIds(typeWithIds); + Path hadoopPath = new Path(outputFile.location()); + + Configuration conf = new Configuration(); + OrcFile.WriterOptions options = + OrcFile.writerOptions(conf) + .useUTCTimestamp(true) + .setSchema(typeWithoutIds) + .fileSystem(OrcWritingTestUtils.outputFileSystem(outputFile)); + + OrcRowWriter rowWriter = GenericOrcWriter.buildWriter(schema, typeWithIds); + + try (Writer orcWriter = OrcFile.createWriter(hadoopPath, options)) { + VectorizedRowBatch batch = typeWithoutIds.createRowBatch(); + for (Record record : records) { + rowWriter.write(record, batch); + if (batch.size == batch.getMaxSize()) { + orcWriter.addRowBatch(batch); + batch.reset(); + } + } + + if (batch.size > 0) { + orcWriter.addRowBatch(batch); + batch.reset(); + } + } + + try (Reader reader = newOrcReader(outputFile.toInputFile(), conf)) { + assertThat(TestORCSchemaUtil.hasIds(reader.getSchema())).isFalse(); + } + } + + @Override + public String compressionProperty() { + return TableProperties.ORC_COMPRESSION; + } + + @Override + public String compressionValue() { + return "none"; + } + + @Override + public String expectedCompressionCodec() { + return "NONE"; + } + + @Override + public String actualCompressionCodec(InputFile inputFile) throws IOException { + try (Reader reader = newOrcReader(inputFile, new Configuration())) { + return reader.getCompressionKind().name(); + } + } + + @Override + public String metadataValue(InputFile inputFile, String key) throws IOException { + try (Reader reader = newOrcReader(inputFile, new Configuration())) { + ByteBuffer metadataValue = reader.getMetadataValue(key).duplicate(); + byte[] bytes = new byte[metadataValue.remaining()]; + metadataValue.get(bytes); + return new String(bytes, StandardCharsets.UTF_8); + } + } + + @Override + public String splitSizeProperty() { + return TableProperties.ORC_STRIPE_SIZE_BYTES; + } + + private static Reader newOrcReader(InputFile inputFile, Configuration conf) throws IOException { + Path hadoopPath = new Path(inputFile.location()); + OrcFile.ReaderOptions readerOptions = + OrcFile.readerOptions(conf) + .useUTCTimestamp(true) + .filesystem(OrcWritingTestUtils.inputFileSystem(inputFile)) + .maxLength(inputFile.getLength()); + + return OrcFile.createReader(hadoopPath, readerOptions); + } +} diff --git a/data/src/test/java/org/apache/iceberg/data/parquet/ParquetFormat.java b/data/src/test/java/org/apache/iceberg/data/parquet/ParquetFormat.java new file mode 100644 index 000000000000..81764123450a --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/parquet/ParquetFormat.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.parquet; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.util.List; +import org.apache.avro.generic.GenericData; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.avro.AvroTestHelpers; +import org.apache.iceberg.data.FileFormatTestSupport; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.ParquetFileTestUtils; +import org.apache.iceberg.parquet.ParquetSchemaUtil; +import org.apache.iceberg.types.Types; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetWriter; + +public class ParquetFormat implements FileFormatTestSupport { + @Override + public FileFormat format() { + return FileFormat.PARQUET; + } + + @Override + public void writeRecordsWithoutFieldIds( + OutputFile outputFile, Schema schema, List records) throws IOException { + org.apache.avro.Schema avroSchemaWithoutIds = AvroTestHelpers.removeIds(schema); + + try (ParquetWriter writer = + AvroParquetWriter.builder(ParquetFileTestUtils.file(outputFile)) + .withDataModel(GenericData.get()) + .withSchema(avroSchemaWithoutIds) + .withConf(new Configuration()) + .build()) { + for (Record record : records) { + GenericData.Record avroRecord = new GenericData.Record(avroSchemaWithoutIds); + for (Types.NestedField field : schema.columns()) { + avroRecord.put(field.name(), record.getField(field.name())); + } + + writer.write(avroRecord); + } + } + + try (ParquetFileReader reader = + ParquetFileReader.open(ParquetFileTestUtils.file(outputFile.toInputFile()))) { + assertThat(ParquetSchemaUtil.hasIds(reader.getFooter().getFileMetaData().getSchema())) + .isFalse(); + } + } + + @Override + public String compressionProperty() { + return TableProperties.PARQUET_COMPRESSION; + } + + @Override + public String compressionValue() { + return "uncompressed"; + } + + @Override + public String expectedCompressionCodec() { + return "UNCOMPRESSED"; + } + + @Override + public String actualCompressionCodec(InputFile inputFile) throws IOException { + try (ParquetFileReader reader = ParquetFileReader.open(ParquetFileTestUtils.file(inputFile))) { + return reader.getFooter().getBlocks().get(0).getColumns().get(0).getCodec().name(); + } + } + + @Override + public String metadataValue(InputFile inputFile, String key) throws IOException { + try (ParquetFileReader reader = ParquetFileReader.open(ParquetFileTestUtils.file(inputFile))) { + return reader.getFooter().getFileMetaData().getKeyValueMetaData().get(key); + } + } + + @Override + public String splitSizeProperty() { + return TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; + } +}