Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-7442] Add option for using child-first classloader for loading user code #4554

Closed

Conversation

aljoscha
Copy link
Contributor

What is the purpose of the change

This PR introduces a new core option (classloader.resolve-order: child-first) that allows using a child-first class loader for user code. The default is still to use a parent-first class loader.

This also does a minor refactoring in the way the blob manager retrieves the cleanup interval. It's now also read from the Configuration, since we already have the Configuration for the class loader settings.

Brief change log

  • Introduce new option
  • Pass Configuration thought to all places where we previously created a user class loader
  • Instantiate correct class loader based on config

Verifying this change

This PR introduces new end-to-end tests that verify the new feature in a complete Flink workflow, including starting the program using bin/flink run.

Does this pull request potentially affect one of the following parts:

This affects class loader, which is quite important to get right.

Documentation

  • the new flag is documented in the config documentation

@aljoscha
Copy link
Contributor Author

@StephanEwen Could you please have a look since this ClassLoader business is quite important to get right.

@StephanEwen
Copy link
Contributor

I took only a very brief look at this, but I am not totally sure whether the ChildFirstClassLoader implementation is actually correct. Even if it is, it seems to do redundant work, like looking at the URLs twice (in the findClass(name); call and the super.loadClass(name, resolve); call).

We have a working version of a ChildFirstClassLoader here, why not use that? Is that implementation suboptimal?

We should also never reference the System Classloader directly, it breaks all embedded setups or service architecture (OSGI) setups where you end up with hierarchical class loaders.

My feeling is also that this does many changes that may not be necessary, like change the setup of the client, packaged program, etc. Passing the configuration through everything makes this change rather involved.

I was wondering if it is not sufficient to simply let the TaskManager pass this as a flag to the library cache manager. Then you would not need to pass configs everywhere - the only ever config access is by the TaskManager or Task when it creates the classloader, and the config is available there anyways.

Concerning class loader setup on the client - not sure if we should change this in the same PR. This is probably much less critical (the main method does not instantiate many of the dependencies) and that part changes so heavily with flip-6 already. Various setups may not even have separate classloaders on the client anyways, but everything is in the app class loader there.

@aljoscha
Copy link
Contributor Author

aljoscha commented Aug 18, 2017

The system class loader stuff was leftover from an earlier version, I didn't mean to have that in there.

In the first version I copied the ChildFirstClassLoader from


However, when I added the end-to-end tests I noticed that Akka was not working correctly anymore because it couldn't find some configuration stuff. (I'm running that again so that I can post the actual error messages)

I also changed the class loader code on the client because I thought people might be just as likely to use their clashing dependencies there as in operators. In that case they would suffer from the same problems. We're, after all, also creating a user-code class loader here. I'm happy to be convinced otherwise, though.

@aljoscha
Copy link
Contributor Author

This is the exception I got with the class loader from https://github.com/apache/flink/blob/fa11845b926f8371e9cee47775ca0e48176b686e/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java:

org.apache.flink.client.program.ProgramInvocationException: Could not start the ActorSystem needed to talk to the JobManager.
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:462)
	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443)
	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
	at org.apache.flink.streaming.tests.ClassLoaderTestProgram.main(ClassLoaderTestProgram.java:95)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:538)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:430)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383)
	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:840)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:285)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088)
	at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1135)
	at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1132)
	at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.util.FlinkException: Could not start the ActorSystem lazily.
	at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:235)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:460)
	... 22 more
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.version'
	at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
	at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
	at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
	at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
	at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
	at com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:206)
	at akka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:169)
	at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:505)
	at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
	at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
	at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
	at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:106)
	at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:94)
	at org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala)
	at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:231)
	... 23 more

This is on the client but just seeing it makes me believe that that class loader could cause problems.

@StephanEwen
Copy link
Contributor

Ah, I see - that is probably related to reading resources from a jar file (like the config), methods like getResourceAsStream().
I can see that the "delegate to parent after child" lookup logic might not quite work for that, yes.

I think that particular bug you encountered there is almost an argument to not change the logic on the client, yet. Since we would run the entire Flink code through that classloader as well, the implications are trickier than in the runtime task, where we only instantiate the user code functions.

@aljoscha
Copy link
Contributor Author

Probably, yes. I also don't know what other stuff this could break, though. For example, I don't know if queryable state, which starts netty (and Akka) stuff will still work with this. Unfortunately we don't have any end-to-end tests for that.

@StephanEwen
Copy link
Contributor

I think what you mentioned is one more reason to not use this in too many places for now, but only inside the TaskManager / Tasks. Let's introduce that as a tool that users can use to resolve conflicts and gather some feedback before we pull that into client / queryableStateClient / etc...

@aljoscha
Copy link
Contributor Author

Yes, I created an alternative PR for that: #4564

My only worry is that all the tests in that PR would also pass with the other child-first class loader implementation from the RocksDB test, meaning that we don't actually have coverage for something that I discovered by having the class loader in the client. If we're fine with that I will close this PR and merge the other one once it's reviewed. 👌

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants