New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-43952][SQL][FOLLOWUP] Correct AQE cancel broadcast job tag #41979
Conversation
ee9c1c6
to
afda62c
Compare
*/ | ||
def runId: UUID = UUID.randomUUID | ||
@deprecated("Use jobTag", "3.5.0") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this annotation necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason is that, the semantics of runId lightly changed. Before it represents the job id, and now it is the unique id in job tag. But I'm fine to remove it if you think it is unnecessary.
afda62c
to
eb280a9
Compare
@transient | ||
val runId: UUID = UUID.randomUUID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before transient was on jobTag
, which is deterministically derived from runId
. Now as the runId
is transient, will it not cause a new UUID.randomUUID to be regenerated if this classes is serialized and deserialized?
I don't have full context of the implications so defer to @HyukjinKwon and @cloud-fan .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is interesting.. Let me remove it for safe.
@@ -250,7 +250,7 @@ case class BroadcastQueryStageExec( | |||
|
|||
override def cancel(): Unit = { | |||
if (!broadcast.relationFuture.isDone) { | |||
sparkContext.cancelJobGroup(broadcast.runId.toString) | |||
sparkContext.cancelJobsWithTag(broadcast.jobTag) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm,
currently BroadcastExchangeExec.doBroacast only does cancelJobsWithTag when it catches a TimeoutException.
Should we cancel there for broader exceptions (even any Throwable), and then broadcast.relationFuture.cancel(true)
here should throw there and cancel there, and this place does not have to cancel separately?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When AQE materialize broadcast relation, it does not call BroadcastExchangeExec.doBroacast
, so it has no affect to cache exception in BroadcastExchangeExec.doBroacast
.
cc @yaooqinn @juliuszsompolski @HyukjinKwon @cloud-fan any more comments? I think this should go to branch-3.5 |
@ulysses-you wanna try merging this PR? |
@HyukjinKwon sure, let me try to merge it carefully |
### What changes were proposed in this pull request? This pr changes `cancelJobGroup` to `cancelJobsWithTag ` in AQE, so that broadcast exchange can be cancelled correctly. Since we do not set job id when executing broadcast job and use job tag to cancel it, this pr adds `jobTag` to `BroadcastExchangeLike`. ### Why are the changes needed? fix regression ### Does this PR introduce _any_ user-facing change? no, not released yet ### How was this patch tested? test manully ```sql select * from t1 join (select c1, java_method('java.lang.Thread', 'sleep', 5000l) from t2)t2 on t1.c1 = t2.c1 join (select c1, raise_error('force_fail') from t3)t3 on t1.c1 = t3.c1 ``` before: <img width="1194" alt="image" src="https://github.com/apache/spark/assets/12025282/55d218da-7289-404a-b201-1ea9f4902026"> after: <img width="1202" alt="image" src="https://github.com/apache/spark/assets/12025282/9b293d1f-01d6-43e2-9c1a-20540f58c3e5"> Closes #41979 from ulysses-you/jobtag-followup. Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: Xiduo You <ulyssesyou@apache.org> (cherry picked from commit 99f9df5) Signed-off-by: Xiduo You <ulyssesyou@apache.org>
thank you all, merged to master/branch-3.5 |
### What changes were proposed in this pull request? This pr changes `cancelJobGroup` to `cancelJobsWithTag ` in AQE, so that broadcast exchange can be cancelled correctly. Since we do not set job id when executing broadcast job and use job tag to cancel it, this pr adds `jobTag` to `BroadcastExchangeLike`. ### Why are the changes needed? fix regression ### Does this PR introduce _any_ user-facing change? no, not released yet ### How was this patch tested? test manully ```sql select * from t1 join (select c1, java_method('java.lang.Thread', 'sleep', 5000l) from t2)t2 on t1.c1 = t2.c1 join (select c1, raise_error('force_fail') from t3)t3 on t1.c1 = t3.c1 ``` before: <img width="1194" alt="image" src="https://github.com/apache/spark/assets/12025282/55d218da-7289-404a-b201-1ea9f4902026"> after: <img width="1202" alt="image" src="https://github.com/apache/spark/assets/12025282/9b293d1f-01d6-43e2-9c1a-20540f58c3e5"> Closes apache#41979 from ulysses-you/jobtag-followup. Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: Xiduo You <ulyssesyou@apache.org>
What changes were proposed in this pull request?
This pr changes
cancelJobGroup
tocancelJobsWithTag
in AQE, so that broadcast exchange can be cancelled correctly.Since we do not set job id when executing broadcast job and use job tag to cancel it, this pr adds
jobTag
toBroadcastExchangeLike
.Why are the changes needed?
fix regression
Does this PR introduce any user-facing change?
no, not released yet
How was this patch tested?
test manully
before:
after: