diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java index e0afc92af6b56..2ba6856012641 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java @@ -406,7 +406,16 @@ public void transfer(Event event) throws Exception { // Transfer deletion if (event instanceof PipeDeleteDataNodeEvent) { + PipeDeleteDataNodeEvent deleteDataNodeEvent = (PipeDeleteDataNodeEvent) event; + boolean enqueueResult = addEvent2Buffer(deleteDataNodeEvent); + if (!enqueueResult) { + throw new PipeRuntimeConnectorRetryTimesConfigurableException( + ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE); + } retryConnector.transfer(event); + // Since transfer method will throw an exception if transfer failed, removeEventFromBuffer + // will not be executed when transfer failed. + this.removeEventFromBuffer(deleteDataNodeEvent); return; }