[SPARK-39887][SQL] RemoveRedundantAliases should keep aliases that make the output of projection nodes unique #37334
Conversation
@@ -559,6 +559,17 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
         })
         Join(newLeft, newRight, joinType, newCondition, hint)

+      case _: Union =>
Shall we update the method comment as well?
Thanks, fixed in db6c73f
Force-pushed from 2eb2d6a to db6c73f
@@ -568,9 +581,6 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
       newChild
     }

-    // Create the attribute mapping. Note that the currentNextAttrPairs can contain duplicate
-    // keys in case of Union (this is caused by the PushProjectionThroughUnion rule); in this
-    // case we use the first mapping (which should be provided by the first child).
Since we remove this comment, shall we add an assert to make sure there are no duplicate keys?
added in 11020ee
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
Force-pushed from d5193ce to c546757
Force-pushed from c546757 to 11020ee
    child match {
      case p: Project =>
        var passThroughAttributes =
          AttributeSet(p.projectList.filter(_.isInstanceOf[Attribute]))
I'm not sure this works. The `Project` may not be the direct child of `Union`. I think we should add a new boolean flag to `removeRedundantAliases` which indicates that removing an alias should not introduce conflicting expr IDs.
Or we should always guarantee it. The plan looks quite weird if a plan node has output columns with conflicting expr IDs.
> I'm not sure this works. The `Project` may not be the direct child of `Union`. I think we should add a new boolean flag to `removeRedundantAliases` which indicates that removing an alias should not introduce conflicting expr IDs.

I wanted to handle this case with line 585: `case _ => removeRedundantAliases(child, excluded ++ child.outputSet)` to keep all attributes if we have intermediate nodes between `Union` and `Project`. But I see your point, I can further refine this fix...
608f83a would be the change if we always wanted to guarantee it.
But it looks like an extra exchange is inserted into q70: d212dab:
(24) HashAggregate [codegen id : 5]
Input [2]: [s_state#13, sum#16]
Keys [1]: [s_state#13]
Functions [1]: [sum(UnscaledValue(ss_net_profit#10))]
Aggregate Attributes [1]: [sum(UnscaledValue(ss_net_profit#10))#17]
-Results [3]: [s_state#13, s_state#13, MakeDecimal(sum(UnscaledValue(ss_net_profit#10))#17,17,2) AS _w2#18]
+Results [3]: [s_state#13 AS s_state#18, s_state#13, MakeDecimal(sum(UnscaledValue(ss_net_profit#10))#17,17,2) AS _w2#19]
-(25) Sort [codegen id : 5]
-Input [3]: [s_state#13, s_state#13, _w2#18]
-Arguments: [s_state#13 ASC NULLS FIRST, _w2#18 DESC NULLS LAST], false, 0
+(25) Exchange
+Input [3]: [s_state#18, s_state#13, _w2#19]
+Arguments: hashpartitioning(s_state#13, 5), ENSURE_REQUIREMENTS, [plan_id=3]
-(26) Window
-Input [3]: [s_state#13, s_state#13, _w2#18]
-Arguments: [rank(_w2#18) windowspecdefinition(s_state#13, _w2#18 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ranking#19], [s_state#13], [_w2#18 DESC NULLS LAST]
+(26) Sort [codegen id : 6]
+Input [3]: [s_state#18, s_state#13, _w2#19]
+Arguments: [s_state#13 ASC NULLS FIRST, _w2#19 DESC NULLS LAST], false, 0
and I'm not sure yet why. Tomorrow I can take a deeper look at it...
Hmm, I think the issue is in `AliasAwareOutputPartitioning`: it doesn't take into account whether `outputExpressions` contains both an attribute and the attribute's alias...
(25) Sort [codegen id : 5]
Input [3]: [s_state#14, s_state#14, _w2#18]
Arguments: [s_state#14 ASC NULLS FIRST, _w2#18 DESC NULLS LAST], false, 0
(25) Exchange
So we have an extra exchange now?
Yes, we have. I think that's because a `HashAggregate` in q70 has an output attribute and also has it aliased as a new attribute (we kept that alias with this PR to avoid the same output attribute appearing multiple times).
-Results [3]: [s_state#13, s_state#13, MakeDecimal(sum(UnscaledValue(ss_net_profit#10))#17,17,2) AS _w2#18]
+Results [3]: [s_state#13 AS s_state#18, s_state#13, MakeDecimal(sum(UnscaledValue(ss_net_profit#10))#17,17,2) AS _w2#19]
IMO that shouldn't be a bad thing and the plan looks OK. But the root cause of the issue is in `AliasAwareOutputPartitioning`, as it generates `outputPartitioning` based on the aliased output attribute only and doesn't produce a `PartitioningCollection` with both the base attribute and the aliased version. Because the parent of the aggregate requires a distribution based on the base attribute, `EnsureRequirements` inserts the extra exchange...
We also have test failures, I'm not sure yet why. Will check tomorrow...
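The root cause can be sketched with a toy model. Everything below (a `HashPartitioning` over bare expr IDs, the `aliasAwarePartitionings` helper) is a simplified assumption for illustration, not Spark's real `AliasAwareOutputPartitioning` API; it only shows why reporting both the aliased partitioning and the base one would let the parent's requirement be satisfied without an extra exchange:

```scala
// Toy model (hypothetical types): a hash partitioning identified by expr IDs.
case class HashPartitioning(exprIds: Seq[Int])

// If the child is hash-partitioned on some expr IDs and the projection aliases
// them (base ID -> alias ID), the node can report BOTH the aliased partitioning
// and the original one, so a parent requiring either of them is satisfied.
def aliasAwarePartitionings(
    child: HashPartitioning,
    aliases: Map[Int, Int]): Seq[HashPartitioning] = {
  val aliased = HashPartitioning(child.exprIds.map(id => aliases.getOrElse(id, id)))
  Seq(aliased, child).distinct
}
```

With `s_state#13 AS s_state#18` this would report partitionings on both `#18` and `#13`, so `EnsureRequirements` would not need to insert the extra `Exchange` when the parent asks for a distribution on `#13`.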
I put the WIP flag into the PR description for now and I'm giving b3b7d24 a try as an experiment to fix `AliasAwareOutputPartitioning` and `AliasAwareOutputOrdering` to handle cases when `AliasAwareOutputExpression.outputExpressions` contains both an attribute and its aliased version.
          removeRedundantAliases(mapAttributes(a.aggregateExpressions, mapping), excluded),
          newChild)

      case w: Window =>
Why do we need to deal with these 3 nodes separately now?
`removeRedundantAliases()` now deals with the output-defining named expression lists as a whole (`Project.projectList`, `Aggregate.aggregateExpressions`, ...) to be able to keep those aliases that are required to keep the output attributes unique.
Force-pushed from deac8ce to a737020
Force-pushed from fddd468 to 155cf31
@@ -681,7 +681,7 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
     df.createTempView("df")

     val sqlText = "EXPLAIN CODEGEN SELECT key, MAX(value) FROM df GROUP BY key"
-    val expectedCodegenText = "Found 2 WholeStageCodegen subtrees."
+    val expectedCodegenText = "Found 1 WholeStageCodegen subtrees."
Due to the `AliasAwareOutputPartitioning` fix, the explain codegen output (plan) changed from:
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 (maxMethodCodeSize:282; maxConstantPoolSize:348(0.53% used); numInnerClasses:2) ==
*(1) HashAggregate(keys=[key#xL], functions=[partial_max(value#xL)], output=[key#xL, max#xL])
+- *(1) Project [id#xL AS key#xL, id#xL AS value#xL]
+- *(1) Range (0, 5, step=1, splits=2)
== Subtree 2 / 2 (maxMethodCodeSize:252; maxConstantPoolSize:214(0.33% used); numInnerClasses:0) ==
*(2) HashAggregate(keys=[key#xL], functions=[max(value#xL)], output=[key#xL, max(value)#xL])
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 0
+- Exchange hashpartitioning(key#xL, 5), ENSURE_REQUIREMENTS, [plan_id=55]
+- *(1) HashAggregate(keys=[key#xL], functions=[partial_max(value#xL)], output=[key#xL, max#xL])
+- *(1) Project [id#xL AS key#xL, id#xL AS value#xL]
+- *(1) Range (0, 5, step=1, splits=2)
to:
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 (maxMethodCodeSize:308; maxConstantPoolSize:374(0.57% used); numInnerClasses:2) ==
*(1) HashAggregate(keys=[key#xL], functions=[max(value#xL)], output=[key#xL, max(value)#xL])
+- *(1) HashAggregate(keys=[key#xL], functions=[partial_max(value#xL)], output=[key#xL, max#xL])
+- *(1) Project [id#xL AS key#xL, id#xL AS value#xL]
+- *(1) Range (0, 5, step=1, splits=2)
@@ -339,12 +339,12 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite {
     // ShuffleQueryStage 0
     // ShuffleQueryStage 2
     // ReusedQueryStage 0
-    val grouped = df.groupBy("key").agg(max("value").as("value"))
+    val grouped = df.groupBy((col("key") + 1).as("key")).agg(max("value").as("value"))
I had to modify the test because the `AliasAwareOutputPartitioning` fix changed the explain plan of the original query from:
Union
:- *(5) HashAggregate(keys=[_groupingexpression#79L], functions=[max(value#38L)], output=[(key + 1)#44L, max(value)#45L])
: +- AQEShuffleRead coalesced
: +- ShuffleQueryStage 3
: +- Exchange hashpartitioning(_groupingexpression#79L, 5), ENSURE_REQUIREMENTS, [plan_id=693]
: +- *(3) HashAggregate(keys=[_groupingexpression#79L], functions=[partial_max(value#38L)], output=[_groupingexpression#79L, max#62L])
: +- *(3) HashAggregate(keys=[key#12L], functions=[max(value#13L)], output=[value#38L, _groupingexpression#79L])
: +- AQEShuffleRead coalesced
: +- ShuffleQueryStage 0
: +- Exchange hashpartitioning(key#12L, 5), ENSURE_REQUIREMENTS, [plan_id=623]
: +- *(1) HashAggregate(keys=[key#12L], functions=[partial_max(value#13L)], output=[key#12L, max#64L])
: +- *(1) Project [id#10L AS key#12L, id#10L AS value#13L]
: +- *(1) Range (0, 6, step=1, splits=10)
+- *(6) HashAggregate(keys=[_groupingexpression#80L], functions=[max(value#38L)], output=[(key + 2)#51L, max(value)#52L])
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 4
+- Exchange hashpartitioning(_groupingexpression#80L, 5), ENSURE_REQUIREMENTS, [plan_id=719]
+- *(4) HashAggregate(keys=[_groupingexpression#80L], functions=[partial_max(value#38L)], output=[_groupingexpression#80L, max#66L])
+- *(4) HashAggregate(keys=[key#12L], functions=[max(value#13L)], output=[value#38L, _groupingexpression#80L])
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 2
+- ReusedExchange [key#12L, max#64L], Exchange hashpartitioning(key#12L, 5), ENSURE_REQUIREMENTS, [plan_id=623]
to (one fewer exchange):
Union
:- *(3) HashAggregate(keys=[_groupingexpression#75L], functions=[max(value#38L)], output=[(key + 1)#44L, max(value)#45L])
: +- AQEShuffleRead coalesced
: +- ShuffleQueryStage 0
: +- Exchange hashpartitioning(_groupingexpression#75L, 5), ENSURE_REQUIREMENTS, [plan_id=514]
: +- *(1) HashAggregate(keys=[_groupingexpression#75L], functions=[partial_max(value#38L)], output=[_groupingexpression#75L, max#62L])
: +- *(1) HashAggregate(keys=[key#12L], functions=[max(value#13L)], output=[value#38L, _groupingexpression#75L])
: +- *(1) HashAggregate(keys=[key#12L], functions=[partial_max(value#13L)], output=[key#12L, max#64L])
: +- *(1) Project [id#10L AS key#12L, id#10L AS value#13L]
: +- *(1) Range (0, 6, step=1, splits=10)
+- *(4) HashAggregate(keys=[_groupingexpression#76L], functions=[max(value#38L)], output=[(key + 2)#51L, max(value)#52L])
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 1
+- Exchange hashpartitioning(_groupingexpression#76L, 5), ENSURE_REQUIREMENTS, [plan_id=532]
+- *(2) HashAggregate(keys=[_groupingexpression#76L], functions=[partial_max(value#38L)], output=[_groupingexpression#76L, max#66L])
+- *(2) HashAggregate(keys=[key#12L], functions=[max(value#13L)], output=[value#38L, _groupingexpression#76L])
+- *(2) HashAggregate(keys=[key#12L], functions=[partial_max(value#13L)], output=[key#12L, max#64L])
+- *(2) Project [id#55L AS key#12L, id#55L AS value#13L]
+- *(2) Range (0, 6, step=1, splits=10)
and so the query no longer matched the description of test case 2.
          case ne: NamedExpression => ne
          case other => Alias(other, other.toString)()
        }

        val rightGroupingNamedExpressions = r.groupingExprs.map {
          case a: Attribute => Alias(a, a.name)()
These changes are needed to make the output of the `Project`s in lines 615/617 unique. Without these, the new assert in `removeRedundantAliases()` would fail.
Hmmm, does it mean it's legitimate to have conflicting attribute IDs within one operator? This looks totally fine in `FlatMapCoGroupsInPandas`.
I don't know, this was the only place where I had to modify the code to avoid conflicts. But I was thinking that instead of the new assert in the rule, we could fix these conflicting attributes by inserting a `Project`. How about that?
@cloud-fan, I've updated this PR. The remaining failures in the sparkr/linters tests don't seem related to this change.
@@ -157,31 +157,31 @@ Input [2]: [s_state#14, sum#16]
 Keys [1]: [s_state#14]
 Functions [1]: [sum(UnscaledValue(ss_net_profit#10))]
 Aggregate Attributes [1]: [sum(UnscaledValue(ss_net_profit#10))#17]
-Results [3]: [s_state#14, s_state#14, MakeDecimal(sum(UnscaledValue(ss_net_profit#10))#17,17,2) AS _w2#18]
+Results [3]: [s_state#14 AS s_state#18, s_state#14, MakeDecimal(sum(UnscaledValue(ss_net_profit#10))#17,17,2) AS _w2#19]
I'm surprised that this query plan works fine today (two `s_state#14`). Reading the source code a bit more, I think Spark is actually fine with duplicated attribute IDs. It assumes columns with the same attr ID always output the same values, so it can safely bind attributes with the first match. See `BindReferences`.

So the problem is still at `Union`. It's not fine to introduce duplicated attr IDs in the first child of `Union`, as it will introduce duplicated attr IDs in the output of `Union`, which causes wrong results.

I think eventually we should make `Union` use new attr IDs to build its output columns. As a bug fix, your first attempt to simply not remove aliases in the first child of `Union` looks the best one: it's simple, and having a few more unnecessary aliases won't impact performance. Sorry for the back and forth!
OK. No problem, I can revert this PR to the first version next week.
I reverted the PR and its description to the first version in 7062ed8
Can we create a new PR for this?
Sure, I can open a new one for this next week.
        var first = true
        plan.mapChildren { child =>
          if (first) {
            first = false
Can we add some comments to explain why Union's first child is special?
Sure, added in: db10b5f
cc @sigmod
@peter-toth do you know which Spark version starts to have this issue?
This rule is quite old so I just ran a quick check on the latest 2.4 branch and even that is affected.
The GA failure is unrelated. Thanks, merging to master! Since there are conflicts, can you create backport PRs for 3.3? Thanks!
Thanks for the review @cloud-fan!
What changes were proposed in this pull request?
Keep the output attributes of a `Union` node's first child in the `RemoveRedundantAliases` rule to avoid correctness issues.

Why are the changes needed?
To fix the result of the following query:
Before this PR the query returns the incorrect result:
After this PR it returns the expected result:
Does this PR introduce any user-facing change?
Yes, fixes a correctness issue.
How was this patch tested?
Added new UTs.