[Improve](Streamingjob) support only snapshot sync for mysql and pg #61389
JNSimba wants to merge 1 commit into apache:master
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:

/review
Pull request overview
This PR adds a new offset=snapshot mode for JDBC-based StreamingJobs (MySQL/PostgreSQL) to perform a one-time full snapshot sync and then stop, instead of transitioning into continuous binlog/WAL consumption.
Changes:
- Add `snapshot` as a supported `offset` value, including FE config validation and BE (cdc_client) connector startup options.
- Introduce `SourceOffsetProvider.hasReachedEnd()` and FE job logic to mark a StreamingJob as `FINISHED` when snapshot-only consumption completes.
- Add regression tests for snapshot-only mode for MySQL and PostgreSQL.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot.groovy | New regression test verifying snapshot-only behavior for PostgreSQL. |
| regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot.groovy | New regression test verifying snapshot-only behavior for MySQL. |
| regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot.out | Expected output for new PostgreSQL snapshot-only test. |
| regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot.out | Expected output for new MySQL snapshot-only test. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java | Map offset=snapshot to StartupOptions.snapshot() for Postgres connector. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java | Enable snapshot split path for StartupMode.SNAPSHOT and set snapshot startup options. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java | Treat offset=snapshot like initial for split generation (snapshot path). |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java | FE offset logic for snapshot-only completion + hasReachedEnd() implementation. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java | Add hasReachedEnd() default method. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java | Handle null offset returned by provider (snapshot-only completion signal). |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Mark job FINISHED when provider reports hasReachedEnd(). |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java | Allow offset=snapshot during FE source property validation. |
| fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java | Add OFFSET_SNAPSHOT constant and update offset comment. |
```diff
 if (key.equals(DataSourceConfigKeys.OFFSET)
         && !(value.equals(DataSourceConfigKeys.OFFSET_INITIAL)
-                || value.equals(DataSourceConfigKeys.OFFSET_LATEST))) {
+                || value.equals(DataSourceConfigKeys.OFFSET_LATEST)
+                || value.equals(DataSourceConfigKeys.OFFSET_SNAPSHOT))) {
     return false;
```
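The growing `equals()` chain in the validator above could alternatively be expressed with a set lookup. A minimal sketch, assuming illustrative constant values (the real validator also accepts `{binlog, position}` structs and 13-digit timestamps per the comment in `DataSourceConfigKeys`, which a plain set would not cover):

```java
import java.util.Set;

public class OffsetValidatorSketch {
    // Assumed values mirroring DataSourceConfigKeys; the real constants may differ.
    static final String OFFSET_INITIAL = "initial";
    static final String OFFSET_LATEST = "latest";
    static final String OFFSET_SNAPSHOT = "snapshot";

    // New values only need to be added to this set, not to a boolean chain.
    static final Set<String> ALLOWED_OFFSETS =
            Set.of(OFFSET_INITIAL, OFFSET_LATEST, OFFSET_SNAPSHOT);

    static boolean isValidOffset(String value) {
        return ALLOWED_OFFSETS.contains(value);
    }

    public static void main(String[] args) {
        System.out.println(isValidOffset("snapshot")); // true
        System.out.println(isValidOffset("bogus"));    // false
    }
}
```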
```diff
 public static final String INCLUDE_TABLES = "include_tables";
 public static final String EXCLUDE_TABLES = "exclude_tables";
-// initial,earliest,latest,{binlog,postion},\d{13}
+// initial,earliest,latest,snapshot,{binlog,postion},\d{13}
```
```diff
 currentOffset = new JdbcOffset();
 if (!lastSnapshotSplits.isEmpty()) {
     currentOffset.setSplits(lastSnapshotSplits);
 } else {
-    // when snapshot to binlog phase fe restarts
-    BinlogSplit binlogSplit = new BinlogSplit();
-    binlogSplit.setFinishedSplits(finishedSplits);
-    currentOffset.setSplits(Collections.singletonList(binlogSplit));
+    if (!isSnapshotOnlyMode()) {
+        // initial mode: rebuild binlog split for snapshot-to-binlog transition
+        BinlogSplit binlogSplit = new BinlogSplit();
+        binlogSplit.setFinishedSplits(finishedSplits);
+        currentOffset.setSplits(Collections.singletonList(binlogSplit));
+    }
+    // snapshot-only: leave splits empty, hasReachedEnd() will return true
```
```diff
+// snapshot-only mode: all splits done, signal job to stop
+return null;
```
```diff
@@ -119,6 +124,10 @@ public void run() throws JobException {
     log.info("streaming task has been canceled, task id is {}", getTaskId());
     return;
 }
+if (this.runningOffset == null) {
+    // offset is null when source has reached end (e.g. snapshot-only mode completed)
+    return;
+}
```
```diff
 Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
+if (offsetProvider.hasReachedEnd()) {
+    // offset provider has reached a natural end, mark job as finished
+    log.info("Streaming insert job {} source data fully consumed, marking job as FINISHED", getJobId());
+    updateJobStatus(JobStatus.FINISHED);
+    return;
+}
```
Code Review Summary
This PR adds a snapshot offset mode to StreamingJob so users can perform a one-time full table sync without establishing ongoing binlog/WAL replication. The design is sound conceptually — adding hasReachedEnd() to SourceOffsetProvider and checking it in onStreamTaskSuccess() is a clean extension. However, I found a critical crash-recovery bug and a lifecycle gap that need to be addressed.
Critical Checkpoints
1. Goal & correctness: The normal (non-crash) path works correctly — snapshot splits are consumed, hasReachedEnd() returns true, and the job transitions to FINISHED. Tests verify this happy path. However, the crash-recovery path has a critical bug (see below).
2. Modification scope: Focused and minimal. The changes are well-contained to the offset provider, task lifecycle, and CDC client readers.
3. Concurrency: The hasReachedEnd() check in onStreamTaskSuccess() runs under the write lock (acquired by commitOffset() upstream), so race conditions are not a concern on the normal path.
4. Lifecycle / crash recovery (BUG): In replayIfNeed() for snapshot-only mode when all splits are finished, currentOffset is created via new JdbcOffset() but splits are never set (lines 386-392). This leaves currentOffset.splits as null. Subsequently, hasReachedEnd() calls currentOffset.snapshotSplit() which invokes Preconditions.checkState(splits != null && !splits.isEmpty()) (JdbcOffset.java:98) — this throws IllegalStateException. The same crash will occur in hasMoreDataToConsume() at line 249. The crash-recovery path for a completed snapshot-only job will loop: PAUSED → PENDING → replay → create task → scheduler calls hasMoreDataToConsume() → crash → PAUSED → repeat.
5. Task lifecycle gap when runningOffset is null (BUG): When StreamingMultiTblTask.before() gets null from getNextOffset() and returns early, the execute() loop calls before() → run() → onSuccess(). But onSuccess() in StreamingMultiTblTask never calls onStreamTaskSuccess() (it just logs and returns false). The job completion signal only comes via the external commitOffset() → successCallback() → onStreamTaskSuccess() path, which never fires for a no-op task. The task finishes silently, the job stays RUNNING with a dead task, and eventually times out into PAUSED. This creates an infinite PAUSED→PENDING→RUNNING→timeout→PAUSED loop. This bug compounds with bug #4 above since the crash-recovery path is the one that creates these null-offset tasks.
6. Parallel code paths: N/A — the StreamingInsertTask (TVF path) doesn't use snapshot-only mode.
7. Incompatible changes: The new OFFSET_SNAPSHOT constant is additive. No serialization format changes.
8. Configuration items: No new configs added. The existing offset property validation is correctly updated.
9. Test coverage: Two regression tests are provided (MySQL and PostgreSQL) covering the happy path. The tests use `order by` in queries and the `qt_` prefix for deterministic output, and drop tables before use; the `.out` files appear auto-generated. However, there is no test for crash recovery (FE restart during or after snapshot-only completion), and a negative test is missing: what happens when creating a snapshot job with an empty source table?
10. Observability: Adequate logging at INFO level for the new code paths.
11. Performance: No concerns — the snapshot path reuses existing chunk-split infrastructure.
Summary of Issues
| # | Severity | Description |
|---|---|---|
| 1 | Critical | replayIfNeed() leaves currentOffset.splits null/unset for snapshot-only completed state, causing IllegalStateException in snapshotSplit() on crash recovery |
| 2 | High | Null-offset task from crash recovery completes silently without triggering onStreamTaskSuccess(), so the job never transitions to FINISHED — infinite PAUSED/PENDING loop |
```diff
@@ -535,7 +545,21 @@
```
Bug (Critical): currentOffset created with null/empty splits causes crash on recovery
When isSnapshotOnlyMode() is true and lastSnapshotSplits.isEmpty(), this code creates a JdbcOffset via new JdbcOffset() at line 382 but never calls setSplits(). The splits field remains null (default from @NoArgsConstructor).
Later, `hasReachedEnd()` calls `currentOffset.snapshotSplit()` which does:

```java
Preconditions.checkState(splits != null && !splits.isEmpty(), "splits is null or empty");
```

This throws `IllegalStateException`. The same crash path exists via `hasMoreDataToConsume()` → `currentOffset.snapshotSplit()`.
Suggested fix — either:
- Set a sentinel split list so `snapshotSplit()` works, or
- Modify `hasReachedEnd()` to check for null/empty splits directly without calling `snapshotSplit()`, or
- In this branch, handle the snapshot-only completed state by transitioning the job to FINISHED immediately during replay (best approach).
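The second option can be sketched with simplified stand-ins (class and field names here are illustrative, not the actual Doris implementation): inspect the splits before calling `snapshotSplit()`, so a recovered offset with null splits cannot throw.

```java
import java.util.Collections;
import java.util.List;

// Simplified stand-in for JdbcOffset: just enough to show the crash and the guard.
class JdbcOffsetSketch {
    private List<String> splits; // left null after crash recovery in snapshot-only mode

    void setSplits(List<String> splits) { this.splits = splits; }
    List<String> getSplits() { return splits; }

    // Mirrors the Preconditions.checkState guard described in the review:
    // calling this with null/empty splits throws IllegalStateException.
    boolean snapshotSplit() {
        if (splits == null || splits.isEmpty()) {
            throw new IllegalStateException("splits is null or empty");
        }
        return true; // the only split type in this sketch is a snapshot split
    }
}

public class HasReachedEndSketch {
    static final boolean SNAPSHOT_ONLY = true;                     // models isSnapshotOnlyMode()
    static final List<String> REMAINING = Collections.emptyList(); // models remainingSplits

    // Null-safe variant: check the splits directly before snapshotSplit().
    static boolean hasReachedEnd(JdbcOffsetSketch currentOffset) {
        return SNAPSHOT_ONLY
                && currentOffset != null
                && currentOffset.getSplits() != null
                && !currentOffset.getSplits().isEmpty()
                && currentOffset.snapshotSplit()
                && REMAINING.isEmpty();
    }

    public static void main(String[] args) {
        // After crash recovery the splits were never set; the original code
        // would throw IllegalStateException here, the guard simply returns false.
        JdbcOffsetSketch recovered = new JdbcOffsetSketch();
        System.out.println(hasReachedEnd(recovered)); // false, no exception
    }
}
```

Note the trade-off: with only this guard, the recovered splits-null state reads as "not ended", so `replayIfNeed()` must still leave the job in a state that can finish; the third option (transitioning to FINISHED during replay) addresses that directly.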
```diff
+    log.info("streaming multi task {} offset is null (snapshot-only completed), skip execution", taskId);
+    return;
+}
 log.info("streaming multi task {} get running offset: {}", taskId, runningOffset.toString());
```
Bug (High): Early return from before() when offset is null leaves the job stuck
When getNextOffset() returns null (snapshot-only completed), this task returns early from before(), then run() also returns early, then onSuccess() returns false without calling onStreamTaskSuccess(). The StreamingMultiTblTask.onSuccess() never calls onStreamTaskSuccess() — it always returns false (line 243).
The job completion signal for multi-table tasks only comes via the external commitOffset() → successCallback() → onStreamTaskSuccess() callback path, which is never triggered for a no-op task (no data sent to BE, no offset committed).
Result: The task finishes silently, the job stays in RUNNING with a dead task reference, eventually times out to PAUSED, auto-resumes to PENDING, creates another null-offset task, and loops forever.
Suggested fix: In StreamingMultiTblTask.onSuccess(), when runningOffset == null, directly call streamingInsertJob.onStreamTaskSuccess(this) (after acquiring the write lock via the appropriate path). Alternatively, handle this in before() by checking offsetProvider.hasReachedEnd() and signaling the job directly.
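The suggested fix can be simulated in isolation. All names below are simplified stand-ins for the real Doris classes; the point is that a no-op task (null offset) must still signal the job, otherwise it stays RUNNING with a dead task reference.

```java
import java.util.concurrent.atomic.AtomicReference;

public class NullOffsetTaskSketch {
    enum JobStatus { RUNNING, FINISHED }

    static class Job {
        final AtomicReference<JobStatus> status = new AtomicReference<>(JobStatus.RUNNING);
        private final boolean sourceReachedEnd;

        Job(boolean sourceReachedEnd) {
            this.sourceReachedEnd = sourceReachedEnd;
        }

        // Stand-in for StreamingInsertJob.onStreamTaskSuccess(): when the
        // source has a natural end, mark the job FINISHED instead of
        // scheduling another task.
        void onStreamTaskSuccess() {
            if (sourceReachedEnd) {
                status.set(JobStatus.FINISHED);
            }
        }
    }

    static class Task {
        private final Job job;
        private final String runningOffset; // null models "snapshot-only already completed"

        Task(Job job, String runningOffset) {
            this.job = job;
            this.runningOffset = runningOffset;
        }

        // Suggested fix: without this signal, a null-offset task finishes
        // silently because the commit-offset callback path never fires.
        void onSuccess() {
            if (runningOffset == null) {
                job.onStreamTaskSuccess();
            }
            // the normal commit-offset callback path would run here otherwise
        }
    }

    public static void main(String[] args) {
        Job job = new Job(true);
        new Task(job, null).onSuccess();
        System.out.println(job.status.get()); // FINISHED
    }
}
```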
```java
return isSnapshotOnlyMode()
        && currentOffset != null
        && currentOffset.snapshotSplit()
        && remainingSplits.isEmpty();
```
Note: hasReachedEnd() calls currentOffset.snapshotSplit() which requires non-null, non-empty splits (enforced by Preconditions.checkState in JdbcOffset.snapshotSplit()). After crash recovery for a completed snapshot-only job, currentOffset has null splits (see comment on replayIfNeed), so this method will throw IllegalStateException.
Consider either:
- Adding a null-safe split check here directly: `currentOffset.getSplits() != null && !currentOffset.getSplits().isEmpty() && currentOffset.snapshotSplit()`
- Or ensuring `replayIfNeed()` always leaves `currentOffset` in a valid state with actual splits set.
What problem does this PR solve?
Background
StreamingJob currently supports two offset modes:
- `initial`: full snapshot + continuous incremental replication
- `latest`: incremental replication only (no snapshot)

There is no way to perform a one-time full sync and stop. This is needed for data migration scenarios where only a point-in-time full copy is required, without ongoing replication.
Usage
Set `offset=snapshot` when creating a StreamingJob. The job will perform a full table snapshot and automatically transition to FINISHED once all data is synced. No binlog/WAL subscription is established.
Design
The implementation centers on a `hasReachedEnd()` signal in `SourceOffsetProvider`:
- `JdbcSourceOffsetProvider.hasReachedEnd()` returns true when all snapshot splits are consumed in snapshot-only mode.
- `StreamingInsertJob.onStreamTaskSuccess()` checks `hasReachedEnd()` before creating the next task; if true, the job is marked FINISHED.
- In the BE cdc_client, `offset=snapshot` is mapped to snapshot-only startup options for both MySQL and PostgreSQL connectors. The chunk-split path is reused from `initial` mode.
- Crash recovery: if the FE restarts after completion, the job auto-resumes via PAUSED→PENDING. `getNextOffset()` returns null, the task no-ops safely, and `onStreamTaskSuccess()` re-checks `hasReachedEnd()` to correctly transition to FINISHED.
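The design above can be sketched as an extension point plus a job-side check. Interface and method names follow the PR description, but the bodies are simplified stand-ins, not the actual Doris implementation:

```java
// Providers with a natural end (e.g. snapshot-only mode) override the
// default; all other providers keep "never ends".
interface OffsetProviderSketch {
    default boolean hasReachedEnd() { return false; }
}

class SnapshotOnlyProvider implements OffsetProviderSketch {
    private int remainingSplits;

    SnapshotOnlyProvider(int splits) { this.remainingSplits = splits; }

    void consumeOneSplit() {
        if (remainingSplits > 0) {
            remainingSplits--;
        }
    }

    @Override
    public boolean hasReachedEnd() { return remainingSplits == 0; }
}

public class SnapshotOnlyDesignSketch {
    public static void main(String[] args) {
        SnapshotOnlyProvider provider = new SnapshotOnlyProvider(2);
        // Each successful task consumes splits; once the provider reports a
        // natural end, the job is marked FINISHED instead of scheduling the
        // next task (the onStreamTaskSuccess() check).
        while (!provider.hasReachedEnd()) {
            provider.consumeOneSplit();
        }
        String jobStatus = "FINISHED";
        System.out.println(jobStatus); // FINISHED
    }
}
```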
Testing
Added regression tests for both MySQL and PostgreSQL (`test_streaming_mysql_job_snapshot.groovy` and `test_streaming_postgres_job_snapshot.groovy`). Both tests verify that the snapshot data is fully synced and the job automatically transitions to FINISHED.