Commit 656995b

fix

peter-toth committed Mar 16, 2020
1 parent feace7b commit 656995b

Showing 3 changed files with 32 additions and 19 deletions.
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import java.util.concurrent.{Future => JFuture}
+import java.util.concurrent.{Future => JFuture, LinkedBlockingQueue}
 import java.util.concurrent.TimeUnit._
 
 import scala.collection.mutable
@@ -287,9 +287,10 @@ case class RecursiveRelationExec(
 
   override def stringArgs: Iterator[Any] = Iterator(cteName, output)
 
-  private val physicalRecursiveTerms = new mutable.ArrayBuffer[SparkPlan]
+  private val physicalRecursiveTerms = new LinkedBlockingQueue[SparkPlan]
 
-  def recursiveTermIterations: Seq[SparkPlan] = physicalRecursiveTerms.toList
+  def recursiveTermIterations: Seq[SparkPlan] =
+    physicalRecursiveTerms.toArray(Array.empty[SparkPlan])
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -307,14 +308,11 @@
 
   override protected def doExecute(): RDD[InternalRow] = {
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    val executionIdLong = if (false) {
-      None
-    } else {
-      Option(executionId).map(_.toLong)
-    }
+    val executionIdLong = Option(executionId).map(_.toLong)
 
     val levelLimit = conf.getConf(SQLConf.CTE_RECURSION_LEVEL_LIMIT)
 
+    // TODO: cache before count, as the RDD can be reused in the next iteration
     var prevIterationRDD = anchorTerm.execute().map(_.copy())
     var prevIterationCount = prevIterationRDD.count()
 
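The new TODO notes that each iteration's RDD is counted and then reused in the next iteration; without caching, that reuse recomputes the whole lineage. A hedged sketch of what the TODO suggests, with illustrative names rather than this commit's code:

import org.apache.spark.sql.SparkSession

object CacheBeforeCount extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

  // cache() before the first action so the count and the later reuse
  // share materialized partitions instead of recomputing the map().
  val rdd = spark.sparkContext.parallelize(1 to 1000).map(_ * 2).cache()
  val n = rdd.count()   // first action materializes the cache
  val s = rdd.sum()     // reuses cached partitions, no recompute
  println(s"count=$n sum=$s")

  spark.stop()
}
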
@@ -365,10 +363,11 @@
 
       val physicalRecursiveTerm =
         QueryExecution.prepareExecutedPlan(sqlContext.sparkSession, newLogicalRecursiveTerm)
 
-      physicalRecursiveTerms += physicalRecursiveTerm
+      physicalRecursiveTerms.offer(physicalRecursiveTerm)
 
       executionIdLong.foreach(onUpdatePlan)
 
+      // TODO: cache before count, as the RDD can be reused in the next iteration
       prevIterationRDD = physicalRecursiveTerm.execute().map(_.copy())
       prevIterationCount = prevIterationRDD.count()
 
@@ -385,6 +384,14 @@
       sparkContext.union(accumulatedRDDs)
     }
   }
+
+  override def verboseStringWithOperatorId(): String = {
+    s"""
+       |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
+       |${ExplainUtils.generateFieldString("CTE", cteName)}
+       |${ExplainUtils.generateFieldString("Output", output)}
+     """.stripMargin
+  }
 }
 
 /**
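
The added verboseStringWithOperatorId override drives the formatted-explain entries visible in explain.sql.out below (the "(3) RecursiveRelation / CTE: r / Output [1]: [level#x]" block). A hedged usage sketch, assuming this development branch's WITH RECURSIVE support (not available in stock Spark):

import org.apache.spark.sql.SparkSession

object ExplainRecursiveCte extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

  // On this branch, EXPLAIN FORMATTED should include the RecursiveRelation
  // entry with its CTE name and output attributes.
  spark.sql(
    """EXPLAIN FORMATTED
      |WITH RECURSIVE r(level) AS (
      |  VALUES (0)
      |  UNION ALL
      |  SELECT level + 1 FROM r WHERE level < 10
      |)
      |SELECT * FROM r""".stripMargin).show(truncate = false)

  spark.stop()
}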
20 changes: 13 additions & 7 deletions sql/core/src/test/resources/sql-tests/results/explain.sql.out

@@ -1052,19 +1052,25 @@ RecursiveRelation (3)
 
 
 (1) LocalTableScan [codegen id : 1]
-Output: [col1#x]
-
+Output [1]: [col1#x]
+Arguments: [col1#x]
+
 (2) Project [codegen id : 1]
-Output : [col1#x AS level#x]
-Input : [col1#x]
+Output [1]: [col1#x AS level#x]
+Input [1]: [col1#x]
 
 (3) RecursiveRelation
+CTE: r
+Output [1]: [level#x]
 
 (4) RecursiveReference
+Arguments: r, [level#x], false, 0
 
 (5) Filter
+Arguments: (level#x < 10)
 
 (6) Project
+Arguments: [(level#x + 1) AS (level + 1)#x]
 
 
 -- !query

@@ -389,7 +389,7 @@ SHOW CREATE TABLE vsubdepartment AS SERDE
 -- !query schema
 struct<createtab_stmt:string>
 -- !query output
-CREATE VIEW `vsubdepartment`(
+CREATE VIEW `default`.`vsubdepartment`(
 `id`,
 `parent_department`,
 `name`)
@@ -430,7 +430,7 @@ SHOW CREATE TABLE sums_1_100 AS SERDE
 -- !query schema
 struct<createtab_stmt:string>
 -- !query output
-CREATE VIEW `sums_1_100`(
+CREATE VIEW `default`.`sums_1_100`(
 `sum(n)`)
 AS WITH RECURSIVE t(n) AS (
 VALUES (1)
@@ -1050,7 +1050,7 @@ SELECT * FROM x
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`id`' given input columns: [default.y.a]; line 2 pos 57
+cannot resolve '`id`' given input columns: [spark_catalog.default.y.a]; line 2 pos 57
 
 
 -- !query
