Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/_includes/generated/yarn_config_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<tr>
<td><h5>yarn.per-job-cluster.include-user-jar</h5></td>
<td style="word-wrap: break-word;">"ORDER"</td>
<td>Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on their name ("ORDER"). Setting this parameter to "DISABLED" causes the jar to be included in the user class path instead.</td>
<td>Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on their name ("ORDER").</td>
</tr>
<tr>
<td><h5>yarn.properties-file.location</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Lightweight configuration object which stores key/value pairs.
*/
Expand Down Expand Up @@ -608,6 +611,33 @@ public String getValue(ConfigOption<?> configOption) {
return o == null ? null : o.toString();
}

/**
* Returns the value associated with the given config option as an enum.
*
* @param enumClass The return enum class
* @param configOption The configuration option
* @throws IllegalArgumentException If the string associated with the given config option cannot
* be parsed as a value of the provided enum class.
*/
@PublicEvolving
public <T extends Enum<T>> T getEnum(
final Class<T> enumClass,
final ConfigOption<String> configOption) {
checkNotNull(enumClass, "enumClass must not be null");
checkNotNull(configOption, "configOption must not be null");

final String configValue = getString(configOption);
try {
return Enum.valueOf(enumClass, configValue.toUpperCase(Locale.ROOT));
} catch (final IllegalArgumentException | NullPointerException e) {
final String errorMessage = String.format("Value for config option %s must be one of %s (was %s)",
configOption.key(),
Arrays.toString(enumClass.getEnumConstants()),
configValue);
throw new IllegalArgumentException(errorMessage, e);
}
}

// --------------------------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ public String getValue(ConfigOption<?> configOption) {
return this.backingConfig.getValue(prefixOption(configOption, prefix));
}

@Override
public <T extends Enum<T>> T getEnum(final Class<T> enumClass, final ConfigOption<String> configOption) {
return this.backingConfig.getEnum(enumClass, prefixOption(configOption, prefix));
}

@Override
public void addAllToProperties(Properties props) {
// only add keys with our prefix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@

import org.junit.Test;

import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -403,4 +405,56 @@ public void testRemove(){
assertEquals("Wrong expectation about size", cfg.keySet().size(), 0);
assertFalse("Expected 'unexistedOption' is not removed", cfg.removeConfig(unexistedOption));
}

@Test
public void testShouldParseValidStringToEnum() {
final ConfigOption<String> configOption = createStringConfigOption();

final Configuration configuration = new Configuration();
configuration.setString(configOption.key(), TestEnum.VALUE1.toString());

final TestEnum parsedEnumValue = configuration.getEnum(TestEnum.class, configOption);
assertEquals(TestEnum.VALUE1, parsedEnumValue);
}

@Test
public void testShouldParseValidStringToEnumIgnoringCase() {
final ConfigOption<String> configOption = createStringConfigOption();

final Configuration configuration = new Configuration();
configuration.setString(configOption.key(), TestEnum.VALUE1.toString().toLowerCase());

final TestEnum parsedEnumValue = configuration.getEnum(TestEnum.class, configOption);
assertEquals(TestEnum.VALUE1, parsedEnumValue);
}

@Test
public void testThrowsExceptionIfTryingToParseInvalidStringForEnum() {
final ConfigOption<String> configOption = createStringConfigOption();

final Configuration configuration = new Configuration();
final String invalidValueForTestEnum = "InvalidValueForTestEnum";
configuration.setString(configOption.key(), invalidValueForTestEnum);

try {
configuration.getEnum(TestEnum.class, configOption);
fail("Expected exception not thrown");
} catch (IllegalArgumentException e) {
final String expectedMessage = "Value for config option " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Personally prefer the approach with @Rule

	@Rule
	public ExpectedException thrown = ExpectedException.none();

    @Test
     public testMethod() {
       ...
       thrown.expect(IllegalArgumentException.class);
       thrown.message(....);
      }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current solution is legible, and frequently used in Flink and elsewhere. I'll leave it as is.

configOption.key() + " must be one of [VALUE1, VALUE2] (was " +
invalidValueForTestEnum + ")";
assertThat(e.getMessage(), containsString(expectedMessage));
}
}

enum TestEnum {
VALUE1,
VALUE2
}

private static ConfigOption<String> createStringConfigOption() {
return ConfigOptions
.key("test-string-key")
.noDefaultValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1825,7 +1825,6 @@ void notifyExecutionChange(
// see what this means for us. currently, the first FAILED state means -> FAILED
if (newExecutionState == ExecutionState.FAILED) {
final Throwable ex = error != null ? error : new FlinkException("Unknown Error (missing cause)");
long timestamp = execution.getStateTimestamp(ExecutionState.FAILED);

// by filtering out late failure calls, we can save some work in
// avoiding redundant local failover
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,19 +763,14 @@ public ApplicationReport startAppMaster(
localResources,
envShipFileList);

List<String> userClassPaths;
if (userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED) {
userClassPaths = uploadAndRegisterFiles(
userJarFiles,
fs,
homeDir,
appId,
paths,
localResources,
envShipFileList);
} else {
userClassPaths = Collections.emptyList();
}
final List<String> userClassPaths = uploadAndRegisterFiles(
userJarFiles,
fs,
homeDir,
appId,
paths,
localResources,
envShipFileList);

if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
systemClassPaths.addAll(userClassPaths);
Expand Down Expand Up @@ -1602,15 +1597,16 @@ protected ContainerLaunchContext setupApplicationMasterContainer(
}

private static YarnConfigOptions.UserJarInclusion getUserJarInclusionMode(org.apache.flink.configuration.Configuration config) {
String configuredUserJarInclusion = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
try {
return YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase());
} catch (IllegalArgumentException e) {
LOG.warn("Configuration parameter {} was configured with an invalid value {}. Falling back to default ({}).",
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(),
configuredUserJarInclusion,
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
return YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(config);

return config.getEnum(YarnConfigOptions.UserJarInclusion.class, YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
}

private static void throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(final Configuration config) {
final String userJarInclusion = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
if ("DISABLED".equalsIgnoreCase(userJarInclusion)) {
throw new IllegalArgumentException(String.format("Config option %s cannot be set to DISABLED anymore (see FLINK-11781)",
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ public class YarnConfigOptions {
.defaultValue("ORDER")
.withDescription("Defines whether user-jars are included in the system class path for per-job-clusters as" +
" well as their positioning in the path. They can be positioned at the beginning (\"FIRST\"), at the" +
" end (\"LAST\"), or be positioned based on their name (\"ORDER\"). Setting this parameter to" +
" \"DISABLED\" causes the jar to be included in the user class path instead.");
" end (\"LAST\"), or be positioned based on their name (\"ORDER\").");

/**
* The vcores exposed by YARN.
Expand Down Expand Up @@ -156,7 +155,6 @@ private YarnConfigOptions() {}

/** @see YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR */
public enum UserJarInclusion {
DISABLED,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check where this is parsed to the enum and throw a meaningful exception explaining that DISABLED has been removed.

Copy link
Member Author

@GJL GJL Mar 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From Flink 1.5 to 1.7:

  • If DISABLED is set, the job keeps failing.
  • If an invalid value is set, the default value (ORDER) is used, and a warning is logged ("Configuration parameter {} was configured with an invalid value {}. Falling back to default ({}).").

If we would apply the PR as is, we would always log a warning (because DISABLED will be considered an invalid value). You are proposing to throw an exception with a customized message if DISABLED is set. This would help users that are upgrading from 1.4 to 1.8 directly. So I could do that but I think it makes sense to also propagate the exception for all other invalid values instead of logging a warning. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for throwing exception rather than just logging, when mismatch.

FIRST,
LAST,
ORDER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@
import java.util.Set;

import static junit.framework.TestCase.assertTrue;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

/**
Expand Down Expand Up @@ -89,6 +91,27 @@ public static void tearDownClass() {
yarnClient.stop();
}

/**
* @see <a href="https://issues.apache.org/jira/browse/FLINK-11781">FLINK-11781</a>
*/
@Test
public void testThrowsExceptionIfUserTriesToDisableUserJarInclusionInSystemClassPath() {
final Configuration configuration = new Configuration();
configuration.setString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR, "DISABLED");

try {
new YarnClusterDescriptor(
configuration,
yarnConfiguration,
temporaryFolder.getRoot().getAbsolutePath(),
yarnClient,
true);
fail("Expected exception not thrown");
} catch (final IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("cannot be set to DISABLED anymore"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: As before -> I prefer the @Rule approach.

}
}

@Test
public void testFailIfTaskSlotsHigherThanMaxVcores() throws ClusterDeploymentException {
final Configuration flinkConfiguration = new Configuration();
Expand Down