Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') wrongly managed on Yarn #6284

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/_includes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@
<td style="word-wrap: break-word;">"child-first"</td>
<td>Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively).</td>
</tr>
<tr>
<td><h5>internal.io.tmp.dirs.use-local-default</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>key, which says if default value is used for `io.tmp.dirs` config variable.</td>
</tr>
<tr>
<td><h5>io.tmp.dirs</h5></td>
<td style="word-wrap: break-word;">System.getProperty("java.io.tmpdir")</td>
<td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.</td>
<td></td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,32 @@ public Map<String, String> toMap() {
}
}

/**
* Removes given config option from the configuration.
*
* @param configOption config option to remove
* @param <T> Type of the config option
* @return true is config has been removed, false otherwise
*/
public <T> boolean removeConfig(ConfigOption<T> configOption){
synchronized (this.confData){
// try the current key
Object oldValue = this.confData.remove(configOption.key());
if (oldValue == null){
for (String deprecatedKey : configOption.deprecatedKeys()){
oldValue = this.confData.remove(deprecatedKey);
if (oldValue != null){
LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
deprecatedKey, configOption.key());
return true;
}
}
return false;
}
return true;
}
}


// --------------------------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,19 @@ public static String[] getParentFirstLoaderPatterns(Configuration config) {
* The config parameter defining the directories for temporary files, separated by
* ",", "|", or the system's {@link java.io.File#pathSeparator}.
*/
@Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")")
@Documentation.OverrideDefault("'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty(\"java.io.tmpdir\") in standalone.")
public static final ConfigOption<String> TMP_DIRS =
key("io.tmp.dirs")
.defaultValue(System.getProperty("java.io.tmpdir"))
.withDeprecatedKeys("taskmanager.tmp.dirs");

/**
* String key, which says if default value is used for `io.tmp.dirs` config variable.
*/
public static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = key("internal.io.tmp.dirs.use-local-default")
.defaultValue(true)
.withDescription("key, which says if default value is used for `io.tmp.dirs` config variable.");

// ------------------------------------------------------------------------
// program
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ public Map<String, String> toMap() {
return prefixed;
}

@Override
public <T> boolean removeConfig(ConfigOption<T> configOption){
return backingConfig.removeConfig(configOption);
}

@Override
public boolean containsKey(String key) {
return backingConfig.containsKey(prefix + key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -303,4 +304,32 @@ public void testDeprecatedKeys() {
assertEquals(13, cfg.getInteger(matchesThird));
assertEquals(-1, cfg.getInteger(notContained));
}

@Test
public void testRemove(){
Configuration cfg = new Configuration();
cfg.setInteger("a", 1);
cfg.setInteger("b", 2);

ConfigOption<Integer> validOption = ConfigOptions
.key("a")
.defaultValue(-1);

ConfigOption<Integer> deprecatedOption = ConfigOptions
.key("c")
.defaultValue(-1)
.withDeprecatedKeys("d", "b");

ConfigOption<Integer> unexistedOption = ConfigOptions
.key("e")
.defaultValue(-1)
.withDeprecatedKeys("f", "g", "j");

assertEquals("Wrong expectation about size", cfg.keySet().size(), 2);
assertTrue("Expected 'validOption' is removed", cfg.removeConfig(validOption));
assertEquals("Wrong expectation about size", cfg.keySet().size(), 1);
assertTrue("Expected 'existedOption' is removed", cfg.removeConfig(deprecatedOption));
assertEquals("Wrong expectation about size", cfg.keySet().size(), 0);
assertFalse("Expected 'unexistedOption' is not removed", cfg.removeConfig(unexistedOption));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.apache.flink.mesos.entrypoint;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
Expand Down Expand Up @@ -173,15 +173,7 @@ public static Configuration loadConfiguration(Configuration dynamicProperties, L
final Map<String, String> envs = System.getenv();
final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);

// configure local directory
if (configuration.contains(CoreOptions.TMP_DIRS)) {
log.info("Overriding Mesos' temporary file directories with those " +
"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
}
else if (tmpDirs != null) {
log.info("Setting directories for temporary files to: {}", tmpDirs);
configuration.setString(CoreOptions.TMP_DIRS, tmpDirs);
}
BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, tmpDirs);

return configuration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,11 @@ public static Configuration generateTaskManagerConfiguration(
cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
}

return cfg;
if (baseConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
cfg.removeConfig(CoreOptions.TMP_DIRS);
}

return cfg;
}

/**
Expand Down Expand Up @@ -467,4 +471,22 @@ public static String getStartCommand(String template,
}
return template;
}

/**
* Set temporary configuration directories if necessary
*
* @param configuration flink config to patch
* @param defaultDirs in case no tmp directories is set, next directories will be applied
*/
public static void updateTmpDirectoriesInConfiguration(Configuration configuration, String defaultDirs){
if (configuration.contains(CoreOptions.TMP_DIRS)) {
LOG.info("Overriding Fink's temporary file directories with those " +
"specified in the Flink config: {}", configuration.getValue(CoreOptions.TMP_DIRS));
configuration.setBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS, false);
}
else {
LOG.info("Setting directories for temporary files to: {}", defaultDirs);
configuration.setString(CoreOptions.TMP_DIRS, defaultDirs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,4 +276,23 @@ public void testGetTaskManagerShellCommand() {
true, true, true, this.getClass()));

}

@Test
public void testUpdateTmpDirectoriesInConfiguration(){
Configuration config = new Configuration();

// test that default value is taken
BootstrapTools.updateTmpDirectoriesInConfiguration(config, "default/directory/path");
assertEquals(config.getString(CoreOptions.TMP_DIRS), "default/directory/path");

// test that we ignore default value is value is set before
BootstrapTools.updateTmpDirectoriesInConfiguration(config, "not/default/directory/path");
assertEquals(config.getString(CoreOptions.TMP_DIRS), "default/directory/path");

//test that empty value is not a magic string
config.setString(CoreOptions.TMP_DIRS, "");
BootstrapTools.updateTmpDirectoriesInConfiguration(config, "some/new/path");
assertEquals(config.getString(CoreOptions.TMP_DIRS), "");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
Expand Down Expand Up @@ -523,16 +522,8 @@ private static Configuration createConfiguration(String baseDirectory, Map<Strin
ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);

// configure local directory
if (configuration.contains(CoreOptions.TMP_DIRS)) {
log.info("Overriding YARN's temporary file directories with those " +
"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
}
else {
final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
log.info("Setting directories for temporary files to: {}", localDirs);
configuration.setString(CoreOptions.TMP_DIRS, localDirs);
}
final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);

return configuration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
Expand Down Expand Up @@ -471,14 +472,19 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
taskManagerParameters.taskManagerHeapSizeMB(),
taskManagerParameters.taskManagerDirectMemoryLimitMB());

log.debug("TaskManager configuration: {}", flinkConfig);
Configuration taskManagerConfig = flinkConfig.clone();
if (flinkConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
taskManagerConfig.removeConfig(CoreOptions.TMP_DIRS);
}

log.debug("TaskManager configuration: {}", taskManagerConfig);

ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
flinkConfig,
taskManagerConfig,
yarnConfig,
env,
taskManagerParameters,
flinkConfig,
taskManagerConfig,
currDir,
YarnTaskExecutorRunner.class,
log);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
Expand Down Expand Up @@ -100,15 +100,7 @@ private static void run(String[] args) {
final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
FileSystem.initialize(configuration);

// configure local directory
if (configuration.contains(CoreOptions.TMP_DIRS)) {
LOG.info("Overriding YARN's temporary file directories with those " +
"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
}
else {
LOG.info("Setting directories for temporary files to: {}", localDirs);
configuration.setString(CoreOptions.TMP_DIRS, localDirs);
}
BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);

// tell akka to die in case of an error
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
Expand Down Expand Up @@ -121,15 +121,7 @@ public static Runner create(
final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);

// configure local directory
if (configuration.contains(CoreOptions.TMP_DIRS)) {
LOG.info("Overriding YARN's temporary file directories with those " +
"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
}
else {
LOG.info("Setting directories for temporary files to: {}", localDirs);
configuration.setString(CoreOptions.TMP_DIRS, localDirs);
}
BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);

// tell akka to die in case of an error
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
Expand Down Expand Up @@ -130,16 +129,8 @@ public static Configuration loadConfiguration(String workingDirectory, Map<Strin
configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}

// configure local directory
if (configuration.contains(CoreOptions.TMP_DIRS)) {
log.info("Overriding YARN's temporary file directories with those " +
"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
}
else {
final String localDirs = env.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
log.info("Setting directories for temporary files to: {}", localDirs);
configuration.setString(CoreOptions.TMP_DIRS, localDirs);
}
final String localDirs = env.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);

return configuration;
}
Expand Down