Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data Synchronization/Matching] Delegate to Spark for checking existence of columns in the given dataframes #515

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ object DataSynchronization extends ComparisonBase {
assertion: Double => Boolean): ComparisonResult = {
val columnErrors = areKeyColumnsValid(ds1, ds2, colKeyMap)
if (columnErrors.isEmpty) {
// Get all the non-key columns from DS1 and verify that they are present in DS2
val colsDS1 = ds1.columns.filterNot(x => colKeyMap.keys.toSeq.contains(x)).sorted
val colsDS2 = ds2.columns.filterNot(x => colKeyMap.values.toSeq.contains(x)).sorted
val nonKeyColsMatch = colsDS1.forall { col => Try { ds2(col) }.isSuccess }

if (!(colsDS1 sameElements colsDS2)) {
if (!nonKeyColsMatch) {
ComparisonFailed("Non key columns in the given data frames do not match.")
} else {
val mergedMaps = colKeyMap ++ colsDS1.map(x => x -> x).toMap
Expand Down Expand Up @@ -152,9 +153,11 @@ object DataSynchronization extends ComparisonBase {
case compCols => Right(compCols)
}
} else {
// Get all the non-key columns from DS1 and verify that they are present in DS2
val ds1NonKeyCols = ds1.columns.filterNot(x => colKeyMap.keys.toSeq.contains(x)).sorted
val ds2NonKeyCols = ds2.columns.filterNot(x => colKeyMap.values.toSeq.contains(x)).sorted
if (!(ds1NonKeyCols sameElements ds2NonKeyCols)) {
val nonKeyColsMatch = ds1NonKeyCols.forall { col => Try { ds2(col) }.isSuccess }

if (!nonKeyColsMatch) {
Left(ComparisonFailed("Non key columns in the given data frames do not match."))
} else {
Right(ds1NonKeyCols.map { c => c -> c}.toMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,4 +596,95 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
assert(expected == rowLevelResults)
}
}

"Data Synchronization Schema Test for non key columns" should {
def primaryDataset(spark: SparkSession, idColumnName: String): DataFrame = {
import spark.implicits._
spark.sparkContext.parallelize(
Seq(
(1, "John", "NY"),
(2, "Javier", "WI"),
(3, "Helena", "TX"),
(4, "Helena", "TX"),
(5, "Nick", "FL"),
(6, "Molly", "TX")
)
).toDF(idColumnName, "name", "state") // all lower case
}

def referenceDataset(spark: SparkSession, idColumnName: String): DataFrame = {
import spark.implicits._
spark.sparkContext.parallelize(
Seq(
(1, "John", "NY"),
(2, "Javier", "WI"),
(3, "Helena", "TX"),
(4, "Helena", "TX"),
(5, "Nicholas", "FL"),
(6, "Ms Molly", "TX")
)
).toDF(idColumnName, "Name", "State") // upper case except for id
}

"works when key column names have different casings" in withSparkSession { spark =>
val id1ColumnName = "id"
val id2ColumnName = "ID"
val ds1 = primaryDataset(spark, id1ColumnName)
val ds2 = referenceDataset(spark, id2ColumnName)

// Not using id1ColumnName -> id2ColumnName intentionally.
// In Glue DQ, we accept the column names in two formats: mapping and non-mapping
// Mapping format is "col1 -> col2", when customer wants to compare columns with different names.
// Non-mapping format is just "col1", when customer has same column, regardless of case, in both datasets.
// A non-mapping format would translate it into the map below.
// We want to test that the functionality works as expected in that case.
val colKeyMap = Map(id1ColumnName -> id1ColumnName)

// Overall
val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match
val overallResult = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion)
assert(overallResult.isInstanceOf[ComparisonSucceeded])

// Row Level
val outcomeColName = "outcome"
val rowLevelResult = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, None, Some(outcomeColName))

assert(rowLevelResult.isRight)

val dfResult = rowLevelResult.right.get
val rowLevelResults = dfResult.collect().map { row =>
row.getAs[Int]("id") -> row.getAs[Boolean](outcomeColName)
}.toMap

val expected = Map(1 -> true, 2 -> true, 3 -> true, 4 -> true, 5 -> false, 6 -> false)
assert(expected == rowLevelResults)
}

"works when non-key column names have different casings" in withSparkSession { spark =>
val idColumnName = "id"
val ds1 = primaryDataset(spark, idColumnName)
val ds2 = referenceDataset(spark, idColumnName)

val colKeyMap = Map(idColumnName -> idColumnName)

// Overall
val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match
val overallResult = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion)
assert(overallResult.isInstanceOf[ComparisonSucceeded])

// Row Level
val outcomeColName = "outcome"
val rowLevelResult = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, None, Some(outcomeColName))

assert(rowLevelResult.isRight)

val dfResult = rowLevelResult.right.get
val rowLevelResults = dfResult.collect().map { row =>
row.getAs[Int]("id") -> row.getAs[Boolean](outcomeColName)
}.toMap

val expected = Map(1 -> true, 2 -> true, 3 -> true, 4 -> true, 5 -> false, 6 -> false)
assert(expected == rowLevelResults)
}
}
}