Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ trait PaimonMergeIntoResolverBase extends ExpressionHelper {
}
val resolvedAssignments =
resolveAssignments(resolve, assignments, merge, SOURCE_ONLY)
UpdateAction(resolvedCond, resolvedAssignments)
// Tag so merge-schema can distinguish `UPDATE *` from a fully-listed explicit clause.
PaimonMergeActionTags.markFromStar(UpdateAction(resolvedCond, resolvedAssignments))
case action =>
throw new RuntimeException(s"Can't recognize this action: $action")
}
Expand All @@ -97,7 +98,7 @@ trait PaimonMergeIntoResolverBase extends ExpressionHelper {
}
val resolvedAssignments =
resolveAssignments(resolve, assignments, merge, SOURCE_ONLY)
InsertAction(resolvedCond, resolvedAssignments)
PaimonMergeActionTags.markFromStar(InsertAction(resolvedCond, resolvedAssignments))
case action =>
throw new RuntimeException(s"Can't recognize this action: $action")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, SubstituteUnresolvedOrdinals}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias, UnresolvedWith}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias, UnresolvedWith, UpdateAction}
import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Keep
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.SparkFormatTable
import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, PartitionSpec}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataTypes, StructType, VariantType}
Expand Down Expand Up @@ -138,6 +139,25 @@ class Spark4Shim extends SparkShim {
withSchemaEvolution)
}

override def copyDataSourceV2Relation(
relation: DataSourceV2Relation,
table: Table,
output: Seq[AttributeReference]): DataSourceV2Relation = {
relation.copy(table = table, output = output)
}

override def copyUpdateAction(
action: UpdateAction,
assignments: Seq[Assignment]): UpdateAction = {
action.copy(assignments = assignments)
}

override def copyInsertAction(
action: InsertAction,
assignments: Seq[Assignment]): InsertAction = {
action.copy(assignments = assignments)
}

// Spark 4.0 still has `SubstituteUnresolvedOrdinals` (Spark 4.1 removed it because the new
// resolver framework handles ordinals inline). `PaimonViewResolver` applies the shim's early
// rules to the parsed view text before storing, so we must substitute `ORDER BY 1` →
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ package org.apache.paimon.spark.catalyst.analysis
import org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{PaimonUtils, SparkSession}
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateNamedStruct, Expression, GetStructField, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.{Alias, ArrayTransform, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, LambdaFunction, Literal, MapFromArrays, MapKeys, MapValues, NamedExpression, NamedLambdaVariable}
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, InsertAction, InsertStarAction, MergeAction, MergeIntoTable, UpdateAction, UpdateStarAction}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.paimon.shims.SparkShimLoader
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}

trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {

Expand Down Expand Up @@ -67,26 +68,23 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
}
}

/**
* Align assignments in a MergeAction based on the target table's output attributes.
* - DeleteAction: returns as-is
* - UpdateAction: aligns assignments for update
* - InsertAction: aligns assignments for insert
*/
/** Align a MergeAction's assignments to target output. Star actions must already be expanded. */
protected def alignMergeAction(action: MergeAction, targetOutput: Seq[Attribute]): MergeAction = {
action match {
// `copyXxxAction` rebuilds the node and drops tags; re-carry FROM_STAR so merge-schema works.
val aligned = action match {
case d @ DeleteAction(_) => d
case u @ PaimonUpdateAction(_, assignments) =>
u.copy(assignments = alignAssignments(targetOutput, assignments))
SparkShimLoader.shim.copyUpdateAction(u, alignAssignments(targetOutput, assignments))
case i @ InsertAction(_, assignments) =>
i.copy(assignments = alignAssignments(targetOutput, assignments, isInsert = true))
case _: UpdateStarAction =>
throw new RuntimeException("UpdateStarAction should not be here.")
case _: InsertStarAction =>
throw new RuntimeException("InsertStarAction should not be here.")
SparkShimLoader.shim.copyInsertAction(
i,
alignAssignments(targetOutput, assignments, isInsert = true))
case _: UpdateStarAction | _: InsertStarAction =>
throw new RuntimeException(s"Star action should already be expanded: $action")
case _ =>
throw new RuntimeException(s"Can't recognize this action: $action")
}
PaimonMergeActionTags.carryFromStar(action, aligned)
}

private def recursiveAlignUpdates(
Expand All @@ -112,7 +110,7 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
if (exactMatchedUpdate.isDefined) {
if (headMatchedUpdates.size == 1) {
// when an exact match (no nested fields) occurs, it must be the only match, then return it's expr
castIfNeeded(exactMatchedUpdate.get.expr, targetAttr.dataType)
resolveByNameAndCast(exactMatchedUpdate.get.expr, targetAttr.dataType)
} else {
// otherwise, there must be conflicting updates, for example:
// - update the same attr multiple times
Expand Down Expand Up @@ -177,4 +175,76 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
}
}

/**
* Resolve an assignment value expression by-name against the target type, then cast if needed.
* Recursively reorders nested type fields (Struct, Array, Map and any combination) by name to
* match target field order before casting. This is consistent with Spark's native MERGE INTO
* behavior (see TableOutputResolver.resolveUpdate).
*/
private def resolveByNameAndCast(expression: Expression, targetType: DataType): Expression = {
if (PaimonUtils.sameType(expression.dataType, targetType)) {
// Types already structurally identical — no reordering needed.
// This guarantees idempotence when the rule is applied multiple times.
castIfNeeded(expression, targetType)
} else {
val reordered = reorderFieldsByName(expression, expression.dataType, targetType)
castIfNeeded(reordered, targetType)
}
}

/**
* Recursively reorder nested type fields by name to match target type's field order. Supports
* StructType, ArrayType and MapType in any nesting combination. Returns the original expression
* if no reordering is needed.
*/
private def reorderFieldsByName(
expression: Expression,
sourceType: DataType,
targetType: DataType): Expression = {
(sourceType, targetType) match {
case (s: StructType, t: StructType) if s != t =>
reorderStructByName(expression, s, t)
case (ArrayType(sElem, sNull), ArrayType(tElem, _)) if sElem != tElem =>
val elementVar = NamedLambdaVariable("element", sElem, sNull)
val reordered = reorderFieldsByName(elementVar, sElem, tElem)
ArrayTransform(expression, LambdaFunction(reordered, Seq(elementVar)))
case (MapType(sKey, sVal, sValNull), MapType(tKey, tVal, _))
if sKey != tKey || sVal != tVal =>
val keyVar = NamedLambdaVariable("key", sKey, nullable = false)
val valVar = NamedLambdaVariable("value", sVal, sValNull)
val reorderedKey = reorderFieldsByName(keyVar, sKey, tKey)
val reorderedVal = reorderFieldsByName(valVar, sVal, tVal)
val newKeys = ArrayTransform(MapKeys(expression), LambdaFunction(reorderedKey, Seq(keyVar)))
val newVals =
ArrayTransform(MapValues(expression), LambdaFunction(reorderedVal, Seq(valVar)))
MapFromArrays(newKeys, newVals)
case _ =>
expression
}
}

/** Reorder source struct fields to match target field order by name, recursing into nested types. */
private def reorderStructByName(
Comment thread
Zouxxyy marked this conversation as resolved.
expression: Expression,
sourceStruct: StructType,
targetStruct: StructType): Expression = {
val reorderedFields = targetStruct.map {
targetField =>
sourceStruct.fields.zipWithIndex.find(_._1.name == targetField.name) match {
case Some((sourceField, sourceIdx)) =>
val fieldExpr = GetStructField(expression, sourceIdx, Some(sourceField.name))
val reordered =
reorderFieldsByName(fieldExpr, sourceField.dataType, targetField.dataType)
Alias(reordered, targetField.name)()
case None =>
Alias(Literal(null, targetField.dataType), targetField.name)()
}
}
val struct = CreateNamedStruct(reorderedFields.flatMap(a => Seq(Literal(a.name), a.child)))
if (expression.nullable) {
If(IsNull(expression), Literal(null, struct.dataType), struct)
} else {
struct
}
}
}
Loading