From c3d349bdb6625e5e75768f5e7a0079a5f666c528 Mon Sep 17 00:00:00 2001 From: Vladimir Golubev Date: Mon, 1 Jun 2026 10:59:25 +0000 Subject: [PATCH] [SPARK-57193][SQL] Refactor some helpers out of TimeWindowResolution --- .../analysis/TimeWindowResolution.scala | 64 +++++++++++-------- 1 file changed, 39 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala index 8dbe6ed44d1cd..63ff894c48d4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala @@ -35,10 +35,10 @@ object TimeWindowResolution { final val WINDOW_COL_NAME = "window" final val SESSION_COL_NAME = "session_window" - private final val WINDOW_START = "start" - private final val WINDOW_END = "end" - private final val SESSION_START = "start" - private final val SESSION_END = "end" + final val WINDOW_START = "start" + final val WINDOW_END = "end" + final val SESSION_START = "start" + final val SESSION_END = "end" /** * Synthesizes the [[Project]]/[[Expand]]+[[Filter]] sub-plan for a resolved [[TimeWindow]] and @@ -90,8 +90,7 @@ object TimeWindowResolution { Project(windowStruct +: child.output, Filter(filterExpr, child)) } else { - val overlappingWindows = - math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt + val overlappingWindows = overlappingWindowCount(window) val windows = Seq.tabulate(overlappingWindows)(i => getWindow(i, window.timeColumn.dataType)) @@ -165,12 +164,7 @@ object TimeWindowResolution { val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)( exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata)) - val filterByTimeRange = if (gapDuration.foldable) { - val interval = gapDuration.eval().asInstanceOf[CalendarInterval] - 
interval == null || interval.months + interval.days + interval.microseconds <= 0 - } else { - true - } + val filterByTimeRange = sessionFilterByTimeRange(gapDuration) // As same as tumbling window, we add a filter to filter out nulls. // And we also filter out events with negative or zero or invalid gap duration. @@ -222,19 +216,7 @@ object TimeWindowResolution { val attr = AttributeReference(colName, windowTime.dataType, metadata = newMetadata)() - // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as - // it is, it is going to be bound to the different window even if we apply the same window - // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the - // correct window range. - val subtractExpr = - PreciseTimestampConversion( - Subtract(PreciseTimestampConversion( - GetStructField(windowTime.windowColumn, 1), - windowTime.dataType, LongType), Literal(1L)), - LongType, - windowTime.dataType) - - val newColumn = Alias(subtractExpr, colName)( + val newColumn = Alias(windowTimeExtractionExpression(windowTime), colName)( exprId = attr.exprId, explicitMetadata = Some(newMetadata)) windowTime -> (attr, newColumn) @@ -250,4 +232,36 @@ object TimeWindowResolution { (windowTimeToAttr, newChild) } + + /** + * Builds the expression extracting a [[WindowTime]]'s timestamp: the last microsecond of the + * source window (`window.end - 1us`). `window.end` (struct field 1) is the exclusive upper + * bound, so using it as-is would bind the result to the next window under the same window spec. + */ + def windowTimeExtractionExpression(windowTime: WindowTime): Expression = + PreciseTimestampConversion( + Subtract( + PreciseTimestampConversion( + GetStructField(windowTime.windowColumn, 1), + windowTime.dataType, + LongType), + Literal(1L)), + LongType, + windowTime.dataType) + + /** Number of overlapping sliding windows a single row can fall into. 
*/ + def overlappingWindowCount(window: TimeWindow): Int = + math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt + + /** + * Whether the session-window filter must also drop empty windows (`end <= start`): true when the + * gap is non-foldable (unknown at plan time), null, or has `months + days + microseconds <= 0`. + */ + def sessionFilterByTimeRange(gapDuration: Expression): Boolean = + if (gapDuration.foldable) { + val interval = gapDuration.eval().asInstanceOf[CalendarInterval] + interval == null || interval.months + interval.days + interval.microseconds <= 0 + } else { + true + } }