From 9794cea535ff2165ba9d69c5b53310d32ab61f8a Mon Sep 17 00:00:00 2001
From: wudi
Date: Wed, 15 Apr 2026 14:31:17 +0800
Subject: [PATCH] [fix](streaming-job) Fix NPE in StreamingInsertJob.replayOnCommitted during EditLog replay (#62416)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Problem Summary:

Fix NPE in `StreamingInsertJob.replayOnCommitted()` during FE checkpoint EditLog replay.

When a streaming insert task is canceled while its transaction is being committed, `beforeCommitted()` silently returned without setting `txnCommitAttachment`. The transaction still committed with a null attachment written to EditLog. During checkpoint replay, `replayOnCommitted()` calls `Preconditions.checkNotNull(txnState.getTxnCommitAttachment())`, throwing NPE.

Two issues fixed in `beforeCommitted()`:

1. Throw `TransactionException` when the task is canceled instead of a silent `return`, preventing a commit with a null attachment. Consistent with the `RoutineLoadJob.executeBeforeCheck()` pattern.
2. Fix write lock leak — replace the broken `shouldReleaseLock` flag (always `false`) with a `passCheck` pattern: release the lock on failure in `finally`, keep it for `onStreamTaskSuccess/Fail()` to release on success.

(cherry picked from commit 8548d7b86c8c499a7e885ebcc7175ad3bf78e38b) --- .../insert/streaming/StreamingInsertJob.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 2562685261fc7e..0b397ec66e3365 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -981,13 +981,13 @@ public long getId() { @Override public void beforeCommitted(TransactionState txnState) throws TransactionException { - boolean shouldReleaseLock = false; writeLock(); + boolean passCheck = false; try { if (runningStreamTask.getIsCanceled().get()) { - log.info("streaming insert job {} task {} is canceled, skip beforeCommitted", - getJobId(), runningStreamTask.getTaskId()); - return; + throw new TransactionException("streaming insert job " + getJobId() + + " task " + runningStreamTask.getTaskId() + + " is canceled, txn " + txnState.getTransactionId() + " could not be committed"); } ArrayList taskIds = new ArrayList<>(); @@ -1008,8 +1008,9 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti loadStatistic.getFileNumber(), loadStatistic.getTotalFileSizeB(), runningStreamTask.getRunningOffset().toSerializedJson())); + passCheck = true; } finally { - if (shouldReleaseLock) { + if (!passCheck) { writeUnlock(); } }