Skip to content

Commit

Permalink
update ResolvedFunc
Browse files Browse the repository at this point in the history
  • Loading branch information
ulysses-you committed Jun 24, 2020
1 parent 703ad47 commit a79f72b
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 7 deletions.
Expand Up @@ -1893,8 +1893,7 @@ class Analyzer(
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case UnresolvedFunc(multipartIdent) =>
val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, "function lookup")
val info = v1SessionCatalog.lookupFunctionInfo(funcIdent)
ResolvedFunc(currentCatalog, CatalogFunction(funcIdent, info.getClassName, Nil))
ResolvedFunc(Identifier.of(funcIdent.database.toArray, funcIdent.funcName))

case q: LogicalPlan =>
q transformExpressions {
Expand Down
Expand Up @@ -85,7 +85,12 @@ case class ResolvedView(identifier: Identifier) extends LeafNode {
override def output: Seq[Attribute] = Nil
}

case class ResolvedFunc(catalog: CatalogPlugin, catalogFunction: CatalogFunction)
/**
* A plan containing resolved function.
*/
// TODO: create a generic representation for v1, v2 function, after we add function
// support to v2 catalog. For now we only need the identifier to fallback to v1 command.
case class ResolvedFunc(identifier: Identifier)
extends LeafNode {
override def output: Seq[Attribute] = Nil
}
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.connector.catalog

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connector.expressions.{BucketTransform, IdentityTransform, LogicalExpressions, Transform}
Expand Down Expand Up @@ -107,6 +107,14 @@ private[sql] object CatalogV2Implicits {
throw new AnalysisException(
s"$quoted is not a valid TableIdentifier as it has more than 2 name parts.")
}

def asFunctionIdentifier: FunctionIdentifier = ident.namespace() match {
case ns if ns.isEmpty => FunctionIdentifier(ident.name())
case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName))
case _ =>
throw new AnalysisException(
s"$quoted is not a valid FunctionIdentifier as it has more than 2 name parts.")
}
}

implicit class MultipartIdentifierHelper(parts: Seq[String]) {
Expand Down
Expand Up @@ -612,9 +612,10 @@ class ResolveSessionCatalog(
replace)
}

case RefreshFunction(ResolvedFunc(_, func)) =>
case RefreshFunction(ResolvedFunc(identifier)) =>
// Fallback to v1 command
RefreshFunctionCommand(func.identifier.database, func.identifier.funcName)
val funcIdentifier = identifier.asFunctionIdentifier
RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
}

private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = tableName match {
Expand Down
Expand Up @@ -2214,7 +2214,7 @@ class DataSourceV2SQLSuite
val e = intercept[AnalysisException] {
sql("REFRESH FUNCTION testcat.ns1.ns2.fun")
}
assert(e.message.contains("RefreshFunction is only supported in v1 catalog"))
assert(e.message.contains("function lookup is only supported in v1 catalog"))

val e1 = intercept[AnalysisException] {
sql("REFRESH FUNCTION default.ns1.ns2.fun")
Expand Down

0 comments on commit a79f72b

Please sign in to comment.