
Is there a way to handle comparison for Map<T, Struct>? #155

Closed
asolimando opened this issue Apr 6, 2023 · 5 comments

Comments


asolimando commented Apr 6, 2023

I am dealing with a complex nested field of this form:

|-- my_field: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- f1: integer (nullable = true)
 |    |    |-- [...]
 |    |    |-- fn: struct (nullable = true)
 |    |    |    |-- x: float (nullable = true)
 |    |    |    |-- y: float (nullable = true)

I was wondering if there is a way to pre-process the data, or some other way to support comparing this kind of data.

I am already flattening the DataFrame as much as possible, but I could not find an easy way of handling map<struct>.

I am aware of the .withComparator(DiffComparators.map(), ...) method, but I don't think we can express the struct type there unless we know the full structure.

Is there a generic way to support Map[Any, Any], even by resorting to a dirty trick like a string representation with keys sorted lexicographically, or something similar?


EnricoMi commented Apr 6, 2023

You could turn the map into a string (hoping Spark has some order for the keys):

df.select($"my_field".cast(StringType))

or you could explode the map and flatten the value struct (assuming all map values have the same struct schema, which Spark map might require anyway):

df.select(explode($"my_field")).select($"key", $"value.*")

This will create one row per map key. Adding the key column to your diff id columns should provide the best diff experience.
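
If the key order turns out to be non-deterministic, one refinement of the string approach above (a sketch, using the my_field column from the question and built-in functions) is to sort the map entries by key before serializing:

import org.apache.spark.sql.functions.{array_sort, map_entries, to_json}

// map_entries turns the map into an array of {key, value} structs; array_sort orders
// structs by their first field (the key), so the resulting JSON string no longer
// depends on Spark's internal key order.
df.select(to_json(array_sort(map_entries($"my_field"))).as("my_field_str"))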


EnricoMi commented Apr 6, 2023

An example for the latter approach:

val left = spark.range(3).withColumn("m", map(
  lit(1), struct(lit(1).as("a"), lit("one").as("b")).as("s"),
  lit(2), struct(lit(2).as("a"), lit("two").as("b")).as("s")
))
val right = spark.range(3).withColumn("m",
  when($"id" === 2,
    map(lit(1), struct(lit(1).as("a"), lit("one").as("b")).as("s"))
  ).otherwise(map(
    lit(1), struct(lit(1).as("a"), when($"id" === 1, lit("One")).otherwise(lit("one")).as("b")).as("s"),
    lit(2), struct(lit(2).as("a"), lit("two").as("b")).as("s")
  ))
)

left.show(false)
+---+------------------------------+
|id |m                             |
+---+------------------------------+
|0  |{1 -> {1, one}, 2 -> {2, two}}|
|1  |{1 -> {1, one}, 2 -> {2, two}}|
|2  |{1 -> {1, one}, 2 -> {2, two}}|
+---+------------------------------+
right.show(false)
+---+------------------------------+
|id |m                             |
+---+------------------------------+
|0  |{1 -> {1, one}, 2 -> {2, two}}|
|1  |{1 -> {1, One}, 2 -> {2, two}}|
|2  |{1 -> {1, one}}               |
+---+------------------------------+
val flatleft = left.select($"id", explode($"m")).select($"id", $"key", $"value.*")
val flatright = right.select($"id", explode($"m")).select($"id", $"key", $"value.*")

flatleft.show(false)
+---+---+---+---+
|id |key|a  |b  |
+---+---+---+---+
|0  |1  |1  |one|
|0  |2  |2  |two|
|1  |1  |1  |one|
|1  |2  |2  |two|
|2  |1  |1  |one|
|2  |2  |2  |two|
+---+---+---+---+
flatright.show(false)
+---+---+---+---+
|id |key|a  |b  |
+---+---+---+---+
|0  |1  |1  |one|
|0  |2  |2  |two|
|1  |1  |1  |One|
|1  |2  |2  |two|
|2  |1  |1  |one|
+---+---+---+---+

Then you can diff:

import uk.co.gresearch.spark.diff._

flatleft.diff(flatright, "id", "key").show()
+----+---+---+------+-------+------+-------+
|diff| id|key|left_a|right_a|left_b|right_b|
+----+---+---+------+-------+------+-------+
|   N|  0|  1|     1|      1|   one|    one|
|   N|  0|  2|     2|      2|   two|    two|
|   C|  1|  1|     1|      1|   one|    One|
|   N|  1|  2|     2|      2|   two|    two|
|   N|  2|  1|     1|      1|   one|    one|
|   D|  2|  2|     2|   null|   two|   null|
+----+---+---+------+-------+------+-------+


EnricoMi commented Apr 6, 2023

You can flatten your deep struct like this: https://sparkbyexamples.com/spark/spark-flatten-nested-struct-column/

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

// Recursively turns nested struct fields into top-level columns named parent_child.
def flattenStructSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap { f =>
    val columnName = if (prefix == null) f.name else prefix + "." + f.name

    f.dataType match {
      case st: StructType => flattenStructSchema(st, columnName)
      case _ => Array(col(columnName).as(columnName.replace(".", "_")))
    }
  }
}

df.select(flattenStructSchema(df.schema): _*)
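
For the map column from the question, one could combine the explode approach above with this helper (a sketch; the id column is hypothetical, my_field is the map from the original schema):

import org.apache.spark.sql.functions.explode

// Explode the map into one row per key, then flatten the deep value struct.
val exploded = df.select($"id", explode($"my_field"))                 // columns: id, key, value
val flat = exploded.select(flattenStructSchema(exploded.schema): _*)  // id, key, value_f1, ..., value_fn_x, value_fn_y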


asolimando commented Apr 12, 2023

Thanks a lot @EnricoMi for your reply. I am wrapping your library into a Docker image (for internal use) that ingests arbitrary datasets; the flattening is certainly doable, but the explode would require too much introspection to make it generic and robust enough for highly nested datasets.

Hoping Spark has a deterministic String serialization of the map, as you suggested, I think that will be the best compromise for my use case. Once I have time to try it out I will report back here.

Thanks for the great library, I have been using it for a few years now and I like it very much!

@EnricoMi

You could always write your own serialization UDF.
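
A minimal sketch of such a UDF (assuming the my_field column from the question; the struct values are cast to strings first so the UDF only has to handle Map[String, String]):

import org.apache.spark.sql.functions.udf

// Serialize the map with its keys sorted, so the resulting string is deterministic.
val sortedMapString = udf { m: Map[String, String] =>
  Option(m)
    .map(_.toSeq.sortBy(_._1).map { case (k, v) => s"$k -> $v" }.mkString("{", ", ", "}"))
    .orNull
}

df.select(sortedMapString($"my_field".cast("map<string,string>")).as("my_field_str"))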

Thanks for the feedback!
