[SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished #37922
Conversation
CC @akpatnam25, @otterc - since you were planning to work on this already.
Thanks for working on this @wankunde !
@@ -393,6 +393,20 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
    }
  }

  @Override
  public void removeShuffleMerge(String appId, int shuffleId) {
When adding support for shuffleMergeId, follow the same pattern as finalizeShuffleMerge - there are a few corner cases here, and that method handles them.
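To make the suggestion concrete, here is a hedged sketch (all names hypothetical, simplified from the real AppShuffleMergePartitionsInfo bookkeeping) of how removeShuffleMerge could mirror finalizeShuffleMerge's compute()-based pattern, so concurrent pushes and removals for the same shuffleId are serialized per map key:

```java
import java.util.concurrent.ConcurrentHashMap;

public class Main {
    // Stands in for AppShuffleMergePartitionsInfo; illustrative only.
    static final class MergeInfo {
        final int shuffleMergeId;
        MergeInfo(int shuffleMergeId) { this.shuffleMergeId = shuffleMergeId; }
    }

    final ConcurrentHashMap<Integer, MergeInfo> shuffles = new ConcurrentHashMap<>();

    // Returns true if merged data for shuffleId was scheduled for deletion.
    boolean removeShuffleMerge(int shuffleId, int shuffleMergeId) {
        final boolean[] removed = {false};
        shuffles.compute(shuffleId, (id, info) -> {
            if (info == null) {
                return null; // nothing merged for this shuffle; nothing to do
            }
            if (shuffleMergeId < info.shuffleMergeId) {
                return info; // stale request for an older merge attempt: ignore
            }
            removed[0] = true;
            // The real resolver would submit an async cleanup task for the
            // merged data/index/meta files here before dropping the entry.
            return null;
        });
        return removed[0];
    }

    public static void main(String[] args) {
        Main s = new Main();
        s.shuffles.put(3, new MergeInfo(1));
        System.out.println(s.removeShuffleMerge(3, 0)); // stale request
        System.out.println(s.removeShuffleMerge(3, 1)); // current attempt
        System.out.println(s.removeShuffleMerge(3, 1)); // already removed
    }
}
```

Doing the removal inside compute() is what gives the corner-case safety the comment refers to: a concurrent finalize or push for the same shuffleId cannot interleave with the removal decision.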
Hi, @mridulm Thanks for your review. In my opinion, I do not think we should add shuffleMergeId to this message. What do you think?
We should decouple current implementation details when making protocol changes, and make the protocol extensible for future evolution. In this case though, it is much more straightforward - there is an existing use case which requires shuffle merge id. This specific change can be done in a follow-up PR though - I want to get the basic mechanics working in this PR, and ensure the cleanup use case is handled - before looking at further enhancements.
The push-based shuffle service will auto-clean the old shuffle merge data, and the following stage will read the new stage's output, so we don't need to send a RemoveShuffleMerge RPC for a new shuffle merge?
retest this please
Force-pushed c32bea1 to 79df01a
Consider the case I mentioned above - stage retry for an indeterminate stage. The previous attempt's mergers are not reused for the next attempt - and so the previous mergers will continue to hold stale data without cleaning it up - until the application terminates.
Can one of the admins verify this patch?
Thanks @mridulm, I will make that change.
} else if (msgObj instanceof RemoveShuffleMerge) {
  RemoveShuffleMerge msg = (RemoveShuffleMerge) msgObj;
  checkAuth(client, msg.appId);
  logger.info("Remove shuffle merge data for application %s shuffle %d",
Nit: remove --> removing
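Beyond the wording nit, the quoted line mixes printf-style "%s"/"%d" placeholders with an SLF4J-style logger, which substitutes "{}" placeholders instead; the ids would never appear in the message. A minimal stand-in formatter (a rough approximation of what SLF4J's MessageFormatter does; not the real library) makes the difference visible:

```java
public class Main {
    // Substitutes "{}" placeholders left to right, like SLF4J-style loggers do.
    static String format(String pattern, Object... args) {
        StringBuilder sb = new StringBuilder();
        int argIdx = 0, i = 0;
        while (i < pattern.length()) {
            if (argIdx < args.length && i + 1 < pattern.length()
                    && pattern.charAt(i) == '{' && pattern.charAt(i + 1) == '}') {
                sb.append(args[argIdx++]);
                i += 2;
            } else {
                sb.append(pattern.charAt(i++));
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // printf-style placeholders pass through unsubstituted:
        System.out.println(format(
            "Remove shuffle merge data for application %s shuffle %d", "app-1", 7));
        // "{}" placeholders are substituted, as the corrected log line should use:
        System.out.println(format(
            "Removing shuffle merge data for application {} shuffle {}", "app-1", 7));
    }
}
```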
Gentle ping @wankunde. Do you think you can update the PR soon? Please let us know if you need any help.
Sorry for the late reply, I will update this PR today.
Hi, @otterc @mridulm If the Spark application stops unexpectedly, there will be some leaked merge files. In our production cluster, the YARN NodeManager will clean up these leaked files if the disk is not full, but not if the disk is full. See spark/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java line 433 (at e669957).
Added a few comments.
While going over this change, I think I might have hit upon an existing bug as well - so thanks for raising this PR, for two reasons now!
submitCleanupTask(() ->
  closeAndDeleteOutdatedPartitions(
    appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
  closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, shuffleMergePartitions));
} else {
revert this ?
Now in the finalizeShuffleMerge() method we will update the AppShuffleMergePartitionsInfo to an empty object, so we can not clean up the leaked files?
See my note above on how to handle this.
// Update the DB for the finalized shuffle
writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
// Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results
// sent to the driver will be empty. This can happen when the service didn't receive any
// blocks for the shuffle yet and the driver didn't wait for enough time to finalize the
// shuffle.
return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
return mergePartitionsInfo;
revert changes to this method ? (assuming the changes I sketched above are fine)
@@ -702,7 +722,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
    "finalizing shuffle partition {}", msg.appId, msg.appAttemptId, msg.shuffleId,
    msg.shuffleMergeId, partition.reduceId);
  } finally {
    partition.closeAllFilesAndDeleteIfNeeded(false);
    Boolean deleteFile = partition.mapTracker.getCardinality() == 0;
    partition.closeAllFilesAndDeleteIfNeeded(deleteFile);
Revert ?
We won't return the reduceIds for which partition.mapTracker.getCardinality() == 0, so we can call closeAllFilesAndDeleteIfNeeded now.
The actual deletion will be handled by either application termination, or during shuffle cleanup (this PR).
Making this change here might have other issues we will need to think through.
If we do want to do this, can you file a follow-up JIRA and we can investigate it there?
I want to limit this PR specifically to changes required for this feature.
Please revert this for the time being, and revisit it as a follow-up as required.
There are a number of edge cases which need to be handled in this important PR, and I don't want to add to them with optional changes of minor utility.
@@ -1410,26 +1431,27 @@ public String toString() {
 * required for the shuffles of indeterminate stages.
 */
public static class AppShuffleMergePartitionsInfo {
Revert changes to this class ? (assuming the changes I sketched above are fine)
I will try to get to this later this week, do let me know if you are still working on it/have pending comments to address. Thanks
Took a quick look, and some of the comments from the previous review are not yet addressed.
Do let me know if you have queries or comments on this @wankunde.
Thanks
/**
 * Remove shuffle merge data files.
 *
 * @param removeShuffleMerge Remove shuffle merge RPC
Suggested change:
- * @param removeShuffleMerge Remove shuffle merge RPC
+ * @param removeShuffleMerge contains shuffle details (appId, shuffleId, etc) to uniquely identify a shuffle to be removed
@Override
public void removeShuffleMerge(RemoveShuffleMerge removeShuffleMerge) {
  throw new UnsupportedOperationException("Cannot handle shuffle block merge");
nit, suggested change:
- throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+ throw new UnsupportedOperationException("Cannot handle merged shuffle remove");
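The no-op path under discussion is simple: a shuffle manager that does not support push-based merge just rejects the remove request. A self-contained sketch (interface and names simplified from MergedShuffleFileManager, illustrative only):

```java
public class Main {
    // Simplified stand-in for the merged-shuffle manager interface.
    interface MergedShuffleManager {
        default void removeShuffleMerge(String appId, int shuffleId) {
            throw new UnsupportedOperationException("Cannot handle merged shuffle remove");
        }
    }

    public static void main(String[] args) {
        // A manager that implements nothing inherits the rejecting default.
        MergedShuffleManager noOp = new MergedShuffleManager() {};
        try {
            noOp.removeShuffleMerge("app-1", 0);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```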
}

appShuffleInfo.shuffles.compute(shuffleId, (shuffleIdKey, partitionsInfo) -> {
  if (null != partitionsInfo) {
The validation related to shuffleMergeId, from finalizeShuffleMerge, is applicable here depending on shuffleMergeId. I have given the details here - please refer to it.
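The validation being referred to boils down to comparing the requested merge id with the currently tracked one. A hedged sketch of the decision table (DELETE_ALL_MERGED_SHUFFLE and the action names are illustrative, modeled on the discussion in this thread rather than the final code):

```java
public class Main {
    // Sentinel meaning "remove whatever merge attempt is current".
    static final int DELETE_ALL_MERGED_SHUFFLE = -1;

    enum Action { DELETE_CURRENT, DELETE_STALE_AND_TRACK_NEW, IGNORE_STALE }

    static Action decide(int requestedMergeId, int currentMergeId) {
        if (requestedMergeId == DELETE_ALL_MERGED_SHUFFLE
                || requestedMergeId == currentMergeId) {
            return Action.DELETE_CURRENT;
        } else if (requestedMergeId > currentMergeId) {
            // An indeterminate-stage retry moved on to a newer merge id:
            // the older attempt's files are stale and can be removed.
            return Action.DELETE_STALE_AND_TRACK_NEW;
        } else {
            // Request is older than the tracked state; nothing to do.
            return Action.IGNORE_STALE;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(DELETE_ALL_MERGED_SHUFFLE, 2));
        System.out.println(decide(2, 2));
        System.out.println(decide(3, 2));
        System.out.println(decide(1, 2));
    }
}
```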
Force-pushed a3ceecb to ce2d576
Force-pushed ce2d576 to 091574b
Force-pushed 6c409dc to 29f4918
@mridulm I rebased the code, could you help review this PR again? Thanks
I just merged #39316 to unblock this PR, can you update to the latest please? Thanks
Just a couple of minor comments - rest looks fine.
Please ping me when done, I want to try to get this in for 3.4, thanks!
File dataFile =
  appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId);
dataFile.delete();
} catch (Exception e) {
Can we fix this @wankunde? Essentially, the changes are:
a) There is no exception thrown in this block - we don't need the try/catch.
b) When the delete fails, we don't need the warn (here and below): this can happen if application exit is racing against remove shuffle - the info log at the end of the method will suffice.
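The reasoning behind (a) is that File.delete() signals failure by returning false rather than throwing, so a try/catch catches nothing; per (b), failures are counted and summarized once instead of warned per file. A minimal sketch of that shape (helper name hypothetical):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class Main {
    // Deletes each file; returns how many deletions failed. No try/catch is
    // needed: File.delete() returns false on failure instead of throwing.
    static int deleteAll(File[] files) {
        int failed = 0;
        for (File f : files) {
            if (!f.delete()) {
                // Can race with application-level cleanup; not worth a warn
                // per file - the caller logs one summary at the end.
                failed++;
            }
        }
        return failed;
    }

    public static void main(String[] args) throws IOException {
        File existing = Files.createTempFile("merged", ".data").toFile();
        File missing = new File(existing.getParentFile(),
            "does-not-exist-" + System.nanoTime());
        int failed = deleteAll(new File[]{existing, missing});
        // Single summary line, as suggested in the review.
        System.out.println("deleted=" + (2 - failed) + " failed=" + failed);
    }
}
```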
assertTrue(shuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists());
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 3, 2));
closed.tryAcquire(10, TimeUnit.SECONDS);
assertFalse(shuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists());
Can we also add tests for the additional cases we have?
- removeShuffleMerge for DELETE_ALL_MERGED_SHUFFLE - we can just do what we have for streamCallback1 (Case 1) above, but use DELETE_ALL_MERGED_SHUFFLE for the merge id.
- additional cases here to make sure of the negative cases as well? namely:
  - msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId
  - delete for a merge id which does not exist.
Can we add a test to check whether the message goes to the expected mergers from the driver for RemoveShuffleMerge when a shuffle is getting cleaned up?
While trying to test the proposal here, I realized we are not testing this behavior - even replacing it with Nil always causes all tests to pass!
sc.cleaner.get.doCleanupShuffle(SHUFFLE_ID, blocking = true)
assert(foundHosts.asScala == mergerLocs.map(_.host).toSet)
}
}
There is a bug in this test - sorry about that.
Please take a look here: 26b2d18
Backported the code, many thanks.
Just have some minor comments
msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
  msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId);
if(deleteCurrentMergedShuffle || shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
Nit: missing a space after if
deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo,
  mergePartitionsInfo.getReduceIds(), false));
}
} else if(shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
nit: space missing after if. Also the condition here is not necessary - it will always be true (the IDE shows that as well).
…shuffle/RemoteBlockPushResolver.java Co-authored-by: otterc <singh.chandni@gmail.com>
@otterc Thanks for your review. Formatted the code.
Added one comment.
Rest looks good to me.
Will it be possible to change that and push an update soon @wankunde ? I will merge it once the tests pass successfully.
Force-pushed 3cc6896 to 7284b8f
The tests are failing @wankunde, though I don't think it is due to your PR.
Merged to master.
What changes were proposed in this pull request?
Clean up merged shuffle data files after the query finishes.
The main changes are:
Why are the changes needed?
There will be too many merged shuffle data files for long-running Spark applications.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Local cluster test.