From 9ac14a6048982d5bdd19379114ef109483c2abce Mon Sep 17 00:00:00 2001 From: Gary Yao Date: Fri, 1 Mar 2019 18:06:38 +0100 Subject: [PATCH 1/2] [hotfix][runtime] Remove unused local variable in ExecutionGraph --- .../org/apache/flink/runtime/executiongraph/ExecutionGraph.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index e64b2b9b76db0..ff7e1233a7828 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -1825,7 +1825,6 @@ void notifyExecutionChange( // see what this means for us. currently, the first FAILED state means -> FAILED if (newExecutionState == ExecutionState.FAILED) { final Throwable ex = error != null ? error : new FlinkException("Unknown Error (missing cause)"); - long timestamp = execution.getStateTimestamp(ExecutionState.FAILED); // by filtering out late failure calls, we can save some work in // avoiding redundant local failover From 6f840082144c8a340a1ce0a820d268510e2853a1 Mon Sep 17 00:00:00 2001 From: Gary Yao Date: Sun, 3 Mar 2019 19:12:54 +0100 Subject: [PATCH 2/2] [FLINK-11781][yarn] Remove "DISABLED" as possible value for yarn.per-job-cluster.include-user-jar Remove this feature because it is broken since Flink 1.5 This closes #7883. --- .../generated/yarn_config_configuration.html | 2 +- .../flink/configuration/Configuration.java | 30 +++++++++++ .../DelegatingConfiguration.java | 5 ++ .../configuration/ConfigurationTest.java | 54 +++++++++++++++++++ .../yarn/AbstractYarnClusterDescriptor.java | 40 +++++++------- .../yarn/configuration/YarnConfigOptions.java | 4 +- .../flink/yarn/YarnClusterDescriptorTest.java | 23 ++++++++ 7 files changed, 132 insertions(+), 26 deletions(-) diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html index bbe25499f175a..ab7e22482b942 100644 --- a/docs/_includes/generated/yarn_config_configuration.html +++ b/docs/_includes/generated/yarn_config_configuration.html @@ -45,7 +45,7 @@
yarn.per-job-cluster.include-user-jar
"ORDER" - Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on their name ("ORDER"). Setting this parameter to "DISABLED" causes the jar to be included in the user class path instead. + Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on their name ("ORDER").
yarn.properties-file.location
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index 07c0290d66c3b..186cf10722be9 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -33,10 +33,13 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.Set; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Lightweight configuration object which stores key/value pairs. */ @@ -608,6 +611,33 @@ public String getValue(ConfigOption configOption) { return o == null ? null : o.toString(); } + /** + * Returns the value associated with the given config option as an enum. + * + * @param enumClass The return enum class + * @param configOption The configuration option + * @throws IllegalArgumentException If the string associated with the given config option cannot + * be parsed as a value of the provided enum class. + */ + @PublicEvolving + public > T getEnum( + final Class enumClass, + final ConfigOption configOption) { + checkNotNull(enumClass, "enumClass must not be null"); + checkNotNull(configOption, "configOption must not be null"); + + final String configValue = getString(configOption); + try { + return Enum.valueOf(enumClass, configValue.toUpperCase(Locale.ROOT)); + } catch (final IllegalArgumentException | NullPointerException e) { + final String errorMessage = String.format("Value for config option %s must be one of %s (was %s)", + configOption.key(), + Arrays.toString(enumClass.getEnumConstants()), + configValue); + throw new IllegalArgumentException(errorMessage, e); + } + } + // -------------------------------------------------------------------------------------------- /** diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java index 0a0a7773e2b09..b0249a050707a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java @@ -244,6 +244,11 @@ public String getValue(ConfigOption configOption) { return this.backingConfig.getValue(prefixOption(configOption, prefix)); } + @Override + public > T getEnum(final Class enumClass, final ConfigOption configOption) { + return this.backingConfig.getEnum(enumClass, prefixOption(configOption, prefix)); + } + @Override public void addAllToProperties(Properties props) { // only add keys with our prefix diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java index f02d59e432a7c..9727b44ee16a9 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java @@ -23,10 +23,12 @@ import org.junit.Test; +import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -403,4 +405,56 @@ public void testRemove(){ assertEquals("Wrong expectation about size", cfg.keySet().size(), 0); assertFalse("Expected 'unexistedOption' is not removed", cfg.removeConfig(unexistedOption)); } + + @Test + public void testShouldParseValidStringToEnum() { + final ConfigOption configOption = createStringConfigOption(); + + final Configuration configuration = new Configuration(); + configuration.setString(configOption.key(), TestEnum.VALUE1.toString()); + + final TestEnum parsedEnumValue = configuration.getEnum(TestEnum.class, configOption); + assertEquals(TestEnum.VALUE1, parsedEnumValue); + } + + @Test + public void testShouldParseValidStringToEnumIgnoringCase() { + final ConfigOption configOption = createStringConfigOption(); + + final Configuration configuration = new Configuration(); + configuration.setString(configOption.key(), TestEnum.VALUE1.toString().toLowerCase()); + + final TestEnum parsedEnumValue = configuration.getEnum(TestEnum.class, configOption); + assertEquals(TestEnum.VALUE1, parsedEnumValue); + } + + @Test + public void testThrowsExceptionIfTryingToParseInvalidStringForEnum() { + final ConfigOption configOption = createStringConfigOption(); + + final Configuration configuration = new Configuration(); + final String invalidValueForTestEnum = "InvalidValueForTestEnum"; + configuration.setString(configOption.key(), invalidValueForTestEnum); + + try { + configuration.getEnum(TestEnum.class, configOption); + fail("Expected exception not thrown"); + } catch (IllegalArgumentException e) { + final String expectedMessage = "Value for config option " + + configOption.key() + " must be one of [VALUE1, VALUE2] (was " + + invalidValueForTestEnum + ")"; + assertThat(e.getMessage(), containsString(expectedMessage)); + } + } + + enum TestEnum { + VALUE1, + VALUE2 + } + + private static ConfigOption createStringConfigOption() { + return ConfigOptions + .key("test-string-key") + .noDefaultValue(); + } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index a87772f123166..783a459799d3b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -763,19 +763,14 @@ public ApplicationReport startAppMaster( localResources, envShipFileList); - List userClassPaths; - if (userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED) { - userClassPaths = uploadAndRegisterFiles( - userJarFiles, - fs, - homeDir, - appId, - paths, - localResources, - envShipFileList); - } else { - userClassPaths = Collections.emptyList(); - } + final List userClassPaths = uploadAndRegisterFiles( + userJarFiles, + fs, + homeDir, + appId, + paths, + localResources, + envShipFileList); if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) { systemClassPaths.addAll(userClassPaths); @@ -1602,15 +1597,16 @@ protected ContainerLaunchContext setupApplicationMasterContainer( } private static YarnConfigOptions.UserJarInclusion getUserJarInclusionMode(org.apache.flink.configuration.Configuration config) { - String configuredUserJarInclusion = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR); - try { - return YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase()); - } catch (IllegalArgumentException e) { - LOG.warn("Configuration parameter {} was configured with an invalid value {}. Falling back to default ({}).", - YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(), - configuredUserJarInclusion, - YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); - return YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); + throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(config); + + return config.getEnum(YarnConfigOptions.UserJarInclusion.class, YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR); + } + + private static void throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(final Configuration config) { + final String userJarInclusion = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR); + if ("DISABLED".equalsIgnoreCase(userJarInclusion)) { + throw new IllegalArgumentException(String.format("Config option %s cannot be set to DISABLED anymore (see FLINK-11781)", + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key())); } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java index b0594756e9e93..6abeb0d910f14 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java @@ -57,8 +57,7 @@ public class YarnConfigOptions { .defaultValue("ORDER") .withDescription("Defines whether user-jars are included in the system class path for per-job-clusters as" + " well as their positioning in the path. They can be positioned at the beginning (\"FIRST\"), at the" + - " end (\"LAST\"), or be positioned based on their name (\"ORDER\"). Setting this parameter to" + - " \"DISABLED\" causes the jar to be included in the user class path instead."); + " end (\"LAST\"), or be positioned based on their name (\"ORDER\")."); /** * The vcores exposed by YARN. @@ -156,7 +155,6 @@ private YarnConfigOptions() {} /** @see YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR */ public enum UserJarInclusion { - DISABLED, FIRST, LAST, ORDER diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java index 16b3586900018..6c2e42e313396 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java @@ -53,7 +53,9 @@ import java.util.Set; import static junit.framework.TestCase.assertTrue; +import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; /** @@ -89,6 +91,27 @@ public static void tearDownClass() { yarnClient.stop(); } + /** + * @see FLINK-11781 + */ + @Test + public void testThrowsExceptionIfUserTriesToDisableUserJarInclusionInSystemClassPath() { + final Configuration configuration = new Configuration(); + configuration.setString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR, "DISABLED"); + + try { + new YarnClusterDescriptor( + configuration, + yarnConfiguration, + temporaryFolder.getRoot().getAbsolutePath(), + yarnClient, + true); + fail("Expected exception not thrown"); + } catch (final IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("cannot be set to DISABLED anymore")); + } + } + @Test public void testFailIfTaskSlotsHigherThanMaxVcores() throws ClusterDeploymentException { final Configuration flinkConfiguration = new Configuration();