diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java index d45e924620f04..8a1b1de07b67e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java @@ -461,7 +461,7 @@ public void testExtractorTimeRangeMatch() throws Exception { TestUtils.executeNonQueryWithRetry(receiverEnv, "flush"); }; - boolean insertResult = true; + boolean insertResult; try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java index 009d966681261..13842b9b13684 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java @@ -19,66 +19,22 @@ package org.apache.iotdb.db.pipe.connector.protocol.airgap; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant; import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBAirGapConnector; -import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.UnknownHostException; import java.util.HashMap; -import java.util.Set; -import java.util.stream.Collectors; @TreeModel @TableModel public abstract class IoTDBDataNodeAirGapConnector extends IoTDBAirGapConnector { - private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataNodeAirGapConnector.class); - - @Override - public void validate(final PipeParameterValidator validator) throws Exception { - super.validate(validator); - - final PipeConfig pipeConfig = PipeConfig.getInstance(); - final Set givenNodeUrls = parseNodeUrls(validator.getParameters()); - - validator.validate( - empty -> { - try { - // Ensure the sink doesn't point to the air gap receiver on DataNode itself - return !(pipeConfig.getPipeAirGapReceiverEnabled() - && NodeUrlUtils.containsLocalAddress( - givenNodeUrls.stream() - .filter( - tEndPoint -> - tEndPoint.getPort() == pipeConfig.getPipeAirGapReceiverPort()) - .map(TEndPoint::getIp) - .collect(Collectors.toList()))); - } catch (final UnknownHostException e) { - LOGGER.warn("Unknown host when checking pipe sink IP.", e); - return false; - } - }, - String.format( - "One of the endpoints %s of the receivers is pointing back to the air gap receiver %s on sender itself, or unknown host when checking pipe sink IP.", - givenNodeUrls, - new TEndPoint( - IoTDBDescriptor.getInstance().getConfig().getRpcAddress(), - pipeConfig.getPipeAirGapReceiverPort()))); - } - @Override protected boolean mayNeedHandshakeWhenFail() { return false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java index 0327b6651076d..35cbd39149bd3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java @@ -28,7 +28,6 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient; -import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData; @@ -64,14 +63,12 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; @@ -139,24 +136,6 @@ public void validate(final PipeParameterValidator validator) throws Exception { parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY), parameters.hasAttribute(SINK_IOTDB_IP_KEY), parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) - .validate( - empty -> { - try { - // Ensure the sink doesn't point to the legacy receiver on DataNode itself - return !NodeUrlUtils.containsLocalAddress( - givenNodeUrls.stream() - .filter(tEndPoint -> tEndPoint.getPort() == ioTDBConfig.getRpcPort()) - .map(TEndPoint::getIp) - .collect(Collectors.toList())); - } catch (final UnknownHostException e) { - LOGGER.warn("Unknown host when checking pipe sink IP.", e); - return false; - } - }, - String.format( - "One of the endpoints %s of the receivers is pointing back to the legacy receiver %s on sender itself, or unknown host when checking pipe sink IP.", - givenNodeUrls, - new TEndPoint(ioTDBConfig.getRpcAddress(), ioTDBConfig.getRpcPort()))) .validate( args -> !((boolean) args[0]) || ((boolean) args[1] && (boolean) args[2]), String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java index dbd96fd7d1b2c..020c2b127b5af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java @@ -22,58 +22,20 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClientManager; import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBSslSyncConnector; -import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeSyncClientManager; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.UnknownHostException; import java.util.List; import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; @TreeModel @TableModel public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector { - private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataNodeSyncConnector.class); - protected IoTDBDataNodeSyncClientManager clientManager; - @Override - public void validate(final PipeParameterValidator validator) throws Exception { - super.validate(validator); - - final IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig(); - final Set givenNodeUrls = parseNodeUrls(validator.getParameters()); - - validator.validate( - empty -> { - try { - // Ensure the sink doesn't point to the thrift receiver on DataNode itself - return !NodeUrlUtils.containsLocalAddress( - givenNodeUrls.stream() - .filter(tEndPoint -> tEndPoint.getPort() == iotdbConfig.getRpcPort()) - .map(TEndPoint::getIp) - .collect(Collectors.toList())); - } catch (final UnknownHostException e) { - LOGGER.warn("Unknown host when checking pipe sink IP.", e); - return false; - } - }, - String.format( - "One of the endpoints %s of the receivers is pointing back to the thrift receiver %s on sender itself, " - + "or unknown host when checking pipe sink IP.", - givenNodeUrls, new TEndPoint(iotdbConfig.getRpcAddress(), iotdbConfig.getRpcPort()))); - } - @Override protected IoTDBSyncClientManager constructClient( final List nodeUrls,