separate physical RDD and scan
Davies Liu committed Mar 4, 2016
1 parent 465c665 commit 0e78b3a
Showing 9 changed files with 75 additions and 53 deletions.
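
What the change does, in outline: the old PhysicalRDD node is split in two. PhysicalRDD keeps only the job of wrapping an existing RDD[InternalRow] and projecting its rows to UnsafeRow, while a new PhysicalScan node handles scans over a BaseRelation, pulling the unsafe-row check, the bucket-derived output partitioning, and codegen support into the node itself instead of a companion-object factory. A simplified sketch of the resulting shape, condensed from the first file in the diff below (bodies elided):

// Simplified sketch of the two nodes after this commit; bodies elided.
private[sql] case class PhysicalRDD(
    output: Seq[Attribute],
    rdd: RDD[InternalRow],
    override val nodeName: String) extends LeafNode {
  // Only projects incoming rows to UnsafeRow and counts output rows.
}

/** Physical plan node for scanning data from a relation. */
private[sql] case class PhysicalScan(
    output: Seq[Attribute],
    rdd: RDD[InternalRow],
    @transient relation: BaseRelation,
    override val metadata: Map[String, String] = Map.empty)
  extends LeafNode with CodegenSupport {
  // Computes isUnsafeRow and outputPartitioning from the relation itself
  // (HashPartitioning when a HadoopFsRelation declares a bucket spec).
}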
@@ -101,15 +101,69 @@ private[sql] case class LogicalRDD(
private[sql] case class PhysicalRDD(
output: Seq[Attribute],
rdd: RDD[InternalRow],
override val nodeName: String,
override val metadata: Map[String, String] = Map.empty,
isUnsafeRow: Boolean = false,
override val outputPartitioning: Partitioning = UnknownPartitioning(0))
override val nodeName: String) extends LeafNode {

private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
rdd.mapPartitionsInternal { iter =>
val proj = UnsafeProjection.create(schema)
iter.map { r =>
numOutputRows += 1
proj(r)
}
}
}

override def simpleString: String = {
s"RDD $nodeName${output.mkString("[", ",", "]")}"
}
}

/** Physical plan node for scanning data from a relation. */
private[sql] case class PhysicalScan(
output: Seq[Attribute],
rdd: RDD[InternalRow],
@transient relation: BaseRelation,
override val metadata: Map[String, String] = Map.empty)
extends LeafNode with CodegenSupport {

override val nodeName: String = relation.toString

private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

val isUnsafeRow = if (relation.isInstanceOf[ParquetRelation]) {
// The vectorized parquet reader does not produce unsafe rows.
!SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
} else {
// All HadoopFsRelations output UnsafeRows
relation.isInstanceOf[HadoopFsRelation]
}

override val outputPartitioning = {
val bucketSpec = relation match {
case r: HadoopFsRelation => r.getBucketSpec
case _ => None
}

def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
throw new AnalysisException(s"bucket column $colName not found in existing columns " +
s"(${output.map(_.name).mkString(", ")})")
}

if (bucketSpec.isDefined) {
val spec = bucketSpec.get
val numBuckets = spec.numBuckets
val bucketColumns = spec.bucketColumnNames.map(toAttribute)
HashPartitioning(bucketColumns, numBuckets)
} else {
UnknownPartitioning(0)
}
}

protected override def doExecute(): RDD[InternalRow] = {
val unsafeRow = if (isUnsafeRow) {
rdd
@@ -163,41 +217,8 @@ private[sql] case class PhysicalRDD(
}
}

private[sql] object PhysicalRDD {
private[sql] object PhysicalScan {
// Metadata keys
val INPUT_PATHS = "InputPaths"
val PUSHED_FILTERS = "PushedFilters"

def createFromDataSource(
output: Seq[Attribute],
rdd: RDD[InternalRow],
relation: BaseRelation,
metadata: Map[String, String] = Map.empty): PhysicalRDD = {
val outputUnsafeRows = if (relation.isInstanceOf[ParquetRelation]) {
// The vectorized parquet reader does not produce unsafe rows.
!SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
} else {
// All HadoopFsRelations output UnsafeRows
relation.isInstanceOf[HadoopFsRelation]
}

val bucketSpec = relation match {
case r: HadoopFsRelation => r.getBucketSpec
case _ => None
}

def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
throw new AnalysisException(s"bucket column $colName not found in existing columns " +
s"(${output.map(_.name).mkString(", ")})")
}

bucketSpec.map { spec =>
val numBuckets = spec.numBuckets
val bucketColumns = spec.bucketColumnNames.map(toAttribute)
val partitioning = HashPartitioning(bucketColumns, numBuckets)
PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows, partitioning)
}.getOrElse {
PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
}
}
}
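
For planners and tests, the practical effect is that the PhysicalRDD.createFromDataSource factory disappears: call sites construct PhysicalScan directly, and the node derives isUnsafeRow and outputPartitioning from the relation on its own, as the DataSourceStrategy and test hunks below show. A minimal before/after sketch of a call site (local names here are illustrative, plumbing elided):

// Before: a factory on the companion object chose the unsafe-row flag and partitioning.
val oldScan = PhysicalRDD.createFromDataSource(output, rdd, relation, metadata)

// After: construct the node directly; it computes those properties itself.
val newScan = PhysicalScan(output, rdd, relation, metadata)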
@@ -41,6 +41,7 @@ trait CodegenSupport extends SparkPlan {
case _: BroadcastHashJoin => "bhj"
case _: SortMergeJoin => "smj"
case _: PhysicalRDD => "rdd"
case _: PhysicalScan => "scan"
case _ => nodeName.toLowerCase
}

@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS}
import org.apache.spark.sql.execution.PhysicalScan.{INPUT_PATHS, PUSHED_FILTERS}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.ExecutedCommand
import org.apache.spark.sql.sources._
@@ -142,7 +142,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
t.buildInternalScan(a.map(_.name).toArray, f, bucketSet, t.paths, confBroadcast)) :: Nil

case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
execution.PhysicalRDD.createFromDataSource(
execution.PhysicalScan(
l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil

case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
@@ -440,7 +440,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Don't request columns that are only referenced by pushed filters.
.filterNot(handledSet.contains)

val scan = execution.PhysicalRDD.createFromDataSource(
val scan = execution.PhysicalScan(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, metadata)
@@ -450,7 +450,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val requestedColumns =
(projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq

val scan = execution.PhysicalRDD.createFromDataSource(
val scan = execution.PhysicalScan(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, metadata)
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.PhysicalRDD
import org.apache.spark.sql.execution.PhysicalScan
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
@@ -210,8 +210,8 @@ class JDBCSuite extends SparkFunSuite
// the plan only has PhysicalRDD to scan JDBCRelation.
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
assert(node.child.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD])
assert(node.child.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
assert(node.child.isInstanceOf[org.apache.spark.sql.execution.PhysicalScan])
assert(node.child.asInstanceOf[PhysicalScan].nodeName.contains("JDBCRelation"))
df
}
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
@@ -312,7 +312,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
try {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
case p: execution.PhysicalRDD => p
case p: execution.PhysicalScan => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
@@ -124,7 +124,7 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext {
try {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
case p: execution.PhysicalRDD => p
case p: execution.PhysicalScan => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
import java.io.File

import org.apache.spark.sql._
import org.apache.spark.sql.execution.PhysicalRDD
import org.apache.spark.sql.execution.PhysicalScan
import org.apache.spark.sql.execution.command.ExecutedCommand
import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
@@ -197,7 +197,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
}.isEmpty)
assert(
sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
case _: PhysicalRDD => true
case _: PhysicalScan => true
}.nonEmpty)
}

@@ -22,7 +22,7 @@ import java.io.File
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.PhysicalRDD
import org.apache.spark.sql.execution.PhysicalScan
import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy}
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.SortMergeJoin
@@ -90,7 +90,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet

// Filter could hide the bug in bucket pruning. Thus, skipping all the filters
val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
val rdd = plan.find(_.isInstanceOf[PhysicalRDD])
val rdd = plan.find(_.isInstanceOf[PhysicalScan])
assert(rdd.isDefined, plan)

val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
@@ -25,7 +25,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.{execution, Column, DataFrame, Row}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, PredicateHelper}
import org.apache.spark.sql.execution.{LogicalRDD, PhysicalRDD}
import org.apache.spark.sql.execution.{LogicalRDD, PhysicalScan}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -159,7 +159,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
val sparkPlan = queryExecution.sparkPlan

val rawScan = sparkPlan.collect {
case p: PhysicalRDD => p
case p: PhysicalScan => p
} match {
case Seq(scan) => scan
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
