Commit 6d40611: Address comments

HyukjinKwon committed Dec 24, 2019
1 parent 409998c
Showing 4 changed files with 8 additions and 19 deletions.
@@ -811,14 +811,14 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode
 }
 
 /**
- * This is similar with [[Limit]] except,
+ * This is similar with [[Limit]] except:
  *
  * - It does not have plans for global/local separately because currently there is only single
  *   implementation which initially mimics both global/local tails. See
  *   `org.apache.spark.sql.execution.CollectTailExec` and
  *   `org.apache.spark.sql.execution.CollectLimitExec`
  *
- * - Currently, it is always supposed to be wrapped by [[ReturnAnswer]].
+ * - Currently, this plan can only be a root node.
  */
 case class Tail(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = child.output
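For orientation, `Tail` is the logical plan behind `Dataset.tail`, the public API added in Spark 3.0 alongside this change. A minimal usage sketch, assuming a local session; the object and app name are illustrative, not from this commit:

import org.apache.spark.sql.SparkSession

object TailDemo {
  def main(args: Array[String]): Unit = {
    // Local session purely for illustration.
    val spark = SparkSession.builder().master("local[*]").appName("tail-demo").getOrCreate()

    // Dataset.tail(n) plans Tail as the root node of the query (hence the
    // "root node only" note above); it is executed by CollectTailExec.
    val lastThree = spark.range(10).tail(3)
    println(lastThree.mkString(", ")) // 7, 8, 9

    spark.stop()
  }
}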
@@ -78,11 +78,7 @@ case class LocalTableScanExec(
   }
 
   override def executeTail(limit: Int): Array[InternalRow] = {
-    var taken: Seq[InternalRow] = Seq.empty[InternalRow]
-    if (limit > 0) {
-      val slidingIter = unsafeRows.sliding(limit)
-      while(slidingIter.hasNext) { taken = slidingIter.next() }
-    }
+    val taken: Seq[InternalRow] = unsafeRows.takeRight(limit)
     longMetric("numOutputRows").add(taken.size)
     taken.toArray
   }
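The rewrite leans on `Seq.takeRight`, which subsumes both the sliding-window loop and the `limit > 0` guard: it returns an empty sequence for a non-positive count and the whole sequence when the count exceeds the size. A standalone sketch of the equivalence in plain Scala (the helper names are hypothetical); the same reasoning applies to the two executeTail rewrites in the command nodes further down:

// Old shape: slide a window of size `limit` over the rows and keep the last
// window. sliding(0) throws, hence the explicit guard in the removed code.
def tailBySliding[T](xs: Seq[T], limit: Int): Seq[T] = {
  var taken: Seq[T] = Seq.empty[T]
  if (limit > 0) {
    val slidingIter = xs.sliding(limit)
    while (slidingIter.hasNext) { taken = slidingIter.next() }
  }
  taken
}

// New shape: takeRight already handles limit <= 0 and limit > xs.size.
def tailByTakeRight[T](xs: Seq[T], limit: Int): Seq[T] = xs.takeRight(limit)

// Spot-check the edge cases: negative, zero, partial, and oversized limits.
for (limit <- Seq(-1, 0, 2, 10)) {
  assert(tailBySliding(1 to 5, limit) == tailByTakeRight(1 to 5, limit))
}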
@@ -309,15 +309,16 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable
    * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
    * compressed.
    */
-  private def getByteArrayRdd(n: Int = -1, reverse: Boolean = false): RDD[(Long, Array[Byte])] = {
+  private def getByteArrayRdd(
+      n: Int = -1, reversePartitions: Boolean = false): RDD[(Long, Array[Byte])] = {
     execute().mapPartitionsInternal { iter =>
       var count = 0
       val buffer = new Array[Byte](4 << 10) // 4K
       val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
       val bos = new ByteArrayOutputStream()
       val out = new DataOutputStream(codec.compressedOutputStream(bos))
 
-      if (reverse) {
+      if (reversePartitions) {
         // To collect n from the last, we should anyway read everything with keeping the n.
         // Otherwise, we don't know where is the last from the iterator.
         var last: Seq[UnsafeRow] = Seq.empty[UnsafeRow]
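As the in-code comment notes, taking the last n rows from an iterator forces a full scan while retaining at most n elements, because a one-pass iterator gives no random access to its tail (the committed code does this with a sliding iterator over each partition). A minimal standalone sketch of the same idea using a bounded queue; `lastN` is a hypothetical helper, not Spark API:

import scala.collection.mutable

// Consume a one-pass iterator, keeping only the last `n` elements seen.
def lastN[T](iter: Iterator[T], n: Int): Seq[T] =
  if (n <= 0) Seq.empty[T]
  else {
    val buffer = mutable.Queue.empty[T]
    for (elem <- iter) {
      if (buffer.size == n) buffer.dequeue() // evict the oldest element
      buffer.enqueue(elem)
    }
    buffer.toSeq
  }

println(lastN(Iterator.range(0, 100), 3)) // 97, 98, 99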
@@ -83,11 +83,7 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {
   override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
 
   override def executeTail(limit: Int): Array[InternalRow] = {
-    if (limit <= 0) return Array.empty[InternalRow]
-    val slidingIter = sideEffectResult.sliding(limit)
-    var taken: Seq[InternalRow] = Seq.empty[InternalRow]
-    while(slidingIter.hasNext) { taken = slidingIter.next() }
-    taken.toArray
+    sideEffectResult.takeRight(limit).toArray
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -128,11 +124,7 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
   override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
 
   override def executeTail(limit: Int): Array[InternalRow] = {
-    if (limit <= 0) return Array.empty[InternalRow]
-    val slidingIter = sideEffectResult.sliding(limit)
-    var taken: Seq[InternalRow] = Seq.empty[InternalRow]
-    while(slidingIter.hasNext) { taken = slidingIter.next() }
-    taken.toArray
+    sideEffectResult.takeRight(limit).toArray
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
