From 853fcc41b582dc65ee23a7e02a935ba49c33a115 Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Wed, 4 Jan 2023 00:31:53 +0800 Subject: [PATCH 1/4] [Improve][Connector-V2][Doris] Change Doris Config Prefix --- docs/en/connector-v2/sink/Doris.md | 22 ++++++++++++------- .../connectors/doris/config/SinkConfig.java | 20 ++++++++--------- .../doris/sink/DorisSinkFactory.java | 2 +- 3 files changed, 25 insertions(+), 19 deletions(-) diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md index df75b5ee7f2..9c85c587cdc 100644 --- a/docs/en/connector-v2/sink/Doris.md +++ b/docs/en/connector-v2/sink/Doris.md @@ -25,7 +25,7 @@ The internal implementation of Doris sink connector is cached and imported by st | max_retries | int | no | 1 | | retry_backoff_multiplier_ms | int | no | - | | max_retry_backoff_ms | int | no | - | -| sink.properties.* | doris stream load config | no | - | +| doris.config | map | no | - | ### node_urls [list] @@ -75,11 +75,9 @@ Using as a multiplier for generating the next delay for backoff The amount of time to wait before attempting to retry a request to `Doris` -### sink.properties.* [doris stream load config] +### doris.config [map] The parameter of the stream load `data_desc` -The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name -For example, the way to specify `strip_outer_array` is: `sink.properties.strip_outer_array` #### Supported import data formats @@ -98,8 +96,10 @@ sink { database = "test" table = "e2e_table_sink" batch_max_rows = 100 - sink.properties.format = "JSON" - sink.properties.strip_outer_array = true + doris.config = { + format = "JSON" + strip_outer_array = true + } } } @@ -118,12 +118,18 @@ sink { batch_max_rows = 100 sink.properties.format = "CSV" sink.properties.column_separator = "," + doris.config = { + format = "CSV" + column_separator = "," + } } } ``` ## Changelog -### next version +### 2.3.0-beta 2022-10-20 +- Add Doris Sink Connector -- Add Doris Sink Connector \ No newline at end of file +### Next version +- [Improve] Change Doris Config Prefix \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java index d1f88cbc678..dab49a8c09d 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java @@ -19,7 +19,7 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; -import org.apache.seatunnel.common.config.TypesafeConfigUtils; +import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -70,11 +70,11 @@ public class SinkConfig { .noDefaultValue() .withDescription("The name of Doris table"); - public static final Option DORIS_SINK_CONFIG_PREFIX = Options.key("sink.properties.") - .stringType() + public static final Option> DORIS_CONFIG = Options.key("doris.config") + .mapType() .noDefaultValue() .withDescription("The parameter of the stream load data_desc. " + - "The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name "); + "The way to specify the parameter is to add the original stream load parameter into map"); public static final Option BATCH_MAX_SIZE = Options.key("batch_max_rows") .intType() @@ -178,11 +178,11 @@ public static SinkConfig loadConfig(Config pluginConfig) { } private static void parseSinkStreamLoadProperties(Config pluginConfig, SinkConfig sinkConfig) { - Config dorisConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig, - DORIS_SINK_CONFIG_PREFIX.key(), false); - dorisConfig.entrySet().forEach(entry -> { - final String configKey = entry.getKey().toLowerCase(); - sinkConfig.streamLoadProps.put(configKey, entry.getValue().unwrapped().toString()); - }); + if (CheckConfigUtil.isValidParam(pluginConfig, DORIS_CONFIG.key())) { + pluginConfig.getObject(DORIS_CONFIG.key()).forEach((key, value) -> { + final String configKey = key.toLowerCase(); + sinkConfig.streamLoadProps.put(configKey, value.unwrapped().toString()); + }); + } } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java index 5fb721e7e20..6d111378fad 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java @@ -37,7 +37,7 @@ public OptionRule optionRule() { .required(SinkConfig.NODE_URLS, SinkConfig.USERNAME, SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.TABLE) .optional(SinkConfig.LABEL_PREFIX, SinkConfig.BATCH_MAX_SIZE, SinkConfig.BATCH_MAX_BYTES, SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES, SinkConfig.MAX_RETRY_BACKOFF_MS, - SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, SinkConfig.DORIS_SINK_CONFIG_PREFIX) + SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, SinkConfig.DORIS_CONFIG) .build(); } } From fc8a7dbb9fff5e2d12568476c55da044aa7f5c6a Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Wed, 4 Jan 2023 00:34:12 +0800 Subject: [PATCH 2/4] [Improve][Connector-V2][Doris] add doc --- docs/en/connector-v2/sink/Doris.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md index 9c85c587cdc..d38f7bb553b 100644 --- a/docs/en/connector-v2/sink/Doris.md +++ b/docs/en/connector-v2/sink/Doris.md @@ -132,4 +132,4 @@ sink { - Add Doris Sink Connector ### Next version -- [Improve] Change Doris Config Prefix \ No newline at end of file +- [Improve] Change Doris Config Prefix [3856](https://github.com/apache/incubator-seatunnel/pull/3856) \ No newline at end of file From 579674b92a7d1eb047727488c8a7fe2461ad0149 Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Wed, 4 Jan 2023 16:51:28 +0800 Subject: [PATCH 3/4] [Improve][Connector-V2][Doris] change doc --- docs/en/connector-v2/sink/Doris.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md index d38f7bb553b..1886fd52309 100644 --- a/docs/en/connector-v2/sink/Doris.md +++ b/docs/en/connector-v2/sink/Doris.md @@ -77,7 +77,9 @@ The amount of time to wait before attempting to retry a request to `Doris` ### doris.config [map] -The parameter of the stream load `data_desc` +The parameter of the stream load `data_desc`, you can get more detail at this link: + +https://doris.apache.org/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD/ #### Supported import data formats From 20a380114bf4905da46504866fbad71473f2f6bf Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Wed, 4 Jan 2023 18:56:13 +0800 Subject: [PATCH 4/4] [Improve][Connector-V2][Doris] change e2e test config prefix --- .../src/test/resources/doris-jdbc-to-doris.conf | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris-jdbc-to-doris.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris-jdbc-to-doris.conf index ca6fb043718..3d2ea5f9b9c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris-jdbc-to-doris.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris-jdbc-to-doris.conf @@ -41,8 +41,10 @@ sink { database = "test" table = "e2e_table_sink" batch_max_rows = 100 - sink.properties.format = "JSON" - sink.properties.strip_outer_array = true max_retries = 3 + doris.config = { + format = "JSON" + strip_outer_array = true + } } } \ No newline at end of file