Skip to content

Commit

Permalink
HIVE-24997. HPL/SQL UDF doesn't work in Tez container mode (amagyar)
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroflag committed Apr 12, 2021
1 parent a2d50ef commit 17c0a7e
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 23 deletions.
8 changes: 7 additions & 1 deletion hplsql/src/main/java/org/apache/hive/hplsql/Arguments.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ public class Arguments {
String fileName;
String main;
Map<String, String> vars = new HashMap<String, String>();


/**
 * Static factory: builds an {@code Arguments} instance representing an inline
 * script, equivalent to passing {@code -e <str>} on the command line.
 *
 * @param str the HPL/SQL script text to execute
 * @return a parsed Arguments object carrying the script as the -e option
 */
public static Arguments script(String str) {
  Arguments result = new Arguments();
  String[] argv = {"-e", str};
  result.parse(argv);
  return result;
}

@SuppressWarnings("static-access")
public Arguments() {
// -e 'query'
Expand Down
19 changes: 9 additions & 10 deletions hplsql/src/main/java/org/apache/hive/hplsql/Exec.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
Expand Down Expand Up @@ -761,10 +762,12 @@ public Integer run(String[] args) throws Exception {
return getProgramReturnCode();
}

public void parseAndEval(Arguments arguments) throws IOException {
public Var parseAndEval(Arguments arguments) {
ParseTree tree;
try (InputStream input = sourceStream(arguments)) {
tree = parse(input);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Var result = null;
try {
Expand All @@ -775,6 +778,7 @@ public void parseAndEval(Arguments arguments) throws IOException {
if (result != null) {
console.printLine(result.toString());
}
return result;
}

private Var evaluate(ParseTree tree, String execMain) {
Expand Down Expand Up @@ -984,15 +988,6 @@ public Integer visitBegin_end_block(HplsqlParser.Begin_end_blockContext ctx) {
return rc;
}

/**
 * Parses and executes the given HPL/SQL source text in the current Exec.
 *
 * @param source HPL/SQL source to lex, parse and visit
 * @return the value left on top of the execution stack, or null when the
 *         stack is empty after evaluation
 */
public Var eval(String source) {
  CommonTokenStream tokenStream =
      new CommonTokenStream(new HplsqlLexer(new ANTLRInputStream(source)));
  HplsqlParser.ProgramContext program = newParser(tokenStream).program();
  visit(program);
  // An empty stack means the program produced no result value.
  return exec.stack.isEmpty() ? null : exec.stackPop();
}

/**
* Free resources before exit
*/
Expand Down Expand Up @@ -2861,4 +2856,8 @@ public boolean getOffline() {
/** @return the console this interpreter writes its output to. */
public Console getConsole() {
  return this.console;
}

/** @return the metastore client held by this interpreter. */
public IMetaStoreClient getMsc() {
  return this.msc;
}
}
53 changes: 41 additions & 12 deletions hplsql/src/main/java/org/apache/hive/hplsql/udf/Udf.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,35 @@

package org.apache.hive.hplsql.udf;

import org.apache.hadoop.hive.metastore.api.StoredProcedure;
import org.apache.hadoop.hive.metastore.api.StoredProcedureRequest;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hive.hplsql.Arguments;
import org.apache.hive.hplsql.Exec;
import org.apache.hive.hplsql.Scope;
import org.apache.hive.hplsql.Var;
import org.apache.thrift.TException;

@Description(name = "hplsql", value = "_FUNC_('query' [, :1, :2, ...n]) - Execute HPL/SQL query", extended = "Example:\n" + " > SELECT _FUNC_('CURRENT_DATE') FROM src LIMIT 1;\n")
@UDFType(deterministic = false)
public class Udf extends GenericUDF {

public static String NAME = "hplsql";
transient Exec exec;
StringObjectInspector queryOI;
ObjectInspector[] argumentsOI;
private String functionDefinition;

// No-arg constructor. NOTE(review): presumably required so Hive can
// instantiate the UDF reflectively when the function is invoked — confirm.
public Udf() {
}
Expand All @@ -60,29 +66,52 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
if (!(arguments[0] instanceof StringObjectInspector)) {
throw new UDFArgumentException("First argument must be a string");
}
SessionState sessionState = SessionState.get();
if (sessionState != null) {
// we are still in HiveServer, get the source of the HplSQL function and store it.
functionDefinition = loadSource(sessionState, functionName(arguments[0]));
}
queryOI = (StringObjectInspector)arguments[0];
argumentsOI = arguments;
if (exec == null) {
exec = SessionState.get() == null ? null : SessionState.get().getDynamicVar(Exec.class);
}
if (exec == null) {
throw new UDFArgumentException("Cannot be used in non HPL/SQL mode.");
}
return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
}

/**
 * Loads the HPL/SQL source text of the given stored function from the metastore.
 *
 * @param sessionState the Hive session providing the current catalog/database
 *                     and the dynamic {@link Exec} holding the metastore client
 * @param functionName the function name to look up (upper-cased by the caller)
 * @return the stored procedure's source text, or null when the metastore has
 *         no stored procedure under that name (e.g. a built-in function)
 * @throws UDFArgumentException wrapping any thrift error from the metastore call
 */
protected String loadSource(SessionState sessionState, String functionName) throws UDFArgumentException {
  Exec exec = sessionState.getDynamicVar(Exec.class);
  try {
    // Use the passed-in sessionState consistently; the original mixed it with
    // SessionState.get(), which is the same object at the call site but fragile
    // if this method is ever invoked with a different session.
    StoredProcedure storedProcedure = exec.getMsc().getStoredProcedure(
        new StoredProcedureRequest(
            sessionState.getCurrentCatalog(),
            sessionState.getCurrentDatabase(),
            functionName));
    return storedProcedure != null ? storedProcedure.getSource() : null;
  } catch (TException e) {
    // Preserve the cause so metastore failures surface with full context.
    throw new UDFArgumentException(e);
  }
}

/**
 * Extracts the upper-cased function name from a constant call expression,
 * e.g. {@code "hello(s)"} yields {@code "HELLO"}.
 *
 * @param argument a constant string inspector holding the call expression
 * @return everything before the first '(' of the constant value, upper-cased
 */
protected String functionName(ObjectInspector argument) {
  ConstantObjectInspector constant = (ConstantObjectInspector) argument;
  String call = constant.getWritableConstantValue().toString();
  int open = call.indexOf('(');
  String name = open < 0 ? call : call.substring(0, open);
  return name.toUpperCase();
}

/**
* Execute UDF
*/
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
String query = queryOI.getPrimitiveJavaObject(arguments[0].get());
exec.enterScope(Scope.Type.ROUTINE);
if (arguments.length > 1) {
setParameters(arguments);
if (exec == null) {
exec = new Exec();
exec.init();
if (functionDefinition != null) { // if it's null, it can be a built-in function
exec.parseAndEval(Arguments.script(functionDefinition));
}
}
exec.enterScope(Scope.Type.ROUTINE);
setParameters(arguments);
Var result = exec.eval(query);
String query = queryOI.getPrimitiveJavaObject(arguments[0].get());
Var result = exec.parseAndEval(Arguments.script(query));
exec.callStackPop();
exec.leaveScope();
return result != null ? result.toString() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,18 @@ public void testPackage() throws Throwable {
"SELECT * FROM result;\n";
testScriptFile(SCRIPT_TEXT, args(), "12345");
}

@Test
public void testUdf() throws Throwable {
  // An HPL/SQL function defined via CREATE FUNCTION should be callable as a
  // UDF from a plain SELECT over table rows.
  String script =
      "DROP TABLE IF EXISTS result;\n" +
      "CREATE TABLE result (s string);\n" +
      "INSERT INTO result VALUES('alice');\n" +
      "INSERT INTO result VALUES('bob');\n" +
      "CREATE FUNCTION hello(p STRING) RETURNS STRING BEGIN RETURN 'hello ' || p; END;\n" +
      "SELECT hello(s) FROM result;\n";
  testScriptFile(script, args(), "hello alice.*hello bob");
}

@Test
public void testDbChange() throws Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@
import java.sql.SQLException;
import java.util.Map;

import org.apache.hive.service.cli.operation.hplsql.BeelineConsole;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.hplsql.Conf;
import org.apache.hive.hplsql.Exec;
import org.apache.hive.hplsql.HplSqlSessionState;
import org.apache.hive.hplsql.ResultListener;
import org.apache.hive.hplsql.udf.Udf;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.OperationType;
import org.apache.hive.service.cli.operation.hplsql.BeelineConsole;
Expand Down Expand Up @@ -71,6 +76,7 @@ public static ExecuteStatementOperation newExecuteStatementOperation(HiveSession
new HiveHplSqlSessionState(SessionState.get())
);
interpreter.init();
registerUdf();
SessionState.get().addDynamicVar(interpreter);
}
return new HplSqlOperation(parentSession, statement, confOverlay, runAsync, SessionState.get().getDynamicVar(Exec.class));
Expand All @@ -91,6 +97,16 @@ public static ExecuteStatementOperation newExecuteStatementOperation(HiveSession
return new HiveCommandOperation(parentSession, cleanStatement, processor, confOverlay);
}

/**
 * Registers the hplsql UDF as a temporary function, once per process.
 * Idempotent: a no-op when the function is already registered.
 *
 * @throws HiveSQLException wrapping any semantic error from the registry
 */
private static void registerUdf() throws HiveSQLException {
  try {
    boolean alreadyRegistered = FunctionRegistry.getTemporaryFunctionInfo(Udf.NAME) != null;
    if (!alreadyRegistered) {
      FunctionRegistry.registerTemporaryUDF(Udf.NAME, Udf.class);
    }
  } catch (SemanticException e) {
    // Keep the cause chain intact for diagnosis.
    throw new HiveSQLException(e);
  }
}

private static boolean proceduralMode(Map<String, String> confOverlay) {
return confOverlay != null && !HPLSQL.equals(confOverlay.get(QUERY_EXECUTOR));
}
Expand Down

0 comments on commit 17c0a7e

Please sign in to comment.