-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use classloaders to load Java functions #4685
Use classloaders to load Java functions #4685
Conversation
rerun java8 tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, just a couple of comments
@@ -131,6 +131,8 @@ | |||
<exclude>io.netty:netty-transport-native-epoll</exclude> | |||
<exclude>io.netty:netty-transport-native-unix-common</exclude> | |||
|
|||
<exclude>org.apache.pulsar:pulsar-functions-runtime-all</exclude> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't the runtime-all module just be deleted at this point?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The point of having it in the pom is to make sure that the pulsar-functions-runtime-all module is built before this module, which we still need it to
pom.xml
Outdated
@@ -160,7 +160,8 @@ flexible messaging model and an intuitive client API.</description> | |||
<rocksdb.version>5.13.3</rocksdb.version> | |||
<slf4j.version>1.7.25</slf4j.version> | |||
<commons.collections.version>3.2.2</commons.collections.version> | |||
<log4j2.version>2.10.0</log4j2.version> | |||
<log4j2.version>2.11.2</log4j2.version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this upgrade required by this PR? In any case, I'd leave it separate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, there is a bug related to classloading in previous version
try { | ||
this.sink.write(new SinkRecord<>(srcRecord, output)); | ||
} catch (Exception e) { | ||
log.info("Encountered exception in sink write: ", e); | ||
stats.incrSinkExceptions(e); | ||
throw new RuntimeException(e); | ||
} | ||
Thread.currentThread().setContextClassLoader(frameworkClassLoader); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be done in a finally {}
block, otherwise the classloader is not reset after the runtime exception is thrown.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
if (sourceSpec.getConfigs().isEmpty()) { | ||
this.source.open(new HashMap<>(), contextImpl); | ||
} else { | ||
this.source.open(new Gson().fromJson(sourceSpec.getConfigs(), | ||
new TypeToken<Map<String, Object>>(){}.getType()), contextImpl); | ||
} | ||
Thread.currentThread().setContextClassLoader(this.frameworkClassLoader); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here, we should use try / finally
to make sure it goes back to frameworkClassLoader
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
…ctor_function_classloading
rerun integration tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
rerun integration tests |
rerun cpp tests |
rerun integration tests |
rerun java8 tests |
2 similar comments
rerun java8 tests |
rerun java8 tests |
return (Class<T>) DefaultImplementation.class.getClassLoader().loadClass(className); | ||
try { | ||
// when the API is loaded in the same classloader as the impl | ||
return (Class<T>) DefaultImplementation.class.getClassLoader().loadClass(className); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we just use Thread.currentThread().getContextClassLoader().loadClass(className);
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So there can be scenario in which pulsar-client-api and pulsar-client-impl is loaded in a classloader that is NOT the context classloader for whatever reason. For BC purposes lets first check if pulsar-client-impl is loaded in the same classloader as the api and then we can check the context classloader.
This problem was discovered because one of tests in the pulsar-kafka-compat module failed due do powermock loading pulsar-client-api and impl in the powermock classloader and not the context/application classloader.
In theory a user can do that as well.
@@ -62,6 +62,14 @@ | |||
</exclusion> | |||
</exclusions> | |||
</dependency> | |||
<dependency> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what are the changes for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These imports were missing for the pulsar-client-kafka module. A test in this module was failing because of a classloader powermock issue but I could not compile this module by itself without these dependencies. Somehow there is no issue if you compile the whole pulsar project.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if so, can you do it in a separated pull request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
@@ -136,4 +138,22 @@ public static String getDefaultSubscriptionName(Function.FunctionDetails functio | |||
properties.put("instance_id", String.valueOf(instanceId)); | |||
return properties; | |||
} | |||
|
|||
public static boolean isAssignable(Class from, Class to) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
who is using this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove this
@@ -22,9 +22,11 @@ | |||
import lombok.Getter; | |||
import lombok.extern.slf4j.Slf4j; | |||
|
|||
import org.apache.pulsar.functions.api.Context; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what are these two imports for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was for debugging but not needed anymore
@@ -132,6 +130,9 @@ | |||
|
|||
private final Map<String, String> properties; | |||
|
|||
private final ClassLoader frameworkClassLoader; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't use framework
elsewhere. shall we just call it instanceClassLoader
or runtimeClassLoader
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can call it runtimeClassLoader
@@ -425,23 +432,33 @@ private void processResult(Record srcRecord, | |||
} | |||
|
|||
private void sendOutputMessage(Record srcRecord, Object output) { | |||
if (!(this.sink instanceof PulsarSink)) { | |||
Thread.currentThread().setContextClassLoader(functionClassLoader); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
have you benchmarked this? since this is done per message basis. I see this has been used in quite a lot of places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I have benchmarked setting and getting the context class loader. It takes negligible time (in the order on tens of nano seconds). If you look that the implementation, the code is only setting a pointer or returning a pointer
|
||
// Get classpath for framework | ||
String framework_classpath = System.getProperty(FUNCTIONS_FRAMEWORK_CLASSPATH); | ||
assert framework_classpath != null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't use assert. use checkArgument
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that uses guava. I don't want to pull in that dependency at this point. This class should have minimum dependencies
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you can't pull in guava, then just throw IllegalArgument runtime exception if framework_class is not. because assert
doesn't always work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed
*/ | ||
public class JavaInstanceMain { | ||
|
||
private static final String FUNCTIONS_FRAMEWORK_CLASSPATH = "pulsar.functions.framework.classpath"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change "framework"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
…/incubator-pulsar into refactor_function_classloading
@sijie thanks for the review. PTAL |
rerun integration tests |
* Use classloading to load use code for functions
conflict with #5046, mark as 2.4.1 |
* Use classloading to load use code for functions (cherry picked from commit 6ff1bba)
Master Issue: #4277
Motivation
Currently we use a heavily shaded JAR to run Java functions. There are some disadvantages to this approach
Modifications
Instead of using a shaded JAR to start a java function instance, use different classloaders to load the internal pulsar code, user code, and the interfaces that the two interacts with each other.
key changes:
removed all shading from runtime-all module. Now it just produces a small java-instance jar that contains only the interfaces and logging dependencies.
Changed log4j files from yaml format to xml because using yaml format requires additional dependencies.
Please refer to issue for more details. I can also explain in more detail if the changes are unclear