[fix](streaming-job) Avoid NPE on cross-table DML during snapshot chunk read#63435
JNSimba wants to merge 3 commits into
Review result: request changes.
Critical checkpoint conclusions:
- Goal/test: The PR appears intended to avoid NPEs for cross-table DML during snapshot chunk reads, but the added class currently does not compile, so the goal is not proven. I did not find a passing test result in the PR context.
- Scope/focus: The live GitHub PR diff is focused on one added CDC fetcher file. The prompt-provided head SHA differs from the live PR head reported by GitHub, so inline comments were anchored to the live PR diff accepted by GitHub.
- Concurrency/lifecycle: The class uses a single-thread executor and atomic flags; no additional confirmed concurrency/lifecycle issue was reviewed beyond the compile blocker.
- Config/compatibility/persistence/data writes: Not applicable to this live diff.
- Parallel paths: This is a copied replacement class for the CDC reader path; the immediate blocker prevents validating runtime behavior.
- Tests/results: Build coverage is insufficient because a missing import would be caught by compilation. No generated result files are involved.
- Observability/performance: No separate confirmed issue found.
User focus: No additional user-provided review focus was specified.
Reviewed the actual GitHub PR diff for apache/doris#63435. GitHub reports this PR only adds fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java; the broader local two-dot diff was not part of the reviewable PR file set.
No additional blocking issues found beyond the existing inline review context. The existing thread mentions a missing TableId import; the current PR diff includes import io.debezium.relational.TableId;, so I did not submit a duplicate inline comment.
Critical checkpoint conclusions:
- Goal/test proof: the change copies Flink CDC 3.6.0 `IncrementalSourceScanFetcher` and adds a table-id guard before range checking backfill change records. The code path matches the intended FLINK-39633-style fix; no dedicated test was added in this PR.
- Scope/focus: the actual PR diff is small and focused on the CDC fetcher override.
- Concurrency/lifecycle: no new concurrency or lifecycle issue found; existing single-thread executor and close behavior are unchanged from upstream.
- Configuration/API compatibility: no config or API compatibility issue found in the actual PR file.
- Functionally parallel paths: no parallel in-repo implementation was found in the PR diff; this class intentionally shadows the dependency class.
- Special checks: the added `recordTableId == null || !recordTableId.equals(currentSnapshotSplit.getTableId())` guard is appropriate to avoid evaluating split keys for other tables.
- Test coverage: compile verification was attempted with `mvn -q -DskipTests compile` under `fs_brokers/cdc_client`, but it failed on existing missing `org.apache.doris.job.cdc.*` dependencies before isolating this file.
- User focus: no additional user-provided review focus was supplied.
What problem does this PR solve?
During the snapshot phase, the chunk reader (`IncrementalSourceScanFetcher`) consumes from a change-event queue that may also contain DML records from other tables being captured concurrently. When such a foreign-table record reached `isChangeRecordInChunkRange`, the code compared it against the current chunk's PK range via `isRecordBetween(...)`. Two problems:
- Extracting the foreign-table record's PK throws an NPE.
- The foreign table's keys do not necessarily align with this chunk's bounds, so any range comparison is meaningless and the record would be incorrectly merged into the wrong chunk's output buffer.
This patch adds an explicit `TableId` check at the very start of `isChangeRecordInChunkRange`: records whose `TableId` does not match `currentSnapshotSplit.getTableId()` are skipped before any PK-based comparison runs.
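The ordering of the guard can be illustrated with a small, self-contained sketch. It does not use the real Flink CDC or Debezium types: `ChangeRecord`, `SnapshotSplit`, the string table ids, the integer key column, and the half-open `[start, end)` range are all hypothetical stand-ins chosen for illustration. The missing-column lookup is only one assumed way a foreign-table record could trigger an NPE, not necessarily the path hit in the actual fetcher.

```java
import java.util.Collections;
import java.util.Map;

/** Sketch only: simplified stand-ins, not the Flink CDC or Debezium classes. */
public class ChunkGuardSketch {

    /** Hypothetical change record: a table id plus its column values. */
    static final class ChangeRecord {
        final String tableId; // may be null if the table cannot be resolved
        final Map<String, Integer> columns;

        ChangeRecord(String tableId, Map<String, Integer> columns) {
            this.tableId = tableId;
            this.columns = columns;
        }
    }

    /** Hypothetical snapshot split: one table, one key column, range [start, end). */
    static final class SnapshotSplit {
        final String tableId;
        final String keyColumn;
        final int start;
        final int end;

        SnapshotSplit(String tableId, String keyColumn, int start, int end) {
            this.tableId = tableId;
            this.keyColumn = keyColumn;
            this.start = start;
            this.end = end;
        }
    }

    static boolean isChangeRecordInChunkRange(ChangeRecord record, SnapshotSplit split) {
        // Guard first: skip records with no table id or with another table's id,
        // mirroring the "recordTableId == null || !recordTableId.equals(...)" check.
        if (record.tableId == null || !record.tableId.equals(split.tableId)) {
            return false;
        }
        // Only same-table records reach the key lookup. Without the guard, a
        // foreign record lacking keyColumn would return null here, and
        // auto-unboxing null to int throws a NullPointerException.
        int key = record.columns.get(split.keyColumn);
        return key >= split.start && key < split.end;
    }

    public static void main(String[] args) {
        SnapshotSplit split = new SnapshotSplit("db.orders", "order_id", 0, 100);
        ChangeRecord same =
                new ChangeRecord("db.orders", Collections.singletonMap("order_id", 42));
        ChangeRecord foreign =
                new ChangeRecord("db.users", Collections.singletonMap("user_id", 42));

        System.out.println(isChangeRecordInChunkRange(same, split));    // true
        System.out.println(isChangeRecordInChunkRange(foreign, split)); // false
    }
}
```

Placing the table check before the key extraction means a foreign record is rejected without its columns ever being read, which addresses both problems described above: no key lookup can fail, and no cross-table range comparison takes place.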
Release note
None