[fix](streaming-job) keep isCanceled set when cancel runs on terminal task#63427
[fix](streaming-job) keep isCanceled set when cancel runs on terminal task#63427JNSimba wants to merge 2 commits into
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
|
/review |
There was a problem hiding this comment.
Pull request overview
Fixes a streaming insert (CDC / JdbcSourceOffsetProvider) edge case where a late BE commitOffset callback could overwrite a timed-out/canceled task’s terminal state, clear the job’s failureReason, and leave the job permanently stuck in PAUSED (auto-resume never triggers).
Changes:
- Update
AbstractStreamingTask.cancel()to always flipisCanceledeven if the task is already in a terminal state. - Add an
isCanceledguard inStreamingInsertJob.commitOffset()to drop late commit callbacks for canceled multi-table tasks before any side effects occur. - Add unit tests covering terminal-state cancel semantics, idempotency, and
commitOffsetlate-callback suppression.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java | Ensures cancel() reliably publishes cancellation via isCanceled, even for terminal tasks. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Drops late commitOffset callbacks when the current multi-table task is already canceled. |
| fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobLateCallbackTest.java | Adds unit coverage for terminal-task cancel behavior and late-callback suppression. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
run buildall |
TPC-H: Total hot run time: 31561 ms |
|
run cloud_p0 |
TPC-DS: Total hot run time: 169981 ms |
|
run fe_ut |
|
run cloud_p0 |
|
run p0 |
|
run feut |
|
run external |
1 similar comment
|
run external |
FE Regression Coverage ReportIncrement line coverage |
1 similar comment
FE Regression Coverage ReportIncrement line coverage |
|
/review |
TPC-H: Total hot run time: 31818 ms |
TPC-DS: Total hot run time: 171240 ms |
FE Regression Coverage ReportIncrement line coverage |
There was a problem hiding this comment.
I found a blocking data-correctness issue in the new late-callback handling.
Critical checkpoints:
- Goal and tests: The PR aims to keep failed/canceled streaming multi-table tasks from being turned back into successful tasks by late BE commit callbacks. The unit test covers the skipped-callback shape, but it does not cover the case where the BE write has already succeeded and only the FE callback is late.
- Scope/minimality: The code change is small, but the canceled-task guard is broader than the intended failed-terminal case.
- Concurrency/lifecycle: This path is inherently concurrent with user pause/stop, timeout failure handling, BE stream-load completion, and callback delivery. The job write lock serializes FE state mutation, but it does not cancel or roll back the already-issued BE multi-table write.
- Compatibility/config/static lifecycle: No new configs, persistence format changes, static lifecycle concerns, or mixed-version protocol changes found.
- Parallel paths: Single-table transactional streaming uses txn callbacks; this issue is specific to multi-table commitOffset, where offset persistence is separated from BE writes.
- Tests: The added unit test verifies that a canceled failed task does not become SUCCESS, but it misses the data-consistency scenario where a canceled/timed-out task already loaded rows and still needs its offset committed or otherwise compensated.
- Observability: Existing logs are sufficient to see skipped callbacks, but logging does not prevent offset/data divergence.
- Transaction/persistence/data writes: Blocking issue found: target data can be loaded while source offset persistence is skipped, causing reprocessing/duplicates after resume.
- User focus: No additional user-provided review focus.
Please address the inline issue before approval.
What problem does this PR solve?
Issue Number: N/A
Related PR: N/A
Problem Summary:
Streaming insert job (CDC source / JdbcSourceOffsetProvider) can become permanently stuck in PAUSED when a BE-side commit arrives after FE-side task timeout. Symptoms observed in production:
auto resumenever recovers the job; only manualRESUME JOBworks.Root cause
processTimeoutTasksdetects task timeout and callsrunningMultiTask.onFail("task failed cause timeout").AbstractStreamingTask.onFailsets task status to FAILED.StreamingInsertJob.onStreamTaskFailsetsfailureReasonand callsupdateJobStatus(PAUSED), which in turn invokesclearRunningStreamTask→task.cancel(true).AbstractStreamingTask.cancel()short-circuits on terminal status: it returns immediately when status is already FAILED/SUCCESS/CANCELED, soisCanceledis never flipped totrue.StreamingInsertJob.commitOffset. The currentrunningStreamTask != null && instanceof StreamingMultiTblTask + taskId matchchecks all pass, and downstream defenses insuccessCallback/beforeCommittedalso gate ongetIsCanceled().get(), which is stillfalse.successCallbacktherefore overrides task status back to SUCCESS, callsonStreamTaskSuccess→resetFailureInfo(null), clearingfailureReason.StreamingJobSchedulerTask.autoResumeHandlerreturns early wheneverfailureReason == null, so the PAUSED job is never resumed.The bug is essentially:
cancel()is supposed to be the single source of truth that says "this task instance is dead, do not accept further callbacks", but its terminal short-circuit prevents the signal from being broadcast throughisCanceled, leaving every other defense in the streaming task path silently bypassed.Fix
AbstractStreamingTask.cancel(): always flipisCanceledon entry, even when the task is already in a terminal state. This restores the contract that 10+ existinggetIsCanceled().get()checks across the streaming task path rely on (e.g.successCallback,beforeCommitted, internal abort points inStreamingInsertTask/StreamingMultiTblTask).StreamingInsertJob.commitOffset(): add anisCanceledguard right after theinstanceof StreamingMultiTblTaskcheck so the late callback is dropped (logged at INFO) before any side effects (updateNoTxnJobStatisticAndOffset,onTaskCommitted,persistOffsetProviderIfNeed) run.Release note
Fix streaming insert job stuck in PAUSED when a late BE commit callback arrives after FE-side task timeout.
Check List (For Author)
New unit tests in
StreamingInsertJobLateCallbackTest:cancel()flipsisCanceledon a terminal-state task (FAILED / SUCCESS) without overriding the existing status.cancel()transitions a RUNNING task to CANCELED correctly.cancel()is idempotent — the second invocation early-returns and leaves task state untouched.commitOffset()silently skips when the running task is already canceled (status preserved, no successCallback side effects).Regression coverage relies on existing CDC pause/resume suites under
regression-test/suites/job_p0/streaming_job/cdc/to guard the normal happy path. The exact "BE late callback after FE timeout" timing cannot be reliably reproduced in the existing non-nonConcurrentCDC tests without adding debug points tocommitOffset.Behavior changed:
commitOffsetarriving after a streaming task has been canceled is now dropped (logged at INFO) instead of being allowed to mutate task / job state and clearfailureReason. Side effect: on non-unique-key target tables, auto-resume may now produce a small number of duplicate rows from re-running the same input range, in exchange for the job no longer being permanently stuck in PAUSED.Does this need documentation?