[BEAM-10925] Load SQL UDFs from jar. #13629

Merged
merged 4 commits into from Jan 26, 2021

Conversation

ibzib
Contributor

@ibzib ibzib commented Dec 29, 2020

@ibzib
Contributor Author

ibzib commented Dec 29, 2020

R: @amaliujia @apilloud

There should be ~1 more PR after this to finish the implementation of Java scalar UDFs (minus calc splitting).

@codecov

codecov bot commented Dec 29, 2020

Codecov Report

Merging #13629 (1fbf45c) into master (7ef8bd1) will increase coverage by 0.04%.
The diff coverage is n/a.

@@            Coverage Diff             @@
##           master   #13629      +/-   ##
==========================================
+ Coverage   82.73%   82.77%   +0.04%     
==========================================
  Files         466      466              
  Lines       57514    57558      +44     
==========================================
+ Hits        47584    47645      +61     
+ Misses       9930     9913      -17     
Impacted Files Coverage Δ
sdks/python/apache_beam/io/kafka.py 80.76% <0.00%> (-4.95%) ⬇️
sdks/python/apache_beam/internal/metrics/metric.py 86.45% <0.00%> (-1.05%) ⬇️
sdks/python/apache_beam/io/localfilesystem.py 91.66% <0.00%> (-0.76%) ⬇️
sdks/python/apache_beam/io/iobase.py 84.81% <0.00%> (-0.27%) ⬇️
...ks/python/apache_beam/runners/worker/sdk_worker.py 89.69% <0.00%> (-0.16%) ⬇️
sdks/python/apache_beam/io/gcp/bigquery.py 75.11% <0.00%> (-0.16%) ⬇️
...hon/apache_beam/runners/worker/bundle_processor.py 93.70% <0.00%> (-0.13%) ⬇️
setup.py 0.00% <0.00%> (ø)
...dks/python/apache_beam/options/pipeline_options.py 94.60% <0.00%> (ø)
.../python/apache_beam/portability/api/metrics_pb2.py 100.00% <0.00%> (ø)
... and 10 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 7ef8bd1...1fbf45c.

@amaliujia
Contributor

Thanks! This is a relatively large PR, so I will spend more time reviewing it tomorrow.

@apilloud
Member

I'm about to go on vacation, but I'll try to give this a quick pass tonight. It looks like nothing in here is ZetaSQL specific? If that is the case, you should move this to core SQL. Otherwise, how is this going to work for Calcite?

@ibzib
Contributor Author

ibzib commented Dec 29, 2020

> It looks like nothing in here is ZetaSQL specific? If that is the case, you should move this to core SQL.

Done.

Since JavaUdfLoader should be the only consumer anyway.
File tmpJar = downloadFile(inputJarPath, "application/java-archive");
// Set the thread's context class loader so that the jar can be staged by the runner.
Thread.currentThread()
.setContextClassLoader(
Member

You've leaked the class loader here. setContextClassLoader should always be inside a try finally block that resets the class loader. For example:

final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
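
The example in the comment above is cut off after its first line. A minimal sketch of the try-finally pattern the reviewer describes might look like the following. `ContextClassLoaderScope` and `callWith` are hypothetical names used for illustration only; they are not code from this PR, and this is not necessarily the reviewer's exact example.

```java
import java.util.function.Supplier;

/** Hypothetical helper, not part of Beam: runs a task under a temporary context class loader. */
final class ContextClassLoaderScope {
  private ContextClassLoaderScope() {}

  /** Sets {@code temporary} as the context class loader only for the duration of {@code task}. */
  static <T> T callWith(ClassLoader temporary, Supplier<T> task) {
    final Thread thread = Thread.currentThread();
    final ClassLoader originalClassLoader = thread.getContextClassLoader();
    thread.setContextClassLoader(temporary);
    try {
      return task.get();
    } finally {
      // Always restore the original loader, even if the task throws.
      thread.setContextClassLoader(originalClassLoader);
    }
  }
}
```

With a helper of this shape, the temporary loader is visible only while the task runs; the calling thread sees its original loader again afterwards.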

Contributor Author

I intended to leave UDF provider jars on the class loader here, so that later, in the DF SQL launcher, we can use the class loader to detect which files need to be staged to the runner.

If this way is unsafe for some reason, we will have to come up with a different way to pass jars from JavaUdfLoader into pipeline options. The only other ways I could think of are:

  1. Passing jars all the way up through the stack to SqlTransform, which will require changing SqlTransform and every other API surface in between.
  2. Handling jars (independently from SqlTransform construction) a second time in the SQL launcher. In other words, parse the query text again to extract the UDF jars, download the UDF jars a second time, and add them to filesToStage.

Member

Not resetting the ContextClassLoader after your method returns is unsafe. You need to find another way to communicate the classes upstream. Our current UDF loading happens in SqlTransform.expand(), so it seems like this should be called from there too. So option 1 should be trivial.

Contributor Author

Sounds good. I will look into option 1 and probably implement it in a separate PR.

Contributor Author

Option 1 would require several API changes:

  1. Add UDF provider jars to BeamRelNode.
  2. Either change the signature of ZetaSQLPlannerImpl::rel (currently returns Calcite RelRoot) or give ZetaSQLPlannerImpl a mutable field to hold UDF provider jars.
  3. Give SqlTransform a mutable field to hold UDF provider jars.

Of these, I'm most concerned with 3. Should PTransforms be mutable? I couldn't find anything in the Javadoc that explicitly warned against it, but mutability seems like something we should avoid if we can help it.

Member

You've lost all isolation of the JARs on the worker: filesToStage is on the default classpath. I think you need to plumb the jar path all the way down to BeamCalcRel, and that is the layer at which you'll need to stage the jar and load it on the worker.

Contributor

@amaliujia amaliujia Jan 7, 2021

IMO the third option is easy to implement and has no obvious drawbacks.

I think the third option says to specify a known directory on the launcher VM or local machine (so not on workers, and thus it does not affect jar isolation on workers). We might be able to control that directory with a pipeline option.

Contributor Author

All of the options I listed were to address the question: how should we pass jars from JavaUdfLoader to the filesToStage pipeline option? Andrew's concern is bigger and applies to all three options: putting all the UDF jars in filesToStage puts them all on the classpath for workers. UDF jars are only needed when compiling the generated code in CalcFn, so they should not be on the classpath except for that.
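
To illustrate the kind of isolation being discussed, here is a minimal sketch using only the JDK: the jar is exposed through its own `URLClassLoader` for the duration of one operation instead of being added to the default classpath. `UdfJarScope` and `withJar` are hypothetical names; how `CalcFn` would actually obtain and use such a loader is not settled in this thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.function.Function;

/** Hypothetical helper, not Beam API: exposes one UDF jar through its own class loader. */
final class UdfJarScope {
  private UdfJarScope() {}

  /** Opens a loader that can see {@code jarPath}, runs {@code body} with it, then closes it. */
  static <T> T withJar(Path jarPath, ClassLoader parent, Function<ClassLoader, T> body) {
    try (URLClassLoader loader =
        new URLClassLoader(new URL[] {jarPath.toUri().toURL()}, parent)) {
      // Classes and resources from the jar are reachable only through this loader,
      // not through the parent or the default classpath.
      return body.apply(loader);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Anything loaded inside `body` resolves against the jar first through `loader`, while code running outside the scope never sees the jar's contents.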

Contributor

@amaliujia amaliujia Jan 11, 2021

As I recall, we have discussed, or at least mentioned, this topic offline.

For the Java SDK, a user's UDF is put on the classpath without isolation. It won't be a regression for SQL if we follow the same rule.

We could do better for the SQL case, since we know the user-defined scalar function will be executed in CalcFn, but do we need to handle it at this moment? Is it required to launch the first version of UDFs? IMO it is not required, because SQL will be aligned with Java's practice.

A further thought on jar isolation: would a solution for isolation in CalcFn for user-defined scalar functions be too specific? Would it be better to have a separate discussion that also covers UDAF and UDTVF, so we can achieve isolation for all types of UDF with the same solution?

Contributor Author

@ibzib ibzib Jan 12, 2021

> For the Java SDK, a user's UDF is put on the classpath without isolation. It won't be a regression for SQL if we follow the same rule.

One difference is that in the Java SDK, the user is given direct control of the classpath via filesToStage. But when we construct a SQL pipeline for them, the user does not have direct control of the classpath. The API we show to users does not expose how their Java transforms are fused into a single pipeline, nor should it. So isolating the classpath for each CalcFn better matches the API and hides complexity from the user.

> A further thought on jar isolation: would a solution for isolation in CalcFn for user-defined scalar functions be too specific? Would it be better to have a separate discussion that also covers UDAF and UDTVF, so we can achieve isolation for all types of UDF with the same solution?

That's a good question. Isolating the CalcFn is a nice solution for scalar functions, but I'm not sure how we could isolate UDAF since CombineFn is more abstract than DoFn and aggregates aren't compiled separately like CalcFn.

* Maps the external jar location to the functions the jar defines. Static so it can persist
* across multiple SQL transforms.
*/
private static final Map<String, FunctionDefinitions> cache = new HashMap<>();
Member

I'm not sure this is safe as is. It will probably work for most cases, but I think you might want to key it on original class loader?
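
One way to read this suggestion is to make the class loader part of the cache key, so that two callers with different context class loaders never share an entry for the same jar path. The sketch below is an assumption about what that could look like, not the PR's implementation: `LoaderAwareCache` is a hypothetical class, the value type is left generic in place of `FunctionDefinitions`, and a `ConcurrentHashMap` is swapped in for the `HashMap` above.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical cache keyed on (context class loader, jar path); not code from this PR. */
final class LoaderAwareCache<V> {
  /** Cache key: class loader compared by identity, jar path compared by value. */
  private static final class Key {
    final ClassLoader loader; // may be null if the thread has no context class loader
    final String jarPath;

    Key(ClassLoader loader, String jarPath) {
      this.loader = loader;
      this.jarPath = Objects.requireNonNull(jarPath);
    }

    @Override
    public boolean equals(Object other) {
      if (!(other instanceof Key)) {
        return false;
      }
      Key that = (Key) other;
      return loader == that.loader && jarPath.equals(that.jarPath);
    }

    @Override
    public int hashCode() {
      return 31 * System.identityHashCode(loader) + jarPath.hashCode();
    }
  }

  // Note: keys hold strong references, so cached class loaders are never garbage collected.
  private final Map<Key, V> cache = new ConcurrentHashMap<>();

  /** Returns the cached value for this jar under the current context loader, loading it once. */
  V get(String jarPath, Function<String, V> load) {
    Key key = new Key(Thread.currentThread().getContextClassLoader(), jarPath);
    return cache.computeIfAbsent(key, k -> load.apply(k.jarPath));
  }
}
```

The trade-off is that the same jar may be loaded more than once if callers use different loaders, in exchange for never handing one caller definitions that were resolved under another caller's loader.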

Contributor Author

Why?

Member

See my comment about users creating their own class loaders above.

Contributor Author

I still can't imagine exactly how the user might break this by setting class loaders. I think we might just have to accept some level of risk here, since messing with the context class loader seems to be inherently kind of dangerous anyway.

Member

@apilloud apilloud left a comment

LGTM minus the few class loader issues. Only the try-finally block is something that must be fixed before this goes in.

Contributor

@amaliujia amaliujia left a comment

LGTM

After reading through this PR, as I recall there were a bunch of comments related to UDF jar loading in the original prototype, and those were already addressed in #13200. So I don't really have more comments.

After existing open comments are addressed, this PR will be good to go.

* Creates a temporary local copy of the file at {@code inputPath}, and returns a handle to the
* local copy.
*/
private File downloadFile(String inputPath, String mimeType) throws IOException {
Member

@apilloud apilloud Jan 6, 2021

This method duplicates functionality built into the JVM. It might be possible to just construct a URL directly? If not, here is an example that grabs the path to the temp file from the builtin URL jar cache:
((JarURLConnection)new URL("jar:https://repo1.maven.org/maven2/com/google/api/gax-grpc/1.60.0/gax-grpc-1.60.0-guavashaded.jar!/").openConnection()).getJarFile().getName()

(JarURLConnection also provides a getInputStream method if you still want to log a hash.)

Contributor Author

Thanks for the pointer. I'm not sure how GCS permissions work in this case, but it's worth a try.

Member

You'll have to use the GCS client to make an authenticated read off of GCS, the current code might be useful for that.

Contributor Author

@apilloud In order to use JarURLConnection, we would have to write some code to transform a String (which is a Beam filesystem resource spec) into a jar URL. We could make the URL point to either a local copy of the jar or the original jar, which could be either local or remote.

  • If we make and point to a local copy, the code wouldn't look too much different than it does currently.
  • If we want to point to the original jar, our code would need to know how to convert a resource from any Beam filesystem into a URL. While that's possible, it would basically require adding a getUrl method to ResourceId and implementing it for every implementation of ResourceId.

tl;dr I'm not sure the benefit of using JarURLConnection outweighs the amount of additional code we'd need to write and maintain, since the current implementation supports every Beam filesystem for free.
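
The "local copy" approach from the first bullet can be sketched with plain JDK I/O: copy the stream to a temporary file and compute a hash in the same pass. This is not the PR's `downloadFile`; `LocalJarCopy` and `fromStream` are hypothetical names, and the `InputStream` stands in for whatever the Beam filesystem layer would return for the resource.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical value type, not from this PR: a temporary local copy of a jar plus its hash. */
final class LocalJarCopy {
  final Path path;
  final String sha256Hex;

  private LocalJarCopy(Path path, String sha256Hex) {
    this.path = path;
    this.sha256Hex = sha256Hex;
  }

  /** Copies {@code source} to a new temporary file, hashing the bytes as they are read. */
  static LocalJarCopy fromStream(InputStream source) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      Path tmp = Files.createTempFile("udf-provider-", ".jar");
      try (DigestInputStream in = new DigestInputStream(source, digest)) {
        // createTempFile already created the file, so the copy must replace it.
        Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
      }
      StringBuilder hex = new StringBuilder();
      for (byte b : digest.digest()) {
        hex.append(String.format("%02x", b));
      }
      return new LocalJarCopy(tmp, hex.toString());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 is required on every Java platform", e);
    }
  }
}
```

Because the helper only needs an `InputStream`, it works the same way regardless of which filesystem the jar originally lived on, which is the property the comment above values in the current implementation.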

@ibzib
Contributor Author

ibzib commented Jan 26, 2021

Temporarily closing this as I am making some major changes.

@ibzib ibzib closed this Jan 26, 2021
The classloader will need to be set in CalcFn#compile. We will do that
in a subsequent PR.

Also fixes nullability errors.
@ibzib ibzib reopened this Jan 26, 2021
@github-actions
Contributor

The Workflow run is cancelling this PR. It is an earlier duplicate of 2173354 run.

@ibzib
Contributor Author

ibzib commented Jan 26, 2021

@apilloud @amaliujia I changed this to no longer set the context class loader as we discussed. If I understand correctly, the only place we really need the class loader to be set properly is in CalcFn#compile, so we can deal with that in a future PR. PTAL

@ibzib
Contributor Author

ibzib commented Jan 26, 2021

Run Java PreCommit

@ibzib
Contributor Author

ibzib commented Jan 26, 2021

PythonLint failure is obviously a flake, since there are 0 python changes: BEAM-11540

@amaliujia
Contributor

amaliujia commented Jan 26, 2021

Thanks!

+1 to not setting the class loader now and addressing when to set the class loader for executing UDFs in another PR.

@ibzib ibzib merged commit 4f43726 into apache:master Jan 26, 2021
@suztomo
Contributor

suztomo commented Jan 26, 2021

"Run SQL Postcommit" fails in my PR #13804. @ibzib Do you think this PR is related?

java.lang.AssertionError: System property beam.sql.udf.test.jar_path must be set to run JavaUdfLoaderTest.
	at org.junit.Assert.fail(Assert.java:89)
	at org.apache.beam.sdk.extensions.sql.impl.JavaUdfLoaderTest.setUp(JavaUdfLoaderTest.java:50)

https://ci-beam.apache.org/job/beam_PostCommit_SQL_PR/lastBuild/testReport/junit/org.apache.beam.sdk.extensions.sql.impl/JavaUdfLoaderTest/testLoadScalarFunction_4/

@ibzib
Contributor Author

ibzib commented Jan 26, 2021

> "Run SQL Postcommit" fails in my PR #13804. @ibzib Do you think this PR is related?

It's definitely related (that test was newly added by this PR). I will look into it.

@suztomo
Contributor

suztomo commented Jan 26, 2021

Thank you.
