From bf1878d01da836dec6bdd3a34a1936f84bdbfb5c Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 12 Oct 2014 19:56:15 +0800 Subject: [PATCH 1/2] Adds checks for non-aggregate attributes with aggregation --- .../sql/catalyst/analysis/Analyzer.scala | 36 ++++++++++++++++--- .../org/apache/spark/sql/SQLQuerySuite.scala | 26 ++++++++++++++ 2 files changed, 57 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index fe83eb12502dc..f07336a4fef60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -63,7 +63,8 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool typeCoercionRules ++ extendedRules : _*), Batch("Check Analysis", Once, - CheckResolution), + CheckResolution, + CheckAggregation), Batch("AnalysisOperators", fixedPoint, EliminateAnalysisOperators) ) @@ -88,6 +89,32 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool } } + /** + * Checks for non-aggregated attributes with aggregation + */ + object CheckAggregation extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = { + plan.transform { + case aggregatePlan @ Aggregate(groupingExprs, aggregateExprs, child) => + def isValidAggregateExpression(expr: Expression): Boolean = expr match { + case _: AggregateExpression => true + case e: Attribute if groupingExprs.contains(e) => true + case e if groupingExprs.contains(e) => true + case e if e.references.isEmpty => true + case e => e.children.forall(isValidAggregateExpression) + } + + aggregateExprs.foreach { e => + if (!isValidAggregateExpression(e)) { + throw new TreeNodeException(plan, s"Expression not in GROUP BY: $e") + } + } + + aggregatePlan + } + } + } + /** * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ @@ -204,18 +231,17 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool */ object UnresolvedHavingClauseAttributes extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - case filter @ Filter(havingCondition, aggregate @ Aggregate(_, originalAggExprs, _)) + case filter @ Filter(havingCondition, aggregate @ Aggregate(_, originalAggExprs, _)) if aggregate.resolved && containsAggregate(havingCondition) => { val evaluatedCondition = Alias(havingCondition, "havingCondition")() val aggExprsWithHaving = evaluatedCondition +: originalAggExprs - + Project(aggregate.output, Filter(evaluatedCondition.toAttribute, aggregate.copy(aggregateExpressions = aggExprsWithHaving))) } - } - + protected def containsAggregate(condition: Expression): Boolean = condition .collect { case ae: AggregateExpression => ae } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index a94022c0cf6e3..15f6ba4f72bbd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.joins.BroadcastHashJoin import org.apache.spark.sql.test._ import org.scalatest.BeforeAndAfterAll @@ -694,4 +695,29 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { checkAnswer( sql("SELECT CASE WHEN key = 1 THEN 1 ELSE 2 END FROM testData WHERE key = 1 group by key"), 1) } + + test("throw errors for non-aggregate attributes with aggregation") { + def checkAggregation(query: String, isInvalidQuery: Boolean = true) { + val logicalPlan = sql(query).queryExecution.logical + + if (isInvalidQuery) { + val e = intercept[TreeNodeException[LogicalPlan]](sql(query).queryExecution.analyzed) + assert( + e.getMessage.startsWith("Expression not in GROUP BY"), + "Non-aggregate attribute(s) not detected\n" + logicalPlan) + } else { + // Should not throw + sql(query).queryExecution.analyzed + } + } + + checkAggregation("SELECT key, COUNT(*) FROM testData") + checkAggregation("SELECT COUNT(key), COUNT(*) FROM testData", false) + + checkAggregation("SELECT value, COUNT(*) FROM testData GROUP BY key") + checkAggregation("SELECT COUNT(value), SUM(key) FROM testData GROUP BY key", false) + + checkAggregation("SELECT key + 2, COUNT(*) FROM testData GROUP BY key + 1") + checkAggregation("SELECT key + 1 + 1, COUNT(*) FROM testData GROUP BY key + 1", false) + } } From 524600477543bb7eb552c62b45884857e911dc19 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 13 Oct 2014 11:13:28 +0800 Subject: [PATCH 2/2] Passes test suites --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f07336a4fef60..82553063145b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -98,7 +98,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool case aggregatePlan @ Aggregate(groupingExprs, aggregateExprs, child) => def isValidAggregateExpression(expr: Expression): Boolean = expr match { case _: AggregateExpression => true - case e: Attribute if groupingExprs.contains(e) => true + case e: Attribute => groupingExprs.contains(e) case e if groupingExprs.contains(e) => true case e if e.references.isEmpty => true case e => e.children.forall(isValidAggregateExpression)