Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,18 @@ public void testOPCUASink() throws Exception {
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", Collections.singletonMap("sink", "opc-ua-sink"))
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(Collections.emptyMap())
.setProcessorAttributes(Collections.emptyMap()))
.getCode());

// Test conflict
connectorAttributes.put("password", "conflict");
Assert.assertEquals(
TSStatusCode.PIPE_ERROR.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(Collections.emptyMap())
.setProcessorAttributes(Collections.emptyMap()))
.getCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.io.File;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -137,31 +138,41 @@ public void customize(

nameSpace =
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP
.computeIfAbsent(
.compute(
serverKey,
key -> {
(key, oldValue) -> {
try {
final OpcUaServer newServer =
new OpcUaServerBuilder()
.setTcpBindPort(tcpBindPort)
.setHttpsBindPort(httpsBindPort)
.setUser(user)
.setPassword(password)
.setSecurityDir(securityDir)
.setEnableAnonymousAccess(enableAnonymousAccess)
.build();
nameSpace =
new OpcUaNameSpace(
newServer,
parameters
.getStringOrDefault(
Arrays.asList(
CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
.equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE));
nameSpace.startup();
newServer.startup().get();
return new Pair<>(new AtomicInteger(0), nameSpace);
if (Objects.isNull(oldValue)) {
final OpcUaServerBuilder builder =
new OpcUaServerBuilder()
.setTcpBindPort(tcpBindPort)
.setHttpsBindPort(httpsBindPort)
.setUser(user)
.setPassword(password)
.setSecurityDir(securityDir)
.setEnableAnonymousAccess(enableAnonymousAccess);
final OpcUaServer newServer = builder.build();
nameSpace =
new OpcUaNameSpace(
newServer,
parameters
.getStringOrDefault(
Arrays.asList(
CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
.equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
builder);
nameSpace.startup();
newServer.startup().get();
return new Pair<>(new AtomicInteger(0), nameSpace);
} else {
oldValue
.getRight()
.checkEquals(user, password, securityDir, enableAnonymousAccess);
return oldValue;
}
} catch (final PipeException e) {
throw e;
} catch (final Exception e) {
throw new PipeException("Failed to build and startup OpcUaServer", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;

import java.nio.file.Paths;
import java.sql.Date;
import java.time.LocalDate;
import java.time.ZoneId;
Expand All @@ -63,10 +64,15 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
private final boolean isClientServerModel;
private final SubscriptionModel subscriptionModel;
private final OpcUaServerBuilder builder;

OpcUaNameSpace(final OpcUaServer server, final boolean isClientServerModel) {
OpcUaNameSpace(
final OpcUaServer server,
final boolean isClientServerModel,
final OpcUaServerBuilder builder) {
super(server, NAMESPACE_URI);
this.isClientServerModel = isClientServerModel;
this.builder = builder;

subscriptionModel = new SubscriptionModel(server, this);
getLifecycleManager().addLifecycle(subscriptionModel);
Expand Down Expand Up @@ -370,4 +376,14 @@ public void onDataItemsDeleted(final List<DataItem> dataItems) {
public void onMonitoringModeChanged(final List<MonitoredItem> monitoredItems) {
subscriptionModel.onMonitoringModeChanged(monitoredItems);
}

/////////////////////////////// Conflict detection ///////////////////////////////

void checkEquals(
final String user,
final String password,
final String securityDir,
final boolean enableAnonymousAccess) {
builder.checkEquals(user, password, Paths.get(securityDir), enableAnonymousAccess);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyPair;
import java.security.cert.X509Certificate;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import static com.google.common.collect.Lists.newArrayList;
Expand All @@ -81,7 +83,7 @@ public class OpcUaServerBuilder {
private Path securityDir;
private boolean enableAnonymousAccess;

public OpcUaServerBuilder() {
OpcUaServerBuilder() {
tcpBindPort = PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
httpsBindPort = PipeConnectorConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
user = PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
Expand All @@ -91,37 +93,37 @@ public OpcUaServerBuilder() {
PipeConnectorConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
}

public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
this.tcpBindPort = tcpBindPort;
return this;
}

public OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
this.httpsBindPort = httpsBindPort;
return this;
}

public OpcUaServerBuilder setUser(final String user) {
OpcUaServerBuilder setUser(final String user) {
this.user = user;
return this;
}

public OpcUaServerBuilder setPassword(final String password) {
OpcUaServerBuilder setPassword(final String password) {
this.password = password;
return this;
}

public OpcUaServerBuilder setSecurityDir(final String securityDir) {
OpcUaServerBuilder setSecurityDir(final String securityDir) {
this.securityDir = Paths.get(securityDir);
return this;
}

public OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymousAccess) {
OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymousAccess) {
this.enableAnonymousAccess = enableAnonymousAccess;
return this;
}

public OpcUaServer build() throws Exception {
OpcUaServer build() throws Exception {
Files.createDirectories(securityDir);
if (!Files.exists(securityDir)) {
throw new PipeException("Unable to create security dir: " + securityDir);
Expand Down Expand Up @@ -298,4 +300,29 @@ private EndpointConfiguration buildHttpsEndpoint(
.setBindPort(httpsBindPort)
.build();
}

/////////////////////////////// Conflict detection ///////////////////////////////

void checkEquals(
final String user,
final String password,
final Path securityDir,
final boolean enableAnonymousAccess) {
checkEquals("user", this.user, user);
checkEquals("password", this.password, password);
checkEquals(
"security dir",
FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()),
FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess, enableAnonymousAccess);
}

private void checkEquals(final String attrName, final Object thisAttr, final Object thatAttr) {
if (!Objects.equals(thisAttr, thatAttr)) {
throw new PipeException(
String.format(
"The existing server with tcp port %s and https port %s's %s %s conflicts to the new %s %s, reject reusing.",
tcpBindPort, httpsBindPort, attrName, thisAttr, attrName, thatAttr));
}
}
}