Skip to content

Commit

Permalink
[SPARK-19451][SQL] rangeBetween method should accept Long value as bo…
Browse files Browse the repository at this point in the history
…undary

## What changes were proposed in this pull request?

Long values can be passed to `rangeBetween` as range frame boundaries, but we silently convert it to Int values, this can cause wrong results and we should fix this.

Further more, we should accept any legal literal values as range frame boundaries. In this PR, we make it possible for Long values, and make accepting other DataTypes really easy to add.

This PR is mostly based on Herman's previous amazing work: hvanhovell@596f53c

After this been merged, we can close apache#16818 .

## How was this patch tested?

Add new tests in `DataFrameWindowFunctionsSuite` and `TypeCoercionSuite`.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes apache#18540 from jiangxb1987/rangeFrame.

(cherry picked from commit 92d8563)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
  • Loading branch information
jiangxb1987 authored and gatorsmile committed Jul 29, 2017
1 parent 24a9bac commit 66fa6bd
Show file tree
Hide file tree
Showing 17 changed files with 533 additions and 304 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,9 @@ trait CheckAnalysis extends PredicateHelper {
case w @ WindowExpression(AggregateExpression(_, _, true, _), _) =>
failAnalysis(s"Distinct window functions are not supported: $w")

case w @ WindowExpression(_: OffsetWindowFunction, WindowSpecDefinition(_, order,
SpecifiedWindowFrame(frame,
FrameBoundary(l),
FrameBoundary(h))))
if order.isEmpty || frame != RowFrame || l != h =>
case w @ WindowExpression(_: OffsetWindowFunction,
WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame))
if order.isEmpty || !frame.isOffset =>
failAnalysis("An offset window function can only be evaluated in an ordered " +
s"row-based window frame with a single offset: $w")

Expand All @@ -119,15 +117,10 @@ trait CheckAnalysis extends PredicateHelper {
// function.
e match {
case _: AggregateExpression | _: OffsetWindowFunction | _: AggregateWindowFunction =>
w
case _ =>
failAnalysis(s"Expression '$e' not supported within a window function.")
}
// Make sure the window specification is valid.
s.validate match {
case Some(m) =>
failAnalysis(s"Window specification $s is not valid because $m")
case None => w
}

case s @ ScalarSubquery(query, conditions, _) =>
checkAnalysis(query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ object TypeCoercion {
PropagateTypes ::
ImplicitTypeCasts ::
DateTimeOperations ::
WindowFrameCoercion ::
Nil

// See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
Expand Down Expand Up @@ -785,4 +786,26 @@ object TypeCoercion {
Option(ret)
}
}

/**
* Cast WindowFrame boundaries to the type they operate upon.
*/
object WindowFrameCoercion extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
case s @ WindowSpecDefinition(_, Seq(order), SpecifiedWindowFrame(RangeFrame, lower, upper))
if order.resolved =>
s.copy(frameSpecification = SpecifiedWindowFrame(
RangeFrame,
createBoundaryCast(lower, order.dataType),
createBoundaryCast(upper, order.dataType)))
}

private def createBoundaryCast(boundary: Expression, dt: DataType): Expression = {
boundary match {
case e: SpecialFrameBoundary => e
case e: Expression if e.dataType != dt && Cast.canCast(e.dataType, dt) => Cast(e, dt)
case _ => boundary
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ package object expressions {
def initialize(partitionIndex: Int): Unit = {}
}

/**
* An identity projection. This returns the input row.
*/
object IdentityProjection extends Projection {
override def apply(row: InternalRow): InternalRow = row
}

/**
* Converts a [[InternalRow]] to another Row given a sequence of expression that define each
* column of the new row. If the schema of the input row is specified, then the given expression
Expand Down
Loading

0 comments on commit 66fa6bd

Please sign in to comment.