diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 2562685261fc7e..0b397ec66e3365 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -981,13 +981,13 @@ public long getId() { @Override public void beforeCommitted(TransactionState txnState) throws TransactionException { - boolean shouldReleaseLock = false; writeLock(); + boolean passCheck = false; try { if (runningStreamTask.getIsCanceled().get()) { - log.info("streaming insert job {} task {} is canceled, skip beforeCommitted", - getJobId(), runningStreamTask.getTaskId()); - return; + throw new TransactionException("streaming insert job " + getJobId() + + " task " + runningStreamTask.getTaskId() + + " is canceled, txn " + txnState.getTransactionId() + " could not be committed"); } ArrayList taskIds = new ArrayList<>(); @@ -1008,8 +1008,9 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti loadStatistic.getFileNumber(), loadStatistic.getTotalFileSizeB(), runningStreamTask.getRunningOffset().toSerializedJson())); + passCheck = true; } finally { - if (shouldReleaseLock) { + if (!passCheck) { writeUnlock(); } }