diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 323a7db9c7ad7..005945aaae975 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -139,10 +139,9 @@ object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog with Suppo * even if a temp view `t` has been created. * @param outerPlan The query plan from the outer query that can be used to resolve star * expressions in a subquery. - * @param resolutionPathEntries When resolving a view body, the ordered path for unqualified - * relation names. Stays [[None]] in this PR; population from the - * frozen path stored in view metadata is wired in a follow-up. - * Outside views: compute from session + * @param resolutionPathEntries When resolving a view or SQL function body, the ordered frozen + * path for unqualified relation/function names (if persisted in + * metadata). Outside views/functions, compute from session * [[CatalogManager.sqlResolutionPathEntries]]. */ case class AnalysisContext( @@ -211,6 +210,8 @@ object AnalysisContext { val context = AnalysisContext( isDefault = false, catalogAndNamespace = viewDesc.viewCatalogAndNamespace, + resolutionPathEntries = viewDesc.viewStoredResolutionPath + .flatMap(CatalogManager.deserializePathEntries), nestedViewDepth = originContext.nestedViewDepth + 1, maxNestedViewDepth = maxNestedViewDepth, relationCache = originContext.relationCache, @@ -224,7 +225,10 @@ object AnalysisContext { def withAnalysisContext[A](function: SQLFunction)(f: => A): A = { val originContext = value.get() - val context = originContext.copy(collation = function.collation) + val context = originContext.copy( + resolutionPathEntries = function.functionStoredResolutionPath + .flatMap(CatalogManager.deserializePathEntries), + collation = function.collation) set(context) try f finally { set(originContext) } } @@ -1003,8 +1007,8 @@ class Analyzer( // This is done by keeping the catalog and namespace in `AnalysisContext`, and analyzer will // look at `AnalysisContext.catalogAndNamespace` when resolving relations with single-part name. // If `AnalysisContext.catalogAndNamespace` is non-empty, analyzer will expand single-part names - // with it, instead of current catalog and namespace. Unqualified relation PATH will be - // snapshotted in `AnalysisContext.resolutionPathEntries` in a follow-up PR. + // with it, instead of current catalog and namespace. For views/functions with persisted frozen + // PATH, `AnalysisContext.resolutionPathEntries` drives unqualified relation lookup order. private def resolveViews( plan: LogicalPlan, options: CaseInsensitiveStringMap): LogicalPlan = plan match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala index 7a5077a8a3e11..185f739050a4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala @@ -126,8 +126,8 @@ class RelationResolution( /** * Path entries for unqualified relation resolution. * - * Inside a view, [[AnalysisContext.resolutionPathEntries]] will be - * populated from the frozen path stored in view metadata (follow-up PR). + * Inside a view or SQL function, [[AnalysisContext.resolutionPathEntries]] uses the + * persisted frozen path from metadata when available. * When PATH is disabled, legacy resolution rules apply. */ private def relationResolutionEntries: Seq[Seq[String]] = { @@ -135,8 +135,6 @@ class RelationResolution( if (pinned.isDefined && conf.pathEnabled) { pinned.get } else { - // Keep expanding CurrentSchemaEntry using the live session catalog/namespace until the - // follow-up PR wires frozen resolutionPathEntries for view analysis. val expandCatalog = catalogManager.currentCatalog.name val expandNamespace = catalogManager.currentNamespace.toSeq val (pathCatalog, pathNamespace) = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 3f5afd9ce0de7..8a7090d822fc1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.connector.catalog import scala.collection.mutable +import scala.util.Try import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.SQLConfHelper @@ -349,4 +350,30 @@ private[sql] object CatalogManager { compact(JArray(entries.map(parts => JArray(parts.map(JString(_)).toList)).toList)) } + + /** + * Parse a stored frozen path string from view/function metadata. + * Returns None if the payload is malformed. + */ + def deserializePathEntries(storedPathStr: String): Option[Seq[Seq[String]]] = { + import org.json4s.JsonAST.{JArray, JString} + import org.json4s.jackson.JsonMethods.parse + + Try(parse(storedPathStr)).toOption match { + case Some(JArray(entries)) if entries.nonEmpty => + val converted = entries.foldLeft(Option(Seq.empty[Seq[String]])) { (acc, entry) => + acc.flatMap { collected => + entry match { + case JArray(parts) if parts.nonEmpty => + val strings = parts.collect { case JString(s) => s } + if (strings.size == parts.size) Some(collected :+ strings) + else None + case _ => None + } + } + } + converted.filter(_.nonEmpty) + case _ => None + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/AlwaysPersistedConfigsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/AlwaysPersistedConfigsSuite.scala index c0f1d7ebaa05b..d871a8873e973 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/AlwaysPersistedConfigsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/AlwaysPersistedConfigsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import java.util.Locale + import org.scalactic.source.Position import org.scalatest.Tag @@ -31,6 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.{ } import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project, View} +import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType @@ -169,6 +172,62 @@ class AlwaysPersistedConfigsSuite extends SharedSparkSession { assert(sqlConf.settings.get("spark.sql.ansi.enabled") == "false") } + test("Current schema marker is materialized in persisted view path") { + withView(testViewName) { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + sql("CREATE DATABASE IF NOT EXISTS path_materialized_view") + try { + sql("USE path_materialized_view") + sql("SET PATH = current_schema, system.builtin") + sql(s"CREATE VIEW $testViewName AS SELECT 1") + val metadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(testViewName)) + val storedPath = metadata.viewStoredResolutionPath.getOrElse( + fail("Expected persisted view resolution path to be set")) + val parsed = CatalogManager.deserializePathEntries(storedPath).getOrElse( + fail(s"Expected a valid serialized path, got: $storedPath")) + assert(parsed.head == Seq("spark_catalog", "path_materialized_view")) + assert(!storedPath.toLowerCase(Locale.ROOT).contains("current_schema")) + } finally { + sql("SET PATH = DEFAULT_PATH") + sql(s"DROP VIEW IF EXISTS path_materialized_view.$testViewName") + sql(s"DROP VIEW IF EXISTS $testViewName") + sql("USE default") + sql("DROP DATABASE IF EXISTS path_materialized_view") + } + } + } + } + + test("Current schema marker is materialized in persisted function path") { + withUserDefinedFunction(testFunctionName -> false) { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + sql("CREATE DATABASE IF NOT EXISTS path_materialized_fn") + try { + sql("USE path_materialized_fn") + sql("SET PATH = current_schema, system.builtin") + sql( + s""" + |CREATE OR REPLACE FUNCTION $testFunctionName() + |RETURN SELECT 1 + |""".stripMargin) + val function = analyzedSqlFunction(testFunctionName) + val storedPath = function.functionStoredResolutionPath.getOrElse( + fail("Expected persisted function resolution path to be set")) + val parsed = CatalogManager.deserializePathEntries(storedPath).getOrElse( + fail(s"Expected a valid serialized path, got: $storedPath")) + assert(parsed.head == Seq("spark_catalog", "path_materialized_fn")) + assert(!storedPath.toLowerCase(Locale.ROOT).contains("current_schema")) + } finally { + sql("SET PATH = DEFAULT_PATH") + sql(s"DROP FUNCTION IF EXISTS path_materialized_fn.$testFunctionName") + sql(s"DROP FUNCTION IF EXISTS $testFunctionName") + sql("USE default") + sql("DROP DATABASE IF EXISTS path_materialized_fn") + } + } + } + } + private def testView(confName: String, expectedValue: String): Unit = { sql(s"CREATE VIEW $testViewName AS SELECT CAST('string' AS BIGINT) AS alias") @@ -185,21 +244,19 @@ class AlwaysPersistedConfigsSuite extends SharedSparkSession { |RETURN SELECT CAST('string' AS BIGINT) AS alias |""".stripMargin) - val df = sql(s"select $testFunctionName()") + assert(analyzedSqlFunction(testFunctionName).properties.get(confName).get == expectedValue) + } - assert( - df.queryExecution.analyzed - .asInstanceOf[Project] - .projectList - .head - .asInstanceOf[Alias] - .child - .asInstanceOf[SQLScalarFunction] - .function - .asInstanceOf[SQLFunction] - .properties - .get(confName) - .get == expectedValue - ) + private def analyzedSqlFunction(functionName: String): SQLFunction = { + val df = sql(s"select $functionName()") + df.queryExecution.analyzed + .asInstanceOf[Project] + .projectList + .head + .asInstanceOf[Alias] + .child + .asInstanceOf[SQLScalarFunction] + .function + .asInstanceOf[SQLFunction] } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala index 805fff7fa60f2..2a51d4e6bce61 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession /** @@ -111,4 +112,117 @@ class SQLFunctionSuite extends SharedSparkSession { ) } } + + test("SPARK-56639: SQL function uses frozen SQL path") { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + withDatabase("path_func_db_a", "path_func_db_b") { + withTable("path_func_db_a.frozen_t", "path_func_db_b.frozen_t") { + withUserDefinedFunction("frozen_fn" -> false) { + sql("USE default") + sql("CREATE DATABASE path_func_db_a") + sql("CREATE DATABASE path_func_db_b") + sql("CREATE TABLE path_func_db_a.frozen_t USING parquet AS SELECT 10 AS id") + sql("CREATE TABLE path_func_db_b.frozen_t USING parquet AS SELECT 20 AS id") + try { + sql("SET PATH = spark_catalog.path_func_db_a, system.builtin") + sql( + """ + |CREATE FUNCTION frozen_fn() + |RETURNS INT + |RETURN (SELECT MAX(id) FROM frozen_t) + |""".stripMargin) + sql("SET PATH = spark_catalog.path_func_db_b, system.builtin") + + checkAnswer(sql("SELECT MAX(id) FROM frozen_t"), Row(20)) + checkAnswer(sql("SELECT default.frozen_fn()"), Row(10)) + } finally { + sql("SET PATH = DEFAULT_PATH") + } + } + } + } + } + } + + test("SPARK-56639: SQL table function uses frozen SQL path") { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + withDatabase("path_tvf_db_a", "path_tvf_db_b") { + withTable("path_tvf_db_a.frozen_t", "path_tvf_db_b.frozen_t") { + withUserDefinedFunction("frozen_tvf" -> false) { + sql("USE default") + sql("CREATE DATABASE path_tvf_db_a") + sql("CREATE DATABASE path_tvf_db_b") + sql("CREATE TABLE path_tvf_db_a.frozen_t USING parquet AS SELECT 100 AS id") + sql("CREATE TABLE path_tvf_db_b.frozen_t USING parquet AS SELECT 200 AS id") + try { + sql("SET PATH = spark_catalog.path_tvf_db_a, system.builtin") + sql( + """ + |CREATE FUNCTION frozen_tvf() + |RETURNS TABLE(id INT) + |RETURN SELECT MAX(id) AS id FROM frozen_t + |""".stripMargin) + sql("SET PATH = spark_catalog.path_tvf_db_b, system.builtin") + + checkAnswer(sql("SELECT MAX(id) FROM frozen_t"), Row(200)) + checkAnswer(sql("SELECT * FROM default.frozen_tvf()"), Row(100)) + } finally { + sql("SET PATH = DEFAULT_PATH") + } + } + } + } + } + } + + test("SPARK-56639: current_schema/current_path in SQL functions use invoker context") { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + withDatabase("path_ctx_fn_a", "path_ctx_fn_b") { + withUserDefinedFunction("path_ctx_fn_a.f_scalar_ctx" -> false, + "path_ctx_fn_a.f_table_ctx" -> false) { + sql("CREATE DATABASE path_ctx_fn_a") + sql("CREATE DATABASE path_ctx_fn_b") + try { + sql("USE path_ctx_fn_a") + sql( + """ + |CREATE FUNCTION path_ctx_fn_a.f_scalar_ctx() + |RETURNS STRING + |RETURN concat(current_schema(), '::', current_path()) + |""".stripMargin) + sql( + """ + |CREATE FUNCTION path_ctx_fn_a.f_table_ctx() + |RETURNS TABLE(cs STRING, cp STRING) + |RETURN SELECT current_schema() AS cs, current_path() AS cp + |""".stripMargin) + + sql("USE path_ctx_fn_b") + sql("SET PATH = DEFAULT_PATH") + + val scalar = sql("SELECT path_ctx_fn_a.f_scalar_ctx()").head().getString(0) + assert(scalar.startsWith("path_ctx_fn_b::"), + s"Expected scalar function to use invoker current_schema, got: $scalar") + assert(scalar.contains("path_ctx_fn_b"), + s"Expected scalar function to use invoker current_path, got: $scalar") + assert(!scalar.contains("path_ctx_fn_a"), + s"Did not expect creator schema in scalar function context, got: $scalar") + + val table = sql("SELECT cs, cp FROM path_ctx_fn_a.f_table_ctx()").head() + val tableSchema = table.getString(0) + val tablePath = table.getString(1) + assert(tableSchema == "path_ctx_fn_b", + s"Expected table function to use invoker current_schema, got: $tableSchema") + assert(tablePath.contains("path_ctx_fn_b"), + s"Expected table function to use invoker current_path, got: $tablePath") + assert(!tablePath.contains("path_ctx_fn_a"), + s"Did not expect creator schema in table function context, got: $tablePath") + } finally { + sql("SET PATH = DEFAULT_PATH") + sql("USE default") + } + } + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index d44737cd2ffd1..0d922e90da544 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -1409,4 +1409,65 @@ abstract class SQLViewSuite extends QueryTest { } } } + + test("SPARK-56639: permanent view uses frozen SQL path") { + withSQLConf(PATH_ENABLED.key -> "true") { + withDatabase("path_view_db_a", "path_view_db_b") { + withTable("path_view_db_a.frozen_t", "path_view_db_b.frozen_t") { + withView("default.v_path_frozen") { + sql("USE default") + sql("CREATE DATABASE path_view_db_a") + sql("CREATE DATABASE path_view_db_b") + sql("CREATE TABLE path_view_db_a.frozen_t USING parquet AS SELECT 1 AS id") + sql("CREATE TABLE path_view_db_b.frozen_t USING parquet AS SELECT 2 AS id") + try { + sql("SET PATH = spark_catalog.path_view_db_a, system.builtin") + sql("CREATE VIEW default.v_path_frozen AS SELECT id FROM frozen_t") + sql("SET PATH = spark_catalog.path_view_db_b, system.builtin") + + checkAnswer(sql("SELECT id FROM frozen_t"), Row(2)) + checkAnswer(sql("SELECT id FROM default.v_path_frozen"), Row(1)) + } finally { + sql("SET PATH = DEFAULT_PATH") + } + } + } + } + } + } + + test("SPARK-56639: current_schema/current_path in persisted view use invoker context") { + withSQLConf(PATH_ENABLED.key -> "true") { + withDatabase("path_ctx_view_a", "path_ctx_view_b") { + withView("path_ctx_view_a.v_ctx") { + sql("CREATE DATABASE path_ctx_view_a") + sql("CREATE DATABASE path_ctx_view_b") + try { + sql("USE path_ctx_view_a") + sql( + """ + |CREATE VIEW path_ctx_view_a.v_ctx AS + |SELECT current_schema() AS cs, current_path() AS cp + |""".stripMargin) + + sql("USE path_ctx_view_b") + sql("SET PATH = DEFAULT_PATH") + val row = sql("SELECT cs, cp FROM path_ctx_view_a.v_ctx").head() + val currentSchema = row.getString(0) + val currentPath = row.getString(1) + + assert(currentSchema == "path_ctx_view_b", + s"Expected invoker current_schema, got: $currentSchema") + assert(currentPath.contains("path_ctx_view_b"), + s"Expected invoker current_path to include path_ctx_view_b, got: $currentPath") + assert(!currentPath.contains("path_ctx_view_a"), + s"Did not expect creator schema in current_path, got: $currentPath") + } finally { + sql("SET PATH = DEFAULT_PATH") + sql("USE default") + } + } + } + } + } }