From 0e78b3afde48f1a4d2d470ee715efe1031b89882 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 3 Mar 2016 23:58:11 -0800 Subject: [PATCH 1/7] separate physical RDD and scan --- .../spark/sql/execution/ExistingRDD.scala | 97 +++++++++++-------- .../sql/execution/WholeStageCodegen.scala | 1 + .../datasources/DataSourceStrategy.scala | 8 +- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 6 +- .../spark/sql/sources/FilteredScanSuite.scala | 2 +- .../spark/sql/sources/PrunedScanSuite.scala | 2 +- .../apache/spark/sql/hive/parquetSuites.scala | 4 +- .../spark/sql/sources/BucketedReadSuite.scala | 4 +- .../SimpleTextHadoopFsRelationSuite.scala | 4 +- 9 files changed, 75 insertions(+), 53 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 2cbe3f2c94202..d59edeab3530f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -101,15 +101,69 @@ private[sql] case class LogicalRDD( private[sql] case class PhysicalRDD( output: Seq[Attribute], rdd: RDD[InternalRow], - override val nodeName: String, - override val metadata: Map[String, String] = Map.empty, - isUnsafeRow: Boolean = false, - override val outputPartitioning: Partitioning = UnknownPartitioning(0)) + override val nodeName: String) extends LeafNode { + + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + + protected override def doExecute(): RDD[InternalRow] = { + val numOutputRows = longMetric("numOutputRows") + rdd.mapPartitionsInternal { iter => + val proj = UnsafeProjection.create(schema) + iter.map { r => + numOutputRows += 1 + proj(r) + } + } + } + + override def simpleString: String = { + s"RDD $nodeName${output.mkString("[", ",", "]")}" + } +} + +/** Physical plan node for scanning data from a relation. */ +private[sql] case class PhysicalScan( + output: Seq[Attribute], + rdd: RDD[InternalRow], + @transient relation: BaseRelation, + override val metadata: Map[String, String] = Map.empty) extends LeafNode with CodegenSupport { + override val nodeName: String = relation.toString + private[sql] override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + val isUnsafeRow = if (relation.isInstanceOf[ParquetRelation]) { + // The vectorized parquet reader does not produce unsafe rows. 
+ !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) + } else { + // All HadoopFsRelations output UnsafeRows + relation.isInstanceOf[HadoopFsRelation] + } + + override val outputPartitioning = { + val bucketSpec = relation match { + case r: HadoopFsRelation => r.getBucketSpec + case _ => None + } + + def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse { + throw new AnalysisException(s"bucket column $colName not found in existing columns " + + s"(${output.map(_.name).mkString(", ")})") + } + + if (bucketSpec.isDefined) { + val spec = bucketSpec.get + val numBuckets = spec.numBuckets + val bucketColumns = spec.bucketColumnNames.map(toAttribute) + HashPartitioning(bucketColumns, numBuckets) + } else { + UnknownPartitioning(0) + } + } + protected override def doExecute(): RDD[InternalRow] = { val unsafeRow = if (isUnsafeRow) { rdd @@ -163,41 +217,8 @@ private[sql] case class PhysicalRDD( } } -private[sql] object PhysicalRDD { +private[sql] object PhysicalScan { // Metadata keys val INPUT_PATHS = "InputPaths" val PUSHED_FILTERS = "PushedFilters" - - def createFromDataSource( - output: Seq[Attribute], - rdd: RDD[InternalRow], - relation: BaseRelation, - metadata: Map[String, String] = Map.empty): PhysicalRDD = { - val outputUnsafeRows = if (relation.isInstanceOf[ParquetRelation]) { - // The vectorized parquet reader does not produce unsafe rows. - !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) - } else { - // All HadoopFsRelations output UnsafeRows - relation.isInstanceOf[HadoopFsRelation] - } - - val bucketSpec = relation match { - case r: HadoopFsRelation => r.getBucketSpec - case _ => None - } - - def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse { - throw new AnalysisException(s"bucket column $colName not found in existing columns " + - s"(${output.map(_.name).mkString(", ")})") - } - - bucketSpec.map { spec => - val numBuckets = spec.numBuckets - val bucketColumns = spec.bucketColumnNames.map(toAttribute) - val partitioning = HashPartitioning(bucketColumns, numBuckets) - PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows, partitioning) - }.getOrElse { - PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows) - } - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 6d231bf74a0e9..340381f12b421 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -41,6 +41,7 @@ trait CodegenSupport extends SparkPlan { case _: BroadcastHashJoin => "bhj" case _: SortMergeJoin => "smj" case _: PhysicalRDD => "rdd" + case _: PhysicalScan => "scan" case _ => nodeName.toLowerCase } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index ceb35107bf7d8..8c70efd4b083d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning -import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS} +import org.apache.spark.sql.execution.PhysicalScan.{INPUT_PATHS, PUSHED_FILTERS} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.ExecutedCommand import org.apache.spark.sql.sources._ @@ -142,7 +142,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { t.buildInternalScan(a.map(_.name).toArray, f, bucketSet, t.paths, confBroadcast)) :: Nil case l @ LogicalRelation(baseRelation: TableScan, _, _) => - execution.PhysicalRDD.createFromDataSource( + execution.PhysicalScan( l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _), @@ -440,7 +440,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // Don't request columns that are only referenced by pushed filters. .filterNot(handledSet.contains) - val scan = execution.PhysicalRDD.createFromDataSource( + val scan = execution.PhysicalScan( projects.map(_.toAttribute), scanBuilder(requestedColumns, candidatePredicates, pushedFilters), relation.relation, metadata) @@ -450,7 +450,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val requestedColumns = (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq - val scan = execution.PhysicalRDD.createFromDataSource( + val scan = execution.PhysicalScan( requestedColumns, scanBuilder(requestedColumns, candidatePredicates, pushedFilters), relation.relation, metadata) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index dfffa582120cc..395e5c0d73ba0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.execution.PhysicalRDD +import org.apache.spark.sql.execution.PhysicalScan import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD @@ -210,8 +210,8 @@ class JDBCSuite extends SparkFunSuite // the plan only has PhysicalRDD to scan JDBCRelation. 
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]) val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen] - assert(node.child.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD]) - assert(node.child.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation")) + assert(node.child.isInstanceOf[org.apache.spark.sql.execution.PhysicalScan]) + assert(node.child.asInstanceOf[PhysicalScan].nodeName.contains("JDBCRelation")) df } assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index 2ff79a2316bdc..40034c2c78187 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -312,7 +312,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic try { val queryExecution = sql(sqlString).queryExecution val rawPlan = queryExecution.executedPlan.collect { - case p: execution.PhysicalRDD => p + case p: execution.PhysicalScan => p } match { case Seq(p) => p case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala index db722975379a2..5f47da9208339 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala @@ -124,7 +124,7 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext { try { val queryExecution = sql(sqlString).queryExecution val rawPlan = queryExecution.executedPlan.collect { - case p: execution.PhysicalRDD => p + case p: execution.PhysicalScan => p } match { case Seq(p) => p case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index e5077376a3ba4..5de4598d308b3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hive import java.io.File import org.apache.spark.sql._ -import org.apache.spark.sql.execution.PhysicalRDD +import org.apache.spark.sql.execution.PhysicalScan import org.apache.spark.sql.execution.command.ExecutedCommand import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation} import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation @@ -197,7 +197,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { }.isEmpty) assert( sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect { - case _: PhysicalRDD => true + case _: PhysicalScan => true }.nonEmpty) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 9a52276fcdc6a..2eabc7345e583 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -22,7 +22,7 @@ import java.io.File import 
org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning -import org.apache.spark.sql.execution.PhysicalRDD +import org.apache.spark.sql.execution.PhysicalScan import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy} import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.sql.execution.joins.SortMergeJoin @@ -90,7 +90,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet // Filter could hide the bug in bucket pruning. Thus, skipping all the filters val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan - val rdd = plan.find(_.isInstanceOf[PhysicalRDD]) + val rdd = plan.find(_.isInstanceOf[PhysicalScan]) assert(rdd.isDefined, plan) val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala index 9ab3e11609cec..24a46bbf7d810 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.{execution, Column, DataFrame, Row} import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, PredicateHelper} -import org.apache.spark.sql.execution.{LogicalRDD, PhysicalRDD} +import org.apache.spark.sql.execution.{LogicalRDD, PhysicalScan} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -159,7 +159,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat val sparkPlan = queryExecution.sparkPlan val rawScan = sparkPlan.collect { - case p: PhysicalRDD => p + case p: PhysicalScan => p } match { case Seq(scan) => scan case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") From 0278fd94a230108c37e1e9c17365bd37b30a5288 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 7 Mar 2016 23:31:59 -0800 Subject: [PATCH 2/7] fix tests --- .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +- .../test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 2 +- .../scala/org/apache/spark/sql/sources/BucketedReadSuite.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 7a852e235ef65..1ef517324d7cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.execution.DataSourceScan$ +import org.apache.spark.sql.execution.DataSourceScan import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 89d146190e4e0..8fdbbd94c807e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hive import java.io.File import org.apache.spark.sql._ -import org.apache.spark.sql.execution.DataSourceScan$ +import org.apache.spark.sql.execution.DataSourceScan import org.apache.spark.sql.execution.command.ExecutedCommand import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation} import org.apache.spark.sql.hive.execution.HiveTableScan diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 596921239a17b..a0be55cfba94c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -22,7 +22,7 @@ import java.io.File import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning -import org.apache.spark.sql.execution.DataSourceScan$ +import org.apache.spark.sql.execution.DataSourceScan import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy} import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.sql.execution.joins.SortMergeJoin From 6cfa5450156ae0ad1ca4d5872dfdd2ef56a2e17b Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 9 Mar 2016 15:21:20 -0800 Subject: [PATCH 3/7] fix sameResult on DataSourceScan --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 3 ++- .../apache/spark/sql/catalyst/plans/QueryPlan.scala | 12 ++++++------ .../org/apache/spark/sql/execution/ExistingRDD.scala | 8 +++++++- .../execution/datasources/DataSourceStrategy.scala | 4 ++-- .../sql/execution/datasources/jdbc/JDBCRDD.scala | 2 +- .../spark/sql/execution/ui/SparkPlanGraph.scala | 6 +++++- .../scala/org/apache/spark/sql/sources/filters.scala | 2 +- 7 files changed, 24 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 586bf3d4dd976..f17692e4536d8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -78,7 +78,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { CombineLimits, CombineUnions, // Constant folding and strength reduction - NullFiltering, + // TODO: enable this once we can fix the regression. + // NullFiltering, NullPropagation, OptimizeIn, ConstantFolding, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 371d72ef5af08..8210d5ac7e1b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -257,12 +257,12 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT * can do better should override this function. 
*/ def sameResult(plan: PlanType): Boolean = { - val canonicalizedLeft = this.canonicalized - val canonicalizedRight = plan.canonicalized - canonicalizedLeft.getClass == canonicalizedRight.getClass && - canonicalizedLeft.children.size == canonicalizedRight.children.size && - canonicalizedLeft.cleanArgs == canonicalizedRight.cleanArgs && - (canonicalizedLeft.children, canonicalizedRight.children).zipped.forall(_ sameResult _) + val left = this.canonicalized + val right = plan.canonicalized + left.getClass == right.getClass && + left.children.size == right.children.size && + left.cleanArgs == right.cleanArgs && + (left.children, right.children).zipped.forall(_ sameResult _) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index d482a31388909..6814313b4b306 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -132,6 +132,12 @@ private[sql] case class DataSourceScan( override val nodeName: String = relation.toString + // Ignore rdd when checking results + override def sameResult(plan: SparkPlan ): Boolean = plan match { + case other: DataSourceScan => relation == other.relation && metadata == other.metadata + case _ => false + } + private[sql] override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) @@ -240,7 +246,7 @@ private[sql] case class DataSourceScan( ctx.INPUT_ROW = row ctx.currentVars = null val columns2 = exprs.map(_.gen(ctx)) - val inputRow = if (isUnsafeRow) row else null + val inputRow = if (outputUnsafeRows) row else null val scanRows = ctx.freshName("processRows") ctx.addNewFunction(scanRows, s""" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 1adf3b6676555..e55746b1b55d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -718,7 +718,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { case expressions.InSet(a: Attribute, set) => val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType) - Some(sources.In(a.name, set.toArray.map(toScala))) + Some(sources.In(a.name, set.toSeq.map(toScala))) // Because we only convert In to InSet in Optimizer when there are more than certain // items. 
So it is possible we still get an In expression here that needs to be pushed @@ -726,7 +726,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { case expressions.In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) => val hSet = list.map(e => e.eval(EmptyRow)) val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType) - Some(sources.In(a.name, hSet.toArray.map(toScala))) + Some(sources.In(a.name, hSet.toSeq.map(toScala))) case expressions.IsNull(a: Attribute) => Some(sources.IsNull(a.name)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 4dd3c50cdf036..2c84add27bb13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -180,7 +180,7 @@ private[sql] object JDBCRDD extends Logging { case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "'" + timestampValue + "'" case dateValue: Date => "'" + dateValue + "'" - case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") + case arrayValue: Seq[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index 94d318e702789..8a36d3224003a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -93,6 +93,10 @@ private[sql] object SparkPlanGraph { case "Subquery" if subgraph != null => // Subquery should not be included in WholeStageCodegen buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, parent, null, exchanges) + case "ReusedExchange" => + // Point to the re-used exchange + val node = exchanges(planInfo.children.head) + edges += SparkPlanGraphEdge(node.id, parent.id) case name => val metrics = planInfo.metrics.map { metric => SQLPlanMetric(metric.name, metric.accumulatorId, @@ -106,7 +110,7 @@ private[sql] object SparkPlanGraph { } else { subgraph.nodes += node } - if (name == "ShuffleExchange" || name == "BroadcastExchange") { + if (name.contains("Exchange")) { exchanges += planInfo -> node } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala index 3780cbbcc9631..36df907f08c18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala @@ -82,7 +82,7 @@ case class LessThanOrEqual(attribute: String, value: Any) extends Filter * * @since 1.3.0 */ -case class In(attribute: String, values: Array[Any]) extends Filter +case class In(attribute: String, values: Seq[Any]) extends Filter /** * A filter that evaluates to `true` iff the attribute evaluates to null. 
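The revert in PATCH 4/7 below keeps `values: Array[Any]` on `sources.In` but adds hand-written `equals`, `hashCode`, and `toString`. As a minimal, standalone Scala sketch (illustrative only, not part of the patch series, with hypothetical names), this shows why the compiler-generated case-class equality is insufficient for an Array field, and therefore why comparing pushed-down filters (for example via `sameResult`) needs the override:

    // Illustrative only: a case class with an Array field inherits the JVM's
    // reference-based equals for that field, so two logically identical
    // filters compare as unequal.
    case class InByArray(attribute: String, values: Array[Any])

    object ArrayEqualityDemo extends App {
      val a = InByArray("id", Array[Any](1, 2, 3))
      val b = InByArray("id", Array[Any](1, 2, 3))

      // false: the synthesized equals compares the two arrays by reference
      println(a == b)

      // true: element-wise comparison, which is what the hand-written
      // equals in PATCH 4/7 restores
      println(a.attribute == b.attribute && a.values.sameElements(b.values))
    }

Switching the field to `Seq[Any]` (as PATCH 3/7 did) would also give structural equality, but `sources.In` is a public, stable API (`@since 1.3.0`) and the JDBC pushdown path matches on `Array[Any]`, so overriding `equals`/`hashCode` while keeping the original signature is likely the narrower fix.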
From c4ea2e8b12fd802fd2403bbaefbbf8ab698705bb Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 9 Mar 2016 16:50:10 -0800 Subject: [PATCH 4/7] fix In --- .../datasources/DataSourceStrategy.scala | 4 ++-- .../execution/datasources/jdbc/JDBCRDD.scala | 2 +- .../apache/spark/sql/sources/filters.scala | 19 ++++++++++++++++++- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index e55746b1b55d8..1adf3b6676555 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -718,7 +718,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { case expressions.InSet(a: Attribute, set) => val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType) - Some(sources.In(a.name, set.toSeq.map(toScala))) + Some(sources.In(a.name, set.toArray.map(toScala))) // Because we only convert In to InSet in Optimizer when there are more than certain // items. So it is possible we still get an In expression here that needs to be pushed @@ -726,7 +726,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { case expressions.In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) => val hSet = list.map(e => e.eval(EmptyRow)) val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType) - Some(sources.In(a.name, hSet.toSeq.map(toScala))) + Some(sources.In(a.name, hSet.toArray.map(toScala))) case expressions.IsNull(a: Attribute) => Some(sources.IsNull(a.name)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 2c84add27bb13..4dd3c50cdf036 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -180,7 +180,7 @@ private[sql] object JDBCRDD extends Logging { case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "'" + timestampValue + "'" case dateValue: Date => "'" + dateValue + "'" - case arrayValue: Seq[Any] => arrayValue.map(compileValue).mkString(", ") + case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala index 36df907f08c18..9130e77ea5724 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala @@ -82,7 +82,24 @@ case class LessThanOrEqual(attribute: String, value: Any) extends Filter * * @since 1.3.0 */ -case class In(attribute: String, values: Seq[Any]) extends Filter +case class In(attribute: String, values: Array[Any]) extends Filter { + override def hashCode(): Int = { + var h = attribute.hashCode + values.foreach { v => + h *= 41 + h += v.hashCode() + } + h + } + override def equals(o: Any): Boolean = o match { + case In(a, vs) => + a == attribute && vs.length == values.length && vs.zip(values).forall(x => x._1 == x._2) + case _ => false + } + override def toString: String = { + s"In($attribute, 
[${values.mkString(",")}]" + } +} /** * A filter that evaluates to `true` iff the attribute evaluates to null. From c159b258f116c5ebf2a87dce241169425314c3ea Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 10 Mar 2016 13:18:18 -0800 Subject: [PATCH 5/7] fix tests --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index f17692e4536d8..586bf3d4dd976 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -78,8 +78,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { CombineLimits, CombineUnions, // Constant folding and strength reduction - // TODO: enable this once we can fix the regression. - // NullFiltering, + NullFiltering, NullPropagation, OptimizeIn, ConstantFolding, From b482d2cbf367e27cc203d89a966d0a2a053bcc35 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 10 Mar 2016 15:07:43 -0800 Subject: [PATCH 6/7] fix test --- .../main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 6814313b4b306..d363cb000d39a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -118,7 +118,7 @@ private[sql] case class PhysicalRDD( } override def simpleString: String = { - s"RDD $nodeName${output.mkString("[", ",", "]")}" + s"Scan $nodeName${output.mkString("[", ",", "]")}" } } From b3d2df0282f2be935e2976be39544f58e6ee3431 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 11 Mar 2016 15:51:15 -0800 Subject: [PATCH 7/7] fix tests --- python/pyspark/sql/dataframe.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 99d665fafec89..7008e8fadffc3 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -173,8 +173,7 @@ def explain(self, extended=False): >>> df.explain() == Physical Plan == - WholeStageCodegen - : +- Scan ExistingRDD[age#0,name#1] + Scan ExistingRDD[age#0,name#1] >>> df.explain(True) == Parsed Logical Plan ==