Fix Merge CDC delete with duplicate matches bug and add tests
Resolves #1274.

This adds a fix for a Merge + CDF bug affecting delete merges with duplicate matches, along with tests. Implementation details are explained in comments throughout the code.
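
To illustrate the bug, here is a hypothetical repro sketch (not part of this change; the table name, inserted values, and version number are assumptions). A delete-only merge in which two source rows match the same target row of a CDF-enabled table previously produced duplicate `delete` rows in the change feed:

```scala
// Sketch only: assumes delta-core is on the classpath and that the table/view names are free.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
import spark.implicits._

spark.sql("""
  CREATE TABLE merge_cdc_demo (value INT) USING delta
  TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
spark.sql("INSERT INTO merge_cdc_demo VALUES (0), (1)")

// Two source rows match the same target row, and the only clause is an unconditional DELETE,
// so the duplicate match is legal for the MERGE itself.
Seq(1, 1).toDF("value").createOrReplaceTempView("src")
spark.sql("""
  MERGE INTO merge_cdc_demo t USING src s
  ON t.value = s.value
  WHEN MATCHED THEN DELETE
""")

// Read the change feed for the MERGE commit (version 2 under the setup above). Before this fix
// it could contain two 'delete' rows for the single deleted target row; with the fix it
// contains exactly one.
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 2)
  .table("merge_cdc_demo")
  .show()
```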

This PR adds tests to `MergeCDCSuite`.

Closes #1309

GitOrigin-RevId: 2fdb845a6de144babce2efe393fae8b2e5eefbbf
allisonport-db authored and tdas committed Aug 11, 2022
1 parent ff83a37 commit ef49ae2
Showing 3 changed files with 245 additions and 31 deletions.
@@ -32,10 +32,9 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, Literal, PredicateHelper, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.BasePredicate
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BasePredicate, Expression, Literal, NamedExpression, PredicateHelper, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -553,7 +552,32 @@ case class MergeIntoCommand(
): Seq[FileAction] = recordMergeOperation(sqlMetricName = "rewriteTimeMs") {
import org.apache.spark.sql.catalyst.expressions.Literal.{TrueLiteral, FalseLiteral}

val targetOutputCols = getTargetOutputCols(deltaTxn)
val cdcEnabled = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata)

var targetOutputCols = getTargetOutputCols(deltaTxn)
var outputRowSchema = deltaTxn.metadata.schema

// When we have duplicate matches (only allowed when the whenMatchedCondition is a delete with
// no match condition) we will incorrectly generate duplicate CDC rows.
// Duplicate matches can be due to:
// - Duplicate rows in the source w.r.t. the merge condition
// - A target-only or source-only merge condition, which essentially turns our join into a cross
// join with the target/source satisfying the merge condition.
// These duplicate matches are dropped from the main data output since this is a delete
// operation, but the duplicate CDC rows are not removed by default.
// See https://github.com/delta-io/delta/issues/1274

// We address this specific scenario by adding row ids to the target before performing our join.
// There should only be one CDC delete row per target row so we can use these row ids to dedupe
// the duplicate CDC delete rows.

// We also need to address the scenario when there are duplicate matches with delete and we
// insert duplicate rows. Here we need to additionally add row ids to the source before the
// join to avoid dropping these valid duplicate inserted rows and their corresponding cdc rows.

// When there is an insert clause, we set SOURCE_ROW_ID_COL=null for all delete rows because we
// need to drop the duplicate matches.
val isDeleteWithDuplicateMatchesAndCdc = multipleMatchDeleteOnlyOvercount.nonEmpty && cdcEnabled

// Generate a new logical plan that has same output attributes exprIds as the target plan.
// This allows us to apply the existing resolved update/insert expressions.
@@ -583,16 +607,21 @@
// with value `true`, one to each side of the join. Whether this field is null or not after
// the outer join, will allow us to identify whether the resultant joined row was a
// matched inner result or an unmatched result with null on one side.
val joinedDF = {
val sourceDF = Dataset.ofRows(spark, source)
.withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr))
val targetDF = Dataset.ofRows(spark, newTarget)
.withColumn(TARGET_ROW_PRESENT_COL, lit(true))
sourceDF.join(targetDF, new Column(condition), joinType)
// We add row IDs to the targetDF if we have a delete-when-matched clause with duplicate
// matches and CDC is enabled, and additionally add row IDs to the source if we also have an
// insert clause. See above at isDeleteWithDuplicateMatchesAndCdc definition for more details.
var sourceDF = Dataset.ofRows(spark, source)
.withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr))
var targetDF = Dataset.ofRows(spark, newTarget)
.withColumn(TARGET_ROW_PRESENT_COL, lit(true))
if (isDeleteWithDuplicateMatchesAndCdc) {
targetDF = targetDF.withColumn(TARGET_ROW_ID_COL, monotonically_increasing_id())
if (notMatchedClauses.nonEmpty) { // insert clause
sourceDF = sourceDF.withColumn(SOURCE_ROW_ID_COL, monotonically_increasing_id())
}
}

val joinedDF = sourceDF.join(targetDF, new Column(condition), joinType)
val joinedPlan = joinedDF.queryExecution.analyzed
val cdcEnabled = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata)

def resolveOnJoinedPlan(exprs: Seq[Expression]): Seq[Expression] = {
tryResolveReferencesForExpressions(spark, exprs, joinedPlan)
@@ -619,6 +648,31 @@
// Depending on the clause and whether CDC is enabled, we output between 0 and 3 rows, as a
// Seq[Seq[Expression]]

// There is one corner case outlined above at isDeleteWithDuplicateMatchesAndCdc definition.
// When we have a delete-ONLY merge with duplicate matches we have N + 4 columns:
// N target cols, TARGET_ROW_ID_COL, ROW_DROPPED_COL, INCR_ROW_COUNT_COL, CDC_TYPE_COLUMN_NAME
// When we have a delete-when-matched merge with duplicate matches + an insert clause, we have
// N + 5 columns:
// N target cols, TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL, ROW_DROPPED_COL, INCR_ROW_COUNT_COL,
// CDC_TYPE_COLUMN_NAME
// These ROW_ID_COL will always be dropped before the final write.

if (isDeleteWithDuplicateMatchesAndCdc) {
targetOutputCols = targetOutputCols :+ UnresolvedAttribute(TARGET_ROW_ID_COL)
outputRowSchema = outputRowSchema.add(TARGET_ROW_ID_COL, DataTypes.LongType)
if (notMatchedClauses.nonEmpty) { // there is an insert clause, make SRC_ROW_ID_COL=null
targetOutputCols = targetOutputCols :+ Alias(Literal(null), SOURCE_ROW_ID_COL)()
outputRowSchema = outputRowSchema.add(SOURCE_ROW_ID_COL, DataTypes.LongType)
}
}

if (cdcEnabled) {
outputRowSchema = outputRowSchema
.add(ROW_DROPPED_COL, DataTypes.BooleanType)
.add(INCR_ROW_COUNT_COL, DataTypes.BooleanType)
.add(CDC_TYPE_COLUMN_NAME, DataTypes.StringType)
}

def matchedClauseOutput(clause: DeltaMergeIntoMatchedClause): Seq[Seq[Expression]] = {
val exprs = clause match {
case u: DeltaMergeIntoUpdateClause =>
@@ -663,9 +717,20 @@
def notMatchedClauseOutput(clause: DeltaMergeIntoInsertClause): Seq[Seq[Expression]] = {
// Generate insert expressions and set ROW_DELETED_COL = false and
// CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC
val insertExprs = clause.resolvedActions.map(_.expr)
val mainDataOutput = resolveOnJoinedPlan(
clause.resolvedActions.map(_.expr) :+ FalseLiteral :+ incrInsertedCountExpr :+
Literal(CDC_TYPE_NOT_CDC))
if (isDeleteWithDuplicateMatchesAndCdc) {
// Must be delete-when-matched merge with duplicate matches + insert clause
// Therefore we must keep the target row id and source row id. Since this is a not-matched
// clause we know the target row-id will be null. See above at
// isDeleteWithDuplicateMatchesAndCdc definition for more details.
insertExprs :+
Alias(Literal(null), TARGET_ROW_ID_COL)() :+ UnresolvedAttribute(SOURCE_ROW_ID_COL) :+
FalseLiteral :+ incrInsertedCountExpr :+ Literal(CDC_TYPE_NOT_CDC)
} else {
insertExprs :+ FalseLiteral :+ incrInsertedCountExpr :+ Literal(CDC_TYPE_NOT_CDC)
}
)
if (cdcEnabled) {
// For insert we have the same expressions as for mainDataOutput, but with
// INCR_ROW_COUNT_COL as a no-op (because the metric will be incremented in
@@ -683,15 +748,6 @@
resolveOnJoinedPlan(Seq(condExpr)).head
}

val outputRowSchema = if (!cdcEnabled) {
deltaTxn.metadata.schema
} else {
deltaTxn.metadata.schema
.add(ROW_DROPPED_COL, DataTypes.BooleanType)
.add(INCR_ROW_COUNT_COL, DataTypes.BooleanType)
.add(CDC_TYPE_COLUMN_NAME, DataTypes.StringType)
}

val joinedRowEncoder = RowEncoder(joinedPlan.schema)
val outputRowEncoder = RowEncoder(outputRowSchema).resolveAndBind()

@@ -712,9 +768,30 @@
joinedRowEncoder = joinedRowEncoder,
outputRowEncoder = outputRowEncoder)

val outputDF =
var outputDF =
Dataset.ofRows(spark, joinedPlan).mapPartitions(processor.processPartition)(outputRowEncoder)
.drop(ROW_DROPPED_COL, INCR_ROW_COUNT_COL)

if (isDeleteWithDuplicateMatchesAndCdc) {
// When we have a delete when matched clause with duplicate matches we have to remove
// duplicate CDC rows. This scenario is further explained at
// isDeleteWithDuplicateMatchesAndCdc definition.

// To remove duplicate CDC rows generated by the duplicate matches we dedupe by
// TARGET_ROW_ID_COL since there should only be one CDC delete row per target row.
// When there is an insert clause in addition to the delete clause we additionally dedupe by
// SOURCE_ROW_ID_COL and CDC_TYPE_COLUMN_NAME to avoid dropping valid duplicate inserted rows
// and their corresponding CDC rows.
val columnsToDedupeBy = if (notMatchedClauses.nonEmpty) { // insert clause
Seq(TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL, CDC_TYPE_COLUMN_NAME)
} else {
Seq(TARGET_ROW_ID_COL)
}
outputDF = outputDF
.dropDuplicates(columnsToDedupeBy)
.drop(ROW_DROPPED_COL, INCR_ROW_COUNT_COL, TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL)
} else {
outputDF = outputDF.drop(ROW_DROPPED_COL, INCR_ROW_COUNT_COL)
}

logDebug("writeAllChanges: join output plan:\n" + outputDF.queryExecution)

@@ -873,6 +950,8 @@ object MergeIntoCommand {
val TOUCHED_FILES_ACCUM_NAME = "internal.metrics.MergeIntoDelta.touchedFiles"

val ROW_ID_COL = "_row_id_"
val TARGET_ROW_ID_COL = "_target_row_id_"
val SOURCE_ROW_ID_COL = "_source_row_id_"
val FILE_NAME_COL = "_file_name_"
val SOURCE_ROW_PRESENT_COL = "_source_row_present_"
val TARGET_ROW_PRESENT_COL = "_target_row_present_"
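
The core of the dedup strategy in the `MergeIntoCommand` changes above is easiest to see in isolation. The following standalone sketch is not taken from the patch: the data, the literal `_change_type` value, and the local SparkSession are illustrative, while the row-id column name mirrors the `TARGET_ROW_ID_COL` constant added above.

```scala
// Toy illustration of why tagging target rows with a unique id before the join lets the
// duplicate CDC delete rows be collapsed afterwards.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, monotonically_increasing_id}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val target = Seq(0, 1).toDF("value")
  .withColumn("_target_row_id_", monotonically_increasing_id())
val source = Seq(1, 1).toDF("value") // two source rows match the single target row with value 1

// The join produces two matched rows for target value 1, so naively emitting one CDC delete
// row per joined row double-counts the delete ...
val joinedDeletes = source.join(target, "value")
  .withColumn("_change_type", lit("delete"))

// ... whereas deduping on the pre-assigned target row id collapses them back to exactly one
// CDC delete row per deleted target row, analogous to what writeAllChanges now does before
// the final write.
val cdcDeletes = joinedDeletes
  .dropDuplicates("_target_row_id_")
  .drop("_target_row_id_")
cdcDeletes.show() // single row: value = 1, _change_type = delete
```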
@@ -201,13 +201,20 @@ trait MergeIntoMetricsBase
* @param mergeCmdFn The function that actually runs the merge command.
* @param expectedOpMetrics A map with values for expected operation metrics.
* @param testConfig The configuration options for this test
* @param overrideExpectedOpMetrics Sequence of expected operation metric values to override from
* those provided in expectedOpMetrics for specific
* configurations of partitioned and cdfEnabled. Elements
* provided as:
* ((partitioned, cdfEnabled), (metric_name, metric_value))
*/
private def runMergeCmdAndTestMetrics(
targetDf: DataFrame,
sourceDf: DataFrame,
mergeCmdFn: MergeCmd,
expectedOpMetrics: Map[String, Int],
testConfig: MergeTestConfiguration): Unit = {
testConfig: MergeTestConfiguration,
overrideExpectedOpMetrics: Seq[((Boolean, Boolean), (String, Int))] = Seq.empty
): Unit = {
withSQLConf(
DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true",
DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> testConfig.cdfEnabled.toString
@@ -245,9 +252,16 @@

// Get the default row operation metrics and override them with the provided ones.
val metricsWithDefaultZeroValue = mergeRowMetrics.map(_ -> "0").toMap
val expectedOpMetricsWithDefaults = metricsWithDefaultZeroValue ++
var expectedOpMetricsWithDefaults = metricsWithDefaultZeroValue ++
expectedOpMetrics.filter(m => m._2 >= 0).mapValues(_.toString)

overrideExpectedOpMetrics.foreach { case ((partitioned, cdfEnabled), (metric, value)) =>
if (partitioned == testConfig.partitioned && cdfEnabled == testConfig.cdfEnabled) {
expectedOpMetricsWithDefaults = expectedOpMetricsWithDefaults +
(metric -> value.toString)
}
}

// Check that all operation metrics are positive numbers.
for ((metricName, metricValue) <- operationMetrics) {
assert(metricValue.toLong >= 0,
@@ -958,7 +972,13 @@
sourceDf = sourceDf,
mergeCmdFn = mergeCmdFn,
expectedOpMetrics = expectedOpMetrics,
testConfig = testConfig
testConfig = testConfig,
// When cdf=true in this test we hit the corner case where there are duplicate matches with a
// delete clause and we generate duplicate cdc data. This is further detailed in
// MergeIntoCommand at the definition of isDeleteWithDuplicateMatchesAndCdc. Our fix for this
// scenario includes deduplicating the output data which reshuffles the output data.
// Thus when the table is not partitioned, the data is rewritten into 1 new file rather than 2
overrideExpectedOpMetrics = Seq(((false, true), ("numTargetFilesAdded", 1)))
)
}}

121 changes: 118 additions & 3 deletions core/src/test/scala/org/apache/spark/sql/delta/cdc/MergeCDCSuite.scala
@@ -54,13 +54,19 @@ class MergeCDCSuite extends MergeIntoSQLSuite with DeltaColumnMappingTestUtils {
val insertClauses = Option(insert).map(i => this.insert(values = i)).toSeq
val deleteClauses = Option(deleteWhen).map(d => this.delete(condition = d)).toSeq
testMergeCdcUnlimitedClauses(name)(
target, source, deleteClauses ++ updateClauses ++ insertClauses,
expectedTableData, expectedCdcData, expectErrorContains, confs)
target = target,
source = source,
clauses = deleteClauses ++ updateClauses ++ insertClauses,
expectedTableData = expectedTableData,
expectedCdcData = expectedCdcData,
expectErrorContains = expectErrorContains,
confs = confs)
}

private def testMergeCdcUnlimitedClauses(name: String)(
target: => DataFrame,
source: => DataFrame,
mergeCondition: String = "s.key = t.key",
clauses: Seq[MergeClause],
expectedTableData: => DataFrame = null,
expectedCdcData: => DataFrame = null,
@@ -79,7 +85,7 @@
}
assert(ex.getMessage.contains(expectErrorContains))
} else {
executeMerge(s"delta.`$tempPath` t", s"source s", "s.key = t.key",
executeMerge(s"delta.`$tempPath` t", s"source s", mergeCondition,
clauses.toSeq: _*)
checkAnswer(
spark.read.format("delta").load(tempPath),
@@ -187,6 +193,115 @@ class MergeCDCSuite extends MergeIntoSQLSuite with DeltaColumnMappingTestUtils {
confs = (DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "true") :: Nil
)

testMergeCdcUnlimitedClauses("unconditional delete only with duplicate matches")(
target = Seq(0, 1).toDF("value"),
source = Seq(1, 1).toDF("value"),
mergeCondition = "t.value = s.value",
clauses = MergeClause(isMatched = true, null, "DELETE") :: Nil,
expectedTableData = Seq(0).toDF(),
expectedCdcData = ((1, "delete", 1) :: Nil).toDF()
)

testMergeCdcUnlimitedClauses(
"unconditional delete only with duplicate matches without duplicates rows in the source")(
target = Seq(0).toDF("value"),
source = ((0, 0) :: (0, 1) :: Nil).toDF("col1", "col2"),
mergeCondition = "t.value = s.col1",
clauses = MergeClause(isMatched = true, null, "DELETE") :: Nil,
expectedTableData =
Nil.asInstanceOf[List[Integer]]
.toDF("value"),
expectedCdcData = ((0, "delete", 1) :: Nil).toDF()
)

testMergeCdcUnlimitedClauses(
"unconditional delete only with duplicate matches with duplicates in the target")(
target = Seq(0, 1, 1).toDF("value"),
source = Seq(1, 1).toDF("value"),
mergeCondition = "t.value = s.value",
clauses = MergeClause(isMatched = true, null, "DELETE") :: Nil,
expectedTableData = Seq(0).toDF(),
expectedCdcData = ((1, "delete", 1) :: (1, "delete", 1) :: Nil).toDF()
)

testMergeCdcUnlimitedClauses("unconditional delete only with target-only merge condition")(
target = Seq(0, 1).toDF("value"),
source = Seq(0, 1).toDF("value"),
mergeCondition = "t.value > 0",
clauses = MergeClause(isMatched = true, null, "DELETE") :: Nil,
expectedTableData = Seq(0).toDF(),
expectedCdcData = ((1, "delete", 1) :: Nil).toDF()
)

testMergeCdcUnlimitedClauses(
"unconditional delete only with target-only merge condition with duplicates in the target")(
target = Seq(0, 1, 1).toDF("value"),
source = Seq(0, 1).toDF("value"),
mergeCondition = "t.value > 0",
clauses = MergeClause(isMatched = true, null, "DELETE") :: Nil,
expectedTableData = Seq(0).toDF(),
expectedCdcData = ((1, "delete", 1) :: (1, "delete", 1) :: Nil).toDF()
)

testMergeCdcUnlimitedClauses("unconditional delete only with source-only merge condition")(
target = Seq(0, 1).toDF("value"),
source = Seq(0, 1).toDF("value"),
mergeCondition = "s.value < 2",
clauses = MergeClause(isMatched = true, null, "DELETE") :: Nil,
expectedTableData =
Nil.asInstanceOf[List[Integer]]
.toDF("value"),
expectedCdcData = ((0, "delete", 1) :: (1, "delete", 1) :: Nil).toDF()
)

testMergeCdcUnlimitedClauses(
"unconditional delete only with source-only merge condition with duplicates in the target")(
target = Seq(0, 1, 1).toDF("value"),
source = Seq(0, 1).toDF("value"),
mergeCondition = "s.value < 2",
clauses = MergeClause(isMatched = true, null, "DELETE") :: Nil,
expectedTableData =
Nil.asInstanceOf[List[Integer]]
.toDF("value"),
expectedCdcData = ((0, "delete", 1) :: (1, "delete", 1) :: (1, "delete", 1) :: Nil).toDF()
)

testMergeCdcUnlimitedClauses("unconditional delete with duplicate matches + insert")(
target = ((1, 1) :: (2, 2) :: Nil).toDF("key", "value"),
source = ((1, 10) :: (1, 100) :: (3, 30) :: (3, 300) :: Nil).toDF("key", "value"),
mergeCondition = "s.key = t.key",
clauses = MergeClause(isMatched = true, null, "DELETE") ::
insert(values = "(key, value) VALUES (s.key, s.value)") :: Nil,
expectedTableData = ((2, 2) :: (3, 30) :: (3, 300) :: Nil).toDF("key", "value"),
expectedCdcData =
((1, 1, "delete", 1) :: (3, 30, "insert", 1) :: (3, 300, "insert", 1) :: Nil).toDF()
)

testMergeCdcUnlimitedClauses(
"unconditional delete with duplicate matches + insert with duplicate rows")(
target = ((1, 1) :: (2, 2) :: Nil).toDF("key", "value"),
source = ((1, 10) :: (1, 100) :: (3, 30) :: (3, 300) :: (3, 300) :: Nil).toDF("key", "value"),
mergeCondition = "s.key = t.key",
clauses = MergeClause(isMatched = true, null, "DELETE") ::
insert(values = "(key, value) VALUES (s.key, s.value)") :: Nil,
expectedTableData = ((2, 2) :: (3, 30) :: (3, 300) :: (3, 300) :: Nil).toDF("key", "value"),
expectedCdcData =
((1, 1, "delete", 1) :: (3, 30, "insert", 1) :: (3, 300, "insert", 1) ::
(3, 300, "insert", 1) :: Nil).toDF()
)

testMergeCdcUnlimitedClauses("unconditional delete with duplicate matches " +
"+ insert a duplicate of the unmatched target rows")(
target = Seq(1, 2).toDF("value"),
source = ((1, 10) :: (1, 100) :: (3, 2) :: Nil).toDF("col1", "col2"),
mergeCondition = "s.col1 = t.value",
clauses = MergeClause(isMatched = true, null, "DELETE") ::
insert(values = "(value) VALUES (col2)") :: Nil,
expectedTableData = Seq(2, 2).toDF(),
expectedCdcData =
((1, "delete", 1) :: (2, "insert", 1) :: Nil).toDF()
)

testMergeCdcUnlimitedClauses("all conditions failed for all rows")(
target = Seq((1, "a"), (2, "b")).toDF("key", "val"),
source = Seq((1, "t"), (2, "u")).toDF("key", "val"),
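
For reference, the delete-plus-insert tests above exercise roughly the following SQL shape. This is a sketch with illustrative table/view names, assuming a Delta-enabled session; the expected change-feed contents below restate the `expectedCdcData` of the "unconditional delete with duplicate matches + insert" case.

```scala
spark.sql("""
  MERGE INTO target t USING source s
  ON s.key = t.key
  WHEN MATCHED THEN DELETE
  WHEN NOT MATCHED THEN INSERT (key, value) VALUES (s.key, s.value)
""")
// With the fix, the change feed for this commit contains one 'delete' row per deleted target
// row plus one 'insert' row per inserted source row, rather than one 'delete' row per
// duplicate match.
```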
