[improve](streaming-job) async chunk splitting for cdc source job#63079
[improve](streaming-job) async chunk splitting for cdc source job#63079JNSimba wants to merge 23 commits into
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
There was a problem hiding this comment.
Pull request overview
This PR moves StreamingInsertJob (CDC FROM-TO and cdc_stream TVF) snapshot chunk splitting from a synchronous CREATE STREAMING JOB path to an incremental, scheduler-tick-driven flow. The goal is to avoid long blocking CREATE times and BRPC timeouts on large / skewed PK tables by fetching snapshot splits in small batches and persisting progress for recovery.
Changes:
- Adds split-progress APIs to
SourceOffsetProviderand implements an async split state machine inJdbcSourceOffsetProvider(plus new FE tests). - Introduces
FetchTableSplitsRequestfields to drive stateless, resumable split generation (nextSplitStart/nextSplitId/batchSize) and rebuilds cdc_client split fetching around flink-cdcChunkSplitter. - Persists per-table chunk lists incrementally via
StreamingJobUtils.upsertChunkList, and advances splits each scheduler tick (including a pre-advance in PENDING).
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java | Reworks /api/fetchSplits handling to drive flink-cdc ChunkSplitter directly (stateless batch split generation). |
| fe/fe-core/src/test/java/org/apache/doris/job/offset/jdbc/SplitProgressTest.java | Unit tests for SplitProgress default state and deep-copy semantics. |
| fe/fe-core/src/test/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProviderAsyncSplitTest.java | Unit tests covering async split advancement, dedup, noMoreSplits, and committed-progress advancement. |
| fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java | Adds per-table chunk_list UPSERT support with id reuse / allocation. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java | Adds default split-progress hooks (initSplitProgress, advanceSplits, noMoreSplits). |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java | Removes create-time pre-splitting; re-init split progress on replay; relies on scheduler-driven split fetching. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java | Implements async split progress, scheduler-driven split fetching, persistence to system table, and restart replay logic. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java | Calls advanceSplitsIfNeed() each tick and pre-advances once in PENDING before dispatch. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Persists syncTables, initializes split progress on CREATE, and adds advanceSplitsIfNeed() that pauses job on failure. |
| fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java | Adds nextSplitStart, nextSplitId, and batchSize fields to support resumable batched split fetching. |
Comments suppressed due to low confidence (1)
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java:320
- replayIfNeed() comment still says snapshot splits in the meta table are "written by initOnCreate", but initOnCreate() is now an intentional no-op and meta writes come from scheduler-driven advanceSplits()/upsertChunkList. Updating this comment will avoid confusion when debugging TVF recovery behavior.
// Re-init transient split progress fields lost across FE restart.
// syncTables itself is persisted on StreamingInsertJob; cdcSplitProgress is rebuilt empty
// here and advanceSplits will resume from the system table on next tick.
if (cdcSplitProgress == null) {
initSplitProgress(job.getSyncTables());
}
if (currentOffset == null) {
// No committed txn yet. If snapshot splits exist in the meta table (written by
// initOnCreate), restore remainingSplits so getNextOffset() returns snapshot splits
// instead of a BinlogSplit (which would incorrectly skip the snapshot phase).
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
/review |
There was a problem hiding this comment.
I found one additional blocking issue beyond the existing review threads.
Critical checkpoint conclusions:
- Goal/test coverage: the PR moves CDC chunk splitting from CREATE-time blocking to scheduler-driven async fetching and adds focused unit tests, but the TVF FE-restart path is not covered and can stop fetching the remaining chunks.
- Scope: the change is focused on async split progress, though it touches both FROM-TO and TVF paths.
- Concurrency: existing threads already cover the broad splitsLock/RPC concern; I did not add a duplicate. The new issue is lifecycle/replay state, not a new lock-order finding.
- Lifecycle/replay: blocking issue found for cdc_stream TVF restart because transient cachedSyncTables is not restored.
- Configuration/compatibility: no new config or wire-incompatible persistent format issue found beyond the added optional request fields.
- Parallel paths: FROM-TO replay initializes split progress, but the TVF override does not do the equivalent unconditionally.
- Testing: unit tests cover the base provider state machine, but miss TVF replay after FE restart. No end-to-end regression result is included in this PR.
- Observability/performance/transactions: no additional distinct issue found beyond existing review threads.
- User focus: no additional user-provided review focus was supplied.
|
run buildall |
|
/review |
There was a problem hiding this comment.
Reviewed the full PR diff and existing review threads. I found two additional correctness issues that are not duplicates of the existing comments.
Critical checkpoint conclusions:
- Goal/test: The PR moves CDC snapshot split generation to scheduler-driven batches and adds tests for the base async state machine, but the cdc_stream TVF path still has an untested concurrent update path and the split-key type cache lacks collision coverage.
- Scope/focus: The implementation is mostly focused, but it now routes both non-TVF and TVF CDC through shared async splitting state, which exposes subclass state-management gaps.
- Concurrency: Blocking issue found.
JdbcTvfSourceOffsetProvider.updateOffset()mutates inherited split lists/maps withoutsplitsLockwhile scheduler-drivenadvanceSplits()and inherited readers use that lock. - Lifecycle/restart: Existing review threads already cover TVF restart/cache restoration concerns; I did not duplicate them.
- Config: New CDC RPC/batch configs are present with descriptions and are read dynamically.
- Compatibility/protocol: FE now sends
batchSizeand resume fields; existing threads already cover composite-boundary contract concerns. - Parallel paths: MySQL and generic JDBC split fetching were both updated; the cache-key collision issue affects the generic JDBC path and should be considered for MySQL if chunk key/database identity can vary.
- Tests: Added tests cover base provider state, but no TVF concurrency/restart continuation test or cache-collision test covers the issues below.
- Observability: Logs were added around split fetch and progress; no additional blocker found.
- Persistence/transactions: Snapshot progress is persisted through the meta table/EditLog or txn attachment depending on path; existing unresolved threads cover restart truncation risks.
- Performance: Existing review threads already cover long lock holds and repeated sorting; I did not duplicate them.
User focus: no additional user-provided review focus was supplied.
|
run buildall |
FE UT Coverage ReportIncrement line coverage |
… fix unevenly-cut resume
|
/review |
There was a problem hiding this comment.
Automated review result: request changes. I did not add new inline comments because the blocking findings I confirmed are already covered by existing review threads and should not be duplicated.
Critical checkpoint conclusions:
- Goal/test proof: The PR aims to move CDC snapshot split generation to scheduler-driven batches and add resume/restart coverage. Tests were added, but current code still has unresolved correctness and concurrency concerns already raised in inline threads, including snapshot truncation/races and split-fetch batching behavior.
- Scope/focus: The feature is focused, but the change is broad across FE scheduling, persisted split metadata, CDC client split generation, and TVF recovery; several edge paths still need tightening before merge.
- Concurrency: Not OK. Existing threads cover split-state access/locking issues and long blocking RPC/internal SQL work while holding splitsLock. These can delay task commits/scheduling or race with TVF updateOffset paths.
- Lifecycle/restart: Not OK. Existing TVF restart comments cover mid-snapshot replay/resume hazards where transient split progress/table cache reconstruction can truncate snapshot progress after restored chunks are consumed.
- Configuration: A new dynamic FE batch-size config is read at split-fetch time, which is appropriate, but existing comments note the batch contract/enforcement is still incomplete in CDC client paths.
- Compatibility/storage format: No new external storage format incompatibility found beyond the new persisted split-progress fields, but replay behavior must be correct across FE restart before this is safe.
- Parallel paths: Not fully OK. MySQL, generic JDBC/Postgres, non-TVF, and cdc_stream TVF paths all need consistent batch limits, startup-mode handling, and resume semantics; existing threads identify gaps.
- Conditional checks/error handling: Some existing concerns remain around splitId parsing, composite split boundaries, SQL literal construction, and error clarity.
- Test coverage/results: Regression and unit tests were added, but coverage does not yet prove the unresolved edge cases from existing threads are fixed; no tests were run by this review.
- Observability: Logging was added for split fetch/progress, sufficient for basic diagnosis, but correctness blockers remain.
- Transaction/persistence/data correctness: Not OK until the existing replay, upsert, and split-progress concerns are resolved; these can affect snapshot completeness/duplicates after restart or retries.
- FE/BE variable passing: The new batchSize field is passed from FE now, but existing comments show downstream enforcement still needs work.
- Performance: Existing comments cover heavy work under splitsLock and repeated full-list sorting/materialization; these are relevant for large split counts.
User focus: No additional user-provided review focus was supplied.
Existing inline review threads should be addressed rather than duplicated here, especially the split-state concurrency/locking, TVF restart recovery, SQL upsert safety, batch-size enforcement, split-key/composite-boundary handling, and split metadata determinism issues.
…plits and sink resolveSplitKeyClass to PG reader
There was a problem hiding this comment.
I reviewed the current head against the existing review threads and did not add duplicate inline comments. I cannot approve this revision yet because the already-open review threads still cover blocking correctness/reliability issues in the async CDC split path, including restart/resume cursor reconstruction, empty/invalid split-batch handling, durable split-meta consistency, unsafe meta-table SQL/upsert semantics, batch-size enforcement, and regression-test races.
Checkpoint conclusions:
- Goal/test proof: the PR aims to make CDC snapshot split generation asynchronous and adds unit/regression coverage, but the existing unresolved threads show important restart, resume, and batching cases are not yet proven safe.
- Scope/clarity: the feature is focused, but it changes FE scheduling, durable meta, cdc_client split generation, and TVF replay paths together, increasing integration risk.
- Concurrency: split state is now lock-protected in many FE paths, but existing threads identify remaining scheduler/commit/replay races and long-running state transitions that need resolution.
- Lifecycle/restart: FE restart and checkpoint replay are central to this change; existing comments identify cases where restored split progress can truncate or duplicate snapshots.
- Configuration: the new mutable split batch-size config still needs positive-value validation to avoid wedging split advancement.
- Compatibility/storage: no storage-format compatibility issue was found, but the new internal meta-table persistence/upsert semantics remain a correctness concern.
- Parallel code paths: both multi-table JDBC and cdc_stream TVF paths were reviewed; existing comments already cover distinct TVF and non-TVF recovery gaps.
- Conditional checks/error handling: several new checks exist, but existing comments call out empty fetch results and malformed/invalid state needing fail-fast or bounded retry behavior.
- Test coverage/results: coverage is broad, but existing test-race comments and missing targeted restart/error tests remain blockers.
- Observability/performance: logs were added, but existing comments still cover scheduler overhead/lock and retry-observability concerns.
- Data correctness: unresolved issues can lead to duplicate, skipped, or truncated snapshot chunks, so the core data-correctness checkpoint is not satisfied yet.
User focus: no additional user-provided review focus was specified.
TPC-H: Total hot run time: 29397 ms |
TPC-DS: Total hot run time: 169833 ms |
FE Regression Coverage ReportIncrement line coverage |
…F sourceProperties is restored
|
/review |
|
run buildall |
…start regression case
|
run buildall |
There was a problem hiding this comment.
I cannot approve this PR yet. The current head still needs the existing blocking async CDC split review threads resolved, especially the snapshot replay/transition cases, non-progressing empty split batches, batch-size/config validation, and meta-table persistence/escaping/id-allocation concerns. I did not add duplicate inline comments because those issues are already present in the existing review context.
Critical checkpoint conclusions:
- Goal/test proof: the goal is scheduler-driven async CDC snapshot split generation and recovery. The added tests cover several paths, but existing threads identify restart and scheduler interleaving cases that are not yet safely proven.
- Scope/focus: the change is broad across FE scheduling, offset replay, cdc_client split generation, internal meta persistence, and regression tests; it is not a purely local change.
- Concurrency: split state is now lock-protected in several places and long RPC/UPSERT work was moved outside the lock, but existing scheduler/task success interleavings and empty-batch progress behavior remain blocking concerns.
- Lifecycle/replay: FE restart replay is the highest-risk part; existing comments describe cases where restored binlog/snapshot/meta state can re-cut, truncate, or duplicate snapshot work.
- Configuration: the new mutable split batch size is still used as an RPC batch size and needs positive-value validation/clamping per the existing thread.
- Compatibility: no storage-format compatibility issue was found, but FE/cdc_client request semantics around split boundaries and startup modes must stay consistent with existing comments.
- Parallel paths: both normal JDBC and TVF paths were modified; existing comments already cover distinct replay/locking gaps across both.
- Conditional checks: several new conditions are intended to distinguish snapshot vs binlog and complete vs mid-split states; the existing review threads show remaining edge cases requiring clearer handling.
- Test coverage/results: tests were added, but existing comments call out races/coverage gaps around sampler assertions and restart timing.
- Observability: logs were added around split progress; no additional observability-only blocker found.
- Transaction/persistence: internal meta-table writes are part of durable split recovery; existing comments on ignored/unsafe meta persistence and SQL construction remain important.
- Data correctness: unresolved issues can duplicate, skip, or truncate snapshot chunks after scheduler ticks or FE restart, so data correctness is not yet established.
- Performance: long lock hold was improved, but existing comments still note batch-size enforcement and repeated full-list processing concerns.
User focus: no additional user-provided review focus was supplied.
TPC-H: Total hot run time: 30810 ms |
TPC-DS: Total hot run time: 170989 ms |
TPC-H: Total hot run time: 31292 ms |
TPC-DS: Total hot run time: 168963 ms |
FE Regression Coverage ReportIncrement line coverage |
1 similar comment
FE Regression Coverage ReportIncrement line coverage |
|
/review |
There was a problem hiding this comment.
I found one additional blocking issue beyond the existing review threads: async split resume casts the persisted split boundary with Jackson convertValue directly, which does not preserve the temporal conversion behavior already used when reconstructing snapshot splits for execution.
Critical checkpoint conclusions: the async split goal is mostly covered by the new scheduler-driven design and tests, but temporal split-key resume is not proven and can fail after the first batch; the change is focused but still touches high-risk split lifecycle paths; concurrency issues around split state already have existing review threads and I did not duplicate them; lifecycle/replay paths were reviewed, with this new issue affecting mid-table resume after FE/CDC JSON round-trip; the new mutable batch-size config is covered by an existing review thread; no storage-format compatibility issue was found in this pass; parallel MySQL and generic JDBC paths both need the same temporal-bound fix; regression coverage is broad but does not cover temporal chunk-key async resume; observability is adequate for this issue because the failure would surface as fetchSplits failure but the root cause would be hard to diagnose from tests; no BE transaction/version/delete-bitmap concerns apply; no additional user focus was provided.
…ounds to keep temporal types intact
|
/review |
|
run buildall |
There was a problem hiding this comment.
Summary: Requesting changes for one additional correctness issue found in the current head. The existing review threads already cover the other major async split concerns, including temporal boundary conversion, TVF replay cursor restoration, invalid batch size, long lock holds, meta-table SQL safety, and duplicate snapshot reuse after a committed snapshot offset.
Critical checkpoint conclusions:
- Goal/test coverage: The PR aims to make CDC snapshot splitting asynchronous and adds regression/unit coverage, but the empty-first-batch path can still start binlog consumption before snapshot splits exist, so the goal is not fully met.
- Scope/focus: The change is focused on async CDC split state, but the new scheduler/provider interaction needs one more guard.
- Concurrency: The PR improves several split-list locks; no new distinct lock-order issue was found beyond existing review threads.
- Lifecycle/restart: Restart handling is heavily touched; no additional restart issue beyond the existing threads and the empty-first-batch dispatch issue was identified.
- Configuration: The dynamic batch-size validation issue is already covered by an existing thread; no additional config issue found.
- Compatibility/protocol: No additional FE-BE protocol compatibility issue found beyond existing split boundary contract threads.
- Parallel paths: MySQL/JDBC/TVF paths were checked; no additional uncopied path issue found beyond existing threads.
- Tests/results: Tests were reviewed at code level; no additional test-only blocker beyond existing sampler-race threads.
- Observability/performance: Existing threads already cover the main performance/observability risks; no additional blocking issue found.
- Data correctness: The new issue can skip the initial snapshot and lose/duplicate CDC data when the first async split fetch returns no splits.
User focus: No additional user-provided review focus was supplied.
| } else { | ||
| // only binlog | ||
| return currentOffset == null | ||
| ? new JdbcOffset(Collections.singletonList(new BinlogSplit())) : currentOffset; |
There was a problem hiding this comment.
This fallback can still start a binlog task before any snapshot split has been produced. In the first PENDING scheduler tick for an initial/snapshot job, advanceSplitsIfNeed() may call advanceSplits(), which sets cdcSplitProgress.currentSplittingTable and then returns if /api/fetchSplits yields an empty batch. handlePendingState() still creates the first streaming task, and because currentOffset == null and remainingSplits is empty, this branch returns an empty BinlogSplit. That task can then read from the binlog without the initial snapshot ever being scheduled. This is distinct from the already-raised post-commit reuse issue: it happens before the first snapshot task exists. Please make snapshot modes return a non-consumable/defer state (or have the scheduler skip task creation) while checkNeedSplitChunks(sourceProperties) is true, remainingSplits is empty, and noMoreSplits() is false.
TPC-H: Total hot run time: 31183 ms |
TPC-DS: Total hot run time: 169024 ms |
FE UT Coverage ReportIncrement line coverage |
FE Regression Coverage ReportIncrement line coverage |
|
run external |
FE Regression Coverage ReportIncrement line coverage |
Summary
StreamingInsertJob(CDC FROM-TO andcdc_streamTVF paths) used to callsplitChunks()synchronously insideCREATE STREAMING JOB, asking cdc_client to cut every chunk of every table before returning. On large/non-uniform PK tables this can take 30+ minutes — far beyond the BE→cdc_client BRPC 60s timeout, and the SQL client blocks the whole time.This PR makes splitting tick-driven by the FE scheduler:
CREATEreturns immediately; no more synchronoussplitChunks().advanceSplits()issues one short fetchSplits RPC (defaultbatchSize=100) and pushes that batch intoremainingSplits. Tasks dispatch as soon as the first batch lands, so end-to-end first-byte latency stays close to flink-cdc's.ChunkSplitterfrom the(currentSplittingTable, nextSplitStart, nextSplitId)triple supplied by FE; flink-cdc internals are untouched (uses the publicChunkSplitterAPI only).committedSplitProgress(3-fieldSplitProgress) + existingchunkHighWatermarkMap/binlogOffsetPersiststreaming_job_metasystem table holds fullchunk_listJSON per table (UPSERT eachadvanceSplits)SourceOffsetProvider#initSplitProgress/noMoreSplits/advanceSplitsinterface;StreamingJobSchedulerTask.handlePendingStatepre-advances one batch so the first task doesn't wait a fullmax_interval.Detailed design lives in the linked plan.
Changes
fe-common:FetchTableSplitsRequestaddsnextSplitStart(Object[]) /nextSplitId/batchSize.fe-core:SourceOffsetProvideradds 3 default methods:initSplitProgress/advanceSplits/noMoreSplits.JdbcSourceOffsetProviderimplements the async state machine (committed/cdcSplitProgress,advanceSplits, dedup, system-table UPSERT, replay path).JdbcTvfSourceOffsetProvider.initOnCreateno longer pre-splits; relies on the same scheduler tick path.StreamingInsertJobcarriessyncTables(@SerializedName("st"));initSourceJob/initInsertJobinitializeSplitProgress;advanceSplitsIfNeed()mirrorsfetchMetaerror handling (PAUSE on failure).StreamingJobSchedulerTask.handlePendingState/handleRunningStatecalladvanceSplitsIfNeed()each tick; PENDING handler pre-advances and short-circuits if PAUSED.StreamingJobUtils.upsertChunkListcovers id-allocation viaMAX(id)+1lookup.cdc_client/JdbcIncrementalSourceReader:getSourceSplits()rebuilt around the publicChunkSplitterAPI (no more in-memory loop / reflection hack).Tests
SplitProgressTest— copy/null-field semantics.JdbcSourceOffsetProviderAsyncSplitTest— coversadvanceSplits(first call / continue same table / cross-table switch / dedup / empty batch),noMoreSplits,updateOffsetcommitted-progress advancement (mid-chunk vs last chunk vs replay missing-split path), andcomputeCdcRemainingTables.test_streaming_postgres_job_async_split.groovy— 100 rows ×snapshot_split_size=5→ 20 splits across multiple ticks; asserts CREATE returns < 30s, full snapshot count + DISTINCT id, then INSERT/UPDATE/DELETE in binlog phase.Test plan
mvn test -pl fe/fe-core -Dtest=JdbcSourceOffsetProviderAsyncSplitTest,SplitProgressTesttest_streaming_postgres_job_async_splitregression locallyCREATEreturns in seconds,SHOW STREAMING JOBimmediately reflects the new job, snapshot completes, binlog phase healthycdc_streamTVF + StreamingInsertJob path: confirm CREATE no longer blocks