From 96d321c35da2b2062916a28372f3f688ebca6bfa Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 23 Jul 2025 14:47:57 +0800 Subject: [PATCH 1/2] remove-useless --- .../manual/basic/IoTDBPipeExtractorIT.java | 2 +- .../airgap/IoTDBDataNodeAirGapConnector.java | 44 ------------- .../legacy/IoTDBLegacyPipeConnector.java | 21 ------ .../sync/IoTDBDataNodeSyncConnector.java | 38 ----------- .../iotdb/commons/utils/NodeUrlUtils.java | 64 ------------------- 5 files changed, 1 insertion(+), 168 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java index d45e924620f04..8a1b1de07b67e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java @@ -461,7 +461,7 @@ public void testExtractorTimeRangeMatch() throws Exception { TestUtils.executeNonQueryWithRetry(receiverEnv, "flush"); }; - boolean insertResult = true; + boolean insertResult; try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java index 009d966681261..13842b9b13684 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java @@ -19,66 +19,22 @@ package org.apache.iotdb.db.pipe.connector.protocol.airgap; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant; import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBAirGapConnector; -import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.UnknownHostException; import java.util.HashMap; -import java.util.Set; -import java.util.stream.Collectors; @TreeModel @TableModel public abstract class IoTDBDataNodeAirGapConnector extends IoTDBAirGapConnector { - private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataNodeAirGapConnector.class); - - @Override - public void validate(final PipeParameterValidator validator) throws Exception { - super.validate(validator); - - final PipeConfig pipeConfig = PipeConfig.getInstance(); - final Set givenNodeUrls = parseNodeUrls(validator.getParameters()); - - validator.validate( - empty -> { - try { - // Ensure the sink doesn't point to the air gap receiver on DataNode itself - return !(pipeConfig.getPipeAirGapReceiverEnabled() - && NodeUrlUtils.containsLocalAddress( - givenNodeUrls.stream() - .filter( - tEndPoint -> - tEndPoint.getPort() == pipeConfig.getPipeAirGapReceiverPort()) - .map(TEndPoint::getIp) - .collect(Collectors.toList()))); - } catch (final UnknownHostException e) { - LOGGER.warn("Unknown host when checking pipe sink IP.", e); - return false; - } - }, - String.format( - "One of the endpoints %s of the receivers is pointing back to the air gap receiver %s on sender itself, or unknown host when checking pipe sink IP.", - givenNodeUrls, - new TEndPoint( - IoTDBDescriptor.getInstance().getConfig().getRpcAddress(), - pipeConfig.getPipeAirGapReceiverPort()))); - } - @Override protected boolean mayNeedHandshakeWhenFail() { return false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java index 0327b6651076d..35cbd39149bd3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java @@ -28,7 +28,6 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient; -import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData; @@ -64,14 +63,12 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; @@ -139,24 +136,6 @@ public void validate(final PipeParameterValidator validator) throws Exception { parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY), parameters.hasAttribute(SINK_IOTDB_IP_KEY), parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) - .validate( - empty -> { - try { - // Ensure the sink doesn't point to the legacy receiver on DataNode itself - return !NodeUrlUtils.containsLocalAddress( - givenNodeUrls.stream() - .filter(tEndPoint -> tEndPoint.getPort() == ioTDBConfig.getRpcPort()) - .map(TEndPoint::getIp) - .collect(Collectors.toList())); - } catch (final UnknownHostException e) { - LOGGER.warn("Unknown host when checking pipe sink IP.", e); - return false; - } - }, - String.format( - "One of the endpoints %s of the receivers is pointing back to the legacy receiver %s on sender itself, or unknown host when checking pipe sink IP.", - givenNodeUrls, - new TEndPoint(ioTDBConfig.getRpcAddress(), ioTDBConfig.getRpcPort()))) .validate( args -> !((boolean) args[0]) || ((boolean) args[1] && (boolean) args[2]), String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java index dbd96fd7d1b2c..020c2b127b5af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java @@ -22,58 +22,20 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClientManager; import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBSslSyncConnector; -import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeSyncClientManager; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.UnknownHostException; import java.util.List; import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; @TreeModel @TableModel public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector { - private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataNodeSyncConnector.class); - protected IoTDBDataNodeSyncClientManager clientManager; - @Override - public void validate(final PipeParameterValidator validator) throws Exception { - super.validate(validator); - - final IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig(); - final Set givenNodeUrls = parseNodeUrls(validator.getParameters()); - - validator.validate( - empty -> { - try { - // Ensure the sink doesn't point to the thrift receiver on DataNode itself - return !NodeUrlUtils.containsLocalAddress( - givenNodeUrls.stream() - .filter(tEndPoint -> tEndPoint.getPort() == iotdbConfig.getRpcPort()) - .map(TEndPoint::getIp) - .collect(Collectors.toList())); - } catch (final UnknownHostException e) { - LOGGER.warn("Unknown host when checking pipe sink IP.", e); - return false; - } - }, - String.format( - "One of the endpoints %s of the receivers is pointing back to the thrift receiver %s on sender itself, " - + "or unknown host when checking pipe sink IP.", - givenNodeUrls, new TEndPoint(iotdbConfig.getRpcAddress(), iotdbConfig.getRpcPort()))); - } - @Override protected IoTDBSyncClientManager constructClient( final List nodeUrls, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java index e8190fdf71425..8f3ae49446bd9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java @@ -27,15 +27,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.StringJoiner; -import java.util.stream.Collectors; public class NodeUrlUtils { @@ -185,63 +180,4 @@ public static List parseTConfigNodeUrls(String configNodeUr throws BadNodeUrlException { return parseTConfigNodeUrls(Arrays.asList(configNodeUrls.split(";"))); } - - /** - * Detect whether the given addresses or host names(may contain both) point to the node itself. - * - * @param addressesOrHostNames List of the addresses or host name to check - * @return true if one of the given strings point to the node itself - * @throws UnknownHostException Throw when unable to parse the given addresses or host names - */ - public static boolean containsLocalAddress(List addressesOrHostNames) - throws UnknownHostException { - if (addressesOrHostNames == null) { - return false; - } - - Set selfAddresses = getAllLocalAddresses(); - - for (String addressOrHostName : addressesOrHostNames) { - if (addressOrHostName == null) { - continue; - } - // Unify address or hostName, converting them to addresses - Set translatedAddresses = - Arrays.stream(InetAddress.getAllByName(addressOrHostName)) - .map(InetAddress::getHostAddress) - .collect(Collectors.toCollection(HashSet::new)); - translatedAddresses.retainAll(selfAddresses); - - if (!translatedAddresses.isEmpty()) { - return true; - } - } - - return false; - } - - /** - * Return all the internal, external, IPv4, IPv6 and loopback addresses(without hostname) of the - * node - * - * @return the local addresses set - * @throws UnknownHostException Throw when unable to self addresses or host names (Normally not - * thrown) - */ - public static Set getAllLocalAddresses() throws UnknownHostException { - // Check internal and external, IPv4 and IPv6 network addresses - Set selfAddresses = - Arrays.stream(InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) - .map(InetAddress::getHostAddress) - .collect(Collectors.toCollection(HashSet::new)); - // Check IPv4 and IPv6 loopback addresses 127.0.0.1 and 0.0.0.0.0.0.0.1 - selfAddresses.addAll( - Arrays.stream(InetAddress.getAllByName(LOOPBACK_HOST_NAME)) - .map(InetAddress::getHostAddress) - .collect(Collectors.toCollection(HashSet::new))); - // Check general address 0.0.0.0 - selfAddresses.add(WILD_CARD_ADDRESS); - - return selfAddresses; - } } From 5fe48656d70f574bbf7aa474444b0a56b1a0f30b Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 23 Jul 2025 14:58:03 +0800 Subject: [PATCH 2/2] fix --- .../iotdb/commons/utils/NodeUrlUtils.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java index 8f3ae49446bd9..e8190fdf71425 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java @@ -27,10 +27,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.StringJoiner; +import java.util.stream.Collectors; public class NodeUrlUtils { @@ -180,4 +185,63 @@ public static List parseTConfigNodeUrls(String configNodeUr throws BadNodeUrlException { return parseTConfigNodeUrls(Arrays.asList(configNodeUrls.split(";"))); } + + /** + * Detect whether the given addresses or host names(may contain both) point to the node itself. + * + * @param addressesOrHostNames List of the addresses or host name to check + * @return true if one of the given strings point to the node itself + * @throws UnknownHostException Throw when unable to parse the given addresses or host names + */ + public static boolean containsLocalAddress(List addressesOrHostNames) + throws UnknownHostException { + if (addressesOrHostNames == null) { + return false; + } + + Set selfAddresses = getAllLocalAddresses(); + + for (String addressOrHostName : addressesOrHostNames) { + if (addressOrHostName == null) { + continue; + } + // Unify address or hostName, converting them to addresses + Set translatedAddresses = + Arrays.stream(InetAddress.getAllByName(addressOrHostName)) + .map(InetAddress::getHostAddress) + .collect(Collectors.toCollection(HashSet::new)); + translatedAddresses.retainAll(selfAddresses); + + if (!translatedAddresses.isEmpty()) { + return true; + } + } + + return false; + } + + /** + * Return all the internal, external, IPv4, IPv6 and loopback addresses(without hostname) of the + * node + * + * @return the local addresses set + * @throws UnknownHostException Throw when unable to self addresses or host names (Normally not + * thrown) + */ + public static Set getAllLocalAddresses() throws UnknownHostException { + // Check internal and external, IPv4 and IPv6 network addresses + Set selfAddresses = + Arrays.stream(InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) + .map(InetAddress::getHostAddress) + .collect(Collectors.toCollection(HashSet::new)); + // Check IPv4 and IPv6 loopback addresses 127.0.0.1 and 0.0.0.0.0.0.0.1 + selfAddresses.addAll( + Arrays.stream(InetAddress.getAllByName(LOOPBACK_HOST_NAME)) + .map(InetAddress::getHostAddress) + .collect(Collectors.toCollection(HashSet::new))); + // Check general address 0.0.0.0 + selfAddresses.add(WILD_CARD_ADDRESS); + + return selfAddresses; + } }