diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/PushBasedShuffleUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/PushBasedShuffleUtils.java new file mode 100644 index 000000000000..a7cd21c81437 --- /dev/null +++ b/common/network-common/src/main/java/org/apache/spark/network/util/PushBasedShuffleUtils.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.util; + +public class PushBasedShuffleUtils { + + /** + * Push based shuffle requires a comparable Id to distinguish the shuffle data among multiple + * application attempts. This variable is derived from the String typed appAttemptId. + * If no appAttemptId is set, the default comparableAppAttemptId is -1. + */ + public static final int DEFAUT_APP_ATTEMPT_ID = -1; + + /** + * The flag for deleting the current merged shuffle data. + */ + public static final int DELETE_CURRENT_MERGED_SHUFFLE_ID = -1; +} diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java index 4e40090b065e..5da7b026d10e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java @@ -198,6 +198,10 @@ protected void handleMessage( int numRemovedBlocks = blockManager.removeBlocks(msg.appId, msg.execId, msg.blockIds); callback.onSuccess(new BlocksRemoved(numRemovedBlocks).toByteBuffer()); + } else if (msgObj instanceof RemoveShuffleMerge) { + RemoveShuffleMerge msg = (RemoveShuffleMerge) msgObj; + checkAuth(client, msg.appId); + mergeManager.removeShuffleMerge(msg); } else if (msgObj instanceof GetLocalDirsForExecutors) { GetLocalDirsForExecutors msg = (GetLocalDirsForExecutors) msgObj; checkAuth(client, msg.appId); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index b066d99e8ef8..504df47de1bd 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -28,6 +28,8 @@ import com.codahale.metrics.MetricSet; import com.google.common.collect.Lists; +import static org.apache.spark.network.util.PushBasedShuffleUtils.DEFAUT_APP_ATTEMPT_ID; +import static org.apache.spark.network.util.PushBasedShuffleUtils.DELETE_CURRENT_MERGED_SHUFFLE_ID; import org.apache.spark.network.TransportContext; import org.apache.spark.network.buffer.ManagedBuffer; @@ -38,7 +40,14 @@ import org.apache.spark.network.crypto.AuthClientBootstrap; import org.apache.spark.network.sasl.SecretKeyHolder; import org.apache.spark.network.server.NoOpRpcHandler; -import org.apache.spark.network.shuffle.protocol.*; +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; +import org.apache.spark.network.shuffle.protocol.BlocksRemoved; +import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo; +import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; +import org.apache.spark.network.shuffle.protocol.MergeStatuses; +import org.apache.spark.network.shuffle.protocol.RegisterExecutor; +import org.apache.spark.network.shuffle.protocol.RemoveBlocks; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; import org.apache.spark.network.util.TransportConf; /** @@ -52,10 +61,8 @@ public class ExternalBlockStoreClient extends BlockStoreClient { private final boolean authEnabled; private final SecretKeyHolder secretKeyHolder; private final long registrationTimeoutMs; - // Push based shuffle requires a comparable Id to distinguish the shuffle data among multiple - // application attempts. This variable is derived from the String typed appAttemptId. If no - // appAttemptId is set, the default comparableAppAttemptId is -1. - private int comparableAppAttemptId = -1; + + private int comparableAppAttemptId = DEFAUT_APP_ATTEMPT_ID; /** * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled, @@ -315,6 +322,23 @@ public void onFailure(Throwable e) { return numRemovedBlocksFuture; } + public Boolean removeShuffleMerge(String host, int port, int shuffleId) { + try { + checkInit(); + TransportClient client = clientFactory.createClient(host, port); + String appAttemptId = + getAppAttemptId() == null ? String.valueOf(DEFAUT_APP_ATTEMPT_ID) : getAppAttemptId(); + ByteBuffer removeMessage = new RemoveShuffleMerge(appId, appAttemptId, shuffleId, + DELETE_CURRENT_MERGED_SHUFFLE_ID).toByteBuffer(); + client.send(removeMessage); + } catch (Throwable e) { + logger.error("Error while sending RemoveShuffleMerge request to {}:{}", + host, port, e); + return false; + } + return true; + } + @Override public void close() { checkInit(); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java index 051684a92d0b..e70e365cb97a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java @@ -26,6 +26,7 @@ import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; import org.apache.spark.network.shuffle.protocol.MergeStatuses; import org.apache.spark.network.shuffle.protocol.PushBlockStream; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; /** * The MergedShuffleFileManager is used to process push based shuffle when enabled. It works @@ -121,6 +122,14 @@ MergedBlockMeta getMergedBlockMeta( */ String[] getMergedBlockDirs(String appId); + /** + * Handles the request to remove shuffle merge files. + * + * @param msg contains shuffle details (appId, shuffleId, etc) to uniquely identify + * a shuffle to be removed + */ + void removeShuffleMerge(RemoveShuffleMerge msg); + /** * Optionally close any resources associated the MergedShuffleFileManager, such as the * leveldb for state persistence. diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java index 876b10095938..da122c30755f 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java @@ -26,6 +26,7 @@ import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; import org.apache.spark.network.shuffle.protocol.MergeStatuses; import org.apache.spark.network.shuffle.protocol.PushBlockStream; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; import org.apache.spark.network.util.TransportConf; /** @@ -80,6 +81,11 @@ public MergedBlockMeta getMergedBlockMeta( throw new UnsupportedOperationException("Cannot handle shuffle block merge"); } + @Override + public void removeShuffleMerge(RemoveShuffleMerge msg) { + throw new UnsupportedOperationException("Cannot handle merged shuffle remove"); + } + @Override public String[] getMergedBlockDirs(String appId) { throw new UnsupportedOperationException("Cannot handle shuffle block merge"); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 816d1082850c..8beef207b1f9 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -33,15 +33,15 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentMap; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; - import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonProperty; @@ -56,10 +56,11 @@ import com.google.common.cache.Weigher; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; - import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.spark.network.util.PushBasedShuffleUtils.DEFAUT_APP_ATTEMPT_ID; +import static org.apache.spark.network.util.PushBasedShuffleUtils.DELETE_CURRENT_MERGED_SHUFFLE_ID; import org.apache.spark.network.buffer.FileSegmentManagedBuffer; import org.apache.spark.network.buffer.ManagedBuffer; @@ -71,6 +72,7 @@ import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; import org.apache.spark.network.shuffle.protocol.MergeStatuses; import org.apache.spark.network.shuffle.protocol.PushBlockStream; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; import org.apache.spark.network.shuffledb.DB; import org.apache.spark.network.shuffledb.DBBackend; import org.apache.spark.network.shuffledb.DBIterator; @@ -94,7 +96,6 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { public static final String SHUFFLE_META_DELIMITER = ":"; public static final String MERGE_DIR_KEY = "mergeDir"; public static final String ATTEMPT_ID_KEY = "attemptId"; - private static final int UNDEFINED_ATTEMPT_ID = -1; private static final String DB_KEY_DELIMITER = ";"; private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER = createErrorHandler(); // ByteBuffer to respond to client upon a successful merge of a pushed block @@ -227,15 +228,14 @@ AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo( // Higher shuffleMergeId seen for the shuffle ID meaning new stage attempt is being // run for the shuffle ID. Close and clean up old shuffleMergeId files, // happens in the indeterminate stage retries - AppAttemptShuffleMergeId appAttemptShuffleMergeId = + AppAttemptShuffleMergeId higherShuffleMergeId = new AppAttemptShuffleMergeId( appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, shuffleMergeId); logger.info("{}: creating a new shuffle merge metadata since received " + "shuffleMergeId is higher than latest shuffleMergeId {}", - appAttemptShuffleMergeId, latestShuffleMergeId); - submitCleanupTask(() -> - closeAndDeleteOutdatedPartitions( - appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); + higherShuffleMergeId, latestShuffleMergeId); + submitRemoveShuffleMergeTask( + appShuffleInfo, shuffleId, Optional.of(higherShuffleMergeId)); return new AppShuffleMergePartitionsInfo(shuffleMergeId, false); } else { // The request is for block with same shuffleMergeId as the latest shuffleMergeId @@ -365,7 +365,7 @@ public String[] getMergedBlockDirs(String appId) { } private void removeOldApplicationAttemptsFromDb(AppShuffleInfo info) { - if (info.attemptId != UNDEFINED_ATTEMPT_ID) { + if (info.attemptId != DEFAUT_APP_ATTEMPT_ID) { for (int formerAttemptId = 0; formerAttemptId < info.attemptId; formerAttemptId++) { removeAppAttemptPathInfoFromDB(info.appId, formerAttemptId); } @@ -418,6 +418,46 @@ void closeAndDeletePartitionsIfNeeded( removeAppShuffleInfoFromDB(appShuffleInfo); } + @Override + public void removeShuffleMerge(RemoveShuffleMerge msg) { + if (!appsShuffleInfo.containsKey(msg.appId)) { + logger.debug("Asked to remove merged shuffle, but application {} " + + "is not registered or NM was restarted.", msg.appId); + return; + } + AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(msg.appId); + if (!msg.appAttemptId.equals(String.valueOf(appShuffleInfo.attemptId))) { + throw new IllegalArgumentException( + String.format("The attempt id %s in this removeMergedData message does not match " + + "with the current attempt id %s stored in shuffle service for application %s", + msg.appAttemptId, appShuffleInfo.attemptId, msg.appId)); + } + if (!appShuffleInfo.shuffles.containsKey(msg.shuffleId)) { + logger.debug("Asked to remove Application {} unknown shuffle merged data, shuffleId = {}", + msg.appId, msg.shuffleId); + return; + } + logger.info("Removing merged shuffle for application = {}, shuffleId = {}, shuffleMergeId = {}", + msg.appId, msg.shuffleId, msg.shuffleMergeId); + AppShuffleMergePartitionsInfo mergePartitionsInfo = appShuffleInfo.shuffles.get(msg.shuffleId); + boolean deleteCurrent = msg.shuffleMergeId.equals(DELETE_CURRENT_MERGED_SHUFFLE_ID); + if (!deleteCurrent && msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) { + throw new RuntimeException( + String.format("Asked to remove old shuffle merged data for application = %s ," + + " shuffleId = %s, shuffleMergeId = %s, but current shuffleMergeId = %s ", + msg.appId, msg.shuffleId, msg.shuffleMergeId, mergePartitionsInfo.shuffleMergeId)); + } else if (!deleteCurrent && msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { + // There is a higher shuffleMergeId request to clean, but also clean up older + // shuffleMergeId partitions. + Optional toRemoveHigherShuffleMergeId = + Optional.of(new AppAttemptShuffleMergeId(appShuffleInfo.appId, appShuffleInfo.attemptId, + msg.shuffleId, msg.shuffleMergeId)); + submitRemoveShuffleMergeTask(appShuffleInfo, msg.shuffleId, toRemoveHigherShuffleMergeId); + } else { + submitRemoveShuffleMergeTask(appShuffleInfo, msg.shuffleId, Optional.empty()); + } + } + /** * Remove the application attempt local paths information from the DB. This method is being * invoked within the lock from the ConcurrentHashmap appsShuffleInfo on the specific @@ -452,22 +492,79 @@ void removeAppShuffleInfoFromDB(AppShuffleInfo appShuffleInfo) { } /** - * Clean up all the AppShufflePartitionInfo and the finalized shuffle partitions in DB for - * a specific shuffleMergeId. This is done since there is a higher shuffleMergeId request made - * for a shuffleId, therefore clean up older shuffleMergeId partitions. The cleanup will be - * executed the mergedShuffleCleaner thread. + * Clean up the outdated finalized or unfinalized shuffle partitions. + * The cleanup will be executed in the mergedShuffleCleaner thread. Two cases: + * 1. there is a higher shuffleMergeId request made for a shuffleId, therefore clean up + * older shuffleMergeId partitions. + * 2. Application requires to clean up the expired or unused specific shuffleId partitions */ + @VisibleForTesting + void submitRemoveShuffleMergeTask( + AppShuffleInfo shuffleInfo, Integer shuffleId, + Optional higherShuffleMergeIdToClean) { + AppShuffleMergePartitionsInfo mergePartitionsInfo = shuffleInfo.shuffles.get(shuffleId); + AppAttemptShuffleMergeId shuffleMergeId = new AppAttemptShuffleMergeId( + shuffleInfo.appId, shuffleInfo.attemptId, shuffleId, mergePartitionsInfo.shuffleMergeId); + // Before submitting an asynchronous task, partitions should be extracted first + if (!mergePartitionsInfo.isFinalized()) { + Map partitionsToClean = mergePartitionsInfo.shuffleMergePartitions; + submitCleanupTask(() -> + closeAndDeleteOutdatedPartitions(shuffleMergeId, partitionsToClean, + higherShuffleMergeIdToClean)); + } else { + int[] partitionsToClean = mergePartitionsInfo.finalizedPartitionsForClean; + submitCleanupTask(() -> + deleteOutdatedFinalizedPartitions(shuffleInfo, shuffleMergeId, + partitionsToClean, higherShuffleMergeIdToClean)); + } + } + @VisibleForTesting void closeAndDeleteOutdatedPartitions( - AppAttemptShuffleMergeId appAttemptShuffleMergeId, - Map partitions) { - removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId); + AppAttemptShuffleMergeId shuffleMergeId, + Map partitions, + Optional higherShuffleMergeId) { + removeAppShufflePartitionInfoFromDB(shuffleMergeId); + higherShuffleMergeId.ifPresent(this::removeAppShufflePartitionInfoFromDB); partitions - .forEach((partitionId, partitionInfo) -> { - synchronized (partitionInfo) { - partitionInfo.closeAllFilesAndDeleteIfNeeded(true); - } - }); + .forEach((partitionId, partitionInfo) -> { + synchronized (partitionInfo) { + partitionInfo.closeAllFilesAndDeleteIfNeeded(true); + } + }); + } + + @VisibleForTesting + void deleteOutdatedFinalizedPartitions( + AppShuffleInfo shuffleInfo, + AppAttemptShuffleMergeId shuffleMergeId, + int[] outdatedFinalizedPartitions, + Optional higherShuffleMergeId) { + int shuffleId = shuffleMergeId.shuffleId; + int mergeId = shuffleMergeId.shuffleMergeId; + removeAppShufflePartitionInfoFromDB(shuffleMergeId); + higherShuffleMergeId.ifPresent(this::removeAppShufflePartitionInfoFromDB); + Arrays.stream(outdatedFinalizedPartitions).forEach(partition -> { + try { + File metaFile = shuffleInfo.getMergedShuffleMetaFile(shuffleId, mergeId, partition); + metaFile.delete(); + } catch (Exception e) { + logger.warn("Error deleting meta file for {}", shuffleMergeId, e); + } + try { + File indexFile = + new File(shuffleInfo.getMergedShuffleIndexFilePath(shuffleId, mergeId, partition)); + indexFile.delete(); + } catch (Exception e) { + logger.warn("Error deleting index file for {}", shuffleMergeId, e); + } + try { + File dataFile = shuffleInfo.getMergedShuffleDataFile(shuffleId, mergeId, partition); + dataFile.delete(); + } catch (Exception e) { + logger.warn("Error deleting dataFile file for {}", shuffleMergeId, e); + } + }); } /** @@ -653,9 +750,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return // empty MergeStatuses but cleanup the older shuffleMergeId files. - submitCleanupTask(() -> - closeAndDeleteOutdatedPartitions( - appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); + submitRemoveShuffleMergeTask(appShuffleInfo, shuffleId, + Optional.of(appAttemptShuffleMergeId)); } else { // This block covers: // 1. finalization of determinate stage @@ -712,6 +808,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId, bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes)); + appShuffleInfo.shuffles.get(msg.shuffleId).setFinalizedPartitions(Ints.toArray(reduceIds)); } logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalization of shuffle merge completed", msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId); @@ -736,21 +833,21 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { Map metaMap = mapper.readValue(mergeDirInfo, typeRef); String mergeDir = metaMap.get(MERGE_DIR_KEY); int attemptId = Integer.valueOf( - metaMap.getOrDefault(ATTEMPT_ID_KEY, String.valueOf(UNDEFINED_ATTEMPT_ID))); + metaMap.getOrDefault(ATTEMPT_ID_KEY, String.valueOf(DEFAUT_APP_ATTEMPT_ID))); if (mergeDir == null) { throw new IllegalArgumentException( String.format("Failed to get the merge directory information from the " + "shuffleManagerMeta %s in executor registration message", shuffleManagerMeta)); } - if (attemptId == UNDEFINED_ATTEMPT_ID) { + if (attemptId == DEFAUT_APP_ATTEMPT_ID) { // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo. // Only the first ExecutorRegister message can register the merge dirs. // DB will also get updated with the registered local path information. appsShuffleInfo.computeIfAbsent(appId, id -> { AppPathsInfo appPathsInfo = new AppPathsInfo(appId, executorInfo.localDirs, mergeDir, executorInfo.subDirsPerLocalDir); - writeAppPathsInfoToDb(appId, UNDEFINED_ATTEMPT_ID, appPathsInfo); - return new AppShuffleInfo(appId, UNDEFINED_ATTEMPT_ID, appPathsInfo); + writeAppPathsInfoToDb(appId, DEFAUT_APP_ATTEMPT_ID, appPathsInfo); + return new AppShuffleInfo(appId, DEFAUT_APP_ATTEMPT_ID, appPathsInfo); }); } else { // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo. @@ -1464,11 +1561,14 @@ public static class AppShuffleMergePartitionsInfo { Collections.emptyMap(); private final int shuffleMergeId; private final Map shuffleMergePartitions; + // Mark the finalized partitions need to be cleaned when removing merged shuffle later + private int[] finalizedPartitionsForClean; public AppShuffleMergePartitionsInfo(int shuffleMergeId, boolean shuffleFinalized) { this.shuffleMergeId = shuffleMergeId; this.shuffleMergePartitions = shuffleFinalized ? SHUFFLE_FINALIZED_MARKER : new ConcurrentHashMap<>(); + this.finalizedPartitionsForClean = new int[0]; } @VisibleForTesting @@ -1479,6 +1579,11 @@ public Map getShuffleMergePartitions() { public boolean isFinalized() { return shuffleMergePartitions == SHUFFLE_FINALIZED_MARKER; } + + public void setFinalizedPartitions(int[] finalizedPartitionsForClean){ + assert isFinalized(); + this.finalizedPartitionsForClean = finalizedPartitionsForClean; + } } /** @@ -1687,9 +1792,9 @@ void closeAllFilesAndDeleteIfNeeded(boolean delete) { try { if (dataChannel.isOpen()) { dataChannel.close(); - if (delete) { - dataFile.delete(); - } + } + if (delete) { + dataFile.delete(); } } catch (IOException ioe) { logger.warn("Error closing data channel for {} reduceId {}", diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java index ad959c7e2e7c..33411baa09f8 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java @@ -50,7 +50,7 @@ public enum Type { FETCH_SHUFFLE_BLOCKS(9), GET_LOCAL_DIRS_FOR_EXECUTORS(10), LOCAL_DIRS_FOR_EXECUTORS(11), PUSH_BLOCK_STREAM(12), FINALIZE_SHUFFLE_MERGE(13), MERGE_STATUSES(14), FETCH_SHUFFLE_BLOCK_CHUNKS(15), DIAGNOSE_CORRUPTION(16), CORRUPTION_CAUSE(17), - PUSH_BLOCK_RETURN_CODE(18); + PUSH_BLOCK_RETURN_CODE(18), REMOVE_SHUFFLE_MERGE(19); private final byte id; @@ -88,6 +88,7 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) { case 16: return DiagnoseCorruption.decode(buf); case 17: return CorruptionCause.decode(buf); case 18: return BlockPushReturnCode.decode(buf); + case 19: return RemoveShuffleMerge.decode(buf); default: throw new IllegalArgumentException("Unknown message type: " + type); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java new file mode 100644 index 000000000000..798457655119 --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle.protocol; + +import java.util.Objects; + +import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +import org.apache.spark.network.protocol.Encoders; + +/** + * Request to remove merged shuffle blocks. + */ +public class RemoveShuffleMerge extends BlockTransferMessage { + public final String appId; + public final String appAttemptId; + public final Integer shuffleId; + public final Integer shuffleMergeId; + + public RemoveShuffleMerge( + String appId, String appAttemptId, Integer shuffleId, Integer shuffleMergeId) { + this.appId = appId; + this.appAttemptId = appAttemptId; + this.shuffleId = shuffleId; + this.shuffleMergeId = shuffleMergeId; + } + + @Override + protected Type type() { return Type.REMOVE_SHUFFLE_MERGE; } + + @Override + public int hashCode() { + return com.google.common.base.Objects + .hashCode(appId, appAttemptId, shuffleId, shuffleMergeId); + } + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("appId", appId) + .append("appAttemptId", appAttemptId) + .append("shuffleId", shuffleId) + .append("shuffleMergeId", shuffleMergeId) + .toString(); + } + + @Override + public boolean equals(Object other) { + if (other instanceof RemoveShuffleMerge) { + RemoveShuffleMerge o = (RemoveShuffleMerge) other; + return Objects.equals(appId, o.appId) + && Objects.equals(appAttemptId, o.appAttemptId) + && Objects.equals(shuffleId, o.shuffleId) + && Objects.equals(shuffleMergeId, o.shuffleMergeId); + } + return false; + } + + @Override + public int encodedLength() { + return Encoders.Strings.encodedLength(appId) + + 4 /* encoded length of appAttemptId */ + + 4 /* encoded length of shuffleId */ + + 4/* encoded length of shuffleMergeId */; + } + + @Override + public void encode(ByteBuf buf) { + Encoders.Strings.encode(buf, appId); + Encoders.Strings.encode(buf, appAttemptId); + buf.writeInt(shuffleId); + buf.writeInt(shuffleMergeId); + } + + public static RemoveShuffleMerge decode(ByteBuf buf) { + String appId = Encoders.Strings.decode(buf); + String appAttemptId = Encoders.Strings.decode(buf); + int shuffleId = buf.readInt(); + int shuffleMergeId = buf.readInt(); + return new RemoveShuffleMerge(appId, appAttemptId, shuffleId, shuffleMergeId); + } +} diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index eb2c1d9fa5cb..2532c3f6c4c3 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -29,12 +29,14 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; - import org.apache.commons.io.FileUtils; import org.junit.After; import org.junit.Assert; @@ -43,8 +45,11 @@ import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import org.apache.spark.network.buffer.FileSegmentManagedBuffer; import org.apache.spark.network.client.StreamCallbackWithID; @@ -56,6 +61,7 @@ import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; import org.apache.spark.network.shuffle.protocol.MergeStatuses; import org.apache.spark.network.shuffle.protocol.PushBlockStream; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; import org.apache.spark.network.util.MapConfigProvider; import org.apache.spark.network.util.TransportConf; @@ -1146,17 +1152,8 @@ public void testBlockFetchWithOlderShuffleMergeId() throws IOException { @Test public void testCleanupOlderShuffleMergeId() throws IOException, InterruptedException { Semaphore closed = new Semaphore(0); - pushResolver = new RemoteBlockPushResolver(conf, null) { - @Override - void closeAndDeleteOutdatedPartitions( - AppAttemptShuffleMergeId appAttemptShuffleMergeId, - Map partitions) { - super.closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, partitions); - closed.release(); - } - }; String testApp = "testCleanupOlderShuffleMergeId"; - registerExecutor(testApp, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META); + prepareForRemoveShuffle(closed, testApp); StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 1, 0, 0, 0)); @@ -1166,7 +1163,7 @@ void closeAndDeleteOutdatedPartitions( new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 2, 0, 0, 0)); RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo = pushResolver.validateAndGetAppShuffleInfo(testApp); - closed.acquire(); + closed.tryAcquire(10, TimeUnit.SECONDS); assertFalse("Data files on the disk should be cleaned up", appShuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists()); assertFalse("Meta files on the disk should be cleaned up", @@ -1185,7 +1182,7 @@ void closeAndDeleteOutdatedPartitions( StreamCallbackWithID stream3 = pushResolver.receiveBlockDataAsStream( new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 3, 0, 0, 0)); - closed.acquire(); + closed.tryAcquire(10, TimeUnit.SECONDS); stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2])); stream3.onComplete(stream3.getID()); pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 3)); @@ -1195,7 +1192,7 @@ void closeAndDeleteOutdatedPartitions( StreamCallbackWithID stream4 = pushResolver.receiveBlockDataAsStream( new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 4, 0, 0, 0)); - closed.acquire(); + closed.tryAcquire(10, TimeUnit.SECONDS); // Do not finalize shuffleMergeId 4 can happen during stage cancellation. stream4.onData(stream4.getID(), ByteBuffer.wrap(new byte[2])); stream4.onComplete(stream4.getID()); @@ -1203,7 +1200,7 @@ void closeAndDeleteOutdatedPartitions( // Check whether the data is cleaned up when higher shuffleMergeId finalize request comes // but no blocks pushed for that shuffleMergeId pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 5)); - closed.acquire(); + closed.tryAcquire(10, TimeUnit.SECONDS); assertFalse("MergedBlock meta file for shuffle 0 and shuffleMergeId 4 should be cleaned" + " up", appShuffleInfo.getMergedShuffleMetaFile(0, 4, 0).exists()); assertFalse("MergedBlock index file for shuffle 0 and shuffleMergeId 4 should be cleaned" @@ -1212,6 +1209,43 @@ void closeAndDeleteOutdatedPartitions( + " up", appShuffleInfo.getMergedShuffleDataFile(0, 4, 0).exists()); } + @Test + public void testRemoveShuffleMerge() throws IOException, InterruptedException { + Semaphore closed = new Semaphore(0); + String testApp = "testRemoveShuffleMerge"; + prepareForRemoveShuffle(closed, testApp); + + // 1. Check whether the data is cleaned up when merged shuffle is finalized + RemoteBlockPushResolver.AppShuffleInfo shuffleInfo = + pushResolver.validateAndGetAppShuffleInfo(testApp); + createShuffleData(testApp, 0, 1, 0, 0); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 1)); + assertTrue(shuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists()); + assertTrue(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists()); + assertTrue(shuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists()); + pushResolver.removeShuffleMerge( + new RemoveShuffleMerge(testApp, String.valueOf(NO_ATTEMPT_ID), 0, 1)); + closed.tryAcquire(10, TimeUnit.SECONDS); + assertFalse(shuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists()); + assertFalse(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists()); + assertFalse(shuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists()); + + // 2. Check whether the data is cleaned up when merged shuffle is not finalized. + createShuffleData(testApp, 2, 1, 0, 0); + assertTrue(shuffleInfo.getMergedShuffleMetaFile(2, 1, 0).exists()); + pushResolver.removeShuffleMerge( + new RemoveShuffleMerge(testApp, String.valueOf(NO_ATTEMPT_ID), 2, 1)); + closed.tryAcquire(10, TimeUnit.SECONDS); + assertFalse(shuffleInfo.getMergedShuffleMetaFile(2, 1, 0).exists()); + + // 3. Check whether the data is cleaned up when higher shuffleMergeId finalize request comes + createShuffleData(testApp, 3, 1, 0, 0); + assertTrue(shuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists()); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 3, 2)); + closed.tryAcquire(10, TimeUnit.SECONDS); + assertFalse(shuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists()); + } + @Test public void testFinalizationResultIsEmptyWhenTheServerDidNotReceiveAnyBlocks() { //shuffle 1 0 is finalized even though the server didn't receive any blocks for it. @@ -1341,6 +1375,38 @@ AppShufflePartitionInfo newAppShufflePartitionInfo( registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META); } + private void prepareForRemoveShuffle(Semaphore closed, String appId) throws IOException { + pushResolver = new RemoteBlockPushResolver(conf, null) { + @Override + void closeAndDeleteOutdatedPartitions( + AppAttemptShuffleMergeId shuffleMergeId, Map partitions, + Optional higherShuffleMergeId) { + super.closeAndDeleteOutdatedPartitions(shuffleMergeId, partitions, higherShuffleMergeId); + closed.release(); + } + @VisibleForTesting + void deleteOutdatedFinalizedPartitions( + AppShuffleInfo shuffleInfo, AppAttemptShuffleMergeId shuffleMergeId, + int[] outdatedFinalizedPartitions, + Optional higherShuffleMergeId) { + super.deleteOutdatedFinalizedPartitions(shuffleInfo, shuffleMergeId, + outdatedFinalizedPartitions, higherShuffleMergeId); + closed.release(); + } + }; + registerExecutor(appId, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META); + } + + private StreamCallbackWithID createShuffleData(String appId, int shuffleId, + int shuffleMergedId, int mapIndex, int reduceId) throws IOException { + StreamCallbackWithID streamCallback = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(appId, NO_ATTEMPT_ID, shuffleId, + shuffleMergedId, mapIndex, reduceId, 0)); + streamCallback.onData(streamCallback.getID(), ByteBuffer.wrap(new byte[2])); + streamCallback.onComplete(streamCallback.getID()); + return streamCallback; + } + private Path[] createLocalDirs(int numLocalDirs) throws IOException { Path[] localDirs = new Path[numLocalDirs]; for (int i = 0; i < localDirs.length; i++) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index fb3512619d87..6260c436dea3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1397,7 +1397,7 @@ private[spark] class DAGScheduler( private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = { assert(stage.shuffleDep.shuffleMergeAllowed && !stage.shuffleDep.isShuffleMergeFinalizedMarked) if (stage.shuffleDep.getMergerLocs.isEmpty) { - getAndSetShufflePushMergerLocations(stage) + configureShufflePushMergerLocations(stage) } val shuffleId = stage.shuffleDep.shuffleId @@ -1413,17 +1413,20 @@ private[spark] class DAGScheduler( } } - private def getAndSetShufflePushMergerLocations(stage: ShuffleMapStage): Seq[BlockManagerId] = { + private def configureShufflePushMergerLocations(stage: ShuffleMapStage): Unit = { + if (stage.shuffleDep.getMergerLocs.nonEmpty) { + return + } val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId) if (mergerLocs.nonEmpty) { stage.shuffleDep.setMergerLocs(mergerLocs) } + mapOutputTracker.registerShufflePushMergerLocations(stage.shuffleDep.shuffleId, mergerLocs) logDebug(s"Shuffle merge locations for shuffle ${stage.shuffleDep.shuffleId} with" + s" shuffle merge ${stage.shuffleDep.shuffleMergeId} is" + s" ${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") - mergerLocs } /** Called when stage's parents are available and we can now do its task. */ @@ -2621,13 +2624,13 @@ private[spark] class DAGScheduler( stage.shuffleDep.shuffleMergeAllowed && stage.shuffleDep.getMergerLocs.isEmpty && runningStages.contains(stage) }.foreach { case(_, stage: ShuffleMapStage) => - if (getAndSetShufflePushMergerLocations(stage).nonEmpty) { + configureShufflePushMergerLocations(stage) + if(stage.shuffleDep.getMergerLocs.nonEmpty) { + // stage's merger locations change from empty to non-empty logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" + s" ${stage.shuffleDep.shuffleId} and shuffle merge" + s" ${stage.shuffleDep.shuffleMergeId} with ${stage.shuffleDep.getMergerLocs.size}" + s" merger locations") - mapOutputTracker.registerShufflePushMergerLocations(stage.shuffleDep.shuffleId, - stage.shuffleDep.getMergerLocs) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index adeb507941c0..55f019773910 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -319,51 +319,68 @@ class BlockManagerMasterEndpoint( } private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = { - val removeMsg = RemoveShuffle(shuffleId) - val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm => - bm.storageEndpoint.ask[Boolean](removeMsg).recover { - // use false as default value means no shuffle data were removed - handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false) + var removeShuffleFromShuffleServicesFutures = Seq.empty[Future[Boolean]] + if (pushBasedShuffleEnabled && externalBlockStoreClient.isDefined) { + val shuffleClient = externalBlockStoreClient.get + mapOutputTracker.shuffleStatuses.get(shuffleId) match { + case Some(shuffleStatus) => + val shuffleMergerLocations = shuffleStatus.getShufflePushMergerLocations + removeShuffleFromShuffleServicesFutures ++= shuffleMergerLocations.map(bmId => { + Future[Boolean] { + shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId) + } + }) + case None => + logWarning(s"Asked to remove merge shuffle blocks from " + + s"shuffle service for unknown shuffle ${shuffleId}") } - }.toSeq + } - // Find all shuffle blocks on executors that are no longer running - val blocksToDeleteByShuffleService = - new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]] if (externalShuffleServiceRemoveShuffleEnabled) { - mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus => - shuffleStatus.withMapStatuses { mapStatuses => - mapStatuses.foreach { mapStatus => - // Check if the executor has been deallocated - if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) { - val blocksToDel = - shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId) - if (blocksToDel.nonEmpty) { - val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location, - new mutable.HashSet[BlockId]) - blocks ++= blocksToDel + val shuffleClient = externalBlockStoreClient.get + // Find all shuffle blocks on executors that are no longer running + val blocksToDelete = new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]] + mapOutputTracker.shuffleStatuses.get(shuffleId) match { + case Some(shuffleStatus) => + shuffleStatus.withMapStatuses { mapStatuses => + mapStatuses.foreach { mapStatus => + // Check if the executor has been deallocated + if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) { + val blocksToDel = shuffleManager.shuffleBlockResolver + .getBlocksForShuffle(shuffleId, mapStatus.mapId) + if (blocksToDel.nonEmpty) { + val blocks = blocksToDelete.getOrElseUpdate(mapStatus.location, + new mutable.HashSet[BlockId]) + blocks ++= blocksToDel + } } } } - } + removeShuffleFromShuffleServicesFutures ++= blocksToDelete.map { case (bmId, blockIds) => + Future[Boolean] { + val numRemovedBlocks = shuffleClient.removeBlocks( + bmId.host, + bmId.port, + bmId.executorId, + blockIds.map(_.toString).toArray) + numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, + TimeUnit.SECONDS) == blockIds.size + } + }.toSeq + case None => + logDebug(s"Asked to remove shuffle blocks from " + + s"shuffle service for unknown shuffle ${shuffleId}") } } - - val removeShuffleFromShuffleServicesFutures = - externalBlockStoreClient.map { shuffleClient => - blocksToDeleteByShuffleService.map { case (bmId, blockIds) => - Future[Boolean] { - val numRemovedBlocks = shuffleClient.removeBlocks( - bmId.host, - bmId.port, - bmId.executorId, - blockIds.map(_.toString).toArray) - numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, - TimeUnit.SECONDS) == blockIds.size - } - } - }.getOrElse(Seq.empty) - + // It needs to be invoked at last to avoid cleaning up shuffleStatuses in mapOutputTracker + // too early before used by [[removeShuffleFromShuffleServicesFutures]] + val removeMsg = RemoveShuffle(shuffleId) + val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm => + bm.storageEndpoint.ask[Boolean](removeMsg).recover { + // use false as default value means no shuffle data were removed + handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false) + } + }.toSeq Future.sequence(removeShuffleFromExecutorsFutures ++ removeShuffleFromShuffleServicesFutures) } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 16fa42056921..075a21c399e0 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -956,7 +956,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { assert(ShuffleTestAccessor.getOutdatedAppPathInfoCountDuringDBReload( mergeManager2, mergeManager2DB) == 1) assert(ShuffleTestAccessor.getOutdatedFinalizedShuffleCountDuringDBReload( - mergeManager2, mergeManager2DB) == 2) + mergeManager2, mergeManager2DB) == 1) s2.stop() // Yarn Shuffle service comes back up without custom mergeManager