Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-26414][hive] Hive dialect supports macro #19561

Merged
merged 3 commits into from
Jul 7, 2022

Conversation

luoyuxia
Copy link
Contributor

@luoyuxia luoyuxia commented Apr 24, 2022

What is the purpose of the change

Hive dialect supports macro.

Brief change log

  • Support parse create/drop macro
  • Convert calling macro to invoking Hive's GenericUDFMacro

Verifying this change

Added test in ...link-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java#testMacro

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? N/A

@luoyuxia luoyuxia changed the title [FLINK-26366][hive] Hive dialect supports macro [FLINK-26414][hive] Hive dialect supports macro Apr 24, 2022
@flinkbot
Copy link
Collaborator

flinkbot commented Apr 24, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@lsyldliu lsyldliu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@luoyuxia Thanks for your contribution, I left some comments.

private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException {
String macroName = ast.getChild(0).getText();
if (FunctionUtils.isQualifiedFunctionName(macroName)) {
throw new SemanticException("Temporary macro cannot be created with a qualified name.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the method FunctionUtils.isQualifiedFunctionName, this error message may be not cleared for user? so I think we can improve it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed

@@ -139,6 +139,13 @@ public void testFunction() throws Exception {
assertDDLType(HiveASTParser.TOK_SHOWFUNCTIONS, "show functions");
}

@Test
public void testMacro() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be better if we add a test that create a mcro with qualified name

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may not need for macro with a qualified name will fail in parse phase.
The antrl segment is like follows:
KW_CREATE KW_TEMPORARY KW_MACRO Identifier LPAREN columnNameTypeList? RPAREN expression -> ^(TOK_CREATEMACRO Identifier columnNameTypeList? expression)

* serialized to string and held on in the HiveFunctionWrapper.
*/
public HiveFunctionWrapper(String className, UDFType serializableInstance) {
this.className = className;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can call it directly this(className);

} catch (ClassNotFoundException e) {
throw new FlinkHiveUDFException(
String.format("Failed to deserialize function %s", className), e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Failed to deserialize function %s."

throw new SemanticException("Temporary macro cannot be created with a qualified name.");
}

List<FieldSchema> arguments = getColumns((HiveParserASTNode) ast.getChild(1), true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The arguments readability is not good? IMO, it refers the all columns of the table? In other words, we should add an annotation about it.

return actualColumnNames;
}

private void getMacroColumnData(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can return a Tuple2 here directly? BTW, maybe we can merge the method getMacroColumnData and getBody into one, then return a Tuple3?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return a Tuple2 is good to me. But I would like separate the getMacroColumnData and getBody,

}
Set<String> expectedColumnNames = new LinkedHashSet<>(macroColumnNames);
if (!expectedColumnNames.equals(actualColumnNames)) {
throw new SemanticException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String.format("Expected columns [%s], but found [%s].", expectedColumnNames, actualColumnNames) here ?

throw new SemanticException(
"Expected columns " + expectedColumnNames + " but found " + actualColumnNames);
}
if (expectedColumnNames.size() != macroColumnNames.size()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about expectedColumnNames.size() > macroColumnNames.size() or expectedColumnNames.size() < macroColumnNames.size() case, whether the error message is the same?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expectedColumnNames.size() > macroColumnNames.size() will nerver happen for expectedColumnNames is a set of macroColumnNames.

for (FieldSchema argument : arguments) {
TypeInfo columnType = TypeInfoUtils.getTypeInfoFromTypeString(argument.getType());
rowResolver.put(
"",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

org.apache.commons.lang3.StringUtils.EMPTY?

rowResolver.put(
"",
argument.getName(),
new ColumnInfo(argument.getName(), columnType, "", false));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above

@luoyuxia
Copy link
Contributor Author

luoyuxia commented Jun 8, 2022

@lsyldliu Thanks for your review. I have adrressed your comments.

@luoyuxia
Copy link
Contributor Author

@flinkbot run azure

serializableInstance instanceof Serializable,
String.format(
"The UDF %s should be an instance of Serializable.",
serializableInstance.getClass().getName()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check the class name is equal to the class of serializableInstance.

try {
return (UDFType)
SerializationUtilities.deserializeObject(
udfSerializedString, (Class<Serializable>) getUDFClass());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is problematic in the case the hive connector is dynamically loaded, i.e. in the user classloader instead of the current class loader. We should use the user classloader to load classes. Could you create a JIRA issue for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, I have created FLINK-28430 for it.

private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException {
String macroName = ast.getChild(0).getText();
if (FunctionUtils.isQualifiedFunctionName(macroName)) {
throw new SemanticException("The name of the temporary macro can't contain `.`.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also print the wrong function name. For example:

CREATE TEMPORARY MACRO doesn't allow "." character in the macro name, but the name is "%s".

Besides, is there a test for the exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add the test for the exception in HiveDialectITCase#testMacro just now.

Copy link
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@wuchong wuchong merged commit db633e8 into apache:master Jul 7, 2022
ericccarlson pushed a commit to ericccarlson/flink that referenced this pull request Jul 11, 2022
liujiawinds pushed a commit to liujiawinds/flink that referenced this pull request Jul 22, 2022
huangxiaofeng10047 pushed a commit to huangxiaofeng10047/flink that referenced this pull request Nov 3, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants