From 4eec9457248c3ab03038b2878cb786e7c216d38e Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Mon, 27 Apr 2026 12:10:24 -0700 Subject: [PATCH 1/4] [SPARK-56639][SQL] Apply frozen SQL PATH during view and UDF analysis Use persisted resolution-path metadata to seed analysis context for views and SQL functions so unqualified lookup stays stable across session PATH changes. Add regression tests that verify view and SQL function bodies keep creation-time PATH semantics. --- .../sql/catalyst/analysis/Analyzer.scala | 14 +++++--- .../analysis/RelationResolution.scala | 6 ++-- .../connector/catalog/CatalogManager.scala | 27 ++++++++++++++++ .../sql/execution/SQLFunctionSuite.scala | 32 +++++++++++++++++++ .../spark/sql/execution/SQLViewSuite.scala | 26 +++++++++++++++ 5 files changed, 96 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 323a7db9c7ad7..ad33fd42546fb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -139,10 +139,9 @@ object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog with Suppo * even if a temp view `t` has been created. * @param outerPlan The query plan from the outer query that can be used to resolve star * expressions in a subquery. - * @param resolutionPathEntries When resolving a view body, the ordered path for unqualified - * relation names. Stays [[None]] in this PR; population from the - * frozen path stored in view metadata is wired in a follow-up. - * Outside views: compute from session + * @param resolutionPathEntries When resolving a view or SQL function body, the ordered frozen + * path for unqualified relation/function names (if persisted in + * metadata). Outside views/functions, compute from session * [[CatalogManager.sqlResolutionPathEntries]]. */ case class AnalysisContext( @@ -211,6 +210,8 @@ object AnalysisContext { val context = AnalysisContext( isDefault = false, catalogAndNamespace = viewDesc.viewCatalogAndNamespace, + resolutionPathEntries = viewDesc.viewStoredResolutionPath + .flatMap(CatalogManager.deserializePathEntries), nestedViewDepth = originContext.nestedViewDepth + 1, maxNestedViewDepth = maxNestedViewDepth, relationCache = originContext.relationCache, @@ -224,7 +225,10 @@ object AnalysisContext { def withAnalysisContext[A](function: SQLFunction)(f: => A): A = { val originContext = value.get() - val context = originContext.copy(collation = function.collation) + val context = originContext.copy( + resolutionPathEntries = function.functionStoredResolutionPath + .flatMap(CatalogManager.deserializePathEntries), + collation = function.collation) set(context) try f finally { set(originContext) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala index 7a5077a8a3e11..185f739050a4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala @@ -126,8 +126,8 @@ class RelationResolution( /** * Path entries for unqualified relation resolution. * - * Inside a view, [[AnalysisContext.resolutionPathEntries]] will be - * populated from the frozen path stored in view metadata (follow-up PR). + * Inside a view or SQL function, [[AnalysisContext.resolutionPathEntries]] uses the + * persisted frozen path from metadata when available. * When PATH is disabled, legacy resolution rules apply. */ private def relationResolutionEntries: Seq[Seq[String]] = { @@ -135,8 +135,6 @@ class RelationResolution( if (pinned.isDefined && conf.pathEnabled) { pinned.get } else { - // Keep expanding CurrentSchemaEntry using the live session catalog/namespace until the - // follow-up PR wires frozen resolutionPathEntries for view analysis. val expandCatalog = catalogManager.currentCatalog.name val expandNamespace = catalogManager.currentNamespace.toSeq val (pathCatalog, pathNamespace) = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 3f5afd9ce0de7..8a7090d822fc1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.connector.catalog import scala.collection.mutable +import scala.util.Try import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.SQLConfHelper @@ -349,4 +350,30 @@ private[sql] object CatalogManager { compact(JArray(entries.map(parts => JArray(parts.map(JString(_)).toList)).toList)) } + + /** + * Parse a stored frozen path string from view/function metadata. + * Returns None if the payload is malformed. + */ + def deserializePathEntries(storedPathStr: String): Option[Seq[Seq[String]]] = { + import org.json4s.JsonAST.{JArray, JString} + import org.json4s.jackson.JsonMethods.parse + + Try(parse(storedPathStr)).toOption match { + case Some(JArray(entries)) if entries.nonEmpty => + val converted = entries.foldLeft(Option(Seq.empty[Seq[String]])) { (acc, entry) => + acc.flatMap { collected => + entry match { + case JArray(parts) if parts.nonEmpty => + val strings = parts.collect { case JString(s) => s } + if (strings.size == parts.size) Some(collected :+ strings) + else None + case _ => None + } + } + } + converted.filter(_.nonEmpty) + case _ => None + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala index 805fff7fa60f2..c0d48960afd00 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession /** @@ -111,4 +112,35 @@ class SQLFunctionSuite extends SharedSparkSession { ) } } + + test("SPARK-56639: SQL function uses frozen SQL path") { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + withDatabase("path_func_db_a", "path_func_db_b") { + withTable("path_func_db_a.frozen_t", "path_func_db_b.frozen_t") { + withUserDefinedFunction("frozen_fn" -> false) { + sql("USE default") + sql("CREATE DATABASE path_func_db_a") + sql("CREATE DATABASE path_func_db_b") + sql("CREATE TABLE path_func_db_a.frozen_t USING parquet AS SELECT 10 AS id") + sql("CREATE TABLE path_func_db_b.frozen_t USING parquet AS SELECT 20 AS id") + try { + sql("SET PATH = spark_catalog.path_func_db_a, system.builtin") + sql( + """ + |CREATE FUNCTION frozen_fn() + |RETURNS INT + |RETURN (SELECT MAX(id) FROM frozen_t) + |""".stripMargin) + sql("SET PATH = spark_catalog.path_func_db_b, system.builtin") + + checkAnswer(sql("SELECT MAX(id) FROM frozen_t"), Row(20)) + checkAnswer(sql("SELECT default.frozen_fn()"), Row(10)) + } finally { + sql("SET PATH = DEFAULT_PATH") + } + } + } + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index d44737cd2ffd1..fbf0cc2f1dd8f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -1409,4 +1409,30 @@ abstract class SQLViewSuite extends QueryTest { } } } + + test("SPARK-56639: permanent view uses frozen SQL path") { + withSQLConf(PATH_ENABLED.key -> "true") { + withDatabase("path_view_db_a", "path_view_db_b") { + withTable("path_view_db_a.frozen_t", "path_view_db_b.frozen_t") { + withView("default.v_path_frozen") { + sql("USE default") + sql("CREATE DATABASE path_view_db_a") + sql("CREATE DATABASE path_view_db_b") + sql("CREATE TABLE path_view_db_a.frozen_t USING parquet AS SELECT 1 AS id") + sql("CREATE TABLE path_view_db_b.frozen_t USING parquet AS SELECT 2 AS id") + try { + sql("SET PATH = spark_catalog.path_view_db_a, system.builtin") + sql("CREATE VIEW default.v_path_frozen AS SELECT id FROM frozen_t") + sql("SET PATH = spark_catalog.path_view_db_b, system.builtin") + + checkAnswer(sql("SELECT id FROM frozen_t"), Row(2)) + checkAnswer(sql("SELECT id FROM default.v_path_frozen"), Row(1)) + } finally { + sql("SET PATH = DEFAULT_PATH") + } + } + } + } + } + } } From 85e85b0819f5090c1c4c45e2133280b0afe4b281 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Tue, 28 Apr 2026 10:43:03 -0700 Subject: [PATCH 2/4] [SPARK-56639][SQL] Add frozen-path coverage for SQL table UDFs Extend SQLFunctionSuite with a table-function regression that verifies persisted SQL PATH semantics remain stable after session PATH changes. --- .../sql/execution/SQLFunctionSuite.scala | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala index c0d48960afd00..a6ab92134ca7b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala @@ -143,4 +143,35 @@ class SQLFunctionSuite extends SharedSparkSession { } } } + + test("SPARK-56639: SQL table function uses frozen SQL path") { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + withDatabase("path_tvf_db_a", "path_tvf_db_b") { + withTable("path_tvf_db_a.frozen_t", "path_tvf_db_b.frozen_t") { + withUserDefinedFunction("frozen_tvf" -> false) { + sql("USE default") + sql("CREATE DATABASE path_tvf_db_a") + sql("CREATE DATABASE path_tvf_db_b") + sql("CREATE TABLE path_tvf_db_a.frozen_t USING parquet AS SELECT 100 AS id") + sql("CREATE TABLE path_tvf_db_b.frozen_t USING parquet AS SELECT 200 AS id") + try { + sql("SET PATH = spark_catalog.path_tvf_db_a, system.builtin") + sql( + """ + |CREATE FUNCTION frozen_tvf() + |RETURNS TABLE(id INT) + |RETURN SELECT MAX(id) AS id FROM frozen_t + |""".stripMargin) + sql("SET PATH = spark_catalog.path_tvf_db_b, system.builtin") + + checkAnswer(sql("SELECT MAX(id) FROM frozen_t"), Row(200)) + checkAnswer(sql("SELECT * FROM default.frozen_tvf()"), Row(100)) + } finally { + sql("SET PATH = DEFAULT_PATH") + } + } + } + } + } + } } From d2500212e26a48812659e333818904a692a18a0f Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Tue, 28 Apr 2026 11:30:10 -0700 Subject: [PATCH 3/4] [SPARK-56639][SQL] Add invoker-context tests for current_schema/current_path Add regressions for persisted views and SQL scalar/table functions to verify CURRENT_SCHEMA() and CURRENT_PATH() resolve from the invoker session context at query time. --- .../sql/execution/SQLFunctionSuite.scala | 51 +++++++++++++++++++ .../spark/sql/execution/SQLViewSuite.scala | 35 +++++++++++++ 2 files changed, 86 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala index a6ab92134ca7b..2a51d4e6bce61 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala @@ -174,4 +174,55 @@ class SQLFunctionSuite extends SharedSparkSession { } } } + + test("SPARK-56639: current_schema/current_path in SQL functions use invoker context") { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + withDatabase("path_ctx_fn_a", "path_ctx_fn_b") { + withUserDefinedFunction("path_ctx_fn_a.f_scalar_ctx" -> false, + "path_ctx_fn_a.f_table_ctx" -> false) { + sql("CREATE DATABASE path_ctx_fn_a") + sql("CREATE DATABASE path_ctx_fn_b") + try { + sql("USE path_ctx_fn_a") + sql( + """ + |CREATE FUNCTION path_ctx_fn_a.f_scalar_ctx() + |RETURNS STRING + |RETURN concat(current_schema(), '::', current_path()) + |""".stripMargin) + sql( + """ + |CREATE FUNCTION path_ctx_fn_a.f_table_ctx() + |RETURNS TABLE(cs STRING, cp STRING) + |RETURN SELECT current_schema() AS cs, current_path() AS cp + |""".stripMargin) + + sql("USE path_ctx_fn_b") + sql("SET PATH = DEFAULT_PATH") + + val scalar = sql("SELECT path_ctx_fn_a.f_scalar_ctx()").head().getString(0) + assert(scalar.startsWith("path_ctx_fn_b::"), + s"Expected scalar function to use invoker current_schema, got: $scalar") + assert(scalar.contains("path_ctx_fn_b"), + s"Expected scalar function to use invoker current_path, got: $scalar") + assert(!scalar.contains("path_ctx_fn_a"), + s"Did not expect creator schema in scalar function context, got: $scalar") + + val table = sql("SELECT cs, cp FROM path_ctx_fn_a.f_table_ctx()").head() + val tableSchema = table.getString(0) + val tablePath = table.getString(1) + assert(tableSchema == "path_ctx_fn_b", + s"Expected table function to use invoker current_schema, got: $tableSchema") + assert(tablePath.contains("path_ctx_fn_b"), + s"Expected table function to use invoker current_path, got: $tablePath") + assert(!tablePath.contains("path_ctx_fn_a"), + s"Did not expect creator schema in table function context, got: $tablePath") + } finally { + sql("SET PATH = DEFAULT_PATH") + sql("USE default") + } + } + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index fbf0cc2f1dd8f..0d922e90da544 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -1435,4 +1435,39 @@ abstract class SQLViewSuite extends QueryTest { } } } + + test("SPARK-56639: current_schema/current_path in persisted view use invoker context") { + withSQLConf(PATH_ENABLED.key -> "true") { + withDatabase("path_ctx_view_a", "path_ctx_view_b") { + withView("path_ctx_view_a.v_ctx") { + sql("CREATE DATABASE path_ctx_view_a") + sql("CREATE DATABASE path_ctx_view_b") + try { + sql("USE path_ctx_view_a") + sql( + """ + |CREATE VIEW path_ctx_view_a.v_ctx AS + |SELECT current_schema() AS cs, current_path() AS cp + |""".stripMargin) + + sql("USE path_ctx_view_b") + sql("SET PATH = DEFAULT_PATH") + val row = sql("SELECT cs, cp FROM path_ctx_view_a.v_ctx").head() + val currentSchema = row.getString(0) + val currentPath = row.getString(1) + + assert(currentSchema == "path_ctx_view_b", + s"Expected invoker current_schema, got: $currentSchema") + assert(currentPath.contains("path_ctx_view_b"), + s"Expected invoker current_path to include path_ctx_view_b, got: $currentPath") + assert(!currentPath.contains("path_ctx_view_a"), + s"Did not expect creator schema in current_path, got: $currentPath") + } finally { + sql("SET PATH = DEFAULT_PATH") + sql("USE default") + } + } + } + } + } } From 21d9baa7c5e97f0b5be3be01fca3ee86f4fe319d Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Tue, 28 Apr 2026 19:04:37 -0700 Subject: [PATCH 4/4] [SPARK-54810][SQL] Polish frozen-path comments and persistence coverage Clarify the Analyzer comment now that frozen PATH wiring is merged, and add regression coverage that persisted view/function path metadata materializes current_schema entries at create time. --- .../sql/catalyst/analysis/Analyzer.scala | 4 +- .../sql/AlwaysPersistedConfigsSuite.scala | 87 +++++++++++++++---- 2 files changed, 74 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index ad33fd42546fb..005945aaae975 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1007,8 +1007,8 @@ class Analyzer( // This is done by keeping the catalog and namespace in `AnalysisContext`, and analyzer will // look at `AnalysisContext.catalogAndNamespace` when resolving relations with single-part name. // If `AnalysisContext.catalogAndNamespace` is non-empty, analyzer will expand single-part names - // with it, instead of current catalog and namespace. Unqualified relation PATH will be - // snapshotted in `AnalysisContext.resolutionPathEntries` in a follow-up PR. + // with it, instead of current catalog and namespace. For views/functions with persisted frozen + // PATH, `AnalysisContext.resolutionPathEntries` drives unqualified relation lookup order. private def resolveViews( plan: LogicalPlan, options: CaseInsensitiveStringMap): LogicalPlan = plan match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/AlwaysPersistedConfigsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/AlwaysPersistedConfigsSuite.scala index c0f1d7ebaa05b..d871a8873e973 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/AlwaysPersistedConfigsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/AlwaysPersistedConfigsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import java.util.Locale + import org.scalactic.source.Position import org.scalatest.Tag @@ -31,6 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.{ } import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project, View} +import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType @@ -169,6 +172,62 @@ class AlwaysPersistedConfigsSuite extends SharedSparkSession { assert(sqlConf.settings.get("spark.sql.ansi.enabled") == "false") } + test("Current schema marker is materialized in persisted view path") { + withView(testViewName) { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + sql("CREATE DATABASE IF NOT EXISTS path_materialized_view") + try { + sql("USE path_materialized_view") + sql("SET PATH = current_schema, system.builtin") + sql(s"CREATE VIEW $testViewName AS SELECT 1") + val metadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(testViewName)) + val storedPath = metadata.viewStoredResolutionPath.getOrElse( + fail("Expected persisted view resolution path to be set")) + val parsed = CatalogManager.deserializePathEntries(storedPath).getOrElse( + fail(s"Expected a valid serialized path, got: $storedPath")) + assert(parsed.head == Seq("spark_catalog", "path_materialized_view")) + assert(!storedPath.toLowerCase(Locale.ROOT).contains("current_schema")) + } finally { + sql("SET PATH = DEFAULT_PATH") + sql(s"DROP VIEW IF EXISTS path_materialized_view.$testViewName") + sql(s"DROP VIEW IF EXISTS $testViewName") + sql("USE default") + sql("DROP DATABASE IF EXISTS path_materialized_view") + } + } + } + } + + test("Current schema marker is materialized in persisted function path") { + withUserDefinedFunction(testFunctionName -> false) { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + sql("CREATE DATABASE IF NOT EXISTS path_materialized_fn") + try { + sql("USE path_materialized_fn") + sql("SET PATH = current_schema, system.builtin") + sql( + s""" + |CREATE OR REPLACE FUNCTION $testFunctionName() + |RETURN SELECT 1 + |""".stripMargin) + val function = analyzedSqlFunction(testFunctionName) + val storedPath = function.functionStoredResolutionPath.getOrElse( + fail("Expected persisted function resolution path to be set")) + val parsed = CatalogManager.deserializePathEntries(storedPath).getOrElse( + fail(s"Expected a valid serialized path, got: $storedPath")) + assert(parsed.head == Seq("spark_catalog", "path_materialized_fn")) + assert(!storedPath.toLowerCase(Locale.ROOT).contains("current_schema")) + } finally { + sql("SET PATH = DEFAULT_PATH") + sql(s"DROP FUNCTION IF EXISTS path_materialized_fn.$testFunctionName") + sql(s"DROP FUNCTION IF EXISTS $testFunctionName") + sql("USE default") + sql("DROP DATABASE IF EXISTS path_materialized_fn") + } + } + } + } + private def testView(confName: String, expectedValue: String): Unit = { sql(s"CREATE VIEW $testViewName AS SELECT CAST('string' AS BIGINT) AS alias") @@ -185,21 +244,19 @@ class AlwaysPersistedConfigsSuite extends SharedSparkSession { |RETURN SELECT CAST('string' AS BIGINT) AS alias |""".stripMargin) - val df = sql(s"select $testFunctionName()") + assert(analyzedSqlFunction(testFunctionName).properties.get(confName).get == expectedValue) + } - assert( - df.queryExecution.analyzed - .asInstanceOf[Project] - .projectList - .head - .asInstanceOf[Alias] - .child - .asInstanceOf[SQLScalarFunction] - .function - .asInstanceOf[SQLFunction] - .properties - .get(confName) - .get == expectedValue - ) + private def analyzedSqlFunction(functionName: String): SQLFunction = { + val df = sql(s"select $functionName()") + df.queryExecution.analyzed + .asInstanceOf[Project] + .projectList + .head + .asInstanceOf[Alias] + .child + .asInstanceOf[SQLScalarFunction] + .function + .asInstanceOf[SQLFunction] } }