[SPARK-12720] [SQL] SQL Generation Support for Cube, Rollup, and Grouping Sets #11283

Closed. Wants to merge 52 commits.

Changes from 45 commits (of 52 total).

Commits
01e4cdf  Merge remote-tracking branch 'upstream/master' (gatorsmile, Nov 13, 2015)
6835704  Merge remote-tracking branch 'upstream/master' (gatorsmile, Nov 14, 2015)
9180687  Merge remote-tracking branch 'upstream/master' (gatorsmile, Nov 14, 2015)
b38a21e  SPARK-11633 (gatorsmile, Nov 17, 2015)
d2b84af  Merge remote-tracking branch 'upstream/master' into joinMakeCopy (gatorsmile, Nov 17, 2015)
fda8025  Merge remote-tracking branch 'upstream/master' (gatorspark, Nov 17, 2015)
ac0dccd  Merge branch 'master' of https://github.com/gatorsmile/spark (gatorspark, Nov 17, 2015)
6e0018b  Merge remote-tracking branch 'upstream/master' (Nov 20, 2015)
0546772  converge (gatorsmile, Nov 20, 2015)
b37a64f  converge (gatorsmile, Nov 20, 2015)
c2a872c  Merge remote-tracking branch 'upstream/master' (gatorsmile, Jan 6, 2016)
ab6dbd7  Merge remote-tracking branch 'upstream/master' (gatorsmile, Jan 6, 2016)
4276356  Merge remote-tracking branch 'upstream/master' (gatorsmile, Jan 6, 2016)
2dab708  Merge remote-tracking branch 'upstream/master' (gatorsmile, Jan 7, 2016)
0458770  Merge remote-tracking branch 'upstream/master' (gatorsmile, Jan 8, 2016)
1debdfa  Merge remote-tracking branch 'upstream/master' (gatorsmile, Jan 9, 2016)
763706d  Merge remote-tracking branch 'upstream/master' (gatorsmile, Jan 14, 2016)
4de6ec1  Merge remote-tracking branch 'upstream/master' (gatorsmile, Jan 18, 2016)
9422a4f  Merge remote-tracking branch 'upstream/master' (gatorsmile, Jan 19, 2016)
52bdf48  Merge remote-tracking branch 'upstream/master' (gatorsmile, Jan 20, 2016)
1e95df3  Merge remote-tracking branch 'upstream/master' (gatorsmile, Jan 23, 2016)
fab24cf  Merge remote-tracking branch 'upstream/master' (gatorsmile, Feb 1, 2016)
8b2e33b  Merge remote-tracking branch 'upstream/master' (gatorsmile, Feb 5, 2016)
2ee1876  Merge remote-tracking branch 'upstream/master' (gatorsmile, Feb 11, 2016)
b9f0090  Merge remote-tracking branch 'upstream/master' (gatorsmile, Feb 12, 2016)
ade6f7e  Merge remote-tracking branch 'upstream/master' (gatorsmile, Feb 15, 2016)
9fd63d2  Merge remote-tracking branch 'upstream/master' (gatorsmile, Feb 19, 2016)
bc0c030  SQL generation support for cube, rollup, and grouping set (gatorsmile, Feb 20, 2016)
9292abe  Merge remote-tracking branch 'upstream/master' into groupingSetsToSQL (gatorsmile, Feb 20, 2016)
5199d49  Merge remote-tracking branch 'upstream/master' (gatorsmile, Feb 22, 2016)
ea5c567  Merge branch 'groupingSetsToSQL' into groupingSetsToSQLNew (gatorsmile, Feb 22, 2016)
a408325  override the pretty name of groupingid (gatorsmile, Feb 22, 2016)
3514213  fixed the test case (gatorsmile, Feb 23, 2016)
6a36593  support grouping() (gatorsmile, Feb 23, 2016)
404214c  Merge remote-tracking branch 'upstream/master' (gatorsmile, Feb 23, 2016)
bc5e96f  Merge remote-tracking branch 'upstream/master' into groupingSetsToSQLNew (gatorsmile, Feb 23, 2016)
1aef849  style fix. (gatorsmile, Feb 23, 2016)
c001dd9  Merge remote-tracking branch 'upstream/master' (gatorsmile, Feb 25, 2016)
9b1d420  Merge remote-tracking branch 'upstream/master' into groupingSetsToSQLNew (gatorsmile, Mar 2, 2016)
6f79df1  resolve comments. (gatorsmile, Mar 2, 2016)
37a9d8d  resolve comments. (gatorsmile, Mar 3, 2016)
640e45c  resolve comments. (gatorsmile, Mar 3, 2016)
749be1b  resolve comments. (gatorsmile, Mar 3, 2016)
ae768fe  resolve comments. (gatorsmile, Mar 3, 2016)
6cea658  resolve comments. (gatorsmile, Mar 3, 2016)
5d8923c  Merge remote-tracking branch 'upstream/master' into groupingSetsToSQLNew (gatorsmile, Mar 3, 2016)
b1925e5  cleaned the comment. (gatorsmile, Mar 3, 2016)
6f609fb  address comments. (gatorsmile, Mar 4, 2016)
9eaca51  address comments. (gatorsmile, Mar 4, 2016)
b8786b2  address comments. (gatorsmile, Mar 5, 2016)
59daa48  Merge remote-tracking branch 'upstream/master' (gatorsmile, Mar 5, 2016)
385c0d9  Merge branch 'groupingSetsToSQLNew' into groupingSetsToSQLNewNewNew (gatorsmile, Mar 5, 2016)
14 changes: 7 additions & 7 deletions python/pyspark/sql/functions.py
@@ -348,13 +348,13 @@ def grouping_id(*cols):
grouping columns).

>>> df.cube("name").agg(grouping_id(), sum("age")).orderBy("name").show()
+-----+------------+--------+
| name|groupingid()|sum(age)|
+-----+------------+--------+
| null|           1|       7|
|Alice|           0|       2|
|  Bob|           0|       5|
+-----+------------+--------+
+-----+-------------+--------+
| name|grouping_id()|sum(age)|
+-----+-------------+--------+
| null|            1|       7|
|Alice|            0|       2|
|  Bob|            0|       5|
+-----+-------------+--------+
"""
sc = SparkContext._active_spark_context
jc = sc._jvm.functions.grouping_id(_to_seq(sc, cols, _to_java_column))
@@ -63,4 +63,5 @@ case class GroupingID(groupByExprs: Seq[Expression]) extends Expression with Une
override def children: Seq[Expression] = groupByExprs
override def dataType: DataType = IntegerType
override def nullable: Boolean = false
override def prettyName: String = "grouping_id"
}
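Why this one-line change fixes the doctest above: Expression.prettyName defaults to the lower-cased class name, so GroupingID previously printed as groupingid(). A minimal standalone sketch of that mechanism (toy trait and demo names are assumptions, for illustration only):

// Toy model of default-vs-overridden prettyName behavior.
trait Expr { def prettyName: String = getClass.getSimpleName.toLowerCase }
case class GroupingIDDemo() extends Expr {
  override def prettyName: String = "grouping_id"  // mirrors the patch above
}
object PrettyNameDemo extends App {
  println(GroupingIDDemo().prettyName + "()")  // prints: grouping_id()
}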
74 changes: 72 additions & 2 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -24,13 +24,13 @@ import scala.util.control.NonFatal
import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, NonSQLExpression,
SortOrder}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.CollapseProject
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types.{ByteType, IntegerType}

/**
* A builder class used to convert a resolved logical plan into a SQL query string. Note that this
@@ -107,6 +107,11 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
case p: Project =>
projectToSQL(p, isDistinct = false)

case a @ Aggregate(_, _, e @ Expand(_, _, p: Project))
if sameOutput(e.output,
p.child.output ++ a.groupingExpressions.map(_.asInstanceOf[Attribute])) =>
Contributor: Sorry, I missed this earlier: we should check isInstanceOf before calling asInstanceOf directly. We can put all of it into one method and use it as the if condition.

Member Author: Thank you for pointing it out! I will be more careful next time.

groupingSetToSQL(a, e, p)
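A hedged sketch of the refactoring the reviewer asks for (the helper name isGroupingSet is an assumption, not part of the diff as shown): check isInstanceOf for every grouping expression before casting, and fold the whole guard into one method used as the case condition.

// Hypothetical guard, usable as: case a @ Aggregate(_, _, e @ Expand(_, _, p: Project))
//   if isGroupingSet(a, e, p) => groupingSetToSQL(a, e, p)
private def isGroupingSet(a: Aggregate, e: Expand, p: Project): Boolean = {
  a.groupingExpressions.forall(_.isInstanceOf[Attribute]) &&
    sameOutput(e.output,
      p.child.output ++ a.groupingExpressions.map(_.asInstanceOf[Attribute]))
}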

case p: Aggregate =>
aggregateToSQL(p)

@@ -203,6 +208,10 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
throw new UnsupportedOperationException(s"unsupported plan $node")
}

private def sameOutput(left: Seq[Attribute], right: Seq[Attribute]): Boolean =
Member Author: Thank you!

left.forall(a => right.exists(_.semanticEquals(a))) &&
right.forall(a => left.exists(_.semanticEquals(a)))

/**
* Turns a bunch of string segments into a single string and separates each segment by a space.
* The segments are trimmed so only a single space appears in the separation.
@@ -233,6 +242,67 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
)
}

private def groupingSetToSQL(
plan: Aggregate,
expand: Expand,
project: Project): String = {
require(plan.groupingExpressions.length > 1)
Contributor: nit: I think assert is better here; if it breaks, it means something has gone wrong in our system.

Member Author: Will do. Thanks!

// The last column of Expand is always grouping ID
val gid = expand.output.last

val groupByAttributes = plan.groupingExpressions.dropRight(1).map(_.asInstanceOf[Attribute])
val groupByExprs =
project.projectList.drop(project.child.output.length).map(_.asInstanceOf[Alias].child)
Contributor: nit: we can define val numOriginalOutput = project.child.output.length first, so that val groupByExprs = ... fits on one line. Also, please add a comment explaining the assumption here.
val groupByAttrMap = AttributeMap(groupByAttributes.zip(groupByExprs))
val groupingSQL = groupByExprs.map(_.sql).mkString(", ")

val groupingSet = expand.projections.map { project =>
Contributor: Nit: it would be nice to add a type annotation to groupingSet, since it's a relatively complex nested data structure and can be hard to grasp.

Member Author: Will do.

// Assumption: expand.projections are composed of
// 1) the original output (Project's child.output),
// 2) group by attributes (or null literals)
// 3) gid, which is always the last one in each project in Expand
project.dropRight(1).collect {
Contributor: Add comments to explain our assumption here.

Member Author: Will do. Thanks!

Contributor: nit: project.drop(numOriginalOutput).dropRight(1) matches our assumption better, although correctness doesn't change.

case attr: Attribute if groupByAttrMap.contains(attr) => groupByAttrMap(attr)
}
}
val groupingSetSQL =
"GROUPING SETS(" +
groupingSet.map(e => s"(${e.map(_.sql).mkString(", ")})").mkString(", ") + ")"
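For intuition, a small self-contained sketch (hypothetical, not part of the patch) of what the extraction and rendering above produce for GROUP BY a, b WITH ROLLUP. Each Expand projection carries the original output, then the grouping columns or nulls, then the trailing gid.

// Mock of expand.projections for GROUP BY a, b WITH ROLLUP over child output (a, b, c).
val projections: Seq[Seq[String]] = Seq(
  Seq("a", "b", "c", "a'", "b'", "0"),  // grouping set (a, b), gid = 0
  Seq("a", "b", "c", "a'", null, "1"),  // grouping set (a),    gid = 1
  Seq("a", "b", "c", null, null, "3")   // grouping set (),     gid = 3
)
// Drop the trailing gid and the original output; keep the non-null grouping columns.
val sets: Seq[Seq[String]] = projections.map(_.dropRight(1).drop(3).filter(_ != null))
// Render exactly as the code above does:
val sql: String = "GROUPING SETS(" +
  sets.map(s => s"(${s.mkString(", ")})").mkString(", ") + ")"
// sql == "GROUPING SETS((a', b'), (a'), ())"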

Contributor: Can the above be simplified to:

val groupByAttributes = plan.groupingExpressions.dropRight(1).map(_.asInstanceOf[Attribute])
val groupByAttrMap = AttributeMap(groupByAttributes.zip(
  project.projectList.drop(project.child.output.length).map(_.asInstanceOf[Alias].child)))
val groupingExprs = groupByAliasMap.values.toArray

Member Author: The groupingExpressions could be expressions. For example, GROUP BY key % 5, key - 5 WITH ROLLUP. Thus, we are unable to do it.

Contributor: I think the groupingExpressions must all be Attributes; see https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L302-L305. Did I miss something here?

Member Author: Uh, you are right. Let me change it. :)

val aggExprs = plan.aggregateExpressions.map { case expr =>
expr.transformDown {
// grouping_id() is converted to VirtualColumn.groupingIdName by Analyzer. Revert it back.
case ar: AttributeReference if ar eq gid => GroupingID(Nil)
case a @ Alias(_ @ Cast(BitwiseAnd(
Contributor: Can we also remove this Alias, and the Alias in the next case?

Member Author: OK, let me remove it. Thanks!

ShiftRight(ar: AttributeReference, _ @ Literal(value: Any, IntegerType)),
Contributor: Nit: why the _ @?

Member Author: Let me remove it.

Literal(1, IntegerType)), ByteType), name) if ar == gid =>
// for converting an expression to its original SQL format grouping(col)
val idx = groupByExprs.length - 1 - value.asInstanceOf[Int]
val groupingCol = groupByExprs.lift(idx)
if (groupingCol.isDefined) {
Grouping(groupingCol.get)
} else {
throw new UnsupportedOperationException(s"unsupported operator $a")
}
Contributor: The following version might be clearer:

val groupingCol = groupByExprs.applyOrElse(
  idx, throw new UnsupportedOperationException(s"unsupported operator $a")
Grouping(groupingCol)

And I don't quite get the meaning of the exception's error message...

Member Author: Here, if the value is out of bounds, I thought we should not continue the conversion. After rethinking this, users might call grouping_id() inside such a function. Maybe we should not throw any exception. How about changing it to

groupByExprs.lift(idx).map(Grouping).getOrElse(a)

Contributor: Makes sense.

case a @ Alias(ar: AttributeReference, _) if groupByAttrMap.contains(ar) =>
groupByAttrMap(ar)
case ar: AttributeReference if groupByAttrMap.contains(ar) =>
groupByAttrMap(ar)
}
}
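A hedged worked example (standalone Scala, names hypothetical) of the bit arithmetic the BitwiseAnd/ShiftRight case reverses: the analyzer lowers grouping(col) to ((gid >> shift) & 1) with shift = numGroupByExprs - 1 - indexOf(col), so the code above recovers idx from the shift amount.

object GroupingBitDemo extends App {
  // What the analyzer-generated expression computes for grouping(col at idx).
  def grouping(gid: Int, numExprs: Int, idx: Int): Int =
    (gid >> (numExprs - 1 - idx)) & 1

  // GROUP BY a, b WITH ROLLUP, grouping set (a): gid = 1.
  assert(grouping(gid = 1, numExprs = 2, idx = 0) == 0)  // a is grouped
  assert(grouping(gid = 1, numExprs = 2, idx = 1) == 1)  // b is nulled out
  println("ok")
}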

build(
"SELECT",
aggExprs.map(_.sql).mkString(", "),
if (plan.child == OneRowRelation) "" else "FROM",
toSQL(project.child),
Contributor: Let's add some test cases where project.child itself is a more complicated plan.

Member Author: Will do. Thanks!

Member Author: Added another two test cases for this. Now we have three test cases for it: rollup/cube #5, rollup/cube #8, and rollup/cube #9.

"GROUP BY",
groupingSQL,
groupingSetSQL
)
}

object Canonicalizer extends RuleExecutor[LogicalPlan] {
override protected def batches: Seq[Batch] = Seq(
Batch("Canonicalizer", FixedPoint(100),
@@ -218,6 +218,113 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
checkHiveQl("SELECT DISTINCT id FROM parquet_t0")
}

test("rollup/cube #1") {
// Original logical plan:
// Aggregate [(key#17L % cast(5 as bigint))#47L,grouping__id#46],
// [(count(1),mode=Complete,isDistinct=false) AS cnt#43L,
// (key#17L % cast(5 as bigint))#47L AS _c1#45L,
// grouping__id#46 AS _c2#44]
// +- Expand [List(key#17L, value#18, (key#17L % cast(5 as bigint))#47L, 0),
// List(key#17L, value#18, null, 1)],
// [key#17L,value#18,(key#17L % cast(5 as bigint))#47L,grouping__id#46]
// +- Project [key#17L,
// value#18,
// (key#17L % cast(5 as bigint)) AS (key#17L % cast(5 as bigint))#47L]
// +- Subquery t1
// +- Relation[key#17L,value#18] ParquetRelation
// Converted SQL:
// SELECT count(1) AS `cnt`,
// (`t1`.`key` % CAST(5 AS BIGINT)),
// grouping_id() AS `_c2`
// FROM `default`.`t1`
// GROUP BY (`t1`.`key` % CAST(5 AS BIGINT))
// GROUPING SETS (((`t1`.`key` % CAST(5 AS BIGINT))), ())
checkHiveQl(
"SELECT count(*) as cnt, key%5, grouping_id() FROM parquet_t1 GROUP BY key % 5 WITH ROLLUP")
checkHiveQl(
"SELECT count(*) as cnt, key%5, grouping_id() FROM parquet_t1 GROUP BY key % 5 WITH CUBE")
}

test("rollup/cube #2") {
checkHiveQl("SELECT key, value, count(value) FROM parquet_t1 GROUP BY key, value WITH ROLLUP")
checkHiveQl("SELECT key, value, count(value) FROM parquet_t1 GROUP BY key, value WITH CUBE")
}

test("rollup/cube #3") {
checkHiveQl(
"SELECT key, count(value), grouping_id() FROM parquet_t1 GROUP BY key, value WITH ROLLUP")
checkHiveQl(
"SELECT key, count(value), grouping_id() FROM parquet_t1 GROUP BY key, value WITH CUBE")
}

test("rollup/cube #4") {
checkHiveQl(
s"""
|SELECT count(*) as cnt, key % 5 as k1, key - 5 as k2, grouping_id() FROM parquet_t1
|GROUP BY key % 5, key - 5 WITH ROLLUP
""".stripMargin)
checkHiveQl(
s"""
|SELECT count(*) as cnt, key % 5 as k1, key - 5 as k2, grouping_id() FROM parquet_t1
|GROUP BY key % 5, key - 5 WITH CUBE
""".stripMargin)
}

test("rollup/cube #5") {
checkHiveQl(
s"""
|SELECT count(*) AS cnt, key % 5 AS k1, key - 5 AS k2, grouping_id(key % 5, key - 5) AS k3
|FROM (SELECT key, key%2, key - 5 FROM parquet_t1) t GROUP BY key%5, key-5
|WITH ROLLUP
""".stripMargin)
checkHiveQl(
s"""
|SELECT count(*) AS cnt, key % 5 AS k1, key - 5 AS k2, grouping_id(key % 5, key - 5) AS k3
|FROM (SELECT key, key % 2, key - 5 FROM parquet_t1) t GROUP BY key % 5, key - 5
|WITH CUBE
""".stripMargin)
}

test("rollup/cube #6") {
checkHiveQl("SELECT a, b, sum(c) FROM parquet_t2 GROUP BY ROLLUP(a, b) ORDER BY a, b")
checkHiveQl("SELECT a, b, sum(c) FROM parquet_t2 GROUP BY CUBE(a, b) ORDER BY a, b")
checkHiveQl("SELECT a, b, sum(a) FROM parquet_t2 GROUP BY ROLLUP(a, b) ORDER BY a, b")
checkHiveQl("SELECT a, b, sum(a) FROM parquet_t2 GROUP BY CUBE(a, b) ORDER BY a, b")
checkHiveQl("SELECT a + b, b, sum(a - b) FROM parquet_t2 GROUP BY a + b, b WITH ROLLUP")
checkHiveQl("SELECT a + b, b, sum(a - b) FROM parquet_t2 GROUP BY a + b, b WITH CUBE")
}

test("rollup/cube #7") {
checkHiveQl("SELECT a, b, grouping_id(a, b) FROM parquet_t2 GROUP BY cube(a, b)")
checkHiveQl("SELECT a, b, grouping(b) FROM parquet_t2 GROUP BY cube(a, b)")
checkHiveQl("SELECT a, b, grouping(a) FROM parquet_t2 GROUP BY cube(a, b)")
}

test("grouping sets #1") {
checkHiveQl(
s"""
|SELECT count(*) AS cnt, key % 5 AS k1, key - 5 AS k2, grouping_id() AS k3
|FROM (SELECT key, key % 2, key - 5 FROM parquet_t1) t GROUP BY key % 5, key - 5
|GROUPING SETS (key % 5, key - 5)
""".stripMargin)
}

test("grouping sets #2") {
checkHiveQl(
"SELECT a, b, sum(c) FROM parquet_t2 GROUP BY a, b GROUPING SETS (a, b) ORDER BY a, b")
checkHiveQl(
"SELECT a, b, sum(c) FROM parquet_t2 GROUP BY a, b GROUPING SETS (a) ORDER BY a, b")
checkHiveQl(
"SELECT a, b, sum(c) FROM parquet_t2 GROUP BY a, b GROUPING SETS (b) ORDER BY a, b")
checkHiveQl(
"SELECT a, b, sum(c) FROM parquet_t2 GROUP BY a, b GROUPING SETS (()) ORDER BY a, b")
checkHiveQl(
s"""
|SELECT a, b, sum(c) FROM parquet_t2 GROUP BY a, b
|GROUPING SETS ((), (a), (a, b)) ORDER BY a, b
""".stripMargin)
}

test("cluster by") {
checkHiveQl("SELECT id FROM parquet_t0 CLUSTER BY id")
}