diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java index ffeff9c99145..5ac55f0cf41f 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java @@ -151,6 +151,14 @@ public static ManifestReader read(ManifestFile manifest, FileIO io) { */ public static ManifestReader read( ManifestFile manifest, FileIO io, Map specsById) { + return read(manifest, io, specsById, true); + } + + static ManifestReader read( + ManifestFile manifest, + FileIO io, + Map specsById, + boolean isCommitted) { Preconditions.checkArgument( manifest.content() == ManifestContent.DATA, "Cannot read a delete manifest with a ManifestReader: %s", @@ -163,6 +171,7 @@ public static ManifestReader read( specsById, inheritableMetadata, manifest.firstRowId(), + isCommitted, FileType.DATA_FILES); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java b/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java index 410edcc06859..0aec0ac69a9f 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java +++ b/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java @@ -72,6 +72,10 @@ abstract class ManifestMergeManager> { protected abstract ManifestReader newManifestReader(ManifestFile manifest); + protected ManifestReader newManifestReader(ManifestFile manifest, boolean isCommitted) { + return newManifestReader(manifest); + } + Iterable mergeManifests(Iterable manifests) { Iterator manifestIter = manifests.iterator(); if (!mergeEnabled || !manifestIter.hasNext()) { @@ -192,7 +196,9 @@ private ManifestFile createManifest(int specId, List bin) { boolean threw = true; try { for (ManifestFile manifest : bin) { - try (ManifestReader reader = newManifestReader(manifest)) { + boolean isCommitted = + manifest.snapshotId() != null && snapshotId() != manifest.snapshotId(); + try (ManifestReader reader = newManifestReader(manifest, isCommitted)) { for (ManifestEntry entry : reader.entries()) { if (entry.status() == Status.DELETED) { // suppress deletes from previous snapshots. only files deleted by this snapshot diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index 09bbe8b0cc6b..e3c2325ab780 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -92,6 +92,7 @@ private Class fileClass() { private final InputFile file; private final InheritableMetadata inheritableMetadata; private final Long firstRowId; + private final boolean isCommitted; private final FileType content; private final PartitionSpec spec; private final Schema fileSchema; @@ -125,12 +126,24 @@ protected ManifestReader( InheritableMetadata inheritableMetadata, Long firstRowId, FileType content) { + this(file, specId, specsById, inheritableMetadata, firstRowId, true, content); + } + + protected ManifestReader( + InputFile file, + int specId, + Map specsById, + InheritableMetadata inheritableMetadata, + Long firstRowId, + boolean isCommitted, + FileType content) { Preconditions.checkArgument( firstRowId == null || content == FileType.DATA_FILES, "First row ID is not valid for delete manifests"); this.file = file; this.inheritableMetadata = inheritableMetadata; this.firstRowId = firstRowId; + this.isCommitted = isCommitted; this.content = content; if (specsById != null) { @@ -308,7 +321,7 @@ private CloseableIterable> open(Schema projection) { CloseableIterable> withMetadata = CloseableIterable.transform(reader, inheritableMetadata::apply); - return CloseableIterable.transform(withMetadata, idAssigner(firstRowId)); + return CloseableIterable.transform(withMetadata, idAssigner(firstRowId, isCommitted)); } CloseableIterable> liveEntries() { @@ -398,7 +411,7 @@ static List withStatsColumns(Collection columns) { } private static > Function, ManifestEntry> idAssigner( - Long firstRowId) { + Long firstRowId, boolean isCommitted) { if (firstRowId != null) { return new Function<>() { private long nextRowId = firstRowId; @@ -416,8 +429,13 @@ public ManifestEntry apply(ManifestEntry entry) { return entry; } }; + } else if (!isCommitted) { + // Preserve firstRowId for entries in uncommitted manifests, including EXISTING entries that + // may be merged later + return Function.identity(); } else { - // data file's first_row_id is null when the manifest's first_row_id is null + // committed manifest with null manifest-level firstRowId (pre-v3 upgrade path) + // defensively set the first row ID for every entry to be null return entry -> { if (entry.file() instanceof BaseFile) { ((BaseFile) entry.file()).setFirstRowId(null); diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index e072382543b7..1a70b4f90b8f 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -1250,7 +1250,13 @@ protected ManifestWriter newManifestWriter(PartitionSpec manifestSpec) @Override protected ManifestReader newManifestReader(ManifestFile manifest) { - return MergingSnapshotProducer.this.newManifestReader(manifest); + return newManifestReader(manifest, true); + } + + @Override + protected ManifestReader newManifestReader( + ManifestFile manifest, boolean isCommitted) { + return ManifestFiles.read(manifest, ops().io(), ops().current().specsById(), isCommitted); } } diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java index 0af0c87d3512..6690a1483e53 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java @@ -218,6 +218,42 @@ public void testDVs() throws IOException { } } + @TestTemplate + public void testReadCommitedManifestNullifiesEntryRowId() throws IOException { + long firstRowId = 42L; + DataFile fileWithRowId = + DataFiles.builder(SPEC).copy(FILE_A).withFirstRowId(firstRowId).build(); + // the manifest has no manifest-level first_row_id (the v3+ entry still carries one) + ManifestFile manifest = + writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, fileWithRowId)); + assertThat(manifest.firstRowId()).isNull(); + + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO, table.specs())) { + assertThat(Iterables.getOnlyElement(reader).firstRowId()).isNull(); + } + } + + @TestTemplate + public void testReadUncommittedManifestPreservesEntryRowId() throws IOException { + assumeThat(formatVersion) + .as("first_row_id is only written in v3+ manifests") + .isGreaterThanOrEqualTo(3); + + long firstRowId = 42L; + DataFile fileWithRowId = + DataFiles.builder(SPEC).copy(FILE_A).withFirstRowId(firstRowId).build(); + // the manifest has no manifest-level first_row_id, but the entry has one + ManifestFile manifest = + writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, fileWithRowId)); + assertThat(manifest.firstRowId()).isNull(); + assertThat(fileWithRowId.firstRowId()).isEqualTo(firstRowId); + + try (ManifestReader reader = + ManifestFiles.read(manifest, FILE_IO, table.specs(), false /* isCommitted */)) { + assertThat(Iterables.getOnlyElement(reader).firstRowId()).isEqualTo(firstRowId); + } + } + @TestTemplate public void testDataFileSplitOffsetsNullWhenInvalid() throws IOException { DataFile invalidOffset = diff --git a/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java b/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java index 870622bc983d..42ee8d015779 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java +++ b/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java @@ -781,6 +781,33 @@ public void testUpgradeAssignmentWithManifestCompaction(@TempDir File altLocatio FILE_C.recordCount() + FILE_B.recordCount()); } + @Test + public void testRewritePreservesExistingFileFirstRowIds() { + table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + + // FILE_A gets firstRowId=0, FILE_B gets firstRowId=FILE_A.recordCount()=125; assert before + // rewrite + ManifestFile preRewriteManifest = + Iterables.getOnlyElement(table.currentSnapshot().dataManifests(table.io())); + checkDataFileAssignment(table, preRewriteManifest, 0L, FILE_A.recordCount()); + + // set low to trigger an internal manifest merge during the rewrite + table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1").commit(); + // FILE_A and FILE_C must be removed and added in the same operation. Removing FILE_A creates + // an uncommitted manifest containing FILE_B as an existing entry. Adding FILE_C triggers the + // internal manifest merge to read that uncommitted manifest before its firstRowId is assigned. + table.newRewrite().deleteFile(FILE_A).addFile(FILE_C).commit(); + + // merged manifest live files: [FILE_C (added), FILE_B (existing)] + ManifestFile manifest = + Iterables.getOnlyElement(table.currentSnapshot().dataManifests(table.io())); + checkDataFileAssignment( + table, + manifest, + FILE_A.recordCount() + FILE_B.recordCount(), // FILE_C gets 225 + FILE_A.recordCount()); // FILE_B must retain its original firstRowId (125) + } + private static ManifestContent content(int ordinal) { return ManifestContent.values()[ordinal]; }