[SPARK-21414] Refine SlidingWindowFunctionFrame to avoid OOM.
## What changes were proposed in this pull request?

Currently, `SlidingWindowFunctionFrame` first adds to the buffer every row whose input value is less than or equal to the output row's upper bound, and then drops from the buffer every row whose input value is smaller than the output row's lower bound.
As a result, the buffer can grow very large even when the window itself is small.
For example:
```
select a, b, sum(a)
over (partition by b order by a range between 1000000 following and 1000001 following)
from table
```
We can refine the logic so that only rows that actually fall inside the frame are added to the buffer.
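
The effect of swapping the two steps can be illustrated with a small standalone model. This is only a sketch under simplifying assumptions: rows are reduced to the `Long` value of the ordering column, the frame is `RANGE BETWEEN lo FOLLOWING AND hi FOLLOWING`, and the names `SlidingFrameSketch`, `peakOld` and `peakNew` are hypothetical and do not exist in Spark. It ignores `InternalRow`, the index bookkeeping and the aggregate processor, and only tracks the peak buffer size.

```scala
import scala.collection.mutable

// Hypothetical model, not Spark code: each row is just its ordering value.
object SlidingFrameSketch {

  /** Old order: buffer everything up to the upper bound, then drop below the lower bound. */
  def peakOld(values: IndexedSeq[Long], lo: Long, hi: Long): Int = {
    val buffer = mutable.Queue.empty[Long]
    var next = 0
    var peak = 0
    for (cur <- values) {
      while (next < values.length && values(next) <= cur + hi) {
        buffer.enqueue(values(next))
        next += 1
      }
      // The buffer is largest here, before rows below the lower bound are dropped.
      peak = math.max(peak, buffer.size)
      while (buffer.nonEmpty && buffer.head < cur + lo) buffer.dequeue()
    }
    peak
  }

  /** Refined order: drop first, then buffer only values that are also above the lower bound. */
  def peakNew(values: IndexedSeq[Long], lo: Long, hi: Long): Int = {
    val buffer = mutable.Queue.empty[Long]
    var next = 0
    var peak = 0
    for (cur <- values) {
      while (buffer.nonEmpty && buffer.head < cur + lo) buffer.dequeue()
      while (next < values.length && values(next) <= cur + hi) {
        // Values below the lower bound are consumed but never copied into the buffer.
        if (values(next) >= cur + lo) buffer.enqueue(values(next))
        next += 1
      }
      peak = math.max(peak, buffer.size)
    }
    peak
  }
}
```

In this model, for the values 1 to 1000 and the frame from the example above (`lo = 1000000`, `hi = 1000001`), `peakOld` reports 1000 buffered values for the first output row, while `peakNew` never buffers anything. With `lo = 100` and `hi = 200` the peaks are 201 and 101 respectively.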

## How was this patch tested?
Manual test:
Run the SQL
`select shop, shopInfo, district, sum(revenue) over(partition by district order by revenue range between 100 following and 200 following) from revenueList limit 10`
against a table with 4 columns (shop: String, shopInfo: String, district: String, revenue: Int). The largest partition is around 2 GB and contains 200k rows.
Configure the executor with 2 GB of memory.
With the change in this PR, the query works fine. Without this change, the exception below is thrown.
```
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:504)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:62)
	at org.apache.spark.sql.execution.window.SlidingWindowFunctionFrame.write(WindowFunctionFrame.scala:201)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:365)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:289)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
```

Author: jinxing <jinxing6042@126.com>

Closes #18634 from jinxing64/SPARK-21414.
jinxing authored and cloud-fan committed Jul 19, 2017
1 parent 46307b2 commit 4eb081c
Showing 2 changed files with 53 additions and 9 deletions.
@@ -195,15 +195,6 @@ private[window] final class SlidingWindowFunctionFrame(
   override def write(index: Int, current: InternalRow): Unit = {
     var bufferUpdated = index == 0

-    // Add all rows to the buffer for which the input row value is equal to or less than
-    // the output row upper bound.
-    while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
-      buffer.add(nextRow.copy())
-      nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
-      inputHighIndex += 1
-      bufferUpdated = true
-    }
-
     // Drop all rows from the buffer for which the input row value is smaller than
     // the output row lower bound.
     while (!buffer.isEmpty && lbound.compare(buffer.peek(), inputLowIndex, current, index) < 0) {
@@ -212,6 +203,19 @@ private[window] final class SlidingWindowFunctionFrame(
       bufferUpdated = true
     }

+    // Add all rows to the buffer for which the input row value is equal to or less than
+    // the output row upper bound.
+    while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
+      if (lbound.compare(nextRow, inputLowIndex, current, index) < 0) {
+        inputLowIndex += 1
+      } else {
+        buffer.add(nextRow.copy())
+        bufferUpdated = true
+      }
+      nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
+      inputHighIndex += 1
+    }
+
     // Only recalculate and update when the buffer changes.
     if (bufferUpdated) {
       processor.initialize(input.length)
@@ -356,6 +356,46 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext {
     spark.catalog.dropTempView("nums")
   }

+  test("window function: mutiple window expressions specified by range in a single expression") {
+    val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y")
+    nums.createOrReplaceTempView("nums")
+    withTempView("nums") {
+      val expected =
+        Row(1, 1, 1, 4, null, 8, 25) ::
+        Row(1, 3, 4, 9, 1, 12, 24) ::
+        Row(1, 5, 9, 15, 4, 16, 21) ::
+        Row(1, 7, 16, 21, 8, 9, 16) ::
+        Row(1, 9, 25, 16, 12, null, 9) ::
+        Row(0, 2, 2, 6, null, 10, 30) ::
+        Row(0, 4, 6, 12, 2, 14, 28) ::
+        Row(0, 6, 12, 18, 6, 18, 24) ::
+        Row(0, 8, 20, 24, 10, 10, 18) ::
+        Row(0, 10, 30, 18, 14, null, 10) ::
+        Nil
+
+      val actual = sql(
+        """
+          |SELECT
+          | y,
+          | x,
+          | sum(x) over w1 as history_sum,
+          | sum(x) over w2 as period_sum1,
+          | sum(x) over w3 as period_sum2,
+          | sum(x) over w4 as period_sum3,
+          | sum(x) over w5 as future_sum
+          |FROM nums
+          |WINDOW
+          | w1 AS (PARTITION BY y ORDER BY x RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
+          | w2 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING),
+          | w3 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 4 PRECEDING AND 2 PRECEDING ),
+          | w4 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 2 FOLLOWING AND 4 FOLLOWING),
+          | w5 AS (PARTITION BY y ORDER BY x RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
+        """.stripMargin
+      )
+      checkAnswer(actual, expected)
+    }
+  }
+
   test("SPARK-7595: Window will cause resolve failed with self join") {
     checkAnswer(sql(
       """
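
The expected values in the new test can be checked by hand. The sketch below recomputes two of the range-frame columns for the `y = 1` partition (`x` in 1, 3, 5, 7, 9) in plain Scala, without Spark. The object `RangeFrameCheck` and the helper `rangeSum` are hypothetical names introduced only for this illustration, and `None` stands in for the SQL `null` that `sum` returns over an empty frame.

```scala
// Hypothetical helper, not part of the Spark test suite.
object RangeFrameCheck {

  /** Sum of the partition values inside [cur + lo, cur + hi]; None models SQL NULL. */
  def rangeSum(partition: Seq[Int], cur: Int, lo: Int, hi: Int): Option[Int] = {
    val frame = partition.filter(v => v >= cur + lo && v <= cur + hi)
    if (frame.isEmpty) None else Some(frame.sum)
  }

  // Partition y = 1 of the nums view, ordered by x.
  val odd: Seq[Int] = Seq(1, 3, 5, 7, 9)

  // w3: RANGE BETWEEN 4 PRECEDING AND 2 PRECEDING (column period_sum2)
  val periodSum2: Seq[Option[Int]] = odd.map(x => rangeSum(odd, x, -4, -2))

  // w4: RANGE BETWEEN 2 FOLLOWING AND 4 FOLLOWING (column period_sum3)
  val periodSum3: Seq[Option[Int]] = odd.map(x => rangeSum(odd, x, 2, 4))
}
```

Here `periodSum2` comes out as `None, 1, 4, 8, 12` and `periodSum3` as `8, 12, 16, 9, None`, matching the fifth and sixth fields of the first five expected rows. The `w4` frame lies entirely ahead of the current row, which is the shape of frame the refined buffering logic targets.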
