diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index 29ee0458183f..493d13d88dee 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -224,6 +224,14 @@ public Optional checkConflicts( return exception; } + if (commitKind != CommitKind.COMPACT) { + Long nextRowId = latestSnapshot.nextRowId(); + exception = checkRowIdExistence(baseEntries, deltaEntries, nextRowId); + if (exception.isPresent()) { + return exception; + } + } + exception = checkRowIdRangeConflicts(commitKind, mergedEntries); if (exception.isPresent()) { return exception; @@ -536,6 +544,99 @@ private Optional checkForRowIdFromSnapshot( return Optional.empty(); } + Optional checkRowIdExistence( + List baseEntries, + List deltaEntries, + @Nullable Long nextRowId) { + if (!dataEvolutionEnabled) { + return Optional.empty(); + } + + List filesToCheck = + deltaEntries.stream() + .filter( + e -> + e.kind() == FileKind.ADD + && e.firstRowId() != null + && nextRowId != null + && e.firstRowId() < nextRowId) + .collect(Collectors.toList()); + + if (filesToCheck.isEmpty()) { + return Optional.empty(); + } + + Set existingIndex = new HashSet<>(); + for (SimpleFileEntry base : baseEntries) { + if (base.firstRowId() != null) { + existingIndex.add( + new FileRowIdKey( + base.partition(), + base.bucket(), + base.firstRowId(), + base.rowCount())); + } + } + + for (SimpleFileEntry entry : filesToCheck) { + FileRowIdKey key = + new FileRowIdKey( + entry.partition(), + entry.bucket(), + entry.firstRowId(), + entry.rowCount()); + if (!existingIndex.contains(key)) { + return Optional.of( + new RuntimeException( + String.format( + "Row ID existence conflict: file '%s' references " + + "firstRowId=%d, rowCount=%d in bucket %d, " + + "but no matching file exists in the current snapshot. " + + "The referenced file may have been rewritten by a " + + "concurrent compaction or removed by an overwrite.", + entry.fileName(), + entry.firstRowId(), + entry.rowCount(), + entry.bucket()))); + } + } + return Optional.empty(); + } + + private static class FileRowIdKey { + private final BinaryRow partition; + private final int bucket; + private final long firstRowId; + private final long rowCount; + + FileRowIdKey(BinaryRow partition, int bucket, long firstRowId, long rowCount) { + this.partition = partition; + this.bucket = bucket; + this.firstRowId = firstRowId; + this.rowCount = rowCount; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FileRowIdKey that = (FileRowIdKey) o; + return bucket == that.bucket + && firstRowId == that.firstRowId + && rowCount == that.rowCount + && Objects.equals(partition, that.partition); + } + + @Override + public int hashCode() { + return Objects.hash(partition, bucket, firstRowId, rowCount); + } + } + private static boolean dedicatedStorageFile(String fileName) { return isBlobFile(fileName) || isVectorStoreFile(fileName); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index c3e0258da28f..1c36b9e09ee7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -38,6 +38,7 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; +import java.util.Optional; import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; @@ -374,6 +375,120 @@ void testShouldBeOverwriteCommit() { .isFalse(); } + @Test + void testCheckRowIdExistenceNoConflict() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L)); + + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L)); + + assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 100L)).isEmpty(); + } + + @Test + void testCheckRowIdExistenceBaseFileRemoved() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L)); + + Optional result = + detection.checkRowIdExistence(baseEntries, deltaEntries, 100L); + assertThat(result).isPresent(); + assertThat(result.get().getMessage()).contains("Row ID existence conflict"); + } + + @Test + void testCheckRowIdExistenceBaseFileRewritten() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithRowId("f2", ADD, 0L, 200L)); + + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L)); + + Optional result = + detection.checkRowIdExistence(baseEntries, deltaEntries, 200L); + assertThat(result).isPresent(); + assertThat(result.get().getMessage()).contains("Row ID existence conflict"); + } + + @Test + void testCheckRowIdExistenceSkipsNewlyAppendedFiles() { + ConflictDetection detection = createConflictDetection(); + + // nextRowId=100: files with firstRowId >= 100 are newly appended, not references + List baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L)); + + List deltaEntries = new ArrayList<>(); + // partial-column update referencing existing rows (firstRowId=0 < nextRowId=100) + deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L)); + // newly appended file (firstRowId=100 >= nextRowId=100), should be skipped + deltaEntries.add(createFileEntryWithRowId("new1", ADD, 100L, 50L)); + + assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 100L)).isEmpty(); + } + + @Test + void testCheckRowIdExistenceSkipsNonPreAssigned() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntry("f1", ADD)); + + assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 100L)).isEmpty(); + } + + @Test + void testCheckRowIdExistenceSkipsDeleteEntries() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("f1", DELETE, 0L, 100L)); + + assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 100L)).isEmpty(); + } + + @Test + void testCheckRowIdExistenceSkipsWhenNextRowIdNull() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L)); + + assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, null)).isEmpty(); + } + + private SimpleFileEntry createFileEntryWithRowId( + String fileName, FileKind kind, long firstRowId, long rowCount) { + return new SimpleFileEntry( + kind, + EMPTY_ROW, + 0, + 1, + 0, + fileName, + Collections.emptyList(), + null, + EMPTY_ROW, + EMPTY_ROW, + null, + rowCount, + firstRowId); + } + private ConflictDetection createConflictDetection() { return new ConflictDetection( "test-table", diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py index f48f4c99f368..13555a423131 100644 --- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py +++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py @@ -762,7 +762,7 @@ def test_compact_conflict_shard_update(self): tc = wb.new_commit() with self.assertRaises(RuntimeError) as ctx: tc.commit(stale_commit_msgs) - self.assertIn("conflicts", str(ctx.exception)) + self.assertIn("conflict", str(ctx.exception)) tc.close() print(f"Conflict detected as expected: {ctx.exception}") diff --git a/paimon-python/pypaimon/tests/write/conflict_detection_test.py b/paimon-python/pypaimon/tests/write/conflict_detection_test.py new file mode 100644 index 000000000000..302a7d801fa3 --- /dev/null +++ b/paimon-python/pypaimon/tests/write/conflict_detection_test.py @@ -0,0 +1,297 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest +from dataclasses import dataclass +from typing import List + +from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.manifest.schema.manifest_entry import ManifestEntry +from pypaimon.schema.data_types import AtomicType, DataField +from pypaimon.table.row.generic_row import GenericRow +from pypaimon.write.commit.conflict_detection import ( + ConflictDetection, + RowIdColumnConflictChecker, +) + + +def _make_file(file_name, row_count=100, first_row_id=None, + schema_id=0, write_cols=None): + return DataFileMeta( + file_name=file_name, + file_size=1024, + row_count=row_count, + min_key=None, + max_key=None, + key_stats=None, + value_stats=None, + min_sequence_number=0, + max_sequence_number=0, + schema_id=schema_id, + level=0, + extra_files=[], + first_row_id=first_row_id, + write_cols=write_cols, + ) + + +_EMPTY_PARTITION = GenericRow([], []) + + +def _make_entry(file_name, kind=0, bucket=0, first_row_id=None, + row_count=100, write_cols=None, schema_id=0): + return ManifestEntry( + kind=kind, + partition=_EMPTY_PARTITION, + bucket=bucket, + total_buckets=1, + file=_make_file(file_name, row_count=row_count, + first_row_id=first_row_id, schema_id=schema_id, + write_cols=write_cols), + ) + + +@dataclass +class _FakeSchema: + id: int + fields: List[DataField] + + +class _FakeSchemaManager: + + def __init__(self, schemas=None): + self._schemas = {} + if schemas: + for s in schemas: + self._schemas[s.id] = s + + def get_schema(self, schema_id): + return self._schemas.get(schema_id) + + +_DEFAULT_SCHEMA = _FakeSchema( + id=0, + fields=[ + DataField(1, "col_a", AtomicType("INT")), + DataField(2, "col_b", AtomicType("STRING")), + DataField(3, "col_c", AtomicType("BIGINT")), + ], +) + + +class TestCheckRowIdExistence(unittest.TestCase): + + def _make_detection(self): + return ConflictDetection( + data_evolution_enabled=True, + snapshot_manager=None, + manifest_list_manager=None, + table=None, + commit_scanner=None, + ) + + def test_no_conflict_when_base_file_exists(self): + detection = self._make_detection() + base = [_make_entry("f1", kind=0, first_row_id=0, row_count=100)] + delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)] + self.assertIsNone( + detection.check_row_id_existence(base, delta, next_row_id=200)) + + def test_conflict_when_base_file_removed(self): + detection = self._make_detection() + base = [] + delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)] + result = detection.check_row_id_existence(base, delta, next_row_id=200) + self.assertIsNotNone(result) + self.assertIn("Row ID existence conflict", str(result)) + + def test_conflict_when_base_file_rewritten(self): + detection = self._make_detection() + base = [_make_entry("f2", kind=0, first_row_id=0, row_count=200)] + delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)] + result = detection.check_row_id_existence(base, delta, next_row_id=200) + self.assertIsNotNone(result) + self.assertIn("Row ID existence conflict", str(result)) + + def test_skip_newly_appended_files(self): + detection = self._make_detection() + base = [] + delta = [_make_entry("p1", kind=0, first_row_id=200, row_count=100)] + self.assertIsNone( + detection.check_row_id_existence(base, delta, next_row_id=200)) + + def test_skip_when_no_pre_assigned_row_id(self): + detection = self._make_detection() + base = [] + delta = [_make_entry("f1", kind=0)] + self.assertIsNone( + detection.check_row_id_existence(base, delta, next_row_id=200)) + + def test_skip_delete_entries(self): + detection = self._make_detection() + base = [] + delta = [_make_entry("f1", kind=1, first_row_id=0, row_count=100)] + self.assertIsNone( + detection.check_row_id_existence(base, delta, next_row_id=200)) + + def test_skip_when_data_evolution_disabled(self): + detection = ConflictDetection( + data_evolution_enabled=False, + snapshot_manager=None, + manifest_list_manager=None, + table=None, + commit_scanner=None, + ) + base = [] + delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)] + self.assertIsNone( + detection.check_row_id_existence(base, delta, next_row_id=200)) + + def test_skip_when_next_row_id_is_none(self): + detection = self._make_detection() + base = [] + delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)] + self.assertIsNone( + detection.check_row_id_existence(base, delta, next_row_id=None)) + + +class TestRowIdColumnConflictChecker(unittest.TestCase): + + def _make_checker(self, delta_files, schema=None): + schema_mgr = _FakeSchemaManager([schema or _DEFAULT_SCHEMA]) + return RowIdColumnConflictChecker.from_data_files(schema_mgr, delta_files) + + def test_no_conflict_disjoint_rows(self): + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, write_cols=["col_a"]), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=200, + write_cols=["col_a"]) + self.assertFalse(checker.conflicts_with(committed)) + + def test_no_conflict_same_rows_different_columns(self): + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, write_cols=["col_a"]), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=0, + write_cols=["col_b"]) + self.assertFalse(checker.conflicts_with(committed)) + + def test_conflict_same_rows_same_columns(self): + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, write_cols=["col_a"]), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=0, + write_cols=["col_a"]) + self.assertTrue(checker.conflicts_with(committed)) + + def test_conflict_overlapping_rows_overlapping_columns(self): + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, + write_cols=["col_a", "col_b"]), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=50, + write_cols=["col_b", "col_c"]) + self.assertTrue(checker.conflicts_with(committed)) + + def test_conflict_null_write_cols_committed(self): + """null write_cols means full-schema write — always conflicts on column dimension.""" + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, write_cols=["col_a"]), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=0, + write_cols=None) + self.assertTrue(checker.conflicts_with(committed)) + + def test_conflict_null_write_cols_delta(self): + """null write_cols in delta means all columns are in the write range.""" + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, write_cols=None), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=0, + write_cols=["col_b"]) + self.assertTrue(checker.conflicts_with(committed)) + + def test_no_conflict_committed_file_no_row_id(self): + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, write_cols=["col_a"]), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=None, + write_cols=["col_a"]) + self.assertFalse(checker.conflicts_with(committed)) + + def test_none_when_no_delta_files_with_row_id(self): + delta_files = [ + _make_file("d1", row_count=100, first_row_id=None), + ] + schema_mgr = _FakeSchemaManager([_DEFAULT_SCHEMA]) + checker = RowIdColumnConflictChecker.from_data_files(schema_mgr, delta_files) + self.assertIsNone(checker) + + def test_system_fields_skipped(self): + """System fields like _ROW_ID should not count as column conflicts.""" + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, + write_cols=["_ROW_ID", "col_a"]), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=0, + write_cols=["_ROW_ID", "col_b"]) + self.assertFalse(checker.conflicts_with(committed)) + + def test_cross_schema_field_id_resolution(self): + """Fields with same ID but different names across schema versions should still match.""" + schema_v0 = _FakeSchema( + id=0, + fields=[ + DataField(1, "col_a", AtomicType("INT")), + DataField(2, "col_b", AtomicType("STRING")), + ], + ) + schema_v1 = _FakeSchema( + id=1, + fields=[ + DataField(1, "col_a_renamed", AtomicType("INT")), + DataField(2, "col_b", AtomicType("STRING")), + DataField(3, "col_c", AtomicType("BIGINT")), + ], + ) + schema_mgr = _FakeSchemaManager([schema_v0, schema_v1]) + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, + schema_id=0, write_cols=["col_a"]), + ] + checker = RowIdColumnConflictChecker.from_data_files(schema_mgr, delta_files) + committed_same_field = _make_file( + "c1", row_count=100, first_row_id=0, + schema_id=1, write_cols=["col_a_renamed"]) + self.assertTrue(checker.conflicts_with(committed_same_field)) + committed_diff_field = _make_file( + "c2", row_count=100, first_row_id=0, + schema_id=1, write_cols=["col_c"]) + self.assertFalse(checker.conflicts_with(committed_diff_field)) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/write/commit/conflict_detection.py b/paimon-python/pypaimon/write/commit/conflict_detection.py index 6b7652c00be8..fc92dc838b93 100644 --- a/paimon-python/pypaimon/write/commit/conflict_detection.py +++ b/paimon-python/pypaimon/write/commit/conflict_detection.py @@ -19,31 +19,140 @@ Conflict detection for commit operations. """ +import bisect + from pypaimon.manifest.manifest_list_manager import ManifestListManager from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.file_entry import FileEntry +from pypaimon.table.special_fields import SpecialFields from pypaimon.utils.range import Range from pypaimon.utils.range_helper import RangeHelper from pypaimon.write.commit.commit_scanner import CommitScanner -class ConflictDetection: - """Detects conflicts between base and delta files during commit. +class RowIdColumnConflictChecker: + """Checks for row ID × column conflicts between delta files and committed files. - This class provides row ID range conflict checks and row ID from snapshot conflict checks - for Data Evolution tables. + Built from the current commit's delta files. For each committed file, + checks whether it overlaps with the delta files on BOTH dimensions: + row-id range AND write columns. """ + def __init__(self, write_ranges, schema_manager): + self._write_ranges = write_ranges + self._schema_manager = schema_manager + self._field_id_cache = {} + + @classmethod + def from_data_files(cls, schema_manager, delta_files): + files_with_row_id = [f for f in delta_files if f.first_row_id is not None] + if not files_with_row_id: + return None + + range_helper = RangeHelper(lambda f: f.row_id_range()) + groups = range_helper.merge_overlapping_ranges(files_with_row_id) + + write_ranges = [] + for group in groups: + merged_from = min(f.first_row_id for f in group) + merged_to = max(f.first_row_id + f.row_count - 1 for f in group) + merged_range = Range(merged_from, merged_to) + + field_ids = set() + for f in group: + cls._add_write_field_ids(field_ids, f, schema_manager) + + write_ranges.append(_WriteRange(merged_range, field_ids)) + + write_ranges.sort(key=lambda wr: (wr.range.from_, wr.range.to)) + return cls(write_ranges, schema_manager) + + def is_empty(self): + return len(self._write_ranges) == 0 + + def conflicts_with(self, file): + if file.first_row_id is None: + return False + + file_range = Range(file.first_row_id, file.first_row_id + file.row_count - 1) + index = self._first_possible_range(file_range) + + while index < len(self._write_ranges): + wr = self._write_ranges[index] + if wr.range.from_ > file_range.to: + return False + if wr.range.overlaps(file_range) and self._contains_any_write_field(wr.field_ids, file): + return True + index += 1 + + return False + + def _first_possible_range(self, target): + keys = [wr.range.to for wr in self._write_ranges] + return bisect.bisect_left(keys, target.from_) + + def _contains_any_write_field(self, field_ids, file): + if file.write_cols is None: + return True + for col_name in file.write_cols: + fid = self._field_id(file, col_name) + if fid is not None and fid in field_ids: + return True + return False + + def _field_id(self, file, col_name): + if SpecialFields.is_system_field(col_name): + return None + name_to_id = self._field_id_by_name(file.schema_id) + fid = name_to_id.get(col_name) + if fid is None: + raise RuntimeError( + f"Column '{col_name}' not found in schema {file.schema_id}") + return fid + + def _field_id_by_name(self, schema_id): + if schema_id not in self._field_id_cache: + schema = self._schema_manager.get_schema(schema_id) + if schema is None: + raise RuntimeError(f"Schema {schema_id} not found") + self._field_id_cache[schema_id] = { + field.name: field.id for field in schema.fields + } + return self._field_id_cache[schema_id] + + @classmethod + def _add_write_field_ids(cls, field_ids, file, schema_manager): + if file.write_cols is None: + schema = schema_manager.get_schema(file.schema_id) + if schema is not None: + for field in schema.fields: + if not SpecialFields.is_system_field(field.name): + field_ids.add(field.id) + else: + name_to_id = {} + schema = schema_manager.get_schema(file.schema_id) + if schema is not None: + name_to_id = {field.name: field.id for field in schema.fields} + for col_name in file.write_cols: + if SpecialFields.is_system_field(col_name): + continue + fid = name_to_id.get(col_name) + if fid is not None: + field_ids.add(fid) + + +class _WriteRange: + + def __init__(self, range_, field_ids): + self.range = range_ + self.field_ids = field_ids + + +class ConflictDetection: + """Detects conflicts between base and delta files during commit.""" + def __init__(self, data_evolution_enabled, snapshot_manager, manifest_list_manager: ManifestListManager, table, commit_scanner: CommitScanner): - """Initialize ConflictDetection. - - Args: - data_evolution_enabled: Whether data evolution feature is enabled. - snapshot_manager: Manager for reading snapshot metadata. - manifest_list_manager: Manager for reading manifest lists. - table: The FileStoreTable instance. - """ self.data_evolution_enabled = data_evolution_enabled self.snapshot_manager = snapshot_manager self.manifest_list_manager = manifest_list_manager @@ -58,20 +167,6 @@ def has_row_id_check_from_snapshot(self): return self._row_id_check_from_snapshot is not None def check_conflicts(self, latest_snapshot, base_entries, delta_entries, commit_kind): - """Run all conflict checks and return the first detected conflict. - - merges base_entries and delta_entries, then runs conflict checks - on the merged result. - - Args: - latest_snapshot: The latest snapshot at commit time. - base_entries: All entries read from the latest snapshot. - delta_entries: The delta entries being committed. - commit_kind: The kind of commit (e.g. "APPEND", "COMPACT", "OVERWRITE"). - - Returns: - A RuntimeError if a conflict is detected, otherwise None. - """ all_entries = list(base_entries) + list(delta_entries) try: @@ -80,25 +175,61 @@ def check_conflicts(self, latest_snapshot, base_entries, delta_entries, commit_k return RuntimeError( "File deletion conflicts detected! Give up committing. " + str(e)) + if commit_kind != "COMPACT": + next_row_id = latest_snapshot.next_row_id if latest_snapshot else None + conflict = self.check_row_id_existence( + base_entries, delta_entries, next_row_id) + if conflict is not None: + return conflict + conflict = self.check_row_id_range_conflicts(commit_kind, merged_entries) if conflict is not None: return conflict return self.check_row_id_from_snapshot(latest_snapshot, delta_entries) - def check_row_id_range_conflicts(self, commit_kind, commit_entries): - """Check for row ID range conflicts among merged entries. + def check_row_id_existence(self, base_entries, delta_entries, next_row_id=None): + if not self.data_evolution_enabled: + return None + + if next_row_id is None: + return None + + files_to_check = [ + entry for entry in delta_entries + if entry.kind == 0 + and entry.file.first_row_id is not None + and entry.file.first_row_id < next_row_id + ] - only enabled when data evolution is active, and checks that - overlapping row ID ranges in non-blob data files are identical. + if not files_to_check: + return None + + existing_index = set() + for base in base_entries: + if base.file.first_row_id is not None: + existing_index.add(( + base.partition, base.bucket, + base.file.first_row_id, base.file.row_count)) + + for entry in files_to_check: + key = (entry.partition, entry.bucket, + entry.file.first_row_id, entry.file.row_count) + if key not in existing_index: + return RuntimeError( + "Row ID existence conflict: file '{}' references " + "firstRowId={}, rowCount={} in bucket {}, " + "but no matching file exists in the current snapshot. " + "The referenced file may have been rewritten by a " + "concurrent compaction or removed by an overwrite.".format( + entry.file.file_name, + entry.file.first_row_id, + entry.file.row_count, + entry.bucket)) - Args: - commit_kind: The kind of commit (e.g. "APPEND", "COMPACT"). - commit_entries: The entries being committed. + return None - Returns: - A RuntimeError if conflict is detected, otherwise None. - """ + def check_row_id_range_conflicts(self, commit_kind, commit_entries): if not self.data_evolution_enabled: return None if self._row_id_check_from_snapshot is None and commit_kind != "COMPACT": @@ -137,31 +268,16 @@ def check_row_id_range_conflicts(self, commit_kind, commit_entries): return None def check_row_id_from_snapshot(self, latest_snapshot, commit_entries): - """Check for row ID conflicts from a specific snapshot onwards. - - collects row ID ranges from delta entries, then checks if any - incremental changes between the check snapshot and latest snapshot - have overlapping row ID ranges. - - Args: - latest_snapshot: The latest snapshot at commit time. - commit_entries: The delta entries being committed. - - Returns: - A RuntimeError if conflict is detected, otherwise None. - """ if not self.data_evolution_enabled: return None if self._row_id_check_from_snapshot is None: return None - history_id_ranges = [] - for entry in commit_entries: - first_row_id = entry.file.first_row_id - row_count = entry.file.row_count - if first_row_id is not None: - history_id_ranges.append( - Range(first_row_id, first_row_id + row_count - 1)) + delta_files = [entry.file for entry in commit_entries] + column_checker = RowIdColumnConflictChecker.from_data_files( + self.table.schema_manager, delta_files) + if column_checker is None or column_checker.is_empty(): + return None check_snapshot = self.snapshot_manager.get_snapshot_by_id( self._row_id_check_from_snapshot) @@ -187,12 +303,11 @@ def check_row_id_from_snapshot(self, latest_snapshot, commit_entries): if file_range is None: continue if file_range.from_ < check_next_row_id: - for history_range in history_id_ranges: - if history_range.overlaps(file_range): - return RuntimeError( - "For Data Evolution table, multiple 'MERGE INTO' " - "operations have encountered conflicts, updating " - "the same file, which can render some updates " - "ineffective.") + if column_checker.conflicts_with(entry.file): + return RuntimeError( + "For Data Evolution table, multiple 'MERGE INTO' " + "operations have encountered conflicts, updating " + "the same file, which can render some updates " + "ineffective.") return None diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 486e28924014..cfb4090e771c 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -330,21 +330,6 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str # process snapshot new_snapshot_id = latest_snapshot.id + 1 if latest_snapshot else 1 - # Check if row tracking is enabled - row_tracking_enabled = self.table.options.row_tracking_enabled() - - # Apply row tracking logic if enabled - next_row_id = None - if row_tracking_enabled: - # Assign snapshot ID to delta files - commit_entries = self._assign_snapshot_id(new_snapshot_id, commit_entries) - - # Get the next row ID start from the latest snapshot - first_row_id_start = self._get_next_row_id_start(latest_snapshot) - - # Assign row IDs to new files and get the next row ID for the snapshot - commit_entries, next_row_id = self._assign_row_tracking_meta(first_row_id_start, commit_entries) - # Conflict detection: read base entries from latest snapshot, then check conflicts if detect_conflicts and latest_snapshot is not None: base_entries = self.commit_scanner.read_all_entries_from_changed_partitions( @@ -358,6 +343,14 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str return RetryResult(latest_snapshot, conflict_exception) raise conflict_exception + # Apply row tracking logic after conflict detection (matches Java ordering) + row_tracking_enabled = self.table.options.row_tracking_enabled() + next_row_id = None + if row_tracking_enabled: + commit_entries = self._assign_snapshot_id(new_snapshot_id, commit_entries) + first_row_id_start = self._get_next_row_id_start(latest_snapshot) + commit_entries, next_row_id = self._assign_row_tracking_meta(first_row_id_start, commit_entries) + try: new_manifest_file_meta = self._write_manifest_file(commit_entries, new_manifest_file) self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta]) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index ff54322b92a8..644eb49847b6 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -201,7 +201,10 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { success = true } catch { case e: Exception => - if (!e.getMessage.contains("multiple 'MERGE INTO' and 'COMPACT' operations")) { + if ( + !e.getMessage.contains("multiple 'MERGE INTO' and 'COMPACT' operations") + && !e.getMessage.contains("Row ID existence conflict") + ) { throw e } }