Skip to content

Commit

Permalink
Allow any literals in window range frames.
Browse files Browse the repository at this point in the history
  • Loading branch information
hvanhovell committed Feb 17, 2017
1 parent 6a9a85b commit 596f53c
Show file tree
Hide file tree
Showing 19 changed files with 392 additions and 409 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class Analyzer(
case WithWindowDefinition(windowDefinitions, child) =>
child.transform {
case p => p.transformExpressions {
case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
case WindowExpression(c, WindowSpecReference(windowName)) =>
val errorMessage =
s"Window specification $windowName is not defined in the WINDOW clause."
val windowSpecDefinition =
Expand Down Expand Up @@ -1939,7 +1939,7 @@ class Analyzer(
// Second, we group extractedWindowExprBuffer based on their Partition and Order Specs.
val groupedWindowExpressions = extractedWindowExprBuffer.groupBy { expr =>
val distinctWindowSpec = expr.collect {
case window: WindowExpression => window.windowSpec
case WindowExpression(_, spec) => spec
}.distinct

// We do a final check and see if we only have a single Window Spec defined in an
Expand Down Expand Up @@ -2101,18 +2101,14 @@ class Analyzer(
object ResolveWindowFrame extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case logical: LogicalPlan => logical transformExpressions {
case WindowExpression(wf: WindowFunction,
WindowSpecDefinition(_, _, f: SpecifiedWindowFrame))
if wf.frame != UnspecifiedFrame && wf.frame != f =>
case WindowExpression(wf: WindowFunction, WindowSpecDefinition(_, _, Some(f)))
if wf.frame != f =>
failAnalysis(s"Window Frame $f must match the required frame ${wf.frame}")
case WindowExpression(wf: WindowFunction,
s @ WindowSpecDefinition(_, o, UnspecifiedFrame))
if wf.frame != UnspecifiedFrame =>
WindowExpression(wf, s.copy(frameSpecification = wf.frame))
case we @ WindowExpression(e, s @ WindowSpecDefinition(_, o, UnspecifiedFrame))
if e.resolved =>
val frame = SpecifiedWindowFrame.defaultWindowFrame(o.nonEmpty, acceptWindowFrame = true)
we.copy(windowSpec = s.copy(frameSpecification = frame))
case WindowExpression(wf: WindowFunction, s @ WindowSpecDefinition(_, _, None)) =>
WindowExpression(wf, s.copy(frameSpecification = Option(wf.frame)))
case we @ WindowExpression(e, s @ WindowSpecDefinition(_, o, None)) if e.resolved =>
val frame = WindowFrame.defaultWindowFrame(o.nonEmpty, acceptWindowFrame = true)
we.copy(windowSpec = s.copy(frameSpecification = Option(frame)))
}
}
}
Expand All @@ -2123,12 +2119,14 @@ class Analyzer(
object ResolveWindowOrder extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case logical: LogicalPlan => logical transformExpressions {
case WindowExpression(wf: WindowFunction, spec) if spec.orderSpec.isEmpty =>
case WindowExpression(wf: WindowFunction, WindowSpecDefinition(_, orderSpec, _))
if orderSpec.isEmpty =>
failAnalysis(s"Window function $wf requires window to be ordered, please add ORDER BY " +
s"clause. For example SELECT $wf(value_expr) OVER (PARTITION BY window_partition " +
s"ORDER BY window_ordering) from table")
case WindowExpression(rank: RankLike, spec) if spec.resolved =>
val order = spec.orderSpec.map(_.child)
case WindowExpression(rank: RankLike, spec @ WindowSpecDefinition(_, orderSpec, _))
if spec.resolved =>
val order = orderSpec.map(_.child)
WindowExpression(rank.withOrder(order), spec)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.SimpleCatalogRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.UsingJoin
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -95,11 +93,8 @@ trait CheckAnalysis extends PredicateHelper {
case w @ WindowExpression(AggregateExpression(_, _, true, _), _) =>
failAnalysis(s"Distinct window functions are not supported: $w")

case w @ WindowExpression(_: OffsetWindowFunction, WindowSpecDefinition(_, order,
SpecifiedWindowFrame(frame,
FrameBoundary(l),
FrameBoundary(h))))
if order.isEmpty || frame != RowFrame || l != h =>
case w @ WindowExpression(_: OffsetWindowFunction, spec: WindowSpecDefinition)
if !spec.frameSpecification.exists(_.isOffset) =>
failAnalysis("An offset window function can only be evaluated in an ordered " +
s"row-based window frame with a single offset: $w")

Expand All @@ -108,15 +103,10 @@ trait CheckAnalysis extends PredicateHelper {
// function.
e match {
case _: AggregateExpression | _: OffsetWindowFunction | _: AggregateWindowFunction =>
w
case _ =>
failAnalysis(s"Expression '$e' not supported within a window function.")
}
// Make sure the window specification is valid.
s.validate match {
case Some(m) =>
failAnalysis(s"Window specification $s is not valid because $m")
case None => w
}

case s @ ScalarSubquery(query, conditions, _) =>
// If no correlation, the output must be exactly one column
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ object TypeCoercion {
PropagateTypes ::
ImplicitTypeCasts ::
DateTimeOperations ::
WindowFrameCoercion ::
Nil

// See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
Expand Down Expand Up @@ -726,4 +727,25 @@ object TypeCoercion {
Option(ret)
}
}

/**
 * Cast WindowFrame boundaries to the type they operate upon.
 *
 * The rule only fires for a RANGE frame whose window specification has exactly one ORDER BY
 * expression, and only once that ordering expression is resolved. Both frame boundaries are
 * then aligned with the data type of the ordering expression.
 */
object WindowFrameCoercion extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
    case spec @ WindowSpecDefinition(_, Seq(order), Some(WindowFrame(RangeFrame, lower, upper)))
        if order.resolved =>
      // The single ordering expression dictates the type both boundaries are compared against.
      val targetType = order.dataType
      val coercedFrame = WindowFrame(
        RangeFrame,
        createBoundaryCast(lower, targetType),
        createBoundaryCast(upper, targetType))
      spec.copy(frameSpecification = Option(coercedFrame))
  }

  /**
   * Wrap an expression boundary in a [[Cast]] to `dt` when its type differs from `dt` and the
   * cast is permitted. Anything else (a non-expression boundary, an expression that already has
   * the target type, or one that cannot be cast) is returned unchanged.
   */
  private def createBoundaryCast(boundary: AnyRef, dt: DataType): AnyRef = boundary match {
    case e: Expression =>
      val sourceType = e.dataType
      if (sourceType != dt && Cast.canCast(sourceType, dt)) Cast(e, dt) else e
    case other => other
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ package object dsl {
def windowSpec(
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder],
frame: WindowFrame): WindowSpecDefinition =
frame: Option[WindowFrame] = None): WindowSpecDefinition =
WindowSpecDefinition(partitionSpec, orderSpec, frame)

def windowExpr(windowFunc: Expression, windowSpec: WindowSpecDefinition): WindowExpression =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ package object expressions {
def initialize(partitionIndex: Int): Unit = {}
}

/**
 * A [[Projection]] that leaves its input untouched: applying it yields the very row that was
 * passed in.
 */
object IdentityProjection extends Projection {
  override def apply(row: InternalRow): InternalRow = {
    // Hand back the same instance that was supplied; nothing is evaluated or copied.
    row
  }
}

/**
* Converts a [[InternalRow]] to another Row given a sequence of expression that define each
* column of the new row. If the schema of the input row is specified, then the given expression
Expand Down
Loading

0 comments on commit 596f53c

Please sign in to comment.