[core] improve insert overwrite conflict detection#7595
Conversation
| readIncrementalChanges( | ||
| snapshotManager.snapshot(baseSnapshotId), | ||
| latestSnapshot, | ||
| partitionFilter), | ||
| changedPartitions); | ||
| Collection<SimpleFileEntry> mergedIncremental = |
There was a problem hiding this comment.
[Critical] 这里把 readIncrementalChanges(...) 调成了当前类方法,但实际定义在 scanner 上;同时 Collection<SimpleFileEntry> 缺少 java.util.Collection import。这两处都会导致新增冲突检测逻辑无法编译。
建议改为 scanner.readIncrementalChanges(...),并补充 java.util.Collection import。
— gpt-5.4 via Qwen Code /review
wenshao
left a comment
There was a problem hiding this comment.
Request changes — one inline comment was posted successfully, and two findings could not be attached to diff lines so they are included below.
-
[Critical]
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java:620,626
tryOverwritePartition新增baseSnapshotId参数后,这两处内部调用仍沿用旧签名,当前参数个数不匹配,paimon-core编译会直接失败。
建议在dropPartitions和truncateTable里的调用末尾补传null,保持非 sort compact 路径的现有行为,例如:tryOverwritePartition(..., new HashMap<>(), null); -
[Critical]
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java:343
SortCompactAction已经设置了withOverwriteBaseSnapshot(readSnapshotId),但 unaware-bucket 路径这里没有继续把overwriteBaseSnapshotId传给最终 sink/committer。这样在bucket = -1的 Flink sort compact 场景下,新增冲突检测实际上不会生效。
建议让 unaware-bucket 路径和 dynamic-bucket 路径一样透传overwriteBaseSnapshotId,并补一个对应的 Flink 测试覆盖该场景。
Reviewed by gpt-5.4 via Qwen Code /review
2.improve conflict detection
JingsongLi
left a comment
There was a problem hiding this comment.
This is a critical data safety fix. The data loss scenario during sort compact with concurrent writes is well-analyzed.
The timeline demonstrating the bug is clear and scary: file D gets deleted without its data appearing in the sorted output.
Solution review:
-
Part 1 (Pin DELETE to base snapshot): Correct. The DELETE list should only contain files that were actually read and included in the sorted output. Building from
latestSnapshotis fundamentally unsafe with concurrent writers. -
Part 2 (Concurrent write detection): Using
readIncrementalChanges(baseSnapshot, latestSnapshot)to detect new files in the overwritten partitions, then failing with conflict error — this is the right approach. Fail-loud is better than data loss. -
556 additions across core commit logic, Flink actions, and Spark writer. This is appropriately scoped for a correctness fix that spans the commit protocol.
Questions:
-
Is the conflict detection granularity at partition level or file level? Partition-level might cause false conflicts (concurrent write to same partition but different bucket). File-level would be more precise but more complex.
-
What happens for non-partitioned tables? The "overwritten partitions" concept doesn't apply — is the entire table treated as one partition?
-
The test
SortCompactActionForAppendTableITCase— does it reproduce the exact race condition (concurrent write between read and commit)?
This should be high priority for merge. Silent data loss is the worst class of bug.
Purpose
Problem
Sort Compact reads data from snapshot S_read, sorts it, then commits as OVERWRITE. However,
tryOverwritePartitionbuilds the DELETE list fromlatestSnapshot(which may beS_read+N). If concurrent writes add new files between S_read and latestSnapshot, those files are deleted without their data being included in the sorted output, causing silent
data loss.
Root Cause
Timeline:
Solution
Part 1: Pin DELETE list to base snapshot
Build the DELETE list from the snapshot the sort compact actually read from (base snapshot) instead of latestSnapshot. This ensures we only delete files that were included in the
sorted output.
Part 2: Concurrent write detection
When the base snapshot differs from the latest snapshot, use
readIncrementalChanges(baseSnapshot, latestSnapshot)to detect new files added to the overwritten partitions. If newfiles exist, fail the commit with a clear conflict error instead of silently losing data.
Changes
Core
FileStoreCommit.java: Add@Nullable Long baseSnapshotIdparameter tooverwritePartition()FileStoreCommitImpl.java:tryOverwritePartition()to build DELETE list frombaseSnapshotInnerTableCommit.java: AddwithOverwriteBaseSnapshot(@Nullable Long snapshotId)methodTableCommitImpl.java: ImplementwithOverwriteBaseSnapshot()and passbaseSnapshotIdto commitFlink
SortCompactAction.java: Capture read snapshot ID before building source, pass to sink builderFlinkSinkBuilder.java: AddoverwriteBaseSnapshotIdfield and setterFlinkWriteSink.java: AddoverwriteBaseSnapshotIdfield and setter, pass to committerSpark
CompactProcedure.java: Capture read snapshot ID for sort compact operationsPaimonSparkWriter.scala: AddwithOverwriteBaseSnapshot()methodBackward Compatibility
When
baseSnapshotId == null,tryOverwritePartitionfalls back to current behavior (latestSnapshot), so existing code paths (SQL INSERT OVERWRITE, DROP PARTITION, TRUNCATE,etc.) are unaffected.
The concurrent write detection only activates when:
baseSnapshotId != null(explicitly set by sort compact)Data Flow After Fix
Tests