Commit

SHARK-92: Queries with limits will launch one task at a time.
rxin committed Jun 1, 2013
1 parent a07c000 commit 89b690d
Showing 1 changed file with 36 additions and 12 deletions.
src/main/scala/shark/execution/FileSinkOperator.scala: 36 additions & 12 deletions
@@ -17,8 +17,6 @@
 
 package shark.execution
 
-import scala.collection.Map
-import scala.collection.mutable.ArrayBuffer
 import scala.reflect.BeanProperty
 
 import org.apache.hadoop.fs.FileSystem
@@ -69,7 +67,11 @@ class FileSinkOperator extends TerminalOperator with Serializable {
   }
 
   override def processPartition(split: Int, iter: Iterator[_]): Iterator[_] = {
+
+    var numRows = 0
+
     iter.foreach { row =>
+      numRows += 1
       localHiveOp.processOp(row, 0)
     }
 
@@ -103,37 +105,59 @@ class FileSinkOperator extends TerminalOperator with Serializable {
           createFilesMethod.invoke(localHiveOp, fileSystemPaths)
           finalPath = finalPaths(idx)
         }
-        if (!fileSystem.exists(finalPath.getParent())) fileSystem.mkdirs(finalPath.getParent())
+        if (!fileSystem.exists(finalPath.getParent)) {
+          fileSystem.mkdirs(finalPath.getParent)
+        }
       }
     }
 
     localHiveOp.closeOp(false)
-    iter
+    Iterator(numRows)
   }
 
   override def execute(): RDD[_] = {
     val inputRdd = if (parentOperators.size == 1) executeParents().head._2 else null
-    val rddPreprocessed = preprocessRdd(inputRdd)
-    rddPreprocessed.context.runJob(
-      rddPreprocessed, FileSinkOperator.executeProcessFileSinkPartition(this))
+    val rdd = preprocessRdd(inputRdd)
+
+    parentOperators.head match {
+      case op: LimitOperator =>
+        // If there is a limit operator, let's only run one partition at a time to avoid
+        // launching too many tasks.
+        val limit = op.limit
+        var totalRows = 0
+        var nextPartition = 0
+        while (totalRows < limit) {
+          // Run one partition and get back the number of rows processed there.
+          totalRows += rdd.context.runJob(
+            rdd,
+            FileSinkOperator.executeProcessFileSinkPartition(this),
+            Seq(nextPartition),
+            allowLocal = false).sum
+          nextPartition += 1
+        }
+
+      case _ =>
+        val rows = rdd.context.runJob(rdd, FileSinkOperator.executeProcessFileSinkPartition(this))
+        logInfo("Total number of rows written: " + rows.sum)
+    }
+
     hiveOp.jobClose(localHconf, true, new JobCloseFeedBack)
-    rddPreprocessed
+    rdd
   }
 }
 
 
 object FileSinkOperator {
   def executeProcessFileSinkPartition(operator: FileSinkOperator) = {
     val op = OperatorSerializationWrapper(operator)
-    def writeFiles(context: TaskContext, iter: Iterator[_]): Boolean = {
+    def writeFiles(context: TaskContext, iter: Iterator[_]): Int = {
       op.logDebug("Started executing mapPartitions for operator: " + op)
       op.logDebug("Input object inspectors: " + op.objectInspectors)
 
       op.initializeOnSlave(context)
-      val newPart = op.processPartition(-1, iter)
+      val numRows = op.processPartition(-1, iter).next().asInstanceOf[Int]
       op.logDebug("Finished executing mapPartitions for operator: " + op)
 
-      true
+      numRows
     }
     writeFiles _
   }
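The pattern this commit relies on is that SparkContext.runJob accepts an explicit list of partition indices, so a LIMIT query can submit one single-partition job at a time and stop as soon as enough rows have been written, instead of launching a task for every partition up front. Below is a minimal, self-contained sketch of that loop against a plain RDD, not the commit's own code, and it assumes a recent Spark API: the allowLocal flag the commit passes belonged to the 2013-era runJob signature and was later removed, the object name IncrementalLimitSketch is made up for illustration, the per-partition function simply counts elements in place of FileSinkOperator's write-and-return-count, and a bounds check on the partition index is added that the commit's loop does not have.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone sketch of the "one task at a time for LIMIT" idea;
// not part of the Shark code base.
object IncrementalLimitSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("limit-sketch").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 1000, numSlices = 8)

    val limit = 100        // rows requested by a hypothetical LIMIT clause
    var totalRows = 0
    var nextPartition = 0

    // Keep launching single-partition jobs until enough rows have been produced
    // or every partition has been consumed.
    while (totalRows < limit && nextPartition < rdd.partitions.length) {
      // Run the job on a single partition index; the function stands in for
      // "write this partition's rows and return how many were written".
      val counts: Array[Int] =
        sc.runJob(rdd, (iter: Iterator[Int]) => iter.size, Seq(nextPartition))
      totalRows += counts.sum
      nextPartition += 1
    }

    println(s"Processed $totalRows rows using $nextPartition task(s) of ${rdd.partitions.length}")
    sc.stop()
  }
}

The trade-off is sequential job launches: each additional partition costs a scheduling round trip, but no task is started whose output the limit would simply discard.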
