From 667544c8d9c2d1545008027c2c82c3abab8c2f25 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 26 May 2026 12:48:57 +0800 Subject: [PATCH 1/7] [core][python] Add row ID existence check and column-level conflict detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a Rust-style row ID existence check to both Java and Python commit validation, ensuring pre-assigned firstRowId files reference base files that still exist in the current snapshot. This covers a gap where checkRowIdRangeConflicts misses conflicts when base files are fully deleted (e.g., DROP PARTITION / OVERWRITE) with no replacement. Also port Java's RowIdColumnConflictChecker to Python, enabling row×column two-dimensional conflict detection in check_row_id_from_snapshot. This allows concurrent MERGE INTO on different columns of the same rows to proceed without false conflicts. Co-Authored-By: Claude Opus 4.6 --- .../operation/commit/ConflictDetection.java | 93 ++++++ .../commit/ConflictDetectionTest.java | 87 ++++++ .../tests/write/conflict_detection_test.py | 279 ++++++++++++++++++ .../write/commit/conflict_detection.py | 234 +++++++++++---- 4 files changed, 630 insertions(+), 63 deletions(-) create mode 100644 paimon-python/pypaimon/tests/write/conflict_detection_test.py diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index 29ee0458183f..fbe4a9f3b6b7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -224,6 +224,13 @@ public Optional checkConflicts( return exception; } + if (commitKind != CommitKind.COMPACT) { + exception = checkRowIdExistence(baseEntries, deltaEntries); + if (exception.isPresent()) { + return exception; + } + } + exception = checkRowIdRangeConflicts(commitKind, mergedEntries); if (exception.isPresent()) { return exception; @@ -536,6 +543,92 @@ private Optional checkForRowIdFromSnapshot( return Optional.empty(); } + Optional checkRowIdExistence( + List baseEntries, List deltaEntries) { + if (!dataEvolutionEnabled) { + return Optional.empty(); + } + + List filesToCheck = + deltaEntries.stream() + .filter(e -> e.kind() == FileKind.ADD && e.firstRowId() != null) + .collect(Collectors.toList()); + + if (filesToCheck.isEmpty()) { + return Optional.empty(); + } + + Set existingIndex = new HashSet<>(); + for (SimpleFileEntry base : baseEntries) { + if (base.firstRowId() != null) { + existingIndex.add( + new FileRowIdKey( + base.partition(), + base.bucket(), + base.firstRowId(), + base.rowCount())); + } + } + + for (SimpleFileEntry entry : filesToCheck) { + FileRowIdKey key = + new FileRowIdKey( + entry.partition(), + entry.bucket(), + entry.firstRowId(), + entry.rowCount()); + if (!existingIndex.contains(key)) { + return Optional.of( + new RuntimeException( + String.format( + "Row ID existence conflict: file '%s' references " + + "firstRowId=%d, rowCount=%d in bucket %d, " + + "but no matching file exists in the current snapshot. " + + "The referenced file may have been rewritten by a " + + "concurrent compaction or removed by an overwrite.", + entry.fileName(), + entry.firstRowId(), + entry.rowCount(), + entry.bucket()))); + } + } + return Optional.empty(); + } + + private static class FileRowIdKey { + private final BinaryRow partition; + private final int bucket; + private final long firstRowId; + private final long rowCount; + + FileRowIdKey(BinaryRow partition, int bucket, long firstRowId, long rowCount) { + this.partition = partition; + this.bucket = bucket; + this.firstRowId = firstRowId; + this.rowCount = rowCount; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FileRowIdKey that = (FileRowIdKey) o; + return bucket == that.bucket + && firstRowId == that.firstRowId + && rowCount == that.rowCount + && Objects.equals(partition, that.partition); + } + + @Override + public int hashCode() { + return Objects.hash(partition, bucket, firstRowId, rowCount); + } + } + private static boolean dedicatedStorageFile(String fileName) { return isBlobFile(fileName) || isVectorStoreFile(fileName); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index c3e0258da28f..53d2e542e8c4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -38,6 +38,7 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; +import java.util.Optional; import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; @@ -374,6 +375,92 @@ void testShouldBeOverwriteCommit() { .isFalse(); } + @Test + void testCheckRowIdExistenceNoConflict() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L)); + + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L)); + + assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries)).isEmpty(); + } + + @Test + void testCheckRowIdExistenceBaseFileRemoved() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L)); + + Optional result = + detection.checkRowIdExistence(baseEntries, deltaEntries); + assertThat(result).isPresent(); + assertThat(result.get().getMessage()).contains("Row ID existence conflict"); + } + + @Test + void testCheckRowIdExistenceBaseFileRewritten() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithRowId("f2", ADD, 0L, 200L)); + + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L)); + + Optional result = + detection.checkRowIdExistence(baseEntries, deltaEntries); + assertThat(result).isPresent(); + assertThat(result.get().getMessage()).contains("Row ID existence conflict"); + } + + @Test + void testCheckRowIdExistenceSkipsNonPreAssigned() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntry("f1", ADD)); + + assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries)).isEmpty(); + } + + @Test + void testCheckRowIdExistenceSkipsDeleteEntries() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("f1", DELETE, 0L, 100L)); + + assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries)).isEmpty(); + } + + private SimpleFileEntry createFileEntryWithRowId( + String fileName, FileKind kind, long firstRowId, long rowCount) { + return new SimpleFileEntry( + kind, + EMPTY_ROW, + 0, + 1, + 0, + fileName, + Collections.emptyList(), + null, + EMPTY_ROW, + EMPTY_ROW, + null, + rowCount, + firstRowId); + } + private ConflictDetection createConflictDetection() { return new ConflictDetection( "test-table", diff --git a/paimon-python/pypaimon/tests/write/conflict_detection_test.py b/paimon-python/pypaimon/tests/write/conflict_detection_test.py new file mode 100644 index 000000000000..79de72f62878 --- /dev/null +++ b/paimon-python/pypaimon/tests/write/conflict_detection_test.py @@ -0,0 +1,279 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest +from dataclasses import dataclass +from typing import List, Optional + +from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.manifest.schema.manifest_entry import ManifestEntry +from pypaimon.schema.data_types import AtomicType, DataField +from pypaimon.table.row.generic_row import GenericRow +from pypaimon.write.commit.conflict_detection import ( + ConflictDetection, + RowIdColumnConflictChecker, +) + + +def _make_file(file_name, row_count=100, first_row_id=None, + schema_id=0, write_cols=None): + return DataFileMeta( + file_name=file_name, + file_size=1024, + row_count=row_count, + min_key=None, + max_key=None, + key_stats=None, + value_stats=None, + min_sequence_number=0, + max_sequence_number=0, + schema_id=schema_id, + level=0, + extra_files=[], + first_row_id=first_row_id, + write_cols=write_cols, + ) + + +_EMPTY_PARTITION = GenericRow([], []) + + +def _make_entry(file_name, kind=0, bucket=0, first_row_id=None, + row_count=100, write_cols=None, schema_id=0): + return ManifestEntry( + kind=kind, + partition=_EMPTY_PARTITION, + bucket=bucket, + total_buckets=1, + file=_make_file(file_name, row_count=row_count, + first_row_id=first_row_id, schema_id=schema_id, + write_cols=write_cols), + ) + + +@dataclass +class _FakeSchema: + id: int + fields: List[DataField] + + +class _FakeSchemaManager: + + def __init__(self, schemas=None): + self._schemas = {} + if schemas: + for s in schemas: + self._schemas[s.id] = s + + def get_schema(self, schema_id): + return self._schemas.get(schema_id) + + +_DEFAULT_SCHEMA = _FakeSchema( + id=0, + fields=[ + DataField(1, "col_a", AtomicType("INT")), + DataField(2, "col_b", AtomicType("STRING")), + DataField(3, "col_c", AtomicType("BIGINT")), + ], +) + + +class TestCheckRowIdExistence(unittest.TestCase): + + def _make_detection(self): + return ConflictDetection( + data_evolution_enabled=True, + snapshot_manager=None, + manifest_list_manager=None, + table=None, + commit_scanner=None, + ) + + def test_no_conflict_when_base_file_exists(self): + detection = self._make_detection() + base = [_make_entry("f1", kind=0, first_row_id=0, row_count=100)] + delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)] + self.assertIsNone(detection.check_row_id_existence(base, delta)) + + def test_conflict_when_base_file_removed(self): + detection = self._make_detection() + base = [] + delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)] + result = detection.check_row_id_existence(base, delta) + self.assertIsNotNone(result) + self.assertIn("Row ID existence conflict", str(result)) + + def test_conflict_when_base_file_rewritten(self): + detection = self._make_detection() + base = [_make_entry("f2", kind=0, first_row_id=0, row_count=200)] + delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)] + result = detection.check_row_id_existence(base, delta) + self.assertIsNotNone(result) + self.assertIn("Row ID existence conflict", str(result)) + + def test_skip_when_no_pre_assigned_row_id(self): + detection = self._make_detection() + base = [] + delta = [_make_entry("f1", kind=0)] + self.assertIsNone(detection.check_row_id_existence(base, delta)) + + def test_skip_delete_entries(self): + detection = self._make_detection() + base = [] + delta = [_make_entry("f1", kind=1, first_row_id=0, row_count=100)] + self.assertIsNone(detection.check_row_id_existence(base, delta)) + + def test_skip_when_data_evolution_disabled(self): + detection = ConflictDetection( + data_evolution_enabled=False, + snapshot_manager=None, + manifest_list_manager=None, + table=None, + commit_scanner=None, + ) + base = [] + delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)] + self.assertIsNone(detection.check_row_id_existence(base, delta)) + + +class TestRowIdColumnConflictChecker(unittest.TestCase): + + def _make_checker(self, delta_files, schema=None): + schema_mgr = _FakeSchemaManager([schema or _DEFAULT_SCHEMA]) + return RowIdColumnConflictChecker.from_data_files(schema_mgr, delta_files) + + def test_no_conflict_disjoint_rows(self): + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, write_cols=["col_a"]), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=200, + write_cols=["col_a"]) + self.assertFalse(checker.conflicts_with(committed)) + + def test_no_conflict_same_rows_different_columns(self): + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, write_cols=["col_a"]), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=0, + write_cols=["col_b"]) + self.assertFalse(checker.conflicts_with(committed)) + + def test_conflict_same_rows_same_columns(self): + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, write_cols=["col_a"]), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=0, + write_cols=["col_a"]) + self.assertTrue(checker.conflicts_with(committed)) + + def test_conflict_overlapping_rows_overlapping_columns(self): + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, + write_cols=["col_a", "col_b"]), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=50, + write_cols=["col_b", "col_c"]) + self.assertTrue(checker.conflicts_with(committed)) + + def test_conflict_null_write_cols_committed(self): + """null write_cols means full-schema write — always conflicts on column dimension.""" + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, write_cols=["col_a"]), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=0, + write_cols=None) + self.assertTrue(checker.conflicts_with(committed)) + + def test_conflict_null_write_cols_delta(self): + """null write_cols in delta means all columns are in the write range.""" + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, write_cols=None), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=0, + write_cols=["col_b"]) + self.assertTrue(checker.conflicts_with(committed)) + + def test_no_conflict_committed_file_no_row_id(self): + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, write_cols=["col_a"]), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=None, + write_cols=["col_a"]) + self.assertFalse(checker.conflicts_with(committed)) + + def test_none_when_no_delta_files_with_row_id(self): + delta_files = [ + _make_file("d1", row_count=100, first_row_id=None), + ] + schema_mgr = _FakeSchemaManager([_DEFAULT_SCHEMA]) + checker = RowIdColumnConflictChecker.from_data_files(schema_mgr, delta_files) + self.assertIsNone(checker) + + def test_system_fields_skipped(self): + """System fields like _ROW_ID should not count as column conflicts.""" + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, + write_cols=["_ROW_ID", "col_a"]), + ] + checker = self._make_checker(delta_files) + committed = _make_file("c1", row_count=100, first_row_id=0, + write_cols=["_ROW_ID", "col_b"]) + self.assertFalse(checker.conflicts_with(committed)) + + def test_cross_schema_field_id_resolution(self): + """Fields with same ID but different names across schema versions should still match.""" + schema_v0 = _FakeSchema( + id=0, + fields=[ + DataField(1, "col_a", AtomicType("INT")), + DataField(2, "col_b", AtomicType("STRING")), + ], + ) + schema_v1 = _FakeSchema( + id=1, + fields=[ + DataField(1, "col_a_renamed", AtomicType("INT")), + DataField(2, "col_b", AtomicType("STRING")), + DataField(3, "col_c", AtomicType("BIGINT")), + ], + ) + schema_mgr = _FakeSchemaManager([schema_v0, schema_v1]) + delta_files = [ + _make_file("d1", row_count=100, first_row_id=0, + schema_id=0, write_cols=["col_a"]), + ] + checker = RowIdColumnConflictChecker.from_data_files(schema_mgr, delta_files) + committed_same_field = _make_file( + "c1", row_count=100, first_row_id=0, + schema_id=1, write_cols=["col_a_renamed"]) + self.assertTrue(checker.conflicts_with(committed_same_field)) + committed_diff_field = _make_file( + "c2", row_count=100, first_row_id=0, + schema_id=1, write_cols=["col_c"]) + self.assertFalse(checker.conflicts_with(committed_diff_field)) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/write/commit/conflict_detection.py b/paimon-python/pypaimon/write/commit/conflict_detection.py index 6b7652c00be8..bf51da530375 100644 --- a/paimon-python/pypaimon/write/commit/conflict_detection.py +++ b/paimon-python/pypaimon/write/commit/conflict_detection.py @@ -19,31 +19,140 @@ Conflict detection for commit operations. """ +import bisect + from pypaimon.manifest.manifest_list_manager import ManifestListManager from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.file_entry import FileEntry +from pypaimon.table.special_fields import SpecialFields from pypaimon.utils.range import Range from pypaimon.utils.range_helper import RangeHelper from pypaimon.write.commit.commit_scanner import CommitScanner -class ConflictDetection: - """Detects conflicts between base and delta files during commit. +class RowIdColumnConflictChecker: + """Checks for row ID × column conflicts between delta files and committed files. - This class provides row ID range conflict checks and row ID from snapshot conflict checks - for Data Evolution tables. + Built from the current commit's delta files. For each committed file, + checks whether it overlaps with the delta files on BOTH dimensions: + row-id range AND write columns. """ + def __init__(self, write_ranges, schema_manager): + self._write_ranges = write_ranges + self._schema_manager = schema_manager + self._field_id_cache = {} + + @classmethod + def from_data_files(cls, schema_manager, delta_files): + files_with_row_id = [f for f in delta_files if f.first_row_id is not None] + if not files_with_row_id: + return None + + range_helper = RangeHelper(lambda f: f.row_id_range()) + groups = range_helper.merge_overlapping_ranges(files_with_row_id) + + write_ranges = [] + for group in groups: + merged_from = min(f.first_row_id for f in group) + merged_to = max(f.first_row_id + f.row_count - 1 for f in group) + merged_range = Range(merged_from, merged_to) + + field_ids = set() + for f in group: + cls._add_write_field_ids(field_ids, f, schema_manager) + + write_ranges.append(_WriteRange(merged_range, field_ids)) + + write_ranges.sort(key=lambda wr: (wr.range.from_, wr.range.to)) + return cls(write_ranges, schema_manager) + + def is_empty(self): + return len(self._write_ranges) == 0 + + def conflicts_with(self, file): + if file.first_row_id is None: + return False + + file_range = Range(file.first_row_id, file.first_row_id + file.row_count - 1) + index = self._first_possible_range(file_range) + + while index < len(self._write_ranges): + wr = self._write_ranges[index] + if wr.range.from_ > file_range.to: + return False + if wr.range.overlaps(file_range) and self._contains_any_write_field(wr.field_ids, file): + return True + index += 1 + + return False + + def _first_possible_range(self, target): + keys = [wr.range.to for wr in self._write_ranges] + return bisect.bisect_left(keys, target.from_) + + def _contains_any_write_field(self, field_ids, file): + if file.write_cols is None: + return True + for col_name in file.write_cols: + fid = self._field_id(file, col_name) + if fid is not None and fid in field_ids: + return True + return False + + def _field_id(self, file, col_name): + if SpecialFields.is_system_field(col_name): + return None + name_to_id = self._field_id_by_name(file.schema_id) + fid = name_to_id.get(col_name) + if fid is None: + raise RuntimeError( + f"Column '{col_name}' not found in schema {file.schema_id}") + return fid + + def _field_id_by_name(self, schema_id): + if schema_id not in self._field_id_cache: + schema = self._schema_manager.get_schema(schema_id) + if schema is None: + raise RuntimeError(f"Schema {schema_id} not found") + self._field_id_cache[schema_id] = { + field.name: field.id for field in schema.fields + } + return self._field_id_cache[schema_id] + + @classmethod + def _add_write_field_ids(cls, field_ids, file, schema_manager): + if file.write_cols is None: + schema = schema_manager.get_schema(file.schema_id) + if schema is not None: + for field in schema.fields: + if not SpecialFields.is_system_field(field.name): + field_ids.add(field.id) + else: + name_to_id = {} + schema = schema_manager.get_schema(file.schema_id) + if schema is not None: + name_to_id = {field.name: field.id for field in schema.fields} + for col_name in file.write_cols: + if SpecialFields.is_system_field(col_name): + continue + fid = name_to_id.get(col_name) + if fid is not None: + field_ids.add(fid) + + +class _WriteRange: + + def __init__(self, range_, field_ids): + self.range = range_ + self.field_ids = field_ids + + +class ConflictDetection: + """Detects conflicts between base and delta files during commit.""" + def __init__(self, data_evolution_enabled, snapshot_manager, manifest_list_manager: ManifestListManager, table, commit_scanner: CommitScanner): - """Initialize ConflictDetection. - - Args: - data_evolution_enabled: Whether data evolution feature is enabled. - snapshot_manager: Manager for reading snapshot metadata. - manifest_list_manager: Manager for reading manifest lists. - table: The FileStoreTable instance. - """ self.data_evolution_enabled = data_evolution_enabled self.snapshot_manager = snapshot_manager self.manifest_list_manager = manifest_list_manager @@ -58,20 +167,6 @@ def has_row_id_check_from_snapshot(self): return self._row_id_check_from_snapshot is not None def check_conflicts(self, latest_snapshot, base_entries, delta_entries, commit_kind): - """Run all conflict checks and return the first detected conflict. - - merges base_entries and delta_entries, then runs conflict checks - on the merged result. - - Args: - latest_snapshot: The latest snapshot at commit time. - base_entries: All entries read from the latest snapshot. - delta_entries: The delta entries being committed. - commit_kind: The kind of commit (e.g. "APPEND", "COMPACT", "OVERWRITE"). - - Returns: - A RuntimeError if a conflict is detected, otherwise None. - """ all_entries = list(base_entries) + list(delta_entries) try: @@ -80,25 +175,54 @@ def check_conflicts(self, latest_snapshot, base_entries, delta_entries, commit_k return RuntimeError( "File deletion conflicts detected! Give up committing. " + str(e)) + if commit_kind != "COMPACT": + conflict = self.check_row_id_existence(base_entries, delta_entries) + if conflict is not None: + return conflict + conflict = self.check_row_id_range_conflicts(commit_kind, merged_entries) if conflict is not None: return conflict return self.check_row_id_from_snapshot(latest_snapshot, delta_entries) - def check_row_id_range_conflicts(self, commit_kind, commit_entries): - """Check for row ID range conflicts among merged entries. + def check_row_id_existence(self, base_entries, delta_entries): + if not self.data_evolution_enabled: + return None - only enabled when data evolution is active, and checks that - overlapping row ID ranges in non-blob data files are identical. + files_to_check = [ + entry for entry in delta_entries + if entry.kind == 0 and entry.file.first_row_id is not None + ] + + if not files_to_check: + return None - Args: - commit_kind: The kind of commit (e.g. "APPEND", "COMPACT"). - commit_entries: The entries being committed. + existing_index = set() + for base in base_entries: + if base.file.first_row_id is not None: + existing_index.add(( + base.partition, base.bucket, + base.file.first_row_id, base.file.row_count)) + + for entry in files_to_check: + key = (entry.partition, entry.bucket, + entry.file.first_row_id, entry.file.row_count) + if key not in existing_index: + return RuntimeError( + "Row ID existence conflict: file '{}' references " + "firstRowId={}, rowCount={} in bucket {}, " + "but no matching file exists in the current snapshot. " + "The referenced file may have been rewritten by a " + "concurrent compaction or removed by an overwrite.".format( + entry.file.file_name, + entry.file.first_row_id, + entry.file.row_count, + entry.bucket)) + + return None - Returns: - A RuntimeError if conflict is detected, otherwise None. - """ + def check_row_id_range_conflicts(self, commit_kind, commit_entries): if not self.data_evolution_enabled: return None if self._row_id_check_from_snapshot is None and commit_kind != "COMPACT": @@ -137,31 +261,16 @@ def check_row_id_range_conflicts(self, commit_kind, commit_entries): return None def check_row_id_from_snapshot(self, latest_snapshot, commit_entries): - """Check for row ID conflicts from a specific snapshot onwards. - - collects row ID ranges from delta entries, then checks if any - incremental changes between the check snapshot and latest snapshot - have overlapping row ID ranges. - - Args: - latest_snapshot: The latest snapshot at commit time. - commit_entries: The delta entries being committed. - - Returns: - A RuntimeError if conflict is detected, otherwise None. - """ if not self.data_evolution_enabled: return None if self._row_id_check_from_snapshot is None: return None - history_id_ranges = [] - for entry in commit_entries: - first_row_id = entry.file.first_row_id - row_count = entry.file.row_count - if first_row_id is not None: - history_id_ranges.append( - Range(first_row_id, first_row_id + row_count - 1)) + delta_files = [entry.file for entry in commit_entries] + column_checker = RowIdColumnConflictChecker.from_data_files( + self.table.schema_manager, delta_files) + if column_checker is None or column_checker.is_empty(): + return None check_snapshot = self.snapshot_manager.get_snapshot_by_id( self._row_id_check_from_snapshot) @@ -187,12 +296,11 @@ def check_row_id_from_snapshot(self, latest_snapshot, commit_entries): if file_range is None: continue if file_range.from_ < check_next_row_id: - for history_range in history_id_ranges: - if history_range.overlaps(file_range): - return RuntimeError( - "For Data Evolution table, multiple 'MERGE INTO' " - "operations have encountered conflicts, updating " - "the same file, which can render some updates " - "ineffective.") + if column_checker.conflicts_with(entry.file): + return RuntimeError( + "For Data Evolution table, multiple 'MERGE INTO' " + "operations have encountered conflicts, updating " + "the same file, which can render some updates " + "ineffective.") return None From bb3c4863fbb3ce27701f6039ce87a55f656e5124 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 26 May 2026 13:34:19 +0800 Subject: [PATCH 2/7] [core][python] Fix checkRowIdExistence to skip newly appended files Row tracking assigns firstRowId before conflict detection, so newly appended files also have firstRowId set and were falsely rejected. Add nextRowId parameter to only check files where firstRowId < nextRowId. Co-Authored-By: Claude Opus 4.6 --- .../operation/commit/ConflictDetection.java | 14 +++++-- .../commit/ConflictDetectionTest.java | 38 ++++++++++++++++--- .../tests/write/conflict_detection_test.py | 30 ++++++++++++--- .../write/commit/conflict_detection.py | 13 +++++-- 4 files changed, 78 insertions(+), 17 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index fbe4a9f3b6b7..493d13d88dee 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -225,7 +225,8 @@ public Optional checkConflicts( } if (commitKind != CommitKind.COMPACT) { - exception = checkRowIdExistence(baseEntries, deltaEntries); + Long nextRowId = latestSnapshot.nextRowId(); + exception = checkRowIdExistence(baseEntries, deltaEntries, nextRowId); if (exception.isPresent()) { return exception; } @@ -544,14 +545,21 @@ private Optional checkForRowIdFromSnapshot( } Optional checkRowIdExistence( - List baseEntries, List deltaEntries) { + List baseEntries, + List deltaEntries, + @Nullable Long nextRowId) { if (!dataEvolutionEnabled) { return Optional.empty(); } List filesToCheck = deltaEntries.stream() - .filter(e -> e.kind() == FileKind.ADD && e.firstRowId() != null) + .filter( + e -> + e.kind() == FileKind.ADD + && e.firstRowId() != null + && nextRowId != null + && e.firstRowId() < nextRowId) .collect(Collectors.toList()); if (filesToCheck.isEmpty()) { diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index 53d2e542e8c4..1c36b9e09ee7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -385,7 +385,7 @@ void testCheckRowIdExistenceNoConflict() { List deltaEntries = new ArrayList<>(); deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L)); - assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries)).isEmpty(); + assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 100L)).isEmpty(); } @Test @@ -398,7 +398,7 @@ void testCheckRowIdExistenceBaseFileRemoved() { deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L)); Optional result = - detection.checkRowIdExistence(baseEntries, deltaEntries); + detection.checkRowIdExistence(baseEntries, deltaEntries, 100L); assertThat(result).isPresent(); assertThat(result.get().getMessage()).contains("Row ID existence conflict"); } @@ -414,11 +414,28 @@ void testCheckRowIdExistenceBaseFileRewritten() { deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L)); Optional result = - detection.checkRowIdExistence(baseEntries, deltaEntries); + detection.checkRowIdExistence(baseEntries, deltaEntries, 200L); assertThat(result).isPresent(); assertThat(result.get().getMessage()).contains("Row ID existence conflict"); } + @Test + void testCheckRowIdExistenceSkipsNewlyAppendedFiles() { + ConflictDetection detection = createConflictDetection(); + + // nextRowId=100: files with firstRowId >= 100 are newly appended, not references + List baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L)); + + List deltaEntries = new ArrayList<>(); + // partial-column update referencing existing rows (firstRowId=0 < nextRowId=100) + deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L)); + // newly appended file (firstRowId=100 >= nextRowId=100), should be skipped + deltaEntries.add(createFileEntryWithRowId("new1", ADD, 100L, 50L)); + + assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 100L)).isEmpty(); + } + @Test void testCheckRowIdExistenceSkipsNonPreAssigned() { ConflictDetection detection = createConflictDetection(); @@ -428,7 +445,7 @@ void testCheckRowIdExistenceSkipsNonPreAssigned() { List deltaEntries = new ArrayList<>(); deltaEntries.add(createFileEntry("f1", ADD)); - assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries)).isEmpty(); + assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 100L)).isEmpty(); } @Test @@ -440,7 +457,18 @@ void testCheckRowIdExistenceSkipsDeleteEntries() { List deltaEntries = new ArrayList<>(); deltaEntries.add(createFileEntryWithRowId("f1", DELETE, 0L, 100L)); - assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries)).isEmpty(); + assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 100L)).isEmpty(); + } + + @Test + void testCheckRowIdExistenceSkipsWhenNextRowIdNull() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 100L)); + + assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, null)).isEmpty(); } private SimpleFileEntry createFileEntryWithRowId( diff --git a/paimon-python/pypaimon/tests/write/conflict_detection_test.py b/paimon-python/pypaimon/tests/write/conflict_detection_test.py index 79de72f62878..fb04dee3676b 100644 --- a/paimon-python/pypaimon/tests/write/conflict_detection_test.py +++ b/paimon-python/pypaimon/tests/write/conflict_detection_test.py @@ -108,13 +108,14 @@ def test_no_conflict_when_base_file_exists(self): detection = self._make_detection() base = [_make_entry("f1", kind=0, first_row_id=0, row_count=100)] delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)] - self.assertIsNone(detection.check_row_id_existence(base, delta)) + self.assertIsNone( + detection.check_row_id_existence(base, delta, next_row_id=200)) def test_conflict_when_base_file_removed(self): detection = self._make_detection() base = [] delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)] - result = detection.check_row_id_existence(base, delta) + result = detection.check_row_id_existence(base, delta, next_row_id=200) self.assertIsNotNone(result) self.assertIn("Row ID existence conflict", str(result)) @@ -122,21 +123,30 @@ def test_conflict_when_base_file_rewritten(self): detection = self._make_detection() base = [_make_entry("f2", kind=0, first_row_id=0, row_count=200)] delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)] - result = detection.check_row_id_existence(base, delta) + result = detection.check_row_id_existence(base, delta, next_row_id=200) self.assertIsNotNone(result) self.assertIn("Row ID existence conflict", str(result)) + def test_skip_newly_appended_files(self): + detection = self._make_detection() + base = [] + delta = [_make_entry("p1", kind=0, first_row_id=200, row_count=100)] + self.assertIsNone( + detection.check_row_id_existence(base, delta, next_row_id=200)) + def test_skip_when_no_pre_assigned_row_id(self): detection = self._make_detection() base = [] delta = [_make_entry("f1", kind=0)] - self.assertIsNone(detection.check_row_id_existence(base, delta)) + self.assertIsNone( + detection.check_row_id_existence(base, delta, next_row_id=200)) def test_skip_delete_entries(self): detection = self._make_detection() base = [] delta = [_make_entry("f1", kind=1, first_row_id=0, row_count=100)] - self.assertIsNone(detection.check_row_id_existence(base, delta)) + self.assertIsNone( + detection.check_row_id_existence(base, delta, next_row_id=200)) def test_skip_when_data_evolution_disabled(self): detection = ConflictDetection( @@ -148,7 +158,15 @@ def test_skip_when_data_evolution_disabled(self): ) base = [] delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)] - self.assertIsNone(detection.check_row_id_existence(base, delta)) + self.assertIsNone( + detection.check_row_id_existence(base, delta, next_row_id=200)) + + def test_skip_when_next_row_id_is_none(self): + detection = self._make_detection() + base = [] + delta = [_make_entry("p1", kind=0, first_row_id=0, row_count=100)] + self.assertIsNone( + detection.check_row_id_existence(base, delta, next_row_id=None)) class TestRowIdColumnConflictChecker(unittest.TestCase): diff --git a/paimon-python/pypaimon/write/commit/conflict_detection.py b/paimon-python/pypaimon/write/commit/conflict_detection.py index bf51da530375..fc92dc838b93 100644 --- a/paimon-python/pypaimon/write/commit/conflict_detection.py +++ b/paimon-python/pypaimon/write/commit/conflict_detection.py @@ -176,7 +176,9 @@ def check_conflicts(self, latest_snapshot, base_entries, delta_entries, commit_k "File deletion conflicts detected! Give up committing. " + str(e)) if commit_kind != "COMPACT": - conflict = self.check_row_id_existence(base_entries, delta_entries) + next_row_id = latest_snapshot.next_row_id if latest_snapshot else None + conflict = self.check_row_id_existence( + base_entries, delta_entries, next_row_id) if conflict is not None: return conflict @@ -186,13 +188,18 @@ def check_conflicts(self, latest_snapshot, base_entries, delta_entries, commit_k return self.check_row_id_from_snapshot(latest_snapshot, delta_entries) - def check_row_id_existence(self, base_entries, delta_entries): + def check_row_id_existence(self, base_entries, delta_entries, next_row_id=None): if not self.data_evolution_enabled: return None + if next_row_id is None: + return None + files_to_check = [ entry for entry in delta_entries - if entry.kind == 0 and entry.file.first_row_id is not None + if entry.kind == 0 + and entry.file.first_row_id is not None + and entry.file.first_row_id < next_row_id ] if not files_to_check: From b2cc80734a792e17e2cf3979eebea0bcafbb305f Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 26 May 2026 13:46:24 +0800 Subject: [PATCH 3/7] [python] Move row ID assignment after conflict detection to align with Java In Java, conflict detection runs before row ID assignment. Python had the opposite order, which meant delta entries already had firstRowId set during conflict checks. Reorder to match Java's flow. Co-Authored-By: Claude Opus 4.6 --- .../pypaimon/write/file_store_commit.py | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 486e28924014..cfb4090e771c 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -330,21 +330,6 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str # process snapshot new_snapshot_id = latest_snapshot.id + 1 if latest_snapshot else 1 - # Check if row tracking is enabled - row_tracking_enabled = self.table.options.row_tracking_enabled() - - # Apply row tracking logic if enabled - next_row_id = None - if row_tracking_enabled: - # Assign snapshot ID to delta files - commit_entries = self._assign_snapshot_id(new_snapshot_id, commit_entries) - - # Get the next row ID start from the latest snapshot - first_row_id_start = self._get_next_row_id_start(latest_snapshot) - - # Assign row IDs to new files and get the next row ID for the snapshot - commit_entries, next_row_id = self._assign_row_tracking_meta(first_row_id_start, commit_entries) - # Conflict detection: read base entries from latest snapshot, then check conflicts if detect_conflicts and latest_snapshot is not None: base_entries = self.commit_scanner.read_all_entries_from_changed_partitions( @@ -358,6 +343,14 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str return RetryResult(latest_snapshot, conflict_exception) raise conflict_exception + # Apply row tracking logic after conflict detection (matches Java ordering) + row_tracking_enabled = self.table.options.row_tracking_enabled() + next_row_id = None + if row_tracking_enabled: + commit_entries = self._assign_snapshot_id(new_snapshot_id, commit_entries) + first_row_id_start = self._get_next_row_id_start(latest_snapshot) + commit_entries, next_row_id = self._assign_row_tracking_meta(first_row_id_start, commit_entries) + try: new_manifest_file_meta = self._write_manifest_file(commit_entries, new_manifest_file) self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta]) From 88d228c34677ba8e852eef1fba8be27a38783c9f Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 26 May 2026 13:49:39 +0800 Subject: [PATCH 4/7] [python] Remove unused Optional import in conflict detection test Co-Authored-By: Claude Opus 4.6 --- paimon-python/pypaimon/tests/write/conflict_detection_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/write/conflict_detection_test.py b/paimon-python/pypaimon/tests/write/conflict_detection_test.py index fb04dee3676b..302a7d801fa3 100644 --- a/paimon-python/pypaimon/tests/write/conflict_detection_test.py +++ b/paimon-python/pypaimon/tests/write/conflict_detection_test.py @@ -17,7 +17,7 @@ import unittest from dataclasses import dataclass -from typing import List, Optional +from typing import List from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.manifest_entry import ManifestEntry From 2e338d9d41aa335bad0d390d836ab091e2d790a0 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 26 May 2026 14:18:45 +0800 Subject: [PATCH 5/7] [python] Fix compact conflict e2e test assertion to match new error message The check_row_id_existence error message uses singular 'conflict', while the test asserted on plural 'conflicts'. Use 'conflict' to match both existence check and range conflict messages. Co-Authored-By: Claude Opus 4.6 --- paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py index f48f4c99f368..13555a423131 100644 --- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py +++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py @@ -762,7 +762,7 @@ def test_compact_conflict_shard_update(self): tc = wb.new_commit() with self.assertRaises(RuntimeError) as ctx: tc.commit(stale_commit_msgs) - self.assertIn("conflicts", str(ctx.exception)) + self.assertIn("conflict", str(ctx.exception)) tc.close() print(f"Conflict detected as expected: {ctx.exception}") From ef5291d24433f1b1065d0439516bc0dd31434a42 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 26 May 2026 15:04:24 +0800 Subject: [PATCH 6/7] [spark] Add Row ID existence conflict to doWithRetry catch in test The concurrent merge+compact test retries on conflict exceptions, but only caught the range conflict message. Add the new existence conflict message so the retry works for both conflict types. Co-Authored-By: Claude Opus 4.6 --- .../org/apache/paimon/spark/sql/RowTrackingTestBase.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index ff54322b92a8..5ca0c9f94d70 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -201,7 +201,8 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { success = true } catch { case e: Exception => - if (!e.getMessage.contains("multiple 'MERGE INTO' and 'COMPACT' operations")) { + if (!e.getMessage.contains("multiple 'MERGE INTO' and 'COMPACT' operations") + && !e.getMessage.contains("Row ID existence conflict")) { throw e } } From a9631f3a022f2a0ebdbef4099e9752f9c1df7d84 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 26 May 2026 15:26:06 +0800 Subject: [PATCH 7/7] [spark] Fix spotless formatting in RowTrackingTestBase Co-Authored-By: Claude Opus 4.6 --- .../org/apache/paimon/spark/sql/RowTrackingTestBase.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index 5ca0c9f94d70..644eb49847b6 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -201,8 +201,10 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { success = true } catch { case e: Exception => - if (!e.getMessage.contains("multiple 'MERGE INTO' and 'COMPACT' operations") - && !e.getMessage.contains("Row ID existence conflict")) { + if ( + !e.getMessage.contains("multiple 'MERGE INTO' and 'COMPACT' operations") + && !e.getMessage.contains("Row ID existence conflict") + ) { throw e } }