From c7b59780c0d69fcf48c524f0b3807bad4af3eaf1 Mon Sep 17 00:00:00 2001 From: Patrick Lucas Date: Fri, 3 Feb 2017 19:17:55 -0500 Subject: [PATCH 1/2] [FLINK-5153] Support YARN application tags Adds a new config option `yarn.tags`, a comma-separated list of strings passed to YARN as application tags. --- docs/setup/config.md | 2 + .../flink/configuration/ConfigConstants.java | 5 + .../yarn/AbstractYarnClusterDescriptor.java | 114 +++++++++++++----- 3 files changed, 93 insertions(+), 28 deletions(-) diff --git a/docs/setup/config.md b/docs/setup/config.md index 6bde26a79ce6b..c8bb0b5882eef 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -431,6 +431,8 @@ use the `env.java.opts` setting, which is the `%jvmopts%` variable in the String For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports. +- `yarn.tags` A comma-separated list of tags to apply to the Flink YARN application. + ### Mesos diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 14ba9ddcb486c..c608fdeb92e52 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -442,6 +442,11 @@ public final class ConfigConstants { */ public static final String YARN_APPLICATION_MASTER_PORT = "yarn.application-master.port"; + /** + * A comma-separated list of strings to use as YARN application tags. + */ + public static final String YARN_APPLICATION_TAGS = "yarn.tags"; + // ------------------------ Mesos Configuration ------------------------ diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index b537e0952689f..21599c1dbc356 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -855,7 +855,7 @@ public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttrib name = customName; } - appContext.setApplicationName(name); // application name + appContext.setApplicationName(name); appContext.setApplicationType("Apache Flink"); appContext.setAMContainerSpec(amContainer); appContext.setResource(capability); @@ -863,6 +863,8 @@ public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttrib appContext.setQueue(yarnQueue); } + setApplicationTags(appContext); + // add a hook to clean up in case deployment fails Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication); Runtime.getRuntime().addShutdownHook(deploymentFailureHook); @@ -1024,75 +1026,117 @@ public void setName(String name) { customName = name; } - private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException { + private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws + InvocationTargetException, IllegalAccessException { + ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance(); reflector.setKeepContainersAcrossApplicationAttempts(appContext, true); reflector.setAttemptFailuresValidityInterval(appContext, AkkaUtils.getTimeout(flinkConfiguration).toMillis()); } + private void setApplicationTags(final ApplicationSubmissionContext appContext) throws InvocationTargetException, + IllegalAccessException { + + final ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance(); + final String tagsString = flinkConfiguration.getString(ConfigConstants.YARN_APPLICATION_TAGS, ""); + + final Set applicationTags = new HashSet<>(); + + // Trim whitespace and cull empty tags + for (final String tag : tagsString.split(",")) { + final String trimmedTag = tag.trim(); + if (!trimmedTag.isEmpty()) { + applicationTags.add(trimmedTag); + } + } + + reflector.setApplicationTags(appContext, applicationTags); + } + /** * Singleton object which uses reflection to determine whether the {@link ApplicationSubmissionContext} - * supports the setKeepContainersAcrossApplicationAttempts and the setAttemptFailuresValidityInterval - * methods. Depending on the Hadoop version these methods are supported or not. If the methods - * are not supported, then nothing happens when setKeepContainersAcrossApplicationAttempts or - * setAttemptFailuresValidityInterval are called. + * supports various methods which, depending on the Hadoop version, may or may not be supported. + * + * If an unsupported method is invoked, nothing happens. + * + * Currently three methods are proxied: + * - setApplicationTags (>= 2.4.0) + * - setAttemptFailuresValidityInterval (>= 2.6.0) + * - setKeepContainersAcrossApplicationAttempts (>= 2.4.0) */ private static class ApplicationSubmissionContextReflector { private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class); - private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class); + private static final ApplicationSubmissionContextReflector instance = + new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class); public static ApplicationSubmissionContextReflector getInstance() { return instance; } - private static final String keepContainersMethodName = "setKeepContainersAcrossApplicationAttempts"; - private static final String attemptsFailuresValidityIntervalMethodName = "setAttemptFailuresValidityInterval"; + private static final String APPLICATION_TAGS_METHOD_NAME = "setApplicationTags"; + private static final String ATTEMPT_FAILURES_METHOD_NAME = "setAttemptFailuresValidityInterval"; + private static final String KEEP_CONTAINERS_METHOD_NAME = "setKeepContainersAcrossApplicationAttempts"; - private final Method keepContainersMethod; + private final Method applicationTagsMethod; private final Method attemptFailuresValidityIntervalMethod; + private final Method keepContainersMethod; private ApplicationSubmissionContextReflector(Class clazz) { - Method keepContainersMethod; + Method applicationTagsMethod; Method attemptFailuresValidityIntervalMethod; + Method keepContainersMethod; try { // this method is only supported by Hadoop 2.4.0 onwards - keepContainersMethod = clazz.getMethod(keepContainersMethodName, boolean.class); - LOG.debug("{} supports method {}.", clazz.getCanonicalName(), keepContainersMethodName); + applicationTagsMethod = clazz.getMethod(APPLICATION_TAGS_METHOD_NAME, Set.class); + LOG.debug("{} supports method {}.", clazz.getCanonicalName(), APPLICATION_TAGS_METHOD_NAME); } catch (NoSuchMethodException e) { - LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), keepContainersMethodName); + LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), APPLICATION_TAGS_METHOD_NAME); // assign null because the Hadoop version apparently does not support this call. - keepContainersMethod = null; + applicationTagsMethod = null; } - this.keepContainersMethod = keepContainersMethod; + this.applicationTagsMethod = applicationTagsMethod; try { // this method is only supported by Hadoop 2.6.0 onwards - attemptFailuresValidityIntervalMethod = clazz.getMethod(attemptsFailuresValidityIntervalMethodName, long.class); - LOG.debug("{} supports method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName); + attemptFailuresValidityIntervalMethod = clazz.getMethod(ATTEMPT_FAILURES_METHOD_NAME, long.class); + LOG.debug("{} supports method {}.", clazz.getCanonicalName(), ATTEMPT_FAILURES_METHOD_NAME); } catch (NoSuchMethodException e) { - LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName); + LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), ATTEMPT_FAILURES_METHOD_NAME); // assign null because the Hadoop version apparently does not support this call. attemptFailuresValidityIntervalMethod = null; } this.attemptFailuresValidityIntervalMethod = attemptFailuresValidityIntervalMethod; - } - public void setKeepContainersAcrossApplicationAttempts( - ApplicationSubmissionContext appContext, - boolean keepContainers) throws InvocationTargetException, IllegalAccessException { + try { + // this method is only supported by Hadoop 2.4.0 onwards + keepContainersMethod = clazz.getMethod(KEEP_CONTAINERS_METHOD_NAME, boolean.class); + LOG.debug("{} supports method {}.", clazz.getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME); + } catch (NoSuchMethodException e) { + LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME); + // assign null because the Hadoop version apparently does not support this call. + keepContainersMethod = null; + } - if (keepContainersMethod != null) { - LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(), + this.keepContainersMethod = keepContainersMethod; + } + + public void setApplicationTags( + ApplicationSubmissionContext appContext, + Set applicationTags) throws InvocationTargetException, IllegalAccessException { + if (applicationTagsMethod != null) { + LOG.debug("Calling method {} of {}.", + applicationTagsMethod.getName(), appContext.getClass().getCanonicalName()); - keepContainersMethod.invoke(appContext, keepContainers); + applicationTagsMethod.invoke(appContext, applicationTags); } else { LOG.debug("{} does not support method {}. Doing nothing.", - appContext.getClass().getCanonicalName(), keepContainersMethodName); + appContext.getClass().getCanonicalName(), + APPLICATION_TAGS_METHOD_NAME); } } @@ -1107,7 +1151,21 @@ public void setAttemptFailuresValidityInterval( } else { LOG.debug("{} does not support method {}. Doing nothing.", appContext.getClass().getCanonicalName(), - attemptsFailuresValidityIntervalMethodName); + ATTEMPT_FAILURES_METHOD_NAME); + } + } + + public void setKeepContainersAcrossApplicationAttempts( + ApplicationSubmissionContext appContext, + boolean keepContainers) throws InvocationTargetException, IllegalAccessException { + + if (keepContainersMethod != null) { + LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(), + appContext.getClass().getCanonicalName()); + keepContainersMethod.invoke(appContext, keepContainers); + } else { + LOG.debug("{} does not support method {}. Doing nothing.", + appContext.getClass().getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME); } } } From 9623f368697e28021351e88a0699f62839397a2e Mon Sep 17 00:00:00 2001 From: Patrick Lucas Date: Tue, 7 Feb 2017 12:47:21 -0500 Subject: [PATCH 2/2] [FLINK-5153] Add test for YARN application tags --- .../YARNSessionCapacitySchedulerITCase.java | 43 +++++++++++++++---- 1 file changed, 35 insertions(+), 8 deletions(-) diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index ec66eb23617e5..2a3b6c6cfb007 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.base.Joiner; +import com.google.common.collect.Sets; import org.apache.commons.io.FileUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; @@ -53,13 +54,9 @@ import java.io.File; import java.io.FilenameFilter; import java.io.IOException; -import java.util.EnumSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Collections; -import java.util.Comparator; -import java.util.Arrays; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -413,11 +410,14 @@ private void testDetachedPerJobYarnClusterInternal(String job) { throw new RuntimeException(e); } - Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), + Runner runner = startWithArgs(new String[]{ + "run", "-m", "yarn-cluster", + "-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(), "-yn", "1", "-yjm", "768", "-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly + "-yD", "yarn.tags=test-tag", "-ytm", "1024", "-ys", "2", // test requesting slots from YARN. "--yarndetached", job, @@ -516,6 +516,7 @@ public boolean accept(File dir, String name) { LOG.info("Got report {}", rep); } while(rep.getYarnApplicationState() == YarnApplicationState.RUNNING); + verifyApplicationTags(rep); } catch(Throwable t) { LOG.warn("Error while detached yarn session was running", t); Assert.fail(t.getMessage()); @@ -543,6 +544,32 @@ public boolean accept(File dir, String name) { } } + /** + * Ensures that the YARN application tags were set properly. + * + * Since YARN application tags were only added in Hadoop 2.4, but Flink still supports Hadoop 2.3, reflection is + * required to invoke the methods. If the method does not exist, this test passes. + */ + private void verifyApplicationTags(final ApplicationReport report) throws InvocationTargetException, + IllegalAccessException { + + final Method applicationTagsMethod; + + Class clazz = ApplicationReport.class; + try { + // this method is only supported by Hadoop 2.4.0 onwards + applicationTagsMethod = clazz.getMethod("getApplicationTags"); + } catch (NoSuchMethodException e) { + // only verify the tags if the method exists + return; + } + + @SuppressWarnings("unchecked") + Set applicationTags = (Set) applicationTagsMethod.invoke(report); + + Assert.assertEquals(applicationTags, Sets.newHashSet("test-tag")); + } + @After public void checkForProhibitedLogContents() { ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);