Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions iotdb-core/datanode/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@
<artifactId>openapi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.milo</groupId>
<artifactId>sdk-server</artifactId>
<version>${milo.version}</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_WEBSOCKET_PORT_KEY = "connector.websocket.port";
public static final int CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE = 8080;

public static final String CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY = "connector.opcua.tcp.port";
public static final int CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE = 12686;

public static final String CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY = "connector.opcua.https.port";
public static final int CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE = 8443;

private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.connector.protocol.opcua;

import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.record.Tablet;

import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
import org.eclipse.milo.opcua.sdk.server.model.nodes.objects.BaseEventTypeNode;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.UUID;

import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;

/**
* Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data are converted into
* tablets, then eventNodes to send to the subscriber clients. Notice that there is no namespace
* since the eventNodes do not need to be saved.
*/
public class OpcUaConnector implements PipeConnector {

private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaConnector.class);

private OpcUaServer server;

@Override
public void validate(PipeParameterValidator validator) throws Exception {
// All the parameters are optional
}

@Override
public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
throws Exception {
int tcpBindPort =
parameters.getIntOrDefault(
CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE);
int httpsBindPort =
parameters.getIntOrDefault(
CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY, CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE);

String user =
parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
String password =
parameters.getStringOrDefault(
CONNECTOR_IOTDB_PASSWORD_KEY, CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);

server =
new OpcUaServerBuilder()
.setTcpBindPort(tcpBindPort)
.setHttpsBindPort(httpsBindPort)
.setUser(user)
.setPassword(password)
.build();
server.startup();
}

@Override
public void handshake() throws Exception {
// Server side, do nothing
}

@Override
public void heartbeat() throws Exception {
// Server side, do nothing
}

@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
// PipeProcessor can change the type of TabletInsertionEvent
if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
&& !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
LOGGER.warn(
"IoTDBThriftSyncConnector only support "
+ "PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. "
+ "Ignore {}.",
tabletInsertionEvent);
return;
}

if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
transferTablet(
server, ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).convertToTablet());
} else {
transferTablet(
server, ((PipeRawTabletInsertionEvent) tabletInsertionEvent).convertToTablet());
}
}

/**
* Transfer tablet into eventNodes and post it on the eventBus, so that they will be heard at the
* subscribers. Notice that an eventNode is reused to reduce object creation costs.
*
* @param server OpcUaServer
* @param tablet the tablet to send
* @throws UaException if failed to create event
*/
private void transferTablet(OpcUaServer server, Tablet tablet) throws UaException {
// There is no nameSpace, so that nameSpaceIndex is always 0
int pseudoNameSpaceIndex = 0;
BaseEventTypeNode eventNode =
server
.getEventFactory()
.createEvent(
new NodeId(pseudoNameSpaceIndex, UUID.randomUUID()), Identifiers.BaseEventType);
// Use eventNode here because other nodes doesn't support values and times simultaneously
for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); ++columnIndex) {

TSDataType dataType = tablet.getSchemas().get(columnIndex).getType();

// Source name --> Sensor path, like root.test.d_0.s_0
eventNode.setSourceName(
tablet.deviceId
+ TsFileConstant.PATH_SEPARATOR
+ tablet.getSchemas().get(columnIndex).getMeasurementId());

// Source node --> Sensor type, like double
eventNode.setSourceNode(convertToOpcDataType(dataType));

for (int rowIndex = 0; rowIndex < tablet.rowSize; ++rowIndex) {
// Filter null value
if (tablet.bitMaps[columnIndex].isMarked(rowIndex)) {
continue;
}

// time --> timeStamp
eventNode.setTime(new DateTime(tablet.timestamps[rowIndex]));

// Message --> Value
switch (dataType) {
case BOOLEAN:
eventNode.setMessage(
LocalizedText.english(
Boolean.toString(((boolean[]) tablet.values[columnIndex])[rowIndex])));
break;
case INT32:
eventNode.setMessage(
LocalizedText.english(
Integer.toString(((int[]) tablet.values[columnIndex])[rowIndex])));
break;
case INT64:
eventNode.setMessage(
LocalizedText.english(
Long.toString(((long[]) tablet.values[columnIndex])[rowIndex])));
break;
case FLOAT:
eventNode.setMessage(
LocalizedText.english(
Float.toString(((float[]) tablet.values[columnIndex])[rowIndex])));
break;
case DOUBLE:
eventNode.setMessage(
LocalizedText.english(
Double.toString(((double[]) tablet.values[columnIndex])[rowIndex])));
break;
case TEXT:
eventNode.setMessage(
LocalizedText.english(
((Binary[]) tablet.values[columnIndex])[rowIndex].toString()));
break;
case VECTOR:
case UNKNOWN:
default:
throw new PipeRuntimeNonCriticalException(
"Unsupported data type: " + tablet.getSchemas().get(columnIndex).getType());
}

// Send the event
server.getEventBus().post(eventNode);
}
}
eventNode.delete();
}

private NodeId convertToOpcDataType(TSDataType type) {
switch (type) {
case BOOLEAN:
return Identifiers.Boolean;
case INT32:
return Identifiers.Int32;
case INT64:
return Identifiers.Int64;
case FLOAT:
return Identifiers.Float;
case DOUBLE:
return Identifiers.Double;
case TEXT:
return Identifiers.String;
case VECTOR:
case UNKNOWN:
default:
throw new PipeRuntimeNonCriticalException("Unsupported data type: " + type);
}
}

@Override
public void transfer(Event event) throws Exception {
// Do nothing when receive heartbeat or other events
}

@Override
public void close() throws Exception {
server.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.connector.protocol.opcua;

import com.google.common.collect.Sets;
import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil;
import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateBuilder;
import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.file.Path;
import java.security.Key;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.cert.X509Certificate;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;

class OpcUaKeyStoreLoader {

private static final Pattern IP_ADDR_PATTERN =
Pattern.compile("^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");

private static final String SERVER_ALIAS = "server-ai";

private final Logger logger = LoggerFactory.getLogger(getClass());

private X509Certificate serverCertificate;
private KeyPair serverKeyPair;

OpcUaKeyStoreLoader load(Path baseDir, char[] password) throws Exception {
KeyStore keyStore = KeyStore.getInstance("PKCS12");

File serverKeyStore = baseDir.resolve("iotdb-server.pfx").toFile();

logger.info("Loading KeyStore at {}", serverKeyStore);

if (!serverKeyStore.exists()) {
keyStore.load(null, password);

KeyPair keyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048);

String applicationUri = "urn:apache:iotdb:opc-ua-server:" + UUID.randomUUID();

SelfSignedCertificateBuilder builder =
new SelfSignedCertificateBuilder(keyPair)
.setCommonName("Apache IoTDB OPC UA server")
.setOrganization("Apache")
.setOrganizationalUnit("dev")
.setLocalityName("Beijing")
.setStateName("China")
.setCountryCode("CN")
.setApplicationUri(applicationUri);

// Get as many hostnames and IP addresses as we can listed in the certificate.
Set<String> hostnames =
Sets.union(
Sets.newHashSet(HostnameUtil.getHostname()),
HostnameUtil.getHostnames("0.0.0.0", false));

for (String hostname : hostnames) {
if (IP_ADDR_PATTERN.matcher(hostname).matches()) {
builder.addIpAddress(hostname);
} else {
builder.addDnsName(hostname);
}
}

X509Certificate certificate = builder.build();

keyStore.setKeyEntry(
SERVER_ALIAS, keyPair.getPrivate(), password, new X509Certificate[] {certificate});
keyStore.store(new FileOutputStream(serverKeyStore), password);
} else {
keyStore.load(new FileInputStream(serverKeyStore), password);
}

Key serverPrivateKey = keyStore.getKey(SERVER_ALIAS, password);
if (serverPrivateKey instanceof PrivateKey) {
serverCertificate = (X509Certificate) keyStore.getCertificate(SERVER_ALIAS);

PublicKey serverPublicKey = serverCertificate.getPublicKey();
serverKeyPair = new KeyPair(serverPublicKey, (PrivateKey) serverPrivateKey);
}

return this;
}

X509Certificate getServerCertificate() {
return serverCertificate;
}

KeyPair getServerKeyPair() {
return serverKeyPair;
}
}
Loading