diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java index 932c8bd6c2981..14567b01f1125 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java @@ -63,7 +63,18 @@ public void testOPCUASink() throws Exception { TSStatusCode.SUCCESS_STATUS.getStatusCode(), client .createPipe( - new TCreatePipeReq("testPipe", Collections.singletonMap("sink", "opc-ua-sink")) + new TCreatePipeReq("testPipe", connectorAttributes) + .setExtractorAttributes(Collections.emptyMap()) + .setProcessorAttributes(Collections.emptyMap())) + .getCode()); + + // Test conflict + connectorAttributes.put("password", "conflict"); + Assert.assertEquals( + TSStatusCode.PIPE_ERROR.getStatusCode(), + client + .createPipe( + new TCreatePipeReq("testPipe", connectorAttributes) .setExtractorAttributes(Collections.emptyMap()) .setProcessorAttributes(Collections.emptyMap())) .getCode()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java index a406eb0d1f4d4..02d2017ba1125 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java @@ -39,6 +39,7 @@ import java.io.File; import java.util.Arrays; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -137,31 +138,41 @@ public void customize( nameSpace = SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP - .computeIfAbsent( + .compute( serverKey, - key -> { + (key, oldValue) -> { try { - final OpcUaServer newServer = - new OpcUaServerBuilder() - .setTcpBindPort(tcpBindPort) - .setHttpsBindPort(httpsBindPort) - .setUser(user) - .setPassword(password) - .setSecurityDir(securityDir) - .setEnableAnonymousAccess(enableAnonymousAccess) - .build(); - nameSpace = - new OpcUaNameSpace( - newServer, - parameters - .getStringOrDefault( - Arrays.asList( - CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY), - CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE) - .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE)); - nameSpace.startup(); - newServer.startup().get(); - return new Pair<>(new AtomicInteger(0), nameSpace); + if (Objects.isNull(oldValue)) { + final OpcUaServerBuilder builder = + new OpcUaServerBuilder() + .setTcpBindPort(tcpBindPort) + .setHttpsBindPort(httpsBindPort) + .setUser(user) + .setPassword(password) + .setSecurityDir(securityDir) + .setEnableAnonymousAccess(enableAnonymousAccess); + final OpcUaServer newServer = builder.build(); + nameSpace = + new OpcUaNameSpace( + newServer, + parameters + .getStringOrDefault( + Arrays.asList( + CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY), + CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE) + .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE), + builder); + nameSpace.startup(); + newServer.startup().get(); + return new Pair<>(new AtomicInteger(0), nameSpace); + } else { + oldValue + .getRight() + .checkEquals(user, password, securityDir, enableAnonymousAccess); + return oldValue; + } + } catch (final PipeException e) { + throw e; } catch (final Exception e) { throw new PipeException("Failed to build and startup OpcUaServer", e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java index db7dcb1bfe32b..38bb9fb1534d9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java @@ -52,6 +52,7 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode; import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; +import java.nio.file.Paths; import java.sql.Date; import java.time.LocalDate; import java.time.ZoneId; @@ -63,10 +64,15 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle { public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server"; private final boolean isClientServerModel; private final SubscriptionModel subscriptionModel; + private final OpcUaServerBuilder builder; - OpcUaNameSpace(final OpcUaServer server, final boolean isClientServerModel) { + OpcUaNameSpace( + final OpcUaServer server, + final boolean isClientServerModel, + final OpcUaServerBuilder builder) { super(server, NAMESPACE_URI); this.isClientServerModel = isClientServerModel; + this.builder = builder; subscriptionModel = new SubscriptionModel(server, this); getLifecycleManager().addLifecycle(subscriptionModel); @@ -370,4 +376,14 @@ public void onDataItemsDeleted(final List dataItems) { public void onMonitoringModeChanged(final List monitoredItems) { subscriptionModel.onMonitoringModeChanged(monitoredItems); } + + /////////////////////////////// Conflict detection /////////////////////////////// + + void checkEquals( + final String user, + final String password, + final String securityDir, + final boolean enableAnonymousAccess) { + builder.checkEquals(user, password, Paths.get(securityDir), enableAnonymousAccess); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java index 144d7c9dc2a56..316a4fb72b7de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java @@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -57,6 +58,7 @@ import java.security.cert.X509Certificate; import java.util.LinkedHashSet; import java.util.List; +import java.util.Objects; import java.util.Set; import static com.google.common.collect.Lists.newArrayList; @@ -81,7 +83,7 @@ public class OpcUaServerBuilder { private Path securityDir; private boolean enableAnonymousAccess; - public OpcUaServerBuilder() { + OpcUaServerBuilder() { tcpBindPort = PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE; httpsBindPort = PipeConnectorConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE; user = PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; @@ -91,37 +93,37 @@ public OpcUaServerBuilder() { PipeConnectorConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE; } - public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) { + OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) { this.tcpBindPort = tcpBindPort; return this; } - public OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) { + OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) { this.httpsBindPort = httpsBindPort; return this; } - public OpcUaServerBuilder setUser(final String user) { + OpcUaServerBuilder setUser(final String user) { this.user = user; return this; } - public OpcUaServerBuilder setPassword(final String password) { + OpcUaServerBuilder setPassword(final String password) { this.password = password; return this; } - public OpcUaServerBuilder setSecurityDir(final String securityDir) { + OpcUaServerBuilder setSecurityDir(final String securityDir) { this.securityDir = Paths.get(securityDir); return this; } - public OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymousAccess) { + OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymousAccess) { this.enableAnonymousAccess = enableAnonymousAccess; return this; } - public OpcUaServer build() throws Exception { + OpcUaServer build() throws Exception { Files.createDirectories(securityDir); if (!Files.exists(securityDir)) { throw new PipeException("Unable to create security dir: " + securityDir); @@ -298,4 +300,29 @@ private EndpointConfiguration buildHttpsEndpoint( .setBindPort(httpsBindPort) .build(); } + + /////////////////////////////// Conflict detection /////////////////////////////// + + void checkEquals( + final String user, + final String password, + final Path securityDir, + final boolean enableAnonymousAccess) { + checkEquals("user", this.user, user); + checkEquals("password", this.password, password); + checkEquals( + "security dir", + FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()), + FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString())); + checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess, enableAnonymousAccess); + } + + private void checkEquals(final String attrName, final Object thisAttr, final Object thatAttr) { + if (!Objects.equals(thisAttr, thatAttr)) { + throw new PipeException( + String.format( + "The existing server with tcp port %s and https port %s's %s %s conflicts to the new %s %s, reject reusing.", + tcpBindPort, httpsBindPort, attrName, thisAttr, attrName, thatAttr)); + } + } }