diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index 7bc02cf87bc6..59f79c81f2ea 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -320,6 +320,12 @@ Long If set, committer will check if there are other commit user's snapshot starting from the snapshot after this one. If found a COMPACT / OVERWRITE snapshot, or found a APPEND snapshot which committed files to fixed bucket, commit will be aborted.If the value of this option is -1, committer will not check for its first commit. + +
commit.overwrite-conflict.last-safe-snapshot
+ (none) + Long + If set, committer will check OVERWRITE snapshots starting from the snapshot after this one. If an overwrite barrier snapshot or an ordinary overwrite snapshot which changed target partitions is found, commit will be aborted. +
commit.timeout
(none) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 6b0ef75ff823..6d8dd154741a 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2152,6 +2152,16 @@ public InlineElement getDescription() { + "APPEND snapshot which committed files to fixed bucket, commit will be aborted." + "If the value of this option is -1, committer will not check for its first commit."); + public static final ConfigOption COMMIT_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT = + ConfigOptions.key("commit.overwrite-conflict.last-safe-snapshot") + .longType() + .noDefaultValue() + .withDescription( + "If set, committer will check OVERWRITE snapshots starting from the " + + "snapshot after this one. If an overwrite barrier snapshot " + + "or an ordinary overwrite snapshot which changed target partitions " + + "is found, commit will be aborted."); + public static final ConfigOption CLUSTERING_COLUMNS = key("clustering.columns") .stringType() @@ -3778,6 +3788,10 @@ public Optional commitStrictModeLastSafeSnapshot() { return options.getOptional(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT); } + public Optional commitOverwriteConflictLastSafeSnapshot() { + return options.getOptional(COMMIT_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT); + } + public List clusteringColumns() { return clusteringColumns(options.get(CLUSTERING_COLUMNS)); } diff --git a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java index 93f525a7e3fb..480e29f687ec 100644 --- a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java @@ -45,6 +45,7 @@ public class Snapshot implements Serializable { private static final long serialVersionUID = 1L; public static final long FIRST_SNAPSHOT_ID = 1; + public static final String OVERWRITE_BARRIER_PROPERTY = "overwrite-barrier"; protected static final int CURRENT_VERSION = 3; diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java index 1faa4cf66ab2..b7ee0b19b836 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java @@ -149,7 +149,8 @@ public Result reassign(String commitUser) { baseManifestList, deltaManifestList, rewrittenIndexManifest.indexManifest, - assignment.nextRowId); + assignment.nextRowId, + Collections.singletonMap(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true")); if (!success) { throw new RuntimeException( "Failed to reassign row IDs because a newer snapshot has been committed."); diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java index 9f143d6cf666..ee3348843cb0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java @@ -95,6 +95,7 @@ public class BTreeGlobalIndexBuilder implements Serializable { // readRowType is composed by partition fields, indexed field and _ROW_ID field private RowType readRowType; @Nullable private Snapshot snapshot; + @Nullable private Long scanSnapshotId; @Nullable private PartitionPredicate partitionPredicate; @@ -131,6 +132,10 @@ public BTreeGlobalIndexBuilder withSnapshot(Snapshot snapshot) { return this; } + public Optional scanSnapshotId() { + return Optional.ofNullable(scanSnapshotId); + } + public Optional>> scan() { SnapshotReader snapshotReader = table.newSnapshotReader(); if (partitionPredicate != null) { @@ -141,8 +146,10 @@ public Optional>> scan() { ? this.snapshot : snapshotReader.snapshotManager().latestSnapshot(); if (snapshot == null) { + scanSnapshotId = null; return Optional.empty(); } + scanSnapshotId = snapshot.id(); snapshotReader = snapshotReader.withSnapshot(snapshot); Range dataRange = new Range(0, snapshot.nextRowId() - 1); @@ -162,8 +169,10 @@ public Optional>> incrementalScan() { ? this.snapshot : snapshotReader.snapshotManager().latestSnapshot(); if (snapshot == null) { + scanSnapshotId = null; return Optional.empty(); } + scanSnapshotId = snapshot.id(); snapshotReader = snapshotReader.withSnapshot(snapshot); Preconditions.checkArgument(indexField != null, "indexField must be set before scan."); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 3f9fdb9f1c0c..a183fb9ab5a8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -218,6 +218,8 @@ public FileStoreCommitImpl( id)) .orElse(null); this.conflictDetection = conflictDetectFactory.create(scanner); + options.commitOverwriteConflictLastSafeSnapshot() + .ifPresent(this.conflictDetection::setOverwriteConflictCheckFromSnapshot); this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile); } @@ -325,6 +327,9 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { checkAppendFiles = true; allowRollback = true; } + if (conflictDetection.hasOverwriteConflictCheckFromSnapshot()) { + checkAppendFiles = true; + } attempts += tryCommit( @@ -1120,7 +1125,8 @@ public boolean replaceManifestList( baseManifestList, deltaManifestList, latest.indexManifest(), - latest.nextRowId()); + latest.nextRowId(), + withoutOverwriteBarrierProperties(latest.properties())); } public boolean replaceManifestList( @@ -1130,6 +1136,24 @@ public boolean replaceManifestList( Pair deltaManifestList, @Nullable String indexManifest, @Nullable Long nextRowId) { + return replaceManifestList( + latest, + totalRecordCount, + baseManifestList, + deltaManifestList, + indexManifest, + nextRowId, + withoutOverwriteBarrierProperties(latest.properties())); + } + + public boolean replaceManifestList( + Snapshot latest, + long totalRecordCount, + Pair baseManifestList, + Pair deltaManifestList, + @Nullable String indexManifest, + @Nullable Long nextRowId, + @Nullable Map properties) { Snapshot newSnapshot = new Snapshot( latest.id() + 1, @@ -1151,7 +1175,7 @@ public boolean replaceManifestList( latest.watermark(), latest.statistics(), // if empty properties, just set to null - latest.properties(), + properties, nextRowId); return commitSnapshotImpl(newSnapshot, emptyList()); @@ -1230,12 +1254,23 @@ private boolean compactManifestOnce() { null, latestSnapshot.watermark(), latestSnapshot.statistics(), - latestSnapshot.properties(), + withoutOverwriteBarrierProperties(latestSnapshot.properties()), latestSnapshot.nextRowId()); return commitSnapshotImpl(newSnapshot, emptyList()); } + private static @Nullable Map withoutOverwriteBarrierProperties( + @Nullable Map properties) { + if (properties == null || !properties.containsKey(Snapshot.OVERWRITE_BARRIER_PROPERTY)) { + return properties; + } + + Map copied = new HashMap<>(properties); + copied.remove(Snapshot.OVERWRITE_BARRIER_PROPERTY); + return copied.isEmpty() ? null : copied; + } + private boolean commitSnapshotImpl(Snapshot newSnapshot, List deltaStatistics) { try { List statistics = new ArrayList<>(deltaStatistics.size()); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index 493d13d88dee..ca26dce08901 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -95,6 +95,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { private @Nullable PartitionExpire partitionExpire; private @Nullable Long rowIdCheckFromSnapshot = null; + private @Nullable Long overwriteConflictCheckFromSnapshot = null; public ConflictDetection( String tableName, @@ -131,6 +132,15 @@ public boolean hasRowIdCheckFromSnapshot() { return rowIdCheckFromSnapshot != null; } + public void setOverwriteConflictCheckFromSnapshot( + @Nullable Long overwriteConflictCheckFromSnapshot) { + this.overwriteConflictCheckFromSnapshot = overwriteConflictCheckFromSnapshot; + } + + public boolean hasOverwriteConflictCheckFromSnapshot() { + return overwriteConflictCheckFromSnapshot != null; + } + @Nullable public Comparator keyComparator() { return keyComparator; @@ -237,6 +247,11 @@ public Optional checkConflicts( return exception; } + exception = checkOverwriteConflicts(latestSnapshot, deltaEntries, deltaIndexEntries); + if (exception.isPresent()) { + return exception; + } + return checkForRowIdFromSnapshot( latestSnapshot, deltaEntries, deltaIndexEntries, rowIdColumnConflictChecker); } @@ -544,6 +559,72 @@ private Optional checkForRowIdFromSnapshot( return Optional.empty(); } + private Optional checkOverwriteConflicts( + Snapshot latestSnapshot, + List deltaEntries, + List deltaIndexEntries) { + if (!dataEvolutionEnabled) { + return Optional.empty(); + } + if (overwriteConflictCheckFromSnapshot == null) { + return Optional.empty(); + } + if (latestSnapshot.id() <= overwriteConflictCheckFromSnapshot) { + return Optional.empty(); + } + + List changedPartitions = + changedPartitionsIncludingAllIndexFiles(deltaEntries, deltaIndexEntries); + for (long id = overwriteConflictCheckFromSnapshot + 1; id <= latestSnapshot.id(); id++) { + Snapshot snapshot = snapshotManager.snapshot(id); + if (snapshot.commitKind() != CommitKind.OVERWRITE) { + continue; + } + if (hasOverwriteBarrierProperty(snapshot.properties())) { + return Optional.of( + new RuntimeException( + String.format( + "Overwrite barrier snapshot %s was committed after the " + + "task planned from snapshot %s. The task must " + + "be retried with the latest row ids.", + id, overwriteConflictCheckFromSnapshot))); + } + if (overwriteChangedTargetPartitions(snapshot, changedPartitions)) { + return Optional.of( + new RuntimeException( + String.format( + "Overwrite snapshot %s changed partitions after the " + + "task planned from snapshot %s. The task must " + + "be retried with the latest row ids.", + id, overwriteConflictCheckFromSnapshot))); + } + } + return Optional.empty(); + } + + private boolean hasOverwriteBarrierProperty(@Nullable Map properties) { + return properties != null + && Boolean.parseBoolean(properties.get(Snapshot.OVERWRITE_BARRIER_PROPERTY)); + } + + private boolean overwriteChangedTargetPartitions( + Snapshot snapshot, List changedPartitions) { + return !changedPartitions.isEmpty() + && !commitScanner.readIncrementalEntries(snapshot, changedPartitions).isEmpty(); + } + + private List changedPartitionsIncludingAllIndexFiles( + List dataFileChanges, List indexFileChanges) { + Set changedPartitions = new HashSet<>(); + for (SimpleFileEntry file : dataFileChanges) { + changedPartitions.add(file.partition()); + } + for (IndexManifestEntry file : indexFileChanges) { + changedPartitions.add(file.partition()); + } + return new ArrayList<>(changedPartitions); + } + Optional checkRowIdExistence( List baseEntries, List deltaEntries, diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java index 2cacf4722e1f..ae6f5d3f2504 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java @@ -125,6 +125,8 @@ public void testReassignRowIdsByPartition() throws Exception { assertThat(result.fileCount).isEqualTo(5L); assertThat(result.rowCount).isEqualTo(5L); assertThat(result.indexFileCount).isEqualTo(0L); + assertThat(table.snapshotManager().latestSnapshot().properties()) + .containsEntry(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true"); Map> rowIdsByPartition = rowIdsByPartition(table); assertThat(rowIdsByPartition).hasSize(2); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 71eb081de89f..ca9411d08b76 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -57,6 +57,7 @@ import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FailingFileIO; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TraceableFileIO; @@ -1008,6 +1009,149 @@ public void testCommitManifestWithProperties() throws Exception { } } + @Test + public void testReplaceManifestListWithOverwriteBarrierProperty() throws Exception { + TestFileStore store = createStore(false); + + List keyValues = generateDataList(1); + BinaryRow partition = gen.getPartition(keyValues.get(0)); + Snapshot latest = store.commitData(keyValues, s -> partition, kv -> 0).get(0); + + Map barrierProperties = new HashMap<>(); + barrierProperties.put("keep", "v1"); + barrierProperties.put(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true"); + try (FileStoreCommitImpl commit = store.newCommit()) { + assertThat( + commit.replaceManifestList( + latest, + latest.totalRecordCount(), + baseManifestList(latest), + deltaManifestList(latest), + latest.indexManifest(), + latest.nextRowId(), + barrierProperties)) + .isTrue(); + } + + Snapshot barrierSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + assertThat(barrierSnapshot.properties()).isEqualTo(barrierProperties); + + try (FileStoreCommitImpl commit = store.newCommit()) { + assertThat( + commit.replaceManifestList( + barrierSnapshot, + barrierSnapshot.totalRecordCount(), + baseManifestList(barrierSnapshot), + deltaManifestList(barrierSnapshot), + barrierSnapshot.indexManifest(), + barrierSnapshot.nextRowId())) + .isTrue(); + } + + Snapshot normalSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + assertThat(normalSnapshot.properties()) + .containsEntry("keep", "v1") + .doesNotContainKey(Snapshot.OVERWRITE_BARRIER_PROPERTY); + } + + @Test + public void testCompactManifestWithOverwriteBarrierProperty() throws Exception { + TestFileStore store = createStore(false); + + List keyValues = generateDataList(1); + BinaryRow partition = gen.getPartition(keyValues.get(0)); + store.commitData(keyValues, s -> partition, kv -> 0); + store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()); + Snapshot latest = + store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()) + .get(0); + + long deleteNum = + store.manifestListFactory().create().readDataManifests(latest).stream() + .mapToLong(ManifestFileMeta::numDeletedFiles) + .sum(); + assertThat(deleteNum).isGreaterThan(0); + + Map barrierProperties = new HashMap<>(); + barrierProperties.put("keep", "v1"); + barrierProperties.put(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true"); + try (FileStoreCommitImpl commit = store.newCommit()) { + assertThat( + commit.replaceManifestList( + latest, + latest.totalRecordCount(), + baseManifestList(latest), + deltaManifestList(latest), + latest.indexManifest(), + latest.nextRowId(), + barrierProperties)) + .isTrue(); + } + + Snapshot barrierSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + assertThat(barrierSnapshot.properties()).isEqualTo(barrierProperties); + + try (FileStoreCommit commit = store.newCommit()) { + commit.compactManifest(); + } + + Snapshot normalSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + assertThat(normalSnapshot.id()).isGreaterThan(barrierSnapshot.id()); + assertThat(normalSnapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT); + assertThat(normalSnapshot.properties()) + .containsEntry("keep", "v1") + .doesNotContainKey(Snapshot.OVERWRITE_BARRIER_PROPERTY); + } + + @Test + public void testOverwriteConflictFromOptions() throws Exception { + TestFileStore store = createStore(false); + + List keyValues = generateDataList(1); + BinaryRow partition = gen.getPartition(keyValues.get(0)); + Snapshot latest = store.commitData(keyValues, s -> partition, kv -> 0).get(0); + + Map barrierProperties = new HashMap<>(); + barrierProperties.put(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true"); + try (FileStoreCommitImpl commit = store.newCommit()) { + assertThat( + commit.replaceManifestList( + latest, + latest.totalRecordCount(), + baseManifestList(latest), + deltaManifestList(latest), + latest.indexManifest(), + latest.nextRowId(), + barrierProperties)) + .isTrue(); + } + + AtomicReference committableRef = new AtomicReference<>(); + store.commitDataImpl( + generateDataList(1), + gen::getPartition, + kv -> 0, + false, + null, + null, + Collections.emptyList(), + (commit, committable) -> committableRef.set(committable)); + + Map dynamicOptions = new HashMap<>(store.options().toMap()); + dynamicOptions.put(CoreOptions.COMMIT_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT.key(), "1"); + try (FileStoreCommitImpl commit = + newCommitWithSnapshotCommit( + store, + "overwrite-barrier-check", + new RenamingSnapshotCommit(store.snapshotManager(), Lock.empty()), + new CoreOptions(dynamicOptions), + true)) { + assertThatThrownBy(() -> commit.commit(checkNotNull(committableRef.get()), false)) + .hasMessageContaining("Overwrite barrier snapshot 2") + .hasMessageContaining("task planned from snapshot 1"); + } + } + @Test public void testCommitTwiceWithDifferentKind() throws Exception { TestFileStore store = createStore(false); @@ -1082,6 +1226,20 @@ public void testCommitRetryAfterFalseSuccessDoesNotCleanManifest() throws Except private FileStoreCommitImpl newCommitWithSnapshotCommit( TestFileStore store, String commitUser, SnapshotCommit snapshotCommit) { + return newCommitWithSnapshotCommit( + store, + commitUser, + snapshotCommit, + store.options(), + store.options().dataEvolutionEnabled()); + } + + private FileStoreCommitImpl newCommitWithSnapshotCommit( + TestFileStore store, + String commitUser, + SnapshotCommit snapshotCommit, + CoreOptions options, + boolean dataEvolutionEnabled) { String tableName = store.options().path().getName(); return new FileStoreCommitImpl( snapshotCommit, @@ -1090,7 +1248,7 @@ private FileStoreCommitImpl newCommitWithSnapshotCommit( tableName, commitUser, store.partitionType(), - store.options(), + options, store.pathFactory(), store.snapshotManager(), store.manifestFileFactory(), @@ -1109,9 +1267,9 @@ private FileStoreCommitImpl newCommitWithSnapshotCommit( store.pathFactory(), store.newKeyComparator(), store.bucketMode(), - store.options().deletionVectorsEnabled(), - store.options().dataEvolutionEnabled(), - store.options().pkClusteringOverride(), + options.deletionVectorsEnabled(), + dataEvolutionEnabled, + options.pkClusteringOverride(), store.newIndexFileHandler(), store.snapshotManager(), scanner), @@ -1153,6 +1311,14 @@ private TestFileStore createStore(boolean failing, Map options) return createStore(failing, 1, CoreOptions.ChangelogProducer.NONE, options); } + private Pair baseManifestList(Snapshot snapshot) { + return Pair.of(snapshot.baseManifestList(), snapshot.baseManifestListSize()); + } + + private Pair deltaManifestList(Snapshot snapshot) { + return Pair.of(snapshot.deltaManifestList(), snapshot.deltaManifestListSize()); + } + private TestFileStore createStore(boolean failing) throws Exception { return createStore(failing, 1); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index 1c36b9e09ee7..de5c72a576f5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -18,15 +18,20 @@ package org.apache.paimon.operation.commit; +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.index.DeletionVectorMeta; +import org.apache.paimon.index.GlobalIndexMeta; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.SimpleFileEntry; import org.apache.paimon.manifest.SimpleFileEntryWithDV; import org.apache.paimon.table.BucketMode; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.SnapshotManager; import org.junit.jupiter.api.Test; @@ -36,8 +41,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; @@ -47,6 +54,8 @@ import static org.apache.paimon.operation.commit.ConflictDetection.buildBaseEntriesWithDV; import static org.apache.paimon.operation.commit.ConflictDetection.buildDeltaEntriesWithDV; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; class ConflictDetectionTest { @@ -330,6 +339,15 @@ private IndexManifestEntry createDvIndexEntry( DELETION_VECTORS_INDEX, fileName, 11, dvRanges.size(), dvRanges, null)); } + private IndexManifestEntry createIndexEntry( + String fileName, FileKind kind, BinaryRow partition) { + return new IndexManifestEntry( + kind, + partition, + 0, + new IndexFileMeta("btree", fileName, 11, 1, (GlobalIndexMeta) null, null)); + } + private void assertConflict( List baseEntries, List deltaEntries) { ArrayList simpleFileEntryWithDVS = new ArrayList<>(baseEntries); @@ -489,7 +507,122 @@ private SimpleFileEntry createFileEntryWithRowId( firstRowId); } + @Test + void testDetectsOverwriteBarrierSnapshotConflict() { + SnapshotManager snapshotManager = mock(SnapshotManager.class); + Map barrierProperties = new HashMap<>(); + barrierProperties.put(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true"); + when(snapshotManager.snapshot(2L)) + .thenReturn(snapshot(2, Snapshot.CommitKind.OVERWRITE, barrierProperties)); + + ConflictDetection detection = createConflictDetection(snapshotManager); + detection.setOverwriteConflictCheckFromSnapshot(1L); + + Optional exception = + detection.checkConflicts( + snapshot(3, null), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + null, + Snapshot.CommitKind.COMPACT); + + assertThat(exception).isPresent(); + assertThat(exception.get()) + .hasMessageContaining("Overwrite barrier snapshot 2") + .hasMessageContaining("task planned from snapshot 1"); + } + + @Test + void testIgnoresOverwriteBarrierPropertyOnNonOverwriteSnapshot() { + SnapshotManager snapshotManager = mock(SnapshotManager.class); + Map barrierProperties = new HashMap<>(); + barrierProperties.put(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true"); + when(snapshotManager.snapshot(2L)) + .thenReturn(snapshot(2, Snapshot.CommitKind.APPEND, barrierProperties)); + + ConflictDetection detection = createConflictDetection(snapshotManager); + detection.setOverwriteConflictCheckFromSnapshot(1L); + + Optional exception = + detection.checkConflicts( + snapshot(2, null), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + null, + Snapshot.CommitKind.COMPACT); + + assertThat(exception).isNotPresent(); + } + + @Test + void testDetectsOverlappedOverwriteSnapshotForIndexCommit() { + SnapshotManager snapshotManager = mock(SnapshotManager.class); + CommitScanner commitScanner = mock(CommitScanner.class); + Snapshot overwriteSnapshot = snapshot(2, Snapshot.CommitKind.OVERWRITE, null); + when(snapshotManager.snapshot(2L)).thenReturn(overwriteSnapshot); + + BinaryRow partition = BinaryRow.singleColumn(1); + when(commitScanner.readIncrementalEntries( + overwriteSnapshot, Collections.singletonList(partition))) + .thenReturn(Collections.singletonList(mock(ManifestEntry.class))); + + ConflictDetection detection = createConflictDetection(snapshotManager, commitScanner); + detection.setOverwriteConflictCheckFromSnapshot(1L); + + Optional exception = + detection.checkConflicts( + snapshot(2, null), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(createIndexEntry("idx", ADD, partition)), + null, + Snapshot.CommitKind.COMPACT); + + assertThat(exception).isPresent(); + assertThat(exception.get()) + .hasMessageContaining("Overwrite snapshot 2") + .hasMessageContaining("task planned from snapshot 1"); + } + + @Test + void testIgnoresNonOverlappedOverwriteSnapshotForIndexCommit() { + SnapshotManager snapshotManager = mock(SnapshotManager.class); + CommitScanner commitScanner = mock(CommitScanner.class); + Snapshot overwriteSnapshot = snapshot(2, Snapshot.CommitKind.OVERWRITE, null); + when(snapshotManager.snapshot(2L)).thenReturn(overwriteSnapshot); + + BinaryRow partition = BinaryRow.singleColumn(1); + when(commitScanner.readIncrementalEntries( + overwriteSnapshot, Collections.singletonList(partition))) + .thenReturn(Collections.emptyList()); + + ConflictDetection detection = createConflictDetection(snapshotManager, commitScanner); + detection.setOverwriteConflictCheckFromSnapshot(1L); + + Optional exception = + detection.checkConflicts( + snapshot(2, null), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(createIndexEntry("idx", ADD, partition)), + null, + Snapshot.CommitKind.COMPACT); + + assertThat(exception).isNotPresent(); + } + private ConflictDetection createConflictDetection() { + return createConflictDetection(null); + } + + private ConflictDetection createConflictDetection(@Nullable SnapshotManager snapshotManager) { + return createConflictDetection(snapshotManager, null); + } + + private ConflictDetection createConflictDetection( + @Nullable SnapshotManager snapshotManager, @Nullable CommitScanner commitScanner) { return new ConflictDetection( "test-table", "test-user", @@ -501,7 +634,36 @@ private ConflictDetection createConflictDetection() { true, false, null, + snapshotManager, + commitScanner); + } + + private Snapshot snapshot(long id, @Nullable Map properties) { + return snapshot(id, Snapshot.CommitKind.APPEND, properties); + } + + private Snapshot snapshot( + long id, Snapshot.CommitKind commitKind, @Nullable Map properties) { + return new Snapshot( + id, + 0, + null, + null, + null, + null, + null, + null, + null, + "commit-user", + id, + commitKind, + id, + 0, + 0, + null, + null, null, + properties, null); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java index 9542eea4a435..f70b8bf68911 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java @@ -27,6 +27,7 @@ import org.apache.paimon.flink.FlinkRowData; import org.apache.paimon.flink.FlinkRowWrapper; import org.apache.paimon.flink.LogicalTypeConversion; +import org.apache.paimon.flink.globalindex.GlobalIndexCommitUtils; import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.flink.sink.CommittableTypeInfo; import org.apache.paimon.flink.sink.CommitterOperatorFactory; @@ -95,6 +96,7 @@ public static boolean buildIndex( Options userOptions) throws Exception { List> allStreams = new ArrayList<>(); + Long overwriteConflictCheckFromSnapshot = null; for (String indexColumn : indexColumns) { BTreeGlobalIndexBuilder indexBuilder = indexBuilderSupplier.get().withIndexField(indexColumn); @@ -107,6 +109,12 @@ public static boolean buildIndex( if (!indexRangeAndSplits.isPresent()) { continue; } + if (indexBuilder.scanSnapshotId().isPresent()) { + overwriteConflictCheckFromSnapshot = + minSnapshot( + overwriteConflictCheckFromSnapshot, + indexBuilder.scanSnapshotId().get()); + } Pair> scanResult = indexRangeAndSplits.get(); List splits = splitByContiguousRowRange(scanResult.getRight()); @@ -194,13 +202,17 @@ public static boolean buildIndex( @SuppressWarnings("unchecked") DataStream[] rest = allStreams.subList(1, allStreams.size()).toArray(new DataStream[0]); - commit(table, allStreams.get(0).union(rest)); + commit(table, allStreams.get(0).union(rest), overwriteConflictCheckFromSnapshot); return true; } return false; } + private static Long minSnapshot(Long left, long right) { + return left == null ? right : Math.min(left, right); + } + public static void buildIndexAndExecute( StreamExecutionEnvironment env, FileStoreTable table, @@ -312,7 +324,13 @@ private static RowType withBuildTaskId(RowType readType, String buildTaskIdField return new RowType(readType.isNullable(), fields); } - private static void commit(FileStoreTable table, DataStream written) { + private static void commit( + FileStoreTable table, + DataStream written, + Long overwriteConflictCheckFromSnapshot) { + FileStoreTable commitTable = + GlobalIndexCommitUtils.withOverwriteConflictCheck( + table, overwriteConflictCheckFromSnapshot); OneInputStreamOperatorFactory committerOperator = new CommitterOperatorFactory<>( false, @@ -320,7 +338,9 @@ private static void commit(FileStoreTable table, DataStream written "BTreeIndexCommitter-" + UUID.randomUUID(), context -> new StoreCommitter( - table, table.newCommit(context.commitUser()), context), + commitTable, + commitTable.newCommit(context.commitUser()), + context), new NoopCommittableStateManager()); written.transform("COMMIT OPERATOR", new CommittableTypeInfo(), committerOperator) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java index 48ea935f5b62..da4644524c1c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.globalindex; +import org.apache.paimon.Snapshot; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.partition.PartitionPredicate; @@ -28,6 +29,7 @@ import java.io.Serializable; import java.util.Collections; import java.util.List; +import java.util.Optional; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -39,6 +41,7 @@ public class GenericGlobalIndexBuilder implements Serializable { protected final FileStoreTable table; @Nullable protected PartitionPredicate partitionPredicate; + @Nullable protected Long scanSnapshotId; public GenericGlobalIndexBuilder(FileStoreTable table) { this.table = table; @@ -53,6 +56,10 @@ public FileStoreTable table() { return table; } + public Optional scanSnapshotId() { + return Optional.ofNullable(scanSnapshotId); + } + /** * Scans manifest entries to determine which files need to be indexed. * @@ -72,7 +79,19 @@ public List scan() { + "deleted rows to be indexed.", table.name()); - return table.store().newScan().withPartitionFilter(partitionPredicate).plan().files(); + Snapshot snapshot = table.snapshotManager().latestSnapshot(); + if (snapshot == null) { + scanSnapshotId = null; + return Collections.emptyList(); + } + scanSnapshotId = snapshot.id(); + + return table.store() + .newScan() + .withSnapshot(snapshot) + .withPartitionFilter(partitionPredicate) + .plan() + .files(); } /** Returns old index file entries that should be deleted after new indexes are built. */ diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java index e37970723c9c..6d9cd92b5a12 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java @@ -179,6 +179,7 @@ public static boolean buildIndex( List entries = indexBuilder.scan(); List deletedIndexEntries = indexBuilder.deletedIndexEntries(); + Long overwriteConflictCheckFromSnapshot = indexBuilder.scanSnapshotId().orElse(null); return buildTopology( env, @@ -188,7 +189,8 @@ public static boolean buildIndex( userOptions, entries, deletedIndexEntries, - maxIndexedRowId); + maxIndexedRowId, + overwriteConflictCheckFromSnapshot); } /** @@ -208,7 +210,8 @@ private static boolean buildTopology( Options userOptions, List entries, List deletedIndexEntries, - long maxIndexedRowId) + long maxIndexedRowId, + Long overwriteConflictCheckFromSnapshot) throws Exception { long totalRowCount = entries.stream().mapToLong(e -> e.file().rowCount()).sum(); LOG.info( @@ -294,7 +297,7 @@ private static boolean buildTopology( built = built.union(deletes); } - commit(table, indexType, built); + commit(table, indexType, built, overwriteConflictCheckFromSnapshot); return true; } @@ -506,7 +509,13 @@ private static List createDeleteCommittables( } private static void commit( - FileStoreTable table, String indexType, DataStream written) { + FileStoreTable table, + String indexType, + DataStream written, + Long overwriteConflictCheckFromSnapshot) { + FileStoreTable commitTable = + GlobalIndexCommitUtils.withOverwriteConflictCheck( + table, overwriteConflictCheckFromSnapshot); OneInputStreamOperatorFactory committerOperator = new CommitterOperatorFactory<>( false, @@ -514,7 +523,9 @@ private static void commit( "GenericIndexCommitter-" + indexType + "-" + UUID.randomUUID(), context -> new StoreCommitter( - table, table.newCommit(context.commitUser()), context), + commitTable, + commitTable.newCommit(context.commitUser()), + context), new NoopCommittableStateManager()); written.transform("COMMIT OPERATOR", new CommittableTypeInfo(), committerOperator) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GlobalIndexCommitUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GlobalIndexCommitUtils.java new file mode 100644 index 000000000000..44ff40120d1e --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GlobalIndexCommitUtils.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.globalindex; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.table.FileStoreTable; + +import javax.annotation.Nullable; + +import java.util.Collections; + +/** Utilities for global index commit topology. */ +public final class GlobalIndexCommitUtils { + + private GlobalIndexCommitUtils() {} + + public static FileStoreTable withOverwriteConflictCheck( + FileStoreTable table, @Nullable Long overwriteConflictCheckFromSnapshot) { + if (overwriteConflictCheckFromSnapshot == null) { + return table; + } + return table.copy( + Collections.singletonMap( + CoreOptions.COMMIT_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT.key(), + String.valueOf(overwriteConflictCheckFromSnapshot))); + } +}