Skip to content

Commit

Permalink
Rename reverse to reversePartitions
Browse files Browse the repository at this point in the history
  • Loading branch information
HyukjinKwon committed Dec 24, 2019
1 parent 6d40611 commit 8ad431d
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -419,23 +419,23 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
*
* This is modeled after `RDD.take` but never runs any job locally on the driver.
*/
def executeTake(n: Int): Array[InternalRow] = executeTake(n, reverse = false)
def executeTake(n: Int): Array[InternalRow] = executeTake(n, reversePartitions = false)

/**
* Runs this query returning the last `n` rows as an array.
*
* This is modeled after `RDD.take` but never runs any job locally on the driver.
*/
def executeTail(n: Int): Array[InternalRow] = executeTake(n, reverse = true)
def executeTail(n: Int): Array[InternalRow] = executeTake(n, reversePartitions = true)

private def executeTake(n: Int, reverse: Boolean): Array[InternalRow] = {
private def executeTake(n: Int, reversePartitions: Boolean): Array[InternalRow] = {
if (n == 0) {
return new Array[InternalRow](0)
}

val childRDD = getByteArrayRdd(n, reverse)
val childRDD = getByteArrayRdd(n, reversePartitions)

val buf = if (reverse) new ListBuffer[InternalRow] else new ArrayBuffer[InternalRow]
val buf = if (reversePartitions) new ListBuffer[InternalRow] else new ArrayBuffer[InternalRow]
val totalParts = childRDD.partitions.length
var partsScanned = 0
while (buf.length < n && partsScanned < totalParts) {
Expand All @@ -458,7 +458,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
}

val parts = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
val partsToScan = if (reverse) {
val partsToScan = if (reversePartitions) {
// Reverse partitions to scan. So, if parts was [1, 2, 3] in 200 partitions (0 to 199),
// it becomes [198, 197, 196].
parts.map(p => (totalParts - 1) - p)
Expand All @@ -471,7 +471,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ

var i = 0

if (reverse) {
if (reversePartitions) {
while (buf.length < n && i < res.length) {
val rows = decodeUnsafeRows(res(i)._2)
if (n - buf.length >= res(i)._1) {
Expand Down

0 comments on commit 8ad431d

Please sign in to comment.