Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-42034] QueryExecutionListener and Observation API do not work with foreach / reduce / foreachPartition action. #39976

Closed
wants to merge 1 commit into from

Conversation

zzzzming95
Copy link
Contributor

What changes were proposed in this pull request?

Add the name parameter for 'foreach'/'reduce'/'foreachPartition' operators in DataSet#withNewRDDExecutionId. Because the QueryExecutionListener and Observation API is triggered only when the operators have the name parameter.

e.executionName.isDefined && e.qe.sparkSession.sessionUUID == sessionUUID

Why are the changes needed?

The QueryExecutionListener and Observation API is triggered only when the operators have the name parameter.

Does this PR introduce any user-facing change?

No

How was this patch tested?

add two unit test.

@github-actions github-actions bot added the SQL label Feb 11, 2023
@@ -960,6 +960,19 @@ class DatasetSuite extends QueryTest
observe(spark.range(1, 10, 1, 11), Map("percentile_approx_val" -> 5))
}

test("observation on datasets when a DataSet trigger foreach action") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
test("observation on datasets when a DataSet trigger foreach action") {
test("SPARK-42034: observation on datasets when a DataSet trigger foreach action") {

@@ -96,6 +96,34 @@ class DataFrameCallbackSuite extends QueryTest
spark.listenerManager.unregister(listener)
}

test("execute callback functions when a DataSet trigger foreach action finished") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
test("execute callback functions when a DataSet trigger foreach action finished") {
test("SPARK-42034: execute callback functions when a DataSet trigger foreach action finished") {

assert(metrics(0)._1 == "foreach")
assert(metrics(1)._1 == "reduce")

spark.listenerManager.unregister(listener)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add this into finally so the test failure of this doesn't affect other tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know other tests don't. but let's at least do it here.

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM from my end. cc @hvanhovell and @beliefer FYI

@beliefer
Copy link
Contributor

@HyukjinKwon Thank you for ping me.

Copy link
Contributor

@beliefer beliefer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM too.

@HyukjinKwon
Copy link
Member

Merged to master.

@zzzzming95
Copy link
Contributor Author

Thanks @HyukjinKwon @beliefer

@vidma
Copy link

vidma commented Oct 9, 2023

this wouldn't fix the .observe() not working in df.write.jdbc() because that one uses repartitionedDF.rdd.foreachPartition (i.e. with extra .rdd)!
I think we need to remove .rdd usage from JdbcUtils.scala and make any other changes if necessary.

@zzzzming95 @HyukjinKwon @beliefer

https://github.com/zzzzming95/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L901-L903

@HyukjinKwon
Copy link
Member

Thanks. Made a followup: #43304

HyukjinKwon added a commit that referenced this pull request Oct 10, 2023
…eachPartition in JdbcUtils

### What changes were proposed in this pull request?

This PR is kind of a followup for #39976 that addresses #39976 (comment) comment.

### Why are the changes needed?

In order to probably assign the SQL execution ID so `df.observe` works with this.

### Does this PR introduce _any_ user-facing change?

Yes. `df.observe` will work with JDBC connectors.

### How was this patch tested?

Manually tested.

### Was this patch authored or co-authored using generative AI tooling?

Unit test was added.

Closes #43304 from HyukjinKwon/foreachbatch.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon added a commit that referenced this pull request Oct 10, 2023
…eachPartition in JdbcUtils

This PR is kind of a followup for #39976 that addresses #39976 (comment) comment.

In order to probably assign the SQL execution ID so `df.observe` works with this.

Yes. `df.observe` will work with JDBC connectors.

Manually tested.

Unit test was added.

Closes #43304 from HyukjinKwon/foreachbatch.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 39cc4ab)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon added a commit that referenced this pull request Oct 11, 2023
…D.foreachPartition in JdbcUtils

This PR cherry-picks #43304 to branch-3.5

---

### What changes were proposed in this pull request?

This PR is kind of a followup for #39976 that addresses #39976 (comment) comment.

### Why are the changes needed?

In order to probably assign the SQL execution ID so `df.observe` works with this.

### Does this PR introduce _any_ user-facing change?

Yes. `df.observe` will work with JDBC connectors.

### How was this patch tested?

Manually tested.

### Was this patch authored or co-authored using generative AI tooling?

Unit test was added.

Closes #43322 from HyukjinKwon/SPARK-45475-3.5.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
4 participants