[SPARK-13250] [SQL] Update PhysicalRDD to convert to UnsafeRow if using the vectorized scanner.

Some parts of the engine rely on UnsafeRow, which the vectorized parquet scanner is not
intended to produce. This patch adds a conversion in PhysicalRDD. In the case where codegen is
used (and the scan is the start of the pipeline), there is no requirement to use UnsafeRow.
This patch therefore also updates PhysicalRDD to support codegen, which eliminates the need for
the UnsafeRow conversion in all cases.

On TPC-DS Q19 at the 10 GB scale factor, these changes reduce the query time from 9.5 seconds
to 6.5 seconds.
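
As a rough illustration of the two paths described above, here is a minimal spark-shell sketch
(not part of this patch) comparing the physical plan for a parquet scan with whole-stage codegen
on and off. The configuration key names, the input path, and the plan shapes shown in the
comments are assumptions for illustration only.

    // Hypothetical spark-shell session; conf key names and the input path are assumed.
    sqlContext.setConf("spark.sql.parquet.enableVectorizedReader", "true")  // assumed key name
    val sales = sqlContext.read.parquet("/tmp/tpcds/store_sales")           // hypothetical path

    // With whole-stage codegen on, the scan is compiled into the surrounding pipeline,
    // so the vectorized reader's output never needs an UnsafeRow conversion.
    sqlContext.setConf("spark.sql.codegen.wholeStage", "true")              // assumed key name
    sales.filter("ss_quantity > 10").explain()
    // Roughly: WholeStageCodegen
    //            : +- Filter (ss_quantity > 10)
    //            :    +- Scan ParquetRelation[...]

    // With whole-stage codegen off, the scan runs stand-alone and downstream operators
    // still expect UnsafeRow, so the conversion is kept.
    sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
    sales.filter("ss_quantity > 10").explain()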

Author: Nong Li <nong@databricks.com>

Closes #11141 from nongli/spark-13250.
nongli authored and davies committed Feb 25, 2016
1 parent cbb0b65 commit 5a7af9e
Showing 7 changed files with 129 additions and 71 deletions.
3 changes: 2 additions & 1 deletion python/pyspark/sql/dataframe.py
@@ -173,7 +173,8 @@ def explain(self, extended=False):
>>> df.explain()
== Physical Plan ==
Scan ExistingRDD[age#0,name#1]
WholeStageCodegen
: +- Scan ExistingRDD[age#0,name#1]
>>> df.explain(True)
== Parsed Logical Plan ==
@@ -18,12 +18,14 @@
package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.{AnalysisException, Row, SQLConf, SQLContext}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
import org.apache.spark.sql.types.DataType
@@ -102,7 +104,7 @@ private[sql] case class PhysicalRDD(
override val metadata: Map[String, String] = Map.empty,
isUnsafeRow: Boolean = false,
override val outputPartitioning: Partitioning = UnknownPartitioning(0))
extends LeafNode {
extends LeafNode with CodegenSupport {

private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
@@ -128,6 +130,36 @@
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
}

override def upstreams(): Seq[RDD[InternalRow]] = {
rdd :: Nil
}

// Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
// never requires UnsafeRow as input.
override protected def doProduce(ctx: CodegenContext): String = {
val input = ctx.freshName("input")
// PhysicalRDD always just has one input
ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")

val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true))
val row = ctx.freshName("row")
val numOutputRows = metricTerm(ctx, "numOutputRows")
ctx.INPUT_ROW = row
ctx.currentVars = null
val columns = exprs.map(_.gen(ctx))
s"""
| while ($input.hasNext()) {
| InternalRow $row = (InternalRow) $input.next();
| $numOutputRows.add(1);
| ${columns.map(_.code).mkString("\n").trim}
| ${consume(ctx, columns).trim}
| if (shouldStop()) {
| return;
| }
| }
""".stripMargin
}
}

private[sql] object PhysicalRDD {
@@ -140,8 +172,13 @@
rdd: RDD[InternalRow],
relation: BaseRelation,
metadata: Map[String, String] = Map.empty): PhysicalRDD = {
// All HadoopFsRelations output UnsafeRows
val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation]
val outputUnsafeRows = if (relation.isInstanceOf[ParquetRelation]) {
// The vectorized parquet reader does not produce unsafe rows.
!SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
} else {
// All HadoopFsRelations output UnsafeRows
relation.isInstanceOf[HadoopFsRelation]
}

val bucketSpec = relation match {
case r: HadoopFsRelation => r.getBucketSpec
@@ -42,6 +42,7 @@ trait CodegenSupport extends SparkPlan {
case _: TungstenAggregate => "agg"
case _: BroadcastHashJoin => "bhj"
case _: SortMergeJoin => "smj"
case _: PhysicalRDD => "rdd"
case _ => nodeName.toLowerCase
}

@@ -187,8 +187,10 @@ class JDBCSuite extends SparkFunSuite
val parentPlan = df.queryExecution.executedPlan
// Check if SparkPlan Filter is removed in a physical plan and
// the plan only has PhysicalRDD to scan JDBCRelation.
assert(parentPlan.isInstanceOf[PhysicalRDD])
assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
assert(node.plan.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD])
assert(node.plan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
df
}
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
@@ -304,30 +304,38 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
expectedCount: Int,
requiredColumnNames: Set[String],
expectedUnhandledFilters: Set[Filter]): Unit = {

test(s"PushDown Returns $expectedCount: $sqlString") {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
case p: execution.PhysicalRDD => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
}
val rawCount = rawPlan.execute().count()
assert(ColumnsRequired.set === requiredColumnNames)

val table = caseInsensitiveContext.table("oneToTenFiltered")
val relation = table.queryExecution.logical.collectFirst {
case LogicalRelation(r, _, _) => r
}.get

assert(
relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)

if (rawCount != expectedCount) {
fail(
s"Wrong # of results for pushed filter. Got $rawCount, Expected $expectedCount\n" +
s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
queryExecution)
// These tests check a particular plan, disable whole stage codegen.
caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
try {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
case p: execution.PhysicalRDD => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
}
val rawCount = rawPlan.execute().count()
assert(ColumnsRequired.set === requiredColumnNames)

val table = caseInsensitiveContext.table("oneToTenFiltered")
val relation = table.queryExecution.logical.collectFirst {
case LogicalRelation(r, _, _) => r
}.get

assert(
relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)

if (rawCount != expectedCount) {
fail(
s"Wrong # of results for pushed filter. Got $rawCount, Expected $expectedCount\n" +
s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
queryExecution)
}
} finally {
caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED,
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get)
}
}
}
@@ -117,28 +117,35 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext {

def testPruning(sqlString: String, expectedColumns: String*): Unit = {
test(s"Columns output ${expectedColumns.mkString(",")}: $sqlString") {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
case p: execution.PhysicalRDD => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
}
val rawColumns = rawPlan.output.map(_.name)
val rawOutput = rawPlan.execute().first()

if (rawColumns != expectedColumns) {
fail(
s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" +
s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
queryExecution)
}

if (rawOutput.numFields != expectedColumns.size) {
fail(s"Wrong output row. Got $rawOutput\n$queryExecution")
// These tests check a particular plan, disable whole stage codegen.
caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
try {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
case p: execution.PhysicalRDD => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
}
val rawColumns = rawPlan.output.map(_.name)
val rawOutput = rawPlan.execute().first()

if (rawColumns != expectedColumns) {
fail(
s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" +
s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
queryExecution)
}

if (rawOutput.numFields != expectedColumns.size) {
fail(s"Wrong output row. Got $rawOutput\n$queryExecution")
}
} finally {
caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED,
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get)
}
}
}

}

@@ -74,32 +74,34 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
bucketValues: Seq[Integer],
filterCondition: Column,
originalDataFrame: DataFrame): Unit = {
// This test verifies parts of the plan. Disable whole stage codegen.
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k")
val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
// Limit: bucket pruning only works when the bucket column has one and only one column
assert(bucketColumnNames.length == 1)
val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head)
val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex)
val matchedBuckets = new BitSet(numBuckets)
bucketValues.foreach { value =>
matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value))
}

val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k")
val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
// Limit: bucket pruning only works when the bucket column has one and only one column
assert(bucketColumnNames.length == 1)
val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head)
val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex)
val matchedBuckets = new BitSet(numBuckets)
bucketValues.foreach { value =>
matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value))
}
// Filter could hide the bug in bucket pruning. Thus, skipping all the filters
val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
val rdd = plan.find(_.isInstanceOf[PhysicalRDD])
assert(rdd.isDefined, plan)

// Filter could hide the bug in bucket pruning. Thus, skipping all the filters
val rdd = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
.find(_.isInstanceOf[PhysicalRDD])
assert(rdd.isDefined)
val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty)
}
// checking if all the pruned buckets are empty
assert(checkedResult.collect().forall(_ == true))

val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty)
checkAnswer(
bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"),
originalDataFrame.filter(filterCondition).orderBy("i", "j", "k"))
}
// checking if all the pruned buckets are empty
assert(checkedResult.collect().forall(_ == true))

checkAnswer(
bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"),
originalDataFrame.filter(filterCondition).orderBy("i", "j", "k"))
}

test("read partitioning bucketed tables with bucket pruning filters") {
