diff --git a/docs/en/connector-v2/sink/Druid.md b/docs/en/connector-v2/sink/Druid.md
new file mode 100644
index 00000000000..cbf290b2c37
--- /dev/null
+++ b/docs/en/connector-v2/sink/Druid.md
@@ -0,0 +1,101 @@
+# Druid
+
+> Druid sink connector
+
+## Description
+
+Write data to Apache Druid.
+
+## Options
+
+| name                    | type     | required | default value |
+| ----------------------- | -------- | -------- | ------------- |
+| coordinator_url         | `String` | yes      | -             |
+| datasource              | `String` | yes      | -             |
+| columns                 | `List`   | yes      | __time        |
+| timestamp_column        | `String` | no       | timestamp     |
+| timestamp_format        | `String` | no       | auto          |
+| timestamp_missing_value | `String` | no       | -             |
+
+### coordinator_url [`String`]
+
+The URL of the Coordinator service in Apache Druid.
+
+### datasource [`String`]
+
+The DataSource name in Apache Druid.
+
+### columns [`List`]
+
+The columns that you want to write to Druid.
+
+### timestamp_column [`String`]
+
+The timestamp column name in Apache Druid, the default value is `timestamp`.
+
+### timestamp_format [`String`]
+
+The timestamp format in Apache Druid, the default value is `auto`. It can be one of:
+
+- `iso`
+  - ISO8601 with 'T' separator, like "2000-01-01T01:02:03.456"
+
+- `posix`
+  - seconds since epoch
+
+- `millis`
+  - milliseconds since epoch
+
+- `micro`
+  - microseconds since epoch
+
+- `nano`
+  - nanoseconds since epoch
+
+- `auto`
+  - automatically detects ISO (either 'T' or space separator) or millis format
+
+- any [Joda DateTimeFormat](http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html) string
+
+### timestamp_missing_value [`String`]
+
+The timestamp missing value in Apache Druid, which is used for input records that have a null or missing timestamp. The value of `timestamp_missing_value` should be in ISO 8601 format, for example `"2022-02-02T02:02:02.222"`.
+
+## Example
+
+### Simple
+
+```hocon
+DruidSink {
+  coordinator_url = "http://localhost:8081/"
+  datasource = "wikipedia"
+  columns = ["flags","page"]
+}
+```
+
+### Specified timestamp column and format
+
+```hocon
+DruidSink {
+  coordinator_url = "http://localhost:8081/"
+  datasource = "wikipedia"
+  timestamp_column = "timestamp"
+  timestamp_format = "auto"
+  columns = ["flags","page"]
+}
+```
+
+### Specified timestamp column, format and missing value
+
+```hocon
+DruidSink {
+  coordinator_url = "http://localhost:8081/"
+  datasource = "wikipedia"
+  timestamp_column = "timestamp"
+  timestamp_format = "auto"
+  timestamp_missing_value = "2022-02-02T02:02:02.222"
+  columns = ["flags","page"]
+}
+```
+
diff --git a/docs/en/connector-v2/source/Druid.md b/docs/en/connector-v2/source/Druid.md
new file mode 100644
index 00000000000..4343fd31aa0
--- /dev/null
+++ b/docs/en/connector-v2/source/Druid.md
@@ -0,0 +1,54 @@
+# Druid
+
+> Druid source connector
+
+## Description
+
+Read data from Apache Druid.
+
+## Options
+
+| name       | type     | required | default value |
+| ---------- | -------- | -------- | ------------- |
+| url        | `String` | yes      | -             |
+| datasource | `String` | yes      | -             |
+| start_date | `String` | no       | -             |
+| end_date   | `String` | no       | -             |
+| columns    | `List`   | no       | `*`           |
+
+### url [`String`]
+
+The JDBC URL of Apache Druid.
+
+### datasource [`String`]
+
+The DataSource name in Apache Druid.
+
+### start_date [`String`]
+
+The start date of the DataSource, for example, `'2016-06-27'`, `'2016-06-27 00:00:00'`, etc.
+
+### end_date [`String`]
+
+The end date of the DataSource, for example, `'2016-06-28'`, `'2016-06-28 00:00:00'`, etc.
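+
+When `start_date` and/or `end_date` are set, the source appends a range filter on Druid's `__time` column to the SQL it sends through the Avatica JDBC driver (see `DruidInputFormat.getSQL()` later in this diff). As a rough sketch, the configuration in the [Example](#example) below generates a query equivalent to:
+
+```sql
+SELECT flags,page FROM wikipedia
+WHERE 1=1
+  AND __time >= '2016-06-27 00:00:00'
+  AND __time < '2016-06-28 00:00:00'
+```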
+
+### columns [`List`]
+
+The columns that you want to read from the DataSource.
+
+### common options [string]
+
+Source plugin common parameters, please refer to [Source Plugin](common-options.mdx) for details.
+
+## Example
+
+```hocon
+DruidSource {
+  url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/"
+  datasource = "wikipedia"
+  start_date = "2016-06-27 00:00:00"
+  end_date = "2016-06-28 00:00:00"
+  columns = ["flags","page"]
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 7eecae29397..178cea18d1c 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -14,113 +14,104 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 # This mapping is used to resolve the Jar package name without version (or call artifactId)
 # corresponding to the module in the user Config, helping SeaTunnel to load the correct Jar package.
-
 # Flink Source
-flink.source.DruidSource = seatunnel-connector-flink-druid
-flink.source.FakeSource = seatunnel-connector-flink-fake
-flink.source.FakeSourceStream = seatunnel-connector-flink-fake
-flink.source.FileSource = seatunnel-connector-flink-file
-flink.source.InfluxDbSource = seatunnel-connector-flink-influxdb
-flink.source.JdbcSource = seatunnel-connector-flink-jdbc
-flink.source.KafkaTableStream = seatunnel-connector-flink-kafka
-flink.source.SocketStream = seatunnel-connector-flink-socket
-flink.source.Http = seatunnel-connector-flink-http
-
+flink.source.DruidSource=seatunnel-connector-flink-druid
+flink.source.FakeSource=seatunnel-connector-flink-fake
+flink.source.FakeSourceStream=seatunnel-connector-flink-fake
+flink.source.FileSource=seatunnel-connector-flink-file
+flink.source.InfluxDbSource=seatunnel-connector-flink-influxdb
+flink.source.JdbcSource=seatunnel-connector-flink-jdbc
+flink.source.KafkaTableStream=seatunnel-connector-flink-kafka
+flink.source.SocketStream=seatunnel-connector-flink-socket
+flink.source.Http=seatunnel-connector-flink-http
 # Flink Sink
-
-flink.sink.Clickhouse = seatunnel-connector-flink-clickhouse
-flink.sink.ClickhouseFile = seatunnel-connector-flink-clickhouse
-flink.sink.ConsoleSink = seatunnel-connector-flink-console
-flink.sink.DorisSink = seatunnel-connector-flink-doris
-flink.sink.DruidSink = seatunnel-connector-flink-druid
-flink.sink.ElasticSearch = seatunnel-connector-flink-elasticsearch7
-flink.sink.FileSink = seatunnel-connector-flink-file
-flink.sink.InfluxDbSink = seatunnel-connector-flink-influxdb
-flink.sink.JdbcSink = seatunnel-connector-flink-jdbc
-flink.sink.Kafka = seatunnel-connector-flink-kafka
-flink.sink.AssertSink = seatunnel-connector-flink-assert
-
+flink.sink.Clickhouse=seatunnel-connector-flink-clickhouse
+flink.sink.ClickhouseFile=seatunnel-connector-flink-clickhouse
+flink.sink.ConsoleSink=seatunnel-connector-flink-console
+flink.sink.DorisSink=seatunnel-connector-flink-doris
+flink.sink.DruidSink=seatunnel-connector-flink-druid
+flink.sink.ElasticSearch=seatunnel-connector-flink-elasticsearch7
+flink.sink.FileSink=seatunnel-connector-flink-file
+flink.sink.InfluxDbSink=seatunnel-connector-flink-influxdb
+flink.sink.JdbcSink=seatunnel-connector-flink-jdbc
+flink.sink.Kafka=seatunnel-connector-flink-kafka
+flink.sink.AssertSink=seatunnel-connector-flink-assert
 # Spark Source
-
-spark.source.ElasticSearch = seatunnel-connector-spark-elasticsearch
-spark.source.Fake = seatunnel-connector-spark-fake
-spark.source.FakeStream = seatunnel-connector-spark-fake
-spark.source.FeishuSheet = seatunnel-connector-spark-feishu
-spark.source.File = seatunnel-connector-spark-file
-spark.source.Hbase = seatunnel-connector-spark-hbase
-spark.source.Hive = seatunnel-connector-spark-hive
-spark.source.Http = seatunnel-connector-spark-http
-spark.source.Hudi = seatunnel-connector-spark-hudi
-spark.source.Iceberg = seatunnel-connector-spark-iceberg
-spark.source.Jdbc = seatunnel-connector-spark-jdbc
-spark.source.KafkaStream = seatunnel-connector-spark-kafka
-spark.source.Kudu = seatunnel-connector-spark-kudu
-spark.source.MongoDB = seatunnel-connector-spark-mongodb
-spark.source.Neo4j = seatunnel-connector-spark-neo4j
-spark.source.Phoenix = seatunnel-connector-spark-phoenix
-spark.source.Redis = seatunnel-connector-spark-redis
-spark.source.SocketStream = seatunnel-connector-spark-socket
-spark.source.TiDB = seatunnel-connector-spark-tidb
-
+spark.source.ElasticSearch=seatunnel-connector-spark-elasticsearch
+spark.source.Fake=seatunnel-connector-spark-fake
+spark.source.FakeStream=seatunnel-connector-spark-fake
+spark.source.FeishuSheet=seatunnel-connector-spark-feishu
+spark.source.File=seatunnel-connector-spark-file
+spark.source.Hbase=seatunnel-connector-spark-hbase
+spark.source.Hive=seatunnel-connector-spark-hive
+spark.source.Http=seatunnel-connector-spark-http
+spark.source.Hudi=seatunnel-connector-spark-hudi
+spark.source.Iceberg=seatunnel-connector-spark-iceberg
+spark.source.Jdbc=seatunnel-connector-spark-jdbc
+spark.source.KafkaStream=seatunnel-connector-spark-kafka
+spark.source.Kudu=seatunnel-connector-spark-kudu
+spark.source.MongoDB=seatunnel-connector-spark-mongodb
+spark.source.Neo4j=seatunnel-connector-spark-neo4j
+spark.source.Phoenix=seatunnel-connector-spark-phoenix
+spark.source.Redis=seatunnel-connector-spark-redis
+spark.source.SocketStream=seatunnel-connector-spark-socket
+spark.source.TiDB=seatunnel-connector-spark-tidb
 # Spark Sink
-
-spark.sink.Clickhouse = seatunnel-connector-spark-clickhouse
-spark.sink.ClickhouseFile = seatunnel-connector-spark-clickhouse
-spark.sink.Console = seatunnel-connector-spark-console
-spark.sink.Doris = seatunnel-connector-spark-doris
-spark.sink.ElasticSearch = seatunnel-connector-spark-elasticsearch
-spark.sink.Email = seatunnel-connector-spark-email
-spark.sink.File = seatunnel-connector-spark-file
-spark.sink.Hbase = seatunnel-connector-spark-hbase
-spark.sink.Hive = seatunnel-connector-spark-hive
-spark.sink.Hudi = seatunnel-connector-spark-hudi
-spark.sink.Iceberg = seatunnel-connector-spark-iceberg
-spark.sink.Jdbc = seatunnel-connector-spark-jdbc
-spark.sink.Kafka = seatunnel-connector-spark-kafka
-spark.sink.Kudu = seatunnel-connector-spark-kudu
-spark.sink.MongoDB = seatunnel-connector-spark-mongodb
-spark.sink.Phoenix = seatunnel-connector-spark-phoenix
-spark.sink.Redis = seatunnel-connector-spark-redis
-spark.sink.TiDB = seatunnel-connector-spark-tidb
-
+spark.sink.Clickhouse=seatunnel-connector-spark-clickhouse
+spark.sink.ClickhouseFile=seatunnel-connector-spark-clickhouse
+spark.sink.Console=seatunnel-connector-spark-console
+spark.sink.Doris=seatunnel-connector-spark-doris
+spark.sink.ElasticSearch=seatunnel-connector-spark-elasticsearch
+spark.sink.Email=seatunnel-connector-spark-email
+spark.sink.File=seatunnel-connector-spark-file
+spark.sink.Hbase=seatunnel-connector-spark-hbase
+spark.sink.Hive=seatunnel-connector-spark-hive
+spark.sink.Hudi=seatunnel-connector-spark-hudi
+spark.sink.Iceberg=seatunnel-connector-spark-iceberg
+spark.sink.Jdbc=seatunnel-connector-spark-jdbc
+spark.sink.Kafka=seatunnel-connector-spark-kafka
+spark.sink.Kudu=seatunnel-connector-spark-kudu
+spark.sink.MongoDB=seatunnel-connector-spark-mongodb
+spark.sink.Phoenix=seatunnel-connector-spark-phoenix
+spark.sink.Redis=seatunnel-connector-spark-redis
+spark.sink.TiDB=seatunnel-connector-spark-tidb
 # SeaTunnel new connector API
-
-seatunnel.source.FakeSource = connector-fake
-seatunnel.sink.Console = connector-console
-seatunnel.sink.Assert = connector-assert
-seatunnel.source.Kafka = connector-kafka
-seatunnel.sink.Kafka = connector-kafka
-seatunnel.source.Http = connector-http-base
-seatunnel.sink.Http = connector-http-base
-seatunnel.sink.Feishu = connector-http-feishu
-seatunnel.source.Socket = connector-socket
-seatunnel.sink.Hive = connector-hive
-seatunnel.source.Hive = connector-hive
-seatunnel.source.Clickhouse = connector-clickhouse
-seatunnel.sink.Clickhouse = connector-clickhouse
-seatunnel.sink.ClickhouseFile = connector-clickhouse
-seatunnel.source.Jdbc = connector-jdbc
-seatunnel.sink.Jdbc = connector-jdbc
-seatunnel.source.Kudu = connector-kudu
-seatunnel.sink.Kudu = connector-kudu
-seatunnel.sink.Email = connector-email
-seatunnel.source.HdfsFile = connector-file-hadoop
-seatunnel.sink.HdfsFile = connector-file-hadoop
-seatunnel.source.LocalFile = connector-file-local
-seatunnel.sink.LocalFile = connector-file-local
-seatunnel.source.OssFile = connector-file-oss
-seatunnel.source.Pulsar = connector-pulsar
-seatunnel.source.Hudi = connector-hudi
-seatunnel.sink.DingTalk = connector-dingtalk
-seatunnel.sink.elasticsearch = connector-elasticsearch
-seatunnel.source.IoTDB = connector-iotdb
-seatunnel.sink.IoTDB = connector-iotdb
-seatunnel.sink.Neo4j = connector-neo4j
-seatunnel.sink.FtpFile = connector-file-ftp
-seatunnel.sink.Socket = connector-socket
-seatunnel.source.Redis = connector-redis
-seatunnel.sink.DataHub = connector-datahub
-
+seatunnel.source.FakeSource=connector-fake
+seatunnel.sink.Console=connector-console
+seatunnel.sink.Assert=connector-assert
+seatunnel.source.Kafka=connector-kafka
+seatunnel.sink.Kafka=connector-kafka
+seatunnel.source.Http=connector-http-base
+seatunnel.sink.Http=connector-http-base
+seatunnel.sink.Feishu=connector-http-feishu
+seatunnel.source.Socket=connector-socket
+seatunnel.sink.Hive=connector-hive
+seatunnel.source.Hive=connector-hive
+seatunnel.source.Clickhouse=connector-clickhouse
+seatunnel.sink.Clickhouse=connector-clickhouse
+seatunnel.sink.ClickhouseFile=connector-clickhouse
+seatunnel.source.Jdbc=connector-jdbc
+seatunnel.sink.Jdbc=connector-jdbc
+seatunnel.source.Kudu=connector-kudu
+seatunnel.sink.Kudu=connector-kudu
+seatunnel.sink.Email=connector-email
+seatunnel.source.HdfsFile=connector-file-hadoop
+seatunnel.sink.HdfsFile=connector-file-hadoop
+seatunnel.source.LocalFile=connector-file-local
+seatunnel.sink.LocalFile=connector-file-local
+seatunnel.source.OssFile=connector-file-oss
+seatunnel.source.Pulsar=connector-pulsar
+seatunnel.source.Hudi=connector-hudi
+seatunnel.sink.DingTalk=connector-dingtalk
+seatunnel.sink.elasticsearch=connector-elasticsearch
+seatunnel.source.IoTDB=connector-iotdb
+seatunnel.sink.IoTDB=connector-iotdb
+seatunnel.sink.Neo4j=connector-neo4j
+seatunnel.sink.FtpFile=connector-file-ftp
+seatunnel.sink.Socket=connector-socket
+seatunnel.source.Redis=connector-redis
+seatunnel.sink.DataHub=connector-datahub
+seatunnel.source.Druid=connector-druid
+seatunnel.sink.Druid=connector-druid
diff --git a/pom.xml b/pom.xml
index 0c4695e5240..52d8cf11ee6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,7
+87,7 @@ seatunnel-plugin-discovery seatunnel-formats seatunnel-dist - seatunnel-server + seatunnel-server @@ -197,6 +197,12 @@ 3.10.0 4.2.0 0.10.7 + 4.5.13 + 1.2.83 + 1.15.0 + 0.23.0 + 2.10.14 + 0.22.1 @@ -206,7 +212,36 @@ seatunnel-config-shade ${seatunnel.config.shade.version} - + + org.apache.httpcomponents + httpclient + ${httpclient.version} + + + com.alibaba + fastjson + ${fastjson.version} + + + joda-time + joda-time + ${joda-tim.version} + + + org.apache.druid + druid-indexing-service + ${druid.version} + + + org.apache.druid + druid-core + ${druid-core.version} + + + org.apache.calcite.avatica + avatica-core + ${avatica-core.version} + mysql @@ -256,7 +291,7 @@ commons-collections4 ${commons-collections4.version} - + com.beust jcommander @@ -344,7 +379,7 @@ slf4j-log4j12 ${slf4j.version} - + commons-logging commons-logging @@ -848,4 +883,4 @@ - + \ No newline at end of file diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml index ce13e8adb75..37e48375437 100644 --- a/seatunnel-connectors-v2-dist/pom.xml +++ b/seatunnel-connectors-v2-dist/pom.xml @@ -18,8 +18,8 @@ --> - seatunnel @@ -121,12 +121,12 @@ connector-dingtalk ${project.version} - + org.apache.seatunnel connector-kudu ${project.version} - - + + org.apache.seatunnel connector-email ${project.version} @@ -156,6 +156,11 @@ connector-datahub ${project.version} + + org.apache.seatunnel + connector-druid + ${project.version} + diff --git a/seatunnel-connectors-v2/connector-druid/pom.xml b/seatunnel-connectors-v2/connector-druid/pom.xml new file mode 100644 index 00000000000..1ab81976133 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/pom.xml @@ -0,0 +1,55 @@ + + + + seatunnel-connectors-v2 + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-druid + + + 2.6.7 + + + + + org.apache.seatunnel + connector-common + ${project.version} + + + com.fasterxml.jackson.datatype + jackson-datatype-joda + ${jackson-datatype-joda.version} + + + org.apache.httpcomponents + httpclient + + + com.alibaba + fastjson + + + joda-time + joda-time + + + org.apache.druid + druid-indexing-service + + + org.apache.druid + druid-core + + + org.apache.calcite.avatica + avatica-core + + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java new file mode 100644 index 00000000000..a4d7e69a00e --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper;
+
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+@Data
+public class DruidInputFormat implements Serializable {
+    protected static final String COLUMNS_DEFAULT = "*";
+    protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE 1=1";
+    private static final Logger LOGGER = LoggerFactory.getLogger(DruidInputFormat.class);
+    protected transient Connection connection;
+    protected transient PreparedStatement statement;
+    protected transient ResultSet resultSet;
+    protected SeaTunnelRowType rowTypeInfo;
+    protected DruidSourceOptions druidSourceOptions;
+    protected String querySQL;
+    protected boolean hasNext;
+
+    public DruidInputFormat(DruidSourceOptions druidSourceOptions) {
+        this.druidSourceOptions = druidSourceOptions;
+        this.rowTypeInfo = initTableField();
+    }
+
+    public ResultSetMetaData getResultSetMetaData() throws SQLException {
+        try {
+            querySQL = getSQL();
+            connection = DriverManager.getConnection(druidSourceOptions.getUrl());
+            statement = connection.prepareStatement(querySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            return statement.getMetaData();
+        } catch (SQLException se) {
+            throw new SQLException("ResultSetMetaData() failed. " + se.getMessage(), se);
+        }
+    }
+
+    public void openInputFormat() {
+        try {
+            connection = DriverManager.getConnection(druidSourceOptions.getUrl());
+            statement = connection.prepareStatement(querySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            resultSet = statement.executeQuery();
+            hasNext = resultSet.next();
+        } catch (SQLException se) {
+            throw new IllegalArgumentException("openInputFormat() failed. " + se.getMessage(), se);
+        }
+    }
+
+    // Builds "SELECT <columns> FROM <datasource> WHERE 1=1" and appends __time bounds when configured.
+    private String getSQL() throws SQLException {
+        String columns = COLUMNS_DEFAULT;
+        String startTimestamp = druidSourceOptions.getStartTimestamp();
+        String endTimestamp = druidSourceOptions.getEndTimestamp();
+        String dataSource = druidSourceOptions.getDatasource();
+        if (druidSourceOptions.getColumns() != null && druidSourceOptions.getColumns().size() > 0) {
+            columns = String.join(",", druidSourceOptions.getColumns());
+        }
+        String sql = String.format(QUERY_TEMPLATE, columns, dataSource);
+        if (startTimestamp != null) {
+            sql += " AND __time >= '" + startTimestamp + "'";
+        }
+        if (endTimestamp != null) {
+            sql += " AND __time < '" + endTimestamp + "'";
+        }
+        return sql;
+    }
+
+    public void closeInputFormat() {
+        try {
+            if (resultSet != null) {
+                resultSet.close();
+            }
+            if (statement != null) {
+                statement.close();
+            }
+        } catch (SQLException se) {
+            LOGGER.error("DruidInputFormat Statement couldn't be closed", se);
+        } finally {
+            statement = null;
+            resultSet = null;
+            try {
+                if (connection != null) {
+                    connection.close();
+                }
+            } catch (SQLException se) {
+                LOGGER.error("DruidInputFormat Connection couldn't be closed", se);
+            } finally {
+                connection = null;
+            }
+        }
+    }
+
+    public boolean reachedEnd() throws IOException {
+        return !hasNext;
+    }
+
+    public SeaTunnelRow nextRecord() throws IOException {
+        try {
+            if (!hasNext) {
+                return null;
+            }
+            SeaTunnelRow seaTunnelRow = toInternal(resultSet, rowTypeInfo);
+            // update hasNext after we've read the record
+            hasNext = resultSet.next();
+            return seaTunnelRow;
+        } catch (SQLException se) {
+            throw new IOException("Couldn't read data - " + se.getMessage(), se);
+        } catch (NullPointerException npe) {
+            throw new IOException("Couldn't access resultSet", npe);
+        }
+    }
+
+    public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType rowTypeInfo) throws SQLException {
+        List<Object> fields = new ArrayList<>();
+        SeaTunnelDataType<?>[] seaTunnelDataTypes = rowTypeInfo.getFieldTypes();
+
+        for (int i = 1; i <= seaTunnelDataTypes.length; i++) {
+            Object seatunnelField;
+            SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i - 1];
+            if (null == rs.getObject(i)) {
+                seatunnelField = null;
+            } else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getBoolean(i);
+            } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getByte(i);
+            } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getShort(i);
+            } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getInt(i);
+            } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getLong(i);
+            } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getFloat(i);
+            } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getDouble(i);
+            } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getString(i);
+            } else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(seaTunnelDataType)) {
+                // Druid reports timestamps in UTC; convert to the java.time value SeaTunnelRow expects
+                Timestamp ts = rs.getTimestamp(i, Calendar.getInstance(TimeZone.getTimeZone("UTC")));
+                seatunnelField = LocalDateTime.ofInstant(ts.toInstant(), ZoneId.of("UTC"));
+            } else if (LocalTimeType.LOCAL_TIME_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getTime(i).toLocalTime();
+            } else if (LocalTimeType.LOCAL_DATE_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getDate(i).toLocalDate();
+            } else {
+                throw new IllegalStateException("Unexpected value: " + seaTunnelDataType);
+            }
+
+            fields.add(seatunnelField);
+        }
+
+        return new SeaTunnelRow(fields.toArray());
+    }
+
+    private SeaTunnelRowType initTableField() {
+        ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+        ArrayList<String> fieldNames = new ArrayList<>();
+
+        try {
+            ResultSetMetaData resultSetMetaData = getResultSetMetaData();
+            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+                fieldNames.add(resultSetMetaData.getColumnName(i));
+                seaTunnelDataTypes.add(DruidTypeMapper.DRUID_TYPE_MAPPS.get(resultSetMetaData.getColumnTypeName(i)));
+            }
+        } catch (SQLException e) {
+            LOGGER.warn("get row type info exception", e);
+        }
+        rowTypeInfo = new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
+
+        return rowTypeInfo;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java
new file mode 100644
index 00000000000..62ef37ea87d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.seatunnel.connectors.seatunnel.druid.client; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.joda.JodaModule; +import lombok.Data; +import org.apache.commons.collections.CollectionUtils; +import org.apache.druid.data.input.MaxSizeSplitHintSpec; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InlineInputSource; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Serializable; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@Data +public class DruidOutputFormat implements Serializable { + public static final String DEFAULT_LINE_DELIMITER = "\n"; + public static final String DEFAULT_FIELD_DELIMITER = ","; + private static final Logger LOGGER = LoggerFactory.getLogger(DruidOutputFormat.class); + private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp"; + private static final String DEFAULT_TIMESTAMP_FORMAT = "auto"; + private static final String DEFAULT_COLUMN = "__time"; + private static final DateTime DEFAULT_TIMESTAMP_MISSING_VALUE = new DateTime(); + private final transient StringBuilder data; + private final String coordinatorURL; + private final String datasource; + private final String timestampColumn; + private final String timestampFormat; + private final DateTime timestampMissingValue; + private List columns; + + public DruidOutputFormat(String coordinatorURL, + String datasource, + String timestampColumn, + String timestampFormat, + String timestampMissingValue, + List columns + ) { + this.data = new StringBuilder(); + this.coordinatorURL = coordinatorURL; + this.datasource = datasource; + this.timestampColumn = timestampColumn == null ? DEFAULT_TIMESTAMP_COLUMN : timestampColumn; + this.timestampFormat = timestampFormat == null ? DEFAULT_TIMESTAMP_FORMAT : timestampFormat; + this.timestampMissingValue = timestampMissingValue == null ? 
DEFAULT_TIMESTAMP_MISSING_VALUE : DateTimes.of(timestampMissingValue);
+        this.columns = columns;
+    }
+
+    public void write(SeaTunnelRow element) {
+        int fieldIndex = element.getArity();
+        for (int i = 0; i < fieldIndex; i++) {
+            Object v = element.getField(i);
+            if (i != 0) {
+                this.data.append(DEFAULT_FIELD_DELIMITER);
+            }
+            if (v != null) {
+                this.data.append(v);
+            }
+        }
+        this.data.append(DEFAULT_LINE_DELIMITER);
+    }
+
+    // All rows are buffered as CSV in memory and submitted to the Coordinator as a single
+    // inline parallel-index task when the writer is closed.
+    public void closeOutputFormat() {
+        try {
+            ParallelIndexIOConfig ioConfig = parallelIndexIOConfig();
+            ParallelIndexTuningConfig tuningConfig = tuningConfig();
+            ParallelIndexSupervisorTask indexTask = parallelIndexSupervisorTask(ioConfig, tuningConfig);
+            ObjectMapper mapper = new ObjectMapper();
+            mapper.registerModule(new JodaModule());
+            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_GETTERS, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_FIELDS, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_SETTERS, false);
+            mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
+            mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+            String taskJSON = mapper.writeValueAsString(indexTask);
+            // Strip the server-generated fields (id, groupId, resource) and the tuningConfig
+            // so the Overlord fills in its own values for the submitted spec.
+            JSONObject jsonObject = JSON.parseObject(taskJSON);
+            jsonObject.remove("id");
+            jsonObject.remove("groupId");
+            jsonObject.remove("resource");
+            JSONObject spec = jsonObject.getJSONObject("spec");
+            spec.remove("tuningConfig");
+            jsonObject.put("spec", spec);
+            taskJSON = jsonObject.toJSONString();
+
+            URL url = new URL(this.coordinatorURL + "druid/indexer/v1/task");
+            HttpURLConnection con = (HttpURLConnection) url.openConnection();
+            con.setRequestMethod("POST");
+            con.setRequestProperty("Content-Type", "application/json");
+            con.setRequestProperty("Accept", "application/json, text/plain, */*");
+            con.setDoOutput(true);
+            try (OutputStream os = con.getOutputStream()) {
+                byte[] input = taskJSON.getBytes(StandardCharsets.UTF_8);
+                os.write(input, 0, input.length);
+            }
+            try (BufferedReader br = new BufferedReader(new InputStreamReader(con.getInputStream(), StandardCharsets.UTF_8))) {
+                StringBuilder response = new StringBuilder();
+                String responseLine;
+                while ((responseLine = br.readLine()) != null) {
+                    response.append(responseLine.trim());
+                }
+                LOGGER.info("Druid write task has been sent, and the response is {}", response.toString());
+            }
+        } catch (IOException e) {
+            LOGGER.error("Failed to submit the Druid ingestion task", e);
+        }
+    }
+
+    private ParallelIndexSupervisorTask parallelIndexSupervisorTask(ParallelIndexIOConfig ioConfig, ParallelIndexTuningConfig tuningConfig) {
+        return new ParallelIndexSupervisorTask(
+            null,
+            null,
+            null,
+            new ParallelIndexIngestionSpec(
+                new DataSchema(
+                    this.datasource,
+                    new TimestampSpec(this.timestampColumn, this.timestampFormat, this.timestampMissingValue),
+                    new DimensionsSpec(Collections.emptyList()),
+                    null,
+                    new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, false, null),
+                    null
+                ),
+                ioConfig,
+                tuningConfig
+            ),
+            null
+        );
+    }
+
+    private ParallelIndexIOConfig parallelIndexIOConfig() {
+        Set<String> columnSet = new HashSet<>();
+        columnSet.add(DEFAULT_COLUMN);
+        CollectionUtils.addAll(columnSet, this.getColumns().get(0).split(","));
+        columnSet.add(timestampColumn);
+
+        return new ParallelIndexIOConfig(
+            null,
+            new InlineInputSource(this.data.toString()),
+            new CsvInputFormat(
+                new ArrayList<>(columnSet),
+                "|",
+                null,
+                false,
+                0
+            ),
+            false,
+            null
+        );
+    }
+
+    private
ParallelIndexTuningConfig tuningConfig() { + return new ParallelIndexTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + new MaxSizeSplitHintSpec(null, 1), + null, + null, + null, + null, + false, + null, + null, + null, + null, + 1, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkConfig.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkConfig.java new file mode 100644 index 00000000000..493db86a6c9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkConfig.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.druid.config; + +public class DruidSinkConfig { + public static final String COORDINATOR_URL = "coordinator_url"; + public static final String DATASOURCE = "datasource"; + public static final String TIMESTAMP_COLUMN = "timestamp_column"; + public static final String TIMESTAMP_FORMAT = "timestamp_format"; + public static final String TIMESTAMP_MISSING_VALUE = "timestamp_missing_value"; + public static final String COLUMNS = "columns"; +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkOptions.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkOptions.java new file mode 100644 index 00000000000..c351767eb0f --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkOptions.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.druid.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.AllArgsConstructor; +import lombok.Data; +import org.joda.time.DateTime; + +import java.io.Serializable; +import java.util.List; + +@Data +@AllArgsConstructor +public class DruidSinkOptions implements Serializable { + private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp"; + private static final String DEFAULT_TIMESTAMP_FORMAT = "auto"; + private static final DateTime DEFAULT_TIMESTAMP_MISSING_VALUE = null; + private static final int DEFAULT_PARALLELISM = 1; + private String coordinatorURL; + private String datasource; + private String timestampColumn; + private String timestampFormat; + private String timestampMissingValue; + private List columns; + private int parallelism; + + public DruidSinkOptions(Config pluginConfig) { + this.coordinatorURL = pluginConfig.getString(DruidSinkConfig.COORDINATOR_URL); + this.datasource = pluginConfig.getString(DruidSinkConfig.DATASOURCE); + this.columns = pluginConfig.getStringList(DruidSinkConfig.COLUMNS); + this.timestampColumn = pluginConfig.hasPath(DruidSinkConfig.TIMESTAMP_COLUMN) ? pluginConfig.getString(DruidSinkConfig.TIMESTAMP_COLUMN) : null; + this.timestampFormat = pluginConfig.hasPath(DruidSinkConfig.TIMESTAMP_FORMAT) ? pluginConfig.getString(DruidSinkConfig.TIMESTAMP_FORMAT) : null; + this.timestampMissingValue = pluginConfig.hasPath(DruidSinkConfig.TIMESTAMP_MISSING_VALUE) ? pluginConfig.getString(DruidSinkConfig.TIMESTAMP_MISSING_VALUE) : null; + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceConfig.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceConfig.java new file mode 100644 index 00000000000..4fe7aa144f0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceConfig.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.druid.config; + +public class DruidSourceConfig { + public static final String URL = "url"; + public static final String DATASOURCE = "datasource"; + public static final String START_TIMESTAMP = "start_date"; + public static final String END_TIMESTAMP = "end_date"; + public static final String COLUMNS = "columns"; +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceOptions.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceOptions.java new file mode 100644 index 00000000000..87333ca78cb --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceOptions.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.druid.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +@AllArgsConstructor +public class DruidSourceOptions implements Serializable { + private String url; + private String datasource; + private String startTimestamp; + private String endTimestamp; + private List columns; + + private String partitionColumn; + private Long partitionUpperBound; + private Long partitionLowerBound; + private Integer parallelism; + + public DruidSourceOptions(Config pluginConfig) { + this.url = pluginConfig.getString(DruidSourceConfig.URL); + this.datasource = pluginConfig.getString(DruidSourceConfig.DATASOURCE); + this.columns = pluginConfig.hasPath(DruidSourceConfig.COLUMNS) ? pluginConfig.getStringList(DruidSourceConfig.COLUMNS) : null; + this.startTimestamp = pluginConfig.hasPath(DruidSourceConfig.START_TIMESTAMP) ? pluginConfig.getString(DruidSourceConfig.START_TIMESTAMP) : null; + this.endTimestamp = pluginConfig.hasPath(DruidSourceConfig.END_TIMESTAMP) ? pluginConfig.getString(DruidSourceConfig.END_TIMESTAMP) : null; + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidTypeMapper.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidTypeMapper.java new file mode 100644 index 00000000000..9506b84f252 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidTypeMapper.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.druid.config; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; + +import java.util.HashMap; +import java.util.Map; + +public class DruidTypeMapper { + public static Map> DRUID_TYPE_MAPPS = new HashMap<>(); + + static { + // https://druid.apache.org/docs/latest/querying/sql.html#data-types + DRUID_TYPE_MAPPS.put("CHAR", BasicType.STRING_TYPE); + DRUID_TYPE_MAPPS.put("VARCHAR", BasicType.STRING_TYPE); + DRUID_TYPE_MAPPS.put("DECIMAL", BasicType.DOUBLE_TYPE); + DRUID_TYPE_MAPPS.put("FLOAT", BasicType.FLOAT_TYPE); + DRUID_TYPE_MAPPS.put("REAL", BasicType.DOUBLE_TYPE); + DRUID_TYPE_MAPPS.put("DOUBLE", BasicType.DOUBLE_TYPE); + DRUID_TYPE_MAPPS.put("BOOLEAN", BasicType.LONG_TYPE); + DRUID_TYPE_MAPPS.put("TINYINT", BasicType.LONG_TYPE); + DRUID_TYPE_MAPPS.put("SMALLINT", BasicType.LONG_TYPE); + DRUID_TYPE_MAPPS.put("INTEGER", BasicType.LONG_TYPE); + DRUID_TYPE_MAPPS.put("BIGINT", BasicType.LONG_TYPE); + DRUID_TYPE_MAPPS.put("TIMESTAMP", LocalTimeType.LOCAL_DATE_TIME_TYPE); + DRUID_TYPE_MAPPS.put("DATE", LocalTimeType.LOCAL_DATE_TYPE); + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/sink/DruidSink.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/sink/DruidSink.java new file mode 100644 index 00000000000..448343d66b7 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/sink/DruidSink.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.druid.sink; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSinkOptions; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + +@AutoService(SeaTunnelSink.class) +public class DruidSink extends AbstractSimpleSink { + + private Config config; + private SeaTunnelRowType seaTunnelRowType; + private DruidSinkOptions druidSinkOptions; + + @Override + public String getPluginName() { + return "Druid"; + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return seaTunnelRowType; + } + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new DruidSinkWriter(seaTunnelRowType, druidSinkOptions); + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + this.config = pluginConfig; + this.druidSinkOptions = new DruidSinkOptions(this.config); + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/sink/DruidSinkWriter.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/sink/DruidSinkWriter.java new file mode 100644 index 00000000000..a90e4f2a63d --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/sink/DruidSinkWriter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.druid.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.druid.client.DruidOutputFormat; +import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSinkOptions; + +import lombok.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class DruidSinkWriter extends AbstractSinkWriter { + private static final Logger LOGGER = LoggerFactory.getLogger(DruidSinkWriter.class); + private static final long serialVersionUID = -7210857670269773005L; + private SeaTunnelRowType seaTunnelRowType; + private DruidOutputFormat druidOutputFormat; + + public DruidSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowType, + @NonNull DruidSinkOptions druidSinkOptions) { + this.seaTunnelRowType = seaTunnelRowType; + druidOutputFormat = new DruidOutputFormat(druidSinkOptions.getCoordinatorURL(), + druidSinkOptions.getDatasource(), + druidSinkOptions.getTimestampColumn(), + druidSinkOptions.getTimestampFormat(), + druidSinkOptions.getTimestampMissingValue(), + druidSinkOptions.getColumns() + ); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + druidOutputFormat.write(element); + } + + @Override + public void close() throws IOException { + druidOutputFormat.closeOutputFormat(); + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSource.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSource.java new file mode 100644 index 00000000000..dd4ba46b0a8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSource.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.druid.source; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.druid.client.DruidInputFormat; +import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@AutoService(SeaTunnelSource.class) +public class DruidSource extends AbstractSingleSplitSource { + private static final Logger LOGGER = LoggerFactory.getLogger(DruidSource.class); + + private SeaTunnelRowType rowTypeInfo; + private DruidInputFormat druidInputFormat; + private DruidSourceOptions druidSourceOptions; + + @Override + public String getPluginName() { + return "Druid"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + LOGGER.info("Druid source prepare"); + try { + druidSourceOptions = new DruidSourceOptions(pluginConfig); + druidInputFormat = new DruidInputFormat(druidSourceOptions); + this.rowTypeInfo = druidInputFormat.getRowTypeInfo(); + } catch (Exception e) { + throw new PrepareFailException("Druid", PluginType.SOURCE, e.toString()); + } + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SeaTunnelDataType getProducedType() { + return this.rowTypeInfo; + } + + @Override + public AbstractSingleSplitReader createReader(SingleSplitReaderContext readerContext) throws Exception { + LOGGER.info("Druid source createReader"); + return new DruidSourceReader(readerContext, this.druidInputFormat); + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSourceReader.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSourceReader.java new file mode 100644 index 00000000000..061f0c24510 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSourceReader.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.druid.source; + +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.druid.client.DruidInputFormat; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class DruidSourceReader extends AbstractSingleSplitReader { + private static final Logger LOGGER = LoggerFactory.getLogger(DruidSourceReader.class); + + private final SingleSplitReaderContext context; + private final DruidInputFormat druidInputFormat; + + public DruidSourceReader(SingleSplitReaderContext context, DruidInputFormat druidInputFormat) { + this.context = context; + this.druidInputFormat = druidInputFormat; + } + + @Override + public void open() throws Exception { + druidInputFormat.openInputFormat(); + } + + @Override + public void close() throws IOException { + druidInputFormat.closeInputFormat(); + } + + @Override + public void pollNext(Collector output) throws Exception { + while (!druidInputFormat.reachedEnd()) { + SeaTunnelRow seaTunnelRow = druidInputFormat.nextRecord(); + output.collect(seaTunnelRow); + } + druidInputFormat.closeInputFormat(); + if (Boundedness.BOUNDED.equals(context.getBoundedness())) { + LOGGER.info("Closed the bounded Druid source"); + context.signalNoMoreElement(); + } + } +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 42b50f6b0cb..c408d3eaad0 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -52,6 +52,7 @@ connector-neo4j connector-redis connector-datahub + connector-druid diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml index 52db441007b..f5188549b50 100644 --- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml @@ -76,6 +76,11 @@ connector-dingtalk ${project.version} + + org.apache.seatunnel + connector-druid + ${project.version} + diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDruidExample.java b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDruidExample.java new file mode 100644 index 00000000000..dfa4ca85e30 --- /dev/null +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDruidExample.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.example.flink.v2;
+
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.starter.flink.command.FlinkCommandBuilder;
+
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+
+public class SeaTunnelDruidExample {
+
+    public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
+        String configurePath = args.length > 0 ? args[0] : "/examples/druid_to_druid.conf";
+        String configFile = getTestConfigFile(configurePath);
+        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+        flinkCommandArgs.setConfigFile(configFile);
+        flinkCommandArgs.setCheckConfig(false);
+        flinkCommandArgs.setVariables(null);
+        Command<FlinkCommandArgs> flinkCommand =
+                new FlinkCommandBuilder().buildCommand(flinkCommandArgs);
+        Seatunnel.run(flinkCommand);
+    }
+
+    public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
+        URL resource = SeaTunnelDruidExample.class.getResource(configFile);
+        if (resource == null) {
+            throw new FileNotFoundException("Can't find config file: " + configFile);
+        }
+        return Paths.get(resource.toURI()).toString();
+    }
+}
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/druid_to_druid.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/druid_to_druid.conf
new file mode 100644
index 00000000000..c6631ee602a
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/druid_to_druid.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  execution.checkpoint.interval = 10000
+  execution.checkpoint.data-uri = "file:///D://data//ck"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  Druid {
+    result_table_name = "fake"
+    url = "jdbc:avatica:remote:url=http://192.168.0.173:8082/druid/v2/sql/avatica/"
+    datasource = "wikipedia"
+    start_date = "2016-06-27 21:30:00"
+    end_date = "2016-06-28 00:00:00"
+    columns = ["*"]
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+  sql {
+    sql = "select * from fake"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  // Console {}
+
+  Druid {
+    coordinator_url = "http://192.168.0.173:8081/"
+    datasource = "guang00000x"
+    timestamp_column = "timestamp"
+    timestamp_format = "auto"
+    timestamp_missing_value = "2022-02-09T02:02:02.222"
+    columns = ["__time", "added", "user"]
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
index 90dbe5c17d1..3cd5c6e9723 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
@@ -55,6 +55,11 @@
             <artifactId>connector-clickhouse</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-druid</artifactId>
+            <version>${project.version}</version>
+        </dependency>
@@ -79,6 +84,27 @@
             <scope>${spark.scope}</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>antlr4-runtime</artifactId>
+            <version>4.7.1</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.roaringbitmap</groupId>
+            <artifactId>RoaringBitmap</artifactId>
+            <version>0.9.22</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.6.7</version>
+            <scope>compile</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-hive_${scala.binary.version}</artifactId>
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiToDruidExample.java b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiToDruidExample.java
new file mode 100644
index 00000000000..e35b81642f3
--- /dev/null
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiToDruidExample.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.example.spark.v2;
+
+import org.apache.seatunnel.core.starter.exception.CommandException;
+
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+
+public class SeaTunnelApiToDruidExample {
+
+    public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
+        String configurePath = args.length > 0 ? args[0] : "/examples/spark.batch.druid.conf";
+        ExampleUtils.builder(configurePath);
+    }
+}
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.druid.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.druid.conf
new file mode 100644
index 00000000000..a3bf687c108
--- /dev/null
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.druid.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  spark.app.name = "SeaTunnelToDruidV2"
+  spark.executor.instances = 2
+  spark.executor.cores = 2
+  spark.executor.memory = "2g"
+  spark.master = local
+}
+
+source {
+  Druid {
+    result_table_name = "fake"
+    url = "jdbc:avatica:remote:url=http://192.168.0.173:8082/druid/v2/sql/avatica/"
+    datasource = "wikipedia"
+    start_date = "2016-06-27 21:30:00"
+    end_date = "2016-06-28 00:00:00"
+    columns = ["*"]
+  }
+  # If you would like to get more information about how to configure seatunnel and see full list of input plugins,
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+
+transform {
+  sql {
+    sql = "select added, channel, cityName, comment, commentLength, countryIsoCode, countryName, deleted, delta, deltaBucket, diffUrl, flags, isAnonymous, isMinor, isNew, isRobot, isUnpatrolled, metroCode, namespace, page, regionIsoCode, regionName, user from fake"
+    result_table_name = "sql"
+  }
+}
+
+sink {
+  // Console {}
+  Druid {
+    coordinator_url = "http://192.168.0.173:8081/"
+    datasource = "gu"
+    timestamp_column = "timestamp"
+    timestamp_format = "auto"
+    timestamp_missing_value = "2022-02-09T02:02:02.222"
+    columns = ["countryIsoCode"]
+  }
+}
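For reference, a quick way to smoke-test the new source on its own is to pair it with the Console sink that both example configs above leave commented out. The sketch below is illustrative and not part of the patch: it reuses the broker URL, datasource, and date range from the examples, and `result_table_name` is an arbitrary name chosen here.

```hocon
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  Druid {
    result_table_name = "druid_rows"
    url = "jdbc:avatica:remote:url=http://192.168.0.173:8082/druid/v2/sql/avatica/"
    datasource = "wikipedia"
    start_date = "2016-06-27 21:30:00"
    end_date = "2016-06-28 00:00:00"
    columns = ["*"]
  }
}

sink {
  # Rows are printed to stdout instead of being ingested back into Druid.
  Console {}
}
```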