From 00e26db5299e453b16924dc9deeda500319a6ca2 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Fri, 8 Nov 2024 14:28:58 +0800 Subject: [PATCH] fix deletion --- .../pipeconsensus/PipeConsensusAsyncConnector.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java index e0afc92af6b56..2ba6856012641 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java @@ -406,7 +406,16 @@ public void transfer(Event event) throws Exception { // Transfer deletion if (event instanceof PipeDeleteDataNodeEvent) { + PipeDeleteDataNodeEvent deleteDataNodeEvent = (PipeDeleteDataNodeEvent) event; + boolean enqueueResult = addEvent2Buffer(deleteDataNodeEvent); + if (!enqueueResult) { + throw new PipeRuntimeConnectorRetryTimesConfigurableException( + ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE); + } retryConnector.transfer(event); + // Since transfer method will throw an exception if transfer failed, removeEventFromBuffer + // will not be executed when transfer failed. + this.removeEventFromBuffer(deleteDataNodeEvent); return; }