From 2e114d9633a9b0fa2174be92c88da63d914b0877 Mon Sep 17 00:00:00 2001 From: xuanronaldo Date: Mon, 28 Aug 2023 10:48:51 +0800 Subject: [PATCH 1/5] add name tag in pom.xml --- .../flink-sql-iotdb-connector/pom.xml | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 iotdb-connector/flink-sql-iotdb-connector/pom.xml diff --git a/iotdb-connector/flink-sql-iotdb-connector/pom.xml b/iotdb-connector/flink-sql-iotdb-connector/pom.xml new file mode 100644 index 0000000000000..0a23c56989536 --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/pom.xml @@ -0,0 +1,68 @@ + + + + 4.0.0 + + org.apache.iotdb + iotdb-parent + 1.3.0-SNAPSHOT + ../../pom.xml + + flink-sql-iotdb-connector + IoTDB: Connector: Apache Flink SQL Connector + 1.3.0-SNAPSHOT + + UTF-8 + 1.17.0 + + + + + org.apache.iotdb + iotdb-session + ${project.version} + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + + + org.apache.flink + flink-table-common + ${flink.version} + + + org.java-websocket + Java-WebSocket + ${websocket.version} + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + From c1f97ade892baa81d969076c1c74a74f6c13ae6b Mon Sep 17 00:00:00 2001 From: xuanronaldo Date: Fri, 25 Aug 2023 10:32:45 +0200 Subject: [PATCH 2/5] update code. --- .../sql/client/IoTDBWebsocketClient.java | 72 +++++ .../iotdb/flink/sql/common/Options.java | 55 ++++ .../apache/iotdb/flink/sql/common/Utils.java | 145 ++++++++++ .../exception/IllegalIoTDBPathException.java | 25 ++ .../sql/exception/IllegalOptionException.java | 25 ++ .../sql/exception/IllegalSchemaException.java | 25 ++ .../exception/IllegalUrlPathException.java | 25 ++ .../UnsupportedDataTypeException.java | 25 ++ .../sql/factory/IoTDBDynamicTableFactory.java | 184 +++++++++++++ .../function/IoTDBBoundedScanFunction.java | 165 ++++++++++++ .../sql/function/IoTDBCDCSourceFunction.java | 255 ++++++++++++++++++ .../sql/function/IoTDBLookupFunction.java | 151 +++++++++++ .../flink/sql/function/IoTDBSinkFunction.java | 125 +++++++++ .../sql/provider/IoTDBDynamicTableSink.java | 64 +++++ .../sql/provider/IoTDBDynamicTableSource.java | 76 ++++++ .../flink/sql/wrapper/SchemaWrapper.java | 46 ++++ .../flink/sql/wrapper/TabletWrapper.java | 46 ++++ .../org.apache.flink.table.factories.Factory | 20 ++ iotdb-connector/pom.xml | 50 ++++ .../constant/PipeConnectorConstant.java | 3 + .../websocket/WebSocketConnectorServer.java | 219 +++++++++++++++ .../websocket/WebsocketConnector.java | 139 ++++++++++ .../PipeConnectorSubtaskManager.java | 3 + .../plugin/builtin/BuiltinPipePlugin.java | 2 + .../builtin/connector/WebsocketConnector.java | 28 ++ 25 files changed, 1973 insertions(+) create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebsocketClient.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Options.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Utils.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalIoTDBPathException.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalOptionException.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalSchemaException.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalUrlPathException.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/UnsupportedDataTypeException.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBBoundedScanFunction.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBLookupFunction.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBSinkFunction.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSink.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSource.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/SchemaWrapper.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/TabletWrapper.java create mode 100644 iotdb-connector/flink-sql-iotdb-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 iotdb-connector/pom.xml create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebsocketConnector.java diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebsocketClient.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebsocketClient.java new file mode 100644 index 0000000000000..72df12b49544f --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebsocketClient.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.client; + +import org.apache.iotdb.flink.sql.function.IoTDBCDCSourceFunction; +import org.apache.iotdb.flink.sql.wrapper.TabletWrapper; +import org.apache.iotdb.tsfile.write.record.Tablet; + +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.nio.ByteBuffer; + +public class IoTDBWebsocketClient extends WebSocketClient { + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBWebsocketClient.class); + private final IoTDBCDCSourceFunction function; + + public IoTDBWebsocketClient(URI uri, IoTDBCDCSourceFunction function) { + super(uri); + this.function = function; + } + + @Override + public void onOpen(ServerHandshake serverHandshake) { + String log = + String.format("The connection with %s:%d has been created!", uri.getHost(), uri.getPort()); + LOGGER.info(log); + } + + @Override + public void onMessage(String s) {} + + @Override + public void onMessage(ByteBuffer bytes) { + super.onMessage(bytes); + String log = String.format("Received a message from %s:%d", uri.getHost(), uri.getPort()); + LOGGER.info(log); + long commitId = bytes.getLong(); + Tablet tablet = Tablet.deserialize(bytes); + function.addTabletWrapper(new TabletWrapper(commitId, this, tablet)); + } + + @Override + public void onClose(int i, String s, boolean b) { + LOGGER.info("The connection has been closed!"); + } + + @Override + public void onError(Exception e) { + String log = String.format("Got an error: %s", e.getMessage()); + LOGGER.error(log); + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Options.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Options.java new file mode 100644 index 0000000000000..0fe0196b00c81 --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Options.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.common; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +public class Options { + public static final ConfigOption NODE_URLS = + ConfigOptions.key("nodeUrls").stringType().defaultValue("127.0.0.1:6667"); + public static final ConfigOption USER = + ConfigOptions.key("user").stringType().defaultValue("root"); + public static final ConfigOption PASSWORD = + ConfigOptions.key("password").stringType().defaultValue("root"); + public static final ConfigOption DEVICE = + ConfigOptions.key("device").stringType().noDefaultValue(); + public static final ConfigOption ALIGNED = + ConfigOptions.key("aligned").booleanType().defaultValue(false); + public static final ConfigOption MODE = + ConfigOptions.key("mode").enumType(Mode.class).defaultValue(Mode.BOUNDED); + public static final ConfigOption CDC_PORT = + ConfigOptions.key("cdc.port").intType().defaultValue(8080); + + public static final ConfigOption CDC_TASK_NAME = + ConfigOptions.key("cdc.task.name").stringType().noDefaultValue(); + public static final ConfigOption LOOKUP_CACHE_MAX_ROWS = + ConfigOptions.key("lookup.cache.max-rows").intType().defaultValue(-1); + public static final ConfigOption LOOKUP_CACHE_TTL_SEC = + ConfigOptions.key("lookup.cache.ttl-sec").intType().defaultValue(-1); + public static final ConfigOption SCAN_BOUNDED_LOWER_BOUND = + ConfigOptions.key("scan.bounded.lower-bound").longType().defaultValue(-1L); + public static final ConfigOption SCAN_BOUNDED_UPPER_BOUND = + ConfigOptions.key("scan.bounded.upper-bound").longType().defaultValue(-1L); + + public enum Mode { + CDC, + BOUNDED; + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Utils.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Utils.java new file mode 100644 index 0000000000000..8fb1e618cd2b4 --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Utils.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.common; + +import org.apache.iotdb.flink.sql.exception.UnsupportedDataTypeException; +import org.apache.iotdb.tsfile.exception.NullFieldException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.utils.Binary; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.DataType; + +import java.io.IOException; +import java.lang.reflect.Array; +import java.net.Socket; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +public class Utils { + private Utils() {} + + public static Object getValue(Field value, String dataType) { + try { + if ("INT32".equals(dataType)) { + return value.getIntV(); + } else if ("INT64".equals(dataType)) { + return value.getLongV(); + } else if ("FLOAT".equals(dataType)) { + return value.getFloatV(); + } else if ("DOUBLE".equals(dataType)) { + return value.getDoubleV(); + } else if ("BOOLEAN".equals(dataType)) { + return value.getBoolV(); + } else if ("TEXT".equals(dataType)) { + return StringData.fromString(value.getStringValue()); + } else { + String exception = String.format("IoTDB don't support the data type: %s", dataType); + throw new UnsupportedDataTypeException(exception); + } + } catch (NullFieldException e) { + return null; + } + } + + public static Object getValue(Field value, DataType dataType) { + if (dataType.equals(DataTypes.INT())) { + return value.getIntV(); + } else if (dataType.equals(DataTypes.BIGINT())) { + return value.getLongV(); + } else if (dataType.equals(DataTypes.FLOAT())) { + return value.getFloatV(); + } else if (dataType.equals(DataTypes.DOUBLE())) { + return value.getDoubleV(); + } else if (dataType.equals(DataTypes.BOOLEAN())) { + return value.getBoolV(); + } else if (dataType.equals(DataTypes.STRING())) { + return StringData.fromString(value.getStringValue()); + } else { + throw new UnsupportedDataTypeException("IoTDB don't support the data type: " + dataType); + } + } + + public static Object getValue(RowData value, DataType dataType, int index) { + try { + if (dataType.equals(DataTypes.INT())) { + return value.getInt(index); + } else if (dataType.equals(DataTypes.BIGINT())) { + return value.getLong(index); + } else if (dataType.equals(DataTypes.FLOAT())) { + return value.getFloat(index); + } else if (dataType.equals(DataTypes.DOUBLE())) { + return value.getDouble(index); + } else if (dataType.equals(DataTypes.BOOLEAN())) { + return value.getBoolean(index); + } else if (dataType.equals(DataTypes.STRING())) { + return value.getString(index).toString(); + } else { + throw new UnsupportedDataTypeException("IoTDB don't support the data type: " + dataType); + } + } catch (NullPointerException e) { + return null; + } + } + + public static boolean isNumeric(String s) { + Pattern pattern = Pattern.compile("\\d*"); + return pattern.matcher(s).matches(); + } + + public static RowData convert(RowRecord rowRecord, List columnTypes) { + ArrayList values = new ArrayList<>(); + values.add(rowRecord.getTimestamp()); + List fields = rowRecord.getFields(); + for (int i = 0; i < fields.size(); i++) { + values.add(getValue(fields.get(i), columnTypes.get(i + 1))); + } + GenericRowData rowData = GenericRowData.of(values.toArray()); + return rowData; + } + + public static List object2List(Object obj, TSDataType dataType) { + ArrayList objects = new ArrayList<>(); + int length = Array.getLength(obj); + for (int i = 0; i < length; i++) { + if (dataType == TSDataType.TEXT) { + objects.add(StringData.fromString(((Binary) Array.get(obj, i)).getStringValue())); + } else { + objects.add(Array.get(obj, i)); + } + } + return objects; + } + + public static boolean isURIAvailable(URI uri) { + try { + new Socket(uri.getHost(), uri.getPort()).close(); + return true; + } catch (IOException e) { + return false; + } + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalIoTDBPathException.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalIoTDBPathException.java new file mode 100644 index 0000000000000..d698a39c550b2 --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalIoTDBPathException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.exception; + +public class IllegalIoTDBPathException extends RuntimeException { + public IllegalIoTDBPathException(String s) { + super(s); + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalOptionException.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalOptionException.java new file mode 100644 index 0000000000000..e1e8808b263cf --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalOptionException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.exception; + +public class IllegalOptionException extends RuntimeException { + public IllegalOptionException(String s) { + super(s); + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalSchemaException.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalSchemaException.java new file mode 100644 index 0000000000000..5f620b30170d2 --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalSchemaException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.exception; + +public class IllegalSchemaException extends RuntimeException { + public IllegalSchemaException(String s) { + super(s); + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalUrlPathException.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalUrlPathException.java new file mode 100644 index 0000000000000..7885ce5f0b51b --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/IllegalUrlPathException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.exception; + +public class IllegalUrlPathException extends RuntimeException { + public IllegalUrlPathException(String s) { + super(s); + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/UnsupportedDataTypeException.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/UnsupportedDataTypeException.java new file mode 100644 index 0000000000000..9b7a6e3c517d3 --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/exception/UnsupportedDataTypeException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.exception; + +public class UnsupportedDataTypeException extends RuntimeException { + public UnsupportedDataTypeException(String s) { + super(s); + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java new file mode 100644 index 0000000000000..9cf3e21608a4c --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.factory; + +import org.apache.iotdb.flink.sql.common.Options; +import org.apache.iotdb.flink.sql.common.Utils; +import org.apache.iotdb.flink.sql.exception.IllegalIoTDBPathException; +import org.apache.iotdb.flink.sql.exception.IllegalOptionException; +import org.apache.iotdb.flink.sql.exception.IllegalSchemaException; +import org.apache.iotdb.flink.sql.exception.IllegalUrlPathException; +import org.apache.iotdb.flink.sql.exception.UnsupportedDataTypeException; +import org.apache.iotdb.flink.sql.provider.IoTDBDynamicTableSink; +import org.apache.iotdb.flink.sql.provider.IoTDBDynamicTableSource; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class IoTDBDynamicTableFactory + implements DynamicTableSourceFactory, DynamicTableSinkFactory { + private static final HashSet supportedDataTypes = new HashSet<>(); + + static { + supportedDataTypes.add(DataTypes.INT()); + supportedDataTypes.add(DataTypes.BIGINT()); + supportedDataTypes.add(DataTypes.FLOAT()); + supportedDataTypes.add(DataTypes.DOUBLE()); + supportedDataTypes.add(DataTypes.BOOLEAN()); + supportedDataTypes.add(DataTypes.STRING()); + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + helper.validate(); + + ReadableConfig options = helper.getOptions(); + TableSchema schema = context.getCatalogTable().getSchema(); + + validate(options, schema); + + return new IoTDBDynamicTableSource(options, schema); + } + + @Override + public String factoryIdentifier() { + return "IoTDB"; + } + + @Override + public Set> requiredOptions() { + HashSet> requiredOptions = new HashSet<>(); + requiredOptions.add(Options.DEVICE); + + return requiredOptions; + } + + @Override + public Set> optionalOptions() { + HashSet> optionalOptions = new HashSet<>(); + optionalOptions.add(Options.NODE_URLS); + optionalOptions.add(Options.USER); + optionalOptions.add(Options.PASSWORD); + optionalOptions.add(Options.LOOKUP_CACHE_MAX_ROWS); + optionalOptions.add(Options.LOOKUP_CACHE_TTL_SEC); + optionalOptions.add(Options.ALIGNED); + optionalOptions.add(Options.MODE); + optionalOptions.add(Options.CDC_TASK_NAME); + optionalOptions.add(Options.CDC_PORT); + + return optionalOptions; + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + helper.validate(); + + ReadableConfig options = helper.getOptions(); + TableSchema schema = context.getCatalogTable().getSchema(); + + validate(options, schema); + + return new IoTDBDynamicTableSink(options, schema); + } + + protected void validate(ReadableConfig options, TableSchema schema) { + String[] fieldNames = schema.getFieldNames(); + DataType[] fieldDataTypes = schema.getFieldDataTypes(); + + if (!"Time_".equals(fieldNames[0]) || !fieldDataTypes[0].equals(DataTypes.BIGINT())) { + throw new IllegalSchemaException( + "The first field's name must be `Time_`, and its data type must be BIGINT."); + } + for (String fieldName : fieldNames) { + if (fieldName.contains("\\.")) { + throw new IllegalIoTDBPathException( + String.format( + "The field name `%s` contains character `.`, it's not allowed in IoTDB.", + fieldName)); + } + if (Utils.isNumeric(fieldName)) { + throw new IllegalIoTDBPathException( + String.format( + "The field name `%s` is a purely number, it's not allowed in IoTDB.", fieldName)); + } + } + + for (DataType fieldDataType : fieldDataTypes) { + if (!supportedDataTypes.contains(fieldDataType)) { + throw new UnsupportedDataTypeException( + "IoTDB don't support the data type: " + fieldDataType); + } + } + + String device = options.get(Options.DEVICE); + if (!device.startsWith("root.")) { + throw new IllegalIoTDBPathException("The option `device` must starts with 'root.'."); + } + for (String s : device.split("\\.")) { + if (Utils.isNumeric(s)) { + throw new IllegalIoTDBPathException( + String.format( + "The option `device` contains a purely number path: `%s`, it's not allowed in IoTDB.", + s)); + } + } + + List nodeUrls = Arrays.asList(options.get(Options.NODE_URLS).split(",")); + for (String nodeUrl : nodeUrls) { + String[] split = nodeUrl.split(":"); + if (split.length != 2) { + throw new IllegalUrlPathException("Every node's URL must be in the format of `host:port`."); + } + if (!Utils.isNumeric(split[1]) + && Integer.valueOf(split[1]) > 65535 + && Integer.valueOf(split[1]) < 1) { + throw new IllegalUrlPathException( + "The port must be a number, and it could not be greater than 65535 or less than 1."); + } + } + + Long lowerBound = options.get(Options.SCAN_BOUNDED_LOWER_BOUND); + Long upperBound = options.get(Options.SCAN_BOUNDED_UPPER_BOUND); + if (lowerBound > 0L && upperBound > 0L && upperBound < lowerBound) { + throw new IllegalOptionException( + "The value of option `scan.bounded.lower-bound` could not be greater than the value of option `scan.bounded.upper-bound`."); + } + + if (options.get(Options.MODE) == Options.Mode.CDC + && options.get(Options.CDC_TASK_NAME) == null) { + throw new IllegalOptionException( + "The option `cdc.task.name` is required when option `mode` equals `CDC`"); + } + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBBoundedScanFunction.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBBoundedScanFunction.java new file mode 100644 index 0000000000000..2d7170376eb7b --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBBoundedScanFunction.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.function; + +import org.apache.iotdb.flink.sql.common.Options; +import org.apache.iotdb.flink.sql.common.Utils; +import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.tsfile.read.common.RowRecord; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class IoTDBBoundedScanFunction extends RichInputFormat { + private final ReadableConfig options; + private final List> tableSchema; + private final String device; + private final long lowerBound; + private final long upperBound; + private final List measurements; + private Session session; + private SessionDataSet dataSet; + private List columnTypes; + + public IoTDBBoundedScanFunction(ReadableConfig options, SchemaWrapper schemaWrapper) { + this.options = options; + tableSchema = schemaWrapper.getSchema(); + device = options.get(Options.DEVICE); + lowerBound = options.get(Options.SCAN_BOUNDED_LOWER_BOUND); + upperBound = options.get(Options.SCAN_BOUNDED_UPPER_BOUND); + measurements = + tableSchema.stream().map(field -> String.valueOf(field.f0)).collect(Collectors.toList()); + } + + @Override + public void configure(Configuration configuration) { + // fo nothing + } + + @Override + public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException { + return baseStatistics; + } + + @Override + public InputSplit[] createInputSplits(int i) throws IOException { + return new GenericInputSplit[] {new GenericInputSplit(1, 1)}; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + @Override + public void openInputFormat() throws IOException { + session = + new Session.Builder() + .nodeUrls(Arrays.asList(options.get(Options.NODE_URLS).split(","))) + .username(options.get(Options.USER)) + .password(options.get(Options.PASSWORD)) + .build(); + + try { + session.open(false); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + + @Override + public void open(InputSplit inputSplit) throws IOException { + String sql; + if (lowerBound < 0L && upperBound < 0L) { + sql = String.format("SELECT %s FROM %s", String.join(",", measurements), device); + } else if (lowerBound < 0L && upperBound > 0L) { + sql = + String.format( + "SELECT %s FROM %s WHERE TIME <= %d", + String.join(",", measurements), device, upperBound); + } else if (lowerBound > 0L && upperBound < 0L) { + sql = + String.format( + "SELECT %s FROM %s WHERE TIME >= %d", + String.join(",", measurements), device, lowerBound); + } else { + sql = + String.format( + "SELECT %s FROM %s WHERE TIME >= %d AND TIME <= %d", + String.join(",", measurements), device, lowerBound, upperBound); + } + try { + dataSet = session.executeQueryStatement(sql); + columnTypes = dataSet.getColumnTypes(); + } catch (StatementExecutionException | IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean reachedEnd() throws IOException { + try { + return !dataSet.hasNext(); + } catch (StatementExecutionException | IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + + @Override + public RowData nextRecord(RowData rowData) throws IOException { + try { + RowRecord rowRecord = dataSet.next(); + return Utils.convert(rowRecord, columnTypes); + } catch (StatementExecutionException | IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + try { + if (dataSet != null) { + dataSet.close(); + } + if (session != null) { + session.close(); + } + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java new file mode 100644 index 0000000000000..92013f9488bfb --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.function; + +import org.apache.iotdb.flink.sql.client.IoTDBWebsocketClient; +import org.apache.iotdb.flink.sql.common.Options; +import org.apache.iotdb.flink.sql.common.Utils; +import org.apache.iotdb.flink.sql.exception.IllegalOptionException; +import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper; +import org.apache.iotdb.flink.sql.wrapper.TabletWrapper; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.tsfile.utils.BitMap; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.types.DataType; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.enums.ReadyState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +public class IoTDBCDCSourceFunction extends RichSourceFunction { + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBCDCSourceFunction.class); + private final List socketClients = new ArrayList<>(); + private final int cdcPort; + private final List nodeUrls; + private final String taskName; + private final String device; + private final String user; + private final String password; + private final List measurements; + private final BlockingQueue tabletWrappers; + private transient ExecutorService consumeExecutor; + + public IoTDBCDCSourceFunction(ReadableConfig options, SchemaWrapper schemaWrapper) { + List> tableSchema = schemaWrapper.getSchema(); + cdcPort = options.get(Options.CDC_PORT); + nodeUrls = Arrays.asList(options.get(Options.NODE_URLS).split(",")); + taskName = options.get(Options.CDC_TASK_NAME); + device = options.get(Options.DEVICE); + user = options.get(Options.USER); + password = options.get(Options.PASSWORD); + measurements = + tableSchema.stream().map(field -> String.valueOf(field.f0)).collect(Collectors.toList()); + + tabletWrappers = new ArrayBlockingQueue<>(nodeUrls.size()); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + Session session = + new Session.Builder().username(user).password(password).nodeUrls(nodeUrls).build(); + session.open(false); + boolean hasCreatedPipeTask = + session.executeQueryStatement(String.format("show pipe flink_cdc_%s", taskName)).hasNext(); + if (!hasCreatedPipeTask) { + for (String nodeUrl : nodeUrls) { + URI uri = new URI(String.format("ws://%s:%d", nodeUrl.split(":")[0], cdcPort)); + if (Utils.isURIAvailable(uri)) { + throw new IllegalOptionException( + String.format( + "The port `%d` has been bound. Please use another one by option `cdc.port`.", + cdcPort)); + } + } + String createPipeCommand = + String.format( + "CREATE PIPE flink_cdc_%s\n" + + "WITH EXTRACTOR (\n" + + "'extractor' = 'iotdb-extractor',\n" + + "'extractor.pattern' = '%s',\n" + + "'extractor.history.enable' = 'true',\n" + + "'extractor.realtime.enable' = 'true',\n" + + "'extractor.realtime.mode' = 'hybrid',\n" + + ") WITH CONNECTOR (\n" + + "'connector' = 'websocket-connector',\n" + + "'connector.websocket.port' = '%d'" + + ")", + taskName, device, cdcPort); + session.executeNonQueryStatement(createPipeCommand); + } + + String status = + session + .executeQueryStatement(String.format("show pipe flink_cdc_%s", taskName)) + .next() + .getFields() + .get(2) + .getStringValue(); + if ("STOPPED".equals(status)) { + session.executeNonQueryStatement(String.format("start pipe flink_cdc_%s", taskName)); + } + + session.close(); + + consumeExecutor = Executors.newFixedThreadPool(1); + for (String nodeUrl : nodeUrls) { + URI uri = new URI(String.format("ws://%s:%s", nodeUrl.split(":")[0], cdcPort)); + socketClients.add(initAndGet(uri)); + } + } + + @Override + public void run(SourceContext ctx) throws InterruptedException { + consumeExecutor.submit(new ConsumeRunnable(ctx)); + consumeExecutor.shutdown(); + while (true) { + for (IoTDBWebsocketClient socketClient : socketClients) { + if (socketClient.getReadyState().equals(ReadyState.CLOSED)) { + while (!Utils.isURIAvailable(socketClient.getURI())) { + String log = + String.format( + "The URI %s:%d is not available now, sleep 5 seconds.", + socketClient.getURI().getHost(), socketClient.getURI().getPort()); + LOGGER.warn(log); + Thread.sleep(5000); + } + socketClient.reconnect(); + while (!socketClient.getReadyState().equals(ReadyState.OPEN)) { + Thread.sleep(1000); + } + socketClient.send("START"); + } else { + Thread.sleep(1000); + } + } + } + } + + @Override + public void cancel() { + socketClients.forEach(WebSocketClient::close); + } + + public void addTabletWrapper(TabletWrapper tabletWrapper) { + try { + this.tabletWrappers.put(tabletWrapper); + } catch (InterruptedException e) { + String host = tabletWrapper.getWebsocketClient().getRemoteSocketAddress().getHostName(); + int port = tabletWrapper.getWebsocketClient().getRemoteSocketAddress().getPort(); + String log = + String.format( + "The tablet from %s:%d can't be put into queue, because: %s", + host, port, e.getMessage()); + LOGGER.warn(log); + Thread.currentThread().interrupt(); + } + } + + private IoTDBWebsocketClient initAndGet(URI uri) throws InterruptedException { + while (!Utils.isURIAvailable(uri)) { + String log = + String.format( + "The URI %s:%d is not available now, sleep 5 seconds.", uri.getHost(), uri.getPort()); + LOGGER.warn(log); + Thread.sleep(5000); + } + IoTDBWebsocketClient client = new IoTDBWebsocketClient(uri, this); + client.connect(); + while (!client.getReadyState().equals(ReadyState.OPEN)) { + Thread.sleep(1000); + } + client.send("START"); + return client; + } + + public void collectTablet(Tablet tablet, SourceContext ctx) { + if (!device.equals(tablet.deviceId)) { + return; + } + List schemas = tablet.getSchemas(); + int rowSize = tablet.rowSize; + HashMap>> values = new HashMap<>(); + for (MeasurementSchema schema : schemas) { + String measurement = schema.getMeasurementId(); + values.put( + measurement, + new Pair<>( + tablet.bitMaps[schemas.indexOf(schema)], + Utils.object2List(tablet.values[schemas.indexOf(schema)], schema.getType()))); + } + for (int i = 0; i < rowSize; i++) { + ArrayList row = new ArrayList<>(); + row.add(tablet.timestamps[i]); + for (String measurement : measurements) { + if (values.get(measurement).getLeft() == null + || !values.get(measurement).getLeft().isMarked(i)) { + row.add(values.get(measurement).getRight().get(i)); + } else { + row.add(null); + } + } + RowData rowData = (RowData) GenericRowData.of(row.toArray()); + ctx.collect(rowData); + } + } + + private class ConsumeRunnable implements Runnable { + SourceContext context; + + public ConsumeRunnable(SourceContext context) { + this.context = context; + } + + @Override + public void run() { + while (true) { + try { + TabletWrapper tabletWrapper = tabletWrappers.take(); + collectTablet(tabletWrapper.getTablet(), context); + tabletWrapper + .getWebsocketClient() + .send(String.format("ACK:%d", tabletWrapper.getCommitId())); + } catch (InterruptedException e) { + LOGGER.warn("The tablet can't be taken from queue!"); + Thread.currentThread().interrupt(); + } + } + } + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBLookupFunction.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBLookupFunction.java new file mode 100644 index 0000000000000..9c26d2175b158 --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBLookupFunction.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.function; + +import org.apache.iotdb.flink.sql.common.Options; +import org.apache.iotdb.flink.sql.common.Utils; +import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.shaded.curator5.com.google.common.cache.Cache; +import org.apache.flink.shaded.curator5.com.google.common.cache.CacheBuilder; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.types.DataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class IoTDBLookupFunction extends TableFunction { + private final List> schema; + private final int cacheMaxRows; + private final int cacheTTLSec; + private final List nodeUrls; + private final String user; + private final String password; + private final String device; + private final List measurements; + private Session session; + + private transient Cache cache; + + public IoTDBLookupFunction(ReadableConfig options, SchemaWrapper schemaWrapper) { + this.schema = schemaWrapper.getSchema(); + + cacheMaxRows = options.get(Options.LOOKUP_CACHE_MAX_ROWS); + + cacheTTLSec = options.get(Options.LOOKUP_CACHE_TTL_SEC); + + nodeUrls = Arrays.asList(options.get(Options.NODE_URLS).split(",")); + + user = options.get(Options.USER); + + password = options.get(Options.PASSWORD); + + device = options.get(Options.DEVICE); + + measurements = + schema.stream().map(field -> String.valueOf(field.f0)).collect(Collectors.toList()); + } + + @Override + public void open(FunctionContext context) throws Exception { + super.open(context); + session = new Session.Builder().nodeUrls(nodeUrls).username(user).password(password).build(); + session.open(false); + + if (cacheMaxRows > 0 && cacheTTLSec > 0) { + cache = + CacheBuilder.newBuilder() + .expireAfterAccess(cacheTTLSec, TimeUnit.SECONDS) + .maximumSize(cacheMaxRows) + .build(); + } + } + + @Override + public void close() throws Exception { + if (cache != null) { + cache.invalidateAll(); + } + if (session != null) { + session.close(); + } + super.close(); + } + + public void eval(Object obj) throws IoTDBConnectionException, StatementExecutionException { + RowData lookupKey = GenericRowData.of(obj); + if (cache != null) { + RowData cacheRow = cache.getIfPresent(lookupKey); + if (cacheRow != null) { + collect(cacheRow); + return; + } + } + + long timestamp = lookupKey.getLong(0); + + String sql = + String.format( + "SELECT %s FROM %s WHERE TIME=%d", + StringUtils.join(measurements, ','), device, timestamp); + SessionDataSet dataSet = session.executeQueryStatement(sql); + List columnNames = dataSet.getColumnNames(); + columnNames.remove("Time"); + RowRecord rowRecord = dataSet.next(); + if (rowRecord == null) { + ArrayList values = new ArrayList<>(); + values.add(timestamp); + for (int i = 0; i < schema.size(); i++) { + values.add(null); + } + GenericRowData rowData = GenericRowData.of(values.toArray()); + collect(rowData); + return; + } + List fields = rowRecord.getFields(); + + ArrayList values = new ArrayList<>(); + values.add(timestamp); + for (Tuple2 filed : schema) { + values.add( + Utils.getValue(fields.get(columnNames.indexOf(device + '.' + filed.f0)), filed.f1)); + } + + GenericRowData rowData = GenericRowData.of(values.toArray()); + if (cache != null) { + cache.put(lookupKey, rowData); + } + collect(rowData); + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBSinkFunction.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBSinkFunction.java new file mode 100644 index 0000000000000..a78d925940576 --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBSinkFunction.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.function; + +import org.apache.iotdb.flink.sql.common.Options; +import org.apache.iotdb.flink.sql.common.Utils; +import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class IoTDBSinkFunction implements SinkFunction { + private final List> schema; + private final List nodeUrls; + private final String user; + private final String password; + private final String device; + private final boolean aligned; + private final List measurements; + private final List dataTypes; + private static final Map TYPE_MAP = new HashMap<>(); + + private static Session session; + + static { + TYPE_MAP.put(DataTypes.INT(), TSDataType.INT32); + TYPE_MAP.put(DataTypes.BIGINT(), TSDataType.INT64); + TYPE_MAP.put(DataTypes.FLOAT(), TSDataType.FLOAT); + TYPE_MAP.put(DataTypes.DOUBLE(), TSDataType.DOUBLE); + TYPE_MAP.put(DataTypes.BOOLEAN(), TSDataType.BOOLEAN); + TYPE_MAP.put(DataTypes.STRING(), TSDataType.TEXT); + } + + public IoTDBSinkFunction(ReadableConfig options, SchemaWrapper schemaWrapper) { + // get schema + this.schema = schemaWrapper.getSchema(); + // get options + nodeUrls = Arrays.asList(options.get(Options.NODE_URLS).split(",")); + user = options.get(Options.USER); + password = options.get(Options.PASSWORD); + device = options.get(Options.DEVICE); + aligned = options.get(Options.ALIGNED); + // get measurements and data types from schema + measurements = + schema.stream().map(field -> String.valueOf(field.f0)).collect(Collectors.toList()); + dataTypes = schema.stream().map(field -> TYPE_MAP.get(field.f1)).collect(Collectors.toList()); + } + + @Override + public void invoke(RowData rowData, Context context) throws Exception { + // open the session if the session has not been opened + if (session == null) { + session = new Session.Builder().nodeUrls(nodeUrls).username(user).password(password).build(); + session.open(false); + } + // load data from RowData + if (rowData.getRowKind().equals(RowKind.INSERT) + || rowData.getRowKind().equals(RowKind.UPDATE_AFTER)) { + long timestamp = rowData.getLong(0); + ArrayList measurementsOfRow = new ArrayList<>(); + ArrayList dataTypesOfRow = new ArrayList<>(); + ArrayList values = new ArrayList<>(); + for (int i = 0; i < this.measurements.size(); i++) { + Object value = Utils.getValue(rowData, schema.get(i).f1, i + 1); + if (value == null) { + continue; + } + measurementsOfRow.add(this.measurements.get(i)); + dataTypesOfRow.add(this.dataTypes.get(i)); + values.add(value); + } + // insert data + if (aligned) { + session.insertAlignedRecord(device, timestamp, measurementsOfRow, dataTypesOfRow, values); + } else { + session.insertRecord(device, timestamp, measurementsOfRow, dataTypesOfRow, values); + } + } else if (rowData.getRowKind().equals(RowKind.DELETE)) { + ArrayList paths = new ArrayList<>(); + for (String measurement : measurements) { + paths.add(String.format("%s.%s", device, measurement)); + } + session.deleteData(paths, rowData.getLong(0)); + } else if (rowData.getRowKind().equals(RowKind.UPDATE_BEFORE)) { + // do nothing + } + } + + @Override + public void finish() throws Exception { + if (session != null) { + session.close(); + } + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSink.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSink.java new file mode 100644 index 0000000000000..1a0302980801c --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSink.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.provider; + +import org.apache.iotdb.flink.sql.function.IoTDBSinkFunction; +import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.types.RowKind; + +public class IoTDBDynamicTableSink implements DynamicTableSink { + private final ReadableConfig options; + private final TableSchema schema; + + public IoTDBDynamicTableSink(ReadableConfig options, TableSchema schema) { + this.options = options; + this.schema = schema; + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode changelogMode) { + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.DELETE) + .addContainedKind(RowKind.UPDATE_BEFORE) + .addContainedKind(RowKind.UPDATE_AFTER) + .build(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + return SinkFunctionProvider.of(new IoTDBSinkFunction(options, new SchemaWrapper(schema))); + } + + @Override + public DynamicTableSink copy() { + return new IoTDBDynamicTableSink(options, schema); + } + + @Override + public String asSummaryString() { + return "IoTDB Dynamic Table Sink"; + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSource.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSource.java new file mode 100644 index 0000000000000..535da831a6fe6 --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSource.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.provider; + +import org.apache.iotdb.flink.sql.common.Options; +import org.apache.iotdb.flink.sql.function.IoTDBBoundedScanFunction; +import org.apache.iotdb.flink.sql.function.IoTDBCDCSourceFunction; +import org.apache.iotdb.flink.sql.function.IoTDBLookupFunction; +import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.InputFormatProvider; +import org.apache.flink.table.connector.source.LookupTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.connector.source.TableFunctionProvider; + +public class IoTDBDynamicTableSource implements LookupTableSource, ScanTableSource { + private final ReadableConfig options; + private final TableSchema schema; + + public IoTDBDynamicTableSource(ReadableConfig options, TableSchema schema) { + this.options = options; + this.schema = schema; + } + + @Override + public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) { + return TableFunctionProvider.of(new IoTDBLookupFunction(options, new SchemaWrapper(schema))); + } + + @Override + public DynamicTableSource copy() { + return new IoTDBDynamicTableSource(options, schema); + } + + @Override + public String asSummaryString() { + return "IoTDB Dynamic Table Source"; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + if (options.get(Options.MODE) == Options.Mode.CDC) { + return SourceFunctionProvider.of( + new IoTDBCDCSourceFunction<>(options, new SchemaWrapper(schema)), false); + } else { + return InputFormatProvider.of( + new IoTDBBoundedScanFunction(options, new SchemaWrapper(schema))); + } + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/SchemaWrapper.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/SchemaWrapper.java new file mode 100644 index 0000000000000..4a50225981303 --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/SchemaWrapper.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.wrapper; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.types.DataType; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class SchemaWrapper implements Serializable { + private List> schema; + + public SchemaWrapper(TableSchema schema) { + this.schema = new ArrayList<>(); + + for (String fieldName : schema.getFieldNames()) { + if (fieldName == "Time_") { + continue; + } + this.schema.add(new Tuple2<>(fieldName, schema.getFieldDataType(fieldName).get())); + } + } + + public List> getSchema() { + return schema; + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/TabletWrapper.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/TabletWrapper.java new file mode 100644 index 0000000000000..fbf0eee10e04a --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/TabletWrapper.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.flink.sql.wrapper; + +import org.apache.iotdb.flink.sql.client.IoTDBWebsocketClient; +import org.apache.iotdb.tsfile.write.record.Tablet; + +public class TabletWrapper { + private long commitId; + private IoTDBWebsocketClient websocketClient; + private Tablet tablet; + + public TabletWrapper(long commitId, IoTDBWebsocketClient websocketClient, Tablet tablet) { + this.commitId = commitId; + this.websocketClient = websocketClient; + this.tablet = tablet; + } + + public long getCommitId() { + return commitId; + } + + public IoTDBWebsocketClient getWebsocketClient() { + return websocketClient; + } + + public Tablet getTablet() { + return tablet; + } +} diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/iotdb-connector/flink-sql-iotdb-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000000..4eee20a998647 --- /dev/null +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +org.apache.iotdb.flink.sql.factory.IoTDBDynamicTableFactory \ No newline at end of file diff --git a/iotdb-connector/pom.xml b/iotdb-connector/pom.xml new file mode 100644 index 0000000000000..06e04d1e67429 --- /dev/null +++ b/iotdb-connector/pom.xml @@ -0,0 +1,50 @@ + + + + 4.0.0 + + org.apache.iotdb + iotdb-parent + 1.3.0-SNAPSHOT + + iotdb-connector + pom + IoTDB: Connector + + flink-iotdb-connector + flink-tsfile-connector + grafana-connector + hadoop + hive-connector + spark-iotdb-connector + spark-tsfile + zeppelin-interpreter + + + + with-grafana-plugin + + grafana-plugin + + + + diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java index 2f65a39f86cd0..3c870a9ba15d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java @@ -51,6 +51,9 @@ public class PipeConnectorConstant { public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY = "connector.version"; public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE = "1.1"; + public static final String CONNECTOR_WEBSOCKET_PORT_KEY = "connector.websocket.port"; + public static final int CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE = 8080; + private PipeConnectorConstant() { throw new IllegalStateException("Utility class"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java new file mode 100644 index 0000000000000..5efc7a05295cc --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.pipe.connector.protocol.websocket; + +import org.apache.iotdb.db.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.tsfile.exception.NotImplementedException; +import org.apache.iotdb.tsfile.utils.Pair; + +import org.java_websocket.WebSocket; +import org.java_websocket.handshake.ClientHandshake; +import org.java_websocket.server.WebSocketServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.PriorityBlockingQueue; + +public class WebSocketConnectorServer extends WebSocketServer { + private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketConnectorServer.class); + private final PriorityBlockingQueue> events = + new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left)); + private WebsocketConnector websocketConnector; + + private ConcurrentMap eventMap = new ConcurrentHashMap<>(); + + public WebSocketConnectorServer( + InetSocketAddress address, WebsocketConnector websocketConnector) { + super(address); + this.websocketConnector = websocketConnector; + } + + @Override + public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) { + String log = + String.format( + "The connection from client %s:%d has been opened!", + webSocket.getRemoteSocketAddress().getHostName(), + webSocket.getRemoteSocketAddress().getPort()); + LOGGER.info(log); + } + + @Override + public void onClose(WebSocket webSocket, int i, String s, boolean b) { + String log = + String.format( + "The client from %s:%d has been closed!", + webSocket.getRemoteSocketAddress().getAddress(), + webSocket.getRemoteSocketAddress().getPort()); + LOGGER.info(log); + } + + @Override + public void onMessage(WebSocket webSocket, String s) { + String log = + String.format( + "Received a message `%s` from %s:%d", + s, + webSocket.getRemoteSocketAddress().getHostName(), + webSocket.getRemoteSocketAddress().getPort()); + LOGGER.info(log); + if (s.startsWith("START")) { + handleStart(webSocket); + } else if (s.startsWith("ACK")) { + handleAck(webSocket, s); + } else if (s.startsWith("ERROR")) { + handleError(webSocket, s); + } + } + + @Override + public void onError(WebSocket webSocket, Exception e) { + String log = null; + if (webSocket.getRemoteSocketAddress() != null) { + log = + String.format( + "Got an error `%s` from %s:%d", + e.getMessage(), + webSocket.getLocalSocketAddress().getHostName(), + webSocket.getLocalSocketAddress().getPort()); + } else { + log = String.format("Got an error `%s` from client", e.getMessage()); + } + LOGGER.error(log); + } + + @Override + public void onStart() { + String log = + String.format( + "The websocket server %s:%d has been started!", + this.getAddress().getHostName(), this.getPort()); + LOGGER.error(log); + } + + public void addEvent(Pair event) { + if (events.size() >= 50) { + synchronized (events) { + while (events.size() >= 50) { + try { + events.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + events.put(event); + } + + private void handleStart(WebSocket webSocket) { + try { + ArrayList webSockets = new ArrayList<>(); + webSockets.add(webSocket); + Pair eventPair = events.take(); + synchronized (events) { + events.notifyAll(); + transfer(eventPair, webSockets); + } + } catch (InterruptedException e) { + String log = String.format("The event can't be taken, because: %s", e.getMessage()); + LOGGER.warn(log); + Thread.currentThread().interrupt(); + } + } + + private void handleAck(WebSocket webSocket, String s) { + long commitId = Long.parseLong(s.replace("ACK:", "")); + Event event = eventMap.remove(commitId); + websocketConnector.commit( + commitId, event instanceof EnrichedEvent ? (EnrichedEvent) event : null); + handleStart(webSocket); + } + + private void handleError(WebSocket webSocket, String s) { + long commitId = Long.parseLong(s.replace("ERROR:", "")); + String log = + String.format( + "The tablet of commitId: %d can't be parsed by client, it will be retried later.", + commitId); + LOGGER.warn(log); + events.put(new Pair<>(commitId, eventMap.remove(commitId))); + handleStart(webSocket); + } + + private void transfer(Pair eventPair, List webSockets) { + Long commitId = eventPair.getLeft(); + Event event = eventPair.getRight(); + try { + ByteBuffer tabletBuffer = null; + if (event instanceof PipeInsertNodeTabletInsertionEvent) { + tabletBuffer = ((PipeInsertNodeTabletInsertionEvent) event).convertToTablet().serialize(); + } else if (event instanceof PipeRawTabletInsertionEvent) { + tabletBuffer = ((PipeRawTabletInsertionEvent) event).convertToTablet().serialize(); + } else if (event instanceof PipeTsFileInsertionEvent) { + PipeTsFileInsertionEvent tsFileInsertionEvent = (PipeTsFileInsertionEvent) event; + tsFileInsertionEvent.waitForTsFileClose(); + Iterable subEvents = tsFileInsertionEvent.toTabletInsertionEvents(); + for (TabletInsertionEvent subEvent : subEvents) { + tabletBuffer = ((PipeRawTabletInsertionEvent) subEvent).convertToTablet().serialize(); + } + } else { + throw new NotImplementedException( + "IoTDBCDCConnector only support " + + "PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent."); + } + if (tabletBuffer == null) { + return; + } + ByteBuffer payload = ByteBuffer.allocate(Long.BYTES + tabletBuffer.limit()); + payload.putLong(commitId); + payload.put(tabletBuffer); + payload.flip(); + this.broadcast(payload, webSockets); + eventMap.put(eventPair.getLeft(), eventPair.getRight()); + String log = + String.format( + "Transferred a message to client %s:%d", + webSockets.get(0).getRemoteSocketAddress().getAddress().getHostName(), + webSockets.get(0).getRemoteSocketAddress().getPort()); + LOGGER.info(log); + } catch (InterruptedException e) { + events.put(eventPair); + Thread.currentThread().interrupt(); + throw new PipeException(e.getMessage()); + } catch (Exception e) { + events.put(eventPair); + e.printStackTrace(); + throw new PipeException(e.getMessage()); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java new file mode 100644 index 0000000000000..f9a01b3fc20e7 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.pipe.connector.protocol.websocket; + +import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant; +import org.apache.iotdb.db.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.pipe.api.PipeConnector; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.iotdb.tsfile.utils.Pair; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.net.InetSocketAddress; +import java.util.Comparator; +import java.util.Optional; +import java.util.PriorityQueue; +import java.util.concurrent.atomic.AtomicLong; + +public class WebsocketConnector implements PipeConnector { + private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketConnector.class); + private WebSocketConnectorServer server; + private int port; + + public final AtomicLong commitIdGenerator = new AtomicLong(0); + private final AtomicLong lastCommitId = new AtomicLong(0); + private final PriorityQueue> commitQueue = + new PriorityQueue<>(Comparator.comparing(o -> o.left)); + + @Override + public void validate(PipeParameterValidator validator) throws Exception {} + + @Override + public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) + throws Exception { + port = + parameters.getIntOrDefault( + PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_KEY, + PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE); + } + + @Override + public void handshake() throws Exception { + if (server == null) { + server = new WebSocketConnectorServer(new InetSocketAddress(port), this); + server.start(); + } + } + + @Override + public void heartbeat() throws Exception {} + + @Override + public void transfer(TabletInsertionEvent tabletInsertionEvent) { + if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) + && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) { + LOGGER.warn( + "WebsocketConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. " + + "Current event: {}.", + tabletInsertionEvent); + return; + } + long commitId = commitIdGenerator.incrementAndGet(); + ((EnrichedEvent) tabletInsertionEvent) + .increaseReferenceCount(WebsocketConnector.class.getName()); + server.addEvent(new Pair<>(commitId, tabletInsertionEvent)); + } + + @Override + public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception { + if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) { + LOGGER.warn( + "WebsocketConnector only support PipeTsFileInsertionEvent. Current event: {}.", + tsFileInsertionEvent); + return; + } + long commitId = commitIdGenerator.incrementAndGet(); + ((EnrichedEvent) tsFileInsertionEvent) + .increaseReferenceCount(WebsocketConnector.class.getName()); + server.addEvent(new Pair<>(commitId, tsFileInsertionEvent)); + } + + @Override + public void transfer(Event event) throws Exception {} + + @Override + public void close() throws Exception { + server.stop(); + } + + public synchronized void commit(long requestCommitId, @Nullable EnrichedEvent enrichedEvent) { + commitQueue.offer( + new Pair<>( + requestCommitId, + () -> + Optional.ofNullable(enrichedEvent) + .ifPresent( + event -> + event.decreaseReferenceCount(WebsocketConnector.class.getName())))); + + while (!commitQueue.isEmpty()) { + final Pair committer = commitQueue.peek(); + if (lastCommitId.get() + 1 != committer.left) { + break; + } + + committer.right.run(); + lastCommitId.incrementAndGet(); + + commitQueue.poll(); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java index fb06743874ffb..bc956afe58ca7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector; import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector; import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector; +import org.apache.iotdb.db.pipe.connector.protocol.websocket.WebsocketConnector; import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor; import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue; import org.apache.iotdb.pipe.api.PipeConnector; @@ -78,6 +79,8 @@ public synchronized String register( } else if (connectorKey.equals( BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) { pipeConnector = new IoTDBAirGapConnector(); + } else if (connectorKey.equals(BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName())) { + pipeConnector = new WebsocketConnector(); } else { pipeConnector = PipeAgent.plugin().reflectConnector(pipeConnectorParameters); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java index ef973b1cc306d..c9eebfc1f44fb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftAsyncConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSyncConnector; +import org.apache.iotdb.commons.pipe.plugin.builtin.connector.WebsocketConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.IoTDBExtractor; import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor; @@ -43,6 +44,7 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", IoTDBThriftAsyncConnector.class), IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", IoTDBLegacyPipeConnector.class), IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapConnector.class), + WEBSOCKET_CONNECTOR("websocket-connector", WebsocketConnector.class), ; private final String pipePluginName; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebsocketConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebsocketConnector.java new file mode 100644 index 0000000000000..82302d55e2666 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebsocketConnector.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.plugin.builtin.connector; + +/** + * This class is a placeholder and should not be initialized. It represents the IoTDB Thrift + * connector. There is a real implementation in the server module but cannot be imported here. The + * pipe agent in the server module will replace this class with the real implementation when + * initializing the IoTDB Thrift connector. + */ +public class WebsocketConnector extends PlaceholderConnector {} From 03d384540d79258a53abc10b5e23f15611effce7 Mon Sep 17 00:00:00 2001 From: xuanronaldo Date: Fri, 25 Aug 2023 11:24:39 +0200 Subject: [PATCH 3/5] refactor --- ...tClient.java => IoTDBWebSocketClient.java} | 21 +++++++++-------- .../apache/iotdb/flink/sql/common/Utils.java | 7 +++--- .../sql/factory/IoTDBDynamicTableFactory.java | 23 +++++++++++-------- .../function/IoTDBBoundedScanFunction.java | 18 +++++++-------- .../sql/function/IoTDBCDCSourceFunction.java | 21 +++++++++-------- .../flink/sql/function/IoTDBSinkFunction.java | 10 ++++---- .../sql/provider/IoTDBDynamicTableSource.java | 2 +- .../flink/sql/wrapper/SchemaWrapper.java | 4 ++-- .../flink/sql/wrapper/TabletWrapper.java | 12 +++++----- .../websocket/WebSocketConnectorServer.java | 23 ++++++++----------- .../plugin/builtin/BuiltinPipePlugin.java | 4 ++-- ...Connector.java => WebSocketConnector.java} | 4 ++-- 12 files changed, 76 insertions(+), 73 deletions(-) rename iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/{IoTDBWebsocketClient.java => IoTDBWebSocketClient.java} (80%) rename iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/{WebsocketConnector.java => WebSocketConnector.java} (92%) diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebsocketClient.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebSocketClient.java similarity index 80% rename from iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebsocketClient.java rename to iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebSocketClient.java index 72df12b49544f..eb57749bc3935 100644 --- a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebsocketClient.java +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebSocketClient.java @@ -30,11 +30,11 @@ import java.net.URI; import java.nio.ByteBuffer; -public class IoTDBWebsocketClient extends WebSocketClient { - private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBWebsocketClient.class); +public class IoTDBWebSocketClient extends WebSocketClient { + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBWebSocketClient.class); private final IoTDBCDCSourceFunction function; - public IoTDBWebsocketClient(URI uri, IoTDBCDCSourceFunction function) { + public IoTDBWebSocketClient(URI uri, IoTDBCDCSourceFunction function) { super(uri); this.function = function; } @@ -42,18 +42,18 @@ public IoTDBWebsocketClient(URI uri, IoTDBCDCSourceFunction function) { @Override public void onOpen(ServerHandshake serverHandshake) { String log = - String.format("The connection with %s:%d has been created!", uri.getHost(), uri.getPort()); + String.format("The connection with %s:%d has been created.", uri.getHost(), uri.getPort()); LOGGER.info(log); } @Override - public void onMessage(String s) {} + public void onMessage(String s) { + // Do nothing + } @Override public void onMessage(ByteBuffer bytes) { super.onMessage(bytes); - String log = String.format("Received a message from %s:%d", uri.getHost(), uri.getPort()); - LOGGER.info(log); long commitId = bytes.getLong(); Tablet tablet = Tablet.deserialize(bytes); function.addTabletWrapper(new TabletWrapper(commitId, this, tablet)); @@ -61,12 +61,15 @@ public void onMessage(ByteBuffer bytes) { @Override public void onClose(int i, String s, boolean b) { - LOGGER.info("The connection has been closed!"); + LOGGER.info("The connection to {}:{} has been closed.", uri.getHost(), uri.getPort()); } @Override public void onError(Exception e) { - String log = String.format("Got an error: %s", e.getMessage()); + String log = + String.format( + "An error occurred when connecting to %s:%s: %s.", + uri.getHost(), uri.getPort(), e.getMessage()); LOGGER.error(log); } } diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Utils.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Utils.java index 8fb1e618cd2b4..fb08e40c34521 100644 --- a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Utils.java +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Utils.java @@ -57,7 +57,7 @@ public static Object getValue(Field value, String dataType) { } else if ("TEXT".equals(dataType)) { return StringData.fromString(value.getStringValue()); } else { - String exception = String.format("IoTDB don't support the data type: %s", dataType); + String exception = String.format("IoTDB doesn't support the data type: %s", dataType); throw new UnsupportedDataTypeException(exception); } } catch (NullFieldException e) { @@ -79,7 +79,7 @@ public static Object getValue(Field value, DataType dataType) { } else if (dataType.equals(DataTypes.STRING())) { return StringData.fromString(value.getStringValue()); } else { - throw new UnsupportedDataTypeException("IoTDB don't support the data type: " + dataType); + throw new UnsupportedDataTypeException("IoTDB doesn't support the data type: " + dataType); } } @@ -117,8 +117,7 @@ public static RowData convert(RowRecord rowRecord, List columnTypes) { for (int i = 0; i < fields.size(); i++) { values.add(getValue(fields.get(i), columnTypes.get(i + 1))); } - GenericRowData rowData = GenericRowData.of(values.toArray()); - return rowData; + return GenericRowData.of(values.toArray()); } public static List object2List(Object obj, TSDataType dataType) { diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java index 9cf3e21608a4c..8c65d0377ca94 100644 --- a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java @@ -39,9 +39,7 @@ import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.types.DataType; -import java.util.Arrays; import java.util.HashSet; -import java.util.List; import java.util.Set; public class IoTDBDynamicTableFactory @@ -130,14 +128,14 @@ protected void validate(ReadableConfig options, TableSchema schema) { if (Utils.isNumeric(fieldName)) { throw new IllegalIoTDBPathException( String.format( - "The field name `%s` is a purely number, it's not allowed in IoTDB.", fieldName)); + "The field name `%s` is a pure number, which is not allowed in IoTDB.", fieldName)); } } for (DataType fieldDataType : fieldDataTypes) { if (!supportedDataTypes.contains(fieldDataType)) { throw new UnsupportedDataTypeException( - "IoTDB don't support the data type: " + fieldDataType); + "IoTDB doesn't support the data type: " + fieldDataType); } } @@ -154,17 +152,24 @@ protected void validate(ReadableConfig options, TableSchema schema) { } } - List nodeUrls = Arrays.asList(options.get(Options.NODE_URLS).split(",")); + String[] nodeUrls = options.get(Options.NODE_URLS).split(","); for (String nodeUrl : nodeUrls) { String[] split = nodeUrl.split(":"); if (split.length != 2) { throw new IllegalUrlPathException("Every node's URL must be in the format of `host:port`."); } - if (!Utils.isNumeric(split[1]) - && Integer.valueOf(split[1]) > 65535 - && Integer.valueOf(split[1]) < 1) { + if (!Utils.isNumeric(split[1])) { throw new IllegalUrlPathException( - "The port must be a number, and it could not be greater than 65535 or less than 1."); + String.format("The port in url %s must be a number.", nodeUrl)); + } else { + int port = Integer.parseInt(split[1]); + if (port > 65535) { + throw new IllegalUrlPathException( + String.format("The port in url %s must be smaller than 65536", nodeUrl)); + } else if (port < 1) { + throw new IllegalUrlPathException( + String.format("The port in url %s must be greater than 0.", nodeUrl)); + } } } diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBBoundedScanFunction.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBBoundedScanFunction.java index 2d7170376eb7b..e8c3f7cca2159 100644 --- a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBBoundedScanFunction.java +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBBoundedScanFunction.java @@ -39,14 +39,12 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; -import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; public class IoTDBBoundedScanFunction extends RichInputFormat { private final ReadableConfig options; - private final List> tableSchema; private final String device; private final long lowerBound; private final long upperBound; @@ -57,7 +55,7 @@ public class IoTDBBoundedScanFunction extends RichInputFormat> tableSchema = schemaWrapper.getSchema(); device = options.get(Options.DEVICE); lowerBound = options.get(Options.SCAN_BOUNDED_LOWER_BOUND); upperBound = options.get(Options.SCAN_BOUNDED_UPPER_BOUND); @@ -71,12 +69,12 @@ public void configure(Configuration configuration) { } @Override - public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException { + public BaseStatistics getStatistics(BaseStatistics baseStatistics) { return baseStatistics; } @Override - public InputSplit[] createInputSplits(int i) throws IOException { + public InputSplit[] createInputSplits(int i) { return new GenericInputSplit[] {new GenericInputSplit(1, 1)}; } @@ -86,7 +84,7 @@ public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { } @Override - public void openInputFormat() throws IOException { + public void openInputFormat() { session = new Session.Builder() .nodeUrls(Arrays.asList(options.get(Options.NODE_URLS).split(","))) @@ -102,7 +100,7 @@ public void openInputFormat() throws IOException { } @Override - public void open(InputSplit inputSplit) throws IOException { + public void open(InputSplit inputSplit) { String sql; if (lowerBound < 0L && upperBound < 0L) { sql = String.format("SELECT %s FROM %s", String.join(",", measurements), device); @@ -131,7 +129,7 @@ public void open(InputSplit inputSplit) throws IOException { } @Override - public boolean reachedEnd() throws IOException { + public boolean reachedEnd() { try { return !dataSet.hasNext(); } catch (StatementExecutionException | IoTDBConnectionException e) { @@ -140,7 +138,7 @@ public boolean reachedEnd() throws IOException { } @Override - public RowData nextRecord(RowData rowData) throws IOException { + public RowData nextRecord(RowData rowData) { try { RowRecord rowRecord = dataSet.next(); return Utils.convert(rowRecord, columnTypes); @@ -150,7 +148,7 @@ public RowData nextRecord(RowData rowData) throws IOException { } @Override - public void close() throws IOException { + public void close() { try { if (dataSet != null) { dataSet.close(); diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java index 92013f9488bfb..a814f40b17d88 100644 --- a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java @@ -18,7 +18,7 @@ */ package org.apache.iotdb.flink.sql.function; -import org.apache.iotdb.flink.sql.client.IoTDBWebsocketClient; +import org.apache.iotdb.flink.sql.client.IoTDBWebSocketClient; import org.apache.iotdb.flink.sql.common.Options; import org.apache.iotdb.flink.sql.common.Utils; import org.apache.iotdb.flink.sql.exception.IllegalOptionException; @@ -35,6 +35,7 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.java_websocket.client.WebSocketClient; import org.java_websocket.enums.ReadyState; @@ -52,9 +53,9 @@ import java.util.concurrent.Executors; import java.util.stream.Collectors; -public class IoTDBCDCSourceFunction extends RichSourceFunction { +public class IoTDBCDCSourceFunction extends RichSourceFunction { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBCDCSourceFunction.class); - private final List socketClients = new ArrayList<>(); + private final List socketClients = new ArrayList<>(); private final int cdcPort; private final List nodeUrls; private final String taskName; @@ -139,7 +140,7 @@ public void run(SourceContext ctx) throws InterruptedException { consumeExecutor.submit(new ConsumeRunnable(ctx)); consumeExecutor.shutdown(); while (true) { - for (IoTDBWebsocketClient socketClient : socketClients) { + for (IoTDBWebSocketClient socketClient : socketClients) { if (socketClient.getReadyState().equals(ReadyState.CLOSED)) { while (!Utils.isURIAvailable(socketClient.getURI())) { String log = @@ -170,8 +171,8 @@ public void addTabletWrapper(TabletWrapper tabletWrapper) { try { this.tabletWrappers.put(tabletWrapper); } catch (InterruptedException e) { - String host = tabletWrapper.getWebsocketClient().getRemoteSocketAddress().getHostName(); - int port = tabletWrapper.getWebsocketClient().getRemoteSocketAddress().getPort(); + String host = tabletWrapper.getWebSocketClient().getRemoteSocketAddress().getHostName(); + int port = tabletWrapper.getWebSocketClient().getRemoteSocketAddress().getPort(); String log = String.format( "The tablet from %s:%d can't be put into queue, because: %s", @@ -181,7 +182,7 @@ public void addTabletWrapper(TabletWrapper tabletWrapper) { } } - private IoTDBWebsocketClient initAndGet(URI uri) throws InterruptedException { + private IoTDBWebSocketClient initAndGet(URI uri) throws InterruptedException { while (!Utils.isURIAvailable(uri)) { String log = String.format( @@ -189,7 +190,7 @@ private IoTDBWebsocketClient initAndGet(URI uri) throws InterruptedException { LOGGER.warn(log); Thread.sleep(5000); } - IoTDBWebsocketClient client = new IoTDBWebsocketClient(uri, this); + IoTDBWebSocketClient client = new IoTDBWebSocketClient(uri, this); client.connect(); while (!client.getReadyState().equals(ReadyState.OPEN)) { Thread.sleep(1000); @@ -224,7 +225,7 @@ public void collectTablet(Tablet tablet, SourceContext ctx) { row.add(null); } } - RowData rowData = (RowData) GenericRowData.of(row.toArray()); + RowData rowData = GenericRowData.of(row.toArray()); ctx.collect(rowData); } } @@ -243,7 +244,7 @@ public void run() { TabletWrapper tabletWrapper = tabletWrappers.take(); collectTablet(tabletWrapper.getTablet(), context); tabletWrapper - .getWebsocketClient() + .getWebSocketClient() .send(String.format("ACK:%d", tabletWrapper.getCommitId())); } catch (InterruptedException e) { LOGGER.warn("The tablet can't be taken from queue!"); diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBSinkFunction.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBSinkFunction.java index a78d925940576..4933e66b0f91e 100644 --- a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBSinkFunction.java +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBSinkFunction.java @@ -62,15 +62,15 @@ public class IoTDBSinkFunction implements SinkFunction { } public IoTDBSinkFunction(ReadableConfig options, SchemaWrapper schemaWrapper) { - // get schema + // Get schema this.schema = schemaWrapper.getSchema(); - // get options + // Get options nodeUrls = Arrays.asList(options.get(Options.NODE_URLS).split(",")); user = options.get(Options.USER); password = options.get(Options.PASSWORD); device = options.get(Options.DEVICE); aligned = options.get(Options.ALIGNED); - // get measurements and data types from schema + // Get measurements and data types from schema measurements = schema.stream().map(field -> String.valueOf(field.f0)).collect(Collectors.toList()); dataTypes = schema.stream().map(field -> TYPE_MAP.get(field.f1)).collect(Collectors.toList()); @@ -78,12 +78,12 @@ public IoTDBSinkFunction(ReadableConfig options, SchemaWrapper schemaWrapper) { @Override public void invoke(RowData rowData, Context context) throws Exception { - // open the session if the session has not been opened + // Open the session if the session has not been opened if (session == null) { session = new Session.Builder().nodeUrls(nodeUrls).username(user).password(password).build(); session.open(false); } - // load data from RowData + // Load data from RowData if (rowData.getRowKind().equals(RowKind.INSERT) || rowData.getRowKind().equals(RowKind.UPDATE_AFTER)) { long timestamp = rowData.getLong(0); diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSource.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSource.java index 535da831a6fe6..7feb4dcb2b13c 100644 --- a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSource.java +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/provider/IoTDBDynamicTableSource.java @@ -67,7 +67,7 @@ public ChangelogMode getChangelogMode() { public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { if (options.get(Options.MODE) == Options.Mode.CDC) { return SourceFunctionProvider.of( - new IoTDBCDCSourceFunction<>(options, new SchemaWrapper(schema)), false); + new IoTDBCDCSourceFunction(options, new SchemaWrapper(schema)), false); } else { return InputFormatProvider.of( new IoTDBBoundedScanFunction(options, new SchemaWrapper(schema))); diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/SchemaWrapper.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/SchemaWrapper.java index 4a50225981303..6f8ec11fbf2ac 100644 --- a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/SchemaWrapper.java +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/SchemaWrapper.java @@ -27,13 +27,13 @@ import java.util.List; public class SchemaWrapper implements Serializable { - private List> schema; + private final List> schema; public SchemaWrapper(TableSchema schema) { this.schema = new ArrayList<>(); for (String fieldName : schema.getFieldNames()) { - if (fieldName == "Time_") { + if ("Time_".equals(fieldName)) { continue; } this.schema.add(new Tuple2<>(fieldName, schema.getFieldDataType(fieldName).get())); diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/TabletWrapper.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/TabletWrapper.java index fbf0eee10e04a..4ef08386695a2 100644 --- a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/TabletWrapper.java +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/wrapper/TabletWrapper.java @@ -18,15 +18,15 @@ */ package org.apache.iotdb.flink.sql.wrapper; -import org.apache.iotdb.flink.sql.client.IoTDBWebsocketClient; +import org.apache.iotdb.flink.sql.client.IoTDBWebSocketClient; import org.apache.iotdb.tsfile.write.record.Tablet; public class TabletWrapper { - private long commitId; - private IoTDBWebsocketClient websocketClient; - private Tablet tablet; + private final long commitId; + private final IoTDBWebSocketClient websocketClient; + private final Tablet tablet; - public TabletWrapper(long commitId, IoTDBWebsocketClient websocketClient, Tablet tablet) { + public TabletWrapper(long commitId, IoTDBWebSocketClient websocketClient, Tablet tablet) { this.commitId = commitId; this.websocketClient = websocketClient; this.tablet = tablet; @@ -36,7 +36,7 @@ public long getCommitId() { return commitId; } - public IoTDBWebsocketClient getWebsocketClient() { + public IoTDBWebSocketClient getWebSocketClient() { return websocketClient; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java index 5efc7a05295cc..c7993045e368d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java @@ -36,9 +36,8 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; -import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.PriorityBlockingQueue; @@ -47,9 +46,9 @@ public class WebSocketConnectorServer extends WebSocketServer { private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketConnectorServer.class); private final PriorityBlockingQueue> events = new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left)); - private WebsocketConnector websocketConnector; + private final WebsocketConnector websocketConnector; - private ConcurrentMap eventMap = new ConcurrentHashMap<>(); + private final ConcurrentMap eventMap = new ConcurrentHashMap<>(); public WebSocketConnectorServer( InetSocketAddress address, WebsocketConnector websocketConnector) { @@ -97,7 +96,7 @@ public void onMessage(WebSocket webSocket, String s) { @Override public void onError(WebSocket webSocket, Exception e) { - String log = null; + String log; if (webSocket.getRemoteSocketAddress() != null) { log = String.format( @@ -115,7 +114,7 @@ public void onError(WebSocket webSocket, Exception e) { public void onStart() { String log = String.format( - "The websocket server %s:%d has been started!", + "The webSocket server %s:%d has been started!", this.getAddress().getHostName(), this.getPort()); LOGGER.error(log); } @@ -137,12 +136,10 @@ public void addEvent(Pair event) { private void handleStart(WebSocket webSocket) { try { - ArrayList webSockets = new ArrayList<>(); - webSockets.add(webSocket); Pair eventPair = events.take(); synchronized (events) { events.notifyAll(); - transfer(eventPair, webSockets); + transfer(eventPair, webSocket); } } catch (InterruptedException e) { String log = String.format("The event can't be taken, because: %s", e.getMessage()); @@ -170,7 +167,7 @@ private void handleError(WebSocket webSocket, String s) { handleStart(webSocket); } - private void transfer(Pair eventPair, List webSockets) { + private void transfer(Pair eventPair, WebSocket webSocket) { Long commitId = eventPair.getLeft(); Event event = eventPair.getRight(); try { @@ -198,13 +195,13 @@ private void transfer(Pair eventPair, List webSockets) { payload.putLong(commitId); payload.put(tabletBuffer); payload.flip(); - this.broadcast(payload, webSockets); + this.broadcast(payload, Collections.singletonList(webSocket)); eventMap.put(eventPair.getLeft(), eventPair.getRight()); String log = String.format( "Transferred a message to client %s:%d", - webSockets.get(0).getRemoteSocketAddress().getAddress().getHostName(), - webSockets.get(0).getRemoteSocketAddress().getPort()); + webSocket.getRemoteSocketAddress().getAddress().getHostName(), + webSocket.getRemoteSocketAddress().getPort()); LOGGER.info(log); } catch (InterruptedException e) { events.put(eventPair); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java index c9eebfc1f44fb..f625e41cf72b1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java @@ -25,7 +25,7 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftAsyncConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSyncConnector; -import org.apache.iotdb.commons.pipe.plugin.builtin.connector.WebsocketConnector; +import org.apache.iotdb.commons.pipe.plugin.builtin.connector.WebSocketConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.IoTDBExtractor; import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor; @@ -44,7 +44,7 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", IoTDBThriftAsyncConnector.class), IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", IoTDBLegacyPipeConnector.class), IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapConnector.class), - WEBSOCKET_CONNECTOR("websocket-connector", WebsocketConnector.class), + WEBSOCKET_CONNECTOR("websocket-connector", WebSocketConnector.class), ; private final String pipePluginName; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebsocketConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java similarity index 92% rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebsocketConnector.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java index 82302d55e2666..a9d3a8ccf9ef7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebsocketConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java @@ -20,9 +20,9 @@ package org.apache.iotdb.commons.pipe.plugin.builtin.connector; /** - * This class is a placeholder and should not be initialized. It represents the IoTDB Thrift + * This class is a placeholder and should not be initialized. It represents the IoTDB WebSocket * connector. There is a real implementation in the server module but cannot be imported here. The * pipe agent in the server module will replace this class with the real implementation when * initializing the IoTDB Thrift connector. */ -public class WebsocketConnector extends PlaceholderConnector {} +public class WebSocketConnector extends PlaceholderConnector {} From 0cb42244b510643d05a872bea5c7caecdd52a9df Mon Sep 17 00:00:00 2001 From: xuanronaldo Date: Mon, 28 Aug 2023 15:56:16 +0800 Subject: [PATCH 4/5] add module flink-sql-iotdb-connector --- iotdb-connector/flink-sql-iotdb-connector/pom.xml | 4 ++-- iotdb-connector/pom.xml | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/iotdb-connector/flink-sql-iotdb-connector/pom.xml b/iotdb-connector/flink-sql-iotdb-connector/pom.xml index 0a23c56989536..cd3d20a1a5ffb 100644 --- a/iotdb-connector/flink-sql-iotdb-connector/pom.xml +++ b/iotdb-connector/flink-sql-iotdb-connector/pom.xml @@ -24,12 +24,12 @@ org.apache.iotdb iotdb-parent - 1.3.0-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml flink-sql-iotdb-connector IoTDB: Connector: Apache Flink SQL Connector - 1.3.0-SNAPSHOT + 1.2.0-SNAPSHOT UTF-8 1.17.0 diff --git a/iotdb-connector/pom.xml b/iotdb-connector/pom.xml index 06e04d1e67429..14321833059e9 100644 --- a/iotdb-connector/pom.xml +++ b/iotdb-connector/pom.xml @@ -31,6 +31,7 @@ IoTDB: Connector flink-iotdb-connector + flink-sql-iotdb-connector flink-tsfile-connector grafana-connector hadoop From d2243afec2e943edd908b79cd3eec3e865650463 Mon Sep 17 00:00:00 2001 From: xuanronaldo Date: Mon, 28 Aug 2023 17:15:18 +0800 Subject: [PATCH 5/5] add module flink-sql-iotdb-connector --- iotdb-core/datanode/pom.xml | 5 +++++ pom.xml | 3 +++ 2 files changed, 8 insertions(+) diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml index 7aa1da5247f80..ee23748402899 100644 --- a/iotdb-core/datanode/pom.xml +++ b/iotdb-core/datanode/pom.xml @@ -212,6 +212,11 @@ com.lmax disruptor + + org.java-websocket + Java-WebSocket + ${websocket.version} + diff --git a/pom.xml b/pom.xml index 654b774babb41..ce0bf9d1aed32 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,7 @@ iotdb-connector/spark-iotdb-connector/scala_2.11 iotdb-connector/spark-iotdb-connector/scala_2.12 iotdb-connector/flink-tsfile-connector + iotdb-connector/flink-sql-iotdb-connector iotdb-connector/flink-iotdb-connector distribution iotdb-connector/hive-connector @@ -227,6 +228,8 @@ 1.17.0 generate-sources + + 1.5.3