[core] Add merge-on-read option for DV table batch L0 visibility#7948
Conversation
…bility In shared dataset scenarios with dedicated compaction jobs, DV tables skip level 0 files by default. This adds a config option to allow reading uncompacted data via merge-on-read, trading read performance for data freshness.
JingsongLi
left a comment
There was a problem hiding this comment.
Thanks for the PR, @JunRuiLee. The motivation is clear and the implementation is focused. A few comments:
Overall
The core logic change in batchScanSkipLevel0() is correct and preserves backward compatibility. The mutual exclusion validation with visibility-callback.enabled is a good safeguard. Test coverage (core + Flink IT) is solid.
Below are a few inline suggestions.
| @@ -3649,7 +3660,10 @@ public boolean forceLookup() { | |||
There was a problem hiding this comment.
Suggestion: add a helper accessor method
For consistency with other boolean options (e.g., deletionVectorsEnabled(), visibilityCallbackEnabled()), consider adding a dedicated accessor method:
public boolean deletionVectorsMergeOnRead() {
return options.get(DELETION_VECTORS_MERGE_ON_READ);
}Then batchScanSkipLevel0() can call deletionVectorsMergeOnRead() instead of directly accessing the config, and SchemaValidation can also use options.deletionVectorsMergeOnRead() instead of options.toConfiguration().get(CoreOptions.DELETION_VECTORS_MERGE_ON_READ).
There was a problem hiding this comment.
Good call, added deletionVectorsMergeOnRead() and updated both callers.
|
|
||
| checkArgument( | ||
| !(options.toConfiguration().get(CoreOptions.DELETION_VECTORS_MERGE_ON_READ) | ||
| && options.visibilityCallbackEnabled()), |
There was a problem hiding this comment.
Consider validating merge-on-read requires DV enabled
validateForDeletionVectors is only called when deletionVectorsEnabled() is true. If a user sets deletion-vectors.merge-on-read=true without deletion-vectors.enabled=true, the option is silently ignored. This could be a confusing misconfiguration.
Consider adding a check (either here or in the general validation section):
checkArgument(
!(options.toConfiguration().get(CoreOptions.DELETION_VECTORS_MERGE_ON_READ)
&& !options.deletionVectorsEnabled()),
"deletion-vectors.merge-on-read requires deletion-vectors.enabled to be true.");There was a problem hiding this comment.
Makes sense. Added a check in the general validation section so it fails early with a clear message. Also added a test for this case.
| sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')"); | ||
|
|
||
| sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')"); | ||
|
|
There was a problem hiding this comment.
Inconsistency with core test testDefaultSkipsLevel0
The core-level test DeletionVectorsMergeOnReadTest#testDefaultSkipsLevel0 asserts an empty result when reading without merge-on-read (all files at level 0, none visible). But this Flink IT test asserts the first commit's data is visible without merge-on-read.
This difference is likely because the core test explicitly sets bucket=1 (fixed bucket) while this Flink test doesn't (defaults to dynamic bucket mode, which may place initial files differently). Could be worth:
- Adding a brief comment explaining why the first commit's data is visible here (e.g., files from the first snapshot are at level > 0 in this config), or
- Aligning the two tests to use the same bucket configuration for consistency.
There was a problem hiding this comment.
Fixed — added 'bucket' = '1' to align with the core test, and changed the assertion to isEmpty() for consistency.
- Add deletionVectorsMergeOnRead() accessor in CoreOptions - Validate merge-on-read requires deletion-vectors.enabled - Simplify SchemaValidation to use accessor instead of raw config get - Align Flink IT test with core test (fixed bucket, consistent assertions) - Add testMergeOnReadRequiresDvEnabled validation test
| } | ||
|
|
||
| checkArgument( | ||
| !(options.deletionVectorsMergeOnRead() && options.visibilityCallbackEnabled()), |
There was a problem hiding this comment.
visibilityCallbackEnabled also works for postpone bucket. So it is meaningful to enable deletion-vectors.merge-on-read together with visibility-callback.enabled
There was a problem hiding this comment.
Good point. Removed the conflict check. Added positive validation tests and extended VisibilityCallbackTest to cover (dv=true, postpone=true) with merge-on-read=true.
Remove the mutual exclusion check between deletion-vectors.merge-on-read and visibility-callback.enabled. The two features are not fundamentally conflicting: merge-on-read controls read-side L0 file visibility, while visibility-callback is a commit callback that waits for compaction. They can meaningfully coexist especially in postpone bucket scenarios.
Motivation
In some production shared-dataset deployments, compaction is separated from writers and runs as dedicated jobs. For DV primary-key tables, batch scans skip level-0 files by default, so newly written uncompacted data is not visible until compaction finishes. This means readers depend on compaction progress to see the latest data.
visibility-callback.enableddoes not fit this requirement because it also relies on compaction: commits are returned only after compaction makes the data visible through the optimized read path. In this scenario, users want to read the latest committed data without waiting for compaction.Changes
This PR adds a new option
deletion-vectors.merge-on-readfor DV tables. When enabled, batch scans include DV level-0 files and merge them at read time, so readers can see uncompacted committed data without depending on compaction. This trades read performance for fresher batch results.This option only affects batch scan visibility of DV level-0 files. It does not change streaming scan or changelog behavior.
deletion-vectors.merge-on-readcore option (defaultfalse).CoreOptions.batchScanSkipLevel0()).visibility-callback.enabledinSchemaValidation.