Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-35874][SQL] AQE Shuffle should wait for its subqueries to finish before materializing #33058

Closed
wants to merge 1 commit into from

Conversation

cloud-fan
Copy link
Contributor

@cloud-fan cloud-fan commented Jun 24, 2021

What changes were proposed in this pull request?

Currently, AQE uses a very tricky way to trigger and wait for the subqueries:

  1. submitting stage calls QueryStageExec.materialize
  2. QueryStageExec.materialize calls executeQuery
  3. executeQuery does some preparation works, which goes to QueryStageExec.doPrepare
  4. QueryStageExec.doPrepare calls prepare of shuffle/broadcast, which triggers all the subqueries in this stage
  5. executeQuery then calls waitForSubqueries, which does nothing because QueryStageExec itself has no subqueries
  6. then we submit the shuffle/broadcast job, without waiting for subqueries
  7. for ShuffleExchangeExec.mapOutputStatisticsFuture, it calls child.execute, which calls executeQuery and wait for subqueries in the query tree of child
  8. The only missing case is: ShuffleExchangeExec itself may contain subqueries(repartition expression) and AQE doesn't wait for it.

A simple fix would be overwriting waitForSubqueries in QueryStageExec, and forward the request to shuffle/broadcast, but this PR proposes a different and probably cleaner way: we follow execute/doExecute in SparkPlan, and add similar APIs in the AQE version of "execute", which gets a future from shuffle/broadcast.

Why are the changes needed?

bug fix

Does this PR introduce any user-facing change?

a query fails without the fix and can run now

How was this patch tested?

new test

@github-actions github-actions bot added the SQL label Jun 24, 2021
@cloud-fan
Copy link
Contributor Author

@SparkQA
Copy link

SparkQA commented Jun 24, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44792/

@SparkQA
Copy link

SparkQA commented Jun 24, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44792/

@@ -58,7 +58,11 @@ trait BroadcastExchangeLike extends Exchange {
* For registering callbacks on `relationFuture`.
* Note that calling this method may not start the execution of broadcast job.
*/
def completionFuture: scala.concurrent.Future[broadcast.Broadcast[Any]]
final def submitBroadcastJob: scala.concurrent.Future[broadcast.Broadcast[Any]] = executeQuery {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of the AQE version of "execute", as AQE won't call execute of shuffle/broadcast.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some comments for this method?

@SparkQA
Copy link

SparkQA commented Jun 24, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44795/

@SparkQA
Copy link

SparkQA commented Jun 24, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44795/

@SparkQA
Copy link

SparkQA commented Jun 24, 2021

Test build #140263 has finished for PR 33058 at commit 1f7e6c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 24, 2021

Test build #140265 has finished for PR 33058 at commit c964743.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 1, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45015/

@SparkQA
Copy link

SparkQA commented Jul 1, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45017/

@SparkQA
Copy link

SparkQA commented Jul 1, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45017/

@SparkQA
Copy link

SparkQA commented Jul 1, 2021

Test build #140503 has finished for PR 33058 at commit c964743.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 7, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45276/

@SparkQA
Copy link

SparkQA commented Jul 7, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45276/

@SparkQA
Copy link

SparkQA commented Jul 7, 2021

Test build #140764 has finished for PR 33058 at commit 6a7e388.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yaooqinn
Copy link
Member

yaooqinn commented Jul 8, 2021

LGTM

@cloud-fan cloud-fan closed this in 2df67a1 Jul 8, 2021
cloud-fan added a commit that referenced this pull request Jul 8, 2021
…sh before materializing

### What changes were proposed in this pull request?

Currently, AQE uses a very tricky way to trigger and wait for the subqueries:
1. submitting stage calls `QueryStageExec.materialize`
2. `QueryStageExec.materialize` calls `executeQuery`
3. `executeQuery` does some preparation works, which goes to `QueryStageExec.doPrepare`
4. `QueryStageExec.doPrepare` calls `prepare` of shuffle/broadcast, which triggers all the subqueries in this stage
5. `executeQuery` then calls `waitForSubqueries`, which does nothing because `QueryStageExec` itself has no subqueries
6. then we submit the shuffle/broadcast job, without waiting for subqueries
7. for `ShuffleExchangeExec.mapOutputStatisticsFuture`, it calls `child.execute`, which calls `executeQuery` and wait for subqueries in the query tree of `child`
8. The only missing case is: `ShuffleExchangeExec` itself may contain subqueries(repartition expression) and AQE doesn't wait for it.

A simple fix would be overwriting `waitForSubqueries` in `QueryStageExec`, and forward the request to shuffle/broadcast, but this PR proposes a different and probably cleaner way: we follow `execute`/`doExecute` in `SparkPlan`, and add similar APIs in the AQE version of "execute", which gets a future from shuffle/broadcast.

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

a query fails without the fix and can run now

### How was this patch tested?

new test

Closes #33058 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 2df67a1)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan
Copy link
Contributor Author

thanks for the review, merging to master/3.2!

domybest11 pushed a commit to domybest11/spark that referenced this pull request Jun 15, 2022
…sh before materializing

Currently, AQE uses a very tricky way to trigger and wait for the subqueries:
1. submitting stage calls `QueryStageExec.materialize`
2. `QueryStageExec.materialize` calls `executeQuery`
3. `executeQuery` does some preparation works, which goes to `QueryStageExec.doPrepare`
4. `QueryStageExec.doPrepare` calls `prepare` of shuffle/broadcast, which triggers all the subqueries in this stage
5. `executeQuery` then calls `waitForSubqueries`, which does nothing because `QueryStageExec` itself has no subqueries
6. then we submit the shuffle/broadcast job, without waiting for subqueries
7. for `ShuffleExchangeExec.mapOutputStatisticsFuture`, it calls `child.execute`, which calls `executeQuery` and wait for subqueries in the query tree of `child`
8. The only missing case is: `ShuffleExchangeExec` itself may contain subqueries(repartition expression) and AQE doesn't wait for it.

A simple fix would be overwriting `waitForSubqueries` in `QueryStageExec`, and forward the request to shuffle/broadcast, but this PR proposes a different and probably cleaner way: we follow `execute`/`doExecute` in `SparkPlan`, and add similar APIs in the AQE version of "execute", which gets a future from shuffle/broadcast.

bug fix

a query fails without the fix and can run now

new test

Closes apache#33058 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 2df67a1)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
wangyum pushed a commit that referenced this pull request May 26, 2023
…eries to finish before materializing (#998)

* [SPARK-35874][SQL] AQE Shuffle should wait for its subqueries to finish before materializing

Currently, AQE uses a very tricky way to trigger and wait for the subqueries:
1. submitting stage calls `QueryStageExec.materialize`
2. `QueryStageExec.materialize` calls `executeQuery`
3. `executeQuery` does some preparation works, which goes to `QueryStageExec.doPrepare`
4. `QueryStageExec.doPrepare` calls `prepare` of shuffle/broadcast, which triggers all the subqueries in this stage
5. `executeQuery` then calls `waitForSubqueries`, which does nothing because `QueryStageExec` itself has no subqueries
6. then we submit the shuffle/broadcast job, without waiting for subqueries
7. for `ShuffleExchangeExec.mapOutputStatisticsFuture`, it calls `child.execute`, which calls `executeQuery` and wait for subqueries in the query tree of `child`
8. The only missing case is: `ShuffleExchangeExec` itself may contain subqueries(repartition expression) and AQE doesn't wait for it.

A simple fix would be overwriting `waitForSubqueries` in `QueryStageExec`, and forward the request to shuffle/broadcast, but this PR proposes a different and probably cleaner way: we follow `execute`/`doExecute` in `SparkPlan`, and add similar APIs in the AQE version of "execute", which gets a future from shuffle/broadcast.

bug fix

a query fails without the fix and can run now

new test

Closes #33058 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 2df67a1)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* fix ut

Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants