[FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') wrongly managed on Yarn #6284
a4b15dc
to
106e25e
Compare
I found a related test error, please recheck.
2cbd8e3
to
8048552
Compare
Thanks for your contribution @JTaky. I have some comments on how we could streamline it a bit more.
Did you test the changes on a cluster? I think the Yarn part should not have worked, since we passed the cloned config as the wrong argument to Utils#createTaskExecutorContext.
@@ -181,12 +181,17 @@
 * The config parameter defining the directories for temporary files, separated by
 * ",", "|", or the system's {@link java.io.File#pathSeparator}.
 */
@Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")")
@Documentation.OverrideDefault("'LOCAL_DIRS' on Yarn and '_FLINK_TMP_DIR' on Mesos.")
We should also keep saying that in the standalone mode we use System.getProperty("java.io.tmpdir")
done
public static void setTmpDirectoriesConfig(Configuration configuration, String defaultDirs){
    if (configuration.contains(CoreOptions.TMP_DIRS) && !configuration.getString(CoreOptions.TMP_DIRS).isEmpty()) {
        LOG.info("Overriding Fink's temporary file directories with those " +
            "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
Should use logging placeholder {}
done
 * @param configuration flink config to patch
 * @param defaultDirs in case no tmp directories is set, next directories will be applied
 */
public static void setTmpDirectoriesConfig(Configuration configuration, String defaultDirs){
Should we maybe call it updateTmpDirectoriesInConfiguration?
done
/**
 * String key, which says if variable `java.io.tmpdir` has been overridden for the cluster.
 */
public static final String TMP_DIRS_OVERRIDDEN = "io.tmp.dirs.overridden";
Let's make this a proper ConfigOption<Boolean> with name USE_LOCAL_DEFAULT_TMP_DIRS and key name internal.io.tmp.dirs.use-local-default.
done
@@ -250,6 +250,10 @@ public static Configuration generateTaskManagerConfiguration(
    cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
}

if (!baseConfig.containsKey(CoreOptions.TMP_DIRS_OVERRIDDEN)){
    cfg.setString(CoreOptions.TMP_DIRS, ""); // HACK: emulate removal for the given key
}
I would suggest adding a BootstrapTools#cloneConfiguration(Configuration) method which clones the given Configuration and resets all node-specific fields. One of these fields is CoreOptions#TMP_DIRS. This means that if USE_LOCAL_DEFAULT_TMP_DIRS is true, then we clear the TMP_DIRS option. For that we actually need a Configuration#clear(ConfigOption) method which removes the option entry from Configuration#confData. Otherwise we risk using "" instead of the TMP_DIRS default value if there are no local default tmp dirs.
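The difference between clearing the option and overwriting it with "" can be illustrated with a minimal sketch. SimpleConfig below is a hypothetical, simplified stand-in for Flink's Configuration, and cloneConfiguration only mirrors the idea suggested in this comment; neither is the actual Flink code.

```java
import java.util.HashMap;
import java.util.Map;

public class TmpDirsClearSketch {

    // Key names taken from this review thread.
    public static final String TMP_DIRS = "io.tmp.dirs";
    public static final String USE_LOCAL_DEFAULT_TMP_DIRS = "internal.io.tmp.dirs.use-local-default";

    /** Hypothetical, simplified stand-in for Flink's Configuration (not the real class). */
    public static final class SimpleConfig {
        private final Map<String, String> confData = new HashMap<>();

        public void setString(String key, String value) {
            confData.put(key, value);
        }

        /** Returns the stored value, or defaultValue when the key has no entry. */
        public String getString(String key, String defaultValue) {
            return confData.getOrDefault(key, defaultValue);
        }

        /** Removes the entry entirely; returns true if an entry was removed. */
        public boolean remove(String key) {
            return confData.remove(key) != null;
        }

        public SimpleConfig copy() {
            SimpleConfig copy = new SimpleConfig();
            copy.confData.putAll(confData);
            return copy;
        }
    }

    /** Clones the config and clears the node-specific tmp dirs when the flag is set. */
    public static SimpleConfig cloneConfiguration(SimpleConfig base) {
        SimpleConfig cloned = base.copy();
        if (Boolean.parseBoolean(cloned.getString(USE_LOCAL_DEFAULT_TMP_DIRS, "false"))) {
            cloned.remove(TMP_DIRS);
        }
        return cloned;
    }

    public static void main(String[] args) {
        String fallback = System.getProperty("java.io.tmpdir");

        SimpleConfig base = new SimpleConfig();
        base.setString(TMP_DIRS, "/yarn/am/local"); // illustrative path only
        base.setString(USE_LOCAL_DEFAULT_TMP_DIRS, "true");

        // Entry removed: the reader falls back to the default value.
        SimpleConfig cleared = cloneConfiguration(base);
        System.out.println("cleared -> " + cleared.getString(TMP_DIRS, fallback));

        // Entry overwritten with "": the reader sees an empty string, not the default.
        SimpleConfig hacked = base.copy();
        hacked.setString(TMP_DIRS, "");
        System.out.println("hacked  -> '" + hacked.getString(TMP_DIRS, fallback) + "'");
    }
}
```

With a real remove/clear operation, readers of the option do not need to treat the empty string as a special case.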
Agreed, magic values are the dirtiest way.
I will go with a 'clear' method (probably named 'remove', to mimic the Java collections API).
I didn't get the point of the clone method. Is it meant to factor out these 4 lines, or do you want to make it generic and use it for all custom settings? In the latter case we would have to extract a list of predicates for each configuration, which looks quite complex as an API.
Configuration taskManagerConfig = flinkConfig.clone();
if (!flinkConfig.containsKey(CoreOptions.TMP_DIRS_OVERRIDDEN)){
    taskManagerConfig.setString(CoreOptions.TMP_DIRS, ""); // HACK: emulate removal for the given key
}
Replace with BootstrapTools#cloneConfiguration(Configuration) in order to reduce code duplication.
Same question as before :( I didn't get the idea.
@@ -473,8 +474,13 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource

log.debug("TaskManager configuration: {}", flinkConfig);
Log the flinkConfig after cloning/resetting partial values.
done
ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
    flinkConfig,
Here we should keep the flinkConfig and instead replace the 5th argument with taskManagerConfig, because it is this Configuration which is given to the new TaskManager container.
Done. Thank you very much!
Since you seem fine with this hacky approach, I have tested the latest version of the PR on our Yarn cluster and made it stable.
85c34ba
to
684aa97
Compare
Thanks for the update @JTaky. Merging this PR now.
What is the purpose of the change
Currently, the TMP_DIRS (io.tmp.dirs) option is handled incorrectly in the default case on Yarn:
If no value has been set via a parameter, the value of the 'LOCAL_DIRS' environment variable is used on the Application Master.
When a TaskManager is started, this 'LOCAL_DIRS' value is passed on to it, so the TaskManager cannot tell that it should fall back to its own default behaviour.
The proposed hackfix is to remove the TMP_DIRS value before starting the TaskManager whenever the default behaviour is required.
Brief change log
** Add a new flag indicating whether the io.tmp.dirs value has been overridden
** If io.tmp.dirs has not been overridden, remove its config value by setting it to the empty value (should we remove the key instead? clone without the given key?)
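The Application Master side of this change can be sketched as follows. This is only an illustration under stated assumptions: the configuration is modelled as a plain Map, the method names updateTmpDirectories and usesLocalDefault are hypothetical, and the flag uses the key name proposed during review (internal.io.tmp.dirs.use-local-default) rather than the "overridden" flag from the first revision.

```java
import java.util.HashMap;
import java.util.Map;

public class TmpDirsUpdateSketch {

    public static final String TMP_DIRS = "io.tmp.dirs";
    // Key name proposed in the review; the Map-based handling here is an assumption.
    public static final String USE_LOCAL_DEFAULT_TMP_DIRS = "internal.io.tmp.dirs.use-local-default";

    /**
     * Hypothetical helper: keeps a user-specified io.tmp.dirs, otherwise applies the
     * platform default (e.g. the value of LOCAL_DIRS on Yarn) and records that it did so.
     */
    public static void updateTmpDirectories(Map<String, String> config, String defaultDirs) {
        String configured = config.get(TMP_DIRS);
        if (configured != null && !configured.isEmpty()) {
            return; // the user set a value explicitly; leave it untouched
        }
        if (defaultDirs != null) {
            config.put(TMP_DIRS, defaultDirs);
            config.put(USE_LOCAL_DEFAULT_TMP_DIRS, "true");
        }
    }

    /** True when io.tmp.dirs holds a node-local default that must not be forwarded. */
    public static boolean usesLocalDefault(Map<String, String> config) {
        return Boolean.parseBoolean(config.getOrDefault(USE_LOCAL_DEFAULT_TMP_DIRS, "false"));
    }

    public static void main(String[] args) {
        // Case 1: nothing configured, so the platform default is applied and flagged.
        Map<String, String> unset = new HashMap<>();
        updateTmpDirectories(unset, "/yarn/nm/local"); // illustrative path only
        System.out.println(unset.get(TMP_DIRS) + " flagged=" + usesLocalDefault(unset));

        // Case 2: the user configured a value, so it is kept and no flag is set.
        Map<String, String> userSet = new HashMap<>();
        userSet.put(TMP_DIRS, "/data/tmp");
        updateTmpDirectories(userSet, "/yarn/nm/local");
        System.out.println(userSet.get(TMP_DIRS) + " flagged=" + usesLocalDefault(userSet));
    }
}
```

Only configurations for which usesLocalDefault returns true would have io.tmp.dirs dropped before being handed to a TaskManager; an explicitly configured value is forwarded unchanged.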
Verifying this change
This change is already covered by existing tests, such as MiniClusterResource for the Mesos part.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation