[SPARK-36797][SQL] Union should resolve nested columns as top-level columns #34038
Conversation
|column types. ${dt1.catalogString} <> ${dt2.catalogString} at the
|${ordinalNumber(ci)} column of the ${ordinalNumber(ti + 1)} table
""".stripMargin.replace("\n", " ").trim())
if (!isUnion) {
Not sure if we should also generalize this to all set operations? It looks reasonable, but by their API definitions the other set operations don't seem to have the by-position behavior documented the way Union does.
How are top-level columns handled for other set operations? In general I feel Spark SQL is built around column names by default, not positions, so I would expect it to be by-name. I was surprised to realize recently that union is by-position.
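To illustrate the distinction being discussed (a minimal sketch, assuming a local SparkSession; the DataFrames are invented for the example):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df1 = Seq((1, "a")).toDF("id", "name")
val df2 = Seq(("b", 2)).toDF("name", "id")

// union pairs columns by position: df2's first column ("name") is unioned
// into df1's "id", relying on type coercion rather than matching names.
val byPosition = df1.union(df2)

// unionByName pairs columns by name instead, reordering df2's columns.
val byName = df1.unionByName(df2)
```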
I think these set operations work basically the same. But in the API docs we only document this behavior for union, not for the other set operations. The by-position resolution for union, I think, follows SQL: it only requires the unioned columns to have compatible data types in the same order, not matching column names.
I think it's better to make top-level and nested columns consistent in the other set operations as well, i.e., do by-position resolution there too. We can't go in the other direction, as that would be a breaking change.
Ok, I think that makes more sense. I will make the other set operations by-position at the nested-column level too.
BTW I will make the change for the other set operations in another PR (JIRA). It might require more changes (docs, tests, ...).
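A hedged sketch of the behavior this PR targets (the struct literals and session setup are invented for illustration, not taken from the patch): previously only top-level columns were resolved by position, so structs whose fields differed only in name could be rejected; with this change the nested fields are matched by position as well:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val df1 = spark.sql("SELECT named_struct('a', 1, 'b', 'x') AS s")
val df2 = spark.sql("SELECT named_struct('c', 2, 'd', 'y') AS s")

// With this patch, the struct fields are resolved by position
// (a <-> c, b <-> d) just like top-level columns, instead of
// tripping over the field-name mismatch.
df1.union(df2).printSchema()
```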
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #143438 has finished for PR 34038 at commit
Thank you for pinging me, @viirya.
if (!isUnion) {
  dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) =>
    // SPARK-18058: we shall not care about the nullability of columns
    if (TypeCoercion.findWiderTypeForTwo(dt1.asNullable, dt2.asNullable).isEmpty) {
      failAnalysis(
        s"""
           |${operator.nodeName} can only be performed on tables with the compatible
           |column types. ${dt1.catalogString} <> ${dt2.catalogString} at the
           |${ordinalNumber(ci)} column of the ${ordinalNumber(ti + 1)} table
         """.stripMargin.replace("\n", " ").trim())
    }
  }
} else {
  // `TypeCoercion` takes care of type coercion already. If any columns or nested
  // columns are not compatible, we detect it here and throw analysis exception.
  val typeChecker = (dt1: DataType, dt2: DataType) => {
    !TypeCoercion.findWiderTypeForTwo(dt1.asNullable, dt2.asNullable).isEmpty
  }
  dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) =>
    if (!DataType.equalsStructurally(dt1, dt2, true, typeChecker)) {
      failAnalysis(
        s"""
           |${operator.nodeName} can only be performed on tables with the compatible
           |column types. ${dt1.catalogString} <> ${dt2.catalogString} at the
           |${ordinalNumber(ci)} column of the ${ordinalNumber(ti + 1)} table
         """.stripMargin.replace("\n", " ").trim())
    }
Maybe we can simplify like:
val dataTypesAreCompatibleFn = if (isUnion) {
  // `TypeCoercion` takes care of type coercion already. If any columns or nested
  // columns are not compatible, we detect it here and throw analysis exception.
  val typeChecker = (dt1: DataType, dt2: DataType) => {
    !TypeCoercion.findWiderTypeForTwo(dt1.asNullable, dt2.asNullable).isEmpty
  }
  (dt1: DataType, dt2: DataType) =>
    !DataType.equalsStructurally(dt1, dt2, true, typeChecker)
} else {
  // SPARK-18058: we shall not care about the nullability of columns
  (dt1: DataType, dt2: DataType) =>
    TypeCoercion.findWiderTypeForTwo(dt1.asNullable, dt2.asNullable).isEmpty
}
dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) =>
  if (dataTypesAreCompatibleFn(dt1, dt2)) {
    failAnalysis(
      s"""
         |${operator.nodeName} can only be performed on tables with the compatible
         |column types. ${dt1.catalogString} <> ${dt2.catalogString} at the
         |${ordinalNumber(ci)} column of the ${ordinalNumber(ti + 1)} table
       """.stripMargin.replace("\n", " ").trim())
  }
}
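To see how a structural check with a pluggable compatibility function behaves, here is a standalone toy (invented types standing in for Spark's DataType, not the actual implementation): the struct shape must match, leaf compatibility is delegated to the function, and field names are deliberately ignored, which is exactly the by-position spirit discussed above.

```scala
// Toy data-type ADT standing in for Spark's DataType (illustration only).
sealed trait DT
case object IntT extends DT
case object StringT extends DT
case class StructT(fields: Seq[(String, DT)]) extends DT

// Structural comparison: same shape required, leaf compatibility decided by
// `compat`, field names deliberately ignored (by-position resolution).
def equalsStructurally(a: DT, b: DT, compat: (DT, DT) => Boolean): Boolean =
  (a, b) match {
    case (StructT(fa), StructT(fb)) =>
      fa.length == fb.length &&
        fa.zip(fb).forall { case ((_, da), (_, db)) =>
          equalsStructurally(da, db, compat)
        }
    case _ => compat(a, b)
  }

val sameLeaf = (x: DT, y: DT) => x == y // crude stand-in for findWiderTypeForTwo
// Same shape, different field names: compatible by position.
val ok = equalsStructurally(StructT(Seq("a" -> IntT)), StructT(Seq("b" -> IntT)), sameLeaf) // true
```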
...alyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
Kubernetes integration test starting
Kubernetes integration test status failure
retest this please
Test build #143519 has finished for PR 34038 at commit
retest this please
Test build #143520 has finished for PR 34038 at commit
  // Otherwise, record the result in the queue and find the type for the next column
  case Some(widenType) =>
-   castedTypes.enqueue(widenType)
+   castedTypes.enqueue(Some(widenType))
    getWidestTypes(children, attrIndex + 1, castedTypes)
the code can be simplified:
findWiderCommonType(children.map(_.output(attrIndex).dataType)).map { widenTypeOpt =>
  castedTypes.enqueue(widenTypeOpt)
  getWidestTypes(children, attrIndex + 1, castedTypes)
}
`findWiderCommonType` returns `Option[DataType]`. `.map` can iterate over the `DataType` if any, but we still need to enqueue the `None`.
Just simplified to
val widenTypeOpt = findWiderCommonType(children.map(_.output(attrIndex).dataType))
castedTypes.enqueue(widenTypeOpt)
getWidestTypes(children, attrIndex + 1, castedTypes)
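The pitfall being pointed out can be seen with plain Scala (a standalone sketch, not Spark code; the names are made up):

```scala
import scala.collection.mutable

val castedTypes = mutable.Queue.empty[Option[Int]]

def record(widened: Option[Int]): Unit = {
  // `.map` only runs its body in the Some case, so a None result would
  // never reach the queue and the "no common type" slot would be lost:
  //   widened.map(w => castedTypes.enqueue(Some(w)))
  // Enqueueing the Option itself records both outcomes:
  castedTypes.enqueue(widened)
}

record(Some(1))
record(None)
// castedTypes now holds Some(1) followed by None.
```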
org.apache.spark.SparkException
Failed to merge incompatible data types decimal(38,18) and string
org.apache.spark.sql.AnalysisException
Union can only be performed on tables with the compatible column types. string <> decimal(38,18) at the first column of the second table
string <> decimal(38,18) at the first column of the second table
is it valid English syntax?
Ha, this comes from `CheckAnalysis`'s original error message. We can improve it, although there are some more tests relying on the error message.
Updated the error message. Please let me know if it looks good to you.
Test build #143589 has finished for PR 34038 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #143607 has finished for PR 34038 at commit
Test build #143620 has finished for PR 34038 at commit
Test build #143624 has finished for PR 34038 at commit
Test build #143628 has finished for PR 34038 at commit
thanks, merging to master!
What changes were proposed in this pull request?
This patch proposes to generalize the resolving-by-position behavior to nested columns for Union.
Why are the changes needed?
Union, by its API definition, resolves columns by position. Currently we only follow this behavior for top-level columns, not nested columns.
As we are making nested columns first-class citizens, limiting by-position resolution to top-level columns, and the resulting inconsistency between top-level and nested columns, no longer makes sense. We should resolve nested columns by position, just like top-level columns, for Union.
Does this PR introduce any user-facing change?
Yes. After this change, Union also resolves nested columns by position.
How was this patch tested?
Added tests.