Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Connector-V2] Support TableSourceFactory on StarRocks #6498

Merged
merged 11 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/en/connector-v2/source/StarRocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ delivers the query plan as a parameter to BE nodes, and then obtains data result
| scan_batch_rows | int | no | 1024 |
| scan_mem_limit | long | no | 2147483648 |
| max_retries | int | no | 3 |
| scan.params.* | string | no | - |

### node_urls [list]

Expand Down Expand Up @@ -136,6 +137,10 @@ The maximum memory space allowed for a single query in the BE node, in bytes. Th

number of retry requests sent to StarRocks

### scan.params.* [string]

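Additional scan parameters passed to the StarRocks BE nodes when reading data. The `scan.params.` prefix is stripped and the remainder of the key is lower-cased, e.g. `scan.params.scanner_thread_pool_thread_num = "3"`.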

## Example

```
Expand Down Expand Up @@ -164,6 +169,8 @@ source {
DATETIME_COL = TIMESTAMP
DATE_COL = DATE
}
scan.params.scanner_thread_pool_thread_num = "3"

}
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,51 +18,18 @@
package org.apache.seatunnel.common.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;

import lombok.NonNull;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class TypesafeConfigUtils {

private TypesafeConfigUtils() {}

/**
 * Builds a new config that holds only those entries of {@code source} whose key starts with
 * {@code prefix}. Every kept value is converted to its string form.
 *
 * @param source config source
 * @param prefix config prefix
 * @param keepPrefix true to keep the prefix in the resulting keys, false to strip it
 * @return config parsed from the matching entries
 * @deprecated use org.apache.seatunnel.api.configuration.Option interface instead
 */
@Deprecated
public static Config extractSubConfig(Config source, String prefix, boolean keepPrefix) {

    // LinkedHashMap so matching entries are collected in the order they are visited
    final Map<String, String> matched = new LinkedHashMap<>();

    for (Map.Entry<String, ConfigValue> item : source.entrySet()) {
        final String fullKey = item.getKey();
        final String text = String.valueOf(item.getValue().unwrapped());

        if (!fullKey.startsWith(prefix)) {
            continue;
        }

        final String targetKey = keepPrefix ? fullKey : fullKey.substring(prefix.length());
        matched.put(targetKey, text);
    }

    return ConfigFactory.parseMap(matched);
}

/**
* Check if config with specific prefix exists
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,10 @@
import java.util.HashMap;
import java.util.Map;

import static org.apache.seatunnel.common.config.TypesafeConfigUtils.extractSubConfig;
import static org.apache.seatunnel.common.config.TypesafeConfigUtils.hasSubConfig;

public class TypesafeConfigUtilsTest {

/** Checks extractSubConfig with the "test." prefix both kept and stripped. */
@Test
public void testExtractSubConfig() {
    final Config source = getConfig();

    // Case 1: the prefix stays part of every resulting key
    final Config withPrefix = extractSubConfig(source, "test.", true);
    final Map<String, String> expectedWithPrefix = new HashMap<>();
    expectedWithPrefix.put("test.t0", "v0");
    expectedWithPrefix.put("test.t1", "v1");
    Assertions.assertEquals(ConfigFactory.parseMap(expectedWithPrefix), withPrefix);

    // Case 2: the prefix is removed from every resulting key
    final Config withoutPrefix = extractSubConfig(source, "test.", false);
    final Map<String, String> expectedWithoutPrefix = new HashMap<>();
    expectedWithoutPrefix.put("t0", "v0");
    expectedWithoutPrefix.put("t1", "v1");
    Assertions.assertEquals(ConfigFactory.parseMap(expectedWithoutPrefix), withoutPrefix);
}

@Test
public void testHasSubConfig() {
Config config = getConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class StarRocksCatalogFactory implements CatalogFactory {
public static final String IDENTIFIER = "StarRocks";
public static final String IDENTIFIER = CommonConfig.CONNECTOR_IDENTITY;

@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import lombok.AllArgsConstructor;
import lombok.Getter;
Expand All @@ -31,6 +32,9 @@
@ToString
@AllArgsConstructor
public class CommonConfig implements Serializable {

public static final String CONNECTOR_IDENTITY = "StarRocks";

public static final Option<List<String>> NODE_URLS =
Options.key("nodeUrls")
.listType()
Expand Down Expand Up @@ -67,4 +71,12 @@ public class CommonConfig implements Serializable {
private String password;
private String database;
private String table;

/**
 * Populates the shared connection settings by reading each option from the given config.
 *
 * @param config source of the NODE_URLS, USERNAME, PASSWORD, DATABASE and TABLE options
 */
public CommonConfig(ReadonlyConfig config) {
    nodeUrls = config.get(NODE_URLS);
    username = config.get(USERNAME);
    password = config.get(PASSWORD);
    database = config.get(DATABASE);
    table = config.get(TABLE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,14 @@

package org.apache.seatunnel.connectors.seatunnel.starrocks.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Setter
Expand All @@ -37,13 +33,26 @@ public class SourceConfig extends CommonConfig {

private static final long DEFAULT_SCAN_MEM_LIMIT = 1024 * 1024 * 1024L;

public SourceConfig(
@NonNull List<String> nodeUrls,
@NonNull String username,
@NonNull String password,
@NonNull String database,
@NonNull String table) {
super(nodeUrls, username, password, database, table);
/**
 * Builds the source configuration from the connector options.
 *
 * <p>The shared connection options are handled by the parent constructor. The scan options
 * are then read one by one, and every entry whose key starts with the scan-parameter prefix
 * is copied into {@code sourceOptionProps} with the prefix stripped and the remaining key
 * lower-cased.
 *
 * @param config connector options to read from
 */
public SourceConfig(ReadonlyConfig config) {
    super(config);
    this.maxRetries = config.get(MAX_RETRIES);
    this.requestTabletSize = config.get(QUERY_TABLET_SIZE);
    this.scanFilter = config.get(SCAN_FILTER);
    this.connectTimeoutMs = config.get(SCAN_CONNECT_TIMEOUT);
    this.batchRows = config.get(SCAN_BATCH_ROWS);
    this.keepAliveMin = config.get(SCAN_KEEP_ALIVE_MIN);
    this.queryTimeoutSec = config.get(SCAN_QUERY_TIMEOUT_SEC);
    this.memLimit = config.get(SCAN_MEM_LIMIT);

    final String prefix = STARROCKS_SCAN_CONFIG_PREFIX.key();
    config.toMap()
            .forEach(
                    (key, value) -> {
                        if (key.startsWith(prefix)) {
                            // Locale.ROOT keeps the lower-casing independent of the JVM
                            // default locale (e.g. Turkish maps "I" to a dotless i),
                            // so parameter names are normalised the same way everywhere.
                            final String paramName =
                                    key.substring(prefix.length())
                                            .toLowerCase(java.util.Locale.ROOT);
                            this.sourceOptionProps.put(paramName, value);
                        }
                    });
}

public static final Option<Integer> MAX_RETRIES =
Expand Down Expand Up @@ -105,57 +114,5 @@ public SourceConfig(
private int keepAliveMin = SCAN_KEEP_ALIVE_MIN.defaultValue();
private int batchRows = SCAN_BATCH_ROWS.defaultValue();
private int connectTimeoutMs = SCAN_CONNECT_TIMEOUT.defaultValue();
private final Map<String, String> sourceOptionProps = new HashMap<>();

/**
 * Creates a {@link SourceConfig} from the raw plugin config.
 *
 * <p>The connection options (node urls, username, password, database, table) are read
 * without a presence check. Each scan option is applied only when its path exists in
 * {@code pluginConfig}; otherwise the value the new instance already holds is kept.
 * Finally the prefixed scan parameters are copied over.
 *
 * @param pluginConfig raw plugin configuration
 * @return the populated source configuration
 */
public static SourceConfig loadConfig(Config pluginConfig) {
    final List<String> nodeUrls = pluginConfig.getStringList(NODE_URLS.key());
    final String username = pluginConfig.getString(USERNAME.key());
    final String password = pluginConfig.getString(PASSWORD.key());
    final String database = pluginConfig.getString(DATABASE.key());
    final String table = pluginConfig.getString(TABLE.key());
    final SourceConfig loaded = new SourceConfig(nodeUrls, username, password, database, table);

    // Optional settings: override only what the user explicitly configured
    if (pluginConfig.hasPath(MAX_RETRIES.key())) {
        loaded.setMaxRetries(pluginConfig.getInt(MAX_RETRIES.key()));
    }
    if (pluginConfig.hasPath(QUERY_TABLET_SIZE.key())) {
        loaded.setRequestTabletSize(pluginConfig.getInt(QUERY_TABLET_SIZE.key()));
    }
    if (pluginConfig.hasPath(SCAN_FILTER.key())) {
        loaded.setScanFilter(pluginConfig.getString(SCAN_FILTER.key()));
    }
    if (pluginConfig.hasPath(SCAN_CONNECT_TIMEOUT.key())) {
        loaded.setConnectTimeoutMs(pluginConfig.getInt(SCAN_CONNECT_TIMEOUT.key()));
    }
    if (pluginConfig.hasPath(SCAN_BATCH_ROWS.key())) {
        loaded.setBatchRows(pluginConfig.getInt(SCAN_BATCH_ROWS.key()));
    }
    if (pluginConfig.hasPath(SCAN_KEEP_ALIVE_MIN.key())) {
        loaded.setKeepAliveMin(pluginConfig.getInt(SCAN_KEEP_ALIVE_MIN.key()));
    }
    if (pluginConfig.hasPath(SCAN_QUERY_TIMEOUT_SEC.key())) {
        loaded.setQueryTimeoutSec(pluginConfig.getInt(SCAN_QUERY_TIMEOUT_SEC.key()));
    }
    if (pluginConfig.hasPath(SCAN_MEM_LIMIT.key())) {
        loaded.setMemLimit(pluginConfig.getLong(SCAN_MEM_LIMIT.key()));
    }

    parseSourceOptionProperties(pluginConfig, loaded);
    return loaded;
}

/**
 * Copies every plugin option under the scan-parameter prefix into
 * {@code sourceConfig.sourceOptionProps}, with the prefix stripped and the key lower-cased.
 *
 * @param pluginConfig raw plugin configuration to scan for prefixed options
 * @param sourceConfig configuration object receiving the extracted options
 */
private static void parseSourceOptionProperties(
        Config pluginConfig, SourceConfig sourceConfig) {
    final String scanPrefix = STARROCKS_SCAN_CONFIG_PREFIX.key();
    TypesafeConfigUtils.extractSubConfig(pluginConfig, scanPrefix, false)
            .entrySet()
            .forEach(
                    option ->
                            sourceConfig.sourceOptionProps.put(
                                    option.getKey().toLowerCase(),
                                    (String) option.getValue().unwrapped()));
}
private Map<String, String> sourceOptionProps = new HashMap<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
Expand All @@ -42,7 +43,7 @@
public class StarRocksSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
return "StarRocks";
return CommonConfig.CONNECTOR_IDENTITY;
}

@Override
Expand Down