Skip to content

Commit

Permalink
SQL generation support for cube, rollup, and grouping set
Browse files Browse the repository at this point in the history
  • Loading branch information
gatorsmile committed Feb 20, 2016
1 parent 95e1ab2 commit bc0c030
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,11 @@ case class GroupingID(groupByExprs: Seq[Expression]) extends Expression with Une
override def children: Seq[Expression] = groupByExprs
override def dataType: DataType = IntegerType
override def nullable: Boolean = false

// TODO: remove this when SPARK-12799 is resolved. That will provide a general to-sql solution
// for all the expressions.
override def sql: String = {
val childrenSQL = children.map(_.sql).mkString(", ")
s"grouping_id($childrenSQL)"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.util.control.NonFatal
import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.CollapseProject
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
Expand Down Expand Up @@ -74,6 +74,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
case p: Project =>
projectToSQL(p, isDistinct = false)

case a @ Aggregate(_, _, e @ Expand(_, _, p: Project)) =>
groupingSetToSQL(a, e, p)

case p: Aggregate =>
aggregateToSQL(p)

Expand Down Expand Up @@ -172,6 +175,58 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
)
}

private def groupingSetToSQL(
plan: Aggregate,
expand: Expand,
project: Project): String = {
// In cube/rollup/groupingsets, Analyzer creates new aliases for all group by expressions.
// Since conversion from attribute back SQL ignore expression IDs, the alias of attribute
// references are ignored in aliasMap
val aliasMap = AttributeMap(project.projectList.collect {
case a @ Alias(child, name) if !child.isInstanceOf[AttributeReference] => (a.toAttribute, a)
})

val aggExprs = plan.aggregateExpressions.map{
// VirtualColumn.groupingIdName is added by Analyzer, and thus remove it.
case a @ Alias(child: AttributeReference, name)
if child.name == VirtualColumn.groupingIdName =>
Alias(GroupingID(Nil), name)()
case a @ Alias(child: AttributeReference, name) if aliasMap.contains(child) =>
aliasMap(child).child
case o => o
}

val groupingExprs = plan.groupingExpressions.filterNot {
case a: NamedExpression => a.name == VirtualColumn.groupingIdName
case o => false
}.map {
case a: AttributeReference if aliasMap.contains(a) => aliasMap(a).child
case o => o
}

val groupingSQL = groupingExprs.map(_.sql).mkString(", ")

val groupingSet = expand.projections.map(_.filter {
case _: Literal => false
case e: Expression if plan.groupingExpressions.exists(_.semanticEquals(e)) => true
case _ => false
}.map {
case a: AttributeReference if aliasMap.contains(a) => aliasMap(a).child
case o => o
})

build(
"SELECT",
aggExprs.map(_.sql).mkString(", "),
if (plan.child == OneRowRelation) "" else "FROM",
toSQL(project.child),
if (groupingSQL.isEmpty) "" else "GROUP BY",
groupingSQL,
"GROUPING SETS",
"(" + groupingSet.map(e => s"(${e.map(_.sql).mkString(", ")})").mkString(", ") + ")"
)
}

object Canonicalizer extends RuleExecutor[LogicalPlan] {
override protected def batches: Seq[Batch] = Seq(
Batch("Canonicalizer", FixedPoint(100),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,97 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
checkHiveQl("SELECT DISTINCT id FROM t0")
}

test("rollup/cube #1") {
// Original logical plan:
// Aggregate [(key#17L % cast(5 as bigint))#47L,grouping__id#46],
// [(count(1),mode=Complete,isDistinct=false) AS cnt#43L,
// (key#17L % cast(5 as bigint))#47L AS _c1#45L,
// grouping__id#46 AS _c2#44]
// +- Expand [List(key#17L, value#18, (key#17L % cast(5 as bigint))#47L, 0),
// List(key#17L, value#18, null, 1)],
// [key#17L,value#18,(key#17L % cast(5 as bigint))#47L,grouping__id#46]
// +- Project [key#17L,
// value#18,
// (key#17L % cast(5 as bigint)) AS (key#17L % cast(5 as bigint))#47L]
// +- Subquery t1
// +- Relation[key#17L,value#18] ParquetRelation
// Converted SQL:
// SELECT count( 1) AS `cnt`,
// (`t1`.`key` % CAST(5 AS BIGINT)),
// grouping_id() AS `_c2`
// FROM `default`.`t1`
// GROUP BY (`t1`.`key` % CAST(5 AS BIGINT))
// GROUPING SETS (((`t1`.`key` % CAST(5 AS BIGINT))), ())
checkHiveQl(
"SELECT count(*) as cnt, key % 5, grouping_id() FROM t1 GROUP BY key % 5 WITH ROLLUP")
checkHiveQl(
"SELECT count(*) as cnt, key % 5, grouping_id() FROM t1 GROUP BY key % 5 WITH CUBE")
}

test("rollup/cube #2") {
checkHiveQl("SELECT key, value, count(value) FROM t1 GROUP BY key, value WITH ROLLUP")
checkHiveQl("SELECT key, value, count(value) FROM t1 GROUP BY key, value WITH CUBE")
}

test("rollup/cube #3") {
checkHiveQl(
"SELECT key, count(value), grouping_id() FROM t1 GROUP BY key, value WITH ROLLUP")
checkHiveQl(
"SELECT key, count(value), grouping_id() FROM t1 GROUP BY key, value WITH CUBE")
}

test("rollup/cube #4") {
checkHiveQl(
s"""
|SELECT count(*) as cnt, key % 5 as k1, key - 5 as k2, grouping_id() FROM t1
|GROUP BY key % 5, key - 5 WITH ROLLUP
""".stripMargin)
checkHiveQl(
s"""
|SELECT count(*) as cnt, key % 5 as k1, key - 5 as k2, grouping_id() FROM t1
|GROUP BY key % 5, key - 5 WITH CUBE
""".stripMargin)
}

test("rollup/cube #5") {
checkHiveQl(
s"""
|SELECT count(*) AS cnt, key % 5 AS k1, key-5 AS k2, grouping_id() AS k3
|FROM (SELECT key, key%2, key - 5 FROM t1) t GROUP BY key%5, key-5
|WITH ROLLUP
""".stripMargin)
checkHiveQl(
s"""
|SELECT count(*) AS cnt, key % 5 AS k1, key-5 AS k2, grouping_id() AS k3
|FROM (SELECT key, key%2, key - 5 FROM t1) t GROUP BY key%5, key-5
|WITH CUBE
""".stripMargin)
}

test("rollup/cube #6") {
checkHiveQl("SELECT a, b, sum(c) FROM t2 GROUP BY ROLLUP(a, b) ORDER BY a, b")
checkHiveQl("SELECT a, b, sum(c) FROM t2 GROUP BY CUBE(a, b) ORDER BY a, b")
checkHiveQl("SELECT a, b, sum(a) FROM t2 GROUP BY ROLLUP(a, b) ORDER BY a, b")
checkHiveQl("SELECT a, b, sum(a) FROM t2 GROUP BY CUBE(a, b) ORDER BY a, b")
checkHiveQl("SELECT a + b, b, sum(a - b) FROM t2 GROUP BY a + b, b WITH ROLLUP")
checkHiveQl("SELECT a + b, b, sum(a - b) FROM t2 GROUP BY a + b, b WITH CUBE")
}

test("grouping sets #1") {
checkHiveQl(
s"""
|SELECT count(*) AS cnt, key % 5 AS k1, key - 5 AS k2, grouping_id() AS k3
|FROM (SELECT key, key % 2, key - 5 FROM t1) t GROUP BY key % 5, key - 5
|GROUPING SETS (key % 5, key - 5)
""".stripMargin)
}

test("grouping sets #2") {
checkHiveQl("SELECT a, b, sum(c) FROM t2 GROUP BY a, b GROUPING SETS (a, b) ORDER BY a, b")
checkHiveQl("SELECT a, b, sum(c) FROM t2 GROUP BY a, b GROUPING SETS (a) ORDER BY a, b")
checkHiveQl("SELECT a, b, sum(c) FROM t2 GROUP BY a, b GROUPING SETS (b) ORDER BY a, b")
}

test("cluster by") {
checkHiveQl("SELECT id FROM t0 CLUSTER BY id")
}
Expand Down

0 comments on commit bc0c030

Please sign in to comment.