
[FLINK-24558][API/DataStream] Make parent ClassLoader variable which is created by ClassLoaderFactory #17521

Closed
wants to merge 9 commits

Conversation


@baisui1981 commented Oct 19, 2021

…reated by ClassLoaderFactory. Relevant issue: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-24558

What is the purpose of the change

Make the server-side ClassLoader created by BlobLibraryCacheManager.DefaultClassLoaderFactory pluggable, so that the parent classloader of ChildFirstClassLoader can be varied.

Brief change log

  • Add a new interface, ClassLoaderFactoryBuilder, which users can extend; instances of ClassLoaderFactoryBuilder are instantiated via ServiceLoader.
  • Add a loading phase in DefaultClassLoaderFactory: if a customized ClassLoaderFactoryBuilder instance can be loaded via ServiceLoader, class-loading requests are delegated to it (see the sketch below).
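
A minimal sketch of what this extension point could look like. The interface name and the buildServerLoaderFactory signature follow the TISFlinClassLoaderFactory excerpt shown later in this thread; treat the exact shape and the ServiceLoader wiring as assumptions, not the merged Flink API:

    // Hypothetical sketch of the proposed SPI (signature mirrors the
    // implementation excerpt later in this thread; not Flink's actual API).
    // Imports of Flink types, javax.annotation.Nullable and
    // java.util.function.Consumer are omitted for brevity.
    public interface ClassLoaderFactoryBuilder {

        BlobLibraryCacheManager.ClassLoaderFactory buildServerLoaderFactory(
                FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
                String[] alwaysParentFirstPatterns,
                @Nullable Consumer<Throwable> exceptionHandler,
                boolean checkClassLoaderLeak);

        // Assumed discovery step: DefaultClassLoaderFactory would delegate to
        // the first builder registered via ServiceLoader, if any.
        static java.util.Optional<ClassLoaderFactoryBuilder> discover() {
            java.util.Iterator<ClassLoaderFactoryBuilder> it =
                    java.util.ServiceLoader.load(ClassLoaderFactoryBuilder.class).iterator();
            return it.hasNext() ? java.util.Optional.of(it.next()) : java.util.Optional.empty();
        }
    }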

Verifying this change

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Added test that validates that TaskInfo is transferred only once across recoveries
  • Manually verified the change by running a 4-node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot (Collaborator) commented Oct 19, 2021

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@flinkbot (Collaborator)

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 4f842b9 (Tue Oct 19 12:03:33 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@tisonkun (Member)

@baisui1981 checkstyle failed. You can run mvn spotless:apply to fix it.

@tisonkun (Member)

cc @guoweiM @StephanEwen

Also, I'd like to know who can be cc'ed for API/DataStream-related changes now?

@tisonkun (Member)

cc @AHeise from the mailing list discussion.

@baisui1981 (Author)

checkstyle failed. You can run mvn spotless:apply to fix it.

I have fixed it.

@AHeise (Contributor) commented Oct 22, 2021

Hey, thank you very much for your contribution.
Could you show (ideally by adding a test case) how your solution is fixing the initial issue? I don't quite understand what exactly you are solving and how.
@zentol could you please also have a look?

Also, I'd like to know who can be cc'ed for API/DataStream-related changes now?

You can ping me. But this is not really datastream looking at the approach.

@baisui1981 (Author)

@AHeise, thanks for your reply. Judging by the git commit history, maybe this issue belongs more appropriately to the Command Line Client component.
@kl0u, could you help me review this PR?

@zentol (Contributor) commented Oct 27, 2021

I don't think it is a good idea to give users direct access to this part of the code. It just yet again increases the API surface, and for some very important internal thing that we need to be able to change at a whim.

Furthermore, I don't see yet how this would solve the issue at hand. The proposed interface provides no differentiating factor that could be used to create different classloaders for each task (like the Task ID). Even then, the classloader is shared across different tasks running on the same TM, so it must behave the same way? Given that they all have access to the same jars, I'm curious how the behavior is supposed to be different in the first place.

All in all, I think this needs way more discussion.

@baisui1981 (Author) commented Oct 28, 2021

I don't think it is a good idea to give users direct access to this part of the code. It just yet again increases the API surface, and for some very important internal thing that we need to be able to change at a whim.

Furthermore, I don't see yet how this would solve the issue at hand. The proposed interface provides no differentiating factor that could be used to create different classloaders for each task (like the Task ID). Even then, the classloader is shared across different tasks running on the same TM, so it must behave the same way? Given that they all have access to the same jars, I'm curious how the behavior is supposed to be different in the first place.

All in all, I think this needs way more discussion.

Hi @zentol, thanks for your reply. We are building a data-platform product based on Flink and expect to integrate various third-party components provided as Flink SourceFunctions (like the various CDC source connectors), SinkFunctions (like elasticsearch7), and so on; with that, a problem like the one described in FLINK-24558 is always unavoidable.
To solve it, I intend to add an extension point for a customized org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.ClassLoaderFactory. The main motivation is making the parent classloader variable, so that it can be my designated parent classloader, which can then be delegated to for loading client-side classes wrapped in multiple plugin bundles.

In my project I have written a class, TISFlinClassLoaderFactory, that implements the newly introduced ClassLoaderFactoryBuilder interface, together with a service meta configuration; I then package the artifact and put it in the directory $FLINK_HOME/lib.
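
The service meta configuration mentioned above is a standard ServiceLoader registration file. A sketch; the interface's package matches the source path reported by CI later in this thread, while the implementation's package is a hypothetical placeholder:

    # file: META-INF/services/org.apache.flink.runtime.execution.librarycache.ClassLoaderFactoryBuilder
    # a single line naming the implementation class (package name is hypothetical)
    com.qlangtech.tis.plugins.flink.TISFlinClassLoaderFactory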

The proposed interface provides no differentiating factor that could be used to create different classloaders for each task (like the Task ID)

Thanks @zentol for the reminder. Maybe I should make the parent classloader unchangeable because, as you say, it is shared across different tasks running on the same TM. Instead, extending directly from FlinkUserCodeClassLoader would be better, for example: TISChildFirstClassLoader.

The plugin inventory (the differentiating factor that could be used to create different classloaders) is stored in the jar manifest, which is submitted from the Flink client side via the libraryURLs parameter. On the server side, the plugin inventory is extracted by parsing the jar manifest and then passed as a parameter to initialize the PluginManager that creates the uberClassLoader.
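
(As an aside, a sketch of how the client side might write such an inventory entry, mirroring the "plugin_inventory" manifest name read by the factory below; the entry layout and plugin name are my assumptions, not code from this PR:)

    // Hypothetical client-side counterpart: record the plugin inventory in the
    // manifest entry "plugin_inventory" that the server-side factory reads back.
    Manifest manifest = new Manifest();
    Attributes inventory = new Attributes();
    inventory.put(new Attributes.Name("flink-cdc-kafka"), "1.0"); // plugin name -> version (assumed)
    manifest.getEntries().put("plugin_inventory", inventory);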
TISFlinClassLoaderFactory.java

    public BlobLibraryCacheManager.ClassLoaderFactory buildServerLoaderFactory(
            FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
            String[] alwaysParentFirstPatterns,
            @Nullable Consumer<Throwable> exceptionHandler,
            boolean checkClassLoaderLeak) {

        return new BlobLibraryCacheManager.DefaultClassLoaderFactory(
                classLoaderResolveOrder, alwaysParentFirstPatterns, exceptionHandler, checkClassLoaderLeak) {

            @Override
            public URLClassLoader createClassLoader(URL[] libraryURLs) {
                try {
                    PluginManager pluginManager = TIS.get().getPluginManager();
                    if (libraryURLs.length != 1) {
                        throw new IllegalStateException(
                                "length of libraryURLs must be 1, but is: " + libraryURLs.length);
                    }
                    for (URL lib : libraryURLs) {
                        // Read the plugin inventory from the manifest of the submitted job jar.
                        try (JarInputStream jarReader = new JarInputStream(lib.openStream())) {
                            Manifest manifest = jarReader.getManifest();
                            Attributes pluginInventory = manifest.getAttributes("plugin_inventory");
                            if (pluginInventory == null) {
                                throw new IllegalStateException(
                                        "plugin inventory must not be empty in lib: " + lib);
                            }
                            // Dynamically load every plugin named in the inventory.
                            for (Map.Entry<Object, Object> pluginDesc : pluginInventory.entrySet()) {
                                pluginManager.dynamicLoadPlugin(String.valueOf(pluginDesc.getKey()));
                            }
                        }
                    }
                    // Hand the plugins' uber classloader to the custom child-first
                    // loader as the variable parent.
                    return new TISChildFirstClassLoader(
                            pluginManager.uberClassLoader,
                            libraryURLs,
                            this.getParentClassLoader(),
                            this.alwaysParentFirstPatterns,
                            this.classLoadingExceptionHandler);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

@zentol, how about it? Please give me some suggestions, thanks.

@zentol (Contributor) commented Oct 28, 2021

But the user-jar is the same for all tasks. You can't use a manifest in said jar for differentiating which classes should now be loaded / made accessible. You either a) need the CL factory to create a task-specific classloader (which you can't because jars are the same and CLs are shared) or b) need the cooperation from the user-code to load classes in a specific way.

We are painfully aware of the dependency conflicts between connectors; there are some ideas floating around to make connectors themselves work as plugins such that they each have their own classloader, but I'm not sure where exactly we are at on that front. I think we'd rather focus on that than introducing a separate mechanism for effectively the same use-case that would also be more difficult to use.

@AHeise (Contributor) commented Oct 28, 2021

I must admit that I still haven't fully understood the use case and the solution.

Each application bundles the connectors. Say application A is using Kafka 2.4 and application B is using Kafka 2.6. Now I can run both applications in the same cluster without any issues afaik, as both are executed in separate class loaders. That's the state of Flink since a couple of versions already.

With your proposal, application A may now use multiple classloaders but it can still only bundle one Kafka version. So I don't see the added value. We would need to add something like a jar in jar approach, where several Kafka versions are bundled. That could be implemented in the application jar entirely.

Then the question is: is it worth it? How often does one specific application have conflicting connector versions? For most use cases, it does not seem to matter. There are very large Flink cluster installations where the current approach seems to be sufficient.

I'm probably overlooking something basic here.

@baisui1981 (Author)

But the user-jar is the same for all tasks. You can't use a manifest in said jar for differentiating which classes should now be loaded / made accessible. You either a) need the CL factory to create a task-specific classloader (which you can't because jars are the same and CLs are shared) or b) need the cooperation from the user-code to load classes in a specific way.

@zentol Please forgive me for my negligence; I didn't explain the process clearly.
Before submitting the job to the Flink cluster, a jar is packaged dynamically; at that point, the job-related metadata for the manifest is put into the jar as well:

Code as below, from FlinkTaskNodeController.java:

Manifest manifest = new Manifest();
Map<String, Attributes> entries = manifest.getEntries();
Attributes attrs = new Attributes();
attrs.put(new Attributes.Name(collection.getName()), String.valueOf(timestamp));
// record the Flink job name as a manifest entry
entries.put(TISFlinkCDCStart.TIS_APP_NAME, attrs);

When the jar is submitted to the server side, my customized extension-point implementation, ClassLoaderFactoryBuilder.buildServerLoaderFactory, extracts the parameters from the submitted jar's manifest, then pulls the plugin bundles from the TIS plugin repository over the HTTP protocol and initializes the PluginManager, which is responsible for loading classes.

@baisui1981 (Author)

@zentol @AHeise Thanks to both of you; you have understood me and what I want to express.
We want to build a product similar to Jenkins in the field of big data: we will package various functions as plugins, as Jenkins does, and put them in the repository in advance.

Driven by this mission, the issue mentioned above was found in the process of building this product. Because building a Flink job driven by a user-defined DSL in a production environment and submitting it is fully automated, problems such as dependency conflicts between connectors found during the process cannot be solved by manual intervention; they can only be solved by a classloader-isolation facility. This is one of the ways I thought of to solve this kind of problem.

For Flink, only a new extension point is added, which has no effect on Flink's existing functionality. With respect to the OCP principle, I think this is a good way to implement it, and it adds a new implementation option for users like me.

@AHeise (Contributor) commented Nov 3, 2021

Sorry for coming back so late: I still don't quite understand how the flow is supposed to look.

Let's say I have a user.jar that, through the DSL, depends on flink-kafka and flink-kinesis, both of which use incompatible Guava versions. Now with your approach, the same user.jar is loaded through 2 classloaders to access Kafka and Kinesis. Did I get that right?

What I fail to understand is how the incompatible versions of Guava are put into the user.jar. If I just shade without relocation, then one Guava version simply wins by overwriting the files. So you need to relocate Guava into kafka.guava and kinesis.guava. But at that point, I don't see the need for separate CLs anymore.

So I'm missing a piece of information here.

@baisui1981 (Author) commented Nov 7, 2021

Thanks @AHeise for your reply. Sorry, I had not explained a detail before. For better illustration, I drew a diagram, as below:
http://qlangtech.oss-us-west-1.aliyuncs.com/Flink%20ClassLoader.png

There is a prerequisite for the processing flow: we need to build a plugin repository based on the HTTP protocol, and the user needs to deploy the plugins to that repository in advance.

There are four steps in the process:

  1. Build two classloaders corresponding to the plugin bundle names 'kafka' and 'kinesis'. Notably, the jars belonging to each plugin are stored in the remote plugin repository. The different Guava versions you mentioned are loaded separately in different URLClassLoaders, so there is no class conflict.

  2. Compile the client-side code, package it as 'user.jar', and submit it to the Flink cluster. 'user.jar' is composed of two parts: one is the classes, the other is the plugin metadata ('kafka' and 'kinesis') stored in the jar manifest.

    Regarding a question you raised:

    What I fail to understand is how the incompatible versions of Guava are put into the user.jar.

    Actually, the Guava artifacts are not included in 'user.jar'; they are stored in the remote plugin repository in advance.

  3. When the Flink server side receives the client submission, it creates an instance of TISClientFirstClassLoader, which extends FlinkUserCodeClassLoader and is responsible for loading the classes belonging to the 'kafka' and 'kinesis' plugin bundles. TISClientFirstClassLoader builds a classloader array from the plugin names parsed from the user.jar manifest, code as below:

    public TISClientFirstClassLoader(String[] plugins) {
        this.tisClassLoaders = new URLClassLoader[plugins.length];
        int i = 0;
        for (String plugin : plugins) {
            this.tisClassLoaders[i++] = createChildClassLoader(plugin);
        }
    }

    private URLClassLoader createChildClassLoader(String plugin) {
        // get the urls which belong to the plugin (resolution elided)
        URL[] urls = ...;
        return new URLClassLoader(urls);
    }
  4. TISClientFirstClassLoader is responsible for loading the classes defined on the client side, using the classloader array created in step 3, code as below:

    public Class<?> findClass(String name) throws ClassNotFoundException {
        for (ClassLoader loader : tisClassLoaders) {
            try {
                return loader.loadClass(name);
            } catch (ClassNotFoundException e) {
                // not found in this plugin classloader; try the next one
            }
        }
        // honor the findClass contract: throw rather than return null
        throw new ClassNotFoundException(name);
    }

@zentol (Contributor) commented Nov 8, 2021

I'm sorry, but that implementation sounds really unsafe and will only work for the simplest of use cases.
Let's say you have 2 of your plugin jars that both bundle the same dependency at incompatible versions (let's say Guava), and said dependency is used by the user (e.g., because it is exposed in the API of the connector).

It's now a coin toss as to whether the correct guava version will be loaded.
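
(To make the concern concrete: with the first-match loop from step 4 above, the winning version depends only on the order of the plugin classloader array. A minimal illustration; the method and the choice of a Guava class are mine, not code from this PR:)

    // If two plugin classloaders bundle the same class at incompatible versions,
    // whichever loader comes first in the array wins -- iteration order, not the
    // connector the user code is calling into, decides which version is loaded.
    static Class<?> resolveFirstMatch(ClassLoader[] pluginLoaders, String name)
            throws ClassNotFoundException {
        for (ClassLoader loader : pluginLoaders) {
            try {
                return loader.loadClass(name); // e.g. "com.google.common.collect.ImmutableList"
            } catch (ClassNotFoundException e) {
                // not in this plugin; fall through to the next loader
            }
        }
        throw new ClassNotFoundException(name);
    }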

Actually, if we take an application cluster, it is equivalent to just adding the connectors to lib/; in terms of classloader isolation, it is generally also equivalent to just bundling everything in the user-jar.

These kinds of problems are exactly why we don't want to expose this to users.

@baisui1981 (Author)

I'm sorry, but that implementation sounds really unsafe and will only work for the simplest of use cases. Let's say you have 2 of your plugin jars that both bundle the same dependency at incompatible versions (let's say Guava), and said dependency is used by the user (e.g., because it is exposed in the API of the connector).

It's now a coin toss as to whether the correct guava version will be loaded.

@zentol, I must admit that this situation exists, but can't whether it is safe enough be left to the Flink developer to judge? And I believe this kind of problem can be circumvented by some means.

For Flink, all that needs to be done is to add an extension point. When the user does not extend this extension point, it has no effect on Flink.

@RocMarshal (Contributor) commented Dec 3, 2021

Hi @baisui1981, would you mind checking the CI result 'src/main/java/org/apache/flink/runtime/execution/librarycache/ClassLoaderFactoryBuilder.java:[22,8] (imports) UnusedImports: Unused import: org.apache.flink.runtime.rpc.FatalErrorHandler.'? Thx.
Or run mvn spotless:apply in the flink directory.

@baisui1981 (Author)

Hi @baisui1981, would you mind checking the CI result 'src/main/java/org/apache/flink/runtime/execution/librarycache/ClassLoaderFactoryBuilder.java:[22,8] (imports) UnusedImports: Unused import: org.apache.flink.runtime.rpc.FatalErrorHandler.'? Thx. Or run mvn spotless:apply in the flink directory.

@RocMarshal thanks for the reminder. I'm sorry for carelessly not removing this unused import. I have fixed it.

@RocMarshal (Contributor)

@baisui1981 Thanks for the update.
Please run mvn spotless:apply in the Flink root directory to fix the checkstyle.
From my limited read, would you mind introducing some test cases for the change?

@baisui1981 (Author)

@baisui1981 Thanks for the update. Please run mvn spotless:apply in the Flink root directory to fix the checkstyle. From my limited read, would you mind introducing some test cases for the change?

Thanks for the reminder; I will add some test cases for the change and fix the checkstyle errors.

@baisui1981 (Author)

I have made the fixes; thanks for your reviews @RocMarshal.

@baisui1981 (Author)

@RocMarshal @zentol @AHeise PTAL thx


@baisui1981 (Author)

@RocMarshal @zentol @AHeise could you please give me some suggestions on how to continue with this PR?


@zentol (Contributor) commented Nov 11, 2022

Will not be supported.

@zentol closed this Nov 11, 2022