-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-21222] Move elimination of Distinct clause from analyzer to optimizer #18429
Conversation
@@ -160,6 +160,8 @@ package object dsl { | |||
def last(e: Expression): Expression = new Last(e).toAggregateExpression() | |||
def min(e: Expression): Expression = Min(e).toAggregateExpression() | |||
def max(e: Expression): Expression = Max(e).toAggregateExpression() | |||
def maxDistinct(e: Expression): Expression = Max(e).toAggregateExpression(isDistinct = true) | |||
def minDistinct(e: Expression): Expression = Min(e).toAggregateExpression(isDistinct = true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this to line 162
@@ -40,6 +40,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf) | |||
protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations) | |||
|
|||
def batches: Seq[Batch] = { | |||
// DISTINCT is not meaningful for a Max or a Min. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: remove this line.
object EliminateDistinct extends Rule[LogicalPlan] { | ||
override def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressions { | ||
case AggregateExpression(af @ Max(_), _, true, _) => AggregateExpression(af, Complete, false) | ||
case AggregateExpression(af @ Min(_), _, true, _) => AggregateExpression(af, Complete, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: -> af: Max
and af: Min
@@ -152,6 +154,16 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf) | |||
} | |||
|
|||
/** | |||
* Remove useless DISTINCT for MAX and MIN |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also need to emphasize EliminateDistinct
should be before ReplaceDeduplicateWithAggregate
Hi @gatorsmile , |
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LocalRelation, LogicalPlan} | ||
import org.apache.spark.sql.catalyst.rules.RuleExecutor | ||
|
||
class EliminateDistinceSuite extends PlanTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Distinct. not Distince.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo corrected. Thanks!
Test build #78676 has finished for PR 18429 at commit
|
Test build #78680 has finished for PR 18429 at commit
|
Test build #78682 has finished for PR 18429 at commit
|
@@ -40,6 +40,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf) | |||
protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations) | |||
|
|||
def batches: Seq[Batch] = { | |||
Batch("Eliminate Distinct", Once, EliminateDistinct) :: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, does it have to be executed before the "Finish Analysis" batch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can move this into the next batch, but it has to be before RewriteDistinctAggregates
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RewriteDistinctAggregates is inside the "Finish Analysis" batch, so the new rule has to be placed before it.
*/ | ||
object EliminateDistinct extends Rule[LogicalPlan] { | ||
override def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressions { | ||
case AggregateExpression(max: Max, _, true, _) => AggregateExpression(max, Complete, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it safe to always choose the Complete
mode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ResolveFunctions
rule already set it to Complete
. @gengliangwang we should just keep the AggregateMode unchanged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, this could be a terrible mistake! I Already corrected, thanks!
@@ -152,6 +153,19 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf) | |||
} | |||
|
|||
/** | |||
* Remove useless DISTINCT for MAX and MIN. | |||
* This rule should be applied before ReplaceDeduplicateWithAggregate. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"before RewriteDistinctAggregates"?
|
||
test("Eliminate Distinct in Max") { | ||
val query = testRelation | ||
.select(maxDistinct('a) as('result)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: please use java style, i.e. maxDistinct('a).as('result)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got it, I have revised it.
comparePlans(Optimize.execute(query), answer) | ||
} | ||
|
||
test("Eliminate Distinct in Min") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually you can put them in one test:
val query = testRelation
.select(maxDistinct('a).as('max), minDistinct('a).as('min))
.analyze
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well, I prefer to make it simple
LGTM except some minor comments |
Test build #78734 has finished for PR 18429 at commit
|
Test build #78737 has finished for PR 18429 at commit
|
Test build #78738 has finished for PR 18429 at commit
|
*/ | ||
object EliminateDistinct extends Rule[LogicalPlan] { | ||
override def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressions { | ||
case AggregateExpression(max: Max, mode: AggregateMode, true, _) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's unclear what the "true" is. I'd either use named argument, or rewrite it to something like
case ae: AggregateExpression if ae.isDistinct =>
ae.aggregateFunction match {
case _: Max | _: Min => ae.copy(isDistinct = false)
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense. I will remember that and revise the current patch.
Also, how about this:
case ae @ AggregateExpression(_: Max | _: Min, _, isDistinct, _) if isDistinct =>
ae.copy(isDistinct = false)
Is it too long?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think your version is more readable, the first param "aggregateFunction" is specified.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in general i'm not a big fan of using extractors unless we need almost all arguments ... it makes refactoring a lot more complicated. Extractors are over/abused in Catalyst (there are some code that use extractors when they are simply doing type matching).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, appreciate your help!
Test build #78747 has finished for PR 18429 at commit
|
override def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressions { | ||
case ae: AggregateExpression if ae.isDistinct => | ||
ae.aggregateFunction match { | ||
case _: Max | _: Min => ae.copy(isDistinct = false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually i made a mistake earlier. you'd need to do a match on other cases and return the ae itself too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My fault...
Test build #78758 has finished for PR 18429 at commit
|
Test build #78762 has finished for PR 18429 at commit
|
retest this please |
LGTM |
1 similar comment
LGTM |
Test build #78811 has finished for PR 18429 at commit
|
thanks, merging to master! |
…timizer ## What changes were proposed in this pull request? Move elimination of Distinct clause from analyzer to optimizer Distinct clause is useless after MAX/MIN clause. For example, "Select MAX(distinct a) FROM src from" is equivalent of "Select MAX(a) FROM src from" However, this optimization is implemented in analyzer. It should be in optimizer. ## How was this patch tested? Unit test gatorsmile cloud-fan Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Wang Gengliang <ltnwgl@gmail.com> Closes apache#18429 from gengliangwang/distinct_opt.
What changes were proposed in this pull request?
Move elimination of Distinct clause from analyzer to optimizer
Distinct clause is useless after MAX/MIN clause. For example,
"Select MAX(distinct a) FROM src from"
is equivalent of
"Select MAX(a) FROM src from"
However, this optimization is implemented in analyzer. It should be in optimizer.
How was this patch tested?
Unit test
@gatorsmile @cloud-fan
Please review http://spark.apache.org/contributing.html before opening a pull request.