Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Connector-V2][Doris] Change Doris Config Prefix #3856

Merged
merged 5 commits into from Jan 7, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 14 additions & 8 deletions docs/en/connector-v2/sink/Doris.md
Expand Up @@ -25,7 +25,7 @@ The internal implementation of Doris sink connector is cached and imported by st
| max_retries | int | no | 1 |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| sink.properties.* | doris stream load config | no | - |
| doris.config | map | no | - |

### node_urls [list]

Expand Down Expand Up @@ -75,11 +75,9 @@ Using as a multiplier for generating the next delay for backoff

The amount of time to wait before attempting to retry a request to `Doris`

### sink.properties.* [doris stream load config]
### doris.config [map]
TaoZex marked this conversation as resolved.
Show resolved Hide resolved

The parameter of the stream load `data_desc`
The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name
For example, the way to specify `strip_outer_array` is: `sink.properties.strip_outer_array`

#### Supported import data formats

Expand All @@ -98,8 +96,10 @@ sink {
database = "test"
table = "e2e_table_sink"
batch_max_rows = 100
sink.properties.format = "JSON"
sink.properties.strip_outer_array = true
doris.config = {
format = "JSON"
strip_outer_array = true
}
}
}

Expand All @@ -118,12 +118,18 @@ sink {
batch_max_rows = 100
sink.properties.format = "CSV"
sink.properties.column_separator = ","
doris.config = {
format = "CSV"
column_separator = ","
}
}
}
```

## Changelog

### next version
### 2.3.0-beta 2022-10-20
- Add Doris Sink Connector

- Add Doris Sink Connector
### Next version
- [Improve] Change Doris Config Prefix [3856](https://github.com/apache/incubator-seatunnel/pull/3856)
Expand Up @@ -19,7 +19,7 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.config.CheckConfigUtil;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

Expand Down Expand Up @@ -70,11 +70,11 @@ public class SinkConfig {
.noDefaultValue()
.withDescription("The name of Doris table");

public static final Option<String> DORIS_SINK_CONFIG_PREFIX = Options.key("sink.properties.")
.stringType()
public static final Option<Map<String, String>> DORIS_CONFIG = Options.key("doris.config")
.mapType()
.noDefaultValue()
.withDescription("The parameter of the stream load data_desc. " +
"The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name ");
"The way to specify the parameter is to add the original stream load parameter into map");

public static final Option<Integer> BATCH_MAX_SIZE = Options.key("batch_max_rows")
.intType()
Expand Down Expand Up @@ -178,11 +178,11 @@ public static SinkConfig loadConfig(Config pluginConfig) {
}

private static void parseSinkStreamLoadProperties(Config pluginConfig, SinkConfig sinkConfig) {
Config dorisConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
DORIS_SINK_CONFIG_PREFIX.key(), false);
dorisConfig.entrySet().forEach(entry -> {
final String configKey = entry.getKey().toLowerCase();
sinkConfig.streamLoadProps.put(configKey, entry.getValue().unwrapped().toString());
});
if (CheckConfigUtil.isValidParam(pluginConfig, DORIS_CONFIG.key())) {
pluginConfig.getObject(DORIS_CONFIG.key()).forEach((key, value) -> {
final String configKey = key.toLowerCase();
sinkConfig.streamLoadProps.put(configKey, value.unwrapped().toString());
});
}
}
}
Expand Up @@ -37,7 +37,7 @@ public OptionRule optionRule() {
.required(SinkConfig.NODE_URLS, SinkConfig.USERNAME, SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.TABLE)
.optional(SinkConfig.LABEL_PREFIX, SinkConfig.BATCH_MAX_SIZE, SinkConfig.BATCH_MAX_BYTES,
SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES, SinkConfig.MAX_RETRY_BACKOFF_MS,
SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, SinkConfig.DORIS_SINK_CONFIG_PREFIX)
SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, SinkConfig.DORIS_CONFIG)
.build();
}
}