From fcecb2278621ef032cc7bfce301a75545efc579a Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 6 Aug 2015 22:45:13 -0700 Subject: [PATCH 1/4] [SPARK-9733][SQL] Improve explain message for data source scan node. --- .../expressions/aggregate/interfaces.scala | 2 +- .../org/apache/spark/sql/SQLContext.scala | 4 ---- .../spark/sql/execution/ExistingRDD.scala | 15 ++++++++++++- .../spark/sql/execution/SparkStrategies.scala | 4 ++-- .../aggregate/TungstenAggregate.scala | 9 +++++--- .../datasources/DataSourceStrategy.scala | 22 +++++++++++++------ .../apache/spark/sql/sources/interfaces.scala | 2 +- .../execution/RowFormatConvertersSuite.scala | 4 ++-- 8 files changed, 41 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala index 4abfdfe87d5e9..576d8c7a3a68a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala @@ -93,7 +93,7 @@ private[sql] case class AggregateExpression2( AttributeSet(childReferences) } - override def toString: String = s"(${aggregateFunction}2,mode=$mode,isDistinct=$isDistinct)" + override def toString: String = s"(${aggregateFunction},mode=$mode,isDistinct=$isDistinct)" } abstract class AggregateFunction2 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 075c0ea2544b2..832572571cabd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -1011,9 +1011,6 @@ class SQLContext(@transient val sparkContext: SparkContext) def output = analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ") - // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)}) - // however, the `toRdd` will cause the real execution, which is not what we want. - // We need to think about how to avoid the side effect. s"""== Parsed Logical Plan == |${stringOrError(logical)} |== Analyzed Logical Plan == @@ -1024,7 +1021,6 @@ class SQLContext(@transient val sparkContext: SparkContext) |== Physical Plan == |${stringOrError(executedPlan)} |Code Generation: ${stringOrError(executedPlan.codegenEnabled)} - |== RDD == """.stripMargin.trim } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index fbaa8e276ddb7..cae7ca5cbdc88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} +import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.DataType import org.apache.spark.sql.{Row, SQLContext} @@ -95,11 +96,23 @@ private[sql] case class LogicalRDD( /** Physical plan node for scanning data from an RDD. 
*/ private[sql] case class PhysicalRDD( output: Seq[Attribute], - rdd: RDD[InternalRow]) extends LeafNode { + rdd: RDD[InternalRow], + extraInformation: String) extends LeafNode { override protected[sql] val trackNumOfRowsEnabled = true protected override def doExecute(): RDD[InternalRow] = rdd + + override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]") +} + +private[sql] object PhysicalRDD { + def createFromDataSource( + output: Seq[Attribute], + rdd: RDD[InternalRow], + relation: BaseRelation): PhysicalRDD = { + PhysicalRDD(output, rdd, relation.toString) + } } /** Logical plan node for scanning data from a local collection. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index c5aaebe673225..c4b9b5acea4de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -363,12 +363,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Generate( generator, join = join, outer = outer, g.output, planLater(child)) :: Nil case logical.OneRowRelation => - execution.PhysicalRDD(Nil, singleRowRdd) :: Nil + execution.PhysicalRDD(Nil, singleRowRdd, "OneRowRelation") :: Nil case logical.RepartitionByExpression(expressions, child) => execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil case e @ EvaluatePython(udf, child, _) => BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil - case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil + case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "PhysicalRDD") :: Nil case BroadcastHint(child) => apply(child) case _ => Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 5a0b4d47d62f8..c3dcbd2b71ee8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -93,10 +93,13 @@ case class TungstenAggregate( val allAggregateExpressions = nonCompleteAggregateExpressions ++ completeAggregateExpressions testFallbackStartsAt match { - case None => s"TungstenAggregate ${groupingExpressions} ${allAggregateExpressions}" + case None => + val keyString = groupingExpressions.mkString("[", ",", "]") + val valueString = allAggregateExpressions.mkString("[", ",", "]") + s"TungstenAggregate(key=$keyString, value=$valueString" case Some(fallbackStartsAt) => - s"TungstenAggregateWithControlledFallback ${groupingExpressions} " + - s"${allAggregateExpressions} fallbackStartsAt=$fallbackStartsAt" + s"TungstenAggregateWithControlledFallback $groupingExpressions " + + s"$allAggregateExpressions fallbackStartsAt=$fallbackStartsAt" } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index e5dc676b87841..5b5fa8c93ec52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -101,8 +101,9 @@ private[sql] object DataSourceStrategy 
extends Strategy with Logging { (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil - case l @ LogicalRelation(t: TableScan) => - execution.PhysicalRDD(l.output, toCatalystRDD(l, t.buildScan())) :: Nil + case l @ LogicalRelation(baseRelation: TableScan) => + execution.PhysicalRDD.createFromDataSource( + l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil case i @ logical.InsertIntoTable( l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty => @@ -169,7 +170,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows) } - execution.PhysicalRDD(projections.map(_.toAttribute), unionedRows) + execution.PhysicalRDD.createFromDataSource( + projections.map(_.toAttribute), + unionedRows, + logicalRelation.relation) } // TODO: refactor this thing. It is very complicated because it does projection internally. @@ -299,14 +303,18 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { projects.asInstanceOf[Seq[Attribute]] // Safe due to if above. .map(relation.attributeMap) // Match original case of attributes. - val scan = execution.PhysicalRDD(projects.map(_.toAttribute), - scanBuilder(requestedColumns, pushedFilters)) + val scan = execution.PhysicalRDD.createFromDataSource( + projects.map(_.toAttribute), + scanBuilder(requestedColumns, pushedFilters), + relation.relation) filterCondition.map(execution.Filter(_, scan)).getOrElse(scan) } else { val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq - val scan = execution.PhysicalRDD(requestedColumns, - scanBuilder(requestedColumns, pushedFilters)) + val scan = execution.PhysicalRDD.createFromDataSource( + requestedColumns, + scanBuilder(requestedColumns, pushedFilters), + relation.relation) execution.Project(projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index c04557e5a0818..0b2929661b657 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -383,7 +383,7 @@ private[sql] abstract class OutputWriterInternal extends OutputWriter { abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec]) extends BaseRelation with Logging { - logInfo("Constructing HadoopFsRelation") + override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]") def this() = this(None) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala index 8208b25b5708c..322966f423784 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala @@ -32,9 +32,9 @@ class RowFormatConvertersSuite extends SparkPlanTest { case c: ConvertToSafe => c } - private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null)) + private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null, "name")) assert(!outputsSafe.outputsUnsafeRows) - private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null)) + private val outputsUnsafe = 
TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null, "name")) assert(outputsUnsafe.outputsUnsafeRows) test("planner should insert unsafe->safe conversions when required") { From eea9ccc182d06e9c4eb100423a00f587449a04b1 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 7 Aug 2015 00:29:59 -0700 Subject: [PATCH 2/4] Fix test case. --- .../org/apache/spark/sql/hive/execution/HiveExplainSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala index 697211222b90c..8215dd6c2e711 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala @@ -36,7 +36,7 @@ class HiveExplainSuite extends QueryTest { "== Analyzed Logical Plan ==", "== Optimized Logical Plan ==", "== Physical Plan ==", - "Code Generation", "== RDD ==") + "Code Generation") } test("explain create table command") { From 52cab773c47ffad13db5d93a18ff76f4148a2bfa Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 7 Aug 2015 00:36:21 -0700 Subject: [PATCH 3/4] Cast. --- .../org/apache/spark/sql/catalyst/expressions/Cast.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 39f99700c8a26..946c5a9c04f14 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -107,6 +107,8 @@ object Cast { case class Cast(child: Expression, dataType: DataType) extends UnaryExpression with CodegenFallback { + override def toString: String = s"cast($child as ${dataType.simpleString})" + override def checkInputDataTypes(): TypeCheckResult = { if (Cast.canCast(child.dataType, dataType)) { TypeCheckResult.TypeCheckSuccess @@ -118,8 +120,6 @@ case class Cast(child: Expression, dataType: DataType) override def nullable: Boolean = Cast.forceNullable(child.dataType, dataType) || child.nullable - override def toString: String = s"CAST($child, $dataType)" - // [[func]] assumes the input is no longer null because eval already does the null check. @inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T]) From 811b90e2c97ec7f9a20528404a1f3133a50ddf31 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 7 Aug 2015 11:51:37 -0700 Subject: [PATCH 4/4] Fixed Python test case. --- python/pyspark/sql/dataframe.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 0f3480c239187..47d5a6a43a84d 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -212,8 +212,7 @@ def explain(self, extended=False): :param extended: boolean, default ``False``. If ``False``, prints only the physical plan. >>> df.explain() - PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at applySchemaToPythonRDD at\ - NativeMethodAccessorImpl.java:... + Scan PhysicalRDD[age#0,name#1] >>> df.explain(True) == Parsed Logical Plan == @@ -224,7 +223,6 @@ def explain(self, extended=False): ... == Physical Plan == ... - == RDD == """ if extended: print(self._jdf.queryExecution().toString())
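
For illustration only, below is a minimal, self-contained Scala sketch (not Spark's actual plan classes; Attribute and ScanNode are hypothetical stand-ins) of the formatting pattern this patch series introduces: the scan node carries an extraInformation string describing the relation it reads, and simpleString renders it as "Scan <relation>[col1,col2,...]", which is the shape the updated Python doctest ("Scan PhysicalRDD[age#0,name#1]") expects.

// Hypothetical stand-ins for Spark's Attribute and leaf plan node types;
// only the string-formatting pattern mirrors the patch.
object ExplainFormatSketch {

  final case class Attribute(name: String, id: Int) {
    override def toString: String = s"$name#$id" // e.g. age#0
  }

  final case class ScanNode(output: Seq[Attribute], extraInformation: String) {
    // Mirrors PhysicalRDD.simpleString from the patch:
    // "Scan " + extra information + bracketed output columns.
    def simpleString: String =
      "Scan " + extraInformation + output.mkString("[", ",", "]")
  }

  def main(args: Array[String]): Unit = {
    val scan = ScanNode(Seq(Attribute("age", 0), Attribute("name", 1)), "PhysicalRDD")
    println(scan.simpleString) // prints: Scan PhysicalRDD[age#0,name#1]
  }
}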