[SPARK-21043][SQL] Add unionByName in Dataset #18300
Conversation
Test build #78040 has finished for PR 18300 at commit
Test build #78041 has finished for PR 18300 at commit
Test build #78044 has finished for PR 18300 at commit
Test build #78046 has finished for PR 18300 at commit
You can use sparkSession.sessionState.conf.resolver to compare the column names. The target of this PR is to build a Project by column name comparison. Could we simplify the implementation by using a for loop with find + resolver?
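As a rough, plain-Python model of this suggestion (a resolver is just a name-equality check whose case sensitivity follows the session config; all names below are illustrative, not Spark's actual identifiers):

```python
def make_resolver(case_sensitive):
    """Return a name-equality function modeled on Spark's analyzer resolver."""
    if case_sensitive:
        return lambda a, b: a == b
    return lambda a, b: a.lower() == b.lower()

def find_by_name(attrs, name, resolver):
    """Return the first attribute whose name matches, or None (the find + resolver loop)."""
    return next((a for a in attrs if resolver(name, a)), None)

resolver = make_resolver(case_sensitive=False)
print(find_by_name(["C", "a", "B"], "b", resolver))  # prints B
```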
* followed by a [[distinct]].
*
* The difference between this function and [[union]] is that this function
* resolves columns by name:
Nit: by name -> by name (not by position)
@@ -1764,6 +1764,68 @@ class Dataset[T] private[sql](
  }

  /**
   * Returns a new Dataset containing union of rows in this Dataset and another Dataset.
   *
   * To do a SQL-style set union (that does deduplication of elements), use this function
Also add a comment: This is different from both UNION ALL and UNION DISTINCT in SQL.
ok, I'll update soon.
    s"""Cannot resolve column name "${lattr.name}" among """ +
      s"""(${rightOutputAttrs.map(_.name).mkString(", ")})""")
  }
}
@gatorsmile How about this impl.?
Test build #78073 has finished for PR 18300 at commit
Test build #78076 has finished for PR 18300 at commit
val resolver = sparkSession.sessionState.analyzer.resolver
val rightProjectList = mutable.ArrayBuffer.empty[Attribute]
val rightOutputAttrs = right.output
for (lattr <- left.output) {
Since left and right always have the same number of columns (after L1796 assertAnalyzed()), we do not need an ArrayBuffer if we use map to build the Project of right. For example, left.map { lattr =>
Aha, ok.
for (lattr <- left.output) {
  // To handle duplicate names, we first compute diff between `rightOutputAttrs` and
  // already-found attrs in `rightProjectList`.
  rightOutputAttrs.diff(rightProjectList).find { rattr => resolver(lattr.name, rattr.name) }
Inside the map, we can find the column names by using filter + resolver:
- If the number of found columns is larger than one, throw an error for duplicate names.
- If the number is zero, throw an error.
- If the number is one, return the right-side attribute.
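A plain-Python model of this three-case rule, matching one left column against the right side with filter + resolver (function and variable names here are illustrative, not Spark's):

```python
def make_resolver(case_sensitive):
    """Name-equality check modeled on Spark's analyzer resolver."""
    if case_sensitive:
        return lambda a, b: a == b
    return lambda a, b: a.lower() == b.lower()

def resolve_right_attr(left_name, right_names, resolver):
    """Filter the right-side names, then apply the three-case rule."""
    found = [r for r in right_names if resolver(left_name, r)]
    if len(found) > 1:
        raise ValueError(f'Found duplicate column(s) for "{left_name}": {found}')
    if not found:
        raise ValueError(
            f'Cannot resolve column name "{left_name}" among ({", ".join(right_names)})')
    return found[0]

resolver = make_resolver(case_sensitive=False)
print(resolve_right_attr("a", ["c", "a", "b"], resolver))  # prints a
```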
In this logic, it seems we cannot catch left-column duplication. How about checking column name duplication first, then building the right project list?
// Check column name duplication in both sides first
val leftOutputAttrs = left.output
val rightOutputAttrs = right.output
val caseSensitiveAnalysis = sparkSession.sessionState.conf.caseSensitiveAnalysis
SchemaUtils.checkColumnNameDuplication(
  leftOutputAttrs.map(_.name), "left column names", caseSensitiveAnalysis)
SchemaUtils.checkColumnNameDuplication(
  rightOutputAttrs.map(_.name), "right column names", caseSensitiveAnalysis)

// Then, builds a project list for `other` based on `logicalPlan` output names
val resolver = sparkSession.sessionState.analyzer.resolver
val rightProjectList = leftOutputAttrs.map { lattr =>
  val foundAttrs = rightOutputAttrs.filter { rattr => resolver(lattr.name, rattr.name) }
  // The duplication checks above guarantee at most one match per name
  assert(foundAttrs.size <= 1)
  foundAttrs.headOption.getOrElse {
    throw new AnalysisException(s"""Cannot resolve column name "${lattr.name}" among """ +
      s"""(${rightOutputAttrs.map(_.name).mkString(", ")})""")
  }
}
(I used SchemaUtils here, implemented in #17758: https://github.com/apache/spark/pull/17758/files#diff-dc9b15e4af298799d788b59d2baf96a9R29)
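A plain-Python sketch in the spirit of the SchemaUtils.checkColumnNameDuplication approach proposed above, checking one side's column names for duplicates under a case-sensitivity flag (names are illustrative, not Spark's):

```python
from collections import Counter

def check_column_name_duplication(names, col_type, case_sensitive):
    """Raise if any column name appears more than once on one side."""
    keys = names if case_sensitive else [n.lower() for n in names]
    dups = [n for n, c in Counter(keys).items() if c > 1]
    if dups:
        raise ValueError(
            f"Found duplicate column(s) in the {col_type}: {', '.join(dups)}")

check_column_name_duplication(["a", "b", "c"], "left attributes", True)  # passes silently
```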
I am fine with this. If so, we just need to use find to get the first matched column.
okay, so I'll update after #17758 is finished.
 */
def unionByName(other: Dataset[T]): Dataset[T] = withSetOperator {
  // Creates a `Union` node and resolves it first to reorder output attributes in `other` by name
  val unionPlan = sparkSession.sessionState.executePlan(Union(logicalPlan, other.logicalPlan))
Is this always resolvable? If the columns don't have the same data type, the Union may not be resolved.
In that case, I think we couldn't pass unionPlan.assertAnalyzed() below?
Yeah, so we don't plan to support it?
I think (as you already know) TypeCoercion in Analyzer resolves compatible types for that case, like: https://github.com/apache/spark/pull/18300/files#diff-5d2ebf4e9ca5a990136b276859769289R122. Are you suggesting other cases?
hmm, I mean a case like:

val df1 = Seq((1, "2", 3.4)).toDF("a", "b", "c")
val df2 = Seq((6.7, 4, "5")).toDF("c", "a", "b")

And the result should be Row(1, "2", 3.4) :: Row(4, "5", 6.7). That's what I guess unionByName should do?
Forcibly widening the types looks a bit weird to me, because after the union the schema differs from the original datasets. Or maybe I'm missing the purpose of this API?
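The reordering this comment expects can be sketched in plain Python: the right-hand rows are permuted into the left-hand column order before the union (illustrative names, and this toy assumes each name appears exactly once on the right):

```python
def union_by_name(left_cols, left_rows, right_cols, right_rows):
    """Permute right rows into left column order, then concatenate."""
    idx = [right_cols.index(c) for c in left_cols]  # assumes unique, matching names
    return left_rows + [tuple(r[i] for i in idx) for r in right_rows]

rows = union_by_name(
    ["a", "b", "c"], [(1, "2", 3.4)],
    ["c", "a", "b"], [(6.7, 4, "5")])
print(rows)  # [(1, '2', 3.4), (4, '5', 6.7)]
```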
Aha, I see. This is a bug, so I'll look into this. Thanks!
The goal is just to union by name while keeping the union semantics.
@viirya How about this?
oh, the current one does not work well..., so I need to consider it more.
Test build #78252 has finished for PR 18300 at commit
val unionDf = df1.unionByName(df2.unionByName(df3))
checkAnswer(unionDf,
  Row(1, "a", 3.0) :: Row(2, "bc", 1.2) :: Row(3, "def", 1.2) :: Nil
)
Hi, @maropu.
To be clearer, could you add more test cases requiring type coercion here?
yea, sure.
Test build #78385 has finished for PR 18300 at commit
def unionByName(other: Dataset[T]): Dataset[T] = withSetOperator {
  // Resolves children first to reorder output attributes in `other` by name
  val leftPlan = sparkSession.sessionState.executePlan(logicalPlan)
  val rightPlan = sparkSession.sessionState.executePlan(other.logicalPlan)
I think a Dataset already guarantees its plan is analyzed and passes checks? Do we need to resolve the plans again?
yea, it seems we don't need to. Removed, thanks!
logicalPlan and other.logicalPlan are already analyzed plans. Looks like you just access the analyzed plans below, so can we simply use logicalPlan and other.logicalPlan?
// SchemaUtils.checkColumnNameDuplication(
//   rightOutputAttrs.map(_.name),
//   "in the right attributes",
//   sparkSession.sessionState.conf.caseSensitiveAnalysis)
Why is the code above commented out?
The function to check name duplication is discussed in #17758. I'm planning to use that function to check for duplication and then do the union-by-name. See the discussion: #18300 (comment)
Test build #78425 has finished for PR 18300 at commit
retest this please.
Test build #78438 has finished for PR 18300 at commit
Retest this please
Test build #79234 has finished for PR 18300 at commit
Test build #79268 has finished for PR 18300 at commit
  Row(1, "a", 3.0) :: Row(2, "bc", 1.2) :: Row(3, "def", 1.2) :: Nil
)

// Failure cases
Could we split the test case test("union by name") into multiple ones?
ok
LGTM, waiting for #17758
Test build #79457 has finished for PR 18300 at commit
+1, LGTM, too.
  df1.unionByName(df2)
}.getMessage
assert(errMsg.contains("Found duplicate column(s) in the left attributes:"))
df1 = Seq((1, 1)).toDF("c0", "c1")
Nit: indents.
Updated
Test build #79482 has finished for PR 18300 at commit
Thanks! Merging to master.
What changes were proposed in this pull request?
This pr added unionByName in Dataset. Here is how to use:

How was this patch tested?
Added tests in DataFrameSuite.
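The full behavior discussed in this PR can be sketched end to end in plain Python: reject duplicate column names on either side, then permute the right-hand rows into the left-hand column order before concatenating. This is a hedged model of the semantics, not Spark's implementation, and all names below are illustrative:

```python
def union_by_name(left, right, case_sensitive=True):
    """left/right are (column_names, rows) pairs; returns (columns, rows)."""
    lcols, lrows = left
    rcols, rrows = right
    # Duplicate-name check on both sides, like SchemaUtils.checkColumnNameDuplication
    for side, cols in (("left", lcols), ("right", rcols)):
        keys = cols if case_sensitive else [c.lower() for c in cols]
        if len(set(keys)) != len(keys):
            raise ValueError(f"Found duplicate column(s) in the {side} attributes: {cols}")
    # Resolve each left column against the right side by name
    def match(name):
        for i, c in enumerate(rcols):
            if (c == name) if case_sensitive else (c.lower() == name.lower()):
                return i
        raise ValueError(f'Cannot resolve column name "{name}" among ({", ".join(rcols)})')
    idx = [match(c) for c in lcols]
    return lcols, lrows + [tuple(r[i] for i in idx) for r in rrows]

cols, rows = union_by_name(
    (["a", "b", "c"], [(1, "2", 3.4)]),
    (["c", "a", "b"], [(6.7, 4, "5")]))
print(cols, rows)  # ['a', 'b', 'c'] [(1, '2', 3.4), (4, '5', 6.7)]
```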