Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-32916][SHUFFLE] Implementation of shuffle service that leverages push-based shuffle in YARN deployment mode #30062

Closed
wants to merge 35 commits into from

Conversation

otterc
Copy link
Contributor

@otterc otterc commented Oct 15, 2020

What changes were proposed in this pull request?

This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle.
Summary of changes:

  • Adds an implementation of MergedShuffleFileManager which was introduced with Spark 32915.
  • Integrated the push-based shuffle service with YarnShuffleService.

Why are the changes needed?

Refer to the SPIP in SPARK-30602.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.

Lead-authored-by: Min Shen mshen@linkedin.com
Co-authored-by: Chandni Singh chsingh@linkedin.com
Co-authored-by: Ye Zhou yezhou@linkedin.com

@otterc
Copy link
Contributor Author

otterc commented Oct 15, 2020

cc @Victsm @mridulm @tgravescs @jiangxb1987 @attilapiros @Ngone51
Please take a look.

@dongjoon-hyun dongjoon-hyun changed the title [Spark 32916][Shuffle] Implementation of shuffle service that leverages push-based shuffle in YARN deployment mode [SPARK-32916][SHUFFLE] Implementation of shuffle service that leverages push-based shuffle in YARN deployment mode Oct 15, 2020
@dongjoon-hyun
Copy link
Member

ok to test

@mridulm
Copy link
Contributor

mridulm commented Oct 15, 2020

+CC @vanzin

@otterc
Copy link
Contributor Author

otterc commented Oct 15, 2020

+CC @venkata91

@SparkQA
Copy link

SparkQA commented Oct 15, 2020

Test build #129856 has finished for PR 30062 at commit f34935f.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 15, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34462/

@SparkQA
Copy link

SparkQA commented Oct 15, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34462/

@mridulm
Copy link
Contributor

mridulm commented Oct 16, 2020

Can you fix the lint errors in RemoteBlockPushResolverSuite @otterc ?

@SparkQA
Copy link

SparkQA commented Oct 16, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34474/

@SparkQA
Copy link

SparkQA commented Oct 16, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34474/

@otterc
Copy link
Contributor Author

otterc commented Oct 16, 2020

Fixed the lint errors in RemoteBlockPushResolverSuite.

@SparkQA
Copy link

SparkQA commented Oct 16, 2020

Test build #129869 has finished for PR 30062 at commit dceae72.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@Ngone51 Ngone51 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just the same reminder: Spark follows databricks/scala-style-guide and Use 2-space indentation in general. Could you please fix the indentation errors?

@SparkQA
Copy link

SparkQA commented Oct 20, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34632/

@SparkQA
Copy link

SparkQA commented Oct 20, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34633/

@SparkQA
Copy link

SparkQA commented Oct 20, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34632/

@SparkQA
Copy link

SparkQA commented Oct 20, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34633/

@SparkQA
Copy link

SparkQA commented Oct 20, 2020

Test build #130026 has finished for PR 30062 at commit fbdd333.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 8, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35361/

@SparkQA
Copy link

SparkQA commented Nov 8, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35361/

@SparkQA
Copy link

SparkQA commented Nov 8, 2020

Test build #130752 has finished for PR 30062 at commit cb1881c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm
Copy link
Contributor

mridulm commented Nov 9, 2020

Thanks for taking another pass @Ngone51
Any other thoughts or comments @attilapiros, @tgravescs ?

@attilapiros
Copy link
Contributor

I have not found any new issues. LGTM.

@tgravescs
Copy link
Contributor

my last round of comments were addressed so +1 for me

@mridulm
Copy link
Contributor

mridulm commented Nov 9, 2020

Thanks for the reviews @Ngone51, @attilapiros , @tgravescs , @Victsm and @venkata91 !
Merging to master.

Thanks for the contribution @otterc

@asfgit asfgit closed this in 8113c88 Nov 9, 2020
@dongjoon-hyun
Copy link
Member

Since this isn't recovered for two days, could you take a look, @mridulm ?

@otterc
Copy link
Contributor Author

otterc commented Nov 12, 2020

I had added YarnShuffleServiceSuite.java to add unit tests related to creation of MergedShuffleFileManager Implementation.
The tests are failing with

java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.network.yarn.YarnShuffleService
	at org.apache.spark.network.yarn.YarnShuffleServiceSuite.testCreateRemoteBlockPushResolverInstance(YarnShuffleServiceSuite.java:47)

Not sure why it can't find YarnShuffleService class?
@mridulm @tgravescs Do you know why this could happen?

@mridulm
Copy link
Contributor

mridulm commented Nov 12, 2020

Thanks for the ping @dongjoon-hyun ! I had not noticed this issue.

@otterc Is it a dependency issue in hadoop-2.7 ?

[ERROR] testCreateDefaultMergedShuffleFileManagerInstance(org.apache.spark.network.yarn.YarnShuffleServiceSuite)  Time elapsed: 0.627 s  <<< ERROR!
java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory
	at org.apache.spark.network.yarn.YarnShuffleServiceSuite.testCreateDefaultMergedShuffleFileManagerInstance(YarnShuffleServiceSuite.java:37)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory
	at org.apache.spark.network.yarn.YarnShuffleServiceSuite.testCreateDefaultMergedShuffleFileManagerInstance(YarnShuffleServiceSuite.java:37)

[ERROR] testCreateRemoteBlockPushResolverInstance(org.apache.spark.network.yarn.YarnShuffleServiceSuite)  Time elapsed: 0 s  <<< ERROR!
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.network.yarn.YarnShuffleService
	at org.apache.spark.network.yarn.YarnShuffleServiceSuite.testCreateRemoteBlockPushResolverInstance(YarnShuffleServiceSuite.java:47)

[ERROR] testInvalidClassNameOfMergeManagerWillUseNoOpInstance(org.apache.spark.network.yarn.YarnShuffleServiceSuite)  Time elapsed: 0.001 s  <<< ERROR!
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.network.yarn.YarnShuffleService
	at org.apache.spark.network.yarn.YarnShuffleServiceSuite.testInvalidClassNameOfMergeManagerWillUseNoOpInstance(YarnShuffleServiceSuite.java:57)

@otterc Can you try building and running master locally with the hadoop-2.7 profile to check please ? That should help understand what the issue is, and more quickly iterate towards a solution.
I am guessing the solution should be addition of commons logging for hadoop-2.7 profile for spark network module - but please check. Thanks !

@otterc
Copy link
Contributor Author

otterc commented Nov 12, 2020

I think there is something different with how the network-yarn module is build. I see that there existed
resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala.
I think I shouldn't have added YarnShuffleServiceSuite.java in the network-yarn module.

@mridulm
Copy link
Contributor

mridulm commented Nov 12, 2020

Can you make a followup patch for this @otterc ?
Please do test it locally on both hadoop-2.7 and default, thanks !

asfgit pushed a commit that referenced this pull request Nov 13, 2020
…dded YarnShuffleServiceSuite.java

### What changes were proposed in this pull request?
This is a follow-up fix for the failing tests in `YarnShuffleServiceSuite.java`. This java class was introduced in #30062. The tests in the class fail when run with hadoop-2.7 profile:
```
[ERROR] testCreateDefaultMergedShuffleFileManagerInstance(org.apache.spark.network.yarn.YarnShuffleServiceSuite)  Time elapsed: 0.627 s  <<< ERROR!
java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory
	at org.apache.spark.network.yarn.YarnShuffleServiceSuite.testCreateDefaultMergedShuffleFileManagerInstance(YarnShuffleServiceSuite.java:37)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory
	at org.apache.spark.network.yarn.YarnShuffleServiceSuite.testCreateDefaultMergedShuffleFileManagerInstance(YarnShuffleServiceSuite.java:37)

[ERROR] testCreateRemoteBlockPushResolverInstance(org.apache.spark.network.yarn.YarnShuffleServiceSuite)  Time elapsed: 0 s  <<< ERROR!
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.network.yarn.YarnShuffleService
	at org.apache.spark.network.yarn.YarnShuffleServiceSuite.testCreateRemoteBlockPushResolverInstance(YarnShuffleServiceSuite.java:47)

[ERROR] testInvalidClassNameOfMergeManagerWillUseNoOpInstance(org.apache.spark.network.yarn.YarnShuffleServiceSuite)  Time elapsed: 0.001 s  <<< ERROR!
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.network.yarn.YarnShuffleService
	at org.apache.spark.network.yarn.YarnShuffleServiceSuite.testInvalidClassNameOfMergeManagerWillUseNoOpInstance(YarnShuffleServiceSuite.java:57)
```
A test suit for `YarnShuffleService` did exist here:
`resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala`
I missed this when I created `common/network-yarn/src/test/java/org/apache/spark/network/yarn/YarnShuffleServiceSuite.java`. Moving all the new tests to the earlier test suite fixes the failures with hadoop-2.7 even though why this happened is not clear.

### Why are the changes needed?
The newly added tests are failing when run with hadoop profile 2.7

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Ran the unit tests with the default profile as well as hadoop 2.7 profile.
`build/mvn test -Dtest=none -DwildcardSuites=org.apache.spark.network.yarn.YarnShuffleServiceSuite -Phadoop-2.7 -Pyarn`
```
Run starting. Expected test count is: 11
YarnShuffleServiceSuite:
- executor state kept across NM restart
- removed applications should not be in registered executor file
- shuffle service should be robust to corrupt registered executor file
- get correct recovery path
- moving recovery file from NM local dir to recovery path
- service throws error if cannot start
- recovery db should not be created if NM recovery is not enabled
- SPARK-31646: metrics should be registered into Node Manager's metrics system
- create default merged shuffle file manager instance
- create remote block push resolver instance
- invalid class name of merge manager will use noop instance
Run completed in 2 seconds, 572 milliseconds.
Total number of tests run: 11
Suites: completed 2, aborted 0
Tests: succeeded 11, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

Closes #30349 from otterc/SPARK-32916-followup.

Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
asfgit pushed a commit that referenced this pull request Dec 23, 2020
…of chunks in meta file and index file are equal

### What changes were proposed in this pull request?
1. Fixes for bugs in `RemoteBlockPushResolver` where the number of chunks in meta file and index file are inconsistent due to exceptions while writing to either index file or meta file. This java class was introduced in #30062.
 - If the writing to index file fails, the position of meta file is not reset. This means that the number of chunks in meta file is inconsistent with index file.
- During the exception handling while writing to index/meta file, we just set the pointer to the start position. If the files are closed just after this then it doesn't get rid of any the extra bytes written to it.
2. Adds an IOException threshold. If the `RemoteBlockPushResolver` encounters IOExceptions greater than this threshold  while updating data/meta/index file of a shuffle partition, then it responds to the client with  exception- `IOExceptions exceeded the threshold` so that client can stop pushing data for this shuffle partition.
3. When the update to metadata fails, exception is not propagated back to the client. This results in the increased size of the current chunk. However, with (2) in place, the current chunk will still be of a manageable size.

### Why are the changes needed?
This fix is needed for the bugs mentioned above.
1. Moved writing to meta file after index file. This fixes the issue because if there is an exception writing to meta file, then the index file position is not updated. With this change, if there is an exception writing to index file, then none of the files are effectively updated and the same is true vice-versa.
2. Truncating the lengths of data/index/meta files when the partition is finalized.
3. When the IOExceptions have reached the threshold, it is most likely that future blocks will also face the issue. So, it is better to let the clients know so that they can stop pushing the blocks for that partition.
4. When just the meta update fails, client retries pushing the block which was successfully merged to data file. This can be avoided by letting the chunk grow slightly.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests for all the bugs and threshold.

Closes #30433 from otterc/SPARK-32916-followup.

Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
asfgit pushed a commit that referenced this pull request Dec 23, 2020
…of chunks in meta file and index file are equal

### What changes were proposed in this pull request?
1. Fixes for bugs in `RemoteBlockPushResolver` where the number of chunks in meta file and index file are inconsistent due to exceptions while writing to either index file or meta file. This java class was introduced in #30062.
 - If the writing to index file fails, the position of meta file is not reset. This means that the number of chunks in meta file is inconsistent with index file.
- During the exception handling while writing to index/meta file, we just set the pointer to the start position. If the files are closed just after this then it doesn't get rid of any the extra bytes written to it.
2. Adds an IOException threshold. If the `RemoteBlockPushResolver` encounters IOExceptions greater than this threshold  while updating data/meta/index file of a shuffle partition, then it responds to the client with  exception- `IOExceptions exceeded the threshold` so that client can stop pushing data for this shuffle partition.
3. When the update to metadata fails, exception is not propagated back to the client. This results in the increased size of the current chunk. However, with (2) in place, the current chunk will still be of a manageable size.

### Why are the changes needed?
This fix is needed for the bugs mentioned above.
1. Moved writing to meta file after index file. This fixes the issue because if there is an exception writing to meta file, then the index file position is not updated. With this change, if there is an exception writing to index file, then none of the files are effectively updated and the same is true vice-versa.
2. Truncating the lengths of data/index/meta files when the partition is finalized.
3. When the IOExceptions have reached the threshold, it is most likely that future blocks will also face the issue. So, it is better to let the clients know so that they can stop pushing the blocks for that partition.
4. When just the meta update fails, client retries pushing the block which was successfully merged to data file. This can be avoided by letting the chunk grow slightly.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests for all the bugs and threshold.

Closes #30433 from otterc/SPARK-32916-followup.

Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 0677c39)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
asfgit pushed a commit that referenced this pull request Mar 25, 2021
### What changes were proposed in this pull request?
This PR fixes bugs that causes corruption of push-merged blocks when a client terminates while pushing block. `RemoteBlockPushResolver` was introduced in #30062 (SPARK-32916).

There are 2 scenarios where the merged blocks get corrupted:
1. `StreamCallback.onFailure()` is called more than once. Initially we assumed that the onFailure callback will be called just once per stream. However, we observed that this is called twice when a client connection is reset. When the client connection is reset then there are 2 events that get triggered in this order.
 - `exceptionCaught`. This event is propagated to `StreamInterceptor`. `StreamInterceptor.exceptionCaught()` invokes `callback.onFailure(streamId, cause)`. This is the first time StreamCallback.onFailure() will be invoked.
 - `channelInactive`. Since the channel closes, the `channelInactive` event gets triggered which again is propagated to `StreamInterceptor`. `StreamInterceptor.channelInactive()` invokes `callback.onFailure(streamId, new ClosedChannelException())`. This is the second time  StreamCallback.onFailure() will be invoked.

2. The flag `isWriting` is set prematurely to true. This introduces an edge case where a stream that is trying to merge a duplicate block (created because of a speculative task) may interfere with an active stream if the duplicate stream fails.

Also adding additional changes that improve the code.

1.  Using positional writes all the time because this simplifies the code and with microbenchmarking haven't seen any performance impact.
2. Additional minor changes suggested by mridulm during an internal review.

### Why are the changes needed?
These are bug fixes and simplify the code.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests. I have also tested these changes in Linkedin's internal fork on a cluster.

Co-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Min Shen mshenlinkedin.com

Closes #31934 from otterc/SPARK-32916-followup.

Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
asfgit pushed a commit that referenced this pull request Mar 25, 2021
### What changes were proposed in this pull request?
This PR fixes bugs that causes corruption of push-merged blocks when a client terminates while pushing block. `RemoteBlockPushResolver` was introduced in #30062 (SPARK-32916).

There are 2 scenarios where the merged blocks get corrupted:
1. `StreamCallback.onFailure()` is called more than once. Initially we assumed that the onFailure callback will be called just once per stream. However, we observed that this is called twice when a client connection is reset. When the client connection is reset then there are 2 events that get triggered in this order.
 - `exceptionCaught`. This event is propagated to `StreamInterceptor`. `StreamInterceptor.exceptionCaught()` invokes `callback.onFailure(streamId, cause)`. This is the first time StreamCallback.onFailure() will be invoked.
 - `channelInactive`. Since the channel closes, the `channelInactive` event gets triggered which again is propagated to `StreamInterceptor`. `StreamInterceptor.channelInactive()` invokes `callback.onFailure(streamId, new ClosedChannelException())`. This is the second time  StreamCallback.onFailure() will be invoked.

2. The flag `isWriting` is set prematurely to true. This introduces an edge case where a stream that is trying to merge a duplicate block (created because of a speculative task) may interfere with an active stream if the duplicate stream fails.

Also adding additional changes that improve the code.

1.  Using positional writes all the time because this simplifies the code and with microbenchmarking haven't seen any performance impact.
2. Additional minor changes suggested by mridulm during an internal review.

### Why are the changes needed?
These are bug fixes and simplify the code.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests. I have also tested these changes in Linkedin's internal fork on a cluster.

Co-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Min Shen mshenlinkedin.com

Closes #31934 from otterc/SPARK-32916-followup.

Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 6d88212)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
flyrain pushed a commit to flyrain/spark that referenced this pull request Sep 21, 2021
### What changes were proposed in this pull request?
This PR fixes bugs that causes corruption of push-merged blocks when a client terminates while pushing block. `RemoteBlockPushResolver` was introduced in apache#30062 (SPARK-32916).

There are 2 scenarios where the merged blocks get corrupted:
1. `StreamCallback.onFailure()` is called more than once. Initially we assumed that the onFailure callback will be called just once per stream. However, we observed that this is called twice when a client connection is reset. When the client connection is reset then there are 2 events that get triggered in this order.
 - `exceptionCaught`. This event is propagated to `StreamInterceptor`. `StreamInterceptor.exceptionCaught()` invokes `callback.onFailure(streamId, cause)`. This is the first time StreamCallback.onFailure() will be invoked.
 - `channelInactive`. Since the channel closes, the `channelInactive` event gets triggered which again is propagated to `StreamInterceptor`. `StreamInterceptor.channelInactive()` invokes `callback.onFailure(streamId, new ClosedChannelException())`. This is the second time  StreamCallback.onFailure() will be invoked.

2. The flag `isWriting` is set prematurely to true. This introduces an edge case where a stream that is trying to merge a duplicate block (created because of a speculative task) may interfere with an active stream if the duplicate stream fails.

Also adding additional changes that improve the code.

1.  Using positional writes all the time because this simplifies the code and with microbenchmarking haven't seen any performance impact.
2. Additional minor changes suggested by mridulm during an internal review.

### Why are the changes needed?
These are bug fixes and simplify the code.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests. I have also tested these changes in Linkedin's internal fork on a cluster.

Co-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Min Shen mshenlinkedin.com

Closes apache#31934 from otterc/SPARK-32916-followup.

Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 6d88212)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
fishcus pushed a commit to fishcus/spark that referenced this pull request Jan 12, 2022
### What changes were proposed in this pull request?
This PR fixes bugs that causes corruption of push-merged blocks when a client terminates while pushing block. `RemoteBlockPushResolver` was introduced in apache#30062 (SPARK-32916).

There are 2 scenarios where the merged blocks get corrupted:
1. `StreamCallback.onFailure()` is called more than once. Initially we assumed that the onFailure callback will be called just once per stream. However, we observed that this is called twice when a client connection is reset. When the client connection is reset then there are 2 events that get triggered in this order.
 - `exceptionCaught`. This event is propagated to `StreamInterceptor`. `StreamInterceptor.exceptionCaught()` invokes `callback.onFailure(streamId, cause)`. This is the first time StreamCallback.onFailure() will be invoked.
 - `channelInactive`. Since the channel closes, the `channelInactive` event gets triggered which again is propagated to `StreamInterceptor`. `StreamInterceptor.channelInactive()` invokes `callback.onFailure(streamId, new ClosedChannelException())`. This is the second time  StreamCallback.onFailure() will be invoked.

2. The flag `isWriting` is set prematurely to true. This introduces an edge case where a stream that is trying to merge a duplicate block (created because of a speculative task) may interfere with an active stream if the duplicate stream fails.

Also adding additional changes that improve the code.

1.  Using positional writes all the time because this simplifies the code and with microbenchmarking haven't seen any performance impact.
2. Additional minor changes suggested by mridulm during an internal review.

### Why are the changes needed?
These are bug fixes and simplify the code.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests. I have also tested these changes in Linkedin's internal fork on a cluster.

Co-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Min Shen mshenlinkedin.com

Closes apache#31934 from otterc/SPARK-32916-followup.

Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 6d88212)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
domybest11 pushed a commit to domybest11/spark that referenced this pull request Jun 15, 2022
### What changes were proposed in this pull request?
This PR fixes bugs that causes corruption of push-merged blocks when a client terminates while pushing block. `RemoteBlockPushResolver` was introduced in apache#30062 (SPARK-32916).

There are 2 scenarios where the merged blocks get corrupted:
1. `StreamCallback.onFailure()` is called more than once. Initially we assumed that the onFailure callback will be called just once per stream. However, we observed that this is called twice when a client connection is reset. When the client connection is reset then there are 2 events that get triggered in this order.
 - `exceptionCaught`. This event is propagated to `StreamInterceptor`. `StreamInterceptor.exceptionCaught()` invokes `callback.onFailure(streamId, cause)`. This is the first time StreamCallback.onFailure() will be invoked.
 - `channelInactive`. Since the channel closes, the `channelInactive` event gets triggered which again is propagated to `StreamInterceptor`. `StreamInterceptor.channelInactive()` invokes `callback.onFailure(streamId, new ClosedChannelException())`. This is the second time  StreamCallback.onFailure() will be invoked.

2. The flag `isWriting` is set prematurely to true. This introduces an edge case where a stream that is trying to merge a duplicate block (created because of a speculative task) may interfere with an active stream if the duplicate stream fails.

Also adding additional changes that improve the code.

1.  Using positional writes all the time because this simplifies the code and with microbenchmarking haven't seen any performance impact.
2. Additional minor changes suggested by mridulm during an internal review.

### Why are the changes needed?
These are bug fixes and simplify the code.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests. I have also tested these changes in Linkedin's internal fork on a cluster.

Co-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Min Shen mshenlinkedin.com

Closes apache#31934 from otterc/SPARK-32916-followup.

Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
wangyum pushed a commit that referenced this pull request May 26, 2023
…es push-based shuffle in YARN deployment mode

This is one of the patches for SPIP [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602) which is needed for push-based shuffle.
Summary of changes:
- Adds an implementation of `MergedShuffleFileManager` which was introduced with [Spark 32915](https://issues.apache.org/jira/browse/SPARK-32915).
- Integrated the push-based shuffle service with `YarnShuffleService`.

Refer to the SPIP in  [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).

No

Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
We have already verified the functionality and the improved performance as documented in the SPIP doc.

Lead-authored-by: Min Shen mshenlinkedin.com
Co-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Ye Zhou yezhoulinkedin.com

Closes #30062 from otterc/SPARK-32916.

Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Co-authored-by: Ye Zhou <yezhou@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
wangyum pushed a commit that referenced this pull request May 26, 2023
…dded YarnShuffleServiceSuite.java

This is a follow-up fix for the failing tests in `YarnShuffleServiceSuite.java`. This java class was introduced in #30062. The tests in the class fail when run with hadoop-2.7 profile:
```
[ERROR] testCreateDefaultMergedShuffleFileManagerInstance(org.apache.spark.network.yarn.YarnShuffleServiceSuite)  Time elapsed: 0.627 s  <<< ERROR!
java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory
	at org.apache.spark.network.yarn.YarnShuffleServiceSuite.testCreateDefaultMergedShuffleFileManagerInstance(YarnShuffleServiceSuite.java:37)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory
	at org.apache.spark.network.yarn.YarnShuffleServiceSuite.testCreateDefaultMergedShuffleFileManagerInstance(YarnShuffleServiceSuite.java:37)

[ERROR] testCreateRemoteBlockPushResolverInstance(org.apache.spark.network.yarn.YarnShuffleServiceSuite)  Time elapsed: 0 s  <<< ERROR!
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.network.yarn.YarnShuffleService
	at org.apache.spark.network.yarn.YarnShuffleServiceSuite.testCreateRemoteBlockPushResolverInstance(YarnShuffleServiceSuite.java:47)

[ERROR] testInvalidClassNameOfMergeManagerWillUseNoOpInstance(org.apache.spark.network.yarn.YarnShuffleServiceSuite)  Time elapsed: 0.001 s  <<< ERROR!
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.network.yarn.YarnShuffleService
	at org.apache.spark.network.yarn.YarnShuffleServiceSuite.testInvalidClassNameOfMergeManagerWillUseNoOpInstance(YarnShuffleServiceSuite.java:57)
```
A test suit for `YarnShuffleService` did exist here:
`resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala`
I missed this when I created `common/network-yarn/src/test/java/org/apache/spark/network/yarn/YarnShuffleServiceSuite.java`. Moving all the new tests to the earlier test suite fixes the failures with hadoop-2.7 even though why this happened is not clear.

The newly added tests are failing when run with hadoop profile 2.7

No

Ran the unit tests with the default profile as well as hadoop 2.7 profile.
`build/mvn test -Dtest=none -DwildcardSuites=org.apache.spark.network.yarn.YarnShuffleServiceSuite -Phadoop-2.7 -Pyarn`
```
Run starting. Expected test count is: 11
YarnShuffleServiceSuite:
- executor state kept across NM restart
- removed applications should not be in registered executor file
- shuffle service should be robust to corrupt registered executor file
- get correct recovery path
- moving recovery file from NM local dir to recovery path
- service throws error if cannot start
- recovery db should not be created if NM recovery is not enabled
- SPARK-31646: metrics should be registered into Node Manager's metrics system
- create default merged shuffle file manager instance
- create remote block push resolver instance
- invalid class name of merge manager will use noop instance
Run completed in 2 seconds, 572 milliseconds.
Total number of tests run: 11
Suites: completed 2, aborted 0
Tests: succeeded 11, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

Closes #30349 from otterc/SPARK-32916-followup.

Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
wangyum pushed a commit that referenced this pull request May 26, 2023
…of chunks in meta file and index file are equal

### What changes were proposed in this pull request?
1. Fixes for bugs in `RemoteBlockPushResolver` where the number of chunks in meta file and index file are inconsistent due to exceptions while writing to either index file or meta file. This java class was introduced in #30062.
 - If the writing to index file fails, the position of meta file is not reset. This means that the number of chunks in meta file is inconsistent with index file.
- During the exception handling while writing to index/meta file, we just set the pointer to the start position. If the files are closed just after this then it doesn't get rid of any the extra bytes written to it.
2. Adds an IOException threshold. If the `RemoteBlockPushResolver` encounters IOExceptions greater than this threshold  while updating data/meta/index file of a shuffle partition, then it responds to the client with  exception- `IOExceptions exceeded the threshold` so that client can stop pushing data for this shuffle partition.
3. When the update to metadata fails, exception is not propagated back to the client. This results in the increased size of the current chunk. However, with (2) in place, the current chunk will still be of a manageable size.

### Why are the changes needed?
This fix is needed for the bugs mentioned above.
1. Moved writing to meta file after index file. This fixes the issue because if there is an exception writing to meta file, then the index file position is not updated. With this change, if there is an exception writing to index file, then none of the files are effectively updated and the same is true vice-versa.
2. Truncating the lengths of data/index/meta files when the partition is finalized.
3. When the IOExceptions have reached the threshold, it is most likely that future blocks will also face the issue. So, it is better to let the clients know so that they can stop pushing the blocks for that partition.
4. When just the meta update fails, client retries pushing the block which was successfully merged to data file. This can be avoided by letting the chunk grow slightly.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests for all the bugs and threshold.

Closes #30433 from otterc/SPARK-32916-followup.

Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
wangyum pushed a commit that referenced this pull request May 26, 2023
### What changes were proposed in this pull request?
This PR fixes bugs that causes corruption of push-merged blocks when a client terminates while pushing block. `RemoteBlockPushResolver` was introduced in #30062 (SPARK-32916).

There are 2 scenarios where the merged blocks get corrupted:
1. `StreamCallback.onFailure()` is called more than once. Initially we assumed that the onFailure callback will be called just once per stream. However, we observed that this is called twice when a client connection is reset. When the client connection is reset then there are 2 events that get triggered in this order.
 - `exceptionCaught`. This event is propagated to `StreamInterceptor`. `StreamInterceptor.exceptionCaught()` invokes `callback.onFailure(streamId, cause)`. This is the first time StreamCallback.onFailure() will be invoked.
 - `channelInactive`. Since the channel closes, the `channelInactive` event gets triggered which again is propagated to `StreamInterceptor`. `StreamInterceptor.channelInactive()` invokes `callback.onFailure(streamId, new ClosedChannelException())`. This is the second time  StreamCallback.onFailure() will be invoked.

2. The flag `isWriting` is set prematurely to true. This introduces an edge case where a stream that is trying to merge a duplicate block (created because of a speculative task) may interfere with an active stream if the duplicate stream fails.

Also adding additional changes that improve the code.

1.  Using positional writes all the time because this simplifies the code and with microbenchmarking haven't seen any performance impact.
2. Additional minor changes suggested by mridulm during an internal review.

### Why are the changes needed?
These are bug fixes and simplify the code.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests. I have also tested these changes in Linkedin's internal fork on a cluster.

Co-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Min Shen mshenlinkedin.com

Closes #31934 from otterc/SPARK-32916-followup.

Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
10 participants