Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-36772] FinalizeShuffleMerge fails with an exception due to attempt id not matching #34018

Closed
wants to merge 7 commits into from

Conversation

zhouyejoe
Copy link
Contributor

@zhouyejoe zhouyejoe commented Sep 16, 2021

What changes were proposed in this pull request?

Remove the appAttemptId from TransportConf, and parsing through SparkEnv.

Why are the changes needed?

Push based shuffle will fail if there are any attemptId set in the SparkConf, as the attemptId is not set correctly in Driver.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Tested within our Yarn cluster. Without this PR, the Driver will fail to finalize the shuffle merge on all the mergers. After the patch, Driver can successfully finalize the shuffle merge and the push based shuffle can work fine.
Also with unit test to verify the attemptId is being set in the BlockStoreClient in Driver.

@github-actions github-actions bot added the CORE label Sep 16, 2021
@zhouyejoe
Copy link
Contributor Author

cc @mridulm @Ngone51

@gengliangwang
Copy link
Member

gengliangwang commented Sep 16, 2021

@zhouyejoe Thanks for the fix.
I will cut 3.2.0 RC3 after this one is merged.

@zhouyejoe
Copy link
Contributor Author

Tested the patch within our own cluster with YARN set up. Push based shuffle works fine in yarn cluster mode, where Yarn will provide default application attempt 1 to the job. Added unit test, leveraging the CSMockExternalBlockManager, started a SparkContext and check whether the application attemptId is being set in BlockStoreClient

@gengliangwang
Copy link
Member

@zhouyejoe could you update the PR title and description?

@zhouyejoe zhouyejoe changed the title [WIP][SPARK-36772] FinalizeShuffleMerge fails with an exception due to attempt id not matching [SPARK-36772] FinalizeShuffleMerge fails with an exception due to attempt id not matching Sep 17, 2021
@zhouyejoe
Copy link
Contributor Author

@zhouyejoe could you update the PR title and description?

Updated the PR title and description.

@mridulm
Copy link
Contributor

mridulm commented Sep 17, 2021

Discussed with @Ngone51.
@zhouyejoe can we make the following change please:

  • Move toInt from SparkContext into setAttemptId (which will take it as a String param).
  • try/catch the Integer.parseInt and log warning in case it is not able to parse attempt id to string.
    • If successful, set attempt id to that value - else leave it unchanged (defaulting to -1).
  • Let us keep the attempt id in BlockStoreClient itself.

Thoughts ?

@zhouyejoe
Copy link
Contributor Author

Discussed with @Ngone51.
@zhouyejoe can we make the following change please:

  • Move toInt from SparkContext into setAttemptId (which will take it as a String param).

  • try/catch the Integer.parseInt and log warning in case it is not able to parse attempt id to string.

    • If successful, set attempt id to that value - else leave it unchanged (defaulting to -1).
  • Let us keep the attempt id in BlockStoreClient itself.

Thoughts ?

SGTM. Will update accordingly.

@Ngone51
Copy link
Member

Ngone51 commented Sep 17, 2021

I'm thinking of the potential compatibility issue in the future by using the int type of appAttemptId inside BlockStoreClinet. Using int might impede the future extension of CMs (e.g., K8s, Standalone, or something new) if they want the arbitrary attempt id to include more info. So I think it's good to keep it as a string. However, since push-based shuffle would require a comparable number value for its functionality, we'd still require the attempt id to at least includes one comparable number value, e.g., (xxx_attempt_id_12). Then, in the case of push-based shuffle, we can just extract the number value into int number, which would be compatible with the current push-based shuffle protocols.
@mridulm @zhouyejoe WDYT?

…nt from Int to String, but add comparableAppAttemptId in ExternalBlockStoreClient
@zhouyejoe
Copy link
Contributor Author

I am also thinking about the same.

Discussed with @Ngone51.
@zhouyejoe can we make the following change please:

  • Move toInt from SparkContext into setAttemptId (which will take it as a String param).

  • try/catch the Integer.parseInt and log warning in case it is not able to parse attempt id to string.

    • If successful, set attempt id to that value - else leave it unchanged (defaulting to -1).
  • Let us keep the attempt id in BlockStoreClient itself.

Thoughts ?

Update:

  1. Changed the AppAttemptId type in BlockStoreClient from Int to String. Add integer comparableAppAttemptId in ExternalBlockStoreClient, and override the getAppAttemptId and setAppAttemptId there.
  2. Try catch the String to Integer parse in setComparableAppAttemptId.
  3. The default value of comparableAppAttemptId is -1.
  4. Only setAppAttemptId in Driver or Executor when Push based shuffle is enabled.
  5. Minor change in PushBasedExternalClusterManager to return 1 for the TaskScheduler.applicationAttemptId()

@zhouyejoe
Copy link
Contributor Author

BTW, I tested in three modes:

  1. Local mode with Spark Pi example.
  2. Yarn client mode from Spark-shell with a large amount of shuffle.
  3. Yarn cluster mode by launching an actual application with a large amount of shuffle.
    cc @mridulm

Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just couple of minor comments - looks good to me.
+CC @Ngone51

@zhouyejoe
Copy link
Contributor Author

Updated. Last run PR test failed on some unrelated unit test.
Jenkins, test this please.

@mridulm
Copy link
Contributor

mridulm commented Sep 18, 2021

Ok to test

@SparkQA
Copy link

SparkQA commented Sep 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47941/

@SparkQA
Copy link

SparkQA commented Sep 18, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47941/

@gengliangwang
Copy link
Member

Merging to master/3.2
@zhouyejoe thanks for the work and test!
@Ngone51 @mridulm @venkata91 thanks for the review.

gengliangwang pushed a commit that referenced this pull request Sep 18, 2021
…empt id not matching

### What changes were proposed in this pull request?
Remove the appAttemptId from TransportConf, and parsing through SparkEnv.

### Why are the changes needed?
Push based shuffle will fail if there are any attemptId set in the SparkConf, as the attemptId is not set correctly in Driver.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested within our Yarn cluster. Without this PR, the Driver will fail to finalize the shuffle merge on all the mergers. After the patch, Driver can successfully finalize the shuffle merge and the push based shuffle can work fine.
Also with unit test to verify the attemptId is being set in the BlockStoreClient in Driver.

Closes #34018 from zhouyejoe/SPARK-36772.

Authored-by: Ye Zhou <yezhou@linkedin.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit cabc36b)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
@SparkQA
Copy link

SparkQA commented Sep 18, 2021

Test build #143433 has finished for PR 34018 at commit f6e47b8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51
Copy link
Member

Ngone51 commented Sep 18, 2021

Late lgtm. Thanks @zhouyejoe

@mridulm
Copy link
Contributor

mridulm commented Sep 18, 2021

Thanks for reviewing and merging it @gengliangwang !

@zhouyejoe
Copy link
Contributor Author

Thanks for review @mridulm @Ngone51 @venkata91 @gengliangwang

venkata91 pushed a commit to linkedin/spark that referenced this pull request Oct 13, 2021
…empt id not matching

### What changes were proposed in this pull request?
Remove the appAttemptId from TransportConf, and parsing through SparkEnv.

### Why are the changes needed?
Push based shuffle will fail if there are any attemptId set in the SparkConf, as the attemptId is not set correctly in Driver.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested within our Yarn cluster. Without this PR, the Driver will fail to finalize the shuffle merge on all the mergers. After the patch, Driver can successfully finalize the shuffle merge and the push based shuffle can work fine.
Also with unit test to verify the attemptId is being set in the BlockStoreClient in Driver.

Closes apache#34018 from zhouyejoe/SPARK-36772.

Authored-by: Ye Zhou <yezhou@linkedin.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit cabc36b)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
wangyum pushed a commit that referenced this pull request May 26, 2023
…empt id not matching

Remove the appAttemptId from TransportConf, and parsing through SparkEnv.

Push based shuffle will fail if there are any attemptId set in the SparkConf, as the attemptId is not set correctly in Driver.

No

Tested within our Yarn cluster. Without this PR, the Driver will fail to finalize the shuffle merge on all the mergers. After the patch, Driver can successfully finalize the shuffle merge and the push based shuffle can work fine.
Also with unit test to verify the attemptId is being set in the BlockStoreClient in Driver.

Closes #34018 from zhouyejoe/SPARK-36772.

Authored-by: Ye Zhou <yezhou@linkedin.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
6 participants