diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index d7a325e22f541e..8f0caec4be3cd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -1021,13 +1021,13 @@ public long getId() { @Override public void beforeCommitted(TransactionState txnState) throws TransactionException { - boolean shouldReleaseLock = false; writeLock(); + boolean passCheck = false; try { if (runningStreamTask.getIsCanceled().get()) { - log.info("streaming insert job {} task {} is canceled, skip beforeCommitted", - getJobId(), runningStreamTask.getTaskId()); - return; + throw new TransactionException("streaming insert job " + getJobId() + + " task " + runningStreamTask.getTaskId() + + " is canceled, txn " + txnState.getTransactionId() + " could not be committed"); } ArrayList taskIds = new ArrayList<>(); @@ -1046,7 +1046,6 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti runningStreamTask.getTaskId(), runningStreamTask.getScanBackendIds()); - if (StringUtils.isBlank(offsetJson)) { throw new TransactionException("Cannot find offset for attachment, load job id is " + runningStreamTask.getTaskId()); @@ -1059,8 +1058,9 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti loadStatistic.getFileNumber(), loadStatistic.getTotalFileSizeB(), offsetJson)); + passCheck = true; } finally { - if (shouldReleaseLock) { + if (!passCheck) { writeUnlock(); } }