
[SPARK-37914][SQL] Make RuntimeReplaceable works for AggregateFunction #35213

Closed
wants to merge 7 commits into from

Conversation

beliefer
Contributor

@beliefer beliefer commented Jan 15, 2022

What changes were proposed in this pull request?

Currently, Spark provides RuntimeReplaceable to replace a function with another function. The replacement must be an existing built-in function, so RuntimeReplaceable lets Spark reuse the implementation of built-in functions.
However, RuntimeReplaceable does not work for aggregate functions.
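For context, the pre-existing mechanism works roughly like this (a simplified sketch based on the Spark 3.2-era Catalyst source, not a verbatim excerpt): a RuntimeReplaceable expression keeps its user-facing arguments and carries a `child` that is the equivalent combination of built-in expressions, and the ReplaceExpressions optimizer rule substitutes the whole node with that child.

```scala
// Simplified sketch (abbreviated signatures; details may differ from the
// actual Spark source). NullIf is user-facing sugar; its `child` is built
// from existing built-in expressions, and that child is what actually gets
// evaluated once the optimizer replaces the node.
case class NullIf(left: Expression, right: Expression, child: Expression)
  extends RuntimeReplaceable {

  def this(left: Expression, right: Expression) = {
    this(left, right,
      If(EqualTo(left, right), Literal.create(null, left.dataType), left))
  }

  override def exprsReplaced: Seq[Expression] = Seq(left, right)
}
```

Because the `child` must be an evaluable non-aggregate expression tree, the trait could not previously express a replacement whose child is an AggregateFunction; that is the gap this PR addresses.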

Why are the changes needed?

Make RuntimeReplaceable work for AggregateFunction.

Does this PR introduce any user-facing change?

'No'.
This change is for Spark developers.

How was this patch tested?

Existing tests.

@github-actions github-actions bot added the SQL label Jan 15, 2022
@beliefer
Contributor Author

ping @cloud-fan

@cloud-fan
Contributor

Does this PR introduce any user-facing change?

This is for end-users, not Spark developers. It's not a user-facing change because end-users who only use the DataFrame/SQL APIs can't notice this change.

@@ -366,6 +366,8 @@ trait RuntimeReplaceable extends UnaryExpression with Unevaluable {
// are semantically equal.
override lazy val preCanonicalized: Expression = child.preCanonicalized

def isAggregate: Boolean = child.isInstanceOf[AggregateFunction]
Contributor

More thoughts on this: technically a RuntimeReplaceable can combine one or more expressions, even something like f(x) = max(x) - min(x).

For aggregate functions, it's very hard to reason about f(DISTINCT x) FILTER (WHERE ...) if f(x) combines many expressions, so I think it makes sense to only allow a direct mapping here.

We can make this assumption more explicit here:

def isAggregate: Boolean = {
  if (child.isInstanceOf[AggregateFunction]) {
    true
  } else {
    assert(child.find(_.isInstanceOf[AggregateFunction]).isEmpty)
    false
  }
}

@@ -46,12 +46,11 @@ import org.apache.spark.util.Utils
*/
object ReplaceExpressions extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressionsWithPruning(
-    _.containsAnyPattern(RUNTIME_REPLACEABLE, COUNT_IF, BOOL_AGG, REGR_COUNT)) {
+    _.containsAnyPattern(RUNTIME_REPLACEABLE, COUNT_IF, BOOL_AGG)) {
case e: RuntimeReplaceable => e.child
case CountIf(predicate) => Count(new NullIf(predicate, Literal.FalseLiteral))
case BoolOr(arg) => Max(arg)
case BoolAnd(arg) => Min(arg)
Contributor

Can we remove the above 3 as well?

Contributor Author

OK
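For context, the BoolOr/BoolAnd rewrites in the diff above are semantics-preserving: under the standard Boolean ordering (false < true), max is logical OR and min is logical AND. A quick plain-Scala sanity check (hypothetical object name, no Spark required):

```scala
// Sanity check that rewriting bool_or -> max and bool_and -> min preserves
// semantics. Scala's Ordering[Boolean] defines false < true, so max over
// Booleans is logical OR and min is logical AND.
object BoolAggRewrite {
  def boolOr(xs: Seq[Boolean]): Boolean  = xs.max   // what Max(arg) computes
  def boolAnd(xs: Seq[Boolean]): Boolean = xs.min   // what Min(arg) computes

  def main(args: Array[String]): Unit = {
    val data = Seq(true, false, false)
    assert(boolOr(data)  == data.exists(identity))  // OR of the column
    assert(boolAnd(data) == data.forall(identity))  // AND of the column
    println("rewrites agree with OR/AND semantics")
  }
}
```

The catch, as the golden-file diff below shows, is that the rewrite leaks into the output schema: the result column is named after the replacement (`min(col)`) rather than the user-facing function (`bool_and(col)`).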

| org.apache.spark.sql.catalyst.expressions.aggregate.BoolOr | any | SELECT any(col) FROM VALUES (true), (false), (false) AS tab(col) | struct<any(col):boolean> |
| org.apache.spark.sql.catalyst.expressions.aggregate.BoolOr | bool_or | SELECT bool_or(col) FROM VALUES (true), (false), (false) AS tab(col) | struct<bool_or(col):boolean> |
| org.apache.spark.sql.catalyst.expressions.aggregate.BoolOr | some | SELECT some(col) FROM VALUES (true), (false), (false) AS tab(col) | struct<some(col):boolean> |
| org.apache.spark.sql.catalyst.expressions.aggregate.BoolAnd | bool_and | SELECT bool_and(col) FROM VALUES (true), (true), (true) AS tab(col) | struct<min(col):boolean> |
Contributor
Do you know how this change was introduced?

-- !query output
org.apache.spark.sql.AnalysisException
cannot resolve 'every('true')' due to data type mismatch: Input to function 'every' should have been boolean, but it's [string].; line 1 pos 11
Contributor
The behavior change is a bit scary. Can you explain it?

@beliefer
Contributor Author

beliefer commented Jan 18, 2022

It seems the output schema is difficult to keep consistent with the previous one.
#35241 is a better way.

@beliefer
Contributor Author

#35534 has been merged.

@beliefer beliefer closed this Feb 24, 2022