diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
index 1969239f84..f493946d7e 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
@@ -32,10 +32,9 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.spark.SparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, Literal, PredicateHelper, UnsafeProjection}
-import org.apache.spark.sql.catalyst.expressions.BasePredicate
-import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BasePredicate, Expression, Literal, NamedExpression, PredicateHelper, UnsafeProjection}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -553,7 +552,32 @@ case class MergeIntoCommand(
   ): Seq[FileAction] = recordMergeOperation(sqlMetricName = "rewriteTimeMs") {
     import org.apache.spark.sql.catalyst.expressions.Literal.{TrueLiteral, FalseLiteral}

-    val targetOutputCols = getTargetOutputCols(deltaTxn)
+    val cdcEnabled = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata)
+
+    var targetOutputCols = getTargetOutputCols(deltaTxn)
+    var outputRowSchema = deltaTxn.metadata.schema
+
+    // When we have duplicate matches (only allowed when the matched clause is a delete with no
+    // condition) we will incorrectly generate duplicate CDC rows.
+    // Duplicate matches can be due to:
+    // - Duplicate rows in the source w.r.t. the merge condition
+    // - A target-only or source-only merge condition, which essentially turns our join into a
+    //   cross join with the target/source satisfying the merge condition.
+    // These duplicate matches are dropped from the main data output since this is a delete
+    // operation, but the duplicate CDC rows are not removed by default.
+    // See https://github.com/delta-io/delta/issues/1274
+
+    // We address this specific scenario by adding row ids to the target before performing our
+    // join. There should only be one CDC delete row per target row, so we can use these row ids
+    // to dedupe the duplicate CDC delete rows.
+
+    // We also need to address the scenario when there are duplicate matches with delete and we
+    // insert duplicate rows. Here we need to additionally add row ids to the source before the
+    // join to avoid dropping these valid duplicate inserted rows and their corresponding CDC rows.
+
+    // When there is an insert clause, we set SOURCE_ROW_ID_COL=null for all delete rows because
+    // we need to drop the duplicate matches.
+    val isDeleteWithDuplicateMatchesAndCdc = multipleMatchDeleteOnlyOvercount.nonEmpty && cdcEnabled

     // Generate a new logical plan that has same output attributes exprIds as the target plan.
     // This allows us to apply the existing resolved update/insert expressions.
@@ -583,16 +607,21 @@ case class MergeIntoCommand(
     // with value `true`, one to each side of the join. Whether this field is null or not after
     // the outer join, will allow us to identify whether the resultant joined row was a
     // matched inner result or an unmatched result with null on one side.
-    val joinedDF = {
-      val sourceDF = Dataset.ofRows(spark, source)
-        .withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr))
-      val targetDF = Dataset.ofRows(spark, newTarget)
-        .withColumn(TARGET_ROW_PRESENT_COL, lit(true))
-      sourceDF.join(targetDF, new Column(condition), joinType)
+    // We add row IDs to the targetDF if we have a delete-when-matched clause with duplicate
+    // matches and CDC is enabled, and additionally add row IDs to the source if we also have an
+    // insert clause. See above at the isDeleteWithDuplicateMatchesAndCdc definition for details.
+    var sourceDF = Dataset.ofRows(spark, source)
+      .withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr))
+    var targetDF = Dataset.ofRows(spark, newTarget)
+      .withColumn(TARGET_ROW_PRESENT_COL, lit(true))
+    if (isDeleteWithDuplicateMatchesAndCdc) {
+      targetDF = targetDF.withColumn(TARGET_ROW_ID_COL, monotonically_increasing_id())
+      if (notMatchedClauses.nonEmpty) { // insert clause
+        sourceDF = sourceDF.withColumn(SOURCE_ROW_ID_COL, monotonically_increasing_id())
+      }
     }
-
+    val joinedDF = sourceDF.join(targetDF, new Column(condition), joinType)
     val joinedPlan = joinedDF.queryExecution.analyzed
-    val cdcEnabled = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata)

     def resolveOnJoinedPlan(exprs: Seq[Expression]): Seq[Expression] = {
       tryResolveReferencesForExpressions(spark, exprs, joinedPlan)
@@ -619,6 +648,31 @@ case class MergeIntoCommand(
     // Depending on the clause and whether CDC is enabled, we output between 0 and 3 rows, as a
     // Seq[Seq[Expression]]

+    // There is one corner case outlined above at the isDeleteWithDuplicateMatchesAndCdc
+    // definition. When we have a delete-ONLY merge with duplicate matches we have N + 4 columns:
+    // N target cols, TARGET_ROW_ID_COL, ROW_DROPPED_COL, INCR_ROW_COUNT_COL, CDC_TYPE_COLUMN_NAME
+    // When we have a delete-when-matched merge with duplicate matches + an insert clause, we have
+    // N + 5 columns:
+    // N target cols, TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL, ROW_DROPPED_COL, INCR_ROW_COUNT_COL,
+    // CDC_TYPE_COLUMN_NAME
+    // These ROW_ID_COL columns will always be dropped before the final write.
+
+    if (isDeleteWithDuplicateMatchesAndCdc) {
+      targetOutputCols = targetOutputCols :+ UnresolvedAttribute(TARGET_ROW_ID_COL)
+      outputRowSchema = outputRowSchema.add(TARGET_ROW_ID_COL, DataTypes.LongType)
+      if (notMatchedClauses.nonEmpty) { // there is an insert clause, make SRC_ROW_ID_COL=null
+        targetOutputCols = targetOutputCols :+ Alias(Literal(null), SOURCE_ROW_ID_COL)()
+        outputRowSchema = outputRowSchema.add(SOURCE_ROW_ID_COL, DataTypes.LongType)
+      }
+    }
+
+    if (cdcEnabled) {
+      outputRowSchema = outputRowSchema
+        .add(ROW_DROPPED_COL, DataTypes.BooleanType)
+        .add(INCR_ROW_COUNT_COL, DataTypes.BooleanType)
+        .add(CDC_TYPE_COLUMN_NAME, DataTypes.StringType)
+    }
+
     def matchedClauseOutput(clause: DeltaMergeIntoMatchedClause): Seq[Seq[Expression]] = {
       val exprs = clause match {
         case u: DeltaMergeIntoUpdateClause =>
@@ -663,9 +717,20 @@ case class MergeIntoCommand(
     def notMatchedClauseOutput(clause: DeltaMergeIntoInsertClause): Seq[Seq[Expression]] = {
       // Generate insert expressions and set ROW_DELETED_COL = false and
       // CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC
+      val insertExprs = clause.resolvedActions.map(_.expr)
       val mainDataOutput = resolveOnJoinedPlan(
-        clause.resolvedActions.map(_.expr) :+ FalseLiteral :+ incrInsertedCountExpr :+
-          Literal(CDC_TYPE_NOT_CDC))
+        if (isDeleteWithDuplicateMatchesAndCdc) {
+          // Must be a delete-when-matched merge with duplicate matches + an insert clause.
+          // Therefore we must keep the target row id and source row id. Since this is a
+          // not-matched clause we know the target row id will be null. See above at the
+          // isDeleteWithDuplicateMatchesAndCdc definition for more details.
+          insertExprs :+
+            Alias(Literal(null), TARGET_ROW_ID_COL)() :+ UnresolvedAttribute(SOURCE_ROW_ID_COL) :+
+            FalseLiteral :+ incrInsertedCountExpr :+ Literal(CDC_TYPE_NOT_CDC)
+        } else {
+          insertExprs :+ FalseLiteral :+ incrInsertedCountExpr :+ Literal(CDC_TYPE_NOT_CDC)
+        }
+      )
       if (cdcEnabled) {
         // For insert we have the same expressions as for mainDataOutput, but with
         // INCR_ROW_COUNT_COL as a no-op (because the metric will be incremented in
@@ -683,15 +748,6 @@ case class MergeIntoCommand(
       resolveOnJoinedPlan(Seq(condExpr)).head
     }

-    val outputRowSchema = if (!cdcEnabled) {
-      deltaTxn.metadata.schema
-    } else {
-      deltaTxn.metadata.schema
-        .add(ROW_DROPPED_COL, DataTypes.BooleanType)
-        .add(INCR_ROW_COUNT_COL, DataTypes.BooleanType)
-        .add(CDC_TYPE_COLUMN_NAME, DataTypes.StringType)
-    }
-
     val joinedRowEncoder = RowEncoder(joinedPlan.schema)
     val outputRowEncoder = RowEncoder(outputRowSchema).resolveAndBind()

@@ -712,9 +768,30 @@ case class MergeIntoCommand(
       joinedRowEncoder = joinedRowEncoder,
       outputRowEncoder = outputRowEncoder)

-    val outputDF =
+    var outputDF =
       Dataset.ofRows(spark, joinedPlan).mapPartitions(processor.processPartition)(outputRowEncoder)
-        .drop(ROW_DROPPED_COL, INCR_ROW_COUNT_COL)
+
+    if (isDeleteWithDuplicateMatchesAndCdc) {
+      // When we have a delete-when-matched clause with duplicate matches we have to remove
+      // duplicate CDC rows. This scenario is further explained at the
+      // isDeleteWithDuplicateMatchesAndCdc definition.
+
+      // To remove duplicate CDC rows generated by the duplicate matches we dedupe by
+      // TARGET_ROW_ID_COL since there should only be one CDC delete row per target row.
+      // When there is an insert clause in addition to the delete clause we additionally dedupe by
+      // SOURCE_ROW_ID_COL and CDC_TYPE_COLUMN_NAME to avoid dropping valid duplicate inserted rows
+      // and their corresponding CDC rows.
+      val columnsToDedupeBy = if (notMatchedClauses.nonEmpty) { // insert clause
+        Seq(TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL, CDC_TYPE_COLUMN_NAME)
+      } else {
+        Seq(TARGET_ROW_ID_COL)
+      }
+      outputDF = outputDF
+        .dropDuplicates(columnsToDedupeBy)
+        .drop(ROW_DROPPED_COL, INCR_ROW_COUNT_COL, TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL)
+    } else {
+      outputDF = outputDF.drop(ROW_DROPPED_COL, INCR_ROW_COUNT_COL)
+    }

     logDebug("writeAllChanges: join output plan:\n" + outputDF.queryExecution)

@@ -873,6 +950,8 @@ object MergeIntoCommand {
   val TOUCHED_FILES_ACCUM_NAME = "internal.metrics.MergeIntoDelta.touchedFiles"

   val ROW_ID_COL = "_row_id_"
+  val TARGET_ROW_ID_COL = "_target_row_id_"
+  val SOURCE_ROW_ID_COL = "_source_row_id_"
   val FILE_NAME_COL = "_file_name_"
   val SOURCE_ROW_PRESENT_COL = "_source_row_present_"
   val TARGET_ROW_PRESENT_COL = "_target_row_present_"
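The row-id dedup trick used above is easiest to see in isolation. The following is a minimal, standalone Spark sketch (not part of the patch): the target is tagged with `monotonically_increasing_id()` before the join, so duplicate matches can later be collapsed back to one CDC delete row per target row. The column names and toy data merely mirror `TARGET_ROW_ID_COL` and the CDC change-type column for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, monotonically_increasing_id}

object CdcDedupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("cdc-dedup-sketch").getOrCreate()
    import spark.implicits._

    // Target rows, tagged with a per-row id before the join (mirrors TARGET_ROW_ID_COL).
    val target = Seq(0, 1).toDF("value")
      .withColumn("_target_row_id_", monotonically_increasing_id())
    // Source with duplicate matches w.r.t. the merge condition: value = 1 appears twice.
    val source = Seq(1, 1).toDF("value")

    // The inner join stands in for the matched side of the merge join; target row 1 matches
    // twice, so a naive CDC output would emit two delete rows for it.
    val cdcDeletes = target.join(source, "value").withColumn("_change_type_", lit("delete"))

    // Deduplicating by the target row id keeps exactly one CDC delete row per target row,
    // and the helper column is dropped before anything is written out.
    val deduped = cdcDeletes.dropDuplicates("_target_row_id_").drop("_target_row_id_")
    deduped.show() // a single "delete" row for value = 1

    spark.stop()
  }
}
```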
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoMetricsBase.scala b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoMetricsBase.scala
index 7e5f855206..9cf43198ba 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoMetricsBase.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoMetricsBase.scala
@@ -201,13 +201,20 @@ trait MergeIntoMetricsBase
    * @param mergeCmdFn The function that actually runs the merge command.
    * @param expectedOpMetrics A map with values for expected operation metrics.
    * @param testConfig The configuration options for this test
+   * @param overrideExpectedOpMetrics Sequence of expected operation metric values that override
+   *                                  those provided in expectedOpMetrics for specific
+   *                                  configurations of partitioned and cdfEnabled. Elements are
+   *                                  provided as:
+   *                                  ((partitioned, cdfEnabled), (metric_name, metric_value))
    */
   private def runMergeCmdAndTestMetrics(
       targetDf: DataFrame,
       sourceDf: DataFrame,
       mergeCmdFn: MergeCmd,
       expectedOpMetrics: Map[String, Int],
-      testConfig: MergeTestConfiguration): Unit = {
+      testConfig: MergeTestConfiguration,
+      overrideExpectedOpMetrics: Seq[((Boolean, Boolean), (String, Int))] = Seq.empty
+  ): Unit = {
     withSQLConf(
       DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true",
       DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> testConfig.cdfEnabled.toString
@@ -245,9 +252,16 @@ trait MergeIntoMetricsBase

       // Get the default row operation metrics and override them with the provided ones.
       val metricsWithDefaultZeroValue = mergeRowMetrics.map(_ -> "0").toMap
-      val expectedOpMetricsWithDefaults = metricsWithDefaultZeroValue ++
+      var expectedOpMetricsWithDefaults = metricsWithDefaultZeroValue ++
        expectedOpMetrics.filter(m => m._2 >= 0).mapValues(_.toString)

+      overrideExpectedOpMetrics.foreach { case ((partitioned, cdfEnabled), (metric, value)) =>
+        if (partitioned == testConfig.partitioned && cdfEnabled == testConfig.cdfEnabled) {
+          expectedOpMetricsWithDefaults = expectedOpMetricsWithDefaults +
+            (metric -> value.toString)
+        }
+      }
+
       // Check that all operation metrics are positive numbers.
       for ((metricName, metricValue) <- operationMetrics) {
         assert(metricValue.toLong >= 0,
@@ -958,7 +972,13 @@ trait MergeIntoMetricsBase
       sourceDf = sourceDf,
       mergeCmdFn = mergeCmdFn,
       expectedOpMetrics = expectedOpMetrics,
-      testConfig = testConfig
+      testConfig = testConfig,
+      // When cdf=true in this test we hit the corner case where there are duplicate matches with
+      // a delete clause and we generate duplicate CDC data. This is further detailed in
+      // MergeIntoCommand at the definition of isDeleteWithDuplicateMatchesAndCdc. Our fix for
+      // this scenario deduplicates the output data, which reshuffles it. Thus, when the table is
+      // not partitioned, the data is rewritten into 1 new file rather than 2.
+      overrideExpectedOpMetrics = Seq(((false, true), ("numTargetFilesAdded", 1)))
     )
   }}
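For readers skimming the test change: the new `overrideExpectedOpMetrics` parameter is just a per-configuration patch applied on top of the expected-metrics map. A small standalone sketch of that folding logic (a hypothetical helper, not part of the patch) is shown below.

```scala
object MetricOverridesSketch {
  // Hypothetical helper mirroring how runMergeCmdAndTestMetrics applies the overrides:
  // an override only takes effect when the test's (partitioned, cdfEnabled) pair matches.
  def applyOverrides(
      defaults: Map[String, String],
      overrides: Seq[((Boolean, Boolean), (String, Int))],
      partitioned: Boolean,
      cdfEnabled: Boolean): Map[String, String] = {
    overrides.foldLeft(defaults) { case (acc, ((p, cdf), (metric, value))) =>
      if (p == partitioned && cdf == cdfEnabled) acc + (metric -> value.toString) else acc
    }
  }

  def main(args: Array[String]): Unit = {
    // The un-partitioned, CDF-enabled configuration expects 1 added file instead of 2.
    val expected = applyOverrides(
      defaults = Map("numTargetFilesAdded" -> "2"),
      overrides = Seq(((false, true), ("numTargetFilesAdded", 1))),
      partitioned = false,
      cdfEnabled = true)
    println(expected) // Map(numTargetFilesAdded -> 1)
  }
}
```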
Our fix for this + // scenario includes deduplicating the output data which reshuffles the output data. + // Thus when the table is not partitioned, the data is rewritten into 1 new file rather than 2 + overrideExpectedOpMetrics = Seq(((false, true), ("numTargetFilesAdded", 1))) ) }} diff --git a/core/src/test/scala/org/apache/spark/sql/delta/cdc/MergeCDCSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/cdc/MergeCDCSuite.scala index 8c577a0364..53b82cf7a7 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/cdc/MergeCDCSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/cdc/MergeCDCSuite.scala @@ -54,13 +54,19 @@ class MergeCDCSuite extends MergeIntoSQLSuite with DeltaColumnMappingTestUtils { val insertClauses = Option(insert).map(i => this.insert(values = i)).toSeq val deleteClauses = Option(deleteWhen).map(d => this.delete(condition = d)).toSeq testMergeCdcUnlimitedClauses(name)( - target, source, deleteClauses ++ updateClauses ++ insertClauses, - expectedTableData, expectedCdcData, expectErrorContains, confs) + target = target, + source = source, + clauses = deleteClauses ++ updateClauses ++ insertClauses, + expectedTableData = expectedTableData, + expectedCdcData = expectedCdcData, + expectErrorContains = expectErrorContains, + confs = confs) } private def testMergeCdcUnlimitedClauses(name: String)( target: => DataFrame, source: => DataFrame, + mergeCondition: String = "s.key = t.key", clauses: Seq[MergeClause], expectedTableData: => DataFrame = null, expectedCdcData: => DataFrame = null, @@ -79,7 +85,7 @@ class MergeCDCSuite extends MergeIntoSQLSuite with DeltaColumnMappingTestUtils { } assert(ex.getMessage.contains(expectErrorContains)) } else { - executeMerge(s"delta.`$tempPath` t", s"source s", "s.key = t.key", + executeMerge(s"delta.`$tempPath` t", s"source s", mergeCondition, clauses.toSeq: _*) checkAnswer( spark.read.format("delta").load(tempPath), @@ -187,6 +193,115 @@ class MergeCDCSuite extends MergeIntoSQLSuite with DeltaColumnMappingTestUtils { confs = (DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "true") :: Nil ) + testMergeCdcUnlimitedClauses("unconditional delete only with duplicate matches")( + target = Seq(0, 1).toDF("value"), + source = Seq(1, 1).toDF("value"), + mergeCondition = "t.value = s.value", + clauses = MergeClause(isMatched = true, null, "DELETE") :: Nil, + expectedTableData = Seq(0).toDF(), + expectedCdcData = ((1, "delete", 1) :: Nil).toDF() + ) + + testMergeCdcUnlimitedClauses( + "unconditional delete only with duplicate matches without duplicates rows in the source")( + target = Seq(0).toDF("value"), + source = ((0, 0) :: (0, 1) :: Nil).toDF("col1", "col2"), + mergeCondition = "t.value = s.col1", + clauses = MergeClause(isMatched = true, null, "DELETE") :: Nil, + expectedTableData = + Nil.asInstanceOf[List[Integer]] + .toDF("value"), + expectedCdcData = ((0, "delete", 1) :: Nil).toDF() + ) + + testMergeCdcUnlimitedClauses( + "unconditional delete only with duplicate matches with duplicates in the target")( + target = Seq(0, 1, 1).toDF("value"), + source = Seq(1, 1).toDF("value"), + mergeCondition = "t.value = s.value", + clauses = MergeClause(isMatched = true, null, "DELETE") :: Nil, + expectedTableData = Seq(0).toDF(), + expectedCdcData = ((1, "delete", 1) :: (1, "delete", 1) :: Nil).toDF() + ) + + testMergeCdcUnlimitedClauses("unconditional delete only with target-only merge condition")( + target = Seq(0, 1).toDF("value"), + source = Seq(0, 1).toDF("value"), + mergeCondition = "t.value > 0", + clauses = 
MergeClause(isMatched = true, null, "DELETE") :: Nil, + expectedTableData = Seq(0).toDF(), + expectedCdcData = ((1, "delete", 1) :: Nil).toDF() + ) + + testMergeCdcUnlimitedClauses( + "unconditional delete only with target-only merge condition with duplicates in the target")( + target = Seq(0, 1, 1).toDF("value"), + source = Seq(0, 1).toDF("value"), + mergeCondition = "t.value > 0", + clauses = MergeClause(isMatched = true, null, "DELETE") :: Nil, + expectedTableData = Seq(0).toDF(), + expectedCdcData = ((1, "delete", 1) :: (1, "delete", 1) :: Nil).toDF() + ) + + testMergeCdcUnlimitedClauses("unconditional delete only with source-only merge condition")( + target = Seq(0, 1).toDF("value"), + source = Seq(0, 1).toDF("value"), + mergeCondition = "s.value < 2", + clauses = MergeClause(isMatched = true, null, "DELETE") :: Nil, + expectedTableData = + Nil.asInstanceOf[List[Integer]] + .toDF("value"), + expectedCdcData = ((0, "delete", 1) :: (1, "delete", 1) :: Nil).toDF() + ) + + testMergeCdcUnlimitedClauses( + "unconditional delete only with source-only merge condition with duplicates in the target")( + target = Seq(0, 1, 1).toDF("value"), + source = Seq(0, 1).toDF("value"), + mergeCondition = "s.value < 2", + clauses = MergeClause(isMatched = true, null, "DELETE") :: Nil, + expectedTableData = + Nil.asInstanceOf[List[Integer]] + .toDF("value"), + expectedCdcData = ((0, "delete", 1) :: (1, "delete", 1) :: (1, "delete", 1) :: Nil).toDF() + ) + + testMergeCdcUnlimitedClauses("unconditional delete with duplicate matches + insert")( + target = ((1, 1) :: (2, 2) :: Nil).toDF("key", "value"), + source = ((1, 10) :: (1, 100) :: (3, 30) :: (3, 300) :: Nil).toDF("key", "value"), + mergeCondition = "s.key = t.key", + clauses = MergeClause(isMatched = true, null, "DELETE") :: + insert(values = "(key, value) VALUES (s.key, s.value)") :: Nil, + expectedTableData = ((2, 2) :: (3, 30) :: (3, 300) :: Nil).toDF("key", "value"), + expectedCdcData = + ((1, 1, "delete", 1) :: (3, 30, "insert", 1) :: (3, 300, "insert", 1) :: Nil).toDF() + ) + + testMergeCdcUnlimitedClauses( + "unconditional delete with duplicate matches + insert with duplicate rows")( + target = ((1, 1) :: (2, 2) :: Nil).toDF("key", "value"), + source = ((1, 10) :: (1, 100) :: (3, 30) :: (3, 300) :: (3, 300) :: Nil).toDF("key", "value"), + mergeCondition = "s.key = t.key", + clauses = MergeClause(isMatched = true, null, "DELETE") :: + insert(values = "(key, value) VALUES (s.key, s.value)") :: Nil, + expectedTableData = ((2, 2) :: (3, 30) :: (3, 300) :: (3, 300) :: Nil).toDF("key", "value"), + expectedCdcData = + ((1, 1, "delete", 1) :: (3, 30, "insert", 1) :: (3, 300, "insert", 1) :: + (3, 300, "insert", 1) :: Nil).toDF() + ) + + testMergeCdcUnlimitedClauses("unconditional delete with duplicate matches " + + "+ insert a duplicate of the unmatched target rows")( + target = Seq(1, 2).toDF("value"), + source = ((1, 10) :: (1, 100) :: (3, 2) :: Nil).toDF("col1", "col2"), + mergeCondition = "s.col1 = t.value", + clauses = MergeClause(isMatched = true, null, "DELETE") :: + insert(values = "(value) VALUES (col2)") :: Nil, + expectedTableData = Seq(2, 2).toDF(), + expectedCdcData = + ((1, "delete", 1) :: (2, "insert", 1) :: Nil).toDF() + ) + testMergeCdcUnlimitedClauses("all conditions failed for all rows")( target = Seq((1, "a"), (2, "b")).toDF("key", "val"), source = Seq((1, "t"), (2, "u")).toDF("key", "val"),