Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,14 @@ public Optional<RuntimeException> checkConflicts(
return exception;
}

if (commitKind != CommitKind.COMPACT) {
Long nextRowId = latestSnapshot.nextRowId();
exception = checkRowIdExistence(baseEntries, deltaEntries, nextRowId);
if (exception.isPresent()) {
return exception;
}
}

exception = checkRowIdRangeConflicts(commitKind, mergedEntries);
if (exception.isPresent()) {
return exception;
Expand Down Expand Up @@ -536,6 +544,99 @@ private Optional<RuntimeException> checkForRowIdFromSnapshot(
return Optional.empty();
}

/**
 * Checks that every added delta file whose first row ID lies below {@code nextRowId} has a
 * counterpart in {@code baseEntries} with the same partition, bucket, first row ID and row
 * count.
 *
 * <p>The check is skipped (an empty result is returned) when data evolution is disabled or
 * when {@code nextRowId} is {@code null}. Delta entries that are not {@link FileKind#ADD},
 * that carry no first row ID, or whose first row ID is at or above {@code nextRowId} are
 * ignored.
 *
 * @param baseEntries entries to match against; entries without a first row ID are ignored
 * @param deltaEntries entries of the commit being checked
 * @param nextRowId exclusive upper bound on first row IDs that are checked, may be null
 * @return an exception describing the first delta file without a matching base file, or
 *     {@link Optional#empty()} when no such file is found
 */
Optional<RuntimeException> checkRowIdExistence(
        List<SimpleFileEntry> baseEntries,
        List<SimpleFileEntry> deltaEntries,
        @Nullable Long nextRowId) {
    // The null test does not depend on the entry, so decide it once instead of per element.
    if (!dataEvolutionEnabled || nextRowId == null) {
        return Optional.empty();
    }
    // Unbox once; the lambda below needs an effectively final primitive bound.
    final long rowIdUpperBound = nextRowId;

    List<SimpleFileEntry> filesToCheck =
            deltaEntries.stream()
                    .filter(
                            e ->
                                    e.kind() == FileKind.ADD
                                            && e.firstRowId() != null
                                            && e.firstRowId() < rowIdUpperBound)
                    .collect(Collectors.toList());

    // Nothing to verify: avoid indexing the base entries at all.
    if (filesToCheck.isEmpty()) {
        return Optional.empty();
    }

    Set<FileRowIdKey> existingIndex = new HashSet<>();
    for (SimpleFileEntry base : baseEntries) {
        if (base.firstRowId() != null) {
            existingIndex.add(rowIdKeyOf(base));
        }
    }

    for (SimpleFileEntry entry : filesToCheck) {
        if (!existingIndex.contains(rowIdKeyOf(entry))) {
            return Optional.of(
                    new RuntimeException(
                            String.format(
                                    "Row ID existence conflict: file '%s' references "
                                            + "firstRowId=%d, rowCount=%d in bucket %d, "
                                            + "but no matching file exists in the current snapshot. "
                                            + "The referenced file may have been rewritten by a "
                                            + "concurrent compaction or removed by an overwrite.",
                                    entry.fileName(),
                                    entry.firstRowId(),
                                    entry.rowCount(),
                                    entry.bucket())));
        }
    }
    return Optional.empty();
}

/** Builds the lookup key of an entry; callers must ensure its first row ID is non-null. */
private static FileRowIdKey rowIdKeyOf(SimpleFileEntry entry) {
    return new FileRowIdKey(
            entry.partition(), entry.bucket(), entry.firstRowId(), entry.rowCount());
}

/**
 * Value-based key made of a partition, a bucket, a first row ID and a row count. Two keys are
 * equal only when all four components are equal, which makes the type usable in hash-based
 * collections.
 */
private static class FileRowIdKey {
    private final BinaryRow partition;
    private final int bucket;
    private final long firstRowId;
    private final long rowCount;

    FileRowIdKey(BinaryRow partition, int bucket, long firstRowId, long rowCount) {
        this.partition = partition;
        this.bucket = bucket;
        this.firstRowId = firstRowId;
        this.rowCount = rowCount;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        FileRowIdKey other = (FileRowIdKey) o;
        // Compare the primitive components first; the partition comparison comes last.
        if (bucket != other.bucket) {
            return false;
        }
        if (firstRowId != other.firstRowId) {
            return false;
        }
        if (rowCount != other.rowCount) {
            return false;
        }
        return Objects.equals(partition, other.partition);
    }

    @Override
    public int hashCode() {
        // Same 31-multiplier fold that Objects.hash(partition, bucket, firstRowId, rowCount)
        // performs, written out to avoid the varargs array and boxing.
        int result = 1;
        result = 31 * result + Objects.hashCode(partition);
        result = 31 * result + Integer.hashCode(bucket);
        result = 31 * result + Long.hashCode(firstRowId);
        result = 31 * result + Long.hashCode(rowCount);
        return result;
    }
}

/**
 * Tells whether {@code fileName} is accepted by {@code isBlobFile} or, failing that, by
 * {@code isVectorStoreFile}.
 */
private static boolean dedicatedStorageFile(String fileName) {
    if (isBlobFile(fileName)) {
        return true;
    }
    return isVectorStoreFile(fileName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;

import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
Expand Down Expand Up @@ -374,6 +375,120 @@ void testShouldBeOverwriteCommit() {
.isFalse();
}

@Test
void testCheckRowIdExistenceNoConflict() {
    ConflictDetection conflictDetection = createConflictDetection();

    // The delta file has the same first row ID and row count as the file in the base.
    List<SimpleFileEntry> base =
            Collections.singletonList(createFileEntryWithRowId("f1", ADD, 0L, 100L));
    List<SimpleFileEntry> delta =
            Collections.singletonList(createFileEntryWithRowId("p1", ADD, 0L, 100L));

    Optional<RuntimeException> conflict =
            conflictDetection.checkRowIdExistence(base, delta, 100L);

    assertThat(conflict).isEmpty();
}

@Test
void testCheckRowIdExistenceBaseFileRemoved() {
    ConflictDetection conflictDetection = createConflictDetection();

    // The base holds no entries, so the delta file's row-id range has no counterpart.
    List<SimpleFileEntry> base = Collections.emptyList();
    List<SimpleFileEntry> delta =
            Collections.singletonList(createFileEntryWithRowId("p1", ADD, 0L, 100L));

    Optional<RuntimeException> conflict =
            conflictDetection.checkRowIdExistence(base, delta, 100L);

    assertThat(conflict).isPresent();
    String message = conflict.get().getMessage();
    assertThat(message).contains("Row ID existence conflict");
}

@Test
void testCheckRowIdExistenceBaseFileRewritten() {
    ConflictDetection conflictDetection = createConflictDetection();

    // Same first row ID, but the base file's row count (200) differs from the delta's (100).
    List<SimpleFileEntry> base =
            Collections.singletonList(createFileEntryWithRowId("f2", ADD, 0L, 200L));
    List<SimpleFileEntry> delta =
            Collections.singletonList(createFileEntryWithRowId("p1", ADD, 0L, 100L));

    Optional<RuntimeException> conflict =
            conflictDetection.checkRowIdExistence(base, delta, 200L);

    assertThat(conflict).isPresent();
    String message = conflict.get().getMessage();
    assertThat(message).contains("Row ID existence conflict");
}

@Test
void testCheckRowIdExistenceSkipsNewlyAppendedFiles() {
    ConflictDetection conflictDetection = createConflictDetection();

    // With nextRowId=100, only delta files starting below 100 are treated as references.
    List<SimpleFileEntry> base =
            Collections.singletonList(createFileEntryWithRowId("f1", ADD, 0L, 100L));

    List<SimpleFileEntry> delta = new ArrayList<>(2);
    // Partial-column update over existing rows: firstRowId=0 is below nextRowId=100.
    delta.add(createFileEntryWithRowId("p1", ADD, 0L, 100L));
    // Newly appended file: firstRowId=100 is not below nextRowId=100, so it must be skipped.
    delta.add(createFileEntryWithRowId("new1", ADD, 100L, 50L));

    Optional<RuntimeException> conflict =
            conflictDetection.checkRowIdExistence(base, delta, 100L);

    assertThat(conflict).isEmpty();
}

@Test
void testCheckRowIdExistenceSkipsNonPreAssigned() {
    ConflictDetection conflictDetection = createConflictDetection();

    // The delta entry is built without row-id arguments and the base is empty.
    List<SimpleFileEntry> base = Collections.emptyList();
    List<SimpleFileEntry> delta = Collections.singletonList(createFileEntry("f1", ADD));

    Optional<RuntimeException> conflict =
            conflictDetection.checkRowIdExistence(base, delta, 100L);

    assertThat(conflict).isEmpty();
}

@Test
void testCheckRowIdExistenceSkipsDeleteEntries() {
    ConflictDetection conflictDetection = createConflictDetection();

    // A DELETE entry is not checked even though the base has no matching file.
    List<SimpleFileEntry> base = Collections.emptyList();
    List<SimpleFileEntry> delta =
            Collections.singletonList(createFileEntryWithRowId("f1", DELETE, 0L, 100L));

    Optional<RuntimeException> conflict =
            conflictDetection.checkRowIdExistence(base, delta, 100L);

    assertThat(conflict).isEmpty();
}

@Test
void testCheckRowIdExistenceSkipsWhenNextRowIdNull() {
    ConflictDetection conflictDetection = createConflictDetection();

    // A null nextRowId disables the check, even with an unmatched delta file.
    List<SimpleFileEntry> base = Collections.emptyList();
    List<SimpleFileEntry> delta =
            Collections.singletonList(createFileEntryWithRowId("p1", ADD, 0L, 100L));

    Optional<RuntimeException> conflict =
            conflictDetection.checkRowIdExistence(base, delta, null);

    assertThat(conflict).isEmpty();
}

/**
 * Builds a {@link SimpleFileEntry} fixture from the given kind, file name, row count and first
 * row ID; every other constructor argument is a fixed placeholder ({@code EMPTY_ROW}, {@code 0},
 * {@code 1}, an empty list or {@code null}).
 *
 * <p>NOTE(review): the meaning of each positional placeholder is defined by the
 * {@code SimpleFileEntry} constructor, which is not visible here - confirm against it before
 * changing any of them.
 *
 * @param fileName file name stored in the entry
 * @param kind entry kind, e.g. {@code ADD} or {@code DELETE}
 * @param firstRowId first row ID passed as the last constructor argument
 * @param rowCount row count passed as the second-to-last constructor argument
 * @return the constructed entry
 */
private SimpleFileEntry createFileEntryWithRowId(
String fileName, FileKind kind, long firstRowId, long rowCount) {
return new SimpleFileEntry(
kind,
EMPTY_ROW,
0,
1,
0,
fileName,
Collections.emptyList(),
null,
EMPTY_ROW,
EMPTY_ROW,
null,
rowCount,
// Note the order: rowCount precedes firstRowId here, the reverse of this method's parameters.
firstRowId);
}

private ConflictDetection createConflictDetection() {
return new ConflictDetection(
"test-table",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ def test_compact_conflict_shard_update(self):
tc = wb.new_commit()
with self.assertRaises(RuntimeError) as ctx:
tc.commit(stale_commit_msgs)
self.assertIn("conflicts", str(ctx.exception))
self.assertIn("conflict", str(ctx.exception))
tc.close()
print(f"Conflict detected as expected: {ctx.exception}")

Expand Down
Loading
Loading