Skip to content

Commit

Permalink
follow-up fix for cleanup of older shuffle files
Browse files Browse the repository at this point in the history
  • Loading branch information
venkata91 committed Aug 2, 2021
1 parent c039d99 commit 936ddc0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
Expand Up @@ -513,12 +513,18 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
}
} else {
appShuffleInfo.shuffles.compute(msg.shuffleId, (id, value) -> {
if (null == value || msg.shuffleMergeId != value.shuffleMergeId ||
if (null == value || msg.shuffleMergeId < value.shuffleMergeId ||
INDETERMINATE_SHUFFLE_FINALIZED == value.shuffleMergePartitions) {
throw new RuntimeException(String.format(
"Shuffle merge finalize request for shuffle %s with" + " shuffleMergeId %s is %s",
msg.shuffleId, msg.shuffleMergeId,
ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
} else if (msg.shuffleMergeId > value.shuffleMergeId) {
// If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
// empty MergeStatuses but cleanup the older shuffleMergeId files.
mergedShuffleCleaner.execute(() ->
closeAndDeletePartitionFiles(value.shuffleMergePartitions));
return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
} else {
shuffleMergePartitionsRef.set(value.shuffleMergePartitions);
return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
Expand Down
Expand Up @@ -1234,6 +1234,26 @@ void closeAndDeletePartitionFiles(Map<Integer, AppShufflePartitionInfo> partitio
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 3));
MergedBlockMeta mergedBlockMeta = pushResolver.getMergedBlockMeta(testApp, 0, 3, 0);
validateChunks(testApp, 0, 3, 0, mergedBlockMeta, new int[]{2}, new int[][]{{0}});

StreamCallbackWithID stream4 =
pushResolver.receiveBlockDataAsStream(
new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 4, 0, 0, 0));
closed.acquire();
// Do not finalize shuffleMergeId 4 can happen during stage cancellation.
stream4.onData(stream4.getID(), ByteBuffer.wrap(new byte[2]));
stream4.onComplete(stream4.getID());

// Check whether the data is cleaned up when higher shuffleMergeId finalize request comes
// but no blocks pushed for that shuffleMergeId
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 5));
closed.acquire();
try {
pushResolver.getMergedBlockMeta(testApp, 0, 4, 0);
} catch(RuntimeException re) {
assertEquals("MergedBlockMeta fetch for shuffle 0 with shuffleMergeId 4 reduceId 0"
+ " is stale shuffle block fetch request as shuffle blocks of a higher shuffleMergeId for"
+ " the shuffle is available", re.getMessage());
}
}

private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException {
Expand Down

0 comments on commit 936ddc0

Please sign in to comment.