
[FLINK-31839][filesystems] Fix flink-s3-fs-hadoop and flink-s3-fs-presto plugin collision #22420

Merged 1 commit on Apr 19, 2023

Conversation

gaborgsomogyi
Contributor

What is the purpose of the change

At the moment it's not possible to add flink-s3-fs-hadoop and flink-s3-fs-presto plugins at the same time.

Brief change log

  • Both plugins now register their delegation token provider under a different name
  • Fixed a bad error message in the DefaultDelegationTokenManager
  • Added a warning message when multiple providers are loaded with the same prefix

Verifying this change

Existing + additional unit tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: yes

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@gaborgsomogyi
Contributor Author

cc @mbalassi @gyfora

@flinkbot
Collaborator

flinkbot commented Apr 18, 2023

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@mbalassi
Contributor

Let us make sure to backport this to 1.17.1, @gaborgsomogyi.

@gaborgsomogyi
Contributor Author

OK, once the PR is approved and merged I intend to create a PR against 1.17.

@MartijnVisser
Contributor

Let us make sure to backport this to 1.17.1, @gaborgsomogyi.

I've bumped the ticket to Critical as well, given that the impact is likely to be quite big. We should consider creating a 1.17.1 release soon.

@gaborgsomogyi
Contributor Author

There is a workaround if no tokens are expected: `security.delegation.token.provider.s3.enabled=false`
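For illustration, the workaround can be applied by adding the key to the Flink configuration file (the `/tmp` path below is a placeholder; use your actual `conf/flink-conf.yaml`):

```shell
# Illustrative path only; in a real deployment this is $FLINK_HOME/conf/flink-conf.yaml.
CONF=/tmp/flink-conf.yaml
# Disable the s3 delegation token provider when no tokens are expected.
echo "security.delegation.token.provider.s3.enabled: false" >> "$CONF"
cat "$CONF"
```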

    if (!providerPrefixes.add(split[0])) {
        String msg =
                String.format(
                        "Multiple providers loaded with the same prefix: %s", split[0]);

@mbalassi mbalassi Apr 19, 2023


Let us add a bit more context to this. I like this message as the beginning, but let us also add:
This might lead to unintended consequences, please consider using only one of them.

We should also add some content to the flink docs about these unintended consequences. :)


@mbalassi mbalassi left a comment


Apart from my minor comment above about improving the log message, this looks good to me. I have built locally with both the s3-hadoop and s3-presto jars added as plugins (I assume the same happened to the user who filed the report):

2023-04-19 09:26:08,635 INFO  org.apache.flink.core.plugin.DefaultPluginManager            [] - Plugin loader with ID found, reusing it: s3-hadoop
2023-04-19 09:26:08,635 INFO  org.apache.flink.core.plugin.DefaultPluginManager            [] - Plugin loader with ID found, reusing it: s3-presto
2023-04-19 09:26:08,635 INFO  org.apache.flink.core.plugin.DefaultPluginManager            [] - Plugin loader with ID found, reusing it: metrics-datadog
2023-04-19 09:26:08,637 INFO  org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - Delegation token receiver s3-hadoop loaded and initialized
2023-04-19 09:26:08,637 INFO  org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - Delegation token receiver s3-presto loaded and initialized
2023-04-19 09:26:08,637 INFO  org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - Delegation token receivers loaded successfully
2023-04-19 09:26:08,637 INFO  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] - Checking provider and receiver instances consistency
2023-04-19 09:26:08,637 INFO  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] - Provider and receiver instances are consistent
2023-04-19 09:26:08,637 WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] - Multiple providers loaded with the same prefix: s3

As the change outlines, this is usually not a recommended scenario for users, but this improvement makes it possible, showing only a warning instead of failing.

@gaborgsomogyi
Contributor Author

In tests and example executions it's fine to add all the plugins to the classpath just for testing purposes, but doing so in production is discouraged because of the unintended consequences described above.

@gaborgsomogyi
Contributor Author

I've also seen patterns where users add plugins to the job's main classpath as normal Maven/Gradle dependencies, which is also bad practice. In such cases the plugins are loaded by Flink's main classloader, which can easily cause dependency issues. Just for the record, a good example of how a plugin must be added properly can be found here.
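The proper way is to give each plugin its own subfolder under the Flink `plugins/` directory, so each gets its own classloader. A minimal sketch (FLINK_HOME and the jar version are placeholders for your setup):

```shell
# Illustrative layout only; FLINK_HOME and the jar file name are placeholders.
FLINK_HOME=${FLINK_HOME:-/tmp/flink-demo}
# Each plugin must sit in its own subfolder under $FLINK_HOME/plugins.
mkdir -p "$FLINK_HOME/plugins/s3-fs-hadoop" "$FLINK_HOME/plugins/s3-fs-presto"
# Copy the plugin jar from the distribution's opt/ folder, e.g.:
# cp "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.17.0.jar" "$FLINK_HOME/plugins/s3-fs-hadoop/"
ls "$FLINK_HOME/plugins"
```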

@gaborgsomogyi
Contributor Author

If one wants to execute job source code locally together with plugins then I suggest the following:

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        ...
        PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
        FileSystem.initialize(configuration, pluginManager);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        ...
    }

This loads the plugins in a separate classloader.
