Skip to content

Commit

Permalink
[SPARK-24497][SQL] fixes exchange coordinator copy
Browse files Browse the repository at this point in the history
Change-Id: Id37676580badfe861558d57781eb0ba78b1752e4
  • Loading branch information
peter-toth committed Jan 16, 2019
1 parent d9cc864 commit 8f9a673
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ShuffleExchangeExec}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.LongType
Expand Down Expand Up @@ -251,8 +251,12 @@ case class RecursiveTableExec(
s"increasing ${SQLConf.RECURSION_LEVEL_LIMIT.key}")
}

val newCoordinators = recursiveTerm.collect {
case ShuffleExchangeExec(_, _, Some(co)) => co
}.toSet[ExchangeCoordinator].map(co => co -> co.copy).toMap
val newRecursiveTerm = recursiveTerm.transform {
case se @ ShuffleExchangeExec(_, _, Some(co)) => se.copy(coordinator = Some(co.copy))
case se @ ShuffleExchangeExec(_, _, Some(co)) =>
se.copy(coordinator = Some(newCoordinators(co)))
}
if (level > 0) {
newRecursiveTerm.reset()
Expand Down

0 comments on commit 8f9a673

Please sign in to comment.