Skip to content

Commit

Permalink
[SPARK-46732][CONNECT][3.5] Make Subquery/Broadcast thread work with …
Browse files Browse the repository at this point in the history
…Connect's artifact management

### What changes were proposed in this pull request?

Similar with SPARK-44794, propagate JobArtifactState to broadcast/subquery thread.

This is an example:

```scala
val add1 = udf((i: Long) => i + 1)
val tableA = spark.range(2).alias("a")
val tableB = broadcast(spark.range(2).select(add1(col("id")).alias("id"))).alias("b")
tableA.join(tableB).
  where(col("a.id")===col("b.id")).
  select(col("a.id").alias("a_id"), col("b.id").alias("b_id")).
  collect().
  mkString("[", ", ", "]")
```

Before this pr, this example will throw exception `ClassNotFoundException`. Subquery and Broadcast execution use a separate ThreadPool which don't have the `JobArtifactState`.

### Why are the changes needed?
Fix bug. Make Subquery/Broadcast thread work with Connect's artifact management.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add a new test to `ReplE2ESuite`

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44763 from xieshuaihu/SPARK-46732backport.

Authored-by: xieshuaihu <xieshuaihu@agora.io>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
xieshuaihu authored and HyukjinKwon committed Jan 17, 2024
1 parent 10d5d89 commit 5a0bc96
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -362,4 +362,20 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach {
val output = runCommandsInShell(input)
assertContains("noException: Boolean = true", output)
}

test("broadcast works with REPL generated code") {
val input =
"""
|val add1 = udf((i: Long) => i + 1)
|val tableA = spark.range(2).alias("a")
|val tableB = broadcast(spark.range(2).select(add1(col("id")).alias("id"))).alias("b")
|tableA.join(tableB).
| where(col("a.id")===col("b.id")).
| select(col("a.id").alias("a_id"), col("b.id").alias("b_id")).
| collect().
| mkString("[", ", ", "]")
|""".stripMargin
val output = runCommandsInShell(input)
assertContains("""String = "[[1,1]]"""", output)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future => JFuture}
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.{ErrorMessageFormat, SparkContext, SparkThrowable, SparkThrowableHelper}
import org.apache.spark.{ErrorMessageFormat, JobArtifactSet, SparkContext, SparkThrowable, SparkThrowableHelper}
import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX}
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.sql.SparkSession
Expand Down Expand Up @@ -215,7 +215,8 @@ object SQLExecution {
val activeSession = sparkSession
val sc = sparkSession.sparkContext
val localProps = Utils.cloneProperties(sc.getLocalProperties)
exec.submit(() => {
val artifactState = JobArtifactSet.getCurrentJobArtifactState.orNull
exec.submit(() => JobArtifactSet.withActiveJobArtifactState(artifactState) {
val originalSession = SparkSession.getActiveSession
val originalLocalProps = sc.getLocalProperties
SparkSession.setActiveSession(activeSession)
Expand Down

0 comments on commit 5a0bc96

Please sign in to comment.