From 9127dee024ac2fe28813a686d032b6aceeda9654 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 26 May 2026 14:00:12 +0800 Subject: [PATCH 1/6] [core][flink] Detect row-id reassign conflicts for index commits --- .../java/org/apache/paimon/CoreOptions.java | 12 ++ .../main/java/org/apache/paimon/Snapshot.java | 1 + .../btree/BTreeGlobalIndexBuilder.java | 9 + .../paimon/operation/FileStoreCommitImpl.java | 41 ++++- .../operation/commit/ConflictDetection.java | 80 +++++++++ .../paimon/operation/FileStoreCommitTest.java | 125 +++++++++++++- .../commit/ConflictDetectionTest.java | 162 ++++++++++++++++++ .../flink/btree/BTreeIndexTopoBuilder.java | 34 +++- .../GenericGlobalIndexBuilder.java | 21 ++- .../globalindex/GenericIndexTopoBuilder.java | 33 +++- 10 files changed, 502 insertions(+), 16 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 6b0ef75ff823..3f05a62a1e3a 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2152,6 +2152,14 @@ public InlineElement getDescription() { + "APPEND snapshot which committed files to fixed bucket, commit will be aborted." + "If the value of this option is -1, committer will not check for its first commit."); + public static final ConfigOption COMMIT_ROW_ID_REASSIGN_LAST_SAFE_SNAPSHOT = + ConfigOptions.key("commit.row-id-reassign.last-safe-snapshot") + .longType() + .noDefaultValue() + .withDescription( + "If set, committer will check if there are row-id reassignment snapshots starting from the " + + "snapshot after this one. If found, commit will be aborted."); + public static final ConfigOption CLUSTERING_COLUMNS = key("clustering.columns") .stringType() @@ -3778,6 +3786,10 @@ public Optional commitStrictModeLastSafeSnapshot() { return options.getOptional(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT); } + public Optional commitRowIdReassignLastSafeSnapshot() { + return options.getOptional(COMMIT_ROW_ID_REASSIGN_LAST_SAFE_SNAPSHOT); + } + public List clusteringColumns() { return clusteringColumns(options.get(CLUSTERING_COLUMNS)); } diff --git a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java index 93f525a7e3fb..d7f0ca2131cf 100644 --- a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java @@ -45,6 +45,7 @@ public class Snapshot implements Serializable { private static final long serialVersionUID = 1L; public static final long FIRST_SNAPSHOT_ID = 1; + public static final String ROW_ID_REASSIGN_PROPERTY = "row-id-reassign"; protected static final int CURRENT_VERSION = 3; diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java index 9f143d6cf666..ee3348843cb0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java @@ -95,6 +95,7 @@ public class BTreeGlobalIndexBuilder implements Serializable { // readRowType is composed by partition fields, indexed field and _ROW_ID field private RowType readRowType; @Nullable private Snapshot snapshot; + @Nullable private Long scanSnapshotId; @Nullable private PartitionPredicate partitionPredicate; @@ -131,6 +132,10 @@ public BTreeGlobalIndexBuilder withSnapshot(Snapshot snapshot) { return this; } + public Optional scanSnapshotId() { + return Optional.ofNullable(scanSnapshotId); + } + public Optional>> scan() { SnapshotReader snapshotReader = table.newSnapshotReader(); if (partitionPredicate != null) { @@ -141,8 +146,10 @@ public Optional>> scan() { ? this.snapshot : snapshotReader.snapshotManager().latestSnapshot(); if (snapshot == null) { + scanSnapshotId = null; return Optional.empty(); } + scanSnapshotId = snapshot.id(); snapshotReader = snapshotReader.withSnapshot(snapshot); Range dataRange = new Range(0, snapshot.nextRowId() - 1); @@ -162,8 +169,10 @@ public Optional>> incrementalScan() { ? this.snapshot : snapshotReader.snapshotManager().latestSnapshot(); if (snapshot == null) { + scanSnapshotId = null; return Optional.empty(); } + scanSnapshotId = snapshot.id(); snapshotReader = snapshotReader.withSnapshot(snapshot); Preconditions.checkArgument(indexField != null, "indexField must be set before scan."); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 3f9fdb9f1c0c..f42ebc412dd0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -218,6 +218,8 @@ public FileStoreCommitImpl( id)) .orElse(null); this.conflictDetection = conflictDetectFactory.create(scanner); + options.commitRowIdReassignLastSafeSnapshot() + .ifPresent(this.conflictDetection::setRowIdReassignCheckFromSnapshot); this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile); } @@ -325,6 +327,9 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { checkAppendFiles = true; allowRollback = true; } + if (conflictDetection.hasRowIdReassignCheckFromSnapshot()) { + checkAppendFiles = true; + } attempts += tryCommit( @@ -1120,7 +1125,8 @@ public boolean replaceManifestList( baseManifestList, deltaManifestList, latest.indexManifest(), - latest.nextRowId()); + latest.nextRowId(), + withoutRowIdReassignProperties(latest.properties())); } public boolean replaceManifestList( @@ -1130,6 +1136,24 @@ public boolean replaceManifestList( Pair deltaManifestList, @Nullable String indexManifest, @Nullable Long nextRowId) { + return replaceManifestList( + latest, + totalRecordCount, + baseManifestList, + deltaManifestList, + indexManifest, + nextRowId, + withoutRowIdReassignProperties(latest.properties())); + } + + public boolean replaceManifestList( + Snapshot latest, + long totalRecordCount, + Pair baseManifestList, + Pair deltaManifestList, + @Nullable String indexManifest, + @Nullable Long nextRowId, + @Nullable Map properties) { Snapshot newSnapshot = new Snapshot( latest.id() + 1, @@ -1151,7 +1175,7 @@ public boolean replaceManifestList( latest.watermark(), latest.statistics(), // if empty properties, just set to null - latest.properties(), + properties, nextRowId); return commitSnapshotImpl(newSnapshot, emptyList()); @@ -1230,12 +1254,23 @@ private boolean compactManifestOnce() { null, latestSnapshot.watermark(), latestSnapshot.statistics(), - latestSnapshot.properties(), + withoutRowIdReassignProperties(latestSnapshot.properties()), latestSnapshot.nextRowId()); return commitSnapshotImpl(newSnapshot, emptyList()); } + private static @Nullable Map withoutRowIdReassignProperties( + @Nullable Map properties) { + if (properties == null || !properties.containsKey(Snapshot.ROW_ID_REASSIGN_PROPERTY)) { + return properties; + } + + Map copied = new HashMap<>(properties); + copied.remove(Snapshot.ROW_ID_REASSIGN_PROPERTY); + return copied.isEmpty() ? null : copied; + } + private boolean commitSnapshotImpl(Snapshot newSnapshot, List deltaStatistics) { try { List statistics = new ArrayList<>(deltaStatistics.size()); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index 493d13d88dee..fc4e1afe77ff 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -95,6 +95,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { private @Nullable PartitionExpire partitionExpire; private @Nullable Long rowIdCheckFromSnapshot = null; + private @Nullable Long rowIdReassignCheckFromSnapshot = null; public ConflictDetection( String tableName, @@ -131,6 +132,14 @@ public boolean hasRowIdCheckFromSnapshot() { return rowIdCheckFromSnapshot != null; } + public void setRowIdReassignCheckFromSnapshot(@Nullable Long rowIdReassignCheckFromSnapshot) { + this.rowIdReassignCheckFromSnapshot = rowIdReassignCheckFromSnapshot; + } + + public boolean hasRowIdReassignCheckFromSnapshot() { + return rowIdReassignCheckFromSnapshot != null; + } + @Nullable public Comparator keyComparator() { return keyComparator; @@ -237,6 +246,11 @@ public Optional checkConflicts( return exception; } + exception = checkRowIdReassignConflicts(latestSnapshot, deltaEntries, deltaIndexEntries); + if (exception.isPresent()) { + return exception; + } + return checkForRowIdFromSnapshot( latestSnapshot, deltaEntries, deltaIndexEntries, rowIdColumnConflictChecker); } @@ -544,6 +558,72 @@ private Optional checkForRowIdFromSnapshot( return Optional.empty(); } + private Optional checkRowIdReassignConflicts( + Snapshot latestSnapshot, + List deltaEntries, + List deltaIndexEntries) { + if (!dataEvolutionEnabled) { + return Optional.empty(); + } + if (rowIdReassignCheckFromSnapshot == null) { + return Optional.empty(); + } + if (latestSnapshot.id() <= rowIdReassignCheckFromSnapshot) { + return Optional.empty(); + } + + List changedPartitions = + changedPartitionsIncludingAllIndexFiles(deltaEntries, deltaIndexEntries); + for (long id = rowIdReassignCheckFromSnapshot + 1; id <= latestSnapshot.id(); id++) { + Snapshot snapshot = snapshotManager.snapshot(id); + if (snapshot.commitKind() != CommitKind.OVERWRITE) { + continue; + } + if (hasRowIdReassignProperty(snapshot.properties())) { + return Optional.of( + new RuntimeException( + String.format( + "Row-id reassignment snapshot %s was committed after the " + + "task planned from snapshot %s. The task must " + + "be retried with the latest row ids.", + id, rowIdReassignCheckFromSnapshot))); + } + if (overwriteChangedTargetPartitions(snapshot, changedPartitions)) { + return Optional.of( + new RuntimeException( + String.format( + "Overwrite snapshot %s changed partitions after the " + + "task planned from snapshot %s. The task must " + + "be retried with the latest row ids.", + id, rowIdReassignCheckFromSnapshot))); + } + } + return Optional.empty(); + } + + private boolean hasRowIdReassignProperty(@Nullable Map properties) { + return properties != null + && Boolean.parseBoolean(properties.get(Snapshot.ROW_ID_REASSIGN_PROPERTY)); + } + + private boolean overwriteChangedTargetPartitions( + Snapshot snapshot, List changedPartitions) { + return !changedPartitions.isEmpty() + && !commitScanner.readIncrementalEntries(snapshot, changedPartitions).isEmpty(); + } + + private List changedPartitionsIncludingAllIndexFiles( + List dataFileChanges, List indexFileChanges) { + Set changedPartitions = new HashSet<>(); + for (SimpleFileEntry file : dataFileChanges) { + changedPartitions.add(file.partition()); + } + for (IndexManifestEntry file : indexFileChanges) { + changedPartitions.add(file.partition()); + } + return new ArrayList<>(changedPartitions); + } + Optional checkRowIdExistence( List baseEntries, List deltaEntries, diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 71eb081de89f..eab2a9fa2ab0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -57,6 +57,7 @@ import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FailingFileIO; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TraceableFileIO; @@ -1008,6 +1009,100 @@ public void testCommitManifestWithProperties() throws Exception { } } + @Test + public void testReplaceManifestListWithRowIdReassignProperty() throws Exception { + TestFileStore store = createStore(false); + + List keyValues = generateDataList(1); + BinaryRow partition = gen.getPartition(keyValues.get(0)); + Snapshot latest = store.commitData(keyValues, s -> partition, kv -> 0).get(0); + + Map reassignProperties = new HashMap<>(); + reassignProperties.put("keep", "v1"); + reassignProperties.put(Snapshot.ROW_ID_REASSIGN_PROPERTY, "true"); + try (FileStoreCommitImpl commit = store.newCommit()) { + assertThat( + commit.replaceManifestList( + latest, + latest.totalRecordCount(), + baseManifestList(latest), + deltaManifestList(latest), + latest.indexManifest(), + latest.nextRowId(), + reassignProperties)) + .isTrue(); + } + + Snapshot reassignSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + assertThat(reassignSnapshot.properties()).isEqualTo(reassignProperties); + + try (FileStoreCommitImpl commit = store.newCommit()) { + assertThat( + commit.replaceManifestList( + reassignSnapshot, + reassignSnapshot.totalRecordCount(), + baseManifestList(reassignSnapshot), + deltaManifestList(reassignSnapshot), + reassignSnapshot.indexManifest(), + reassignSnapshot.nextRowId())) + .isTrue(); + } + + Snapshot normalSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + assertThat(normalSnapshot.properties()) + .containsEntry("keep", "v1") + .doesNotContainKey(Snapshot.ROW_ID_REASSIGN_PROPERTY); + } + + @Test + public void testRowIdReassignConflictFromOptions() throws Exception { + TestFileStore store = createStore(false); + + List keyValues = generateDataList(1); + BinaryRow partition = gen.getPartition(keyValues.get(0)); + Snapshot latest = store.commitData(keyValues, s -> partition, kv -> 0).get(0); + + Map reassignProperties = new HashMap<>(); + reassignProperties.put(Snapshot.ROW_ID_REASSIGN_PROPERTY, "true"); + try (FileStoreCommitImpl commit = store.newCommit()) { + assertThat( + commit.replaceManifestList( + latest, + latest.totalRecordCount(), + baseManifestList(latest), + deltaManifestList(latest), + latest.indexManifest(), + latest.nextRowId(), + reassignProperties)) + .isTrue(); + } + + AtomicReference committableRef = new AtomicReference<>(); + store.commitDataImpl( + generateDataList(1), + gen::getPartition, + kv -> 0, + false, + null, + null, + Collections.emptyList(), + (commit, committable) -> committableRef.set(committable)); + + Map dynamicOptions = new HashMap<>(store.options().toMap()); + dynamicOptions.put(CoreOptions.COMMIT_ROW_ID_REASSIGN_LAST_SAFE_SNAPSHOT.key(), "1"); + try (FileStoreCommitImpl commit = + newCommitWithSnapshotCommit( + store, + "row-id-reassign-check", + new RenamingSnapshotCommit(store.snapshotManager(), Lock.empty()), + new CoreOptions(dynamicOptions), + true)) { + assertThatThrownBy(() -> commit.commit(checkNotNull(committableRef.get()), false)) + .hasMessageContaining("Row-id reassignment snapshot 2") + .hasMessageContaining("task planned from snapshot 1"); + } + } + @Test public void testCommitTwiceWithDifferentKind() throws Exception { TestFileStore store = createStore(false); @@ -1082,6 +1177,20 @@ public void testCommitRetryAfterFalseSuccessDoesNotCleanManifest() throws Except private FileStoreCommitImpl newCommitWithSnapshotCommit( TestFileStore store, String commitUser, SnapshotCommit snapshotCommit) { + return newCommitWithSnapshotCommit( + store, + commitUser, + snapshotCommit, + store.options(), + store.options().dataEvolutionEnabled()); + } + + private FileStoreCommitImpl newCommitWithSnapshotCommit( + TestFileStore store, + String commitUser, + SnapshotCommit snapshotCommit, + CoreOptions options, + boolean dataEvolutionEnabled) { String tableName = store.options().path().getName(); return new FileStoreCommitImpl( snapshotCommit, @@ -1090,7 +1199,7 @@ private FileStoreCommitImpl newCommitWithSnapshotCommit( tableName, commitUser, store.partitionType(), - store.options(), + options, store.pathFactory(), store.snapshotManager(), store.manifestFileFactory(), @@ -1109,9 +1218,9 @@ private FileStoreCommitImpl newCommitWithSnapshotCommit( store.pathFactory(), store.newKeyComparator(), store.bucketMode(), - store.options().deletionVectorsEnabled(), - store.options().dataEvolutionEnabled(), - store.options().pkClusteringOverride(), + options.deletionVectorsEnabled(), + dataEvolutionEnabled, + options.pkClusteringOverride(), store.newIndexFileHandler(), store.snapshotManager(), scanner), @@ -1153,6 +1262,14 @@ private TestFileStore createStore(boolean failing, Map options) return createStore(failing, 1, CoreOptions.ChangelogProducer.NONE, options); } + private Pair baseManifestList(Snapshot snapshot) { + return Pair.of(snapshot.baseManifestList(), snapshot.baseManifestListSize()); + } + + private Pair deltaManifestList(Snapshot snapshot) { + return Pair.of(snapshot.deltaManifestList(), snapshot.deltaManifestListSize()); + } + private TestFileStore createStore(boolean failing) throws Exception { return createStore(failing, 1); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index 1c36b9e09ee7..e243fe3b8ecd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -18,15 +18,20 @@ package org.apache.paimon.operation.commit; +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.index.DeletionVectorMeta; +import org.apache.paimon.index.GlobalIndexMeta; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.SimpleFileEntry; import org.apache.paimon.manifest.SimpleFileEntryWithDV; import org.apache.paimon.table.BucketMode; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.SnapshotManager; import org.junit.jupiter.api.Test; @@ -36,8 +41,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; @@ -47,6 +54,8 @@ import static org.apache.paimon.operation.commit.ConflictDetection.buildBaseEntriesWithDV; import static org.apache.paimon.operation.commit.ConflictDetection.buildDeltaEntriesWithDV; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; class ConflictDetectionTest { @@ -330,6 +339,15 @@ private IndexManifestEntry createDvIndexEntry( DELETION_VECTORS_INDEX, fileName, 11, dvRanges.size(), dvRanges, null)); } + private IndexManifestEntry createIndexEntry( + String fileName, FileKind kind, BinaryRow partition) { + return new IndexManifestEntry( + kind, + partition, + 0, + new IndexFileMeta("btree", fileName, 11, 1, (GlobalIndexMeta) null, null)); + } + private void assertConflict( List baseEntries, List deltaEntries) { ArrayList simpleFileEntryWithDVS = new ArrayList<>(baseEntries); @@ -489,7 +507,122 @@ private SimpleFileEntry createFileEntryWithRowId( firstRowId); } + @Test + void testDetectsRowIdReassignSnapshotConflict() { + SnapshotManager snapshotManager = mock(SnapshotManager.class); + Map reassignProperties = new HashMap<>(); + reassignProperties.put(Snapshot.ROW_ID_REASSIGN_PROPERTY, "true"); + when(snapshotManager.snapshot(2L)) + .thenReturn(snapshot(2, Snapshot.CommitKind.OVERWRITE, reassignProperties)); + + ConflictDetection detection = createConflictDetection(snapshotManager); + detection.setRowIdReassignCheckFromSnapshot(1L); + + Optional exception = + detection.checkConflicts( + snapshot(3, null), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + null, + Snapshot.CommitKind.COMPACT); + + assertThat(exception).isPresent(); + assertThat(exception.get()) + .hasMessageContaining("Row-id reassignment snapshot 2") + .hasMessageContaining("task planned from snapshot 1"); + } + + @Test + void testIgnoresRowIdReassignPropertyOnNonOverwriteSnapshot() { + SnapshotManager snapshotManager = mock(SnapshotManager.class); + Map reassignProperties = new HashMap<>(); + reassignProperties.put(Snapshot.ROW_ID_REASSIGN_PROPERTY, "true"); + when(snapshotManager.snapshot(2L)) + .thenReturn(snapshot(2, Snapshot.CommitKind.APPEND, reassignProperties)); + + ConflictDetection detection = createConflictDetection(snapshotManager); + detection.setRowIdReassignCheckFromSnapshot(1L); + + Optional exception = + detection.checkConflicts( + snapshot(2, null), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + null, + Snapshot.CommitKind.COMPACT); + + assertThat(exception).isNotPresent(); + } + + @Test + void testDetectsOverlappedOverwriteSnapshotForIndexCommit() { + SnapshotManager snapshotManager = mock(SnapshotManager.class); + CommitScanner commitScanner = mock(CommitScanner.class); + Snapshot overwriteSnapshot = snapshot(2, Snapshot.CommitKind.OVERWRITE, null); + when(snapshotManager.snapshot(2L)).thenReturn(overwriteSnapshot); + + BinaryRow partition = BinaryRow.singleColumn(1); + when(commitScanner.readIncrementalEntries( + overwriteSnapshot, Collections.singletonList(partition))) + .thenReturn(Collections.singletonList(mock(ManifestEntry.class))); + + ConflictDetection detection = createConflictDetection(snapshotManager, commitScanner); + detection.setRowIdReassignCheckFromSnapshot(1L); + + Optional exception = + detection.checkConflicts( + snapshot(2, null), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(createIndexEntry("idx", ADD, partition)), + null, + Snapshot.CommitKind.COMPACT); + + assertThat(exception).isPresent(); + assertThat(exception.get()) + .hasMessageContaining("Overwrite snapshot 2") + .hasMessageContaining("task planned from snapshot 1"); + } + + @Test + void testIgnoresNonOverlappedOverwriteSnapshotForIndexCommit() { + SnapshotManager snapshotManager = mock(SnapshotManager.class); + CommitScanner commitScanner = mock(CommitScanner.class); + Snapshot overwriteSnapshot = snapshot(2, Snapshot.CommitKind.OVERWRITE, null); + when(snapshotManager.snapshot(2L)).thenReturn(overwriteSnapshot); + + BinaryRow partition = BinaryRow.singleColumn(1); + when(commitScanner.readIncrementalEntries( + overwriteSnapshot, Collections.singletonList(partition))) + .thenReturn(Collections.emptyList()); + + ConflictDetection detection = createConflictDetection(snapshotManager, commitScanner); + detection.setRowIdReassignCheckFromSnapshot(1L); + + Optional exception = + detection.checkConflicts( + snapshot(2, null), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(createIndexEntry("idx", ADD, partition)), + null, + Snapshot.CommitKind.COMPACT); + + assertThat(exception).isNotPresent(); + } + private ConflictDetection createConflictDetection() { + return createConflictDetection(null); + } + + private ConflictDetection createConflictDetection(@Nullable SnapshotManager snapshotManager) { + return createConflictDetection(snapshotManager, null); + } + + private ConflictDetection createConflictDetection( + @Nullable SnapshotManager snapshotManager, @Nullable CommitScanner commitScanner) { return new ConflictDetection( "test-table", "test-user", @@ -501,7 +634,36 @@ private ConflictDetection createConflictDetection() { true, false, null, + snapshotManager, + commitScanner); + } + + private Snapshot snapshot(long id, @Nullable Map properties) { + return snapshot(id, Snapshot.CommitKind.APPEND, properties); + } + + private Snapshot snapshot( + long id, Snapshot.CommitKind commitKind, @Nullable Map properties) { + return new Snapshot( + id, + 0, + null, + null, + null, + null, + null, + null, + null, + "commit-user", + id, + commitKind, + id, + 0, + 0, + null, + null, null, + properties, null); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java index 9542eea4a435..896e87a10ff7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java @@ -95,6 +95,7 @@ public static boolean buildIndex( Options userOptions) throws Exception { List> allStreams = new ArrayList<>(); + Long rowIdReassignCheckFromSnapshot = null; for (String indexColumn : indexColumns) { BTreeGlobalIndexBuilder indexBuilder = indexBuilderSupplier.get().withIndexField(indexColumn); @@ -107,6 +108,12 @@ public static boolean buildIndex( if (!indexRangeAndSplits.isPresent()) { continue; } + if (indexBuilder.scanSnapshotId().isPresent()) { + rowIdReassignCheckFromSnapshot = + minSnapshot( + rowIdReassignCheckFromSnapshot, + indexBuilder.scanSnapshotId().get()); + } Pair> scanResult = indexRangeAndSplits.get(); List splits = splitByContiguousRowRange(scanResult.getRight()); @@ -194,13 +201,17 @@ public static boolean buildIndex( @SuppressWarnings("unchecked") DataStream[] rest = allStreams.subList(1, allStreams.size()).toArray(new DataStream[0]); - commit(table, allStreams.get(0).union(rest)); + commit(table, allStreams.get(0).union(rest), rowIdReassignCheckFromSnapshot); return true; } return false; } + private static Long minSnapshot(Long left, long right) { + return left == null ? right : Math.min(left, right); + } + public static void buildIndexAndExecute( StreamExecutionEnvironment env, FileStoreTable table, @@ -312,7 +323,11 @@ private static RowType withBuildTaskId(RowType readType, String buildTaskIdField return new RowType(readType.isNullable(), fields); } - private static void commit(FileStoreTable table, DataStream written) { + private static void commit( + FileStoreTable table, + DataStream written, + Long rowIdReassignCheckFromSnapshot) { + FileStoreTable commitTable = withRowIdReassignCheck(table, rowIdReassignCheckFromSnapshot); OneInputStreamOperatorFactory committerOperator = new CommitterOperatorFactory<>( false, @@ -320,7 +335,9 @@ private static void commit(FileStoreTable table, DataStream written "BTreeIndexCommitter-" + UUID.randomUUID(), context -> new StoreCommitter( - table, table.newCommit(context.commitUser()), context), + commitTable, + commitTable.newCommit(context.commitUser()), + context), new NoopCommittableStateManager()); written.transform("COMMIT OPERATOR", new CommittableTypeInfo(), committerOperator) @@ -328,6 +345,17 @@ private static void commit(FileStoreTable table, DataStream written .setMaxParallelism(1); } + private static FileStoreTable withRowIdReassignCheck( + FileStoreTable table, Long rowIdReassignCheckFromSnapshot) { + if (rowIdReassignCheckFromSnapshot == null) { + return table; + } + return table.copy( + Collections.singletonMap( + CoreOptions.COMMIT_ROW_ID_REASSIGN_LAST_SAFE_SNAPSHOT.key(), + String.valueOf(rowIdReassignCheckFromSnapshot))); + } + /** Operator to read data from splits. */ private static class ReadDataOperator extends org.apache.flink.table.runtime.operators.TableStreamOperator diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java index 48ea935f5b62..da4644524c1c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.globalindex; +import org.apache.paimon.Snapshot; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.partition.PartitionPredicate; @@ -28,6 +29,7 @@ import java.io.Serializable; import java.util.Collections; import java.util.List; +import java.util.Optional; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -39,6 +41,7 @@ public class GenericGlobalIndexBuilder implements Serializable { protected final FileStoreTable table; @Nullable protected PartitionPredicate partitionPredicate; + @Nullable protected Long scanSnapshotId; public GenericGlobalIndexBuilder(FileStoreTable table) { this.table = table; @@ -53,6 +56,10 @@ public FileStoreTable table() { return table; } + public Optional scanSnapshotId() { + return Optional.ofNullable(scanSnapshotId); + } + /** * Scans manifest entries to determine which files need to be indexed. * @@ -72,7 +79,19 @@ public List scan() { + "deleted rows to be indexed.", table.name()); - return table.store().newScan().withPartitionFilter(partitionPredicate).plan().files(); + Snapshot snapshot = table.snapshotManager().latestSnapshot(); + if (snapshot == null) { + scanSnapshotId = null; + return Collections.emptyList(); + } + scanSnapshotId = snapshot.id(); + + return table.store() + .newScan() + .withSnapshot(snapshot) + .withPartitionFilter(partitionPredicate) + .plan() + .files(); } /** Returns old index file entries that should be deleted after new indexes are built. */ diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java index e37970723c9c..df3016e75d46 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java @@ -179,6 +179,10 @@ public static boolean buildIndex( List entries = indexBuilder.scan(); List deletedIndexEntries = indexBuilder.deletedIndexEntries(); + Long rowIdReassignCheckFromSnapshot = + indexBuilder.scanSnapshotId().isPresent() + ? indexBuilder.scanSnapshotId().get() + : null; return buildTopology( env, @@ -188,7 +192,8 @@ public static boolean buildIndex( userOptions, entries, deletedIndexEntries, - maxIndexedRowId); + maxIndexedRowId, + rowIdReassignCheckFromSnapshot); } /** @@ -208,7 +213,8 @@ private static boolean buildTopology( Options userOptions, List entries, List deletedIndexEntries, - long maxIndexedRowId) + long maxIndexedRowId, + Long rowIdReassignCheckFromSnapshot) throws Exception { long totalRowCount = entries.stream().mapToLong(e -> e.file().rowCount()).sum(); LOG.info( @@ -294,7 +300,7 @@ private static boolean buildTopology( built = built.union(deletes); } - commit(table, indexType, built); + commit(table, indexType, built, rowIdReassignCheckFromSnapshot); return true; } @@ -506,7 +512,11 @@ private static List createDeleteCommittables( } private static void commit( - FileStoreTable table, String indexType, DataStream written) { + FileStoreTable table, + String indexType, + DataStream written, + Long rowIdReassignCheckFromSnapshot) { + FileStoreTable commitTable = withRowIdReassignCheck(table, rowIdReassignCheckFromSnapshot); OneInputStreamOperatorFactory committerOperator = new CommitterOperatorFactory<>( false, @@ -514,7 +524,9 @@ private static void commit( "GenericIndexCommitter-" + indexType + "-" + UUID.randomUUID(), context -> new StoreCommitter( - table, table.newCommit(context.commitUser()), context), + commitTable, + commitTable.newCommit(context.commitUser()), + context), new NoopCommittableStateManager()); written.transform("COMMIT OPERATOR", new CommittableTypeInfo(), committerOperator) @@ -522,6 +534,17 @@ private static void commit( .setMaxParallelism(1); } + private static FileStoreTable withRowIdReassignCheck( + FileStoreTable table, Long rowIdReassignCheckFromSnapshot) { + if (rowIdReassignCheckFromSnapshot == null) { + return table; + } + return table.copy( + Collections.singletonMap( + CoreOptions.COMMIT_ROW_ID_REASSIGN_LAST_SAFE_SNAPSHOT.key(), + String.valueOf(rowIdReassignCheckFromSnapshot))); + } + /** Serializable descriptor for one shard's work. Each shard has its own DataSplit and Range. */ static class ShardTask implements Serializable { private static final long serialVersionUID = 1L; From 396d545c04b51b0a59804cf7f12994b08a4e072b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 26 May 2026 16:36:04 +0800 Subject: [PATCH 2/6] [docs] Document row-id reassign commit option --- docs/generated/core_configuration.html | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index 7bc02cf87bc6..f11f86396d28 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -320,6 +320,12 @@ Long If set, committer will check if there are other commit user's snapshot starting from the snapshot after this one. If found a COMPACT / OVERWRITE snapshot, or found a APPEND snapshot which committed files to fixed bucket, commit will be aborted.If the value of this option is -1, committer will not check for its first commit. + +
commit.row-id-reassign.last-safe-snapshot
+ (none) + Long + If set, committer will check if there are row-id reassignment snapshots starting from the snapshot after this one. If found, commit will be aborted. +
commit.timeout
(none) From f5b0a8054cab25bd0a9b444b431e3e88196966c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 26 May 2026 17:36:18 +0800 Subject: [PATCH 3/6] [core][flink] Address row-id reassign review comments --- .../paimon/operation/FileStoreCommitTest.java | 49 +++++++++++++++++++ .../flink/btree/BTreeIndexTopoBuilder.java | 16 ++---- .../globalindex/GenericIndexTopoBuilder.java | 20 ++------ .../globalindex/GlobalIndexCommitUtils.java | 43 ++++++++++++++++ 4 files changed, 100 insertions(+), 28 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GlobalIndexCommitUtils.java diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index eab2a9fa2ab0..961bcb0e2338 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -1054,6 +1054,55 @@ public void testReplaceManifestListWithRowIdReassignProperty() throws Exception .doesNotContainKey(Snapshot.ROW_ID_REASSIGN_PROPERTY); } + @Test + public void testCompactManifestWithRowIdReassignProperty() throws Exception { + TestFileStore store = createStore(false); + + List keyValues = generateDataList(1); + BinaryRow partition = gen.getPartition(keyValues.get(0)); + store.commitData(keyValues, s -> partition, kv -> 0); + store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()); + Snapshot latest = + store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()) + .get(0); + + long deleteNum = + store.manifestListFactory().create().readDataManifests(latest).stream() + .mapToLong(ManifestFileMeta::numDeletedFiles) + .sum(); + assertThat(deleteNum).isGreaterThan(0); + + Map reassignProperties = new HashMap<>(); + reassignProperties.put("keep", "v1"); + reassignProperties.put(Snapshot.ROW_ID_REASSIGN_PROPERTY, "true"); + try (FileStoreCommitImpl commit = store.newCommit()) { + assertThat( + commit.replaceManifestList( + latest, + latest.totalRecordCount(), + baseManifestList(latest), + deltaManifestList(latest), + latest.indexManifest(), + latest.nextRowId(), + reassignProperties)) + .isTrue(); + } + + Snapshot reassignSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + assertThat(reassignSnapshot.properties()).isEqualTo(reassignProperties); + + try (FileStoreCommit commit = store.newCommit()) { + commit.compactManifest(); + } + + Snapshot normalSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + assertThat(normalSnapshot.id()).isGreaterThan(reassignSnapshot.id()); + assertThat(normalSnapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT); + assertThat(normalSnapshot.properties()) + .containsEntry("keep", "v1") + .doesNotContainKey(Snapshot.ROW_ID_REASSIGN_PROPERTY); + } + @Test public void testRowIdReassignConflictFromOptions() throws Exception { TestFileStore store = createStore(false); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java index 896e87a10ff7..4a0655ac6590 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java @@ -27,6 +27,7 @@ import org.apache.paimon.flink.FlinkRowData; import org.apache.paimon.flink.FlinkRowWrapper; import org.apache.paimon.flink.LogicalTypeConversion; +import org.apache.paimon.flink.globalindex.GlobalIndexCommitUtils; import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.flink.sink.CommittableTypeInfo; import org.apache.paimon.flink.sink.CommitterOperatorFactory; @@ -327,7 +328,9 @@ private static void commit( FileStoreTable table, DataStream written, Long rowIdReassignCheckFromSnapshot) { - FileStoreTable commitTable = withRowIdReassignCheck(table, rowIdReassignCheckFromSnapshot); + FileStoreTable commitTable = + GlobalIndexCommitUtils.withRowIdReassignCheck( + table, rowIdReassignCheckFromSnapshot); OneInputStreamOperatorFactory committerOperator = new CommitterOperatorFactory<>( false, @@ -345,17 +348,6 @@ private static void commit( .setMaxParallelism(1); } - private static FileStoreTable withRowIdReassignCheck( - FileStoreTable table, Long rowIdReassignCheckFromSnapshot) { - if (rowIdReassignCheckFromSnapshot == null) { - return table; - } - return table.copy( - Collections.singletonMap( - CoreOptions.COMMIT_ROW_ID_REASSIGN_LAST_SAFE_SNAPSHOT.key(), - String.valueOf(rowIdReassignCheckFromSnapshot))); - } - /** Operator to read data from splits. */ private static class ReadDataOperator extends org.apache.flink.table.runtime.operators.TableStreamOperator diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java index df3016e75d46..2f72a92db627 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java @@ -179,10 +179,7 @@ public static boolean buildIndex( List entries = indexBuilder.scan(); List deletedIndexEntries = indexBuilder.deletedIndexEntries(); - Long rowIdReassignCheckFromSnapshot = - indexBuilder.scanSnapshotId().isPresent() - ? indexBuilder.scanSnapshotId().get() - : null; + Long rowIdReassignCheckFromSnapshot = indexBuilder.scanSnapshotId().orElse(null); return buildTopology( env, @@ -516,7 +513,9 @@ private static void commit( String indexType, DataStream written, Long rowIdReassignCheckFromSnapshot) { - FileStoreTable commitTable = withRowIdReassignCheck(table, rowIdReassignCheckFromSnapshot); + FileStoreTable commitTable = + GlobalIndexCommitUtils.withRowIdReassignCheck( + table, rowIdReassignCheckFromSnapshot); OneInputStreamOperatorFactory committerOperator = new CommitterOperatorFactory<>( false, @@ -534,17 +533,6 @@ private static void commit( .setMaxParallelism(1); } - private static FileStoreTable withRowIdReassignCheck( - FileStoreTable table, Long rowIdReassignCheckFromSnapshot) { - if (rowIdReassignCheckFromSnapshot == null) { - return table; - } - return table.copy( - Collections.singletonMap( - CoreOptions.COMMIT_ROW_ID_REASSIGN_LAST_SAFE_SNAPSHOT.key(), - String.valueOf(rowIdReassignCheckFromSnapshot))); - } - /** Serializable descriptor for one shard's work. Each shard has its own DataSplit and Range. */ static class ShardTask implements Serializable { private static final long serialVersionUID = 1L; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GlobalIndexCommitUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GlobalIndexCommitUtils.java new file mode 100644 index 000000000000..4365887b9740 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GlobalIndexCommitUtils.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.globalindex; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.table.FileStoreTable; + +import javax.annotation.Nullable; + +import java.util.Collections; + +/** Utilities for global index commit topology. */ +public final class GlobalIndexCommitUtils { + + private GlobalIndexCommitUtils() {} + + public static FileStoreTable withRowIdReassignCheck( + FileStoreTable table, @Nullable Long rowIdReassignCheckFromSnapshot) { + if (rowIdReassignCheckFromSnapshot == null) { + return table; + } + return table.copy( + Collections.singletonMap( + CoreOptions.COMMIT_ROW_ID_REASSIGN_LAST_SAFE_SNAPSHOT.key(), + String.valueOf(rowIdReassignCheckFromSnapshot))); + } +} From 3cf6c3300768dfd13e46f5d9e550a81b2e945c7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 26 May 2026 17:59:02 +0800 Subject: [PATCH 4/6] [core][flink] Rename row-id overwrite conflict barrier --- docs/generated/core_configuration.html | 4 +- .../java/org/apache/paimon/CoreOptions.java | 14 +++-- .../main/java/org/apache/paimon/Snapshot.java | 2 +- .../DataEvolutionRowIdReassigner.java | 4 +- .../paimon/operation/FileStoreCommitImpl.java | 19 +++--- .../operation/commit/ConflictDetection.java | 35 ++++++----- .../DataEvolutionRowIdReassignerTest.java | 2 + .../paimon/operation/FileStoreCommitTest.java | 61 ++++++++++--------- .../commit/ConflictDetectionTest.java | 26 ++++---- .../flink/btree/BTreeIndexTopoBuilder.java | 14 ++--- .../globalindex/GenericIndexTopoBuilder.java | 14 ++--- .../globalindex/GlobalIndexCommitUtils.java | 10 +-- 12 files changed, 108 insertions(+), 97 deletions(-) diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index f11f86396d28..03aabf02c239 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -321,10 +321,10 @@ If set, committer will check if there are other commit user's snapshot starting from the snapshot after this one. If found a COMPACT / OVERWRITE snapshot, or found a APPEND snapshot which committed files to fixed bucket, commit will be aborted.If the value of this option is -1, committer will not check for its first commit. -
commit.row-id-reassign.last-safe-snapshot
+
commit.row-id-overwrite-conflict.last-safe-snapshot
(none) Long - If set, committer will check if there are row-id reassignment snapshots starting from the snapshot after this one. If found, commit will be aborted. + If set, committer will check OVERWRITE snapshots starting from the snapshot after this one. If a row-id overwrite barrier snapshot or an ordinary overwrite snapshot which changed target partitions is found, commit will be aborted.
commit.timeout
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 3f05a62a1e3a..e3685e8d9404 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2152,13 +2152,15 @@ public InlineElement getDescription() { + "APPEND snapshot which committed files to fixed bucket, commit will be aborted." + "If the value of this option is -1, committer will not check for its first commit."); - public static final ConfigOption COMMIT_ROW_ID_REASSIGN_LAST_SAFE_SNAPSHOT = - ConfigOptions.key("commit.row-id-reassign.last-safe-snapshot") + public static final ConfigOption COMMIT_ROW_ID_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT = + ConfigOptions.key("commit.row-id-overwrite-conflict.last-safe-snapshot") .longType() .noDefaultValue() .withDescription( - "If set, committer will check if there are row-id reassignment snapshots starting from the " - + "snapshot after this one. If found, commit will be aborted."); + "If set, committer will check OVERWRITE snapshots starting from the " + + "snapshot after this one. If a row-id overwrite barrier snapshot " + + "or an ordinary overwrite snapshot which changed target partitions " + + "is found, commit will be aborted."); public static final ConfigOption CLUSTERING_COLUMNS = key("clustering.columns") @@ -3786,8 +3788,8 @@ public Optional commitStrictModeLastSafeSnapshot() { return options.getOptional(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT); } - public Optional commitRowIdReassignLastSafeSnapshot() { - return options.getOptional(COMMIT_ROW_ID_REASSIGN_LAST_SAFE_SNAPSHOT); + public Optional commitRowIdOverwriteConflictLastSafeSnapshot() { + return options.getOptional(COMMIT_ROW_ID_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT); } public List clusteringColumns() { diff --git a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java index d7f0ca2131cf..d661847662c8 100644 --- a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java @@ -45,7 +45,7 @@ public class Snapshot implements Serializable { private static final long serialVersionUID = 1L; public static final long FIRST_SNAPSHOT_ID = 1; - public static final String ROW_ID_REASSIGN_PROPERTY = "row-id-reassign"; + public static final String ROW_ID_OVERWRITE_BARRIER_PROPERTY = "row-id-overwrite-barrier"; protected static final int CURRENT_VERSION = 3; diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java index 1faa4cf66ab2..0c5c80555855 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java @@ -149,7 +149,9 @@ public Result reassign(String commitUser) { baseManifestList, deltaManifestList, rewrittenIndexManifest.indexManifest, - assignment.nextRowId); + assignment.nextRowId, + Collections.singletonMap( + Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY, "true")); if (!success) { throw new RuntimeException( "Failed to reassign row IDs because a newer snapshot has been committed."); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index f42ebc412dd0..880829e03a19 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -218,8 +218,8 @@ public FileStoreCommitImpl( id)) .orElse(null); this.conflictDetection = conflictDetectFactory.create(scanner); - options.commitRowIdReassignLastSafeSnapshot() - .ifPresent(this.conflictDetection::setRowIdReassignCheckFromSnapshot); + options.commitRowIdOverwriteConflictLastSafeSnapshot() + .ifPresent(this.conflictDetection::setRowIdOverwriteConflictCheckFromSnapshot); this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile); } @@ -327,7 +327,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { checkAppendFiles = true; allowRollback = true; } - if (conflictDetection.hasRowIdReassignCheckFromSnapshot()) { + if (conflictDetection.hasRowIdOverwriteConflictCheckFromSnapshot()) { checkAppendFiles = true; } @@ -1126,7 +1126,7 @@ public boolean replaceManifestList( deltaManifestList, latest.indexManifest(), latest.nextRowId(), - withoutRowIdReassignProperties(latest.properties())); + withoutRowIdOverwriteBarrierProperties(latest.properties())); } public boolean replaceManifestList( @@ -1143,7 +1143,7 @@ public boolean replaceManifestList( deltaManifestList, indexManifest, nextRowId, - withoutRowIdReassignProperties(latest.properties())); + withoutRowIdOverwriteBarrierProperties(latest.properties())); } public boolean replaceManifestList( @@ -1254,20 +1254,21 @@ private boolean compactManifestOnce() { null, latestSnapshot.watermark(), latestSnapshot.statistics(), - withoutRowIdReassignProperties(latestSnapshot.properties()), + withoutRowIdOverwriteBarrierProperties(latestSnapshot.properties()), latestSnapshot.nextRowId()); return commitSnapshotImpl(newSnapshot, emptyList()); } - private static @Nullable Map withoutRowIdReassignProperties( + private static @Nullable Map withoutRowIdOverwriteBarrierProperties( @Nullable Map properties) { - if (properties == null || !properties.containsKey(Snapshot.ROW_ID_REASSIGN_PROPERTY)) { + if (properties == null + || !properties.containsKey(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY)) { return properties; } Map copied = new HashMap<>(properties); - copied.remove(Snapshot.ROW_ID_REASSIGN_PROPERTY); + copied.remove(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY); return copied.isEmpty() ? null : copied; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index fc4e1afe77ff..51ed37ac4fa9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -95,7 +95,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { private @Nullable PartitionExpire partitionExpire; private @Nullable Long rowIdCheckFromSnapshot = null; - private @Nullable Long rowIdReassignCheckFromSnapshot = null; + private @Nullable Long rowIdOverwriteConflictCheckFromSnapshot = null; public ConflictDetection( String tableName, @@ -132,12 +132,13 @@ public boolean hasRowIdCheckFromSnapshot() { return rowIdCheckFromSnapshot != null; } - public void setRowIdReassignCheckFromSnapshot(@Nullable Long rowIdReassignCheckFromSnapshot) { - this.rowIdReassignCheckFromSnapshot = rowIdReassignCheckFromSnapshot; + public void setRowIdOverwriteConflictCheckFromSnapshot( + @Nullable Long rowIdOverwriteConflictCheckFromSnapshot) { + this.rowIdOverwriteConflictCheckFromSnapshot = rowIdOverwriteConflictCheckFromSnapshot; } - public boolean hasRowIdReassignCheckFromSnapshot() { - return rowIdReassignCheckFromSnapshot != null; + public boolean hasRowIdOverwriteConflictCheckFromSnapshot() { + return rowIdOverwriteConflictCheckFromSnapshot != null; } @Nullable @@ -246,7 +247,7 @@ public Optional checkConflicts( return exception; } - exception = checkRowIdReassignConflicts(latestSnapshot, deltaEntries, deltaIndexEntries); + exception = checkRowIdOverwriteConflicts(latestSnapshot, deltaEntries, deltaIndexEntries); if (exception.isPresent()) { return exception; } @@ -558,35 +559,37 @@ private Optional checkForRowIdFromSnapshot( return Optional.empty(); } - private Optional checkRowIdReassignConflicts( + private Optional checkRowIdOverwriteConflicts( Snapshot latestSnapshot, List deltaEntries, List deltaIndexEntries) { if (!dataEvolutionEnabled) { return Optional.empty(); } - if (rowIdReassignCheckFromSnapshot == null) { + if (rowIdOverwriteConflictCheckFromSnapshot == null) { return Optional.empty(); } - if (latestSnapshot.id() <= rowIdReassignCheckFromSnapshot) { + if (latestSnapshot.id() <= rowIdOverwriteConflictCheckFromSnapshot) { return Optional.empty(); } List changedPartitions = changedPartitionsIncludingAllIndexFiles(deltaEntries, deltaIndexEntries); - for (long id = rowIdReassignCheckFromSnapshot + 1; id <= latestSnapshot.id(); id++) { + for (long id = rowIdOverwriteConflictCheckFromSnapshot + 1; + id <= latestSnapshot.id(); + id++) { Snapshot snapshot = snapshotManager.snapshot(id); if (snapshot.commitKind() != CommitKind.OVERWRITE) { continue; } - if (hasRowIdReassignProperty(snapshot.properties())) { + if (hasRowIdOverwriteBarrierProperty(snapshot.properties())) { return Optional.of( new RuntimeException( String.format( - "Row-id reassignment snapshot %s was committed after the " + "Row-id overwrite barrier snapshot %s was committed after the " + "task planned from snapshot %s. The task must " + "be retried with the latest row ids.", - id, rowIdReassignCheckFromSnapshot))); + id, rowIdOverwriteConflictCheckFromSnapshot))); } if (overwriteChangedTargetPartitions(snapshot, changedPartitions)) { return Optional.of( @@ -595,15 +598,15 @@ private Optional checkRowIdReassignConflicts( "Overwrite snapshot %s changed partitions after the " + "task planned from snapshot %s. The task must " + "be retried with the latest row ids.", - id, rowIdReassignCheckFromSnapshot))); + id, rowIdOverwriteConflictCheckFromSnapshot))); } } return Optional.empty(); } - private boolean hasRowIdReassignProperty(@Nullable Map properties) { + private boolean hasRowIdOverwriteBarrierProperty(@Nullable Map properties) { return properties != null - && Boolean.parseBoolean(properties.get(Snapshot.ROW_ID_REASSIGN_PROPERTY)); + && Boolean.parseBoolean(properties.get(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY)); } private boolean overwriteChangedTargetPartitions( diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java index 2cacf4722e1f..c75af66c77f3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java @@ -125,6 +125,8 @@ public void testReassignRowIdsByPartition() throws Exception { assertThat(result.fileCount).isEqualTo(5L); assertThat(result.rowCount).isEqualTo(5L); assertThat(result.indexFileCount).isEqualTo(0L); + assertThat(table.snapshotManager().latestSnapshot().properties()) + .containsEntry(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY, "true"); Map> rowIdsByPartition = rowIdsByPartition(table); assertThat(rowIdsByPartition).hasSize(2); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 961bcb0e2338..ac0d08ed0297 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -1010,16 +1010,16 @@ public void testCommitManifestWithProperties() throws Exception { } @Test - public void testReplaceManifestListWithRowIdReassignProperty() throws Exception { + public void testReplaceManifestListWithRowIdOverwriteBarrierProperty() throws Exception { TestFileStore store = createStore(false); List keyValues = generateDataList(1); BinaryRow partition = gen.getPartition(keyValues.get(0)); Snapshot latest = store.commitData(keyValues, s -> partition, kv -> 0).get(0); - Map reassignProperties = new HashMap<>(); - reassignProperties.put("keep", "v1"); - reassignProperties.put(Snapshot.ROW_ID_REASSIGN_PROPERTY, "true"); + Map barrierProperties = new HashMap<>(); + barrierProperties.put("keep", "v1"); + barrierProperties.put(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY, "true"); try (FileStoreCommitImpl commit = store.newCommit()) { assertThat( commit.replaceManifestList( @@ -1029,33 +1029,33 @@ public void testReplaceManifestListWithRowIdReassignProperty() throws Exception deltaManifestList(latest), latest.indexManifest(), latest.nextRowId(), - reassignProperties)) + barrierProperties)) .isTrue(); } - Snapshot reassignSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); - assertThat(reassignSnapshot.properties()).isEqualTo(reassignProperties); + Snapshot barrierSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + assertThat(barrierSnapshot.properties()).isEqualTo(barrierProperties); try (FileStoreCommitImpl commit = store.newCommit()) { assertThat( commit.replaceManifestList( - reassignSnapshot, - reassignSnapshot.totalRecordCount(), - baseManifestList(reassignSnapshot), - deltaManifestList(reassignSnapshot), - reassignSnapshot.indexManifest(), - reassignSnapshot.nextRowId())) + barrierSnapshot, + barrierSnapshot.totalRecordCount(), + baseManifestList(barrierSnapshot), + deltaManifestList(barrierSnapshot), + barrierSnapshot.indexManifest(), + barrierSnapshot.nextRowId())) .isTrue(); } Snapshot normalSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); assertThat(normalSnapshot.properties()) .containsEntry("keep", "v1") - .doesNotContainKey(Snapshot.ROW_ID_REASSIGN_PROPERTY); + .doesNotContainKey(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY); } @Test - public void testCompactManifestWithRowIdReassignProperty() throws Exception { + public void testCompactManifestWithRowIdOverwriteBarrierProperty() throws Exception { TestFileStore store = createStore(false); List keyValues = generateDataList(1); @@ -1072,9 +1072,9 @@ public void testCompactManifestWithRowIdReassignProperty() throws Exception { .sum(); assertThat(deleteNum).isGreaterThan(0); - Map reassignProperties = new HashMap<>(); - reassignProperties.put("keep", "v1"); - reassignProperties.put(Snapshot.ROW_ID_REASSIGN_PROPERTY, "true"); + Map barrierProperties = new HashMap<>(); + barrierProperties.put("keep", "v1"); + barrierProperties.put(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY, "true"); try (FileStoreCommitImpl commit = store.newCommit()) { assertThat( commit.replaceManifestList( @@ -1084,35 +1084,35 @@ public void testCompactManifestWithRowIdReassignProperty() throws Exception { deltaManifestList(latest), latest.indexManifest(), latest.nextRowId(), - reassignProperties)) + barrierProperties)) .isTrue(); } - Snapshot reassignSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); - assertThat(reassignSnapshot.properties()).isEqualTo(reassignProperties); + Snapshot barrierSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + assertThat(barrierSnapshot.properties()).isEqualTo(barrierProperties); try (FileStoreCommit commit = store.newCommit()) { commit.compactManifest(); } Snapshot normalSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); - assertThat(normalSnapshot.id()).isGreaterThan(reassignSnapshot.id()); + assertThat(normalSnapshot.id()).isGreaterThan(barrierSnapshot.id()); assertThat(normalSnapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT); assertThat(normalSnapshot.properties()) .containsEntry("keep", "v1") - .doesNotContainKey(Snapshot.ROW_ID_REASSIGN_PROPERTY); + .doesNotContainKey(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY); } @Test - public void testRowIdReassignConflictFromOptions() throws Exception { + public void testRowIdOverwriteConflictFromOptions() throws Exception { TestFileStore store = createStore(false); List keyValues = generateDataList(1); BinaryRow partition = gen.getPartition(keyValues.get(0)); Snapshot latest = store.commitData(keyValues, s -> partition, kv -> 0).get(0); - Map reassignProperties = new HashMap<>(); - reassignProperties.put(Snapshot.ROW_ID_REASSIGN_PROPERTY, "true"); + Map barrierProperties = new HashMap<>(); + barrierProperties.put(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY, "true"); try (FileStoreCommitImpl commit = store.newCommit()) { assertThat( commit.replaceManifestList( @@ -1122,7 +1122,7 @@ public void testRowIdReassignConflictFromOptions() throws Exception { deltaManifestList(latest), latest.indexManifest(), latest.nextRowId(), - reassignProperties)) + barrierProperties)) .isTrue(); } @@ -1138,16 +1138,17 @@ public void testRowIdReassignConflictFromOptions() throws Exception { (commit, committable) -> committableRef.set(committable)); Map dynamicOptions = new HashMap<>(store.options().toMap()); - dynamicOptions.put(CoreOptions.COMMIT_ROW_ID_REASSIGN_LAST_SAFE_SNAPSHOT.key(), "1"); + dynamicOptions.put( + CoreOptions.COMMIT_ROW_ID_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT.key(), "1"); try (FileStoreCommitImpl commit = newCommitWithSnapshotCommit( store, - "row-id-reassign-check", + "row-id-overwrite-barrier-check", new RenamingSnapshotCommit(store.snapshotManager(), Lock.empty()), new CoreOptions(dynamicOptions), true)) { assertThatThrownBy(() -> commit.commit(checkNotNull(committableRef.get()), false)) - .hasMessageContaining("Row-id reassignment snapshot 2") + .hasMessageContaining("Row-id overwrite barrier snapshot 2") .hasMessageContaining("task planned from snapshot 1"); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index e243fe3b8ecd..55c25d7753d8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -508,15 +508,15 @@ private SimpleFileEntry createFileEntryWithRowId( } @Test - void testDetectsRowIdReassignSnapshotConflict() { + void testDetectsRowIdOverwriteBarrierSnapshotConflict() { SnapshotManager snapshotManager = mock(SnapshotManager.class); - Map reassignProperties = new HashMap<>(); - reassignProperties.put(Snapshot.ROW_ID_REASSIGN_PROPERTY, "true"); + Map barrierProperties = new HashMap<>(); + barrierProperties.put(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY, "true"); when(snapshotManager.snapshot(2L)) - .thenReturn(snapshot(2, Snapshot.CommitKind.OVERWRITE, reassignProperties)); + .thenReturn(snapshot(2, Snapshot.CommitKind.OVERWRITE, barrierProperties)); ConflictDetection detection = createConflictDetection(snapshotManager); - detection.setRowIdReassignCheckFromSnapshot(1L); + detection.setRowIdOverwriteConflictCheckFromSnapshot(1L); Optional exception = detection.checkConflicts( @@ -529,20 +529,20 @@ void testDetectsRowIdReassignSnapshotConflict() { assertThat(exception).isPresent(); assertThat(exception.get()) - .hasMessageContaining("Row-id reassignment snapshot 2") + .hasMessageContaining("Row-id overwrite barrier snapshot 2") .hasMessageContaining("task planned from snapshot 1"); } @Test - void testIgnoresRowIdReassignPropertyOnNonOverwriteSnapshot() { + void testIgnoresRowIdOverwriteBarrierPropertyOnNonOverwriteSnapshot() { SnapshotManager snapshotManager = mock(SnapshotManager.class); - Map reassignProperties = new HashMap<>(); - reassignProperties.put(Snapshot.ROW_ID_REASSIGN_PROPERTY, "true"); + Map barrierProperties = new HashMap<>(); + barrierProperties.put(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY, "true"); when(snapshotManager.snapshot(2L)) - .thenReturn(snapshot(2, Snapshot.CommitKind.APPEND, reassignProperties)); + .thenReturn(snapshot(2, Snapshot.CommitKind.APPEND, barrierProperties)); ConflictDetection detection = createConflictDetection(snapshotManager); - detection.setRowIdReassignCheckFromSnapshot(1L); + detection.setRowIdOverwriteConflictCheckFromSnapshot(1L); Optional exception = detection.checkConflicts( @@ -569,7 +569,7 @@ void testDetectsOverlappedOverwriteSnapshotForIndexCommit() { .thenReturn(Collections.singletonList(mock(ManifestEntry.class))); ConflictDetection detection = createConflictDetection(snapshotManager, commitScanner); - detection.setRowIdReassignCheckFromSnapshot(1L); + detection.setRowIdOverwriteConflictCheckFromSnapshot(1L); Optional exception = detection.checkConflicts( @@ -599,7 +599,7 @@ void testIgnoresNonOverlappedOverwriteSnapshotForIndexCommit() { .thenReturn(Collections.emptyList()); ConflictDetection detection = createConflictDetection(snapshotManager, commitScanner); - detection.setRowIdReassignCheckFromSnapshot(1L); + detection.setRowIdOverwriteConflictCheckFromSnapshot(1L); Optional exception = detection.checkConflicts( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java index 4a0655ac6590..13fc3a891a53 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java @@ -96,7 +96,7 @@ public static boolean buildIndex( Options userOptions) throws Exception { List> allStreams = new ArrayList<>(); - Long rowIdReassignCheckFromSnapshot = null; + Long rowIdOverwriteConflictCheckFromSnapshot = null; for (String indexColumn : indexColumns) { BTreeGlobalIndexBuilder indexBuilder = indexBuilderSupplier.get().withIndexField(indexColumn); @@ -110,9 +110,9 @@ public static boolean buildIndex( continue; } if (indexBuilder.scanSnapshotId().isPresent()) { - rowIdReassignCheckFromSnapshot = + rowIdOverwriteConflictCheckFromSnapshot = minSnapshot( - rowIdReassignCheckFromSnapshot, + rowIdOverwriteConflictCheckFromSnapshot, indexBuilder.scanSnapshotId().get()); } @@ -202,7 +202,7 @@ public static boolean buildIndex( @SuppressWarnings("unchecked") DataStream[] rest = allStreams.subList(1, allStreams.size()).toArray(new DataStream[0]); - commit(table, allStreams.get(0).union(rest), rowIdReassignCheckFromSnapshot); + commit(table, allStreams.get(0).union(rest), rowIdOverwriteConflictCheckFromSnapshot); return true; } @@ -327,10 +327,10 @@ private static RowType withBuildTaskId(RowType readType, String buildTaskIdField private static void commit( FileStoreTable table, DataStream written, - Long rowIdReassignCheckFromSnapshot) { + Long rowIdOverwriteConflictCheckFromSnapshot) { FileStoreTable commitTable = - GlobalIndexCommitUtils.withRowIdReassignCheck( - table, rowIdReassignCheckFromSnapshot); + GlobalIndexCommitUtils.withRowIdOverwriteConflictCheck( + table, rowIdOverwriteConflictCheckFromSnapshot); OneInputStreamOperatorFactory committerOperator = new CommitterOperatorFactory<>( false, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java index 2f72a92db627..e0b21d95af12 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java @@ -179,7 +179,7 @@ public static boolean buildIndex( List entries = indexBuilder.scan(); List deletedIndexEntries = indexBuilder.deletedIndexEntries(); - Long rowIdReassignCheckFromSnapshot = indexBuilder.scanSnapshotId().orElse(null); + Long rowIdOverwriteConflictCheckFromSnapshot = indexBuilder.scanSnapshotId().orElse(null); return buildTopology( env, @@ -190,7 +190,7 @@ public static boolean buildIndex( entries, deletedIndexEntries, maxIndexedRowId, - rowIdReassignCheckFromSnapshot); + rowIdOverwriteConflictCheckFromSnapshot); } /** @@ -211,7 +211,7 @@ private static boolean buildTopology( List entries, List deletedIndexEntries, long maxIndexedRowId, - Long rowIdReassignCheckFromSnapshot) + Long rowIdOverwriteConflictCheckFromSnapshot) throws Exception { long totalRowCount = entries.stream().mapToLong(e -> e.file().rowCount()).sum(); LOG.info( @@ -297,7 +297,7 @@ private static boolean buildTopology( built = built.union(deletes); } - commit(table, indexType, built, rowIdReassignCheckFromSnapshot); + commit(table, indexType, built, rowIdOverwriteConflictCheckFromSnapshot); return true; } @@ -512,10 +512,10 @@ private static void commit( FileStoreTable table, String indexType, DataStream written, - Long rowIdReassignCheckFromSnapshot) { + Long rowIdOverwriteConflictCheckFromSnapshot) { FileStoreTable commitTable = - GlobalIndexCommitUtils.withRowIdReassignCheck( - table, rowIdReassignCheckFromSnapshot); + GlobalIndexCommitUtils.withRowIdOverwriteConflictCheck( + table, rowIdOverwriteConflictCheckFromSnapshot); OneInputStreamOperatorFactory committerOperator = new CommitterOperatorFactory<>( false, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GlobalIndexCommitUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GlobalIndexCommitUtils.java index 4365887b9740..be383fbb4859 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GlobalIndexCommitUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GlobalIndexCommitUtils.java @@ -30,14 +30,14 @@ public final class GlobalIndexCommitUtils { private GlobalIndexCommitUtils() {} - public static FileStoreTable withRowIdReassignCheck( - FileStoreTable table, @Nullable Long rowIdReassignCheckFromSnapshot) { - if (rowIdReassignCheckFromSnapshot == null) { + public static FileStoreTable withRowIdOverwriteConflictCheck( + FileStoreTable table, @Nullable Long rowIdOverwriteConflictCheckFromSnapshot) { + if (rowIdOverwriteConflictCheckFromSnapshot == null) { return table; } return table.copy( Collections.singletonMap( - CoreOptions.COMMIT_ROW_ID_REASSIGN_LAST_SAFE_SNAPSHOT.key(), - String.valueOf(rowIdReassignCheckFromSnapshot))); + CoreOptions.COMMIT_ROW_ID_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT.key(), + String.valueOf(rowIdOverwriteConflictCheckFromSnapshot))); } } From 3303b689fe5e6bd5ca3b6ced6b827ba48ce3626b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 26 May 2026 18:18:14 +0800 Subject: [PATCH 5/6] [core][flink] Rename overwrite conflict option --- docs/generated/core_configuration.html | 2 +- .../java/org/apache/paimon/CoreOptions.java | 8 +++--- .../paimon/operation/FileStoreCommitImpl.java | 6 ++-- .../operation/commit/ConflictDetection.java | 28 +++++++++---------- .../paimon/operation/FileStoreCommitTest.java | 5 ++-- .../commit/ConflictDetectionTest.java | 8 +++--- .../flink/btree/BTreeIndexTopoBuilder.java | 14 +++++----- .../globalindex/GenericIndexTopoBuilder.java | 14 +++++----- .../globalindex/GlobalIndexCommitUtils.java | 10 +++---- 9 files changed, 46 insertions(+), 49 deletions(-) diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index 03aabf02c239..fc1824d106ce 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -321,7 +321,7 @@ If set, committer will check if there are other commit user's snapshot starting from the snapshot after this one. If found a COMPACT / OVERWRITE snapshot, or found a APPEND snapshot which committed files to fixed bucket, commit will be aborted.If the value of this option is -1, committer will not check for its first commit. -
commit.row-id-overwrite-conflict.last-safe-snapshot
+
commit.overwrite-conflict.last-safe-snapshot
(none) Long If set, committer will check OVERWRITE snapshots starting from the snapshot after this one. If a row-id overwrite barrier snapshot or an ordinary overwrite snapshot which changed target partitions is found, commit will be aborted. diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index e3685e8d9404..ef8533ac658a 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2152,8 +2152,8 @@ public InlineElement getDescription() { + "APPEND snapshot which committed files to fixed bucket, commit will be aborted." + "If the value of this option is -1, committer will not check for its first commit."); - public static final ConfigOption COMMIT_ROW_ID_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT = - ConfigOptions.key("commit.row-id-overwrite-conflict.last-safe-snapshot") + public static final ConfigOption COMMIT_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT = + ConfigOptions.key("commit.overwrite-conflict.last-safe-snapshot") .longType() .noDefaultValue() .withDescription( @@ -3788,8 +3788,8 @@ public Optional commitStrictModeLastSafeSnapshot() { return options.getOptional(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT); } - public Optional commitRowIdOverwriteConflictLastSafeSnapshot() { - return options.getOptional(COMMIT_ROW_ID_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT); + public Optional commitOverwriteConflictLastSafeSnapshot() { + return options.getOptional(COMMIT_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT); } public List clusteringColumns() { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 880829e03a19..edf18a43e131 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -218,8 +218,8 @@ public FileStoreCommitImpl( id)) .orElse(null); this.conflictDetection = conflictDetectFactory.create(scanner); - options.commitRowIdOverwriteConflictLastSafeSnapshot() - .ifPresent(this.conflictDetection::setRowIdOverwriteConflictCheckFromSnapshot); + options.commitOverwriteConflictLastSafeSnapshot() + .ifPresent(this.conflictDetection::setOverwriteConflictCheckFromSnapshot); this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile); } @@ -327,7 +327,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { checkAppendFiles = true; allowRollback = true; } - if (conflictDetection.hasRowIdOverwriteConflictCheckFromSnapshot()) { + if (conflictDetection.hasOverwriteConflictCheckFromSnapshot()) { checkAppendFiles = true; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index 51ed37ac4fa9..f126312e30dc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -95,7 +95,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { private @Nullable PartitionExpire partitionExpire; private @Nullable Long rowIdCheckFromSnapshot = null; - private @Nullable Long rowIdOverwriteConflictCheckFromSnapshot = null; + private @Nullable Long overwriteConflictCheckFromSnapshot = null; public ConflictDetection( String tableName, @@ -132,13 +132,13 @@ public boolean hasRowIdCheckFromSnapshot() { return rowIdCheckFromSnapshot != null; } - public void setRowIdOverwriteConflictCheckFromSnapshot( - @Nullable Long rowIdOverwriteConflictCheckFromSnapshot) { - this.rowIdOverwriteConflictCheckFromSnapshot = rowIdOverwriteConflictCheckFromSnapshot; + public void setOverwriteConflictCheckFromSnapshot( + @Nullable Long overwriteConflictCheckFromSnapshot) { + this.overwriteConflictCheckFromSnapshot = overwriteConflictCheckFromSnapshot; } - public boolean hasRowIdOverwriteConflictCheckFromSnapshot() { - return rowIdOverwriteConflictCheckFromSnapshot != null; + public boolean hasOverwriteConflictCheckFromSnapshot() { + return overwriteConflictCheckFromSnapshot != null; } @Nullable @@ -247,7 +247,7 @@ public Optional checkConflicts( return exception; } - exception = checkRowIdOverwriteConflicts(latestSnapshot, deltaEntries, deltaIndexEntries); + exception = checkOverwriteConflicts(latestSnapshot, deltaEntries, deltaIndexEntries); if (exception.isPresent()) { return exception; } @@ -559,25 +559,23 @@ private Optional checkForRowIdFromSnapshot( return Optional.empty(); } - private Optional checkRowIdOverwriteConflicts( + private Optional checkOverwriteConflicts( Snapshot latestSnapshot, List deltaEntries, List deltaIndexEntries) { if (!dataEvolutionEnabled) { return Optional.empty(); } - if (rowIdOverwriteConflictCheckFromSnapshot == null) { + if (overwriteConflictCheckFromSnapshot == null) { return Optional.empty(); } - if (latestSnapshot.id() <= rowIdOverwriteConflictCheckFromSnapshot) { + if (latestSnapshot.id() <= overwriteConflictCheckFromSnapshot) { return Optional.empty(); } List changedPartitions = changedPartitionsIncludingAllIndexFiles(deltaEntries, deltaIndexEntries); - for (long id = rowIdOverwriteConflictCheckFromSnapshot + 1; - id <= latestSnapshot.id(); - id++) { + for (long id = overwriteConflictCheckFromSnapshot + 1; id <= latestSnapshot.id(); id++) { Snapshot snapshot = snapshotManager.snapshot(id); if (snapshot.commitKind() != CommitKind.OVERWRITE) { continue; @@ -589,7 +587,7 @@ private Optional checkRowIdOverwriteConflicts( "Row-id overwrite barrier snapshot %s was committed after the " + "task planned from snapshot %s. The task must " + "be retried with the latest row ids.", - id, rowIdOverwriteConflictCheckFromSnapshot))); + id, overwriteConflictCheckFromSnapshot))); } if (overwriteChangedTargetPartitions(snapshot, changedPartitions)) { return Optional.of( @@ -598,7 +596,7 @@ private Optional checkRowIdOverwriteConflicts( "Overwrite snapshot %s changed partitions after the " + "task planned from snapshot %s. The task must " + "be retried with the latest row ids.", - id, rowIdOverwriteConflictCheckFromSnapshot))); + id, overwriteConflictCheckFromSnapshot))); } } return Optional.empty(); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index ac0d08ed0297..11aa419c161a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -1104,7 +1104,7 @@ public void testCompactManifestWithRowIdOverwriteBarrierProperty() throws Except } @Test - public void testRowIdOverwriteConflictFromOptions() throws Exception { + public void testOverwriteConflictFromOptions() throws Exception { TestFileStore store = createStore(false); List keyValues = generateDataList(1); @@ -1138,8 +1138,7 @@ public void testRowIdOverwriteConflictFromOptions() throws Exception { (commit, committable) -> committableRef.set(committable)); Map dynamicOptions = new HashMap<>(store.options().toMap()); - dynamicOptions.put( - CoreOptions.COMMIT_ROW_ID_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT.key(), "1"); + dynamicOptions.put(CoreOptions.COMMIT_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT.key(), "1"); try (FileStoreCommitImpl commit = newCommitWithSnapshotCommit( store, diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index 55c25d7753d8..75452fdb08ed 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -516,7 +516,7 @@ void testDetectsRowIdOverwriteBarrierSnapshotConflict() { .thenReturn(snapshot(2, Snapshot.CommitKind.OVERWRITE, barrierProperties)); ConflictDetection detection = createConflictDetection(snapshotManager); - detection.setRowIdOverwriteConflictCheckFromSnapshot(1L); + detection.setOverwriteConflictCheckFromSnapshot(1L); Optional exception = detection.checkConflicts( @@ -542,7 +542,7 @@ void testIgnoresRowIdOverwriteBarrierPropertyOnNonOverwriteSnapshot() { .thenReturn(snapshot(2, Snapshot.CommitKind.APPEND, barrierProperties)); ConflictDetection detection = createConflictDetection(snapshotManager); - detection.setRowIdOverwriteConflictCheckFromSnapshot(1L); + detection.setOverwriteConflictCheckFromSnapshot(1L); Optional exception = detection.checkConflicts( @@ -569,7 +569,7 @@ void testDetectsOverlappedOverwriteSnapshotForIndexCommit() { .thenReturn(Collections.singletonList(mock(ManifestEntry.class))); ConflictDetection detection = createConflictDetection(snapshotManager, commitScanner); - detection.setRowIdOverwriteConflictCheckFromSnapshot(1L); + detection.setOverwriteConflictCheckFromSnapshot(1L); Optional exception = detection.checkConflicts( @@ -599,7 +599,7 @@ void testIgnoresNonOverlappedOverwriteSnapshotForIndexCommit() { .thenReturn(Collections.emptyList()); ConflictDetection detection = createConflictDetection(snapshotManager, commitScanner); - detection.setRowIdOverwriteConflictCheckFromSnapshot(1L); + detection.setOverwriteConflictCheckFromSnapshot(1L); Optional exception = detection.checkConflicts( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java index 13fc3a891a53..f70b8bf68911 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java @@ -96,7 +96,7 @@ public static boolean buildIndex( Options userOptions) throws Exception { List> allStreams = new ArrayList<>(); - Long rowIdOverwriteConflictCheckFromSnapshot = null; + Long overwriteConflictCheckFromSnapshot = null; for (String indexColumn : indexColumns) { BTreeGlobalIndexBuilder indexBuilder = indexBuilderSupplier.get().withIndexField(indexColumn); @@ -110,9 +110,9 @@ public static boolean buildIndex( continue; } if (indexBuilder.scanSnapshotId().isPresent()) { - rowIdOverwriteConflictCheckFromSnapshot = + overwriteConflictCheckFromSnapshot = minSnapshot( - rowIdOverwriteConflictCheckFromSnapshot, + overwriteConflictCheckFromSnapshot, indexBuilder.scanSnapshotId().get()); } @@ -202,7 +202,7 @@ public static boolean buildIndex( @SuppressWarnings("unchecked") DataStream[] rest = allStreams.subList(1, allStreams.size()).toArray(new DataStream[0]); - commit(table, allStreams.get(0).union(rest), rowIdOverwriteConflictCheckFromSnapshot); + commit(table, allStreams.get(0).union(rest), overwriteConflictCheckFromSnapshot); return true; } @@ -327,10 +327,10 @@ private static RowType withBuildTaskId(RowType readType, String buildTaskIdField private static void commit( FileStoreTable table, DataStream written, - Long rowIdOverwriteConflictCheckFromSnapshot) { + Long overwriteConflictCheckFromSnapshot) { FileStoreTable commitTable = - GlobalIndexCommitUtils.withRowIdOverwriteConflictCheck( - table, rowIdOverwriteConflictCheckFromSnapshot); + GlobalIndexCommitUtils.withOverwriteConflictCheck( + table, overwriteConflictCheckFromSnapshot); OneInputStreamOperatorFactory committerOperator = new CommitterOperatorFactory<>( false, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java index e0b21d95af12..6d9cd92b5a12 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java @@ -179,7 +179,7 @@ public static boolean buildIndex( List entries = indexBuilder.scan(); List deletedIndexEntries = indexBuilder.deletedIndexEntries(); - Long rowIdOverwriteConflictCheckFromSnapshot = indexBuilder.scanSnapshotId().orElse(null); + Long overwriteConflictCheckFromSnapshot = indexBuilder.scanSnapshotId().orElse(null); return buildTopology( env, @@ -190,7 +190,7 @@ public static boolean buildIndex( entries, deletedIndexEntries, maxIndexedRowId, - rowIdOverwriteConflictCheckFromSnapshot); + overwriteConflictCheckFromSnapshot); } /** @@ -211,7 +211,7 @@ private static boolean buildTopology( List entries, List deletedIndexEntries, long maxIndexedRowId, - Long rowIdOverwriteConflictCheckFromSnapshot) + Long overwriteConflictCheckFromSnapshot) throws Exception { long totalRowCount = entries.stream().mapToLong(e -> e.file().rowCount()).sum(); LOG.info( @@ -297,7 +297,7 @@ private static boolean buildTopology( built = built.union(deletes); } - commit(table, indexType, built, rowIdOverwriteConflictCheckFromSnapshot); + commit(table, indexType, built, overwriteConflictCheckFromSnapshot); return true; } @@ -512,10 +512,10 @@ private static void commit( FileStoreTable table, String indexType, DataStream written, - Long rowIdOverwriteConflictCheckFromSnapshot) { + Long overwriteConflictCheckFromSnapshot) { FileStoreTable commitTable = - GlobalIndexCommitUtils.withRowIdOverwriteConflictCheck( - table, rowIdOverwriteConflictCheckFromSnapshot); + GlobalIndexCommitUtils.withOverwriteConflictCheck( + table, overwriteConflictCheckFromSnapshot); OneInputStreamOperatorFactory committerOperator = new CommitterOperatorFactory<>( false, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GlobalIndexCommitUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GlobalIndexCommitUtils.java index be383fbb4859..44ff40120d1e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GlobalIndexCommitUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GlobalIndexCommitUtils.java @@ -30,14 +30,14 @@ public final class GlobalIndexCommitUtils { private GlobalIndexCommitUtils() {} - public static FileStoreTable withRowIdOverwriteConflictCheck( - FileStoreTable table, @Nullable Long rowIdOverwriteConflictCheckFromSnapshot) { - if (rowIdOverwriteConflictCheckFromSnapshot == null) { + public static FileStoreTable withOverwriteConflictCheck( + FileStoreTable table, @Nullable Long overwriteConflictCheckFromSnapshot) { + if (overwriteConflictCheckFromSnapshot == null) { return table; } return table.copy( Collections.singletonMap( - CoreOptions.COMMIT_ROW_ID_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT.key(), - String.valueOf(rowIdOverwriteConflictCheckFromSnapshot))); + CoreOptions.COMMIT_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT.key(), + String.valueOf(overwriteConflictCheckFromSnapshot))); } } From 923532c7f3dea58aebfdd6daf84666dd68c294e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 26 May 2026 18:22:36 +0800 Subject: [PATCH 6/6] [core] Rename overwrite barrier snapshot property --- docs/generated/core_configuration.html | 2 +- .../java/org/apache/paimon/CoreOptions.java | 2 +- .../main/java/org/apache/paimon/Snapshot.java | 2 +- .../DataEvolutionRowIdReassigner.java | 3 +-- .../paimon/operation/FileStoreCommitImpl.java | 13 ++++++------- .../operation/commit/ConflictDetection.java | 8 ++++---- .../DataEvolutionRowIdReassignerTest.java | 2 +- .../paimon/operation/FileStoreCommitTest.java | 18 +++++++++--------- .../commit/ConflictDetectionTest.java | 10 +++++----- 9 files changed, 29 insertions(+), 31 deletions(-) diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index fc1824d106ce..59f79c81f2ea 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -324,7 +324,7 @@
commit.overwrite-conflict.last-safe-snapshot
(none) Long - If set, committer will check OVERWRITE snapshots starting from the snapshot after this one. If a row-id overwrite barrier snapshot or an ordinary overwrite snapshot which changed target partitions is found, commit will be aborted. + If set, committer will check OVERWRITE snapshots starting from the snapshot after this one. If an overwrite barrier snapshot or an ordinary overwrite snapshot which changed target partitions is found, commit will be aborted.
commit.timeout
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index ef8533ac658a..6d8dd154741a 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2158,7 +2158,7 @@ public InlineElement getDescription() { .noDefaultValue() .withDescription( "If set, committer will check OVERWRITE snapshots starting from the " - + "snapshot after this one. If a row-id overwrite barrier snapshot " + + "snapshot after this one. If an overwrite barrier snapshot " + "or an ordinary overwrite snapshot which changed target partitions " + "is found, commit will be aborted."); diff --git a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java index d661847662c8..480e29f687ec 100644 --- a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java @@ -45,7 +45,7 @@ public class Snapshot implements Serializable { private static final long serialVersionUID = 1L; public static final long FIRST_SNAPSHOT_ID = 1; - public static final String ROW_ID_OVERWRITE_BARRIER_PROPERTY = "row-id-overwrite-barrier"; + public static final String OVERWRITE_BARRIER_PROPERTY = "overwrite-barrier"; protected static final int CURRENT_VERSION = 3; diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java index 0c5c80555855..b7ee0b19b836 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java @@ -150,8 +150,7 @@ public Result reassign(String commitUser) { deltaManifestList, rewrittenIndexManifest.indexManifest, assignment.nextRowId, - Collections.singletonMap( - Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY, "true")); + Collections.singletonMap(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true")); if (!success) { throw new RuntimeException( "Failed to reassign row IDs because a newer snapshot has been committed."); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index edf18a43e131..a183fb9ab5a8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -1126,7 +1126,7 @@ public boolean replaceManifestList( deltaManifestList, latest.indexManifest(), latest.nextRowId(), - withoutRowIdOverwriteBarrierProperties(latest.properties())); + withoutOverwriteBarrierProperties(latest.properties())); } public boolean replaceManifestList( @@ -1143,7 +1143,7 @@ public boolean replaceManifestList( deltaManifestList, indexManifest, nextRowId, - withoutRowIdOverwriteBarrierProperties(latest.properties())); + withoutOverwriteBarrierProperties(latest.properties())); } public boolean replaceManifestList( @@ -1254,21 +1254,20 @@ private boolean compactManifestOnce() { null, latestSnapshot.watermark(), latestSnapshot.statistics(), - withoutRowIdOverwriteBarrierProperties(latestSnapshot.properties()), + withoutOverwriteBarrierProperties(latestSnapshot.properties()), latestSnapshot.nextRowId()); return commitSnapshotImpl(newSnapshot, emptyList()); } - private static @Nullable Map withoutRowIdOverwriteBarrierProperties( + private static @Nullable Map withoutOverwriteBarrierProperties( @Nullable Map properties) { - if (properties == null - || !properties.containsKey(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY)) { + if (properties == null || !properties.containsKey(Snapshot.OVERWRITE_BARRIER_PROPERTY)) { return properties; } Map copied = new HashMap<>(properties); - copied.remove(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY); + copied.remove(Snapshot.OVERWRITE_BARRIER_PROPERTY); return copied.isEmpty() ? null : copied; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index f126312e30dc..ca26dce08901 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -580,11 +580,11 @@ private Optional checkOverwriteConflicts( if (snapshot.commitKind() != CommitKind.OVERWRITE) { continue; } - if (hasRowIdOverwriteBarrierProperty(snapshot.properties())) { + if (hasOverwriteBarrierProperty(snapshot.properties())) { return Optional.of( new RuntimeException( String.format( - "Row-id overwrite barrier snapshot %s was committed after the " + "Overwrite barrier snapshot %s was committed after the " + "task planned from snapshot %s. The task must " + "be retried with the latest row ids.", id, overwriteConflictCheckFromSnapshot))); @@ -602,9 +602,9 @@ private Optional checkOverwriteConflicts( return Optional.empty(); } - private boolean hasRowIdOverwriteBarrierProperty(@Nullable Map properties) { + private boolean hasOverwriteBarrierProperty(@Nullable Map properties) { return properties != null - && Boolean.parseBoolean(properties.get(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY)); + && Boolean.parseBoolean(properties.get(Snapshot.OVERWRITE_BARRIER_PROPERTY)); } private boolean overwriteChangedTargetPartitions( diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java index c75af66c77f3..ae6f5d3f2504 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java @@ -126,7 +126,7 @@ public void testReassignRowIdsByPartition() throws Exception { assertThat(result.rowCount).isEqualTo(5L); assertThat(result.indexFileCount).isEqualTo(0L); assertThat(table.snapshotManager().latestSnapshot().properties()) - .containsEntry(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY, "true"); + .containsEntry(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true"); Map> rowIdsByPartition = rowIdsByPartition(table); assertThat(rowIdsByPartition).hasSize(2); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 11aa419c161a..ca9411d08b76 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -1010,7 +1010,7 @@ public void testCommitManifestWithProperties() throws Exception { } @Test - public void testReplaceManifestListWithRowIdOverwriteBarrierProperty() throws Exception { + public void testReplaceManifestListWithOverwriteBarrierProperty() throws Exception { TestFileStore store = createStore(false); List keyValues = generateDataList(1); @@ -1019,7 +1019,7 @@ public void testReplaceManifestListWithRowIdOverwriteBarrierProperty() throws Ex Map barrierProperties = new HashMap<>(); barrierProperties.put("keep", "v1"); - barrierProperties.put(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY, "true"); + barrierProperties.put(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true"); try (FileStoreCommitImpl commit = store.newCommit()) { assertThat( commit.replaceManifestList( @@ -1051,11 +1051,11 @@ public void testReplaceManifestListWithRowIdOverwriteBarrierProperty() throws Ex Snapshot normalSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); assertThat(normalSnapshot.properties()) .containsEntry("keep", "v1") - .doesNotContainKey(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY); + .doesNotContainKey(Snapshot.OVERWRITE_BARRIER_PROPERTY); } @Test - public void testCompactManifestWithRowIdOverwriteBarrierProperty() throws Exception { + public void testCompactManifestWithOverwriteBarrierProperty() throws Exception { TestFileStore store = createStore(false); List keyValues = generateDataList(1); @@ -1074,7 +1074,7 @@ public void testCompactManifestWithRowIdOverwriteBarrierProperty() throws Except Map barrierProperties = new HashMap<>(); barrierProperties.put("keep", "v1"); - barrierProperties.put(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY, "true"); + barrierProperties.put(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true"); try (FileStoreCommitImpl commit = store.newCommit()) { assertThat( commit.replaceManifestList( @@ -1100,7 +1100,7 @@ public void testCompactManifestWithRowIdOverwriteBarrierProperty() throws Except assertThat(normalSnapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT); assertThat(normalSnapshot.properties()) .containsEntry("keep", "v1") - .doesNotContainKey(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY); + .doesNotContainKey(Snapshot.OVERWRITE_BARRIER_PROPERTY); } @Test @@ -1112,7 +1112,7 @@ public void testOverwriteConflictFromOptions() throws Exception { Snapshot latest = store.commitData(keyValues, s -> partition, kv -> 0).get(0); Map barrierProperties = new HashMap<>(); - barrierProperties.put(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY, "true"); + barrierProperties.put(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true"); try (FileStoreCommitImpl commit = store.newCommit()) { assertThat( commit.replaceManifestList( @@ -1142,12 +1142,12 @@ public void testOverwriteConflictFromOptions() throws Exception { try (FileStoreCommitImpl commit = newCommitWithSnapshotCommit( store, - "row-id-overwrite-barrier-check", + "overwrite-barrier-check", new RenamingSnapshotCommit(store.snapshotManager(), Lock.empty()), new CoreOptions(dynamicOptions), true)) { assertThatThrownBy(() -> commit.commit(checkNotNull(committableRef.get()), false)) - .hasMessageContaining("Row-id overwrite barrier snapshot 2") + .hasMessageContaining("Overwrite barrier snapshot 2") .hasMessageContaining("task planned from snapshot 1"); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index 75452fdb08ed..de5c72a576f5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -508,10 +508,10 @@ private SimpleFileEntry createFileEntryWithRowId( } @Test - void testDetectsRowIdOverwriteBarrierSnapshotConflict() { + void testDetectsOverwriteBarrierSnapshotConflict() { SnapshotManager snapshotManager = mock(SnapshotManager.class); Map barrierProperties = new HashMap<>(); - barrierProperties.put(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY, "true"); + barrierProperties.put(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true"); when(snapshotManager.snapshot(2L)) .thenReturn(snapshot(2, Snapshot.CommitKind.OVERWRITE, barrierProperties)); @@ -529,15 +529,15 @@ void testDetectsRowIdOverwriteBarrierSnapshotConflict() { assertThat(exception).isPresent(); assertThat(exception.get()) - .hasMessageContaining("Row-id overwrite barrier snapshot 2") + .hasMessageContaining("Overwrite barrier snapshot 2") .hasMessageContaining("task planned from snapshot 1"); } @Test - void testIgnoresRowIdOverwriteBarrierPropertyOnNonOverwriteSnapshot() { + void testIgnoresOverwriteBarrierPropertyOnNonOverwriteSnapshot() { SnapshotManager snapshotManager = mock(SnapshotManager.class); Map barrierProperties = new HashMap<>(); - barrierProperties.put(Snapshot.ROW_ID_OVERWRITE_BARRIER_PROPERTY, "true"); + barrierProperties.put(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true"); when(snapshotManager.snapshot(2L)) .thenReturn(snapshot(2, Snapshot.CommitKind.APPEND, barrierProperties));