Flink: use correct scan mode when in TABLE_SCAN_THEN_INCREMENTAL mode #7338
    BATCH,
    INCREMENTAL_APPEND_SCAN
  }

- private static ScanMode checkScanMode(ScanContext context) {
+ @VisibleForTesting
+ static ScanMode checkScanMode(ScanContext context) {
@chenjunjiedada thanks for catching the bug and creating the PR fix.
For the conditions here, is there any simpler logic? E.g., is it enough to just remove the context.isStreaming() condition in the original if clause?
Also, I think it is safer and clearer to construct a new ScanContext object and set the useSnapshotId.
if (scanContext.streamingStartingStrategy()
    == StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) {
  // do a batch table scan first
  splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext, workerPool);
  LOG.info(
      "Discovered {} splits from initial batch table scan with snapshot Id {}",
      splits.size(),
      startSnapshot.snapshotId());
  // For TABLE_SCAN_THEN_INCREMENTAL, incremental mode starts exclusive from the startSnapshot
  toPosition =
      IcebergEnumeratorPosition.of(startSnapshot.snapshotId(), startSnapshot.timestampMillis());
> For the conditions here, is there any simpler logic? E.g., is it enough to just remove the context.isStreaming() condition in the original if clause?
Yes, that looks simpler and more direct.
> Also, I think it is safer and clearer to construct a new ScanContext object and set the useSnapshotId.
Agreed, we can use scanContext.copyWithSnapshotId to achieve that.
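To illustrate the "construct a new ScanContext" suggestion, here is a minimal sketch of a copy-with-snapshot-ID pattern. `SketchScanContext` is a hypothetical, stripped-down stand-in and not Iceberg's actual `ScanContext`; only the method name `copyWithSnapshotId` comes from the discussion above, and its behaviour here (including keeping the streaming flag) is an assumption.

```java
// Hypothetical, simplified stand-in for a scan context. Only the two fields
// needed for the illustration are modelled; the real class has many more.
final class SketchScanContext {
  private final boolean streaming;
  private final Long useSnapshotId; // null means "not pinned to a snapshot"

  SketchScanContext(boolean streaming, Long useSnapshotId) {
    this.streaming = streaming;
    this.useSnapshotId = useSnapshotId;
  }

  boolean isStreaming() {
    return streaming;
  }

  Long useSnapshotId() {
    return useSnapshotId;
  }

  // Returns a new context pinned to the given snapshot.
  // The receiver is immutable and therefore left unchanged.
  SketchScanContext copyWithSnapshotId(long snapshotId) {
    return new SketchScanContext(streaming, snapshotId);
  }

  public static void main(String[] args) {
    SketchScanContext original = new SketchScanContext(true, null);
    SketchScanContext pinned = original.copyWithSnapshotId(42L);
    System.out.println("original pinned: " + (original.useSnapshotId() != null));
    System.out.println("copy snapshot id: " + pinned.useSnapshotId());
  }
}
```

Passing the pinned copy to the initial batch scan, rather than mutating a shared context, keeps the context used for later incremental planning untouched.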
@chenjunjiedada thanks for finding and fixing this bug.
@chenjunjiedada can you create a backport PR too?
When consuming a table in TABLE_SCAN_THEN_INCREMENTAL mode after its snapshot history has expired, data can be lost. This is because checkScanMode returns incremental mode whenever the scan context is streaming, so the initial table scan is not planned as a batch scan. To address this issue, we have added a case to handle the TABLE_SCAN_THEN_INCREMENTAL mode.
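The effect of the streaming check can be sketched with a toy model. Everything below is hypothetical: `Ctx` is a minimal stand-in for the scan context, and the start/end snapshot conditions are an assumption about what else the original if clause checks. Only the `isStreaming` condition and the two scan modes are taken from this PR.

```java
final class ScanModeSketch {
  enum ScanMode { BATCH, INCREMENTAL_APPEND_SCAN }

  // Hypothetical minimal context: a streaming flag plus an optional snapshot range.
  static final class Ctx {
    final boolean streaming;
    final Long startSnapshotId;
    final Long endSnapshotId;

    Ctx(boolean streaming, Long startSnapshotId, Long endSnapshotId) {
      this.streaming = streaming;
      this.startSnapshotId = startSnapshotId;
      this.endSnapshotId = endSnapshotId;
    }
  }

  // Behaviour described in the PR: any streaming context is treated as incremental.
  static ScanMode checkScanModeBefore(Ctx c) {
    if (c.streaming || c.startSnapshotId != null || c.endSnapshotId != null) {
      return ScanMode.INCREMENTAL_APPEND_SCAN;
    }
    return ScanMode.BATCH;
  }

  // Reviewer's simplification: drop the streaming condition, so only an
  // explicit snapshot range selects incremental mode.
  static ScanMode checkScanModeAfter(Ctx c) {
    if (c.startSnapshotId != null || c.endSnapshotId != null) {
      return ScanMode.INCREMENTAL_APPEND_SCAN;
    }
    return ScanMode.BATCH;
  }

  public static void main(String[] args) {
    // Initial scan of a streaming job: no snapshot range has been set yet.
    Ctx initialScan = new Ctx(true, null, null);
    System.out.println("before: " + checkScanModeBefore(initialScan));
    System.out.println("after:  " + checkScanModeAfter(initialScan));
  }
}
```

In this model, the initial scan of a streaming job is classified as incremental before the change and as batch after it, while a context with an explicit snapshot range stays incremental in both versions.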