[SPARK-28213][SQL][FOLLOWUP] code cleanup and bug fix for columnar execution framework

## What changes were proposed in this pull request?

I did a post-hoc review of #25008, and would like to propose some cleanups/fixes/improvements:

1. Do not track the scanTime metric in `ColumnarToRowExec`. This metric is specific to file scans, and doesn't make sense for a general batch-to-row operator.
2. Because of 1, we need to track scanTime when building RDDs in the file scan node.
3. Use `RDD#mapPartitionsInternal` instead of `flatMap` in several places, as `mapPartitionsInternal` was created for Spark SQL and we use it in almost all the SQL operators.
4. Add `limitNotReachedCond` in `ColumnarToRowExec`. This was in `ColumnarBatchScan` before and is critical for performance.
5. Clarify the relationship between codegen stages and columnar stages. The whole-stage-codegen framework is completely row-based, so these two kinds of stages can NEVER overlap. When they are adjacent, it's either a `RowToColumnarExec` above a `WholeStageCodegenExec`, or a `ColumnarToRowExec` above an `InputAdapter`.
6. Reuse the `ColumnarBatch` in `RowToColumnarExec`. We don't need to create a new one for every batch; we just reset it (see the sketch after this list).
7. Do not skip testing the full scan node in `LogicalPlanTagInSparkPlanSuite`.
8. Add back the removed tests in `WholeStageCodegenSuite`.
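
To make item 6 concrete, here is a minimal, self-contained sketch of the reuse pattern (it is not the patch code): the writable column vectors and the wrapping `ColumnarBatch` are allocated once, then cleared and refilled for each chunk of input instead of being re-allocated per batch. `OnHeapColumnVector.allocateColumns`, `ColumnarBatch.setNumRows` and `rowIterator` appear in the diff below; `WritableColumnVector.reset`/`putInt` and the toy `ReusedBatchSketch` object are assumptions made only for this illustration.

```scala
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// Sketch only: one ColumnarBatch is allocated up front and reused for every
// chunk of input, mirroring the idea behind item 6 (not RowToColumnarExec itself).
object ReusedBatchSketch {
  def main(args: Array[String]): Unit = {
    val schema = new StructType().add("id", IntegerType)
    val capacity = 4
    // Allocate the writable vectors and the batch once.
    val vectors = OnHeapColumnVector.allocateColumns(capacity, schema)
    val batch = new ColumnarBatch(vectors.map(v => v: ColumnVector))

    val input = (0 until 10).iterator
    while (input.hasNext) {
      // Reuse: clear the existing vectors instead of allocating new ones.
      vectors.foreach(_.reset())
      var n = 0
      while (n < capacity && input.hasNext) {
        vectors(0).putInt(n, input.next())
        n += 1
      }
      batch.setNumRows(n)
      // Consume the batch row by row, the way a batch-to-row operator would.
      val rows = batch.rowIterator()
      while (rows.hasNext) {
        println(rows.next().getInt(0))
      }
    }
    batch.close() // closes the underlying vectors as well
  }
}
```

`RowToColumnarExec` applies the same idea, with `RowToColumnConverter` filling the reused vectors from the incoming `InternalRow`s.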

## How was this patch tested?

Existing tests.
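
Beyond the existing suites, a quick way to eyeball the new plan shape and the limit check from item 4 is a spark-shell session like the hypothetical one below (the `/tmp/columnar_demo` path and column names are illustrative only, not part of the patch):

```scala
// Write a small Parquet table so the scan is columnar, then cap the query with
// a LIMIT so the generated limit-not-reached check comes into play.
spark.range(1000).selectExpr("id", "id % 2 AS flag").write.mode("overwrite").parquet("/tmp/columnar_demo")

val df = spark.read.parquet("/tmp/columnar_demo").filter("flag = 0").limit(5)
df.explain()  // the plan should show the file scan feeding a ColumnarToRow node
df.collect()
```

With whole-stage codegen enabled, the explain output should show the columnar file scan under a `ColumnarToRow` node inside the row-based codegen stage, which is the adjacency described in item 5.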

Closes #25264 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan committed Aug 6, 2019
1 parent 6fb79af commit 03e3006
Showing 9 changed files with 183 additions and 148 deletions.
116 changes: 53 additions & 63 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -57,40 +57,38 @@ class ColumnarRule {
* [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]] and
* [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
*/
case class ColumnarToRowExec(child: SparkPlan)
extends UnaryExecNode with CodegenSupport {
case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
assert(child.supportsColumnar)

override def output: Seq[Attribute] = child.output

override def outputPartitioning: Partitioning = child.outputPartitioning

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

// `ColumnarToRowExec` processes the input RDD directly, which is kind of a leaf node in the
// codegen stage and needs to do the limit check.
protected override def canCheckLimitNotReached: Boolean = true

override lazy val metrics: Map[String, SQLMetric] = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")
"numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches")
)

override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
val numInputBatches = longMetric("numInputBatches")
val scanTime = longMetric("scanTime")
// UnsafeProjection is not serializable so do it on the executor side, which is why it is lazy
@transient lazy val outputProject = UnsafeProjection.create(output, output)
val batches = child.executeColumnar()
batches.flatMap(batch => {
val batchStartNs = System.nanoTime()
numInputBatches += 1
// In order to match the numOutputRows metric in the generated code we update
// numOutputRows for each batch. This is less accurate than doing it at output
// because it will over count the number of rows output in the case of a limit,
// but it is more efficient.
numOutputRows += batch.numRows()
val ret = batch.rowIterator().asScala
scanTime += ((System.nanoTime() - batchStartNs) / (1000 * 1000))
ret.map(outputProject)
})
// This avoids calling `output` in the RDD closure, so that we don't need to include the entire
// plan (this) in the closure.
val localOutput = this.output
child.executeColumnar().mapPartitionsInternal { batches =>
val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
batches.flatMap { batch =>
numInputBatches += 1
numOutputRows += batch.numRows()
batch.rowIterator().asScala.map(toUnsafe)
}
}
}

/**
@@ -136,9 +134,6 @@ case class ColumnarToRowExec(child: SparkPlan)
// metrics
val numOutputRows = metricTerm(ctx, "numOutputRows")
val numInputBatches = metricTerm(ctx, "numInputBatches")
val scanTimeMetric = metricTerm(ctx, "scanTime")
val scanTimeTotalNs =
ctx.addMutableState(CodeGenerator.JAVA_LONG, "scanTime") // init as scanTime = 0

val columnarBatchClz = classOf[ColumnarBatch].getName
val batch = ctx.addMutableState(columnarBatchClz, "batch")
@@ -156,15 +151,13 @@ case class ColumnarToRowExec(child: SparkPlan)
val nextBatchFuncName = ctx.addNewFunction(nextBatch,
s"""
|private void $nextBatch() throws java.io.IOException {
| long getBatchStart = System.nanoTime();
| if ($input.hasNext()) {
| $batch = ($columnarBatchClz)$input.next();
| $numInputBatches.add(1);
| $numOutputRows.add($batch.numRows());
| $idx = 0;
| ${columnAssigns.mkString("", "\n", "\n")}
| ${numInputBatches}.add(1);
| }
| $scanTimeTotalNs += System.nanoTime() - getBatchStart;
|}""".stripMargin)

ctx.currentVars = null
@@ -184,7 +177,7 @@ case class ColumnarToRowExec(child: SparkPlan)
|if ($batch == null) {
| $nextBatchFuncName();
|}
|while ($batch != null) {
|while ($limitNotReachedCond $batch != null) {
| int $numRows = $batch.numRows();
| int $localEnd = $numRows - $idx;
| for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
@@ -196,13 +189,11 @@ case class ColumnarToRowExec(child: SparkPlan)
| $batch = null;
| $nextBatchFuncName();
|}
|$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
|$scanTimeTotalNs = 0;
""".stripMargin
}

override def inputRDDs(): Seq[RDD[InternalRow]] = {
child.asInstanceOf[CodegenSupport].inputRDDs()
Seq(child.executeColumnar().asInstanceOf[RDD[InternalRow]]) // Hack because of type erasure
}
}

@@ -439,47 +430,46 @@ case class RowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
// Instead of creating a new config we are reusing columnBatchSize. In the future if we do
// combine with some of the Arrow conversion tools we will need to unify some of the configs.
val numRows = conf.columnBatchSize
val converters = new RowToColumnConverter(schema)
val rowBased = child.execute()
rowBased.mapPartitions(rowIterator => {
new Iterator[ColumnarBatch] {
var cb: ColumnarBatch = null

TaskContext.get().addTaskCompletionListener[Unit] { _ =>
if (cb != null) {
cb.close()
cb = null
// This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
// plan (this) in the closure.
val localSchema = this.schema
child.execute().mapPartitionsInternal { rowIterator =>
if (rowIterator.hasNext) {
new Iterator[ColumnarBatch] {
private val converters = new RowToColumnConverter(localSchema)
private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
OffHeapColumnVector.allocateColumns(numRows, localSchema)
} else {
OnHeapColumnVector.allocateColumns(numRows, localSchema)
}
}

override def hasNext: Boolean = {
rowIterator.hasNext
}
private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)

override def next(): ColumnarBatch = {
if (cb != null) {
TaskContext.get().addTaskCompletionListener[Unit] { _ =>
cb.close()
cb = null
}
val columnVectors : Array[WritableColumnVector] =
if (enableOffHeapColumnVector) {
OffHeapColumnVector.allocateColumns(numRows, schema).toArray
} else {
OnHeapColumnVector.allocateColumns(numRows, schema).toArray

override def hasNext: Boolean = {
rowIterator.hasNext
}

override def next(): ColumnarBatch = {
cb.setNumRows(0)
var rowCount = 0
while (rowCount < numRows && rowIterator.hasNext) {
val row = rowIterator.next()
converters.convert(row, vectors.toArray)
rowCount += 1
}
var rowCount = 0
while (rowCount < numRows && rowIterator.hasNext) {
val row = rowIterator.next()
converters.convert(row, columnVectors)
rowCount += 1
cb.setNumRows(rowCount)
numInputRows += rowCount
numOutputBatches += 1
cb
}
cb = new ColumnarBatch(columnVectors.toArray, rowCount)
numInputRows += rowCount
numOutputBatches += 1
cb
}
} else {
Iterator.empty
}
})
}
}
}

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -334,37 +334,61 @@ case class FileSourceScanExec(
inputRDD :: Nil
}

override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
"metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
"metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time")
) ++ {
// Tracking scan time has overhead, we can't afford to do it for each row, and can only do
// it for each batch.
if (supportsColumnar) {
Some("scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
} else {
None
}
}

protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")

if (needsUnsafeRowConversion) {
inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
val proj = UnsafeProjection.create(schema)
proj.initialize(index)
iter.map( r => {
val toUnsafe = UnsafeProjection.create(schema)
toUnsafe.initialize(index)
iter.map { row =>
numOutputRows += 1
proj(r)
})
toUnsafe(row)
}
}
} else {
inputRDD.map { r =>
numOutputRows += 1
r
inputRDD.mapPartitionsInternal { iter =>
iter.map { row =>
numOutputRows += 1
row
}
}
}
}

protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = longMetric("numOutputRows")
inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { batch =>
numOutputRows += batch.numRows()
batch
val scanTime = longMetric("scanTime")
inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches =>
new Iterator[ColumnarBatch] {

override def hasNext: Boolean = {
// The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call.
val startNs = System.nanoTime()
val res = batches.hasNext
scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs)
res
}

override def next(): ColumnarBatch = {
val batch = batches.next()
numOutputRows += batch.numRows()
batch
}
}
}
}

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution

import java.io.Writer
import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger

@@ -491,12 +490,8 @@ trait InputRDDCodegen extends CodegenSupport {
*
* This is the leaf node of a tree with WholeStageCodegen that is used to generate code
* that consumes an RDD iterator of InternalRow.
*
* @param isChildColumnar true if the inputRDD is really columnar data hidden by type erasure,
* false if inputRDD is really an RDD[InternalRow]
*/
case class InputAdapter(child: SparkPlan, isChildColumnar: Boolean)
extends UnaryExecNode with InputRDDCodegen {
case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCodegen {

override def output: Seq[Attribute] = child.output

@@ -522,13 +517,10 @@ case class InputAdapter(child: SparkPlan, isChildColumnar: Boolean)
child.executeColumnar()
}

override def inputRDD: RDD[InternalRow] = {
if (isChildColumnar) {
child.executeColumnar().asInstanceOf[RDD[InternalRow]] // Hack because of type erasure
} else {
child.execute()
}
}
// `InputAdapter` can only generate code to process the rows from its child. If the child produces
// columnar batches, there must be a `ColumnarToRowExec` above `InputAdapter` to handle it by
// overriding `inputRDDs` and calling `InputAdapter#executeColumnar` directly.
override def inputRDD: RDD[InternalRow] = child.execute()

// This is a leaf node so the node can produce limit not reached checks.
override protected def canCheckLimitNotReached: Boolean = true
@@ -870,59 +862,45 @@ case class CollapseCodegenStages(
/**
* Inserts an InputAdapter on top of those that do not support codegen.
*/
private def insertInputAdapter(plan: SparkPlan, isColumnarInput: Boolean): SparkPlan = {
val isColumnar = adjustColumnar(plan, isColumnarInput)
private def insertInputAdapter(plan: SparkPlan): SparkPlan = {
plan match {
case p if !supportCodegen(p) =>
// collapse them recursively
InputAdapter(insertWholeStageCodegen(p, isColumnar), isColumnar)
InputAdapter(insertWholeStageCodegen(p))
case j: SortMergeJoinExec =>
// The children of SortMergeJoin should do codegen separately.
j.withNewChildren(j.children.map(
child => InputAdapter(insertWholeStageCodegen(child, isColumnar), isColumnar)))
case p =>
p.withNewChildren(p.children.map(insertInputAdapter(_, isColumnar)))
child => InputAdapter(insertWholeStageCodegen(child))))
case p => p.withNewChildren(p.children.map(insertInputAdapter))
}
}

/**
* Inserts a WholeStageCodegen on top of those that support codegen.
*/
private def insertWholeStageCodegen(plan: SparkPlan, isColumnarInput: Boolean): SparkPlan = {
val isColumnar = adjustColumnar(plan, isColumnarInput)
private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = {
plan match {
// For operators that will output domain object, do not insert WholeStageCodegen for it as
// domain object can not be written into unsafe row.
case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] =>
plan.withNewChildren(plan.children.map(insertWholeStageCodegen(_, isColumnar)))
plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
case plan: LocalTableScanExec =>
// Do not make LogicalTableScanExec the root of WholeStageCodegen
// to support the fast driver-local collect/take paths.
plan
case plan: CodegenSupport if supportCodegen(plan) =>
WholeStageCodegenExec(
insertInputAdapter(plan, isColumnar))(codegenStageCounter.incrementAndGet())
// The whole-stage-codegen framework is row-based. If a plan supports columnar execution,
// it can't support whole-stage-codegen at the same time.
assert(!plan.supportsColumnar)
WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet())
case other =>
other.withNewChildren(other.children.map(insertWholeStageCodegen(_, isColumnar)))
other.withNewChildren(other.children.map(insertWholeStageCodegen))
}
}

/**
* Depending on the stage in the plan and if we currently are columnar or not
* return if we are still columnar or not.
*/
private def adjustColumnar(plan: SparkPlan, isColumnar: Boolean): Boolean =
// We are walking up the plan, so columnar starts when we transition to rows
// and ends when we transition to columns
plan match {
case c2r: ColumnarToRowExec => true
case r2c: RowToColumnarExec => false
case _ => isColumnar
}

def apply(plan: SparkPlan): SparkPlan = {
if (conf.wholeStageEnabled) {
insertWholeStageCodegen(plan, false)
insertWholeStageCodegen(plan)
} else {
plan
}