Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io) {
*/
public static ManifestReader<DataFile> read(
ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
return read(manifest, io, specsById, true);
}

static ManifestReader<DataFile> read(
ManifestFile manifest,
FileIO io,
Map<Integer, PartitionSpec> specsById,
boolean isCommitted) {
Preconditions.checkArgument(
manifest.content() == ManifestContent.DATA,
"Cannot read a delete manifest with a ManifestReader: %s",
Expand All @@ -136,6 +144,7 @@ public static ManifestReader<DataFile> read(
specsById,
inheritableMetadata,
manifest.firstRowId(),
isCommitted,
FileType.DATA_FILES);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ abstract class ManifestMergeManager<F extends ContentFile<F>> {

protected abstract ManifestReader<F> newManifestReader(ManifestFile manifest);

protected ManifestReader<F> newManifestReader(ManifestFile manifest, boolean isCommitted) {
return newManifestReader(manifest);
}

Iterable<ManifestFile> mergeManifests(Iterable<ManifestFile> manifests) {
Iterator<ManifestFile> manifestIter = manifests.iterator();
if (!mergeEnabled || !manifestIter.hasNext()) {
Expand Down Expand Up @@ -172,7 +176,9 @@ private ManifestFile createManifest(int specId, List<ManifestFile> bin) {
boolean threw = true;
try {
for (ManifestFile manifest : bin) {
try (ManifestReader<F> reader = newManifestReader(manifest)) {
boolean isCommitted =
manifest.snapshotId() != null && snapshotId() != manifest.snapshotId();
try (ManifestReader<F> reader = newManifestReader(manifest, isCommitted)) {
for (ManifestEntry<F> entry : reader.entries()) {
if (entry.status() == Status.DELETED) {
// suppress deletes from previous snapshots. only files deleted by this snapshot
Expand Down
24 changes: 21 additions & 3 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ private Class<? extends StructLike> fileClass() {
private final InputFile file;
private final InheritableMetadata inheritableMetadata;
private final Long firstRowId;
private final boolean isCommitted;
private final FileType content;
private final PartitionSpec spec;
private final Schema fileSchema;
Expand Down Expand Up @@ -115,12 +116,24 @@ protected ManifestReader(
InheritableMetadata inheritableMetadata,
Long firstRowId,
FileType content) {
this(file, specId, specsById, inheritableMetadata, firstRowId, true, content);
}

protected ManifestReader(
InputFile file,
int specId,
Map<Integer, PartitionSpec> specsById,
InheritableMetadata inheritableMetadata,
Long firstRowId,
boolean isCommitted,
FileType content) {
Preconditions.checkArgument(
firstRowId == null || content == FileType.DATA_FILES,
"First row ID is not valid for delete manifests");
this.file = file;
this.inheritableMetadata = inheritableMetadata;
this.firstRowId = firstRowId;
this.isCommitted = isCommitted;
this.content = content;

if (specsById != null) {
Expand Down Expand Up @@ -296,7 +309,7 @@ private CloseableIterable<ManifestEntry<F>> open(Schema projection) {

CloseableIterable<ManifestEntry<F>> withMetadata =
CloseableIterable.transform(reader, inheritableMetadata::apply);
return CloseableIterable.transform(withMetadata, idAssigner(firstRowId));
return CloseableIterable.transform(withMetadata, idAssigner(firstRowId, isCommitted));
}

CloseableIterable<ManifestEntry<F>> liveEntries() {
Expand Down Expand Up @@ -386,7 +399,7 @@ static List<String> withStatsColumns(Collection<String> columns) {
}

private static <F extends ContentFile<F>> Function<ManifestEntry<F>, ManifestEntry<F>> idAssigner(
Long firstRowId) {
Long firstRowId, boolean isCommitted) {
if (firstRowId != null) {
return new Function<>() {
private long nextRowId = firstRowId;
Expand All @@ -404,8 +417,13 @@ public ManifestEntry<F> apply(ManifestEntry<F> entry) {
return entry;
}
};
} else if (!isCommitted) {
// Preserve firstRowId for entries in uncommitted manifests, including EXISTING entries that
// may be merged later
return Function.identity();
} else {
// data file's first_row_id is null when the manifest's first_row_id is null
// committed manifest with null manifest-level firstRowId (pre-v3 upgrade path)
// defensively set the first row ID for every entry to be null
return entry -> {
if (entry.file() instanceof BaseFile) {
((BaseFile<?>) entry.file()).setFirstRowId(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1231,7 +1231,13 @@ protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec)

@Override
protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
return MergingSnapshotProducer.this.newManifestReader(manifest);
return newManifestReader(manifest, true);
}

@Override
protected ManifestReader<DataFile> newManifestReader(
ManifestFile manifest, boolean isCommitted) {
return ManifestFiles.read(manifest, ops().io(), ops().current().specsById(), isCommitted);
}
}

Expand Down
36 changes: 36 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,42 @@ public void testDVs() throws IOException {
}
}

@TestTemplate
public void testReadCommitedManifestNullifiesEntryRowId() throws IOException {
long firstRowId = 42L;
DataFile fileWithRowId =
DataFiles.builder(SPEC).copy(FILE_A).withFirstRowId(firstRowId).build();
// the manifest has no manifest-level first_row_id (the v3+ entry still carries one)
ManifestFile manifest =
writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, fileWithRowId));
assertThat(manifest.firstRowId()).isNull();

try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO, table.specs())) {
assertThat(Iterables.getOnlyElement(reader).firstRowId()).isNull();
}
}

@TestTemplate
public void testReadUncommittedManifestPreservesEntryRowId() throws IOException {
assumeThat(formatVersion)
.as("first_row_id is only written in v3+ manifests")
.isGreaterThanOrEqualTo(3);

long firstRowId = 42L;
DataFile fileWithRowId =
DataFiles.builder(SPEC).copy(FILE_A).withFirstRowId(firstRowId).build();
// the manifest has no manifest-level first_row_id, but the entry has one
ManifestFile manifest =
writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, fileWithRowId));
assertThat(manifest.firstRowId()).isNull();
assertThat(fileWithRowId.firstRowId()).isEqualTo(firstRowId);

try (ManifestReader<DataFile> reader =
ManifestFiles.read(manifest, FILE_IO, table.specs(), false /* isCommitted */)) {
assertThat(Iterables.getOnlyElement(reader).firstRowId()).isEqualTo(firstRowId);
}
}

@TestTemplate
public void testDataFileSplitOffsetsNullWhenInvalid() throws IOException {
DataFile invalidOffset =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,33 @@ public void testUpgradeAssignmentWithManifestCompaction(@TempDir File altLocatio
FILE_C.recordCount() + FILE_B.recordCount());
}

@Test
public void testRewritePreservesExistingFileFirstRowIds() {
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();

// FILE_A gets firstRowId=0, FILE_B gets firstRowId=FILE_A.recordCount()=125; assert before
// rewrite
ManifestFile preRewriteManifest =
Iterables.getOnlyElement(table.currentSnapshot().dataManifests(table.io()));
checkDataFileAssignment(table, preRewriteManifest, 0L, FILE_A.recordCount());

// set low to trigger an internal manifest merge during the rewrite
table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1").commit();
// FILE_A and FILE_C must be removed and added in the same operation. Removing FILE_A creates
// an uncommitted manifest containing FILE_B as an existing entry. Adding FILE_C triggers the
// internal manifest merge to read that uncommitted manifest before its firstRowId is assigned.
table.newRewrite().deleteFile(FILE_A).addFile(FILE_C).commit();

// merged manifest live files: [FILE_C (added), FILE_B (existing)]
ManifestFile manifest =
Iterables.getOnlyElement(table.currentSnapshot().dataManifests(table.io()));
checkDataFileAssignment(
table,
manifest,
FILE_A.recordCount() + FILE_B.recordCount(), // FILE_C gets 225
FILE_A.recordCount()); // FILE_B must retain its original firstRowId (125)
}

private static ManifestContent content(int ordinal) {
return ManifestContent.values()[ordinal];
}
Expand Down
Loading