From acf4d5b1b469adf383b06266ef2e9815f472cd25 Mon Sep 17 00:00:00 2001 From: Li Hongyu Date: Tue, 10 Jan 2023 10:06:09 +0800 Subject: [PATCH] [Feature][Connector-V2] add tdengine source (#2832) * [Feature][Connector-V2]add tdengine source and sink 1. add ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY to statement 2. add tdengine e2e module * SourceSplitEnumerator.Context Co-authored-by: bjyflihongyu Co-authored-by: tyrantlucifer --- docs/en/connector-v2/sink/TDengine.md | 69 ++++++ docs/en/connector-v2/source/TDengine.md | 82 +++++++ plugin-mapping.properties | 3 + .../connector-tdengine/pom.xml | 45 ++++ .../tdengine/config/TDengineSourceConfig.java | 79 +++++++ .../exception/TDengineConnectorException.java | 36 ++++ .../seatunnel/tdengine/sink/TDengineSink.java | 63 ++++++ .../tdengine/sink/TDengineSinkWriter.java | 120 +++++++++++ .../tdengine/source/TDengineSource.java | 141 ++++++++++++ .../tdengine/source/TDengineSourceReader.java | 159 ++++++++++++++ .../tdengine/source/TDengineSourceSplit.java | 46 ++++ .../source/TDengineSourceSplitEnumerator.java | 204 ++++++++++++++++++ .../tdengine/state/TDengineSourceState.java | 36 ++++ .../typemapper/TDengineTypeMapper.java | 153 +++++++++++++ .../seatunnel/tdengine/TDengineTest.java | 50 +++++ seatunnel-connectors-v2/pom.xml | 1 + seatunnel-dist/pom.xml | 6 + .../connector-tdengine-e2e/pom.xml | 47 ++++ .../e2e/connector/tdengine/TDengineIT.java | 198 +++++++++++++++++ .../tdengine/tdengine_source_to_sink.conf | 59 +++++ .../seatunnel-connector-v2-e2e/pom.xml | 1 + .../batch/ParallelBatchPartitionReader.java | 4 +- 22 files changed, 1601 insertions(+), 1 deletion(-) create mode 100644 docs/en/connector-v2/sink/TDengine.md create mode 100644 docs/en/connector-v2/source/TDengine.md create mode 100644 seatunnel-connectors-v2/connector-tdengine/pom.xml create mode 100644 seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java create mode 100644 seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorException.java create mode 100644 seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java create mode 100644 seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java create mode 100644 seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java create mode 100644 seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java create mode 100644 seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplit.java create mode 100644 seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java create mode 100644 seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java create mode 100644 seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/typemapper/TDengineTypeMapper.java create mode 100644 seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/TDengineTest.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/pom.xml create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf diff --git a/docs/en/connector-v2/sink/TDengine.md b/docs/en/connector-v2/sink/TDengine.md new file mode 100644 index 00000000000..597a7a99b50 --- /dev/null +++ b/docs/en/connector-v2/sink/TDengine.md @@ -0,0 +1,69 @@ +# TDengine + +> TDengine sink connector + +## Description + +Used to write data to TDengine. You need to create stable before running seatunnel task + +## Key features + +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [cdc](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|----------------------------|---------|----------|---------------| +| url | string | yes | - | +| username | string | yes | - | +| password | string | yes | - | +| database | string | yes | | +| stable | string | yes | - | +| timezone | string | no | UTC | + +### url [string] + +the url of the TDengine when you select the TDengine + +e.g. +``` +jdbc:TAOS-RS://localhost:6041/ +``` + +### username [string] + +the username of the TDengine when you select + +### password [string] + +the password of the TDengine when you select + +### database [string] + +the database of the TDengine when you select + +### stable [string] + +the stable of the TDengine when you select + +### timezone [string] + +the timeznoe of the TDengine sever, it's important to the ts field + +## Example + +### sink + +```hocon +sink { + TDengine { + url : "jdbc:TAOS-RS://localhost:6041/" + username : "root" + password : "taosdata" + database : "power2" + stable : "meters2" + timezone: UTC + } +} +``` \ No newline at end of file diff --git a/docs/en/connector-v2/source/TDengine.md b/docs/en/connector-v2/source/TDengine.md new file mode 100644 index 00000000000..fea0a080d99 --- /dev/null +++ b/docs/en/connector-v2/source/TDengine.md @@ -0,0 +1,82 @@ +# TDengine + +> TDengine source connector + +## Description + +Read external data source data through TDengine. + +## Key features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [column projection](../../concept/connector-v2-features.md) + +supports query SQL and can achieve projection effect. + +- [x] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|----------------------------|---------|----------|---------------| +| url | string | yes | - | +| username | string | yes | - | +| password | string | yes | - | +| database | string | yes | | +| stable | string | yes | - | +| lower_bound | long | yes | - | +| upper_bound | long | yes | - | + +### url [string] + +the url of the TDengine when you select the TDengine + +e.g. +``` +jdbc:TAOS-RS://localhost:6041/ +``` + +### username [string] + +the username of the TDengine when you select + +### password [string] + +the password of the TDengine when you select + +### database [string] + +the database of the TDengine when you select + +### stable [string] + +the stable of the TDengine when you select + +### lower_bound [long] + +the lower_bound of the migration period + +### upper_bound [long] + +the upper_bound of the migration period + +## Example + +### source +```hocon +source { + TDengine { + url : "jdbc:TAOS-RS://localhost:6041/" + username : "root" + password : "taosdata" + database : "power" + stable : "meters" + lower_bound : "2018-10-03 14:38:05.000" + upper_bound : "2018-10-03 14:38:16.800" + result_table_name = "tdengine_result" + } +} +``` \ No newline at end of file diff --git a/plugin-mapping.properties b/plugin-mapping.properties index c12aa0ec793..8e55c397c44 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -166,4 +166,7 @@ seatunnel.source.Maxcompute = connector-maxcompute seatunnel.sink.Maxcompute = connector-maxcompute seatunnel.source.MySQL-CDC = connector-cdc-mysql seatunnel.sink.S3Redshift = connector-s3-redshift +seatunnel.source.TDengine = connector-tdengine +seatunnel.sink.TDengine = connector-tdengine seatunnel.source.Persistiq = connector-http-persistiq + diff --git a/seatunnel-connectors-v2/connector-tdengine/pom.xml b/seatunnel-connectors-v2/connector-tdengine/pom.xml new file mode 100644 index 00000000000..31db6a553ec --- /dev/null +++ b/seatunnel-connectors-v2/connector-tdengine/pom.xml @@ -0,0 +1,45 @@ + + + + + org.apache.seatunnel + seatunnel-connectors-v2 + ${revision} + + 4.0.0 + + connector-tdengine + + + + org.apache.seatunnel + connector-common + ${project.version} + + + com.taosdata.jdbc + taos-jdbcdriver + 3.0.3 + + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java new file mode 100644 index 00000000000..4bd7e1c6087 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.config; + +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.LOWER_BOUND; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.TIMEZONE; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.UPPER_BOUND; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +public class TDengineSourceConfig implements Serializable { + + /** + * jdbc:TAOS-RS://localhost:6041/ + */ + private String url; + private String username; + private String password; + private String database; + private String stable; + //param of timezone in 'jdbc:TAOS-RS' just effect on taosadapter side, other than the JDBC client side + //so this param represent the server-side timezone setting up + private String timezone; + private String lowerBound; + private String upperBound; + private List fields; + private List tags; + + public static TDengineSourceConfig buildSourceConfig(Config pluginConfig) { + TDengineSourceConfig tdengineSourceConfig = new TDengineSourceConfig(); + tdengineSourceConfig.setUrl(pluginConfig.hasPath(URL) ? pluginConfig.getString(URL) : null); + tdengineSourceConfig.setDatabase(pluginConfig.hasPath(DATABASE) ? pluginConfig.getString(DATABASE) : null); + tdengineSourceConfig.setStable(pluginConfig.hasPath(STABLE) ? pluginConfig.getString(STABLE) : null); + tdengineSourceConfig.setUsername(pluginConfig.hasPath(USERNAME) ? pluginConfig.getString(USERNAME) : null); + tdengineSourceConfig.setPassword(pluginConfig.hasPath(PASSWORD) ? pluginConfig.getString(PASSWORD) : null); + tdengineSourceConfig.setUpperBound(pluginConfig.hasPath(UPPER_BOUND) ? pluginConfig.getString(UPPER_BOUND) : null); + tdengineSourceConfig.setLowerBound(pluginConfig.hasPath(LOWER_BOUND) ? pluginConfig.getString(LOWER_BOUND) : null); + tdengineSourceConfig.setTimezone(pluginConfig.hasPath(TIMEZONE) ? pluginConfig.getString(TIMEZONE) : "UTC"); + return tdengineSourceConfig; + } + + public static class ConfigNames { + + public static String URL = "url"; + public static String USERNAME = "username"; + public static String PASSWORD = "password"; + public static String DATABASE = "database"; + public static String STABLE = "stable"; + public static String TIMEZONE = "timezone"; + public static String LOWER_BOUND = "lower_bound"; + public static String UPPER_BOUND = "upper_bound"; + } +} diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorException.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorException.java new file mode 100644 index 00000000000..77e1edac8fa --- /dev/null +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class TDengineConnectorException extends SeaTunnelRuntimeException { + public TDengineConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + + public TDengineConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } + + public TDengineConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java new file mode 100644 index 00000000000..a06cd8999d3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.sink; + +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + +@AutoService(SeaTunnelSink.class) +public class TDengineSink extends AbstractSimpleSink { + private SeaTunnelRowType seaTunnelRowType; + private Config pluginConfig; + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return this.seaTunnelRowType; + } + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new TDengineSinkWriter(pluginConfig, seaTunnelRowType); + } + + @Override + public void prepare(Config pluginConfig) { + this.pluginConfig = pluginConfig; + } + + @Override + public String getPluginName() { + return "TDengine"; + } +} diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java new file mode 100644 index 00000000000..e6c0b5785d0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Objects; + +@Slf4j +public class TDengineSinkWriter extends AbstractSinkWriter { + + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); + private final Connection conn; + private final TDengineSourceConfig config; + private int tagsNum; + + @SneakyThrows + public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) { + config = TDengineSourceConfig.buildSourceConfig(pluginConfig); + String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword()); + conn = DriverManager.getConnection(jdbcUrl); + try (Statement statement = conn.createStatement()) { + final ResultSet metaResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable()); + while (metaResultSet.next()) { + if (StringUtils.equals("TAG", metaResultSet.getString("note"))) { + tagsNum++; + } + } + } + } + + @SneakyThrows + @Override + @SuppressWarnings("checkstyle:RegexpSingleline") + public void write(SeaTunnelRow element) { + final ArrayList tags = Lists.newArrayList(); + for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) { + tags.add(element.getField(i)); + } + final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ","); + + final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum); + + try (Statement statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { + String sql = String.format("INSERT INTO %s using %s tags ( %s ) VALUES ( %s );", + element.getField(0), + config.getStable(), + tagValues, + StringUtils.join(convertDataType(metrics), ",")); + final int rowCount = statement.executeUpdate(sql); + if (rowCount == 0) { + Throwables.propagateIfPossible(new TDengineConnectorException(CommonErrorCode.SQL_OPERATION_FAILED, "insert error:" + element)); + } + } + } + + @Override + public void close() { + if (Objects.nonNull(conn)) { + try { + conn.close(); + } catch (SQLException e) { + throw new TDengineConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, "TDengine writer connection close failed", e); + } + } + } + + private Object[] convertDataType(Object[] objects) { + return Arrays.stream(objects) + .map(object -> { + if (LocalDateTime.class.equals(object.getClass())) { + //transform timezone according to the config + return "'" + ((LocalDateTime) object).atZone(ZoneId.systemDefault()).withZoneSameInstant(ZoneId.of(config.getTimezone())).format(FORMATTER) + "'"; + } else if (String.class.equals(object.getClass())) { + return "'" + object + "'"; + } + return object; + }) + .toArray(); + } +} diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java new file mode 100644 index 00000000000..a3d393d693f --- /dev/null +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.source; + +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.buildSourceConfig; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceReader.Context; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException; +import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState; +import org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper.TDengineTypeMapper; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; +import com.google.common.collect.Lists; +import lombok.SneakyThrows; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.List; + +/** + * TDengine source each split corresponds one subtable + *

+ * TODO: wait for optimization + * 1. batch -> batch + stream + * 2. one item of data writing -> a batch of data writing + */ +@AutoService(SeaTunnelSource.class) +public class TDengineSource implements SeaTunnelSource { + + private SeaTunnelRowType seaTunnelRowType; + private TDengineSourceConfig tdengineSourceConfig; + + @Override + public String getPluginName() { + return "TDengine"; + } + + @SneakyThrows + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL, DATABASE, STABLE, USERNAME, PASSWORD); + if (!result.isSuccess()) { + throw new TDengineConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, "TDengine connection require url/database/stable/username/password. All of these must not be empty."); + } + tdengineSourceConfig = buildSourceConfig(pluginConfig); + + //add subtable_name and tags to `seaTunnelRowType` + SeaTunnelRowType originRowType = getSTableMetaInfo(tdengineSourceConfig); + seaTunnelRowType = addHiddenAttribute(originRowType); + } + + @SneakyThrows + private SeaTunnelRowType getSTableMetaInfo(TDengineSourceConfig config) { + String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword()); + Connection conn = DriverManager.getConnection(jdbcUrl); + List fieldNames = Lists.newArrayList(); + List> fieldTypes = Lists.newArrayList(); + try (Statement statement = conn.createStatement()) { + final ResultSet metaResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable()); + while (metaResultSet.next()) { + fieldNames.add(metaResultSet.getString(1)); + fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2))); + } + } + return new SeaTunnelRowType(fieldNames.toArray(new String[0]), fieldTypes.toArray(new SeaTunnelDataType[0])); + } + + private SeaTunnelRowType addHiddenAttribute(SeaTunnelRowType originRowType) { + //0-subtable_name / 1-n field_names / + String[] fieldNames = ArrayUtils.add(originRowType.getFieldNames(), 0, "subtable_name"); + // n+1-> tags + SeaTunnelDataType[] fieldTypes = ArrayUtils.add(originRowType.getFieldTypes(), 0, BasicType.STRING_TYPE); + return new SeaTunnelRowType(fieldNames, fieldTypes); + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SeaTunnelDataType getProducedType() { + return seaTunnelRowType; + } + + @Override + public SourceReader createReader(Context readerContext) { + return new TDengineSourceReader(tdengineSourceConfig, readerContext); + } + + @Override + public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) { + return new TDengineSourceSplitEnumerator(seaTunnelRowType, tdengineSourceConfig, enumeratorContext); + } + + @Override + public SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context enumeratorContext, + TDengineSourceState checkpointState) { + return new TDengineSourceSplitEnumerator(seaTunnelRowType, tdengineSourceConfig, checkpointState, enumeratorContext); + } + +} diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java new file mode 100644 index 00000000000..1795b794e4c --- /dev/null +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.source; + +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException; + +import com.google.common.collect.Sets; +import com.taosdata.jdbc.TSDBDriver; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; + +@Slf4j +public class TDengineSourceReader implements SourceReader { + + private static final long THREAD_WAIT_TIME = 500L; + + private final TDengineSourceConfig config; + + private final Set sourceSplits; + + private final Context context; + + private Connection conn; + + public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) { + this.config = config; + this.sourceSplits = Sets.newHashSet(); + this.context = readerContext; + } + + @Override + public void pollNext(Collector collector) throws InterruptedException { + if (sourceSplits.isEmpty()) { + Thread.sleep(THREAD_WAIT_TIME); + return; + } + synchronized (collector.getCheckpointLock()) { + sourceSplits.forEach(split -> { + try { + read(split, collector); + } catch (Exception e) { + throw new TDengineConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "TDengine split read error", e); + } + }); + } + + if (Boundedness.BOUNDED.equals(context.getBoundedness())) { + // signal to the source that we have reached the end of the data. + log.info("Closed the bounded TDengine source"); + context.signalNoMoreElement(); + } + } + + @Override + public void open(){ + String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword()); + Properties connProps = new Properties(); + //todo: when TSDBDriver.PROPERTY_KEY_BATCH_LOAD set to "true", + // there is a exception : Caused by: java.sql.SQLException: can't create connection with server + // under docker network env + // @bobo (tdengine) + connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "false"); + try { + conn = DriverManager.getConnection(jdbcUrl, connProps); + } catch (SQLException e) { + throw new TDengineConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "get TDengine connection failed:" + jdbcUrl); + } + } + + @Override + public void close() { + try { + if (!Objects.isNull(conn)) { + conn.close(); + } + } catch (SQLException e) { + throw new TDengineConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "TDengine reader connection close failed", e); + } + } + + private void read(TDengineSourceSplit split, Collector output) throws Exception { + try (Statement statement = conn.createStatement()) { + final ResultSet resultSet = statement.executeQuery(split.getQuery()); + ResultSetMetaData meta = resultSet.getMetaData(); + + while (resultSet.next()) { + Object[] datas = new Object[meta.getColumnCount() + 1]; + datas[0] = split.splitId(); + for (int i = 1; i <= meta.getColumnCount(); i++) { + datas[i] = convertDataType(resultSet.getObject(i)); + } + output.collect(new SeaTunnelRow(datas)); + } + } + } + + private Object convertDataType(Object object) { + if (Timestamp.class.equals(object.getClass())) { + return ((Timestamp) object).toLocalDateTime(); + } else if (byte[].class.equals(object.getClass())) { + return new String((byte[]) object); + } + return object; + } + + @Override + public List snapshotState(long checkpointId) { + return new ArrayList<>(sourceSplits); + } + + @Override + public void addSplits(List splits) { + sourceSplits.addAll(splits); + } + + @Override + public void handleNoMoreSplits() { + // do nothing + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + // do nothing + } +} diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplit.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplit.java new file mode 100644 index 00000000000..2b8ad47f83e --- /dev/null +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplit.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.source; + +import org.apache.seatunnel.api.source.SourceSplit; + +public class TDengineSourceSplit implements SourceSplit { + + private static final long serialVersionUID = -1L; + + private String splitId; + + /** + * final query statement + */ + private String query; + + @Override + public String splitId() { + return splitId; + } + + public String getQuery() { + return query; + } + + public TDengineSourceSplit(String splitId, String query) { + this.splitId = splitId; + this.query = query; + } +} diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java new file mode 100644 index 00000000000..2a8d4656aa1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.source; + +import org.apache.seatunnel.api.source.SourceEvent; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException; +import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState; + +import com.google.common.collect.Sets; +import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +public class TDengineSourceSplitEnumerator implements SourceSplitEnumerator { + + private final SourceSplitEnumerator.Context context; + private final TDengineSourceConfig config; + private Set pendingSplit = new HashSet<>(); + private Set assignedSplit = new HashSet<>(); + private Connection conn; + private SeaTunnelRowType seaTunnelRowType; + + public TDengineSourceSplitEnumerator(SeaTunnelRowType seaTunnelRowType, TDengineSourceConfig config, SourceSplitEnumerator.Context context) { + this(seaTunnelRowType, config, null, context); + } + + public TDengineSourceSplitEnumerator(SeaTunnelRowType seaTunnelRowType, TDengineSourceConfig config, TDengineSourceState sourceState, SourceSplitEnumerator.Context context) { + this.config = config; + this.context = context; + this.seaTunnelRowType = seaTunnelRowType; + if (sourceState != null) { + this.assignedSplit = sourceState.getAssignedSplit(); + } + } + + private static int getSplitOwner(String tp, int numReaders) { + return (tp.hashCode() & Integer.MAX_VALUE) % numReaders; + } + + @SneakyThrows + @Override + public void open() { + String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword()); + conn = DriverManager.getConnection(jdbcUrl); + } + + @Override + public void run() throws SQLException { + pendingSplit = getAllSplits(); + assignSplit(context.registeredReaders()); + } + + /* + * 1. get timestampField + * 2. get all sub tables of configured super table + * 3. each split has one sub table + */ + private Set getAllSplits() throws SQLException { + final String timestampFieldName; + try (Statement statement = conn.createStatement()) { + final ResultSet fieldNameResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable()); + fieldNameResultSet.next(); + timestampFieldName = fieldNameResultSet.getString(1); + } + + final Set splits = Sets.newHashSet(); + try (Statement statement = conn.createStatement()) { + String metaSQL = "select table_name from information_schema.ins_tables where db_name = '" + config.getDatabase() + "' and stable_name='" + config.getStable() + "';"; + ResultSet subTableNameResultSet = statement.executeQuery(metaSQL); + while (subTableNameResultSet.next()) { + final String subTableName = subTableNameResultSet.getString(1); + final TDengineSourceSplit splitBySubTable = createSplitBySubTable(subTableName, timestampFieldName); + splits.add(splitBySubTable); + } + } + return splits; + } + + private TDengineSourceSplit createSplitBySubTable(String subTableName, String timestampFieldName) { + String selectFields = Arrays.stream(seaTunnelRowType.getFieldNames()).skip(1).collect(Collectors.joining(",")); + String subTableSQL = "select " + selectFields + " from " + config.getDatabase() + "." + subTableName; + String start = config.getLowerBound(); + String end = config.getUpperBound(); + if (start != null || end != null) { + String startCondition = null; + String endCondition = null; + //Left closed right away + if (start != null) { + startCondition = timestampFieldName + " >= '" + start + "'"; + } + if (end != null) { + endCondition = timestampFieldName + " < '" + end + "'"; + } + String query = StringUtils.join(new String[]{startCondition, endCondition}, " and "); + subTableSQL = subTableSQL + " where " + query; + } + + return new TDengineSourceSplit(subTableName, subTableSQL); + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + if (!splits.isEmpty()) { + pendingSplit.addAll(splits); + assignSplit(Collections.singletonList(subtaskId)); + } + } + + @Override + public int currentUnassignedSplitSize() { + return pendingSplit.size(); + } + + @Override + public void registerReader(int subtaskId) { + if (!pendingSplit.isEmpty()) { + assignSplit(Collections.singletonList(subtaskId)); + } + } + + private void assignSplit(Collection taskIDList) { + assignedSplit = pendingSplit.stream() + .map(split -> { + int splitOwner = getSplitOwner(split.splitId(), context.currentParallelism()); + if (taskIDList.contains(splitOwner)) { + context.assignSplit(splitOwner, split); + return split; + } else { + return null; + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + pendingSplit.clear(); + } + + @Override + public TDengineSourceState snapshotState(long checkpointId) { + return new TDengineSourceState(assignedSplit); + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + SourceSplitEnumerator.super.handleSourceEvent(subtaskId, sourceEvent); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + //nothing to do + } + + @Override + public void notifyCheckpointAborted(long checkpointId) throws Exception { + SourceSplitEnumerator.super.notifyCheckpointAborted(checkpointId); + } + + @Override + public void close() { + try { + if (!Objects.isNull(conn)) { + conn.close(); + } + } catch (SQLException e) { + throw new TDengineConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "TDengine split_enumerator connection close failed", e); + } + } + + @Override + public void handleSplitRequest(int subtaskId) { + //nothing to do + } +} diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java new file mode 100644 index 00000000000..fc839682a92 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.state; + +import org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit; + +import java.io.Serializable; +import java.util.Set; + +public class TDengineSourceState implements Serializable { + + private final Set assignedSplit; + + public TDengineSourceState(Set assignedSplit) { + this.assignedSplit = assignedSplit; + } + + public Set getAssignedSplit() { + return assignedSplit; + } +} diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/typemapper/TDengineTypeMapper.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/typemapper/TDengineTypeMapper.java new file mode 100644 index 00000000000..35e50c6704a --- /dev/null +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/typemapper/TDengineTypeMapper.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TDengineTypeMapper { + + + // ============================data types===================== + + private static final String TDENGINE_UNKNOWN = "UNKNOWN"; + private static final String TDENGINE_BIT = "BIT"; + + // -------------------------number---------------------------- + private static final String TDENGINE_TINYINT = "TINYINT"; + private static final String TDENGINE_TINYINT_UNSIGNED = "TINYINT UNSIGNED"; + private static final String TDENGINE_SMALLINT = "SMALLINT"; + private static final String TDENGINE_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED"; + private static final String TDENGINE_MEDIUMINT = "MEDIUMINT"; + private static final String TDENGINE_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED"; + private static final String TDENGINE_INT = "INT"; + private static final String TDENGINE_INT_UNSIGNED = "INT UNSIGNED"; + private static final String TDENGINE_INTEGER = "INTEGER"; + private static final String TDENGINE_INTEGER_UNSIGNED = "INTEGER UNSIGNED"; + private static final String TDENGINE_BIGINT = "BIGINT"; + private static final String TDENGINE_BIGINT_UNSIGNED = "BIGINT UNSIGNED"; + private static final String TDENGINE_DECIMAL = "DECIMAL"; + private static final String TDENGINE_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED"; + private static final String TDENGINE_FLOAT = "FLOAT"; + private static final String TDENGINE_FLOAT_UNSIGNED = "FLOAT UNSIGNED"; + private static final String TDENGINE_DOUBLE = "DOUBLE"; + private static final String TDENGINE_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED"; + + // -------------------------string---------------------------- + private static final String TDENGINE_CHAR = "CHAR"; + private static final String TDENGINE_VARCHAR = "VARCHAR"; + private static final String TDENGINE_TINYTEXT = "TINYTEXT"; + private static final String TDENGINE_MEDIUMTEXT = "MEDIUMTEXT"; + private static final String TDENGINE_TEXT = "TEXT"; + private static final String TDENGINE_LONGTEXT = "LONGTEXT"; + private static final String TDENGINE_JSON = "JSON"; + + // ------------------------------time------------------------- + private static final String TDENGINE_DATE = "DATE"; + private static final String TDENGINE_DATETIME = "DATETIME"; + private static final String TDENGINE_TIME = "TIME"; + private static final String TDENGINE_TIMESTAMP = "TIMESTAMP"; + private static final String TDENGINE_YEAR = "YEAR"; + + // ------------------------------blob------------------------- + private static final String TDENGINE_TINYBLOB = "TINYBLOB"; + private static final String TDENGINE_MEDIUMBLOB = "MEDIUMBLOB"; + private static final String TDENGINE_BLOB = "BLOB"; + private static final String TDENGINE_LONGBLOB = "LONGBLOB"; + private static final String TDENGINE_BINARY = "BINARY"; + private static final String TDENGINE_VARBINARY = "VARBINARY"; + private static final String TDENGINE_GEOMETRY = "GEOMETRY"; + + @SuppressWarnings("checkstyle:MagicNumber") + public static SeaTunnelDataType mapping(String tdengineType) { + switch (tdengineType) { + case TDENGINE_BIT: + return BasicType.BOOLEAN_TYPE; + case TDENGINE_TINYINT: + case TDENGINE_TINYINT_UNSIGNED: + case TDENGINE_SMALLINT: + case TDENGINE_SMALLINT_UNSIGNED: + case TDENGINE_MEDIUMINT: + case TDENGINE_MEDIUMINT_UNSIGNED: + case TDENGINE_INT: + case TDENGINE_INTEGER: + case TDENGINE_YEAR: + return BasicType.INT_TYPE; + case TDENGINE_INT_UNSIGNED: + case TDENGINE_INTEGER_UNSIGNED: + case TDENGINE_BIGINT: + return BasicType.LONG_TYPE; + case TDENGINE_BIGINT_UNSIGNED: + return new DecimalType(20, 0); + case TDENGINE_DECIMAL: + log.warn("{} will probably cause value overflow.", TDENGINE_DECIMAL); + return new DecimalType(38, 18); + case TDENGINE_DECIMAL_UNSIGNED: + return new DecimalType(38, 18); + case TDENGINE_FLOAT: + return BasicType.FLOAT_TYPE; + case TDENGINE_FLOAT_UNSIGNED: + log.warn("{} will probably cause value overflow.", TDENGINE_FLOAT_UNSIGNED); + return BasicType.FLOAT_TYPE; + case TDENGINE_DOUBLE: + return BasicType.DOUBLE_TYPE; + case TDENGINE_DOUBLE_UNSIGNED: + log.warn("{} will probably cause value overflow.", TDENGINE_DOUBLE_UNSIGNED); + return BasicType.DOUBLE_TYPE; + case TDENGINE_CHAR: + case TDENGINE_TINYTEXT: + case TDENGINE_MEDIUMTEXT: + case TDENGINE_TEXT: + case TDENGINE_VARCHAR: + case TDENGINE_JSON: + case TDENGINE_LONGTEXT: + return BasicType.STRING_TYPE; + case TDENGINE_DATE: + return LocalTimeType.LOCAL_DATE_TYPE; + case TDENGINE_TIME: + return LocalTimeType.LOCAL_TIME_TYPE; + case TDENGINE_DATETIME: + case TDENGINE_TIMESTAMP: + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + + case TDENGINE_TINYBLOB: + case TDENGINE_MEDIUMBLOB: + case TDENGINE_BLOB: + case TDENGINE_LONGBLOB: + case TDENGINE_VARBINARY: + case TDENGINE_BINARY: + return PrimitiveByteArrayType.INSTANCE; + + //Doesn't support yet + case TDENGINE_GEOMETRY: + case TDENGINE_UNKNOWN: + default: + throw new TDengineConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, String.format( + "Doesn't support TDENGINE type '%s' on column '%s' yet.", + tdengineType)); + } + } +} diff --git a/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/TDengineTest.java b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/TDengineTest.java new file mode 100644 index 00000000000..11844619326 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/TDengineTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine; + +import com.taosdata.jdbc.TSDBDriver; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Properties; + +public class TDengineTest { + + public void testQueryUrl(String jdbcUrl) { + Assertions.assertDoesNotThrow(() -> { + try (Connection conn = getConnection(jdbcUrl)) { + try (Statement stmt = conn.createStatement()) { + ResultSet rs = stmt.executeQuery("SELECT location,AVG(voltage) FROM meters GROUP BY location;"); + } + } + }); + } + + @SneakyThrows + private Connection getConnection(String jdbcUrl) { + Properties connProps = new Properties(); + connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true"); + return DriverManager.getConnection(jdbcUrl, connProps); + } + +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index ff56c817bb3..bdae3668bcf 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -68,6 +68,7 @@ connector-openmldb connector-doris connector-maxcompute + connector-tdengine diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index af2ad8ac0e7..f887959b7c0 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -443,6 +443,12 @@ ${project.version} provided + + org.apache.seatunnel + connector-tdengine + ${project.version} + provided + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/pom.xml new file mode 100644 index 00000000000..974a96be022 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/pom.xml @@ -0,0 +1,47 @@ + + + + + seatunnel-connector-v2-e2e + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-tdengine-e2e + + + 8 + 8 + + + + + org.apache.seatunnel + connector-fake + ${project.version} + test + + + org.apache.seatunnel + connector-tdengine + ${project.version} + test + + + \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java new file mode 100644 index 00000000000..19ef53524a4 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.tdengine; + +import static org.awaitility.Awaitility.given; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerLoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +@Slf4j +public class TDengineIT extends TestSuiteBase implements TestResource { + private static final String DOCKER_IMAGE = "tdengine/tdengine:3.0.2.1"; + private static final String NETWORK_ALIASES1 = "flink_e2e_tdengine_src"; + private static final String NETWORK_ALIASES2 = "flink_e2e_tdengine_sink"; + private static final int PORT = 6041; + + private GenericContainer tdengineServer1; + private GenericContainer tdengineServer2; + private Connection connection1; + private Connection connection2; + private int testDataCount; + + @BeforeAll + @Override + public void startUp() throws Exception { + tdengineServer1 = new GenericContainer<>(DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(NETWORK_ALIASES1) + .withExposedPorts(PORT) + .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE))) + .waitingFor(new HostPortWaitStrategy() + .withStartupTimeout(Duration.ofMinutes(2))); + tdengineServer2 = new GenericContainer<>(DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(NETWORK_ALIASES2) + .withExposedPorts(PORT) + .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE))) + .waitingFor(new HostPortWaitStrategy() + .withStartupTimeout(Duration.ofMinutes(2))); + Startables.deepStart(Stream.of(tdengineServer1)).join(); + Startables.deepStart(Stream.of(tdengineServer2)).join(); + log.info("TDengine container started"); + connection1 = createConnect(tdengineServer1); + connection2 = createConnect(tdengineServer2); + // wait for TDengine fully start + given().ignoreExceptions() + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .atMost(120, TimeUnit.SECONDS) + .untilAsserted(() -> Assertions.assertEquals(Boolean.TRUE, connection1.isValid(100) & connection2.isValid(100))); + testDataCount = generateTestDataSet(); + log.info("tdengine testDataCount=" + testDataCount); // rowCount=8 + } + + @SneakyThrows + private int generateTestDataSet() { + int rowCount; + try (Statement stmt = connection1.createStatement()) { + stmt.execute("CREATE DATABASE power KEEP 3650"); + stmt.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " + + "TAGS (location BINARY(64), groupId INT)"); + String sql = getSQL(); + rowCount = stmt.executeUpdate(sql); + + } + try (Statement stmt = connection2.createStatement()) { + stmt.execute("CREATE DATABASE power2 KEEP 3650"); + stmt.execute("CREATE STABLE power2.meters2 (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " + + "TAGS (location BINARY(64), groupId INT)"); + } + return rowCount; + } + + @TestTemplate + public void testTDengine(TestContainer container) throws Exception { + Container.ExecResult execResult = container.executeJob("/tdengine/tdengine_source_to_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + long rowCountInserted = readSinkDataset(); + Assertions.assertEquals(rowCountInserted, testDataCount); + } + + @SneakyThrows + private long readSinkDataset() { + long rowCount; + try (Statement stmt = connection2.createStatement()) { + ResultSet resultSet = stmt.executeQuery("select count(1) from power2.meters2;"); + resultSet.next(); + rowCount = resultSet.getLong(1); + } + return rowCount; + } + + @SneakyThrows + private Connection createConnect(GenericContainer tdengineServer) { + String jdbcUrl = "jdbc:TAOS-RS://" + tdengineServer.getHost() + ":" + tdengineServer.getFirstMappedPort() + "?user=root&password=taosdata"; + Connection conn = DriverManager.getConnection(jdbcUrl); + log.info("TDengine Connected! " + jdbcUrl); + return conn; + } + + @AfterAll + @Override + public void tearDown() throws Exception { + if (connection1 != null) { + connection1.close(); + } + if (connection2 != null) { + connection2.close(); + } + if (tdengineServer1 != null) { + tdengineServer1.stop(); + } + if (tdengineServer2 != null) { + tdengineServer2.stop(); + } + } + + /** + * The generated SQL is: + * INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000) + * power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES('2018-10-03 14:38:15.000',12.60000,218,0.33000) + * power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES('2018-10-03 14:38:16.800',12.30000,221,0.31000) + * power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES('2018-10-03 14:38:16.650',10.30000,218,0.25000) + * power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES('2018-10-03 14:38:05.500',11.80000,221,0.28000) + * power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES('2018-10-03 14:38:16.600',13.40000,223,0.29000) + * power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES('2018-10-03 14:38:05.000',10.80000,223,0.29000) + * power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES('2018-10-03 14:38:06.500',11.50000,221,0.35000) + */ + private static String getSQL() { + StringBuilder sb = new StringBuilder("INSERT INTO "); + for (String line : getRawData()) { + String[] ps = line.split(","); + sb.append("power." + ps[0]).append(" USING power.meters TAGS(") + .append(ps[5]).append(", ") // tag: location + .append(ps[6]) // tag: groupId + .append(") VALUES(") + .append('\'').append(ps[1]).append('\'').append(",") // ts + .append(ps[2]).append(",") // current + .append(ps[3]).append(",") // voltage + .append(ps[4]).append(") "); // phase + } + return sb.toString(); + } + + private static List getRawData() { + return Arrays.asList( + "d1001,2018-10-03 14:38:05.000,10.30000,219,0.31000,'California.SanFrancisco',2", + "d1001,2018-10-03 14:38:15.000,12.60000,218,0.33000,'California.SanFrancisco',2", + "d1001,2018-10-03 14:38:16.800,12.30000,221,0.31000,'California.SanFrancisco',2", + "d1002,2018-10-03 14:38:16.650,10.30000,218,0.25000,'California.SanFrancisco',3", + "d1003,2018-10-03 14:38:05.500,11.80000,221,0.28000,'California.LosAngeles',2", + "d1003,2018-10-03 14:38:16.600,13.40000,223,0.29000,'California.LosAngeles',2", + "d1004,2018-10-03 14:38:05.000,10.80000,223,0.29000,'California.LosAngeles',3", + "d1004,2018-10-03 14:38:06.500,11.50000,221,0.35000,'California.LosAngeles',3" + ); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf new file mode 100644 index 00000000000..803a10f8394 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 2 + job.mode = "STREAMING" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + TDengine { + url : "jdbc:TAOS-RS://flink_e2e_tdengine_src:6041/" + username : "root" + password : "taosdata" + database : "power" + stable : "meters" + lower_bound : "2018-10-03 14:38:05.000" + upper_bound : "2018-10-03 14:38:16.801" + result_table_name = "tdengine_result" + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/category/source-v2 +} + +transform { +} + +sink { + TDengine { + url : "jdbc:TAOS-RS://flink_e2e_tdengine_sink:6041/" + username : "root" + password : "taosdata" + database : "power2" + stable : "meters2" + timezone : "UTC" + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 466e02041fa..457217735c8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -45,6 +45,7 @@ connector-cdc-mysql-e2e connector-iceberg-e2e connector-iceberg-hadoop3-e2e + connector-tdengine-e2e connector-datahub-e2e diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java index 6fa2aa94a3a..353ada932e2 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java @@ -123,7 +123,9 @@ public InternalRow get() { public void close() throws IOException { running = false; try { - internalSource.close(); + if (internalSource != null) { + internalSource.close(); + } } catch (Exception e) { throw new RuntimeException(e); }