diff --git a/core/src/main/java/org/apache/iceberg/MetricsUtil.java b/core/src/main/java/org/apache/iceberg/MetricsUtil.java index b631af0fc5ef..2cd001b5c46f 100644 --- a/core/src/main/java/org/apache/iceberg/MetricsUtil.java +++ b/core/src/main/java/org/apache/iceberg/MetricsUtil.java @@ -40,6 +40,37 @@ public class MetricsUtil { private MetricsUtil() {} + /** + * Copies a metrics object without lower and upper bounds for given fields. + * + * @param excludedFieldIds field IDs for which the lower and upper bounds must be dropped + * @return a new metrics object without lower and upper bounds for given fields + */ + public static Metrics copyWithoutFieldBounds(Metrics metrics, Set excludedFieldIds) { + return new Metrics( + metrics.recordCount(), + metrics.columnSizes(), + metrics.valueCounts(), + metrics.nullValueCounts(), + metrics.nanValueCounts(), + copyWithoutKeys(metrics.lowerBounds(), excludedFieldIds), + copyWithoutKeys(metrics.upperBounds(), excludedFieldIds)); + } + + private static Map copyWithoutKeys(Map map, Set keys) { + if (map == null) { + return null; + } + + Map filteredMap = Maps.newHashMap(map); + + for (K key : keys) { + filteredMap.remove(key); + } + + return filteredMap.isEmpty() ? null : filteredMap; + } + /** * Construct mapping relationship between column id to NaN value counts from input metrics and * metrics config. diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java index d3e01bcd04d5..4f799b434993 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java @@ -18,11 +18,17 @@ */ package org.apache.iceberg.deletes; +import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH; +import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS; + import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Set; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileMetadata; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsUtil; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.encryption.EncryptionKeyMetadata; @@ -30,6 +36,7 @@ import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.util.CharSequenceSet; /** @@ -40,6 +47,9 @@ * records, consider using {@link SortingPositionOnlyDeleteWriter} instead. */ public class PositionDeleteWriter implements FileWriter, DeleteWriteResult> { + private static final Set SINGLE_REFERENCED_FILE_BOUNDS_ONLY = + ImmutableSet.of(DELETE_FILE_PATH.fieldId(), DELETE_FILE_POS.fieldId()); + private final FileAppender appender; private final FileFormat format; private final String location; @@ -89,7 +99,7 @@ public void close() throws IOException { .withEncryptionKeyMetadata(keyMetadata) .withSplitOffsets(appender.splitOffsets()) .withFileSizeInBytes(appender.length()) - .withMetrics(appender.metrics()) + .withMetrics(metrics()) .build(); } } @@ -107,4 +117,13 @@ public DeleteFile toDeleteFile() { public DeleteWriteResult result() { return new DeleteWriteResult(toDeleteFile(), referencedDataFiles()); } + + private Metrics metrics() { + Metrics metrics = appender.metrics(); + if (referencedDataFiles.size() > 1) { + return MetricsUtil.copyWithoutFieldBounds(metrics, SINGLE_REFERENCED_FILE_BOUNDS_ONLY); + } else { + return metrics; + } + } } diff --git a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java index eff918b145f7..7910c666b45d 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java +++ b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java @@ -229,6 +229,17 @@ public void testPositionDeleteWriter() throws IOException { DeleteFile deleteFile = result.first(); CharSequenceSet referencedDataFiles = result.second(); + if (fileFormat == FileFormat.AVRO) { + Assert.assertNull(deleteFile.lowerBounds()); + Assert.assertNull(deleteFile.upperBounds()); + } else { + Assert.assertEquals(1, referencedDataFiles.size()); + Assert.assertEquals(2, deleteFile.lowerBounds().size()); + Assert.assertTrue(deleteFile.lowerBounds().containsKey(DELETE_FILE_PATH.fieldId())); + Assert.assertEquals(2, deleteFile.upperBounds().size()); + Assert.assertTrue(deleteFile.upperBounds().containsKey(DELETE_FILE_PATH.fieldId())); + } + // verify the written delete file GenericRecord deleteRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema()); List expectedDeletes = @@ -302,6 +313,53 @@ public void testPositionDeleteWriterWithRow() throws IOException { Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); } + @Test + public void testPositionDeleteWriterMultipleDataFiles() throws IOException { + FileWriterFactory writerFactory = newWriterFactory(table.schema()); + + // write two data files + DataFile dataFile1 = writeData(writerFactory, dataRows, table.spec(), partition); + DataFile dataFile2 = writeData(writerFactory, dataRows, table.spec(), partition); + + // write a position delete file referencing both + List> deletes = + ImmutableList.of( + positionDelete(dataFile1.path(), 0L, null), + positionDelete(dataFile1.path(), 2L, null), + positionDelete(dataFile2.path(), 4L, null)); + Pair result = + writePositionDeletes(writerFactory, deletes, table.spec(), partition); + DeleteFile deleteFile = result.first(); + CharSequenceSet referencedDataFiles = result.second(); + + // verify the written delete file has NO lower and upper bounds + Assert.assertEquals(2, referencedDataFiles.size()); + Assert.assertNull(deleteFile.lowerBounds()); + Assert.assertNull(deleteFile.upperBounds()); + + // commit the data and delete files + table + .newRowDelta() + .addRows(dataFile1) + .addRows(dataFile2) + .addDeletes(deleteFile) + .validateDataFilesExist(referencedDataFiles) + .validateDeletedFiles() + .commit(); + + // verify the delete file is applied correctly + List expectedRows = + ImmutableList.of( + toRow(2, "aaa"), + toRow(4, "aaa"), + toRow(5, "aaa"), + toRow(1, "aaa"), + toRow(2, "aaa"), + toRow(3, "aaa"), + toRow(4, "aaa")); + Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + } + private DataFile writeData( FileWriterFactory writerFactory, List rows, PartitionSpec spec, StructLike partitionKey) throws IOException { diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java index 21698cf37646..d1a782057006 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java +++ b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java @@ -203,6 +203,44 @@ public void testPositionDeleteMetrics() throws IOException { 3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5))); } + @Test + public void testPositionDeleteMetricsCoveringMultipleDataFiles() throws IOException { + FileWriterFactory writerFactory = newWriterFactory(table); + EncryptedOutputFile outputFile = fileFactory.newOutputFile(); + PositionDeleteWriter deleteWriter = + writerFactory.newPositionDeleteWriter(outputFile, table.spec(), null); + + try { + PositionDelete positionDelete = PositionDelete.create(); + + positionDelete.set("File A", 1, toRow(3, "3", true, 3L)); + deleteWriter.write(positionDelete); + + positionDelete.set("File B", 1, toRow(3, "3", true, 3L)); + deleteWriter.write(positionDelete); + + } finally { + deleteWriter.close(); + } + + DeleteFile deleteFile = deleteWriter.toDeleteFile(); + + // should have NO bounds for path and position as the file covers multiple data paths + Map lowerBounds = deleteFile.lowerBounds(); + Assert.assertEquals(2, lowerBounds.size()); + Assert.assertEquals( + 3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), lowerBounds.get(1))); + Assert.assertEquals( + 3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), lowerBounds.get(5))); + + Map upperBounds = deleteFile.upperBounds(); + Assert.assertEquals(2, upperBounds.size()); + Assert.assertEquals( + 3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), upperBounds.get(1))); + Assert.assertEquals( + 3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5))); + } + @Test public void testMaxColumns() throws IOException { File tableDir = temp.newFolder();