Skip to content
Permalink
Browse files

[SPARK-24168][SQL] WindowExec should not access SQLConf at executor side

## What changes were proposed in this pull request?

This PR is extracted from #21190 , to make it easier to backport.

`WindowExec#createBoundOrdering` is called on executor side, so we can't use `conf.sessionLocalTimezone` there.

## How was this patch tested?

tested in #21190

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21225 from cloud-fan/minor3.

(cherry picked from commit e646ae6)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
  • Loading branch information...
cloud-fan authored and gatorsmile committed May 4, 2018
1 parent 8509284 commit d35eb2f9b0af1a625749ca8b7f12d8eceed28766
Showing with 9 additions and 6 deletions.
  1. +9 −6 sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -112,9 +112,11 @@ case class WindowExec(
*
* @param frame to evaluate. This can either be a Row or Range frame.
* @param bound with respect to the row.
* @param timeZone the session local timezone for time related calculations.
* @return a bound ordering object.
*/
private[this] def createBoundOrdering(frame: FrameType, bound: Expression): BoundOrdering = {
private[this] def createBoundOrdering(
frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = {
(frame, bound) match {
case (RowFrame, CurrentRow) =>
RowBoundOrdering(0)
@@ -144,7 +146,7 @@ case class WindowExec(
val boundExpr = (expr.dataType, boundOffset.dataType) match {
case (DateType, IntegerType) => DateAdd(expr, boundOffset)
case (TimestampType, CalendarIntervalType) =>
TimeAdd(expr, boundOffset, Some(conf.sessionLocalTimeZone))
TimeAdd(expr, boundOffset, Some(timeZone))
case (a, b) if a== b => Add(expr, boundOffset)
}
val bound = newMutableProjection(boundExpr :: Nil, child.output)
@@ -197,6 +199,7 @@ case class WindowExec(

// Map the groups to a (unbound) expression and frame factory pair.
var numExpressions = 0
val timeZone = conf.sessionLocalTimeZone
framedFunctions.toSeq.map {
case (key, (expressions, functionSeq)) =>
val ordinal = numExpressions
@@ -237,7 +240,7 @@ case class WindowExec(
new UnboundedPrecedingWindowFunctionFrame(
target,
processor,
createBoundOrdering(frameType, upper))
createBoundOrdering(frameType, upper, timeZone))
}

// Shrinking Frame.
@@ -246,7 +249,7 @@ case class WindowExec(
new UnboundedFollowingWindowFunctionFrame(
target,
processor,
createBoundOrdering(frameType, lower))
createBoundOrdering(frameType, lower, timeZone))
}

// Moving Frame.
@@ -255,8 +258,8 @@ case class WindowExec(
new SlidingWindowFunctionFrame(
target,
processor,
createBoundOrdering(frameType, lower),
createBoundOrdering(frameType, upper))
createBoundOrdering(frameType, lower, timeZone),
createBoundOrdering(frameType, upper, timeZone))
}
}

0 comments on commit d35eb2f

Please sign in to comment.
You can’t perform that action at this time.