-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature][Engine] Unify job env parameters #6003
Conversation
…efore fetch task start (apache#5008)
Please check ci |
# Conflicts: # seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf
public static void initTableEnvironmentConfiguration( | ||
Config config, Configuration configuration) { | ||
/** | ||
* flink table configuration items are prefixed with 'table.exec'. reference: {@link | ||
* org.apache.flink.table.api.config.ExecutionConfigOptions} | ||
*/ | ||
String prefixConf = "flink.table.exec"; | ||
String replacePrefix = "flink."; | ||
if (!config.isEmpty()) { | ||
for (Map.Entry<String, ConfigValue> entryConfKey : config.entrySet()) { | ||
String confKey = entryConfKey.getKey().trim(); | ||
if (confKey.startsWith(prefixConf)) { | ||
configuration.setString( | ||
confKey.replaceFirst(prefixConf, ""), entryConfKey.getValue().render()); | ||
confKey.replaceFirst(replacePrefix, ""), | ||
entryConfKey.getValue().unwrapped().toString()); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add test case for this.
ignore below
Look like it remove prefix of flink.table.exec.config
to table.exec.config
. Not worked on flink.pipeline.max-parallelism
according doc https://github.com/apache/seatunnel/pull/6003/files#diff-8f9b660879c6a25ff9d0f64619025ea7be9a72cd0152c4b896843424cab6abbcR44
public static void initTableEnvironmentConfiguration( | ||
Config config, Configuration configuration) { | ||
/** | ||
* flink table configuration items are prefixed with 'table.exec'. reference: {@link | ||
* org.apache.flink.table.api.config.ExecutionConfigOptions} | ||
*/ | ||
String prefixConf = "flink.table.exec"; | ||
String replacePrefix = "flink."; | ||
if (!config.isEmpty()) { | ||
for (Map.Entry<String, ConfigValue> entryConfKey : config.entrySet()) { | ||
String confKey = entryConfKey.getKey().trim(); | ||
if (confKey.startsWith(prefixConf)) { | ||
configuration.setString( | ||
confKey.replaceFirst(prefixConf, ""), entryConfKey.getValue().render()); | ||
confKey.replaceFirst(replacePrefix, ""), | ||
entryConfKey.getValue().unwrapped().toString()); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM, but please add a test case for flink.table.exec.
feature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM, but please add a test case for
flink.table.exec.
feature.
I've added a test case for this that uses table env parallelism to override global parallelism. PTAL.
@Slf4j | ||
@DisabledOnContainer( | ||
value = {}, | ||
type = {EngineType.SEATUNNEL, EngineType.SPARK}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, but I think we should create new place to store test only for flink in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, but I think we should create new place to store test only for flink in the future.
Agree with you, we can add a new module under seatunnel-engine-e2e
, this module can be called seatunnel-engine-flink-e2e
or seatunnel-flink-engine-e2e
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. Would you mind to do this after this PR be merged?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. Would you mind to do this after this PR be merged?
Of course not, after this pr merge, I will do this.
Purpose of this pull request
flink.
and the parameter items provided by flink officialclose [Feature][Engine] Unify job env parameters #5937
Does this PR introduce any user-facing change?
How was this patch tested?
add new test cases to separately verify that the parameters are consistent before and after adjustments
Check list
New License Guide
release-note
.