[SPARK-37023][CORE] Avoid fetching merge status when shuffleMergeEnabled is false for a shuffleDependency during retry #34461

Closed
wants to merge 5 commits into from

rmcyang
Contributor

@rmcyang rmcyang commented Nov 1, 2021

What changes were proposed in this pull request?

At a high level, this PR adds a helper method, getMapSizesByExecutorIdImpl, on which both getMapSizesByExecutorId and getPushBasedShuffleMapSizesByExecutorId rely. The helper takes a parameter, useMergeResult, that indicates whether merge results need to be fetched, and passes it to getStatuses as canFetchMergeResult.
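
The delegation described above can be sketched roughly as follows. This is a simplified toy model, not Spark's actual code: the `Statuses` and `MapSizes` types, the string-based statuses, and the rule that batch fetch is disabled whenever merge statuses are used are assumptions made for illustration. Only the method and parameter names come from the PR description.

```scala
object MapSizesSketch {
  // Hypothetical, simplified stand-ins for Spark's status and result types.
  final case class Statuses(mapStatuses: Seq[String], mergeStatuses: Option[Seq[String]])
  final case class MapSizes(blocks: Seq[String], enableBatchFetch: Boolean)

  // Toy data: what the driver has recorded for one shuffle.
  private val recordedMapStatuses = Seq("map-0", "map-1")
  private val recordedMergeStatuses = Seq("merged-0")

  // Merge statuses are returned only when the caller allows it.
  def getStatuses(canFetchMergeResult: Boolean): Statuses =
    Statuses(
      recordedMapStatuses,
      if (canFetchMergeResult) Some(recordedMergeStatuses) else None)

  // Shared helper: useMergeResult is forwarded as canFetchMergeResult.
  private def getMapSizesByExecutorIdImpl(useMergeResult: Boolean): MapSizes = {
    val statuses = getStatuses(canFetchMergeResult = useMergeResult)
    val merged = statuses.mergeStatuses.getOrElse(Seq.empty)
    // Assumption of this sketch: batch fetch is possible only without merge statuses.
    MapSizes(merged ++ statuses.mapStatuses, enableBatchFetch = merged.isEmpty)
  }

  // Regular path: never asks for merge results, so the assertion holds.
  def getMapSizesByExecutorId(): MapSizes = {
    val result = getMapSizesByExecutorIdImpl(useMergeResult = false)
    assert(result.enableBatchFetch)
    result
  }

  // Push-based path: allowed to use merge results.
  def getPushBasedShuffleMapSizesByExecutorId(): MapSizes =
    getMapSizesByExecutorIdImpl(useMergeResult = true)
}
```

In this model the regular path returns only map statuses even though merge statuses are recorded, while the push-based path sees both.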

Why are the changes needed?

In some stage retry cases, shuffleDependency.shuffleMergeEnabled can be set to false while mergeStatus entries still exist, because the driver has already collected the merged statuses for that shuffle dependency. In that case, the current implementation sets enableBatchFetch to false because merge statuses are present, which causes the following assertion in MapOutputTracker.getMapSizesByExecutorId to fail:

assert(mapSizesByExecutorId.enableBatchFetch == true)

The proposed fix helps resolve the issue.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Passed the existing UTs.

…false for a shuffleDependency during retry
@github-actions github-actions bot added the CORE label Nov 1, 2021
@rmcyang
Contributor Author

rmcyang commented Nov 1, 2021

cc @mridulm @Ngone51 @venkata91 @zhouyejoe Please take a look. Thanks.

@HyukjinKwon HyukjinKwon changed the title SPARK-37023: Avoid fetching merge status when shuffleMergeEnabled is false for a shuffleDependency during retry [SPARK-37023][CORE] Avoid fetching merge status when shuffleMergeEnabled is false for a shuffleDependency during retry Nov 2, 2021
Contributor

@mridulm mridulm left a comment

Had a minor comment.
The change looks good to me.

+CC @Ngone51

@mridulm
Contributor

mridulm commented Nov 3, 2021

Ok to test

@SparkQA

SparkQA commented Nov 3, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49354/

@SparkQA

SparkQA commented Nov 3, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49354/

@SparkQA

SparkQA commented Nov 3, 2021

Test build #144884 has finished for PR 34461 at commit b1cca0c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@dongjoon-hyun dongjoon-hyun left a comment

Since this looks like a bug, could you add a unit test case please, @rmcyang ?

@rmcyang
Contributor Author

rmcyang commented Nov 5, 2021

Added unit test. cc @mridulm @dongjoon-hyun @zhouyejoe

@SparkQA

SparkQA commented Nov 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49412/

@SparkQA

SparkQA commented Nov 5, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49412/

@SparkQA

SparkQA commented Nov 5, 2021

Test build #144940 has finished for PR 34461 at commit 9c399b4.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@Ngone51 Ngone51 left a comment

I'm thinking that this condition may be wrong:

if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
  val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
    handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)

For a retried map stage, we don't push and merge shuffle blocks for its tasks because dep.shuffleMergeEnabled=false. However, dep.shuffleMergeEnabled=false should not be the reason for the reduce stage to skip fetching the existing merged shuffle data from the previous attempt of the map stage. Right?

@rmcyang
Contributor Author

rmcyang commented Nov 8, 2021

I'm thinking that this condition may be wrong:

if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
  val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
    handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)

For a retried map stage, we don't push and merge shuffle blocks for its tasks because dep.shuffleMergeEnabled=false. However, dep.shuffleMergeEnabled=false should not be the reason for the reduce stage to skip fetching the existing merged shuffle data from the previous attempt of the map stage. Right?

IIUC, a retried map stage takes the code path below because dep.shuffleMergeEnabled=false:

} else {
  val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
    handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)

Prior to this PR, getMapSizesByExecutorId would call getPushBasedShuffleMapSizesByExecutorId, which would fetch mergeOutputStatuses in getStatuses even for a retried map stage where push-based shuffle is disabled. This in turn sets enableBatchFetch=false in convertMapStatuses, which results in the assertion failure.

The proposed change avoids this behavior: when dep.shuffleMergeEnabled=false, getMapSizesByExecutorId passes canFetchMergeResult=false to getStatuses so that mergeOutputStatuses are not fetched in this case. @Ngone51
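
To make the before and after behavior concrete, here is a minimal sketch under stated assumptions. The functions below are toy stand-ins that borrow names from the discussion; their signatures, the string-based statuses, and the use of a boolean flag to switch between pre-fix and post-fix behavior are all hypothetical and do not mirror Spark's real API.

```scala
import scala.util.Try

object RetryAssertionSketch {
  // Toy driver state: merge statuses collected during the first attempt.
  val collectedMergeStatuses: Seq[String] = Seq("merged-partition-0")

  // Stand-in for getStatuses: returns merge statuses only when allowed.
  def getStatuses(canFetchMergeResult: Boolean): Seq[String] =
    if (canFetchMergeResult) collectedMergeStatuses else Seq.empty

  // Stand-in for convertMapStatuses: yields the enableBatchFetch flag,
  // which this sketch assumes is false whenever merge statuses are present.
  def convertMapStatuses(mergeStatuses: Seq[String]): Boolean =
    mergeStatuses.isEmpty

  // Non-push path. The flag models behavior before (true) and after (false) the fix.
  def getMapSizesByExecutorId(canFetchMergeResult: Boolean): Boolean = {
    val enableBatchFetch = convertMapStatuses(getStatuses(canFetchMergeResult))
    assert(enableBatchFetch, "batch fetch must stay enabled on the non-push path")
    enableBatchFetch
  }

  // Before the fix: leftover merge statuses are fetched and the assertion trips.
  val beforeFix: Try[Boolean] = Try(getMapSizesByExecutorId(canFetchMergeResult = true))

  // After the fix: merge statuses are skipped and the assertion holds.
  val afterFix: Try[Boolean] = Try(getMapSizesByExecutorId(canFetchMergeResult = false))
}
```

Here `beforeFix` captures the assertion error as a failure, while `afterFix` succeeds because no merge statuses reach the conversion step.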

@Ngone51
Member

Ngone51 commented Nov 9, 2021

IIUC, a retried map stage takes the code path below because dep.shuffleMergeEnabled=false:
...

That's what we have today, but I think the current behavior may be wrong. (Your fix, based on the current behavior, is correct.) For example, assume we have stage 1 and stage 2, and stage 2 hits a fetch failure that leads to a rerun of stage 1.
With the current behavior, when stage 2 reruns, it sees stage1.dep.shuffleMergeEnabled=false, so stage 2 can't fetch the merged data of stage 1. However, I think we can in fact fetch the merged data, right?

@mridulm
Contributor

mridulm commented Nov 9, 2021

@Ngone51 For the example you gave, namely:
stage1/attempt1 (parent) -> stage2/attempt1 (child) -- fetch failed --> stage1/attempt2 -> stage2/attempt2

Here, we have two cases:

  • If stage1 is an indeterminate stage, we throw away all merged output from stage1/attempt1 when we run stage1/attempt2 (conceptually; we don't actually clear MapOutputTracker!).
    • This happens via shuffleDep.newShuffleMergeState(), which is invoked at the beginning of submitMissingTasks.
  • If stage1 is a determinate stage, we disable shuffle push for stage1/attempt2 and then run stage1/attempt2.
    • This happens in submitMissingTasks if the shuffle merge is already finalized, see here.
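
The two cases above can be modeled with a small sketch. Everything here is hypothetical: the `ShuffleDep` case class, its fields (including `mergeGeneration`, which merely stands in for the effect of starting a new merge state), and the `onParentRetry` function are illustrative names, not Spark's scheduler code.

```scala
object StageRetrySketch {
  // Hypothetical, simplified view of a parent stage's shuffle dependency.
  final case class ShuffleDep(
      determinate: Boolean,
      mergeFinalized: Boolean,
      shuffleMergeEnabled: Boolean,
      mergeGeneration: Int)

  // Models what happens to the dependency when the parent stage is retried.
  def onParentRetry(dep: ShuffleDep): ShuffleDep =
    if (!dep.determinate) {
      // Indeterminate stage: earlier merged output is conceptually discarded
      // by starting a fresh merge state for the new attempt.
      dep.copy(mergeFinalized = false, mergeGeneration = dep.mergeGeneration + 1)
    } else if (dep.mergeFinalized) {
      // Determinate stage with a finalized merge: push is disabled for the retry,
      // but the finalized merged output from the earlier attempt still exists.
      dep.copy(shuffleMergeEnabled = false)
    } else {
      // Determinate stage whose merge was never finalized: nothing changes here.
      dep
    }
}
```

The second branch is the situation discussed in this thread: the dependency reports shuffleMergeEnabled=false while finalized merged output is still recorded.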

In all of these combinations, whether stage2/attemptN uses merged output must depend only on whether finalized merged output from the parent stage is available for it to use.
Currently, we use a single variable to control behavior on both the mapper side (pushing) and the reducer side (using merged output). This was fine initially, but given determinate parent stages and retried child stages, we should separate the two.

IMO we should add a new variable, say useMergedShuffleInput, which would be based on whether finalized, valid merged output from the parent exists for that shuffle dep when the child stage is launched.
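
A rough sketch of this proposal follows, assuming a hypothetical `ParentShuffle` type with one field per side. Only the name useMergedShuffleInput comes from the comment above; the rest is illustrative and not an existing Spark API.

```scala
object MergedInputSketch {
  // Hypothetical state of the parent's shuffle when the child stage is launched.
  final case class ParentShuffle(
      pushEnabledForAttempt: Boolean,    // mapper side: should this attempt push blocks?
      hasFinalizedMergedOutput: Boolean) // reducer side: is valid merged output available?

  // Mapper-side decision: controlled only by the push flag.
  def shouldPushBlocks(parent: ParentShuffle): Boolean =
    parent.pushEnabledForAttempt

  // Reducer-side decision: controlled only by the availability of merged output.
  def useMergedShuffleInput(parent: ParentShuffle): Boolean =
    parent.hasFinalizedMergedOutput
}
```

With the two decisions separated, a retried determinate parent can have push disabled while its child stage still reads the merged output finalized by the earlier attempt.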

Thoughts @Victsm, @zhouyejoe, @otterc, @rmcyang, @venkata91 ?

@rmcyang
Contributor Author

rmcyang commented Nov 10, 2021

OK, this makes sense to me. The current behavior indeed does not handle stage retry efficiently when the parent stage is determinate and has merged data, since that condition causes the merged data to be ignored.

Given that SPARK-37023 is intended to fix the enableBatchFetch assertion failure under the current behavior, shall we file another Spark JIRA and implement the improvement there? @Ngone51 @mridulm @Victsm @zhouyejoe @otterc @venkata91

@mridulm
Contributor

mridulm commented Nov 11, 2021

That sounds fine to me. We can fix the immediate assertion issue (which fixes the problem of stage retry causing application failure when push-based shuffle is enabled) and follow up with a fix that allows determinate stages to read merged output on stage retry.
How does that sound @Ngone51 ?

@Ngone51
Member

Ngone51 commented Nov 11, 2021

sgtm.

@mridulm
Contributor

mridulm commented Nov 11, 2021

@rmcyang Can you please file a jira for the issue that @Ngone51 detailed ? Thanks !
Will merge this to master and branch-3.2

@asfgit asfgit closed this in f1532a2 Nov 11, 2021
asfgit pushed a commit that referenced this pull request Nov 11, 2021
…led is false for a shuffleDependency during retry

### What changes were proposed in this pull request?

At high level, created a helper method `getMapSizesByExecutorIdImpl` on which `getMapSizesByExecutorId` and `getPushBasedShuffleMapSizesByExecutorId` can rely. It takes a parameter `useMergeResult`, which helps to check if fetching merge result is needed or not, and pass it as `canFetchMergeResult` into `getStatuses`.

### Why are the changes needed?

In some stage retry cases, `shuffleDependency.shuffleMergeEnabled` can be set to false while `mergeStatus` entries still exist, because the driver has already collected the merged statuses for that shuffle dependency. In that case, the current implementation sets `enableBatchFetch` to false because merge statuses are present, which causes the following assertion in `MapOutputTracker.getMapSizesByExecutorId` to fail:
```
assert(mapSizesByExecutorId.enableBatchFetch == true)
```

The proposed fix helps resolve the issue.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Passed the existing UTs.

Closes #34461 from rmcyang/SPARK-37023.

Authored-by: Minchu Yang <minyang@minyang-mn3.linkedin.biz>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit f1532a2)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
@mridulm
Contributor

mridulm commented Nov 11, 2021

Merged to master and branch-3.2
Thanks for working on this @rmcyang !
Thanks for the reviews @dongjoon-hyun, @Ngone51 :-)

@rmcyang
Contributor Author

rmcyang commented Nov 13, 2021

Filed SPARK-37313 for the issue that @Ngone51 raised. Thanks for the reviews! @mridulm @dongjoon-hyun @Ngone51

sunchao pushed a commit to sunchao/spark that referenced this pull request Dec 8, 2021
catalinii pushed a commit to lyft/spark that referenced this pull request Feb 22, 2022
catalinii pushed a commit to lyft/spark that referenced this pull request Mar 4, 2022
wangyum pushed a commit that referenced this pull request May 26, 2023