From 19c97f9fad8ab71bf206a6fa5c4eed61e2f54774 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Tue, 9 Jul 2019 10:28:48 +0800 Subject: [PATCH 1/6] Add SparkGetFunctionsOperation --- .../SparkGetFunctionsOperation.scala | 117 ++++++++++++++++++ .../server/SparkSQLOperationManager.scala | 15 +++ .../SparkMetadataOperationSuite.scala | 10 ++ .../cli/operation/GetFunctionsOperation.java | 2 +- .../cli/operation/GetFunctionsOperation.java | 2 +- 5 files changed, 144 insertions(+), 2 deletions(-) create mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala new file mode 100644 index 0000000000000..bbdefcf6e3766 --- /dev/null +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.thriftserver + +import java.util.UUID + +import scala.collection.JavaConverters.seqAsJavaListConverter + +import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObjectUtils} +import org.apache.hive.service.cli._ +import org.apache.hive.service.cli.operation.GetFunctionsOperation +import org.apache.hive.service.cli.session.HiveSession +import org.apache.thrift.TException + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLContext +import org.apache.spark.util.{Utils => SparkUtils} + +/** + * Spark's own GetTablesOperation + * + * @param sqlContext SQLContext to use + * @param parentSession a HiveSession from SessionManager + * @param catalogName catalog name. null if not applicable + * @param schemaName database name, null or a concrete database name + * @param functionName function name pattern + */ +private[hive] class SparkGetFunctionsOperation( + sqlContext: SQLContext, + parentSession: HiveSession, + catalogName: String, + schemaName: String, + functionName: String) + extends GetFunctionsOperation(parentSession, catalogName, schemaName, functionName) + with Logging { + + override def runInternal(): Unit = { + val statementId = UUID.randomUUID().toString + // Do not change cmdStr. It's used for Hive auditing and authorization. + val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName" + val logMsg = s"Listing functions '$cmdStr'" + logInfo(s"$logMsg with $statementId") + setState(OperationState.RUNNING) + // Always use the latest class loader provided by executionHive's state. 
+ val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + Thread.currentThread().setContextClassLoader(executionHiveClassLoader) + + val catalog = sqlContext.sessionState.catalog + + if (isAuthV2Enabled) { + // get databases for schema pattern + val schemaPattern = convertSchemaPattern(schemaName) + var matchingDbs: Seq[String] = null + try { + matchingDbs = catalog.listDatabases(schemaPattern) + } catch { + case e: TException => + setState(OperationState.ERROR) + HiveThriftServer2.listener.onStatementError( + statementId, e.getMessage, SparkUtils.exceptionString(e)) + throw new HiveSQLException(e) + } + // authorize this call on the schema objects + val privObjs = + HivePrivilegeObjectUtils.getHivePrivDbObjects(seqAsJavaListConverter(matchingDbs).asJava) + authorizeMetaGets(HiveOperationType.GET_FUNCTIONS, privObjs, cmdStr) + } + + HiveThriftServer2.listener.onStatementStart( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + logMsg, + statementId, + parentSession.getUsername) + + try { + val functionPattern = CLIServiceUtils.patternToRegex(functionName) + if ((null == catalogName || "".equals(catalogName)) + && (null == schemaName || "".equals(schemaName))) { + catalog.listFunctions(catalog.getCurrentDatabase, functionPattern).foreach { + case (functionIdentifier, _) => + val rowData = Array[AnyRef]( + null, // FUNCTION_CAT + null, // FUNCTION_SCHEM + functionIdentifier.funcName, // FUNCTION_NAME + "", // REMARKS + 1.asInstanceOf[AnyRef], // FUNCTION_TYPE + "") + rowSet.addRow(rowData); + } + } + setState(OperationState.FINISHED) + } catch { + case e: HiveSQLException => + setState(OperationState.ERROR) + HiveThriftServer2.listener.onStatementError( + statementId, e.getMessage, SparkUtils.exceptionString(e)) + throw e + } + HiveThriftServer2.listener.onStatementFinish(statementId) + } +} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 9b4198d7e7a77..dfcd3333742a2 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -119,6 +119,21 @@ private[thriftserver] class SparkSQLOperationManager() operation } + override def newGetFunctionsOperation( + parentSession: HiveSession, + catalogName: String, + schemaName: String, + functionName: String): GetFunctionsOperation = synchronized { + val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) + require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + + " initialized or had already closed.") + val operation = new SparkGetFunctionsOperation(sqlContext, parentSession, + catalogName, schemaName, functionName) + handleToOperation.put(operation.getHandle, operation) + logDebug(s"Created GetFunctionsOperation with session=$parentSession.") + operation + } + def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = { val iterator = confMap.entrySet().iterator() while (iterator.hasNext) { diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala index 80a7db5405ca8..ee9db116fb572 
100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala @@ -182,4 +182,14 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest { checkResult(metaData.getTableTypes, Seq("TABLE", "VIEW")) } } + + test("Spark's own GetFunctionsOperation(SparkGetFunctionsOperation)") { + withJdbcStatement() { statement => + val metaData = statement.getConnection.getMetaData + // Hive does not have an overlay function, we use overlay to test. + val rs = metaData.getFunctions(null, "", "overlay") + assert(rs.next()) + assert(rs.getString("FUNCTION_NAME") === "overlay") + } + } } diff --git a/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java b/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java index 5273c386b83d4..5dec8bdbf45de 100644 --- a/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java +++ b/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java @@ -63,7 +63,7 @@ public class GetFunctionsOperation extends MetadataOperation { private final String schemaName; private final String functionName; - private final RowSet rowSet; + protected final RowSet rowSet; public GetFunctionsOperation(HiveSession parentSession, String catalogName, String schemaName, String functionName) { diff --git a/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java b/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java index 6aa0c41458b53..7f906f6765fb2 100644 --- a/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java +++ b/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java @@ -63,7 +63,7 @@ public class GetFunctionsOperation extends MetadataOperation { private final String schemaName; private final String functionName; - private final RowSet rowSet; + protected final RowSet rowSet; public GetFunctionsOperation(HiveSession parentSession, String catalogName, String schemaName, String functionName) { From 47b05fd492c482627cfda2452757867c18513be8 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Thu, 25 Jul 2019 07:25:32 +0800 Subject: [PATCH 2/6] fix --- .../SparkGetFunctionsOperation.scala | 33 ++++++++----------- .../SparkMetadataOperationSuite.scala | 19 ++++++++--- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala index bbdefcf6e3766..ea1c349c353a7 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hive.thriftserver +import java.sql.DatabaseMetaData import java.util.UUID import scala.collection.JavaConverters.seqAsJavaListConverter @@ -24,8 +25,8 @@ import scala.collection.JavaConverters.seqAsJavaListConverter import 
org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObjectUtils} import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.GetFunctionsOperation +import org.apache.hive.service.cli.operation.MetadataOperation.DEFAULT_HIVE_CATALOG import org.apache.hive.service.cli.session.HiveSession -import org.apache.thrift.TException import org.apache.spark.internal.Logging import org.apache.spark.sql.SQLContext @@ -61,20 +62,15 @@ private[hive] class SparkGetFunctionsOperation( Thread.currentThread().setContextClassLoader(executionHiveClassLoader) val catalog = sqlContext.sessionState.catalog + // If no database is specified, we will return the functions in the current database. + val schemaPattern = if (null == schemaName || "".equals(schemaName)) { + catalog.getCurrentDatabase + } else { + convertSchemaPattern(schemaName) + } + val matchingDbs: Seq[String] = catalog.listDatabases(schemaPattern) if (isAuthV2Enabled) { - // get databases for schema pattern - val schemaPattern = convertSchemaPattern(schemaName) - var matchingDbs: Seq[String] = null - try { - matchingDbs = catalog.listDatabases(schemaPattern) - } catch { - case e: TException => - setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw new HiveSQLException(e) - } // authorize this call on the schema objects val privObjs = HivePrivilegeObjectUtils.getHivePrivDbObjects(seqAsJavaListConverter(matchingDbs).asJava) @@ -90,17 +86,16 @@ private[hive] class SparkGetFunctionsOperation( try { val functionPattern = CLIServiceUtils.patternToRegex(functionName) - if ((null == catalogName || "".equals(catalogName)) - && (null == schemaName || "".equals(schemaName))) { + matchingDbs.foreach { schema => catalog.listFunctions(catalog.getCurrentDatabase, functionPattern).foreach { case (functionIdentifier, _) => val rowData = Array[AnyRef]( - null, // FUNCTION_CAT - null, // FUNCTION_SCHEM + DEFAULT_HIVE_CATALOG, // FUNCTION_CAT + schema, // FUNCTION_SCHEM functionIdentifier.funcName, // FUNCTION_NAME "", // REMARKS - 1.asInstanceOf[AnyRef], // FUNCTION_TYPE - "") + DatabaseMetaData.functionResultUnknown.asInstanceOf[AnyRef], // FUNCTION_TYPE + "") // SPECIFIC_NAME rowSet.addRow(rowData); } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala index ee9db116fb572..2fb1a1edc3130 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.thriftserver -import java.sql.ResultSet +import java.sql.{DatabaseMetaData, ResultSet} class SparkMetadataOperationSuite extends HiveThriftJdbcTest { @@ -184,12 +184,23 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest { } test("Spark's own GetFunctionsOperation(SparkGetFunctionsOperation)") { + def checkResult(rs: ResultSet, functionName: Seq[String]): Unit = { + for (i <- functionName.indices) { + assert(rs.next()) + assert(rs.getString("FUNCTION_NAME") === functionName(i)) + assert(rs.getInt("FUNCTION_TYPE") === DatabaseMetaData.functionResultUnknown) + } + // Make sure there are no more elements + assert(!rs.next()) + } + withJdbcStatement() { 
statement => val metaData = statement.getConnection.getMetaData // Hive does not have an overlay function, we use overlay to test. - val rs = metaData.getFunctions(null, "", "overlay") - assert(rs.next()) - assert(rs.getString("FUNCTION_NAME") === "overlay") + checkResult(metaData.getFunctions(null, null, "overlay"), Seq("overlay")) + checkResult(metaData.getFunctions(null, null, "overla*"), Seq("overlay")) + checkResult(metaData.getFunctions(null, "default", "overla*"), Seq("overlay")) + checkResult(metaData.getFunctions(null, null, "does-not-exist*"), Seq.empty) } } } From fd4990695b4d1879f89a7d9ac64460cfd6ba2698 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Thu, 25 Jul 2019 09:36:06 +0800 Subject: [PATCH 3/6] revert --- .../SparkGetFunctionsOperation.scala | 30 +++++++++++-------- .../SparkMetadataOperationSuite.scala | 3 +- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala index ea1c349c353a7..435c358417959 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala @@ -25,8 +25,8 @@ import scala.collection.JavaConverters.seqAsJavaListConverter import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObjectUtils} import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.GetFunctionsOperation -import org.apache.hive.service.cli.operation.MetadataOperation.DEFAULT_HIVE_CATALOG import org.apache.hive.service.cli.session.HiveSession +import org.apache.thrift.TException import org.apache.spark.internal.Logging import org.apache.spark.sql.SQLContext @@ -62,15 +62,20 @@ private[hive] class SparkGetFunctionsOperation( Thread.currentThread().setContextClassLoader(executionHiveClassLoader) val catalog = sqlContext.sessionState.catalog - // If no database is specified, we will return the functions in the current database. 
- val schemaPattern = if (null == schemaName || "".equals(schemaName)) { - catalog.getCurrentDatabase - } else { - convertSchemaPattern(schemaName) - } - val matchingDbs: Seq[String] = catalog.listDatabases(schemaPattern) if (isAuthV2Enabled) { + // get databases for schema pattern + val schemaPattern = convertSchemaPattern(schemaName) + var matchingDbs: Seq[String] = null + try { + matchingDbs = catalog.listDatabases(schemaPattern) + } catch { + case e: TException => + setState(OperationState.ERROR) + HiveThriftServer2.listener.onStatementError( + statementId, e.getMessage, SparkUtils.exceptionString(e)) + throw new HiveSQLException(e) + } // authorize this call on the schema objects val privObjs = HivePrivilegeObjectUtils.getHivePrivDbObjects(seqAsJavaListConverter(matchingDbs).asJava) @@ -86,16 +91,17 @@ private[hive] class SparkGetFunctionsOperation( try { val functionPattern = CLIServiceUtils.patternToRegex(functionName) - matchingDbs.foreach { schema => + if ((null == catalogName || "".equals(catalogName)) + && (null == schemaName || "".equals(schemaName))) { catalog.listFunctions(catalog.getCurrentDatabase, functionPattern).foreach { case (functionIdentifier, _) => val rowData = Array[AnyRef]( - DEFAULT_HIVE_CATALOG, // FUNCTION_CAT - schema, // FUNCTION_SCHEM + null, // FUNCTION_CAT + null, // FUNCTION_SCHEM functionIdentifier.funcName, // FUNCTION_NAME "", // REMARKS DatabaseMetaData.functionResultUnknown.asInstanceOf[AnyRef], // FUNCTION_TYPE - "") // SPECIFIC_NAME + "") rowSet.addRow(rowData); } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala index 2fb1a1edc3130..d9cdf621aeca5 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala @@ -199,8 +199,9 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest { // Hive does not have an overlay function, we use overlay to test. 
checkResult(metaData.getFunctions(null, null, "overlay"), Seq("overlay")) checkResult(metaData.getFunctions(null, null, "overla*"), Seq("overlay")) - checkResult(metaData.getFunctions(null, "default", "overla*"), Seq("overlay")) + checkResult(metaData.getFunctions(null, "", "overla*"), Seq("overlay")) checkResult(metaData.getFunctions(null, null, "does-not-exist*"), Seq.empty) + checkResult(metaData.getFunctions(null, "default", "overlay"), Seq.empty) } } } From ee85a38e6bad5d18a3d55cef111be32f0f78b483 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 29 Jul 2019 20:24:37 +0800 Subject: [PATCH 4/6] fix --- .../SparkGetFunctionsOperation.scala | 32 ++++++------------- .../SparkMetadataOperationSuite.scala | 2 +- 2 files changed, 11 insertions(+), 23 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala index 435c358417959..7a75f1f5674b4 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala @@ -26,14 +26,13 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationTyp import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.GetFunctionsOperation import org.apache.hive.service.cli.session.HiveSession -import org.apache.thrift.TException import org.apache.spark.internal.Logging import org.apache.spark.sql.SQLContext import org.apache.spark.util.{Utils => SparkUtils} /** - * Spark's own GetTablesOperation + * Spark's own GetFunctionsOperation * * @param sqlContext SQLContext to use * @param parentSession a HiveSession from SessionManager @@ -47,14 +46,13 @@ private[hive] class SparkGetFunctionsOperation( catalogName: String, schemaName: String, functionName: String) - extends GetFunctionsOperation(parentSession, catalogName, schemaName, functionName) - with Logging { + extends GetFunctionsOperation(parentSession, catalogName, schemaName, functionName) with Logging { override def runInternal(): Unit = { val statementId = UUID.randomUUID().toString // Do not change cmdStr. It's used for Hive auditing and authorization. val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName" - val logMsg = s"Listing functions '$cmdStr'" + val logMsg = s"Listing functions '$cmdStr, functionName : $functionName'" logInfo(s"$logMsg with $statementId") setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
@@ -62,20 +60,12 @@ private[hive] class SparkGetFunctionsOperation( Thread.currentThread().setContextClassLoader(executionHiveClassLoader) val catalog = sqlContext.sessionState.catalog + // get databases for schema pattern + val schemaPattern = convertSchemaPattern(schemaName) + val matchingDbs = catalog.listDatabases(schemaPattern) + val functionPattern = CLIServiceUtils.patternToRegex(functionName) if (isAuthV2Enabled) { - // get databases for schema pattern - val schemaPattern = convertSchemaPattern(schemaName) - var matchingDbs: Seq[String] = null - try { - matchingDbs = catalog.listDatabases(schemaPattern) - } catch { - case e: TException => - setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw new HiveSQLException(e) - } // authorize this call on the schema objects val privObjs = HivePrivilegeObjectUtils.getHivePrivDbObjects(seqAsJavaListConverter(matchingDbs).asJava) @@ -90,14 +80,12 @@ private[hive] class SparkGetFunctionsOperation( parentSession.getUsername) try { - val functionPattern = CLIServiceUtils.patternToRegex(functionName) - if ((null == catalogName || "".equals(catalogName)) - && (null == schemaName || "".equals(schemaName))) { - catalog.listFunctions(catalog.getCurrentDatabase, functionPattern).foreach { + matchingDbs.foreach { db => + catalog.listFunctions(db, functionPattern).foreach { case (functionIdentifier, _) => val rowData = Array[AnyRef]( null, // FUNCTION_CAT - null, // FUNCTION_SCHEM + db, // FUNCTION_SCHEM functionIdentifier.funcName, // FUNCTION_NAME "", // REMARKS DatabaseMetaData.functionResultUnknown.asInstanceOf[AnyRef], // FUNCTION_TYPE diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala index d9cdf621aeca5..ae8674018bf7a 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala @@ -201,7 +201,7 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest { checkResult(metaData.getFunctions(null, null, "overla*"), Seq("overlay")) checkResult(metaData.getFunctions(null, "", "overla*"), Seq("overlay")) checkResult(metaData.getFunctions(null, null, "does-not-exist*"), Seq.empty) - checkResult(metaData.getFunctions(null, "default", "overlay"), Seq.empty) + checkResult(metaData.getFunctions(null, "default", "overlay"), Seq("overlay")) } } } From f37d6539fc58b39c89870d8526b37558094729ef Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Wed, 31 Jul 2019 15:19:55 +0800 Subject: [PATCH 5/6] Support Usage and ClassName --- .../sql/catalyst/expressions/ExpressionInfo.java | 14 +++++++++++--- .../spark/sql/execution/command/functions.scala | 12 ++---------- .../thriftserver/SparkGetFunctionsOperation.scala | 12 +++++++----- .../thriftserver/SparkMetadataOperationSuite.scala | 5 +++++ 4 files changed, 25 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java index 0d1f6c2b4d5b3..769cf36c3df3f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java +++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java @@ -37,7 +37,7 @@ public String getClassName() { } public String getUsage() { - return usage; + return replaceFunctionName(usage); } public String getName() { @@ -45,7 +45,7 @@ public String getName() { } public String getExtended() { - return extended; + return replaceFunctionName(extended); } public String getSince() { @@ -57,7 +57,7 @@ public String getArguments() { } public String getExamples() { - return examples; + return replaceFunctionName(examples); } public String getNote() { @@ -150,4 +150,12 @@ public ExpressionInfo(String className, String db, String name, String usage, St // simply pass the `extended` as `arguments` and an empty string for `examples`. this(className, db, name, usage, extended, "", "", "", ""); } + + private String replaceFunctionName(String usage) { + if (usage == null) { + return "N/A."; + } else { + return usage.replaceAll("_FUNC_", name); + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 1f7808c2f8e80..d3b2491cd7056 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -112,14 +112,6 @@ case class DescribeFunctionCommand( schema.toAttributes } - private def replaceFunctionName(usage: String, functionName: String): String = { - if (usage == null) { - "N/A." - } else { - usage.replaceAll("_FUNC_", functionName) - } - } - override def run(sparkSession: SparkSession): Seq[Row] = { // Hard code "<>", "!=", "between", and "case" for now as there is no corresponding functions. 
functionName.funcName.toLowerCase(Locale.ROOT) match { @@ -148,11 +140,11 @@ case class DescribeFunctionCommand( val result = Row(s"Function: $name") :: Row(s"Class: ${info.getClassName}") :: - Row(s"Usage: ${replaceFunctionName(info.getUsage, info.getName)}") :: Nil + Row(s"Usage: ${info.getUsage}") :: Nil if (isExtended) { result :+ - Row(s"Extended Usage:${replaceFunctionName(info.getExtended, info.getName)}") + Row(s"Extended Usage:${info.getExtended}") } else { result } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala index 7a75f1f5674b4..cc068dc11b7c5 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala @@ -25,6 +25,7 @@ import scala.collection.JavaConverters.seqAsJavaListConverter import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObjectUtils} import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.GetFunctionsOperation +import org.apache.hive.service.cli.operation.MetadataOperation.DEFAULT_HIVE_CATALOG import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging @@ -82,14 +83,15 @@ private[hive] class SparkGetFunctionsOperation( try { matchingDbs.foreach { db => catalog.listFunctions(db, functionPattern).foreach { - case (functionIdentifier, _) => + case (funcIdentifier, _) => + val info = catalog.lookupFunctionInfo(funcIdentifier) val rowData = Array[AnyRef]( - null, // FUNCTION_CAT + DEFAULT_HIVE_CATALOG, // FUNCTION_CAT db, // FUNCTION_SCHEM - functionIdentifier.funcName, // FUNCTION_NAME - "", // REMARKS + funcIdentifier.funcName, // FUNCTION_NAME + info.getUsage, // REMARKS DatabaseMetaData.functionResultUnknown.asInstanceOf[AnyRef], // FUNCTION_TYPE - "") + info.getClassName) // SPECIFIC_NAME rowSet.addRow(rowData); } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala index ae8674018bf7a..37a829afe378d 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala @@ -187,8 +187,11 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest { def checkResult(rs: ResultSet, functionName: Seq[String]): Unit = { for (i <- functionName.indices) { assert(rs.next()) + assert(rs.getString("FUNCTION_SCHEM") === "default") assert(rs.getString("FUNCTION_NAME") === functionName(i)) + assert(rs.getString("REMARKS").startsWith(s"${functionName(i)}(")) assert(rs.getInt("FUNCTION_TYPE") === DatabaseMetaData.functionResultUnknown) + assert(rs.getString("SPECIFIC_NAME").startsWith("org.apache.spark.sql.catalyst")) } // Make sure there are no more elements assert(!rs.next()) @@ -202,6 +205,8 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest { checkResult(metaData.getFunctions(null, "", "overla*"), Seq("overlay")) checkResult(metaData.getFunctions(null, null, "does-not-exist*"), Seq.empty) checkResult(metaData.getFunctions(null, "default", "overlay"), 
Seq("overlay")) + checkResult(metaData.getFunctions(null, "default", "shift*"), + Seq("shiftleft", "shiftright", "shiftrightunsigned")) } } } From c95db44f741b2c524090f7258adcd5414c8fa4af Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Thu, 1 Aug 2019 17:21:23 +0800 Subject: [PATCH 6/6] onOperationClosed --- .../thriftserver/SparkGetFunctionsOperation.scala | 9 ++++++++- .../thriftserver/SparkMetadataOperationSuite.scala | 14 ++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala index cc068dc11b7c5..462e57300e82b 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala @@ -49,8 +49,15 @@ private[hive] class SparkGetFunctionsOperation( functionName: String) extends GetFunctionsOperation(parentSession, catalogName, schemaName, functionName) with Logging { + private var statementId: String = _ + + override def close(): Unit = { + super.close() + HiveThriftServer2.listener.onOperationClosed(statementId) + } + override def runInternal(): Unit = { - val statementId = UUID.randomUUID().toString + statementId = UUID.randomUUID().toString // Do not change cmdStr. It's used for Hive auditing and authorization. val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName" val logMsg = s"Listing functions '$cmdStr, functionName : $functionName'" diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala index 37a829afe378d..45fe8a89b9934 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala @@ -208,5 +208,19 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest { checkResult(metaData.getFunctions(null, "default", "shift*"), Seq("shiftleft", "shiftright", "shiftrightunsigned")) } + + withJdbcStatement() { statement => + val metaData = statement.getConnection.getMetaData + val rs = metaData.getFunctions(null, "default", "upPer") + assert(rs.next()) + assert(rs.getString("FUNCTION_SCHEM") === "default") + assert(rs.getString("FUNCTION_NAME") === "upper") + assert(rs.getString("REMARKS") === + "upper(str) - Returns `str` with all characters changed to uppercase.") + assert(rs.getInt("FUNCTION_TYPE") === DatabaseMetaData.functionResultUnknown) + assert(rs.getString("SPECIFIC_NAME") === "org.apache.spark.sql.catalyst.expressions.Upper") + // Make sure there are no more elements + assert(!rs.next()) + } } }