From d08a4c39ca2f257642f707264086bd825946459a Mon Sep 17 00:00:00 2001 From: Eric Simmerman Date: Wed, 24 Apr 2024 17:14:15 -0400 Subject: [PATCH] Changeset values should honor comparators (#238) * Alter change column behavior so that it honors comparators when determining what fields have changed * Address requested changes * Linter updates --- .../uk/co/gresearch/spark/diff/Diff.scala | 12 ++++--- .../spark/diff/DiffComparatorSuite.scala | 31 +++++++++++++++++++ 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala index 9106de15..1573b27f 100644 --- a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala +++ b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala @@ -19,6 +19,7 @@ package uk.co.gresearch.spark.diff import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{ArrayType, StringType} +import uk.co.gresearch.spark.diff.comparator.DiffComparator import uk.co.gresearch.spark.{backticks, distinctPrefixFor} import scala.collection.JavaConverters @@ -144,7 +145,7 @@ class Differ(options: DiffOptions) { private def getChangeColumn( existsColumnName: String, - valueColumns: Seq[String], + valueVolumnsWithComparator: Seq[(String, DiffComparator)], left: Dataset[_], right: Dataset[_] ): Option[Column] = { @@ -152,12 +153,14 @@ class Differ(options: DiffOptions) { .map(changeColumn => when(left(existsColumnName).isNull || right(existsColumnName).isNull, lit(null)) .otherwise( - Some(valueColumns.toSeq) + Some(valueVolumnsWithComparator) .filter(_.nonEmpty) .map(columns => concat( columns - .map(c => when(left(backticks(c)) <=> right(backticks(c)), array()).otherwise(array(lit(c)))): _* + .map { case (c, cmp) => + when(cmp.equiv(left(backticks(c)), right(backticks(c))), array()).otherwise(array(lit(c))) + }: _* ) ) .getOrElse( @@ -282,6 +285,7 @@ class Differ(options: DiffOptions) { cmp.equiv(leftWithExists(backticks(c)), rightWithExists(backticks(c))) } .reduceOption(_ && _) + val changeCondition = not(unChanged.getOrElse(lit(true))) val diffActionColumn = @@ -292,7 +296,7 @@ class Differ(options: DiffOptions) { .as(options.diffColumn) val diffColumns = getDiffColumns(pkColumns, valueColumns, left, right, ignoreColumns).map(_._2) - val changeColumn = getChangeColumn(existsColumnName, valueColumns, leftWithExists, rightWithExists) + val changeColumn = getChangeColumn(existsColumnName, valueVolumnsWithComparator, leftWithExists, rightWithExists) // turn this column into a sequence of one or none column so we can easily concat it below with diffActionColumn and diffColumns .map(Seq(_)) .getOrElse(Seq.empty[Column]) diff --git a/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala b/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala index 5b6022a7..af43781c 100644 --- a/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala +++ b/src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala @@ -33,6 +33,7 @@ import uk.co.gresearch.spark.diff.comparator._ import java.sql.{Date, Timestamp} import java.time.Duration +import java.util case class Numbers( id: Int, @@ -412,6 +413,36 @@ class DiffComparatorSuite extends AnyFunSuite with SparkTestSession { DiffOptions.default.withComparator(DiffComparators.duration(Duration.ofSeconds(61)).asExclusive(), "time") doTest(optionsWithTightComparator, optionsWithRelaxedComparator, leftTimes.toDF, rightTimes.toDF) } + + test("changeset accounts for comparators") { + val changesetOptions = DiffOptions.default + .withComparator(DiffComparators.epsilon(10).asAbsolute().asInclusive(), "longValue") + .withChangeColumn("changeset") + + lazy val left: Dataset[Numbers] = Seq( + Numbers(1, 1L, 1.0f, 1.0, Decimal(10, 8, 3), None, None), + Numbers(2, 2L, 2.0f, 2.0, Decimal(20, 8, 3), Some(2), Some(2L)), + Numbers(3, 3L, 3.0f, 3.0, Decimal(30, 8, 3), Some(3), Some(3L)), + Numbers(4, 4L, 4.0f, 4.0, Decimal(40, 8, 3), Some(4), None), + Numbers(5, 5L, 5.0f, 5.0, Decimal(50, 8, 3), None, Some(5L)), + ).toDS() + + lazy val right: Dataset[Numbers] = Seq( + Numbers(1, 1L, 1.0f, 1.0, Decimal(10, 8, 3), None, None), + Numbers(2, 8L, 2.0f, 2.0, Decimal(20, 8, 3), Some(2), Some(2L)), + Numbers(3, 9L, 6.0f, 3.0, Decimal(30, 8, 3), Some(3), Some(3L)), + Numbers(4, 10L, 4.0f, 4.0, Decimal(40, 8, 3), Some(4), None), + Numbers(5, 11L, 5.0f, 5.0, Decimal(50, 8, 3), None, Some(5L)), + ).toDS() + + val rs = left.diff(right, changesetOptions, "id").where($"diff" === "C") + assert(rs.count() == 1, "Only one row should differ with the numeric comparator applied") + val changesInDifferingRow: util.List[String] = rs.head.getList[String](1) + assert( + changesInDifferingRow.get(0) == "floatValue", + "Only floatVal differs after considering the comparators so the changeset should be size 1" + ) + } } Seq(true, false).foreach { sensitive =>