
[SPARK-19851][SQL] Add support for EVERY and ANY (SOME) aggregates #22809

Closed

Conversation

dilipbiswal (Contributor)

What changes were proposed in this pull request?

Implements Every, Some, Any aggregates in SQL. These new aggregate expressions are analyzed in the normal way and rewritten to equivalent existing aggregate expressions in the optimizer.

Every(x) => Min(x) where x is boolean.
Some(x) => Max(x) where x is boolean.

Any is a synonym for Some.
SQL:

explain extended select every(v) from test_agg group by k;

Plan:

== Parsed Logical Plan ==
'Aggregate ['k], [unresolvedalias('every('v), None)]
+- 'UnresolvedRelation `test_agg`

== Analyzed Logical Plan ==
every(v): boolean
Aggregate [k#0], [every(v#1) AS every(v)#5]
+- SubqueryAlias `test_agg`
   +- Project [k#0, v#1]
      +- SubqueryAlias `test_agg`
         +- LocalRelation [k#0, v#1]

== Optimized Logical Plan ==
Aggregate [k#0], [min(v#1) AS every(v)#5]
+- LocalRelation [k#0, v#1]

== Physical Plan ==
*(2) HashAggregate(keys=[k#0], functions=[min(v#1)], output=[every(v)#5])
+- Exchange hashpartitioning(k#0, 200)
   +- *(1) HashAggregate(keys=[k#0], functions=[partial_min(v#1)], output=[k#0, min#7])
      +- LocalTableScan [k#0, v#1]
Time taken: 0.512 seconds, Fetched 1 row(s)
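
The rewrite relies on the fact that for booleans false < true, so min(x) is true only when every value is true, and max(x) is true when at least one value is true. Below is a minimal, self-contained sketch of the substitution; the simplified types are illustrative stand-ins, not Spark's actual Catalyst classes.

```
object EveryAnyRewriteSketch {
  // Simplified stand-ins for Catalyst expressions (illustration only).
  trait Expr
  case class BoolCol(name: String) extends Expr
  case class Min(child: Expr) extends Expr
  case class Max(child: Expr) extends Expr
  case class EveryAgg(child: Expr) extends Expr
  case class AnyAgg(child: Expr) extends Expr
  case class SomeAgg(child: Expr) extends Expr

  // The optimizer-time rewrite: EVERY -> MIN, ANY/SOME -> MAX (argument is boolean).
  def rewrite(e: Expr): Expr = e match {
    case EveryAgg(child) => Min(child)
    case AnyAgg(child)   => Max(child)
    case SomeAgg(child)  => Max(child)
    case other           => other
  }

  def main(args: Array[String]): Unit = {
    // every(v) becomes min(v), matching the optimized plan above.
    println(rewrite(EveryAgg(BoolCol("v")))) // Min(BoolCol(v))
  }
}
```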

How was this patch tested?

Added tests in SQLQueryTestSuite and DataFrameAggregateSuite.

SparkQA commented Oct 23, 2018

Test build #97930 has finished for PR 22809 at commit b793d06.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait UnevaluableAggrgate extends DeclarativeAggregate
  • abstract class AnyAggBase(arg: Expression)
  • case class AnyAgg(arg: Expression) extends AnyAggBase(arg)
  • case class SomeAgg(arg: Expression) extends AnyAggBase(arg)
  • case class EveryAgg(arg: Expression)

@dilipbiswal (Contributor, Author)

cc @cloud-fan @gatorsmile

@@ -38,6 +39,18 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
}
}

/**
dilipbiswal (Author):

@cloud-fan We could also add these transformations in ReplaceExpressions rule and not require a dedicated rule (fyi).

Contributor:

ah this sounds better!

@cloud-fan (Contributor)

Can we use these functions in window with this approach?

@dilipbiswal (Contributor, Author)

@cloud-fan Yeah.. I have some tests in group-by.sql. Please take a look.

SELECT every("true");

-- every/some/any aggregates are not supported as windows expression.
SELECT k, v, every(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg;
dilipbiswal (Author) commented Oct 24, 2018:

@cloud-fan here are a few window tests. (fyi)

SparkQA commented Oct 24, 2018

Test build #97986 has finished for PR 22809 at commit 9e194b5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -727,4 +728,67 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
"grouping expressions: [current_date(None)], value: [key: int, value: string], " +
"type: GroupBy]"))
}

def getEveryAggColumn(columnName: String): Column = {
Column(new EveryAgg(Column(columnName).expr).toAggregateExpression(false))
Contributor:

Since we don't have APIs for them in functions, it's not likely users will use them with DataFrames. Thus I think we don't need these tests.

dilipbiswal (Author):

@cloud-fan Ok, let me remove these.

@cloud-fan (Contributor)

LGTM

SparkQA commented Oct 25, 2018

Test build #97999 has finished for PR 22809 at commit 6abf844.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

expression[EveryAgg]("every"),
expression[AnyAgg]("any"),
expression[SomeAgg]("some"),

Contributor:

nit: unneeded newline
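
For context, once these registry entries are in place the new functions become callable from SQL. A hypothetical usage snippet (assumes a SparkSession named `spark` on a build that includes this PR):

```
// Hypothetical usage; requires a Spark build that includes this PR.
spark.sql("""
  SELECT k, every(v) AS all_true, some(v) AS any_true
  FROM VALUES (1, true), (1, false), (2, true) AS t(k, v)
  GROUP BY k
  ORDER BY k
""").show()
// Expected: k=1 -> all_true=false, any_true=true; k=2 -> all_true=true, any_true=true.
```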

}

@ExpressionDescription(
usage = "_FUNC_(expr) - Returns true if at least one value of `expr` is true.")
Contributor:

add since = "3.0.0"

dilipbiswal (Author):

I will add. One observation:

I see that we only specify @Since in the APIs; for example, for the aggregate Max we have it in functions.scala. These aggregates are not exposed in the Dataset APIs, and none of the other aggregates seem to have it.

Contributor:

I think the point is that it was not available until 2.3, so earlier methods don't have it. Am I missing something?

dilipbiswal (Author):

@mgaido91 Yeah... If we look at the definitions of other aggregate functions like Max, Min, etc., they don't seem to have @Since. However, max and min are defined in functions.scala with @Since 1.3. So basically I was not sure what the rule is when a function is not exposed in the Dataset API but only from SQL.

Contributor:

Just to be clear, here I am not talking about @Since; I am talking about since as a parameter of @ExpressionDescription.

Contributor:

Ideally we need since here. Some functions don't have it because at that time the since field was not there. We should add the missing since to them as well, if other people have time to do it.

dilipbiswal (Author):

@mgaido91 @cloud-fan Thanks .. will add.

Member:

(btw, let's add since at ExpressionDescription wherever possible .. )

dilipbiswal (Author):

@HyukjinKwon Sure.. I will open a pr shortly.
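
To make the distinction above concrete: `@Since` is the annotation used on public APIs such as functions.scala, whereas `since` here is a field of `@ExpressionDescription` that feeds the SQL function documentation. A rough sketch of the requested change, using the final AnyAgg signature from this PR (it only compiles inside the Spark source tree, where Expression and UnevaluableBooleanAggBase are defined):

```
// Sketch only: the class signature is the one from this PR; the since value is
// the one suggested by the reviewer above.
@ExpressionDescription(
  usage = "_FUNC_(expr) - Returns true if at least one value of `expr` is true.",
  since = "3.0.0")
case class AnyAgg(arg: Expression) extends UnevaluableBooleanAggBase(arg)
```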

}

@ExpressionDescription(
usage = "_FUNC_(expr) - Returns true if at least one value of `expr` is true.")
Contributor:

ditto

@@ -57,3 +57,27 @@ case class Min(child: Expression) extends DeclarativeAggregate {

override lazy val evaluateExpression: AttributeReference = min
}

@ExpressionDescription(
usage = "_FUNC_(expr) - Returns true if all values of `expr` are true.")
Contributor:

ditto

* with other databases. For example, we use this to support every, any/some aggregates by rewriting
* them with Min and Max respectively.
*/
trait UnevaluableAggrgate extends DeclarativeAggregate {
Contributor:

typo: UnevaluableAggrgate -> UnevaluableAggregate

@@ -57,3 +57,34 @@ case class Max(child: Expression) extends DeclarativeAggregate {

override lazy val evaluateExpression: AttributeReference = max
}

abstract class AnyAggBase(arg: Expression)
Contributor:

can we change this to something like UnevaluableBooleanAggBase and make also EveryAgg extend this, in order to avoid code duplication?

dilipbiswal (Author) commented Oct 25, 2018:

@mgaido91 I was not sure where to house this class; that's why I kept it separate :-). Is it okay if I just rename it and keep it in Max.scala?

Contributor:

maybe we can move it close to UnevaluableAggrgate. @cloud-fan @dilipbiswal WDYT?

Contributor:

If it's hard to decide where to put it, I think putting it in a new file can be considered.

* be evaluated. This is mainly used to provide compatibility with other databases.
* For example, we use this to support "nvl" by replacing it with "coalesce".
* Finds all the expressions that are unevaluable and replace/rewrite them with semantically
* equivalent expressions that can be evaluated. Currently we replace two kinds of expressions :
Contributor:

nit: extra space before :

}
}


Contributor:

nit: unneeded change

@@ -144,6 +144,8 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite {
assertSuccess(Sum('stringField))
assertSuccess(Average('stringField))
assertSuccess(Min('arrayField))
assertSuccess(new EveryAgg('booleanField))
assertSuccess(new AnyAgg('booleanField))
Contributor:

shall we add also SomeAgg?

SparkQA commented Oct 25, 2018

Test build #98033 has finished for PR 22809 at commit 08999f9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait UnevaluableAggregate extends DeclarativeAggregate
  • abstract class UnevaluableBooleanAggBase(arg: Expression)
  • case class EveryAgg(arg: Expression) extends UnevaluableBooleanAggBase(arg)
  • case class AnyAgg(arg: Expression) extends UnevaluableBooleanAggBase(arg)
  • case class SomeAgg(arg: Expression) extends UnevaluableBooleanAggBase(arg)

SparkQA commented Oct 25, 2018

Test build #98035 has finished for PR 22809 at commit 2bc9965.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal (Contributor, Author)

@cloud-fan @mgaido91 I have incorporated the comments. Could you please check if things look okay now?

*/
trait UnevaluableAggregate extends DeclarativeAggregate {

override def nullable: Boolean = true
Contributor:

why do we set them always as nullable?

dilipbiswal (Author):

@mgaido91 Most of the aggregates are nullable, no? Did you have a suggestion here?

Contributor:

shouldn't this be nullable only if the incoming expression is?

dilipbiswal (Author) commented Oct 26, 2018:

@mgaido91 I think for aggregates it's different? Please see Max and Min; they all define it to be nullable. I think they work on a group of rows and can return null on empty input.

Contributor:

Ah, right, I was missing that case, sorry, thanks.
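
For context, a small illustration of the point above (hypothetical snippet, assuming a SparkSession named `spark`): an aggregate over zero rows produces NULL, so the aggregate's result must be nullable regardless of the input column's nullability.

```
// min over zero rows yields NULL, which is why aggregates declare nullable = true.
val df = spark.sql("SELECT min(v) AS m FROM VALUES (true), (false) AS t(v) WHERE 1 = 0")
df.show()          // a single row containing null
println(df.schema) // the field `m` reports nullable = true
```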

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

abstract class UnevaluableBooleanAggBase(arg: Expression)
Contributor:

why don't we reuse RuntimeReplaceable?

dilipbiswal (Author):

@mgaido91 RuntimeReplaceable works for scalar expressions but not for aggregate expressions.

Contributor:

Why doesn't it work? Sorry if the question is dumb, but I can't see the problem in using it here; probably I am missing something.

dilipbiswal (Author) commented Oct 26, 2018:

@mgaido91 Actually I tried a few different ideas. They are in a comment on the original PR. I had prepared two branches with two approaches: #22047 (comment). Could you please take a look?

Contributor:

OK, I see you mentioned that there were issues using RuntimeReplaceable. Can we have a similar approach to that, though? I mean introducing a new method here which returns the replaced value, so that in ReplaceExpressions we can simply match on UnevaluableAggregate and keep the replacement logic in the individual expressions?

dilipbiswal (Author):

@mgaido91 I tried it here. I had trouble getting it to work for window expressions. That's why @cloud-fan suggested trying the current approach in this comment.

dilipbiswal (Author) commented Oct 26, 2018:

@mgaido91 I re-read your comments again and here is what I think you are implying:

  1. In UnevaluableAggregate, define a method, say replacedExpression.
  2. The subclass overrides it and returns the actual rewritten expression.
  3. In the optimizer, match UnevaluableAggregate and call replacedExpression.

Did I understand it correctly? If so, I actually wouldn't prefer that way. The reason is that replacedExpression is hidden from the analyzer and so it may not be safe. The way the ReplaceExpression framework is nicely designed, the analyzer resolves the rewritten expression normally (as it's the child expression). That's the reason I opted for specific/targeted rewrites.

If Wenchen and you think otherwise, then I can change.
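
A rough, self-contained sketch of the alternative described in points 1–3 above (simplified stand-in types; the method name replacedExpression comes from the comment itself; this is not the approach that was merged):

```
object ReplacedExpressionSketch {
  // Simplified stand-ins, not Spark's Catalyst classes.
  trait Expr
  case class Col(name: String) extends Expr
  case class Min(child: Expr) extends Expr
  case class Max(child: Expr) extends Expr

  // 1 + 2: each unevaluable aggregate carries its own rewrite target.
  trait UnevaluableAgg extends Expr { def replacedExpression: Expr }
  case class EveryAgg(child: Expr) extends UnevaluableAgg { def replacedExpression = Min(child) }
  case class SomeAgg(child: Expr) extends UnevaluableAgg { def replacedExpression = Max(child) }

  // 3: the optimizer matches the trait once instead of listing every function.
  def replace(e: Expr): Expr = e match {
    case u: UnevaluableAgg => u.replacedExpression
    case other             => other
  }
}
```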

Contributor:

Yes, but in the analyzer you analyze the children of the current expression, right? So we would just have something like def replacedExpression = Min(arg), which means doing exactly the same as is done now; the only difference is where the conversion logic is put. And IMHO having the conversion logic for all the expressions in ReplaceExpressions is harder to maintain than having all the logic related to an expression contained in it.

Anyway, since you have a different opinion, let's see what @cloud-fan thinks about this. Thanks.

Contributor:

We can leave a TODO saying that we should create a framework to replace aggregate functions, but I think the current patch is good enough for these 3 functions, and I'm not aware of more functions like them that we need to deal with.

dilipbiswal (Author):

@cloud-fan @mgaido91 Thank you. I have added a TODO for now.

@mgaido91 (Contributor)

LGTM

SparkQA commented Oct 28, 2018

Test build #98139 has finished for PR 22809 at commit 07205de.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

thanks, merging to master!

@asfgit asfgit closed this in e545811 Oct 28, 2018
@dilipbiswal (Contributor, Author)

Thanks a lot @cloud-fan @mgaido91

jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019

Closes apache#22809 from dilipbiswal/SPARK-19851-specific-rewrite.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>