Skip to content

Commit

Permalink
SQL generation support for window functions
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud-fan committed Mar 7, 2016
1 parent 8ff8809 commit 3ce072b
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,7 @@ class Analyzer(

// Finally, we create a Project to output currentChild's output
// newExpressionsWithWindowFunctions.
Project(currentChild.output ++ newExpressionsWithWindowFunctions, currentChild)
Project(child.output ++ newExpressionsWithWindowFunctions, currentChild)
} // end of addWindow

// We have to use transformDown at here to make sure the rule of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ case class SortOrder(child: Expression, direction: SortDirection)
override def dataType: DataType = child.dataType
override def nullable: Boolean = child.nullable

override def toString: String = s"$child ${if (direction == Ascending) "ASC" else "DESC"}"
override def toString: String = s"$child ${direction.sql}"
override def sql: String = child.sql + " " + direction.sql

def isAscending: Boolean = direction == Ascending
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,22 @@ case class WindowSpecDefinition(
override def nullable: Boolean = true
override def foldable: Boolean = false
override def dataType: DataType = throw new UnsupportedOperationException

override def sql: String = {
val partition = if (partitionSpec.isEmpty) {
""
} else {
"PARTITION BY " + partitionSpec.map(_.sql).mkString(", ")
}

val order = if (orderSpec.isEmpty) {
""
} else {
"ORDER BY " + orderSpec.map(_.sql).mkString(", ")
}

s"($partition $order ${frameSpecification.toString})"
}
}

/**
Expand Down Expand Up @@ -278,6 +294,7 @@ case class WindowExpression(
override def nullable: Boolean = windowFunction.nullable

override def toString: String = s"$windowFunction $windowSpec"
override def sql: String = windowFunction.sql + " OVER " + windowSpec.sql
}

/**
Expand Down
37 changes: 35 additions & 2 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ case class SubqueryHolder(query: String) extends LeafExpression with Unevaluable
}

/**
* A builder class used to convert a resolved logical plan into a SQL query string. Note that this
* A builder class used to convert a resolved logical plan into a SQL query string. Note that not
* all resolved logical plan are convertible. They either don't have corresponding SQL
* representations (e.g. logical plans that operate on local Scala collections), or are simply not
* supported by this builder (yet).
Expand Down Expand Up @@ -103,6 +103,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
case p: Aggregate =>
aggregateToSQL(p)

case w: Window =>
windowToSQL(w)

case Limit(limitExpr, child) =>
s"${toSQL(child)} LIMIT ${limitExpr.sql}"

Expand Down Expand Up @@ -179,7 +182,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
build(
toSQL(p.child),
if (p.global) "ORDER BY" else "SORT BY",
p.order.map { case SortOrder(e, dir) => s"${e.sql} ${dir.sql}" }.mkString(", ")
p.order.map(_.sql).mkString(", ")
)

case p: RepartitionByExpression =>
Expand Down Expand Up @@ -297,6 +300,15 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
)
}

private def windowToSQL(w: Window): String = {
build(
"SELECT",
(w.child.output ++ w.windowExpressions).map(_.sql).mkString(", "),
if (w.child == OneRowRelation) "" else "FROM",
toSQL(w.child)
)
}

object Canonicalizer extends RuleExecutor[LogicalPlan] {
override protected def batches: Seq[Batch] = Seq(
Batch("Canonicalizer", FixedPoint(100),
Expand Down Expand Up @@ -340,6 +352,27 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi

case plan: Project =>
wrapChildWithSubquery(plan)

case w @ Window(_, _, _, _,
_: SubqueryAlias
| _: Filter
| _: Join
| _: MetastoreRelation
| OneRowRelation
| _: LocalLimit
| _: GlobalLimit
| _: Sample
) => w

case w: Window =>
val alias = SQLBuilder.newSubqueryName
val childAttributes = w.child.outputSet
val qualified = w.windowExpressions.map(_.transform {
case a: Attribute if childAttributes.contains(a) =>
a.withQualifiers(alias :: Nil)
}.asInstanceOf[NamedExpression])

w.copy(windowExpressions = qualified, child = SubqueryAlias(alias, w.child))
}

def wrapChildWithSubquery(project: Project): Project = project match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,4 +445,19 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
"f1", "b[0].f1", "f1", "c[foo]", "d[0]"
)
}

test("window functions") {
checkHiveQl("SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY d) AS sum FROM parquet_t2")
checkHiveQl(
"""
|SELECT a + 1, SUM(b * 2) OVER (PARTITION BY c + d ORDER BY c - d) AS sum
|FROM parquet_t2
""".stripMargin)
checkHiveQl(
"""
|SELECT a, SUM(b) OVER w1 AS sum, AVG(b) over w2 AS avg
|FROM parquet_t2
|WINDOW w1 AS (PARTITION BY c ORDER BY d), w2 AS (PARTITION BY d ORDER BY c)
""".stripMargin)
}
}

0 comments on commit 3ce072b

Please sign in to comment.