[flink] Fix batch fallback generating mixed split types for primary-key tables #3296
matrixsparse wants to merge 1 commit
Conversation
Force-pushed 09b5acd to 7ac1f8c
Hi @luoyuxia, this is a follow-up fix for the mixed split types issue you mentioned in #3208. Could you PTAL? Thanks! cc @fresh-borzoni
fresh-borzoni
left a comment
@matrixsparse Ty, LGTM in general, one comment:
It matches the Spark logic, but at the same time this scenario is a bit dangerous: suppose we have a big table that was never tiered to the lake, then decide to tier it and run a batch query through this fallback. Instead of using the kv snapshot and replaying the log on top, we read from the earliest offset, which is potentially a lot of records. So it's not very efficient and potentially OOM-prone.
Shall we file an issue to address this separately for Spark and Flink?
cc @luoyuxia WDYT about this plan?
luoyuxia
left a comment
@matrixsparse Thanks for the quick fix. Left one comment. PTAL
// Use log-only splits to avoid generating mixed split
// types (HybridSnapshotLogSplit + LogSplit) for
// primary-key tables, which is not supported.
splits = this.initLogTablePartitionSplits(partitions);
Note it'll just generate a log split without a stopping offset, which will then never stop.
@matrixsparse Good catch! Updated getLogSplit() to use stoppingOffsetsInitializer.getBucketOffsets() and pass the stopping offset to the 4-parameter LogSplit constructor. PTAL.
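The fix described above can be sketched as follows. This is a hypothetical, simplified model, not the real connector code: the names LogSplit and the idea of a bucket-offsets map come from the PR discussion, while the surrounding classes, fields, and the NO_STOPPING_OFFSET sentinel are stand-ins chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LogSplitSketch {
    static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;

    // Stand-in for the connector's LogSplit, with an optional stopping offset.
    static class LogSplit {
        final String tablePath;
        final int bucket;
        final long startingOffset;
        final long stoppingOffset;

        // 3-parameter constructor: no stopping offset, so a batch reader
        // built from this split would never stop (the bug).
        LogSplit(String tablePath, int bucket, long startingOffset) {
            this(tablePath, bucket, startingOffset, NO_STOPPING_OFFSET);
        }

        // 4-parameter constructor: carries a stopping offset, giving a
        // bounded read suitable for batch mode (the fix).
        LogSplit(String tablePath, int bucket, long startingOffset, long stoppingOffset) {
            this.tablePath = tablePath;
            this.bucket = bucket;
            this.startingOffset = startingOffset;
            this.stoppingOffset = stoppingOffset;
        }

        boolean isBounded() {
            return stoppingOffset != NO_STOPPING_OFFSET;
        }
    }

    // Fallback path: one bounded LogSplit per bucket, using per-bucket
    // stopping offsets (mimicking stoppingOffsetsInitializer.getBucketOffsets()).
    static List<LogSplit> getLogSplits(String tablePath, Map<Integer, Long> stoppingOffsets) {
        List<LogSplit> splits = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : stoppingOffsets.entrySet()) {
            splits.add(new LogSplit(tablePath, e.getKey(), 0L, e.getValue()));
        }
        return splits;
    }

    public static void main(String[] args) {
        Map<Integer, Long> offsets = new TreeMap<>();
        offsets.put(0, 100L);
        offsets.put(1, 250L);
        for (LogSplit s : getLogSplits("db.orders", offsets)) {
            System.out.println("bucket=" + s.bucket + " bounded=" + s.isBounded()
                    + " stop=" + s.stoppingOffset);
        }
    }
}
```

The point of the 4-parameter constructor is that every split produced by the batch fallback carries an explicit upper bound, so the reader can terminate once each bucket reaches its stopping offset.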
Force-pushed 7ac1f8c to 927825a
@matrixsparse @luoyuxia filed follow-up issues:
Summary
Follow-up fix for #3208.
For primary-key tables in batch mode, when no lake snapshot exists, the previous fallback logic had two issues:
1. Mixed split types: initPartitionedSplits() / initNonPartitionedSplits() may produce both HybridSnapshotLogSplit and LogSplit, which the Flink connector does not support merging.
2. Missing stopping offset: getLogSplit() used the 3-parameter LogSplit constructor without a stopping offset, causing the reader to never stop in batch mode.
Changes
Use initLogTablePartitionSplits() / getLogSplit() in the fallback path to generate uniform LogSplit for all buckets.
Use stoppingOffsetsInitializer.getBucketOffsets() in getLogSplit() to provide stopping offsets for batch-mode bounded reads.