diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
index af2b4fb92062b..156002ef58fbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
@@ -195,15 +195,6 @@ private[window] final class SlidingWindowFunctionFrame(
   override def write(index: Int, current: InternalRow): Unit = {
     var bufferUpdated = index == 0
 
-    // Add all rows to the buffer for which the input row value is equal to or less than
-    // the output row upper bound.
-    while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
-      buffer.add(nextRow.copy())
-      nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
-      inputHighIndex += 1
-      bufferUpdated = true
-    }
-
     // Drop all rows from the buffer for which the input row value is smaller than
     // the output row lower bound.
     while (!buffer.isEmpty && lbound.compare(buffer.peek(), inputLowIndex, current, index) < 0) {
@@ -212,6 +203,19 @@
       bufferUpdated = true
     }
 
+    // Add all rows to the buffer for which the input row value is equal to or less than
+    // the output row upper bound.
+    while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
+      if (lbound.compare(nextRow, inputLowIndex, current, index) < 0) {
+        inputLowIndex += 1
+      } else {
+        buffer.add(nextRow.copy())
+        bufferUpdated = true
+      }
+      nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
+      inputHighIndex += 1
+    }
+
     // Only recalculate and update when the buffer changes.
     if (bufferUpdated) {
       processor.initialize(input.length)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index 52e4f047225de..a9f3fb355c775 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -356,6 +356,46 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext {
     spark.catalog.dropTempView("nums")
   }
 
+  test("window function: multiple window expressions specified by range in a single expression") {
+    val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y")
+    nums.createOrReplaceTempView("nums")
+    withTempView("nums") {
+      val expected =
+        Row(1, 1, 1, 4, null, 8, 25) ::
+        Row(1, 3, 4, 9, 1, 12, 24) ::
+        Row(1, 5, 9, 15, 4, 16, 21) ::
+        Row(1, 7, 16, 21, 8, 9, 16) ::
+        Row(1, 9, 25, 16, 12, null, 9) ::
+        Row(0, 2, 2, 6, null, 10, 30) ::
+        Row(0, 4, 6, 12, 2, 14, 28) ::
+        Row(0, 6, 12, 18, 6, 18, 24) ::
+        Row(0, 8, 20, 24, 10, 10, 18) ::
+        Row(0, 10, 30, 18, 14, null, 10) ::
+        Nil
+
+      val actual = sql(
+        """
+          |SELECT
+          |  y,
+          |  x,
+          |  sum(x) over w1 as history_sum,
+          |  sum(x) over w2 as period_sum1,
+          |  sum(x) over w3 as period_sum2,
+          |  sum(x) over w4 as period_sum3,
+          |  sum(x) over w5 as future_sum
+          |FROM nums
+          |WINDOW
+          |  w1 AS (PARTITION BY y ORDER BY x RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
+          |  w2 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING),
+          |  w3 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 4 PRECEDING AND 2 PRECEDING),
+          |  w4 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 2 FOLLOWING AND 4 FOLLOWING),
+          |  w5 AS (PARTITION BY y ORDER BY x RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
+        """.stripMargin
+      )
+      checkAnswer(actual, expected)
+    }
+  }
+
   test("SPARK-7595: Window will cause resolve failed with self join") {
     checkAnswer(sql(
       """
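Note on the change: the patched write() now evicts rows below the output row's lower bound before admitting new input rows, and any input row that already falls below that lower bound is skipped outright (only the index counters advance) instead of being copied into the buffer. The sketch below is an illustrative, self-contained model of this buffer maintenance in plain Scala, with no Spark dependencies; the names slidingSums, low, and high are hypothetical, and the integer offsets stand in for the RANGE frame bounds that Spark actually evaluates through BoundOrdering over InternalRow.

import scala.collection.mutable

// Simplified model of the fixed SlidingWindowFunctionFrame buffer management
// for a RANGE frame over an already-sorted column of Ints. `low` and `high`
// are hypothetical frame offsets relative to the current row's value, so
// low = 2, high = 4 models RANGE BETWEEN 2 FOLLOWING AND 4 FOLLOWING.
def slidingSums(values: Seq[Int], low: Int, high: Int): Seq[Option[Long]] = {
  val buffer = mutable.Queue.empty[Int]
  val it = values.iterator
  var nextRow: Option[Int] = if (it.hasNext) Some(it.next()) else None
  values.map { current =>
    // Drop buffered rows that fell below the lower bound; this mirrors the
    // first while loop in the patched write().
    while (buffer.nonEmpty && buffer.head < current + low) buffer.dequeue()
    // Consume input rows up to the upper bound; rows already below the lower
    // bound are skipped rather than buffered, mirroring the new if/else
    // inside the second while loop.
    while (nextRow.exists(_ <= current + high)) {
      val row = nextRow.get
      if (row >= current + low) buffer.enqueue(row)
      nextRow = if (it.hasNext) Some(it.next()) else None
    }
    // Spark returns null for a sum over an empty frame; model that as None.
    if (buffer.isEmpty) None else Some(buffer.iterator.map(_.toLong).sum)
  }
}

Running the model on the odd partition of the test data reproduces the w4 column (period_sum3) of the expected rows above: slidingSums(Seq(1, 3, 5, 7, 9), 2, 4) yields Seq(Some(8), Some(12), Some(16), Some(9), None), matching 8, 12, 16, 9, null.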