diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index d316174c40211..e44b65378977c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import java.util.concurrent.{Future => JFuture}
+import java.util.concurrent.{Future => JFuture, LinkedBlockingQueue}
 import java.util.concurrent.TimeUnit._
 
 import scala.collection.mutable
@@ -287,9 +287,10 @@ case class RecursiveRelationExec(
 
   override def stringArgs: Iterator[Any] = Iterator(cteName, output)
 
-  private val physicalRecursiveTerms = new mutable.ArrayBuffer[SparkPlan]
+  private val physicalRecursiveTerms = new LinkedBlockingQueue[SparkPlan]
 
-  def recursiveTermIterations: Seq[SparkPlan] = physicalRecursiveTerms.toList
+  def recursiveTermIterations: Seq[SparkPlan] =
+    physicalRecursiveTerms.toArray(Array.empty[SparkPlan])
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -307,14 +308,11 @@ case class RecursiveRelationExec(
 
   override protected def doExecute(): RDD[InternalRow] = {
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    val executionIdLong = if (false) {
-      None
-    } else {
-      Option(executionId).map(_.toLong)
-    }
+    val executionIdLong = Option(executionId).map(_.toLong)
 
     val levelLimit = conf.getConf(SQLConf.CTE_RECURSION_LEVEL_LIMIT)
 
+    // TODO: cache before count, as the RDD can be reused in the next iteration
     var prevIterationRDD = anchorTerm.execute().map(_.copy())
     var prevIterationCount = prevIterationRDD.count()
 
@@ -365,10 +363,11 @@ case class RecursiveRelationExec(
       val physicalRecursiveTerm =
         QueryExecution.prepareExecutedPlan(sqlContext.sparkSession, newLogicalRecursiveTerm)
 
-      physicalRecursiveTerms += physicalRecursiveTerm
+      physicalRecursiveTerms.offer(physicalRecursiveTerm)
 
       executionIdLong.foreach(onUpdatePlan)
 
+      // TODO: cache before count, as the RDD can be reused in the next iteration
      prevIterationRDD = physicalRecursiveTerm.execute().map(_.copy())
      prevIterationCount = prevIterationRDD.count()
 
@@ -385,6 +384,14 @@ case class RecursiveRelationExec(
       sparkContext.union(accumulatedRDDs)
     }
   }
+
+  override def verboseStringWithOperatorId(): String = {
+    s"""
+       |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
+       |${ExplainUtils.generateFieldString("CTE", cteName)}
+       |${ExplainUtils.generateFieldString("Output", output)}
+     """.stripMargin
+  }
 }
 
 /**
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index 776a28bc46039..f2db4b3f9248e 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -1052,19 +1052,25 @@ RecursiveRelation (3)
 
 
 (1) LocalTableScan [codegen id : 1]
-Output: [col1#x]
-
+Output [1]: [col1#x]
+Arguments: [col1#x]
+
 (2) Project [codegen id : 1]
-Output : [col1#x AS level#x]
-Input : [col1#x]
+Output [1]: [col1#x AS level#x]
+Input [1]: [col1#x]
 
 (3) RecursiveRelation
+CTE: r
+Output [1]: [level#x]
 
 (4) RecursiveReference
-
+Arguments: r, [level#x], false, 0
+
 (5) Filter
-
-(6) Project
+Arguments: (level#x < 10)
+
+(6) Project
+Arguments: [(level#x + 1) AS (level + 1)#x]
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/with.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/with.sql.out
index b429d8e867b5f..9d16fe070ce22 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/with.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/with.sql.out
@@ -389,7 +389,7 @@ SHOW CREATE TABLE vsubdepartment AS SERDE
 -- !query schema
 struct
 -- !query output
-CREATE VIEW `vsubdepartment`(
+CREATE VIEW `default`.`vsubdepartment`(
   `id`,
   `parent_department`,
   `name`)
@@ -430,7 +430,7 @@ SHOW CREATE TABLE sums_1_100 AS SERDE
 -- !query schema
 struct
 -- !query output
-CREATE VIEW `sums_1_100`(
+CREATE VIEW `default`.`sums_1_100`(
   `sum(n)`)
 AS WITH RECURSIVE t(n) AS (
     VALUES (1)
@@ -1050,7 +1050,7 @@ SELECT * FROM x
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`id`' given input columns: [default.y.a]; line 2 pos 57
+cannot resolve '`id`' given input columns: [spark_catalog.default.y.a]; line 2 pos 57
 
 
 -- !query