Skip to content

Commit

Permalink
Adapting the bind reference of agg that contains subquery in agg expr…
Browse files Browse the repository at this point in the history
…essions
  • Loading branch information
liujiayi771 committed Feb 18, 2024
1 parent 4048629 commit 0872183
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,14 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu
getExecutedPlan(df).count(plan => plan.isInstanceOf[HashAggregateExecTransformer]) >= 2)
}
}

test("partial agg pull out pre-project for agg function") {
runQueryAndCompare("""
|select sum(if(c > (select sum(a) from values (1), (-1) AS tab(a)), 1, -1))
|from values (5), (-10), (15) AS tab(c);
|""".stripMargin)(
df => assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecTransformer]) == 2))
}
}

class VeloxAggregateFunctionsDefaultSuite extends VeloxAggregateFunctionsSuite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,22 @@ abstract class HashAggregateExecBaseTransformer(
case PartialMerge | Final =>
aggregateFunc.inputAggBufferAttributes.toList.map(
attr => {
val sameNameAttr = originalInputAttributes.find(_.name == attr.name)
val rewriteAttr =
if (sameNameAttr.isDefined && attr.exprId != sameNameAttr.get.exprId) {
// When aggregateExpressions includes subquery, Spark's PlanAdaptiveSubqueries
// Rule will transform the subquery within the final agg. The aggregateFunction
// in the aggregateExpressions of the final aggregation will be cloned,
// resulting in creating new aggregateFunction object. The
// inputAggBufferAttributes will also generate new AttributeReference instances
// with larger exprId, which leads to a failure in binding with the output of
// the partial agg. We need to adapt to this situation; when encountering a
// failure to bind, it is necessary to allow the binding of
// inputAggBufferAttribute with the same name but different exprId.
sameNameAttr.get
} else attr
ExpressionConverter
.replaceWithExpressionTransformer(attr, originalInputAttributes)
.replaceWithExpressionTransformer(rewriteAttr, originalInputAttributes)
.doTransform(args)
})
case other =>
Expand Down

0 comments on commit 0872183

Please sign in to comment.