[BEAM-10925] Load user-provided jars in CalcFn. #13835
Note that it is possible for one CalcFn to require multiple jars, so for now it is the user's responsibility to ensure there are no conflicts. This is no different from user-provided jars in the Java SDK. This adds an optional jar path field to ScalarFunctionImpl. This field will be populated in a later PR.
It is fine with me to add the test cases in another PR, once the jar path field is populated.
private static final Map<String, FunctionDefinitions> functionCache = new HashMap<>();

/** Maps potentially remote jar paths to their local file copies. */
private static final Map<String, File> jarCache = new HashMap<>();
To confirm: as I recall, the randomly named files that get created will live as long as the JVM is alive?
I just want to confirm that there cannot be a case where there is an entry in this jarCache but the jar file does not exist.
I can only think of a couple ways the cache could contain handles to nonexistent files:
- If the file was deleted somehow.
- If the cache was serialized and shipped to workers. But we don't include a JavaUdfLoader instance as part of CalcFn, instead we construct a new JavaUdfLoader instance in CalcFn::setup. Besides, the cache is a static variable, and static variables are never serialized.
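The point about static fields can be checked with a small sketch. The class below is a hypothetical stand-in, not Beam code: `StaticCacheDemo`, its `CACHE` field, and the example paths are all invented for illustration. It serializes an instance, clears the static map to imitate a fresh worker JVM, and deserializes again.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a serializable function whose class owns a static cache. */
class StaticCacheDemo implements Serializable {
  private static final long serialVersionUID = 1L;

  // Static state belongs to the class in the current JVM; Java serialization skips it.
  static final Map<String, String> CACHE = new HashMap<>();

  // Instance state is what actually travels in the serialized bytes.
  final String name;

  StaticCacheDemo(String name) {
    this.name = name;
  }

  /** Serializes the instance to an in-memory byte array. */
  static byte[] serialize(StaticCacheDemo value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Reads an instance back from the bytes produced by serialize(). */
  static StaticCacheDemo deserialize(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (StaticCacheDemo) in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    CACHE.put("gs://example-bucket/udf.jar", "/tmp/udf-local.jar"); // made-up paths
    byte[] data = serialize(new StaticCacheDemo("calc"));
    CACHE.clear(); // imitate a worker JVM that starts with an empty cache
    StaticCacheDemo copy = deserialize(data);
    System.out.println(copy.name + " " + CACHE.isEmpty());
  }
}
```

In this sketch the instance field survives the round trip while the static map stays empty, so a deserialized copy cannot bring stale cache entries with it.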
I see.
I see that an IOException is thrown when loading jars. As long as meaningful exceptions are caught, we will at least have them in the log.
if (!jarPaths.isEmpty()) {
  JavaUdfLoader udfLoader = new JavaUdfLoader();
  ClassLoader classLoader = udfLoader.createClassLoader(jarPaths);
  se.setParentClassLoader(classLoader);
From the Javadoc I cannot really tell how setParentClassLoader behaves: https://janino-compiler.github.io/janino/apidocs/org/codehaus/janino/ScriptEvaluator.html#setParentClassLoader(java.lang.ClassLoader)
Does it replace the default class loader with classLoader, or combine the two? I assume that for ScriptEvaluator to work, it needs to know both the Beam classes and the UDF implementation.
The new classloader replaces the default one, but that is okay. The classloader we construct does not only contain the UDF jars. It also has a parent classloader, which is the same as the default one.
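A minimal sketch of that delegation, using only the JDK's `URLClassLoader`. It is not the PR's `JavaUdfLoader`; the class name `ParentDelegationDemo`, the helper names, and `com.example.NoSuchUdf` are assumptions made up for the example. A loader with no jars of its own can still resolve classes through its parent, which is why replacing the default loader with a child of the default loader loses nothing.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical demo of a jar class loader that delegates to a parent loader. */
class ParentDelegationDemo {
  /** Builds a loader over the given jar URLs (possibly none) with an explicit parent. */
  static ClassLoader withParent(URL[] jarUrls, ClassLoader parent) {
    return new URLClassLoader(jarUrls, parent);
  }

  /** Returns true if the named class is visible through the given loader. */
  static boolean canLoad(ClassLoader loader, String className) {
    try {
      Class.forName(className, false, loader);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    ClassLoader parent = ParentDelegationDemo.class.getClassLoader();
    ClassLoader child = withParent(new URL[0], parent);
    // The child has no jars, yet it sees this class because lookups go to the parent first.
    System.out.println(canLoad(child, ParentDelegationDemo.class.getName()));
    // A class that neither the parent nor the (empty) jar list provides is not found.
    System.out.println(canLoad(child, "com.example.NoSuchUdf"));
  }
}
```

With real UDF jars in the URL array, the same child loader would resolve both the UDF classes and everything the parent already knows about.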
run sql postcommit
Run SQL PostCommit
There we go, Jenkins tests didn't trigger for some reason.
List<String> jarPaths = new ArrayList<>();
for (RexNode node : program.getExprList()) {
  if (node instanceof RexCall) {
    SqlOperator op = ((RexCall) node).op;
Nit: I am not sure whether there could be a nested call here (i.e., one of the operands is itself a RexCall). If there could be, maybe add a TODO or a check for that case.
We don't need to handle nested calls in this PR. The solution for that will be complicated.
A RexNode can only refer to an input or previous RexNode in the list returned by getExprList, so that is handled here.
https://github.com/apache/calcite/blob/3d13846a13398a1ba6c1fa84a7d0c0cc543f23d4/core/src/main/java/org/apache/calcite/rex/RexProgram.java#L80-L85
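To illustrate why a single pass over a flattened list also covers nested calls, here is a sketch with a hypothetical mini model. `FlatExprListDemo`, `Expr`, and the jar paths are invented for the example and are not Calcite or Beam types; the only assumption carried over from the comment above is that an entry's operands refer to inputs or to earlier entries by index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of a flattened expression list; not Calcite's RexProgram. */
class FlatExprListDemo {
  static final class Expr {
    final String op;      // operator name, or "" for a plain input reference
    final String jarPath; // jar that provides the operator, or "" if there is none
    final int[] operands; // indices of EARLIER entries in the same list

    Expr(String op, String jarPath, int... operands) {
      this.op = op;
      this.jarPath = jarPath;
      this.operands = operands;
    }
  }

  /** Collects jar paths with one flat pass; no recursion into operands is needed. */
  static List<String> getJarPaths(List<Expr> exprs) {
    List<String> jarPaths = new ArrayList<>();
    for (Expr expr : exprs) {
      if (!expr.jarPath.isEmpty()) {
        jarPaths.add(expr.jarPath);
      }
    }
    return jarPaths;
  }

  public static void main(String[] args) {
    // f(g(x)) flattened: entry 0 is x, entry 1 is g(entry 0), entry 2 is f(entry 1).
    List<Expr> exprs =
        Arrays.asList(
            new Expr("", ""),
            new Expr("g", "/tmp/g.jar", 0),
            new Expr("f", "/tmp/f.jar", 1));
    System.out.println(getJarPaths(exprs));
  }
}
```

Because the inner call `g` appears as its own entry before `f`, the loop visits it directly instead of having to descend into `f`'s operands.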
LGTM
public ClassLoader createClassLoader(List<String> inputJarPaths) throws IOException {
  List<File> localJars = new ArrayList<>();
  for (String inputJar : inputJarPaths) {
    localJars.add(getLocalJar(inputJar));
Couldn't you merge these into a single loop?
urls.add(getLocalJar(inputJar).toURI().toURL())
Yep, done. (I originally tried to do this the functional way using streams, but Java doesn't really let you throw checked exceptions from lambdas. 😢 )
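A sketch of the single-loop form suggested above. The `getLocalJar` here is a hypothetical stand-in that only resolves a local path (the real method presumably also copies remote jars), and `JarUrlLoopDemo` and `toUrls` are names invented for the example. A plain `for` loop lets the checked `IOException` propagate through the method signature, which a lambda passed to `Stream.map` cannot do because `java.util.function.Function` declares no checked exceptions.

```java
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo of converting jar paths to URLs in one loop. */
class JarUrlLoopDemo {
  /** Stand-in for getLocalJar: resolves the path locally and rejects empty input. */
  static File getLocalJar(String inputJar) throws IOException {
    if (inputJar.isEmpty()) {
      throw new IOException("Empty jar path.");
    }
    return new File(inputJar).getCanonicalFile();
  }

  /** One loop does both steps; checked exceptions simply propagate to the caller. */
  static List<URL> toUrls(List<String> inputJarPaths) throws IOException {
    List<URL> urls = new ArrayList<>();
    for (String inputJar : inputJarPaths) {
      urls.add(getLocalJar(inputJar).toURI().toURL());
    }
    return urls;
  }
}
```

Doing the same with streams would require catching the exception inside the lambda and rethrowing it unchecked, which is more code than the loop it replaces.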
calcFn.compile();
try {
  calcFn.compile();
} catch (IOException e) {
nit: possibly handle this inside compile() so it doesn't expose an IOException?
done
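One way this could look, as a sketch only: the thread does not show how the author resolved it, so the choice of `UncheckedIOException`, the `loadJars` helper, and the `CompileWrapDemo` class are all assumptions. The checked exception is caught inside `compile()` and rethrown unchecked with the original cause attached, so callers need no `try`/`catch` and the root cause still reaches the logs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical demo of hiding a checked IOException behind compile(). */
class CompileWrapDemo {
  /** Stand-in for a step that may fail with a checked exception, such as fetching a jar. */
  static void loadJars(boolean fail) throws IOException {
    if (fail) {
      throw new IOException("Could not fetch jar.");
    }
  }

  /** Declares no checked exceptions; failures surface as UncheckedIOException. */
  static void compile(boolean fail) {
    try {
      loadJars(fail);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to load user-provided jar.", e);
    }
  }
}
```

A call site then shrinks back to a bare `compile(...)` call, matching the line that the diff above had wrapped in a `try` block.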
@@ -274,6 +298,25 @@ public void processElement(ProcessContext c) {
    }
  }

  private static List<String> getJarPaths(RexProgram program) {
    List<String> jarPaths = new ArrayList<>();
Use ImmutableList.Builder instead.
done
I don't think this code will ever actually be invoked by unprivileged code, but this makes spotbugs happy.
This is required to pass the jar path to BeamCalcRel (see apache#13835).
R: @amaliujia @apilloud