[SPARK-29013][SQL] Structurally equivalent subexpression elimination #25717
Conversation
I observed this problem when running the test in #25642. This is WIP for now; I would like to see whether all tests pass and hear opinions from others.
Ah, it looks like a cool idea. I'll check the code tonight.
thanks @viirya. I was actually thinking the same, so I like your proposal. Just one question: why are you limiting this to
For sub-expression elimination, I observed that many of the generated functions are similar and differ only in their input slots. That is why this works for BoundReference: we can parameterize the ordinals out of the generated functions to achieve this feature. I think it would be very hard to generalize this further.
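The parameterization described above can be sketched outside Spark as follows (editor's illustration with hypothetical names, not Spark's actual generated code):

```scala
// Without structural sharing, codegen emits one function per subexpression,
// identical except for the hard-coded input ordinals:
def subExpr1(row: Array[Int]): Int = row(1) + row(2)
def subExpr2(row: Array[Int]): Int = row(3) + row(4)

// With structural sharing, a single function takes the ordinals as parameters:
def subExprShared(row: Array[Int], i: Int, j: Int): Int = row(i) + row(j)

val row = Array(0, 10, 20, 30, 40)
println(subExprShared(row, 1, 2)) // 30
println(subExprShared(row, 3, 4)) // 70
```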
 * structurally equivalent expressions. Non-recursive.
 */
def addStructExpr(ctx: CodegenContext, expr: Expression): Unit = {
  if (expr.deterministic) {
Couldn't we still share a function in the non-deterministic case? e.g.,
int subExpr1 = input[0] + random();
int subExpr2 = input[1] + random();
=>
int subExpr1 = subExpr(input[0]);
int subExpr2 = subExpr(input[1]);
int subExpr(int v) { return v + random(); }
Non-deterministic expressions can't do sub-expression elimination.
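A tiny sketch (editor's illustration, not Spark code) of why sharing is unsafe for non-deterministic expressions:

```scala
import scala.util.Random

val rng = new Random()
// A non-deterministic "expression": each evaluation should draw a fresh value.
def nonDeterministic(x: Int): Int = x + rng.nextInt(1000)

// Sub-expression elimination would evaluate once and reuse the result,
// forcing the two occurrences to agree, which changes the semantics:
val sharedResult = nonDeterministic(1)
val occurrence1  = sharedResult
val occurrence2  = sharedResult
println(occurrence1 == occurrence2) // true, but independent draws should usually differ
```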
oh, I see.
btw, is this idea limited to common subexprs? For example, can it cover a case like:
select sum(a + b), sum(b + c), sum(c + d), sum(d + e) from values (1, 1, 1, 1, 1) t(a, b, c, d, e)
It probably can, if we want functions like sum(a + b) etc. to be called in split functions; their inputs can be parameterized.
 * but different slots of the input tuple, we replace `BoundReference` with this parameterized
 * version. The slot position is parameterized and is given at runtime.
 */
case class ParameterizedBoundReference(parameter: String, dataType: DataType, nullable: Boolean)
Since this is only used for codegen, how about moving this to org.apache.spark.sql.catalyst.expressions.codegen?
nit: parameter -> variableNameForOrdinal or paramNameForOrdinal?
Yeah, ok, I will.
private def parameterizedBoundReferences(ctx: CodegenContext, expr: Expression): Expression = {
  expr.transformUp {
    case b: BoundReference =>
      val param = ctx.freshName("boundInput")
nit: boundInput -> ordinal?
if (!skip && !addExpr(expr)) {
  childrenToRecurse.foreach(addExprTree)
if (!skip) {
Can't we do it like !skip && !addStructExpr(expr, exprMap), in the same way as addExprTree?
Well, this also recursively adds children even when the parent expr was added. I tested both with and without it while prototyping this; adding children saves more generated code text.
case class StructuralExpr(e: Expression) {
  def normalized(expr: Expression): Expression = {
    expr.transformUp {
      case b: ParameterizedBoundReference =>
To avoid unnecessary plan copies, can we check this equality based on BoundReference (by just copying it like b.copy(ordinal = 0 or -1?))? IIUC it's ok to replace BoundReference with ParameterizedBoundReference just when generating code in https://github.com/apache/spark/pull/25717/files#diff-8bcc5aea39c73d4bf38aef6f6951d42cR1117?
Isn't b.copy also copying the input expr?
Ah, I see. But if we write it like this (c1c5052), we don't need to pass CodegenContext into EquivalentExpressions?
Ok, looks good.
I just modified it this way, but it failed HashAggregationQuerySuite on Jenkins and locally.
I think I figured out why. Made another commit.
The current one looks super good to me. Thanks!
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
  assert(ctx.currentVars == null && ctx.INPUT_ROW != null,
    "ParameterizedBoundReference can not be used in whole-stage codegen yet.")
Any barrier to supporting the whole-stage codegen case?
I think it can be used in whole-stage codegen. But since this is applied to sub-expression elimination, which is non-whole-stage only, I'd also like to keep the code change small in a single PR.
def addStructExpr(ctx: CodegenContext, expr: Expression): Unit = {
  if (expr.deterministic) {
    val refs = expr.collect {
      case b: BoundReference => b
nit: case b: BoundReference => Literal(0)?
// We calculate the function parameter length as the number of ints plus `INPUT_ROW` plus
// an int-typed result array index.
val parameterLength = CodeGenerator.calculateParamLength(refs.map(_ => Literal(0))) + 2
if (CodeGenerator.isValidParamLength(parameterLength)) {
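The parameter-length check can be sketched as follows. This is an editor's sketch that assumes the usual JVM rule (long/double parameters occupy two slots and a method is limited to 255 parameter slots); the names mimic, but are not, Spark's calculateParamLength/isValidParamLength:

```scala
// Assumed JVM parameter-slot accounting: long/double take two slots, others one.
def paramSlots(javaTypes: Seq[String]): Int =
  javaTypes.map { case "long" | "double" => 2; case _ => 1 }.sum

// The JVM caps a method at 255 parameter slots.
def isValidParamLength(n: Int): Boolean = n <= 255

// Three int ordinals, plus INPUT_ROW and an int result-array index:
val parameterLength = paramSlots(Seq.fill(3)("int")) + 2
println(isValidParamLength(parameterLength)) // true
```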
If the length goes over the limit, does the current logic give up eliminating common exprs? If so, can we fall back to the non-structural mode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea.
Test build #110315 has finished for PR 25717 at commit
retest this please
Test build #110320 has finished for PR 25717 at commit
retest this please
Test build #110352 has finished for PR 25717 at commit
Test build #110379 has finished for PR 25717 at commit
Test build #110397 has finished for PR 25717 at commit
retest this please
Test build #110407 has finished for PR 25717 at commit
Test build #110594 has finished for PR 25717 at commit
Test build #110598 has finished for PR 25717 at commit
@cloud-fan @rednaxelafx @mgaido91 @kiszk Can anyone check this?
btw, this PR doesn't include end-to-end tests; do you think the queries in the existing tests are enough for end-to-end coverage of this PR? I'm not sure how much the existing tests exercise structurally equivalent exprs though... Also, are there any negative performance impacts on existing queries, e.g., TPCDS?
Regarding end-to-end tests, I am not sure what tests we need. If there are end-to-end tests of the current sub-expression elimination, I think I can write tests based on them.
I have not run a TPCDS benchmark with this. Just
Test build #110602 has finished for PR 25717 at commit
I will take a look on Monday or Tuesday.
/**
 * Adds each expression to this data structure, grouping them with existing equivalent
 * expressions. Non-recursive.
 * Returns true if there was already a matching expression.
 */
def addExpr(expr: Expression): Boolean = {
def addExpr(expr: Expression, exprMap: EquivalenceMap = this.equivalenceMap): Boolean = {
nit: Do we need = this.equivalenceMap? It seems that all of the callers pass two arguments.
addExpr is also used at PhysicalAggregation:
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala, lines 222 to 229 in 2f3997f:
expr.collect {
  // addExpr() always returns false for non-deterministic expressions and do not add them.
  case agg: AggregateExpression
    if !equivalentAggregateExpressions.addExpr(agg) => agg
  case udf: PythonUDF
    if PythonUDF.isGroupedAggPandasUDF(udf) &&
      !equivalentAggregateExpressions.addExpr(udf) => udf
}
if (!skip && !addExpr(expr)) {
  childrenToRecurse.foreach(addExprTree)
if (!skip && addStructExpr(expr)) {
  childrenToRecurse(expr).foreach(addStructuralExprTree)
nit: do we want to add (_)?
addExprTree doesn't add (_) either; I just followed it. If there's no special reason, I will leave it as is.
I am neutral on this. I was just curious why that line adds (_) but this one does not.
At that line, if we don't add (_) when calling addExprTree, we see a compilation error:
[error] found   : (org.apache.spark.sql.catalyst.expressions.Expression, equivalentExpressions.EquivalenceMap) => Unit
[error]  (which expands to)  (org.apache.spark.sql.catalyst.expressions.Expression, scala.collection.mutable.HashMap[equivalentExpressions.Expr,scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.catalyst.expressions.Expression]]) => Unit
[error] required: org.apache.spark.sql.catalyst.expressions.Expression => ?
[error]     expressions.foreach(equivalentExpressions.addExprTree)
[error]                                               ^
Because addExprTree actually takes two arguments, it doesn't match foreach's expected argument type.
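The error above can be reproduced in miniature (editor's sketch; addTree here is a stand-in for addExprTree):

```scala
import scala.collection.mutable.ArrayBuffer

val sink = ArrayBuffer.empty[Int]
// A two-parameter method with a default, like addExprTree(expr, exprMap = ...):
def addTree(e: Int, buf: ArrayBuffer[Int] = sink): Unit = buf += e

// List(1, 2, 3).foreach(addTree)   // does not compile: eta-expands to
//                                  // (Int, ArrayBuffer[Int]) => Unit, but Int => ? is required
List(1, 2, 3).foreach(addTree(_))   // compiles: the default second argument is used

println(sink.sum) // 6
```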
Test build #110665 has finished for PR 25717 at commit
We're closing this PR because it hasn't been updated in a while. If you'd like to revive this PR, please reopen it!
What changes were proposed in this pull request?
We perform semantically equivalent subexpression elimination in Spark SQL. However, for expressions that are not semantically equivalent but are structurally equivalent, the current subexpression elimination generates too many similar functions. These functions share the same computation structure and differ only in the input slots of the current processing row.
For example, expression a is input[1] + input[2] and expression b is input[3] + input[4]. They are not semantically equivalent in Spark SQL, but they perform the same computation on different input data.
For such expressions, we can generate just one function and pass in the input slots at runtime. This reduces the length of the generated code text and saves compilation time.
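The grouping of structurally equivalent expressions can be sketched with a toy expression tree (editor's illustration; Spark's real implementation works on Expression and BoundReference):

```scala
// Bucket expressions by their ordinal-erased ("structural") form; one
// function can then be generated per bucket instead of one per expression.
sealed trait Expr
case class Ref(ordinal: Int) extends Expr
case class Plus(left: Expr, right: Expr) extends Expr

def erase(e: Expr): Expr = e match {
  case Ref(_)     => Ref(0)                     // placeholder slot
  case Plus(l, r) => Plus(erase(l), erase(r))
}

// input[1] + input[2] and input[3] + input[4] land in the same bucket:
val exprs = Seq(Plus(Ref(1), Ref(2)), Plus(Ref(3), Ref(4)), Ref(5))
val buckets = exprs.groupBy(erase)
println(buckets.size) // 2
```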
Why are the changes needed?
For a complex query, the current sub-expression elimination can generate too many similar functions, which leads to long generated code text and increased compilation time.
For example, run the following query:
The longest compilation time observed for this query was 25816.203394 ms. After this patch, the same compilation is reduced to 9143.778397 ms.
Does this PR introduce any user-facing change?
This doesn't introduce user-facing change.
This feature is controlled by a SQL config spark.sql.structuralSubexpressionElimination.enabled.
How was this patch tested?
Added tests.