From 17045b99898fe80e4dede554ec0cc4e137859454 Mon Sep 17 00:00:00 2001 From: wudi Date: Wed, 15 Apr 2026 14:31:17 +0800 Subject: [PATCH] [fix](streaming-job) Fix NPE in StreamingInsertJob.replayOnCommitted during EditLog replay (#62416) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? Problem Summary: Fix NPE in `StreamingInsertJob.replayOnCommitted()` during FE checkpoint EditLog replay. When a streaming insert task is canceled while its transaction is being committed, `beforeCommitted()` silently returned without setting `txnCommitAttachment`. The transaction still committed with a null attachment written to EditLog. During checkpoint replay, `replayOnCommitted()` calls `Preconditions.checkNotNull(txnState.getTxnCommitAttachment())`, throwing NPE. Two issues fixed in `beforeCommitted()`: 1. Throw `TransactionException` when task is canceled instead of silent `return`, preventing commit with null attachment. Consistent with `RoutineLoadJob.executeBeforeCheck()` pattern. 2. Fix write lock leak — replaced broken `shouldReleaseLock` (always `false`) with `passCheck` pattern: release lock on failure in `finally`, keep it for `onStreamTaskSuccess/Fail()` to release on success. --- .../insert/streaming/StreamingInsertJob.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index d7a325e22f541e..8f0caec4be3cd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -1021,13 +1021,13 @@ public long getId() { @Override public void beforeCommitted(TransactionState txnState) throws TransactionException { - boolean shouldReleaseLock = false; writeLock(); + boolean passCheck = false; try { if (runningStreamTask.getIsCanceled().get()) { - log.info("streaming insert job {} task {} is canceled, skip beforeCommitted", - getJobId(), runningStreamTask.getTaskId()); - return; + throw new TransactionException("streaming insert job " + getJobId() + + " task " + runningStreamTask.getTaskId() + + " is canceled, txn " + txnState.getTransactionId() + " could not be committed"); } ArrayList taskIds = new ArrayList<>(); @@ -1046,7 +1046,6 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti runningStreamTask.getTaskId(), runningStreamTask.getScanBackendIds()); - if (StringUtils.isBlank(offsetJson)) { throw new TransactionException("Cannot find offset for attachment, load job id is " + runningStreamTask.getTaskId()); @@ -1059,8 +1058,9 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti loadStatistic.getFileNumber(), loadStatistic.getTotalFileSizeB(), offsetJson)); + passCheck = true; } finally { - if (shouldReleaseLock) { + if (!passCheck) { writeUnlock(); } }