Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ object TimeWindowResolution {
final val WINDOW_COL_NAME = "window"
final val SESSION_COL_NAME = "session_window"

private final val WINDOW_START = "start"
private final val WINDOW_END = "end"
private final val SESSION_START = "start"
private final val SESSION_END = "end"
final val WINDOW_START = "start"
final val WINDOW_END = "end"
final val SESSION_START = "start"
final val SESSION_END = "end"

/**
* Synthesizes the [[Project]]/[[Expand]]+[[Filter]] sub-plan for a resolved [[TimeWindow]] and
Expand Down Expand Up @@ -90,8 +90,7 @@ object TimeWindowResolution {
Project(windowStruct +: child.output,
Filter(filterExpr, child))
} else {
val overlappingWindows =
math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
val overlappingWindows = overlappingWindowCount(window)
val windows =
Seq.tabulate(overlappingWindows)(i =>
getWindow(i, window.timeColumn.dataType))
Expand Down Expand Up @@ -165,12 +164,7 @@ object TimeWindowResolution {
val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)(
exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata))

val filterByTimeRange = if (gapDuration.foldable) {
val interval = gapDuration.eval().asInstanceOf[CalendarInterval]
interval == null || interval.months + interval.days + interval.microseconds <= 0
} else {
true
}
val filterByTimeRange = sessionFilterByTimeRange(gapDuration)

// As same as tumbling window, we add a filter to filter out nulls.
// And we also filter out events with negative or zero or invalid gap duration.
Expand Down Expand Up @@ -222,19 +216,7 @@ object TimeWindowResolution {

val attr = AttributeReference(colName, windowTime.dataType, metadata = newMetadata)()

// NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as
// it is, it is going to be bound to the different window even if we apply the same window
// spec. Decrease 1 microsecond from window.end to let the window_time be bound to the
// correct window range.
val subtractExpr =
PreciseTimestampConversion(
Subtract(PreciseTimestampConversion(
GetStructField(windowTime.windowColumn, 1),
windowTime.dataType, LongType), Literal(1L)),
LongType,
windowTime.dataType)

val newColumn = Alias(subtractExpr, colName)(
val newColumn = Alias(windowTimeExtractionExpression(windowTime), colName)(
exprId = attr.exprId, explicitMetadata = Some(newMetadata))

windowTime -> (attr, newColumn)
Expand All @@ -250,4 +232,36 @@ object TimeWindowResolution {

(windowTimeToAttr, newChild)
}

/**
* Builds the expression extracting a [[WindowTime]]'s timestamp: the last microsecond of the
* source window (`window.end - 1us`). `window.end` is the exclusive upper bound, so using it
* as-is would bind the result to the next window under the same window spec.
*/
def windowTimeExtractionExpression(windowTime: WindowTime): Expression =
PreciseTimestampConversion(
Subtract(
PreciseTimestampConversion(
GetStructField(windowTime.windowColumn, 1),
windowTime.dataType,
LongType),
Literal(1L)),
LongType,
windowTime.dataType)

/** Number of overlapping sliding windows a single row can fall into. */
def overlappingWindowCount(window: TimeWindow): Int =
math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt

/**
* Whether the session-window filter must also drop empty windows (`end <= start`): true when the
* gap is non-foldable (unknown at plan time) or a foldable gap is null or non-positive.
*/
def sessionFilterByTimeRange(gapDuration: Expression): Boolean =
if (gapDuration.foldable) {
val interval = gapDuration.eval().asInstanceOf[CalendarInterval]
interval == null || interval.months + interval.days + interval.microseconds <= 0
} else {
true
}
}