Skip to content

Commit

Permalink
[SPARK-36091][SQL] Support TimestampNTZ type in expression TimeWindow
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
The current implement of `TimeWindow` only supports `TimestampType`. Spark added a new type `TimestampNTZType`, so we should support `TimestampNTZType` in expression `TimeWindow`.

### Why are the changes needed?
 `TimestampNTZType` similar to `TimestampType`, we should support `TimestampNTZType` in expression `TimeWindow`.

### Does this PR introduce _any_ user-facing change?
'Yes'.
`TimeWindow` will accepts `TimestampNTZType`.

### How was this patch tested?
New tests.

Closes #33341 from beliefer/SPARK-36091.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
  • Loading branch information
2 people authored and gengliangwang committed Jul 19, 2021
1 parent 2f42afc commit 7aa0179
Show file tree
Hide file tree
Showing 4 changed files with 321 additions and 168 deletions.
Expand Up @@ -3874,9 +3874,9 @@ object TimeWindowing extends Rule[LogicalPlan] {
case _ => Metadata.empty
}

def getWindow(i: Int, overlappingWindows: Int): Expression = {
def getWindow(i: Int, overlappingWindows: Int, dataType: DataType): Expression = {
val division = (PreciseTimestampConversion(
window.timeColumn, TimestampType, LongType) - window.startTime) / window.slideDuration
window.timeColumn, dataType, LongType) - window.startTime) / window.slideDuration
val ceil = Ceil(division)
// if the division is equal to the ceiling, our record is the start of a window
val windowId = CaseWhen(Seq((ceil === division, ceil + 1)), Some(ceil))
Expand All @@ -3886,17 +3886,17 @@ object TimeWindowing extends Rule[LogicalPlan] {

CreateNamedStruct(
Literal(WINDOW_START) ::
PreciseTimestampConversion(windowStart, LongType, TimestampType) ::
PreciseTimestampConversion(windowStart, LongType, dataType) ::
Literal(WINDOW_END) ::
PreciseTimestampConversion(windowEnd, LongType, TimestampType) ::
PreciseTimestampConversion(windowEnd, LongType, dataType) ::
Nil)
}

val windowAttr = AttributeReference(
WINDOW_COL_NAME, window.dataType, metadata = metadata)()

if (window.windowDuration == window.slideDuration) {
val windowStruct = Alias(getWindow(0, 1), WINDOW_COL_NAME)(
val windowStruct = Alias(getWindow(0, 1, window.timeColumn.dataType), WINDOW_COL_NAME)(
exprId = windowAttr.exprId, explicitMetadata = Some(metadata))

val replacedPlan = p transformExpressions {
Expand All @@ -3913,7 +3913,8 @@ object TimeWindowing extends Rule[LogicalPlan] {
val overlappingWindows =
math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
val windows =
Seq.tabulate(overlappingWindows)(i => getWindow(i, overlappingWindows))
Seq.tabulate(overlappingWindows)(i =>
getWindow(i, overlappingWindows, window.timeColumn.dataType))

val projections = windows.map(_ +: child.output)

Expand Down
Expand Up @@ -60,10 +60,10 @@ case class TimeWindow(
}

override def child: Expression = timeColumn
override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)
override def inputTypes: Seq[AbstractDataType] = Seq(AnyTimestampType)
override def dataType: DataType = new StructType()
.add(StructField("start", TimestampType))
.add(StructField("end", TimestampType))
.add(StructField("start", child.dataType))
.add(StructField("end", child.dataType))
override def prettyName: String = "window"
final override val nodePatterns: Seq[TreePattern] = Seq(TIME_WINDOW)

Expand Down
Expand Up @@ -21,7 +21,7 @@ import org.scalatest.PrivateMethodTester

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampNTZType, TimestampType}

class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper with PrivateMethodTester {

Expand Down Expand Up @@ -133,4 +133,18 @@ class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper with Priva
assert(applyValue == constructed)
}
}

test("SPARK-36091: Support TimestampNTZ type in expression TimeWindow") {
val timestampWindow =
TimeWindow(Literal(10L, TimestampType), "10 seconds", "10 seconds", "0 seconds")
assert(timestampWindow.child.dataType == TimestampType)
assert(timestampWindow.dataType == StructType(
Seq(StructField("start", TimestampType), StructField("end", TimestampType))))

val timestampNTZWindow =
TimeWindow(Literal(10L, TimestampNTZType), "10 seconds", "10 seconds", "0 seconds")
assert(timestampNTZWindow.child.dataType == TimestampNTZType)
assert(timestampNTZWindow.dataType == StructType(
Seq(StructField("start", TimestampNTZType), StructField("end", TimestampNTZType))))
}
}

0 comments on commit 7aa0179

Please sign in to comment.