Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,12 @@
<td>Long</td>
<td>If set, committer will check if there are other commit user's snapshot starting from the snapshot after this one. If found a COMPACT / OVERWRITE snapshot, or found a APPEND snapshot which committed files to fixed bucket, commit will be aborted.If the value of this option is -1, committer will not check for its first commit.</td>
</tr>
<tr>
<td><h5>commit.overwrite-conflict.last-safe-snapshot</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>If set, committer will check OVERWRITE snapshots starting from the snapshot after this one. If an overwrite barrier snapshot or an ordinary overwrite snapshot which changed target partitions is found, commit will be aborted.</td>
</tr>
<tr>
<td><h5>commit.timeout</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
14 changes: 14 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2152,6 +2152,16 @@ public InlineElement getDescription() {
+ "APPEND snapshot which committed files to fixed bucket, commit will be aborted."
+ "If the value of this option is -1, committer will not check for its first commit.");

public static final ConfigOption<Long> COMMIT_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT =
ConfigOptions.key("commit.overwrite-conflict.last-safe-snapshot")
.longType()
.noDefaultValue()
.withDescription(
"If set, committer will check OVERWRITE snapshots starting from the "
+ "snapshot after this one. If an overwrite barrier snapshot "
+ "or an ordinary overwrite snapshot which changed target partitions "
+ "is found, commit will be aborted.");

public static final ConfigOption<String> CLUSTERING_COLUMNS =
key("clustering.columns")
.stringType()
Expand Down Expand Up @@ -3778,6 +3788,10 @@ public Optional<Long> commitStrictModeLastSafeSnapshot() {
return options.getOptional(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT);
}

public Optional<Long> commitOverwriteConflictLastSafeSnapshot() {
return options.getOptional(COMMIT_OVERWRITE_CONFLICT_LAST_SAFE_SNAPSHOT);
}

public List<String> clusteringColumns() {
return clusteringColumns(options.get(CLUSTERING_COLUMNS));
}
Expand Down
1 change: 1 addition & 0 deletions paimon-api/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class Snapshot implements Serializable {
private static final long serialVersionUID = 1L;

public static final long FIRST_SNAPSHOT_ID = 1;
public static final String OVERWRITE_BARRIER_PROPERTY = "overwrite-barrier";

protected static final int CURRENT_VERSION = 3;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ public Result reassign(String commitUser) {
baseManifestList,
deltaManifestList,
rewrittenIndexManifest.indexManifest,
assignment.nextRowId);
assignment.nextRowId,
Collections.singletonMap(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true"));
if (!success) {
throw new RuntimeException(
"Failed to reassign row IDs because a newer snapshot has been committed.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class BTreeGlobalIndexBuilder implements Serializable {
// readRowType is composed by partition fields, indexed field and _ROW_ID field
private RowType readRowType;
@Nullable private Snapshot snapshot;
@Nullable private Long scanSnapshotId;

@Nullable private PartitionPredicate partitionPredicate;

Expand Down Expand Up @@ -131,6 +132,10 @@ public BTreeGlobalIndexBuilder withSnapshot(Snapshot snapshot) {
return this;
}

public Optional<Long> scanSnapshotId() {
return Optional.ofNullable(scanSnapshotId);
}

public Optional<Pair<RowRangeIndex, List<DataSplit>>> scan() {
SnapshotReader snapshotReader = table.newSnapshotReader();
if (partitionPredicate != null) {
Expand All @@ -141,8 +146,10 @@ public Optional<Pair<RowRangeIndex, List<DataSplit>>> scan() {
? this.snapshot
: snapshotReader.snapshotManager().latestSnapshot();
if (snapshot == null) {
scanSnapshotId = null;
return Optional.empty();
}
scanSnapshotId = snapshot.id();
snapshotReader = snapshotReader.withSnapshot(snapshot);
Range dataRange = new Range(0, snapshot.nextRowId() - 1);

Expand All @@ -162,8 +169,10 @@ public Optional<Pair<RowRangeIndex, List<DataSplit>>> incrementalScan() {
? this.snapshot
: snapshotReader.snapshotManager().latestSnapshot();
if (snapshot == null) {
scanSnapshotId = null;
return Optional.empty();
}
scanSnapshotId = snapshot.id();
snapshotReader = snapshotReader.withSnapshot(snapshot);

Preconditions.checkArgument(indexField != null, "indexField must be set before scan.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ public FileStoreCommitImpl(
id))
.orElse(null);
this.conflictDetection = conflictDetectFactory.create(scanner);
options.commitOverwriteConflictLastSafeSnapshot()
.ifPresent(this.conflictDetection::setOverwriteConflictCheckFromSnapshot);
this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile);
}

Expand Down Expand Up @@ -325,6 +327,9 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
checkAppendFiles = true;
allowRollback = true;
}
if (conflictDetection.hasOverwriteConflictCheckFromSnapshot()) {
checkAppendFiles = true;
}

attempts +=
tryCommit(
Expand Down Expand Up @@ -1120,7 +1125,8 @@ public boolean replaceManifestList(
baseManifestList,
deltaManifestList,
latest.indexManifest(),
latest.nextRowId());
latest.nextRowId(),
withoutOverwriteBarrierProperties(latest.properties()));
}

public boolean replaceManifestList(
Expand All @@ -1130,6 +1136,24 @@ public boolean replaceManifestList(
Pair<String, Long> deltaManifestList,
@Nullable String indexManifest,
@Nullable Long nextRowId) {
return replaceManifestList(
latest,
totalRecordCount,
baseManifestList,
deltaManifestList,
indexManifest,
nextRowId,
withoutOverwriteBarrierProperties(latest.properties()));
}

public boolean replaceManifestList(
Snapshot latest,
long totalRecordCount,
Pair<String, Long> baseManifestList,
Pair<String, Long> deltaManifestList,
@Nullable String indexManifest,
@Nullable Long nextRowId,
@Nullable Map<String, String> properties) {
Snapshot newSnapshot =
new Snapshot(
latest.id() + 1,
Expand All @@ -1151,7 +1175,7 @@ public boolean replaceManifestList(
latest.watermark(),
latest.statistics(),
// if empty properties, just set to null
latest.properties(),
properties,
nextRowId);

return commitSnapshotImpl(newSnapshot, emptyList());
Expand Down Expand Up @@ -1230,12 +1254,23 @@ private boolean compactManifestOnce() {
null,
latestSnapshot.watermark(),
latestSnapshot.statistics(),
latestSnapshot.properties(),
withoutOverwriteBarrierProperties(latestSnapshot.properties()),
latestSnapshot.nextRowId());

return commitSnapshotImpl(newSnapshot, emptyList());
}

private static @Nullable Map<String, String> withoutOverwriteBarrierProperties(
@Nullable Map<String, String> properties) {
if (properties == null || !properties.containsKey(Snapshot.OVERWRITE_BARRIER_PROPERTY)) {
return properties;
}

Map<String, String> copied = new HashMap<>(properties);
copied.remove(Snapshot.OVERWRITE_BARRIER_PROPERTY);
return copied.isEmpty() ? null : copied;
}

private boolean commitSnapshotImpl(Snapshot newSnapshot, List<PartitionEntry> deltaStatistics) {
try {
List<PartitionStatistics> statistics = new ArrayList<>(deltaStatistics.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ protected boolean removeEldestEntry(Map.Entry<BinaryRow, Boolean> eldest) {

private @Nullable PartitionExpire partitionExpire;
private @Nullable Long rowIdCheckFromSnapshot = null;
private @Nullable Long overwriteConflictCheckFromSnapshot = null;

public ConflictDetection(
String tableName,
Expand Down Expand Up @@ -131,6 +132,15 @@ public boolean hasRowIdCheckFromSnapshot() {
return rowIdCheckFromSnapshot != null;
}

public void setOverwriteConflictCheckFromSnapshot(
@Nullable Long overwriteConflictCheckFromSnapshot) {
this.overwriteConflictCheckFromSnapshot = overwriteConflictCheckFromSnapshot;
}

public boolean hasOverwriteConflictCheckFromSnapshot() {
return overwriteConflictCheckFromSnapshot != null;
}

@Nullable
public Comparator<InternalRow> keyComparator() {
return keyComparator;
Expand Down Expand Up @@ -237,6 +247,11 @@ public Optional<RuntimeException> checkConflicts(
return exception;
}

exception = checkOverwriteConflicts(latestSnapshot, deltaEntries, deltaIndexEntries);
if (exception.isPresent()) {
return exception;
}

return checkForRowIdFromSnapshot(
latestSnapshot, deltaEntries, deltaIndexEntries, rowIdColumnConflictChecker);
}
Expand Down Expand Up @@ -544,6 +559,72 @@ private Optional<RuntimeException> checkForRowIdFromSnapshot(
return Optional.empty();
}

private Optional<RuntimeException> checkOverwriteConflicts(
Snapshot latestSnapshot,
List<SimpleFileEntry> deltaEntries,
List<IndexManifestEntry> deltaIndexEntries) {
if (!dataEvolutionEnabled) {
return Optional.empty();
}
if (overwriteConflictCheckFromSnapshot == null) {
return Optional.empty();
}
if (latestSnapshot.id() <= overwriteConflictCheckFromSnapshot) {
return Optional.empty();
}

List<BinaryRow> changedPartitions =
changedPartitionsIncludingAllIndexFiles(deltaEntries, deltaIndexEntries);
for (long id = overwriteConflictCheckFromSnapshot + 1; id <= latestSnapshot.id(); id++) {
Snapshot snapshot = snapshotManager.snapshot(id);
if (snapshot.commitKind() != CommitKind.OVERWRITE) {
continue;
}
if (hasOverwriteBarrierProperty(snapshot.properties())) {
return Optional.of(
new RuntimeException(
String.format(
"Overwrite barrier snapshot %s was committed after the "
+ "task planned from snapshot %s. The task must "
+ "be retried with the latest row ids.",
id, overwriteConflictCheckFromSnapshot)));
}
if (overwriteChangedTargetPartitions(snapshot, changedPartitions)) {
return Optional.of(
new RuntimeException(
String.format(
"Overwrite snapshot %s changed partitions after the "
+ "task planned from snapshot %s. The task must "
+ "be retried with the latest row ids.",
id, overwriteConflictCheckFromSnapshot)));
}
}
return Optional.empty();
}

private boolean hasOverwriteBarrierProperty(@Nullable Map<String, String> properties) {
return properties != null
&& Boolean.parseBoolean(properties.get(Snapshot.OVERWRITE_BARRIER_PROPERTY));
}

private boolean overwriteChangedTargetPartitions(
Snapshot snapshot, List<BinaryRow> changedPartitions) {
return !changedPartitions.isEmpty()
&& !commitScanner.readIncrementalEntries(snapshot, changedPartitions).isEmpty();
}

private List<BinaryRow> changedPartitionsIncludingAllIndexFiles(
List<SimpleFileEntry> dataFileChanges, List<IndexManifestEntry> indexFileChanges) {
Set<BinaryRow> changedPartitions = new HashSet<>();
for (SimpleFileEntry file : dataFileChanges) {
changedPartitions.add(file.partition());
}
for (IndexManifestEntry file : indexFileChanges) {
changedPartitions.add(file.partition());
}
return new ArrayList<>(changedPartitions);
}

Optional<RuntimeException> checkRowIdExistence(
List<SimpleFileEntry> baseEntries,
List<SimpleFileEntry> deltaEntries,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ public void testReassignRowIdsByPartition() throws Exception {
assertThat(result.fileCount).isEqualTo(5L);
assertThat(result.rowCount).isEqualTo(5L);
assertThat(result.indexFileCount).isEqualTo(0L);
assertThat(table.snapshotManager().latestSnapshot().properties())
.containsEntry(Snapshot.OVERWRITE_BARRIER_PROPERTY, "true");

Map<String, List<Long>> rowIdsByPartition = rowIdsByPartition(table);
assertThat(rowIdsByPartition).hasSize(2);
Expand Down
Loading
Loading