From e51159672e3c490a0728a6c9171f0e3182eaa1e7 Mon Sep 17 00:00:00 2001
From: Caideyipi <1336190078@qq.com>
Date: Mon, 28 Aug 2023 19:48:22 +0800
Subject: [PATCH 01/15] First version
---
iotdb-core/datanode/pom.xml | 5 +
.../constant/PipeConnectorConstant.java | 8 +
.../protocol/opcua/IoTDBKeyStoreLoader.java | 120 ++++++
.../protocol/opcua/IoTDBOpcUaConnector.java | 134 +++++++
.../protocol/opcua/IoTDBOpcUaServerUtils.java | 370 ++++++++++++++++++
.../PipeConnectorSubtaskManager.java | 4 +
.../planner/plan/node/write/InsertNode.java | 1 +
.../plugin/builtin/BuiltinPipePlugin.java | 2 +
.../connector/IoTDBOpcUaConnector.java | 28 ++
9 files changed, 672 insertions(+)
create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBKeyStoreLoader.java
create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaConnector.java
create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java
create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBOpcUaConnector.java
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 3f04bf95a0e4a..dd117640e8b65 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -250,6 +250,11 @@
openapi
${project.version}
+
+ org.eclipse.milo
+ sdk-server
+ 0.6.10
+
commons-cli
commons-cli
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 2f65a39f86cd0..40c2c933507c5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -51,6 +51,14 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY = "connector.version";
public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE = "1.1";
+ public static final String CONNECTOR_IOTDB_OPC_UA_CONNECTOR_TCP_BIND_PORT_KEY =
+ "connector.tcp.bind.port";
+ public static final int CONNECTOR_IOTDB_OPC_UA_CONNECTOR_TCP_BIND_PORT_DEFAULT_VALUE = 12686;
+
+ public static final String CONNECTOR_IOTDB_OPC_UA_CONNECTOR_HTTPS_BIND_PORT_KEY =
+ "connector.https.bind.port";
+ public static final int CONNECTOR_IOTDB_OPC_UA_CONNECTOR_HTTPS_BIND_PORT_DEFAULT_VALUE = 8443;
+
private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBKeyStoreLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBKeyStoreLoader.java
new file mode 100644
index 0000000000000..f5c841c80b2a5
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBKeyStoreLoader.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.opcua;
+
+import com.google.common.collect.Sets;
+import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateBuilder;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.nio.file.Path;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.Set;
+import java.util.UUID;
+import java.util.regex.Pattern;
+
+class IoTDBKeyStoreLoader {
+
+ private static final Pattern IP_ADDR_PATTERN =
+ Pattern.compile("^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
+
+ private static final String SERVER_ALIAS = "server-ai";
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private X509Certificate serverCertificate;
+ private KeyPair serverKeyPair;
+
+ IoTDBKeyStoreLoader load(Path baseDir, char[] password) throws Exception {
+ KeyStore keyStore = KeyStore.getInstance("PKCS12");
+
+ File serverKeyStore = baseDir.resolve("example-server.pfx").toFile();
+
+ logger.info("Loading KeyStore at {}", serverKeyStore);
+
+ if (!serverKeyStore.exists()) {
+ keyStore.load(null, password);
+
+ KeyPair keyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048);
+
+ String applicationUri = "urn:apache:iotdb:opc-ua-server:" + UUID.randomUUID();
+
+ SelfSignedCertificateBuilder builder =
+ new SelfSignedCertificateBuilder(keyPair)
+ .setCommonName("Apache IoTDB OPC UA server")
+ .setOrganization("Apache")
+ .setOrganizationalUnit("dev")
+ .setLocalityName("Beijing")
+ .setStateName("China")
+ .setCountryCode("CN")
+ .setApplicationUri(applicationUri);
+
+ // Get as many hostnames and IP addresses as we can listed in the certificate.
+ Set hostnames =
+ Sets.union(
+ Sets.newHashSet(HostnameUtil.getHostname()),
+ HostnameUtil.getHostnames("0.0.0.0", false));
+
+ for (String hostname : hostnames) {
+ if (IP_ADDR_PATTERN.matcher(hostname).matches()) {
+ builder.addIpAddress(hostname);
+ } else {
+ builder.addDnsName(hostname);
+ }
+ }
+
+ X509Certificate certificate = builder.build();
+
+ keyStore.setKeyEntry(
+ SERVER_ALIAS, keyPair.getPrivate(), password, new X509Certificate[] {certificate});
+ keyStore.store(new FileOutputStream(serverKeyStore), password);
+ } else {
+ keyStore.load(new FileInputStream(serverKeyStore), password);
+ }
+
+ Key serverPrivateKey = keyStore.getKey(SERVER_ALIAS, password);
+ if (serverPrivateKey instanceof PrivateKey) {
+ serverCertificate = (X509Certificate) keyStore.getCertificate(SERVER_ALIAS);
+
+ PublicKey serverPublicKey = serverCertificate.getPublicKey();
+ serverKeyPair = new KeyPair(serverPublicKey, (PrivateKey) serverPrivateKey);
+ }
+
+ return this;
+ }
+
+ X509Certificate getServerCertificate() {
+ return serverCertificate;
+ }
+
+ KeyPair getServerKeyPair() {
+ return serverKeyPair;
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaConnector.java
new file mode 100644
index 0000000000000..5f7336877787f
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaConnector.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.opcua;
+
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_CONNECTOR_HTTPS_BIND_PORT_DEFAULT_VALUE;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_CONNECTOR_HTTPS_BIND_PORT_KEY;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_CONNECTOR_TCP_BIND_PORT_DEFAULT_VALUE;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_CONNECTOR_TCP_BIND_PORT_KEY;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
+
+/**
+ * Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data are converted into
+ * tablets, then eventNodes to send to the subscriber clients. Notice that there is no namespace
+ * since the eventNodes do not need to be saved.
+ */
+public class IoTDBOpcUaConnector implements PipeConnector {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBOpcUaConnector.class);
+
+ private OpcUaServer server;
+
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {
+ // All the parameters are optional
+ }
+
+ @Override
+ public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
+ throws Exception {
+ int tcpBindPort =
+ parameters.getIntOrDefault(
+ CONNECTOR_IOTDB_OPC_UA_CONNECTOR_TCP_BIND_PORT_KEY,
+ CONNECTOR_IOTDB_OPC_UA_CONNECTOR_TCP_BIND_PORT_DEFAULT_VALUE);
+ int httpsBindPort =
+ parameters.getIntOrDefault(
+ CONNECTOR_IOTDB_OPC_UA_CONNECTOR_HTTPS_BIND_PORT_KEY,
+ CONNECTOR_IOTDB_OPC_UA_CONNECTOR_HTTPS_BIND_PORT_DEFAULT_VALUE);
+
+ String user =
+ parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
+ String password =
+ parameters.getStringOrDefault(
+ CONNECTOR_IOTDB_PASSWORD_KEY, CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+
+ server = IoTDBOpcUaServerUtils.getIoTDBOpcUaServer(tcpBindPort, httpsBindPort, user, password);
+ server.startup();
+ }
+
+ @Override
+ public void handshake() throws Exception {
+ // Server side, do nothing
+ }
+
+ @Override
+ public void heartbeat() throws Exception {
+ // Server side, do nothing
+ }
+
+ @Override
+ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
+ // PipeProcessor can change the type of TabletInsertionEvent
+ if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+ && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+ LOGGER.warn(
+ "IoTDBThriftSyncConnector only support "
+ + "PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. "
+ + "Ignore {}.",
+ tabletInsertionEvent);
+ return;
+ }
+
+ if (((EnrichedEvent) tabletInsertionEvent).shouldParsePattern()) {
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ transfer(
+ ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
+ } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
+ transfer(((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
+ }
+ return;
+ }
+
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ IoTDBOpcUaServerUtils.transferTablet(
+ server, ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).convertToTablet());
+ } else {
+ IoTDBOpcUaServerUtils.transferTablet(
+ server, ((PipeRawTabletInsertionEvent) tabletInsertionEvent).convertToTablet());
+ }
+ }
+
+ @Override
+ public void transfer(Event event) throws Exception {
+ // Do nothing when receive heartbeat or other events
+ }
+
+ @Override
+ public void close() throws Exception {
+ server.shutdown();
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java
new file mode 100644
index 0000000000000..12ee9c977bd23
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.opcua;
+
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
+import org.eclipse.milo.opcua.sdk.server.api.config.OpcUaServerConfig;
+import org.eclipse.milo.opcua.sdk.server.identity.CompositeValidator;
+import org.eclipse.milo.opcua.sdk.server.identity.UsernameIdentityValidator;
+import org.eclipse.milo.opcua.sdk.server.identity.X509IdentityValidator;
+import org.eclipse.milo.opcua.sdk.server.model.nodes.objects.BaseEventTypeNode;
+import org.eclipse.milo.opcua.sdk.server.model.nodes.objects.ServerTypeNode;
+import org.eclipse.milo.opcua.sdk.server.nodes.UaNode;
+import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.StatusCodes;
+import org.eclipse.milo.opcua.stack.core.UaException;
+import org.eclipse.milo.opcua.stack.core.UaRuntimeException;
+import org.eclipse.milo.opcua.stack.core.security.DefaultCertificateManager;
+import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.transport.TransportProfile;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ByteString;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
+import org.eclipse.milo.opcua.stack.core.types.structured.BuildInfo;
+import org.eclipse.milo.opcua.stack.core.util.CertificateUtil;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedHttpsCertificateBuilder;
+import org.eclipse.milo.opcua.stack.server.EndpointConfiguration;
+import org.eclipse.milo.opcua.stack.server.security.DefaultServerCertificateValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.*;
+import java.security.cert.X509Certificate;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT32;
+import static org.eclipse.milo.opcua.sdk.server.api.config.OpcUaServerConfig.USER_TOKEN_POLICY_ANONYMOUS;
+import static org.eclipse.milo.opcua.sdk.server.api.config.OpcUaServerConfig.USER_TOKEN_POLICY_USERNAME;
+import static org.eclipse.milo.opcua.sdk.server.api.config.OpcUaServerConfig.USER_TOKEN_POLICY_X509;
+import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.ubyte;
+
+/**
+ * OPC UA Server builder for IoTDB to send data. The coding style referenced ExampleServer.java in
+ * Eclipse Milo.
+ */
+public class IoTDBOpcUaServerUtils {
+
+ private static final String WILD_CARD_ADDRESS = "0.0.0.0";
+ private static int eventId = 0;
+
+ public static OpcUaServer getIoTDBOpcUaServer(
+ int tcpBindPort, int httpsBindPort, String user, String password) throws Exception {
+ final Logger logger = LoggerFactory.getLogger(IoTDBOpcUaServerUtils.class);
+
+ Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "iotdb", "security");
+ Files.createDirectories(securityTempDir);
+ if (!Files.exists(securityTempDir)) {
+ throw new PipeException("Unable to create security temp dir: " + securityTempDir);
+ }
+
+ File pkiDir = securityTempDir.resolve("pki").toFile();
+
+ LoggerFactory.getLogger(IoTDBOpcUaServerUtils.class)
+ .info("Security dir: {}", securityTempDir.toAbsolutePath());
+ LoggerFactory.getLogger(IoTDBOpcUaServerUtils.class)
+ .info("Security pki dir: {}", pkiDir.getAbsolutePath());
+
+ IoTDBKeyStoreLoader loader =
+ new IoTDBKeyStoreLoader().load(securityTempDir, password.toCharArray());
+
+ DefaultCertificateManager certificateManager =
+ new DefaultCertificateManager(loader.getServerKeyPair(), loader.getServerCertificate());
+
+ DefaultTrustListManager trustListManager = new DefaultTrustListManager(pkiDir);
+ logger.info(
+ "Certificate directory is: {}, Please move certificates from the reject dir to the trusted directory to allow encrypted access",
+ pkiDir.getAbsolutePath());
+
+ KeyPair httpsKeyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048);
+
+ SelfSignedHttpsCertificateBuilder httpsCertificateBuilder =
+ new SelfSignedHttpsCertificateBuilder(httpsKeyPair);
+ httpsCertificateBuilder.setCommonName(HostnameUtil.getHostname());
+ HostnameUtil.getHostnames(WILD_CARD_ADDRESS).forEach(httpsCertificateBuilder::addDnsName);
+ X509Certificate httpsCertificate = httpsCertificateBuilder.build();
+
+ DefaultServerCertificateValidator certificateValidator =
+ new DefaultServerCertificateValidator(trustListManager);
+
+ UsernameIdentityValidator identityValidator =
+ new UsernameIdentityValidator(
+ true,
+ authChallenge -> {
+ String inputUsername = authChallenge.getUsername();
+ String inputPassword = authChallenge.getPassword();
+
+ return inputUsername.equals(user) && inputPassword.equals(password);
+ });
+
+ X509IdentityValidator x509IdentityValidator = new X509IdentityValidator(c -> true);
+
+ X509Certificate certificate =
+ certificateManager.getCertificates().stream()
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new UaRuntimeException(
+ StatusCodes.Bad_ConfigurationError, "No certificate found"));
+
+ String applicationUri =
+ CertificateUtil.getSanUri(certificate)
+ .orElseThrow(
+ () ->
+ new UaRuntimeException(
+ StatusCodes.Bad_ConfigurationError,
+ "Certificate is missing the application URI"));
+
+ Set endpointConfigurations =
+ createEndpointConfigurations(certificate, tcpBindPort, httpsBindPort);
+
+ OpcUaServerConfig serverConfig =
+ OpcUaServerConfig.builder()
+ .setApplicationUri(applicationUri)
+ .setApplicationName(LocalizedText.english("Apache IoTDB OPC UA server"))
+ .setEndpoints(endpointConfigurations)
+ .setBuildInfo(
+ new BuildInfo(
+ "urn:apache:iotdb:opc-ua-server",
+ "apache",
+ "Apache IoTDB OPC UA server",
+ OpcUaServer.SDK_VERSION,
+ "",
+ DateTime.now()))
+ .setCertificateManager(certificateManager)
+ .setTrustListManager(trustListManager)
+ .setCertificateValidator(certificateValidator)
+ .setHttpsKeyPair(httpsKeyPair)
+ .setHttpsCertificateChain(new X509Certificate[] {httpsCertificate})
+ .setIdentityValidator(new CompositeValidator(identityValidator, x509IdentityValidator))
+ .setProductUri("urn:apache:iotdb:opc-ua-server")
+ .build();
+
+ // Setup server to enable event posting
+ OpcUaServer server = new OpcUaServer(serverConfig);
+ UaNode serverNode =
+ server.getAddressSpaceManager().getManagedNode(Identifiers.Server).orElse(null);
+ if (serverNode instanceof ServerTypeNode) {
+ ((ServerTypeNode) serverNode).setEventNotifier(ubyte(1));
+ }
+ return server;
+ }
+
+ private static Set createEndpointConfigurations(
+ X509Certificate certificate, int tcpBindPort, int httpsBindPort) {
+ Set endpointConfigurations = new LinkedHashSet<>();
+
+ List bindAddresses = newArrayList();
+ bindAddresses.add(WILD_CARD_ADDRESS);
+
+ Set hostnames = new LinkedHashSet<>();
+ hostnames.add(HostnameUtil.getHostname());
+ hostnames.addAll(HostnameUtil.getHostnames(WILD_CARD_ADDRESS));
+
+ for (String bindAddress : bindAddresses) {
+ for (String hostname : hostnames) {
+ EndpointConfiguration.Builder builder =
+ EndpointConfiguration.newBuilder()
+ .setBindAddress(bindAddress)
+ .setHostname(hostname)
+ .setPath("/iotdb")
+ .setCertificate(certificate)
+ .addTokenPolicies(
+ USER_TOKEN_POLICY_ANONYMOUS,
+ USER_TOKEN_POLICY_USERNAME,
+ USER_TOKEN_POLICY_X509);
+
+ EndpointConfiguration.Builder noSecurityBuilder =
+ builder
+ .copy()
+ .setSecurityPolicy(SecurityPolicy.None)
+ .setSecurityMode(MessageSecurityMode.None);
+
+ endpointConfigurations.add(buildTcpEndpoint(noSecurityBuilder, tcpBindPort));
+ endpointConfigurations.add(buildHttpsEndpoint(noSecurityBuilder, httpsBindPort));
+
+ endpointConfigurations.add(
+ buildTcpEndpoint(
+ builder
+ .copy()
+ .setSecurityPolicy(SecurityPolicy.Basic256Sha256)
+ .setSecurityMode(MessageSecurityMode.SignAndEncrypt),
+ tcpBindPort));
+
+ endpointConfigurations.add(
+ buildHttpsEndpoint(
+ builder
+ .copy()
+ .setSecurityPolicy(SecurityPolicy.Basic256Sha256)
+ .setSecurityMode(MessageSecurityMode.Sign),
+ httpsBindPort));
+
+ EndpointConfiguration.Builder discoveryBuilder =
+ builder
+ .copy()
+ .setPath("/iotdb/discovery")
+ .setSecurityPolicy(SecurityPolicy.None)
+ .setSecurityMode(MessageSecurityMode.None);
+
+ endpointConfigurations.add(buildTcpEndpoint(discoveryBuilder, tcpBindPort));
+ endpointConfigurations.add(buildHttpsEndpoint(discoveryBuilder, httpsBindPort));
+ }
+ }
+
+ return endpointConfigurations;
+ }
+
+ private static EndpointConfiguration buildTcpEndpoint(
+ EndpointConfiguration.Builder base, int tcpBindPort) {
+ return base.copy()
+ .setTransportProfile(TransportProfile.TCP_UASC_UABINARY)
+ .setBindPort(tcpBindPort)
+ .build();
+ }
+
+ private static EndpointConfiguration buildHttpsEndpoint(
+ EndpointConfiguration.Builder base, int httpsBindPort) {
+ return base.copy()
+ .setTransportProfile(TransportProfile.HTTPS_UABINARY)
+ .setBindPort(httpsBindPort)
+ .build();
+ }
+
+ /**
+ * Transfer tablet into eventNodes and post it on the eventBus, so that they will be heard at the
+ * subscribers. Notice that an eventNode is reused to reduce object creation costs.
+ *
+ * @param server OpcUaServer
+ * @param tablet the tablet to send
+ * @throws UaException if failed to create event
+ */
+ public static void transferTablet(OpcUaServer server, Tablet tablet) throws UaException {
+ // There is no nameSpace, so that nameSpaceIndex is always 0
+ int pseudoNameSpaceIndex = 0;
+ BaseEventTypeNode eventNode =
+ server
+ .getEventFactory()
+ .createEvent(new NodeId(pseudoNameSpaceIndex, eventId++), Identifiers.BaseEventType);
+
+ // Use eventNode here because other nodes doesn't support values and times simultaneously
+ for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); ++columnIndex) {
+
+ TSDataType dataType = tablet.getSchemas().get(columnIndex).getType();
+
+ // Source name --> Sensor path, like root.test.d_0.s_0
+ eventNode.setSourceName(
+ tablet.deviceId + tablet.getSchemas().get(columnIndex).getMeasurementId());
+
+ // Source node --> Sensor type, like double
+ eventNode.setSourceNode(convertToOpcDataType(dataType));
+
+ // time --> timeStamp
+ eventNode.setTime(new DateTime(tablet.timestamps[columnIndex]));
+
+ for (int rowIndex = 0; rowIndex < tablet.rowSize; ++rowIndex) {
+ // Filter null value
+ if (tablet.bitMaps[columnIndex].isMarked(rowIndex)) {
+ continue;
+ }
+
+ // Message --> Value
+ switch (dataType) {
+ case INT32:
+ eventNode.setMessage(
+ LocalizedText.english(
+ Integer.toString(((int[]) tablet.values[columnIndex])[rowIndex])));
+ break;
+ case INT64:
+ eventNode.setMessage(
+ LocalizedText.english(
+ Long.toString(((long[]) tablet.values[columnIndex])[rowIndex])));
+ break;
+ case FLOAT:
+ eventNode.setMessage(
+ LocalizedText.english(
+ Float.toString(((float[]) tablet.values[columnIndex])[rowIndex])));
+ break;
+ case DOUBLE:
+ eventNode.setMessage(
+ LocalizedText.english(
+ Double.toString(((double[]) tablet.values[columnIndex])[rowIndex])));
+ break;
+ case TEXT:
+ eventNode.setMessage(
+ LocalizedText.english(
+ ((Binary[]) tablet.values[columnIndex])[rowIndex].toString()));
+ break;
+ case VECTOR:
+ case UNKNOWN:
+ default:
+ throw new PipeRuntimeNonCriticalException(
+ "Unsupported data type: " + tablet.getSchemas().get(columnIndex).getType());
+ }
+
+ // Reset the eventId each time
+ eventNode.setEventId(ByteString.of(Integer.toString(eventId++).getBytes()));
+
+ // Send the event
+ server.getEventBus().post(eventNode);
+ }
+ }
+ eventNode.delete();
+ }
+
+ private static NodeId convertToOpcDataType(TSDataType type) {
+ switch (type) {
+ case INT32:
+ return Identifiers.Int32;
+ case INT64:
+ return Identifiers.Int64;
+ case FLOAT:
+ return Identifiers.Float;
+ case DOUBLE:
+ return Identifiers.Double;
+ case TEXT:
+ return Identifiers.String;
+ case VECTOR:
+ case UNKNOWN:
+ default:
+ throw new PipeRuntimeNonCriticalException("Unsupported data type: " + type);
+ }
+ }
+
+ private IoTDBOpcUaServerUtils() {
+ // Utility Class
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index fb06743874ffb..bce5ac654b940 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -26,6 +26,7 @@
import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBAirGapConnector;
import org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector;
+import org.apache.iotdb.db.pipe.connector.protocol.opcua.IoTDBOpcUaConnector;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
@@ -78,6 +79,9 @@ public synchronized String register(
} else if (connectorKey.equals(
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) {
pipeConnector = new IoTDBAirGapConnector();
+ } else if (connectorKey.equals(
+ BuiltinPipePlugin.IOTDB_OPC_UA_CONNECTOR.getPipePluginName())) {
+ pipeConnector = new IoTDBOpcUaConnector();
} else {
pipeConnector = PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index abca512e1e8d1..61edb88c0a593 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index ef973b1cc306d..659b0cb4a49c8 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -22,6 +22,7 @@
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBAirGapConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBLegacyPipeConnector;
+import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBOpcUaConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftAsyncConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSyncConnector;
@@ -43,6 +44,7 @@ public enum BuiltinPipePlugin {
IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", IoTDBThriftAsyncConnector.class),
IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", IoTDBLegacyPipeConnector.class),
IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapConnector.class),
+ IOTDB_OPC_UA_CONNECTOR("iotdb-opc-ua-connector", IoTDBOpcUaConnector.class);
;
private final String pipePluginName;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBOpcUaConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBOpcUaConnector.java
new file mode 100644
index 0000000000000..e5636a23a4c0c
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBOpcUaConnector.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents the IoTDB OPC UA
+ * connector. There is a real implementation in the server module but cannot be imported here. The
+ * pipe agent in the server module will replace this class with the real implementation when
+ * initializing the IoTDB OPC UA connector.
+ */
+public class IoTDBOpcUaConnector extends PlaceholderConnector {}
From 3d14a9b6961e7a6d95fe8815f82939e78b56e444 Mon Sep 17 00:00:00 2001
From: Caideyipi <1336190078@qq.com>
Date: Mon, 28 Aug 2023 20:31:02 +0800
Subject: [PATCH 02/15] Debug
---
.../pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java
index 12ee9c977bd23..62f918048bc16 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java
@@ -56,18 +56,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.*;
+import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.security.*;
+import java.security.KeyPair;
import java.security.cert.X509Certificate;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import static com.google.common.collect.Lists.newArrayList;
-import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT32;
import static org.eclipse.milo.opcua.sdk.server.api.config.OpcUaServerConfig.USER_TOKEN_POLICY_ANONYMOUS;
import static org.eclipse.milo.opcua.sdk.server.api.config.OpcUaServerConfig.USER_TOKEN_POLICY_USERNAME;
import static org.eclipse.milo.opcua.sdk.server.api.config.OpcUaServerConfig.USER_TOKEN_POLICY_X509;
From 7886a7259a88b514f80f89e16d64db345e23f6f8 Mon Sep 17 00:00:00 2001
From: Caideyipi <1336190078@qq.com>
Date: Wed, 30 Aug 2023 10:08:21 +0800
Subject: [PATCH 03/15] Bug fix
---
.../protocol/opcua/IoTDBKeyStoreLoader.java | 2 +-
.../protocol/opcua/IoTDBOpcUaServerUtils.java | 12 ++++++------
.../protocol/websocket/WebsocketConnector.java | 12 +++++++++---
3 files changed, 16 insertions(+), 10 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBKeyStoreLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBKeyStoreLoader.java
index f5c841c80b2a5..a1d0b949285d3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBKeyStoreLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBKeyStoreLoader.java
@@ -55,7 +55,7 @@ class IoTDBKeyStoreLoader {
IoTDBKeyStoreLoader load(Path baseDir, char[] password) throws Exception {
KeyStore keyStore = KeyStore.getInstance("PKCS12");
- File serverKeyStore = baseDir.resolve("example-server.pfx").toFile();
+ File serverKeyStore = baseDir.resolve("iotdb-server.pfx").toFile();
logger.info("Loading KeyStore at {}", serverKeyStore);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java
index 62f918048bc16..2a8fb432cbee0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java
@@ -65,6 +65,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import static com.google.common.collect.Lists.newArrayList;
import static org.eclipse.milo.opcua.sdk.server.api.config.OpcUaServerConfig.USER_TOKEN_POLICY_ANONYMOUS;
@@ -277,8 +278,7 @@ public static void transferTablet(OpcUaServer server, Tablet tablet) throws UaEx
BaseEventTypeNode eventNode =
server
.getEventFactory()
- .createEvent(new NodeId(pseudoNameSpaceIndex, eventId++), Identifiers.BaseEventType);
-
+ .createEvent(new NodeId(pseudoNameSpaceIndex, UUID.randomUUID()), Identifiers.BaseEventType);
// Use eventNode here because other nodes doesn't support values and times simultaneously
for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); ++columnIndex) {
@@ -286,20 +286,20 @@ public static void transferTablet(OpcUaServer server, Tablet tablet) throws UaEx
// Source name --> Sensor path, like root.test.d_0.s_0
eventNode.setSourceName(
- tablet.deviceId + tablet.getSchemas().get(columnIndex).getMeasurementId());
+ tablet.deviceId + "." + tablet.getSchemas().get(columnIndex).getMeasurementId());
// Source node --> Sensor type, like double
eventNode.setSourceNode(convertToOpcDataType(dataType));
- // time --> timeStamp
- eventNode.setTime(new DateTime(tablet.timestamps[columnIndex]));
-
for (int rowIndex = 0; rowIndex < tablet.rowSize; ++rowIndex) {
// Filter null value
if (tablet.bitMaps[columnIndex].isMarked(rowIndex)) {
continue;
}
+ // time --> timeStamp
+ eventNode.setTime(new DateTime(tablet.timestamps[rowIndex]));
+
// Message --> Value
switch (dataType) {
case INT32:
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java
index f9a01b3fc20e7..f9a577b7587c6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java
@@ -54,7 +54,9 @@ public class WebsocketConnector implements PipeConnector {
new PriorityQueue<>(Comparator.comparing(o -> o.left));
@Override
- public void validate(PipeParameterValidator validator) throws Exception {}
+ public void validate(PipeParameterValidator validator) throws Exception {
+ // The port is optional
+ }
@Override
public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
@@ -74,7 +76,9 @@ public void handshake() throws Exception {
}
@Override
- public void heartbeat() throws Exception {}
+ public void heartbeat() throws Exception {
+ // Server side, do nothing
+ }
@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) {
@@ -107,7 +111,9 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception
}
@Override
- public void transfer(Event event) throws Exception {}
+ public void transfer(Event event) throws Exception {
+ // Do nothing when receive heartbeat or other events
+ }
@Override
public void close() throws Exception {
From 52a6a28118e3ea0839c8cae31e216d8a6b644453 Mon Sep 17 00:00:00 2001
From: Caideyipi <1336190078@qq.com>
Date: Wed, 30 Aug 2023 12:40:56 +0800
Subject: [PATCH 04/15] Refactor
---
.../dml/insertion/TabletInsertionEvent.java | 22 +++++++++
.../constant/PipeConnectorConstant.java | 14 +++---
.../protocol/airgap/IoTDBAirGapConnector.java | 7 +--
.../legacy/IoTDBLegacyPipeConnector.java | 34 ++++----------
.../protocol/opcua/IoTDBOpcUaConnector.java | 45 ++++++++++---------
.../async/IoTDBThriftAsyncConnector.java | 7 +--
.../thrift/sync/IoTDBThriftSyncConnector.java | 7 +--
...Connector.java => WebSocketConnector.java} | 14 +++---
.../websocket/WebSocketConnectorServer.java | 5 ++-
.../PipeInsertNodeTabletInsertionEvent.java | 15 +++----
.../tablet/PipeRawTabletInsertionEvent.java | 17 +++----
.../PipeConnectorSubtaskManager.java | 4 +-
12 files changed, 93 insertions(+), 98 deletions(-)
rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/{WebsocketConnector.java => WebSocketConnector.java} (92%)
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
index 56244574214ca..24a6992131671 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
@@ -44,4 +44,26 @@ public interface TabletInsertionEvent extends Event {
* results collected by the RowCollector
*/
Iterable processTablet(BiConsumer consumer);
+
+ /**
+ * Convert the data contained to tablet.
+ *
+ * @return the result tablet
+ */
+ Tablet convertToTablet();
+
+ /**
+ * Parse the event to resolve its pattern. The parsing process is done in
+ * PipeRawTabletInsertionEvent.
+ *
+ * @return the converted PipeRawTabletInsertionEvent
+ */
+ TabletInsertionEvent parseEventWithPattern();
+
+ /**
+ * Return whether the tablet is aligned.
+ *
+ * @return true if the tablet is aligned
+ */
+ boolean isAligned();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 2769da6d24d41..da3c9d8cb82f0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -54,13 +54,15 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_WEBSOCKET_PORT_KEY = "connector.websocket.port";
public static final int CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE = 8080;
- public static final String CONNECTOR_IOTDB_OPC_UA_CONNECTOR_TCP_BIND_PORT_KEY =
- "connector.tcp.bind.port";
- public static final int CONNECTOR_IOTDB_OPC_UA_CONNECTOR_TCP_BIND_PORT_DEFAULT_VALUE = 12686;
+ public static final String CONNECTOR_IOTDB_OPC_UA_TCP_BIND_PORT_KEY = "connector.tcp.port";
+ public static final int CONNECTOR_IOTDB_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE = 12686;
- public static final String CONNECTOR_IOTDB_OPC_UA_CONNECTOR_HTTPS_BIND_PORT_KEY =
- "connector.https.bind.port";
- public static final int CONNECTOR_IOTDB_OPC_UA_CONNECTOR_HTTPS_BIND_PORT_DEFAULT_VALUE = 8443;
+ public static final String CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_KEY = "connector.https.port";
+ public static final int CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE = 8443;
+
+ public static final String CONNECTOR_IOTDB_OPC_UA_ENABLE_CACHE_DATA_KEY =
+ "connector.cache.enable";
+ public static final boolean CONNECTOR_IOTDB_OPC_UA_ENABLE_CACHE_DATA_DEFAULT_VALUE = true;
private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index 854205588f6db..6e4b8d21a6742 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -187,12 +187,7 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
}
if (((EnrichedEvent) tabletInsertionEvent).shouldParsePattern()) {
- if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- transfer(
- ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
- } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
- transfer(((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
- }
+ transfer((tabletInsertionEvent).parseEventWithPattern());
return;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 2fa4276289781..76e0fd8cde278 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -41,8 +41,6 @@
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
@@ -167,10 +165,14 @@ public void heartbeat() throws Exception {
@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
- if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
- } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
- doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent
+ || tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
+ final Tablet tablet = tabletInsertionEvent.convertToTablet();
+ if (tabletInsertionEvent.isAligned()) {
+ sessionPool.insertAlignedTablet(tablet);
+ } else {
+ sessionPool.insertTablet(tablet);
+ }
} else {
throw new NotImplementedException(
"IoTDBLegacyPipeConnector only support "
@@ -202,26 +204,6 @@ public void transfer(Event event) throws Exception {
}
}
- private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeInsertionEvent)
- throws IoTDBConnectionException, StatementExecutionException {
- final Tablet tablet = pipeInsertNodeInsertionEvent.convertToTablet();
- if (pipeInsertNodeInsertionEvent.isAligned()) {
- sessionPool.insertAlignedTablet(tablet);
- } else {
- sessionPool.insertTablet(tablet);
- }
- }
-
- private void doTransfer(PipeRawTabletInsertionEvent pipeTabletInsertionEvent)
- throws PipeException, IoTDBConnectionException, StatementExecutionException {
- final Tablet tablet = pipeTabletInsertionEvent.convertToTablet();
- if (pipeTabletInsertionEvent.isAligned()) {
- sessionPool.insertAlignedTablet(tablet);
- } else {
- sessionPool.insertTablet(tablet);
- }
- }
-
private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
throws PipeException, TException, InterruptedException, IOException {
pipeTsFileInsertionEvent.waitForTsFileClose();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaConnector.java
index 5f7336877787f..45af6446c48fc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaConnector.java
@@ -28,15 +28,21 @@
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_CONNECTOR_HTTPS_BIND_PORT_DEFAULT_VALUE;
-import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_CONNECTOR_HTTPS_BIND_PORT_KEY;
-import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_CONNECTOR_TCP_BIND_PORT_DEFAULT_VALUE;
-import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_CONNECTOR_TCP_BIND_PORT_KEY;
+import java.util.Comparator;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_ENABLE_CACHE_DATA_DEFAULT_VALUE;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_ENABLE_CACHE_DATA_KEY;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_KEY;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_TCP_BIND_PORT_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
@@ -52,6 +58,9 @@ public class IoTDBOpcUaConnector implements PipeConnector {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBOpcUaConnector.class);
private OpcUaServer server;
+ private boolean enableCacheData;
+ private final PriorityBlockingQueue> events =
+ new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left));
@Override
public void validate(PipeParameterValidator validator) throws Exception {
@@ -63,12 +72,12 @@ public void customize(PipeParameters parameters, PipeConnectorRuntimeConfigurati
throws Exception {
int tcpBindPort =
parameters.getIntOrDefault(
- CONNECTOR_IOTDB_OPC_UA_CONNECTOR_TCP_BIND_PORT_KEY,
- CONNECTOR_IOTDB_OPC_UA_CONNECTOR_TCP_BIND_PORT_DEFAULT_VALUE);
+ CONNECTOR_IOTDB_OPC_UA_TCP_BIND_PORT_KEY,
+ CONNECTOR_IOTDB_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE);
int httpsBindPort =
parameters.getIntOrDefault(
- CONNECTOR_IOTDB_OPC_UA_CONNECTOR_HTTPS_BIND_PORT_KEY,
- CONNECTOR_IOTDB_OPC_UA_CONNECTOR_HTTPS_BIND_PORT_DEFAULT_VALUE);
+ CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_KEY,
+ CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE);
String user =
parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
@@ -78,6 +87,11 @@ public void customize(PipeParameters parameters, PipeConnectorRuntimeConfigurati
server = IoTDBOpcUaServerUtils.getIoTDBOpcUaServer(tcpBindPort, httpsBindPort, user, password);
server.startup();
+
+ enableCacheData =
+ parameters.getBooleanOrDefault(
+ CONNECTOR_IOTDB_OPC_UA_ENABLE_CACHE_DATA_KEY,
+ CONNECTOR_IOTDB_OPC_UA_ENABLE_CACHE_DATA_DEFAULT_VALUE);
}
@Override
@@ -104,22 +118,11 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
}
if (((EnrichedEvent) tabletInsertionEvent).shouldParsePattern()) {
- if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- transfer(
- ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
- } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
- transfer(((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
- }
+ transfer(tabletInsertionEvent.parseEventWithPattern());
return;
}
- if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- IoTDBOpcUaServerUtils.transferTablet(
- server, ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).convertToTablet());
- } else {
- IoTDBOpcUaServerUtils.transferTablet(
- server, ((PipeRawTabletInsertionEvent) tabletInsertionEvent).convertToTablet());
- }
+ IoTDBOpcUaServerUtils.transferTablet(server, tabletInsertionEvent.convertToTablet());
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index d983da389bda5..d39b170bd2740 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -151,12 +151,7 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
}
if (((EnrichedEvent) tabletInsertionEvent).shouldParsePattern()) {
- if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- transfer(
- ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
- } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
- transfer(((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
- }
+ transfer((tabletInsertionEvent).parseEventWithPattern());
return;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index f75bc7c612a16..12842fd73d775 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -186,12 +186,7 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
}
if (((EnrichedEvent) tabletInsertionEvent).shouldParsePattern()) {
- if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- transfer(
- ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
- } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
- transfer(((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
- }
+ transfer((tabletInsertionEvent).parseEventWithPattern());
return;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
similarity index 92%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index f9a577b7587c6..4680c21ad40f2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -43,8 +43,8 @@
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;
-public class WebsocketConnector implements PipeConnector {
- private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketConnector.class);
+public class WebSocketConnector implements PipeConnector {
+ private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketConnector.class);
private WebSocketConnectorServer server;
private int port;
@@ -85,14 +85,14 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) {
if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
&& !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
LOGGER.warn(
- "WebsocketConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. "
+ "WebSocketConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. "
+ "Current event: {}.",
tabletInsertionEvent);
return;
}
long commitId = commitIdGenerator.incrementAndGet();
((EnrichedEvent) tabletInsertionEvent)
- .increaseReferenceCount(WebsocketConnector.class.getName());
+ .increaseReferenceCount(WebSocketConnector.class.getName());
server.addEvent(new Pair<>(commitId, tabletInsertionEvent));
}
@@ -100,13 +100,13 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) {
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
LOGGER.warn(
- "WebsocketConnector only support PipeTsFileInsertionEvent. Current event: {}.",
+ "WebSocketConnector only support PipeTsFileInsertionEvent. Current event: {}.",
tsFileInsertionEvent);
return;
}
long commitId = commitIdGenerator.incrementAndGet();
((EnrichedEvent) tsFileInsertionEvent)
- .increaseReferenceCount(WebsocketConnector.class.getName());
+ .increaseReferenceCount(WebSocketConnector.class.getName());
server.addEvent(new Pair<>(commitId, tsFileInsertionEvent));
}
@@ -128,7 +128,7 @@ public synchronized void commit(long requestCommitId, @Nullable EnrichedEvent en
Optional.ofNullable(enrichedEvent)
.ifPresent(
event ->
- event.decreaseReferenceCount(WebsocketConnector.class.getName()))));
+ event.decreaseReferenceCount(WebSocketConnector.class.getName()))));
while (!commitQueue.isEmpty()) {
final Pair committer = commitQueue.peek();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
index c7993045e368d..aab84e429764a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.pipe.connector.protocol.websocket;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
@@ -46,12 +47,12 @@ public class WebSocketConnectorServer extends WebSocketServer {
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketConnectorServer.class);
private final PriorityBlockingQueue> events =
new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left));
- private final WebsocketConnector websocketConnector;
+ private final WebSocketConnector websocketConnector;
private final ConcurrentMap eventMap = new ConcurrentHashMap<>();
public WebSocketConnectorServer(
- InetSocketAddress address, WebsocketConnector websocketConnector) {
+ InetSocketAddress address, WebSocketConnector websocketConnector) {
super(address);
this.websocketConnector = websocketConnector;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 18aa4d721ecea..847d82cda336d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -166,12 +166,7 @@ public Iterable processTablet(BiConsumer processTablet(BiConsumer
Date: Wed, 30 Aug 2023 12:43:25 +0800
Subject: [PATCH 05/15] remove cache
---
.../config/constant/PipeConnectorConstant.java | 4 ----
.../protocol/opcua/IoTDBOpcUaConnector.java | 14 --------------
2 files changed, 18 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index da3c9d8cb82f0..2f09bf84c2462 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -60,10 +60,6 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_KEY = "connector.https.port";
public static final int CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE = 8443;
- public static final String CONNECTOR_IOTDB_OPC_UA_ENABLE_CACHE_DATA_KEY =
- "connector.cache.enable";
- public static final boolean CONNECTOR_IOTDB_OPC_UA_ENABLE_CACHE_DATA_DEFAULT_VALUE = true;
-
private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaConnector.java
index 45af6446c48fc..fe972dadb3260 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaConnector.java
@@ -28,17 +28,11 @@
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Comparator;
-import java.util.concurrent.PriorityBlockingQueue;
-
-import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_ENABLE_CACHE_DATA_DEFAULT_VALUE;
-import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_ENABLE_CACHE_DATA_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
@@ -58,9 +52,6 @@ public class IoTDBOpcUaConnector implements PipeConnector {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBOpcUaConnector.class);
private OpcUaServer server;
- private boolean enableCacheData;
- private final PriorityBlockingQueue> events =
- new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left));
@Override
public void validate(PipeParameterValidator validator) throws Exception {
@@ -87,11 +78,6 @@ public void customize(PipeParameters parameters, PipeConnectorRuntimeConfigurati
server = IoTDBOpcUaServerUtils.getIoTDBOpcUaServer(tcpBindPort, httpsBindPort, user, password);
server.startup();
-
- enableCacheData =
- parameters.getBooleanOrDefault(
- CONNECTOR_IOTDB_OPC_UA_ENABLE_CACHE_DATA_KEY,
- CONNECTOR_IOTDB_OPC_UA_ENABLE_CACHE_DATA_DEFAULT_VALUE);
}
@Override
From 9fbaf6d5f7052426fdcc23b715a4b5879042cd7a Mon Sep 17 00:00:00 2001
From: Caideyipi <1336190078@qq.com>
Date: Mon, 4 Sep 2023 17:20:41 +0800
Subject: [PATCH 06/15] Modifying
---
...pcUaConnector.java => OpcUaConnector.java} | 14 ++++--------
...ServerUtils.java => OpcUaServerUtils.java} | 22 +++++++++++++------
.../PipeConnectorSubtaskManager.java | 4 ++--
3 files changed, 21 insertions(+), 19 deletions(-)
rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/{IoTDBOpcUaConnector.java => OpcUaConnector.java} (89%)
rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/{IoTDBOpcUaServerUtils.java => OpcUaServerUtils.java} (95%)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
similarity index 89%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaConnector.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index fe972dadb3260..51a9f21706522 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.pipe.connector.protocol.opcua;
-import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -47,9 +46,9 @@
* tablets, then eventNodes to send to the subscriber clients. Notice that there is no namespace
* since the eventNodes do not need to be saved.
*/
-public class IoTDBOpcUaConnector implements PipeConnector {
+public class OpcUaConnector implements PipeConnector {
- private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBOpcUaConnector.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaConnector.class);
private OpcUaServer server;
@@ -76,7 +75,7 @@ public void customize(PipeParameters parameters, PipeConnectorRuntimeConfigurati
parameters.getStringOrDefault(
CONNECTOR_IOTDB_PASSWORD_KEY, CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
- server = IoTDBOpcUaServerUtils.getIoTDBOpcUaServer(tcpBindPort, httpsBindPort, user, password);
+ server = OpcUaServerUtils.getOpcUaServer(tcpBindPort, httpsBindPort, user, password);
server.startup();
}
@@ -103,12 +102,7 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
return;
}
- if (((EnrichedEvent) tabletInsertionEvent).shouldParsePattern()) {
- transfer(tabletInsertionEvent.parseEventWithPattern());
- return;
- }
-
- IoTDBOpcUaServerUtils.transferTablet(server, tabletInsertionEvent.convertToTablet());
+ OpcUaServerUtils.transferTablet(server, tabletInsertionEvent.convertToTablet());
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerUtils.java
similarity index 95%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerUtils.java
index 2a8fb432cbee0..d181b9401fad2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBOpcUaServerUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerUtils.java
@@ -77,14 +77,14 @@
* OPC UA Server builder for IoTDB to send data. The coding style referenced ExampleServer.java in
* Eclipse Milo.
*/
-public class IoTDBOpcUaServerUtils {
+public class OpcUaServerUtils {
private static final String WILD_CARD_ADDRESS = "0.0.0.0";
private static int eventId = 0;
- public static OpcUaServer getIoTDBOpcUaServer(
+ public static OpcUaServer getOpcUaServer(
int tcpBindPort, int httpsBindPort, String user, String password) throws Exception {
- final Logger logger = LoggerFactory.getLogger(IoTDBOpcUaServerUtils.class);
+ final Logger logger = LoggerFactory.getLogger(OpcUaServerUtils.class);
Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "iotdb", "security");
Files.createDirectories(securityTempDir);
@@ -94,9 +94,9 @@ public static OpcUaServer getIoTDBOpcUaServer(
File pkiDir = securityTempDir.resolve("pki").toFile();
- LoggerFactory.getLogger(IoTDBOpcUaServerUtils.class)
+ LoggerFactory.getLogger(OpcUaServerUtils.class)
.info("Security dir: {}", securityTempDir.toAbsolutePath());
- LoggerFactory.getLogger(IoTDBOpcUaServerUtils.class)
+ LoggerFactory.getLogger(OpcUaServerUtils.class)
.info("Security pki dir: {}", pkiDir.getAbsolutePath());
IoTDBKeyStoreLoader loader =
@@ -278,7 +278,8 @@ public static void transferTablet(OpcUaServer server, Tablet tablet) throws UaEx
BaseEventTypeNode eventNode =
server
.getEventFactory()
- .createEvent(new NodeId(pseudoNameSpaceIndex, UUID.randomUUID()), Identifiers.BaseEventType);
+ .createEvent(
+ new NodeId(pseudoNameSpaceIndex, UUID.randomUUID()), Identifiers.BaseEventType);
// Use eventNode here because other nodes doesn't support values and times simultaneously
for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); ++columnIndex) {
@@ -302,6 +303,11 @@ public static void transferTablet(OpcUaServer server, Tablet tablet) throws UaEx
// Message --> Value
switch (dataType) {
+ case BOOLEAN:
+ eventNode.setMessage(
+ LocalizedText.english(
+ Boolean.toString(((boolean[]) tablet.values[columnIndex])[rowIndex])));
+ break;
case INT32:
eventNode.setMessage(
LocalizedText.english(
@@ -346,6 +352,8 @@ public static void transferTablet(OpcUaServer server, Tablet tablet) throws UaEx
private static NodeId convertToOpcDataType(TSDataType type) {
switch (type) {
+ case BOOLEAN:
+ return Identifiers.Boolean;
case INT32:
return Identifiers.Int32;
case INT64:
@@ -363,7 +371,7 @@ private static NodeId convertToOpcDataType(TSDataType type) {
}
}
- private IoTDBOpcUaServerUtils() {
+ private OpcUaServerUtils() {
// Utility Class
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index c8b226a545340..687865d8e392d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -26,7 +26,7 @@
import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBAirGapConnector;
import org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector;
-import org.apache.iotdb.db.pipe.connector.protocol.opcua.IoTDBOpcUaConnector;
+import org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
import org.apache.iotdb.db.pipe.connector.protocol.websocket.WebSocketConnector;
@@ -82,7 +82,7 @@ public synchronized String register(
pipeConnector = new IoTDBAirGapConnector();
} else if (connectorKey.equals(
BuiltinPipePlugin.IOTDB_OPC_UA_CONNECTOR.getPipePluginName())) {
- pipeConnector = new IoTDBOpcUaConnector();
+ pipeConnector = new OpcUaConnector();
} else if (connectorKey.equals(BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName())) {
pipeConnector = new WebSocketConnector();
} else {
From 61ad164e2a0af2c90beb63dd5ee5ebaae06497b8 Mon Sep 17 00:00:00 2001
From: Caideyipi <1336190078@qq.com>
Date: Mon, 4 Sep 2023 17:22:21 +0800
Subject: [PATCH 07/15] Update OpcUaServerUtils.java
---
.../db/pipe/connector/protocol/opcua/OpcUaServerUtils.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerUtils.java
index d181b9401fad2..49093c426d1ca 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerUtils.java
@@ -21,6 +21,7 @@
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.record.Tablet;
@@ -287,7 +288,9 @@ public static void transferTablet(OpcUaServer server, Tablet tablet) throws UaEx
// Source name --> Sensor path, like root.test.d_0.s_0
eventNode.setSourceName(
- tablet.deviceId + "." + tablet.getSchemas().get(columnIndex).getMeasurementId());
+ tablet.deviceId
+ + TsFileConstant.PATH_SEPARATOR
+ + tablet.getSchemas().get(columnIndex).getMeasurementId());
// Source node --> Sensor type, like double
eventNode.setSourceNode(convertToOpcDataType(dataType));
From 210e12242978fe72a94821541fd92a5884594019 Mon Sep 17 00:00:00 2001
From: Caideyipi <1336190078@qq.com>
Date: Mon, 4 Sep 2023 18:03:22 +0800
Subject: [PATCH 08/15] refactor
---
.../subtask/connector/PipeConnectorSubtaskManager.java | 3 +--
.../commons/pipe/plugin/builtin/BuiltinPipePlugin.java | 4 ++--
.../{IoTDBOpcUaConnector.java => OpcUaConnector.java} | 10 +++++-----
3 files changed, 8 insertions(+), 9 deletions(-)
rename iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/{IoTDBOpcUaConnector.java => OpcUaConnector.java} (74%)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 687865d8e392d..cb901477b7619 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -80,8 +80,7 @@ public synchronized String register(
} else if (connectorKey.equals(
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) {
pipeConnector = new IoTDBAirGapConnector();
- } else if (connectorKey.equals(
- BuiltinPipePlugin.IOTDB_OPC_UA_CONNECTOR.getPipePluginName())) {
+ } else if (connectorKey.equals(BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName())) {
pipeConnector = new OpcUaConnector();
} else if (connectorKey.equals(BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName())) {
pipeConnector = new WebSocketConnector();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index b59b6d9d63245..9218e29df24dc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -22,10 +22,10 @@
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBAirGapConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBLegacyPipeConnector;
-import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBOpcUaConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftAsyncConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSyncConnector;
+import org.apache.iotdb.commons.pipe.plugin.builtin.connector.OpcUaConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.WebSocketConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.IoTDBExtractor;
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
@@ -46,7 +46,7 @@ public enum BuiltinPipePlugin {
IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", IoTDBLegacyPipeConnector.class),
IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapConnector.class),
WEBSOCKET_CONNECTOR("websocket-connector", WebSocketConnector.class),
- IOTDB_OPC_UA_CONNECTOR("iotdb-opc-ua-connector", IoTDBOpcUaConnector.class);
+ OPC_UA_CONNECTOR("opc-ua-connector", OpcUaConnector.class),
;
private final String pipePluginName;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBOpcUaConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/OpcUaConnector.java
similarity index 74%
rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBOpcUaConnector.java
rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/OpcUaConnector.java
index e5636a23a4c0c..287a2dcb116b1 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBOpcUaConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/OpcUaConnector.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
/**
- * This class is a placeholder and should not be initialized. It represents the IoTDB OPC UA
- * connector. There is a real implementation in the server module but cannot be imported here. The
- * pipe agent in the server module will replace this class with the real implementation when
- * initializing the IoTDB OPC UA connector.
+ * This class is a placeholder and should not be initialized. It represents the OPC UA connector.
+ * There is a real implementation in the server module but cannot be imported here. The pipe agent
+ * in the server module will replace this class with the real implementation when initializing the
+ * OPC UA connector.
*/
-public class IoTDBOpcUaConnector extends PlaceholderConnector {}
+public class OpcUaConnector extends PlaceholderConnector {}
From 1e97cc9d4fe054d9187d2b0c9fce3d39b681cea7 Mon Sep 17 00:00:00 2001
From: Caideyipi <1336190078@qq.com>
Date: Tue, 5 Sep 2023 18:14:53 +0800
Subject: [PATCH 09/15] Update IoTDBThriftSyncConnector.java
---
.../protocol/thrift/sync/IoTDBThriftSyncConnector.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index b5561b4b9a690..ad86dfba7615d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -226,7 +226,7 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception
return;
}
- if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePattern()) {
+ if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
transfer(event);
}
From fd9998914b3db9d22f13db2bac1da1df05ae77d4 Mon Sep 17 00:00:00 2001
From: Caideyipi <1336190078@qq.com>
Date: Wed, 6 Sep 2023 20:14:03 +0800
Subject: [PATCH 10/15] Revert changes
---
.../dml/insertion/TabletInsertionEvent.java | 22 -------------
.../protocol/airgap/IoTDBAirGapConnector.java | 7 ++++-
.../legacy/IoTDBLegacyPipeConnector.java | 31 ++++++++++++++++---
.../protocol/opcua/OpcUaConnector.java | 8 ++++-
.../async/IoTDBThriftAsyncConnector.java | 7 ++++-
.../thrift/sync/IoTDBThriftSyncConnector.java | 7 ++++-
.../PipeInsertNodeTabletInsertionEvent.java | 13 ++++----
.../tablet/PipeRawTabletInsertionEvent.java | 17 +++++-----
8 files changed, 65 insertions(+), 47 deletions(-)
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
index 24a6992131671..56244574214ca 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
@@ -44,26 +44,4 @@ public interface TabletInsertionEvent extends Event {
* results collected by the RowCollector
*/
Iterable processTablet(BiConsumer consumer);
-
- /**
- * Convert the data contained to tablet.
- *
- * @return the result tablet
- */
- Tablet convertToTablet();
-
- /**
- * Parse the event to resolve its pattern. The parsing process is done in
- * PipeRawTabletInsertionEvent.
- *
- * @return the converted PipeRawTabletInsertionEvent
- */
- TabletInsertionEvent parseEventWithPattern();
-
- /**
- * Return whether the tablet is aligned.
- *
- * @return true if the tablet is aligned
- */
- boolean isAligned();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index 7d5ab558392e0..0637cda4a4fea 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -187,7 +187,12 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
}
if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) {
- transfer((tabletInsertionEvent).parseEventWithPattern());
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ transfer(
+ ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
+ } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
+ transfer(((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
+ }
return;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 76e0fd8cde278..c054931f64cbc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -41,6 +41,8 @@
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
@@ -167,11 +169,10 @@ public void heartbeat() throws Exception {
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent
|| tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
- final Tablet tablet = tabletInsertionEvent.convertToTablet();
- if (tabletInsertionEvent.isAligned()) {
- sessionPool.insertAlignedTablet(tablet);
- } else {
- sessionPool.insertTablet(tablet);
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
+ } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
+ doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
}
} else {
throw new NotImplementedException(
@@ -180,6 +181,26 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
}
}
+ private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeInsertionEvent)
+ throws IoTDBConnectionException, StatementExecutionException {
+ final Tablet tablet = pipeInsertNodeInsertionEvent.convertToTablet();
+ if (pipeInsertNodeInsertionEvent.isAligned()) {
+ sessionPool.insertAlignedTablet(tablet);
+ } else {
+ sessionPool.insertTablet(tablet);
+ }
+ }
+
+ private void doTransfer(PipeRawTabletInsertionEvent pipeTabletInsertionEvent)
+ throws PipeException, IoTDBConnectionException, StatementExecutionException {
+ final Tablet tablet = pipeTabletInsertionEvent.convertToTablet();
+ if (pipeTabletInsertionEvent.isAligned()) {
+ sessionPool.insertAlignedTablet(tablet);
+ } else {
+ sessionPool.insertTablet(tablet);
+ }
+ }
+
@Override
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index 51a9f21706522..1d3b4ec8fe787 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -102,7 +102,13 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
return;
}
- OpcUaServerUtils.transferTablet(server, tabletInsertionEvent.convertToTablet());
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ OpcUaServerUtils.transferTablet(
+ server, ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).convertToTablet());
+ } else {
+ OpcUaServerUtils.transferTablet(
+ server, ((PipeRawTabletInsertionEvent) tabletInsertionEvent).convertToTablet());
+ }
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index bceafc480be49..cb2c68ab6806c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -151,7 +151,12 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
}
if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) {
- transfer((tabletInsertionEvent).parseEventWithPattern());
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ transfer(
+ ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
+ } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
+ transfer(((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
+ }
return;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index ad86dfba7615d..dfecd2170b354 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -186,7 +186,12 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
}
if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) {
- transfer((tabletInsertionEvent).parseEventWithPattern());
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ transfer(
+ ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
+ } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
+ transfer(((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
+ }
return;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 847d82cda336d..9dd86e47826d0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -166,7 +166,12 @@ public Iterable processTablet(BiConsumer processTablet(BiConsumer
Date: Wed, 6 Sep 2023 20:18:24 +0800
Subject: [PATCH 11/15] remove refactor 2
---
.../legacy/IoTDBLegacyPipeConnector.java | 54 +++++++++----------
.../PipeInsertNodeTabletInsertionEvent.java | 2 +
2 files changed, 28 insertions(+), 28 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index c054931f64cbc..b9de368bc9b8a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -165,24 +165,39 @@ public void heartbeat() throws Exception {
// do nothing
}
+ @Override
+ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
+ if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
+ throw new NotImplementedException(
+ "IoTDBLegacyPipeConnector only support PipeTsFileInsertionEvent.");
+ }
+
+ try {
+ doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
+ } catch (TException e) {
+ throw new PipeConnectionException(
+ String.format(
+ "Network error when transfer tsFile insertion event: %s.", tsFileInsertionEvent),
+ e);
+ }
+ }
+
+
@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
- if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent
- || tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
- if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
- } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
- doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
- }
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
+ } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
+ doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
} else {
throw new NotImplementedException(
- "IoTDBLegacyPipeConnector only support "
- + "PipeInsertNodeInsertionEvent and PipeTabletInsertionEvent.");
+ "IoTDBLegacyPipeConnector only support "
+ + "PipeInsertNodeInsertionEvent and PipeTabletInsertionEvent.");
}
}
private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeInsertionEvent)
- throws IoTDBConnectionException, StatementExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
final Tablet tablet = pipeInsertNodeInsertionEvent.convertToTablet();
if (pipeInsertNodeInsertionEvent.isAligned()) {
sessionPool.insertAlignedTablet(tablet);
@@ -192,7 +207,7 @@ private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeInserti
}
private void doTransfer(PipeRawTabletInsertionEvent pipeTabletInsertionEvent)
- throws PipeException, IoTDBConnectionException, StatementExecutionException {
+ throws PipeException, IoTDBConnectionException, StatementExecutionException {
final Tablet tablet = pipeTabletInsertionEvent.convertToTablet();
if (pipeTabletInsertionEvent.isAligned()) {
sessionPool.insertAlignedTablet(tablet);
@@ -201,23 +216,6 @@ private void doTransfer(PipeRawTabletInsertionEvent pipeTabletInsertionEvent)
}
}
- @Override
- public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
- if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
- throw new NotImplementedException(
- "IoTDBLegacyPipeConnector only support PipeTsFileInsertionEvent.");
- }
-
- try {
- doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
- } catch (TException e) {
- throw new PipeConnectionException(
- String.format(
- "Network error when transfer tsFile insertion event: %s.", tsFileInsertionEvent),
- e);
- }
- }
-
@Override
public void transfer(Event event) throws Exception {
if (!(event instanceof PipeHeartbeatEvent)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 9dd86e47826d0..18aa4d721ecea 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -184,6 +184,8 @@ public Tablet convertToTablet() {
}
}
+ /////////////////////////// parsePattern ///////////////////////////
+
public TabletInsertionEvent parseEventWithPattern() {
return new PipeRawTabletInsertionEvent(convertToTablet(), isAligned, pipeTaskMeta, this, true);
}
From b7e395a698f372cd4a36b2b8595ade0455af542e Mon Sep 17 00:00:00 2001
From: Caideyipi <1336190078@qq.com>
Date: Wed, 6 Sep 2023 20:23:02 +0800
Subject: [PATCH 12/15] Update IoTDBLegacyPipeConnector.java
---
.../legacy/IoTDBLegacyPipeConnector.java | 37 +++++++++----------
1 file changed, 18 insertions(+), 19 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index b9de368bc9b8a..2fa4276289781 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -165,6 +165,19 @@ public void heartbeat() throws Exception {
// do nothing
}
+ @Override
+ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
+ } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
+ doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
+ } else {
+ throw new NotImplementedException(
+ "IoTDBLegacyPipeConnector only support "
+ + "PipeInsertNodeInsertionEvent and PipeTabletInsertionEvent.");
+ }
+ }
+
@Override
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
@@ -182,22 +195,15 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception
}
}
-
@Override
- public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
- if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
- } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
- doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
- } else {
- throw new NotImplementedException(
- "IoTDBLegacyPipeConnector only support "
- + "PipeInsertNodeInsertionEvent and PipeTabletInsertionEvent.");
+ public void transfer(Event event) throws Exception {
+ if (!(event instanceof PipeHeartbeatEvent)) {
+ LOGGER.warn("IoTDBLegacyPipeConnector does not support transfer generic event: {}.", event);
}
}
private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeInsertionEvent)
- throws IoTDBConnectionException, StatementExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
final Tablet tablet = pipeInsertNodeInsertionEvent.convertToTablet();
if (pipeInsertNodeInsertionEvent.isAligned()) {
sessionPool.insertAlignedTablet(tablet);
@@ -207,7 +213,7 @@ private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeInserti
}
private void doTransfer(PipeRawTabletInsertionEvent pipeTabletInsertionEvent)
- throws PipeException, IoTDBConnectionException, StatementExecutionException {
+ throws PipeException, IoTDBConnectionException, StatementExecutionException {
final Tablet tablet = pipeTabletInsertionEvent.convertToTablet();
if (pipeTabletInsertionEvent.isAligned()) {
sessionPool.insertAlignedTablet(tablet);
@@ -216,13 +222,6 @@ private void doTransfer(PipeRawTabletInsertionEvent pipeTabletInsertionEvent)
}
}
- @Override
- public void transfer(Event event) throws Exception {
- if (!(event instanceof PipeHeartbeatEvent)) {
- LOGGER.warn("IoTDBLegacyPipeConnector does not support transfer generic event: {}.", event);
- }
- }
-
private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
throws PipeException, TException, InterruptedException, IOException {
pipeTsFileInsertionEvent.waitForTsFileClose();
From 15b2c0d4dc16c91c93a0be8360d15f1d4c2256d6 Mon Sep 17 00:00:00 2001
From: Caideyipi <1336190078@qq.com>
Date: Wed, 6 Sep 2023 20:50:27 +0800
Subject: [PATCH 13/15] refactor
---
iotdb-core/datanode/pom.xml | 2 +-
.../constant/PipeConnectorConstant.java | 8 +-
.../protocol/opcua/OpcUaConnector.java | 151 +++++++++++++--
...reLoader.java => OpcUaKeyStoreLoader.java} | 4 +-
...rverUtils.java => OpcUaServerBuilder.java} | 181 ++++--------------
pom.xml | 2 +
6 files changed, 191 insertions(+), 157 deletions(-)
rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/{IoTDBKeyStoreLoader.java => OpcUaKeyStoreLoader.java} (97%)
rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/{OpcUaServerUtils.java => OpcUaServerBuilder.java} (65%)
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 41c4b01cba6e8..ab11511469ab3 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -253,7 +253,7 @@
org.eclipse.milo
sdk-server
- 0.6.10
+ ${milo.version}
commons-cli
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 2f09bf84c2462..5f09012c00a5c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -54,11 +54,11 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_WEBSOCKET_PORT_KEY = "connector.websocket.port";
public static final int CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE = 8080;
- public static final String CONNECTOR_IOTDB_OPC_UA_TCP_BIND_PORT_KEY = "connector.tcp.port";
- public static final int CONNECTOR_IOTDB_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE = 12686;
+ public static final String CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY = "connector.opcua.tcp.port";
+ public static final int CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE = 12686;
- public static final String CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_KEY = "connector.https.port";
- public static final int CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE = 8443;
+ public static final String CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY = "connector.opcua.https.port";
+ public static final int CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE = 8443;
private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index 1d3b4ec8fe787..66685cd435080 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.connector.protocol.opcua;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -27,19 +28,32 @@
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
+import org.eclipse.milo.opcua.sdk.server.model.nodes.objects.BaseEventTypeNode;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.UaException;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ByteString;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
-import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_KEY;
-import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
-import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_OPC_UA_TCP_BIND_PORT_KEY;
+import java.util.UUID;
+
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
/**
* Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data are converted into
@@ -52,6 +66,8 @@ public class OpcUaConnector implements PipeConnector {
private OpcUaServer server;
+ private int eventId = 0;
+
@Override
public void validate(PipeParameterValidator validator) throws Exception {
// All the parameters are optional
@@ -62,12 +78,10 @@ public void customize(PipeParameters parameters, PipeConnectorRuntimeConfigurati
throws Exception {
int tcpBindPort =
parameters.getIntOrDefault(
- CONNECTOR_IOTDB_OPC_UA_TCP_BIND_PORT_KEY,
- CONNECTOR_IOTDB_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE);
+ CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE);
int httpsBindPort =
parameters.getIntOrDefault(
- CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_KEY,
- CONNECTOR_IOTDB_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE);
+ CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY, CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE);
String user =
parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
@@ -75,7 +89,13 @@ public void customize(PipeParameters parameters, PipeConnectorRuntimeConfigurati
parameters.getStringOrDefault(
CONNECTOR_IOTDB_PASSWORD_KEY, CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
- server = OpcUaServerUtils.getOpcUaServer(tcpBindPort, httpsBindPort, user, password);
+ server =
+ new OpcUaServerBuilder()
+ .setTcpBindPort(tcpBindPort)
+ .setHttpsBindPort(httpsBindPort)
+ .setUser(user)
+ .setPassword(password)
+ .build();
server.startup();
}
@@ -103,14 +123,123 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
}
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- OpcUaServerUtils.transferTablet(
+ transferTablet(
server, ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).convertToTablet());
} else {
- OpcUaServerUtils.transferTablet(
+ transferTablet(
server, ((PipeRawTabletInsertionEvent) tabletInsertionEvent).convertToTablet());
}
}
+ /**
+ * Transfer tablet into eventNodes and post it on the eventBus, so that they will be heard at the
+ * subscribers. Notice that an eventNode is reused to reduce object creation costs.
+ *
+ * @param server OpcUaServer
+ * @param tablet the tablet to send
+ * @throws UaException if failed to create event
+ */
+ private void transferTablet(OpcUaServer server, Tablet tablet) throws UaException {
+ // There is no nameSpace, so that nameSpaceIndex is always 0
+ int pseudoNameSpaceIndex = 0;
+ BaseEventTypeNode eventNode =
+ server
+ .getEventFactory()
+ .createEvent(
+ new NodeId(pseudoNameSpaceIndex, UUID.randomUUID()), Identifiers.BaseEventType);
+ // Use eventNode here because other nodes doesn't support values and times simultaneously
+ for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); ++columnIndex) {
+
+ TSDataType dataType = tablet.getSchemas().get(columnIndex).getType();
+
+ // Source name --> Sensor path, like root.test.d_0.s_0
+ eventNode.setSourceName(
+ tablet.deviceId
+ + TsFileConstant.PATH_SEPARATOR
+ + tablet.getSchemas().get(columnIndex).getMeasurementId());
+
+ // Source node --> Sensor type, like double
+ eventNode.setSourceNode(convertToOpcDataType(dataType));
+
+ for (int rowIndex = 0; rowIndex < tablet.rowSize; ++rowIndex) {
+ // Filter null value
+ if (tablet.bitMaps[columnIndex].isMarked(rowIndex)) {
+ continue;
+ }
+
+ // time --> timeStamp
+ eventNode.setTime(new DateTime(tablet.timestamps[rowIndex]));
+
+ // Message --> Value
+ switch (dataType) {
+ case BOOLEAN:
+ eventNode.setMessage(
+ LocalizedText.english(
+ Boolean.toString(((boolean[]) tablet.values[columnIndex])[rowIndex])));
+ break;
+ case INT32:
+ eventNode.setMessage(
+ LocalizedText.english(
+ Integer.toString(((int[]) tablet.values[columnIndex])[rowIndex])));
+ break;
+ case INT64:
+ eventNode.setMessage(
+ LocalizedText.english(
+ Long.toString(((long[]) tablet.values[columnIndex])[rowIndex])));
+ break;
+ case FLOAT:
+ eventNode.setMessage(
+ LocalizedText.english(
+ Float.toString(((float[]) tablet.values[columnIndex])[rowIndex])));
+ break;
+ case DOUBLE:
+ eventNode.setMessage(
+ LocalizedText.english(
+ Double.toString(((double[]) tablet.values[columnIndex])[rowIndex])));
+ break;
+ case TEXT:
+ eventNode.setMessage(
+ LocalizedText.english(
+ ((Binary[]) tablet.values[columnIndex])[rowIndex].toString()));
+ break;
+ case VECTOR:
+ case UNKNOWN:
+ default:
+ throw new PipeRuntimeNonCriticalException(
+ "Unsupported data type: " + tablet.getSchemas().get(columnIndex).getType());
+ }
+
+ // Reset the eventId each time
+ eventNode.setEventId(ByteString.of(Integer.toString(eventId++).getBytes()));
+
+ // Send the event
+ server.getEventBus().post(eventNode);
+ }
+ }
+ eventNode.delete();
+ }
+
+ private NodeId convertToOpcDataType(TSDataType type) {
+ switch (type) {
+ case BOOLEAN:
+ return Identifiers.Boolean;
+ case INT32:
+ return Identifiers.Int32;
+ case INT64:
+ return Identifiers.Int64;
+ case FLOAT:
+ return Identifiers.Float;
+ case DOUBLE:
+ return Identifiers.Double;
+ case TEXT:
+ return Identifiers.String;
+ case VECTOR:
+ case UNKNOWN:
+ default:
+ throw new PipeRuntimeNonCriticalException("Unsupported data type: " + type);
+ }
+ }
+
@Override
public void transfer(Event event) throws Exception {
// Do nothing when receive heartbeat or other events
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBKeyStoreLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaKeyStoreLoader.java
similarity index 97%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBKeyStoreLoader.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaKeyStoreLoader.java
index a1d0b949285d3..3f87e46acb812 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/IoTDBKeyStoreLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaKeyStoreLoader.java
@@ -40,7 +40,7 @@
import java.util.UUID;
import java.util.regex.Pattern;
-class IoTDBKeyStoreLoader {
+class OpcUaKeyStoreLoader {
private static final Pattern IP_ADDR_PATTERN =
Pattern.compile("^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
@@ -52,7 +52,7 @@ class IoTDBKeyStoreLoader {
private X509Certificate serverCertificate;
private KeyPair serverKeyPair;
- IoTDBKeyStoreLoader load(Path baseDir, char[] password) throws Exception {
+ OpcUaKeyStoreLoader load(Path baseDir, char[] password) throws Exception {
KeyStore keyStore = KeyStore.getInstance("PKCS12");
File serverKeyStore = baseDir.resolve("iotdb-server.pfx").toFile();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java
similarity index 65%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerUtils.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java
index 49093c426d1ca..7d27a9585b2ed 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java
@@ -19,34 +19,25 @@
package org.apache.iotdb.db.pipe.connector.protocol.opcua;
-import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.record.Tablet;
import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
import org.eclipse.milo.opcua.sdk.server.api.config.OpcUaServerConfig;
import org.eclipse.milo.opcua.sdk.server.identity.CompositeValidator;
import org.eclipse.milo.opcua.sdk.server.identity.UsernameIdentityValidator;
import org.eclipse.milo.opcua.sdk.server.identity.X509IdentityValidator;
-import org.eclipse.milo.opcua.sdk.server.model.nodes.objects.BaseEventTypeNode;
import org.eclipse.milo.opcua.sdk.server.model.nodes.objects.ServerTypeNode;
import org.eclipse.milo.opcua.sdk.server.nodes.UaNode;
import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.StatusCodes;
-import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.UaRuntimeException;
import org.eclipse.milo.opcua.stack.core.security.DefaultCertificateManager;
import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.transport.TransportProfile;
-import org.eclipse.milo.opcua.stack.core.types.builtin.ByteString;
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
-import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
import org.eclipse.milo.opcua.stack.core.types.structured.BuildInfo;
import org.eclipse.milo.opcua.stack.core.util.CertificateUtil;
@@ -66,7 +57,6 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
-import java.util.UUID;
import static com.google.common.collect.Lists.newArrayList;
import static org.eclipse.milo.opcua.sdk.server.api.config.OpcUaServerConfig.USER_TOKEN_POLICY_ANONYMOUS;
@@ -78,15 +68,41 @@
* OPC UA Server builder for IoTDB to send data. The coding style referenced ExampleServer.java in
* Eclipse Milo.
*/
-public class OpcUaServerUtils {
+public class OpcUaServerBuilder {
- private static final String WILD_CARD_ADDRESS = "0.0.0.0";
- private static int eventId = 0;
+ private final String wildCardAddress = "0.0.0.0";
+ private final Logger logger = LoggerFactory.getLogger(OpcUaServerBuilder.class);
- public static OpcUaServer getOpcUaServer(
- int tcpBindPort, int httpsBindPort, String user, String password) throws Exception {
- final Logger logger = LoggerFactory.getLogger(OpcUaServerUtils.class);
+ private int tcpBindPort;
+ private int httpsBindPort;
+ private String user;
+ private String password;
+ public OpcUaServerBuilder() {
+ // Empty constructor
+ }
+
+ public OpcUaServerBuilder setTcpBindPort(int tcpBindPort) {
+ this.tcpBindPort = tcpBindPort;
+ return this;
+ }
+
+ public OpcUaServerBuilder setHttpsBindPort(int httpsBindPort) {
+ this.httpsBindPort = httpsBindPort;
+ return this;
+ }
+
+ public OpcUaServerBuilder setUser(String user) {
+ this.user = user;
+ return this;
+ }
+
+ public OpcUaServerBuilder setPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+ public OpcUaServer build() throws Exception {
Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "iotdb", "security");
Files.createDirectories(securityTempDir);
if (!Files.exists(securityTempDir)) {
@@ -95,13 +111,13 @@ public static OpcUaServer getOpcUaServer(
File pkiDir = securityTempDir.resolve("pki").toFile();
- LoggerFactory.getLogger(OpcUaServerUtils.class)
+ LoggerFactory.getLogger(OpcUaServerBuilder.class)
.info("Security dir: {}", securityTempDir.toAbsolutePath());
- LoggerFactory.getLogger(OpcUaServerUtils.class)
+ LoggerFactory.getLogger(OpcUaServerBuilder.class)
.info("Security pki dir: {}", pkiDir.getAbsolutePath());
- IoTDBKeyStoreLoader loader =
- new IoTDBKeyStoreLoader().load(securityTempDir, password.toCharArray());
+ OpcUaKeyStoreLoader loader =
+ new OpcUaKeyStoreLoader().load(securityTempDir, password.toCharArray());
DefaultCertificateManager certificateManager =
new DefaultCertificateManager(loader.getServerKeyPair(), loader.getServerCertificate());
@@ -116,7 +132,7 @@ public static OpcUaServer getOpcUaServer(
SelfSignedHttpsCertificateBuilder httpsCertificateBuilder =
new SelfSignedHttpsCertificateBuilder(httpsKeyPair);
httpsCertificateBuilder.setCommonName(HostnameUtil.getHostname());
- HostnameUtil.getHostnames(WILD_CARD_ADDRESS).forEach(httpsCertificateBuilder::addDnsName);
+ HostnameUtil.getHostnames(wildCardAddress).forEach(httpsCertificateBuilder::addDnsName);
X509Certificate httpsCertificate = httpsCertificateBuilder.build();
DefaultServerCertificateValidator certificateValidator =
@@ -185,16 +201,16 @@ public static OpcUaServer getOpcUaServer(
return server;
}
- private static Set createEndpointConfigurations(
+ private Set createEndpointConfigurations(
X509Certificate certificate, int tcpBindPort, int httpsBindPort) {
Set endpointConfigurations = new LinkedHashSet<>();
List bindAddresses = newArrayList();
- bindAddresses.add(WILD_CARD_ADDRESS);
+ bindAddresses.add(wildCardAddress);
Set hostnames = new LinkedHashSet<>();
hostnames.add(HostnameUtil.getHostname());
- hostnames.addAll(HostnameUtil.getHostnames(WILD_CARD_ADDRESS));
+ hostnames.addAll(HostnameUtil.getHostnames(wildCardAddress));
for (String bindAddress : bindAddresses) {
for (String hostname : hostnames) {
@@ -249,7 +265,7 @@ private static Set createEndpointConfigurations(
return endpointConfigurations;
}
- private static EndpointConfiguration buildTcpEndpoint(
+ private EndpointConfiguration buildTcpEndpoint(
EndpointConfiguration.Builder base, int tcpBindPort) {
return base.copy()
.setTransportProfile(TransportProfile.TCP_UASC_UABINARY)
@@ -257,124 +273,11 @@ private static EndpointConfiguration buildTcpEndpoint(
.build();
}
- private static EndpointConfiguration buildHttpsEndpoint(
+ private EndpointConfiguration buildHttpsEndpoint(
EndpointConfiguration.Builder base, int httpsBindPort) {
return base.copy()
.setTransportProfile(TransportProfile.HTTPS_UABINARY)
.setBindPort(httpsBindPort)
.build();
}
-
- /**
- * Transfer tablet into eventNodes and post it on the eventBus, so that they will be heard at the
- * subscribers. Notice that an eventNode is reused to reduce object creation costs.
- *
- * @param server OpcUaServer
- * @param tablet the tablet to send
- * @throws UaException if failed to create event
- */
- public static void transferTablet(OpcUaServer server, Tablet tablet) throws UaException {
- // There is no nameSpace, so that nameSpaceIndex is always 0
- int pseudoNameSpaceIndex = 0;
- BaseEventTypeNode eventNode =
- server
- .getEventFactory()
- .createEvent(
- new NodeId(pseudoNameSpaceIndex, UUID.randomUUID()), Identifiers.BaseEventType);
- // Use eventNode here because other nodes doesn't support values and times simultaneously
- for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); ++columnIndex) {
-
- TSDataType dataType = tablet.getSchemas().get(columnIndex).getType();
-
- // Source name --> Sensor path, like root.test.d_0.s_0
- eventNode.setSourceName(
- tablet.deviceId
- + TsFileConstant.PATH_SEPARATOR
- + tablet.getSchemas().get(columnIndex).getMeasurementId());
-
- // Source node --> Sensor type, like double
- eventNode.setSourceNode(convertToOpcDataType(dataType));
-
- for (int rowIndex = 0; rowIndex < tablet.rowSize; ++rowIndex) {
- // Filter null value
- if (tablet.bitMaps[columnIndex].isMarked(rowIndex)) {
- continue;
- }
-
- // time --> timeStamp
- eventNode.setTime(new DateTime(tablet.timestamps[rowIndex]));
-
- // Message --> Value
- switch (dataType) {
- case BOOLEAN:
- eventNode.setMessage(
- LocalizedText.english(
- Boolean.toString(((boolean[]) tablet.values[columnIndex])[rowIndex])));
- break;
- case INT32:
- eventNode.setMessage(
- LocalizedText.english(
- Integer.toString(((int[]) tablet.values[columnIndex])[rowIndex])));
- break;
- case INT64:
- eventNode.setMessage(
- LocalizedText.english(
- Long.toString(((long[]) tablet.values[columnIndex])[rowIndex])));
- break;
- case FLOAT:
- eventNode.setMessage(
- LocalizedText.english(
- Float.toString(((float[]) tablet.values[columnIndex])[rowIndex])));
- break;
- case DOUBLE:
- eventNode.setMessage(
- LocalizedText.english(
- Double.toString(((double[]) tablet.values[columnIndex])[rowIndex])));
- break;
- case TEXT:
- eventNode.setMessage(
- LocalizedText.english(
- ((Binary[]) tablet.values[columnIndex])[rowIndex].toString()));
- break;
- case VECTOR:
- case UNKNOWN:
- default:
- throw new PipeRuntimeNonCriticalException(
- "Unsupported data type: " + tablet.getSchemas().get(columnIndex).getType());
- }
-
- // Reset the eventId each time
- eventNode.setEventId(ByteString.of(Integer.toString(eventId++).getBytes()));
-
- // Send the event
- server.getEventBus().post(eventNode);
- }
- }
- eventNode.delete();
- }
-
- private static NodeId convertToOpcDataType(TSDataType type) {
- switch (type) {
- case BOOLEAN:
- return Identifiers.Boolean;
- case INT32:
- return Identifiers.Int32;
- case INT64:
- return Identifiers.Int64;
- case FLOAT:
- return Identifiers.Float;
- case DOUBLE:
- return Identifiers.Double;
- case TEXT:
- return Identifiers.String;
- case VECTOR:
- case UNKNOWN:
- default:
- throw new PipeRuntimeNonCriticalException("Unsupported data type: " + type);
- }
- }
-
- private OpcUaServerUtils() {
- // Utility Class
- }
}
diff --git a/pom.xml b/pom.xml
index 7479cc7ade419..2fac1d4bef58e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -211,6 +211,8 @@
1.5.3
+
+ 0.6.10