
[SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns #29587

Closed · wants to merge 25 commits into master from SPARK-32376

Conversation

viirya (Member) commented Aug 31, 2020

What changes were proposed in this pull request?

SPARK-29358 added support for unionByName to work when the two datasets don't necessarily have the same schema, but it does not work with nested columns like structs. This patch adds support for struct columns.

The behavior before this PR:

scala> val df1 = spark.range(1).selectExpr("id c0", "named_struct('c', id + 1, 'b', id + 2, 'a', id + 3) c1")
scala> val df2 = spark.range(1).selectExpr("id c0", "named_struct('c', id + 1, 'b', id + 2) c1")
scala> df1.unionByName(df2, true).printSchema
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. struct<c:bigint,b:bigint> <> struct<c:bigint,b:bigint,a:bigint> at the second column of the second table;;
'Union false, false
:- Project [id#0L AS c0#2L, named_struct(c, (id#0L + cast(1 as bigint)), b, (id#0L + cast(2 as bigint)), a, (id#0L + cast(3 as bigint))) AS c1#3]
:  +- Range (0, 1, step=1, splits=Some(12))
+- Project [c0#8L, c1#9]
   +- Project [id#6L AS c0#8L, named_struct(c, (id#6L + cast(1 as bigint)), b, (id#6L + cast(2 as bigint))) AS c1#9]
      +- Range (0, 1, step=1, splits=Some(12))

The behavior after this PR:

scala> df1.unionByName(df2, true).printSchema
root
 |-- c0: long (nullable = false)
 |-- c1: struct (nullable = false)
 |    |-- a: long (nullable = true)
 |    |-- b: long (nullable = false)
 |    |-- c: long (nullable = false)
scala> df1.unionByName(df2, true).show()
+---+-------------+
| c0|           c1|
+---+-------------+
|  0|    {3, 2, 1}|
|  0| {null, 2, 1}|
+---+-------------+

Why are the changes needed?

The allowMissingColumns flag of unionByName allows merging two datasets with different schemas when unioning them together. Nested column support makes the feature more general and flexible to use.

Does this PR introduce any user-facing change?

Yes. After this change, users can union two datasets whose struct columns have different schemas.

How was this patch tested?

Unit tests.


SparkQA commented Aug 31, 2020

Test build #128065 has finished for PR 29587 at commit 95e8fd4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

viirya (Member, Author) commented Aug 31, 2020

GitHub Actions passed. I think it should be ready for review, but I will still add some comments to the code later.

@viirya viirya changed the title [WIP][SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns Aug 31, 2020

SparkQA commented Aug 31, 2020

Test build #128069 has finished for PR 29587 at commit 5db1e0f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

private def addFieldsInto(col: Expression, base: String, fields: Seq[StructField]): Expression = {
var currCol = col

Member:
To remove var here, could we use fields.foldLeft(col) { case (currCol, f) => instead?

Member Author:
looks good. rewritten. thanks.
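
As an aside, here is a self-contained illustration of the var-to-foldLeft pattern being suggested (generic strings rather than Catalyst expressions; not the actual addFieldsInto body):

// Before: a mutable accumulator updated per element.
val fields = Seq("a", "b", "c")
var acc = "root"
fields.foreach { f => acc = acc + "." + f }

// After: foldLeft threads the accumulator through each step, no var needed.
val folded = fields.foldLeft("root") { case (currCol, f) => currCol + "." + f }

assert(acc == folded) // both are "root.a.b.c"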

* `col` expression.
*/
private def addFields(col: NamedExpression, target: StructType): Option[Expression] = {
require(col.dataType.isInstanceOf[StructType], "Only support StructType.")

Member:
assert instead?
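
For context on the suggestion: require throws IllegalArgumentException (a caller error), while assert throws AssertionError (a broken internal invariant) and can be disabled with -Xdisable-assertions. A minimal sketch of the suggested change, mirroring the line under review:

import org.apache.spark.sql.types.{DataType, StructType}

// Treat the struct-type precondition as an internal invariant.
def checkStruct(dt: DataType): Unit =
  assert(dt.isInstanceOf[StructType], "Only support StructType.")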

* Returns a `StructType` that contains missing fields recursively from `source` to `target`.
* Note that this doesn't support looking into array type and map type recursively.
*/
def findMissingFields(source: StructType, target: StructType, resolver: Resolver): StructType = {

Member:
Do we need to define this method on the StructType side instead of as a private method in ResolveUnion?

Member Author:
I feel this is a more general method related to StructType, so I'm putting it in StructType.

Member:
Okay. One nit: wouldn't a return type of Option[StructType] be more natural for the findXXX methods, just like in the Scala collections (e.g., Seq.find)?

Member Author:
OK, it sounds good.
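
A sketch of the agreed-upon signature change. This is a top-level-only simplification for illustration; the actual method also recurses into nested structs, per the doc comment above:

import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.types.{StructField, StructType}

// Returning Option mirrors Seq.find: None when no fields are missing.
def findMissingFields(
    source: StructType,
    target: StructType,
    resolver: Resolver): Option[StructType] = {
  val missing = target.filterNot { tf =>
    source.exists(sf => resolver(sf.name, tf.name))
  }
  if (missing.isEmpty) None else Some(StructType(missing))
}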


/**
* Returns a `StructType` that contains missing fields recursively from `source` to `target`.
* Note that this doesn't support looking into array type and map type recursively.

maropu (Member) commented Aug 31, 2020:

Where does this limitation come from? Do we not need to support this case, or is supporting it technically difficult? Ah, I see, this is an unsupported case, right?
https://github.com/apache/spark/pull/29587/files#diff-4d656d696512d6bcb03a48f7e0af6251R106-R107

Member Author:
I leverage WithFields to add missing nested fields into structs. WithFields doesn't support array or map types currently.
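
For reference, this is the Column API backed by WithFields; a REPL sketch assuming Spark 3.1+, where withField is available (output abbreviated):

scala> val df = spark.range(1).selectExpr("named_struct('a', id) s")
scala> df.select($"s".withField("b", lit(null).cast("long")).alias("s")).printSchema
root
 |-- s: struct (nullable = false)
 |    |-- a: long (nullable = false)
 |    |-- b: long (nullable = true)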

}.getOrElse(found.get) // Data type doesn't change. We should add fields at other side.
case _ =>
// Same struct type, or
// unsupported: different types, array or map types, or

Member:
TODO work?

Member Author:
Array and map types aren't supported by WithFields. I think it is still possible to add them to WithFields. Once WithFields supports these types, we can add them here too.

Member:
Ah, ok.

@@ -103,4 +104,30 @@ class StructTypeSuite extends SparkFunSuite {
val interval = "`a` INTERVAL"
assert(fromDDL(interval).toDDL === interval)
}

test("find missing (nested) fields") {

Member:
Could you test the behaviours of unsupported cases (array and map), too?

Member Author:
OK, sure.

@@ -546,7 +547,8 @@ case class StringToMap(text: Expression, pairDelim: Expression, keyValueDelim: E
 case class WithFields(
     structExpr: Expression,
     names: Seq[String],
-    valExprs: Seq[Expression]) extends Unevaluable {
+    valExprs: Seq[Expression],
+    sortColumns: Boolean = false) extends Unevaluable {

Member:
How about defining toString so that this value is not displayed in explain output?

Member:
nit: sortColumns -> sortOutputColumns?

Member Author:
Changed sortColumns to sortOutputColumns. I'm not sure whether we want to hide sortColumns, though?

maropu (Member) commented Sep 3, 2020:

I thought this param is not related to the withField operation, so I left the comment. But either is okay (just a suggestion).

scala> val df = sql("SELECT named_struct('a', 1, 'b', 2) struct_col")
scala> df.select($"struct_col".withField("c", lit(3))).explain(true)
== Analyzed Logical Plan ==
with_fields(struct_col, 3): struct<a:int,b:int,c:int>
Project [with_fields(struct_col#0, c, 3, false) AS with_fields(struct_col, 3)#4]
                                         ^^^^^
+- Project [named_struct(a, 1, b, 2) AS struct_col#0]
   +- OneRowRelation

Member Author:
I'm not certain if we want to show it or not. Let's keep it as is and see what others think.
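
A hedged sketch of what such an override could look like (hypothetical formatting, not the merged implementation), listing only the struct expression and the name/value pairs while dropping the flag:

override def toString: String = {
  val fieldOps = names.zip(valExprs)
    .map { case (name, value) => s"$name, $value" }
    .mkString(", ")
  s"with_fields($structExpr, $fieldOps)"
}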


SparkQA commented Sep 1, 2020

Test build #128125 has finished for PR 29587 at commit 8bec8a3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

viirya (Member, Author) commented Sep 2, 2020


SparkQA commented Sep 2, 2020

Test build #128185 has finished for PR 29587 at commit 2515d78.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

viirya (Member, Author) commented Sep 2, 2020

retest this please


SparkQA commented Sep 3, 2020

Test build #128211 has finished for PR 29587 at commit 2515d78.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

/**
* Adds missing fields recursively into given `col` expression. The missing fields are given

Member:
nit: there are unnecessary spaces at the beginning of the sentences.

Member Author:
ok.

Project(leftOutputAttrs ++ missingAttrs, left)
// Add missing (nested) fields to left plan.
val (leftProjectList, _) = compareAndAddFields(rightChild, left, allowMissingCol)
if (leftProjectList.map(_.toAttribute) != left.output) {

Member:
nit:

      if (leftProjectList.length != left.output.length ||
          leftProjectList.map(_.toAttribute) != left.output) {

?

Member Author:
Doesn't leftProjectList.map(_.toAttribute) != left.output already cover leftProjectList.length != left.output.length?
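
For the record, Seq equality in Scala already accounts for length, so the shorter check suffices; a quick REPL check:

scala> Seq(1, 2) == Seq(1, 2, 3)
res0: Boolean = false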


SparkQA commented Sep 3, 2020

Test build #128233 has finished for PR 29587 at commit 4398e77.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

viirya (Member, Author) commented Sep 3, 2020

retest this please


SparkQA commented Sep 3, 2020

Test build #128262 has finished for PR 29587 at commit 4398e77.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

viirya (Member, Author) commented Sep 8, 2020

@maropu Any more comments? cc @cloud-fan @HyukjinKwon

// Builds a project list for `right` based on `left` output names
val (rightProjectList, aliased) = compareAndAddFields(left, right, allowMissingCol)


Member:
nit: remove this unnecessary blank line.

val foundDt = found.get.dataType
(foundDt, lattr.dataType) match {
case (source: StructType, target: StructType)
if allowMissingCol && !source.sameType(target) =>

Member:
To make the logic simpler, could we filter out all the unsupported cases (e.g., nested struct in array) here? It would look like this:

          case (source: StructType, target: StructType)
              if allowMissingCol && canMergeSchemas(source, target) =>

Member Author:
Hmm, I'm not sure where we can simplify the logic? By adding canMergeSchemas, doesn't it look more complicated?

maropu (Member) commented Sep 11, 2020:

I read the comment and at first thought that all the unsupported cases are handled in lines 108-112. But does addFields returning None also mean an unsupported case? This might be an issue that can be fixed just by improving the comments, though.

Member Author:
I see. I will add more comments explaining this.

val missing1 = StructType.fromDDL(
"c2 STRUCT<c3: INT, c4: STRUCT<c5: INT, c6: INT>>")
assert(StructType.findMissingFields(source1, schema, resolver)
.map(_.sameType(missing1)).getOrElse(false))

Member:
nit: could we use .exists() instead?
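
That is, Option.exists(p) is equivalent to map(p).getOrElse(false), so the assertion above can be written as:

assert(StructType.findMissingFields(source1, schema, resolver)
  .exists(_.sameType(missing1)))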

test("find missing (nested) fields") {
val schema = StructType.fromDDL(
"c1 INT, c2 STRUCT<c3: INT, c4: STRUCT<c5: INT, c6: INT>>")
val resolver = SQLConf.get.resolver

Member:
Could you add some tests for case-sensitivity?

Member Author:
yes, added.
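
A hedged sketch of what such a case-sensitivity test could look like (hypothetical field names; assumes the Option-returning findMissingFields discussed above):

import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.types.StructType

val source = StructType.fromDDL("C1 INT")
val target = StructType.fromDDL("c1 INT, c2 INT")
// With case-insensitive resolution, C1 matches c1, so only c2 is missing.
assert(StructType.findMissingFields(source, target, caseInsensitiveResolution)
  .exists(_.sameType(StructType.fromDDL("c2 INT"))))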

@@ -536,4 +536,71 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession {
assert(union2.schema.fieldNames === Array("a", "B", "C", "c"))
}
}

Member:
Could you add some tests for case-sensitivity here, too?

Member Author:
Added.

@@ -536,4 +536,71 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession {
assert(union2.schema.fieldNames === Array("a", "B", "C", "c"))
}
}

test("SPARK-32376: Make unionByName null-filling behavior work with struct columns - 1") {

Member:
Could you make the test title clearer? e.g.,
- 1 -> simple tests
- 2 -> nested cases

Member Author:
sure.


/**
* Resolves different children of Union to a common set of columns.
*/
object ResolveUnion extends Rule[LogicalPlan] {
private def unionTwoSides(
/**
* This method sorts recursively columns in a struct expression based on column names.

Contributor:
sorts recursively columns -> sorts columns recursively


def simplifyWithFields(expr: Expression): Expression = {
expr.transformUp {
case UpdateFields(UpdateFields(struct, fieldOps1), fieldOps2) =>

Contributor:
Is it important to have this optimization inside this analyzer rule?

Member Author:
Yeah. Without optimizing the expressions, we cannot scale well for deeply nested schemas; see the added test "SPARK-32376: Make unionByName null-filling behavior work with struct columns - deep expr." in DataFrameSetOperationsSuite.

Member Author:
Actually, I plan to move this optimization out of ResolveUnion into a separate analyzer rule in #29812. For complex, deeply nested schemas, it is easy to write an inefficient expression tree that is very slow in the analysis phase. Without this optimization, the test case in this PR cannot evaluate the query at all; with it, the query evaluates normally.
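
The core of the collapse is the rule quoted above: adjacent UpdateFields over the same struct merge their field operations, keeping the expression tree shallow. A minimal sketch of the rewrite:

expr.transformUp {
  case UpdateFields(UpdateFields(struct, fieldOps1), fieldOps2) =>
    UpdateFields(struct, fieldOps1 ++ fieldOps2)
}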

// We need to sort columns in result, because we might add another column in other side.
// E.g., we want to union two structs "a int, b long" and "a int, c string".
// If we don't sort, we will have "a int, b long, c string" and
// "a int, c string, b long", which are not compatible.

Contributor:
is this behavior consistent with top-level columns?

Member:
I think this is related to the comment: #29587 (comment)

assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")

val resolver = SQLConf.get.resolver
val missingFields =

Contributor:
The name is a bit misleading; I thought it was a Seq. How about missingFieldsOpt?

Member Author:
Good. Fixed.

addFieldsInto(ExtractValue(currCol, Literal(field.name), resolver), st.fields))
}
case dt =>
UpdateFields(currCol, field.name, Literal(null, dt))

Contributor:
what if byName is true but allowMissingCol is false?

Member Author:
If allowMissingCol is false, we don't compare and add top-level/nested columns. If two sides have inconsistent schema, the union doesn't pass analysis.

Contributor:
The top-level columns support byName and allowMissingCol individually, shall we do it for nested columns as well? Or we plan to do it in followup?

Member Author:
Good question. byName support actually means we need to adjust columns between two sides to have a consistent schema. It could be top-level or nested column cases.

So it is actually the same issue as #29587 (comment), i.e., adjusting the nested columns to have a more natural schema. As replied in that discussion, I plan to do it in a followup.

@@ -2740,6 +2740,19 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val UNION_BYNAME_STRUCT_SUPPORT_ENABLED =

Contributor:
I'm not sure this is worth a config. It's natural that the byName and allowMissingCol flags should apply to nested columns, and these two flags are newly added in the master branch, so there are no backward-compatibility issues.

Member Author:
Ok, sounds more correct. I will remove this config.


SparkQA commented Oct 15, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34376/


SparkQA commented Oct 15, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34376/


SparkQA commented Oct 15, 2020

Test build #129770 has finished for PR 29587 at commit 9e73928.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

viirya (Member, Author) commented Oct 16, 2020

I will wait for a while for further comments. If no more comments from others, I will merge this and work on the followup. Thanks!


SparkQA commented Oct 16, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34509/


SparkQA commented Oct 16, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34509/


SparkQA commented Oct 16, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34515/


SparkQA commented Oct 16, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34515/


SparkQA commented Oct 16, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34520/


SparkQA commented Oct 16, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34520/


SparkQA commented Oct 16, 2020

Test build #129903 has finished for PR 29587 at commit c07e30f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya closed this in e574fcd Oct 16, 2020
viirya (Member, Author) commented Oct 16, 2020

Thanks for reviewing. Merged to master.


SparkQA commented Oct 16, 2020

Test build #129909 has finished for PR 29587 at commit 2ca1379.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Oct 17, 2020

Test build #129914 has finished for PR 29587 at commit 3d907d0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya deleted the SPARK-32376 branch December 27, 2023 18:28