From bb020e69ca5ad16a912adaff6ecd617380df3458 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Sat, 27 Jun 2015 11:39:53 -0400 Subject: [PATCH 1/9] Major overhaul of Window operator. --- .../expressions/windowExpressions.scala | 12 + .../apache/spark/sql/execution/Window.scala | 1016 ++++++++++------- .../sql/hive/HiveDataFrameWindowSuite.scala | 2 +- 3 files changed, 626 insertions(+), 404 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala index c8aa571df64f..a2e376b964b7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala @@ -335,3 +335,15 @@ case class WindowExpression( override def toString: String = s"$windowFunction $windowSpec" } + +/** + * Extractor for making working with frame boundaries easier. + */ +object FrameBoundaryExtractor { + def unapply(boundary: FrameBoundary): Option[Int] = boundary match { + case CurrentRow => Some(0) + case ValuePreceding(offset) => Some(-offset) + case ValueFollowing(offset) => Some(offset) + case _ => None + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index 6e127e548a12..5c09f72ea031 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -19,18 +19,48 @@ package org.apache.spark.sql.execution import java.util -import org.apache.spark.rdd.RDD +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.types.IntegerType +import org.apache.spark.rdd.RDD import org.apache.spark.util.collection.CompactBuffer +import scala.collection.mutable /** * :: DeveloperApi :: - * For every row, evaluates `windowExpression` containing Window Functions and attaches - * the results with other regular expressions (presented by `projectList`). - * Evert operator handles a single Window Specification, `windowSpec`. + * This class calculates and outputs (windowed) aggregates over the rows in a single (sorted) + * partition. The aggregates are calculated for each row in the group. Special processing + * instructions, frames, are used to calculate these aggregates. Frames are processed in the order + * specified in the window specification (the ORDER BY ... clause). There are four different frame + * types: + * - Entire partition: The frame is the entire partition, i.e. + * UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING. For this case, window function will take all + * rows as inputs and be evaluated once. + * - Growing frame: We only add new rows into the frame, i.e. UNBOUNDED PRECEDING AND .... + * Every time we move to a new row to process, we add some rows to the frame. We do not remove + * rows from this frame. + * - Shrinking frame: We only remove rows from the frame, i.e. ... AND UNBOUNDED FOLLOWING. + * Every time we move to a new row to process, we remove some rows from the frame. We do not add + * rows to this frame. + * - Moving frame: Every time we move to a new row to process, we remove some rows from the frame + * and we add some rows to the frame. Examples are: + * 1 PRECEDING AND CURRENT ROW and 1 FOLLOWING AND 2 FOLLOWING. + * + * Different frame boundaries can be used in Growing, Shrinking and Moving frames. A frame + * boundary can be either Row or Range based. + * + * This is quite an expensive operator because every row for a single group must be in the same + * partition and partitions must be sorted according to the grouping and sort order. This can be + * infeasible in some extreme cases. The operator does not repartition or sort itself, but requires + * the planner to this. + * + * The current implementation is semi-blocking. The aggregates and final projection are calculated + * one group at a time, this is possible due to the aforementioned partitioning and ordering + * constraints. */ +@DeveloperApi case class Window( projectList: Seq[Attribute], windowExpression: Seq[NamedExpression], @@ -38,443 +68,623 @@ case class Window( child: SparkPlan) extends UnaryNode { - override def output: Seq[Attribute] = - (projectList ++ windowExpression).map(_.toAttribute) + override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute) - override def requiredChildDistribution: Seq[Distribution] = + override def requiredChildDistribution: Seq[Distribution] = { if (windowSpec.partitionSpec.isEmpty) { - // This operator will be very expensive. + // Only show warning when the number of bytes is larger than 100 MB? + logWarning("No Partition Defined for Window operation! Moving all data to a single " + + "partition, this can cause serious performance degradation.") AllTuples :: Nil - } else { - ClusteredDistribution(windowSpec.partitionSpec) :: Nil - } - - // Since window functions are adding columns to the input rows, the child's outputPartitioning - // is preserved. - override def outputPartitioning: Partitioning = child.outputPartitioning - - override def requiredChildOrdering: Seq[Seq[SortOrder]] = { - // The required child ordering has two parts. - // The first part is the expressions in the partition specification. - // We add these expressions to the required ordering to make sure input rows are grouped - // based on the partition specification. So, we only need to process a single partition - // at a time. - // The second part is the expressions specified in the ORDER BY cluase. - // Basically, we first use sort to group rows based on partition specifications and then sort - // Rows in a group based on the order specification. - (windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil + } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil } - // Since window functions basically add columns to input rows, this operator - // will not change the ordering of input rows. - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def requiredChildOrdering: Seq[Seq[SortOrder]] = + Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) - case class ComputedWindow( - unbound: WindowExpression, - windowFunction: WindowFunction, - resultAttribute: AttributeReference) - - // A list of window functions that need to be computed for each group. - private[this] val computedWindowExpressions = windowExpression.flatMap { window => - window.collect { - case w: WindowExpression => - ComputedWindow( - w, - BindReferences.bindReference(w.windowFunction, child.output), - AttributeReference(s"windowResult:$w", w.dataType, w.nullable)()) - } - }.toArray - - private[this] val windowFrame = - windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame] + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - // Create window functions. - private[this] def windowFunctions(): Array[WindowFunction] = { - val functions = new Array[WindowFunction](computedWindowExpressions.length) - var i = 0 - while (i < computedWindowExpressions.length) { - functions(i) = computedWindowExpressions(i).windowFunction.newInstance() - functions(i).init() - i += 1 - } - functions + /** + * Create a bound ordering object for a given frame type and offset. A bound ordering object is + * used to determine which input row lies within the frame boundaries of an output row. + * + * @param frameType to evaluate. A row frame will + * @param offset with respect to the row. + * @return a bound ordering object. + */ + private[this] def createBoundOrdering(frameType: FrameType, offset: Int) = frameType match { + case RangeFrame => + // Use the entire order expression when the offset is 0. + val (exprs, current, bound) = if (offset == 0) { + val exprs = windowSpec.orderSpec.map(_.child) + val projection = newMutableProjection(exprs, child.output) + (windowSpec.orderSpec, projection(), projection()) + } + // Use only the first order expression when the offset is non-null. + else { + val sortExpr = windowSpec.orderSpec.head + val expr = sortExpr.child + val boundExpr = Add(expr, Cast(Literal.create(offset, IntegerType), expr.dataType)) + val current = newMutableProjection(expr :: Nil, child.output)() + val bound = newMutableProjection(boundExpr :: Nil, child.output)() + (sortExpr :: Nil, current, bound) + } + // Construct the ordering. + val (sortExprs, schema) = exprs.zipWithIndex.map { case (e, i) => + val ref = AttributeReference(s"c_$i", e.dataType, e.nullable)() + (SortOrder(ref, e.direction), ref) + }.unzip + val ordering = newOrdering(sortExprs, schema) + RangeBoundOrdering(ordering, current, bound) + case RowFrame => RowBoundOrdering(offset) } - // The schema of the result of all window function evaluations - private[this] val computedSchema = computedWindowExpressions.map(_.resultAttribute) + @transient + private[this] lazy val (factories, projection, numColumns) = { + // Collect all window expressions + val windowExprs = windowExpression.flatMap { + _.collect { + case e: WindowExpression => e + } + } - private[this] val computedResultMap = - computedWindowExpressions.map { w => w.unbound -> w.resultAttribute }.toMap + // Group the window expression by their processing frame. + val groupedWindowExprs = windowExprs.groupBy(_.windowSpec.frameSpecification) + + // Create factories and collect unbound expressions for each frame. + val factories = mutable.Buffer.empty[WindowFunctionFrame] + val unboundExpressions = mutable.Buffer.empty[Expression] + groupedWindowExprs.foreach { + case (frame, unboundFrameExpressions) => + // Register the ordinal. + val ordinal = unboundExpressions.size + + // Track the unbound expressions + unboundExpressions ++= unboundFrameExpressions + + // Bind the expressions. + val functions = unboundFrameExpressions.map { e => + BindReferences.bindReference(e.windowFunction, child.output) + }.toArray + + // Create the frame processor. + val factory = frame match { + // Growing Frame. + case SpecifiedWindowFrame(frameType, UnboundedPreceding, FrameBoundaryExtractor(high)) => + val uBoundOrdering = createBoundOrdering(frameType, high) + new UnboundedPrecedingWindowFunctionFrame(ordinal, functions, uBoundOrdering) + // Shrinking Frame. + case SpecifiedWindowFrame(frameType, FrameBoundaryExtractor(low), UnboundedFollowing) => + val lBoundOrdering = createBoundOrdering(frameType, low) + new UnboundedFollowingWindowFunctionFrame(ordinal, functions, lBoundOrdering) + // Moving Frame. + case SpecifiedWindowFrame(frameType, + FrameBoundaryExtractor(low), FrameBoundaryExtractor(high)) => + val lBoundOrdering = createBoundOrdering(frameType, low) + val uBoundOrdering = createBoundOrdering(frameType, high) + new SlidingWindowFunctionFrame(ordinal, functions, lBoundOrdering, uBoundOrdering) + // Entire Partition Frame. + case SpecifiedWindowFrame(_, UnboundedPreceding, UnboundedFollowing) => + new UnboundedWindowFunctionFrame(ordinal, functions) + // Error + case fr => + sys.error(s"Unsupported Frame $fr for expressions: $unboundFrameExpressions") + } + factories += factory + } - private[this] val windowExpressionResult = windowExpression.map { window => - window.transform { - case w: WindowExpression if computedResultMap.contains(w) => computedResultMap(w) + // Create the schema projection. + val unboundToAttr = unboundExpressions.map { + e => (e, AttributeReference(s"aggResult:$e", e.dataType, e.nullable)()) } + val unboundToAttrMap = unboundToAttr.toMap + val patchedWindowExpression = windowExpression.map(_.transform(unboundToAttrMap)) + val projection = newMutableProjection( + projectList ++ patchedWindowExpression, + child.output ++ unboundToAttr.map(_._2)) + + // Done + (factories.toArray, projection, unboundExpressions.size) } protected override def doExecute(): RDD[InternalRow] = { - child.execute().mapPartitions { iter => + child.execute().mapPartitions { stream => new Iterator[InternalRow] { - - // Although input rows are grouped based on windowSpec.partitionSpec, we need to - // know when we have a new partition. - // This is to manually construct an ordering that can be used to compare rows. - // TODO: We may want to have a newOrdering that takes BoundReferences. - // So, we can take advantave of code gen. - private val partitionOrdering: Ordering[InternalRow] = - RowOrdering.forSchema(windowSpec.partitionSpec.map(_.dataType)) - - // This is used to project expressions for the partition specification. - protected val partitionGenerator = - newMutableProjection(windowSpec.partitionSpec, child.output)() - - // This is ued to project expressions for the order specification. - protected val rowOrderGenerator = - newMutableProjection(windowSpec.orderSpec.map(_.child), child.output)() - - // The position of next output row in the inputRowBuffer. - var rowPosition: Int = 0 - // The number of buffered rows in the inputRowBuffer (the size of the current partition). - var partitionSize: Int = 0 - // The buffer used to buffer rows in a partition. - var inputRowBuffer: CompactBuffer[InternalRow] = _ - // The partition key of the current partition. - var currentPartitionKey: InternalRow = _ - // The partition key of next partition. - var nextPartitionKey: InternalRow = _ - // The first row of next partition. - var firstRowInNextPartition: InternalRow = _ - // Indicates if this partition is the last one in the iter. - var lastPartition: Boolean = false - - def createBoundaryEvaluator(): () => Unit = { - def findPhysicalBoundary( - boundary: FrameBoundary): () => Int = boundary match { - case UnboundedPreceding => () => 0 - case UnboundedFollowing => () => partitionSize - 1 - case CurrentRow => () => rowPosition - case ValuePreceding(value) => - () => - val newPosition = rowPosition - value - if (newPosition > 0) newPosition else 0 - case ValueFollowing(value) => - () => - val newPosition = rowPosition + value - if (newPosition < partitionSize) newPosition else partitionSize - 1 + // Get all relevant projections. + val result = projection() + val grouping = newProjection(windowSpec.partitionSpec, child.output) + + // Manage the stream and the grouping. + var nextRow: InternalRow = EmptyRow + var nextGroup: InternalRow = EmptyRow + var nextRowAvailable: Boolean = false + private[this] def fetchNextRow() { + nextRowAvailable = stream.hasNext + if (nextRowAvailable) { + nextRow = stream.next() + nextGroup = grouping(nextRow) + } else { + nextRow = EmptyRow + nextGroup = EmptyRow } - - def findLogicalBoundary( - boundary: FrameBoundary, - searchDirection: Int, - evaluator: Expression, - joinedRow: JoinedRow): () => Int = boundary match { - case UnboundedPreceding => () => 0 - case UnboundedFollowing => () => partitionSize - 1 - case other => - () => { - // CurrentRow, ValuePreceding, or ValueFollowing. - var newPosition = rowPosition + searchDirection - var stopSearch = false - // rowOrderGenerator is a mutable projection. - // We need to make a copy of the returned by rowOrderGenerator since we will - // compare searched row with this currentOrderByValue. - val currentOrderByValue = rowOrderGenerator(inputRowBuffer(rowPosition)).copy() - while (newPosition >= 0 && newPosition < partitionSize && !stopSearch) { - val r = rowOrderGenerator(inputRowBuffer(newPosition)) - stopSearch = - !(evaluator.eval(joinedRow(currentOrderByValue, r)).asInstanceOf[Boolean]) - if (!stopSearch) { - newPosition += searchDirection - } - } - newPosition -= searchDirection - - if (newPosition < 0) { - 0 - } else if (newPosition >= partitionSize) { - partitionSize - 1 - } else { - newPosition - } - } + } + fetchNextRow() + + // Manage the current partition. + var rows: CompactBuffer[InternalRow] = _ + val frames: Array[WindowFunctionFrame] = factories.map(_.copy) + val numFrames = frames.length + private[this] def fetchNextPartition() { + // Collect all the rows in the current partition. + val currentGroup = nextGroup + rows = new CompactBuffer + while (nextRowAvailable && nextGroup == currentGroup) { + rows += nextRow.copy() + fetchNextRow() } - windowFrame.frameType match { - case RowFrame => - val findStart = findPhysicalBoundary(windowFrame.frameStart) - val findEnd = findPhysicalBoundary(windowFrame.frameEnd) - () => { - frameStart = findStart() - frameEnd = findEnd() - } - case RangeFrame => - val joinedRowForBoundaryEvaluation: JoinedRow = new JoinedRow() - val orderByExpr = windowSpec.orderSpec.head - val currentRowExpr = - BoundReference(0, orderByExpr.dataType, orderByExpr.nullable) - val examedRowExpr = - BoundReference(1, orderByExpr.dataType, orderByExpr.nullable) - val differenceExpr = Abs(Subtract(currentRowExpr, examedRowExpr)) - - val frameStartEvaluator = windowFrame.frameStart match { - case CurrentRow => EqualTo(currentRowExpr, examedRowExpr) - case ValuePreceding(value) => - LessThanOrEqual(differenceExpr, Cast(Literal(value), orderByExpr.dataType)) - case ValueFollowing(value) => - GreaterThanOrEqual(differenceExpr, Cast(Literal(value), orderByExpr.dataType)) - case o => Literal(true) // This is just a dummy expression, we will not use it. - } - - val frameEndEvaluator = windowFrame.frameEnd match { - case CurrentRow => EqualTo(currentRowExpr, examedRowExpr) - case ValuePreceding(value) => - GreaterThanOrEqual(differenceExpr, Cast(Literal(value), orderByExpr.dataType)) - case ValueFollowing(value) => - LessThanOrEqual(differenceExpr, Cast(Literal(value), orderByExpr.dataType)) - case o => Literal(true) // This is just a dummy expression, we will not use it. - } - - val findStart = - findLogicalBoundary( - boundary = windowFrame.frameStart, - searchDirection = -1, - evaluator = frameStartEvaluator, - joinedRow = joinedRowForBoundaryEvaluation) - val findEnd = - findLogicalBoundary( - boundary = windowFrame.frameEnd, - searchDirection = 1, - evaluator = frameEndEvaluator, - joinedRow = joinedRowForBoundaryEvaluation) - () => { - frameStart = findStart() - frameEnd = findEnd() - } + // Setup the frames. + var i = 0 + while (i < numFrames) { + frames(i).prepare(rows) + i += 1 } + + // Setup iteration + rowIndex = 0 + rowsSize = rows.size } - val boundaryEvaluator = createBoundaryEvaluator() - // Indicates if we the specified window frame requires us to maintain a sliding frame - // (e.g. RANGES BETWEEN 1 PRECEDING AND CURRENT ROW) or the window frame - // is the entire partition (e.g. ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING). - val requireUpdateFrame: Boolean = { - def requireUpdateBoundary(boundary: FrameBoundary): Boolean = boundary match { - case UnboundedPreceding => false - case UnboundedFollowing => false - case _ => true - } + // Iteration + var rowIndex = 0 + var rowsSize = 0 + def hasNext: Boolean = rowIndex < rowsSize || nextRowAvailable - requireUpdateBoundary(windowFrame.frameStart) || - requireUpdateBoundary(windowFrame.frameEnd) - } - // The start position of the current frame in the partition. - var frameStart: Int = 0 - // The end position of the current frame in the partition. - var frameEnd: Int = -1 - // Window functions. - val functions: Array[WindowFunction] = windowFunctions() - // Buffers used to store input parameters for window functions. Because we may need to - // maintain a sliding frame, we use this buffer to avoid evaluate the parameters from - // the same row multiple times. - val windowFunctionParameterBuffers: Array[util.LinkedList[AnyRef]] = - functions.map(_ => new util.LinkedList[AnyRef]()) - - // The projection used to generate the final result rows of this operator. - private[this] val resultProjection = - newMutableProjection( - projectList ++ windowExpressionResult, - projectList ++ computedSchema)() - - // The row used to hold results of window functions. - private[this] val windowExpressionResultRow = - new GenericMutableRow(computedSchema.length) - - private[this] val joinedRow = new JoinedRow6 - - // Initialize this iterator. - initialize() - - private def initialize(): Unit = { - if (iter.hasNext) { - val currentRow = iter.next().copy() - // partitionGenerator is a mutable projection. Since we need to track nextPartitionKey, - // we are making a copy of the returned partitionKey at here. - nextPartitionKey = partitionGenerator(currentRow).copy() - firstRowInNextPartition = currentRow + val join = new JoinedRow6 + val windowFunctionResult = new GenericMutableRow(numColumns) + def next(): InternalRow = { + // Load the next partition if we need to. + if (rowIndex >= rowsSize && nextRowAvailable) { fetchNextPartition() - } else { - // The iter is an empty one. So, we set all of the following variables - // to make sure hasNext will return false. - lastPartition = true - rowPosition = 0 - partitionSize = 0 } - } - // Indicates if we will have new output row. - override final def hasNext: Boolean = { - !lastPartition || (rowPosition < partitionSize) - } - - override final def next(): InternalRow = { - if (hasNext) { - if (rowPosition == partitionSize) { - // All rows of this buffer have been consumed. - // We will move to next partition. - fetchNextPartition() - } - // Get the input row for the current output row. - val inputRow = inputRowBuffer(rowPosition) - // Get all results of the window functions for this output row. + if (rowIndex < rowsSize) { + // Get the results for the window frames. var i = 0 - while (i < functions.length) { - windowExpressionResultRow.update(i, functions(i).get(rowPosition)) + while (i < numFrames) { + frames(i).write(windowFunctionResult) i += 1 } - // Construct the output row. - val outputRow = resultProjection(joinedRow(inputRow, windowExpressionResultRow)) - // We will move to the next one. - rowPosition += 1 - if (requireUpdateFrame && rowPosition < partitionSize) { - // If we need to maintain a sliding frame and - // we will still work on this partition when next is called next time, do the update. - updateFrame() - } + // 'Merge' the input row with the window function result + join(rows(rowIndex), windowFunctionResult) + rowIndex += 1 - // Return the output row. - outputRow - } else { - // no more result - throw new NoSuchElementException - } + // Return the projection. + result(join) + } else throw new NoSuchElementException } + } + } + } +} - // Fetch the next partition. - private def fetchNextPartition(): Unit = { - // Create a new buffer for input rows. - inputRowBuffer = new CompactBuffer[InternalRow]() - // We already have the first row for this partition - // (recorded in firstRowInNextPartition). Add it back. - inputRowBuffer += firstRowInNextPartition - // Set the current partition key. - currentPartitionKey = nextPartitionKey - // Now, we will start to find all rows belonging to this partition. - // Create a variable to track if we see the next partition. - var findNextPartition = false - // The search will stop when we see the next partition or there is no - // input row left in the iter. - while (iter.hasNext && !findNextPartition) { - // Make a copy of the input row since we will put it in the buffer. - val currentRow = iter.next().copy() - // Get the partition key based on the partition specification. - // For the below compare method, we do not need to make a copy of partitionKey. - val partitionKey = partitionGenerator(currentRow) - // Check if the current row belongs the current input row. - val comparing = partitionOrdering.compare(currentPartitionKey, partitionKey) - if (comparing == 0) { - // This row is still in the current partition. - inputRowBuffer += currentRow - } else { - // The current input row is in a different partition. - findNextPartition = true - // partitionGenerator is a mutable projection. - // Since we need to track nextPartitionKey and we determine that it should be set - // as partitionKey, we are making a copy of the partitionKey at here. - nextPartitionKey = partitionKey.copy() - firstRowInNextPartition = currentRow - } - } +/** + * Function for comparing boundary values. + */ +private[execution] abstract class BoundOrdering { + def compare(input: Seq[InternalRow], inputIndex: Int, outputIndex: Int): Int +} - // We have not seen a new partition. It means that there is no new row in the - // iter. The current partition is the last partition of the iter. - if (!findNextPartition) { - lastPartition = true - } +/** + * Compare the input index to the bound of the output index. + */ +private[execution] final case class RowBoundOrdering(offset: Int) extends BoundOrdering { + override def compare(input: Seq[InternalRow], inputIndex: Int, outputIndex: Int): Int = + inputIndex - (outputIndex + offset) +} - // We have got all rows for the current partition. - // Set rowPosition to 0 (the next output row will be based on the first - // input row of this partition). - rowPosition = 0 - // The size of this partition. - partitionSize = inputRowBuffer.size - // Reset all parameter buffers of window functions. - var i = 0 - while (i < windowFunctionParameterBuffers.length) { - windowFunctionParameterBuffers(i).clear() - i += 1 - } - frameStart = 0 - frameEnd = -1 - // Create the first window frame for this partition. - // If we do not need to maintain a sliding frame, this frame will - // have the entire partition. - updateFrame() - } +/** + * Compare the value of the input index to the value bound of the output index. + */ +private[execution] final case class RangeBoundOrdering( + ordering: Ordering[InternalRow], + current: Projection, + bound: Projection) extends BoundOrdering { + override def compare(input: Seq[InternalRow], inputIndex: Int, outputIndex: Int): Int = + ordering.compare(current(input(inputIndex)), bound(input(outputIndex))) +} - /** The function used to maintain the sliding frame. */ - private def updateFrame(): Unit = { - // Based on the difference between the new frame and old frame, - // updates the buffers holding input parameters of window functions. - // We will start to prepare input parameters starting from the row - // indicated by offset in the input row buffer. - def updateWindowFunctionParameterBuffers( - numToRemove: Int, - numToAdd: Int, - offset: Int): Unit = { - // First, remove unneeded entries from the head of every buffer. - var i = 0 - while (i < numToRemove) { - var j = 0 - while (j < windowFunctionParameterBuffers.length) { - windowFunctionParameterBuffers(j).remove() - j += 1 - } - i += 1 - } - // Then, add needed entries to the tail of every buffer. - i = 0 - while (i < numToAdd) { - var j = 0 - while (j < windowFunctionParameterBuffers.length) { - // Ask the function to prepare the input parameters. - val parameters = functions(j).prepareInputParameters(inputRowBuffer(i + offset)) - windowFunctionParameterBuffers(j).add(parameters) - j += 1 - } - i += 1 - } - } +/** + * A window function calculates the results of a number of window functions for a window frame. + * Before use a frame must be prepared by passing it all the rows in the current partition. After + * preparation the update method can be called to fill the output rows. + * + * TODO How to improve performance? A few thoughts: + * - Window functions are expensive due to its distribution and ordering requirements. + * Unfortunately it is up to the Spark engine to solve this. Improvements in the form of project + * Tungsten are on the way. + * - The window frame processing bit can be improved though. But before we start doing that we + * need to see how much of the time and resources are spent on partitioning and ordering, and + * how much time and resources are spent processing the partitions. There are a couple ways to + * improve on the current situation: + * - Reduce memory footprint by performing streaming calculations. This can only be done when + * there are no Unbound/Unbounded Following calculations present. + * - Use Tungsten style memory usage. + * - Use code generation in general, and use the approach to aggregation taken in the + * GeneratedAggregate class in specific. + * + * @param ordinal of the first column written by this frame. + * @param functions to calculate the row values with. + */ +private[execution] abstract class WindowFunctionFrame( + ordinal: Int, + functions: Array[WindowFunction]) { + + /** Number of columns the window function frame is managing */ + val numColumns = functions.length + + /** + * Create a fresh thread safe copy of the frame. + * + * @return the copied frame. + */ + def copy: WindowFunctionFrame + + /** + * Create new instances of the functions. + * + * @return an array containing copies of the current window functions. + */ + protected final def copyFunctions: Array[WindowFunction] = { + val copies = new Array[WindowFunction](numColumns) + var i = 0 + while (i < numColumns) { + val copy = functions(i).newInstance() + copy.init() + copies(i) = copy + i += 1 + } + copies + } - // Record the current frame start point and end point before - // we update them. - val previousFrameStart = frameStart - val previousFrameEnd = frameEnd - boundaryEvaluator() - updateWindowFunctionParameterBuffers( - frameStart - previousFrameStart, - frameEnd - previousFrameEnd, - previousFrameEnd + 1) - // Evaluate the current frame. - evaluateCurrentFrame() - } + /** + * Prepare the frame for calculating the results for a partition. + * + * @param rows to calculate the frame results for. + */ + def prepare(rows: Seq[InternalRow]): Unit + + /** + * Write the result for the current row to the given target row. + * + * @param target row to write the result for the current row to. + */ + def write(target: GenericMutableRow): Unit + + /** Reset the current window functions. */ + protected final def reset(): Unit = { + var i = 0 + while (i < numColumns) { + functions(i).reset() + i += 1 + } + } - /** Evaluate the current window frame. */ - private def evaluateCurrentFrame(): Unit = { - var i = 0 - while (i < functions.length) { - // Reset the state of the window function. - functions(i).reset() - // Get all buffered input parameters based on rows of this window frame. - val inputParameters = windowFunctionParameterBuffers(i).toArray() - // Send these input parameters to the window function. - functions(i).batchUpdate(inputParameters) - // Ask the function to evaluate based on this window frame. - functions(i).evaluate() - i += 1 - } - } + /** Prepare an input row for processing. */ + protected final def prepare(input: InternalRow): Array[AnyRef] = { + val prepared = new Array[AnyRef](numColumns) + var i = 0 + while (i < numColumns) { + prepared(i) = functions(i).prepareInputParameters(input) + i += 1 + } + prepared + } + + /** Evaluate a prepared buffer. */ + protected final def evaluatePrepared(iterator: java.util.Iterator[Array[AnyRef]]): Unit = { + reset() + while (iterator.hasNext) { + val prepared = iterator.next() + var i = 0 + while (i < numColumns) { + functions(i).update(prepared(i)) + i += 1 } } + evaluate() + } + + /** Update an array of window functions. */ + protected final def update(input: InternalRow): Unit = { + var i = 0 + while (i < numColumns) { + val aggregate = functions(i) + val preparedInput = aggregate.prepareInputParameters(input) + aggregate.update(preparedInput) + i += 1 + } + } + + /** Evaluate the window functions. */ + protected final def evaluate(): Unit = { + var i = 0 + while (i < numColumns) { + functions(i).evaluate() + i += 1 + } + } + + /** Fill a target row with the current window function results. */ + protected final def fill(target: GenericMutableRow, rowIndex: Int): Unit = { + var i = 0 + while (i < numColumns) { + target.update(ordinal + i, functions(i).get(rowIndex)) + i += 1 + } + } +} + +/** + * The sliding window frame calculates frames with the following SQL form: + * ... BETWEEN 1 PRECEDING AND 1 FOLLOWING + * + * @param ordinal of the first column written by this frame. + * @param functions to calculate the row values with. + * @param lbound comparator used to identify the lower bound of an output row. + * @param ubound comparator used to identify the upper bound of an output row. + */ +private[execution] final class SlidingWindowFunctionFrame( + ordinal: Int, + functions: Array[WindowFunction], + lbound: BoundOrdering, + ubound: BoundOrdering) extends WindowFunctionFrame(ordinal, functions) { + + /** Rows of the partition currently being processed. */ + private[this] var input: Seq[InternalRow] = null + + /** Index of the first input row with a value greater than the upper bound of the current + * output row. */ + private[this] var inputHighIndex = 0 + + /** Index of the first input row with a value equal to or greater than the lower bound of the + * current output row. */ + private[this] var inputLowIndex = 0 + + /** Buffer used for storing prepared input for the window functions. */ + private[this] val buffer = new util.ArrayDeque[Array[AnyRef]] + + /** Index of the row we are currently writing. */ + private[this] var outputIndex = 0 + + /** Prepare the frame for calculating a new partition. Reset all variables. */ + override def prepare(rows: Seq[InternalRow]): Unit = { + input = rows + inputHighIndex = 0 + inputLowIndex = 0 + outputIndex = 0 + buffer.clear() + } + + /** Write the frame columns for the current row to the given target row. */ + override def write(target: GenericMutableRow): Unit = { + var bufferUpdated = false + + // Add all rows to the buffer for which the input row value is equal to or less than + // the output row upper bound. + while (inputHighIndex < input.size && + ubound.compare(input, inputHighIndex, outputIndex) <= 0) { + buffer.offer(prepare(input(inputHighIndex))) + inputHighIndex += 1 + bufferUpdated = true + } + + // Drop all rows from the buffer for which the input row value is smaller than + // the output row lower bound. + while (inputLowIndex < inputHighIndex && + lbound.compare(input, inputLowIndex, outputIndex) < 0) { + buffer.pop() + inputLowIndex += 1 + bufferUpdated = true + } + + // Only recalculate when the buffer changes. + if (bufferUpdated) { + evaluatePrepared(buffer.iterator()) + } + + // Write to the target. + // TODO technically this is only needed when something changes... + fill(target, outputIndex) + + // Move to the next row. + outputIndex += 1 + } + + /** Copy the frame. */ + override def copy: SlidingWindowFunctionFrame = + new SlidingWindowFunctionFrame(ordinal, copyFunctions, lbound, ubound) +} + +/** + * The unbounded window frame calculates frames with the following SQL forms: + * ... (No Frame Definition) + * ... BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + * + * Its results are the same for each and every row in the partition. This class can be seen as a + * special case of a sliding window, but is optimized for the unbound case. + * + * @param ordinal of the first column written by this frame. + * @param functions to calculate the row values with. + */ +private[execution] final class UnboundedWindowFunctionFrame( + ordinal: Int, + functions: Array[WindowFunction]) extends WindowFunctionFrame(ordinal, functions) { + + /** Index of the row we are currently writing. */ + private[this] var outputIndex = 0 + + /** Prepare the frame for calculating a new partition. Process all rows eagerly. */ + override def prepare(rows: Seq[InternalRow]): Unit = { + reset() + outputIndex = 0 + val iterator = rows.iterator + while (iterator.hasNext) { + update(iterator.next()) + } + evaluate() + } + + /** Write the frame columns for the current row to the given target row. */ + override def write(target: GenericMutableRow): Unit = { + fill(target, outputIndex) + outputIndex += 1 + } + + /** Copy the frame. */ + override def copy: UnboundedWindowFunctionFrame = + new UnboundedWindowFunctionFrame(ordinal, copyFunctions) +} + +/** + * The UnboundPreceding window frame calculates frames with the following SQL form: + * ... BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + * + * There is only an upper bound. Very common use cases are for instance running sums or counts + * (row_number). Technically this is a special case of a sliding window. However a sliding window + * has to maintain a buffer, and it must do a full evaluation everytime the buffer changes. This + * is not the case when there is no lower bound, given the additive nature of most aggregates + * streaming updates and partial evaluation suffice and no buffering is needed. + * + * @param ordinal of the first column written by this frame. + * @param functions to calculate the row values with. + * @param ubound comparator used to identify the upper bound of an output row. + */ +private[execution] final class UnboundedPrecedingWindowFunctionFrame( + ordinal: Int, + functions: Array[WindowFunction], + ubound: BoundOrdering) extends WindowFunctionFrame(ordinal, functions) { + + /** Rows of the partition currently being processed. */ + private[this] var input: Seq[InternalRow] = null + + /** Index of the first input row with a value greater than the upper bound of the current + * output row. */ + private[this] var inputIndex = 0 + + /** Index of the row we are currently writing. */ + private[this] var outputIndex = 0 + + /** Prepare the frame for calculating a new partition. */ + override def prepare(rows: Seq[InternalRow]): Unit = { + reset() + input = rows + inputIndex = 0 + outputIndex = 0 + } + + /** Write the frame columns for the current row to the given target row. */ + override def write(target: GenericMutableRow): Unit = { + var bufferUpdated = false + + // Add all rows to the aggregates for which the input row value is equal to or less than + // the output row upper bound. + while (inputIndex < input.size && ubound.compare(input, inputIndex, outputIndex) <= 0) { + update(input(inputIndex)) + inputIndex += 1 + bufferUpdated = true + } + + // Evaluate if we need to. + if (bufferUpdated) { + evaluate() + } + + // Write to the target. + fill(target, outputIndex) + + // Move to the next row. + outputIndex += 1 + } + + /** Copy the frame. */ + override def copy: UnboundedPrecedingWindowFunctionFrame = + new UnboundedPrecedingWindowFunctionFrame(ordinal, copyFunctions, ubound) +} + +/** + * The UnboundFollowing window frame calculates frames with the following SQL form: + * ... BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + * + * There is only an upper bound. This is a slightly modified version of the sliding window. The + * sliding window operator has to check if both upper and the lower bound change when a new row + * gets processed, where as the unbounded following only has to check the lower bound. + * + * This is a very expensive operator to use, O(n * (n - 1) /2), because we need to maintain a + * buffer and must do full recalculation after each row. Reverse iteration would be possible, if + * the communitativity of the used window functions can be guaranteed. + * + * @param ordinal of the first column written by this frame. + * @param functions to calculate the row values with. + * @param lbound comparator used to identify the lower bound of an output row. + */ +private[execution] final class UnboundedFollowingWindowFunctionFrame( + ordinal: Int, + functions: Array[WindowFunction], + lbound: BoundOrdering) extends WindowFunctionFrame(ordinal, functions) { + + /** Buffer used for storing prepared input for the window functions. */ + private[this] val buffer = new util.ArrayDeque[Array[AnyRef]] + + /** Rows of the partition currently being processed. */ + private[this] var input: Seq[InternalRow] = null + + /** Index of the first input row with a value equal to or greater than the lower bound of the + * current output row. */ + private[this] var inputIndex = 0 + + /** Index of the row we are currently writing. */ + private[this] var outputIndex = 0 + + /** Prepare the frame for calculating a new partition. */ + override def prepare(rows: Seq[InternalRow]): Unit = { + input = rows + inputIndex = 0 + outputIndex = 0 + buffer.clear() + val iterator = rows.iterator + while (iterator.hasNext) { + buffer.offer(prepare(iterator.next())) + } + evaluatePrepared(buffer.iterator()) + } + + /** Write the frame columns for the current row to the given target row. */ + override def write(target: GenericMutableRow): Unit = { + var bufferUpdated = false + + // Drop all rows from the buffer for which the input row value is smaller than + // the output row lower bound. + while (inputIndex < input.size && lbound.compare(input, inputIndex, outputIndex) < 0) { + buffer.pop() + inputIndex += 1 + bufferUpdated = true + } + + // Only recalculate when the buffer changes. + if (bufferUpdated) { + evaluatePrepared(buffer.iterator()) + } + + // Write to the target. + fill(target, outputIndex) + + // Move to the next row. + outputIndex += 1 } + + /** Copy the frame. */ + override def copy: UnboundedFollowingWindowFunctionFrame = + new UnboundedFollowingWindowFunctionFrame(ordinal, copyFunctions, lbound) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala index efb3f2545db8..6b871918d443 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala @@ -189,7 +189,7 @@ class HiveDataFrameWindowSuite extends QueryTest { df.select( $"key", last("value").over( - Window.partitionBy($"value").orderBy($"key").rangeBetween(1, Long.MaxValue)) + Window.partitionBy($"value").orderBy($"key").rangeBetween(-1, 0)) .equalTo("2") .as("last_v"), avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(Long.MinValue, 1)) From 19383122da11e9c3b65ce77e2f166eff8db52953 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Wed, 8 Jul 2015 17:58:07 -0400 Subject: [PATCH 2/9] Added Documentation to the createBoundOrdering methods. --- .../apache/spark/sql/execution/Window.scala | 76 ++++++++++++------- 1 file changed, 49 insertions(+), 27 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index 5c09f72ea031..a3c71f88ab9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -86,37 +86,59 @@ case class Window( /** * Create a bound ordering object for a given frame type and offset. A bound ordering object is - * used to determine which input row lies within the frame boundaries of an output row. + * used to determine which input row lies within the frame boundaries of an output row. There + * are two types of boundaries that can be evaluated: + * - Row Based: A row based boundary is based on the position of the row within the partition. + * The offset indicates the number of rows above or below the current row, the frame for the + * current row starts or ends. For instance, given a row based sliding frame with a lower bound + * offset of -1 and a upper bound offset of +2. The frame for row with index 5 would range from + * index 4 to index 6. + * - Range based: A range based boundary is based on the actual value of the ORDER BY + * expression(s). The offset is used to alter the value of the ORDER BY expression, for + * instance if the current order by expression has a value of 10 and the lower bound offset + * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a + * number of constraints on the ORDER BY expressions: there can be only one expression and this + * expression must have a numerical data type. An exception can be made when the offset is 0, + * because no value modification is needed, in this case multiple and non-numeric ORDER BY + * expression are allowed. * - * @param frameType to evaluate. A row frame will + * @param frameType to evaluate. This can either be Row or Range based. * @param offset with respect to the row. * @return a bound ordering object. */ - private[this] def createBoundOrdering(frameType: FrameType, offset: Int) = frameType match { - case RangeFrame => - // Use the entire order expression when the offset is 0. - val (exprs, current, bound) = if (offset == 0) { - val exprs = windowSpec.orderSpec.map(_.child) - val projection = newMutableProjection(exprs, child.output) - (windowSpec.orderSpec, projection(), projection()) - } - // Use only the first order expression when the offset is non-null. - else { - val sortExpr = windowSpec.orderSpec.head - val expr = sortExpr.child - val boundExpr = Add(expr, Cast(Literal.create(offset, IntegerType), expr.dataType)) - val current = newMutableProjection(expr :: Nil, child.output)() - val bound = newMutableProjection(boundExpr :: Nil, child.output)() - (sortExpr :: Nil, current, bound) - } - // Construct the ordering. - val (sortExprs, schema) = exprs.zipWithIndex.map { case (e, i) => - val ref = AttributeReference(s"c_$i", e.dataType, e.nullable)() - (SortOrder(ref, e.direction), ref) - }.unzip - val ordering = newOrdering(sortExprs, schema) - RangeBoundOrdering(ordering, current, bound) - case RowFrame => RowBoundOrdering(offset) + private[this] def createBoundOrdering(frameType: FrameType, offset: Int) = { + frameType match { + case RangeFrame => + // Use the entire order expression when the offset is 0. + val (exprs, current, bound) = if (offset == 0) { + val exprs = windowSpec.orderSpec.map(_.child) + val projection = newMutableProjection(exprs, child.output) + (windowSpec.orderSpec, projection(), projection()) + } + // Use only the first order expression when the offset is non-null. + else if (windowSpec.orderSpec.size == 1) { + val sortExpr = windowSpec.orderSpec.head + val expr = sortExpr.child + val boundExpr = Add(expr, Cast(Literal.create(offset, IntegerType), expr.dataType)) + val current = newMutableProjection(expr :: Nil, child.output)() + val bound = newMutableProjection(boundExpr :: Nil, child.output)() + (sortExpr :: Nil, current, bound) + } + // Fail when we try to use range offsets on a window with multiple order + // expressions. + else { + sys.error("Non-Zero range offsets are not supported for windows " + + "with multiple order expressions.") + } + // Construct the ordering. + val (sortExprs, schema) = exprs.zipWithIndex.map { case (e, i) => + val ref = AttributeReference(s"c_$i", e.dataType, e.nullable)() + (SortOrder(ref, e.direction), ref) + }.unzip + val ordering = newOrdering(sortExprs, schema) + RangeBoundOrdering(ordering, current, bound) + case RowFrame => RowBoundOrdering(offset) + } } @transient From ac2f682087bcca5fb87b602cf7592c123595b51b Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Wed, 8 Jul 2015 18:04:19 -0400 Subject: [PATCH 3/9] Added a few more comments. --- .../src/main/scala/org/apache/spark/sql/execution/Window.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index a3c71f88ab9a..1c32376469ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -153,7 +153,7 @@ case class Window( // Group the window expression by their processing frame. val groupedWindowExprs = windowExprs.groupBy(_.windowSpec.frameSpecification) - // Create factories and collect unbound expressions for each frame. + // Create processors and collect unbound expressions for each frame. val factories = mutable.Buffer.empty[WindowFunctionFrame] val unboundExpressions = mutable.Buffer.empty[Expression] groupedWindowExprs.foreach { From 1fdb5582742486563dfb14e2beda69b24c729f02 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Thu, 16 Jul 2015 12:27:23 -0400 Subject: [PATCH 4/9] Changed Data In HiveDataFrameWindowSuite. --- .../apache/spark/sql/hive/HiveDataFrameWindowSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala index 6b871918d443..15b5f418f0a8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala @@ -183,13 +183,13 @@ class HiveDataFrameWindowSuite extends QueryTest { } test("aggregation and range betweens with unbounded") { - val df = Seq((1, "1"), (2, "2"), (2, "2"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value") + val df = Seq((5, "1"), (5, "2"), (4, "2"), (6, "2"), (3, "1"), (2, "2")).toDF("key", "value") df.registerTempTable("window_table") checkAnswer( df.select( $"key", last("value").over( - Window.partitionBy($"value").orderBy($"key").rangeBetween(-1, 0)) + Window.partitionBy($"value").orderBy($"key").rangeBetween(-2, -1)) .equalTo("2") .as("last_v"), avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(Long.MinValue, 1)) @@ -203,7 +203,7 @@ class HiveDataFrameWindowSuite extends QueryTest { """SELECT | key, | last_value(value) OVER - | (PARTITION BY value ORDER BY key RANGE 1 preceding) == "2", + | (PARTITION BY value ORDER BY key RANGE BETWEEN 2 preceding and 1 preceding) == "2", | avg(key) OVER | (PARTITION BY value ORDER BY key RANGE BETWEEN unbounded preceding and 1 following), | avg(key) OVER From e75b76e1186d043b88cecb73975da7902d2e20a9 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Thu, 16 Jul 2015 18:13:31 -0400 Subject: [PATCH 5/9] More docs, added support for reverse sliding range frames, and some reorganization of code. --- .../apache/spark/sql/execution/Window.scala | 226 ++++++++++-------- 1 file changed, 129 insertions(+), 97 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index 1c32376469ae..57031e282ba7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -49,16 +49,32 @@ import scala.collection.mutable * 1 PRECEDING AND CURRENT ROW and 1 FOLLOWING AND 2 FOLLOWING. * * Different frame boundaries can be used in Growing, Shrinking and Moving frames. A frame - * boundary can be either Row or Range based. + * boundary can be either Row or Range based: + * - Row Based: A row based boundary is based on the position of the row within the partition. + * An offset indicates the number of rows above or below the current row, the frame for the + * current row starts or ends. For instance, given a row based sliding frame with a lower bound + * offset of -1 and a upper bound offset of +2. The frame for row with index 5 would range from + * index 4 to index 6. + * - Range based: A range based boundary is based on the actual value of the ORDER BY + * expression(s). An offset is used to alter the value of the ORDER BY expression, for + * instance if the current order by expression has a value of 10 and the lower bound offset + * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a + * number of constraints on the ORDER BY expressions: there can be only one expression and this + * expression must have a numerical data type. An exception can be made when the offset is 0, + * because no value modification is needed, in this case multiple and non-numeric ORDER BY + * expression are allowed. * * This is quite an expensive operator because every row for a single group must be in the same - * partition and partitions must be sorted according to the grouping and sort order. This can be - * infeasible in some extreme cases. The operator does not repartition or sort itself, but requires - * the planner to this. + * partition and partitions must be sorted according to the grouping and sort order. The operator + * requires the planner to take care of the partitioning and sorting. * - * The current implementation is semi-blocking. The aggregates and final projection are calculated - * one group at a time, this is possible due to the aforementioned partitioning and ordering - * constraints. + * The operator is semi-blocking. The window functions and aggregates are calculated one group at + * a time, the result will only be made available after the processing for the entire group has + * finished. The operator is able to process different frame configurations at the same time. This + * is done by delegating the actual frame processing (i.e. calculation of the window functions) to + * specialized classes, see [[WindowFunctionFrame]], which take care of their own frame type: + * Entire Partition, Sliding, Growing & Shrinking. Boundary evaluation is also delegated to a pair + * of specialized classes: [[RowBoundOrdering]] & [[RangeBoundOrdering]]. */ @DeveloperApi case class Window( @@ -86,53 +102,43 @@ case class Window( /** * Create a bound ordering object for a given frame type and offset. A bound ordering object is - * used to determine which input row lies within the frame boundaries of an output row. There - * are two types of boundaries that can be evaluated: - * - Row Based: A row based boundary is based on the position of the row within the partition. - * The offset indicates the number of rows above or below the current row, the frame for the - * current row starts or ends. For instance, given a row based sliding frame with a lower bound - * offset of -1 and a upper bound offset of +2. The frame for row with index 5 would range from - * index 4 to index 6. - * - Range based: A range based boundary is based on the actual value of the ORDER BY - * expression(s). The offset is used to alter the value of the ORDER BY expression, for - * instance if the current order by expression has a value of 10 and the lower bound offset - * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a - * number of constraints on the ORDER BY expressions: there can be only one expression and this - * expression must have a numerical data type. An exception can be made when the offset is 0, - * because no value modification is needed, in this case multiple and non-numeric ORDER BY - * expression are allowed. + * used to determine which input row lies within the frame boundaries of an output row. + * + * This method uses Code Generation. It can only be used on the executor side. * * @param frameType to evaluate. This can either be Row or Range based. * @param offset with respect to the row. * @return a bound ordering object. */ - private[this] def createBoundOrdering(frameType: FrameType, offset: Int) = { + private[this] def createBoundOrdering(frameType: FrameType, offset: Int): BoundOrdering = { frameType match { case RangeFrame => - // Use the entire order expression when the offset is 0. val (exprs, current, bound) = if (offset == 0) { + // Use the entire order expression when the offset is 0. val exprs = windowSpec.orderSpec.map(_.child) val projection = newMutableProjection(exprs, child.output) (windowSpec.orderSpec, projection(), projection()) } - // Use only the first order expression when the offset is non-null. else if (windowSpec.orderSpec.size == 1) { + // Use only the first order expression when the offset is non-null. val sortExpr = windowSpec.orderSpec.head val expr = sortExpr.child - val boundExpr = Add(expr, Cast(Literal.create(offset, IntegerType), expr.dataType)) + // Create the projection which returns the current 'value'. val current = newMutableProjection(expr :: Nil, child.output)() + // Create the projection which returns the current 'value' modified by adding the offset. + val boundExpr = Add(expr, Cast(Literal.create(offset, IntegerType), expr.dataType)) val bound = newMutableProjection(boundExpr :: Nil, child.output)() (sortExpr :: Nil, current, bound) } - // Fail when we try to use range offsets on a window with multiple order - // expressions. else { sys.error("Non-Zero range offsets are not supported for windows " + "with multiple order expressions.") } - // Construct the ordering. - val (sortExprs, schema) = exprs.zipWithIndex.map { case (e, i) => - val ref = AttributeReference(s"c_$i", e.dataType, e.nullable)() + // Construct the ordering. This is used to compare the result of current value projection + // to the result of bound value projection. This is done manually because we want to use + // Code Generation (if it is enabled). + val (sortExprs, schema) = exprs.map { case e => + val ref = AttributeReference("ordExpr", e.dataType, e.nullable)() (SortOrder(ref, e.direction), ref) }.unzip val ordering = newOrdering(sortExprs, schema) @@ -141,24 +147,92 @@ case class Window( } } - @transient - private[this] lazy val (factories, projection, numColumns) = { - // Collect all window expressions + /** + * Create a frame processor. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param frame boundaries. + * @param functions to process in the frame. + * @param ordinal at which the processor starts writing to the output. + * @return a frame processor. + */ + private[this] def createFrameProcessor( + frame: WindowFrame, + functions: Array[WindowFunction], + ordinal: Int): WindowFunctionFrame = frame match { + // Growing Frame. + case SpecifiedWindowFrame(frameType, UnboundedPreceding, FrameBoundaryExtractor(high)) => + val uBoundOrdering = createBoundOrdering(frameType, high) + new UnboundedPrecedingWindowFunctionFrame(ordinal, functions, uBoundOrdering) + + // Shrinking Frame. + case SpecifiedWindowFrame(frameType, FrameBoundaryExtractor(low), UnboundedFollowing) => + val lBoundOrdering = createBoundOrdering(frameType, low) + new UnboundedFollowingWindowFunctionFrame(ordinal, functions, lBoundOrdering) + + // Moving Frame: reverse processed range frame - bounds need to flipped. + case SpecifiedWindowFrame(RangeFrame, + FrameBoundaryExtractor(low), FrameBoundaryExtractor(high)) + if (low != 0 || high != 0) && windowSpec.orderSpec.head.direction == Descending => + val lBoundOrdering = createBoundOrdering(RangeFrame, high) + val uBoundOrdering = createBoundOrdering(RangeFrame, low) + new SlidingWindowFunctionFrame(ordinal, functions, lBoundOrdering, uBoundOrdering) + + // Moving Frame. + case SpecifiedWindowFrame(frameType, + FrameBoundaryExtractor(low), FrameBoundaryExtractor(high)) => + val lBoundOrdering = createBoundOrdering(frameType, low) + val uBoundOrdering = createBoundOrdering(frameType, high) + new SlidingWindowFunctionFrame(ordinal, functions, lBoundOrdering, uBoundOrdering) + + // Entire Partition Frame. + case SpecifiedWindowFrame(_, UnboundedPreceding, UnboundedFollowing) => + new UnboundedWindowFunctionFrame(ordinal, functions) + + // Error + case fr => + sys.error(s"Unsupported Frame $fr for functions: $functions") + } + + /** + * Create the resulting projection. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param expressions unbound ordered function expressions. + * @return the final resulting projection. + */ + private[this] def createResultProjection( + expressions: Seq[Expression]): MutableProjection = { + val unboundToAttr = expressions.map { + e => (e, AttributeReference("windowResult", e.dataType, e.nullable)()) + } + val unboundToAttrMap = unboundToAttr.toMap + val patchedWindowExpression = windowExpression.map(_.transform(unboundToAttrMap)) + newMutableProjection( + projectList ++ patchedWindowExpression, + child.output ++ unboundToAttr.map(_._2))() + } + + protected override def doExecute(): RDD[InternalRow] = { + // Prepare processing. + // Group the window expression by their processing frame. val windowExprs = windowExpression.flatMap { _.collect { case e: WindowExpression => e } } - // Group the window expression by their processing frame. - val groupedWindowExprs = windowExprs.groupBy(_.windowSpec.frameSpecification) - - // Create processors and collect unbound expressions for each frame. - val factories = mutable.Buffer.empty[WindowFunctionFrame] + // Create Frame processor factories and order the unbound window expressions by the frame they + // are processed in; this is the order in which their results will be written to window + // function result buffer. + val framedWindowExprs = windowExprs.groupBy(_.windowSpec.frameSpecification) + val factories = Array.ofDim[() => WindowFunctionFrame](framedWindowExprs.size) val unboundExpressions = mutable.Buffer.empty[Expression] - groupedWindowExprs.foreach { - case (frame, unboundFrameExpressions) => - // Register the ordinal. + framedWindowExprs.zipWithIndex.foreach { + case ((frame, unboundFrameExpressions), index) => + // Track the ordinal. val ordinal = unboundExpressions.size // Track the unbound expressions @@ -169,51 +243,16 @@ case class Window( BindReferences.bindReference(e.windowFunction, child.output) }.toArray - // Create the frame processor. - val factory = frame match { - // Growing Frame. - case SpecifiedWindowFrame(frameType, UnboundedPreceding, FrameBoundaryExtractor(high)) => - val uBoundOrdering = createBoundOrdering(frameType, high) - new UnboundedPrecedingWindowFunctionFrame(ordinal, functions, uBoundOrdering) - // Shrinking Frame. - case SpecifiedWindowFrame(frameType, FrameBoundaryExtractor(low), UnboundedFollowing) => - val lBoundOrdering = createBoundOrdering(frameType, low) - new UnboundedFollowingWindowFunctionFrame(ordinal, functions, lBoundOrdering) - // Moving Frame. - case SpecifiedWindowFrame(frameType, - FrameBoundaryExtractor(low), FrameBoundaryExtractor(high)) => - val lBoundOrdering = createBoundOrdering(frameType, low) - val uBoundOrdering = createBoundOrdering(frameType, high) - new SlidingWindowFunctionFrame(ordinal, functions, lBoundOrdering, uBoundOrdering) - // Entire Partition Frame. - case SpecifiedWindowFrame(_, UnboundedPreceding, UnboundedFollowing) => - new UnboundedWindowFunctionFrame(ordinal, functions) - // Error - case fr => - sys.error(s"Unsupported Frame $fr for expressions: $unboundFrameExpressions") - } - factories += factory - } - - // Create the schema projection. - val unboundToAttr = unboundExpressions.map { - e => (e, AttributeReference(s"aggResult:$e", e.dataType, e.nullable)()) + // Create the frame processor factory. + factories(index) = () => createFrameProcessor(frame, functions, ordinal) } - val unboundToAttrMap = unboundToAttr.toMap - val patchedWindowExpression = windowExpression.map(_.transform(unboundToAttrMap)) - val projection = newMutableProjection( - projectList ++ patchedWindowExpression, - child.output ++ unboundToAttr.map(_._2)) - // Done - (factories.toArray, projection, unboundExpressions.size) - } - - protected override def doExecute(): RDD[InternalRow] = { + // Start processing. child.execute().mapPartitions { stream => new Iterator[InternalRow] { + // Get all relevant projections. - val result = projection() + val result = createResultProjection(unboundExpressions) val grouping = newProjection(windowSpec.partitionSpec, child.output) // Manage the stream and the grouping. @@ -234,7 +273,7 @@ case class Window( // Manage the current partition. var rows: CompactBuffer[InternalRow] = _ - val frames: Array[WindowFunctionFrame] = factories.map(_.copy) + val frames: Array[WindowFunctionFrame] = factories.map(_()) val numFrames = frames.length private[this] def fetchNextPartition() { // Collect all the rows in the current partition. @@ -260,11 +299,11 @@ case class Window( // Iteration var rowIndex = 0 var rowsSize = 0 - def hasNext: Boolean = rowIndex < rowsSize || nextRowAvailable + override final def hasNext: Boolean = rowIndex < rowsSize || nextRowAvailable val join = new JoinedRow6 - val windowFunctionResult = new GenericMutableRow(numColumns) - def next(): InternalRow = { + val windowFunctionResult = new GenericMutableRow(unboundExpressions.size) + override final def next(): InternalRow = { // Load the next partition if we need to. if (rowIndex >= rowsSize && nextRowAvailable) { fetchNextPartition() @@ -343,6 +382,9 @@ private[execution] abstract class WindowFunctionFrame( ordinal: Int, functions: Array[WindowFunction]) { + // Make sure functions are initialized. + functions.foreach(_.init()) + /** Number of columns the window function frame is managing */ val numColumns = functions.length @@ -358,17 +400,7 @@ private[execution] abstract class WindowFunctionFrame( * * @return an array containing copies of the current window functions. */ - protected final def copyFunctions: Array[WindowFunction] = { - val copies = new Array[WindowFunction](numColumns) - var i = 0 - while (i < numColumns) { - val copy = functions(i).newInstance() - copy.init() - copies(i) = copy - i += 1 - } - copies - } + protected final def copyFunctions: Array[WindowFunction] = functions.map(_.newInstance()) /** * Prepare the frame for calculating the results for a partition. From b0654d73b1009f57234e82ad4e95bf13c1a292a4 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Thu, 16 Jul 2015 18:14:02 -0400 Subject: [PATCH 6/9] Tests for exotic frame specifications. --- .../sql/hive/execution/WindowSuite.scala | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala new file mode 100644 index 000000000000..030db4ef401e --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.{Row, QueryTest} +import org.apache.spark.sql.expressions.Window +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.hive.test.TestHive.implicits._ + +/** + * Window expressions are tested extensively by the following test suites: + * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]] + * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]] + * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]] + * However these suites do not cover all possible (i.e. more exotic) settings. This suite fill + * this gap. + * + * TODO Move this class to the sql/core project when we move to Native Spark UDAFs. + */ +class WindowSuite extends QueryTest { + + test("reverse sliding range frame") { + val df = Seq( + (1, "Thin", "Cell Phone", 6000), + (2, "Normal", "Tablet", 1500), + (3, "Mini", "Tablet", 5500), + (4, "Ultra thin", "Cell Phone", 5500), + (5, "Very thin", "Cell Phone", 6000), + (6, "Big", "Tablet", 2500), + (7, "Bendable", "Cell Phone", 3000), + (8, "Foldable", "Cell Phone", 3000), + (9, "Pro", "Tablet", 4500), + (10, "Pro2", "Tablet", 6500)). + toDF("id", "product", "category", "revenue") + checkAnswer( + df.select( + $"id", + avg($"revenue").over(Window. + partitionBy($"category"). + orderBy($"revenue".desc). + rangeBetween(-2000L, 1000L)). + cast("int")), + Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) :: + Row(4, 5833) :: Row(5, 5833) :: Row(6, 2000) :: + Row(7, 3000) :: Row(8, 3000) :: Row(9, 4166) :: + Row(10, 5500) :: Nil) + } + + // This is here to illustrate the fact that reverse order also reverses bounds. + test("reverse unbounded range frame") { + val df = Seq(1, 2, 4, 3, 2, 1). + map(Tuple1.apply). + toDF("value") + val window = Window.orderBy($"value".desc) + checkAnswer( + df.select( + $"value", + sum($"value").over(window.rangeBetween(Long.MinValue, 1)), + sum($"value").over(window.rangeBetween(1, Long.MaxValue))), + Row(1, 11, 6) :: Row(2, 7, 9) :: Row(4, null, 13) :: + Row(3, 4, 13) :: Row(2, 7, 9) :: Row(1, 11, 6) :: Nil) + + } +} From 2cd2d5b515e26fb48f332aa1fa66cf247b1c5a25 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Thu, 16 Jul 2015 22:12:50 -0400 Subject: [PATCH 7/9] Corrected reverse range frame processing. --- .../org/apache/spark/sql/execution/Window.scala | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index 57031e282ba7..4fd94a46b8df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -125,8 +125,11 @@ case class Window( val expr = sortExpr.child // Create the projection which returns the current 'value'. val current = newMutableProjection(expr :: Nil, child.output)() + // Flip the sign of the offset when processing the order is descending + val boundOffset = if (sortExpr.direction == Descending) -offset + else offset // Create the projection which returns the current 'value' modified by adding the offset. - val boundExpr = Add(expr, Cast(Literal.create(offset, IntegerType), expr.dataType)) + val boundExpr = Add(expr, Cast(Literal.create(boundOffset, IntegerType), expr.dataType)) val bound = newMutableProjection(boundExpr :: Nil, child.output)() (sortExpr :: Nil, current, bound) } @@ -171,14 +174,6 @@ case class Window( val lBoundOrdering = createBoundOrdering(frameType, low) new UnboundedFollowingWindowFunctionFrame(ordinal, functions, lBoundOrdering) - // Moving Frame: reverse processed range frame - bounds need to flipped. - case SpecifiedWindowFrame(RangeFrame, - FrameBoundaryExtractor(low), FrameBoundaryExtractor(high)) - if (low != 0 || high != 0) && windowSpec.orderSpec.head.direction == Descending => - val lBoundOrdering = createBoundOrdering(RangeFrame, high) - val uBoundOrdering = createBoundOrdering(RangeFrame, low) - new SlidingWindowFunctionFrame(ordinal, functions, lBoundOrdering, uBoundOrdering) - // Moving Frame. case SpecifiedWindowFrame(frameType, FrameBoundaryExtractor(low), FrameBoundaryExtractor(high)) => From 2eb3b33c6eae6badaef9aa323d1b39ca3477302c Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Thu, 16 Jul 2015 22:13:06 -0400 Subject: [PATCH 8/9] Corrected reverse range frame processing. --- .../sql/hive/execution/WindowSuite.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala index 030db4ef401e..a089d0d16519 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala @@ -47,21 +47,21 @@ class WindowSuite extends QueryTest { (9, "Pro", "Tablet", 4500), (10, "Pro2", "Tablet", 6500)). toDF("id", "product", "category", "revenue") + val window = Window. + partitionBy($"category"). + orderBy($"revenue".desc). + rangeBetween(-2000L, 1000L) checkAnswer( df.select( $"id", - avg($"revenue").over(Window. - partitionBy($"category"). - orderBy($"revenue".desc). - rangeBetween(-2000L, 1000L)). - cast("int")), - Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) :: - Row(4, 5833) :: Row(5, 5833) :: Row(6, 2000) :: - Row(7, 3000) :: Row(8, 3000) :: Row(9, 4166) :: - Row(10, 5500) :: Nil) + avg($"revenue").over(window).cast("int")), + Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) :: + Row(4, 5833) :: Row(5, 5833) :: Row(6, 2833) :: + Row(7, 3000) :: Row(8, 3000) :: Row(9, 5500) :: + Row(10, 6000) :: Nil) } - // This is here to illustrate the fact that reverse order also reverses bounds. + // This is here to illustrate the fact that reverse order also reverses offsets. test("reverse unbounded range frame") { val df = Seq(1, 2, 4, 3, 2, 1). map(Tuple1.apply). @@ -72,8 +72,8 @@ class WindowSuite extends QueryTest { $"value", sum($"value").over(window.rangeBetween(Long.MinValue, 1)), sum($"value").over(window.rangeBetween(1, Long.MaxValue))), - Row(1, 11, 6) :: Row(2, 7, 9) :: Row(4, null, 13) :: - Row(3, 4, 13) :: Row(2, 7, 9) :: Row(1, 11, 6) :: Nil) + Row(1, 13, null) :: Row(2, 13, 2) :: Row(4, 7, 9) :: + Row(3, 11, 6) :: Row(2, 13, 2) :: Row(1, 13, null) :: Nil) } } From 3bfdc491a3c1cad3cf07585388cc448b04bacd98 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Sat, 18 Jul 2015 19:58:52 -0400 Subject: [PATCH 9/9] Fixed Perfomance Regression for Shrinking Window Frames (+Rebase) --- .../apache/spark/sql/execution/Window.scala | 77 +++++++++++-------- 1 file changed, 44 insertions(+), 33 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index 4fd94a46b8df..a054f52b8b48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -402,7 +402,7 @@ private[execution] abstract class WindowFunctionFrame( * * @param rows to calculate the frame results for. */ - def prepare(rows: Seq[InternalRow]): Unit + def prepare(rows: CompactBuffer[InternalRow]): Unit /** * Write the result for the current row to the given target row. @@ -431,7 +431,7 @@ private[execution] abstract class WindowFunctionFrame( prepared } - /** Evaluate a prepared buffer. */ + /** Evaluate a prepared buffer (iterator). */ protected final def evaluatePrepared(iterator: java.util.Iterator[Array[AnyRef]]): Unit = { reset() while (iterator.hasNext) { @@ -445,6 +445,23 @@ private[execution] abstract class WindowFunctionFrame( evaluate() } + /** Evaluate a prepared buffer (array). */ + protected final def evaluatePrepared(prepared: Array[Array[AnyRef]], + fromIndex: Int, toIndex: Int): Unit = { + var i = 0 + while (i < numColumns) { + val function = functions(i) + function.reset() + var j = fromIndex + while (j < toIndex) { + function.update(prepared(j)(i)) + j += 1 + } + function.evaluate() + i += 1 + } + } + /** Update an array of window functions. */ protected final def update(input: InternalRow): Unit = { var i = 0 @@ -491,7 +508,7 @@ private[execution] final class SlidingWindowFunctionFrame( ubound: BoundOrdering) extends WindowFunctionFrame(ordinal, functions) { /** Rows of the partition currently being processed. */ - private[this] var input: Seq[InternalRow] = null + private[this] var input: CompactBuffer[InternalRow] = null /** Index of the first input row with a value greater than the upper bound of the current * output row. */ @@ -508,7 +525,7 @@ private[execution] final class SlidingWindowFunctionFrame( private[this] var outputIndex = 0 /** Prepare the frame for calculating a new partition. Reset all variables. */ - override def prepare(rows: Seq[InternalRow]): Unit = { + override def prepare(rows: CompactBuffer[InternalRow]): Unit = { input = rows inputHighIndex = 0 inputLowIndex = 0 @@ -518,7 +535,7 @@ private[execution] final class SlidingWindowFunctionFrame( /** Write the frame columns for the current row to the given target row. */ override def write(target: GenericMutableRow): Unit = { - var bufferUpdated = false + var bufferUpdated = outputIndex == 0 // Add all rows to the buffer for which the input row value is equal to or less than // the output row upper bound. @@ -538,15 +555,12 @@ private[execution] final class SlidingWindowFunctionFrame( bufferUpdated = true } - // Only recalculate when the buffer changes. + // Only recalculate and update when the buffer changes. if (bufferUpdated) { evaluatePrepared(buffer.iterator()) + fill(target, outputIndex) } - // Write to the target. - // TODO technically this is only needed when something changes... - fill(target, outputIndex) - // Move to the next row. outputIndex += 1 } @@ -575,7 +589,7 @@ private[execution] final class UnboundedWindowFunctionFrame( private[this] var outputIndex = 0 /** Prepare the frame for calculating a new partition. Process all rows eagerly. */ - override def prepare(rows: Seq[InternalRow]): Unit = { + override def prepare(rows: CompactBuffer[InternalRow]): Unit = { reset() outputIndex = 0 val iterator = rows.iterator @@ -616,7 +630,7 @@ private[execution] final class UnboundedPrecedingWindowFunctionFrame( ubound: BoundOrdering) extends WindowFunctionFrame(ordinal, functions) { /** Rows of the partition currently being processed. */ - private[this] var input: Seq[InternalRow] = null + private[this] var input: CompactBuffer[InternalRow] = null /** Index of the first input row with a value greater than the upper bound of the current * output row. */ @@ -626,7 +640,7 @@ private[execution] final class UnboundedPrecedingWindowFunctionFrame( private[this] var outputIndex = 0 /** Prepare the frame for calculating a new partition. */ - override def prepare(rows: Seq[InternalRow]): Unit = { + override def prepare(rows: CompactBuffer[InternalRow]): Unit = { reset() input = rows inputIndex = 0 @@ -635,7 +649,7 @@ private[execution] final class UnboundedPrecedingWindowFunctionFrame( /** Write the frame columns for the current row to the given target row. */ override def write(target: GenericMutableRow): Unit = { - var bufferUpdated = false + var bufferUpdated = outputIndex == 0 // Add all rows to the aggregates for which the input row value is equal to or less than // the output row upper bound. @@ -645,14 +659,12 @@ private[execution] final class UnboundedPrecedingWindowFunctionFrame( bufferUpdated = true } - // Evaluate if we need to. + // Only recalculate and update when the buffer changes. if (bufferUpdated) { evaluate() + fill(target, outputIndex) } - // Write to the target. - fill(target, outputIndex) - // Move to the next row. outputIndex += 1 } @@ -684,10 +696,10 @@ private[execution] final class UnboundedFollowingWindowFunctionFrame( lbound: BoundOrdering) extends WindowFunctionFrame(ordinal, functions) { /** Buffer used for storing prepared input for the window functions. */ - private[this] val buffer = new util.ArrayDeque[Array[AnyRef]] + private[this] var buffer: Array[Array[AnyRef]] = _ /** Rows of the partition currently being processed. */ - private[this] var input: Seq[InternalRow] = null + private[this] var input: CompactBuffer[InternalRow] = null /** Index of the first input row with a value equal to or greater than the lower bound of the * current output row. */ @@ -697,38 +709,37 @@ private[execution] final class UnboundedFollowingWindowFunctionFrame( private[this] var outputIndex = 0 /** Prepare the frame for calculating a new partition. */ - override def prepare(rows: Seq[InternalRow]): Unit = { + override def prepare(rows: CompactBuffer[InternalRow]): Unit = { input = rows inputIndex = 0 outputIndex = 0 - buffer.clear() - val iterator = rows.iterator - while (iterator.hasNext) { - buffer.offer(prepare(iterator.next())) + val size = input.size + buffer = Array.ofDim(size) + var i = 0 + while (i < size) { + buffer(i) = prepare(input(i)) + i += 1 } - evaluatePrepared(buffer.iterator()) + evaluatePrepared(buffer, 0, buffer.length) } /** Write the frame columns for the current row to the given target row. */ override def write(target: GenericMutableRow): Unit = { - var bufferUpdated = false + var bufferUpdated = outputIndex == 0 // Drop all rows from the buffer for which the input row value is smaller than // the output row lower bound. while (inputIndex < input.size && lbound.compare(input, inputIndex, outputIndex) < 0) { - buffer.pop() inputIndex += 1 bufferUpdated = true } - // Only recalculate when the buffer changes. + // Only recalculate and update when the buffer changes. if (bufferUpdated) { - evaluatePrepared(buffer.iterator()) + evaluatePrepared(buffer, inputIndex, buffer.length) + fill(target, outputIndex) } - // Write to the target. - fill(target, outputIndex) - // Move to the next row. outputIndex += 1 }