From d39110dd889943595a0e0e41e38a8cec963a7d5e Mon Sep 17 00:00:00 2001 From: Roc Marshal Date: Mon, 8 Jan 2024 13:26:34 +0800 Subject: [PATCH 1/2] [FLINK-33988][configuration] Fix the invalid configuration when using initialized root logger level on yarn deployment mode --- .../flink/yarn/YarnClusterDescriptor.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index a0732992e8b00..97e49b8046af9 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -136,6 +136,8 @@ import static org.apache.flink.configuration.ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_OPT_DIR; +import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX; +import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX; import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -204,6 +206,8 @@ public YarnClusterDescriptor( this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration); this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration); + adaptEnvSetting(flinkConfiguration, CoreOptions.FLINK_LOG_LEVEL, "ROOT_LOG_LEVEL"); + getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath); decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_FILES) .ifPresent(this::addShipFiles); @@ -216,6 +220,21 @@ public YarnClusterDescriptor( this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL); } + /** Adapt flink env setting. */ + private static void adaptEnvSetting( + Configuration config, ConfigOption configOption, String envKey) { + config.getOptional(configOption) + .ifPresent( + value -> { + config.setString( + CONTAINERIZED_MASTER_ENV_PREFIX + envKey, + String.valueOf(value)); + config.setString( + CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + envKey, + String.valueOf(value)); + }); + } + private Optional> decodeFilesToShipToCluster( final Configuration configuration, final ConfigOption> configOption) { checkNotNull(configuration); From a746744c7e1e14d077316264c583c71bc6c6278d Mon Sep 17 00:00:00 2001 From: Roc Marshal Date: Tue, 16 Jan 2024 13:22:37 +0800 Subject: [PATCH 2/2] [FLINK-34102][configuration] Fix the invalid configuration when using 'env.log.max' on yarn deployment mode --- .../main/java/org/apache/flink/yarn/YarnClusterDescriptor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 97e49b8046af9..795567bc9f893 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -207,6 +207,7 @@ public YarnClusterDescriptor( this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration); adaptEnvSetting(flinkConfiguration, CoreOptions.FLINK_LOG_LEVEL, "ROOT_LOG_LEVEL"); + adaptEnvSetting(flinkConfiguration, CoreOptions.FLINK_LOG_MAX, "MAX_LOG_FILE_NUMBER"); getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath); decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_FILES)