Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][starter] support user define parameter on spark/flink engine #6387

Merged
merged 10 commits into from
Apr 30, 2024

Conversation

liunaijie
Copy link
Contributor

Purpose of this pull request

close #6376

Does this PR introduce any user-facing change?

No

How was this patch tested?

add example to test.

Check list

Copy link
Contributor

@JueLance JueLance left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your contribution! There are some problems in your commit. Unit test for project connector-cdc-sqlserver-e2e was failed, please check your commit.

Comment on lines 121 to 131
if (variables != null) {
Map<String, String> map =
variables.stream()
.filter(Objects::nonNull)
.map(variable -> variable.split("=", 2))
.filter(pair -> pair.length == 2F)
.collect(Collectors.toMap(pair -> pair[0], pair -> pair[1]));
config =
config.resolveWith(
ConfigFactory.parseMap(map),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seem like this part show twice in this file. Could you refactor it with new method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

String configFile = getTestConfigFile(configurePath);
SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
sparkCommandArgs.setConfigFile(configFile);
sparkCommandArgs.setCheckConfig(false);
sparkCommandArgs.setVariables(null);
sparkCommandArgs.setVariables(Collections.singletonList("intVal=1"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted

@@ -17,16 +17,36 @@

package org.apache.seatunnel.example.spark.v2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not touch example module.

@@ -0,0 +1,99 @@
#
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@liunaijie liunaijie force-pushed the user-define-parameter branch 2 times, most recently from 89f00f6 to facd28f Compare March 12, 2024 05:30
@liunaijie
Copy link
Contributor Author

@Hisoka-X PTAL

@hailin0 hailin0 added this to the 2.3.5 milestone Mar 29, 2024
Copy link
Member

@Hisoka-X Hisoka-X left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the processing parameter logic of different engines be unified together? It seems that zeta's processing code is a bit scattered now.

@Hisoka-X
Copy link
Member

Hisoka-X commented Apr 8, 2024

How about add some intro in https://github.com/apache/seatunnel/blob/e5df7f00f8e21edcc5168c4f3dedfc7f85a4a150/docs/en/concept/config.md? Then user can easiler to found this feature.

@liunaijie
Copy link
Contributor Author

How about add some intro in https://github.com/apache/seatunnel/blob/e5df7f00f8e21edcc5168c4f3dedfc7f85a4a150/docs/en/concept/config.md? Then user can easiler to found this feature.

Sure, will do it.

@Hisoka-X
Copy link
Member

Hisoka-X commented Apr 9, 2024

Could the processing parameter logic of different engines be unified together? It seems that zeta's processing code is a bit scattered now.

How about this one?

@liunaijie liunaijie force-pushed the user-define-parameter branch 2 times, most recently from a266f91 to 25da29d Compare April 9, 2024 08:17
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete this ut, because we has e2e test
And in this ut, it's call clientCommandArgs.buildCommand() in this method, it will set the variables to system properties. then the config parse read from system properties.
as we unified the process, we won't set the variables to system properties, then this test will be fail.

@liunaijie
Copy link
Contributor Author

Could the processing parameter logic of different engines be unified together? It seems that zeta's processing code is a bit scattered now.

How about this one?

Done, PTAL

}
```

In the above config, i define some variables, like `${rowNum}`, `${resName}`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
In the above config, i define some variables, like `${rowNum}`, `${resName}`.
In the above config, we define some variables, like `${rowNum}`, `${resName}`.

./bin/seatunnel.sh -c <this_config_file> -i rowNum=10 -i resName=fake -i strTemplate=abc -i nameVal=abc
```

Then the finial submitted config is:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Then the finial submitted config is:
Then the final submitted config is:

Comment on lines -38 to -65
int fakeParallelism = 16;
String username = "seatunnel=2.3.1";
String password = "dsjr42=4wfskahdsd=w1chh";
String fakeSourceTable = "fake";
String fakeSinkTable = "sink";
String list = "[par1=20230829,par2=20230829]";
String blankSpace = "2023-12-26 11:30:00";
String[] args = {
"-c",
"/args/user_defined_params.conf",
"-e",
"local",
"-i",
"fake_source_table=" + fakeSourceTable,
"-i",
"fake_parallelism=" + fakeParallelism,
"-i",
"fake_sink_table=" + fakeSinkTable,
"-i",
"password=" + password,
"-i",
"username=" + username,
"-i",
"blankSpace=" + blankSpace,
"-i",
"list=" + list,
"-i",
"sql=" + "\"select a , b from fake_source_table\""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need add test case like this:

        String username = "seatunnel=2.3.1";
        String password = "dsjr42=4wfskahdsd=w1chh";
        String list = "[par1=20230829,par2=20230829]";
        String blankSpace = "2023-12-26 11:30:00";
        String sql =  "\"select a , b from fake_source_table\""

It contains =, , \" so on

.collect(Collectors.toMap(pair -> pair[0], pair -> pair[1]));
config =
config.resolveWith(
ConfigFactory.parseMap(map),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, i need change all of them to system properties.
when i test the array type variabel replacement, i find it only can be replace when use system properties.

Hisoka-X
Hisoka-X previously approved these changes Apr 12, 2024
Copy link
Member

@Hisoka-X Hisoka-X left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, please retrigger failed CI.

@Hisoka-X
Copy link
Member

Something not right.
image

@liunaijie
Copy link
Contributor Author

Something not right. image

yes i see this error. i have some test code in other branch.
in my local desktop (windows) this ut can run successful, so run some test by github action.

@liunaijie
Copy link
Contributor Author

Something not right. image

I has some test, find in github action windows test, it can't read the user variables system properties. But in my local window desktop, it can read the properties, so i am disable this test in window to pass to ci.

the screenshot in my local windows desktop
image

@Hisoka-X Hisoka-X removed this from the 2.3.5 milestone Apr 15, 2024
@Hisoka-X
Copy link
Member

Thanks @liunaijie for updated. But I think we should find the reason instead of disable it.

@hailin0
Copy link
Member

hailin0 commented Apr 17, 2024

Thanks @liunaijie for updated. But I think we should find the reason instead of disable it.

+1

@liunaijie

@liunaijie
Copy link
Contributor Author

liunaijie commented Apr 17, 2024

@Hisoka-X @hailin0 Done, PTAL.
When call ConfigFactory.systemProperties() method, it has an static variable to hold the system properties.
so it will only read system properties once. if the properties changed, it can't read the new properties.

@liunaijie
Copy link
Contributor Author

@Hisoka-X @hailin0 PTAL

Copy link
Member

@Hisoka-X Hisoka-X left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh almost forgot this one. LGTM. Thanks @liunaijie

@hailin0 hailin0 merged commit e2a4b64 into apache:dev Apr 30, 2024
5 of 6 checks passed
@liunaijie liunaijie deleted the user-define-parameter branch May 11, 2024 09:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature][Starter] Support user defined parameter on spark/flink engine
4 participants