Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -337,27 +337,37 @@ class Analyzer(
AnalysisContext.reset()
try {
AnalysisHelper.markInAnalyzer {
sessionConf match {
case Some(c) => SQLConf.withExistingConf(c) { runAnalysis() }
case None => runAnalysis()
}
runWithSessionConf(runAnalysis())
}
} finally {
AnalysisContext.reset()
}
} else {
AnalysisContext.withNewAnalysisContext {
AnalysisHelper.markInAnalyzer {
sessionConf match {
case Some(c) => SQLConf.withExistingConf(c) { runAnalysis() }
case None => runAnalysis()
}
runWithSessionConf(runAnalysis())
}
}
}
}
}

/**
* Runs `thunk` under the analyzer's [[sessionConf]] for analyzer isolation, but yields to any
* outer [[SQLConf.withExistingConf]] scope (e.g. a SQL UDF / view body that pinned the
* creation-time configs). Falls through unchanged when [[sessionConf]] is unset, or when the
* outer scope already installed a different conf -- otherwise the outer scope's conf would be
* silently clobbered.
*/
private def runWithSessionConf[T](thunk: => T): T = sessionConf match {
case None => thunk
case Some(c) =>
SQLConf.getExistingConfIfSet match {
case Some(outer) if outer ne c => thunk
case _ => SQLConf.withExistingConf(c) { thunk }
}
}

/**
* Returns a copy of this analyzer that uses the given [[CatalogManager]] for all catalog
* lookups. All other configuration (extended rules, checks, etc.) is preserved. Used by
Expand Down Expand Up @@ -391,13 +401,8 @@ class Analyzer(
}
}

private def executeSameContext(plan: LogicalPlan): LogicalPlan = sessionConf match {
// Respect explicit nested SQLConf overrides (e.g. persisted SQL UDF/view configs).
// Otherwise, run analysis with the captured session conf for analyzer isolation.
case Some(c) if SQLConf.get ne c => super.execute(plan)
case Some(c) => SQLConf.withExistingConf(c) { super.execute(plan) }
case None => super.execute(plan)
}
private def executeSameContext(plan: LogicalPlan): LogicalPlan =
runWithSessionConf(super.execute(plan))

def resolver: Resolver = conf.resolver

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,6 @@ class FunctionResolution(

private val trimWarningEnabled = new AtomicBoolean(true)

/** Returns the current catalog path, preferring the view's context if resolving a view. */
private def currentCatalogPath: Seq[String] = {
val ctx = AnalysisContext.get.catalogAndNamespace
if (ctx.nonEmpty) ctx
else (Seq(catalogManager.currentCatalog.name) ++ catalogManager.currentNamespace).toSeq
}

/** True if nameParts is 3-part and the first part is the system catalog name. */
private def isSystemCatalogQualified(nameParts: Seq[String]): Boolean =
nameParts.length == 3 &&
Expand All @@ -101,18 +94,10 @@ class FunctionResolution(
* directly, matching [[RelationResolution.relationResolutionEntries]] so routine order stays
* aligned with relation order.
*/
private[analysis] def sqlResolutionPathEntriesForAnalysis: Seq[Seq[String]] = {
AnalysisContext.get.resolutionPathEntries match {
case Some(entries) if conf.pathEnabled => entries
case _ =>
val pathDefault = currentCatalogPath
catalogManager.sqlResolutionPathEntries(
pathDefault.head,
pathDefault.tail.toSeq,
catalogManager.currentCatalog.name,
catalogManager.currentNamespace.toSeq)
}
}
private[analysis] def sqlResolutionPathEntriesForAnalysis: Seq[Seq[String]] =
catalogManager.resolutionPathEntriesForAnalysis(
AnalysisContext.get.resolutionPathEntries,
AnalysisContext.get.catalogAndNamespace)

private def resolutionCandidates(nameParts: Seq[String]): Seq[Seq[String]] = {
if (nameParts.size == 1) {
Expand Down Expand Up @@ -370,7 +355,20 @@ class FunctionResolution(
if (nameParts.length == 1) {
// Must match [[resolutionCandidates]] / [[resolveFunction]]: single-part names use PATH +
// session order, not only the current namespace (LookupCatalog single-part rule).
for (candidate <- resolutionCandidates(nameParts)) {
// `system.session.<name>` and `system.builtin.<name>` candidates were already resolved by
// [[lookupBuiltinOrTempFunction]] / [[lookupBuiltinOrTempTableFunction]] above (they
// route through `identifierFromSystemNameParts`, which only accepts those two
// namespaces); skip them here to avoid redundant catalog calls. Other `system.<x>`
// namespaces -- if any are ever added -- still go through persistent lookup.
val persistentCandidates = resolutionCandidates(nameParts).filterNot { c =>
c.length >= 2 &&
c.head.equalsIgnoreCase(CatalogManager.SYSTEM_CATALOG_NAME) && {
val ns = c(1)
ns.equalsIgnoreCase(CatalogManager.SESSION_NAMESPACE) ||
ns.equalsIgnoreCase(CatalogManager.BUILTIN_NAMESPACE)
}
}
for (candidate <- persistentCandidates) {
try {
candidate match {
case CatalogAndIdentifier(catalog, ident) =>
Expand All @@ -380,7 +378,12 @@ class FunctionResolution(
case _ =>
}
} catch {
case NonFatal(_) =>
// Only treat explicit "not found" / "forbidden" signals as a miss. Any other failure
// (e.g. permission denied, transient catalog error) propagates.
case _: NoSuchFunctionException
| _: NoSuchNamespaceException
| _: CatalogNotFoundException =>
case e: AnalysisException if e.getCondition == "FORBIDDEN_OPERATION" =>
}
}
return FunctionType.NotFound
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,25 +131,9 @@ class RelationResolution(
* When PATH is disabled, legacy resolution rules apply.
*/
private def relationResolutionEntries: Seq[Seq[String]] = {
val pinned = AnalysisContext.get.resolutionPathEntries
if (pinned.isDefined && conf.pathEnabled) {
pinned.get
} else {
val expandCatalog = catalogManager.currentCatalog.name
val expandNamespace = catalogManager.currentNamespace.toSeq
val (pathCatalog, pathNamespace) =
if (isResolvingView) {
val p = AnalysisContext.get.catalogAndNamespace
(p.head, p.tail.toSeq)
} else {
(expandCatalog, expandNamespace)
}
catalogManager.sqlResolutionPathEntries(
pathCatalog,
pathNamespace,
expandCatalog,
expandNamespace)
}
catalogManager.resolutionPathEntriesForAnalysis(
AnalysisContext.get.resolutionPathEntries,
AnalysisContext.get.catalogAndNamespace)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case c @ CreateVariable(identifiers, _, _) =>
// We resolve only UnresolvedIdentifiers, and pass on the other nodes
val resolved = identifiers.map {
case UnresolvedIdentifier(nameParts, _) =>
case u @ UnresolvedIdentifier(nameParts, _) =>
if (withinLocalVariableScope) {
if (c.replace) {
throw new AnalysisException(
Expand All @@ -67,26 +67,22 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
val resolvedIdentifier
= catalogManager.tempVariableManager.qualify(nameParts.last)

assertValidSessionVariableNameParts(nameParts, resolvedIdentifier)
assertValidSessionVariableNameParts(nameParts, resolvedIdentifier, u.origin)
resolvedIdentifier
}
case plan => plan
}
c.copy(names = resolved)

case d @ DropVariable(UnresolvedIdentifier(nameParts, _), _) =>
case d @ DropVariable(u @ UnresolvedIdentifier(nameParts, _), _) =>
if (withinLocalVariableScope) {
throw new AnalysisException(
"UNSUPPORTED_FEATURE.SQL_SCRIPTING_DROP_TEMPORARY_VARIABLE", Map.empty)
}
if (nameParts.length == 1 &&
!catalogManager.sessionScopeUnqualifiedAllowed(
catalogManager.currentCatalog.name(),
catalogManager.currentNamespace.toSeq)) {
throw QueryCompilationErrors.unresolvedVariableError(nameParts, Seq("SYSTEM", "SESSION"))
}
// DDL on session variables targets `system.session` directly; the SQL path only applies
// to DML (see [[VariableResolution.allowUnqualifiedSessionTempVariableLookup]]).
val resolved = catalogManager.tempVariableManager.qualify(nameParts.last)
assertValidSessionVariableNameParts(nameParts, resolved)
assertValidSessionVariableNameParts(nameParts, resolved, u.origin)
d.copy(name = resolved)

case CreateFunction(UnresolvedIdentifier(nameParts, _), _, _, _, _)
Expand Down Expand Up @@ -221,13 +217,15 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

private def assertValidSessionVariableNameParts(
nameParts: Seq[String],
resolvedIdentifier: ResolvedIdentifier): Unit = {
resolvedIdentifier: ResolvedIdentifier,
origin: Origin): Unit = {
if (!validSessionVariableName(nameParts)) {
throw QueryCompilationErrors.unresolvedVariableError(
nameParts,
Seq(
Seq(Seq(
resolvedIdentifier.catalog.name(),
resolvedIdentifier.identifier.namespace().head)
resolvedIdentifier.identifier.namespace().head)),
origin
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class ResolveFetchCursor(val catalogManager: CatalogManager) extends Rule[Logica
nameParts = u.nameParts
) match {
case Some(variable) => variable.copy(canFold = false)
case _ => throw unresolvedVariableError(u.nameParts, Seq("SYSTEM", "SESSION"))
case _ => throw unresolvedVariableError(
u.nameParts, variableResolution.searchPathEntriesForError, u.origin)
Comment thread
srielau marked this conversation as resolved.
}

case other => throw SparkException.internalError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ class ResolveSetVariable(val catalogManager: CatalogManager) extends Rule[Logica
nameParts = u.nameParts
) match {
case Some(variable) => variable.copy(canFold = false)
case _ => throw unresolvedVariableError(u.nameParts, Seq("SYSTEM", "SESSION"))
case _ =>
throw unresolvedVariableError(
u.nameParts,
variableResolution.searchPathEntriesForError,
u.origin)
}

case other => throw SparkException.internalError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,25 @@ class VariableResolution(
* (PATH enabled and explicitly set).
*/
private def allowUnqualifiedSessionTempVariableLookup(nameParts: Seq[String]): Boolean = {
if (nameParts.length != 1) return true
catalogManager.sessionScopeUnqualifiedAllowed(
catalogManager.currentCatalog.name(),
catalogManager.currentNamespace.toSeq)
nameParts.length != 1 || catalogManager.isSystemSessionOnPath
}

/**
* Search-path entries to report in `UNRESOLVED_VARIABLE` for DML lookups (`SET VAR`,
* `FETCH ... INTO`). The full SQL path is reported regardless of how the name was
* qualified, matching the convention used by `TABLE_OR_VIEW_NOT_FOUND` and
* `UNRESOLVED_ROUTINE`. Keeping the rendering qualification-independent also avoids
* re-shaping the error if Spark ever grows struct-field assignment, where 2-part forms
* become genuinely ambiguous.
*
* DDL paths (`DECLARE` / `DROP` name validation in
* [[org.apache.spark.sql.catalyst.analysis.ResolveCatalogs]]) do not consult the SQL path
* and report `[system.session]` directly at their throw site.
*/
def searchPathEntriesForError: Seq[Seq[String]] = {
Comment thread
srielau marked this conversation as resolved.
catalogManager.resolutionPathEntriesForAnalysis(
AnalysisContext.get.resolutionPathEntries,
AnalysisContext.get.catalogAndNamespace)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.mutable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{FakeSystemCatalog, ResolvedIdentifier}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier}
import org.apache.spark.sql.connector.catalog.CatalogManager.{SESSION_NAMESPACE, SYSTEM_CATALOG_NAME}
import org.apache.spark.sql.errors.DataTypeErrorsBase
Expand All @@ -49,8 +50,11 @@ trait VariableManager {
*
* @param nameParts Name parts of the variable.
* @param varDef The new VariableDefinition of the variable.
* @param origin Origin of the SET reference, used in
* [[org.apache.spark.sql.errors.QueryCompilationErrors.unresolvedVariableError]]
* if the variable is unexpectedly absent at execution time.
*/
def set(nameParts: Seq[String], varDef: VariableDefinition): Unit
def set(nameParts: Seq[String], varDef: VariableDefinition, origin: Origin): Unit

/**
* Get an existing variable.
Expand Down Expand Up @@ -130,11 +134,14 @@ class TempVariableManager extends VariableManager with DataTypeErrorsBase {
variables.put(name, varDef)
}

override def set(nameParts: Seq[String], varDef: VariableDefinition): Unit = synchronized {
override def set(
nameParts: Seq[String],
varDef: VariableDefinition,
origin: Origin): Unit = synchronized {
val name = nameParts.last
// Sanity check as this is already checked in ResolveSetVariable.
if (!variables.contains(name)) {
throw unresolvedVariableError(nameParts, Seq("SYSTEM", "SESSION"))
throw unresolvedVariableError(nameParts, Seq(Seq("SYSTEM", "SESSION")), origin)
}
variables.put(name, varDef)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7023,11 +7023,13 @@ class AstBuilder extends DataTypeAstBuilder
dataTypeOpt.map { dt => default.copy(child = Cast(default.child, dt)) }.getOrElse(default)
}
CreateVariable(
ctx.identifierReferences.asScala.map (
identifierReference => {
withIdentClause(identifierReference, UnresolvedIdentifier(_))
}
).toSeq,
ctx.identifierReferences.asScala.map { identifierReference =>
// Give each `UnresolvedIdentifier` its own origin pointing at the variable name
// fragment so analyzer-time errors (e.g. UNRESOLVED_VARIABLE) can highlight just
// that identifier rather than the whole `DECLARE ...` statement.
withIdentClause(identifierReference, parts =>
withOrigin(identifierReference) { UnresolvedIdentifier(parts) })
}.toSeq,
defaultExpression,
ctx.REPLACE() != null
)
Expand All @@ -7043,7 +7045,8 @@ class AstBuilder extends DataTypeAstBuilder
*/
override def visitDropVariable(ctx: DropVariableContext): LogicalPlan = withOrigin(ctx) {
DropVariable(
withIdentClause(ctx.identifierReference(), UnresolvedIdentifier(_)),
withIdentClause(ctx.identifierReference(), parts =>
withOrigin(ctx.identifierReference()) { UnresolvedIdentifier(parts) }),
ctx.EXISTS() != null
)
}
Expand Down Expand Up @@ -7168,7 +7171,7 @@ class AstBuilder extends DataTypeAstBuilder
// The SET variable source is a query
val variables = multipartIdentifierList.multipartIdentifier.asScala.map { variableIdent =>
val varName = visitMultipartIdentifier(variableIdent)
UnresolvedAttribute(varName)
withOrigin(variableIdent) { UnresolvedAttribute(varName) }
}.toSeq
SetVariable(variables, visitQuery(query))
} else {
Expand All @@ -7180,7 +7183,7 @@ class AstBuilder extends DataTypeAstBuilder
case n: NamedExpression => n
case e => Alias(e, varIdent.last)()
}
(UnresolvedAttribute(varIdent), varNamedExpr)
(withOrigin(assign.key) { UnresolvedAttribute(varIdent) }, varNamedExpr)
}.toSeq.unzip
SetVariable(variables, Project(values, OneRowRelation()))
}
Expand Down
Loading