Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,19 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, LessThan, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime}
import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.expressions.{SessionWindow, TimeWindow, WindowTime}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TIME_WINDOW, WINDOW_TIME}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.{CalendarIntervalType, DataType, LongType, Metadata, MetadataBuilder, StructType}
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.sql.types.StructType

/**
* Maps a time column to multiple time windows using the Expand operator. Since it's non-trivial to
* figure out how many windows a time column can map to, we over-estimate the number of windows and
* filter out the rows where the time column is not inside the time window.
*/
object TimeWindowing extends Rule[LogicalPlan] {
import org.apache.spark.sql.catalyst.dsl.expressions._

private final val WINDOW_COL_NAME = "window"
private final val WINDOW_START = "start"
private final val WINDOW_END = "end"

/**
* Generates the logical plan for generating window ranges on a timestamp column. Without
Expand Down Expand Up @@ -93,80 +85,13 @@ object TimeWindowing extends Rule[LogicalPlan] {
case t: TimeWindow => t.copy(timeColumn = WindowTime(window.timeColumn))
}
} else {
val metadata = window.timeColumn match {
case a: Attribute => a.metadata
case _ => Metadata.empty
}

val newMetadata = new MetadataBuilder()
.withMetadata(metadata)
.putBoolean(TimeWindow.marker, true)
.build()

def getWindow(i: Int, dataType: DataType): Expression = {
val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType)
val remainder = (timestamp - window.startTime) % window.slideDuration
val lastStart = timestamp - CaseWhen(Seq((LessThan(remainder, 0),
remainder + window.slideDuration)), Some(remainder))
val windowStart = lastStart - i * window.slideDuration
val windowEnd = windowStart + window.windowDuration

// We make sure value fields are nullable since the dataType of TimeWindow defines them
// as nullable.
CreateNamedStruct(
Literal(WINDOW_START) ::
PreciseTimestampConversion(windowStart, LongType, dataType).castNullable() ::
Literal(WINDOW_END) ::
PreciseTimestampConversion(windowEnd, LongType, dataType).castNullable() ::
Nil)
}

val windowAttr = AttributeReference(
WINDOW_COL_NAME, window.dataType, metadata = newMetadata)()

if (window.windowDuration == window.slideDuration) {
val windowStruct = Alias(getWindow(0, window.timeColumn.dataType), WINDOW_COL_NAME)(
exprId = windowAttr.exprId, explicitMetadata = Some(newMetadata))

val replacedPlan = p transformExpressions {
case t: TimeWindow => windowAttr
}

// For backwards compatibility we add a filter to filter out nulls
val filterExpr = IsNotNull(window.timeColumn)
val (windowAttr, newChild) =
TimeWindowResolution.buildTimeWindowRewrite(window, child)

replacedPlan.withNewChildren(
Project(windowStruct +: child.output,
Filter(filterExpr, child)) :: Nil)
} else {
val overlappingWindows =
math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
val windows =
Seq.tabulate(overlappingWindows)(i =>
getWindow(i, window.timeColumn.dataType))

val projections = windows.map(_ +: child.output)

// When the condition windowDuration % slideDuration == 0 is fulfilled,
// the estimated number of windows is exact,
// which means all produced windows are valid.
val filterExpr =
if (window.windowDuration % window.slideDuration == 0) {
IsNotNull(window.timeColumn)
} else {
window.timeColumn >= windowAttr.getField(WINDOW_START) &&
window.timeColumn < windowAttr.getField(WINDOW_END)
}

val substitutedPlan = Filter(filterExpr,
Expand(projections, windowAttr +: child.output, child))

val renamedPlan = p transformExpressions {
case t: TimeWindow => windowAttr
}

renamedPlan.withNewChildren(substitutedPlan :: Nil)
val replacedPlan = p.transformExpressions {
case _: TimeWindow => windowAttr
}
replacedPlan.withNewChildren(newChild :: Nil)
}
} else if (numWindowExpr > 1) {
throw QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)
Expand All @@ -178,11 +103,6 @@ object TimeWindowing extends Rule[LogicalPlan] {

/** Maps a time column to a session window. */
object SessionWindowing extends Rule[LogicalPlan] {
import org.apache.spark.sql.catalyst.dsl.expressions._

private final val SESSION_COL_NAME = "session_window"
private final val SESSION_START = "start"
private final val SESSION_END = "end"

/**
* Generates the logical plan for generating session window on a timestamp column.
Expand Down Expand Up @@ -211,73 +131,17 @@ object SessionWindowing extends Rule[LogicalPlan] {
val session = sessionExpressions.head

if (StructType.acceptsType(session.timeColumn.dataType)) {
p transformExpressions {
p.transformExpressions {
case t: SessionWindow => t.copy(timeColumn = WindowTime(session.timeColumn))
}
} else {
val metadata = session.timeColumn match {
case a: Attribute => a.metadata
case _ => Metadata.empty
}

val newMetadata = new MetadataBuilder()
.withMetadata(metadata)
.putBoolean(SessionWindow.marker, true)
.build()

val sessionAttr = AttributeReference(
SESSION_COL_NAME, session.dataType, metadata = newMetadata)()
val (sessionAttr, newChild) =
TimeWindowResolution.buildSessionWindowRewrite(session, child)

val sessionStart =
PreciseTimestampConversion(session.timeColumn, session.timeColumn.dataType, LongType)
val gapDuration = session.gapDuration match {
case expr if expr.dataType == CalendarIntervalType =>
expr
case expr if Cast.canCast(expr.dataType, CalendarIntervalType) =>
Cast(expr, CalendarIntervalType)
case other =>
throw QueryCompilationErrors.sessionWindowGapDurationDataTypeError(other.dataType)
val replacedPlan = p.transformExpressions {
case _: SessionWindow => sessionAttr
}
val sessionEnd = PreciseTimestampConversion(session.timeColumn + gapDuration,
session.timeColumn.dataType, LongType)

// We make sure value fields are nullable since the dataType of SessionWindow defines them
// as nullable.
val literalSessionStruct = CreateNamedStruct(
Literal(SESSION_START) ::
PreciseTimestampConversion(sessionStart, LongType, session.timeColumn.dataType)
.castNullable() ::
Literal(SESSION_END) ::
PreciseTimestampConversion(sessionEnd, LongType, session.timeColumn.dataType)
.castNullable() ::
Nil)

val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)(
exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata))

val replacedPlan = p transformExpressions {
case s: SessionWindow => sessionAttr
}

val filterByTimeRange = if (gapDuration.foldable) {
val interval = gapDuration.eval().asInstanceOf[CalendarInterval]
interval == null || interval.months + interval.days + interval.microseconds <= 0
} else {
true
}

// Same as for a tumbling window, we add a filter to filter out nulls.
// We also filter out events with a negative, zero, or invalid gap duration.
val filterExpr = if (filterByTimeRange) {
IsNotNull(session.timeColumn) &&
(sessionAttr.getField(SESSION_END) > sessionAttr.getField(SESSION_START))
} else {
IsNotNull(session.timeColumn)
}

replacedPlan.withNewChildren(
Filter(filterExpr,
Project(sessionStruct +: child.output, child)) :: Nil)
replacedPlan.withNewChildren(newChild :: Nil)
}
} else if (numWindowExpr > 1) {
throw QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)
Expand All @@ -292,7 +156,7 @@ object SessionWindowing extends Rule[LogicalPlan] {
* window column generated as the output of the window aggregating operators. The
* window column is of type struct { start: TimestampType, end: TimestampType }.
* The correct representative event time of a window is ``window.end - 1``.
* */
*/
object ResolveWindowTime extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
_.containsPattern(WINDOW_TIME), ruleId) {
Expand All @@ -306,56 +170,13 @@ object ResolveWindowTime extends Rule[LogicalPlan] {
}

if (windowTimeExpressions.nonEmpty && allWindowTimeExprsResolved) {
val windowTimeToAttrAndNewColumn = windowTimeExpressions.map { windowTime =>
val metadata = windowTime.windowColumn match {
case a: Attribute => a.metadata
case _ => Metadata.empty
}

if (!metadata.contains(TimeWindow.marker) &&
!metadata.contains(SessionWindow.marker)) {
throw new ExtendedAnalysisException(
new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3101",
messageParameters = Map("windowTime" -> windowTime.toString)),
plan = p)
}

val newMetadata = new MetadataBuilder()
.withMetadata(metadata)
.remove(TimeWindow.marker)
.remove(SessionWindow.marker)
.build()

val colName = windowTime.sql
val (windowTimeToAttr, newChild) =
TimeWindowResolution.buildWindowTimeRewrite(windowTimeExpressions, child, p)

val attr = AttributeReference(colName, windowTime.dataType, metadata = newMetadata)()

// NOTE: "window.end" is the "exclusive" upper bound of the window, so if we used this value
// as is, it would be bound to a different window even if we applied the same window
// spec. Subtract 1 microsecond from window.end so that window_time is bound to the
// correct window range.
val subtractExpr =
PreciseTimestampConversion(
Subtract(PreciseTimestampConversion(
GetStructField(windowTime.windowColumn, 1),
windowTime.dataType, LongType), Literal(1L)),
LongType,
windowTime.dataType)

val newColumn = Alias(subtractExpr, colName)(
exprId = attr.exprId, explicitMetadata = Some(newMetadata))

windowTime -> (attr, newColumn)
}.toMap

val replacedPlan = p transformExpressions {
case w: WindowTime => windowTimeToAttrAndNewColumn(w)._1
val replacedPlan = p.transformExpressions {
case w: WindowTime => windowTimeToAttr(w)
}

val newColumnsToAdd = windowTimeToAttrAndNewColumn.values.map(_._2)
replacedPlan.withNewChildren(
Project(newColumnsToAdd ++: child.output, child) :: Nil)
replacedPlan.withNewChildren(newChild :: Nil)
} else {
p // Return unchanged. Analyzer will throw exception later
}
Expand Down
Loading