diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 8fa798e863694..ca342a53796a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ShuffleExchangeExec} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.LongType @@ -251,8 +251,12 @@ case class RecursiveTableExec( s"increasing ${SQLConf.RECURSION_LEVEL_LIMIT.key}") } + val newCoordinators = recursiveTerm.collect { + case ShuffleExchangeExec(_, _, Some(co)) => co + }.toSet[ExchangeCoordinator].map(co => co -> co.copy).toMap val newRecursiveTerm = recursiveTerm.transform { - case se @ ShuffleExchangeExec(_, _, Some(co)) => se.copy(coordinator = Some(co.copy)) + case se @ ShuffleExchangeExec(_, _, Some(co)) => + se.copy(coordinator = Some(newCoordinators(co))) } if (level > 0) { newRecursiveTerm.reset()