@@ -19,7 +19,9 @@
package org.apache.flink.table.functions.hive;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;

import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.UDF;

import java.io.Serializable;
@@ -36,23 +38,53 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
public static final long serialVersionUID = 393313529306818205L;

private final String className;
// Holds the string-serialized form of the UDF instance.
// Some serializable UDFs, such as Hive's GenericUDFMacro, carry additional state; constructing
// the UDF directly via getUDFClass#newInstance would lose that state, so we keep the serialized
// instance and restore it when the function is created.
private String udfSerializedString;

private transient UDFType instance = null;

public HiveFunctionWrapper(String className) {
this.className = className;
}

/**
* Create a HiveFunctionWrapper with a UDF instance. The instance is serialized to a string and
* held in the HiveFunctionWrapper so that it can be restored later.
*/
public HiveFunctionWrapper(String className, UDFType serializableInstance) {
this(className);
Preconditions.checkArgument(
serializableInstance.getClass().getName().equals(className),
String.format(
"Expect the UDF is instance of %s, but is instance of %s.",
className, serializableInstance.getClass().getName()));
Preconditions.checkArgument(
serializableInstance instanceof Serializable,
String.format(
"The UDF %s should be an instance of Serializable.",
serializableInstance.getClass().getName()));
Member:
We should check that the class name is equal to the class of serializableInstance.

// use SerializationUtilities#serializeObject to serialize the UDF, since the UDF may not be
// serializable with plain Java serialization
this.udfSerializedString =
SerializationUtilities.serializeObject((Serializable) serializableInstance);
}

/**
* Instantiate a Hive function instance.
*
* @return a Hive function instance
*/
public UDFType createFunction() {
if (instance != null) {
if (udfSerializedString != null) {
// deserialize the string to udf instance
return deserializeUDF();
} else if (instance != null) {
return instance;
} else {
UDFType func = null;
UDFType func;
try {
func = getUDFClass().newInstance();
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
@@ -88,4 +120,20 @@ public String getClassName() {
public Class<UDFType> getUDFClass() throws ClassNotFoundException {
return (Class<UDFType>) Thread.currentThread().getContextClassLoader().loadClass(className);
}

/**
* Deserialize the UDF from the held udfSerializedString.
*
* @return the deserialized UDF
*/
private UDFType deserializeUDF() {
try {
return (UDFType)
SerializationUtilities.deserializeObject(
udfSerializedString, (Class<Serializable>) getUDFClass());
Member:
This is problematic in the case the hive connector is dynamically loaded, i.e. in the user classloader instead of the current class loader. We should use the user classloader to load classes. Could you create a JIRA issue for this?

Contributor Author:
Of course, I have created FLINK-28430 for it.

} catch (ClassNotFoundException e) {
throw new FlinkHiveUDFException(
String.format("Failed to deserialize function %s.", className), e);
}
Contributor:
"Failed to deserialize function %s."

}
}
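
A possible direction for the FLINK-28430 follow-up (a hypothetical sketch, not part of this PR; it assumes the user classloader is handed to the wrapper, and the Kryo-based SerializationUtilities may need additional classloader handling of its own):

    // Hypothetical variant of deserializeUDF that resolves the UDF class with a caller-supplied
    // classloader instead of the thread context classloader used by getUDFClass().
    @SuppressWarnings("unchecked")
    private UDFType deserializeUDF(ClassLoader userClassLoader) {
        try {
            Class<UDFType> udfClass =
                    (Class<UDFType>) Class.forName(className, true, userClassLoader);
            return (UDFType)
                    SerializationUtilities.deserializeObject(
                            udfSerializedString, (Class<Serializable>) udfClass);
        } catch (ClassNotFoundException e) {
            throw new FlinkHiveUDFException(
                    String.format("Failed to deserialize function %s.", className), e);
        }
    }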
@@ -224,7 +224,9 @@ private List<Operation> processCmd(
this,
hiveShim,
context,
dmlHelper);
dmlHelper,
frameworkConfig,
plannerContext.getCluster());
operation = ddlAnalyzer.convertToOperation(node);
return Collections.singletonList(operation);
} else {
@@ -18,6 +18,7 @@

package org.apache.flink.table.planner.delegation.hive.parse;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase;
@@ -55,6 +56,8 @@
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
import org.apache.flink.table.functions.hive.HiveGenericUDF;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
@@ -96,25 +99,36 @@
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.HiveParserRowFormatParams;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserContext;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserQueryState;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserRowResolver;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserStorageFormat;
import org.apache.flink.table.planner.utils.OperationConverterUtils;

import org.antlr.runtime.tree.CommonTree;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.FunctionUtils;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.PrincipalDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import org.slf4j.Logger;
@@ -125,6 +139,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -161,6 +176,7 @@
import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_LOCATION_URI;
import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.NotNullConstraint;
import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.PrimaryKey;
import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.getColumns;

/**
* Ported hive's org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer, and also incorporated
@@ -182,6 +198,8 @@ public class HiveParserDDLSemanticAnalyzer {
private final HiveShim hiveShim;
private final HiveParserContext context;
private final HiveParserDMLHelper dmlHelper;
private final FrameworkConfig frameworkConfig;
private final RelOptCluster cluster;

static {
TokenToTypeName.put(HiveASTParser.TOK_BOOLEAN, serdeConstants.BOOLEAN_TYPE_NAME);
@@ -242,7 +260,9 @@ public HiveParserDDLSemanticAnalyzer(
HiveParser hiveParser,
HiveShim hiveShim,
HiveParserContext context,
HiveParserDMLHelper dmlHelper)
HiveParserDMLHelper dmlHelper,
FrameworkConfig frameworkConfig,
RelOptCluster cluster)
throws SemanticException {
this.queryState = queryState;
this.conf = queryState.getConf();
@@ -254,6 +274,8 @@ this.hiveShim = hiveShim;
this.hiveShim = hiveShim;
this.context = context;
this.dmlHelper = dmlHelper;
this.frameworkConfig = frameworkConfig;
this.cluster = cluster;
reservedPartitionValues = new HashSet<>();
// Partition can't have this name
reservedPartitionValues.add(HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME));
@@ -339,6 +361,12 @@ public Operation convertToOperation(HiveParserASTNode ast) throws SemanticExcept
case HiveASTParser.TOK_DROPFUNCTION:
res = convertDropFunction(ast);
break;
case HiveASTParser.TOK_CREATEMACRO:
res = convertCreateMacro(ast);
break;
case HiveASTParser.TOK_DROPMACRO:
res = convertDropMacro(ast);
break;
case HiveASTParser.TOK_DESCFUNCTION:
case HiveASTParser.TOK_DESCDATABASE:
case HiveASTParser.TOK_TRUNCATETABLE:
@@ -506,6 +534,115 @@ private Operation convertCreateFunction(HiveParserASTNode ast) {
}
}

private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException {
String macroName = ast.getChild(0).getText();
if (FunctionUtils.isQualifiedFunctionName(macroName)) {
throw new SemanticException(
String.format(
"CREATE TEMPORARY MACRO doesn't allow \".\" character in the macro name, but the name is \"%s\".",
macroName));
}

// a macro uses table columns as its arguments, so get the corresponding columns
List<FieldSchema> arguments = getColumns((HiveParserASTNode) ast.getChild(1), true);
Contributor:
The readability of arguments is not great; IMO it refers to all the columns of the table? In other words, we should add a comment about it.

Set<String> actualColumnNames = getActualColumnNames(ast, arguments);

HiveParserRowResolver rowResolver = new HiveParserRowResolver();
Tuple2<List<String>, List<TypeInfo>> macroColumnNameAndType =
getMacroColumnData(arguments, actualColumnNames, rowResolver);
ExprNodeDesc body = getBody(ast, arguments, rowResolver);

GenericUDFMacro macro =
new GenericUDFMacro(
macroName, body, macroColumnNameAndType.f0, macroColumnNameAndType.f1);

FunctionDefinition macroDefinition =
new HiveGenericUDF(
new HiveFunctionWrapper<>(GenericUDFMacro.class.getName(), macro),
hiveShim);
// Hive's macro behaves like Flink's temporary system function
return new CreateTempSystemFunctionOperation(macroName, false, macroDefinition);
}

private Set<String> getActualColumnNames(HiveParserASTNode ast, List<FieldSchema> arguments)
throws SemanticException {
final Set<String> actualColumnNames = new HashSet<>();

if (!arguments.isEmpty()) {
// Walk down expression to see which arguments are actually used.
Node expression = (Node) ast.getChild(2);

PreOrderWalker walker =
new PreOrderWalker(
(nd, stack, nodeOutputs) -> {
if (nd instanceof HiveParserASTNode) {
HiveParserASTNode node = (HiveParserASTNode) nd;
if (node.getType() == HiveASTParser.TOK_TABLE_OR_COL) {
actualColumnNames.add(node.getChild(0).getText());
}
}
return null;
});
walker.startWalking(Collections.singleton(expression), null);
}
return actualColumnNames;
}

private Tuple2<List<String>, List<TypeInfo>> getMacroColumnData(
List<FieldSchema> arguments,
Set<String> actualColumnNames,
HiveParserRowResolver rowResolver)
throws SemanticException {
List<String> macroColumnNames = new ArrayList<>();
List<TypeInfo> macroColumnTypes = new ArrayList<>();
for (FieldSchema argument : arguments) {
TypeInfo columnType = TypeInfoUtils.getTypeInfoFromTypeString(argument.getType());
rowResolver.put(
StringUtils.EMPTY,
argument.getName(),
new ColumnInfo(argument.getName(), columnType, StringUtils.EMPTY, false));
macroColumnNames.add(argument.getName());
macroColumnTypes.add(columnType);
}
Set<String> expectedColumnNames = new LinkedHashSet<>(macroColumnNames);
if (!expectedColumnNames.equals(actualColumnNames)) {
throw new SemanticException(
Contributor:
String.format("Expected columns [%s], but found [%s].", expectedColumnNames, actualColumnNames) here?

String.format(
"Expected columns [%s], but found [%s].",
expectedColumnNames, actualColumnNames));
}
if (expectedColumnNames.size() != macroColumnNames.size()) {
Contributor:
What about the expectedColumnNames.size() > macroColumnNames.size() or expectedColumnNames.size() < macroColumnNames.size() case - is the error message the same?

Contributor Author:
expectedColumnNames.size() > macroColumnNames.size() will never happen, since expectedColumnNames is a set built from macroColumnNames; only the "less than" case can occur, when a parameter name is repeated, and that is exactly what this check reports.

throw new SemanticException(
"At least one parameter name was used more than once " + macroColumnNames);
}
return Tuple2.of(macroColumnNames, macroColumnTypes);
}

private ExprNodeDesc getBody(
HiveParserASTNode ast, List<FieldSchema> arguments, HiveParserRowResolver rowResolver)
throws SemanticException {
HiveParserSemanticAnalyzer semanticAnalyzer =
new HiveParserSemanticAnalyzer(queryState, hiveShim, frameworkConfig, cluster);
return arguments.isEmpty()
? semanticAnalyzer.genExprNodeDesc((HiveParserASTNode) ast.getChild(1), rowResolver)
: semanticAnalyzer.genExprNodeDesc(
(HiveParserASTNode) ast.getChild(2), rowResolver);
}

private Operation convertDropMacro(HiveParserASTNode ast) throws SemanticException {
String macroName = ast.getChild(0).getText();
if (FunctionUtils.isQualifiedFunctionName(macroName)) {
throw new SemanticException(
String.format(
"DROP TEMPORARY MACRO doesn't allow \".\" character in the macro name, but the name is \"%s\".",
macroName));
}

boolean ifExists = (ast.getFirstChildWithType(HiveASTParser.TOK_IFEXISTS) != null);
// macro is always temporary function
return new DropTempSystemFunctionOperation(macroName, ifExists);
}

private Operation convertAlterView(HiveParserASTNode ast) throws SemanticException {
Operation operation = null;
String[] qualified =
@@ -59,6 +59,7 @@
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
import org.apache.hadoop.hive.serde.serdeConstants;
@@ -811,6 +812,52 @@ public void testShowPartitions() throws Exception {
assertThat(partitions.toString()).contains("dt=2020-04-30 01:02:03/country=china");
}

@Test
public void testMacro() throws Exception {
tableEnv.executeSql("create temporary macro string_len (x string) length(x)");
tableEnv.executeSql("create temporary macro string_len_plus(x string) length(x) + 1");
tableEnv.executeSql("create table macro_test (x string)");
tableEnv.executeSql("insert into table macro_test values ('bb'), ('a'), ('cc')").await();
List<Row> result =
CollectionUtil.iteratorToList(
tableEnv.executeSql(
"select string_len(x), string_len_plus(x) from macro_test")
.collect());
assertThat(result.toString()).isEqualTo("[+I[2, 3], +I[1, 2], +I[2, 3]]");
// drop macro
tableEnv.executeSql("drop temporary macro string_len_plus");
// create macro
tableEnv.executeSql("create temporary macro string_len_plus(x string) length(x) + 2");
result =
CollectionUtil.iteratorToList(
tableEnv.executeSql(
"select string_len(x), string_len_plus(x) from macro_test")
.collect());
assertThat(result.toString()).isEqualTo("[+I[2, 4], +I[1, 3], +I[2, 4]]");
String badMacroName = "db.string_len";
// should fail when create macro whose name contains "."
assertThatThrownBy(
() ->
tableEnv.executeSql(
String.format(
"create temporary macro `%s` (x string) length(x)",
badMacroName)))
.hasRootCauseInstanceOf(SemanticException.class)
.hasRootCauseMessage(
String.format(
"CREATE TEMPORARY MACRO doesn't allow \".\" character in the macro name, but the name is \"%s\".",
badMacroName));
// should fail when drop macro whose name contains "."
assertThatThrownBy(
() ->
tableEnv.executeSql(
String.format("drop temporary macro `%s`", badMacroName)))
.hasRootCauseInstanceOf(SemanticException.class)
.hasRootCauseMessage(
"DROP TEMPORARY MACRO doesn't allow \".\" character in the macro name, but the name is \"%s\".",
badMacroName);
}

@Test
public void testUnsupportedOperation() {
List<String> statements =