Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package org.apache.spark.sql.catalyst.analysis

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsRowLevelOperations, TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
Expand All @@ -42,15 +45,19 @@ object ResolveMergeIntoSchemaEvolution extends Rule[LogicalPlan] {
if (changes.isEmpty) {
m
} else {
m transformUpWithNewOutput {
case r @ DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _, _) =>
val finalAttrMapping = ArrayBuffer.empty[(Attribute, Attribute)]
Copy link
Member Author

@szehon-ho szehon-ho Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a bug here, as it actually hits both the sourceTable and targetTable and tries schema evolution on both, when actually schema evolution should always be performed only for target table.

I had done it this way because of the limitation of transformUpWithNewOutput that it doesn't re-map the attribues of the top level object (MergeIntoTable). See #52866 (comment) for my finding. So I transformed all children of MergeIntoTable and assumed that the match with SupportsRowLevelOperation Table would be enough to only do schema evolution on the targetTable, but I was wrong.

So the fix is to explicitly transform on the MergeIntoTable's targetTable, and I add an extra rewriteAttrs to rewrite the top level MergeIntoTable object.

val newTarget = m.targetTable.transform {
case r: DataSourceV2Relation =>
val referencedSourceSchema = MergeIntoTable.sourceSchemaForSchemaEvolution(m)
val newTarget = performSchemaEvolution(r, referencedSourceSchema, changes)
val oldTargetOutput = m.targetTable.output
val newTargetOutput = newTarget.output
val attributeMapping = oldTargetOutput.zip(newTargetOutput)
newTarget -> attributeMapping
finalAttrMapping ++= attributeMapping
newTarget
}
val res = m.copy(targetTable = newTarget)
res.rewriteAttrs(AttributeMap(finalAttrMapping.toSeq))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -916,19 +916,29 @@ case class MergeIntoTable(
false
} else {
val actions = matchedActions ++ notMatchedActions
val assignments = actions.collect {
case a: UpdateAction => a.assignments
case a: InsertAction => a.assignments
}.flatten
val sourcePaths = DataTypeUtils.extractAllFieldPaths(sourceTable.schema)
assignments.forall { assignment =>
assignment.resolved ||
(assignment.value.resolved && sourcePaths.exists {
path => MergeIntoTable.isEqual(assignment, path)
})
val hasStarActions = actions.exists {
Copy link
Member Author

@szehon-ho szehon-ho Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to canEvaluateSchemaEvolution (that guards whether the schema evolution check gets evaluated) gets called with updateStar and insertStar. This is triggered in the Dataframe API, and revealed I had missed this case. So here I return false to explicitly skip until they are resolved. Else this hits an analysis error later. This was not triggered in SQL case where the stars got resolved earlier.

case _: UpdateStarAction => true
case _: InsertStarAction => true
case _ => false
}
if (hasStarActions) {
// need to resolve star actions first
false
} else {
val assignments = actions.collect {
case a: UpdateAction => a.assignments
case a: InsertAction => a.assignments
}.flatten
val sourcePaths = DataTypeUtils.extractAllFieldPaths(sourceTable.schema)
assignments.forall { assignment =>
assignment.resolved ||
(assignment.value.resolved && sourcePaths.exists {
path => MergeIntoTable.isEqual(assignment, path)
})
}
}
}
}

private lazy val sourceSchemaForEvolution: StructType =
MergeIntoTable.sourceSchemaForSchemaEvolution(this)
Expand Down
Loading