From 59a06373dbd818b51f8fe9749ac3c0a5cc8660a6 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 18 May 2026 05:18:44 +0000 Subject: [PATCH 01/15] [SPARK-46625][SQL] Place IDENTIFIER placeholder in command name slot Push `PlanWithUnresolvedIdentifier` into the identifier slot of write commands at parse time (`InsertIntoStatement.table`, `CreateTableAsSelect.name`, `ReplaceTableAsSelect.name`) instead of wrapping the entire command. `CTESubstitution` then sees the real `CTEInChildren` and places `WithCTE` on the command's children by construction, fixing the invalid `WithCTE(InsertIntoStatement, ...)` / `WithCTE(CreateTableAsSelect, ...)` shape produced for queries like `WITH t AS (...) INSERT INTO IDENTIFIER('t') SELECT * FROM t`. `InsertIntoStatement.table` is a non-child `LogicalPlan` field (`child = query`), which would prevent tree-pattern propagation and `BindParameters.bind` from reaching placeholders inside it (breaking legacy parameter substitution mode `INSERT ... IDENTIFIER(:p)`). Add a small `innerPlans` / `withNewInnerPlans` hook on `LogicalPlan`, override it on `InsertIntoStatement`, and wire it into `LogicalPlan.getDefaultTreePatternBits`, `BindParameters.bind`, and `ResolveIdentifierClause`. `CreateTableAsSelect.name` / `ReplaceTableAsSelect.name` are already children via `V2CreateTableAsSelectPlan.childrenToAnalyze`, so no hook is needed for those. A narrow post-hoc `WithCTE(c: CTEInChildren, _)` collapse is kept in `ResolveIdentifierClause` for the two call sites where the identifier slot's type prevents pushing the placeholder in directly: `OverwriteByExpression.table: NamedRelation` (INSERT REPLACE WHERE) and `CacheTableAsSelect.tempViewName: String`. Supersedes apache#55706. Credit to stevomitric for surfacing the legacy-mode `BindParameters` issue in the original PR thread. 
Co-authored-by: Isaac --- .../analysis/ResolveIdentifierClause.scala | 36 ++- .../sql/catalyst/analysis/parameters.scala | 15 +- .../sql/catalyst/parser/AstBuilder.scala | 247 ++++++++++-------- .../catalyst/plans/logical/LogicalPlan.scala | 35 +++ .../catalyst/plans/logical/statements.scala | 10 + .../apache/spark/sql/ParametersSuite.scala | 71 ++++- 6 files changed, 291 insertions(+), 123 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala index 7150c81ad64ec..8d2e2b9d18998 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, VariableReference} -import org.apache.spark.sql.catalyst.plans.logical.{CreateView, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{CreateView, CTEInChildren, LogicalPlan, WithCTE} import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.errors.QueryCompilationErrors @@ -59,8 +59,8 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] private def apply0( plan: LogicalPlan, - referredTempVars: Option[mutable.ArrayBuffer[Seq[String]]] = None): LogicalPlan = - plan.resolveOperatorsUpWithPruning(_.containsAnyPattern( + referredTempVars: Option[mutable.ArrayBuffer[Seq[String]]] = None): LogicalPlan = { + val resolved = plan.resolveOperatorsUpWithPruning(_.containsAnyPattern( UNRESOLVED_IDENTIFIER, PLAN_WITH_UNRESOLVED_IDENTIFIER)) { case p: PlanWithUnresolvedIdentifier if p.identifierExpr.resolved && 
p.childrenResolved => @@ -70,6 +70,26 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] executor.execute(p.planBuilder.apply( IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) + case cmd if cmd.innerPlans.exists(_.exists { + case p: PlanWithUnresolvedIdentifier => p.identifierExpr.resolved && p.childrenResolved + case _ => false + }) => + // Materialize placeholders that live in non-child LogicalPlan slots (e.g. + // `InsertIntoStatement.table`). Without this case, the standard `resolveOperatorsUp` + // never visits these slots because they're not in `children`. + val newInnerPlans = cmd.innerPlans.map { inner => + inner.resolveOperatorsUpWithPruning( + _.containsAnyPattern(UNRESOLVED_IDENTIFIER, PLAN_WITH_UNRESOLVED_IDENTIFIER)) { + case p: PlanWithUnresolvedIdentifier + if p.identifierExpr.resolved && p.childrenResolved => + if (referredTempVars.isDefined) { + referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p) + } + executor.execute(p.planBuilder.apply( + IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) + } + } + cmd.withNewInnerPlans(newInnerPlans) case other => other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) { case e: ExpressionWithUnresolvedIdentifier if e.identifierExpr.resolved => @@ -82,6 +102,16 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] IdentifierResolution.evalIdentifierExpr(e.identifierExpr), e.otherExprs) } } + // For the call sites we cannot refactor at the parser level (e.g. `OverwriteByExpression` + // whose `table` field is typed `NamedRelation`, or `CacheTableAsSelect` whose name is a + // plain String), `PlanWithUnresolvedIdentifier` still wraps the entire command. When that + // wrapper is itself inside `WithCTE`, push the CTE defs into the materialized command's + // children - restoring the invariant `CTESubstitution.withCTEDefs` enforces at substitution + // time. 
SPARK-46625. + resolved.resolveOperatorsUpWithPruning(_.containsPattern(CTE)) { + case WithCTE(c: CTEInChildren, cteDefs) => c.withCTEDefs(cteDefs) + } + } private def collectTemporaryVariablesInLogicalPlan(child: LogicalPlan): Seq[Seq[String]] = { def collectTempVars(child: LogicalPlan): Seq[Seq[String]] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala index bf9acb775ce10..d3f3485b70b1a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala @@ -179,9 +179,18 @@ object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase { p0.resolveOperatorsDownWithPruning(_.containsPattern(PARAMETER) && !stop) { case p1 => stop = p1.isInstanceOf[ParameterizedQuery] - p1.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) (f orElse { - case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f)) - }) + val withBoundInnerPlans = if (p1.innerPlans.nonEmpty && + p1.innerPlans.exists(_.containsPattern(PARAMETER))) { + // Recurse into non-child LogicalPlan slots (e.g. `InsertIntoStatement.table`). + // Without this, parameters inside identifier placeholders would never be bound. 
+ p1.withNewInnerPlans(p1.innerPlans.map(bind(_)(f))) + } else { + p1 + } + withBoundInnerPlans.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) ( + f orElse { + case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f)) + }) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index a79f64cf53d94..8bb6a1a10c2b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -933,32 +933,33 @@ class AstBuilder extends DataTypeAstBuilder query: LogicalPlan, queryAliasCtx: TableAliasContext): LogicalPlan = withOrigin(ctx) { ctx match { - // We cannot push withIdentClause() into the write command because: - // 1. `PlanWithUnresolvedIdentifier` is not a NamedRelation - // 2. Write commands do not hold the table logical plan as a child, and we need to add - // additional resolution code to resolve identifiers inside the write commands. + // For `InsertIntoStatement`-producing branches, build the `table` slot directly via + // `buildWriteTableSlot` so that any `PlanWithUnresolvedIdentifier` lives *inside* the + // command. This preserves the `CTEInChildren` shape and lets `CTESubstitution` place + // `WithCTE` on the command's children correctly (SPARK-46625). + // `OverwriteByExpression.table` is typed `NamedRelation`, so the REPLACE WHERE branch + // still wraps the command with `PlanWithUnresolvedIdentifier`; that case is handled + // by the post-hoc `WithCTE(c: CTEInChildren, _)` collapse in `ResolveIdentifierClause`. 
case table: InsertIntoTableContext => val insertParams = visitInsertIntoTable(table) - withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => { - createInsertIntoStatement( - insertParams = insertParams, - ident = ident, - query = otherPlans.head, - overwrite = false, - writePrivileges = Set(TableWritePrivilege.INSERT), - withSchemaEvolution = table.EVOLUTION() != null) - }) + val privileges = Set(TableWritePrivilege.INSERT) + createInsertIntoStatement( + insertParams = insertParams, + tableSlot = buildWriteTableSlot( + insertParams.relationCtx, insertParams.options, privileges), + query = query, + overwrite = false, + withSchemaEvolution = table.EVOLUTION() != null) case table: InsertOverwriteTableContext => val insertParams = visitInsertOverwriteTable(table) - withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => { - createInsertIntoStatement( - insertParams = insertParams, - ident = ident, - query = otherPlans.head, - overwrite = true, - writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), - withSchemaEvolution = table.EVOLUTION() != null) - }) + val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE) + createInsertIntoStatement( + insertParams = insertParams, + tableSlot = buildWriteTableSlot( + insertParams.relationCtx, insertParams.options, privileges), + query = query, + overwrite = true, + withSchemaEvolution = table.EVOLUTION() != null) case ctx: InsertIntoReplaceBooleanCondContext => // Although REPLACE WHERE and REPLACE ON share a unified grammar rule, they have // different SQL semantics: @@ -969,6 +970,9 @@ class AstBuilder extends DataTypeAstBuilder val isInsertReplaceWhere = ctx.WHERE() != null if (isInsertReplaceWhere) { val options = Option(ctx.optionsClause()) + // OverwriteByExpression.table is `NamedRelation`, so we cannot put a + // `PlanWithUnresolvedIdentifier` directly in the slot - wrap the whole command and + // rely on the post-hoc collapse in 
`ResolveIdentifierClause`. withIdentClause(ctx.identifierReference, Seq(query), (ident, otherPlans) => { val table = createUnresolvedRelation( ctx = ctx.identifierReference, @@ -994,37 +998,34 @@ class AstBuilder extends DataTypeAstBuilder }) } else { val insertParams = visitInsertIntoReplaceOn(ctx) - withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => { - val query = { - val queryAliasOpt = - getTableAliasWithoutColumnAlias(queryAliasCtx, "INSERT REPLACE ON") - - queryAliasOpt.map { queryAlias => - withOrigin(queryAliasCtx) { - SubqueryAlias(queryAlias, child = otherPlans.head) - } - }.getOrElse(otherPlans.head) - } - createInsertIntoStatement( - insertParams = insertParams, - ident = ident, - query = query, - overwrite = true, - writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), - withSchemaEvolution = ctx.EVOLUTION() != null) - }) - } - case ctx: InsertIntoReplaceUsingContext => - val insertParams = visitInsertIntoReplaceUsing(ctx) - withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => { + val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE) + val finalQuery = { + val queryAliasOpt = + getTableAliasWithoutColumnAlias(queryAliasCtx, "INSERT REPLACE ON") + queryAliasOpt.map { queryAlias => + withOrigin(queryAliasCtx) { + SubqueryAlias(queryAlias, child = query) + } + }.getOrElse(query) + } createInsertIntoStatement( insertParams = insertParams, - ident = ident, - query = otherPlans.head, + tableSlot = buildWriteTableSlot( + insertParams.relationCtx, insertParams.options, privileges), + query = finalQuery, overwrite = true, - writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), withSchemaEvolution = ctx.EVOLUTION() != null) - }) + } + case ctx: InsertIntoReplaceUsingContext => + val insertParams = visitInsertIntoReplaceUsing(ctx) + val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE) + createInsertIntoStatement( + 
insertParams = insertParams, + tableSlot = buildWriteTableSlot( + insertParams.relationCtx, insertParams.options, privileges), + query = query, + overwrite = true, + withSchemaEvolution = ctx.EVOLUTION() != null) case dir: InsertOverwriteDirContext => val (isLocal, storage, provider) = visitInsertOverwriteDir(dir) InsertIntoDir(isLocal, storage, provider, query, overwrite = true) @@ -1153,18 +1154,12 @@ class AstBuilder extends DataTypeAstBuilder */ private def createInsertIntoStatement( insertParams: InsertTableParams, - ident: Seq[String], + tableSlot: LogicalPlan, query: LogicalPlan, overwrite: Boolean, - writePrivileges: Set[TableWritePrivilege], withSchemaEvolution: Boolean): InsertIntoStatement = { InsertIntoStatement( - table = createUnresolvedRelation( - ctx = insertParams.relationCtx, - ident = ident, - optionsClause = insertParams.options, - writePrivileges = writePrivileges, - isStreaming = false), + table = tableSlot, partitionSpec = insertParams.partitionSpec, userSpecifiedCols = insertParams.userSpecifiedCols, query = query, @@ -1175,6 +1170,24 @@ class AstBuilder extends DataTypeAstBuilder withSchemaEvolution = withSchemaEvolution) } + /** + * Build the `table` slot of a write command. If the identifier reference is a constant string, + * returns an [[UnresolvedRelation]] directly; otherwise returns a + * [[PlanWithUnresolvedIdentifier]] that materializes into an [[UnresolvedRelation]] once the + * identifier expression is resolved. + * + * Placing the placeholder in the identifier slot (rather than wrapping the entire write command) + * preserves the `CTEInChildren` shape at parse time, so `CTESubstitution` places `WithCTE` on the + * command's children correctly. See SPARK-46625. 
+ */ + private def buildWriteTableSlot( + ctx: IdentifierReferenceContext, + optionsClause: Option[OptionsClauseContext], + writePrivileges: Set[TableWritePrivilege]): LogicalPlan = { + withIdentClause(ctx, parts => + createUnresolvedRelation(ctx, parts, optionsClause, writePrivileges, isStreaming = false)) + } + /** * Write to a directory, returning a [[InsertIntoDir]] logical plan. */ @@ -5687,42 +5700,45 @@ class AstBuilder extends DataTypeAstBuilder bucketSpec.map(_.asTransform) ++ clusterBySpec.map(_.asTransform) - val asSelectPlan = Option(ctx.query).map(plan).toSeq - withIdentClause(identifierContext, asSelectPlan, (identifiers, otherPlans) => { - val namedConstraints = - constraints.map(c => c.withTableName(identifiers.last)) - val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, - collation, serdeInfo, external, namedConstraints) - val identifier = withOrigin(identifierContext) { - UnresolvedIdentifier(identifiers) - } - otherPlans.headOption match { - case Some(_) if columns.nonEmpty => + Option(ctx.query).map(plan) match { + case Some(query) => + // CTAS path: push the identifier placeholder into the `name` slot so that + // `CTESubstitution` sees the `CreateTableAsSelect` (a `CTEInChildren`) directly + // and places `WithCTE` on its children (SPARK-46625). CTAS disallows constraints / + // user-specified columns / non-reference partition columns, so we don't need the + // identifier parts at parse time. 
+ if (columns.nonEmpty) { operationNotAllowed( - "Schema may not be specified in a Create Table As Select (CTAS) statement", - ctx) - - case Some(_) if partCols.nonEmpty => - // non-reference partition columns are not allowed because schema can't be specified + "Schema may not be specified in a Create Table As Select (CTAS) statement", ctx) + } + if (partCols.nonEmpty) { operationNotAllowed( - "Partition column types may not be specified in Create Table As Select (CTAS)", - ctx) - - case Some(_) if constraints.nonEmpty => + "Partition column types may not be specified in Create Table As Select (CTAS)", ctx) + } + if (constraints.nonEmpty) { operationNotAllowed( - "Constraints may not be specified in a Create Table As Select (CTAS) statement", - ctx) - - case Some(query) => - CreateTableAsSelect(identifier, partitioning, query, tableSpec, Map.empty, ifNotExists) - - case _ => + "Constraints may not be specified in a Create Table As Select (CTAS) statement", ctx) + } + val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, + collation, serdeInfo, external, constraints = Nil) + val nameSlot = withIdentClause(identifierContext, identifiers => + withOrigin(identifierContext) { UnresolvedIdentifier(identifiers) }) + CreateTableAsSelect(nameSlot, partitioning, query, tableSpec, Map.empty, ifNotExists) + case None => + withIdentClause(identifierContext, identifiers => { + val namedConstraints = + constraints.map(c => c.withTableName(identifiers.last)) + val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, + collation, serdeInfo, external, namedConstraints) + val identifier = withOrigin(identifierContext) { + UnresolvedIdentifier(identifiers) + } // Note: table schema includes both the table columns list and the partition columns // with data type. 
val allColumns = columns ++ partCols CreateTable(identifier, allColumns, partitioning, tableSpec, ignoreIfExists = ifNotExists) - } - }) + }) + } } /** @@ -5771,43 +5787,42 @@ class AstBuilder extends DataTypeAstBuilder clusterBySpec.map(_.asTransform) val identifierContext = ctx.replaceTableHeader().identifierReference() - val asSelectPlan = Option(ctx.query).map(plan).toSeq - withIdentClause(identifierContext, asSelectPlan, (identifiers, otherPlans) => { - val namedConstraints = - constraints.map(c => c.withTableName(identifiers.last)) - val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, - collation, serdeInfo, external = false, namedConstraints) - val identifier = withOrigin(identifierContext) { - UnresolvedIdentifier(identifiers) - } - otherPlans.headOption match { - case Some(_) if columns.nonEmpty => + Option(ctx.query).map(plan) match { + case Some(query) => + // RTAS path: push the identifier placeholder into the `name` slot (see CTAS above). + if (columns.nonEmpty) { operationNotAllowed( - "Schema may not be specified in a Replace Table As Select (RTAS) statement", - ctx) - - case Some(_) if partCols.nonEmpty => - // non-reference partition columns are not allowed because schema can't be specified + "Schema may not be specified in a Replace Table As Select (RTAS) statement", ctx) + } + if (partCols.nonEmpty) { operationNotAllowed( - "Partition column types may not be specified in Replace Table As Select (RTAS)", - ctx) - - case Some(_) if constraints.nonEmpty => + "Partition column types may not be specified in Replace Table As Select (RTAS)", ctx) + } + if (constraints.nonEmpty) { operationNotAllowed( - "Constraints may not be specified in a Replace Table As Select (RTAS) statement", - ctx) - - case Some(query) => - ReplaceTableAsSelect(identifier, partitioning, query, tableSpec, - writeOptions = Map.empty, orCreate = orCreate) - - case _ => + "Constraints may not be specified in a Replace Table As Select (RTAS) 
statement", ctx) + } + val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, + collation, serdeInfo, external = false, constraints = Nil) + val nameSlot = withIdentClause(identifierContext, identifiers => + withOrigin(identifierContext) { UnresolvedIdentifier(identifiers) }) + ReplaceTableAsSelect(nameSlot, partitioning, query, tableSpec, + writeOptions = Map.empty, orCreate = orCreate) + case None => + withIdentClause(identifierContext, identifiers => { + val namedConstraints = + constraints.map(c => c.withTableName(identifiers.last)) + val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, + collation, serdeInfo, external = false, namedConstraints) + val identifier = withOrigin(identifierContext) { + UnresolvedIdentifier(identifiers) + } // Note: table schema includes both the table columns list and the partition columns // with data type. val allColumns = columns ++ partCols ReplaceTable(identifier, allColumns, partitioning, tableSpec, orCreate = orCreate) - } - }) + }) + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index c236f7cf08e82..45c5907a5dd0a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.MetadataColumnHelper import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.util.collection.BitSet abstract class LogicalPlan @@ -80,6 +81,40 @@ abstract class LogicalPlan def isStreaming: Boolean = _isStreaming private[this] lazy val _isStreaming = children.exists(_.isStreaming) + 
/** + * Logical sub-plans that live in non-child slots of this node (e.g. + * `InsertIntoStatement.table`). These are not traversed by the standard + * `children`-based machinery, but rules that need to reach unresolved + * placeholders inside them (e.g. tree-pattern propagation, `BindParameters`, + * `ResolveIdentifierClause`) can opt in by reading `innerPlans` and + * rebuilding via `withNewInnerPlans`. + * + * Most plans have no such slots; the default is `Nil`. + */ + def innerPlans: Seq[LogicalPlan] = Nil + + /** + * Rebuilds this node with new versions of the plans returned by `innerPlans`. + * `newInnerPlans` must have the same length and ordering as `innerPlans`. + * Default no-op for plans that do not override `innerPlans`. + */ + def withNewInnerPlans(newInnerPlans: Seq[LogicalPlan]): LogicalPlan = { + assert(newInnerPlans.isEmpty, + s"$nodeName does not declare innerPlans; cannot replace with $newInnerPlans") + this + } + + override protected def getDefaultTreePatternBits: BitSet = { + val bits = super.getDefaultTreePatternBits + // Propagate tree-pattern bits from inner plans (non-child LogicalPlan slots) + // so that rules using `containsPattern` can prune correctly. 
+ val it = innerPlans.iterator + while (it.hasNext) { + bits.union(it.next().treePatternBits) + } + bits + } + override def verboseStringWithSuffix(maxFields: Int): String = { super.verboseString(maxFields) + statsCache.map(", " + _.toString).getOrElse("") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 774c783ecf8a2..6c7b1f219dc72 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -210,6 +210,16 @@ case class InsertIntoStatement( override def child: LogicalPlan = query override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoStatement = copy(query = newChild) + + // `table` is a non-child LogicalPlan slot. Expose it via `innerPlans` so that + // tree-pattern propagation, `BindParameters`, and `ResolveIdentifierClause` + // can reach unresolved placeholders inside it. 
+ override def innerPlans: Seq[LogicalPlan] = Seq(table) + + override def withNewInnerPlans(newInnerPlans: Seq[LogicalPlan]): InsertIntoStatement = { + assert(newInnerPlans.length == 1) + copy(table = newInnerPlans.head) + } } sealed abstract class InsertReplaceCriteria extends Expression with Unevaluable { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala index d6b22431e854e..a485a2b69bf37 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala @@ -22,7 +22,7 @@ import java.time.{Instant, LocalDate, LocalDateTime, ZoneId} import org.apache.spark.sql.catalyst.ExtendedAnalysisException import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.Limit +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, Limit, WithCTE} import org.apache.spark.sql.catalyst.trees.SQLQueryContext import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.functions.{array, call_function, lit, map, map_from_arrays, map_from_entries, str_to_map, struct} @@ -2460,4 +2460,73 @@ class ParametersSuite extends SharedSparkSession { spark.sql("SELECT 1", Array.empty[Any]), Row(1)) } + + // SPARK-46625: WITH ... SELECT ... FROM cte + // The placeholder is pushed into the command's identifier slot at parse time, so + // `CTESubstitution` sees the `CTEInChildren` directly and never produces the invalid + // `WithCTE(InsertIntoStatement, ...)` / `WithCTE(CreateTableAsSelect, ...)` shape. 
+ private def assertNoWithCTEAroundCTEInChildren(df: DataFrame): Unit = { + df.queryExecution.analyzed.foreach { + case WithCTE(_: CTEInChildren, _) => + fail(s"Found invalid WithCTE(CTEInChildren, _) shape:\n${df.queryExecution.analyzed}") + case _ => + } + } + + test("SPARK-46625: WITH ... INSERT OVERWRITE TABLE IDENTIFIER(:p) SELECT ... FROM cte") { + withTable("t_cte_overwrite") { + sql("CREATE TABLE t_cte_overwrite (a INT) USING PARQUET") + sql("INSERT INTO t_cte_overwrite VALUES (10)") + val df = spark.sql( + """WITH transformation AS (SELECT 1 AS a) + |INSERT OVERWRITE TABLE IDENTIFIER(:tname) + |SELECT * FROM transformation""".stripMargin, + Map("tname" -> "t_cte_overwrite")) + assertNoWithCTEAroundCTEInChildren(df) + checkAnswer(spark.table("t_cte_overwrite"), Row(1)) + } + } + + test("SPARK-46625: WITH ... INSERT INTO IDENTIFIER(:p) SELECT ... FROM cte") { + withTable("t_cte_into") { + sql("CREATE TABLE t_cte_into (a INT) USING PARQUET") + val df = spark.sql( + """WITH transformation AS (SELECT 7 AS a) + |INSERT INTO IDENTIFIER(:tname) + |SELECT * FROM transformation""".stripMargin, + Map("tname" -> "t_cte_into")) + assertNoWithCTEAroundCTEInChildren(df) + checkAnswer(spark.table("t_cte_into"), Row(7)) + } + } + + test("SPARK-46625: CREATE TABLE IDENTIFIER(:p) AS WITH ... SELECT ... FROM cte") { + withTable("t_cte_ctas") { + val df = spark.sql( + """CREATE TABLE IDENTIFIER(:tname) USING PARQUET AS + |WITH transformation AS (SELECT 3 AS a) + |SELECT * FROM transformation""".stripMargin, + Map("tname" -> "t_cte_ctas")) + assertNoWithCTEAroundCTEInChildren(df) + checkAnswer(spark.table("t_cte_ctas"), Row(3)) + } + } + + // SPARK-46625: legacy parameter-substitution mode triggers the parameters.scala traversal + // path. The placeholder lives in `InsertIntoStatement.table`, which is *not* a child, so this + // exercises the `innerPlans` hook on `LogicalPlan` and the corresponding recursion in + // `BindParameters.bind`. 
+ test("SPARK-46625: INSERT IDENTIFIER(:p) under legacy parameter substitution") { + withSQLConf(SQLConf.LEGACY_PARAMETER_SUBSTITUTION_CONSTANTS_ONLY.key -> "true") { + withTable("t_legacy_param") { + sql("CREATE TABLE t_legacy_param (a INT) USING PARQUET") + spark.sql( + """WITH transformation AS (SELECT 11 AS a) + |INSERT INTO IDENTIFIER(:tname) + |SELECT * FROM transformation""".stripMargin, + Map("tname" -> "t_legacy_param")) + checkAnswer(spark.table("t_legacy_param"), Row(11)) + } + } + } } From d17669d0334ac10af8124fb2f6a21c3f42e2dc0e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 18 May 2026 05:33:55 +0000 Subject: [PATCH 02/15] Drop innerPlans abstraction; special-case InsertIntoStatement Replace the generic `innerPlans` / `withNewInnerPlans` hooks on `LogicalPlan` (and the corresponding override in `LogicalPlan.getDefaultTreePatternBits`) with direct special-cases for `InsertIntoStatement`: - `InsertIntoStatement` overrides `getDefaultTreePatternBits` itself to add `table.treePatternBits`. - `BindParameters.bind` pattern-matches `InsertIntoStatement` to recurse into `table`. - `ResolveIdentifierClause.apply0` pattern-matches `InsertIntoStatement(p: PlanWithUnresolvedIdentifier, ...)` to materialize the placeholder in the `table` slot. `InsertIntoStatement.table` is the only non-child `LogicalPlan` slot that needs this traversal, so the abstraction was paying generality cost for one user. 
Co-authored-by: Isaac --- .../analysis/ResolveIdentifierClause.scala | 42 +++++++------------ .../sql/catalyst/analysis/parameters.scala | 19 +++++---- .../catalyst/plans/logical/LogicalPlan.scala | 35 ---------------- .../catalyst/plans/logical/statements.scala | 17 ++++---- 4 files changed, 35 insertions(+), 78 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala index 8d2e2b9d18998..dde415f86429c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, VariableReference} -import org.apache.spark.sql.catalyst.plans.logical.{CreateView, CTEInChildren, LogicalPlan, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{CreateView, CTEInChildren, InsertIntoStatement, LogicalPlan, WithCTE} import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.errors.QueryCompilationErrors @@ -70,26 +70,16 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] executor.execute(p.planBuilder.apply( IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) - case cmd if cmd.innerPlans.exists(_.exists { - case p: PlanWithUnresolvedIdentifier => p.identifierExpr.resolved && p.childrenResolved - case _ => false - }) => - // Materialize placeholders that live in non-child LogicalPlan slots (e.g. - // `InsertIntoStatement.table`). Without this case, the standard `resolveOperatorsUp` - // never visits these slots because they're not in `children`. 
- val newInnerPlans = cmd.innerPlans.map { inner => - inner.resolveOperatorsUpWithPruning( - _.containsAnyPattern(UNRESOLVED_IDENTIFIER, PLAN_WITH_UNRESOLVED_IDENTIFIER)) { - case p: PlanWithUnresolvedIdentifier - if p.identifierExpr.resolved && p.childrenResolved => - if (referredTempVars.isDefined) { - referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p) - } - executor.execute(p.planBuilder.apply( - IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) - } + // `InsertIntoStatement.table` is a non-child LogicalPlan slot (`child = query`), so the + // standard `resolveOperatorsUp` traversal never visits placeholders inside it. Materialize + // them explicitly. + case i @ InsertIntoStatement(p: PlanWithUnresolvedIdentifier, _, _, _, _, _, _, _, _) + if p.identifierExpr.resolved && p.childrenResolved => + if (referredTempVars.isDefined) { + referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p) } - cmd.withNewInnerPlans(newInnerPlans) + i.copy(table = executor.execute(p.planBuilder.apply( + IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children))) case other => other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) { case e: ExpressionWithUnresolvedIdentifier if e.identifierExpr.resolved => @@ -102,12 +92,12 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] IdentifierResolution.evalIdentifierExpr(e.identifierExpr), e.otherExprs) } } - // For the call sites we cannot refactor at the parser level (e.g. `OverwriteByExpression` - // whose `table` field is typed `NamedRelation`, or `CacheTableAsSelect` whose name is a - // plain String), `PlanWithUnresolvedIdentifier` still wraps the entire command. When that - // wrapper is itself inside `WithCTE`, push the CTE defs into the materialized command's - // children - restoring the invariant `CTESubstitution.withCTEDefs` enforces at substitution - // time. SPARK-46625. 
+ // For the `withIdentClause` call sites we cannot refactor at the parser level (e.g. + // `OverwriteByExpression` whose `table` field is typed `NamedRelation`, or + // `CacheTableAsSelect` whose name is a plain String), `PlanWithUnresolvedIdentifier` still + // wraps the entire command. When that wrapper is itself inside `WithCTE`, push the CTE + // defs into the materialized command's children - restoring the invariant + // `CTESubstitution.withCTEDefs` enforces at substitution time. SPARK-46625. resolved.resolveOperatorsUpWithPruning(_.containsPattern(CTE)) { case WithCTE(c: CTEInChildren, cteDefs) => c.withCTEDefs(cteDefs) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala index d3f3485b70b1a..4c03c0d26611d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, SubqueryExpression, Unevaluable} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SupervisingCommand} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, SupervisingCommand} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMAND, PARAMETER, PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_WITH} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase} @@ -179,15 +179,16 @@ object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase { p0.resolveOperatorsDownWithPruning(_.containsPattern(PARAMETER) && !stop) { case p1 => stop = p1.isInstanceOf[ParameterizedQuery] - val withBoundInnerPlans = if (p1.innerPlans.nonEmpty && - 
p1.innerPlans.exists(_.containsPattern(PARAMETER))) { - // Recurse into non-child LogicalPlan slots (e.g. `InsertIntoStatement.table`). - // Without this, parameters inside identifier placeholders would never be bound. - p1.withNewInnerPlans(p1.innerPlans.map(bind(_)(f))) - } else { - p1 + // `InsertIntoStatement.table` is a non-child LogicalPlan slot, so the standard + // `resolveOperatorsDown` traversal never visits parameter markers inside it. + // Recurse explicitly so `INSERT ... IDENTIFIER(:p)` resolves under the legacy + // parameter-substitution mode (SPARK-46625). + val withBoundTable = p1 match { + case i: InsertIntoStatement if i.table.containsPattern(PARAMETER) => + i.copy(table = bind(i.table)(f)) + case other => other } - withBoundInnerPlans.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) ( + withBoundTable.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) ( f orElse { case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f)) }) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 45c5907a5dd0a..c236f7cf08e82 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.MetadataColumnHelper import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.util.collection.BitSet abstract class LogicalPlan @@ -81,40 +80,6 @@ abstract class LogicalPlan def isStreaming: Boolean = _isStreaming private[this] lazy val _isStreaming = children.exists(_.isStreaming) - /** - * Logical sub-plans that live in non-child slots of this node 
(e.g. - * `InsertIntoStatement.table`). These are not traversed by the standard - * `children`-based machinery, but rules that need to reach unresolved - * placeholders inside them (e.g. tree-pattern propagation, `BindParameters`, - * `ResolveIdentifierClause`) can opt in by reading `innerPlans` and - * rebuilding via `withNewInnerPlans`. - * - * Most plans have no such slots; the default is `Nil`. - */ - def innerPlans: Seq[LogicalPlan] = Nil - - /** - * Rebuilds this node with new versions of the plans returned by `innerPlans`. - * `newInnerPlans` must have the same length and ordering as `innerPlans`. - * Default no-op for plans that do not override `innerPlans`. - */ - def withNewInnerPlans(newInnerPlans: Seq[LogicalPlan]): LogicalPlan = { - assert(newInnerPlans.isEmpty, - s"$nodeName does not declare innerPlans; cannot replace with $newInnerPlans") - this - } - - override protected def getDefaultTreePatternBits: BitSet = { - val bits = super.getDefaultTreePatternBits - // Propagate tree-pattern bits from inner plans (non-child LogicalPlan slots) - // so that rules using `containsPattern` can prune correctly. 
- val it = innerPlans.iterator - while (it.hasNext) { - bits.union(it.next().treePatternBits) - } - bits - } - override def verboseStringWithSuffix(maxFields: Int): String = { super.verboseString(maxFields) + statsCache.map(", " + _.toString).getOrElse("") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 6c7b1f219dc72..4fbe71ed7d3e5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.trees.{LeafLike, UnaryLike} import org.apache.spark.sql.connector.catalog.ColumnDefaultValue import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types.DataType +import org.apache.spark.util.collection.BitSet /** * A logical plan node that contains exactly what was parsed from SQL. @@ -211,14 +212,14 @@ case class InsertIntoStatement( override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoStatement = copy(query = newChild) - // `table` is a non-child LogicalPlan slot. Expose it via `innerPlans` so that - // tree-pattern propagation, `BindParameters`, and `ResolveIdentifierClause` - // can reach unresolved placeholders inside it. - override def innerPlans: Seq[LogicalPlan] = Seq(table) - - override def withNewInnerPlans(newInnerPlans: Seq[LogicalPlan]): InsertIntoStatement = { - assert(newInnerPlans.length == 1) - copy(table = newInnerPlans.head) + // `table` is a non-child LogicalPlan slot (`child = query`), so the default tree-pattern + // propagation in TreeNode/QueryPlan does not see patterns inside it. Add `table`'s bits here + // so that `containsPattern(...)` pruning correctly reports patterns living in `table` + // (e.g. `PARAMETER`, `PLAN_WITH_UNRESOLVED_IDENTIFIER`). 
+ override protected def getDefaultTreePatternBits: BitSet = { + val bits = super.getDefaultTreePatternBits + bits.union(table.treePatternBits) + bits } } From 9b263aa190d4b3bcafc98f460767faecafbab6e6 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 18 May 2026 06:50:57 +0000 Subject: [PATCH 03/15] Fix stale innerPlans reference in ParametersSuite comment The innerPlans abstraction was dropped in the previous commit (d17669d0334) in favor of an InsertIntoStatement special-case, but the test-level comment still referenced the removed hook. Update the comment to describe the actual mechanism: the BindParameters.bind special-case plus the getDefaultTreePatternBits override on InsertIntoStatement. Co-authored-by: Isaac --- .../test/scala/org/apache/spark/sql/ParametersSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala index a485a2b69bf37..422de09f258de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala @@ -2514,8 +2514,9 @@ class ParametersSuite extends SharedSparkSession { // SPARK-46625: legacy parameter-substitution mode triggers the parameters.scala traversal // path. The placeholder lives in `InsertIntoStatement.table`, which is *not* a child, so this - // exercises the `innerPlans` hook on `LogicalPlan` and the corresponding recursion in - // `BindParameters.bind`. + // exercises the `InsertIntoStatement` special-case in `BindParameters.bind` that recurses into + // the `table` slot, and the `getDefaultTreePatternBits` override on `InsertIntoStatement` that + // exposes `table`'s tree-pattern bits for pruning. 
test("SPARK-46625: INSERT IDENTIFIER(:p) under legacy parameter substitution") { withSQLConf(SQLConf.LEGACY_PARAMETER_SUBSTITUTION_CONSTANTS_ONLY.key -> "true") { withTable("t_legacy_param") { From 15d9e84a67274c3265db5c028ec313b7e19036e9 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 18 May 2026 10:42:53 +0000 Subject: [PATCH 04/15] Skip post-hoc CTEInChildren collapse when non-child slots ref the cteDefs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The post-hoc `WithCTE(c: CTEInChildren, _) => c.withCTEDefs(cteDefs)` collapse in `ResolveIdentifierClause.apply0` was firing indiscriminately on any `WithCTE(CTEInChildren, _)` shape, not just the ones produced by materializing a `PlanWithUnresolvedIdentifier`. After `RewriteDeleteFromTable` (and `RewriteUpdateTable`) rewrites a `DeleteFromTable` (non-`CTEInChildren`, wrapped in `WithCTE` by `CTESubstitution.withCTEDefs`) into a `ReplaceData`/`WriteDelta` (`CTEInChildren`), the next fixed-point iteration of `ResolveIdentifierClause` ran the collapse: `WithCTE` got pushed into the `query` child only, but `ReplaceData.condition` / `groupFilterCondition` (non-child expression slots) still held the CTE refs — now dangling. `InlineCTE.buildCTEMap` then threw `NoSuchElementException` on the dangling ref's `cteId`. The failure reproduced as 9 CI test failures across the v2 DML rCTE suites (DeltaBased / GroupBased Delete/Update + recursive CTE). Restrict the collapse to safe cases: only collapse when no cteDef is referenced from a non-child expression slot of the wrapped command. This still handles the intended `OverwriteByExpression` (REPLACE WHERE) and `CacheTableAsSelect` cases — those don't normally have outer CTE refs in their non-child slots — and leaves the `WithCTE(ReplaceData, _)` shape intact post-rewrite. 
Co-authored-by: Isaac --- .../analysis/ResolveIdentifierClause.scala | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala index dde415f86429c..728650a37200d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, VariableReference} -import org.apache.spark.sql.catalyst.plans.logical.{CreateView, CTEInChildren, InsertIntoStatement, LogicalPlan, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{CreateView, CTEInChildren, CTERelationRef, InsertIntoStatement, LogicalPlan, WithCTE} import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.errors.QueryCompilationErrors @@ -97,9 +97,26 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] // `CacheTableAsSelect` whose name is a plain String), `PlanWithUnresolvedIdentifier` still // wraps the entire command. When that wrapper is itself inside `WithCTE`, push the CTE // defs into the materialized command's children - restoring the invariant - // `CTESubstitution.withCTEDefs` enforces at substitution time. SPARK-46625. + // `CTESubstitution.withCTEDefs` enforces at substitution time. Skip the push when any + // cteDef is referenced from a non-child expression slot of `c` (e.g. `ReplaceData.condition` + // produced by `RewriteDeleteFromTable`); in that case the outer `WithCTE` must stay so + // those refs remain in scope. SPARK-46625. 
resolved.resolveOperatorsUpWithPruning(_.containsPattern(CTE)) { - case WithCTE(c: CTEInChildren, cteDefs) => c.withCTEDefs(cteDefs) + case WithCTE(c: CTEInChildren, cteDefs) + if !cteDefsRefdInNonChildSlots(c, cteDefs.map(_.id).toSet) => + c.withCTEDefs(cteDefs) + } + } + + private def cteDefsRefdInNonChildSlots(c: LogicalPlan, cteIds: Set[Long]): Boolean = { + c.expressions.exists { e => + e.exists { + case sub: SubqueryExpression => + sub.plan.collectFirstWithSubqueries { + case ref: CTERelationRef if cteIds.contains(ref.cteId) => ref + }.isDefined + case _ => false + } } } From 44eb6c68b7f068cd1b99be752f9e2759ed640a6c Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 18 May 2026 11:36:04 +0000 Subject: [PATCH 05/15] Push placeholder into OverwriteByExpression.table; drop post-hoc collapse MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The prior fix narrowed the post-hoc `WithCTE(CTEInChildren, _) => c.withCTEDefs(cteDefs)` collapse so it would not corrupt `WithCTE(ReplaceData, _)` shapes produced by `RewriteDeleteFromTable`. The collapse itself was still a fallback for the two write commands the parser refactor in 59a06373dbd could not yet handle natively (`OverwriteByExpression` whose `table` is typed `NamedRelation`, and `CacheTableAsSelect` whose name is `String`). Eliminate the fallback entirely: - Mix `NamedRelation` into `PlanWithUnresolvedIdentifier` so the placeholder can occupy `OverwriteByExpression.table` directly at parse time, mirroring the `InsertIntoStatement.table` treatment from 59a06373dbd. `AstBuilder` for REPLACE WHERE builds the table slot via the new `buildWriteTableSlotNamedRelation` helper instead of wrapping the whole command. - `ResolveIdentifierClause` materializes the slot via a new `OverwriteByExpression(p: PlanWithUnresolvedIdentifier, ...)` case, parallel to the existing `InsertIntoStatement` case. 
`OverwriteByExpression` gains a `getDefaultTreePatternBits` override (so `containsPattern` propagation reaches into `table`), and `BindParameters.bind` recurses into `table` (so legacy parameter substitution `INSERT ... REPLACE WHERE ... IDENTIFIER(:p)` resolves). - Mix `CTEInChildren` into `PlanWithUnresolvedIdentifier` so the remaining `CacheTableAsSelect` call site — where the placeholder still wraps the whole command because `tempViewName: String` cannot hold a `LogicalPlan` — gets the right treatment at substitution time: `CTESubstitution.withCTEDefs` dispatches on the placeholder's own `withCTEDefs`, which pushes `WithCTE` into the placeholder's `children` (the query slot). The empty-children case falls back to wrapping the placeholder itself so cteDefs are not silently lost. With the placeholder in the correct slot at parse time (or pushed into children at substitution time for the `CacheTableAsSelect` case), no `WithCTE(CTEInChildren, _)` shape ever reaches the analyzer for a freshly-materialized command, and the post-hoc collapse can be deleted along with its safety helper. 
Co-authored-by: Isaac --- .../analysis/ResolveIdentifierClause.scala | 43 ++++------- .../sql/catalyst/analysis/parameters.scala | 11 +-- .../sql/catalyst/analysis/unresolved.scala | 24 ++++++- .../sql/catalyst/parser/AstBuilder.scala | 71 ++++++++++--------- .../catalyst/plans/logical/v2Commands.scala | 11 +++ .../apache/spark/sql/ParametersSuite.scala | 30 +++++++- 6 files changed, 120 insertions(+), 70 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala index 728650a37200d..accecef82190e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, VariableReference} -import org.apache.spark.sql.catalyst.plans.logical.{CreateView, CTEInChildren, CTERelationRef, InsertIntoStatement, LogicalPlan, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{CreateView, InsertIntoStatement, LogicalPlan, OverwriteByExpression} import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.errors.QueryCompilationErrors @@ -70,9 +70,9 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] executor.execute(p.planBuilder.apply( IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) - // `InsertIntoStatement.table` is a non-child LogicalPlan slot (`child = query`), so the - // standard `resolveOperatorsUp` traversal never visits placeholders inside it. Materialize - // them explicitly. 
+ // `InsertIntoStatement.table` and `OverwriteByExpression.table` are non-child + // LogicalPlan slots (`child = query`), so the standard `resolveOperatorsUp` traversal + // never visits placeholders inside them. Materialize them explicitly. case i @ InsertIntoStatement(p: PlanWithUnresolvedIdentifier, _, _, _, _, _, _, _, _) if p.identifierExpr.resolved && p.childrenResolved => if (referredTempVars.isDefined) { @@ -80,6 +80,14 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] } i.copy(table = executor.execute(p.planBuilder.apply( IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children))) + case o @ OverwriteByExpression(p: PlanWithUnresolvedIdentifier, _, _, _, _, _, _, _) + if p.identifierExpr.resolved && p.childrenResolved => + if (referredTempVars.isDefined) { + referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p) + } + o.withNewTable(executor.execute(p.planBuilder.apply( + IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) + .asInstanceOf[NamedRelation]) case other => other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) { case e: ExpressionWithUnresolvedIdentifier if e.identifierExpr.resolved => @@ -92,32 +100,7 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] IdentifierResolution.evalIdentifierExpr(e.identifierExpr), e.otherExprs) } } - // For the `withIdentClause` call sites we cannot refactor at the parser level (e.g. - // `OverwriteByExpression` whose `table` field is typed `NamedRelation`, or - // `CacheTableAsSelect` whose name is a plain String), `PlanWithUnresolvedIdentifier` still - // wraps the entire command. When that wrapper is itself inside `WithCTE`, push the CTE - // defs into the materialized command's children - restoring the invariant - // `CTESubstitution.withCTEDefs` enforces at substitution time. Skip the push when any - // cteDef is referenced from a non-child expression slot of `c` (e.g. 
`ReplaceData.condition` - // produced by `RewriteDeleteFromTable`); in that case the outer `WithCTE` must stay so - // those refs remain in scope. SPARK-46625. - resolved.resolveOperatorsUpWithPruning(_.containsPattern(CTE)) { - case WithCTE(c: CTEInChildren, cteDefs) - if !cteDefsRefdInNonChildSlots(c, cteDefs.map(_.id).toSet) => - c.withCTEDefs(cteDefs) - } - } - - private def cteDefsRefdInNonChildSlots(c: LogicalPlan, cteIds: Set[Long]): Boolean = { - c.expressions.exists { e => - e.exists { - case sub: SubqueryExpression => - sub.plan.collectFirstWithSubqueries { - case ref: CTERelationRef if cteIds.contains(ref.cteId) => ref - }.isDefined - case _ => false - } - } + resolved } private def collectTemporaryVariablesInLogicalPlan(child: LogicalPlan): Seq[Seq[String]] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala index 4c03c0d26611d..5c420c0fe31ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, SubqueryExpression, Unevaluable} -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, SupervisingCommand} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, OverwriteByExpression, SupervisingCommand} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMAND, PARAMETER, PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_WITH} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase} @@ -179,13 +179,16 @@ object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase { 
p0.resolveOperatorsDownWithPruning(_.containsPattern(PARAMETER) && !stop) { case p1 => stop = p1.isInstanceOf[ParameterizedQuery] - // `InsertIntoStatement.table` is a non-child LogicalPlan slot, so the standard - // `resolveOperatorsDown` traversal never visits parameter markers inside it. - // Recurse explicitly so `INSERT ... IDENTIFIER(:p)` resolves under the legacy + // `InsertIntoStatement.table` and `OverwriteByExpression.table` are non-child + // LogicalPlan slots, so the standard `resolveOperatorsDown` traversal never visits + // parameter markers inside them. Recurse explicitly so `INSERT ... IDENTIFIER(:p)` + // and `INSERT INTO REPLACE WHERE ... IDENTIFIER(:p)` resolve under the legacy // parameter-substitution mode (SPARK-46625). val withBoundTable = p1 match { case i: InsertIntoStatement if i.table.containsPattern(PARAMETER) => i.copy(table = bind(i.table)(f)) + case o: OverwriteByExpression if o.table.containsPattern(PARAMETER) => + o.withNewTable(bind(o.table)(f).asInstanceOf[NamedRelation]) case other => other } withBoundTable.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) ( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index bac5651265d94..874fb4b9259d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIden import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, SupportsSubquery, UnaryNode} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, 
CTERelationDef, LeafNode, LogicalPlan, SupportsSubquery, UnaryNode, WithCTE} import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId @@ -59,12 +59,20 @@ trait UnresolvedUnaryNode extends UnaryNode with UnresolvedNode /** * A logical plan placeholder that holds the identifier clause string expression. It will be * replaced by the actual logical plan with the evaluated identifier string. + * + * Extends `NamedRelation` so it can occupy a `NamedRelation`-typed slot (e.g. + * `OverwriteByExpression.table`) directly at parse time, instead of wrapping the whole command. + * + * Extends `CTEInChildren` so when the placeholder wraps an entire CTEInChildren command (e.g. + * `CacheTableAsSelect`), `CTESubstitution.withCTEDefs` pushes the surrounding `WithCTE` into the + * placeholder's children at substitution time — the cteDefs land in the materialized command's + * children by construction, with no post-hoc collapse required. */ case class PlanWithUnresolvedIdentifier( identifierExpr: Expression, children: Seq[LogicalPlan], planBuilder: (Seq[String], Seq[LogicalPlan]) => LogicalPlan) - extends UnresolvedNode { + extends UnresolvedNode with NamedRelation with CTEInChildren { def this(identifierExpr: Expression, planBuilder: Seq[String] => LogicalPlan) = { this(identifierExpr, Nil, (ident, _) => planBuilder(ident)) @@ -72,6 +80,18 @@ case class PlanWithUnresolvedIdentifier( final override val nodePatterns: Seq[TreePattern] = Seq(PLAN_WITH_UNRESOLVED_IDENTIFIER) + override def name: String = identifierExpr.sql + + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + if (children.isEmpty) { + // No inner plan to wrap; keep the `WithCTE` outside (same shape `CTESubstitution.withCTEDefs` + // would have produced if this node weren't `CTEInChildren`). 
+ WithCTE(this, cteDefs) + } else { + withNewChildren(children.map(WithCTE(_, cteDefs))) + } + } + override protected def withNewChildrenInternal( newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = copy(identifierExpr, newChildren, planBuilder) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 8bb6a1a10c2b6..d07b1620308bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -933,13 +933,11 @@ class AstBuilder extends DataTypeAstBuilder query: LogicalPlan, queryAliasCtx: TableAliasContext): LogicalPlan = withOrigin(ctx) { ctx match { - // For `InsertIntoStatement`-producing branches, build the `table` slot directly via - // `buildWriteTableSlot` so that any `PlanWithUnresolvedIdentifier` lives *inside* the - // command. This preserves the `CTEInChildren` shape and lets `CTESubstitution` place - // `WithCTE` on the command's children correctly (SPARK-46625). - // `OverwriteByExpression.table` is typed `NamedRelation`, so the REPLACE WHERE branch - // still wraps the command with `PlanWithUnresolvedIdentifier`; that case is handled - // by the post-hoc `WithCTE(c: CTEInChildren, _)` collapse in `ResolveIdentifierClause`. + // For all `InsertIntoStatement` / `OverwriteByExpression`-producing branches, build the + // `table` slot directly via `buildWriteTableSlot` so that any + // `PlanWithUnresolvedIdentifier` lives *inside* the command's identifier slot. This + // preserves the `CTEInChildren` shape and lets `CTESubstitution` place `WithCTE` on the + // command's children correctly (SPARK-46625). 
case table: InsertIntoTableContext => val insertParams = visitInsertIntoTable(table) val privileges = Set(TableWritePrivilege.INSERT) @@ -970,32 +968,27 @@ class AstBuilder extends DataTypeAstBuilder val isInsertReplaceWhere = ctx.WHERE() != null if (isInsertReplaceWhere) { val options = Option(ctx.optionsClause()) - // OverwriteByExpression.table is `NamedRelation`, so we cannot put a - // `PlanWithUnresolvedIdentifier` directly in the slot - wrap the whole command and - // rely on the post-hoc collapse in `ResolveIdentifierClause`. - withIdentClause(ctx.identifierReference, Seq(query), (ident, otherPlans) => { - val table = createUnresolvedRelation( - ctx = ctx.identifierReference, - ident = ident, - optionsClause = options, - writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), - isStreaming = false) - val deleteExpr = expression(ctx.replaceCondition) - val isByName = ctx.NAME() != null - if (isByName) { - OverwriteByExpression.byName( - table, - df = otherPlans.head, - deleteExpr, - withSchemaEvolution = ctx.EVOLUTION() != null) - } else { - OverwriteByExpression.byPosition( - table, - query = otherPlans.head, - deleteExpr, - withSchemaEvolution = ctx.EVOLUTION() != null) - } - }) + val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE) + // `PlanWithUnresolvedIdentifier` is a `NamedRelation`, so it can occupy + // `OverwriteByExpression.table` directly; the materialization happens in + // `ResolveIdentifierClause` via its `OverwriteByExpression` special-case. 
+ val table = buildWriteTableSlotNamedRelation( + ctx.identifierReference, options, privileges) + val deleteExpr = expression(ctx.replaceCondition) + val isByName = ctx.NAME() != null + if (isByName) { + OverwriteByExpression.byName( + table, + df = query, + deleteExpr, + withSchemaEvolution = ctx.EVOLUTION() != null) + } else { + OverwriteByExpression.byPosition( + table, + query = query, + deleteExpr, + withSchemaEvolution = ctx.EVOLUTION() != null) + } } else { val insertParams = visitInsertIntoReplaceOn(ctx) val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE) @@ -1188,6 +1181,18 @@ class AstBuilder extends DataTypeAstBuilder createUnresolvedRelation(ctx, parts, optionsClause, writePrivileges, isStreaming = false)) } + /** + * Variant of `buildWriteTableSlot` returning a `NamedRelation`, for slots typed as + * `NamedRelation` (e.g. `OverwriteByExpression.table`). `PlanWithUnresolvedIdentifier` is a + * `NamedRelation`, so the placeholder fits the slot type directly. + */ + private def buildWriteTableSlotNamedRelation( + ctx: IdentifierReferenceContext, + optionsClause: Option[OptionsClauseContext], + writePrivileges: Set[TableWritePrivilege]): NamedRelation = { + buildWriteTableSlot(ctx, optionsClause, writePrivileges).asInstanceOf[NamedRelation] + } + /** * Write to a directory, returning a [[InsertIntoDir]] logical plan. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index b1ab46ee94817..dd4b52bae2435 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -44,6 +44,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructType} import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils +import org.apache.spark.util.collection.BitSet // For v2 DML commands, it may end up with the v1 fallback code path and need to build a DataFrame // which is required by the DS v1 API. We need to keep the analyzed input query plan to build @@ -242,6 +243,16 @@ case class OverwriteByExpression( override def storeAnalyzedQuery(): Command = copy(analyzedQuery = Some(query)) override protected def withNewChildInternal(newChild: LogicalPlan): OverwriteByExpression = copy(query = newChild) + + // `table` is a non-child slot, so the default tree-pattern propagation in TreeNode/QueryPlan + // does not see patterns inside it. Add `table`'s bits so that `containsPattern(...)` pruning + // correctly reports patterns living in `table` (e.g. `PLAN_WITH_UNRESOLVED_IDENTIFIER`, + // `PARAMETER`). 
+ override protected def getDefaultTreePatternBits: BitSet = { + val bits = super.getDefaultTreePatternBits + bits.union(table.treePatternBits) + bits + } } object OverwriteByExpression { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala index 422de09f258de..9472192d1cdd8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala @@ -20,9 +20,10 @@ package org.apache.spark.sql import java.time.{Instant, LocalDate, LocalDateTime, ZoneId} import org.apache.spark.sql.catalyst.ExtendedAnalysisException +import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, PlanWithUnresolvedIdentifier} import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, Limit, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, Limit, OverwriteByExpression, WithCTE} import org.apache.spark.sql.catalyst.trees.SQLQueryContext import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.functions.{array, call_function, lit, map, map_from_arrays, map_from_entries, str_to_map, struct} @@ -2530,4 +2531,31 @@ class ParametersSuite extends SharedSparkSession { } } } + + // SPARK-46625: INSERT INTO REPLACE WHERE goes through `OverwriteByExpression`, whose `table` + // slot is typed `NamedRelation`. `PlanWithUnresolvedIdentifier` extends `NamedRelation` so the + // placeholder sits in the slot directly. Verify on the parsed plan that the placeholder lives + // in `OverwriteByExpression.table` rather than wrapping the whole command — running the + // analyzer fully would require a v2 catalog. + test("SPARK-46625: WITH ... INSERT INTO IDENTIFIER(:p) REPLACE WHERE ... 
— parser") { + // Use a non-literal-string expression so `withIdentClause` produces + // `PlanWithUnresolvedIdentifier` rather than short-circuiting to `UnresolvedRelation`. + val parsedPlan = spark.sessionState.sqlParser.parsePlan( + """WITH transformation AS (SELECT 99 AS a) + |INSERT INTO IDENTIFIER('some' || '_table') REPLACE WHERE a = 10 + |SELECT * FROM transformation""".stripMargin) + val overwrite = parsedPlan.collectFirst { case o: OverwriteByExpression => o }.getOrElse( + fail(s"Expected OverwriteByExpression in parsed plan:\n$parsedPlan")) + assert(overwrite.table.isInstanceOf[PlanWithUnresolvedIdentifier], + s"Expected OverwriteByExpression.table to be PlanWithUnresolvedIdentifier, " + + s"got ${overwrite.table.getClass.getSimpleName}:\n$parsedPlan") + // After CTESubstitution runs, the CTE defs should land on the command's children (because + // OverwriteByExpression is a CTEInChildren) — never as `WithCTE(OverwriteByExpression, _)`. + val substituted = CTESubstitution.apply(parsedPlan) + substituted.foreach { + case WithCTE(_: CTEInChildren, _) => + fail(s"Found invalid WithCTE(CTEInChildren, _) shape after CTESubstitution:\n$substituted") + case _ => + } + } } From 1cf623a4c8788809f576780dad8653d63394a534 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 18 May 2026 11:55:17 +0000 Subject: [PATCH 06/15] Replace em-dashes with ASCII to satisfy scalastyle nonascii --- .../org/apache/spark/sql/catalyst/analysis/unresolved.scala | 2 +- .../test/scala/org/apache/spark/sql/ParametersSuite.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 874fb4b9259d3..955a766080dbd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala 
@@ -65,7 +65,7 @@ trait UnresolvedUnaryNode extends UnaryNode with UnresolvedNode * * Extends `CTEInChildren` so when the placeholder wraps an entire CTEInChildren command (e.g. * `CacheTableAsSelect`), `CTESubstitution.withCTEDefs` pushes the surrounding `WithCTE` into the - * placeholder's children at substitution time — the cteDefs land in the materialized command's + * placeholder's children at substitution time -- the cteDefs land in the materialized command's * children by construction, with no post-hoc collapse required. */ case class PlanWithUnresolvedIdentifier( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala index 9472192d1cdd8..e4d2323b3ca55 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala @@ -2535,9 +2535,9 @@ class ParametersSuite extends SharedSparkSession { // SPARK-46625: INSERT INTO REPLACE WHERE goes through `OverwriteByExpression`, whose `table` // slot is typed `NamedRelation`. `PlanWithUnresolvedIdentifier` extends `NamedRelation` so the // placeholder sits in the slot directly. Verify on the parsed plan that the placeholder lives - // in `OverwriteByExpression.table` rather than wrapping the whole command — running the + // in `OverwriteByExpression.table` rather than wrapping the whole command -- running the // analyzer fully would require a v2 catalog. - test("SPARK-46625: WITH ... INSERT INTO IDENTIFIER(:p) REPLACE WHERE ... — parser") { + test("SPARK-46625: WITH ... INSERT INTO IDENTIFIER(:p) REPLACE WHERE ... parser") { // Use a non-literal-string expression so `withIdentClause` produces // `PlanWithUnresolvedIdentifier` rather than short-circuiting to `UnresolvedRelation`. 
val parsedPlan = spark.sessionState.sqlParser.parsePlan( @@ -2550,7 +2550,7 @@ class ParametersSuite extends SharedSparkSession { s"Expected OverwriteByExpression.table to be PlanWithUnresolvedIdentifier, " + s"got ${overwrite.table.getClass.getSimpleName}:\n$parsedPlan") // After CTESubstitution runs, the CTE defs should land on the command's children (because - // OverwriteByExpression is a CTEInChildren) — never as `WithCTE(OverwriteByExpression, _)`. + // OverwriteByExpression is a CTEInChildren) -- never as `WithCTE(OverwriteByExpression, _)`. val substituted = CTESubstitution.apply(parsedPlan) substituted.foreach { case WithCTE(_: CTEInChildren, _) => From 9a1c9a97752ff7f95f3db354d23e6d4fdd2875db Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 18 May 2026 13:15:00 +0000 Subject: [PATCH 07/15] Tighten OverwriteByExpression slot handling; assert placeholder has children - Replace asInstanceOf[NamedRelation] in ResolveIdentifierClause and BindParameters with explicit pattern matches that throw a clear internal-error if the materialized table is not a NamedRelation. - Convert the unreachable children.isEmpty branch in PlanWithUnresolvedIdentifier.withCTEDefs into an assert. The previous fallback WithCTE(this, cteDefs) would re-introduce the structurally invalid WithCTE(CTEInChildren, _) shape this PR is eliminating. - Drop the leftover val resolved = ...; resolved wrapping in ResolveIdentifierClause.apply0. - Add a ParametersSuite test that exercises BindParameters' OverwriteByExpression branch (and the getDefaultTreePatternBits PARAMETER-bit propagation) without needing a v2 catalog.
Co-authored-by: Isaac --- .../analysis/ResolveIdentifierClause.scala | 18 +++++++----- .../sql/catalyst/analysis/parameters.scala | 8 ++++- .../sql/catalyst/analysis/unresolved.scala | 19 +++++++----- .../apache/spark/sql/ParametersSuite.scala | 29 ++++++++++++++++++- 4 files changed, 58 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala index accecef82190e..6c8be88cf9cc9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, VariableReference} import org.apache.spark.sql.catalyst.plans.logical.{CreateView, InsertIntoStatement, LogicalPlan, OverwriteByExpression} import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} @@ -59,8 +60,8 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] private def apply0( plan: LogicalPlan, - referredTempVars: Option[mutable.ArrayBuffer[Seq[String]]] = None): LogicalPlan = { - val resolved = plan.resolveOperatorsUpWithPruning(_.containsAnyPattern( + referredTempVars: Option[mutable.ArrayBuffer[Seq[String]]] = None): LogicalPlan = + plan.resolveOperatorsUpWithPruning(_.containsAnyPattern( UNRESOLVED_IDENTIFIER, PLAN_WITH_UNRESOLVED_IDENTIFIER)) { case p: PlanWithUnresolvedIdentifier if p.identifierExpr.resolved && p.childrenResolved => @@ -85,9 +86,14 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] if (referredTempVars.isDefined) { referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p) } - 
o.withNewTable(executor.execute(p.planBuilder.apply( - IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) - .asInstanceOf[NamedRelation]) + executor.execute(p.planBuilder.apply( + IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) match { + case nr: NamedRelation => o.withNewTable(nr) + case other => + throw SparkException.internalError( + "PlanWithUnresolvedIdentifier in OverwriteByExpression.table must materialize " + + s"into a NamedRelation, but got: ${other.getClass.getName}") + } case other => other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) { case e: ExpressionWithUnresolvedIdentifier if e.identifierExpr.resolved => @@ -100,8 +106,6 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] IdentifierResolution.evalIdentifierExpr(e.identifierExpr), e.otherExprs) } } - resolved - } private def collectTemporaryVariablesInLogicalPlan(child: LogicalPlan): Seq[Seq[String]] = { def collectTempVars(child: LogicalPlan): Seq[Seq[String]] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala index 5c420c0fe31ec..48dfe26abb468 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala @@ -188,7 +188,13 @@ object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase { case i: InsertIntoStatement if i.table.containsPattern(PARAMETER) => i.copy(table = bind(i.table)(f)) case o: OverwriteByExpression if o.table.containsPattern(PARAMETER) => - o.withNewTable(bind(o.table)(f).asInstanceOf[NamedRelation]) + bind(o.table)(f) match { + case nr: NamedRelation => o.withNewTable(nr) + case other => + throw SparkException.internalError( + "Parameter binding on OverwriteByExpression.table must preserve " + + 
s"NamedRelation, but got: ${other.getClass.getName}") + } case other => other } withBoundTable.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) ( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 955a766080dbd..bf44d3a697923 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -83,13 +83,18 @@ case class PlanWithUnresolvedIdentifier( override def name: String = identifierExpr.sql override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { - if (children.isEmpty) { - // No inner plan to wrap; keep the `WithCTE` outside (same shape `CTESubstitution.withCTEDefs` - // would have produced if this node weren't `CTEInChildren`). - WithCTE(this, cteDefs) - } else { - withNewChildren(children.map(WithCTE(_, cteDefs))) - } + // `CTESubstitution.withCTEDefs` reaches this override only when the placeholder is the + // `firstSubstituted` root of a `WITH ... ` subtree, which happens only via the + // two-arg `withIdentClause(ctx, otherPlans, builder)` form -- and that form is only used + // for wrap-the-whole-command callers (currently just `CacheTableAsSelect`), which always + // pass a non-empty `otherPlans`. A `children.isEmpty` call here would mean the placeholder + // is wrapping nothing, and falling back to `WithCTE(this, cteDefs)` would re-introduce the + // structurally invalid `WithCTE(, _)` shape this PR is eliminating. 
+ assert(children.nonEmpty, + "PlanWithUnresolvedIdentifier.withCTEDefs requires an inner plan to wrap; " + + "callers using the single-arg withIdentClause form must place the placeholder in a " + + "slot that is not the substitution root.") + withNewChildren(children.map(WithCTE(_, cteDefs))) } override protected def withNewChildrenInternal( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala index e4d2323b3ca55..857f1c92851f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala @@ -20,10 +20,11 @@ package org.apache.spark.sql import java.time.{Instant, LocalDate, LocalDateTime, ZoneId} import org.apache.spark.sql.catalyst.ExtendedAnalysisException -import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, PlanWithUnresolvedIdentifier} +import org.apache.spark.sql.catalyst.analysis.{BindParameters, CTESubstitution, NameParameterizedQuery, PlanWithUnresolvedIdentifier} import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, Limit, OverwriteByExpression, WithCTE} +import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER import org.apache.spark.sql.catalyst.trees.SQLQueryContext import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.functions.{array, call_function, lit, map, map_from_arrays, map_from_entries, str_to_map, struct} @@ -2558,4 +2559,30 @@ class ParametersSuite extends SharedSparkSession { case _ => } } + + // SPARK-46625: Parameter inside `IDENTIFIER(:p)` on REPLACE WHERE lives in + // `OverwriteByExpression.table`, which is a non-child slot. 
Verify that + // `BindParameters.bind` reaches into the slot via the explicit `OverwriteByExpression` + // recursion (parameters.scala) and that the `getDefaultTreePatternBits` override on + // `OverwriteByExpression` exposes the PARAMETER bit for pruning. Done at the rule level + // because driving REPLACE WHERE through full analysis would require a v2 catalog. + test("SPARK-46625: BindParameters recurses into OverwriteByExpression.table") { + val parsedPlan = spark.sessionState.sqlParser.parsePlan( + """INSERT INTO IDENTIFIER(:tname) REPLACE WHERE a = 10 + |SELECT 1 AS a""".stripMargin) + val overwrite = parsedPlan.collectFirst { case o: OverwriteByExpression => o }.getOrElse( + fail(s"Expected OverwriteByExpression in parsed plan:\n$parsedPlan")) + // Pruning prerequisite: the PARAMETER bit must be visible at the OverwriteByExpression + // level (it lives inside `table`, which is not a child); this exercises the + // `getDefaultTreePatternBits` override. + assert(overwrite.containsPattern(PARAMETER), + "OverwriteByExpression.getDefaultTreePatternBits must propagate `table`'s PARAMETER bit") + + val bound = BindParameters.apply( + NameParameterizedQuery(parsedPlan, Seq("tname"), Seq(Literal("foo_table")))) + val boundOverwrite = bound.collectFirst { case o: OverwriteByExpression => o }.getOrElse( + fail(s"Expected OverwriteByExpression in bound plan:\n$bound")) + assert(!boundOverwrite.table.containsPattern(PARAMETER), + s"Expected :tname inside OverwriteByExpression.table to be bound, got:\n$boundOverwrite") + } } From 5ba07b9c8c3c19b57f82be323c085d05f9fd3cf8 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 18 May 2026 14:03:42 +0000 Subject: [PATCH 08/15] Rewrite withCTEDefs comment; add RTAS+CTE parser test Reword the block comment on `PlanWithUnresolvedIdentifier.withCTEDefs`: the prior text claimed the two-arg `withIdentClause` form always supplies a non-empty `otherPlans` (false -- `visitCacheTable` passes `Nil` when there is no AS query) and contained 
a "this PR is eliminating" phrasing that doesn't read well post-merge. Restate the reachability argument in steady-state terms based on the grammar. Add a parser-level test mirroring the existing REPLACE WHERE one for the RTAS path: asserts the placeholder lands in `ReplaceTableAsSelect.name` and that no `WithCTE(CTEInChildren, _)` shape survives `CTESubstitution`. Parser-only because running RTAS through full analysis requires a v2 catalog. Co-authored-by: Isaac --- .../sql/catalyst/analysis/unresolved.scala | 16 +++++++----- .../apache/spark/sql/ParametersSuite.scala | 26 ++++++++++++++++++- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index bf44d3a697923..841297548521d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -83,13 +83,15 @@ case class PlanWithUnresolvedIdentifier( override def name: String = identifierExpr.sql override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { - // `CTESubstitution.withCTEDefs` reaches this override only when the placeholder is the - // `firstSubstituted` root of a `WITH ... ` subtree, which happens only via the - // two-arg `withIdentClause(ctx, otherPlans, builder)` form -- and that form is only used - // for wrap-the-whole-command callers (currently just `CacheTableAsSelect`), which always - // pass a non-empty `otherPlans`. A `children.isEmpty` call here would mean the placeholder - // is wrapping nothing, and falling back to `WithCTE(this, cteDefs)` would re-introduce the - // structurally invalid `WithCTE(, _)` shape this PR is eliminating. + // `CTESubstitution.withCTEDefs` invokes this override only when the placeholder is the + // substituted root of a `WITH ... ` subtree -- i.e. 
when the parse tree has the + // shape `UnresolvedWith(this, _)`. Under the current grammar, `WITH` only precedes + // `query` or `dmlStatementNoWith`, and the parser places the placeholder inside the + // command's identifier slot rather than at that root, so this path is not reached in + // practice. The override remains as a safety net: if it does fire, `children` must be + // non-empty so `WithCTE` can be pushed inside the wrapped command. Falling back to + // `WithCTE(this, cteDefs)` with empty children would re-introduce the structurally + // invalid `WithCTE(, _)` shape this placement is designed to avoid. assert(children.nonEmpty, "PlanWithUnresolvedIdentifier.withCTEDefs requires an inner plan to wrap; " + "callers using the single-arg withIdentClause form must place the placeholder in a " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala index 857f1c92851f9..73225da207dce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.ExtendedAnalysisException import org.apache.spark.sql.catalyst.analysis.{BindParameters, CTESubstitution, NameParameterizedQuery, PlanWithUnresolvedIdentifier} import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, Limit, OverwriteByExpression, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, Limit, OverwriteByExpression, ReplaceTableAsSelect, WithCTE} import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER import org.apache.spark.sql.catalyst.trees.SQLQueryContext import org.apache.spark.sql.catalyst.util.CharVarcharUtils @@ -2585,4 +2585,28 @@ class ParametersSuite extends SharedSparkSession { 
assert(!boundOverwrite.table.containsPattern(PARAMETER), s"Expected :tname inside OverwriteByExpression.table to be bound, got:\n$boundOverwrite") } + + // SPARK-46625: RTAS mirrors CTAS -- the placeholder goes into `ReplaceTableAsSelect.name` + // at parse time. Verify on the parsed plan that the placeholder lives in that slot and that + // no `WithCTE(CTEInChildren, _)` shape survives `CTESubstitution`. Running RTAS through full + // analysis would require a v2 catalog, so this is a parser-level test. + test("SPARK-46625: REPLACE TABLE IDENTIFIER(...) AS WITH ... SELECT ... parser") { + // Use a non-literal-string expression so `withIdentClause` produces + // `PlanWithUnresolvedIdentifier` rather than short-circuiting to `UnresolvedIdentifier`. + val parsedPlan = spark.sessionState.sqlParser.parsePlan( + """REPLACE TABLE IDENTIFIER('some' || '_table') USING PARQUET AS + |WITH transformation AS (SELECT 5 AS a) + |SELECT * FROM transformation""".stripMargin) + val rtas = parsedPlan.collectFirst { case r: ReplaceTableAsSelect => r }.getOrElse( + fail(s"Expected ReplaceTableAsSelect in parsed plan:\n$parsedPlan")) + assert(rtas.name.isInstanceOf[PlanWithUnresolvedIdentifier], + s"Expected ReplaceTableAsSelect.name to be PlanWithUnresolvedIdentifier, " + + s"got ${rtas.name.getClass.getSimpleName}:\n$parsedPlan") + val substituted = CTESubstitution.apply(parsedPlan) + substituted.foreach { + case WithCTE(_: CTEInChildren, _) => + fail(s"Found invalid WithCTE(CTEInChildren, _) shape after CTESubstitution:\n$substituted") + case _ => + } + } } From c58e9775896f7b5cd9e1a88d3500d3117add3f89 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 18 May 2026 14:25:22 +0000 Subject: [PATCH 09/15] Match V2WriteCommand instead of OverwriteByExpression `OverwriteByExpression` is the only `V2WriteCommand` parser-built with a placeholder in `table` today, but every `V2WriteCommand` shares the same `child = query` shape, so `table` is a non-child slot for all of them. 
Match the trait in the related analyzer rules so the invariant stays uniform if any future analyzer-built node lands a placeholder in the same slot: - Move the `getDefaultTreePatternBits` override from `OverwriteByExpression` up to the `V2WriteCommand` trait. - Widen the `ResolveIdentifierClause` materialization case from `OverwriteByExpression` to `V2WriteCommand`, dispatching via `w.withNewTable(...)`. - Widen the `BindParameters.bind` table-slot recursion the same way. Behavior is unchanged today since `OverwriteByExpression` is still the only parser-built `V2WriteCommand` carrying a placeholder. Co-authored-by: Isaac --- .../analysis/ResolveIdentifierClause.scala | 21 +++++++++++------- .../sql/catalyst/analysis/parameters.scala | 22 ++++++++++--------- .../catalyst/plans/logical/v2Commands.scala | 22 ++++++++++--------- 3 files changed, 37 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala index 6c8be88cf9cc9..8d1a380a2cf6a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, VariableReference} -import org.apache.spark.sql.catalyst.plans.logical.{CreateView, InsertIntoStatement, LogicalPlan, OverwriteByExpression} +import org.apache.spark.sql.catalyst.plans.logical.{CreateView, InsertIntoStatement, LogicalPlan, V2WriteCommand} import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.errors.QueryCompilationErrors @@ -71,9 +71,11 @@ class 
ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] executor.execute(p.planBuilder.apply( IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) - // `InsertIntoStatement.table` and `OverwriteByExpression.table` are non-child - // LogicalPlan slots (`child = query`), so the standard `resolveOperatorsUp` traversal - // never visits placeholders inside them. Materialize them explicitly. + // `InsertIntoStatement.table` and `V2WriteCommand.table` are non-child LogicalPlan slots + // (`child = query`), so the standard `resolveOperatorsUp` traversal never visits + // placeholders inside them. Materialize them explicitly. Only `InsertIntoStatement` and + // `OverwriteByExpression` carry a parse-time placeholder today, but matching the + // `V2WriteCommand` trait keeps the rule consistent across the family. case i @ InsertIntoStatement(p: PlanWithUnresolvedIdentifier, _, _, _, _, _, _, _, _) if p.identifierExpr.resolved && p.childrenResolved => if (referredTempVars.isDefined) { @@ -81,17 +83,20 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] } i.copy(table = executor.execute(p.planBuilder.apply( IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children))) - case o @ OverwriteByExpression(p: PlanWithUnresolvedIdentifier, _, _, _, _, _, _, _) - if p.identifierExpr.resolved && p.childrenResolved => + case w: V2WriteCommand if w.table.isInstanceOf[PlanWithUnresolvedIdentifier] && { + val p = w.table.asInstanceOf[PlanWithUnresolvedIdentifier] + p.identifierExpr.resolved && p.childrenResolved + } => + val p = w.table.asInstanceOf[PlanWithUnresolvedIdentifier] if (referredTempVars.isDefined) { referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p) } executor.execute(p.planBuilder.apply( IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) match { - case nr: NamedRelation => o.withNewTable(nr) + case nr: NamedRelation => w.withNewTable(nr) case other => throw 
SparkException.internalError( - "PlanWithUnresolvedIdentifier in OverwriteByExpression.table must materialize " + + "PlanWithUnresolvedIdentifier in V2WriteCommand.table must materialize " + s"into a NamedRelation, but got: ${other.getClass.getName}") } case other => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala index 48dfe26abb468..0ca7be0bd82b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, SubqueryExpression, Unevaluable} -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, OverwriteByExpression, SupervisingCommand} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, SupervisingCommand, V2WriteCommand} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMAND, PARAMETER, PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_WITH} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase} @@ -179,20 +179,22 @@ object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase { p0.resolveOperatorsDownWithPruning(_.containsPattern(PARAMETER) && !stop) { case p1 => stop = p1.isInstanceOf[ParameterizedQuery] - // `InsertIntoStatement.table` and `OverwriteByExpression.table` are non-child - // LogicalPlan slots, so the standard `resolveOperatorsDown` traversal never visits - // parameter markers inside them. Recurse explicitly so `INSERT ... IDENTIFIER(:p)` - // and `INSERT INTO REPLACE WHERE ... IDENTIFIER(:p)` resolve under the legacy - // parameter-substitution mode (SPARK-46625). 
+ // `InsertIntoStatement.table` and `V2WriteCommand.table` are non-child LogicalPlan + // slots, so the standard `resolveOperatorsDown` traversal never visits parameter + // markers inside them. Recurse explicitly so `INSERT ... IDENTIFIER(:p)` and + // `INSERT INTO REPLACE WHERE ... IDENTIFIER(:p)` resolve under the legacy + // parameter-substitution mode (SPARK-46625). Today only the `OverwriteByExpression` + // variant of `V2WriteCommand` is parser-built with a placeholder in `table`; the trait + // match keeps the rule consistent for any future analyzer-built node in the same shape. val withBoundTable = p1 match { case i: InsertIntoStatement if i.table.containsPattern(PARAMETER) => i.copy(table = bind(i.table)(f)) - case o: OverwriteByExpression if o.table.containsPattern(PARAMETER) => - bind(o.table)(f) match { - case nr: NamedRelation => o.withNewTable(nr) + case w: V2WriteCommand if w.table.containsPattern(PARAMETER) => + bind(w.table)(f) match { + case nr: NamedRelation => w.withNewTable(nr) case other => throw SparkException.internalError( - "Parameter binding on OverwriteByExpression.table must preserve " + + "Parameter binding on V2WriteCommand.table must preserve " + s"NamedRelation, but got: ${other.getClass.getName}") } case other => other diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index dd4b52bae2435..39118354a20ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -107,6 +107,18 @@ trait V2WriteCommand override def child: LogicalPlan = query + // `table` is a non-child slot, so the default tree-pattern propagation in TreeNode/QueryPlan + // does not see patterns inside it. 
Add `table`'s bits so that `containsPattern(...)` pruning + // correctly reports patterns living in `table` (e.g. `PLAN_WITH_UNRESOLVED_IDENTIFIER`, + // `PARAMETER`). Only `OverwriteByExpression` is constructed at parse time with a placeholder + // in `table`, but applying this uniformly across all `V2WriteCommand`s keeps the invariant + // consistent for any future analyzer-built node that lands a placeholder in the same slot. + override protected def getDefaultTreePatternBits: BitSet = { + val bits = super.getDefaultTreePatternBits + bits.union(table.treePatternBits) + bits + } + override lazy val resolved: Boolean = table.resolved && query.resolved && outputResolved def outputResolved: Boolean = { @@ -243,16 +255,6 @@ case class OverwriteByExpression( override def storeAnalyzedQuery(): Command = copy(analyzedQuery = Some(query)) override protected def withNewChildInternal(newChild: LogicalPlan): OverwriteByExpression = copy(query = newChild) - - // `table` is a non-child slot, so the default tree-pattern propagation in TreeNode/QueryPlan - // does not see patterns inside it. Add `table`'s bits so that `containsPattern(...)` pruning - // correctly reports patterns living in `table` (e.g. `PLAN_WITH_UNRESOLVED_IDENTIFIER`, - // `PARAMETER`). - override protected def getDefaultTreePatternBits: BitSet = { - val bits = super.getDefaultTreePatternBits - bits.union(table.treePatternBits) - bits - } } object OverwriteByExpression { From b61d7615efcd144e05bcd042452de2e835dee95a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 18 May 2026 14:47:37 +0000 Subject: [PATCH 10/15] Tighten withCTEDefs invariant to single-child; document CTEInChildren safety Tighten the assertion in `PlanWithUnresolvedIdentifier.withCTEDefs` from `children.nonEmpty` to `children.length == 1` to encode the actual invariant under which the override is safe. 
Rewrite the comment to explain *why* extending `CTEInChildren` is safe: with exactly one child the placeholder acts as a transparent stand-in for its eventual command, and wrapping the single child with `WithCTE` reproduces the same shape the command's own `withCTEDefs` would produce (e.g. for `CacheTableAsSelect`, identical to `copy(plan = WithCTE(plan, cteDefs))`). Spell out the two ways a non-single-child case would break the equivalence so the assertion's failure mode is self-explanatory. Update the class-level Scaladoc accordingly. Co-authored-by: Isaac --- .../sql/catalyst/analysis/unresolved.scala | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 841297548521d..e70483786ca0e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -63,10 +63,11 @@ trait UnresolvedUnaryNode extends UnaryNode with UnresolvedNode * Extends `NamedRelation` so it can occupy a `NamedRelation`-typed slot (e.g. * `OverwriteByExpression.table`) directly at parse time, instead of wrapping the whole command. * - * Extends `CTEInChildren` so when the placeholder wraps an entire CTEInChildren command (e.g. - * `CacheTableAsSelect`), `CTESubstitution.withCTEDefs` pushes the surrounding `WithCTE` into the - * placeholder's children at substitution time -- the cteDefs land in the materialized command's - * children by construction, with no post-hoc collapse required. + * Extends `CTEInChildren` so that if the placeholder is ever the substitution root of a + * `WITH ... ` subtree, `CTESubstitution` dispatches to the placeholder's own + * `withCTEDefs` and pushes `WithCTE` into the placeholder's single child. 
After materialization, + * the `WithCTE` lands inside the eventual command's body slot -- the same shape the command's + * own `withCTEDefs` would produce. See that override for the single-child safety argument. */ case class PlanWithUnresolvedIdentifier( identifierExpr: Expression, @@ -83,19 +84,22 @@ case class PlanWithUnresolvedIdentifier( override def name: String = identifierExpr.sql override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { - // `CTESubstitution.withCTEDefs` invokes this override only when the placeholder is the - // substituted root of a `WITH ... ` subtree -- i.e. when the parse tree has the - // shape `UnresolvedWith(this, _)`. Under the current grammar, `WITH` only precedes + // Safe to extend `CTEInChildren` because with exactly one child the placeholder is a + // transparent stand-in for its eventual command: wrapping the single child with `WithCTE` + // places the cteDefs in the same slot the materialized command's own `withCTEDefs` would. + // E.g. for `CacheTableAsSelect`, the single child is the AS-query and the result matches + // `CacheTableAsSelect.withCTEDefs(cteDefs) = copy(plan = WithCTE(plan, cteDefs))`. + // + // The single-child invariant matters: with 0 children the cteDefs would vanish, and with + // 2+ children they would be duplicated across slots that may not all be CTE-scope bodies. + // + // Under the current grammar this path is not actually reached -- `WITH` only precedes // `query` or `dmlStatementNoWith`, and the parser places the placeholder inside the - // command's identifier slot rather than at that root, so this path is not reached in - // practice. The override remains as a safety net: if it does fire, `children` must be - // non-empty so `WithCTE` can be pushed inside the wrapped command. Falling back to - // `WithCTE(this, cteDefs)` with empty children would re-introduce the structurally - // invalid `WithCTE(, _)` shape this placement is designed to avoid. 
- assert(children.nonEmpty, - "PlanWithUnresolvedIdentifier.withCTEDefs requires an inner plan to wrap; " + - "callers using the single-arg withIdentClause form must place the placeholder in a " + - "slot that is not the substitution root.") + // command's identifier slot rather than at the substitution root -- but the override stays + // as a safety net for future wrap-whole-command callers. + assert(children.length == 1, + "PlanWithUnresolvedIdentifier.withCTEDefs requires exactly one child so that the " + + "surrounding WithCTE lands in the materialized command's body slot.") withNewChildren(children.map(WithCTE(_, cteDefs))) } From 3bb3e676ded7cbb6ce30e017978a8dcbd304c88e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 18 May 2026 15:29:20 +0000 Subject: [PATCH 11/15] Make CacheTableAsSelect.tempViewName an Expression; drop placeholder CTEInChildren `CacheTableAsSelect.tempViewName` was the last identifier slot still expressed as a plain `String`, which forced `visitCacheTable` to use the wrap-the-whole-command form of `withIdentClause` -- the same shape that motivated the `WithCTE(<command>, _)` workaround chain in SPARK-46625. Change `tempViewName: String` to `tempViewName: Expression`. The parser produces either a non-null string `Literal` (for direct identifiers and `IDENTIFIER('literal')`) or an `ExpressionWithUnresolvedIdentifier` (for `IDENTIFIER(<expr>)`). The expression slot is naturally visited by `transformExpressions`, so `ResolveIdentifierClause`'s existing `ExpressionWithUnresolvedIdentifier` branch resolves it without any new code path. `CheckAnalysis` enforces the post-analysis invariant that the slot is a non-null string literal, and `CacheTableAsSelect.tempViewNameString` extracts the string for execution. With this change, no parser caller places a `PlanWithUnresolvedIdentifier` as the substitution root of a `WITH ... <command>` subtree.
The `CTEInChildren` extension on `PlanWithUnresolvedIdentifier` therefore has no reachable code path under the current grammar and is removed along with its `withCTEDefs` override. The class-level Scaladoc is updated accordingly. Add a parser-level test for `CACHE TABLE IDENTIFIER(<expr>) AS WITH ... SELECT ...` asserting the expression placeholder lives in the name slot and no `WithCTE(CTEInChildren, _)` shape survives `CTESubstitution`. Update `DDLParserSuite` to construct `CacheTableAsSelect` with `Literal("t")` instead of `"t"`. Co-authored-by: Isaac --- .../sql/catalyst/analysis/CheckAnalysis.scala | 14 +++ .../sql/catalyst/analysis/unresolved.scala | 35 ++----- .../sql/catalyst/parser/AstBuilder.scala | 91 +++++++++++++------ .../catalyst/plans/logical/v2Commands.scala | 15 ++- .../sql/catalyst/parser/DDLParserSuite.scala | 2 +- .../datasources/v2/DataSourceV2Strategy.scala | 3 +- .../apache/spark/sql/ParametersSuite.scala | 28 +++++- 7 files changed, 130 insertions(+), 58 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index fa4c13bc24af3..fd3f14df2562a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -459,6 +459,20 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString messageParameters = Map("name" -> "IDENTIFIER", "expr" -> p.identifierExpr.sql) ) + case c: CacheTableAsSelect => + // The parser builds `tempViewName` as either a `Literal[StringType]` (for direct + // identifiers and `IDENTIFIER('literal')`) or an `ExpressionWithUnresolvedIdentifier` + // that resolves to such a Literal.
Validate the post-analysis shape so any future + // construction path that violates the invariant fails loudly here, not deep inside + // execution via `tempViewNameString`. + c.tempViewName match { + case Literal(value, _: StringType) if value != null => // OK + case other => + throw SparkException.internalError( + "CacheTableAsSelect.tempViewName must be a non-null string literal after " + + s"analysis, but got: ${other.sql}") + } + case operator: LogicalPlan => operator transformExpressionsDown { case hof: HigherOrderFunction if hof.arguments.exists { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index e70483786ca0e..c32cf76f5f77d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIden import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LeafNode, LogicalPlan, SupportsSubquery, UnaryNode, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, SupportsSubquery, UnaryNode} import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId @@ -63,17 +63,18 @@ trait UnresolvedUnaryNode extends UnaryNode with UnresolvedNode * Extends `NamedRelation` so it can occupy a `NamedRelation`-typed slot (e.g. * `OverwriteByExpression.table`) directly at parse time, instead of wrapping the whole command. 
* - * Extends `CTEInChildren` so that if the placeholder is ever the substitution root of a - * `WITH ... ` subtree, `CTESubstitution` dispatches to the placeholder's own - * `withCTEDefs` and pushes `WithCTE` into the placeholder's single child. After materialization, - * the `WithCTE` lands inside the eventual command's body slot -- the same shape the command's - * own `withCTEDefs` would produce. See that override for the single-child safety argument. + * The parser always places this node inside the command's identifier slot (a child slot for + * DELETE/UPDATE/MERGE/CTAS/RTAS, or a non-child slot for `InsertIntoStatement.table` and + * `OverwriteByExpression.table` -- handled via explicit cases in `ResolveIdentifierClause` and + * `BindParameters`). It is never the substitution root of a `WITH ... ` subtree, so + * `CTEInChildren` semantics are not needed: any surrounding `WithCTE` produced by + * `CTESubstitution` targets the inner command directly. */ case class PlanWithUnresolvedIdentifier( identifierExpr: Expression, children: Seq[LogicalPlan], planBuilder: (Seq[String], Seq[LogicalPlan]) => LogicalPlan) - extends UnresolvedNode with NamedRelation with CTEInChildren { + extends UnresolvedNode with NamedRelation { def this(identifierExpr: Expression, planBuilder: Seq[String] => LogicalPlan) = { this(identifierExpr, Nil, (ident, _) => planBuilder(ident)) @@ -83,26 +84,6 @@ case class PlanWithUnresolvedIdentifier( override def name: String = identifierExpr.sql - override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { - // Safe to extend `CTEInChildren` because with exactly one child the placeholder is a - // transparent stand-in for its eventual command: wrapping the single child with `WithCTE` - // places the cteDefs in the same slot the materialized command's own `withCTEDefs` would. - // E.g. 
for `CacheTableAsSelect`, the single child is the AS-query and the result matches - // `CacheTableAsSelect.withCTEDefs(cteDefs) = copy(plan = WithCTE(plan, cteDefs))`. - // - // The single-child invariant matters: with 0 children the cteDefs would vanish, and with - // 2+ children they would be duplicated across slots that may not all be CTE-scope bodies. - // - // Under the current grammar this path is not actually reached -- `WITH` only precedes - // `query` or `dmlStatementNoWith`, and the parser places the placeholder inside the - // command's identifier slot rather than at the substitution root -- but the override stays - // as a safety net for future wrap-whole-command callers. - assert(children.length == 1, - "PlanWithUnresolvedIdentifier.withCTEDefs requires exactly one child so that the " + - "surrounding WithCTE lands in the materialized command's body slot.") - withNewChildren(children.map(WithCTE(_, cteDefs))) - } - override protected def withNewChildrenInternal( newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = copy(identifierExpr, newChildren, planBuilder) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index d07b1620308bf..e639049a64c94 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -6569,35 +6569,74 @@ class AstBuilder extends DataTypeAstBuilder * }}} */ override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) { - val query = Option(ctx.query).map(plan) - withIdentClause(ctx.identifierReference, query.toSeq, (ident, children) => { - if (query.isDefined && ident.length > 1) { - val catalogAndNamespace = ident.init - throw QueryParsingErrors.addCatalogInCacheTableAsSelectNotAllowedError( - catalogAndNamespace.quoted, ctx) - } - val options = 
Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) - val isLazy = ctx.LAZY != null - if (query.isDefined) { + val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) + val isLazy = ctx.LAZY != null + Option(ctx.query).map(plan) match { + case Some(query) => // Disallow parameter markers in the query of the cache. // We need this limitation because we store the original query text, pre substitution. - // To lift this we would need to reconstitute the query with parameter markers replaced with - // the values given at CACHE TABLE time, or we would need to store the parameter values - // alongside the text. - // The same rule can be found in CREATE VIEW builder. - checkInvalidParameter(query.get, "the query of CACHE TABLE") - CacheTableAsSelect(ident.head, children.head, source(ctx.query()), isLazy, options) - } else { - CacheTable( - createUnresolvedRelation( - ctx.identifierReference, - ident, - None, - writePrivileges = Set.empty, - isStreaming = false), - ident, isLazy, options) + // To lift this we would need to reconstitute the query with parameter markers replaced + // with the values given at CACHE TABLE time, or we would need to store the parameter + // values alongside the text. The same rule can be found in CREATE VIEW builder. + checkInvalidParameter(query, "the query of CACHE TABLE") + // `CacheTableAsSelect.name` is an `Expression` slot: a `Literal` for direct identifiers + // and `IDENTIFIER('literal-string')`, or an `ExpressionWithUnresolvedIdentifier` for + // `IDENTIFIER()`. Building the name as an expression avoids the + // wrap-the-whole-command form (where the `PlanWithUnresolvedIdentifier` would wrap the + // entire `CacheTableAsSelect`), which is the last shape that motivated the + // `WithCTE(, _)` workaround chain in SPARK-46625. 
+ val nameExpr = buildCacheTableAsSelectName(ctx.identifierReference, ctx) + CacheTableAsSelect(nameExpr, query, source(ctx.query()), isLazy, options) + case None => + withIdentClause(ctx.identifierReference, ident => { + CacheTable( + createUnresolvedRelation( + ctx.identifierReference, + ident, + None, + writePrivileges = Set.empty, + isStreaming = false), + ident, isLazy, options) + }) + } + } + + /** + * Build the `name` expression for a `CACHE TABLE ... AS SELECT` command from an + * `identifierReference` context. + * + * `CacheTableAsSelect` requires a single-part temp view name (no catalog/namespace). For direct + * identifiers and `IDENTIFIER('literal-string')` we validate this at parse time and produce a + * non-null string `Literal`. For `IDENTIFIER()` we emit an + * `ExpressionWithUnresolvedIdentifier` whose builder validates the single-part invariant when + * the identifier expression is resolved. + */ + private def buildCacheTableAsSelectName( + ctx: IdentifierReferenceContext, + parentCtx: CacheTableContext): Expression = { + // Use the outer `parentCtx` for the multi-part error so the query context points at the + // whole `CACHE TABLE ... AS ...` statement, not just the identifier reference. The caller + // (`visitCacheTable`) already has `withOrigin(parentCtx)` in scope. 
+ def singlePart(parts: Seq[String]): String = { + if (parts.length > 1) { + throw QueryParsingErrors.addCatalogInCacheTableAsSelectNotAllowedError( + parts.init.quoted, parentCtx) } - }) + parts.head + } + val exprCtx = ctx.expression + if (exprCtx != null) { + expression(exprCtx) match { + case Literal(value, _: StringType) if value != null => + Literal(singlePart(parseMultipartIdentifier(value.toString))) + case expr => + new ExpressionWithUnresolvedIdentifier( + withOrigin(exprCtx) { expr }, + parts => Literal(singlePart(parts))) + } + } else { + Literal(singlePart(visitMultipartIdentifier(ctx.multipartIdentifier))) + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 39118354a20ae..5dd2f10c89cbe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -1968,7 +1968,7 @@ case class CacheTable( * The logical plan of the CACHE TABLE ... AS SELECT command. */ case class CacheTableAsSelect( - tempViewName: String, + tempViewName: Expression, plan: LogicalPlan, originalText: String, isLazy: Boolean, @@ -1976,6 +1976,19 @@ case class CacheTableAsSelect( isAnalyzed: Boolean = false, referredTempFunctions: Seq[String] = Seq.empty) extends AnalysisOnlyCommand with CTEInChildren { + + /** + * Returns the temp view name string. Must only be called after analysis, when `tempViewName` + * has been resolved to a non-null string `Literal`. `CheckAnalysis` enforces this invariant. 
+ */ + def tempViewNameString: String = tempViewName match { + case Literal(value, _: StringType) if value != null => value.toString + case other => + throw SparkException.internalError( + "CacheTableAsSelect.tempViewName must be a non-null string literal after analysis, " + + s"but got: ${other.sql}") + } + override protected def withNewChildrenInternal( newChildren: IndexedSeq[LogicalPlan]): CacheTableAsSelect = { assert(!isAnalyzed) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 1ac417ddc9376..4f4aa6e24c5aa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -2761,7 +2761,7 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("CACHE TABLE t AS SELECT * FROM testData"), CacheTableAsSelect( - "t", + Literal("t"), Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("testData"))), "SELECT * FROM testData", false, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 3c58298ec9211..ed067a3f00d1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -778,7 +778,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case r: CacheTableAsSelect => CacheTableAsSelectExec( - r.tempViewName, r.plan, r.originalText, r.isLazy, r.options, r.referredTempFunctions) :: Nil + r.tempViewNameString, r.plan, r.originalText, r.isLazy, r.options, + r.referredTempFunctions) :: Nil case r: UncacheTable => def isTempView(table: 
LogicalPlan): Boolean = table match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala index 73225da207dce..ee5acf63bfb93 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala @@ -20,10 +20,10 @@ package org.apache.spark.sql import java.time.{Instant, LocalDate, LocalDateTime, ZoneId} import org.apache.spark.sql.catalyst.ExtendedAnalysisException -import org.apache.spark.sql.catalyst.analysis.{BindParameters, CTESubstitution, NameParameterizedQuery, PlanWithUnresolvedIdentifier} +import org.apache.spark.sql.catalyst.analysis.{BindParameters, CTESubstitution, ExpressionWithUnresolvedIdentifier, NameParameterizedQuery, PlanWithUnresolvedIdentifier} import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, Limit, OverwriteByExpression, ReplaceTableAsSelect, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTEInChildren, Limit, OverwriteByExpression, ReplaceTableAsSelect, WithCTE} import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER import org.apache.spark.sql.catalyst.trees.SQLQueryContext import org.apache.spark.sql.catalyst.util.CharVarcharUtils @@ -2586,6 +2586,30 @@ class ParametersSuite extends SharedSparkSession { s"Expected :tname inside OverwriteByExpression.table to be bound, got:\n$boundOverwrite") } + // SPARK-46625: `CacheTableAsSelect.tempViewName` is an `Expression` slot, so an + // `IDENTIFIER()` produces an `ExpressionWithUnresolvedIdentifier` there instead of + // wrapping the entire command in a `PlanWithUnresolvedIdentifier`. 
Verify on the parsed plan + // that the name slot holds the expression placeholder and no `WithCTE(CTEInChildren, _)` shape + // survives `CTESubstitution` (running the cache through full analysis would require the temp + // view machinery, so this is a parser-level test). + test("SPARK-46625: CACHE TABLE IDENTIFIER(...) AS WITH ... SELECT ... parser") { + val parsedPlan = spark.sessionState.sqlParser.parsePlan( + """CACHE TABLE IDENTIFIER('some' || '_view') AS + |WITH transformation AS (SELECT 4 AS a) + |SELECT * FROM transformation""".stripMargin) + val ctas = parsedPlan.collectFirst { case c: CacheTableAsSelect => c }.getOrElse( + fail(s"Expected CacheTableAsSelect in parsed plan:\n$parsedPlan")) + assert(ctas.tempViewName.isInstanceOf[ExpressionWithUnresolvedIdentifier], + s"Expected CacheTableAsSelect.tempViewName to be ExpressionWithUnresolvedIdentifier, " + + s"got ${ctas.tempViewName.getClass.getSimpleName}:\n$parsedPlan") + val substituted = CTESubstitution.apply(parsedPlan) + substituted.foreach { + case WithCTE(_: CTEInChildren, _) => + fail(s"Found invalid WithCTE(CTEInChildren, _) shape after CTESubstitution:\n$substituted") + case _ => + } + } + // SPARK-46625: RTAS mirrors CTAS -- the placeholder goes into `ReplaceTableAsSelect.name` // at parse time. Verify on the parsed plan that the placeholder lives in that slot and that // no `WithCTE(CTEInChildren, _)` shape survives `CTESubstitution`. Running RTAS through full From 6a006d5b83a7aeded3b0c0b35fbbfb7d7380022a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 18 May 2026 16:08:56 +0000 Subject: [PATCH 12/15] Use typed-guard match for InsertIntoStatement; narrow buildWriteTableSlot return type; comment fixes - ResolveIdentifierClause: replace the 9-field positional `InsertIntoStatement` pattern with the typed-guard shape already used for `V2WriteCommand`, so the rule survives field additions/reorderings on `InsertIntoStatement`. 
- AstBuilder: type `buildWriteTableSlot` to return `NamedRelation` directly (both branches already do); delete the casting wrapper `buildWriteTableSlotNamedRelation` and use `buildWriteTableSlot` at the `OverwriteByExpression` callsite. - Comments: `CacheTableAsSelect.name` -> `tempViewName` (field name correction) in `AstBuilder.visitCacheTable` and the `buildCacheTableAsSelectName` Scaladoc; fix the example SQL in `BindParameters` from `INSERT INTO REPLACE WHERE ... IDENTIFIER(:p)` to `INSERT INTO IDENTIFIER(:p) REPLACE WHERE ...`. Co-authored-by: Isaac --- .../analysis/ResolveIdentifierClause.scala | 7 ++-- .../sql/catalyst/analysis/parameters.scala | 2 +- .../sql/catalyst/parser/AstBuilder.scala | 36 +++++++------------ 3 files changed, 19 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala index 8d1a380a2cf6a..8dc82581aeef3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala @@ -76,8 +76,11 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] // placeholders inside them. Materialize them explicitly. Only `InsertIntoStatement` and // `OverwriteByExpression` carry a parse-time placeholder today, but matching the // `V2WriteCommand` trait keeps the rule consistent across the family. 
- case i @ InsertIntoStatement(p: PlanWithUnresolvedIdentifier, _, _, _, _, _, _, _, _) - if p.identifierExpr.resolved && p.childrenResolved => + case i: InsertIntoStatement if i.table.isInstanceOf[PlanWithUnresolvedIdentifier] && { + val p = i.table.asInstanceOf[PlanWithUnresolvedIdentifier] + p.identifierExpr.resolved && p.childrenResolved + } => + val p = i.table.asInstanceOf[PlanWithUnresolvedIdentifier] if (referredTempVars.isDefined) { referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala index 0ca7be0bd82b2..31c835986e20d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala @@ -182,7 +182,7 @@ object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase { // `InsertIntoStatement.table` and `V2WriteCommand.table` are non-child LogicalPlan // slots, so the standard `resolveOperatorsDown` traversal never visits parameter // markers inside them. Recurse explicitly so `INSERT ... IDENTIFIER(:p)` and - // `INSERT INTO REPLACE WHERE ... IDENTIFIER(:p)` resolve under the legacy + // `INSERT INTO IDENTIFIER(:p) REPLACE WHERE ...` resolve under the legacy // parameter-substitution mode (SPARK-46625). Today only the `OverwriteByExpression` // variant of `V2WriteCommand` is parser-built with a placeholder in `table`; the trait // match keeps the rule consistent for any future analyzer-built node in the same shape. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index e639049a64c94..6b08a9281f83a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -972,8 +972,7 @@ class AstBuilder extends DataTypeAstBuilder // `PlanWithUnresolvedIdentifier` is a `NamedRelation`, so it can occupy // `OverwriteByExpression.table` directly; the materialization happens in // `ResolveIdentifierClause` via its `OverwriteByExpression` special-case. - val table = buildWriteTableSlotNamedRelation( - ctx.identifierReference, options, privileges) + val table = buildWriteTableSlot(ctx.identifierReference, options, privileges) val deleteExpr = expression(ctx.replaceCondition) val isByName = ctx.NAME() != null if (isByName) { @@ -1167,7 +1166,9 @@ class AstBuilder extends DataTypeAstBuilder * Build the `table` slot of a write command. If the identifier reference is a constant string, * returns an [[UnresolvedRelation]] directly; otherwise returns a * [[PlanWithUnresolvedIdentifier]] that materializes into an [[UnresolvedRelation]] once the - * identifier expression is resolved. + * identifier expression is resolved. Both branches produce a [[NamedRelation]], so the result + * fits `NamedRelation`-typed slots (e.g. `OverwriteByExpression.table`) as well as the more + * general `LogicalPlan` slot of `InsertIntoStatement.table`. 
* * Placing the placeholder in the identifier slot (rather than wrapping the entire write command) * preserves the `CTEInChildren` shape at parse time, so `CTESubstitution` places `WithCTE` on the @@ -1176,21 +1177,10 @@ class AstBuilder extends DataTypeAstBuilder private def buildWriteTableSlot( ctx: IdentifierReferenceContext, optionsClause: Option[OptionsClauseContext], - writePrivileges: Set[TableWritePrivilege]): LogicalPlan = { + writePrivileges: Set[TableWritePrivilege]): NamedRelation = { withIdentClause(ctx, parts => createUnresolvedRelation(ctx, parts, optionsClause, writePrivileges, isStreaming = false)) - } - - /** - * Variant of `buildWriteTableSlot` returning a `NamedRelation`, for slots typed as - * `NamedRelation` (e.g. `OverwriteByExpression.table`). `PlanWithUnresolvedIdentifier` is a - * `NamedRelation`, so the placeholder fits the slot type directly. - */ - private def buildWriteTableSlotNamedRelation( - ctx: IdentifierReferenceContext, - optionsClause: Option[OptionsClauseContext], - writePrivileges: Set[TableWritePrivilege]): NamedRelation = { - buildWriteTableSlot(ctx, optionsClause, writePrivileges).asInstanceOf[NamedRelation] + .asInstanceOf[NamedRelation] } /** @@ -6579,12 +6569,12 @@ class AstBuilder extends DataTypeAstBuilder // with the values given at CACHE TABLE time, or we would need to store the parameter // values alongside the text. The same rule can be found in CREATE VIEW builder. checkInvalidParameter(query, "the query of CACHE TABLE") - // `CacheTableAsSelect.name` is an `Expression` slot: a `Literal` for direct identifiers - // and `IDENTIFIER('literal-string')`, or an `ExpressionWithUnresolvedIdentifier` for - // `IDENTIFIER()`. Building the name as an expression avoids the - // wrap-the-whole-command form (where the `PlanWithUnresolvedIdentifier` would wrap the - // entire `CacheTableAsSelect`), which is the last shape that motivated the - // `WithCTE(, _)` workaround chain in SPARK-46625. 
+ // `CacheTableAsSelect.tempViewName` is an `Expression` slot: a `Literal` for direct + // identifiers and `IDENTIFIER('literal-string')`, or an + // `ExpressionWithUnresolvedIdentifier` for `IDENTIFIER()`. Building the name + // as an expression avoids the wrap-the-whole-command form (where the + // `PlanWithUnresolvedIdentifier` would wrap the entire `CacheTableAsSelect`), which is the + // last shape that motivated the `WithCTE(, _)` workaround chain in SPARK-46625. val nameExpr = buildCacheTableAsSelectName(ctx.identifierReference, ctx) CacheTableAsSelect(nameExpr, query, source(ctx.query()), isLazy, options) case None => @@ -6602,7 +6592,7 @@ class AstBuilder extends DataTypeAstBuilder } /** - * Build the `name` expression for a `CACHE TABLE ... AS SELECT` command from an + * Build the `tempViewName` expression for a `CACHE TABLE ... AS SELECT` command from an * `identifierReference` context. * * `CacheTableAsSelect` requires a single-part temp view name (no catalog/namespace). For direct From 90f09a90f581742f81b4fc748738de220aff0700 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 18 May 2026 16:31:12 +0000 Subject: [PATCH 13/15] Guard CacheTableAsSelect CheckAnalysis on tempViewName.resolved If `tempViewName` is still an unresolved `ExpressionWithUnresolvedIdentifier` (e.g. `CACHE TABLE IDENTIFIER() AS SELECT ...`), the case should not fire -- otherwise it throws an internal error and pre-empts the catch-all `LogicalPlan` case from raising the proper `UNRESOLVED_COLUMN` user-facing error. 
Co-authored-by: Isaac --- .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index fd3f14df2562a..9c4fbd719a966 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -459,12 +459,15 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString messageParameters = Map("name" -> "IDENTIFIER", "expr" -> p.identifierExpr.sql) ) - case c: CacheTableAsSelect => + case c: CacheTableAsSelect if c.tempViewName.resolved => // The parser builds `tempViewName` as either a `Literal[StringType]` (for direct // identifiers and `IDENTIFIER('literal')`) or an `ExpressionWithUnresolvedIdentifier` // that resolves to such a Literal. Validate the post-analysis shape so any future // construction path that violates the invariant fails loudly here, not deep inside - // execution via `tempViewNameString`. + // execution via `tempViewNameString`. The `resolved` guard ensures that when the + // IDENTIFIER expression itself failed to resolve (e.g. `IDENTIFIER()`), + // we fall through to the catch-all `LogicalPlan` case so the user sees the proper + // `UNRESOLVED_COLUMN` error rather than an internal error. 
c.tempViewName match { case Literal(value, _: StringType) if value != null => // OK case other => From 93902693fbccfb4dd98b189063c44135088b731d Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 19 May 2026 04:01:05 +0000 Subject: [PATCH 14/15] Address review: simplify identifier-clause cases, document name, add e2e cache test - Refactor ResolveIdentifierClause: lift the PlanWithUnresolvedIdentifier cast out of a compound guard into a single asInstanceOf at the top of the case body; resolution check moves to an inner if/else with `i`/`w` returned unchanged when not yet resolved. Same shape for InsertIntoStatement and V2WriteCommand cases. - Document `PlanWithUnresolvedIdentifier.name = identifierExpr.sql` as the error-path placeholder used by SparkStrategies.extractTableNameForError and the `r: NamedRelation` fallback in QueryCompilationErrors. - Add an end-to-end ParametersSuite test for CACHE TABLE IDENTIFIER(:p) AS WITH ... SELECT ..., exercising the tempViewNameString extraction in DataSourceV2Strategy and the CheckAnalysis invariant case for CacheTableAsSelect.tempViewName. Co-authored-by: Isaac --- .../analysis/ResolveIdentifierClause.scala | 46 ++++++++++--------- .../sql/catalyst/analysis/unresolved.scala | 4 ++ .../apache/spark/sql/ParametersSuite.scala | 16 +++++++ 3 files changed, 44 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala index 8dc82581aeef3..cfa6f33588062 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala @@ -76,31 +76,33 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] // placeholders inside them. Materialize them explicitly. 
Only `InsertIntoStatement` and // `OverwriteByExpression` carry a parse-time placeholder today, but matching the // `V2WriteCommand` trait keeps the rule consistent across the family. - case i: InsertIntoStatement if i.table.isInstanceOf[PlanWithUnresolvedIdentifier] && { + case i: InsertIntoStatement if i.table.isInstanceOf[PlanWithUnresolvedIdentifier] => val p = i.table.asInstanceOf[PlanWithUnresolvedIdentifier] - p.identifierExpr.resolved && p.childrenResolved - } => - val p = i.table.asInstanceOf[PlanWithUnresolvedIdentifier] - if (referredTempVars.isDefined) { - referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p) + if (p.identifierExpr.resolved && p.childrenResolved) { + if (referredTempVars.isDefined) { + referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p) + } + i.copy(table = executor.execute(p.planBuilder.apply( + IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children))) + } else { + i } - i.copy(table = executor.execute(p.planBuilder.apply( - IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children))) - case w: V2WriteCommand if w.table.isInstanceOf[PlanWithUnresolvedIdentifier] && { - val p = w.table.asInstanceOf[PlanWithUnresolvedIdentifier] - p.identifierExpr.resolved && p.childrenResolved - } => + case w: V2WriteCommand if w.table.isInstanceOf[PlanWithUnresolvedIdentifier] => val p = w.table.asInstanceOf[PlanWithUnresolvedIdentifier] - if (referredTempVars.isDefined) { - referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p) - } - executor.execute(p.planBuilder.apply( - IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) match { - case nr: NamedRelation => w.withNewTable(nr) - case other => - throw SparkException.internalError( - "PlanWithUnresolvedIdentifier in V2WriteCommand.table must materialize " + - s"into a NamedRelation, but got: ${other.getClass.getName}") + if (p.identifierExpr.resolved && p.childrenResolved) { + if (referredTempVars.isDefined) { + 
referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p) + } + executor.execute(p.planBuilder.apply( + IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) match { + case nr: NamedRelation => w.withNewTable(nr) + case other => + throw SparkException.internalError( + "PlanWithUnresolvedIdentifier in V2WriteCommand.table must materialize " + + s"into a NamedRelation, but got: ${other.getClass.getName}") + } + } else { + w } case other => other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index c32cf76f5f77d..a5b467d0f0816 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -82,6 +82,10 @@ case class PlanWithUnresolvedIdentifier( final override val nodePatterns: Seq[TreePattern] = Seq(PLAN_WITH_UNRESOLVED_IDENTIFIER) + // Placeholder name used by error paths that render `NamedRelation.name` for an unresolved + // table reference -- e.g. `SparkStrategies.extractTableNameForError` and the `r: NamedRelation` + // fallback in `QueryCompilationErrors`. Renders as the SQL text of the identifier expression + // (e.g. `IDENTIFIER(:p)` or `concat('a', 'b')`) so error messages remain informative. 
override def name: String = identifierExpr.sql override protected def withNewChildrenInternal( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala index ee5acf63bfb93..2c0598bf8f72b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala @@ -2610,6 +2610,22 @@ class ParametersSuite extends SharedSparkSession { } } + // SPARK-46625: End-to-end CACHE TABLE IDENTIFIER(:p) AS WITH ... SELECT ... -- exercises the + // `tempViewNameString` extraction in `DataSourceV2Strategy` and the `CheckAnalysis` invariant + // case for `CacheTableAsSelect.tempViewName`. The parser-level test above already verifies + // the placement and CTE shape; this one drives the full analysis + execution path. + test("SPARK-46625: CACHE TABLE IDENTIFIER(:p) AS WITH ... SELECT ...") { + withTempView("t_cte_cache") { + val df = spark.sql( + """CACHE TABLE IDENTIFIER(:tname) AS + |WITH transformation AS (SELECT 21 AS a) + |SELECT * FROM transformation""".stripMargin, + Map("tname" -> "t_cte_cache")) + assertNoWithCTEAroundCTEInChildren(df) + checkAnswer(spark.table("t_cte_cache"), Row(21)) + } + } + // SPARK-46625: RTAS mirrors CTAS -- the placeholder goes into `ReplaceTableAsSelect.name` // at parse time. Verify on the parsed plan that the placeholder lives in that slot and that // no `WithCTE(CTEInChildren, _)` shape survives `CTESubstitution`. Running RTAS through full From bfd6789ba944f49cd6e356caed8368a808441029 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 19 May 2026 11:57:39 +0000 Subject: [PATCH 15/15] Pin tempViewName.resolved guard with a regression test; fix import order Add a test that asserts CACHE TABLE IDENTIFIER(unresolved_col) AS SELECT ... produces UNRESOLVED_COLUMN and NOT the internal-error message thrown by the CheckAnalysis invariant case.
A future change that drops the `if c.tempViewName.resolved` guard would silently swap the user-facing error for SparkException.internalError; this test catches that regression. Also fixes scalastyle import-order violation introduced earlier. Co-authored-by: Isaac --- .../org/apache/spark/sql/ParametersSuite.scala | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala index 2c0598bf8f72b..575fcc058169e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala @@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.analysis.{BindParameters, CTESubstitution, import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTEInChildren, Limit, OverwriteByExpression, ReplaceTableAsSelect, WithCTE} -import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER import org.apache.spark.sql.catalyst.trees.SQLQueryContext +import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.functions.{array, call_function, lit, map, map_from_arrays, map_from_entries, str_to_map, struct} import org.apache.spark.sql.internal.SQLConf @@ -2610,6 +2610,21 @@ class ParametersSuite extends SharedSparkSession { } } + // SPARK-46625: Regression for the `if c.tempViewName.resolved` guard in CheckAnalysis. When + // the IDENTIFIER expression itself fails to resolve (e.g. references an unresolved column), + // the guard skips the invariant-validation case so the catch-all `LogicalPlan` case can + // produce `UNRESOLVED_COLUMN`. 
Without the guard, the invariant case would pre-empt this + // path and throw a `SparkException internal error` instead. + test("SPARK-46625: CACHE TABLE IDENTIFIER() reports UNRESOLVED_COLUMN") { + val ex = intercept[AnalysisException] { + spark.sql("CACHE TABLE IDENTIFIER(unresolved_col) AS SELECT 1 AS a") + } + assert(ex.getCondition != null && ex.getCondition.startsWith("UNRESOLVED_COLUMN"), + s"Expected UNRESOLVED_COLUMN.*, got ${ex.getCondition}: ${ex.getMessage}") + assert(!ex.getMessage.contains("CacheTableAsSelect.tempViewName must be"), + s"Internal-error message leaked into user-facing error: ${ex.getMessage}") + } + // SPARK-46625: End-to-end CACHE TABLE IDENTIFIER(:p) AS WITH ... SELECT ... -- exercises the // `tempViewNameString` extraction in `DataSourceV2Strategy` and the `CheckAnalysis` invariant // case for `CacheTableAsSelect.tempViewName`. The parser-level test above already verifies