Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-46732][CONNECT][3.5]Make Subquery/Broadcast thread work with Connect's artifact management #44763

Closed
wants to merge 1 commit into from

Conversation

xieshuaihu
Copy link

What changes were proposed in this pull request?

Similar with SPARK-44794, propagate JobArtifactState to broadcast/subquery thread.

This is an example:

val add1 = udf((i: Long) => i + 1)
val tableA = spark.range(2).alias("a")
val tableB = broadcast(spark.range(2).select(add1(col("id")).alias("id"))).alias("b")
tableA.join(tableB).
  where(col("a.id")===col("b.id")).
  select(col("a.id").alias("a_id"), col("b.id").alias("b_id")).
  collect().
  mkString("[", ", ", "]")

Before this pr, this example will throw exception ClassNotFoundException. Subquery and Broadcast execution use a separate ThreadPool which don't have the JobArtifactState.

Why are the changes needed?

Fix bug. Make Subquery/Broadcast thread work with Connect's artifact management.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Add a new test to ReplE2ESuite

Was this patch authored or co-authored using generative AI tooling?

No

@xieshuaihu
Copy link
Author

@HyukjinKwon

@HyukjinKwon
Copy link
Member

Merged to branch-3.5.

HyukjinKwon pushed a commit that referenced this pull request Jan 17, 2024
…Connect's artifact management

### What changes were proposed in this pull request?

Similar with SPARK-44794, propagate JobArtifactState to broadcast/subquery thread.

This is an example:

```scala
val add1 = udf((i: Long) => i + 1)
val tableA = spark.range(2).alias("a")
val tableB = broadcast(spark.range(2).select(add1(col("id")).alias("id"))).alias("b")
tableA.join(tableB).
  where(col("a.id")===col("b.id")).
  select(col("a.id").alias("a_id"), col("b.id").alias("b_id")).
  collect().
  mkString("[", ", ", "]")
```

Before this pr, this example will throw exception `ClassNotFoundException`. Subquery and Broadcast execution use a separate ThreadPool which don't have the `JobArtifactState`.

### Why are the changes needed?
Fix bug. Make Subquery/Broadcast thread work with Connect's artifact management.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add a new test to `ReplE2ESuite`

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44763 from xieshuaihu/SPARK-46732backport.

Authored-by: xieshuaihu <xieshuaihu@agora.io>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
2 participants