From 69a47a113c56ff0ae452d0a927a6e9f93f0a73da Mon Sep 17 00:00:00 2001 From: ulysses Date: Tue, 16 Jun 2020 16:49:16 +0800 Subject: [PATCH 01/45] init --- .../spark/sql/catalyst/parser/SqlBase.g4 | 1 + .../sql/catalyst/catalog/SessionCatalog.scala | 10 ++++ .../sql/catalyst/parser/AstBuilder.scala | 5 ++ .../catalyst/plans/logical/statements.scala | 6 +++ .../sql/catalyst/parser/DDLParserSuite.scala | 6 +++ .../analysis/ResolveSessionCatalog.scala | 5 ++ .../sql/execution/command/functions.scala | 52 +++++++++++++++++++ .../sql/execution/command/DDLSuite.scala | 47 ++++++++++++++++- 8 files changed, 130 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 691fde8d48f94..f7fa651cac085 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -229,6 +229,7 @@ statement comment=(STRING | NULL) #commentNamespace | COMMENT ON TABLE multipartIdentifier IS comment=(STRING | NULL) #commentTable | REFRESH TABLE multipartIdentifier #refreshTable + | REFRESH FUNCTION multipartIdentifier #refreshFunction | REFRESH (STRING | .*?) #refreshResource | CACHE LAZY? TABLE multipartIdentifier (OPTIONS options=tablePropertyList)? (AS? query)? #cacheTable diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index b79857cdccd22..053ba836f212c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1341,6 +1341,16 @@ class SessionCatalog( functionRegistry.registerFunction(func, info, builder) } + /** + * Unregister a temporary or permanent function from a session-specific [[FunctionRegistry]] + */ + def unregisterFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = { + if (!functionRegistry.dropFunction(name) && !ignoreIfNotExists) { + throw new NoSuchFunctionException( + formatDatabaseName(name.database.getOrElse(currentDb)), name.funcName) + } + } + /** * Drop a temporary function. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 03571a740df3e..32d6ba29b8c0a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3650,6 +3650,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging ctx.REPLACE != null) } + override def visitRefreshFunction(ctx: RefreshFunctionContext): LogicalPlan = withOrigin(ctx) { + val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier) + RefreshFunctionStatement(functionIdentifier) + } + override def visitCommentNamespace(ctx: CommentNamespaceContext): LogicalPlan = withOrigin(ctx) { val comment = ctx.comment.getType match { case SqlBaseParser.NULL => "" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index b1129e741221b..f4e531df60791 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -480,3 +480,9 @@ case class CreateFunctionStatement( isTemp: Boolean, ignoreIfExists: Boolean, replace: Boolean) extends ParsedStatement + +/** + * REFRESH FUNCTION statement, as parsed from SQL + */ +case class RefreshFunctionStatement( + functionName: Seq[String]) extends ParsedStatement diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 6499b5d8e7974..04aa0a1076b44 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -2113,6 +2113,12 @@ class DDLParserSuite extends AnalysisTest { "Operation not allowed: CREATE FUNCTION with resource type 'other'") } + test("REFRESH FUNCTION") { + parseCompare("REFRESH FUNCTION c", RefreshFunctionStatement(Seq("c"))) + parseCompare("REFRESH FUNCTION b.c", RefreshFunctionStatement(Seq("b", "c"))) + parseCompare("REFRESH FUNCTION a.b.c", RefreshFunctionStatement(Seq("a", "b", "c"))) + } + private case class TableSpec( name: Seq[String], schema: Option[StructType], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index bf90875e511f8..dd7fd8a42581e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -611,6 +611,11 @@ class ResolveSessionCatalog( CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists, replace) } + + case RefreshFunctionStatement(nameParts) => + val FunctionIdentifier(function, database) = + parseSessionCatalogFunctionIdentifier(nameParts, "REFRESH FUNCTION") + RefreshFunctionCommand(database, function) } // TODO: move function related v2 statements to the new framework. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 6fdc7f4a58195..4baadd6e89e13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -236,6 +236,58 @@ case class ShowFunctionsCommand( } } + +/** + * A command for users to refresh the persistent function. + * The syntax of using this command in SQL is: + * {{{ + * REFRESH FUNCTION functionName + * }}} + */ +case class RefreshFunctionCommand( + databaseName: Option[String], + functionName: String) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val catalog = sparkSession.sessionState.catalog + if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) { + throw new AnalysisException(s"Cannot refresh native function $functionName") + } else if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) { + throw new AnalysisException(s"Cannot refresh temp function $functionName") + } else { + // we only refresh the permanent function. + // there are 4 cases: + // 1. registry exists externalCatalog exists + // 2. registry exists externalCatalog not exists + // 3. registry not exists externalCatalog exists + // 4. registry not exists externalCatalog not exists + val identifier = FunctionIdentifier( + functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) + val isRegisteredFunction = catalog.isRegisteredFunction(identifier) + val isPersistentFunction = catalog.isPersistentFunction(identifier) + if (isRegisteredFunction && isPersistentFunction) { + // re-register function + catalog.unregisterFunction(identifier, true) + val func = catalog.getFunctionMetadata(identifier) + catalog.registerFunction(func, true) + } else if (isRegisteredFunction && !isPersistentFunction) { + // unregister function and throw NoSuchFunctionException + catalog.unregisterFunction(identifier, true) + throw new NoSuchFunctionException(identifier.database.get, functionName) + } else if (!isRegisteredFunction && isPersistentFunction) { + // register function + val func = catalog.getFunctionMetadata(identifier) + catalog.registerFunction(func, true) + } else { + throw new NoSuchFunctionException(identifier.database.get, functionName) + } + } + + Seq.empty[Row] + } +} + object FunctionsCommand { // operators that do not have corresponding functions. // They should be handled `DescribeFunctionCommand`, `ShowFunctionsCommand` diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index e4709e469dca3..0a0b435bc5fe8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -28,8 +28,8 @@ import org.apache.spark.{SparkException, SparkFiles} import org.apache.spark.internal.config import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} -import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException} +import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER @@ -3030,6 +3030,49 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } } + + test("REFRESH FUNCTION") { + val msg = intercept[AnalysisException] { + sql("REFRESH FUNCTION md5") + }.getMessage + assert(msg.contains("Cannot refresh native function")) + + withUserDefinedFunction("func1" -> true) { + sql("CREATE TEMPORARY FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'") + val msg = intercept[AnalysisException] { + sql("REFRESH FUNCTION func1") + }.getMessage + assert(msg.contains("Cannot refresh temp function")) + } + + withUserDefinedFunction("func1" -> false) { + intercept[NoSuchFunctionException] { + sql("REFRESH FUNCTION func1") + } + + val func = FunctionIdentifier("func1", Some("default")) + sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'") + assert(spark.sessionState.catalog.isRegisteredFunction(func) == false) + sql("REFRESH FUNCTION func1") + assert(spark.sessionState.catalog.isRegisteredFunction(func) == true) + + spark.sessionState.catalog.externalCatalog.dropFunction("default", "func1") + assert(spark.sessionState.catalog.isRegisteredFunction(func) == true) + intercept[NoSuchFunctionException] { + sql("REFRESH FUNCTION func1") + } + assert(spark.sessionState.catalog.isRegisteredFunction(func) == false) + + val function = CatalogFunction(func, "test.non.exists.udf", Seq.empty) + spark.sessionState.catalog.createFunction(function, false) + assert(spark.sessionState.catalog.isRegisteredFunction(func) == false) + val err = intercept[AnalysisException] { + sql("REFRESH FUNCTION func1") + }.getMessage + assert(err.contains("Can not load class")) + assert(spark.sessionState.catalog.isRegisteredFunction(func) == false) + } + } } object FakeLocalFsFileSystem { From a95dcb6f9c634dca416954d12441079f6a84e2c7 Mon Sep 17 00:00:00 2001 From: ulysses Date: Wed, 17 Jun 2020 09:22:53 +0800 Subject: [PATCH 02/45] update doc --- docs/_data/menu-sql.yaml | 2 + docs/sql-ref-syntax-aux-cache-cache-table.md | 1 + docs/sql-ref-syntax-aux-cache-clear-cache.md | 1 + docs/sql-ref-syntax-aux-cache-refresh.md | 1 + .../sql-ref-syntax-aux-cache-uncache-table.md | 1 + docs/sql-ref-syntax-aux-cache.md | 3 +- docs/sql-ref-syntax-aux-refresh-function.md | 59 +++++++++++++++++++ docs/sql-ref-syntax-aux-refresh-table.md | 1 + docs/sql-ref-syntax.md | 1 + 9 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 docs/sql-ref-syntax-aux-refresh-function.md diff --git a/docs/_data/menu-sql.yaml b/docs/_data/menu-sql.yaml index 219e6809a96f0..d1df41dee787b 100644 --- a/docs/_data/menu-sql.yaml +++ b/docs/_data/menu-sql.yaml @@ -208,6 +208,8 @@ url: sql-ref-syntax-aux-cache-clear-cache.html - text: REFRESH TABLE url: sql-ref-syntax-aux-refresh-table.html + - text: REFRESH FUNCTION + url: sql-ref-syntax-aux-refresh-function.html - text: REFRESH url: sql-ref-syntax-aux-cache-refresh.html - text: DESCRIBE diff --git a/docs/sql-ref-syntax-aux-cache-cache-table.md b/docs/sql-ref-syntax-aux-cache-cache-table.md index 193e209d792b3..1b915739c94e1 100644 --- a/docs/sql-ref-syntax-aux-cache-cache-table.md +++ b/docs/sql-ref-syntax-aux-cache-cache-table.md @@ -80,3 +80,4 @@ CACHE TABLE testCache OPTIONS ('storageLevel' 'DISK_ONLY') SELECT * FROM testDat * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) * [REFRESH TABLE](sql-ref-syntax-aux-refresh-table.html) * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) +* [REFRESH FUNCTION](sql-ref-syntax-aux-refresh-function.html) \ No newline at end of file diff --git a/docs/sql-ref-syntax-aux-cache-clear-cache.md b/docs/sql-ref-syntax-aux-cache-clear-cache.md index ee33e6a98296d..fba29b5ffabae 100644 --- a/docs/sql-ref-syntax-aux-cache-clear-cache.md +++ b/docs/sql-ref-syntax-aux-cache-clear-cache.md @@ -41,3 +41,4 @@ CLEAR CACHE; * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) * [REFRESH TABLE](sql-ref-syntax-aux-refresh-table.html) * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) +* [REFRESH FUNCTION](sql-ref-syntax-aux-refresh-function.html) \ No newline at end of file diff --git a/docs/sql-ref-syntax-aux-cache-refresh.md b/docs/sql-ref-syntax-aux-cache-refresh.md index 82bc12da5d1ac..7924b9cccb2b1 100644 --- a/docs/sql-ref-syntax-aux-cache-refresh.md +++ b/docs/sql-ref-syntax-aux-cache-refresh.md @@ -54,3 +54,4 @@ REFRESH "hdfs://path/to/table"; * [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html) * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) * [REFRESH TABLE](sql-ref-syntax-aux-refresh-table.html) +* [REFRESH FUNCTION](sql-ref-syntax-aux-refresh-function.html) \ No newline at end of file diff --git a/docs/sql-ref-syntax-aux-cache-uncache-table.md b/docs/sql-ref-syntax-aux-cache-uncache-table.md index c5a8fbbe08281..72e006d2972e5 100644 --- a/docs/sql-ref-syntax-aux-cache-uncache-table.md +++ b/docs/sql-ref-syntax-aux-cache-uncache-table.md @@ -50,3 +50,4 @@ UNCACHE TABLE t1; * [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html) * [REFRESH TABLE](sql-ref-syntax-aux-refresh-table.html) * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) +* [REFRESH FUNCTION](sql-ref-syntax-aux-refresh-function.html) \ No newline at end of file diff --git a/docs/sql-ref-syntax-aux-cache.md b/docs/sql-ref-syntax-aux-cache.md index 418b8cc3403b5..0897efe6f7e66 100644 --- a/docs/sql-ref-syntax-aux-cache.md +++ b/docs/sql-ref-syntax-aux-cache.md @@ -23,4 +23,5 @@ license: | * [UNCACHE TABLE statement](sql-ref-syntax-aux-cache-uncache-table.html) * [CLEAR CACHE statement](sql-ref-syntax-aux-cache-clear-cache.html) * [REFRESH TABLE statement](sql-ref-syntax-aux-refresh-table.html) - * [REFRESH statement](sql-ref-syntax-aux-cache-refresh.html) \ No newline at end of file + * [REFRESH statement](sql-ref-syntax-aux-cache-refresh.html) + * [REFRESH FUNCTION statement](sql-ref-syntax-aux-refresh-function.html)q \ No newline at end of file diff --git a/docs/sql-ref-syntax-aux-refresh-function.md b/docs/sql-ref-syntax-aux-refresh-function.md new file mode 100644 index 0000000000000..3722ff7b9b11f --- /dev/null +++ b/docs/sql-ref-syntax-aux-refresh-function.md @@ -0,0 +1,59 @@ +--- +layout: global +title: REFRESH FUNCTION +displayTitle: REFRESH FUNCTION +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +### Description + +`REFRESH FUNCTION` statement invalidates the cached entries, which include class name +and resource location of the given function. The invalidated cache is populated right now. + +### Syntax + +```sql +REFRESH FUNCTION function_identifier +``` + +### Parameters + +* **function_identifier** + + Specifies a function name, which is either a qualified or unqualified name. If no database identifier is provided, use the current database. + + **Syntax:** `[ database_name. ] function_name` + +### Examples + +```sql +-- The cached entries of the function will be refreshed +-- The function is resolved from the current database as the function name is unqualified. +REFRESH FUNCTION func1; + +-- The cached entries of the function will be refreshed +-- The function is resolved from tempDB database as the function name is qualified. +REFRESH FUNCTION tempDB.func1; +``` + +### Related Statements + +* [CACHE TABLE](sql-ref-syntax-aux-cache-cache-table.html) +* [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html) +* [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) +* [REFRESH TABLE](sql-ref-syntax-aux-refresh-table.html) +* [REFRESH](sql-ref-syntax-aux-cache-refresh.html) diff --git a/docs/sql-ref-syntax-aux-refresh-table.md b/docs/sql-ref-syntax-aux-refresh-table.md index 8d4a804f88671..c6cade8fd74ff 100644 --- a/docs/sql-ref-syntax-aux-refresh-table.md +++ b/docs/sql-ref-syntax-aux-refresh-table.md @@ -57,3 +57,4 @@ REFRESH TABLE tempDB.view1; * [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html) * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) +* [REFRESH FUNCTION](sql-ref-syntax-aux-refresh-function.html) \ No newline at end of file diff --git a/docs/sql-ref-syntax.md b/docs/sql-ref-syntax.md index d78a01fd655a2..5e5c81d81827e 100644 --- a/docs/sql-ref-syntax.md +++ b/docs/sql-ref-syntax.md @@ -83,6 +83,7 @@ Spark SQL is Apache Spark's module for working with structured data. The SQL Syn * [LIST JAR](sql-ref-syntax-aux-resource-mgmt-list-jar.html) * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) * [REFRESH TABLE](sql-ref-syntax-aux-refresh-table.html) + * [REFRESH FUNCTION](sql-ref-syntax-aux-refresh-function.html) * [RESET](sql-ref-syntax-aux-conf-mgmt-reset.html) * [SET](sql-ref-syntax-aux-conf-mgmt-set.html) * [SHOW COLUMNS](sql-ref-syntax-aux-show-columns.html) From 3fc807e3f4ae62d516b00f7d55ea48919b039754 Mon Sep 17 00:00:00 2001 From: ulysses Date: Wed, 17 Jun 2020 09:24:38 +0800 Subject: [PATCH 03/45] fix typo --- docs/sql-ref-syntax-aux-cache.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-ref-syntax-aux-cache.md b/docs/sql-ref-syntax-aux-cache.md index 0897efe6f7e66..761748f4a1ac2 100644 --- a/docs/sql-ref-syntax-aux-cache.md +++ b/docs/sql-ref-syntax-aux-cache.md @@ -24,4 +24,4 @@ license: | * [CLEAR CACHE statement](sql-ref-syntax-aux-cache-clear-cache.html) * [REFRESH TABLE statement](sql-ref-syntax-aux-refresh-table.html) * [REFRESH statement](sql-ref-syntax-aux-cache-refresh.html) - * [REFRESH FUNCTION statement](sql-ref-syntax-aux-refresh-function.html)q \ No newline at end of file + * [REFRESH FUNCTION statement](sql-ref-syntax-aux-refresh-function.html) \ No newline at end of file From b28234836139d84367b10328242d58c83ac389d0 Mon Sep 17 00:00:00 2001 From: ulysses Date: Wed, 17 Jun 2020 17:17:17 +0800 Subject: [PATCH 04/45] update doc --- docs/sql-ref-syntax-aux-refresh-function.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/sql-ref-syntax-aux-refresh-function.md b/docs/sql-ref-syntax-aux-refresh-function.md index 3722ff7b9b11f..62514a7a2d86e 100644 --- a/docs/sql-ref-syntax-aux-refresh-function.md +++ b/docs/sql-ref-syntax-aux-refresh-function.md @@ -22,7 +22,8 @@ license: | ### Description `REFRESH FUNCTION` statement invalidates the cached entries, which include class name -and resource location of the given function. The invalidated cache is populated right now. +and resource location of the given function. The invalidated cache is populated right away. +Note that, refresh function only works for permanent function. Refresh native function or temporary function will cause exception. ### Syntax From f677a4a8beafd4f7aae1b40d09d60b8af08bf23c Mon Sep 17 00:00:00 2001 From: ulysses Date: Wed, 17 Jun 2020 17:19:17 +0800 Subject: [PATCH 05/45] update doc again --- docs/sql-ref-syntax-aux-refresh-function.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-ref-syntax-aux-refresh-function.md b/docs/sql-ref-syntax-aux-refresh-function.md index 62514a7a2d86e..3d8282d6947e4 100644 --- a/docs/sql-ref-syntax-aux-refresh-function.md +++ b/docs/sql-ref-syntax-aux-refresh-function.md @@ -21,7 +21,7 @@ license: | ### Description -`REFRESH FUNCTION` statement invalidates the cached entries, which include class name +`REFRESH FUNCTION` statement invalidates the cached function entry, which include class name and resource location of the given function. The invalidated cache is populated right away. Note that, refresh function only works for permanent function. Refresh native function or temporary function will cause exception. From a6c5d8be00cab5f1413df066a3d36c7e3a1006ae Mon Sep 17 00:00:00 2001 From: ulysses Date: Wed, 17 Jun 2020 18:32:50 +0800 Subject: [PATCH 06/45] use v2 command --- .../apache/spark/sql/catalyst/parser/AstBuilder.scala | 2 +- .../spark/sql/catalyst/plans/logical/statements.scala | 8 +------- .../spark/sql/catalyst/plans/logical/v2Commands.scala | 5 +++++ .../spark/sql/catalyst/parser/DDLParserSuite.scala | 9 ++++++--- .../sql/catalyst/analysis/ResolveSessionCatalog.scala | 5 +++-- 5 files changed, 16 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 32d6ba29b8c0a..8c24129a7f656 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3652,7 +3652,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging override def visitRefreshFunction(ctx: RefreshFunctionContext): LogicalPlan = withOrigin(ctx) { val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier) - RefreshFunctionStatement(functionIdentifier) + RefreshFunction(UnresolvedNamespace(functionIdentifier)) } override def visitCommentNamespace(ctx: CommentNamespaceContext): LogicalPlan = withOrigin(ctx) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index f4e531df60791..be7f1ab5d3565 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -479,10 +479,4 @@ case class CreateFunctionStatement( resources: Seq[FunctionResource], isTemp: Boolean, ignoreIfExists: Boolean, - replace: Boolean) extends ParsedStatement - -/** - * REFRESH FUNCTION statement, as parsed from SQL - */ -case class RefreshFunctionStatement( - functionName: Seq[String]) extends ParsedStatement + replace: Boolean) extends ParsedStatement \ No newline at end of file diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 579157a6f2f2e..b47b44ff84a61 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -516,3 +516,8 @@ case class CommentOnNamespace(child: LogicalPlan, comment: String) extends Comma case class CommentOnTable(child: LogicalPlan, comment: String) extends Command { override def children: Seq[LogicalPlan] = child :: Nil } + +/** + * The logical plan of the REFRESH FUNCTION command that works for v2 catalogs. + */ +case class RefreshFunction(func: LogicalPlan) extends Command \ No newline at end of file diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 04aa0a1076b44..7efbc509b1c73 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -2114,9 +2114,12 @@ class DDLParserSuite extends AnalysisTest { } test("REFRESH FUNCTION") { - parseCompare("REFRESH FUNCTION c", RefreshFunctionStatement(Seq("c"))) - parseCompare("REFRESH FUNCTION b.c", RefreshFunctionStatement(Seq("b", "c"))) - parseCompare("REFRESH FUNCTION a.b.c", RefreshFunctionStatement(Seq("a", "b", "c"))) + parseCompare("REFRESH FUNCTION c", + RefreshFunction(UnresolvedNamespace(Seq("c")))) + parseCompare("REFRESH FUNCTION b.c", + RefreshFunction(UnresolvedNamespace(Seq("b", "c")))) + parseCompare("REFRESH FUNCTION a.b.c", + RefreshFunction(UnresolvedNamespace(Seq("a", "b", "c")))) } private case class TableSpec( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index dd7fd8a42581e..dff63337f99d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -612,9 +612,10 @@ class ResolveSessionCatalog( replace) } - case RefreshFunctionStatement(nameParts) => + case RefreshFunction(ResolvedNamespace(_, ns)) => val FunctionIdentifier(function, database) = - parseSessionCatalogFunctionIdentifier(nameParts, "REFRESH FUNCTION") + parseSessionCatalogFunctionIdentifier(ns, "REFRESH FUNCTION") + // Fallback to v1 command RefreshFunctionCommand(database, function) } From de54470e6de76b4a011d8b4960b57253f596c819 Mon Sep 17 00:00:00 2001 From: ulysses Date: Wed, 17 Jun 2020 18:34:50 +0800 Subject: [PATCH 07/45] fix --- .../apache/spark/sql/catalyst/plans/logical/statements.scala | 2 +- .../apache/spark/sql/catalyst/plans/logical/v2Commands.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index be7f1ab5d3565..b1129e741221b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -479,4 +479,4 @@ case class CreateFunctionStatement( resources: Seq[FunctionResource], isTemp: Boolean, ignoreIfExists: Boolean, - replace: Boolean) extends ParsedStatement \ No newline at end of file + replace: Boolean) extends ParsedStatement diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index b47b44ff84a61..d1529cf9ab5d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -520,4 +520,4 @@ case class CommentOnTable(child: LogicalPlan, comment: String) extends Command { /** * The logical plan of the REFRESH FUNCTION command that works for v2 catalogs. */ -case class RefreshFunction(func: LogicalPlan) extends Command \ No newline at end of file +case class RefreshFunction(func: LogicalPlan) extends Command From 9e09875bcb0d9481c9da216723dbb150312ca178 Mon Sep 17 00:00:00 2001 From: ulysses Date: Wed, 17 Jun 2020 18:47:19 +0800 Subject: [PATCH 08/45] fix mistake --- .../apache/spark/sql/catalyst/parser/AstBuilder.scala | 2 +- .../spark/sql/catalyst/plans/logical/v2Commands.scala | 2 +- .../spark/sql/catalyst/parser/DDLParserSuite.scala | 9 +++------ .../sql/catalyst/analysis/ResolveSessionCatalog.scala | 4 ++-- 4 files changed, 7 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 8c24129a7f656..5d9b127205717 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3652,7 +3652,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging override def visitRefreshFunction(ctx: RefreshFunctionContext): LogicalPlan = withOrigin(ctx) { val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier) - RefreshFunction(UnresolvedNamespace(functionIdentifier)) + RefreshFunction(functionIdentifier) } override def visitCommentNamespace(ctx: CommentNamespaceContext): LogicalPlan = withOrigin(ctx) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index d1529cf9ab5d3..3a674b7ac073e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -520,4 +520,4 @@ case class CommentOnTable(child: LogicalPlan, comment: String) extends Command { /** * The logical plan of the REFRESH FUNCTION command that works for v2 catalogs. */ -case class RefreshFunction(func: LogicalPlan) extends Command +case class RefreshFunction(func: Seq[String]) extends Command diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 7efbc509b1c73..5c1c5f60d6e56 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -2114,12 +2114,9 @@ class DDLParserSuite extends AnalysisTest { } test("REFRESH FUNCTION") { - parseCompare("REFRESH FUNCTION c", - RefreshFunction(UnresolvedNamespace(Seq("c")))) - parseCompare("REFRESH FUNCTION b.c", - RefreshFunction(UnresolvedNamespace(Seq("b", "c")))) - parseCompare("REFRESH FUNCTION a.b.c", - RefreshFunction(UnresolvedNamespace(Seq("a", "b", "c")))) + parseCompare("REFRESH FUNCTION c", RefreshFunction(Seq("c"))) + parseCompare("REFRESH FUNCTION b.c", RefreshFunction(Seq("b", "c"))) + parseCompare("REFRESH FUNCTION a.b.c", RefreshFunction(Seq("a", "b", "c"))) } private case class TableSpec( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index dff63337f99d5..7272eb8daa047 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -612,9 +612,9 @@ class ResolveSessionCatalog( replace) } - case RefreshFunction(ResolvedNamespace(_, ns)) => + case RefreshFunction(func) => val FunctionIdentifier(function, database) = - parseSessionCatalogFunctionIdentifier(ns, "REFRESH FUNCTION") + parseSessionCatalogFunctionIdentifier(func, "REFRESH FUNCTION") // Fallback to v1 command RefreshFunctionCommand(database, function) } From 63695c088384f5803eb5f0142da29f454cf59469 Mon Sep 17 00:00:00 2001 From: ulysses Date: Wed, 17 Jun 2020 22:12:01 +0800 Subject: [PATCH 09/45] use v2 commnd analyze --- .../sql/catalyst/analysis/Analyzer.scala | 9 +++ .../catalyst/analysis/v2ResolutionPlans.scala | 11 ++++ .../sql/catalyst/parser/AstBuilder.scala | 2 +- .../catalyst/plans/logical/v2Commands.scala | 2 +- .../sql/connector/catalog/LookupCatalog.scala | 35 +++++++++++- .../sql/catalyst/parser/DDLParserSuite.scala | 11 ++-- .../analysis/ResolveSessionCatalog.scala | 55 ++++--------------- .../sql/connector/DataSourceV2SQLSuite.scala | 21 +++++-- 8 files changed, 91 insertions(+), 55 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9c97e1e9b441b..65170fb2529c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -217,6 +217,7 @@ class Analyzer( ResolveInsertInto :: ResolveRelations :: ResolveTables :: + ResolveFunc(catalogManager) :: ResolveReferences :: ResolveCreateNamedStruct :: ResolveDeserializer :: @@ -834,6 +835,14 @@ class Analyzer( } } + case class ResolveFunc(catalogManager: CatalogManager) + extends Rule[LogicalPlan] with LookupCatalog { + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case UnresolvedFunc(CatalogAndFunctionIdentifier(catalog, identifier)) => + ResolvedFunc(catalog, identifier) + } + } + private def isResolvingView: Boolean = AnalysisContext.get.catalogAndNamespace.nonEmpty /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index f3d40c6d36cc3..0d2dbb252e136 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, SupportsNamespaces, Table, TableCatalog} @@ -50,6 +51,11 @@ case class UnresolvedTableOrView(multipartIdentifier: Seq[String]) extends LeafN override def output: Seq[Attribute] = Nil } +case class UnresolvedFunc(multipartIdentifier: Seq[String]) extends LeafNode { + override lazy val resolved: Boolean = false + override def output: Seq[Attribute] = Nil +} + /** * A plan containing resolved namespace. */ @@ -74,3 +80,8 @@ case class ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: T case class ResolvedView(identifier: Identifier) extends LeafNode { override def output: Seq[Attribute] = Nil } + +case class ResolvedFunc(catalog: CatalogPlugin, functionIdentifier: FunctionIdentifier) + extends LeafNode { + override def output: Seq[Attribute] = Nil +} \ No newline at end of file diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 5d9b127205717..b9507da8149f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3652,7 +3652,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging override def visitRefreshFunction(ctx: RefreshFunctionContext): LogicalPlan = withOrigin(ctx) { val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier) - RefreshFunction(functionIdentifier) + RefreshFunction(UnresolvedFunc(functionIdentifier)) } override def visitCommentNamespace(ctx: CommentNamespaceContext): LogicalPlan = withOrigin(ctx) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 3a674b7ac073e..d1529cf9ab5d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -520,4 +520,4 @@ case class CommentOnTable(child: LogicalPlan, comment: String) extends Command { /** * The logical plan of the REFRESH FUNCTION command that works for v2 catalogs. */ -case class RefreshFunction(func: Seq[String]) extends Command +case class RefreshFunction(func: LogicalPlan) extends Command diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index 10c15747ec4ce..4724ad3c1b8af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector.catalog import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} /** @@ -155,4 +155,37 @@ private[sql] trait LookupCatalog extends Logging { None } } + + /** + * Extract catalog and function identifier from a multi-part name with the current catalog if + * needed. + * + * Note that: now function is only supported in v1 catalog. + */ + object CatalogAndFunctionIdentifier { + def unapply(nameParts: Seq[String]): Some[(CatalogPlugin, FunctionIdentifier)] = { + + if (nameParts.length == 1 && catalogManager.v1SessionCatalog.isTempFunction(nameParts.head)) { + return Some(currentCatalog, FunctionIdentifier(nameParts.head)) + } + + nameParts match { + case SessionCatalogAndIdentifier(catalog, ident) => + if (nameParts.length == 1) { + // If there is only one name part, it means the current catalog is the session catalog. + // Here we don't fill the default database, to keep the error message unchanged for + // v1 commands. + Some(catalog, FunctionIdentifier(nameParts.head, None)) + } else { + ident.namespace match { + case Array(db) => Some(catalog, FunctionIdentifier(ident.name, Some(db))) + case _ => + throw new AnalysisException(s"Unsupported function name '$ident'") + } + } + + case _ => throw new AnalysisException(s"Function command is only supported in v1 catalog") + } + } + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 5c1c5f60d6e56..d5cfa6742aeda 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser import java.util.Locale import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView} +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView} import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, FunctionResourceType, JarResource} import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal} import org.apache.spark.sql.catalyst.plans.logical._ @@ -2114,9 +2114,12 @@ class DDLParserSuite extends AnalysisTest { } test("REFRESH FUNCTION") { - parseCompare("REFRESH FUNCTION c", RefreshFunction(Seq("c"))) - parseCompare("REFRESH FUNCTION b.c", RefreshFunction(Seq("b", "c"))) - parseCompare("REFRESH FUNCTION a.b.c", RefreshFunction(Seq("a", "b", "c"))) + parseCompare("REFRESH FUNCTION c", + RefreshFunction(UnresolvedFunc(Seq("c")))) + parseCompare("REFRESH FUNCTION b.c", + RefreshFunction(UnresolvedFunc(Seq("b", "c")))) + parseCompare("REFRESH FUNCTION a.b.c", + RefreshFunction(UnresolvedFunc(Seq("a", "b", "c")))) } private case class TableSpec( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 7272eb8daa047..781e551c277a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -566,24 +566,19 @@ class ResolveSessionCatalog( case ShowTableProperties(r: ResolvedView, propertyKey) => ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey) - case DescribeFunctionStatement(nameParts, extended) => - val functionIdent = - parseSessionCatalogFunctionIdentifier(nameParts, "DESCRIBE FUNCTION") + case DescribeFunctionStatement(CatalogAndFunctionIdentifier(_, functionIdent), extended) => DescribeFunctionCommand(functionIdent, extended) case ShowFunctionsStatement(userScope, systemScope, pattern, fun) => val (database, function) = fun match { - case Some(nameParts) => - val FunctionIdentifier(fn, db) = - parseSessionCatalogFunctionIdentifier(nameParts, "SHOW FUNCTIONS") + case Some(CatalogAndFunctionIdentifier(_, FunctionIdentifier(fn, db))) => (db, Some(fn)) case None => (None, pattern) } ShowFunctionsCommand(database, function, userScope, systemScope) - case DropFunctionStatement(nameParts, ifExists, isTemp) => - val FunctionIdentifier(function, database) = - parseSessionCatalogFunctionIdentifier(nameParts, "DROP FUNCTION") + case DropFunctionStatement( + CatalogAndFunctionIdentifier(_, FunctionIdentifier(function, database)), ifExists, isTemp) => DropFunctionCommand(database, function, ifExists, isTemp) case CreateFunctionStatement(nameParts, @@ -606,44 +601,16 @@ class ResolveSessionCatalog( ignoreIfExists, replace) } else { - val FunctionIdentifier(function, database) = - parseSessionCatalogFunctionIdentifier(nameParts, "CREATE FUNCTION") - CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists, - replace) + nameParts match { + case CatalogAndFunctionIdentifier(_, FunctionIdentifier(function, database)) => + CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists, + replace) + } } - case RefreshFunction(func) => - val FunctionIdentifier(function, database) = - parseSessionCatalogFunctionIdentifier(func, "REFRESH FUNCTION") + case RefreshFunction(ResolvedFunc(_, func)) => // Fallback to v1 command - RefreshFunctionCommand(database, function) - } - - // TODO: move function related v2 statements to the new framework. - private def parseSessionCatalogFunctionIdentifier( - nameParts: Seq[String], - sql: String): FunctionIdentifier = { - if (nameParts.length == 1 && isTempFunction(nameParts.head)) { - return FunctionIdentifier(nameParts.head) - } - - nameParts match { - case SessionCatalogAndIdentifier(_, ident) => - if (nameParts.length == 1) { - // If there is only one name part, it means the current catalog is the session catalog. - // Here we don't fill the default database, to keep the error message unchanged for - // v1 commands. - FunctionIdentifier(nameParts.head, None) - } else { - ident.namespace match { - case Array(db) => FunctionIdentifier(ident.name, Some(db)) - case _ => - throw new AnalysisException(s"Unsupported function name '$ident'") - } - } - - case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog") - } + RefreshFunctionCommand(func.database, func.funcName) } private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = tableName match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 8462ce5a6c44f..a0a02b41f821d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2166,7 +2166,7 @@ class DataSourceV2SQLSuite val e = intercept[AnalysisException] { sql("DESCRIBE FUNCTION testcat.ns1.ns2.fun") } - assert(e.message.contains("DESCRIBE FUNCTION is only supported in v1 catalog")) + assert(e.message.contains("Function command is only supported in v1 catalog")) val e1 = intercept[AnalysisException] { sql("DESCRIBE FUNCTION default.ns1.ns2.fun") @@ -2181,14 +2181,14 @@ class DataSourceV2SQLSuite val e = intercept[AnalysisException] { sql(s"SHOW FUNCTIONS LIKE $function") } - assert(e.message.contains("SHOW FUNCTIONS is only supported in v1 catalog")) + assert(e.message.contains("Function command is only supported in v1 catalog")) } test("DROP FUNCTION: only support session catalog") { val e = intercept[AnalysisException] { sql("DROP FUNCTION testcat.ns1.ns2.fun") } - assert(e.message.contains("DROP FUNCTION is only supported in v1 catalog")) + assert(e.message.contains("Function command is only supported in v1 catalog")) val e1 = intercept[AnalysisException] { sql("DROP FUNCTION default.ns1.ns2.fun") @@ -2201,7 +2201,7 @@ class DataSourceV2SQLSuite val e = intercept[AnalysisException] { sql("CREATE FUNCTION testcat.ns1.ns2.fun as 'f'") } - assert(e.message.contains("CREATE FUNCTION is only supported in v1 catalog")) + assert(e.message.contains("Function command is only supported in v1 catalog")) val e1 = intercept[AnalysisException] { sql("CREATE FUNCTION default.ns1.ns2.fun as 'f'") @@ -2210,6 +2210,19 @@ class DataSourceV2SQLSuite "The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun")) } + test("REFRESH FUNTION: only support session catalog") { + val e = intercept[AnalysisException] { + sql("REFRESH FUNCTION testcat.ns1.ns2.fun as 'f'") + } + assert(e.message.contains("Function command is only supported in v1 catalog")) + + val e1 = intercept[AnalysisException] { + sql("REFRESH FUNCTION default.ns1.ns2.fun as 'f'") + } + assert(e1.message.contains( + "The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun")) + } + test("global temp view should not be masked by v2 catalog") { val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE) spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName) From 9e9d5cea1249a264729045c982414739dd761fc8 Mon Sep 17 00:00:00 2001 From: ulysses Date: Wed, 17 Jun 2020 22:14:19 +0800 Subject: [PATCH 10/45] add line --- .../apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index 0d2dbb252e136..4f92fbe0e7abf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -84,4 +84,4 @@ case class ResolvedView(identifier: Identifier) extends LeafNode { case class ResolvedFunc(catalog: CatalogPlugin, functionIdentifier: FunctionIdentifier) extends LeafNode { override def output: Seq[Attribute] = Nil -} \ No newline at end of file +} From c434821ddb1d7f335f0ede1e8b3802617b918fe2 Mon Sep 17 00:00:00 2001 From: ulysses Date: Thu, 18 Jun 2020 08:24:01 +0800 Subject: [PATCH 11/45] update doc --- docs/sql-ref-syntax-aux-refresh-function.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/sql-ref-syntax-aux-refresh-function.md b/docs/sql-ref-syntax-aux-refresh-function.md index 3d8282d6947e4..63a5a425e0f8b 100644 --- a/docs/sql-ref-syntax-aux-refresh-function.md +++ b/docs/sql-ref-syntax-aux-refresh-function.md @@ -21,9 +21,9 @@ license: | ### Description -`REFRESH FUNCTION` statement invalidates the cached function entry, which include class name +`REFRESH FUNCTION` statement invalidates the cached function entry, which includes class name and resource location of the given function. The invalidated cache is populated right away. -Note that, refresh function only works for permanent function. Refresh native function or temporary function will cause exception. +Note that REFRESH FUNCTION only works for permanent functions. Refreshing native functions or temporary functions will cause an exception. ### Syntax @@ -35,7 +35,7 @@ REFRESH FUNCTION function_identifier * **function_identifier** - Specifies a function name, which is either a qualified or unqualified name. If no database identifier is provided, use the current database. + Specifies a function name, which is either a qualified or unqualified name. If no database identifier is provided, uses the current database. **Syntax:** `[ database_name. ] function_name` From 35fd44b708e09d05a392dd13eae6807c7f1e431f Mon Sep 17 00:00:00 2001 From: ulysses Date: Thu, 18 Jun 2020 08:48:54 +0800 Subject: [PATCH 12/45] update doc --- docs/sql-ref-syntax-aux-refresh-function.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-ref-syntax-aux-refresh-function.md b/docs/sql-ref-syntax-aux-refresh-function.md index 63a5a425e0f8b..6c5a6d87eea53 100644 --- a/docs/sql-ref-syntax-aux-refresh-function.md +++ b/docs/sql-ref-syntax-aux-refresh-function.md @@ -23,7 +23,7 @@ license: | `REFRESH FUNCTION` statement invalidates the cached function entry, which includes class name and resource location of the given function. The invalidated cache is populated right away. -Note that REFRESH FUNCTION only works for permanent functions. Refreshing native functions or temporary functions will cause an exception. +Note that `REFRESH FUNCTION` only works for permanent functions. Refreshing native functions or temporary functions will cause an exception. ### Syntax From f83fd8b8b2f0badc97c8e85c5707b2efc9763952 Mon Sep 17 00:00:00 2001 From: ulysses Date: Thu, 18 Jun 2020 09:02:32 +0800 Subject: [PATCH 13/45] fix child --- .../apache/spark/sql/catalyst/plans/logical/v2Commands.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index d1529cf9ab5d3..52d4acd90f43e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -520,4 +520,4 @@ case class CommentOnTable(child: LogicalPlan, comment: String) extends Command { /** * The logical plan of the REFRESH FUNCTION command that works for v2 catalogs. */ -case class RefreshFunction(func: LogicalPlan) extends Command +case class RefreshFunction(child: LogicalPlan) extends Command From e4449430fecd029606c4d48a0d8c82c887704649 Mon Sep 17 00:00:00 2001 From: ulysses Date: Thu, 18 Jun 2020 10:39:33 +0800 Subject: [PATCH 14/45] fix children --- .../apache/spark/sql/catalyst/plans/logical/v2Commands.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 52d4acd90f43e..031bb35ef0a56 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -520,4 +520,6 @@ case class CommentOnTable(child: LogicalPlan, comment: String) extends Command { /** * The logical plan of the REFRESH FUNCTION command that works for v2 catalogs. */ -case class RefreshFunction(child: LogicalPlan) extends Command +case class RefreshFunction(child: LogicalPlan) extends Command { + override def children: Seq[LogicalPlan] = child :: Nil +} From afd510b126758628b160c7a73218c94c016c03ae Mon Sep 17 00:00:00 2001 From: ulysses Date: Thu, 18 Jun 2020 15:01:52 +0800 Subject: [PATCH 15/45] add comment --- .../spark/sql/catalyst/analysis/v2ResolutionPlans.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index 4f92fbe0e7abf..c1788501a37cd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -51,6 +51,10 @@ case class UnresolvedTableOrView(multipartIdentifier: Seq[String]) extends LeafN override def output: Seq[Attribute] = Nil } +/** + * Holds the name of a function that has yet to be looked up in a catalog. It will be resolved to + * [[ResolvedFunc]] during analysis. + */ case class UnresolvedFunc(multipartIdentifier: Seq[String]) extends LeafNode { override lazy val resolved: Boolean = false override def output: Seq[Attribute] = Nil From 1241bde8b815089e674b471e6047b40fff43dbec Mon Sep 17 00:00:00 2001 From: ulysses Date: Thu, 18 Jun 2020 15:03:47 +0800 Subject: [PATCH 16/45] fix copy error --- .../org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index a0a02b41f821d..d1cf7ad82af05 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2212,12 +2212,12 @@ class DataSourceV2SQLSuite test("REFRESH FUNTION: only support session catalog") { val e = intercept[AnalysisException] { - sql("REFRESH FUNCTION testcat.ns1.ns2.fun as 'f'") + sql("REFRESH FUNCTION testcat.ns1.ns2.fun") } assert(e.message.contains("Function command is only supported in v1 catalog")) val e1 = intercept[AnalysisException] { - sql("REFRESH FUNCTION default.ns1.ns2.fun as 'f'") + sql("REFRESH FUNCTION default.ns1.ns2.fun") } assert(e1.message.contains( "The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun")) From 93f5d712d10e2f7e9816fab0ae02c4173eb373ed Mon Sep 17 00:00:00 2001 From: ulysses Date: Thu, 18 Jun 2020 15:22:16 +0800 Subject: [PATCH 17/45] update doc --- docs/sql-ref-syntax-aux-refresh-function.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/sql-ref-syntax-aux-refresh-function.md b/docs/sql-ref-syntax-aux-refresh-function.md index 6c5a6d87eea53..0039b56fa6a8c 100644 --- a/docs/sql-ref-syntax-aux-refresh-function.md +++ b/docs/sql-ref-syntax-aux-refresh-function.md @@ -21,7 +21,7 @@ license: | ### Description -`REFRESH FUNCTION` statement invalidates the cached function entry, which includes class name +`REFRESH FUNCTION` statement invalidates the cached function entry, which includes a class name and resource location of the given function. The invalidated cache is populated right away. Note that `REFRESH FUNCTION` only works for permanent functions. Refreshing native functions or temporary functions will cause an exception. @@ -42,11 +42,11 @@ REFRESH FUNCTION function_identifier ### Examples ```sql --- The cached entries of the function will be refreshed +-- The cached entry of the function will be refreshed -- The function is resolved from the current database as the function name is unqualified. REFRESH FUNCTION func1; --- The cached entries of the function will be refreshed +-- The cached entry of the function will be refreshed -- The function is resolved from tempDB database as the function name is qualified. REFRESH FUNCTION tempDB.func1; ``` From dc684b506848bb73f6adb435d56b868c6655fdc3 Mon Sep 17 00:00:00 2001 From: ulysses Date: Thu, 18 Jun 2020 15:23:01 +0800 Subject: [PATCH 18/45] update comment --- .../apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index c1788501a37cd..5d309d92cf2e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -52,7 +52,7 @@ case class UnresolvedTableOrView(multipartIdentifier: Seq[String]) extends LeafN } /** - * Holds the name of a function that has yet to be looked up in a catalog. It will be resolved to + * Holds the name of a function that has yet to be looked up in a catalog. It will be resolved to * [[ResolvedFunc]] during analysis. */ case class UnresolvedFunc(multipartIdentifier: Seq[String]) extends LeafNode { From 0ea7dd6ed6a15f4b2e01bc6d37744f816c8db911 Mon Sep 17 00:00:00 2001 From: ulysses Date: Thu, 18 Jun 2020 15:28:06 +0800 Subject: [PATCH 19/45] fix LookupCatalog --- .../apache/spark/sql/connector/catalog/LookupCatalog.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index 4724ad3c1b8af..2656a0c237c13 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -160,10 +160,10 @@ private[sql] trait LookupCatalog extends Logging { * Extract catalog and function identifier from a multi-part name with the current catalog if * needed. * - * Note that: now function is only supported in v1 catalog. + * Note that: function is only supported in v1 catalog. */ object CatalogAndFunctionIdentifier { - def unapply(nameParts: Seq[String]): Some[(CatalogPlugin, FunctionIdentifier)] = { + def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, FunctionIdentifier)] = { if (nameParts.length == 1 && catalogManager.v1SessionCatalog.isTempFunction(nameParts.head)) { return Some(currentCatalog, FunctionIdentifier(nameParts.head)) @@ -184,7 +184,7 @@ private[sql] trait LookupCatalog extends Logging { } } - case _ => throw new AnalysisException(s"Function command is only supported in v1 catalog") + case _ => throw new AnalysisException("Function command is only supported in v1 catalog") } } } From 643969c19388133fde03ede8dc7f0eca02bf91d6 Mon Sep 17 00:00:00 2001 From: ulysses Date: Thu, 18 Jun 2020 15:42:58 +0800 Subject: [PATCH 20/45] merge to ResolveFunctions --- .../spark/sql/catalyst/analysis/Analyzer.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 65170fb2529c7..efddd145f0ccc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -217,7 +217,6 @@ class Analyzer( ResolveInsertInto :: ResolveRelations :: ResolveTables :: - ResolveFunc(catalogManager) :: ResolveReferences :: ResolveCreateNamedStruct :: ResolveDeserializer :: @@ -835,14 +834,6 @@ class Analyzer( } } - case class ResolveFunc(catalogManager: CatalogManager) - extends Rule[LogicalPlan] with LookupCatalog { - def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case UnresolvedFunc(CatalogAndFunctionIdentifier(catalog, identifier)) => - ResolvedFunc(catalog, identifier) - } - } - private def isResolvingView: Boolean = AnalysisContext.get.catalogAndNamespace.nonEmpty /** @@ -1899,6 +1890,9 @@ class Analyzer( object ResolveFunctions extends Rule[LogicalPlan] { val trimWarningEnabled = new AtomicBoolean(true) def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + case UnresolvedFunc(CatalogAndFunctionIdentifier(catalog, identifier)) => + ResolvedFunc(catalog, identifier) + case q: LogicalPlan => q transformExpressions { case u if !u.childrenResolved => u // Skip until children are resolved. From 6cb2edd60e1a74ee7f0464d816d97b72d4a20ef3 Mon Sep 17 00:00:00 2001 From: ulysses Date: Fri, 19 Jun 2020 14:33:14 +0800 Subject: [PATCH 21/45] remove ignoreIfNotExists --- .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 4 ++-- .../org/apache/spark/sql/execution/command/functions.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 053ba836f212c..6f6bd62ff8707 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1344,8 +1344,8 @@ class SessionCatalog( /** * Unregister a temporary or permanent function from a session-specific [[FunctionRegistry]] */ - def unregisterFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = { - if (!functionRegistry.dropFunction(name) && !ignoreIfNotExists) { + def unregisterFunction(name: FunctionIdentifier): Unit = { + if (!functionRegistry.dropFunction(name)) { throw new NoSuchFunctionException( formatDatabaseName(name.database.getOrElse(currentDb)), name.funcName) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 4baadd6e89e13..364262442d0a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -268,12 +268,12 @@ case class RefreshFunctionCommand( val isPersistentFunction = catalog.isPersistentFunction(identifier) if (isRegisteredFunction && isPersistentFunction) { // re-register function - catalog.unregisterFunction(identifier, true) + catalog.unregisterFunction(identifier) val func = catalog.getFunctionMetadata(identifier) catalog.registerFunction(func, true) } else if (isRegisteredFunction && !isPersistentFunction) { // unregister function and throw NoSuchFunctionException - catalog.unregisterFunction(identifier, true) + catalog.unregisterFunction(identifier) throw new NoSuchFunctionException(identifier.database.get, functionName) } else if (!isRegisteredFunction && isPersistentFunction) { // register function From cffc207aa31083aea644fb9d9179793f75a9842c Mon Sep 17 00:00:00 2001 From: ulysses Date: Mon, 22 Jun 2020 13:17:44 +0800 Subject: [PATCH 22/45] fix ut --- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index d1cf7ad82af05..a9e054f67cbf8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2166,7 +2166,7 @@ class DataSourceV2SQLSuite val e = intercept[AnalysisException] { sql("DESCRIBE FUNCTION testcat.ns1.ns2.fun") } - assert(e.message.contains("Function command is only supported in v1 catalog")) + assert(e.message.contains("DESCRIBE FUNCTION is only supported in v1 catalog")) val e1 = intercept[AnalysisException] { sql("DESCRIBE FUNCTION default.ns1.ns2.fun") @@ -2181,14 +2181,14 @@ class DataSourceV2SQLSuite val e = intercept[AnalysisException] { sql(s"SHOW FUNCTIONS LIKE $function") } - assert(e.message.contains("Function command is only supported in v1 catalog")) + assert(e.message.contains("SHOW FUNCTIONS is only supported in v1 catalog")) } test("DROP FUNCTION: only support session catalog") { val e = intercept[AnalysisException] { sql("DROP FUNCTION testcat.ns1.ns2.fun") } - assert(e.message.contains("Function command is only supported in v1 catalog")) + assert(e.message.contains("DROP FUNCTION is only supported in v1 catalog")) val e1 = intercept[AnalysisException] { sql("DROP FUNCTION default.ns1.ns2.fun") @@ -2201,7 +2201,7 @@ class DataSourceV2SQLSuite val e = intercept[AnalysisException] { sql("CREATE FUNCTION testcat.ns1.ns2.fun as 'f'") } - assert(e.message.contains("Function command is only supported in v1 catalog")) + assert(e.message.contains("CREATE FUNCTION is only supported in v1 catalog")) val e1 = intercept[AnalysisException] { sql("CREATE FUNCTION default.ns1.ns2.fun as 'f'") @@ -2210,11 +2210,11 @@ class DataSourceV2SQLSuite "The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun")) } - test("REFRESH FUNTION: only support session catalog") { + test("REFRESH FUNCTION: only support session catalog") { val e = intercept[AnalysisException] { sql("REFRESH FUNCTION testcat.ns1.ns2.fun") } - assert(e.message.contains("Function command is only supported in v1 catalog")) + assert(e.message.contains("REFRESH FUNCTION is only supported in v1 catalog")) val e1 = intercept[AnalysisException] { sql("REFRESH FUNCTION default.ns1.ns2.fun") From 4b6408de32406482a9a7b0d4626bd9e09b25d355 Mon Sep 17 00:00:00 2001 From: ulysses Date: Mon, 22 Jun 2020 13:30:13 +0800 Subject: [PATCH 23/45] fix resolve --- .../sql/catalyst/analysis/Analyzer.scala | 5 +- .../sql/connector/catalog/LookupCatalog.scala | 48 ++++++++----------- .../analysis/ResolveSessionCatalog.scala | 22 +++++---- 3 files changed, 37 insertions(+), 38 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index efddd145f0ccc..eda2bc3edd27d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1890,8 +1890,9 @@ class Analyzer( object ResolveFunctions extends Rule[LogicalPlan] { val trimWarningEnabled = new AtomicBoolean(true) def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case UnresolvedFunc(CatalogAndFunctionIdentifier(catalog, identifier)) => - ResolvedFunc(catalog, identifier) + case RefreshFunction(UnresolvedFunc(multipartIdent)) => + val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, "REFRESH FUNCTION") + ResolveFunctions(ResolvedFunc(currentCatalog, funcIdent)) case q: LogicalPlan => q transformExpressions { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index 2656a0c237c13..b84bf3e2786bc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -156,36 +156,30 @@ private[sql] trait LookupCatalog extends Logging { } } - /** - * Extract catalog and function identifier from a multi-part name with the current catalog if - * needed. - * - * Note that: function is only supported in v1 catalog. - */ - object CatalogAndFunctionIdentifier { - def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, FunctionIdentifier)] = { - - if (nameParts.length == 1 && catalogManager.v1SessionCatalog.isTempFunction(nameParts.head)) { - return Some(currentCatalog, FunctionIdentifier(nameParts.head)) - } + // TODO: move function related v2 statements to the new framework. + def parseSessionCatalogFunctionIdentifier( + nameParts: Seq[String], + sql: String): FunctionIdentifier = { + if (nameParts.length == 1 && catalogManager.v1SessionCatalog.isTempFunction(nameParts.head)) { + return FunctionIdentifier(nameParts.head) + } - nameParts match { - case SessionCatalogAndIdentifier(catalog, ident) => - if (nameParts.length == 1) { - // If there is only one name part, it means the current catalog is the session catalog. - // Here we don't fill the default database, to keep the error message unchanged for - // v1 commands. - Some(catalog, FunctionIdentifier(nameParts.head, None)) - } else { - ident.namespace match { - case Array(db) => Some(catalog, FunctionIdentifier(ident.name, Some(db))) - case _ => - throw new AnalysisException(s"Unsupported function name '$ident'") - } + nameParts match { + case SessionCatalogAndIdentifier(_, ident) => + if (nameParts.length == 1) { + // If there is only one name part, it means the current catalog is the session catalog. + // Here we don't fill the default database, to keep the error message unchanged for + // v1 commands. + FunctionIdentifier(nameParts.head, None) + } else { + ident.namespace match { + case Array(db) => FunctionIdentifier(ident.name, Some(db)) + case _ => + throw new AnalysisException(s"Unsupported function name '$ident'") } + } - case _ => throw new AnalysisException("Function command is only supported in v1 catalog") - } + case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog") } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 781e551c277a1..9a6efb3776568 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -566,19 +566,24 @@ class ResolveSessionCatalog( case ShowTableProperties(r: ResolvedView, propertyKey) => ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey) - case DescribeFunctionStatement(CatalogAndFunctionIdentifier(_, functionIdent), extended) => + case DescribeFunctionStatement(nameParts, extended) => + val functionIdent = + parseSessionCatalogFunctionIdentifier(nameParts, "DESCRIBE FUNCTION") DescribeFunctionCommand(functionIdent, extended) case ShowFunctionsStatement(userScope, systemScope, pattern, fun) => val (database, function) = fun match { - case Some(CatalogAndFunctionIdentifier(_, FunctionIdentifier(fn, db))) => + case Some(nameParts) => + val FunctionIdentifier(fn, db) = + parseSessionCatalogFunctionIdentifier(nameParts, "SHOW FUNCTIONS") (db, Some(fn)) case None => (None, pattern) } ShowFunctionsCommand(database, function, userScope, systemScope) - case DropFunctionStatement( - CatalogAndFunctionIdentifier(_, FunctionIdentifier(function, database)), ifExists, isTemp) => + case DropFunctionStatement(nameParts, ifExists, isTemp) => + val FunctionIdentifier(function, database) = + parseSessionCatalogFunctionIdentifier(nameParts, "DROP FUNCTION") DropFunctionCommand(database, function, ifExists, isTemp) case CreateFunctionStatement(nameParts, @@ -601,11 +606,10 @@ class ResolveSessionCatalog( ignoreIfExists, replace) } else { - nameParts match { - case CatalogAndFunctionIdentifier(_, FunctionIdentifier(function, database)) => - CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists, - replace) - } + val FunctionIdentifier(function, database) = + parseSessionCatalogFunctionIdentifier(nameParts, "CREATE FUNCTION") + CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists, + replace) } case RefreshFunction(ResolvedFunc(_, func)) => From 5d5fe71f61d596062545a7e9cc0efd023840307f Mon Sep 17 00:00:00 2001 From: ulysses Date: Mon, 22 Jun 2020 13:32:43 +0800 Subject: [PATCH 24/45] brush functions --- .../sql/execution/command/functions.scala | 59 ++++++++++--------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 364262442d0a9..6953640141f9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -252,36 +252,37 @@ case class RefreshFunctionCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) { - throw new AnalysisException(s"Cannot refresh native function $functionName") - } else if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) { - throw new AnalysisException(s"Cannot refresh temp function $functionName") + throw new AnalysisException(s"Cannot refresh builtin function $functionName") + } + if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) { + throw new AnalysisException(s"Cannot refresh temporary function $functionName") + } + + // we only refresh the permanent function. + // there are 4 cases: + // 1. registry exists externalCatalog exists + // 2. registry exists externalCatalog not exists + // 3. registry not exists externalCatalog exists + // 4. registry not exists externalCatalog not exists + val identifier = FunctionIdentifier( + functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) + val isRegisteredFunction = catalog.isRegisteredFunction(identifier) + val isPersistentFunction = catalog.isPersistentFunction(identifier) + if (isRegisteredFunction && isPersistentFunction) { + // re-register function + catalog.unregisterFunction(identifier) + val func = catalog.getFunctionMetadata(identifier) + catalog.registerFunction(func, true) + } else if (isRegisteredFunction && !isPersistentFunction) { + // unregister function and throw NoSuchFunctionException + catalog.unregisterFunction(identifier) + throw new NoSuchFunctionException(identifier.database.get, functionName) + } else if (!isRegisteredFunction && isPersistentFunction) { + // register function + val func = catalog.getFunctionMetadata(identifier) + catalog.registerFunction(func, true) } else { - // we only refresh the permanent function. - // there are 4 cases: - // 1. registry exists externalCatalog exists - // 2. registry exists externalCatalog not exists - // 3. registry not exists externalCatalog exists - // 4. registry not exists externalCatalog not exists - val identifier = FunctionIdentifier( - functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) - val isRegisteredFunction = catalog.isRegisteredFunction(identifier) - val isPersistentFunction = catalog.isPersistentFunction(identifier) - if (isRegisteredFunction && isPersistentFunction) { - // re-register function - catalog.unregisterFunction(identifier) - val func = catalog.getFunctionMetadata(identifier) - catalog.registerFunction(func, true) - } else if (isRegisteredFunction && !isPersistentFunction) { - // unregister function and throw NoSuchFunctionException - catalog.unregisterFunction(identifier) - throw new NoSuchFunctionException(identifier.database.get, functionName) - } else if (!isRegisteredFunction && isPersistentFunction) { - // register function - val func = catalog.getFunctionMetadata(identifier) - catalog.registerFunction(func, true) - } else { - throw new NoSuchFunctionException(identifier.database.get, functionName) - } + throw new NoSuchFunctionException(identifier.database.get, functionName) } Seq.empty[Row] From 4ba345b2da195ac35c86d1575ca42b1661643242 Mon Sep 17 00:00:00 2001 From: ulysses Date: Mon, 22 Jun 2020 18:49:43 +0800 Subject: [PATCH 25/45] fix --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 8 ++++---- .../apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 2 +- .../org/apache/spark/sql/execution/command/DDLSuite.scala | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index eda2bc3edd27d..cc99872eb8c2e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1885,14 +1885,14 @@ class Analyzer( } /** - * Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s. + * Replaces [[UnresolvedFunction]]s and [[UnresolvedFunc]] with concrete [[Expression]]s. */ object ResolveFunctions extends Rule[LogicalPlan] { val trimWarningEnabled = new AtomicBoolean(true) def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case RefreshFunction(UnresolvedFunc(multipartIdent)) => - val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, "REFRESH FUNCTION") - ResolveFunctions(ResolvedFunc(currentCatalog, funcIdent)) + case UnresolvedFunc(multipartIdent) => + val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, s"${plan.nodeName}") + ResolvedFunc(currentCatalog, funcIdent) case q: LogicalPlan => q transformExpressions { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index a9e054f67cbf8..ec2e9d6ee0458 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2214,7 +2214,7 @@ class DataSourceV2SQLSuite val e = intercept[AnalysisException] { sql("REFRESH FUNCTION testcat.ns1.ns2.fun") } - assert(e.message.contains("REFRESH FUNCTION is only supported in v1 catalog")) + assert(e.message.contains("RefreshFunction is only supported in v1 catalog")) val e1 = intercept[AnalysisException] { sql("REFRESH FUNCTION default.ns1.ns2.fun") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 0a0b435bc5fe8..0ff078a7f8968 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -3035,14 +3035,14 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { val msg = intercept[AnalysisException] { sql("REFRESH FUNCTION md5") }.getMessage - assert(msg.contains("Cannot refresh native function")) + assert(msg.contains("Cannot refresh builtin function")) withUserDefinedFunction("func1" -> true) { sql("CREATE TEMPORARY FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'") val msg = intercept[AnalysisException] { sql("REFRESH FUNCTION func1") }.getMessage - assert(msg.contains("Cannot refresh temp function")) + assert(msg.contains("Cannot refresh temporary function")) } withUserDefinedFunction("func1" -> false) { From 67653957f0a4ce6e91f84c88903c0b9462c2b040 Mon Sep 17 00:00:00 2001 From: ulysses Date: Mon, 22 Jun 2020 19:01:01 +0800 Subject: [PATCH 26/45] use catalogfunction --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 ++- .../spark/sql/catalyst/analysis/v2ResolutionPlans.scala | 4 ++-- .../spark/sql/catalyst/analysis/ResolveSessionCatalog.scala | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index cc99872eb8c2e..92b34cefb7d13 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1892,7 +1892,8 @@ class Analyzer( def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case UnresolvedFunc(multipartIdent) => val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, s"${plan.nodeName}") - ResolvedFunc(currentCatalog, funcIdent) + val info = v1SessionCatalog.lookupFunctionInfo(funcIdent) + ResolvedFunc(currentCatalog, CatalogFunction(funcIdent, info.getClassName, Seq())) case q: LogicalPlan => q transformExpressions { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index 5d309d92cf2e0..e51f791d169d9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.FunctionIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogFunction import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, SupportsNamespaces, Table, TableCatalog} @@ -85,7 +85,7 @@ case class ResolvedView(identifier: Identifier) extends LeafNode { override def output: Seq[Attribute] = Nil } -case class ResolvedFunc(catalog: CatalogPlugin, functionIdentifier: FunctionIdentifier) +case class ResolvedFunc(catalog: CatalogPlugin, catalogFunction: CatalogFunction) extends LeafNode { override def output: Seq[Attribute] = Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 9a6efb3776568..d2be62e7cc83a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -614,7 +614,7 @@ class ResolveSessionCatalog( case RefreshFunction(ResolvedFunc(_, func)) => // Fallback to v1 command - RefreshFunctionCommand(func.database, func.funcName) + RefreshFunctionCommand(func.identifier.database, func.identifier.funcName) } private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = tableName match { From dc86b824bb702ec0fc37106d6eb5471313c5fc5a Mon Sep 17 00:00:00 2001 From: ulysses Date: Tue, 23 Jun 2020 16:15:24 +0800 Subject: [PATCH 27/45] fix --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 92b34cefb7d13..acfecd8a0f6a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1885,7 +1885,7 @@ class Analyzer( } /** - * Replaces [[UnresolvedFunction]]s and [[UnresolvedFunc]] with concrete [[Expression]]s. + * Replaces [[UnresolvedFunction]]s and [[UnresolvedFunc]]s with concrete [[Expression]]s. */ object ResolveFunctions extends Rule[LogicalPlan] { val trimWarningEnabled = new AtomicBoolean(true) @@ -1893,7 +1893,7 @@ class Analyzer( case UnresolvedFunc(multipartIdent) => val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, s"${plan.nodeName}") val info = v1SessionCatalog.lookupFunctionInfo(funcIdent) - ResolvedFunc(currentCatalog, CatalogFunction(funcIdent, info.getClassName, Seq())) + ResolvedFunc(currentCatalog, CatalogFunction(funcIdent, info.getClassName, Nil)) case q: LogicalPlan => q transformExpressions { From a38d6567ee6922d5d5ad82cfb137ff30fcb01722 Mon Sep 17 00:00:00 2001 From: ulysses Date: Tue, 23 Jun 2020 16:24:14 +0800 Subject: [PATCH 28/45] fix comment --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index acfecd8a0f6a8..c8a3ae9ea4b39 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1885,7 +1885,8 @@ class Analyzer( } /** - * Replaces [[UnresolvedFunction]]s and [[UnresolvedFunc]]s with concrete [[Expression]]s. + * Replaces [[UnresolvedFunc]]s with concrete [[LogicalPlan]]s. + * Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s. */ object ResolveFunctions extends Rule[LogicalPlan] { val trimWarningEnabled = new AtomicBoolean(true) From cdea55b5cf63a08af3f9b704b51c3d15ebeb086b Mon Sep 17 00:00:00 2001 From: ulysses Date: Wed, 24 Jun 2020 08:18:16 +0800 Subject: [PATCH 29/45] ut nit --- .../spark/sql/execution/command/DDLSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 0ff078a7f8968..faafcb721008f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -3052,25 +3052,25 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { val func = FunctionIdentifier("func1", Some("default")) sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'") - assert(spark.sessionState.catalog.isRegisteredFunction(func) == false) + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) sql("REFRESH FUNCTION func1") - assert(spark.sessionState.catalog.isRegisteredFunction(func) == true) + assert(spark.sessionState.catalog.isRegisteredFunction(func)) spark.sessionState.catalog.externalCatalog.dropFunction("default", "func1") - assert(spark.sessionState.catalog.isRegisteredFunction(func) == true) + assert(spark.sessionState.catalog.isRegisteredFunction(func)) intercept[NoSuchFunctionException] { sql("REFRESH FUNCTION func1") } - assert(spark.sessionState.catalog.isRegisteredFunction(func) == false) + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) val function = CatalogFunction(func, "test.non.exists.udf", Seq.empty) spark.sessionState.catalog.createFunction(function, false) - assert(spark.sessionState.catalog.isRegisteredFunction(func) == false) + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) val err = intercept[AnalysisException] { sql("REFRESH FUNCTION func1") }.getMessage assert(err.contains("Can not load class")) - assert(spark.sessionState.catalog.isRegisteredFunction(func) == false) + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) } } } From 5e227d752ce7397e1c8cb79e9fc482ae2c004bbf Mon Sep 17 00:00:00 2001 From: ulysses Date: Wed, 24 Jun 2020 11:52:04 +0800 Subject: [PATCH 30/45] fix nit --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c8a3ae9ea4b39..6f0c3c42c42ed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1892,7 +1892,7 @@ class Analyzer( val trimWarningEnabled = new AtomicBoolean(true) def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case UnresolvedFunc(multipartIdent) => - val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, s"${plan.nodeName}") + val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, plan.nodeName) val info = v1SessionCatalog.lookupFunctionInfo(funcIdent) ResolvedFunc(currentCatalog, CatalogFunction(funcIdent, info.getClassName, Nil)) From 703ad47b84a46b0bdedfff209f42606af8bff899 Mon Sep 17 00:00:00 2001 From: ulysses Date: Wed, 24 Jun 2020 17:44:22 +0800 Subject: [PATCH 31/45] nit --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 6f0c3c42c42ed..0d113198d0364 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1892,7 +1892,7 @@ class Analyzer( val trimWarningEnabled = new AtomicBoolean(true) def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case UnresolvedFunc(multipartIdent) => - val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, plan.nodeName) + val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, "function lookup") val info = v1SessionCatalog.lookupFunctionInfo(funcIdent) ResolvedFunc(currentCatalog, CatalogFunction(funcIdent, info.getClassName, Nil)) From a79f72bda7ee16fd19b7aac8b3d681320dbc56f1 Mon Sep 17 00:00:00 2001 From: ulysses Date: Wed, 24 Jun 2020 18:20:24 +0800 Subject: [PATCH 32/45] update ResolvedFunc --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 +-- .../sql/catalyst/analysis/v2ResolutionPlans.scala | 7 ++++++- .../sql/connector/catalog/CatalogV2Implicits.scala | 10 +++++++++- .../sql/catalyst/analysis/ResolveSessionCatalog.scala | 5 +++-- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 2 +- 5 files changed, 20 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0d113198d0364..c393ca439f0ed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1893,8 +1893,7 @@ class Analyzer( def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case UnresolvedFunc(multipartIdent) => val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, "function lookup") - val info = v1SessionCatalog.lookupFunctionInfo(funcIdent) - ResolvedFunc(currentCatalog, CatalogFunction(funcIdent, info.getClassName, Nil)) + ResolvedFunc(Identifier.of(funcIdent.database.toArray, funcIdent.funcName)) case q: LogicalPlan => q transformExpressions { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index e51f791d169d9..a16763f2cf943 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -85,7 +85,12 @@ case class ResolvedView(identifier: Identifier) extends LeafNode { override def output: Seq[Attribute] = Nil } -case class ResolvedFunc(catalog: CatalogPlugin, catalogFunction: CatalogFunction) +/** + * A plan containing resolved function. + */ +// TODO: create a generic representation for v1, v2 function, after we add function +// support to v2 catalog. For now we only need the identifier to fallback to v1 command. +case class ResolvedFunc(identifier: Identifier) extends LeafNode { override def output: Seq[Attribute] = Nil } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala index d90804f4b6ff6..2ee760d4f60b0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.connector.catalog import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.connector.expressions.{BucketTransform, IdentityTransform, LogicalExpressions, Transform} @@ -107,6 +107,14 @@ private[sql] object CatalogV2Implicits { throw new AnalysisException( s"$quoted is not a valid TableIdentifier as it has more than 2 name parts.") } + + def asFunctionIdentifier: FunctionIdentifier = ident.namespace() match { + case ns if ns.isEmpty => FunctionIdentifier(ident.name()) + case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName)) + case _ => + throw new AnalysisException( + s"$quoted is not a valid FunctionIdentifier as it has more than 2 name parts.") + } } implicit class MultipartIdentifierHelper(parts: Seq[String]) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index d2be62e7cc83a..ce3563870e243 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -612,9 +612,10 @@ class ResolveSessionCatalog( replace) } - case RefreshFunction(ResolvedFunc(_, func)) => + case RefreshFunction(ResolvedFunc(identifier)) => // Fallback to v1 command - RefreshFunctionCommand(func.identifier.database, func.identifier.funcName) + val funcIdentifier = identifier.asFunctionIdentifier + RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName) } private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = tableName match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index ec2e9d6ee0458..977a8e98ecdaa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2214,7 +2214,7 @@ class DataSourceV2SQLSuite val e = intercept[AnalysisException] { sql("REFRESH FUNCTION testcat.ns1.ns2.fun") } - assert(e.message.contains("RefreshFunction is only supported in v1 catalog")) + assert(e.message.contains("function lookup is only supported in v1 catalog")) val e1 = intercept[AnalysisException] { sql("REFRESH FUNCTION default.ns1.ns2.fun") From 3bd8d2332f5c96c17e7739c8575765bdd2c61867 Mon Sep 17 00:00:00 2001 From: ulysses Date: Mon, 6 Jul 2020 08:29:04 +0800 Subject: [PATCH 33/45] update doc --- docs/sql-ref-syntax-aux-cache-cache-table.md | 2 +- docs/sql-ref-syntax-aux-cache-clear-cache.md | 2 +- docs/sql-ref-syntax-aux-cache-refresh-table.md | 2 +- docs/sql-ref-syntax-aux-cache-uncache-table.md | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/sql-ref-syntax-aux-cache-cache-table.md b/docs/sql-ref-syntax-aux-cache-cache-table.md index a682734778ed5..8829016fc17ac 100644 --- a/docs/sql-ref-syntax-aux-cache-cache-table.md +++ b/docs/sql-ref-syntax-aux-cache-cache-table.md @@ -80,4 +80,4 @@ CACHE TABLE testCache OPTIONS ('storageLevel' 'DISK_ONLY') SELECT * FROM testDat * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) * [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html) * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) -* [REFRESH FUNCTION](sql-ref-syntax-aux-refresh-function.html) \ No newline at end of file +* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html) \ No newline at end of file diff --git a/docs/sql-ref-syntax-aux-cache-clear-cache.md b/docs/sql-ref-syntax-aux-cache-clear-cache.md index f38c14a4a39c8..aae4e39600375 100644 --- a/docs/sql-ref-syntax-aux-cache-clear-cache.md +++ b/docs/sql-ref-syntax-aux-cache-clear-cache.md @@ -41,4 +41,4 @@ CLEAR CACHE; * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) * [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html) * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) -* [REFRESH FUNCTION](sql-ref-syntax-aux-refresh-function.html) \ No newline at end of file +* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html) \ No newline at end of file diff --git a/docs/sql-ref-syntax-aux-cache-refresh-table.md b/docs/sql-ref-syntax-aux-cache-refresh-table.md index c6cade8fd74ff..cc35c0451d5cb 100644 --- a/docs/sql-ref-syntax-aux-cache-refresh-table.md +++ b/docs/sql-ref-syntax-aux-cache-refresh-table.md @@ -57,4 +57,4 @@ REFRESH TABLE tempDB.view1; * [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html) * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) -* [REFRESH FUNCTION](sql-ref-syntax-aux-refresh-function.html) \ No newline at end of file +* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html) \ No newline at end of file diff --git a/docs/sql-ref-syntax-aux-cache-uncache-table.md b/docs/sql-ref-syntax-aux-cache-uncache-table.md index 86937d3317519..4456378cdee15 100644 --- a/docs/sql-ref-syntax-aux-cache-uncache-table.md +++ b/docs/sql-ref-syntax-aux-cache-uncache-table.md @@ -50,4 +50,4 @@ UNCACHE TABLE t1; * [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html) * [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html) * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) -* [REFRESH FUNCTION](sql-ref-syntax-aux-refresh-function.html) \ No newline at end of file +* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html) \ No newline at end of file From 60ac2a0b5f6613d319ce613347e450e982f7f738 Mon Sep 17 00:00:00 2001 From: ulysses Date: Mon, 6 Jul 2020 08:32:04 +0800 Subject: [PATCH 34/45] fix doc --- docs/_data/menu-sql.yaml | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/docs/_data/menu-sql.yaml b/docs/_data/menu-sql.yaml index 70a7a0771f283..f2c4e21132256 100644 --- a/docs/_data/menu-sql.yaml +++ b/docs/_data/menu-sql.yaml @@ -207,13 +207,9 @@ - text: CLEAR CACHE url: sql-ref-syntax-aux-cache-clear-cache.html - text: REFRESH TABLE -<<<<<<< HEAD - url: sql-ref-syntax-aux-refresh-table.html - - text: REFRESH FUNCTION - url: sql-ref-syntax-aux-refresh-function.html -======= url: sql-ref-syntax-aux-cache-refresh-table.html ->>>>>>> 492d5d174a435c624bd87af9ee3621f4f1c8d1c5 + - text: REFRESH FUNCTION + url: sql-ref-syntax-aux-cache-refresh-function.html - text: REFRESH url: sql-ref-syntax-aux-cache-refresh.html - text: DESCRIBE From b36b760418d078a40dde92412e7408b2ed83e7f6 Mon Sep 17 00:00:00 2001 From: ulysses Date: Mon, 6 Jul 2020 08:43:36 +0800 Subject: [PATCH 35/45] update comment --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 1f0cd690560d6..7064162ed7886 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1892,6 +1892,7 @@ class Analyzer( object ResolveFunctions extends Rule[LogicalPlan] { val trimWarningEnabled = new AtomicBoolean(true) def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + // Resolve functions with concrete relations from v2 catalog. case UnresolvedFunc(multipartIdent) => val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, "function lookup") ResolvedFunc(Identifier.of(funcIdent.database.toArray, funcIdent.funcName)) From c5937a2c33d99650b2bad352dbe2273fb1735242 Mon Sep 17 00:00:00 2001 From: ulysses Date: Mon, 6 Jul 2020 08:57:17 +0800 Subject: [PATCH 36/45] rewrite RefreshFunctionCommand --- .../sql/catalyst/catalog/SessionCatalog.scala | 4 ++-- .../sql/execution/command/functions.scala | 24 ++++--------------- 2 files changed, 7 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 6f6bd62ff8707..053ba836f212c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1344,8 +1344,8 @@ class SessionCatalog( /** * Unregister a temporary or permanent function from a session-specific [[FunctionRegistry]] */ - def unregisterFunction(name: FunctionIdentifier): Unit = { - if (!functionRegistry.dropFunction(name)) { + def unregisterFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = { + if (!functionRegistry.dropFunction(name) && !ignoreIfNotExists) { throw new NoSuchFunctionException( formatDatabaseName(name.database.getOrElse(currentDb)), name.funcName) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 6953640141f9b..927b5b84ad54b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -258,27 +258,13 @@ case class RefreshFunctionCommand( throw new AnalysisException(s"Cannot refresh temporary function $functionName") } - // we only refresh the permanent function. - // there are 4 cases: - // 1. registry exists externalCatalog exists - // 2. registry exists externalCatalog not exists - // 3. registry not exists externalCatalog exists - // 4. registry not exists externalCatalog not exists val identifier = FunctionIdentifier( functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) - val isRegisteredFunction = catalog.isRegisteredFunction(identifier) - val isPersistentFunction = catalog.isPersistentFunction(identifier) - if (isRegisteredFunction && isPersistentFunction) { - // re-register function - catalog.unregisterFunction(identifier) - val func = catalog.getFunctionMetadata(identifier) - catalog.registerFunction(func, true) - } else if (isRegisteredFunction && !isPersistentFunction) { - // unregister function and throw NoSuchFunctionException - catalog.unregisterFunction(identifier) - throw new NoSuchFunctionException(identifier.database.get, functionName) - } else if (!isRegisteredFunction && isPersistentFunction) { - // register function + // we only refresh the permanent function. + // 1. clear cached function. + // 2. register function if exists. + catalog.unregisterFunction(identifier, true) + if (catalog.isPersistentFunction(identifier)) { val func = catalog.getFunctionMetadata(identifier) catalog.registerFunction(func, true) } else { From 56ec5eaab0022f0509bab849df23c94cc080f3b6 Mon Sep 17 00:00:00 2001 From: ulysses Date: Tue, 14 Jul 2020 07:55:46 +0800 Subject: [PATCH 37/45] update doc --- docs/sql-ref-syntax-aux-cache-refresh-function.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-ref-syntax-aux-cache-refresh-function.md b/docs/sql-ref-syntax-aux-cache-refresh-function.md index f19ccef4234e1..d91fc062eb714 100644 --- a/docs/sql-ref-syntax-aux-cache-refresh-function.md +++ b/docs/sql-ref-syntax-aux-cache-refresh-function.md @@ -48,7 +48,7 @@ REFRESH FUNCTION func1; -- The cached entry of the function will be refreshed -- The function is resolved from tempDB database as the function name is qualified. -REFRESH FUNCTION tempDB.func1; +REFRESH FUNCTION db1.func1; ``` ### Related Statements From c129a545b6ec92117728439e83842fccb54a6a66 Mon Sep 17 00:00:00 2001 From: ulysses Date: Tue, 14 Jul 2020 08:07:24 +0800 Subject: [PATCH 38/45] fix functions --- .../org/apache/spark/sql/execution/command/functions.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 927b5b84ad54b..05c44e37419cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -261,13 +261,13 @@ case class RefreshFunctionCommand( val identifier = FunctionIdentifier( functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) // we only refresh the permanent function. - // 1. clear cached function. - // 2. register function if exists. - catalog.unregisterFunction(identifier, true) if (catalog.isPersistentFunction(identifier)) { + // register overwrite function. val func = catalog.getFunctionMetadata(identifier) catalog.registerFunction(func, true) } else { + // function is not exists, clear cached function. + catalog.unregisterFunction(identifier, true) throw new NoSuchFunctionException(identifier.database.get, functionName) } From a95614456685c5a12a95b6a1c3e64615a80ee28f Mon Sep 17 00:00:00 2001 From: ulysses Date: Tue, 14 Jul 2020 20:48:49 +0800 Subject: [PATCH 39/45] fix --- .../org/apache/spark/sql/execution/command/functions.scala | 5 +++-- .../org/apache/spark/sql/execution/command/DDLSuite.scala | 4 +--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 05c44e37419cb..c63d214caba0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -265,9 +265,10 @@ case class RefreshFunctionCommand( // register overwrite function. val func = catalog.getFunctionMetadata(identifier) catalog.registerFunction(func, true) - } else { - // function is not exists, clear cached function. + } else if (catalog.isRegisteredFunction(identifier)) { + // clear cached function. catalog.unregisterFunction(identifier, true) + } else { throw new NoSuchFunctionException(identifier.database.get, functionName) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index faafcb721008f..ad54d5d8b568b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -3058,9 +3058,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { spark.sessionState.catalog.externalCatalog.dropFunction("default", "func1") assert(spark.sessionState.catalog.isRegisteredFunction(func)) - intercept[NoSuchFunctionException] { - sql("REFRESH FUNCTION func1") - } + sql("REFRESH FUNCTION func1") assert(!spark.sessionState.catalog.isRegisteredFunction(func)) val function = CatalogFunction(func, "test.non.exists.udf", Seq.empty) From 711656d7e4e1632b2a3dbf9e5030b92170ffbc1e Mon Sep 17 00:00:00 2001 From: ulysses Date: Tue, 14 Jul 2020 21:35:03 +0800 Subject: [PATCH 40/45] remove unnecessary param --- .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 4 ++-- .../org/apache/spark/sql/execution/command/functions.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 053ba836f212c..6f6bd62ff8707 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1344,8 +1344,8 @@ class SessionCatalog( /** * Unregister a temporary or permanent function from a session-specific [[FunctionRegistry]] */ - def unregisterFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = { - if (!functionRegistry.dropFunction(name) && !ignoreIfNotExists) { + def unregisterFunction(name: FunctionIdentifier): Unit = { + if (!functionRegistry.dropFunction(name)) { throw new NoSuchFunctionException( formatDatabaseName(name.database.getOrElse(currentDb)), name.funcName) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index c63d214caba0f..73dde4cad0c86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -267,7 +267,7 @@ case class RefreshFunctionCommand( catalog.registerFunction(func, true) } else if (catalog.isRegisteredFunction(identifier)) { // clear cached function. - catalog.unregisterFunction(identifier, true) + catalog.unregisterFunction(identifier) } else { throw new NoSuchFunctionException(identifier.database.get, functionName) } From 5d4c1521ffede8ea1e23d9f7e3d5285768522ddd Mon Sep 17 00:00:00 2001 From: ulysses Date: Thu, 16 Jul 2020 19:06:04 +0800 Subject: [PATCH 41/45] simplify --- .../org/apache/spark/sql/execution/command/functions.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 73dde4cad0c86..d5d1bc8317d35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -265,11 +265,9 @@ case class RefreshFunctionCommand( // register overwrite function. val func = catalog.getFunctionMetadata(identifier) catalog.registerFunction(func, true) - } else if (catalog.isRegisteredFunction(identifier)) { + } else { // clear cached function. catalog.unregisterFunction(identifier) - } else { - throw new NoSuchFunctionException(identifier.database.get, functionName) } Seq.empty[Row] From 94fa132ca4d58f631cc7666e25b126bc28c7f34e Mon Sep 17 00:00:00 2001 From: ulysses Date: Fri, 17 Jul 2020 08:08:14 +0800 Subject: [PATCH 42/45] fix --- .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 4 ++-- .../org/apache/spark/sql/execution/command/functions.scala | 5 +++-- .../org/apache/spark/sql/execution/command/DDLSuite.scala | 4 +++- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 6f6bd62ff8707..053ba836f212c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1344,8 +1344,8 @@ class SessionCatalog( /** * Unregister a temporary or permanent function from a session-specific [[FunctionRegistry]] */ - def unregisterFunction(name: FunctionIdentifier): Unit = { - if (!functionRegistry.dropFunction(name)) { + def unregisterFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = { + if (!functionRegistry.dropFunction(name) && !ignoreIfNotExists) { throw new NoSuchFunctionException( formatDatabaseName(name.database.getOrElse(currentDb)), name.funcName) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index d5d1bc8317d35..bc1b70bd078a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -266,8 +266,9 @@ case class RefreshFunctionCommand( val func = catalog.getFunctionMetadata(identifier) catalog.registerFunction(func, true) } else { - // clear cached function. - catalog.unregisterFunction(identifier) + // clear cached function if exists. + catalog.unregisterFunction(identifier, true) + throw new NoSuchFunctionException(identifier.database.get, functionName) } Seq.empty[Row] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index ad54d5d8b568b..faafcb721008f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -3058,7 +3058,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { spark.sessionState.catalog.externalCatalog.dropFunction("default", "func1") assert(spark.sessionState.catalog.isRegisteredFunction(func)) - sql("REFRESH FUNCTION func1") + intercept[NoSuchFunctionException] { + sql("REFRESH FUNCTION func1") + } assert(!spark.sessionState.catalog.isRegisteredFunction(func)) val function = CatalogFunction(func, "test.non.exists.udf", Seq.empty) From fc4789fcb5357bd1a7cfc88b76c7d76822457db7 Mon Sep 17 00:00:00 2001 From: ulysses Date: Fri, 17 Jul 2020 12:00:43 +0800 Subject: [PATCH 43/45] simplify --- .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 4 ++-- .../org/apache/spark/sql/execution/command/functions.scala | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 053ba836f212c..6f6bd62ff8707 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1344,8 +1344,8 @@ class SessionCatalog( /** * Unregister a temporary or permanent function from a session-specific [[FunctionRegistry]] */ - def unregisterFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = { - if (!functionRegistry.dropFunction(name) && !ignoreIfNotExists) { + def unregisterFunction(name: FunctionIdentifier): Unit = { + if (!functionRegistry.dropFunction(name)) { throw new NoSuchFunctionException( formatDatabaseName(name.database.getOrElse(currentDb)), name.funcName) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index bc1b70bd078a0..2ded7ec208606 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -266,9 +266,8 @@ case class RefreshFunctionCommand( val func = catalog.getFunctionMetadata(identifier) catalog.registerFunction(func, true) } else { - // clear cached function if exists. - catalog.unregisterFunction(identifier, true) - throw new NoSuchFunctionException(identifier.database.get, functionName) + // clear cached function, if not exists throw exception + catalog.unregisterFunction(identifier) } Seq.empty[Row] From e83194f74ebfcb95b7a65f3b71014f3ab4a227cc Mon Sep 17 00:00:00 2001 From: ulysses Date: Tue, 21 Jul 2020 16:20:20 +0800 Subject: [PATCH 44/45] address comment --- .../spark/sql/catalyst/catalog/SessionCatalog.scala | 8 +++----- .../apache/spark/sql/execution/command/functions.scala | 4 +++- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 6f6bd62ff8707..18d61eb546bdb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1343,12 +1343,10 @@ class SessionCatalog( /** * Unregister a temporary or permanent function from a session-specific [[FunctionRegistry]] + * Return true if function exists. */ - def unregisterFunction(name: FunctionIdentifier): Unit = { - if (!functionRegistry.dropFunction(name)) { - throw new NoSuchFunctionException( - formatDatabaseName(name.database.getOrElse(currentDb)), name.funcName) - } + def unregisterFunction(name: FunctionIdentifier): Boolean = { + functionRegistry.dropFunction(name) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 2ded7ec208606..77bd59a630fb1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -267,7 +267,9 @@ case class RefreshFunctionCommand( catalog.registerFunction(func, true) } else { // clear cached function, if not exists throw exception - catalog.unregisterFunction(identifier) + if (!catalog.unregisterFunction(identifier)) { + throw new NoSuchFunctionException(identifier.database.get, identifier.funcName) + } } Seq.empty[Row] From b18437c5998f5df50172732015f66b33c3908d2a Mon Sep 17 00:00:00 2001 From: ulysses Date: Tue, 21 Jul 2020 20:11:47 +0800 Subject: [PATCH 45/45] fix --- .../org/apache/spark/sql/execution/command/functions.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 77bd59a630fb1..252d188ff8fe2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -266,10 +266,9 @@ case class RefreshFunctionCommand( val func = catalog.getFunctionMetadata(identifier) catalog.registerFunction(func, true) } else { - // clear cached function, if not exists throw exception - if (!catalog.unregisterFunction(identifier)) { - throw new NoSuchFunctionException(identifier.database.get, identifier.funcName) - } + // clear cached function and throw exception + catalog.unregisterFunction(identifier) + throw new NoSuchFunctionException(identifier.database.get, identifier.funcName) } Seq.empty[Row]