[SPARK-31475][SQL] Broadcast stage in AQE did not timeout #28250

Closed
maryannxue wants to merge 2 commits into apache:master from maryannxue:aqe-broadcast-timeout

@maryannxue
Contributor

What changes were proposed in this pull request?

This PR adds a timeout for the Future of a BroadcastQueryStageExec to make sure it can have the same timeout behavior as a non-AQE broadcast exchange.
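
As a rough illustration of the idea described above (not the code in this patch), a Future can be given a timeout by racing it against a scheduled task through a shared Promise. In this minimal sketch, `BroadcastTimeoutSketch`, `withTimeout`, and `timeoutSecs` are hypothetical names, and a plain `RuntimeException` stands in for Spark's `SparkException`.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import scala.concurrent.{ExecutionContext, Future, Promise}

object BroadcastTimeoutSketch {
  // Hypothetical helper, not a Spark API: returns a Future that fails
  // if `work` has not completed within `timeoutSecs` seconds.
  def withTimeout[T](
      work: Future[T],
      timeoutSecs: Long,
      scheduler: ScheduledExecutorService)(implicit ec: ExecutionContext): Future[T] = {
    val promise = Promise[T]()

    // Schedule the timeout; tryFailure is a no-op if the promise is already completed.
    val timeoutTask = scheduler.schedule(new Runnable {
      override def run(): Unit = {
        // RuntimeException is an assumption standing in for SparkException.
        promise.tryFailure(
          new RuntimeException(s"Could not execute broadcast in $timeoutSecs secs."))
      }
    }, timeoutSecs, TimeUnit.SECONDS)

    // When the real work finishes first, cancel the timer and forward the result.
    work.onComplete { result =>
      timeoutTask.cancel(false)
      promise.tryComplete(result)
    }

    promise.future
  }
}
```

Whichever side completes the promise first wins, so a late result from `work` is silently ignored after a timeout. Note that this sketch only fails the returned Future; it does not stop the underlying work.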

Why are the changes needed?

This is to make the broadcast timeout behavior in AQE consistent with that in non-AQE.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added a unit test.

@maryannxue
Contributor Author

@cloud-fan @Ngone51 @JkSelf

@SparkQA
SparkQA commented Apr 18, 2020

Test build #121429 has finished for PR 28250 at commit 8b4134a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val e = intercept[Exception] {
  testDf.collect()
}
AdaptiveTestUtils.assertExceptionMessage(e, s"Could not execute broadcast in $timeout secs.")
Contributor

So this test runs for 30 seconds? Can we make it a bit shorter?

Contributor

@cloud-fan cloud-fan left a comment

Good catch!

override def run(): Unit = {
  promise.tryFailure(new SparkException(s"Could not execute broadcast in $timeout secs. " +
    s"You can increase the timeout for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or " +
    s"disable broadcast join by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1"))
Member

Should this cancel the job group as BroadcastExchangeExec does?

Contributor Author

This is already handled by the AQE mechanism: once the timeout fires, it becomes a StageFailure event in the AQE event queue, which triggers a cleanup that calls the cancel() routine of each running query stage (including the broadcast stage that has timed out). A broadcast stage's cancel() stops the broadcast thread as well as the job group.
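
The flow described here can be pictured with a toy model. The sketch below is not Spark's AQE code: `AqeCleanupSketch`, `StageEvent`, `ToyStage`, and `runLoop` are hypothetical names, and the stage only records that `cancel()` was called instead of stopping a thread or a job group.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

object AqeCleanupSketch {
  // Hypothetical events; the names only loosely follow the comment above.
  sealed trait StageEvent
  final case class StageSuccess(stageId: Int) extends StageEvent
  final case class StageFailure(stageId: Int, error: Throwable) extends StageEvent

  // Toy stand-in for a query stage: cancel() just sets a flag.
  final class ToyStage(val id: Int) {
    @volatile var cancelled = false
    def cancel(): Unit = { cancelled = true }
  }

  // Drains the event queue until all stages succeed or one fails.
  // On failure, every stage that is still running is cancelled,
  // including the one that reported the failure.
  def runLoop(
      stages: Seq[ToyStage],
      events: LinkedBlockingQueue[StageEvent]): Option[Throwable] = {
    val running = mutable.Map(stages.map(s => s.id -> s): _*)
    while (running.nonEmpty) {
      events.take() match {
        case StageSuccess(id) =>
          running.remove(id)
        case StageFailure(_, error) =>
          running.values.foreach(_.cancel())
          return Some(error)
      }
    }
    None
  }
}
```

In this model a broadcast timeout would simply be posted as a `StageFailure` for the broadcast stage, so no separate cancellation path is needed in the timeout task itself.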

Member

I see, thanks for your explanation :)

@SparkQA
SparkQA commented Apr 20, 2020

Test build #121534 has finished for PR 28250 at commit 65ee658.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

Thanks! Merged to master/3.0

gatorsmile pushed a commit that referenced this pull request Apr 20, 2020
Closes #28250 from maryannxue/aqe-broadcast-timeout.

Authored-by: Maryann Xue <maryann.xue@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
(cherry picked from commit 44d370d)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>