Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,9 @@ object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog with Suppo
* even if a temp view `t` has been created.
* @param outerPlan The query plan from the outer query that can be used to resolve star
* expressions in a subquery.
* @param resolutionPathEntries When resolving a view body, the ordered path for unqualified
* relation names. Stays [[None]] in this PR; population from the
* frozen path stored in view metadata is wired in a follow-up.
* Outside views: compute from session
* @param resolutionPathEntries When resolving a view or SQL function body, the ordered frozen
* path for unqualified relation/function names (if persisted in
* metadata). Outside views/functions, compute from session
* [[CatalogManager.sqlResolutionPathEntries]].
*/
case class AnalysisContext(
Expand Down Expand Up @@ -211,6 +210,8 @@ object AnalysisContext {
val context = AnalysisContext(
isDefault = false,
catalogAndNamespace = viewDesc.viewCatalogAndNamespace,
resolutionPathEntries = viewDesc.viewStoredResolutionPath
.flatMap(CatalogManager.deserializePathEntries),
nestedViewDepth = originContext.nestedViewDepth + 1,
maxNestedViewDepth = maxNestedViewDepth,
relationCache = originContext.relationCache,
Expand All @@ -224,7 +225,10 @@ object AnalysisContext {

def withAnalysisContext[A](function: SQLFunction)(f: => A): A = {
val originContext = value.get()
val context = originContext.copy(collation = function.collation)
val context = originContext.copy(
resolutionPathEntries = function.functionStoredResolutionPath
.flatMap(CatalogManager.deserializePathEntries),
collation = function.collation)
set(context)
try f finally { set(originContext) }
}
Expand Down Expand Up @@ -1003,8 +1007,8 @@ class Analyzer(
// This is done by keeping the catalog and namespace in `AnalysisContext`, and analyzer will
// look at `AnalysisContext.catalogAndNamespace` when resolving relations with single-part name.
// If `AnalysisContext.catalogAndNamespace` is non-empty, analyzer will expand single-part names
// with it, instead of current catalog and namespace. Unqualified relation PATH will be
// snapshotted in `AnalysisContext.resolutionPathEntries` in a follow-up PR.
// with it, instead of current catalog and namespace. For views/functions with persisted frozen
// PATH, `AnalysisContext.resolutionPathEntries` drives unqualified relation lookup order.
private def resolveViews(
plan: LogicalPlan,
options: CaseInsensitiveStringMap): LogicalPlan = plan match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,15 @@ class RelationResolution(
/**
* Path entries for unqualified relation resolution.
*
* Inside a view, [[AnalysisContext.resolutionPathEntries]] will be
* populated from the frozen path stored in view metadata (follow-up PR).
* Inside a view or SQL function, [[AnalysisContext.resolutionPathEntries]] uses the
* persisted frozen path from metadata when available.
* When PATH is disabled, legacy resolution rules apply.
*/
private def relationResolutionEntries: Seq[Seq[String]] = {
val pinned = AnalysisContext.get.resolutionPathEntries
if (pinned.isDefined && conf.pathEnabled) {
pinned.get
} else {
// Keep expanding CurrentSchemaEntry using the live session catalog/namespace until the
// follow-up PR wires frozen resolutionPathEntries for view analysis.
val expandCatalog = catalogManager.currentCatalog.name
val expandNamespace = catalogManager.currentNamespace.toSeq
val (pathCatalog, pathNamespace) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.connector.catalog

import scala.collection.mutable
import scala.util.Try

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.SQLConfHelper
Expand Down Expand Up @@ -349,4 +350,30 @@ private[sql] object CatalogManager {
compact(JArray(entries.map(parts =>
JArray(parts.map(JString(_)).toList)).toList))
}

/**
* Parse a stored frozen path string from view/function metadata.
* Returns None if the payload is malformed.
*/
def deserializePathEntries(storedPathStr: String): Option[Seq[Seq[String]]] = {
import org.json4s.JsonAST.{JArray, JString}
import org.json4s.jackson.JsonMethods.parse

Try(parse(storedPathStr)).toOption match {
case Some(JArray(entries)) if entries.nonEmpty =>
val converted = entries.foldLeft(Option(Seq.empty[Seq[String]])) { (acc, entry) =>
acc.flatMap { collected =>
entry match {
case JArray(parts) if parts.nonEmpty =>
val strings = parts.collect { case JString(s) => s }
if (strings.size == parts.size) Some(collected :+ strings)
else None
case _ => None
}
}
}
converted.filter(_.nonEmpty)
case _ => None
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql

import java.util.Locale

import org.scalactic.source.Position
import org.scalatest.Tag

Expand All @@ -31,6 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.{
}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project, View}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
Expand Down Expand Up @@ -169,6 +172,62 @@ class AlwaysPersistedConfigsSuite extends SharedSparkSession {
assert(sqlConf.settings.get("spark.sql.ansi.enabled") == "false")
}

test("Current schema marker is materialized in persisted view path") {
withView(testViewName) {
withSQLConf(SQLConf.PATH_ENABLED.key -> "true") {
sql("CREATE DATABASE IF NOT EXISTS path_materialized_view")
try {
sql("USE path_materialized_view")
sql("SET PATH = current_schema, system.builtin")
sql(s"CREATE VIEW $testViewName AS SELECT 1")
val metadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(testViewName))
val storedPath = metadata.viewStoredResolutionPath.getOrElse(
fail("Expected persisted view resolution path to be set"))
val parsed = CatalogManager.deserializePathEntries(storedPath).getOrElse(
fail(s"Expected a valid serialized path, got: $storedPath"))
assert(parsed.head == Seq("spark_catalog", "path_materialized_view"))
assert(!storedPath.toLowerCase(Locale.ROOT).contains("current_schema"))
} finally {
sql("SET PATH = DEFAULT_PATH")
sql(s"DROP VIEW IF EXISTS path_materialized_view.$testViewName")
sql(s"DROP VIEW IF EXISTS $testViewName")
sql("USE default")
sql("DROP DATABASE IF EXISTS path_materialized_view")
}
}
}
}

test("Current schema marker is materialized in persisted function path") {
withUserDefinedFunction(testFunctionName -> false) {
withSQLConf(SQLConf.PATH_ENABLED.key -> "true") {
sql("CREATE DATABASE IF NOT EXISTS path_materialized_fn")
try {
sql("USE path_materialized_fn")
sql("SET PATH = current_schema, system.builtin")
sql(
s"""
|CREATE OR REPLACE FUNCTION $testFunctionName()
|RETURN SELECT 1
|""".stripMargin)
val function = analyzedSqlFunction(testFunctionName)
val storedPath = function.functionStoredResolutionPath.getOrElse(
fail("Expected persisted function resolution path to be set"))
val parsed = CatalogManager.deserializePathEntries(storedPath).getOrElse(
fail(s"Expected a valid serialized path, got: $storedPath"))
assert(parsed.head == Seq("spark_catalog", "path_materialized_fn"))
assert(!storedPath.toLowerCase(Locale.ROOT).contains("current_schema"))
} finally {
sql("SET PATH = DEFAULT_PATH")
sql(s"DROP FUNCTION IF EXISTS path_materialized_fn.$testFunctionName")
sql(s"DROP FUNCTION IF EXISTS $testFunctionName")
sql("USE default")
sql("DROP DATABASE IF EXISTS path_materialized_fn")
}
}
}
}

private def testView(confName: String, expectedValue: String): Unit = {
sql(s"CREATE VIEW $testViewName AS SELECT CAST('string' AS BIGINT) AS alias")

Expand All @@ -185,21 +244,19 @@ class AlwaysPersistedConfigsSuite extends SharedSparkSession {
|RETURN SELECT CAST('string' AS BIGINT) AS alias
|""".stripMargin)

val df = sql(s"select $testFunctionName()")
assert(analyzedSqlFunction(testFunctionName).properties.get(confName).get == expectedValue)
}

assert(
df.queryExecution.analyzed
.asInstanceOf[Project]
.projectList
.head
.asInstanceOf[Alias]
.child
.asInstanceOf[SQLScalarFunction]
.function
.asInstanceOf[SQLFunction]
.properties
.get(confName)
.get == expectedValue
)
private def analyzedSqlFunction(functionName: String): SQLFunction = {
val df = sql(s"select $functionName()")
df.queryExecution.analyzed
.asInstanceOf[Project]
.projectList
.head
.asInstanceOf[Alias]
.child
.asInstanceOf[SQLScalarFunction]
.function
.asInstanceOf[SQLFunction]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

/**
Expand Down Expand Up @@ -111,4 +112,117 @@ class SQLFunctionSuite extends SharedSparkSession {
)
}
}

test("SPARK-56639: SQL function uses frozen SQL path") {
withSQLConf(SQLConf.PATH_ENABLED.key -> "true") {
withDatabase("path_func_db_a", "path_func_db_b") {
withTable("path_func_db_a.frozen_t", "path_func_db_b.frozen_t") {
withUserDefinedFunction("frozen_fn" -> false) {
sql("USE default")
sql("CREATE DATABASE path_func_db_a")
sql("CREATE DATABASE path_func_db_b")
sql("CREATE TABLE path_func_db_a.frozen_t USING parquet AS SELECT 10 AS id")
sql("CREATE TABLE path_func_db_b.frozen_t USING parquet AS SELECT 20 AS id")
try {
sql("SET PATH = spark_catalog.path_func_db_a, system.builtin")
sql(
"""
|CREATE FUNCTION frozen_fn()
|RETURNS INT
|RETURN (SELECT MAX(id) FROM frozen_t)
|""".stripMargin)
sql("SET PATH = spark_catalog.path_func_db_b, system.builtin")

checkAnswer(sql("SELECT MAX(id) FROM frozen_t"), Row(20))
checkAnswer(sql("SELECT default.frozen_fn()"), Row(10))
} finally {
sql("SET PATH = DEFAULT_PATH")
}
}
}
}
}
}

test("SPARK-56639: SQL table function uses frozen SQL path") {
withSQLConf(SQLConf.PATH_ENABLED.key -> "true") {
withDatabase("path_tvf_db_a", "path_tvf_db_b") {
withTable("path_tvf_db_a.frozen_t", "path_tvf_db_b.frozen_t") {
withUserDefinedFunction("frozen_tvf" -> false) {
sql("USE default")
sql("CREATE DATABASE path_tvf_db_a")
sql("CREATE DATABASE path_tvf_db_b")
sql("CREATE TABLE path_tvf_db_a.frozen_t USING parquet AS SELECT 100 AS id")
sql("CREATE TABLE path_tvf_db_b.frozen_t USING parquet AS SELECT 200 AS id")
try {
sql("SET PATH = spark_catalog.path_tvf_db_a, system.builtin")
sql(
"""
|CREATE FUNCTION frozen_tvf()
|RETURNS TABLE(id INT)
|RETURN SELECT MAX(id) AS id FROM frozen_t
|""".stripMargin)
sql("SET PATH = spark_catalog.path_tvf_db_b, system.builtin")

checkAnswer(sql("SELECT MAX(id) FROM frozen_t"), Row(200))
checkAnswer(sql("SELECT * FROM default.frozen_tvf()"), Row(100))
} finally {
sql("SET PATH = DEFAULT_PATH")
}
}
}
}
}
}

test("SPARK-56639: current_schema/current_path in SQL functions use invoker context") {
withSQLConf(SQLConf.PATH_ENABLED.key -> "true") {
withDatabase("path_ctx_fn_a", "path_ctx_fn_b") {
withUserDefinedFunction("path_ctx_fn_a.f_scalar_ctx" -> false,
"path_ctx_fn_a.f_table_ctx" -> false) {
sql("CREATE DATABASE path_ctx_fn_a")
sql("CREATE DATABASE path_ctx_fn_b")
try {
sql("USE path_ctx_fn_a")
sql(
"""
|CREATE FUNCTION path_ctx_fn_a.f_scalar_ctx()
|RETURNS STRING
|RETURN concat(current_schema(), '::', current_path())
|""".stripMargin)
sql(
"""
|CREATE FUNCTION path_ctx_fn_a.f_table_ctx()
|RETURNS TABLE(cs STRING, cp STRING)
|RETURN SELECT current_schema() AS cs, current_path() AS cp
|""".stripMargin)

sql("USE path_ctx_fn_b")
sql("SET PATH = DEFAULT_PATH")

val scalar = sql("SELECT path_ctx_fn_a.f_scalar_ctx()").head().getString(0)
assert(scalar.startsWith("path_ctx_fn_b::"),
s"Expected scalar function to use invoker current_schema, got: $scalar")
assert(scalar.contains("path_ctx_fn_b"),
s"Expected scalar function to use invoker current_path, got: $scalar")
assert(!scalar.contains("path_ctx_fn_a"),
s"Did not expect creator schema in scalar function context, got: $scalar")

val table = sql("SELECT cs, cp FROM path_ctx_fn_a.f_table_ctx()").head()
val tableSchema = table.getString(0)
val tablePath = table.getString(1)
assert(tableSchema == "path_ctx_fn_b",
s"Expected table function to use invoker current_schema, got: $tableSchema")
assert(tablePath.contains("path_ctx_fn_b"),
s"Expected table function to use invoker current_path, got: $tablePath")
assert(!tablePath.contains("path_ctx_fn_a"),
s"Did not expect creator schema in table function context, got: $tablePath")
} finally {
sql("SET PATH = DEFAULT_PATH")
sql("USE default")
}
}
}
}
}
}
Loading