
[SPARK-22084][SQL] Fix performance regression in aggregation strategy #19301

Status: Closed · wants to merge 7 commits · Changes from 4 commits
@@ -38,7 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
* view resolution, in this way, we are able to get the correct view column ordering and
* omit the extra columns that we don't require);
* 1.2. Else set the child output attributes to `queryOutput`.
- * 2. Map the `queryQutput` to view output by index, if the corresponding attributes don't match,
+ * 2. Map the `queryOutput` to view output by index, if the corresponding attributes don't match,
Contributor: It looks all the same?

Contributor (Author): Q -> O

* try to up cast and alias the attribute in `queryOutput` to the attribute in the view output.
* 3. Add a Project over the child, with the new output generated by the previous steps.
* If the view output doesn't have the same number of columns neither with the child output, nor
@@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.expressions.aggregate

import java.util.Objects

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
@@ -72,11 +74,19 @@ object AggregateExpression {
aggregateFunction: AggregateFunction,
mode: AggregateMode,
isDistinct: Boolean): AggregateExpression = {
val state = if (aggregateFunction.resolved) {
Seq(aggregateFunction.toString, aggregateFunction.dataType,
aggregateFunction.nullable, mode, isDistinct)
} else {
Seq(aggregateFunction.toString, mode, isDistinct)
}
val hashCode = state.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b)
Contributor: what's the purpose here?


AggregateExpression(
aggregateFunction,
mode,
isDistinct,
-    NamedExpression.newExprId)
+    ExprId(hashCode))
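Taken on its own, the id scheme in this diff can be sketched as a standalone snippet: fold the aggregate function's semantic state into one `Int` with the same `31 * a + b` fold, so two semantically identical aggregates (e.g. `SUM(b) AS b0` and `SUM(b) AS b1`) receive the same `ExprId`. `SemanticState` and `stableId` below are hypothetical names for illustration, not Spark's API.

```scala
import java.util.Objects

object StableIdDemo {
  // Hypothetical container for the pieces of state the diff hashes.
  final case class SemanticState(
      funcString: String,
      dataType: String,
      nullable: Boolean,
      mode: String,
      isDistinct: Boolean)

  def stableId(s: SemanticState): Int = {
    val state: Seq[Any] =
      Seq(s.funcString, s.dataType, s.nullable, s.mode, s.isDistinct)
    // Same fold as the diff: Objects.hashCode over each (boxed) element.
    state
      .map(x => Objects.hashCode(x.asInstanceOf[AnyRef]))
      .foldLeft(0)((a, b) => 31 * a + b)
  }

  def main(args: Array[String]): Unit = {
    // Two aggregates with identical semantic state, as in
    // SELECT SUM(b) AS b0, SUM(b) AS b1.
    val b0 = SemanticState("sum(b)", "LongType", nullable = true, "Complete", isDistinct = false)
    val b1 = SemanticState("sum(b)", "LongType", nullable = true, "Complete", isDistinct = false)
    // Identical state yields identical ids, so both collapse onto one ExprId.
    assert(stableId(b0) == stableId(b1))
    println("ok")
  }
}
```

This is exactly what the reviewers object to below: the id is derived from semantics rather than being freshly allocated.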
Contributor (@cloud-fan, Sep 22, 2017):
I don't think this is the right fix. Semantically the b0 and b1 in SELECT SUM(b) AS b0, SUM(b) AS b1 are different aggregate functions, so they should have different resultId.

It's kind of an optimization in aggregate planner, we should detect these semantically different but duplicated aggregate functions and only plan one of them.
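A planner-side version of that suggestion could look roughly like the sketch below (the `Agg` type and `planOnce` are illustrative stand-ins, not Spark's planner API): each aggregate keeps its own distinct id, but the planner groups semantically equal ones, evaluates one representative, and rewrites the remaining result expressions to reference it.

```scala
object DedupDemo {
  // Hypothetical stand-in for an aggregate expression: a unique id plus a
  // key describing its semantics (function, input, mode, distinctness).
  final case class Agg(id: Long, semanticKey: String, alias: String)

  // Returns the representatives to physically evaluate, plus a rewrite
  // table mapping every duplicate's id to its representative's id.
  def planOnce(aggs: Seq[Agg]): (Seq[Agg], Map[Long, Long]) = {
    // One representative per semantic key.
    val reps = aggs.groupBy(_.semanticKey).map { case (_, vs) => vs.head }.toSeq
    val repId = reps.map(a => a.semanticKey -> a.id).toMap
    // Every id points at its representative's id.
    val rewrite = aggs.map(a => a.id -> repId(a.semanticKey)).toMap
    (reps, rewrite)
  }

  def main(args: Array[String]): Unit = {
    // SELECT SUM(b) AS b0, SUM(b) AS b1: different ids, same semantics.
    val aggs = Seq(Agg(0L, "sum(b)", "b0"), Agg(1L, "sum(b)", "b1"))
    val (planned, rewrite) = planOnce(aggs)
    assert(planned.size == 1)  // SUM(b) is physically evaluated once
    assert(rewrite(1L) == 0L)  // b1 is rewritten to reuse b0's result
    println("ok")
  }
}
```

The open question, raised by the author further down, is what to use as the semantic key; the thread shows why a string-based key breaks for typed aggregators.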

Member: I agree with @cloud-fan. This should be an optimization done in the aggregate planner, instead of forcibly setting the expr id here.

Contributor (Author, @stanzhai, Sep 25, 2017):
@cloud-fan @viirya I've tried to optimize this in the aggregate planner (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala#L211).

      // A single aggregate expression might appear multiple times in resultExpressions.
      // In order to avoid evaluating an individual aggregate function multiple times, we'll
      // build a set of the distinct aggregate expressions and build a function which can
      // be used to re-write expressions so that they reference the single copy of the
      // aggregate function which actually gets computed.
      val aggregateExpressions = resultExpressions.flatMap { expr =>
        expr.collect {
          case agg: AggregateExpression =>
            val aggregateFunction = agg.aggregateFunction
            val state = if (aggregateFunction.resolved) {
              Seq(aggregateFunction.toString, aggregateFunction.dataType,
                aggregateFunction.nullable, agg.mode, agg.isDistinct)
            } else {
              Seq(aggregateFunction.toString, agg.mode, agg.isDistinct)
            }
            val hashCode = state.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b)
            (hashCode, agg)
        }
      }.groupBy(_._1).map { case (_, values) =>
        values.head._2
      }.toSeq

But it's difficult to distinguish between different typed aggregators without an expr id. The current solution works well for all aggregate functions.

I'm not familiar with typed aggregators; any suggestions would be appreciated.
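The difficulty the author mentions can be illustrated with a small sketch: a key built only from `toString`/mode/`isDistinct` cannot tell apart two *different* typed aggregators that happen to print identically, so a planner keyed on that state would wrongly merge them. `TypedAgg` below is a hypothetical stand-in, not Spark's `TypedAggregateExpression`.

```scala
object TypedKeyCollision {
  // A toy "typed aggregator": its behavior lives in a closure that is not
  // reflected in its string form.
  class TypedAgg(val reduce: Int => Int) {
    override def toString: String = "TypedAgg"
  }

  def main(args: Array[String]): Unit = {
    val double = new TypedAgg(_ * 2)
    val triple = new TypedAgg(_ * 3)
    // Indistinguishable by a string-based semantic key...
    assert(double.toString == triple.toString)
    // ...yet semantically different aggregations.
    assert(double.reduce(1) != triple.reduce(1))
    println("ok")
  }
}
```

This is why the thread leans on expr ids (or a real semantic-equality check) rather than a purely string-derived key for typed aggregators.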

}
}

@@ -20,6 +20,7 @@ package org.apache.spark.sql.expressions
import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql.{Dataset, Encoder, TypedColumn}
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete}
import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression

@@ -104,7 +105,8 @@ abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
AggregateExpression(
TypedAggregateExpression(this),
Complete,
-      isDistinct = false)
+      isDistinct = false,
+      NamedExpression.newExprId)

new TypedColumn[IN, OUT](expr, encoderFor[OUT])
}