diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index a0732992e8b00..795567bc9f893 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -136,6 +136,8 @@ import static org.apache.flink.configuration.ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_OPT_DIR; +import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX; +import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX; import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -204,6 +206,9 @@ public YarnClusterDescriptor( this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration); this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration); + adaptEnvSetting(flinkConfiguration, CoreOptions.FLINK_LOG_LEVEL, "ROOT_LOG_LEVEL"); + adaptEnvSetting(flinkConfiguration, CoreOptions.FLINK_LOG_MAX, "MAX_LOG_FILE_NUMBER"); + getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath); decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_FILES) .ifPresent(this::addShipFiles); @@ -216,6 +221,21 @@ public YarnClusterDescriptor( this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL); } + /** Adapt flink env setting. */ + private static void adaptEnvSetting( + Configuration config, ConfigOption configOption, String envKey) { + config.getOptional(configOption) + .ifPresent( + value -> { + config.setString( + CONTAINERIZED_MASTER_ENV_PREFIX + envKey, + String.valueOf(value)); + config.setString( + CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + envKey, + String.valueOf(value)); + }); + } + private Optional> decodeFilesToShipToCluster( final Configuration configuration, final ConfigOption> configOption) { checkNotNull(configuration);