-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-11470][runtime] Pass configurations to filesystems when executing in LocalStreamEnvironment #17580
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit bbad41f (Wed Oct 27 14:32:56 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
Outdated
Show resolved
Hide resolved
AHeise
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This mostly looks good to me. However, I'd like to get a second opinion as quite a few things are not clear enough to me.
| import org.apache.flink.api.common.functions.MapFunction; | ||
| import org.apache.flink.api.java.DataSet; | ||
| import org.apache.flink.api.java.ExecutionEnvironment; | ||
| import org.apache.flink.api.java.io.DiscardingOutputFormat; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commit + PR message should rather read: Initialize filesystems when run in LocalStreamEnvironment.
The issue is not that configuration was incorrectly passed to the fs but rather that plugins where not initialized at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The filesystems are initialized anyways but if we do not load them through the pluginloader the configuration is not passed to them
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, we should probably remove the fallback and fail hard to exactly find these kind of issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw same question as below: have we actually initialized plugins before your fix at all? I don't think so.
flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
Outdated
Show resolved
Hide resolved
| final PluginManager pluginManager = | ||
| PluginUtils.createPluginManagerFromRootFolder(configuration); | ||
| FileSystem.initialize(configuration, pluginManager); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is MiniCluster always and only used for LocalExecutionEnvironment? I think the fix is legit but I'm wondering if the description is accurate. Are we solving an issue in the LocalExecutor or in MiniCluster?
I also don't quite understand why ClusterEntrypoint is not used at all in LocalExecutionEnvironment. @zentol could you please also have peek?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is MiniCluster always and only used for LocalExecutionEnvironment?
No. It can also be used if the Test(Stream)Environments are explicitly set by the user.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ClusterEntryPoint is used for actual clusters where the TMs run externally. The MiniCluster is the single-JVM equivalent.
| final StreamExecutionEnvironment environment = new LocalStreamEnvironment(config); | ||
| environment.getCheckpointConfig().setCheckpointStorage("testConfig:/" + tmp.newFolder()); | ||
| environment.enableCheckpointing(100); | ||
| environment.fromElements(1, 2, 3, 4).map((MapFunction<Integer, Integer>) value -> value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this cast needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not about the cast but rather the execution, without inserting the mapper the job is not executable.
flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
Outdated
Show resolved
Hide resolved
flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
Outdated
Show resolved
Hide resolved
flink-clients/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
Outdated
Show resolved
Hide resolved
zentol
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I get the idea I'm very concerned about the singleton nature of the FileSystem class, and I don't know how to resolve that.
| final PluginManager pluginManager = | ||
| PluginUtils.createPluginManagerFromRootFolder(configuration); | ||
| FileSystem.initialize(configuration, pluginManager); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ClusterEntryPoint is used for actual clusters where the TMs run externally. The MiniCluster is the single-JVM equivalent.
|
|
||
| try { | ||
| final PluginManager pluginManager = | ||
| PluginUtils.createPluginManagerFromRootFolder(configuration); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This pretty much doesn't do anything so I'd suggest to use FileSystem#initialize(Configuration).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The FileSystem#initialize(Configuration) method is marked as deprecated I did not want to add a usage again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then you can also pass null for the PluginManager.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is null sufficient? I think the pluginManager is needed such that files in the plugin folder are loaded at all. That's the only way to read/write to S3 in local setup. I'm probably missing something here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Filesystems can still be loaded through the FileSystemFactory service if they are on the classpath.
Plugins in local environments are a bit eh because there's a) no sane way to control the plugin directory (no, setting some internal env variable is not sane) and b) you must not add a dependency on said FS otherwise you lose all the benefits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but users are trying out their Flink job locally and use a plugin folder in their working directories for s3 (that works by default).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| try { | ||
| final PluginManager pluginManager = | ||
| PluginUtils.createPluginManagerFromRootFolder(configuration); | ||
| FileSystem.initialize(configuration, pluginManager); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems sketchy. Because FileSystem is partially a singleton this could have surprising effects on other tests if a FS is provided for one of the default schemes (e.g., a custom LocalFileSystem).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be safer to move it to the LocalExecutor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so because all tests using the MiniClusters implicitly go through the LocalEnvironment.
While it is true that users cannot set credentials via the configuration passed to the environment, they could call FileSystem#initialize() themselves.
Yes, it is surprising and not ergonomic and all that.
Alternatively we would need a way to reset the FileSystem (i.e., call initialize again with no configuration) after each test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about we add a test-only uninitialize? Wouldn't that be the cleanest option?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And who calls that? Every test that uses a filesystem? Every test that does something special? How do you enforce that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Every test that calls initialize must call uninitialize. We can't enforce it but rather make sure that all existing tests are abiding it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests might not explicitly call initialize; if they have a factory on the CP then it is loaded implicitly on first use.
21df3dc to
d1682e7
Compare
|
@flinkbot run azure |
…ing in LocalStreamEnvironment Before this change, Flink's configuration where not passed to the filesytems if used in LocalStreamEnvironment. This may lead to i.e. missing credentials. This change initializes the filesytems through the pluginloader which forwards the configuration as it is done when running a real cluster.
AHeise
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change looks good. I left one more point to discuss before approval.
| /** MiniCluster to execute Flink jobs locally. */ | ||
| public class MiniCluster implements AutoCloseableAsync { | ||
|
|
||
| public static final ConfigOption<String> PLUGIN_DIRECTORY = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a config option here? I was thinking that it would be better in this case to add a simple String field to MiniClusterConfiguration iff the option is only ever used in the MiniCluster. Then we can make it easier to use in MiniClusterConfiguration#Builder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this depends a bit if a user only wants to execute a jar locally. In this case one usually does not interact with the minicluster it is only started in the background by the LocalExecutionEnvironment.
Perhaps one option can be to offer both possibilities but this may also confuse users. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want to support LocalExecutionEnvironment then, we should move the options to somewhere more visible. Then the question arises why we are not supporting all envs ;).
Or we just say that for MiniCluster that we just use MiniClusterConfiguration and for everything else we use the ENV_FLINK_PLUGINS_DIR?
|
Would appreciate this change making it in. It's pretty involved to be able to safely use e.g. flink-gs-fs-hadoop locally. |
|
This PR is being marked as stale since it has not had any activity in the last 180 days. If you are having difficulty finding a reviewer, please reach out to the [community](https://flink.apache.org/what-is-flink/community/). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 90 days, it will be automatically closed. |
|
This PR has been closed since it has not had any activity in 120 days. |
What is the purpose of the change
Before this change, Flink's configuration where not passed to the
filesytems if used in LocalStreamEnvironment. This may lead to i.e.
missing credentials. This change initializes the filesytems through the
pluginloader which forwards the configuration as it is done when
running a real cluster.
Brief change log
Build the pluginloader in the Minicluster and initialize the filesystems with it.
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation