diff --git a/docs/en/connector-v2/sink/Druid.md b/docs/en/connector-v2/sink/Druid.md new file mode 100644 index 00000000000..cbf290b2c37 --- /dev/null +++ b/docs/en/connector-v2/sink/Druid.md @@ -0,0 +1,101 @@ +# Druid + +> Druid sink connector + +## Description + +Write data to Apache Druid. + + +## Options + +| name | type | required | default value | +| ----------------------- | -------- | -------- | ------------- | +| coordinator_url | `String` | yes | - | +| datasource | `String` | yes | - | +| columns | `List` | yes| __time | +| timestamp_column | `String` | no | timestamp | +| timestamp_format | `String` | no | auto | +| timestamp_missing_value | `String` | no | - | + +### coordinator_url [`String`] + +The URL of the Coordinator service in Apache Druid. + +### datasource [`String`] + +The DataSource name in Apache Druid. + +### columns [`List`] + +The columns that you want to write to Druid. + +### timestamp_column [`String`] + +The timestamp column name in Apache Druid. The default value is `timestamp`. + +### timestamp_format [`String`] + +The timestamp format in Apache Druid. The default value is `auto`. It can be one of: + +- `iso` + - ISO8601 with 'T' separator, like "2000-01-01T01:02:03.456" + +- `posix` + - seconds since epoch + +- `millis` + - milliseconds since epoch + +- `micro` + - microseconds since epoch + +- `nano` + - nanoseconds since epoch + +- `auto` + - automatically detects ISO (either 'T' or space separator) or millis format + +- any [Joda DateTimeFormat](http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html) string + +### timestamp_missing_value [`String`] + +The timestamp missing value in Apache Druid, which is used for input records that have a null or missing timestamp. The value of `timestamp_missing_value` should be in ISO 8601 format, for example `"2022-02-02T02:02:02.222"`. 
+ +## Example + +### Simple + +```hocon +DruidSink { + coordinator_url = "http://localhost:8081/" + datasource = "wikipedia" + columns = ["flags","page"] +} +``` + +### Specified timestamp column and format + +```hocon +DruidSink { + coordinator_url = "http://localhost:8081/" + datasource = "wikipedia" + timestamp_column = "timestamp" + timestamp_format = "auto" + columns = ["flags","page"] +} +``` + +### Specified timestamp column, format and missing value + +```hocon +DruidSink { + coordinator_url = "http://localhost:8081/" + datasource = "wikipedia" + timestamp_column = "timestamp" + timestamp_format = "auto" + timestamp_missing_value = "2022-02-02T02:02:02.222" + columns = ["flags","page"] +} +``` + diff --git a/docs/en/connector-v2/source/Druid.md b/docs/en/connector-v2/source/Druid.md new file mode 100644 index 00000000000..4343fd31aa0 --- /dev/null +++ b/docs/en/connector-v2/source/Druid.md @@ -0,0 +1,54 @@ +# Druid + +> Druid source connector + +## Description + +Read data from Apache Druid. + +## Options + +| name | type | required | default value | +| ---------- | -------------- | -------- | ------------- | +| url | `String` | yes | - | +| datasource | `String` | yes | - | +| start_date | `String` | no | - | +| end_date | `String` | no | - | +| columns | `List` | no | `*` | + +### url [`String`] + +The JDBC URL of Apache Druid. + +### datasource [`String`] + +The DataSource name in Apache Druid. + +### start_date [`String`] + +The start date (inclusive) of the data to read from the DataSource, for example, `'2016-06-27'`, `'2016-06-27 00:00:00'`, etc. + +### end_date [`String`] + +The end date (exclusive) of the data to read from the DataSource, for example, `'2016-06-28'`, `'2016-06-28 00:00:00'`, etc. + +### columns [`List`] + +The columns that you want to read from the DataSource. If omitted, all columns (`*`) are read. 
+ +### common options [string] + +Source Plugin common parameters, refer to [Source Plugin](common-options.mdx) for details + + +## Example + +```hocon +DruidSource { + url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/" + datasource = "wikipedia" + start_date = "2016-06-27 00:00:00" + end_date = "2016-06-28 00:00:00" + columns = ["flags","page"] +} +``` diff --git a/plugin-mapping.properties b/plugin-mapping.properties index bf4b0ce5f9a..4ff38175f4d 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -130,3 +130,5 @@ seatunnel.sink.Sentry = connector-sentry seatunnel.source.MongoDB = connector-mongodb seatunnel.sink.MongoDB = connector-mongodb seatunnel.source.Iceberg = connector-iceberg +seatunnel.source.Druid=connector-druid +seatunnel.sink.Druid=connector-druid \ No newline at end of file diff --git a/pom.xml b/pom.xml index e54d67d69ac..1678604b297 100644 --- a/pom.xml +++ b/pom.xml @@ -198,6 +198,12 @@ 1.1.8.3 3.10.0 4.2.0 + 4.5.13 + 1.2.83 + 1.17.0 + 0.23.0 + 2.10.14 + 0.22.1 @@ -350,6 +356,43 @@ ${awaitility.version} test + + + org.apache.httpcomponents + httpclient + ${httpclient.version} + + + + com.alibaba + fastjson + ${fastjson.version} + + + + joda-time + joda-time + ${joda-tim.version} + + + + org.apache.druid + druid-indexing-service + ${druid.version} + + + + org.apache.druid + druid-core + ${druid-core.version} + + + + org.apache.calcite.avatica + avatica-core + ${avatica-core.version} + + diff --git a/seatunnel-connectors-v2/connector-druid/pom.xml b/seatunnel-connectors-v2/connector-druid/pom.xml new file mode 100644 index 00000000000..1ab81976133 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/pom.xml @@ -0,0 +1,55 @@ + + + + seatunnel-connectors-v2 + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-druid + + + 2.6.7 + + + + + org.apache.seatunnel + connector-common + ${project.version} + + + com.fasterxml.jackson.datatype + jackson-datatype-joda + 
${jackson-datatype-joda.version} + + + org.apache.httpcomponents + httpclient + + + com.alibaba + fastjson + + + joda-time + joda-time + + + org.apache.druid + druid-indexing-service + + + org.apache.druid + druid-core + + + org.apache.calcite.avatica + avatica-core + + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java new file mode 100644 index 00000000000..a4d7e69a00e --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.druid.client; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions; +import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper; + +import lombok.Data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.TimeZone; + +@Data +public class DruidInputFormat implements Serializable { + protected static final String COLUMNS_DEFAULT = "*"; + protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE 1=1"; + private static final Logger LOGGER = LoggerFactory.getLogger(DruidInputFormat.class); + protected transient Connection connection; + protected transient PreparedStatement statement; + protected transient ResultSet resultSet; + protected SeaTunnelRowType rowTypeInfo; + protected DruidSourceOptions druidSourceOptions; + protected String quarySQL; + protected boolean hasNext; + + public DruidInputFormat(DruidSourceOptions druidSourceOptions) { + this.druidSourceOptions = druidSourceOptions; + this.rowTypeInfo = initTableField(); + } + + public ResultSetMetaData getResultSetMetaData() throws SQLException { + try { + quarySQL = getSQL(); + connection = DriverManager.getConnection(druidSourceOptions.getUrl()); + statement = 
connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + return statement.getMetaData(); + } catch (SQLException se) { + throw new SQLException("ResultSetMetaData() failed." + se.getMessage(), se); + } + } + + public void openInputFormat() { + try { + connection = DriverManager.getConnection(druidSourceOptions.getUrl()); + statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + resultSet = statement.executeQuery(); + hasNext = resultSet.next(); + } catch (SQLException se) { + throw new IllegalArgumentException("openInputFormat() failed." + se.getMessage(), se); + } + } + + private String getSQL() throws SQLException { + String columns = COLUMNS_DEFAULT; + String startTimestamp = druidSourceOptions.getStartTimestamp(); + String endTimestamp = druidSourceOptions.getEndTimestamp(); + String dataSource = druidSourceOptions.getDatasource(); + if (druidSourceOptions.getColumns() != null && druidSourceOptions.getColumns().size() > 0) { + columns = String.join(",", druidSourceOptions.getColumns()); + } + String sql = String.format(QUERY_TEMPLATE, columns, dataSource); + if (startTimestamp != null) { + sql += " AND __time >= '" + startTimestamp + "'"; + } + if (endTimestamp != null) { + sql += " AND __time < '" + endTimestamp + "'"; + } + return sql; + } + + public void closeInputFormat() { + try { + if (resultSet != null) { + resultSet.close(); + } + if (statement != null) { + statement.close(); + } + } catch (SQLException se) { + LOGGER.error("DruidInputFormat Statement couldn't be closed", se); + } finally { + statement = null; + resultSet = null; + try { + if (connection != null) { + connection.close(); + } + } catch (SQLException se) { + LOGGER.error("DruidInputFormat Connection couldn't be closed", se); + } finally { + connection = null; + } + } + } + + public boolean reachedEnd() throws IOException { + return !hasNext; + } + + public SeaTunnelRow nextRecord() throws 
IOException { + try { + if (!hasNext) { + return null; + } + SeaTunnelRow seaTunnelRow = toInternal(resultSet, rowTypeInfo); + // update hasNext after we've read the record + hasNext = resultSet.next(); + return seaTunnelRow; + } catch (SQLException se) { + throw new IOException("Couldn't read data - " + se.getMessage(), se); + } catch (NullPointerException npe) { + throw new IOException("Couldn't access resultSet", npe); + } + } + + public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType rowTypeInfo) throws SQLException { + List fields = new ArrayList<>(); + SeaTunnelDataType[] seaTunnelDataTypes = rowTypeInfo.getFieldTypes(); + + for (int i = 1; i <= seaTunnelDataTypes.length; i++) { + Object seatunnelField; + SeaTunnelDataType seaTunnelDataType = seaTunnelDataTypes[i - 1]; + if (null == rs.getObject(i)) { + seatunnelField = null; + } else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getBoolean(i); + } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getByte(i); + } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getShort(i); + } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getInt(i); + } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getLong(i); + } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getFloat(i); + } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getDouble(i); + } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getString(i); + } else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(seaTunnelDataType)) { + Timestamp ts = rs.getTimestamp(i, Calendar.getInstance(TimeZone.getTimeZone("UTC"))); + LocalDateTime localDateTime = LocalDateTime.ofInstant(ts.toInstant(), ZoneId.of("UTC")); // good + seatunnelField = localDateTime; + } else if 
(LocalTimeType.LOCAL_TIME_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getDate(i); + } else if (LocalTimeType.LOCAL_DATE_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getDate(i); + } else { + throw new IllegalStateException("Unexpected value: " + seaTunnelDataType); + } + + fields.add(seatunnelField); + } + + return new SeaTunnelRow(fields.toArray()); + } + + private SeaTunnelRowType initTableField() { + ArrayList> seaTunnelDataTypes = new ArrayList<>(); + ArrayList fieldNames = new ArrayList<>(); + + try { + ResultSetMetaData resultSetMetaData = getResultSetMetaData(); + for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { + fieldNames.add(resultSetMetaData.getColumnName(i)); + seaTunnelDataTypes.add(DruidTypeMapper.DRUID_TYPE_MAPPS.get(resultSetMetaData.getColumnTypeName(i))); + } + } catch (SQLException e) { + LOGGER.warn("get row type info exception", e); + } + rowTypeInfo = new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType[seaTunnelDataTypes.size()])); + + return rowTypeInfo; + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java new file mode 100644 index 00000000000..62ef37ea87d --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.druid.client; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.joda.JodaModule; +import lombok.Data; +import org.apache.commons.collections.CollectionUtils; +import org.apache.druid.data.input.MaxSizeSplitHintSpec; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InlineInputSource; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Serializable; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@Data +public class DruidOutputFormat implements Serializable { + public static final String DEFAULT_LINE_DELIMITER = "\n"; + public static final String DEFAULT_FIELD_DELIMITER = ","; + private static final Logger LOGGER = LoggerFactory.getLogger(DruidOutputFormat.class); + private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp"; + private static final String DEFAULT_TIMESTAMP_FORMAT = "auto"; + private static final String DEFAULT_COLUMN = "__time"; + private static final DateTime DEFAULT_TIMESTAMP_MISSING_VALUE = new DateTime(); + private final transient StringBuilder data; + private final String coordinatorURL; + private final String datasource; + private final String timestampColumn; + private final String timestampFormat; + private final DateTime timestampMissingValue; + private List columns; + + public DruidOutputFormat(String coordinatorURL, + String datasource, + String timestampColumn, + String timestampFormat, + String timestampMissingValue, + List columns + ) { + this.data = new StringBuilder(); + this.coordinatorURL = coordinatorURL; + this.datasource = datasource; + this.timestampColumn = timestampColumn == null ? DEFAULT_TIMESTAMP_COLUMN : timestampColumn; + this.timestampFormat = timestampFormat == null ? DEFAULT_TIMESTAMP_FORMAT : timestampFormat; + this.timestampMissingValue = timestampMissingValue == null ? 
DEFAULT_TIMESTAMP_MISSING_VALUE : DateTimes.of(timestampMissingValue); + this.columns = columns; + } + + public void write(SeaTunnelRow element) { + int fieldIndex = element.getArity(); + for (int i = 0; i < fieldIndex; i++) { + Object v = element.getField(i); + if (i != 0) { + this.data.append(DEFAULT_FIELD_DELIMITER); + } + if (v != null) { + this.data.append(v); + } + } + this.data.append(DEFAULT_LINE_DELIMITER); + } + + public void closeOutputFormat() { + try { + ParallelIndexIOConfig ioConfig = parallelIndexIOConfig(); + ParallelIndexTuningConfig tuningConfig = tuningConfig(); + ParallelIndexSupervisorTask indexTask = parallelIndexSupervisorTask(ioConfig, tuningConfig); + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new JodaModule()); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.configure(MapperFeature.AUTO_DETECT_GETTERS, false); + mapper.configure(MapperFeature.AUTO_DETECT_FIELDS, false); + mapper.configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false); + mapper.configure(MapperFeature.AUTO_DETECT_SETTERS, false); + mapper.configure(SerializationFeature.INDENT_OUTPUT, false); + mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + String taskJSON = mapper.writeValueAsString(indexTask); + JSONObject jsonObject = JSON.parseObject(taskJSON); + jsonObject.remove("id"); + jsonObject.remove("groupId"); + jsonObject.remove("resource"); + JSONObject spec = jsonObject.getJSONObject("spec"); + spec.remove("tuningConfig"); + jsonObject.put("spec", spec); + taskJSON = jsonObject.toJSONString(); + + URL url = new URL(this.coordinatorURL + "druid/indexer/v1/task"); + HttpURLConnection con = (HttpURLConnection) url.openConnection(); + con.setRequestMethod("POST"); + con.setRequestProperty("Content-Type", "application/json"); + con.setRequestProperty("Accept", "application/json, text/plain, */*"); + con.setDoOutput(true); + try (OutputStream os = con.getOutputStream()) { + byte[] input = 
taskJSON.getBytes(StandardCharsets.UTF_8); + os.write(input, 0, input.length); + } + try (BufferedReader br = new BufferedReader(new InputStreamReader(con.getInputStream(), StandardCharsets.UTF_8))) { + StringBuilder response = new StringBuilder(); + String responseLine; + while ((responseLine = br.readLine()) != null) { + response.append(responseLine.trim()); + } + LOGGER.info("Druid write task has been sent, and the response is {}", response.toString()); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + private ParallelIndexSupervisorTask parallelIndexSupervisorTask(ParallelIndexIOConfig ioConfig, ParallelIndexTuningConfig tuningConfig) { + return new ParallelIndexSupervisorTask( + null, + null, + null, + new ParallelIndexIngestionSpec( + new DataSchema( + this.datasource, + new TimestampSpec(this.timestampColumn, this.timestampFormat, this.timestampMissingValue), + new DimensionsSpec(Collections.emptyList()), + null, + new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, false, null), + null + ), + ioConfig, + tuningConfig + ), + null + ); + } + + private ParallelIndexIOConfig parallelIndexIOConfig() { + Set columnss = new HashSet<>(); + columnss.add(DEFAULT_COLUMN); + CollectionUtils.addAll(columnss, this.getColumns().get(0).split(",")); + columnss.add(timestampColumn); + + return new ParallelIndexIOConfig( + null, + new InlineInputSource(this.data.toString()), + new CsvInputFormat( + new ArrayList<>(columnss), + "|", + null, + false, + 0 + ), + false, + null + ); + } + + private ParallelIndexTuningConfig tuningConfig() { + return new ParallelIndexTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + new MaxSizeSplitHintSpec(null, 1), + null, + null, + null, + null, + false, + null, + null, + null, + null, + 1, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + } +} diff --git 
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkConfig.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkConfig.java new file mode 100644 index 00000000000..493db86a6c9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkConfig.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Names of the configuration keys read by the Druid sink.
 *
 * <p>Constants holder only: the class is final and cannot be instantiated.
 */
public final class DruidSinkConfig {
    /** URL of the Druid Coordinator service (required). */
    public static final String COORDINATOR_URL = "coordinator_url";
    /** Name of the target Druid datasource (required). */
    public static final String DATASOURCE = "datasource";
    /** Name of the timestamp column (optional). */
    public static final String TIMESTAMP_COLUMN = "timestamp_column";
    /** Format of the timestamp column (optional). */
    public static final String TIMESTAMP_FORMAT = "timestamp_format";
    /** Timestamp used for records whose timestamp is null or missing (optional). */
    public static final String TIMESTAMP_MISSING_VALUE = "timestamp_missing_value";
    /** Columns to write (required). */
    public static final String COLUMNS = "columns";

    private DruidSinkConfig() {
        // constants only
    }
}
+ */ + +package org.apache.seatunnel.connectors.seatunnel.druid.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.AllArgsConstructor; +import lombok.Data; +import org.joda.time.DateTime; + +import java.io.Serializable; +import java.util.List; + +@Data +@AllArgsConstructor +public class DruidSinkOptions implements Serializable { + private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp"; + private static final String DEFAULT_TIMESTAMP_FORMAT = "auto"; + private static final DateTime DEFAULT_TIMESTAMP_MISSING_VALUE = null; + private static final int DEFAULT_PARALLELISM = 1; + private String coordinatorURL; + private String datasource; + private String timestampColumn; + private String timestampFormat; + private String timestampMissingValue; + private List columns; + private int parallelism; + + public DruidSinkOptions(Config pluginConfig) { + this.coordinatorURL = pluginConfig.getString(DruidSinkConfig.COORDINATOR_URL); + this.datasource = pluginConfig.getString(DruidSinkConfig.DATASOURCE); + this.columns = pluginConfig.getStringList(DruidSinkConfig.COLUMNS); + this.timestampColumn = pluginConfig.hasPath(DruidSinkConfig.TIMESTAMP_COLUMN) ? pluginConfig.getString(DruidSinkConfig.TIMESTAMP_COLUMN) : null; + this.timestampFormat = pluginConfig.hasPath(DruidSinkConfig.TIMESTAMP_FORMAT) ? pluginConfig.getString(DruidSinkConfig.TIMESTAMP_FORMAT) : null; + this.timestampMissingValue = pluginConfig.hasPath(DruidSinkConfig.TIMESTAMP_MISSING_VALUE) ? 
pluginConfig.getString(DruidSinkConfig.TIMESTAMP_MISSING_VALUE) : null; + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceConfig.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceConfig.java new file mode 100644 index 00000000000..4fe7aa144f0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceConfig.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Names of the configuration keys read by the Druid source.
 *
 * <p>Constants holder only: the class is final and cannot be instantiated.
 */
public final class DruidSourceConfig {
    /** JDBC URL of Apache Druid (required). */
    public static final String URL = "url";
    /** Name of the Druid datasource to read (required). */
    public static final String DATASOURCE = "datasource";
    /** Start of the time range to read; note that the key is {@code start_date} (optional). */
    public static final String START_TIMESTAMP = "start_date";
    /** End of the time range to read; note that the key is {@code end_date} (optional). */
    public static final String END_TIMESTAMP = "end_date";
    /** Columns to read (optional). */
    public static final String COLUMNS = "columns";

    private DruidSourceConfig() {
        // constants only
    }
}
+ */ + +package org.apache.seatunnel.connectors.seatunnel.druid.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +@AllArgsConstructor +public class DruidSourceOptions implements Serializable { + private String url; + private String datasource; + private String startTimestamp; + private String endTimestamp; + private List columns; + + private String partitionColumn; + private Long partitionUpperBound; + private Long partitionLowerBound; + private Integer parallelism; + + public DruidSourceOptions(Config pluginConfig) { + this.url = pluginConfig.getString(DruidSourceConfig.URL); + this.datasource = pluginConfig.getString(DruidSourceConfig.DATASOURCE); + this.columns = pluginConfig.hasPath(DruidSourceConfig.COLUMNS) ? pluginConfig.getStringList(DruidSourceConfig.COLUMNS) : null; + this.startTimestamp = pluginConfig.hasPath(DruidSourceConfig.START_TIMESTAMP) ? pluginConfig.getString(DruidSourceConfig.START_TIMESTAMP) : null; + this.endTimestamp = pluginConfig.hasPath(DruidSourceConfig.END_TIMESTAMP) ? pluginConfig.getString(DruidSourceConfig.END_TIMESTAMP) : null; + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidTypeMapper.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidTypeMapper.java new file mode 100644 index 00000000000..9506b84f252 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidTypeMapper.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.druid.config; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; + +import java.util.HashMap; +import java.util.Map; + +public class DruidTypeMapper { + public static Map> DRUID_TYPE_MAPPS = new HashMap<>(); + + static { + // https://druid.apache.org/docs/latest/querying/sql.html#data-types + DRUID_TYPE_MAPPS.put("CHAR", BasicType.STRING_TYPE); + DRUID_TYPE_MAPPS.put("VARCHAR", BasicType.STRING_TYPE); + DRUID_TYPE_MAPPS.put("DECIMAL", BasicType.DOUBLE_TYPE); + DRUID_TYPE_MAPPS.put("FLOAT", BasicType.FLOAT_TYPE); + DRUID_TYPE_MAPPS.put("REAL", BasicType.DOUBLE_TYPE); + DRUID_TYPE_MAPPS.put("DOUBLE", BasicType.DOUBLE_TYPE); + DRUID_TYPE_MAPPS.put("BOOLEAN", BasicType.LONG_TYPE); + DRUID_TYPE_MAPPS.put("TINYINT", BasicType.LONG_TYPE); + DRUID_TYPE_MAPPS.put("SMALLINT", BasicType.LONG_TYPE); + DRUID_TYPE_MAPPS.put("INTEGER", BasicType.LONG_TYPE); + DRUID_TYPE_MAPPS.put("BIGINT", BasicType.LONG_TYPE); + DRUID_TYPE_MAPPS.put("TIMESTAMP", LocalTimeType.LOCAL_DATE_TIME_TYPE); + DRUID_TYPE_MAPPS.put("DATE", LocalTimeType.LOCAL_DATE_TYPE); + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/sink/DruidSink.java 
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/sink/DruidSink.java new file mode 100644 index 00000000000..448343d66b7 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/sink/DruidSink.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.druid.sink; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSinkOptions; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + +@AutoService(SeaTunnelSink.class) +public class DruidSink extends AbstractSimpleSink { + + private Config config; + private SeaTunnelRowType seaTunnelRowType; + private DruidSinkOptions druidSinkOptions; + + @Override + public String getPluginName() { + return "Druid"; + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return seaTunnelRowType; + } + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new DruidSinkWriter(seaTunnelRowType, druidSinkOptions); + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + this.config = pluginConfig; + this.druidSinkOptions = new DruidSinkOptions(this.config); + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/sink/DruidSinkWriter.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/sink/DruidSinkWriter.java new file mode 100644 index 00000000000..a90e4f2a63d --- /dev/null 
+++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/sink/DruidSinkWriter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.druid.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.druid.client.DruidOutputFormat; +import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSinkOptions; + +import lombok.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class DruidSinkWriter extends AbstractSinkWriter { + private static final Logger LOGGER = LoggerFactory.getLogger(DruidSinkWriter.class); + private static final long serialVersionUID = -7210857670269773005L; + private SeaTunnelRowType seaTunnelRowType; + private DruidOutputFormat druidOutputFormat; + + public DruidSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowType, + @NonNull DruidSinkOptions druidSinkOptions) { + this.seaTunnelRowType = seaTunnelRowType; + 
druidOutputFormat = new DruidOutputFormat(druidSinkOptions.getCoordinatorURL(), + druidSinkOptions.getDatasource(), + druidSinkOptions.getTimestampColumn(), + druidSinkOptions.getTimestampFormat(), + druidSinkOptions.getTimestampMissingValue(), + druidSinkOptions.getColumns() + ); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + druidOutputFormat.write(element); + } + + @Override + public void close() throws IOException { + druidOutputFormat.closeOutputFormat(); + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSource.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSource.java new file mode 100644 index 00000000000..dd4ba46b0a8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSource.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.druid.source; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.druid.client.DruidInputFormat; +import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@AutoService(SeaTunnelSource.class) +public class DruidSource extends AbstractSingleSplitSource { + private static final Logger LOGGER = LoggerFactory.getLogger(DruidSource.class); + + private SeaTunnelRowType rowTypeInfo; + private DruidInputFormat druidInputFormat; + private DruidSourceOptions druidSourceOptions; + + @Override + public String getPluginName() { + return "Druid"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + LOGGER.info("Druid source prepare"); + try { + druidSourceOptions = new DruidSourceOptions(pluginConfig); + druidInputFormat = new DruidInputFormat(druidSourceOptions); + this.rowTypeInfo = druidInputFormat.getRowTypeInfo(); + } catch (Exception e) { + throw new PrepareFailException("Druid", PluginType.SOURCE, e.toString()); + } + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + 
@Override + public SeaTunnelDataType getProducedType() { + return this.rowTypeInfo; + } + + @Override + public AbstractSingleSplitReader createReader(SingleSplitReaderContext readerContext) throws Exception { + LOGGER.info("Druid source createReader"); + return new DruidSourceReader(readerContext, this.druidInputFormat); + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSourceReader.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSourceReader.java new file mode 100644 index 00000000000..061f0c24510 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSourceReader.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.druid.source; + +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.druid.client.DruidInputFormat; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class DruidSourceReader extends AbstractSingleSplitReader { + private static final Logger LOGGER = LoggerFactory.getLogger(DruidSourceReader.class); + + private final SingleSplitReaderContext context; + private final DruidInputFormat druidInputFormat; + + public DruidSourceReader(SingleSplitReaderContext context, DruidInputFormat druidInputFormat) { + this.context = context; + this.druidInputFormat = druidInputFormat; + } + + @Override + public void open() throws Exception { + druidInputFormat.openInputFormat(); + } + + @Override + public void close() throws IOException { + druidInputFormat.closeInputFormat(); + } + + @Override + public void pollNext(Collector output) throws Exception { + while (!druidInputFormat.reachedEnd()) { + SeaTunnelRow seaTunnelRow = druidInputFormat.nextRecord(); + output.collect(seaTunnelRow); + } + druidInputFormat.closeInputFormat(); + if (Boundedness.BOUNDED.equals(context.getBoundedness())) { + LOGGER.info("Closed the bounded Druid source"); + context.signalNoMoreElement(); + } + } +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 4b6c1c3f2d5..2a62c89e4a0 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -55,6 +55,7 @@ connector-sentry connector-mongodb connector-iceberg + connector-druid