From 36155156934fb397a7f92d98c31c78f2dc25c9b4 Mon Sep 17 00:00:00 2001 From: zeroflag Date: Mon, 12 Apr 2021 16:39:43 +0200 Subject: [PATCH] HIVE-24997. HPL/SQL udf doesn't work in tez container mode (amagyar) --- .../org/apache/hive/hplsql/Arguments.java | 8 ++- .../java/org/apache/hive/hplsql/Exec.java | 26 ++++---- .../hive/hplsql/executor/QueryExecutor.java | 5 ++ .../java/org/apache/hive/hplsql/udf/Udf.java | 64 ++++++++++++++----- .../hive/beeline/TestHplSqlViaBeeLine.java | 12 ++++ .../operation/ExecuteStatementOperation.java | 16 +++++ 6 files changed, 104 insertions(+), 27 deletions(-) diff --git a/hplsql/src/main/java/org/apache/hive/hplsql/Arguments.java b/hplsql/src/main/java/org/apache/hive/hplsql/Arguments.java index 47e414af4abd..7c4e33f1d622 100644 --- a/hplsql/src/main/java/org/apache/hive/hplsql/Arguments.java +++ b/hplsql/src/main/java/org/apache/hive/hplsql/Arguments.java @@ -37,7 +37,13 @@ public class Arguments { String fileName; String main; Map vars = new HashMap(); - + + public static Arguments script(String str) { + Arguments arguments = new Arguments(); + arguments.parse(new String[]{"-e", str}); + return arguments; + } + @SuppressWarnings("static-access") public Arguments() { // -e 'query' diff --git a/hplsql/src/main/java/org/apache/hive/hplsql/Exec.java b/hplsql/src/main/java/org/apache/hive/hplsql/Exec.java index 3b5c3d12d3ea..f65446d47766 100644 --- a/hplsql/src/main/java/org/apache/hive/hplsql/Exec.java +++ b/hplsql/src/main/java/org/apache/hive/hplsql/Exec.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.PrintWriter; +import java.io.UncheckedIOException; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.sql.Connection; @@ -52,6 +53,7 @@ import org.antlr.v4.runtime.tree.ParseTree; import org.antlr.v4.runtime.tree.TerminalNode; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hive.hplsql.Var.Type; import org.apache.hive.hplsql.executor.JdbcQueryExecutor; @@ -761,10 +763,12 @@ public Integer run(String[] args) throws Exception { return getProgramReturnCode(); } - public void parseAndEval(Arguments arguments) throws IOException { + public Var parseAndEval(Arguments arguments) { ParseTree tree; try (InputStream input = sourceStream(arguments)) { tree = parse(input); + } catch (IOException e) { + throw new UncheckedIOException(e); } Var result = null; try { @@ -775,6 +779,7 @@ public void parseAndEval(Arguments arguments) throws IOException { if (result != null) { console.printLine(result.toString()); } + return result; } private Var evaluate(ParseTree tree, String execMain) { @@ -984,15 +989,6 @@ public Integer visitBegin_end_block(HplsqlParser.Begin_end_blockContext ctx) { return rc; } - public Var eval(String source) { - HplsqlLexer lexer = new HplsqlLexer(new ANTLRInputStream(source)); - CommonTokenStream tokens = new CommonTokenStream(lexer); - HplsqlParser parser = newParser(tokens); - HplsqlParser.ProgramContext program = parser.program(); - visit(program); - return !exec.stack.isEmpty() ? exec.stackPop() : null; - } - /** * Free resources before exit */ @@ -1020,7 +1016,7 @@ void printExceptions() { } else if (sig.type == Signal.Type.UNSUPPORTED_OPERATION) { console.printError(sig.value == null ? "Unsupported operation" : sig.value); } else if (sig.exception != null) { - sig.exception.printStackTrace(); + console.printError("HPL/SQL error: " + ExceptionUtils.getStackTrace(sig.exception)); } else if (sig.value != null) { console.printError(sig.value); } else { @@ -2861,4 +2857,12 @@ public boolean getOffline() { public Console getConsole() { return console; } + + public void setQueryExecutor(QueryExecutor queryExecutor) { + this.queryExecutor = queryExecutor; + } + + public IMetaStoreClient getMsc() { + return msc; + } } diff --git a/hplsql/src/main/java/org/apache/hive/hplsql/executor/QueryExecutor.java b/hplsql/src/main/java/org/apache/hive/hplsql/executor/QueryExecutor.java index 74b74551b78b..7ab1fb8cfaa7 100644 --- a/hplsql/src/main/java/org/apache/hive/hplsql/executor/QueryExecutor.java +++ b/hplsql/src/main/java/org/apache/hive/hplsql/executor/QueryExecutor.java @@ -21,7 +21,12 @@ package org.apache.hive.hplsql.executor; import org.antlr.v4.runtime.ParserRuleContext; +import org.apache.hive.hplsql.HplValidationException; public interface QueryExecutor { QueryResult executeQuery(String sql, ParserRuleContext ctx); + + QueryExecutor DISABLED = (sql, ctx) -> { + throw new HplValidationException(ctx, "Query execution is disabled in this context. Can not execute: " + sql); + }; } diff --git a/hplsql/src/main/java/org/apache/hive/hplsql/udf/Udf.java b/hplsql/src/main/java/org/apache/hive/hplsql/udf/Udf.java index 7cbf44346a6f..552100d777a1 100644 --- a/hplsql/src/main/java/org/apache/hive/hplsql/udf/Udf.java +++ b/hplsql/src/main/java/org/apache/hive/hplsql/udf/Udf.java @@ -18,6 +18,8 @@ package org.apache.hive.hplsql.udf; +import org.apache.hadoop.hive.metastore.api.StoredProcedure; +import org.apache.hadoop.hive.metastore.api.StoredProcedureRequest; import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; @@ -25,22 +27,27 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.udf.UDFType; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.hive.hplsql.Arguments; import org.apache.hive.hplsql.Exec; import org.apache.hive.hplsql.Scope; import org.apache.hive.hplsql.Var; +import org.apache.hive.hplsql.executor.QueryExecutor; +import org.apache.thrift.TException; @Description(name = "hplsql", value = "_FUNC_('query' [, :1, :2, ...n]) - Execute HPL/SQL query", extended = "Example:\n" + " > SELECT _FUNC_('CURRENT_DATE') FROM src LIMIT 1;\n") @UDFType(deterministic = false) public class Udf extends GenericUDF { - + public static String NAME = "hplsql"; transient Exec exec; StringObjectInspector queryOI; ObjectInspector[] argumentsOI; + private String functionDefinition; public Udf() { } @@ -60,32 +67,59 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen if (!(arguments[0] instanceof StringObjectInspector)) { throw new UDFArgumentException("First argument must be a string"); } + SessionState sessionState = SessionState.get(); + if (sessionState != null) { + // we are still in HiveServer, get the source of the HplSQL function and store it. + functionDefinition = loadSource(sessionState, functionName(arguments[0])); + } queryOI = (StringObjectInspector)arguments[0]; argumentsOI = arguments; - if (exec == null) { - exec = SessionState.get() == null ? null : SessionState.get().getDynamicVar(Exec.class); - } - if (exec == null) { - throw new UDFArgumentException("Cannot be used in non HPL/SQL mode."); - } return PrimitiveObjectInspectorFactory.javaStringObjectInspector; } + protected String loadSource(SessionState sessionState, String functionName) throws UDFArgumentException { + Exec exec = sessionState.getDynamicVar(Exec.class); + try { + StoredProcedure storedProcedure = exec.getMsc().getStoredProcedure( + new StoredProcedureRequest( + SessionState.get().getCurrentCatalog(), + SessionState.get().getCurrentDatabase(), + functionName)); + return storedProcedure != null ? storedProcedure.getSource() : null; + } catch (TException e) { + throw new UDFArgumentException(e); + } + } + + protected String functionName(ObjectInspector argument) { + ConstantObjectInspector inspector = (ConstantObjectInspector) (argument); + String functionCall = inspector.getWritableConstantValue().toString(); + return functionCall.split("\\(")[0].toUpperCase(); + } + /** * Execute UDF */ @Override public Object evaluate(DeferredObject[] arguments) throws HiveException { - String query = queryOI.getPrimitiveJavaObject(arguments[0].get()); - exec.enterScope(Scope.Type.ROUTINE); - if (arguments.length > 1) { - setParameters(arguments); + if (exec == null) { + exec = new Exec(); + exec.setQueryExecutor(QueryExecutor.DISABLED); + exec.init(); + if (functionDefinition != null) { // if it's null, it can be a built-in function + exec.parseAndEval(Arguments.script(functionDefinition)); + } } + exec.enterScope(Scope.Type.ROUTINE); setParameters(arguments); - Var result = exec.eval(query); - exec.callStackPop(); - exec.leaveScope(); - return result != null ? result.toString() : null; + String query = queryOI.getPrimitiveJavaObject(arguments[0].get()); + try { + Var result = exec.parseAndEval(Arguments.script(query)); + exec.callStackPop(); + return result != null ? result.toString() : null; + } finally { + exec.close(); + } } /** diff --git a/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestHplSqlViaBeeLine.java b/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestHplSqlViaBeeLine.java index 6b3b826b3bcf..83b50b085f71 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestHplSqlViaBeeLine.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestHplSqlViaBeeLine.java @@ -170,6 +170,18 @@ public void testPackage() throws Throwable { "SELECT * FROM result;\n"; testScriptFile(SCRIPT_TEXT, args(), "12345"); } + + @Test + public void testUdf() throws Throwable { + String SCRIPT_TEXT = + "DROP TABLE IF EXISTS result;\n" + + "CREATE TABLE result (s string);\n" + + "INSERT INTO result VALUES('alice');\n" + + "INSERT INTO result VALUES('bob');\n" + + "CREATE FUNCTION hello(p STRING) RETURNS STRING BEGIN RETURN 'hello ' || p; END;\n" + + "SELECT hello(s) FROM result;\n"; + testScriptFile(SCRIPT_TEXT, args(), "hello alice.*hello bob"); + } @Test public void testDbChange() throws Throwable { diff --git a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java index 621838cf3c9c..aad4c7700a17 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java @@ -24,6 +24,9 @@ import java.sql.SQLException; import java.util.Map; +import org.apache.hive.service.cli.operation.hplsql.BeelineConsole; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.processors.CommandProcessor; import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory; import org.apache.hadoop.hive.ql.session.SessionState; @@ -31,6 +34,8 @@ import org.apache.hive.hplsql.Conf; import org.apache.hive.hplsql.Exec; import org.apache.hive.hplsql.HplSqlSessionState; +import org.apache.hive.hplsql.ResultListener; +import org.apache.hive.hplsql.udf.Udf; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationType; import org.apache.hive.service.cli.operation.hplsql.BeelineConsole; @@ -71,6 +76,7 @@ public static ExecuteStatementOperation newExecuteStatementOperation(HiveSession new HiveHplSqlSessionState(SessionState.get()) ); interpreter.init(); + registerUdf(); SessionState.get().addDynamicVar(interpreter); } return new HplSqlOperation(parentSession, statement, confOverlay, runAsync, SessionState.get().getDynamicVar(Exec.class)); @@ -91,6 +97,16 @@ public static ExecuteStatementOperation newExecuteStatementOperation(HiveSession return new HiveCommandOperation(parentSession, cleanStatement, processor, confOverlay); } + private static void registerUdf() throws HiveSQLException { + try { + if (FunctionRegistry.getTemporaryFunctionInfo(Udf.NAME) == null) { + FunctionRegistry.registerTemporaryUDF(Udf.NAME, org.apache.hive.hplsql.udf.Udf.class); + } + } catch (SemanticException e) { + throw new HiveSQLException(e); + } + } + private static boolean proceduralMode(Map confOverlay) { return confOverlay != null && !HPLSQL.equals(confOverlay.get(QUERY_EXECUTOR)); }