Fix inconsistent field metadata in MERGE
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

Fixes an issue where internal metadata attached to fields leaks into the plan constructed when executing a MERGE command, causing the same attribute to appear both with and without the internal metadata.

This can cause plan validation to fail, since the same attribute then appears to have two different data types (field metadata is part of a struct field's data type, so schemas that differ only in metadata compare as unequal).
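In Spark, metadata is part of StructField and therefore participates in schema equality, which is why the duplicated attribute trips up validation. A minimal sketch illustrating this (plain Spark, independent of Delta):

import org.apache.spark.sql.types._

// Two fields with the same name and type, differing only in metadata.
val plain = StructField("key", IntegerType)
val withMeta = StructField("key", IntegerType,
  metadata = new MetadataBuilder().putString("comment", "data column").build())

// StructField is a case class, so equality takes metadata into account:
assert(plain != withMeta)
assert(StructType(Seq(plain)) != StructType(Seq(withMeta)))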

Added a test that would fail without the fix.

Closes #2612

GitOrigin-RevId: 50688200c53f9450512b76ee2d375b2e55db8216
johanl-db authored and vkorukanti committed Feb 14, 2024
1 parent cc1660b commit 8db9617
Showing 4 changed files with 46 additions and 6 deletions.
@@ -291,7 +291,7 @@ trait MergeIntoCommandBase extends LeafRunnableCommand
       fileIndex: TahoeFileIndex,
       columnsToDrop: Seq[String]): LogicalPlan = {
 
-    val targetOutputCols = getTargetOutputCols(deltaTxn)
+    val targetOutputCols = getTargetOutputCols(spark, deltaTxn)
 
     val plan = {
@@ -339,8 +339,16 @@ trait MergeIntoCommandBase extends LeafRunnableCommand
    * this transaction, since new columns will have a value of null for all existing rows.
    */
   protected def getTargetOutputCols(
-      txn: OptimisticTransaction, makeNullable: Boolean = false): Seq[NamedExpression] = {
-    txn.metadata.schema.map { col =>
+      spark: SparkSession,
+      txn: OptimisticTransaction,
+      makeNullable: Boolean = false)
+    : Seq[NamedExpression] = {
+    // Internal metadata attached to the table schema must not leak into the target plan to
+    // prevent inconsistencies - e.g. field metadata matters when comparing the data types of
+    // structs with nested fields.
+    val schema = DeltaColumnMapping.dropColumnMappingMetadata(
+      DeltaTableUtils.removeInternalMetadata(spark, txn.metadata.schema))
+    schema.map { col =>
       targetOutputAttributesMap
         .get(col.name)
         .map { a =>
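For intuition, the effect of the two helpers above (dropping column-mapping metadata and other internal metadata from every field) can be approximated by a generic sketch; stripFieldMetadata below is a hypothetical stand-in, not the Delta API, and only recurses into nested structs:

import org.apache.spark.sql.types._

// Hypothetical helper: drop all per-field metadata, recursing into nested
// structs so that deeply nested fields are cleaned as well.
def stripFieldMetadata(schema: StructType): StructType =
  StructType(schema.fields.map { field =>
    val cleanedType = field.dataType match {
      case s: StructType => stripFieldMetadata(s)
      case other => other
    }
    field.copy(dataType = cleanedType, metadata = Metadata.empty)
  })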
@@ -388,7 +388,7 @@ trait ClassicMergeExecutor extends MergeOutputGeneration
 
     // The target output columns need to be marked as nullable here, as they are going to be used
     // to reference the output of an outer join.
-    val targetOutputCols = getTargetOutputCols(deltaTxn, makeNullable = true)
+    val targetOutputCols = getTargetOutputCols(spark, deltaTxn, makeNullable = true)
 
     // If there are N columns in the target table, the full outer join output will have:
     // - N columns for target table
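The nullability requirement mentioned in the comment is easy to reproduce with a plain full outer join (a self-contained sketch, not Delta-specific):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val source = Seq((1, "a")).toDF("key", "srcValue")
val target = Seq((2, "b")).toDF("key", "tgtValue")

// Rows that only exist on one side get nulls for the other side's columns,
// so attributes referencing the join output must be nullable.
source.join(target, Seq("key"), "full_outer").show()
// key = 1 -> tgtValue is null; key = 2 -> srcValue is null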
@@ -96,7 +96,7 @@ trait InsertOnlyMergeExecutor extends MergeOutputGeneration
       sourceDF
     }
 
-    val outputDF = generateInsertsOnlyOutputDF(preparedSourceDF, deltaTxn)
+    val outputDF = generateInsertsOnlyOutputDF(spark, preparedSourceDF, deltaTxn)
     logDebug(s"$extraOpType: output plan:\n" + outputDF.queryExecution)
 
     val newFiles = writeFiles(spark, deltaTxn, outputDF)
@@ -142,10 +142,11 @@ trait InsertOnlyMergeExecutor extends MergeOutputGeneration
    * and when there are multiple insert clauses.
    */
   private def generateInsertsOnlyOutputDF(
+      spark: SparkSession,
       preparedSourceDF: DataFrame,
       deltaTxn: OptimisticTransaction): DataFrame = {
 
-    val targetOutputColNames = getTargetOutputCols(deltaTxn).map(_.name)
+    val targetOutputColNames = getTargetOutputCols(spark, deltaTxn).map(_.name)
 
     // When there is only one insert clause, there is no need for ROW_DROPPED_COL and
     // output df can be generated without CaseWhen.
@@ -3154,6 +3154,37 @@ abstract class MergeIntoSuiteBase
     expectedErrorMsgForDataSetTempView = null
   )
 
+  test("merge correctly handles field metadata") {
+    withTable("source", "target") {
+      // Create a target table with user metadata (comments) and internal metadata (column
+      // mapping information) on both a top-level column and a nested field.
+      sql(
+        """
+          |CREATE TABLE target(
+          |  key int not null COMMENT 'data column',
+          |  value int not null,
+          |  cstruct struct<foo int COMMENT 'foo field'>)
+          |USING DELTA
+          |TBLPROPERTIES (
+          |  'delta.minReaderVersion' = '2',
+          |  'delta.minWriterVersion' = '5',
+          |  'delta.columnMapping.mode' = 'name')
+        """.stripMargin
+      )
+      sql(s"INSERT INTO target VALUES (0, 0, null)")
+
+      sql("CREATE TABLE source (key int not null, value int not null) USING DELTA")
+      sql(s"INSERT INTO source VALUES (1, 1)")
+
+      executeMerge(
+        tgt = "target",
+        src = "source",
+        cond = "source.key = target.key",
+        update(condition = "target.key = 1", set = "target.value = 42"),
+        updateNotMatched(condition = "target.key = 100", set = "target.value = 22"))
+    }
+  }
+
test("UDT Data Types - simple and nested") {
withTable("source") {
withTable("target") {
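For reference, the executeMerge helper call in the new test corresponds roughly to the following statement (a sketch assuming the helper's usual translation, where updateNotMatched maps to a WHEN NOT MATCHED BY SOURCE clause):

sql(
  """
    |MERGE INTO target
    |USING source
    |ON source.key = target.key
    |WHEN MATCHED AND target.key = 1 THEN
    |  UPDATE SET target.value = 42
    |WHEN NOT MATCHED BY SOURCE AND target.key = 100 THEN
    |  UPDATE SET target.value = 22
  """.stripMargin)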
