Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][starter] support user define parameter on spark/flink engine #6387

Merged
merged 10 commits into from
Apr 30, 2024
110 changes: 110 additions & 0 deletions docs/en/concept/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,116 @@ configured with these two parameters, because in SeaTunnel, there is a default c
parameters are not configured, then the generated data from the last module of the previous node will be used.
This is much more convenient when there is only one source.

## Config variable substitution

In config file we can define some variables and replace it in run time. **This is only support `hocon` format file**.

```hocon
env {
job.mode = "BATCH"
job.name = ${jobName}
parallelism = 2
}

source {
FakeSource {
result_table_name = ${resName}
row.num = ${rowNum}
string.template = ${strTemplate}
int.template = [20, 21]
schema = {
fields {
name = ${nameType}
age = "int"
}
}
}
}

transform {
sql {
source_table_name = "fake"
result_table_name = "sql"
query = "select * from "${resName}" where name = '"${nameVal}"' "
}

}

sink {
Console {
source_table_name = "sql"
username = ${username}
password = ${password}
blankSpace = ${blankSpace}
}
}

```

In the above config, we define some variables, like `${rowNum}`, `${resName}`.
We can replace those parameters with this shell command:

```shell
./bin/seatunnel.sh -c <this_config_file>
-i jobName='st var job'
-i resName=fake
-i rowNum=10
-i strTemplate=['abc','d~f','h i']
-i nameType=string
-i nameVal=abc
-i username=seatunnel=2.3.1
-i password='$a^b%c.d~e0*9('
-i blankSpace='2023-12-26 11:30:00'
-e local
```

Then the final submitted config is:

```hocon
env {
job.mode = "BATCH"
job.name = "st var job"
parallelism = 2
}

source {
FakeSource {
result_table_name = "fake"
row.num = 10
string.template = ["abc","d~f","h i"]
int.template = [20, 21]
schema = {
fields {
name = string
age = "int"
}
}
}
}

transform {
sql {
source_table_name = "fake"
result_table_name = "sql"
query = "select * from fake where name = 'abc' "
}

}

sink {
Console {
source_table_name = "sql"
username = "seatunnel=2.3.1"
password = "$a^b%c.d~e0*9("
blankSpace = "2023-12-26 11:30:00"
}
}
```

Some Notes:
- quota with `'` if the value has space ` ` or special character (like `(`)
- if the replacement variables is in `"` or `'`, like `resName` and `nameVal`, you need add `"`

## What's More

If you want to know the details of this format configuration, Please
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;

import static org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist;

Expand All @@ -53,10 +54,18 @@ public void execute() throws CommandExecuteException, ConfigCheckException {
checkConfigExist(configPath);
Config config =
ConfigFactory.parseFile(configPath.toFile())
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
.resolveWith(
ConfigFactory.systemProperties(),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true));
if (abstractCommandArgs.getVariables() != null) {
abstractCommandArgs.getVariables().stream()
.filter(Objects::nonNull)
.map(variable -> variable.split("=", 2))
.filter(pair -> pair.length == 2)
.forEach(pair -> System.setProperty(pair[0], pair[1]));
config =
config.resolveWith(
ConfigFactory.systemProperties(),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
}
Config encryptConfig = ConfigShadeUtils.encryptConfig(config);
log.info(
"Encrypt config: \n{}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigParseOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import org.apache.seatunnel.shade.com.typesafe.config.impl.Parseable;

import org.apache.seatunnel.api.configuration.ConfigAdapter;

Expand All @@ -29,7 +31,9 @@

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/** Used to build the {@link Config} from config file. */
Expand All @@ -43,28 +47,34 @@ private ConfigBuilder() {
// utility class and cannot be instantiated
}

private static Config ofInner(@NonNull Path filePath) {
private static Config ofInner(@NonNull Path filePath, List<String> variables) {
Config config =
ConfigFactory.parseFile(filePath.toFile())
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
.resolveWith(
ConfigFactory.systemProperties(),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
return ConfigShadeUtils.decryptConfig(config);
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true));
return ConfigShadeUtils.decryptConfig(backfillUserVariables(config, variables));
}

public static Config of(@NonNull String filePath) {
Path path = Paths.get(filePath);
return of(path);
}

public static Config of(@NonNull String filePath, List<String> variables) {
Path path = Paths.get(filePath);
return of(path, variables);
}

public static Config of(@NonNull Path filePath) {
return of(filePath, null);
}

public static Config of(@NonNull Path filePath, List<String> variables) {
log.info("Loading config file from path: {}", filePath);
Optional<ConfigAdapter> adapterSupplier = ConfigAdapterUtils.selectAdapter(filePath);
Config config =
adapterSupplier
.map(adapter -> of(adapter, filePath))
.orElseGet(() -> ofInner(filePath));
.map(adapter -> of(adapter, filePath, variables))
.orElseGet(() -> ofInner(filePath, variables));
log.info("Parsed config file: \n{}", config.root().render(CONFIG_RENDER_OPTIONS));
return config;
}
Expand All @@ -88,17 +98,38 @@ public static Config of(@NonNull Map<String, Object> objectMap, boolean isEncryp
return config;
}

public static Config of(@NonNull ConfigAdapter configAdapter, @NonNull Path filePath) {
public static Config of(
@NonNull ConfigAdapter configAdapter, @NonNull Path filePath, List<String> variables) {
log.info("With config adapter spi {}", configAdapter.getClass().getName());
try {
Map<String, Object> flattenedMap = configAdapter.loadConfig(filePath);
Config config = ConfigFactory.parseMap(flattenedMap);
return ConfigShadeUtils.decryptConfig(config);
return ConfigShadeUtils.decryptConfig(backfillUserVariables(config, variables));
} catch (Exception warn) {
log.warn(
"Loading config failed with spi {}, fallback to HOCON loader.",
configAdapter.getClass().getName());
return ofInner(filePath);
return ofInner(filePath, variables);
}
}

private static Config backfillUserVariables(Config config, List<String> variables) {
if (variables != null) {
variables.stream()
.filter(Objects::nonNull)
.map(variable -> variable.split("=", 2))
.filter(pair -> pair.length == 2)
.forEach(pair -> System.setProperty(pair[0], pair[1]));
Config systemConfig =
Parseable.newProperties(
System.getProperties(),
ConfigParseOptions.defaults()
.setOriginDescription("system properties"))
.parse()
.toConfig();
return config.resolveWith(
systemConfig, ConfigResolveOptions.defaults().setAllowUnresolved(true));
}
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;

import org.apache.seatunnel.api.configuration.ConfigShade;
import org.apache.seatunnel.common.utils.JsonUtils;
Expand Down Expand Up @@ -67,6 +68,55 @@ public void testParseConfig() throws URISyntaxException {
config.getConfigList("source").get(0).getString("password"), PASSWORD);
}

@Test
public void testVariableReplacement() throws URISyntaxException {
String jobName = "seatunnel variable test job";
String resName = "fake";
int rowNum = 10;
String nameType = "string";
String username = "seatunnel=2.3.1";
String password = "$a^b%c.d~e0*9(";
String blankSpace = "2023-12-26 11:30:00";
List<String> variables = new ArrayList<>();
variables.add("jobName=" + jobName);
variables.add("resName=" + resName);
variables.add("rowNum=" + rowNum);
variables.add("strTemplate=[abc,de~,f h]");
variables.add("nameType=" + nameType);
variables.add("nameVal=abc");
variables.add("username=" + username);
variables.add("password=" + password);
variables.add("blankSpace=" + blankSpace);
URL resource = ConfigShadeTest.class.getResource("/config.variables.conf");
Assertions.assertNotNull(resource);
Config config = ConfigBuilder.of(Paths.get(resource.toURI()), variables);
Config envConfig = config.getConfig("env");
Assertions.assertEquals(envConfig.getString("job.name"), jobName);
List<? extends ConfigObject> sourceConfigs = config.getObjectList("source");
for (ConfigObject configObject : sourceConfigs) {
Config sourceConfig = configObject.toConfig();
List<String> list1 = sourceConfig.getStringList("string.template");
Assertions.assertEquals(list1.get(0), "abc");
Assertions.assertEquals(list1.get(1), "de~");
Assertions.assertEquals(list1.get(2), "f h");
Assertions.assertEquals(sourceConfig.getInt("row.num"), rowNum);
Assertions.assertEquals(sourceConfig.getString("result_table_name"), resName);
}
List<? extends ConfigObject> transformConfigs = config.getObjectList("transform");
for (ConfigObject configObject : transformConfigs) {
Config transformConfig = configObject.toConfig();
Assertions.assertEquals(
transformConfig.getString("query"), "select * from fake where name = 'abc' ");
}
List<? extends ConfigObject> sinkConfigs = config.getObjectList("sink");
for (ConfigObject sinkObject : sinkConfigs) {
Config sinkConfig = sinkObject.toConfig();
Assertions.assertEquals(sinkConfig.getString("username"), username);
Assertions.assertEquals(sinkConfig.getString("password"), password);
Assertions.assertEquals(sinkConfig.getString("blankSpace"), blankSpace);
}
}

@Test
public void testDecryptAndEncrypt() {
String encryptUsername = ConfigShadeUtils.encryptOption("base64", USERNAME);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
job.mode = "BATCH"
job.name = ${jobName}
parallelism = 2
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = ${resName}
row.num = ${rowNum}
string.template = ${strTemplate}
int.template = [20, 21]
schema = {
fields {
name = ${nameType}
age = "int"
}
}
}

# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/category/source-v2
}

transform {

# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/category/transform-v2
sql {
source_table_name = "fake"
result_table_name = "sql"
query = "select * from "${resName}" where name = '"${nameVal}"' "
}

}

sink {
Console {
source_table_name = "sql"
username = ${username}
password = ${password}
blankSpace = ${blankSpace}
}

# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public List<String> buildCommands() {
flinkCommandArgs.getVariables().stream()
.filter(Objects::nonNull)
.map(String::trim)
.forEach(variable -> command.add("-D" + variable));
.forEach(variable -> command.add("-i " + variable));
Hisoka-X marked this conversation as resolved.
Show resolved Hide resolved
return command;
}
}