From 767e4c642f0e7577efe05f07c16f3d51756c8e2d Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 17 May 2017 18:15:27 +0200 Subject: [PATCH 1/3] [FLINK-6031][yarn] Add config parameter for user-jar inclusion in classpath --- docs/setup/yarn_setup.md | 12 ++ .../yarn/AbstractYarnClusterDescriptor.java | 169 ++++++++++++------ .../yarn/configuration/YarnConfigOptions.java | 18 ++ 3 files changed, 144 insertions(+), 55 deletions(-) diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md index 1ce45ad9c73a3..179e8976efa73 100644 --- a/docs/setup/yarn_setup.md +++ b/docs/setup/yarn_setup.md @@ -245,6 +245,18 @@ Note: You can use a different configuration directory per job by setting the env Note: It is possible to combine `-m yarn-cluster` with a detached YARN submission (`-yd`) to "fire and forget" a Flink job to the YARN cluster. In this case, your application will not get any accumulator results or exceptions from the ExecutionEnvironment.execute() call! +### User jars & Classpath + +By default Flink will include the user jars into the system classpath when running a single job. This behavior can be controlled with the `yarn.per-job-cluster.include-job-jar` parameter. + +When setting this to `DISABLED` Flink will include the jar in the user classpath instead. + +The user-jars position in the class path can be controlled by setting the parameter to one of the following: + +- `ORDER`: (default) Adds the jar to the system class path based on the lexicographic order. +- `FIRST`: Adds the jar to the beginning of the system class path. +- `LAST`: Adds the jar to the end of the system class path. + ## Recovery behavior of Flink on YARN Flink's YARN client has the following configuration parameters to control how to behave in case of container failures. These parameters can be set either from the `conf/flink-conf.yaml` or when starting the YARN session, using `-D` parameters. diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 65525f28b1b00..0e43aa9fe1f5e 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -77,6 +78,7 @@ import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -144,7 +146,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor /** Optional Jar file to include in the system class loader of all application nodes * (for per-job submission) */ - private Set userJarFiles; + private final Set userJarFiles = new HashSet<>(); + + private YarnConfigOptions.UserJarInclusion userJarInclusion; public AbstractYarnClusterDescriptor() { // for unit tests only @@ -172,6 +176,16 @@ public AbstractYarnClusterDescriptor() { jobManagerMemoryMb = flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY); taskManagerMemoryMb = flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY); + String configuredUserJarInclusion = flinkConfiguration.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR); + try { + userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase()); + } catch (IllegalArgumentException e) { + LOG.warn("Configuration parameter {} was configured with an invalid value {}. Falling back to default ({}).", + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(), + configuredUserJarInclusion, + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); + userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); + } } catch (Exception e) { LOG.debug("Config couldn't be loaded from environment variable.", e); } @@ -200,6 +214,17 @@ public void setTaskManagerMemory(int memoryMb) { public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) { this.flinkConfiguration = conf; + + String configuredUserJarInclusion = flinkConfiguration.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR); + try { + userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase()); + } catch (IllegalArgumentException e) { + LOG.warn("Configuration parameter {} was configured with an invalid value {}. Falling back to default ({}).", + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(), + configuredUserJarInclusion, + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); + userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); + } } public org.apache.flink.configuration.Configuration getFlinkConfiguration() { @@ -265,7 +290,10 @@ public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) { * Returns true if the descriptor has the job jars to include in the classpath. */ public boolean hasUserJarFiles(List requiredJarFiles) { - if (userJarFiles == null || userJarFiles.size() != requiredJarFiles.size()) { + if (userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED) { + return false; + } + if (userJarFiles.size() != requiredJarFiles.size()) { return false; } try { @@ -284,16 +312,14 @@ public boolean hasUserJarFiles(List requiredJarFiles) { * Sets the user jar which is included in the system classloader of all nodes. */ public void setProvidedUserJarFiles(List userJarFiles) { - Set localUserJarFiles = new HashSet<>(userJarFiles.size()); for (URL jarFile : userJarFiles) { try { - localUserJarFiles.add(new File(jarFile.toURI())); + this.userJarFiles.add(new File(jarFile.toURI())); } catch (URISyntaxException e) { throw new IllegalArgumentException("Couldn't add local user jar: " + jarFile + " Currently only file:/// URLs are supported."); } } - this.userJarFiles = localUserJarFiles; } public String getDynamicPropertiesEncoded() { @@ -603,22 +629,22 @@ public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient } ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext(); - Set effectiveShipFiles = new HashSet<>(shipFiles.size()); + Set systemShipFiles = new HashSet<>(shipFiles.size()); for (File file : shipFiles) { - effectiveShipFiles.add(file.getAbsoluteFile()); + systemShipFiles.add(file.getAbsoluteFile()); } //check if there is a logback or log4j file File logbackFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME); final boolean hasLogback = logbackFile.exists(); if (hasLogback) { - effectiveShipFiles.add(logbackFile); + systemShipFiles.add(logbackFile); } File log4jFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME); final boolean hasLog4j = log4jFile.exists(); if (hasLog4j) { - effectiveShipFiles.add(log4jFile); + systemShipFiles.add(log4jFile); if (hasLogback) { // this means there is already a logback configuration file --> fail LOG.warn("The configuration directory ('" + configurationDirectory + "') contains both LOG4J and " + @@ -626,13 +652,7 @@ public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient } } - addLibFolderToShipFiles(effectiveShipFiles); - - // add the user jar to the classpath of the to-be-created cluster - if (userJarFiles != null) { - effectiveShipFiles.addAll(userJarFiles); - } - + addLibFolderToShipFiles(systemShipFiles); // Set-up ApplicationSubmissionContext for the application @@ -665,58 +685,52 @@ public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient 1)); } + String configuredUserJarInclusion = flinkConfiguration.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR); + YarnConfigOptions.UserJarInclusion userJarInclusion; + try { + userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase()); + } catch (IllegalArgumentException e) { + LOG.warn("Configuration parameter {} was configured with an invalid value {}. Falling back to default ({}).", + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(), + configuredUserJarInclusion, + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); + userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); + } + // local resource map for Yarn - final Map localResources = new HashMap<>(2 + effectiveShipFiles.size()); + final Map localResources = new HashMap<>(2 + systemShipFiles.size() + userJarFiles.size()); // list of remote paths (after upload) - final List paths = new ArrayList<>(2 + effectiveShipFiles.size()); + final List paths = new ArrayList<>(2 + systemShipFiles.size() + userJarFiles.size()); // ship list that enables reuse of resources for task manager containers StringBuilder envShipFileList = new StringBuilder(); - // upload and register ship files - final List classPaths = new ArrayList<>(); - for (File shipFile : effectiveShipFiles) { - LocalResource shipResources = Records.newRecord(LocalResource.class); - - Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath()); - Path remotePath = - Utils.setupLocalResource(fs, appId.toString(), shipLocalPath, shipResources, fs.getHomeDirectory()); + // upload and register ship files + List systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(), paths, localResources, envShipFileList); + List userClassPaths = uploadAndRegisterFiles(userJarFiles, fs, appId.toString(), paths, localResources, envShipFileList); - paths.add(remotePath); - - localResources.put(shipFile.getName(), shipResources); - - if (shipFile.isDirectory()) { - // add directories to the classpath - java.nio.file.Path shipPath = shipFile.toPath(); - final java.nio.file.Path parentPath = shipPath.getParent(); - - Files.walkFileTree(shipPath, new SimpleFileVisitor() { - @Override - public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) - throws IOException { - java.nio.file.Path relativePath = parentPath.relativize(file); - - classPaths.add(relativePath.toString()); - - return FileVisitResult.CONTINUE; - } - }); - } else { - // add files to the classpath - classPaths.add(shipFile.getName()); - } - - envShipFileList.append(remotePath).append(","); + if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) { + systemClassPaths.addAll(userClassPaths); } // normalize classpath by sorting - Collections.sort(classPaths); + Collections.sort(systemClassPaths); + Collections.sort(userClassPaths); // classpath assembler StringBuilder classPathBuilder = new StringBuilder(); - for (String classPath : classPaths) { + if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) { + for (String userClassPath : userClassPaths) { + classPathBuilder.append(userClassPath).append(File.pathSeparator); + } + } + for (String classPath : systemClassPaths) { classPathBuilder.append(classPath).append(File.pathSeparator); } + if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) { + for (String userClassPath : userClassPaths) { + classPathBuilder.append(userClassPath).append(File.pathSeparator); + } + } // Setup jar for ApplicationMaster LocalResource appMasterJar = Records.newRecord(LocalResource.class); @@ -936,6 +950,51 @@ public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes at } return report; } + + private static List uploadAndRegisterFiles( + Collection shipFiles, + FileSystem fs, + String appId, + List remotePaths, + Map localResources, + StringBuilder envShipFileList) throws IOException { + final List classPaths = new ArrayList<>(2 + shipFiles.size()); + for (File shipFile : shipFiles) { + LocalResource shipResources = Records.newRecord(LocalResource.class); + + Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath()); + Path remotePath = + Utils.setupLocalResource(fs, appId, shipLocalPath, shipResources, fs.getHomeDirectory()); + + remotePaths.add(remotePath); + + localResources.put(shipFile.getName(), shipResources); + + if (shipFile.isDirectory()) { + // add directories to the classpath + java.nio.file.Path shipPath = shipFile.toPath(); + final java.nio.file.Path parentPath = shipPath.getParent(); + + Files.walkFileTree(shipPath, new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) + throws IOException { + java.nio.file.Path relativePath = parentPath.relativize(file); + + classPaths.add(relativePath.toString()); + + return FileVisitResult.CONTINUE; + } + }); + } else { + // add files to the classpath + classPaths.add(shipFile.getName()); + } + + envShipFileList.append(remotePath).append(","); + } + return classPaths; + } /** * Kills YARN application and stops YARN client. @@ -1220,7 +1279,7 @@ public void run() { } } - protected void addLibFolderToShipFiles(Set effectiveShipFiles) { + protected void addLibFolderToShipFiles(Collection effectiveShipFiles) { // Add lib folder to the ship files if the environment variable is set. // This is for convenience when running from the command-line. // (for other files users explicitly set the ship files) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java index 071bb7d06378e..8839c1e5ea107 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java @@ -42,8 +42,26 @@ public class YarnConfigOptions { key("yarn.appmaster.rpc.port") .defaultValue(-1); + /** + * Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning + * in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on + * their name ("ORDER"). + */ + public static final ConfigOption CLASSPATH_INCLUDE_USER_JAR = + key("yarn.per-job-cluster.include-user-jar") + .defaultValue("ORDER"); + + // ------------------------------------------------------------------------ /** This class is not meant to be instantiated */ private YarnConfigOptions() {} + + /** @see YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR */ + public enum UserJarInclusion { + DISABLED, + FIRST, + LAST, + ORDER + } } From 2ba9a1d2d06731c15903bf4b07e141a4ae050489 Mon Sep 17 00:00:00 2001 From: zentol Date: Thu, 18 May 2017 11:48:59 +0200 Subject: [PATCH 2/3] address code comments --- .../yarn/AbstractYarnClusterDescriptor.java | 47 ++++++------------- 1 file changed, 15 insertions(+), 32 deletions(-) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 0e43aa9fe1f5e..3110a5b5eb7c7 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -176,16 +176,7 @@ public AbstractYarnClusterDescriptor() { jobManagerMemoryMb = flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY); taskManagerMemoryMb = flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY); - String configuredUserJarInclusion = flinkConfiguration.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR); - try { - userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase()); - } catch (IllegalArgumentException e) { - LOG.warn("Configuration parameter {} was configured with an invalid value {}. Falling back to default ({}).", - YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(), - configuredUserJarInclusion, - YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); - userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); - } + userJarInclusion = getUserJarInclusionMode(flinkConfiguration); } catch (Exception e) { LOG.debug("Config couldn't be loaded from environment variable.", e); } @@ -215,16 +206,7 @@ public void setTaskManagerMemory(int memoryMb) { public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) { this.flinkConfiguration = conf; - String configuredUserJarInclusion = flinkConfiguration.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR); - try { - userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase()); - } catch (IllegalArgumentException e) { - LOG.warn("Configuration parameter {} was configured with an invalid value {}. Falling back to default ({}).", - YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(), - configuredUserJarInclusion, - YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); - userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); - } + userJarInclusion = getUserJarInclusionMode(flinkConfiguration); } public org.apache.flink.configuration.Configuration getFlinkConfiguration() { @@ -685,18 +667,6 @@ public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient 1)); } - String configuredUserJarInclusion = flinkConfiguration.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR); - YarnConfigOptions.UserJarInclusion userJarInclusion; - try { - userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase()); - } catch (IllegalArgumentException e) { - LOG.warn("Configuration parameter {} was configured with an invalid value {}. Falling back to default ({}).", - YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(), - configuredUserJarInclusion, - YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); - userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); - } - // local resource map for Yarn final Map localResources = new HashMap<>(2 + systemShipFiles.size() + userJarFiles.size()); // list of remote paths (after upload) @@ -1357,6 +1327,19 @@ protected ContainerLaunchContext setupApplicationMasterContainer(boolean hasLogb return amContainer; } + private static YarnConfigOptions.UserJarInclusion getUserJarInclusionMode(org.apache.flink.configuration.Configuration config) { + String configuredUserJarInclusion = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR); + try { + return YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase()); + } catch (IllegalArgumentException e) { + LOG.warn("Configuration parameter {} was configured with an invalid value {}. Falling back to default ({}).", + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(), + configuredUserJarInclusion, + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); + return YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); + } + } + /** * Creates a YarnClusterClient; may be overriden in tests */ From f5f396eb45a02d87e1e04213f9c9dd865ac3b523 Mon Sep 17 00:00:00 2001 From: zentol Date: Thu, 18 May 2017 11:59:26 +0200 Subject: [PATCH 3/3] address docs comments --- docs/setup/config.md | 2 ++ docs/setup/yarn_setup.md | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/setup/config.md b/docs/setup/config.md index c4a7354ea7e77..8a6f67da35b61 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -457,6 +457,8 @@ use the `env.java.opts` setting, which is the `%jvmopts%` variable in the String - `yarn.tags` A comma-separated list of tags to apply to the Flink YARN application. +- `yarn.per-job-cluster.include-user-jar` (Default: ORDER) Control whether and how the user-jar is included in the system class path for per-job clusters. Setting this parameter to `DISABLED` causes the jar to be included in the user class path instead. Setting this parameter to one of `FIRST`, `LAST` or `ORDER` causes the jar to be included in the system class path, with the jar either being placed at the beginning of the class path (`FIRST`), at the end (`LAST`), or based on the lexicographic order (`ORDER`). + ### Mesos diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md index 179e8976efa73..190a7965a4e07 100644 --- a/docs/setup/yarn_setup.md +++ b/docs/setup/yarn_setup.md @@ -247,7 +247,7 @@ Note: It is possible to combine `-m yarn-cluster` with a detached YARN submissio ### User jars & Classpath -By default Flink will include the user jars into the system classpath when running a single job. This behavior can be controlled with the `yarn.per-job-cluster.include-job-jar` parameter. +By default Flink will include the user jars into the system classpath when running a single job. This behavior can be controlled with the `yarn.per-job-cluster.include-user-jar` parameter. When setting this to `DISABLED` Flink will include the jar in the user classpath instead.