Skip to content

Commit

Permalink
[SPARK-12727][SQL] support SQL generation for aggregate with multi-di…
Browse files Browse the repository at this point in the history
…stinct

## What changes were proposed in this pull request?

This PR add SQL generation support for aggregate with multi-distinct, by simply moving the `DistinctAggregationRewriter` rule to optimizer.

More discussions are needed as this breaks an import contract: analyzed plan should be able to run without optimization.  However, the `ComputeCurrentTime` rule has kind of broken it already, and I think maybe we should add a new phase for this kind of rules, because strictly speaking they don't belong to analysis and is coupled with the physical plan implementation.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #11579 from cloud-fan/distinct.
  • Loading branch information
cloud-fan authored and rxin committed Mar 8, 2016
1 parent ad3c9a9 commit 46881b4
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ class Analyzer(
ExtractWindowExpressions ::
GlobalAggregates ::
ResolveAggregateFunctions ::
DistinctAggregationRewriter(conf) ::
HiveTypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Nondeterministic", Once,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Complete}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPlan}
Expand Down Expand Up @@ -100,13 +99,10 @@ import org.apache.spark.sql.types.IntegerType
* we could improve this in the current rule by applying more advanced expression cannocalization
* techniques.
*/
case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalPlan] {
object DistinctAggregationRewriter extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case p if !p.resolved => p
// We need to wait until this Aggregate operator is resolved.
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case a: Aggregate => rewrite(a)
case p => p
}

def rewrite(a: Aggregate): Aggregate = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.annotation.tailrec
import scala.collection.immutable.HashSet

import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubqueryAliases}
import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
Expand All @@ -42,7 +42,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
// we do not eliminate subqueries or compute current time in the analyzer.
Batch("Finish Analysis", Once,
EliminateSubqueryAliases,
ComputeCurrentTime) ::
ComputeCurrentTime,
DistinctAggregationRewriter) ::
//////////////////////////////////////////////////////////////////////////////////////////
// Optimizer rules start here
//////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
""".stripMargin)
}


test("intersect") {
checkHiveQl("SELECT * FROM t0 INTERSECT SELECT * FROM t0")
}
Expand Down Expand Up @@ -367,9 +366,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
checkHiveQl("SELECT * FROM parquet_t0 TABLESAMPLE(0.1 PERCENT) WHERE 1=0")
}

// TODO Enable this
// Query plans transformed by DistinctAggregationRewriter are not recognized yet
ignore("multi-distinct columns") {
test("multi-distinct columns") {
checkHiveQl("SELECT a, COUNT(DISTINCT b), COUNT(DISTINCT c), SUM(d) FROM parquet_t2 GROUP BY a")
}

Expand Down

0 comments on commit 46881b4

Please sign in to comment.