From c1291848d9443c6ea0dd009a0d107473a30da1a0 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 2 Nov 2018 11:41:09 +0300 Subject: [PATCH 1/6] Fix number of partitions returned by outputPartitioning --- .../sql/execution/DataSourceScanExec.scala | 4 ++++ .../spark/sql/execution/ExistingRDD.scala | 4 ++++ .../sql/execution/LocalTableScanExec.scala | 5 +++++ .../apache/spark/sql/execution/SparkPlan.scala | 7 ++++++- .../spark/sql/execution/SparkStrategies.scala | 2 ++ .../sql/execution/basicPhysicalOperators.scala | 4 ++++ .../spark/sql/execution/command/commands.scala | 5 +++++ .../datasources/v2/DataSourceV2ScanExec.scala | 11 +++++------ .../v2/WriteToDataSourceV2Exec.scala | 3 +++ .../joins/BroadcastNestedLoopJoinExec.scala | 10 ++++++++-- .../execution/joins/CartesianProductExec.scala | 18 +++++++++++++----- .../apache/spark/sql/execution/objects.scala | 2 ++ .../sql/execution/python/EvalPythonExec.scala | 3 +++ .../streaming/EventTimeWatermarkExec.scala | 3 +++ .../streaming/FlatMapGroupsWithStateExec.scala | 4 +++- .../streaming/StreamingRelation.scala | 2 ++ .../WriteToContinuousDataSourceExec.scala | 3 +++ .../spark/sql/ExtraStrategiesSuite.scala | 4 ++++ .../ui/SQLAppStatusListenerSuite.scala | 3 +++ .../sql/hive/execution/HiveTableScanExec.scala | 17 +++++++++++++---- 20 files changed, 95 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index a9b18ab57237d..67ab15dbae927 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -143,6 +143,10 @@ case class RowDataSourceScanExec( fullOutput.map(QueryPlan.normalizeExprId(_, fullOutput)), rdd = null, tableIdentifier = None) + + override def outputPartitioning: Partitioning = { + SparkPlan.defaultPartitioning(rdd.getNumPartitions) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 2962becb64e88..82e2fdb0e9b94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -122,6 +122,10 @@ case class ExternalRDDScanExec[T]( override def simpleString: String = { s"$nodeName${output.mkString("[", ",", "]")}" } + + override def outputPartitioning: Partitioning = { + SparkPlan.defaultPartitioning(rdd.getNumPartitions) + } } /** Logical plan node for scanning data from an RDD of InternalRow.
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala index 448eb703eacde..02bceadfe1aa5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.metric.SQLMetrics @@ -76,4 +77,8 @@ case class LocalTableScanExec( longMetric("numOutputRows").add(taken.size) taken } + + override def outputPartitioning: Partitioning = { + SparkPlan.defaultPartitioning(rdd.getNumPartitions) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 9d9b020309d9f..76de220089381 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -92,7 +92,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ // TODO: Move to `DistributedPlan` /** Specifies how data is partitioned across different nodes in the cluster. */ - def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH! + def outputPartitioning: Partitioning /** * Specifies the data distribution requirements of all the children for this operator. By default @@ -426,6 +426,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ object SparkPlan { private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("subquery", 16)) + + def defaultPartitioning(numPartitions: Int): Partitioning = { + if (numPartitions == 1) SinglePartition + else UnknownPartitioning(numPartitions) + } } trait LeafExecNode extends SparkPlan { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index dbc6db62bd820..46795b39f217e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -57,6 +57,8 @@ case class PlanLater(plan: LogicalPlan) extends LeafExecNode { protected override def doExecute(): RDD[InternalRow] = { throw new UnsupportedOperationException() } + + override def outputPartitioning: Partitioning = UnknownPartitioning(0) } abstract class SparkStrategies extends QueryPlanner[SparkPlan] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 09effe087e195..0ea76f1bd3c11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -602,6 +602,10 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { protected override def doExecute(): RDD[InternalRow] = sparkContext.union(children.map(_.execute())) + + override def outputPartitioning: 
Partitioning = { + PartitioningCollection(children.map(_.outputPartitioning)) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 2cc0e38adc2ee..205a6d9628d59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition, UnknownPartitioning} import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} import org.apache.spark.sql.execution.debug._ import org.apache.spark.sql.execution.metric.SQLMetric @@ -85,6 +86,8 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode { protected override def doExecute(): RDD[InternalRow] = { sqlContext.sparkContext.parallelize(sideEffectResult, 1) } + + override def outputPartitioning: Partitioning = SinglePartition } /** @@ -121,6 +124,8 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan) protected override def doExecute(): RDD[InternalRow] = { sqlContext.sparkContext.parallelize(sideEffectResult, 1) } + + override def outputPartitioning: Partitioning = SinglePartition } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index 25f86a66a8269..c040afd6ec4e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical -import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.catalyst.plans.physical.{SinglePartition, UnknownPartitioning} import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.sources.v2.DataSourceV2 @@ -54,15 +54,14 @@ case class DataSourceV2ScanExec( Seq(output, source, options).hashCode() } - override def outputPartitioning: physical.Partitioning = readSupport match { - case _ if partitions.length == 1 => - SinglePartition + override def outputPartitioning: physical.Partitioning = (readSupport, partitions.length) match { + case (_, 1) => SinglePartition - case s: SupportsReportPartitioning => + case (s: SupportsReportPartitioning, _) => new DataSourcePartitioning( s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name))) - case _ => super.outputPartitioning + case (_, numPartitions) => UnknownPartitioning(numPartitions) } private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index c3f7b690ef636..2b87e24af527a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.util.Utils @@ -95,6 +96,8 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark sparkContext.emptyRDD } + + override def outputPartitioning: Partitioning = query.outputPartitioning } object DataWritingSparkTask extends Logging { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala index f526a19876670..0d82c6af542e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala @@ -339,10 +339,10 @@ case class BroadcastNestedLoopJoinExec( ) } - protected override def doExecute(): RDD[InternalRow] = { + private lazy val resultRdd = { val broadcastedRelation = broadcast.executeBroadcast[Array[InternalRow]]() - val resultRdd = (joinType, buildSide) match { + (joinType, buildSide) match { case (_: InnerLike, _) => innerJoin(broadcastedRelation) case (LeftOuter, BuildRight) | (RightOuter, BuildLeft) => @@ -364,7 +364,9 @@ case class BroadcastNestedLoopJoinExec( */ defaultJoin(broadcastedRelation) } + } + protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") resultRdd.mapPartitionsWithIndexInternal { (index, iter) => val resultProj = genResultProjection @@ -375,4 +377,8 @@ case class BroadcastNestedLoopJoinExec( } } } + + override def outputPartitioning: Partitioning = { + SparkPlan.defaultPartitioning(resultRdd.getNumPartitions) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala index 4d261dd422bc5..fe1661f014f2c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala @@ -22,6 +22,7 @@ import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.{BinaryExecNode, ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.util.CompletionIterator @@ -66,19 +67,22 @@ case class CartesianProductExec( override lazy val metrics = Map( "numOutputRows" -> 
SQLMetrics.createMetric(sparkContext, "number of output rows")) - protected override def doExecute(): RDD[InternalRow] = { - val numOutputRows = longMetric("numOutputRows") - + private lazy val cartesianRdd = { val leftResults = left.execute().asInstanceOf[RDD[UnsafeRow]] val rightResults = right.execute().asInstanceOf[RDD[UnsafeRow]] - val pair = new UnsafeCartesianRDD( + new UnsafeCartesianRDD( leftResults, rightResults, right.output.size, sqlContext.conf.cartesianProductExecBufferInMemoryThreshold, sqlContext.conf.cartesianProductExecBufferSpillThreshold) - pair.mapPartitionsWithIndexInternal { (index, iter) => + } + + protected override def doExecute(): RDD[InternalRow] = { + val numOutputRows = longMetric("numOutputRows") + + cartesianRdd.mapPartitionsWithIndexInternal { (index, iter) => val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema) val filtered = if (condition.isDefined) { val boundCondition = newPredicate(condition.get, left.output ++ right.output) @@ -97,4 +101,8 @@ case class CartesianProductExec( } } } + + override def outputPartitioning: Partitioning = { + SparkPlan.defaultPartitioning(cartesianRdd.getNumPartitions) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index 03d1bbf2ab882..0ff0f1e4592aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -481,4 +481,6 @@ case class CoGroupExec( } } } + + override def outputPartitioning: Partitioning = left.outputPartitioning } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 942a6db57416e..10f30464a3e80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -26,6 +26,7 @@ import org.apache.spark.api.python.ChainedPythonFunctions import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.{DataType, StructField, StructType} import org.apache.spark.util.Utils @@ -135,4 +136,6 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil } } } + + override def outputPartitioning: Partitioning = child.outputPartitioning } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index b161651c4e6a3..0a2cae4a53c29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.types.MetadataBuilder import 
org.apache.spark.unsafe.types.CalendarInterval @@ -115,4 +116,6 @@ case class EventTimeWatermarkExec( a } } + + override def outputPartitioning: Partitioning = child.outputPartitioning } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala index bfe7d00f56048..74192ebbe1151 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, Literal, SortOrder, UnsafeRow} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution} +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.streaming.state._ import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode} @@ -239,4 +239,6 @@ case class FlatMapGroupsWithStateExec( CompletionIterator[InternalRow, Iterator[InternalRow]](mappedIterator, onIteratorCompletion) } } + + override def outputPartitioning: Partitioning = child.outputPartitioning } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala index 4b696dfa57359..2b72a5460878f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.LeafExecNode import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceV2} @@ -142,6 +143,7 @@ case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) ext override protected def doExecute(): RDD[InternalRow] = { throw new UnsupportedOperationException("StreamingRelationExec cannot be executed") } + override def outputPartitioning: Partitioning = UnknownPartitioning(0) } object StreamingExecutionRelation { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala index a797ac1879f41..33208d1938e57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala @@ -24,6 +24,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import 
org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.streaming.StreamExecution import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport @@ -66,4 +67,6 @@ case class WriteToContinuousDataSourceExec(writeSupport: StreamingWriteSupport, sparkContext.emptyRDD } + + override def outputPartitioning: Partitioning = query.outputPartitioning } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala index a41b465548622..7d293ff0ea9a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala @@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.test.SharedSQLContext @@ -36,6 +37,9 @@ case class FastOperator(output: Seq[Attribute]) extends SparkPlan { override def producedAttributes: AttributeSet = outputSet override def children: Seq[SparkPlan] = Nil + override def outputPartitioning: Partitioning = { + SparkPlan.defaultPartitioning(sparkContext.defaultParallelism) + } } object TestStrategy extends Strategy { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 02df45d1b7989..da8a1b0afae1f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -594,6 +595,8 @@ private case class MyPlan(sc: SparkContext, expectedValue: Long, expectedValue2: Seq(metrics("dummy2"))) sc.emptyRDD } + + override def outputPartitioning: Partitioning = UnknownPartitioning(0) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 92c6632ad7863..827eb293fbc70 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan +import 
org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.hive._ @@ -180,10 +181,8 @@ case class HiveTableScanExec( prunedPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) } - protected override def doExecute(): RDD[InternalRow] = { - // Using dummyCallSite, as getCallSite can turn out to be expensive with - // multiple partitions. - val rdd = if (!relation.isPartitioned) { + private def rddForTable(): RDD[InternalRow] = { + if (!relation.isPartitioned) { Utils.withDummyCallSite(sqlContext.sparkContext) { hadoopReader.makeRDDForTable(hiveQlTable) } @@ -192,6 +191,12 @@ case class HiveTableScanExec( hadoopReader.makeRDDForPartitionedTable(prunePartitions(rawPartitions)) } } + } + + protected override def doExecute(): RDD[InternalRow] = { + // Using dummyCallSite, as getCallSite can turn out to be expensive with + // multiple partitions. + val rdd = rddForTable() val numOutputRows = longMetric("numOutputRows") // Avoid to serialize MetastoreRelation because schema is lazy. (see SPARK-15649) val outputSchema = schema @@ -214,4 +219,8 @@ case class HiveTableScanExec( } override def otherCopyArgs: Seq[AnyRef] = Seq(sparkSession) + + override def outputPartitioning: Partitioning = { + SparkPlan.defaultPartitioning(rddForTable().getNumPartitions) + } } From 25f57f92bfcf28b1b29d75dde75fc2dd358488cb Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 2 Nov 2018 12:59:54 +0300 Subject: [PATCH 2/6] Fix number of partitions for union --- .../org/apache/spark/sql/execution/basicPhysicalOperators.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 0ea76f1bd3c11..2329d4b4d1629 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -604,7 +604,7 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { sparkContext.union(children.map(_.execute())) override def outputPartitioning: Partitioning = { - PartitioningCollection(children.map(_.outputPartitioning)) + UnknownPartitioning(children.map(_.outputPartitioning.numPartitions).sum) } } From 4484b242af756b0afd7f0e947512e72069e175ee Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 2 Nov 2018 13:14:37 +0300 Subject: [PATCH 3/6] Replacing def by lazy val --- .../apache/spark/sql/hive/execution/HiveTableScanExec.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 827eb293fbc70..33a620706e079 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -181,7 +181,7 @@ case class HiveTableScanExec( prunedPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) } - private def rddForTable(): RDD[InternalRow] = { + private lazy val rddForTable: RDD[InternalRow] = { if (!relation.isPartitioned) { Utils.withDummyCallSite(sqlContext.sparkContext) { hadoopReader.makeRDDForTable(hiveQlTable) @@ -196,7 +196,7 @@ case class HiveTableScanExec( 
protected override def doExecute(): RDD[InternalRow] = { // Using dummyCallSite, as getCallSite can turn out to be expensive with // multiple partitions. - val rdd = rddForTable() + val rdd = rddForTable val numOutputRows = longMetric("numOutputRows") // Avoid to serialize MetastoreRelation because schema is lazy. (see SPARK-15649) val outputSchema = schema @@ -221,6 +221,6 @@ case class HiveTableScanExec( override def otherCopyArgs: Seq[AnyRef] = Seq(sparkSession) override def outputPartitioning: Partitioning = { - SparkPlan.defaultPartitioning(rddForTable().getNumPartitions) + SparkPlan.defaultPartitioning(rddForTable.getNumPartitions) } } From ec5fd9b5a141c6514814c9b1b677739fc15a6f4c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 2 Nov 2018 13:30:25 +0300 Subject: [PATCH 4/6] Fix union --- .../org/apache/spark/sql/execution/basicPhysicalOperators.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 2329d4b4d1629..a39c303ffac21 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -604,7 +604,7 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { sparkContext.union(children.map(_.execute())) override def outputPartitioning: Partitioning = { - UnknownPartitioning(children.map(_.outputPartitioning.numPartitions).sum) + SparkPlan.defaultPartitioning(children.map(_.outputPartitioning.numPartitions).sum) } } From e9f448fefd91327fc532f2fe374773382ebd893b Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 2 Nov 2018 20:04:51 +0300 Subject: [PATCH 5/6] Reverting changes because they cause an assert failure --- .../hive/execution/HiveTableScanExec.scala | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 33a620706e079..5137a9599506c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -181,22 +181,20 @@ case class HiveTableScanExec( prunedPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) } - private lazy val rddForTable: RDD[InternalRow] = { - if (!relation.isPartitioned) { - Utils.withDummyCallSite(sqlContext.sparkContext) { - hadoopReader.makeRDDForTable(hiveQlTable) - } - } else { - Utils.withDummyCallSite(sqlContext.sparkContext) { - hadoopReader.makeRDDForPartitionedTable(prunePartitions(rawPartitions)) - } - } - } - protected override def doExecute(): RDD[InternalRow] = { // Using dummyCallSite, as getCallSite can turn out to be expensive with // multiple partitions. - val rdd = rddForTable + val rdd = { + if (!relation.isPartitioned) { + Utils.withDummyCallSite(sqlContext.sparkContext) { + hadoopReader.makeRDDForTable(hiveQlTable) + } + } else { + Utils.withDummyCallSite(sqlContext.sparkContext) { + hadoopReader.makeRDDForPartitionedTable(prunePartitions(rawPartitions)) + } + } + } val numOutputRows = longMetric("numOutputRows") // Avoid to serialize MetastoreRelation because schema is lazy.
(see SPARK-15649) val outputSchema = schema @@ -220,7 +218,5 @@ case class HiveTableScanExec( override def otherCopyArgs: Seq[AnyRef] = Seq(sparkSession) - override def outputPartitioning: Partitioning = { - SparkPlan.defaultPartitioning(rddForTable.getNumPartitions) - } + override def outputPartitioning: Partitioning = UnknownPartitioning(0) } From fc765f021131da0b3291d2f7ba1238db5430ca57 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 10 Nov 2018 11:47:28 +0100 Subject: [PATCH 6/6] Reverting changes for UnionExec --- .../apache/spark/sql/execution/basicPhysicalOperators.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index a39c303ffac21..953c3b35c5c52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -603,9 +603,7 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { protected override def doExecute(): RDD[InternalRow] = sparkContext.union(children.map(_.execute())) - override def outputPartitioning: Partitioning = { - SparkPlan.defaultPartitioning(children.map(_.outputPartitioning.numPartitions).sum) - } + override def outputPartitioning: Partitioning = UnknownPartitioning(0) } /**
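
The helper at the center of the series, SparkPlan.defaultPartitioning, only claims SinglePartition when the produced RDD provably has exactly one partition, and otherwise reports UnknownPartitioning with the real partition count instead of the previous blanket UnknownPartitioning(0) (the "WRONG WIDTH" called out in the TODO removed from SparkPlan.scala). The sketch below mirrors that helper outside the patch so its behaviour can be checked in isolation; it assumes spark-catalyst is on the classpath, and the object name PartitioningSketch and the sample partition counts are illustrative only, not part of the change.

import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition, UnknownPartitioning}

// Stand-alone mirror of the defaultPartitioning helper added to SparkPlan's
// companion object in patch 1 (names here are for illustration only).
object PartitioningSketch {
  def defaultPartitioning(numPartitions: Int): Partitioning = {
    // Exactly one partition can safely be advertised as SinglePartition;
    // otherwise only the partition count is known, so keep UnknownPartitioning
    // but with the true width instead of 0.
    if (numPartitions == 1) SinglePartition
    else UnknownPartitioning(numPartitions)
  }

  def main(args: Array[String]): Unit = {
    assert(defaultPartitioning(1) == SinglePartition)
    assert(defaultPartitioning(8) == UnknownPartitioning(8))
    // The old default reported zero partitions regardless of the actual RDD:
    assert(UnknownPartitioning(0).numPartitions == 0)
    println("partitioning sketch checks passed")
  }
}

The practical payoff of reporting SinglePartition truthfully is that a plan whose output already lives in a single partition can satisfy single-partition distribution requirements without an extra exchange, while UnknownPartitioning never promises any clustering, so carrying the real count is safe but conservative.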