New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-10925] Enable user-defined Java scalar functions in ZetaSQL. #13891
Conversation
This completes the proof of concept. Note however that when UDFs and built-in ZetaSQL operators are mixed, the program will crash without warning.
Created a JIRA to track the mixed Java UDF and built-in operator case: https://issues.apache.org/jira/browse/BEAM-11747 |
|
||
ResolvedStatement statement; | ||
ParseResumeLocation parseResumeLocation = new ParseResumeLocation(sql); | ||
do { | ||
statement = analyzer.analyzeNextStatement(parseResumeLocation, options, catalog); | ||
if (statement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) { | ||
ResolvedCreateFunctionStmt createFunctionStmt = (ResolvedCreateFunctionStmt) statement; | ||
udfBuilder.put(createFunctionStmt.getNamePath(), createFunctionStmt); | ||
String functionGroup = SqlAnalyzer.getFunctionGroup(createFunctionStmt); | ||
if (SqlAnalyzer.USER_DEFINED_FUNCTIONS.equals(functionGroup)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you remind me that whether USER_DEFINED_FUNCTIONS
here refers to SQL-native UDF? If so can you update USER_DEFINED_FUNCTIONS
to USER_DEFINED_SQL_FUNCTIONS
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you remind me that whether USER_DEFINED_FUNCTIONS here refers to SQL-native UDF?
yes
If so can you update USER_DEFINED_FUNCTIONS to USER_DEFINED_SQL_FUNCTIONS?
done
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); | ||
BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); | ||
thrown.expect(RuntimeException.class); | ||
thrown.expectMessage("CalcFn failed to evaluate"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: do you know where the exception is thrown?
public static class IncrementFn extends ScalarFn {
@ApplyMethod
public Long increment(Long i) {
return i + 1;
}
}
The increment
seems does not handle NULL
at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
increment throws a NullPointerException which is caught by the script evaluator and then by BeamCalcRel.
Line 291 in 989c317
"CalcFn failed to evaluate: " + processElementBlock, e.getCause()); |
I added assertions to make this test more strict and clear.
...sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
Show resolved
Hide resolved
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); | ||
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); | ||
thrown.expect(UnsupportedOperationException.class); | ||
thrown.expectMessage("Could not compile CalcFn"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I have a hard time to understand why this test case has failed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added some assertions and comments that should explain it.
} | ||
|
||
@Test | ||
public void testBinaryJavaUdf() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you link https://issues.apache.org/jira/browse/BEAM-11747 to here.
We will need to figure out how to better handle the mixed case. To me the better way is to reject such cases before we implement Calc splitting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a TODO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sure you fix your call to getValue
to check for null before merging, otherwise LGTM.
I left some comments here on bypassing layers. I'll let you decide if any of those need to be fixed now, in a followup, or if they can wait. (My thoughts: Supporting SqlTransform
UDFs on ZetaSQL is something that is expected to come out of this work, and those currently go through SchemaPlus. Building out a TableProvider interface and supporting this new UDF format in Calcite is a larger project that can probably wait until we add UDFs to DataCatalog.)
@@ -127,6 +144,53 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) { | |||
return tables.build(); | |||
} | |||
|
|||
/** Returns the fully qualified name of the function defined in the statement. */ | |||
static String getFunctionQualifiedName(ResolvedCreateFunctionStmt createFunctionStmt) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks unused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed it.
} | ||
} | ||
|
||
private Function createFunction(ResolvedCreateFunctionStmt createFunctionStmt) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like you bypassed a few abstraction layers here. Probably ResolvedCreateFunctionStmt
should add a udf to the TableProvider
(or an equivalent for UDFs). For an example, see CREATE EXTERNAL TABLE in the calcite dialect:
Line 136 in 3bb232f
schema.getTableProvider().createTable(toTable()); |
@@ -115,6 +116,22 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) { | |||
>= parseResumeLocation.getInput().getBytes(UTF_8).length; | |||
} | |||
|
|||
static String getOptionStringValue( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: This method appears to be used exactly once in another file. It should go right next to the method that calls it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} else if (SqlAnalyzer.USER_DEFINED_JAVA_SCALAR_FUNCTIONS.equals(functionGroup)) { | ||
String jarPath = getJarPath(createFunctionStmt); | ||
ScalarFn scalarFn = | ||
javaUdfLoader.loadScalarFunction(createFunctionStmt.getNamePath(), jarPath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again on bypassing layers, it seems like all this should be in a TableProvider
like interface, and be built through a buildBeamSqlUDF
method called from BeamCalciteSchema
(see just above the line in this link for the table example):
Line 122 in 68d6c8e
public Collection<Function> getFunctions(String name) { |
static String getOptionStringValue( | ||
ResolvedCreateFunctionStmt createFunctionStmt, String optionName) { | ||
for (ResolvedNodes.ResolvedOption option : createFunctionStmt.getOptionList()) { | ||
if (option.getName().equals(optionName)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about optionName.equals(option.getName())
. That will avoid potential crashes if getName
returns null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
ResolvedCreateFunctionStmt createFunctionStmt, String optionName) { | ||
for (ResolvedNodes.ResolvedOption option : createFunctionStmt.getOptionList()) { | ||
if (option.getName().equals(optionName)) { | ||
if (option.getValue().getType().getKind() != TypeKind.TYPE_STRING) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getValue
can return null here. I didn't check the other two, but you can find a copy of the generated ResolvedNodes.java
in internal code search. https://github.com/google/zetasql/blob/862a192a6da487757e860166a9666120b16773f5/java/com/google/zetasql/resolvedast/ResolvedNodes.java.template#L295
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done (hopefully at some point we can enable the nullness checker on this file)
"Native SQL aggregate functions are not supported (BEAM-9954)."); | ||
} | ||
return USER_DEFINED_FUNCTIONS; | ||
case "PY": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious as to where these came from. Is there another engine that supports these constants?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BigQuery supports JS
. I made up the rest of them.
|
||
ResolvedStatement statement; | ||
ParseResumeLocation parseResumeLocation = new ParseResumeLocation(sql); | ||
do { | ||
statement = analyzer.analyzeNextStatement(parseResumeLocation, options, catalog); | ||
if (statement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) { | ||
ResolvedCreateFunctionStmt createFunctionStmt = (ResolvedCreateFunctionStmt) statement; | ||
udfBuilder.put(createFunctionStmt.getNamePath(), createFunctionStmt); | ||
String functionGroup = SqlAnalyzer.getFunctionGroup(createFunctionStmt); | ||
if (SqlAnalyzer.USER_DEFINED_FUNCTIONS.equals(functionGroup)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: switch/case is better than if for this pattern if your string isn't null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
I'll leave those for a follow-up (tracked by BEAM-10943). |
Run Java_Examples_Dataflow_Java11 PreCommit |
LGTM |
This completes the proof of concept. Note however that when UDFs and
built-in ZetaSQL operators are mixed, the program will crash without
warning.
R: @amaliujia @apilloud
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.