Skip to content

Commit

Permalink
review comments #2
Browse files Browse the repository at this point in the history
  • Loading branch information
tejasapatil committed Mar 15, 2017
1 parent e714a08 commit 23acc3f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -718,18 +718,21 @@ object SQLConf {

val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
buildConf("spark.sql.windowExec.buffer.spill.threshold")
.internal()
.doc("Threshold for number of rows buffered in window operator")
.intConf
.createWithDefault(4096)

val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
.internal()
.doc("Threshold for number of rows buffered in sort merge join operator")
.intConf
.createWithDefault(Int.MaxValue)

val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD =
buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold")
.internal()
.doc("Threshold for number of rows buffered in cartesian product operator")
.intConf
.createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ private[window] abstract class WindowFunctionFrame {
def write(index: Int, current: InternalRow): Unit
}

object WindowFunctionFrame {
def getNextOrNull(iterator: Iterator[UnsafeRow]): UnsafeRow = {
if (iterator.hasNext) iterator.next() else null
}
}

/**
* The offset window frame calculates frames containing LEAD/LAG statements.
*
Expand Down Expand Up @@ -123,7 +129,7 @@ private[window] final class OffsetWindowFunctionFrame(

override def write(index: Int, current: InternalRow): Unit = {
if (inputIndex >= 0 && inputIndex < input.length) {
val r = if (inputIterator.hasNext) inputIterator.next() else null
val r = WindowFunctionFrame.getNextOrNull(inputIterator)
projection(r)
} else {
// Use default values since the offset row does not exist.
Expand Down Expand Up @@ -179,9 +185,7 @@ private[window] final class SlidingWindowFunctionFrame(
override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
input = rows
inputIterator = input.generateIterator()
if (inputIterator.hasNext) {
nextRow = inputIterator.next()
}
nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
inputHighIndex = 0
inputLowIndex = 0
buffer.clear()
Expand All @@ -195,7 +199,7 @@ private[window] final class SlidingWindowFunctionFrame(
// the output row upper bound.
while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
buffer.add(nextRow.copy())
nextRow = if (inputIterator.hasNext) inputIterator.next() else null
nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
inputHighIndex += 1
bufferUpdated = true
}
Expand Down Expand Up @@ -311,7 +315,7 @@ private[window] final class UnboundedPrecedingWindowFunctionFrame(
// the output row upper bound.
while (nextRow != null && ubound.compare(nextRow, inputIndex, current, index) <= 0) {
processor.update(nextRow)
nextRow = if (inputIterator.hasNext) inputIterator.next() else null
nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
inputIndex += 1
bufferUpdated = true
}
Expand Down Expand Up @@ -368,23 +372,21 @@ private[window] final class UnboundedFollowingWindowFunctionFrame(
// the output row lower bound.
val iterator = input.generateIterator(startIndex = inputIndex)

def getNextOrNull(iterator: Iterator[UnsafeRow]): UnsafeRow = {
if (iterator.hasNext) iterator.next() else null
}

var nextRow = getNextOrNull(iterator)
var nextRow = WindowFunctionFrame.getNextOrNull(iterator)
while (nextRow != null && lbound.compare(nextRow, inputIndex, current, index) < 0) {
inputIndex += 1
bufferUpdated = true
nextRow = getNextOrNull(iterator)
nextRow = WindowFunctionFrame.getNextOrNull(iterator)
}

// Only recalculate and update when the buffer changes.
if (bufferUpdated) {
processor.initialize(input.length)
while (nextRow != null) {
if (nextRow != null) {
processor.update(nextRow)
nextRow = getNextOrNull(iterator)
}
while (iterator.hasNext) {
processor.update(iterator.next())
}
processor.evaluate(target)
}
Expand Down

0 comments on commit 23acc3f

Please sign in to comment.