From 5a0bc96d5f5e42fa5e6ea9d024da572343f239a9 Mon Sep 17 00:00:00 2001 From: xieshuaihu Date: Wed, 17 Jan 2024 15:24:58 +0900 Subject: [PATCH] [SPARK-46732][CONNECT][3.5] Make Subquery/Broadcast thread work with Connect's artifact management ### What changes were proposed in this pull request? Similar with SPARK-44794, propagate JobArtifactState to broadcast/subquery thread. This is an example: ```scala val add1 = udf((i: Long) => i + 1) val tableA = spark.range(2).alias("a") val tableB = broadcast(spark.range(2).select(add1(col("id")).alias("id"))).alias("b") tableA.join(tableB). where(col("a.id")===col("b.id")). select(col("a.id").alias("a_id"), col("b.id").alias("b_id")). collect(). mkString("[", ", ", "]") ``` Before this pr, this example will throw exception `ClassNotFoundException`. Subquery and Broadcast execution use a separate ThreadPool which don't have the `JobArtifactState`. ### Why are the changes needed? Fix bug. Make Subquery/Broadcast thread work with Connect's artifact management. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add a new test to `ReplE2ESuite` ### Was this patch authored or co-authored using generative AI tooling? No Closes #44763 from xieshuaihu/SPARK-46732backport. Authored-by: xieshuaihu Signed-off-by: Hyukjin Kwon --- .../spark/sql/application/ReplE2ESuite.scala | 16 ++++++++++++++++ .../spark/sql/execution/SQLExecution.scala | 5 +++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala index 5bb8cbf3543b0..9d61b4d56e1ed 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala @@ -362,4 +362,20 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach { val output = runCommandsInShell(input) assertContains("noException: Boolean = true", output) } + + test("broadcast works with REPL generated code") { + val input = + """ + |val add1 = udf((i: Long) => i + 1) + |val tableA = spark.range(2).alias("a") + |val tableB = broadcast(spark.range(2).select(add1(col("id")).alias("id"))).alias("b") + |tableA.join(tableB). + | where(col("a.id")===col("b.id")). + | select(col("a.id").alias("a_id"), col("b.id").alias("b_id")). + | collect(). + | mkString("[", ", ", "]") + |""".stripMargin + val output = runCommandsInShell(input) + assertContains("""String = "[[1,1]]"""", output) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index daeac699c2791..b4cbb61352235 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future => JFuture} import java.util.concurrent.atomic.AtomicLong -import org.apache.spark.{ErrorMessageFormat, SparkContext, SparkThrowable, SparkThrowableHelper} +import org.apache.spark.{ErrorMessageFormat, JobArtifactSet, SparkContext, SparkThrowable, SparkThrowableHelper} import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX} import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.sql.SparkSession @@ -215,7 +215,8 @@ object SQLExecution { val activeSession = sparkSession val sc = sparkSession.sparkContext val localProps = Utils.cloneProperties(sc.getLocalProperties) - exec.submit(() => { + val artifactState = JobArtifactSet.getCurrentJobArtifactState.orNull + exec.submit(() => JobArtifactSet.withActiveJobArtifactState(artifactState) { val originalSession = SparkSession.getActiveSession val originalLocalProps = sc.getLocalProperties SparkSession.setActiveSession(activeSession)